## Data Preprocessing with PySpark
---

In [1]:
# Import libraries
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, date_format, substring, to_timestamp

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
4,application_1741207229711_0005,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# Obtain the Spark session used by every cell below.
# getOrCreate() hands back an already-running session when one exists.
spark = (
    SparkSession.builder
    .appName("FinalProject - Data Preprocessing")
    .getOrCreate()
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

---
### 1. Data Load - Weather Dataset

In [3]:
# Read the hourly weather CSV from S3; the first line of the file supplies the column names.
# No schema is given, so every column is loaded as a string.
weather_data_path = "s3://csc555-jaewon/final_project/2024_chicago_hourly_weather_from_january_to_february.csv"
weather_df = spark.read.csv(weather_data_path, header=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Shape of the weather dataset: column count comes from the schema (no job),
# row count triggers a Spark action over the whole file.
weather_num_columns = len(weather_df.columns)
weather_num_rows = weather_df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# Report the weather dataset's row and column counts computed in the previous cell
print(f"Rows: {weather_num_rows}, Columns: {weather_num_columns}")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Rows: 8784, Columns: 8

In [6]:
# Preview the first five raw weather rows
weather_df.show(n=5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+--------------+--------------------+-------------+-------------+--------------------+-------------+-----------+
|            time|temperature_2m|relative_humidity_2m|precipitation|windspeed_10m|apparent_temperature|windgusts_10m|weathercode|
+----------------+--------------+--------------------+-------------+-------------+--------------------+-------------+-----------+
|2024-01-01T00:00|             0|                  85|          0.1|         30.4|                -6.9|         44.3|         71|
|2024-01-01T01:00|           0.1|                  86|          0.1|         30.1|                -6.7|         44.6|         71|
|2024-01-01T02:00|           0.4|                  85|          0.1|         27.8|                -6.1|         43.6|         71|
|2024-01-01T03:00|           0.2|                  85|            0|         25.4|                  -6|           40|          3|
|2024-01-01T04:00|          -0.3|                  85|            0|         23.1|        

---
### 2. Data Load - Taxi Dataset

In [7]:
# Read the taxi trip CSV from S3; the first line of the file supplies the column names.
# No schema is given, so every column is loaded as a string.
taxi_data_path = "s3://csc555-jaewon/final_project/2024_chicago_taxi_trips_from_january_to_february.csv"
taxi_df = spark.read.csv(taxi_data_path, header=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# Shape of the taxi dataset: column count comes from the schema (no job),
# row count triggers a Spark action over the whole file.
# taxi_num_rows is reused later to validate the hourly aggregation.
taxi_num_columns = len(taxi_df.columns)
taxi_num_rows = taxi_df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
# Report the taxi dataset's row and column counts computed in the previous cell
print(f"Rows: {taxi_num_rows}, Columns: {taxi_num_columns}")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Rows: 865208, Columns: 23

In [10]:
# Preview the first five raw taxi rows
taxi_df.show(n=5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+--------------------+------------------+------------+----------+-------------------+--------------------+---------------------+----------------------+-----+----+-----+------+----------+------------+--------------------+------------------------+-------------------------+------------------------+-------------------------+--------------------------+--------------------------+
|             Trip ID|             Taxi ID|Trip Start Timestamp|Trip End Timestamp|Trip Seconds|Trip Miles|Pickup Census Tract|Dropoff Census Tract|Pickup Community Area|Dropoff Community Area| Fare|Tips|Tolls|Extras|Trip Total|Payment Type|             Company|Pickup Centroid Latitude|Pickup Centroid Longitude|Pickup Centroid Location|Dropoff Centroid Latitude|Dropoff Centroid Longitude|Dropoff Centroid  Location|
+--------------------+--------------------+--------------------+------------------+------------+----------+-------------------+--------------------+--------------

---
### 3. Data Preprocessing - Weather Dataset

In [11]:
# Build the "Hour Slot" join key: parse the "time" string (e.g. 2024-01-01T00:00)
# and re-render it as "MM/dd/yyyy HH" (e.g. 01/01/2024 00).
weather_ts = F.to_timestamp(F.col("time"), "yyyy-MM-dd'T'HH:mm")
weather_df = weather_df.withColumn("Hour Slot", F.date_format(weather_ts, "MM/dd/yyyy HH"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
# Convert the measurement columns from string to float.
# "time", "weathercode" and "Hour Slot" are intentionally left as strings.
weather_float_columns = [
    "temperature_2m",
    "relative_humidity_2m",
    "precipitation",
    "windspeed_10m",
    "apparent_temperature",
    "windgusts_10m",
]
for column_name in weather_float_columns:
    # Replacing a column under its own name keeps its position in the schema
    weather_df = weather_df.withColumn(column_name, F.col(column_name).cast("float"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
# Inspect the schema to confirm the float casts and the new "Hour Slot" column
weather_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- time: string (nullable = true)
 |-- temperature_2m: float (nullable = true)
 |-- relative_humidity_2m: float (nullable = true)
 |-- precipitation: float (nullable = true)
 |-- windspeed_10m: float (nullable = true)
 |-- apparent_temperature: float (nullable = true)
 |-- windgusts_10m: float (nullable = true)
 |-- weathercode: string (nullable = true)
 |-- Hour Slot: string (nullable = true)

In [14]:
# Preview the first five preprocessed weather rows
weather_df.show(n=5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+--------------+--------------------+-------------+-------------+--------------------+-------------+-----------+-------------+
|            time|temperature_2m|relative_humidity_2m|precipitation|windspeed_10m|apparent_temperature|windgusts_10m|weathercode|    Hour Slot|
+----------------+--------------+--------------------+-------------+-------------+--------------------+-------------+-----------+-------------+
|2024-01-01T00:00|           0.0|                85.0|          0.1|         30.4|                -6.9|         44.3|         71|01/01/2024 00|
|2024-01-01T01:00|           0.1|                86.0|          0.1|         30.1|                -6.7|         44.6|         71|01/01/2024 01|
|2024-01-01T02:00|           0.4|                85.0|          0.1|         27.8|                -6.1|         43.6|         71|01/01/2024 02|
|2024-01-01T03:00|           0.2|                85.0|          0.0|         25.4|                -6.0|         40.0|          3|01/01/2

---
### 4. Data Preprocessing - Taxi Dataset

In [15]:
# Keep only the trip start time; it is the only taxi field the hourly count needs
taxi_df = taxi_df.select(F.col("Trip Start Timestamp"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
# Parse the trip start time with the "M/d/yyyy HH:mm" pattern and re-render it
# zero-padded as "MM/dd/yyyy HH:mm" so it lines up with the weather key format.
trip_start_ts = to_timestamp(col("Trip Start Timestamp"), "M/d/yyyy HH:mm")
taxi_df = taxi_df.withColumn("Formatted Timestamp", date_format(trip_start_ts, "MM/dd/yyyy HH:mm"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
# "Hour Slot" is the first 13 characters of "Formatted Timestamp",
# i.e. the "MM/dd/yyyy HH" prefix with the minutes dropped (positions are 1-based).
hour_slot_prefix = col("Formatted Timestamp").substr(1, 13)
taxi_df = taxi_df.withColumn("Hour Slot", hour_slot_prefix)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
# Switch Spark to the legacy datetime parser.
# NOTE(review): this is set after the to_timestamp expressions were defined in earlier cells;
# it presumably still applies because taxi_df has not been evaluated yet — consider moving
# the setting next to the session setup so the notebook does not depend on that ordering.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Number of taxi trips per hour slot, sorted by the slot string
trips_per_slot = taxi_df.groupBy("Hour Slot").count()
taxi_count_df = trips_per_slot.orderBy("Hour Slot")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
# Preview the first five hourly trip counts
taxi_count_df.show(n=5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------+-----+
|    Hour Slot|count|
+-------------+-----+
|01/01/2024 00|  462|
|01/01/2024 01|  522|
|01/01/2024 02|  490|
|01/01/2024 03|  269|
|01/01/2024 04|  150|
+-------------+-----+
only showing top 5 rows

In [20]:
# Validate the hourly aggregation before it is merged with the weather data.
#   1) The per-slot counts must add up to the initial row count of the taxi dataset.
#   2) No trips may sit in a null "Hour Slot" group. groupBy() counts the null group like any
#      other key, so check 1 alone always passes even when a timestamp is missing or could not
#      be parsed; such trips would then be silently dropped by an inner join on "Hour Slot".
total_count = taxi_count_df.agg(F.sum('count').alias('total_count')).collect() # Sum the total number of taxi trips
null_slot_rows = taxi_count_df.filter(F.col('Hour Slot').isNull()).agg(F.sum('count').alias('null_count')).collect()
null_slot_trips = null_slot_rows[0]['null_count'] or 0 # Sum over zero rows is null, so treat it as 0
if total_count[0]['total_count'] == taxi_num_rows and null_slot_trips == 0:
    print("Taxi trip count has been successfully completed.")
else:
    print("There was an issue with the taxi trip count.")
    # Show which of the two checks failed
    print(f"Grouped trips: {total_count[0]['total_count']}, expected: {taxi_num_rows}, trips without an hour slot: {null_slot_trips}")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Taxi trip count has been successfully completed.

---
### 5. Data Merging - Merged Dataset

In [21]:
# Attach the weather readings to each hourly trip count.
# The inner join keeps only hour slots present in both datasets; the result is
# sorted ascending by the "Hour Slot" string.
merged_df = (
    taxi_count_df
    .join(weather_df, on="Hour Slot", how="inner")
    .orderBy(F.col("Hour Slot").asc())
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
# Preview the first five rows of the merged dataset
merged_df.show(n=5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------+-----+----------------+--------------+--------------------+-------------+-------------+--------------------+-------------+-----------+
|    Hour Slot|count|            time|temperature_2m|relative_humidity_2m|precipitation|windspeed_10m|apparent_temperature|windgusts_10m|weathercode|
+-------------+-----+----------------+--------------+--------------------+-------------+-------------+--------------------+-------------+-----------+
|01/01/2024 00|  462|2024-01-01T00:00|           0.0|                85.0|          0.1|         30.4|                -6.9|         44.3|         71|
|01/01/2024 01|  522|2024-01-01T01:00|           0.1|                86.0|          0.1|         30.1|                -6.7|         44.6|         71|
|01/01/2024 02|  490|2024-01-01T02:00|           0.4|                85.0|          0.1|         27.8|                -6.1|         43.6|         71|
|01/01/2024 03|  269|2024-01-01T03:00|           0.2|                85.0|          0.0|         25.

---
### 6. Data Export to S3 - Merged Dataset

In [23]:
# Export the merged dataset to S3 in CSV format, with a header row.
# Spark writes a directory of part files at this path rather than a single file.
# mode="overwrite" replaces the output of a previous run; without it the default save mode
# raises an error when the path already exists, so re-running the notebook would fail here.
merged_df.write.csv(
    "s3://csc555-jaewon/final_project/2024_chicago_merged_dataset.csv",
    mode="overwrite",
    header=True,
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…