# Weather × Ridership Analysis

Project: Analyze the impact of weather on NYC public transportation ridership

- Task 1: Initialize Spark Session
- Task 2: Load MTA Daily Ridership Data
- Task 3: Load Weather Data (Open-Meteo)
- Task 4: Join Ridership with Weather on Date
- Task 5: Feature Engineering - Add Flags or Bins
- Task 6: Analysis - Compute Correlations
- Task 7: Aggregations - Average Ridership by Flags
- Task 8: Train (Subway) Ridership Model
- Task 9: Predict Upcoming Ridership From Forecast
- Task 10: Save Results


### Preliminaries: Installing Libraries & Downloading Data

Install the Required Libraries:

In [323]:
!pip install pyspark findspark pandas sodapy openmeteo-requests requests-cache retry-requests numpy pandas



### Importing Libraries

Importing the Required Libraries

In [324]:
import findspark
import warnings
import requests
import re

def warn(*args, **kwargs):
    pass

warnings.warn = warn
warnings.filterwarnings('ignore')

findspark.init()

# Spark
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, to_date, when, weekofyear, avg, expr
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.sql.types import (
    StructType, StructField,
    StringType, DoubleType
)

# Socrata (MTA Open Data)
import pandas as pd
from sodapy import Socrata

# Open Meteo
import openmeteo_requests
import requests_cache
from retry_requests import retry

### Initialize Spark Session


In [325]:
spark = SparkSession.builder \
    .appName("WeatherRidershipAnalysis") \
    .master("local[*]") \
    .getOrCreate()

### Tasks

#### Task 1: Load MTA Daily Ridership Data

In [326]:
client = Socrata("data.ny.gov", None)

results = client.get("vxuj-8kew", limit=2000)

# Convert to Pandas DataFrame
results_df = pd.DataFrame.from_records(results)

results_df.head()



Unnamed: 0,date,subways_total_estimated_ridership,subways_of_comparable_pre_pandemic_day,buses_total_estimated_ridersip,buses_of_comparable_pre_pandemic_day,lirr_total_estimated_ridership,lirr_of_comparable_pre_pandemic_day,metro_north_total_estimated_ridership,metro_north_of_comparable_pre_pandemic_day,access_a_ride_total_scheduled_trips,access_a_ride_of_comparable_pre_pandemic_day,bridges_and_tunnels_total_traffic,bridges_and_tunnels_of_comparable_pre_pandemic_day,staten_island_railway_total_estimated_ridership,staten_island_railway_of_comparable_pre_pandemic_day
0,2020-03-01T00:00:00.000,2212965,0.97,984908,0.99,86790,1.0,55825,0.59,19922,1.13,786960,0.98,1636,0.52
1,2020-03-02T00:00:00.000,5329915,0.96,2209066,0.99,321569,1.03,180701,0.66,30338,1.02,874619,0.95,17140,1.07
2,2020-03-03T00:00:00.000,5481103,0.98,2228608,0.99,319727,1.02,190648,0.69,32767,1.1,882175,0.96,17453,1.09
3,2020-03-04T00:00:00.000,5498809,0.99,2177165,0.97,311662,0.99,192689,0.7,34297,1.15,905558,0.98,17136,1.07
4,2020-03-05T00:00:00.000,5496453,0.99,2244515,1.0,307597,0.98,194386,0.7,33209,1.12,929298,1.01,17203,1.08


In [327]:
# Convert Pandas DataFrame to Spark DataFrame
daily_df = spark.createDataFrame(results_df)

# Isolate Columns
daily_df = daily_df.select(
            col("date"),
            col("subways_total_estimated_ridership").alias("subway_ridership"),
            col("buses_total_estimated_ridersip").alias("bus_ridership"),
            col("lirr_total_estimated_ridership").alias("lirr_ridership"),
            col("metro_north_total_estimated_ridership").alias("metro_north_ridership")
        )

# Adjust Date Format
daily_df = daily_df.withColumn("date", to_date(col("date"), "yyyy-MM-dd'T'HH:mm:ss.SSS"))

daily_df.show(5)

+----------+----------------+-------------+--------------+---------------------+
|      date|subway_ridership|bus_ridership|lirr_ridership|metro_north_ridership|
+----------+----------------+-------------+--------------+---------------------+
|2020-03-01|         2212965|       984908|         86790|                55825|
|2020-03-02|         5329915|      2209066|        321569|               180701|
|2020-03-03|         5481103|      2228608|        319727|               190648|
|2020-03-04|         5498809|      2177165|        311662|               192689|
|2020-03-05|         5496453|      2244515|        307597|               194386|
+----------+----------------+-------------+--------------+---------------------+
only showing top 5 rows



#### Task 2: Load Weather Data (Open-Meteo)

In [328]:
# Setup Open-Meteo API Client
cache_session = requests_cache.CachedSession('.cache', expire_after = -1)
retry_session = retry(cache_session, retries = 5, backoff_factor = 0.2)
openmeteo = openmeteo_requests.Client(session = retry_session)

# Get the Minimum and Maximum Dates from the Ridership Dataset
start_date = daily_df.selectExpr("min(date)").collect()[0][0].strftime('%Y-%m-%d')
end_date = daily_df.selectExpr("max(date)").collect()[0][0].strftime('%Y-%m-%d')

url = "https://archive-api.open-meteo.com/v1/archive"
params = {
	"latitude": 40.7143,
	"longitude": -74.006,
  "start_date": start_date,
  "end_date": end_date,
	"daily": ["temperature_2m_max", "temperature_2m_min", "precipitation_sum", "wind_speed_10m_max", "wind_gusts_10m_max"],
    "temperature_unit": "fahrenheit",
    "windspeed_unit": "kmh",
    "precipitation_unit": "mm",
    "timezone": "America/New_York"
}
responses = openmeteo.weather_api(url, params=params)

# Process Location
response = responses[0]
print(f"Coordinates {response.Latitude()}°N {response.Longitude()}°E")
print(f"Elevation {response.Elevation()} m asl")
print(f"Timezone {response.Timezone()}{response.TimezoneAbbreviation()}")
print(f"Timezone difference to GMT+0 {response.UtcOffsetSeconds()} s")

# Process Daily Data
daily_data = {}
daily = response.Daily()
daily_data["date"] = pd.date_range(
    start = pd.to_datetime(daily.Time(), unit = "s", utc = True),
    end = pd.to_datetime(daily.TimeEnd(), unit = "s", utc = True),
    freq = pd.Timedelta(seconds = daily.Interval()),
    inclusive = "left"
)

# Extract Data for Each Daily Variable
daily_data["temperature_max_f"] = daily.Variables(0).ValuesAsNumpy()
daily_data["temperature_min_f"] = daily.Variables(1).ValuesAsNumpy()
daily_data["precipitation_mm"] = daily.Variables(2).ValuesAsNumpy()
daily_data["wind_speed_kmh"] = daily.Variables(3).ValuesAsNumpy()
daily_data["wind_gust_kmh"] = daily.Variables(4).ValuesAsNumpy()

weather_df = pd.DataFrame(data = daily_data)

# Convert Pandas DataFrame to Spark DataFrame
weather_df = spark.createDataFrame(weather_df)

# Convert the Date Column to YYYY-MM-DD Format
weather_df = weather_df.withColumn("date", to_date(col("date"), "yyyy-MM-dd HH:mm:ss"))

weather_df.show(5)

Coordinates 40.738136291503906°N -74.04254150390625°E
Elevation 51.0 m asl
Timezone b'America/New_York'b'GMT-4'
Timezone difference to GMT+0 -14400 s
+----------+------------------+------------------+-----------------+------------------+------------------+
|      date| temperature_max_f| temperature_min_f| precipitation_mm|    wind_speed_kmh|     wind_gust_kmh|
+----------+------------------+------------------+-----------------+------------------+------------------+
|2020-03-01|   44.010498046875| 23.40049934387207|              0.0|22.737070083618164|42.839996337890625|
|2020-03-02|56.070499420166016|29.160499572753906|              0.0|29.237674713134766| 48.23999786376953|
|2020-03-03|56.070499420166016| 43.38050079345703| 6.59999942779541|25.537172317504883|48.599998474121094|
|2020-03-04|55.350502014160156| 40.86050033569336|3.299999952316284| 34.32670974731445| 61.92000198364258|
|2020-03-05|50.670501708984375| 34.02050018310547|              0.0|19.694992065429688|33.83999633789

#### Task 3: Join Ridership with Weather on Date

In [329]:
joined = daily_df.join(weather_df, on="date", how="inner")
joined.show(5)

+----------+----------------+-------------+--------------+---------------------+------------------+------------------+-------------------+------------------+------------------+
|      date|subway_ridership|bus_ridership|lirr_ridership|metro_north_ridership| temperature_max_f| temperature_min_f|   precipitation_mm|    wind_speed_kmh|     wind_gust_kmh|
+----------+----------------+-------------+--------------+---------------------+------------------+------------------+-------------------+------------------+------------------+
|2020-08-24|         1307345|        34021|         75145|                35904| 88.83049774169922| 70.83049774169922|0.10000000149011612|15.580141067504883|33.119998931884766|
|2021-01-27|         1660278|      1004792|         70353|                35635|39.510501861572266|27.810501098632812| 0.6000000238418579| 23.86341094970703|42.119998931884766|
|2021-06-22|         2354873|      1172110|        116165|                97637| 76.05049896240234| 59.220500946044

#### Task 4: Feature Engineering - Add Flags or Bins

In [330]:
enriched = (
    joined
    .withColumn("rain_flag", when(col("precipitation_mm") > 0, 1).otherwise(0))
    .withColumn("hot_day_flag", when(col("temperature_max_f") >= 85, 1).otherwise(0))
    .withColumn("cold_day_flag", when(col("temperature_min_f") <= 32, 1).otherwise(0))
)

#### Task 6: Analysis - Compute Correlations

In [331]:
enriched = enriched.withColumn("subway_ridership", enriched["subway_ridership"].cast(IntegerType()))
enriched = enriched.withColumn("bus_ridership", enriched["bus_ridership"].cast(IntegerType()))
enriched = enriched.withColumn("lirr_ridership", enriched["lirr_ridership"].cast(IntegerType()))
enriched = enriched.withColumn("metro_north_ridership", enriched["metro_north_ridership"].cast(IntegerType()))

enriched = enriched.withColumn("temperature_max_f", enriched["temperature_max_f"].cast(DoubleType()))
enriched = enriched.withColumn("precipitation_mm", enriched["precipitation_mm"].cast(DoubleType()))

# Subway
corr_temp = enriched.stat.corr("subway_ridership", "temperature_max_f")
corr_precip = enriched.stat.corr("subway_ridership", "precipitation_mm")
print(f"Correlation (Subway Ridership vs. Max Temperature): {corr_temp}")
print(f"Correlation (Subway Ridership vs. Precipitation): {corr_precip}")

# Bus
corr_temp = enriched.stat.corr("bus_ridership", "temperature_max_f")
corr_precip = enriched.stat.corr("bus_ridership", "precipitation_mm")
print(f"Correlation (Bus Ridership vs. Max Temperature): {corr_temp}")
print(f"Correlation (Bus Ridership vs. Precipitation): {corr_precip}")

# LIRR
corr_temp = enriched.stat.corr("lirr_ridership", "temperature_max_f")
corr_precip = enriched.stat.corr("lirr_ridership", "precipitation_mm")
print(f"Correlation (LIRR Ridership vs. Max Temperature): {corr_temp}")
print(f"Correlation (LIRR Ridership vs. Precipitation): {corr_precip}")

# Metro North
corr_temp = enriched.stat.corr("metro_north_ridership", "temperature_max_f")
corr_precip = enriched.stat.corr("metro_north_ridership", "precipitation_mm")
print(f"Correlation (Metro North Ridership vs. Max Temperature): {corr_temp}")
print(f"Correlation (Metro North Ridership vs. Precipitation): {corr_precip}")


Correlation (Subway Ridership vs. Max Temperature): 0.029333704697265418
Correlation (Subway Ridership vs. Precipitation): -0.051985664617168435
Correlation (Bus Ridership vs. Max Temperature): -0.02623031244966192
Correlation (Bus Ridership vs. Precipitation): -0.06336134435388766
Correlation (LIRR Ridership vs. Max Temperature): 0.10713230068337701
Correlation (LIRR Ridership vs. Precipitation): -0.04383321519343369
Correlation (Metro North Ridership vs. Max Temperature): 0.08327988617343784
Correlation (Metro North Ridership vs. Precipitation): -0.044880853497027764


#### Task 7: Aggregations - Average Ridership by Flags

##### Rain

In [332]:
avg_by_rain = (
    enriched
      .groupBy()
      .pivot("rain_flag", [False, True])
      .agg(
          avg("subway_ridership").alias("avg_subway"),
          avg("bus_ridership").alias("avg_bus"),
          avg("lirr_ridership").alias("avg_lirr"),
          avg("metro_north_ridership").alias("avg_metro_north")
      )
      .withColumnRenamed("false_avg_subway", "avg_subway_no_rain")
      .withColumnRenamed("true_avg_subway",  "avg_subway_rain")
      .withColumnRenamed("false_avg_bus",     "avg_bus_no_rain")
      .withColumnRenamed("true_avg_bus",      "avg_bus_rain")
      .withColumnRenamed("false_avg_lirr",    "avg_lirr_no_rain")
      .withColumnRenamed("true_avg_lirr",     "avg_lirr_rain")
      .withColumnRenamed("false_avg_metro_north", "avg_metro_north_no_rain")
      .withColumnRenamed("true_avg_metro_north",  "avg_metro_north_rain")
      .withColumn(
          "pct_change_subway",
          expr("(avg_subway_rain  - avg_subway_no_rain) / avg_subway_no_rain * 100")
      )
      .withColumn(
          "pct_change_bus",
          expr("(avg_bus_rain - avg_bus_no_rain) / avg_bus_no_rain * 100")
      )
      .withColumn(
          "pct_change_lirr",
          expr("(avg_lirr_rain - avg_lirr_no_rain) / avg_lirr_no_rain * 100")
      )
      .withColumn(
          "pct_change_metro_north",
          expr("(avg_metro_north_rain - avg_metro_north_no_rain) / avg_metro_north_no_rain * 100")
      )
)

avg_by_rain = avg_by_rain.selectExpr(
    """
    stack(
      4,
      'subway', avg_subway_no_rain, avg_subway_rain, pct_change_subway,
      'bus',    avg_bus_no_rain,    avg_bus_rain,    pct_change_bus,
      'lirr',   avg_lirr_no_rain,   avg_lirr_rain,   pct_change_lirr,
      'metro_north', avg_metro_north_no_rain, avg_metro_north_rain, pct_change_metro_north
    )
    as (mode, avg_no_rain, avg_rain, pct_change)
    """
)

avg_by_rain.show(truncate=False)

+-----------+------------------+------------------+------------------+
|mode       |avg_no_rain       |avg_rain          |pct_change        |
+-----------+------------------+------------------+------------------+
|subway     |2618371.6755504054|2469480.6067907996|-5.686399304953812|
|bus        |1049352.7219003476|975543.6002190581 |-7.033776168953307|
|lirr       |143331.55388180766|134484.84665936473|-6.172197944451222|
|metro_north|121296.59096176129|113954.90142387732|-6.052675907601092|
+-----------+------------------+------------------+------------------+



##### Hot Day

In [333]:
avg_by_hot_day = (
    enriched
      .groupBy()
      .pivot("hot_day_flag", [False, True])
      .agg(
          avg("subway_ridership").alias("avg_subway"),
          avg("bus_ridership").alias("avg_bus"),
          avg("lirr_ridership").alias("avg_lirr"),
          avg("metro_north_ridership").alias("avg_metro_north")
      )
      .withColumnRenamed("false_avg_subway", "avg_subway_no_hot_day")
      .withColumnRenamed("true_avg_subway",  "avg_subway_hot_day")
      .withColumnRenamed("false_avg_bus",     "avg_bus_no_hot_day")
      .withColumnRenamed("true_avg_bus",      "avg_bus_hot_day")
      .withColumnRenamed("false_avg_lirr",    "avg_lirr_no_hot_day")
      .withColumnRenamed("true_avg_lirr",     "avg_lirr_hot_day")
      .withColumnRenamed("false_avg_metro_north", "avg_metro_north_no_hot_day")
      .withColumnRenamed("true_avg_metro_north",  "avg_metro_north_hot_day")
      .withColumn(
          "pct_change_subway",
          expr("(avg_subway_hot_day  - avg_subway_no_hot_day) / avg_subway_no_hot_day * 100")
      )
      .withColumn(
          "pct_change_bus",
          expr("(avg_bus_hot_day - avg_bus_no_hot_day) / avg_bus_no_hot_day * 100")
      )
      .withColumn(
          "pct_change_lirr",
          expr("(avg_lirr_hot_day - avg_lirr_no_hot_day) / avg_lirr_no_hot_day * 100")
      )
      .withColumn(
          "pct_change_metro_north",
          expr("(avg_metro_north_hot_day - avg_metro_north_no_hot_day) / avg_metro_north_no_hot_day * 100")
      )
)

avg_by_hot_day = avg_by_hot_day.selectExpr(
    """
    stack(
      4,
      'subway', avg_subway_no_hot_day, avg_subway_hot_day, pct_change_subway,
      'bus',    avg_bus_no_hot_day,    avg_bus_hot_day,    pct_change_bus,
      'lirr',   avg_lirr_no_hot_day,   avg_lirr_hot_day,   pct_change_lirr,
      'metro_north', avg_metro_north_no_hot_day, avg_metro_north_hot_day, pct_change_metro_north
    )
    as (mode, avg_no_hot_day, avg_hot_day, pct_change)
    """
)

avg_by_hot_day.show(truncate=False)

+-----------+------------------+------------------+-------------------+
|mode       |avg_no_hot_day    |avg_hot_day       |pct_change         |
+-----------+------------------+------------------+-------------------+
|subway     |2551300.2735849055|2460876.9623655914|-3.5442049748326063|
|bus        |1022882.1044025157|913334.1935483871 |-10.709729927098245|
|lirr       |138087.62389937107|144733.7311827957 |4.812963751384301  |
|metro_north|117251.9962264151 |119833.91935483871|2.2020291436555923 |
+-----------+------------------+------------------+-------------------+



##### Cold Day

In [334]:
avg_by_cold_day = (
    enriched
      .groupBy()
      .pivot("cold_day_flag", [False, True])
      .agg(
          avg("subway_ridership").alias("avg_subway"),
          avg("bus_ridership").alias("avg_bus"),
          avg("lirr_ridership").alias("avg_lirr"),
          avg("metro_north_ridership").alias("avg_metro_north")
      )
      .withColumnRenamed("false_avg_subway", "avg_subway_no_cold_day")
      .withColumnRenamed("true_avg_subway",  "avg_subway_cold_day")
      .withColumnRenamed("false_avg_bus",     "avg_bus_no_cold_day")
      .withColumnRenamed("true_avg_bus",      "avg_bus_cold_day")
      .withColumnRenamed("false_avg_lirr",    "avg_lirr_no_cold_day")
      .withColumnRenamed("true_avg_lirr",     "avg_lirr_cold_day")
      .withColumnRenamed("false_avg_metro_north", "avg_metro_north_no_cold_day")
      .withColumnRenamed("true_avg_metro_north",  "avg_metro_north_cold_day")
      .withColumn(
          "pct_change_subway",
          expr("(avg_subway_cold_day  - avg_subway_no_cold_day) / avg_subway_no_cold_day * 100")
      )
      .withColumn(
          "pct_change_bus",
          expr("(avg_bus_cold_day - avg_bus_no_cold_day) / avg_bus_no_cold_day * 100")
      )
      .withColumn(
          "pct_change_lirr",
          expr("(avg_lirr_cold_day - avg_lirr_no_cold_day) / avg_lirr_no_cold_day * 100")
      )
      .withColumn(
          "pct_change_metro_north",
          expr("(avg_metro_north_cold_day - avg_metro_north_no_cold_day) / avg_metro_north_no_cold_day * 100")
      )
)

avg_by_cold_day = avg_by_cold_day.selectExpr(
    """
    stack(
      4,
      'subway', avg_subway_no_cold_day, avg_subway_cold_day, pct_change_subway,
      'bus',    avg_bus_no_cold_day,    avg_bus_cold_day,    pct_change_bus,
      'lirr',   avg_lirr_no_cold_day,   avg_lirr_cold_day,   pct_change_lirr,
      'metro_north', avg_metro_north_no_cold_day, avg_metro_north_cold_day, pct_change_metro_north
    )
    as (mode, avg_no_cold_day, avg_cold_day, pct_change)
    """
)

avg_by_cold_day.show(truncate=False)

+-----------+------------------+------------------+------------------+
|mode       |avg_no_cold_day   |avg_cold_day      |pct_change        |
+-----------+------------------+------------------+------------------+
|subway     |2555917.8948106593|2484433.234285714 |-2.796829298393436|
|bus        |1008325.2026647966|1023974.1914285715|1.5519783421477311|
|lirr       |141089.9495091164 |129387.22285714286|-8.294514735238028|
|metro_north|119777.10589060308|108336.08571428571|-9.551925713388734|
+-----------+------------------+------------------+------------------+



#### Task 8: Train (Subway) Ridership Model

In [335]:
# Select Features and Target Variable
feature_cols = ["temperature_max_f", "precipitation_mm", "rain_flag", "hot_day_flag", "cold_day_flag"]
target_col = "subway_ridership" # Or 'bus_ridership', 'lirr_ridership', 'metro_north_ridership'

# Assemble Features into a Vector
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Create a Linear Regression Model
lr = LinearRegression(featuresCol="features", labelCol=target_col)

# Create a Pipeline
pipeline = Pipeline(stages=[assembler, lr])

# Split Data into Training and Testing Sets
# 80% Train, 20% Test
(trainingData, testData) = enriched.randomSplit([0.8, 0.2], seed=12345)

# Train the Model
model = pipeline.fit(trainingData)

# Make Predictions on the Test Data
predictions = model.transform(testData)

# Show Predictions and Actual Values (for a sample)
predictions.select("date", target_col, "prediction").show(5)

# Evaluate the Model
evaluator = RegressionEvaluator(labelCol=target_col, predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")

+----------+----------------+------------------+
|      date|subway_ridership|        prediction|
+----------+----------------+------------------+
|2020-03-07|         2814637| 2480925.717104908|
|2020-03-25|          690032|2479018.6029411154|
|2020-03-26|          680360| 2601721.313820609|
|2020-03-30|          545215|2483011.6699996525|
|2020-04-06|          445454|2521821.5455808067|
+----------+----------------+------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 1016507.9553998578


#### Task 9: Predict Upcoming Ridership Based on Forecast

In [336]:
# Setup Open Meteo for Forecast
url = "https://api.open-meteo.com/v1/forecast"
params = {
	"latitude": 40.7143,
	"longitude": -74.006,
	"daily": ["temperature_2m_max", "temperature_2m_min", "precipitation_sum"],
	"timezone": "America/New_York",
	"forecast_days": 7,
	"temperature_unit": "fahrenheit"
}
responses = openmeteo.weather_api(url, params=params)

# Process Location
response = responses[0]

# Process Daily Data
daily = response.Daily()
daily_temperature_2m_max = daily.Variables(0).ValuesAsNumpy()
daily_temperature_2m_min = daily.Variables(1).ValuesAsNumpy()
daily_precipitation_sum = daily.Variables(2).ValuesAsNumpy()

daily_data = {"date": pd.date_range(
	start = pd.to_datetime(daily.Time(), unit = "s", utc = True),
	end = pd.to_datetime(daily.TimeEnd(), unit = "s", utc = True),
	freq = pd.Timedelta(seconds = daily.Interval()),
	inclusive = "left"
)}

daily_data["temperature_2m_max"] = daily_temperature_2m_max
daily_data["temperature_2m_min"] = daily_temperature_2m_min
daily_data["precipitation_sum"] = daily_precipitation_sum

daily_dataframe = pd.DataFrame(data = daily_data)

# Convert Pandas DataFrame to Spark DataFrame
forecast_spark_df = spark.createDataFrame(daily_dataframe)

# Rename Columns to Match the Training Data Schema
forecast_spark_df = forecast_spark_df.withColumnRenamed("temperature_2m_max", "temperature_max_f")
forecast_spark_df = forecast_spark_df.withColumnRenamed("temperature_2m_min", "temperature_min_f")
forecast_spark_df = forecast_spark_df.withColumnRenamed("precipitation_sum", "precipitation_mm")

# Convert the Date Column to YYYY-MM-DD Format
forecast_spark_df = forecast_spark_df.withColumn("date", to_date(col("date"), "yyyy-MM-dd HH:mm:ss+00:00"))

# Cast Relevant Columns to Match the Training Data Schema
forecast_spark_df = forecast_spark_df.withColumn("temperature_max_f", col("temperature_max_f").cast(DoubleType()))
forecast_spark_df = forecast_spark_df.withColumn("temperature_min_f", col("temperature_min_f").cast(DoubleType()))
forecast_spark_df = forecast_spark_df.withColumn("precipitation_mm", col("precipitation_mm").cast(DoubleType()))

# Add Feature Flag Columns Based on Forecast Data
forecast_spark_df = (
    forecast_spark_df
    .withColumn("rain_flag", when(col("precipitation_mm") > 0, 1).otherwise(0))
    .withColumn("hot_day_flag", when(col("temperature_max_f") >= 85, 1).otherwise(0))
    .withColumn("cold_day_flag", when(col("temperature_min_f") <= 32, 1).otherwise(0))
)

# Make Predictions on the Forecast Data
predicted_ridership = model.transform(forecast_spark_df)

# Show the Predicted Ridership
predicted_ridership.select("date", "prediction").show()

+----------+------------------+
|      date|        prediction|
+----------+------------------+
|2025-05-15|2632402.1138472324|
|2025-05-16|2538818.7618064247|
|2025-05-17| 2664801.724473774|
|2025-05-18| 2636671.702145283|
|2025-05-19| 2641439.492606911|
|2025-05-20|  2624513.79504054|
|2025-05-21|2481349.3226583963|
+----------+------------------+



#### Task 10: Save Results

In [337]:
enriched.write.mode("overwrite").parquet("/output/enriched_data.parquet")
avg_by_rain.write.mode("overwrite").csv("/output/avg_ridership_by_rain.csv")
avg_by_hot_day.write.mode("overwrite").csv("/output/avg_ridership_by_hot_day.csv")
avg_by_cold_day.write.mode("overwrite").csv("/output/avg_ridership_by_cold_day.csv")
model.write().overwrite().save("/output/ridership_model")
predicted_ridership.write.mode("overwrite").parquet("/output/predicted_ridership.parquet")

### Stop Spark Session

In [338]:
spark.stop()