# NYC Taxi ETL – Dataproc / PySpark

This notebook runs **only the ETL part** of the project:

1. Reads the raw CSV from Google Cloud Storage (GCS).
2. Applies cleaning and transformations using functions from `src/features/`.
3. Writes:
   - the **curated** layer in Parquet format.
   - hourly trip aggregates to a separate GCS path.

> The modules in `src/` (`gcs/paths.py`, `utils/spark_builder.py`, `features/transformations.py`, `pipeline/etl_writer.py`) must already exist before running this notebook.


## 1. Environment setup and project paths

In [1]:
import os, sys


project_root = "/home/barcenasvac/nyc-taxi-etl-pyspark"
src_path = os.path.join(project_root, "src")

if src_path not in sys.path:
    sys.path.append(src_path)

print("Project root:", project_root)
print("SRC path    :", src_path)

Project root: /home/barcenasvac/nyc-taxi-etl-pyspark
SRC path    : /home/barcenasvac/nyc-taxi-etl-pyspark/src
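
The cell above appends `src/` to `sys.path` by hand. As a minimal sketch, the same step could be wrapped in a small idempotent helper; the name `add_src_to_path` is hypothetical and not part of the project, and prepending instead of appending is a design choice assumed here so that project modules win over same-named installed packages.

```python
import sys
from pathlib import Path


def add_src_to_path(project_root: str, src_dirname: str = "src") -> str:
    """Put <project_root>/<src_dirname> on sys.path once and return it."""
    src_path = str(Path(project_root) / src_dirname)
    if src_path not in sys.path:
        # Prepend so the project's modules take priority over packages
        # with the same name that may already be installed on the cluster.
        sys.path.insert(0, src_path)
    return src_path


# Usage with the project root shown above; calling it twice adds nothing new.
src_path = add_src_to_path("/home/barcenasvac/nyc-taxi-etl-pyspark")
add_src_to_path("/home/barcenasvac/nyc-taxi-etl-pyspark")
print("SRC path:", src_path)
```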


## 2. Import utilities and paths

In [2]:
# Import pipeline functions
from pipeline.main_etl import read_raw_data
from features.transformations import clean_and_transform
from pipeline.etl_writer import write_curated, write_aggregates

from utils.spark_builder import build_spark_session
from gcs.paths import GCS_RAW_PATH, GCS_CURATED_PATH, GCS_AGG_TRIPS_BY_HOUR

print("RAW     :", GCS_RAW_PATH)
print("CURATED :", GCS_CURATED_PATH)
print("AGG HOUR:", GCS_AGG_TRIPS_BY_HOUR)

PROJECT ROOT: /home/barcenasvac/nyc-taxi-etl-pyspark
SRC PATH    : /home/barcenasvac/nyc-taxi-etl-pyspark/src
RAW     : gs://nyc-taxi-etl/raw/nyc_taxi/yellow_tripdata_2015-01.csv
CURATED : gs://nyc-taxi-etl/curated/nyc_taxi/yellow_2015_01
AGG HOUR: gs://nyc-taxi-etl/agg/nyc_taxi/trips_by_hour_2015_01
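
The contents of `gcs/paths.py` are not shown in this notebook. Judging only from the printed values, the three constants could be composed from a bucket name and a layer prefix. The sketch below is an assumption: `BUCKET`, `DATASET` and `gcs_path` are hypothetical names, and only the three resulting strings come from the output above.

```python
# Hypothetical sketch of src/gcs/paths.py. Only the three final values are
# taken from the notebook output; the intermediate names are assumed.
BUCKET = "nyc-taxi-etl"
DATASET = "nyc_taxi"


def gcs_path(layer: str, name: str) -> str:
    """Build gs://<bucket>/<layer>/<dataset>/<name>."""
    return f"gs://{BUCKET}/{layer}/{DATASET}/{name}"


GCS_RAW_PATH = gcs_path("raw", "yellow_tripdata_2015-01.csv")
GCS_CURATED_PATH = gcs_path("curated", "yellow_2015_01")
GCS_AGG_TRIPS_BY_HOUR = gcs_path("agg", "trips_by_hour_2015_01")

print("RAW     :", GCS_RAW_PATH)
print("CURATED :", GCS_CURATED_PATH)
print("AGG HOUR:", GCS_AGG_TRIPS_BY_HOUR)
```

Keeping the bucket and dataset in one place means a change of month or bucket touches a single line rather than three literals.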


## 3. Create the Spark session

In [3]:
spark = build_spark_session("NYC Taxi – ETL Notebook")
spark.version

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/19 18:51:52 INFO SparkEnv: Registering MapOutputTracker
25/11/19 18:51:52 INFO SparkEnv: Registering BlockManagerMaster
25/11/19 18:51:52 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
25/11/19 18:51:52 INFO SparkEnv: Registering OutputCommitCoordinator


'3.5.3'

## 4. Read raw data from GCS

In [4]:
df_raw = (
    spark.read
         .option("header", "true")
         .option("inferSchema", "true")
         .csv(GCS_RAW_PATH)
)

print("Muestra de datos crudos:")
df_raw.show(5, truncate=False)

print("Esquema crudo:")
df_raw.printSchema()

Muestra de datos crudos:
+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|pickup_longitude  |pickup_latitude   |RateCodeID|store_and_fwd_flag|dropoff_longitude |dropoff_latitude  |payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|2       |2015-01-15 19:05:39 |2015-01-15 19:23:42  |1              |1.59         |-73.993896484375  |40.750110
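
With `inferSchema` enabled, Spark makes an extra pass over the CSV to guess column types. One possible alternative is to pass an explicit schema as a DDL string, which `DataFrameReader.schema` accepts. In the sketch below the column names come from the sample above, but the types are assumptions based on the visible values (the `printSchema` output is truncated here), and `read_raw_with_schema` is a hypothetical helper.

```python
# Column names come from the raw sample above. The types are ASSUMED from
# the visible values, not copied from the truncated printSchema output.
RAW_COLUMNS = [
    ("VendorID", "INT"),
    ("tpep_pickup_datetime", "TIMESTAMP"),
    ("tpep_dropoff_datetime", "TIMESTAMP"),
    ("passenger_count", "INT"),
    ("trip_distance", "DOUBLE"),
    ("pickup_longitude", "DOUBLE"),
    ("pickup_latitude", "DOUBLE"),
    ("RateCodeID", "INT"),
    ("store_and_fwd_flag", "STRING"),
    ("dropoff_longitude", "DOUBLE"),
    ("dropoff_latitude", "DOUBLE"),
    ("payment_type", "INT"),
    ("fare_amount", "DOUBLE"),
    ("extra", "DOUBLE"),
    ("mta_tax", "DOUBLE"),
    ("tip_amount", "DOUBLE"),
    ("tolls_amount", "DOUBLE"),
    ("improvement_surcharge", "DOUBLE"),
    ("total_amount", "DOUBLE"),
]

# DDL string of the form "col1 TYPE, col2 TYPE, ...".
RAW_SCHEMA_DDL = ", ".join(f"{name} {dtype}" for name, dtype in RAW_COLUMNS)


def read_raw_with_schema(spark, path):
    """Read the CSV with a fixed schema instead of inferSchema."""
    return (
        spark.read
             .option("header", "true")
             .schema(RAW_SCHEMA_DDL)  # accepts a StructType or a DDL string
             .csv(path)
    )
```

In the notebook this would be called as `read_raw_with_schema(spark, GCS_RAW_PATH)`. The declared types should be checked against the full `printSchema()` output before relying on them.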

## 5. Cleaning and transformations

In [5]:
df_clean = clean_and_transform(df_raw)

print("Muestra de datos limpios:")
df_clean.show(5, truncate=False)

print("Esquema después de limpieza:")
df_clean.printSchema()

Muestra de datos limpios:


+------------+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+-----------+-----+-------+----------+------------+---------------------+------------+------------------+-----------+-----------+----------+------------------+------------+
|payment_type|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|pickup_longitude  |pickup_latitude   |RateCodeID|store_and_fwd_flag|dropoff_longitude |dropoff_latitude  |fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|trip_duration_min |pickup_date|pickup_hour|pickup_dow|avg_speed_kmh     |payment_desc|
+------------+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+-----------+-----+-------+----------+------------+
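
The cleaned frame adds columns such as `trip_duration_min`, `pickup_date`, `pickup_hour` and `avg_speed_kmh`. The body of `clean_and_transform` is not shown in this notebook, so the following is only a plain-Python sketch of how such values could be derived for the first raw row shown earlier. The formulas and the helper name `derive_trip_features` are assumptions; `pickup_dow` is left out because its numbering convention is not visible here.

```python
from datetime import datetime

KM_PER_MILE = 1.609344  # trip_distance is reported in miles


def derive_trip_features(pickup: str, dropoff: str, distance_mi: float) -> dict:
    """Derive duration, date, hour and average speed for a single trip."""
    fmt = "%Y-%m-%d %H:%M:%S"
    t_pickup = datetime.strptime(pickup, fmt)
    t_dropoff = datetime.strptime(dropoff, fmt)

    duration_min = (t_dropoff - t_pickup).total_seconds() / 60.0
    # Avoid dividing by zero for trips with no (or negative) duration.
    if duration_min > 0:
        avg_speed_kmh = distance_mi * KM_PER_MILE / (duration_min / 60.0)
    else:
        avg_speed_kmh = None

    return {
        "trip_duration_min": duration_min,
        "pickup_date": t_pickup.date().isoformat(),
        "pickup_hour": t_pickup.hour,
        "avg_speed_kmh": avg_speed_kmh,
    }


# First raw row shown in section 4: 1.59 miles, 19:05:39 to 19:23:42.
features = derive_trip_features("2015-01-15 19:05:39", "2015-01-15 19:23:42", 1.59)
print(features)
```

For this row the duration is 18 minutes and 3 seconds, so the sketch gives an average speed of roughly 8.5 km/h.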

## 6. Write the curated layer and hourly aggregates

In [6]:
from time import perf_counter

print("Registros después de limpieza:", df_clean.count())

t0 = perf_counter()
write_curated(df_clean, GCS_CURATED_PATH)
t_curated = perf_counter() - t0

t0 = perf_counter()
write_aggregates(df_clean, GCS_AGG_TRIPS_BY_HOUR, sample_fraction=0.05)
t_agg = perf_counter() - t0

print(f"Tiempo escritura curated : {t_curated:.2f} s")
print(f"Tiempo escritura agg hour: {t_agg:.2f} s")


Registros después de limpieza: 12380927
Escribiendo datos curated en: gs://nyc-taxi-etl/curated/nyc_taxi/yellow_2015_01


Generando agregados a partir de una muestra del 5.0% de los datos...
Ejemplo de trips_by_hour:


+-----------+-----------+-----------+------------------+------------------+------------------+
|pickup_date|pickup_hour|total_trips|   avg_distance_mi|  avg_total_amount|  avg_duration_min|
+-----------+-----------+-----------+------------------+------------------+------------------+
| 2015-01-01|          0|       1297|3.0157517347725524|15.306545875096369|13.610382934978155|
| 2015-01-01|          1|       1491|3.1860831656606305|16.488631790744464|14.717762128325509|
| 2015-01-01|          2|       1437| 3.244947807933195|16.128371607515646|  13.7315356065878|
| 2015-01-01|          3|       1157|3.3942005185825406|15.999490060501284|12.956107749927972|
| 2015-01-01|          4|        832|3.6012139423076923| 16.05341346153845| 12.39014423076923|
| 2015-01-01|          5|        382|  4.21065445026178|17.716020942408377|12.493760907504363|
| 2015-01-01|          6|        296| 3.974831081081082|16.512364864864868|11.865202702702703|
| 2015-01-01|          7|        242|3.92954545454

Tiempo escritura curated : 73.65 s
Tiempo escritura agg hour: 54.19 s
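
According to the log message, the aggregates are generated from a 5% sample, so `total_trips` counts sampled rows, not all trips. A rough way to read those numbers is to scale them by the inverse of the sampling fraction. The sketch below does that for the rows printed above. The helper `estimate_full_count` is hypothetical, and the results are estimates only, since a random sample at a given fraction returns approximately, not exactly, that share of rows.

```python
SAMPLE_FRACTION = 0.05  # value passed to write_aggregates in the cell above

# (pickup_hour, total_trips) for 2015-01-01, copied from the sampled output.
sampled_counts = [
    (0, 1297), (1, 1491), (2, 1437), (3, 1157),
    (4, 832), (5, 382), (6, 296), (7, 242),
]


def estimate_full_count(sample_count: int, fraction: float = SAMPLE_FRACTION) -> int:
    """Scale a sampled count up to an estimate for the full dataset."""
    return round(sample_count / fraction)


for hour, n in sampled_counts:
    print(f"hour {hour}: sampled={n}, estimated full count ~ {estimate_full_count(n)}")
```

The averages (`avg_distance_mi`, `avg_total_amount`, `avg_duration_min`) need no such scaling, although they carry sampling noise.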


## 7. Quick check of the written data

In [7]:
print("Leyendo de nuevo la capa curated desde GCS…")
curated_df = spark.read.parquet(GCS_CURATED_PATH)
curated_df.show(5, truncate=False)

print("Leyendo agregados por hora desde GCS…")
agg_df = spark.read.parquet(GCS_AGG_TRIPS_BY_HOUR)
agg_df.orderBy('pickup_date', 'pickup_hour').show(10, truncate=False)

Leyendo de nuevo la capa curated desde GCS…


+------------+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+-----------+-----+-------+----------+------------+---------------------+------------+-----------------+-----------+----------+------------------+------------+-----------+
|payment_type|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|pickup_longitude  |pickup_latitude   |RateCodeID|store_and_fwd_flag|dropoff_longitude |dropoff_latitude  |fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|trip_duration_min|pickup_hour|pickup_dow|avg_speed_kmh     |payment_desc|pickup_date|
+------------+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+-----------+-----+-------+----------+------------+--

+-----------+-----------+------------------+------------------+------------------+-----------+
|pickup_hour|total_trips|avg_distance_mi   |avg_total_amount  |avg_duration_min  |pickup_date|
+-----------+-----------+------------------+------------------+------------------+-----------+
|0          |1297       |3.0157517347725524|15.306545875096369|13.610382934978155|2015-01-01 |
|1          |1491       |3.1860831656606305|16.488631790744464|14.717762128325509|2015-01-01 |
|2          |1437       |3.244947807933195 |16.128371607515646|13.7315356065878  |2015-01-01 |
|3          |1157       |3.3942005185825406|15.999490060501284|12.956107749927972|2015-01-01 |
|4          |832        |3.6012139423076923|16.05341346153845 |12.39014423076923 |2015-01-01 |
|5          |382        |4.21065445026178  |17.716020942408377|12.493760907504363|2015-01-01 |
|6          |296        |3.974831081081082 |16.512364864864868|11.865202702702703|2015-01-01 |
|7          |242        |3.9295454545454547|16.752
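
The check above is visual. A small programmatic check could compare the row count and the expected columns after reading back. Note that in the read-back output `pickup_date` appears as the last column, which is how Spark returns partition columns, so the sketch compares column sets rather than column order. `verify_written` is a hypothetical helper that relies only on `df.columns` and `df.count()`; `FakeFrame` is a stand-in so the example runs without a cluster.

```python
def verify_written(df, expected_rows: int, required_columns: set) -> list:
    """Return a list of problems found; an empty list means the check passed."""
    problems = []
    missing = set(required_columns) - set(df.columns)
    if missing:
        problems.append(f"missing columns: {sorted(missing)}")
    actual_rows = df.count()
    if actual_rows != expected_rows:
        problems.append(f"row count {actual_rows} != expected {expected_rows}")
    return problems


class FakeFrame:
    """Stand-in for a Spark DataFrame so the sketch runs without Spark."""

    def __init__(self, columns, n_rows):
        self.columns = list(columns)
        self._n_rows = n_rows

    def count(self):
        return self._n_rows


agg_columns = {
    "pickup_date", "pickup_hour", "total_trips",
    "avg_distance_mi", "avg_total_amount", "avg_duration_min",
}
# Column order as seen after reading the aggregates back from GCS.
demo = FakeFrame(
    ["pickup_hour", "total_trips", "avg_distance_mi",
     "avg_total_amount", "avg_duration_min", "pickup_date"],
    n_rows=8,
)
print(verify_written(demo, expected_rows=8, required_columns=agg_columns))  # prints []
```

In the notebook, `verify_written(curated_df, df_clean.count(), set(df_clean.columns))` would be one way to apply it to the curated layer.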

## 8. End-to-end pipeline timing

In [8]:
import time

# Timing measurements
overall_start = time.perf_counter()

# ------------------------- #
# 1. Read RAW
# ------------------------- #
t0 = time.perf_counter()
df_raw = read_raw_data(spark)
read_time = time.perf_counter() - t0

# ------------------------- #
# 2. Clean & Transform
# ------------------------- #
# Spark transformations are lazy, so this timing likely covers only
# building the query plan; the work itself runs with the actions in step 3.
t0 = time.perf_counter()
df_clean = clean_and_transform(df_raw)
clean_time = time.perf_counter() - t0

# ------------------------- #
# 3. Write curated & agg
# ------------------------- #
t0 = time.perf_counter()
# Cache so that count() and both writes can reuse the cleaned data.
df_clean = df_clean.cache()
print("Número de registros después de limpieza:", df_clean.count())

write_curated(df_clean, GCS_CURATED_PATH)
write_aggregates(df_clean, GCS_AGG_TRIPS_BY_HOUR)
write_time = time.perf_counter() - t0

overall_time = time.perf_counter() - overall_start

print("=" * 80)
print("RESUMEN ETL")
print(f"  Lectura CSV           : {read_time:.2f} s")
print(f"  Limpieza/transform    : {clean_time:.2f} s")
print(f"  Escritura Parquet     : {write_time:.2f} s")
print(f"  Tiempo total pipeline : {overall_time:.2f} s")
print("=" * 80)


Leyendo datos crudos desde: gs://nyc-taxi-etl/raw/nyc_taxi/yellow_tripdata_2015-01.csv


Ejemplo de datos crudos:
+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|pickup_longitude  |pickup_latitude   |RateCodeID|store_and_fwd_flag|dropoff_longitude |dropoff_latitude  |payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|2       |2015-01-15 19:05:39 |2015-01-15 19:23:42  |1              |1.59         |-73.993896484375  |40.750110

Número de registros después de limpieza: 12380927
Escribiendo datos curated en: gs://nyc-taxi-etl/curated/nyc_taxi/yellow_2015_01


Generando agregados a partir de una muestra del 5.0% de los datos...
Ejemplo de trips_by_hour:


+-----------+-----------+-----------+------------------+------------------+------------------+
|pickup_date|pickup_hour|total_trips|   avg_distance_mi|  avg_total_amount|  avg_duration_min|
+-----------+-----------+-----------+------------------+------------------+------------------+
| 2015-01-01|          0|       1297|3.0157517347725524|15.306545875096369|13.610382934978155|
| 2015-01-01|          1|       1491|3.1860831656606305|16.488631790744464|14.717762128325509|
| 2015-01-01|          2|       1437| 3.244947807933195|16.128371607515646|  13.7315356065878|
| 2015-01-01|          3|       1157|3.3942005185825406|15.999490060501284|12.956107749927972|
| 2015-01-01|          4|        832|3.6012139423076923| 16.05341346153845| 12.39014423076923|
| 2015-01-01|          5|        382|  4.21065445026178|17.716020942408377|12.493760907504363|
| 2015-01-01|          6|        296| 3.974831081081082|16.512364864864868|11.865202702702703|
| 2015-01-01|          7|        242|3.92954545454

RESUMEN ETL
  Lectura CSV           : 25.94 s
  Limpieza/transform    : 0.15 s
  Escritura Parquet     : 98.70 s
  Tiempo total pipeline : 124.79 s
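
The summary repeats the same `perf_counter()` pattern for every stage. As a minimal sketch, a context manager could collect the timings in one dictionary. The name `timed` is hypothetical, and the `sleep` calls are placeholders for the real read, transform and write steps. Because Spark evaluates lazily, a block that only builds transformations will still report a very small time, as the 0.15 s cleaning step above suggests.

```python
from contextlib import contextmanager
from time import perf_counter, sleep


@contextmanager
def timed(label: str, timings: dict):
    """Record the wall-clock duration of the enclosed block under `label`."""
    start = perf_counter()
    try:
        yield
    finally:
        # Stored even if the block raises, so partial runs are still reported.
        timings[label] = perf_counter() - start


timings = {}
with timed("total", timings):
    with timed("read", timings):
        sleep(0.01)  # placeholder for read_raw_data(spark)
    with timed("write", timings):
        sleep(0.01)  # placeholder for write_curated / write_aggregates

for label, seconds in timings.items():
    print(f"  {label:<6}: {seconds:.2f} s")
```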
