In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import seaborn as sns
import sklearn
import plotly.graph_objects as go
import plotly.express as px

In [None]:
# Load the raw dataset. The absolute /content path presumably targets a Colab
# runtime - adjust it when running elsewhere.
df = pd.read_csv("/content/Datasetfinal.csv")

In [None]:
# Treat the '$' / '$$' placeholder cells as missing values.
for placeholder in ('$', '$$'):
    df.replace(placeholder, np.nan, inplace=True)

# Parse timestamps written as day-month-year hour:minute:second.
df['Timestamp'] = pd.to_datetime(df['Timestamp'], format="%d-%m-%Y %H:%M:%S")

# Store both particulate-matter columns as floating point.
for pm_column in ('PM2.5(ug/m3)', 'PM10(ug/m3)'):
    df[pm_column] = df[pm_column].astype('float64')

In [None]:
# Strip the 'C' unit from the temperature readings and store them as floats.
# Casting to str first keeps this cell re-runnable: once the column is numeric the
# `.str` accessor would raise, and NaN round-trips through 'nan' back to NaN.
temp_raw = df['Temp(C)'].replace('$$', np.nan)  # '$$' marks a missing reading
df['Temp(C)'] = (
    temp_raw.astype(str)
    .str.replace('C', '', regex=False)  # literal match, not a regular expression
    .astype('float64')
)

In [None]:
# Strip the '%' sign from the humidity readings and store them as floats.
# Casting to str first keeps this cell re-runnable: once the column is numeric the
# `.str` accessor would raise, and NaN round-trips through 'nan' back to NaN.
humidity_raw = df['Humi(%)'].replace('$$', np.nan)  # '$$' marks a missing reading
df['Humi(%)'] = (
    humidity_raw.astype(str)
    .str.replace('%', '', regex=False)  # literal match, not a regular expression
    .astype('float64')
)

In [None]:
# List every column whose dtype pandas classifies as string
string_columns = [name for name in df.columns if pd.api.types.is_string_dtype(df[name])]
for name in string_columns:
    print(name)

Target


In [None]:
# Same check as a mapping: column name -> whether pandas classifies it as a string dtype.
{i: pd.api.types.is_string_dtype(df[i]) for i in df.columns}

{'Timestamp': False,
 'Temp(C)': False,
 'Humi(%)': False,
 'VOC': False,
 'PM1.0(ug/m3)': False,
 'PM2.5(ug/m3)': False,
 'PM10(ug/m3)': False,
 'CO2': False,
 'Target': True}

In [None]:
# Re-encode each string column as an ordered categorical.
string_like = [name for name in df.columns if pd.api.types.is_string_dtype(df[name])]
for name in string_like:
    as_category = df[name].astype('category')
    df[name] = as_category.cat.as_ordered()

In [None]:
# Percentage of missing values in each column

missing_counts = df.isna().sum()
missing_counts * 100.00 / len(df)

Timestamp       0.000000
Temp(C)         0.000000
Humi(%)         0.000000
VOC             0.000000
PM1.0(ug/m3)    0.000000
PM2.5(ug/m3)    0.000000
PM10(ug/m3)     0.000000
CO2             0.029218
Target          0.000000
dtype: float64

In [None]:
# Absolute number of missing values in each column.
df.isna().sum()

Timestamp       0
Temp(C)         0
Humi(%)         0
VOC             0
PM1.0(ug/m3)    0
PM2.5(ug/m3)    0
PM10(ug/m3)     0
CO2             2
Target          0
dtype: int64

In [None]:
# List every column with a numeric dtype.
numeric_columns = [name for name in df.columns if pd.api.types.is_numeric_dtype(df[name])]
for name in numeric_columns:
    print(name)

Temp(C)
Humi(%)
VOC
PM1.0(ug/m3)
PM2.5(ug/m3)
PM10(ug/m3)
CO2


In [None]:
# Print the numeric columns that still contain at least one null value
for name in df.columns:
    series = df[name]
    if not pd.api.types.is_numeric_dtype(series):
        continue
    if series.isna().sum():
        print(name)

In [None]:
# Rows that hold a null in at least one column
has_null = df.isnull().any(axis=1)
null_rows_any_column = df[has_null]

# Show those rows
print(null_rows_any_column)


In [None]:
column_name = 'CO2'

# Drop the rows where this column is null, then renumber the index from zero.
# (A bare df.dropna() would instead drop rows with a null in any column.)
df = df.dropna(subset=[column_name]).reset_index(drop=True)

# Show the frame after the removal
print(df)

In [None]:
# Bar chart of the class frequencies, each bar annotated with its count.
target_counts = df['Target'].value_counts()
ax = target_counts.plot(kind='bar')
for bars in ax.containers:
    ax.bar_label(bars, fmt='%d', label_type='edge')

plt.xticks(rotation=90);

In [None]:
# Work on an independent copy so that the label encoding applied to `new_df`
# further down does not silently rewrite `df` as well. No rows are dropped here:
# the rows with a missing CO2 value were already removed above.
new_df = df.copy()

In [None]:
# Count the categorical columns of new_df and print each one's
# code -> category mapping
l = 0
for column in new_df.columns:
    # isinstance on the dtype replaces the deprecated pd.api.types.is_categorical_dtype
    if isinstance(new_df[column].dtype, pd.CategoricalDtype):
        l += 1
        print(f'{column}: {dict(enumerate(new_df[column].cat.categories))}')
print(l)

In [None]:
# Replace every categorical column with its integer category codes
# (0 .. n_categories - 1; pandas encodes a missing value as -1).
for column in new_df.columns:
    # isinstance on the dtype replaces the deprecated pd.api.types.is_categorical_dtype
    if isinstance(new_df[column].dtype, pd.CategoricalDtype):
        new_df[column] = new_df[column].cat.codes

In [None]:
# Preview the first rows of the encoded frame.
new_df.head()

Unnamed: 0,Timestamp,Temp(C),Humi(%),VOC,PM1.0(ug/m3),PM2.5(ug/m3),PM10(ug/m3),CO2,Target
0,2023-12-28 08:38:39,22.59,87.62,0,408,201.0,442.0,813.0,5
1,2023-12-28 08:38:44,22.62,87.25,0,410,202.0,444.0,813.0,5
2,2023-12-28 08:38:49,22.66,87.06,0,408,201.0,441.0,814.0,5
3,2023-12-28 08:38:54,22.62,86.94,0,406,201.0,439.0,812.0,5
4,2023-12-28 08:38:59,22.66,86.5,0,404,200.0,437.0,814.0,5


# ML

## Original ML Pipeline Module

In [None]:
from sklearn.model_selection import train_test_split, GridSearchCV, RandomizedSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline

from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, AdaBoostClassifier
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis
from sklearn.naive_bayes import GaussianNB
from sklearn.neural_network import MLPClassifier

from sklearn.metrics import accuracy_score, f1_score, recall_score, precision_score, confusion_matrix
import time
# Seed NumPy's global random generator.
np.random.seed(42)

class MultiModelEvaluator:
    """Train a set of classifiers on one train/test split and collect their metrics.

    Every model is fitted inside a ``StandardScaler`` -> classifier ``Pipeline``;
    after ``train_models`` the values of ``self.models`` are those fitted pipelines.
    """

    def __init__(self, models):
        """Remember the models to evaluate.

        Parameters:
        - models (dict): Display name -> unfitted classifier. The dict is stored by
          reference and ``train_models`` replaces its values with fitted pipelines.
        """
        self.models = models
        self.model_names = list(models.keys())
        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None
        self.metric_scores = {}  # model name -> {metric name -> value}

    def split_data(self, X, y, test_size=0.3, random_state=42):
        """Split ``X``/``y`` with ``train_test_split`` and keep the four parts on the instance."""
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(X, y, test_size=test_size, random_state=random_state)

    def train_models(self):
        """Fit one scaler + classifier pipeline per model and print its test accuracy.

        Raises:
        - RuntimeError: if no training data is set, i.e. ``split_data`` was not called.
        """
        if self.X_train is None or self.y_train is None:
            raise RuntimeError("No training data available: call split_data() before train_models().")

        for model_name, model in self.models.items():
            print(f"\n================================================\n{model_name} model has started training")
            start = time.time()
            pipeline = Pipeline([
                ('scaler', StandardScaler()),
                ('classifier', model)
            ])
            pipeline.fit(self.X_train, self.y_train)
            # Replacing the value of an existing key is safe while iterating .items()
            self.models[model_name] = pipeline
            print(f"{model_name} model has ended training. Time -> {round(time.time() - start, 2)}s. Accuracy - > {(pipeline.score(self.X_test, self.y_test) * 100.00):.2f} %\n================================================\n")

    @staticmethod
    def _score_split(split, y_true, y_pred):
        """Return the metric dict for one split; ``split`` ('Train' or 'Test') prefixes every key."""
        return {
            f'{split} Accuracy': accuracy_score(y_true, y_pred),
            f'{split} F1 Macro': f1_score(y_true, y_pred, average='macro'),
            f'{split} F1 Weighted': f1_score(y_true, y_pred, average='weighted'),
            f'{split} Recall Macro': recall_score(y_true, y_pred, average='macro'),
            f'{split} Recall Weighted': recall_score(y_true, y_pred, average='weighted'),
            f'{split} Precision Macro': precision_score(y_true, y_pred, average='macro'),
            f'{split} Precision Weighted': precision_score(y_true, y_pred, average='weighted'),
            f'{split} Confusion Matrix': confusion_matrix(y_true, y_pred),
        }

    def evaluate_models(self, X_test, y_test):
        """Score every stored pipeline on the training split and on ``X_test``/``y_test``.

        The results are written to ``self.metric_scores[model_name]`` with the
        'Train ...' keys first, followed by the 'Test ...' keys.
        """
        for model_name, pipeline in self.models.items():
            scores = self._score_split('Train', self.y_train, pipeline.predict(self.X_train))
            scores.update(self._score_split('Test', y_test, pipeline.predict(X_test)))
            self.metric_scores[model_name] = scores

    def get_metric_scores(self, model_name):
        """Return the stored metric dict for ``model_name``, or an empty dict if there is none."""
        return self.metric_scores.get(model_name, {})

## Hyper-Tuned Pipeline Module

In [None]:
from sklearn.model_selection import train_test_split, GridSearchCV, RandomizedSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.svm import SVC
from sklearn.neural_network import MLPClassifier

from sklearn.metrics import accuracy_score, f1_score, recall_score, precision_score, confusion_matrix
import numpy as np
import time

# Seed NumPy's global random generator.
np.random.seed(42)

class MultiModelEvaluatorWithTuning:
    """Tune a set of classifiers with 5-fold cross-validated search and collect their metrics.

    Every model is searched inside a ``StandardScaler`` -> classifier ``Pipeline``.
    Models named in ``n_iter_values`` use ``RandomizedSearchCV``; all others use
    ``GridSearchCV``. After ``train_models`` the values of ``self.models`` are the
    best pipelines found by the search.
    """

    def __init__(self, models, param_grids, n_iter_values = None, n_jobs_values = None, verbose_values = None):
        """Remember the models and their search settings.

        Parameters:
        - models (dict): Display name -> unfitted classifier. Stored by reference;
          ``train_models`` replaces the values with the best fitted pipelines.
        - param_grids (dict): Display name -> parameter grid for the pipeline
          (classifier parameters are addressed as ``classifier__<name>``).
        - n_iter_values (dict, optional): Display name -> number of random-search
          iterations. Presence of a name selects randomized search for that model.
        - n_jobs_values (dict, optional): Display name -> ``n_jobs`` (default -1).
        - verbose_values (dict, optional): Display name -> ``verbose`` (default 1).
        """
        self.models = models
        self.model_names = list(models.keys())
        self.param_grids = param_grids
        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None
        # None instead of a literal {} default: a mutable default would be one dict
        # shared by every instance created without that argument.
        self.n_iter_values = n_iter_values if n_iter_values is not None else {}
        self.n_jobs_values = n_jobs_values if n_jobs_values is not None else {}
        self.verbose_values = verbose_values if verbose_values is not None else {}
        self.metric_scores = {}  # model name -> {metric name -> value}
        self.best_params = {}    # model name -> best parameter dict found by the search

    def split_data(self, X, y, test_size=0.3, random_state=42):
        """Split ``X``/``y`` with ``train_test_split`` and keep the four parts on the instance."""
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(X, y, test_size=test_size, random_state=random_state)

    def train_models(self):
        """Run the hyper-parameter search for each model and print the best pipeline's test accuracy.

        Raises:
        - RuntimeError: if no training data is set, i.e. ``split_data`` was not called.
        """
        if self.X_train is None or self.y_train is None:
            raise RuntimeError("No training data available: call split_data() before train_models().")

        for model_name, model in self.models.items():
            print(f"\n================================================\n{model_name} tuned model has started training")
            start = time.time()
            pipeline = Pipeline([
                ('scaler', StandardScaler()),
                ('classifier', model)
            ])
            # Settings common to both kinds of search
            search_kwargs = dict(
                cv=5,
                n_jobs=self.n_jobs_values.get(model_name, -1),
                verbose=self.verbose_values.get(model_name, 1),
            )
            if model_name not in self.n_iter_values:
                search = GridSearchCV(pipeline, self.param_grids[model_name], **search_kwargs)
            else:
                search = RandomizedSearchCV(pipeline, self.param_grids[model_name], n_iter=self.n_iter_values.get(model_name, 10), **search_kwargs)
            search.fit(self.X_train, self.y_train)
            best_model = search.best_estimator_
            # Replacing the value of an existing key is safe while iterating .items()
            self.models[model_name] = best_model
            self.best_params[model_name] = search.best_params_
            print(f"{model_name} tuned model has ended training. Time -> {round(time.time() - start, 2)}s. Accuracy - > {(best_model.score(self.X_test, self.y_test) * 100.00):.2f} %\n================================================\n")

    @staticmethod
    def _score_split(split, y_true, y_pred):
        """Return the metric dict for one split; ``split`` ('Train' or 'Test') prefixes every key."""
        return {
            f'{split} Accuracy': accuracy_score(y_true, y_pred),
            f'{split} F1 Macro': f1_score(y_true, y_pred, average='macro'),
            f'{split} F1 Weighted': f1_score(y_true, y_pred, average='weighted'),
            f'{split} Recall Macro': recall_score(y_true, y_pred, average='macro'),
            f'{split} Recall Weighted': recall_score(y_true, y_pred, average='weighted'),
            f'{split} Precision Macro': precision_score(y_true, y_pred, average='macro'),
            f'{split} Precision Weighted': precision_score(y_true, y_pred, average='weighted'),
            f'{split} Confusion Matrix': confusion_matrix(y_true, y_pred),
        }

    def evaluate_models(self, X_test, y_test):
        """Score every stored pipeline on the training split and on ``X_test``/``y_test``.

        The results are written to ``self.metric_scores[model_name]`` with the
        'Train ...' keys first, followed by the 'Test ...' keys.
        """
        for model_name, pipeline in self.models.items():
            scores = self._score_split('Train', self.y_train, pipeline.predict(self.X_train))
            scores.update(self._score_split('Test', y_test, pipeline.predict(X_test)))
            self.metric_scores[model_name] = scores

    def get_metric_scores(self, model_name):
        """Return the stored metric dict for ``model_name``, or an empty dict if there is none."""
        return self.metric_scores.get(model_name, {})

    def get_best_params(self, model_name):
        """Return the best parameters found for ``model_name``, or an empty dict if there are none."""
        return self.best_params.get(model_name, {})

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
from sklearn.model_selection import train_test_split, KFold
from sklearn.metrics import classification_report, f1_score
import matplotlib.pyplot as plt

class DeepModel(nn.Module):
    """Fully connected classifier that returns raw class scores (logits).

    Layer order: BatchNorm1d on the inputs, then Linear -> BatchNorm1d -> ReLU for
    each entry of ``hidden_sizes``, then a final Linear layer of width ``output_size``.
    """

    def __init__(self, input_size, hidden_sizes, output_size):
        """Build the layer stack.

        Parameters:
        - input_size (int): Number of input features.
        - hidden_sizes (sequence of int): Width of each hidden layer; may be empty,
          in which case the output layer is connected to the normalized inputs.
        - output_size (int): Number of output units (classes).
        """
        super().__init__()
        in_features = input_size

        # Input normalization
        layers = [nn.BatchNorm1d(input_size)]

        for hidden_size in hidden_sizes:
            layers.append(nn.Linear(in_features, hidden_size))
            # Batch normalization for hidden layers
            layers.append(nn.BatchNorm1d(hidden_size))
            layers.append(nn.ReLU())
            in_features = hidden_size

        # in_features equals hidden_sizes[-1] when hidden layers exist and falls back
        # to input_size when there are none (hidden_sizes[-1] would raise IndexError).
        layers.append(nn.Linear(in_features, output_size))
        self.layers = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` through the layer stack and return the logits."""
        return self.layers(x)

def evaluate_accuracy(model, data_loader):
    """Return the fraction of samples in ``data_loader`` that ``model`` classifies correctly.

    The model is switched to evaluation mode and gradients are disabled; the
    predicted class of a sample is the index of its largest output.
    """
    model.eval()
    hits, seen = 0, 0
    with torch.no_grad():
        for batch_inputs, batch_labels in data_loader:
            predictions = torch.max(model(batch_inputs), dim=1).indices
            hits += (predictions == batch_labels).sum().item()
            seen += batch_labels.size(0)
    return hits / seen

def train_and_evaluate(model, train_loader, dev_loader, y_fold_dev_list, num_epochs=50, lr=0.01):
    """Train ``model`` with SGD and cross-entropy loss, plot convergence and report dev-set results.

    Parameters:
    - model (nn.Module): Network to train; it is updated in place.
    - train_loader: Iterable of (inputs, labels) training batches.
    - dev_loader: Iterable of (inputs, labels) validation batches.
    - y_fold_dev_list: Not used; kept so existing calls keep working.
    - num_epochs (int): Number of passes over ``train_loader`` (default 50).
    - lr (float): SGD learning rate (default 0.01).

    Returns:
    - float: Accuracy on ``dev_loader`` after the last epoch.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=lr)

    # Per-epoch convergence data
    train_losses = []
    train_accuracies = []
    dev_accuracies = []

    # Training loop
    for epoch in range(num_epochs):
        model.train()
        loss_sum = 0.0
        samples_seen = 0
        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            # criterion returns the batch mean, so weight it by the batch size
            loss_sum += loss.item() * labels.size(0)
            samples_seen += labels.size(0)

        # Report the mean loss over the whole epoch rather than only the last batch's loss
        train_loss = loss_sum / max(samples_seen, 1)
        train_losses.append(train_loss)
        train_acc = evaluate_accuracy(model, train_loader)
        train_accuracies.append(train_acc)
        dev_acc = evaluate_accuracy(model, dev_loader)
        dev_accuracies.append(dev_acc)
        print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.4f}, Dev Accuracy: {dev_acc:.4f}")

    # Plot convergence graph: loss on the left, accuracies on the right
    epochs = range(1, num_epochs + 1)
    plt.figure(figsize=(10, 5))
    plt.subplot(1, 2, 1)
    plt.plot(epochs, train_losses, label='Train Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.subplot(1, 2, 2)
    plt.plot(epochs, train_accuracies, label='Train Accuracy')
    plt.plot(epochs, dev_accuracies, label='Dev Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.tight_layout()
    plt.show()

    # Final evaluation on the dev set, keeping labels and predictions for the report
    model.eval()
    total_correct = 0
    total_samples = 0
    predicted_labels = []
    true_labels_list = []
    with torch.no_grad():
        for inputs, labels in dev_loader:
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            total_correct += (predicted == labels).sum().item()
            total_samples += labels.size(0)
            predicted_labels.extend(predicted.tolist())
            true_labels_list.extend(labels.tolist())

    accuracy = total_correct / total_samples

    # Classification report and F1-score
    print("Classification Report:")
    print(classification_report(true_labels_list, predicted_labels))
    f1 = f1_score(true_labels_list, predicted_labels, average='weighted')
    print(f"F1-Score: {f1:.4f}")

    return accuracy

def ann_model(X, y, num_folds=5, test_size=0.3):
    """Train a ``DeepModel`` with k-fold cross-validation and return the best fold's model.

    The data is split into train and test parts (``random_state=42``). For each
    fold a fresh network is trained on the fold's training part and scored on its
    dev part; the network with the highest dev accuracy is then scored on the
    held-out test part and returned.

    Parameters:
    - X: Feature matrix (anything ``torch.tensor`` accepts), one row per sample.
    - y: Integer class labels, assumed to be coded 0 .. n_classes-1.
    - num_folds (int): Number of cross-validation folds (default 5).
    - test_size (float): Fraction of the data held out for the final test (default 0.3).

    Returns:
    - DeepModel: The model trained in the fold with the highest dev accuracy.
    """
    # Convert to PyTorch tensors
    X = torch.tensor(X, dtype=torch.float32)
    y = torch.tensor(y, dtype=torch.long)

    # The output layer needs one unit per class, not one per sample
    num_classes = int(y.max().item()) + 1

    # Hold out the test set; the rest is used for cross-validation
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=42)

    # Perform k-fold cross-validation on the training set
    kf = KFold(n_splits=num_folds, shuffle=True, random_state=42)

    best_accuracy = 0.0
    best_model = None

    for fold, (train_index, dev_index) in enumerate(kf.split(X_train)):
        X_fold_train, X_fold_dev = X_train[train_index], X_train[dev_index]
        y_fold_train, y_fold_dev = y_train[train_index], y_train[dev_index]

        train_loader = data.DataLoader(data.TensorDataset(X_fold_train, y_fold_train), batch_size=32, shuffle=True)
        dev_loader = data.DataLoader(data.TensorDataset(X_fold_dev, y_fold_dev), batch_size=32, shuffle=False)

        # A fresh network per fold: reusing one instance would let every fold continue
        # from weights already trained on the samples it is about to be validated on.
        model = DeepModel(input_size=X.shape[1], hidden_sizes=[64, 32, 16, 8], output_size=num_classes)

        # Train the model
        accuracy = train_and_evaluate(model, train_loader, dev_loader, y_fold_dev_list=y_fold_dev)

        print(f"Fold {fold + 1}/{num_folds}, Accuracy: {accuracy}")

        if best_model is None or accuracy > best_accuracy:
            best_accuracy = accuracy
            best_model = model

    print(f"Best model, Best accuracy: {best_accuracy}")

    # Evaluate the best model on the separate test set.
    # as_tensor leaves an existing tensor of the right dtype as it is.
    X_test = torch.as_tensor(X_test, dtype=torch.float32)
    y_test = torch.as_tensor(y_test, dtype=torch.long)
    test_loader = data.DataLoader(data.TensorDataset(X_test, y_test), batch_size=32, shuffle=False)

    test_accuracy = evaluate_accuracy(best_model, test_loader)
    print(f"Test Accuracy: {test_accuracy}")

    return best_model

# Call ann_model with your data
# ann_model(X_data, y_data, num_folds=5, test_size=0.3)


In [None]:
def original_ml_pipeline_obj(x, y, test_size = 0.3):
    """Train and score the fixed-parameter SVM and MLP models.

    Parameters:
    - x: Feature matrix.
    - y: Class labels.
    - test_size (float): Fraction of the data held out for testing (default 0.3).

    Returns:
    - MultiModelEvaluator: Evaluator holding the fitted pipelines and their
      train/test metric scores.
    """
    # Models are fitted in insertion order: SVM first, then the MLP
    baseline_models = dict([
        ('SVM', SVC(kernel='rbf', gamma=0.1, C=1.0)),
        ('MLP Neural Net', MLPClassifier(alpha=1, max_iter=1000, random_state=42)),
    ])

    evaluator = MultiModelEvaluator(baseline_models)
    evaluator.split_data(x, y, test_size=test_size)  # train/test split
    evaluator.train_models()                         # fit every pipeline
    evaluator.evaluate_models(evaluator.X_test, evaluator.y_test)  # score on both splits
    return evaluator

In [None]:
def hyper_tuned_ml_pipeline_obj(x, y, test_size = 0.3):
    """Tune, train and score the SVM and MLP models.

    The SVM is tuned over its full grid; the MLP has an entry in ``n_iter_values``
    and is therefore tuned with a randomized search of 50 iterations.

    Parameters:
    - x: Feature matrix.
    - y: Class labels.
    - test_size (float): Fraction of the data held out for testing (default 0.3).

    Returns:
    - MultiModelEvaluatorWithTuning: Evaluator holding the best pipelines, their
      best parameters and their train/test metric scores.
    """
    # Starting estimators; the search overrides the parameters listed in the grids
    models = {}
    models['SVM'] = SVC(kernel='rbf', gamma=0.1, C=1.0)
    models['MLP Neural Net'] = MLPClassifier(alpha=1, max_iter=1000, random_state=42)

    # Search spaces; keys address the 'classifier' step of the pipeline
    svm_grid = {
        'classifier__C': [0.1, 0.5, 1, 2, 5, 10, 20],
        'classifier__kernel': ['rbf'],
        "classifier__gamma": [0.001, 0.01, 0.1, 0.25, 0.5, 0.75, 1],
    }
    mlp_grid = {
        'classifier__hidden_layer_sizes': [(150,100,50), (120,80,40), (100,50,30)],
        'classifier__max_iter': [50, 100, 150],
        'classifier__activation': ['tanh', 'relu'],
        'classifier__solver': ['lbfgs', 'adam'],
        'classifier__alpha': [0.0001, 0.05],
        'classifier__learning_rate': ['constant','adaptive'],
        'classifier__random_state': [42],
    }
    param_grids = {'SVM': svm_grid, 'MLP Neural Net': mlp_grid}

    evaluator = MultiModelEvaluatorWithTuning(
        models,
        param_grids,
        n_iter_values={'MLP Neural Net': 50},  # listed here -> randomized search
        n_jobs_values={},                      # empty -> evaluator defaults
        verbose_values={},                     # empty -> evaluator defaults
    )
    evaluator.split_data(x, y, test_size=test_size)  # train/test split
    evaluator.train_models()                         # run the searches
    evaluator.evaluate_models(evaluator.X_test, evaluator.y_test)  # score on both splits
    return evaluator

In [None]:
def evaluate_result(evaluator):
    """Turn an evaluator's metric scores into a table of percentages.

    Parameters:
    - evaluator: Object whose ``metric_scores`` attribute maps a model name to a
      dict of 'Train ...' / 'Test ...' metric values.

    Returns:
    - pd.DataFrame: One column per model and one row per metric, indexed by a
      two-level index ('Train' or 'Test', metric name). Values are the scores
      multiplied by 100 and rounded to two decimals. Confusion matrices are left out.
    """
    metric_kinds = ('Accuracy', 'F1 Macro', 'F1 Weighted', 'Recall Macro',
                    'Recall Weighted', 'Precision Macro', 'Precision Weighted')
    metric_names = [f'{split} {kind}' for split in ('Train', 'Test') for kind in metric_kinds]

    # One list per output row; 'Model Name' comes first and later becomes the header
    table = {'Model Name': []}
    table.update({name: [] for name in metric_names})
    for model_name, scores in evaluator.metric_scores.items():
        table['Model Name'].append(model_name)
        for name in metric_names:
            table[name].append(round(scores[name] * 100.00, 2))

    result = pd.DataFrame(table).T
    # Promote the 'Model Name' row to column labels, then remove that row
    result.rename(columns=result.iloc[0], inplace = True)
    result.drop(result.index[0], inplace = True)
    # 'Train F1 Macro' -> ('Train', 'F1 Macro')
    result.index = pd.MultiIndex.from_tuples(
        [tuple(label.split(' ', 1)) for label in result.index],
        names=['', 'Metrics'])
    return result

In [None]:
def evaluate_ann(model, X_train, X_test, y_train, y_test):
    """Score a trained network on a train and a test split.

    Parameters:
    - model: PyTorch module that maps a float32 feature tensor to per-class outputs.
    - X_train, X_test: Feature matrices (converted with ``torch.tensor``).
    - y_train, y_test: True class labels for the two splits.

    Returns:
    - tuple: ``(metrics, result)`` where ``metrics`` is the dict of raw 'Train ...' /
      'Test ...' values (confusion matrices included) and ``result`` is a one-column
      ('ANN') table of the same scores as percentages rounded to two decimals.
    """
    def predict_labels(features):
        # Evaluation mode, no gradients; the predicted class is the index of the largest output
        model.eval()
        with torch.no_grad():
            outputs = model(torch.tensor(features, dtype=torch.float32))
            return torch.max(outputs, 1)[1].numpy()

    def score_split(split, y_true, y_pred):
        # Same metric set for both splits; `split` prefixes every key
        return {
            f'{split} Accuracy': accuracy_score(y_true, y_pred),
            f'{split} F1 Macro': f1_score(y_true, y_pred, average='macro'),
            f'{split} F1 Weighted': f1_score(y_true, y_pred, average='weighted'),
            f'{split} Recall Macro': recall_score(y_true, y_pred, average='macro'),
            f'{split} Recall Weighted': recall_score(y_true, y_pred, average='weighted'),
            f'{split} Precision Macro': precision_score(y_true, y_pred, average='macro'),
            f'{split} Precision Weighted': precision_score(y_true, y_pred, average='weighted'),
            f'{split} Confusion Matrix': confusion_matrix(y_true, y_pred),
        }

    metrics = score_split('Train', y_train, predict_labels(X_train))
    metrics.update(score_split('Test', y_test, predict_labels(X_test)))

    # Every scalar metric goes into the table; the confusion matrices do not
    table_rows = [name for name in metrics if not name.endswith('Confusion Matrix')]
    table = {'Model Name': ['ANN']}
    for name in table_rows:
        table[name] = [round(metrics[name] * 100.00, 2)]

    result = pd.DataFrame(table).T
    # Promote the 'Model Name' row to the column label, then remove that row
    result.rename(columns=result.iloc[0], inplace = True)
    result.drop(result.index[0], inplace = True)
    # 'Train F1 Macro' -> ('Train', 'F1 Macro')
    result.index = pd.MultiIndex.from_tuples(
        [tuple(label.split(' ', 1)) for label in result.index],
        names=['', 'Metrics'])
    return metrics, result

In [None]:
def _extract_feature_importances(model):
    """Return one importance score per feature for a fitted model, or None if it exposes none."""
    # Tree ensembles and boosters (random forest, gradient boosting, AdaBoost, XGBoost, LightGBM, ...)
    if hasattr(model, 'feature_importances_'):
        return np.asarray(model.feature_importances_)
    # Multi-layer perceptron: summed absolute first-layer weights per input feature,
    # after scaling each hidden unit's weight vector to unit L2 norm
    if hasattr(model, 'coefs_'):
        first_layer = model.coefs_[0]
        normalized = first_layer / np.linalg.norm(first_layer, ord=2, axis=0)
        return np.sum(np.abs(normalized), axis=1)
    # Linear models such as logistic regression: absolute coefficients of the first row
    if hasattr(model, 'coef_'):
        return np.abs(np.atleast_2d(model.coef_)[0])
    return None


def plot_feature_importances(models, model_names, feature_names):
    """
    Plot feature importances for a list of machine learning models.

    A bar chart is drawn for every model that exposes ``feature_importances_``,
    ``coefs_`` or ``coef_``. Models without any of these attributes (for example
    KNN, RBF-kernel SVM, Naive Bayes and QDA) are skipped with a printed note.

    Parameters:
    - models (list): List of trained models.
    - model_names (list): Names of the models for labeling in the plot.
    - feature_names (list): Names of the features for labeling in the plot.

    Returns:
    - None
    """
    num_features = len(feature_names)

    for i, model in enumerate(models):
        importances = _extract_feature_importances(model)
        if importances is None:
            # Without this check the plot would fail or reuse the previous model's values
            print(f'Skipping {model_names[i]}: the model exposes no feature importances')
            continue

        # Sort feature importances in descending order
        sorted_indices = np.argsort(importances)[::-1]
        sorted_importances = [importances[idx] for idx in sorted_indices]  # Convert to a list of values
        sorted_feature_names = [feature_names[idx] for idx in sorted_indices]

        plt.figure(figsize=(10, 6))
        plt.bar(range(num_features), sorted_importances, tick_label=sorted_feature_names)
        plt.title(f'Feature Importances for {model_names[i]}')
        plt.xticks(rotation=90)
        plt.tight_layout()
        plt.show()

## Machine Learning Models

In [None]:
# for feature scaling: this scaler is fitted on the feature matrix further down
# (x = st_x.fit_transform(x))
from sklearn.preprocessing import StandardScaler
st_x = StandardScaler()

In [None]:
# Preparing data for ML: leave the timestamp out of the modelling frame
df_tmp = new_df.drop(columns=["Timestamp"])

In [None]:
# Separate the feature columns from the label column
feature_frame = df_tmp.drop(columns='Target')
y = df_tmp['Target'].values  # labels as a NumPy array

# Standardize the inputs; the labels are discrete classes and are left as they are.
# NOTE(review): the scaler is fitted on all rows before the train/test split, and the
# sklearn pipelines apply their own StandardScaler again - confirm this is intended.
x = st_x.fit_transform(feature_frame)

In [None]:
# Train and score the fixed-parameter models with 30% of the rows held out for testing.
evaluator = original_ml_pipeline_obj(x, y, test_size = 0.3)

In [None]:
# Metric table (percentages) for the fixed-parameter models; displayed as the cell output.
result = evaluate_result(evaluator)
result

In [None]:
metric_to_show = 'Accuracy'

# One line per split (train / test), one x position per model
selected_rows = [('Train', metric_to_show), ('Test', metric_to_show)]
model_labels = result.columns.to_numpy()

ax = result.loc[selected_rows].T.plot(marker='o', figsize=(14, 8))
plt.title(f'Train and Test {metric_to_show} for Different Models')
plt.xlabel('Model')
plt.ylabel(metric_to_show)
plt.xticks(np.arange(len(model_labels)), model_labels, rotation=90)
plt.legend(loc='best')
plt.grid(True)
plt.show()

In [None]:
# Tune, train and score the models with 30% of the rows held out for testing.
evaluator_hyper_tuned = hyper_tuned_ml_pipeline_obj(x, y, test_size = 0.3)

In [None]:
# Metric table (percentages) for the tuned models; displayed as the cell output.
result_hyper_tuned = evaluate_result(evaluator_hyper_tuned)
result_hyper_tuned

In [None]:
metric_to_show = 'Accuracy'

# One line per split (train / test), one x position per tuned model
selected_rows = [('Train', metric_to_show), ('Test', metric_to_show)]
tuned_labels = result_hyper_tuned.columns.to_numpy()

result_hyper_tuned.loc[selected_rows].T.plot(marker='o', figsize=(14, 8))
plt.title(f'Train and Test {metric_to_show} for Different Hyper Tuned Models')
plt.xlabel('Model')
plt.ylabel(metric_to_show)
plt.xticks(np.arange(len(tuned_labels)), tuned_labels, rotation=90)
plt.legend(loc='best')
plt.grid(True)
plt.show()

In [None]:
# NOTE: this rebinds the name `ann_model` from the function defined earlier to the
# network it returns, so running this cell a second time requires re-running the
# cell that defines the function first.
ann_model = ann_model(x, y, test_size = 0.3)

In [None]:
# Score the network on a split made with the same test_size and random_state that
# ann_model passes to train_test_split; [1] selects the formatted percentage table.
evaluate_ann(ann_model, *train_test_split(x, y, test_size=0.3, random_state=42))[1]

In [None]:
metric_to_show = 'Accuracy'       # Accuracy, F1 Macro, F1 Weighted, Recall Macro, Recall Weighted, Precision Macro, Precision Weighted

fig, ax = plt.subplots(figsize=(10, 6))

# Only the test rows are compared; add ('Train', metric_to_show) to the list to overlay the train rows
test_row = [('Test', metric_to_show)]

# Tuned models first ...
result_hyper_tuned.loc[test_row].T.plot(ax=ax, marker='o', figsize=(14, 8), title=f'Test {metric_to_show} for Different Models')

# ... then the fixed-parameter models on the same axes
result.loc[test_row].T.plot(ax=ax, marker='o', figsize=(14, 8))

# Labels, legend (in plotting order) and model names on the x axis
ax.set_xlabel('Model')
ax.set_ylabel(metric_to_show)
ax.grid(True)
ax.legend(['Hyper Tuned Test', 'Original Test'])
model_labels = result.columns.to_numpy()
plt.xticks(np.arange(len(model_labels)), model_labels, rotation=90)
plt.show();

In [None]:
# Pull the fitted classifier step out of every tuned pipeline and plot its
# feature importances; the feature names are all columns of df_tmp except the last.
model_names = list(evaluator_hyper_tuned.model_names)
models = [evaluator_hyper_tuned.models[name].named_steps['classifier'] for name in model_names]
plot_feature_importances(models, model_names, df_tmp.columns.to_numpy()[:-1])

Objects ->
- evaluator
- evaluator_hyper_tuned

Results ->
- result
- result_hyper_tuned

# Saving ML models

In [None]:
from joblib import Parallel, delayed
import joblib

In [None]:
# Persist both evaluator objects with joblib so later cells can load them back.
# The paths are relative, so the files are written to the current working directory;
# the loading cells read them from /content/ — assumes the working directory is /content (TODO confirm).
with open("original_models.pkl", "wb") as file:
    joblib.dump(evaluator, file)

with open("hypertuned_models.pkl", "wb") as file:
    joblib.dump(evaluator_hyper_tuned, file)

In [None]:
# Persist the ANN object with torch.save.
# The file name has to be "ann_models.pkl": every cell that reloads the network reads
# "/content/ann_models.pkl", so writing any other name leaves those cells without a file
# (or silently reading a stale one). The path is relative, like the joblib dumps, so it
# resolves to /content/ann_models.pkl when the working directory is /content.
with open("ann_models.pkl", "wb") as file:
    torch.save(ann_model, file)

In [None]:
# Sanity check: load the pickled hyper-tuned evaluator back and print every metric it
# reports for one model.
# NOTE: joblib.load unpickles arbitrary objects — only load files created by this notebook.

with open("/content/hypertuned_models.pkl", "rb") as file:
    tmp = joblib.load(file)

model_name = 'MLP Neural Net'
# get_metric_scores returns a mapping of metric name -> score (it is iterated with .items() below).
scores = tmp.get_metric_scores(model_name)
print(f'Metric Scores for Model {model_name}:')
for metric, score in scores.items():
    print(f'{metric}: {score}')

In [None]:
# Sanity check: load the saved ANN back and re-evaluate it on the same 70/30 split
# (random_state=42); displays element [1] of evaluate_ann's return value.
# The path must match the file written by the ANN save cell.
ob = torch.load("/content/ann_models.pkl")
evaluate_ann(ob, *train_test_split(x, y, test_size=0.3, random_state=42))[1]

## Merging ANN with other models

In [None]:
# Load the original (non-tuned) evaluator from disk.
with open("/content/original_models.pkl", "rb") as file:
    tmp1 = joblib.load(file)

# From here on tmp1 is the value returned by evaluate_result, no longer the evaluator itself.
tmp1 = evaluate_result(tmp1)

# Load the ANN and keep element [1] of evaluate_ann's return value
# (presumably a table with the same row index as tmp1 — TODO confirm).
ob = torch.load("/content/ann_models.pkl")
tmp2 = evaluate_ann(ob, *train_test_split(x, y, test_size=0.3, random_state=42))[1]


# Display the original-model results with the ANN result joined on as extra column(s).
tmp1.join(tmp2)

In [None]:
# Metric whose 'Train' and 'Test' rows are plotted.
metric_to_show = 'Accuracy'

# Train vs. test curve of the chosen metric for the original models plus the ANN;
# relies on tmp1 / tmp2 from the preceding merge cell.
tmp1.join(tmp2).loc[[('Train', metric_to_show), ('Test', metric_to_show)]].T.plot(marker='o', figsize=(14, 8))
plt.title(f'Train and Test {metric_to_show} for Different Models')
plt.xlabel('Model')
plt.ylabel(metric_to_show)
# One tick per column of the joined table, labelled with the column name.
plt.xticks(np.arange(len(tmp1.join(tmp2).columns.to_numpy())), tmp1.join(tmp2).columns.to_numpy(), rotation=90)
plt.legend(loc='best')
plt.grid(True)
plt.show()

In [None]:
# Load the hyper-tuned evaluator from disk.
with open("/content/hypertuned_models.pkl", "rb") as file:
    tmp3 = joblib.load(file)

# From here on tmp3 is the value returned by evaluate_result, no longer the evaluator itself.
tmp3 = evaluate_result(tmp3)

# Load the ANN and keep element [1] of evaluate_ann's return value
# (presumably a table with the same row index as tmp3 — TODO confirm).
ob = torch.load("/content/ann_models.pkl")
tmp4 = evaluate_ann(ob, *train_test_split(x, y, test_size=0.3, random_state=42))[1]


# Display the hyper-tuned results with the ANN result joined on as extra column(s).
tmp3.join(tmp4)

In [None]:
# Metric whose 'Train' and 'Test' rows are plotted.
metric_to_show = 'Accuracy'

# Train vs. test curve of the chosen metric for the hyper-tuned models plus the ANN;
# relies on tmp3 / tmp4 from the preceding merge cell.
tmp3.join(tmp4).loc[[('Train', metric_to_show), ('Test', metric_to_show)]].T.plot(marker='o', figsize=(14, 8))
plt.title(f'Train and Test {metric_to_show} for Different Hyper Tuned Models')
plt.xlabel('Model')
plt.ylabel(metric_to_show)
# One tick per column of the joined table, labelled with the column name.
plt.xticks(np.arange(len(tmp3.join(tmp4).columns.to_numpy())), tmp3.join(tmp4).columns.to_numpy(), rotation=90)
plt.legend(loc='best')
plt.grid(True)
plt.show()

#### FOR RESEARCH PAPER

In [None]:
# Paper table (non-tuned models): test-set scores of SVM, MLP Neural Net and the ANN.
with open("/content/original_models.pkl", "rb") as file:
    tmp1 = joblib.load(file)

# tmp1 becomes the value returned by evaluate_result.
tmp1 = evaluate_result(tmp1)

ob = torch.load("/content/ann_models.pkl")
tmp2 = evaluate_ann(ob, *train_test_split(x, y, test_size=0.3, random_state=42))[1]

# Take the 'Test' rows, keep four metrics and three models, then transpose so that
# each row is a model and each column is a metric.
qwe = tmp1.join(tmp2).loc['Test'].loc[['Accuracy', 'F1 Weighted', 'Recall Weighted', 'Precision Weighted']][[ 'SVM','MLP Neural Net', 'ANN']].T
# Divide the three weighted scores by 100 and round to 2 decimals; 'Accuracy' is left unscaled.
qwe['F1 Weighted'] = qwe['F1 Weighted'].map(lambda x: round(x / 100, 2))
qwe['Recall Weighted'] = qwe['Recall Weighted'].map(lambda x: round(x / 100, 2))
qwe['Precision Weighted'] = qwe['Precision Weighted'].map(lambda x: round(x / 100, 2))
qwe

In [None]:
# Print the non-tuned table as Markdown text for pasting into the paper
# (assuming qwe is a pandas DataFrame, to_markdown needs the optional `tabulate` package).
print(qwe.to_markdown())

In [None]:
# Paper table (hyper-tuned models): test-set scores of eight selected models, ANN included.
with open("/content/hypertuned_models.pkl", "rb") as file:
    tmp3 = joblib.load(file)

# tmp3 becomes the value returned by evaluate_result.
tmp3 = evaluate_result(tmp3)

ob = torch.load("/content/ann_models.pkl")
tmp4 = evaluate_ann(ob, *train_test_split(x, y, test_size=0.3, random_state=42))[1]

# Take the 'Test' rows, keep four metrics and the listed models, then transpose so that
# each row is a model and each column is a metric.
qwe_h = tmp3.join(tmp4).loc['Test'].loc[['Accuracy', 'F1 Weighted', 'Recall Weighted', 'Precision Weighted']][['KNN', 'SVM', 'DecisionTree', 'RandomForest', 'XGB', 'LGBM', 'MLP Neural Net', 'ANN']].T
# Divide the three weighted scores by 100 and round to 2 decimals; 'Accuracy' is left unscaled.
qwe_h['F1 Weighted'] = qwe_h['F1 Weighted'].map(lambda x: round(x / 100, 2))
qwe_h['Recall Weighted'] = qwe_h['Recall Weighted'].map(lambda x: round(x / 100, 2))
qwe_h['Precision Weighted'] = qwe_h['Precision Weighted'].map(lambda x: round(x / 100, 2))
qwe_h

In [None]:
# Print the hyper-tuned table as Markdown text for pasting into the paper
# (assuming qwe_h is a pandas DataFrame, to_markdown needs the optional `tabulate` package).
print(qwe_h.to_markdown())

In [None]:
# Grouped bar chart: accuracy of each model before and after hyper-parameter tuning.
#
# The two tables do not have to list the same models, so only models present in BOTH
# are drawn, and both series are selected by model name. This keeps the two bar groups
# the same length (ax.bar raises a shape-mismatch error otherwise) and guarantees that
# each tuned bar sits next to the non-tuned bar of the same model.
models = [name for name in qwe.index if name in qwe_h.index]
accuracy_nontuned = qwe.loc[models, 'Accuracy']
accuracy_tuned = qwe_h.loc[models, 'Accuracy']

# Set the figure size
fig, ax = plt.subplots(figsize=(10, 6))

bar_width = 0.35
index = np.arange(len(models))

# Non-tuned bars sit on the integer positions, tuned bars one bar-width to the right.
bars1 = ax.bar(index, accuracy_nontuned, width=bar_width, label='Non-Tuned')
bars2 = ax.bar(index + bar_width, accuracy_tuned, width=bar_width, label='Tuned')

# Annotate every bar with its accuracy (truncated to a whole percent by int()).
for bars, accuracies in ((bars1, accuracy_nontuned), (bars2, accuracy_tuned)):
    for bar, acc in zip(bars, accuracies):
        height = bar.get_height()
        ax.annotate(f'{int(acc)}%', xy=(bar.get_x() + bar.get_width() / 2, height),
                    xytext=(0, 3),  # 3 points vertical offset
                    textcoords="offset points",
                    ha='center', va='bottom')

# Set labels (no title: the figure caption is supplied in the paper)
ax.set_xlabel('Models', fontsize=15)
ax.set_ylabel('Accuracy (%)', fontsize=15)

# Centre each tick between the two bars of a model and rotate the labels for readability
ax.set_xticks(index + bar_width / 2)
ax.set_xticklabels(models, rotation=90, fontsize=15)

ax.tick_params(axis='y', which='both', labelsize=15)
# Add legend
ax.legend(loc='lower right')

# Show the plot
plt.show()

In [None]:
def calculate_class_accuracy(obj = None, confusion_matrix_ann = None):
    """Build a per-class (one-vs-rest) accuracy table for every model.

    Parameters
    ----------
    obj : optional
        Evaluator exposing ``model_names`` and ``get_metric_scores(name)``; the
        mapping returned by the latter must contain 'Test Confusion Matrix'.
    confusion_matrix_ann : optional
        Test confusion matrix of the ANN (2-D, indexable as ``m[i, j]``); its
        figures are stored under the column name 'ANN'.

    Returns
    -------
    pandas.DataFrame or None
        Rows are indexed by ('Class', 'Attributes'), one column per model.
        ``None`` when neither argument yields a model.
    """

    def _per_class_summary(matrix):
        # For class k: TP is the diagonal cell, FP the rest of column k,
        # FN the rest of row k, and TN everything else in the matrix.
        summary = {}
        for k in range(len(matrix)):
            TP = matrix[k, k]
            FP = matrix[:, k].sum() - TP
            FN = matrix[k, :].sum() - TP
            TN = np.sum(matrix) - TP - FP - FN

            total_samples = TP + TN + FP + FN
            correct = TP + TN
            summary[f'Class {k}'] = {
                'Accuracy (%)': round(correct / total_samples * 100.00, 2),
                'Total Samples': total_samples,
                'Total Correct Samples Predicted': correct,
            }
        return summary

    each_model_each_class_accuracy = {}

    if obj is not None:
        for model in obj.model_names:
            test_matrix = obj.get_metric_scores(model)['Test Confusion Matrix']
            each_model_each_class_accuracy[model] = _per_class_summary(test_matrix)

    if confusion_matrix_ann is not None:
        each_model_each_class_accuracy['ANN'] = _per_class_summary(confusion_matrix_ann)

    # Turn each model's nested dict into a single column indexed by (Class, Attributes)
    # and join the columns side by side.
    res = None
    for model_name, per_class in each_model_each_class_accuracy.items():
        tmp = (
            pd.DataFrame(per_class)
            .T
            .stack(0)
            .reset_index()
            .rename(columns = {'level_0': 'Class', 'level_1': 'Attributes', 0: model_name})
            .set_index(['Class', 'Attributes'], drop = True)
        )
        res = tmp if res is None else res.join(tmp)

    return res

# Load the hyper-tuned evaluator; it is passed to calculate_class_accuracy below.
with open("/content/hypertuned_models.pkl", "rb") as file:
    tmp1 = joblib.load(file)

# Element [0] of evaluate_ann's return value is indexed by metric name; keep the ANN's test confusion matrix.
tmp2 = evaluate_ann(torch.load("/content/ann_models.pkl"),
                    *train_test_split(x, y, test_size=0.3, random_state=42))[0]['Test Confusion Matrix']

# Keep only the 'Accuracy (%)' rows of the per-class table (one row per class).
class_accuracy_df = calculate_class_accuracy(tmp1, tmp2).query("Attributes == 'Accuracy (%)'")

# Models drawn in the figure; the 'ANN' column is computed above but not selected here.
data_to_plot = class_accuracy_df[['XGB','LGBM', 'DecisionTree', 'RandomForest', 'GradientBoosting', 'MLP Neural Net']]

ax = data_to_plot.plot(marker='o', figsize=(10, 6))

# Optional per-point value labels (disabled):
# for model in data_to_plot.columns:
#     for index, value in enumerate(data_to_plot[model]):
#         ax.annotate(f'{value:.2f}%', (index, value), textcoords="offset points", xytext=(0, 5), ha='center')

# Label the x axis with the first index level (the class names) only.
new_labels = data_to_plot.index.get_level_values(0).to_list()
ax.set_xticks(range(len(new_labels)))
ax.set_xticklabels(new_labels)

ax.set_xlabel('Target', fontsize=15)
ax.set_ylabel('Accuracy (%)', fontsize=15)
# ax.set_title('Occupancy Levels Accuracy Comparison', fontsize=15)
ax.tick_params(axis='y', which='both', labelsize=15);

plt.show()

## Each Class Accuracy of Prediction

In [None]:
def _class_metrics_from_confusion_matrix(matrix, epsilon = 1e-7):
    """Return one-vs-rest accuracy, precision, recall and F1 (in %) for every class of one confusion matrix."""
    matrix = np.asarray(matrix)
    total_samples = matrix.sum()
    class_metrics = {}

    for i in range(len(matrix)):
        # Class i against the rest: diagonal cell, rest of column i, rest of row i, everything else.
        TP = matrix[i, i]
        FP = matrix[:, i].sum() - TP
        FN = matrix[i, :].sum() - TP
        TN = total_samples - TP - FP - FN

        # epsilon keeps every ratio defined when a class has no predicted or no true samples.
        accuracy = (TP + TN) / (total_samples + epsilon)
        precision = TP / (TP + FP + epsilon)
        recall = TP / (TP + FN + epsilon)
        f1_score = 2 * (precision * recall) / (precision + recall + epsilon)

        class_metrics[f'Class {i}'] = {
            'Accuracy (%)': round(accuracy * 100.00, 2),
            'Precision (%)': round(precision * 100.00, 2),
            'Recall (%)': round(recall * 100.00, 2),
            'F1 Score (%)': round(f1_score * 100.00, 2),
        }
    return class_metrics


def calculate_class_metrics(obj = None, confusion_matrix_ann = None):
    """Build a per-class accuracy / precision / recall / F1 table for every model.

    Each class is scored one-vs-rest from the model's test confusion matrix.
    Accuracy is (TP + TN) / total, the same definition calculate_class_accuracy
    uses. The ANN figures are computed from ``confusion_matrix_ann`` alone, so
    they do not depend on ``obj`` and work when ``obj`` is None.

    Parameters
    ----------
    obj : optional
        Evaluator exposing ``model_names`` and ``get_metric_scores(name)``; the
        mapping returned by the latter must contain 'Test Confusion Matrix'.
    confusion_matrix_ann : optional
        Test confusion matrix of the ANN; stored under the column name 'ANN'.

    Returns
    -------
    pandas.DataFrame or None
        Rows are indexed by ('Class', 'Attributes'), one column per model
        (outer-joined, so models with different class counts are kept).
        ``None`` when neither argument yields a model.
    """
    each_model_each_class_metrics = {}
    res = None
    epsilon = 1e-7  # small constant guarding against division by zero

    if obj is not None:
        for model in obj.model_names:
            test_matrix = obj.get_metric_scores(model)['Test Confusion Matrix']
            each_model_each_class_metrics[model] = _class_metrics_from_confusion_matrix(test_matrix, epsilon)

    if confusion_matrix_ann is not None:
        each_model_each_class_metrics['ANN'] = _class_metrics_from_confusion_matrix(confusion_matrix_ann, epsilon)

    # Turn each model's nested dict into a single column indexed by (Class, Attributes).
    for model_name in each_model_each_class_metrics:
        tmp = pd.DataFrame(each_model_each_class_metrics[model_name]).T.stack(0).reset_index().rename(columns = {'level_0': 'Class', 'level_1': 'Attributes', 0: model_name}).set_index(['Class', 'Attributes'], drop = True)
        if res is None:
            res = tmp
        else:
            res = res.join(tmp, how='outer')

    return res

In [None]:
# Load the hyper-tuned evaluator; it is passed to calculate_class_accuracy below.
with open("/content/hypertuned_models.pkl", "rb") as file:
    tmp1 = joblib.load(file)

# Element [0] of evaluate_ann's return value is indexed by metric name; keep the ANN's test confusion matrix.
tmp2 = evaluate_ann(torch.load("/content/ann_models.pkl"),
                    *train_test_split(x, y, test_size=0.3, random_state=42))[0]['Test Confusion Matrix']

# Display the full per-class table (accuracy, total samples, correct samples) for every model plus the ANN.
# NOTE(review): this calls calculate_class_accuracy, not the calculate_class_metrics defined just above — confirm which is intended.
calculate_class_accuracy(tmp1, tmp2)
# Accuracy rows only:
# calculate_class_accuracy(tmp1, tmp2).query("Attributes == 'Accuracy (%)'")