# Import libraries

In [None]:
import pandas as pd
import tensorflow as tf
from pathlib import Path
from matplotlib import pyplot as plt
from sklearn.pipeline import Pipeline, FunctionTransformer, FeatureUnion
from sklearn.discriminant_analysis import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.base import ClassifierMixin
import numpy as np
from sklearn.feature_selection import RFECV
from sklearn.tree import DecisionTreeClassifier
from sklearn.calibration import LabelEncoder
from sklearn.metrics import ConfusionMatrixDisplay, confusion_matrix, accuracy_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV, StratifiedKFold
import warnings

# Seed passed as `random_state` to the estimators and CV splitters below.
SEED = 42

# Load data

In [None]:
# Read the cleaned dataset and split it into a feature table and a target column.
data_path = Path('datasets/cleaned_data.csv')
data = pd.read_csv(data_path)
# 'ID' is removed so it is not used as a feature.
data = data.drop('ID', axis=1)
# pop() removes 'Label' from the feature table and returns it as the target Series.
labels = data.pop('Label')

# Call head() so the first rows are displayed rather than the bound method itself.
data.head()

In [None]:
# Print the summary (columns, non-null counts, dtypes) of the feature table.
data.info()

In [None]:
# Descriptive statistics, transposed so that each feature is a row.
data.describe().transpose()

In [None]:
# Number of samples per class as a plain array.
labels.value_counts().to_numpy()

In [None]:
# Bar chart of the class distribution with the count written above each bar.
class_counts = labels.value_counts()

fig, ax = plt.subplots()
barplot = ax.bar(class_counts.index, class_counts.to_numpy())

for rect in barplot:
    height = rect.get_height()
    x_center = rect.get_x() + rect.get_width() / 2
    # The bar height doubles as the y position and as the label text.
    ax.text(x_center, height, height, ha='center', va='bottom')

plt.show()

# Feature engineering

In [None]:
def energy_per_km(row):
    """Return Energy / Distance for one row, or 0 when Distance is not positive.

    Args:
        row: Mapping-like object with 'Energy' and 'Distance' entries.
    """
    distance = row['Distance']
    return row['Energy'] / distance if distance > 0 else 0
    
# Evaluate energy_per_km on every row (axis=1) and store the result as a new column.
data['Energy Per Km'] = data.apply(energy_per_km, axis=1)

In [None]:
def show_hist_based_on_labels(feature):
    """Plot overlaid histograms of one feature, one histogram per class.

    Reads the module-level `data` (feature table) and `labels` (a pandas
    Series used to mask `data`). Only four colors are available, so at most
    four classes are drawn; classes are taken most frequent first.

    Args:
        feature: Name of the column in `data` to plot.
    """
    colors = ['blue', 'green', 'red', 'orange']  # 4 most popular labels

    # value_counts() orders classes by descending frequency, so pairing them
    # with the four colors keeps the most popular labels. unique() would keep
    # the first four labels encountered instead.
    classes = labels.value_counts().index

    for class_, color in zip(classes, colors):
        subset = data[labels == class_]
        plt.hist(subset[feature], bins=10, alpha=0.33, label=class_, color=color, histtype='bar')

    plt.xlabel(feature)
    plt.legend()
    plt.grid(True)
    plt.show()

# Inspect the engineered column per class; it is dropped from `data` again
# right after plotting.
feature = 'Energy Per Km'
show_hist_based_on_labels(feature) 
data = data.drop(feature, axis=1)

# Preprocessing

In [None]:
# Numeric branch: a single standard-scaling step.
num_pipeline = Pipeline(steps=[('scaler', StandardScaler())])

num_pipeline

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.decomposition import PCA


class Reducter(BaseEstimator, TransformerMixin):
    """PCA-based dimensionality reducer usable as a pipeline step.

    Args:
        n_components: Forwarded to ``PCA(n_components=...)`` when ``fit`` runs.

    Attributes:
        pca: The ``PCA`` instance created and fitted by ``fit``.
        n_components_: Number of components the fitted PCA actually kept.
    """

    def __init__(self, n_components=2):
        # Only the hyper-parameter is stored here. The PCA itself is built in
        # fit() so that a later set_params(n_components=...) takes effect
        # instead of leaving a stale PCA created with the old value.
        self.n_components = n_components

    def fit(self, X, Y=None):
        """Fit a fresh PCA on X and return self; Y is accepted but not used."""
        self.pca = PCA(n_components=self.n_components)
        self.pca.fit(X)
        # A trailing-underscore attribute set during fit lets
        # sklearn.utils.validation.check_is_fitted recognise a fitted instance.
        self.n_components_ = self.pca.n_components_
        return self

    def transform(self, X):
        """Project X onto the fitted principal components."""
        return self.pca.transform(X)

    def get_feature_names_out(self, input_features=None):
        """Return one 'Reducted#k' name per output column (1-based)."""
        # Prefer the fitted count: n_components may be configured as something
        # range() cannot consume. Fall back to the configured value when the
        # transformer has not been fitted yet.
        count = getattr(self, 'n_components_', self.n_components)
        return [f'Reducted#{i+1}' for i in range(count)]

In [None]:
# Reduction branch: scale first, then project with Reducter onto two components.
reducter_pipeline = Pipeline(
    steps=[
        ('num_pipeline', num_pipeline),
        ('reducter', Reducter(n_components=2)),
    ],
)

reducter_pipeline

In [None]:
def energy_per_km(features):
    """Compute energy per kilometre as a single-column array.

    The positions of the 'Energy' and 'Distance' columns are looked up in the
    module-level `data` frame, so `features` must be a 2-D array whose columns
    follow the order of `data.columns`.

    Args:
        features: 2-D array indexed as ``features[:, column]``.

    Returns:
        Array reshaped to (-1, 1); entries whose distance is 0, or whose
        ratio is NaN, are 0.0.
    """
    column_names = list(data.columns)
    energy = features[:, column_names.index('Energy')]
    distance = features[:, column_names.index('Distance')]

    # Zero distances are masked with NaN before dividing; the resulting NaNs
    # are mapped to 0.0 afterwards.
    ratio = energy / np.where(distance == 0, np.nan, distance)
    return np.nan_to_num(ratio, nan=0.0).reshape(-1, 1)

# Derived-feature branch: build the energy-per-km column, then scale it.
feature_pipeline = Pipeline(
    steps=[
        (
            'energy_per_km',
            FunctionTransformer(
                energy_per_km,
                # The transformer emits exactly one column with a fixed name.
                feature_names_out=lambda _, __: ['Energy Per Km'],
            ),
        ),
        ('scaler', StandardScaler()),
    ],
)

feature_pipeline

In [None]:
# Concatenate the outputs of the three branches side by side.
union = FeatureUnion(
    transformer_list=[
        ('num', num_pipeline),
        ('reducter', reducter_pipeline),
        ('feature', feature_pipeline),
    ],
)

union

In [None]:
# Full preprocessing: impute with SimpleImputer's defaults, then run the feature union.
preprocess = Pipeline(
    steps=[
        ('imputer', SimpleImputer()),
        ('union', union),
    ],
)

preprocess

In [None]:
# Fit the preprocessing on `data` and show the result with the generated column names.
pd.DataFrame(
    data=preprocess.fit_transform(data),
    columns=preprocess.get_feature_names_out(),
)

# Feature selection

In [None]:
# Preprocess, then keep the features chosen by cross-validated recursive
# feature elimination driven by a decision tree.
feature_selection = Pipeline(
    steps=[
        ('preprocessing', preprocess),
        (
            'selection',
            RFECV(
                estimator=DecisionTreeClassifier(random_state=SEED),
                cv=StratifiedKFold(n_splits=3, shuffle=True, random_state=SEED),
                scoring='accuracy',
            ),
        ),
    ],
)

feature_selection

In [None]:
# Replace the label column by its integer encoding; `le` keeps the mapping
# back to the original class names.
le = LabelEncoder()
labels = le.fit_transform(labels)

# Fit the selection pipeline and show the surviving columns.
pd.DataFrame(
    data=feature_selection.fit_transform(data, labels),
    columns=feature_selection.get_feature_names_out(),
)

# RandomForestClassifier

In [None]:
# NOTE: this silences every warning for the rest of the session.
warnings.filterwarnings("ignore")

# Feature selection followed by a random forest.
forest_pipeline = Pipeline(
    steps=[
        ('feature_selection', feature_selection),
        ('forest', RandomForestClassifier(random_state=SEED)),
    ],
)

# Grid over forest size and tree depth (3 x 3 combinations).
forest_params = [{
    'forest__n_estimators': [15, 50, 100],
    'forest__max_depth': [None, 15, 50],
}]

forest_searching = GridSearchCV(
    forest_pipeline,
    forest_params,
    cv=StratifiedKFold(n_splits=5, shuffle=True, random_state=SEED),
    scoring='accuracy',
    refit=True,
)
forest_searching.fit(data, labels)

print(forest_searching.best_params_)
# Best-ranked candidates first (ascending rank).
pd.DataFrame(forest_searching.cv_results_).sort_values('rank_test_score')

In [None]:
# Show which integer code stands for which original class name.
print(list(enumerate(le.classes_)))

# The predictions are made on `data`, the same samples the search was fitted
# on, so this accuracy is a training-set figure.
model = forest_searching.best_estimator_
preds = model.predict(data)
print(f'Accuracy: {accuracy_score(labels, preds)}')

cm = confusion_matrix(labels, preds)
ConfusionMatrixDisplay(cm).plot()

# Neural Network

In [None]:
# Per-sample shape of the selected feature matrix. This calls fit_transform,
# so the whole feature-selection pipeline is fitted again here.
input_shape = feature_selection.fit_transform(data, labels).shape[1:]

In [None]:
class ModelBuilder:
    """Callable factory that builds and compiles a dense softmax classifier.

    The output layer has ``len(le.classes_)`` units, read from the
    module-level label encoder ``le``, and the model is compiled with
    categorical cross-entropy, so targets are expected one-hot encoded.

    Args:
        optimizer: Optimizer instance (used as a template, see
            ``_fresh_optimizer``) or anything else ``Model.compile`` accepts.
        units: Width of every hidden Dense layer.
        activation: Activation of every hidden Dense layer.
        layers: Number of hidden Dense layers; at least one is always built.
    """

    def __init__(self, optimizer=tf.keras.optimizers.Adam(), units=8, activation='relu', layers=1):
        # The default optimizer is created once, when the class is defined.
        # It is never handed to a model directly, only copied at build time.
        self.optimizer = optimizer
        self.units = units
        self.activation = activation
        self.layers = layers

    def _fresh_optimizer(self):
        """Return an optimizer that is not shared with any previously built model."""
        template = self.optimizer
        # Optimizer objects are stateful, so the same instance must not be
        # reused for several models. Rebuild an equivalent one from its config.
        # Values without that API (e.g. strings) are passed through unchanged.
        if hasattr(template, 'get_config') and hasattr(type(template), 'from_config'):
            return type(template).from_config(template.get_config())
        return template

    def __call__(self, input_shape):
        """Build and compile a model for inputs of the given per-sample shape."""
        inputs = tf.keras.layers.Input(shape=input_shape)

        hidden = inputs
        # One hidden layer is always present, even when `layers` is below 1.
        for _ in range(max(self.layers, 1)):
            hidden = tf.keras.layers.Dense(units=self.units, activation=self.activation)(hidden)

        outputs = tf.keras.layers.Dense(units=len(le.classes_), activation='softmax')(hidden)

        model = tf.keras.Model(inputs=inputs, outputs=outputs)
        model.compile(loss='categorical_crossentropy',
                      optimizer=self._fresh_optimizer(),
                      metrics=['accuracy'])

        return model

    def set_params(self, **params):
        """Set the given attributes on the builder and return it."""
        for key, value in params.items():
            setattr(self, key, value)
        return self

    def get_params(self, deep=True):
        """Return the builder's hyper-parameters as a dict; `deep` is not used."""
        return {
            'optimizer': self.optimizer,
            'units': self.units,
            'activation': self.activation,
            'layers': self.layers,
        }

In [None]:
class TFClassifier(BaseEstimator, ClassifierMixin):
    """scikit-learn style classifier wrapping a Keras model factory.

    Args:
        build_model: Callable invoked as
            ``build_model(input_shape=..., **build_kwargs)``; must return a
            compiled Keras model.
        epochs: Number of epochs passed to ``model.fit``.
        batch_size: Batch size passed to ``model.fit``.
        verbose: Keras verbosity used for both training and prediction.
        **build_kwargs: Extra keyword arguments forwarded to ``build_model``.
            NOTE(review): these are not named constructor parameters, so they
            are presumably not carried over by ``sklearn.base.clone`` --
            confirm before relying on them inside a search.

    Attributes:
        model_: Keras model built and trained by ``fit``.
        classes_: ``np.arange(n_outputs)``, the indices ``predict`` can return.
    """

    def __init__(self, build_model, epochs=100, batch_size=8, verbose=0, **build_kwargs):
        # Fitted state (model_, classes_) is created in fit(), not here, so an
        # unfitted instance carries no trailing-underscore attributes.
        self.build_model = build_model
        self.epochs = epochs
        self.batch_size = batch_size
        self.verbose = verbose
        self.build_kwargs = build_kwargs

    def _n_outputs(self, y):
        """Return the one-hot width to use for integer labels `y`."""
        # Prefer the model's own output width: a training subset that lacks
        # the highest class would otherwise yield too few one-hot columns.
        try:
            width = self.model_.output_shape[-1]
        except (AttributeError, TypeError, IndexError, ValueError):
            width = None
        if isinstance(width, int):
            return width
        # Fallback when the width cannot be read from the model.
        return int(np.max(y)) + 1

    def fit(self, X, y, **fit_params):
        """Build a new model for X's per-sample shape and train it on (X, y).

        Integer labels (1-D, or 2-D with one column) are one-hot encoded;
        targets with more than one column are used as given. Extra
        ``fit_params`` are forwarded to ``model.fit``.
        """
        self.model_ = self.build_model(input_shape=X.shape[1:], **self.build_kwargs)

        y = np.asarray(y)
        if y.ndim == 1 or y.shape[1] == 1:
            y = tf.keras.utils.to_categorical(y, self._n_outputs(y))
        self.classes_ = np.arange(y.shape[1])

        self.model_.fit(X, y,
                        epochs=self.epochs,
                        batch_size=self.batch_size,
                        verbose=self.verbose,
                        **fit_params)
        return self

    def predict(self, X):
        """Return the index of the highest-probability class for each sample."""
        return np.argmax(self.predict_proba(X), axis=1)

    def predict_proba(self, X):
        """Return the model's per-class outputs for each sample."""
        # Honour the configured verbosity instead of always printing progress.
        return self.model_.predict(X, verbose=self.verbose)

    def score(self, X, y):
        """Return the accuracy on (X, y); one-hot targets are reduced with argmax."""
        y = np.asarray(y)
        if y.ndim > 1 and y.shape[1] > 1:
            y = np.argmax(y, axis=1)
        return np.mean(self.predict(X) == y)

In [None]:
# Network factory with default hyper-parameters, wrapped as an estimator.
make_model = ModelBuilder()
model = TFClassifier(build_model=make_model, epochs=50, batch_size=8, verbose=0)

# Same feature selection as the forest, followed by the neural network.
nn_classifier = Pipeline(
    steps=[
        ('selection', feature_selection),
        ('nn', model),
    ],
)

nn_classifier

In [None]:
# Search space for the network; keys address the ModelBuilder held in the
# 'nn' step's `build_model` parameter.
nn_params = {
    'nn__build_model__units': [4, 8, 16, 32, 128, 256],
    'nn__build_model__layers': [1, 2, 4, 8, 16],
    'nn__build_model__optimizer': [tf.keras.optimizers.Adam(), tf.keras.optimizers.Nadam(), tf.keras.optimizers.SGD()],
    'nn__build_model__activation': ['relu', 'sigmoid', 'tanh'],
}

# random_state fixes which 12 candidates are sampled, matching the seeded
# CV splitter and the other seeded components in this notebook.
nn_searching = RandomizedSearchCV(
    nn_classifier,
    nn_params,
    cv=StratifiedKFold(n_splits=5, shuffle=True, random_state=SEED),
    n_iter=12,
    n_jobs=1,
    refit=True,
    random_state=SEED,
)

In [None]:
# Run the randomized search on the full feature table and the encoded labels.
nn_searching.fit(data, labels)

In [None]:
print(nn_searching.best_params_)
# Best-ranked candidates first (ascending rank).
pd.DataFrame(nn_searching.cv_results_).sort_values('rank_test_score')

In [None]:
# Show which integer code stands for which original class name.
print(list(enumerate(le.classes_)))

# The predictions are made on `data`, the same samples the search was fitted
# on, so this accuracy is a training-set figure.
model = nn_searching.best_estimator_
preds = model.predict(data)
print(f'Accuracy: {accuracy_score(labels, preds)}')

cm = confusion_matrix(labels, preds)
ConfusionMatrixDisplay(cm).plot()