Import all required libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, avg, when, col, lit, desc
from pyspark.sql.types import StringType
import geohash2 as geohash
from dotenv import load_dotenv
import requests
import os


Initialize the SparkSession, define data paths, and load the data

In [2]:
spark = SparkSession.builder.appName("RestaurantWeatherAnalysis").getOrCreate()

# Input locations can be overridden through environment variables (set in the
# shell before starting the kernel) so the notebook is not tied to one machine;
# the defaults keep the original paths.
restaurant_data_path = os.getenv(
    "RESTAURANT_DATA_PATH",
    "/mnt/c/Users/daniy/Desktop/code/epam/Spark_module/restaurant_csv/restaurant_csv",
)
weather_data_path = os.getenv(
    "WEATHER_DATA_PATH",
    "/mnt/c/Users/daniy/Desktop/code/epam/Spark_module/weather",
)

# Restaurants: CSV with a header row; no schema inference, so every column is a string.
restaurant_df = spark.read.option("header", "true").csv(restaurant_data_path)
# Weather: Parquet files collected from all nested sub-directories.
weather_df = spark.read.option("recursiveFileLookup", "true").parquet(weather_data_path)

restaurant_df.printSchema()
weather_df.printSchema()

your 131072x1 screen size is bogus. expect trouble
24/11/22 01:00:17 WARN Utils: Your hostname, Daniyar resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/11/22 01:00:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/22 01:00:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


root
 |-- id: string (nullable = true)
 |-- franchise_id: string (nullable = true)
 |-- franchise_name: string (nullable = true)
 |-- restaurant_franchise_id: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- lng: string (nullable = true)

root
 |-- lng: double (nullable = true)
 |-- lat: double (nullable = true)
 |-- avg_tmpr_f: double (nullable = true)
 |-- avg_tmpr_c: double (nullable = true)
 |-- wthr_date: string (nullable = true)



Convert the `lat` and `lng` columns to numeric (double) type

In [3]:

# Parse both coordinate columns as doubles in each dataset.
for coord_col in ("lat", "lng"):
    restaurant_df = restaurant_df.withColumn(coord_col, col(coord_col).cast("double"))
    weather_df = weather_df.withColumn(coord_col, col(coord_col).cast("double"))


Check for missing coordinates

In [4]:
# Show every row where either coordinate is NULL, for each dataset.
missing_checks = (
    ("Missing data in restaurant_df:", restaurant_df),
    ("Missing data in weather_df:", weather_df),
)
for heading, frame in missing_checks:
    print(heading)
    frame.filter(col("lat").isNull() | col("lng").isNull()).show()


Missing data in restaurant_df:
+-----------+------------+--------------+-----------------------+-------+------+----+----+
|         id|franchise_id|franchise_name|restaurant_franchise_id|country|  city| lat| lng|
+-----------+------------+--------------+-----------------------+-------+------+----+----+
|85899345920|           1|       Savoria|                  18952|     US|Dillon|NULL|NULL|
+-----------+------------+--------------+-----------------------+-------+------+----+----+

Missing data in weather_df:




+---+---+----------+----------+---------+
|lng|lat|avg_tmpr_f|avg_tmpr_c|wthr_date|
+---+---+----------+----------+---------+
+---+---+----------+----------+---------+



                                                                                

Load the OpenCage API key from environment variables and define a helper that fetches coordinates from the OpenCage geocoding API

In [5]:
# Populate os.environ from a .env file, if one is found.
load_dotenv()

def get_coordinates(city, country, timeout=10):
    """Geocode a city/country pair with the OpenCage geocoding API.

    Args:
        city: City name, e.g. "Dillon".
        country: Country name or code, e.g. "US".
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        A ``(lat, lng)`` tuple taken from the first result, or ``(None, None)``
        when the API returns no results.

    Raises:
        ValueError: if ``OPENCAGE_API_KEY`` is not set in the environment.
        requests.HTTPError: if the API responds with an error status.
    """
    api_key = os.getenv("OPENCAGE_API_KEY")
    if not api_key:
        raise ValueError("API key for OpenCage is not set in environment variables.")
    # Let requests URL-encode the query: names containing spaces, '&' or '#'
    # would otherwise corrupt a hand-built query string.
    response = requests.get(
        "https://api.opencagedata.com/geocode/v1/json",
        params={"q": f"{city},{country}", "key": api_key},
        timeout=timeout,
    )
    # Surface auth/quota errors instead of silently returning (None, None).
    response.raise_for_status()
    results = response.json().get("results") or []
    if not results:
        return None, None
    geometry = results[0]["geometry"]
    return geometry["lat"], geometry["lng"]

Fetch coordinates for the restaurant with missing values (Dillon, US)

In [None]:
# Geocode the one restaurant whose coordinates are missing.
city, country = "Dillon", "US"
coordinates = get_coordinates(city, country)
lat, lng = coordinates
print(f"Coordinates for {city}, {country}: Latitude {lat}, Longitude {lng}")


Coordinates for Dillon, US: Latitude 34.4014089, Longitude -79.3864339


Fill in the missing coordinates and verify the update

In [7]:
# The values match the get_coordinates output for Dillon, US; they are
# hard-coded, so this cell does not call the API.
missing_id = "85899345920"
fixed_lat, fixed_lng = 34.4014089, -79.3864339
is_missing_row = col("id") == missing_id

updated_restaurant_df = (
    restaurant_df
    .withColumn("lat", when(is_missing_row, lit(fixed_lat)).otherwise(col("lat")))
    .withColumn("lng", when(is_missing_row, lit(fixed_lng)).otherwise(col("lng")))
)

updated_restaurant_df.filter(is_missing_row).show()



+-----------+------------+--------------+-----------------------+-------+------+----------+-----------+
|         id|franchise_id|franchise_name|restaurant_franchise_id|country|  city|       lat|        lng|
+-----------+------------+--------------+-----------------------+-------+------+----------+-----------+
|85899345920|           1|       Savoria|                  18952|     US|Dillon|34.4014089|-79.3864339|
+-----------+------------+--------------+-----------------------+-------+------+----------+-----------+



Define a UDF that generates a 4-character geohash

In [8]:
def generate_geohash(lat, lon, precision=4):
    """Encode a latitude/longitude pair as a geohash string.

    Intended to be wrapped as a Spark UDF over the double ``lat``/``lng``
    columns, where SQL NULLs arrive as ``None``.

    Args:
        lat: Latitude in degrees, or ``None``.
        lon: Longitude in degrees, or ``None``.
        precision: Number of geohash characters (default 4).

    Returns:
        The geohash string, or ``None`` when either coordinate is missing, so a
        NULL coordinate yields a NULL geohash instead of raising inside the UDF.
    """
    if lat is None or lon is None:
        return None
    return geohash.encode(lat, lon, precision=precision)

# Wrap generate_geohash as a Spark UDF that produces a string column.
geohash_udf = udf(generate_geohash, returnType=StringType())


Add a geohash column to both datasets and inspect the result

In [9]:
# Tag every row of both datasets with the geohash of its coordinates so the
# two can later be joined on the same spatial cell.
restaurant_df = updated_restaurant_df.withColumn("geohash", geohash_udf(col("lat"), col("lng")))
weather_df = weather_df.withColumn("geohash", geohash_udf(col("lat"), col("lng")))

for preview_df in (restaurant_df, weather_df):
    preview_df.show(5, truncate=False)


+------------+------------+----------------+-----------------------+-------+----------+------+-------+-------+
|id          |franchise_id|franchise_name  |restaurant_franchise_id|country|city      |lat   |lng    |geohash|
+------------+------------+----------------+-----------------------+-------+----------+------+-------+-------+
|197568495625|10          |The Golden Spoon|24784                  |US     |Decatur   |34.578|-87.021|dn4h   |
|17179869242 |59          |Azalea Cafe     |10902                  |FR     |Paris     |48.861|2.368  |u09t   |
|214748364826|27          |The Corner Cafe |92040                  |US     |Rapid City|44.08 |-103.25|9xyd   |
|154618822706|51          |The Pizzeria    |41484                  |AT     |Vienna    |48.213|16.413 |u2ed   |
|163208757312|65          |Chef's Corner   |96638                  |GB     |London    |51.495|-0.191 |gcpu   |
+------------+------------+----------------+-----------------------+-------+----------+------+-------+-------+
o

Aggregate weather data by geohash (average temperature in °C and °F)

In [10]:
# (source column, output alias) pairs averaged per geohash cell.
temperature_columns = [
    ("avg_tmpr_c", "avg_temperature_c"),
    ("avg_tmpr_f", "avg_temperature_f"),
]
aggregated_weather = weather_df.groupBy("geohash").agg(
    *[avg(source).alias(alias) for source, alias in temperature_columns]
)

aggregated_weather.show(5, truncate=False)


24/11/22 01:01:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:01:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:01:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:01:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:01:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:01:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:01:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:01:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:01:08 WARN RowBasedKeyValueBatch: Calling spill() on

+-------+------------------+-----------------+
|geohash|avg_temperature_c |avg_temperature_f|
+-------+------------------+-----------------+
|d580   |27.949068322981358|82.30904503105593|
|9g8d   |15.891032608695653|60.60267857142856|
|d79k   |27.317431561996784|81.1713768115942 |
|d5cd   |27.77519603796946 |81.99649195212547|
|9gfm   |26.82876552795031 |80.29219720496897|
+-------+------------------+-----------------+
only showing top 5 rows



                                                                                

Left-join the restaurant data with the aggregated weather data on geohash

In [11]:
# Keep every restaurant; those without weather data in their geohash cell get
# NULL temperature columns.
joined_df = restaurant_df.join(aggregated_weather, on="geohash", how="left")
joined_df.show(5, truncate=False)


24/11/22 01:03:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:03:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:03:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:03:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:03:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:03:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:03:02 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:03:02 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:03:02 WARN RowBasedKeyValueBatch: Calling spill() on

+-------+------------+------------+-----------------+-----------------------+-------+----------+------+-------+------------------+-----------------+
|geohash|id          |franchise_id|franchise_name   |restaurant_franchise_id|country|city      |lat   |lng    |avg_temperature_c |avg_temperature_f|
+-------+------------+------------+-----------------+-----------------------+-------+----------+------+-------+------------------+-----------------+
|dnqx   |103079215121|18          |The Harvest Room |4340                   |US     |Mount Airy|36.481|-80.602|19.375336322869956|66.87460252751735|
|dn4h   |197568495625|10          |The Golden Spoon |24784                  |US     |Decatur   |34.578|-87.021|22.17667224080268 |71.91659698996655|
|sp3e   |154618822678|23          |The Hungry Pig   |41484                  |ES     |Barcelona |41.383|2.17   |21.233152173913044|70.22282608695652|
|gcpv   |171798691900|61          |Blue Water Bistro|65939                  |GB     |London    |51.519|-0.

Remove duplicates

In [12]:
# Drop rows that are identical across all columns.
joined_df = joined_df.distinct()


Analyze data: average temperature per restaurant

In [13]:
# One row per (id, franchise_name, city) group with its mean temperature in °C;
# NULL when no weather data matched the restaurant's geohash.
restaurant_keys = ["id", "franchise_name", "city"]
avg_temp_per_restaurant = (
    joined_df
    .groupBy(restaurant_keys)
    .agg(avg("avg_temperature_c").alias("average_temperature_c"))
)
avg_temp_per_restaurant.show(5, truncate=False)


24/11/22 01:06:27 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:06:27 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:06:27 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:06:27 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:06:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:06:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:06:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:06:29 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:06:29 WARN RowBasedKeyValueBatch: Calling spill() on

+------------+---------------+-----------+---------------------+
|id          |franchise_name |city       |average_temperature_c|
+------------+---------------+-----------+---------------------+
|77309411338 |The Flaming Wok|Memphis    |22.562538819875783   |
|180388626433|Bella Cucina   |Van Horn   |26.32203659506763    |
|249108103173|The Daily Grind|Glendale   |30.035696821515888   |
|188978561043|The Brasserie  |Glen Burnie|22.567934782608695   |
|77309411397 |Earth and Vine |Juneau     |NULL                 |
+------------+---------------+-----------+---------------------+
only showing top 5 rows



                                                                                

Restaurants with the highest average temperature

In [14]:
# Hottest locations first; show the top 5 rows.
joined_df.sort(col("avg_temperature_c").desc()).show(5, truncate=False)


24/11/22 01:10:07 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:10:07 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:10:07 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:10:07 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:10:07 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:10:07 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:10:07 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:10:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:10:09 WARN RowBasedKeyValueBatch: Calling spill() on

+-------+------------+------------+---------------+-----------------------+-------+--------+------+--------+------------------+-----------------+
|geohash|id          |franchise_id|franchise_name |restaurant_franchise_id|country|city    |lat   |lng     |avg_temperature_c |avg_temperature_f|
+-------+------------+------------+---------------+-----------------------+-------+--------+------+--------+------------------+-----------------+
|9tbq   |163208757255|8           |The Green Olive|96638                  |US     |Phoenix |33.455|-112.067|30.037913367489516|86.06921285514673|
|9tbn   |249108103173|6           |The Daily Grind|55897                  |US     |Glendale|33.539|-112.183|30.035696821515888|86.06511817440915|
|9tbp   |257698037766|7           |Cafe Roma      |26468                  |US     |Glendale|33.621|-112.186|29.744970142397804|85.5393661001378 |
|9myc   |85899345926 |7           |Cafe Roma      |18952                  |US     |Wellton |32.673|-114.146|29.6301279405695

                                                                                

Save the joined data as Parquet

In [15]:
# The destination can be overridden via the OUTPUT_PATH environment variable;
# the default keeps the original location. Existing output is replaced.
output_path = os.getenv("OUTPUT_PATH", "/home/dany/code/epam/spark_task/output")
joined_df.write.mode("overwrite").parquet(output_path)


24/11/22 01:13:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:13:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:13:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:13:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:13:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:13:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:13:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:13:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 01:13:50 WARN RowBasedKeyValueBatch: Calling spill() on