In [1]:
## Introduction to spark/pysprak

In [2]:
# import findspark
# findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark import SparkFiles

In [3]:
spark = SparkSession.builder \
    .appName("NYC-data-processing") \
    .config("spark.driver.memory", "4g") \
    .master("local") \
    .getOrCreate()

print(f'The current spark version is {spark.version}')

The current spark version is 3.5.0


In [70]:
# ------------ Yellow NYC Taxi Trip ------------ #

In [4]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet

--2025-03-07 13:36:01--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 108.157.184.33, 108.157.184.174, 108.157.184.53, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|108.157.184.33|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 64346071 (61M) [binary/octet-stream]
Saving to: ‘yellow_tripdata_2024-10.parquet’


2025-03-07 13:36:27 (2.46 MB/s) - ‘yellow_tripdata_2024-10.parquet’ saved [64346071/64346071]



In [5]:
# file_url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet'
df_yellow_dataset = spark.read.parquet('yellow_tripdata_2024-10.parquet')

In [6]:
# Show the schema and sample data
print("Schema:")
df_yellow_dataset.printSchema()

Schema:
root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [7]:
print("\nSample data:")
df_yellow_dataset.show(5, truncate=False)


Sample data:
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|2       |2024-10-01 00:30:44 |2024-10-01 00:48:26  |1              |3.0          |1         |N                 |162         |246         |1           |18.4       |1.0  |0.5

In [8]:
# Change timestamp_ntz into datetime
df_yellow_dataset \
    .withColumn('tpep_pickup_datetime', F.to_date(df_yellow_dataset.tpep_pickup_datetime)) \
    .withColumn('tpep_dropoff_datetime', F.to_date(df_yellow_dataset.tpep_dropoff_datetime)) \
    .select('VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count') \
    .show()

+--------+--------------------+---------------------+---------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|
+--------+--------------------+---------------------+---------------+
|       2|          2024-10-01|           2024-10-01|              1|
|       1|          2024-10-01|           2024-10-01|              1|
|       1|          2024-10-01|           2024-10-01|              1|
|       1|          2024-10-01|           2024-10-01|              1|
|       1|          2024-10-01|           2024-10-01|              1|
|       2|          2024-10-01|           2024-10-01|              2|
|       1|          2024-10-01|           2024-10-01|              1|
|       1|          2024-10-01|           2024-10-01|              1|
|       1|          2024-10-01|           2024-10-01|              0|
|       1|          2024-10-01|           2024-10-01|              1|
|       1|          2024-10-01|           2024-10-01|              1|
|       2|          

In [9]:
import pandas as pd

In [10]:
# Convert Parquet to CSV
def parquet_to_csv(parquet_path, csv_path='output.csv'):
    # Read Parquet
    df_yellow_dataset_parquet = pd.read_parquet('yellow_tripdata_2024-10.parquet')

    # Save as CSV
    df_yellow_dataset_parquet.to_csv('yellow_tripdata_2024-10.csv', index=False)
    print(f"Data saved to: {'yellow_tripdata_2024-10.csv'}")

In [11]:
parquet_to_csv('sample.parquet', 'sample.csv')

Data saved to: yellow_tripdata_2024-10.csv


In [12]:
from pyspark.sql import types

In [101]:
yellow_schema = types.StructType([
    types.StructField("VendorID", types.IntegerType(), True),
    types.StructField("tpep_pickup_datetime", types.TimestampType(), True),
    types.StructField("tpep_dropoff_datetime", types.TimestampType(), True),
    types.StructField("passenger_count", types.IntegerType(), True),
    types.StructField("trip_distance", types.DoubleType(), True),
    types.StructField("RatecodeID", types.IntegerType(), True),
    types.StructField("store_and_fwd_flag", types.StringType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("payment_type", types.IntegerType(), True),
    types.StructField("fare_amount", types.DoubleType(), True),
    types.StructField("extra", types.DoubleType(), True),
    types.StructField("mta_tax", types.DoubleType(), True),
    types.StructField("tip_amount", types.DoubleType(), True),
    types.StructField("tolls_amount", types.DoubleType(), True),
    types.StructField("improvement_surcharge", types.DoubleType(), True),
    types.StructField("total_amount", types.DoubleType(), True),
    types.StructField("congestion_surcharge", types.DoubleType(), True)
])

In [102]:
df_yellow_dataset_csv = spark.read \
                             .option("header", "true") \
                             .schema(yellow_schema) \
                             .csv('yellow_tripdata_2024-10.csv')
df_yellow_dataset_csv.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|       2| 2024-10-01 00:30:44|  2024-10-01 00:48:26|           NULL|          3.0|      NULL|                 N|         162|         246|           1|       18.4|  1.0|    0.5|       1.5|         0.0|                  1.0

In [103]:
df_converted_csv = df_yellow_dataset_parquet \
    .withColumn('pickup_date', F.to_date(df_yellow_dataset_csv.tpep_pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df_yellow_dataset_csv.tpep_dropoff_datetime)) \
    .select('vendorid', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
    .show()

+--------+-----------+------------+------------+------------+
|vendorid|pickup_date|dropoff_date|PULocationID|DOLocationID|
+--------+-----------+------------+------------+------------+
|       2| 2024-10-01|  2024-10-01|         162|         246|
|       1| 2024-10-01|  2024-10-01|          48|         236|
|       1| 2024-10-01|  2024-10-01|         142|          24|
|       1| 2024-10-01|  2024-10-01|         233|          75|
|       1| 2024-10-01|  2024-10-01|         262|         262|
|       2| 2024-10-01|  2024-10-01|         137|         137|
|       1| 2024-10-01|  2024-10-01|         142|          48|
|       1| 2024-10-01|  2024-10-01|         230|         161|
|       1| 2024-10-01|  2024-10-01|         142|         237|
|       1| 2024-10-01|  2024-10-01|         162|         145|
|       1| 2024-10-01|  2024-10-01|         229|         162|
|       2| 2024-10-01|  2024-10-01|         162|         162|
|       2| 2024-10-01|  2024-10-01|         162|         230|
|       

In [105]:
df_yellow_dataset.write.parquet('data/pq/yellow_tripdata/2024/10/', mode='overwrite')

In [106]:
df_pq = spark.read.parquet('data/pq/yellow_tripdata/2024/10/')

In [107]:
df_pq \
    .withColumn('pickup_date', F.to_date(df_pq.tpep_pickup_datetime)) \
    .withColumn('dropoff_datetime', F.to_date(df_pq.tpep_dropoff_datetime)) \
    .filter("pickup_date = '2024-10-15'") \
    .count()

128893

In [18]:
# Spark/Pyspark DataFrames

In [26]:
def crazy_stuff(base_num):
    num = int(base_num[1:])
    if num % 7 == 0:
        return f's/{num:03x}'
    elif num % 3 == 0:
        return f'a/{num:03x}'
    return f'e/{num:03x}'

crazy_stuff_udf = F.udf(crazy_stuff, returnType=types.StringType())

In [20]:
# Spark/PySpark SQL

In [21]:
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

--2025-03-07 13:40:15--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 108.157.184.223, 108.157.184.53, 108.157.184.174, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|108.157.184.223|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12331 (12K) [text/csv]
Saving to: ‘taxi_zone_lookup.csv’


2025-03-07 13:40:16 (369 MB/s) - ‘taxi_zone_lookup.csv’ saved [12331/12331]



In [22]:
# Write csv file as parquet file
df_taxi_zone_lookup = spark.read.csv("taxi_zone_lookup.csv", header=True, inferSchema=True)

In [23]:
# Save as parquet file
df_taxi_zone_lookup.write.mode("overwrite").parquet("parquet/taxi_zone_lookup")

In [24]:
# Show the schema and sample data
print("Schema:")
df_taxi_zone_lookup.printSchema()

Schema:
root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [28]:
common_columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge']

In [69]:
# ------------ Green NYC Taxi Trip ------------ #

In [35]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2024-10.parquet

--2025-03-07 13:53:34--  https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2024-10.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 108.157.184.53, 108.157.184.33, 108.157.184.223, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|108.157.184.53|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1353731 (1.3M) [binary/octet-stream]
Saving to: ‘green_tripdata_2024-10.parquet’


2025-03-07 13:53:37 (1.03 MB/s) - ‘green_tripdata_2024-10.parquet’ saved [1353731/1353731]



In [109]:
# file_url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2024-10.parquet
df_green_dataset = spark.read.parquet('green_tripdata_2024-10.parquet')

In [110]:
# Convert Parquet to CSV
def parquet_to_csv_1(parquet_path, csv_path='output.csv'):
    # Read Parquet
    df_green_dataset_parquet = pd.read_parquet('green_tripdata_2024-10.parquet')

    # Save as CSV
    df_green_dataset_parquet.to_csv('green_tripdata_2024-10.csv', index=False)
    print(f"Data saved to: {'green_tripdata_2024-10.csv'}")

In [111]:
parquet_to_csv_1('sample.parquet', 'sample.csv')

Data saved to: green_tripdata_2024-10.csv


In [112]:
green_schema = types.StructType([
    types.StructField("VendorID", types.IntegerType(), True),
    types.StructField("lpep_pickup_datetime", types.TimestampType(), True),
    types.StructField("lpep_dropoff_datetime", types.TimestampType(), True),
    types.StructField("store_and_fwd_flag", types.StringType(), True),
    types.StructField("RatecodeID", types.IntegerType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("passenger_count", types.IntegerType(), True),
    types.StructField("trip_distance", types.DoubleType(), True),
    types.StructField("fare_amount", types.DoubleType(), True),
    types.StructField("extra", types.DoubleType(), True),
    types.StructField("mta_tax", types.DoubleType(), True),
    types.StructField("tip_amount", types.DoubleType(), True),
    types.StructField("tolls_amount", types.DoubleType(), True),
    types.StructField("ehail_fee", types.DoubleType(), True),
    types.StructField("improvement_surcharge", types.DoubleType(), True),
    types.StructField("total_amount", types.DoubleType(), True),
    types.StructField("payment_type", types.IntegerType(), True),
    types.StructField("trip_type", types.IntegerType(), True),
    types.StructField("congestion_surcharge", types.DoubleType(), True)
])

In [113]:
df_green_dataset_csv = spark.read \
                             .option("header", "true") \
                             .schema(green_schema) \
                             .csv('green_tripdata_2024-10.csv')
df_green_dataset_csv.show()

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2024-10-01 00:52:13|  2024-10-01 01:02:39|                 N|      NULL|          75|         238|           NULL|          2.1|       12.8|  1.0|    0.

In [114]:
df_converted_csv_1 = df_green_dataset_csv \
    .withColumn('pickup_date', F.to_date(df_green_dataset_csv.lpep_pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df_green_dataset_csv.lpep_dropoff_datetime)) \
    .select('vendorid', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
    .show()

+--------+-----------+------------+------------+------------+
|vendorid|pickup_date|dropoff_date|PULocationID|DOLocationID|
+--------+-----------+------------+------------+------------+
|       2| 2024-10-01|  2024-10-01|          75|         238|
|       2| 2024-10-01|  2024-10-01|         134|          82|
|       2| 2024-10-01|  2024-10-01|         202|         260|
|       2| 2024-10-01|  2024-10-01|         130|         218|
|       2| 2024-10-01|  2024-10-01|          42|          94|
|       2| 2024-10-01|  2024-10-01|         220|         220|
|       2| 2024-10-01|  2024-10-01|          75|         235|
|       2| 2024-10-01|  2024-10-01|         256|          17|
|       2| 2024-10-01|  2024-10-01|         129|         129|
|       2| 2024-10-01|  2024-10-01|          95|         196|
|       2| 2024-09-30|  2024-09-30|         255|         264|
|       2| 2024-09-30|  2024-10-01|          95|          95|
|       2| 2024-10-01|  2024-10-01|         152|          41|
|       

In [115]:
df_green_dataset.write.parquet('data/pq/green_tripdata/2024/10/', mode='overwrite')

In [116]:
df_pq_1 = spark.read.parquet('data/pq/green_tripdata/2024/10/')

In [117]:
set(df_pq.columns) & set(df_pq_1.columns)

{'DOLocationID',
 'PULocationID',
 'RatecodeID',
 'VendorID',
 'congestion_surcharge',
 'extra',
 'fare_amount',
 'improvement_surcharge',
 'mta_tax',
 'passenger_count',
 'payment_type',
 'store_and_fwd_flag',
 'tip_amount',
 'tolls_amount',
 'total_amount',
 'trip_distance'}

In [118]:
df_pg_yellow = df_pq \
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')
df_pg_yellow.columns

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'Airport_fee']

In [119]:
df_pg_green = df_pq_1 \
    .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')
df_pg_green.columns

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'ehail_fee',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'trip_type',
 'congestion_surcharge']

In [120]:
common_columns = []

for col in df_pg_green.columns:
    if col in df_pg_yellow.columns:
        common_columns.append(col)

In [121]:
common_columns

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge']

In [122]:
df_green_temp = \
    df_pg_green \
    .select(common_columns) \
    .withColumn('service_type', F.lit('green'))

In [123]:
df_yellow_temp = \
    df_pg_yellow \
    .select(common_columns) \
    .withColumn('service_type', F.lit('yellow'))

In [124]:
df_nyc_taxi_data = df_green_temp.unionAll(df_yellow_temp)

In [125]:
df_nyc_taxi_data.groupBy('service_type').count().show()

+------------+-------+
|service_type|  count|
+------------+-------+
|       green|  56147|
|      yellow|3833771|
+------------+-------+



In [126]:
# Spark/PySpark SQL

In [129]:
df_nyc_taxi_data.registerTempTable('nyc_taxi_data')

In [130]:
# Show the schema and sample data
print("Schema:")
df_nyc_taxi_data.printSchema()

Schema:
root
 |-- VendorID: integer (nullable = true)
 |-- pickup_datetime: timestamp_ntz (nullable = true)
 |-- dropoff_datetime: timestamp_ntz (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- service_type: string (nullable = false)



In [131]:
spark.sql("""
SELECT
    service_type,
    count(1) as Type_N
FROM
    nyc_taxi_data
GROUP BY
    service_type;
""").show()

+------------+-------+
|service_type| Type_N|
+------------+-------+
|       green|  56147|
|      yellow|3833771|
+------------+-------+



In [132]:
df_nyc_taxi_reveneue = spark.sql("""
SELECT 
    -- Reveneue grouping 
    PULocationID AS revenue_zone,
    date_trunc('month', pickup_datetime) AS revenue_month, 
    service_type, 

    -- Revenue calculation 
    SUM(fare_amount) AS revenue_monthly_fare,
    SUM(extra) AS revenue_monthly_extra,
    SUM(mta_tax) AS revenue_monthly_mta_tax,
    SUM(tip_amount) AS revenue_monthly_tip_amount,
    SUM(tolls_amount) AS revenue_monthly_tolls_amount,
    SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,
    SUM(total_amount) AS revenue_monthly_total_amount,
    SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,

    -- Additional calculations
    AVG(passenger_count) AS avg_montly_passenger_count,
    AVG(trip_distance) AS avg_montly_trip_distance
FROM
    nyc_taxi_data
GROUP BY
    1, 2, 3
""")

In [133]:
df_nyc_taxi_reveneue.show()

+------------+-------------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+--------------------------+------------------------+
|revenue_zone|      revenue_month|service_type|revenue_monthly_fare|revenue_monthly_extra|revenue_monthly_mta_tax|revenue_monthly_tip_amount|revenue_monthly_tolls_amount|revenue_monthly_improvement_surcharge|revenue_monthly_total_amount|revenue_monthly_congestion_surcharge|avg_montly_passenger_count|avg_montly_trip_distance|
+------------+-------------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+--------------------------+------------------------+
|          28|2024-

In [134]:
df_nyc_taxi_reveneue.coalesce(1).write.parquet('data/pq/report/tripdata_revenue/', mode='overwrite')