# Preprocessing pipeline

Procesamos los escenarios de test

In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from preprocess import pipeline, meteo, depurador
import tensorflow as tf
import os

# --- Input / output locations used throughout the notebook ---
RESULT_PATH = "data/final_scenarios/answers_empty.csv"
MODEL_PATH = "models/modeloTransformer3.keras"
# NOTE(review): this path starts with "dato/" while every other path uses "data/" — confirm it is not a typo.
METEO_PATH = "dato/completo/open-meteo-40.53N3.56W602m-2.csv"
DIR = "data/final_scenarios/clean"

# --- Local Spark session ---
print("Configurando Spark Session...")
_warehouse_dir = os.path.abspath('spark-warehouse')
_derby_home = os.path.abspath('derby_metastore_db')
spark = (
    SparkSession.builder
    .appName("PrediccionTiempoEsperaAvionNotebook")
    .config("spark.master", "local[*]")
    .config("spark.hadoop.fs.defaultFS", "file:///")
    .config("spark.sql.warehouse.dir", f"file:///{_warehouse_dir}")
    .config("spark.driver.extraJavaOptions", f"-Dderby.system.home={_derby_home}")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)

# --- Results table that the prediction loop fills in ---
df_results = pd.read_csv(RESULT_PATH)
df_results

2025-05-07 23:22:49.905514: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-05-07 23:22:49.918253: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2025-05-07 23:22:49.933761: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2025-05-07 23:22:49.938177: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-05-07 23:22:49.949891: I tensorflow/core/platform/cpu_feature_guar

Configurando Spark Session...


25/05/07 23:22:52 WARN Utils: Your hostname, bryanSpace resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/05/07 23:22:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Unnamed: 0,scenario_name,icao,holding_point,runway,time_to_takeoff_s
0,scenario_001.parquet,34324e,K2,14L/32R,-9.0
1,scenario_002.parquet,4bb26b,K1,14L/32R,-5.0
2,scenario_003.parquet,3451d8,K3,14L/32R,-9.0
3,scenario_004.parquet,3420ca,K2,14L/32R,
4,scenario_005.parquet,344416,Z4,18R/36L,
...,...,...,...,...,...
188,scenario_189.parquet,3445d4,Z4,18R/36L,
189,scenario_190.parquet,4d251c,Z2,18R/36L,
190,scenario_191.parquet,34628d,Z1,18R/36L,
191,scenario_192.parquet,342107,Z4,18R/36L,


In [2]:
# Build the preprocessing pipeline and the "depurador" helper.
#
# The modules are imported here under explicit aliases because the instances are
# bound to the names `pipeline` and `depurador`, which are also the module names.
# Without the aliases, re-running this cell would call `.Pipeline` / `.Depurador`
# on the already-created instances and fail with AttributeError.
from preprocess import pipeline as pipeline_module, depurador as depurador_module

# Preprocessing pipeline bound to the notebook's Spark session
pipeline = pipeline_module.Pipeline(spark_session=spark)

# Cleaning helper ("depurador")
depurador = depurador_module.Depurador()

In [3]:
def add_meteo_data(spark_df, meteo_csv_path):
    """Join hourly weather data onto a Spark DataFrame.

    The weather CSV is read with pandas (skipping its first two lines), converted
    to a Spark DataFrame through the module-level ``spark`` session, and joined
    to ``spark_df`` on the hour obtained by truncating ``spark_df['timestamp']``.

    Args:
        spark_df: Spark DataFrame that must contain a ``timestamp`` column.
        meteo_csv_path: Path to the weather CSV; it must contain a ``time`` column.

    Returns:
        The joined DataFrame without the auxiliary hour columns. The join is an
        inner join, so rows whose hour has no weather record are dropped. If
        either required column is missing, an error is printed and ``spark_df``
        is returned unchanged.
    """
    # Imported locally so the function works regardless of which notebook cell
    # happens to import these helpers (and in which order the cells are run).
    from pyspark.sql.functions import col, date_trunc, to_timestamp

    print(f"Cargando datos meteorológicos desde: {meteo_csv_path}")

    # Read the CSV with pandas and keep only the first occurrence of each column name.
    pdf_meteo = pd.read_csv(meteo_csv_path, skiprows=2)
    pdf_meteo = pdf_meteo.loc[:, ~pdf_meteo.columns.duplicated()]

    # Validate both inputs before paying for the pandas -> Spark conversion.
    if 'time' not in pdf_meteo.columns:
        print("Error: Columna 'time' no encontrada en DataFrame meteo.")
        return spark_df
    if 'timestamp' not in spark_df.columns:
        print("Error: Columna 'timestamp' no encontrada en DataFrame principal.")
        return spark_df

    # Weather side: parse 'time' into a timestamp join key and drop the raw column.
    df_meteo = (
        spark.createDataFrame(pdf_meteo)
        .withColumn("time_hour_meteo", to_timestamp(col("time")))
        .drop("time")
    )

    # Main side: truncate each row's timestamp to the hour to build its join key.
    spark_df = spark_df.withColumn("time_hour", date_trunc("hour", col("timestamp")))

    # Columns present on both sides would be ambiguous after the join; the weather
    # version wins, so they are dropped from the main DataFrame. Sorted only to
    # make the log message deterministic.
    meteo_cols = set(df_meteo.columns) - {"time_hour_meteo"}
    duplicated_cols = sorted(meteo_cols & set(spark_df.columns))
    if duplicated_cols:
        print(f"Eliminando columnas duplicadas del df principal antes del join: {duplicated_cols}")
        spark_df = spark_df.drop(*duplicated_cols)

    print("Uniendo datos principales con datos meteorológicos (join)...")
    joined_df = spark_df.join(
        df_meteo,
        spark_df["time_hour"] == df_meteo["time_hour_meteo"],
        how="inner",
    )

    # Remove the auxiliary join keys from the result.
    joined_df = joined_df.drop("time_hour", "time_hour_meteo")
    print("Unión completada sin prefijos.")
    return joined_df


In [4]:
import os
# Importa PipelineModel, no Pipeline, para cargar un modelo ajustado
from pyspark.ml import PipelineModel
from pyspark.sql import functions as F
from sklearn.preprocessing import MinMaxScaler as SklearnMinMaxScaler # Importar Sklearn MinMaxScaler
from pyspark.sql.functions import sin, cos, pi, col, lit, when, lower, to_timestamp, date_trunc, row_number, monotonically_increasing_id, log1p

# --- Location of the fitted Spark ML pipeline saved during training ---
FITTED_SPARK_PIPELINE_PATH = "./models/completo/fitted_spark_ml_pipeline_model3/"

print(f"Intentando cargar Pipeline Spark ML ajustado desde: {FITTED_SPARK_PIPELINE_PATH}")

# Start from None so the name is always defined, even when loading fails.
fitted_spark_pipeline = None
try:
    # A fitted pipeline is restored with PipelineModel.load(), not Pipeline.load().
    fitted_spark_pipeline = PipelineModel.load(FITTED_SPARK_PIPELINE_PATH)
except Exception as e:
    print(f"Error al cargar el pipeline Spark ML ajustado desde '{FITTED_SPARK_PIPELINE_PATH}': {e}")
    print("Verifica que la ruta sea correcta y que el modelo se haya guardado usando pipeline_model.save().")
else:
    print("Pipeline Spark ML ajustado cargado exitosamente.")


# --- Función para crear secuencias NumPy ---
def create_sequences_np(data, sequence_length):
    """Stack every contiguous window of ``sequence_length`` rows of ``data``.

    Args:
        data: Array-like indexed along its first axis (a NumPy array in this notebook).
        sequence_length: Number of consecutive rows in each window.

    Returns:
        A NumPy array whose first axis enumerates the windows, taken with a
        stride of one row. When ``data`` has fewer rows than ``sequence_length``
        an empty array of shape ``(0, sequence_length, data.shape[-1])`` is returned.
    """
    window_count = len(data) - sequence_length + 1
    if window_count <= 0:
        # Not enough rows for a single window: empty result with a consistent shape.
        return np.empty((0, sequence_length, data.shape[-1]))

    return np.array([data[start:start + sequence_length] for start in range(window_count)])

# --- Weekday abbreviation -> number (Monday = 0 ... Sunday = 6) ---
weekday_mapping = dict(zip(("mon", "tue", "wed", "thu", "fri", "sat", "sun"), range(7)))

# Build one chained Spark CASE expression over the lower-cased "weekday" column:
# the first entry opens the chain with when(), the rest extend it with .when(),
# and any value that matches no abbreviation maps to null.
mapping_expr = None
for day_str, day_num in weekday_mapping.items():
    is_this_day = lower(col("weekday")) == day_str
    mapping_expr = (
        when(is_this_day, day_num)
        if mapping_expr is None
        else mapping_expr.when(is_this_day, day_num)
    )
mapping_expr = mapping_expr.otherwise(None)


Intentando cargar Pipeline Spark ML ajustado desde: ./models/completo/fitted_spark_ml_pipeline_model3/
Pipeline Spark ML ajustado cargado exitosamente.


In [5]:
from tensorflow.keras.models import load_model
from tensorflow.keras.saving import register_keras_serializable
import tensorflow as tf

@register_keras_serializable()
class PositionalEncoding(tf.keras.layers.Layer):
    """Keras layer that adds a sinusoidal positional encoding to its input.

    The encoding is recomputed on every call from the runtime shape of the input:
    axis 1 is used as the position axis and axis 2 as the channel axis. The sine
    terms (even channels of the angle matrix) and cosine terms (odd channels) are
    concatenated rather than interleaved. The layer has no weights and no
    constructor arguments.
    """

    def call(self, x):
        """Return ``x`` plus the positional encoding, broadcast over axis 0."""
        runtime_shape = tf.shape(x)
        n_positions = runtime_shape[1]
        n_channels = runtime_shape[2]

        # Column vector of positions and row vector of channel indices.
        positions = tf.range(n_positions, dtype=tf.float32)[:, tf.newaxis]
        channels = tf.range(n_channels, dtype=tf.float32)[tf.newaxis, :]

        # Each pair of channels (2k, 2k + 1) shares the same frequency.
        exponents = (2 * (channels // 2)) / tf.cast(n_channels, tf.float32)
        angles = positions * (1 / tf.pow(10000., exponents))

        # Sines of the even columns followed by cosines of the odd columns,
        # then cut back to the channel count.
        encoding = tf.concat(
            [tf.sin(angles[:, 0::2]), tf.cos(angles[:, 1::2])],
            axis=-1,
        )
        encoding = encoding[:, :n_channels]

        return x + encoding[tf.newaxis, :, :]

# Load the trained Keras model; the custom layer has to be supplied by name.
custom_objects = dict(PositionalEncoding=PositionalEncoding)
model = load_model(MODEL_PATH, custom_objects=custom_objects)
print(f"Modelo Keras cargado desde: {MODEL_PATH}")
print(f"Forma de entrada esperada por el modelo Keras: {model.input_shape}")

# The last entry of model.input_shape is the number of features per timestep
# that the Keras model expects; the saved output shows (None, 10, 340).
EXPECTED_NUM_FEATURES = model.input_shape[-1]


I0000 00:00:1746653008.686938   14729 cuda_executor.cc:1001] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
I0000 00:00:1746653008.727230   14729 cuda_executor.cc:1001] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
I0000 00:00:1746653008.727334   14729 cuda_executor.cc:1001] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
I0000 00:00:1746653008.731924   14729 cuda_executor.cc:1001] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
I0000 00:00:1746653008.733164   14729 cuda_executor.cc:1001] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
I0000 00:0

Modelo Keras cargado desde: models/modeloTransformer3.keras
Forma de entrada esperada por el modelo Keras: (None, 10, 340)


In [6]:
import numpy as np
import joblib
import os
from pyspark.sql.functions import col, log1p, sin, lit, cos, pi, monotonically_increasing_id
# Asegúrate de que 'spark', 'pipeline', 'add_meteo_data', 'METEO_PATH',
# 'mapping_expr', 'fitted_spark_pipeline', 'create_sequences_np', 'model',
# 'EXPECTED_NUM_FEATURES', 'df_results', 'RESULT_PATH', 'DIR' están definidos e importados.
# Ejemplo: from despegues.src.preprocess.pipeline import Pipeline (o como se llame tu clase)
#          pipeline = Pipeline()


# --- Load the target scaler that was fitted during training ---
# NOTE(review): joblib.load unpickles the file, so it must only be pointed at trusted artifacts.
try:
    y_scaler_trained = joblib.load('models/completo/y_scaler_trained_FINAL.gz')
except FileNotFoundError:
    # Keep going without a scaler; predictions are then reported in scaled units.
    print("ERROR: No se encontró el archivo 'y_scaler_trained_FINAL.gz'.")
    print("Continuando sin y_scaler, las predicciones no se podrán desescalar correctamente.")
    y_scaler_trained = None
else:
    print("y_scaler del entrenamiento cargado exitosamente.")

# Number of consecutive rows fed to the model as one sequence.
SEQUENCE_LENGTH = 10

# Transformed target column created in the loop as log1p of the raw target.
TARGET_COL = "log_takeoff_time"
# Raw target name as written in the messages below (with a space) ...
RAW_TARGET_COL_NAME_IN_PIPELINE = "takeoff time"
# ... and the name the loop looks for in the pipeline output (with an underscore).
RAW_TARGET_COL_NAME_SPARK = "takeoff_time"

print(f"La columna objetivo (transformada) a predecir es: {TARGET_COL}")
print(f"El pipeline interno espera calcular y luego limpiar '{RAW_TARGET_COL_NAME_IN_PIPELINE}' a '{RAW_TARGET_COL_NAME_SPARK}'.")

# Per-scenario inference loop: for each scenario listed in df_results, load its parquet
# file, run the custom preprocessing pipeline, join the weather data, add cyclic time
# features, apply the fitted Spark ML pipeline, build fixed-length sequences, predict with
# the Keras model and write the last prediction into df_results["time_to_takeoff_s"].
#
# Sentinel values written to "time_to_takeoff_s" when a scenario cannot be predicted:
#   -99 default (not processed)          -1 parquet file not found
#   -2  apply_pipeline returned None     -3 raw target column missing after the pipeline
#   -4  pipeline output is empty         -5 exception inside apply_pipeline
#   -6  weather join empty / None        -7 pandas frame empty or without 'features'
#   -8  feature matrix has 0 rows        -9 NaN/Inf in the model input
#   -10 no predictions after inverse scaling
#   -11 empty scaled predictions when no scaler is available
#   -12 not enough rows to build one sequence
# NaN is written when the prediction itself is NaN or inverse scaling fails.
#
# df_results is assumed to be a pandas DataFrame with a 'scenario_name' column; its own
# index labels are used when writing results back.
# NOTE(review): range(3) only processes the first three rows, although the progress message
# prints len(df_results) as the total — looks like a debugging limit; confirm before a full run.
for idx in range(3):
    # idx is positional; it is mapped to the real index label so that .loc updates the
    # right row even if df_results does not carry the default 0, 1, 2... index.
    current_df_results_index = df_results.index[idx]
    scenario_name_from_df = df_results.loc[current_df_results_index, 'scenario_name']
    
    file_name = scenario_name_from_df # Assumes scenario_name is the name of the .parquet file
    file_path = os.path.join(DIR, file_name)
    
    print(f"\n[{idx + 1}/{len(df_results)}] Procesando escenario: {file_name} (índice df_results: {current_df_results_index})")
    df_results.loc[current_df_results_index, "time_to_takeoff_s"] = -99 # Default value: not processed / error

    if not os.path.exists(file_path):
        print(f"  ERROR CRÍTICO: Archivo no encontrado: {file_path}")
        df_results.loc[current_df_results_index, "time_to_takeoff_s"] = -1 # Error code: file not found
        continue

    # Load the raw scenario data (Spark DataFrame)
    df0_spark = spark.read.parquet(file_path)
    print(f"  --- Esquema de df0_spark para {file_name} (ANTES de apply_pipeline) ---")
    df0_spark.printSchema(level=1)

    # Apply the custom preprocessing pipeline (Spark)
    df1_spark = None
    try:
        print(f"  Llamando a pipeline.apply_pipeline para {file_name}...")
        # Original author's note: this is where an AnalysisException occurs for scenario_002
        # if 'takeoff time' is not generated.
        df1_spark = pipeline.apply_pipeline(df0_spark) 
        
        if df1_spark is None:
            print(f"  ERROR: pipeline.apply_pipeline devolvió None para {file_name}")
            df_results.loc[current_df_results_index, "time_to_takeoff_s"] = -2 # Error code: pipeline returned None
            continue
        
        print(f"  --- Esquema de df1_spark para {file_name} (DESPUÉS de apply_pipeline) ---")
        df1_spark.printSchema(level=1)
        # df1_spark.show(5, truncate=False) # Uncomment to inspect a few rows

        # Check that the renamed column RAW_TARGET_COL_NAME_SPARK ('takeoff_time') is present
        if RAW_TARGET_COL_NAME_SPARK not in df1_spark.columns:
            print(f"  ERROR CRÍTICO: La columna '{RAW_TARGET_COL_NAME_SPARK}' NO está en el DataFrame después de pipeline.apply_pipeline para {file_name}.")
            print(f"  Esto significa que '{RAW_TARGET_COL_NAME_IN_PIPELINE}' no se pudo calcular o renombrar correctamente dentro del pipeline.")
            df_results.loc[current_df_results_index, "time_to_takeoff_s"] = -3 # Error code: target column missing after the pipeline
            continue

        if df1_spark.count() == 0:
            print(f"  ADVERTENCIA: pipeline.apply_pipeline devolvió un DataFrame vacío para {file_name}")
            df_results.loc[current_df_results_index, "time_to_takeoff_s"] = -4 # Error code: pipeline returned an empty DataFrame
            continue

    except Exception as e:
        print(f"  ERROR CRÍTICO durante pipeline.apply_pipeline para {file_name}: {type(e).__name__} - {e}")
        # NOTE(review): traceback is imported but only used by the commented-out line below.
        import traceback
        # traceback.print_exc() # Uncomment for the full traceback if needed
        df_results.loc[current_df_results_index, "time_to_takeoff_s"] = -5 # Error code: exception inside the pipeline
        continue
        
    # Join the hourly weather data onto the preprocessed scenario.
    df2_spark = add_meteo_data(df1_spark, METEO_PATH)
    if df2_spark is None or df2_spark.count() == 0:
        print(f"  ADVERTENCIA: add_meteo_data devolvió un DataFrame vacío o nulo para {file_name}")
        df_results.loc[current_df_results_index, "time_to_takeoff_s"] = -6 # Error code: weather join returned an empty/None DataFrame
        continue

    # --- Create TARGET_COL (log_takeoff_time) for optional evaluation ---
    # Pure inference does not need y_test_raw. If the raw 'takeoff_time' column is
    # present, its log1p version is created so y_test_raw can be extracted later.
    # If it is absent (unexpected when the pipeline produced it), y_test_raw is not created.
    can_create_log_target_for_eval = False
    if RAW_TARGET_COL_NAME_SPARK in df2_spark.columns:
        df2_spark = df2_spark.withColumn(TARGET_COL, log1p(col(RAW_TARGET_COL_NAME_SPARK)))
        can_create_log_target_for_eval = True
        print(f"  Columna '{TARGET_COL}' creada a partir de '{RAW_TARGET_COL_NAME_SPARK}' para evaluación.")
    else:
        # Should not happen if the df1_spark check above passed and add_meteo_data did not drop the column.
        print(f"  ADVERTENCIA: '{RAW_TARGET_COL_NAME_SPARK}' no encontrada en df2_spark (post-meteo) para {file_name}. No se creará '{TARGET_COL}'.")

    # Feature engineering: numeric weekday, cyclic (sin/cos) encodings of "hour" (period 24)
    # and of the numeric weekday (period 7), plus a generated row id column.
    df2_spark = df2_spark.withColumn("weekday_num", mapping_expr)
    df2_spark = df2_spark.withColumn("hour_sin", sin(lit(2) * pi() * col("hour") / lit(24.0)))
    df2_spark = df2_spark.withColumn("hour_cos", cos(lit(2) * pi() * col("hour") / lit(24.0)))
    df2_spark = df2_spark.withColumn("weekday_sin", sin(lit(2) * pi() * col("weekday_num") / lit(7.0)))
    df2_spark = df2_spark.withColumn("weekday_cos", cos(lit(2) * pi() * col("weekday_num") / lit(7.0)))
    df2_spark = df2_spark.withColumn("__index_level_0__", monotonically_increasing_id().cast("long"))

    print(f"  --- Esquema de df2_spark para {file_name} (antes de fitted_spark_pipeline.transform()) ---")
    df2_spark.printSchema(level=1)

    # NOTE(review): fitted_spark_pipeline is set to None when loading it failed; in that case
    # the next line raises AttributeError — confirm whether an explicit guard is wanted.
    df_model_input_spark = fitted_spark_pipeline.transform(df2_spark)
    # NOTE(review): there is no explicit orderBy before toPandas(); the sequences and the
    # "last" prediction below presumably rely on rows being in time order — confirm.
    df_model_input_pandas = df_model_input_spark.toPandas()

    if df_model_input_pandas.empty or 'features' not in df_model_input_pandas.columns:
        print(f"  ADVERTENCIA: DataFrame para modelo (pandas) está vacío o falta 'features' para {file_name}.")
        df_results.loc[current_df_results_index, "time_to_takeoff_s"] = -7 # Error code: pandas DataFrame empty / no 'features' column
        continue

    # Convert each entry of the 'features' column to a plain list and stack them into one array.
    features_list = df_model_input_pandas['features'].apply(lambda x: x.toArray().tolist()).tolist()
    features_np = np.array(features_list)

    y_test_raw = None # For evaluation only; not strictly needed for prediction
    if can_create_log_target_for_eval and TARGET_COL in df_model_input_pandas.columns:
        y_test_raw = df_model_input_pandas[TARGET_COL].values
        print(f"  Extraído y_test_raw (shape: {y_test_raw.shape}) para {file_name}.")
    elif TARGET_COL not in df_model_input_pandas.columns and can_create_log_target_for_eval:
         print(f"  ADVERTENCIA: '{TARGET_COL}' no encontrada en df_model_input_pandas aunque se esperaba. No se puede crear y_test_raw para {file_name}.")
    else:
        print(f"  Info: No se creó '{TARGET_COL}' o no está en pandas DF, y_test_raw no disponible para {file_name}.")

    # A 1-D array is reshaped to a single row; an empty one becomes a (0, 0) array.
    if features_np.ndim == 1:
        features_np = features_np.reshape(1, -1) if features_np.shape[0] > 0 else np.array([]).reshape(0,0) # Handle the empty array

    if features_np.shape[0] == 0:
        print(f"  ADVERTENCIA: features_np tiene 0 filas para {file_name} (antes de ajuste dim).")
        df_results.loc[current_df_results_index, "time_to_takeoff_s"] = -8 # Error code: features_np is empty
        continue
        
    current_num_features = features_np.shape[1]
    print(f"  features_np shape (antes ajuste dim): {features_np.shape}. Contiene NaN: {np.isnan(features_np).any()}, Inf: {np.isinf(features_np).any()}")
    if features_np.size > 0: print(f"    Min: {np.min(features_np):.4f}, Max: {np.max(features_np):.4f}, Mean: {np.mean(features_np):.4f}")

    # Force the feature count to what the Keras model expects: zero-pad on the right when
    # there are too few features, truncate the trailing columns when there are too many.
    if current_num_features != EXPECTED_NUM_FEATURES:
        print(f"  Ajustando características: {current_num_features} -> {EXPECTED_NUM_FEATURES}. Relleno/Truncado.")
        if current_num_features < EXPECTED_NUM_FEATURES:
            padding = np.zeros((features_np.shape[0], EXPECTED_NUM_FEATURES - current_num_features))
            features_np = np.concatenate((features_np, padding), axis=1)
        else:
            features_np = features_np[:, :EXPECTED_NUM_FEATURES]
        print(f"  features_np shape (después ajuste dim): {features_np.shape}. NaN: {np.isnan(features_np).any()}, Inf: {np.isinf(features_np).any()}")
        if features_np.size > 0: print(f"    Min: {np.min(features_np):.4f}, Max: {np.max(features_np):.4f}, Mean: {np.mean(features_np):.4f}")
    
    # Sliding windows of SEQUENCE_LENGTH consecutive rows.
    X_scenario_seq = create_sequences_np(features_np, SEQUENCE_LENGTH)
    
    if X_scenario_seq.shape[0] > 0:
        print(f"  X_scenario_seq shape: {X_scenario_seq.shape}. NaN: {np.isnan(X_scenario_seq).any()}, Inf: {np.isinf(X_scenario_seq).any()}")
        if np.isnan(X_scenario_seq).any() or np.isinf(X_scenario_seq).any():
            print(f"  ADVERTENCIA CRÍTICA: X_scenario_seq para {file_name} contiene NaN/Inf ANTES de model.predict(). ¡Esto causará NaN en la predicción!")
            # Logic to dump X_scenario_seq or features_np for inspection could be added here:
            # np.save(f"debug_X_scenario_seq_{scenario_name_from_df}.npy", X_scenario_seq)
            # np.save(f"debug_features_np_{scenario_name_from_df}.npy", features_np)
            df_results.loc[current_df_results_index, "time_to_takeoff_s"] = -9 # Error: NaN/Inf in the model input
            continue

        scenario_predictions_scaled = model.predict(X_scenario_seq)

        if np.isnan(scenario_predictions_scaled).any():
            print(f"  ADVERTENCIA: Predicciones escaladas del modelo (model.predict) contienen NaN para {file_name}.")
            last_prediction_for_scenario = np.nan 
        elif y_scaler_trained is not None:
            # inverse_transform needs a 2-D array
            if scenario_predictions_scaled.ndim == 1:
                 scenario_predictions_scaled = scenario_predictions_scaled.reshape(-1, 1)
            
            # Check that the prediction width matches what y_scaler_trained expects
            expected_scaler_features = getattr(y_scaler_trained, 'n_features_in_', 1) # Defaults to 1 when the attribute is missing
            if scenario_predictions_scaled.shape[1] != expected_scaler_features:
                 print(f"  ADVERTENCIA: Discrepancia en features para y_scaler_trained. Pred. shape: {scenario_predictions_scaled.shape}, y_scaler espera: {expected_scaler_features}")
                 if scenario_predictions_scaled.shape[1] > 0 : # Fall back to the first column
                    scenario_predictions_scaled = scenario_predictions_scaled[:,0].reshape(-1,1)
                 else: # Cannot be resolved
                    last_prediction_for_scenario = np.nan
                    df_results.loc[current_df_results_index, "time_to_takeoff_s"] = last_prediction_for_scenario
                    continue
            
            try:
                # Undo the target scaling, then undo the log1p transform with expm1.
                scenario_predictions_log = y_scaler_trained.inverse_transform(scenario_predictions_scaled).flatten()
                scenario_predictions_orig = np.expm1(scenario_predictions_log)

                if scenario_predictions_orig.shape[0] > 0:
                    # The prediction of the last sequence is the one reported for the scenario.
                    last_prediction_for_scenario = scenario_predictions_orig[-1]
                    if np.isnan(last_prediction_for_scenario):
                        print(f"  ADVERTENCIA: Predicción final es NaN después de inverse_transform/expm1 para {file_name}.")
                    else:
                        print(f"  PREDICCIÓN para {file_name} (último timestamp): {last_prediction_for_scenario:.2f} segundos")
                else:
                    print(f"  ADVERTENCIA: No se generaron predicciones (orig) después de invertir escalado para {file_name}.")
                    last_prediction_for_scenario = -10 # Specific error code
            except Exception as e_inv_transform:
                print(f"  ERROR durante inverse_transform/expm1 para {file_name}: {e_inv_transform}")
                last_prediction_for_scenario = np.nan

        else: # y_scaler_trained is None
            print(f"  ADVERTENCIA: y_scaler_trained no disponible. Usando predicción escalada como resultado para {file_name}.")
            last_prediction_for_scenario = scenario_predictions_scaled.flatten()[-1] if scenario_predictions_scaled.size > 0 else -11

        df_results.loc[current_df_results_index, "time_to_takeoff_s"] = last_prediction_for_scenario
    else:
        print(f"  ADVERTENCIA: No hay suficientes datos en {file_name} para crear secuencias (X_scenario_seq vacío).")
        df_results.loc[current_df_results_index, "time_to_takeoff_s"] = -12 

# --- Save the final results DataFrame ---
# NOTE(review): this writes back to RESULT_PATH, the same file df_results was read from,
# so the input file is overwritten with predictions and sentinel codes — confirm this is intended.
df_results.to_csv(RESULT_PATH, index=False)
print(f"\nResultados guardados en: {RESULT_PATH}")
df_results.info() # Summary of the results, including non-null counts (NaN predictions show up as nulls)

y_scaler del entrenamiento cargado exitosamente.
La columna objetivo (transformada) a predecir es: log_takeoff_time
El pipeline interno espera calcular y luego limpiar 'takeoff time' a 'takeoff_time'.

[1/193] Procesando escenario: scenario_001.parquet (índice df_results: 0)
  --- Esquema de df0_spark para scenario_001.parquet (ANTES de apply_pipeline) ---
root
 |-- Timestamp (date): timestamp_ntz (nullable = true)
 |-- ICAO: string (nullable = true)
 |-- Downlink Format: long (nullable = true)
 |-- Typecode: long (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- TurbulenceCategory: string (nullable = true)
 |-- Callsign: string (nullable = true)
 |-- Speed: double (nullable = true)
 |-- Altitude (ft): double (nullable = true)
 |-- Flight status: string (nullable = true)

  Llamando a pipeline.apply_pipeline para scenario_001.parquet...
=== INICIO apply_pipeline ===
  Input df_input_raw count: 48028
  DEBUG: Entrando a cleanColumns.
  ADVERTEN

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/home/bryan/pruebas/.venv/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/bryan/pruebas/.venv/lib/python3.11/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/socket.py", line 718, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

# Predictions

Importamos el modelo final de la fase de entrenamiento

Realizamos las predicciones sobre el escenario procesado

Guardamos la predicción correspondiente en el CSV de respuestas

In [1]:
#----------------------------------------------
# Mis pruebecitas
#----------------------------------------------

In [1]:
# Scratch cell: try the flow with a single scenario first, then write the loop.
import pandas as pd

# Project-relative path instead of a user-specific absolute one, so the cell runs
# on any checkout. The preprocessing cell at the top of the notebook reads the
# answers file from this same relative location.
# NOTE(review): assumes the notebook is run from the project's src directory — confirm.
RESULT_PATH = "data/final_scenarios/answers_empty.csv"
# MODEL_PATH is intentionally left unset in this scratch cell.
df_results = pd.read_csv(RESULT_PATH)

In [8]:
# Call getAirplaneCategories on df0 and preview the first five rows of the result.
# Per the saved output, the result has the columns ICAO and TurbulenceCategory.
# NOTE(review): neither getAirplaneCategories nor df0 is defined in any cell visible
# here; this cell presumably relies on state from cells that were removed — confirm,
# otherwise it fails on a fresh kernel.
df2 = getAirplaneCategories(df0)
df2.show(5)

+------+--------------------+
|  ICAO|  TurbulenceCategory|
+------+--------------------+
|ab7376|Heavy (larger tha...|
|344099|Medium 2 (between...|
|3420cc|Medium 2 (between...|
|346303|Medium 2 (between...|
|39ceb0|Medium 2 (between...|
+------+--------------------+
only showing top 5 rows

