In [1]:
from pyspark.sql import SparkSession, DataFrame, functions
import pandas as pd
import os
import sys 
import requests 

exampleDir = os.path.join(os.environ["SPARK_HOME"], "jars")
exampleJars = []
for x in os.listdir(exampleDir):
    if x in ["azure-storage-blob-11.0.1.jar","azure-storage-8.6.6.jar", "hadoop-azure-3.3.4.jar"]:
        exampleJars.append(os.path.join(exampleDir, x))
# Set up Spark session
print(exampleJars)
spark = SparkSession.builder.config("spark.jars", ",".join(exampleJars))\
                           .master("local[*]").getOrCreate() 
# Set up Azure Blob Storage credentials
spark.conf.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
spark.conf.set("fs.azure.account.auth.type.bd201stacc.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.bd201stacc.dfs.core.windows.net", 
"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.bd201stacc.dfs.core.windows.net", "f3905ff9-16d4-43ac-9011-842b661d556d")
spark.conf.set("fs.azure.account.oauth2.client.secret.bd201stacc.dfs.core.windows.net", "yTw8Q~-PZUWV.fRyihGEY16i51HK0yoCbRvyUaXV")
spark.conf.set("fs.azure.account.oauth2.client.endpoint.bd201stacc.dfs.core.windows.net", 
"https://login.microsoftonline.com/b41b72d0-4e9f-4c26-8a69-f949f367c91d/oauth2/token")
spark.conf.set("fs.azure.sas.m06sparkbasics.bd201stacc.dfs.core.windows.net",
"https://bd201stacc.blob.core.windows.net/m06sparkbasics?sv=2020-02-10&ss=bfqt&srt=sco&sp=rlx&se=2031-04-08T17:32:24Z&st=2021-04-08T09:32:24Z&spr=https&sig=5An75VCvK%2FCfuPiiWf8knAHhrMNIR%2BE37oUx3b%2FLUQc%3D") 
   

# Read data from the container
hdata = spark.read.format("csv")\
    .option("header", "true") \
    .option("compression", "gzip") \
         .load('abfs://m06sparkbasics@bd201stacc.dfs.core.windows.net/hotels')

wdata = spark.read.format("parquet")\
    .option("header", "true") \
         .load('abfs://m06sparkbasics@bd201stacc.dfs.core.windows.net/weather/year=2017/month=08/day=01')



['C:\\spark\\jars\\azure-storage-8.6.6.jar', 'C:\\spark\\jars\\azure-storage-blob-11.0.1.jar', 'C:\\spark\\jars\\hadoop-azure-3.3.4.jar']
(2494, 7)
(1230518, 5)


In [2]:
hdata.show()

+------------+--------------------+-------+---------------+--------------------+----------+------------+
|          Id|                Name|Country|           City|             Address|  Latitude|   Longitude|
+------------+--------------------+-------+---------------+--------------------+----------+------------+
|           2|Parkside Inn At I...|     US|Incline Village|1003 Tahoe Boulev...| 39.244493| -119.936437|
|  8589934592|      Cadillac Motel|     US|     Brandywine|     16101 Crain Hwy|  38.66893|   -76.87629|
| 17179869184|  Days Inn Brookings|     US|      Brookings|         2500 6th St|  44.31141|   -96.76286|
| 17179869187|             Motel 6|     US|       Grayling|     6843 W M 72 Hwy| 44.657326|   -84.74439|
| 25769803780|      Carleton Hotel|     US|       Carleton|       927 Monroe St|  42.05927|   -83.39095|
| 42949672960|Americana Resort ...|     US|         Dillon|         135 Main St|      null|        null|
| 42949672965|Comfort Inn Delan...|     US|         Del

In [13]:
print((hdata.count(), len(hdata.columns)))

(2494, 7)


In [3]:
wdata.show()

+--------+-------+----------+----------+----------+
|     lng|    lat|avg_tmpr_f|avg_tmpr_c| wthr_date|
+--------+-------+----------+----------+----------+
|-104.423|21.5478|      73.6|      23.1|2017-08-01|
|-104.374| 21.551|      72.6|      22.6|2017-08-01|
|-104.325|21.5541|      71.7|      22.1|2017-08-01|
|-104.276|21.5573|      70.9|      21.6|2017-08-01|
|-104.227|21.5604|      70.5|      21.4|2017-08-01|
|-104.178|21.5635|      70.1|      21.2|2017-08-01|
|-104.129|21.5665|      69.8|      21.0|2017-08-01|
| -104.08|21.5696|      69.5|      20.8|2017-08-01|
|-104.032|21.5726|      69.3|      20.7|2017-08-01|
|-103.983|21.5757|      71.8|      22.1|2017-08-01|
|-103.934|21.5787|      74.5|      23.6|2017-08-01|
|-103.885|21.5817|      76.5|      24.7|2017-08-01|
|-103.836|21.5846|      77.2|      25.1|2017-08-01|
|-103.787|21.5876|      77.9|      25.5|2017-08-01|
|-103.738|21.5905|      75.2|      24.0|2017-08-01|
|-103.689|21.5935|      70.5|      21.4|2017-08-01|
| -103.64|21

In [14]:
print((wdata.count(), len(wdata.columns)))

(1230518, 5)


In [4]:
null_data = hdata.filter((functions.col("Latitude").isNull()) | (functions.col("Longitude").isNull()) 
| (functions.col("Latitude") == 'NA') | (functions.col("Longitude") == 'NA'))

In [5]:
null_data.show()

+-------------+--------------------+-------+-----------+--------------------+--------+---------+
|           Id|                Name|Country|       City|             Address|Latitude|Longitude|
+-------------+--------------------+-------+-----------+--------------------+--------+---------+
|  42949672960|Americana Resort ...|     US|     Dillon|         135 Main St|    null|     null|
|  60129542147|Ubaa Old Crawford...|     US|Des Plaines|     5460 N River Rd|    null|     null|
| 455266533383|        Busy B Ranch|     US|  Jefferson|  1100 W Prospect Rd|    null|     null|
|1108101562370|             Motel 6|     US|   Rockport|       106 W 11th St|    null|     null|
|1382979469315|           La Quinta|     US| Twin Falls|    539 Pole Line Rd|    null|     null|
|1975684956168| Hotel Daniel Vienna|     AT|     Vienna|Landstra er G rte...|      NA|       NA|
|2439541424130|Fleming s Selecti...|     AT|     Vienna|Josefst dter Stra...|      NA|       NA|
|2465311227906|Cordial Theater

In [6]:
if not null_data.rdd.isEmpty():
    api_key = "952d0417ad6a4f4d9f1f4dfa579f0c03"
    base_url = "https://api.opencagedata.com/geocode/v1/json"
    for row in null_data.collect():
        address = row["Address"]
        city = row["City"]
        country = row["Country"]
        url = f"{base_url}?q={address},{city},{country}&key={api_key}"
        response = requests.get(url)
        if response.status_code == 200:
            geodata = response.json()
            if len(geodata["results"]) > 0:
                latitude = geodata["results"][0]["geometry"]["lat"]
                longitude = geodata["results"][0]["geometry"]["lng"]
                hdata = hdata.withColumn("Latitude", functions.when(functions.col("Address") == address, latitude)\
                .otherwise(functions.col("Latitude")))
                hdata = hdata.withColumn("Longitude", functions.when(functions.col("Address") == address, longitude)\
                .otherwise(functions.col("Longitude")))
        else:
            print(f"Error mapping latitude and longitude for {address}, {city}, {country}: {response.status_code}")

In [7]:
new_null_data = hdata.filter((functions.col("Latitude").isNull()) | (functions.col("Longitude").isNull())
 | (functions.col("Latitude") == 'NA') | (functions.col("Longitude") == 'NA'))
new_null_data.show()

+---+----+-------+----+-------+--------+---------+
| Id|Name|Country|City|Address|Latitude|Longitude|
+---+----+-------+----+-------+--------+---------+
+---+----+-------+----+-------+--------+---------+



In [8]:
import pygeohash as pgh
def geo_l(x, y):
    geo = []
    for i in range(len(x)):
        geo.append(pgh.encode(float(x[i]), float(y[i]), 4))
    return geo


In [9]:
hgeo = hdata.toPandas()
wgeo = wdata.toPandas()

x = hgeo['Latitude']
y = hgeo['Longitude']

hgeo['Geohash'] = pd.Series(geo_l(x,y)).values
hgeo = spark.createDataFrame(hgeo)

x = wgeo['lat']
y = wgeo['lng']

wgeo['Geohash'] = pd.Series(geo_l(x,y)).values
wgeo = spark.createDataFrame(wgeo)


In [10]:
hgeo.show()

+------------+--------------------+-------+---------------+--------------------+----------+------------+-------+
|          Id|                Name|Country|           City|             Address|  Latitude|   Longitude|Geohash|
+------------+--------------------+-------+---------------+--------------------+----------+------------+-------+
|           2|Parkside Inn At I...|     US|Incline Village|1003 Tahoe Boulev...| 39.244493| -119.936437|   9qfx|
|  8589934592|      Cadillac Motel|     US|     Brandywine|     16101 Crain Hwy|  38.66893|   -76.87629|   dqc7|
| 17179869184|  Days Inn Brookings|     US|      Brookings|         2500 6th St|  44.31141|   -96.76286|   9zgh|
| 17179869187|             Motel 6|     US|       Grayling|     6843 W M 72 Hwy| 44.657326|   -84.74439|   dpgw|
| 25769803780|      Carleton Hotel|     US|       Carleton|       927 Monroe St|  42.05927|   -83.39095|   dpkx|
| 42949672960|Americana Resort ...|     US|         Dillon|         135 Main St|39.6286685|-106.

In [15]:
print((hgeo.count(), len(hgeo.columns)))

(2494, 8)


In [11]:
wgeo.show()

+--------+-------+----------+----------+----------+-------+
|     lng|    lat|avg_tmpr_f|avg_tmpr_c| wthr_date|Geohash|
+--------+-------+----------+----------+----------+-------+
|-104.423|21.5478|      73.6|      23.1|2017-08-01|   9evd|
|-104.374| 21.551|      72.6|      22.6|2017-08-01|   9evf|
|-104.325|21.5541|      71.7|      22.1|2017-08-01|   9evf|
|-104.276|21.5573|      70.9|      21.6|2017-08-01|   9evf|
|-104.227|21.5604|      70.5|      21.4|2017-08-01|   9evf|
|-104.178|21.5635|      70.1|      21.2|2017-08-01|   9evf|
|-104.129|21.5665|      69.8|      21.0|2017-08-01|   9evf|
| -104.08|21.5696|      69.5|      20.8|2017-08-01|   9evf|
|-104.032|21.5726|      69.3|      20.7|2017-08-01|   9ey4|
|-103.983|21.5757|      71.8|      22.1|2017-08-01|   9ey4|
|-103.934|21.5787|      74.5|      23.6|2017-08-01|   9ey4|
|-103.885|21.5817|      76.5|      24.7|2017-08-01|   9ey4|
|-103.836|21.5846|      77.2|      25.1|2017-08-01|   9ey4|
|-103.787|21.5876|      77.9|      25.5|

In [16]:
print((wgeo.count(), len(wgeo.columns)))

(1230518, 6)


In [12]:
new = hgeo.join(wgeo.drop_duplicates(['Geohash']).drop('lat', 'lng'), ['Geohash'], "left")
new.show()
new.write.parquet("merged_data")

+-------+-------------+--------------------+-------+--------------+--------------------+----------+------------+----------+----------+----------+
|Geohash|           Id|                Name|Country|          City|             Address|  Latitude|   Longitude|avg_tmpr_f|avg_tmpr_c| wthr_date|
+-------+-------------+--------------------+-------+--------------+--------------------+----------+------------+----------+----------+----------+
|   9qce| 137438953472|The Citizen Hotel...|     US|    Sacramento|            926 J St|38.5802767|-121.4938057|      81.8|      27.7|2017-08-01|
|   9tuv| 661424963585|       Village Lodge|     US|       Ruidoso|      1000 Mechem Dr| 33.351721| -105.674432|      60.2|      15.7|2017-08-01|
|   9vk0| 111669149696|      Comfort Suites|     US|       Houston|1055 E Mcnee Road...| 29.681085|  -95.402996|      83.0|      28.3|2017-08-01|
|   9wst|1013612281863|         Hampton Inn|     US|       Alamosa|     710 Mariposa St|  37.48132|  -105.90579|      63.2| 

In [19]:
new.show(truncate=True)

+-------+-------------+--------------------+-------+--------------+--------------------+----------+------------+----------+----------+----------+
|Geohash|           Id|                Name|Country|          City|             Address|  Latitude|   Longitude|avg_tmpr_f|avg_tmpr_c| wthr_date|
+-------+-------------+--------------------+-------+--------------+--------------------+----------+------------+----------+----------+----------+
|   9qce| 137438953472|The Citizen Hotel...|     US|    Sacramento|            926 J St|38.5802767|-121.4938057|      81.8|      27.7|2017-08-01|
|   9tuv| 661424963585|       Village Lodge|     US|       Ruidoso|      1000 Mechem Dr| 33.351721| -105.674432|      60.2|      15.7|2017-08-01|
|   9vk0| 111669149696|      Comfort Suites|     US|       Houston|1055 E Mcnee Road...| 29.681085|  -95.402996|      83.0|      28.3|2017-08-01|
|   9wst|1013612281863|         Hampton Inn|     US|       Alamosa|     710 Mariposa St|  37.48132|  -105.90579|      63.2| 

In [17]:
print((new.count(), len(new.columns)))

(2494, 11)


In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import pygeohash as pgh

# Define UDF to generate geohash from latitude and longitude

def geohash_udf(lat, lon):
    return pgh.encode(lat, lon, precision=4)

# Register UDF
udf_geohash = udf(geohash_udf, StringType())

# Add new column with geohash
hotel_data_with_geohash = hdata.withColumn("Geohash", udf_geohash("Latitude", "Longitude"))
hotel_data_with_geohash.show()

