In [1]:
import numpy as np
import pandas as pd
import matplotlib as plt
import pyspark as sp
from pyspark.sql.functions import col, year, to_timestamp, sum as _sum, desc, date_format, countDistinct, collect_set, round, first
from pyspark.sql import Row

In [2]:
spark = sp.sql.SparkSession.builder.appName("Merging Weather").master("local[4]").config("spark.driver.memory", "6g").config("spark.executor.memory", "4g").config("spark.network.timeout", "600s").config("spark.executor.heartbeatInterval", "60s").getOrCreate()

In [3]:
df = spark.read.parquet("D:/data/top20_mta_data_1")

In [5]:
df.printSchema()

root
 |-- station_complex: string (nullable = true)
 |-- transit_timestamp: string (nullable = true)
 |-- station_complex_id: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- fare_class_category: string (nullable = true)
 |-- ridership: integer (nullable = true)
 |-- transfers: integer (nullable = true)
 |-- Georeference: string (nullable = true)
 |-- transit_mode: string (nullable = true)
 |-- borough: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)



In [6]:
df.select([_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()

+---------------+-----------------+------------------+--------------+-------------------+---------+---------+------------+------------+-------+---------+----+---------+--------+
|station_complex|transit_timestamp|station_complex_id|payment_method|fare_class_category|ridership|transfers|Georeference|transit_mode|borough|timestamp|year|longitude|latitude|
+---------------+-----------------+------------------+--------------+-------------------+---------+---------+------------+------------+-------+---------+----+---------+--------+
|              0|                0|                 0|             0|                  0|        0|        0|           0|           0|      0|        0|   0|        0|       0|
+---------------+-----------------+------------------+--------------+-------------------+---------+---------+------------+------------+-------+---------+----+---------+--------+



In [7]:
df_weather2023 = spark.read.csv("D:/data/subway_weather_2023.csv", header=True)
df_weather2023.printSchema()
df_weather2024 = spark.read.csv("D:/data/subway_weather_2024.csv", header=True)
df_weather2024.printSchema()

root
 |-- time: string (nullable = true)
 |-- temperature_C: string (nullable = true)
 |-- precipitation_mm: string (nullable = true)
 |-- humidity_%: string (nullable = true)
 |-- station_id: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)

root
 |-- time: string (nullable = true)
 |-- temperature_C: string (nullable = true)
 |-- precipitation_mm: string (nullable = true)
 |-- humidity_%: string (nullable = true)
 |-- station_id: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)



In [8]:
df.drop("timestamp", "year")

DataFrame[station_complex: string, transit_timestamp: string, station_complex_id: string, payment_method: string, fare_class_category: string, ridership: int, transfers: int, Georeference: string, transit_mode: string, borough: string, longitude: double, latitude: double]

In [9]:
df_weather2023 = df_weather2023.withColumn("timestamp_temp", to_timestamp("time", "yyyy-MM-dd'T'HH:mm"))
df_weather2023 = df_weather2023.withColumn("transit_timestamp", date_format("timestamp_temp", "MM/dd/yyyy hh:mm:ss a"))
df_weather2023.show(10)

+----------------+-------------+----------------+----------+----------+---------+----------+-------------------+--------------------+
|            time|temperature_C|precipitation_mm|humidity_%|station_id| latitude| longitude|     timestamp_temp|   transit_timestamp|
+----------------+-------------+----------------+----------+----------+---------+----------+-------------------+--------------------+
|2023-01-01T00:00|         10.5|             1.2|        99|         1|40.751778|-73.976845|2023-01-01 00:00:00|01/01/2023 12:00:...|
|2023-01-01T01:00|         10.1|             1.2|        99|         1|40.751778|-73.976845|2023-01-01 01:00:00|01/01/2023 01:00:...|
|2023-01-01T02:00|         10.0|             0.2|        98|         1|40.751778|-73.976845|2023-01-01 02:00:00|01/01/2023 02:00:...|
|2023-01-01T03:00|         10.1|             0.0|        96|         1|40.751778|-73.976845|2023-01-01 03:00:00|01/01/2023 03:00:...|
|2023-01-01T04:00|          9.5|             0.0|        94|  

In [10]:
df_weather2024 = df_weather2024.withColumn("timestamp_temp", to_timestamp("time", "yyyy-MM-dd'T'HH:mm"))
df_weather2024 = df_weather2024.withColumn("transit_timestamp", date_format("timestamp_temp", "MM/dd/yyyy hh:mm:ss a"))
df_weather2024.show(10)

+----------------+-------------+----------------+----------+----------+---------+----------+-------------------+--------------------+
|            time|temperature_C|precipitation_mm|humidity_%|station_id| latitude| longitude|     timestamp_temp|   transit_timestamp|
+----------------+-------------+----------------+----------+----------+---------+----------+-------------------+--------------------+
|2024-01-01T00:00|          1.3|             0.1|        79|         1|40.751778|-73.976845|2024-01-01 00:00:00|01/01/2024 12:00:...|
|2024-01-01T01:00|          0.8|             0.0|        81|         1|40.751778|-73.976845|2024-01-01 01:00:00|01/01/2024 01:00:...|
|2024-01-01T02:00|          1.9|             0.0|        79|         1|40.751778|-73.976845|2024-01-01 02:00:00|01/01/2024 02:00:...|
|2024-01-01T03:00|          2.2|             0.0|        79|         1|40.751778|-73.976845|2024-01-01 03:00:00|01/01/2024 03:00:...|
|2024-01-01T04:00|          2.2|             0.0|        79|  

In [11]:
df_weather2023.select("transit_timestamp").show(2, truncate=False)

+----------------------+
|transit_timestamp     |
+----------------------+
|01/01/2023 12:00:00 AM|
|01/01/2023 01:00:00 AM|
+----------------------+
only showing top 2 rows



In [14]:
df_weather = df_weather2023.union(df_weather2024)
df_weather.drop("time")
df_weather.printSchema()

root
 |-- time: string (nullable = true)
 |-- temperature_C: string (nullable = true)
 |-- precipitation_mm: string (nullable = true)
 |-- humidity_%: string (nullable = true)
 |-- station_id: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- timestamp_temp: timestamp (nullable = true)
 |-- transit_timestamp: string (nullable = true)



In [15]:
df_weather.drop("time")
df_weather.printSchema()

root
 |-- time: string (nullable = true)
 |-- temperature_C: string (nullable = true)
 |-- precipitation_mm: string (nullable = true)
 |-- humidity_%: string (nullable = true)
 |-- station_id: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- timestamp_temp: timestamp (nullable = true)
 |-- transit_timestamp: string (nullable = true)



In [16]:
df.printSchema()

root
 |-- station_complex: string (nullable = true)
 |-- transit_timestamp: string (nullable = true)
 |-- station_complex_id: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- fare_class_category: string (nullable = true)
 |-- ridership: integer (nullable = true)
 |-- transfers: integer (nullable = true)
 |-- Georeference: string (nullable = true)
 |-- transit_mode: string (nullable = true)
 |-- borough: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)



In [18]:
# Create aliases for both DataFrames
df_main = df.alias("main")
df_weather_renamed = df_weather.selectExpr(
    "longitude as weather_longitude",
    "latitude as weather_latitude",
    "transit_timestamp as weather_transit_timestamp",
    "temperature_C",
    "precipitation_mm",
    "`humidity_%`"  # Escape special character using backticks
).alias("weather")

# Join using the original columns
joined_df = df_main.join(
    df_weather_renamed,
    (col("main.longitude") == col("weather.weather_longitude")) &
    (col("main.latitude") == col("weather.weather_latitude")) &
    (col("main.transit_timestamp") == col("weather.weather_transit_timestamp")),
    how="left"
)

# Show the joined result
joined_df.show(truncate=False)

+--------------------------------------------------+----------------------+------------------+--------------+--------------------------------+---------+---------+----------------------------+------------+---------+-------------------+----+----------+---------+-----------------+----------------+-------------------------+-------------+----------------+----------+
|station_complex                                   |transit_timestamp     |station_complex_id|payment_method|fare_class_category             |ridership|transfers|Georeference                |transit_mode|borough  |timestamp          |year|longitude |latitude |weather_longitude|weather_latitude|weather_transit_timestamp|temperature_C|precipitation_mm|humidity_%|
+--------------------------------------------------+----------------------+------------------+--------------+--------------------------------+---------+---------+----------------------------+------------+---------+-------------------+----+----------+---------+------------

In [19]:
joined_df.write.mode("overwrite").option("compression", "snappy").parquet("D:/data/mta_top20_joined_1")

In [25]:
joined_df = joined_df.drop("weather_longitude", "weather_latitude", "weather_transit_timestamp", "timestamp", "year", "Georeference")
joined_df.printSchema()

root
 |-- station_complex: string (nullable = true)
 |-- transit_timestamp: string (nullable = true)
 |-- station_complex_id: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- fare_class_category: string (nullable = true)
 |-- ridership: integer (nullable = true)
 |-- transfers: integer (nullable = true)
 |-- transit_mode: string (nullable = true)
 |-- borough: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- temperature_C: string (nullable = true)
 |-- precipitation_mm: string (nullable = true)
 |-- humidity_%: string (nullable = true)



In [26]:
joined_df.write.mode("overwrite").option("compression", "snappy").parquet("D:/data/mta_top20_joined_1")