In [None]:
# ============================================================
# COMPLETE CRITEO PIPELINE
# Preprocessing + FT-AFM Training
# Based on your Avazu pipeline
# ============================================================
import os, json, math, random, numpy as np, pandas as pd, torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import roc_auc_score, log_loss
from sklearn.model_selection import KFold, train_test_split
from torch.amp import GradScaler, autocast

# ============== Setup ==============
def set_seed(seed=42):
    """Seed the Python, NumPy and PyTorch random number generators.

    Args:
        seed: Integer seed handed to every generator (default 42).
    """
    # Same order as before: stdlib, NumPy, then PyTorch.
    for seeder in (random.seed, np.random.seed, torch.manual_seed):
        seeder(seed)
    # The CUDA generators are only seeded explicitly when a GPU is present.
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)
set_seed(42)

# Hardware discovery: count the visible CUDA devices, then pick the device
# that models and batches are moved to.
NUM_GPUS = torch.cuda.device_count()
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

print(f"GPUs available: {NUM_GPUS}")

# ============== Models (Copy from your FT-AFM script) ==============
class ImprovedAFM(nn.Module):
    """Attention-weighted pooling of pairwise field interactions (AFM).

    Every unordered pair of fields (i < j) contributes the element-wise
    product of its two embeddings. A small scorer (``W`` -> tanh -> ``h``)
    assigns one score per pair, the scores are softmax-normalised across
    pairs, and the products are summed with those weights.

    Args:
        d: Embedding width of each field.
        attn_dim: Hidden width of the attention scorer.
        dropout: Dropout probability applied to the pairwise products.
    """

    def __init__(self, d, attn_dim=64, dropout=0.1):
        super().__init__()
        self.W = nn.Linear(d, attn_dim, bias=False)
        self.h = nn.Linear(attn_dim, 1, bias=False)
        self.dropout = nn.Dropout(dropout)

    def forward(self, E):
        """Pool the pairwise interactions of ``E``.

        Args:
            E: Tensor of shape (batch, n_fields, d). At least two fields are
                required; with fewer there are no pairs to concatenate.

        Returns:
            Tensor of shape (batch, d).
        """
        _, n_fields, _ = E.shape
        # One broadcasted multiply per "row" of the upper triangle: field i
        # against every later field. This needs n_fields - 1 tensor ops
        # instead of one op per pair, and both operands are views of E.
        # Concatenating the rows keeps the pair order (0,1), (0,2), ..., (1,2), ...
        rows = [E[:, i:i + 1] * E[:, i + 1:] for i in range(n_fields - 1)]
        P = torch.cat(rows, dim=1)
        P = self.dropout(P)
        # Softmax over dim=1 normalises the scores across pairs.
        A = torch.softmax(self.h(torch.tanh(self.W(P))), dim=1)
        return (A * P).sum(dim=1)

class FeatureTokenizer(nn.Module):
    """Turn categorical ids and numeric scalars into one token per field.

    Each categorical field gets its own embedding table and each numeric
    field its own ``Linear(1, d_model)`` projection. A learned CLS token is
    prepended to the field tokens.

    Args:
        cat_cardinalities: Number of distinct ids for each categorical field.
        n_num: Number of numeric fields.
        d_model: Width of every token.
    """

    def __init__(self, cat_cardinalities, n_num, d_model):
        super().__init__()
        tables = []
        for cardinality in cat_cardinalities:
            tables.append(nn.Embedding(cardinality, d_model))
        self.cat_embs = nn.ModuleList(tables)

        projections = []
        for _ in range(n_num):
            projections.append(nn.Linear(1, d_model))
        self.num_proj = nn.ModuleList(projections)

        self.cls = nn.Parameter(torch.zeros(1, 1, d_model))
        nn.init.trunc_normal_(self.cls, std=0.02)

    def forward(self, x_cat, x_num):
        """Tokenise one batch.

        Args:
            x_cat: Integer ids, one column per categorical field.
            x_num: Float values, one column per numeric field.

        Returns:
            Tuple ``(tokens, field_embs)``: ``field_embs`` stacks the
            categorical tokens followed by the numeric tokens along dim 1;
            ``tokens`` is the same tensor with the CLS token prepended.
        """
        batch_size = x_cat.size(0)
        per_field = []
        for idx, table in enumerate(self.cat_embs):
            per_field.append(table(x_cat[:, idx]))
        for idx, layer in enumerate(self.num_proj):
            # Slice with idx:idx+1 to keep a trailing feature axis of size 1.
            per_field.append(layer(x_num[:, idx:idx + 1]))
        field_embs = torch.stack(per_field, dim=1)

        cls_token = self.cls.expand(batch_size, -1, -1)
        return torch.cat([cls_token, field_embs], dim=1), field_embs

class FTTransformer(nn.Module):
    """Stack of pre-norm, GELU Transformer encoder layers over a token sequence.

    Args:
        d_model: Token width.
        nhead: Number of attention heads.
        ff: Width of the feed-forward sub-layer.
        n_layers: Number of encoder layers.
        dropout: Dropout probability inside each layer.
    """

    def __init__(self, d_model=192, nhead=8, ff=512, n_layers=3, dropout=0.15):
        super().__init__()
        layer_kwargs = dict(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=ff,
            dropout=dropout,
            batch_first=True,
            activation="gelu",
            norm_first=True,
        )
        layer = nn.TransformerEncoderLayer(**layer_kwargs)
        self.encoder = nn.TransformerEncoder(layer, num_layers=n_layers)

    def forward(self, tokens):
        """Encode ``tokens`` (batch-first) and return a tensor of the same shape."""
        return self.encoder(tokens)

class ImprovedFTAFM(nn.Module):
    """FT-Transformer + AFM click model with an MLP fusion head.

    The tokenizer output feeds two branches: the Transformer backbone (whose
    CLS position is read out) and the AFM pooling over the raw field
    embeddings. Both ``d_model``-wide vectors are concatenated and mapped to
    a single logit by a 256 -> 128 -> 1 MLP.

    Args:
        cat_cards: Cardinality of each categorical field.
        n_num: Number of numeric fields.
        d_model: Token / embedding width.
        nhead: Attention heads in the backbone.
        ff: Feed-forward width in the backbone.
        n_layers: Number of backbone layers.
        dropout: Dropout probability shared by backbone, AFM and head.
        afm_attn_dim: Hidden width of the AFM attention scorer.
    """

    def __init__(self, cat_cards, n_num, d_model=192, nhead=8, ff=512, n_layers=3, dropout=0.15, afm_attn_dim=64):
        super().__init__()
        self.tok = FeatureTokenizer(cat_cards, n_num, d_model)
        self.backbone = FTTransformer(d_model, nhead, ff, n_layers, dropout)
        self.afm = ImprovedAFM(d_model, afm_attn_dim, dropout=dropout)

        # Head input is [CLS state ; AFM vector], each d_model wide.
        head_layers = [
            nn.Linear(2 * d_model, 256),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(128, 1),
        ]
        self.head = nn.Sequential(*head_layers)

    def forward(self, x_cat, x_num):
        """Return one logit per row of the batch (the size-1 axis is squeezed)."""
        tokens, field_embs = self.tok(x_cat, x_num)
        # Position 0 of the encoded sequence is the CLS token added by the tokenizer.
        cls_state = self.backbone(tokens)[:, 0, :]
        interaction = self.afm(field_embs)
        fused = torch.cat([cls_state, interaction], dim=1)
        logits = self.head(fused)
        return logits.squeeze(1)


# ============== Utils ==============
def ensure_dir(p):
    """Create directory ``p`` including missing parents; a no-op if it already exists."""
    os.makedirs(p, exist_ok=True)

def save_json(obj, path):
    """Write ``obj`` to ``path`` as JSON indented by two spaces (overwrites the file)."""
    with open(path, "w") as handle:
        json.dump(obj, fp=handle, indent=2)

def _smoothed_target_means(frame, col, yname, prior, min_samples):
    """Per-category target mean of ``frame`` shrunk towards ``prior``.

    Returns a Series indexed by the values of ``col`` holding
    ``(mean * count + prior * min_samples) / (count + min_samples)``.
    """
    stats = frame.groupby(col)[yname].agg(["mean", "size"])
    return (stats["mean"] * stats["size"] + prior * min_samples) / (stats["size"] + min_samples)


def kfold_target_encode(df_tr, df_va, col, yname, n_splits=5, min_samples=50):
    """Smoothed target encoding: out-of-fold for train, full-train for validation.

    For the training frame, each row is encoded with statistics computed on
    the other folds only, so a row never sees its own label. The validation
    frame is encoded with statistics from the whole training frame.
    Categories missing from the fitted statistics fall back to the global
    training mean of ``yname``.

    Args:
        df_tr: Training frame containing ``col`` and ``yname``.
        df_va: Frame to encode with the full-train statistics; must contain ``col``.
        col: Name of the categorical column to encode.
        yname: Name of the target column.
        n_splits: Number of folds (shuffled, fixed ``random_state=42``).
        min_samples: Smoothing strength, i.e. the pseudo-count given to the prior.

    Returns:
        Tuple ``(te_tr, te_va)`` of float32 Series indexed like ``df_tr`` and
        ``df_va`` respectively.
    """
    # Keep only the two columns that are needed: slicing a fold then copies a
    # narrow frame instead of every column of the training set.
    pair = df_tr[[col, yname]]
    prior = float(pair[yname].mean())

    kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    te_tr = pd.Series(np.zeros(len(pair), dtype="float32"), index=pair.index)
    for tr_idx, hold_idx in kf.split(pair):
        fold_map = _smoothed_target_means(pair.iloc[tr_idx], col, yname, prior, min_samples)
        encoded = pair[col].iloc[hold_idx].map(fold_map).fillna(prior)
        # Assign a plain array so the write is purely positional and cannot
        # be affected by index alignment (e.g. a non-unique index).
        te_tr.iloc[hold_idx] = encoded.to_numpy(dtype="float32")

    full_map = _smoothed_target_means(pair, col, yname, prior, min_samples)
    te_va = df_va[col].map(full_map).fillna(prior).astype("float32")
    return te_tr, te_va

# ============== Criteo Preprocessing ==============
def preprocess_criteo(data_path, output_dir, sample_size=None):
    """
    Preprocess the Criteo dataset into train/val/test NumPy arrays plus a schema file.

    Criteo format (tab-separated, read without a header row):
    - Column 0: Label (0/1)
    - Columns 1-13: I1-I13 (numerical features)
    - Columns 14-39: C1-C26 (categorical features)

    Steps, in order: fill missing values, stratified 8:1:1 split, id-encode the
    categoricals (rare/unseen values share one "unknown" id), add frequency and
    target-encoded numeric features, log1p the frequency features, standardize
    and clip all numerics using training-set statistics, then save everything.

    Files written to ``output_dir``: ``Xc_*.npy`` (categorical ids),
    ``Xn_*.npy`` (float32 numerics) and ``y_*.npy`` (float32 labels) for
    train/val/test, plus ``schema.json``.

    Args:
        data_path: Path to Criteo train.txt file
        output_dir: Where to save preprocessed data
        sample_size: If set, use only this many rows (for testing).
            Any falsy value (None or 0) loads the whole file.

    Returns:
        Tuple ``(cat_cards, num_cols)``: the per-field categorical
        cardinalities (including the unknown id) and the final list of
        numeric column names (I1-I13 plus the engineered features).
    """
    print("\n" + "="*80)
    print("CRITEO PREPROCESSING")
    print("="*80)

    ensure_dir(output_dir)

    # Load data
    print(f"Loading from: {data_path}")
    if sample_size:
        df = pd.read_csv(data_path, sep='\t', header=None, nrows=sample_size)
        print(f"Using {sample_size:,} samples for testing")
    else:
        df = pd.read_csv(data_path, sep='\t', header=None)

    print(f"Loaded {len(df):,} rows, {len(df.columns)} columns")

    # Name columns (1 label + 13 numeric + 26 categorical = 40 names, so the
    # file must have exactly 40 columns)
    df.columns = ['label'] + [f'I{i}' for i in range(1,14)] + [f'C{i}' for i in range(1,27)]

    print(f"CTR: {df['label'].mean():.4f}")

    # Handle missing values: numerics become 0 (unparseable values are coerced
    # to NaN first), categoricals become the literal string 'missing'.
    # 'label' matches neither prefix and is left untouched.
    print("Handling missing values...")
    for col in df.columns:
        if col.startswith('I'):
            df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype('float32')
        elif col.startswith('C'):
            df[col] = df[col].fillna('missing').astype(str)

    # 8:1:1 split (random - Criteo has no temporal ordering)
    # Both splits are stratified on the label with a fixed random_state.
    print("Splitting data (8:1:1)...")
    df_tr, df_tmp = train_test_split(df, test_size=0.2, stratify=df['label'], random_state=42)
    df_va, df_te = train_test_split(df_tmp, test_size=0.5, stratify=df_tmp['label'], random_state=42)
    # Explicit copies so the in-place column assignments below act on independent frames.
    df_tr, df_va, df_te = df_tr.copy(), df_va.copy(), df_te.copy()

    print(f"Train: {len(df_tr):,}, Val: {len(df_va):,}, Test: {len(df_te):,}")

    # Feature lists (num_cols grows below as engineered features are added)
    num_cols = [f'I{i}' for i in range(1,14)]
    cat_cols = [f'C{i}' for i in range(1,27)]

    # Categorical encoding with frequency filtering
    print("Encoding categorical features...")
    cat_cards = []
    MIN_FREQ = 10

    for c in cat_cols:
        # Values seen at least MIN_FREQ times in train get ids in
        # value_counts order; everything else (rare in train, or unseen in
        # val/test) maps to the single extra id `unk_id`.
        vc = df_tr[c].value_counts()
        frequent = vc[vc >= MIN_FREQ].index
        mapping = {v:i for i,v in enumerate(frequent)}
        unk_id = len(mapping)

        df_tr[c] = df_tr[c].map(mapping).fillna(unk_id).astype('int64')
        df_va[c] = df_va[c].map(mapping).fillna(unk_id).astype('int64')
        df_te[c] = df_te[c].map(mapping).fillna(unk_id).astype('int64')

        # Cardinality = frequent values + 1 for the unknown id.
        cat_cards.append(unk_id + 1)
        print(f"  {c}: {unk_id+1} categories")

    # Single frequency features: train-set count of each (already id-encoded)
    # category, so all rare values share the count of the unknown bucket.
    print("Engineering frequency features...")
    # NOTE(review): the original comment on the next line called these
    # "High-cardinality fields", but the run log captured in this notebook
    # reports 27, 11 and 5 categories for C14, C17 and C20 - confirm the
    # intended field list.
    freq_candidates = ['C1', 'C14', 'C17', 'C20', 'C21']
    for c in freq_candidates:
        vc = df_tr[c].value_counts()
        df_tr[f"{c}_freq"] = df_tr[c].map(vc).astype('float32')
        # Ids absent from train (possible only for the unknown id) count as 0.
        df_va[f"{c}_freq"] = df_va[c].map(vc).fillna(0).astype('float32')
        df_te[f"{c}_freq"] = df_te[c].map(vc).fillna(0).astype('float32')
        num_cols.append(f"{c}_freq")

    # Target encoding
    print("Target encoding...")
    te_candidates = ['C1', 'C14', 'C20']
    for c in te_candidates:
        # Train rows get out-of-fold encodings, validation rows full-train encodings.
        te_tr, te_va = kfold_target_encode(df_tr, df_va, c, 'label', n_splits=5, min_samples=50)
        # The test split repeats the full-train smoothing formula inline; the
        # constant 50 must stay equal to `min_samples` passed above.
        prior = float(df_tr['label'].mean())
        means = df_tr.groupby(c)['label'].mean()
        cnts  = df_tr.groupby(c)['label'].size()
        mfull = (means*cnts + prior*50) / (cnts + 50)
        te_te = df_te[c].map(mfull).fillna(prior).astype('float32')

        df_tr[f"{c}_te"] = te_tr.astype('float32')
        df_va[f"{c}_te"] = te_va.astype('float32')
        df_te[f"{c}_te"] = te_te.astype('float32')
        num_cols.append(f"{c}_te")

    # Log1p frequency features (applied before standardization)
    freq_cols = [c for c in num_cols if c.endswith('_freq')]
    for c in freq_cols:
        for d in (df_tr, df_va, df_te):
            d[c] = np.log1p(d[c].clip(lower=0))

    # Standardize all numerics using training-set mean/std only
    print("Standardizing numerics...")
    num_means = {c: float(df_tr[c].mean()) for c in num_cols}
    num_stds  = {c: float(df_tr[c].std()) for c in num_cols}

    for c in num_cols:
        # A (near-)constant column keeps its scale: std falls back to 1.0.
        mu, sd = num_means[c], (num_stds[c] if num_stds[c] > 1e-8 else 1.0)
        for d in (df_tr, df_va, df_te):
            d[c] = ((d[c] - mu)/sd).clip(-5, 5).astype('float32')  # Clip outliers

    # Save preprocessed arrays
    print("Saving preprocessed data...")
    np.save(f'{output_dir}/Xc_train.npy', df_tr[cat_cols].to_numpy())
    np.save(f'{output_dir}/Xn_train.npy', df_tr[num_cols].to_numpy().astype('float32'))
    np.save(f'{output_dir}/y_train.npy', df_tr['label'].to_numpy().astype('float32'))

    np.save(f'{output_dir}/Xc_val.npy', df_va[cat_cols].to_numpy())
    np.save(f'{output_dir}/Xn_val.npy', df_va[num_cols].to_numpy().astype('float32'))
    np.save(f'{output_dir}/y_val.npy', df_va['label'].to_numpy().astype('float32'))

    np.save(f'{output_dir}/Xc_test.npy', df_te[cat_cols].to_numpy())
    np.save(f'{output_dir}/Xn_test.npy', df_te[num_cols].to_numpy().astype('float32'))
    np.save(f'{output_dir}/y_test.npy', df_te['label'].to_numpy().astype('float32'))

    # The schema records what the training step needs to rebuild the model
    # (cardinalities, column lists) plus the per-split click-through rates.
    schema = {
        'cat_cards': cat_cards,
        'num_cols': num_cols,
        'cat_cols': cat_cols,
        'train_ctr': float(df_tr['label'].mean()),
        'val_ctr': float(df_va['label'].mean()),
        'test_ctr': float(df_te['label'].mean())
    }
    save_json(schema, f'{output_dir}/schema.json')

    print(f"✅ Preprocessing complete!")
    print(f"   Categorical features: {len(cat_cols)}")
    print(f"   Numerical features: {len(num_cols)}")
    print(f"   Saved to: {output_dir}/")

    return cat_cards, num_cols

# ============== Training (Copy from your FT-AFM script) ==============
class CTRDataset(Dataset):
    """In-memory dataset yielding ``(categorical ids, numeric features, label)``.

    Args:
        Xc: Categorical ids, stored as ``torch.long``.
        Xn: Numeric features, stored as ``torch.float32``.
        y: Labels, stored as ``torch.float32``.
    """

    def __init__(self, Xc, Xn, y):
        self.Xc, self.Xn, self.y = (
            torch.as_tensor(Xc, dtype=torch.long),
            torch.as_tensor(Xn, dtype=torch.float32),
            torch.as_tensor(y, dtype=torch.float32),
        )

    def __len__(self):
        return len(self.y)

    def __getitem__(self, i):
        # The label gets a trailing axis of size 1.
        label = self.y[i].unsqueeze(-1)
        return self.Xc[i], self.Xn[i], label

def evaluate(model, dl, chunk_size=2048):
    """Compute ROC-AUC and log loss of ``model`` over a data loader.

    The model is switched to eval mode and run without gradients. Each loader
    batch is pushed through the model in slices of ``chunk_size`` rows, so the
    loader may use a larger batch size than fits in one forward pass.

    Args:
        model: Module called as ``model(x_cat, x_num)`` and returning logits.
        dl: Iterable of ``(x_cat, x_num, y)`` batches.
        chunk_size: Maximum number of rows per forward pass (default 2048).

    Returns:
        Tuple ``(auc, logloss)``; probabilities are clipped to
        ``[1e-7, 1 - 1e-7]`` before scoring.

    Raises:
        ValueError: If ``chunk_size`` is not positive or ``dl`` yields no batches.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")

    model.eval()
    labels, preds = [], []
    with torch.no_grad():
        for Xc, Xn, yb in dl:
            for start in range(0, Xc.size(0), chunk_size):
                stop = start + chunk_size
                logits = model(Xc[start:stop].to(DEVICE), Xn[start:stop].to(DEVICE))
                # Logits may arrive as (n,) or (n, 1); flatten to one value per row.
                # No torch.cuda.empty_cache() here: flushing the caching
                # allocator after every chunk only forces re-allocation and
                # does not lower the peak memory of a forward pass.
                preds.append(torch.sigmoid(logits).reshape(-1).float().cpu().numpy())
            labels.append(yb.reshape(-1).numpy())

    if not labels:
        raise ValueError("evaluate() received an empty data loader")

    y_true = np.concatenate(labels)
    y_prob = np.clip(np.concatenate(preds), 1e-7, 1-1e-7)
    return roc_auc_score(y_true, y_prob), log_loss(y_true, y_prob)

def train_criteo_ft_afm(data_dir, output_dir, max_epochs=15, patience=5):
    """Train FT-AFM on preprocessed Criteo data and report test metrics.

    Loads the arrays and ``schema.json`` from ``data_dir``, trains an
    ``ImprovedFTAFM`` with AdamW (mixed precision when CUDA is available),
    early-stops on validation log loss, restores the best weights and
    evaluates them on the test split.

    Files written to ``output_dir``:
        criteo_ft_afm_best.pth: state dict of the best epoch. It is rewritten
            every time validation log loss improves, so an interrupted run
            still leaves its best checkpoint on disk.
        criteo_history.csv: per-epoch train/val metrics.
        criteo_results.json: test metrics, the best validation metrics over
            all epochs, and the epoch / validation AUC of the saved checkpoint.

    Args:
        data_dir: Directory holding ``X*_{train,val,test}.npy``,
            ``y_*.npy`` and ``schema.json``.
        output_dir: Directory for the checkpoint, history and results.
        max_epochs: Maximum number of training epochs (must be >= 1).
        patience: Number of consecutive epochs without validation log-loss
            improvement after which training stops.

    Returns:
        Tuple ``(test_auc, test_logloss)``.

    Raises:
        ValueError: If ``max_epochs`` is smaller than 1.
    """
    if max_epochs < 1:
        # Fail before loading data; the results summary needs at least one epoch.
        raise ValueError(f"max_epochs must be >= 1, got {max_epochs}")

    print("\n" + "="*80)
    print("TRAINING FT-AFM ON CRITEO")
    print("="*80)

    ensure_dir(output_dir)
    ckpt_path = f'{output_dir}/criteo_ft_afm_best.pth'

    # Load preprocessed data
    print("Loading preprocessed data...")
    Xc_tr = np.load(f'{data_dir}/Xc_train.npy')
    Xn_tr = np.load(f'{data_dir}/Xn_train.npy')
    y_tr = np.load(f'{data_dir}/y_train.npy')

    Xc_va = np.load(f'{data_dir}/Xc_val.npy')
    Xn_va = np.load(f'{data_dir}/Xn_val.npy')
    y_va = np.load(f'{data_dir}/y_val.npy')

    Xc_te = np.load(f'{data_dir}/Xc_test.npy')
    Xn_te = np.load(f'{data_dir}/Xn_test.npy')
    y_te = np.load(f'{data_dir}/y_test.npy')

    with open(f'{data_dir}/schema.json') as f:
        schema = json.load(f)

    cat_cards = schema['cat_cards']
    n_num = len(schema['num_cols'])

    print(f"Train: {len(y_tr):,}, Val: {len(y_va):,}, Test: {len(y_te):,}")
    print(f"Features: {len(cat_cards)} cat + {n_num} num")

    # Data loaders (the batch scales with the GPU count because DataParallel
    # splits each batch across devices)
    batch_size = 2048 * NUM_GPUS if NUM_GPUS > 1 else 2048
    tr_dl = DataLoader(CTRDataset(Xc_tr, Xn_tr, y_tr), batch_size=batch_size, shuffle=True, num_workers=4, pin_memory=True)
    va_dl = DataLoader(CTRDataset(Xc_va, Xn_va, y_va), batch_size=batch_size*2, shuffle=False, num_workers=4, pin_memory=True)
    te_dl = DataLoader(CTRDataset(Xc_te, Xn_te, y_te), batch_size=batch_size*2, shuffle=False, num_workers=4, pin_memory=True)

    # Build model. `core` always refers to the bare module so that bias
    # initialisation and state-dict handling never deal with the wrapper.
    core = ImprovedFTAFM(cat_cards, n_num, d_model=192, nhead=8, ff=512, n_layers=3, dropout=0.15, afm_attn_dim=64).to(DEVICE)
    model = nn.DataParallel(core) if NUM_GPUS > 1 else core

    # Initialize the output bias to the logit of the base CTR. The rate is
    # clamped away from 0 and 1 so a degenerate label array cannot trigger a
    # division by zero or log(0).
    base_ctr = min(max(float(y_tr.mean()), 1e-6), 1.0 - 1e-6)
    with torch.no_grad():
        core.head[-1].bias.fill_(math.log(base_ctr/(1.0-base_ctr)))

    # Training
    opt = torch.optim.AdamW(model.parameters(), lr=1e-3, weight_decay=5e-5)
    scaler = GradScaler(device='cuda', enabled=torch.cuda.is_available())
    best_ll, best_state, stale = float("inf"), None, 0
    best_epoch, best_auc = None, None
    history = []

    for ep in range(1, max_epochs+1):
        model.train(); run = 0.0
        for Xc, Xn, yb in tr_dl:
            Xc, Xn, yb = Xc.to(DEVICE), Xn.to(DEVICE), yb.to(DEVICE)
            opt.zero_grad(set_to_none=True)
            with autocast(device_type='cuda', enabled=torch.cuda.is_available()):
                logits = model(Xc, Xn)
                # Labels arrive as (batch, 1); give the logits the same shape.
                if logits.dim()==1: logits = logits.unsqueeze(1)
                loss = F.binary_cross_entropy_with_logits(logits, yb)
            scaler.scale(loss).backward()
            scaler.step(opt); scaler.update()
            # Weight by batch size so the epoch average is per-sample.
            run += loss.item() * yb.size(0)

        val_auc, val_ll = evaluate(model, va_dl)
        train_ll = run / len(tr_dl.dataset)
        improved = val_ll < best_ll

        if improved:
            best_ll = val_ll; stale = 0
            best_epoch, best_auc = ep, val_auc
            best_state = {k: v.cpu().clone() for k,v in core.state_dict().items()}
            # Persist immediately so a crash or interrupt keeps the best weights.
            torch.save(best_state, ckpt_path)
        else:
            stale += 1

        history.append({"epoch": ep, "train_ll": train_ll, "val_ll": val_ll, "val_auc": val_auc})
        print(f"Epoch {ep:02d} | train_ll={train_ll:.4f} val_ll={val_ll:.4f} val_auc={val_auc:.4f} {'*BEST*' if improved else f'stale {stale}/{patience}'}")

        if stale >= patience:
            print("Early stopped.")
            break

    # Restore best model (already saved to disk when it was found)
    if best_state is not None:
        core.load_state_dict(best_state)

    pd.DataFrame(history).to_csv(f'{output_dir}/criteo_history.csv', index=False)

    # Final evaluation
    print("\nEvaluating on test set...")
    test_auc, test_ll = evaluate(model, te_dl)

    results = {
        "test_auc": float(test_auc),
        "test_logloss": float(test_ll),
        # Best values over all epochs; they may come from different epochs.
        "val_auc": float(max(h['val_auc'] for h in history)),
        "val_logloss": float(min(h['val_ll'] for h in history)),
        # Epoch and validation AUC of the checkpoint that was actually
        # restored and tested (selected by validation log loss).
        "best_epoch": best_epoch,
        "val_auc_at_best": None if best_auc is None else float(best_auc),
    }
    save_json(results, f'{output_dir}/criteo_results.json')

    print("\n" + "="*80)
    print("CRITEO FT-AFM RESULTS")
    print("="*80)
    print(f"Test AUC:     {test_auc:.4f}")
    print(f"Test LogLoss: {test_ll:.4f}")
    print(f"Saved to: {output_dir}/")

    return test_auc, test_ll

# ============== MAIN RUNNER ==============
def run_criteo_complete(
    data_path="/ctr_project/data/criteo/train.txt",
    preprocess_dir="criteo_preprocessed",
    output_dir="criteo_ft_afm_results",
    sample_size=None  # Set to e.g. 1000000 for testing
):
    """Run preprocessing followed by FT-AFM training.

    Args:
        data_path: Raw Criteo file handed to ``preprocess_criteo``.
        preprocess_dir: Directory the preprocessed arrays are written to and
            then read from by the training step.
        output_dir: Directory for the training outputs.
        sample_size: Optional row limit forwarded to ``preprocess_criteo``.

    Returns:
        Tuple ``(test_auc, test_ll)`` as returned by ``train_criteo_ft_afm``.
    """
    # Step 1: preprocessing. Its return values are not needed here because
    # the training step reads everything back from `preprocess_dir`.
    print("STEP 1: Preprocessing Criteo...")
    _cards, _numeric_names = preprocess_criteo(data_path, preprocess_dir, sample_size)

    # Step 2: training and test evaluation.
    print("\nSTEP 2: Training FT-AFM...")
    metrics = train_criteo_ft_afm(preprocess_dir, output_dir)
    test_auc, test_ll = metrics

    print("\n✅ CRITEO PIPELINE COMPLETE!")
    return test_auc, test_ll

# ============== USAGE ==============

# Full dataset (45M samples, ~8-10 hours on 4×A100)
# This call runs the whole pipeline as soon as the cell/module is executed.
# The data_path given here is relative to the working directory, whereas the
# function's default data_path is the absolute "/ctr_project/..." path.
run_criteo_complete(
    data_path="ctr_project/data/criteo/train.txt",
    preprocess_dir="criteo_preprocessed",
    output_dir="criteo_ft_afm_results"
)

# The triple-quoted string below is an inert expression statement: it keeps
# the 1M-row trial run disabled without deleting it.
"""
# Test on 1M samples first (~1 hour)
run_criteo_complete(
    data_path="ctr_project/data/criteo/train.txt",
    preprocess_dir="criteo_preprocessed_1m",
    output_dir="criteo_ft_afm_results_1m",
    sample_size=1000000
)
"""

# NOTE(review): this "ready" banner and the download instructions are printed
# only after the full run above has finished - confirm that ordering is intended.
print("="*80)
print("CRITEO PIPELINE READY")
print("="*80)
print("\nDownload Criteo from:")
print("  https://www.kaggle.com/datasets/mrkmakr/criteo-dataset")
print("  or: http://labs.criteo.com/2014/02/kaggle-display-advertising-challenge-dataset/")
print("\nPlace train.txt in: /ctr_project/data/criteo/")
print("\nThen run: run_criteo_complete()")

GPUs available: 4
STEP 1: Preprocessing Criteo...

CRITEO PREPROCESSING
Loading from: ctr_project/data/criteo/train.txt
Loaded 45,840,617 rows, 40 columns
CTR: 0.2562
Handling missing values...
Splitting data (8:1:1)...
Train: 36,672,493, Val: 4,584,062, Test: 4,584,062
Encoding categorical features...
  C1: 1444 categories
  C2: 553 categories
  C3: 157547 categories
  C4: 117728 categories
  C5: 306 categories
  C6: 19 categories
  C7: 11880 categories
  C8: 629 categories
  C9: 4 categories
  C10: 39495 categories
  C11: 5129 categories
  C12: 156812 categories
  C13: 3174 categories
  C14: 27 categories
  C15: 11053 categories
  C16: 149078 categories
  C17: 11 categories
  C18: 4546 categories
  C19: 1998 categories
  C20: 5 categories
  C21: 154814 categories
  C22: 18 categories
  C23: 16 categories
  C24: 53042 categories
  C25: 82 categories
  C26: 40957 categories
Engineering frequency features...
Target encoding...
Standardizing numerics...
Saving preprocessed data...
✅ Prep



Epoch 01 | train_ll=0.4579 val_ll=0.4500 val_auc=0.8029 *BEST*
Epoch 02 | train_ll=0.4461 val_ll=0.4455 val_auc=0.8060 *BEST*
Epoch 03 | train_ll=0.4415 val_ll=0.4446 val_auc=0.8076 *BEST*
Epoch 04 | train_ll=0.4377 val_ll=0.4448 val_auc=0.8082 stale 1/5
Epoch 05 | train_ll=0.4337 val_ll=0.4445 val_auc=0.8079 *BEST*
Epoch 06 | train_ll=0.4293 val_ll=0.4467 val_auc=0.8071 stale 1/5
Epoch 07 | train_ll=0.4240 val_ll=0.4547 val_auc=0.8042 stale 2/5
Epoch 08 | train_ll=0.4178 val_ll=0.4574 val_auc=0.8018 stale 3/5
Epoch 09 | train_ll=0.4109 val_ll=0.4665 val_auc=0.7982 stale 4/5
Epoch 10 | train_ll=0.4033 val_ll=0.4804 val_auc=0.7934 stale 5/5
Early stopped.

Evaluating on test set...


In [4]:
import zipfile
import os

# Archive to unpack and the directory its members are extracted into
# (both relative to the current working directory).
zip_path = "ctr_project/data/criteo/train.txt.zip"
extract_dir = "ctr_project/data/criteo"

# Diagnostic only: the result is printed, not acted on. Extraction below is
# attempted regardless of what this reports.
print("Exists?", os.path.exists(zip_path))

# Extract every member of the archive into extract_dir.
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_dir)

print("✅ Unzipped successfully!")


Exists? True
✅ Unzipped successfully!
