In [1]:
# The code imports the necessary modules to work with PySpark:
"""
SparkSession - to create a Spark session.
reduce - to combine multiple DataFrames.
DataFrame - for working with data in the form of tables.
col - for accessing columns in PySpark expressions.
"""
from pyspark.sql import SparkSession
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql.functions import col

In [2]:
# Create the Spark session used by the rest of the notebook.
#
# appName("Spark_ETL")
#     Sets the Spark application name to "Spark_ETL".
#
# config("spark.sql.shuffle.partitions", 200)
#     Number of partitions used for shuffle operations (e.g. grouping, joining).
#     The key must be spelled exactly like this: Spark does not validate unknown
#     keys, so a misspelled one is silently ignored.
#
# NOTE(review): with master("local[*]") the work presumably runs inside the
# driver process, so the spark.executor.* sizing below likely has no effect -
# confirm before relying on it.
spark = (
    SparkSession.builder
    .appName("Spark_ETL")
    .master("local[*]")
    .config("spark.io.compression.codec", "zstd")
    .config("spark.sql.execution.pythonUDF.arrow.enabled", "true")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.sql.shuffle.partitions", 200)
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "8g")
    .config("spark.executor.instances", "4")
    .config("spark.executor.cores", "5")
    .config("spark.sql.files.maxRecordsPerFile", 10_000)
    .config("spark.debug.maxToStringFields", 1000)
    .config("spark.executor.heartbeatInterval", "3600s")
    .config("spark.network.timeout", "7200s")
    .config("spark.network.timeoutInterval", "3600s")
    .enableHiveSupport()
    .getOrCreate()
)

# Handle to the underlying SparkContext of the session.
sc = spark.sparkContext

In [3]:
# Counts: 00000 = 400, 00001 = 397, 00002 = 400, 00003 = 400, 00004 = 400 (1997 rows in total)
# Create a file_paths list containing the paths to five CSV files in the restaurant_datasets/restaurant_csv/ directory
# The part files are listed explicitly instead of reading all the CSV files in one go: reading everything at once took a long time to load, so this approach is used as an optimization.

# One path per CSV part file (part-00000 ... part-00004); only the part number varies.
file_paths = [
    f'restaurant_datasets/restaurant_csv/part-0000{part_no}-c8acc470-919e-4ea9-b274-11488238c85e-c000.csv'
    for part_no in range(5)
]
file_paths

['restaurant_datasets/restaurant_csv/part-00000-c8acc470-919e-4ea9-b274-11488238c85e-c000.csv',
 'restaurant_datasets/restaurant_csv/part-00001-c8acc470-919e-4ea9-b274-11488238c85e-c000.csv',
 'restaurant_datasets/restaurant_csv/part-00002-c8acc470-919e-4ea9-b274-11488238c85e-c000.csv',
 'restaurant_datasets/restaurant_csv/part-00003-c8acc470-919e-4ea9-b274-11488238c85e-c000.csv',
 'restaurant_datasets/restaurant_csv/part-00004-c8acc470-919e-4ea9-b274-11488238c85e-c000.csv']

In [4]:
# Load each CSV part file into its own DataFrame (first line is the header,
# column types are inferred). The frames are only collected in a list here;
# they are merged in a later cell.
csv_options = dict(header=True, inferSchema=True)
dfs = [spark.read.csv(csv_path, **csv_options) for csv_path in file_paths]
dfs

[DataFrame[id: bigint, franchise_id: int, franchise_name: string, restaurant_franchise_id: int, country: string, city: string, lat: double, lng: double],
 DataFrame[id: bigint, franchise_id: int, franchise_name: string, restaurant_franchise_id: int, country: string, city: string, lat: double, lng: double],
 DataFrame[id: bigint, franchise_id: int, franchise_name: string, restaurant_franchise_id: int, country: string, city: string, lat: double, lng: double],
 DataFrame[id: bigint, franchise_id: int, franchise_name: string, restaurant_franchise_id: int, country: string, city: string, lat: double, lng: double],
 DataFrame[id: bigint, franchise_id: int, franchise_name: string, restaurant_franchise_id: int, country: string, city: string, lat: double, lng: double]]

In [5]:
# Merge the per-file DataFrames into a single one.
# reduce folds the list left to right: ((df0 ∪ df1) ∪ df2) ∪ ...
combined_df = reduce(lambda merged, part: merged.union(part), dfs)
combined_df.show()

+------------+------------+--------------------+-----------------------+-------+------------------+------+--------+
|          id|franchise_id|      franchise_name|restaurant_franchise_id|country|              city|   lat|     lng|
+------------+------------+--------------------+-----------------------+-------+------------------+------+--------+
|257698037796|          37|          Cafe Crepe|                  26468|     IT|             Milan|45.533|   9.171|
| 25769803831|          56|    The Waffle House|                  72230|     FR|             Paris|48.873|   2.305|
| 85899345988|          69|      Dragonfly Cafe|                  18952|     NL|         Amsterdam|52.392|   4.911|
|111669149758|          63|          Cafe Paris|                  84488|     NL|Amsterdam Zuidoost| 52.31|   4.942|
|163208757268|          21|      The Lazy Daisy|                  96638|     US|          Columbus|40.115| -83.015|
|154618822662|           7|           Cafe Roma|                  41484|

In [6]:
# Count the rows of the combined restaurant DataFrame and echo the number as the cell output
record_count = combined_df.count()
record_count

1997

In [7]:
# Collect the restaurants whose latitude is missing, display them and report how many there are
null_latitude_df = combined_df.where(combined_df["lat"].isNull())
null_latitude_df.show()
null_latitude_df.count()

+-----------+------------+--------------+-----------------------+-------+------+----+----+
|         id|franchise_id|franchise_name|restaurant_franchise_id|country|  city| lat| lng|
+-----------+------------+--------------+-----------------------+-------+------+----+----+
|85899345920|           1|       Savoria|                  18952|     US|Dillon|NULL|NULL|
+-----------+------------+--------------+-----------------------+-------+------+----+----+



1

In [8]:
# Check column "lng" for emptiness: show the rows whose longitude is missing
# and count them (the count is taken from the longitude frame built here,
# not from the latitude frame of the previous cell).
null_longitude_df = combined_df.filter(col("lng").isNull())
null_longitude_df.show()
null_longitude_df.count()

+-----------+------------+--------------+-----------------------+-------+------+----+----+
|         id|franchise_id|franchise_name|restaurant_franchise_id|country|  city| lat| lng|
+-----------+------------+--------------+-----------------------+-------+------+----+----+
|85899345920|           1|       Savoria|                  18952|     US|Dillon|NULL|NULL|
+-----------+------------+--------------+-----------------------+-------+------+----+----+



1

In [9]:
# Sanity check based on the sample code from the OpenCage Geocoding API website:
# geocode the city/country of the row with missing coordinates and print the
# latitude, longitude, country code, timezone and formatted address of the best match.
import os

from opencage.geocoder import OpenCageGeocode

# Read the API key from the environment so a real key never has to be written
# into the notebook; the "your_key" placeholder remains the fallback.
key = os.environ.get("OPENCAGE_API_KEY", "your_key")
geocoder = OpenCageGeocode(key)

# Fetch both fields with a single Spark job; first() returns None on an empty frame.
missing_row = null_latitude_df.select("city", "country").first()
if missing_row is None:
    print("No row with a missing latitude - nothing to geocode.")
else:
    city_value, country_value = missing_row["city"], missing_row["country"]
    query = f'{city_value}, {country_value}'
    results = geocoder.geocode(query)

    # Guard against an empty result list before indexing into it.
    if results:
        best_match = results[0]
        print(u"%f; %f; %s; %s; %s" % (best_match["geometry"]["lat"],
                                best_match["geometry"]["lng"],
                                best_match["components"]["country_code"],
                                best_match["annotations"]["timezone"]["name"],
                                best_match["formatted"]))
    else:
        print(f"No geocoding result for {query!r}")

34.401409; -79.386434; us; America/New_York; Dillon County, South Carolina, United States of America


In [10]:
""" 
Implements the geocode_address(address) function, 
which uses the OpenCage geocoding service API to convert an address to coordinates (latitude and longitude)
"""
import requests

def geocode_address(address):
    """Convert an address to coordinates with the OpenCage geocoding REST API.

    Args:
        address: Free-form address text, e.g. "Dillon, US".

    Returns:
        A ``(lat, lng)`` tuple taken from the first result, or ``(None, None)``
        when the request fails, the response is not HTTP 200, the body is not
        valid JSON, or no result is returned.
    """
    import os

    # Key comes from the environment when available; "your_key" stays as the fallback placeholder.
    api_key = os.environ.get("OPENCAGE_API_KEY", "your_key")
    api_url = "https://api.opencagedata.com/geocode/v1/json"

    try:
        # Passing the query through `params` lets requests URL-encode the address
        # (spaces, commas, '&', non-ASCII city names); the timeout keeps a stalled
        # connection from hanging the caller forever.
        response = requests.get(
            api_url,
            params={"q": address, "key": api_key},
            timeout=10,
        )
    except requests.RequestException:
        # Network-level failure: treated the same as "no coordinates found".
        return None, None

    # Check the status before parsing: error responses are not guaranteed to be JSON.
    if response.status_code != 200:
        return None, None

    try:
        data = response.json()
    except ValueError:
        return None, None

    results = data.get("results")
    if not results:
        # If we couldn't get the coordinates, return None
        return None, None

    # Returning coordinates from the first result
    geometry = results[0]["geometry"]
    return geometry["lat"], geometry["lng"]

In [47]:
# Smoke test for geocode_address: take the city and country of the first row
# with a missing latitude, print them, then print the coordinates returned for them.
city_value = null_latitude_df.select("city").first()["city"]
country_value = null_latitude_df.select("country").first()["country"]
print(', '.join([city_value, country_value]))

query = f'{city_value}, {country_value}'
print(geocode_address(query))

Dillon, US
(34.4014089, -79.3864339)


In [12]:
"""
This is a custom function that will be applied to the DataFrame columns to update the coordinate values. 
If latitude (lat) or longitude (lng) values are missing, the function uses geocoding to obtain new coordinates based on the city and country. 
If the values already exist, the function leaves them unchanged.
"""
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StructType, StructField, FloatType
# Row-level helper: keep existing coordinates, geocode the missing ones
def update_coordinates(lat, lng, city_value, country_value):
    """Return a (lat, lng) pair for one row.

    When both coordinates are present they are returned unchanged. When either
    one is None, the pair is looked up with geocode_address using the
    "<city>, <country>" address built from the other two arguments.
    """
    both_present = lat is not None and lng is not None
    if both_present:
        # Nothing to repair - pass the original values through
        return lat, lng

    # At least one coordinate is missing: ask the geocoding API
    looked_up_lat, looked_up_lng = geocode_address(f'{city_value}, {country_value}')
    return looked_up_lat, looked_up_lng

In [29]:
"""
Applies the update_coordinates function to the columns of the DataFrame. 
The StructType schema specifies that the UDF will return a structure with two fields: updated_latitude and updated_longitude. 
These fields will contain the updated latitude and longitude values respectively.
"""
# The source lat/lng columns are doubles, so the UDF result is declared with
# DoubleType as well; FloatType would narrow every coordinate to 32 bits once
# the result is written back over lat/lng.
from pyspark.sql.types import DoubleType

update_coordinates_udf = udf(update_coordinates, StructType([
    StructField("updated_latitude", DoubleType()),
    StructField("updated_longitude", DoubleType())
]))

In [30]:
# Add the struct column 'updated_coordinates' to 'combined_df' by running the UDF
# on the lat, lng, city and country columns of every row
updated_df = combined_df.withColumn(
    "updated_coordinates",
    update_coordinates_udf(col("lat"), col("lng"), col("city"), col("country")),
)

In [31]:
# Display the first rows, including the new updated_coordinates struct column
updated_df.show()

+------------+------------+--------------------+-----------------------+-------+------------------+------+--------+-------------------+
|          id|franchise_id|      franchise_name|restaurant_franchise_id|country|              city|   lat|     lng|updated_coordinates|
+------------+------------+--------------------+-----------------------+-------+------------------+------+--------+-------------------+
|257698037796|          37|          Cafe Crepe|                  26468|     IT|             Milan|45.533|   9.171|    {45.533, 9.171}|
| 25769803831|          56|    The Waffle House|                  72230|     FR|             Paris|48.873|   2.305|    {48.873, 2.305}|
| 85899345988|          69|      Dragonfly Cafe|                  18952|     NL|         Amsterdam|52.392|   4.911|    {52.392, 4.911}|
|111669149758|          63|          Cafe Paris|                  84488|     NL|Amsterdam Zuidoost| 52.31|   4.942|     {52.31, 4.942}|
|163208757268|          21|      The Lazy Daisy|

In [32]:
# Overwrite lat and lng with the two fields of the updated_coordinates struct,
# then drop the helper struct column - all in one chained expression
updated_df = (
    updated_df
    .withColumn("lat", col("updated_coordinates.updated_latitude"))
    .withColumn("lng", col("updated_coordinates.updated_longitude"))
    .drop("updated_coordinates")
)

# Inspect the row that originally had no coordinates
updated_df.where(col("id") == 85899345920).show()

+-----------+------------+--------------+-----------------------+-------+------+----+----+
|         id|franchise_id|franchise_name|restaurant_franchise_id|country|  city| lat| lng|
+-----------+------------+--------------+-----------------------+-------+------+----+----+
|85899345920|           1|       Savoria|                  18952|     US|Dillon|NULL|NULL|
+-----------+------------+--------------+-----------------------+-------+------+----+----+



In [17]:
# Now check again for empty values, this time on the updated DataFrame.
# Rows missing either coordinate are selected, and the count is taken from this
# fresh frame rather than from the pre-update null_latitude_df.
# The variable name null_longitude_df is kept unchanged for compatibility with other cells.
null_longitude_df = updated_df.filter(col("lat").isNull() | col("lng").isNull())
null_longitude_df.show()
null_longitude_df.count()


+-----------+------------+--------------+-----------------------+-------+------+----+----+
|         id|franchise_id|franchise_name|restaurant_franchise_id|country|  city| lat| lng|
+-----------+------------+--------------+-----------------------+-------+------+----+----+
|85899345920|           1|       Savoria|                  18952|     US|Dillon|NULL|NULL|
+-----------+------------+--------------+-----------------------+-------+------+----+----+



1

In [18]:
# Round the lat and lng to three decimal places.
# The functions module is imported under an alias instead of
# `from pyspark.sql.functions import round`, which would replace Python's
# built-in round() with the Spark column function for every later cell.
from pyspark.sql import functions as F

updated_df = updated_df.withColumn("lat", F.round(col("lat"), 3))
updated_df = updated_df.withColumn("lng", F.round(col("lng"), 3))
updated_df.show(truncate=False)

+------------+------------+---------------------+-----------------------+-------+------------------+------+--------+
|id          |franchise_id|franchise_name       |restaurant_franchise_id|country|city              |lat   |lng     |
+------------+------------+---------------------+-----------------------+-------+------------------+------+--------+
|257698037796|37          |Cafe Crepe           |26468                  |IT     |Milan             |45.533|9.171   |
|25769803831 |56          |The Waffle House     |72230                  |FR     |Paris             |48.873|2.305   |
|85899345988 |69          |Dragonfly Cafe       |18952                  |NL     |Amsterdam         |52.392|4.911   |
|111669149758|63          |Cafe Paris           |84488                  |NL     |Amsterdam Zuidoost|52.31 |4.942   |
|163208757268|21          |The Lazy Daisy       |96638                  |US     |Columbus          |40.115|-83.015 |
|154618822662|7           |Cafe Roma            |41484          

In [193]:
import geohash2
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# String-returning UDF that turns a (lat, lng) pair into a precision-4 geohash
@udf(StringType())
def safe_geohash(lat, lng):
    """Return the precision-4 geohash of (lat, lng), or None when either value is missing."""
    if lat is None or lng is None:
        return None
    return geohash2.encode(lat, lng, precision=4)

# Second name for the same UDF object
geohash_udf = safe_geohash

# Add the "geohash" column, computed from lat/lng, to the restaurant DataFrame
geohash_df = updated_df.withColumn(
    "geohash",
    geohash_udf(col("lat"), col("lng")),
)


geohash_df.show()

+------------+------------+--------------------+-----------------------+-------+------------------+------+--------+-------+
|          id|franchise_id|      franchise_name|restaurant_franchise_id|country|              city|   lat|     lng|geohash|
+------------+------------+--------------------+-----------------------+-------+------------------+------+--------+-------+
|257698037796|          37|          Cafe Crepe|                  26468|     IT|             Milan|45.533|   9.171|   u0ne|
| 25769803831|          56|    The Waffle House|                  72230|     FR|             Paris|48.873|   2.305|   u09w|
| 85899345988|          69|      Dragonfly Cafe|                  18952|     NL|         Amsterdam|52.392|   4.911|   u176|
|111669149758|          63|          Cafe Paris|                  84488|     NL|Amsterdam Zuidoost| 52.31|   4.942|   u179|
|163208757268|          21|      The Lazy Daisy|                  96638|     US|          Columbus|40.115| -83.015|   dphu|
|1546188

In [196]:
"""
This line generates a list of file paths using rglob("*.parquet"), 
which recursively searches for all files with the .parquet extension in the specified directory and its subdirectories. 
The list comprehension converts the Path objects to strings.
After running this code, parquet_files will contain the paths to all .parquet files in the specified directory
"""
from pathlib import Path

directory_path = "Weather_ds/weather/"
# Get a list of .parquet files in the specified directory and its subdirectories.
# The paths are sorted so that the list order - and therefore which file is
# parquet_files[0] - is the same on every run and every machine.
parquet_files = sorted(str(file) for file in Path(directory_path).rglob("*.parquet"))

len(parquet_files)

3

In [198]:
# All .parquet paths are kept in one list because reading every file at once took a long time to load.
# Instead, the files are read one at a time and de-duplicated individually.
# That approach needs an initial DataFrame to append to, so the first file is read here;
# the remaining files are processed and added to it in a later cell.
weather_data = spark.read.parquet(parquet_files[0])

In [199]:
"""
To use weather and restaurants dataframes, you need a common value.
In this case, our common value will be geohash.
To do this, I create a new geohash column in the weather dataframe based on the lat and lng columns
"""
# Reuse the geohash UDF under a weather-specific name
weather_data_udf = safe_geohash

# Add the "geohash" column, derived from the weather lat/lng, to the weather DataFrame
weather_data = weather_data.withColumn(
    "geohash",
    weather_data_udf(weather_data["lat"], weather_data["lng"]),
)

weather_data.show()

+--------+-------+----------+----------+----------+-------+
|     lng|    lat|avg_tmpr_f|avg_tmpr_c| wthr_date|geohash|
+--------+-------+----------+----------+----------+-------+
|-111.202|18.7496|      82.1|      27.8|2016-10-21|   9e2f|
|-111.155| 18.755|      82.1|      27.8|2016-10-21|   9e2f|
|-111.107|18.7604|      82.0|      27.8|2016-10-21|   9e2f|
|-111.059|18.7657|      81.9|      27.7|2016-10-21|   9e34|
|-111.012|18.7711|      81.8|      27.7|2016-10-21|   9e34|
|-110.964|18.7764|      81.7|      27.6|2016-10-21|   9e34|
|-110.916|18.7818|      81.6|      27.6|2016-10-21|   9e34|
|-110.869|18.7871|      81.7|      27.6|2016-10-21|   9e34|
|-110.821|18.7924|      81.8|      27.7|2016-10-21|   9e34|
|-110.773|18.7977|      81.9|      27.7|2016-10-21|   9e34|
|-110.726|18.8029|      81.9|      27.7|2016-10-21|   9e36|
|-105.221|19.3026|      83.1|      28.4|2016-10-21|   9emj|
|-105.173| 19.306|      82.9|      28.3|2016-10-21|   9emj|
|-105.125|19.3094|      82.6|      28.1|

In [200]:
"""
This loop processes the remaining Parquet files.
Each file is read into a DataFrame (current_data), duplicates are removed by the "wthr_date" and "geohash" columns, 
and then the data is merged into the common DataFrame weather_data using the union operation.
After the loop completes, a duplicate removal operation is performed on the "wthr_date" and "geohash" columns.

The overall result of the code is to combine data from several Parquet files into one DataFrame weather_data on the "wthr_date" and "geohash" columns
"""
# Columns that identify one weather record per day and geohash cell
dedup_keys = ["wthr_date", "geohash"]

# De-duplicate the frame built from the first file
weather_data = weather_data.dropDuplicates(dedup_keys)

# Every remaining file: read, add the geohash, de-duplicate, then append
for parquet_path in parquet_files[1:]:
    current_data = (
        spark.read.parquet(parquet_path)
        .withColumn("geohash", weather_data_udf(col("lat"), col("lng")))
        .dropDuplicates(dedup_keys)
    )
    weather_data = weather_data.union(current_data)

# Final pass removes duplicates that span different files
weather_data = weather_data.dropDuplicates(dedup_keys)

In [201]:
# Preview of the merged weather data after duplicates were removed (limited to 20 rows).
weather_data.limit(20).show()

+--------+-------+----------+----------+----------+-------+
|     lng|    lat|avg_tmpr_f|avg_tmpr_c| wthr_date|geohash|
+--------+-------+----------+----------+----------+-------+
|-72.0244|18.4577|      74.6|      23.7|2016-10-21|   d7kc|
|-105.308|20.3914|      74.2|      23.4|2016-10-21|   9eth|
|-89.6325|20.9328|      79.6|      26.4|2016-10-21|   d58r|
|-104.275| 20.918|      65.4|      18.6|2016-10-21|   9etz|
| -77.941|20.4832|      81.1|      27.3|2016-10-21|   d78s|
| -74.852|20.7737|      80.9|      27.2|2016-10-21|   d7dy|
|-100.543|22.7064|      60.4|      15.8|2016-10-21|   9u09|
|-79.8007|22.5267|      74.0|      23.3|2016-10-21|   dhp2|
|-102.268|23.4083|      57.7|      14.3|2016-10-21|   9spm|
|-109.762|23.0307|      81.3|      27.4|2016-10-21|   9s1g|
|-99.2092|24.2582|      67.3|      19.6|2016-10-21|   9u36|
|-102.269|24.2772|      61.1|      16.2|2016-10-21|   9sr6|
|-98.4268|24.9637|      69.6|      20.9|2016-10-21|   9u6n|
|-83.9856| 51.161|      34.9|       1.6|

In [202]:
# Left join: every weather row is kept and restaurant columns are attached where
# the geohash matches. Both inputs carry their own lat and lng columns, so the
# result contains each of those names twice.
joined_df = weather_data.join(geohash_df, "geohash", "left")

In [203]:
# Display the first rows of the left-joined weather/restaurant data
joined_df.show()

+-------+--------+-------+----------+----------+----------+----+------------+--------------+-----------------------+-------+----+----+----+
|geohash|     lng|    lat|avg_tmpr_f|avg_tmpr_c| wthr_date|  id|franchise_id|franchise_name|restaurant_franchise_id|country|city| lat| lng|
+-------+--------+-------+----------+----------+----------+----+------------+--------------+-----------------------+-------+----+----+----+
|   d7kc|-72.0244|18.4577|      74.6|      23.7|2016-10-21|NULL|        NULL|          NULL|                   NULL|   NULL|NULL|NULL|NULL|
|   9eth|-105.308|20.3914|      74.2|      23.4|2016-10-21|NULL|        NULL|          NULL|                   NULL|   NULL|NULL|NULL|NULL|
|   d58r|-89.6325|20.9328|      79.6|      26.4|2016-10-21|NULL|        NULL|          NULL|                   NULL|   NULL|NULL|NULL|NULL|
|   9etz|-104.275| 20.918|      65.4|      18.6|2016-10-21|NULL|        NULL|          NULL|                   NULL|   NULL|NULL|NULL|NULL|
|   d78s| -77.941|20

In [209]:
# Inner join keeps only the geohash cells present in both the weather and the
# restaurant data; show the descriptive restaurant columns next to the temperatures
joined_df = weather_data.join(geohash_df, "geohash", "inner")
joined_df.select(
    "country",
    "city",
    "franchise_name",
    "geohash",
    "avg_tmpr_f",
    "avg_tmpr_c",
    "wthr_date",
).show()

+-------+------------------+------------------+-------+----------+----------+----------+
|country|              city|    franchise_name|geohash|avg_tmpr_f|avg_tmpr_c| wthr_date|
+-------+------------------+------------------+-------+----------+----------+----------+
|     US|          Marathon|           Savoria|   dhqk|      78.5|      25.8|2016-10-21|
|     US|            Normal|    The Red Pepper|   dp0w|      45.6|       7.6|2016-10-21|
|     US|       Bloomington|    The Red Pepper|   dp0w|      45.6|       7.6|2016-10-21|
|     US|       Bloomington|The Gourmet Garden|   dp0w|      45.6|       7.6|2016-10-21|
|     US|       Bloomington|    The Lazy Daisy|   dp0w|      45.6|       7.6|2016-10-21|
|     US|         Woodstock|    The Hungry Pig|   dp90|      42.1|       5.6|2016-10-21|
|     US|      Crystal Lake| The Blue Elephant|   dp90|      42.1|       5.6|2016-10-21|
|     US|   Boiling Springs|  The Golden Spoon|   dnjx|      61.4|      16.3|2016-10-21|
|     US|            