In [2]:
import pyspark
from pyspark.sql import SparkSession

# Create (or reuse) a Spark session running in local mode on all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

# Last expression of the cell: shows the Spark version in use.
spark.version

'3.3.2'

In [16]:
# Load every parquet file under the raw yellow-taxi folder and print the
# schema Spark reads from the files.
raw_yellow_glob = 'data/raw/yellow/2024/10/*'
df_raw_data = spark.read.parquet(raw_yellow_glob)

df_raw_data.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [42]:
# Re-read the raw parquet files and drop the 'Airport_fee' column.
# The former .option("header", "true") was removed: "header" is a CSV reader
# option and has no effect on a parquet read, so it only misled the reader.
df_raw_data = (
    spark.read
    .parquet('data/raw/yellow/2024/10/*')
    .drop('Airport_fee')
)

In [136]:
# Target Spark SQL type for each column, written as type-name strings so this
# cell does not rely on `pyspark.sql.types` (which the notebook never imports).
column_types = {
    "VendorID": "int",
    "tpep_pickup_datetime": "timestamp",
    "tpep_dropoff_datetime": "timestamp",
    "passenger_count": "int",
    "trip_distance": "double",
    "RatecodeID": "int",
    "store_and_fwd_flag": "string",
    "PULocationID": "int",
    "DOLocationID": "int",
    "payment_type": "int",
    "fare_amount": "double",
    "extra": "double",
    "mta_tax": "double",
    "tip_amount": "double",
    "tolls_amount": "double",
    "improvement_surcharge": "double",
    "total_amount": "double",
    "congestion_surcharge": "double",
}

# Cast all columns in a single projection instead of a long chain of
# withColumn calls (each withColumn adds another projection to the plan).
# Column order is preserved; a column without an entry in `column_types`
# passes through unchanged, matching what withColumn would have left alone.
df_raw_data_casted = df_raw_data.select([
    df_raw_data[name].cast(column_types[name]).alias(name)
    if name in column_types
    else df_raw_data[name]
    for name in df_raw_data.columns
])

df_raw_data_casted.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [50]:
# Redistribute the casted data into 4 partitions and write it out as parquet,
# replacing whatever a previous run left in the target directory.
(
    df_raw_data_casted
    .repartition(4)
    .write
    .mode('overwrite')
    .parquet('data/pq/yellow/2024/10/')
)

In [53]:
# Read back the repartitioned parquet output written by the previous step.
df_pq = spark.read.parquet('data/pq/yellow/2024/10/*')

# Drop the 'tpep_' prefix so the SQL queries can use shorter column names.
df_pq = df_pq \
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')

# Expose the DataFrame to spark.sql() under the name 'yellow_tripdata_1024'.
# createOrReplaceTempView is the supported replacement for registerTempTable,
# which has been deprecated since Spark 2.0; re-running the cell simply
# replaces the existing view.
df_pq.createOrReplaceTempView('yellow_tripdata_1024')



In [76]:
# Count the trips whose pickup timestamp falls on 2024-10-15
# (half-open interval: >= 2024-10-15 and < 2024-10-16), grouped by pickup date.
# Note: this rebinds `df_pq` to the query result; the temp view queried here
# is not affected by the rebinding.
trips_on_day_sql = """
SELECT
    COUNT (*) AS num_trips,
    CAST(pickup_datetime AS DATE) AS date
FROM yellow_tripdata_1024
WHERE pickup_datetime >= '2024-10-15'
    AND pickup_datetime < '2024-10-16'
GROUP BY date
"""
df_pq = spark.sql(trips_on_day_sql)

df_pq.show()

+---------+----------+
|num_trips|      date|
+---------+----------+
|   128097|2024-10-15|
+---------+----------+



In [99]:
# Find the three trips with the largest dropoff-minus-pickup duration.
# The duration is the difference of the two UNIX timestamps (seconds),
# divided by 3600 to get hours and rounded to two decimals.
longest_trips_sql = """
SELECT
   pickup_datetime,
   dropoff_datetime,
   ROUND ((UNIX_TIMESTAMP(dropoff_datetime) - UNIX_TIMESTAMP(pickup_datetime)) / 3600, 2) AS time_diff_hours
FROM yellow_tripdata_1024
ORDER BY time_diff_hours DESC
LIMIT 3
"""
df_pq = spark.sql(longest_trips_sql)

df_pq.show()

+-------------------+-------------------+---------------+
|    pickup_datetime|   dropoff_datetime|time_diff_hours|
+-------------------+-------------------+---------------+
|2024-10-16 14:03:49|2024-10-23 08:40:53|         162.62|
|2024-10-03 19:47:25|2024-10-09 19:06:55|         143.33|
|2024-10-22 17:00:55|2024-10-28 10:46:33|         137.76|
+-------------------+-------------------+---------------+



In [114]:
# Read the taxi zone lookup CSV, taking the column names from its header row.
df_zones_csv = spark.read \
    .option("header", "true") \
    .csv('taxi_zone_lookup.csv')

# Persist the lookup table as parquet. mode='overwrite' makes the cell safe to
# re-run: without it the default save mode raises an error as soon as
# 'data/pq/zones' already exists from an earlier run.
df_zones_csv.write.parquet('data/pq/zones', mode='overwrite')

In [125]:
# Zone lookup table previously saved as parquet.
df_zones = spark.read.parquet('data/pq/zones/*')

# Trip data with its original column names (tpep_* columns are not renamed here).
df_pq_table = spark.read.parquet('data/pq/yellow/2024/10/*')

# Attach zone details to each trip by matching the pickup location ID against
# the lookup table's LocationID (join type left at Spark's default).
df_join = df_pq_table.join(df_zones, df_pq_table.PULocationID == df_zones.LocationID)

# Expose the joined data to spark.sql() as 'trips_zones'.
# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0.
df_join.createOrReplaceTempView('trips_zones')

In [140]:
# List the three pickup zones with the fewest trips.
# GROUP BY 2,3 / ORDER BY 1 refer to the SELECT positions:
# 1 = num_trips, 2 = PULocationID, 3 = Zone.
# NOTE(review): rows with equal num_trips have no tie-breaker in ORDER BY, so
# their relative order (and which of them survive LIMIT 3) may vary between runs.
least_used_zones_sql = """
SELECT 
    COUNT (*) AS num_trips,
    PULocationID,
    Zone
FROM trips_zones
GROUP BY 2,3
ORDER BY 1
LIMIT 3
"""
df_join = spark.sql(least_used_zones_sql)

# truncate=False prints the full zone names instead of cutting them short.
df_join.show(truncate=False)

+---------+------------+---------------------------------------------+
|num_trips|PULocationID|Zone                                         |
+---------+------------+---------------------------------------------+
|1        |105         |Governor's Island/Ellis Island/Liberty Island|
|2        |5           |Arden Heights                                |
|2        |199         |Rikers Island                                |
+---------+------------+---------------------------------------------+

