# Instalacion de Requisitos y Creación de Spark Session

In [0]:
!pip install pyspark

In [0]:
from pyspark.sql import SparkSession

# Inicializa una sesión de Spark
spark = SparkSession.builder \
    .appName("Regresion-Berka") \
    .getOrCreate()

# Ingesta y Procesamiento de Datos

In [0]:
# Carga de datos
consulta_sql = """
SELECT *
FROM view_berka
"""
data = spark.sql(consulta_sql)
data.show()

In [0]:
# Preprocesamiento de datos (limpieza, conversión de categorías, etc.)
from pyspark.ml.feature import StringIndexer
# Renonbrar columnas
data = data.withColumnRenamed('A11','avg_salary')
data = data.withColumnRenamed('A1','district_id')
data = data.withColumnRenamed('edad','client_age')

# Crear un StringIndexer
indexer = StringIndexer(inputCol="status", outputCol="status_index")

# Convierte las columnas de cadena a tipo de dato numérico
data = data.withColumn("amount", data["amount"].cast("double"))
data = data.withColumn("payments", data["payments"].cast("double"))
data = data.withColumn("duration", data["duration"].cast("double"))
data = data.withColumn("avg_salary", data["avg_salary"].cast("double"))

# Ajustar el StringIndexer al DataFrame y transformar los datos
df_berka_final = indexer.fit(data).transform(data)

# Mostrar el resultado
df_berka_final.show()

###  -> Realizar un agrupamiento de las variables cualitativas que usara (tabla de frecuencia).

In [0]:
from pyspark.sql.functions import count

# Calcula la frecuencia de cada valor único en la columna "status"
status_frequencies = df_berka_final.groupBy("status").agg(count("*").alias("frequency"))
# Muestra la tabla de frecuencias
status_frequencies.show()

### -> Para el caso de una variable cuantitativa usar el método describe para obtener estadísticos descriptivos.

In [0]:
# Selecciona las columnas cuantitativas que deseas analizar.
cuantitativas = ["payments", "duration", "amount", "avg_salary", "client_age"]

# Usa el método "describe" en estas columnas.
describe_stats = df_berka_final.select(*cuantitativas).describe()

# Muestra las estadísticas descriptivas.
describe_stats.show()

# División de los datos

In [0]:
# División de datos en conjuntos de entrenamiento y prueba
train_data, test_data = df_berka_final.randomSplit([0.8, 0.2], seed=123)

In [0]:
# Creación de un DataFrame de características
from pyspark.ml.feature import VectorAssembler

feature_cols = ["amount", "payments", "duration", "client_age", "avg_salary"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_data = assembler.transform(train_data)
test_data = assembler.transform(test_data)
train_data.show()

### creacion de datos especiales para MLP

In [0]:
# Ejecutar esta linea en caso se quiera volver a ejecutar el paso 4
#train_data = train_data.drop("features")
#test_data = test_data.drop("features")

In [0]:
from pyspark.sql.functions import col

ml_data = assembler.transform(df_berka_final)
ml_data = ml_data.withColumnRenamed("status_index", "label")
ml_data = ml_data.withColumn("label", col("label").cast("integer"))

# División de datos en conjuntos de entrenamiento y prueba
(ml_train_data, ml_test_data) = ml_data.randomSplit([0.8, 0.2], seed=1234)
ml_test_data = ml_test_data.withColumn("label", col("label").cast("integer"))

# Creación de los modelos

### Modelo LogisticRegression

In [0]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

lr = LogisticRegression(featuresCol="features", labelCol="status_index")
pipeline = Pipeline(stages=[lr])
lr_model = pipeline.fit(train_data)

### Modelo RandomForestClassifier

In [0]:
from pyspark.ml.classification import RandomForestClassifier

rf_classifier = RandomForestClassifier(numTrees=45, labelCol="status_index", featuresCol="features")
rf_model = rf_classifier.fit(train_data)

### Modelo MultilayerPerceptronClassifier

In [0]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

ml_layers = [len(feature_cols), 10, 5, len(ml_data.select("label").distinct().collect())]
ml_classifier = MultilayerPerceptronClassifier(maxIter=120, layers=ml_layers, blockSize=256, seed=1234)
ml_model = ml_classifier.fit(ml_train_data)

# Evaluación de los Modelos

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Crear un diccionario de modelos y etiquetas de columna
models_and_labels = {
    "LogisticRegression": ("status_index", lr_model),
    "RandomForest": ("status_index", rf_model),
    "MultilayerPerceptron": ("label", ml_model)
}

# Crear un diccionario de métricas
metrics = {
    "F1 Score": "f1",
    "Precision": "weightedPrecision",
    "Recall": "weightedRecall",
    "Weighted F1 Score": "weightedFMeasure",
    "Accuracy": "accuracy"
}

# Realizar predicciones y calcular varias métricas para cada modelo
for model_name, (label_col, model) in models_and_labels.items():
    predictions = model.transform(test_data if model_name != "MultilayerPerceptron" else ml_test_data)
    
    print(f"Resultados para el modelo {model_name}:")
    
    for metric_name, metric_value in metrics.items():
        evaluator = MulticlassClassificationEvaluator(labelCol=label_col, predictionCol="prediction", metricName=metric_value)
        metric_result = evaluator.evaluate(predictions)
        print(f"{metric_name}: {metric_result}")
    
    print("\n")

# Probando hiperparametros en MLP

In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Definir una cuadrícula de hiperparámetros a explorar
param_grid = (ParamGridBuilder()
              .addGrid(ml_classifier.maxIter, [100, 200])
              .addGrid(ml_classifier.layers, [[5, 10, 4], [5, 10, 10, 4]])
              .build())

# Crear un evaluador
cross_ml_evaluator = MulticlassClassificationEvaluator(metricName="f1")

# Configurar la validación cruzada
ml_crossval = CrossValidator(estimator=ml_classifier,
                          estimatorParamMaps=param_grid,
                          evaluator=cross_ml_evaluator,
                          numFolds=3)

# Ejecutar la validación cruzada para encontrar los mejores hiperparámetros
cv_model = ml_crossval.fit(ml_train_data)
best_model = cv_model.bestModel


In [0]:
# Evaluar el modelo en los datos de prueba (o cualquier otro conjunto de datos)
ml_test_predictions = best_model.transform(ml_test_data)

# Crear un evaluador para calcular métricas
cross_ml_evaluator = MulticlassClassificationEvaluator(metricName="f1")

# Calcular el F1 Score en los datos de prueba
cross_ml_f1_score = cross_ml_evaluator.evaluate(ml_test_predictions)

# Imprimir el F1 Score y otras métricas si es necesario
print(f"F1 Score: {cross_ml_f1_score}")