## Imports

In [0]:
from pyspark.sql.functions import lit, col, hour, unix_timestamp, dayofweek
from pyspark.sql.types import *
from pyspark.sql import functions as f
from pyspark.sql.functions import broadcast

## Setting up Unity Catalog

In [0]:
%sql
-- Bootstrap the Unity Catalog objects used by the rest of this notebook.
-- IF NOT EXISTS makes the cell idempotent: without it, any run after the first
-- fails because the catalog / schema already exist.
CREATE CATALOG IF NOT EXISTS havdeeva_nyc_catalog
MANAGED LOCATION 's3://havdeeva-databricks/catalog_data/';

-- SHOW CATALOGS;

CREATE SCHEMA IF NOT EXISTS havdeeva_nyc_catalog.trips_schema;

-- SHOW SCHEMAS IN havdeeva_nyc_catalog;

-- Make the catalog the session default for the statements that follow.
USE CATALOG havdeeva_nyc_catalog;
-- SELECT current_catalog(), current_schema();

-- NOTE(review): the grantee is a hard-coded personal account; consider granting to a group instead.
GRANT ALL PRIVILEGES ON CATALOG `havdeeva_nyc_catalog` TO `deniskulemza1@gmail.com`;

## Import Yellow and Green Taxi data

In [0]:
%run "./Dataframes unification function"

In [0]:
# S3 prefixes holding the raw parquet files, one per taxi colour
yellow_taxi_path, green_taxi_path = (
    "s3://robot-dreams-source-data/home-work-1/nyc_taxi/yellow",
    "s3://robot-dreams-source-data/home-work-1/nyc_taxi/green",
)

# Target schema for the yellow-taxi files.
# Listed as (column name, Spark type) pairs in output order; every column is nullable.
yellow_target_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("VendorID", LongType()),
        ("tpep_pickup_datetime", TimestampType()),
        ("tpep_dropoff_datetime", TimestampType()),
        ("passenger_count", LongType()),
        ("trip_distance", DoubleType()),
        ("RatecodeID", LongType()),
        ("store_and_fwd_flag", StringType()),
        ("PULocationID", LongType()),
        ("DOLocationID", LongType()),
        ("payment_type", LongType()),
        ("fare_amount", DoubleType()),
        ("extra", DoubleType()),
        ("mta_tax", DoubleType()),
        ("tip_amount", DoubleType()),
        ("tolls_amount", DoubleType()),
        ("improvement_surcharge", DoubleType()),
        ("total_amount", DoubleType()),
        ("congestion_surcharge", DoubleType()),
        ("airport_fee", DoubleType()),
    ]
])

# Target schema for the green-taxi files.
# Listed as (column name, Spark type) pairs in output order; every column is nullable.
green_target_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("VendorID", LongType()),
        ("lpep_pickup_datetime", TimestampType()),
        ("lpep_dropoff_datetime", TimestampType()),
        ("store_and_fwd_flag", StringType()),
        ("RatecodeID", LongType()),
        ("PULocationID", LongType()),
        ("DOLocationID", LongType()),
        ("passenger_count", LongType()),
        ("trip_distance", DoubleType()),
        ("fare_amount", DoubleType()),
        ("extra", DoubleType()),
        ("mta_tax", DoubleType()),
        ("tip_amount", DoubleType()),
        ("tolls_amount", DoubleType()),
        ("ehail_fee", DoubleType()),
        ("improvement_surcharge", DoubleType()),
        ("total_amount", DoubleType()),
        ("payment_type", LongType()),
        ("trip_type", DoubleType()),  # NOTE(review): declared as double, unlike the other code-like columns — confirm
        ("congestion_surcharge", DoubleType()),
    ]
])

# Read each taxi colour through process_parquet_files
# (presumably defined by the notebook pulled in via %run — verify).
yellow_taxi_df = process_parquet_files(spark, yellow_taxi_path, yellow_target_schema)
green_taxi_df = process_parquet_files(spark, green_taxi_path, green_target_schema)

# Sanity check: print the resulting schemas, yellow first, then green
for loaded_df in (yellow_taxi_df, green_taxi_df):
    loaded_df.printSchema()

# Align the colour-specific timestamp column names and tag every row with its taxi colour.
# taxi_type is appended as the last column; the renamed columns keep their positions.
green_taxi_df = (
    green_taxi_df
    .withColumnRenamed("lpep_pickup_datetime", "pickup_datetime")
    .withColumnRenamed("lpep_dropoff_datetime", "dropoff_datetime")
    .withColumn("taxi_type", lit("green"))
)

yellow_taxi_df = (
    yellow_taxi_df
    .withColumnRenamed("tpep_pickup_datetime", "pickup_datetime")
    .withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")
    .withColumn("taxi_type", lit("yellow"))
)

In [0]:
# Preview the prepared yellow-taxi frame.
# The original notebook displayed this same DataFrame in two consecutive cells
# (`.display()` and `display(...)`), running the same Spark job twice; one preview is enough.
display(yellow_taxi_df)

## Combine the data into a single raw_trips_df dataframe

In [0]:
# Stack yellow and green trips, matching columns by name.
# allowMissingColumns=True lets the union proceed when a column exists on only one side.
raw_trips_df = yellow_taxi_df.unionByName(
    green_taxi_df,
    allowMissingColumns=True,
)

# Inspect the combined schema
print("United dataframe schema:")
raw_trips_df.printSchema()

# Optional: cache the union for better performance on repeated use
# raw_trips_df.cache()
# raw_trips_df.count()  # action that checks the union and triggers caching
# raw_trips_df.rdd.getNumPartitions()

## Filter out anomalous records

In [0]:
# Validity rule for a trip: all three thresholds must hold.
valid_condition = (
    (col("trip_distance") >= 0.1) &  # distance >= 0.1 (NOTE(review): original comment said km; TLC data is believed to be in miles — confirm)
    (col("fare_amount") >= 2) &      # fare amount >= $2
    ((unix_timestamp(col("dropoff_datetime")) - unix_timestamp(col("pickup_datetime"))) >= 60)  # duration >= 1 minute
)

# Spark filters use three-valued logic: when any input is NULL the condition can
# evaluate to NULL, and both filter(cond) and filter(~cond) discard NULL rows.
# Such rows would silently disappear from BOTH outputs, so collapse NULL to False
# and treat "cannot be validated" as abnormal. Valid rows are unaffected.
is_valid_trip = f.coalesce(valid_condition, lit(False))

# Valid (normal) trips
filtered_trips_df = raw_trips_df.filter(is_valid_trip)

# Everything else, including rows with missing distance / fare / timestamps
abnormal_trips_df = raw_trips_df.filter(~is_valid_trip)

# Show example of filtered data
display(filtered_trips_df)

# Show example of abnormal data
display(abnormal_trips_df)

## *Save abnormal data to DELTA table

In [0]:
%sql
-- Delta table that receives the trips rejected by the validity filter.
-- NOTE(review): the timestampNtz feature is enabled although both timestamp
-- columns are declared as TIMESTAMP — confirm it is actually needed.
CREATE OR REPLACE TABLE havdeeva_nyc_catalog.trips_schema.raw_trips_abnormal (
    VendorID               BIGINT,
    pickup_datetime        TIMESTAMP,
    dropoff_datetime       TIMESTAMP,
    passenger_count        BIGINT,
    trip_distance          DOUBLE,
    RatecodeID             BIGINT,
    store_and_fwd_flag     STRING,
    PULocationID           BIGINT,
    DOLocationID           BIGINT,
    payment_type           BIGINT,
    fare_amount            DOUBLE,
    extra                  DOUBLE,
    mta_tax                DOUBLE,
    tip_amount             DOUBLE,
    tolls_amount           DOUBLE,
    improvement_surcharge  DOUBLE,
    total_amount           DOUBLE,
    congestion_surcharge   DOUBLE,
    airport_fee            DOUBLE,
    taxi_type              STRING NOT NULL,
    ehail_fee              DOUBLE,
    trip_type              DOUBLE
)
USING DELTA
TBLPROPERTIES ('delta.feature.timestampNtz' = 'supported');

In [0]:
# Replace the contents of the abnormal-trips Delta table with the rejected rows.
# NOTE(review): overwriteSchema presumably lets the DataFrame schema replace the
# table's declared schema — confirm that is intended.
(
    abnormal_trips_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("havdeeva_nyc_catalog.trips_schema.raw_trips_abnormal")
)

In [0]:
%sql
-- describe history havdeeva_nyc_catalog.trips_schema.raw_trips_abnormal;
-- select * from havdeeva_nyc_catalog.trips_schema.raw_trips_abnormal;

## Add new columns

In [0]:
# Shared column expressions for the derived fields
pickup_ts = col("pickup_datetime")
dropoff_ts = col("dropoff_datetime")
trip_seconds = unix_timestamp(dropoff_ts) - unix_timestamp(pickup_ts)

# Enrich the valid trips with time-based features
new_trips_df = (
    filtered_trips_df
    .withColumn("pickup_hour", hour(pickup_ts))               # hour of the pickup timestamp
    .withColumn("pickup_day_of_week", dayofweek(pickup_ts))   # day-of-week number of the pickup
    .withColumn("duration_min", trip_seconds / 60)            # trip duration in minutes
)

print("New trips DataFrame schema:")
new_trips_df.printSchema()

# Preview the enriched rows
display(new_trips_df)

## Perform a JOIN with taxi_zone_lookup.csv

In [0]:
# Location of the taxi zone lookup CSV
TAXI_ZONE_LOOKUP_PATH = "s3://robot-dreams-source-data/home-work-1-unified/nyc_taxi/taxi_zone_lookup.csv"

# Read the lookup with its header row as column names (no explicit schema is supplied)
zone_csv_reader = spark.read.option("header", "true")
taxi_zone_lookup_df = zone_csv_reader.csv(TAXI_ZONE_LOOKUP_PATH)

# Expose LocationID under both trip key names so the same frame can be used
# for the pickup join and for the dropoff join
location_id = col("LocationID")
taxi_zone_lookup_prepared_df = (
    taxi_zone_lookup_df
    .withColumn("PULocationID", location_id)
    .withColumn("DOLocationID", location_id)
)

# The lookup is joined twice, so give each use its own alias and broadcast hint
pickup_lookup = broadcast(taxi_zone_lookup_prepared_df.alias("pickup"))
dropoff_lookup = broadcast(taxi_zone_lookup_prepared_df.alias("dropoff"))

# Step 1: attach the pickup zone name (left join keeps trips without a match)
trips_with_pickup_zone = new_trips_df.join(
    pickup_lookup,
    on=new_trips_df["PULocationID"] == col("pickup.PULocationID"),
    how="left",
).withColumnRenamed("Zone", "pickup_zone")

# Step 2: attach the dropoff zone name; "Zone" now refers only to the dropoff side
trips_with_both_zones = trips_with_pickup_zone.join(
    dropoff_lookup,
    on=new_trips_df["DOLocationID"] == col("dropoff.DOLocationID"),
    how="left",
).withColumnRenamed("Zone", "dropoff_zone")

# Step 3: keep the original trip columns plus the two zone names,
# discarding every other lookup column
final_stage_df = trips_with_both_zones.select(
    new_trips_df["*"],
    col("pickup_zone"),
    col("dropoff_zone"),
)

# Show the schema and the resulting DataFrame
final_stage_df.printSchema()
display(final_stage_df)

## Save data to raw_trips DELTA table

In [0]:
%sql
-- Delta table for the cleaned, enriched trips: the unified trip columns,
-- the derived time features and the pickup/dropoff zone names.
-- NOTE(review): the timestampNtz feature is enabled although both timestamp
-- columns are declared as TIMESTAMP — confirm it is actually needed.
CREATE OR REPLACE TABLE havdeeva_nyc_catalog.trips_schema.raw_trips (
    VendorID               BIGINT,
    pickup_datetime        TIMESTAMP,
    dropoff_datetime       TIMESTAMP,
    passenger_count        BIGINT,
    trip_distance          DOUBLE,
    RatecodeID             BIGINT,
    store_and_fwd_flag     STRING,
    PULocationID           BIGINT,
    DOLocationID           BIGINT,
    payment_type           BIGINT,
    fare_amount            DOUBLE,
    extra                  DOUBLE,
    mta_tax                DOUBLE,
    tip_amount             DOUBLE,
    tolls_amount           DOUBLE,
    improvement_surcharge  DOUBLE,
    total_amount           DOUBLE,
    congestion_surcharge   DOUBLE,
    airport_fee            DOUBLE,
    taxi_type              STRING NOT NULL,
    ehail_fee              DOUBLE,
    trip_type              DOUBLE,
    pickup_hour            INT,
    pickup_day_of_week     INT,
    duration_min           DOUBLE,
    pickup_zone            STRING,
    dropoff_zone           STRING
)
USING DELTA
TBLPROPERTIES ('delta.feature.timestampNtz' = 'supported');

In [0]:
# Replace the contents of the raw_trips Delta table with the enriched trips.
# NOTE(review): overwriteSchema presumably lets the DataFrame schema replace the
# table's declared schema — confirm that is intended.
(
    final_stage_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("havdeeva_nyc_catalog.trips_schema.raw_trips")
)

In [0]:
%sql
-- Preview the saved trips table
-- DESCRIBE HISTORY havdeeva_nyc_catalog.trips_schema.raw_trips;
SELECT *
FROM havdeeva_nyc_catalog.trips_schema.raw_trips;

## zone_summary DataFrame creation

In [0]:
# Source: the saved trips table
saved_trips_df = spark.table("havdeeva_nyc_catalog.trips_schema.raw_trips")

# Row-level flags used to count trips per taxi colour
is_yellow = f.col("taxi_type") == "yellow"
is_green = f.col("taxi_type") == "green"

# Per-pickup-zone statistics; the colour counts are intermediate values that are
# turned into percentage shares and then dropped
zone_summary_df = (
    saved_trips_df
    .groupBy("pickup_zone")
    .agg(
        f.count("*").alias("total_trips"),                              # number of trips
        f.avg("trip_distance").alias("avg_trip_distance"),              # mean distance
        f.avg("total_amount").alias("avg_total_amount"),                # mean total amount
        f.avg("tip_amount").alias("avg_tip_amount"),                    # mean tip
        f.max("trip_distance").alias("max_trip_distance"),              # longest trip
        f.min("tip_amount").alias("min_tip_amount"),                    # smallest tip
        f.sum(f.when(is_yellow, 1).otherwise(0)).alias("yellow_trips"),
        f.sum(f.when(is_green, 1).otherwise(0)).alias("green_trips"),
    )
    .withColumn("yellow_share", (f.col("yellow_trips") / f.col("total_trips")) * 100)
    .withColumn("green_share", (f.col("green_trips") / f.col("total_trips")) * 100)
    .drop("yellow_trips", "green_trips")
)

# Show the resulting zone_summary DataFrame
display(zone_summary_df)

## Save data to zone_summary DELTA table

In [0]:
%sql
-- Delta table for the per-pickup-zone summary.
-- NOTE(review): unlike the other tables in this notebook this one uses
-- IF NOT EXISTS rather than CREATE OR REPLACE — confirm the difference is intended.
CREATE TABLE IF NOT EXISTS havdeeva_nyc_catalog.trips_schema.zone_summary (
    pickup_zone        STRING,
    total_trips        BIGINT NOT NULL,
    avg_trip_distance  DOUBLE,
    avg_total_amount   DOUBLE,
    avg_tip_amount     DOUBLE,
    max_trip_distance  DOUBLE,
    min_tip_amount     DOUBLE,
    yellow_share       DOUBLE,
    green_share        DOUBLE
)
USING DELTA;

In [0]:
# Replace the contents of the zone_summary Delta table with the fresh aggregates.
# NOTE(review): overwriteSchema presumably lets the DataFrame schema replace the
# table's declared schema — confirm that is intended.
(
    zone_summary_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("havdeeva_nyc_catalog.trips_schema.zone_summary")
)

In [0]:
%sql
-- Show the version history of the zone_summary table
-- SELECT * FROM havdeeva_nyc_catalog.trips_schema.zone_summary;
DESCRIBE HISTORY havdeeva_nyc_catalog.trips_schema.zone_summary;

## zone_days_summary creation

In [0]:
%sql
-- Per pickup zone and pickup day of week: trip count, number of trips whose
-- total_amount exceeds 30, and that number as a percentage of all trips.
-- The counts are computed once in the inner query and reused for the share.
CREATE OR REPLACE TABLE havdeeva_nyc_catalog.trips_schema.zone_days_summary
USING DELTA
AS
SELECT
    pickup_zone,
    pickup_day_of_week,
    total_trips,
    high_fare_trips,
    (high_fare_trips * 100.0 / total_trips) AS high_fare_share
FROM (
    SELECT
        pickup_zone,
        pickup_day_of_week,
        COUNT(*) AS total_trips,
        SUM(CASE WHEN total_amount > 30 THEN 1 ELSE 0 END) AS high_fare_trips
    FROM
        havdeeva_nyc_catalog.trips_schema.raw_trips
    GROUP BY
        pickup_zone,
        pickup_day_of_week
) AS zone_day_counts;

In [0]:
%sql
-- Preview the per-zone, per-day summary table
SELECT *
FROM havdeeva_nyc_catalog.trips_schema.zone_days_summary;
-- DESCRIBE HISTORY havdeeva_nyc_catalog.trips_schema.zone_days_summary;