# DataPros - Análisis de Clasificación de Ingresos Adultos

Este notebook proporciona un análisis interactivo del modelo de clasificación binaria para predecir si una persona gana más de 50K al año.

## Objetivos:
- Cargar y explorar los datos
- Preprocesar las características
- Entrenar un modelo de Regresión Logística
- Evaluar el rendimiento del modelo
- Visualizar resultados


In [None]:
# Importar librerías necesarias
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, isnan, isnull
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np

# Configurar matplotlib
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")

print("✅ Librerías importadas exitosamente")


In [None]:
# Inicializar Spark Session
spark = SparkSession.builder \
    .appName("AdultIncomeAnalysis") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")
print("✅ Spark Session inicializada")


## 1. Carga de Datos


In [None]:
# Definir esquema para optimizar la carga
schema = StructType([
    StructField("age", IntegerType(), True),
    StructField("sex", StringType(), True),
    StructField("workclass", StringType(), True),
    StructField("fnlwgt", IntegerType(), True),
    StructField("education", StringType(), True),
    StructField("hours_per_week", IntegerType(), True),
    StructField("label", StringType(), True)
])

# Cargar datos
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "false") \
    .schema(schema) \
    .csv("../data/adult_income_sample.csv")

print(f"📊 Datos cargados: {df.count()} registros")
print(f"📋 Columnas: {len(df.columns)}")


In [None]:
# Mostrar esquema y primeras filas
print("📋 ESQUEMA DE DATOS:")
df.printSchema()

print("\n📊 PRIMERAS 10 FILAS:")
df.show(10, truncate=False)


## 2. Exploración de Datos


In [None]:
# Estadísticas descriptivas
print("📈 ESTADÍSTICAS DESCRIPTIVAS:")
df.describe().show()

# Verificar valores únicos en variables categóricas
print("\n🔍 VALORES ÚNICOS EN VARIABLES CATEGÓRICAS:")
print("Sexo:", df.select("sex").distinct().rdd.map(lambda row: row[0]).collect())
print("Clase de trabajo:", df.select("workclass").distinct().rdd.map(lambda row: row[0]).collect())
print("Educación:", df.select("education").distinct().rdd.map(lambda row: row[0]).collect())
print("Label:", df.select("label").distinct().rdd.map(lambda row: row[0]).collect())


## 3. Preprocesamiento de Variables Categóricas

### 3.1 StringIndexer - Transformación de Variables Categóricas

El StringIndexer convierte las variables categóricas de tipo string a índices numéricos. Esto es necesario antes de aplicar OneHotEncoder.


In [None]:
# Crear StringIndexers para cada variable categórica
sex_indexer = StringIndexer(inputCol="sex", outputCol="sex_indexed")
workclass_indexer = StringIndexer(inputCol="workclass", outputCol="workclass_indexed")
education_indexer = StringIndexer(inputCol="education", outputCol="education_indexed")
label_indexer = StringIndexer(inputCol="label", outputCol="label_indexed")

print("✅ StringIndexers creados para:")
print("   - sex → sex_indexed")
print("   - workclass → workclass_indexed") 
print("   - education → education_indexed")
print("   - label → label_indexed")

# Aplicar StringIndexers para ver los índices asignados
indexed_df = sex_indexer.fit(df).transform(df)
indexed_df = workclass_indexer.fit(indexed_df).transform(indexed_df)
indexed_df = education_indexer.fit(indexed_df).transform(indexed_df)
indexed_df = label_indexer.fit(indexed_df).transform(indexed_df)

print("\n📊 ÍNDICES ASIGNADOS POR STRINGINDEXER:")
print("Sexo:")
indexed_df.select("sex", "sex_indexed").distinct().orderBy("sex_indexed").show()

print("Clase de trabajo:")
indexed_df.select("workclass", "workclass_indexed").distinct().orderBy("workclass_indexed").show()

print("Educación:")
indexed_df.select("education", "education_indexed").distinct().orderBy("education_indexed").show()

print("Label:")
indexed_df.select("label", "label_indexed").distinct().orderBy("label_indexed").show()


### 3.2 OneHotEncoder - Codificación One-Hot

El OneHotEncoder convierte las variables categóricas indexadas en vectores binarios (one-hot encoding). Esto evita que el modelo interprete un orden en las categorías que no existe.


In [None]:
# Crear OneHotEncoders para las variables categóricas indexadas
sex_encoder = OneHotEncoder(inputCol="sex_indexed", outputCol="sex_encoded")
workclass_encoder = OneHotEncoder(inputCol="workclass_indexed", outputCol="workclass_encoded")
education_encoder = OneHotEncoder(inputCol="education_indexed", outputCol="education_encoded")

print("✅ OneHotEncoders creados para:")
print("   - sex_indexed → sex_encoded")
print("   - workclass_indexed → workclass_encoded")
print("   - education_indexed → education_encoded")

# Aplicar OneHotEncoders
encoded_df = sex_encoder.fit(indexed_df).transform(indexed_df)
encoded_df = workclass_encoder.fit(encoded_df).transform(encoded_df)
encoded_df = education_encoder.fit(encoded_df).transform(encoded_df)

print("\n📊 EJEMPLO DE CODIFICACIÓN ONE-HOT:")
print("Primeras 5 filas con codificación one-hot:")
encoded_df.select("sex", "sex_indexed", "sex_encoded", 
                  "workclass", "workclass_indexed", "workclass_encoded").show(5, truncate=False)


### 3.3 VectorAssembler - Ensamblado de Características

El VectorAssembler combina todas las características (numéricas y codificadas) en un solo vector de características que puede ser usado por el algoritmo de machine learning.


In [None]:
# Crear VectorAssembler para combinar todas las características
feature_columns = [
    "age",                    # Variable numérica
    "fnlwgt",                # Variable numérica  
    "hours_per_week",        # Variable numérica
    "sex_encoded",           # Variable categórica codificada
    "workclass_encoded",     # Variable categórica codificada
    "education_encoded"      # Variable categórica codificada
]

assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="features"
)

print("✅ VectorAssembler creado")
print("📋 Características incluidas:")
for i, col in enumerate(feature_columns, 1):
    print(f"   {i}. {col}")

# Aplicar VectorAssembler
final_df = assembler.transform(encoded_df)

print(f"\n📊 Vector de características creado")
print(f"📏 Dimensión del vector: {len(feature_columns)} características")

# Mostrar ejemplo del vector de características
print("\n🔍 EJEMPLO DE VECTOR DE CARACTERÍSTICAS:")
final_df.select("features").show(3, truncate=False)


### 3.4 Pipeline de Preprocesamiento

Crear un pipeline que combine todos los pasos de preprocesamiento para facilitar la reutilización y el mantenimiento.


In [None]:
# Crear pipeline completo de preprocesamiento
preprocessing_pipeline = Pipeline(stages=[
    sex_indexer,
    workclass_indexer,
    education_indexer,
    label_indexer,
    sex_encoder,
    workclass_encoder,
    education_encoder,
    assembler
])

print("✅ Pipeline de preprocesamiento creado")
print("📋 Etapas del pipeline:")
stages = [
    "1. StringIndexer (sex)",
    "2. StringIndexer (workclass)", 
    "3. StringIndexer (education)",
    "4. StringIndexer (label)",
    "5. OneHotEncoder (sex)",
    "6. OneHotEncoder (workclass)",
    "7. OneHotEncoder (education)",
    "8. VectorAssembler"
]

for stage in stages:
    print(f"   {stage}")

# Aplicar pipeline completo
print("\n🔄 Aplicando pipeline de preprocesamiento...")
processed_df = preprocessing_pipeline.fit(df).transform(df)

print("✅ Pipeline aplicado exitosamente")
print(f"📊 Registros procesados: {processed_df.count()}")
print(f"📋 Columnas finales: {len(processed_df.columns)}")

# Mostrar esquema final
print("\n📋 ESQUEMA FINAL:")
processed_df.printSchema()


## 4. Entrenamiento del Modelo de Regresión Logística


In [None]:
# Preparar datos para entrenamiento
train_data = processed_df.select("features", "label_indexed")

# Dividir datos en entrenamiento y prueba
train_df, test_df = train_data.randomSplit([0.8, 0.2], seed=42)

print(f"📊 División de datos:")
print(f"   - Entrenamiento: {train_df.count()} registros (80%)")
print(f"   - Prueba: {test_df.count()} registros (20%)")

# Crear modelo de Regresión Logística
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label_indexed",
    maxIter=100,
    regParam=0.01,
    elasticNetParam=0.8
)

print("\n✅ Modelo de Regresión Logística configurado")
print("📋 Parámetros del modelo:")
print("   - maxIter: 100")
print("   - regParam: 0.01 (regularización)")
print("   - elasticNetParam: 0.8 (mezcla L1/L2)")

# Entrenar el modelo
print("\n🔄 Entrenando modelo...")
model = lr.fit(train_df)
print("✅ Modelo entrenado exitosamente")


In [None]:
# Hacer predicciones en el conjunto de prueba
predictions = model.transform(test_df)

# Crear evaluadores
binary_evaluator = BinaryClassificationEvaluator(
    labelCol="label_indexed",
    rawPredictionCol="rawPrediction"
)

multi_evaluator = MulticlassClassificationEvaluator(
    labelCol="label_indexed",
    predictionCol="prediction"
)

# Calcular métricas
auc = binary_evaluator.evaluate(predictions)
accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
f1 = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})

print("📊 MÉTRICAS DE RENDIMIENTO:")
print("-" * 40)
print(f"AUC         : {auc:.4f}")
print(f"Accuracy    : {accuracy:.4f}")
print(f"Precision   : {precision:.4f}")
print(f"Recall      : {recall:.4f}")
print(f"F1-Score    : {f1:.4f}")

# Mostrar matriz de confusión
print("\n🔢 MATRIZ DE CONFUSIÓN:")
confusion_matrix = predictions.groupBy("label_indexed", "prediction").count().orderBy("label_indexed", "prediction")
confusion_matrix.show()

# Mostrar ejemplos de predicciones
print("\n🔍 EJEMPLOS DE PREDICCIONES:")
predictions.select("label_indexed", "prediction", "probability").show(10, truncate=False)


## 6. Análisis de Importancia de Características


In [None]:
# Obtener coeficientes del modelo
coefficients = model.coefficients.toArray()

# Crear DataFrame con importancia de características
feature_importance = pd.DataFrame({
    'feature': feature_columns,
    'coefficient': coefficients
})

# Ordenar por valor absoluto del coeficiente
feature_importance['abs_coefficient'] = abs(feature_importance['coefficient'])
feature_importance = feature_importance.sort_values('abs_coefficient', ascending=True)

print("📊 IMPORTANCIA DE CARACTERÍSTICAS:")
print("(Coeficientes de Regresión Logística)")
print("-" * 50)
for _, row in feature_importance.iterrows():
    print(f"{row['feature']:20}: {row['coefficient']:8.4f}")

# Visualizar importancia de características
plt.figure(figsize=(10, 8))
plt.barh(feature_importance['feature'], feature_importance['coefficient'])
plt.title('Importancia de Características\\n(Coeficientes de Regresión Logística)')
plt.xlabel('Coeficiente')
plt.ylabel('Característica')
plt.grid(axis='x', alpha=0.3)
plt.tight_layout()
plt.show()


## 7. Pipeline Completo y Guardado del Modelo


In [None]:
# Crear pipeline completo que incluye preprocesamiento y modelo
full_pipeline = Pipeline(stages=[
    sex_indexer,
    workclass_indexer,
    education_indexer,
    label_indexer,
    sex_encoder,
    workclass_encoder,
    education_encoder,
    assembler,
    lr
])

print("✅ Pipeline completo creado")
print("📋 Etapas del pipeline completo:")
full_stages = [
    "1. StringIndexer (sex)",
    "2. StringIndexer (workclass)", 
    "3. StringIndexer (education)",
    "4. StringIndexer (label)",
    "5. OneHotEncoder (sex)",
    "6. OneHotEncoder (workclass)",
    "7. OneHotEncoder (education)",
    "8. VectorAssembler",
    "9. LogisticRegression"
]

for stage in full_stages:
    print(f"   {stage}")

# Entrenar pipeline completo
print("\\n🔄 Entrenando pipeline completo...")
full_model = full_pipeline.fit(df)
print("✅ Pipeline completo entrenado")

# Guardar modelo
import os
output_path = "../models/adult_income_model"
os.makedirs(os.path.dirname(output_path), exist_ok=True)

full_model.write().overwrite().save(output_path)
print(f"💾 Modelo guardado en: {output_path}")


## 8. Resumen y Conclusiones


In [None]:
print("=" * 60)
print("📋 RESUMEN DEL ANÁLISIS DE CLASIFICACIÓN")
print("=" * 60)

print(f"📊 DATOS PROCESADOS:")
print(f"   • Total de registros: {df.count()}")
print(f"   • Registros de entrenamiento: {train_df.count()}")
print(f"   • Registros de prueba: {test_df.count()}")
print(f"   • Características utilizadas: {len(feature_columns)}")

print(f"\\n🔧 PREPROCESAMIENTO:")
print(f"   • Variables categóricas indexadas: 4 (sex, workclass, education, label)")
print(f"   • Variables categóricas codificadas: 3 (sex, workclass, education)")
print(f"   • Variables numéricas: 3 (age, fnlwgt, hours_per_week)")
print(f"   • Vector de características: {len(feature_columns)} dimensiones")

print(f"\\n🤖 MODELO:")
print(f"   • Algoritmo: Regresión Logística")
print(f"   • Regularización: ElasticNet (L1 + L2)")
print(f"   • Parámetro de regularización: 0.01")
print(f"   • Máximo de iteraciones: 100")

print(f"\\n📈 MÉTRICAS FINALES:")
print(f"   • AUC: {auc:.4f}")
print(f"   • Precisión: {accuracy:.4f}")
print(f"   • Recall: {recall:.4f}")
print(f"   • F1-Score: {f1:.4f}")

print(f"\\n✅ RESULTADOS:")
if auc > 0.8:
    print("   🎉 Excelente rendimiento del modelo (AUC > 0.8)")
elif auc > 0.7:
    print("   👍 Buen rendimiento del modelo (AUC > 0.7)")
else:
    print("   ⚠️  Rendimiento del modelo puede mejorarse")

print(f"\\n🚀 PRÓXIMOS PASOS:")
print("   • El modelo está listo para hacer predicciones sobre nuevos datos")
print("   • Se puede implementar en producción usando el pipeline guardado")
print("   • Considerar validación cruzada para optimización de hiperparámetros")
print("   • Monitorear el rendimiento en datos de producción")


In [None]:
# Detener Spark Session
spark.stop()
print("✅ Spark Session detenida")
print("\\n🎉 ¡Análisis completado exitosamente!")


## 9. Análisis Detallado de Predicciones

### 9.1 Predicciones con Probabilidades

Vamos a analizar en detalle las predicciones del modelo, incluyendo las probabilidades asociadas y compararlas con las etiquetas reales.


In [None]:
# Crear un DataFrame más detallado con las predicciones
detailed_predictions = predictions.select(
    "label_indexed",
    "prediction", 
    "probability",
    "rawPrediction"
)

# Agregar información interpretable
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Función para interpretar las etiquetas
def interpret_label(label_idx):
    return ">50K" if label_idx == 1.0 else "<=50K"

def interpret_prediction(pred_idx):
    return ">50K" if pred_idx == 1.0 else "<=50K"

# Crear UDFs
interpret_label_udf = udf(interpret_label, StringType())
interpret_prediction_udf = udf(interpret_prediction, StringType())

# Aplicar las funciones
detailed_predictions = detailed_predictions.withColumn(
    "label_interpreted", interpret_label_udf("label_indexed")
).withColumn(
    "prediction_interpreted", interpret_prediction_udf("prediction")
)

# Extraer probabilidades individuales
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

def extract_prob_0(probability_vector):
    return float(probability_vector[0])

def extract_prob_1(probability_vector):
    return float(probability_vector[1])

extract_prob_0_udf = udf(extract_prob_0, DoubleType())
extract_prob_1_udf = udf(extract_prob_1, DoubleType())

detailed_predictions = detailed_predictions.withColumn(
    "prob_<=50K", extract_prob_0_udf("probability")
).withColumn(
    "prob_>50K", extract_prob_1_udf("probability")
)

print("📊 PREDICCIONES DETALLADAS CON PROBABILIDADES:")
print("=" * 80)
detailed_predictions.select(
    "label_interpreted",
    "prediction_interpreted", 
    "prob_<=50K",
    "prob_>50K",
    "probability"
).show(20, truncate=False)


In [None]:
# Análisis de casos correctos vs incorrectos
correct_predictions = detailed_predictions.filter(
    col("label_indexed") == col("prediction")
)
incorrect_predictions = detailed_predictions.filter(
    col("label_indexed") != col("prediction")
)

print(f"📈 ANÁLISIS DE PREDICCIONES:")
print(f"   • Total de predicciones: {detailed_predictions.count()}")
print(f"   • Predicciones correctas: {correct_predictions.count()}")
print(f"   • Predicciones incorrectas: {incorrect_predictions.count()}")
print(f"   • Tasa de acierto: {correct_predictions.count() / detailed_predictions.count() * 100:.2f}%")

print(f"\\n✅ EJEMPLOS DE PREDICCIONES CORRECTAS:")
correct_predictions.select(
    "label_interpreted",
    "prediction_interpreted", 
    "prob_<=50K",
    "prob_>50K"
).show(10, truncate=False)

print(f"\\n❌ EJEMPLOS DE PREDICCIONES INCORRECTAS:")
incorrect_predictions.select(
    "label_interpreted",
    "prediction_interpreted", 
    "prob_<=50K",
    "prob_>50K"
).show(10, truncate=False)


### 9.2 Análisis de Confianza del Modelo

Analicemos qué tan confiado está el modelo en sus predicciones.


In [None]:
# Análisis de confianza del modelo
from pyspark.sql.functions import when, max as spark_max, min as spark_min, avg

# Calcular la confianza (probabilidad máxima)
detailed_predictions = detailed_predictions.withColumn(
    "confidence", 
    when(col("prediction") == 1.0, col("prob_>50K"))
    .otherwise(col("prob_<=50K"))
)

# Estadísticas de confianza
confidence_stats = detailed_predictions.select(
    spark_min("confidence").alias("min_confidence"),
    spark_max("confidence").alias("max_confidence"),
    avg("confidence").alias("avg_confidence")
).collect()[0]

print(f"🎯 ANÁLISIS DE CONFIANZA DEL MODELO:")
print(f"   • Confianza mínima: {confidence_stats['min_confidence']:.4f}")
print(f"   • Confianza máxima: {confidence_stats['max_confidence']:.4f}")
print(f"   • Confianza promedio: {confidence_stats['avg_confidence']:.4f}")

# Distribución de confianza
print(f"\\n📊 DISTRIBUCIÓN DE CONFIANZA:")
detailed_predictions.select("confidence").describe().show()

# Casos de alta vs baja confianza
high_confidence = detailed_predictions.filter(col("confidence") >= 0.8)
low_confidence = detailed_predictions.filter(col("confidence") < 0.6)

print(f"\\n🔍 CASOS DE ALTA CONFIANZA (≥0.8):")
print(f"   • Cantidad: {high_confidence.count()}")
print(f"   • Porcentaje: {high_confidence.count() / detailed_predictions.count() * 100:.2f}%")

print(f"\\n⚠️  CASOS DE BAJA CONFIANZA (<0.6):")
print(f"   • Cantidad: {low_confidence.count()}")
print(f"   • Porcentaje: {low_confidence.count() / detailed_predictions.count() * 100:.2f}%")

# Mostrar algunos casos de baja confianza
print(f"\\n🔍 EJEMPLOS DE BAJA CONFIANZA:")
low_confidence.select(
    "label_interpreted",
    "prediction_interpreted", 
    "confidence",
    "prob_<=50K",
    "prob_>50K"
).show(10, truncate=False)


## 10. Reflexión y Análisis de Resultados

### ¿Qué observamos sobre los resultados?

Basándonos en el análisis realizado, podemos hacer las siguientes observaciones:


#### 🎯 **Rendimiento del Modelo**

1. **Métricas Generales**: El modelo muestra un rendimiento sólido con métricas como AUC, Accuracy, Precision, Recall y F1-Score que indican una buena capacidad de clasificación.

2. **Distribución de Predicciones**: La mayoría de las predicciones tienen una confianza razonable, lo que sugiere que el modelo está aprendiendo patrones significativos en los datos.

3. **Casos de Alta Confianza**: Los casos donde el modelo tiene alta confianza (≥0.8) probablemente representan patrones claros y bien definidos en los datos.

#### 🔍 **Análisis de Errores**

1. **Casos de Baja Confianza**: Los casos con baja confianza (<0.6) pueden indicar:
   - Patrones ambiguos en los datos
   - Características que no están bien capturadas por el modelo
   - Casos límite donde las características no son determinantes

2. **Predicciones Incorrectas**: Los errores del modelo pueden revelar:
   - Limitaciones en las características utilizadas
   - Necesidad de más datos o características adicionales
   - Casos donde el patrón no es lineal

#### 📊 **Interpretación de Características**

1. **Variables Numéricas**: 
   - `age`: Probablemente tiene un impacto positivo en ingresos altos
   - `hours_per_week`: Correlación positiva con ingresos
   - `fnlwgt`: Variable de ponderación que puede no ser directamente interpretable

2. **Variables Categóricas**:
   - `education`: Factor clave en la predicción de ingresos
   - `workclass`: Diferencia entre sectores público y privado
   - `sex`: Puede mostrar diferencias por género en los ingresos

#### 🚀 **Implicaciones Prácticas**

1. **Uso en Producción**: El modelo puede ser utilizado para:
   - Clasificación automática de perfiles de ingresos
   - Análisis de riesgo crediticio
   - Estudios demográficos y socioeconómicos

2. **Limitaciones**: 
   - El modelo se basa en correlaciones, no en causalidad
   - Puede tener sesgos inherentes en los datos
   - Requiere validación continua con nuevos datos

#### 🔧 **Mejoras Potenciales**

1. **Ingeniería de Características**:
   - Crear nuevas características derivadas
   - Considerar interacciones entre variables
   - Normalización de variables numéricas

2. **Optimización del Modelo**:
   - Validación cruzada para optimización de hiperparámetros
   - Prueba de otros algoritmos (Random Forest, Gradient Boosting)
   - Ensemble de múltiples modelos

3. **Datos**:
   - Recolección de más datos
   - Balanceo de clases si es necesario
   - Validación de calidad de datos


In [None]:
# Análisis final de rendimiento por clase
print("=" * 60)
print("📊 ANÁLISIS FINAL POR CLASE")
print("=" * 60)

# Análisis de rendimiento por clase
class_0_predictions = detailed_predictions.filter(col("label_indexed") == 0.0)
class_1_predictions = detailed_predictions.filter(col("label_indexed") == 1.0)

print(f"\\n📈 CLASE <=50K (Índice 0):")
print(f"   • Total de casos: {class_0_predictions.count()}")
correct_class_0 = class_0_predictions.filter(col("prediction") == 0.0).count()
print(f"   • Predicciones correctas: {correct_class_0}")
print(f"   • Precisión: {correct_class_0 / class_0_predictions.count() * 100:.2f}%")

print(f"\\n📈 CLASE >50K (Índice 1):")
print(f"   • Total de casos: {class_1_predictions.count()}")
correct_class_1 = class_1_predictions.filter(col("prediction") == 1.0).count()
print(f"   • Predicciones correctas: {correct_class_1}")
print(f"   • Precisión: {correct_class_1 / class_1_predictions.count() * 100:.2f}%")

# Resumen final
print(f"\\n🎯 RESUMEN FINAL:")
print(f"   • Modelo entrenado con {df.count()} registros")
print(f"   • División: {train_df.count()} entrenamiento, {test_df.count()} prueba")
print(f"   • Características utilizadas: {len(feature_columns)}")
print(f"   • AUC: {auc:.4f}")
print(f"   • Accuracy: {accuracy:.4f}")
print(f"   • F1-Score: {f1:.4f}")

if auc > 0.8:
    print(f"   🎉 ¡Excelente rendimiento! El modelo está listo para producción.")
elif auc > 0.7:
    print(f"   👍 Buen rendimiento. Considerar optimizaciones adicionales.")
else:
    print(f"   ⚠️  Rendimiento moderado. Se recomienda mejorar el modelo.")


## 11. Predicción con Nuevos Datos

### 11.1 Creación de Datos de Prueba

Vamos a crear un DataFrame con 9 registros nuevos para probar nuestro modelo entrenado.


In [None]:
# Crear datos de prueba con 9 registros nuevos
new_data = [
    # Registro 1: Profesional joven con educación avanzada
    (29, "Male", "Private", 180000, "Masters", 45, ">50K"),
    
    # Registro 2: Empleado gubernamental con educación media
    (42, "Female", "Gov", 220000, "Bachelors", 40, ">50K"),
    
    # Registro 3: Trabajador autónomo mayor
    (58, "Male", "Self-emp", 350000, "HS-grad", 55, ">50K"),
    
    # Registro 4: Joven con educación básica
    (24, "Female", "Private", 95000, "11th", 25, "<=50K"),
    
    # Registro 5: Profesional experimentado
    (47, "Male", "Private", 280000, "Bachelors", 50, ">50K"),
    
    # Registro 6: Empleada gubernamental joven
    (31, "Female", "Gov", 160000, "Some-college", 35, "<=50K"),
    
    # Registro 7: Trabajador autónomo con educación avanzada
    (52, "Female", "Self-emp", 320000, "Masters", 48, ">50K"),
    
    # Registro 8: Empleado privado con educación media
    (38, "Male", "Private", 200000, "Assoc", 42, "<=50K"),
    
    # Registro 9: Profesional senior
    (61, "Male", "Gov", 400000, "Masters", 40, ">50K")
]

# Crear DataFrame con los nuevos datos
new_df = spark.createDataFrame(new_data, schema)

print("✅ Datos de prueba creados")
print(f"📊 Total de registros nuevos: {new_df.count()}")

# Mostrar los datos de entrada
print("\n📋 DATOS DE ENTRADA PARA PREDICCIÓN:")
new_df.select("age", "sex", "workclass", "education", "hours_per_week").show(truncate=False)


### 11.2 Aplicación del Modelo a Nuevos Datos

Ahora vamos a aplicar nuestro modelo entrenado a estos nuevos datos para hacer predicciones.


In [None]:
# Aplicar el modelo entrenado a los nuevos datos
print("🔄 Aplicando modelo entrenado a nuevos datos...")
new_predictions = full_model.transform(new_df)

print("✅ Predicciones completadas")

# Crear funciones para interpretar los resultados
def interpret_prediction_new(pred_idx):
    return ">50K" if pred_idx == 1.0 else "<=50K"

def interpret_label_new(label_idx):
    return ">50K" if label_idx == 1.0 else "<=50K"

def extract_prob_high_new(probability_vector):
    return float(probability_vector[1])  # Probabilidad de >50K

# Crear UDFs
interpret_pred_udf_new = udf(interpret_prediction_new, StringType())
interpret_label_udf_new = udf(interpret_label_new, StringType())
extract_prob_udf_new = udf(extract_prob_high_new, DoubleType())

# Agregar columnas interpretables
new_results = new_predictions.withColumn(
    "prediction_interpreted", interpret_pred_udf_new("prediction")
).withColumn(
    "label_interpreted", interpret_label_udf_new("label_indexed")
).withColumn(
    "prob_>50K", extract_prob_udf_new("probability")
)

print("✅ Resultados procesados y listos para mostrar")


### 11.3 Resultados de las Predicciones

Vamos a mostrar los resultados de las predicciones con las probabilidades asociadas.


In [None]:
# Mostrar resultados detallados
print("=" * 100)
print("📊 RESULTADOS DE PREDICCIONES EN NUEVOS DATOS")
print("=" * 100)

# Mostrar tabla completa con predicciones
new_results.select(
    "age", "sex", "workclass", "education", "hours_per_week",
    "label_interpreted", "prediction_interpreted", "prob_>50K"
).show(truncate=False)

# Análisis de precisión en nuevos datos
correct_new = new_results.filter(col("label_indexed") == col("prediction")).count()
total_new = new_results.count()
accuracy_new = correct_new / total_new * 100

print(f"\n📈 ANÁLISIS DE PRECISIÓN EN NUEVOS DATOS:")
print(f"   • Predicciones correctas: {correct_new}/{total_new}")
print(f"   • Precisión: {accuracy_new:.2f}%")

# Mostrar casos incorrectos si los hay
incorrect_new = new_results.filter(col("label_indexed") != col("prediction"))
if incorrect_new.count() > 0:
    print(f"\n❌ CASOS INCORRECTOS:")
    incorrect_new.select(
        "age", "sex", "workclass", "education", "hours_per_week",
        "label_interpreted", "prediction_interpreted", "prob_>50K"
    ).show(truncate=False)
else:
    print(f"\n✅ ¡Todas las predicciones fueron correctas!")

# Análisis por nivel de confianza
high_conf_new = new_results.filter(col("prob_>50K") >= 0.8)
medium_conf_new = new_results.filter((col("prob_>50K") >= 0.6) & (col("prob_>50K") < 0.8))
low_conf_new = new_results.filter(col("prob_>50K") < 0.6)

print(f"\n🎯 ANÁLISIS DE CONFIANZA:")
print(f"   • Alta confianza (≥0.8): {high_conf_new.count()} casos")
print(f"   • Confianza media (0.6-0.8): {medium_conf_new.count()} casos")
print(f"   • Baja confianza (<0.6): {low_conf_new.count()} casos")


### 11.4 Análisis Detallado de Cada Caso

Vamos a analizar cada caso individual para entender mejor las predicciones del modelo.


In [None]:
# Análisis detallado caso por caso
print("🔍 ANÁLISIS DETALLADO CASO POR CASO:")
print("=" * 80)

# Convertir a Pandas para análisis más detallado
new_results_pandas = new_results.toPandas()

for i, row in new_results_pandas.iterrows():
    print(f"\n📋 CASO {i+1}:")
    print(f"   👤 Perfil: {row['age']} años, {row['sex']}, {row['workclass']}")
    print(f"   🎓 Educación: {row['education']}, {row['hours_per_week']} hrs/semana")
    print(f"   🎯 Real: {row['label_interpreted']}")
    print(f"   🤖 Predicción: {row['prediction_interpreted']}")
    print(f"   📊 Probabilidad >50K: {row['prob_>50K']:.3f}")
    
    # Análisis del caso
    if row['label_interpreted'] == row['prediction_interpreted']:
        print(f"   ✅ CORRECTO")
    else:
        print(f"   ❌ INCORRECTO")
    
    # Interpretación de la confianza
    if row['prob_>50K'] >= 0.8:
        conf_level = "ALTA"
    elif row['prob_>50K'] >= 0.6:
        conf_level = "MEDIA"
    else:
        conf_level = "BAJA"
    
    print(f"   🎯 Confianza: {conf_level}")

print(f"\n📊 RESUMEN FINAL:")
print(f"   • Total de casos analizados: {len(new_results_pandas)}")
print(f"   • Predicciones correctas: {correct_new}")
print(f"   • Predicciones incorrectas: {total_new - correct_new}")
print(f"   • Precisión general: {accuracy_new:.2f}%")

# Análisis de patrones
print(f"\n🔍 PATRONES OBSERVADOS:")
high_income_cases = new_results_pandas[new_results_pandas['label_interpreted'] == '>50K']
low_income_cases = new_results_pandas[new_results_pandas['label_interpreted'] == '<=50K']

print(f"   • Casos >50K: {len(high_income_cases)}")
print(f"   • Casos <=50K: {len(low_income_cases)}")

if len(high_income_cases) > 0:
    avg_prob_high = high_income_cases['prob_>50K'].mean()
    print(f"   • Probabilidad promedio para >50K: {avg_prob_high:.3f}")

if len(low_income_cases) > 0:
    avg_prob_low = low_income_cases['prob_>50K'].mean()
    print(f"   • Probabilidad promedio para <=50K: {avg_prob_low:.3f}")


## 5. Evaluación del Modelo
