# Spark ETL Job - Restaurant Weather Data Processing

This notebook implements a Spark ETL job that:
1. Reads restaurant data from CSV files
2. Checks for null/invalid latitude and longitude values
3. Geocodes missing coordinates using OpenCage Geocoding API
4. Generates 4-character geohashes for spatial matching
5. Left-joins restaurant data with weather data using geohash
6. Stores enriched data in Parquet format with partitioning

## Prerequisites
- Apache Spark installed locally
- Python packages: pyspark, python-geohash, requests, pandas

## 1. Setup and Imports

In [1]:
# Install required packages if not already installed
!pip install pyspark python-geohash requests pandas pyarrow -q

In [2]:
# Import required libraries
import os
import sys
import time
import builtins  # For Python's built-in min/max functions
from typing import Optional, Tuple

# Spark imports
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import (
    col, udf, when, broadcast, first, coalesce, lit, count, avg,
    min as spark_min, max as spark_max  # Rename to avoid conflict with Python builtins
)
from pyspark.sql.types import (
    StringType, DoubleType, IntegerType, StructType, StructField
)

# Geohash library
import geohash as gh

# HTTP requests for geocoding
import requests

print("All imports successful!")

All imports successful!


In [3]:
# Configuration
BASE_PATH = os.getcwd()  # Current directory (Spark_Task folder)
RESTAURANT_DATA_PATH = os.path.join(BASE_PATH, "restaurant_csv")
WEATHER_DATA_PATH = os.path.join(BASE_PATH, "weather_data", "weather")  # Extracted weather parquet data
OUTPUT_PATH = os.path.join(BASE_PATH, "output", "enriched_data")

# Geohash precision (4 characters as per task requirements)
GEOHASH_PRECISION = 4

# OpenCage API Key (set your key here or as environment variable)
OPENCAGE_API_KEY = os.environ.get("OPENCAGE_API_KEY", 'cdbec992e2f64810a392f4cf2c6f2aa9')

print(f"Base path: {BASE_PATH}")
print(f"Restaurant data: {RESTAURANT_DATA_PATH}")
print(f"Weather data: {WEATHER_DATA_PATH}")
print(f"Output path: {OUTPUT_PATH}")
print(f"OpenCage API Key: {'Set' if OPENCAGE_API_KEY else 'Not set (geocoding will be skipped)'}")

Base path: /Users/batyagg/Projects/EPAM_Data_Engineering/Spark_Task
Restaurant data: /Users/batyagg/Projects/EPAM_Data_Engineering/Spark_Task/restaurant_csv
Weather data: /Users/batyagg/Projects/EPAM_Data_Engineering/Spark_Task/weather_data/weather
Output path: /Users/batyagg/Projects/EPAM_Data_Engineering/Spark_Task/output/enriched_data
OpenCage API Key: Set


## 2. Initialize Spark Session

In [4]:
# Create Spark Session
spark = (
    SparkSession.builder
    .appName("Restaurant_Weather_ETL")
    .master("local[*]")
    .config("spark.sql.parquet.compression.codec", "snappy")
    .config("spark.sql.shuffle.partitions", "200")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

# Set log level to reduce verbosity
spark.sparkContext.setLogLevel("WARN")

print(f"Spark version: {spark.version}")
print(f"Spark UI: {spark.sparkContext.uiWebUrl}")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/22 23:38:00 WARN Utils: Your hostname, Batyrkhans-MacBook-Pro-4.local, resolves to a loopback address: 127.0.0.1; using 172.20.10.2 instead (on interface en0)
25/12/22 23:38:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/22 23:38:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark version: 4.0.1
Spark UI: http://172.20.10.2:4040


## 3. Define Helper Functions

### 3.1 Geohash Generation Function

In [5]:
def generate_geohash(lat: Optional[float], lng: Optional[float], precision: int = 4) -> Optional[str]:
    """
    Generate a geohash from latitude and longitude coordinates.
    
    Args:
        lat: Latitude coordinate
        lng: Longitude coordinate  
        precision: Geohash precision (default 4 characters)
        
    Returns:
        Geohash string of specified precision, or None if coordinates are invalid
    """
    if lat is None or lng is None:
        return None
    
    try:
        # Validate coordinate ranges
        if not (-90 <= lat <= 90) or not (-180 <= lng <= 180):
            return None
        
        # Clamp to open interval to avoid boundary errors
        # Using builtins.min/max to avoid conflict with PySpark functions
        eps = 1e-9
        safe_lat = builtins.min(builtins.max(lat, -90 + eps), 90 - eps)
        safe_lng = builtins.min(builtins.max(lng, -180 + eps), 180 - eps)
        
        return gh.encode(safe_lat, safe_lng, precision=precision)
    except (ValueError, TypeError):
        return None

# Test the function
print("Geohash examples (4-character precision):")
print(f"  New York (40.7128, -74.0060): {generate_geohash(40.7128, -74.0060)}")
print(f"  Paris (48.8566, 2.3522): {generate_geohash(48.8566, 2.3522)}")
print(f"  London (51.5074, -0.1278): {generate_geohash(51.5074, -0.1278)}")
print(f"  Milan (45.4642, 9.1900): {generate_geohash(45.4642, 9.1900)}")
print(f"  Null coordinates: {generate_geohash(None, None)}")
print(f"  Invalid latitude (91, 0): {generate_geohash(91, 0)}")

Geohash examples (4-character precision):
  New York (40.7128, -74.0060): dr5r
  Paris (48.8566, 2.3522): u09t
  London (51.5074, -0.1278): gcpv
  Milan (45.4642, 9.1900): u0nd
  Null coordinates: None
  Invalid latitude (91, 0): None


### 3.2 OpenCage Geocoding Function

In [6]:
class OpenCageGeocoder:
    """OpenCage Geocoding API client for resolving coordinates from addresses."""
    
    BASE_URL = "https://api.opencagedata.com/geocode/v1/json"
    
    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key
        self._last_request_time = 0
    
    def _rate_limit(self):
        """Apply rate limiting (1 request per second for free tier)."""
        current_time = time.time()
        time_since_last = current_time - self._last_request_time
        if time_since_last < 1.0:
            time.sleep(1.0 - time_since_last)
        self._last_request_time = time.time()
    
    def geocode(self, city: str, country: str) -> Tuple[Optional[float], Optional[float]]:
        """
        Geocode a city and country to get latitude and longitude.
        
        Returns:
            Tuple of (latitude, longitude) or (None, None) if geocoding fails
        """
        if not self.api_key:
            return (None, None)
        
        self._rate_limit()
        query = f"{city}, {country}"
        
        params = {
            "q": query,
            "key": self.api_key,
            "limit": 1,
            "no_annotations": 1
        }
        
        try:
            response = requests.get(self.BASE_URL, params=params, timeout=10)
            response.raise_for_status()
            
            data = response.json()
            
            if data.get("results") and len(data["results"]) > 0:
                geometry = data["results"][0].get("geometry", {})
                return (geometry.get("lat"), geometry.get("lng"))
            
            return (None, None)
            
        except Exception as e:
            print(f"Geocoding error for {query}: {e}")
            return (None, None)

# Initialize geocoder
geocoder = OpenCageGeocoder(OPENCAGE_API_KEY)
print("Geocoder initialized")

Geocoder initialized


### 3.3 Register Spark UDF for Geohash

In [7]:
# Register geohash UDF for use in Spark DataFrames
# Note: We import builtins inside the UDF to avoid serialization issues with PySpark min/max

@udf(StringType())
def geohash_udf(lat, lng):
    """Spark UDF for generating 4-character geohash."""
    import builtins as bi  # Import inside UDF to get Python's built-in min/max
    import geohash as gh_local  # Re-import inside UDF for serialization
    
    if lat is None or lng is None:
        return None
    try:
        if not (-90 <= lat <= 90) or not (-180 <= lng <= 180):
            return None
        eps = 1e-9
        safe_lat = bi.min(bi.max(lat, -90 + eps), 90 - eps)
        safe_lng = bi.min(bi.max(lng, -180 + eps), 180 - eps)
        return gh_local.encode(safe_lat, safe_lng, precision=4)
    except:
        return None

print("Geohash UDF registered successfully")

Geohash UDF registered successfully


## 4. Read Restaurant Data

In [8]:
# Read restaurant data from CSV files
restaurant_df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(RESTAURANT_DATA_PATH)
)

# Ensure correct column types
restaurant_df = restaurant_df.select(
    col("id").cast("long"),
    col("franchise_id").cast("int"),
    col("franchise_name").cast("string"),
    col("restaurant_franchise_id").cast("int"),
    col("country").cast("string"),
    col("city").cast("string"),
    col("lat").cast("double"),
    col("lng").cast("double")
)

print(f"Total restaurant records: {restaurant_df.count()}")
print("\nSchema:")
restaurant_df.printSchema()

Total restaurant records: 1997

Schema:
root
 |-- id: long (nullable = true)
 |-- franchise_id: integer (nullable = true)
 |-- franchise_name: string (nullable = true)
 |-- restaurant_franchise_id: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)



In [9]:
# Display sample restaurant data
print("Sample restaurant data:")
restaurant_df.show(10, truncate=False)

Sample restaurant data:
+------------+------------+--------------------+-----------------------+-------+--------------+------+-------+
|id          |franchise_id|franchise_name      |restaurant_franchise_id|country|city          |lat   |lng    |
+------------+------------+--------------------+-----------------------+-------+--------------+------+-------+
|197568495625|10          |The Golden Spoon    |24784                  |US     |Decatur       |34.578|-87.021|
|17179869242 |59          |Azalea Cafe         |10902                  |FR     |Paris         |48.861|2.368  |
|214748364826|27          |The Corner Cafe     |92040                  |US     |Rapid City    |44.08 |-103.25|
|154618822706|51          |The Pizzeria        |41484                  |AT     |Vienna        |48.213|16.413 |
|163208757312|65          |Chef's Corner       |96638                  |GB     |London        |51.495|-0.191 |
|68719476763 |28          |The Spicy Pickle    |77517                  |US     |Grayling

In [10]:
# Show statistics for coordinates
print("Coordinate statistics:")
restaurant_df.select("lat", "lng").describe().show()

Coordinate statistics:
+-------+-----------------+------------------+
|summary|              lat|               lng|
+-------+-----------------+------------------+
|  count|             1996|              1996|
|   mean|43.99021893787576|-34.52430861723447|
| stddev|7.142288389673525| 49.89812670573716|
|    min|          -25.437|          -159.481|
|    max|             58.3|           115.164|
+-------+-----------------+------------------+



## 5. Check for Null/Invalid Coordinates

In [11]:
# Identify records with null coordinates
null_coords_df = restaurant_df.filter(
    col("lat").isNull() | col("lng").isNull()
)

null_count = null_coords_df.count()
print(f"Records with NULL coordinates: {null_count}")

if null_count > 0:
    print("\nSample records with NULL coordinates:")
    null_coords_df.show(10, truncate=False)

Records with NULL coordinates: 1

Sample records with NULL coordinates:
+-----------+------------+--------------+-----------------------+-------+------+----+----+
|id         |franchise_id|franchise_name|restaurant_franchise_id|country|city  |lat |lng |
+-----------+------------+--------------+-----------------------+-------+------+----+----+
|85899345920|1           |Savoria       |18952                  |US     |Dillon|NULL|NULL|
+-----------+------------+--------------+-----------------------+-------+------+----+----+



In [12]:
# Identify records with invalid coordinates (outside valid range)
invalid_coords_df = restaurant_df.filter(
    (col("lat").isNotNull() & col("lng").isNotNull()) &
    (
        (col("lat") < -90) | (col("lat") > 90) |
        (col("lng") < -180) | (col("lng") > 180)
    )
)

invalid_count = invalid_coords_df.count()
print(f"Records with INVALID coordinates (out of range): {invalid_count}")

if invalid_count > 0:
    print("\nSample records with invalid coordinates:")
    invalid_coords_df.select("id", "franchise_name", "city", "country", "lat", "lng").show(20, truncate=False)

Records with INVALID coordinates (out of range): 0


In [13]:
# Summary of data quality issues
total_records = restaurant_df.count()
valid_records = total_records - null_count - invalid_count

print("=" * 50)
print("DATA QUALITY SUMMARY")
print("=" * 50)
print(f"Total records:              {total_records:,}")
print(f"Valid coordinates:          {valid_records:,} ({100*valid_records/total_records:.1f}%)")
print(f"NULL coordinates:           {null_count:,} ({100*null_count/total_records:.1f}%)")
print(f"Invalid coordinates:        {invalid_count:,} ({100*invalid_count/total_records:.1f}%)")
print("=" * 50)

DATA QUALITY SUMMARY
Total records:              1,997
Valid coordinates:          1,996 (99.9%)
NULL coordinates:           1 (0.1%)
Invalid coordinates:        0 (0.0%)


## 6. Geocode Missing Coordinates

This step uses the OpenCage Geocoding API to fill in missing coordinates based on city and country.

**Note:** Requires OPENCAGE_API_KEY environment variable to be set.

In [14]:
def geocode_missing_coordinates(df: DataFrame, geocoder: OpenCageGeocoder) -> DataFrame:
    """
    Fill in missing coordinates using OpenCage Geocoding API.
    """
    if not geocoder.api_key:
        print("Warning: No OpenCage API key provided. Skipping geocoding.")
        return df
    
    # Get records needing geocoding
    needs_geocoding = df.filter(col("lat").isNull() | col("lng").isNull())
    
    if needs_geocoding.count() == 0:
        print("No records need geocoding.")
        return df
    
    # Get unique city/country combinations
    unique_locations = (
        needs_geocoding
        .select("city", "country")
        .distinct()
        .collect()
    )
    
    print(f"Geocoding {len(unique_locations)} unique locations...")
    
    # Geocode each unique location
    geocoded_data = []
    for i, row in enumerate(unique_locations):
        lat, lng = geocoder.geocode(row.city, row.country)
        geocoded_data.append((row.city, row.country, lat, lng))
        if (i + 1) % 10 == 0:
            print(f"  Processed {i + 1}/{len(unique_locations)} locations")
    
    # Create lookup DataFrame
    geocoded_schema = StructType([
        StructField("lookup_city", StringType(), True),
        StructField("lookup_country", StringType(), True),
        StructField("geocoded_lat", DoubleType(), True),
        StructField("geocoded_lng", DoubleType(), True),
    ])
    
    geocoded_df = spark.createDataFrame(geocoded_data, geocoded_schema)
    
    # Join and coalesce coordinates
    result = df.join(
        broadcast(geocoded_df),
        (df.city == geocoded_df.lookup_city) & (df.country == geocoded_df.lookup_country),
        "left"
    ).select(
        df.id,
        df.franchise_id,
        df.franchise_name,
        df.restaurant_franchise_id,
        df.country,
        df.city,
        coalesce(df.lat, geocoded_df.geocoded_lat).alias("lat"),
        coalesce(df.lng, geocoded_df.geocoded_lng).alias("lng")
    )
    
    print("Geocoding complete!")
    return result

In [15]:
# Apply geocoding if API key is available
if OPENCAGE_API_KEY and null_count > 0:
    print("Geocoding missing coordinates...")
    restaurant_df = geocode_missing_coordinates(restaurant_df, geocoder)
    
    # Check remaining null coordinates
    remaining_nulls = restaurant_df.filter(col("lat").isNull() | col("lng").isNull()).count()
    print(f"Remaining records with null coordinates: {remaining_nulls}")
else:
    if not OPENCAGE_API_KEY:
        print("Skipping geocoding - no API key provided")
        print("To enable geocoding, set: export OPENCAGE_API_KEY=your_key")
    else:
        print("No null coordinates to geocode")

Geocoding missing coordinates...
Geocoding 1 unique locations...
Geocoding complete!


                                                                                

Remaining records with null coordinates: 0


## 7. Generate Geohash Column

Generate a 4-character geohash for each restaurant based on its coordinates.

**Why 4 characters?**
- 4-character geohashes represent areas of approximately 39km x 20km
- This provides good granularity for matching restaurants with nearby weather stations
- Balances precision with join efficiency

In [16]:
# Add geohash column to restaurant data
restaurant_with_geohash = restaurant_df.withColumn(
    "geohash",
    geohash_udf(col("lat"), col("lng"))
)

print("Added geohash column to restaurant data")
print("\nSample data with geohash:")
restaurant_with_geohash.select(
    "id", "franchise_name", "city", "country", "lat", "lng", "geohash"
).show(15, truncate=False)

Added geohash column to restaurant data

Sample data with geohash:
+------------+--------------------+--------------+-------+------+-------+-------+
|id          |franchise_name      |city          |country|lat   |lng    |geohash|
+------------+--------------------+--------------+-------+------+-------+-------+
|197568495625|The Golden Spoon    |Decatur       |US     |34.578|-87.021|dn4h   |
|17179869242 |Azalea Cafe         |Paris         |FR     |48.861|2.368  |u09t   |
|214748364826|The Corner Cafe     |Rapid City    |US     |44.08 |-103.25|9xyd   |
|154618822706|The Pizzeria        |Vienna        |AT     |48.213|16.413 |u2ed   |
|163208757312|Chef's Corner       |London        |GB     |51.495|-0.191 |gcpu   |
|68719476763 |The Spicy Pickle    |Grayling      |US     |44.657|-84.744|dpgw   |
|223338299419|The Spicy Pickle    |Oswego        |US     |43.452|-76.532|dr9x   |
|240518168650|Greenhouse Cafe     |Amsterdam     |NL     |52.37 |4.897  |u173   |
|128849018936|The Yellow Submar

                                                                                

In [17]:
# Verify geohash generation
geohash_stats = restaurant_with_geohash.agg(
    count("*").alias("total_records"),
    count("geohash").alias("records_with_geohash"),
    (count("*") - count("geohash")).alias("records_without_geohash")
)

print("Geohash generation statistics:")
geohash_stats.show()

Geohash generation statistics:
+-------------+--------------------+-----------------------+
|total_records|records_with_geohash|records_without_geohash|
+-------------+--------------------+-----------------------+
|         1997|                1997|                      0|
+-------------+--------------------+-----------------------+



In [18]:
# Show unique geohash count
unique_geohashes = restaurant_with_geohash.select("geohash").distinct().count()
print(f"Unique geohashes in restaurant data: {unique_geohashes}")

# Show top geohashes by count
print("\nTop 10 geohashes by restaurant count:")
restaurant_with_geohash.groupBy("geohash").count().orderBy(col("count").desc()).show(10)

Unique geohashes in restaurant data: 692

Top 10 geohashes by restaurant count:
+-------+-----+
|geohash|count|
+-------+-----+
|   gcpv|  208|
|   u09t|  182|
|   sp3e|  171|
|   u09w|  169|
|   u0nd|  122|
|   u2ed|  116|
|   gcpu|  101|
|   u173|   61|
|   u179|    8|
|   u0n6|    8|
+-------+-----+
only showing top 10 rows


## 8. Read Weather Data

Read the weather data from the extracted parquet files.

**Weather Data Schema:**
- `lng`: Longitude
- `lat`: Latitude
- `avg_tmpr_f`: Average temperature in Fahrenheit
- `avg_tmpr_c`: Average temperature in Celsius
- `wthr_date`: Weather observation date
- `year`, `month`, `day`: Partition columns

In [19]:
# Read weather data from parquet files
print(f"Reading weather data from: {WEATHER_DATA_PATH}")

weather_df = spark.read.parquet(WEATHER_DATA_PATH)

print(f"\nTotal weather records: {weather_df.count():,}")
print("\nWeather data schema:")
weather_df.printSchema()

Reading weather data from: /Users/batyagg/Projects/EPAM_Data_Engineering/Spark_Task/weather_data/weather

Total weather records: 112,394,743

Weather data schema:
root
 |-- lng: double (nullable = true)
 |-- lat: double (nullable = true)
 |-- avg_tmpr_f: double (nullable = true)
 |-- avg_tmpr_c: double (nullable = true)
 |-- wthr_date: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)



In [20]:
# Display sample weather data
print("Sample weather data:")
weather_df.show(10, truncate=False)

Sample weather data:
+--------+-------+----------+----------+----------+----+-----+---+
|lng     |lat    |avg_tmpr_f|avg_tmpr_c|wthr_date |year|month|day|
+--------+-------+----------+----------+----------+----+-----+---+
|-111.09 |18.6251|80.7      |27.1      |2017-08-29|2017|8    |29 |
|-111.042|18.6305|80.7      |27.1      |2017-08-29|2017|8    |29 |
|-110.995|18.6358|80.7      |27.1      |2017-08-29|2017|8    |29 |
|-110.947|18.6412|80.9      |27.2      |2017-08-29|2017|8    |29 |
|-110.9  |18.6465|80.9      |27.2      |2017-08-29|2017|8    |29 |
|-110.852|18.6518|80.9      |27.2      |2017-08-29|2017|8    |29 |
|-110.804|18.6571|80.9      |27.2      |2017-08-29|2017|8    |29 |
|-105.068|19.1765|82.4      |28.0      |2017-08-29|2017|8    |29 |
|-105.02 |19.1799|82.0      |27.8      |2017-08-29|2017|8    |29 |
|-104.972|19.1832|82.0      |27.8      |2017-08-29|2017|8    |29 |
+--------+-------+----------+----------+----------+----+-----+---+
only showing top 10 rows


In [21]:
# Weather data statistics
print("Weather data statistics:")
weather_df.select("lat", "lng", "avg_tmpr_c", "avg_tmpr_f").describe().show()

Weather data statistics:


25/12/22 23:38:12 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-------+------------------+------------------+------------------+-----------------+
|summary|               lat|               lng|        avg_tmpr_c|       avg_tmpr_f|
+-------+------------------+------------------+------------------+-----------------+
|  count|         112394743|         112394743|         112394743|        112394743|
|   mean|  36.0821450186546|-39.06072982875904|15.324629014899868|59.58432871365851|
| stddev|26.116940502760713| 88.74779671660777|10.409133429838565|18.73636861533826|
|    min|          -59.7956|          -179.796|             -43.3|            -46.0|
|    max|           85.5535|             180.0|              44.2|            111.5|
+-------+------------------+------------------+------------------+-----------------+



                                                                                

In [22]:
# Check date range in weather data
print("Weather data date range:")
weather_df.select(
    spark_min("wthr_date").alias("min_date"),
    spark_max("wthr_date").alias("max_date")
).show()

Weather data date range:




+----------+----------+
|  min_date|  max_date|
+----------+----------+
|2016-10-01|2017-09-30|
+----------+----------+



                                                                                

### 8.1 Add Geohash to Weather Data

In [23]:
# Add geohash column to weather data
print("Adding geohash column to weather data...")

weather_with_geohash = weather_df.withColumn(
    "geohash",
    geohash_udf(col("lat"), col("lng"))
)

print("Sample weather data with geohash:")
weather_with_geohash.select("lat", "lng", "geohash", "avg_tmpr_c", "wthr_date").show(10)

Adding geohash column to weather data...
Sample weather data with geohash:
+-------+--------+-------+----------+----------+
|    lat|     lng|geohash|avg_tmpr_c| wthr_date|
+-------+--------+-------+----------+----------+
|18.6251| -111.09|   9e31|      27.1|2017-08-29|
|18.6305|-111.042|   9e31|      27.1|2017-08-29|
|18.6358|-110.995|   9e34|      27.1|2017-08-29|
|18.6412|-110.947|   9e34|      27.2|2017-08-29|
|18.6465|  -110.9|   9e34|      27.2|2017-08-29|
|18.6518|-110.852|   9e34|      27.2|2017-08-29|
|18.6571|-110.804|   9e34|      27.2|2017-08-29|
|19.1765|-105.068|   9emm|      28.0|2017-08-29|
|19.1799| -105.02|   9emm|      27.8|2017-08-29|
|19.1832|-104.972|   9emm|      27.8|2017-08-29|
+-------+--------+-------+----------+----------+
only showing top 10 rows


In [24]:
# Count unique geohashes in weather data
weather_unique_geohashes = weather_with_geohash.select("geohash").distinct().count()
print(f"Unique geohashes in weather data: {weather_unique_geohashes:,}")



Unique geohashes in weather data: 346,655


                                                                                

## 9. Left Join Restaurant and Weather Data

Join the restaurant data with weather data using the 4-character geohash.

**Important considerations:**
- Use LEFT JOIN to keep all restaurant records
- Deduplicate weather data by geohash to avoid data multiplication
- Ensure idempotency (same input → same output)

In [25]:
# Deduplicate weather data by geohash to prevent data multiplication
# Aggregate weather metrics by geohash (average values across all dates and locations with same geohash)

print("Deduplicating weather data by geohash...")

weather_dedupe = (
    weather_with_geohash
    .groupBy("geohash")
    .agg(
        first("lat", ignorenulls=True).alias("weather_lat"),
        first("lng", ignorenulls=True).alias("weather_lng"),
        avg("avg_tmpr_f").alias("weather_avg_tmpr_f"),
        avg("avg_tmpr_c").alias("weather_avg_tmpr_c"),
        first("wthr_date", ignorenulls=True).alias("weather_wthr_date")
    )
)

# Rename geohash to avoid collision
weather_dedupe = weather_dedupe.withColumnRenamed("geohash", "weather_geohash")

print(f"Weather records before deduplication: {weather_with_geohash.count():,}")
print(f"Weather records after deduplication (unique geohashes): {weather_dedupe.count():,}")

Deduplicating weather data by geohash...
Weather records before deduplication: 112,394,743




Weather records after deduplication (unique geohashes): 346,655


                                                                                

In [26]:
# Check for matching geohashes between restaurant and weather data
restaurant_geohashes = set(restaurant_with_geohash.select("geohash").distinct().rdd.flatMap(lambda x: x).collect())
weather_geohashes = set(weather_dedupe.select("weather_geohash").distinct().rdd.flatMap(lambda x: x).collect())

matching_geohashes = restaurant_geohashes.intersection(weather_geohashes)

print(f"Unique geohashes in restaurants: {len(restaurant_geohashes)}")
print(f"Unique geohashes in weather: {len(weather_geohashes)}")
print(f"Matching geohashes: {len(matching_geohashes)}")
print(f"\nSample matching geohashes: {list(matching_geohashes)[:10]}")



Unique geohashes in restaurants: 692
Unique geohashes in weather: 346655
Matching geohashes: 685

Sample matching geohashes: ['9ysv', '9q96', 'c2kx', '9wst', 'w4ru', 'djbb', '9zpp', 'dpdg', '87zc', '9vfg']


                                                                                

In [27]:
# Perform LEFT JOIN
print("Performing left join on geohash...")

enriched_df = restaurant_with_geohash.join(
    weather_dedupe,
    restaurant_with_geohash.geohash == weather_dedupe.weather_geohash,
    "left"
).drop("weather_geohash")

print(f"\nRecords before join: {restaurant_with_geohash.count()}")
print(f"Records after join: {enriched_df.count()}")
print("\n✓ Record count preserved (no data multiplication)")

Performing left join on geohash...

Records before join: 1997
Records after join: 1997

✓ Record count preserved (no data multiplication)


In [28]:
# Display sample enriched data
print("Sample enriched data (restaurant + weather):")
enriched_df.select(
    "id", "franchise_name", "city", "country", "geohash",
    "weather_avg_tmpr_c", "weather_avg_tmpr_f", "weather_wthr_date"
).show(15, truncate=False)

Sample enriched data (restaurant + weather):


                                                                                

+------------+---------------------+----------+-------+-------+------------------+------------------+-----------------+
|id          |franchise_name       |city      |country|geohash|weather_avg_tmpr_c|weather_avg_tmpr_f|weather_wthr_date|
+------------+---------------------+----------+-------+-------+------------------+------------------+-----------------+
|120259084300|The Firehouse        |Branson   |US     |9yt8   |21.164558232931736|70.09377510040161 |2017-08-29       |
|103079215121|The Harvest Room     |Mount Airy|US     |dnqx   |19.375336322869963|66.87460252751735 |2017-08-29       |
|197568495640|The Cozy Cafe        |Oskaloosa |US     |9zq5   |18.437726449275356|65.18727355072461 |2017-09-07       |
|197568495625|The Golden Spoon     |Decatur   |US     |dn4h   |22.17667224080268 |71.91659698996655 |2017-09-07       |
|171798691904|Chef's Corner        |London    |GB     |gcpv   |13.345652173913043|56.02065217391305 |2017-08-13       |
|171798691900|Blue Water Bistro    |Lond

In [29]:
# Check join statistics
matched = enriched_df.filter(col("weather_avg_tmpr_c").isNotNull()).count()
unmatched = enriched_df.filter(col("weather_avg_tmpr_c").isNull()).count()

print("=" * 50)
print("JOIN STATISTICS")
print("=" * 50)
print(f"Total records:        {enriched_df.count():,}")
print(f"Matched with weather: {matched:,} ({100*matched/enriched_df.count():.1f}%)")
print(f"No weather match:     {unmatched:,} ({100*unmatched/enriched_df.count():.1f}%)")
print("=" * 50)

                                                                                

JOIN STATISTICS
Total records:        1,997
Matched with weather: 1,875 (93.9%)
No weather match:     122 (6.1%)


In [30]:
# Show final schema
print("\nFinal enriched data schema:")
enriched_df.printSchema()


Final enriched data schema:
root
 |-- id: long (nullable = true)
 |-- franchise_id: integer (nullable = true)
 |-- franchise_name: string (nullable = true)
 |-- restaurant_franchise_id: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)
 |-- geohash: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lng: double (nullable = true)
 |-- weather_avg_tmpr_f: double (nullable = true)
 |-- weather_avg_tmpr_c: double (nullable = true)
 |-- weather_wthr_date: string (nullable = true)



## 10. Save Enriched Data to Parquet

Store the enriched data in Parquet format with partitioning by country.

In [31]:
# Write enriched data to Parquet with partitioning
print(f"Writing enriched data to: {OUTPUT_PATH}")

(
    enriched_df
    .write
    .mode("overwrite")
    .partitionBy("country")
    .parquet(OUTPUT_PATH)
)

print("\n✓ Data written successfully!")

Writing enriched data to: /Users/batyagg/Projects/EPAM_Data_Engineering/Spark_Task/output/enriched_data


[Stage 135:>                                                      (0 + 10) / 11]


✓ Data written successfully!


                                                                                

In [32]:
# Verify output
print("Output directory structure:")
for item in sorted(os.listdir(OUTPUT_PATH)):
    item_path = os.path.join(OUTPUT_PATH, item)
    if os.path.isdir(item_path):
        files = len([f for f in os.listdir(item_path) if f.endswith('.parquet')])
        print(f"  {item}/ ({files} parquet files)")
    else:
        print(f"  {item}")

Output directory structure:
  ._SUCCESS.crc
  _SUCCESS
  country=AT/ (2 parquet files)
  country=ES/ (1 parquet files)
  country=FR/ (2 parquet files)
  country=GB/ (4 parquet files)
  country=IT/ (3 parquet files)
  country=NL/ (3 parquet files)
  country=US/ (11 parquet files)


In [33]:
# Read back and verify
print("\nVerifying saved data...")
verified_df = spark.read.parquet(OUTPUT_PATH)

print(f"Records read back: {verified_df.count()}")
print(f"Original records: {enriched_df.count()}")
print(f"\n✓ Data integrity verified: {verified_df.count() == enriched_df.count()}")


Verifying saved data...
Records read back: 1997
Original records: 1997

✓ Data integrity verified: True


## 11. Summary and Statistics

In [34]:
# Final summary
print("=" * 60)
print("ETL PIPELINE SUMMARY")
print("=" * 60)
print(f"")
print(f"INPUT:")
print(f"  Restaurant records:     {restaurant_df.count():,}")
print(f"  Weather records:        {weather_df.count():,}")
print(f"")
print(f"DATA QUALITY:")
print(f"  Null coordinates:       {null_count:,}")
print(f"  Invalid coordinates:    {invalid_count:,}")
print(f"")
print(f"GEOHASH:")
print(f"  Precision:              {GEOHASH_PRECISION} characters")
print(f"  Restaurant geohashes:   {unique_geohashes:,}")
print(f"  Weather geohashes:      {weather_unique_geohashes:,}")
print(f"  Matching geohashes:     {len(matching_geohashes):,}")
print(f"")
print(f"JOIN RESULTS:")
print(f"  Total enriched records: {enriched_df.count():,}")
print(f"  With weather data:      {matched:,} ({100*matched/enriched_df.count():.1f}%)")
print(f"  Without weather data:   {unmatched:,}")
print(f"")
print(f"OUTPUT:")
print(f"  Format:                 Parquet (Snappy compression)")
print(f"  Partitioning:           By country")
print(f"  Location:               {OUTPUT_PATH}")
print("=" * 60)

ETL PIPELINE SUMMARY

INPUT:
  Restaurant records:     1,997
  Weather records:        112,394,743

DATA QUALITY:
  Null coordinates:       1
  Invalid coordinates:    0

GEOHASH:
  Precision:              4 characters
  Restaurant geohashes:   692
  Weather geohashes:      346,655
  Matching geohashes:     685

JOIN RESULTS:
  Total enriched records: 1,997
  With weather data:      1,875 (93.9%)
  Without weather data:   122

OUTPUT:
  Format:                 Parquet (Snappy compression)
  Partitioning:           By country
  Location:               /Users/batyagg/Projects/EPAM_Data_Engineering/Spark_Task/output/enriched_data


In [35]:
# Records per country
print("\nRecords by country:")
enriched_df.groupBy("country").count().orderBy(col("count").desc()).show()


Records by country:
+-------+-----+
|country|count|
+-------+-----+
|     US|  835|
|     FR|  347|
|     GB|  317|
|     ES|  171|
|     IT|  130|
|     AT|  121|
|     NL|   76|
+-------+-----+



In [36]:
# Average temperature by country (for matched records)
print("\nAverage temperature by country (matched records only):")
(
    enriched_df
    .filter(col("weather_avg_tmpr_c").isNotNull())
    .groupBy("country")
    .agg(
        count("*").alias("record_count"),
        avg("weather_avg_tmpr_c").alias("avg_temperature_c")
    )
    .orderBy(col("record_count").desc())
    .show()
)


Average temperature by country (matched records only):




+-------+------------+------------------+
|country|record_count| avg_temperature_c|
+-------+------------+------------------+
|     US|         829|19.258809987070315|
|     FR|         347|14.061344129808282|
|     GB|         317|13.353674050198899|
|     ES|         171|21.233152173913012|
|     IT|         130|17.223269230769265|
|     NL|          76|14.080499141876443|
|     AT|           5|15.106521739130434|
+-------+------------+------------------+



                                                                                

## 12. Idempotency Test

Verify that running the ETL multiple times produces the same result.

In [37]:
# Test idempotency - run the write again
print("Testing idempotency...")

# First run already done, read result
result1 = spark.read.parquet(OUTPUT_PATH)
count1 = result1.count()

# Second run
enriched_df.write.mode("overwrite").partitionBy("country").parquet(OUTPUT_PATH)
result2 = spark.read.parquet(OUTPUT_PATH)
count2 = result2.count()

print(f"First run count:  {count1}")
print(f"Second run count: {count2}")
print(f"\n✓ Idempotency verified: {count1 == count2}")

Testing idempotency...


                                                                                

First run count:  1997
Second run count: 1997

✓ Idempotency verified: True


## 13. Cleanup

In [38]:
# Stop Spark session
# Uncomment to stop the session when done
# spark.stop()
# print("Spark session stopped")