In [0]:
# -------------------------------------------------------
# Capa Bronze: Carga inicial de datos crudos
# -------------------------------------------------------

from pyspark.sql import SparkSession

# Read the raw source table untouched.
df_bronze = spark.sql("SELECT * FROM bank_loan.default.bank_personal_loan")

# Normalize column names: trim, lowercase, and turn spaces into "_".
normalized_names = [
    name.strip().lower().replace(" ", "_")
    for name in df_bronze.columns
]
df_bronze = df_bronze.toDF(*normalized_names)

# Persist as a Delta table in the Bronze layer (currently disabled).
# NOTE(review): the Silver cells read bank_loan.default.etl_bank_loan_bronze,
# not the table name below — confirm which table is canonical.
# df_bronze.write.format("delta").mode("overwrite").saveAsTable("bank_loan.default.bank_personal_loan_bronze")

In [0]:
print("\n>>> 1. TAMAÑO Y ESTRUCTURA <<<\n")
# The row count runs a Spark job; the column count comes from the schema.
df_count = df_bronze.count()
n_columns = len(df_bronze.columns)
shape_rows = [(df_count, n_columns)]
df_shape = spark.createDataFrame(shape_rows, schema=["num_rows", "num_columns"])
display(df_shape)
df_bronze.printSchema()

In [0]:
# Collect the Bronze table to the driver as pandas for a quick statistical summary.
df_bronze_pandas = df_bronze.toPandas()
bronze_summary = df_bronze_pandas.describe()
bronze_summary

In [0]:
# Preview the Bronze DataFrame (with normalized column names) via the notebook `display` helper.
display(df_bronze)

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()

# Inspect the column types of the Bronze table before the Silver transforms.
# NOTE(review): this table is not written by the Bronze cell above (its save
# is commented out and targets bank_personal_loan_bronze) — confirm the source.
df_silver = spark.table("bank_loan.default.etl_bank_loan_bronze")
silver_dtypes = df_silver.dtypes
print(silver_dtypes)

In [0]:
# -------------------------------------------------------
# Capa Silver: limpieza, transformación y escalado manual
# -------------------------------------------------------

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

# -------------------------
# 0. Read the Bronze table
# -------------------------
df_silver = spark.table("bank_loan.default.etl_bank_loan_bronze")

# -------------------------
# 1. Cast identifier-like columns (ID, ZIP) to string so that the numeric
#    cast below and the later scaling skip them
# -------------------------
id_like_cols = ["id", "zip_code"]
for name in id_like_cols:
    df_silver = df_silver.withColumn(name, F.col(name).cast("string"))

# -------------------------
# 2. Cast the remaining bigint/int columns to double
# -------------------------
integer_types = ("bigint", "int")
cols = [name for name, dtype in df_silver.dtypes if dtype in integer_types]
for name in cols:
    df_silver = df_silver.withColumn(name, F.col(name).cast("double"))

# -------------------------
# 3. Helper (wrapped as a UDF) to convert fraction strings to float
# -------------------------
def frac_to_double(s):
    """Parse a value such as "3/4", "1.6", 1.6 or 2 into a float.

    Returns None for None, blank strings, malformed text (e.g. "abc",
    "1/2/3") or a zero denominator, so the Spark UDF yields NULL instead of
    failing the job. Only parsing errors are caught; anything else propagates.

    NOTE(review): if values like "1/60" are really a mangled "1.60"
    (spreadsheet/locale artifact), dividing is the wrong interpretation —
    confirm against the source data.
    """
    if s is None:
        return None
    # Already-numeric input: the original `'/' in s` test raised TypeError on
    # numbers and the bare except turned every value into None.
    if isinstance(s, (int, float)) and not isinstance(s, bool):
        return float(s)
    text = str(s).strip()
    if not text:
        return None
    try:
        if "/" in text:
            num, denom = text.split("/")
            return float(num) / float(denom)
        return float(text)
    except (ValueError, ZeroDivisionError):
        # ValueError: non-numeric parts or more than one "/";
        # ZeroDivisionError: "x/0".
        return None

# Parse ccavg (presumably text, possibly fractions like "3/4") into double;
# unparseable values become NULL and are imputed in step 4.
frac_to_double_udf = F.udf(frac_to_double, DoubleType())
df_silver = df_silver.withColumn("ccavg", frac_to_double_udf(F.col("ccavg")))

# -------------------------
# 4. Fill null values with the (approximate) median
# -------------------------
for col_name in ["age", "income", "ccavg"]:
    # approxQuantile ignores nulls and returns an empty list when a column has
    # no non-null values; indexing [0] would then raise IndexError.
    quantiles = df_silver.approxQuantile(col_name, [0.5], 0.01)
    if not quantiles:
        print(f"WARNING: '{col_name}' has no non-null values; median imputation skipped")
        continue
    median_val = quantiles[0]
    df_silver = df_silver.fillna({col_name: median_val})

# -------------------------
# 5. Manual z-score scaling of numeric columns
# -------------------------

# Candidate columns: every double column (ints were cast to double in step 2).
numeric_cols = [c for c, t in df_silver.dtypes if t == "double"]
string_cols = ["id", "zip_code"]

# Categorical / binary variables that must NOT be scaled
not_scaled_cols = [
    'education',
    'family',
    'personal_loan',
    'securities_account',
    'cd_account',
    'online',
    'creditcard'
]

# Keep only the continuous numeric columns
numeric_cols_to_scale = [c for c in numeric_cols if c not in not_scaled_cols]

# NOTE(review): mean/std are computed on the full dataset, before the
# train/test split in the modelling cell, so test rows influence the scaling.

scaled_cols = []

if numeric_cols_to_scale:
    # One aggregation (a single Spark job) for every mean and sample stddev,
    # instead of one collect() per column.
    stats = df_silver.agg(
        *[F.mean(F.col(c)).alias(f"mean_{i}") for i, c in enumerate(numeric_cols_to_scale)],
        *[F.stddev(F.col(c)).alias(f"std_{i}") for i, c in enumerate(numeric_cols_to_scale)],
    ).collect()[0]

    for i, c in enumerate(numeric_cols_to_scale):
        mean_val = stats[f"mean_{i}"]
        std_val = stats[f"std_{i}"]
        # stddev is NULL (or NaN) with fewer than two non-null values and 0 for a
        # constant column. Dividing by any of them would make the whole scaled
        # column NULL/NaN, and a later na.drop() would discard every row.
        # Fall back to 1.0 so the column is only centred.
        if std_val is None or not std_val > 0:
            std_val = 1.0

        scaled_col_name = f"{c}_scaled"
        df_silver = df_silver.withColumn(
            scaled_col_name,
            (F.col(c) - F.lit(mean_val)) / F.lit(std_val)
        )
        scaled_cols.append(scaled_col_name)

# -------------------------
# 6. Keep the identifier columns, the scaled columns and the unscaled
#    categorical/binary columns
# -------------------------
selected_cols = [*string_cols, *scaled_cols, *not_scaled_cols]

# -------------------------
# 7. Drop any row that still contains a null value
# -------------------------
df_silver_scaled = df_silver.select(selected_cols).na.drop()

# -------------------------
# 8. Save the table to Delta as Silver (currently disabled)
# -------------------------
# NOTE(review): the Gold/ML cells read tables named etl_bank_loan_*, not the name below.
# df_silver_scaled.write.format("delta").mode("overwrite").saveAsTable("bank_loan.default.bank_personal_loan_silver")
# -------------------------
# 9. Show results
# -------------------------
display(df_silver_scaled)


In [0]:
# Collect the scaled Silver table to the driver as pandas for a statistical summary.
df_silver_scaled_pandas = df_silver_scaled.toPandas()
silver_summary = df_silver_scaled_pandas.describe()
silver_summary

In [0]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

# Column to plot. The axis label and title are derived from it so they cannot
# drift out of sync with the data being shown.
feature = "age_scaled"
etiqueta = f"{feature.replace('_scaled', '').capitalize()} Escalado"
data = df_silver_scaled_pandas[feature]

# Summary statistics (np.std is the population std, ddof=0)
media = np.mean(data)
desv_std = np.std(data)

fig, ax = plt.subplots(figsize=(12, 6))
sns.histplot(data, bins=30, kde=True, color='skyblue', ax=ax)

# Mean and ±1 standard deviation reference lines
ax.axvline(media, color='red', linestyle='--', label=f'Media = {media:.2f}')
ax.axvline(media + desv_std, color='green', linestyle='--', label=f'Media + 1σ = {media + desv_std:.2f}')
ax.axvline(media - desv_std, color='green', linestyle='--', label=f'Media - 1σ = {media - desv_std:.2f}')

ax.set_xlabel(etiqueta)
ax.set_ylabel("Frecuencia")
ax.set_title(f"Histograma de {etiqueta} con Media y Desviación Estándar")
ax.legend()
plt.show()


In [0]:
# Quick structural check (dtypes / non-null counts) of the pandas copy of df_silver_scaled.
df_silver_scaled_pandas.info()

In [0]:
# -------------------------------------------------------
# Capa Gold: Features finales para análisis y ML
# -------------------------------------------------------

# Gold starts from the scaled Silver DataFrame.
df_gold = df_silver_scaled

# -------------------------
# 1. Flags and ratios built only from the scaled columns
# -------------------------
# Threshold for the high-income flag: mean of the income z-score.
mean_income = df_gold.agg(F.mean("income_scaled")).first()[0]

df_gold = (
    df_gold
    # High income: z-score above its mean.
    .withColumn("high_income_flag", (F.col("income_scaled") > mean_income).cast("int"))
    # NOTE(review): dividing a z-score (possibly negative) by family size is
    # hard to interpret as "income per member" — confirm this is intended.
    .withColumn("income_per_family_member", F.col("income_scaled") / F.col("family"))
    # Young client: age below the mean age (negative z-score).
    .withColumn("young_flag", (F.col("age_scaled") < 0).cast("int"))
)

# -------------------------
# 2. Per-zip_code aggregates, joined back onto every row
# -------------------------
zip_aggregates = [
    F.mean("ccavg_scaled").alias("avg_ccavg_by_zip"),
    F.mean("income_scaled").alias("avg_income_by_zip"),
]
df_zip_agg = df_gold.groupBy("zip_code").agg(*zip_aggregates)

df_gold = df_gold.join(df_zip_agg, "zip_code", "left")

# -------------------------
# 3. Final columns: identifiers, scaled features, unscaled
#    categorical/binary columns and the engineered features
# -------------------------
not_scaled_cols = [
    'education', 'family', 'personal_loan', 'securities_account',
    'cd_account', 'online', 'creditcard',
]
scaled_cols = [name for name in df_gold.columns if "_scaled" in name]
new_features = [
    "high_income_flag",
    "income_per_family_member",
    "young_flag",
    "avg_ccavg_by_zip",
    "avg_income_by_zip",
]

final_cols = ["id", "zip_code", *scaled_cols, *not_scaled_cols, *new_features]
df_gold_final = df_gold.select(*final_cols)

# -------------------------
# 4. Save Gold to Delta (currently disabled)
# -------------------------
# NOTE(review): the modelling cell reads bank_loan.default.etl_bank_loan_gold, not the name below.
# df_gold_final.write.format("delta").mode("overwrite").saveAsTable("bank_loan.default.bank_personal_loan_gold")

# -------------------------
# 5. Show results
# -------------------------
display(df_gold_final)


In [0]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Load the Gold table and collect it to the driver as pandas.
# NOTE(review): this reads the persisted table, not df_gold_final built above
# (whose save is commented out) — on a fresh run the two may differ.
df_spark = spark.table("bank_loan.default.etl_bank_loan_gold")
df = df_spark.toPandas()

# Keep identifiers aside so predictions can be traced back to their rows.
ids, zips = df["id"], df["zip_code"]

# Target vs. features: every other non-identifier column is a feature.
target_col = "personal_loan"
y = df[target_col]
X = df.drop(columns=["id", "zip_code", target_col])

# Train/test split, carrying df.index so id/zip_code can be re-attached to
# the test predictions.
# NOTE(review): personal_loan looks imbalanced/binary; consider stratify=y.
X_train, X_test, y_train, y_test, idx_train, idx_test = train_test_split(
    X, y, df.index, test_size=0.2, random_state=848
)

# Random Forest.
# NOTE(review): personal_loan is a 0/1 target; a regressor scored with RMSE
# treats it as continuous — a classifier (with AUC/F1) may be more appropriate.
model = RandomForestRegressor(
    n_estimators=500,
    max_depth=10,
    min_samples_split=5,
    min_samples_leaf=2,
    random_state=42,
    n_jobs=-1
)
model.fit(X_train, y_train)

# Prediction and metric. RMSE is taken as sqrt(MSE) explicitly: the
# `squared=False` keyword was deprecated in scikit-learn 1.4 and removed in 1.6.
y_pred = model.predict(X_test)
rmse = mean_squared_error(y_test, y_pred) ** 0.5
print(f"RMSE: {rmse:.4f}")

# Final DataFrame with id, zip_code, actual and predicted values for the test set.
# idx_test holds index *labels* from df.index, so select with .loc; .iloc only
# works by coincidence while df keeps a default RangeIndex. The Series share
# idx_test's order, so the positional y_pred array lines up with them.
df_results = pd.DataFrame({
    "id": ids.loc[idx_test],
    "zip_code": zips.loc[idx_test],
    "y_real": y_test,
    "y_pred": y_pred
})

# Convert to a Spark DataFrame and save as Delta (save currently disabled)
df_results_spark = spark.createDataFrame(df_results)
## df_results_spark.write.format("delta").mode("overwrite").saveAsTable("bank_loan.default.rf_predictions_gold")

In [0]:
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestRegressor

from sklearn.metrics import mean_squared_error

# Train the model that will be logged (random_state matches the split seed).
model = RandomForestRegressor(
    n_estimators=500,
    max_depth=10,
    min_samples_split=5,
    min_samples_leaf=2,
    random_state=848,
    n_jobs=-1
)
model.fit(X_train, y_train)

# Evaluate THIS model. The `rmse` from the previous cell belongs to a different
# model (random_state=42), so logging it here would mislabel the artifact.
model_rmse = mean_squared_error(y_test, model.predict(X_test)) ** 0.5

# Log the model, its hyperparameters and its own metric with MLflow.
with mlflow.start_run():
    mlflow.log_params(model.get_params())
    mlflow.sklearn.log_model(model, "personal_loan_model")
    mlflow.log_metric("rmse", model_rmse)
