1) Setup y carga del Parquet

In [22]:
%matplotlib inline
import os
from pyspark.sql import SparkSession, functions as F

spark = (SparkSession.builder.master("local[*]").appName("kpi-dashboard").getOrCreate())
df = spark.read.parquet("/data/processed/employee_attrition.parquet")

TOTAL = df.count()
POS   = df.filter(F.col("attrition_label")==1).count()
RATE  = POS / TOTAL if TOTAL else 0.0

print("Total filas:", TOTAL, "| Attrition rate:", round(RATE,4))
os.makedirs("/output/bi", exist_ok=True)
os.makedirs("/output/plots", exist_ok=True)


Total filas: 1470 | Attrition rate: 0.1612


2) KPI general (overview)

In [23]:
import pandas as pd
kpi_overview = pd.DataFrame([{"rows": TOTAL, "attrition_positive": POS, "attrition_rate": RATE}])
kpi_overview.to_csv("/output/bi/kpi_overview.csv", index=False)
kpi_overview


Unnamed: 0,rows,attrition_positive,attrition_rate
0,1470,237,0.161224


3) Tasa por JobRole / Department / OverTime

In [24]:
def agg_rate(df, col):
    return (df.groupBy(col)
              .agg(F.count("*").alias("n"),
                   F.avg("attrition_label").alias("attrition_rate"))
              .orderBy(F.desc("attrition_rate")))

jobrole  = agg_rate(df, "JobRole")
depart   = agg_rate(df, "Department")
overtime = agg_rate(df, "OverTime")

jobrole.toPandas().to_csv("/output/bi/attrition_by_jobrole.csv", index=False)
depart.toPandas().to_csv("/output/bi/attrition_by_department.csv", index=False)
overtime.toPandas().to_csv("/output/bi/attrition_by_overtime.csv", index=False)

jobrole.show(10); depart.show(10); overtime.show()


+--------------------+---+--------------------+
|             JobRole|  n|      attrition_rate|
+--------------------+---+--------------------+
|Sales Representative| 83| 0.39759036144578314|
|Laboratory Techni...|259| 0.23938223938223938|
|     Human Resources| 52| 0.23076923076923078|
|     Sales Executive|326| 0.17484662576687116|
|  Research Scientist|292| 0.16095890410958905|
|Manufacturing Dir...|145| 0.06896551724137931|
|Healthcare Repres...|131| 0.06870229007633588|
|             Manager|102|0.049019607843137254|
|   Research Director| 80|               0.025|
+--------------------+---+--------------------+

+--------------------+---+-------------------+
|          Department|  n|     attrition_rate|
+--------------------+---+-------------------+
|               Sales|446| 0.2062780269058296|
|     Human Resources| 63|0.19047619047619047|
|Research & Develo...|961| 0.1383975026014568|
+--------------------+---+-------------------+

+--------+----+-------------------+
|OverTime

4) Tasa por tramos de salario y de antigüedad

In [25]:
# Salario anual ya lo creaste como 'income_yearly' en el ETL
from pyspark.sql import functions as F

df_bins = (df
  .withColumn("income_band",
              F.when(F.col("income_yearly") < 40000, "<40k")
               .when(F.col("income_yearly") < 60000, "40–60k")
               .when(F.col("income_yearly") < 80000, "60–80k")
               .when(F.col("income_yearly") < 100000,"80–100k")
               .otherwise(">=100k"))
  .withColumn("tenure_band",
              F.when(F.col("YearsAtCompany") < 2, "0–2")
               .when(F.col("YearsAtCompany") < 5, "2–5")
               .when(F.col("YearsAtCompany") < 10,"5–10")
               .otherwise(">=10"))
)

income_rate = (df_bins.groupBy("income_band")
                .agg(F.count("*").alias("n"), F.avg("attrition_label").alias("attrition_rate"))
                .orderBy("income_band"))
tenure_rate = (df_bins.groupBy("tenure_band")
                .agg(F.count("*").alias("n"), F.avg("attrition_label").alias("attrition_rate"))
                .orderBy("tenure_band"))

income_rate.toPandas().to_csv("/output/bi/attrition_by_income_band.csv", index=False)
tenure_rate.toPandas().to_csv("/output/bi/attrition_by_tenure_band.csv", index=False)

income_rate.show(); tenure_rate.show()


+-----------+---+-------------------+
|income_band|  n|     attrition_rate|
+-----------+---+-------------------+
|     40–60k|307|0.14332247557003258|
|     60–80k|249|0.08835341365461848|
|    80–100k|102|0.13725490196078433|
|       <40k|442| 0.2692307692307692|
|     >=100k|370|0.10270270270270271|
+-----------+---+-------------------+

+-----------+---+-------------------+
|tenure_band|  n|     attrition_rate|
+-----------+---+-------------------+
|        0–2|215| 0.3488372093023256|
|        2–5|365|0.18082191780821918|
|       5–10|524|0.11068702290076336|
|       >=10|366|0.10382513661202186|
+-----------+---+-------------------+



5) Exportar coeficientes (LogReg) o importancias (RF)

In [26]:
import json, pandas as pd, joblib, os

# Carga robusta (soporta .pkl como dict {"model":..., "threshold":...} o como Pipeline directamente)
bundle = joblib.load("/output/models/logreg_baseline.pkl")

if isinstance(bundle, dict) and "model" in bundle:
    mdl = bundle["model"]
    thr = bundle.get("threshold", 0.5)
else:
    mdl = bundle         # el .pkl es directamente el Pipeline
    thr = 0.5            # si no guardaste threshold, usa 0.5 por defecto

# 'pre' es el ColumnTransformer dentro del Pipeline
pre = mdl.named_steps["prep"]

# Reconstruir nombres de features
from sklearn.preprocessing import OneHotEncoder

def ohe_feature_names(preproc, cat_cols):
    ohe = preproc.named_transformers_["cat"]
    return ohe.get_feature_names_out(cat_cols).tolist()

# Obtener las columnas originales desde el dataset
import numpy as np
pdf = df.toPandas()
drop_cols = ["attrition_label","Attrition","EmployeeNumber","EmployeeCount","StandardHours","Over18"]
X = pdf.drop(columns=[c for c in drop_cols if c in pdf.columns], errors="ignore")
num_cols = X.select_dtypes(include=[np.number]).columns.tolist()
cat_cols = X.select_dtypes(exclude=[np.number]).columns.tolist()

feat_names = num_cols + ohe_feature_names(pre, cat_cols)

# Exportar coeficientes (LR) o importancias (RF)
out_path = "/output/bi/feature_effects.csv"
os.makedirs("/output/bi", exist_ok=True)

try:
    coefs = mdl.named_steps["clf"].coef_.ravel()
    pd.DataFrame({"feature": feat_names, "effect": coefs}) \
      .sort_values("effect", ascending=False) \
      .to_csv(out_path, index=False)
    print("Coeficientes (LR) ->", out_path)
except Exception:
    importances = mdl.named_steps["clf"].feature_importances_
    pd.DataFrame({"feature": feat_names, "importance": importances}) \
      .sort_values("importance", ascending=False) \
      .to_csv(out_path, index=False)
    print("Importancias (RF) ->", out_path)


Coeficientes (LR) -> /output/bi/feature_effects.csv


In [27]:
import pandas as pd
import sqlalchemy as sa

# 1) Lee el CSV que acabas de generar
df = pd.read_csv("/output/bi/feature_effects.csv")

# Normaliza nombre de columna a 'value' para que el visual sea único
if "effect" in df.columns:
    df = df.rename(columns={"effect":"value"})
elif "importance" in df.columns:
    df = df.rename(columns={"importance":"value"})

# etiqueta del modelo (ajusta si estás mostrando RF)
df["model_name"] = "LogReg"

# 2) Escribe en Postgres
eng = sa.create_engine("postgresql+psycopg2://ml:ml@postgres:5432/mlops")
df.to_sql("feature_effects", eng, schema="public", if_exists="replace", index=False)
print("Escrito -> public.feature_effects")


Escrito -> public.feature_effects
