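# NVIDIA Merlin (NVTabular) preprocessing + LightGBM ensemble
Complete pipeline with explicit memory management and debugging output:
- NVTabular preprocessing.
- Stratified-CV evaluation of a LightGBM GBDT/DART/GOSS ensemble.
- Submission generation.

The preprocessing was originally set up for XGBoost; the modeling cells use LightGBM.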

In [1]:
# Environment setup: quiet TensorFlow's C++ logging and Python warnings for the whole notebook
import os
import warnings

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'  # reduce TensorFlow C++ log noise
warnings.filterwarnings('ignore')

print("✅ Environment configured")

✅ Environment configured


In [2]:
# Required libraries and versions
required_libs = {
    'nvtabular': '23.08.00',
    'cudf': '23.10',      # Prefix match
    'cupy': '13.6',       # Prefix match  
    'xgboost': '3.0',     # Minimum version
    'dask': '2023.9',
    'pandas': '1.5',
    'numpy': '1.24',
    'scikit-learn': '1.7',
    'psutil': '5.9',      # 5.9.1 works fine (used in working code)
    'pyarrow': '12.0'     # 12.0.1 works fine (used in working code)
}

# Check installed versions
import importlib
import warnings

# Suppress deprecation warnings
with warnings.catch_warnings():
    warnings.simplefilter('ignore')
    try:
        import pkg_resources
    except:
        pkg_resources = None

missing_libs = []
all_good = True

for lib, required_version in required_libs.items():
    try:
        # Map library names for import
        import_name = lib
        if lib == 'scikit-learn':
            import_name = 'sklearn'
        
        # Check if library is installed
        module = importlib.import_module(import_name)
        
        # Get installed version
        try:
            if hasattr(module, '__version__'):
                installed_version = module.__version__
            elif pkg_resources:
                installed_version = pkg_resources.get_distribution(lib).version
            else:
                installed_version = 'unknown'
        except:
            installed_version = 'unknown'
        
        # Check version compatibility
        req_major = required_version.split('.')[0]
        inst_version_parts = installed_version.split('.')
        inst_major = inst_version_parts[0] if installed_version != 'unknown' else ''
        
        # More lenient version check
        if installed_version == 'unknown':
            print(f"⚠️  {lib:15} {installed_version:15} (required: ≥{required_version})")
        elif float(inst_major) >= float(req_major) if inst_major.isdigit() and req_major.isdigit() else installed_version.startswith(required_version[:3]):
            print(f"✅ {lib:15} {installed_version:15} (required: ≥{required_version})")
        else:
            print(f"⚠️  {lib:15} {installed_version:15} (required: ≥{required_version}) - but should work")
        
    except ImportError:
        missing_libs.append(lib)
        print(f"❌ {lib:15} NOT INSTALLED (required: ≥{required_version})")
        all_good = False

# Report
if missing_libs:
    print(f"\n❌ Missing libraries: {', '.join(missing_libs)}")
    print("Please install them using conda or pip")
elif all_good:
    print("\n✅ All required libraries are installed and compatible!")

✅ nvtabular       23.08.00        (required: ≥23.08.00)
✅ cudf            23.10.00        (required: ≥23.10)
✅ cupy            13.6.0          (required: ≥13.6)
✅ xgboost         3.0.5           (required: ≥3.0)
✅ dask            2023.9.2        (required: ≥2023.9)
✅ pandas          1.5.3           (required: ≥1.5)
✅ numpy           1.23.5          (required: ≥1.24)
✅ scikit-learn    1.7.2           (required: ≥1.7)
✅ psutil          5.9.8           (required: ≥5.9)
✅ pyarrow         12.0.1          (required: ≥12.0)

✅ All required libraries are installed and compatible!


In [3]:
# Core imports
import gc
import time
import numpy as np
import pandas as pd
import pyarrow.parquet as pq
import psutil

# GPU libraries
import cudf
import cupy as cp

# NVTabular
import nvtabular as nvt
from nvtabular import ops
from merlin.io import Dataset

# ML libraries
import xgboost as xgb
from sklearn.metrics import average_precision_score
from sklearn.model_selection import StratifiedKFold

print("✅ All libraries imported successfully")
# Report the versions of the two libraries this notebook's preprocessing was built around
for _lib_label, _lib_version in (("NVTabular", nvt.__version__), ("XGBoost", xgb.__version__)):
    print(f"{_lib_label} version: {_lib_version}")

✅ All libraries imported successfully
NVTabular version: 23.08.00
XGBoost version: 3.0.5


In [4]:
# Configuration (data paths and run options)
TRAIN_PATH = 'data/train.parquet'        # raw training data
OUTPUT_DIR = 'data/nvt_processed_final'  # NVTabular output, also holds the saved workflow
TEMP_DIR = '/tmp'                        # scratch location for intermediate files
N_FOLDS = 5                              # number of cross-validation folds
FORCE_REPROCESS = False                  # True -> rebuild processed data even if OUTPUT_DIR exists

_config_summary = (
    ("Input", TRAIN_PATH),
    ("Output", OUTPUT_DIR),
    ("Folds", N_FOLDS),
    ("Force reprocess", FORCE_REPROCESS),
)
print("📋 Configuration:")
for _setting, _value in _config_summary:
    print(f"   {_setting}: {_value}")

📋 Configuration:
   Input: data/train.parquet
   Output: data/nvt_processed_final
   Folds: 5
   Force reprocess: False


In [5]:
# Memory management functions
def print_memory(gpu_index=0):
    """Print host (CPU) RAM usage and GPU memory usage, and return the host usage percent.

    GPU numbers come from NVML via pynvml for device `gpu_index`. If pynvml is not
    installed, no GPU/driver is available, or the query fails, the GPU line reports
    0.0GB/0.0GB (best effort; never raises for GPU problems).

    Args:
        gpu_index: NVML device index to report (default 0, the previous hard-coded device).

    Returns:
        psutil.virtual_memory().percent for the host.
    """
    mem = psutil.virtual_memory()

    gpu_used = gpu_total = 0
    try:
        import pynvml
        pynvml.nvmlInit()
        try:
            handle = pynvml.nvmlDeviceGetHandleByIndex(gpu_index)
            gpu_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
            gpu_used = gpu_info.used / 1024**3
            gpu_total = gpu_info.total / 1024**3
        finally:
            # Pair every nvmlInit with nvmlShutdown so repeated calls do not keep NVML open
            pynvml.nvmlShutdown()
    except Exception:
        # Catch Exception (not a bare except) so Ctrl-C still interrupts the notebook
        gpu_used = gpu_total = 0

    print(f"💾 CPU: {mem.used/1024**3:.1f}GB/{mem.total/1024**3:.1f}GB ({mem.percent:.1f}%)")
    print(f"💾 GPU: {gpu_used:.1f}GB/{gpu_total:.1f}GB")
    return mem.percent

def clear_gpu_memory():
    """Release cached memory held by CuPy's default memory pools.

    The garbage collector runs first: CuPy arrays that are unreachable but not yet
    collected still own their pool blocks, so freeing the pool before collecting
    would leave those blocks cached. Both the device pool and the pinned (host)
    pool are then released. Only CuPy's pools are affected; memory held through
    other allocators is not touched here.
    """
    gc.collect()
    cp.get_default_memory_pool().free_all_blocks()
    cp.get_default_pinned_memory_pool().free_all_blocks()
    print("🧹 GPU memory cleared")

# Test memory functions
print("Testing memory functions:")
print_memory()
clear_gpu_memory()

Testing memory functions:
💾 CPU: 9.5GB/251.6GB (5.5%)
💾 GPU: 0.6GB/24.0GB
🧹 GPU memory cleared


In [6]:
# Metric functions
def calculate_weighted_logloss(y_true, y_pred, eps=1e-15):
    """Log loss with equal (50:50) weight on the two classes.

    The mean log loss is computed separately over the negative rows (y_true == 0)
    and the positive rows (y_true == 1), and the two means are averaged. Both
    classes therefore contribute equally regardless of label imbalance.

    Args:
        y_true: array-like of 0/1 labels (list, NumPy array, pandas Series, ...).
        y_pred: array-like of predicted probabilities of class 1, same length.
        eps: predictions are clipped to [eps, 1 - eps] so log() stays finite.

    Returns:
        float. A class with no rows contributes 0 to its half of the average.
    """
    y_true = np.asarray(y_true)
    # Cast to float64 before clipping: in float32, 1 - 1e-15 rounds to 1.0, so a
    # prediction of 1.0 on a negative row would give log(0) = -inf.
    y_pred = np.clip(np.asarray(y_pred, dtype=np.float64), eps, 1 - eps)

    mask_0 = y_true == 0
    mask_1 = y_true == 1

    ll_0 = -np.mean(np.log(1 - y_pred[mask_0])) if mask_0.any() else 0
    ll_1 = -np.mean(np.log(y_pred[mask_1])) if mask_1.any() else 0

    return 0.5 * ll_0 + 0.5 * ll_1

def calculate_competition_score(y_true, y_pred):
    """Competition metric: 0.5 * AP + 0.5 * 1 / (1 + WLL).

    AP is sklearn's average_precision_score; WLL is calculate_weighted_logloss.

    Returns:
        (score, ap, wll)
    """
    ap = average_precision_score(y_true, y_pred)
    wll = calculate_weighted_logloss(y_true, y_pred)
    wll_term = 1 / (1 + wll)
    return 0.5 * ap + 0.5 * wll_term, ap, wll

print("✅ Metric functions defined")

✅ Metric functions defined


In [7]:
def create_workflow():
    """Build the (unfitted) NVTabular workflow used before the tree models.

    - 5 categorical columns are integer-encoded with Categorify
      (freq_threshold=0, max_size=50000).
    - 112 continuous columns only have missing values filled with 0; no
      normalization is applied because tree models do not need it.
    - The 'clicked' label is passed through unchanged.

    Returns:
        nvt.Workflow
    """
    print("\n🔧 Creating XGBoost-optimized workflow...")

    categorical_cols = ['gender', 'age_group', 'inventory_id', 'day_of_week', 'hour']

    # (prefix, count) expands to prefix_1 .. prefix_count, in this exact order
    continuous_spec = [
        ('feat_a', 18),
        ('feat_b', 6),
        ('feat_c', 8),
        ('feat_d', 6),
        ('feat_e', 10),
        ('history_a', 7),
        ('history_b', 30),
        ('l_feat', 27),
    ]
    continuous_cols = [
        f'{prefix}_{idx}'
        for prefix, count in continuous_spec
        for idx in range(1, count + 1)
    ]

    n_categorical = len(categorical_cols)
    n_continuous = len(continuous_cols)
    print(f"   Categorical: {n_categorical} columns")
    print(f"   Continuous: {n_continuous} columns")
    print(f"   Total features: {n_categorical + n_continuous}")

    encoded_categoricals = categorical_cols >> ops.Categorify(freq_threshold=0, max_size=50000)
    filled_continuous = continuous_cols >> ops.FillMissing(fill_val=0)

    workflow = nvt.Workflow(encoded_categoricals + filled_continuous + ['clicked'])

    print("   ✅ Workflow created (no normalization for tree models)")
    return workflow

# Test workflow creation
test_workflow = create_workflow()
print("✅ Workflow creation tested successfully")


🔧 Creating XGBoost-optimized workflow...
   Categorical: 5 columns
   Continuous: 112 columns
   Total features: 117
   ✅ Workflow created (no normalization for tree models)
✅ Workflow creation tested successfully


In [8]:
def process_data():
    """Run NVTabular preprocessing on TRAIN_PATH and write the result to OUTPUT_DIR.

    Steps:
      1. Reuse OUTPUT_DIR when it exists, FORCE_REPROCESS is False, it opens as a
         parquet Dataset, and it contains the saved workflow.
      2. Otherwise write a copy of TRAIN_PATH without the 'seq' column to
         TEMP_DIR. An existing copy is reused unless FORCE_REPROCESS is True.
      3. Fit the workflow from create_workflow(), transform the data, write
         parquet to OUTPUT_DIR, and save the fitted workflow to OUTPUT_DIR/workflow.

    Returns:
        OUTPUT_DIR

    Raises:
        Re-raises any error from transform/save after deleting the partial OUTPUT_DIR.
    """
    import shutil

    print("\n" + "="*70)
    print("🚀 NVTabular Data Processing")
    print("="*70)

    workflow_path = f'{OUTPUT_DIR}/workflow'

    # Check if already processed. Output only counts as complete if the saved workflow
    # is present too; the submission step loads it from OUTPUT_DIR/workflow.
    if os.path.exists(OUTPUT_DIR) and not FORCE_REPROCESS:
        try:
            Dataset(OUTPUT_DIR, engine='parquet')
            if not os.path.isdir(workflow_path):
                raise FileNotFoundError(f"missing saved workflow at {workflow_path}")
            print(f"✅ Using existing processed data from {OUTPUT_DIR}")
            return OUTPUT_DIR
        except Exception as e:
            print(f"⚠️ Existing data corrupted, reprocessing... ({e})")
            shutil.rmtree(OUTPUT_DIR)

    # Clear existing if needed (FORCE_REPROCESS)
    if os.path.exists(OUTPUT_DIR):
        print(f"🗑️ Removing existing directory {OUTPUT_DIR}")
        shutil.rmtree(OUTPUT_DIR)

    start_time = time.time()
    initial_mem = print_memory()

    # Prepare data without 'seq' column
    os.makedirs(TEMP_DIR, exist_ok=True)
    temp_path = f'{TEMP_DIR}/train_no_seq.parquet'
    if FORCE_REPROCESS and os.path.exists(temp_path):
        # A forced reprocess must not silently reuse a stale copy of the input
        print(f"🗑️ Removing existing temp file {temp_path}")
        os.remove(temp_path)

    if not os.path.exists(temp_path):
        print("\n📋 Creating temp file without 'seq' column...")
        pf = pq.ParquetFile(TRAIN_PATH)
        cols = [c for c in pf.schema.names if c != 'seq']
        print(f"   Total columns: {len(pf.schema.names)}")
        print(f"   Using columns: {len(cols)} (excluded 'seq')")

        df = pd.read_parquet(TRAIN_PATH, columns=cols)
        print(f"   Loaded {len(df):,} rows")
        # Write to a side file and rename it, so an interrupted write never leaves a
        # truncated temp file that the next run would pick up as valid.
        partial_path = temp_path + '.partial'
        df.to_parquet(partial_path, index=False)
        os.replace(partial_path, temp_path)
        del df
        gc.collect()
        print("   ✅ Temp file created")
    else:
        print(f"✅ Using existing temp file: {temp_path}")

    # Create dataset with small partitions
    print("\n📦 Creating NVTabular Dataset...")
    print("   Using 32MB partitions for memory efficiency")
    clear_gpu_memory()

    dataset = Dataset(
        temp_path,
        engine='parquet',
        part_size='32MB'  # change size based on your environment
    )
    print("   ✅ Dataset created")

    # Create and fit workflow
    print("\n📊 Fitting workflow...")
    workflow = create_workflow()
    workflow.fit(dataset)
    print("   ✅ Workflow fitted")

    # Transform and save
    print(f"\n💾 Transforming and saving to {OUTPUT_DIR}...")
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    clear_gpu_memory()

    try:
        workflow.transform(dataset).to_parquet(
            output_path=OUTPUT_DIR,
            shuffle=nvt.io.Shuffle.PER_PARTITION,
            out_files_per_proc=8
        )

        workflow.save(workflow_path)
        print(f"   ✅ Data processed and saved")
        print(f"   ✅ Workflow saved to {workflow_path}")

    except Exception as e:
        print(f"❌ Error during processing: {e}")
        # Remove partial output so the next run does not treat it as a valid cache
        if os.path.exists(OUTPUT_DIR):
            shutil.rmtree(OUTPUT_DIR)
        raise

    elapsed = time.time() - start_time
    final_mem = print_memory()

    print(f"\n✅ Processing complete!")
    print(f"   Time: {elapsed:.1f}s")
    # Signed format: usage can also go down, which used to print as "+-x%"
    print(f"   Memory change: {final_mem - initial_mem:+.1f}%")

    clear_gpu_memory()
    return OUTPUT_DIR

# Process data
processed_dir = process_data()


🚀 NVTabular Data Processing
✅ Using existing processed data from data/nvt_processed_final


In [None]:
import lightgbm as lgb
import numpy as np
from sklearn.model_selection import StratifiedKFold
import time

def _lgbm_boosting_configs(scale_pos_weight):
    """Return {boosting_type: LightGBM params} for the GBDT / DART / GOSS ensemble members."""
    shared = {
        'objective': 'binary',
        'metric': 'binary_logloss',
        'device_type': 'cpu',
        'max_depth': 8,
        'learning_rate': 0.1,
        'feature_fraction': 0.8,
        'scale_pos_weight': scale_pos_weight,
        'verbose': -1,
        'seed': 42,
        'num_leaves': 255,
        'min_data_in_leaf': 100,
    }
    # Row bagging for GBDT/DART only; GOSS is configured with top_rate/other_rate instead
    bagging = {'bagging_fraction': 0.8, 'bagging_freq': 5}
    return {
        'gbdt': {**shared, **bagging, 'boosting_type': 'gbdt'},
        'dart': {**shared, **bagging, 'boosting_type': 'dart',
                 'drop_rate': 0.1, 'max_drop': 50, 'skip_drop': 0.5},
        'goss': {**shared, 'boosting_type': 'goss', 'top_rate': 0.2, 'other_rate': 0.1},
    }


def run_cv_lgbm_ensemble(processed_dir, n_folds=5):
    """Stratified K-fold CV of a LightGBM ensemble (GBDT, DART, GOSS, simple average).

    Loads `processed_dir` with merlin.io.Dataset and uses 'clicked' as the label.
    All features are cast to float32. In every fold, one model per boosting type is
    trained for up to 200 rounds, with early stopping after 20 rounds without
    validation-logloss improvement. Each model and the mean of their predictions
    are scored with calculate_competition_score.

    Args:
        processed_dir: NVTabular output directory (parquet).
        n_folds: number of StratifiedKFold splits (shuffle=True, random_state=42).

    Returns:
        (ensemble_results, all_results):
          - ensemble_results holds per-fold lists 'scores', 'ap' and 'wll'.
          - all_results maps boosting type to the same lists plus 'predictions'
            and 'best_iterations'.
        Returns (None, None) when the data cannot be loaded or has a single class,
        so callers unpacking two values do not crash.
    """
    print("\n" + "="*70)
    print("🔄 LightGBM Boosting Type Ensemble Cross-Validation")
    print("="*70)

    # Load processed data
    print("\n📦 Loading processed data...")
    start_load = time.time()

    try:
        dataset = Dataset(processed_dir, engine='parquet', part_size='256MB')
        print("   Converting to GPU DataFrame...")
        gdf = dataset.to_ddf().compute()
        print(f"   ✅ Loaded {len(gdf):,} rows x {len(gdf.columns)} columns")
        print(f"   Time: {time.time() - start_load:.1f}s")
    except Exception as e:
        print(f"❌ Error loading data: {e}")
        # Keep the two-element return shape; a bare None breaks `a, b = run_cv_...()`
        return None, None

    print_memory()

    # Prepare data
    print("\n📊 Preparing data for LightGBM...")
    y = gdf['clicked'].to_numpy()
    X = gdf.drop('clicked', axis=1)

    # Convert to float32
    for col in X.columns:
        if X[col].dtype != 'float32':
            X[col] = X[col].astype('float32')

    X_np = X.to_numpy()
    print(f"   Shape: {X_np.shape}")
    print(f"   Features: {X.shape[1]}")
    print(f"   Samples: {X.shape[0]:,}")

    # Class distribution
    pos_ratio = y.mean()
    print(f"\n📊 Class distribution:")
    print(f"   Positive ratio: {pos_ratio:.4f}")
    if not 0 < pos_ratio < 1:
        # scale_pos_weight would divide by zero and StratifiedKFold needs both classes
        print("❌ 'clicked' contains a single class; cannot run stratified CV")
        return None, None
    scale_pos_weight = (1 - pos_ratio) / pos_ratio
    print(f"   Scale pos weight: {scale_pos_weight:.2f}")

    del X, gdf
    clear_gpu_memory()

    boosting_configs = _lgbm_boosting_configs(scale_pos_weight)

    # Cross-validation
    print("\n🔄 Starting cross-validation...")
    skf = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=42)

    all_results = {
        boosting_type: {'scores': [], 'ap': [], 'wll': [], 'predictions': [], 'best_iterations': []}
        for boosting_type in boosting_configs
    }
    ensemble_results = {'scores': [], 'ap': [], 'wll': []}

    for fold, (train_idx, val_idx) in enumerate(skf.split(X_np, y), 1):
        print(f"\n📍 Fold {fold}/{n_folds}")
        fold_start = time.time()

        print(f"   Train: {len(train_idx):,} | Val: {len(val_idx):,}")

        # One Dataset pair per fold, shared by all boosting types
        train_data = lgb.Dataset(X_np[train_idx], label=y[train_idx])
        val_data = lgb.Dataset(X_np[val_idx], label=y[val_idx], reference=train_data)
        y_val = y[val_idx]

        fold_predictions = {}

        for boosting_type, params in boosting_configs.items():
            print(f"   🚀 Training {boosting_type.upper()}...")
            boosting_start = time.time()

            model = lgb.train(
                params,
                train_data,
                num_boost_round=200,
                valid_sets=[val_data],
                callbacks=[
                    lgb.early_stopping(20, verbose=False),
                    lgb.log_evaluation(0)  # No verbose output
                ]
            )

            # NOTE(review): LightGBM documents early stopping as unavailable for DART.
            # best_iteration then stays 0, and num_iteration <= 0 predicts with all
            # rounds. Confirm this with the installed version.
            y_pred = model.predict(X_np[val_idx], num_iteration=model.best_iteration)
            fold_predictions[boosting_type] = y_pred

            score, ap, wll = calculate_competition_score(y_val, y_pred)

            results = all_results[boosting_type]
            results['scores'].append(score)
            results['ap'].append(ap)
            results['wll'].append(wll)
            results['predictions'].append(y_pred)
            results['best_iterations'].append(model.best_iteration)

            print(f"      {boosting_type.upper()} - Score: {score:.6f}, AP: {ap:.6f}, WLL: {wll:.6f}")
            print(f"      Best iteration: {model.best_iteration}, Time: {time.time() - boosting_start:.1f}s")

            del model
            clear_gpu_memory()

        # Ensemble prediction: unweighted mean of the member probabilities
        print("   🎯 Creating ensemble...")
        ensemble_pred = np.mean([fold_predictions[bt] for bt in boosting_configs], axis=0)

        ensemble_score, ensemble_ap, ensemble_wll = calculate_competition_score(y_val, ensemble_pred)

        ensemble_results['scores'].append(ensemble_score)
        ensemble_results['ap'].append(ensemble_ap)
        ensemble_results['wll'].append(ensemble_wll)

        print(f"   🏆 ENSEMBLE - Score: {ensemble_score:.6f}, AP: {ensemble_ap:.6f}, WLL: {ensemble_wll:.6f}")
        print(f"   ⏱️ Total fold time: {time.time() - fold_start:.1f}s")

        # Cleanup
        del train_data, val_data, fold_predictions
        clear_gpu_memory()

    # Final results
    print("\n" + "="*80)
    print("📊 Final Cross-Validation Results")
    print("="*80)

    print("\n📈 Individual Boosting Type Results:")
    for boosting_type in boosting_configs:
        results = all_results[boosting_type]
        print(f"\n🔹 {boosting_type.upper()}:")
        print(f"   Competition Score: {np.mean(results['scores']):.6f} ± {np.std(results['scores']):.6f}")
        print(f"   Average Precision: {np.mean(results['ap']):.6f} ± {np.std(results['ap']):.6f}")
        print(f"   Weighted LogLoss:  {np.mean(results['wll']):.6f} ± {np.std(results['wll']):.6f}")
        print(f"   All scores: {[f'{s:.6f}' for s in results['scores']]}")
        print(f"   Best iterations: {results['best_iterations']}")

    print(f"\n🏆 ENSEMBLE RESULTS:")
    print(f"   Competition Score: {np.mean(ensemble_results['scores']):.6f} ± {np.std(ensemble_results['scores']):.6f}")
    print(f"   Average Precision: {np.mean(ensemble_results['ap']):.6f} ± {np.std(ensemble_results['ap']):.6f}")
    print(f"   Weighted LogLoss:  {np.mean(ensemble_results['wll']):.6f} ± {np.std(ensemble_results['wll']):.6f}")
    print(f"   All scores: {[f'{s:.6f}' for s in ensemble_results['scores']]}")

    print(f"\n📊 Performance Summary:")
    best_individual = max(boosting_configs, key=lambda bt: np.mean(all_results[bt]['scores']))
    best_individual_score = np.mean(all_results[best_individual]['scores'])
    ensemble_score = np.mean(ensemble_results['scores'])

    print(f"   Best individual: {best_individual.upper()} ({best_individual_score:.6f})")
    print(f"   Ensemble score: {ensemble_score:.6f}")
    print(f"   Improvement: {ensemble_score - best_individual_score:+.6f}")

    return ensemble_results, all_results

def train_final_ensemble_models(processed_dir, boosting_configs, num_rounds=200):
    """Train one LightGBM model per entry of `boosting_configs` on the full processed data.

    Args:
        processed_dir: NVTabular output directory (parquet with a 'clicked' label column).
        boosting_configs: {name: LightGBM params}.
        num_rounds: boosting rounds per model (default 200, the previous fixed value).

    Returns:
        {name: trained lgb.Booster}
    """
    print("\n" + "="*70)
    print("🎯 Training Final Ensemble Models")
    print("="*70)

    # Load data (same preparation as the CV run: float32 features, 'clicked' label)
    dataset = Dataset(processed_dir, engine='parquet', part_size='256MB')
    gdf = dataset.to_ddf().compute()

    y = gdf['clicked'].to_numpy()
    X = gdf.drop('clicked', axis=1)

    for col in X.columns:
        if X[col].dtype != 'float32':
            X[col] = X[col].astype('float32')

    X_np = X.to_numpy()
    del X, gdf
    clear_gpu_memory()

    final_models = {}
    train_data = lgb.Dataset(X_np, label=y)

    for boosting_type, params in boosting_configs.items():
        print(f"\n🚀 Training final {boosting_type.upper()} model...")
        start_time = time.time()

        model = lgb.train(
            params,
            train_data,
            num_boost_round=num_rounds,
            callbacks=[lgb.log_evaluation(0)]
        )

        final_models[boosting_type] = model
        print(f"   ✅ Completed in {time.time() - start_time:.1f}s")

    del train_data
    return final_models

# Run cross-validation with ensemble
print("Starting LightGBM ensemble cross-validation...")
cv_output = run_cv_lgbm_ensemble(processed_dir, N_FOLDS)
# run_cv_lgbm_ensemble can return a bare None when loading fails; normalize to a pair
ensemble_results, individual_results = cv_output if cv_output is not None else (None, None)

# Train final models for inference.
# The CV configs weight positives with scale_pos_weight = (1 - p) / p. The final models
# must use the same weighting, or they are trained differently from what was validated.
label_mean = (
    Dataset(processed_dir, engine='parquet')
    .to_ddf(columns=['clicked'])['clicked']
    .mean()
    .compute()
)
pos_ratio = float(label_mean)
scale_pos_weight = (1 - pos_ratio) / pos_ratio

shared_params = {
    'objective': 'binary',
    'metric': 'binary_logloss',
    'device_type': 'cpu',
    'max_depth': 8,
    'learning_rate': 0.1,
    'feature_fraction': 0.8,
    'scale_pos_weight': scale_pos_weight,
    'verbose': -1,
    'seed': 42,
    'num_leaves': 255,
    'min_data_in_leaf': 100,
}
bagging_params = {'bagging_fraction': 0.8, 'bagging_freq': 5}  # not used with GOSS

boosting_configs = {
    'gbdt': {**shared_params, **bagging_params, 'boosting_type': 'gbdt'},
    'dart': {**shared_params, **bagging_params, 'boosting_type': 'dart',
             'drop_rate': 0.1, 'max_drop': 50, 'skip_drop': 0.5},
    'goss': {**shared_params, 'boosting_type': 'goss', 'top_rate': 0.2, 'other_rate': 0.1},
}

final_models = train_final_ensemble_models(processed_dir, boosting_configs)

Starting LightGBM ensemble cross-validation...

🔄 LightGBM Boosting Type Ensemble Cross-Validation

📦 Loading processed data...
   Converting to GPU DataFrame...
   ✅ Loaded 10,704,179 rows x 118 columns
   Time: 1.8s
💾 CPU: 19.5GB/251.6GB (9.5%)
💾 GPU: 5.8GB/24.0GB

📊 Preparing data for LightGBM...
   Shape: (10704179, 117)
   Features: 117
   Samples: 10,704,179

📊 Class distribution:
   Positive ratio: 0.0191
   Scale pos weight: 51.43

🔄 Starting cross-validation...

📍 Fold 1/5
   Train: 8,563,343 | Val: 2,140,836
   🚀 Training GBDT...


In [None]:
# Summarize the cross-validation run (ensemble_results is None or empty when CV failed)
if ensemble_results and ensemble_results.get('scores'):
    cv_scores = ensemble_results['scores']
    print("\n" + "🎉"*35)
    print("COMPLETE!")
    print("🎉"*35)
    print(f"\n✅ Final CV Score: {np.mean(cv_scores):.6f} ± {np.std(cv_scores):.6f}")
    print("✅ Full dataset processed (10.7M rows)")
    print("✅ LightGBM ensemble (GBDT+DART+GOSS) on NVTabular-preprocessed data")
    print("✅ Memory-efficient with small partitions")
    print("="*70)
else:
    print("\n⚠️ Cross-validation did not complete. Please check for errors above.")

# Final cleanup
clear_gpu_memory()
print("\n🧹 Final cleanup complete")
# Assign the result so Jupyter does not echo the bare percentage as the cell output
mem_percent = print_memory()


🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉
COMPLETE!
🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉🎉

✅ Final CV Score: 0.353147 ± 0.000375
✅ Full dataset processed (10.7M rows)
✅ XGBoost-optimized preprocessing (no normalization)
✅ Memory-efficient with small partitions
🧹 GPU memory cleared

🧹 Final cleanup complete
💾 CPU: 9.7GB/251.6GB (5.6%)
💾 GPU: 0.6GB/24.0GB


5.6

In [11]:
# Configuration for submission (paths only; no data is touched here)
OUTPUT_DIR = 'data/nvt_processed_final'      # processed training output (holds the workflow)
TEST_PATH = 'data/test.parquet'              # raw test data
WORKFLOW_PATH = f'{OUTPUT_DIR}/workflow'     # fitted NVTabular workflow
SUBMISSION_PATH = 'data/submission.csv'      # where the submission CSV is written
TEMP_TEST_DIR = 'tmp/nvt_processed_test'     # scratch dir for the transformed test data

_submission_settings = (
    ("Test data", TEST_PATH),
    ("Workflow", WORKFLOW_PATH),
    ("Submission file", SUBMISSION_PATH),
    ("Temp test dir", TEMP_TEST_DIR),
)
print("📋 Submission Configuration:")
for _label, _path in _submission_settings:
    print(f"   {_label}: {_path}")

📋 Submission Configuration:
   Test data: data/test.parquet
   Workflow: data/nvt_processed_final/workflow
   Submission file: data/submission.csv
   Temp test dir: tmp/nvt_processed_test


In [None]:
def train_final_ensemble_models(processed_dir, boosting_configs, num_rounds=200):
    """
    Train the final LightGBM ensemble models on the entire processed training dataset.

    Args:
        processed_dir: NVTabular output directory (parquet with a 'clicked' label column).
        boosting_configs: {name: LightGBM params}; one model is trained per entry.
        num_rounds: boosting rounds per model.

    Returns:
        {name: trained lgb.Booster}
    """
    print("\n" + "="*70)
    print("🚀 Training Final Ensemble Models on Full Data")
    print("="*70)

    # 1. Load full training data
    print("\n📦 Loading processed training data...")
    start_load = time.time()
    dataset = Dataset(processed_dir, engine='parquet', part_size='256MB')
    gdf = dataset.to_ddf().compute()
    print(f"   ✅ Loaded {len(gdf):,} rows in {time.time() - start_load:.1f}s")
    print_memory()

    # 2. Prepare data
    print("\n📊 Preparing training data...")
    y = gdf['clicked'].to_numpy()
    X = gdf.drop('clicked', axis=1)

    # Ensure float32 for LightGBM (same preparation as in CV)
    for col in X.columns:
        if X[col].dtype != 'float32':
            X[col] = X[col].astype('float32')

    X_np = X.to_numpy()
    train_data = lgb.Dataset(X_np, label=y)
    print("   ✅ Full training dataset created.")

    del gdf, X
    clear_gpu_memory()

    # 3. Train ensemble models
    print(f"\n💪 Training {len(boosting_configs)} models for {num_rounds} rounds...")
    final_models = {}

    for boosting_type, params in boosting_configs.items():
        print(f"\n🚀 Training {boosting_type.upper()} model...")
        start_train = time.time()

        model = lgb.train(
            params,
            train_data,
            num_boost_round=num_rounds,
            # log_evaluation only prints metrics of valid_sets. With no valid_sets
            # nothing was logged, so evaluate the training data itself to get progress.
            valid_sets=[train_data],
            callbacks=[lgb.log_evaluation(50)]  # Show training logloss every 50 rounds
        )

        final_models[boosting_type] = model
        print(f"   ✅ {boosting_type.upper()} model trained in {time.time() - start_train:.1f}s")

    del train_data
    clear_gpu_memory()

    return final_models

# Final-model configuration: same hyperparameters and class weighting as the CV run.
# The positive rate comes from the processed labels instead of a rounded constant
# copied from the CV log.
label_mean = (
    Dataset(processed_dir, engine='parquet')
    .to_ddf(columns=['clicked'])['clicked']
    .mean()
    .compute()
)
pos_ratio = float(label_mean)
scale_pos_weight = (1 - pos_ratio) / pos_ratio
print(f"Positive ratio: {pos_ratio:.4f} -> scale_pos_weight: {scale_pos_weight:.2f}")

shared_params = {
    'objective': 'binary',
    'metric': 'binary_logloss',
    'device_type': 'cpu',
    'max_depth': 8,
    'learning_rate': 0.1,
    'feature_fraction': 0.8,
    'scale_pos_weight': scale_pos_weight,
    'verbosity': 1,  # Set to 1 to see training progress
    'seed': 42,
    'num_leaves': 255,
    'min_data_in_leaf': 100,
    'num_threads': -1,
}
bagging_params = {'bagging_fraction': 0.8, 'bagging_freq': 5}  # not used with GOSS

boosting_configs = {
    'gbdt': {**shared_params, **bagging_params, 'boosting_type': 'gbdt'},
    'dart': {**shared_params, **bagging_params, 'boosting_type': 'dart',
             'drop_rate': 0.1, 'max_drop': 50, 'skip_drop': 0.5},
    'goss': {**shared_params, 'boosting_type': 'goss', 'top_rate': 0.2, 'other_rate': 0.1},
}

# 200 is the round cap used in CV (num_boost_round=200 with early stopping). CV did not
# show it is optimal; per-fold best iterations from CV could be used here instead.
FINAL_NUM_ROUNDS = 200
final_models = train_final_ensemble_models(processed_dir, boosting_configs, num_rounds=FINAL_NUM_ROUNDS)

In [None]:
import pandas as pd
import cudf
import dask.dataframe as dd
import lightgbm as lgb
import nvtabular as nvt
from merlin.io import Dataset
import numpy as np
import os
import shutil
import time
import gc
import cupy as cp

# Notebook helper (quiet variant: no print)
def clear_gpu_memory():
    """Release cached memory held by CuPy's default memory pools.

    gc.collect() runs first so that unreachable-but-uncollected CuPy arrays give
    their blocks back to the pool before the unused blocks are freed. The device
    pool and the pinned host pool are both released.
    """
    gc.collect()
    cp.get_default_memory_pool().free_all_blocks()
    cp.get_default_pinned_memory_pool().free_all_blocks()

def create_submission_ensemble(final_models, workflow_path, test_path, submission_path, temp_dir,
                               sample_submission_path='sample_submission.csv'):
    """
    Preprocess the test data with the saved workflow, predict with the LightGBM ensemble
    (unweighted mean of member probabilities), and write the submission CSV.

    Predictions are matched to submission rows through the test file's 'ID' column when
    it exists. Test rows are not reordered. The previous version sorted them by 'seq',
    a column the training pipeline drops, while sorting the submission by 'ID'; nothing
    guaranteed the two orders agree, so predictions could be misaligned.

    Args:
        final_models: {name: trained lgb.Booster}.
        workflow_path: directory of the fitted NVTabular workflow.
        test_path: raw test parquet file.
        submission_path: output CSV path.
        temp_dir: scratch directory for the transformed test data (deleted and recreated).
        sample_submission_path: CSV with an 'ID' column in submission order.

    Returns:
        The submission DataFrame that was written.

    Raises:
        ValueError: if row counts or IDs do not line up with the sample submission.
    """
    print("\n" + "="*70)
    print("📄 제출 파일 생성 중 (LightGBM 앙상블 최종 버전)")
    print("="*70)

    # 1. Load the fitted workflow
    print(f"\n🔍 {workflow_path}에서 워크플로우 불러오는 중...")
    workflow = nvt.Workflow.load(workflow_path)
    print("   ✅ 워크플로우 불러오기 완료.")

    # 2. Read test data on CPU (pandas), keeping the file's row order and IDs
    print("\n🔧 CPU에서 테스트 데이터 로드 중...")
    test_df = pd.read_parquet(test_path)
    test_ids = test_df['ID'].to_numpy() if 'ID' in test_df.columns else None
    n_test_rows = len(test_df)

    # Dummy 'clicked' column so the workflow's label passthrough can run
    test_df['clicked'] = 0
    test_df['clicked'] = test_df['clicked'].astype('int8')

    # Wrapping the pandas DataFrame in a CPU Dataset avoids dtype issues
    test_dataset = Dataset(test_df, cpu=True)
    del test_df
    gc.collect()
    print("   ✅ 테스트 데이터 준비 완료.")

    # 3. Transform the test data
    print("\n💾 처리된 테스트 데이터 변환 및 저장 중...")
    if os.path.exists(temp_dir):
        shutil.rmtree(temp_dir)
    os.makedirs(temp_dir, exist_ok=True)
    # NOTE(review): ID matching below assumes shuffle=False keeps the input row order;
    # the row-count check afterwards only validates the size.
    workflow.transform(test_dataset).to_parquet(output_path=temp_dir, shuffle=False)
    print("   ✅ 테스트 데이터 처리 완료.")
    clear_gpu_memory()

    # 4. Load the transformed data through Dask/pandas, then convert to cuDF
    print("\n📦 예측을 위해 처리된 테스트 데이터 불러오는 중...")
    processed_pandas_df = dd.read_parquet(temp_dir).compute()
    processed_test_gdf = cudf.DataFrame.from_pandas(processed_pandas_df)
    del processed_pandas_df
    gc.collect()
    print(f"   ✅ {len(processed_test_gdf):,}개 테스트 행 불러오기 완료.")

    # Drop the dummy 'clicked' column
    feature_cols = [col for col in processed_test_gdf.columns if col != 'clicked']
    processed_test_gdf = processed_test_gdf[feature_cols]

    # Same float32 conversion as in training
    for col in processed_test_gdf.columns:
        if processed_test_gdf[col].dtype != 'float32':
            processed_test_gdf[col] = processed_test_gdf[col].astype('float32')

    test_X = processed_test_gdf.to_numpy()
    del processed_test_gdf
    clear_gpu_memory()

    if len(test_X) != n_test_rows:
        raise ValueError(f"Transformed test data has {len(test_X)} rows, expected {n_test_rows}")

    # 5. Ensemble prediction
    print(f"\n🧠 {len(final_models)}개 모델로 앙상블 예측 중...")
    ensemble_predictions = []

    for boosting_type, model in final_models.items():
        print(f"   🔮 {boosting_type.upper()} 모델 예측 중...")
        start_time = time.time()
        pred = model.predict(test_X)
        ensemble_predictions.append(pred)
        print(f"      ✅ {boosting_type.upper()} 예측 완료 ({time.time() - start_time:.1f}s)")

    print("   🎯 앙상블 평균 계산 중...")
    final_predictions = np.mean(ensemble_predictions, axis=0)
    print("   ✅ 앙상블 예측 완료.")

    del test_X, ensemble_predictions
    clear_gpu_memory()

    # 6. Build the submission, aligning predictions to sample-submission IDs
    print("\n✍️ 제출 파일 생성 중...")
    sub_df = pd.read_csv(sample_submission_path)
    if len(sub_df) != len(final_predictions):
        raise ValueError(
            f"Sample submission has {len(sub_df)} rows but {len(final_predictions)} predictions were made"
        )

    if test_ids is not None:
        pred_by_id = pd.Series(final_predictions, index=test_ids)
        if not pred_by_id.index.is_unique:
            raise ValueError("Duplicate IDs in test data")
        sub_df['clicked'] = sub_df['ID'].map(pred_by_id).to_numpy()
        if sub_df['clicked'].isna().any():
            raise ValueError("Some sample-submission IDs are missing from the test data")
    else:
        print("   ⚠️ 'ID' column not found in test data; assuming test rows follow sample submission order")
        sub_df['clicked'] = final_predictions

    sub_df.to_csv(submission_path, index=False)
    print(f"   ✅ 제출 파일 저장 완료: {submission_path}")
    print(f"   미리보기:\n{sub_df.head()}")

    # Ensemble prediction statistics
    print(f"\n📊 앙상블 예측 통계:")
    print(f"   평균: {final_predictions.mean():.6f}")
    print(f"   표준편차: {final_predictions.std():.6f}")
    print(f"   최솟값: {final_predictions.min():.6f}")
    print(f"   최댓값: {final_predictions.max():.6f}")

    return sub_df

# Run the submission step. Report the error, then re-raise so the cell visibly fails
# instead of looking successful without writing a submission.
try:
    create_submission_ensemble(final_models, WORKFLOW_PATH, TEST_PATH, SUBMISSION_PATH, TEMP_TEST_DIR)
except Exception as e:
    print(f"\n❌ 예기치 않은 오류가 발생했습니다: {e}")
    raise


📄 제출 파일 생성 중 (모든 문제 해결 최종 버전)

🔍 data/nvt_processed_final/workflow에서 워크플로우 불러오는 중...
   ✅ 워크플로우 불러오기 완료.

🔧 CPU에서 테스트 데이터 로드 및 정렬 중...


   ✅ 정렬된 테스트 데이터 준비 완료.

💾 처리된 테스트 데이터 변환 및 저장 중...
   ✅ 테스트 데이터 처리 완료.

📦 예측을 위해 처리된 테스트 데이터 불러오는 중...
   ✅ 1,527,298개 테스트 행 불러오기 완료.

🧠 테스트 데이터로 예측 중...
   ✅ 예측 완료.

✍️ 제출 파일 생성 중...
   ✅ 제출 파일 저장 완료: data/submission.csv
   미리보기:
             ID   clicked
0  TEST_0000000  0.623566
1  TEST_0000001  0.425492
2  TEST_0000002  0.717925
3  TEST_0000003  0.417693
4  TEST_0000004  0.870040
