In [1]:
import pyarrow.parquet as pq
import pandas as pd
import numpy as np
import glob
import os
import gc
import xgboost as xgb
import mlflow
import mlflow.xgboost
import joblib
from sklearn.datasets import dump_svmlight_file

def read_parquet_in_chunks(file_path, columns=None, chunk_size=1_000_000):
    parquet_file = pq.ParquetFile(file_path)
    total_rows = parquet_file.metadata.num_rows
    for start_row in range(0, total_rows, chunk_size):
        end_row = min(start_row + chunk_size, total_rows)
        batch = parquet_file.read_row_group(0, columns=columns, use_threads=True).slice(start_row, end_row - start_row)
        yield batch.to_pandas()

# Initialize max_date
max_date = None
file_paths = sorted(glob.glob('Data/train.parquet/*/*.parquet'))

# Determine the maximum date_id across all files
for file_path in file_paths:
    table = pq.read_table(file_path, columns=['date_id'])
    date_ids = table['date_id'].to_pandas()
    file_max_date = date_ids.max()
    if max_date is None or file_max_date > max_date:
        max_date = file_max_date

# Subtract 200 to get the cutoff date
cutoff_date = max_date - 200

print(f"Max date_id: {max_date}")
print(f"Cutoff date_id: {cutoff_date}")

Max date_id: 1698
Cutoff date_id: 1498


In [2]:
# Initialize the LIBSVM files
train_libsvm_file = 'train.libsvm'
val_libsvm_file = 'val.libsvm'

# Define the window size for rolling mean
window_size = 800

# Process each Parquet file
for file_idx, file_path in enumerate(file_paths):
    print(f"Processing file {file_idx+1}/{len(file_paths)}: {file_path}")
    
    # Read the file in chunks
    for df_chunk in read_parquet_in_chunks(file_path, chunk_size=1_000_000):
        
        df_chunk = df_chunk.sort_values(['date_id', 'time_id'])
        
        # Ensure feature columns are float64 to avoid dtype issues
        df_chunk = df_chunk.astype('float64')

        feature_cols = [col for col in df_chunk.columns if col not in ['responder_6']]

        feature_names_pickle = 'feature_columns.pkl'
        pd.to_pickle(feature_cols, feature_names_pickle)
        
        # Split into train and validation based on cutoff_date
        train_chunk = df_chunk[df_chunk['date_id'] <= cutoff_date]
        val_chunk = df_chunk[df_chunk['date_id'] > cutoff_date]
        
        # Process training data
        if not train_chunk.empty:
            X_train = train_chunk[feature_cols]
            y_train = train_chunk['responder_6']

            # Impute NaN values using backward-looking rolling mean
            X_train_imputed = X_train.copy()
            for col in feature_cols:
                # Create a mask of NaN values
                na_mask = X_train_imputed[col].isna()
                if na_mask.any():
                    # Forward fill to handle initial NaNs
                    col_ffill = X_train_imputed[col].ffill()
                    # Compute rolling mean (looking back)
                    col_roll_mean = col_ffill.rolling(window=window_size, min_periods=1).mean()
                    # Ensure data types are consistent
                    col_roll_mean = col_roll_mean.astype(X_train_imputed[col].dtype)
                    # Fill NaNs in the original series with rolling mean values
                    X_train_imputed.loc[na_mask, col] = col_roll_mean[na_mask].astype(X_train_imputed[col].dtype)
                    # If there are still NaNs (e.g., at the start), fill with overall mean
                    if X_train_imputed[col].isna().any():
                        overall_mean = X_train_imputed[col].mean()
                        X_train_imputed[col] = X_train_imputed[col].fillna(overall_mean)
            # Convert target variable to float64
            y_train = y_train.astype('float64')

            # Ensure no remaining NaNs
            X_train_imputed = X_train_imputed.fillna(0)
            
            # Append to train LIBSVM file
            with open(train_libsvm_file, 'ab') as f_train:
                dump_svmlight_file(X_train_imputed, y_train, f_train, zero_based=True)

        # Process validation data
        if not val_chunk.empty:
            X_val = val_chunk[feature_cols]
            y_val = val_chunk['responder_6']

            # Impute NaN values using backward-looking rolling mean
            X_val_imputed = X_val.copy()
            for col in feature_cols:
                # Create a mask of NaN values
                na_mask = X_val_imputed[col].isna()
                if na_mask.any():
                    # Forward fill to handle initial NaNs
                    col_ffill = X_val_imputed[col].ffill()
                    # Compute rolling mean (looking back)
                    col_roll_mean = col_ffill.rolling(window=window_size, min_periods=1).mean()
                    # Ensure data types are consistent
                    col_roll_mean = col_roll_mean.astype(X_val_imputed[col].dtype)
                    # Fill NaNs in the original series with rolling mean values
                    X_val_imputed.loc[na_mask, col] = col_roll_mean[na_mask].astype(X_val_imputed[col].dtype)
                    # If there are still NaNs, fill with overall mean
                    if X_val_imputed[col].isna().any():
                        overall_mean = X_val_imputed[col].mean()
                        X_val_imputed[col] = X_val_imputed[col].fillna(overall_mean)
            # Convert target variable to float64
            y_val = y_val.astype('float64')

            # Ensure no remaining NaNs
            X_val_imputed = X_val_imputed.fillna(0)
            
            # Append to validation LIBSVM file
            with open(val_libsvm_file, 'ab') as f_val:
                dump_svmlight_file(X_val_imputed, y_val, f_val, zero_based=True)
        
        # Clean up
        del df_chunk
        gc.collect()

        # Clean up training variables
        if 'train_chunk' in locals():
            del train_chunk
        if 'X_train' in locals():
            del X_train, y_train, X_train_imputed

        # Clean up validation variables
        if 'val_chunk' in locals():
            del val_chunk
        if 'X_val' in locals():
            del X_val, y_val, X_val_imputed

        gc.collect()

Processing file 1/10: Data/train.parquet\partition_id=0\part-0.parquet
Processing file 2/10: Data/train.parquet\partition_id=1\part-0.parquet
Processing file 3/10: Data/train.parquet\partition_id=2\part-0.parquet
Processing file 4/10: Data/train.parquet\partition_id=3\part-0.parquet
Processing file 5/10: Data/train.parquet\partition_id=4\part-0.parquet
Processing file 6/10: Data/train.parquet\partition_id=5\part-0.parquet
Processing file 7/10: Data/train.parquet\partition_id=6\part-0.parquet
Processing file 8/10: Data/train.parquet\partition_id=7\part-0.parquet
Processing file 9/10: Data/train.parquet\partition_id=8\part-0.parquet
Processing file 10/10: Data/train.parquet\partition_id=9\part-0.parquet


In [3]:
os.rename(train_libsvm_file, 'train.libsvm.cache')
os.rename(val_libsvm_file, 'val.libsvm.cache')

In [4]:

mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment('XGBoost Out-of-Core Training 5')

params = {
    'max_depth': 6,
    'learning_rate': 0.0126,
    'subsample': 0.6919,
    'colsample_bytree': 0.6527,
    'gamma': 0.3388,
    'min_child_weight': 1,
    'reg_alpha': 0.1218,
    'reg_lambda': 2.7785,
    'objective': 'reg:squarederror',
    'tree_method': 'hist',
    'eval_metric': 'rmse',
    'n_jobs': -1,
    'verbosity': 1,
    'seed': 42
}

mlflow.xgboost.autolog(disable=True)

with mlflow.start_run(run_name='Training with Early Stopping'):
    
    mlflow.log_params(params)
    
    dtrain = xgb.DMatrix('train.libsvm.cache?format=libsvm')
    dval = xgb.DMatrix('val.libsvm.cache?format=libsvm')
    
    evals = [(dtrain, 'train'), (dval, 'validation')]
    
    # Train the model with early stopping
    model = xgb.train(
        params,
        dtrain,
        num_boost_round=1000,
        evals=evals,
        early_stopping_rounds=10,
        verbose_eval=10
    )
    
    # Log the best iteration
    mlflow.log_metric('best_iteration', model.best_iteration)
    
    # Save and log the model using joblib
    joblib.dump(model, 'model_with_early_stopping.joblib')
    mlflow.log_artifact('model_with_early_stopping.joblib')

    feature_names = pd.read_pickle('feature_columns.pkl')
    input_example = pd.DataFrame([[0]*len(feature_names)], columns=feature_names)
    
    # Log the model
    mlflow.xgboost.log_model(model, artifact_path='model', input_example=input_example)
    
    # Log feature importance
    importance = model.get_score(importance_type='gain')
    importance_df = pd.DataFrame({
        'feature': list(importance.keys()),
        'importance': list(importance.values())
    })
    importance_df.to_csv('feature_importance.csv', index=False)
    mlflow.log_artifact('feature_importance.csv')

2024/11/24 17:03:52 INFO mlflow.tracking.fluent: Experiment with name 'XGBoost Out-of-Core Training 5' does not exist. Creating a new experiment.


[0]	train-rmse:0.99396	validation-rmse:0.67732
[10]	train-rmse:0.93150	validation-rmse:0.63949
[20]	train-rmse:0.85972	validation-rmse:0.61806
[30]	train-rmse:0.80078	validation-rmse:0.61863
[32]	train-rmse:0.78740	validation-rmse:0.61907




Downloading artifacts:   0%|          | 0/7 [00:00<?, ?it/s]

2024/11/24 17:04:22 INFO mlflow.tracking._tracking_service.client: 🏃 View run Training with Early Stopping at: http://localhost:5000/#/experiments/9/runs/686af10ca2b44e83b4ddf8d2b3499d99.
2024/11/24 17:04:22 INFO mlflow.tracking._tracking_service.client: 🧪 View experiment at: http://localhost:5000/#/experiments/9.


In [5]:
best_iteration = model.best_iteration
# XGBoost uses 0-based indexing, so add 1 to get the total number of boosting rounds
optimal_num_boost_round = best_iteration + 1
print(f"Optimal number of boosting rounds: {optimal_num_boost_round}")

Optimal number of boosting rounds: 24
