# Big Data - Proyecto MLLib
# Preprocesamiento de Datos
# Inicializar Spark

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import *
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
import numpy as np

spark = SparkSession.builder \
    .appName("Hotel Booking Cancellation - Data Preprocessing") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

print("=== CARGANDO DATOS ===")
df = spark.read.csv("train.csv", header=True, inferSchema=True)
print(f"Datos cargados: {df.count()} registros, {len(df.columns)} columnas")

25/06/01 16:12:00 WARN Utils: Your hostname, jgasbul-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/06/01 16:12:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/01 16:12:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


=== CARGANDO DATOS ===


                                                                                

Datos cargados: 55531 registros, 23 columnas


# Definir columnas por tipo

In [2]:
numeric_cols = ["lead_time", "arrival_date_week_number", "stays_in_weekend_nights", 
               "stays_in_week_nights", "adults", "children", "babies", 
               "previous_cancellations", "previous_bookings_not_canceled", 
               "booking_changes", "days_in_waiting_list", "adr", 
               "required_car_parking_spaces", "total_of_special_requests"]

categorical_cols = ["meal", "country", "market_segment", "distribution_channel", 
                   "reserved_room_type", "deposit_type", "customer_type"]

boolean_cols = ["is_repeated_guest"]
target_col = "is_canceled"

print(f"Columnas numéricas: {len(numeric_cols)}")
print(f"Columnas categóricas: {len(categorical_cols)}")
print(f"Columnas booleanas: {len(boolean_cols)}")

Columnas numéricas: 14
Columnas categóricas: 7
Columnas booleanas: 1


# 1. TRATAMIENTO DE VALORES NULOS

In [3]:
print("\n=== TRATAMIENTO DE VALORES NULOS ===")

# Imputar valores nulos en 'children' con la mediana
children_median = df.approxQuantile("children", [0.5], 0.01)[0]
print(f"Mediana de 'children': {children_median}")

df = df.fillna({"children": children_median})

# Verificar que no quedan nulos
null_counts = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])
print("Valores nulos después de imputación:")
null_counts.show()


=== TRATAMIENTO DE VALORES NULOS ===
Mediana de 'children': 0.0
Valores nulos después de imputación:
+---------+------------------------+-----------------------+--------------------+------+--------+------+----+-------+--------------+--------------------+-----------------+----------------------+------------------------------+------------------+---------------+------------+--------------------+-------------+---+---------------------------+-------------------------+-----------+
|lead_time|arrival_date_week_number|stays_in_weekend_nights|stays_in_week_nights|adults|children|babies|meal|country|market_segment|distribution_channel|is_repeated_guest|previous_cancellations|previous_bookings_not_canceled|reserved_room_type|booking_changes|deposit_type|days_in_waiting_list|customer_type|adr|required_car_parking_spaces|total_of_special_requests|is_canceled|
+---------+------------------------+-----------------------+--------------------+------+--------+------+----+-------+--------------+--------

# 2. INGENIERÍA DE CARACTERÍSTICAS

In [4]:
print("\n=== INGENIERÍA DE CARACTERÍSTICAS ===")

# Crear nuevas características derivadas
df = df.withColumn("total_nights", col("stays_in_weekend_nights") + col("stays_in_week_nights")) \
       .withColumn("total_guests", col("adults") + col("children") + col("babies")) \
       .withColumn("adr_per_person", when(col("total_guests") > 0, col("adr") / col("total_guests")).otherwise(col("adr"))) \
       .withColumn("has_special_requests", when(col("total_of_special_requests") > 0, 1).otherwise(0)) \
       .withColumn("has_previous_cancellations", when(col("previous_cancellations") > 0, 1).otherwise(0)) \
       .withColumn("booking_changes_flag", when(col("booking_changes") > 0, 1).otherwise(0))

# Actualizar lista de columnas numéricas
numeric_cols_extended = numeric_cols + ["total_nights", "total_guests", "adr_per_person"]
binary_derived_cols = ["has_special_requests", "has_previous_cancellations", "booking_changes_flag"]

print("Nuevas características creadas:")
print("- total_nights: suma de noches de fin de semana y entre semana")
print("- total_guests: suma de adultos, niños y bebés")
print("- adr_per_person: ADR por persona")
print("- has_special_requests: flag si tiene peticiones especiales")
print("- has_previous_cancellations: flag si tiene cancelaciones previas")
print("- booking_changes_flag: flag si ha hecho cambios en la reserva")


=== INGENIERÍA DE CARACTERÍSTICAS ===
Nuevas características creadas:
- total_nights: suma de noches de fin de semana y entre semana
- total_guests: suma de adultos, niños y bebés
- adr_per_person: ADR por persona
- has_special_requests: flag si tiene peticiones especiales
- has_previous_cancellations: flag si tiene cancelaciones previas
- booking_changes_flag: flag si ha hecho cambios en la reserva


# 3. TRANSFORMACIONES PARA VARIABLES ASIMÉTRICAS

In [5]:
print("\n=== TRANSFORMACIONES PARA VARIABLES ASIMÉTRICAS ===")

# Aplicar transformación logarítmica a variables asimétricas
skewed_cols = ["lead_time", "adr", "total_of_special_requests"]

for col_name in skewed_cols:
    df = df.withColumn(f"{col_name}_log", 
                      when(col(col_name) > 0, log(col(col_name) + 1)).otherwise(0))

log_transformed_cols = [f"{col}_log" for col in skewed_cols]
print(f"Transformaciones logarítmicas aplicadas a: {skewed_cols}")


=== TRANSFORMACIONES PARA VARIABLES ASIMÉTRICAS ===
Transformaciones logarítmicas aplicadas a: ['lead_time', 'adr', 'total_of_special_requests']


# 4. DISCRETIZACIÓN DE VARIABLES CONTINUAS

In [6]:
print("\n=== DISCRETIZACIÓN DE VARIABLES CONTINUAS ===")

# Discretizar ADR en rangos
adr_bucketizer = Bucketizer(
    splits=[-float('inf'), 50, 100, 150, 200, float('inf')],
    inputCol="adr",
    outputCol="adr_bucket"
)

# Discretizar lead_time
lead_time_bucketizer = Bucketizer(
    splits=[-float('inf'), 7, 30, 90, 365, float('inf')],
    inputCol="lead_time", 
    outputCol="lead_time_bucket"
)

discretized_cols = ["adr_bucket", "lead_time_bucket"]


=== DISCRETIZACIÓN DE VARIABLES CONTINUAS ===


# 5. PIPELINE DE TRANSFORMACIONES

In [7]:
print("\n=== CONSTRUYENDO PIPELINE DE TRANSFORMACIONES ===")

# Imputadores para valores faltantes (por si acaso)
imputers = []
for col_name in numeric_cols_extended:
    imputer = Imputer(
        inputCols=[col_name],
        outputCols=[f"{col_name}_imputed"],
        strategy="median"
    )
    imputers.append(imputer)

# StringIndexers para variables categóricas
string_indexers = []
for col_name in categorical_cols:
    indexer = StringIndexer(
        inputCol=col_name,
        outputCol=f"{col_name}_indexed",
        handleInvalid="keep"
    )
    string_indexers.append(indexer)

# OneHotEncoders para variables categóricas
one_hot_encoders = []
indexed_categorical_cols = [f"{col}_indexed" for col in categorical_cols]
encoded_categorical_cols = [f"{col}_encoded" for col in categorical_cols]

for i, col_name in enumerate(indexed_categorical_cols):
    encoder = OneHotEncoder(
        inputCols=[col_name],
        outputCols=[encoded_categorical_cols[i]],
        handleInvalid="keep"
    )
    one_hot_encoders.append(encoder)

# Escaladores para variables numéricas
scaler_cols = [f"{col}_imputed" for col in numeric_cols_extended] + log_transformed_cols
scaled_cols = [f"{col}_scaled" for col in scaler_cols]

# Usar StandardScaler
assembler_for_scaling = VectorAssembler(
    inputCols=scaler_cols,
    outputCol="features_to_scale"
)

scaler = StandardScaler(
    inputCol="features_to_scale",
    outputCol="scaled_features",
    withStd=True,
    withMean=True
)

# Separar características escaladas
feature_slicer = VectorSlicer(
    inputCol="scaled_features",
    outputCol="numeric_features_scaled",
    indices=list(range(len(scaler_cols)))
)

# Ensamblar todas las características finales
all_feature_cols = ["numeric_features_scaled"] + encoded_categorical_cols + boolean_cols + binary_derived_cols + discretized_cols

final_assembler = VectorAssembler(
    inputCols=all_feature_cols,
    outputCol="features"
)

# Construir pipeline completo
def create_preprocessing_pipeline():
    stages = []
    
    # Bucketizers
    stages.extend([adr_bucketizer, lead_time_bucketizer])
    
    # Imputadores
    stages.extend(imputers)
    
    # String indexers
    stages.extend(string_indexers)
    
    # One hot encoders
    stages.extend(one_hot_encoders)
    
    # Escalado
    stages.extend([assembler_for_scaling, scaler, feature_slicer])
    
    # Ensamblador final
    stages.append(final_assembler)
    
    return Pipeline(stages=stages)

preprocessing_pipeline = create_preprocessing_pipeline()

print(f"Pipeline construido con {len(preprocessing_pipeline.getStages())} etapas")


=== CONSTRUYENDO PIPELINE DE TRANSFORMACIONES ===
Pipeline construido con 37 etapas


# 6. APLICAR TRANSFORMACIONES

In [8]:
print("\n=== APLICANDO TRANSFORMACIONES ===")

# Fit del pipeline
fitted_pipeline = preprocessing_pipeline.fit(df)

# Transform
df_transformed = fitted_pipeline.transform(df)

print("Transformaciones aplicadas correctamente")
print(f"Dimensiones del vector de características: {len(df_transformed.select('features').first()[0])}")


=== APLICANDO TRANSFORMACIONES ===


25/06/01 16:12:16 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Transformaciones aplicadas correctamente
Dimensiones del vector de características: 224


# 7. DIVISIÓN TRAIN/VALIDATION

In [9]:
print("\n=== DIVISIÓN TRAIN/VALIDATION ===")

# Dividir en train y validation
train_df, val_df = df_transformed.randomSplit([0.8, 0.2], seed=42)

print(f"Conjunto de entrenamiento: {train_df.count()} registros")
print(f"Conjunto de validación: {val_df.count()} registros")

# Verificar distribución de clases
print("\nDistribución de clases en entrenamiento:")
train_df.groupBy(target_col).count().show()

print("Distribución de clases en validación:")
val_df.groupBy(target_col).count().show()


=== DIVISIÓN TRAIN/VALIDATION ===


                                                                                

Conjunto de entrenamiento: 44413 registros




Conjunto de validación: 11118 registros

Distribución de clases en entrenamiento:


                                                                                

+-----------+-----+
|is_canceled|count|
+-----------+-----+
|          1|18486|
|          0|25927|
+-----------+-----+

Distribución de clases en validación:




+-----------+-----+
|is_canceled|count|
+-----------+-----+
|          1| 4627|
|          0| 6491|
+-----------+-----+



                                                                                

# 8. GUARDAR DATOS PREPROCESADOS

In [10]:
print("\n=== GUARDANDO DATOS PREPROCESADOS ===")

# Guardar pipeline fitted para reutilización
fitted_pipeline.write().overwrite().save("preprocessing_pipeline")

# Guardar datos transformados
train_df.select("features", target_col).write.mode("overwrite").parquet("train_processed")
val_df.select("features", target_col).write.mode("overwrite").parquet("val_processed")

print("Datos preprocesados guardados:")
print("- preprocessing_pipeline/: Pipeline de transformaciones")
print("- train_processed/: Datos de entrenamiento procesados")
print("- val_processed/: Datos de validación procesados")

# 9. RESUMEN DE TRANSFORMACIONES
print("\n=== RESUMEN DE TRANSFORMACIONES APLICADAS ===")
print("1. Imputación de valores nulos en 'children' con mediana")
print("2. Creación de características derivadas:")
print("   - total_nights, total_guests, adr_per_person")
print("   - has_special_requests, has_previous_cancellations, booking_changes_flag")
print("3. Transformaciones logarítmicas para variables asimétricas")
print("4. Discretización de ADR y lead_time en buckets")
print("5. Encoding de variables categóricas con StringIndexer + OneHotEncoder")
print("6. Normalización de variables numéricas con StandardScaler")
print("7. Ensamblado final de todas las características")
print(f"8. Dimensión final del vector de características: {len(df_transformed.select('features').first()[0])}")

spark.stop()


=== GUARDANDO DATOS PREPROCESADOS ===


                                                                                

Datos preprocesados guardados:
- preprocessing_pipeline/: Pipeline de transformaciones
- train_processed/: Datos de entrenamiento procesados
- val_processed/: Datos de validación procesados

=== RESUMEN DE TRANSFORMACIONES APLICADAS ===
1. Imputación de valores nulos en 'children' con mediana
2. Creación de características derivadas:
   - total_nights, total_guests, adr_per_person
   - has_special_requests, has_previous_cancellations, booking_changes_flag
3. Transformaciones logarítmicas para variables asimétricas
4. Discretización de ADR y lead_time en buckets
5. Encoding de variables categóricas con StringIndexer + OneHotEncoder
6. Normalización de variables numéricas con StandardScaler
7. Ensamblado final de todas las características
8. Dimensión final del vector de características: 224
