In [2]:
import findspark
import sys
import os
import glob
findspark.init()
from pyspark.sql import SparkSession, SQLContext
from opencage.geocoder import OpenCageGeocode
from pyspark.sql.types import StructType, StructField, StringType, FloatType
from pyspark.sql.functions import udf, isnull, col, coalesce, when
from dotenv import load_dotenv
from geohash2 import encode
# Load variables from a local .env file into os.environ; the API_KEY used for
# OpenCage geocoding further down is read from the environment via os.getenv.
load_dotenv()
# PYTHONHASHSEED must be in this process's environment before the Spark JVM
# (and the Python workers it spawns) is launched, so that they inherit it.
# The previous `os.system("export PYTHONHASHSEED=0")` only exported the variable
# inside a throwaway child shell (and exited with status 1), so it never applied.
os.environ["PYTHONHASHSEED"] = "0"


1

In [3]:
# Spark session with 8g configured for both driver and executor memory
spark = (
    SparkSession.builder
    .appName("test")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)
spark.version

'3.4.0'

In [4]:
res_path = "./restaurant_csv/restaurant_csv"
# Only the *.csv files in the folder are read; everything else is ignored
res_files = [
    os.path.join(res_path, name)
    for name in os.listdir(res_path)
    if name.endswith('.csv')
]
# No schema inference: every column, including lat/lng, is read as a string
res_df = spark.read.csv(res_files, header=True)
res_df.show()

+------------+------------+--------------------+-----------------------+-------+--------------+------+--------+
|          id|franchise_id|      franchise_name|restaurant_franchise_id|country|          city|   lat|     lng|
+------------+------------+--------------------+-----------------------+-------+--------------+------+--------+
|197568495625|          10|    The Golden Spoon|                  24784|     US|       Decatur|34.578| -87.021|
| 17179869242|          59|         Azalea Cafe|                  10902|     FR|         Paris|48.861|   2.368|
|214748364826|          27|     The Corner Cafe|                  92040|     US|    Rapid City|44.080|-103.250|
|154618822706|          51|        The Pizzeria|                  41484|     AT|        Vienna|48.213|  16.413|
|163208757312|          65|       Chef's Corner|                  96638|     GB|        London|51.495|  -0.191|
| 68719476763|          28|    The Spicy Pickle|                  77517|     US|      Grayling|44.657| -

In [5]:
# Sanity check: list the restaurants that arrive without lat or lng
res_df.where(col("lat").isNull() | col("lng").isNull()).show()

+-----------+------------+--------------+-----------------------+-------+------+----+----+
|         id|franchise_id|franchise_name|restaurant_franchise_id|country|  city| lat| lng|
+-----------+------------+--------------+-----------------------+-------+------+----+----+
|85899345920|           1|       Savoria|                  18952|     US|Dillon|null|null|
+-----------+------------+--------------+-----------------------+-------+------+----+----+



In [6]:
# Functions that look up lat and lng values for a city via the OpenCage API
key = os.getenv('API_KEY')
geocoder = OpenCageGeocode(key)


def _first_geometry(city):
    """Geocode `city` and return the first match's 'geometry' dict, or None.

    NOTE(review): only the city name is sent, so an ambiguous name resolves to
    whichever match OpenCage ranks first; the row's country is not used.
    TODO confirm whether restricting by country is wanted.
    """
    result = geocoder.geocode(city)
    # `if result` covers both None and an empty list of matches.
    if result:
        return result[0]['geometry']
    return None


def get_lat(city):
    """Return the latitude of the best OpenCage match for `city`, or None."""
    geometry = _first_geometry(city)
    return geometry['lat'] if geometry else None


def get_lng(city):
    """Return the longitude of the best OpenCage match for `city`, or None."""
    geometry = _first_geometry(city)
    return geometry['lng'] if geometry else None


# Spark UDFs with a string return type, matching the string lat/lng columns
# read from the CSV. Note: get_lat_udf and get_lng_udf each issue their own
# geocoding request, so a row missing both coordinates costs two API calls.
get_lat_udf = udf(get_lat, returnType=StringType())
get_lng_udf = udf(get_lng, returnType=StringType())

In [7]:
def _is_missing(column_name):
    """Column expression that is true when `column_name` is null or ''."""
    return isnull(col(column_name)) | (col(column_name) == '')


# Rows whose lat or lng is null or empty are the only ones that need geocoding
null_lat_lng_df = res_df.filter(_is_missing("lat") | _is_missing("lng"))

# Fill each missing coordinate from the city name; existing values are kept.
# Python UDFs are not short-circuited by when(), so for these filtered rows both
# UDFs may run even if only one coordinate is missing.
# .cache() + count() performs the external API calls once, here. Without it,
# Spark's lazy evaluation re-runs the UDFs, and the geocoding requests, on every
# later action over res_df (each show() and the parquet write).
updated_null_lat_lng_df = (
    null_lat_lng_df
    .withColumn("lat", when(_is_missing("lat"), get_lat_udf(col("city"))).otherwise(col("lat")))
    .withColumn("lng", when(_is_missing("lng"), get_lng_udf(col("city"))).otherwise(col("lng")))
    .cache()
)
geocoded_row_count = updated_null_lat_lng_df.count()


In [8]:
# Keep only the rows that already have both coordinates (non-null and non-empty)
has_coords = (
    col("lat").isNotNull() & (col("lat") != '')
    & col("lng").isNotNull() & (col("lng") != '')
)
res_df = res_df.filter(has_coords)

# Append (union by column name, not a join) the geocoded rows back
res_df = res_df.unionByName(updated_null_lat_lng_df)

In [10]:
# Verify that the row that had no coordinates (id 85899345920) has been filled
res_df.filter(res_df["id"] == 85899345920).show()

+-----------+------------+--------------+-----------------------+-------+------+----------+-----------+
|         id|franchise_id|franchise_name|restaurant_franchise_id|country|  city|       lat|        lng|
+-----------+------------+--------------+-----------------------+-------+------+----------+-----------+
|85899345920|           1|       Savoria|                  18952|     US|Dillon|34.4014089|-79.3864339|
+-----------+------------+--------------+-----------------------+-------+------+----------+-----------+



In [11]:
# Geohash helper: encodes lat/lng into a geohash string (precision 4 by default)
def geohash_udf(lat, lng, precision=4):
    """Encode a latitude/longitude pair as a geohash string.

    Despite its name, this is the plain Python function; it is wrapped as a
    Spark UDF separately (``geohash_func``).

    Args:
        lat: latitude as a float or numeric string (the restaurant CSV
            columns are strings).
        lng: longitude, in the same accepted forms as ``lat``.
        precision: number of geohash characters; 4 keeps the original behavior.

    Returns:
        The geohash string, or None when either coordinate is None or blank.
        Returning None (null in Spark) lets a row whose coordinates could not
        be geocoded pass through, instead of float() raising and failing the
        whole Spark task.
    """
    if any(value is None or str(value).strip() == '' for value in (lat, lng)):
        return None
    return encode(float(lat), float(lng), precision=precision)
# Spark UDF wrapping geohash_udf; the encoded geohash is returned as a string
geohash_func = udf(geohash_udf, StringType())

In [12]:
# Add a geohash column to the restaurant dataframe. No cast is applied here:
# lat/lng are still strings from the CSV and geohash_udf converts them itself.
res_df = res_df.withColumn('geohash', geohash_func('lat', 'lng'))

In [14]:
# Show the restaurant dataframe with its new geohash column
res_df.show(20)

+------------+------------+--------------------+-----------------------+-------+--------------+------+--------+-------+
|          id|franchise_id|      franchise_name|restaurant_franchise_id|country|          city|   lat|     lng|geohash|
+------------+------------+--------------------+-----------------------+-------+--------------+------+--------+-------+
|197568495625|          10|    The Golden Spoon|                  24784|     US|       Decatur|34.578| -87.021|   dn4h|
| 17179869242|          59|         Azalea Cafe|                  10902|     FR|         Paris|48.861|   2.368|   u09t|
|214748364826|          27|     The Corner Cafe|                  92040|     US|    Rapid City|44.080|-103.250|   9xyd|
|154618822706|          51|        The Pizzeria|                  41484|     AT|        Vienna|48.213|  16.413|   u2ed|
|163208757312|          65|       Chef's Corner|                  96638|     GB|        London|51.495|  -0.191|   gcpu|
| 68719476763|          28|    The Spicy

In [13]:
# Save the restaurant dataframe to local storage as 5 parquet files, replacing any previous output
res_df.repartition(5).write.mode('overwrite').parquet('./output_data/restaurant')

In [16]:
# Read the saved restaurant parquet files into a new dataframe
restaurant_df = spark.read.format('parquet').load('./output_data/restaurant/*.parquet')

In [44]:
# Set path to weather data
weather_path = './weather_data/'

# Leaf directories three levels below each "weather" folder. Each leaf is passed
# as its own base path, so the directory names are not turned into extra
# (year/month/day) partition columns.
weather_partition_paths = sorted(glob.glob(weather_path + "*/weather/*/*/*"))
if not weather_partition_paths:
    # Previously this surfaced as an opaque IndexError on partitions[0]
    raise FileNotFoundError(f"No weather partitions found under {weather_path!r}")

# One multi-path read instead of reading every directory separately and chaining
# union(). This keeps the query plan flat, and parquet columns are matched by
# name rather than by position.
weather_df = spark.read.parquet(*weather_partition_paths)


In [45]:
# Confirm the weather data was loaded
weather_df.show(n=20)

+--------+-------+----------+----------+----------+
|     lng|    lat|avg_tmpr_f|avg_tmpr_c| wthr_date|
+--------+-------+----------+----------+----------+
|-104.423|21.5478|      72.6|      22.6|2017-08-05|
|-104.374| 21.551|      71.7|      22.1|2017-08-05|
|-104.325|21.5541|      70.8|      21.6|2017-08-05|
|-104.276|21.5573|      70.1|      21.2|2017-08-05|
|-104.227|21.5604|      69.7|      20.9|2017-08-05|
|-104.178|21.5635|      69.3|      20.7|2017-08-05|
|-104.129|21.5665|      68.9|      20.5|2017-08-05|
| -104.08|21.5696|      68.6|      20.3|2017-08-05|
|-104.032|21.5726|      68.3|      20.2|2017-08-05|
|-103.983|21.5757|      70.7|      21.5|2017-08-05|
|-103.934|21.5787|      73.3|      22.9|2017-08-05|
|-103.885|21.5817|      75.3|      24.1|2017-08-05|
|-103.836|21.5846|      76.2|      24.6|2017-08-05|
|-103.787|21.5876|      77.0|      25.0|2017-08-05|
|-103.738|21.5905|      74.6|      23.7|2017-08-05|
|-103.689|21.5935|      70.2|      21.2|2017-08-05|
| -103.64|21

In [47]:
# Add a geohash column to the weather dataframe. lat/lng are cast to double
# (64-bit, the same precision as Python's float used by geohash_udf) rather than
# FloatType, which is 32-bit and would round coordinates before they are encoded.
weather_df = weather_df.withColumn('geohash', geohash_func(col('lat').cast('double'), col('lng').cast('double')))

In [48]:
# Display the weather dataframe with its geohash column
weather_df.show(20, truncate=True)

+--------+-------+----------+----------+----------+-------+
|     lng|    lat|avg_tmpr_f|avg_tmpr_c| wthr_date|geohash|
+--------+-------+----------+----------+----------+-------+
|-104.423|21.5478|      72.6|      22.6|2017-08-05|   9evd|
|-104.374| 21.551|      71.7|      22.1|2017-08-05|   9evf|
|-104.325|21.5541|      70.8|      21.6|2017-08-05|   9evf|
|-104.276|21.5573|      70.1|      21.2|2017-08-05|   9evf|
|-104.227|21.5604|      69.7|      20.9|2017-08-05|   9evf|
|-104.178|21.5635|      69.3|      20.7|2017-08-05|   9evf|
|-104.129|21.5665|      68.9|      20.5|2017-08-05|   9evf|
| -104.08|21.5696|      68.6|      20.3|2017-08-05|   9evf|
|-104.032|21.5726|      68.3|      20.2|2017-08-05|   9ey4|
|-103.983|21.5757|      70.7|      21.5|2017-08-05|   9ey4|
|-103.934|21.5787|      73.3|      22.9|2017-08-05|   9ey4|
|-103.885|21.5817|      75.3|      24.1|2017-08-05|   9ey4|
|-103.836|21.5846|      76.2|      24.6|2017-08-05|   9ey4|
|-103.787|21.5876|      77.0|      25.0|

In [49]:
# Keep a single row per (wthr_date, geohash) so the join below does not multiply rows.
# NOTE(review): which row survives within a group is not deterministic; aggregate
# the temperatures (e.g. average them) instead if reproducible output matters.
weather_df = weather_df.drop_duplicates(subset=['wthr_date', 'geohash'])

In [50]:
# lat/lng are no longer needed on the weather side; the restaurant dataframe keeps its own
weather_df = weather_df.drop("lat").drop("lng")

In [51]:
# Write parquet, reducing the number of output partitions/files to at most 10 with coalesce
weather_df.coalesce(10).write.mode('overwrite').parquet('./output_data/weather')

In [52]:
# Read the saved weather parquet files back
weather_dtf = spark.read.format('parquet').load('./output_data/weather/*.parquet')

In [53]:
# Confirm the saved weather data reads back correctly
weather_dtf.show(n=20)

+----------+----------+----------+-------+
|avg_tmpr_f|avg_tmpr_c| wthr_date|geohash|
+----------+----------+----------+-------+
|      86.3|      30.2|2017-08-05|   d5uh|
|      85.5|      29.7|2017-08-05|   9gcz|
|      87.1|      30.6|2017-08-05|   9s7f|
|      84.5|      29.2|2017-08-05|   dk22|
|      79.5|      26.4|2017-08-05|   9sxf|
|      81.8|      27.7|2017-08-05|   9s8c|
|      77.9|      25.5|2017-08-05|   9tp8|
|      82.8|      28.2|2017-08-05|   9vps|
|      83.8|      28.8|2017-08-05|   9v73|
|      81.2|      27.3|2017-08-05|   djjy|
|      82.6|      28.1|2017-08-05|   9v7f|
|      82.9|      28.3|2017-08-05|   djk6|
|      78.4|      25.8|2017-08-05|   djt0|
|      69.1|      20.6|2017-08-05|   9mms|
|      79.0|      26.1|2017-08-05|   9te4|
|      81.0|      27.2|2017-08-05|   djdj|
|      78.7|      25.9|2017-08-05|   9tem|
|      87.1|      30.6|2017-08-05|   9mx5|
|      77.5|      25.3|2017-08-05|   9tg3|
|      78.5|      25.8|2017-08-05|   djbm|
+----------

In [54]:
# Left join weather_dtf with restaurant_df on geohash: every weather row is kept,
# with the columns of any restaurant sharing its geohash attached
joined_df = weather_dtf.join(restaurant_df, 'geohash', 'left')

In [58]:
# Inspect the joined schema: the geohash key, then the weather columns, then the restaurant columns
joined_df.printSchema()

root
 |-- geohash: string (nullable = true)
 |-- avg_tmpr_f: double (nullable = true)
 |-- avg_tmpr_c: double (nullable = true)
 |-- wthr_date: string (nullable = true)
 |-- id: string (nullable = true)
 |-- franchise_id: string (nullable = true)
 |-- franchise_name: string (nullable = true)
 |-- restaurant_franchise_id: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- lng: string (nullable = true)



In [55]:
# Drop every row containing any null. After the left join this removes the
# weather rows that matched no restaurant.
# NOTE(review): it also discards matched rows where any single field is null;
# dropna(subset=['id']) would drop only the unmatched weather rows. Confirm intent.
filtered_joined_df = joined_df.na.drop()

In [56]:
# Write the enriched data as parquet, one sub-directory per franchise_name
filtered_joined_df.write.mode('overwrite').partitionBy('franchise_name').parquet('./enriched_data')

In [59]:
# Show a sample of the enriched rows
filtered_joined_df.show(20)

+-------+----------+----------+----------+------------+------------+------------------+-----------------------+-------+----------------+------+--------+
|geohash|avg_tmpr_f|avg_tmpr_c| wthr_date|          id|franchise_id|    franchise_name|restaurant_franchise_id|country|            city|   lat|     lng|
+-------+----------+----------+----------+------------+------------+------------------+-----------------------+-------+----------------+------+--------+
|   dpey|      64.3|      17.9|2017-08-05|180388626456|          25|     The Cozy Cafe|                   7531|     US|            Alma|43.377| -84.607|
|   9yuv|      61.9|      16.6|2017-08-05|154618822674|          19|  The Tasting Room|                  41484|     US|     Lees Summit|38.903| -94.357|
|   dn6j|      69.1|      20.6|2017-08-05|  8589934599|           8|   The Green Olive|                  12630|     US|       Nashville|36.079| -86.955|
|   9q92|      69.0|      20.6|2017-08-05|249108103182|          15| The Pearl Kit

In [57]:
# Checking data integrity: all rows for one geohash cell ('dpey') and how many there are
dpey_df = filtered_joined_df.where(col('geohash') == 'dpey')
dpey_df.show()
dpey_df.count()

+-------+----------+----------+----------+------------+------------+--------------+-----------------------+-------+----+------+-------+
|geohash|avg_tmpr_f|avg_tmpr_c| wthr_date|          id|franchise_id|franchise_name|restaurant_franchise_id|country|city|   lat|    lng|
+-------+----------+----------+----------+------------+------------+--------------+-----------------------+-------+----+------+-------+
|   dpey|      64.3|      17.9|2017-08-05|180388626456|          25| The Cozy Cafe|                   7531|     US|Alma|43.377|-84.607|
|   dpey|      66.2|      19.0|2017-08-13|180388626456|          25| The Cozy Cafe|                   7531|     US|Alma|43.377|-84.607|
|   dpey|      69.3|      20.7|2017-08-19|180388626456|          25| The Cozy Cafe|                   7531|     US|Alma|43.377|-84.607|
|   dpey|      62.0|      16.7|2017-08-28|180388626456|          25| The Cozy Cafe|                   7531|     US|Alma|43.377|-84.607|
|   dpey|      65.3|      18.5|2016-10-06|180388

92