In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score, StratifiedKFold
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from xgboost import XGBClassifier
import utils
from sklearn.impute import SimpleImputer
from visualization_and_analysis import create_matplotlib_table, plot_confusion_matrix
import os
from imblearn.over_sampling import RandomOverSampler

In [None]:
def get_X_y(data):
    """Separate a dataset into its feature matrix and its target column.

    The ``veracity``, ``nationality``, ``position`` and ``source`` columns are
    excluded from the features; ``veracity`` is returned on its own as the target.

    Args:
        data (pd.DataFrame): Dataset containing the four columns named above.

    Returns:
        tuple: ``(X, y)`` where ``X`` is the remaining-columns DataFrame and
        ``y`` is the ``veracity`` Series.
    """
    non_feature_columns = ['veracity', 'nationality', 'position', 'source']
    features = data.drop(columns=non_feature_columns)
    target = data['veracity']
    return features, target

In [None]:
def impute_missing_values(X):
    """Fill missing values in ``X`` with the mean of each column.

    Args:
        X (pd.DataFrame): Feature matrix that may contain NaNs.

    Returns:
        pd.DataFrame: Imputed copy of ``X``. The row index of ``X`` is kept so
        the result stays label-aligned with the target it was split from.
        Columns that contain no observed value at all are discarded by
        ``SimpleImputer`` and are therefore absent from the result.
    """
    imputer = SimpleImputer(strategy='mean')
    imputed_values = imputer.fit_transform(X)

    columns = X.columns
    if imputed_values.shape[1] != len(columns):
        # SimpleImputer drops features whose fitted statistic is NaN (columns
        # that are entirely missing); keep only the names of the survivors so
        # the DataFrame constructor does not fail on a column-count mismatch.
        columns = columns[~np.isnan(imputer.statistics_)]

    # Re-attach the original index: the bare ndarray returned by the imputer
    # would otherwise get a fresh RangeIndex.
    return pd.DataFrame(imputed_values, columns=columns, index=X.index)

In [None]:
def apply_ros(X_train, y_train):
    """Balance the training set with random over-sampling and report class sizes.

    The size of the smallest and the largest class is printed before and after
    resampling. These are taken from the actual class counts (min / max) rather
    than from fixed class labels, so the report is correct whichever label
    happens to be the minority and for any number of classes.

    Args:
        X_train (pd.DataFrame): Training features.
        y_train (pd.Series): Training labels.

    Returns:
        tuple: ``(X_train_resampled, y_train_resampled)`` as returned by
        ``RandomOverSampler.fit_resample``.
    """
    print("\nApplying Random Over Sampling...")
    counts_before = y_train.value_counts()
    print(f"Before applying ROS, the number of samples in the minority class: {counts_before.min()}")
    print(f"Before applying ROS, the number of samples in the majority class: {counts_before.max()}")

    ros = RandomOverSampler(random_state=42)
    X_train_resampled, y_train_resampled = ros.fit_resample(X_train, y_train)

    counts_after = y_train_resampled.value_counts()
    print(f"After applying ROS, the number of samples in the minority class: {counts_after.min()}")
    print(f"After applying ROS, the number of samples in the majority class: {counts_after.max()}")

    return X_train_resampled, y_train_resampled

In [None]:
def split_data(data, oversample=False, test_size=0.4, random_state=42):
    """Split ``data`` into imputed train/test features and labels.

    The rows are split first and the mean imputer is then fitted on the
    training rows only, so statistics of the held-out rows never influence the
    training features. The split itself depends only on the number of rows and
    ``random_state``, so the same rows land in each partition on every call
    with the same arguments.

    Args:
        data (pd.DataFrame): Full dataset, passed to ``get_X_y``.
        oversample (bool): When true, the training partition is balanced with
            ``apply_ros`` before being returned; the test partition is never
            resampled.
        test_size (float): Fraction of rows held out for testing.
        random_state (int): Seed forwarded to ``train_test_split``.

    Returns:
        tuple: ``(X_train, X_test, y_train, y_test)``.
    """
    X, y = get_X_y(data)
    X_train_raw, X_test_raw, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state
    )

    # Fit on the training rows only, then reuse the fitted means for the test rows.
    imputer = SimpleImputer(strategy='mean')
    train_values = imputer.fit_transform(X_train_raw)
    test_values = imputer.transform(X_test_raw)

    columns = X.columns
    if train_values.shape[1] != len(columns):
        # SimpleImputer drops features whose fitted statistic is NaN (no
        # observed value in the training rows); keep the surviving names only.
        columns = columns[~np.isnan(imputer.statistics_)]

    X_train = pd.DataFrame(train_values, columns=columns, index=X_train_raw.index)
    X_test = pd.DataFrame(test_values, columns=columns, index=X_test_raw.index)

    if oversample:
        X_train, y_train = apply_ros(X_train, y_train)

    return X_train, X_test, y_train, y_test

In [None]:
def train_and_evaluate_model(model, X_train, X_test, y_train, y_test, cv):
    """Fit ``model``, score it on the held-out set and cross-validate it on the training set.

    Args:
        model: Estimator exposing ``fit`` and ``predict``.
        X_train, y_train: Training features and labels.
        X_test, y_test: Held-out features and labels.
        cv: Cross-validation splitter (or fold count) passed to ``cross_val_score``.

    Returns:
        tuple: ``(accuracy, cv_accuracy, report, y_pred)`` where ``report`` is
        the classification report dictionary extended with a
        ``'cross_validated_accuracy'`` entry.
    """
    # Held-out evaluation
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)

    # Mean accuracy over the cross-validation folds of the training data
    fold_scores = cross_val_score(model, X_train, y_train, cv=cv, scoring='accuracy')
    cv_accuracy = np.mean(fold_scores)

    # Per-class report, with the cross-validated accuracy appended as an extra entry
    report = {
        **classification_report(y_test, y_pred, output_dict=True),
        'cross_validated_accuracy': cv_accuracy,
    }

    return accuracy, cv_accuracy, report, y_pred

In [None]:
def top_5_feature_importances(model, X_train):
    """Rank the training columns by ``model.feature_importances_`` and keep the top five.

    Args:
        model: Fitted estimator exposing ``feature_importances_``.
        X_train (pd.DataFrame): Training features; its columns name the importances.

    Returns:
        tuple: ``(total_features, top_5_features)`` – the number of columns and
        a list of up to five ``(column, importance)`` pairs, most important first.
    """
    columns = X_train.columns
    ranked = list(zip(columns, model.feature_importances_))
    # Stable descending sort: equally important features keep their column order.
    ranked.sort(key=lambda pair: pair[1], reverse=True)
    return len(columns), ranked[:5]

In [None]:
def convert_top_5_feature_importances_to_df(top_5_features):
    """Turn ``(feature, importance)`` pairs into a DataFrame indexed by feature name.

    Returns:
        pd.DataFrame: Single ``Importance`` column with ``Feature`` as the index.
    """
    importance_table = pd.DataFrame(top_5_features, columns=['Feature', 'Importance'])
    return importance_table.set_index('Feature')

In [None]:
def print_top_5_feature_importances(model_name, model, X_train):
    """Print the highest-ranked features of ``model`` with four-decimal importances.

    Args:
        model_name (str): Label used in the printed header.
        model: Fitted estimator forwarded to ``top_5_feature_importances``.
        X_train (pd.DataFrame): Training features forwarded to the same helper.
    """
    feature_count, ranked_features = top_5_feature_importances(model, X_train)

    print(f"\n{model_name} top 5 features out of {feature_count} features:")
    for feature, importance in ranked_features:
        print(f"{feature}: {importance:.4f}")

In [None]:
def convert_model_report_to_df(report):
    """Convert a classification-report dictionary into a display table.

    Each report entry becomes a row. The ``support`` column and the
    ``macro avg`` / ``weighted avg`` rows are removed.

    Args:
        report (dict): Report dictionary containing those entries.

    Returns:
        pd.DataFrame: One row per remaining report entry.
    """
    return (
        pd.DataFrame(report)
        .T
        .drop(columns=['support'])
        .drop(index=['macro avg', 'weighted avg'])
    )

In [None]:
def print_model_results(model_name, accuracy, cv_accuracy, report_df):
    """Print a model's cross-validated accuracy, test accuracy and report table.

    Args:
        model_name (str): Label prefixed to the accuracy lines.
        accuracy (float): Held-out accuracy, printed with four decimals.
        cv_accuracy: Cross-validated accuracy, printed as-is.
        report_df: Classification report table, printed via its ``str`` form.
    """
    print(
        f"\n{model_name} cross-validated accuracy: {cv_accuracy}",
        f"{model_name} accuracy: {accuracy:.4f}",
        "Classification report table:",
        report_df,
        sep="\n",
    )

In [None]:
def print_confusion_matrix(model_name, y_test, y_pred):
    """Print a header line followed by the confusion matrix of the predictions.

    Args:
        model_name (str): Label used in the header.
        y_test: True labels.
        y_pred: Predicted labels.
    """
    print(f"{model_name} confusion matrix:")
    matrix = confusion_matrix(y_test, y_pred)
    print(matrix)

In [None]:
def find_best_model(models, X_train, X_test, y_train, y_test, cv):
    """Train every candidate model and return the one with the highest test accuracy.

    Each model is trained and evaluated with ``train_and_evaluate_model`` and
    its report is shown with ``show_model_report_results``. Selection uses the
    held-out accuracy; when several models tie, the one that appears first in
    ``models`` is kept.

    Args:
        models (dict): Mapping of display name to estimator.
        X_train, y_train: Training features and labels.
        X_test, y_test: Held-out features and labels.
        cv: Cross-validation splitter forwarded to ``train_and_evaluate_model``.

    Returns:
        tuple: ``(best_model, best_model_name)``; ``(None, None)`` only when
        ``models`` is empty.
    """
    best_model = None
    best_model_name = None
    # Start below any achievable score so that a model is always selected,
    # even if every candidate reaches an accuracy of exactly 0.
    best_accuracy = float('-inf')

    for model_name, model in models.items():
        print(f"\n{model_name} model:")
        accuracy, cv_accuracy, report, _ = train_and_evaluate_model(model, X_train, X_test, y_train, y_test, cv)
        show_model_report_results(model_name, accuracy, cv_accuracy, report)

        # Strict comparison: an equal score does not displace an earlier model.
        if accuracy > best_accuracy:
            best_model = model
            best_model_name = model_name
            best_accuracy = accuracy

    print(f"\n===================== Best model: {best_model_name} =====================")

    return best_model, best_model_name

In [None]:
def show_model_report_results(model_name, accuracy, cv_accuracy, report, save_path=None):
    """Print a model's evaluation summary and render its report table to an image.

    Args:
        model_name (str): Label for the printout; its lower-cased form names the default file.
        accuracy (float): Held-out accuracy.
        cv_accuracy: Cross-validated accuracy.
        report (dict): Classification report dictionary.
        save_path (str, optional): Output image path. When falsy, the table goes to
            ``results/<model name lower-cased>_classification_report.png``.
    """
    rounded_report = convert_model_report_to_df(report).round(4)
    print_model_results(model_name, accuracy, cv_accuracy, rounded_report)

    destination = save_path or os.path.join('results', f'{model_name.lower()}_classification_report.png')
    create_matplotlib_table(rounded_report, destination)

In [None]:
def show_feature_importance_results(model_name, model, X_train, save_path=None):
    """Print the top feature importances of ``model`` and render them to an image.

    Args:
        model_name (str): Label for the printout; its lower-cased form names the default file.
        model: Fitted estimator exposing feature importances.
        X_train (pd.DataFrame): Training features whose columns name the importances.
        save_path (str, optional): Output image path. When falsy, the table goes to
            ``results/<model name lower-cased>_top_5_features.png``.
    """
    print_top_5_feature_importances(model_name, model, X_train)

    _, top_5_features = top_5_feature_importances(model, X_train)
    importance_table = convert_top_5_feature_importances_to_df(top_5_features).round(4)

    destination = save_path or os.path.join('results', f'{model_name.lower()}_top_5_features.png')
    create_matplotlib_table(importance_table, destination)

In [None]:
def show_confusion_matrix_results(model_name, y_test, y_pred, save_path=None):
    """Print the confusion matrix of the predictions and plot it to an image.

    Args:
        model_name (str): Label for the printout and plot; its lower-cased form
            names the default file.
        y_test: True labels.
        y_pred: Predicted labels.
        save_path (str, optional): Output image path. When falsy, the plot goes to
            ``results/<model name lower-cased>_confusion_matrix.png``.
    """
    print_confusion_matrix(model_name, y_test, y_pred)

    destination = save_path or os.path.join('results', f'{model_name.lower()}_confusion_matrix.png')
    plot_confusion_matrix(model_name, y_test, y_pred, destination)

In [None]:
def train_and_evaluate_models(data):
    """Compare the candidate classifiers, then re-evaluate the winner on oversampled data.

    Steps:
      1. Split ``data`` without oversampling and pick the best of Random Forest,
         AdaBoost and XGBoost via ``find_best_model``.
      2. Split again with oversampling, refit the winning model on the
         resampled training set and evaluate it.
      3. Save the winner's classification report, top-5 feature importances
         and confusion matrix under ``results/`` with an
         ``_oversampled_data`` suffix.

    Args:
        data (pd.DataFrame): Full dataset passed to ``split_data``.
    """
    X_train, X_test, y_train, y_test = split_data(data)

    candidates = {
        'Random Forest': RandomForestClassifier(random_state=42),
        'AdaBoost': AdaBoostClassifier(random_state=42),
        'XGBoost': XGBClassifier(random_state=42, eval_metric='mlogloss'),
    }

    # Stratified folds keep the class distribution similar in every fold.
    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

    best_model, best_model_name = find_best_model(candidates, X_train, X_test, y_train, y_test, cv)

    # NOTE(review): the cross-validation inside train_and_evaluate_model runs on
    # rows that were already oversampled, so duplicated samples can fall into
    # both the fitting and the validation part of a fold — the cross-validated
    # accuracy reported for this step may be optimistic.
    X_train_resampled, X_test, y_train_resampled, y_test = split_data(data, oversample=True)
    best_accuracy, best_cv_accuracy, best_report, best_y_pred = train_and_evaluate_model(
        best_model, X_train_resampled, X_test, y_train_resampled, y_test, cv
    )

    show_model_report_results(
        best_model_name,
        best_accuracy,
        best_cv_accuracy,
        best_report,
        save_path=os.path.join('results', f'{best_model_name.lower()}_classification_report_oversampled_data.png'),
    )

    show_feature_importance_results(
        best_model_name,
        best_model,
        X_train_resampled,
        save_path=os.path.join('results', f'{best_model_name.lower()}_top_5_features_oversampled_data.png'),
    )

    show_confusion_matrix_results(
        best_model_name,
        y_test,
        best_y_pred,
        save_path=os.path.join('results', f'{best_model_name.lower()}_confusion_matrix_oversampled_data.png'),
    )

In [None]:
def main():
    """Load ``output_data.csv`` and run the training/evaluation pipeline on it."""
    train_and_evaluate_models(utils.pandas_load_csv("output_data.csv"))

In [None]:
# Entry point: run the pipeline only when this code executes as the main
# module, not when it is imported.
if __name__ == '__main__':
    main()