In [1]:
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor, Lambda, Compose
import matplotlib.pyplot as plt
import numpy as np
import SimpleITK as sitk
import os
from torchvision.io import read_image
from PIL import Image
import pandas as pd
from torch import reshape

from torch.autograd import Variable
from torch.nn import Linear, ReLU, CrossEntropyLoss, Sequential, Conv2d, MaxPool2d, Module, Softmax, BatchNorm2d, Dropout, Conv3d
from torch.optim import Adam, SGD

In [None]:
idx = 0
transform = ToTensor()
croppath = "Mphys project/cropped niftys"

class CustomImageDataset():
    def __init__(self, croppath, transform=None, target_transfrom=None):
        #self.img_labels = pd.read_csv("/mnt/c/Users/Patrick/Documents/NSCLC Radiomics Lung1.clinical-version3-Oct 2019(1).csv")
        #needs new path
        self.img_labels = pd.read_csv("Mphys project/cancerdata.csv")
        #print(self.img_labels)
        self.img_dir = croppath
        self.transform=transform
        #print(self.transform)
        self.target_transform=target_transfrom

    def __len__ (self):
        return len(self.img_labels)

    def __getitem__ (self, idx):
        img_path = os.path.join(self.img_dir, f"{self.img_labels.iloc[idx, 0]}-GTV-1.nii")
        image = sitk.ReadImage(img_path)
        array = sitk.GetArrayFromImage(image)
        #tensor = torch.from_numpy(array)
        time_to_death = self.img_labels.iloc[idx,8]
        dead_status = self.img_labels.iloc[idx,9]
        if time_to_death < 1.5*365 and dead_status == 1:
            label=1
        else:
            label=0
        
        return(array, label)
dataset = CustomImageDataset(croppath, ToTensor(), None)
#print(len(dataset))

trainset, valset, testset = torch.utils.data.random_split(dataset, [40,8,8])

In [None]:
train_dataloader = DataLoader(trainset, batch_size=4, shuffle=True)
train_features = next(iter(train_dataloader))
test_dataloader = DataLoader(testset, batch_size=4, shuffle=True)
test_features = next(iter(train_dataloader))
val_dataloader = DataLoader(valset, batch_size=4, shuffle=True)
val_features = next(iter(val_dataloader))

In [None]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f'Using {device} device')

In [None]:
class Net(Module):   
    def __init__(self):
        super(Net, self).__init__()
  
        self.cnn_layers = Sequential(
            # Defining a 2D convolution layer
            Conv3d(1, 4, 3, 1, 1),
            nn.BatchNorm3d(4),#normalises batch
            ReLU(inplace=True),#applies a ReLu to the neurons
            nn.MaxPool3d(kernel_size=2, stride=2),#finds max pool of feature map
            # Defining another 2D convolution layer
            Conv3d(4, 8, 3, 1, 1),#64 neurons per layer
            nn.BatchNorm3d(8),
            ReLU(inplace=True),
            nn.MaxPool3d(kernel_size=2, stride=2),
        )
        
        self.linear_layers = Sequential(
            Linear(2299968, 2)
        )

    # Defining the forward pass    
    def forward(self, x):
        x = self.cnn_layers(x)#calls the constructor to execute the convolutions, passes the tensor x and gets the 
        #result of the convolution passed back.
        x = x.view(x.size(0), -1)
        x = self.linear_layers(x)
        return x

model = Net().to(device)
print(model)

In [None]:
def train_loop(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    for batch, (X, y) in enumerate(dataloader):
        # Compute prediction and loss
        X = reshape(X, (X.shape[0],1,264,264,264))
        X = X.float()
        X = X.to(device)
        y = y.to(device)
        pred = model(X)
        loss = loss_fn(pred, y)
        
        

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 1 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")
        loss_temp.append(loss)
        

def test_loop(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0

    with torch.no_grad():
        for X, y in dataloader:
            X = reshape(X, (X.shape[0],1,264,264,264))
            X = X.float()
            X = X.to(device)
            y = y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()

    test_loss /= num_batches
  
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    val_loss.append(test_loss)

learning_rate = 0.001
# defining the model
model = Net()
# defining the optimizer
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)
# defining the loss function
loss_fn = nn.CrossEntropyLoss()
# checking if GPU is available
model.to(device)
loss_fn.to(device)

In [None]:
epochs = 10
train_loss = []
val_loss = []
for t in range(epochs):
    loss_temp=[]
    print(f"Epoch {t+1}\n-------------------------------")
    train_loop(train_dataloader, model, loss_fn, optimizer)
    test_loop(test_dataloader, model, loss_fn)
    
    train_loss.append(loss_temp.pop())
    print(train_loss, val_loss)
print("Done!")

In [None]:
# plotting the training and validation loss
plt.plot(train_loss, label='Training loss')
plt.plot(val_loss, label='Validation loss')
plt.legend()
plt.show()