In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import (DateType, IntegerType, StringType, StructField,
                               StructType)

import matplotlib.pyplot as plt
import seaborn as sns

def get_daily_boarding_total():
    spark = SparkSession.builder.appName("CTA Ridership Daily Total").getOrCreate()

    csv_file_path = "./../datasets/Cleaned_CTA_Ridership_L_Daily_Total.csv"

    cta_df = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(csv_file_path)
    )

    schema = StructType(
        [
            StructField("station_id", IntegerType(), True),
            StructField("stationname", StringType(), True),
            StructField("date", StringType(), True),
            StructField("daytype", StringType(), True),
            StructField("rides", IntegerType(), True),
        ]
    )

    cta_df = spark.read.schema(schema).option("header", "true").csv(csv_file_path)

    return cta_df


In [2]:
df = get_daily_boarding_total()
df.toPandas().head()

24/04/28 14:08:05 WARN Utils: Your hostname, Yashs-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 172.20.20.20 instead (on interface en0)
24/04/28 14:08:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/28 14:08:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

Unnamed: 0,station_id,stationname,date,daytype,rides
0,41280,Jefferson Park,12/22/2017,W,6104
1,41000,Cermak-Chinatown,12/18/2017,W,3636
2,40280,Central-Lake,12/02/2017,A,1270
3,40140,Dempster-Skokie,12/19/2017,W,1759
4,40690,Dempster,12/03/2017,U,499


In [33]:
from pyspark.sql.functions import avg, col, month, sum, to_date, when, year

cta_df = df.withColumn("date", to_date(col("date"), "MM/DD/yyyy")) \
    .withColumn("year", year(col("date"))) \
    .withColumn("month", month(col("date"))) 

# Extract the year and month from the date
# cta_df = cta_df.withColumn("year", year(col("date"))).withColumn("month", month(col("date")))

cta_df.toPandas().head()

                                                                                

Unnamed: 0,station_id,stationname,date,daytype,rides,year,month
0,,stationname,,daytype,,,
1,41280.0,Jefferson Park,,W,6104.0,,
2,41000.0,Cermak-Chinatown,,W,3636.0,,
3,40280.0,Central-Lake,,A,1270.0,,
4,40140.0,Dempster-Skokie,,W,1759.0,,


In [34]:
monthly_avg_daily_boarding = cta_df.groupBy("year", "month").agg(avg("rides").alias("average_rides")).orderBy("year", "month")


# Convert to a Pandas DataFrame for plotting
monthly_avg_daily_boarding_pd = monthly_avg_daily_boarding.toPandas()
monthly_avg_daily_boarding_pd.head()
# # Create a 'year-month' column for easier plotting
# monthly_avg_daily_boarding_pd["year_month"] = monthly_avg_daily_boarding_pd["year"].astype(str) + "-" + monthly_avg_daily_boarding_pd["month"].astype(str)

# # Create the plot
# plt.figure(figsize=(12, 6))
# sns.lineplot(x="year_month", y="average_rides", data=monthly_avg_daily_boarding_pd, marker="o")
# plt.title("Monthly Average Daily Boarding")
# plt.xlabel("Year-Month")
# plt.ylabel("Average Daily Boarding")
# plt.xticks(rotation=45)
# plt.tight_layout()


                                                                                

Unnamed: 0,year,month,average_rides
0,,,3040.204504
1,2001.0,1.0,2811.00366
2,2002.0,1.0,2796.454112
3,2003.0,1.0,2694.259428
4,2004.0,1.0,2569.104044


24/04/28 03:51:12 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 885731 ms exceeds timeout 120000 ms
24/04/28 03:51:12 WARN SparkContext: Killing executors is not supported by current scheduler.
24/04/28 03:51:13 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, split, sum
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

spark = SparkSession.builder.appName("WeatherData").getOrCreate()

weather_csv_path = "./../datasets/Weather_data.csv"
weather_df = (
    spark.read.option("header", "true")
    .csv(weather_csv_path)
)

csv_file_path = "./../datasets/Cleaned_CTA_Ridership_L_Daily_Total.csv"

cta_df = (
    spark.read.option("header", "true")
    .option("inferSchema", "true")
    .csv(csv_file_path)
)

                                                                                

In [18]:
weather_df = weather_df.withColumn("date", to_date(col("Date"), "yyyy-MM-DD"))
cta_df = cta_df.withColumn("date", to_date(col("date"), "MM/dd/yyyy"))


In [19]:
cta_grouped = cta_df.groupBy("date").agg(sum("rides").alias("total_rides"))


In [20]:
cta_grouped.toPandas().head()

                                                                                

Unnamed: 0,date,total_rides
0,2018-05-28,263296
1,2018-08-10,622704
2,2019-05-08,620493
3,2019-06-04,614068
4,2020-08-24,125103


In [21]:
joined_df = weather_df.join(cta_grouped, on="date", how="inner")


In [22]:
joined_pd = joined_df.select("date", "PRCP (Inches)", "total_rides").orderBy("date").toPandas()


                                                                                

In [23]:
joined_pd.head()

Unnamed: 0,date,PRCP (Inches),total_rides
0,2007-01-01,0.0,182534
1,2007-01-02,0.0,375184
2,2007-01-03,0.0,465798
3,2007-01-04,0.0,455522
4,2007-01-05,1.55,472466
