In [1]:
# Import the necessary libraries
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, when, isnull, broadcast
from pyspark.sql.types import FloatType, StringType
import os
import pygeohash as pgh

In [2]:
# Build (or reuse) the Spark session used by every cell below
spark = (
    SparkSession.builder
    .appName("ETLJob")
    .config("spark.executor.memory", "8g")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

In [3]:
# Directory holding the restaurant CSV files; the relative path is resolved
# against the notebook's working directory.
input_path = "restaurant_csv/"

In [4]:
# Create a list of CSV files from the given restaurant_csv directory.
# os.listdir returns entries in arbitrary order, so the paths are sorted to keep
# the load order (and therefore the notebook output) reproducible between runs.
csv_files = sorted(
    os.path.join(input_path, name)
    for name in os.listdir(input_path)
    if name.endswith('.csv')
)

# Fail early with a clear message instead of handing Spark an empty path list.
if not csv_files:
    raise FileNotFoundError(f"No .csv files found in {input_path!r}")

In [5]:
# Load every restaurant CSV into one DataFrame; the first line of each file is the header
csv_reader = spark.read.option("header", "true")
restaurant_df = csv_reader.csv(csv_files)

In [6]:
# Sanity check: number of restaurant rows loaded from the CSV files
restaurant_df.count()

1997

In [7]:
# Show the restaurants that have no latitude
restaurant_df.filter(isnull("lat")).show()

+-----------+------------+--------------+-----------------------+-------+------+----+----+
|         id|franchise_id|franchise_name|restaurant_franchise_id|country|  city| lat| lng|
+-----------+------------+--------------+-----------------------+-------+------+----+----+
|85899345920|           1|       Savoria|                  18952|     US|Dillon|null|null|
+-----------+------------+--------------+-----------------------+-------+------+----+----+



In [8]:
# Define the OpenCage Geocoding API endpoint and API key.
# The key is read from the OPENCAGE_API_KEY environment variable so a real
# credential never has to be saved in the notebook; the placeholder is kept as a
# fallback so the cell still runs unchanged when the variable is not set.
API_ENDPOINT = "https://api.opencagedata.com/geocode/v1/json"
API_KEY = os.environ.get("OPENCAGE_API_KEY", "YOUR_API_KEY")

# Filter the dataframe to include only rows with null values in the latitude or longitude columns
missing_coords = isnull("lat") | isnull("lng")
null_lat_lon_df = restaurant_df.filter(missing_coords)

# Geocode each distinct city once (the city name is the only address information used),
# instead of issuing one API call per affected row.
cities_to_geocode = [r["city"] for r in null_lat_lon_df.select("city").distinct().collect()]

for address in cities_to_geocode:
    # A missing or empty city cannot be geocoded
    if not address:
        continue

    # Build the query parameters for the API call
    params = {
        "q": address,
        "key": API_KEY
    }

    # Make the API call; the timeout keeps a stalled connection from hanging the notebook
    try:
        response = requests.get(API_ENDPOINT, params=params, timeout=10)
        data = response.json() if response.ok else None
    except (requests.RequestException, ValueError) as exc:
        print(f"Geocoding request for {address!r} failed: {exc}")
        continue

    if data is None:
        print(f"Geocoding request for {address!r} returned HTTP {response.status_code}")
        continue

    # An empty result list means the API found no match; leave the rows untouched
    results = data.get("results") or []
    if not results:
        print(f"No geocoding result for {address!r}")
        continue

    # Extract the latitude and longitude from the first API result
    geometry = results[0]["geometry"]
    lat = geometry["lat"]
    lon = geometry["lng"]

    # Update only the rows of this city that are actually missing a coordinate, so
    # restaurants in the same city that already have valid coordinates keep them.
    # Both columns are rewritten in a single select so the null check is evaluated
    # against the original values for lat and lng alike.
    needs_fix = (col("city") == address) & missing_coords
    replacements = {"lat": lat, "lng": lon}
    restaurant_df = restaurant_df.select(
        *[
            when(needs_fix, replacements[name]).otherwise(col(name)).alias(name)
            if name in replacements
            else col(name)
            for name in restaurant_df.columns
        ]
    )

In [9]:
# Check the previously incomplete record (id 85899345920) after geocoding
restaurant_df.filter(restaurant_df["id"] == 85899345920).show()

+-----------+------------+--------------+-----------------------+-------+------+----------+-----------+
|         id|franchise_id|franchise_name|restaurant_franchise_id|country|  city|       lat|        lng|
+-----------+------------+--------------+-----------------------+-------+------+----------+-----------+
|85899345920|           1|       Savoria|                  18952|     US|Dillon|34.4014089|-79.3864339|
+-----------+------------+--------------+-----------------------+-------+------+----------+-----------+



In [10]:
# Define a UDF to generate a geohash from latitude and longitude
@udf(returnType=StringType())
def geohash_udf(lat, lng):
    """Encode a latitude/longitude pair as a geohash string.

    Args:
        lat: Latitude, as a number or a numeric string.
        lng: Longitude, as a number or a numeric string.

    Returns:
        The geohash produced by ``pgh.encode``, or ``None`` when either
        coordinate is missing or cannot be converted to a float.
    """
    # Null coordinates (e.g. rows that could not be geocoded) must yield a null
    # geohash; float(None) would raise and fail the whole Spark job.
    if lat is None or lng is None:
        return None

    try:
        lat_value = float(lat)
        lng_value = float(lng)
    except (TypeError, ValueError):
        # Non-numeric text in a coordinate column: treat it as missing
        return None

    return pgh.encode(lat_value, lng_value)


In [11]:
# Add a geohash column to the restaurant data using the UDF defined earlier
restaurant_df = restaurant_df.withColumn(
    "geohash",
    geohash_udf(col("lat"), col("lng")),
)

In [15]:
# Preview the restaurant rows, now including the geohash column
restaurant_df.show()

+------------+------------+--------------------+-----------------------+-------+--------------+------+--------+------------+
|          id|franchise_id|      franchise_name|restaurant_franchise_id|country|          city|   lat|     lng|     geohash|
+------------+------------+--------------------+-----------------------+-------+--------------+------+--------+------------+
|197568495625|          10|    The Golden Spoon|                  24784|     US|       Decatur|34.578| -87.021|dn4hey9prwrk|
| 17179869242|          59|         Azalea Cafe|                  10902|     FR|         Paris|48.861|   2.368|u09tvyu7fqsf|
|214748364826|          27|     The Corner Cafe|                  92040|     US|    Rapid City|44.080|-103.250|9xydf2x8seef|
|154618822706|          51|        The Pizzeria|                  41484|     AT|        Vienna|48.213|  16.413|u2edm2zt4deg|
|163208757312|          65|       Chef's Corner|                  96638|     GB|        London|51.495|  -0.191|gcpugwkuj7u0|


In [16]:
# Load the weather dataset (parquet) from its directory
path = 'weather_dataset'
weather_df = spark.read.format("parquet").load(path)

In [17]:
# Preview the raw weather records
weather_df.show()

+--------+-------+----------+----------+----------+----+-----+---+
|     lng|    lat|avg_tmpr_f|avg_tmpr_c| wthr_date|year|month|day|
+--------+-------+----------+----------+----------+----+-----+---+
| -111.09|18.6251|      80.7|      27.1|2017-08-29|2017|    8| 29|
|-111.042|18.6305|      80.7|      27.1|2017-08-29|2017|    8| 29|
|-110.995|18.6358|      80.7|      27.1|2017-08-29|2017|    8| 29|
|-110.947|18.6412|      80.9|      27.2|2017-08-29|2017|    8| 29|
|  -110.9|18.6465|      80.9|      27.2|2017-08-29|2017|    8| 29|
|-110.852|18.6518|      80.9|      27.2|2017-08-29|2017|    8| 29|
|-110.804|18.6571|      80.9|      27.2|2017-08-29|2017|    8| 29|
|-105.068|19.1765|      82.4|      28.0|2017-08-29|2017|    8| 29|
| -105.02|19.1799|      82.0|      27.8|2017-08-29|2017|    8| 29|
|-104.972|19.1832|      82.0|      27.8|2017-08-29|2017|    8| 29|
|-104.924|19.1866|      82.0|      27.8|2017-08-29|2017|    8| 29|
|-104.876|19.1899|      82.0|      27.8|2017-08-29|2017|    8|

In [19]:
# Add a geohash column to the weather data using the UDF defined earlier
weather_df = weather_df.withColumn(
    "geohash",
    geohash_udf(col("lat"), col("lng")),
)

In [20]:
# Preview the weather records with the new geohash column
weather_df.show()

+--------+-------+----------+----------+----------+----+-----+---+------------+
|     lng|    lat|avg_tmpr_f|avg_tmpr_c| wthr_date|year|month|day|     geohash|
+--------+-------+----------+----------+----------+----+-----+---+------------+
| -111.09|18.6251|      80.7|      27.1|2017-08-29|2017|    8| 29|9e31bndep6k1|
|-111.042|18.6305|      80.7|      27.1|2017-08-29|2017|    8| 29|9e31cptdtbgt|
|-110.995|18.6358|      80.7|      27.1|2017-08-29|2017|    8| 29|9e3440xcmhcm|
|-110.947|18.6412|      80.9|      27.2|2017-08-29|2017|    8| 29|9e3453dbge89|
|  -110.9|18.6465|      80.9|      27.2|2017-08-29|2017|    8| 29|9e34h6mpdxw3|
|-110.852|18.6518|      80.9|      27.2|2017-08-29|2017|    8| 29|9e34je2n95tb|
|-110.804|18.6571|      80.9|      27.2|2017-08-29|2017|    8| 29|9e34ns6vrxt2|
|-105.068|19.1765|      82.4|      28.0|2017-08-29|2017|    8| 29|9emm14gz4nsy|
| -105.02|19.1799|      82.0|      27.8|2017-08-29|2017|    8| 29|9emm45wg08s2|
|-104.972|19.1832|      82.0|      27.8|

In [21]:
# Number of leading geohash characters used as the join key between the datasets
GEOHASH_PREFIX_LEN = 4

# Column.substr takes a 1-based start position, so the prefix starts at 1.
# (A start of 0 is treated as the first character as well, so the result is unchanged.)

# add new column to restaurant_df with the first four characters of geohash
restaurant_df = restaurant_df.withColumn(
    "geohash_4", restaurant_df.geohash.substr(1, GEOHASH_PREFIX_LEN)
)

# add new column to weather_df with the first four characters of geohash
weather_df = weather_df.withColumn(
    "geohash_4", weather_df.geohash.substr(1, GEOHASH_PREFIX_LEN)
)

In [22]:
# Only geohash_4 is needed from here on, so remove the coordinate and
# full-geohash columns from both DataFrames
geo_helper_cols = ("lat", "lng", "geohash")

restaurant_df = restaurant_df.drop(*geo_helper_cols)
weather_df = weather_df.drop(*geo_helper_cols)

In [23]:
# Keep a single weather record per (geohash_4, wthr_date) in weather_df so the
# join below does not multiply restaurant rows for the same location and date.
# NOTE(review): this does not control which of the duplicate records is kept —
# confirm that is acceptable, or aggregate the temperatures instead.
weather_key = ["geohash_4", "wthr_date"]
weather_df = weather_df.dropDuplicates(subset=weather_key)

In [24]:
# Left join: keep every restaurant and attach the weather rows that share its geohash_4
df_joined = restaurant_df.join(weather_df, on="geohash_4", how="left")

In [31]:
# Write the enriched data as parquet partitioned by year/month/day,
# replacing any existing output in "enriched_data"
enriched_writer = df_joined.write.partitionBy("year", "month", "day")
enriched_writer.mode("overwrite").parquet("enriched_data")

In [32]:
# Read the written parquet output back to verify it
parquet_df = spark.read.format("parquet").load("enriched_data")

In [33]:
# Preview the enriched data read back from parquet
parquet_df.show()

+---------+------------+------------+------------------+-----------------------+-------+-----------------+----------+----------+----------+----+-----+---+
|geohash_4|          id|franchise_id|    franchise_name|restaurant_franchise_id|country|             city|avg_tmpr_f|avg_tmpr_c| wthr_date|year|month|day|
+---------+------------+------------+------------------+-----------------------+-------+-----------------+----------+----------+----------+----+-----+---+
|     9fcp| 51539607552|           1|           Savoria|                   6934|     US|          Whigham|      82.6|      28.1|2016-10-05|2016|   10|  5|
|     9pzv| 25769803813|          38|         Cafe Viva|                  72230|     US|          Newport|      50.5|      10.3|2016-10-05|2016|   10|  5|
|     9q5f| 25769803781|           6|   The Daily Grind|                  72230|     US|      Studio City|      64.0|      17.8|2016-10-05|2016|   10|  5|
|     9q9x| 42949672973|          14|The Gourmet Garden|              