In [59]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import requests
from pyspark.sql.types import *
import time
from opencage.geocoder import OpenCageGeocode
import geohash2 as gh
import zipfile
import os

# Create a SparkSession

In [60]:
    # Create a SparkSession
spark = SparkSession.builder \
    .appName("Local ETL Job") \
    .master("local[*]") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()

In [61]:
spark

# 1. Restaurant data

In [4]:
path = 'restaurant_csv'
df = spark.read.option("mergeSchema", "true").option("recursiveFileLookup", "true").option("header", "true").csv(path)
df.show()

+------------+------------+--------------------+-----------------------+-------+--------------+------+--------+
|          id|franchise_id|      franchise_name|restaurant_franchise_id|country|          city|   lat|     lng|
+------------+------------+--------------------+-----------------------+-------+--------------+------+--------+
|197568495625|          10|    The Golden Spoon|                  24784|     US|       Decatur|34.578| -87.021|
| 17179869242|          59|         Azalea Cafe|                  10902|     FR|         Paris|48.861|   2.368|
|214748364826|          27|     The Corner Cafe|                  92040|     US|    Rapid City|44.080|-103.250|
|154618822706|          51|        The Pizzeria|                  41484|     AT|        Vienna|48.213|  16.413|
|163208757312|          65|       Chef's Corner|                  96638|     GB|        London|51.495|  -0.191|
| 68719476763|          28|    The Spicy Pickle|                  77517|     US|      Grayling|44.657| -

In [5]:
df.count()

1997

In [6]:
#show where lat or lng is null
df.select("*").where(col("lat").isNull() | col("lng").isNull()).show()

+-----------+------------+--------------+-----------------------+-------+------+----+----+
|         id|franchise_id|franchise_name|restaurant_franchise_id|country|  city| lat| lng|
+-----------+------------+--------------+-----------------------+-------+------+----+----+
|85899345920|           1|       Savoria|                  18952|     US|Dillon|null|null|
+-----------+------------+--------------+-----------------------+-------+------+----+----+



In [7]:
# define a UDF to call OpenCage Geocoding API and retrieve latitude and longitude values
def geocode(city, country):
    geocoder = OpenCageGeocode('7397d2a67a214bbbbd50f3ca1cfd164e')
    query = city + ', ' + country
    results = geocoder.geocode(query)
    if results and 'geometry' in results[0]:
        lat = results[0]['geometry']['lat']
        lng = results[0]['geometry']['lng']
        return [lat, lng]
    else:
        return None
geocode_udf = udf(geocode, ArrayType(FloatType()))


In [8]:
# create a new dataframe with rows where lat or lng is null
df_null = df.filter(df['lat'].isNull() | df['lng'].isNull())
df_null.show()

+-----------+------------+--------------+-----------------------+-------+------+----+----+
|         id|franchise_id|franchise_name|restaurant_franchise_id|country|  city| lat| lng|
+-----------+------------+--------------+-----------------------+-------+------+----+----+
|85899345920|           1|       Savoria|                  18952|     US|Dillon|null|null|
+-----------+------------+--------------+-----------------------+-------+------+----+----+



In [9]:
# apply the geocode UDF to the new dataframe
df_null = df_null.withColumn('lat_lng', geocode_udf(df_null['city'], df_null['country']))

# update the original dataframe with the geocoded values
df = df.join(df_null.select('id', 'lat_lng'), 'id', 'left_outer') \
    .withColumn('lat', when(df['lat'].isNull(), df_null['lat_lng'][0]).otherwise(df['lat'])) \
    .withColumn('lng', when(df['lng'].isNull(), df_null['lat_lng'][1]).otherwise(df['lng'])) \
    .drop('lat_lng')


In [11]:
# create a new dataframe with rows where lat or lng is null
df_null = df.filter(df['lat'].isNull() | df['lng'].isNull())
df_null.show()

+---+------------+--------------+-----------------------+-------+----+---+---+
| id|franchise_id|franchise_name|restaurant_franchise_id|country|city|lat|lng|
+---+------------+--------------+-----------------------+-------+----+---+---+
+---+------------+--------------+-----------------------+-------+----+---+---+



In [12]:
#save in csv
df.write.csv('restaurant_geocoded.csv', header=True, mode='overwrite')

In [13]:
# read the geocoded dataframe from csv using the defined schema
df_geocoded = spark.read.csv("restaurant_geocoded.csv", header=True)

In [15]:
# change the type of the lat and lng columns to float
df_geocoded = df_geocoded.withColumn('lat', df_geocoded['lat'].cast(FloatType())) \
    .withColumn('lng', df_geocoded['lng'].cast(FloatType()))

In [16]:
# define a UDF to generate a geohash from latitude and longitude values
def geohash(lat, lng):
    return gh.encode(lat, lng, precision=4)
geohash_udf = udf(geohash)

In [17]:
# add a new column with the geohash values to the dataframe
df_geocoded = df_geocoded.withColumn('geohash', geohash_udf(df_geocoded['lat'], df_geocoded['lng']))

In [19]:
df_geocoded.where("id = 85899345920").show()

+-----------+------------+--------------+-----------------------+-------+------+--------+---------+-------+
|         id|franchise_id|franchise_name|restaurant_franchise_id|country|  city|     lat|      lng|geohash|
+-----------+------------+--------------+-----------------------+-------+------+--------+---------+-------+
|85899345920|           1|       Savoria|                  18952|     US|Dillon|34.40141|-79.38644|   dnpe|
+-----------+------------+--------------+-----------------------+-------+------+--------+---------+-------+



# 2. Weather data

In [20]:
path = 'weather_dataset'
weather_df = spark.read.option("mergeSchema", "true").option("recursiveFileLookup", "true").parquet(path)

In [21]:
weather_df.show()

+--------+-------+----------+----------+----------+
|     lng|    lat|avg_tmpr_f|avg_tmpr_c| wthr_date|
+--------+-------+----------+----------+----------+
| -111.09|18.6251|      80.7|      27.1|2017-08-29|
|-111.042|18.6305|      80.7|      27.1|2017-08-29|
|-110.995|18.6358|      80.7|      27.1|2017-08-29|
|-110.947|18.6412|      80.9|      27.2|2017-08-29|
|  -110.9|18.6465|      80.9|      27.2|2017-08-29|
|-110.852|18.6518|      80.9|      27.2|2017-08-29|
|-110.804|18.6571|      80.9|      27.2|2017-08-29|
|-105.068|19.1765|      82.4|      28.0|2017-08-29|
| -105.02|19.1799|      82.0|      27.8|2017-08-29|
|-104.972|19.1832|      82.0|      27.8|2017-08-29|
|-104.924|19.1866|      82.0|      27.8|2017-08-29|
|-104.876|19.1899|      82.0|      27.8|2017-08-29|
|-104.828|19.1932|      81.6|      27.6|2017-08-29|
| -104.78|19.1964|      81.6|      27.6|2017-08-29|
|-104.732|19.1997|      81.6|      27.6|2017-08-29|
|-104.684| 19.203|      77.8|      25.4|2017-08-29|
|-104.636|19

In [22]:
weather_df.count()

112394743

In [23]:
# add a new column with the geohash values to the dataframe
weather_df = weather_df.withColumn('geohash', geohash_udf(weather_df['lat'], weather_df['lng']))

In [24]:
weather_df.show()

+--------+-------+----------+----------+----------+-------+
|     lng|    lat|avg_tmpr_f|avg_tmpr_c| wthr_date|geohash|
+--------+-------+----------+----------+----------+-------+
| -111.09|18.6251|      80.7|      27.1|2017-08-29|   9e31|
|-111.042|18.6305|      80.7|      27.1|2017-08-29|   9e31|
|-110.995|18.6358|      80.7|      27.1|2017-08-29|   9e34|
|-110.947|18.6412|      80.9|      27.2|2017-08-29|   9e34|
|  -110.9|18.6465|      80.9|      27.2|2017-08-29|   9e34|
|-110.852|18.6518|      80.9|      27.2|2017-08-29|   9e34|
|-110.804|18.6571|      80.9|      27.2|2017-08-29|   9e34|
|-105.068|19.1765|      82.4|      28.0|2017-08-29|   9emm|
| -105.02|19.1799|      82.0|      27.8|2017-08-29|   9emm|
|-104.972|19.1832|      82.0|      27.8|2017-08-29|   9emm|
|-104.924|19.1866|      82.0|      27.8|2017-08-29|   9emm|
|-104.876|19.1899|      82.0|      27.8|2017-08-29|   9emm|
|-104.828|19.1932|      81.6|      27.6|2017-08-29|   9emm|
| -104.78|19.1964|      81.6|      27.6|

In [25]:
#show where geohash and wthr_date together duplicate
weather_df.groupBy('wthr_date', 'geohash').count().where('count > 1').show()

+----------+-------+-----+
| wthr_date|geohash|count|
+----------+-------+-----+
|2017-08-29|   9eqv|    8|
|2017-08-29|   9g6q|    7|
|2017-08-29|   9etf|   30|
|2017-08-29|   d58h|   29|
|2017-08-29|   d5zm|   26|
|2017-08-29|   9sp1|   26|
|2017-08-29|   9u38|   25|
|2017-08-29|   9sre|   26|
|2017-08-29|   9st3|   27|
|2017-08-29|   9swd|   28|
|2017-08-29|   dhvf|   27|
|2017-08-29|   9tj1|   27|
|2017-08-29|   9mpj|   27|
|2017-08-29|   9vmm|   28|
|2017-08-29|   9v80|   24|
|2017-08-29|   9mm9|   23|
|2017-08-29|   9vtf|   28|
|2017-08-29|   9ves|   24|
|2017-08-29|   9mrx|   25|
|2017-08-29|   9vv6|   18|
+----------+-------+-----+
only showing top 20 rows



In [26]:
#drop rows where geohash and wthr_date together duplicate
weather_df = weather_df.dropDuplicates(['wthr_date', 'geohash'])

In [27]:
weather_df.groupBy('wthr_date', 'geohash').count().where('count > 1').show()

+---------+-------+-----+
|wthr_date|geohash|count|
+---------+-------+-----+
+---------+-------+-----+



In [28]:
#get the size of the dataframe
weather_df.count()

31882677

In [29]:
# rename lat and lng columns to avoid ambiguity
weather_df = weather_df.withColumnRenamed('lat', 'lat_weather') \
    .withColumnRenamed('lng', 'lng_weather')

In [None]:
#save in parquet
weather_df.write.parquet('weather_cleaned_from_duplicates.parquet', mode='overwrite')

In [31]:
# read the cleaned weather dataframe from parquet
weather_df = spark.read.parquet("weather_cleaned_from_duplicates.parquet")

In [34]:
weather_df.show()

+-----------+-----------+----------+----------+----------+-------+
|lng_weather|lat_weather|avg_tmpr_f|avg_tmpr_c| wthr_date|geohash|
+-----------+-----------+----------+----------+----------+-------+
|   -101.238|    19.7128|      61.0|      16.1|2017-08-29|   9g80|
|   -71.6781|    18.4932|      78.7|      25.9|2017-08-29|   d7m1|
|   -104.629|    20.3924|      63.3|      17.4|2017-08-29|   9ets|
|   -88.2381|    20.6006|      82.5|      28.1|2017-08-29|   d59m|
|   -73.4394|    19.5544|      85.6|      29.8|2017-08-29|   d77z|
|   -81.2557|    25.1487|      84.3|      29.1|2017-08-29|   dhqp|
|   -101.569|    27.0982|      82.1|      27.8|2017-08-29|   9szf|
|    -110.47|    27.0939|      89.0|      31.7|2017-08-29|   9sc6|
|   -112.818|    27.2501|      89.6|      32.0|2017-08-29|   9kzg|
|    -103.66|    28.3193|      71.7|      22.1|2017-08-29|   9tn3|
|   -105.114|    28.3219|      73.8|      23.2|2017-08-29|   9tj3|
|   -102.302|    29.0287|      70.9|      21.6|2017-08-29|   9

In [36]:
weather_df.groupBy('geohash').count().where('count > 1').show()

+-------+-----+
|geohash|count|
+-------+-----+
|   dq3v|   92|
|   9qve|   92|
|   9y03|   92|
|   9y1v|   92|
|   drkj|   92|
|   9vtm|   92|
|   f0k0|   92|
|   dnsx|   92|
|   9q7g|   92|
|   c21v|   92|
|   9ru6|   92|
|   9yuh|   92|
|   9yhq|   92|
|   cbkb|   92|
|   f09w|   92|
|   dp78|   92|
|   9tsv|   92|
|   dq0g|   92|
|   d580|   92|
|   f01x|   92|
+-------+-----+
only showing top 20 rows



In [39]:
#drop lng_weather and lat_weather
weather_df = weather_df.drop('lng_weather', 'lat_weather')

# Join df_geocoded and weather_df

In [82]:
# Left-join weather and restaurant_geocoded data using the four-character geohash. Make sure to avoid data multiplication and keep your job idempotent
df_joined = df_geocoded.join(weather_df, 'geohash', 'left_outer')

In [83]:
df_joined.show()

+-------+------------+------------+--------------+-----------------------+-------+-------+------+-------+----------+----------+----------+
|geohash|          id|franchise_id|franchise_name|restaurant_franchise_id|country|   city|   lat|    lng|avg_tmpr_f|avg_tmpr_c| wthr_date|
+-------+------------+------------+--------------+-----------------------+-------+-------+------+-------+----------+----------+----------+
|   9yt8|120259084300|          13| The Firehouse|                  59829|     US|Branson|36.633|-93.272|      72.9|      22.7|2017-08-28|
|   9yt8|120259084300|          13| The Firehouse|                  59829|     US|Branson|36.633|-93.272|      78.5|      25.8|2017-08-19|
|   9yt8|120259084300|          13| The Firehouse|                  59829|     US|Branson|36.633|-93.272|      57.5|      14.2|2016-10-20|
|   9yt8|120259084300|          13| The Firehouse|                  59829|     US|Branson|36.633|-93.272|      72.6|      22.6|2016-10-19|
|   9yt8|120259084300|     

In [84]:
df_joined.count()

172591

In [85]:
#show where any column geohash, id,franchise_id,franchise_name,restaurant_franchise_id,country,city,lat,lng,avg_tmpr_f,avg_tmpr_c, wthr_date are null
df_joined.where(df_joined['geohash'].isNull() | df_joined['id'].isNull() | df_joined['franchise_id'].isNull() | df_joined['franchise_name'].isNull() | df_joined['restaurant_franchise_id'].isNull() | df_joined['country'].isNull() | df_joined['city'].isNull() | df_joined['lat'].isNull() | df_joined['lng'].isNull() | df_joined['avg_tmpr_f'].isNull() | df_joined['avg_tmpr_c'].isNull() | df_joined['wthr_date'].isNull()).count()


122

In [86]:
#drop Nan values
df_joined = df_joined.dropna()

In [87]:
#show where any column geohash, id,franchise_id,franchise_name,restaurant_franchise_id,country,city,lat,lng,avg_tmpr_f,avg_tmpr_c, wthr_date are null
df_joined.where(df_joined['geohash'].isNull() | df_joined['id'].isNull() | df_joined['franchise_id'].isNull() | df_joined['franchise_name'].isNull() | df_joined['restaurant_franchise_id'].isNull() | df_joined['country'].isNull() | df_joined['city'].isNull() | df_joined['lat'].isNull() | df_joined['lng'].isNull() | df_joined['avg_tmpr_f'].isNull() | df_joined['avg_tmpr_c'].isNull() | df_joined['wthr_date'].isNull()).count()


0

In [88]:
#group by wthr_date and geohash
df_joined.groupBy('wthr_date', 'geohash').count().where('count > 1').show()

+----------+-------+-----+
| wthr_date|geohash|count|
+----------+-------+-----+
|2017-08-21|   9wst|    3|
|2017-08-20|   9wst|    3|
|2017-08-01|   9wst|    3|
|2017-08-11|   9wst|    3|
|2016-10-14|   9wst|    3|
|2016-10-20|   9wst|    3|
|2017-09-09|   9wst|    3|
|2017-09-25|   9wst|    3|
|2017-08-17|   9wst|    3|
|2017-09-19|   9wst|    3|
|2016-10-09|   9wst|    3|
|2016-10-30|   9wst|    3|
|2017-09-08|   9wst|    3|
|2017-08-09|   9wst|    3|
|2017-09-01|   9wst|    3|
|2017-08-18|   9wst|    3|
|2017-09-17|   9wst|    3|
|2017-08-12|   9wst|    3|
|2016-10-18|   9wst|    3|
|2016-10-31|   9wst|    3|
+----------+-------+-----+
only showing top 20 rows



In [89]:
#show where geohash equals 9wst and wthr_date equals 2017-08-21
df_joined.where(df_joined['geohash'] == '9wst').where(df_joined['wthr_date'] == '2017-08-21').show()

+-------+-----------+------------+------------------+-----------------------+-------+-------+------+--------+----------+----------+----------+
|geohash|         id|franchise_id|    franchise_name|restaurant_franchise_id|country|   city|   lat|     lng|avg_tmpr_f|avg_tmpr_c| wthr_date|
+-------+-----------+------------+------------------+-----------------------+-------+-------+------+--------+----------+----------+----------+
|   9wst|34359738375|           8|   The Green Olive|                  50333|     US|Alamosa|37.471| -105.86|      62.2|      16.8|2017-08-21|
|   9wst|17179869197|          14|The Gourmet Garden|                  10902|     US|Alamosa|37.481|-105.906|      62.2|      16.8|2017-08-21|
|   9wst|25769803786|          11|   The Flaming Wok|                  72230|     US|Alamosa|37.475|-105.895|      62.2|      16.8|2017-08-21|
+-------+-----------+------------+------------------+-----------------------+-------+-------+------+--------+----------+----------+----------+

In [90]:
#show where geohash equals dnpe
df_joined.where(df_joined['geohash'] == 'dnpe').show()

+-------+-----------+------------+--------------+-----------------------+-------+------+--------+---------+----------+----------+----------+
|geohash|         id|franchise_id|franchise_name|restaurant_franchise_id|country|  city|     lat|      lng|avg_tmpr_f|avg_tmpr_c| wthr_date|
+-------+-----------+------------+--------------+-----------------------+-------+------+--------+---------+----------+----------+----------+
|   dnpe|85899345920|           1|       Savoria|                  18952|     US|Dillon|34.40141|-79.38644|      82.3|      27.9|2017-08-23|
|   dnpe|85899345920|           1|       Savoria|                  18952|     US|Dillon|34.40141|-79.38644|      77.7|      25.4|2017-08-26|
|   dnpe|85899345920|           1|       Savoria|                  18952|     US|Dillon|34.40141|-79.38644|      73.0|      22.8|2017-09-13|
|   dnpe|85899345920|           1|       Savoria|                  18952|     US|Dillon|34.40141|-79.38644|      75.1|      23.9|2017-09-29|
|   dnpe|8589

In [91]:
df_joined.where(df_joined['geohash'] == 'dnpe').count()

184

In [93]:
#preserving data partitioning in the parquet format
df_joined.write.partitionBy('franchise_name').parquet('final_partitioned_WITHOUT_DROPPING.parquet', mode='overwrite')

In [94]:
#drop rows where geohash and wthr_date together duplicate
df_joined = df_joined.dropDuplicates(['wthr_date', 'geohash'])

In [95]:
df_joined.count()

62897

In [96]:
df_joined.where(df_joined['geohash'] == 'dnpe').count()

92

In [97]:
#show where geohash equals dnpe
df_joined.where(df_joined['geohash'] == 'dnpe').show()

+-------+-----------+------------+--------------+-----------------------+-------+------+--------+---------+----------+----------+----------+
|geohash|         id|franchise_id|franchise_name|restaurant_franchise_id|country|  city|     lat|      lng|avg_tmpr_f|avg_tmpr_c| wthr_date|
+-------+-----------+------------+--------------+-----------------------+-------+------+--------+---------+----------+----------+----------+
|   dnpe|85899345920|           1|       Savoria|                  18952|     US|Dillon|34.40141|-79.38644|      71.7|      22.1|2016-10-01|
|   dnpe|85899345920|           1|       Savoria|                  18952|     US|Dillon|34.40141|-79.38644|      70.0|      21.1|2016-10-02|
|   dnpe|85899345920|           1|       Savoria|                  18952|     US|Dillon|34.40141|-79.38644|      70.2|      21.2|2016-10-03|
|   dnpe|85899345920|           1|       Savoria|                  18952|     US|Dillon|34.40141|-79.38644|      68.9|      20.5|2016-10-04|
|   dnpe|8589

In [98]:
#preserving data partitioning in the parquet format.
df_joined.write.partitionBy('city').parquet('final_partitioned_DROPPED_DUPLICATES.parquet', mode='overwrite')