<h1><center>Dependencies Installation</center></h1>

In [None]:
%pip install pandas==2.2.2
%pip install matplotlib==3.9.2
%pip install seaborn==0.13.2
%pip install scikit-learn==1.5.2
%pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu124

<h1><center>Import all needed libraries</center></h1>

In [None]:
import os
from pathlib import Path
from typing import List, Tuple

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import TensorDataset

from sklearn.impute import KNNImputer
from sklearn.preprocessing import LabelEncoder, StandardScaler

<h1><center>Setup agnostic device</center></h1

In [None]:
device_name = (
    "cuda" if torch.cuda.is_available()
    else "mps" if torch.backends.mps.is_available()
    else "cpu"
)

device = torch.device(device_name)
print(f"Using {device_name} device")

<h1><center>Data Preparation</center></h1>

<h2>Read in dataset</h2>

In [None]:
ROOT_DIR: str = Path().resolve().parent
DATA_DIR: str = os.path.join(ROOT_DIR, 'data')

df: pd.DataFrame = pd.read_csv(f'{DATA_DIR}/almond.csv').drop(columns=["Unnamed: 0"])
df

In [None]:
df.head()

In [None]:
df.tail()

In [None]:
df.columns

<h2>Sanity-check on the dataset</h2>

<h3><li>Find and drop duplicates across rows</li></h3>

In [None]:
df.duplicated().sum()

In [None]:
df.drop_duplicates(inplace=True)

In [None]:
df.duplicated().sum()

<h3><li>Check the shape of the dataset and see if there are any null values in columns</li></h3>

In [None]:
df.shape

In [None]:
df.info()

In [None]:
df.isna().sum()

In [None]:
(df.isnull().sum() / df.shape[0]) * 100

<h3><li>Visualize the distribution of the data</li></h3>

In [None]:
numeric_columns = df.columns

plt.figure(figsize=(15, 15))
for i, column in enumerate(numeric_columns, 1):
    if column == 'Type':
        continue
    plt.subplot(len(numeric_columns) // 3 + 1, 3, i)
    sns.histplot(df[column], kde=True, bins=30)
    plt.title(f'Distribution of {column}')
    plt.xlabel(column)
    plt.ylabel('Frequency')

plt.tight_layout()
plt.show()

<h3><li>Log transform the columns with outliers</li></h3>

In [None]:
for column_name in numeric_columns:
    if column_name != 'Type' and not (-0.5 < df[column_name].skew() < 0.5):
        print(column_name)
        df[column_name] = np.log(df[column_name])

In [None]:
numeric_columns = df.columns

plt.figure(figsize=(15, 15))
for i, column in enumerate(numeric_columns, 1):
    if column == 'Type':
        continue
    plt.subplot(len(numeric_columns) // 3 + 1, 3, i)
    sns.histplot(df[column], kde=True, bins=30)
    plt.title(f'Distribution of {column}')
    plt.xlabel(column)
    plt.ylabel('Frequency')

plt.tight_layout()
plt.show()

<h3><li>Fill in the missing null values</li></h3>

In [None]:
columns_with_missing_values: List[str] = df.columns[df.isnull().any()].tolist()
knn_imputer: KNNImputer = KNNImputer(n_neighbors=5)
df[columns_with_missing_values] = knn_imputer.fit_transform(df[columns_with_missing_values])
df.isnull().sum()

<h3><li>Find garbage values in the target variable 'Type'</li></h3>

In [None]:
column_name: str = 'Type'
print('****' * 5 + ' START ' + "****" * 5)
print(df[column_name].value_counts())
print('****' * 5 + ' END ' + "****" * 6)

In [None]:
plt.figure(figsize=(8, 4))
df[column_name].value_counts().plot(kind='pie', autopct='%1.1f%%')
plt.title(f'Value Counts for {column_name}')
plt.ylabel('x')
plt.show()

<h3><li>Encode the Target Variable</li></h3>

In [None]:
label_encoder: LabelEncoder = LabelEncoder()
df['Type'] = label_encoder.fit_transform(df['Type'])
df.head(10)

<h2>Exploratory Data Analysis (EDA)</h2>

<h3><li>Perform statistical analysis on the dataset</li></h3>

In [None]:
df.describe().T

In [None]:
plt.figure(figsize=(15, 15))
for i, column in enumerate(numeric_columns, 1):
    if column == 'Type':
        continue
    plt.subplot(len(numeric_columns) // 3 + 1, 3, i)
    sns.boxplot(x=df[column])
    plt.title(f'Box Plot of {column}')
    plt.xlabel(column)

plt.tight_layout()
plt.show()

<h3><li>Standardize the features data</li></h3>

In [None]:
scaler = StandardScaler()

columns_to_scale: List[str] = df.columns.tolist()
columns_to_scale.remove('Type')

df[columns_to_scale] = scaler.fit_transform(df[columns_to_scale])
df.head()

<h2>Turn data into tensors and split the data into train and test datasets</h2>

In [None]:
X = df.drop('Type', axis=1)
y = df['Type']

num_of_features = X.shape[1]
num_of_classes = y.nunique()
num_of_features, num_of_classes

In [None]:
X_tensor = torch.tensor(X.values, dtype=torch.float)
y_tensor = torch.tensor(y.values, dtype=torch.long)
dataset = TensorDataset(X_tensor, y_tensor)

<h1><center>Implementation</center></h1>

<h2>Setup NeuralNetwork</h2>


In [None]:
class AlmondClassificationNN(nn.Module):
    def __init__(
            self,
            seed: int,
            in_features: int,
            out_features: int,
            hidden_layers=None
    ):
        super(AlmondClassificationNN, self).__init__()
        self.seed = seed
        self._seed_model()

        if hidden_layers is None:
            hidden_layers = [(10, 10)]

        self.input = nn.Linear(in_features, hidden_layers[0][0])
        self.input_activation = nn.ReLU()

        self.hidden_layers = nn.ModuleList()
        self.num_of_hidden_layers = len(hidden_layers)
        for index in range(0, self.num_of_hidden_layers):
            in_neurons, out_neurons = hidden_layers[index]
            self.hidden_layers.append(nn.Linear(in_neurons, out_neurons))
            self.hidden_layers.append(nn.ReLU())

        self.output = nn.Linear(hidden_layers[-1][1], out_features)
        self.output_activation = F.softmax

        self._initialize_weights()

    def _initialize_weights(self):
        for layer in self.modules():
            if isinstance(layer, nn.Linear):
                nn.init.kaiming_uniform_(layer.weight, nonlinearity='relu')
                if layer.bias is not None:
                    nn.init.zeros_(layer.bias)

    def _seed_model(self):
        torch.manual_seed(self.seed)
        torch.cuda.manual_seed(self.seed)

        if torch.cuda.is_available():
            torch.cuda.manual_seed_all(self.seed)
            torch.backends.cudnn.deterministic = True
            torch.backends.cudnn.benchmark = False

    def forward(self, features):
        features = self.input_activation(self.input(features))

        for hidden_layer in self.hidden_layers:
            features = hidden_layer(features)

        return self.output_activation(self.output(features), dim=-1)

<h2>Setup neural network and training algorithms / optimizers</h2>

In [174]:
from sklearn.metrics import accuracy_score
import itertools
from sklearn.model_selection import KFold
from torch.utils.data import Subset, DataLoader


def grid_search(
        seed: int,
        num_of_splits: int,
        param_grid: dict,
        num_of_epoches: int, 
        optimizer_name: str, 
        optimizer_momentum: float
):
    keys, values = zip(*param_grid.items())
    param_combinations = [dict(zip(keys, v)) for v in itertools.product(*values)]

    for param_combination in param_combinations:
        learning_rate = param_combination['learning_rate']
        hidden_layers: List[Tuple[int, int]] = param_combination['hidden_layers']

        kf: KFold = KFold(n_splits=num_of_splits, shuffle=True, random_state=seed)
        fold_accuracies = []
        fold_losses = []

        for fold, (train_index, validation_index) in enumerate(kf.split(dataset)):
            train_subset = Subset(dataset, train_index)
            validation_subset = Subset(dataset, validation_index)

            train_loader = DataLoader(train_subset, batch_size=12, shuffle=True)
            validation_loader = DataLoader(validation_subset, batch_size=12)
            
            neuralnetwork = AlmondClassificationNN(
                seed=seed,
                in_features=num_of_features,
                out_features=num_of_classes,
                hidden_layers=hidden_layers,
            )

            criterion = nn.CrossEntropyLoss()

            optimizers_dict = {
                'sgd': optim.SGD(neuralnetwork.parameters(), lr=learning_rate, momentum=optimizer_momentum),
                'adam': optim.Adam(neuralnetwork.parameters(), lr=learning_rate),
                'rmsprop': optim.RMSprop(neuralnetwork.parameters(), lr=learning_rate),
                'rprop': optim.Rprop(neuralnetwork.parameters(), lr=learning_rate)
            }
            optimizer = optimizers_dict.get(optimizer_name)

            for epoch in range(num_of_epoches):
                neuralnetwork.train()
                running_loss = 0.0

                for X_train_batch, y_train_batch in train_loader:
                    optimizer.zero_grad()
                    y_batch_prediction = neuralnetwork(X_train_batch)

                    loss = criterion(y_batch_prediction, y_train_batch.long())
                    loss.backward()

                    optimizer.step()
                    running_loss += loss.item()

                print(f'Epoch [{epoch + 1}/{num_of_epoches}], Training Loss: {running_loss / len(train_loader):.4f}')
                
            with torch.no_grad():
                neuralnetwork.eval()
                
                val_preds = []
                val_true = []
                running_val_loss = 0.0
                with torch.no_grad():
                    for X_validation_batch, y_validation_batch in validation_loader:
                        y_validation_prediction = neuralnetwork(X_validation_batch)
                        loss = criterion(y_validation_prediction, y_validation_batch.long())
                        running_val_loss += loss.item()
    
                        _, predicted = torch.max(y_validation_prediction, 1)
                        val_preds.extend(predicted.numpy())
                        val_true.extend(y_validation_batch.numpy())

                    avg_val_loss = running_val_loss / len(validation_loader)
                    accuracy = accuracy_score(val_true, val_preds)
                    fold_accuracies.append(accuracy)
                    fold_losses.append(avg_val_loss)
            
            print(f'Validation Loss: {avg_val_loss:.4f}, Validation Accuracy: {accuracy:.4f}')
        avg_accuracy = sum(fold_accuracies) / len(fold_accuracies)
        avg_loss = sum(fold_losses) / len(fold_losses)
        print(f'  Average Loss: {avg_loss:.4f}, Average Accuracy: {avg_accuracy:.4f} for parameters {param_combination}')


In [175]:
grid_search(
    seed=999,
    num_of_splits=2,
    param_grid={
        'learning_rate': [0.01, 0.003, 0.005, 0.001],
        'hidden_layers': [[(512, 256), (256, 128), (128, 64), (64,32), (32, 16)]]
    },
    num_of_epoches=100,
    optimizer_name='sgd',
    optimizer_momentum=0.5
)

Epoch [1/100], Training Loss: 0.9933
Epoch [2/100], Training Loss: 0.9296
Epoch [3/100], Training Loss: 0.9030
Epoch [4/100], Training Loss: 0.8841
Epoch [5/100], Training Loss: 0.8656
Epoch [6/100], Training Loss: 0.8477
Epoch [7/100], Training Loss: 0.8388
Epoch [8/100], Training Loss: 0.8296
Epoch [9/100], Training Loss: 0.8167
Epoch [10/100], Training Loss: 0.8083
Epoch [11/100], Training Loss: 0.8125
Epoch [12/100], Training Loss: 0.8062
Epoch [13/100], Training Loss: 0.7890
Epoch [14/100], Training Loss: 0.7905
Epoch [15/100], Training Loss: 0.7964
Epoch [16/100], Training Loss: 0.7854
Epoch [17/100], Training Loss: 0.7799
Epoch [18/100], Training Loss: 0.7769
Epoch [19/100], Training Loss: 0.7607
Epoch [20/100], Training Loss: 0.7637
Epoch [21/100], Training Loss: 0.7731
Epoch [22/100], Training Loss: 0.7633
Epoch [23/100], Training Loss: 0.7728
Epoch [24/100], Training Loss: 0.7522
Epoch [25/100], Training Loss: 0.7481
Epoch [26/100], Training Loss: 0.7449
Epoch [27/100], Train