In [1]:
# This script generates a Jupyter notebook for data ingestion and preprocessing
# tailored to the "primera etapa" of the Assessment MLE. The script itself does
# not read the CSV; schema validation and a preview of the data happen when the
# generated notebook is run.

import json
import os
from datetime import datetime, timezone

import pandas as pd

# Try to import nbformat to build a notebook programmatically
import nbformat
from nbformat.v4 import new_notebook, new_markdown_cell, new_code_cell

# Paths: everything is derived from a single base directory so relocating the
# project only requires changing `base_dir` (or setting MLE_BASE_DIR); the CSV
# and output locations no longer repeat the "/mnt/data" literal independently.
base_dir = os.environ.get("MLE_BASE_DIR", "/mnt/data")
pipeline_dir = os.path.join(base_dir, "01_pipeline_ml")
notebook_path = os.path.join(pipeline_dir, "ingesta_preprocesamiento.ipynb")
os.makedirs(os.path.dirname(notebook_path), exist_ok=True)

# Parameters embedded into the generated notebook's configuration cell
csv_path = os.path.join(base_dir, "clientes.csv")
parquet_output_dir = os.path.join(pipeline_dir, "outputs")
os.makedirs(parquet_output_dir, exist_ok=True)

# `cells` is the ordered list of cells of the generated notebook; every later
# section appends to it, so append order == reading order in the notebook.
# 1) Title & overview: the stage's goal plus a table of contents.
_title_md = """
# Ingesta y Preprocesamiento (Primera Etapa)

**Objetivo:** Cargar `clientes.csv`, validar esquema, limpiar y estandarizar tipos, crear *features* iniciales y preparar un dataset **curado** listo para *feature engineering* y *modelado* en etapas posteriores.

**Contenido:**
1. Configuración y dependencias
2. Ingesta del CSV y validación de esquema
3. Limpieza: tipos, valores faltantes y reglas de negocio
4. *EDA* breve (sanity checks)
5. *Features* iniciales (*recencia*, *tenure*, *frequency* proxy)
6. Preprocesamiento con `sklearn` (`ColumnTransformer` + `Pipeline`)
7. Exportación de artefactos
"""
cells = [new_markdown_cell(_title_md)]

# 2) Configuration cell of the generated notebook: every import it needs plus
#    the tunable constants (paths, seed).
#    - Paths are embedded with repr() (`!r`) so quotes or backslashes (e.g.
#      Windows paths) still yield a valid Python string literal.
#    - The [INFO] prints reference the notebook's own DATA_PATH / OUTPUT_DIR
#      (braces doubled to survive .format), so they stay correct if a reader
#      edits those constants inside the notebook.
_config_code = """
# %% [code]
# === 1. Configuración y dependencias ===
import os

import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
import joblib

DATA_PATH = {data_path!r}
OUTPUT_DIR = {output_dir!r}
os.makedirs(OUTPUT_DIR, exist_ok=True)

RANDOM_STATE = 42

print(f"[INFO] DATA_PATH: {{DATA_PATH}}")
print(f"[INFO] OUTPUT_DIR: {{OUTPUT_DIR}}")
""".format(data_path=csv_path, output_dir=parquet_output_dir)
cells.append(new_code_cell(_config_code))

# 3) Ingestion: read the raw CSV and check it against the expected schema.
#    Every expected column is used by later cells, so a missing one now raises
#    a clear ValueError here instead of a KeyError several cells later.
#    Unexpected extra columns are only reported. Sets are sorted so the
#    messages are deterministic across runs.
cells.append(new_markdown_cell("## 2. Ingesta del CSV y validación de esquema"))
cells.append(new_code_cell("""
# %% [code]
expected_cols = [
    "customer_id", "age", "gender", "signup_date", "last_purchase_date",
    "total_purchases", "avg_purchase_value", "is_active", "churned"
]

df_raw = pd.read_csv(DATA_PATH)

print("[INFO] Columnas encontradas:", list(df_raw.columns))
missing = sorted(set(expected_cols) - set(df_raw.columns))
extra = sorted(set(df_raw.columns) - set(expected_cols))
if extra:
    print(f"[WARN] Columnas adicionales: {extra}")
if missing:
    # All expected columns are required downstream: fail fast.
    raise ValueError(f"Faltan columnas obligatorias: {missing}")

df_raw.head(3)
"""))

# 4) Cleaning: coerce dates/numerics, then apply simple business rules.
#    Invalid values are replaced with missing markers (pd.NA / pd.NaT) via
#    .loc assignment rather than dropping rows, so the row count is unchanged.
_clean_md = "## 3. Limpieza: tipos, valores faltantes y reglas de negocio"
_clean_code = """
# %% [code]
# Tipos de datos: aseguramos fechas y numéricos
df = df_raw.copy()

# Fechas
for col in ["signup_date", "last_purchase_date"]:
    df[col] = pd.to_datetime(df[col], errors="coerce")

# Numéricos
numeric_int = ["age", "total_purchases", "is_active", "churned"]
for col in numeric_int:
    df[col] = pd.to_numeric(df[col], errors="coerce").astype("Int64")

df["avg_purchase_value"] = pd.to_numeric(df["avg_purchase_value"], errors="coerce")

# Valores imposibles / reglas de negocio simples
# - Edad en [18, 100] (dejamos margen por datos sucios)
df.loc[~df["age"].between(18, 100), "age"] = pd.NA

# - total_purchases no negativo
df.loc[df["total_purchases"].fillna(-1) < 0, "total_purchases"] = pd.NA

# - avg_purchase_value positivo
df.loc[df["avg_purchase_value"].fillna(-1) <= 0, "avg_purchase_value"] = pd.NA

# - last_purchase_date no debe ser antes que signup_date (si lo es, marcamos como NaT)
mask_bad_dates = (df["last_purchase_date"].notna() & df["signup_date"].notna() &
                  (df["last_purchase_date"] < df["signup_date"]))
df.loc[mask_bad_dates, "last_purchase_date"] = pd.NaT

df.head(3)
"""
cells.extend([new_markdown_cell(_clean_md), new_code_cell(_clean_code)])

# 5) Quick sanity checks: shape, % of nulls per column, target balance and the
#    observed date ranges. A raw string is used so the "\n" escapes reach the
#    generated code unchanged.
_eda_code = r"""
# %% [code]
print("[INFO] Dimensiones:", df.shape)
print("\n[INFO] Porcentaje de nulos por columna:")
print((df.isna().mean() * 100).round(2).sort_values(ascending=False))

print("\n[INFO] Balance de 'churned':")
print(df["churned"].value_counts(dropna=False, normalize=True).round(3))

print("\n[INFO] Rango de fechas:")
for c in ["signup_date", "last_purchase_date"]:
    print(c, "->", df[c].min(), "—", df[c].max())
"""
cells += [new_markdown_cell("## 4. EDA breve (sanity checks)"), new_code_cell(_eda_code)]

# 6) Initial features relative to a reference date (the latest
#    last_purchase_date): tenure, recency and a purchases-per-month proxy.
#    Non-finite frequencies (e.g. from a zero tenure) are reset to missing.
_features_md = "## 5. *Features* iniciales"
_features_code = """
# %% [code]
# Elegimos una 'fecha de referencia' como el máximo 'last_purchase_date' disponible
# (alternativamente, usar 'hoy' si el negocio lo requiere)
ref_date = pd.to_datetime(df["last_purchase_date"].max())

df["tenure_days"] = (ref_date - df["signup_date"]).dt.days
df["recency_days"] = (ref_date - df["last_purchase_date"]).dt.days

# Frecuencia proxy: total_purchases / (tenure en meses aprox.)
df["tenure_months"] = df["tenure_days"] / 30.0
df["frequency_pm"] = df["total_purchases"] / df["tenure_months"]
df.loc[~np.isfinite(df["frequency_pm"]), "frequency_pm"] = np.nan  # protege divisiones por 0

df[["customer_id","tenure_days","recency_days","frequency_pm"]].head(3)
"""
for _cell in (new_markdown_cell(_features_md), new_code_cell(_features_code)):
    cells.append(_cell)

# 7) Preprocessing with sklearn, plus the train/validation split.
#    Ordering matters:
#    - Rows with a missing target are dropped first. They cannot be used for
#      supervised training, and pd.NA in `y` breaks the stratified split.
#    - Nullable numeric columns (Int64/Float64 produced by the cleaning and
#      feature cells) are cast to float64 so sklearn receives NaN, not pd.NA.
#    - The split happens BEFORE the ColumnTransformer is fitted, and the fit
#      uses the training partition only. Fitting on the full X leaked
#      validation statistics (medians, means, modes) into the exported
#      preprocessor.
cells.append(new_markdown_cell("## 6. Preprocesamiento con `sklearn`"))
cells.append(new_code_cell("""
# %% [code]
# Definimos variables
target_col = "churned"
id_cols = ["customer_id"]
date_cols = ["signup_date", "last_purchase_date"]
num_cols = [
    "age", "total_purchases", "avg_purchase_value",
    "is_active", "tenure_days", "recency_days", "frequency_pm"
]
cat_cols = ["gender"]

# Modeling frame: IDs and raw dates are kept for traceability; X excludes them.
model_df = df[id_cols + date_cols + num_cols + cat_cols + [target_col]].copy()

# Rows without a target cannot be used for training or stratification.
n_missing_target = int(model_df[target_col].isna().sum())
if n_missing_target:
    print(f"[WARN] Se descartan {n_missing_target} filas sin '{target_col}'")
model_df = model_df[model_df[target_col].notna()]

# X/y: nullable numerics -> float64 so sklearn sees NaN instead of pd.NA.
X = model_df[num_cols + cat_cols].astype({c: "float64" for c in num_cols})
y = model_df[target_col].astype("int64")

X.head(3)
"""))

cells.append(new_markdown_cell("### Train/Validation Split"))
cells.append(new_code_cell("""
# %% [code]
# Stratified split: both partitions keep the class proportions of `y`.
X_train, X_valid, y_train, y_valid = train_test_split(
    X, y, test_size=0.2, random_state=RANDOM_STATE, stratify=y
)
print("[INFO] X_train:", X_train.shape, "| X_valid:", X_valid.shape)
"""))

cells.append(new_markdown_cell("### Ajuste del preprocesador (solo con train)"))
cells.append(new_code_cell("""
# %% [code]
# Transformadores
num_transformer = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="median")),
    ("scaler", StandardScaler())
])

cat_transformer = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("onehot", OneHotEncoder(handle_unknown="ignore"))
])

preprocessor = ColumnTransformer(
    transformers=[
        ("num", num_transformer, num_cols),
        ("cat", cat_transformer, cat_cols)
    ]
)

# Fit on the training partition only; validation is transformed with the
# statistics learned from train, so nothing leaks from the validation set.
X_train_pre = preprocessor.fit_transform(X_train)
X_valid_pre = preprocessor.transform(X_valid)

print("[INFO] Forma de X_train antes:", X_train.shape)
print("[INFO] Forma de X_train después del preprocesamiento:", X_train_pre.shape)
print("[INFO] Forma de X_valid después del preprocesamiento:", X_valid_pre.shape)
"""))

# 9) Artifact export: the curated frame and the preprocessor go to OUTPUT_DIR.
#    The train/valid partitions are written there too (as parquet) for later
#    stages.
_export_md = "## 7. Exportación de artefactos"
_export_code = """
# %% [code]
# Guardamos dataset curado (parquet) y el preprocesador
curated_path = os.path.join(OUTPUT_DIR, "clientes_curado.parquet")
preproc_path = os.path.join(OUTPUT_DIR, "preprocessor.joblib")

df.to_parquet(curated_path, index=False)
joblib.dump(preprocessor, preproc_path)

print(f"[OK] Curado guardado en: {curated_path}")
print(f"[OK] Preprocessor guardado en: {preproc_path}")

# (Opcional) Exportación de particiones de train/valid para etapas siguientes
X_train_path = os.path.join(OUTPUT_DIR, "X_train.parquet")
X_valid_path = os.path.join(OUTPUT_DIR, "X_valid.parquet")
y_train_path = os.path.join(OUTPUT_DIR, "y_train.parquet")
y_valid_path = os.path.join(OUTPUT_DIR, "y_valid.parquet")

X_train.to_parquet(X_train_path, index=False)
X_valid.to_parquet(X_valid_path, index=False)
pd.DataFrame({"churned": y_train}).to_parquet(y_train_path, index=False)
pd.DataFrame({"churned": y_valid}).to_parquet(y_valid_path, index=False)

print("[OK] Particiones de train/valid exportadas.")
"""
cells.extend((new_markdown_cell(_export_md), new_code_cell(_export_code)))

# 10) Closing note on how these artifacts map to GCP (GCS + Vertex AI).
#     Lines are listed explicitly so the trailing double spaces (Markdown hard
#     line breaks) are visible in the source instead of being invisible
#     whitespace inside a triple-quoted block.
_gcp_note_lines = [
    "",
    "> **Nota (GCP):**  ",
    "> - Para esta primera etapa trabajamos localmente.  ",
    "> - En producción, estos artefactos se escriben a **GCS** (`gs://...`) y el *preprocessor* pasa al *model training job* en Vertex AI.  ",
    "> - Este notebook puede convertirse en un *Python component* dentro de un *Vertex Pipeline* o bien en un *Matillion Python Script* si orquestas fuera de Vertex.",
]
cells.append(new_markdown_cell("\n".join(_gcp_note_lines) + "\n"))

# Assemble the notebook and write it to `notebook_path` as UTF-8 JSON.
# The creation timestamp comes from a timezone-aware UTC datetime, because
# datetime.utcnow() is deprecated since Python 3.12. The "+00:00" offset is
# rewritten to "Z" to keep the original metadata format.
created_utc = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

nb = new_notebook(cells=cells, metadata={
    "kernelspec": {"name": "python3", "display_name": "Python 3"},
    "language_info": {"name": "python", "version": "3.x"},
    "authors": [{"name": "Assessment MLE - Ingesta/Preprocesamiento"}],
    "created": created_utc
})

with open(notebook_path, "w", encoding="utf-8") as f:
    nbformat.write(nb, f)

# Last expression: shows where the notebook was written.
notebook_path


ModuleNotFoundError: No module named 'nbformat'