## 1. Configuración del Entorno

Importamos las librerías necesarias para el análisis exploratorio y feature engineering.

In [None]:
# Importar librerías necesarias
import pandas as pd
import numpy as np
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window
import matplotlib.pyplot as plt
import seaborn as sns

# Configuración para visualizaciones
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")

print("✓ Librerías importadas exitosamente")

## 2. Carga de Datos

Cargamos el dataset de energía mundial desde DBFS.

In [None]:
# Cargar datos con Spark
df_spark = spark.read.csv(
    "/FileStore/tables/owid-energy-data.csv",
    header=True,
    inferSchema=True
)

# Mostrar esquema
print("Esquema del DataFrame:")
df_spark.printSchema()

In [None]:
# Estadísticas básicas
display(df_spark.describe())

In [None]:
# Conteo de registros y columnas
print(f"Total de registros: {df_spark.count():,}")
print(f"Total de columnas: {len(df_spark.columns)}")

# Mostrar primeras filas
display(df_spark.limit(10))

## 3. Conversión a Pandas para EDA Detallado

Convertimos a Pandas para realizar análisis exploratorio más detallado.

In [None]:
# Convertir a Pandas para análisis exploratorio
# Nota: En producción, considerar usar solo una muestra si el dataset es muy grande
df_pandas = df_spark.toPandas()

# Información general
print("Información del DataFrame:")
print(df_pandas.info())
print("\n" + "="*50)
print("Primeras filas:")
display(df_pandas.head(10))

## 4. Análisis de Valores Faltantes

Identificamos y visualizamos valores faltantes en el dataset.

In [None]:
# Calcular porcentaje de valores nulos por columna
missing_data = pd.DataFrame({
    'column': df_pandas.columns,
    'missing_count': df_pandas.isnull().sum(),
    'missing_percent': (df_pandas.isnull().sum() / len(df_pandas)) * 100
})

missing_data = missing_data[missing_data['missing_count'] > 0].sort_values(
    'missing_percent', 
    ascending=False
)

print(f"Columnas con valores faltantes: {len(missing_data)}")
display(missing_data.head(20))

In [None]:
# Visualización de valores faltantes
plt.figure(figsize=(12, 6))
top_missing = missing_data.head(20)
plt.barh(top_missing['column'], top_missing['missing_percent'])
plt.xlabel('Porcentaje de Valores Faltantes (%)')
plt.title('Top 20 Columnas con Valores Faltantes')
plt.tight_layout()
plt.show()

## 5. Análisis de Variables Numéricas

Exploramos las distribuciones de las variables numéricas clave.

In [None]:
# Seleccionar columnas numéricas
numeric_cols = df_pandas.select_dtypes(include=[np.number]).columns.tolist()

# Estadísticas descriptivas detalladas
print(f"Variables numéricas: {len(numeric_cols)}")
display(df_pandas[numeric_cols].describe().T)

In [None]:
# Distribución de variables clave
key_features = [
    'gdp', 
    'population', 
    'primary_energy_consumption',
    'renewables_consumption',
    'fossil_fuel_consumption',
    'greenhouse_gas_emissions'
]

# Filtrar solo las que existen y tienen datos
key_features_available = [col for col in key_features if col in df_pandas.columns and df_pandas[col].notna().sum() > 0]

if len(key_features_available) == 0:
    print("⚠️ No hay features disponibles para visualizar")
else:
    print(f"Visualizando {len(key_features_available)} features: {key_features_available}")
    
    # Calcular número de filas y columnas para subplots
    n_features = len(key_features_available)
    n_cols = 2
    n_rows = (n_features + 1) // 2
    
    # Crear histogramas
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 5 * n_rows))
    
    # Asegurar que axes sea siempre un array 1D
    if n_features == 1:
        axes = [axes]
    else:
        axes = axes.ravel() if n_features > 2 else axes
    
    for idx, col in enumerate(key_features_available):
        if idx < len(axes):
            # Eliminar valores nulos y obtener datos
            data = df_pandas[col].dropna()
            
            if len(data) > 0:
                axes[idx].hist(data, bins=50, edgecolor='black', alpha=0.7, color='steelblue')
                axes[idx].set_title(f'Distribución de {col}', fontsize=12, fontweight='bold')
                axes[idx].set_xlabel(col, fontsize=10)
                axes[idx].set_ylabel('Frecuencia', fontsize=10)
                axes[idx].grid(True, alpha=0.3)
                
                # Agregar estadísticas
                mean_val = data.mean()
                median_val = data.median()
                axes[idx].axvline(mean_val, color='red', linestyle='--', linewidth=2, label=f'Media: {mean_val:.2e}')
                axes[idx].axvline(median_val, color='green', linestyle='--', linewidth=2, label=f'Mediana: {median_val:.2e}')
                axes[idx].legend(fontsize=8)
    
    # Ocultar ejes no utilizados
    for idx in range(len(key_features_available), len(axes)):
        fig.delaxes(axes[idx])
    
    plt.tight_layout()
    plt.show()

## 6. Análisis de Correlaciones

Calculamos y visualizamos las correlaciones entre variables de energía.

In [None]:
# Seleccionar subconjunto de variables para análisis de correlación
energy_features = [col for col in df_pandas.columns 
                   if any(keyword in col.lower() for keyword in 
                   ['energy', 'consumption', 'electricity', 'renewables', 'fossil'])]

# Agregar variables contextuales
correlation_cols = ['year', 'gdp', 'population'] + energy_features[:15]
correlation_cols = [col for col in correlation_cols if col in df_pandas.columns]

print(f"Analizando correlaciones para {len(correlation_cols)} variables")

# Calcular matriz de correlación
correlation_matrix = df_pandas[correlation_cols].corr()

In [None]:
# Visualizar matriz de correlación
if len(correlation_matrix.columns) > 0:
    # Determinar tamaño de figura basado en número de variables
    n_vars = len(correlation_matrix.columns)
    fig_size = max(12, min(20, n_vars * 0.8))
    
    plt.figure(figsize=(fig_size, fig_size * 0.9))
    
    # Configurar heatmap con manejo de valores nulos
    mask = correlation_matrix.isnull()
    
    sns.heatmap(
        correlation_matrix, 
        annot=n_vars <= 20,  # Solo anotar si hay 20 o menos variables
        fmt='.2f', 
        cmap='coolwarm',
        center=0,
        square=True,
        linewidths=0.5,
        cbar_kws={'label': 'Coeficiente de Correlación'},
        mask=mask,
        vmin=-1,
        vmax=1
    )
    
    plt.title('Matriz de Correlación - Variables de Energía', fontsize=16, fontweight='bold', pad=20)
    plt.xticks(rotation=45, ha='right', fontsize=10)
    plt.yticks(rotation=0, fontsize=10)
    plt.tight_layout()
    plt.show()
else:
    print("⚠️ No hay suficientes variables para crear la matriz de correlación")

In [None]:
# Identificar correlaciones fuertes
threshold = 0.7
high_corr = []
for i in range(len(correlation_matrix.columns)):
    for j in range(i+1, len(correlation_matrix.columns)):
        if abs(correlation_matrix.iloc[i, j]) > threshold:
            high_corr.append({
                'Feature 1': correlation_matrix.columns[i],
                'Feature 2': correlation_matrix.columns[j],
                'Correlation': correlation_matrix.iloc[i, j]
            })

print(f"\nCorrelaciones Fuertes (|r| > {threshold}):")
display(pd.DataFrame(high_corr).sort_values('Correlation', ascending=False))

## 7. Análisis Temporal y por País

Analizamos la evolución temporal del consumo energético en los principales países.

In [None]:
# Análisis de evolución temporal
if 'year' in df_pandas.columns and 'country' in df_pandas.columns:
    # Top 10 países por consumo energético reciente
    recent_year = df_pandas['year'].max()
    print(f"Año más reciente en el dataset: {recent_year}")
    
    top_countries = df_pandas[
        df_pandas['year'] == recent_year
    ].nlargest(10, 'primary_energy_consumption')['country'].tolist()
    
    print(f"Top 10 países por consumo energético en {recent_year}:")
    for idx, country in enumerate(top_countries, 1):
        print(f"{idx}. {country}")

In [None]:
# Evolución temporal de consumo energético
plt.figure(figsize=(14, 6))
for country in top_countries:
    country_data = df_pandas[df_pandas['country'] == country]
    plt.plot(
        country_data['year'], 
        country_data['primary_energy_consumption'],
        label=country,
        marker='o',
        markersize=3
    )

plt.xlabel('Año')
plt.ylabel('Consumo de Energía Primaria (TWh)')
plt.title('Evolución del Consumo de Energía - Top 10 Países')
plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

## 8. Feature Engineering con PySpark

Creamos variables derivadas usando PySpark para transformaciones escalables.

In [None]:
# Importar funciones necesarias
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 1. Ratio de energías renovables vs total
df_features = df_spark.withColumn(
    'renewable_ratio',
    F.when(F.col('primary_energy_consumption') > 0,
           F.col('renewables_consumption') / F.col('primary_energy_consumption')
    ).otherwise(0)
)

# 2. Consumo per cápita
df_features = df_features.withColumn(
    'energy_per_capita',
    F.when(F.col('population') > 0,
           F.col('primary_energy_consumption') / F.col('population') * 1000000
    ).otherwise(0)
)

# 3. Intensidad energética (energía por unidad de PIB)
df_features = df_features.withColumn(
    'energy_intensity',
    F.when(F.col('gdp') > 0,
           F.col('primary_energy_consumption') / F.col('gdp')
    ).otherwise(0)
)

# 4. Índice de dependencia de combustibles fósiles
df_features = df_features.withColumn(
    'fossil_dependency_index',
    F.when(F.col('primary_energy_consumption') > 0,
           F.col('fossil_fuel_consumption') / F.col('primary_energy_consumption')
    ).otherwise(0)
)

print("✓ Features básicos creados")

In [None]:
# 5. Variación año a año (usando Window Functions)
windowSpec = Window.partitionBy('country').orderBy('year')

df_features = df_features.withColumn(
    'energy_yoy_change',
    F.col('primary_energy_consumption') - F.lag('primary_energy_consumption').over(windowSpec)
)

df_features = df_features.withColumn(
    'energy_yoy_pct_change',
    F.when(F.lag('primary_energy_consumption').over(windowSpec) > 0,
           ((F.col('primary_energy_consumption') - F.lag('primary_energy_consumption').over(windowSpec)) / 
            F.lag('primary_energy_consumption').over(windowSpec)) * 100
    ).otherwise(0)
)

# 6. Categorización de países por nivel de desarrollo energético
df_features = df_features.withColumn(
    'energy_development_level',
    F.when(F.col('energy_per_capita') >= 100, 'High')
     .when(F.col('energy_per_capita') >= 50, 'Medium')
     .when(F.col('energy_per_capita') > 0, 'Low')
     .otherwise('Unknown')
)

print("✓ Features avanzados creados (variaciones temporales y categorización)")

In [None]:
# Mostrar resultados
display(df_features.select(
    'country', 'year', 'population', 'gdp',
    'renewable_ratio', 'energy_per_capita', 'energy_intensity',
    'fossil_dependency_index', 'energy_development_level'
).orderBy(F.desc('year'), F.desc('energy_per_capita')).limit(20))

## 9. Imputación de Valores Faltantes

Aplicamos diferentes estrategias de imputación para manejar valores faltantes.

In [None]:
from pyspark.ml.feature import Imputer

# Estrategia 1: Imputación por media (para variables continuas)
imputer_mean = Imputer(
    inputCols=['gdp', 'population', 'primary_energy_consumption'],
    outputCols=['gdp_imputed', 'population_imputed', 'energy_imputed']
).setStrategy('mean')

# Estrategia 2: Imputación por mediana (más robusta a outliers)
imputer_median = Imputer(
    inputCols=['renewable_ratio', 'energy_per_capita'],
    outputCols=['renewable_ratio_imputed', 'energy_per_capita_imputed']
).setStrategy('median')

# Aplicar imputación
df_imputed = imputer_mean.fit(df_features).transform(df_features)
df_imputed = imputer_median.fit(df_imputed).transform(df_imputed)

print("✓ Imputación por media y mediana completada")

In [None]:
# Estrategia 3: Forward fill por país y año (para series temporales)
windowSpec = Window.partitionBy('country').orderBy('year').rowsBetween(Window.unboundedPreceding, 0)

df_imputed = df_imputed.withColumn(
    'gdp_filled',
    F.last('gdp', ignorenulls=True).over(windowSpec)
)

print("✓ Forward fill completado")

# Comparar estrategias de imputación
display(df_imputed.select('country', 'year', 'gdp', 'gdp_imputed', 'gdp_filled').limit(20))

## 10. Encoding de Variables Categóricas

Convertimos variables categóricas en representaciones numéricas.

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# String Indexer para 'country'
country_indexer = StringIndexer(
    inputCol='country',
    outputCol='country_index',
    handleInvalid='keep'
)

df_encoded = country_indexer.fit(df_imputed).transform(df_imputed)

print("✓ String Indexer aplicado a 'country'")

In [None]:
# One-Hot Encoding para nivel de desarrollo
level_indexer = StringIndexer(
    inputCol='energy_development_level',
    outputCol='development_level_index',
    handleInvalid='keep'
)

df_encoded = level_indexer.fit(df_encoded).transform(df_encoded)

encoder = OneHotEncoder(
    inputCols=['development_level_index'],
    outputCols=['development_level_vec']
)

df_encoded = encoder.fit(df_encoded).transform(df_encoded)

print("✓ One-Hot Encoding aplicado a 'energy_development_level'")

display(df_encoded.select(
    'country', 'country_index', 
    'energy_development_level', 'development_level_index', 'development_level_vec'
).limit(10))

## 11. Escalado y Normalización

Aplicamos escalado estándar y normalización min-max a las features.

In [None]:
from pyspark.ml.feature import VectorAssembler, StandardScaler, MinMaxScaler

# Seleccionar features para escalado
features_to_scale = [
    'energy_per_capita_imputed',
    'renewable_ratio_imputed',
    'energy_intensity',
    'fossil_dependency_index'
]

# Ensamblar features en un vector
assembler = VectorAssembler(
    inputCols=features_to_scale,
    outputCol='features_raw'
)

df_assembled = assembler.transform(df_encoded)

print("✓ Features ensamblados en un vector")

In [None]:
# StandardScaler (Z-score normalization: media=0, std=1)
standard_scaler = StandardScaler(
    inputCol='features_raw',
    outputCol='features_standardized',
    withMean=True,
    withStd=True
)

scaler_model = standard_scaler.fit(df_assembled)
df_scaled = scaler_model.transform(df_assembled)

print("✓ StandardScaler aplicado")

In [None]:
# MinMaxScaler (escalado a rango [0,1])
minmax_scaler = MinMaxScaler(
    inputCol='features_raw',
    outputCol='features_normalized'
)

minmax_model = minmax_scaler.fit(df_assembled)
df_scaled = minmax_model.transform(df_scaled)

print("✓ MinMaxScaler aplicado")

display(df_scaled.select(
    'country', 'year', 
    'features_raw', 'features_standardized', 'features_normalized'
).limit(10))

## 12. Selección de Features

Analizamos correlaciones y filtramos features con baja varianza.

In [None]:
from pyspark.ml.stat import Correlation

# Correlación de Pearson
correlation_matrix = Correlation.corr(df_scaled, 'features_raw', 'pearson')
print("Matriz de Correlación:")
correlation_matrix.show(truncate=False)

In [None]:
from pyspark.ml.feature import VarianceThresholdSelector

# Filtrar features con baja varianza
variance_selector = VarianceThresholdSelector(
    featuresCol='features_standardized',
    outputCol='features_selected',
    varianceThreshold=0.1
)

# Aplicar selector
variance_model = variance_selector.fit(df_scaled)
df_selected = variance_model.transform(df_scaled)

print(f"Features originales: {len(features_to_scale)}")
print(f"Features después de filtrar por varianza: {len(variance_model.selectedFeatures)}")
print(f"Features seleccionados: {variance_model.selectedFeatures}")

## 13. Creación de Dataset Final para Modelado

Seleccionamos las features finales y preparamos el dataset para machine learning.

In [None]:
# Seleccionar features finales
final_features = [
    'country', 'year', 
    'renewable_ratio_imputed',
    'energy_per_capita_imputed',
    'energy_intensity',
    'fossil_dependency_index',
    'energy_yoy_pct_change',
    'country_index',
    'development_level_index',
    'features_standardized'
]

# Crear dataset final
df_final = df_scaled.select(final_features)

# Filtrar registros completos
df_final = df_final.na.drop(subset=['renewable_ratio_imputed', 'energy_per_capita_imputed'])

# Estadísticas del dataset final
print(f"Registros finales: {df_final.count():,}")
print(f"Features: {len(final_features)}")

display(df_final.limit(20))

## 14. Persistencia en Delta Lake

Guardamos los features procesados en Delta Lake para su uso futuro.

In [None]:
# Definir ruta para Delta Table
delta_path = "/delta/energy_features"

# Guardar como Delta Table con particionamiento
df_final.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("year") \
    .option("overwriteSchema", "true") \
    .save(delta_path)

print(f"✓ Features guardados en Delta Lake: {delta_path}")

In [None]:
# Crear tabla en el metastore
df_final.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("year") \
    .saveAsTable("energy_features_table")

print("✓ Tabla 'energy_features_table' creada exitosamente")

## 15. Verificación de Delta Table

Verificamos que los datos se guardaron correctamente y exploramos el historial de versiones.

In [None]:
# Leer desde Delta Table
df_from_delta = spark.read.format("delta").load(delta_path)

print("Esquema de la tabla Delta:")
df_from_delta.printSchema()

print(f"\nRegistros en Delta Table: {df_from_delta.count():,}")

In [None]:
# Contar registros por año
print("Registros por año:")
display(df_from_delta.groupBy("year").count().orderBy("year"))

In [None]:
# Verificar historial de versiones
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, delta_path)
print("Historial de versiones de la tabla Delta:")
display(delta_table.history())

## 16. Control de Data Drift

Analizamos cambios en la distribución de datos a lo largo del tiempo.

In [None]:
# Calcular estadísticas por período
df_with_period = df_final.withColumn(
    'period',
    F.when(F.col('year') < 2000, 'pre_2000')
     .when(F.col('year') < 2010, '2000-2009')
     .when(F.col('year') < 2020, '2010-2019')
     .otherwise('2020+')
)

# Comparar distribuciones por período
drift_analysis = df_with_period.groupBy('period').agg(
    F.mean('energy_per_capita_imputed').alias('avg_energy_per_capita'),
    F.stddev('energy_per_capita_imputed').alias('std_energy_per_capita'),
    F.mean('renewable_ratio_imputed').alias('avg_renewable_ratio'),
    F.stddev('renewable_ratio_imputed').alias('std_renewable_ratio'),
    F.count('*').alias('record_count')
).orderBy('period')

print("Análisis de Drift por Período:")
display(drift_analysis)

## 17. Validación de Calidad de Datos

Generamos un reporte de calidad de los features procesados.

In [None]:
from pyspark.sql.functions import col, isnan, when, count

# Función de validación de calidad
def data_quality_check(df, columns_to_check):
    """
    Genera reporte de calidad de datos
    """
    quality_metrics = []
    
    for column in columns_to_check:
        # Conteo de valores nulos
        null_count = df.filter(col(column).isNull()).count()
        
        # Conteo de valores negativos (si es numérico)
        negative_count = df.filter(col(column) < 0).count()
        
        # Valores únicos
        distinct_count = df.select(column).distinct().count()
        
        quality_metrics.append({
            'column': column,
            'null_count': null_count,
            'null_percentage': (null_count / df.count()) * 100,
            'negative_count': negative_count,
            'distinct_values': distinct_count
        })
    
    return spark.createDataFrame(quality_metrics)

# Aplicar validación
columns_to_validate = [
    'renewable_ratio_imputed',
    'energy_per_capita_imputed',
    'energy_intensity',
    'fossil_dependency_index'
]

quality_report = data_quality_check(df_final, columns_to_validate)
print("Reporte de Calidad de Datos:")
display(quality_report)

## 18. Documentación de Features

Creamos un diccionario de features para documentar las transformaciones.

In [None]:
import json

# Crear diccionario de features
feature_dictionary = {
    'renewable_ratio': {
        'description': 'Ratio de energías renovables vs consumo total de energía',
        'formula': 'renewables_consumption / primary_energy_consumption',
        'range': '[0, 1]',
        'type': 'numeric - continuous',
        'imputation': 'median'
    },
    'energy_per_capita': {
        'description': 'Consumo de energía por persona en kWh',
        'formula': '(primary_energy_consumption / population) * 1000000',
        'range': '[0, +inf]',
        'type': 'numeric - continuous',
        'imputation': 'median'
    },
    'energy_intensity': {
        'description': 'Consumo de energía por unidad de PIB',
        'formula': 'primary_energy_consumption / gdp',
        'range': '[0, +inf]',
        'type': 'numeric - continuous',
        'imputation': 'mean'
    },
    'fossil_dependency_index': {
        'description': 'Índice de dependencia de combustibles fósiles',
        'formula': 'fossil_fuel_consumption / primary_energy_consumption',
        'range': '[0, 1]',
        'type': 'numeric - continuous',
        'imputation': 'mean'
    },
    'energy_development_level': {
        'description': 'Nivel de desarrollo energético del país',
        'formula': 'Categorización basada en energy_per_capita',
        'categories': ['Low', 'Medium', 'High', 'Unknown'],
        'type': 'categorical',
        'encoding': 'one-hot'
    }
}

# Convertir a JSON y mostrar
feature_dict_json = json.dumps(feature_dictionary, indent=2)
print("Diccionario de Features:")
print(feature_dict_json)

In [None]:
# Guardar en DBFS
dbutils.fs.put(
    "/FileStore/energy_features_dictionary.json",
    feature_dict_json,
    overwrite=True
)

print("✓ Diccionario de features guardado en DBFS")

## 19. Pipeline Completo Reproducible

Creamos una función que encapsula todo el pipeline de feature engineering.

In [None]:
def energy_feature_pipeline(input_path, output_path, imputation_strategy='median'):
    """
    Pipeline completo de feature engineering para datos de energía
    
    Args:
        input_path: Ruta al archivo CSV de entrada
        output_path: Ruta de salida para Delta Table
        imputation_strategy: Estrategia de imputación ('mean', 'median')
    
    Returns:
        DataFrame con features procesados
    """
    from pyspark.ml.feature import Imputer, StringIndexer, VectorAssembler, StandardScaler
    
    # 1. Carga de datos
    df = spark.read.csv(input_path, header=True, inferSchema=True)
    print(f"✓ Datos cargados: {df.count():,} registros")
    
    # 2. Feature Engineering
    df = df.withColumn(
        'renewable_ratio',
        F.when(F.col('primary_energy_consumption') > 0,
               F.col('renewables_consumption') / F.col('primary_energy_consumption')
        ).otherwise(0)
    )
    
    df = df.withColumn(
        'energy_per_capita',
        F.when(F.col('population') > 0,
               F.col('primary_energy_consumption') / F.col('population') * 1000000
        ).otherwise(0)
    )
    
    df = df.withColumn(
        'energy_intensity',
        F.when(F.col('gdp') > 0,
               F.col('primary_energy_consumption') / F.col('gdp')
        ).otherwise(0)
    )
    
    df = df.withColumn(
        'fossil_dependency_index',
        F.when(F.col('primary_energy_consumption') > 0,
               F.col('fossil_fuel_consumption') / F.col('primary_energy_consumption')
        ).otherwise(0)
    )
    
    print("✓ Features derivados creados")
    
    # 3. Imputación
    imputer = Imputer(
        inputCols=['renewable_ratio', 'energy_per_capita', 'energy_intensity'],
        outputCols=['renewable_ratio_imputed', 'energy_per_capita_imputed', 'energy_intensity_imputed']
    ).setStrategy(imputation_strategy)
    
    df = imputer.fit(df).transform(df)
    print(f"✓ Imputación completada (estrategia: {imputation_strategy})")
    
    # 4. Encoding
    indexer = StringIndexer(inputCol='country', outputCol='country_index', handleInvalid='keep')
    df = indexer.fit(df).transform(df)
    print("✓ Encoding completado")
    
    # 5. Escalado
    assembler = VectorAssembler(
        inputCols=['renewable_ratio_imputed', 'energy_per_capita_imputed', 'energy_intensity_imputed'],
        outputCol='features_raw'
    )
    df = assembler.transform(df)
    
    scaler = StandardScaler(inputCol='features_raw', outputCol='features_scaled', withMean=True, withStd=True)
    df = scaler.fit(df).transform(df)
    print("✓ Escalado completado")
    
    # 6. Persistencia
    df.write.format("delta").mode("overwrite").partitionBy("year").save(output_path)
    print(f"✓ Datos guardados en: {output_path}")
    
    return df

print("✓ Función de pipeline definida")

In [None]:
# Ejecutar pipeline
result_df = energy_feature_pipeline(
    input_path="/FileStore/tables/owid-energy-data.csv",
    output_path="/delta/energy_features_pipeline",
    imputation_strategy='median'
)

print("\n" + "="*50)
print("PIPELINE COMPLETADO EXITOSAMENTE")
print("="*50)

## 20. Reporte Final de Calidad

Generamos un reporte final con métricas de calidad del dataset procesado.

In [None]:
def generate_quality_report(df):
    """Genera reporte de calidad de features"""
    report = {
        'total_records': df.count(),
        'total_features': len(df.columns),
        'numeric_features': len([f for f in df.schema.fields 
                                if isinstance(f.dataType, (DoubleType, FloatType))]),
        'categorical_features': len([f for f in df.schema.fields 
                                    if isinstance(f.dataType, StringType)])
    }
    return report

quality_metrics = generate_quality_report(df_final)
print("Reporte de Calidad Final:")
print(json.dumps(quality_metrics, indent=2))

## Conclusión

¡Felicitaciones! Has completado exitosamente el laboratorio de Feature Engineering y Exploración de Datos.

### Habilidades Adquiridas:

✅ Análisis exploratorio completo con Pandas y Spark  
✅ Creación de features derivados con significado de negocio  
✅ Implementación de pipelines reproducibles de transformación  
✅ Manejo de valores faltantes con diferentes estrategias  
✅ Encoding y escalado de features  
✅ Persistencia en Delta Lake con particionamiento  
✅ Validación de calidad y detección de drift  

### Próximos Pasos:

- Construcción de modelos de ML con los features creados
- Integración con MLflow para tracking de experimentos
- Deployment de modelos en producción
- Monitoreo de performance y drift en producción

**¡Excelente trabajo!** Estos features están listos para ser utilizados en modelos de machine learning.