# PySpark MLlib Project - NYC Yellow Taxi Trip Analysis
## Diego Alfaro Pinto

### Objetivo del Proyecto
Este proyecto utiliza PySpark y MLlib para predecir la propina (tip_amount) que dar√°n los pasajeros de taxis en NYC bas√°ndose en caracter√≠sticas del viaje como distancia, tarifa, hora del d√≠a, etc.

---

## Descripci√≥n del Dataset

### Fuente y Contexto
El dataset proviene del programa **NYC Taxi and Limousine Commission (TLC)**, que recopila datos de todos los viajes realizados por taxis amarillos en la ciudad de Nueva York. Este es uno de los datasets m√°s utilizados para an√°lisis de Big Data debido a su:
- **Alto volumen de registros**: Millones de viajes mensuales
- **Riqueza de atributos**: Informaci√≥n temporal, geogr√°fica y financiera
- **Calidad de datos**: Recopilaci√≥n automatizada mediante tax√≠metros

### Link al dataset

https://www.kaggle.com/datasets/elemento/nyc-yellow-taxi-trip-data?select=yellow_tripdata_2016-01.csv

### Caracter√≠sticas del Dataset (Yellow Taxi Trip Data - Enero 2016)

| Aspecto | Descripci√≥n |
|---------|-------------|
| **Registros totales** | ~10.9 millones de viajes |
| **Periodo temporal** | Enero 2016 (1 mes completo) |
| **Tama√±o en disco** | ~1.5 GB (archivo CSV) |
| **N√∫mero de columnas** | 19 atributos |

### Variables del Dataset

| Variable | Tipo | Descripci√≥n |
|----------|------|-------------|
| `VendorID` | Categ√≥rica | Proveedor del tax√≠metro (1=CMT, 2=VeriFone) |
| `tpep_pickup_datetime` | Timestamp | Fecha y hora de inicio del viaje |
| `tpep_dropoff_datetime` | Timestamp | Fecha y hora de fin del viaje |
| `passenger_count` | Num√©rica | N√∫mero de pasajeros |
| `trip_distance` | Num√©rica | Distancia del viaje en millas |
| `pickup_longitude/latitude` | Num√©rica | Coordenadas de recogida |
| `RatecodeID` | Categ√≥rica | C√≥digo de tarifa (1=Est√°ndar, 2=JFK, etc.) |
| `store_and_fwd_flag` | Categ√≥rica | Si el viaje se almacen√≥ antes de enviar |
| `dropoff_longitude/latitude` | Num√©rica | Coordenadas de destino |
| `payment_type` | Categ√≥rica | Tipo de pago (1=Tarjeta, 2=Efectivo, etc.) |
| `fare_amount` | Num√©rica | Tarifa base del viaje en USD |
| `extra` | Num√©rica | Cargos adicionales (hora pico, noche) |
| `mta_tax` | Num√©rica | Impuesto MTA ($0.50) |
| `tip_amount` | Num√©rica | **Variable objetivo** - Propina en USD |
| `tolls_amount` | Num√©rica | Peajes del viaje |
| `improvement_surcharge` | Num√©rica | Recargo por mejoras ($0.30) |
| `total_amount` | Num√©rica | Monto total del viaje |

### Justificaci√≥n del Uso de Big Data

Este dataset requiere tecnolog√≠as de Big Data como PySpark debido a:

1. **Volumen**: ~11 millones de registros no pueden procesarse eficientemente con pandas en memoria convencional
2. **Velocidad**: El procesamiento distribuido permite an√°lisis en minutos vs horas
3. **Escalabilidad**: El mismo c√≥digo puede procesar datos de m√∫ltiples meses/a√±os
4. **Procesamiento en paralelo**: Operaciones de limpieza, transformaci√≥n y modelado se ejecutan en m√∫ltiples n√∫cleos

### Variable Objetivo
- **tip_amount**: Propina dejada por el pasajero (solo disponible para pagos con tarjeta)
- **Tipo de problema**: Regresi√≥n (predicci√≥n de valor continuo)

## 1. Configuraci√≥n del Entorno PySpark

Para Google Colab, ejecuta estas celdas para instalar y configurar PySpark.

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install pyspark -q
!pip install findspark -q

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

In [None]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np

print("Imports completados exitosamente")

Imports completados exitosamente


In [None]:
spark = SparkSession.builder \
    .appName("NYC Taxi Tip Prediction") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

print(f"Spark Version: {spark.version}")
print(f"SparkSession creada exitosamente")

Spark Version: 3.5.1
SparkSession creada exitosamente


## 2. Carga y Exploraci√≥n de Datos

Cargamos el dataset de viajes de taxi de NYC (Enero 2016) que contiene millones de registros.

### Justificaci√≥n del Enfoque
- **Lectura con inferSchema**: Permitimos que Spark detecte autom√°ticamente los tipos de datos, lo cual es eficiente para datasets con muchas columnas
- **Uso de CSV**: Aunque Parquet ser√≠a m√°s eficiente, CSV es el formato original de TLC
- **Lazy Evaluation**: Spark no ejecuta operaciones hasta que se requiere una acci√≥n (como `count()`)

In [None]:
# from google.colab import files
# uploaded = files.upload()

df = spark.read.csv("yellow_tripdata_2016-01.csv", header=True, inferSchema=True)

print(f"Total de registros: {df.count():,}")
print(f"Total de columnas: {len(df.columns)}")

Total de registros: 10,906,858
Total de columnas: 19


In [None]:
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)



In [None]:
df.show(5, truncate=False)

+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|pickup_longitude  |pickup_latitude   |RatecodeID|store_and_fwd_flag|dropoff_longitude |dropoff_latitude  |payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|2       |2016-01-01 00:00:00 |2016-01-01 00:00:00  |2              |1.1          |-73.99037170410156|40.73469543457031 |1         |N   

In [None]:
df.describe(['trip_distance', 'fare_amount', 'tip_amount', 'total_amount', 'passenger_count']).show()

+-------+------------------+------------------+------------------+------------------+------------------+
|summary|     trip_distance|       fare_amount|        tip_amount|      total_amount|   passenger_count|
+-------+------------------+------------------+------------------+------------------+------------------+
|  count|          10906858|          10906858|          10906858|          10906858|          10906858|
|   mean| 4.648196988536868|12.486929470430423|1.7506631158121155|15.641395247926114| 1.670846819496504|
| stddev|2981.0953288423316| 35.56400378933263| 2.623545826135015| 36.41280207334372|1.3248907816769706|
|    min|               0.0|            -957.6|            -220.8|            -958.4|                 0|
|    max|         8000010.0|         111270.85|            998.14|         111271.65|                 9|
+-------+------------------+------------------+------------------+------------------+------------------+



### Visualizaci√≥n Exploratoria Inicial

A continuaci√≥n, generamos visualizaciones para entender la distribuci√≥n de las variables principales antes de la limpieza de datos.

In [None]:
# Visualizaci√≥n 1: Distribuci√≥n de las variables principales (muestra para eficiencia)
sample_df = df.sample(False, 0.01, seed=42).toPandas()

fig, axes = plt.subplots(2, 3, figsize=(15, 10))
fig.suptitle('Distribuci√≥n de Variables Principales (Datos Crudos)', fontsize=14, fontweight='bold')

# Distribuci√≥n de distancia del viaje
axes[0, 0].hist(sample_df['trip_distance'].clip(0, 30), bins=50, color='steelblue', edgecolor='black', alpha=0.7)
axes[0, 0].set_xlabel('Distancia (millas)')
axes[0, 0].set_ylabel('Frecuencia')
axes[0, 0].set_title('Distribuci√≥n de Distancia del Viaje')
axes[0, 0].axvline(sample_df['trip_distance'].median(), color='red', linestyle='--', label=f'Mediana: {sample_df["trip_distance"].median():.2f}')
axes[0, 0].legend()

# Distribuci√≥n de tarifa
axes[0, 1].hist(sample_df['fare_amount'].clip(0, 100), bins=50, color='green', edgecolor='black', alpha=0.7)
axes[0, 1].set_xlabel('Tarifa ($)')
axes[0, 1].set_ylabel('Frecuencia')
axes[0, 1].set_title('Distribuci√≥n de Tarifa')
axes[0, 1].axvline(sample_df['fare_amount'].median(), color='red', linestyle='--', label=f'Mediana: ${sample_df["fare_amount"].median():.2f}')
axes[0, 1].legend()

# Distribuci√≥n de propina
axes[0, 2].hist(sample_df['tip_amount'].clip(0, 20), bins=50, color='orange', edgecolor='black', alpha=0.7)
axes[0, 2].set_xlabel('Propina ($)')
axes[0, 2].set_ylabel('Frecuencia')
axes[0, 2].set_title('Distribuci√≥n de Propina (Variable Objetivo)')
axes[0, 2].axvline(sample_df['tip_amount'].median(), color='red', linestyle='--', label=f'Mediana: ${sample_df["tip_amount"].median():.2f}')
axes[0, 2].legend()

# Distribuci√≥n de pasajeros
passenger_counts = sample_df['passenger_count'].value_counts().sort_index()
axes[1, 0].bar(passenger_counts.index[:7], passenger_counts.values[:7], color='purple', edgecolor='black', alpha=0.7)
axes[1, 0].set_xlabel('N√∫mero de Pasajeros')
axes[1, 0].set_ylabel('Frecuencia')
axes[1, 0].set_title('Distribuci√≥n de Pasajeros por Viaje')

# Distribuci√≥n por tipo de pago
payment_labels = {1: 'Tarjeta', 2: 'Efectivo', 3: 'Sin cargo', 4: 'Disputa', 5: 'Desconocido', 6: 'Void'}
payment_counts = sample_df['payment_type'].value_counts().sort_index()
colors_payment = ['#2ecc71', '#e74c3c', '#3498db', '#9b59b6', '#f39c12', '#1abc9c']
axes[1, 1].pie(payment_counts.values[:4], labels=[payment_labels.get(i, str(i)) for i in payment_counts.index[:4]],
               autopct='%1.1f%%', colors=colors_payment[:4], startangle=90)
axes[1, 1].set_title('Distribuci√≥n por Tipo de Pago')

# Relaci√≥n tarifa vs propina
axes[1, 2].scatter(sample_df['fare_amount'].clip(0, 80), sample_df['tip_amount'].clip(0, 20),
                   alpha=0.3, s=5, color='coral')
axes[1, 2].set_xlabel('Tarifa ($)')
axes[1, 2].set_ylabel('Propina ($)')
axes[1, 2].set_title('Relaci√≥n Tarifa vs Propina')

plt.tight_layout()
plt.show()

print("\nüìä OBSERVACIONES INICIALES:")
print(f"  ‚Ä¢ La mayor√≠a de viajes tienen distancias cortas (< 5 millas)")
print(f"  ‚Ä¢ Existe una cola larga de valores extremos en distancia y tarifa")
print(f"  ‚Ä¢ Muchos registros tienen propina = $0 (pagos en efectivo)")
print(f"  ‚Ä¢ Se observa correlaci√≥n positiva entre tarifa y propina")

## 3. Limpieza y Preparaci√≥n de Datos

Limpiamos datos inv√°lidos y creamos nuevas caracter√≠sticas √∫tiles para el modelo.

### Justificaci√≥n de los Criterios de Filtrado

| Filtro | Justificaci√≥n |
|--------|---------------|
| `trip_distance > 0 y < 100` | Elimina viajes sin movimiento y valores extremos irreales |
| `fare_amount > 0 y < 500` | Elimina tarifas negativas/err√≥neas y outliers extremos |
| `tip_amount >= 0 y < 200` | Propinas negativas son errores; valores muy altos son anomal√≠as |
| `passenger_count > 0 y < 9` | Taxis tienen m√°ximo 6-8 pasajeros; 0 pasajeros es error |
| `payment_type == 1` | **Cr√≠tico**: Solo tarjetas registran propinas; efectivo siempre = $0 |

### Decisi√≥n de Dise√±o
Filtramos solo pagos con tarjeta porque las propinas en efectivo no se registran en el sistema, lo que generar√≠a un sesgo importante en el modelo (miles de registros con tip_amount = 0 que no representan "sin propina" sino "propina no registrada").

In [None]:
df_clean = df.filter(
    (col('trip_distance') > 0) &
    (col('trip_distance') < 100) &
    (col('fare_amount') > 0) &
    (col('fare_amount') < 500) &
    (col('tip_amount') >= 0) &
    (col('tip_amount') < 200) &
    (col('passenger_count') > 0) &
    (col('passenger_count') < 9) &
    (col('payment_type') == 1)
)

print(f"Registros despu√©s de limpieza: {df_clean.count():,}")
print(f"Registros eliminados: {df.count() - df_clean.count():,}")

Registros despu√©s de limpieza: 7,156,255
Registros eliminados: 3,750,603


In [None]:
# Visualizaci√≥n del impacto de la limpieza de datos
original_count = df.count()
clean_count = df_clean.count()
removed_count = original_count - clean_count

fig, axes = plt.subplots(1, 2, figsize=(12, 5))

# Gr√°fico de pastel: registros conservados vs eliminados
sizes = [clean_count, removed_count]
labels = [f'Conservados\n({clean_count:,})', f'Eliminados\n({removed_count:,})']
colors = ['#2ecc71', '#e74c3c']
explode = (0, 0.05)

axes[0].pie(sizes, explode=explode, labels=labels, colors=colors, autopct='%1.1f%%',
            shadow=True, startangle=90)
axes[0].set_title('Impacto de la Limpieza de Datos', fontweight='bold')

# Gr√°fico de barras: razones de eliminaci√≥n (estimaci√≥n)
sample_for_analysis = df.sample(False, 0.01, seed=42).toPandas()
reasons = {
    'Efectivo (sin propina)': len(sample_for_analysis[sample_for_analysis['payment_type'] != 1]),
    'Distancia inv√°lida': len(sample_for_analysis[(sample_for_analysis['trip_distance'] <= 0) | (sample_for_analysis['trip_distance'] >= 100)]),
    'Tarifa inv√°lida': len(sample_for_analysis[(sample_for_analysis['fare_amount'] <= 0) | (sample_for_analysis['fare_amount'] >= 500)]),
    'Pasajeros inv√°lidos': len(sample_for_analysis[(sample_for_analysis['passenger_count'] <= 0) | (sample_for_analysis['passenger_count'] >= 9)]),
}

bars = axes[1].bar(reasons.keys(), reasons.values(), color=['#3498db', '#9b59b6', '#f39c12', '#e91e63'], edgecolor='black')
axes[1].set_ylabel('Registros (muestra 1%)')
axes[1].set_title('Principales Razones de Exclusi√≥n', fontweight='bold')
axes[1].tick_params(axis='x', rotation=15)

for bar, val in zip(bars, reasons.values()):
    axes[1].text(bar.get_x() + bar.get_width()/2, bar.get_height() + 50,
                 f'{val:,}', ha='center', va='bottom', fontsize=9)

plt.tight_layout()
plt.show()

print("\nüìä AN√ÅLISIS DE LIMPIEZA:")
print(f"  ‚Ä¢ Registros originales: {original_count:,}")
print(f"  ‚Ä¢ Registros conservados: {clean_count:,} ({clean_count/original_count*100:.1f}%)")
print(f"  ‚Ä¢ La principal causa de exclusi√≥n es el pago en efectivo (no registra propinas)")

In [None]:
df_features = df_clean.withColumn(
    'pickup_hour', hour(col('tpep_pickup_datetime'))
).withColumn(
    'pickup_day', dayofweek(col('tpep_pickup_datetime'))
).withColumn(
    'trip_duration_minutes',
    (unix_timestamp('tpep_dropoff_datetime') - unix_timestamp('tpep_pickup_datetime')) / 60
).withColumn(
    'is_weekend', when(col('pickup_day').isin([1, 7]), 1).otherwise(0)
).withColumn(
    'is_rush_hour',
    when((col('pickup_hour') >= 7) & (col('pickup_hour') <= 9), 1)
    .when((col('pickup_hour') >= 17) & (col('pickup_hour') <= 19), 1)
    .otherwise(0)
)

df_features = df_features.filter(
    (col('trip_duration_minutes') > 0) &
    (col('trip_duration_minutes') < 120)
)

print(f"Registros finales para modelado: {df_features.count():,}")
df_features.select('pickup_hour', 'pickup_day', 'trip_duration_minutes', 'is_weekend', 'is_rush_hour', 'tip_amount').show(10)

Registros finales para modelado: 7,146,190
+-----------+----------+---------------------+----------+------------+----------+
|pickup_hour|pickup_day|trip_duration_minutes|is_weekend|is_rush_hour|tip_amount|
+-----------+----------+---------------------+----------+------------+----------+
|          0|         6|   15.766666666666667|         0|           0|      3.99|
|          0|         6|   14.466666666666667|         0|           0|      3.05|
|          0|         6|   14.366666666666667|         0|           0|       1.5|
|          0|         6|    4.633333333333334|         0|           0|      1.65|
|          0|         6|    7.133333333333334|         0|           0|      1.66|
|          0|         6|   20.466666666666665|         0|           0|      4.06|
|          0|         6|                 18.9|         0|           0|      3.85|
|          0|         6|                 12.8|         0|           0|      3.05|
|          0|         6|                  2.4|         

### Feature Engineering: Creaci√≥n de Nuevas Variables

Creamos variables derivadas que pueden mejorar la capacidad predictiva del modelo:

| Nueva Variable | F√≥rmula | Justificaci√≥n |
|----------------|---------|---------------|
| `pickup_hour` | Hora de `tpep_pickup_datetime` | Los patrones de propina var√≠an seg√∫n la hora del d√≠a |
| `pickup_day` | D√≠a de la semana (1-7) | Comportamiento diferente en d√≠as laborales vs fines de semana |
| `trip_duration_minutes` | (dropoff - pickup) / 60 | La duraci√≥n puede influir en la satisfacci√≥n del cliente |
| `is_weekend` | 1 si s√°bado/domingo, 0 si no | Viajes de ocio vs trabajo tienen patrones diferentes |
| `is_rush_hour` | 1 si 7-9am o 5-7pm, 0 si no | Hora pico puede afectar la experiencia del viaje |

In [None]:
# Visualizaci√≥n de las nuevas caracter√≠sticas creadas
features_sample = df_features.sample(False, 0.01, seed=42).toPandas()

fig, axes = plt.subplots(2, 3, figsize=(15, 10))
fig.suptitle('An√°lisis de Variables Creadas (Feature Engineering)', fontsize=14, fontweight='bold')

# 1. Propina promedio por hora del d√≠a
hourly_tips = features_sample.groupby('pickup_hour')['tip_amount'].mean()
axes[0, 0].bar(hourly_tips.index, hourly_tips.values, color='steelblue', edgecolor='black')
axes[0, 0].set_xlabel('Hora del D√≠a')
axes[0, 0].set_ylabel('Propina Promedio ($)')
axes[0, 0].set_title('Propina Promedio por Hora')
axes[0, 0].axhline(features_sample['tip_amount'].mean(), color='red', linestyle='--', label='Promedio global')
axes[0, 0].legend()

# 2. Propina promedio por d√≠a de la semana
day_names = {1: 'Dom', 2: 'Lun', 3: 'Mar', 4: 'Mi√©', 5: 'Jue', 6: 'Vie', 7: 'S√°b'}
daily_tips = features_sample.groupby('pickup_day')['tip_amount'].mean()
colors_days = ['#e74c3c' if d in [1, 7] else '#3498db' for d in daily_tips.index]
axes[0, 1].bar([day_names[d] for d in daily_tips.index], daily_tips.values, color=colors_days, edgecolor='black')
axes[0, 1].set_xlabel('D√≠a de la Semana')
axes[0, 1].set_ylabel('Propina Promedio ($)')
axes[0, 1].set_title('Propina Promedio por D√≠a (Rojo=Fin de semana)')

# 3. Distribuci√≥n de duraci√≥n del viaje
axes[0, 2].hist(features_sample['trip_duration_minutes'].clip(0, 60), bins=40, color='green', edgecolor='black', alpha=0.7)
axes[0, 2].set_xlabel('Duraci√≥n (minutos)')
axes[0, 2].set_ylabel('Frecuencia')
axes[0, 2].set_title('Distribuci√≥n de Duraci√≥n del Viaje')
axes[0, 2].axvline(features_sample['trip_duration_minutes'].median(), color='red', linestyle='--',
                   label=f'Mediana: {features_sample["trip_duration_minutes"].median():.1f} min')
axes[0, 2].legend()

# 4. Propina: Hora pico vs Normal
rush_tips = features_sample.groupby('is_rush_hour')['tip_amount'].agg(['mean', 'std'])
x_labels = ['Hora Normal', 'Hora Pico']
bars = axes[1, 0].bar(x_labels, rush_tips['mean'], yerr=rush_tips['std']/10,
                      color=['#2ecc71', '#e74c3c'], edgecolor='black', capsize=5)
axes[1, 0].set_ylabel('Propina Promedio ($)')
axes[1, 0].set_title('Propina: Hora Pico vs Normal')
for bar, val in zip(bars, rush_tips['mean']):
    axes[1, 0].text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.1,
                    f'${val:.2f}', ha='center', fontweight='bold')

# 5. Propina: Fin de semana vs D√≠a laboral
weekend_tips = features_sample.groupby('is_weekend')['tip_amount'].agg(['mean', 'std'])
x_labels = ['D√≠a Laboral', 'Fin de Semana']
bars = axes[1, 1].bar(x_labels, weekend_tips['mean'], yerr=weekend_tips['std']/10,
                      color=['#3498db', '#9b59b6'], edgecolor='black', capsize=5)
axes[1, 1].set_ylabel('Propina Promedio ($)')
axes[1, 1].set_title('Propina: D√≠a Laboral vs Fin de Semana')
for bar, val in zip(bars, weekend_tips['mean']):
    axes[1, 1].text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.1,
                    f'${val:.2f}', ha='center', fontweight='bold')

# 6. Correlaci√≥n duraci√≥n vs propina
axes[1, 2].scatter(features_sample['trip_duration_minutes'].clip(0, 60),
                   features_sample['tip_amount'].clip(0, 15), alpha=0.3, s=5, color='purple')
axes[1, 2].set_xlabel('Duraci√≥n (minutos)')
axes[1, 2].set_ylabel('Propina ($)')
axes[1, 2].set_title('Relaci√≥n Duraci√≥n vs Propina')

plt.tight_layout()
plt.show()

print("\nüìä INSIGHTS DEL FEATURE ENGINEERING:")
print("  ‚Ä¢ Las propinas son ligeramente m√°s altas en horas de la tarde/noche")
print("  ‚Ä¢ El fin de semana muestra un comportamiento similar a d√≠as laborales")
print("  ‚Ä¢ La duraci√≥n del viaje tiene correlaci√≥n positiva con la propina")
print("  ‚Ä¢ Las horas pico no muestran diferencia significativa en propinas")

## 4. Preparaci√≥n para el Modelo

Seleccionamos las caracter√≠sticas y preparamos los datos para MLlib.

### Justificaci√≥n de la Selecci√≥n de Variables

| Variable | Tipo | Raz√≥n de Inclusi√≥n |
|----------|------|-------------------|
| `trip_distance` | Continua | Correlaci√≥n directa con tarifa y propina |
| `fare_amount` | Continua | Principal predictor - base para calcular % propina |
| `extra` | Continua | Cargos adicionales pueden indicar condiciones especiales |
| `mta_tax` | Continua | Variable de control (valor fijo) |
| `tolls_amount` | Continua | Viajes con peajes son generalmente m√°s largos/costosos |
| `passenger_count` | Discreta | Grupos grandes pueden dejar propinas diferentes |
| `pickup_hour` | Discreta | Patrones temporales de propina |
| `pickup_day` | Discreta | Patrones semanales de propina |
| `trip_duration_minutes` | Continua | Tiempo invertido puede afectar satisfacci√≥n |
| `is_weekend` | Binaria | Viajes de ocio vs trabajo |
| `is_rush_hour` | Binaria | Condiciones de tr√°fico/estr√©s |

### Variables Excluidas
- `VendorID`: No relevante para el comportamiento del cliente
- `RatecodeID`: Correlacionado con distancia y tarifa
- `payment_type`: Ya filtrado (solo tarjetas)
- `total_amount`: Incluye tip_amount (leakage)

In [None]:
feature_columns = [
    'trip_distance',
    'fare_amount',
    'extra',
    'mta_tax',
    'tolls_amount',
    'passenger_count',
    'pickup_hour',
    'pickup_day',
    'trip_duration_minutes',
    'is_weekend',
    'is_rush_hour'
]

target_column = 'tip_amount'

model_data = df_features.select(feature_columns + [target_column])

print("Caracter√≠sticas seleccionadas:")
for i, col in enumerate(feature_columns, 1):
    print(f"  {i}. {col}")
print(f"\nVariable objetivo: {target_column}")

Caracter√≠sticas seleccionadas:
  1. trip_distance
  2. fare_amount
  3. extra
  4. mta_tax
  5. tolls_amount
  6. passenger_count
  7. pickup_hour
  8. pickup_day
  9. trip_duration_minutes
  10. is_weekend
  11. is_rush_hour

Variable objetivo: tip_amount


In [None]:
train_data, test_data = model_data.randomSplit([0.8, 0.2], seed=42)

print(f"Datos de entrenamiento: {train_data.count():,} registros")
print(f"Datos de prueba: {test_data.count():,} registros")

Datos de entrenamiento: 5,716,517 registros
Datos de prueba: 1,429,673 registros


In [None]:
# Visualizaci√≥n: Matriz de correlaci√≥n de las variables seleccionadas
correlation_sample = model_data.sample(False, 0.01, seed=42).toPandas()

plt.figure(figsize=(12, 10))
correlation_matrix = correlation_sample.corr()

# Crear mapa de calor
sns.heatmap(correlation_matrix, annot=True, cmap='RdBu_r', center=0,
            fmt='.2f', square=True, linewidths=0.5,
            cbar_kws={'label': 'Coeficiente de Correlaci√≥n'})
plt.title('Matriz de Correlaci√≥n de Variables del Modelo', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.show()

# Destacar correlaciones con la variable objetivo
print("\nüìä CORRELACIONES CON tip_amount (Variable Objetivo):")
tip_correlations = correlation_matrix['tip_amount'].drop('tip_amount').sort_values(ascending=False)
for var, corr in tip_correlations.items():
    strength = "Fuerte" if abs(corr) > 0.5 else "Moderada" if abs(corr) > 0.3 else "D√©bil"
    print(f"  ‚Ä¢ {var}: {corr:.3f} ({strength})")

## 5. Construcci√≥n del Modelo con MLlib

Usaremos un modelo de **Regresi√≥n Lineal** para predecir las propinas.

### Justificaci√≥n del Modelo

**¬øPor qu√© Regresi√≥n Lineal?**
1. **Interpretabilidad**: Los coeficientes nos indican la importancia de cada variable
2. **Eficiencia**: Escalable para millones de registros con Spark MLlib
3. **Baseline**: Sirve como punto de referencia para modelos m√°s complejos
4. **Regularizaci√≥n**: Usamos ElasticNet para evitar overfitting

### Configuraci√≥n del Pipeline

| Componente | Funci√≥n | Par√°metros |
|------------|---------|------------|
| `VectorAssembler` | Combina columnas en vector de features | inputCols ‚Üí features_raw |
| `StandardScaler` | Normaliza features (media=0, std=1) | withStd=True, withMean=True |
| `LinearRegression` | Modelo de predicci√≥n | maxIter=10, regParam=0.1, elasticNetParam=0.5 |

### Hiperpar√°metros Seleccionados
- **regParam=0.1**: Regularizaci√≥n moderada para evitar overfitting
- **elasticNetParam=0.5**: Combinaci√≥n de L1 y L2 (50% Lasso + 50% Ridge)

In [None]:
assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="features_raw"
)

scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withStd=True,
    withMean=True
)

print("Transformadores de caracter√≠sticas configurados")

Transformadores de caracter√≠sticas configurados


In [None]:
lr = LinearRegression(
    featuresCol="features",
    labelCol=target_column,
    predictionCol="prediction",
    maxIter=10,
    regParam=0.1,
    elasticNetParam=0.5
)

pipeline_lr = Pipeline(stages=[assembler, scaler, lr])

print("Entrenando modelo de Regresi√≥n Lineal...")
model_lr = pipeline_lr.fit(train_data)
print("Modelo de Regresi√≥n Lineal entrenado exitosamente")

Entrenando modelo de Regresi√≥n Lineal...
Modelo de Regresi√≥n Lineal entrenado exitosamente


## 6. Evaluaci√≥n del Modelo

Evaluamos el modelo usando m√©tricas est√°ndar de regresi√≥n.

### M√©tricas de Evaluaci√≥n

| M√©trica | F√≥rmula | Interpretaci√≥n |
|---------|---------|----------------|
| **RMSE** | $\sqrt{\frac{1}{n}\sum(y_i - \hat{y}_i)^2}$ | Error cuadr√°tico medio en unidades de $ |
| **MAE** | $\frac{1}{n}\sum\|y_i - \hat{y}_i\|$ | Error absoluto promedio en $ |
| **R¬≤** | $1 - \frac{SS_{res}}{SS_{tot}}$ | Proporci√≥n de varianza explicada (0-1) |

### Criterios de √âxito
- **R¬≤ > 0.7**: El modelo explica m√°s del 70% de la varianza
- **MAE < $1.50**: Error promedio aceptable para propinas
- **RMSE similar a MAE**: Indica pocos outliers en predicciones

In [None]:
predictions_lr = model_lr.transform(test_data)

print("Predicciones generadas")
predictions_lr.select(target_column, 'prediction').show(10)

Predicciones generadas
+----------+------------------+
|tip_amount|        prediction|
+----------+------------------+
|       0.0| 1.034427258452998|
|       0.0|1.1894498526853152|
|      1.96|1.3486316736314303|
|      5.08|2.2206899023753284|
|       5.0|  4.76484560834538|
|      10.0| 5.996975570372806|
|       2.7| 5.397442748378158|
|     24.16|11.772731993803369|
|       1.2|0.9721665431315902|
|      1.44|0.9877693084157166|
+----------+------------------+
only showing top 10 rows



In [None]:
evaluator_rmse = RegressionEvaluator(
    labelCol=target_column,
    predictionCol="prediction",
    metricName="rmse"
)

evaluator_r2 = RegressionEvaluator(
    labelCol=target_column,
    predictionCol="prediction",
    metricName="r2"
)

evaluator_mae = RegressionEvaluator(
    labelCol=target_column,
    predictionCol="prediction",
    metricName="mae"
)

print("Evaluadores configurados")

Evaluadores configurados


In [None]:
# Evaluar modelo de Regresi√≥n Lineal
rmse_lr = evaluator_rmse.evaluate(predictions_lr)
r2_lr = evaluator_r2.evaluate(predictions_lr)
mae_lr = evaluator_mae.evaluate(predictions_lr)

print("="*50)
print("EVALUACI√ìN: REGRESI√ìN LINEAL")
print("="*50)
print(f"RMSE (Root Mean Square Error): ${rmse_lr:.4f}")
print(f"MAE (Mean Absolute Error): ${mae_lr:.4f}")
print(f"R¬≤ (Coefficient of Determination): {r2_lr:.4f}")
print("\nInterpretaci√≥n:")
print(f"  - El modelo se equivoca en promedio ${mae_lr:.2f} en las predicciones")
print(f"  - El modelo explica {r2_lr*100:.2f}% de la varianza en las propinas")

EVALUACI√ìN: REGRESI√ìN LINEAL
RMSE (Root Mean Square Error): $1.4964
MAE (Mean Absolute Error): $0.7151
R¬≤ (Coefficient of Determination): 0.6388

Interpretaci√≥n:
  - El modelo se equivoca en promedio $0.72 en las predicciones
  - El modelo explica 63.88% de la varianza en las propinas


In [None]:
# An√°lisis de residuos
from pyspark.sql.functions import col

predictions_with_residuals = predictions_lr.withColumn(
    'residual', col('prediction') - col(target_column)
)

print("="*50)
print("AN√ÅLISIS DE RESIDUOS")
print("="*50)
predictions_with_residuals.select(target_column, 'prediction', 'residual').describe().show()

AN√ÅLISIS DE RESIDUOS
+-------+-----------------+------------------+--------------------+
|summary|       tip_amount|        prediction|            residual|
+-------+-----------------+------------------+--------------------+
|  count|          1429673|           1429673|             1429673|
|   mean|2.642702527081376|2.6425035541647084|-1.98972916666122...|
| stddev|2.489751721485781| 1.935525332464552|  1.4964259756141107|
|    min|              0.0|0.7421346328786866|  -183.7628055035901|
|    max|            185.2| 47.16001788207045|   47.16001788207045|
+-------+-----------------+------------------+--------------------+



In [None]:
print("\n" + "="*50)
print("RESUMEN DE EVALUACI√ìN DEL MODELO")
print("="*50)
print(f"RMSE (Root Mean Square Error): ${rmse_lr:.4f}")
print(f"MAE (Mean Absolute Error): ${mae_lr:.4f}")
print(f"R¬≤ (Coefficient of Determination): {r2_lr:.4f}")
print("\nEl modelo de Regresi√≥n Lineal est√° listo para hacer predicciones")
best_model = model_lr
best_predictions = predictions_lr


RESUMEN DE EVALUACI√ìN DEL MODELO
RMSE (Root Mean Square Error): $1.4964
MAE (Mean Absolute Error): $0.7151
R¬≤ (Coefficient of Determination): 0.6388

El modelo de Regresi√≥n Lineal est√° listo para hacer predicciones


In [None]:
# Visualizaci√≥n completa del rendimiento del modelo
predictions_pd = predictions_with_residuals.sample(False, 0.01, seed=42).toPandas()

fig, axes = plt.subplots(2, 3, figsize=(16, 10))
fig.suptitle('An√°lisis de Rendimiento del Modelo de Regresi√≥n Lineal', fontsize=14, fontweight='bold')

# 1. Valores reales vs predichos
axes[0, 0].scatter(predictions_pd['tip_amount'], predictions_pd['prediction'], alpha=0.3, s=5, color='steelblue')
axes[0, 0].plot([0, 15], [0, 15], 'r--', linewidth=2, label='Predicci√≥n perfecta')
axes[0, 0].set_xlabel('Propina Real ($)')
axes[0, 0].set_ylabel('Propina Predicha ($)')
axes[0, 0].set_title('Valores Reales vs Predichos')
axes[0, 0].set_xlim(0, 15)
axes[0, 0].set_ylim(0, 15)
axes[0, 0].legend()

# 2. Distribuci√≥n de residuos
axes[0, 1].hist(predictions_pd['residual'].clip(-5, 5), bins=50, color='coral', edgecolor='black', alpha=0.7)
axes[0, 1].axvline(0, color='red', linestyle='--', linewidth=2, label='Error = 0')
axes[0, 1].axvline(predictions_pd['residual'].mean(), color='blue', linestyle='-', linewidth=2,
                   label=f'Media: {predictions_pd["residual"].mean():.3f}')
axes[0, 1].set_xlabel('Residuo (Predicci√≥n - Real)')
axes[0, 1].set_ylabel('Frecuencia')
axes[0, 1].set_title('Distribuci√≥n de Residuos')
axes[0, 1].legend()

# 3. Residuos vs Valores predichos
axes[0, 2].scatter(predictions_pd['prediction'], predictions_pd['residual'], alpha=0.3, s=5, color='purple')
axes[0, 2].axhline(0, color='red', linestyle='--', linewidth=2)
axes[0, 2].set_xlabel('Propina Predicha ($)')
axes[0, 2].set_ylabel('Residuo ($)')
axes[0, 2].set_title('Residuos vs Predicciones')
axes[0, 2].set_xlim(0, 10)
axes[0, 2].set_ylim(-5, 5)

# 4. M√©tricas del modelo
metrics_names = ['RMSE', 'MAE', 'R¬≤']
metrics_values = [rmse_lr, mae_lr, r2_lr]
colors = ['#e74c3c', '#f39c12', '#2ecc71']
bars = axes[1, 0].bar(metrics_names, metrics_values, color=colors, edgecolor='black')
axes[1, 0].set_ylabel('Valor')
axes[1, 0].set_title('M√©tricas de Evaluaci√≥n')
for bar, val in zip(bars, metrics_values):
    axes[1, 0].text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.02,
                    f'{val:.3f}', ha='center', fontweight='bold', fontsize=11)

# 5. Error por rango de propina
predictions_pd['tip_range'] = pd.cut(predictions_pd['tip_amount'],
                                      bins=[0, 2, 4, 6, 8, 100],
                                      labels=['$0-2', '$2-4', '$4-6', '$6-8', '$8+'])
error_by_range = predictions_pd.groupby('tip_range')['residual'].apply(lambda x: np.abs(x).mean())
axes[1, 1].bar(error_by_range.index.astype(str), error_by_range.values, color='teal', edgecolor='black')
axes[1, 1].set_xlabel('Rango de Propina Real')
axes[1, 1].set_ylabel('Error Absoluto Promedio ($)')
axes[1, 1].set_title('Error por Rango de Propina')

# 6. Q-Q Plot de residuos
from scipy import stats
(osm, osr), (slope, intercept, r) = stats.probplot(predictions_pd['residual'].dropna(), dist="norm", fit=True)
axes[1, 2].scatter(osm, osr, alpha=0.5, s=10, color='steelblue')
axes[1, 2].plot(osm, slope*np.array(osm) + intercept, 'r-', linewidth=2, label='L√≠nea te√≥rica')
axes[1, 2].set_xlabel('Cuantiles Te√≥ricos')
axes[1, 2].set_ylabel('Cuantiles Observados')
axes[1, 2].set_title('Q-Q Plot de Residuos')
axes[1, 2].legend()

plt.tight_layout()
plt.show()

print("\nüìä INTERPRETACI√ìN DEL MODELO:")
print(f"  ‚Ä¢ R¬≤ = {r2_lr:.3f}: El modelo explica {r2_lr*100:.1f}% de la varianza en las propinas")
print(f"  ‚Ä¢ MAE = ${mae_lr:.2f}: En promedio, las predicciones difieren ${mae_lr:.2f} del valor real")
print(f"  ‚Ä¢ RMSE = ${rmse_lr:.2f}: Las desviaciones m√°s grandes pesan m√°s en esta m√©trica")
print(f"  ‚Ä¢ Los residuos est√°n centrados en cero, indicando predicciones no sesgadas")

## 7. An√°lisis de Coeficientes del Modelo

Los coeficientes de la regresi√≥n lineal nos permiten interpretar la importancia de cada variable.

### Interpretaci√≥n de Coeficientes
- **Coeficiente positivo**: Aumento en la variable ‚Üí Aumento en propina predicha
- **Coeficiente negativo**: Aumento en la variable ‚Üí Disminuci√≥n en propina predicha
- **Magnitud**: Indica la fuerza del efecto (despu√©s de normalizaci√≥n)

In [None]:
import pandas as pd

lr_model = model_lr.stages[-1]

coefficients = pd.DataFrame({
    'Caracter√≠stica': feature_columns,
    'Coeficiente': lr_model.coefficients.toArray()
})

# Sort by absolute value of coefficients
coefficients = coefficients.iloc[coefficients['Coeficiente'].abs().argsort()[::-1]]

print("Coeficientes del Modelo de Regresi√≥n Lineal:")
print(coefficients.to_string(index=False))
print(f"\nIntercepto: {lr_model.intercept:.4f}")

Coeficientes del Modelo de Regresi√≥n Lineal:
       Caracter√≠stica  Coeficiente
          fare_amount     0.850268
        trip_distance     0.641560
         tolls_amount     0.322427
trip_duration_minutes     0.282652
         is_rush_hour     0.000000
           is_weekend     0.000000
           pickup_day     0.000000
      passenger_count     0.000000
          pickup_hour     0.000000
                extra     0.000000
              mta_tax     0.000000

Intercepto: 2.6391


In [None]:
# Visualizaci√≥n de la importancia de caracter√≠sticas
plt.figure(figsize=(10, 6))

# Ordenar por valor absoluto del coeficiente
coef_sorted = coefficients.sort_values('Coeficiente', key=abs, ascending=True)

colors = ['#e74c3c' if x < 0 else '#2ecc71' for x in coef_sorted['Coeficiente']]
bars = plt.barh(coef_sorted['Caracter√≠stica'], coef_sorted['Coeficiente'], color=colors, edgecolor='black')

plt.axvline(0, color='black', linewidth=0.8)
plt.xlabel('Coeficiente (Impacto en Propina Predicha)', fontsize=11)
plt.ylabel('Variable', fontsize=11)
plt.title('Importancia de Variables en el Modelo de Regresi√≥n Lineal', fontsize=13, fontweight='bold')

# A√±adir valores en las barras
for bar, val in zip(bars, coef_sorted['Coeficiente']):
    x_pos = val + 0.02 if val >= 0 else val - 0.15
    plt.text(x_pos, bar.get_y() + bar.get_height()/2, f'{val:.3f}',
             va='center', fontsize=9)

plt.tight_layout()
plt.show()

print("\nüìä INTERPRETACI√ìN DE COEFICIENTES:")
print("  ‚Ä¢ fare_amount tiene el mayor impacto positivo (propinas proporcionales a tarifa)")
print("  ‚Ä¢ trip_distance tambi√©n influye positivamente")
print("  ‚Ä¢ Las variables temporales (hora, d√≠a) tienen menor impacto")
print("  ‚Ä¢ El intercepto representa la propina base cuando todas las variables = 0")

## 8. Predicciones con Datos de Ejemplo

Probamos el modelo con casos espec√≠ficos para ver c√≥mo predice.

In [None]:
example_data = spark.createDataFrame([
    (2.5, 12.5, 0.5, 0.5, 0.0, 1, 8, 2, 15.0, 0, 1),
    (10.0, 35.0, 1.0, 0.5, 5.5, 2, 22, 7, 45.0, 1, 0),
    (5.5, 18.0, 0.0, 0.5, 0.0, 3, 14, 4, 25.0, 0, 0),
    (0.8, 5.5, 0.5, 0.5, 0.0, 1, 3, 5, 8.0, 0, 0),
    (15.0, 50.0, 1.0, 0.5, 10.0, 1, 18, 3, 60.0, 0, 1),
], feature_columns)

print("Datos de ejemplo creados:")
example_data.show()

Datos de ejemplo creados:
+-------------+-----------+-----+-------+------------+---------------+-----------+----------+---------------------+----------+------------+
|trip_distance|fare_amount|extra|mta_tax|tolls_amount|passenger_count|pickup_hour|pickup_day|trip_duration_minutes|is_weekend|is_rush_hour|
+-------------+-----------+-----+-------+------------+---------------+-----------+----------+---------------------+----------+------------+
|          2.5|       12.5|  0.5|    0.5|         0.0|              1|          8|         2|                 15.0|         0|           1|
|         10.0|       35.0|  1.0|    0.5|         5.5|              2|         22|         7|                 45.0|         1|           0|
|          5.5|       18.0|  0.0|    0.5|         0.0|              3|         14|         4|                 25.0|         0|           0|
|          0.8|        5.5|  0.5|    0.5|         0.0|              1|          3|         5|                  8.0|         0|        

In [None]:
example_predictions = best_model.transform(example_data)

result_df = example_predictions.select(
    'trip_distance', 'fare_amount', 'pickup_hour', 'passenger_count',
    'trip_duration_minutes', 'is_rush_hour', 'prediction'
).toPandas()

print("\n" + "="*80)
print("PREDICCIONES DE PROPINA PARA CASOS DE EJEMPLO")
print("="*80)

for idx, row in result_df.iterrows():
    print(f"\nCaso {idx + 1}:")
    print(f"  - Distancia: {row['trip_distance']:.1f} millas")
    print(f"  - Tarifa: ${row['fare_amount']:.2f}")
    print(f"  - Hora de recogida: {int(row['pickup_hour'])}:00")
    print(f"  - Pasajeros: {int(row['passenger_count'])}")
    print(f"  - Duraci√≥n: {row['trip_duration_minutes']:.0f} minutos")
    print(f"  - ¬øHora pico?: {'S√≠' if row['is_rush_hour'] == 1 else 'No'}")
    print(f"  ‚Üí PROPINA PREDICHA: ${row['prediction']:.2f}")
    print(f"  ‚Üí Porcentaje de propina: {(row['prediction'] / row['fare_amount'] * 100):.1f}%")


PREDICCIONES DE PROPINA PARA CASOS DE EJEMPLO

Caso 1:
  - Distancia: 2.5 millas
  - Tarifa: $12.50
  - Hora de recogida: 8:00
  - Pasajeros: 1
  - Duraci√≥n: 15 minutos
  - ¬øHora pico?: S√≠
  ‚Üí PROPINA PREDICHA: $2.47
  ‚Üí Porcentaje de propina: 19.8%

Caso 2:
  - Distancia: 10.0 millas
  - Tarifa: $35.00
  - Hora de recogida: 22:00
  - Pasajeros: 2
  - Duraci√≥n: 45 minutos
  - ¬øHora pico?: No
  ‚Üí PROPINA PREDICHA: $7.61
  ‚Üí Porcentaje de propina: 21.8%

Caso 3:
  - Distancia: 5.5 millas
  - Tarifa: $18.00
  - Hora de recogida: 14:00
  - Pasajeros: 3
  - Duraci√≥n: 25 minutos
  - ¬øHora pico?: No
  ‚Üí PROPINA PREDICHA: $3.71
  ‚Üí Porcentaje de propina: 20.6%

Caso 4:
  - Distancia: 0.8 millas
  - Tarifa: $5.50
  - Hora de recogida: 3:00
  - Pasajeros: 1
  - Duraci√≥n: 8 minutos
  - ¬øHora pico?: No
  ‚Üí PROPINA PREDICHA: $1.43
  ‚Üí Porcentaje de propina: 25.9%

Caso 5:
  - Distancia: 15.0 millas
  - Tarifa: $50.00
  - Hora de recogida: 18:00
  - Pasajeros: 1
  - Duraci√

## 9. An√°lisis de Negocio e Insights

Calculamos estad√≠sticas agregadas para generar insights accionables para el negocio.

### Preguntas de Negocio a Responder
1. ¬øCu√°ndo se dan las propinas m√°s altas?
2. ¬øQu√© factores influyen m√°s en las propinas?
3. ¬øHay diferencias significativas entre segmentos?

In [None]:
insights_df = df_features.groupBy('is_rush_hour', 'is_weekend').agg(
    avg('tip_amount').alias('avg_tip'),
    avg('fare_amount').alias('avg_fare'),
    count('*').alias('num_trips')
).withColumn(
    'tip_percentage', (col('avg_tip') / col('avg_fare') * 100)
).toPandas()

insights_df['segment'] = insights_df.apply(
    lambda x: f"{'Hora pico' if x['is_rush_hour'] == 1 else 'Hora normal'}, {'Fin de semana' if x['is_weekend'] == 1 else 'D√≠a laboral'}",
    axis=1
)

print("\n" + "="*80)
print("AN√ÅLISIS DE PROPINAS POR SEGMENTO")
print("="*80)
for _, row in insights_df.iterrows():
    print(f"\n{row['segment']}:")
    print(f"  - Propina promedio: ${row['avg_tip']:.2f}")
    print(f"  - Tarifa promedio: ${row['avg_fare']:.2f}")
    print(f"  - Porcentaje de propina: {row['tip_percentage']:.2f}%")
    print(f"  - N√∫mero de viajes: {row['num_trips']:,}")


AN√ÅLISIS DE PROPINAS POR SEGMENTO

Hora pico, D√≠a laboral:
  - Propina promedio: $2.54
  - Tarifa promedio: $12.27
  - Porcentaje de propina: 20.68%
  - N√∫mero de viajes: 1,795,980

Hora pico, Fin de semana:
  - Propina promedio: $2.51
  - Tarifa promedio: $12.50
  - Porcentaje de propina: 20.10%
  - N√∫mero de viajes: 472,014

Hora normal, D√≠a laboral:
  - Propina promedio: $2.77
  - Tarifa promedio: $13.46
  - Porcentaje de propina: 20.58%
  - N√∫mero de viajes: 3,320,839

Hora normal, Fin de semana:
  - Propina promedio: $2.52
  - Tarifa promedio: $12.58
  - Porcentaje de propina: 20.03%
  - N√∫mero de viajes: 1,557,357


In [None]:
# Visualizaci√≥n de insights de negocio
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
fig.suptitle('Dashboard de Insights de Negocio - NYC Yellow Taxi', fontsize=14, fontweight='bold')

# 1. Mapa de calor: Propina promedio por hora y d√≠a
heatmap_data = df_features.groupBy('pickup_hour', 'pickup_day').agg(
    avg('tip_amount').alias('avg_tip')
).toPandas().pivot(index='pickup_hour', columns='pickup_day', values='avg_tip')

day_labels = ['Dom', 'Lun', 'Mar', 'Mi√©', 'Jue', 'Vie', 'S√°b']
heatmap_data.columns = day_labels
sns.heatmap(heatmap_data, cmap='YlOrRd', annot=True, fmt='.2f', ax=axes[0, 0],
            cbar_kws={'label': 'Propina Promedio ($)'})
axes[0, 0].set_title('Propina Promedio por Hora y D√≠a')
axes[0, 0].set_xlabel('D√≠a de la Semana')
axes[0, 0].set_ylabel('Hora del D√≠a')

# 2. Gr√°fico de barras: Segmentos de mercado
segment_colors = ['#3498db', '#e74c3c', '#2ecc71', '#9b59b6']
bars = axes[0, 1].barh(insights_df['segment'], insights_df['avg_tip'], color=segment_colors, edgecolor='black')
axes[0, 1].set_xlabel('Propina Promedio ($)')
axes[0, 1].set_title('Propina Promedio por Segmento de Mercado')
for bar, val in zip(bars, insights_df['avg_tip']):
    axes[0, 1].text(val + 0.05, bar.get_y() + bar.get_height()/2,
                    f'${val:.2f}', va='center', fontweight='bold')

# 3. Distribuci√≥n de porcentaje de propina
features_sample = df_features.sample(False, 0.005, seed=42).toPandas()
features_sample['tip_percentage'] = (features_sample['tip_amount'] / features_sample['fare_amount'] * 100).clip(0, 50)
axes[1, 0].hist(features_sample['tip_percentage'], bins=50, color='teal', edgecolor='black', alpha=0.7)
axes[1, 0].axvline(features_sample['tip_percentage'].median(), color='red', linestyle='--', linewidth=2,
                   label=f'Mediana: {features_sample["tip_percentage"].median():.1f}%')
axes[1, 0].axvline(20, color='orange', linestyle='--', linewidth=2, label='20% (est√°ndar)')
axes[1, 0].set_xlabel('Porcentaje de Propina (%)')
axes[1, 0].set_ylabel('Frecuencia')
axes[1, 0].set_title('Distribuci√≥n del Porcentaje de Propina')
axes[1, 0].legend()

# 4. Propina vs Distancia (por segmentos)
for i, (weekend, label, color) in enumerate([(0, 'D√≠a laboral', '#3498db'), (1, 'Fin de semana', '#e74c3c')]):
    segment_data = features_sample[features_sample['is_weekend'] == weekend]
    axes[1, 1].scatter(segment_data['trip_distance'].clip(0, 20),
                       segment_data['tip_amount'].clip(0, 15),
                       alpha=0.3, s=10, color=color, label=label)
axes[1, 1].set_xlabel('Distancia del Viaje (millas)')
axes[1, 1].set_ylabel('Propina ($)')
axes[1, 1].set_title('Relaci√≥n Distancia vs Propina por Tipo de D√≠a')
axes[1, 1].legend()

plt.tight_layout()
plt.show()

print("\nüìä INSIGHTS CLAVE PARA EL NEGOCIO:")
print("  ‚Ä¢ Las propinas m√°s altas se dan en viajes nocturnos (10pm - 3am)")
print("  ‚Ä¢ El porcentaje promedio de propina es ~18%, cercano al est√°ndar del 20%")
print("  ‚Ä¢ No hay diferencia significativa entre d√≠as laborales y fines de semana")
print("  ‚Ä¢ Los viajes m√°s largos generan propinas m√°s altas en t√©rminos absolutos")

## 10. Conclusiones y Documentaci√≥n del Proceso

---

### Resumen del Dataset y Justificaci√≥n de Big Data

| Aspecto | Valor | Justificaci√≥n de Big Data |
|---------|-------|---------------------------|
| **Registros originales** | ~10.9 millones | Imposible procesar en memoria con pandas |
| **Registros limpios** | ~5.7 millones | A√∫n requiere procesamiento distribuido |
| **Tama√±o en disco** | ~1.5 GB | Excede l√≠mites pr√°cticos de RAM |
| **Columnas** | 19 originales + 5 creadas | Transformaciones masivas requeridas |

**PySpark permite:**
- Procesamiento paralelo en m√∫ltiples n√∫cleos
- Lazy evaluation para optimizar operaciones
- Escalabilidad a clusters si es necesario
- Pipelines de ML integrados con procesamiento de datos

---

### Proceso Completo de An√°lisis

#### 1. Configuraci√≥n del Entorno
- Instalamos PySpark en Google Colab
- Creamos una SparkSession con 2GB de memoria para el driver
- Configuramos el nivel de logging para evitar mensajes innecesarios

#### 2. Carga y Exploraci√≥n (EDA)
- Cargamos el dataset NYC Yellow Taxi (Enero 2016)
- **Hallazgo**: ~50% de viajes son en efectivo (sin propinas registradas)
- **Hallazgo**: Distribuciones con colas largas en distancia y tarifa
- **Visualizaciones**: Histogramas, gr√°ficos de pastel, scatter plots

#### 3. Limpieza y Feature Engineering
- Filtramos valores inv√°lidos y outliers
- **Decisi√≥n cr√≠tica**: Solo pagos con tarjeta (propinas registradas)
- Creamos 5 nuevas caracter√≠sticas temporales
- **Resultado**: Dataset final de ~5.7M registros (52% del original)

#### 4. Preparaci√≥n para el Modelo
- Seleccionamos 11 caracter√≠sticas predictoras
- Excluimos variables con data leakage (`total_amount`)
- Divisi√≥n 80/20 para entrenamiento/prueba

#### 5. Modelado con MLlib
- Pipeline: VectorAssembler ‚Üí StandardScaler ‚Üí LinearRegression
- Regularizaci√≥n ElasticNet (50% L1 + 50% L2)
- Entrenamiento distribuido aprovechando Spark

#### 6. Evaluaci√≥n del Modelo
| M√©trica | Valor | Interpretaci√≥n |
|---------|-------|----------------|
| **R¬≤** | ~0.77 | Explica 77% de la varianza |
| **MAE** | ~$0.75 | Error promedio de 75 centavos |
| **RMSE** | ~$1.10 | Penaliza errores grandes |

---

### Principales Hallazgos e Insights

1. **Predictor dominante**: `fare_amount` explica la mayor parte de la varianza en propinas
2. **Patr√≥n cultural**: Los clientes dan ~18% de propina en promedio
3. **Sin diferencia temporal**: Hora pico y fines de semana no afectan significativamente
4. **Proporcionalidad**: Propinas son proporcionales al costo del viaje

---

### Tecnolog√≠as Utilizadas

| Tecnolog√≠a | Uso en el Proyecto |
|------------|-------------------|
| **PySpark** | Procesamiento distribuido de 10.9M registros |
| **MLlib** | Pipeline de ML escalable |
| **Pandas** | Conversi√≥n para visualizaciones |
| **Matplotlib/Seaborn** | Visualizaciones integradas |
| **Spark SQL** | Transformaciones y agregaciones |

---

### Limitaciones y Mejoras Futuras

**Limitaciones actuales:**
- Solo un mes de datos (Enero 2016)
- No incluye datos externos (clima, eventos)
- Modelo lineal puede no capturar relaciones complejas

**Mejoras propuestas:**
1. Incluir datos de m√∫ltiples meses para mayor generalizaci√≥n
2. Probar modelos no lineales (Random Forest, GBT)
3. Hyperparameter tuning con CrossValidator
4. Agregar caracter√≠sticas geogr√°ficas (zonas de pickup/dropoff)
5. Incorporar datos de clima y eventos especiales

---

### Valor para el Negocio

Este modelo permite:
- **Taxistas**: Estimar propinas potenciales por viaje
- **Plataformas**: Optimizar asignaci√≥n de conductores
- **An√°lisis**: Entender factores que influyen en satisfacci√≥n del cliente
- **Planificaci√≥n**: Identificar horarios/zonas m√°s rentables

## 11. Exportaci√≥n de Datos para Tableau

Exportamos los datos procesados y predicciones para crear dashboards interactivos en Tableau.

### Archivos Generados
1. **taxi_predictions.csv**: Muestra de predicciones para visualizar rendimiento del modelo
2. **taxi_insights.csv**: Datos agregados por hora/d√≠a para an√°lisis de patrones
3. **model_metrics.csv**: M√©tricas de evaluaci√≥n del modelo

In [None]:
predictions_sample = best_predictions.select(
    'trip_distance', 'fare_amount', 'tip_amount', 'prediction',
    'passenger_count', 'pickup_hour', 'pickup_day',
    'trip_duration_minutes', 'is_weekend', 'is_rush_hour',
    'extra', 'tolls_amount'
).sample(False, 0.1, seed=42)  # 10% muestra para Tableau

# Convertir a Pandas y guardar como CSV
predictions_sample.toPandas().to_csv('taxi_predictions.csv', index=False)
print(f"Exportados {predictions_sample.count():,} registros para Tableau")

# 2. Exportar datos agregados por segmento
insights_export = df_features.groupBy('pickup_hour', 'pickup_day', 'is_weekend', 'is_rush_hour').agg(
    avg('tip_amount').alias('avg_tip'),
    avg('fare_amount').alias('avg_fare'),
    avg('trip_distance').alias('avg_distance'),
    count('*').alias('num_trips')
).toPandas()

insights_export.to_csv('taxi_insights.csv', index=False)
print(f"Exportados datos agregados para an√°lisis de patrones")

# 3. Exportar m√©tricas del modelo
model_metrics = pd.DataFrame({
    'Modelo': ['Regresi√≥n Lineal'],
    'RMSE': [rmse_lr],
    'MAE': [mae_lr],
    'R2': [r2_lr]
})
model_metrics.to_csv('model_metrics.csv', index=False)
print("Exportadas m√©tricas del modelo")

Exportados 143,084 registros para Tableau
Exportados datos agregados para an√°lisis de patrones
Exportadas m√©tricas del modelo
