<a href="https://colab.research.google.com/github/LastHopeForRaoha/11-22-24/blob/main/ufc_prediction_B_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.model_selection import train_test_split, StratifiedKFold, GridSearchCV
from sklearn.ensemble import StackingClassifier
from xgboost import XGBClassifier
from catboost import CatBoostClassifier
from lightgbm import LGBMClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, roc_auc_score, classification_report, confusion_matrix
import joblib
import warnings
import logging
import sys

# Hide every Python warning and let only CRITICAL records through from the
# boosting libraries' loggers, so the notebook output shows just our prints.
warnings.filterwarnings("ignore")
for _noisy_logger in ("lightgbm", "xgboost"):
    logging.getLogger(_noisy_logger).setLevel(logging.CRITICAL)

# Path of the input CSV
file_path = "/content/ULTIMATE_DATABASE.csv"

# Step 1: Load and Filter Dataset
def load_and_filter_dataset(file_path):
    """Load the fight CSV and reduce it to the rows used for modelling.

    Processing order:
      1. Read the CSV and drop columns whose name is duplicated.
      2. Remove rows whose 'weight_class' contains "Women" (case-insensitive).
      3. If a 'REDFLAG' column exists, keep only rows where it equals True.
      4. Fill missing values in the method/round-of-win columns with
         placeholders ("Unknown" / -1).
      5. Drop rows that have no 'Winner'.

    Args:
        file_path: Path of the CSV file to read.

    Returns:
        The filtered DataFrame (an independent copy, safe to modify).

    Raises:
        SystemExit: With a non-zero status when the 'Winner' column is absent.
    """
    print("\n--- Step 1: Loading and Filtering Dataset ---")
    dataset = pd.read_csv(file_path, low_memory=False)

    # Drop duplicate columns
    dataset = dataset.loc[:, ~dataset.columns.duplicated()]

    # Filter out women's fights and REDFLAG == False
    print("Filtering out women's fights and invalid rows...")
    if 'weight_class' in dataset.columns:
        is_womens_fight = dataset['weight_class'].str.contains("Women", case=False, na=False)
        dataset = dataset[~is_womens_fight]
    else:
        # Every other optional column is guarded; do the same here instead of
        # failing with a KeyError.
        print("Warning: Column 'weight_class' not found!")
    if 'REDFLAG' in dataset.columns:
        dataset = dataset[dataset['REDFLAG'] == True]

    # The boolean filters above can return a view-like slice; take a copy so
    # the column assignments below write to this frame and nothing else.
    dataset = dataset.copy()

    # Replace missing values with placeholders
    placeholder_map = {
        'R_method_of_win': "Unknown",
        'B_method_of_win': "Unknown",
        'R_round_of_win': -1,
        'B_round_of_win': -1
    }
    for col, placeholder in placeholder_map.items():
        if col in dataset.columns:
            dataset[col] = dataset[col].fillna(placeholder)
            print(f"Filled missing values in '{col}' with '{placeholder}'")
        else:
            print(f"Warning: Column '{col}' not found!")

    # Drop rows missing 'Winner'
    if 'Winner' not in dataset.columns:
        print("Error: 'Winner' column not found!")
        # Exit with a failure status; a bare sys.exit() would report success.
        sys.exit(1)
    print("Dropping rows missing 'Winner' column...")
    dataset = dataset.dropna(subset=['Winner'])

    print(f"Dataset filtered to {dataset.shape[0]} rows and {dataset.shape[1]} columns.")
    return dataset

# Step 2: Preprocess Dataset
def preprocess_dataset(dataset, target_column):
    """Build the feature matrix and label vector for one prediction target.

    Rows whose label occurs only once in `target_column` are removed (rows
    with a missing label are removed with them, since missing values are not
    counted). The remaining feature columns are transformed as follows:
    float64/int64 columns are mean-imputed and standardised, object columns
    are imputed with the most frequent value and one-hot encoded with the
    first category dropped. Columns of any other dtype are in neither list
    and so are not handed to a transformer.

    Args:
        dataset: DataFrame holding both the features and the target column.
        target_column: Name of the column to predict.

    Returns:
        (X_preprocessed, y): the transformed feature matrix and the label
        Series, or (None, None) when `target_column` is not in `dataset`.
    """
    print(f"\n--- Step 2: Preprocessing Dataset for Target '{target_column}' ---")
    if target_column not in dataset.columns:
        print(f"Target column '{target_column}' not found. Skipping...")
        return None, None

    # Keep only labels that appear at least twice
    labels = dataset[target_column]
    label_counts = labels.value_counts()
    repeated_labels = label_counts[label_counts > 1].index
    y = labels[labels.isin(repeated_labels)]

    # Features are every remaining column of the surviving rows.
    # NOTE(review): any other outcome-style columns stay in the feature set;
    # presumably some of them are only known after the fight -- confirm they
    # do not leak the target.
    features = dataset.loc[y.index].drop(columns=[target_column], errors='ignore')
    numeric_columns = features.select_dtypes(include=["float64", "int64"]).columns
    text_columns = features.select_dtypes(include=["object"]).columns

    # One sub-pipeline per column family, combined column-wise
    numeric_steps = Pipeline([
        ('imputer', SimpleImputer(strategy='mean')),
        ('scaler', StandardScaler()),
    ])
    text_steps = Pipeline([
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('encoder', OneHotEncoder(drop='first')),
    ])
    preprocessor = ColumnTransformer([
        ('num', numeric_steps, numeric_columns),
        ('cat', text_steps, text_columns),
    ])

    # NOTE(review): the transformers are fitted on all rows passed in, i.e.
    # before any train/test split the caller performs afterwards.
    X_preprocessed = preprocessor.fit_transform(features)
    print(f"Processed dataset has {X_preprocessed.shape[1]} features.")
    return X_preprocessed, y

# Step 3: Hyperparameter Tuning
def hyperparameter_tuning(model, param_grid, X_train, y_train):
    """Grid-search `param_grid` for `model` and return the best estimator.

    Every parameter combination is scored by accuracy with 3-fold
    cross-validation on the training data.

    Args:
        model: Unfitted estimator to tune.
        param_grid: Mapping of parameter name to the list of values to try.
        X_train: Training feature matrix.
        y_train: Training labels.

    Returns:
        The `best_estimator_` of the fitted grid search.
    """
    model_name = type(model).__name__
    print(f"Performing hyperparameter tuning for {model_name}...")

    search = GridSearchCV(
        estimator=model,
        param_grid=param_grid,
        scoring='accuracy',
        cv=3,
        verbose=0,
    )
    search.fit(X_train, y_train)

    print("Best Parameters:", search.best_params_)
    return search.best_estimator_

# Step 4: Train, Evaluate, and Save Models
def train_and_evaluate(X_train, y_train, X_test, y_test, task_name):
    """Tune three boosting models, stack them, report test metrics, save all.

    XGBoost, CatBoost and LightGBM classifiers are each tuned with
    `hyperparameter_tuning`, then combined in a StackingClassifier whose
    meta-learner is a logistic regression. Accuracy, the classification
    report and the confusion matrix on the test split are printed.

    Labels are converted to integer codes 0..n_classes-1 for training because
    XGBoost rejects any other label set (strings, or the -1 placeholder).
    Predictions are mapped back to the original labels before the metrics are
    computed, so the printed reports use the original labels. When the labels
    already are 0..n_classes-1 the codes equal the labels.

    Files written to the current directory, all prefixed with a slug derived
    from `task_name` so that successive tasks do not overwrite each other:
      - <slug>_XGBoost_model.json, <slug>_CatBoost_model.json
      - <slug>_LightGBM_model.txt (LightGBM's native text format)
      - <slug>_label_classes.pkl (array; position i is the label of code i)
      - <slug>_stacked_model.pkl (the stacked model; predicts integer codes)

    Args:
        X_train: Training feature matrix.
        y_train: Training labels.
        X_test: Test feature matrix.
        y_test: Test labels.
        task_name: Human-readable task name used in log lines and file names.

    Returns:
        None.
    """
    print(f"\n--- Training and Evaluating Model for {task_name} ---")

    # Sorted unique labels and, for every training row, its index into them.
    classes, y_train_codes = np.unique(np.asarray(y_train), return_inverse=True)

    models = {
        'XGBoost': XGBClassifier(use_label_encoder=False, eval_metric='logloss'),
        'CatBoost': CatBoostClassifier(verbose=0),
        'LightGBM': LGBMClassifier(verbose=-1)
    }

    # Hyperparameter tuning for each model
    param_grids = {
        'XGBoost': {'n_estimators': [100, 200], 'max_depth': [3, 5]},
        'CatBoost': {'depth': [6, 8], 'iterations': [200, 300]},
        'LightGBM': {'n_estimators': [100, 200], 'learning_rate': [0.05, 0.1]}
    }
    tuned_models = {
        name: hyperparameter_tuning(model, param_grids[name], X_train, y_train_codes)
        for name, model in models.items()
    }

    # Stacked model with meta-learner
    print("Building Stacked Ensemble Model...")
    stack = StackingClassifier(
        estimators=list(tuned_models.items()),
        final_estimator=LogisticRegression(max_iter=1000),
        cv=3
    )
    stack.fit(X_train, y_train_codes)

    # Evaluate: decode the predicted codes so they compare against y_test
    y_pred = classes[stack.predict(X_test)]
    print("Accuracy:", accuracy_score(y_test, y_pred))
    print("Classification Report:\n", classification_report(y_test, y_pred))
    print("Confusion Matrix:\n", confusion_matrix(y_test, y_pred))

    # Save models
    print("\nSaving Models...")
    prefix = _task_file_prefix(task_name)
    for name, model in tuned_models.items():
        saved_path = _save_base_model(model, f"{prefix}_{name}_model")
        print(f"{name} model saved as '{saved_path}'")

    classes_path = f"{prefix}_label_classes.pkl"
    joblib.dump(classes, classes_path)
    print(f"Label classes saved as '{classes_path}'")

    stacked_path = f"{prefix}_stacked_model.pkl"
    joblib.dump(stack, stacked_path)
    print(f"Stacked Model saved as '{stacked_path}'")


def _task_file_prefix(task_name):
    """Return a file-name-safe slug for `task_name` (e.g. no '/' or spaces)."""
    spaced = "".join(ch if ch.isalnum() else " " for ch in task_name)
    return "_".join(spaced.split()) or "task"


def _save_base_model(model, stem):
    """Save one fitted base model with its library's own API; return the path."""
    if isinstance(model, LGBMClassifier):
        # The scikit-learn wrapper has no save_model(); the fitted Booster
        # does, and it writes LightGBM's text format rather than JSON.
        path = f"{stem}.txt"
        model.booster_.save_model(path)
    elif isinstance(model, CatBoostClassifier):
        # CatBoost writes its binary "cbm" format unless told otherwise.
        path = f"{stem}.json"
        model.save_model(path, format="json")
    else:
        path = f"{stem}.json"
        model.save_model(path)
    return path

# Step 5: Execute Task
def execute_task(dataset, target_column, task_name):
    """Run preprocessing, splitting, and training for one prediction target.

    The task is skipped with a message when preprocessing returns nothing or
    when fewer than two distinct labels remain. Otherwise the data is split
    80/20 (stratified on the labels, random_state=42) and passed on to
    `train_and_evaluate`.

    Args:
        dataset: Filtered DataFrame containing features and targets.
        target_column: Name of the column to predict.
        task_name: Human-readable task name used in log output.
    """
    features, labels = preprocess_dataset(dataset, target_column)

    trainable = (
        features is not None
        and labels is not None
        and labels.nunique() >= 2
    )
    if not trainable:
        print(f"Skipping {task_name}: Not enough data.")
        return

    split = train_test_split(
        features,
        labels,
        test_size=0.2,
        random_state=42,
        stratify=labels,
    )
    X_train, X_test, y_train, y_test = split
    train_and_evaluate(X_train, y_train, X_test, y_test, task_name)

# Main Execution
print("Starting UFC Fight Prediction Pipeline...")
dataset = load_and_filter_dataset(file_path)

# (target column, task name) for every model the pipeline trains, in order.
prediction_tasks = [
    ('Winner', "Win/Loss Prediction"),
    ('R_method_of_win', "Method of Victory Prediction"),
    ('B_method_of_win', "Method of Victory Prediction (Blue)"),
    ('R_round_of_win', "Round Prediction"),
    ('B_round_of_win', "Round Prediction (Blue)"),
]

if dataset.empty:
    print("Dataset is empty after filtering. Please check the input data.")
else:
    for target_column, task_name in prediction_tasks:
        execute_task(dataset, target_column, task_name)
    # Report success only when the tasks actually ran, not after the
    # empty-dataset message above.
    print("\nPipeline Completed Successfully!")
