In [1]:
# import utils
from utils import ingest_landing, load_bronze, sanitize_columns

In [2]:
# Connect to Spark and Minio
import os

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import pyspark.sql.functions as f
from minio import Minio

# getOrCreate() attaches to an already-running session when there is one,
# otherwise it starts a new session named "Jupyter".
spark = SparkSession.builder.appName("Jupyter").getOrCreate()

# MinIO credentials are taken from the environment so that real secrets never
# have to be written into the notebook. The defaults keep the previous values,
# so the notebook behaves the same when the variables are not set.
access_key = os.environ.get("MINIO_ACCESS_KEY", "admin")
secret_key = os.environ.get("MINIO_SECRET_KEY", "password")

# Single definition of the endpoint; the URL form is derived from it so the
# two can no longer drift apart.
minio_endpoint = "minio:9000"
minio_api_host = f"http://{minio_endpoint}"
minio_client = Minio(minio_endpoint, access_key=access_key, secret_key=secret_key, secure=False)

# Last expression of the cell: renders the session summary.
spark

24/08/05 09:15:02 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
# Ingestion Step - Landing Zone - yellow_tripdata
# Downloads one monthly yellow_tripdata parquet file per iteration and hands it
# to ingest_landing together with its local and MinIO destinations.
from datetime import datetime, timedelta

# Half-open month range [startDate, endDate): endDate itself is NOT processed.
# With these bounds the months handled are 2024-01 through 2024-05.
startDate = datetime(2024, 1, 1)
endDate = datetime(2024, 6, 1)

while startDate < endDate:
    fmt_date = startDate.strftime("%Y-%m")
    ingest_landing(src=f"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{fmt_date}.parquet",
                   local_dest=f"/home/iceberg/data/yellow_tripdata_{fmt_date}.parquet",
                   minio_dest=f"yellow_taxi_trip_records/yellow_tripdata_{fmt_date}.parquet",
                   minio_client=minio_client)
    # Step to the first day of the next calendar month. Adding a fixed 31 days
    # drifts (Jan 1 -> Feb 1 -> Mar 3 -> ...) and can eventually skip a month;
    # 32 days from day 1 always lands inside the following month, and
    # replace(day=1) snaps back to its first day.
    startDate = (startDate.replace(day=1) + timedelta(days=32)).replace(day=1)

Ingestion https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet Successful
Ingestion https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-02.parquet Successful
Ingestion https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-03.parquet Successful
Ingestion https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-04.parquet Successful
Ingestion https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-05.parquet Successful


In [None]:
# Loading Step - Bronze Zone - yellow trip data
# Calls load_bronze once per monthly parquet file, targeting the
# bronze.nyc_yellow_tripdata table.
from datetime import datetime, timedelta

# Half-open month range [startDate, endDate): endDate itself is NOT processed.
# With these bounds the months handled are 2024-01 through 2024-05.
startDate = datetime(2024, 1, 1)
endDate = datetime(2024, 6, 1)

while startDate < endDate:
    fmt_date = startDate.strftime("%Y-%m")
    load_bronze(local_src=f"/home/iceberg/data/yellow_tripdata_{fmt_date}.parquet",
                minio_src=f"yellow_taxi_trip_records/yellow_tripdata_{fmt_date}.parquet",
                dest_table="bronze.nyc_yellow_tripdata",
                file_type="parquet",
                spark=spark,
                minio_client=minio_client)
    # Step to the first day of the next calendar month. Adding a fixed 31 days
    # drifts (Jan 1 -> Feb 1 -> Mar 3 -> ...) and can eventually skip a month;
    # 32 days from day 1 always lands inside the following month, and
    # replace(day=1) snaps back to its first day.
    startDate = (startDate.replace(day=1) + timedelta(days=32)).replace(day=1)



In [None]:
# Silver Zone - schema definition - nyc_yellow_tripdata
# The DDL is kept in a named string so it can be inspected before it is run.
# IF NOT EXISTS makes the statement safe to re-run. The table is an Iceberg
# table partitioned by month(pickup_timestamp) with a target data-file size of
# 5242880 bytes (5 MiB).
silver_nyc_yellow_tripdata_ddl = """
CREATE TABLE IF NOT EXISTS silver.nyc_yellow_tripdata (
  `vendor_id` BIGINT,
  `pickup_timestamp` TIMESTAMP,
  `dropoff_timestamp` TIMESTAMP,
  `passenger_count` DOUBLE,
  `trip_distance` DOUBLE,
  `rate_code_id` DOUBLE,
  `store_and_fwd_flag` STRING,
  `pickup_location_id` BIGINT,
  `dropoff_location_id` BIGINT,
  `payment_type` BIGINT,
  `fare_amount` DOUBLE,
  `extra` DOUBLE,
  `mta_tax` DOUBLE,
  `tip_amount` DOUBLE,
  `tolls_amount` DOUBLE,
  `improvement_surcharge` DOUBLE,
  `total_amount` DOUBLE,
  `congestion_surcharge` DOUBLE,
  `airport_fee` DOUBLE,
  `ingest_time` TIMESTAMP
  )
USING iceberg
PARTITIONED BY (month(pickup_timestamp))
TBLPROPERTIES(
  'write.target-file-size-bytes'='5242880'
)
"""

spark.sql(silver_nyc_yellow_tripdata_ddl)

In [None]:
# Transform Step - Silver Zone - nyc_yellow_tripdata
# Reads the bronze table, renames columns to the silver naming scheme, drops
# invalid trips, removes anything already present in silver and appends the
# remainder to silver.nyc_yellow_tripdata.
bronze_raw = spark.read.table("bronze.nyc_yellow_tripdata")

# sanitize column names
columns_to_rename = {"VendorID": "vendor_id",
                     "RatecodeID": "rate_code_id",
                     "PULocationID": "pickup_location_id",
                     "DOLocationID": "dropoff_location_id",
                     "Airport_fee": "airport_fee",
                     "tpep_pickup_datetime": "pickup_timestamp",
                     "tpep_dropoff_datetime": "dropoff_timestamp",
                    }

bronze_renamed = sanitize_columns(bronze_raw, columns_to_rename)

# A record is invalid when ANY of the following holds:
#   - trip_distance <= 0
#   - passenger_count is missing or <= 0
#   - total_amount <= 0
#   - dropoff is not strictly after pickup
#   - dropoff falls more than one calendar day after pickup
#   - pickup is before 2024
# datediff is used for the "more than one day" rule because subtracting
# day-of-month values goes negative across a month boundary (e.g. Jan 30 ->
# Feb 2 gives 2 - 30), which let multi-day trips through.
is_invalid = (
    (f.col("trip_distance") <= 0)
    | f.col("passenger_count").isNull()
    | (f.col("passenger_count") <= 0)
    | (f.col("total_amount") <= 0)
    | (f.col("dropoff_timestamp") <= f.col("pickup_timestamp"))
    | (f.datediff(f.col("dropoff_timestamp"), f.col("pickup_timestamp")) > 1)
    | (f.year(f.col("pickup_timestamp")) < 2024)
)
bronze_valid = bronze_renamed.filter(~is_invalid)

# Columns that together identify a trip record; this is every silver column
# except ingest_time.
dedup_keys = [
    "vendor_id", "pickup_timestamp", "dropoff_timestamp", "passenger_count",
    "trip_distance", "rate_code_id", "store_and_fwd_flag", "pickup_location_id",
    "dropoff_location_id", "payment_type", "fare_amount", "extra", "mta_tax",
    "tip_amount", "tolls_amount", "improvement_surcharge", "total_amount",
    "congestion_surcharge", "airport_fee",
]

# deduplicate against silver
# subtract() is EXCEPT DISTINCT: it keeps one copy of each bronze row that has
# no identical row in silver, treating NULLs as equal. Unlike counting rows in
# a union of both tables, it never drops both copies of a row duplicated
# inside bronze and never re-emits rows that exist only in silver. Both sides
# are projected to the same ordered key list because set operations match
# columns by position.
silver = spark.read.table("silver.nyc_yellow_tripdata")
bronze_transform = bronze_valid.select(*dedup_keys).subtract(silver.select(*dedup_keys))

# if transformed df is not empty then write to silver
if len(bronze_transform.head(1)) > 0:
    # add ingestion time for silver
    bronze_transform = bronze_transform.withColumn("ingest_time", f.current_timestamp())
    bronze_transform.writeTo("silver.nyc_yellow_tripdata").append()
    print("Records written to silver.")

else:
    print("No new records to write to silver.")

In [None]:
# Gold Zone - yellow_trip_average_metrics
# The query joins silver trips to silver.taxi_zone_lookup for pickup and
# dropoff boroughs, derives the trip duration in minutes, then averages
# duration, distance and total amount per year / month / weekday / pickup
# borough. Rows whose pickup borough is NULL, 'N/A' or 'Unknown' are excluded.
yellow_trip_average_metrics_sql = """
with cte as (
select
    year(a.pickup_timestamp) as trip_year,
    month(a.pickup_timestamp) as trip_month,
    case when dayofweek(a.pickup_timestamp) = 1 then 'Sunday' 
    when dayofweek(a.pickup_timestamp) = 2 then 'Monday' 
    when dayofweek(a.pickup_timestamp) = 3 then 'Tuesday' 
    when dayofweek(a.pickup_timestamp) = 4 then 'Wednesday' 
    when dayofweek(a.pickup_timestamp) = 5 then 'Thursday' 
    when dayofweek(a.pickup_timestamp) = 6 then 'Friday' 
    when dayofweek(a.pickup_timestamp) = 7 then 'Saturday'
    end as trip_weekday,
    dayofweek(a.pickup_timestamp) as dayofweek_number,
    b.borough as pickup_borough,
    c.borough as dropoff_borough,
    round((unix_timestamp(a.dropoff_timestamp)-unix_timestamp(a.pickup_timestamp))/(60), 2) as trip_duration_min,
    a.trip_distance,
    a.total_amount
from silver.nyc_yellow_tripdata a
left join silver.taxi_zone_lookup b on a.pickup_location_id = b.location_id
left join silver.taxi_zone_lookup c on a.dropoff_location_id = c.location_id )

select trip_year, trip_month, trip_weekday, dayofweek_number, pickup_borough, 
    round(avg(trip_duration_min), 2) as average_trip_duration_min, 
    round(avg(trip_distance), 2) as average_trip_distance_miles, 
    round(avg(total_amount), 2) as average_total_amount
from cte 
where pickup_borough is not null and pickup_borough != 'N/A' and pickup_borough != 'Unknown'
group by trip_year, trip_month, trip_weekday, dayofweek_number, pickup_borough
order by trip_year, trip_month, dayofweek_number asc
"""

yellow_trip_average_metrics = spark.sql(yellow_trip_average_metrics_sql)

# createOrReplace() rebuilds the gold table from this result on every run.
gold_writer = yellow_trip_average_metrics.writeTo("gold.yellow_trip_average_metrics")
gold_writer.createOrReplace()