# Concealed Pistol Detection

### Imports

In [None]:
#imports for neural network
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from torchsummary import summary

#imports for vision
import torchvision
import torchvision.transforms as transforms
from torchvision import datasets
from torchvision.datasets import ImageFolder
from torchvision.utils import make_grid

#imports for preparing dataset
import os
import zipfile
#from google.colab import files
#from google.colab import drive

#imports for visualizations
%matplotlib inline
import matplotlib.pyplot as plt
import matplotlib.image as mpimg

from custom_gun_dataset import CustomDataset

### Preparing Dataset

In [None]:
#transformation applied to every image so the whole dataset is standardized:
#  - resize to a fixed 384 x 384
#  - collapse to a single grayscale channel
#  - convert to a tensor and normalize with mean 0.5 / std 0.5
transform = transforms.Compose([
    transforms.Resize((384, 384)),
    transforms.Grayscale(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5]),
])

#relative directory path
dataset_dir = '../Data/CombinedData'
dataset = CustomDataset(
    root_directory=dataset_dir,
    transform=transform,
    categories=['with gun', 'without gun'],
)

#initialize train, validation, and test sets
train_size = int(0.8 * len(dataset))              #80% for training
val_size = int(0.1 * len(dataset))                #10% for validation
test_size = len(dataset) - train_size - val_size  #10% (remainder) for test

#seed the split: without a fixed generator every kernel restart draws a new
#partition, so a checkpoint saved by an earlier run could be evaluated on
#images it was trained on
split_seed = 42
train_dataset, val_dataset, test_dataset = random_split(
    dataset,
    [train_size, val_size, test_size],
    generator=torch.Generator().manual_seed(split_seed),
)

#print sizes of datasets
print(f"Train size: {len(train_dataset)}")
print(f"Valid size: {len(val_dataset)}")
print(f"Test size: {len(test_dataset)}")

#set the dataloaders to use the datasets
#only the training loader is shuffled; evaluation order does not need to be
#random, and a fixed order keeps validation/test batches identical across epochs
batch_size = 4
train_dl = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_dl = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_dl = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

#variable to define number of classes
num_classes = 2


### Setting device to GPU 

In [None]:
#use the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

#confirm device
print("Device:", device)


### Visualizing the transformed dataset

In [None]:
#plotting a grid of a single batch
def show_batch(dataLoader, mean=0.5, std=0.5):
    """Display the images of the first batch from ``dataLoader`` as one grid.

    Only the first element of the batch (the image tensor) is used. The
    images are mapped back to the [0, 1] range before plotting, because
    ``imshow`` clips float RGB data outside that range and normalized
    images would otherwise render with their dark pixels crushed to black.

    Args:
        dataLoader: iterable of batches whose first element is a batch of
            image tensors.
        mean: mean that was subtracted when the images were normalized.
        std: standard deviation the images were divided by when normalized.

    Returns:
        None. Does nothing when the loader yields no batches.
    """
    first_batch = next(iter(dataLoader), None)
    if first_batch is None:
        return

    #undo the normalization and guard against rounding outside [0, 1]
    images = (first_batch[0] * std + mean).clamp(0, 1)

    _fig, ax = plt.subplots(figsize=(15, 10))
    ax.set_xticks([])
    ax.set_yticks([])
    #make_grid returns (channels, height, width); imshow wants channels last
    ax.imshow(make_grid(images, nrow=8).permute(1, 2, 0))


show_batch(train_dl)

### Defining the model

In [None]:
#definition for a CNN
class Network(nn.Module):
    """CNN for 1 x 384 x 384 images with a class head and a 4-value box head.

    Nine conv -> batch norm -> LeakyReLU blocks. Blocks 2, 4, 6 and 8 are each
    followed by a 2 x 2 max pool, so a 384 x 384 input reaches the fully
    connected layers as 128 feature maps of 24 x 24. Two additive skip
    connections re-inject earlier activations after pooling them down to the
    matching spatial size.

    Layer attribute names and their registration order are kept stable
    because they determine the keys of the saved ``state_dict``.
    """

    def __init__(self, n_classes=None):
        """Build the layers.

        Args:
            n_classes: number of outputs of the classification head. When
                omitted, the notebook-level ``num_classes`` is used, so
                ``Network()`` behaves as before.
        """
        super().__init__()

        #fall back to the notebook-level setting when no size is passed
        if n_classes is None:
            n_classes = num_classes

        #model takes input of 384 x 384 x 1
        #make sure in_channels aligns with out_channels from the previous layer

        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(32)

        self.conv2 = nn.Conv2d(in_channels=32, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(32)

        self.conv3 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.bn3 = nn.BatchNorm2d(64)

        self.conv4 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.bn4 = nn.BatchNorm2d(64)

        self.conv5 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1)
        self.bn5 = nn.BatchNorm2d(128)

        self.conv6 = nn.Conv2d(in_channels=128, out_channels=128, kernel_size=3, stride=1, padding=1)
        self.bn6 = nn.BatchNorm2d(128)

        self.conv7 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=1, padding=1)
        self.bn7 = nn.BatchNorm2d(256)

        self.conv8 = nn.Conv2d(in_channels=256, out_channels=128, kernel_size=3, stride=1, padding=1)
        self.bn8 = nn.BatchNorm2d(128)

        self.conv9 = nn.Conv2d(in_channels=128, out_channels=128, kernel_size=3, stride=1, padding=1)
        self.bn9 = nn.BatchNorm2d(128)

        #24 x 24 x 128 comes from sizing: four 2 x 2 pools halve 384 down to 24,
        #and 128 is the out_channels of the last conv layer
        self.fc1 = nn.Linear(in_features=(24 * 24 * 128), out_features=64)
        self.fc2 = nn.Linear(in_features=64, out_features=32)
        self.fc_class = nn.Linear(in_features=32, out_features=n_classes)
        self.fc_bbox = nn.Linear(in_features=32, out_features=4)

        self.pool = nn.MaxPool2d(2, 2)
        self.leaky_relu = nn.LeakyReLU()
        self.sigmoid = nn.Sigmoid()
        self.dropout = nn.Dropout(0.5)

    @staticmethod
    def _match_channels(skip, target):
        """Zero-pad ``skip`` along the channel axis to ``target``'s channel count."""
        channel_gap = target.shape[1] - skip.shape[1]
        if channel_gap != 0:
            #pad tuple runs from the last dim backwards: (W, W, H, H, C_front, C_back)
            skip = nn.functional.pad(skip, (0, 0, 0, 0, 0, channel_gap))
        return skip

    def forward(self, input):
        """Run the network on a batch of images.

        Args:
            input: image batch of shape (batch, 1, 384, 384).

        Returns:
            Tuple ``(output_class, output_bbox)``: the class logits of shape
            (batch, n_classes) and the box head output of shape (batch, 4).

        Raises:
            RuntimeError: if the spatial size of ``input`` does not produce
                24 x 24 feature maps, because the flattened features then no
                longer match ``fc1``.
        """
        #forward pass first block, no pool
        output = self.leaky_relu(self.bn1(self.conv1(input)))

        #forward pass second block, pool
        output = self.leaky_relu(self.bn2(self.conv2(output)))
        output_pool_2 = self.pool(output)

        #forward pass third block, no pool
        output = self.leaky_relu(self.bn3(self.conv3(output_pool_2)))
        output_skip_1 = output

        #forward pass fourth block, pool
        output = self.leaky_relu(self.bn4(self.conv4(output_skip_1)))
        output_pool_4 = self.pool(output)

        #forward pass fifth block, no pool
        output = self.leaky_relu(self.bn5(self.conv5(output_pool_4)))

        #forward pass sixth block, pool
        output = self.leaky_relu(self.bn6(self.conv6(output)))
        output_pool_6 = self.pool(output)

        #skip connection: block-3 output pooled twice to match the two pools
        #applied since, then zero-padded from 64 to 128 channels
        skip2 = self._match_channels(self.pool(self.pool(output_skip_1)), output_pool_6)
        #out-of-place add so no tensor in the autograd graph is modified in place
        output_skip_2 = output_pool_6 + skip2

        #forward pass seventh block, no pool
        output = self.leaky_relu(self.bn7(self.conv7(output_skip_2)))

        #forward pass eighth block, pool
        output = self.leaky_relu(self.bn8(self.conv8(output)))
        output_pool_8 = self.pool(output)

        #forward pass ninth block, no pool
        output = self.leaky_relu(self.bn9(self.conv9(output_pool_8)))

        #skip connection: pool the block-6 result once to match the spatial dims
        skip3 = self._match_channels(self.pool(output_skip_2), output)
        output = output + skip3

        #forward pass flattening; keeping the batch dimension fixed means a
        #wrong input size fails in fc1 instead of being silently re-batched
        output = torch.flatten(output, start_dim=1)

        #forward pass fully connected layers
        output = self.dropout(self.leaky_relu(self.fc1(output)))
        output = self.dropout(self.leaky_relu(self.fc2(output)))

        output_class = self.fc_class(output)

        #NOTE(review): the sigmoid is applied to the shared features *before*
        #fc_bbox, so the box outputs themselves are unbounded - confirm this
        #ordering is intended
        output_bbox = self.sigmoid(output)
        output_bbox = self.fc_bbox(output_bbox)

        return output_class, output_bbox

#build the network, then move it to the selected device
model = Network()
model = model.to(device)

#layer-by-layer summary for one input given as (channels, height, width)
summary(model, (1, 384, 384))



### Defining Loss and Optimizer


In [None]:
#step size used by the optimizer
learning_rate = 0.001

#cross entropy loss for the classification output
criterion = nn.CrossEntropyLoss()

#adam optimizer over all model parameters
optimizer = optim.Adam(params=model.parameters(), lr=learning_rate)

### Training loop of the model

In [None]:
#number of epochs and early stopping
epochs = 500
early_stopping_patience = 150
early_stopping_counter = 0

#using validation loss as the best model
best_val_loss = float('inf')
best_epoch = 0

#arrays to save each metric during training
train_loss_values = []
train_acc_values = []

val_loss_values = []
val_acc_values = []


def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over ``loader`` and return ``(mean_loss, accuracy)``.

    With an ``optimizer`` the model is put in training mode and updated after
    every batch; without one it is put in evaluation mode and gradients are
    disabled. Only the class output of the model is used; the box targets
    and the box output are ignored.

    Both metrics are averaged per sample rather than per batch, so a short
    final batch does not carry the same weight as a full one.

    Raises:
        ValueError: if ``loader`` yields no samples.
    """
    training = optimizer is not None
    model.train(training)

    loss_total = 0.0
    correct_total = 0
    sample_total = 0

    with torch.set_grad_enabled(training):
        for data, label, _boxes in loader:
            #moving data to the right device
            data = data.to(device)
            label = label.to(device)

            if training:
                optimizer.zero_grad()

            output, _bbox_output = model(data)
            loss = criterion(output, label)

            if training:
                loss.backward()
                optimizer.step()

            #the criterion returns the batch mean, so scale it back to a sum
            batch_count = label.size(0)
            loss_total += loss.item() * batch_count
            correct_total += (output.argmax(dim=1) == label).sum().item()
            sample_total += batch_count

    if sample_total == 0:
        raise ValueError("DataLoader yielded no samples; cannot compute epoch metrics")

    return loss_total / sample_total, correct_total / sample_total


#training loop
for epoch in range(epochs):
    #training pass: average loss and acc over the epoch
    current_epoch_loss, current_epoch_acc = _run_epoch(
        model, train_dl, criterion, device, optimizer=optimizer
    )

    #storing current epoch into overall loss and acc
    train_loss_values.append(current_epoch_loss)
    train_acc_values.append(current_epoch_acc)

    #validation pass: evaluation mode, no gradient tracking
    current_epoch_val_loss, current_epoch_val_acc = _run_epoch(
        model, val_dl, criterion, device
    )

    #storing current epoch into overall
    val_loss_values.append(current_epoch_val_loss)
    val_acc_values.append(current_epoch_val_acc)

    #checking for model improvement
    if current_epoch_val_loss < best_val_loss:
        #if best epoch, save it
        torch.save(model.state_dict(), "best_concealed_model.pth")

        #best epoch information
        best_val_loss = current_epoch_val_loss
        best_epoch = epoch + 1
        early_stopping_counter = 0
    else:
        early_stopping_counter += 1

    #output current epoch information
    print(f"Epoch: {epoch+1}")
    print(f"Training Accuracy: {current_epoch_acc:.3f}, Validation Accuracy: {current_epoch_val_acc:.3f}")
    print(f"Training Loss: {current_epoch_loss:.3f}, Validation Loss: {current_epoch_val_loss:.3f}")
    print(f"Current Best Epoch: {best_epoch}\n")

    #stop if not improving
    if early_stopping_counter >= early_stopping_patience:
        print(f"Stopping early after {early_stopping_patience} epochs with no improvement")
        break


### Plotting Model Training Results 

In [None]:
#epoch numbers for the x axis, starting at 1
epoch_count = list(range(1, len(train_loss_values) + 1))

#setting figure size
plt.figure(figsize=(8, 4))

#one entry per panel: (title, training series, validation series, legend labels)
panels = (
    ("Loss", train_loss_values, val_loss_values, 'Training Loss', 'Validation Loss'),
    ("Accuracy", train_acc_values, val_acc_values, 'Training Acc', 'Validation Acc'),
)

#plotting loss on the left and accuracy on the right
for position, (title, train_series, val_series, train_label, val_label) in enumerate(panels, start=1):
    plt.subplot(1, 2, position)
    plt.title(title)
    plt.plot(epoch_count, train_series, label=train_label)
    plt.plot(epoch_count, val_series, label=val_label, linestyle='--')
    plt.xlabel("Epoch")
    #the y axis carries the same text as the panel title
    plt.ylabel(title)
    plt.legend()

#showing figures
plt.tight_layout()
plt.show()

### Testing Model


In [None]:
# Loading best saved model; map_location lets a checkpoint that was saved on
# a GPU be loaded when only the CPU is available
model.load_state_dict(
    torch.load('best_concealed_model.pth', map_location=device, weights_only=True)
)

# Set to evaluation mode
model.eval()

# Running totals, counted per sample so a short final batch is not over-weighted
total_test_loss = 0.0
total_correct = 0
total_samples = 0

# Disabling gradient tracking
with torch.no_grad():
    for data, label, bbox in test_dl:
        # Loading data and labels to proper device
        data = data.to(device)
        label = label.to(device)

        # Forward pass and loss; the box output is not evaluated here
        output, _ = model(data)
        loss = criterion(output, label)

        # Get predictions (argmax over the output logits)
        preds = output.argmax(dim=1)

        # The criterion returns the batch mean, so scale it back to a sum
        batch_count = label.size(0)
        total_test_loss += loss.item() * batch_count
        total_correct += (preds == label).sum().item()
        total_samples += batch_count

        # Show misclassified images only
        for idx in range(len(preds)):
            predicted = preds[idx].item()
            actual = label[idx].item()
            if predicted == actual:
                continue

            img_tensor = data[idx].cpu()
            img_np = img_tensor.squeeze().numpy()  # For grayscale

            plt.imshow(img_np, cmap='gray')
            plt.title(f"Predicted: {predicted}, Actual: {actual}")
            plt.axis('off')
            plt.show()

if total_samples == 0:
    raise ValueError("Test DataLoader yielded no samples; cannot compute test metrics")

# Calculate the average loss and accuracy over all test samples
avg_loss = total_test_loss / total_samples
avg_accuracy = total_correct / total_samples

print(f"Test Accuracy: {avg_accuracy:.3f}")
print(f"Test Loss : {avg_loss:.3f}")
