# Tarea 1 — PySpark (Big Data)

## 1) Entorno de ejecución
Se ejecuta Apache Spark mediante **PySpark** en un entorno local (SparkSession) o en un servidor en línea (p. ej., Google Colab).

---

## 2) Conjunto de datos elegido

**Dataset:** NYC TLC **Yellow Taxi Trip Records** (enero, febrero y marzo 2025), en formato **Parquet**.  
Fuente oficial: https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page

**Descripción:** Registros de viajes de taxi amarillo con variables como:
- Fecha/hora de pickup y dropoff
- Zonas de pickup/dropoff
- Distancia del viaje
- Pasajeros
- Montos y desglose de cobro (fare, tip, tolls, total, etc.)

**Justificación de elección:**
- Es un dataset real y de gran volumen, ideal para practicar Spark.
- Está en **Parquet**, formato eficiente y estándar en entornos Big Data.
- Permite aplicar filtrado, estadística descriptiva y operaciones aritméticas entre columnas.

---

## 3) Objetivos con PySpark
1. Cargar los archivos Parquet (sin subirlos al repositorio).
2. Filtrar registros relevantes.
3. Generar estadísticas descriptivas básicas.
4. Realizar operaciones aritméticas entre columnas (ej. porcentajes, totales, comparaciones).


In [6]:
import os, sys
from pyspark.sql import SparkSession

os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("PYSPARK_SEGURO")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.driver.extraJavaOptions", "-Djava.net.preferIPv4Stack=true")
    .config("spark.executor.extraJavaOptions", "-Djava.net.preferIPv4Stack=true")
    # para ver el error real si vuelve a crashear
    .config("spark.python.worker.faulthandler.enabled", "true")
    .config("spark.sql.execution.pyspark.udf.faulthandler.enabled", "true")
    # a veces ayuda en Windows
    .config("spark.python.worker.reuse", "false")
    .getOrCreate()
)

print("Spark listo ✅")
print("Python:", sys.executable)


Spark listo ✅
Python: C:\Users\Oscar Ferreira\venv_pyspark\Scripts\python.exe


## 4) Carga del dataset (Yellow Taxi 2025 - Q1)

Se cargan los archivos Parquet correspondientes a enero, febrero y marzo 2025
utilizando PySpark. No es necesario subir los archivos al repositorio.



In [12]:
# ===============================
# 4) CARGA DE LOS ARCHIVOS PARQUET
# ===============================

ruta_base = r"C:\Users\Oscar Ferreira\OneDrive - AUTO LINEAS AMERICA SA DE CV\Escritorio\MCD\6 - DATOS MASIVOS"

df = spark.read.parquet(
    f"{ruta_base}/yellow_tripdata_2025-01.parquet",
    f"{ruta_base}/yellow_tripdata_2025-02.parquet",
    f"{ruta_base}/yellow_tripdata_2025-03.parquet"
)

print("DataFrame cargado correctamente ✅")



DataFrame cargado correctamente ✅


## 5) Exploración inicial del dataset

A continuación se realiza una exploración preliminar del conjunto de datos
para comprender su estructura, volumen y comportamiento general.


In [29]:
from pyspark.sql import functions as F
import pandas as pd

total_df = df.select(F.count("*").alias("Total de Registros"))

# Convertir a Pandas
total_pd = total_df.toPandas()

# Aplicar formato con separador de miles
total_pd["Total de Registros"] = total_pd["Total de Registros"].map("{:,}".format)

display(total_pd)

estructura = spark.createDataFrame(
    [(col, str(dtype)) for col, dtype in df.dtypes],
    ["Columna", "Tipo de Dato"]
)

display(estructura.toPandas())

display(df.limit(5).toPandas())



# Obtener estadísticas descriptivas
desc_pd = df.describe().toPandas()

# Identificar columnas numéricas automáticamente
cols_numericas = desc_pd.columns.drop("summary")

# Convertir a numérico y redondear
desc_pd[cols_numericas] = (
    desc_pd[cols_numericas]
    .apply(pd.to_numeric, errors="coerce").round(4)
)



# Mostrar tabla formateada
display(desc_pd.style.format("{:,.2f}", subset=desc_pd.columns[1:])
)



Unnamed: 0,Total de Registros
0,11198026


Unnamed: 0,Columna,Tipo de Dato
0,VendorID,int
1,tpep_pickup_datetime,timestamp_ntz
2,tpep_dropoff_datetime,timestamp_ntz
3,passenger_count,bigint
4,trip_distance,double
5,RatecodeID,bigint
6,store_and_fwd_flag,string
7,PULocationID,int
8,DOLocationID,int
9,payment_type,bigint


Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,cbd_congestion_fee
0,1,2025-03-01 00:17:16,2025-03-01 00:25:52,1,0.9,1,N,140,236,1,7.9,3.5,0.5,2.6,0.0,1.0,15.5,2.5,0.0,0.0
1,1,2025-03-01 00:37:38,2025-03-01 00:43:51,1,0.6,1,N,140,262,1,6.5,3.5,0.5,2.3,0.0,1.0,13.8,2.5,0.0,0.0
2,2,2025-03-01 00:24:35,2025-03-01 00:39:49,1,1.94,1,N,161,68,1,14.9,1.0,0.5,5.16,0.0,1.0,25.81,2.5,0.0,0.75
3,2,2025-03-01 00:56:16,2025-03-01 01:01:35,2,0.95,1,N,231,13,1,7.2,1.0,0.5,2.59,0.0,1.0,15.54,2.5,0.0,0.75
4,1,2025-03-01 00:01:44,2025-03-01 00:10:00,1,1.5,1,N,163,236,1,8.6,4.25,0.5,2.85,0.0,1.0,17.2,2.5,0.0,0.75


Unnamed: 0,summary,VendorID,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,cbd_congestion_fee
0,count,11198026.0,8934277.0,11198026.0,8934277.0,8934277.0,11198026.0,11198026.0,11198026.0,11198026.0,11198026.0,11198026.0,11198026.0,11198026.0,11198026.0,11198026.0,8934277.0,8934277.0,11198026.0
1,mean,1.8,1.29,6.18,2.45,,163.31,162.53,0.98,17.24,1.26,0.48,2.85,0.44,0.96,25.67,2.23,0.13,0.52
2,stddev,0.48,0.74,581.82,11.51,,65.41,69.83,0.72,261.99,1.85,0.13,3.77,2.01,0.27,262.31,0.91,0.48,0.36
3,min,1.0,0.0,0.0,1.0,,1.0,1.0,0.0,-1807.6,-9.25,-0.5,-220.0,-142.17,-1.0,-1832.85,-2.5,-1.75,-0.75
4,max,7.0,9.0,320136.29,99.0,,265.0,265.0,5.0,863372.12,22.55,10.5,440.0,916.87,1.0,863380.37,2.5,6.75,1.5


## 6) Filtrado de datos

En esta sección se aplican filtros al conjunto de datos con el objetivo
de trabajar únicamente con registros válidos y consistentes.

Se consideran viajes que cumplen con las siguientes condiciones:

- Distancia del viaje mayor a 0 millas.
- Monto de tarifa (fare_amount) mayor a 0.
- Monto total (total_amount) mayor a 0.

Este procedimiento reduce ruido en el análisis y previene problemas
matemáticos en cálculos posteriores (por ejemplo, divisiones entre cero).


In [28]:


df_filtrado = (
    df
    .filter(F.col("trip_distance") > 0)
    .filter(F.col("fare_amount") > 0)
    .filter(F.col("total_amount") > 0)
)

# Conteo
conteo_pd = (
    df_filtrado
    .select(F.count("*").alias("Registros válidos"))
    .toPandas()
)

# Formato con separador de miles
conteo_pd["Registros válidos"] = conteo_pd["Registros válidos"].map("{:,}".format)

display(conteo_pd)


Unnamed: 0,Registros válidos
0,10413258


## 7) Estadística descriptiva básica

Se calculan métricas descriptivas sobre variables clave del dataset
para comprender el comportamiento general de los viajes.

Las métricas consideradas incluyen:

- Promedio
- Valor mínimo
- Valor máximo
- Desviación estándar

Estas estadísticas permiten analizar la magnitud típica de las distancias,
los montos cobrados y la variabilidad de los viajes registrados.


In [26]:
estadisticas = df_filtrado.select(
    F.avg("trip_distance").alias("Distancia Promedio"),
    F.min("trip_distance").alias("Distancia Mínima"),
    F.max("trip_distance").alias("Distancia Máxima"),
    F.avg("total_amount").alias("Monto Promedio"),
    F.max("total_amount").alias("Monto Máximo")
)

display(estadisticas.toPandas())


Unnamed: 0,Distancia Promedio,Distancia Mínima,Distancia Máxima,Monto Promedio,Monto Máximo
0,5.83,0.01,281085.57,27.26,863380.37


## 8) Operaciones aritméticas entre columnas

Se construyen nuevas variables derivadas a partir de operaciones
aritméticas entre columnas existentes.

Se generan los siguientes indicadores:

- Porcentaje de propina (tip_pct):
  tip_amount / total_amount

- Costo por milla (cost_per_mile):
  total_amount / trip_distance

Estas transformaciones permiten analizar la relación entre el monto total
del viaje, la distancia recorrida y el comportamiento de las propinas.


In [27]:
df_calculado = (
    df_filtrado
    .withColumn(
        "tip_pct",
        F.when(
            F.col("total_amount") > 0,
            F.col("tip_amount") / F.col("total_amount")
        )
    )
    .withColumn(
        "cost_per_mile",
        F.when(
            F.col("trip_distance") > 0,
            F.col("total_amount") / F.col("trip_distance")
        )
    )
)

display(
    df_calculado
    .select("tip_amount","total_amount","tip_pct",
            "trip_distance","cost_per_mile")
    .limit(10)
    .toPandas()
)


Unnamed: 0,tip_amount,total_amount,tip_pct,trip_distance,cost_per_mile
0,2.6,15.5,0.17,0.9,17.22
1,2.3,13.8,0.17,0.6,23.0
2,5.16,25.81,0.2,1.94,13.3
3,2.59,15.54,0.17,0.95,16.36
4,2.85,17.2,0.17,1.5,11.47
5,2.0,20.8,0.1,2.0,10.4
6,4.55,27.3,0.17,3.27,8.35
7,2.0,16.35,0.12,0.95,17.21
8,3.85,23.1,0.17,2.09,11.05
9,3.57,21.42,0.17,1.43,14.98
