In [None]:
import warnings
import pandas as pd
from imblearn.over_sampling import SMOTE
from imblearn.over_sampling import RandomOverSampler 
from sklearn.metrics import make_scorer, accuracy_score, f1_score
from sklearn.model_selection import GridSearchCV
from sklearn.preprocessing import LabelEncoder
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from xgboost import XGBClassifier
from itertools import product
from sklearn.feature_selection import SelectKBest
from sklearn.model_selection import StratifiedKFold
from imblearn.metrics import geometric_mean_score
import matplotlib.pyplot as plt
warnings.filterwarnings("ignore")

# Specification of hyperparameters

In [None]:
# Hyperparameter search space, keyed by estimator class.
#
# Each estimator maps to a list of ``(conditions, parameters)`` rules that
# ``parse_hyper_parameters`` turns into a scikit-learn ``param_grid``:
#   * ``conditions == {}`` is the base rule; its parameters form the base grid.
#   * A non-empty ``conditions`` dict maps a parameter name to the values it may
#     take. Every such value yields its own grid that inherits the base options,
#     pins the conditioned parameter to that value and adds the rule's extra
#     parameters.
#
# Condition values are written as lists (never sets): set iteration order of
# strings changes between interpreter sessions, which would make the order of
# the generated grids - and therefore GridSearchCV tie-breaking - irreproducible.
model_parameter_rules = {
    SVC: [
        (
            {},
            {
                "C": [0.5, 0.75, 1, 1.5],
                "kernel": ["linear"],
                "class_weight": ["balanced"],
            },
        ),
        # Polynomial kernel: additionally search the polynomial degree.
        ({"kernel": ["poly"]}, {"degree": [2, 3, 4]}),
        # RBF / sigmoid kernels: one grid per kernel, each with gamma="auto".
        ({"kernel": ["rbf", "sigmoid"]}, {"gamma": ["auto"]}),
    ],
    KNeighborsClassifier: [
        (
            {},
            {
                "n_neighbors": [3, 5, 7, 9, 11, 15],
                "weights": ["uniform", "distance"],
                "metric": [
                    "cityblock",
                    "cosine",
                    "l1",
                    "l2",
                    "nan_euclidean",
                ],
            },
        ),
        # Minkowski distance: the exponent p is only searched for this metric.
        ({"metric": ["minkowski"]}, {"p": [1, 2, 3, 4]}),
    ],
    RandomForestClassifier: [
        (
            {},
            {
                "n_estimators": [50, 100, 200, 400, 800, 1600],
                "criterion": ["gini", "entropy", "log_loss"],
                "max_features": ["sqrt", "log2", None],
            },
        )
    ],
    XGBClassifier: [
        (
            {},
            {
                "n_estimators": [25, 50, 100, 200, 400, 800],
                "grow_policy": ["depthwise", "lossguide"],
                "learning_rate": [0.01, 0.1, 1],
            },
        )
    ]
}

# Extracted code from PADDEL library

In [None]:
class HashableDict(dict):
    """Dictionary that can itself serve as a dictionary key.

    The hash is computed from the key/value pairs taken in sorted order, so two
    instances holding the same pairs hash alike regardless of insertion order.
    Keys must be mutually orderable and values hashable. Do not mutate an
    instance while it is being used as a key.
    """

    def __hash__(self):
        ordered_pairs = sorted(self.items())
        return hash(tuple(ordered_pairs))


def expand_rules(parameter_rules: list) -> dict:
    """Flatten ``(conditions, parameters)`` rules into one entry per concrete condition.

    A rule whose conditions allow several values per parameter is split into
    the Cartesian product of those values. Each resulting combination becomes a
    ``HashableDict`` key mapping every conditioned parameter name to a
    one-element tuple holding its value. Rules that expand to the same key have
    their parameters combined with ``dict.update`` (later rules win per name).

    Args:
        parameter_rules (list): Rules to expand, as ``(conditions, parameters)`` pairs.

    Returns:
        dict: Mapping from expanded condition to the parameters declared for it.
    """
    expanded_rules = {}

    for conditions, parameters in parameter_rules:
        names = conditions.keys()
        # An empty ``conditions`` dict yields exactly one (empty) combination.
        for combination in product(*conditions.values()):
            key = HashableDict(zip(names, ((value,) for value in combination)))
            expanded_rules.setdefault(key, {}).update(parameters)

    return expanded_rules


def matches(conditions: dict, other_conditions: dict) -> bool:
    """Tell whether ``other_conditions`` is strictly contained in ``conditions``.

    The comparison is made on the key/value pairs: every pair of
    ``other_conditions`` must also appear in ``conditions``, and ``conditions``
    must hold at least one additional pair. A dictionary therefore never
    matches itself.

    Args:
        conditions (dict): The more specific conditions.
        other_conditions (dict): The candidate subset of conditions.

    Returns:
        bool: True if ``other_conditions`` is a proper subset of ``conditions``.
    """
    return other_conditions.items() < conditions.items()


def merge_parameters(one: dict, other: dict) -> dict:
    """Merge parameter options of ``other`` into a copy of ``one``.

    For a parameter present in both dictionaries the option lists are
    concatenated (``one``'s options first); parameters only present in
    ``other`` are added as they are. Neither input dictionary nor any of the
    option lists they hold is modified.

    Args:
        one (dict): Dict to merge into.
        other (dict): Dict to merge from.

    Returns:
        dict: Merged dictionary.
    """
    merged = one.copy()
    for key, options in other.items():
        if key in merged:
            # Build a new list instead of using ``+=``: the copy above is
            # shallow, so an in-place extend would silently grow the caller's
            # (shared) option list as well.
            merged[key] = [*merged[key], *options]
        else:
            merged[key] = options

    return merged


def parse_hyper_parameters(parameter_rules: list, prefix="") -> list:
    """Turn custom ``(conditions, parameters)`` rules into a sklearn parameter grid.

    The rules are first expanded to one entry per concrete condition. Each entry
    then becomes one grid dictionary made of its own parameters, the parameters
    of every entry whose condition is strictly contained in its own (e.g. the
    unconditional base rule), and finally the conditioned parameters pinned to
    their single value.

    Args:
        parameter_rules (list): Parameter rules.
        prefix (str, optional): Prefix put in front of every parameter name.
            Useful when working with pipelines. Defaults to "".

    Returns:
        list: Parameter grid (list of dicts), one dict per expanded condition.
    """
    expanded = expand_rules(parameter_rules)

    param_grid = []
    for conditions, own_parameters in expanded.items():
        combined = own_parameters.copy()

        # Inherit the options of every less specific rule.
        for other_conditions, other_parameters in expanded.items():
            if not matches(conditions, other_conditions):
                continue
            combined = merge_parameters(combined, other_parameters)

        # The condition itself fixes the value of the conditioned parameters.
        for name, allowed in conditions.items():
            combined[name] = list(allowed)

        # Rename to match model if using pipeline.
        param_grid.append(
            {f"{prefix}{key}": options for key, options in combined.items()}
        )

    return param_grid

In [None]:
def load_data():
    """
    Read the three feature CSV files and join them column-wise.

    The files 'misc_df.csv', 'classic_df.csv' and 'fresh_df.csv' are read from
    the current working directory and concatenated along the column axis
    (``axis=1``), i.e. rows are aligned on their index.

    Returns:
    data (pandas.DataFrame): The concatenated DataFrame containing the columns of misc_df, classic_df, and fresh_df.
    """
    csv_paths = ('misc_df.csv', 'classic_df.csv', 'fresh_df.csv')
    frames = [pd.read_csv(path) for path in csv_paths]
    return pd.concat(frames, axis=1)

# Load the combined table once; the trailing expression previews its first rows.
data = load_data()
data.head()

In [None]:

def print_value_counts():
    """
    Plot the class distribution of the 'lent' and 'amp' columns.

    Reads the module-level 'data' DataFrame and draws two bar charts side by
    side (one per column) showing the number of rows for each distinct value.
    Nothing is returned; the figure is shown with ``plt.show()``.
    """
    counts_by_column = {
        column: data[column].value_counts() for column in ('lent', 'amp')
    }

    fig, axes = plt.subplots(1, 2, figsize=(10, 4))
    for axis, (column, counts) in zip(axes, counts_by_column.items()):
        axis.bar(counts.index, counts.values)
        axis.set_xlabel('Class')
        axis.set_ylabel('Instances')
        axis.set_title(f'Number of instances per class in "{column}"')

    plt.tight_layout()
    plt.show()

In [None]:
# Class distribution of the table as loaded, before any rows are filtered out.
print_value_counts()


In [None]:
def preprocess_data(data):
    """
    Filter out unusable rows and drop the bookkeeping columns.

    Rows are removed when 'lent' equals 4, 'amp' equals 4 or 'age' equals the
    string 'XX'; of the remaining rows only those with 'detection_time' of at
    least 15 are kept. The columns 'sample_name', 'date' and 'video_path' are
    then dropped. The input frame is not modified.

    Args:
        data (pandas.DataFrame): The input data to be preprocessed.

    Returns:
        pandas.DataFrame: The preprocessed data.
    """
    unusable = (data['lent'] == 4) | (data['amp'] == 4) | (data['age'] == 'XX')
    kept = data.drop(data[unusable].index)
    kept = kept[kept['detection_time'] >= 15]
    return kept.drop(columns=['sample_name', 'date', 'video_path'])
# Rebinds `data` to the filtered table; re-running this cell filters the already-filtered frame again.
data=preprocess_data(data)

In [None]:
# Additionally discard every row whose 'lent' label equals 3.
# NOTE(review): presumably this class has too few samples to model - confirm with the plot above.
index_to_remove=data[(data['lent']==3)].index
data=data.drop(index_to_remove)

In [None]:
# Class distribution again, now on the filtered table.
print_value_counts()

In [None]:
def encode_data(data):
    """
    Label-encode the categorical columns and cast 'age' to integers.

    The column "angle__query_similarity_count__query_None__threshold_0.0" is
    dropped, 'hand', 'gender' and 'handedness' are replaced by the integer codes
    produced by ``LabelEncoder``, and 'age' is converted with ``astype(int)``.
    All work happens on a new frame, so the DataFrame passed in is left
    untouched.

    Args:
        data (pandas.DataFrame): The input data to be encoded.

    Returns:
        pandas.DataFrame: A new, encoded DataFrame.

    Raises:
        KeyError: If one of the columns named above is missing.
    """
    # ``drop`` returns a new frame; encoding into it (rather than into ``data``)
    # keeps the caller's DataFrame unmodified.
    encoded = data.drop(columns="angle__query_similarity_count__query_None__threshold_0.0")
    for column in ('hand', 'gender', 'handedness'):
        # Each column is fitted independently.
        encoded[column] = LabelEncoder().fit_transform(encoded[column])
    encoded['age'] = encoded['age'].astype(int)
    return encoded


In [None]:
# Rebinds `data` to the encoded table that the model-selection cells below read.
data = encode_data(data)

# Model selection

## Slowness

In [None]:
# Model selection for the slowness target ('lent').
#
# Outer loop: stratified k-fold split. Within each fold the k best features are
# selected on the training part, the training part is oversampled once with
# random oversampling (ROS) and once with SMOTE, and for every model a grid
# search (2-fold inner CV, optimising the weighted geometric mean) is run on
# each oversampled set. The refitted best estimator is scored on the untouched
# test part and the scores are appended to the corresponding results list.
n_folds=5
random_state=42
k_neighbors=3
n_features=320
lent_data=data.drop(columns=['amp'])
# Build the features from lent_data so that the other target ('amp') is not
# leaked into X as a predictor.
X=lent_data.drop(columns=['lent'])
y=data['lent']
all_results_smote_lent=[]
all_results_ros_lent=[]
models = {
    "svm_linear": SVC,
    "xgboost":XGBClassifier,
    "knn": KNeighborsClassifier,
    "rf": RandomForestClassifier
}
smote = SMOTE(random_state=random_state,k_neighbors=k_neighbors)
ros = RandomOverSampler(random_state=random_state)
skf=StratifiedKFold(n_splits=n_folds)
accuracy_scorer = make_scorer(accuracy_score)
f1_scorer = make_scorer(f1_score, average="weighted")
gmean_scorer = make_scorer(geometric_mean_score, average='weighted')
for train_index, test_index in skf.split(X, y):

    X_train, X_test = X.iloc[train_index], X.iloc[test_index]
    y_train, y_test = y.iloc[train_index], y.iloc[test_index]
    # Fit the selector on the training part only and apply the same column
    # selection to both parts (previously the selection result was discarded).
    selector = SelectKBest(k=n_features).fit(X_train, y_train)
    X_train = selector.transform(X_train)
    X_test = selector.transform(X_test)
    X_res, y_res = ros.fit_resample(X_train, y_train)
    X_res_s, y_res_s = smote.fit_resample(X_train, y_train)
    for model_name, model in models.items():
        model_param_grid = parse_hyper_parameters(model_parameter_rules[model])
        grid = GridSearchCV(
            estimator=model(),
            param_grid=model_param_grid,
            scoring={
                "g_mean":gmean_scorer,
            },
            refit="g_mean",
            n_jobs=10,
            cv=StratifiedKFold(n_splits=2),
            verbose=0
        )
        print(f"Doing dataset: full, model: {model_name}, features: {n_features}")
        # Same search on both oversampled training sets: ROS first, then SMOTE.
        for X_fit, y_fit, results in (
            (X_res, y_res, all_results_ros_lent),
            (X_res_s, y_res_s, all_results_smote_lent),
        ):
            grid.fit(X_fit, y_fit)
            best_estimator = grid.best_estimator_
            accuracy = accuracy_scorer(best_estimator, X_test, y_test)
            f1 = f1_scorer(best_estimator, X_test, y_test)
            gmean = gmean_scorer(best_estimator, X_test, y_test)
            results.append({
                "dataset": "full",
                "model": model_name,
                "features": n_features,
                "accuracy": accuracy,
                "f1": f1,
                "gmean": gmean,
                "parameters": grid.best_params_
            })


In [None]:
# Persist the per-fold, per-model slowness results: one CSV per oversampling strategy.
df = pd.DataFrame(all_results_smote_lent)
df.to_csv("./all_results_smote_lent.csv", index=False)
# `df` is reused; after this cell it holds the ROS results.
df = pd.DataFrame(all_results_ros_lent)
df.to_csv("./all_results_ros_lent.csv", index=False)

## Amplitude

In [None]:
# Model selection for the amplitude target ('amp').
#
# Outer loop: stratified k-fold split. Within each fold the k best features are
# selected on the training part, the training part is oversampled once with
# random oversampling (ROS) and once with SMOTE, and for every model a grid
# search (2-fold inner CV, optimising the weighted geometric mean) is run on
# each oversampled set. The refitted best estimator is scored on the untouched
# test part and the scores are appended to the corresponding results list.
n_folds=5
random_state=42
k_neighbors=3
n_features=320
amp_data=data.drop(columns=['lent'])
lent_data=amp_data  # original (misleading) name, kept in case other cells still read it
# Build the features from amp_data so that the other target ('lent') is not
# leaked into X as a predictor.
X=amp_data.drop(columns=['amp'])
y=data['amp']
all_results_smote_amp=[]
all_results_ros_amp=[]
models = {
    "svm_linear": SVC,
    "xgboost":XGBClassifier,
    "knn": KNeighborsClassifier,
    "rf": RandomForestClassifier
}
smote = SMOTE(random_state=random_state,k_neighbors=k_neighbors)
ros = RandomOverSampler(random_state=random_state)
skf=StratifiedKFold(n_splits=n_folds)
accuracy_scorer = make_scorer(accuracy_score)
f1_scorer = make_scorer(f1_score, average="weighted")
gmean_scorer = make_scorer(geometric_mean_score, average='weighted')
for train_index, test_index in skf.split(X, y):

    X_train, X_test = X.iloc[train_index], X.iloc[test_index]
    y_train, y_test = y.iloc[train_index], y.iloc[test_index]
    # Fit the selector on the training part only and apply the same column
    # selection to both parts (previously the selection result was discarded).
    selector = SelectKBest(k=n_features).fit(X_train, y_train)
    X_train = selector.transform(X_train)
    X_test = selector.transform(X_test)
    X_res, y_res = ros.fit_resample(X_train, y_train)
    X_res_s, y_res_s = smote.fit_resample(X_train, y_train)
    for model_name, model in models.items():
        model_param_grid = parse_hyper_parameters(model_parameter_rules[model])
        grid = GridSearchCV(
            estimator=model(),
            param_grid=model_param_grid,
            scoring={
                "g_mean":gmean_scorer,
            },
            refit="g_mean",
            n_jobs=10,
            cv=StratifiedKFold(n_splits=2),
            verbose=0
        )
        print(f"Doing dataset: full, model: {model_name}, features: {n_features}")
        # Same search on both oversampled training sets: ROS first, then SMOTE.
        for X_fit, y_fit, results in (
            (X_res, y_res, all_results_ros_amp),
            (X_res_s, y_res_s, all_results_smote_amp),
        ):
            grid.fit(X_fit, y_fit)
            best_estimator = grid.best_estimator_
            accuracy = accuracy_scorer(best_estimator, X_test, y_test)
            f1 = f1_scorer(best_estimator, X_test, y_test)
            gmean = gmean_scorer(best_estimator, X_test, y_test)
            results.append({
                "dataset": "full",
                "model": model_name,
                "features": n_features,
                "accuracy": accuracy,
                "f1": f1,
                "gmean": gmean,
                "parameters": grid.best_params_
            })

In [None]:
# Persist the per-fold, per-model amplitude results: one CSV per oversampling strategy.
df = pd.DataFrame(all_results_smote_amp)
df.to_csv("./all_results_smote_amp.csv", index=False)
# `df` is reused; after this cell it holds the ROS results.
df = pd.DataFrame(all_results_ros_amp)
df.to_csv("./all_results_ros_amp.csv", index=False)