In [32]:
pip install geohash2

Collecting geohash2
  Downloading geohash2-1.1.tar.gz (15 kB)
  Preparing metadata (setup.py) ... [?25ldone
Building wheels for collected packages: geohash2
  Building wheel for geohash2 (setup.py) ... [?25ldone
[?25h  Created wheel for geohash2: filename=geohash2-1.1-py3-none-any.whl size=15543 sha256=b14b381a351207a859d61325d15ca6e0e08301a967b79cf388ef5f37a7c04042
  Stored in directory: /home/jovyan/.cache/pip/wheels/f6/7c/c4/1b3c6fea0ebc53bf730dc86bbee7a713d501455dfb4c1f0623
Successfully built geohash2
Installing collected packages: geohash2
Successfully installed geohash2-1.1
Note: you may need to restart the kernel to use updated packages.


In [74]:
import os
import time
import requests
import geohash2
from pyspark.sql import functions as func

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, udf, avg, first
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

## Handling missing coordinates and computing geohashes in the restaurant data
Locations of the input and output files:
```python
input_file = "./in_data/restaurant_csv/"  
output_file_coord = "./out_data/restaurant_with_coordinates_csv"  
output_file_geohash = "./out_data/restaurant_with_geohash_csv"
```
**process_restaurant_data** - main function for processing the restaurant data

**get_coordinates_udf** - user-defined function (UDF) that fetches missing coordinates from the OpenCage geocoding REST API

**generate_geohash** - user-defined function (UDF) that computes a four-character geohash from latitude and longitude

In [43]:
# Check and update restaurant data
def process_restaurant_data(input_file, output_file_coord, output_file_geohash, spark):
    """Fill in missing restaurant coordinates and add a geohash column.

    Reads every ``*.csv`` file in ``input_file`` with a fixed schema. For rows
    where ``lat`` or ``lng`` is null, it looks up coordinates with
    ``get_coordinates_udf``. It writes the completed rows to
    ``output_file_coord``, then adds a ``geohash`` column computed by
    ``generate_geohash`` and writes that result to ``output_file_geohash``.
    Both outputs are CSV with a header row. Existing output directories are
    overwritten, so the cell can be re-run.

    Args:
        input_file: Directory that contains the input restaurant CSV files.
        output_file_coord: Output directory for restaurants with coordinates.
        output_file_geohash: Output directory for restaurants with geohash.
        spark: Active ``SparkSession`` used to read the input.
    """
    # Load restaurant data into DataFrame
    schema = StructType([
        StructField("id", StringType(), True),
        StructField("franchise_id", StringType(), True),
        StructField("franchise_name", StringType(), True),
        StructField("restaurant_franchise_id", StringType(), True),
        StructField("country", StringType(), True),
        StructField("city", StringType(), True),
        StructField("lat", DoubleType(), True),
        StructField("lng", DoubleType(), True)
    ])

    restaurants_df = (
        spark.read.option("header", "true")
        .schema(schema)
        .csv(os.path.join(input_file, "*.csv"))
    )

    print("Schema: ")
    restaurants_df.printSchema()

    print("Restaurant data:")
    restaurants_df.show()

    print(f"Data with {restaurants_df.count()} rows.")

    # Split into rows that already have both coordinates and rows missing either one.
    df_existing_coords = restaurants_df.filter((col('lat').isNotNull()) & (col('lng').isNotNull()))
    df_missing_coords = restaurants_df.filter((col('lat').isNull()) | (col('lng').isNull()))

    print("Data with correct values: ")
    df_existing_coords.show()
    print(f"Found {df_existing_coords.count()} rows with correct latitude and longitude.")

    print("Data with incorrect values: ")
    df_missing_coords.show()
    print(f"Found {df_missing_coords.count()} rows with missing latitude or longitude.")

    # Geocode the incomplete rows. The UDF signature is (city, country), so the
    # columns must be passed in that order.
    # The result is cached because the UDF makes HTTP requests. Without the
    # cache, every later action (show/count/write) would re-run it and
    # re-query the API.
    df_missing_coords = (
        df_missing_coords
        .withColumn('coordinates', get_coordinates_udf(col('city'), col('country')))
        .withColumn('lat', col('coordinates.lat'))
        .withColumn('lng', col('coordinates.lng'))
        .drop('coordinates')
        .cache()
    )
    df_missing_coords.show()

    # Recombine the complete rows and the geocoded rows (columns matched by name).
    df_final_coords = df_existing_coords.unionByName(df_missing_coords)
    df_final_coords.show()
    print(f"Final DataFrame with coordinates: {df_final_coords.count()}.")

    # The default save mode fails when the directory already exists.
    # "overwrite" keeps the cell re-runnable.
    df_final_coords.write.mode("overwrite").csv(output_file_coord, header=True)

    df_final_geohash = df_final_coords.withColumn("geohash", generate_geohash(col("lat"), col("lng")))
    df_final_geohash.show()
    print(f"Final DataFrame with geohash: {df_final_geohash.count()}.")

    df_final_geohash.write.mode("overwrite").csv(output_file_geohash, header=True)

    # Release the cached geocoded rows now that both outputs are written.
    df_missing_coords.unpersist()
    
# udf to fetch coordinates for missing values
@udf(returnType=StructType([
    StructField("lat", DoubleType(), True),
    StructField("lng", DoubleType(), True)
]))
def get_coordinates_udf(city, country, api_key=os.environ.get("OPENCAGE_API_KEY")):
    """Geocode a ``city, country`` pair with the OpenCage REST API.

    Returns ``(lat, lng)`` from the first result. Returns ``(None, None)``
    when any of the following is true:

    - an input is empty;
    - no API key is configured;
    - the API returns no results;
    - the request or response parsing fails.

    Failures are printed rather than raised, so one bad row does not fail the
    Spark job.

    Args:
        city: City name.
        country: Country name or code.
        api_key: OpenCage API key. The default is read from the
            ``OPENCAGE_API_KEY`` environment variable when this cell runs on
            the driver. It is captured with the function when it is
            serialized, so the key no longer has to be hard-coded here.
    """
    if not city or not country or not api_key:
        return None, None

    try:
        # Let requests build and URL-encode the query string. City names can
        # contain spaces or characters such as "&" that break a hand-built URL.
        # The timeout stops a stalled connection from hanging the Spark task.
        response = requests.get(
            "https://api.opencagedata.com/geocode/v1/json",
            params={"q": f"{city},{country}", "key": api_key},
            timeout=10,
        )
        response.raise_for_status()
        results = response.json().get("results")

        # Check if there are valid results in the response
        if results:
            geometry = results[0]["geometry"]
            return geometry["lat"], geometry["lng"]
        return None, None

    except Exception as e:
        # Deliberate best effort: report the failure and leave the row without coordinates.
        print(f"Error occurred for {city}, {country}: {e}")
        return None, None

# udf to generate a geohash (4 characters long)
@udf(returnType=StringType())
def generate_geohash(lat, lng):
    """Encode a latitude/longitude pair as a 4-character geohash.

    Returns None when either coordinate is null, so rows without
    coordinates get a null geohash instead of failing the job.
    """
    if lat is None or lng is None:
        return None
    return geohash2.encode(lat, lng, precision=4)
    
if __name__ == "__main__":
    # Inside a notebook, __name__ is "__main__", so this block always runs.
    input_file = "./in_data/restaurant_csv/"
    output_file_coord = "./out_data/restaurant_with_coordinates_csv"
    output_file_geohash = "./out_data/restaurant_with_geohash_csv"

    # Reuse the active Spark session if one exists; otherwise start one.
    spark = SparkSession.builder.appName("GeocodeMissingLatLon").getOrCreate()

    process_restaurant_data(
        input_file,
        output_file_coord,
        output_file_geohash,
        spark,
    )

Schema: 
root
 |-- id: string (nullable = true)
 |-- franchise_id: string (nullable = true)
 |-- franchise_name: string (nullable = true)
 |-- restaurant_franchise_id: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)

Restaurant data:
+------------+------------+--------------------+-----------------------+-------+--------------+------+-------+
|          id|franchise_id|      franchise_name|restaurant_franchise_id|country|          city|   lat|    lng|
+------------+------------+--------------------+-----------------------+-------+--------------+------+-------+
|197568495625|          10|    The Golden Spoon|                  24784|     US|       Decatur|34.578|-87.021|
| 17179869242|          59|         Azalea Cafe|                  10902|     FR|         Paris|48.861|  2.368|
|214748364826|          27|     The Corner Cafe|                  92040|     US|    Rapid

## Processing the weather dataset
* Load the weather data for October 2016
* Compute a geohash for each weather record

In [77]:

# Data for October 2016.
# The export groups days into folders of four consecutive days
# ("1_October_1-4", "2_October_5-8", ..., "8_October_29-31"; the last
# folder holds only three days). Build one day-partition path per day
# instead of listing all 31 paths by hand.
directories = [
    "./in_data/{group}_October_{first}-{last}/weather/year=2016/month=10/day={day:02d}".format(
        group=(day - 1) // 4 + 1,
        first=(day - 1) // 4 * 4 + 1,
        last=min((day - 1) // 4 * 4 + 4, 31),
        day=day,
    )
    for day in range(1, 32)
]

# Get the Spark session (getOrCreate returns the existing one if present).
spark = (
    SparkSession.builder
    .appName("ReadWeatherParquet")
    .getOrCreate()
)

# Read every October 2016 day directory into a single DataFrame.
weather_october_df = spark.read.parquet(*directories)

In [78]:
# Preview the first 20 rows (long values are truncated).
weather_october_df.show(n=20, truncate=True)

+--------+-------+----------+----------+----------+
|     lng|    lat|avg_tmpr_f|avg_tmpr_c| wthr_date|
+--------+-------+----------+----------+----------+
|-111.202|18.7496|      82.7|      28.2|2016-10-12|
|-111.155| 18.755|      82.7|      28.2|2016-10-12|
|-111.107|18.7604|      82.7|      28.2|2016-10-12|
|-111.059|18.7657|      82.5|      28.1|2016-10-12|
|-111.012|18.7711|      82.5|      28.1|2016-10-12|
|-110.964|18.7764|      82.4|      28.0|2016-10-12|
|-110.916|18.7818|      82.3|      27.9|2016-10-12|
|-110.869|18.7871|      82.4|      28.0|2016-10-12|
|-110.821|18.7924|      82.6|      28.1|2016-10-12|
|-110.773|18.7977|      82.8|      28.2|2016-10-12|
|-110.726|18.8029|      82.8|      28.2|2016-10-12|
|-105.221|19.3026|      83.6|      28.7|2016-10-12|
|-105.173| 19.306|      83.6|      28.7|2016-10-12|
|-105.125|19.3094|      83.6|      28.7|2016-10-12|
|-105.077|19.3128|      83.7|      28.7|2016-10-12|
|-105.029|19.3162|      83.8|      28.8|2016-10-12|
| -104.98|19

In [87]:
# Count the weather rows once. Adding a column cannot change the row count,
# so the same number is reused below instead of running a second full scan
# over the ~37M October rows.
weather_row_count = weather_october_df.count()
print(f"Final DataFrame count: {weather_row_count}.")

# Generate geohash for weather
weather_october_geohash = weather_october_df.withColumn("geohash", generate_geohash(col("lat"), col("lng")))
weather_october_geohash.show()
print(f"Final DataFrame with geohash: {weather_row_count}.")

Final DataFrame count: 37333145.
+--------+-------+----------+----------+----------+-------+
|     lng|    lat|avg_tmpr_f|avg_tmpr_c| wthr_date|geohash|
+--------+-------+----------+----------+----------+-------+
|-111.202|18.7496|      82.7|      28.2|2016-10-12|   9e2f|
|-111.155| 18.755|      82.7|      28.2|2016-10-12|   9e2f|
|-111.107|18.7604|      82.7|      28.2|2016-10-12|   9e2f|
|-111.059|18.7657|      82.5|      28.1|2016-10-12|   9e34|
|-111.012|18.7711|      82.5|      28.1|2016-10-12|   9e34|
|-110.964|18.7764|      82.4|      28.0|2016-10-12|   9e34|
|-110.916|18.7818|      82.3|      27.9|2016-10-12|   9e34|
|-110.869|18.7871|      82.4|      28.0|2016-10-12|   9e34|
|-110.821|18.7924|      82.6|      28.1|2016-10-12|   9e34|
|-110.773|18.7977|      82.8|      28.2|2016-10-12|   9e34|
|-110.726|18.8029|      82.8|      28.2|2016-10-12|   9e36|
|-105.221|19.3026|      83.6|      28.7|2016-10-12|   9emj|
|-105.173| 19.306|      83.6|      28.7|2016-10-12|   9emj|
|-105.1

In [95]:
# Rename the weather coordinates so they do not clash with the restaurant
# lat/lng columns after the join.
weather_geohash_renamed = (
    weather_october_geohash
    .withColumnRenamed("lat", "weather_lat")
    .withColumnRenamed("lng", "weather_lng")
)

## Left-join weather and restaurant data using the four-character geohash

In [80]:
# Schema of the restaurant CSV that carries geohashes: six string columns,
# the two coordinate columns, and the geohash string.
schema_restaurant = StructType(
    [
        StructField(name, StringType(), True)
        for name in ("id", "franchise_id", "franchise_name", "restaurant_franchise_id", "country", "city")
    ]
    + [StructField(name, DoubleType(), True) for name in ("lat", "lng")]
    + [StructField("geohash", StringType(), True)]
)

restaurant_df_with_geohash = (
    spark.read
    .schema(schema_restaurant)
    .option("header", "true")
    .csv("./out_data/restaurant_with_geohash_csv")
)

In [89]:
# Left join on geohash: every restaurant is kept, even when no weather
# point shares its geohash (its weather columns are then null).
joined_df = restaurant_df_with_geohash.join(
    other=weather_geohash_renamed,
    on=["geohash"],
    how="left",
)

## Deduplicate the joined data, keeping the row with the highest avg_tmpr_f

In [90]:
# Deduplicate the joined data: for each restaurant, keep the joined weather
# row with the highest avg_tmpr_f.
# Partition by restaurant id, not by geohash. Several restaurants can share a
# 4-character geohash, and ranking per geohash kept only one of them.
# NOTE(review): add "wthr_date" to partitionBy if one row per restaurant per
# day is wanted instead.
# The extra sort keys break avg_tmpr_f ties deterministically, so re-runs pick
# the same row; row_number alone is arbitrary among ties. Descending order
# places nulls last, so a restaurant with no weather match still keeps its row.
window_spec = Window.partitionBy("id").orderBy(
    func.col("avg_tmpr_f").desc(),
    func.col("wthr_date").desc(),
    func.col("weather_lat").asc(),
    func.col("weather_lng").asc(),
)

# Use a new name instead of overwriting joined_df, so re-running the cell is safe.
ranked_df = joined_df.withColumn("row_num", func.row_number().over(window_spec))

# Filter the row with the highest temperature
deduplicated_df = ranked_df.filter(func.col("row_num") == 1).drop("row_num")

In [91]:
#Aggregation, select first values for other fields
deduplicated_df = deduplicated_df.groupBy(
    "id", "franchise_id", "franchise_name", "restaurant_franchise_id", "country", "city", 
    "lat", "lng", "geohash"
).agg(
    func.first("avg_tmpr_f").alias("avg_tmpr_f"),
    func.first("avg_tmpr_c").alias("avg_tmpr_c"),
    func.first("wthr_date").alias("wthr_date"),
    func.first("weather_lat").alias("weather_lat"),
    func.first("weather_lng").alias("weather_lng")
)

## Write the final data to Parquet, partitioned by wthr_date

In [93]:
# Write the final data as Parquet, one sub-directory per wthr_date value.
# "overwrite" replaces the output of any previous run, so the cell can be re-run.
output_final_files = "./out_data/final_data_parquet"
(
    deduplicated_df.write
    .mode("overwrite")
    .partitionBy("wthr_date")
    .parquet(output_final_files)
)