In [0]:
from pyspark.sql.functions import col, hour, dayofweek, unix_timestamp, count, avg, max, min, when, coalesce
from pyspark.sql.types import *
from datetime import datetime
from pyspark.sql import functions as F

def print_sorted_schema(df, label="DataFrame"):
    """Print every column of *df* with its type, ordered by column name.

    Args:
        df: A Spark DataFrame (any object exposing ``columns`` and
            ``dtypes`` as a list of ``(name, type_string)`` pairs works).
        label: Name shown in the header line.
    """
    print(f"\n Schema for {label} (sorted):")
    # Build the name -> type mapping once; the original rebuilt it (and
    # re-read df.dtypes) for every single column.
    dtypes_by_name = dict(df.dtypes)
    for name in sorted(df.columns):
        print(f" - {name}: {dtypes_by_name[name]}")

# ---------- Parameters ----------
# Run date as YYYY-MM-DD (local clock); used to version the output folders.
today = datetime.today().strftime("%Y-%m-%d")

# Inputs: zone lookup CSV and the unified yellow / green trip datasets.
_source_root = "s3://robot-dreams-source-data"
zone_lookup_path = f"{_source_root}/home-work-1/nyc_taxi/taxi_zone_lookup.csv"
_unified_trips_root = f"{_source_root}/home-work-1-unified/nyc_taxi"
yellow_path = f"{_unified_trips_root}/yellow/"
green_path = f"{_unified_trips_root}/green/"

# Outputs: one dated results folder per run.
_results_dir = f"s3://vmilichenko-hw/results/zone_statistic/{today}"
output_path_zone = f"{_results_dir}/zone_statistic.parquet"
# NOTE(review): "statstic" looks like a typo for "statistic"; kept unchanged
# because renaming it would move the file for any downstream reader.
output_path_days = f"{_results_dir}/zone_days_statstic.parquet"


# ---------- Load source data (recursive file lookup) ----------
def _read_parquet_recursively(path):
    """Load every Parquet file found anywhere below *path* as one DataFrame.

    recursiveFileLookup makes Spark walk nested directories; per the Spark
    docs it also disables partition discovery, so directory names are not
    turned into columns.
    """
    return spark.read.option("recursiveFileLookup", "true").parquet(path)


yellow_df = _read_parquet_recursively(yellow_path)
green_df = _read_parquet_recursively(green_path)

# ---------- Combine yellow and green trips ----------
# unionByName matches columns by name rather than position; both sources are
# expected to expose the same column set — TODO confirm the "unified" inputs
# guarantee this (unionByName raises otherwise).
raw_trips_df = yellow_df.unionByName(green_df)
# ---------- Derived columns: trip duration and pickup-time features ----------
# NOTE(review): the tpep_* timestamp names are used for green trips too,
# presumably because the unified dataset renamed lpep_* — confirm.
_pickup_ts = "tpep_pickup_datetime"
_dropoff_ts = "tpep_dropoff_datetime"

# unix_timestamp yields whole seconds, so this is the trip length in minutes.
_trip_minutes = (unix_timestamp(_dropoff_ts) - unix_timestamp(_pickup_ts)) / 60

raw_trips_addcol_df = (
    raw_trips_df
    .withColumn("duration_min", _trip_minutes)
    .withColumn("pickup_hour", hour(_pickup_ts))
    # Spark's dayofweek numbers days 1 = Sunday ... 7 = Saturday.
    .withColumn("pickup_day_of_week", dayofweek(_pickup_ts))
)

# ---------- Anomaly filtering ----------
# Keep trips with a minimal distance, fare and duration. A null in any of the
# compared columns makes the predicate null, so such rows are dropped as well.
_is_plausible_trip = (
    (col("trip_distance") >= 0.1)
    & (col("total_amount") >= 2)
    & (col("duration_min") >= 1)
)
raw_trips_filtered_df = raw_trips_addcol_df.filter(_is_plausible_trip)
# ---------- JOIN with taxi zones ----------
# Read with a header only (no schema / inferSchema), so every lookup column,
# including LocationID, arrives as a string.
zones_df = spark.read.option("header", True).csv(zone_lookup_path)
zones_df = zones_df.withColumnRenamed("LocationID", "location_id")


def _zone_name_lookup(zone_alias):
    """Return zones_df reduced to (location_id, <zone_alias>) for one join.

    Only the join key and the zone name are projected. That way no other
    lookup columns (borough, service_zone, ...) reach the trip table, and
    nothing has to be dropped by name after the join. Dropping by name would
    depend on case-insensitive resolution and would also hit any trip column
    with the same name. The key is cast to int so the join compares numbers
    instead of relying on an implicit string cast.
    """
    return zones_df.select(
        col("location_id").cast("int").alias("location_id"),
        col("Zone").alias(zone_alias),
    )


# Left joins keep trips whose location ID has no match in the lookup
# (their zone columns are null). Resulting columns: trip columns...,
# pickup_zone, dropoff_zone.
df_enriched = (
    raw_trips_filtered_df
    .join(
        _zone_name_lookup("pickup_zone"),
        raw_trips_filtered_df["PULocationID"] == col("location_id"),
        "left",
    )
    .drop("location_id")
    .join(
        _zone_name_lookup("dropoff_zone"),
        raw_trips_filtered_df["DOLocationID"] == col("location_id"),
        "left",
    )
    .drop("location_id")
)

# ---------- Save to Delta Lake (Unity Catalog + S3) ----------
# NOTE(review): with mode("overwrite"), Delta's mergeSchema allows only
# additive schema changes; changing a column type would need overwriteSchema
# instead — confirm which behaviour is intended.
(
    df_enriched.write.format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable("vmilichenko_nyc_catalog.nyc_taxi.raw_trips")
)