## Notebook 03: Feature Engineering con Pipelines

**Sección 13 - Spark ML**: Construcción de pipelines end-to-end

**Objetivo**: Aplicar VectorAssembler y construir un pipeline de transformación.

**Conceptos clave**:

  - **Transformer**: Aplica transformaciones (ej: StringIndexer)
  - **Estimator**: Aprende de los datos y genera un modelo
  - **Pipeline**: Encadena múltiples stages secuencialmente

**Actividades:**
  1. Crear StringIndexer para variables categóricas
  2. Aplicar OneHotEncoder
  3. Combinar features con VectorAssembler
  4. Construir y ejecutar Pipeline


In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import (
    StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
)
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, when, isnull

# %%
# Configurar SparkSession
spark = SparkSession.builder \
    .appName("SECOP_FeatureEngineering") \
    .master("local[*]") \
    .config("spark.executor.memory", "8g") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/13 15:05:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
26/02/13 15:05:06 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
# Cargar datos
df = spark.read.parquet("/opt/spark-data/raw/secop_eda.parquet")
from pyspark.sql import functions as F

# Antes de entrenar/transformar (o después, da igual si la columna existe)
df = df.withColumn("valor_del_contrato_log", F.log1p(F.col("valor_del_contrato_num")))
print(f"Registros cargados: {df.count():,}")

# %%
# Explorar columnas disponibles
print("Columnas disponibles:")
for col_name in df.columns:
    print(f"  - {col_name}")


categorical_cols = [
    "departamento",
    "tipo_de_contrato",
    "estado_contrato"
]
numeric_cols = [
    "valor_del_contrato_num"
]


26/02/13 15:05:21 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

Registros cargados: 100,000
Columnas disponibles:
  - referencia_del_contrato
  - valor_del_contrato
  - valor_del_contrato_num
  - departamento
  - tipo_de_contrato
  - fecha_de_firma
  - fecha_de_firma_ts
  - duraci_n_del_contrato
  - proveedor_adjudicado
  - estado_contrato
  - valor_del_contrato_log


### **Seleccionar features categóricas y numéricas**

In [3]:
categorical_cols = [
    "departamento",
    "tipo_de_contrato",
    "estado_contrato"
]
numeric_cols = [
    "valor_del_contrato_log"
]

available_cat = [c for c in categorical_cols if c in df.columns]
available_num = [c for c in numeric_cols if c in df.columns]

print(f"Categóricas seleccionadas: {available_cat}")
print(f"Numéricas seleccionadas: {available_num}")


Categóricas seleccionadas: ['departamento', 'tipo_de_contrato', 'estado_contrato']
Numéricas seleccionadas: ['valor_del_contrato_log']


## **Implementar estrategia de limpieza de datos**

En analisis previos no se encontraron datos faltantes.

In [4]:
# Estrategia: eliminar filas con valores nulos en features seleccionadas

df_clean = df.dropna(subset=available_cat + available_num)

print(f"Registros antes de limpiar: {df.count():,}")
print(f"Registros después de limpiar: {df_clean.count():,}")


Registros antes de limpiar: 100,000
Registros después de limpiar: 100,000


## **Crear VectorAssembler para combinar features**

Se buscara combinar variables numéricas y categóricas codificadas en un solo vector, para que ese vector numérico contenga todas las variables explicativas del modelo.

In [5]:
# StringIndexer para variables categóricas

indexers = [
    StringIndexer(
        inputCol=c,
        outputCol=f"{c}_idx",
        handleInvalid="keep"
    )
    for c in available_cat
]

print("StringIndexers creados:")
for idx in indexers:
    print(f" - {idx.getInputCol()} → {idx.getOutputCol()}")

StringIndexers creados:
 - departamento → departamento_idx
 - tipo_de_contrato → tipo_de_contrato_idx
 - estado_contrato → estado_contrato_idx


In [6]:
# OneHotEncoder para variables categóricas
encoders = [
    OneHotEncoder(
        inputCol=f"{c}_idx",
        outputCol=f"{c}_vec"
    )
    for c in available_cat
]

print("\nOneHotEncoders creados:")
for enc in encoders:
    print(f" - {enc.getInputCol()} → {enc.getOutputCol()}")



OneHotEncoders creados:
 - departamento_idx → departamento_vec
 - tipo_de_contrato_idx → tipo_de_contrato_vec
 - estado_contrato_idx → estado_contrato_vec


In [7]:
# Columnas categóricas codificadas (salida del OneHotEncoder)
encoded_cat_cols = [c + "_vec" for c in available_cat]

# ensamblar
feature_cols = available_num + encoded_cat_cols
print("Columnas que se combinarán en el vector de features:")

for c in feature_cols:
    print(f" - {c}")

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features_raw"
)


Columnas que se combinarán en el vector de features:
 - valor_del_contrato_log
 - departamento_vec
 - tipo_de_contrato_vec
 - estado_contrato_vec


### **Construir Pipeline completo (orden correcto de stages)**

StringIndexer  →  OneHotEncoder  →  VectorAssembler

In [8]:
pipeline_stages = []
pipeline_stages.extend(indexers)
pipeline_stages.extend(encoders)
pipeline_stages.append(assembler)

pipeline = Pipeline(stages=pipeline_stages)

print(f"\nPipeline construido con {len(pipeline_stages)} stages:")
for i, stage in enumerate(pipeline_stages, start=1):
    print(f" Stage {i}: {type(stage).__name__}")

# Entrenar y aplicar el Pipeline
print("\nEntrenando pipeline...")
pipeline_model = pipeline.fit(df_clean)
print("Pipeline entrenado")

df_transformed = pipeline_model.transform(df_clean)
print("Transformación aplicada")

# Verificar el vector final
df_transformed.select("features_raw").printSchema()

sample = df_transformed.select("features_raw").first()[0]
print(f"Dimensión del vector de features: {len(sample)}")




Pipeline construido con 7 stages:
 Stage 1: StringIndexer
 Stage 2: StringIndexer
 Stage 3: StringIndexer
 Stage 4: OneHotEncoder
 Stage 5: OneHotEncoder
 Stage 6: OneHotEncoder
 Stage 7: VectorAssembler

Entrenando pipeline...


                                                                                

Pipeline entrenado
Transformación aplicada
root
 |-- features_raw: vector (nullable = true)

Dimensión del vector de features: 62


### **Calcular dimension total de features post-encoding**

In [9]:
print("CATEGORÍAS POR VARIABLE CATEGÓRICA")

total_ohe_features = 0

for cat in available_cat:
    n_cat = df_clean.select(cat).distinct().count()
    total_ohe_features += n_cat
    print(f"{cat}: {n_cat} categorías únicas")

print(f"\nTotal features categóricas (OneHot): {total_ohe_features}")
print(f"Features numéricas: {len(available_num)}")
print(f"FEATURES TOTALES ESPERADAS: {total_ohe_features + len(available_num)}")

# Validacion contra el vector real
sample = df_transformed.select("features_raw").first()[0]
print(f"\nDimensión real del vector features_raw: {len(sample)}")



CATEGORÍAS POR VARIABLE CATEGÓRICA
departamento: 34 categorías únicas
tipo_de_contrato: 20 categorías únicas
estado_contrato: 7 categorías únicas

Total features categóricas (OneHot): 61
Features numéricas: 1
FEATURES TOTALES ESPERADAS: 62

Dimensión real del vector features_raw: 62


La dimensión final del vector de características se obtuvo sumando una variable numérica continua y las variables categóricas codificadas mediante OneHotEncoding. Cada categoría única genera una dimensión independiente, lo que resultó en un vector final de 62 features, coincidente con el conteo empírico obtenido tras aplicar el pipeline.

### **Analisis de varianza de features**

In [10]:
import numpy as np
import pandas as pd

print("ANÁLISIS DE VARIANZA DE FEATURES")

# Tomamos una muestra para no explotar memoria
sample_df = (
    df_transformed
    .select("features_raw")
    .sample(fraction=0.01, seed=42)
    .limit(1000)
    .toPandas()
)

# Convertir vector Spark → array NumPy
features_matrix = np.array(
    [row["features_raw"].toArray() for _, row in sample_df.iterrows()]
)

print(f"Matriz de features: {features_matrix.shape}")


ANÁLISIS DE VARIANZA DE FEATURES


                                                                                

Matriz de features: (1000, 62)


In [11]:
variances = np.var(features_matrix, axis=0)

# Top 5 features con mayor varianza
top_5_idx = np.argsort(variances)[-5:][::-1]

print("\nTop 5 features con mayor varianza:")
for idx in top_5_idx:
    print(f"Feature {idx}: varianza = {variances[idx]:.4f}")



Top 5 features con mayor varianza:
Feature 0: varianza = 6.1991
Feature 35: varianza = 0.1948
Feature 1: varianza = 0.1699
Feature 55: varianza = 0.1618
Feature 2: varianza = 0.1275


Se realizó un análisis de varianza sobre el vector de características para identificar las variables con mayor dispersión. Las features con mayor varianza corresponden principalmente a variables categóricas con alta frecuencia diferencial y a la variable numérica continua, validando la necesidad de aplicar técnicas de normalización y reducción dimensional en etapas posteriores.

### **Cierre del Pipelines y Feature Engineering: Guardar Dataset**

In [12]:
output_path = "/opt/spark-data/raw/secop_features_1.parquet"

df_transformed.write \
    .mode("overwrite") \
    .parquet(output_path)

print(f"Dataset con features guardado en: {output_path}")
print(f"✓ Registros: {df_transformed.count():,}")

                                                                                

Dataset con features guardado en: /opt/spark-data/raw/secop_features_1.parquet
✓ Registros: 100,000


In [13]:
spark.stop()