In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset

# Original Dataset

In [2]:
# Load the data
df = pd.read_csv('../../data/processed/heart_se.csv')

In [3]:
# Split the data into features and target variable
X = df.drop(['target','index'], axis=1)
y = df['target']

# Split the dataset into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [4]:
# Convert to PyTorch tensors
X_train_tensor = torch.tensor(X_train.values, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train.values, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test.values, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test.values, dtype=torch.float32)

# Create DataLoader for training and testing
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

## Automatic Tuning

In [25]:
import optuna
# Define the neural network model inside an objective function for Optuna
def objective(trial):
    # Suggest hyperparameters
    n_layers = trial.suggest_int('n_layers', 1, 10)
    layers = []
    input_size = X_train.shape[1]
    
    for i in range(n_layers):
        num_hidden_units = trial.suggest_int(f'n_units_l{i}', 1, 128)
        layers.append(nn.Linear(input_size, num_hidden_units))
        layers.append(nn.ReLU())
        input_size = num_hidden_units
    
    layers.append(nn.Linear(input_size, 1))
    layers.append(nn.Sigmoid())  # Assuming binary classification
    
    model = nn.Sequential(*layers)
    
    # Suggest optimizer
    optimizer_name = trial.suggest_categorical('optimizer', ['Adam', 'RMSprop', 'SGD', 'Adadelta', 'Adagrad', 'AdamW'])
    lr = trial.suggest_loguniform('lr', 1e-5, 1e-1)
    
    optimizer = getattr(optim, optimizer_name)(model.parameters(), lr=lr)
    
    # Define loss function
    criterion = nn.BCELoss()
    
    # Training loop
    num_epochs = 50
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            outputs = model(X_batch).squeeze()
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
    
    # Validation loop
    model.eval()
    with torch.no_grad():
        correct = 0
        total = 0
        for X_batch, y_batch in test_loader:
            outputs = model(X_batch).squeeze()
            predicted = (outputs > 0.5).float()
            total += y_batch.size(0)
            correct += (predicted == y_batch).sum().item()
    
    accuracy = correct / total
    return accuracy

# Optimize the objective function
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=500)

# Print best trial
print("Best trial:")
trial = study.best_trial

print(f"  Accuracy: {trial.value}")
print("  Best hyperparameters: ")
for key, value in trial.params.items():
    print(f"    {key}: {value}")

[I 2024-08-28 16:48:21,922] A new study created in memory with name: no-name-b5094171-0b2e-4732-b24e-4d9d57a02158

suggest_loguniform has been deprecated in v3.0.0. This feature will be removed in v6.0.0. See https://github.com/optuna/optuna/releases/tag/v3.0.0. Use suggest_float(..., log=True) instead.

[I 2024-08-28 16:48:22,099] Trial 0 finished with value: 0.8524590163934426 and parameters: {'n_layers': 5, 'n_units_l0': 31, 'n_units_l1': 11, 'n_units_l2': 17, 'n_units_l3': 81, 'n_units_l4': 67, 'optimizer': 'Adagrad', 'lr': 0.029352889965675578}. Best is trial 0 with value: 0.8524590163934426.
[I 2024-08-28 16:48:22,239] Trial 1 finished with value: 0.8032786885245902 and parameters: {'n_layers': 3, 'n_units_l0': 32, 'n_units_l1': 112, 'n_units_l2': 125, 'optimizer': 'Adagrad', 'lr': 0.00014210764136038644}. Best is trial 0 with value: 0.8524590163934426.
[I 2024-08-28 16:48:22,544] Trial 2 finished with value: 0.47540983606557374 and parameters: {'n_layers': 7, 'n_units_l0': 123, 

Best trial:
  Accuracy: 0.9180327868852459
  Best hyperparameters: 
    n_layers: 4
    n_units_l0: 83
    n_units_l1: 33
    n_units_l2: 18
    n_units_l3: 22
    optimizer: RMSprop
    lr: 7.625737685718206e-05


In [41]:
from optuna.visualization import plot_optimization_history, plot_param_importances, plot_parallel_coordinate, plot_slice

# Plot the optimization history
opt_history = plot_optimization_history(study)
opt_history.show()



In [38]:
import plotly.graph_objects as go
import optuna
from optuna.importance import get_param_importances

# Fetch the parameter importances
param_importances = get_param_importances(study)

# Optionally aggregate or normalize the importance of the `n_units_l{i}` parameters
aggregated_importance = {}
for param, importance in param_importances.items():
    if 'n_units_l' in param:
        if 'n_units' not in aggregated_importance:
            aggregated_importance['n_units'] = []
        aggregated_importance['n_units'].append(importance)
    else:
        aggregated_importance[param] = importance

# Average the importance of the `n_units` hyperparameters
if 'n_units' in aggregated_importance:
    aggregated_importance['n_units'] = sum(aggregated_importance['n_units']) / len(aggregated_importance['n_units'])

# Convert aggregated importance back to a list for plotting
param_importances = sorted(aggregated_importance.items(), key=lambda x: x[1], reverse=True)

# Extract the parameters and their importances for plotting
params, importances = zip(*param_importances)

# Create a horizontal bar chart using Plotly
fig = go.Figure(go.Bar(
    x=importances,
    y=params,
    orientation='h'
))

fig.update_layout(
    title='Hyperparameter Importances',
    xaxis_title='Importance',
    yaxis_title='Hyperparameter',
    yaxis=dict(categoryorder='total ascending')
)

# Show the plot
fig.show()

In [37]:
# Extract the best number of layers
best_n_layers = study.best_trial.params['n_layers']

# Filter the trials to create a new study object with only relevant trials
filtered_trials = [t for t in study.trials if t.params['n_layers'] == best_n_layers]

# Create a new study object with the filtered trials
filtered_study = optuna.create_study(direction='maximize')
for trial in filtered_trials:
    filtered_study.add_trial(trial)

# Generate the parallel coordinate plot for the filtered study
parallel_coordinate = plot_parallel_coordinate(filtered_study)
parallel_coordinate.show()

[I 2024-08-28 16:56:35,884] A new study created in memory with name: no-name-e52fb873-f820-41a0-bad5-36b95240fd89


In [29]:
# Plot the slice plot for each hyperparameter
slice_plot = plot_slice(study)
slice_plot.show()

In [134]:
# confusion matrix and classification report
from sklearn.metrics import classification_report, confusion_matrix
import numpy

# Extract the best trial
best_trial = study.best_trial

# Extract the best hyperparameters
best_n_layers = best_trial.params['n_layers']
best_layers = [best_trial.params[f'n_units_l{i}'] for i in range(best_n_layers)]
best_optimizer = best_trial.params['optimizer']
best_lr = best_trial.params['lr']

# Define the model using the best hyperparameters
layers = []
input_size = X_train.shape[1]
for num_hidden_units in best_layers:
    layers.append(nn.Linear(input_size, num_hidden_units))
    layers.append(nn.ReLU())
    input_size = num_hidden_units
    
layers.append(nn.Linear(input_size, 1))
layers.append(nn.Sigmoid())
model = nn.Sequential(*layers)

# Define the optimizer
optimizer = getattr(optim, best_optimizer)(model.parameters(), lr=best_lr)

# Define the loss function
criterion = nn.BCELoss()

losses_train = []

# Training loop
num_epochs = 50
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        outputs = model(X_batch).squeeze()
        loss = criterion(outputs, y_batch)
        losses_train.append(loss.item())
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

# Test loop
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    y_pred = []
    y_true = []
    for X_batch, y_batch in test_loader:
        outputs = model(X_batch).squeeze()
        predicted = (outputs > 0.5).float()
        y_pred.extend(predicted.numpy())
        y_true.extend(y_batch.numpy())
        total += y_batch.size(0)
        correct += (predicted == y_batch).sum().item()




In [None]:
#plot confusion matrix with seaborn blue
import seaborn as sns
import matplotlib.pyplot as plt

cm = confusion_matrix(y_true, y_pred)
sns.heatmap(cm, annot=True, cmap='Blues', fmt='g')
plt.xlabel('Predicted labels')
plt.ylabel('True labels')
plt.show()

In [50]:
# Print classification report
print(classification_report(y_true, y_pred, target_names=['less', 'more']))


              precision    recall  f1-score   support

        less       0.96      0.79      0.87        29
        more       0.84      0.97      0.90        32

    accuracy                           0.89        61
   macro avg       0.90      0.88      0.88        61
weighted avg       0.90      0.89      0.88        61



# Hand tuning

In [133]:

# Define the model using the best hyperparameters
layers = []
input_size = X_train.shape[1]

layers.append(nn.Linear(input_size, 27*2))
layers.append(nn.ReLU())
layers.append(nn.Linear(27*2, 3))
layers.append(nn.ReLU())
layers.append(nn.Linear(3, 1))
layers.append(nn.Sigmoid())

model = nn.Sequential(*layers)

# Define the optimizer
optimizer = optim.AdamW(model.parameters(), lr=0.0001)

# Define the loss function
criterion = nn.BCELoss()

losses_train = []
losses_test = []
accuracies_train = []
accuracies_test = []

# Training loop
num_epochs = 100
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        outputs = model(X_batch).squeeze()
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        predicted = (outputs > 0.5).float()
        total += y_batch.size(0)
        correct += (predicted == y_batch).sum().item()
    
    losses_train.append(running_loss / len(train_loader))
    accuracies_train.append(correct / total)
    
    # Validation loop
    model.eval()
    with torch.no_grad():
        running_loss = 0.0
        correct = 0
        total = 0
        for X_batch, y_batch in test_loader:
            outputs = model(X_batch).squeeze()
            loss = criterion(outputs, y_batch)
            running_loss += loss.item()
            predicted = (outputs > 0.5).float()
            total += y_batch.size(0)
            correct += (predicted == y_batch).sum().item()
        
        losses_test.append(running_loss / len(test_loader))
        accuracies_test.append(correct / total)
    
    if accuracies_test[-1] > 0.90:
        break

#plot training and validation loss and accuracy with plotly
import plotly.graph_objects as go


fig = go.Figure()

fig.add_trace(go.Scatter(x=list(range(num_epochs)), y=accuracies_train, mode='lines', name='Training Accuracy'))
fig.add_trace(go.Scatter(x=list(range(num_epochs)), y=accuracies_test, mode='lines', name='Validation Accuracy'))

fig.update_layout(
    title='Accuracy Curves',
    xaxis_title='Epoch',
    yaxis_title='Accuracy',
)

fig.show()

fig = go.Figure()

fig.add_trace(go.Scatter(x=list(range(num_epochs)), y=losses_train, mode='lines', name='Training Loss'))
fig.add_trace(go.Scatter(x=list(range(num_epochs)), y=losses_test, mode='lines', name='Validation Loss'))

fig.update_layout(
    title='Loss Curves',
    xaxis_title='Epoch',
    yaxis_title='Loss',
)

fig.show()




