# NYC Yellow Cab – PySpark Analysis Notebook

Este cuaderno descarga datos públicos de Yellow Taxi del TLC de NYC (formato Parquet), los carga con **PySpark** y realiza un análisis exploratorio básico.

**Qué incluye:**
- Instalación (opcional) de PySpark en caso de que el entorno no lo tenga.
- Descarga parametrizable de archivos mensuales (`yellow_tripdata_YYYY-MM.parquet`) desde el CDN oficial del TLC.
- Carga a `Spark DataFrame`, revisión de esquema y limpieza básica.
- Métricas rápidas (viajes, distancia, importe), histogramas y patrones por hora/día.
- *Join* con el catálogo de zonas de taxi para nombres de pickups/dropoffs.
- Ejemplos de consultas prácticas.
  
> **Nota:** Las celdas que descargan datos usan `requests`. Si tu entorno no tiene acceso a internet, primero descarga los archivos y ajústales la ruta local.


In [1]:
# (Opcional) Instala PySpark si no está disponible.
# Si ya tienes PySpark, puedes saltarte esta celda.
try:
    import pyspark  # noqa: F401
    print("PySpark ya está disponible.")
except ModuleNotFoundError:
    %pip install -q pyspark==3.5.1 findspark==2.0.1
    import importlib, sys
    importlib.invalidate_caches()
    print("PySpark instalado.")

Note: you may need to restart the kernel to use updated packages.
PySpark instalado.


In [2]:
# Importaciones básicas y sesión de Spark
import os, sys, math, textwrap, json, gzip, io, time, pathlib
from datetime import datetime
from typing import List

import findspark
findspark.init()

from pyspark.sql import SparkSession, functions as F, types as T, Window

spark = (SparkSession.builder
         .appName("NYC Yellow Cab EDA")
         .config("spark.sql.shuffle.partitions", "200")
         .getOrCreate())

spark

25/10/14 17:45:15 WARN Utils: Your hostname, dalia resolves to a loopback address: 127.0.1.1; using 192.168.1.83 instead (on interface wlp2s0)
25/10/14 17:45:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/14 17:45:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Parámetros

In [3]:
# Puedes ajustar el año/mes aquí. Por defecto tomamos varios meses recientes.
# El TLC publica archivos en: https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_YYYY-MM.parquet
BASE_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data"
DATA_DIR = "./data/nyc_taxi"  # carpeta local donde guardaremos los archivos
ZONE_LOOKUP_URL = "https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv"

# Elige meses a bajar (YYYY-MM). Puedes poner solo uno, p.ej. ["2025-06"]
MONTHS = ["2025-06", "2025-05", "2025-04"]

os.makedirs(DATA_DIR, exist_ok=True)
print("DATA_DIR:", os.path.abspath(DATA_DIR))
MONTHS

DATA_DIR: /home/dalia/Downloads/data/nyc_taxi


['2025-06', '2025-05', '2025-04']

## Descarga de archivos (Parquet)

In [None]:
import requests

def download_if_not_exists(url: str, dest_path: str, chunk_size: int = 2**20):
    p = pathlib.Path(dest_path)
    if p.exists() and p.stat().st_size > 0:
        print(f"Ya existe: {p.name} ({p.stat().st_size/1e6:.1f} MB)")
        return dest_path
    print(f"Descargando: {url}")
    with requests.get(url, stream=True, timeout=60) as r:
        r.raise_for_status()
        with open(dest_path, "wb") as f:
            for chunk in r.iter_content(chunk_size=chunk_size):
                if chunk:
                    f.write(chunk)
    print(f"Guardado en: {dest_path}")
    return dest_path

# Descarga de los meses seleccionados
local_parquets = []
for ym in MONTHS:
    url = f"{BASE_URL}/yellow_tripdata_{ym}.parquet"
    local_path = os.path.join(DATA_DIR, f"yellow_tripdata_{ym}.parquet")
    try:
        download_if_not_exists(url, local_path)
        local_parquets.append(local_path)
    except Exception as e:
        print(f"Error descargando {ym}: {e}")

print("Archivos locales:", local_parquets)

Descargando: https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-06.parquet


## Cargar datos a Spark

In [None]:
# Cargamos todos los Parquet disponibles
if not local_parquets:
    raise FileNotFoundError("No hay archivos Parquet descargados. Ajusta MONTHS o descarga manualmente.")

df = spark.read.parquet(*local_parquets)

# Normalizamos nombres de columnas a lower_snake_case por conveniencia
for c in df.columns:
    df = df.withColumnRenamed(c, c.lower())

print("Número de filas:", df.count())
df.printSchema()

## Limpieza y filtros básicos

In [None]:
# Filtramos registros con valores imposibles o fuera de rango.
# Puedes ajustar umbrales según tus necesidades.
df_clean = (
    df
    .filter(F.col("passenger_count").isNotNull())
    .filter((F.col("passenger_count") >= 0) & (F.col("passenger_count") <= 6))
    .filter(F.col("trip_distance").isNotNull() & (F.col("trip_distance") >= 0) & (F.col("trip_distance") <= 100))
    .filter(F.col("fare_amount").isNotNull() & (F.col("fare_amount") >= 0) & (F.col("fare_amount") <= 1000))
    .filter(F.col("total_amount").isNotNull() & (F.col("total_amount") >= -50) & (F.col("total_amount") <= 1500))
    .filter(F.col("tpep_pickup_datetime").isNotNull() & F.col("tpep_dropoff_datetime").isNotNull())
    .withColumn("trip_minutes", (F.col("tpep_dropoff_datetime").cast("timestamp").cast("long") - F.col("tpep_pickup_datetime").cast("timestamp").cast("long"))/60.0)
    .filter((F.col("trip_minutes") > 0) & (F.col("trip_minutes") <= 360))  # hasta 6 horas
    .withColumn("pickup_date", F.to_date("tpep_pickup_datetime"))
    .withColumn("pickup_hour", F.hour("tpep_pickup_datetime"))
    .withColumn("pickup_dow", F.date_format("pickup_date", "E"))
)

df_clean.cache()
print("Filas después de limpieza:", df_clean.count())
df_clean.limit(5).toPandas()

## KPIs rápidos

In [None]:
summary = df_clean.agg(
    F.count("*").alias("trips"),
    F.mean("trip_distance").alias("avg_distance_mi"),
    F.expr("percentile_approx(trip_distance, 0.5)").alias("median_distance_mi"),
    F.mean("total_amount").alias("avg_total_amount"),
    F.expr("percentile_approx(total_amount, 0.5)").alias("median_total_amount"),
    F.mean("trip_minutes").alias("avg_trip_minutes")
).toPandas()

summary

## Histograma de `trip_distance`

In [None]:
# Para graficar, convertimos una muestra a Pandas
import matplotlib.pyplot as plt

sample = df_clean.select("trip_distance").sample(False, 0.02, seed=42).toPandas()
plt.figure()
plt.hist(sample["trip_distance"].dropna(), bins=50)
plt.title("Distribución de distancia del viaje (millas)")
plt.xlabel("trip_distance (mi)")
plt.ylabel("frecuencia")
plt.show()

## Viajes por hora del día

In [None]:
by_hour = (df_clean.groupBy("pickup_hour")
           .agg(F.count("*").alias("trips"))
           .orderBy("pickup_hour"))

pdf = by_hour.toPandas()
display(pdf)

plt.figure()
plt.plot(pdf["pickup_hour"], pdf["trips"], marker="o")
plt.title("Viajes por hora de recogida")
plt.xlabel("Hora (0-23)")
plt.ylabel("Viajes")
plt.xticks(range(0,24,1))
plt.show()

## Distribución por `payment_type`

In [None]:
payment_map = {0: "Unknown", 1: "Credit card", 2: "Cash", 3: "No charge", 4: "Dispute", 5: "Unknown", 6: "Voided"}
df_pay = (df_clean.groupBy("payment_type").agg(F.count("*").alias("trips"))).withColumn(
    "payment_desc",
    F.when(F.col("payment_type").isNull(), F.lit("Null"))
     .otherwise(F.create_map([F.lit(x) for x in sum(payment_map.items(), ())])[F.col("payment_type")])
)

pdf = df_pay.orderBy(F.desc("trips")).toPandas()
display(pdf)

## Join con catálogo de zonas

In [None]:
# Descargamos el lookup de zonas de taxi (CSV) y lo cargamos con Spark
zone_csv_local = os.path.join(DATA_DIR, "taxi_zone_lookup.csv")
try:
    download_if_not_exists(ZONE_LOOKUP_URL, zone_csv_local)
except Exception as e:
    print("No se pudo descargar el catálogo de zonas. Puedes bajarlo manualmente:", e)

zones = (spark.read
         .option("header", True)
         .csv(zone_csv_local))

zones = zones.select(
    F.col("LocationID").cast("int").alias("LocationID"),
    F.col("Borough").alias("borough"),
    F.col("Zone").alias("zone"),
    F.col("service_zone").alias("service_zone"),
)

dfz = (df_clean
       .join(zones.withColumnRenamed("LocationID", "PULocationID"), on="PULocationID", how="left")
       .withColumnRenamed("borough", "pu_borough")
       .withColumnRenamed("zone", "pu_zone")
       .withColumnRenamed("service_zone", "pu_service_zone")
       .join(zones.withColumnRenamed("LocationID", "DOLocationID"), on="DOLocationID", how="left")
       .withColumnRenamed("borough", "do_borough")
       .withColumnRenamed("zone", "do_zone")
       .withColumnRenamed("service_zone", "do_service_zone")
)

dfz.select("pu_borough","pu_zone","do_borough","do_zone","trip_distance","total_amount").limit(5).toPandas()

## Zonas con más pickups

In [None]:
top_pu = (dfz.groupBy("pu_borough","pu_zone")
           .agg(F.count("*").alias("trips"))
           .orderBy(F.desc("trips"))
           .limit(20))

top_pu.toPandas()

## Velocidad promedio por zona (millas/hora)

In [None]:
# Estimación simple de velocidad = distancia (mi) / tiempo (h), filtrando outliers
df_speed = (dfz
            .filter((F.col("trip_distance") > 0) & (F.col("trip_minutes") > 0))
            .withColumn("mph", F.col("trip_distance") / (F.col("trip_minutes")/60.0))
            .filter((F.col("mph") > 1) & (F.col("mph") < 60)))

avg_speed = (df_speed.groupBy("pu_borough","pu_zone")
             .agg(F.mean("mph").alias("avg_mph"),
                  F.count("*").alias("trips"))
             .filter(F.col("trips") >= 100)   # mínimo de observaciones
             .orderBy(F.desc("avg_mph")))

avg_speed.limit(20).toPandas()

## Detección simple de anomalías en tarifas

In [None]:
# Regla simple: outliers por total_amount usando IQR
q = df_clean.approxQuantile("total_amount", [0.25, 0.75], 0.01)
q1, q3 = q
iqr = q3 - q1
low, high = (q1 - 1.5*iqr), (q3 + 1.5*iqr)

anoms = (df_clean
         .filter((F.col("total_amount") < low) | (F.col("total_amount") > high))
         .select("tpep_pickup_datetime", "tpep_dropoff_datetime", "passenger_count",
                 "trip_distance", "fare_amount", "total_amount", "PULocationID", "DOLocationID")
         .orderBy(F.desc("total_amount"))
         .limit(50))

anoms.toPandas()

## Guardar muestra limpia a Parquet (opcional)

In [None]:
# Guarda un subconjunto para compartir o pruebas rápidas
output_dir = os.path.join(DATA_DIR, "clean_sample_parquet")
(df_clean
 .limit(200000)  # ajusta el tamaño según tu equipo
 .coalesce(1)
 .write
 .mode("overwrite")
 .parquet(output_dir))

print("Guardado en:", os.path.abspath(output_dir))

---

### Siguientes pasos
- Agregar mapas (unir con shapefiles/GeoJSON y usar `geopandas` fuera de Spark).
- Construir *dashboards* (p. ej. con **Streamlit**) o pipelines (con **Airflow**).
- Entrenar modelos (tiempo estimado, predicción de propina) usando **Spark MLlib**.

¿Necesitas una versión con **Delta Lake** o **AWS S3/EMR/Spark on K8s**? Puedo preparar otra variante.
