In [1]:
#  PredictionBot
#  Copyright (C) 2025 CatraMyBeloved
#
#  This program is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program.  If not, see <https://www.gnu.org/licenses/>.

import sqlite3
import pandas as pd
import numpy as np

In [2]:
table_names = ["teams", "bans", "hero_composition", "heroes",
               "maps", "match_maps", "matches", "rounds"]

def load_data_from_sqlite(table_name: str,db_path: str = "../../data/owcs.db") \
        -> (
        pd.DataFrame):
    """
    Load data from a SQLite database into a Pandas DataFrame.

    Args:
        db_path (str): Path to the SQLite database file.
        table_name (str): Name of the table to load.

    Returns:
        pd.DataFrame: DataFrame containing the loaded data.
    """
    # Connect to the SQLite database
    conn = sqlite3.connect(db_path)

    # Load data from the specified table into a DataFrame
    df = pd.read_sql_query(f"SELECT * FROM {table_name}", conn)

    # Close the connection
    conn.close()

    return df

def load_all_tables(db_path: str = "../../data/owcs.db") -> dict:
    """
    Load all tables from the SQLite database into a dictionary of DataFrames.

    Args:
        db_path (str): Path to the SQLite database file.

    Returns:
        dict: Dictionary containing DataFrames for each table.
    """
    data = {}
    for table_name in table_names:
        data[table_name] = load_data_from_sqlite(table_name, db_path)
    return data

In [3]:
data = load_all_tables()

hero_composition = data["hero_composition"]
rounds = data["rounds"]
match_maps = data["match_maps"]
matches = data["matches"]
teams = data["teams"]
heroes = data["heroes"]
maps = data["maps"]
bans = data["bans"]

def determine_iswin(row: pd.Series) -> int:
    if row["team"] == row["map_win_team_id"]:
        return 1
    else:
        return 0

def join_all_tables() -> pd.DataFrame:
    """
    Join all tables in the database to create a comprehensive DataFrame.

    Returns:
        pd.DataFrame: DataFrame containing the joined data.
    """
    # Join the tables using the appropriate keys
    df = pd.merge(hero_composition, heroes, on = "hero_id")
    df = pd.merge(df, rounds, on="round_id")
    df = pd.merge(df, match_maps, on="match_map_id")
    df = pd.merge(df, matches, on="match_id")
    df = pd.merge(df, teams, left_on="team", right_on="team_id")
    df = pd.merge(df, maps, on="map_id")
    df["is_win"]  = df.apply(determine_iswin, axis=1)

    return df

In [4]:
def played_hero_transformation(group_df: pd.DataFrame) -> pd.Series:
    """
    Transform the played hero data for each team in a match.

    Args:
        group_df (pd.DataFrame): DataFrame containing the played hero data.

    Returns:
        pd.DataFrame: Transformed DataFrame with played hero data.
    """
    tank_played = group_df[group_df.role == "tank"]
    dps_played = group_df[group_df.role == "dps"].head(2)
    support_played = group_df[group_df.role == "sup"].head(2)
    map_name = group_df["map_name"].values[0]
    is_win = group_df["is_win"].values[0]



    transformed_dict = {
        "map_name": map_name,
        "is_win": is_win,
        "tank_hero": tank_played["hero_name"].values[0],
        "dps_heroes": list(dps_played["hero_name"].values.tolist()),
        "support_heroes": list(support_played["hero_name"].values.tolist())
    }


    return pd.Series(transformed_dict)

In [5]:

def create_composition_table() -> pd.DataFrame:
    """
    Create a DataFrame containing the hero composition for each map.

    Returns:
        pd.DataFrame: DataFrame containing the hero composition.
    """
    df = join_all_tables()

    # group table by matchmap_id and round_id

    all_columns = list(df.columns)

    df = df.groupby(["match_map_id", "round_id", "team_id"])

    transformed_series = df[all_columns].apply(played_hero_transformation)

    transformed_df = transformed_series.reset_index()

    transformed_df['team_index'] = transformed_df.groupby('round_id').cumcount()

    return transformed_df

In [6]:
def add_opponents(df: pd.DataFrame) -> pd.DataFrame:
    df_opponents = df.copy()

    df_opponents = df_opponents[["round_id", "team_id","team_index", "tank_hero", "dps_heroes", "support_heroes"]]

    self_merged_df = pd.merge(df, df_opponents, on="round_id", suffixes=("", "_opp"))

    self_merged_df = self_merged_df[self_merged_df["team_index"] != self_merged_df["team_index_opp"]]

    return self_merged_df

In [7]:
# group by round_id and sample one row per round
def sample_one_per_round(df: pd.DataFrame) -> pd.DataFrame:
    """
    Sample one row per round from the DataFrame.

    Args:
        df (pd.DataFrame): DataFrame containing the data.

    Returns:
        pd.DataFrame: DataFrame with one row per round.
    """
    all_columns = list(df.columns)
    sampled_df = df.groupby("round_id")[all_columns].apply(lambda x: x.sample(1)).reset_index(drop=True)
    return sampled_df


In [8]:
def add_bans(df: pd.DataFrame) -> pd.DataFrame:
    """
    Add ban information to the DataFrame.

    Args:
        df (pd.DataFrame): DataFrame containing the data.

    Returns:
        pd.DataFrame: DataFrame with ban information added.
    """
    bans = data["bans"]
    joined_team1 = pd.merge(df, bans, on=["match_map_id", "team_id"], how="left")
    joined_team1.rename(columns={"hero_id": "ban_hero"}, inplace=True)
    joined_team2 = pd.merge(joined_team1, bans, left_on=["match_map_id", "team_id_opp"], right_on = ["match_map_id", "team_id"], how="left", suffixes=("", "_opp_2"))
    joined_team2.rename(columns={"hero_id": "ban_hero_opp"}, inplace=True)

    return joined_team2


In [31]:
def clean_table(df: pd.DataFrame) -> pd.DataFrame:
    """
    Clean the DataFrame by dropping unnecessary columns.

    Args:
        df (pd.DataFrame): DataFrame containing the data.

    Returns:
        pd.DataFrame: Cleaned DataFrame.
    """
    columns_to_drop = ["match_map_id", "round_id", "team_index", "team_index_opp", "first_bool", "first_bool_opp_2", "team_id_opp_2"]
    columns_to_int = ["team_id", "team_id_opp", "ban_hero", "ban_hero_opp"]
    cleaned_df = df.drop(columns=columns_to_drop)

    # 2. Drop any rows where these columns are null
    cleaned_df = cleaned_df.dropna(subset=columns_to_int)

    # 3. Now safely cast to int
    for col in columns_to_int:
        cleaned_df[col] = cleaned_df[col].astype(int)
    return cleaned_df

In [66]:
def prepare_prediction_data(df, heroes_df, teams_df) -> pd.DataFrame:
    """
    Prepare the data for prediction by joining with heroes table
    and selecting only the necessary columns.
    """
    # Join ban_hero with heroes table
    df_with_bans = df.copy()
    df_with_bans = df_with_bans.merge(
        heroes_df[['hero_id', 'hero_name']],
        left_on='ban_hero',
        right_on='hero_id',
        how='left',
        suffixes=('', '_ban')
    ).rename(columns={'hero_name': 'banned_hero'})

    # Join ban_hero_opp with heroes table
    df_with_bans = df_with_bans.merge(
        heroes_df[['hero_id', 'hero_name']],
        left_on='ban_hero_opp',
        right_on='hero_id',
        how='left',
        suffixes=('', '_ban_opp')
    ).rename(columns={'hero_name': 'banned_hero_opp'})

    df_with_bans = df_with_bans.merge(
        teams_df[['team_id', 'team_name']],
        left_on='team_id',
        right_on='team_id',
        how='left',
    ).rename(columns={'team_name': 'team_name_x'})

    df_with_bans = df_with_bans.merge(
        teams_df[['team_id', 'team_name']],
        left_on='team_id_opp',
        right_on='team_id',
        how='left',
        suffixes=('', '_opp')
    ).rename(columns={'team_name': 'team_name_opp'})

    relevant_df = df_with_bans[[
        "team_name_x",
        "team_name_opp",
        "map_name",
        "is_win",
        "banned_hero",
        "banned_hero_opp"
    ]].rename(columns = {"team_name_x" : "team_name"})

    return relevant_df

In [70]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix

In [53]:
transformed_data = sample_one_per_round(add_opponents(create_composition_table()))
transformed_data = add_bans(transformed_data)

In [67]:
ml_data = clean_table(transformed_data)
ml_data_prepared = prepare_prediction_data(ml_data, heroes, teams)

In [69]:
X = ml_data_prepared.drop(columns=["is_win"])
y = ml_data_prepared["is_win"]

In [71]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

In [73]:
categorical_transformer = OneHotEncoder(handle_unknown='ignore')

In [74]:
preprocessor = ColumnTransformer(
    transformers=[
        ('cat', categorical_transformer, X.columns)
    ])

In [80]:
model = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier(n_estimators=200, random_state=42))
])

In [81]:
model.fit(X_train, y_train)

In [82]:
print(f"Accuracy: {model.score(X_test, y_test):.4f}")

Accuracy: 0.8844


In [83]:
y_pred = model.predict(X_test)
print("\nClassification Report:")
print(classification_report(y_test, y_pred))


print("\nConfusion Matrix:")
print(confusion_matrix(y_test, y_pred))


Classification Report:
              precision    recall  f1-score   support

           0       0.89      0.89      0.89       175
           1       0.88      0.88      0.88       171

    accuracy                           0.88       346
   macro avg       0.88      0.88      0.88       346
weighted avg       0.88      0.88      0.88       346


Confusion Matrix:
[[155  20]
 [ 20 151]]


In [85]:
from sklearn.linear_model import LogisticRegression
import xgboost as xgb
import lightgbm as lgb
from sklearn.model_selection import cross_val_score

# Create a list of models to try
models = {
    'Random Forest': RandomForestClassifier(n_estimators=200, random_state=42),
    'Logistic Regression': LogisticRegression(max_iter=1000, random_state=42),
    'XGBoost': xgb.XGBClassifier(n_estimators=150, random_state=42),
    'LightGBM': lgb.LGBMClassifier(n_estimators=150, random_state=42)
}

# Compare models with cross-validation
results = {}
for name, model_clf in models.items():
    # Create a pipeline with this model
    pipeline = Pipeline([
        ('preprocessor', preprocessor),
        ('classifier', model_clf)
    ])

    # Perform 5-fold cross-validation
    cv_scores = cross_val_score(pipeline, X_train, y_train, cv=5, scoring='accuracy')
    results[name] = {
        'mean_score': cv_scores.mean(),
        'std_score': cv_scores.std()
    }
    print(f"{name}: Accuracy = {cv_scores.mean():.4f} (±{cv_scores.std():.4f})")

Random Forest: Accuracy = 0.8958 (±0.0247)
Logistic Regression: Accuracy = 0.7156 (±0.0122)
XGBoost: Accuracy = 0.8415 (±0.0220)
[LightGBM] [Info] Number of positive: 548, number of negative: 557
[LightGBM] [Info] Auto-choosing row-wise multi-threading, the overhead of testing was 0.000628 seconds.
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Total Bins 212
[LightGBM] [Info] Number of data points in the train set: 1105, number of used features: 106
[LightGBM] [Info] [binary:BoostFromScore]: pavg=0.495928 -> initscore=-0.016290
[LightGBM] [Info] Start training from score -0.016290
[LightGBM] [Info] Number of positive: 548, number of negative: 557
[LightGBM] [Info] Auto-choosing row-wise multi-threading, the overhead of testing was 0.000042 seconds.
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Total B

selected models, random forest, extra trees, neural network

In [95]:
from sklearn.model_selection import RandomizedSearchCV
from sklearn.ensemble import RandomForestClassifier, ExtraTreesClassifier
from sklearn.neural_network import MLPClassifier
from scipy.stats import randint, uniform
import numpy as np

# Random Forest hyperparameter grid
rf_param_grid = {
    'classifier__n_estimators': randint(100, 1000),  # Number of trees
    'classifier__max_depth': [None, 10, 20, 30, 40, 50],  # Max tree depth
    'classifier__min_samples_split': randint(2, 20),  # Min samples to split
    'classifier__min_samples_leaf': randint(1, 10),  # Min samples at leaf
    'classifier__max_features': ['sqrt', 'log2', None],  # Features per split
    'classifier__bootstrap': [True, False],  # Bootstrap samples
    'classifier__class_weight': ['balanced', 'balanced_subsample', None]  # Class weighting
}

# Extra Trees hyperparameter grid
et_param_grid = {
    'classifier__n_estimators': randint(100, 1000),
    'classifier__max_depth': [None, 10, 20, 30, 40, 50],
    'classifier__min_samples_split': randint(2, 20),
    'classifier__min_samples_leaf': randint(1, 10),
    'classifier__max_features': ['sqrt', 'log2', None],
    'classifier__bootstrap': [True, False],
    'classifier__class_weight': ['balanced', 'balanced_subsample', None]
}

# Neural Network hyperparameter grid
nn_param_grid = {
    'classifier__hidden_layer_sizes': [(50,), (100,), (50, 50), (100, 50), (100, 100)],
    'classifier__activation': ['relu', 'tanh'],
    'classifier__alpha': uniform(0.0001, 0.01),  # L2 regularization
    'classifier__learning_rate_init': uniform(0.001, 0.1),
    'classifier__batch_size': [32, 64, 128, 'auto'],
    'classifier__solver': ['adam', 'sgd'],
    'classifier__early_stopping': [True, False]
}

In [96]:
# Create pipelines
rf_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier(random_state=42))
])

et_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', ExtraTreesClassifier(random_state=42))
])

nn_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', MLPClassifier(max_iter=500, random_state=42))
])

# Store pipelines in a dictionary for easier handling
pipelines = {
    'RandomForest': (rf_pipeline, rf_param_grid),
    'ExtraTrees': (et_pipeline, et_param_grid),
    'NeuralNetwork': (nn_pipeline, nn_param_grid)
}

In [97]:
# Dictionary to store best models
best_models = {}
best_scores = {}

# Tune each model
for model_name, (pipeline, param_grid) in pipelines.items():
    print(f"\nTuning {model_name}...")

    # Create randomized search
    random_search = RandomizedSearchCV(
        pipeline,
        param_distributions=param_grid,
        n_iter=25,  # Number of parameter settings sampled
        cv=5,        # 5-fold cross-validation
        scoring='accuracy',
        random_state=42,
        n_jobs=-1,   # Use all CPU cores
        verbose=1
    )

    # Fit randomized search
    random_search.fit(X_train, y_train)

    # Store best model and score
    best_models[model_name] = random_search.best_estimator_
    best_scores[model_name] = {
        'best_params': random_search.best_params_,
        'cv_score': random_search.best_score_,
        'test_score': random_search.best_estimator_.score(X_test, y_test)
    }

    # Print results
    print(f"Best parameters for {model_name}:")
    for param, value in random_search.best_params_.items():
        print(f"  {param}: {value}")
    print(f"Best CV accuracy: {random_search.best_score_:.4f}")
    print(f"Test accuracy: {random_search.best_estimator_.score(X_test, y_test):.4f}")


Tuning RandomForest...
Fitting 5 folds for each of 25 candidates, totalling 125 fits
Best parameters for RandomForest:
  classifier__bootstrap: False
  classifier__class_weight: None
  classifier__max_depth: 20
  classifier__max_features: log2
  classifier__min_samples_leaf: 1
  classifier__min_samples_split: 5
  classifier__n_estimators: 661
Best CV accuracy: 0.8842
Test accuracy: 0.8699

Tuning ExtraTrees...
Fitting 5 folds for each of 25 candidates, totalling 125 fits
Best parameters for ExtraTrees:
  classifier__bootstrap: False
  classifier__class_weight: None
  classifier__max_depth: 20
  classifier__max_features: log2
  classifier__min_samples_leaf: 1
  classifier__min_samples_split: 5
  classifier__n_estimators: 661
Best CV accuracy: 0.8856
Test accuracy: 0.8728

Tuning NeuralNetwork...
Fitting 5 folds for each of 25 candidates, totalling 125 fits
Best parameters for NeuralNetwork:
  classifier__activation: tanh
  classifier__alpha: 0.010022115592912174
  classifier__batch_siz

In [98]:
# Compare tuned models
print("\nModel Comparison After Tuning:")
print("-" * 60)
print(f"{'Model':<15} {'CV Accuracy':<15} {'Test Accuracy':<15}")
print("-" * 60)
for model_name in best_scores:
    cv_score = best_scores[model_name]['cv_score']
    test_score = best_scores[model_name]['test_score']
    print(f"{model_name:<15} {cv_score:.4f}{'':8} {test_score:.4f}{'':8}")

# Identify the best model
best_model_name = max(best_scores, key=lambda x: best_scores[x]['test_score'])
print(f"\nBest model: {best_model_name} with test accuracy {best_scores[best_model_name]['test_score']:.4f}")


Model Comparison After Tuning:
------------------------------------------------------------
Model           CV Accuracy     Test Accuracy  
------------------------------------------------------------
RandomForest    0.8842         0.8699        
ExtraTrees      0.8856         0.8728        
NeuralNetwork   0.8893         0.8931        

Best model: NeuralNetwork with test accuracy 0.8931


In [103]:
from sklearn.ensemble import VotingClassifier

ensemble = VotingClassifier(
    estimators=[
        ('nn', best_models['NeuralNetwork']),
        ('et', best_models['ExtraTrees']),
        ('rf', best_models['RandomForest'])
    ],
    voting='soft',
    weights=[0.5, 0.25, 0.25]  # Weight the neural network higher
)

ensemble.fit(X_train, y_train)
ensemble_acc = ensemble.score(X_test, y_test)
print(f"Ensemble accuracy: {ensemble_acc:.4f}")

Ensemble accuracy: 0.8960


In [104]:
from sklearn.ensemble import VotingClassifier
import joblib

# Get the best parameters for each model from previous tuning
rf_best_params = {k.replace('classifier__', ''): v for k, v in best_scores['RandomForest']['best_params'].items()}
et_best_params = {k.replace('classifier__', ''): v for k, v in best_scores['ExtraTrees']['best_params'].items()}
nn_best_params = {k.replace('classifier__', ''): v for k, v in best_scores['NeuralNetwork']['best_params'].items()}

# Create models with best parameters
final_rf = RandomForestClassifier(**rf_best_params, random_state=42)
final_et = ExtraTreesClassifier(**et_best_params, random_state=42)
final_nn = MLPClassifier(**nn_best_params, random_state=42, max_iter=1000)

# Preprocess the complete dataset
X_all = ml_data_prepared.drop(columns=['is_win'])
y_all = ml_data_prepared['is_win']

# Fit preprocessor on all data
preprocessor.fit(X_all)

# Transform the data
X_all_transformed = preprocessor.transform(X_all)

# Train each model on the full dataset
print("Training Random Forest on full dataset...")
final_rf.fit(X_all_transformed, y_all)

print("Training Extra Trees on full dataset...")
final_et.fit(X_all_transformed, y_all)

print("Training Neural Network on full dataset...")
final_nn.fit(X_all_transformed, y_all)

# Create the final ensemble
final_ensemble = VotingClassifier(
    estimators=[
        ('rf', final_rf),
        ('et', final_et),
        ('nn', final_nn)
    ],
    voting='soft'
)

print("Training Ensemble on full dataset...")
final_ensemble.fit(X_all_transformed, y_all)

print("All models trained on the full dataset!")

Training Random Forest on full dataset...
Training Extra Trees on full dataset...
Training Neural Network on full dataset...
Training Ensemble on full dataset...
All models trained on the full dataset!


In [105]:
# Create a models directory if it doesn't exist
import os
if not os.path.exists('models'):
    os.makedirs('models')

# Save the preprocessor
joblib.dump(preprocessor, 'models/preprocessor.pkl')

# Save individual models
joblib.dump(final_rf, 'models/random_forest.pkl')
joblib.dump(final_et, 'models/extra_trees.pkl')
joblib.dump(final_nn, 'models/neural_network.pkl')

# Save the ensemble
joblib.dump(final_ensemble, 'models/ensemble.pkl')

print("All models saved successfully!")

All models saved successfully!
