In [None]:
# File names of the input datasets; they are joined with the `data` directory when loaded.
training_dataset_name = "train2023.csv"
testing_dataset_name = "test2023.csv"

In [None]:
import os
import pandas as pd

# Both files are ';'-separated and have no header row, so columns get integer labels.
train_df, test_df = (
    pd.read_csv(os.path.join('data', dataset_name), sep=';', header=None)
    for dataset_name in (training_dataset_name, testing_dataset_name)
)

# Class labels used as the default `class_labels` when cleaning outliers per class.
classes = [1, 2, 3]

In [None]:
import logging

# Configure the root logger at DEBUG level so the per-fold debug messages
# emitted during hyperparameter tuning are shown alongside the INFO messages.
logging.basicConfig(level=logging.DEBUG)

In [None]:
import numpy as np

from sklearn.base import BaseEstimator


class OutlierTransformer:
    """Removes outliers from a labelled dataset, one class at a time.

    The wrapped detector is re-fitted separately on the samples of every class
    label, and only the samples it marks with ``1`` are kept.
    """

    def __init__(self, outlier_detector, class_labels):
        """
        :param outlier_detector: object exposing ``fit_predict(X=...)`` that returns
            ``1`` for samples to keep (any other value marks the sample for removal).
        :param class_labels: iterable of the class labels to process; samples whose
            label is not listed are dropped from the output.
        """
        self._class_labels = class_labels
        self._outlier_detector = outlier_detector

    def fit_transform(self, X, y, logging_level=logging.INFO):
        """Return a copy of ``(X, y)`` without the samples flagged as outliers.

        The output is grouped by class, following the order of ``class_labels``,
        so the original sample order is NOT preserved.

        :param X: 2-D numpy array of samples (rows) by features (columns).
        :param y: 1-D numpy array of labels aligned with the rows of ``X``.
        :param logging_level: level used to log the shape before and after cleaning.
        :return: tuple ``(X_cleared, y_cleared)``.
        """
        before = X.shape
        kept_samples = []
        kept_labels = []
        for label in self._class_labels:
            X_class = X[y == label, :]
            # A label may be absent from this particular subset (e.g. a CV fold);
            # detectors cannot be fitted on zero samples, so skip such classes.
            if X_class.shape[0] == 0:
                continue
            inlier_mask = self._outlier_detector.fit_predict(X=X_class) == 1
            X_class = X_class[inlier_mask, :]
            kept_samples.append(X_class)
            kept_labels.append(np.full(X_class.shape[0], label))

        if kept_samples:
            X_cleared = np.vstack(kept_samples)
            y_cleared = np.hstack(kept_labels)
        else:
            # None of the requested classes is present: return empty, well-shaped arrays.
            X_cleared = X[:0]
            y_cleared = np.asarray(y)[:0]

        after = X_cleared.shape
        logging.log(level=logging_level, msg=f'CLEARING OUTLIERS: {before} -> {after}')

        return X_cleared, y_cleared

In [None]:
from sklearn.pipeline import Pipeline


class Scheme:
    """A classification pipeline with an optional per-class outlier-removal step.

    Outlier removal is applied only while fitting; prediction goes straight
    through the wrapped pipeline.
    """

    def __init__(self, pipeline: Pipeline, outlier_detector=None, class_labels=classes, hyperparams_str: str = None):
        """
        :param pipeline: pipeline that is fitted and used for prediction.
        :param outlier_detector: optional detector handed to ``OutlierTransformer``;
            when ``None`` the training data is used as is.
        :param class_labels: class labels whose samples are cleaned separately.
        :param hyperparams_str: human-readable description of the hyperparameters,
            exposed as ``hyperparams`` and used in log messages.
        """
        if outlier_detector is not None:
            # Use the labels given by the caller rather than the module-level default.
            self._outlier_transformer = OutlierTransformer(outlier_detector=outlier_detector,
                                                           class_labels=class_labels)
        else:
            self._outlier_transformer = None
        self._pipeline = pipeline

        self.hyperparams = hyperparams_str

    def fit(self, X, y, logging_level=logging.INFO):
        """Optionally remove outliers from ``(X, y)`` and fit the pipeline on the result.

        :param logging_level: level at which the outlier-removal summary is logged.
        """
        if self._outlier_transformer is not None:
            X, y = self._outlier_transformer.fit_transform(X=X, y=y, logging_level=logging_level)
        self._pipeline.fit(X=X, y=y)

    def predict(self, X):
        """Return the pipeline's predicted labels for ``X``."""
        return self._pipeline.predict(X=X)

    def predict_proba(self, X):
        """Return, for each sample, the largest value of the pipeline's ``predict_proba`` row.

        Note that this is a single certainty value per sample, not the full
        per-class probability matrix.
        """
        return np.max(self._pipeline.predict_proba(X=X), axis=1)

In [None]:
from sklearn.model_selection import StratifiedKFold


def tune_hyperparams(schemas, X, y, metric: callable, n_splits: int = 5):
    """Pick the scheme with the best mean cross-validated score.

    Every scheme is fitted and scored on the same stratified folds. The first
    scheme reaching the highest score wins (ties do not replace the incumbent).
    The returned scheme is left fitted on its last training fold, so callers
    should refit it on the full training set.

    :param schemas: iterable of objects exposing ``fit``, ``predict`` and ``hyperparams``.
    :param X: numpy array of samples, indexable by an array of row indices.
    :param y: numpy array of labels aligned with ``X``.
    :param metric: callable ``metric(y_true, y_pred)`` where larger is better.
    :param n_splits: number of stratified folds.
    :return: tuple ``(best_scheme, mean_score_of_best_scheme)``.
    :raises ValueError: if ``schemas`` yields no scheme.
    """
    skf = StratifiedKFold(n_splits=n_splits)
    # The split does not depend on the scheme, so compute the fold indices once.
    folds = list(skf.split(X, y))
    best_scheme = None
    best_score = None
    for scheme in schemas:
        score = 0
        logging.debug(scheme.hyperparams)
        for train, valid in folds:
            scheme.fit(X=X[train], y=y[train], logging_level=logging.DEBUG)
            y_predict = scheme.predict(X[valid])
            score += metric(y[valid], y_predict)
        if best_score is None or score > best_score:
            best_score = score
            best_scheme = scheme
        logging.debug('metric = ' + str(score / n_splits))
    if best_scheme is None:
        # Without this guard the division below fails with an opaque `None / int` TypeError.
        raise ValueError('tune_hyperparams requires at least one scheme to evaluate')
    return best_scheme, best_score / n_splits

In [None]:
# The last column of the training frame is the label; all preceding columns are features.
X_train = train_df.iloc[:, :-1].to_numpy()
y_train = train_df.iloc[:, -1].to_numpy()
# The whole test frame is used as the feature matrix.
X_test = test_df.to_numpy()

In [None]:
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from sklearn.metrics import make_scorer
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import roc_auc_score

# Metric name -> plain metric function taking (y_true, y_pred).
# NOTE(review): F1 / PRECISION / RECALL / AUC are registered with their default
# arguments; with the three class labels declared in this notebook they likely
# need an explicit multiclass setting (e.g. `average=`) before use - confirm.
metrics = {
    "ACCURACY": accuracy_score,
    "F1": f1_score,
    "PRECISION": precision_score,
    "RECALL": recall_score,
    "AUC": roc_auc_score,
}
# Same metrics wrapped as scorer objects via `make_scorer`.
scorers = {label: make_scorer(score_func) for label, score_func in metrics.items()}

In [None]:
def is_iterable(obj):
    """Tell whether ``iter(obj)`` succeeds.

    :param obj: any object.
    :return: ``True`` if an iterator can be obtained from ``obj``, ``False`` if
        ``iter`` raises ``TypeError``.
    """
    try:
        iter(obj)
    except TypeError:
        return False
    else:
        return True

In [None]:
def train_and_test(scheme, result_filename: str, hyperparams_metric=accuracy_score):
    """Train a scheme on the full training set and write its test predictions to CSV.

    When ``scheme`` is iterable it is treated as a collection of candidate
    schemes and the best one is selected by cross-validation first.

    :param scheme: a single scheme, or an iterable of schemes to choose from.
    :param result_filename: file name (without extension) of the CSV written
        into the ``result`` directory.
    :param hyperparams_metric: ``metric(y_true, y_pred)`` callable used to rank
        candidate schemes; larger is better.
    :return: tuple ``(fitted_scheme, y_predict)``.
    """
    if is_iterable(scheme):
        logging.info('HYPERPARAMS TUNING')
        # Rank candidates with the metric requested by the caller.
        scheme, hyperparams_tuning_metric = tune_hyperparams(schemas=scheme, X=X_train, y=y_train,
                                                             metric=hyperparams_metric)
        # f-string formatting keeps this working when `hyperparams` is None.
        logging.info(f'HYPERPARAMS TUNING: {scheme.hyperparams}. METRIC: {hyperparams_tuning_metric}')

    logging.info('TRAINING')
    scheme.fit(X=X_train, y=y_train)

    logging.info('PREDICTING')
    y_predict = scheme.predict(X=X_test)
    y_predict_proba = scheme.predict_proba(X=X_test)
    y_predict_df = pd.DataFrame(data={'class': y_predict, 'certainty': y_predict_proba})
    # Make sure the output directory exists so a fresh checkout does not fail on write.
    result_dir = 'result'
    os.makedirs(result_dir, exist_ok=True)
    y_predict_df.to_csv(os.path.join(result_dir, result_filename + '.csv'), header=True, index=False, mode='w')

    return scheme, y_predict

In [None]:
from sklearn.feature_selection import mutual_info_classif
from sklearn.feature_selection import SelectPercentile
from sklearn.model_selection import GridSearchCV
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neighbors import LocalOutlierFactor
from sklearn.preprocessing import MinMaxScaler

# k-NN candidates: one scheme per neighbour count `n`. The same `n` is used for
# the classifier and for the LocalOutlierFactor detector that cleans the training data.
# The current range yields a single candidate (n = 1).
schemes_knn = [
    Scheme(
        pipeline=Pipeline(steps=[
            ('scaler', MinMaxScaler()),
            ('feature_selector', SelectPercentile(score_func=mutual_info_classif, percentile=50)),
            ('classifier', KNeighborsClassifier(n_neighbors=n, weights='distance')),
        ]),
        outlier_detector=LocalOutlierFactor(n_neighbors=n),
        hyperparams_str=f'number of neighbours = {n}',
    )
    for n in range(1, 2)
]
train_and_test(scheme=schemes_knn, result_filename='KNN')

In [None]:
from sklearn.neural_network import MLPClassifier 

# MLP candidates: three hidden layers whose widths are 50 * i, 50 * j and 50 * k.
# The label stored in `hyperparams_str` shows the multipliers (i, j, k), not the widths.
# The current ranges yield a single candidate (i = 15, j = 10, k = 5).
schemes_mlp = []
for i in range(15, 16):
    for j in range(10, 11):
        for k in range(5, 6):
            schemes_mlp.append(
                Scheme(
                    pipeline=Pipeline(steps=[
                        ('scaler', MinMaxScaler()),
                        ('feature_selector', SelectPercentile(score_func=mutual_info_classif, percentile=50)),
                        ('classifier', MLPClassifier(
                            hidden_layer_sizes=[50 * i, 50 * j, 50 * k],
                            max_iter=1000,
                        )),
                    ]),
                    outlier_detector=LocalOutlierFactor(n_neighbors=15),
                    hyperparams_str=f'layers = ({i}, {j}, {k})',
                )
            )
train_and_test(scheme=schemes_mlp, result_filename='MLP')

In [None]:
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import IsolationForest

# Single decision-tree scheme (no hyperparameter search); training data is
# cleaned per class with an IsolationForest before fitting.
scheme_tree = Scheme(
    pipeline=Pipeline(steps=[
        ('scaler', MinMaxScaler()),
        ('feature_selector', SelectPercentile(score_func=mutual_info_classif, percentile=50)),
        ('classifier', DecisionTreeClassifier(criterion='entropy', min_samples_split=0.05)),
    ]),
    outlier_detector=IsolationForest(n_jobs=-1),
)
train_and_test(scheme=scheme_tree, result_filename='Decision tree')

In [None]:
from sklearn.linear_model import LogisticRegression

# Single logistic-regression scheme: no outlier detector is given, so the
# training data is used without cleaning.
scheme_lr = Scheme(pipeline=Pipeline(steps=[
    ('scaler', MinMaxScaler()),  # normalize each feature independently
    ('feature_selector', SelectPercentile(score_func=mutual_info_classif, percentile=50)),
    ('classifier', LogisticRegression()),
]))
train_and_test(scheme=scheme_lr, result_filename='Logistic regression')