# ResNet50

In [1]:
import torch
# Create new tensors on the GPU when CUDA is usable; otherwise fall back to the
# CPU so the notebook can still run on machines without a GPU.
torch.set_default_device('cuda' if torch.cuda.is_available() else 'cpu')

In [2]:
import pandas as pd
from PIL import Image
from torch.utils.data import Dataset
import torchvision.transforms as transforms

class CustomDataset(Dataset):
  """Image-classification dataset backed by a CSV index file.

  The CSV is read with ``pd.read_csv``. Column 0 of each row is used as the
  image path and column 1 as the label string, which must be one of the keys
  of ``label_dict`` ("not_fish" -> 0, "fish" -> 1).

  Args:
    csv_file: Path of the CSV index file.
    transform: Optional callable applied to the loaded RGB image.
  """

  def __init__(self, csv_file, transform=None):
    self.data = pd.read_csv(csv_file)
    self.transform = transform
    self.label_dict = {"not_fish": 0, "fish": 1}

  def __len__(self):
    """Return the number of rows in the CSV index."""
    return len(self.data)

  def __getitem__(self, idx):
    """Return ``(image, encoded_label)`` for row ``idx``.

    Raises:
      KeyError: If the row's label is not a key of ``label_dict``.
    """
    # get image file path from csv file
    img_name = self.data.iloc[idx, 0]
    # Open inside a context manager so the file handle is released right away,
    # and force three channels: grayscale, palette or RGBA files would
    # otherwise not match a 3-channel normalization downstream.
    with Image.open(img_name) as img:
      image = img.convert("RGB")

    # get the image label from csv file and encode it
    label = self.data.iloc[idx, 1]
    try:
      encoded_label = self.label_dict[label]
    except KeyError:
      raise KeyError(
          f"Unknown label {label!r} at row {idx}; "
          f"expected one of {sorted(self.label_dict)}"
      ) from None

    if self.transform:
      image = self.transform(image)

    return image, encoded_label


# Preprocessing pipeline applied to every input image, in order
# NOTE(review): the resize target is 244x244, whereas torchvision's ResNet
# reference preprocessing uses 224 — confirm 244 is intended and not a typo.
_preprocessing_steps = [
    transforms.Resize((244, 244)),
    transforms.ToTensor(),
    # per-channel normalization for color images
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]
transform = transforms.Compose(_preprocessing_steps)


# Build the train and test datasets from their CSV index files;
# both share the same preprocessing pipeline
train_set = CustomDataset("train.csv", transform=transform)
test_set = CustomDataset("test.csv", transform=transform)

In [3]:
# Report the number of samples per split, one per line (train, then test)
print(len(train_set), len(test_set), sep="\n")

3407
852


In [4]:
from torch.utils.data import DataLoader
batch_size = 32

# The generator handed to a DataLoader has to live on the device that newly
# created tensors default to. Derive that device from an empty tensor instead
# of hard-coding 'cuda', so the loaders also work when the default device is
# the CPU.
default_device = torch.empty(0).device

# Define data loaders
train_loader = DataLoader(
    train_set,
    batch_size=batch_size,
    shuffle=True,  # reshuffle the training samples every epoch
    pin_memory=False,
    generator=torch.Generator(device=default_device),
)

test_loader = DataLoader(
    test_set,
    batch_size=batch_size,
    shuffle=False,  # keep evaluation order fixed
    pin_memory=False,
    generator=torch.Generator(device=default_device),
)

In [5]:
import torch
# Pick the compute device: the GPU when CUDA is usable, otherwise the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
device

device(type='cuda')

In [6]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.models as models

# Define the ResNet50 model
class ResNet50(nn.Module):
    """Pre-trained torchvision ResNet50 with a single-output sigmoid head.

    The backbone's final fully connected layer is replaced by a one-unit
    linear layer, and the forward pass applies a sigmoid to that unit, so each
    input yields one value between 0 and 1.
    """

    def __init__(self):
        super().__init__()
        # Backbone initialised from the default pre-trained weights
        backbone = models.resnet50(weights='ResNet50_Weights.DEFAULT')
        # Swap the final fully connected layer for a one-unit head
        # (binary classification, so a single output is enough)
        backbone.fc = nn.Linear(backbone.fc.in_features, 1)
        self.resnet50 = backbone

        # Sigmoid turns the head's raw score into a probability
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Run ``x`` through the backbone and return the sigmoid of its output."""
        return self.sigmoid(self.resnet50(x))

# Build the network and place it on the selected device in a single step
model = ResNet50().to(device)

In [7]:
# Binary cross-entropy on the model's probabilities, optimised with Adam
loss_fn = nn.BCELoss()
loss_fn = loss_fn.to(device)
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)

In [9]:
import os
import time

# Training loop
num_epochs = 10

# Create the checkpoint directory up front: torch.save does not create missing
# parent directories, so without this a fresh run would fail only after the
# first full epoch has already been trained.
os.makedirs("Checkpoints", exist_ok=True)

start_time = time.time()
for epoch in range(num_epochs):
    epoch_start_time = time.time()

    # Set the model to training mode
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass; drop the trailing size-1 dimension so the outputs line
        # up with the 1-D label batch
        outputs = model(images).squeeze(dim=1)
        loss = loss_fn(outputs, labels.float())

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        # Track the accuracy and loss; rounding the probability gives the
        # predicted class (0 or 1)
        predicted = torch.round(outputs)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        running_loss += loss.item()

    # Loss is averaged over batches, accuracy over individual samples
    epoch_loss = running_loss / len(train_loader)
    epoch_accuracy = correct / total

    epoch_time_minutes = (time.time() - epoch_start_time) / 60.0
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.4f}, Time: {epoch_time_minutes:.2f} minutes")

    # Save a model checkpoint after every epoch
    torch.save(model.state_dict(), f"Checkpoints/resNet50_model_bth{batch_size}_eph{num_epochs}_{epoch+1}.pth")

total_training_time_minutes = (time.time() - start_time) / 60.0
print(f"Total training time: {total_training_time_minutes:.2f} minutes")

Epoch [1/10], Loss: 0.0367, Accuracy: 0.9894, Time: 0.62 minutes
Epoch [2/10], Loss: 0.0589, Accuracy: 0.9847, Time: 0.62 minutes
Epoch [3/10], Loss: 0.0406, Accuracy: 0.9868, Time: 0.63 minutes
Epoch [4/10], Loss: 0.0275, Accuracy: 0.9912, Time: 0.63 minutes
Epoch [5/10], Loss: 0.0263, Accuracy: 0.9935, Time: 0.62 minutes
Epoch [6/10], Loss: 0.0253, Accuracy: 0.9912, Time: 0.63 minutes
Epoch [7/10], Loss: 0.0225, Accuracy: 0.9924, Time: 0.63 minutes
Epoch [8/10], Loss: 0.0107, Accuracy: 0.9959, Time: 0.62 minutes
Epoch [9/10], Loss: 0.0068, Accuracy: 0.9977, Time: 0.63 minutes
Epoch [10/10], Loss: 0.0022, Accuracy: 0.9994, Time: 0.62 minutes
Total training time: 6.26 minutes


# Test Model

In [15]:
# Load the saved model state from the .pth file.
# map_location puts the tensors on the active device even if the checkpoint was
# written from a different one; weights_only=True restricts unpickling to
# tensors and plain containers, which is all a state_dict needs.
saved_model_path = "Checkpoints/resNet50_model_bth32_eph10_10.pth"
model.load_state_dict(torch.load(saved_model_path, map_location=device, weights_only=True))

<All keys matched successfully>

In [16]:
# Set the model to evaluation mode so that mode-dependent layers use their
# inference behavior during the test run below
model.eval()

# Define a function to evaluate the model on the test loader
def evaluate_model(model, test_loader):
    """Compute classification accuracy of ``model`` over ``test_loader``.

    The model is switched to evaluation mode for the duration of the call and
    its previous train/eval mode is restored afterwards. Batches are moved to
    the device of the model's parameters, so the function does not depend on
    any notebook-level ``device`` variable.

    Args:
        model: Module whose output has shape ``(batch, 1)`` and holds
            probabilities; each one is rounded to obtain the predicted class.
        test_loader: Iterable yielding ``(images, labels)`` batches.

    Returns:
        Fraction of samples whose rounded prediction equals the label.

    Raises:
        ValueError: If ``test_loader`` yields no samples.
    """
    # Follow the model's own device; a model without parameters leaves the
    # batches where they already are.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else None

    was_training = model.training
    model.eval()

    correct = 0
    total = 0
    try:
        with torch.no_grad():
            for images, labels in test_loader:
                if target_device is not None:
                    images = images.to(target_device)
                    labels = labels.to(target_device)
                outputs = model(images).squeeze(dim=1)
                predicted = torch.round(outputs)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
    finally:
        # Leave the model in whatever mode the caller had it in
        model.train(was_training)

    if total == 0:
        raise ValueError("test_loader yielded no samples; accuracy is undefined")

    accuracy = correct / total
    return accuracy

# Score the loaded model on the held-out test split and report the accuracy
test_accuracy = evaluate_model(model=model, test_loader=test_loader)
print(f"Test Accuracy: {test_accuracy:.4f}")

Test Accuracy: 1.0000
