### Import Required Modules and Functions

In [14]:
import numpy as np

import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as T

from torch.utils.data import Dataset, DataLoader
from PIL import Image

import os
import json

import matplotlib.pyplot as plt
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay

### Set Device to GPU

In [2]:
USE_GPU = True
dtype = torch.float32 

if USE_GPU and torch.cuda.is_available(): 
    device = torch.device('cuda')
else:
    device = torch.device('cpu')


### Prepare Data Loaders
##### Ensure That WildCam_3classes is in the correct location
##### Run Brightness_subset_maker.ipynb to create "brightest" image folder

In [3]:
class WildCamDataset(Dataset):
    def __init__(self, img_paths, annotations, transform=T.ToTensor(), directory='WildCam_3classes/train'):
        self.img_paths = img_paths
        self.annotations = annotations
        self.transform = transform
        self.dir = directory

    def __len__(self):
        return len(self.img_paths)

    def __getitem__(self, index):
        ID = '{}/{}'.format(self.dir, self.img_paths[index])
        img = Image.open(ID).convert('RGB')
        X = self.transform(img)             
        y = self.annotations['labels'][self.img_paths[index]]
        loc = self.annotations['locations'][self.img_paths[index]]
        return X, y, loc
    
normalize = T.Normalize(mean=[0.485, 0.456, 0.406],std=[0.229, 0.224, 0.225])
transform = T.Compose([
            T.Resize((112,112)),
            T.ToTensor(),
            normalize
])

param_train = {
    'batch_size': 256,       
    'shuffle': True
    }

param_valtest = {
    'batch_size': 256,
    'shuffle': False
    }

annotations = json.load(open('WildCam_3classes/annotations.json'))

train_images = sorted(os.listdir('WildCam_3classes/train'))
train_dset = WildCamDataset(train_images, annotations, transform, directory='WildCam_3classes/train')
train_loader = DataLoader(train_dset, **param_train)

val_images = sorted(os.listdir('WildCam_3classes/val'))
val_dset = WildCamDataset(val_images, annotations, transform, directory="WildCam_3classes/val")
val_loader = DataLoader(val_dset, **param_valtest)

test_images = sorted(os.listdir('WildCam_3classes/test'))
test_dset = WildCamDataset(test_images, annotations, transform, directory="WildCam_3classes/test")
test_loader = DataLoader(test_dset, **param_valtest)

brightest_labels = json.load(open('WildCam_3classes/brightest_labels.json'))

bright_images = sorted(os.listdir('WildCam_3classes/brightest'))
bright_dset = WildCamDataset(bright_images, brightest_labels, transform, directory="WildCam_3classes/brightest")
bright_loader = DataLoader(bright_dset, **param_valtest)

### Define ResNet+ Model

In [15]:
# Hyperparameters
channel_1 = 64
channel_2 = 128
channel_3 = 256
hidden_layer_1 = 256
hidden_layer_2 = 128
learning_rate = 1e-3
epochs = 5
dropout_rate = 0.4

class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1, downsample=None, use_se=False):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample
        self.use_se = use_se
        if self.use_se:
            self.se_block = SEBlock(out_channels)

    def forward(self, x):
        identity = x
        if self.downsample is not None:
            identity = self.downsample(x)

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        if self.use_se:
            out = self.se_block(out)
        out += identity
        out = self.relu(out)
        return out

class SEBlock(nn.Module):
    def __init__(self, channels, reduction=16):
        super(SEBlock, self).__init__()
        self.fc1 = nn.Linear(channels, channels // reduction)
        self.fc2 = nn.Linear(channels // reduction, channels)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        batch, channels, _, _ = x.size()
        y = x.mean((2, 3))  
        y = self.fc1(y)
        y = self.relu(y)
        y = self.fc2(y)
        y = self.sigmoid(y).view(batch, channels, 1, 1)
        return x * y

class BrightResNet18(nn.Module):
    def __init__(self, num_classes=3):
        super(BrightResNet18, self).__init__()
        self.in_channels = 64  
        self.conv1 = nn.Conv2d(3, 64, kernel_size=9, stride=2, padding=4, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.layer1 = self._make_layer(BasicBlock, 64, 2)
        self.layer2 = self._make_layer(BasicBlock, 128, 2, stride=2)
        self.layer3 = self._make_layer(BasicBlock, 256, 2, stride=2, use_se=True)
        self.layer4 = self._make_layer(BasicBlock, 512, 2, stride=2, use_se=True)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * BasicBlock.expansion, num_classes)

    def _make_layer(self, block, out_channels, blocks, stride=1, use_se=False):
        downsample = None
        if stride != 1 or self.in_channels != out_channels * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, out_channels * block.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels * block.expansion),
            )
        layers = []
        layers.append(block(self.in_channels, out_channels, stride, downsample, use_se=use_se))
        self.in_channels = out_channels * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.in_channels, out_channels, use_se=use_se))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x


model = BrightResNet18(3)  

optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss()

### Perform Training
##### Same Across all 3 models
##### If models stored locally, skip

In [6]:
def train(model, optimizer, loader_train, epochs=5, print_every=1):
    iteration_loss = []  
    model = model.to(device=device)

    for e in range(epochs):
        for t, (x, y, _) in enumerate(loader_train):
            model.train()
            x, y = x.to(device), y.to(device)

            scores = model(x)
            loss = criterion(scores, y)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            iteration_loss.append(loss.item())  # Track loss for each iteration
            if t % print_every == 0:
                print(f"Epoch {e}, Iteration {t}, loss = {loss.item():.4f}")
        
        print(f"Epoch {e} complete.")
    
    # Plot training loss per iteration
    plt.plot(iteration_loss, label='Training Loss (Pretrained)')
    plt.xlabel('Iterations')
    plt.ylabel('Loss')
    plt.title('Training Loss Across Iterations (Pretrained Model)')
    plt.legend()
    plt.show()

In [None]:
train(model, optimizer, train_loader, 5, 1)

In [8]:
torch.save(model, 'resnet_plus.pth')

### Load stored model

In [8]:
resnet_plus_loaded = torch.load('resnet_plus.pth')
resnet_plus_loaded = resnet_plus_loaded.to(device)

  resnet_plus_loaded = torch.load('resnet_plus.pth')


### Collect Classification Report and Confusion Matrix for Model on each data_loader

In [9]:
def evaluate(model, loader, device):
    model.eval()  
    y_true = []
    y_pred = []

    with torch.no_grad():
        for x, y, _ in loader:  
            x, y = x.to(device), y.to(device)

            scores = model(x)
            _, preds = scores.max(1)

            y_true.extend(y.cpu().numpy()) 
            y_pred.extend(preds.cpu().numpy())  

    print("Classification Report:")
    print(classification_report(y_true, y_pred, target_names=['Rabbit', 'Bobcat', 'Cat']))

    return y_true, y_pred

In [None]:
print("ResNet+ Test Set Evaluation:")
y_true_custom, y_pred_custom = evaluate(resnet_plus_loaded, test_loader, device)

print("ResNet+ Validation Set Evaluation:")
y_true_custom, y_pred_custom = evaluate(resnet_plus_loaded, val_loader, device)

print("ResNet+ Bright Set Evaluation:")
y_true_custom, y_pred_custom = evaluate(resnet_plus_loaded, bright_loader, device)

In [11]:
def check_accuracy_and_confusion_matrix(loader, model):
    num_correct = 0
    num_samples = 0
    all_preds = []
    all_labels = []

    class_names = ["rabbit", "bobcat", "cat"]
    num_classes = len(class_names)
    class_correct = np.zeros(num_classes)  
    class_samples = np.zeros(num_classes)  

    model.eval()  
    with torch.no_grad(): 
        for x, y, _ in loader:
            x = x.to(device=device, dtype=dtype)
            y = y.to(device=device, dtype=torch.long)
            scores = model(x)
            _, preds = scores.max(1)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(y.cpu().numpy())
            
            num_correct += (preds == y).sum().item()
            num_samples += preds.size(0)
            
            for i in range(num_classes):
                class_correct[i] += ((preds == y) & (y == i)).sum().item()
                class_samples[i] += (y == i).sum().item()

    overall_acc = float(num_correct) / num_samples
    print(f'Overall accuracy: {num_correct} / {num_samples} ({100 * overall_acc:.2f}%)')
    
    for i in range(num_classes):
        if class_samples[i] > 0:
            class_acc = float(class_correct[i]) / class_samples[i]
            print(f'Accuracy for class {i} ({class_names[i]}): {class_correct[i]} / {class_samples[i]} ({100 * class_acc:.2f}%)')
        else:
            print(f'No samples for class {i} ({class_names[i]})')

    cm = confusion_matrix(all_labels, all_preds)

    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
    disp.plot(cmap="Blues")
    plt.title("Confusion Matrix")
    plt.xlabel("Predicted Labels")
    plt.ylabel("True Labels")
    plt.show()

In [None]:
print("ResNet+ Test Set Confusion Matrix")
check_accuracy_and_confusion_matrix(test_loader, resnet_plus_loaded)

print("ResNet+ Validation Set Confusion Matrix")
check_accuracy_and_confusion_matrix(val_loader, resnet_plus_loaded)

print("ResNet+ Bright Set Confusion Matrix")
check_accuracy_and_confusion_matrix(bright_loader, resnet_plus_loaded)