# üéØ Pipeline Optimizado - Datathon FME

## Objetivo
Predecir **iap_revenue_d7** (revenue de usuarios que vieron un ad) usando un modelo two-stage:
1. **Clasificador**: Predice si el usuario comprar√° (buyer_d7)
2. **Regresor**: Predice el revenue solo para compradores

## üìã Instrucciones de Uso

### Ejecutar en orden:
1. **Celda 3**: Cargar datos de train con Dask
2. **Celda 5**: Procesar train (feature engineering + object parsing) ‚Üí Guardar en disco
3. **Celda 7**: Preprocessing avanzado (binning, interacciones, encoding)
4. **Celda 9**: Entrenar modelo two-stage con LightGBM
5. **Celda 11**: Procesar test set
6. **Celda 12**: Generar predicciones y submission.csv

## üîß Preprocessing Incluido

### Fase 1: Feature Engineering B√°sico
- Ratios de actividad (days_active_ratio, sessions_per_week)
- Flags de comportamiento (is_new_user, is_veteran_user, recent_buyer)
- M√©tricas de engagement (total_time_in_app)

### Fase 2: Extracci√≥n de Object Columns
- Stats de arrays/dicts: count, sum, max, min, mean, std
- Top 7 columnas prioritarias relacionadas con revenue

### Fase 3: Preprocessing Avanzado
- **Binning**: Discretizaci√≥n de variables continuas (user_age, sessions, session_time)
- **Interacciones**: Features combinadas (engagement_score, avg_revenue_per_purchase)
- **Target Encoding**: Encoding con target para categ√≥ricas de baja cardinalidad
- **Frequency Encoding**: Encoding por frecuencia para top 5 categ√≥ricas

## üíæ Optimizaci√≥n de Memoria
- Procesamiento **lazy** con Dask (no carga todo en RAM)
- Muestreo inteligente (50% por defecto)
- Persistencia en disco (Parquet comprimido)
- RAM necesaria: **~4-8GB** (vs 26GB+ anterior)

## ‚öôÔ∏è Ajustes
- Cambiar rango de fechas: Modificar `filters` en celda 3
- Ajustar muestra: Cambiar `sample_fraction` en celda 7
- M√°s columnas object: Editar `top_priority_cols` en celda 5
- Modelos guardados en: `./models/`
- Submission guardado en: `./outputs/submission.csv`

# üìÇ PASO 1: Cargar Datos de Train

In [36]:
#!pip install dask
#!pip install pyarrow
!pip install pyarrow --upgrade

Defaulting to user installation because normal site-packages is not writeable


In [37]:
import dask
import dask.dataframe as dd

dask.config.set({"dataframe.convert-string": False})

dataset_path = "./smadex-challenge-predict-the-revenue/train/train"
filters = [("datetime", ">=", "2025-10-06-00-00"), ("datetime", "<", "2025-10-20-00-00")]

ddf = dd.read_parquet(
    dataset_path,
    filters=filters
)

ddf = ddf.drop(columns=['buy_d14', 'buy_d28', 'buy_d7', 'buyer_d1', 'buyer_d14', 'buyer_d28', 
                        'iap_revenue_d28', 'iap_revenue_d14', 
                        'registration', 'retention_d1', 'retention_d1_to_d7', 'retention_d3',
                        'retention_d3_to_d7', 'retention_d7_to_d14', 'retentiond7'])

In [38]:
ddf

Unnamed: 0_level_0,buyer_d7,iap_revenue_d7,advertiser_bundle,advertiser_category,advertiser_subcategory,advertiser_bottom_taxonomy_level,carrier,country,region,dev_make,dev_model,dev_os,dev_osv,hour,release_date,release_msrp,weekday,avg_act_days,avg_daily_sessions,avg_days_ins,avg_duration,bcat,bcat_bottom_taxonomy,bundles_cat,bundles_cat_bottom_taxonomy,bundles_ins,city_hist,country_hist,cpm,cpm_pct_rk,ctr,ctr_pct_rk,dev_language_hist,dev_osv_hist,first_request_ts,first_request_ts_bundle,first_request_ts_category_bottom_taxonomy,hour_ratio,iap_revenue_usd_bundle,iap_revenue_usd_category,iap_revenue_usd_category_bottom_taxonomy,last_buy,last_buy_ts_bundle,last_buy_ts_category,last_ins,last_install_ts_bundle,last_install_ts_category,advertiser_actions_action_count,advertiser_actions_action_last_timestamp,user_actions_bundles_action_count,user_actions_bundles_action_last_timestamp,last_advertiser_action,new_bundles,num_buys_bundle,num_buys_category,num_buys_category_bottom_taxonomy,region_hist,rev_by_adv,rwd_prank,user_bundles,user_bundles_l28d,weekend_ratio,weeks_since_first_seen,wifi_ratio,whale_users_bundle_num_buys_prank,whale_users_bundle_revenue_prank,whale_users_bundle_total_num_buys,whale_users_bundle_total_revenue,row_id,datetime
npartitions=24,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1,Unnamed: 27_level_1,Unnamed: 28_level_1,Unnamed: 29_level_1,Unnamed: 30_level_1,Unnamed: 31_level_1,Unnamed: 32_level_1,Unnamed: 33_level_1,Unnamed: 34_level_1,Unnamed: 35_level_1,Unnamed: 36_level_1,Unnamed: 37_level_1,Unnamed: 38_level_1,Unnamed: 39_level_1,Unnamed: 40_level_1,Unnamed: 41_level_1,Unnamed: 42_level_1,Unnamed: 43_level_1,Unnamed: 44_level_1,Unnamed: 45_level_1,Unnamed: 46_level_1,Unnamed: 47_level_1,Unnamed: 48_level_1,Unnamed: 49_level_1,Unnamed: 50_level_1,Unnamed: 51_level_1,Unnamed: 52_level_1,Unnamed: 53_level_1,Unnamed: 54_level_1,Unnamed: 55_level_1,Unnamed: 56_level_1,Unnamed: 57_level_1,Unnamed: 58_level_1,Unnamed: 59_level_1,Unnamed: 60_level_1,Unnamed: 61_level_1,Unnamed: 62_level_1,Unnamed: 63_level_1,Unnamed: 64_level_1,Unnamed: 65_level_1,Unnamed: 66_level_1,Unnamed: 67_level_1,Unnamed: 68_level_1,Unnamed: 69_level_1,Unnamed: 70_level_1
,int32,float64,object,object,object,object,object,object,object,object,object,object,object,object,object,int64,int32,float64,object,float64,object,object,object,object,object,object,object,object,object,object,object,object,object,object,int64,object,object,object,object,object,object,int64,object,object,int64,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,float64,int32,float64,object,object,object,object,object,category[known]
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


# üîß PASO 2: Procesar Train y Guardar en Disco

Este paso:
- Aplica feature engineering (ratios, flags)
- Extrae features de columnas object (top 5 prioritarias)
- Guarda resultado en `./processed_train_data/`
- **NO carga todo en RAM** (procesamiento lazy)

In [39]:
# ============================================
# VERSI√ìN DASK - PREPROCESSING SIN COMPUTE
# ============================================
print("=" * 80)
print("PROCESAMIENTO CON DASK (SIN CARGAR TODO EN RAM)")
print("=" * 80)

import dask.dataframe as dd
import numpy as np
import pyarrow as pa

# IMPORTANTE: NO hacer .compute() hasta el final

# ============================================
# PASO 1: FEATURE ENGINEERING EN DASK (LAZY)
# ============================================
print("\n[1/6] Feature engineering b√°sico (lazy)...")

# 1.1 Features de ratio y proporciones
if 'avg_act_days' in ddf.columns and 'weeks_since_first_seen' in ddf.columns:
    ddf['days_active_ratio'] = ddf['avg_act_days'] / (ddf['weeks_since_first_seen'] * 7 + 1e-10)

if 'weekend_ratio' in ddf.columns:
    ddf['is_weekend_user'] = (ddf['weekend_ratio'] > 0.5).astype(int)

if 'wifi_ratio' in ddf.columns:
    ddf['is_wifi_user'] = (ddf['wifi_ratio'] > 0.7).astype(int)

# 1.2 Features de engagement temporal
if 'weeks_since_first_seen' in ddf.columns:
    ddf['user_age_days'] = ddf['weeks_since_first_seen'] * 7
    ddf['is_new_user'] = (ddf['weeks_since_first_seen'] < 1).astype(int)
    ddf['is_veteran_user'] = (ddf['weeks_since_first_seen'] > 12).astype(int)

# 1.3 Features de compra hist√≥rica
if 'last_buy' in ddf.columns:
    ddf['has_previous_purchase'] = (ddf['last_buy'] > 0).astype(int)
    ddf['days_since_last_buy'] = ddf['last_buy']
    ddf['recent_buyer'] = (ddf['last_buy'] < 7).astype(int)

# 1.4 Features de sesiones
if 'total_sessions' in ddf.columns and 'weeks_since_first_seen' in ddf.columns:
    ddf['avg_sessions_per_week'] = ddf['total_sessions'] / (ddf['weeks_since_first_seen'] + 1)

if 'avg_session_time' in ddf.columns and 'total_sessions' in ddf.columns:
    ddf['total_time_in_app'] = ddf['avg_session_time'] * ddf['total_sessions']

print("‚úì 12+ features de engagement a√±adidas (lazy)")
print(f"‚úì Particiones actuales: {ddf.npartitions}")

# ============================================
# PASO 2: PROCESAR COLUMNAS OBJECT (STATS AVANZADAS)
# ============================================
print("\n[2/6] Procesando columnas object...")

def extract_array_stats_advanced(series, prefix):
    """
    Extrae stats completas de arrays/dicts con informaci√≥n m√°s rica
    """
    import ast
    import numpy as np
    import pandas as pd
    import pyarrow as pa
    
    def safe_parse(x):
        try:
            if x is None or (isinstance(x, float) and np.isnan(x)):
                return []
        except (TypeError, ValueError):
            pass
        
        try:
            parsed = ast.literal_eval(str(x))
            if isinstance(parsed, (list, dict)):
                if isinstance(parsed, dict):
                    return list(parsed.values())
                return parsed
        except:
            pass
        return []
    
    def get_numeric_values(arr):
        nums = []
        for val in arr:
            try:
                if val is not None and not (isinstance(val, float) and np.isnan(val)):
                    nums.append(float(val))
            except:
                pass
        return nums
    
    parsed = series.apply(safe_parse, meta=('x', 'object'))
    
    # Stats b√°sicas
    count = parsed.apply(len, meta=('x', 'int64'))
    numeric_vals = parsed.apply(get_numeric_values, meta=('x', 'object'))
    
    # Stats agregadas
    def safe_sum(x): return sum(x) if x else 0
    def safe_max(x): return max(x) if x else 0
    def safe_min(x): return min(x) if x else 0
    def safe_mean(x): return np.mean(x) if x else 0
    def safe_std(x): return np.std(x) if len(x) > 1 else 0
    
    total_sum = numeric_vals.apply(safe_sum, meta=('x', 'float64'))
    total_max = numeric_vals.apply(safe_max, meta=('x', 'float64'))
    total_min = numeric_vals.apply(safe_min, meta=('x', 'float64'))
    total_mean = numeric_vals.apply(safe_mean, meta=('x', 'float64'))
    total_std = numeric_vals.apply(safe_std, meta=('x', 'float64'))
    
    return count, total_sum, total_max, total_min, total_mean, total_std

# Columnas prioritarias (revenue-related)
top_priority_cols = [
    'iap_revenue_usd_bundle',
    'num_buys_bundle',
    'whale_users_bundle_total_revenue',
    'user_bundles',
    'country_hist',
    'advertiser_actions_action_count',
    'user_actions_bundles_action_count'
]

cols_to_process = [col for col in top_priority_cols if col in ddf.columns]

print(f"Procesando {len(cols_to_process)} columnas object...")

for col in cols_to_process:
    try:
        count, total_sum, total_max, total_min, total_mean, total_std = extract_array_stats_advanced(ddf[col], col)
        
        ddf[f'{col}_count'] = count
        ddf[f'{col}_sum'] = total_sum
        ddf[f'{col}_max'] = total_max
        ddf[f'{col}_min'] = total_min
        ddf[f'{col}_mean'] = total_mean
        ddf[f'{col}_std'] = total_std
        
        print(f"  ‚úì {col} ‚Üí 6 features")
    except Exception as e:
        print(f"  ‚ö†Ô∏è  {col} ‚Üí Error: {str(e)[:50]}")

print(f"\n‚úì Features object procesadas (lazy)")

# ============================================
# PASO 3: GUARDAR DATOS PROCESADOS EN DISCO
# ============================================
print("\n[3/6] Guardando datos procesados en disco...")

# Seleccionar solo columnas num√©ricas + target
numeric_cols_dask = ddf.select_dtypes(include=['int64', 'float64', 'int32', 'float32']).columns.tolist()

# Eliminar columnas object para reducir tama√±o
ddf_numeric = ddf[numeric_cols_dask]

# Guardar como parquet (comprimido)
output_path = './processed_train_data'
ddf_numeric.to_parquet(output_path, compression='snappy', overwrite=True, engine='pyarrow')

print(f"‚úì Datos guardados en: {output_path}")
print(f"‚úì Columnas guardadas: {len(numeric_cols_dask)}")

# Liberar memoria
del ddf
import gc
gc.collect()

print("\n‚úì Memoria liberada")

PROCESAMIENTO CON DASK (SIN CARGAR TODO EN RAM)

[1/6] Feature engineering b√°sico (lazy)...
‚úì 12+ features de engagement a√±adidas (lazy)
‚úì Particiones actuales: 24

[2/6] Procesando columnas object...
Procesando 7 columnas object...
  ‚úì iap_revenue_usd_bundle ‚Üí 6 features
  ‚úì num_buys_bundle ‚Üí 6 features
  ‚úì whale_users_bundle_total_revenue ‚Üí 6 features
  ‚úì user_bundles ‚Üí 6 features
  ‚úì country_hist ‚Üí 6 features
  ‚úì advertiser_actions_action_count ‚Üí 6 features
  ‚úì user_actions_bundles_action_count ‚Üí 6 features

‚úì Features object procesadas (lazy)

[3/6] Guardando datos procesados en disco...


NameError: name 'pa' is not defined

# üß™ PASO 3: Preprocessing Avanzado

- Carga datos procesados (50% sample)
- **Binning**: Discretiza variables continuas
- **Interacciones**: Crea features combinadas
- **Target Encoding**: Encode categ√≥ricas con target
- **Frequency Encoding**: Encode frecuencia de categor√≠as

In [4]:
# ============================================
# PASO 3.5: CARGAR Y APLICAR PREPROCESSING AVANZADO
# ============================================
print("\n" + "=" * 80)
print("PREPROCESSING AVANZADO (Target Encoding, Binning, Agregaciones)")
print("=" * 80)

import lightgbm as lgb
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np

# Cargar datos procesados
print("\n[1/5] Cargando datos procesados...")
ddf_processed = dd.read_parquet('./processed_train_data')

sample_fraction = 0.5
ddf_sample = ddf_processed.sample(frac=sample_fraction, random_state=42)
df_train = ddf_sample.compute()

print(f"‚úì Datos cargados: {df_train.shape}")

# Separar targets
y_buyer = df_train['buyer_d7'] if 'buyer_d7' in df_train.columns else None
y_revenue = df_train['iap_revenue_d7'] if 'iap_revenue_d7' in df_train.columns else None
X_base = df_train.drop(columns=['buyer_d7', 'iap_revenue_d7'], errors='ignore')

# ============================================
# PREPROCESSING AVANZADO INCREMENTAL
# ============================================

# 4/6: BINNING de variables continuas
print("\n[2/5] Aplicando binning a variables continuas...")

binning_features = {}

# Binning para engagement metrics
if 'weeks_since_first_seen' in X_base.columns:
    X_base['user_age_bin'] = pd.qcut(X_base['weeks_since_first_seen'], q=5, labels=False, duplicates='drop').fillna(-1).astype(int)
    binning_features['user_age_bin'] = True

if 'total_sessions' in X_base.columns:
    X_base['sessions_bin'] = pd.qcut(X_base['total_sessions'], q=5, labels=False, duplicates='drop').fillna(-1).astype(int)
    binning_features['sessions_bin'] = True

if 'avg_session_time' in X_base.columns:
    X_base['session_time_bin'] = pd.qcut(X_base['avg_session_time'], q=5, labels=False, duplicates='drop').fillna(-1).astype(int)
    binning_features['session_time_bin'] = True

print(f"‚úì {len(binning_features)} variables binned")

# 5/6: FEATURES DE INTERACCI√ìN
print("\n[3/5] Creando features de interacci√≥n...")

interaction_count = 0

# Interacciones de engagement
if 'total_sessions' in X_base.columns and 'avg_session_time' in X_base.columns:
    X_base['engagement_score'] = X_base['total_sessions'] * X_base['avg_session_time']
    interaction_count += 1

if 'days_active_ratio' in X_base.columns and 'total_sessions' in X_base.columns:
    X_base['active_sessions_ratio'] = X_base['days_active_ratio'] * X_base['total_sessions']
    interaction_count += 1

# Interacciones de revenue hist√≥rico
if 'iap_revenue_usd_bundle_sum' in X_base.columns and 'num_buys_bundle_count' in X_base.columns:
    X_base['avg_revenue_per_purchase'] = X_base['iap_revenue_usd_bundle_sum'] / (X_base['num_buys_bundle_count'] + 1)
    interaction_count += 1

if 'whale_users_bundle_total_revenue_sum' in X_base.columns and 'user_bundles_count' in X_base.columns:
    X_base['whale_revenue_per_bundle'] = X_base['whale_users_bundle_total_revenue_sum'] / (X_base['user_bundles_count'] + 1)
    interaction_count += 1

print(f"‚úì {interaction_count} features de interacci√≥n creadas")

# 6/6: TARGET ENCODING para categ√≥ricas (si existen)
print("\n[4/5] Aplicando target encoding a variables categ√≥ricas...")

categorical_cols = X_base.select_dtypes(include=['object', 'category']).columns.tolist()

if len(categorical_cols) > 0 and y_buyer is not None:
    from category_encoders import TargetEncoder
    
    # Solo aplicar a categ√≥ricas con cardinalidad manejable
    cat_to_encode = []
    for col in categorical_cols:
        if X_base[col].nunique() < 1000:  # Evitar alta cardinalidad
            cat_to_encode.append(col)
    
    if cat_to_encode:
        # Train/test split para evitar leakage
        X_temp_train, X_temp_val, y_temp_train, y_temp_val = train_test_split(
            X_base[cat_to_encode], y_buyer, test_size=0.2, random_state=42
        )
        
        encoder = TargetEncoder(cols=cat_to_encode, smoothing=1.0)
        encoder.fit(X_temp_train, y_temp_train)
        
        X_encoded = encoder.transform(X_base[cat_to_encode])
        X_encoded.columns = [f'{col}_target_enc' for col in cat_to_encode]
        
        X_base = pd.concat([X_base, X_encoded], axis=1)
        print(f"‚úì {len(cat_to_encode)} columnas target-encoded")
else:
    print("‚úì No hay columnas categ√≥ricas para encodear")

# FREQUENCY ENCODING (alternativa simple)
print("\n[5/5] Aplicando frequency encoding...")

freq_encoded = 0
for col in categorical_cols[:5]:  # Solo top 5 para evitar overhead
    if col in X_base.columns:
        freq = X_base[col].value_counts(normalize=True)
        X_base[f'{col}_freq'] = X_base[col].map(freq).fillna(0)
        freq_encoded += 1

print(f"‚úì {freq_encoded} columnas frequency-encoded")

# ============================================
# RESUMEN FINAL
# ============================================
print("\n" + "=" * 80)
print("RESUMEN DE PREPROCESSING")
print("=" * 80)
print(f"Features originales:     {X_base.shape[1] - len(binning_features) - interaction_count - len(categorical_cols)*2}")
print(f"+ Binning:               {len(binning_features)}")
print(f"+ Interacciones:         {interaction_count}")
print(f"+ Target encoding:       {len(cat_to_encode) if 'cat_to_encode' in locals() else 0}")
print(f"+ Frequency encoding:    {freq_encoded}")
print(f"‚îÄ" * 80)
print(f"Total features finales:  {X_base.shape[1]}")
print(f"Total samples:           {X_base.shape[0]:,}")
print("=" * 80)

# Asignar a X para compatibilidad con celdas siguientes
X = X_base


PREPROCESSING AVANZADO (Target Encoding, Binning, Agregaciones)

[1/5] Cargando datos procesados...

[1/5] Cargando datos procesados...
‚úì Datos cargados: (1653239, 63)
‚úì Datos cargados: (1653239, 63)

[2/5] Aplicando binning a variables continuas...
‚úì 1 variables binned

[3/5] Creando features de interacci√≥n...
‚úì 2 features de interacci√≥n creadas

[4/5] Aplicando target encoding a variables categ√≥ricas...
‚úì No hay columnas categ√≥ricas para encodear

[5/5] Aplicando frequency encoding...
‚úì 0 columnas frequency-encoded

RESUMEN DE PREPROCESSING
Features originales:     61
+ Binning:               1
+ Interacciones:         2
+ Target encoding:       0
+ Frequency encoding:    0
‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ
Total features finales:  64
Total samples:           1,65

In [None]:
# ============================================
# UNDER SAMPLING CON PROPORCI√ìN 1:4 (nos quedamos 10% del total de datos)
# ============================================
ratio = 4
buyers = X[y_buyer == 1]
non_buyers = X[y_buyer == 0]
n_non_buyers = ratio*len(buyers)
non_buyers_sample = resample(non_buyers, n_samples = n_non_buyers, random_state = 22, replace = False)

X = pd.concat([buyers, non_buyers_sample])
y_buyer = y_buyer.loc[X.index]

In [17]:
# ============================================
# ENTRENAMIENTO TWO-STAGE CON LIGHTGBM
# ============================================
print("\n" + "=" * 80)
print("ENTRENAMIENTO DE MODELOS")
print("=" * 80)

from sklearn.metrics import roc_auc_score, mean_squared_error, mean_absolute_error

# X, y_buyer, y_revenue ya est√°n definidos en la celda anterior
print(f"\nDataset final:")
print(f"  Features: {X.shape[1]}")
print(f"  Samples:  {X.shape[0]:,}")

# ============================================
# TWO-STAGE MODEL: CLASIFICACI√ìN + REGRESI√ìN
# ============================================

# Train/test split
X_train, X_test, y_buyer_train, y_buyer_test, y_rev_train, y_rev_test = train_test_split(
    X, y_buyer, y_revenue, 
    test_size=0.2, 
    random_state=42, 
    stratify=y_buyer
)

print(f"\nTrain: {X_train.shape[0]:,} | Test: {X_test.shape[0]:,}")

# ============================================
# ETAPA 1: CLASIFICADOR (Buyer)
# ============================================
print("\n" + "="*60)
print("ETAPA 1: CLASIFICACI√ìN (buyer_d7)")
print("="*60)

params_buyer = {
    'objective': 'binary',
    'metric': 'auc',
    'num_leaves': 31,
    'learning_rate': 0.05,
    'feature_fraction': 0.8,
    'bagging_fraction': 0.8,
    'bagging_freq': 5,
    'verbose': -1,
    'random_state': 42,
    'scale_pos_weight': (y_buyer_train == 0).sum() / (y_buyer_train == 1).sum()
}

lgb_train = lgb.Dataset(X_train, y_buyer_train)
lgb_eval = lgb.Dataset(X_test, y_buyer_test, reference=lgb_train)

model_buyer = lgb.train(
    params_buyer,
    lgb_train,
    num_boost_round=200,
    valid_sets=[lgb_eval],
    callbacks=[lgb.early_stopping(stopping_rounds=20), lgb.log_evaluation(period=0)]
)

y_buyer_pred = model_buyer.predict(X_test, num_iteration=model_buyer.best_iteration)
auc = roc_auc_score(y_buyer_test, y_buyer_pred)

print(f"‚úÖ AUC: {auc:.4f}")

# ============================================
# ETAPA 2: REGRESOR (Revenue)
# ============================================
print("\n" + "="*60)
print("ETAPA 2: REGRESI√ìN (iap_revenue_d7)")
print("="*60)

# Solo compradores
X_train_buyers = X_train[y_buyer_train == 1]
y_rev_train_buyers = y_rev_train[y_buyer_train == 1]
# aplicamos transformaci√≥n y = log(1 + y) al target
y_rev_train_buyers = np.log1p(y_rev_train_buyers)
lgb_train_rev = lgb.Dataset(X_train_buyers, y_rev_train_buyers)

print(f"Compradores en train: {len(X_train_buyers):,}")

params_revenue = {
    'objective': 'regression',
    'metric': 'rmse',
    'num_leaves': 31,
    'learning_rate': 0.03,
    'feature_fraction': 0.8,
    'bagging_fraction': 0.8,
    'bagging_freq': 5,
    'verbose': -1,
    'random_state': 42
}

lgb_train_rev = lgb.Dataset(X_train_buyers, y_rev_train_buyers)

model_revenue = lgb.train(
    params_revenue,
    lgb_train_rev,
    num_boost_round=250,
    callbacks=[lgb.log_evaluation(period=0)]
)

# Predicciones combinadas
buyer_threshold = 0.3
predicted_buyers = y_buyer_pred > buyer_threshold
y_rev_pred = np.zeros(len(X_test))
y_rev_pred[predicted_buyers] = model_revenue.predict( # debido a transformaci√≥n inicial predecimos expm1
    X_test[predicted_buyers], 
    num_iteration=model_revenue.best_iteration
)
y_rev_pred_test = np.zeros(len(X_test_final))
y_rev_pred_test[predicted_buyers] = np.expm1(model_revenue.predict(X_test[predicted_buyers]))

rmse = np.sqrt(mean_squared_error(y_rev_test, y_rev_pred))
mae = mean_absolute_error(y_rev_test, y_rev_pred)

print(f"‚úÖ RMSE: ${rmse:.2f}")
print(f"‚úÖ MAE:  ${mae:.2f}")

print("\n" + "="*60)
print("RESUMEN")
print("="*60)
print(f"AUC:  {auc:.4f}")
print(f"RMSE: ${rmse:.2f}")
print(f"MAE:  ${mae:.2f}")

# Guardar modelos
model_buyer.save_model('./models/buyer_classifier_dask.txt')
model_revenue.save_model('./models/revenue_regressor_dask.txt')
print("\n‚úì Modelos guardados en ./models/")


ENTRENAMIENTO DE MODELOS

Dataset final:
  Features: 64
  Samples:  1,653,239

Train: 1,322,591 | Test: 330,648

ETAPA 1: CLASIFICACI√ìN (buyer_d7)

Train: 1,322,591 | Test: 330,648

ETAPA 1: CLASIFICACI√ìN (buyer_d7)
Training until validation scores don't improve for 20 rounds
Training until validation scores don't improve for 20 rounds
Did not meet early stopping. Best iteration is:
[187]	valid_0's auc: 0.722859
Did not meet early stopping. Best iteration is:
[187]	valid_0's auc: 0.722859
‚úÖ AUC: 0.7229

ETAPA 2: REGRESI√ìN (iap_revenue_d7)
‚úÖ AUC: 0.7229

ETAPA 2: REGRESI√ìN (iap_revenue_d7)
Compradores en train: 43,847
Compradores en train: 43,847
‚úÖ RMSE: $399.62
‚úÖ MAE:  $23.16

RESUMEN
AUC:  0.7229
RMSE: $399.62
MAE:  $23.16

‚úì Modelos guardados en ./models/
‚úÖ RMSE: $399.62
‚úÖ MAE:  $23.16

RESUMEN
AUC:  0.7229
RMSE: $399.62
MAE:  $23.16

‚úì Modelos guardados en ./models/


# ü§ñ PASO 4: Entrenar Modelo Two-Stage

- Train/test split
- Entrena clasificador LightGBM (buyer_d7)
- Entrena regresor LightGBM (iap_revenue_d7)
- Guarda modelos en `./models/`

# üéØ PASO 5: Generar Predicciones y Submission

- Carga test procesado
- Alinea features con train
- Genera predicciones
- Crea `./outputs/submission.csv`

In [18]:
# ============================================
# PROCESAR TEST SET CON DASK (VERSI√ìN OPTIMIZADA)
# ============================================
print("=" * 80)
print("PROCESAMIENTO TEST SET - VERSI√ìN DASK")
print("=" * 80)

# ============================================
# PASO 1: CARGAR TEST
# ============================================
print("\n[1/4] Cargando test set...")

test_path = "./smadex-challenge-predict-the-revenue/test/test"
ddf_test = dd.read_parquet(test_path)

print(f"‚úì Test shape: {ddf_test.npartitions} particiones")

# IMPORTANTE: Convertir index a columna para preservar row_id
if 'row_id' not in ddf_test.columns:
    ddf_test = ddf_test.reset_index()
    if 'index' in ddf_test.columns:
        ddf_test = ddf_test.rename(columns={'index': 'row_id'})

print(f"‚úì row_id preservado como columna")

# ============================================
# PASO 2: FEATURE ENGINEERING (LAZY)
# ============================================
print("\n[2/4] Feature engineering...")

if 'avg_act_days' in ddf_test.columns and 'weeks_since_first_seen' in ddf_test.columns:
    ddf_test['days_active_ratio'] = ddf_test['avg_act_days'] / (ddf_test['weeks_since_first_seen'] * 7 + 1e-10)

if 'weekend_ratio' in ddf_test.columns:
    ddf_test['is_weekend_user'] = (ddf_test['weekend_ratio'] > 0.5).astype(int)

if 'wifi_ratio' in ddf_test.columns:
    ddf_test['is_wifi_user'] = (ddf_test['wifi_ratio'] > 0.7).astype(int)

if 'last_buy' in ddf_test.columns:
    ddf_test['has_previous_purchase'] = (ddf_test['last_buy'] > 0).astype(int)

print("‚úì 4 features b√°sicas a√±adidas")

# ============================================
# PASO 3: PROCESAR COLUMNAS OBJECT (LAZY)
# ============================================
print("\n[3/4] Procesando columnas object...")

# Procesar solo top 5 columnas (igual que en train)
top_priority_cols_test = [
    'iap_revenue_usd_bundle',
    'num_buys_bundle',
    'whale_users_bundle_total_revenue',
    'user_bundles',
    'country_hist'
]

cols_to_process_test = [col for col in top_priority_cols_test if col in ddf_test.columns]

for col in cols_to_process_test:
    try:
        count, total_sum, total_max = extract_array_stats_simple(ddf_test[col], col)
        
        ddf_test[f'{col}_count'] = count
        ddf_test[f'{col}_sum'] = total_sum
        ddf_test[f'{col}_max'] = total_max
        
# PASO 4: SELECCIONAR FEATURES NUM√âRICAS + ROW_ID
    except Exception as e:
        print(f"  ‚ö†Ô∏è  {col} ‚Üí Error: {str(e)[:40]}")

# ============================================
# PASO 4: SELECCIONAR FEATURES NUM√âRICAS

# IMPORTANTE: Asegurar que row_id est√© incluido
if 'row_id' in ddf_test.columns and 'row_id' not in numeric_cols_test:
    numeric_cols_test.append('row_id')

ddf_test_numeric = ddf_test[numeric_cols_test]

print(f"‚úì row_id incluido en features guardadas")
# PASO 5: GUARDAR TEST PROCESADO
# ============================================
# PASO 5: GUARDAR TEST PROCESADO CON ROW_ID
# ============================================
print("\nGuardando test procesado...")
output_test_path = './processed_test_data'
ddf_test_numeric.to_parquet(output_test_path, compression='snappy', overwrite=True, write_index=False)

print("\n‚úì Test procesado y guardado en disco")
gc.collect()
del ddf_test
# Liberar memoria
print(f"‚úì Test guardado en: {output_test_path}")# Liberar memoria


PROCESAMIENTO TEST SET - VERSI√ìN DASK

[1/4] Cargando test set...
‚úì Test shape: 96 particiones
‚úì row_id preservado como columna

[2/4] Feature engineering...
‚úì 4 features b√°sicas a√±adidas

[3/4] Procesando columnas object...
  ‚ö†Ô∏è  iap_revenue_usd_bundle ‚Üí Error: name 'extract_array_stats_simple' is not
  ‚ö†Ô∏è  num_buys_bundle ‚Üí Error: name 'extract_array_stats_simple' is not
  ‚ö†Ô∏è  whale_users_bundle_total_revenue ‚Üí Error: name 'extract_array_stats_simple' is not
  ‚ö†Ô∏è  user_bundles ‚Üí Error: name 'extract_array_stats_simple' is not
  ‚ö†Ô∏è  country_hist ‚Üí Error: name 'extract_array_stats_simple' is not
‚úì row_id incluido en features guardadas

Guardando test procesado...
‚úì Test shape: 96 particiones
‚úì row_id preservado como columna

[2/4] Feature engineering...
‚úì 4 features b√°sicas a√±adidas

[3/4] Procesando columnas object...
  ‚ö†Ô∏è  iap_revenue_usd_bundle ‚Üí Error: name 'extract_array_stats_simple' is not
  ‚ö†Ô∏è  num_buys_bundle ‚Üí Error

In [None]:
# ============================================
# GENERAR PREDICCIONES Y SUBMISSION
# ============================================
print("=" * 80)
print("GENERANDO PREDICCIONES FINALES")
print("=" * 80)

# ============================================
# PASO 1: CARGAR TEST PROCESADO
# ============================================
print("\n[1/3] Cargando test procesado...")

ddf_test_processed = dd.read_parquet('./processed_test_data')

# Computar en chunks o completo seg√∫n RAM disponible
df_test = ddf_test_processed.compute()

print(f"‚úì Test cargado: {df_test.shape}")

# IMPORTANTE: Extraer y preservar row_id
if 'row_id' in df_test.columns:
    row_ids = df_test['row_id'].values
    print(f"‚úì row_id extra√≠do: {len(row_ids):,} registros")
    print(f"‚úì IDs √∫nicos: {len(set(row_ids)):,}")
    
    # Verificar duplicados
    if len(row_ids) != len(set(row_ids)):
        print(f"‚ö†Ô∏è  WARNING: Se encontraron {len(row_ids) - len(set(row_ids))} IDs duplicados")
    
    # Eliminar row_id de las features
    df_test = df_test.drop(columns=['row_id'])
else:
    print("‚ö†Ô∏è  WARNING: No se encontr√≥ row_id en el dataset")
    row_ids = None

# ============================================
# PASO 2: ALINEAR FEATURES CON TRAIN
# ============================================
print("\n[2/3] Alineando features con train...")

# Asegurar que tenemos las mismas columnas que en train
X_test_final = pd.DataFrame(index=df_test.index)

for col in X.columns:
    if col in df_test.columns:
        X_test_final[col] = df_test[col]
    else:
        # Si falta una feature, rellenar con 0
        X_test_final[col] = 0
        print(f"  ‚ö†Ô∏è  Missing: {col} ‚Üí Filled with 0")

print(f"‚úì Features alineadas: {X_test_final.shape[1]}")

# ============================================
# PASO 3: GENERAR PREDICCIONES
# ============================================
print("\n[3/3] Generando predicciones...")

# Cargar modelos guardados
model_buyer = lgb.Booster(model_file='./models/buyer_classifier_dask.txt')
model_revenue = lgb.Booster(model_file='./models/revenue_regressor_dask.txt')

# Predicci√≥n de compradores
y_buyer_pred_test = model_buyer.predict(X_test_final)

# Identificar compradores potenciales
buyer_threshold = 0.3
predicted_buyers = y_buyer_pred_test > buyer_threshold

# Predicci√≥n de revenue (solo para compradores)
y_revenue_pred_test = np.zeros(len(X_test_final))

if predicted_buyers.sum() > 0:
    y_revenue_pred_test[predicted_buyers] = model_revenue.predict(X_test_final[predicted_buyers])

# Verificar que row_ids existe y tiene el tama√±o correcto
if row_ids is None:
    raise ValueError("‚ùå ERROR: row_ids no est√° disponible. Ejecuta la celda de procesamiento de test.")

if len(row_ids) != len(y_revenue_pred_test):
    raise ValueError(f"‚ùå ERROR: Mismatch en tama√±os - row_ids: {len(row_ids)}, predicciones: {len(y_revenue_pred_test)}")

submission = pd.DataFrame({
    'row_id': row_ids,
    'iap_revenue_d7': y_revenue_pred_test
})

# Verificar duplicados antes de guardar
duplicates = submission['row_id'].duplicated().sum()
if duplicates > 0:
    print(f"‚ö†Ô∏è  WARNING: {duplicates} IDs duplicados encontrados - eliminando duplicados...")
    submission = submission.drop_duplicates(subset=['row_id'], keep='first')

# Guardar
output_file = './outputs/submission.csv'
submission.to_csv(output_file, index=False)

print(f"\n‚úÖ Archivo: {output_file}")
print(f"‚úÖ Registros: {len(submission):,}")
print(f"\nPrimeras 10 predicciones:")
print(submission.head(10))

print("\n" + "=" * 80)
print("üéâ ¬°LISTO PARA KAGGLE!")
print("=" * 80)

GENERANDO PREDICCIONES FINALES

[1/3] Cargando test procesado...


# üìä CONFIGURACI√ìN Y T√âCNICAS DE PREPROCESSING

## üé® T√©cnicas Implementadas

### ‚úÖ 1. Feature Engineering B√°sico
```python
# Ratios y proporciones
days_active_ratio = avg_act_days / (weeks_since_first_seen * 7)
avg_sessions_per_week = total_sessions / weeks_since_first_seen

# Flags comportamentales
is_new_user = (weeks_since_first_seen < 1)
is_veteran_user = (weeks_since_first_seen > 12)
recent_buyer = (last_buy < 7)
```

### ‚úÖ 2. Extracci√≥n de Object Columns
Convierte listas/dicts en features num√©ricas:
- `count`: N√∫mero de elementos
- `sum, max, min, mean, std`: Estad√≠sticas agregadas

### ‚úÖ 3. Binning (Discretizaci√≥n)
```python
# Variables continuas ‚Üí Categor√≠as
user_age_bin = pd.qcut(weeks_since_first_seen, q=5)
sessions_bin = pd.qcut(total_sessions, q=5)
session_time_bin = pd.qcut(avg_session_time, q=5)
```
**Ventaja**: Captura relaciones no lineales, reduce outliers

### ‚úÖ 4. Features de Interacci√≥n
```python
# Combina m√∫ltiples features
engagement_score = total_sessions * avg_session_time
active_sessions_ratio = days_active_ratio * total_sessions
avg_revenue_per_purchase = iap_revenue_sum / (num_buys + 1)
```
**Ventaja**: Captura sinergias entre variables

### ‚úÖ 5. Target Encoding
```python
# Codifica categor√≠as con media del target
country_encoded = mean(target | country)
```
**Ventaja**: Mejor que label encoding para alta cardinalidad
**Cuidado**: Incluye smoothing y train/val split para evitar leakage

### ‚úÖ 6. Frequency Encoding
```python
# Codifica por frecuencia de aparici√≥n
country_freq = count(country) / total_rows
```
**Ventaja**: Simple, r√°pido, sin leakage

## üîß Par√°metros Configurables

### Rango de Fechas (Celda 3)
```python
# 14 d√≠as completos
filters = [("datetime", ">=", "2025-10-06-00-00"), ("datetime", "<", "2025-10-20-00-00")]

# Solo 1 d√≠a (pruebas r√°pidas)
filters = [("datetime", ">=", "2025-10-06-00-00"), ("datetime", "<", "2025-10-07-00-00")]
```

### Muestreo (Celda 7)
```python
sample_fraction = 0.5  # 50% (recomendado 16GB RAM)
sample_fraction = 0.8  # 80% (m√°s datos, m√°s RAM)
sample_fraction = 0.3  # 30% (menos memoria)
```

### Columnas Object (Celda 5)
A√±ade m√°s columnas a `top_priority_cols`:
```python
top_priority_cols = [
    'iap_revenue_usd_bundle',
    'num_buys_bundle',
    'whale_users_bundle_total_revenue',
    'user_bundles',
    'country_hist',
    'advertiser_actions_action_count',  # Ya incluida
    'user_actions_bundles_action_count'  # Ya incluida
]
```

## üí° Pr√≥ximas Mejoras (Roadmap)

### üéØ Alta Prioridad
- [ ] **RFM Features**: Recency, Frequency, Monetary para comportamiento de compra
- [ ] **Agregaciones por grupo**: Mean/max/std por country, carrier, dev_os
- [ ] **Embeddings categ√≥ricos**: Entity embeddings para alta cardinalidad
- [ ] **Time-based features**: Hour, day_of_week, is_weekend del datetime

### üî¨ Media Prioridad
- [ ] **Feature selection**: Eliminar features redundantes (correlaci√≥n > 0.95)
- [ ] **Normalizaci√≥n**: StandardScaler para features num√©ricas
- [ ] **Polynomial features**: Interacciones de orden 2
- [ ] **Ensemble**: Combinar XGBoost + LightGBM + CatBoost

### ‚ö° Baja Prioridad (optimizaci√≥n)
- [ ] **Hyperparameter tuning**: Optuna para optimizar LightGBM
- [ ] **Cross-validation**: 5-fold CV para estimaci√≥n m√°s robusta
- [ ] **Feature importance**: SHAP values para interpretabilidad

## üìÅ Archivos Generados

| Archivo | Descripci√≥n |
|---------|-------------|
| `./processed_train_data/` | Train procesado (Parquet) |
| `./processed_test_data/` | Test procesado (Parquet) |
| `./models/buyer_classifier_dask.txt` | Clasificador LightGBM |
| `./models/revenue_regressor_dask.txt` | Regresor LightGBM |
| `./outputs/submission.csv` | Submission Kaggle |

## ‚ö†Ô∏è Troubleshooting

**Memory full**: Reducir `sample_fraction` o procesar menos object columns

**Categorical encoding error**: Verificar que las categor√≠as existen en train y test

**Predicciones = 0**: Revisar `buyer_threshold` y que los modelos se guardaron correctamente