In [1]:
# imports and settings
import pandas as pd
import warnings
from time import time
from sklearn.model_selection import train_test_split as split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import balanced_accuracy_score, confusion_matrix, precision_score, recall_score, \
    ConfusionMatrixDisplay, roc_auc_score
from pickle import dump as store_model
from pickle import load as load_model
from os.path import isdir as is_dir
from os import mkdir
from sklearn.metrics import classification_report
from sklearn.neighbors import KNeighborsClassifier

# Suppress only pandas' DtypeWarning; every other warning category is still shown.
warnings.filterwarnings(action="ignore", category=pd.errors.DtypeWarning)

# ---------------------------------------------------------------------------
# Notebook configuration
# ---------------------------------------------------------------------------

# Input CSVs. TRAIN_DATA / TEST_DATA are the files read by the load-data cell;
# DATA_PATH / HOLDOUT_PATH are not read by any cell visible in this notebook.
TRAIN_DATA = "data/processed data/cleaned_train.csv"
TEST_DATA = "data/processed data/cleaned_test.csv"
DATA_PATH = "data/processed data/cleaned_data.csv"
HOLDOUT_PATH = "data/processed data/holdout_cleaned.csv"

# Output folders: pickled models and Markdown metric reports.
MODEL_DIR = "models"
METRIC_DIR = "metrics"

# Which models to run.
DO_KNN = True
DO_RF = True

# Run behaviour.
DO_LOGGING = True    # print progress lines with elapsed time
LOAD_MODELS = False  # True: load pre-trained models from MODEL_DIR, False: train new ones
SAVE_MODELS = False  # True: pickle freshly trained models into MODEL_DIR

# k-Nearest Neighbor hyper-parameters
DISTANCE_TYPE = 'euclidean'  # 'euclidean' or 'manhattan'
N_NEAREST_NEIGHBORS = 2

# Random forest hyper-parameters (passed straight to RandomForestClassifier)
N_ESTIMATORS = 100
MAX_DEPTH = None
MAX_LEAF_NODES = None

# define functions
def log(text: str):
    """Print ``text`` followed by the elapsed-time stamp from ``time_check()``.

    Nothing is printed when the module-level ``DO_LOGGING`` flag is falsy. In
    that case ``time_check()`` is not called either, so the timing checkpoint
    is left untouched.
    """
    if not DO_LOGGING:
        return
    stamp = time_check()
    print(f"{text}  {stamp}")

# Wall-clock time of the most recent time_check() call (initialised when this cell runs).
time_checkpoint = time()
def time_check():
    """Return the time elapsed since the previous call and reset the checkpoint.

    The elapsed wall-clock time is rounded to two decimals and formatted as
    ``"[S sec]"`` or, from one minute upwards, ``"[M min, S sec]"``.

    Side effect: the module-level ``time_checkpoint`` is set to the current
    time, so calling this function (and discarding the result) restarts the timer.
    """
    global time_checkpoint
    # Read the clock once so no time is lost between measuring and resetting.
    now = time()
    elapsed = round(now - time_checkpoint, 2)
    time_checkpoint = now

    minutes, seconds = divmod(elapsed, 60)
    stamp = "["
    # '>=' (not '>') so that exactly 60.0 s is shown as "1 min, 0.0 sec"
    # instead of losing the minute and printing "0.0 sec".
    if elapsed >= 60:
        stamp += f"{int(minutes)} min, "
    stamp += f"{round(seconds, 2)} sec]"
    return stamp

# Party labels in the fixed order used by the report tables, and the short
# names printed for them (same positions in both lists).
_PARTY_LABELS = ["DEMOCRAT", "INDEPENDENT", "REPUBLICAN"]
_PARTY_SHORT = ["DEM", "IND", "REP"]


def _report_row(title, stats, highlight=False):
    """Format one classification-report entry as a Markdown table row.

    ``stats`` is a dict with 'precision', 'recall', 'f1-score' and 'support'.
    With ``highlight=True`` the three scores (not the support) are wrapped in backticks.
    """
    cells = [f"{round(stats[key], 2)}" for key in ("precision", "recall", "f1-score")]
    if highlight:
        cells = [f"`{cell}`" for cell in cells]
    cells.append(f"{int(stats['support'])}")
    return f"| **{title}** | " + " | ".join(cells) + " |\n"


def _matrix_row(title, counts, diagonal):
    """Format one confusion-matrix row; the cell at index ``diagonal`` is wrapped in backticks."""
    cells = [f"`{count}`" if col == diagonal else f"{count}" for col, count in enumerate(counts)]
    return f"| **{title}** | " + " | ".join(cells) + " |\n"


def get_metrics(train_pred, test_pred, model_type):
    """Build a Markdown report with the model settings and test/train metrics.

    Parameters
    ----------
    train_pred : predictions for the rows of ``TRAIN_X`` (compared against ``TRAIN_Y``).
    test_pred : predictions for the test rows (compared against ``TEST_Y``).
    model_type : "k Nearest Neighbor" or "Random Forest"; selects which settings are listed.

    Returns
    -------
    str
        Markdown text: a settings section, the feature list, then one section
        each for the test and the train split with a classification-report
        table, accuracy / balanced accuracy and a confusion-matrix table.

    Notes
    -----
    Reads the module-level ``TRAIN_X``, ``TRAIN_Y`` and ``TEST_Y`` (defined in
    the load-data cell, so that cell must run first) and the hyper-parameter
    constants of the chosen model.

    Raises ``AssertionError`` for an unknown ``model_type``. A ``KeyError`` is
    raised if the classification report has no entry for one of the three party
    labels (presumably when that label occurs in neither ``y`` nor ``pred``).
    """
    assert model_type in ["k Nearest Neighbor", "Random Forest"]

    # settings section
    output = "# Settings\n"
    output += f"**Model**: {model_type}\n"
    if model_type == 'k Nearest Neighbor':
        output += f"- Num Neighbors = {N_NEAREST_NEIGHBORS}\n"
        output += f"- Distance = {DISTANCE_TYPE}\n"
    elif model_type == "Random Forest":
        output += f"- Num Estimators: {N_ESTIMATORS}\n"
        output += f"- Max Depth: {MAX_DEPTH}\n"
        output += f"- Max Leaf Nodes: {MAX_LEAF_NODES}\n"

    # feature list
    output += "\n**Features**: "
    output += "".join(f" '{col}' " for col in TRAIN_X.columns)
    output += "\n"

    for title, y, pred in (("Test", TEST_Y, test_pred), ("Train", TRAIN_Y, train_pred)):

        # calculating metrics
        b_accuracy = balanced_accuracy_score(y, pred)
        # Pin the label order explicitly: the table below hard-codes DEM/IND/REP
        # headers, so the matrix must not depend on whichever labels happen to
        # be present (an extra label would otherwise shift rows and columns).
        conf_matrix = confusion_matrix(y, pred, labels=_PARTY_LABELS)
        class_report = classification_report(y, pred, output_dict=True)

        # displaying metrics in MD formatting (uses table formatting)
        output += f"\n---\n# {title} metrics\n"
        output += "### Classification Report:\n"
        output += "| | Precision | Recall | f1-Score | Support |\n"
        output += "| ---: | :---: | :---: | :---: | :---: |\n"
        for short, label in zip(_PARTY_SHORT, _PARTY_LABELS):
            output += _report_row(short, class_report[label])
        output += _report_row("Macro Avg", class_report['macro avg'])
        # the weighted-average row is highlighted and followed by a blank line
        output += _report_row("Weighted Avg", class_report['weighted avg'], highlight=True) + "\n"
        output += f"**Accuracy**: {round(class_report['accuracy'], 2)}\t**Balanced Accuracy**: {round(b_accuracy, 2)}\n"

        output += "### **Confusion Matrix**:\n"
        output += "| **True / Predicted** | DEM | IND | REP |\n"
        output += "| --: | :---: | :---: | :---: |\n"
        for row_idx, short in enumerate(_PARTY_SHORT):
            output += _matrix_row(short, conf_matrix[row_idx], row_idx)

    return output

In [2]:
# load data
time_check()  # result discarded: only resets the timer so the first log line times the CSV read

TRAIN = pd.read_csv(TRAIN_DATA)
log("Loaded training data")

TEST = pd.read_csv(TEST_DATA)
log("Loaded test data")

# X = every column except the 'party' target, Y = the 'party' column itself
TRAIN_X, TRAIN_Y = TRAIN.drop(columns=['party']), TRAIN['party']
TEST_X, TEST_Y = TEST.drop(columns=['party']), TEST['party']

log("Split all data into X and Y frames")

Loaded training data  [0.3 sec]
Loaded test data  [0.27 sec]
Split all data into X and Y frames  [0.01 sec]


In [3]:
# k-Nearest Neighbor
# Fits (or loads) the kNN classifier, predicts on train and test data and
# writes the Markdown metric report to METRIC_DIR.
time_check()  # reset the timer for this cell's log lines
if DO_KNN:
    if LOAD_MODELS:
        # NOTE(security): unpickling can execute arbitrary code - only load
        # model files that were written by this notebook / a trusted source.
        with open (f"{MODEL_DIR}/kNN.pkl", "rb") as file:
            knn_model = load_model(file)
        log(f"Loaded kNN from {MODEL_DIR}/kNN.pkl")
    else:
        # translate the distance name into the exponent `p` passed to KNeighborsClassifier
        if DISTANCE_TYPE == 'euclidean':
            p = 2
        elif DISTANCE_TYPE == 'manhattan':
            p = 1
        else:
            # Fail loudly: merely logging here would continue with an undefined
            # (or stale, from an earlier run of this cell) value of `p`.
            raise ValueError(f"kNN Distance type {DISTANCE_TYPE} not supported.")

        knn_model = KNeighborsClassifier(p=p, n_neighbors=N_NEAREST_NEIGHBORS)
        knn_model.fit(TRAIN_X, TRAIN_Y)
        log("Fitted kNN to training data")

        if SAVE_MODELS:
            # make sure the target folder exists before writing the pickle
            if not is_dir(MODEL_DIR):
                mkdir(MODEL_DIR)
            with open(f"{MODEL_DIR}/kNN.pkl", "wb") as file:
                store_model(knn_model, file)
            log(f"Saved kNN to '{MODEL_DIR}/kNN.pkl'")

    knn_train_pred = knn_model.predict(TRAIN_X)
    log("kNN made predictions on training data")
    knn_test_pred = knn_model.predict(TEST_X)
    log("kNN made predictions on test data")

    # make sure the report folder exists; open(..., 'w') does not create it
    if not is_dir(METRIC_DIR):
        mkdir(METRIC_DIR)
    with open(f"{METRIC_DIR}/kNN metrics.md", 'w') as file:
        file.write(get_metrics(knn_train_pred, knn_test_pred, "k Nearest Neighbor"))
    log(f"kNN metrics saved to '{METRIC_DIR}/kNN metrics.md'")


Fitted kNN to training data  [1.35 sec]
kNN made predictions on training data  [25.16 sec]
kNN made predictions on test data  [17.9 sec]
kNN metrics saved to 'metrics/kNN metrics.md'  [24.73 sec]


In [4]:
# random forest
# Fits (or loads) the random forest, predicts on train and test data and
# writes the Markdown metric report to METRIC_DIR.
RF_RANDOM_STATE = 42  # fixed seed so that re-running the cell reproduces the same forest

time_check()  # reset the timer for this cell's log lines
if DO_RF:
    if LOAD_MODELS:
        # NOTE(security): unpickling can execute arbitrary code - only load
        # model files that were written by this notebook / a trusted source.
        with open (f"{MODEL_DIR}/random_forest.pkl", "rb") as file:
            rf_model = load_model(file)
        log(f"Loaded random forest from {MODEL_DIR}/random_forest.pkl")
    else:
        rf_model = RandomForestClassifier(
            n_estimators=N_ESTIMATORS,
            max_depth=MAX_DEPTH,
            max_leaf_nodes=MAX_LEAF_NODES,
            random_state=RF_RANDOM_STATE,
        )
        rf_model.fit(TRAIN_X, TRAIN_Y)
        log("Fitted random forest to training data")

        if SAVE_MODELS:
            # make sure the target folder exists before writing the pickle
            if not is_dir(MODEL_DIR):
                mkdir(MODEL_DIR)
            with open(f"{MODEL_DIR}/random_forest.pkl", "wb") as file:
                store_model(rf_model, file)
            log(f"Saved random forest to '{MODEL_DIR}/random_forest.pkl'")

    rf_train_pred = rf_model.predict(TRAIN_X)
    log("Random Forest made predictions on training data")
    rf_test_pred = rf_model.predict(TEST_X)
    log("Random Forest made predictions on test data")

    # make sure the report folder exists; open(..., 'w') does not create it
    if not is_dir(METRIC_DIR):
        mkdir(METRIC_DIR)
    with open(f"{METRIC_DIR}/random_forest metrics.md", 'w') as file:
        file.write(get_metrics(rf_train_pred, rf_test_pred, "Random Forest"))
    log(f"Random Forest metrics saved to '{METRIC_DIR}/random_forest metrics.md'")


Fitted random forest to training data  [1 min, 4.88 sec]
Random Forest made predictions on training data  [8.57 sec]
Random Forest made predictions on test data  [9.22 sec]
Random Forest metrics saved to 'metrics/random_forest metrics.md'  [23.42 sec]
