In [None]:
import functools

from pyspark import SparkFiles
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# One Spark session for the whole notebook, with Adaptive Query Execution
# enabled together with its coalescing of shuffle partitions.
_session_conf = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
}

_builder = SparkSession.builder.appName("NYC Taxi Data Processing")
for _conf_key, _conf_value in _session_conf.items():
    _builder = _builder.config(_conf_key, _conf_value)
spark = _builder.getOrCreate()

# NYC TLC Yellow Taxi trip-record files: one Parquet file per month,
# January through June 2025.
_TRIP_DATA_BASE_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data"
urls = [
    f"{_TRIP_DATA_BASE_URL}/yellow_tripdata_2025-{month_num:02d}.parquet"
    for month_num in range(1, 7)
]

# Spark's file readers generally cannot read http(s) URLs directly: Hadoop's
# HTTP filesystem reports no file length and does not support seek, and the
# Parquet footer reader needs both. So each file is first downloaded with
# SparkContext.addFile, and the local copy is read from SparkFiles.
# NOTE(review): the driver-local path assumes executors share the driver's
# filesystem (e.g. local mode). On a multi-node cluster, stage the files on
# shared storage (HDFS/S3/...) instead.
dfs = []
for url in urls:
    spark.sparkContext.addFile(url)
    local_path = SparkFiles.get(url.rsplit("/", 1)[-1])
    df = spark.read.parquet(local_path)
    dfs.append(df)

# Stack the months by column *name*, not by position. If one monthly file
# has a different column order, its values cannot silently shift into the
# wrong columns.
trips_df = functools.reduce(DataFrame.unionByName, dfs)

# Analysis window: [PERIOD_START, PERIOD_END_EXCLUSIVE).
# The upper bound must be exclusive. Comparing a timestamp with "2025-06-30"
# means 2025-06-30 00:00:00, so an inclusive "<= 2025-06-30" bound would drop
# almost every trip taken on June 30.
PERIOD_START = "2025-01-01"
PERIOD_END_EXCLUSIVE = "2025-07-01"

# Keep trips that start and end inside the window, with a positive trip
# distance and passenger count. Rows where passenger_count is null are also
# dropped, because a null comparison is not true.
trips_df = trips_df.filter(
    (col("tpep_pickup_datetime") >= PERIOD_START) &
    (col("tpep_pickup_datetime") < PERIOD_END_EXCLUSIVE) &
    (col("tpep_dropoff_datetime") >= PERIOD_START) &
    (col("tpep_dropoff_datetime") < PERIOD_END_EXCLUSIVE) &
    (col("trip_distance") > 0) &
    (col("passenger_count") > 0)
)

# Derive the hour of day (0-23) at which each trip started and ended.
for _hour_col, _ts_col in (
    ("pickup_hour", "tpep_pickup_datetime"),
    ("dropoff_hour", "tpep_dropoff_datetime"),
):
    trips_df = trips_df.withColumn(_hour_col, hour(_ts_col))

# Columns retained for the rest of the pipeline: the raw timestamps, a few
# trip attributes, the pickup/drop-off location IDs and the derived hours.
selected_columns = [
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "passenger_count",
    "trip_distance",
    "PULocationID",
    "DOLocationID",
    "total_amount",
    "pickup_hour",
    "dropoff_hour",
]

trips_df = trips_df.select(*selected_columns)

# Taxi zone lookup table (LocationID -> Zone), read with a header row.
# Spark's file readers generally cannot read http(s) URLs directly, so the
# CSV is first downloaded with SparkContext.addFile and the local copy is
# read from SparkFiles.
# NOTE(review): the driver-local path assumes executors share the driver's
# filesystem (e.g. local mode).
_zones_url = "https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv"
spark.sparkContext.addFile(_zones_url)
zones_df = spark.read.option("header", "true").csv(
    SparkFiles.get(_zones_url.rsplit("/", 1)[-1])
)

# Attach pickup and drop-off zone names, using the lookup table twice under
# different aliases. Without inferSchema every CSV column is read as a
# string, so LocationID is cast to int explicitly rather than relying on
# Spark's implicit numeric/string comparison rules.
# These are inner joins (the default): trips whose location ID is absent
# from the lookup are dropped.
trips_with_zones_df = (
    trips_df.alias("trips")
    .join(
        zones_df.alias("pickup_zone"),
        col("trips.PULocationID") == col("pickup_zone.LocationID").cast("int"),
    )
    .join(
        zones_df.alias("dropoff_zone"),
        col("trips.DOLocationID") == col("dropoff_zone.LocationID").cast("int"),
    )
    .select(
        col("trips.*"),
        col("pickup_zone.Zone").alias("pickup_zone"),
        col("dropoff_zone.Zone").alias("dropoff_zone"),
    )
)

# Total number of trips per (pickup zone, pickup hour) over the whole period.
# NOTE(review): not consumed anywhere in this section; possibly leftover.
hourly_aggregation = (
    trips_with_zones_df
    .groupBy("pickup_zone", "pickup_hour")
    .agg(count("*").alias("trip_count"))
    .orderBy("pickup_zone", "pickup_hour")
)

# Number of trips per (pickup zone, calendar day, pickup hour).
daily_hourly_aggregation = (
    trips_with_zones_df
    .withColumn("pickup_date", to_date("tpep_pickup_datetime"))
    .groupBy("pickup_zone", "pickup_date", "pickup_hour")
    .agg(count("*").alias("daily_trip_count"))
)

# Mean of those daily counts for each (zone, hour).
# NOTE(review): a day on which a zone had no trips in a given hour produces
# no row above, so it is left out of the mean rather than counted as 0. The
# result is "average over days with at least one trip". Confirm this is the
# intended metric.
# NOTE(review): grouping is by zone *name*. If the lookup has several
# LocationIDs with the same name, their trips are merged.
avg_hourly_aggregation = (
    daily_hourly_aggregation
    .groupBy("pickup_zone", "pickup_hour")
    .agg(avg("daily_trip_count").alias("avg_trip_count"))
    .orderBy("pickup_zone", "pickup_hour")
)

# Wide table: one row per zone, with columns hour_0 .. hour_23 holding that
# hour's average. After the groupBy above each (zone, hour) has exactly one
# row, so first() just lifts that value into its pivot cell. Hours with no
# row for a zone come out as null. toDF renames the pivot's "0".."23"
# columns positionally.
_hour_values = [str(h) for h in range(24)]
final_result = (
    avg_hourly_aggregation
    .groupBy("pickup_zone")
    .pivot("pickup_hour", _hour_values)
    .agg(first("avg_trip_count"))
    .orderBy("pickup_zone")
    .toDF("pickup_zone", *[f"hour_{h}" for h in range(24)])
)

# final_result has one row per zone, but it sits at the end of a long
# lineage (file reads, union, filter, two joins, two aggregations, pivot).
# Without caching, every action below (write, show, describe) re-runs that
# whole pipeline. Caching lets the write materialise it once and the later
# actions reuse the cached rows.
final_result = final_result.cache()

# Persist the wide table, replacing any previous output at this path.
final_result.write.mode("overwrite").parquet("nyc_taxi_analysis_results.parquet")

# Console report. The runtime labels are Russian: "Schema of the final
# DataFrame", "First 10 rows of results", "Data statistics".
print("Схема итогового DataFrame:")
final_result.printSchema()

print("\nПервые 10 строк результатов:")
final_result.show(10, truncate=False)

print("\nСтатистика по данным:")
final_result.describe().show()

# Shut the session down; this also releases the cached data.
spark.stop()