In [162]:
import os

import requests
import pygeohash as gh
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import FloatType

from utils import count_nulls

In [44]:
# Local Spark session using every available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("SparkPractice")
    .getOrCreate()
)

In [45]:
# Show the active SparkSession (the last expression of a cell is rendered by Jupyter).
spark

In [32]:
csv_dir = "data/restaurant_csv"
# Full path of every entry directly under csv_dir whose name ends in '.csv',
# in the order os.listdir returns them.
csv_files = [
    os.path.join(csv_dir, file_name)
    for file_name in os.listdir(csv_dir)
    if file_name.endswith('.csv')
]

In [33]:
# Load all restaurant CSVs into one DataFrame; each file's first line is its header.
# No schema is supplied, so every column is read as a string.
df = spark.read.csv(csv_files, header=True)

In [34]:
# Inspect the inferred schema: without a schema or inferSchema every column is a string.
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- franchise_id: string (nullable = true)
 |-- franchise_name: string (nullable = true)
 |-- restaurant_franchise_id: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- lng: string (nullable = true)



In [35]:
# Preview the first 20 restaurants (long values truncated).
df.show(n=20, truncate=True)

+------------+------------+--------------------+-----------------------+-------+--------------+------+--------+
|          id|franchise_id|      franchise_name|restaurant_franchise_id|country|          city|   lat|     lng|
+------------+------------+--------------------+-----------------------+-------+--------------+------+--------+
|197568495625|          10|    The Golden Spoon|                  24784|     US|       Decatur|34.578| -87.021|
| 17179869242|          59|         Azalea Cafe|                  10902|     FR|         Paris|48.861|   2.368|
|214748364826|          27|     The Corner Cafe|                  92040|     US|    Rapid City|44.080|-103.250|
|154618822706|          51|        The Pizzeria|                  41484|     AT|        Vienna|48.213|  16.413|
|163208757312|          65|       Chef's Corner|                  96638|     GB|        London|51.495|  -0.191|
| 68719476763|          28|    The Spicy Pickle|                  77517|     US|      Grayling|44.657| -

In [36]:
# Number of restaurant rows across all CSV files.
df.count()

1997

In [61]:
# Count missing values per column.
# isnan is only meaningful for float/double columns. On the string columns read
# from CSV it forces an implicit string->double cast: non-numeric text silently
# becomes null, and with ANSI mode enabled the cast can raise instead.
# So NaN is only checked where the column dtype is floating point.
def _missing_condition(column_name, dtype):
    """Return a boolean Column: true where `column_name` is null (or NaN for float/double)."""
    condition = col(column_name).isNull()
    if dtype in ("float", "double"):
        condition = condition | isnan(col(column_name))
    return condition


df.select(
    [count(when(_missing_condition(c, t), c)).alias(c) for c, t in df.dtypes]
).show()

+---+------------+--------------+-----------------------+-------+----+---+---+
| id|franchise_id|franchise_name|restaurant_franchise_id|country|city|lat|lng|
+---+------------+--------------+-----------------------+-------+----+---+---+
|  0|           0|             0|                      0|      0|   0|  1|  1|
+---+------------+--------------+-----------------------+-------+----+---+---+



In [69]:
# Restaurants missing at least one coordinate; these are geocoded further down.
null_coordinates = df.filter(col("lat").isNull() | col("lng").isNull())

In [71]:
# List the rows that need geocoding.
null_coordinates.show(n=20, truncate=True)

+-----------+------------+--------------+-----------------------+-------+------+----+----+
|         id|franchise_id|franchise_name|restaurant_franchise_id|country|  city| lat| lng|
+-----------+------------+--------------+-----------------------+-------+------+----+----+
|85899345920|           1|       Savoria|                  18952|     US|Dillon|null|null|
+-----------+------------+--------------+-----------------------+-------+------+----+----+



In [111]:
# OpenCage forward-geocoding endpoint. The API key is read from the environment so
# it is never stored in the notebook; any key previously hardcoded here should be
# treated as leaked and rotated.
OPENCAGE_URL = "https://api.opencagedata.com/geocode/v1/json"


def geocode_city(country, city, api_key=None, timeout=10):
    """Look up coordinates for a city via the OpenCage geocoding API.

    Args:
        country: Country value from the restaurant row (e.g. "US").
        city: City name from the restaurant row (e.g. "Dillon").
        api_key: OpenCage API key; defaults to the OPENCAGE_API_KEY env var.
        timeout: Seconds to wait for the HTTP response.

    Returns:
        (lat, lng) as strings (matching the string-typed CSV columns), or
        (None, None) when OpenCage returns no results.

    Raises:
        KeyError: if no key is passed and OPENCAGE_API_KEY is not set.
        requests.HTTPError: on a non-2xx response.
    """
    if api_key is None:
        api_key = os.environ["OPENCAGE_API_KEY"]
    # Let requests URL-encode the query string instead of interpolating raw text.
    response = requests.get(
        OPENCAGE_URL,
        params={"q": f"{city}, {country}", "key": api_key},
        timeout=timeout,
    )
    response.raise_for_status()
    results = response.json().get("results") or []
    if not results:
        return None, None
    geometry = results[0]["geometry"]
    return str(geometry["lat"]), str(geometry["lng"])


# Geocode each restaurant with missing coordinates, keyed by id, so every row
# gets its own city's coordinates. df.na.fill would instead write the first
# lookup into every null lat/lng in the whole frame.
fills = {}
for row in null_coordinates.collect():
    lat, lng = geocode_city(row["country"], row["city"])
    if lat is not None:
        fills[row["id"]] = (lat, lng)

if fills:
    fill_df = spark.createDataFrame(
        [(rid, f_lat, f_lng) for rid, (f_lat, f_lng) in fills.items()],
        ["id", "fill_lat", "fill_lng"],
    )
    # Only null values are replaced; existing coordinates are kept as-is.
    df = (
        df.join(fill_df, on="id", how="left")
        .withColumn("lat", coalesce(col("lat"), col("fill_lat")))
        .withColumn("lng", coalesce(col("lng"), col("fill_lng")))
        .drop("fill_lat", "fill_lng")
    )

In [112]:
# Sanity check: after geocoding no restaurant should be left without coordinates.
df.where(col('lat').isNull() | col('lng').isNull()).show()

+---+------------+--------------+-----------------------+-------+----+---+---+
| id|franchise_id|franchise_name|restaurant_franchise_id|country|city|lat|lng|
+---+------------+--------------+-----------------------+-------+----+---+---+
+---+------------+--------------+-----------------------+-------+----+---+---+



In [132]:
# Cast latitude to double, not float. FloatType keeps only ~7 significant digits,
# and the weather dataset's lat/lng are already double, so both sides of the
# enriched output use the same precision.
df = df.withColumn("lat", df["lat"].cast("double"))

In [133]:
# Cast longitude to double, not float. FloatType keeps only ~7 significant digits,
# and the weather dataset's lat/lng are already double, so both sides of the
# enriched output use the same precision.
df = df.withColumn("lng", df["lng"].cast("double"))

In [134]:
def geohash(lat, lng, precision=4):
    """Encode a coordinate pair as a geohash string.

    Args:
        lat: Latitude in degrees, or None.
        lng: Longitude in degrees, or None.
        precision: Number of geohash characters. The default of 4 is the key
            used to join restaurants with weather data.

    Returns:
        The geohash string, or None when either coordinate is null. This makes a
        single bad row yield a null geohash instead of failing the Spark task
        inside gh.encode.
    """
    if lat is None or lng is None:
        return None
    return gh.encode(lat, lng, precision=precision)


# No returnType given, so the UDF produces strings (StringType).
geohash_udf = udf(geohash)

In [137]:
# Tag each restaurant with its 4-character geohash.
df = df.withColumn('geohash', geohash_udf(col('lat'), col('lng')))

In [138]:
# Preview restaurants with their geohash.
df.show(n=20, truncate=True)

+------------+------------+--------------------+-----------------------+-------+--------------+------+-------+-------+
|          id|franchise_id|      franchise_name|restaurant_franchise_id|country|          city|   lat|    lng|geohash|
+------------+------------+--------------------+-----------------------+-------+--------------+------+-------+-------+
|197568495625|          10|    The Golden Spoon|                  24784|     US|       Decatur|34.578|-87.021|   dn4h|
| 17179869242|          59|         Azalea Cafe|                  10902|     FR|         Paris|48.861|  2.368|   u09t|
|214748364826|          27|     The Corner Cafe|                  92040|     US|    Rapid City| 44.08|-103.25|   9xyd|
|154618822706|          51|        The Pizzeria|                  41484|     AT|        Vienna|48.213| 16.413|   u2ed|
|163208757312|          65|       Chef's Corner|                  96638|     GB|        London|51.495| -0.191|   gcpu|
| 68719476763|          28|    The Spicy Pickle|

## 2. Weather Data Exploration

In [122]:
path = 'data/weather_dataset'
# Read every parquet file beneath `path` (recursively) and merge their schemas.
weather_df = (
    spark.read
    .option("recursiveFileLookup", "true")
    .option("mergeSchema", "true")
    .parquet(path)
)

In [123]:
# Inspect the weather schema (parquet keeps the column types, e.g. double lat/lng).
weather_df.printSchema()

root
 |-- lng: double (nullable = true)
 |-- lat: double (nullable = true)
 |-- avg_tmpr_f: double (nullable = true)
 |-- avg_tmpr_c: double (nullable = true)
 |-- wthr_date: string (nullable = true)



In [124]:
# Preview the first 20 weather observations.
weather_df.show(n=20, truncate=True)

+--------+-------+----------+----------+----------+
|     lng|    lat|avg_tmpr_f|avg_tmpr_c| wthr_date|
+--------+-------+----------+----------+----------+
| -111.09|18.6251|      80.7|      27.1|2017-08-29|
|-111.042|18.6305|      80.7|      27.1|2017-08-29|
|-110.995|18.6358|      80.7|      27.1|2017-08-29|
|-110.947|18.6412|      80.9|      27.2|2017-08-29|
|  -110.9|18.6465|      80.9|      27.2|2017-08-29|
|-110.852|18.6518|      80.9|      27.2|2017-08-29|
|-110.804|18.6571|      80.9|      27.2|2017-08-29|
|-105.068|19.1765|      82.4|      28.0|2017-08-29|
| -105.02|19.1799|      82.0|      27.8|2017-08-29|
|-104.972|19.1832|      82.0|      27.8|2017-08-29|
|-104.924|19.1866|      82.0|      27.8|2017-08-29|
|-104.876|19.1899|      82.0|      27.8|2017-08-29|
|-104.828|19.1932|      81.6|      27.6|2017-08-29|
| -104.78|19.1964|      81.6|      27.6|2017-08-29|
|-104.732|19.1997|      81.6|      27.6|2017-08-29|
|-104.684| 19.203|      77.8|      25.4|2017-08-29|
|-104.636|19

In [125]:
# Number of weather rows before deduplication.
weather_df.count()

112394743

In [126]:
# Per-column null counts via the project helper utils.count_nulls.
# NOTE(review): the rendered output lists only the numeric columns, not wthr_date —
# confirm whether the helper skips string columns.
count_nulls(weather_df)

Unnamed: 0,count
lng,0
lat,0
avg_tmpr_f,0
avg_tmpr_c,0


In [139]:
# Same 4-character geohash for every weather point. This is a Python UDF over the
# full weather table, so it is the expensive step of the pipeline.
weather_df = weather_df.withColumn('geohash', geohash_udf(col('lat'), col('lng')))

In [140]:
# Preview weather rows with their geohash.
weather_df.show(n=20, truncate=True)

+--------+-------+----------+----------+----------+-------+
|     lng|    lat|avg_tmpr_f|avg_tmpr_c| wthr_date|geohash|
+--------+-------+----------+----------+----------+-------+
| -111.09|18.6251|      80.7|      27.1|2017-08-29|   9e31|
|-111.042|18.6305|      80.7|      27.1|2017-08-29|   9e31|
|-110.995|18.6358|      80.7|      27.1|2017-08-29|   9e34|
|-110.947|18.6412|      80.9|      27.2|2017-08-29|   9e34|
|  -110.9|18.6465|      80.9|      27.2|2017-08-29|   9e34|
|-110.852|18.6518|      80.9|      27.2|2017-08-29|   9e34|
|-110.804|18.6571|      80.9|      27.2|2017-08-29|   9e34|
|-105.068|19.1765|      82.4|      28.0|2017-08-29|   9emm|
| -105.02|19.1799|      82.0|      27.8|2017-08-29|   9emm|
|-104.972|19.1832|      82.0|      27.8|2017-08-29|   9emm|
|-104.924|19.1866|      82.0|      27.8|2017-08-29|   9emm|
|-104.876|19.1899|      82.0|      27.8|2017-08-29|   9emm|
|-104.828|19.1932|      81.6|      27.6|2017-08-29|   9emm|
| -104.78|19.1964|      81.6|      27.6|

In [141]:
# Geohashes shared by more than one restaurant, most crowded first.
(
    df.groupBy('geohash')
    .count()
    .filter(col('count') > 1)
    .orderBy(desc('count'))
    .show()
)

+-------+-----+
|geohash|count|
+-------+-----+
|   gcpv|  208|
|   u09t|  182|
|   sp3e|  171|
|   u09w|  169|
|   u0nd|  122|
|   u2ed|  116|
|   gcpu|  102|
|   u173|   61|
|   u179|    8|
|   u0n6|    8|
|   u176|    7|
|   u10j|    6|
|   dqcm|    5|
|   9mud|    5|
|   u2e9|    5|
|   9vrf|    4|
|   dr5r|    4|
|   dp0w|    4|
|   9r0y|    4|
|   9qh0|    4|
+-------+-----+
only showing top 20 rows



In [147]:
# Keep one weather row per (geohash, wthr_date), so the join below matches at most
# one weather observation per restaurant per date.
# NOTE(review): which duplicate row survives is not deterministic; aggregating
# (e.g. average temperature per geohash/date) may be preferable.
weather_df = weather_df.dropDuplicates(subset=['geohash', 'wthr_date'])

In [148]:
# Number of weather rows left after deduplicating on (geohash, wthr_date).
weather_df.count()

31882677

In [157]:
# Rename the weather lat/lng to weather_lat/weather_lng so they do not collide
# with the restaurant coordinates after the join.
weather_df = (
    weather_df
    .withColumnRenamed('lat', 'weather_lat')
    .withColumnRenamed('lng', 'weather_lng')
)

In [158]:
# Left join on the four-character geohash: every restaurant is kept, paired with
# each weather row in the same geohash (null weather fields when none match).
joined_df = df.join(weather_df, 'geohash', 'left_outer')

In [159]:
# Preview the enriched rows.
joined_df.show(n=20, truncate=True)

+-------+------------+------------+--------------+-----------------------+-------+-------+------+-------+-----------+-----------+----------+----------+----------+
|geohash|          id|franchise_id|franchise_name|restaurant_franchise_id|country|   city|   lat|    lng|weather_lng|weather_lat|avg_tmpr_f|avg_tmpr_c| wthr_date|
+-------+------------+------------+--------------+-----------------------+-------+-------+------+-------+-----------+-----------+----------+----------+----------+
|   9yt8|120259084300|          13| The Firehouse|                  59829|     US|Branson|36.633|-93.272|   -93.5004|     36.606|      78.5|      25.8|2017-09-22|
|   9yt8|120259084300|          13| The Firehouse|                  59829|     US|Branson|36.633|-93.272|   -93.5004|     36.606|      76.9|      24.9|2017-08-15|
|   9yt8|120259084300|          13| The Firehouse|                  59829|     US|Branson|36.633|-93.272|   -93.4969|    36.5629|      62.7|      17.1|2016-10-10|
|   9yt8|120259084300|

In [160]:
# Row count of the enriched data (one row per restaurant and matching weather date).
joined_df.count()

172591

In [161]:
# Store the enriched data on the local filesystem as parquet, replacing earlier output.
# No repartition/coalesce is applied, so the joined DataFrame's partitioning is preserved.
joined_df.write.parquet("enriched_data.parquet", mode="overwrite")