In [12]:
#--------------- Libraries ----------------------------------------------
# General-purpose libraries used throughout the notebook
import os
import warnings
import numpy as np
import time
from datetime import datetime
warnings.filterwarnings('ignore')  # silences every Python warning for the rest of the session

# Libraries for the PySpark implementation
# HADOOP_HOME is set before pyspark is imported.
# NOTE(review): presumably this points Spark at a local winutils install on Windows — confirm.
os.environ["HADOOP_HOME"] = "C:\\hadoop"
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Libraries for the Keras implementation
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Layer, Dense, Dropout, BatchNormalization
from tensorflow.keras.optimizers import Adam
#----------------------------------------------------------------------------

In [13]:
#-----------------------SparkSession-----------------------------------------
# Build (or reuse) a local Spark session running on 8 local threads.
# The long network / heartbeat / Python-worker timeouts are set explicitly
# here and echoed by the prints below, so keep both in sync when editing.
spark = (
    SparkSession.builder
    .appName("NYC_Taxi_Spark_DeepLearning")
    .master("local[8]")
    .config("spark.driver.memory", "12g")
    .config("spark.executor.memory", "12g")
    .config("spark.driver.maxResultSize", "8g")
    .config("spark.network.timeout", "800s")
    .config("spark.executor.heartbeatInterval", "60s")
    .config("spark.python.worker.timeout", "600s")
    .config("spark.sql.shuffle.partitions", "16")
    .config("spark.default.parallelism", "16")
    .config("spark.rdd.compress", "true")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

# Keep a handle on the SparkContext and only log errors.
sc = spark.sparkContext
sc.setLogLevel("ERROR")

print(" Sessión de Spark creada")
print("  Network timeout: 800s")
print("  Worker timeout: 600s")

 Sessión de Spark creada
  Network timeout: 800s
  Worker timeout: 600s


In [14]:
#----------------------Load data-----------------------------------------------
# Location of the January 2024 yellow-taxi Parquet file.
# The default is the original author's local path; set the NYC_TAXI_DATA_PATH
# environment variable to run the notebook on another machine without
# editing this cell.
DATA_PATH = os.environ.get(
    "NYC_TAXI_DATA_PATH",
    "C:/Users/PC/Documents/DocumentosGustavo/Github/Maestria/BigData/nyc-taxi-spark/data/yellow/2024/yellow_tripdata_2024-01.parquet",
)

print("\n Cargando dataset")
df = spark.read.parquet(DATA_PATH)
# count() is a Spark action: it triggers a job over the file to report its size.
print(f" Numero de registros: {df.count():,}")
print(f" Columnas: {len(df.columns)}")


 Cargando dataset
 Numero de registros: 2,964,624
 Columnas: 19


In [15]:
# Print each column's name, type and nullability as read from the Parquet file.
print("\nEsquema del dataset:")
df.printSchema()


Esquema del dataset:
root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [16]:
# Display the first five rows as a quick visual sanity check of the loaded data.
print("Primeros 5 registros")
df.show(5)

Primeros 5 registros
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-01-01 00:57:55|  2024-01-01 01:17:43|              1|         1.72|         1|                 N|         186|          79|           2|       17.7|  

Para la implementación de esta tarea se elegirán las siguientes variables:
- trip_distance
- passenger_count
- tpep_pickup_datetime
- fare_amount

In [None]:
#----------------------Feature Engineering----------------------------------------
def extract_and_scale_features(row):
    """Validate one trip record and convert it to a scaled (features, label) pair.

    Args:
        row: 4-item sequence ``(trip_distance, passenger_count,
            pickup_datetime, fare_amount)``. ``pickup_datetime`` must expose
            ``.hour`` and ``.weekday()``.

    Returns:
        ``None`` when the record is rejected, otherwise a tuple
        ``(features, label)`` where ``features`` is a list of four floats
        (scaled distance, passengers, hour of day, day of week) and ``label``
        is the fare as a float.

    Rejection rules (any missing value also rejects the record):
        * trip_distance <= 0 or >= 100  -> invalid or extreme trips
        * passenger_count <= 0 or > 6   -> outside the accepted occupancy range
        * fare_amount <= 0 or >= 200    -> invalid or extreme fares

    Scaling uses fixed, hand-picked centre/scale constants in the form
    z = (x - centre) / scale so all inputs have comparable magnitude and are
    roughly centred on zero before reaching the network.
    """
    distance, passengers, pickup_ts, fare = row

    # Guard clauses, evaluated in the same order as the fields are unpacked.
    if distance is None or distance <= 0 or distance >= 100:
        return None
    if passengers is None or passengers <= 0 or passengers > 6:
        return None
    if pickup_ts is None:
        return None
    if fare is None or fare <= 0 or fare >= 200:
        return None

    hour_value = float(pickup_ts.hour)            # hour of day: 0-23
    day_of_week = float(pickup_ts.weekday() + 1)  # 1 = Monday ... 7 = Sunday

    # (raw value, centre, scale) for each feature, in output order.
    scaling_plan = (
        (distance, 3.0, 5.0),
        (passengers, 1.5, 1.0),
        (hour_value, 12.0, 7.0),
        (day_of_week, 4.0, 2.0),
    )
    scaled = [float((value - centre) / scale) for value, centre, scale in scaling_plan]

    # Tuple (feature_list, label): usable from RDD transformations and easy to
    # stack into NumPy arrays for Keras.
    return (scaled, float(fare))

print("\nProcesando features")
start = time.time()

# Project the four chosen columns, drop to the RDD API and turn every Row
# into a plain tuple. map() is a lazy 1-to-1 transformation: nothing runs
# until an action is called.
rdd_features = (
    df.select("trip_distance", "passenger_count", "tpep_pickup_datetime", "fare_amount")
    .rdd
    .map(lambda row: (
        row.trip_distance, row.passenger_count, row.tpep_pickup_datetime, row.fare_amount
    ))
)

# Clean + scale each record, drop rejected ones (None), rebalance the data
# into 16 partitions and keep the result cached for the following cells.
rdd_scaled = (
    rdd_features
    .map(extract_and_scale_features)
    .filter(lambda x: x is not None)
    .repartition(16)
    .cache()
)

# count() is the action that actually executes the pipeline above.
total_scaled = rdd_scaled.count()
print(f"Registros despues de la limpieza: {total_scaled:,} registros. \n Completado en {time.time()-start:.1f}s.")


Procesando features
Registros despues de la limpieza: 2,722,784 registros. 
 Completado en 58.7s.


In [17]:
#----------------------Train/Test split----------------------------------------
from pyspark import StorageLevel

# Fixed seed so the 80/20 split (and every metric computed from it) is the
# same on each run. Without a seed randomSplit draws a new one every time.
# NOTE(review): reproducibility also relies on rdd_scaled staying cached so
# its partition contents do not change between evaluations.
SPLIT_SEED = 42
train_rdd, test_rdd = rdd_scaled.randomSplit([0.8, 0.2], seed=SPLIT_SEED)  # 80% train / 20% test

# MEMORY_AND_DISK: keep partitions in memory and spill to disk what does not fit.
train_rdd = train_rdd.repartition(16).persist(StorageLevel.MEMORY_AND_DISK)
test_rdd = test_rdd.repartition(8).persist(StorageLevel.MEMORY_AND_DISK)

# count() materialises (and therefore persists) both splits.
train_count = train_rdd.count()
test_count = test_rdd.count()

print(f"\n Train: {train_count:,} registros")
print(f" Test: {test_count:,} registros")


 Train: 2,178,814 registros
 Test: 543,970 registros


Para este trabajo, utilizaremos la siguiente función de costo competitivo:
$$ E_{ck} = \frac{1}{N}\sum_{k=1}^{K} \sum_{x_{n} \in C_{k}} \mu_{kp} \lVert x_{n} - c_{k} \rVert^{2}$$
La arquitectura a trabajar será la siguiente: 
- Input (batch, 4 variables)
- CompetitiveLearningLayer: calcula las distancias cuadradas $\lVert x_{n} - c_{k} \rVert^{2}$ y después las membresías $\mu_{kp} = \mathrm{softmax}(-\text{distancia})$, lo cual suaviza la competición. Salida: (batch, 64 neuronas)
- Capa oculta 1: 32 neuronas con activación relu
- Capa oculta 2: 16 neuronas con activación relu
- Capa oculta 3:  8 neuronas con activación relu
- Capa salida  :  1 neurona con activación lineal.
- Optimizador: Adam
- Función de costo: MSE
- Métrica adicional: MAE

In [None]:
#----------------------Capa de Aprendizaje Competitivo--------------------
class CompetitiveLearningLayer(Layer):
    """Soft competitive-learning layer: maps each input to K cluster memberships.

    The layer owns K trainable cluster centres c_k and, for every input x_n,
    outputs

        mu_k = softmax_k( -||x_n - c_k||^2 )

    so the closest centre (the "winning" unit) gets a membership near 1 and
    the others near 0 — a relaxed, differentiable form of competition.

    Data flow:
        input (batch, n_features) -> squared distances (batch, K)
                                  -> memberships (batch, K)

    Note: the layer only produces the memberships; it does not add the
    competitive cost E_ck as a loss term. The centres are ordinary trainable
    weights, so they are updated by whatever loss the model is compiled with.

    Args:
        n_clusters: number of cluster centres K (also the output width).
        **kwargs: forwarded to ``keras.layers.Layer``.
    """

    def __init__(self, n_clusters, **kwargs):
        super().__init__(**kwargs)
        self.n_clusters = n_clusters

    def build(self, input_shape):
        """Create the (K, n_features) matrix of trainable cluster centres."""
        feature_dim = int(input_shape[-1])
        self.cluster_centers = self.add_weight(
            name='cluster_centers',
            shape=(self.n_clusters, feature_dim),
            initializer='glorot_uniform',
            trainable=True,
        )
        super().build(input_shape)

    def call(self, inputs):
        """Return soft memberships of shape (batch, K) for ``inputs``."""
        # Broadcast (batch, 1, n_features) against (1, K, n_features) to get
        # every input/centre difference in one tensor.
        diffs = tf.expand_dims(inputs, axis=1) - tf.expand_dims(self.cluster_centers, axis=0)

        # Squared Euclidean distance to each centre: (batch, K).
        sq_dist = tf.reduce_sum(tf.square(diffs), axis=-1)

        # Negating the distance makes the nearest centre the largest logit.
        return tf.nn.softmax(-sq_dist, axis=-1)

    def get_config(self):
        """Return the layer config including ``n_clusters`` for serialization."""
        config = super().get_config()
        config['n_clusters'] = self.n_clusters
        return config


#----------------------Tasa de Aprendizaje Variable-----------------------
class LinearDecayLR(tf.keras.optimizers.schedules.LearningRateSchedule):
    """Linearly decaying learning rate: eta(t) = eta_0 * (1 - t / T).

    The rate starts at ``initial_learning_rate`` (eta_0) at step 0 and
    decreases linearly with the optimizer step ``t``. It is clamped from
    below at ``min_learning_rate`` so it never reaches zero or turns negative
    when training runs for more than ``total_steps`` steps.

    Args:
        initial_learning_rate: eta_0, must be > 0.
        total_steps: T, the number of optimizer steps over which the rate
            decays (e.g. epochs * batches_per_epoch), must be > 0.
        min_learning_rate: lower bound for the returned rate (default 1e-7,
            the value previously hard-coded), must be >= 0.

    Raises:
        ValueError: if any argument is outside the ranges above.
    """

    def __init__(self, initial_learning_rate, total_steps, min_learning_rate=1e-7):
        super().__init__()
        # Both values are used as divisors in __call__; reject them early
        # instead of silently producing inf/NaN learning rates.
        if initial_learning_rate <= 0:
            raise ValueError(
                f"initial_learning_rate must be > 0, got {initial_learning_rate}"
            )
        if total_steps <= 0:
            raise ValueError(f"total_steps must be > 0, got {total_steps}")
        if min_learning_rate < 0:
            raise ValueError(
                f"min_learning_rate must be >= 0, got {min_learning_rate}"
            )
        self.initial_learning_rate = initial_learning_rate
        self.total_steps = total_steps
        self.min_learning_rate = min_learning_rate

    def __call__(self, step):
        """Return the learning rate for optimizer step ``step``."""
        step = tf.cast(step, tf.float32)
        total_steps = tf.cast(self.total_steps, tf.float32)
        decay_factor = 1.0 - (step / total_steps)
        # Express the floor as a factor of eta_0 so that the product below
        # never drops under min_learning_rate.
        min_factor = self.min_learning_rate / self.initial_learning_rate
        decay_factor = tf.maximum(decay_factor, min_factor)
        return self.initial_learning_rate * decay_factor

    def get_config(self):
        """Return the constructor arguments so the schedule can be re-created."""
        return {
            'initial_learning_rate': self.initial_learning_rate,
            'total_steps': self.total_steps,
            'min_learning_rate': self.min_learning_rate,
        }


#----------------------Modelo con Aprendizaje Competitivo-----------------
def create_model(epochs=15, batches_per_epoch=400, initial_lr=0.001):
    """Build and compile the regression network with a competitive first layer.

    Architecture (input: 4 features):
        CompetitiveLearningLayer(64) -> BatchNorm -> Dropout(0.2)
        Dense(32, relu)              -> BatchNorm -> Dropout(0.2)
        Dense(16, relu) -> Dense(8, relu) -> Dense(1, linear)

    Per the original notes, the competitive layer takes the place of an
    initial Dense(64, relu) layer; the rest of the stack, the MSE loss and
    the MAE metric are kept from the earlier version of the model.

    The model is compiled with Adam and a linearly decaying learning rate
    eta(t) = eta_0 * (1 - t / T) with T = epochs * batches_per_epoch.

    Args:
        epochs: number of training epochs, used only to compute T.
        batches_per_epoch: optimizer steps per epoch, used only to compute T.
        initial_lr: eta_0, the learning rate at step 0 (default 0.001).

    Returns:
        The compiled ``Sequential`` model.
    """
    # Total number of optimizer steps T over which the rate decays.
    total_steps = epochs * batches_per_epoch

    lr_schedule = LinearDecayLR(
        initial_learning_rate=initial_lr,
        total_steps=total_steps
    )

    model = Sequential([
        # --- Layer 1: competitive learning with K=64 cluster centres ---
        # Trainable parameters: 64 * 4 = 256 (the centres c_k).
        # Output: 64 softmax memberships per sample (each in [0, 1], sum = 1).
        CompetitiveLearningLayer(64, input_shape=(4,)),
        BatchNormalization(),
        Dropout(0.2),

        # --- Regression head ---
        Dense(32, activation='relu'),
        BatchNormalization(),
        Dropout(0.2),
        Dense(16, activation='relu'),
        Dense(8, activation='relu'),
        Dense(1, activation='linear')
    ])

    # Loss: MSE, extra metric: MAE, optimizer: Adam with the variable rate.
    model.compile(
        optimizer=Adam(learning_rate=lr_schedule),
        loss='mse',
        metrics=['mae']
    )

    print("\n Configuración de Learning Rate:")
    print("   Fórmula: η(t) = η₀(1 - t/T)")
    print(f"   η₀ (inicial): {initial_lr}")
    print(f"   T (total steps): {total_steps} = {epochs} épocas × {batches_per_epoch} batches")
    print(f"   Decaimiento: lineal de {initial_lr} → ~0")

    return model

# Create the model with the training parameters.
# EPOCHS and BATCHES_PER_EPOCH fix the learning-rate schedule horizon
# (T = EPOCHS * BATCHES_PER_EPOCH), so the training loop must run with the
# same values for the decay to reach ~0 at the end of training.
EPOCHS = 15
BATCHES_PER_EPOCH = 400
model = create_model(epochs=EPOCHS, batches_per_epoch=BATCHES_PER_EPOCH)

print("\n Modelo con Aprendizaje Competitivo y LR Variable creado")
model.summary()


 Configuración de Learning Rate:
   Fórmula: η(t) = η₀(1 - t/T)
   η₀ (inicial): 0.001
   T (total steps): 6000 = 15 épocas × 400 batches
   Decaimiento: lineal de 0.001 → ~0

 Modelo con Aprendizaje Competitivo y LR Variable creado


In [19]:
#----------------------Generador de batches--------------------------------
class RobustRDDBatchGenerator:
    """Stream mini-batches from an RDD of ``(features, label)`` records.

    Each call to :meth:`generate_batches` draws a random sample of the RDD
    sized for one epoch and walks it with ``toLocalIterator()``, so records
    are pulled to the driver incrementally instead of through a single large
    ``collect()``.

    Args:
        rdd: object exposing ``count()``, ``sample(withReplacement, fraction,
            seed=...)`` and ``toLocalIterator()`` (a PySpark RDD), whose
            items are ``(feature_list, label)`` pairs.
        batch_size: number of records per batch, must be > 0.
        num_batches_per_epoch: maximum batches yielded per epoch. When falsy
            (``None`` or 0) it defaults to ``max(1, total // batch_size)``.

    Raises:
        ValueError: if ``batch_size`` is not positive.
    """

    def __init__(self, rdd, batch_size=4096, num_batches_per_epoch=None):
        if batch_size <= 0:
            raise ValueError(f"batch_size must be > 0, got {batch_size}")

        self.rdd = rdd
        self.batch_size = batch_size
        # One Spark action at construction time; reused for every epoch.
        self.total_samples = rdd.count()

        if num_batches_per_epoch:
            self.num_batches = num_batches_per_epoch
        else:
            self.num_batches = max(1, self.total_samples // batch_size)

    @staticmethod
    def _to_arrays(records):
        """Stack a list of (features, label) records into float32 arrays."""
        features = np.array([rec[0] for rec in records], dtype=np.float32)
        labels = np.array([rec[1] for rec in records], dtype=np.float32)
        return features, labels

    def generate_batches(self, seed=42):
        """Yield up to ``num_batches`` ``(X, y)`` float32 batches for one epoch.

        Args:
            seed: seed passed to ``rdd.sample`` so an epoch can be reproduced.

        Yields:
            ``(X_batch, y_batch)`` NumPy arrays. Every batch holds
            ``batch_size`` records except possibly the last one, which holds
            the remaining records when the sample runs out early.
            Nothing is yielded when the RDD is empty.
        """
        # An empty RDD would make the sampling fraction divide by zero.
        if self.total_samples == 0:
            return

        # Sample (without replacement) roughly the number of records needed
        # for one epoch; the sampled size is approximate, not exact.
        fraction = min(1.0, (self.batch_size * self.num_batches) / self.total_samples)
        sampled_rdd = self.rdd.sample(False, fraction, seed=seed)

        pending = []
        batches_done = 0

        for record in sampled_rdd.toLocalIterator():
            pending.append(record)

            # Emit a batch as soon as it is full.
            if len(pending) >= self.batch_size:
                yield self._to_arrays(pending)

                pending = []
                batches_done += 1

                # Stop once the per-epoch budget has been reached.
                if batches_done >= self.num_batches:
                    break

        # Trailing partial batch, only if the budget was not exhausted.
        if pending and batches_done < self.num_batches:
            yield self._to_arrays(pending)

# Configuration
BATCH_SIZE = 4096
# Reuse the value the learning-rate schedule was built with
# (T = EPOCHS * BATCHES_PER_EPOCH) instead of repeating the literal, so the
# number of optimizer steps per epoch cannot drift away from the schedule.
BATCHES_PER_EPOCH_TRAIN = BATCHES_PER_EPOCH
BATCHES_PER_EPOCH_VAL = 20

train_generator = RobustRDDBatchGenerator(
    train_rdd,
    batch_size=BATCH_SIZE,
    num_batches_per_epoch=BATCHES_PER_EPOCH_TRAIN
)

test_generator = RobustRDDBatchGenerator(
    test_rdd,
    batch_size=BATCH_SIZE,
    num_batches_per_epoch=BATCHES_PER_EPOCH_VAL
)
print(f"  Batch size: {BATCH_SIZE}")
print(f"  Batches/época train: {BATCHES_PER_EPOCH_TRAIN}")
print(f"  Samples/época: {BATCH_SIZE * BATCHES_PER_EPOCH_TRAIN:,}")

  Batch size: 4096
  Batches/época train: 400
  Samples/época: 1,638,400


In [20]:
#----------------------Training----------------------------------------------
print("Entrenamiento.")

# EPOCHS is deliberately not redefined here: it comes from the cell that
# builds the model, where it also sets the learning-rate schedule horizon.
# Changing it only in this cell would leave the schedule out of sync.

print("\n   Configuración:")
print(f"   Épocas: {EPOCHS}")
print(f"   Batches/época: {BATCHES_PER_EPOCH_TRAIN}")
print(f"   Batch size: {BATCH_SIZE}")

history = {'loss': [], 'mae': [], 'val_loss': [], 'val_mae': []}

# Early-stopping state lives in plain local variables (not as attributes on
# the model) so that re-running this cell always starts from zero.
EARLY_STOPPING_PATIENCE = 3
epochs_without_improvement = 0

print("\n  Iniciando el entrenamiento\n")
start_time = time.time()

for epoch in range(EPOCHS):
    epoch_start = time.time()
    print(f"\nÉpoca {epoch+1}/{EPOCHS}")
    print("-" * 60)

    epoch_losses = []
    epoch_maes = []

    # --- Train ---
    batch_count = 0
    try:
        for X_batch, y_batch in train_generator.generate_batches(seed=epoch):
            # Reset metric state so the returned values describe THIS batch.
            # NOTE(review): with Keras 3 train_on_batch/test_on_batch appear
            # to keep accumulating metrics across calls, which would turn the
            # logged loss into a running average over all previous batches
            # and epochs — confirm against the installed Keras version.
            # The call is harmless when metrics are already reset per batch.
            model.reset_metrics()
            metrics = model.train_on_batch(X_batch, y_batch, return_dict=True)
            epoch_losses.append(float(metrics['loss']))
            epoch_maes.append(float(metrics['mae']))
            batch_count += 1

            if batch_count % 100 == 0:
                print(f"  Batch {batch_count}/{BATCHES_PER_EPOCH_TRAIN} - "
                      f"loss: {np.mean(epoch_losses[-20:]):.4f}")
    except Exception as e:
        # Best effort: a failure while streaming batches skips the rest of
        # this epoch (no history entry is recorded) and moves to the next.
        print(f"   Error en batch {batch_count}: {e}")
        print("  Siguiente epoca")
        continue

    if not epoch_losses:
        print("  No se completaron batches, saltando época")
        continue

    train_loss = np.mean(epoch_losses)
    train_mae = np.mean(epoch_maes)

    # --- Validate ---
    val_losses = []
    val_maes = []
    for X_val, y_val in test_generator.generate_batches(seed=epoch):
        model.reset_metrics()  # same reason as in the training loop
        val_metrics = model.test_on_batch(X_val, y_val, return_dict=True)
        val_losses.append(float(val_metrics['loss']))
        val_maes.append(float(val_metrics['mae']))

    # Fall back to the training values when no validation batch was produced.
    val_loss = np.mean(val_losses) if val_losses else train_loss
    val_mae = np.mean(val_maes) if val_maes else train_mae

    history['loss'].append(train_loss)
    history['mae'].append(train_mae)
    history['val_loss'].append(val_loss)
    history['val_mae'].append(val_mae)

    epoch_time = time.time() - epoch_start
    print(f"\n    Época {epoch+1}:")
    print(f"     loss: {train_loss:.4f} - mae: {train_mae:.4f}")
    print(f"     val_loss: {val_loss:.4f} - val_mae: {val_mae:.4f}")
    print(f"     Tiempo: {epoch_time:.1f}s")

    # --- Early stopping ---
    # After the first four epochs, count consecutive epochs whose val_loss is
    # worse than the previous recorded epoch. The length check protects the
    # [-2] lookup when earlier epochs were skipped and left no history entry.
    has_previous = len(history['val_loss']) >= 2
    if epoch > 3 and has_previous and val_loss > history['val_loss'][-2]:
        epochs_without_improvement += 1
        if epochs_without_improvement >= EARLY_STOPPING_PATIENCE:
            print("\n   Early stopping")
            break
    else:
        epochs_without_improvement = 0

training_time = time.time() - start_time

print("\n" + "="*80)
print("✓ Entrenamiento completo")
print("="*80)
print(f"  Tiempo: {training_time/60:.2f} minutos")
print(f"  Épocas: {len(history['loss'])}")
# min() on an empty list raises; this happens when every epoch was skipped.
if history['val_loss']:
    print(f"  Mejor val_loss: {min(history['val_loss']):.4f}")
else:
    print("  Mejor val_loss: n/a (ninguna época se completó)")

Entrenamiento.

   Configuración:
   Épocas: 15
   Batches/época: 400
   Batch size: 4096

  Iniciando el entrenamiento


Época 1/15
------------------------------------------------------------
  Batch 100/400 - loss: 354.8284
  Batch 200/400 - loss: 200.0264
  Batch 300/400 - loss: 145.2214
  Batch 400/400 - loss: 117.4939

    Época 1:
     loss: 252.5377 - mae: 10.2105
     val_loss: 116.9337 - val_mae: 6.3361
     Tiempo: 49.6s

Época 2/15
------------------------------------------------------------
  Batch 100/400 - loss: 103.5262
  Batch 200/400 - loss: 92.3970
  Batch 300/400 - loss: 84.1779
  Batch 400/400 - loss: 77.8937

    Época 2:
     loss: 93.4163 - mae: 5.4792
     val_loss: 76.7775 - val_mae: 4.8446
     Tiempo: 47.3s

Época 3/15
------------------------------------------------------------
  Batch 100/400 - loss: 72.0168
  Batch 200/400 - loss: 68.2198
  Batch 300/400 - loss: 65.0250
  Batch 400/400 - loss: 62.2472

    Época 3:
     loss: 68.2709 - mae: 4.5063
     va

In [21]:
#----------------------Evaluation-------------------------------------------------
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

print("\nEvaluando modelo.")

# Up to 100 batches of 4096 records are sampled from the test split, so the
# metrics below are computed on a sample of it rather than on every record.
eval_generator = RobustRDDBatchGenerator(
    test_rdd,
    batch_size=4096,
    num_batches_per_epoch=100
)

all_predictions = []
all_actuals = []

for X_test, y_test_batch in eval_generator.generate_batches(seed=99):
    # predict_on_batch runs a single forward pass over the array.
    # Calling model.predict() inside a loop re-creates its input pipeline on
    # every call and splits each array into default-size mini-batches, which
    # is much slower when the data already arrives as batches.
    y_pred = model.predict_on_batch(X_test)
    all_predictions.extend(y_pred.flatten().tolist())
    all_actuals.extend(y_test_batch.tolist())

y_test_eval = np.array(all_actuals)
y_pred_eval = np.array(all_predictions)

mse = mean_squared_error(y_test_eval, y_pred_eval)
rmse = np.sqrt(mse)
mae = mean_absolute_error(y_test_eval, y_pred_eval)
r2 = r2_score(y_test_eval, y_pred_eval)
# Dividing by the label is safe here because the cleaning step only keeps
# records with fare_amount > 0.
mape = np.mean(np.abs((y_test_eval - y_pred_eval) / y_test_eval)) * 100

print("Resultados")
print(f"\n  R²:   {r2:.4f} ({r2*100:.1f}%)")
print(f"  RMSE: ${rmse:.4f}")
print(f"  MAE:  ${mae:.4f}")
print(f"  MAPE: {mape:.2f}%")
print(f"\n  Evaluado en {len(y_test_eval):,} predicciones")


Evaluando modelo.
Resultados

  R²:   0.9003 (90.0%)
  RMSE: $5.2321
  MAE:  $2.4113
  MAPE: 16.28%

  Evaluado en 409,193 predicciones
