# TLC Trip Data Record Analysis

This notebook analyzes the TLC Trip Data Record. We enrich the data with New York weather data and public holiday data, drop unnecessary columns, and create new columns for further analysis.

We save the data to an Amazon S3 bucket, partitioned by year, month, and vehicle operator, and prepare it for use in visualization tools and machine learning.

In [None]:
# Print the magics in Glue Spark kernel
%help

## 1. Launch the Glue Interactive Sessions development environment

This notebook must run in an AWS Glue environment.

We developed it in AWS Glue Interactive Sessions. Before running any other cell, configure the session with the magics below: an idle timeout of 2,880 minutes (48 hours), Glue version 4.0, and 8 workers of type G.1X.

In [None]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 8

In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# pyspark.sql.functions
from pyspark.sql.functions import year, month, dayofmonth, hour, dayofweek, date_format
from pyspark.sql.functions import when
from pyspark.sql.functions import coalesce

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)


In [None]:
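# Detect whether the code runs as a Glue job: Glue jobs receive a --JOB_NAME argument,
# interactive sessions do not, so getResolvedOptions raises an error there.
def is_glue_job():
    try:
        args = getResolvedOptions(sys.argv, ['JOB_NAME'])
        print("JOB_NAME: ", args['JOB_NAME'])
        return True
    except Exception:
        return False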

print("Running in Glue Job: ", is_glue_job())

## 2. TLC Trip Data Record 

The TLC Trip Data Record is a public dataset published by the New York City Taxi and Limousine Commission (TLC) that contains data on over 1.1 billion taxi trips in New York City. The TLC publishes one file per month, in Parquet format (the format this notebook reads). The data is available from 2009 to the present and is updated monthly.

Before starting, we downloaded the High Volume For-Hire Vehicle (FHVHV) trip records from 2019-02 to 2023-06 and uploaded them to our S3 bucket.

### 2.1 Load Trip Record Data

Load the data from the S3 bucket into Spark DataFrames. The Parquet schema changed in 2023-02: `PULocationID` and `DOLocationID` were INT64 before 2023-02 and INT32 from 2023-02 onward. To make the data consistent, we cast them to INT64.

To do this, we store the files in two folders in the S3 bucket: `fhvhv` holds the data before 2023-02, and `fhvhv_abnormal` holds the data from 2023-02 onward. We load each folder into its own DataFrame with a matching schema, cast the INT32 columns to INT64, and union the two DataFrames.
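The split between the two folders comes down to one cutoff month. The plain-Python sketch below assumes the TLC's monthly file naming `fhvhv_tripdata_YYYY-MM.parquet` and a download covering 2019-02 through 2023-06; `month_range`, `target_folder`, and `plan_layout` are hypothetical helpers, not part of the notebook.

```python
from datetime import date

# First month in which PULocationID/DOLocationID are stored as INT32 instead of INT64.
SCHEMA_CHANGE = date(2023, 2, 1)

def month_range(start: date, end: date):
    """Yield the first day of every month from start to end, inclusive."""
    current = date(start.year, start.month, 1)
    while current <= end:
        yield current
        # Advance to the first day of the next month.
        if current.month == 12:
            current = date(current.year + 1, 1, 1)
        else:
            current = date(current.year, current.month + 1, 1)

def target_folder(month: date) -> str:
    """Return the S3 sub-folder a monthly file belongs in (hypothetical helper)."""
    return "fhvhv_abnormal" if month >= SCHEMA_CHANGE else "fhvhv"

def plan_layout(start: date, end: date) -> dict:
    """Map each assumed monthly file name to its target folder."""
    return {
        f"fhvhv_tripdata_{m:%Y-%m}.parquet": target_folder(m)
        for m in month_range(start, end)
    }

layout = plan_layout(date(2019, 2, 1), date(2023, 6, 1))
print(layout["fhvhv_tripdata_2023-01.parquet"])
print(layout["fhvhv_tripdata_2023-02.parquet"])
```

Keeping the cutoff in a single constant makes it easy to move files if the schema boundary turns out to be a different month.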

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType, LongType

schema_since_2023_02 = StructType([
    StructField("hvfhs_license_num", StringType()),
    StructField("dispatching_base_num", StringType()),
    StructField("request_datetime", TimestampType()),
    StructField("pickup_datetime", TimestampType()),
    StructField("dropoff_datetime", TimestampType()),
    StructField("PULocationID", IntegerType()),
    StructField("DOLocationID", IntegerType()),
    StructField("trip_miles", DoubleType()),
    StructField("trip_time", LongType()),
    StructField("base_passenger_fare", DoubleType()),
    StructField("tolls", DoubleType()),
    StructField("sales_tax", DoubleType()),
    StructField("congestion_surcharge", DoubleType()),
    StructField("tips", DoubleType()),
    StructField("driver_pay", DoubleType()),
    StructField("shared_request_flag", StringType()),
    StructField("shared_match_flag", StringType())
])

schema_before_2023_02 = StructType([
    StructField("hvfhs_license_num", StringType()),
    StructField("dispatching_base_num", StringType()),
    StructField("request_datetime", TimestampType()),
    StructField("pickup_datetime", TimestampType()),
    StructField("dropoff_datetime", TimestampType()),
    StructField("PULocationID", LongType()),
    StructField("DOLocationID", LongType()),
    StructField("trip_miles", DoubleType()),
    StructField("trip_time", LongType()),
    StructField("base_passenger_fare", DoubleType()),
    StructField("tolls", DoubleType()),
    StructField("sales_tax", DoubleType()),
    StructField("congestion_surcharge", DoubleType()),
    StructField("tips", DoubleType()),
    StructField("driver_pay", DoubleType()),
    StructField("shared_request_flag", StringType()),
    StructField("shared_match_flag", StringType())
])

In [None]:
# if running in Glue job, read all data, otherwise, read a sample
if is_glue_job():
    print("Running in Glue job, read all data")
    df_before_2023_02 = spark.read.schema(schema_before_2023_02).parquet("s3://qiaoshi-aws-ml/tlc/fhvhv/")
    df_since_2023_02 = spark.read.schema(schema_since_2023_02).parquet("s3://qiaoshi-aws-ml/tlc/fhvhv_abnormal/")
else:
    print("Running in a development environment, read a sample")
    df_before_2023_02 = spark.read.schema(schema_before_2023_02).parquet("s3://qiaoshi-aws-ml/tlc/fhvhv/").sample(False, 0.002) # 0.2% sample
    df_since_2023_02 = spark.read.schema(schema_since_2023_02).parquet("s3://qiaoshi-aws-ml/tlc/fhvhv_abnormal/").sample(False, 0.002) # 0.2% sample

df_since_2023_02 = df_since_2023_02.withColumn("PULocationID", df_since_2023_02["PULocationID"].cast("long"))
df_since_2023_02 = df_since_2023_02.withColumn("DOLocationID", df_since_2023_02["DOLocationID"].cast("long"))
print("Count before 2023-02: ", df_before_2023_02.count())
print("Count since 2023-02: ", df_since_2023_02.count())

In [None]:
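# unionByName matches columns by name rather than by position, which is safer if the column order ever differs
df_records = df_before_2023_02.unionByName(df_since_2023_02)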
print("Total counts after union: ", df_records.count())

### 2.2 Data Transformation

In this section, we apply the following transformations:

- `request_datetime` contains nulls; we fill them with `pickup_datetime`.
- `congestion_surcharge` contains nulls; we fill them with 0.
- `trip_time` is recomputed for every trip as the difference, in seconds, between `dropoff_datetime` and `pickup_datetime`.
- Add a column `pickup_time`: the number of seconds from the request to the pickup.
- Convert `hvfhs_license_num` to the operator name (Juno, Uber, Via, or Lyft), stored in a new column `rider`.
- Add columns `year`, `month`, `day`, `hour`, and `weekday_n` (1 = Sunday through 7 = Saturday) based on `request_datetime`, for partitioning and further analysis.
- Add a column `request_hour`: the hour of the request in "yyyy-MM-dd HH:00:00" format.
- Convert `trip_miles` from miles to kilometers in a new column `trip_km`, and drop `trip_miles`.
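The row-level rules above can be sketched in plain Python on a single trip dictionary. This is an illustration, not the notebook's Spark code: `transform_trip` and `spark_dayofweek` are hypothetical names, timestamps are assumed to have whole-second precision (Spark's `unix_timestamp` works in whole seconds), and Python's `round()` may differ from Spark's `round()` (HALF_UP) in exact half-way cases.

```python
from datetime import datetime

MILES_TO_KM = 1.60934  # same conversion factor the notebook uses

def spark_dayofweek(ts: datetime) -> int:
    """Mimic Spark's dayofweek(): 1 = Sunday, 2 = Monday, ..., 7 = Saturday."""
    return ts.isoweekday() % 7 + 1

def transform_trip(trip: dict) -> dict:
    """Apply the transformation rules to one trip record (a plain dict)."""
    out = dict(trip)
    # request_datetime falls back to pickup_datetime when missing.
    if out.get("request_datetime") is None:
        out["request_datetime"] = out["pickup_datetime"]
    # A missing congestion surcharge is treated as 0.
    if out.get("congestion_surcharge") is None:
        out["congestion_surcharge"] = 0.0
    # trip_time and pickup_time are durations in seconds.
    out["trip_time"] = (out["dropoff_datetime"] - out["pickup_datetime"]).total_seconds()
    out["pickup_time"] = (out["pickup_datetime"] - out["request_datetime"]).total_seconds()
    # Calendar columns derived from request_datetime.
    req = out["request_datetime"]
    out.update(year=req.year, month=req.month, day=req.day, hour=req.hour,
               weekday_n=spark_dayofweek(req),
               request_hour=req.strftime("%Y-%m-%d %H:00:00"))
    # Replace miles with kilometers, rounded to 2 decimals.
    out["trip_km"] = round(out.pop("trip_miles") * MILES_TO_KM, 2)
    return out

example = transform_trip({
    "request_datetime": None,
    "pickup_datetime": datetime(2023, 1, 1, 9, 5, 0),
    "dropoff_datetime": datetime(2023, 1, 1, 9, 25, 30),
    "congestion_surcharge": None,
    "trip_miles": 5.0,
})
print(example["trip_time"], example["weekday_n"], example["trip_km"])
```

2023-01-01 was a Sunday, so `weekday_n` is 1 under Spark's numbering, whereas Python's own `isoweekday()` would return 7.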

`hvfhs_license_num` is the TLC license number of the HVFHS base or business. We map it to the operator's well-known name as follows:
- HV0002: Juno
- HV0003: Uber
- HV0004: Via
- HV0005: Lyft
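The same mapping expressed as a lookup table, in plain Python. This is a sketch, not the notebook's Spark code; `LICENSE_TO_OPERATOR`, `operator_name`, and `count_operators` are hypothetical names. Like the `otherwise("Unknown")` branch in the Spark cell, unmapped license numbers fall back to "Unknown".

```python
from collections import Counter

# License number -> operator name, as listed above.
LICENSE_TO_OPERATOR = {
    "HV0002": "Juno",
    "HV0003": "Uber",
    "HV0004": "Via",
    "HV0005": "Lyft",
}

def operator_name(license_num: str) -> str:
    """Return the operator for a license number, or "Unknown" if unmapped."""
    return LICENSE_TO_OPERATOR.get(license_num, "Unknown")

def count_operators(license_nums) -> Counter:
    """Count trips per operator; a non-zero "Unknown" count flags new license numbers."""
    return Counter(operator_name(n) for n in license_nums)

print(count_operators(["HV0003", "HV0003", "HV0005", "HV9999"]))
```

Keeping the mapping in one dictionary means a new license number needs a one-line change rather than another `when(...)` branch.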


In [None]:
# fill the request_datetime with pickup_datetime if null
df_records = df_records.withColumn("request_datetime", coalesce(df_records["request_datetime"], df_records["pickup_datetime"]))

# if the value of congestion_surcharge is null, fill it with 0
df_records = df_records.fillna({'congestion_surcharge': 0.0})
print("Total counts after filling null: ", df_records.count())

# Recompute `trip_time` (in seconds) from `dropoff_datetime` and `pickup_datetime`.
from pyspark.sql.functions import unix_timestamp
df_records = df_records.withColumn("trip_time", (unix_timestamp(df_records["dropoff_datetime"]) - unix_timestamp(df_records["pickup_datetime"])).cast("double"))

# Add `pickup_time`: seconds from the request to the pickup
df_records = df_records.withColumn("pickup_time", (unix_timestamp(df_records["pickup_datetime"]) - unix_timestamp(df_records["request_datetime"])).cast("double"))

if not is_glue_job():
    df_records.show(5)

In [None]:
# add a new column to indicate the rider
df_records = df_records.withColumn("rider", 
                        when(df_records["hvfhs_license_num"] == "HV0002", "Juno")
                        .when(df_records["hvfhs_license_num"] == "HV0003", "Uber")
                        .when(df_records["hvfhs_license_num"] == "HV0004", "Via")
                        .when(df_records["hvfhs_license_num"] == "HV0005", "Lyft")
                        .otherwise("Unknown")  # Optional: for unmapped values
                        )

# drop the column
df_records = df_records.drop("hvfhs_license_num")

print("Total counts after adding rider: ", df_records.count())

if not is_glue_job():
    df_records.show(5)

In [None]:
# Add columns `year`, `month`, `day`, `hour`, `weekday_n` from `request_datetime`.
# dayofweek() returns 1 for Sunday through 7 for Saturday.
df_records = df_records.withColumn("year", year(df_records["request_datetime"]))
df_records = df_records.withColumn("month", month(df_records["request_datetime"]))
df_records = df_records.withColumn("day", dayofmonth(df_records["request_datetime"]))
df_records = df_records.withColumn("hour", hour(df_records["request_datetime"]))
df_records = df_records.withColumn("weekday_n", dayofweek(df_records["request_datetime"]))

# Add a new column to indicate the hour of the request, example, 2019-01-02 03:00:00
df_records = df_records.withColumn("request_hour", date_format(df_records["request_datetime"], "yyyy-MM-dd HH:00:00"))


In [None]:
# Convert the trip_miles from miles to kilometers
from pyspark.sql.functions import round

df_records = df_records.withColumn("trip_km", round(df_records["trip_miles"] * 1.60934, 2))

# drop duplicated column
df_records = df_records.drop("trip_miles")

if not is_glue_job():
    df_records.show(5)

In [None]:
# print the final schema of the DataFrame
df_records.printSchema()

## 3. Public holidays in New York City

We used ChatGPT to list the public holidays in New York City from 2019 to 2023. Below is the prompt we used, which asks for CSV output. We saved the result to a CSV file and uploaded it to the S3 bucket.

```
List all public holidays in New York from 2019 to 2023. Output in CSV format. Here is an example:

<example>
Year,Month,Day,Holiday
2019,1,1,New Year's Day
2019,1,21,Martin Luther King Jr. Day
</example>
```

Command used to upload the file to the Amazon S3 bucket:
```
aws s3 cp ~/Developer/sjtu/data-analytics/data/holidays_ny.csv s3://qiaoshi-aws-ml/tlc/holidays/ny.csv
```


### 3.1 Read the Holiday Data from S3 bucket

The holiday data is stored in the S3 bucket; we read it into a Spark DataFrame.
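Because the holiday list was generated by ChatGPT rather than taken from an official source, a quick sanity check of the CSV is cheap. A minimal plain-Python sketch, assuming the file has the `Year,Month,Day,Holiday` header shown in the prompt; `load_holidays` is a hypothetical helper. It rejects impossible dates and keeps one name per date, so a date listed twice cannot duplicate trip rows in the later join.

```python
import csv
import io
from datetime import date

def load_holidays(csv_text: str) -> dict:
    """Parse Year,Month,Day,Holiday rows into {date: holiday name}.

    date() raises ValueError for impossible dates such as 2019-02-30.
    If a date appears more than once, only the first name is kept.
    """
    holidays = {}
    for row in csv.DictReader(io.StringIO(csv_text)):
        d = date(int(row["Year"]), int(row["Month"]), int(row["Day"]))
        holidays.setdefault(d, row["Holiday"])
    return holidays

sample = (
    "Year,Month,Day,Holiday\n"
    "2019,1,1,New Year's Day\n"
    "2019,1,21,Martin Luther King Jr. Day\n"
    "2019,1,1,New Year's Day (duplicate)\n"
)
print(load_holidays(sample))
```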

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Integer date parts match the integer year/month/day columns of df_records used as join keys
holiday_schema = StructType([
    StructField("year", IntegerType()),
    StructField("month", IntegerType()),
    StructField("day", IntegerType()),
    StructField("holiday", StringType()),
])

# Read the CSV file from S3 into a DataFrame
df_holidays = spark.read.schema(holiday_schema).csv("s3://qiaoshi-aws-ml/tlc/holidays/ny.csv", header=True)

# Show the first 5 rows of the DataFrame
if not is_glue_job():
    df_holidays.show(5)

print("Total counts of holidays: ", df_holidays.count())

## 4. TLC Trip Data Record Enrichment

In this section, we will enrich the TLC Trip Data Record with the public holidays data.

### 4.1 Enrich with Public Holiday Data
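The enrichment is a left join on the trip date followed by a null check. The plain-Python sketch below shows the intended semantics; `enrich_with_holidays` is a hypothetical helper. Every trip is kept, and `is_holiday` is True only when the trip's date matches a holiday. Using a set of dates also shows why duplicates matter: in a real join, a date listed twice would produce two output rows per trip on that date.

```python
from datetime import date

def enrich_with_holidays(trips, holiday_dates):
    """Left-join trips to a set of holiday dates and add an is_holiday flag."""
    enriched = []
    for trip in trips:
        key = date(trip["year"], trip["month"], trip["day"])
        # Every trip is kept (left join); the flag marks matches only.
        enriched.append({**trip, "is_holiday": key in holiday_dates})
    return enriched

trips = [
    {"year": 2019, "month": 1, "day": 1, "rider": "Uber"},
    {"year": 2019, "month": 1, "day": 2, "rider": "Lyft"},
]
print(enrich_with_holidays(trips, {date(2019, 1, 1)}))
```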

In [None]:
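# Deduplicate holiday dates so a date listed twice cannot duplicate trip rows,
# then left-join so every trip is kept
df_holidays_unique = df_holidays.dropDuplicates(["year", "month", "day"])
df_enriched = df_records.join(df_holidays_unique, on=['year', 'month', 'day'], how='left')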

# Add a column is_holiday of type boolean
df_enriched = df_enriched.withColumn("is_holiday", df_enriched["holiday"].isNotNull())

# drop the holiday name column
df_enriched = df_enriched.drop("holiday")

print("Add column is_holiday, and drop column holiday done.")

if not is_glue_job():
    df_enriched.show(3)

## 5. Save the Enriched data to S3

In this section, we save the processed data to S3 in Parquet format, partitioned by year, month, and rider. The partitioned data feeds the visualization tool and machine learning; partitioning also lets downstream jobs read only the partitions they need and process them in parallel.
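Spark's `partitionBy` writes one Hive-style subdirectory per distinct value combination, named `column=value`. The plain-Python sketch below illustrates the resulting layout for `partitionBy("year", "month", "rider")` and how a reader that filters on partition columns can skip whole directories; `partition_path` and `prune` are hypothetical helpers, not Spark APIs.

```python
def partition_path(base: str, partitions: dict) -> str:
    """Build the Hive-style directory for one partition, e.g. .../year=2019/month=2/rider=Uber."""
    segments = [f"{column}={value}" for column, value in partitions.items()]
    return base.rstrip("/") + "/" + "/".join(segments)

def prune(partitions: list, **filters) -> list:
    """Return only the partitions whose values match every filter.

    This mimics partition pruning: a query filtering on partition columns
    only needs to list and open the matching directories.
    """
    return [p for p in partitions if all(p.get(k) == v for k, v in filters.items())]

print(partition_path("s3://qiaoshi-aws-ml/tlc/results/full/trips/",
                     {"year": 2019, "month": 2, "rider": "Uber"}))
```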

In [None]:
# save the result to S3 in parquet format, partitioned by year, month, rider

if is_glue_job():
    df_enriched.write.mode("overwrite").partitionBy("year", "month", "rider").parquet("s3://qiaoshi-aws-ml/tlc/results/full/trips/")
else:
    df_enriched.write.mode("overwrite").partitionBy("year", "month", "rider").parquet("s3://qiaoshi-aws-ml/tlc/results/sample/trips/")

print("Output the enriched data to S3 done.")

In [None]:
df_enriched.printSchema()

## 6. Data Aggregation

We group the data by year, month, rider, and pickup location, and aggregate it to calculate the average pickup time and the average cost per kilometer.

### 6.1 Aggregate the data by year, month, rider, pickup location

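In this section, we aggregate the data by year, month, rider, and pickup location, and calculate the following metrics:

- avg_pickup_time: average seconds from request to pickup
- avg_base_fare_per_km: average of `base_passenger_fare / trip_km` per trip
- avg_tips_per_km: average of `tips / trip_km` per trip
- avg_driver_pay_per_km: average of `driver_pay / trip_km` per trip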
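The metrics above average per-trip ratios. The plain-Python sketch below mirrors that for one group of trips and contrasts it with a ratio of sums; `per_km_averages` and `ratio_of_sums` are hypothetical helpers. It assumes Spark's default non-ANSI behavior, where dividing by a zero `trip_km` yields null and `avg()` skips nulls, so zero-distance trips drop out of the per-km averages.

```python
from statistics import mean

def per_km_averages(trips):
    """Compute the per-group averages for a list of trip dicts."""
    result = {"avg_pickup_time": round(mean(t["pickup_time"] for t in trips), 2)}
    # Zero-distance trips are excluded from per-km ratios, like null ratios in Spark.
    with_distance = [t for t in trips if t["trip_km"] > 0]
    for column, name in [("base_passenger_fare", "avg_base_fare_per_km"),
                         ("tips", "avg_tips_per_km"),
                         ("driver_pay", "avg_driver_pay_per_km")]:
        ratios = [t[column] / t["trip_km"] for t in with_distance]
        result[name] = round(mean(ratios), 2) if ratios else None
    return result

def ratio_of_sums(trips, column):
    """Alternative metric: total of `column` divided by total kilometers."""
    total_km = sum(t["trip_km"] for t in trips)
    return round(sum(t[column] for t in trips) / total_km, 2) if total_km else None

group = [
    {"pickup_time": 120.0, "base_passenger_fare": 10.0, "tips": 2.0, "driver_pay": 8.0, "trip_km": 2.0},
    {"pickup_time": 240.0, "base_passenger_fare": 24.0, "tips": 4.0, "driver_pay": 16.0, "trip_km": 8.0},
    {"pickup_time": 60.0, "base_passenger_fare": 7.0, "tips": 0.0, "driver_pay": 5.0, "trip_km": 0.0},
]
print(per_km_averages(group))
print(ratio_of_sums(group[:2], "base_passenger_fare"))
```

Averaging per-trip ratios gives every trip equal weight, so short trips with high per-km fares pull the result up (4.0 per km here, versus 3.4 per km as a ratio of sums over the same two trips). Which one fits depends on the question being asked.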

In [None]:
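# Note: importing sum and round from pyspark.sql.functions shadows Python's built-in sum() and round()
from pyspark.sql.functions import sum, col, avg, round

# Trips with trip_km = 0 give null ratios (division by zero with ANSI mode off), and avg() skips nulls
df_grouped_by_month = df_enriched.groupBy("rider", "year", "month", "PULocationID").agg(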
    round(avg("pickup_time"), 2).alias("avg_pickup_time"),
    round(avg(col("base_passenger_fare") / col("trip_km")), 2).alias("avg_base_fare_per_km"),
    round(avg(col("tips") / col("trip_km")), 2).alias("avg_tips_per_km"),
    round(avg(col("driver_pay") / col("trip_km")), 2).alias("avg_driver_pay_per_km"),
)

print("The df_grouped_by_month count is: ", df_grouped_by_month.count())

if not is_glue_job():
    df_grouped_by_month.show(5)

In [None]:
# write the result to the S3 bucket
if not is_glue_job():
    df_grouped_by_month.write.mode("overwrite").csv("s3://qiaoshi-aws-ml/tlc/results/sample/avg_by_month/", header=True)
else:
    df_grouped_by_month.write.mode("overwrite").csv("s3://qiaoshi-aws-ml/tlc/results/full/avg_by_month/", header=True)

print("Output the df_grouped_by_month to S3 done.")

### 6.2 Aggregate the data by weekday_n, rider, PUlocationID

We aggregate the non-holiday trips by `weekday_n`, rider, and `PULocationID`, and calculate the same metrics: the average pickup time and the average cost per kilometer.
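The cell below filters out holiday trips before grouping, presumably so that unusual holiday demand does not skew the weekday averages. A plain-Python sketch of that filter-then-group step for `avg_pickup_time`; `avg_pickup_by_weekday` is a hypothetical helper and the trip values are made up for illustration.

```python
from collections import defaultdict
from statistics import mean

def avg_pickup_by_weekday(trips):
    """Group non-holiday trips by (rider, weekday_n, PULocationID) and average pickup_time."""
    groups = defaultdict(list)
    for t in trips:
        if t["is_holiday"]:
            continue  # holidays are excluded, as in the Spark filter
        groups[(t["rider"], t["weekday_n"], t["PULocationID"])].append(t["pickup_time"])
    return {key: round(mean(values), 2) for key, values in groups.items()}

trips = [
    {"rider": "Uber", "weekday_n": 2, "PULocationID": 132, "pickup_time": 100.0, "is_holiday": False},
    {"rider": "Uber", "weekday_n": 2, "PULocationID": 132, "pickup_time": 200.0, "is_holiday": False},
    {"rider": "Uber", "weekday_n": 2, "PULocationID": 132, "pickup_time": 900.0, "is_holiday": True},
    {"rider": "Lyft", "weekday_n": 7, "PULocationID": 161, "pickup_time": 50.0, "is_holiday": False},
]
print(avg_pickup_by_weekday(trips))
```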

In [None]:
df_grouped_by_weekday = df_enriched.filter(df_enriched["is_holiday"] == False).groupBy("rider", "weekday_n", "PULocationID").agg(
    round(avg("pickup_time"), 2).alias("avg_pickup_time"),
    round(avg(col("base_passenger_fare") / col("trip_km")), 2).alias("avg_base_fare_per_km"),
    round(avg(col("tips") / col("trip_km")), 2).alias("avg_tips_per_km"),
    round(avg(col("driver_pay") / col("trip_km")), 2).alias("avg_driver_pay_per_km"),
)

print("The df_grouped_by_weekday count is: ", df_grouped_by_weekday.count())

if not is_glue_job():
    df_grouped_by_weekday.show(5)

In [None]:
if not is_glue_job():
    df_grouped_by_weekday.write.mode("overwrite").csv("s3://qiaoshi-aws-ml/tlc/results/sample/avg_by_weekday/", header=True)
else:
    df_grouped_by_weekday.write.mode("overwrite").csv("s3://qiaoshi-aws-ml/tlc/results/full/avg_by_weekday/", header=True)

print("Output the df_grouped_by_weekday to S3 done.")

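### 6.3 Analyze the total trip time, kilometers, fares, tips, driver pay, and dispatching bases

In this section, we group the data by rider, year, and month and compute the following columns:

- sum_trip_time_hour (total trip time in hours)
- sum_base_fare
- sum_sales_tax
- sum_tips
- sum_driver_pay
- sum_trip_km
- sum_rider_income (`base_passenger_fare - driver_pay`)
- avg_driver_income_per_km (`(driver_pay + tips) / trip_km`, computed from the totals)
- distinct_drivers (the number of distinct `dispatching_base_num` values; the dataset has no driver identifier, so this counts dispatching bases, not drivers)
- year_month (the first day of the month, `yyyy-MM-dd`)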
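A plain-Python sketch of these totals for the trips of one (rider, year, month) group, to make the formulas concrete; `monthly_totals` is a hypothetical helper and the base number `B00001` is made up. Unlike the section 6.1 averages, `avg_driver_income_per_km` here is a ratio of sums, and `distinct_drivers` counts dispatching bases.

```python
def monthly_totals(trips, year, month):
    """Compute the per-(rider, year, month) totals for one group of trip dicts."""
    total_km = sum(t["trip_km"] for t in trips)
    total_pay = sum(t["driver_pay"] for t in trips)
    total_tips = sum(t["tips"] for t in trips)
    total_fare = sum(t["base_passenger_fare"] for t in trips)
    return {
        # trip_time is in seconds; 3600 seconds per hour.
        "sum_trip_time_hour": round(sum(t["trip_time"] for t in trips) / 3600, 2),
        # Operator's share: base fare minus what is paid to drivers.
        "sum_rider_income": round(total_fare - total_pay, 2),
        # Ratio of sums, not an average of per-trip ratios.
        "avg_driver_income_per_km": round((total_pay + total_tips) / total_km, 2) if total_km else None,
        # Distinct dispatching bases (no driver ID exists in the data).
        "distinct_drivers": len({t["dispatching_base_num"] for t in trips}),
        "year_month": f"{year:04d}-{month:02d}-01",
    }

group = [
    {"trip_time": 1800, "base_passenger_fare": 20.0, "driver_pay": 15.0, "tips": 3.0, "trip_km": 8.0, "dispatching_base_num": "B00001"},
    {"trip_time": 5400, "base_passenger_fare": 40.0, "driver_pay": 30.0, "tips": 2.0, "trip_km": 12.0, "dispatching_base_num": "B00001"},
]
print(monthly_totals(group, 2019, 2))
```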


In [None]:
from pyspark.sql.functions import sum, count_distinct, round, col, date_format, concat, lit

df_grouped_by_rider = df_enriched.groupBy("rider", "year", "month").agg(
    round((sum("trip_time") / 60 / 60), 2).alias("sum_trip_time_hour"),
    round(sum("base_passenger_fare"), 2).alias("sum_base_fare"),
    round(sum("sales_tax"), 2).alias("sum_sales_tax"),
    round(sum("tips"), 2).alias("sum_tips"),
    round(sum("driver_pay"), 2).alias("sum_driver_pay"),
    round(sum("trip_km"), 2).alias("sum_trip_km"),
    round(sum("base_passenger_fare") - sum("driver_pay"), 2).alias("sum_rider_income"), # base_passenger_fare - driver_pay
    round((sum("driver_pay") + sum("tips")) / sum("trip_km"), 2).alias("avg_driver_income_per_km"), # (driver_pay + tips) / trip_km
    count_distinct("dispatching_base_num").alias("distinct_drivers")  # counts distinct dispatching bases; there is no driver ID column
).withColumn("year_month", date_format(concat(col("year"), lit("-"), col("month"), lit("-01")), "yyyy-MM-dd"))

print("The df_grouped_by_rider count is: ", df_grouped_by_rider.count())

if not is_glue_job():
    df_grouped_by_rider.show(5)


In [None]:
# write the result to the S3 bucket

if not is_glue_job():
    df_grouped_by_rider.write.mode("overwrite").csv("s3://qiaoshi-aws-ml/tlc/results/sample/group_by_rider/", header=True)
else:
    df_grouped_by_rider.write.mode("overwrite").csv("s3://qiaoshi-aws-ml/tlc/results/full/group_by_rider/", header=True)

print("Output the df_grouped_by_rider to S3 done.")