<a href="https://colab.research.google.com/github/hpestrella/lab_datos/blob/main/ETL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Project 1**

**Objective**

Design and implement a **scalable, reusable ETL pipeline** that prepares datasets for future Machine Learning applications.

**Description**

This project performs an **ETL** process on the data available for developing Machine Learning models. It covers ingesting several data sources, joining them, standardizing column names, cleaning each variable, and saving the result to a file that preserves the properties assigned to the data.

In addition, a logging class documents each stage of the process, providing detailed traceability of the operations performed and making the pipeline easier to monitor, debug, and analyze.

## **1. Imports and Configuration**

In [None]:
# ==============================
# Python standard library
# ==============================
import json
import unicodedata
from datetime import datetime
from pathlib import Path

# ==============================
# Third-party libraries
# ==============================
import joblib
import numpy as np
import pandas as pd
import yaml
from datasets import load_dataset

# ==============================
# Visualization libraries
# ==============================
import matplotlib.pyplot as plt
import seaborn as sns

### `PipelineLogger`: Simple Logging of Pipeline Procedures

`PipelineLogger` is a class designed to **record in a simple way** the most important information about a data pipeline:  
the sources, the steps performed, and the output produced.

---

**1. What does it do?**

- Stores pipeline metadata (name, version, author, date).
- Records **where the data comes from**.
- Records **each transformation** applied.
- Records **each join** between tables.
- Records **how and where the output ends up**.
- Can save everything as **JSON or YAML**.

---

**2. How to use it**

*Create the logger*

```python
logger = PipelineLogger(
    name="mi_pipeline",
    version="1.0",
    description="Ejemplo de pipeline",
    author="Equipo de Datos"
)
```

---

**3. Register a source**

```python
logger.add_source(
    source_id="entrada",
    source_type="csv",
    path="data/entrada.csv",
    rows=1000,
    columns=["id", "fecha", "monto"]
)
```

---

**4. Log pipeline steps**

```python
logger.log_step(
    action="limpieza_nulos",
    description="Se eliminan registros con nulos",
    filas_antes=1000,
    filas_despues=950
)

logger.log_step(
    action="crear_campo_mes",
    description="Se genera columna 'mes'",
    formula="mes = fecha.dt.month"
)
```

---

**5. Register the output**

```python
logger.set_output(
    output_type="parquet",
    path="data/salida.parquet",
    rows_after=950,
    columns_schema={"id": "int", "fecha": "datetime", "mes": "int", "monto": "float"}
)
```

---

**6. Save the metadata**

```python
logger.save("metadatos.json", fmt="json")
```

or

```python
logger.save("metadatos.yaml", fmt="yaml")
```

---

**7. Quick example**

```python
logger = PipelineLogger("pipeline_ejemplo")

logger.add_source("datos", "csv", "datos.csv")
logger.log_step("limpieza")
logger.set_output("csv", "salida.csv")

logger.save("pipeline.json")
```

---

**8. What is it for?**

* Leaving **clear evidence** of what the pipeline did.
* Making audits easier.

In [None]:
class PipelineLogger:
    def __init__(self, name, version="1.0", description="", author=""):
        self.data = {
            "pipeline": {
                "name": name,
                "version": version,
                "description": description,
                "author": author,
                "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            },
            "sources": [],
            "transformations": [],
            "joins": [],
            "output": {}
        }

    # ===============================
    # INTERNAL SANITIZER
    # ===============================

    def _sanitize(self, obj):
        """
        Recursively clean keys and values so the result can be
        serialized safely to JSON / YAML.
        """
        # --- Dictionaries ---
        if isinstance(obj, dict):
            clean = {}
            for k, v in obj.items():
                # Sanitize the key: missing values become "NA", the rest become strings
                if k is pd.NA or (isinstance(k, float) and np.isnan(k)):
                    key = "NA"
                else:
                    key = str(k)

                clean[key] = self._sanitize(v)
            return clean

        # --- Lists / tuples / sets ---
        elif isinstance(obj, (list, tuple, set)):
            return [self._sanitize(v) for v in obj]

        # --- pandas / NumPy missing values ---
        elif obj is pd.NA or (isinstance(obj, float) and np.isnan(obj)):
            return None

        # --- DataFrames / Series: store a summary, not the data ---
        elif isinstance(obj, pd.DataFrame):
            return {
                "__type__": "DataFrame",
                "rows": int(obj.shape[0]),
                "columns": [str(c) for c in obj.columns]
            }

        elif isinstance(obj, pd.Series):
            return {
                "__type__": "Series",
                "length": int(len(obj)),
                "name": None if obj.name is None else str(obj.name)
            }

        # --- NumPy scalars (np.int64, np.bool_, ...) ---
        elif isinstance(obj, np.generic):
            value = obj.item()
            return value if isinstance(value, (str, int, float, bool)) else str(value)

        # --- Basic types ---
        elif isinstance(obj, (str, int, float, bool)) or obj is None:
            return obj

        # --- Fallback: stringify anything else ---
        return str(obj)

    # ===============================
    # SOURCES
    # ===============================

    def add_source(self, source_id, source_type, path,
                   rows=None, columns=None, **extra):
        source = {
            "source_id": source_id,
            "type": source_type,
            "path": path,
            "rows": rows,
            "columns": list(columns) if columns is not None else None
        }
        source.update(extra)
        self.data["sources"].append(source)

    # ===============================
    # TRANSFORMATIONS
    # ===============================

    def log_step(self, action, description="", input_df=None,
                 output_df=None, **details):
        step_number = len(self.data["transformations"]) + 1
        self.data["transformations"].append({
            "step": step_number,
            "action": action,
            "description": description,
            "input_df": input_df,
            "output_df": output_df,
            "details": details
        })

    # ===============================
    # JOINS
    # ===============================

    def log_join(self, left_df, right_df, how,
                 left_on, right_on,
                 result_df,
                 rows_before_left=None,
                 rows_before_right=None,
                 rows_after=None,
                 columns_added=None):

        join_number = len(self.data["joins"]) + 1
        self.data["joins"].append({
            "join_step": join_number,
            "left_df": left_df,
            "right_df": right_df,
            "how": how,
            "keys": {
                "left_on": left_on,
                "right_on": right_on
            },
            "rows_before": {
                "left": rows_before_left,
                "right": rows_before_right
            },
            "rows_after": rows_after,
            "columns_added": columns_added,
            "result_df": result_df
        })

    # ===============================
    # OUTPUT
    # ===============================

    def set_output(self, output_type, path,
                   rows_after=None,
                   columns_schema=None,
                   **extra):
        self.data["output"] = {
            "type": output_type,
            "path": path,
            "rows_after": rows_after,
            "columns": columns_schema
        }
        self.data["output"].update(extra)

    # ===============================
    # EXPORT
    # ===============================

    def to_dict(self, sanitized=True):
        return self._sanitize(self.data) if sanitized else self.data

    def save(self, filename, fmt="json"):
        fmt = fmt.lower()
        clean_data = self._sanitize(self.data)

        if fmt == "json":
            with open(filename, "w", encoding="utf-8") as f:
                json.dump(clean_data, f, indent=4, ensure_ascii=False)

        elif fmt in ("yaml", "yml"):
            with open(filename, "w", encoding="utf-8") as f:
                yaml.safe_dump(clean_data, f, sort_keys=False, allow_unicode=True)

        else:
            raise ValueError("Formato no soportado")

        print(f"✅ Pipeline guardado en {filename}")




Logger initialization

In [None]:
# 1. Create the logger
logger = PipelineLogger(
    name="ETL",
    version="0.0.1",
    description="Limpieza y transformación de datos",
    author="Pavel Estrella G."
)

### Fix the Random Seed

In [None]:
RANDOM_STATE = 42
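
As a minimal sketch of how this constant might be used later on, the snippet below seeds a NumPy generator and a pandas sample with the same value. The `demo` frame and the variable names are illustrative and do not come from the notebook.

```python
import numpy as np
import pandas as pd

RANDOM_STATE = 42

# A dedicated generator avoids touching NumPy's global random state.
rng = np.random.default_rng(RANDOM_STATE)
noise = rng.normal(size=3)

# Illustrative frame; the real pipeline would use its own data.
demo = pd.DataFrame({"customer_id": [f"C{i:03d}" for i in range(10)]})

# pandas sampling accepts the same seed through `random_state`,
# so repeated calls return the same rows.
sample_a = demo.sample(n=3, random_state=RANDOM_STATE)
sample_b = demo.sample(n=3, random_state=RANDOM_STATE)
```

Passing the seed explicitly to every call that accepts it tends to be easier to audit than relying on a global seed.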

## **2. Data Ingestion and Joining**

The data is ingested from 7 files that come from different sources and use different file types. The data is loaded first; then the unique key of each table is checked before joining everything into a single dataframe.


### **2.1 Data Loading**

In [None]:
# Load the Customer Profile data and register the source

path_c_profile = "hpestrellag/customer_profile"
ds = load_dataset(path_c_profile)
df_c_profile = ds['train'].to_pandas()

logger.add_source(
    source_id = "customer_profile",
    source_type="huggingface_datasets",
    path=path_c_profile,
    rows_before=len(df_c_profile),
    columns=df_c_profile.columns,
    sistema_origen="dataset_generados_hpestrellag",
    tabla_origen="customer_profile"
)

# Load the Engagement Minutes data and register the source

path_engagement_minutes = "hpestrellag/engagement_minutes"
ds = load_dataset(path_engagement_minutes)
df_engagement_minutes = ds['train'].to_pandas()

logger.add_source(
    source_id="engagement_minutes",
    source_type="huggingface_datasets",
    path=path_engagement_minutes,
    rows_before=len(df_engagement_minutes),
    columns=df_engagement_minutes.columns,
    sistema_origen="dataset_generados_hpestrellag",
    tabla_origen="engagement_minutes"
)

# Load the Subscription Lifecycle data and register the source

path_subscription_lifecycle = "hpestrellag/subscription_lifecycle"
ds = load_dataset(path_subscription_lifecycle)
df_subscription_lifecycle = ds['train'].to_pandas()

logger.add_source(
    source_id="subscription_lifecycle",
    source_type="huggingface_datasets",
    path=path_subscription_lifecycle,
    rows_before=len(df_subscription_lifecycle),
    columns=df_subscription_lifecycle.columns,
    sistema_origen="dataset_generados_hpestrellag",
    tabla_origen="subscription_lifecycle"
)

# Load the In-App Activity data and register the source

path_inapp_activity = "hpestrellag/inapp_activity"
ds = load_dataset(path_inapp_activity)
df_inapp_activity = ds['train'].to_pandas()

logger.add_source(
    source_id="inapp_activity",
    source_type="huggingface_datasets",
    path=path_inapp_activity,
    rows=len(df_inapp_activity),
    columns=df_inapp_activity.columns,
    sistema_origen="dataset_generados_hpestrellag",
    tabla_origen="inapp_activity"
)

# Load the Billing data and register the source

path_payments_delinquency = "hpestrellag/payments_delinquency"
ds = load_dataset(path_payments_delinquency)
df_payments_delinquency = ds['train'].to_pandas()

logger.add_source(
    source_id="payments_delinquency",
    source_type="huggingface_datasets",
    path=path_payments_delinquency,
    rows_before=len(df_payments_delinquency),
    columns=df_payments_delinquency.columns,
    sistema_origen="dataset_generados_hpestrellag",
    tabla_origen="payments_delinquency"
)

# Load the Platform Incidents data and register the source

path_incidents_platform_account = "hpestrellag/incidents_platform_account"
ds = load_dataset(path_incidents_platform_account)
df_incidents_platform_account = ds['train'].to_pandas()

logger.add_source(
    source_id="incidents_platform_account",
    source_type="huggingface_datasets",
    path=path_incidents_platform_account,
    rows_before=len(df_incidents_platform_account),
    columns=df_incidents_platform_account.columns,
    sistema_origen="dataset_generados_hpestrellag",
    tabla_origen="incidents_platform_account"
)

# Load the Support data and register the source

path_support_interactions = "hpestrellag/support_interactions"
ds = load_dataset(path_support_interactions)
df_support_interactions = ds['train'].to_pandas()

logger.add_source(
    source_id="support_interactions",
    source_type="huggingface_datasets",
    path=path_support_interactions,
    rows_before=len(df_support_interactions),
    columns=df_support_interactions.columns,
    sistema_origen="dataset_generados_hpestrellag",
    tabla_origen="support_interactions"
)
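
The seven load-and-register blocks above follow the same pattern, so they could be collapsed into a loop. The sketch below is one possible shape, not the notebook's implementation: `load_sources` and `fake_loader` are hypothetical names, and `fake_loader` stands in for `load_dataset(path)["train"].to_pandas()` so the example runs without network access.

```python
import pandas as pd

# Hypothetical stand-in for `load_dataset(path)["train"].to_pandas()`.
def fake_loader(path):
    return pd.DataFrame({"customer_id": ["A1", "B2"], "source": [path, path]})

def load_sources(names, loader, owner="hpestrellag"):
    """Load each dataset and return (frames, metadata), both in input order."""
    frames, metadata = {}, []
    for name in names:
        path = f"{owner}/{name}"
        frame = loader(path)
        frames[name] = frame
        # Keys mirror the parameters of PipelineLogger.add_source.
        metadata.append({
            "source_id": name,
            "source_type": "huggingface_datasets",
            "path": path,
            "rows": len(frame),
            "columns": list(frame.columns),
        })
    return frames, metadata

frames, metadata = load_sources(
    ["customer_profile", "engagement_minutes"], fake_loader
)
```

Each metadata entry could then be registered with `logger.add_source(**entry)`, which keeps the row count in the `rows` field for every source.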

### **2.1 Description**

`info()` is applied to each of the 7 *dataframes* to review its structure (data types, record counts, and null values) and to check whether the unique key is properly defined in each dataset.

In [None]:
df_c_profile.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500000 entries, 0 to 499999
Data columns (total 6 columns):
 #   Column             Non-Null Count   Dtype  
---  ------             --------------   -----  
 0   customer_id        500000 non-null  object 
 1   department         500000 non-null  object 
 2   consumption_style  500000 non-null  object 
 3   locality           500000 non-null  object 
 4   province           500000 non-null  object 
 5   monthly_fee_usd    500000 non-null  float64
dtypes: float64(1), object(5)
memory usage: 22.9+ MB


In [None]:
df_engagement_minutes.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500000 entries, 0 to 499999
Data columns (total 17 columns):
 #   Column                                 Non-Null Count   Dtype  
---  ------                                 --------------   -----  
 0   customer_id                            500000 non-null  object 
 1   minutes_watched_rest_of_last_week      423420 non-null  float64
 2   minutes_watched_week_2                 423085 non-null  float64
 3   minutes_watched_week_3                 423525 non-null  float64
 4   minutes_watched_week_4                 422868 non-null  float64
 5   minutes_watched_last_3d                423506 non-null  float64
 6   minutes_watched_last_1d                423415 non-null  float64
 7   minutes_watched_second_last_day        423263 non-null  float64
 8   minutes_watched_third_last_day         423588 non-null  float64
 9   minutes_watched_delta_rest_of_week     423368 non-null  float64
 10  minutes_watched_delta_week_1_2         423300 non-null  

In [None]:
df_subscription_lifecycle.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500000 entries, 0 to 499999
Data columns (total 14 columns):
 #   Column                       Non-Null Count   Dtype  
---  ------                       --------------   -----  
 0   customer_id                  500000 non-null  object 
 1   churn                        500000 non-null  object 
 2   joined_with_promo            500000 non-null  object 
 3   welcome_promo_flag           500000 non-null  object 
 4   cancellation_ticket          500000 non-null  object 
 5   retention_ticket             500000 non-null  object 
 6   retention_followup_ticket    500000 non-null  object 
 7   last_default_payment_method  500000 non-null  object 
 8   days_inactive                500000 non-null  float64
 9   cancel_date                  79268 non-null   object 
 10  signup_date                  500000 non-null  object 
 11  reactivation_date            210054 non-null  object 
 12  last_close_date              209999 non-null  object 
 13 

**2.1.1 Fixing Loading Issues**

In [None]:
df_inapp_activity.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500001 entries, 0 to 500000
Data columns (total 1 columns):
 #   Column  Non-Null Count   Dtype 
---  ------  --------------   ----- 
 0   text    500001 non-null  object
dtypes: object(1)
memory usage: 3.8+ MB


There is a problem because the dataframe has only one column, so we first inspect the first row to see what is wrong:

In [None]:
df_inapp_activity.iloc[0, 0]

'customer_id\tinapp_purchases_6m\tinapp_purchases_1m\tinapp_purchases_3m\tinapp_purchases_12m\tdevice_model'

The values are separated by a TAB (`\t`), so we split on it to separate the columns:

In [None]:
tmp = df_inapp_activity.iloc[:, 0].astype(str).str.split("\t", expand=True)
tmp.head()

Unnamed: 0,0,1,2,3,4,5
0,customer_id,inapp_purchases_6m,inapp_purchases_1m,inapp_purchases_3m,inapp_purchases_12m,device_model
1,SLX83969,-,-,3.0,7.0,Laptop
2,UTT27594,1.0,-,0.0,0.0,Media Hub
3,HER11355,-,-,0.0,0.0,Smartphone
4,ZRQ19908,-,1.0,1.0,0.0,Desktop


We assign the correct column names:

In [None]:
tmp.columns = tmp.iloc[0].astype(str).str.strip()
df_inapp_activity = tmp.iloc[1:].reset_index(drop=True)
df_inapp_activity.head()

Unnamed: 0,customer_id,inapp_purchases_6m,inapp_purchases_1m,inapp_purchases_3m,inapp_purchases_12m,device_model
0,SLX83969,-,-,3.0,7.0,Laptop
1,UTT27594,1.0,-,0.0,0.0,Media Hub
2,HER11355,-,-,0.0,0.0,Smartphone
3,ZRQ19908,-,1.0,1.0,0.0,Desktop
4,EAT08840,-,-,3.0,28.0,Smartphone


We validate the result and log what was done:

In [None]:
df_inapp_activity.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500000 entries, 0 to 499999
Data columns (total 6 columns):
 #   Column               Non-Null Count   Dtype 
---  ------               --------------   ----- 
 0   customer_id          500000 non-null  object
 1   inapp_purchases_6m   500000 non-null  object
 2   inapp_purchases_1m   500000 non-null  object
 3   inapp_purchases_3m   500000 non-null  object
 4   inapp_purchases_12m  500000 non-null  object
 5   device_model         500000 non-null  object
dtypes: object(6)
memory usage: 22.9+ MB


In [None]:
logger.log_step(
    action="extraer_encabezado_primera_fila",
    description="Se separó la columna única y se tomó la primera fila como encabezado",
    input_df="df_inapp_activity",
    output_df="df_inapp_activity",
    separator="\\t"
)
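
As an alternative to the manual split, the single text column could be handed back to the pandas CSV parser. This is only a sketch on toy rows: treating `-` as a missing value is an assumption made here, and the notebook may handle that placeholder differently in its cleaning stage.

```python
import io
import pandas as pd

# Toy rows mimicking the single-column layout seen above.
raw = pd.DataFrame({"text": [
    "customer_id\tinapp_purchases_6m\tdevice_model",
    "SLX83969\t-\tLaptop",
    "UTT27594\t1.0\tMedia Hub",
]})

# Join the lines back into one tab-separated document and let pandas parse it.
buffer = io.StringIO("\n".join(raw["text"]))

# Assumption: "-" marks a missing value, so it is parsed as NaN.
parsed = pd.read_csv(buffer, sep="\t", na_values=["-"])
```

With this approach the header row is consumed automatically and numeric columns get a numeric dtype instead of `object`.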

In [None]:
df_payments_delinquency.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500000 entries, 0 to 499999
Data columns (total 9 columns):
 #   Column                                   Non-Null Count   Dtype 
---  ------                                   --------------   ----- 
 0   customer_id                              500000 non-null  object
 1   default_payment_method                   500000 non-null  object
 2   activation_discount_pct                  500000 non-null  object
 3   payment_declines_12m                     500000 non-null  object
 4   delinquency_email_alerts_6m              500000 non-null  object
 5   delinquency_email_alerts_1m              500000 non-null  object
 6   delinquency_email_alerts_3m              500000 non-null  object
 7   delinquency_email_alerts_12m             500000 non-null  object
 8   delinquency_email_alerts_6m_ineffective  500000 non-null  object
dtypes: object(9)
memory usage: 34.3+ MB


In [None]:
df_incidents_platform_account.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500000 entries, 0 to 499999
Data columns (total 10 columns):
 #   Column                     Non-Null Count   Dtype 
---  ------                     --------------   ----- 
 0   customer_id                500000 non-null  object
 1   platform_incident_a        500000 non-null  object
 2   platform_incident_b        500000 non-null  object
 3   platform_incident_c        500000 non-null  object
 4   platform_incident_d        500000 non-null  object
 5   account_incident_a         500000 non-null  object
 6   account_incident_b         500000 non-null  object
 7   account_incident_c         500000 non-null  object
 8   account_incident_d         500000 non-null  object
 9   service_interruptions_12m  500000 non-null  object
dtypes: object(10)
memory usage: 38.1+ MB


In [None]:
df_support_interactions.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500000 entries, 0 to 499999
Data columns (total 20 columns):
 #   Column                         Non-Null Count   Dtype 
---  ------                         --------------   ----- 
 0   customer_id                    500000 non-null  object
 1   chatbot_interactions_6m        500000 non-null  object
 2   chatbot_interactions_1m        500000 non-null  object
 3   chatbot_interactions_3m        500000 non-null  object
 4   chatbot_interactions_12m       500000 non-null  object
 5   support_interactions_6m        500000 non-null  object
 6   support_interactions_1m        500000 non-null  object
 7   support_interactions_3m        500000 non-null  object
 8   support_interactions_12m       500000 non-null  object
 9   offline_support_6m             500000 non-null  object
 10  offline_support_1m             500000 non-null  object
 11  offline_support_3m             500000 non-null  object
 12  internal_interactions          500000 non-nu

Since our unique key is `customer_id`, we use this identifier to unify the *dataframes*. We use `merge`, taking as the base the main *dataframe* `customer_profile`, which contains the information on subscribed and churned customers.


In [None]:
dfs = [
    ("customer_profile", df_c_profile),
    ("engagement_minutes", df_engagement_minutes),
    ("subscription_lifecycle", df_subscription_lifecycle),
    ("inapp_activity", df_inapp_activity),
    ("payments_delinquency", df_payments_delinquency),
    ("incidents_platform_account", df_incidents_platform_account),
    ("support_interactions", df_support_interactions),
]

# Base
base_name, df_combined = dfs[0]

logger.log_step(
    action="asignar_tabla_base",
    description="Customer Profile definido como tabla base obligatoria",
    output_df=base_name,
    rows=len(df_combined),
    primary_key="customer_id"
)

# Iterate over the joins
for right_name, current_df in dfs[1:]:

    rows_left_before = len(df_combined)
    rows_right_before = len(current_df)
    cols_left_before = set(df_combined.columns)

    # JOIN
    df_combined = df_combined.merge(
        current_df,
        on="customer_id",
        how="left"
    )

    cols_after = set(df_combined.columns)
    columns_added = list(cols_after - cols_left_before)

    # Log the join
    logger.log_join(
        left_df=base_name,
        right_df=right_name,
        how="left",
        left_on=["customer_id"],
        right_on=["customer_id"],
        result_df="customer_360",
        rows_before_left=rows_left_before,
        rows_before_right=rows_right_before,
        rows_after=len(df_combined),
        columns_added=columns_added
    )

# Final result
df = df_combined
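
A left join keeps every row of the base table, but it silently multiplies rows if the right table repeats a key. The sketch below shows two `merge` options that can make this visible; the toy frames are illustrative, not taken from the notebook.

```python
import pandas as pd

profile = pd.DataFrame({"customer_id": ["A1", "B2", "C3"], "province": ["X", "Y", "Z"]})
activity = pd.DataFrame({"customer_id": ["A1", "B2"], "purchases": [3, 0]})

# validate="one_to_one" raises pandas.errors.MergeError if either side repeats a key;
# indicator=True adds a `_merge` column saying where each row came from.
merged = profile.merge(activity, on="customer_id", how="left",
                       validate="one_to_one", indicator=True)

# Base customers with no match in the right table.
unmatched = int((merged["_merge"] == "left_only").sum())
merged = merged.drop(columns="_merge")

# A right table with a repeated key is rejected instead of inflating the result.
duplicated_right = pd.concat([activity, activity.iloc[[0]]], ignore_index=True)
try:
    profile.merge(duplicated_right, on="customer_id", how="left", validate="one_to_one")
    duplicate_detected = False
except pd.errors.MergeError:
    duplicate_detected = True
```

The `unmatched` count is a natural candidate for an extra field in each `log_join` record.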

`info()` is run again to check the data structure, the variable names, and the null values.

In [None]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500000 entries, 0 to 499999
Data columns (total 76 columns):
 #   Column                                   Non-Null Count   Dtype  
---  ------                                   --------------   -----  
 0   customer_id                              500000 non-null  object 
 1   department                               500000 non-null  object 
 2   consumption_style                        500000 non-null  object 
 3   locality                                 500000 non-null  object 
 4   province                                 500000 non-null  object 
 5   monthly_fee_usd                          500000 non-null  float64
 6   minutes_watched_rest_of_last_week        423420 non-null  float64
 7   minutes_watched_week_2                   423085 non-null  float64
 8   minutes_watched_week_3                   423525 non-null  float64
 9   minutes_watched_week_4                   422868 non-null  float64
 10  minutes_watched_last_3d         

To audit `customer_id` values that were not found:

In [None]:
lost_customers = (
    df_c_profile["customer_id"]
    .nunique()
    - df["customer_id"].nunique()
)

logger.log_step(
    action="validación_después_join",
    description="Validación de integridad del customer_id luego de todos los joins",
    base_customers=df_c_profile["customer_id"].nunique(),
    final_customers=df["customer_id"].nunique(),
    customers_lost=lost_customers,
    status="OK" if lost_customers == 0 else "ERROR"
)
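
Because every join is a left join on the base table, the number of distinct `customer_id` values cannot drop, so the check above is expected to report zero lost customers. A complementary check, sketched here with a hypothetical `audit_join` helper and toy data, looks at extra rows and duplicated keys, which is how a left join usually goes wrong.

```python
import pandas as pd

base = pd.DataFrame({"customer_id": ["A1", "B2", "C3"]})
# Toy merged result in which B2 was duplicated by a non-unique right table.
merged = pd.DataFrame({"customer_id": ["A1", "B2", "B2", "C3"], "x": [1, 2, 3, None]})

def audit_join(base_df, merged_df, key="customer_id"):
    """Return simple integrity counters for a left-joined frame."""
    return {
        "rows_base": len(base_df),
        "rows_merged": len(merged_df),
        "extra_rows": len(merged_df) - len(base_df),
        "duplicated_keys": int(merged_df[key].duplicated().sum()),
        "keys_lost": int(base_df[key].nunique() - merged_df[key].nunique()),
    }

report = audit_join(base, merged)
```

The resulting dictionary could be passed to `logger.log_step(...)` as keyword details, for example `logger.log_step(action="audit", **report)`.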


**Let us validate using the logger**

In [None]:
registros = logger.to_dict()
registros

{'pipeline': {'name': 'ETL',
  'version': '0.0.1',
  'description': 'Limpieza y transformación de datos',
  'author': 'Pavel Estrella G.',
  'created_at': '2025-12-18 00:55:29'},
 'sources': [{'source_id': 'customer_profile',
   'type': 'huggingface_datasets',
   'path': 'hpestrellag/customer_profile',
   'rows': None,
   'columns': ['customer_id',
    'department',
    'consumption_style',
    'locality',
    'province',
    'monthly_fee_usd'],
   'rows_before': 500000,
   'sistema_origen': 'dataset_generados_hpestrellag',
   'tabla_origen': 'customer_profile'},
  {'source_id': 'engagement_minutes',
   'type': 'huggingface_datasets',
   'path': 'hpestrellag/engagement_minutes',
   'rows': None,
   'columns': ['customer_id',
    'minutes_watched_rest_of_last_week',
    'minutes_watched_week_2',
    'minutes_watched_week_3',
    'minutes_watched_week_4',
    'minutes_watched_last_3d',
    'minutes_watched_last_1d',
    'minutes_watched_second_last_day',
    'minutes_watched_third_last_d

## **3. Name Normalization and Data Cleaning**

Column names are normalized by removing spaces and special characters and converting them to lowercase with underscores. The data is also cleaned to correct inconsistencies and ensure the integrity of the dataset.

### **3.1. Name Normalization**

In [None]:
# Show the original names
print("Columnas originales:")
print(df.columns)

Columnas originales:
Index(['customer_id', 'department', 'consumption_style', 'locality',
       'province', 'monthly_fee_usd', 'minutes_watched_rest_of_last_week',
       'minutes_watched_week_2', 'minutes_watched_week_3',
       'minutes_watched_week_4', 'minutes_watched_last_3d',
       'minutes_watched_last_1d', 'minutes_watched_second_last_day',
       'minutes_watched_third_last_day', 'minutes_watched_delta_rest_of_week',
       'minutes_watched_delta_week_1_2', 'minutes_watched_delta_week_2_3',
       'minutes_watched_delta_week_3_4', 'minutes_watched_delta_last_3d',
       'minutes_watched_delta_last_1d',
       'minutes_watched_delta_second_last_day',
       'minutes_watched_delta_third_last_day', 'churn', 'joined_with_promo',
       'welcome_promo_flag', 'cancellation_ticket', 'retention_ticket',
       'retention_followup_ticket', 'last_default_payment_method',
       'days_inactive', 'cancel_date', 'signup_date', 'reactivation_date',
       'last_close_date', 'account_trans

No column-name normalization is needed in this case, but a small reusable code block was built to perform this cleanup whenever it is required. It converts all variable names to lowercase, replaces spaces with underscores, and removes special characters.

In [None]:
# --- Keep the original column names ---
cols_before = df.columns.astype(str).tolist()

# --- Clean the column names ---
df.columns = (
    df.columns.astype(str)
    .str.strip()
    .str.lower()
    .str.replace(" ", "_", regex=False)
    .str.replace(r"[^\w]+", "", regex=True)
)

cols_after = df.columns.tolist()

# --- Before -> after mapping (changes only) ---
mapping_changed = {
    b: a for b, a in zip(cols_before, cols_after) if b != a
}

# --- Detect collisions (duplicate names after cleaning) ---
dup_mask = pd.Index(cols_after).duplicated(keep=False)
duplicated_cols = sorted(set([c for c, d in zip(cols_after, dup_mask) if d]))

# --- Log the step ---
logger.log_step(
    action="limpieza_nombre_columnas",
    description="Estandarización de nombres de columnas (strip, lower, espacios->'_', remove non-alnum)",
    input_df="customer_360_pre_clean",
    output_df="customer_360",
    n_columns_before=len(cols_before),
    n_columns_after=len(cols_after),
    n_columns_changed=len(mapping_changed),
    mapping_sample=list(mapping_changed.items())[:50],
    duplicated_columns_after_clean=duplicated_cols,
    has_collisions=len(duplicated_cols) > 0
)

print("Columnas limpias:")
print(df.columns)


Columnas limpias:
Index(['customer_id', 'department', 'consumption_style', 'locality',
       'province', 'monthly_fee_usd', 'minutes_watched_rest_of_last_week',
       'minutes_watched_week_2', 'minutes_watched_week_3',
       'minutes_watched_week_4', 'minutes_watched_last_3d',
       'minutes_watched_last_1d', 'minutes_watched_second_last_day',
       'minutes_watched_third_last_day', 'minutes_watched_delta_rest_of_week',
       'minutes_watched_delta_week_1_2', 'minutes_watched_delta_week_2_3',
       'minutes_watched_delta_week_3_4', 'minutes_watched_delta_last_3d',
       'minutes_watched_delta_last_1d',
       'minutes_watched_delta_second_last_day',
       'minutes_watched_delta_third_last_day', 'churn', 'joined_with_promo',
       'welcome_promo_flag', 'cancellation_ticket', 'retention_ticket',
       'retention_followup_ticket', 'last_default_payment_method',
       'days_inactive', 'cancel_date', 'signup_date', 'reactivation_date',
       'last_close_date', 'account_transfer

In [None]:
logger.data["transformations"][-1]

{'step': 4,
 'action': 'limpieza_nombre_columnas',
 'description': "Estandarización de nombres de columnas (strip, lower, espacios->'_', remove non-alnum)",
 'input_df': 'customer_360_pre_clean',
 'output_df': 'customer_360',
 'details': {'n_columns_before': 76,
  'n_columns_after': 76,
  'n_columns_changed': 0,
  'mapping_sample': [],
  'duplicated_columns_after_clean': [],
  'has_collisions': False}}
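
The normalization rules could also be packaged as plain functions that work on any list of names. The sketch below is a variation rather than the notebook's code: stripping accents with `unicodedata` is an extra step assumed here, and `normalize_column_name` and `find_collisions` are hypothetical helper names.

```python
import re
import unicodedata

def normalize_column_name(name):
    """Lowercase, strip accents, turn whitespace into '_' and drop other symbols."""
    text = str(name).strip().lower()
    # NFKD splits accented letters into base letter + combining mark.
    text = unicodedata.normalize("NFKD", text)
    text = "".join(ch for ch in text if not unicodedata.combining(ch))
    text = re.sub(r"\s+", "_", text)
    return re.sub(r"[^\w]+", "", text)

def find_collisions(names):
    """Map each cleaned name to its originals when more than one maps to it."""
    seen = {}
    for original in names:
        seen.setdefault(normalize_column_name(original), []).append(original)
    return {clean: originals for clean, originals in seen.items() if len(originals) > 1}

example = normalize_column_name("  Fecha de Cancelación (USD) ")
collisions = find_collisions(["Plan A", "plan_a", "churn"])
```

Checking for collisions before assigning the new names avoids ending up with two columns that share a label.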

### **3.2. Data Cleaning**

This section cleans the data according to the data model documentation, ensuring that each field complies with the defined formats, ranges, and business rules. `info()` is used to check the data structure.

In [None]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500000 entries, 0 to 499999
Data columns (total 76 columns):
 #   Column                                   Non-Null Count   Dtype  
---  ------                                   --------------   -----  
 0   customer_id                              500000 non-null  object 
 1   department                               500000 non-null  object 
 2   consumption_style                        500000 non-null  object 
 3   locality                                 500000 non-null  object 
 4   province                                 500000 non-null  object 
 5   monthly_fee_usd                          500000 non-null  float64
 6   minutes_watched_rest_of_last_week        423420 non-null  float64
 7   minutes_watched_week_2                   423085 non-null  float64
 8   minutes_watched_week_3                   423525 non-null  float64
 9   minutes_watched_week_4                   422868 non-null  float64
 10  minutes_watched_last_3d         

#### **3.2.1 Cleaning Numeric Variables**

Numeric variables are cleaned and validated against the lists provided in the database documentation.

##### **Decimals**

`describe()` is used to compare the maximums and minimums with the documentation.

*Variables related to viewing minutes.*

In [None]:
minutes_cols = [col for col in df.columns if col.startswith('minutes')]
df[minutes_cols].describe()

Unnamed: 0,minutes_watched_rest_of_last_week,minutes_watched_week_2,minutes_watched_week_3,minutes_watched_week_4,minutes_watched_last_3d,minutes_watched_last_1d,minutes_watched_second_last_day,minutes_watched_third_last_day,minutes_watched_delta_rest_of_week,minutes_watched_delta_week_1_2,minutes_watched_delta_week_2_3,minutes_watched_delta_week_3_4,minutes_watched_delta_last_3d,minutes_watched_delta_last_1d,minutes_watched_delta_second_last_day,minutes_watched_delta_third_last_day
count,423420.0,423085.0,423525.0,422868.0,423506.0,423415.0,423263.0,423588.0,423368.0,423300.0,423595.0,423005.0,422940.0,422990.0,423632.0,423567.0
mean,-350.968274,-393.341076,-525.421772,-477.635984,165.656875,166.428283,165.716939,165.798804,5.216671,5.702097,5.018324,5.350666,4.994175,0.049745,0.049655,4.835913
std,2233.898428,2318.226441,2555.978045,2472.84242,110.126582,109.878658,110.173997,110.02565,21.076393,21.324206,21.183011,21.257946,21.140473,0.217313,0.217077,20.655218
min,-10000.0,-10000.0,-10000.0,-10000.0,-210.213833,-208.503702,-209.437444,-209.926159,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
25%,181.896574,180.761551,177.142867,178.977901,182.864441,183.698952,182.905011,182.984192,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
50%,201.071986,200.804282,199.877766,200.2989,201.246043,201.497024,201.248934,201.248387,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
75%,215.171373,215.030153,214.469509,214.817489,215.169999,215.234473,215.13183,215.128747,0.0,0.0,0.0,0.005206,0.0,0.0,0.0,0.007244
max,239.999806,239.999828,239.999567,239.999823,239.999709,239.999939,239.999689,239.999433,100.0,100.0,100.0,100.0,100.0,1.0,1.0,100.0
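
The summary above shows minimums such as -10000 and other negative values in columns that measure minutes. A minimal sketch of how out-of-range values could be counted per column follows. The helper name `out_of_range_report` and the bounds used in the example are assumptions for illustration, not the limits from the data model documentation.

```python
import numpy as np
import pandas as pd

def out_of_range_report(df: pd.DataFrame, bounds: dict) -> pd.DataFrame:
    """Count values outside [low, high] for each column listed in bounds."""
    rows = []
    for col, (low, high) in bounds.items():
        s = df[col]
        rows.append({
            "column": col,
            # NaN compares as False, so missing values are not counted here
            "below_min": int((s < low).sum()),
            "above_max": int((s > high).sum()),
            "n_missing": int(s.isna().sum()),
        })
    return pd.DataFrame(rows).set_index("column")

# Toy data; the (0, 240) bounds are placeholders, not documented limits
toy = pd.DataFrame({"minutes_watched_week_2": [200.5, -10000.0, np.nan, 239.9]})
report = out_of_range_report(toy, {"minutes_watched_week_2": (0.0, 240.0)})
print(report)
```

Keeping the counts separate from the missing-value count makes it easier to decide later whether an out-of-range value should become a missing value or be corrected.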


*Variables related to the discount percentage.*

In [None]:
df[['activation_discount_pct']].describe()

Unnamed: 0,activation_discount_pct
count,500000
unique,125025
top,0
freq,374785
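
The output above reports `unique`, `top`, and `freq` instead of numeric statistics, which means `activation_discount_pct` is still stored as text. One way to find out which entries prevent a numeric conversion is sketched below. The helper `non_numeric_values` is a hypothetical name and the toy values are invented; the actual offending tokens in the dataset are not shown in this section.

```python
import pandas as pd

def non_numeric_values(series: pd.Series, top_n: int = 10) -> pd.Series:
    """Return the most frequent non-missing values that cannot be parsed as numbers."""
    parsed = pd.to_numeric(series, errors="coerce")
    # Values that were present but turned into NaN are the ones that failed to parse
    failed = series[parsed.isna() & series.notna()]
    return failed.value_counts().head(top_n)

# Toy column with invented values
toy = pd.Series(["0", "12.5", "10%", "-", None, "0"], dtype=object)
bad = non_numeric_values(toy)
print(bad)
```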


##### **Integers**

We use `nunique()` and `unique()` to list the distinct values of each variable and compare them with the documentation.

*Variables related to in-app purchases.*

In [None]:
# Select the columns whose names start with a given prefix
cols = [col for col in df.columns if col.startswith("inapp_purchases_")]

# For each column, print the number of unique values and the values themselves
for col in cols:
    print(f"Columna: {col}")
    print("  Únicos:", df[col].nunique())
    print("  Valores:", df[col].unique())
    print("-" * 40)



Columna: inapp_purchases_6m
  Únicos: 2
  Valores: ['-' '1.0']
----------------------------------------
Columna: inapp_purchases_1m
  Únicos: 36
  Valores: ['-' '1.0' '3.0' '7.0' '2.0' '4.0' '5.0' '8.0' '14.0' '25.0' '9.0' '17.0'
 '20.0' '10.0' '31.0' '21.0' '13.0' '11.0' '18.0' '16.0' '23.0' '32.0'
 '30.0' '12.0' '19.0' '6.0' '15.0' '34.0' '28.0' '24.0' '26.0' '29.0'
 '22.0' '33.0' '27.0' '35.0']
----------------------------------------
Columna: inapp_purchases_3m
  Únicos: 37
  Valores: ['3.0' '0.0' '1.0' '9.0' '4.0' '5.0' '12.0' '2.0' '17.0' '10.0' '6.0'
 '11.0' '8.0' '7.0' '33.0' '16.0' '21.0' '35.0' '18.0' '34.0' '19.0'
 '26.0' '14.0' '27.0' '15.0' '25.0' '24.0' '22.0' '13.0' '30.0' '32.0'
 '31.0' '20.0' '23.0' '28.0' '29.0' '36.0']
----------------------------------------
Columna: inapp_purchases_12m
  Únicos: 87
  Valores: ['7.0' '0.0' '28.0' '1.0' '23.0' '40.0' '20.0' '45.0' '37.0' '3.0' '36.0'
 '8.0' '13.0' '5.0' '2.0' '49.0' '38.0' '46.0' '4.0' '11.0' '48.0' '47.0'
 '6.0' '26

The columns `inapp_purchases_1m` and `inapp_purchases_6m` contain the value `-`, which stands for 0, so it must be replaced before converting the columns to numeric.

In [None]:
cols_to_convert = ['inapp_purchases_1m', 'inapp_purchases_6m']

# --- métricas antes ---
dtypes_before = {c: str(df[c].dtype) for c in cols_to_convert}
dash_counts = {c: int((df[c].astype(str) == "-").sum()) for c in cols_to_convert}
na_before = {c: int(df[c].isna().sum()) for c in cols_to_convert}

# --- transformación ---
for col in cols_to_convert:
    df[col] = pd.to_numeric(df[col].replace("-", 0), errors='coerce')

# --- métricas después ---
dtypes_after = {c: str(df[c].dtype) for c in cols_to_convert}
na_after = {c: int(df[c].isna().sum()) for c in cols_to_convert}
new_nas_from_coerce = {c: int(na_after[c] - na_before[c]) for c in cols_to_convert}

# --- log ---
logger.log_step(
    action="convertir_inapp_purchases_a_numerico",
    description="Reemplazo de '-' por 0 y conversión a numérico con errors='coerce'",
    input_df="customer_360_pre_typecast",
    output_df="customer_360",
    columns=cols_to_convert,
    dtypes_before=dtypes_before,
    dtypes_after=dtypes_after,
    dash_replaced_counts=dash_counts,
    na_before=na_before,
    na_after=na_after,
    new_nas_from_coerce=new_nas_from_coerce
)

# --- validación rápida ---
df[cols_to_convert].info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500000 entries, 0 to 499999
Data columns (total 2 columns):
 #   Column              Non-Null Count   Dtype  
---  ------              --------------   -----  
 0   inapp_purchases_1m  500000 non-null  float64
 1   inapp_purchases_6m  500000 non-null  float64
dtypes: float64(2)
memory usage: 7.6 MB
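
The same replace-and-convert step is applied to several groups of columns in this section. A minimal sketch of how it could be wrapped in one reusable function follows. The name `dash_to_zero_numeric` and its `placeholder` parameter are hypothetical, and the call to the pipeline logger is left out so the example stays self-contained.

```python
import pandas as pd

def dash_to_zero_numeric(df, cols, placeholder="-"):
    """Return a converted copy of df plus per-column metrics (hypothetical helper)."""
    out = df.copy()
    summary = {}
    for col in cols:
        raw = out[col]
        dtype_before = str(raw.dtype)
        na_before = int(raw.isna().sum())
        n_placeholder = int((raw.astype(str) == placeholder).sum())
        # Swap the placeholder for "0", then parse; unparseable text becomes NaN
        out[col] = pd.to_numeric(raw.replace(placeholder, "0"), errors="coerce")
        na_after = int(out[col].isna().sum())
        summary[col] = {
            "dtype_before": dtype_before,
            "dtype_after": str(out[col].dtype),
            "placeholder_count": n_placeholder,
            "new_nas_from_coerce": na_after - na_before,
        }
    return out, summary

# Toy data with invented values
toy = pd.DataFrame({
    "inapp_purchases_1m": ["-", "1.0", "3.0"],
    "support_interactions_1m": ["-", "1", "oops"],
}, dtype=object)
clean, summary = dash_to_zero_numeric(toy, list(toy.columns))
print(summary)
```

The returned `summary` holds the same kind of metrics that each cell passes to the logger, so a single call per column group could feed one log entry.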


*Variables related to payment delinquency.*

In [None]:
# Select the columns whose names start with a given prefix
cols = [col for col in df.columns if col.startswith("delinquency_email_alerts_")]

# For each column, print the number of unique values and the values themselves
for col in cols:
    print(f"Columna: {col}")
    print("  Únicos:", df[col].nunique())
    print("  Valores:", df[col].unique())
    print("-" * 40)

Columna: delinquency_email_alerts_6m
  Únicos: 2
  Valores: ['0' '1']
----------------------------------------
Columna: delinquency_email_alerts_1m
  Únicos: 2
  Valores: ['0' '1']
----------------------------------------
Columna: delinquency_email_alerts_3m
  Únicos: 2
  Valores: ['-' '1']
----------------------------------------
Columna: delinquency_email_alerts_12m
  Únicos: 2
  Valores: ['-' '1']
----------------------------------------
Columna: delinquency_email_alerts_6m_ineffective
  Únicos: 2
  Valores: ['0' '1']
----------------------------------------
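
Instead of reading the printed values group by group, the columns that contain the `-` placeholder can be found in a single pass. The sketch below assumes the placeholder is always the exact string `-`; the function name `columns_with_placeholder` is hypothetical.

```python
import pandas as pd

def columns_with_placeholder(df: pd.DataFrame, placeholder: str = "-") -> dict:
    """Map each column that contains the placeholder to its number of occurrences."""
    counts = {}
    for col in df.columns:
        # eq() is False for numeric cells and for missing values, so only exact matches count
        n = int(df[col].eq(placeholder).sum())
        if n > 0:
            counts[col] = n
    return counts

# Toy data with invented values
toy = pd.DataFrame({
    "delinquency_email_alerts_3m": ["-", "1", "-"],
    "delinquency_email_alerts_6m": ["0", "1", "0"],
    "monthly_fee_usd": [9.99, 5.0, 7.5],
})
found = columns_with_placeholder(toy)
print(found)
```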


The columns `delinquency_email_alerts_3m` and `delinquency_email_alerts_12m` contain the value `-`, which stands for 0, so it must be replaced.

In [None]:
cols_to_convert = ['delinquency_email_alerts_3m', 'delinquency_email_alerts_12m']

# --- métricas antes ---
dtypes_before = {c: str(df[c].dtype) for c in cols_to_convert}
dash_counts = {c: int((df[c].astype(str) == "-").sum()) for c in cols_to_convert}
na_before = {c: int(df[c].isna().sum()) for c in cols_to_convert}

# --- transformación ---
for col in cols_to_convert:
    df[col] = pd.to_numeric(df[col].replace("-", 0), errors='coerce')

# --- métricas después ---
dtypes_after = {c: str(df[c].dtype) for c in cols_to_convert}
na_after = {c: int(df[c].isna().sum()) for c in cols_to_convert}
new_nas_from_coerce = {c: int(na_after[c] - na_before[c]) for c in cols_to_convert}

# --- log ---
logger.log_step(
    action="convertir_delinquency_email_alerts_a_numerico",
    description="Reemplazo de '-' por 0 y conversión a numérico con errors='coerce'",
    input_df="customer_360_pre_typecast",
    output_df="customer_360",
    columns=cols_to_convert,
    dtypes_before=dtypes_before,
    dtypes_after=dtypes_after,
    dash_replaced_counts=dash_counts,
    na_before=na_before,
    na_after=na_after,
    new_nas_from_coerce=new_nas_from_coerce
)

# --- validación rápida ---
df[cols_to_convert].info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500000 entries, 0 to 499999
Data columns (total 2 columns):
 #   Column                        Non-Null Count   Dtype
---  ------                        --------------   -----
 0   delinquency_email_alerts_3m   500000 non-null  int64
 1   delinquency_email_alerts_12m  500000 non-null  int64
dtypes: int64(2)
memory usage: 7.6 MB
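
These columns come out as `int64`, while the `inapp_purchases_*` columns converted earlier came out as `float64`. The difference follows from how the text is written: `pd.to_numeric` yields integers when every value is a whole number without a decimal point, and floats as soon as a value such as `'1.0'` appears. The sketch below illustrates this on toy data and shows one possible way (an assumption, not a step taken in this notebook) to store whole-number floats as the nullable `Int64` dtype.

```python
import pandas as pd

# Text written without a decimal point parses to integers
whole = pd.to_numeric(pd.Series(["0", "1", "1"], dtype=object))

# Text written with a decimal point parses to floats
decimal_text = pd.to_numeric(pd.Series(["0", "1.0", "3.0"], dtype=object))

print(whole.dtype, decimal_text.dtype)

# Optional: whole-number floats can be cast to the nullable integer dtype
as_int = decimal_text.astype("Int64")
print(as_int.dtype)
```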


*Variables related to platform incidents.*

In [None]:
# Select the columns whose names start with a given prefix
cols = [col for col in df.columns if col.startswith("platform_incident_")]

# For each column, print the number of unique values and the values themselves
for col in cols:
    print(f"Columna: {col}")
    print("  Únicos:", df[col].nunique())
    print("  Valores:", df[col].unique())
    print("-" * 40)

Columna: platform_incident_a
  Únicos: 4
  Valores: ['0' '1' '2' '3']
----------------------------------------
Columna: platform_incident_b
  Únicos: 53
  Valores: ['7' '0' '21' '10' '15' '1' '6' '2' '4' '3' '24' '17' '28' '13' '5' '49'
 '16' '23' '9' '12' '48' '38' '26' '22' '29' '27' '30' '11' '33' '8' '42'
 '14' '44' '18' '47' '25' '32' '37' '19' '39' '36' '45' '31' '34' '50'
 '43' '35' '41' '40' '20' '46' '51' '52']
----------------------------------------
Columna: platform_incident_c
  Únicos: 15
  Valores: ['0' '9' '3' '4' '1' '7' '2' '12' '5' '8' '6' '13' '14' '11' '10']
----------------------------------------
Columna: platform_incident_d
  Únicos: 14
  Valores: ['2' '3' '-' '6' '1' '4' '12' '5' '11' '9' '13' '7' '8' '10']
----------------------------------------


The column `platform_incident_d` contains the value `-`, which stands for 0, so it must be replaced.

In [None]:
cols_to_convert = ['platform_incident_d']

# --- métricas antes ---
dtypes_before = {c: str(df[c].dtype) for c in cols_to_convert}
dash_counts = {c: int((df[c].astype(str) == "-").sum()) for c in cols_to_convert}
na_before = {c: int(df[c].isna().sum()) for c in cols_to_convert}

# --- transformación ---
for col in cols_to_convert:
    df[col] = pd.to_numeric(df[col].replace("-", 0), errors='coerce')

# --- métricas después ---
dtypes_after = {c: str(df[c].dtype) for c in cols_to_convert}
na_after = {c: int(df[c].isna().sum()) for c in cols_to_convert}
new_nas_from_coerce = {c: int(na_after[c] - na_before[c]) for c in cols_to_convert}

# --- log ---
logger.log_step(
    action="convertir_platform_incident_a_numerico",
    description="Reemplazo de '-' por 0 y conversión a numérico con errors='coerce'",
    input_df="customer_360_pre_typecast",
    output_df="customer_360",
    columns=cols_to_convert,
    dtypes_before=dtypes_before,
    dtypes_after=dtypes_after,
    dash_replaced_counts=dash_counts,
    na_before=na_before,
    na_after=na_after,
    new_nas_from_coerce=new_nas_from_coerce
)

# --- validación rápida ---
df[cols_to_convert].info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500000 entries, 0 to 499999
Data columns (total 1 columns):
 #   Column               Non-Null Count   Dtype
---  ------               --------------   -----
 0   platform_incident_d  500000 non-null  int64
dtypes: int64(1)
memory usage: 3.8 MB


*Variables related to account incidents.*

In [None]:
# Select the columns whose names start with a given prefix
cols = [col for col in df.columns if col.startswith("account_incident_")]

# For each column, print the number of unique values and the values themselves
for col in cols:
    print(f"Columna: {col}")
    print("  Únicos:", df[col].nunique())
    print("  Valores:", df[col].unique())
    print("-" * 40)

Columna: account_incident_a
  Únicos: 2
  Valores: ['-' '1']
----------------------------------------
Columna: account_incident_b
  Únicos: 2
  Valores: ['0' '1']
----------------------------------------
Columna: account_incident_c
  Únicos: 2
  Valores: ['0' '1']
----------------------------------------
Columna: account_incident_d
  Únicos: 3
  Valores: ['0' '1' '2']
----------------------------------------


The column `account_incident_a` contains the value `-`, which stands for 0, so it must be replaced.

In [None]:
cols_to_convert = ['account_incident_a']

# --- métricas antes ---
dtypes_before = {c: str(df[c].dtype) for c in cols_to_convert}
dash_counts = {c: int((df[c].astype(str) == "-").sum()) for c in cols_to_convert}
na_before = {c: int(df[c].isna().sum()) for c in cols_to_convert}

# --- transformación ---
for col in cols_to_convert:
    df[col] = pd.to_numeric(df[col].replace("-", 0), errors='coerce')

# --- métricas después ---
dtypes_after = {c: str(df[c].dtype) for c in cols_to_convert}
na_after = {c: int(df[c].isna().sum()) for c in cols_to_convert}
new_nas_from_coerce = {c: int(na_after[c] - na_before[c]) for c in cols_to_convert}

# --- log ---
logger.log_step(
    action="convertir_account_incident_a_numerico",
    description="Reemplazo de '-' por 0 y conversión a numérico con errors='coerce'",
    input_df="customer_360_pre_typecast",
    output_df="customer_360",
    columns=cols_to_convert,
    dtypes_before=dtypes_before,
    dtypes_after=dtypes_after,
    dash_replaced_counts=dash_counts,
    na_before=na_before,
    na_after=na_after,
    new_nas_from_coerce=new_nas_from_coerce
)

# --- validación rápida ---
df[cols_to_convert].info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500000 entries, 0 to 499999
Data columns (total 1 columns):
 #   Column              Non-Null Count   Dtype
---  ------              --------------   -----
 0   account_incident_a  500000 non-null  int64
dtypes: int64(1)
memory usage: 3.8 MB


*Variables related to chatbot interactions.*

In [None]:
# Select the columns whose names start with a given prefix
cols = [col for col in df.columns if col.startswith("chatbot_interactions_")]

# For each column, print the number of unique values and the values themselves
for col in cols:
    print(f"Columna: {col}")
    print("  Únicos:", df[col].nunique())
    print("  Valores:", df[col].unique())
    print("-" * 40)

Columna: chatbot_interactions_6m
  Únicos: 2
  Valores: ['0' '1']
----------------------------------------
Columna: chatbot_interactions_1m
  Únicos: 2
  Valores: ['0' '1']
----------------------------------------
Columna: chatbot_interactions_3m
  Únicos: 2
  Valores: ['0' '1']
----------------------------------------
Columna: chatbot_interactions_12m
  Únicos: 16
  Valores: ['0' '1' '6' '2' '4' '3' '9' '5' '8' '7' '13' '11' '10' '12' '14' '15']
----------------------------------------


*Variables related to support.*

In [None]:
# Select the columns whose names start with a given prefix
cols = [col for col in df.columns if col.startswith("support_interactions_")]

# For each column, print the number of unique values and the values themselves
for col in cols:
    print(f"Columna: {col}")
    print("  Únicos:", df[col].nunique())
    print("  Valores:", df[col].unique())
    print("-" * 40)

Columna: support_interactions_6m
  Únicos: 2
  Valores: ['0' '1']
----------------------------------------
Columna: support_interactions_1m
  Únicos: 2
  Valores: ['-' '1']
----------------------------------------
Columna: support_interactions_3m
  Únicos: 2
  Valores: ['0' '1']
----------------------------------------
Columna: support_interactions_12m
  Únicos: 15
  Valores: ['-' '1' '4' '3' '14' '2' '11' '9' '7' '5' '6' '8' '13' '10' '12']
----------------------------------------


The columns `support_interactions_1m` and `support_interactions_12m` contain the value `-`, which stands for 0, so it must be replaced.

In [None]:
cols_to_convert = ['support_interactions_1m','support_interactions_12m']

# --- métricas antes ---
dtypes_before = {c: str(df[c].dtype) for c in cols_to_convert}
dash_counts = {c: int((df[c].astype(str) == "-").sum()) for c in cols_to_convert}
na_before = {c: int(df[c].isna().sum()) for c in cols_to_convert}

# --- transformación ---
for col in cols_to_convert:
    df[col] = pd.to_numeric(df[col].replace("-", 0), errors='coerce')

# --- métricas después ---
dtypes_after = {c: str(df[c].dtype) for c in cols_to_convert}
na_after = {c: int(df[c].isna().sum()) for c in cols_to_convert}
new_nas_from_coerce = {c: int(na_after[c] - na_before[c]) for c in cols_to_convert}

# --- log ---
logger.log_step(
    action="convertir_support_interactions_a_numerico",
    description="Reemplazo de '-' por 0 y conversión a numérico con errors='coerce'",
    input_df="customer_360_pre_typecast",
    output_df="customer_360",
    columns=cols_to_convert,
    dtypes_before=dtypes_before,
    dtypes_after=dtypes_after,
    dash_replaced_counts=dash_counts,
    na_before=na_before,
    na_after=na_after,
    new_nas_from_coerce=new_nas_from_coerce
)

# --- validación rápida ---
df[cols_to_convert].info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500000 entries, 0 to 499999
Data columns (total 2 columns):
 #   Column                    Non-Null Count   Dtype
---  ------                    --------------   -----
 0   support_interactions_1m   500000 non-null  int64
 1   support_interactions_12m  500000 non-null  int64
dtypes: int64(2)
memory usage: 7.6 MB


*Variables related to offline support.*

In [None]:
# Select the columns whose names start with a given prefix
cols = [col for col in df.columns if col.startswith("offline_support_")]

# For each column, print the number of unique values and the values themselves
for col in cols:
    print(f"Columna: {col}")
    print("  Únicos:", df[col].nunique())
    print("  Valores:", df[col].unique())
    print("-" * 40)

Columna: offline_support_6m
  Únicos: 2
  Valores: ['-' '1']
----------------------------------------
Columna: offline_support_1m
  Únicos: 2
  Valores: ['0' '1']
----------------------------------------
Columna: offline_support_3m
  Únicos: 2
  Valores: ['0' '1']
----------------------------------------



The column `offline_support_6m` contains the value `-`, which stands for 0, so it must be replaced.

In [None]:
cols_to_convert = ['offline_support_6m']

# --- métricas antes ---
dtypes_before = {c: str(df[c].dtype) for c in cols_to_convert}
dash_counts = {c: int((df[c].astype(str) == "-").sum()) for c in cols_to_convert}
na_before = {c: int(df[c].isna().sum()) for c in cols_to_convert}

# --- transformación ---
for col in cols_to_convert:
    df[col] = pd.to_numeric(df[col].replace("-", 0), errors='coerce')

# --- métricas después ---
dtypes_after = {c: str(df[c].dtype) for c in cols_to_convert}
na_after = {c: int(df[c].isna().sum()) for c in cols_to_convert}
new_nas_from_coerce = {c: int(na_after[c] - na_before[c]) for c in cols_to_convert}

# --- log ---
logger.log_step(
    action="convertir_offline_support_a_numerico",
    description="Reemplazo de '-' por 0 y conversión a numérico con errors='coerce'",
    input_df="customer_360_pre_typecast",
    output_df="customer_360",
    columns=cols_to_convert,
    dtypes_before=dtypes_before,
    dtypes_after=dtypes_after,
    dash_replaced_counts=dash_counts,
    na_before=na_before,
    na_after=na_after,
    new_nas_from_coerce=new_nas_from_coerce
)

# --- validación rápida ---
df[cols_to_convert].info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500000 entries, 0 to 499999
Data columns (total 1 columns):
 #   Column              Non-Null Count   Dtype
---  ------              --------------   -----
 0   offline_support_6m  500000 non-null  int64
dtypes: int64(1)
memory usage: 3.8 MB


*Other variables.*

In [None]:
cols = [
    'payment_declines_12m',
    'service_interruptions_12m',
    'internal_interactions',
    'case_interactions',
    'info_requests',
    'billing_inquiries',
    'support_interaction',
    'other_interactions',
    'customer_satisfaction_tickets',
    'loyalty_tickets'
    ]

# For each column, print the number of unique values and the values themselves
for col in cols:
    print(f"Columna: {col}")
    print("  Únicos:", df[col].nunique())
    print("  Valores:", df[col].unique())
    print("-" * 40)

Columna: payment_declines_12m
  Únicos: 9
  Valores: ['0' '2' '7' '4' '6' '5' '3' '1' '8']
----------------------------------------
Columna: service_interruptions_12m
  Únicos: 17
  Valores: ['0' '4' '12' '8' '5' '13' '16' '15' '1' '2' '14' '6' '10' '7' '9' '3'
 '11']
----------------------------------------
Columna: internal_interactions
  Únicos: 2
  Valores: ['0' '1']
----------------------------------------
Columna: case_interactions
  Únicos: 2
  Valores: ['1' '0']
----------------------------------------
Columna: info_requests
  Únicos: 2
  Valores: ['0' '1']
----------------------------------------
Columna: billing_inquiries
  Únicos: 2
  Valores: ['0' '1']
----------------------------------------
Columna: support_interaction
  Únicos: 46
  Valores: ['-' '1' '19' '4' '3' '25' '23' '6' '2' '27' '10' '22' '5' '20' '33' '21'
 '17' '14' '11' '7' '8' '13' '18' '9' '24' '16' '32' '12' '36' '41' '15'
 '42' '40' '34' '31' '37' '35' '39' '28' '26' '43' '29' '38' '30' '44'
 '46']
--------


The columns `support_interaction` and `other_interactions` contain the value `-`, which stands for 0, so it must be replaced. The cell below also converts `customer_satisfaction_tickets` to numeric.

In [None]:
cols_to_convert = ['support_interaction', 'other_interactions', 'customer_satisfaction_tickets']

# --- métricas antes ---
dtypes_before = {c: str(df[c].dtype) for c in cols_to_convert}
dash_counts = {c: int((df[c].astype(str) == "-").sum()) for c in cols_to_convert}
na_before = {c: int(df[c].isna().sum()) for c in cols_to_convert}

# --- transformación ---
for col in cols_to_convert:
    df[col] = pd.to_numeric(df[col].replace("-", 0), errors='coerce')

# --- métricas después ---
dtypes_after = {c: str(df[c].dtype) for c in cols_to_convert}
na_after = {c: int(df[c].isna().sum()) for c in cols_to_convert}
new_nas_from_coerce = {c: int(na_after[c] - na_before[c]) for c in cols_to_convert}

# --- log ---
logger.log_step(
    action="convertir_otras_variables_a_numerico",
    description="Reemplazo de '-' por 0 y conversión a numérico con errors='coerce'",
    input_df="customer_360_pre_typecast",
    output_df="customer_360",
    columns=cols_to_convert,
    dtypes_before=dtypes_before,
    dtypes_after=dtypes_after,
    dash_replaced_counts=dash_counts,
    na_before=na_before,
    na_after=na_after,
    new_nas_from_coerce=new_nas_from_coerce
)

# --- validación rápida ---
df[cols_to_convert].info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500000 entries, 0 to 499999
Data columns (total 3 columns):
 #   Column                         Non-Null Count   Dtype
---  ------                         --------------   -----
 0   support_interaction            500000 non-null  int64
 1   other_interactions             500000 non-null  int64
 2   customer_satisfaction_tickets  500000 non-null  int64
dtypes: int64(3)
memory usage: 11.4 MB


#### **3.2.2 Cleaning Categorical Variables**

The categorical variables are cleaned and validated against the value lists provided in the database documentation.

First, `select_dtypes(include=["object"])` is used to identify the columns that still have dtype `object`, which are the candidates for categorical cleaning.

In [None]:
cols_object_all = df.select_dtypes(include=["object"]).columns.tolist()

logger.log_step(
    action="identificar_columnas_tipo_objeto",
    description="Se detectaron columnas con dtype object para perfilado y limpieza",
    n_object_columns=len(cols_object_all),
    object_columns=cols_object_all
)

print("Columnas object detectadas:", len(cols_object_all))
print(cols_object_all)

Columnas object detectadas: 45
['customer_id', 'department', 'consumption_style', 'locality', 'province', 'churn', 'joined_with_promo', 'welcome_promo_flag', 'cancellation_ticket', 'retention_ticket', 'retention_followup_ticket', 'last_default_payment_method', 'cancel_date', 'signup_date', 'reactivation_date', 'last_close_date', 'inapp_purchases_3m', 'inapp_purchases_12m', 'device_model', 'default_payment_method', 'activation_discount_pct', 'payment_declines_12m', 'delinquency_email_alerts_6m', 'delinquency_email_alerts_1m', 'delinquency_email_alerts_6m_ineffective', 'platform_incident_a', 'platform_incident_b', 'platform_incident_c', 'account_incident_b', 'account_incident_c', 'account_incident_d', 'service_interruptions_12m', 'chatbot_interactions_6m', 'chatbot_interactions_1m', 'chatbot_interactions_3m', 'chatbot_interactions_12m', 'support_interactions_6m', 'support_interactions_3m', 'offline_support_1m', 'offline_support_3m', 'internal_interactions', 'case_interactions', 'info_req

We profile each column to see whether it:

*   looks boolean (si/no)
*   looks like a number stored as text
*   looks like genuine text or a real category
*   has many empty values or `-` placeholders

In [None]:
def profile_object_columns(df, cols, top_n=8):
    profiles = []
    for c in cols:
        s = df[c]
        # work with strings while preserving NA
        s_str = s.astype("string")
        s_clean = s_str.str.strip().str.lower()

        n = len(s)
        n_na = int(s_str.isna().sum())
        n_empty = int((s_clean.fillna("") == "").sum())
        n_dash = int((s_clean.fillna("") == "-").sum())
        nunique = int(s_clean.nunique(dropna=True))

        top_vals = s_clean.value_counts(dropna=False).head(top_n).to_dict()

        profiles.append({
            "col": c,
            "dtype": str(s.dtype),
            "n_rows": n,
            "n_na": n_na,
            "n_empty": n_empty,
            "n_dash": n_dash,
            "n_unique": nunique,
            "top_values": top_vals
        })
    return profiles

profiles = profile_object_columns(df, cols_object_all, top_n=8)

logger.log_step(
    action="caracterizar_columnas_tipo_objeto",
    description="Perfilado rápido de columnas object (NA, vacíos, '-', cardinalidad, top values)",
    n_columns=len(cols_object_all),
    sample_top_n=8,
    profiles_sample=profiles[:20]  # guarda muestra para no inflar el log
)

# Visual
for p in profiles:
    print("\n", p["col"])
    print("unique:", p["n_unique"], "NA:", p["n_na"], "empty:", p["n_empty"], "dash:", p["n_dash"])
    print("top:", p["top_values"])



 customer_id
unique: 500000 NA: 0 empty: 0 dash: 0
top: {'aev18345': 1, 'soi99182': 1, 'hlr99258': 1, 'dwd17834': 1, 'dkg68579': 1, 'fkj19635': 1, 'qiz25190': 1, 'vax65567': 1}

 department
unique: 196 NA: 0 empty: 0 dash: 0
top: {'department_cat_1': 164319, 'department_cat_109': 145152, 'department_cat_120': 12759, 'department_cat_131': 11630, 'department_cat_142': 10921, 'department_cat_153': 10039, 'department_cat_164': 9256, 'department_cat_175': 8339}

 consumption_style
unique: 5 NA: 0 empty: 0 dash: 0
top: {'occasional': 288181, 'casual': 98581, 'weekend-viewer': 73431, 'binge-watcher': 26248, 'heavy-user': 13559}

 locality
unique: 679 NA: 0 empty: 0 dash: 0
top: {'locality_cat_1': 49961, 'locality_cat_112': 33595, 'locality_cat_223': 27631, 'locality_cat_334': 19751, 'locality_cat_445': 14243, 'locality_cat_556': 12411, 'locality_cat_645': 11569, 'locality_cat_658': 10475}

 province
unique: 23 NA: 0 empty: 0 dash: 0
top: {'buenos aires': 189374, 'córdoba': 174465, 'santa fe'
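
The profile printed above is read by eye to decide how each column should be treated. A rough sketch of how that decision could be turned into an automatic first guess follows. The function `classify_object_column`, the category labels, and the `max_categories` threshold are all assumptions for illustration; the set of si/no spellings mirrors the variants observed in this dataset.

```python
import pandas as pd

# Spellings treated as si/no answers (assumption based on the observed variants)
YES_NO = {"si", "s", "sure", "y", "yes", "no", "n"}

def classify_object_column(series: pd.Series, max_categories: int = 50) -> str:
    """Heuristic first guess of what an object column contains."""
    s = series.astype("string").str.strip().str.lower()
    values = s.dropna()
    # Ignore empty strings and the '-' placeholder when guessing the type
    values = values[(values != "") & (values != "-")]
    if values.empty:
        return "empty"
    uniques = set(values.unique())
    if uniques <= YES_NO:
        return "boolean_like"
    if pd.to_numeric(values.astype(object), errors="coerce").notna().all():
        return "numeric_like"
    if len(uniques) <= max_categories:
        return "categorical"
    return "free_text_or_id"

# Toy columns with invented values
kind_bool = classify_object_column(pd.Series(["Si", "no ", "Y", None], dtype=object))
kind_num = classify_object_column(pd.Series(["-", "1", "14"], dtype=object))
kind_cat = classify_object_column(pd.Series(["buenos aires", "córdoba"], dtype=object))
print(kind_bool, kind_num, kind_cat)
```

A guess of this kind is only a starting point: high-cardinality identifiers and dates stored as text would still need a manual review.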

We select the text columns to normalize:

In [None]:
cols_to_text_clean = [
    "province",
    "joined_with_promo",
    "welcome_promo_flag",
    "cancellation_ticket",
    "retention_ticket",
    "retention_followup_ticket",
    "last_default_payment_method",
    "device_model",
]

# Validation: keep only the columns that exist in df
cols_to_text_clean = [c for c in cols_to_text_clean if c in df.columns]

logger.log_step(
    action="seleccionar_columnas_texto_para_depuración",
    description="Selección de columnas object a convertir a string y limpiar con limpiar_texto",
    n_selected=len(cols_to_text_clean),
    selected_columns=cols_to_text_clean
)

print("Seleccionadas para limpieza:", cols_to_text_clean)

Seleccionadas para limpieza: ['province', 'joined_with_promo', 'welcome_promo_flag', 'cancellation_ticket', 'retention_ticket', 'retention_followup_ticket', 'last_default_payment_method', 'device_model']


We convert these columns to the `string` dtype so that missing values are preserved as `<NA>`.

In [None]:
dtypes_before = df[cols_to_text_clean].dtypes.astype(str).to_dict()

df[cols_to_text_clean] = df[cols_to_text_clean].astype("string")

dtypes_after = df[cols_to_text_clean].dtypes.astype(str).to_dict()

logger.log_step(
    action="convertir_columnas_tipo_objeto_a_cadena",
    description="Conversión de columnas seleccionadas a dtype 'string' preservando NA",
    columns=cols_to_text_clean,
    dtypes_before=dtypes_before,
    dtypes_after=dtypes_after
)
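
The reason for preferring the `string` dtype can be seen on a small example: applying Python's `str` to each element turns a missing value into the literal text `nan`, while `astype("string")` keeps it as a missing value. The toy series below is invented for illustration.

```python
import numpy as np
import pandas as pd

raw = pd.Series(["Córdoba", np.nan, " Si "], dtype=object)

# Element-wise str(): the missing value becomes the three-letter text "nan"
as_text = raw.map(str)

# Nullable string dtype: the missing value stays missing (<NA>)
as_string = raw.astype("string")

print(as_text.tolist())
print(as_string.isna().tolist())
```

With the text version, a later `isna()` check would no longer find the missing entry, and `nan` would show up as one more category.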

The string columns are normalized by removing accents, trimming surrounding whitespace, converting to lowercase, replacing inner spaces with `_`, and dropping any remaining special characters. This unifies the categories, so that equal values no longer appear as distinct because of differences in case or stray characters, and yields more consistent labels.

In [None]:
def limpiar_texto(serie: pd.Series) -> pd.Series:
    def quitar_tildes(x):
        if pd.isna(x):
            return x
        x = str(x)
        return ''.join(
            c for c in unicodedata.normalize("NFD", x)
            if unicodedata.category(c) != "Mn"
        )

    s = serie.copy()
    mask_na = s.isna()

    s = (s
         .map(quitar_tildes)
         .astype("string")
         .str.strip()
         .str.lower()
         .str.replace(" ", "_", regex=False)
         .str.replace(r"[^\w]+", "", regex=True)
    )

    s[mask_na] = pd.NA
    return s

In [None]:
before_snapshot = df[cols_to_text_clean].copy()
na_before = before_snapshot.isna().sum().to_dict()

df[cols_to_text_clean] = df[cols_to_text_clean].apply(limpiar_texto)

after_snapshot = df[cols_to_text_clean]
na_after = after_snapshot.isna().sum().to_dict()

# % cambiado + ejemplos
changed_pct = {}
examples = {}

for c in cols_to_text_clean:
    b = before_snapshot[c].astype("string")
    a = after_snapshot[c].astype("string")
    comparable = ~(b.isna() & a.isna())

    if comparable.sum() == 0:
        changed_pct[c] = 0.0
    else:
        changed_pct[c] = float((b[comparable] != a[comparable]).mean())

    m = (b != a) & comparable
    if m.any():
        examples[c] = list(zip(b[m].head(5).tolist(), a[m].head(5).tolist()))
    else:
        examples[c] = []

logger.log_step(
    action="limpieza_columnas_texto",
    description="Aplicación de limpiar_texto (sin convertir NA a 'nan'): quitar tildes, strip, lower, espacios->'_', remove non-alnum",
    input_df="customer_360_pre_text_clean",
    output_df="customer_360",
    columns=cols_to_text_clean,
    na_before=na_before,
    na_after=na_after,
    changed_pct=changed_pct,
    examples_before_after=examples
)

# Summary: describe() restricted to the string columns
df[cols_to_text_clean].describe(include="string")

Unnamed: 0,province,joined_with_promo,welcome_promo_flag,cancellation_ticket,retention_ticket,retention_followup_ticket,last_default_payment_method,device_model
count,500000,500000,500000,500000,500000,500000,500000,500000
unique,23,7,7,7,7,7,4,18
top,buenos_aires,si,si,si,si,si,credit_card,smartphone
freq,189374,163734,161489,250479,252918,263587,261027,226049
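
The same normalization steps can also be written with pandas string methods only, which avoids the element-wise Python function. The sketch below is an alternative under stated assumptions, not the notebook's implementation: `normalize_text` is a hypothetical name, and it strips only the combining marks in the range U+0300 to U+036F, whereas `limpiar_texto` removes every character in the Unicode category `Mn`.

```python
import pandas as pd

def normalize_text(series: pd.Series) -> pd.Series:
    """Vectorized variant: strip accents, trim, lowercase, spaces to '_', drop non-word chars."""
    s = series.astype("string")
    return (
        s.str.normalize("NFD")                               # split letters from their accents
         .str.replace("[\u0300-\u036f]", "", regex=True)     # drop the combining accent marks
         .str.strip()
         .str.lower()
         .str.replace(" ", "_", regex=False)
         .str.replace(r"[^\w]+", "", regex=True)             # drop anything that is not a word character
    )

# Toy values invented for illustration
toy = pd.Series(["Buenos Aires", " Córdoba ", "Credit-Card", None], dtype=object)
result = normalize_text(toy)
print(result.tolist())
```

Note that the last step removes hyphens as well, so a value such as `Credit-Card` loses its separator instead of gaining an underscore.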


Some variables should be binary (si/no) but contain additional spellings. We therefore review the categories present in each one to identify those variants and replace them with the standard values `si` and `no`.

In [None]:
cols = [
    'retention_followup_ticket',
    'retention_ticket',
    'cancellation_ticket',
    'welcome_promo_flag',
    'joined_with_promo'
    ]

# For each column, print the number of unique values and the values themselves
for col in cols:
    print(f"Columna: {col}")
    print("  Únicos:", df[col].nunique())
    print("  Valores:", df[col].unique())
    print("-" * 40)

Columna: retention_followup_ticket
  Únicos: 7
  Valores: <StringArray>
['si', 's', 'sure', 'y', 'yes', 'no', 'n']
Length: 7, dtype: string
----------------------------------------
Columna: retention_ticket
  Únicos: 7
  Valores: <StringArray>
['si', 's', 'sure', 'y', 'yes', 'n', 'no']
Length: 7, dtype: string
----------------------------------------
Columna: cancellation_ticket
  Únicos: 7
  Valores: <StringArray>
['si', 's', 'sure', 'y', 'yes', 'n', 'no']
Length: 7, dtype: string
----------------------------------------
Columna: welcome_promo_flag
  Únicos: 7
  Valores: <StringArray>
['si', 'n', 's', 'sure', 'y', 'no', 'yes']
Length: 7, dtype: string
----------------------------------------
Columna: joined_with_promo
  Únicos: 7
  Valores: <StringArray>
['si', 's', 'sure', 'no', 'yes', 'n', 'y']
Length: 7, dtype: string
----------------------------------------


In [None]:
# Accepted spellings for each canonical value
si_variantes = ['si', 's', 'sure', 'y', 'yes']
no_variantes = ['no', 'n']

def normalizar_si_no(valor):
    if pd.isna(valor) or str(valor).strip() == '':
        return np.nan
    valor_lower = str(valor).strip().lower()
    if valor_lower in si_variantes:
        return 'si'
    elif valor_lower in no_variantes:
        return 'no'
    else:
        return np.nan
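
The row-by-row function above can also be expressed with column-level operations, which tends to matter on a table of this size. The following is only a sketch under assumptions: `normalizar_si_no_vectorizado` and the toy series `ejemplo` are hypothetical names that do not appear in the notebook, and the variant lists are copied from the cell above.

```python
import pandas as pd

# Same variant lists as in the cell above.
si_variantes = ['si', 's', 'sure', 'y', 'yes']
no_variantes = ['no', 'n']

def normalizar_si_no_vectorizado(serie: pd.Series) -> pd.Series:
    """Hypothetical column-level counterpart of normalizar_si_no."""
    limpio = serie.astype("string").str.strip().str.lower()
    # Start from an all-missing column and fill in only the recognised variants.
    salida = pd.Series(pd.NA, index=serie.index, dtype="string")
    salida[limpio.isin(si_variantes)] = "si"
    salida[limpio.isin(no_variantes)] = "no"
    return salida

# Toy input (not taken from the dataset): mixed case, padding, empty, missing, unknown.
ejemplo = pd.Series([" Yes", "n", "sure", "", None, "maybe"])
resultado = normalizar_si_no_vectorizado(ejemplo)
print(resultado.tolist())
```

As in the original function, empty strings, missing values and unrecognised answers all end up as missing.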

In [None]:
# --- metrics before ---
before_snapshot = df[cols].copy()

def _count_categories_before(series):
    s = series.astype("string")
    s_clean = s.str.strip().str.lower()
    return {
        # fillna("") already covers missing values, so they are counted only once
        "na_or_empty": int(s_clean.fillna("").eq("").sum()),
        "si_variantes": int(s_clean.isin(si_variantes).sum()),
        "no_variantes": int(s_clean.isin(no_variantes).sum()),
        "otros": int((~s_clean.isin(si_variantes + no_variantes) & s_clean.notna() & (s_clean != "")).sum()),
        "unique_raw": int(s_clean.nunique(dropna=True))
    }

counts_before = {c: _count_categories_before(before_snapshot[c]) for c in cols}

# --- apply normalization ---
for col_name in cols:
    df[col_name] = df[col_name].apply(normalizar_si_no).astype("string")

# --- metrics after ---
after_snapshot = df[cols]

counts_after = {}
unknown_examples = {}

for c in cols:
    a = after_snapshot[c]
    counts_after[c] = {
        "si": int((a == "si").sum()),
        "no": int((a == "no").sum()),
        "na": int(a.isna().sum()),
        "unique_after": int(pd.Series(a).nunique(dropna=True))
    }

    # examples of unrecognised values (taken from the BEFORE snapshot)
    b = before_snapshot[c].astype("string").str.strip().str.lower()
    mask_unknown = (~b.isin(si_variantes + no_variantes)) & b.notna() & (b != "")
    top_unknown = b[mask_unknown].value_counts().head(10).to_dict()
    unknown_examples[c] = top_unknown

# --- log ---
logger.log_step(
    action="normalizacion_binaria",
    description="Normalización de variantes a categorías {si, no}; valores no reconocidos -> NaN",
    input_df="customer_360_pre_yesno",
    output_df="customer_360",
    columns=cols,
    si_variantes=si_variantes,
    no_variantes=no_variantes,
    counts_before=counts_before,
    counts_after=counts_after,
    unknown_examples_top10=unknown_examples
)

In [None]:
# Iterate over each column and show its number of unique values and the values themselves
for col in cols:
    print(f"Columna: {col}")
    print("  Únicos:", df[col].nunique())
    print("  Valores:", df[col].unique())
    print("-" * 40)

Columna: retention_followup_ticket
  Únicos: 2
  Valores: <StringArray>
['si', 'no']
Length: 2, dtype: string
----------------------------------------
Columna: retention_ticket
  Únicos: 2
  Valores: <StringArray>
['si', 'no']
Length: 2, dtype: string
----------------------------------------
Columna: cancellation_ticket
  Únicos: 2
  Valores: <StringArray>
['si', 'no']
Length: 2, dtype: string
----------------------------------------
Columna: welcome_promo_flag
  Únicos: 2
  Valores: <StringArray>
['si', 'no']
Length: 2, dtype: string
----------------------------------------
Columna: joined_with_promo
  Únicos: 2
  Valores: <StringArray>
['si', 'no']
Length: 2, dtype: string
----------------------------------------


In [None]:
df[cols_to_text_clean].describe(include="string")

| | province | joined_with_promo | welcome_promo_flag | cancellation_ticket | retention_ticket | retention_followup_ticket | last_default_payment_method | device_model |
|---|---|---|---|---|---|---|---|---|
| count | 500000 | 500000 | 500000 | 500000 | 500000 | 500000 | 500000 | 500000 |
| unique | 23 | 2 | 2 | 2 | 2 | 2 | 4 | 18 |
| top | buenos_aires | si | si | si | si | si | credit_card | smartphone |
| freq | 189374 | 305331 | 301420 | 466873 | 471229 | 491336 | 261027 | 226049 |


### **3.3. Data Type Handling**

Data types are adjusted at this point because some variables still have a type that was not inferred correctly. The adjustment reduces memory usage, ensures that dates are stored as datetimes and, in general, gives each column the most appropriate type.

In [None]:
# Expected type per column (adjust to your own data)
dtype_map = {
    "cancel_date": "datetime64[ns]",
    "signup_date": "datetime64[ns]",
    "reactivation_date": "datetime64[ns]",
    "last_close_date":  "datetime64[ns]",

    # Conversion problem: counters and percentages that must become numeric
    "inapp_purchases_1m": "int64",
    "inapp_purchases_3m": "int64",
    "inapp_purchases_6m": "int64",
    "inapp_purchases_12m": "int64",
    "delinquency_email_alerts_1m": "int64",
    "delinquency_email_alerts_6m": "int64",
    "platform_incident_a": "int64",
    "platform_incident_b": "int64",
    "platform_incident_c": "int64",
    "account_incident_b": "int64",
    "account_incident_c": "int64",
    "account_incident_d": "int64",
    "service_interruptions_12m": "int64",
    "chatbot_interactions_6m": "int64",
    "chatbot_interactions_1m": "int64",
    "chatbot_interactions_3m": "int64",
    "chatbot_interactions_12m": "int64",
    "support_interactions_6m": "int64",
    "payment_declines_12m": "int64",
    "delinquency_email_alerts_6m_ineffective": "int64",
    "support_interactions_3m": "int64",
    "offline_support_1m": "int64",
    "offline_support_3m": "int64",
    "internal_interactions": "int64",
    "case_interactions": "int64",
    "info_requests": "int64",
    "billing_inquiries": "int64",
    "customer_satisfaction_tickets": "int64",
    "loyalty_tickets": "int64",
    "activation_discount_pct": "float64",

    # Many distinct categories
    "customer_id": "string",
    "department": "string",
    "locality": "string",
    "province": "string",

    # Several categories, but a controlled set
    "device_model": "category",
    "default_payment_method": "category",
    "last_default_payment_method": "category",
    "consumption_style": "category",

    # Boolean in meaning, but stored as si/no
    "churn": "category",
    "joined_with_promo": "category",
    "welcome_promo_flag": "category",
    "cancellation_ticket": "category",
    "retention_ticket": "category",
    "retention_followup_ticket": "category",
}
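
One detail of the `int64` entries in this map: a plain NumPy integer column cannot hold missing values, so the cast only succeeds when the column is complete. The sketch below uses toy data (the series `conteos` is hypothetical, not taken from the dataset) to show that behaviour next to pandas' nullable `Int64` dtype, which may be an option if a counter column ever arrives with gaps. Nothing in the notebook indicates that this was needed here.

```python
import numpy as np
import pandas as pd

# Toy counter column with one missing value (hypothetical data).
conteos = pd.Series([3.0, np.nan, 0.0], name="inapp_purchases_1m")

try:
    conteos.astype("int64")
    fallo_int64 = False
except ValueError:
    # NaN has no representation in a plain NumPy integer array.
    fallo_int64 = True

# The nullable extension dtype keeps integer values and stores the gap as <NA>.
conteos_nullable = conteos.astype("Int64")

print("int64 cast failed:", fallo_int64)
print(conteos_nullable.dtype, int(conteos_nullable.isna().sum()))
```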

In [None]:
# TYPE CONVERSION AND DETAILED LOG

conversion_log = {}

for c, t in dtype_map.items():
    if c in df.columns:
        orig = str(df[c].dtype)
        try:
            if "datetime" in str(t).lower():
                df[c] = pd.to_datetime(df[c], errors="raise")
            elif str(t).startswith("int"):
                df[c] = pd.to_numeric(df[c], errors="raise").astype(t)
            else:
                df[c] = df[c].astype(t, errors="raise")

            new = str(df[c].dtype)
            status = "converted" if new != orig else "unchanged"

            conversion_log[c] = {
                "original": orig,
                "nuevo": new,
                "estado": status
            }

        except Exception as e:
            conversion_log[c] = {
                "original": orig,
                "nuevo": None,
                "estado": f"failed: {str(e)}"
            }
    else:
        conversion_log[c] = {
            "original": None,
            "nuevo": None,
            "estado": "missing_column"
        }

ok = sum(1 for v in conversion_log.values() if v["estado"] == "converted")
unch = sum(1 for v in conversion_log.values() if v["estado"] == "unchanged")
fail = sum(1 for v in conversion_log.values() if v["estado"].startswith("failed"))
missing = sum(1 for v in conversion_log.values() if v["estado"] == "missing_column")

# LOG TO THE PIPELINE

logger.log_step(
    action="aplicar_mapa_dtypes",
    description="Aplicación controlada del mapa de tipos esperado por columna",
    input_df="customer_360_pre_typing",
    output_df="customer_360",
    dtype_map=dtype_map,
    conversion_log=conversion_log
)

logger.log_step(
    action="resumen_conversion_dtypes",
    description="Resumen del proceso de conversión de tipos",
    total_columns=len(dtype_map),
    converted=ok,
    unchanged=unch,
    failed=fail,
    missing_columns=missing
)

# LOG ERRORS ONLY

failed_columns = {
    c: v for c, v in conversion_log.items()
    if v["estado"].startswith("failed")
}

if failed_columns:
    logger.log_step(
        action="errores_conversion_dtypes",
        description="Columnas que no pudieron convertirse al tipo esperado",
        failed_columns=failed_columns
    )

# FINAL VALIDATION

df.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500000 entries, 0 to 499999
Data columns (total 76 columns):
 #   Column                                   Non-Null Count   Dtype         
---  ------                                   --------------   -----         
 0   customer_id                              500000 non-null  string        
 1   department                               500000 non-null  string        
 2   consumption_style                        500000 non-null  category      
 3   locality                                 500000 non-null  string        
 4   province                                 500000 non-null  string        
 5   monthly_fee_usd                          500000 non-null  float64       
 6   minutes_watched_rest_of_last_week        423420 non-null  float64       
 7   minutes_watched_week_2                   423085 non-null  float64       
 8   minutes_watched_week_3                   423525 non-null  float64       
 9   minutes_watched_week_4    

In [None]:
logger.data["transformations"][-1]

{'step': 19,
 'action': 'resumen_conversion_dtypes',
 'description': 'Resumen del proceso de conversión de tipos',
 'input_df': None,
 'output_df': None,
 'details': {'total_columns': 48,
  'converted': 46,
  'unchanged': 2,
  'failed': 0,
  'missing_columns': 0}}
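
To give a rough sense of the memory argument behind the `category` entries in the type map, the sketch below compares the same low-cardinality column stored as Python objects and as a categorical. The data are synthetic and the exact byte counts depend on the pandas version and platform, so only the direction of the difference should be read from it.

```python
import pandas as pd

# Hypothetical low-cardinality column, repeated to make the effect visible.
valores = pd.Series(["si", "no", "si", "si"] * 25_000, dtype="object")

# deep=True includes the size of the Python string objects themselves.
bytes_object = valores.memory_usage(deep=True)
bytes_category = valores.astype("category").memory_usage(deep=True)

print(f"object:   {bytes_object:,} bytes")
print(f"category: {bytes_category:,} bytes")
```

A categorical stores one small integer code per row plus a single copy of each distinct label, which is why it pays off for columns such as the si/no flags.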

## **4. Deduplication by Unique Key**

This step validates that each record in the table is unique according to the defined business key (`customer_id`). Duplicated rows are detected, the criteria for consolidating or removing them are documented, and the integrity of the unique key is restored.

In [None]:
# --- 1. Identify duplicates by unique key ---
clave_unica = "customer_id"

rows_before = len(df)
unique_before = int(df[clave_unica].nunique(dropna=False))

dup_mask = df.duplicated(subset=[clave_unica], keep=False)
duplicados = df[dup_mask]

n_dup_rows = int(duplicados.shape[0])
n_dup_keys = int(df.loc[dup_mask, clave_unica].nunique(dropna=False))

# (small sample for the log, to avoid storing 500k rows)
dup_keys_sample = (
    df.loc[dup_mask, clave_unica]
      .dropna()
      .astype("string")
      .value_counts()
      .head(10)
      .to_dict()
)

logger.log_step(
    action="deteccion_duplicados_por_id",
    description="Detección de duplicados por clave única",
    key=clave_unica,
    rows_before=rows_before,
    unique_keys_before=unique_before,
    duplicated_rows=n_dup_rows,
    duplicated_keys=n_dup_keys,
    duplicated_key_counts_top10=dup_keys_sample
)

print("\n🔹 Filas duplicadas según clave única:")
print(duplicados)

# --- 2. Drop duplicates, keeping the first occurrence ---
df_sin_duplicados = df.drop_duplicates(subset=[clave_unica], keep="first")

rows_after_drop = len(df_sin_duplicados)
unique_after_drop = int(df_sin_duplicados[clave_unica].nunique(dropna=False))
rows_removed = rows_before - rows_after_drop

logger.log_step(
    action="eliminacion_duplicados_id",
    description="Eliminación de duplicados conservando la primera ocurrencia",
    key=clave_unica,
    rows_before=rows_before,
    rows_after=rows_after_drop,
    rows_removed=rows_removed,
    unique_keys_before=unique_before,
    unique_keys_after=unique_after_drop
)

print("\n🔹 DataFrame sin duplicados (manteniendo el primero):")
print(df_sin_duplicados.head(20))

# --- 3. Create a duplicate indicator ---
df["es_duplicado"] = dup_mask  # reuse the mask (more efficient)

logger.log_step(
    action="indicador_marcar_duplicados",
    description="Creación de indicador booleano es_duplicado para trazabilidad",
    key=clave_unica,
    flagged_true=int(df["es_duplicado"].sum()),
    flagged_false=int((~df["es_duplicado"]).sum()),
    column_created="es_duplicado"
)

print("\n🔹 DataFrame con marca de duplicados:")
print(df.head(20))

# --- 4. FINAL CLEANUP OF THE DATAFRAME ---
rows_before_final = len(df)

df = (
    df.drop_duplicates(subset=[clave_unica], keep="first")
      .drop(columns=["es_duplicado"])
      .reset_index(drop=True)
)

rows_after_final = len(df)
unique_after_final = int(df[clave_unica].nunique(dropna=False))

logger.log_step(
    action="limpieza_final_duplicados",
    description="Limpieza final: drop_duplicates + drop columna auxiliar + reset_index",
    key=clave_unica,
    rows_before=rows_before_final,
    rows_after=rows_after_final,
    rows_removed=int(rows_before_final - rows_after_final),
    unique_keys_after=unique_after_final,
    dropped_columns=["es_duplicado"],
    index_reset=True
)

print("\n🔹 df limpio y final:")
df.head(20)


🔹 Filas duplicadas según clave única:
Empty DataFrame
Columns: [customer_id, department, consumption_style, locality, province, monthly_fee_usd, minutes_watched_rest_of_last_week, minutes_watched_week_2, minutes_watched_week_3, minutes_watched_week_4, minutes_watched_last_3d, minutes_watched_last_1d, minutes_watched_second_last_day, minutes_watched_third_last_day, minutes_watched_delta_rest_of_week, minutes_watched_delta_week_1_2, minutes_watched_delta_week_2_3, minutes_watched_delta_week_3_4, minutes_watched_delta_last_3d, minutes_watched_delta_last_1d, minutes_watched_delta_second_last_day, minutes_watched_delta_third_last_day, churn, joined_with_promo, welcome_promo_flag, cancellation_ticket, retention_ticket, retention_followup_ticket, last_default_payment_method, days_inactive, cancel_date, signup_date, reactivation_date, last_close_date, account_transfer_ticket, inapp_purchases_6m, inapp_purchases_1m, inapp_purchases_3m, inapp_purchases_12m, device_model, default_payment_method,

| | customer_id | department | consumption_style | locality | province | monthly_fee_usd | minutes_watched_rest_of_last_week | minutes_watched_week_2 | minutes_watched_week_3 | minutes_watched_week_4 | ... | offline_support_1m | offline_support_3m | internal_interactions | case_interactions | info_requests | billing_inquiries | support_interaction | other_interactions | customer_satisfaction_tickets | loyalty_tickets |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | SLX83969 | department_cat_153 | heavy-user | locality_cat_163 | misiones | 9.12 | | 224.101427 | | 210.093374 | ... | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 0 |
| 1 | UTT27594 | department_cat_1 | occasional | locality_cat_268 | tucuman | 10.15 | 196.921059 | 168.398843 | 203.379723 | 204.660259 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| 2 | HER11355 | department_cat_1 | occasional | locality_cat_102 | buenos_aires | 18.93 | 203.359698 | 201.94415 | 198.886979 | 200.936218 | ... | 0 | 0 | 0 | 0 | 1 | 1 | 1 | 0 | 0 | 0 |
| 3 | ZRQ19908 | department_cat_142 | weekend-viewer | locality_cat_29 | santiago_del_estero | 7.36 | | | 223.231335 | 189.815427 | ... | 0 | 0 | 0 | 0 | 1 | 0 | 19 | 1 | 0 | 0 |
| 4 | EAT08840 | department_cat_142 | occasional | locality_cat_90 | cordoba | 7.56 | 189.650217 | 192.619353 | 195.423085 | 214.500631 | ... | 0 | 0 | 1 | 0 | 0 | 1 | 1 | 0 | 0 | 0 |
| 5 | KYJ36841 | department_cat_1 | occasional | locality_cat_1 | santiago_del_estero | 8.31 | -157.891458 | -10000.0 | 210.631472 | 214.894647 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| 6 | FWX15967 | department_cat_120 | occasional | locality_cat_335 | santa_fe | 8.07 | 207.89472 | 197.678903 | 226.220603 | -10000.0 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| 7 | YFL75558 | department_cat_131 | weekend-viewer | locality_cat_630 | buenos_aires | 9.77 | | -148.645217 | | 228.726474 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| 8 | SYQ06673 | department_cat_1 | occasional | locality_cat_658 | cordoba | 11.06 | 190.262435 | 209.899681 | -235.996727 | 199.818424 | ... | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 0 | 0 | 0 |
| 9 | ULR53254 | department_cat_112 | weekend-viewer | locality_cat_445 | cordoba | 11.55 | 224.765692 | 204.803722 | 202.416554 | 206.741463 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 0 |
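
The run above found no repeated `customer_id`, so the choice of `keep="first"` had no effect on this dataset. When duplicates do exist, the consolidation criterion matters. The following sketch, on toy data, contrasts the detection mask with one possible alternative criterion: keeping the most recent row per key. The `updated_at` column is hypothetical and does not exist in the notebook's table.

```python
import pandas as pd

# Toy table with repeated keys; 'updated_at' is a hypothetical column.
clientes = pd.DataFrame({
    "customer_id": ["A1", "A1", "B2", "C3", "C3"],
    "updated_at": pd.to_datetime(
        ["2025-01-05", "2025-03-01", "2025-02-10", "2025-04-02", "2025-01-20"]
    ),
    "monthly_fee_usd": [9.0, 9.5, 7.4, 11.0, 10.0],
})

# keep=False flags every row whose key is repeated, not only the later copies.
mascara = clientes.duplicated(subset=["customer_id"], keep=False)

# Alternative criterion: sort by date, then keep the last (most recent) row per key.
mas_reciente = (
    clientes.sort_values("updated_at")
            .drop_duplicates(subset=["customer_id"], keep="last")
            .sort_values("customer_id")
            .reset_index(drop=True)
)

print(int(mascara.sum()))
print(mas_reciente)
```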


## **5. Date Transformations**

Derived fields are generated, such as the difference in days between relevant dates and the data collection (cutoff) date, to make time-based metrics easier to compute.

In [None]:
fecha_corte = pd.Timestamp("2025-11-23")
print("📅 Fecha de corte:", fecha_corte)

📅 Fecha de corte: 2025-11-23 00:00:00


In [None]:
df["end_date_effective"] = df["cancel_date"].fillna(fecha_corte)

df["days_tenure"] = (
    df["end_date_effective"] - df["signup_date"]
).dt.days
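
A small worked example may help to check the tenure logic by hand. It repeats the two statements above on toy rows (the frame `toy` is hypothetical): a cancelled customer is measured up to the cancellation date, and an active one up to the cutoff date.

```python
import pandas as pd

fecha_corte = pd.Timestamp("2025-11-23")

# Toy rows: one cancelled customer and one still active (cancel_date is NaT).
toy = pd.DataFrame({
    "signup_date": pd.to_datetime(["2025-01-01", "2025-11-01"]),
    "cancel_date": pd.to_datetime(["2025-01-31", None]),
})

# Active customers have no cancel_date, so the cutoff date closes their interval.
toy["end_date_effective"] = toy["cancel_date"].fillna(fecha_corte)
toy["days_tenure"] = (toy["end_date_effective"] - toy["signup_date"]).dt.days

print(toy["days_tenure"].tolist())  # [30, 22]
```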

In [None]:
df["days_since_reactivation"] = (
    fecha_corte - df["reactivation_date"]
).dt.days

In [None]:
df["days_inactive_before_reactivation"] = (
    df["reactivation_date"] - df["last_close_date"]
).dt.days

In [None]:
inconsistentes = df[
    df["reactivation_date"].notna() & df["last_close_date"].isna()
]

print("⚠️ Reactivados sin last_close_date:", len(inconsistentes))

⚠️ Reactivados sin last_close_date: 55


In [None]:
negativos = df[
    (df["days_inactive_before_reactivation"] < 0)
]

df.loc[df["days_inactive_before_reactivation"] < 0,
       "days_inactive_before_reactivation"] = pd.NA
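
The same rule can be written with `Series.mask`, counting the affected rows before they are blanked so that the number can be reported later. This is a sketch on toy dates, not the notebook's data; `n_negativos` and `toy` are hypothetical names.

```python
import pandas as pd

# Toy rows: a valid gap, a reactivation dated before the close, and a missing close date.
toy = pd.DataFrame({
    "last_close_date": pd.to_datetime(["2025-03-01", "2025-06-10", None]),
    "reactivation_date": pd.to_datetime(["2025-03-15", "2025-06-01", "2025-07-01"]),
})

dias = (toy["reactivation_date"] - toy["last_close_date"]).dt.days

# Count before masking so the number of corrected rows is not lost.
n_negativos = int((dias < 0).sum())

# mask() blanks the flagged entries; rows with a missing input date stay missing.
toy["days_inactive_before_reactivation"] = dias.mask(dias < 0)

print(n_negativos)
print(toy["days_inactive_before_reactivation"].tolist())
```

Comparisons against a missing value evaluate to False, so rows without `last_close_date` are not counted as negative.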


In [None]:
logger.log_step(
    action="definir_relacion_cercana_reactivacion",
    description=(
        "Se establece la relación semántica entre last_close_date y reactivation_date: "
        "last_close_date representa el último cierre previo a la reactivación"
    )
)

In [None]:
logger.log_step(
    action="verificar_fechas_reactivacion",
    description="Validación de consistencia entre last_close_date y reactivation_date",
    total_rows=len(df),
    reactivated=int(df["reactivation_date"].notna().sum()),
    inconsistent_reactivations=int(len(inconsistentes)),
    negative_durations_fixed=True
)




## **6. Export the Clean Cancellations Dataset**

The final set of customer records, already typed and cleaned according to the data model, is exported in Parquet format to optimize storage, preserve column types and ease downstream ingestion. Note that the export cell below writes every row of `df` (500000 records) and applies no additional filter on cancellation status. The export omits the index and logs the path, engine, compression, file size and row count for traceability.

In [None]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500000 entries, 0 to 499999
Data columns (total 80 columns):
 #   Column                                   Non-Null Count   Dtype         
---  ------                                   --------------   -----         
 0   customer_id                              500000 non-null  string        
 1   department                               500000 non-null  string        
 2   consumption_style                        500000 non-null  category      
 3   locality                                 500000 non-null  string        
 4   province                                 500000 non-null  string        
 5   monthly_fee_usd                          500000 non-null  float64       
 6   minutes_watched_rest_of_last_week        423420 non-null  float64       
 7   minutes_watched_week_2                   423085 non-null  float64       
 8   minutes_watched_week_3                   423525 non-null  float64       
 9   minutes_watched_week_4    

In [None]:
output_path = Path("dataset_limpio.parquet")

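# compression is stated explicitly so that it matches the value recorded in the log
df.to_parquet(output_path, engine="pyarrow", compression="snappy", index=False)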

rows = len(df)
unique_customers = df["customer_id"].nunique(dropna=False)
file_size_mb = round(output_path.stat().st_size / (1024 * 1024), 2)

logger.log_step(
  action="exportar_dataset_a_parquet",
  description="Exportación del dataset de clientes cancelados en formato Parquet",
  output_type="parquet",
  path=str(output_path),
  engine="pyarrow",
  compression="snappy",
  rows_exported=rows,
  unique_customers_exported=unique_customers,
  file_size_mb=file_size_mb,
  index_included=False,
  )

print(f"✔️ Exportación completada: {rows} registros a {output_path}")

✔️ Exportación completada: 500000 registros a dataset_limpio.parquet
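
The section text speaks of selecting customers with a cancelled status, while the cell above exports all 500000 rows. If a row filter were intended before the export, it could look like the sketch below. Treating `churn == "si"` as the marker of a cancelled customer is an assumption, since the notebook does not state how cancellation is encoded, and the toy frame is hypothetical.

```python
import pandas as pd

# Toy frame; reading churn == "si" as "cancelled" is an assumption.
toy = pd.DataFrame({
    "customer_id": ["A1", "B2", "C3"],
    "churn": pd.Categorical(["si", "no", "si"]),
})

# Keep only the rows flagged as cancelled and renumber the index.
cancelados = toy.loc[toy["churn"] == "si"].reset_index(drop=True)

print(len(cancelados), cancelados["customer_id"].tolist())
```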


## **7. Schema and Metadata Documentation**

Each transformation, cleaning step and load is documented in a structured way and written to a plain-text file, here both JSON and YAML, in line with common metadata practices. The file serves as a governance and traceability artifact for the pipeline.

In [None]:
# Save as JSON
logger.save("metadatos_pipeline.json", fmt="json")

# Save as YAML
logger.save("metadatos_pipeline.yaml", fmt="yaml")

✅ Pipeline guardado en metadatos_pipeline.json
✅ Pipeline guardado en metadatos_pipeline.yaml
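
Once saved, the metadata can be read back to audit a run. The sketch below does not read the real file. It writes a small stand-in to a temporary directory and assumes that the saved JSON keeps a top-level `transformations` list whose entries carry `step`, `action` and `details`, as the `logger.data["transformations"][-1]` output shown earlier suggests. The actual layout produced by `logger.save` may differ.

```python
import json
import tempfile
from pathlib import Path

# Stand-in for the saved metadata; the structure is an assumption (see lead-in).
metadatos = {
    "transformations": [
        {"step": 1, "action": "normalizacion_binaria", "details": {}},
        {"step": 2, "action": "aplicar_mapa_dtypes", "details": {}},
        {"step": 3, "action": "resumen_conversion_dtypes", "details": {"failed": 0}},
    ]
}

with tempfile.TemporaryDirectory() as tmp:
    ruta = Path(tmp) / "metadatos_demo.json"
    ruta.write_text(json.dumps(metadatos, ensure_ascii=False), encoding="utf-8")
    cargado = json.loads(ruta.read_text(encoding="utf-8"))

# List the recorded actions in order and flag any step that reports failures.
acciones = [t["action"] for t in cargado["transformations"]]
pasos_con_fallos = [
    t["step"] for t in cargado["transformations"]
    if t["details"].get("failed", 0) > 0
]

print(acciones)
print(pasos_con_fallos)
```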
