In [1]:
# Print the Spark version of the session the notebook kernel exposes as `spark`.
print(spark.version)

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
180,application_1752831585852_0177,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

3.5.5-amzn-0

In [2]:
import boto3
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Obtain the SparkSession; getOrCreate() hands back the running session when one exists.
spark = (
    SparkSession.builder
    .appName("NYC Taxi ETL")
    .getOrCreate()
)
# Low-level S3 client, used below to list the folders of the source bucket.
s3 = boto3.client("s3")

def list_year_folders(bucket: str, prefix: str, client=None) -> list[str]:
    """Return the immediate sub-folder prefixes found under ``prefix`` in ``bucket``.

    The listing uses the ``list_objects_v2`` paginator with ``Delimiter="/"``,
    so only one directory level below ``prefix`` is reported; every page of the
    listing is consumed.

    Args:
        bucket: Name of the S3 bucket to list.
        prefix: Key prefix to list under.
        client: Optional object exposing ``get_paginator`` (for example a boto3
            S3 client). When omitted, the notebook-level ``s3`` client is used,
            which keeps existing two-argument calls working unchanged.

    Returns:
        The ``Prefix`` value of every ``CommonPrefixes`` entry, in listing
        order. Empty when the listing reports no common prefixes.
    """
    # Resolve the client at call time so a later rebinding of the global `s3` is honoured.
    s3_client = s3 if client is None else client
    paginator = s3_client.get_paginator("list_objects_v2")
    pages = paginator.paginate(Bucket=bucket, Prefix=prefix, Delimiter="/")
    # Pages without sub-folders carry no "CommonPrefixes" key at all.
    return [
        common_prefix["Prefix"]
        for page in pages
        for common_prefix in page.get("CommonPrefixes", [])
    ]

# Source bucket and the root prefix of each taxi colour inside it.
BUCKET = "robot-dreams-source-data"
Y_PREFIX = "home-work-1-unified/nyc_taxi/yellow/"
G_PREFIX = "home-work-1-unified/nyc_taxi/green/"

# 1) Collect the sub-folders that exist under each root (yellow first, then green).
yellow_years, green_years = (
    list_year_folders(BUCKET, root) for root in (Y_PREFIX, G_PREFIX)
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# 2) Turn every listed prefix into a full s3a:// URI.
yellow_paths = ["s3a://" + BUCKET + "/" + key_prefix for key_prefix in yellow_years]
green_paths = ["s3a://" + BUCKET + "/" + key_prefix for key_prefix in green_years]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# 3) Read all folders of each colour in a single call and tag the rows with taxi_type.
yellow_df = (
    spark.read
    .parquet(*yellow_paths)
    .withColumn("taxi_type", F.lit("yellow"))
)
green_df = (
    spark.read
    .parquet(*green_paths)
    .withColumn("taxi_type", F.lit("green"))
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# 4) Combine both colours into one DataFrame; unionByName aligns columns by name, not by position.
raw_trips_df = yellow_df.unionByName(green_df)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# 5) Sanity check: print the schema and show the first 5 rows without truncating cell values.
raw_trips_df.printSchema()
raw_trips_df.show(5, truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- VendorID: long (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- taxi_type: string (nullable = false)

+--------+---------------+-------------+----------+------------------+------------+---------

In [7]:
# 6) Compute duration_sec (dropoff minus pickup, in seconds) and drop anomalous trips.
filtered_df = (
    raw_trips_df
    .withColumn(
        "duration_sec",
        F.unix_timestamp("tpep_dropoff_datetime") - F.unix_timestamp("tpep_pickup_datetime"),
    )
    # Consecutive where() calls are AND-ed: a row survives only if it passes all three checks.
    .where(F.col("trip_distance") >= 0.1)
    .where(F.col("total_amount") >= 2)
    .where(F.col("duration_sec") >= 60)
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# 7) Derive pickup_hour, pickup_day_of_week (full day name) and duration_min from existing columns.
processed_df = (
    filtered_df
    .withColumn("pickup_hour", F.hour(F.col("tpep_pickup_datetime")))
    .withColumn(
        "pickup_day_of_week",
        F.date_format(F.col("tpep_pickup_datetime"), "EEEE"),
    )
    .withColumn(
        "duration_min",
        (F.col("duration_sec") / F.lit(60)).cast("double"),
    )
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
# 8) Sanity check: print the schema and show the first 5 rows of the enriched trips.
processed_df.printSchema()
processed_df.show(5, truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- VendorID: long (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- taxi_type: string (nullable = false)
 |-- duration_sec: long (nullable = true)
 |-- pickup_hour: integer (nullable = true)
 |-- pi

In [10]:
# 9) Reading the CSV through Spark made the query run for a long time, so this file is loaded with slightly different logic (the simpler route would be the Spark CSV reader: https://spark.apache.org/docs/latest/sql-data-sources-csv.html)

import boto3
import csv
from io import StringIO
from pyspark.sql import functions as F

# Download the zone lookup CSV directly through boto3 (a single GetObject call).
s3 = boto3.client("s3")
obj = s3.get_object(
    Bucket="robot-dreams-source-data",
    Key="home-work-1-unified/nyc_taxi/taxi_zone_lookup.csv"
)
# "utf-8-sig" decodes plain UTF-8 and also strips a leading BOM, which would
# otherwise end up glued to the first header name.
text = obj["Body"].read().decode("utf-8-sig")

# Parse through a file-like object opened with newline="" so the csv module does
# the record splitting itself. Pre-splitting with str.splitlines() breaks quoted
# fields that contain line breaks and also splits on extra Unicode separators.
reader = csv.DictReader(StringIO(text, newline=""))
rows = list(reader)  # one dict per CSV record, keyed by the header names; all values are strings

# Build the Spark DataFrame; every column is inferred as string at this point.
zones_df_spark = spark.createDataFrame(rows)

# Only LocationID needs a cast; the remaining columns are already strings.
zones_df = zones_df_spark.withColumn("LocationID", F.col("LocationID").cast("int"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# 10) Build one lookup per join key: the zone name keyed by pickup / drop-off location id.
pickup_lookup = zones_df.select(
    F.col("LocationID").alias("PULocationID"),
    F.col("Zone").alias("pickup_zone"),
)
dropoff_lookup = zones_df.select(
    F.col("LocationID").alias("DOLocationID"),
    F.col("Zone").alias("dropoff_zone"),
)

# 11) Left-join the zone names onto the trips so trips with an unknown id are kept.
# The lookups are built from a local Python list, so Spark may lack a size estimate
# and not auto-broadcast them; the explicit hint asks for a broadcast join instead
# of shuffling the large trips table for each join.
joined_df = (
    processed_df
    .join(F.broadcast(pickup_lookup), on="PULocationID", how="left")
    .join(F.broadcast(dropoff_lookup), on="DOLocationID", how="left")
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
# 12) Build zone_summary: one row of aggregates per pickup zone.
zone_summary = joined_df.groupBy("pickup_zone").agg(
    # Total number of trips
    F.count("*").alias("total_trips"),
    # Mean trip distance, total amount and tip amount
    F.avg("trip_distance").alias("avg_trip_distance"),
    F.avg("total_amount").alias("avg_total_amount"),
    F.avg("tip_amount").alias("avg_tip_amount"),
    # Share of each taxi colour in the zone: matching trips divided by all trips
    *[
        (
            F.sum(F.when(F.col("taxi_type") == colour, 1).otherwise(0))
            / F.count("*")
        ).alias(f"{colour}_share")
        for colour in ("yellow", "green")
    ],
    # Longest trip distance
    F.max("trip_distance").alias("max_trip_distance"),
    # Smallest tip amount
    F.min("tip_amount").alias("min_tip_amount"),
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
# 13) Write the (already small) summary to S3 as a single file.
bucket = "odubynskyi-emr-studio"
run_date = "22.07.2025"
output_path = f"s3a://{bucket}/results/zone_statistic/{run_date}/zone_statistic.parquet"

# repartition(1) shuffles the result into one partition, so the writer emits one part file.
(
    zone_summary
    .repartition(1)
    .write
    .mode("overwrite")
    .parquet(output_path)
)

print(f"Zone_summary written to {output_path}")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Zone_summary written to s3a://odubynskyi-emr-studio/results/zone_statistic/22.07.2025/zone_statistic.parquet

In [14]:
# 14) Keep only the trips that were picked up on a Wednesday.
wednesday_df = joined_df.filter(
    F.col("pickup_day_of_week") == "Wednesday"
)

# 15) Aggregate per pickup zone:
#    - total_trips      total number of trips
#    - high_fare_share  fraction of trips with total_amount > 30
wednesday_zone_stats = wednesday_df.groupBy("pickup_zone").agg(
    F.count("*").alias("total_trips"),
    (
      F.sum(F.when(F.col("total_amount") > 30, 1).otherwise(0))
      / F.count("*")
    ).alias("high_fare_share")
)

# 16) Inspect the result
wednesday_zone_stats.printSchema()
wednesday_zone_stats.show(10, truncate=False)

# 17) Save the result to S3 as a single file
bucket    = "odubynskyi-emr-studio"
run_date  = "22.07.2025"
output    = f"s3a://{bucket}/results/zone_days_statstic/{run_date}/zone_days_statstic.parquet"

# repartition(1) instead of coalesce(1): coalesce does not shuffle, so it would
# squeeze the final aggregation stage into a single task, whereas repartition
# keeps that stage parallel and only merges the finished rows into one partition.
# This also matches how zone_summary is written.
wednesday_zone_stats.repartition(1) \
    .write \
    .mode("overwrite") \
    .parquet(output)

# 18) Confirm the write
print(f"Wednesday stats written to {output}")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- pickup_zone: string (nullable = true)
 |-- total_trips: long (nullable = false)
 |-- high_fare_share: double (nullable = true)

+------------------------------+-----------+--------------------+
|pickup_zone                   |total_trips|high_fare_share     |
+------------------------------+-----------+--------------------+
|Pelham Parkway                |1424       |0.32724719101123595 |
|Rego Park                     |9043       |0.3168196395001659  |
|Kew Gardens Hills             |2685       |0.3437616387337058  |
|Jackson Heights               |45404      |0.13600123337150913 |
|Yorkville West                |1814066    |0.05382163603749809 |
|Stuy Town/Peter Cooper Village|277540     |0.047798515529293074|
|Clinton Hill                  |38527      |0.09479066628598126 |
|East Harlem South             |676749     |0.06344449714739142 |
|Longwood                      |987        |0.3171225937183384  |
|Hudson Sq                     |687364     |0.07586955383173981 |
+---

In [16]:
# Sanity check: print the schema and show the first 10 rows of the per-zone summary.
zone_summary.printSchema()
zone_summary.show(10, truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- pickup_zone: string (nullable = true)
 |-- total_trips: long (nullable = false)
 |-- avg_trip_distance: double (nullable = true)
 |-- avg_total_amount: double (nullable = true)
 |-- avg_tip_amount: double (nullable = true)
 |-- yellow_share: double (nullable = true)
 |-- green_share: double (nullable = true)
 |-- max_trip_distance: double (nullable = true)
 |-- min_tip_amount: double (nullable = true)

+------------------------------+-----------+------------------+------------------+------------------+------------+-----------+-----------------+--------------+
|pickup_zone                   |total_trips|avg_trip_distance |avg_total_amount  |avg_tip_amount    |yellow_share|green_share|max_trip_distance|min_tip_amount|
+------------------------------+-----------+------------------+------------------+------------------+------------+-----------+-----------------+--------------+
|Westerleigh                   |609        |8.840492610837437 |41.99747126436779 |3.592692939244664 |1.0