### import and implement model

In [None]:
# packages
import torch.nn as nn
import torch.optim as optim
from torch.nn import Conv2d, LeakyReLU, MaxPool2d, Linear # import them seperetly because I think its more readable
from torchvision.io import read_image
from torch.utils.data import DataLoader, random_split
from torchmetrics.classification import BinaryPrecision
import torch
import pandas as pd
import numpy as np
import os 
import gc
from torchvision.transforms.v2 import Resize

In [2]:
# set wd
# NOTE(review): absolute, machine-specific path. The relative paths used later
# in this file ('./images/...') are resolved against this directory.
os.chdir('C:/Users/dalto/OneDrive/Pictures/Documents/Projects/Fracture')

# set seed
# Seeds PyTorch's global random number generator so that operations drawing
# from it are repeatable between runs.
torch.manual_seed(126)

<torch._C.Generator at 0x2806bcf0110>

import data and load onto tensors

In [3]:
# image dataset class
class ImageDataset():
    """Map-style dataset pairing image files with labels listed in a CSV file.

    The CSV at ``class_dir`` is read with pandas. Column 0 is used as the image
    file name (joined onto ``img_dir``) and column 1 as the label. Every image
    is passed through a 224x224 resize before it is returned.
    """

    def __init__(self, class_dir, img_dir): # load labels and the img
        self.img_labels = pd.read_csv(class_dir)
        self.img_dir = img_dir
        self.transforms = Resize((224,224), antialias=True)

    def __len__ (self): # len of labels for image
        return len(self.img_labels)

    def _read_image(self, idx):
        """Read the image file listed in row ``idx`` of the label table."""
        img_path = os.path.join(self.img_dir, self.img_labels.iloc[idx, 0])
        return read_image(img_path)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for row ``idx``.

        If the image cannot be read, one randomly chosen replacement row is
        tried instead. The label is always taken from the row whose image was
        actually loaded, so image and label stay paired. If the replacement
        read fails as well, that exception propagates to the caller.
        """
        try:
            image = self._read_image(idx)
        except Exception:
            # Best-effort fallback for unreadable files: switch to a random
            # row and use it for BOTH the image and the label below.
            idx = torch.randint(0, len(self.img_labels), (1,)).item()
            image = self._read_image(idx)

        image = self.transforms(image)
        label = self.img_labels.iloc[idx, 1] # read label
        return image, label


In [None]:
# Label CSV and image folder, both relative to the working directory.
class_dir = './images/class_ids.csv'
image_dir = './images/resize_data'

# Wrap the label table and image folder in the dataset class.
data_set = ImageDataset(class_dir, image_dir)

# Split sizes: 75% training, 12.5% testing; validation takes whatever is left
# so the three sizes always add up to len(data_set).
train_size = int(.75 * len(data_set))
test_size = int(.125 * len(data_set))
val_size = len(data_set) - (train_size + test_size)

# Randomly partition the dataset into the three subsets.
training, testing, val = random_split(
    data_set,
    [train_size, test_size, val_size],
)

In [5]:
batch_size = 32 # samples per mini-batch
# Use the GPU when CUDA is available; otherwise fall back to the CPU so the
# rest of the file still runs on machines without a usable GPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

In [6]:
# load the split data on the tensors
train_loader = DataLoader(training, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(testing, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val, batch_size=batch_size, shuffle=True)

In [7]:
# Prints True when PyTorch can use a CUDA device, False otherwise.
print(torch.cuda.is_available()) # check if gpu is working correctly

True


model without transfer learning (I will add that later; I just wanted to build one from scratch first)

In [8]:
# I chose to use a CNN for the image classification.
# CNNs perform much better than feed-forward networks for image classification tasks and are still easy to implement

class CNN(nn.Module):
    """Convolutional binary classifier for single-channel images.

    Three convolutional stages each apply conv -> LeakyReLU -> BatchNorm twice
    and finish with a 2x2 max-pool; the channel width grows 64 -> 128 -> 256.
    A global average pool then collapses the spatial dimensions, and a small
    fully connected head maps the 256 pooled features to ONE raw logit per
    sample. No sigmoid is applied inside the model.
    """

    def __init__(self):
        super().__init__()

        # Stage 1: 1 -> 64 -> 64 channels. Every convolution uses stride 1 and
        # padding = (kernel - 1) / 2, so only the pooling layers shrink H x W.
        self.conv1 = nn.Conv2d(1, 64, kernel_size=3, stride=1, padding=1)
        self.Lrelu1 = nn.LeakyReLU()
        self.bn1 = nn.BatchNorm2d(64)
        self.conv2 = nn.Conv2d(64, 64, kernel_size=5, stride=1, padding=2)
        self.Lrelu2 = nn.LeakyReLU()
        self.bn2 = nn.BatchNorm2d(64)
        self.maxpool1 = nn.MaxPool2d(kernel_size=2, stride=2)

        # Stage 2: 64 -> 128 -> 128 channels.
        self.conv3 = nn.Conv2d(64, 128, kernel_size=5, stride=1, padding=2)
        self.Lrelu3 = nn.LeakyReLU()
        self.bn3 = nn.BatchNorm2d(128)
        self.conv4 = nn.Conv2d(128, 128, kernel_size=7, stride=1, padding=3)
        self.Lrelu4 = nn.LeakyReLU()
        self.bn4 = nn.BatchNorm2d(128)
        self.maxpool2 = nn.MaxPool2d(kernel_size=2, stride=2)

        # Stage 3: 128 -> 256 -> 256 channels.
        self.conv5 = nn.Conv2d(128, 256, kernel_size=5, stride=1, padding=2)
        self.Lrelu5 = nn.LeakyReLU()
        self.bn5 = nn.BatchNorm2d(256)
        self.conv6 = nn.Conv2d(256, 256, kernel_size=3, stride=1, padding=1)
        self.Lrelu6 = nn.LeakyReLU()
        self.bn6 = nn.BatchNorm2d(256)
        self.maxpool3 = nn.MaxPool2d(kernel_size=2, stride=2)

        # Collapse each of the 256 feature maps to a single value.
        self.global_avg_pool = nn.AdaptiveAvgPool2d((1, 1))

        # Classifier head: 256 -> 512 -> 256 -> 1, with dropout (p = 0.4) on
        # the 256-wide layer as regularisation against overfitting.
        self.fc1 = nn.Linear(256, 512)
        self.relu1 = nn.LeakyReLU()
        self.fc3 = nn.Linear(512, 256)
        self.relu3 = nn.LeakyReLU()
        self.dropout2 = nn.Dropout(p=0.4)
        self.fc4 = nn.Linear(256, 1)

    def forward(self, x):
        """Return one raw logit per sample, shaped ``(batch, 1)``."""
        # Stage 1
        x = self.bn1(self.Lrelu1(self.conv1(x)))
        x = self.bn2(self.Lrelu2(self.conv2(x)))
        x = self.maxpool1(x)

        # Stage 2
        x = self.bn3(self.Lrelu3(self.conv3(x)))
        x = self.bn4(self.Lrelu4(self.conv4(x)))
        x = self.maxpool2(x)

        # Stage 3
        x = self.bn5(self.Lrelu5(self.conv5(x)))
        x = self.bn6(self.Lrelu6(self.conv6(x)))
        x = self.maxpool3(x)

        # Global pooling, then flatten everything except the batch axis.
        x = torch.flatten(self.global_avg_pool(x), 1)

        # Fully connected head; dropout sits between fc3 and its activation.
        x = self.relu1(self.fc1(x))
        x = self.relu3(self.dropout2(self.fc3(x)))

        # Final projection to a single logit; the sigmoid is left to the loss.
        return self.fc4(x)


model training loop

In [9]:
# run model on GPU
model = CNN()
model.to(device)

CNN(
  (conv1): Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (Lrelu1): LeakyReLU(negative_slope=0.01)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv2): Conv2d(64, 64, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
  (Lrelu2): LeakyReLU(negative_slope=0.01)
  (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (maxpool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv3): Conv2d(64, 128, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
  (Lrelu3): LeakyReLU(negative_slope=0.01)
  (bn3): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv4): Conv2d(128, 128, kernel_size=(7, 7), stride=(1, 1), padding=(3, 3))
  (Lrelu4): LeakyReLU(negative_slope=0.01)
  (bn4): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (maxpool2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilati

In [10]:
# Initial step size handed to the optimizer.
# NOTE(review): 0.1 is far above AdamW's documented default of 1e-3; confirm
# this starting value is intentional.
learning_rate = 0.1
# Binary cross-entropy on raw logits: the sigmoid is applied inside the loss,
# so the model itself does not apply one.
loss_fn = nn.BCEWithLogitsLoss()
# AdamW applies weight decay separately from the gradient-based update.
# Original author's note (unverified): higher peak RAM, so the batch size may
# have to be reduced.
optimizer = optim.AdamW(params=model.parameters(), lr=learning_rate)

In [11]:
def val_set_test():
    """Return the mean validation loss of the global ``model``.

    Runs ``model`` over every batch of ``val_loader`` in evaluation mode with
    gradient tracking disabled and scores the outputs with ``loss_fn``. The
    mean is weighted by batch size, so a shorter final batch does not skew
    the result. The model's previous train/eval mode is restored on exit.

    Returns:
        float: average loss per validation sample.

    Raises:
        ValueError: if ``val_loader`` yields no samples.
    """
    was_training = model.training
    model.eval()  # Set model to evaluation mode
    total_loss = 0.0
    n_samples = 0
    try:
        with torch.no_grad():  # Disable gradient computation
            for image, label in val_loader:
                # set up
                image = image.to(device).float()
                label = label.to(device).float()

                # Drop only the trailing size-1 axis. A bare squeeze() would
                # also drop the batch axis of a single-sample batch, and the
                # prediction would then no longer match the shape of `label`.
                predictions = model(image).squeeze(1)
                loss = loss_fn(predictions, label)

                # loss_fn returns the batch mean; scale it back to a sum so
                # the final average is per sample rather than per batch.
                batch_n = label.size(0)
                total_loss += loss.item() * batch_n
                n_samples += batch_n
    finally:
        model.train(was_training)  # restore whichever mode the caller had

    if n_samples == 0:
        raise ValueError("val_loader produced no samples; cannot compute a validation loss")
    return total_loss / n_samples
    

In [12]:
def training(epochs):
    """Train the global ``model`` with early stopping on the validation loss.

    Each epoch runs one pass over ``train_loader`` using the global
    ``optimizer`` and ``loss_fn``, then evaluates ``val_set_test()``. Training
    stops early once the validation loss has been worse than the best value
    seen so far for more than 10 epochs since the last improvement. The
    learning rate is multiplied by 0.1 every 20 epochs.

    NOTE(review): this function shares its name with the ``training`` split
    created by ``random_split`` earlier in the file; once this definition
    runs, that name refers to the function.

    Args:
        epochs: maximum number of passes over the training data.

    Returns:
        tuple: ``(model, training_losses)`` where ``training_losses`` holds,
        for every processed batch, the running average loss of its epoch.
    """
    training_losses = []  # To track loss history
    min_loss = float('inf') # es min loss
    patience = 0 # es track
    max_patience = 10 # stop once patience exceeds this many epochs
    avg_loss = float('nan') # stays defined even if train_loader yields no batches
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.1) # learning rate decay, 0.1 is default gamma
    for i in range(epochs):
        # Make sure dropout / batch-norm are in training mode for this epoch,
        # regardless of what the validation helper left behind.
        model.train()
        tloss = 0.0
        for n_rounds, (image, label) in enumerate(train_loader, start=1):
            # Move data to device and ensure correct data types
            image = image.to(device).float()
            label = label.to(device).float()

            # Zero gradients
            optimizer.zero_grad()

            # Forward pass. Drop only the trailing size-1 axis: a bare
            # squeeze() would also drop the batch axis of a single-sample
            # batch, and the prediction would no longer match `label`.
            predictions = model(image).squeeze(1)
            loss = loss_fn(predictions, label)

            # Backward pass and optimization
            loss.backward()
            optimizer.step()

            # Accumulate loss and record the running average for this epoch
            tloss += loss.item()
            avg_loss = tloss / n_rounds
            training_losses.append(avg_loss)

        # early stopping
        val_loss = val_set_test() # return loss for validation set

        if val_loss < min_loss:
            min_loss = val_loss
            patience = 0
        elif val_loss > min_loss:
            patience += 1

        if patience > max_patience: # early stopping after more than 10 rounds without improvement
            print(f"early stopping at round {i}")
            return model, training_losses

        scheduler.step() # step for learning rate decay
        print(f"{avg_loss} is the average loss at epoch {i}") # still provide avg and epoch after through early stopping

    return model, training_losses



In [13]:
# memory manage
torch.cuda.empty_cache()
gc.collect()

36

In [None]:
# Train for at most 300 epochs (training() may return earlier through its
# early-stopping check). t_model is the model returned by training() and
# t_loss is the recorded loss history.
t_model, t_loss = training(300)

In [None]:
results = [] # per-batch accuracy, kept for inspection
model.eval() # set model to eval mode, disables dropout

# One metric object for the whole test set, so precision is accumulated over
# every batch instead of reflecting only the last one.
precision = BinaryPrecision().to(device) # binary precision
n_correct = 0
n_seen = 0

# testing loop (no gradients are needed for evaluation)
with torch.no_grad():
    for images, labels in test_loader:
        images = images.to(device).float() # load onto device with correct data type
        labels = labels.to(device).long()

        # (batch, 1) logits -> (batch,), so the comparison with `labels` is
        # element-wise instead of broadcasting to a (batch, batch) matrix.
        logits = model(images).squeeze(1)

        # The model emits raw logits (the sigmoid lives in the loss function);
        # logit > 0 is the same decision as sigmoid(logit) > 0.5.
        preds_1_0 = (logits > 0).long()

        correct = (preds_1_0 == labels)
        results.append(correct.float().mean().item())
        n_correct += int(correct.sum().item())
        n_seen += labels.numel()

        precision.update(preds_1_0, labels)

# Per-sample accuracy over the whole test set (not a mean of batch means).
accuracy = n_correct / n_seen if n_seen else float("nan")
result = precision.compute() if n_seen else float("nan")
print(accuracy)
print(f"Binary Precision: {result}")
