## Neural Network

This jupyter notebook implements the neural network model using convolutional layers

In [27]:
#Import libraries
from torchvision.io import read_image
from torch.utils.data import DataLoader
import torchvision.transforms as T
import torch 
import torch.nn as nn # basic building block for neural neteorks
import torch.nn.functional as F # import convolution functions like Relu
import torch.optim as optim # optimzer
import os
from sklearn.metrics import recall_score, accuracy_score, f1_score
import numpy as np
from XRayDataset import XRayDataset
from dataloader import train_path, test_path, val_path

In [40]:
class CustomNeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        
        self.LeNet = nn.Sequential(     
            # convolutional layers            
            nn.Sequential(                                            # FIRST LAYER: (INPUT LAYER)
              nn.Conv2d(1, 6, kernel_size=5, stride=1, padding=0),    # CONVOLUTION 
              nn.BatchNorm2d(6),
              nn.ReLU(),
              nn.MaxPool2d(kernel_size=2, stride=2)),                 # POOLING
            nn.Sequential(                                            # SECOND LAYER: HIDDEN LAYER 1
              nn.Conv2d(6, 16, kernel_size=5, stride=1, padding=0),   # CONVOLUTION 
              nn.BatchNorm2d(16),
              nn.ReLU(),
              nn.MaxPool2d(kernel_size=2, stride=2)),                 # POOLING
            # fully connected layers
            nn.Flatten(),
            nn.Linear(16 * 125 * 125, 400),                            # THIRD LAYER: LINEAR YEAR, HIDDEN LAYER 2
            nn.ReLU(),                                                # HIDDEN LAYER's ACTIVATION FUNCION
            nn.Linear(400, 10),                                   # OUTPUT LAYER                          # THIRD LAYER: LINEAR YEAR, HIDDEN LAYER 2
            # nn.Sigmoid(),                                                # HIDDEN LAYER's ACTIVATION FUNCION
            nn.Linear(10, 2),
            nn.Sigmoid()              
        )

    def forward(self, x):
        out = self.LeNet(x)
        return out



In [41]:
model = CustomNeuralNetwork()

In [42]:
#Dataloader code from Checkpoint #1

resize = T.Compose([
            T.ToPILImage(), 
            T.Grayscale(num_output_channels=1),
            T.Resize((int(512), int(512))), # Resize the image to match median ratio using median length, we can try later with smaller length 
            T.ToTensor()
        ])
 
transforms = T.Compose([
            T.ToPILImage(), 
            T.Grayscale(num_output_channels=1),
            T.RandomAdjustSharpness(sharpness_factor=10),
            T.ColorJitter(brightness=.5, hue=.3),
            T.Resize((int(512), int(512))),
            # Resize the image to match median ratio using median length 
            T.ToTensor()
        ])
training_data = XRayDataset(train_path, transforms)
val_data = XRayDataset(val_path, resize)
test_data = XRayDataset(test_path, resize)

#Discussion with Ian said to start with small batches (2,4)
#Medium post said do powers of 2 starting with 16 
#I'll go with middle point and do batch = 8
#https://medium.com/data-science-365/determining-the-right-batch-size-for-a-neural-network-to-get-better-and-faster-results-7a8662830f15

batch_s = 64

train_dataloader = DataLoader(training_data, batch_size=batch_s, shuffle=True)
val_dataloader = DataLoader(val_data, batch_size=batch_s, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=batch_s, shuffle=True)


In [43]:
# 3: Define a Loss function and optimizer
criterion = nn.CrossEntropyLoss() #a.Change the cross entropy of the original code to NLL 
#which we have used before for binary classification
#b. 
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

In [63]:
# 4: Train and validate the network
EPOCHS = 3
train_losses = []
train_accuracies = []
train_recalls = []
for _ in range(EPOCHS):
    # TRAIN
    model.train()  # Make sure gradient tracking is on, and do a pass over the data
    running_loss = 0.0
    y_true = np.array([])
    y_predict = np.array([])
    for i, data in enumerate(train_dataloader):
        # get the inputs; data is a list of [inputs, labels]
        inputs, labels = data
        optimizer.zero_grad()  # zero the parameter gradients
        outputs = model(inputs)  # forward pass
        label_dict = {"PNEUMONIA": 0, "NORMAL": 1}
        labels_tensor = torch.tensor([label_dict[label] for label in labels])
        loss = criterion(outputs, labels_tensor)  # calculate loss
        loss.backward()  # backward pass
        optimizer.step()  # update weights
        running_loss += loss.item()
        y_true = np.concatenate((y_true, labels_tensor.numpy()))
        y_pred_batch = np.argmax(outputs.detach().numpy(), axis=1)
        y_predict = np.concatenate((y_predict, y_pred_batch))
    
    epoch_accuracy = (y_predict == y_true).mean()
    epoch_recall = recall_score(y_true, y_predict, average='binary')
    train_losses.append(running_loss)
    train_accuracies.append(epoch_accuracy)
    train_recalls.append(epoch_recall)


In [64]:
print("y_predict", y_predict)
print("y_true", y_true)
print(train_losses)
print(train_accuracies)
print(train_recalls)


y_predict [0. 0. 0. ... 0. 0. 0.]
y_true [0. 0. 0. ... 0. 0. 0.]
[37.14831039309502, 31.12672910094261, 29.61932474374771]
[0.870590490797546, 0.9472776073619632, 0.9637653374233128]
[0.580909768829232, 0.877703206562267, 0.9217002237136466]


In [65]:
# 4: Train and validate the network
EPOCHS = 3
val_losses = []
val_accuracies = []
val_recalls = []
for _ in range(EPOCHS):
    # VAL
    model.eval()  # # we don't want to keep track of the gradients in the validation part
    val_running_loss = 0.0
    y_true_val = np.array([])
    y_predict_val = np.array([])
    for i, data in enumerate(val_dataloader):
        # get the inputs; data is a list of [inputs, labels]
        inputs, labels = data
        optimizer.zero_grad()  # zero the parameter gradients
        outputs = model(inputs)  # forward pass
        label_dict = {"PNEUMONIA": 0, "NORMAL": 1}
        labels_tensor = torch.tensor([label_dict[label] for label in labels])
        loss = criterion(outputs, labels_tensor)  # calculate loss
        loss.backward()  # backward pass
        optimizer.step()  # update weights
        val_running_loss += loss.item()
        y_true_val = np.concatenate((y_true_val, labels_tensor.numpy()))
        y_predict_val = np.concatenate((y_predict_val, outputs.argmax(dim=1).numpy()))

    val_epoch_accuracy = (y_predict_val == y_true_val).mean()
    val_epoch_recall = recall_score(y_true_val, y_predict_val, average='binary')
    val_losses.append(val_running_loss)
    val_accuracies.append(val_epoch_accuracy)
    val_recalls.append(val_epoch_recall)

In [66]:
print(val_accuracies)
print(val_losses)
print(val_recalls)

unique, counts = np.unique(y_predict_val, return_counts=True)

# print the unique values and their counts
for value, count in zip(unique, counts):
    print(f"{value}: {count}")

[0.9375, 1.0, 0.9375]
[0.3876577317714691, 0.3318898677825928, 0.3688928484916687]
[0.875, 1.0, 1.0]
0.0: 7
1.0: 9


In [68]:
#5. Test model
EPOCHS = 1
test_losses = []
test_accuracies = []
test_recalls = []
with torch.no_grad():
    for _ in range(EPOCHS):
        # TEST
        model.train()  # Make sure gradient tracking is on, and do a pass over the data
        test_running_loss = 0.0
        y_true_test = np.array([])
        y_predict_test = np.array([])
        for i, data in enumerate(test_dataloader):
            # get the inputs; data is a list of [inputs, labels]
            inputs, labels = data
            optimizer.zero_grad()  # zero the parameter gradients
            outputs = model(inputs)  # forward pass
            print("OUTPUTS: ", outputs)
            label_dict = {"PNEUMONIA": 0, "NORMAL": 1}
            labels_tensor = torch.tensor([label_dict[label] for label in labels])
            loss = criterion(outputs, labels_tensor)  # calculate loss
            optimizer.step()  # update weights
            test_running_loss += loss.item()
            y_true_test = np.concatenate((y_true_test, labels_tensor.numpy()))
            y_predict_test = np.concatenate((y_predict_test, outputs.argmax(dim=1).numpy()))

        test_epoch_accuracy = (y_predict_test == y_true_test).mean()
        test_epoch_recall = recall_score(y_true_test, y_predict_test, average='binary')
        test_losses.append(test_running_loss)
        test_accuracies.append(test_epoch_accuracy)
        test_recalls.append(test_epoch_recall)
  
    

OUTPUTS:  tensor([[9.9993e-01, 1.1051e-04],
        [9.9846e-01, 2.3455e-03],
        [9.9959e-01, 5.9133e-04],
        [9.9890e-01, 1.4377e-03],
        [9.8109e-01, 2.1246e-02],
        [6.3483e-01, 3.6520e-01],
        [9.9611e-01, 5.5270e-03],
        [1.0000e+00, 5.6834e-06],
        [9.9999e-01, 2.3638e-05],
        [9.7138e-01, 2.8862e-02],
        [1.4589e-01, 8.5023e-01],
        [2.2475e-03, 9.9514e-01],
        [9.9948e-01, 7.5620e-04],
        [9.9133e-01, 1.2435e-02],
        [9.9981e-01, 2.7706e-04],
        [9.9726e-01, 4.0727e-03],
        [8.6206e-01, 9.9039e-02],
        [5.5549e-01, 4.5538e-01],
        [1.0000e+00, 1.9017e-06],
        [9.9629e-01, 4.2956e-03],
        [9.9993e-01, 1.2495e-04],
        [8.8171e-01, 1.0584e-01],
        [9.8435e-01, 1.6822e-02],
        [9.9979e-01, 3.1359e-04],
        [9.9954e-01, 6.8935e-04],
        [9.9416e-01, 7.3714e-03],
        [9.9973e-01, 4.9360e-04],
        [9.9665e-01, 5.5680e-03],
        [9.9347e-01, 9.1560e-03],
    

In [69]:
print(test_accuracies)
print(test_losses)
print(test_recalls)

unique, counts = np.unique(y_predict_test, return_counts=True)

# print the unique values and their counts
for value, count in zip(unique, counts):
    print(f"{value}: {count}")


[0.6923076923076923]
[5.979405164718628]
[0.18376068376068377]
0.0: 580
1.0: 44


In [None]:
# Activation Functions
# ReLU
# x being the tensor
y = torch.relu(x)