
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src=https://raw.githubusercontent.com/aestaire/ml_workshop/refs/heads/main/files/images/hands-on.png>
</div>

# Churn Prediction Feature Engineering
Nuestro primer paso es analizar los datos y construir las features que usaremos para entrenar nuestro modelo. Veamos cómo se puede hacer.

<img src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/product/mlops/mlops-uc-end2end-1-v2.png?raw=true" width="1200">

<!-- Recopilar datos de uso (vista). Elimínelo para deshabilitar la recopilación o desactive el rastreador durante la instalación. Consulte el README para más detalles.  -->
<img width="1px" src="https://ppxrzfxige.execute-api.us-west-2.amazonaws.com/v1/analytics?category=data-science&org_id=1444828305810485&notebook=%2F01-mlops-quickstart%2F01_feature_engineering&demo_name=mlops-end2end&event=VIEW&path=%2F_dbdemos%2Fdata-science%2Fmlops-end2end%2F01-mlops-quickstart%2F01_feature_engineering&version=1&user_hash=f7ea13a45c991650d8df810431c3e0e2b12887e9ed7e206ee8fb6209bdb2ae82">

### Se ha creado un cluster para esta demostración
Para ejecutar esta demostración, simplemente selecciona el clúster `dbdemos-mlops-end2end` en el menú desplegable ([abrir configuración del clúster](https://e2-demo-field-eng.cloud.databricks.com/#setting/clusters/1013-181446-g00pu2bj/configuration)). <br />
*Nota: Si el clúster fue eliminado después de 30 días, puedes volver a crearlo con `dbdemos.create_cluster('mlops-end2end')` o reinstalar la demo: `dbdemos.install('mlops-end2end')`*

Último environment testado:

mlflow==3.3.0

In [0]:
%pip install --quiet mlflow --upgrade


%restart_python

In [0]:
%run ../_resources/00-setup

## Análisis exploratorio de datos
Para familiarizarse con los datos, identificar qué necesita limpieza, preprocesamiento, etc.
- **Utiliza las herramientas nativas de visualización de Databricks**
  - Después de ejecutar una consulta SQL en una celda del notebook, usa la pestaña `+` para agregar gráficos y visualizar los resultados.
- Trae tu propia librería de visualización preferida (por ejemplo, seaborn, plotly)

In [0]:
%sql
SELECT * FROM mlops_churn_bronze_customers

In [0]:
telco_df = spark.read.table("mlops_churn_bronze_customers").pandas_api()
telco_df["internet_service"].value_counts().plot.pie()

In [0]:
# Read into Spark
telcoDF = spark.read.table("mlops_churn_bronze_customers")
display(telcoDF)

## Definir lógica de limpieza y creación de features

Vamos a definir una función para limpiar los datos e implementar la lógica de creación de features. Para ello vamos a:

1. Calcular el número de servicios opcionales
2. Proporcionar etiquetas significativas
3. Imputar valores nulos


### Usando la API de Pandas en Spark

Como nuestro equipo de científicos de datos está familiarizado con Pandas, utilizaremos la [API de pandas en spark](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/index.html) para escalar el código de `pandas`. Las instrucciones de Pandas se convertirán en el motor de spark internamente y se distribuirán a escala.

*Nota: La API de Pandas en Spark antes se llamaba Koalas. A partir de `spark 3.2`, Koalas está incorporado y podemos obtener un DataFrame de Pandas usando `pandas_api()` [Detalles](https://spark.apache.org/docs/latest/api/python/migration_guide/koalas_to_pyspark.html).*

In [0]:
import pyspark.sql.functions as F
from pyspark.sql import DataFrame


def clean_churn_features(dataDF: DataFrame) -> DataFrame:
  """
  Simple cleaning function leveraging pandas API
  """

  # Convert to pandas on spark dataframe
  data_psdf = dataDF.pandas_api()
  # Convert some columns
  data_psdf = data_psdf.astype({"senior_citizen": "string"})
  data_psdf["senior_citizen"] = data_psdf["senior_citizen"].map({"1" : "Yes", "0" : "No"})

  data_psdf["total_charges"] = data_psdf["total_charges"].apply(lambda x: float(x) if x.strip() else 0)


  # Fill some missing numerical values with 0
  data_psdf = data_psdf.fillna({"tenure": 0.0})
  data_psdf = data_psdf.fillna({"monthly_charges": 0.0})
  data_psdf = data_psdf.fillna({"total_charges": 0.0})

  def sum_optional_services(df):
      """Count number of optional services enabled, like streaming TV"""
      cols = ["online_security", "online_backup", "device_protection", "tech_support",
              "streaming_tv", "streaming_movies"]
      return sum(map(lambda c: (df[c] == "Yes"), cols))

  data_psdf["num_optional_services"] = sum_optional_services(data_psdf)

  # Return the cleaned Spark dataframe
  return data_psdf.to_spark()


## Calcular features y guardar tabla con features y etiquetas

Una vez que nuestras features estén listas, las guardaremos junto con las etiquetas como una tabla Delta Lake. Luego, esta tabla podrá ser recuperada para el entrenamiento del modelo.

En esta demostración rápida, veremos cómo entrenar un modelo usando este conjunto de datos etiquetado guardado como una tabla Delta Lake y cómo capturar la trazabilidad entre la tabla y el modelo. La trazabilidad del modelo aporta control y gobernanza a nuestro despliegue, permitiéndonos saber qué modelo depende de qué conjunto de tablas de features.

Databricks tiene una capacidad de Feature Store (almacén de features) totalmente integrada en la plataforma. Cualquier tabla Delta Lake con una clave primaria puede usarse como tabla de features para el entrenamiento de modelos y para el servicio batch y en línea. Veremos un ejemplo de cómo usar el Feature Store para realizar búsquedas de features en una demostración más avanzada.

In [0]:
churn_features = clean_churn_features(telcoDF)
display(churn_features)

### Escribir la tabla para entrenamiento

Escribe los datos etiquetados que tienen las features preparadas y las etiquetas como una tabla Delta. Luego usaremos esta tabla para entrenar el modelo para predecir la deserción.

In [0]:
# Specify train-test split
train_ratio, test_ratio = 0.8, 0.2
churn_features = (churn_features.withColumn("random", F.rand(seed=42))
                                .withColumn("split",
                                            F.when(F.col("random") < train_ratio, "train")
                                            .otherwise("test"))
                                .drop("random"))

# Write table for training
(churn_features.write.mode("overwrite")
               .option("overwriteSchema", "true")
               .saveAsTable("mlops_churn_training"))

# Add comment to the table
spark.sql(f"""COMMENT ON TABLE {catalog}.{db}.mlops_churn_training IS \'The features in this table are derived from the mlops_churn_bronze_customers table in the lakehouse. 
              We created service features and cleaned up their names.  No aggregations were performed.'""")


¡Eso es todo! Las features etiquetadas ya están listas para ser utilizadas en el entrenamiento.

### Entrenar un modelo base

Próximo paso: [Entrenar un modelo lightGBM]($./02_train_lightGBM)