In [0]:
df = spark.read.csv('/mnt/robot-dreams-source-mount/Lecture_3/nyc_taxi/taxi_zone_lookup.csv', header=True, inferSchema=True)
display(df)



LocationID,Borough,Zone,service_zone
1,EWR,Newark Airport,EWR
2,Queens,Jamaica Bay,Boro Zone
3,Bronx,Allerton/Pelham Gardens,Boro Zone
4,Manhattan,Alphabet City,Yellow Zone
5,Staten Island,Arden Heights,Boro Zone
6,Staten Island,Arrochar/Fort Wadsworth,Boro Zone
7,Queens,Astoria,Boro Zone
8,Queens,Astoria Park,Boro Zone
9,Queens,Auburndale,Boro Zone
10,Queens,Baisley Park,Boro Zone


In [0]:
%sql
CREATE CATALOG igor_zabelkin_nyc_catalog
MANAGED LOCATION 's3://izabelkin-emr-databricks/igor_zabelkin_nyc_catalog/';


In [0]:
%sql
USE CATALOG igor_zabelkin_nyc_catalog;


In [0]:
%sql
CREATE SCHEMA trips_schema;


In [0]:
%sql
USE SCHEMA trips_schema;

In [0]:
YELLOW_PATH = "/mnt/robot-dreams-source-mount/home-work-1-unified/nyc_taxi/yellow/"
GREEN_PATH = "/mnt/robot-dreams-source-mount/home-work-1-unified/nyc_taxi/green/"
ZONES_PATH = "/mnt/robot-dreams-source-mount/home-work-1-unified/nyc_taxi/taxi_zone_lookup.csv"

yellow_df = spark.read.option("recursiveFileLookup", "true").parquet(YELLOW_PATH)
green_df = spark.read.option("recursiveFileLookup", "true").parquet(GREEN_PATH)
#zones_df = spark.read.option("header", "true").csv(ZONES_PATH)


In [0]:
from pyspark.sql.functions import col, lit
yellow_df = yellow_df.withColumn("taxi_type", lit("yellow"))
green_df = green_df.withColumn("taxi_type", lit("green"))


In [0]:
trips_df = yellow_df.unionByName(green_df)


In [0]:
from pyspark.sql.functions import unix_timestamp

trips_df = trips_df.withColumn(
    "duration_min",
    (unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")) / 60
)

trips_df = trips_df.filter(
    (col("trip_distance") >= 0.1) &
    (col("fare_amount") >= 2.0) &
    (col("duration_min") >= 1.0)
)


In [0]:
from pyspark.sql.functions import hour, date_format

trips_df = trips_df \
    .withColumn("pickup_hour", hour("tpep_pickup_datetime")) \
    .withColumn("pickup_day_of_week", date_format("tpep_pickup_datetime", "E"))


In [0]:
taxi_zones_df = spark.read.option("header", True).csv(
    "s3a://robot-dreams-source-data/Lecture_3/nyc_taxi/taxi_zone_lookup.csv"
)

# JOIN для pickup_zone
trips_df = trips_df.join(
    taxi_zones_df.withColumnRenamed("LocationID", "PULocationID")
                 .withColumnRenamed("Zone", "tpep_pickup_zone"),
    on="PULocationID",
    how="left"
)

# JOIN для dropoff_zone
trips_df = trips_df.join(
    taxi_zones_df.withColumnRenamed("LocationID", "DOLocationID")
                 .withColumnRenamed("Zone", "tpep_dropoff_zone"),
    on="DOLocationID",
    how="left"
)




In [0]:
trips_df.write.format("delta").mode("overwrite").saveAsTable("igor_zabelkin_nyc_catalog.trips_schema.raw_trips")




In [0]:
%sql
SELECT * 
FROM igor_zabelkin_nyc_catalog.trips_schema.raw_trips
LIMIT 10;


VendorID,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,tpep_pickup_datetime,tpep_dropoff_datetime,Airport_fee,taxi_type,duration_min,pickup_hour,pickup_day_of_week
1,1,2.0,1,N,107,249,1,9.5,0.5,0.5,2.1,0.0,0.0,12.6,,2014-02-01T00:05:03,2014-02-01T00:15:44,,yellow,10.683333333333334,0,Sat
1,1,0.7,1,N,158,90,1,4.5,0.5,0.5,1.5,0.0,0.0,7.0,,2014-02-01T00:17:19,2014-02-01T00:20:15,,yellow,2.933333333333333,0,Sat
1,2,2.4,1,N,234,246,1,11.5,0.5,0.5,2.5,0.0,0.0,15.0,,2014-02-01T00:24:08,2014-02-01T00:37:07,,yellow,12.983333333333333,0,Sat
1,1,2.3,1,N,107,140,1,9.0,0.5,0.5,2.5,0.0,0.0,12.5,,2014-02-01T00:00:23,2014-02-01T00:09:26,,yellow,9.05,0,Sat
1,1,0.9,1,N,162,141,2,5.0,0.5,0.5,0.0,0.0,0.0,6.0,,2014-02-01T00:18:58,2014-02-01T00:22:34,,yellow,3.6,0,Sat
1,1,4.1,1,N,141,112,1,22.5,0.5,0.5,5.85,0.0,0.0,29.35,,2014-02-01T00:24:05,2014-02-01T00:56:09,,yellow,32.06666666666667,0,Sat
1,3,5.4,1,Y,237,87,1,20.5,0.5,0.5,5.37,0.0,0.0,26.87,,2014-02-01T00:08:11,2014-02-01T00:34:11,,yellow,26.0,0,Sat
1,1,5.2,1,N,211,239,1,19.0,0.5,0.5,3.0,0.0,0.0,23.0,,2014-02-01T00:51:34,2014-02-01T01:13:37,,yellow,22.05,0,Sat
1,1,1.0,1,N,246,48,1,5.5,0.5,0.5,1.62,0.0,0.0,8.12,,2014-02-01T00:06:00,2014-02-01T00:11:28,,yellow,5.466666666666667,0,Sat
1,1,0.9,1,N,230,170,2,6.5,0.5,0.5,0.0,0.0,0.0,7.5,,2014-02-01T00:29:42,2014-02-01T00:37:02,,yellow,7.333333333333333,0,Sat


In [0]:
from pyspark.sql.functions import (
    col, count, avg, max, min, sum as _sum, expr
)

zone_summary_df = trips_df.groupBy("tpep_pickup_zone").agg(
  count("*").alias("total_trips"),
    avg("trip_distance").alias("avg_trip_distance"),
    avg("total_amount").alias("avg_total_amount"),
    avg("tip_amount").alias("avg_tip_amount"),
    _sum(expr("CASE WHEN taxi_type = 'yellow' THEN 1 ELSE 0 END")).alias("yellow_trips"),
    _sum(expr("CASE WHEN taxi_type = 'green' THEN 1 ELSE 0 END")).alias("green_trips"),
    max("trip_distance").alias("max_trip_distance"),
    min("tip_amount").alias("min_tip_amount"),
    _sum("total_amount").alias("total_trip_amount")
).withColumn(
    "yellow_share", col("yellow_trips") / col("total_trips")
).withColumn(
    "green_share", col("green_trips") / col("total_trips")
).drop("yellow_trips", "green_trips")


In [0]:
zone_summary_df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("igor_zabelkin_nyc_catalog.trips_schema.zone_summary")


In [0]:
%sql
SELECT * 
FROM igor_zabelkin_nyc_catalog.trips_schema.zone_summary
LIMIT 10;

tpep_pickup_zone,total_trips,avg_trip_distance,avg_total_amount,avg_tip_amount,max_trip_distance,min_tip_amount,total_trip_amount,yellow_share,green_share
Homecrest,12564,5.402462591531358,24.67882043935055,1.4318863419293215,126.5,0.0,310064.7000000003,1.0,0.0
Governor's Island/Ellis Island/Liberty Island,1867,3.901039100160685,18.741730048205675,1.9904392072844133,29.1,0.0,34990.81,1.0,0.0
Corona,31674,13.896767695902009,34.835970196375506,3.0720827808297027,250984.47,0.0,1103394.5199999977,1.0,0.0
Bensonhurst West,13792,31.847469547563804,27.815447360788877,1.467676189095127,350696.98,0.0,383630.65000000014,1.0,0.0
Westerleigh,601,8.390948419301163,41.96752079866888,3.374292845257903,90.06,0.0,25222.48,1.0,0.0
Newark Airport,5356,9.416075429424945,79.8441598207618,8.151911874533235,202.9,0.0,427645.3200000002,1.0,0.0
Douglaston,3276,9.991883394383391,41.84302503052495,2.5622496947496947,92.0,0.0,137077.74999999974,1.0,0.0
Charleston/Tottenville,1520,29.433947368421062,90.90380921052638,1.9243157894736844,103.0,0.0,138173.7900000001,1.0,0.0
East Concourse/Concourse Village,35916,9.26870781824256,19.71248468649072,0.925197961911126,130074.91,0.0,707993.6000000007,1.0,0.0
Pelham Parkway,9879,5.780610385666563,25.234409353173405,1.1854590545601782,104.15,0.0,249290.73000000007,1.0,0.0


In [0]:
from pyspark.sql.functions import col, when, count, avg, sum as _sum

# Створення колонки is_high_fare
trips_df = trips_df.withColumn("is_high_fare", when(col("fare_amount") > 30, 1).otherwise(0))

# Агрегація по pickup_day_of_week
agg_by_day_df = trips_df.groupBy("pickup_day_of_week").agg(
    count("*").alias("total_trips_per_day"),
    avg("duration_min").alias("avg_duration_per_zone"),
    (_sum("is_high_fare") / count("*")).alias("high_fare_share")
)


In [0]:
agg_by_day_df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("igor_zabelkin_nyc_catalog.trips_schema.zone_days_summary")


In [0]:
%sql
SELECT * 
FROM igor_zabelkin_nyc_catalog.trips_schema.zone_days_summary
LIMIT 10;

pickup_day_of_week,total_trips_per_day,avg_duration_per_zone,high_fare_share
Sun,96476303,17.16877265273479,0.077867815892572
Mon,94955153,17.59578114926912,0.0803072267178591
Thu,110419470,19.159769097183226,0.0760153349767029
Sat,110640675,16.984093358221674,0.0564432203617702
Wed,108052890,24.94693185161461,0.0712480526897522
Fri,112035832,18.19801555556513,0.0740196672078982
Tue,103241476,17.69097570098703,0.0706559638880017
