# 0) Requirements

In [1]:
# Install required packages
%pip install -q lightgbm dask "dask[distributed]" dask-ml tqdm

Note: you may need to restart the kernel to use updated packages.


# 1) Imports & Settings

In [2]:
from pathlib import Path
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
from tqdm import tqdm
import lightgbm as lgb
import pandas as pd
import joblib
from sklearn.metrics import roc_auc_score, roc_curve
import matplotlib.pyplot as plt
import numpy as np

# 2) Paths & Global params

In [3]:
# -----------------------
# Paths
# -----------------------
DATA_DIR = Path('data')
TRAIN_CSV = DATA_DIR / 'ctr_train.csv'
TEST_CSV = DATA_DIR / 'ctr_test.csv'
SAMPLE_SUB = DATA_DIR / 'ctr_sample_submission.csv'
OUTPUT_DIR = Path('output')
OUTPUT_DIR.mkdir(exist_ok=True)

TARGET = 'click'
ID_COL = 'id'
IDX_COL = 'idx'

# 3) Load data

In [None]:
# -----------------------
# Dask Cluster Setup
# -----------------------
cluster = LocalCluster(
    n_workers=4,              # Adjust to your CPU cores
    threads_per_worker=4,
    processes=False,
    memory_limit="6GB",       # 6GB per worker, safe for your 20GB RAM
)
client = Client(cluster)
print(client)

<Client: 'inproc://192.168.100.2/15668/1' processes=4 threads=16, memory=22.35 GiB>




In [5]:
# -----------------------
# Load Data with Dask
# -----------------------
print("Loading training data...")
train = dd.read_csv(TRAIN_CSV, assume_missing=True, blocksize="64MB")
print(f"Train partitions (before): {train.npartitions}")

print("Loading test data...")
test = dd.read_csv(TEST_CSV, assume_missing=True, blocksize="64MB")
print(f"Test partitions (before): {test.npartitions}")

# Repartition to smaller chunks (better parallelism)
train = train.repartition(partition_size="32MB")
test = test.repartition(partition_size="32MB")
print(f"Train partitions (after): {train.npartitions}")
print(f"Test partitions (after): {test.npartitions}")

Loading training data...
Train partitions (before): 98
Loading test data...
Test partitions (before): 1
Train partitions (after): 392
Test partitions (after): 1


In [6]:
# -----------------------
# Feature Preparation
# -----------------------
print("Preparing features...")
categorical_cols = [c for c in train.columns if c not in [TARGET, ID_COL, IDX_COL]]

# Convert to category to save memory
for col in tqdm(categorical_cols, desc="Converting to category"):
    train[col] = train[col].astype('category')
    test[col] = test[col].astype('category')

Preparing features...


Converting to category: 100%|██████████| 22/22 [00:00<00:00, 92.13it/s]


In [None]:
# -----------------------
# Train/Validation Split
# -----------------------

from dask_ml.model_selection import train_test_split

X = train[categorical_cols]
y = train[TARGET]

X_train, X_valid, y_train, y_valid = train_test_split(
    X, y, test_size=0.1, random_state=42, shuffle=True
)

# Compute only when ready to train
X_train, X_valid, y_train, y_valid = (
    X_train.compute(),
    X_valid.compute(),
    y_train.compute(),
    y_valid.compute(),
)


In [None]:
# -----------------------
# LightGBM Dataset
# -----------------------
dtrain = lgb.Dataset(X_train, label=y_train, categorical_feature=categorical_cols)
dvalid = lgb.Dataset(X_valid, label=y_valid, categorical_feature=categorical_cols)

params = {
    'objective': 'binary',
    'metric': 'auc',
    'boosting_type': 'gbdt',
    'learning_rate': 0.05,
    'num_leaves': 64,
    'max_bin': 255,
    'verbose': -1,
    'device': 'gpu' if lgb.has_gpu() else 'cpu'
}

print("Training model...")
model = lgb.train(
    params,
    dtrain,
    valid_sets=[dtrain, dvalid],
    valid_names=['train', 'valid'],
    num_boost_round=200,
    early_stopping_rounds=30,
    verbose_eval=50
)

In [None]:
# -----------------------
# Validation AUC & ROC Curve
# -----------------------
print("Evaluating on validation set...")
y_pred_valid = model.predict(X_valid)
auc_score = roc_auc_score(y_valid, y_pred_valid)
print(f"Validation ROC-AUC: {auc_score:.5f}")

fpr, tpr, _ = roc_curve(y_valid, y_pred_valid)

plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, label=f"AUC = {auc_score:.5f}")
plt.plot([0, 1], [0, 1], 'k--')
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve")
plt.legend(loc="lower right")
plt.grid(True)
roc_path = OUTPUT_DIR / "roc_curve.png"
plt.savefig(roc_path)
plt.close()
print(f"ROC curve saved to {roc_path}")

In [None]:
# -----------------------
# Save Model
# -----------------------
model_path = OUTPUT_DIR / "lgbm_ctr_model.txt"
model.save_model(model_path)
print(f"Model saved to {model_path}")

In [None]:
# -----------------------
# Predictions for Submission
# -----------------------
print("Predicting on test set...")
test_df = test.compute()
y_pred_test = model.predict(test_df[categorical_cols])

sample_sub = pd.read_csv(SAMPLE_SUB)
sample_sub['click'] = y_pred_test
submission_path = OUTPUT_DIR / "ctr_submission.csv"
sample_sub.to_csv(submission_path, index=False)
print(f"✅ Submission saved to {submission_path}")