In [5]:
from pyspark.sql import SparkSession
# Build (or reuse) the local-mode Spark session used throughout the notebook.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Word Count")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [6]:
# Load the restaurant CSV export, treating the first line as the header row.
# NOTE(review): the path is absolute and machine-specific — consider a configurable data directory.
data = (
    spark.read
    .option('header', True)
    .csv('/Users/ablaikhann/Desktop/restaurant_csv')
)

In [7]:
# Preview the first 20 raw restaurant rows.
data.show(n=20)

+------------+------------+--------------------+-----------------------+-------+--------------+------+--------+
|          id|franchise_id|      franchise_name|restaurant_franchise_id|country|          city|   lat|     lng|
+------------+------------+--------------------+-----------------------+-------+--------------+------+--------+
|197568495625|          10|    The Golden Spoon|                  24784|     US|       Decatur|34.578| -87.021|
| 17179869242|          59|         Azalea Cafe|                  10902|     FR|         Paris|48.861|   2.368|
|214748364826|          27|     The Corner Cafe|                  92040|     US|    Rapid City|44.080|-103.250|
|154618822706|          51|        The Pizzeria|                  41484|     AT|        Vienna|48.213|  16.413|
|163208757312|          65|       Chef's Corner|                  96638|     GB|        London|51.495|  -0.191|
| 68719476763|          28|    The Spicy Pickle|                  77517|     US|      Grayling|44.657| -

In [8]:
from pyspark.sql.functions import concat, col, when, udf
from pyspark.sql.types import DoubleType, StringType

# Convert both coordinate columns to doubles so they can be used numerically.
data = (
    data
    .withColumn("lat", col("lat").cast(DoubleType()))
    .withColumn("lng", col("lng").cast(DoubleType()))
)


In [9]:
! pip3 install opencage >> /dev/null

In [10]:
import os

from opencage.geocoder import OpenCageGeocode

# Take the OpenCage API key from the environment so the secret is never stored in
# the notebook. Fail right away with a clear message when it is missing instead of
# letting the first geocoding request be rejected.
key = os.environ.get('OPENCAGE_API_KEY')
if not key:
    raise RuntimeError('Set the OPENCAGE_API_KEY environment variable to your OpenCage API key')
geocoder = OpenCageGeocode(key)

In [11]:
def fill_null_lat_lng(country, city, method='lat'):
    """Look up one coordinate of a city with the module-level OpenCage geocoder.

    Args:
        country: Country name or code; may be None or empty.
        city: City name; may be None or empty.
        method: Key to read from the ``geometry`` entry of the first result,
            ``'lat'`` (default) or ``'lng'``.

    Returns:
        The requested coordinate of the first geocoding result, or None when
        both inputs are missing, the geocoder returns nothing, the first
        result has no ``geometry`` entry, or ``method`` is not a key in it.
    """
    # Skip missing parts: formatting None into the query would geocode the
    # literal text "None" and could return a bogus location.
    parts = [str(part) for part in (city, country) if part not in (None, '')]
    if not parts:
        return None
    address = ", ".join(parts)
    response = geocoder.geocode(address)
    if response and 'geometry' in response[0]:
        return response[0]['geometry'].get(method)
    return None

In [12]:
# Smoke tests for the geocoding helper (each assert performs a geocoding request).
# A result passes when it is within 1 degree of the reference coordinate.

# Arbitrary known address.
assert abs(52.287303 - fill_null_lat_lng(country='Kazakhstan', city='Pavlodar')) <= 1
assert abs(76.967402 - fill_null_lat_lng(country='Kazakhstan', city='Pavlodar', method='lng')) <= 1
# Address taken from the dataset.
assert abs(44.080 - fill_null_lat_lng(country='Us', city='Rapid City', method='lat')) <= 1
assert abs(-103.250 - fill_null_lat_lng(country='Us', city='Rapid City', method='lng')) <= 1


NotAuthorizedError: Your API key is not authorized. You may have entered it incorrectly.

In [29]:
# Wrap the geocoding helper as a Spark UDF that returns a double.
fill_null_lat_lng_udf = udf(f=fill_null_lat_lng, returnType=DoubleType())

In [30]:
# List the restaurants that are missing at least one coordinate.
data.where(
    col('lat').isNull() | col('lng').isNull()
).show()

+-----------+------------+--------------+-----------------------+-------+------+----+----+
|         id|franchise_id|franchise_name|restaurant_franchise_id|country|  city| lat| lng|
+-----------+------------+--------------+-----------------------+-------+------+----+----+
|85899345920|           1|       Savoria|                  18952|     US|Dillon|NULL|NULL|
+-----------+------------+--------------+-----------------------+-------+------+----+----+



The output above shows that only one row has null coordinates (`lat` and `lng`).

In [31]:
from pyspark.sql.functions import lit

# Fill the missing coordinates by geocoding the row's city and country.
# The coordinate name must be wrapped in lit(): a plain Python string passed to a
# UDF call is resolved as a column *name*, so 'lat' / 'lng' would hand the (null)
# column value to the helper instead of the literal key.
# NOTE(review): Spark does not guarantee short-circuiting around Python UDFs, so the
# geocoder may also be invoked for rows whose coordinate is already present —
# confirm the API quota allows it.
data = data.withColumn(
    'lat',
    when(col('lat').isNull(), fill_null_lat_lng_udf(col('country'), col('city'), lit('lat')))
    .otherwise(col('lat')),
)

data = data.withColumn(
    'lng',
    when(col('lng').isNull(), fill_null_lat_lng_udf(col('country'), col('city'), lit('lng')))
    .otherwise(col('lng')),
)

In [32]:
# Re-run the null check: any row still listed here could not be geocoded.
data.where(
    col('lat').isNull() | col('lng').isNull()
).show()


+---+------------+--------------+-----------------------+-------+----+---+---+
| id|franchise_id|franchise_name|restaurant_franchise_id|country|city|lat|lng|
+---+------------+--------------+-----------------------+-------+----+---+---+
+---+------------+--------------+-----------------------+-------+----+---+---+



After geocoding each address (city and country) to `lat` and `lng`, no rows with null coordinates remain.

In [34]:
! pip install python-geohash  >> /dev/null

In [36]:
import geohash

def generate_geohash(lat, lon, precision=4):
    """Encode a coordinate pair as a geohash string.

    Args:
        lat: Latitude in degrees, or None.
        lon: Longitude in degrees, or None.
        precision: Number of characters in the returned geohash; defaults to
            4, the value this notebook uses for both datasets.

    Returns:
        The geohash string, or None when either coordinate is missing.
    """
    # A null coordinate reaches a Spark UDF as None; return null for that row
    # instead of letting the encoder fail on it.
    if lat is None or lon is None:
        return None
    return geohash.encode(lat, lon, precision=precision)
# Wrap the geohash encoder as a Spark UDF that returns a string.
generate_geohash_udf = udf(f=generate_geohash, returnType=StringType())

In [66]:
# Unit tests for the 4-character geohash helper.

# An arbitrary example.
assert 'v9q9' == generate_geohash(lat=52.287303, lon=76.967402)
# The origin (0, 0).
assert 's000' == generate_geohash(lat=0, lon=0)
# Edge case: the minimum latitude.
assert 'n000' == generate_geohash(lat=-90, lon=90)
# A coordinate taken from the data, checked manually.
assert 'dn4h' == generate_geohash(lat=34.578, lon=-87.021)


In [39]:
# Tag every restaurant with the geohash of its coordinates.
data = data.withColumn(
    'geohash',
    generate_geohash_udf(col('lat'), col('lng')),
)


In [40]:
# Preview the first 20 restaurants together with their new geohash column.
data.show(n=20)

+------------+------------+--------------------+-----------------------+-------+--------------+------+-------+-------+
|          id|franchise_id|      franchise_name|restaurant_franchise_id|country|          city|   lat|    lng|geohash|
+------------+------------+--------------------+-----------------------+-------+--------------+------+-------+-------+
|197568495625|          10|    The Golden Spoon|                  24784|     US|       Decatur|34.578|-87.021|   dn4h|
| 17179869242|          59|         Azalea Cafe|                  10902|     FR|         Paris|48.861|  2.368|   u09t|
|214748364826|          27|     The Corner Cafe|                  92040|     US|    Rapid City| 44.08|-103.25|   9xyd|
|154618822706|          51|        The Pizzeria|                  41484|     AT|        Vienna|48.213| 16.413|   u2ed|
|163208757312|          65|       Chef's Corner|                  96638|     GB|        London|51.495| -0.191|   gcpu|
| 68719476763|          28|    The Spicy Pickle|

In [44]:
# Load every parquet file under the weather volume. Parquet files carry their own
# schema, so the CSV-only "header" / "inferSchema" options are not needed here.
weather = spark.read.parquet("./volume/weather/*")

In [45]:
# Preview the first 20 weather readings.
weather.show(n=20)

+--------+-------+----------+----------+----------+-----+---+
|     lng|    lat|avg_tmpr_f|avg_tmpr_c| wthr_date|month|day|
+--------+-------+----------+----------+----------+-----+---+
|-111.202|18.7496|      81.1|      27.3|2016-10-05|   10|  5|
|-111.155| 18.755|      81.2|      27.3|2016-10-05|   10|  5|
|-111.107|18.7604|      81.1|      27.3|2016-10-05|   10|  5|
|-111.059|18.7657|      81.0|      27.2|2016-10-05|   10|  5|
|-111.012|18.7711|      80.9|      27.2|2016-10-05|   10|  5|
|-110.964|18.7764|      80.9|      27.2|2016-10-05|   10|  5|
|-110.916|18.7818|      80.8|      27.1|2016-10-05|   10|  5|
|-110.869|18.7871|      80.9|      27.2|2016-10-05|   10|  5|
|-110.821|18.7924|      81.1|      27.3|2016-10-05|   10|  5|
|-110.773|18.7977|      81.3|      27.4|2016-10-05|   10|  5|
|-110.726|18.8029|      81.4|      27.4|2016-10-05|   10|  5|
|-105.221|19.3026|      83.1|      28.4|2016-10-05|   10|  5|
|-105.173| 19.306|      83.0|      28.3|2016-10-05|   10|  5|
|-105.12

In [46]:
# Tag every weather reading with the geohash of its coordinates, then preview the result.
weather = weather.withColumn(
    'geohash',
    generate_geohash_udf(col('lat'), col('lng')),
)
weather.show(n=20)

+--------+-------+----------+----------+----------+-----+---+-------+
|     lng|    lat|avg_tmpr_f|avg_tmpr_c| wthr_date|month|day|geohash|
+--------+-------+----------+----------+----------+-----+---+-------+
|-111.202|18.7496|      81.1|      27.3|2016-10-05|   10|  5|   9e2f|
|-111.155| 18.755|      81.2|      27.3|2016-10-05|   10|  5|   9e2f|
|-111.107|18.7604|      81.1|      27.3|2016-10-05|   10|  5|   9e2f|
|-111.059|18.7657|      81.0|      27.2|2016-10-05|   10|  5|   9e34|
|-111.012|18.7711|      80.9|      27.2|2016-10-05|   10|  5|   9e34|
|-110.964|18.7764|      80.9|      27.2|2016-10-05|   10|  5|   9e34|
|-110.916|18.7818|      80.8|      27.1|2016-10-05|   10|  5|   9e34|
|-110.869|18.7871|      80.9|      27.2|2016-10-05|   10|  5|   9e34|
|-110.821|18.7924|      81.1|      27.3|2016-10-05|   10|  5|   9e34|
|-110.773|18.7977|      81.3|      27.4|2016-10-05|   10|  5|   9e34|
|-110.726|18.8029|      81.4|      27.4|2016-10-05|   10|  5|   9e36|
|-105.221|19.3026|  

In [83]:
from pyspark.sql import functions as F

# Several weather readings can share a geohash on the same date. Average them into
# one row per (geohash, date) so the join below does not multiply restaurant rows
# any further than one row per date.
weather_aggregated = (
    weather
    .groupBy('geohash', 'wthr_date')
    .agg(
        F.mean('avg_tmpr_f').alias('avg_tmpr_f'),
        F.mean('avg_tmpr_c').alias('avg_tmpr_c'),
    )
)

# Left join keeps restaurants that have no weather reading in their geohash.
enriched_data = data.join(weather_aggregated, on='geohash', how='left')
enriched_data.show(n=20)

+-------+------------+------------+---------------+-----------------------+-------+-----------+------+-------+----------+------------------+------------------+
|geohash|          id|franchise_id| franchise_name|restaurant_franchise_id|country|       city|   lat|    lng| wthr_date|        avg_tmpr_f|        avg_tmpr_c|
+-------+------------+------------+---------------+-----------------------+-------+-----------+------+-------+----------+------------------+------------------+
|   9vff|206158430215|           8|The Green Olive|                  53370|     US|Haltom City|32.789| -97.28|2016-10-01| 71.31071428571428|21.828571428571426|
|   9vff|206158430215|           8|The Green Olive|                  53370|     US|Haltom City|32.789| -97.28|2016-10-02| 72.00000000000001|            22.225|
|   9vff|206158430215|           8|The Green Olive|                  53370|     US|Haltom City|32.789| -97.28|2016-10-05| 82.01071428571429|27.789285714285718|
|   9vff|206158430215|           8|The G

In [55]:
# Persist the enriched restaurants as parquet, partitioned by weather date,
# replacing any previous export at the same location.
(
    enriched_data.write
    .partitionBy('wthr_date')
    .mode('overwrite')
    .parquet("./volume/data_enriched")
)

In [75]:
# Show the first 20 rows of the restaurant table (coordinates and geohash included).
data.show(n=20)

+------------+------------+--------------------+-----------------------+-------+--------------+------+-------+-------+
|          id|franchise_id|      franchise_name|restaurant_franchise_id|country|          city|   lat|    lng|geohash|
+------------+------------+--------------------+-----------------------+-------+--------------+------+-------+-------+
|197568495625|          10|    The Golden Spoon|                  24784|     US|       Decatur|34.578|-87.021|   dn4h|
| 17179869242|          59|         Azalea Cafe|                  10902|     FR|         Paris|48.861|  2.368|   u09t|
|214748364826|          27|     The Corner Cafe|                  92040|     US|    Rapid City| 44.08|-103.25|   9xyd|
|154618822706|          51|        The Pizzeria|                  41484|     AT|        Vienna|48.213| 16.413|   u2ed|
|163208757312|          65|       Chef's Corner|                  96638|     GB|        London|51.495| -0.191|   gcpu|
| 68719476763|          28|    The Spicy Pickle|