In [1]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, TimestampType, LongType, StringType
import pyspark.sql.functions as F
import boto3
from functools import reduce
from datetime import datetime
import time

# Source bucket and key prefixes of the NYC taxi input data.
s3_bucket_name = "robot-dreams-source-data"
zone_path = "home-work-1/nyc_taxi"
# Both taxi data sets live directly below the shared base prefix.
yellow_taxi_path = f"{zone_path}/yellow"
green_taxi_path = f"{zone_path}/green"

VBox()

Starting Spark application


ID,Kind,State,Spark UI,Driver log,User,Current session?
4,pyspark,idle,Link,,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
def list_s3_parquet_files(bucket: str, prefix: str) -> list:
    """
    Collect the S3 URIs of all parquet objects stored under a prefix.

    :param bucket: S3 bucket name
    :param prefix: key prefix to list
    :return: list of "s3://<bucket>/<key>" strings for keys ending in ".parquet"
    """
    client = boto3.client("s3")
    result_pages = client.get_paginator('list_objects_v2').paginate(Bucket=bucket, Prefix=prefix)

    # Pages without a 'Contents' entry contribute nothing to the result.
    return [
        f"s3://{bucket}/{entry['Key']}"
        for result_page in result_pages
        for entry in result_page.get('Contents', [])
        if entry['Key'].endswith('.parquet')
    ]

def load_and_cast_parquet_files_s3(spark, bucket: str, prefix: str, schema_map: dict, source_label: str, rename_map: dict = None):
    """
    Reads every parquet file under an S3 prefix, aligns each one to a common
    schema, and merges them into a single DataFrame.

    Each file is read separately so that files with differing column names or
    types can be renamed and cast before they are unioned by column name.

    :param spark: SparkSession
    :param bucket: S3 bucket name
    :param prefix: prefix inside the bucket
    :param schema_map: {column_name: target type}; only columns present in a file are cast
    :param source_label: string written to the added "taxi_type" column
    :param rename_map: optional {old_col: new_col} to align different names
    :return: merged DataFrame
    :raises Exception: if no parquet file is found under the prefix
    """
    parquet_files = list_s3_parquet_files(bucket, prefix)
    if not parquet_files:
        raise Exception(f"No parquet files found on path s3://{bucket}/{prefix}")

    # Treat a missing rename map as "nothing to rename".
    rename_map = rename_map or {}

    dfs = []
    for file_path in parquet_files:
        df = spark.read.parquet(file_path)

        for old_name, new_name in rename_map.items():
            if old_name in df.columns:
                df = df.withColumnRenamed(old_name, new_name)

        for col_name, dtype in schema_map.items():
            if col_name in df.columns:
                df = df.withColumn(col_name, F.col(col_name).cast(dtype))

        # add taxi_type column
        dfs.append(df.withColumn("taxi_type", F.lit(source_label)))

    # Union all dataframes once, after every file has been prepared.
    return reduce(lambda x, y: x.unionByName(y, allowMissingColumns=True), dfs)


def save_single_parquet_to_s3(df, temp_path, bucket, tmp_prefix, final_prefix, spark, wait_seconds=5):
    """
    Saves a Spark DataFrame as a single Parquet file to S3 and renames the part file.

    The DataFrame is written with one partition to ``temp_path``; the resulting
    part file is then located under ``tmp_prefix`` in ``bucket``, copied to
    ``final_prefix`` and removed from the temporary location.

    :param df: Spark DataFrame
    :param temp_path: full S3 URI Spark writes to; the caller is expected to pass
        the URI that corresponds to ``bucket``/``tmp_prefix``
    :param bucket: s3 bucket name
    :param tmp_prefix: temporary path to save part-file
    :param final_prefix: path to save renamed file
    :param spark: SparkSession (not used by this function; kept for interface compatibility)
    :param wait_seconds: seconds to wait for consistency (default: 5)
    :raises Exception: if no part file is found under ``tmp_prefix`` after the write
    """
    # Save DataFrame to temporary path in S3 (as part-*.parquet)
    df.coalesce(1).write.mode("overwrite").parquet(temp_path)

    # Use boto3 to find the part file
    s3 = boto3.client("s3")

    time.sleep(wait_seconds)  # Wait for eventual consistency in S3

    # Paginate so the part file is found even when the listing spans several pages.
    pages = s3.get_paginator("list_objects_v2").paginate(Bucket=bucket, Prefix=tmp_prefix)
    part_key = None
    for page in pages:
        for obj in page.get("Contents", []):
            # Match on the file name only, so a folder containing "part-" cannot match.
            file_name = obj["Key"].rsplit("/", 1)[-1]
            if file_name.startswith("part-") and file_name.endswith(".parquet"):
                part_key = obj["Key"]
                break
        if part_key:
            break

    if not part_key:
        raise Exception(f"No part file found in {temp_path}")

    # Copy and rename the part file to final path
    s3.copy_object(
        Bucket=bucket,
        CopySource={"Bucket": bucket, "Key": part_key},
        Key=final_prefix
    )

    # Delete original part file
    s3.delete_object(Bucket=bucket, Key=part_key)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# Target Spark type for every known trip column, listed as (column, type class).
_column_types = [
    ("VendorID", LongType),
    ("pickup_datetime", StringType),
    ("dropoff_datetime", StringType),
    ("passenger_count", LongType),
    ("trip_distance", DoubleType),
    ("RatecodeID", LongType),
    ("store_and_fwd_flag", StringType),
    ("PULocationID", LongType),
    ("DOLocationID", LongType),
    ("payment_type", LongType),
    ("fare_amount", DoubleType),
    ("extra", DoubleType),
    ("mta_tax", DoubleType),
    ("tip_amount", DoubleType),
    ("tolls_amount", DoubleType),
    ("ehail_fee", DoubleType),
    ("improvement_surcharge", DoubleType),
    ("total_amount", DoubleType),
    ("trip_type", DoubleType),
    ("congestion_surcharge", DoubleType),
    ("airport_fee", DoubleType),
]
schema = {column: spark_type() for column, spark_type in _column_types}

# Map the "lpep_"/"tpep_"-prefixed timestamp columns onto shared, unprefixed names.
rename_map = {
    f"{source_prefix}_{target_name}": target_name
    for target_name in ("pickup_datetime", "dropoff_datetime")
    for source_prefix in ("lpep", "tpep")
}

# Arguments shared by both loads: same bucket, same target schema, same renames.
_shared_load_args = {
    "bucket": s3_bucket_name,
    "schema_map": schema,
    "rename_map": rename_map,
}

df_green = load_and_cast_parquet_files_s3(
    spark, prefix=green_taxi_path, source_label="green", **_shared_load_args
)
df_yellow = load_and_cast_parquet_files_s3(
    spark, prefix=yellow_taxi_path, source_label="yellow", **_shared_load_args
)

# Stack both sources by column name, tolerating columns that only one of them has.
all_taxi_df = df_green.unionByName(df_yellow, allowMissingColumns=True)

# Add columns pickup_hour, pickup_day_of_week, duration_min and filter df.
# The string timestamps are parsed once into pickup_ts / dropoff_ts and every
# derived column is computed from those parsed values.
raw_trips_wo_zones_df = (
    all_taxi_df
    .withColumn("pickup_ts", F.to_timestamp("pickup_datetime"))
    .withColumn("dropoff_ts", F.to_timestamp("dropoff_datetime"))
    # Use the parsed timestamp rather than relying on an implicit cast of the string column.
    .withColumn("pickup_hour", F.hour("pickup_ts"))
    .withColumn("pickup_day_of_week", F.date_format("pickup_ts", "EEEE"))
    # Trip duration in minutes, rounded to two decimals.
    .withColumn(
        "duration_min",
        F.round((F.unix_timestamp("dropoff_ts") - F.unix_timestamp("pickup_ts")) / 60, 2),
    )
    # Keep only trips with a minimum distance, fare and duration.
    .filter(
        (F.col("trip_distance") >= 0.1)
        & (F.col("fare_amount") >= 2)
        & (F.col("duration_min") >= 1)
    )
)

# Zone lookup table, read as CSV with a header row and inferred column types.
zones_df = spark.read.csv(
    f"s3://{s3_bucket_name}/{zone_path}/taxi_zone_lookup.csv",
    header=True,
    inferSchema=True,
)

# Two renamed views of the lookup, one keyed for each end of the trip.
pickup_zone_lookup = zones_df.select(
    F.col("LocationID").alias("PULocationID"),
    F.col("Zone").alias("pickup_zone"),
)
dropoff_zone_lookup = zones_df.select(
    F.col("LocationID").alias("DOLocationID"),
    F.col("Zone").alias("dropoff_zone"),
)

# Left joins keep trips whose location id has no match in the lookup.
raw_trips_df = (
    raw_trips_wo_zones_df
    .join(pickup_zone_lookup, on="PULocationID", how="left")
    .join(dropoff_zone_lookup, on="DOLocationID", how="left")
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



In [4]:
# Per-pickup-zone statistics: trip counts, averages, extremes and the
# yellow/green split of the trips.
is_yellow_trip = F.col("taxi_type") == "yellow"
is_green_trip = F.col("taxi_type") == "green"

zone_totals = raw_trips_df.groupBy("pickup_zone").agg(
    F.count("*").alias("total_trips"),
    F.round(F.avg("trip_distance"), 3).alias("avg_trip_distance"),
    F.round(F.avg("total_amount"), 2).alias("avg_total_amount"),
    F.round(F.avg("tip_amount"), 2).alias("avg_tip_amount"),
    F.sum(F.when(is_yellow_trip, 1).otherwise(0)).alias("yellow_trips"),
    F.sum(F.when(is_green_trip, 1).otherwise(0)).alias("green_trips"),
    F.max("trip_distance").alias("max_trip_distance"),
    F.min("tip_amount").alias("min_tip_amount"),
)

# Shares are guarded against a zero trip count and fall back to 0 in that case.
zone_has_trips = F.col("total_trips") != 0
yellow_ratio = F.round(F.col("yellow_trips") / F.col("total_trips"), 2)
green_ratio = F.round(F.col("green_trips") / F.col("total_trips"), 2)

zone_summary = (
    zone_totals
    .withColumn("yellow_share", F.when(zone_has_trips, yellow_ratio).otherwise(0))
    .withColumn("green_share", F.when(zone_has_trips, green_ratio).otherwise(0))
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# Write the per-zone statistics as a single parquet file under a dated key.
date = f"{datetime.today():%Y-%m-%d}"
bucket_for_results = "vsentishcheva-hw"

# Temporary location Spark writes to before the part file is renamed.
tmp_prefix = "results/tmp_folder/" + date
tmp_path = "s3://" + bucket_for_results + "/" + tmp_prefix

zone_file_name = "zone_statistic.parquet"
zone_prefix = "/".join(("results/zone_statistic", date, zone_file_name))

save_single_parquet_to_s3(
    df=zone_summary,
    temp_path=tmp_path,
    bucket=bucket_for_results,
    tmp_prefix=tmp_prefix,
    final_prefix=zone_prefix,
    spark=spark,
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Trip count and share of trips with total_amount above 30, per pickup zone and weekday.
is_high_fare = F.col("total_amount") > 30
high_fare_trips = F.sum(F.when(is_high_fare, 1).otherwise(0))

zone_days_summary = (
    raw_trips_df
    .groupBy("pickup_zone", "pickup_day_of_week")
    .agg(
        F.count("*").alias("total_trips"),
        F.round((high_fare_trips / F.count("*")), 2).alias("high_fare_share"),
    )
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
# Save result zone days statistic

zone_days_file_name = "zone_days_statistic.parquet"
zone_days_prefix = f"results/zone_days_statistic/{date}/{zone_days_file_name}"

# Write the zone/weekday aggregation (zone_days_summary), not the per-zone
# summary, so the file content matches its key.
save_single_parquet_to_s3(
    df=zone_days_summary,
    temp_path=tmp_path,
    bucket=bucket_for_results,
    tmp_prefix=tmp_prefix,
    final_prefix=zone_days_prefix,
    spark=spark)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…