<a href="https://colab.research.google.com/github/edatkinson/Prohibited-Item-Detection/blob/main/x_ray_cnn_classes.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
import torchvision
import torchvision.transforms as transforms

# Check whether we have a GPU.  Use it if we do.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cpu')

In [2]:
from google.colab import drive
import pandas as pd
from PIL import Image
import os
from torch.utils.data import Dataset

# Mount Google Drive
drive.mount('/content/drive')

# Navigate to the folder
data_dir = '/content/drive/MyDrive/x_ray_dataset'

# Define transforms
transform = transforms.Compose([
    transforms.Resize((128,128)),
    transforms.ToTensor()
])

# Create dataset class
class X_ray_dataset(Dataset):
    def __init__(self, annotations_file = os.path.join(data_dir, 'Classes.csv'), img_dir = os.path.join(data_dir, 'x_ray_images'), transform = None):
        self.img_labels = pd.read_csv(annotations_file)
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        return len(self.img_labels)

    def __getitem__(self, idx):
        img_path = os.path.join(self.img_dir, self.img_labels.iloc[idx, 0])
        image = Image.open(img_path)
        label = self.img_labels.iloc[idx, 1]
        if self.transform:
            image = self.transform(image)
        return image, label

Mounted at /content/drive


In [3]:
# Test to see if the class is working
x_ray = X_ray_dataset(transform=transform)
print('len dataset:', len(x_ray)) # use the __len__ method to show the length of the dataset
example = x_ray[0] # use the __getitem__ method to get the first example
print(example[0].shape)
print('Class:', example[1])

len dataset: 9841
torch.Size([3, 128, 128])
Class: 10


In [4]:
from torch.utils.data import random_split, DataLoader

x_ray = X_ray_dataset(transform=transform)
# Define the size of the training set
train_size = int(0.85 * len(x_ray))  # 85% for training

# Split the dataset into training and testing sets
train_ds, test_ds = random_split(x_ray, [train_size, len(x_ray) - train_size])

# Split the training set into train and validation (train ~ 70% (6887), test ~ 15% (1477), val ~ 15% (1477))
train_ds, val_ds = random_split(train_ds, [6887, 1477])

# Define class labels
classes = ('Safe', 'Baton', 'Bullet', 'Gun', 'Hammer', 'Handcuffs', 'Knife',
            'Lighter', 'Pliers', 'Powerbank', 'Scissors', 'Sprayer', 'Wrench')

batch_size = 10

# Define Dataloaders
train_loader = DataLoader(train_ds,
                          batch_size = batch_size,
                          shuffle = True)
test_loader = DataLoader(test_ds,
                         batch_size = batch_size*5,
                         shuffle = False)
val_loader = DataLoader(val_ds,
                        batch_size = batch_size*5,
                        shuffle = False)

In [40]:
import torch.nn as nn

# Create CNN
class CNN(nn.Module):
  def __init__(self):
    super().__init__()
    # Creating layers
    self.conv1 = nn.Conv2d(in_channels = 3, out_channels = 256, kernel_size = 3)
    self.batch1 = nn.BatchNorm2d(256)
    self.relu1 = nn.ReLU()

    self.conv2 = nn.Conv2d(in_channels = 256, out_channels = 128, kernel_size = 3)
    self.batch2 = nn.BatchNorm2d(128)
    self.relu2 = nn.ReLU()
    self.pool2 = nn.AdaptiveAvgPool2d(1)

  def forward(self, x):
    x = nn.functional.relu(self.conv1(x)) # apply non-linearity
    x = self.batch1(x)
    x = self.relu1(x)

    x = nn.functional.relu(self.conv2(x)) # apply non-linearity
    x = self.batch2(x)
    x = self.relu2(x)
    x = self.pool2(x)

    return x

In [41]:
# Instantiate Network
net = CNN()
net.to(device)

CNN(
  (conv1): Conv2d(3, 256, kernel_size=(3, 3), stride=(1, 1))
  (batch1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu1): ReLU()
  (conv2): Conv2d(256, 128, kernel_size=(3, 3), stride=(1, 1))
  (batch2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu2): ReLU()
  (pool2): AdaptiveAvgPool2d(output_size=1)
)

In [42]:
# Checking the shape of an image before and after the network
for i, data in enumerate(train_loader):
  inputs, labels = data[0].to(device), data[1].to(device)
  print('Input shape:', inputs.shape)
  print('Output shape:', net(inputs).shape)
  break

Input shape: torch.Size([10, 3, 128, 128])
Output shape: torch.Size([10, 128, 1, 1])


In [43]:
# Specifying hyperparameters
import torch.optim as optim

loss_func = nn.CrossEntropyLoss()
optimiser = optim.SGD(net.parameters(), lr = 0.1)

In [44]:
# Function for training one epoch
def train_epoch():
  net.train(True) # set to training mode

  # Metrics that will build up
  running_loss = 0
  running_accuracy = 0

  # Iterate over train data
  for batch_index, data in enumerate(train_loader):
    inputs, labels = data[0].to(device), data[1].to(device) # get the images and labels

    optimiser.zero_grad() # set all non-zero values for gradients to 0 (reset gradients)

    outputs = net(inputs).squeeze((-1, -2)) # shape: [batch size, 2]
    correct_prediction = torch.sum(labels == torch.argmax(outputs, dim = 1)).item() # check how many images the model predicted correctly
    running_accuracy += correct_prediction/batch_size # update the accuracy

    # Train model
    loss = loss_func(outputs, labels) # compare model predictions with labels
    running_loss += loss.item() # update the loss
    loss.backward() # calculate gradients
    optimiser.step()

    if batch_index % 50 == 49:      # print for every 500 batchs
      avg_loss = running_loss/50  # get the average loss across batches
      avg_acc = (running_accuracy/50) * 100 # get the average accuracy across batches
      print('Batch {0}, Loss: {1:.3f}, Accuracy: {2:.1f}%'.format(batch_index+1, avg_loss, avg_acc))

      running_loss = 0
      running_accuracy = 0

  print()

In [45]:
# Function for testing
def test_epoch(epoch):
  with torch.no_grad():
    correct_prediction = 0
    total = 0
    for images, labels in test_loader:
      images = images.to(device)
      labels = labels.to(device)

      outputs = net(images).squeeze((-1,-2))

      predicted = torch.argmax(outputs, -1)
      correct_prediction += (predicted == labels).sum().item()
      total += labels.size(0)

    acc =(correct_prediction/total) * 100

    print('Test accuracy after {0:.0f} epoch(s): {1:.1f}%'.format(epoch+1, acc))
    print()

In [46]:
# Function for validating one epoch
def validate_epoch():
  net.train(False) # set to evaluation mode
  running_loss = 0
  running_acc = 0

  # Iterate over validation data
  for batch_index, data in enumerate(val_loader):
    inputs, labels = data[0].to(device), data[1].to(device)

    with torch.no_grad(): # not worried about gradients here as not training
      outputs = net(inputs).squeeze((-1, -2)) # shape [batch size, 2]
      correct_prediction = torch.sum(labels == torch.argmax(outputs, dim = 1)).item() # check how many images the model predicted correctly
      running_acc += correct_prediction/batch_size # update the accuracy
      loss = loss_func(outputs, labels) # compare model predictions with labels
      running_loss += loss.item() # update the loss

  avg_loss = running_loss/len(val_loader)
  avg_acc = (running_acc/len(val_loader)) * 100

  print('Val Loss: {0:.3f}, Val Accuracy: {1:.1f}%'.format(avg_loss, avg_acc))

  print('-----------------------------------------------------------')
  print()

In [None]:
# Training loop

num_epochs = 5

for i in range(num_epochs):
  print('Epoch:', i+1, '\n')

  train_epoch()
  test_epoch(i)
  validate_epoch()

print('Finished Training')

Epoch: 1 

