In [29]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Initialize SparkSession
spark = SparkSession.builder.appName("Read CSV Example").getOrCreate()

# Define the correct path to the directory containing your CSV files
path_to_csv = "/home/jovyan/work/PT-Spark/restaurant_csv/"

# Read all CSV files from the directory into a DataFrame
restaurant_df  = spark.read.csv(path_to_csv, header=True, inferSchema=True)

# Display the first few rows of the DataFrame to verify the data was read correctly
restaurant_df .show()

+------------+------------+--------------------+-----------------------+-------+--------------+------+-------+
|          id|franchise_id|      franchise_name|restaurant_franchise_id|country|          city|   lat|    lng|
+------------+------------+--------------------+-----------------------+-------+--------------+------+-------+
|197568495625|          10|    The Golden Spoon|                  24784|     US|       Decatur|34.578|-87.021|
| 17179869242|          59|         Azalea Cafe|                  10902|     FR|         Paris|48.861|  2.368|
|214748364826|          27|     The Corner Cafe|                  92040|     US|    Rapid City| 44.08|-103.25|
|154618822706|          51|        The Pizzeria|                  41484|     AT|        Vienna|48.213| 16.413|
|163208757312|          65|       Chef's Corner|                  96638|     GB|        London|51.495| -0.191|
| 68719476763|          28|    The Spicy Pickle|                  77517|     US|      Grayling|44.657|-84.744|
|

In [31]:

# Iterate over all columns and count NULL values for each column
for column in restaurant_df.columns:
    nulls_count = restaurant_df.filter(col(column).isNull()).count()
    print(f"Кількість NULL значень в колонці {column}: {nulls_count}")

Кількість NULL значень в колонці id: 0
Кількість NULL значень в колонці franchise_id: 0
Кількість NULL значень в колонці franchise_name: 0
Кількість NULL значень в колонці restaurant_franchise_id: 0
Кількість NULL значень в колонці country: 0
Кількість NULL значень в колонці city: 0
Кількість NULL значень в колонці lat: 1
Кількість NULL значень в колонці lng: 1


In [32]:
restaurant_df .filter(col("lat").isNull() | col("lng").isNull()).show()


+-----------+------------+--------------+-----------------------+-------+------+----+----+
|         id|franchise_id|franchise_name|restaurant_franchise_id|country|  city| lat| lng|
+-----------+------------+--------------+-----------------------+-------+------+----+----+
|85899345920|           1|       Savoria|                  18952|     US|Dillon|NULL|NULL|
+-----------+------------+--------------+-----------------------+-------+------+----+----+



In [3]:
import requests

def fetch_coordinates(city, country, api_key='f6bb0876190e423ebea1880fd18f0d61'):
    url = f'https://api.opencagedata.com/geocode/v1/json?q={city}+{country}&key={api_key}'
    response = requests.get(url)
    data = response.json()

    if data['results']:
        lat = data['results'][0]['geometry']['lat']
        lng = data['results'][0]['geometry']['lng']
        return (lat, lng)
    return (None, None)


In [4]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType, StructType, StructField

schema = StructType([
    StructField("lat", FloatType(), True),
    StructField("lng", FloatType(), True)
])

fetch_coordinates_udf = udf(fetch_coordinates, schema)

# Assume df is your DataFrame
# Select the row with missing values
row_to_update = restaurant_df.filter(restaurant_df['lat'].isNull() & restaurant_df['lng'].isNull())

# Update this row using a UDF
updated_row = row_to_update.withColumn("new_coords", fetch_coordinates_udf(row_to_update['city'], row_to_update['country']))
updated_row = updated_row.withColumn("lat", updated_row["new_coords"].getItem("lat")).withColumn("lng", updated_row["new_coords"].getItem("lng"))
updated_row = updated_row.drop("new_coords")

# Show the updated row
updated_row.show()


+-----------+------------+--------------+-----------------------+-------+------+--------+---------+
|         id|franchise_id|franchise_name|restaurant_franchise_id|country|  city|     lat|      lng|
+-----------+------------+--------------+-----------------------+-------+------+--------+---------+
|85899345920|           1|       Savoria|                  18952|     US|Dillon|34.40141|-79.38644|
+-----------+------------+--------------+-----------------------+-------+------+--------+---------+



In [5]:
from pyspark.sql import Row
from pyspark.sql.functions import when

# Collect the updated data as a list of strings
updated_data = [row.asDict() for row in updated_row.collect()]

# Update the main DataFrame
for row_data in updated_data:
    restaurant_df = restaurant_df.withColumn("lat", when(restaurant_df["id"] == row_data["id"], row_data["lat"]).otherwise(restaurant_df["lat"]))
    restaurant_df = restaurant_df.withColumn("lng", when(restaurant_df["id"] == row_data["id"], row_data["lng"]).otherwise(restaurant_df["lng"]))

# Check the updated DataFrame
restaurant_df.filter(restaurant_df['id'] == 85899345920).show()


+-----------+------------+--------------+-----------------------+-------+------+-----------------+------------------+
|         id|franchise_id|franchise_name|restaurant_franchise_id|country|  city|              lat|               lng|
+-----------+------------+--------------+-----------------------+-------+------+-----------------+------------------+
|85899345920|           1|       Savoria|                  18952|     US|Dillon|34.40140914916992|-79.38643646240234|
+-----------+------------+--------------+-----------------------+-------+------+-----------------+------------------+



In [6]:
# Path to the folder month=10, which contains the folders day=01, day=02, day=03, day=04
path = "/home/jovyan/work/PT-Spark/weather/weather"

# Reading all Parquet files from all folders day=xx in DataFrame
weather_df = spark.read.parquet(path)

# Show data
weather_df.show(n=10)

+--------+-------+----------+----------+----------+----+-----+---+
|     lng|    lat|avg_tmpr_f|avg_tmpr_c| wthr_date|year|month|day|
+--------+-------+----------+----------+----------+----+-----+---+
|-104.423|21.5478|      73.6|      23.1|2017-08-01|2017|    8|  1|
|-104.374| 21.551|      72.6|      22.6|2017-08-01|2017|    8|  1|
|-104.325|21.5541|      71.7|      22.1|2017-08-01|2017|    8|  1|
|-104.276|21.5573|      70.9|      21.6|2017-08-01|2017|    8|  1|
|-104.227|21.5604|      70.5|      21.4|2017-08-01|2017|    8|  1|
|-104.178|21.5635|      70.1|      21.2|2017-08-01|2017|    8|  1|
|-104.129|21.5665|      69.8|      21.0|2017-08-01|2017|    8|  1|
| -104.08|21.5696|      69.5|      20.8|2017-08-01|2017|    8|  1|
|-104.032|21.5726|      69.3|      20.7|2017-08-01|2017|    8|  1|
|-103.983|21.5757|      71.8|      22.1|2017-08-01|2017|    8|  1|
+--------+-------+----------+----------+----------+----+-----+---+
only showing top 10 rows



In [9]:
import pygeohash as pgh
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define a UDF to generate a 4-character geohash
def generate_geohash(lat, lng):
    return pgh.encode(lat, lng, precision=4)

generate_geohash_udf = udf(generate_geohash, StringType())

# Add geohash column to weather data
weather_df = weather_df.withColumn("geohash", generate_geohash_udf("lat", "lng"))

# Add geohash column to restaurant data
restaurant_df = restaurant_df.withColumn("geohash", generate_geohash_udf("lat", "lng"))


In [10]:
from pyspark.sql.functions import col, year, month
# Extract month and year from the 'wthr_date' column
weather_df = weather_df.withColumn("month", month(col("wthr_date")))
weather_df = weather_df.withColumn("year", year(col("wthr_date")))


In [15]:
# Split the dataset into three parts by month values
months = weather_df.select('month').distinct().rdd.flatMap(lambda x: x).collect()

# Create an empty DataFrame to hold the results
final_df = spark.createDataFrame([], weather_df.schema)

# Iterate through each unique month, process the data and collect the results
for month in months:
    # Filter the dataset by the current monthцем
    subset_df = weather_df.filter(weather_df.month == month)
    
    # Remove duplicates in this subset
    unique_subset_df = subset_df.dropDuplicates(['geohash', 'day', 'month', 'year'])
    
    # Add unique strings to the final result
    final_df = final_df.union(unique_subset_df)
    print(final_df.count())



2426613
4850979


In [16]:
final_df.count()

4850979

In [22]:
# Remove 'lng' and 'lat' columns from merged DataFrame
final_cleaned_df = final_df.drop('lng', 'lat')

In [23]:
# Perform a left join using the geohash column
joined_df = restaurant_df.join(final_cleaned_df, on="geohash", how="left")

# Drop duplicates if necessary to prevent data multiplication
# You may specify subset of columns in dropDuplicates() to consider for identifying duplicates
joined_df = joined_df.dropDuplicates()

In [24]:
joined_df.show()

+-------+------------+------------+-----------------+-----------------------+-------+-------------+------+--------+----------+----------+----------+----+-----+---+
|geohash|          id|franchise_id|   franchise_name|restaurant_franchise_id|country|         city|   lat|     lng|avg_tmpr_f|avg_tmpr_c| wthr_date|year|month|day|
+-------+------------+------------+-----------------+-----------------------+-------+-------------+------+--------+----------+----------+----------+----+-----+---+
|   9prt|          25|          26| The Silver Spoon|                  47732|     US|Crescent City|41.748|-124.203|      54.7|      12.6|2017-08-03|2017|    8|  3|
|   9prt|          25|          26| The Silver Spoon|                  47732|     US|Crescent City|41.748|-124.203|      56.9|      13.8|2017-08-05|2017|    8|  5|
|   9prt|          25|          26| The Silver Spoon|                  47732|     US|Crescent City|41.748|-124.203|      57.8|      14.3|2017-08-06|2017|    8|  6|
|   9prt|       

In [25]:
# Save DataFrame in Parquet format with partitioning by 'year' and 'month'
joined_df.write.partitionBy('year', 'month','day').parquet('/home/jovyan/work/PT-Spark/result/data.parquet')
