In [1]:
#--------------- Librerías -------------------------------------------------------
import os
import time
import warnings
from datetime import datetime

import numpy as np
warnings.filterwarnings('ignore')

# PySpark
os.environ["HADOOP_HOME"] = "C:\\hadoop"
from pyspark import StorageLevel
from pyspark.sql import SparkSession
from pyspark.sql import functions as F   # <-- todas las funciones SQL de Spark
from pyspark.sql.types import DoubleType

# Keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, BatchNormalization
from tensorflow.keras.optimizers import Adam

# Evaluation metrics
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
#--------------------------------------------------------------------------------
#--------------------------------------------------------------------------------

In [2]:
#-----------------------SparkSession---------------------------------------------
# Same configuration as before, plus:
# - spark.sql.adaptive.enabled = true: Adaptive Query Execution lets Catalyst adjust the
#   execution plan at runtime using statistics of the actual data.
# - spark.sql.execution.arrow.pyspark.enabled = true: toPandas() (called by the batch
#   generator on every epoch) transfers data column-wise through Apache Arrow instead of
#   pickling rows one by one. If Arrow cannot be used, Spark falls back to the non-Arrow
#   path (spark.sql.execution.arrow.pyspark.fallback.enabled is true by default).
# NOTE(review): with a local[...] master the executor runs inside the driver JVM, so
# spark.executor.memory probably has no effect here — confirm.
spark = SparkSession.builder \
    .appName("NYC_Taxi_DataFrame_DeepLearning") \
    .master("local[8]") \
    .config("spark.driver.memory", "12g") \
    .config("spark.executor.memory", "12g") \
    .config("spark.driver.maxResultSize", "8g") \
    .config("spark.network.timeout", "800s") \
    .config("spark.executor.heartbeatInterval", "60s") \
    .config("spark.python.worker.timeout", "600s") \
    .config("spark.sql.shuffle.partitions", "16") \
    .config("spark.default.parallelism", "16") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

sc = spark.sparkContext
sc.setLogLevel("ERROR")  # only show ERROR-level Spark logs in the notebook output

print(" Sesión de Spark creada")
print("  Network timeout: 800s")
print("  Network timeout: 800s")

 Sesión de Spark creada
  Network timeout: 800s


In [3]:
#----------------------Cargar datos----------------------------------------------
# Same as before: spark.read.parquet returns a DataFrame directly.
# The path can be overridden through the NYC_TAXI_DATA_PATH environment variable so the
# notebook runs on other machines; the default keeps the original local file.
DATA_PATH = os.environ.get(
    "NYC_TAXI_DATA_PATH",
    "C:/Users/PC/Documents/DocumentosGustavo/Github/Maestria/BigData/nyc-taxi-spark/data/yellow/2024/yellow_tripdata_2024-01.parquet",
)

print("\n Cargando dataset")
df = spark.read.parquet(DATA_PATH)
print(f" Numero de registros: {df.count():,}")
print(f" Columnas: {len(df.columns)}")


 Cargando dataset
 Numero de registros: 2,964,624
 Columnas: 19


In [5]:
# Column names, Spark types and nullability of the raw dataset.
schema_title = "\nEsquema del dataset:"
print(schema_title)
df.printSchema()


Esquema del dataset:
root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [6]:
df.show(n=5)  # first 5 raw rows

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-01-01 00:57:55|  2024-01-01 01:17:43|              1|         1.72|         1|                 N|         186|          79|           2|       17.7|  1.0|    0.5|       0.

**Antes (RDD):** se usaba una función Python `extract_and_scale_features` aplicada con `.map()`.  
Esto serializa cada fila Python ↔ JVM (Java Virtual Machine), lo cual es costoso.

**Ahora (DataFrame):** todo el filtrado y la creación de columnas se hace con  
`pyspark.sql.functions`, que ejecuta directamente en la JVM de Spark sin cruzar a Python.

| RDD | DataFrame |
|-----|----------|
| `lambda row: row.trip_distance` | `F.col("trip_distance")` |
| `if x is None or x <= 0` | `.filter(F.col(...).isNotNull() & ...)` |
| `float(datetime.hour)` | `F.hour(F.col("tpep_pickup_datetime"))` |
| `(x - μ) / σ` manual | expresión algebraica sobre columnas |
| `.filter(lambda x: x is not None)` | `.dropna()` o `.filter(...)` |

In [7]:
#----------------------Feature Engineering con DataFrame--------------------------
print("\nProcesando features con DataFrame API")
start = time.time()

# Fixed scaling parameters inherited from the RDD version:
#   output column -> (source column, center, scale), applied as (x - center) / scale.
# They are hand-picked constants, not statistics computed from this dataset.
SCALING_PARAMS = {
    "feat_distance":   ("trip_distance",   3.0,  5.0),
    "feat_passengers": ("passenger_count", 1.5,  1.0),
    "feat_hour":       ("hour_value",      12.0, 7.0),
    "feat_dow":        ("day_of_week",     4.0,  2.0),
}

pickup_ts = F.col("tpep_pickup_datetime")

# Columns of interest.
df_selected = df.select("trip_distance", "passenger_count", "tpep_pickup_datetime", "fare_amount")

# Business rules (same as the RDD version): no nulls, plus plausible distance,
# passenger-count and fare ranges. Evaluated in the JVM, no Python per row.
is_valid_trip = (
    F.col("trip_distance").isNotNull()
    & F.col("passenger_count").isNotNull()
    & pickup_ts.isNotNull()
    & F.col("fare_amount").isNotNull()
    & (F.col("trip_distance") > 0) & (F.col("trip_distance") < 100)
    & (F.col("passenger_count") > 0) & (F.col("passenger_count") <= 6)
    & (F.col("fare_amount") > 0) & (F.col("fare_amount") < 200)
)
df_filtered = df_selected.filter(is_valid_trip)

# Hour and day of week via built-in Spark functions.
# F.dayofweek() yields 1=Sunday ... 7=Saturday; ((dow + 5) % 7) + 1 remaps it to
# 1=Monday ... 7=Sunday, the convention used by the RDD version.
df_with_time = (
    df_filtered
    .withColumn("hour_value", F.hour(pickup_ts).cast(DoubleType()))
    .withColumn("day_of_week", (((F.dayofweek(pickup_ts) + 5) % 7) + 1).cast(DoubleType()))
)

scaled_columns = [
    ((F.col(source) - center) / scale).alias(name)
    for name, (source, center, scale) in SCALING_PARAMS.items()
]

# Repartition and cache; count() is the action that materializes the cache.
df_scaled = (
    df_with_time
    .select(*scaled_columns, F.col("fare_amount").cast(DoubleType()).alias("label"))
    .repartition(16)
    .cache()
)

total_scaled = df_scaled.count()
print(f"Registros después de la limpieza: {total_scaled:,} registros.")
print(f" Completado en {time.time()-start:.1f}s.")


Procesando features con DataFrame API
Registros después de la limpieza: 2,722,784 registros.
 Completado en 4.4s.


In [8]:
# First cleaned and scaled rows: four features plus the label.
df_scaled.show(n=5)

+--------------------+---------------+--------------------+--------+-----+
|       feat_distance|feat_passengers|           feat_hour|feat_dow|label|
+--------------------+---------------+--------------------+--------+-----+
|              -0.046|           -0.5|  0.8571428571428571|    -1.5| 19.1|
|              -0.274|            4.5| -0.5714285714285714|    -1.0| 10.7|
|-0.22999999999999998|           -0.5| 0.14285714285714285|     1.0| 11.4|
| 0.06200000000000001|           -0.5|  1.2857142857142858|    -0.5| 16.3|
|              -0.292|           -0.5|-0.14285714285714285|     1.0|  8.6|
+--------------------+---------------+--------------------+--------+-----+
only showing top 5 rows


In [10]:
#----------------------División Train/Test----------------------------------------
# randomSplit works the same on DataFrames as on RDDs.
# SPLIT_SEED = None draws a different split on every run, on purpose, to show the
# variability of the process. Set it to an int to make the split reproducible.
SPLIT_SEED = None
train_df, test_df = df_scaled.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

# Persist in memory + disk (same behavior as StorageLevel.MEMORY_AND_DISK in the RDD version).
train_df = train_df.repartition(16).persist(StorageLevel.MEMORY_AND_DISK)
test_df  = test_df.repartition(8).persist(StorageLevel.MEMORY_AND_DISK)

train_count = train_df.count()
test_count  = test_df.count()

# randomSplit partitions the rows, so every cleaned row must land in exactly one split.
assert train_count + test_count == total_scaled, (
    f"train ({train_count:,}) + test ({test_count:,}) != total ({total_scaled:,})"
)

print(f"\n Train: {train_count:,} registros")
print(f" Test:  {test_count:,} registros")


 Train: 2,177,235 registros
 Test:  545,549 registros


Arquitectura de la red neuronal (entrada: 4 features escaladas):
- capa oculta 1: 64 neuronas, función de activación relu, seguida de BatchNormalization y Dropout(0.2)
- capa oculta 2: 32 neuronas, función de activación relu, seguida de BatchNormalization y Dropout(0.2)
- capa oculta 3: 16 neuronas, función de activación relu
- capa oculta 4: 8 neuronas, función de activación relu
- capa de salida: 1 neurona, función de activación lineal

- Optimizador: Adam (tasa de aprendizaje 0.001)
- Función de costo: MSE
- Métricas adicionales: MAE

In [11]:
#----------------------Modelo-----------------------------------------------------
def create_model(n_features=4, learning_rate=0.001):
    """Build and compile the feed-forward regression network that predicts the fare.

    Layers: Dense(64, relu) -> BatchNormalization -> Dropout(0.2)
            -> Dense(32, relu) -> BatchNormalization -> Dropout(0.2)
            -> Dense(16, relu) -> Dense(8, relu) -> Dense(1, linear)

    Args:
        n_features: width of the input vector. Defaults to 4, the number of scaled
            features produced by the feature-engineering step.
        learning_rate: Adam learning rate (default 0.001, the value used so far).

    Returns:
        A compiled Sequential model with loss='mse' and metrics=['mae'].
    """
    model = Sequential([
        Dense(64, activation='relu', input_shape=(n_features,)),
        BatchNormalization(),
        Dropout(0.2),
        Dense(32, activation='relu'),
        BatchNormalization(),
        Dropout(0.2),
        Dense(16, activation='relu'),
        Dense(8,  activation='relu'),
        Dense(1,  activation='linear')
    ])
    model.compile(optimizer=Adam(learning_rate=learning_rate), loss='mse', metrics=['mae'])
    return model

model = create_model()
print("\n Modelo creado")
model.summary()


 Modelo creado


Generador de batches con dataframe: 

**Antes (RDD):** `toLocalIterator()` iteraba partición por partición, convirtiendo cada fila a Python.  
**Ahora (DataFrame):** se usa `df.sample().toPandas()`, donde:
1. El muestreo corre en la JVM.
2. La conversión a Pandas/NumPy se hace una sola vez por época (una llamada a `toPandas()` por cada `generate_batches()`); después los batches se cortan del array en memoria, sin volver a Spark.
3. Permite usar directamente arrays de `numpy` en Keras.

In [12]:
#----------------------Generador de batches con DataFrame-------------------------
class DataFrameBatchGenerator:
    """
    Yields Keras batches from a Spark DataFrame.

    Approach: one df.sample() + toPandas() per generate_batches() call (i.e. per epoch),
    then the in-memory arrays are sliced into batches.
    - sample() runs in the JVM (distributed).
    - toPandas() brings only the sampled rows, not the whole DataFrame, to the driver.
    - No per-row Python iteration (unlike toLocalIterator on an RDD).
    - With shuffle=True the sampled rows are permuted before slicing. toPandas() returns
      rows in partition order, so without this, consecutive batches would come from the
      same partition instead of being mixed.
    """

    def __init__(self, df, batch_size=4096, num_batches_per_epoch=None,
                 feature_cols=None, label_col="label", shuffle=True):
        """
        Args:
            df: Spark DataFrame holding the feature columns and the label column.
            batch_size: rows per yielded batch (the last batch may be smaller).
            num_batches_per_epoch: target number of batches per generate_batches() call;
                if falsy, as many full batches as the DataFrame holds (at least 1).
            feature_cols: feature column names; defaults to every column except label_col.
            label_col: name of the target column.
            shuffle: permute the sampled rows (reproducibly, from the seed) before batching.
        """
        self.df = df
        self.batch_size = batch_size
        self.label_col = label_col
        self.feature_cols = feature_cols or [c for c in df.columns if c != label_col]
        self.shuffle = shuffle
        # count() is a Spark action, run once here rather than on every epoch.
        self.total_samples = df.count()

        if num_batches_per_epoch:
            self.num_batches = num_batches_per_epoch
        else:
            self.num_batches = max(1, self.total_samples // batch_size)

    def generate_batches(self, seed=42):
        """
        Sample about num_batches * batch_size rows and yield them as (X, y) batches.

        Spark's sample() without replacement keeps each row independently, so the row
        count only approximates the target and the last batch may be partial. Pass a
        different seed per epoch to get a different sample (and a different row order).

        Yields:
            (X_batch, y_batch): float32 arrays of shape (n, len(feature_cols)) and (n,).
        """
        # An empty DataFrame yields nothing (and would otherwise divide by zero below).
        if self.total_samples == 0:
            return

        samples_needed = self.batch_size * self.num_batches
        fraction = min(1.0, samples_needed / self.total_samples)

        # Sampling runs in the JVM; toPandas() then moves only the sample to the driver.
        sampled_df = self.df.sample(withReplacement=False, fraction=fraction, seed=seed)
        pandas_df = sampled_df.select(self.feature_cols + [self.label_col]).toPandas()

        X_all = pandas_df[self.feature_cols].values.astype(np.float32)
        y_all = pandas_df[self.label_col].values.astype(np.float32)

        if self.shuffle:
            # Same seed -> same permutation, so each epoch stays reproducible.
            order = np.random.default_rng(seed).permutation(len(X_all))
            X_all, y_all = X_all[order], y_all[order]

        # range() never produces an empty slice, so every yielded batch is non-empty.
        for i in range(0, len(X_all), self.batch_size):
            yield X_all[i : i + self.batch_size], y_all[i : i + self.batch_size]


# Configuration — same hyperparameters as the RDD version
BATCH_SIZE              = 4096
BATCHES_PER_EPOCH_TRAIN = 400
BATCHES_PER_EPOCH_VAL   = 20

FEATURE_COLS = ["feat_distance", "feat_passengers", "feat_hour", "feat_dow"]

# Both generators share batch size and feature columns; only the source DataFrame
# and the number of batches per epoch differ.
shared_generator_kwargs = {"batch_size": BATCH_SIZE, "feature_cols": FEATURE_COLS}

train_generator = DataFrameBatchGenerator(
    train_df, num_batches_per_epoch=BATCHES_PER_EPOCH_TRAIN, **shared_generator_kwargs
)
test_generator = DataFrameBatchGenerator(
    test_df, num_batches_per_epoch=BATCHES_PER_EPOCH_VAL, **shared_generator_kwargs
)

samples_per_epoch = BATCH_SIZE * BATCHES_PER_EPOCH_TRAIN
for summary_line in (
    f"  Batch size: {BATCH_SIZE}",
    f"  Batches/época train: {BATCHES_PER_EPOCH_TRAIN}",
    f"  Samples/época: {samples_per_epoch:,}",
):
    print(summary_line)

  Batch size: 4096
  Batches/época train: 400
  Samples/época: 1,638,400


In [13]:
#----------------------Entrenamiento----------------------------------------------
print("Entrenamiento.")

EPOCHS = 15
# Early stopping: stop after PATIENCE epochs in a row whose val_loss does not beat the
# best val_loss so far. Epochs are only counted from index EARLY_STOP_FROM onward
# (the same warm-up as the previous `epoch > 3` condition).
PATIENCE = 3
EARLY_STOP_FROM = 4

print(f"\n   Configuración:")
print(f"   Épocas: {EPOCHS}")
print(f"   Batches/época: {BATCHES_PER_EPOCH_TRAIN}")
print(f"   Batch size: {BATCH_SIZE}")

history = {'loss': [], 'mae': [], 'val_loss': [], 'val_mae': []}

# Early-stopping state lives in plain variables that are reset on every run of this cell,
# instead of being attached to the Keras model object.
best_val_loss = float('inf')
epochs_without_improvement = 0

print("\n  Iniciando el entrenamiento\n")
start_time = time.time()

for epoch in range(EPOCHS):
    epoch_start = time.time()
    print(f"\nÉpoca {epoch+1}/{EPOCHS}")
    print("-" * 60)

    epoch_losses, epoch_maes = [], []
    batch_count = 0

    try:
        # A different seed per epoch draws a different sample of train_df.
        for X_batch, y_batch in train_generator.generate_batches(seed=epoch):
            metrics = model.train_on_batch(X_batch, y_batch, return_dict=True)
            epoch_losses.append(metrics['loss'])
            epoch_maes.append(metrics['mae'])
            batch_count += 1

            if batch_count % 100 == 0:
                # Running average over the last 20 batches.
                print(f"  Batch {batch_count}/{BATCHES_PER_EPOCH_TRAIN} - "
                      f"loss: {np.mean(epoch_losses[-20:]):.4f}")
    except Exception as e:
        # Best-effort: any error while sampling or training skips the rest of this epoch.
        # Weight updates from batches already trained in this epoch are kept.
        print(f"   Error en batch {batch_count}: {e}")
        continue

    if not epoch_losses:
        print("  No se completaron batches, saltando época")
        continue

    train_loss = np.mean(epoch_losses)
    train_mae  = np.mean(epoch_maes)

    # Validation on a sample of test_df (test_on_batch does not update weights).
    val_losses, val_maes = [], []
    for X_val, y_val in test_generator.generate_batches(seed=epoch):
        val_metrics = model.test_on_batch(X_val, y_val, return_dict=True)
        val_losses.append(val_metrics['loss'])
        val_maes.append(val_metrics['mae'])

    # NOTE: falls back to the training metrics if validation produced no batches.
    val_loss = np.mean(val_losses) if val_losses else train_loss
    val_mae  = np.mean(val_maes)   if val_maes  else train_mae

    history['loss'].append(train_loss)
    history['mae'].append(train_mae)
    history['val_loss'].append(val_loss)
    history['val_mae'].append(val_mae)

    epoch_time = time.time() - epoch_start
    print(f"\n    Época {epoch+1}:")
    print(f"     loss: {train_loss:.4f} - mae: {train_mae:.4f}")
    print(f"     val_loss: {val_loss:.4f} - val_mae: {val_mae:.4f}")
    print(f"     Tiempo: {epoch_time:.1f}s")

    # Early stopping: compare against the best val_loss seen so far (as Keras'
    # EarlyStopping does), not just the previous epoch.
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        epochs_without_improvement = 0
    elif epoch >= EARLY_STOP_FROM:
        epochs_without_improvement += 1
        if epochs_without_improvement >= PATIENCE:
            print(f"\n   Early stopping")
            break

training_time = time.time() - start_time
print("\n" + "="*80)
print("  Entrenamiento completo")
print("="*80)
print(f"  Tiempo: {training_time/60:.2f} minutos")
print(f"  Épocas: {len(history['loss'])}")
if history['val_loss']:
    print(f"  Mejor val_loss: {min(history['val_loss']):.4f}")
else:
    # Every epoch failed or was skipped; min() of an empty list would raise ValueError.
    print("  Mejor val_loss: n/a (ninguna época completada)")

Entrenamiento.

   Configuración:
   Épocas: 15
   Batches/época: 400
   Batch size: 4096

  Iniciando el entrenamiento


Época 1/15
------------------------------------------------------------
  Batch 100/400 - loss: 448.2045
  Batch 200/400 - loss: 254.0927
  Batch 300/400 - loss: 179.6310
  Batch 400/400 - loss: 142.6634

    Época 1:
     loss: 303.2677 - mae: 11.5501
     val_loss: 137.1804 - val_mae: 6.8106
     Tiempo: 12.2s

Época 2/15
------------------------------------------------------------
  Batch 100/400 - loss: 117.0829
  Batch 200/400 - loss: 103.5555
  Batch 300/400 - loss: 93.6439
  Batch 400/400 - loss: 86.1382

    Época 2:
     loss: 104.8514 - mae: 5.6784
     val_loss: 84.7104 - val_mae: 4.9617
     Tiempo: 10.8s

Época 3/15
------------------------------------------------------------
  Batch 100/400 - loss: 79.0901
  Batch 200/400 - loss: 74.6411
  Batch 300/400 - loss: 70.8400
  Batch 400/400 - loss: 67.6158

    Época 3:
     loss: 74.6835 - mae: 4.5934
     

In [14]:
#----------------------Evaluación-------------------------------------------------
print("\nEvaluando modelo...")

# About 100 batches x 4096 rows sampled from test_df; the fixed seed makes the sample
# repeatable for a given train/test split.
eval_generator = DataFrameBatchGenerator(
    test_df,
    batch_size=4096,
    num_batches_per_epoch=100,
    feature_cols=FEATURE_COLS
)

pred_chunks, actual_chunks = [], []
for X_test, y_test_batch in eval_generator.generate_batches(seed=99):
    # predict_on_batch runs a single forward pass over the whole chunk, avoiding the
    # per-call data-pipeline setup that predict() does.
    pred_chunks.append(np.asarray(model.predict_on_batch(X_test)).reshape(-1))
    actual_chunks.append(np.asarray(y_test_batch).reshape(-1))

if not pred_chunks:
    raise ValueError("El generador de evaluación no produjo batches: test_df está vacío.")

# Concatenate once instead of growing Python lists element by element; float64 matches
# what the previous list-based accumulation produced.
y_test_eval = np.concatenate(actual_chunks).astype(np.float64)
y_pred_eval = np.concatenate(pred_chunks).astype(np.float64)

mse  = mean_squared_error(y_test_eval, y_pred_eval)
rmse = np.sqrt(mse)
mae  = mean_absolute_error(y_test_eval, y_pred_eval)
r2   = r2_score(y_test_eval, y_pred_eval)
# MAPE is undefined for a zero true value. Cleaning keeps fare_amount > 0, but mask
# anyway so a zero label cannot turn the metric into inf.
nonzero = y_test_eval != 0
mape = np.mean(np.abs((y_test_eval[nonzero] - y_pred_eval[nonzero]) / y_test_eval[nonzero])) * 100

print("Resultados")
print(f"\n  R²:   {r2:.4f} ({r2*100:.1f}%)")
print(f"  RMSE: ${rmse:.4f}")
print(f"  MAE:  ${mae:.4f}")
print(f"  MAPE: {mape:.2f}%")
print(f"\n  Evaluado en {len(y_test_eval):,} predicciones")


Evaluando modelo...
Resultados

  R²:   0.8911 (89.1%)
  RMSE: $5.4791
  MAE:  $2.5810
  MAPE: 17.72%

  Evaluado en 409,431 predicciones


In [15]:
#----------------------Guardar----------------------------------------------------
# Saving is opt-in so a full re-run does not write files: set SAVE_MODEL = True to
# store the trained model under ./modelos with a timestamped name.
SAVE_MODEL = False

if SAVE_MODEL:
    os.makedirs("modelos", exist_ok=True)
    model_path = f"modelos/taxi_DataFrame_{datetime.now().strftime('%Y%m%d_%H%M%S')}.h5"
    model.save(model_path)
    print(f"\n✓ Modelo guardado: {model_path}")

Cambios: 

| Paso | Versión RDD | Versión DataFrame |
|------|-------------|------------------|
| Selección de columnas | `.select(...).rdd.map(lambda row: ...)` | `.select("col1", "col2", ...)` |
| Filtrado / limpieza | `lambda row: if ... return None` | `.filter(F.col(...).isNotNull() & ...)` |
| Extracción de hora | `datetime.hour` en Python | `F.hour(F.col("tpep_pickup_datetime"))` en JVM |
| Día de la semana | `datetime.weekday()` en Python | `F.dayofweek(...)` en JVM |
| Normalización | Operaciones Python por fila | Expresiones algebraicas sobre columnas (JVM) |
| Cache | `.cache()` sobre RDD | `.cache()` sobre DataFrame (mismo API) |
| Persistencia | `StorageLevel.MEMORY_AND_DISK` | `StorageLevel.MEMORY_AND_DISK` (igual) |
| Generador de batches | `toLocalIterator()` fila a fila | `sample() + toPandas()` una vez por época, cortado en batches en memoria |
| Entrenamiento Keras | `train_on_batch()` igual | `train_on_batch()` igual |
