# Exploration

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# Load the patients table (the file uses ',' as decimal separator) and pin the
# dtype of every numeric column in a single cast.
patient_column_dtypes = {
    'Sedentary_hours_daily': float,
    'Regular_fiber_diet': float,
    'Age': float,
    'Est_avg_calorie_intake': int,
    'Main_meals_daily': int,
    'Height': float,
    'Water_daily': int,
    'Weight': float,
    'Physical_activity_level': int,
    'Technology_time_use': int,
}
patients_data = pd.read_csv('data.csv', decimal=',').astype(patient_column_dtypes)

In [None]:
# Read both PTBDB files (no header row) and tag every row with its class:
# 0 for the "normal" file, 1 for the "abnormal" file.
ptbdb_normal = pd.read_csv('ptbdb_normal.csv', header=None).assign(label=0)
ptbdb_abnormal = pd.read_csv('ptbdb_abnormal.csv', header=None).assign(label=1)

# Stack normal rows first, then abnormal rows, under a fresh 0..n-1 index.
ptbdb = pd.concat([ptbdb_normal, ptbdb_abnormal], axis=0, ignore_index=True)

In [None]:
# Number of rows per class label, shown as a bar chart.
label_counts = ptbdb['label'].value_counts()

fig, ax = plt.subplots(figsize=(10, 6))
sns.barplot(x=label_counts.index, y=label_counts, ax=ax)
# Bar positions 0 and 1 are relabelled with the class names.
ax.set_xticks(range(2))
ax.set_xticklabels(['Normal', 'Abnormal'])
ax.set_title('Count')
plt.show()

In [None]:
# Plot the first row of each class (label column excluded) as an example.
# Each plot gets its own explicitly sized figure: once plt.show() has been
# called, the next plt.plot() would otherwise open an implicit figure with the
# default size instead of reusing the (12, 6) one.
normal_example = ptbdb[ptbdb['label'] == 0].iloc[0, :-1]
plt.figure(figsize=(12, 6))
plt.plot(normal_example, label='Normal')
plt.title('Normal series example')
plt.legend()
plt.show()

anormal_example = ptbdb[ptbdb['label'] == 1].iloc[0, :-1]
plt.figure(figsize=(12, 6))
plt.plot(anormal_example, label='Anormal')
plt.title('Anormal series example')
plt.legend()
plt.show()

In [None]:
# Column-wise mean and standard deviation of the series of each class
# (the trailing label column is dropped before aggregating).
signals_normal = ptbdb.loc[ptbdb['label'] == 0].iloc[:, :-1]
signals_anormal = ptbdb.loc[ptbdb['label'] == 1].iloc[:, :-1]

mean_normal, std_normal = signals_normal.mean(), signals_normal.std()
mean_anormal, std_anormal = signals_anormal.mean(), signals_anormal.std()

# One figure per class: the mean curve with a +/- one standard deviation band.
class_summaries = (
    (mean_normal, std_normal, 'Normal mean', 'Normal mean and standard deviation'),
    (mean_anormal, std_anormal, 'Anormal mean', 'Anormal mean and standard deviation'),
)
for class_mean, class_std, curve_label, figure_title in class_summaries:
    plt.figure(figsize=(12, 6))
    plt.plot(class_mean, label=curve_label)
    band_x = range(len(class_mean))
    plt.fill_between(band_x, class_mean - class_std, class_mean + class_std, alpha=0.2)
    plt.title(figure_title)
    plt.legend()
    plt.show()

# Train

In [None]:
from sklearn.preprocessing import LabelEncoder
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
import numpy as np
from scipy.stats import zscore
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import torch

In [None]:
# Column groups of the patients table. These are snapshots of the raw columns;
# all cleaning below is applied to the train/test feature frames.
numeric_attributes = patients_data[['Sedentary_hours_daily', 'Regular_fiber_diet', 'Age', 'Est_avg_calorie_intake', 'Main_meals_daily',\
                      'Height', 'Water_daily', 'Weight', 'Physical_activity_level', 'Technology_time_use']]
categorical_attributes = patients_data[['Transportation', 'Diagnostic_in_family_history',\
                          'High_calorie_diet', 'Alcohol', 'Snacks', 'Smoker', 'Calorie_monitoring', 'Gender']]
numeric_columns = list(numeric_attributes.columns)

# encode categorical attributes
# The encoder is re-fitted for every column, so once this section has run it
# only holds the class mapping of 'Diagnostic' (the last column fitted).
encoder = LabelEncoder()
for column in categorical_attributes.columns:
    patients_data[column] = encoder.fit_transform(patients_data[column])
patients_data['Diagnostic'] = encoder.fit_transform(patients_data['Diagnostic'])

X_patients = patients_data.drop('Diagnostic', axis=1)
Y_patients = patients_data['Diagnostic']

# Split BEFORE estimating any statistic (outlier thresholds, imputer, scaler):
# fitting them on all rows would leak test information into training.
X_train_patients, X_test_patients, Y_train_patients, Y_test_patients = train_test_split(X_patients, Y_patients, test_size=0.2, random_state=42)
X_train_patients = X_train_patients.copy()
X_test_patients = X_test_patients.copy()

# treat missing values, step 1: turn the -1 marker into NaN so that it neither
# distorts the z-scores nor survives as a real value.
# NOTE(review): -1 is taken as the missing-value marker because the imputer was
# previously configured with missing_values=-1 - confirm against the data source.
for features in (X_train_patients, X_test_patients):
    features[numeric_columns] = features[numeric_columns].astype(float).replace(-1, np.nan)

# remove outliers: values more than 3 standard deviations away from the
# training mean become NaN. The mask is written back into the feature frames;
# masking the `numeric_attributes` snapshot would leave the model inputs untouched.
train_mean = X_train_patients[numeric_columns].mean()
train_std = X_train_patients[numeric_columns].std(ddof=0)
z_scores = (X_train_patients[numeric_columns] - train_mean) / train_std
for features in (X_train_patients, X_test_patients):
    split_z_scores = (features[numeric_columns] - train_mean) / train_std
    features[numeric_columns] = features[numeric_columns].mask(split_z_scores.abs() > 3)

# treat missing values, step 2: impute the NaNs from the other numeric columns.
# All numeric columns are passed together; with a single column
# IterativeImputer has nothing to regress on and only applies its initial fill.
imputer = IterativeImputer(max_iter=10)

# standardize data (statistics estimated on the training split only)
scaler = StandardScaler()
X_train_patients[numeric_columns] = scaler.fit_transform(imputer.fit_transform(X_train_patients[numeric_columns]))
X_test_patients[numeric_columns] = scaler.transform(imputer.transform(X_test_patients[numeric_columns]))

X_train_patients = torch.tensor(X_train_patients.values, dtype=torch.float32)
Y_train_patients = torch.tensor(Y_train_patients.values, dtype=torch.long)
X_test_patients = torch.tensor(X_test_patients.values, dtype=torch.float32)
Y_test_patients = torch.tensor(Y_test_patients.values, dtype=torch.long)

In [None]:
# Features are every column but the last; the last column is the class label.
X_ptbdb = ptbdb.iloc[:, :-1].values
Y_ptbdb = ptbdb.iloc[:, -1].values.astype(int)

# Split first so that the scaler statistics come from the training rows only
# (fitting the scaler on all rows would leak test information into training).
X_train_ptbdb, X_test_ptbdb, Y_train_ptbdb, Y_test_ptbdb = train_test_split(X_ptbdb, Y_ptbdb, test_size=0.2, random_state=42)

scaler = StandardScaler()
# The arrays stay NumPy; they are cast to the dtypes the torch models consume
# (float32 inputs, int64 class indices, i.e. torch.float32 / torch.long).
X_train_ptbdb = scaler.fit_transform(X_train_ptbdb).astype(np.float32)
X_test_ptbdb = scaler.transform(X_test_ptbdb).astype(np.float32)
Y_train_ptbdb = Y_train_ptbdb.astype(np.int64)
Y_test_ptbdb = Y_test_ptbdb.astype(np.int64)

In [None]:
import torch.nn as nn
import torch.optim as optim

class MLP(nn.Module):
    """Fully connected classifier: (Linear -> ReLU -> Dropout) per hidden layer, then a Linear head.

    Args:
        input_size: number of input features.
        hidden_layer_sizes: iterable with the width of each hidden layer, in order.
            An empty iterable yields a single Linear layer.
        output_size: number of output logits. Defaults to 7, the value that was
            previously hard-coded; pass the number of classes of the task
            (e.g. ``module__output_size=2`` through skorch for a two-label problem).
        dropout_rate: dropout probability applied after every hidden layer
            (defaults to the previously hard-coded 0.2).

    The forward pass returns raw logits (no softmax is applied).
    """

    def __init__(self, input_size, hidden_layer_sizes, output_size=7, dropout_rate=0.2):
        super(MLP, self).__init__()
        layers = []
        in_features = input_size
        for hidden_size in hidden_layer_sizes:
            layers.append(nn.Linear(in_features, hidden_size))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(dropout_rate))
            # the next layer consumes what this one produces
            in_features = hidden_size
        layers.append(nn.Linear(in_features, output_size))
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        """Return the logits for a batch ``x`` whose last dimension is ``input_size``."""
        return self.network(x)
    
class CNN1D(nn.Module):
    """1-D convolutional classifier with two output logits.

    Architecture: ``num_conv_layers`` x (Conv1d(kernel 5, padding 2) -> ReLU) with
    channel widths 32, 64, 128, ..., global average pooling over the sequence
    axis, then ``num_fc_layers`` fully connected layers (the last one is the
    2-logit head, the others are Linear -> ReLU -> Dropout of width 64).

    Args:
        num_conv_layers: number of convolutional layers (expected >= 1).
        num_fc_layers: number of fully connected layers, head included (>= 1).
        dropout_rate: dropout probability used after each hidden FC layer.
    """

    # Width of every hidden fully connected layer.
    FC_HIDDEN_SIZE = 64

    def __init__(self, num_conv_layers, num_fc_layers, dropout_rate):
        super(CNN1D, self).__init__()
        self.num_conv_layers = num_conv_layers
        self.convs = self._create_conv_layers(num_conv_layers)
        self.pool = nn.AdaptiveAvgPool1d(1)
        self.fcs = self._create_fc_layers(num_fc_layers, dropout_rate)

    def _create_conv_layers(self, num_layers):
        """Build the convolutional stack; channels start at 32 and double per layer."""
        layers = []
        in_channels = 1
        out_channels = 32
        for _ in range(num_layers):
            layers.append(nn.Conv1d(in_channels, out_channels, kernel_size=5, padding=2))
            layers.append(nn.ReLU())
            in_channels = out_channels
            out_channels *= 2
        return nn.Sequential(*layers)

    def _create_fc_layers(self, num_layers, dropout_rate):
        """Build the fully connected stack that maps pooled channels to 2 logits."""
        layers = []
        # channel count produced by the last convolution: 32, 64, 128, ...
        in_features = 32 * 2 ** (self.num_conv_layers - 1)
        for _ in range(num_layers - 1):
            layers.append(nn.Linear(in_features, self.FC_HIDDEN_SIZE))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(dropout_rate))
            # The next layer consumes the width this layer actually outputs.
            # Halving in_features instead only matched by coincidence when
            # in_features was 128 and raised a shape error otherwise.
            in_features = self.FC_HIDDEN_SIZE
        layers.append(nn.Linear(in_features, 2))
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return logits of shape (batch, 2) for an input of shape (batch, length)."""
        x = x.unsqueeze(1)            # add the channel axis: (batch, 1, length)
        x = self.convs(x)
        x = self.pool(x).squeeze(-1)  # average over the sequence: (batch, channels)
        x = self.fcs(x)
        return x

In [None]:
from skorch import NeuralNetClassifier
from skorch.callbacks import Callback

# Global logs: every finished fit appends one list of per-epoch losses here.
train_losses = []
valid_losses = []

class LossCallback(Callback):
    """skorch callback that records the loss curves of each training run.

    Per-epoch train/validation losses are kept on the instance and, when
    training ends, a copy of both curves is appended to the module-level
    ``train_losses`` / ``valid_losses`` lists.
    """

    def __init__(self):
        self.train_losses_ = []
        self.valid_losses_ = []
        self.estimators_ = []

    def initialize(self):
        """Reset the per-fit state.

        skorch calls this whenever the net is (re-)initialized, so a callback
        instance that is reused for a second fit starts from empty curves
        instead of extending the ones of the previous fit.
        """
        self.train_losses_ = []
        self.valid_losses_ = []
        self.estimators_ = []
        return self

    def on_epoch_end(self, net, **kwargs):
        """Store the losses of the epoch that just finished."""
        train_loss = net.history[-1, 'train_loss']
        valid_loss = net.history[-1, 'valid_loss']
        self.train_losses_.append(train_loss)
        self.valid_losses_.append(valid_loss)
        # NOTE: this stores a reference to the live module, not a snapshot.
        self.estimators_.append(net.module_)

    def on_train_end(self, net, **kwargs):
        """Publish the finished curves to the global logs and print the net parameters."""
        # Append copies so later changes to the instance lists cannot alter
        # what has already been logged.
        train_losses.append(list(self.train_losses_))
        valid_losses.append(list(self.valid_losses_))
        print(net.get_params())

# Training settings shared by the three skorch wrappers.
shared_net_settings = dict(
    max_epochs=10,
    batch_size=32,
    lr=0.001,
    optimizer=optim.Adam,
    criterion=nn.CrossEntropyLoss,
)

# One loss-recording callback per model.
mlp_patients_callback = LossCallback()
mlp_ptbdb_callback = LossCallback()
cnn1d_ptbdb_callback = LossCallback()

mlp_patients_model = NeuralNetClassifier(
    module=MLP,
    module__input_size=X_patients.shape[1],
    callbacks=[mlp_patients_callback],
    **shared_net_settings,
)
mlp_ptbdb_model = NeuralNetClassifier(
    module=MLP,
    module__input_size=X_ptbdb.shape[1],
    callbacks=[mlp_ptbdb_callback],
    **shared_net_settings,
)
cnn1d_ptbdb_model = NeuralNetClassifier(
    module=CNN1D,
    callbacks=[cnn1d_ptbdb_callback],
    **shared_net_settings,
)

# Search space per algorithm: the estimator to tune and its hyperparameter grid.
algorithms = {
    'MLP Patients': {
        'model': mlp_patients_model,
        'params': {'module__hidden_layer_sizes': [(256, 128), (128, 64), (64, 32)]},
    },
    'MLP Ptbdb': {
        'model': mlp_ptbdb_model,
        'params': {'module__hidden_layer_sizes': [(256, 128), (128, 64), (64, 32)]},
    },
    'CNN1D Ptbdb': {
        'model': cnn1d_ptbdb_model,
        'params': {
            'module__num_conv_layers': [2, 3],
            'module__num_fc_layers': [1, 2],
            'module__dropout_rate': [0.3, 0.5],
        },
    },
}

In [None]:
from sklearn.model_selection import GridSearchCV

# Fitted searches, keyed by algorithm name.
results = {}

def grid_search(algorithm, X_train, Y_train):
    """Run a 3-fold grid search for ``algorithm`` and store it in ``results``.

    The estimator and its grid are looked up in ``algorithms``. Four metrics
    are scored and the best candidate by accuracy is refitted on all of
    ``X_train`` / ``Y_train``. Nothing is returned.
    """
    entry = algorithms[algorithm]
    metrics = ['accuracy', 'precision_macro', 'recall_macro', 'f1_macro']
    search = GridSearchCV(
        entry['model'],
        entry['params'],
        scoring=metrics,
        refit='accuracy',
        cv=3,
        return_train_score=True,
        n_jobs=1,
    )
    search.fit(X_train, Y_train)
    results[algorithm] = search

# (banner, algorithm key, training features, training targets) for each search,
# in execution order.
search_runs = (
    ('Grid search MLP Patients', 'MLP Patients', X_train_patients, Y_train_patients),
    ('Grid search MLP PTBDB', 'MLP Ptbdb', X_train_ptbdb, Y_train_ptbdb),
    ('Grid search CNN1D PTBDB', 'CNN1D Ptbdb', X_train_ptbdb, Y_train_ptbdb),
)
for banner, algorithm_key, train_features, train_targets in search_runs:
    print(banner)
    grid_search(algorithm_key, train_features, train_targets)

In [None]:
import warnings

# Average the recorded loss curves over the CV folds, giving one train curve
# and one validation curve per hyperparameter candidate.
#
# Blindly grouping the log three by three is not safe: every search also logs
# one extra curve for its final refit, and a fit that raised logs none, so the
# groups drift and start mixing candidates (and algorithms). The layout of the
# log is instead rebuilt from each fitted search, relying on the searches
# running sequentially: candidate by candidate, fold by fold, then the refit.
train_losses_cp = train_losses.copy()
valid_losses_cp = valid_losses.copy()


def _split_has_curve(search, candidate, split):
    """Return True if the fit of (candidate, split) is taken to have logged a curve.

    A fit that failed gets NaN for every test metric of that split.
    """
    prefix = f'split{split}_test_'
    scores = [values[candidate] for key, values in search.cv_results_.items() if key.startswith(prefix)]
    return not (len(scores) and np.all(np.isnan(scores)))


def _mean_curve(curves):
    """Epoch-wise mean of the given curves; an empty array if there are none."""
    return np.mean(curves, axis=0) if len(curves) else np.array([])


adjusted_losses = []
adjusted_test_losses = []
cursor = 0
for search in results.values():
    for candidate in range(len(search.cv_results_['params'])):
        n_curves = sum(_split_has_curve(search, candidate, split) for split in range(search.n_splits_))
        # Candidates without any finished fit keep an (empty) slot so that the
        # position in the list still identifies the candidate.
        adjusted_losses.append(_mean_curve(train_losses_cp[cursor:cursor + n_curves]))
        adjusted_test_losses.append(_mean_curve(valid_losses_cp[cursor:cursor + n_curves]))
        cursor += n_curves
    if search.refit:
        cursor += 1  # skip the curve logged by the refit of the best candidate

if cursor != len(train_losses_cp):
    warnings.warn(
        f'Expected {cursor} logged loss curves but found {len(train_losses_cp)}; '
        'the averaged curves may be misaligned (was a search re-run without resetting the logs?)'
    )

print(len(adjusted_losses))
print(len(adjusted_test_losses))

In [None]:
from sklearn.metrics import confusion_matrix

# Test split used to evaluate each algorithm, in the order the searches ran.
test_sets = [(X_test_patients, Y_test_patients), (X_test_ptbdb, Y_test_ptbdb), (X_test_ptbdb, Y_test_ptbdb)]

# Range of averaged loss curves belonging to each algorithm, derived from the
# number of candidates of its grid instead of hard-coded offsets.
# NOTE(review): assumes adjusted_losses holds one entry per candidate, in
# search order - confirm against the cell that builds it.
curve_ranges = []
range_start = 0
for search in results.values():
    range_end = range_start + len(search.cv_results_['params'])
    curve_ranges.append((range_start, range_end))
    range_start = range_end

# Never index past the curves that were actually computed.
n_available_curves = min(len(adjusted_losses), len(adjusted_test_losses))

for (algorithm, result), (s, e), (X_test, Y_test) in zip(results.items(), curve_ranges, test_sets):
    # Summary of the search.
    print()
    print(f'Algorithm: {algorithm}')
    print(f'Best hyperparameters: {result.best_params_}')
    print(f'Best score: {result.best_score_}')

    # Cross-validated metrics per candidate, best accuracy first.
    cv_results = pd.DataFrame(result.cv_results_)
    res = cv_results[['params', 'mean_test_accuracy', 'std_test_accuracy', 'mean_test_precision_macro',
                            'std_test_precision_macro', 'mean_test_recall_macro', 'std_test_recall_macro', 'mean_test_f1_macro', 'std_test_f1_macro']]
    res = res.sort_values(by='mean_test_accuracy', ascending=False)
    res = res.set_index('params')
    res.columns = ['Accuracy', 'Accuracy std', 'Precision', 'Precision std', 'Recall', 'Recall std', 'F1', 'F1 std']
    res.index = res.index.map(lambda x: {k: v for k, v in x.items()})
    print(res.to_markdown())

    # Fold-averaged loss curves of every candidate of this algorithm.
    plt.figure(figsize=(10, 6))
    for i in range(s, min(e, n_available_curves)):
        trlosses = adjusted_losses[i]
        tlosses = adjusted_test_losses[i]
        plt.plot(np.arange(len(trlosses)), trlosses, label=f'Train Loss Variation {i+1} - {algorithm}')
        plt.plot(np.arange(len(tlosses)), tlosses, label=f'Test Loss Variation {i+1} - {algorithm}')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title(f'Loss Curves for {algorithm}')
    plt.legend()
    plt.show()

    # Confusion matrix of the refitted best candidate on the held-out test split.
    best_model = result.best_estimator_
    Y_pred = best_model.predict(X_test)
    conf_matrix = confusion_matrix(Y_test, Y_pred)
    plt.figure(figsize=(10, 6))
    sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues')
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title(f'Confusion Matrix for {algorithm}')
    plt.show()