In [1]:
import os
import pandas as pd
import numpy as np
import pygeohash as gh
from functools import reduce

from pyspark.sql import SparkSession, DataFrame, Row
from pyspark.sql.types import StructField, StructType, StringType, LongType, TimestampType, ShortType, DateType, DoubleType
from pyspark.sql.functions import col, when, row_number

from opencage.geocoder import OpenCageGeocode
from opencage.geocoder import InvalidInputError, RateLimitExceededError, UnknownError
from pprint import pprint

# Project root: five directory levels above a (notional) SparkJob.py resolved
# against the current working directory. All data folders hang off it.
ROOT_DIR = reduce(
    lambda path, _level: os.path.dirname(path),
    range(5),
    os.path.abspath("SparkJob.py"),
)
DATA_DIR = os.path.join(ROOT_DIR, "data")
DATA_HOTELS, DATA_WEATHER = (
    os.path.join(DATA_DIR, "hotels"),
    os.path.join(DATA_DIR, "weather"),
)

# The OpenCage API key is a secret: it is read from the environment only and no
# fallback value is kept in the notebook. (A key that was ever committed here
# should be treated as leaked and rotated.)
GEOCODER_KEY = os.getenv("GEOCODER_KEY")
if not GEOCODER_KEY:
    # Fail at configuration time with a clear message instead of at the first
    # geocoding request.
    raise RuntimeError(
        "GEOCODER_KEY is not set; export your OpenCage API key before running this notebook."
    )


# Explicit schema for the hotel CSV files: every column is a nullable string
# (Latitude/Longitude included, since they are declared as StringType here).
hotel_schema = StructType(
    [
        StructField(column, StringType(), True)
        for column in ("Id", "Name", "Country", "City", "Address", "Latitude", "Longitude")
    ]
)

# Explicit schema for weather records: four nullable double measurements
# followed by four nullable string date parts.
weather_schema = StructType(
    [
        StructField(column, DoubleType(), True)
        for column in ("lng", "lat", "avg_tmpr_f", "avg_tmpr_c")
    ]
    + [
        StructField(column, StringType(), True)
        for column in ("wthr_date", "wthr_year", "wthr_month", "wthr_day")
    ]
)


In [2]:
def initialize_Spark():
    """Return a SparkSession for this notebook.

    The session is obtained with ``getOrCreate()`` using the ``local[*]``
    master and the application name "Simple etl job". A confirmation line is
    printed before the session is returned.
    """
    builder = SparkSession.builder.master("local[*]").appName("Simple etl job")
    session = builder.getOrCreate()

    print("Spark Initialized", "\n")

    return session

In [3]:
spark = initialize_Spark()

21/09/20 11:55:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Spark Initialized 



## Read hotel data using os

In [None]:
# Full paths of the regular files directly inside DATA_HOTELS (sub-directories are skipped).
hotelsfiles = [
    path
    for path in (os.path.join(DATA_HOTELS, name) for name in os.listdir(DATA_HOTELS))
    if os.path.isfile(path)
]
hotelsfiles

In [None]:
# https://walkenho.github.io/merging-multiple-dataframes-in-pyspark/
# https://mungingdata.com/pyspark/union-unionbyname-merge-dataframes/

def loadDFWithSchema(spark, hotelfile):
    """Read one hotel CSV file into a DataFrame using ``hotel_schema``.

    Args:
        spark: the SparkSession used to create the reader.
        hotelfile: path handed to the CSV reader's ``load``.

    Returns:
        The DataFrame produced by the reader; the file's first line is treated
        as a header.
    """
    reader = spark.read.format("csv").schema(hotel_schema).option("header", "true")
    return reader.load(hotelfile)

In [None]:
# One DataFrame per hotel file, each parsed with the shared hotel schema.
dfs = [loadDFWithSchema(spark, file) for file in hotelsfiles]

In [None]:
# Fold the per-file DataFrames into one by appending them pairwise, left to right.
df_hotels = reduce(lambda combined, part: combined.unionAll(part), dfs)

In [None]:
df_hotels.count()

## Read hotel data using spark

In [4]:
# Let Spark expand the glob itself and read every gzipped hotel CSV in one call;
# the first line of each file is a header.
df_hotels = spark.read.option("header", True).csv(DATA_HOTELS + "/*.csv.gz")

## Work with hotels

In [None]:
df_hotels.count()

In [None]:
df_hotels.first(), df_hotels.first().Id, type(df_hotels.first().Id)

In [None]:
df_hotels.filter(col("Latitude").isNull() | col("Latitude").rlike("NA")).show(),
df_hotels.filter(col("Longitude").isNull() | col("Longitude").rlike("NA")).show()

## Geocoder

In [5]:
geocoder = OpenCageGeocode(GEOCODER_KEY)

In [None]:
results = geocoder.reverse_geocode(44.8303087, -0.5761911)
print(results)

In [None]:
# query = u'Bosutska ulica 10, Trnje, Zagreb, Croatia'
query = u'189 Swans Falls Rd, Fryeburg'
results = geocoder.geocode(query)

print(u'%f;%f;%s;%s' % (results[0]['geometry']['lat'],
                        results[0]['geometry']['lng'],
                        results[0]['components']['country_code'],
                        results[0]['annotations']['timezone']['name']))

## Fixing latitude

In [6]:
# Ids of every hotel whose Latitude is null or matches the pattern "NA".
ids_lat = [
    hotel.Id
    for hotel in df_hotels.filter(
        col("Latitude").rlike("NA") | col("Latitude").isNull()
    ).collect()
]
len(ids_lat)

34

In [None]:
df_hotels.filter(col("Id") == ids_lat[0]).collect()[0].Address

In [7]:
def get_lat_by_row(row):
    """Fill in a hotel row's missing Latitude by geocoding its address.

    Rows whose ``Id`` is not listed in the notebook-level ``ids_lat`` are
    returned untouched. For listed rows the query "<Address>, <City>" is sent
    to the notebook-level ``geocoder`` and the latitude of the first result is
    stored, as a string, in ``Latitude``.

    Args:
        row: a hotel ``Row`` with at least ``Id``, ``Address`` and ``City``.

    Returns:
        A new ``Row`` with ``Latitude`` replaced, or the original ``row`` when
        no lookup was needed or the geocoder found nothing.
    """
    row_dict = row.asDict()
    if row_dict.get("Id") not in ids_lat:
        return row

    query = f'{row_dict.get("Address")}, {row_dict.get("City")}'
    results = geocoder.geocode(query)
    if not results:
        # No match for this address: keep the row as-is (still flagged as
        # missing) instead of failing the whole job on results[0].
        return row

    row_dict['Latitude'] = str(results[0]['geometry']['lat'])
    return Row(**row_dict)



# Row-by-row latitude repair: drop to the RDD API, patch each row, then rebuild
# a DataFrame (the schema is inferred from the returned rows).
df_hotels_rdd = df_hotels.rdd
df_hotels_rdd_new = df_hotels_rdd.map(get_lat_by_row)
df_hotels_lat = spark.createDataFrame(df_hotels_rdd_new)


# for id in ids_lat:
#     address = df_hotels.filter(col("Id") == id).collect()[0].Address
#     city = df_hotels.filter(col("Id") == id).collect()[0].City
#     print(df_hotels.filter(col("Id") == id).collect()[0].Latitude)
#     df_hotels = df_hotels.withColumn(
#         "Latitude",
#         when(
#             col("Id") == id,
#             get_lat_by_address(address, city)
#         ).otherwise("Latitude")
#     )
#     print(df_hotels.filter(col("Id") == id).collect()[0].Latitude)


                                                                                

In [None]:
df_hotels_lat.filter(col("Latitude").isNull() | col("Latitude").rlike("NA")).show()

## Change Latitude using Pandas

In [None]:
# Pull the raw hotels DataFrame to the driver as pandas. The original cell
# referenced `df`, which is never defined at notebook scope and fails on a
# fresh kernel.
pandas_df = df_hotels.toPandas()


In [None]:
pandas_df.head(5)

In [None]:
nullids = list(pandas_df[(pandas_df['Latitude'].isnull()) | (pandas_df['Latitude'] == 'NA')].Id)
print(nullids)
pandas_df[(pandas_df['Latitude'].isnull()) | (pandas_df['Latitude'] == 'NA')]

In [None]:
def get_lat_by_address(address, city):
    """Geocode "<address>, <city>" and return the first result's latitude as a string.

    Uses the notebook-level ``geocoder``. The first result is indexed directly,
    so a lookup that yields no results raises.
    """
    first_hit = geocoder.geocode(f'{address}, {city}')[0]
    return str(first_hit['geometry']['lat'])

# Ids of hotels whose Latitude is missing (null or the literal string 'NA').
nullLatitudeIds = list(pandas_df[(pandas_df['Latitude'].isnull()) | (pandas_df['Latitude'] == 'NA')].Id)

# Named `hotel_id` rather than `id` so the builtin id() is not shadowed for the
# rest of the notebook.
for hotel_id in nullLatitudeIds:
    # Row mask computed once and reused for both reads and the write.
    id_mask = pandas_df.Id == hotel_id
    address = pandas_df.loc[id_mask, 'Address'].values[0]
    city = pandas_df.loc[id_mask, 'City'].values[0]
    pandas_df.loc[id_mask, 'Latitude'] = get_lat_by_address(address, city)

## Fixing longitude

In [8]:
# Ids of every hotel (after the latitude fix) whose Longitude is null or matches "NA".
ids_lng = [
    hotel.Id
    for hotel in df_hotels_lat.filter(
        col("Longitude").rlike("NA") | col("Longitude").isNull()
    ).collect()
]
len(ids_lng)

                                                                                

34

In [None]:
df_hotels_lat.filter(col("Id") == ids_lng[0]).collect()[0].Address

In [9]:
def get_lng_by_address(row):
    """Fill in a hotel row's missing Longitude by geocoding its address.

    Rows whose ``Id`` is not listed in the notebook-level ``ids_lng`` are
    returned untouched. For listed rows the query "<Address>, <City>" is sent
    to the notebook-level ``geocoder`` and the longitude of the first result is
    stored, as a string, in ``Longitude``.

    NOTE(review): a later cell defines another ``get_lng_by_address(address,
    city)`` with a different signature; once that cell runs, this row-based
    version is shadowed in the notebook namespace.

    Args:
        row: a hotel ``Row`` with at least ``Id``, ``Address`` and ``City``.

    Returns:
        A new ``Row`` with ``Longitude`` replaced, or the original ``row`` when
        no lookup was needed or the geocoder found nothing.
    """
    row_dict = row.asDict()
    if row_dict.get("Id") not in ids_lng:
        return row

    query = f'{row_dict.get("Address")}, {row_dict.get("City")}'
    results = geocoder.geocode(query)
    if not results:
        # No match for this address: keep the row as-is (still flagged as
        # missing) instead of failing the whole job on results[0].
        return row

    row_dict['Longitude'] = str(results[0]['geometry']['lng'])
    return Row(**row_dict)



# Row-by-row longitude repair on top of the latitude-fixed frame: drop to the
# RDD API, patch each row, then rebuild a DataFrame from the returned rows.
df_hotels_lat_rdd = df_hotels_lat.rdd
df_hotels_lat_lng_rdd_new = df_hotels_lat_rdd.map(get_lng_by_address)
df_hotels_new = spark.createDataFrame(df_hotels_lat_lng_rdd_new)

# for id in ids_lng:
#     address = df_hotels.filter(col("Id") == id).collect()[0].Address
#     city = df_hotels.filter(col("Id") == id).collect()[0].City
#     print(df_hotels.filter(col("Id") == id).collect()[0].Longitude)
#     df_hotels = df_hotels.withColumn(
#         "Longitude",
#         when(
#             col("Id") == id,
#             get_lng_by_address(address, city)
#         ).otherwise("NA")
#     )
#     print(df_hotels.filter(col("Id") == id).collect()[0].Longitude)


                                                                                

In [None]:
df_hotels_new.filter(col("Longitude").isNull() | col("Longitude").rlike("NA")).show()

## Change Longitude using Pandas

In [None]:
def get_lng_by_address(address, city):
    """Geocode "<address>, <city>" and return the first result's longitude as a string.

    Uses the notebook-level ``geocoder`` and prints the query together with the
    longitude found. The first result is indexed directly, so a lookup that
    yields no results raises.
    """
    query = f'{address}, {city}'
    longitude = str(geocoder.geocode(query)[0]['geometry']['lng'])
    print(query, longitude)
    return longitude

# Ids of hotels whose Longitude is missing (null or the literal string 'NA').
nullLongitudeIds = list(pandas_df[(pandas_df['Longitude'].isnull()) | (pandas_df['Longitude'] == 'NA')].Id)

# Named `hotel_id` rather than `id` so the builtin id() is not shadowed for the
# rest of the notebook.
for hotel_id in nullLongitudeIds:
    # Row mask computed once and reused for both reads and the write.
    id_mask = pandas_df.Id == hotel_id
    address = pandas_df.loc[id_mask, 'Address'].values[0]
    city = pandas_df.loc[id_mask, 'City'].values[0]
    pandas_df.loc[id_mask, 'Longitude'] = get_lng_by_address(address, city)

In [None]:
# RESULT
pandas_df[(pandas_df['Latitude'].isnull()) | (pandas_df['Latitude'] == 'NA')],\
pandas_df[(pandas_df['Longitude'].isnull()) | (pandas_df['Longitude'] == 'NA')]

## GEOHASH

## Pandas

In [None]:
pandas_df['Geohash'] = pandas_df.apply(lambda x: gh.encode(float(x.Latitude), float(x.Longitude), precision=4), axis=1)
pandas_df.head(5)

In [None]:
new_df = spark.createDataFrame(pandas_df)

In [None]:
new_df.filter(col("Latitude").isNull()).show(), new_df.filter(col("Longitude").isNull()).show()


## PySpark

In [10]:
def geohash_function(row):
    """Return a copy of a hotel row with an added 4-character ``Geohash`` field.

    ``Latitude`` and ``Longitude`` are converted to float before encoding, so a
    row where either is missing or non-numeric raises.

    NOTE(review): a later cell redefines ``geohash_function`` for weather rows
    (``lat``/``lng`` columns); the name is shared between the two.
    """
    fields = row.asDict()
    latitude = float(fields.get("Latitude"))
    longitude = float(fields.get("Longitude"))
    fields['Geohash'] = gh.encode(latitude, longitude, precision=4)
    return Row(**fields)


In [11]:
# Attach a geohash to every hotel row via the RDD API and rebuild a DataFrame.
df_hotels_new_rdd = df_hotels_new.rdd
df_hotels_rdd_new = df_hotels_new_rdd.map(geohash_function)
df_hotels_geohash = spark.createDataFrame(df_hotels_rdd_new)

                                                                                

In [None]:
df_hotels_geohash.show(5)

In [None]:
df_hotels_geohash.filter(col("Latitude").isNull() | col("Latitude").rlike("NA")).show(),
df_hotels_geohash.filter(col("Longitude").isNull() | col("Longitude").rlike("NA")).show()

## Start work with weather

## Weather import files using os

In [None]:
ignorelist = ['.DS_Store',]

# Every file under DATA_WEATHER (recursively), except ignored names and any
# file whose name contains 'crc'.
weatherfiles = [
    os.path.join(root, f)
    for root, d_names, f_names in os.walk(DATA_WEATHER)
    for f in f_names
    if f not in ignorelist and 'crc' not in f
]

# 553
len(weatherfiles)

In [None]:
# One DataFrame per weather file found by the directory walk.
dfs_weather_list = [spark.read.parquet(file) for file in weatherfiles]

len(dfs_weather_list)

In [None]:
# Fold the per-file weather DataFrames into one by appending them pairwise.
df_weather = reduce(lambda combined, part: combined.unionAll(part), dfs_weather_list)

In [None]:
df_weather.count()

## Weather import files using spark

In [12]:
# Read the whole weather directory in one call. Parquet files carry their own
# schema, so the CSV-only `header` option that used to be passed here is dropped.
df_weather = spark.read.parquet(DATA_WEATHER)

In [None]:
df_weather.count()

In [None]:
df_weather.show(4)

## Geohash weather

In [13]:
def geohash_function(row):
    """Return a copy of a weather row with an added 4-character ``Geohash`` field.

    The hash is encoded from the row's ``lat`` and ``lng`` values, converted to
    float first.

    NOTE(review): this redefines the earlier ``geohash_function`` that reads
    the hotel columns ``Latitude``/``Longitude``; the name is shared.
    """
    fields = row.asDict()
    latitude = float(fields.get("lat"))
    longitude = float(fields.get("lng"))
    fields['Geohash'] = gh.encode(latitude, longitude, precision=4)
    return Row(**fields)

# Attach a geohash to every weather row via the RDD API and rebuild a DataFrame.
df_weather_rdd = df_weather.rdd
df_weather_rdd_new = df_weather_rdd.map(geohash_function)
df_weather_geohash = spark.createDataFrame(df_weather_rdd_new)

                                                                                

In [None]:
df_weather_geohash.show(3)

## Filtering weather df using geohash in hotels df

In [14]:
# Geohash of every hotel row, collected to the driver as plain strings
# (one entry per row, so repeated hashes are kept).
df_hotels_geohash_list = list(
    map(lambda hotel: str(hotel['Geohash']), df_hotels_geohash.collect())
)

                                                                                

In [None]:
len(df_hotels_geohash_list), df_hotels_geohash.count()

In [15]:
# Keep only the weather rows whose geohash also appears among the hotels.
df_weather_geohash_filter = df_weather_geohash.filter(
    col("Geohash").isin(df_hotels_geohash_list)
)

In [None]:
df_weather_geohash_filter.count()

In [None]:
df_weather_geohash_filter.show(3)

## Join dfs - weather filtering by hotels' geohash

### Inner Join

In [16]:
# Inner join of hotels with the pre-filtered weather rows on the shared Geohash column.
result_df_inner = df_hotels_geohash.join(
    df_weather_geohash_filter,
    on=["Geohash"],
    how="inner",
)

In [17]:
result_df_inner.count()

                                                                                

2344623

In [17]:
result_df_inner.show(21)



+-------+----------+-------------------+-------+-------+-------------+---------+-----------+--------+-------+----------+----------+----------+----+-----+---+
|Geohash|        Id|               Name|Country|   City|      Address| Latitude|  Longitude|     lng|    lat|avg_tmpr_f|avg_tmpr_c| wthr_date|year|month|day|
+-------+----------+-------------------+-------+-------+-------------+---------+-----------+--------+-------+----------+----------+----------+----+-----+---+
|   9r2z|8589934594|Holiday Inn Express|     US|Ashland|555 Clover Ln|42.183544|-122.663345|-122.582|42.0163|      73.1|      22.8|2017-08-29|2017|    8| 29|
|   9r2z|8589934594|Holiday Inn Express|     US|Ashland|555 Clover Ln|42.183544|-122.663345|-122.524|42.0251|      73.6|      23.1|2017-08-29|2017|    8| 29|
|   9r2z|8589934594|Holiday Inn Express|     US|Ashland|555 Clover Ln|42.183544|-122.663345|-122.467|42.0339|      75.3|      24.1|2017-08-29|2017|    8| 29|
|   9r2z|8589934594|Holiday Inn Express|     US|Ashl

                                                                                

### Left Join

In [None]:
# Left outer join: every hotel row is kept, with weather columns filled where
# the Geohash matches a pre-filtered weather row.
result_df_lj_f = df_hotels_geohash.join(
    df_weather_geohash_filter,
    on=["Geohash"],
    how='left_outer',
)

In [None]:
result_df_lj_f.count()

In [None]:
result_df_lj_f.show(3)

### Difference between result_df_inner (filtered) & result_df_lj_f

In [None]:
# Rows of the filtered left join that do not appear in the filtered inner join.
# The inner join over the filtered weather is named `result_df_inner`; the
# previously referenced `result_df_inner_f` is never defined in this notebook.
df_dif = result_df_lj_f.subtract(result_df_inner)
df_dif.show(10)

## Join dfs - without filtering

join(self, other, on=None, how=None)

* param other: Right side of the join
* param on: a string for the join column name
* param how: default inner. Must be one of inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, and left_anti.

### Inner Join

In [None]:
# Inner join of hotels with the full (unfiltered) weather data on Geohash.
result_df_inner = df_hotels_geohash.join(
    df_weather_geohash,
    on=["Geohash"],
    how="inner",
)

In [None]:
result_df_inner.count()

In [None]:
result_df_inner.show(3)

### Left Join

In [None]:
# Left outer join against the full (unfiltered) weather data: every hotel row is kept.
result_df_lj = df_hotels_geohash.join(
    df_weather_geohash,
    on=["Geohash"],
    how='left_outer',
)

In [None]:
result_df_lj.count()

In [None]:
result_df_lj.show(3)


### Difference between result_df_inner & result_df_lj

In [None]:
df_dif = result_df_lj.subtract(result_df_inner)
df_dif.show(10)