In [3]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import (
    StringIndexer, OneHotEncoder, VectorAssembler
)
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

# 1. Configurar SparkSession
spark = SparkSession.builder \
    .appName("SECOP_FeatureEngineering") \
    .master("spark://spark-master:7077") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

# 2. Cargar datos desde la ruta del Notebook 01
path_input = "/opt/spark-data/raw/secop_contratos.parquet"
df = spark.read.parquet(path_input)
print(f"Registros iniciales: {df.count():,}")

# --- SOLUCIÓN AL ERROR DE TIPO ---
# Convertimos el valor del contrato de Texto a Número (Double)
# Esto es obligatorio para que el modelo pueda hacer cálculos matemáticos
df = df.withColumn("valor_del_contrato", col("valor_del_contrato").cast("double"))

# 3. Limpieza de datos (Reto 2)
# Filtramos departamentos, tipo, estado y valor para que no tengan nulos ni ceros
categorical_cols = ["departamento", "tipo_de_contrato", "estado_contrato"]
df_clean = df.dropna(subset=categorical_cols + ["valor_del_contrato"]) \
             .filter(col("valor_del_contrato") > 0)

print(f"Registros después de limpieza: {df_clean.count():,}")

# 4. Construcción del Pipeline de IA
# Paso A: StringIndexer (Letras a números de índice)
indexers = [
    StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep") 
    for c in categorical_cols
]

# Paso B: OneHotEncoder (Índices a vectores binarios)
encoders = [
    OneHotEncoder(inputCol=f"{c}_idx", outputCol=f"{c}_vec") 
    for c in categorical_cols
]

# Paso C: VectorAssembler (Unir todo en una sola columna 'features')
# Usamos las versiones _vec y el valor numérico
assembler = VectorAssembler(
    inputCols=[f"{c}_vec" for c in categorical_cols] + ["valor_del_contrato"],
    outputCol="features_raw"
)

# 5. Ejecutar el proceso completo
pipeline = Pipeline(stages=indexers + encoders + [assembler])
pipeline_model = pipeline.fit(df_clean)
df_transformed = pipeline_model.transform(df_clean)

# 6. Guardar los resultados para el Notebook 05 de Regresión
output_path = "/opt/spark-data/processed/secop_features.parquet"
pipeline_path = "/opt/spark-data/processed/feature_pipeline"

df_transformed.write.mode("overwrite").parquet(output_path)
pipeline_model.write().overwrite().save(pipeline_path)

print("\n" + "="*40)
print("¡PROCESO COMPLETADO EXITOSAMENTE!")
print(f"Dataset guardado en: {output_path}")
print("="*40)

# Mostrar muestra del resultado final
df_transformed.select("features_raw", "valor_del_contrato").show(5, truncate=False)

Registros iniciales: 1,000
Registros después de limpieza: 967


                                                                                


¡PROCESO COMPLETADO EXITOSAMENTE!
Dataset guardado en: /opt/spark-data/processed/secop_features.parquet
+-------------------------------------------+------------------+
|features_raw                               |valor_del_contrato|
+-------------------------------------------+------------------+
|(53,[0,32,48,52],[1.0,1.0,1.0,7.3451015E7])|7.3451015E7       |
|(53,[26,32,43,52],[1.0,1.0,1.0,6673341.0]) |6673341.0         |
|(53,[0,32,42,52],[1.0,1.0,1.0,1.32E7])     |1.32E7            |
|(53,[0,32,42,52],[1.0,1.0,1.0,6.75268E7])  |6.75268E7         |
|(53,[16,32,44,52],[1.0,1.0,1.0,1.5E7])     |1.5E7             |
+-------------------------------------------+------------------+
only showing top 5 rows

