In [1]:
import os
import numpy as np
import pandas as pd
import nibabel as nib
import matplotlib.pyplot as plt
from datetime import datetime
from copy import deepcopy

import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import f1_score, balanced_accuracy_score

from train_model import train_model
from validate import validate

# Single source of truth for the RNG seed.
# The cell used to declare `seed = 42` but seeded torch with a hard-coded 1337,
# so the declared value was never in effect. The value that was actually used
# (1337) is kept here so that earlier runs remain reproducible.
seed = 1337
torch.manual_seed(seed)

# Training hyper-parameters.
BATCH_SIZE = 5  # must be changed before a "large" training run
N_EPOCHS = 5
LEARNING_RATE = 0.00001
DROPOUT_RATE = 0.5
NUM_CLASSES = 2

# Use the GPU when torch can see one, otherwise fall back to the CPU.
device = ('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cuda


In [2]:
class COBRE_dataset_subj(Dataset):
    """Dataset of COBRE subjects; each item is one subject's whole image plus its label.

    ``data_path`` must contain one sub-folder per subject. For a folder named
    ``<subject>`` two files are read when the item is accessed:

    - ``<subject>/nourest.nii``         image, loaded with nibabel
    - ``<subject>/<subject>_data.csv``  table whose first row holds ``subject_type``

    Parameters:
    - data_path (str): Path to the folder containing the subject folders

    Returns (per item):
    - Tensor: float32 torch tensor with the image, axes 0 and 3 swapped
      (presumably this moves the time axis first -- confirm against the data)
    - int: label (0 = 'Control', 1 = 'Patient')

    Raises:
    - ValueError: if ``subject_type`` is neither 'Patient' nor 'Control'
    """

    # Maps the ``subject_type`` text found in the CSV to the integer class label.
    LABELS = {'Control': 0, 'Patient': 1}

    def __init__(self, data_path):
        self.data_path = data_path
        # List the folder once and sort the result: os.listdir() returns the
        # entries in arbitrary order, so re-listing on every access gives no
        # guarantee of a stable index -> subject mapping. Plain files are
        # skipped because every subject is expected to be a folder.
        self.subjects = sorted(
            entry for entry in os.listdir(data_path)
            if os.path.isdir(os.path.join(data_path, entry))
        )
        self.num_subjects = len(self.subjects)

    def __len__(self):
        return self.num_subjects

    def __getitem__(self, index):
        subject = self.subjects[index]
        subject_dir = os.path.join(self.data_path, subject)

        ### LABEL
        # Resolved before the image: it is cheap, and a malformed label then
        # fails loudly before the image file is read.
        info = pd.read_csv(os.path.join(subject_dir, subject + '_data.csv'))
        subject_type = info.iloc[0]['subject_type']
        if subject_type not in self.LABELS:
            raise ValueError(
                'Something wrong with data label: %r (subject %s)' % (subject_type, subject)
            )
        label = self.LABELS[subject_type]

        ### IMAGE
        img = nib.load(os.path.join(subject_dir, 'nourest.nii')).get_fdata()
        img = np.swapaxes(img, 0, 3)  # exchange the first and the last axis
        img = torch.from_numpy(img).to(torch.float)

        # TODO: covariates are not returned yet. The planned encoding, read
        # from the first CSV row, was:
        #   age        -> raw value (optionally scaled as (age - 18) / (65 - 18))
        #   gender     -> 'Female' = 1, 'Male' = 0
        #   handedness -> 'Right' = [1, 0], 'Left' = [0, 1], 'Both' = [1, 1]
        # concatenated as torch.Tensor([age, gender, *handedness]) and returned
        # as a third element.

        return img, label #, covariates

In [3]:
# Root folder holding the `train` and `val` splits. Set the COBRE_DATA_DIR
# environment variable to run the notebook on another machine; the default is
# the path that used to be hard-coded here.
DATA_DIR = os.environ.get(
    'COBRE_DATA_DIR',
    'C:/Users/oscar/OneDrive - University of Bergen/Documents/Master/vsc/COBRE_learning/data',
)
train_path = DATA_DIR + '/train'
val_path = DATA_DIR + '/val'

train_data = COBRE_dataset_subj(train_path)
val_data = COBRE_dataset_subj(val_path)

# Training batches are reshuffled by the loader; validation yields one subject
# at a time, in dataset order.
trainloader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True) #, pin_memory=True, pin_memory_device=device)
valloader = DataLoader(val_data, batch_size=1, shuffle=False) #, pin_memory=True, pin_memory_device=device)

In [None]:
# class Net(nn.Module):
#     def __init__(self):
#         super().__init__()
#         self.conv1 = nn.Conv3d(150, 300, 5) #, padding=5)
#         self.pool = nn.MaxPool3d(2, 2)
#         self.conv2 = nn.Conv3d(300, 16, 5)
#         #self.fc1 = nn.LazyLinear(120)
#         self.fc1 = nn.Linear(16 * 2 * 2, 120)
#         self.fc2 = nn.Linear(120, 84)
#         self.fc3 = nn.Linear(84, 17)

#     def forward(self, x):
#         x = self.pool(F.relu(self.conv1(x)))
#         x = self.pool(F.relu(self.conv2(x)))
#         x = torch.flatten(x, 1) # flatten all dimensions except batch
#         x = F.relu(self.fc1(x))
#         x = F.relu(self.fc2(x))
#         x = self.fc3(x)
#         return x


# net = Net()
# net = net.to(device)
# criterion = nn.CrossEntropyLoss().to(device)
# optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

In [None]:
# ## Alternative from ChatGPT

# class SimpleCNN(nn.Module):
#     def __init__(self, num_classes):
#         super(SimpleCNN, self).__init__()
        
#         # Convolutional layers
#         self.conv1 = nn.Conv3d(in_channels=150, out_channels=50, kernel_size=(3, 3, 3), padding=1)
#         self.conv2 = nn.Conv3d(in_channels=50, out_channels=100, kernel_size=(3, 3, 3), padding=1)
#         self.conv3 = nn.Conv3d(in_channels=100, out_channels=150, kernel_size=(3, 3, 3), padding=1)
        
#         # Max pooling layer
#         self.pool = nn.MaxPool3d(kernel_size=(2, 2, 2), stride=2)
        
#         # Fully connected layers
#         self.fc1 = nn.Linear(150 * 891, 512)  # Adjust the input size based on your spatial dimensions, 9 * 11 * 9
#         self.fc2 = nn.Linear(512, num_classes)
        
#         # Dropout layer to reduce overfitting
#         self.dropout = nn.Dropout(0.5)

#     def forward(self, x):
#         x = self.pool(F.relu(self.conv1(x)))
#         x = self.pool(F.relu(self.conv2(x)))
#         x = self.pool(F.relu(self.conv3(x)))
        
#         # Flatten the tensor before fully connected layers
#         x = x.view(-1, 150 * 891)  # Adjust the size based on your spatial dimensions
        
#         x = F.relu(self.fc1(x))
#         x = self.dropout(x)
#         x = self.fc2(x)
        
#         return x

# # Instantiate the model
# num_classes = 2  # Adjust based on the number of classes in your classification task
# net = SimpleCNN(num_classes)

# # Define loss function and optimizer
# criterion = nn.CrossEntropyLoss()
# optimizer = optim.Adam(net.parameters(), lr=0.0001)

In [None]:
# ### inspired by U-Net

# class UNET_inspired(nn.Module):
#     def __init__(self, num_classes, in_channels):
#         super(UNET_inspired, self).__init__()
        
#         # Convolutional layers
#         self.conv1 = (Conv_layer(in_channels, in_channels*2))
#         self.down1 = (Down(in_channels*2, in_channels*4))
#         self.down2 = (Down(in_channels*4, in_channels*8))
#         self.down3 = (Down(in_channels*8, in_channels*16))
#         self.pool = (nn.MaxPool3d(kernel_size=(2,2,2), stride=2))
#         self.fc1 = (nn.LazyLinear(num_classes))
        
#         # Dropout layer to reduce overfitting
#         self.dropout = nn.Dropout(0.5)

        
#     def forward(self, x):
#         x = self.conv1(x)
#         x = self.down1(x)
#         x = self.down2(x)
#         x = self.down3(x)
#         x = self.pool(x)
#         x = x.view(-1, 2400 * 80)
#         x = self.fc1(x)
#         return x


# class Conv_layer(nn.Module):
#     def __init__(self, in_channels, out_channels):
#         super().__init__()
#         self.conv = nn.Sequential(
#             nn.Conv3d(in_channels=in_channels, out_channels=out_channels, kernel_size=(3, 3, 3), padding=1),
#             nn.BatchNorm3d(out_channels),
#             nn.ReLU(inplace=True),
#             nn.Conv3d(in_channels=out_channels, out_channels=out_channels, kernel_size=(3, 3, 3), padding=1),
#             nn.BatchNorm3d(out_channels),
#             nn.ReLU(inplace=True)
#         )

#     def forward(self, x):
#         return self.conv(x)
        
# class Down(nn.Module):
#     def __init__(self, in_channels, out_channels):
#         super().__init__()
#         self.pool_conv = nn.Sequential(
#             nn.MaxPool3d(kernel_size=(2, 2, 2)),
#             Conv_layer(in_channels, out_channels)
#         )

#     def forward(self, x):
#         return self.pool_conv(x)


# # Instantiate the model
# num_classes = 2  # Adjust based on the number of classes in your classification task
# unet = UNET_inspired(num_classes, 150)

# # Define loss function and optimizer
# criterion = nn.CrossEntropyLoss()
# optimizer = optim.Adam(unet.parameters(), lr=0.001) # change this to bring the loss down?

In [None]:
# ### SAVED, 77.777777
# ### inspired by U-Net, with Mari's help
# ### could take inspiration from this one: https://doi.org/gjj4xv

# class UNET_Mari(nn.Module):
#     def __init__(self, num_classes, in_channels):
#         super(UNET_Mari, self).__init__()
        
#         # Convolutional layers
#         self.conv1 = (nn.Conv3d(in_channels, 10, kernel_size=(3, 3, 3), padding=1))
#         self.conv1_2 = (nn.Conv3d(10, 1, kernel_size=(3, 3, 3), padding=1))
#         self.pool1 = (nn.MaxPool3d(kernel_size=(2,2,2), stride=2))
#         self.dropout = (nn.Dropout(0.5))
#         self.conv2 = (nn.Conv2d(47, 1, kernel_size=(3, 3), padding=1))
#         # also more layers here, perhaps
#         self.pool2 = (nn.MaxPool2d(kernel_size=(2,2), stride=2))
#         self.fc1 = (nn.Linear(19*19, 4*4)) # one functional layer or two?
#         self.fc2 = (nn.Linear(4*4, num_classes))

#     def forward(self, x): # should ReLU be used in every step here?
#         x = F.relu(self.conv1(x))
#         x = F.relu(self.conv1_2(x))
#         x = self.dropout(x)
#         x = self.pool1(x)
#         x = torch.squeeze(x, dim=1)
#         x = F.relu(self.conv2(x))
#         x = self.dropout(x)
#         x = self.pool2(x)
#         x = torch.flatten(x, start_dim=1)
#         x = F.relu(self.fc1(x))
#         x = self.fc2(x)
#         return x

# # Instantiate the model
# num_classes = 2  # Adjust based on the number of classes in your classification task
# unet = UNET_Mari(num_classes, 150)

# # Define loss function and optimizer
# criterion = nn.CrossEntropyLoss()
# optimizer = optim.Adam(unet.parameters(), lr=0.0001) # change lr to bring the loss down?

In [5]:
### 77.7777 modified
### inspired by U-Net, with Mari's help
### could take inspiration from this one: https://doi.org/gjj4xv

class UNET_Mari(nn.Module):
    """Small 3D -> 2D convolutional classifier.

    Despite the name there is no decoder and there are no skip connections;
    the network is a plain stack: two 3D convolutions that collapse the input
    channels to a single volume, a 3D max-pool, one 2D convolution that treats
    the first spatial axis of that volume as channels, a 2D max-pool and two
    fully connected layers.

    Expects a batched input of shape ``(batch, in_channels, D, H, W)`` and
    returns raw class scores of shape ``(batch, num_classes)``.

    Parameters:
    - num_classes (int): number of output scores
    - dropout_rate (float): probability used by the shared dropout layer
    - in_channels (int): channels of the input volume (default 150)
    - pooled_depth (int): size of the first spatial axis after the 3D pool,
      i.e. ``D // 2``; it becomes the channel count of the 2D convolution
      (default 47)
    - flat_features (int): number of values left after the 2D pool,
      i.e. ``(H // 4) * (W // 4)`` (default 19*19)

    The three defaults reproduce the sizes that used to be hard-coded, so
    ``UNET_Mari(num_classes, dropout_rate)`` builds exactly the same network
    as before.
    """
    def __init__(self, num_classes, dropout_rate, in_channels=150, pooled_depth=47, flat_features=19*19):
        super().__init__()

        # Layers are created in the original order and under the original
        # attribute names: this keeps seeded weight initialisation and the
        # state-dict keys of previously saved parameters unchanged.
        self.conv1 = nn.Conv3d(in_channels, 10, kernel_size=(3, 3, 3), padding=1)
        self.conv1_2 = nn.Conv3d(10, 1, kernel_size=(3, 3, 3), padding=1)
        self.pool1 = nn.MaxPool3d(kernel_size=(2, 2, 2), stride=2)
        self.dropout = nn.Dropout(dropout_rate)
        self.conv2 = nn.Conv2d(pooled_depth, 1, kernel_size=(3, 3), padding=1)
        # also more layers here, perhaps
        self.pool2 = nn.MaxPool2d(kernel_size=(2, 2), stride=2)
        self.fc1 = nn.Linear(flat_features, 4*4) # one functional layer or two?
        self.fc2 = nn.Linear(4*4, num_classes)

    def forward(self, x): # should ReLU be used in every step here?
        # 3D stage: padding=1 with a 3x3x3 kernel keeps D, H, W; channels go
        # in_channels -> 10 -> 1.
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv1_2(x))
        x = self.dropout(x)
        x = self.pool1(x)               # halves D, H and W
        # Drop the single remaining channel so the first spatial axis takes
        # the channel position expected by the 2D convolution.
        x = torch.squeeze(x, dim=1)
        # 2D stage: pooled_depth -> 1 channel, then halve H and W once more.
        x = F.relu(self.conv2(x))
        x = self.dropout(x)
        x = self.pool2(x)
        # Classifier head on the flattened feature map.
        x = torch.flatten(x, start_dim=1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# Build the network from the notebook-level hyper-parameters.
unet = UNET_Mari(num_classes=NUM_CLASSES, dropout_rate=DROPOUT_RATE)
# criterion = nn.CrossEntropyLoss()
# optimizer = optim.Adam(unet.parameters(), lr=0.00001) # change lr to bring the loss down?

In [None]:
# Train `unet` with the project-local `train_model` helper. Its result is
# unpacked into the parameters later passed to load_state_dict and the two
# loss histories that are plotted in the last cell.
training_result = train_model(
    unet,
    device,
    trainloader,
    valloader,
    N_EPOCHS,
    learning_rate=LEARNING_RATE,
)
best_params, losses_val, losses_train = training_result

In [None]:
# Fresh network of the same architecture, filled with the parameters that
# training returned.
best_model = UNET_Mari(num_classes=NUM_CLASSES, dropout_rate=DROPOUT_RATE)
# best_model.load_state_dict(torch.load('models/best_model_params_nou.pt'))
best_model.load_state_dict(best_params)
# NOTE(review): the model is not switched to eval() here -- confirm that
# `validate` does so, otherwise dropout stays active during evaluation.
# Kept as the last expression so the cell still displays the module summary.
best_model.to(device)

In [None]:
# Run the project-local `validate` helper on the validation loader; the two
# returned sequences feed the metric cells further down.
val_outcome = validate(best_model, device, valloader, 'val')
y_true, y_pred = val_outcome

In [None]:
# Same evaluation on the training loader, to compare against the validation
# scores.
train_outcome = validate(best_model, device, trainloader, 'train')
y_true_train, y_pred_train = train_outcome

In [None]:
# Weighted F1 and balanced accuracy (both as percentages) for each split,
# with a blank line between the two reports.
score_inputs = [
    ('Val', y_true, y_pred),
    ('Train', y_true_train, y_pred_train),
]
for position, (split_name, truth, predicted) in enumerate(score_inputs):
    if position > 0:
        print()
    print(split_name)
    print('F1-score:', f1_score(truth, predicted, average="weighted")*100)
    print('Balanced accuracy score:', balanced_accuracy_score(truth, predicted)*100)

In [None]:
# Per-split sanity check of the predictions: the raw label lists, how often
# each class occurs among the true and the predicted labels, the number of
# misclassified samples and the wrong/right breakdown.
#
# 'Missed' used to be abs(count[0] - count[1]) // 2 on the *predicted* class
# counts. That raised IndexError whenever the model predicted a single class
# (np.unique then returns only one count) and never compared predictions with
# the true labels. It now counts the samples where they disagree.
summary_inputs = [
    ('Val', y_true, y_pred),
    ('Train', y_true_train, y_pred_train),
]
for position, (split_name, truths, preds) in enumerate(summary_inputs):
    if position > 0:
        print()
    print(split_name)
    print('True:', truths)
    unique, count = np.unique(truths, return_counts=True)
    print(unique, count)
    print('Pred:', preds)
    unique, count = np.unique(preds, return_counts=True)
    print(unique, count)
    # One flag per sample: does the prediction equal the true label?
    hits = [t == y for t, y in zip(truths, preds)]
    print('Missed:', sum(1 for hit in hits if not hit))
    unique, count = np.unique(hits, return_counts=True)
    print(unique, count)

In [None]:
# Wrapping each loss history in a list gives the tensors a leading axis of
# length one, so the loop below draws exactly one pair of curves.
validation_losses = torch.Tensor([losses_val], device='cpu')
train_losses = torch.Tensor([losses_train], device='cpu')
colors = ['r', 'b', 'g']

for model_idx, (val_curve, train_curve) in enumerate(zip(validation_losses, train_losses)):
    # Spread the recorded loss values evenly over the interval [0, N_EPOCHS].
    epoch_axis = np.linspace(0, N_EPOCHS, len(val_curve))
    prefix = "model %i " % (model_idx + 1)

    plt.plot(epoch_axis, val_curve, label=prefix + "val loss", color='r')
    plt.plot(epoch_axis, train_curve, label=prefix + "train loss", color='b', linestyle="dashed")

plt.title('Validation  and training loss')
plt.xlabel('epoch')
plt.ylabel('value')
plt.grid(True)
plt.legend()
plt.show()