In [1]:
from pyspark.sql import SparkSession, functions as F, types as T
from pyspark import SparkConf, SparkContext
import os
from dotenv import load_dotenv

In [2]:
conf = SparkConf()
conf.set("spark.jars.packages", "net.snowflake:snowflake-jdbc:3.24.2,net.snowflake:spark-snowflake_2.12:3.1.2")
spark = SparkSession.builder \
    .config(conf=conf) \
    .getOrCreate()

In [3]:
load_dotenv(dotenv_path="/home/jovyan/work/.env")

sfOptions = {
    "sfURL": os.getenv("URL"),
    "sfDatabase": os.getenv("DB"),
    "sfSchema": "ANALYTICS",
    "sfWarehouse": os.getenv("WAREHOUSE"),
    "sfRole": os.getenv("ROLE"),
    "sfUser": os.getenv("USER"),
    "sfPassword": os.getenv("PASSWORD")
}

In [4]:
df = spark.read.format("snowflake") \
    .options(**sfOptions) \
    .option("dbtable", "ANALYTICS.OBT_TRIPS") \
    .load()

print("tabla cargada")

tabla cargada


a) Top 10 zonas de pickup por volumen mensual

In [28]:
top_pickup = (
    df.groupBy("MONTH", "PU_ZONE")
      .agg(F.count("*").alias("num_trips"))
      .orderBy( "MONTH", F.col("num_trips").desc())
)
top_pickup.show()

+-----+--------------------+---------+
|MONTH|             PU_ZONE|num_trips|
+-----+--------------------+---------+
|    1|Upper East Side S...|  3047066|
|    1|Upper East Side N...|  2925423|
|    1|      Midtown Center|  2890009|
|    1|        Midtown East|  2481739|
|    1|Times Sq/Theatre ...|  2478174|
|    1|Penn Station/Madi...|  2440131|
|    1|            Union Sq|  2269092|
|    1|         Murray Hill|  2241200|
|    1| Lincoln Square East|  2170625|
|    1|        Clinton East|  2165022|
|    1|        East Village|  2071936|
|    1|         JFK Airport|  2055335|
|    1|       Midtown North|  1948385|
|    1|Upper West Side S...|  1925936|
|    1|     Lenox Hill West|  1794045|
|    1|        East Chelsea|  1746989|
|    1|            Gramercy|  1722188|
|    1|   LaGuardia Airport|  1708302|
|    1|       Midtown South|  1660350|
|    1|        West Village|  1544942|
+-----+--------------------+---------+
only showing top 20 rows



b) Top 10 zonas de dropoff por volumen mensual.

In [29]:
top_dropoff = (
    df.groupBy("MONTH", "DO_ZONE")
      .agg(F.count("*").alias("num_trips"))
      .orderBy( "MONTH", F.col("num_trips").desc())
)
top_dropoff.show()

+-----+--------------------+---------+
|MONTH|             DO_ZONE|num_trips|
+-----+--------------------+---------+
|    1|Upper East Side N...|  3076863|
|    1|Upper East Side S...|  2738783|
|    1|      Midtown Center|  2714918|
|    1|         Murray Hill|  2272433|
|    1|Times Sq/Theatre ...|  2194933|
|    1|        Midtown East|  2146806|
|    1| Lincoln Square East|  1992453|
|    1|            Union Sq|  1975954|
|    1|        Clinton East|  1946386|
|    1|Upper West Side S...|  1921607|
|    1|     Lenox Hill West|  1876772|
|    1|Penn Station/Madi...|  1839065|
|    1|        East Village|  1784186|
|    1|       Midtown North|  1731205|
|    1|        East Chelsea|  1678802|
|    1|Upper West Side N...|  1643838|
|    1|            Gramercy|  1565393|
|    1|      Yorkville West|  1499737|
|    1|       Midtown South|  1494509|
|    1|     Lenox Hill East|  1444425|
+-----+--------------------+---------+
only showing top 20 rows



c) Evolución mensual de total_amount y tip_pct por borough.

In [8]:
evolucion_borough = (
    df.groupBy("MONTH", "PU_BOROUGH")
      .agg(
          F.round(F.avg("TOTAL_AMOUNT"), 2).alias("avg_total_amount"),
          F.round(F.avg("TIP_PCT"), 2).alias("avg_tip_pct")
      )
      .orderBy("MONTH")
)
evolucion_borough.show()

+-----+-------------+----------------+-----------+
|MONTH|   PU_BOROUGH|avg_total_amount|avg_tip_pct|
+-----+-------------+----------------+-----------+
|    1|Staten Island|           49.68|       4.12|
|    1|    Manhattan|           15.48|      10.63|
|    1|      Unknown|           18.17|      10.47|
|    1|       Queens|           39.81|       8.62|
|    1|          N/A|           61.26|       6.17|
|    1|     Brooklyn|           17.58|       8.19|
|    1|        Bronx|           20.19|       2.12|
|    1|          EWR|           90.86|      10.59|
|    2|    Manhattan|           15.73|      10.72|
|    2|      Unknown|           17.93|      10.55|
|    2|       Queens|           38.01|       8.58|
|    2|     Brooklyn|           17.66|       8.37|
|    2|Staten Island|           49.09|       4.41|
|    2|        Bronx|           19.09|       1.81|
|    2|          EWR|           89.77|       10.5|
|    2|          N/A|           58.63|        5.7|
|    3|Staten Island|          

d) Ticket promedio (avg total_amount) por service_type y mes.

In [9]:
ticket_service = (
    df.groupBy("MONTH", "SERVICE_TYPE")
      .agg(F.round(F.avg("TOTAL_AMOUNT"), 2).alias("avg_total_amount"))
      .orderBy("MONTH")
)
ticket_service.show()

+-----+------------+----------------+
|MONTH|SERVICE_TYPE|avg_total_amount|
+-----+------------+----------------+
|    1|       green|           15.37|
|    1|      yellow|           17.82|
|    2|       green|           15.29|
|    2|      yellow|           17.81|
|    3|       green|           15.32|
|    3|      yellow|           18.45|
|    4|       green|           15.56|
|    4|      yellow|           18.65|
|    5|       green|           15.93|
|    5|      yellow|           19.26|
|    6|       green|            15.9|
|    6|      yellow|           19.29|
|    7|       green|           15.86|
|    7|      yellow|           19.01|
|    8|       green|           16.07|
|    8|      yellow|           18.99|
|    9|       green|           16.29|
|    9|      yellow|           19.04|
|   10|       green|           16.12|
|   10|      yellow|           18.95|
+-----+------------+----------------+
only showing top 20 rows



e) Viajes por hora del día y día de semana (picos).

In [10]:
trips_by_hour_day = (
    df.groupBy("PICKUP_HOUR", "DAY_OF_WEEK")
      .agg(F.count("*").alias("num_trips"))
      .orderBy("DAY_OF_WEEK", "PICKUP_HOUR")
)
trips_by_hour_day.show()

+-----------+-----------+---------+
|PICKUP_HOUR|DAY_OF_WEEK|num_trips|
+-----------+-----------+---------+
|          0|          0|  1813408|
|          1|          0|  1147755|
|          2|          0|  1332276|
|          3|          0|  1950314|
|          4|          0|  2884855|
|          5|          0|  4118295|
|          6|          0|  5178440|
|          7|          0|  5875104|
|          8|          0|  6252902|
|          9|          0|  6316318|
|         10|          0|  6375938|
|         11|          0|  6206169|
|         12|          0|  6189142|
|         13|          0|  6327145|
|         14|          0|  6112753|
|         15|          0|  5545338|
|         16|          0|  5096868|
|         17|          0|  4772658|
|         18|          0|  4023361|
|         19|          0|  3064250|
+-----------+-----------+---------+
only showing top 20 rows



f) p50/p90 de trip_duration_min por borough de pickup.

In [11]:
duration_pct = (
    df.groupBy("PU_BOROUGH")
      .agg(
          F.percentile_approx("TRIP_DURATION_MIN", 0.5).alias("p50_duration"),
          F.percentile_approx("TRIP_DURATION_MIN", 0.9).alias("p90_duration")
      )
)
duration_pct.show()

+-------------+------------+------------+
|   PU_BOROUGH|p50_duration|p90_duration|
+-------------+------------+------------+
|       Queens|        24.0|        54.0|
|          EWR|         0.0|         2.0|
|      Unknown|        10.0|        28.0|
|     Brooklyn|        13.0|        33.0|
|Staten Island|        23.0|        71.0|
|          N/A|         1.0|        60.0|
|    Manhattan|        11.0|        25.0|
|        Bronx|        13.0|        38.0|
+-------------+------------+------------+



g) `avg_speed_mph` por franja horaria (6–9, 17–20) y borough.

In [12]:
df_speed = df.withColumn("time_slot", 
    F.when((F.col("PICKUP_HOUR").between(6,9)), "morning")
   .when((F.col("PICKUP_HOUR").between(17,20)), "evening")
   .otherwise("other")
)

avg_speed_slot = (
    df_speed.groupBy("PU_BOROUGH", "time_slot")
            .agg(F.round(F.avg("AVG_SPEED_MPH"),2).alias("avg_speed_mph"))
)
avg_speed_slot.show()

+-------------+---------+-------------+
|   PU_BOROUGH|time_slot|avg_speed_mph|
+-------------+---------+-------------+
|          N/A|    other|         47.5|
|       Queens|  morning|        31.65|
|    Manhattan|    other|        23.15|
|     Brooklyn|    other|        34.37|
|        Bronx|  evening|        82.72|
|        Bronx|    other|         93.8|
|       Queens|  evening|        29.13|
|    Manhattan|  evening|        23.14|
|Staten Island|  evening|      1201.89|
|      Unknown|  morning|        28.11|
|     Brooklyn|  evening|        28.45|
|     Brooklyn|  morning|        35.23|
|       Queens|    other|        37.22|
|          EWR|    other|        69.88|
|      Unknown|    other|        23.65|
|    Manhattan|  morning|        14.65|
|Staten Island|    other|        59.72|
|          N/A|  morning|        403.5|
|      Unknown|  evening|        69.38|
|        Bronx|  morning|        93.62|
+-------------+---------+-------------+
only showing top 20 rows



h) Participación por `payment_type_desc` y su relación con `tip_pct`.

In [14]:
payment_tip = (
    df.groupBy("PAYMENT_TYPE_DESC")
      .agg(
          F.count("*").alias("num_trips"),
          F.round(F.avg("TIP_PCT"),2).alias("avg_tip_pct")
      )
      .orderBy(F.col("num_trips").desc())
)
payment_tip.show()

+-----------------+---------+-----------+
|PAYMENT_TYPE_DESC|num_trips|avg_tip_pct|
+-----------------+---------+-----------+
|      Credit card|572759671|      15.07|
|             Cash|255987421|        0.0|
|  Flex Fare trip | 17444119|      15.77|
|        No charge|  4183884|       0.02|
|          Dispute|  3616567|       0.03|
|    Not specified|  1913192|       3.12|
|          Unknown|     3045|        0.1|
+-----------------+---------+-----------+



i) ¿Qué `rate_code_desc` concentran mayor `trip_distance` y `total_amount`?

In [15]:
rate_codes = (
    df.groupBy("RATE_CODE_DESC")
      .agg(
          F.round(F.sum("TRIP_DISTANCE"),2).alias("total_distance"),
          F.round(F.sum("TOTAL_AMOUNT"),2).alias("total_amount"),
          F.count("*").alias("num_trips")
      )
      .orderBy(F.col("total_distance").desc())
)
rate_codes.show(10)

+--------------------+---------------+-----------------+---------+
|      RATE_CODE_DESC| total_distance|     total_amount|num_trips|
+--------------------+---------------+-----------------+---------+
|       Standard rate|3.50396455861E9|1.335229084586E10|807771673|
|             Unknown| 8.0639682204E8|    5.605215275E8| 20644902|
|                 JFK|   4.30163568E8|  1.38767537936E9| 19754845|
|     Negotiated fare|  5.114722168E7|   2.9153840659E8|  5259618|
|              Newark|  2.851988972E7|   1.6022611566E8|  1752357|
|Nassau or Westche...|  2.096426269E7|     6.90564696E7|   717169|
|          Group ride|        8929.84|        185685.24|     7335|
+--------------------+---------------+-----------------+---------+



j) Mix yellow vs green por mes y borough.

In [16]:
mix_service = (
    df.groupBy("MONTH", "PU_BOROUGH", "SERVICE_TYPE")
      .agg(F.count("*").alias("num_trips"))
      .orderBy("MONTH", "PU_BOROUGH")
)
mix_service.show()

+-----+-------------+------------+---------+
|MONTH|   PU_BOROUGH|SERVICE_TYPE|num_trips|
+-----+-------------+------------+---------+
|    1|        Bronx|      yellow|   122246|
|    1|        Bronx|       green|   342339|
|    1|     Brooklyn|       green|  2157929|
|    1|     Brooklyn|      yellow|   994453|
|    1|          EWR|       green|      109|
|    1|          EWR|      yellow|     5614|
|    1|    Manhattan|       green|  1948766|
|    1|    Manhattan|      yellow| 65761203|
|    1|          N/A|      yellow|    53418|
|    1|          N/A|       green|     3129|
|    1|       Queens|       green|  1711803|
|    1|       Queens|      yellow|  4630697|
|    1|Staten Island|      yellow|     3074|
|    1|Staten Island|       green|     1436|
|    1|      Unknown|      yellow|  1028484|
|    1|      Unknown|       green|     9652|
|    2|        Bronx|       green|   377194|
|    2|        Bronx|      yellow|   113415|
|    2|     Brooklyn|       green|  2130266|
|    2|   

k) Top 20 flujos PU→DO por volumen y su ticket promedio.

In [17]:
flows = (
    df.groupBy("PU_ZONE", "DO_ZONE")
      .agg(F.count("*").alias("num_trips"),
           F.round(F.avg("TOTAL_AMOUNT"),2).alias("avg_ticket"))
      .orderBy(F.col("num_trips").desc())
)
flows.show(20)

+--------------------+--------------------+---------+----------+
|             PU_ZONE|             DO_ZONE|num_trips|avg_ticket|
+--------------------+--------------------+---------+----------+
|                 N/A|                 N/A|  7690526|     17.71|
|Upper East Side S...|Upper East Side N...|  4503615|     10.33|
|Upper East Side N...|Upper East Side S...|  3848736|     11.24|
|Upper East Side N...|Upper East Side N...|  3579641|      8.55|
|Upper East Side S...|Upper East Side S...|  3426337|      9.09|
|Upper West Side S...|Upper West Side N...|  2013131|       8.9|
|Upper West Side S...| Lincoln Square East|  1996940|      9.44|
|Upper East Side S...|      Midtown Center|  1931239|     12.12|
|Upper East Side S...|        Midtown East|  1924965|      10.7|
| Lincoln Square East|Upper West Side S...|  1905808|       9.9|
|      Midtown Center|Upper East Side S...|  1847546|     11.74|
|Upper West Side N...|Upper West Side S...|  1738209|      8.87|
|     Lenox Hill West|Upp

l) Distribución de `passenger_count` y efecto en `total_amount`.

In [18]:
passenger_effect = (
    df.groupBy("PASSENGER_COUNT")
      .agg(
          F.count("*").alias("num_trips"),
          F.round(F.avg("TOTAL_AMOUNT"),2).alias("avg_total_amount")
      )
      .orderBy("PASSENGER_COUNT")
)
passenger_effect.show()

+---------------+---------+----------------+
|PASSENGER_COUNT|num_trips|avg_total_amount|
+---------------+---------+----------------+
|           NULL| 19357311|           26.41|
|              0|  5884595|           19.76|
|              1|610196659|            18.1|
|              2|117860353|           19.49|
|              3| 32659828|           18.96|
|              4| 15819035|           19.84|
|              5| 33521501|           17.04|
|              6| 20598754|           16.87|
|              7|     3856|           46.66|
|              8|     3961|           48.64|
|              9|     2039|           61.21|
|             32|        1|           60.35|
|             48|        1|            40.3|
|             96|        2|           12.59|
|            112|        1|           14.76|
|            192|        2|            9.15|
+---------------+---------+----------------+



m) Impacto de `tolls_amount` y `congestion_surcharge` por zona.

In [19]:
fees_effect = (
    df.groupBy("PU_ZONE")
      .agg(
          F.round(F.avg("TOLLS_AMOUNT"),2).alias("avg_tolls"),
          F.round(F.avg("CONGESTION_SURCHARGE"),2).alias("avg_congestion"),
          F.round(F.avg("TOTAL_AMOUNT"),2).alias("avg_total_amount")
      )
      .orderBy(F.col("avg_congestion").desc())
)
fees_effect.show()

+--------------------+---------+--------------+----------------+
|             PU_ZONE|avg_tolls|avg_congestion|avg_total_amount|
+--------------------+---------+--------------+----------------+
|Upper East Side S...|     0.07|          2.43|           14.15|
|      Yorkville East|     0.19|          2.43|           15.81|
|Upper East Side N...|      0.1|          2.43|           14.64|
|Sutton Place/Turt...|     0.12|          2.43|           15.17|
|Greenwich Village...|     0.06|          2.43|           16.22|
| Lincoln Square East|     0.11|          2.43|           15.47|
|        West Village|     0.09|          2.43|           16.23|
|     Lenox Hill West|     0.09|          2.43|           14.53|
|Upper West Side S...|     0.13|          2.43|            15.2|
|                SoHo|     0.08|          2.42|           16.85|
|     Lenox Hill East|     0.14|          2.42|           16.31|
| Lincoln Square West|     0.12|          2.42|           15.11|
|        East Village|   

n) Proporción de viajes cortos vs largos por borough y estacionalidad.

In [21]:
df_length = df.withColumn(
    "trip_type", 
    F.when((F.col("TRIP_DISTANCE") > 0) & (F.col("TRIP_DISTANCE") <= 5), "short")
     .when(F.col("TRIP_DISTANCE") > 5, "long")
     .otherwise("other")  # Optionally add an "other" category if needed
)
trip_type_ratio = (
    df_length.groupBy("PU_BOROUGH", "MONTH", "trip_type")
             .agg(F.count("*").alias("num_trips"))
)
trip_type_ratio.show()

+-------------+-----+---------+---------+
|   PU_BOROUGH|MONTH|trip_type|num_trips|
+-------------+-----+---------+---------+
|    Manhattan|    3|     long|  6483951|
|       Queens|   11|    short|  1792807|
|       Queens|    3|    other|   156192|
|       Queens|    1|    short|  2188277|
|      Unknown|    3|     long|   127841|
|       Queens|   10|    other|   157068|
|        Bronx|    3|     long|   146163|
|        Bronx|    7|     long|   124325|
|       Queens|    7|    other|   154906|
|     Brooklyn|    2|    other|    57099|
|Staten Island|    3|     long|     2435|
|          EWR|    4|    other|     4335|
|        Bronx|    1|     long|   130460|
|          N/A|    7|    short|    16935|
|      Unknown|    4|    short|   678388|
|        Bronx|    4|    short|   330101|
|          N/A|    2|    other|    17887|
|       Queens|    2|     long|  3479328|
|          N/A|    1|     long|    16471|
|        Bronx|    5|     long|   140553|
+-------------+-----+---------+---

o) Diferencias por vendor en `avg_speed_mph` y `trip_duration_min`.

In [22]:
vendor_stats = (
    df.groupBy("VENDOR_NAME")
      .agg(
          F.round(F.avg("AVG_SPEED_MPH"),2).alias("avg_speed"),
          F.round(F.avg("TRIP_DURATION_MIN"),2).alias("avg_duration")
      )
)
vendor_stats.show()

+--------------------+---------+------------+
|         VENDOR_NAME|avg_speed|avg_duration|
+--------------------+---------+------------+
|  Curb Mobility, LLC|     17.2|       21.28|
|Creative Mobile T...|    34.16|       15.90|
|Myle Technologies...|    10.39|       33.20|
|       Not specified|    10.64|       14.18|
|               Helix|     NULL|        0.00|
+--------------------+---------+------------+



p) Relación método de pago ↔ `tip_amount` por hora.

In [23]:
tip_hour = (
    df.groupBy("PAYMENT_TYPE_DESC", "PICKUP_HOUR")
      .agg(F.round(F.avg("TIP_AMOUNT"),2).alias("avg_tip"))
      .orderBy("PICKUP_HOUR")
)
tip_hour.show()

+-----------------+-----------+-------+
|PAYMENT_TYPE_DESC|PICKUP_HOUR|avg_tip|
+-----------------+-----------+-------+
|      Credit card|          0|   3.51|
|          Dispute|          0|   0.04|
|             Cash|          0|    0.0|
|        No charge|          0|    0.0|
|          Unknown|          0|    0.0|
|    Not specified|          0|   0.88|
|  Flex Fare trip |          0|   1.49|
|          Unknown|          1|    0.0|
|          Dispute|          1|   0.03|
|      Credit card|          1|   3.32|
|        No charge|          1|    0.0|
|             Cash|          1|    0.0|
|    Not specified|          1|   0.73|
|  Flex Fare trip |          1|   1.52|
|        No charge|          2|   0.04|
|          Dispute|          2|   0.03|
|          Unknown|          2|    0.0|
|             Cash|          2|    0.0|
|      Credit card|          2|   2.83|
|    Not specified|          2|   0.74|
+-----------------+-----------+-------+
only showing top 20 rows



q) Zonas con percentil 99 de duración/distancia fuera de rango (posible congestión/eventos).

In [5]:

df_valid = df.filter(
    (F.col("TRIP_DISTANCE") > 0) &
    (F.col("TRIP_DURATION_MIN") > 0)
)

df_ratio = df_valid.withColumn(
    "duration_per_mile", 
    F.col("TRIP_DURATION_MIN") / F.col("TRIP_DISTANCE")
)

p99_duration_per_mile = df_ratio.approxQuantile("duration_per_mile", [0.99], 0.01)[0]

print(f"Percentil 99 de duración por milla: {p99_duration_per_mile:.2f} min/milla")

outliers_congestion = df_ratio.filter(
    F.col("duration_per_mile") >= p99_duration_per_mile
)

# VALORES IMPOSIBLES
outliers_physic = df_ratio.filter(
    (F.col("TRIP_DISTANCE") > 100) |
    (F.col("TRIP_DURATION_MIN") > 300)
)

outliers_total = outliers_congestion.union(outliers_physic)

zonas_congestion = (
    outliers_total.groupBy("PU_ZONE")
        .agg(F.count("*").alias("num_outlier_trips"))
        .orderBy(F.col("num_outlier_trips").desc())
)

zonas_congestion.limit(20).show(truncate=False)



Percentil 99 de duración por milla: 125373065.00 min/milla
+----------------------------+-----------------+
|PU_ZONE                     |num_outlier_trips|
+----------------------------+-----------------+
|JFK Airport                 |63577            |
|Times Sq/Theatre District   |47126            |
|Midtown Center              |47009            |
|LaGuardia Airport           |45371            |
|Penn Station/Madison Sq West|43110            |
|East Village                |39681            |
|Upper East Side South       |39421            |
|Clinton East                |39059            |
|Midtown East                |37224            |
|Upper East Side North       |35822            |
|Union Sq                    |35313            |
|Murray Hill                 |34483            |
|Midtown North               |33156            |
|Midtown South               |31283            |
|Lincoln Square East         |31122            |
|East Chelsea                |30654            |
|Upper Wes

r) Yield por milla (`total_amount`/`trip_distance`) por borough y hora.

In [5]:
# VALORES RAZONABLES
df_filtered = df.filter((F.col("TRIP_DISTANCE") > 0) & (F.col("TRIP_DISTANCE") < 1000))

df_yield = df_filtered.withColumn("yield", F.col("TOTAL_AMOUNT") / F.col("TRIP_DISTANCE"))


yield_stats = (
    df_yield.groupBy("PU_BOROUGH", "PICKUP_HOUR")
            .agg(F.round(F.avg("yield"), 2).alias("avg_yield"))

)

yield_stats.show()

+-------------+-----------+---------+
|   PU_BOROUGH|PICKUP_HOUR|avg_yield|
+-------------+-----------+---------+
|          N/A|          7|   101.34|
|       Queens|         14|    12.22|
|    Manhattan|         15|    10.98|
|     Brooklyn|         21|     8.41|
|          N/A|         21|    227.4|
|       Queens|          2|    13.19|
|          N/A|          9|    80.88|
|      Unknown|         15|    23.69|
|     Brooklyn|         13|    14.53|
|       Queens|          4|    10.68|
|    Manhattan|         17|     9.27|
|    Manhattan|          8|    10.65|
|       Queens|          8|    11.97|
|Staten Island|         21|     53.2|
|       Queens|          6|    10.08|
|       Queens|          7|    11.33|
|          N/A|         15|   195.35|
|          N/A|         11|    92.31|
|     Brooklyn|         18|     8.56|
|    Manhattan|         19|     9.13|
+-------------+-----------+---------+
only showing top 20 rows



s) Cambios YoY en volumen y ticket promedio por service_type.

In [6]:
yoy_service = (
    df.groupBy("SERVICE_TYPE", "YEAR")
      .agg(
          F.count("*").alias("num_trips"),
          F.round(F.avg("TOTAL_AMOUNT"),2).alias("avg_ticket")
      )
      .orderBy("SERVICE_TYPE", "YEAR")
)
yoy_service.show()


+------------+----+---------+----------+
|SERVICE_TYPE|YEAR|num_trips|avg_ticket|
+------------+----+---------+----------+
|       green|2008|      364|     12.93|
|       green|2009|       65|     29.81|
|       green|2010|      348|     17.95|
|       green|2012|        3|      9.79|
|       green|2014|    30689|     15.69|
|       green|2015| 19233058|     14.84|
|       green|2016| 16374092|     14.64|
|       green|2017| 11728076|     14.24|
|       green|2018|  8894836|      16.1|
|       green|2019|  6299334|     18.33|
|       green|2020|  1730597|     20.17|
|       green|2021|  1068840|     23.93|
|       green|2022|   840466|     19.33|
|       green|2023|   787021|     23.86|
|       green|2024|   660172|     24.27|
|       green|2025|   397630|     24.85|
|       green|2030|        2|      6.15|
|       green|2035|        1|       0.0|
|       green|2041|        1|       0.0|
|       green|2062|        1|       3.6|
+------------+----+---------+----------+
only showing top

t) Días con alta congestion_surcharge: efecto en total_amount vs días “normales”

In [7]:
df_cong = df.withColumn("high_congestion", F.when(F.col("CONGESTION_SURCHARGE")>2, "high").otherwise("normal"))

congestion_effect = (
    df_cong.groupBy("PU_BOROUGH", "high_congestion")
            .agg(
                F.count("*").alias("num_trips"),
                F.round(F.avg("TOTAL_AMOUNT"),2).alias("avg_total_amount")
            )
)
congestion_effect.show()


+-------------+---------------+---------+----------------+
|   PU_BOROUGH|high_congestion|num_trips|avg_total_amount|
+-------------+---------------+---------+----------------+
|     Brooklyn|         normal| 34604064|           18.11|
|      Unknown|           high|  1253807|           25.84|
|     Brooklyn|           high|   802673|           31.04|
|Staten Island|         normal|    53221|           51.35|
|       Queens|           high| 11079699|           70.07|
|          EWR|         normal|    74410|           92.13|
|    Manhattan|           high|234042485|           19.55|
|        Bronx|           high|    33318|           44.29|
|       Queens|         normal| 61420560|           34.96|
|    Manhattan|         normal|498016059|           14.69|
|      Unknown|         normal|  8545317|           18.34|
|          N/A|         normal|   708068|           66.05|
|          N/A|           high|     8137|           55.88|
|        Bronx|         normal|  5265008|           20.3