In [2]:
from pyspark.sql import SparkSession
spark = (SparkSession.builder.appName("data_work").getOrCreate())

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/14 16:37:37 WARN Utils: Your hostname, timur-EliteMini-Series, resolves to a loopback address: 127.0.1.1; using 192.168.2.121 instead (on interface enp1s0)
25/12/14 16:37:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/14 16:37:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [24]:
from pyspark.sql.functions import col, when, lit, udf, avg, broadcast
from pyspark.sql.types import StringType

In [4]:
import requests
import urllib.parse
import pygeohash as pgh
import sys

In [5]:
df_restaurants = spark.read.csv("Desktop/Spark/restaurant_csv/", header=True, inferSchema=True)

In [6]:
df_restaurants.show(20)

+------------+------------+--------------------+-----------------------+-------+--------------+------+-------+
|          id|franchise_id|      franchise_name|restaurant_franchise_id|country|          city|   lat|    lng|
+------------+------------+--------------------+-----------------------+-------+--------------+------+-------+
|197568495625|          10|    The Golden Spoon|                  24784|     US|       Decatur|34.578|-87.021|
| 17179869242|          59|         Azalea Cafe|                  10902|     FR|         Paris|48.861|  2.368|
|214748364826|          27|     The Corner Cafe|                  92040|     US|    Rapid City| 44.08|-103.25|
|154618822706|          51|        The Pizzeria|                  41484|     AT|        Vienna|48.213| 16.413|
|163208757312|          65|       Chef's Corner|                  96638|     GB|        London|51.495| -0.191|
| 68719476763|          28|    The Spicy Pickle|                  77517|     US|      Grayling|44.657|-84.744|
|

In [7]:
rows = df_restaurants.count()
cols = len(df_restaurants.columns)

print(f"Rows: {rows}, Columns: {cols}")

print("Dataset's Schema: ")
print(df_restaurants.printSchema)

Rows: 1997, Columns: 8
Dataset's Schema: 
<bound method DataFrame.printSchema of DataFrame[id: bigint, franchise_id: int, franchise_name: string, restaurant_franchise_id: int, country: string, city: string, lat: double, lng: double]>


In [8]:

# Filter restaurants with missing coordinates
missing_coords_df = df_restaurants.filter(
    col("lat").isNull() | col("lng").isNull()
)

# DF with the missing values
missing_coords_df.show()


+-----------+------------+--------------+-----------------------+-------+------+----+----+
|         id|franchise_id|franchise_name|restaurant_franchise_id|country|  city| lat| lng|
+-----------+------------+--------------+-----------------------+-------+------+----+----+
|85899345920|           1|       Savoria|                  18952|     US|Dillon|NULL|NULL|
+-----------+------------+--------------+-----------------------+-------+------+----+----+



In [9]:
# We have only one missing value
missing_row = missing_coords_df.limit(1).collect()[0].asDict()

print("Row needing geocoding:")
print(missing_row)

Row needing geocoding:
{'id': 85899345920, 'franchise_id': 1, 'franchise_name': 'Savoria', 'restaurant_franchise_id': 18952, 'country': 'US', 'city': 'Dillon', 'lat': None, 'lng': None}


In [10]:
API_KEY = "908fca41ea834a5db40f4328f826ca4b"

address_field = "Dillon, US"  
query = urllib.parse.quote(str(address_field))

url = f"https://api.opencagedata.com/geocode/v1/json?q={query}&key={API_KEY}"

response = requests.get(url).json()

new_lat = response["results"][0]["geometry"]["lat"]
new_lng = response["results"][0]["geometry"]["lng"]

print(f"Fetched lat/lng: {new_lat}, {new_lng}")


Fetched lat/lng: 34.4014089, -79.3864339


In [11]:
# Fill the missing value

df_fixed = df_restaurants \
    .withColumn("lat", when(col("lat").isNull(), lit(new_lat)).otherwise(col("lat"))) \
    .withColumn("lng", when(col("lng").isNull(), lit(new_lng)).otherwise(col("lng")))

#print("Updated dataset:")
df_fixed.show()

+------------+------------+--------------------+-----------------------+-------+--------------+------+-------+
|          id|franchise_id|      franchise_name|restaurant_franchise_id|country|          city|   lat|    lng|
+------------+------------+--------------------+-----------------------+-------+--------------+------+-------+
|197568495625|          10|    The Golden Spoon|                  24784|     US|       Decatur|34.578|-87.021|
| 17179869242|          59|         Azalea Cafe|                  10902|     FR|         Paris|48.861|  2.368|
|214748364826|          27|     The Corner Cafe|                  92040|     US|    Rapid City| 44.08|-103.25|
|154618822706|          51|        The Pizzeria|                  41484|     AT|        Vienna|48.213| 16.413|
|163208757312|          65|       Chef's Corner|                  96638|     GB|        London|51.495| -0.191|
| 68719476763|          28|    The Spicy Pickle|                  77517|     US|      Grayling|44.657|-84.744|
|

In [12]:
#print("Updated dataset missing values:")

# Filter restaurants with missing coordinates
check_missing_coords_df = df_fixed.filter(
    col("lat").isNull() | col("lng").isNull()
)

# DF with the missing values
check_missing_coords_df.show()


+---+------------+--------------+-----------------------+-------+----+---+---+
| id|franchise_id|franchise_name|restaurant_franchise_id|country|city|lat|lng|
+---+------------+--------------+-----------------------+-------+----+---+---+
+---+------------+--------------+-----------------------+-------+----+---+---+



In [13]:
# Define UDF to encode lat/lng to 4-character geohash
@udf(StringType())
def geohash_4(lat, lng):
    try:
        if lat is None or lng is None:
            return None
        # make sure lat/lng are floats
        return pgh.encode(float(lat), float(lng))[:4]
    except Exception as e:
        print(f"Error in geohash: {e}, lat={lat}, lng={lng}")
        return None


In [14]:
# Apply the UDF to create a new 'geohash' column
df_fixed = df_fixed.withColumn("geohash", geohash_4(col("lat"), col("lng")))

In [15]:
df_fixed.show(20)

+------------+------------+--------------------+-----------------------+-------+--------------+------+-------+-------+
|          id|franchise_id|      franchise_name|restaurant_franchise_id|country|          city|   lat|    lng|geohash|
+------------+------------+--------------------+-----------------------+-------+--------------+------+-------+-------+
|197568495625|          10|    The Golden Spoon|                  24784|     US|       Decatur|34.578|-87.021|   dn4h|
| 17179869242|          59|         Azalea Cafe|                  10902|     FR|         Paris|48.861|  2.368|   u09t|
|214748364826|          27|     The Corner Cafe|                  92040|     US|    Rapid City| 44.08|-103.25|   9xyd|
|154618822706|          51|        The Pizzeria|                  41484|     AT|        Vienna|48.213| 16.413|   u2ed|
|163208757312|          65|       Chef's Corner|                  96638|     GB|        London|51.495| -0.191|   gcpu|
| 68719476763|          28|    The Spicy Pickle|

In [19]:
df = spark.read.parquet("/home/timur/Desktop/weather")

df.printSchema()
df.show(10, truncate=False)


root
 |-- lng: double (nullable = true)
 |-- lat: double (nullable = true)
 |-- avg_tmpr_f: double (nullable = true)
 |-- avg_tmpr_c: double (nullable = true)
 |-- wthr_date: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)

+--------+-------+----------+----------+----------+----+-----+---+
|lng     |lat    |avg_tmpr_f|avg_tmpr_c|wthr_date |year|month|day|
+--------+-------+----------+----------+----------+----+-----+---+
|-111.09 |18.6251|80.7      |27.1      |2017-08-29|2017|8    |29 |
|-111.042|18.6305|80.7      |27.1      |2017-08-29|2017|8    |29 |
|-110.995|18.6358|80.7      |27.1      |2017-08-29|2017|8    |29 |
|-110.947|18.6412|80.9      |27.2      |2017-08-29|2017|8    |29 |
|-110.9  |18.6465|80.9      |27.2      |2017-08-29|2017|8    |29 |
|-110.852|18.6518|80.9      |27.2      |2017-08-29|2017|8    |29 |
|-110.804|18.6571|80.9      |27.2      |2017-08-29|2017|8    |29 |
|-105.068|19.1765|

In [20]:
rows = df.count()
cols = len(df.columns)

print(f"Rows: {rows}, Columns: {cols}")

Rows: 106242153, Columns: 8


In [21]:
df = df.withColumn("geohash", geohash_4(col("lat"), col("lng")))

In [22]:
df.show()

+--------+-------+----------+----------+----------+----+-----+---+-------+
|     lng|    lat|avg_tmpr_f|avg_tmpr_c| wthr_date|year|month|day|geohash|
+--------+-------+----------+----------+----------+----+-----+---+-------+
| -111.09|18.6251|      80.7|      27.1|2017-08-29|2017|    8| 29|   9e31|
|-111.042|18.6305|      80.7|      27.1|2017-08-29|2017|    8| 29|   9e31|
|-110.995|18.6358|      80.7|      27.1|2017-08-29|2017|    8| 29|   9e34|
|-110.947|18.6412|      80.9|      27.2|2017-08-29|2017|    8| 29|   9e34|
|  -110.9|18.6465|      80.9|      27.2|2017-08-29|2017|    8| 29|   9e34|
|-110.852|18.6518|      80.9|      27.2|2017-08-29|2017|    8| 29|   9e34|
|-110.804|18.6571|      80.9|      27.2|2017-08-29|2017|    8| 29|   9e34|
|-105.068|19.1765|      82.4|      28.0|2017-08-29|2017|    8| 29|   9emm|
| -105.02|19.1799|      82.0|      27.8|2017-08-29|2017|    8| 29|   9emm|
|-104.972|19.1832|      82.0|      27.8|2017-08-29|2017|    8| 29|   9emm|
|-104.924|19.1866|      8

In [26]:

df_weather_agg = (
    df
    .groupBy("geohash")
    .agg(
        avg("avg_tmpr_c").alias("avg_tmpr_c"),
        avg("avg_tmpr_f").alias("avg_tmpr_f")
    ))


In [28]:
# Assuming df_restaurants also has geohash_4
df_joined = df_fixed.join(
    broadcast(df_weather_agg),
    on="geohash",
    how="left"
)

In [29]:
print("Restaurants rows:", df_fixed.count())
print("Joined rows:", df_joined.count())


Restaurants rows: 1997
Joined rows: 1997
