In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import sys
import time
import torch
from sklearn.metrics import accuracy_score, precision_score, recall_score

In [2]:
# Add the project's data-preparation directory to the import path so the local
# `Preparation` module can be found. The path is relative, so it is resolved
# against the notebook's current working directory.
sys.path.append('../../IPYNB/DataPreparation')

# Project-local wrapper used below to build the train/val/test loaders.
from Preparation import CustomDataLoader

In [3]:
# Settings shared by every CustomDataLoader constructed in this notebook.
# NOTE(review): MEAN/STD are presumably per-channel normalization statistics for
# 3-channel images -- confirm against the Preparation module.
MEAN = [0.5] * 3
STD = [0.5] * 3
BATCH_SIZE = 4

In [4]:
# Build one loader per dataset split. All three splits share the same data root,
# batch size and mean/std settings; only `dataset_type` differs.
_loader_kwargs = dict(
    data_path="../../all_data/data",
    batch_size=BATCH_SIZE,
    mean=MEAN,
    std=STD,
)
train_loader = CustomDataLoader(dataset_type="train", **_loader_kwargs).data_loader
val_loader = CustomDataLoader(dataset_type="val", **_loader_kwargs).data_loader
test_loader = CustomDataLoader(dataset_type="test", **_loader_kwargs).data_loader

In [5]:
class SpatialAttention(nn.Module):
    """Sigmoid gate computed from globally pooled features.

    The input is average- and max-pooled down to a 1x1 map per channel, the two
    pooled tensors are stacked along the channel axis, and a 1x1 convolution
    followed by a sigmoid turns them into a single-channel gate that multiplies
    the input.

    NOTE(review): because both poolings collapse the spatial dimensions to 1x1,
    the gate holds one value per sample (it broadcasts over channels and
    positions) rather than a per-position map -- confirm this is intended.

    Args:
        in_channels: Channel count of the feature map passed to ``forward``.
    """

    def __init__(self, in_channels):
        super().__init__()
        # Global poolings: each channel is reduced to a single value.
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.max_pool = nn.AdaptiveMaxPool2d(1)

        # 1x1 convolution mixing the stacked (avg, max) descriptors into one channel.
        self.conv = nn.Conv2d(in_channels * 2, 1, kernel_size=1)

        # Squashes the gate into (0, 1).
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Gate ``x`` and return both the gated features and the gate.

        Args:
            x: Batched feature map whose channel dimension (dim 1) has
                ``in_channels`` entries.

        Returns:
            Tuple ``(gated, gate)`` where ``gated`` is ``x * gate`` and ``gate``
            is the sigmoid output of the 1x1 convolution.
        """
        # Stack the average- and max-pooled descriptors along the channel axis.
        descriptors = torch.cat([self.avg_pool(x), self.max_pool(x)], dim=1)

        # Project to one channel and squash to obtain the gate.
        gate = self.sigmoid(self.conv(descriptors))

        # Broadcasting applies the gate to every element of x.
        return x * gate, gate


# Define the architecture of the model
class CustomModel(nn.Module):
    """Two-stage CNN with an attention gate and a 2-way linear classifier head.

    Layout: conv(3->32) + ReLU + 2x2 max-pool, conv(32->64) + ReLU + 2x2 max-pool,
    attention gate, then fully connected layers 64*8*8 -> 128 -> 2.

    The first fully connected layer expects 64 * 8 * 8 features per sample, so
    the input must be a batch of 3-channel images whose spatial size is reduced
    to 8x8 by the two poolings (e.g. 32x32 images).
    """

    def __init__(self):
        super(CustomModel, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, 3, padding=1)
        self.pool1 = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.pool2 = nn.MaxPool2d(2, 2)
        self.spatial_attention = SpatialAttention(in_channels=64)
        self.fc1 = nn.Linear(64 * 8 * 8, 128)
        self.fc2 = nn.Linear(128, 2)

    def forward(self, x):
        """Compute class logits for a batch of images.

        Args:
            x: Batched image tensor with 3 channels (see the class docstring for
                the expected spatial size).

        Returns:
            Tensor of shape (batch, 2) holding unnormalized class scores.

        Raises:
            ValueError: If the flattened feature size per sample does not match
                what ``fc1`` expects (i.e. the input spatial size is wrong).
        """
        x = self.pool1(F.relu(self.conv1(x)))
        x = self.pool2(F.relu(self.conv2(x)))

        # Apply the attention gate; the gate itself is not needed here.
        x, _ = self.spatial_attention(x)

        # Flatten per sample. Using view(-1, 64 * 8 * 8) here would silently fold
        # surplus spatial elements into the batch dimension for inputs of the
        # wrong size, producing more output rows than input samples.
        x = torch.flatten(x, 1)
        if x.size(1) != self.fc1.in_features:
            raise ValueError(
                'CustomModel expected {} features per sample after pooling but got {}; '
                'check the input image size'.format(self.fc1.in_features, x.size(1)))

        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# Build the network and move its parameters onto the GPU
model = CustomModel()
model.cuda()

# Cross-entropy objective, optimized with Adam at a learning rate of 1e-3
loss_function = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)


In [None]:
# Train for a fixed number of epochs and validate after each one (requires val_loader).
# Every epoch writes an 'epoch_<n>.pt' checkpoint, and the weights with the lowest
# validation loss seen so far are additionally saved as 'best_model.pt'.
best_val_loss = float('inf')

num_epochs = 25
device = 'cuda'

for epoch in range(num_epochs):
    model.train()  # Set the model to training mode
    for batch_idx, (data, target) in enumerate(train_loader):
        optimizer.zero_grad()
        # Move data to the appropriate device if using GPU
        data, target = data.to(device), target.to(device)
        output = model(data)
        loss = loss_function(output, target)
        loss.backward()
        # Clip the global gradient norm to 1.0 before the optimizer step
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        if batch_idx % 100 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch + 1, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))

    # Validation
    model.eval()  # Set the model to evaluation mode
    val_loss = 0.0
    correct = 0
    num_val_samples = 0
    with torch.no_grad():
        for data, target in val_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            samples_in_batch = target.size(0)
            # loss_function returns the mean over the batch, so weight it by the
            # batch size to accumulate a per-sample sum. Dividing a sum of batch
            # means by the dataset size would under-report the loss by roughly
            # the batch size.
            val_loss += loss_function(output, target).item() * samples_in_batch
            num_val_samples += samples_in_batch
            pred = output.argmax(dim=1, keepdim=True)  # Index of the largest logit
            correct += pred.eq(target.view_as(pred)).sum().item()

    # Average over the samples actually seen; max(..., 1) avoids a division by
    # zero when the validation loader yields nothing.
    val_loss /= max(num_val_samples, 1)
    accuracy = correct / max(num_val_samples, 1) * 100
    print('Validation set: Average loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)'.format(
        val_loss, correct, num_val_samples, accuracy))

    # Per-epoch checkpoint (file name uses the 0-based epoch index)
    torch.save(model.state_dict(), f'epoch_{epoch}.pt')

    # Save the model if validation loss has decreased
    if val_loss < best_val_loss:
        print('Validation loss decreased ({:.6f} --> {:.6f}). Saving model...'.format(
            best_val_loss, val_loss))
        torch.save(model.state_dict(), 'best_model.pt')
        best_val_loss = val_loss


In [None]:
# Instantiate a fresh model on the GPU
model = CustomModel().cuda()

# Load the saved model parameters
# NOTE(review): this path differs from the 'best_model.pt' written to the working
# directory by the training cell -- presumably the checkpoint was moved by hand; confirm.
model.load_state_dict(torch.load('../../models/05-25-2024/best_model.pt'))

# Evaluation mode: a freshly constructed module starts in training mode
model.eval()

# Measure the accuracy of the loaded weights on the validation loader
correct = 0
total = 0
with torch.no_grad():
    for data, target in val_loader:
        output = model(data.cuda())
        predicted = output.argmax(dim=1)  # Index of the largest logit per sample
        total += target.size(0)
        # Targets stay on the CPU, so bring the predictions back before comparing
        correct += (predicted.cpu() == target).sum().item()

# Report the number of images actually evaluated instead of a hard-coded count,
# and avoid dividing by zero when the loader is empty.
if total > 0:
    print('Accuracy of the model on the {} val images: {} %'.format(total, 100 * correct / total))
else:
    print('Accuracy of the model on the 0 val images: undefined (val_loader is empty)')

In [16]:
# Defining F2 Score evaluation metric
def f2_score(precision, recall, beta=2):
    """Combine precision and recall into an F-beta score (F2 by default).

    Computes ``(1 + beta^2) * P * R / (beta^2 * P + R)``.

    Args:
        precision: Precision value.
        recall: Recall value.
        beta: Weight of recall relative to precision. Defaults to 2, which
            gives the F2 score.

    Returns:
        The F-beta score, or 0.0 when the denominator is zero (for example when
        precision and recall are both 0) instead of dividing by zero.
    """
    beta_squared = beta ** 2
    denominator = beta_squared * precision + recall

    # With no precision and no recall the score is defined as 0 here, rather
    # than raising ZeroDivisionError (or yielding NaN for numpy floats).
    if denominator == 0:
        return 0.0

    return (1 + beta_squared) * (precision * recall) / denominator

In [None]:
# Predict the classes for the test dataset, timing the forward passes
predicted_classes = []
y_true = []
total_inference_time = 0.0  # Accumulated forward-pass time in seconds
num_images = 0  # Number of images processed

model.eval()  # Make sure the model is in evaluation mode
with torch.no_grad():
    for data, target in test_loader:
        # Move data to GPU
        data = data.cuda()

        # CUDA work is launched asynchronously. Wait for pending work (including
        # the copy above) so the timer starts from an idle device.
        torch.cuda.synchronize()
        start_time = time.perf_counter()

        # Model inference for the entire batch
        output = model(data)

        # Wait for the forward pass to actually finish before stopping the timer;
        # otherwise only the kernel-launch overhead would be measured.
        torch.cuda.synchronize()
        end_time = time.perf_counter()

        # Calculate inference time for the batch and add to total
        total_inference_time += (end_time - start_time)

        # Predicted class = index of the largest logit per sample
        predicted = output.argmax(dim=1)

        # Collect predictions and ground-truth labels as plain Python lists
        predicted_classes.extend(predicted.cpu().tolist())
        y_true.extend(target.cpu().tolist())

        # Update the total number of images
        num_images += data.size(0)

if num_images == 0:
    raise RuntimeError('test_loader produced no images; cannot compute timing or metrics')

# Calculate the average inference time per image
average_inference_time = total_inference_time / num_images
print(f"Average inference time per image: {average_inference_time:.6f} seconds")

# Calculate different metrics (precision/recall are support-weighted class averages)
accuracy = accuracy_score(y_true, predicted_classes)
precision = precision_score(y_true, predicted_classes, average='weighted')
recall = recall_score(y_true, predicted_classes, average='weighted')
# NOTE(review): this F2 is computed from the already-averaged precision and recall,
# which is not in general equal to averaging per-class F2 scores (what
# sklearn.metrics.fbeta_score(..., beta=2, average='weighted') reports) -- confirm
# which definition is intended.
f2 = f2_score(precision, recall)

print('Accuracy:', accuracy)
print('Precision:', precision)
print('Recall:', recall)
print('F2-score:', f2)
