In [1]:
from pyspark.sql import SparkSession

# Build (or reuse, if one is already running) the Spark session for this notebook.
session_builder = SparkSession.builder.appName("NY Taxi Analysis")
spark = session_builder.getOrCreate()

# Bare last expression: the notebook renders the session summary.
spark

Скачиваем и читаем данные

In [2]:
import os
import urllib.request

# Source location of the monthly yellow-taxi trip files and the months to fetch.
base_url = "https://d37ci6vzurychx.cloudfront.net/trip-data"
months = ["01", "02", "03", "04", "05", "06"]

for m in months:
    fname = f"yellow_tripdata_2025-{m}.parquet"
    url = f"{base_url}/{fname}"

    # Skip files that are already on disk so re-running the notebook does not
    # download every month again.
    if os.path.exists(fname):
        print("Уже скачан, пропускаю", fname)
        continue

    print("Скачиваю", url)
    # Download under a temporary name and rename only on success: an
    # interrupted transfer must never leave a truncated "*.parquet" file that
    # the glob-based read in the next cell would pick up.
    tmp_name = fname + ".part"
    try:
        urllib.request.urlretrieve(url, tmp_name)
        os.replace(tmp_name, fname)
    finally:
        # After a successful rename the temp file no longer exists; after a
        # failure this removes the partial download.
        if os.path.exists(tmp_name):
            os.remove(tmp_name)

print("Готово!")

Скачиваю https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet
Скачиваю https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-02.parquet
Скачиваю https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-03.parquet
Скачиваю https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-04.parquet
Скачиваю https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-05.parquet
Скачиваю https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-06.parquet
Готово!


In [3]:
# Read every downloaded monthly file in one go; the glob matches all of them.
trip_files_glob = "yellow_tripdata_2025-*.parquet"
df = spark.read.parquet(trip_files_glob)

# Quick look at the column types and the first few rows.
df.printSchema()
df.show(5)

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- cbd_congestion_fee: double (nullable = true)

+--------+--------------------+---------------------+---------------+------

Чистим данные по условиям задания

In [4]:
# Bounds of the analysis window (first half of 2025). The cleaning filter
# compares timestamps with >= start and <= end, so both ends are inclusive.
start, end = "2025-01-01", "2025-06-30 23:59:59"

Фильтруем

In [13]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

In [12]:
# Cleaning rules, each expressed as a boolean column:
# pickup time inside the window (inclusive on both ends)
pickup_in_range = F.col("tpep_pickup_datetime").between(start, end)
# drop-off time inside the window (inclusive on both ends)
dropoff_in_range = F.col("tpep_dropoff_datetime").between(start, end)
# distance > 0
has_distance = F.col("trip_distance") > 0
# passengers > 0
has_passengers = F.col("passenger_count") > 0

# A row is kept only if every rule holds.
df_clean = df.filter(
    pickup_in_range & dropoff_in_range & has_distance & has_passengers
)

# Row counts after and before cleaning.
df_clean.count(), df.count()

(18285526, 24083384)

Добавляем колонки: дата посадки и час посадки/высадки

In [14]:
# Derived time columns, keyed by the name of the new column.
derived_columns = {
    "pickup_hour": F.hour("tpep_pickup_datetime"),
    "dropoff_hour": F.hour("tpep_dropoff_datetime"),
    "pickup_date": F.to_date("tpep_pickup_datetime"),
}

# Attach them one by one, in the order listed above.
for col_name, col_expr in derived_columns.items():
    df_clean = df_clean.withColumn(col_name, col_expr)

In [15]:
# Sanity check: the derived date/hour next to the timestamp they come from.
preview_columns = ["tpep_pickup_datetime", "pickup_date", "pickup_hour"]
df_clean.select(*preview_columns).show(5)

+--------------------+-----------+-----------+
|tpep_pickup_datetime|pickup_date|pickup_hour|
+--------------------+-----------+-----------+
| 2025-01-01 00:18:38| 2025-01-01|          0|
| 2025-01-01 00:32:40| 2025-01-01|          0|
| 2025-01-01 00:44:04| 2025-01-01|          0|
| 2025-01-01 00:14:27| 2025-01-01|          0|
| 2025-01-01 00:21:34| 2025-01-01|          0|
+--------------------+-----------+-----------+
only showing top 5 rows



Оставляем только нужные колонки

In [16]:
# Columns carried into the rest of the analysis; everything else is dropped.
kept_columns = [
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "passenger_count",
    "trip_distance",
    "PULocationID",
    "DOLocationID",
    "total_amount",
    "pickup_hour",
    "dropoff_hour",
    "pickup_date",
]
df_sel = df_clean.select(*kept_columns)

df_sel.show(5)

+--------------------+---------------------+---------------+-------------+------------+------------+------------+-----------+------------+-----------+
|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|PULocationID|DOLocationID|total_amount|pickup_hour|dropoff_hour|pickup_date|
+--------------------+---------------------+---------------+-------------+------------+------------+------------+-----------+------------+-----------+
| 2025-01-01 00:18:38|  2025-01-01 00:26:59|              1|          1.6|         229|         237|        18.0|          0|           0| 2025-01-01|
| 2025-01-01 00:32:40|  2025-01-01 00:35:13|              1|          0.5|         236|         237|       12.12|          0|           0| 2025-01-01|
| 2025-01-01 00:44:04|  2025-01-01 00:46:01|              1|          0.6|         141|         141|        12.1|          0|           0| 2025-01-01|
| 2025-01-01 00:14:27|  2025-01-01 00:20:01|              3|         0.52|         244|       

In [17]:
import os
import urllib.request

# The zone lookup table is not fetched by the trip-data download cell, so get
# it here when it is missing; otherwise a fresh checkout fails on the read.
# NOTE(review): URL is the lookup CSV published on the same host as the trip
# files — confirm it is still current.
zones_path = "taxi_zone_lookup.csv"
zones_url = "https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv"

if not os.path.exists(zones_path):
    # Download under a temporary name and rename only on success, so a failed
    # transfer never leaves a truncated CSV behind.
    zones_tmp = zones_path + ".part"
    try:
        urllib.request.urlretrieve(zones_url, zones_tmp)
        os.replace(zones_tmp, zones_path)
    finally:
        if os.path.exists(zones_tmp):
            os.remove(zones_tmp)

# First line of the CSV is the header; column types are inferred from the data.
zones = spark.read.csv(zones_path, header=True, inferSchema=True)
zones.show(5)
zones.printSchema()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



Присоединяем названия зон по ID (сами ID остаются): сначала зону посадки

In [18]:
# Pickup-side view of the lookup: the key is renamed so it can be dropped
# after the join without touching the trip's own PULocationID column.
zones_pickup = zones.select(
    zones["LocationID"].alias("PULocationID_zone"),
    zones["Zone"].alias("pickup_zone"),
)

# Left join keeps every trip, even when its pickup ID has no match in the lookup.
pickup_match = df_sel["PULocationID"] == zones_pickup["PULocationID_zone"]
df_joined = df_sel.join(zones_pickup, on=pickup_match, how="left")
df_joined = df_joined.drop("PULocationID_zone")

Теперь добавим зону высадки

In [19]:
# Drop-off-side view of the lookup: the key is renamed so it can be dropped
# after the join without touching the trip's own DOLocationID column.
zones_dropoff = zones.select(
    zones["LocationID"].alias("DOLocationID_zone"),
    zones["Zone"].alias("dropoff_zone"),
)

# Left join keeps every trip, even when its drop-off ID has no match in the lookup.
dropoff_match = df_joined["DOLocationID"] == zones_dropoff["DOLocationID_zone"]
df_joined = df_joined.join(zones_dropoff, on=dropoff_match, how="left")
df_joined = df_joined.drop("DOLocationID_zone")

# Check both IDs next to the zone names that were attached to them.
zone_check_columns = ["PULocationID", "pickup_zone", "DOLocationID", "dropoff_zone"]
df_joined.select(*zone_check_columns).show(5, truncate=False)

+------------+-----------------------------+------------+------------------------+
|PULocationID|pickup_zone                  |DOLocationID|dropoff_zone            |
+------------+-----------------------------+------------+------------------------+
|229         |Sutton Place/Turtle Bay North|237         |Upper East Side South   |
|236         |Upper East Side North        |237         |Upper East Side South   |
|141         |Lenox Hill West              |141         |Lenox Hill West         |
|244         |Washington Heights South     |244         |Washington Heights South|
|244         |Washington Heights South     |116         |Hamilton Heights        |
+------------+-----------------------------+------------+------------------------+
only showing top 5 rows



Часовая агрегация количества заказов по зоне посадки

In [20]:
# Number of rides for every (pickup zone, calendar date, hour of day) combination.
hourly_keys = ["pickup_zone", "pickup_date", "pickup_hour"]
rides_per_slot = df_joined.groupBy(*hourly_keys).agg(F.count("*").alias("rides"))
hourly = rides_per_slot

hourly.show(10)

+--------------------+-----------+-----------+-----+
|         pickup_zone|pickup_date|pickup_hour|rides|
+--------------------+-----------+-----------+-----+
|     Lower East Side| 2025-01-01|          0|  133|
|        Clinton West| 2025-01-01|          3|   38|
|   Battery Park City| 2025-01-01|          4|    1|
|Times Sq/Theatre ...| 2025-01-01|          6|   41|
|Downtown Brooklyn...| 2025-01-01|          6|    2|
|          Whitestone| 2025-01-01|          6|    1|
|Upper East Side N...| 2025-01-01|          9|   38|
|Long Island City/...| 2025-01-01|         10|    1|
|  Stuyvesant Heights| 2025-01-01|         14|    1|
|              Hollis| 2025-01-01|         15|    1|
+--------------------+-----------+-----------+-----+
only showing top 10 rows



Среднее почасовое количество заказов по зонам + pivot

In [21]:
# Average number of rides per pickup zone for each hour of the day,
# one output column per hour ("0" .. "23").
#
# `hourly` only has rows for (zone, date, hour) combinations with at least one
# ride, so a plain avg("rides") would average over "days that had a ride" and
# overstate quiet zones and hours. Total rides are therefore divided by the
# number of calendar days present in the data, which counts ride-less hours
# as zero.
hours = list(range(24))
n_days = hourly.select("pickup_date").distinct().count()

pivoted = (
    hourly
    .groupBy("pickup_zone")
    .pivot("pickup_hour", hours)
    .sum("rides")
    # Hours in which a zone never had a ride come out of the pivot as null.
    .na.fill(0)
    .select(
        "pickup_zone",
        # Backticks: the pivot columns are named by the bare hour number.
        *[(F.col(f"`{h}`") / F.lit(n_days)).alias(str(h)) for h in hours]
    )
    .orderBy("pickup_zone")
)

pivoted.show(5)

+--------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|         pickup_zone|                 0|                 1|                 2|                 3|                 4|                 5|                 6|                 7|                 8|                 9|                10|                11|                12|                13|                14|                15|                16|                17|                18|                19|                20|                21|                22|                23|
+--------------------+------------------+-

Сохраняем итоговый набор в parquet

In [22]:
# Persist the per-zone hourly table; an existing output is replaced.
output_path = "nyc_taxi_zone_hourly_avg_2025H1.parquet"
result_writer = pivoted.write.mode("overwrite")
result_writer.parquet(output_path)