In [None]:
import os
import random
import time
from glob import glob
from itertools import product

import dask
import dask.dataframe as dd
import lightgbm as lgb
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from dask.distributed import Client
from sklearn.metrics import mean_squared_log_error, mean_squared_error

# Configure Dask
# Turn off Dask's "dataframe.convert-string" option before any data is read.
dask.config.set({"dataframe.convert-string": False})
# NOTE(review): every .compute() call visible in this notebook passes
# scheduler='synchronous', so this distributed client does not appear to be used
# by them — confirm it is still needed before keeping a local cluster alive.
client = Client()

# Paths
# Root directories that are searched recursively for the parquet part files.
train_path = '/home/stargix/Desktop/hackathons/datathon/train/train'
test_path = '/home/stargix/Desktop/hackathons/datathon/test/test'



## 1. Carga de Datos (5% sample)

In [None]:
# Load data with Dask (only 5% of the part files of each split).
# NOTE(review): the test split is sub-sampled as well, so anything predicted from
# test_df covers only these files — confirm that is intended before submitting.
SAMPLE_FRACTION = 0.05


def _sample_parquet_files(root, fraction=SAMPLE_FRACTION):
    """Return the first `fraction` of the parquet part files found under `root`.

    The paths are sorted before slicing: glob() returns matches in arbitrary,
    filesystem-dependent order, so slicing the unsorted list would select a
    different subset of files on different machines or runs.

    Args:
        root: Directory searched recursively for ``part-*.parquet`` files.
        fraction: Share of the files to keep; at least one file is always kept.

    Returns:
        Sorted list with the selected file paths.

    Raises:
        FileNotFoundError: If no part file exists under `root`.
    """
    files = sorted(glob(os.path.join(root, '**/part-*.parquet'), recursive=True))
    if not files:
        raise FileNotFoundError(f"No parquet part files found under {root!r}")
    return files[:max(1, int(len(files) * fraction))]


parquet_files_train = _sample_parquet_files(train_path)
num_files_train = len(parquet_files_train)

train_ddf = dd.read_parquet(parquet_files_train, engine='pyarrow')
print(f"✓ Train cargado: {num_files_train} archivos (5% del total)")

parquet_files_test = _sample_parquet_files(test_path)
num_files_test = len(parquet_files_test)

test_ddf = dd.read_parquet(parquet_files_test, engine='pyarrow')
print(f"✓ Test cargado: {num_files_test} archivos (5% del total)")

# Materialise as pandas DataFrames (single-threaded scheduler).
train_df = train_ddf.compute(scheduler='synchronous')
test_df = test_ddf.compute(scheduler='synchronous')

print(f"\nTrain shape: {train_df.shape}")
print(f"Test shape: {test_df.shape}")

✓ Train cargado: 7 archivos (5% del total)
✓ Test cargado: 4 archivos (5% del total)


Exception ignored in: <bound method GCDiagnosis._gc_callback of <distributed.gc.GCDiagnosis object at 0x70f5f2778910>>
Traceback (most recent call last):
  File "/home/stargix/Desktop/hackathons/datathon/.venv/lib/python3.10/site-packages/distributed/gc.py", line 198, in _gc_callback
    def _gc_callback(self, phase, info):
KeyboardInterrupt: 
2025-11-16 00:24:17,994 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.
Traceback (most recent call last):
  File "/home/stargix/Desktop/hackathons/datathon/.venv/lib/python3.10/site-packages/distributed/comm/tcp.py", line 226, in read
    frames_nosplit_nbytes_bin = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/stargix/Desktop/hackathons/datathon/.venv/lib/python3.10/site-packages/distributed/worker.py", line 1273, in heartbeat
    response = await r

## 2. Preprocesado y Feature Engineering

In [None]:
# Columns that must never be model inputs: the targets, the row id and the timestamp.
labels_to_exclude = ['buyer_d7', 'iap_revenue_d7', 'row_id', 'datetime']

# Drop columns with more than 70% missing values.
missing_pct = train_df.isnull().sum() / len(train_df)
high_missing_cols = missing_pct[missing_pct > 0.7].index.tolist()
print(f"Columnas con >70% missings: {len(high_missing_cols)}")

# A column that exists only in train cannot be supplied at prediction time
# (test_df[features] would raise KeyError). NOTE(review): such columns are
# presumably additional label/outcome fields, which would also leak the target —
# verify against the dataset description.
train_only_cols = [col for col in train_df.columns
                   if col not in test_df.columns and col not in labels_to_exclude]
print(f"Columnas ausentes en test (excluidas): {len(train_only_cols)}")

excluded_cols = set(labels_to_exclude) | set(high_missing_cols) | set(train_only_cols)

# Main categorical features. The list is restricted to columns that are actually
# kept, because it is later passed as `categorical_feature` together with frames
# that only contain `features`; a name missing from the frame is rejected there.
cat_features = ['advertiser_bundle', 'advertiser_category', 'advertiser_subcategory',
                'country', 'region', 'dev_make', 'dev_model', 'dev_os', 'dev_osv']
cat_features = [col for col in cat_features
                if col in train_df.columns and col not in excluded_cols]

# Numeric features
numeric_cols = train_df.select_dtypes(include=[np.number]).columns.tolist()

# Combine both groups. dict.fromkeys de-duplicates while keeping a stable order;
# a set would yield a different column order from one kernel to the next, which
# changes what the column-subsampled model sees.
features = list(dict.fromkeys(cat_features + numeric_cols))
features = [col for col in features if col not in excluded_cols]

print(f"\nTotal features: {len(features)}")
print(f"  Categóricas: {len([f for f in features if f in cat_features])}")
print(f"  Numéricas: {len([f for f in features if f not in cat_features])}")
print(f"\nTasa de conversión: {train_df['buyer_d7'].mean():.4f}")

## 3. Train/Validation Split (Time-based)

In [None]:
# Time-based split: rows at or after the 80% quantile of the timestamp go to
# validation, everything earlier is used for training.
train_df['datetime'] = pd.to_datetime(train_df['datetime'].astype(str))
cutoff = train_df['datetime'].quantile(0.8)
val_mask = train_df['datetime'] >= cutoff
train_mask = ~val_mask

# Feature matrices and revenue target for each side of the split.
X_train = train_df.loc[train_mask, features].copy()
y_train = train_df.loc[train_mask, 'iap_revenue_d7'].copy()
X_val = train_df.loc[val_mask, features].copy()
y_val = train_df.loc[val_mask, 'iap_revenue_d7'].copy()

for split_name, split_frame in (("Train", X_train), ("Val", X_val)):
    print(f"{split_name}: {len(split_frame):,} samples")

## 4. Limpieza de Datos

In [None]:
# Object columns that are not declared categorical are coerced to numbers;
# values that cannot be parsed become NaN.
non_cat_object_cols = [
    col for col in X_train.select_dtypes(include=['object']).columns
    if col not in cat_features
]
for col in non_cat_object_cols:
    X_train[col] = pd.to_numeric(X_train[col], errors='coerce')
    X_val[col] = pd.to_numeric(X_val[col], errors='coerce')

# Replace every remaining NaN with 0.
# NOTE(review): this runs before the categorical cast below, so a missing value
# in a categorical column is filled with the integer 0 as well — confirm that
# this is the intended "missing" category.
X_train, X_val = X_train.fillna(0), X_val.fillna(0)

# Cast the categorical features that are present to the pandas category dtype.
present_cat_cols = [col for col in cat_features if col in X_train.columns]
for col in present_cat_cols:
    X_train[col] = X_train[col].astype('category')
    X_val[col] = X_val[col].astype('category')

# The model is trained on log1p(revenue).
y_train_log, y_val_log = np.log1p(y_train), np.log1p(y_val)

print("✓ Datos limpios y preparados")

## 5. Modelo Baseline (sin optimizar)

In [None]:
# Untuned reference configuration.
params_baseline = dict(
    objective='regression',
    metric='rmse',
    learning_rate=0.05,
    num_leaves=31,
    max_depth=-1,
    min_data_in_leaf=20,
    feature_fraction=0.8,
    bagging_fraction=0.8,
    bagging_freq=5,
    verbose=-1,
    device='cpu',
)

# Both datasets carry the log1p-transformed target.
train_ds = lgb.Dataset(X_train, label=y_train_log, categorical_feature=cat_features)
val_ds = lgb.Dataset(X_val, label=y_val_log, reference=train_ds)

# Up to 2000 rounds, with early stopping after 100 rounds.
baseline_callbacks = [lgb.early_stopping(stopping_rounds=100)]
model_baseline = lgb.train(
    params_baseline,
    train_ds,
    num_boost_round=2000,
    valid_sets=[train_ds, val_ds],
    callbacks=baseline_callbacks,
)

# Score on validation: undo the log1p transform and floor predictions at 0
# before computing the metrics on the original revenue scale.
pred_baseline_log = model_baseline.predict(X_val)
pred_baseline = np.clip(np.expm1(pred_baseline_log), 0, None)
msle_baseline = mean_squared_log_error(y_val, pred_baseline)
rmse_baseline = np.sqrt(mean_squared_error(y_val, pred_baseline))

separator = '=' * 60
print(f"\n{separator}")
print("BASELINE MODEL")
print(f"{separator}")
print(f"MSLE: {msle_baseline:.6f}")
print(f"RMSE: ${rmse_baseline:.2f}")
print(f"{separator}")

## 6. Grid Search aleatorio (30 combinaciones del grid) - Optimización de Hiperparámetros

In [None]:
# Hyperparameter grid: every combination of these values is a candidate.
param_grid = {
    'learning_rate': [0.03, 0.05, 0.1],
    'num_leaves': [31, 63, 127],
    'max_depth': [-1, 10, 20],
    'min_data_in_leaf': [20, 50, 100],
    'feature_fraction': [0.7, 0.8, 0.9],
    'bagging_fraction': [0.7, 0.8, 0.9],
}

# Settings shared by every trial.
base_params = {
    'objective': 'regression',
    'metric': 'rmse',
    'bagging_freq': 5,
    'verbose': -1,
    'device': 'cpu'
}

# Cartesian product of the grid; each tuple follows the key order of param_grid.
param_names = list(param_grid)
all_combinations = list(product(*param_grid.values()))

# Only a random subset of 30 combinations is evaluated. A dedicated, seeded
# generator makes the subset identical on every run, so results are comparable
# between executions, and leaves the global `random` state untouched.
SEARCH_SEED = 42
n_trials = min(30, len(all_combinations))
random_combinations = random.Random(SEARCH_SEED).sample(all_combinations, n_trials)

print(f"{'='*60}")
print(f"GRID SEARCH - {n_trials} combinaciones")
print(f"{'='*60}")

best_msle = float('inf')
best_params = None
results = []
start_time = time.time()

for i, combination in enumerate(random_combinations, 1):
    # Trial configuration = shared settings + this combination.
    trial_params = dict(zip(param_names, combination))
    current_params = {**base_params, **trial_params}

    # NOTE(review): the Datasets are rebuilt for every trial. LightGBM presumably
    # bakes some parameters into a constructed Dataset (e.g. min_data_in_leaf via
    # feature pre-filtering), so confirm before hoisting this out of the loop.
    train_ds_gs = lgb.Dataset(X_train, label=y_train_log, categorical_feature=cat_features)
    val_ds_gs = lgb.Dataset(X_val, label=y_val_log, reference=train_ds_gs)

    model = lgb.train(
        current_params,
        train_ds_gs,
        num_boost_round=1000,
        valid_sets=[val_ds_gs],
        callbacks=[lgb.early_stopping(stopping_rounds=50, verbose=False)]
    )

    # Score on the original revenue scale (undo log1p, floor at 0).
    pred_log = model.predict(X_val)
    pred = np.expm1(pred_log).clip(0, None)
    msle = mean_squared_log_error(y_val, pred)

    # One record per trial: the sampled values plus the outcome.
    results.append({
        **trial_params,
        'msle': msle,
        'n_iterations': model.best_iteration
    })

    # NOTE(review): the winner is selected on the same validation set that is
    # later used to report the final score, so that score is optimistic.
    if msle < best_msle:
        best_msle = msle
        best_params = current_params.copy()
        print(f"  [{i}/{n_trials}] ⭐ Nuevo mejor MSLE: {msle:.6f}")
    elif i % 5 == 0:
        print(f"  [{i}/{n_trials}] MSLE: {msle:.6f}")

elapsed = time.time() - start_time
print(f"\n✓ Grid Search completado en {elapsed/60:.1f} minutos")

## 7. Resultados del Grid Search

In [None]:
# Rank every evaluated configuration by validation MSLE (best first).
results_df = pd.DataFrame(results).sort_values(by='msle')
top_configs = results_df.head(10)

separator = '=' * 60
print(f"\n{separator}")
print("TOP 10 MEJORES CONFIGURACIONES")
print(f"{separator}")
print(top_configs.to_string(index=False))

# Persist the full ranking, not just the top 10.
results_csv_path = '/home/stargix/Desktop/hackathons/datathon/sergi/grid_search_results_clean.csv'
results_df.to_csv(results_csv_path, index=False)
print(f"\n✓ Resultados guardados en: grid_search_results_clean.csv")

## 8. Modelo Final con Mejores Hiperparámetros

In [None]:
# Retrain with the best configuration found by the search, using a larger round
# budget and a more patient early stop than the search trials.
train_ds_final = lgb.Dataset(X_train, label=y_train_log, categorical_feature=cat_features)
val_ds_final = lgb.Dataset(X_val, label=y_val_log, reference=train_ds_final)

final_callbacks = [lgb.early_stopping(stopping_rounds=100)]
model_final = lgb.train(
    best_params,
    train_ds_final,
    num_boost_round=2000,
    valid_sets=[train_ds_final, val_ds_final],
    callbacks=final_callbacks,
)

# Validation metrics on the original revenue scale (undo log1p, floor at 0).
pred_final_log = model_final.predict(X_val)
pred_final = np.clip(np.expm1(pred_final_log), 0, None)
msle_final = mean_squared_log_error(y_val, pred_final)
rmse_final = np.sqrt(mean_squared_error(y_val, pred_final))

# Relative MSLE change versus the baseline, in percent (positive = better).
improvement_pct = (msle_baseline - msle_final) / msle_baseline * 100

separator = '=' * 60
print(f"\n{separator}")
print("COMPARACIÓN: BASELINE vs OPTIMIZADO")
print(f"{separator}")
print(f"Baseline:   MSLE={msle_baseline:.6f}, RMSE=${rmse_baseline:.2f}")
print(f"Optimizado: MSLE={msle_final:.6f}, RMSE=${rmse_final:.2f}")
print(f"Mejora:     {improvement_pct:+.2f}%")
print(f"{separator}")

# List only the tuned values; the fixed settings are not interesting here.
fixed_settings = ('objective', 'metric', 'verbose', 'device')
print(f"\nMejores hiperparámetros:")
for param, value in best_params.items():
    if param in fixed_settings:
        continue
    print(f"  {param}: {value}")

## 9. Feature Importance

In [None]:
# Plot the 30 most important features of the final model.
# matplotlib is imported with the other dependencies in the first cell, so the
# notebook's requirements are visible in one place.
fig, ax = plt.subplots(figsize=(12, 8))
lgb.plot_importance(model_final, max_num_features=30, ax=ax)
ax.set_title('Top 30 Features más importantes')
# Label both axes explicitly so the figure can be read on its own.
ax.set_xlabel('Importancia')
ax.set_ylabel('Feature')
plt.tight_layout()
plt.show()

## 10. Predicción en Test Set

In [None]:
# Build the test feature matrix with the same columns used for training.
X_test = test_df[features].copy()

# Apply the same cleaning steps as for train/validation:
# 1) coerce non-categorical object columns to numbers (unparseable -> NaN)
non_cat_object_cols = [
    col for col in X_test.select_dtypes(include=['object']).columns
    if col not in cat_features
]
for col in non_cat_object_cols:
    X_test[col] = pd.to_numeric(X_test[col], errors='coerce')

# 2) replace remaining NaN with 0
X_test = X_test.fillna(0)

# 3) cast the categorical features that are present to the category dtype
present_cat_cols = [col for col in cat_features if col in X_test.columns]
for col in present_cat_cols:
    X_test[col] = X_test[col].astype('category')

# Predict in log space, then map back to revenue and floor at 0.
pred_test_log = model_final.predict(X_test)
pred_test = np.clip(np.expm1(pred_test_log), 0, None)

print(f"Predicciones generadas: {len(pred_test):,}")
print(f"Revenue promedio predicho: ${pred_test.mean():.2f}")
print(f"Revenue mediano predicho: ${np.median(pred_test):.2f}")

## 11. Crear Submission

In [None]:
# Destination of the submission file.
submission_path = '/home/stargix/Desktop/hackathons/datathon/sergi/submission_clean_grid.csv'

# Two-column frame: the row identifier from the test data next to its predicted
# revenue (predictions are in the same row order as test_df).
submission = test_df[['row_id']].assign(iap_revenue_d7=pred_test)

# Write without the index so the file contains only the two columns.
submission.to_csv(submission_path, index=False)

print(f"✓ Submission guardado: {submission_path}")
print(f"\nEstadísticas de la submission:")
print(submission['iap_revenue_d7'].describe())