In [1]:
import os, subprocess
import pyspark
print("PySpark version:", pyspark.__version__)

PySpark version: 4.0.1


In [2]:
from pyspark.sql import SparkSession

java_home = subprocess.check_output(["/usr/libexec/java_home", "-v", "17"]).decode().strip()
os.environ["JAVA_HOME"] = java_home
os.environ["PATH"] = java_home + "/bin:" + os.environ["PATH"]

spark = (
    SparkSession.builder
    .appName("Lakehouse-NYC-Taxi")
    .config("spark.sql.shuffle.partitions", "8")
    .config("spark.sql.ansi.enabled", "false")
    .config("spark.sql.execution.arrow.pyspark.enabled", "false")
    .getOrCreate()
)

print("ANSI mode:", spark.conf.get("spark.sql.ansi.enabled"))
spark

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/01 22:16:18 WARN Utils: Your hostname, Amaias-MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 192.168.1.34 instead (on interface en0)
25/12/01 22:16:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/01 22:16:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/12/01 22:16:19 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


ANSI mode: false


In [3]:
BASE = "/Users/amaiamartingrande/Desktop/WORKSPACE/arquitectura de datos/trabajo_final"

raw_path    = f"{BASE}/data/raw/yellow_tripdata_2025-01.parquet"
bronze_path = f"{BASE}/data/bronze/yellow_tripdata_2025-01"
silver_path = f"{BASE}/data/silver/yellow_tripdata_2025-01"
gold_base   = f"{BASE}/data/gold"

raw_path

'/Users/amaiamartingrande/Desktop/WORKSPACE/arquitectura de datos/trabajo_final/data/raw/yellow_tripdata_2025-01.parquet'

In [4]:
df_raw = spark.read.parquet(raw_path)

df_raw.printSchema()
print("Filas RAW:", df_raw.count())
df_raw.show(5)

                                                                                

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- cbd_congestion_fee: double (nullable = true)

Filas RAW: 3475226
+--------+--------------------+---------------------+---

In [5]:
(df_raw.write
    .mode("overwrite")
    .parquet(bronze_path)
)

print("BRONZE guardado en:", bronze_path)



BRONZE guardado en: /Users/amaiamartingrande/Desktop/WORKSPACE/arquitectura de datos/trabajo_final/data/bronze/yellow_tripdata_2025-01


                                                                                

In [6]:
df_bronze = spark.read.parquet(bronze_path)
print("Filas BRONZE:", df_bronze.count())
df_bronze.printSchema()

Filas BRONZE: 3475226
root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- cbd_congestion_fee: double (nullable = true)



empezar a hacer procesamiento de datos
nulos, outlierts, etc

In [7]:
from pyspark.sql.functions import col, when

df_silver = df_bronze

# 1) Quitar nulos en timestamps
df_silver = df_silver.filter(
    col("tpep_pickup_datetime").isNotNull() &
    col("tpep_dropoff_datetime").isNotNull()
)

# 2) Quitar viajes con dropoff anterior al pickup
df_silver = df_silver.filter(
    col("tpep_dropoff_datetime") >= col("tpep_pickup_datetime")
)

# 3) Trip distance válida
df_silver = df_silver.filter(col("trip_distance") > 0)

# 4) Importes válidos
df_silver = df_silver.filter(col("fare_amount") >= 0)
df_silver = df_silver.filter(col("total_amount") >= 0)

# 5) passenger_count nulo → 1
df_silver = df_silver.withColumn(
    "passenger_count",
    when(col("passenger_count").isNull(), 1).otherwise(col("passenger_count"))
)

# 6) Eliminar duplicados exactos
df_silver = df_silver.dropDuplicates()

print("Filas SILVER:", df_silver.count())
df_silver.show(5)

                                                                                

Filas SILVER: 3254338




+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       2| 2025-01-12 18:19:28|  2025-01-12 18:27:12|              3|         1.14|         1|                 N|         144|    

                                                                                

OPTIMIZACION + UDF

In [8]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# 7) UDF para clasificar los viajes por distancia
def distance_bucket(d):
    if d is None:
        return "unknown"
    if d < 1:
        return "short"
    elif d < 5:
        return "medium"
    else:
        return "long"

bucket_udf = udf(distance_bucket, StringType())

df_silver = df_silver.withColumn(
    "distance_type",
    bucket_udf(col("trip_distance"))
)

# 8) Optimización: repartition + cache
df_silver = df_silver.repartition(8).cache()
print("Filas SILVER (cacheadas):", df_silver.count())

df_silver.select("trip_distance", "distance_type").show(5)

[Stage 25:>                                                         (0 + 8) / 8]

Filas SILVER (cacheadas): 3254338
+-------------+-------------+
|trip_distance|distance_type|
+-------------+-------------+
|          2.8|       medium|
|          1.0|       medium|
|         1.52|       medium|
|         0.79|        short|
|          1.7|       medium|
+-------------+-------------+
only showing top 5 rows


                                                                                

In [9]:
from pyspark.sql.functions import col

# Dataset para MLlib: predecir si la propina es alta (>20% de la tarifa)
df_ml = (df_silver
    .filter(col("fare_amount") > 0)  # evitar divisiones raras
    .withColumn(
        "high_tip",
        (col("tip_amount") > col("fare_amount") * 0.2).cast("int")
    )
    .select("trip_distance", "fare_amount", "passenger_count", "high_tip")
)

df_ml.show(5)

+-------------+-----------+---------------+--------+
|trip_distance|fare_amount|passenger_count|high_tip|
+-------------+-----------+---------------+--------+
|          2.8|       17.0|              1|       0|
|          1.0|       8.68|              1|       0|
|         1.52|        8.6|              1|       1|
|         0.79|        9.3|              1|       1|
|          1.7|       10.0|              1|       1|
+-------------+-----------+---------------+--------+
only showing top 5 rows


In [10]:
(df_silver.write
    .mode("overwrite")
    .parquet(silver_path)
)

print("SILVER guardado en:", silver_path)

25/11/29 12:27:26 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
[Stage 41:>                                                         (0 + 8) / 8]

SILVER guardado en: /Users/amaiamartingrande/Desktop/WORKSPACE/arquitectura de datos/trabajo_final/data/silver/yellow_tripdata_2025-01


                                                                                

In [11]:
# LOOKUP DE ZONAS NYC (PULocationID → Borough, Zone)
lookup_path = f"{BASE}/data/raw/taxi_zone_lookup.csv"

df_zones = (spark.read
    .option("header", True)
    .csv(lookup_path)
)

print("ZONES (lookup) ejemplo:")
df_zones.show(5)
df_zones.printSchema()

from pyspark.sql.functions import count, avg, desc

# JOIN SILVER + ZONAS
df_silver_zones = (
    df_silver.join(
        df_zones,
        df_silver.PULocationID == df_zones.LocationID,
        "left"
    )
    .drop("LocationID")  # para no duplicar la columna
)

print("Silver enriquecido con Borough/Zone:")
df_silver_zones.select("PULocationID", "Borough", "Zone").show(10)
print("Filas Silver + Zones:", df_silver_zones.count())

ZONES (lookup) ejemplo:
+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows
root
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)

Silver enriquecido con Borough/Zone:
+------------+---------+--------------------+
|PULocationID|  Borough|                Zone|
+------------+---------+--------------------+
|         229|Manhattan|Sutton Place/Turt...|
|         198| 

CELDA KPI 1: Viajes por hora

In [12]:
from pyspark.sql.functions import hour, count, avg, desc

kpi_hour = (df_silver
    .withColumn("pickup_hour", hour("tpep_pickup_datetime"))
    .groupBy("pickup_hour")
    .agg(count("*").alias("n_trips"))
    .orderBy("pickup_hour")
)

kpi_hour.show(24)

+-----------+-------+
|pickup_hour|n_trips|
+-----------+-------+
|          0|  86562|
|          1|  59525|
|          2|  40228|
|          3|  25574|
|          4|  17936|
|          5|  20529|
|          6|  46622|
|          7|  96808|
|          8| 133871|
|          9| 137280|
|         10| 143062|
|         11| 154420|
|         12| 168520|
|         13| 178559|
|         14| 193971|
|         15| 204555|
|         16| 203152|
|         17| 229403|
|         18| 236628|
|         19| 200682|
|         20| 184642|
|         21| 194224|
|         22| 170529|
|         23| 127056|
+-----------+-------+



                                                                                

CELDA KPI 2: Distancia y precio medio por hora

In [13]:
kpi_hour_stats = (df_silver
    .withColumn("pickup_hour", hour("tpep_pickup_datetime"))
    .groupBy("pickup_hour")
    .agg(
        count("*").alias("n_trips"),
        avg("trip_distance").alias("avg_distance"),
        avg("total_amount").alias("avg_total_amount")
    )
    .orderBy("pickup_hour")
)

kpi_hour_stats.show(24, truncate=False)

+-----------+-------+------------------+------------------+
|pickup_hour|n_trips|avg_distance      |avg_total_amount  |
+-----------+-------+------------------+------------------+
|0          |86562  |4.669431852313954 |27.278998983387652|
|1          |59525  |4.714422679546404 |25.150271986560252|
|2          |40228  |3.185260515064132 |24.077484836432333|
|3          |25574  |3.426994603894579 |24.991685305388277|
|4          |17936  |5.097031668153434 |30.727854036574517|
|5          |20529  |38.20205027034929 |34.759384772760534|
|6          |46622  |22.720364634721818|29.081421002960035|
|7          |96808  |7.653165337575419 |26.70427340715646 |
|8          |133871 |6.680931792546568 |25.249211479708023|
|9          |137280 |8.065031031468559 |25.36064073426577 |
|10         |143062 |5.094711313975777 |25.743043855111786|
|11         |154420 |2.8714472866209073|25.461045784224854|
|12         |168520 |2.970854023261337 |30.945607880370304|
|13         |178559 |3.536811866105881 |

CELDA KPI 3: Top zonas de pickup

In [14]:
kpi_pickup_zone = (df_silver
    .groupBy("PULocationID")
    .agg(count("*").alias("n_trips"))
    .orderBy(desc("n_trips"))
)

kpi_pickup_zone.show(10)

+------------+-------+
|PULocationID|n_trips|
+------------+-------+
|         161| 161432|
|         237| 158115|
|         236| 149858|
|         132| 134868|
|         230| 118291|
|         186| 114154|
|         162| 112650|
|         142| 106067|
|         239|  91921|
|         163|  91581|
+------------+-------+
only showing top 10 rows


CELDA KPI 4: Propina media por pasajeros

In [15]:
kpi_tip_passengers = (df_silver
    .groupBy("passenger_count")
    .agg(
        count("*").alias("n_trips"),
        avg("tip_amount").alias("avg_tip")
    )
    .orderBy("passenger_count")
)

kpi_tip_passengers.show(10)

+---------------+-------+------------------+
|passenger_count|n_trips|           avg_tip|
+---------------+-------+------------------+
|              0|  23654| 3.023835292128183|
|              1|2664538| 2.988434343965048|
|              2| 393964|3.8288335990090294|
|              3|  87833|3.6062190748351886|
|              4|  54794| 3.601218199072892|
|              5|  17625| 3.407278297872341|
|              6|  11920| 3.408447986577181|
|              7|      1|             16.75|
|              8|      6|11.198333333333332|
|              9|      3| 7.333333333333333|
+---------------+-------+------------------+



In [16]:
from pyspark.sql.functions import count, avg, desc

kpi_borough = (df_silver_zones
    .groupBy("Borough")
    .agg(count("*").alias("n_trips"))
    .orderBy(desc("n_trips"))
)

print("KPI: número de viajes por Borough")
kpi_borough.show()

KPI: número de viajes por Borough
+-------------+-------+
|      Borough|n_trips|
+-------------+-------+
|    Manhattan|2908190|
|       Queens| 269387|
|     Brooklyn|  55473|
|        Bronx|  12775|
|      Unknown|   7670|
|          N/A|    563|
|Staten Island|    203|
|          EWR|     77|
+-------------+-------+



In [17]:
kpi_borough_fare = (df_silver_zones
    .groupBy("Borough")
    .agg(
        avg("fare_amount").alias("avg_fare"),
        avg("tip_amount").alias("avg_tip"),
        avg("total_amount").alias("avg_total")
    )
    .orderBy(desc("avg_total"))
)

print("KPI: ticket medio por Borough")
kpi_borough_fare.show(truncate=False)

KPI: ticket medio por Borough


[Stage 95:>                                                         (0 + 8) / 8]

+-------------+------------------+-------------------+------------------+
|Borough      |avg_fare          |avg_tip            |avg_total         |
+-------------+------------------+-------------------+------------------+
|EWR          |72.09636363636363 |11.251948051948052 |89.39350649350648 |
|N/A          |68.24863232682061 |6.883801065719362  |80.33909413854352 |
|Queens       |54.127900789570596|8.13576494040174   |72.76658517299802 |
|Staten Island|27.33620689655172 |0.47467980295566503|39.89467980295566 |
|Bronx        |30.149356555772997|0.16815342465753425|33.53580039138951 |
|Brooklyn     |26.367242081733423|0.7064995583436984 |29.721310006669974|
|Unknown      |18.851577574967397|3.6471773142112114 |28.158147327249026|
|Manhattan    |14.671068778174082|2.7136404670947036 |22.75478285462694 |
+-------------+------------------+-------------------+------------------+



                                                                                

In [18]:
kpi_hour.write.mode("overwrite").parquet(f"{gold_base}/kpi_hour")
kpi_hour_stats.write.mode("overwrite").parquet(f"{gold_base}/kpi_hour_stats")
kpi_pickup_zone.write.mode("overwrite").parquet(f"{gold_base}/kpi_pickup_zone")
kpi_tip_passengers.write.mode("overwrite").parquet(f"{gold_base}/kpi_tip_passengers")
kpi_borough.write.mode("overwrite").parquet(f"{gold_base}/kpi_borough")
kpi_borough_fare.write.mode("overwrite").parquet(f"{gold_base}/kpi_borough_fare")

print("GOLD guardado en:", gold_base)

GOLD guardado en: /Users/amaiamartingrande/Desktop/WORKSPACE/arquitectura de datos/trabajo_final/data/gold


In [19]:
df_ml.show(5)

+-------------+-----------+---------------+--------+
|trip_distance|fare_amount|passenger_count|high_tip|
+-------------+-----------+---------------+--------+
|          2.8|       17.0|              1|       0|
|          1.0|       8.68|              1|       0|
|         1.52|        8.6|              1|       1|
|         0.79|        9.3|              1|       1|
|          1.7|       10.0|              1|       1|
+-------------+-----------+---------------+--------+
only showing top 5 rows


MLIB MODELO DE PROPINA ALTA

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# 1) Ensamblar features
assembler = VectorAssembler(
    inputCols=["trip_distance", "fare_amount", "passenger_count"],
    outputCol="features"
)

df_features = assembler.transform(df_ml).select("features", "high_tip")

# 2) Train / Test split
train, test = df_features.randomSplit([0.8, 0.2], seed=42)

# 3) Modelo de regresión logística -- para clasificacion binaria y asi poder calcular las tips 
lr = LogisticRegression(featuresCol="features", labelCol="high_tip")
model_lr = lr.fit(train)

# 4) Predicciones
pred = model_lr.transform(test)

# 5) Evaluación (AUC ROC)
evaluator = BinaryClassificationEvaluator(
    labelCol="high_tip",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

auc = evaluator.evaluate(pred)
print("AUC ROC:", auc)

# 6) Ver algunas filas
pred.select("features", "high_tip", "probability").show(10, truncate=False)

25/11/29 12:27:40 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

AUC ROC: 0.5785788578627196
+--------------+--------+----------------------------------------+
|features      |high_tip|probability                             |
+--------------+--------+----------------------------------------+
|[0.01,3.0,1.0]|0       |[0.39470174358240334,0.6052982564175966]|
|[0.01,3.0,1.0]|0       |[0.39470174358240334,0.6052982564175966]|
|[0.01,3.0,1.0]|0       |[0.39470174358240334,0.6052982564175966]|
|[0.01,3.0,1.0]|0       |[0.39470174358240334,0.6052982564175966]|
|[0.01,3.0,1.0]|0       |[0.39470174358240334,0.6052982564175966]|
|[0.01,3.0,1.0]|0       |[0.39470174358240334,0.6052982564175966]|
|[0.01,3.0,1.0]|0       |[0.39470174358240334,0.6052982564175966]|
|[0.01,3.0,1.0]|0       |[0.39470174358240334,0.6052982564175966]|
|[0.01,3.0,1.0]|0       |[0.39470174358240334,0.6052982564175966]|
|[0.01,3.0,1.0]|0       |[0.39470174358240334,0.6052982564175966]|
+--------------+--------+----------------------------------------+
only showing top 10 rows


                                                                                

25/11/29 12:35:50 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 231921 ms exceeds timeout 120000 ms
25/11/29 12:35:50 WARN SparkContext: Killing executors is not supported by current scheduler.
25/11/29 12:35:51 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:53)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:342)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:132)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$