In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import (
    StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
)
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, when, isnull

# ==========================================================
# Configure the SparkSession
# ==========================================================

# Heap size for Spark. With master("local[*]") the executors run inside the
# driver JVM, so spark.executor.memory does not size the heap; the driver
# memory does. spark.driver.memory is only honoured when this call actually
# launches the JVM (first session in a fresh kernel). If a session already
# exists, getOrCreate() reuses it, so restart the kernel to change this value.
SPARK_MEMORY = "8g"

spark = (
    SparkSession.builder
    .appName("SECOP_FeatureEngineering")
    .master("local[*]")
    .config("spark.driver.memory", SPARK_MEMORY)
    # Kept from the original configuration; relevant only for non-local masters.
    .config("spark.executor.memory", SPARK_MEMORY)
    .getOrCreate()
)

# Reduce log noise (optional)
spark.sparkContext.setLogLevel("ERROR")

print("Spark inicializado correctamente")
print("Versión:", spark.version)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/12 23:34:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark inicializado correctamente
Versión: 3.5.0


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# ==========================================================
# 1. Create (or reuse) the SparkSession
# ==========================================================
# getOrCreate() returns the session built by the first cell when that cell has
# already run, so these builder options only matter if this cell runs in a
# fresh kernel. In local[*] mode the executors live inside the driver JVM, so
# the heap is governed by spark.driver.memory (honoured only when this call
# launches the JVM); spark.executor.memory is kept from the original setup.

spark = (
    SparkSession.builder
    .appName("SECOP_FeatureEngineering")
    .master("local[*]")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("ERROR")

print("Spark listo - versión:", spark.version)


# ==========================================================
# 2. Load the raw data from the BRONZE layer
# ==========================================================

bronze_path = "/opt/spark-data/bronze/secop_contratos"
df_bronze = spark.read.parquet(bronze_path)

# count() triggers a full pass over the dataset.
n_bronze_rows = df_bronze.count()
print(f"Registros en Bronze: {n_bronze_rows:,}")

n_bronze_cols = len(df_bronze.columns)
print(f"Columnas en Bronze: {n_bronze_cols}")

df_bronze.printSchema()


# ==========================================================
# 3. Persist a copy in the PROCESSED layer
# ==========================================================
# mode("overwrite") replaces any existing output at this path on every run.
# The data is written exactly as loaded from Bronze; no transformation here.

processed_path = "/opt/spark-data/processed/secop_eda.parquet"

(
    df_bronze.write
    .mode("overwrite")
    .parquet(processed_path)
)

print("Datos guardados en PROCESSED correctamente")


# ==========================================================
# 4. Reload the data from the PROCESSED layer
# ==========================================================
# Later cells work on this re-read copy (`df`) rather than on df_bronze.

df = spark.read.parquet(processed_path)

n_processed_rows = df.count()
print(f"Registros cargados desde PROCESSED: {n_processed_rows:,}")


# ==========================================================
# 5. List the available columns
# ==========================================================

print("Columnas disponibles:")
for column_name in df.columns:
    print(f"  - {column_name}")


Spark listo - versión: 3.5.0


                                                                                

Registros en Bronze: 132,641
Columnas en Bronze: 87
root
 |-- anno_bpin: string (nullable = true)
 |-- c_digo_bpin: string (nullable = true)
 |-- ciudad: string (nullable = true)
 |-- codigo_de_categoria_principal: string (nullable = true)
 |-- codigo_entidad: string (nullable = true)
 |-- codigo_proveedor: string (nullable = true)
 |-- condiciones_de_entrega: string (nullable = true)
 |-- departamento: string (nullable = true)
 |-- descripcion_del_proceso: string (nullable = true)
 |-- descripcion_documentos_tipo: string (nullable = true)
 |-- destino_gasto: string (nullable = true)
 |-- dias_adicionados: string (nullable = true)
 |-- documento_proveedor: string (nullable = true)
 |-- documentos_tipo: string (nullable = true)
 |-- domicilio_representante_legal: string (nullable = true)
 |-- duraci_n_del_contrato: string (nullable = true)
 |-- el_contrato_puede_ser_prorrogado: string (nullable = true)
 |-- entidad_centralizada: string (nullable = true)
 |-- es_grupo: string (nullable =

                                                                                

Datos guardados en PROCESSED correctamente




Registros cargados desde PROCESSED: 132,641
Columnas disponibles:
  - anno_bpin
  - c_digo_bpin
  - ciudad
  - codigo_de_categoria_principal
  - codigo_entidad
  - codigo_proveedor
  - condiciones_de_entrega
  - departamento
  - descripcion_del_proceso
  - descripcion_documentos_tipo
  - destino_gasto
  - dias_adicionados
  - documento_proveedor
  - documentos_tipo
  - domicilio_representante_legal
  - duraci_n_del_contrato
  - el_contrato_puede_ser_prorrogado
  - entidad_centralizada
  - es_grupo
  - es_pyme
  - espostconflicto
  - estado_bpin
  - estado_contrato
  - fecha_de_fin_del_contrato
  - fecha_de_firma
  - fecha_de_inicio_del_contrato
  - fecha_de_notificaci_n_de_prorrogaci_n
  - fecha_fin_liquidacion
  - fecha_inicio_liquidacion
  - g_nero_representante_legal
  - habilita_pago_adelantado
  - id_contrato
  - identificaci_n_representante_legal
  - justificacion_modalidad_de
  - liquidaci_n
  - localizaci_n
  - modalidad_de_contratacion
  - n_mero_de_cuenta
  - n_mero_de_docume

                                                                                

In [3]:
# Select the features for the model.
# Categorical variables
categorical_cols = ["departamento", "tipo_de_contrato", "estado_contrato"]

# Numeric variables
# NOTE(review): the recorded output of this cell shows neither name exists in
# `df`, so no numeric feature currently reaches the model. The visible Bronze
# schema is all string columns, so these probably need to be derived/cast in an
# earlier step — TODO confirm the intended source columns.
numeric_cols = ["plazo_de_ejec_del_contrato", "valor_del_contrato_num"]

# Keep only the requested columns that actually exist in the DataFrame.
existing_cols = set(df.columns)
available_cat = [c for c in categorical_cols if c in existing_cols]
available_num = [c for c in numeric_cols if c in existing_cols]

print(f"Categóricas disponibles: {available_cat}")
print(f"Numéricas disponibles: {available_num}")

# Surface silently dropped features instead of losing them unnoticed.
missing_cols = [c for c in categorical_cols + numeric_cols if c not in existing_cols]
if missing_cols:
    print(f"ADVERTENCIA: columnas no encontradas en el DataFrame: {missing_cols}")

Categóricas disponibles: ['departamento', 'tipo_de_contrato', 'estado_contrato']
Numéricas disponibles: []


In [4]:
# Clean the data: drop rows with a null in any selected feature column.
feature_input_cols = available_cat + available_num
df_clean = df.dropna(subset=feature_input_cols)

n_clean_rows = df_clean.count()
print(f"Registros después de limpiar nulos: {n_clean_rows:,}")

Registros después de limpiar nulos: 132,641


In [5]:
# STEP 1: map each categorical string column to numeric indices.
# handleInvalid="keep" assigns invalid values (labels unseen at fit time, nulls)
# to an extra index instead of raising an error.
indexers = []
for cat_col in available_cat:
    indexers.append(
        StringIndexer(
            inputCol=cat_col,
            outputCol=f"{cat_col}_idx",
            handleInvalid="keep",
        )
    )

In [6]:
# List the input -> output column of every StringIndexer.
print("StringIndexers creados:")
for indexer in indexers:
    src, dst = indexer.getInputCol(), indexer.getOutputCol()
    print(f"  - {src} -> {dst}")

StringIndexers creados:
  - departamento -> departamento_idx
  - tipo_de_contrato -> tipo_de_contrato_idx
  - estado_contrato -> estado_contrato_idx


In [7]:
# STEP 2: OneHotEncoder turns each *_idx column into a dummy (one-hot) vector
# named *_vec. Encoder parameters are left at their defaults.
encoders = []
for cat_name in available_cat:
    index_col = f"{cat_name}_idx"
    vector_col = f"{cat_name}_vec"
    encoders.append(OneHotEncoder(inputCol=index_col, outputCol=vector_col))

In [8]:
# List the input -> output column of every OneHotEncoder.
print("\nOneHotEncoders creados:")
for encoder in encoders:
    src, dst = encoder.getInputCol(), encoder.getOutputCol()
    print(f"  - {src} -> {dst}")


OneHotEncoders creados:
  - departamento_idx -> departamento_vec
  - tipo_de_contrato_idx -> tipo_de_contrato_vec
  - estado_contrato_idx -> estado_contrato_vec


In [9]:
# STEP 3: VectorAssembler combines every feature into a single vector column.
# Order: numeric features first, then the one-hot encoded categoricals.
feature_cols = available_num + [c + "_vec" for c in available_cat]

# Fail early with a clear message instead of a less obvious error later in the
# pipeline: with no inputs there is nothing to assemble.
assert feature_cols, "No hay features disponibles para el VectorAssembler"

# VectorAssembler only accepts numeric, boolean and vector inputs; string
# columns (as loaded from Bronze) must be cast before being used as numerics.
string_num_cols = [
    name for name, dtype in df_clean.dtypes
    if name in available_num and dtype == "string"
]
assert not string_num_cols, (
    f"Columnas numéricas con tipo string (castear antes): {string_num_cols}"
)

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features_raw"
)

print(f"\nVectorAssembler combinará: {feature_cols}")


VectorAssembler combinará: ['departamento_vec', 'tipo_de_contrato_vec', 'estado_contrato_vec']


In [10]:
# STEP 4: build the Pipeline, an ordered sequence of transformations:
# all StringIndexers, then all OneHotEncoders, then the VectorAssembler.
pipeline_stages = [*indexers, *encoders, assembler]
pipeline = Pipeline(stages=pipeline_stages)

print(f"\nPipeline con {len(pipeline_stages)} stages:")
for stage_no, stage in enumerate(pipeline_stages, start=1):
    print(f"  Stage {stage_no}: {type(stage).__name__}")


Pipeline con 7 stages:
  Stage 1: StringIndexer
  Stage 2: StringIndexer
  Stage 3: StringIndexer
  Stage 4: OneHotEncoder
  Stage 5: OneHotEncoder
  Stage 6: OneHotEncoder
  Stage 7: VectorAssembler


In [11]:
# STEP 5: fit the pipeline on the cleaned data.
# StringIndexer and OneHotEncoder have to learn from the dataset before they
# can transform it.
print("\nEntrenando pipeline...")
pipeline_model = pipeline.fit(dataset=df_clean)
print("Pipeline entrenado exitosamente")


Entrenando pipeline...


                                                                                

Pipeline entrenado exitosamente


In [12]:

# STEP 6: apply the fitted transformations to the cleaned data.
df_transformed = pipeline_model.transform(df_clean)

print("\nTransformación completada")
n_transformed_cols = len(df_transformed.columns)
print(f"Columnas después de transformar: {n_transformed_cols}")



Transformación completada
Columnas después de transformar: 94


In [13]:
# Show the schema of the assembled feature column only.
df_transformed.select(["features_raw"]).printSchema()

root
 |-- features_raw: vector (nullable = true)



In [14]:
# Preview the first rows of the assembled feature vectors (truncated cells).
df_transformed.select("features_raw").show(n=20, truncate=True)

+--------------------+
|        features_raw|
+--------------------+
|(26,[0,1,20],[1.0...|
|(26,[0,1,20],[1.0...|
|(26,[0,1,20],[1.0...|
|(26,[0,1,19],[1.0...|
|(26,[0,1,19],[1.0...|
|(26,[0,1,19],[1.0...|
|(26,[0,1,20],[1.0...|
|(26,[0,5,23],[1.0...|
|(26,[0,1,19],[1.0...|
|(26,[0,1,20],[1.0...|
|(26,[0,1,20],[1.0...|
|(26,[0,1,19],[1.0...|
|(26,[0,2,20],[1.0...|
|(26,[0,1,20],[1.0...|
|(26,[0,1,19],[1.0...|
|(26,[0,1,19],[1.0...|
|(26,[0,1,19],[1.0...|
|(26,[0,1,21],[1.0...|
|(26,[0,1,19],[1.0...|
|(26,[0,1,19],[1.0...|
+--------------------+
only showing top 20 rows



                                                                                

In [15]:
# Inspect the dimension of the assembled feature vector.
# first() returns None on an empty DataFrame, so guard before indexing.
first_row = df_transformed.select("features_raw").first()
if first_row is None:
    print("El DataFrame transformado está vacío; no hay vectores que inspeccionar")
else:
    sample_features = first_row[0]
    print(f"Dimensión del vector de features: {len(sample_features)}")

# Show an example of the transformation for the first categorical column
# (raw value, index, one-hot vector) next to the assembled vector. This dataset
# has no "id" column, so when there are no categorical columns only the
# assembled vector is shown.
if available_cat:
    first_cat = available_cat[0]
    example_cols = [first_cat, first_cat + "_idx", first_cat + "_vec", "features_raw"]
else:
    example_cols = ["features_raw"]

df_transformed.select(*example_cols).show(5, truncate=True)

Dimensión del vector de features: 26
+--------------------+----------------+----------------+--------------------+
|        departamento|departamento_idx|departamento_vec|        features_raw|
+--------------------+----------------+----------------+--------------------+
|Distrito Capital ...|             0.0|   (1,[0],[1.0])|(26,[0,1,20],[1.0...|
|Distrito Capital ...|             0.0|   (1,[0],[1.0])|(26,[0,1,20],[1.0...|
|Distrito Capital ...|             0.0|   (1,[0],[1.0])|(26,[0,1,20],[1.0...|
|Distrito Capital ...|             0.0|   (1,[0],[1.0])|(26,[0,1,19],[1.0...|
|Distrito Capital ...|             0.0|   (1,[0],[1.0])|(26,[0,1,19],[1.0...|
+--------------------+----------------+----------------+--------------------+
only showing top 5 rows



In [17]:
# ==========================================================
# Save the fitted pipeline (overwriting any previous copy)
# ==========================================================

pipeline_path = "/opt/spark-data/processed/feature_pipeline"

model_writer = pipeline_model.write().overwrite()
model_writer.save(pipeline_path)

print(f"Pipeline guardado correctamente en: {pipeline_path}")


# ==========================================================
# Save the transformed dataset
# ==========================================================
# mode="overwrite" replaces any previous output at this path.

output_path = "/opt/spark-data/processed/secop_features.parquet"

df_transformed.write.parquet(output_path, mode="overwrite")

print(f"Dataset transformado guardado correctamente en: {output_path}")


                                                                                

Pipeline guardado correctamente en: /opt/spark-data/processed/feature_pipeline


                                                                                

Dataset transformado guardado correctamente en: /opt/spark-data/processed/secop_features.parquet
