# Anti-Cheat Pre-Filter System

In [None]:
!pip install kaggle optuna catboost

In [None]:
import os
import zipfile
import joblib
import pandas as pd
import numpy as np
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import StratifiedKFold
from sklearn.utils.class_weight import compute_class_weight
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import fbeta_score
import catboost as cb
import xgboost as xgb
import lightgbm as lgb
import optuna

try:
    from google.colab import files
    COLAB_ENV = True
except ImportError:
    COLAB_ENV = False

In [None]:
# Configuration
# Passed as `threshold` to remove_correlated_features: a feature is dropped when
# its absolute correlation with an earlier feature exceeds this value.
CORRELATION_THRESHOLD = 0.8
# Seed handed to the optimize_* functions (CV fold shuffling and model seeds).
RANDOM_STATE = 42
# Number of Optuna trials per model; every trial runs a 3-fold cross-validation.
N_TRIALS = 200
# Columns whose missing values are filled by the KNN imputer in clean_data.
KNN_COLS = ['kill_death_ratio', 'headshot_percentage', 'accuracy_score', 'damage_per_round']

## Data Loading Functions

In [None]:
def download_kaggle_data():
    """Download and extract Kaggle competition data."""
    if COLAB_ENV:
        try:
            uploaded = files.upload()
        except Exception as e:
            print(f"File upload failed: {e}")
    else:
        print("Running outside of Colab. Ensure kaggle.json is in ~/.kaggle/")

    if 'kaggle.json' in os.listdir('.'):
        !mkdir -p ~/.kaggle
        !mv kaggle.json ~/.kaggle/
        !chmod 600 ~/.kaggle/kaggle.json
    else:
        print("kaggle.json not found.")

    if not os.path.exists('cpe342-karena.zip'):
        print("Downloading data...")
        !kaggle competitions download -c cpe342-karena
    else:
        print("Data already downloaded.")

    if os.path.exists('cpe342-karena.zip'):
        print("Extracting data...")
        with zipfile.ZipFile('cpe342-karena.zip', 'r') as zip_ref:
            zip_ref.extractall('.')
        print("Data extracted.")

## Data Preprocessing Functions

In [None]:
def clean_data(df, knn_cols, knn_imputer=None, median_imputer=None, is_training=True):
    """Impute missing values, fitting the imputers in training mode.

    Training mode (``is_training=True``):
        * drops the ``id`` / ``player_id`` columns when present,
        * fits a fresh ``KNNImputer`` on ``knn_cols``,
        * fits a fresh median ``SimpleImputer`` on every other numeric
          feature column (not only those that contain NaNs in the training
          data), so that any of them can be imputed at inference time,
        * drops rows whose ``is_cheater`` label is missing.
        The ``knn_imputer`` / ``median_imputer`` arguments are ignored.

    Inference mode (``is_training=False``):
        * applies the fitted ``knn_imputer`` to ``knn_cols``,
        * applies the fitted ``median_imputer`` to exactly the columns it was
          fitted on (read from ``feature_names_in_``). Imputers without that
          attribute fall back to the columns that currently contain NaNs.
        Identifier columns are left in place.

    Args:
        df: Input frame; it is copied, never modified.
        knn_cols: Column names imputed with the KNN imputer.
        knn_imputer: Fitted KNN imputer (required for inference).
        median_imputer: Fitted median imputer (required for inference when
            there is anything for it to impute).
        is_training: Whether to fit new imputers or reuse the given ones.

    Returns:
        Tuple ``(cleaned_frame, knn_imputer, median_imputer)``.

    Raises:
        ValueError: In inference mode when a required imputer is ``None``.
    """
    target_col = 'is_cheater'
    df_clean = df.copy()

    if is_training:
        df_clean = df_clean.drop(['id', 'player_id'], axis=1, errors='ignore')
        knn_imputer = KNNImputer(n_neighbors=5)
        median_imputer = SimpleImputer(strategy='median')

        df_clean[knn_cols] = knn_imputer.fit_transform(df_clean[knn_cols])

        # Fit on all remaining numeric features: a column that is complete in
        # the training data may still have gaps in the test data.
        median_cols = [
            col for col in df_clean.select_dtypes(include='number').columns
            if col != target_col and col not in knn_cols
        ]
        if median_cols:
            df_clean[median_cols] = median_imputer.fit_transform(df_clean[median_cols])

        if target_col in df_clean.columns:
            df_clean = df_clean[df_clean[target_col].notna()].reset_index(drop=True)

        return df_clean, knn_imputer, median_imputer

    if knn_imputer is None:
        raise ValueError("clean_data(is_training=False) requires a fitted knn_imputer")
    df_clean[knn_cols] = knn_imputer.transform(df_clean[knn_cols])

    # The imputer must see the same columns, in the same order, as during fit.
    fitted_cols = getattr(median_imputer, 'feature_names_in_', None)
    if fitted_cols is not None:
        median_cols = list(fitted_cols)
    else:
        median_cols = [
            col for col in df_clean.columns[df_clean.isnull().any()]
            if col != target_col
        ]

    if median_cols:
        if median_imputer is None:
            raise ValueError("clean_data(is_training=False) requires a fitted median_imputer")
        df_clean[median_cols] = median_imputer.transform(df_clean[median_cols])

    return df_clean, knn_imputer, median_imputer

In [None]:
def engineer_features(df):
    """Return a copy of ``df`` extended with ten derived ratio/product columns.

    Added columns, in order: ``kill_efficiency``,
    ``headshot_ratio_to_accuracy``, ``reaction_accuracy_ratio``,
    ``damage_efficiency``, ``reports_per_day``, ``device_change_rate``,
    ``session_intensity``, ``performance_per_account_age``,
    ``input_to_accuracy_ratio`` and ``friendliness_ratio``.

    Divisions add a small epsilon (or 1 for the account age) to the
    denominator to avoid dividing by zero. The input frame is not modified.
    """
    eps = 1e-6
    out = df.copy()

    # Operands shared by several derived features.
    kd_ratio = out["kill_death_ratio"]
    accuracy = out["accuracy_score"]
    accuracy_eps = accuracy + eps
    age_plus_one = out["account_age_days"] + 1
    reports_per_day = out["reports_received"] / age_plus_one

    derived = {
        # Efficiency
        "kill_efficiency": kd_ratio * accuracy,
        "headshot_ratio_to_accuracy": out["headshot_percentage"] / accuracy_eps,
        "reaction_accuracy_ratio": accuracy / (out["reaction_time_ms"] + eps),
        "damage_efficiency": out["damage_per_round"] / (out["survival_time_avg"] + eps),
        # Stability
        "reports_per_day": reports_per_day,
        "device_change_rate": out["device_changes_count"] / age_plus_one,
        "session_intensity": out["sessions_per_day"] * out["avg_session_length_min"],
        # Behaviour
        "performance_per_account_age": (kd_ratio + accuracy + out["win_rate"]) / age_plus_one,
        "input_to_accuracy_ratio": out["input_consistency_score"] / accuracy_eps,
        "friendliness_ratio": (
            out["communication_rate"] * out["team_play_score"] / (reports_per_day + eps)
        ),
    }

    # Dict order is insertion order, so the columns are appended in the
    # sequence listed above.
    for name, values in derived.items():
        out[name] = values

    return out

In [None]:
def standardize_features(df, scaler=None, is_training=True):
    """Scale the float64/int64 feature columns of a copy of ``df``.

    The ``id``, ``player_id`` and ``is_cheater`` columns are never scaled.
    In training mode a new ``StandardScaler`` is fitted (any ``scaler``
    argument is ignored); otherwise the supplied ``scaler`` is applied.

    Returns:
        Tuple ``(scaled_frame, scaler)``.
    """
    non_feature_cols = {'id', 'player_id', 'is_cheater'}

    result = df.copy()
    candidates = result.select_dtypes(include=['float64', 'int64']).columns
    feature_cols = [name for name in candidates if name not in non_feature_cols]

    if is_training:
        scaler = StandardScaler()
        scaled_values = scaler.fit_transform(result[feature_cols])
    else:
        scaled_values = scaler.transform(result[feature_cols])

    result[feature_cols] = scaled_values
    return result, scaler

In [None]:
def remove_correlated_features(df, threshold=0.9):
    """Drop every feature that is highly correlated with an earlier feature.

    Only float64/int64 columns other than ``id``, ``player_id`` and
    ``is_cheater`` are considered. A column is dropped when its absolute
    correlation with any column that precedes it exceeds ``threshold``.

    Returns:
        Tuple ``(reduced_frame, dropped_column_names)``; ``df`` is untouched.
    """
    protected = ('id', 'player_id', 'is_cheater')
    candidates = [
        name
        for name in df.select_dtypes(include=['float64', 'int64']).columns
        if name not in protected
    ]

    abs_corr = df[candidates].corr().abs()
    # Keep only the entries strictly above the diagonal so each pair is
    # examined once and a column is never compared with itself.
    above_diagonal = np.triu(np.ones(abs_corr.shape, dtype=bool), k=1)
    upper = abs_corr.where(above_diagonal)

    exceeds = (upper > threshold).any(axis=0)
    features_to_drop = [name for name, flagged in exceeds.items() if flagged]

    return df.drop(columns=features_to_drop), features_to_drop

## Model Optimization Functions

In [None]:
def optimize_catboost(X, y, n_trials=50, random_state=42):
    """Tune CatBoost hyperparameters and the decision threshold with Optuna.

    Each trial is scored by the mean F2 score over a 3-fold stratified
    cross-validation, using balanced class weights.

    Args:
        X: Feature frame (indexed positionally with ``.iloc``).
        y: Binary target series aligned with ``X``.
        n_trials: Number of Optuna trials.
        random_state: Seed for the CV splitter, the model and the sampler.

    Returns:
        dict with the best searched hyperparameters, the tuned ``threshold``
        and the fixed settings the trials were run with (loss, seed, class
        weights). Returning the fixed settings lets the final model be
        trained exactly like the models the threshold was tuned on.
    """
    classes = np.unique(y)
    class_weights = compute_class_weight(class_weight='balanced', classes=classes, y=y)

    # Settings that are not searched but must also apply to the final model;
    # otherwise the tuned threshold would not match its probability scale.
    fixed_params = {
        "loss_function": "Logloss",
        "eval_metric": "Logloss",
        "random_seed": random_state,
        "class_weights": dict(zip(classes, class_weights)),
    }

    # The seeded splitter yields the same folds for every trial.
    kf = StratifiedKFold(n_splits=3, shuffle=True, random_state=random_state)

    def objective(trial):
        params = {
            "iterations": trial.suggest_int("iterations", 200, 800),
            "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3),
            "depth": trial.suggest_int("depth", 4, 12),
            "l2_leaf_reg": trial.suggest_float("l2_leaf_reg", 1e-3, 10.0, log=True),
            "bagging_temperature": trial.suggest_float("bagging_temperature", 0.0, 5.0),
            "border_count": trial.suggest_int("border_count", 32, 255),
            **fixed_params,
            "verbose": False,
        }

        threshold = trial.suggest_float("threshold", 0.05, 0.80)
        model = cb.CatBoostClassifier(**params)

        f2_scores = []
        for train_idx, valid_idx in kf.split(X, y):
            model.fit(X.iloc[train_idx], y.iloc[train_idx])
            preds = model.predict_proba(X.iloc[valid_idx])[:, 1]
            preds_bin = (preds > threshold).astype(int)
            f2_scores.append(fbeta_score(y.iloc[valid_idx], preds_bin, beta=2))

        return np.mean(f2_scores)

    # Seed the sampler so a given random_state reproduces the same search.
    sampler = optuna.samplers.TPESampler(seed=random_state)
    study = optuna.create_study(direction="maximize", sampler=sampler)
    study.optimize(objective, n_trials=n_trials)
    return {**study.best_params, **fixed_params}

In [None]:
def optimize_xgboost(X, y, n_trials=50, random_state=42):
    """Tune XGBoost hyperparameters and the decision threshold with Optuna.

    Each trial is scored by the mean F2 score over a 3-fold stratified
    cross-validation. Class imbalance is handled through
    ``scale_pos_weight``, derived from the balanced class weights.

    Args:
        X: Feature frame (indexed positionally with ``.iloc``).
        y: Binary target series aligned with ``X``.
        n_trials: Number of Optuna trials.
        random_state: Seed for the CV splitter, the model and the sampler.

    Returns:
        dict with the best searched hyperparameters, the tuned ``threshold``
        and the fixed settings the trials were run with (seed,
        ``scale_pos_weight``, tree method, eval metric), so the final model
        can be trained exactly like the models the threshold was tuned on.
    """
    classes = np.unique(y)
    class_weights = compute_class_weight(class_weight='balanced', classes=classes, y=y)
    # Ratio of the balanced weights: weight of positives relative to negatives.
    pos_weight = class_weights[1] / class_weights[0]

    # Settings that are not searched but must also apply to the final model.
    fixed_params = {
        "random_state": random_state,
        "scale_pos_weight": pos_weight,
        "tree_method": "hist",
        "eval_metric": "logloss",
    }

    # The seeded splitter yields the same folds for every trial.
    kf = StratifiedKFold(n_splits=3, shuffle=True, random_state=random_state)

    def objective(trial):
        params = {
            "n_estimators": trial.suggest_int("n_estimators", 200, 800),
            "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3),
            "max_depth": trial.suggest_int("max_depth", 3, 12),
            "min_child_weight": trial.suggest_float("min_child_weight", 1, 20),
            "gamma": trial.suggest_float("gamma", 0, 5),
            "subsample": trial.suggest_float("subsample", 0.5, 1.0),
            "colsample_bytree": trial.suggest_float("colsample_bytree", 0.5, 1.0),
            "reg_lambda": trial.suggest_float("reg_lambda", 1e-3, 10.0, log=True),
            "reg_alpha": trial.suggest_float("reg_alpha", 1e-3, 10.0, log=True),
            **fixed_params,
        }

        threshold = trial.suggest_float("threshold", 0.05, 0.80)
        model = xgb.XGBClassifier(**params)

        f2_scores = []
        for train_idx, valid_idx in kf.split(X, y):
            model.fit(X.iloc[train_idx], y.iloc[train_idx])
            preds = model.predict_proba(X.iloc[valid_idx])[:, 1]
            preds_bin = (preds > threshold).astype(int)
            f2_scores.append(fbeta_score(y.iloc[valid_idx], preds_bin, beta=2))

        return np.mean(f2_scores)

    # Seed the sampler so a given random_state reproduces the same search.
    sampler = optuna.samplers.TPESampler(seed=random_state)
    study = optuna.create_study(direction="maximize", sampler=sampler)
    study.optimize(objective, n_trials=n_trials)
    return {**study.best_params, **fixed_params}

In [None]:
def optimize_lightgbm(X, y, n_trials=50, random_state=42):
    """Tune LightGBM hyperparameters and the decision threshold with Optuna.

    Each trial is scored by the mean F2 score over a 3-fold stratified
    cross-validation, using balanced class weights.

    Args:
        X: Feature frame (indexed positionally with ``.iloc``).
        y: Binary target series aligned with ``X``.
        n_trials: Number of Optuna trials.
        random_state: Seed for the CV splitter, the model and the sampler.

    Returns:
        dict with the best searched hyperparameters, the tuned ``threshold``
        and the fixed settings the trials were run with (seed, class weights,
        bagging frequency), so the final model can be trained exactly like
        the models the threshold was tuned on.
    """
    classes = np.unique(y)
    class_weights = compute_class_weight(class_weight='balanced', classes=classes, y=y)

    # Settings that are not searched but must also apply to the final model.
    fixed_params = {
        "random_state": random_state,
        "class_weight": dict(zip(classes, class_weights)),
        # LightGBM only performs row bagging when the bagging frequency is
        # non-zero; without this the searched `subsample` has no effect.
        "subsample_freq": 1,
    }

    # The seeded splitter yields the same folds for every trial.
    kf = StratifiedKFold(n_splits=3, shuffle=True, random_state=random_state)

    def objective(trial):
        params = {
            "n_estimators": trial.suggest_int("n_estimators", 200, 800),
            "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3),
            "num_leaves": trial.suggest_int("num_leaves", 31, 200),
            "max_depth": trial.suggest_int("max_depth", -1, 12),
            "min_child_samples": trial.suggest_int("min_child_samples", 5, 100),
            "subsample": trial.suggest_float("subsample", 0.5, 1.0),
            "colsample_bytree": trial.suggest_float("colsample_bytree", 0.5, 1.0),
            "reg_lambda": trial.suggest_float("reg_lambda", 1e-3, 10.0, log=True),
            "reg_alpha": trial.suggest_float("reg_alpha", 1e-3, 10.0, log=True),
            **fixed_params,
        }

        threshold = trial.suggest_float("threshold", 0.05, 0.80)
        model = lgb.LGBMClassifier(**params)

        f2_scores = []
        for train_idx, valid_idx in kf.split(X, y):
            model.fit(X.iloc[train_idx], y.iloc[train_idx])
            preds = model.predict_proba(X.iloc[valid_idx])[:, 1]
            preds_bin = (preds > threshold).astype(int)
            f2_scores.append(fbeta_score(y.iloc[valid_idx], preds_bin, beta=2))

        return np.mean(f2_scores)

    # Seed the sampler so a given random_state reproduces the same search.
    sampler = optuna.samplers.TPESampler(seed=random_state)
    study = optuna.create_study(direction="maximize", sampler=sampler)
    study.optimize(objective, n_trials=n_trials)
    return {**study.best_params, **fixed_params}

In [None]:
def create_ensemble(X_train, y_train, best_cb, best_xgb, best_lgb):
    """Fit the three final models and bundle them with their thresholds.

    Args:
        X_train: Training features.
        y_train: Training labels.
        best_cb: CatBoost constructor parameters plus a ``"threshold"`` key.
        best_xgb: XGBoost constructor parameters plus a ``"threshold"`` key.
        best_lgb: LightGBM constructor parameters plus a ``"threshold"`` key.

    The parameter dicts are copied before ``"threshold"`` is removed, so the
    caller's dicts are left intact and this function can be called again
    with the same arguments.

    Returns:
        dict with the fitted models under ``"cb"``, ``"xgb"`` and ``"lgb"``
        and their decision thresholds under ``"thresholds"``.

    Raises:
        KeyError: If one of the parameter dicts has no ``"threshold"`` entry.
    """
    cb_params = dict(best_cb)
    xgb_params = dict(best_xgb)
    lgb_params = dict(best_lgb)

    threshold_cb = cb_params.pop("threshold")
    threshold_xgb = xgb_params.pop("threshold")
    threshold_lgb = lgb_params.pop("threshold")

    # Silence CatBoost by default without clashing with a caller-supplied
    # "verbose" entry (passing it twice would raise a TypeError).
    cb_params.setdefault("verbose", False)

    model_cb = cb.CatBoostClassifier(**cb_params)
    model_xgb = xgb.XGBClassifier(**xgb_params)
    model_lgb = lgb.LGBMClassifier(**lgb_params)

    for model in (model_cb, model_xgb, model_lgb):
        model.fit(X_train, y_train)

    return {
        "cb": model_cb,
        "xgb": model_xgb,
        "lgb": model_lgb,
        "thresholds": {
            "cb": threshold_cb,
            "xgb": threshold_xgb,
            "lgb": threshold_lgb,
        },
    }

# Training Pipeline

In [None]:
# Download data: install the Kaggle credentials if available, then fetch and
# unzip the competition archive into the working directory.
download_kaggle_data()

In [None]:
# Initialize imputers (will be fitted during training)
# clean_data(is_training=True) creates and returns fresh imputers, so these
# placeholders only make the names exist before that call.
knn_imputer = None
median_imputer = None

## Training Data Preprocessing

In [None]:
# Load and preprocess data
df = pd.read_csv('public_dataset/task1/train.csv')
# Rows without a label cannot be used for supervised training.
df = df[df['is_cheater'].notna()].reset_index(drop=True)

# Each step returns the fitted transformer (or the dropped column list) so the
# same transformation can be replayed on the test data later.
df_clean, knn_imputer, median_imputer = clean_data(df, KNN_COLS, knn_imputer, median_imputer, is_training=True)
df_feat = engineer_features(df_clean)
df_scaled, scaler = standardize_features(df_feat, is_training=True)
df_final, dropped_features = remove_correlated_features(df_scaled, threshold=CORRELATION_THRESHOLD)

# Separate the feature matrix from the target column.
X_train = df_final.drop(columns=['is_cheater'])
y_train = df_final['is_cheater']

## Model Training and Optimization

In [None]:
# Hyperparameter and threshold search for CatBoost (N_TRIALS Optuna trials).
print("Optimizing CatBoost...")
best_cb = optimize_catboost(X_train, y_train, N_TRIALS, RANDOM_STATE)

In [None]:
# Hyperparameter and threshold search for XGBoost (N_TRIALS Optuna trials).
print("Optimizing XGBoost...")
best_xgb = optimize_xgboost(X_train, y_train, N_TRIALS, RANDOM_STATE)

In [None]:
# Hyperparameter and threshold search for LightGBM (N_TRIALS Optuna trials).
print("Optimizing LightGBM...")
best_lgb = optimize_lightgbm(X_train, y_train, N_TRIALS, RANDOM_STATE)

In [None]:
# Create ensemble: fit one final model per library on the full training set
# and keep each model's tuned decision threshold alongside it.
ensemble = create_ensemble(X_train, y_train, best_cb, best_xgb, best_lgb)

In [None]:
# Save models and preprocessors
# Everything the prediction pipeline needs, keyed by the file it is stored in.
artifacts = {
    'ensemble.joblib': ensemble,
    'knn_imputer.joblib': knn_imputer,
    'median_imputer.joblib': median_imputer,
    'scaler.joblib': scaler,
    'dropped_features.joblib': dropped_features,
}

for artifact_path, artifact in artifacts.items():
    joblib.dump(artifact, artifact_path)

# In Colab, also hand every saved file to the browser for download.
if COLAB_ENV:
    for artifact_path in artifacts:
        files.download(artifact_path)

# Prediction Pipeline

## Load and Preprocess Test Data

In [None]:
def preprocess_test_data(test_path, knn_imputer, median_imputer, scaler, dropped_features):
    """Load the test CSV and replay the fitted training transformations.

    Args:
        test_path: Path of the test CSV; it must contain an ``id`` column.
        knn_imputer: KNN imputer fitted on the training data.
        median_imputer: Median imputer fitted on the training data.
        scaler: Scaler fitted on the training data.
        dropped_features: Columns removed from the training features because
            of high correlation.

    Returns:
        Tuple ``(X_test, test_ids)``: the model-ready feature frame and the
        ``id`` column of the test file, in the same row order.
    """
    df_test = pd.read_csv(test_path)
    # Keep the ids for the submission before they are removed from the features.
    test_ids = df_test['id'].copy()

    # Clean and transform test data
    df_test_clean, _, _ = clean_data(df_test, KNN_COLS, knn_imputer, median_imputer, is_training=False)
    df_test_feat = engineer_features(df_test_clean)
    df_test_scaled, _ = standardize_features(df_test_feat, scaler=scaler, is_training=False)

    # The training matrix contains neither identifiers nor the target, so they
    # must go here too (together with the correlated features); otherwise the
    # models receive columns they were not trained on.
    non_feature_cols = ['id', 'player_id', 'is_cheater']
    X_test = df_test_scaled.drop(
        columns=non_feature_cols + list(dropped_features),
        errors='ignore',
    )

    return X_test, test_ids

In [None]:
def make_ensemble_predictions(ensemble, X_test):
    """Return the majority vote of the three thresholded models.

    Each model's positive-class probability is compared (strictly greater)
    with that model's own threshold; a row is predicted 1 when at least two
    of the three models vote 1.

    Args:
        ensemble: dict with models under ``'cb'``, ``'xgb'``, ``'lgb'`` and
            their thresholds under ``'thresholds'``.
        X_test: Feature matrix passed to every model's ``predict_proba``.

    Returns:
        Integer array of 0/1 predictions, one per row.
    """
    model_keys = ('cb', 'xgb', 'lgb')

    positive_proba = {
        key: ensemble[key].predict_proba(X_test)[:, 1] for key in model_keys
    }

    cutoffs = ensemble['thresholds']
    votes = sum(
        (positive_proba[key] > cutoffs[key]).astype(int) for key in model_keys
    )

    # Two or more votes out of three is a majority.
    return (votes >= 2).astype(int)

In [None]:
# Load saved models and preprocessors
# These are the files written by the training pipeline's save step; loading
# them lets the prediction pipeline run without retraining.
# NOTE(review): joblib files are pickles - only load artifacts you created.
ensemble = joblib.load('ensemble.joblib')
knn_imputer = joblib.load('knn_imputer.joblib')
median_imputer = joblib.load('median_imputer.joblib')
scaler = joblib.load('scaler.joblib')
dropped_features = joblib.load('dropped_features.joblib')

In [None]:
# Preprocess test data
# Replays the fitted imputers, feature engineering, scaler and correlated
# feature removal on the test file; test_ids keeps the row ids for submission.
X_test, test_ids = preprocess_test_data(
    'public_dataset/task1/test.csv', 
    knn_imputer, median_imputer, scaler, dropped_features
)

## Generate Predictions and Create Submission

In [None]:
# Make predictions
# Majority vote of the three thresholded models, one 0/1 label per test row.
predictions = make_ensemble_predictions(ensemble, X_test)

# Create submission file
# One row per test id with its predicted label in the 'task1' column.
submission = pd.DataFrame({
    'id': test_ids,
    'task1': predictions
})

submission.to_csv('submission.csv', index=False)
print(f"Submission created with {len(submission)} predictions")
# Quick sanity check of how many rows were assigned to each class.
print(f"Prediction distribution: {submission['task1'].value_counts().to_dict()}")

# In Colab, hand the file to the browser for download.
if COLAB_ENV:
    files.download('submission.csv')