# CNN Model using PyTorch for Abdominal Trauma Detection in CT scan

> Detect and classify traumatic abdominal injuries

![](https://www.kaggle.com/competitions/52254/images/header)

In [1]:
# Importing libraries
import pandas as pd
import numpy as np
import matplotlib.pylab as plt
import os 

from tqdm import tqdm

from sklearn.model_selection import train_test_split

import torch
from torch.utils.data import Dataset, DataLoader
from torch.utils.data import WeightedRandomSampler
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# Version Check

In [2]:
print('np:', np.__version__)
print('pd:', pd.__version__)
print('sklearn:', sklearn.__version__)
print('torch:',tf.__version__)

np: 1.26.0
pd: 2.1.3
sklearn: 1.3.2
tf: 2.10.0
tfp: 0.14.0
tfa: 0.22.0
w&b: 0.16.0


## 1. Data Loading and Preprocessing
First, we'll define the code for loading and preprocessing the data. This involves normalizing the tensors and splitting the dataset into training and validation sets.

### Dataset Preparation with Balancing

In [2]:
#define my path in hard drive
# Windows
data_dir = "S:/Capston/data2"
# My 3D tensor files directory
SAVE_FOLDER = "S:/Capston/data4"

In [3]:
class CFG:
    model_name = 'PyTorch_CNN'

    # size of the image
    img_size = [128, 128]

    # seed for data-split, layer init, augs
    seed = 42

    # batch_size, epochs and learning_rate 
    batch_size = 16  
    epochs = 4    
    learning_rate = 0.001  q
    
    # target column
    target_col  = ['bowel_injury', 'kidney_healthy', 'kidney_low', 'kidney_high',
       'liver_healthy', 'liver_low', 'liver_high', 'spleen_healthy',
       'spleen_low', 'spleen_high'] # not using 'bowel_healthy', 'extravasation_healthy', 'extravasation_injury' & 'any_injury'

In [4]:
# Read CSV and making dataframe
df = pd.read_csv(os.path.join(data_dir,'patients_meta_plus_path.csv'))
# sanity check
df.shape

(3147, 16)

In [5]:
df.head()

Unnamed: 0,patient_id,bowel_healthy,bowel_injury,extravasation_healthy,extravasation_injury,kidney_healthy,kidney_low,kidney_high,liver_healthy,liver_low,liver_high,spleen_healthy,spleen_low,spleen_high,any_injury,paths
0,10004,1,0,0,1,0,1,0,1,0,0,0,0,1,1,S:\Capston\data4\10004.pt
1,10005,1,0,1,0,1,0,0,1,0,0,1,0,0,0,S:\Capston\data4\10005.pt
2,10007,1,0,1,0,1,0,0,1,0,0,1,0,0,0,S:\Capston\data4\10007.pt
3,10026,1,0,1,0,1,0,0,1,0,0,1,0,0,0,S:\Capston\data4\10026.pt
4,10051,1,0,1,0,1,0,0,1,0,0,0,1,0,1,S:\Capston\data4\10051.pt


In [6]:
df.columns

Index(['patient_id', 'bowel_healthy', 'bowel_injury', 'extravasation_healthy',
       'extravasation_injury', 'kidney_healthy', 'kidney_low', 'kidney_high',
       'liver_healthy', 'liver_low', 'liver_high', 'spleen_healthy',
       'spleen_low', 'spleen_high', 'any_injury', 'paths'],
      dtype='object')

In [7]:
df = df.drop(columns=['bowel_healthy', 'extravasation_healthy',
       'extravasation_injury','any_injury'])

In [8]:
# Splitting the dataset
remain_df, test_df = train_test_split(df, test_size=0.2, random_state=CFG.seed)  # 20% data for testing
train_df, val_df = train_test_split(remain_df, test_size=0.25, random_state=CFG.seed)  # 60% training, 20% validation

# Balancing the training dataset
class_counts = train_df[['bowel_injury', 'kidney_healthy', 'kidney_low', 'kidney_high',
       'liver_healthy', 'liver_low', 'liver_high', 'spleen_healthy',
       'spleen_low', 'spleen_high']].sum().values
num_samples = len(train_df)
weights = 1. / class_counts
samples_weights = weights[train_df[['bowel_injury', 'kidney_healthy', 'kidney_low', 'kidney_high',
       'liver_healthy', 'liver_low', 'liver_high', 'spleen_healthy',
       'spleen_low', 'spleen_high']].values.argmax(axis=1)]
sampler = WeightedRandomSampler(samples_weights, num_samples)


In [9]:
class CTScanDataset(Dataset):
    def __init__(self, dataframe, root_dir, transform=None):
        """
        Args:
            dataframe (DataFrame): DataFrame containing the file paths and labels.
            root_dir (string): Directory with all the tensor files.
            transform (callable, optional): Optional transform to be applied on a sample.
        """
        self.dataframe = dataframe
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        
        img_name = os.path.join(self.root_dir, self.dataframe.iloc[idx, -1])
        # ... load the image ...
        image = torch.load(img_name)
        # Add a channel dimension
        image = image.unsqueeze(0)  # This adds a channel dimension
        labels = self.dataframe.iloc[idx, 1:-1].values
        labels = torch.from_numpy(labels.astype('float')).float()

        if self.transform:
            image = self.transform(image)

        return image, labels

# Dataset and DataLoader
train_dataset = CTScanDataset(train_df, SAVE_FOLDER)
val_dataset = CTScanDataset(val_df, SAVE_FOLDER)
test_dataset = CTScanDataset(test_df, SAVE_FOLDER)

train_loader = DataLoader(train_dataset, batch_size=CFG.batch_size, sampler=sampler)
val_loader = DataLoader(val_dataset, batch_size=CFG.batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=CFG.batch_size, shuffle=False)


## 2. CNN Model Architecture
Next, we'll define a simple 3D CNN model architecture.

In [10]:
class Combined3DCNN(nn.Module):
    def __init__(self):
        super(Combined3DCNN, self).__init__()

        # First convolutional layer: 
        # Input channels = 1, output channels = 64, kernel size = 3
        self.conv1 = nn.Conv3d(1, 64, kernel_size=3)
        # Batch normalization for the first convolutional layer
        self.bn1 = nn.BatchNorm3d(64)

        # Second convolutional layer: 
        # Input channels = 64 (from previous layer), output channels = 64, kernel size = 3
        self.conv2 = nn.Conv3d(64, 64, kernel_size=3)
        # Batch normalization for the second convolutional layer
        self.bn2 = nn.BatchNorm3d(64)

        # Third convolutional layer: 
        # Input channels = 64, output channels = 128, kernel size = 3
        self.conv3 = nn.Conv3d(64, 128, kernel_size=3)
        # Batch normalization for the third convolutional layer
        self.bn3 = nn.BatchNorm3d(128)

        # Fourth convolutional layer: 
        # Input channels = 128, output channels = 256, kernel size = 3
        self.conv4 = nn.Conv3d(128, 256, kernel_size=3)
        # Batch normalization for the fourth convolutional layer
        self.bn4 = nn.BatchNorm3d(256)

        # Max pooling layer with a kernel size of 2
        self.pool = nn.MaxPool3d(2)

        # Global average pooling layer to reduce spatial dimensions to 1
        self.global_avg_pool = nn.AdaptiveAvgPool3d(1)

        # Fully connected layers
        # First fully connected layer, input features = 256 (from the last conv layer), output features = 120
        self.fc1 = nn.Linear(256, 120)
        # Second fully connected layer, input features = 120, output features = 84
        self.fc2 = nn.Linear(120, 84)
        # Third fully connected layer, input features = 84, output features = 10 (for 10 output nodes)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        # Apply first conv layer followed by ReLU activation function and pooling
        x = self.pool(F.relu(self.bn1(self.conv1(x))))
        # Apply second conv layer followed by ReLU activation function and pooling
        x = self.pool(F.relu(self.bn2(self.conv2(x))))
        # Apply third conv layer followed by ReLU activation function and pooling
        x = self.pool(F.relu(self.bn3(self.conv3(x))))
        # Apply fourth conv layer followed by ReLU activation function and pooling
        x = self.pool(F.relu(self.bn4(self.conv4(x))))
        # Apply global average pooling
        x = self.global_avg_pool(x)
        # Flatten the output for the fully connected layer
        x = x.view(x.size(0), -1)
        # Apply first fully connected layer with ReLU activation
        x = F.relu(self.fc1(x))
        # Apply second fully connected layer with ReLU activation
        x = F.relu(self.fc2(x))
        # Apply third fully connected layer (no activation function here)
        # Assuming use of cross-entropy loss which does not require softmax in the model definition
        x = self.fc3(x)
        return x

# Create the model instance
model2 = Combined3DCNN()

# Print the model's structure
print(model2)

## 3. Training the Model
Finally, we'll set up a basic training loop.

In [None]:
# Define the criterion and optimizer
criterion = nn.BCEWithLogitsLoss() 
optimizer = optim.SGD(model.parameters(), lr=CFG.learning_rate, momentum=0.9)

# Function to calculate accuracy
def calculate_accuracy(outputs, labels):
    predicted = (outputs > 0.5).float()
    correct = (predicted == labels).float()  # convert to float for division
    acc = correct.sum() / len(correct)
    return acc

# Arrays to track training and validation losses and accuracies
train_losses = []
val_losses = []
train_accuracies = []
val_accuracies = []

# Training loop with accuracy tracking
num_epochs = CFG.epochs 
for epoch in range(num_epochs):
    model.train()
    total_train_loss = 0.0
    total_train_acc = 0.0

    for inputs, labels in tqdm(train_loader, desc=f"Epoch {epoch + 1}"):
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels.float())
        loss.backward()
        optimizer.step()

        total_train_loss += loss.item()
        total_train_acc += calculate_accuracy(outputs, labels)

    avg_train_loss = total_train_loss / len(train_loader)
    avg_train_acc = total_train_acc / len(train_loader)
    train_losses.append(avg_train_loss)
    train_accuracies.append(avg_train_acc)

    # Validation phase
    model.eval()
    total_val_loss = 0.0
    total_val_acc = 0.0
    with torch.no_grad():
        for inputs, labels in val_loader:
            outputs = model(inputs)
            loss = criterion(outputs, labels.float())
            total_val_loss += loss.item()
            total_val_acc += calculate_accuracy(outputs, labels)

    avg_val_loss = total_val_loss / len(val_loader)
    avg_val_acc = total_val_acc / len(val_loader)
    val_losses.append(avg_val_loss)
    val_accuracies.append(avg_val_acc)

    print(f'Epoch {epoch + 1}, Train Loss: {avg_train_loss:.4f}, Train Acc: {avg_train_acc:.4f}, Val Loss: {avg_val_loss:.4f}, Val Acc: {avg_val_acc:.4f}')

print('Finished Training')

Epoch 1: 100%|███████████████████████████████████████████████████████████████████████| 118/118 [42:56<00:00, 21.84s/it]


Epoch 1, Train Loss: 0.8575, Train Acc: 8.2989, Val Loss: 0.3500, Val Acc: 8.7604


Epoch 2:  93%|██████████████████████████████████████████████████████████████████▏    | 110/118 [39:22<03:08, 23.61s/it]

## 4.Evaluate the Model

In [None]:
# Ensure the model is in evaluation mode

model.eval()

# Initialize counters
label_correct = torch.zeros(10)  # Assuming 10 labels
label_total = torch.zeros(10)

# No gradient is needed for evaluation
with torch.no_grad():
    for data in test_loader:
        images, labels = data
        outputs = model(images)

        # Convert outputs to predictions: apply a threshold for multi-label classification
        predicted = (outputs > 0.5).float()
        for i in range(10):  # Loop over each label
            label_total[i] += len(labels[:, i])  # Total instances of each label
            label_correct[i] += (predicted[:, i] == labels[:, i]).sum()  # Correct predictions for each label

# Calculate the accuracy for each label
label_accuracy = 100 * label_correct / label_total

# Print accuracy for each label
for i in range(10):
    print(f'Accuracy for {CFG.target_col[i]}: {label_accuracy[i]:.2f}%')


In [None]:
# saving my entire model
torch.save(model, "S:/Capston/data/model/" + CFG.model_name + "_" + str(CFG.epochs) + "epoch_" + str(CFG.batch_size) + "batch.pt")

In [None]:
# Creating a figure and two subplots (axes) side by side
fig, ax = plt.subplots(1, 2, figsize=(10, 5))

# Flatten the array of axes to easily index each subplot
ax = ax.ravel()

# Plotting training and validation accuracies on the first subplot
ax[0].plot(train_accuracies, label='Train Accuracy')  # Plot training accuracy
ax[0].plot(val_accuracies, label='Val Accuracy')     # Plot validation accuracy
ax[0].set_title('Model Accuracy')                    # Set title for the first subplot
ax[0].set_xlabel('Epochs')                           # Set x-axis label for the first subplot
ax[0].set_ylabel('Accuracy')                         # Set y-axis label for the first subplot
ax[0].legend()                                       # Display legend for the first subplot

# Plotting training and validation losses on the second subplot
ax[1].plot(train_losses, label='Train Loss')         # Plot training loss
ax[1].plot(val_losses, label='Val Loss')             # Plot validation loss
ax[1].set_title('Model Loss')                        # Set title for the second subplot
ax[1].set_xlabel('Epochs')                           # Set x-axis label for the second subplot
ax[1].set_ylabel('Loss')                             # Set y-axis label for the second subplot
ax[1].legend()                                       # Display legend for the second subplot

# Display the plot
plt.show()