In [None]:
#Importing libraries

In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
from torchsummary import summary
from tqdm import tqdm
import os
import shutil
from sklearn.model_selection import train_test_split
import seaborn as sns
from torchvision.datasets import ImageFolder
from sklearn.metrics import confusion_matrix

In [None]:
# Create a ResNet50 baseline without pretrained weights

In [None]:
# Locations of the three dataset splits (hard-coded absolute paths).
test_dir = 'C:/Users/Kranti/Desktop/EE541_project/archive_new/Testing/testing'
train_dir = 'C:/Users/Kranti/Desktop/EE541_project/archive_new/Training'
val_dir = 'C:/Users/Kranti/Desktop/EE541_project/archive_new/Validation'


def _print_class_counts(split_name, split_dir):
    """Print how many entries each class sub-directory of ``split_dir`` contains.

    Args:
        split_name: Heading printed before the counts (e.g. ``'Training set'``).
        split_dir: Directory whose immediate sub-directories are the classes.
    """
    print(split_name + ':')
    # sorted() makes the output order deterministic; os.listdir() order is arbitrary.
    for class_name in sorted(os.listdir(split_dir)):
        class_dir = os.path.join(split_dir, class_name)
        # Only sub-directories are classes. Stray files (Thumbs.db, .DS_Store, ...)
        # would make os.listdir(class_dir) raise, so skip them.
        if not os.path.isdir(class_dir):
            continue
        print(class_name + ':', len(os.listdir(class_dir)))


# Print the number of images in each category for the training and validation sets
_print_class_counts('Training set', train_dir)
_print_class_counts('Validation set', val_dir)

# Mini-batch size and number of baseline training epochs
batch_size = 32
epochs = 10

# ResNet50 constructed without passing any weights argument (no pretrained checkpoint requested).
model = torchvision.models.resnet50()

# Replace the final fully connected layer with a binary-classification head:
# Linear -> ReLU -> Dropout -> Linear(1) -> Sigmoid.
# Because the head ends in Sigmoid, the model outputs probabilities in (0, 1);
# downstream code must not apply a second sigmoid.
num_features = model.fc.in_features  # read from the backbone instead of hard-coding 2048
model.fc = torch.nn.Sequential(
    torch.nn.Linear(in_features=num_features, out_features=2048),
    torch.nn.ReLU(),
    torch.nn.Dropout(0.8),  # p=0.8: each hidden activation is zeroed with probability 0.8 in train mode
    torch.nn.Linear(in_features=2048, out_features=1),
    torch.nn.Sigmoid()
)

# Use the first CUDA GPU when one is visible, otherwise stay on the CPU,
# and move the model's parameters there.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
model.to(device)

# Training components:
#   * BCELoss  - binary cross-entropy computed on probabilities
#   * Adam     - learning rate 1e-3 with weight decay 1e-2
#   * StepLR   - multiplies the learning rate by 0.1 every 5 calls to scheduler.step()
criterion = nn.BCELoss()
optimizer = optim.Adam(
    model.parameters(),
    lr=0.001,
    weight_decay=0.01,
)
scheduler = optim.lr_scheduler.StepLR(
    optimizer,
    step_size=5,
    gamma=0.1,
)

# Per-channel mean/std shared by both pipelines
# (NOTE(review): these look like the usual ImageNet statistics - confirm they suit this dataset).
channel_mean = [0.485, 0.456, 0.406]
channel_std = [0.229, 0.224, 0.225]

# Training pipeline: random augmentation (random resized crop to 224, random horizontal flip).
train_transforms = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(channel_mean, channel_std),
])

# Validation pipeline: deterministic resize to 256 followed by a 224 centre crop.
val_transforms = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(channel_mean, channel_std),
])

# ImageFolder derives the class of each image from the sub-directory it lives in.
train_dataset = datasets.ImageFolder(root=train_dir, transform=train_transforms)
val_dataset = datasets.ImageFolder(root=val_dir, transform=val_transforms)

# Both loaders share batch size and worker count; only the training loader shuffles.
loader_options = {"batch_size": batch_size, "num_workers": 4}
train_loader = DataLoader(train_dataset, shuffle=True, **loader_options)
val_loader = DataLoader(val_dataset, shuffle=False, **loader_options)


In [None]:
# Per-epoch history of loss and accuracy for the training and validation sets
train_losses = []
train_accuracies = []
val_losses = []
val_accuracies = []

# Train the model
for epoch in range(epochs):
    # ---- Training pass (dropout active, BatchNorm uses batch statistics) ----
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        # BCELoss needs float targets with the same shape as the model output,
        # hence the cast and the added trailing dimension.
        targets = labels.float().unsqueeze(1)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
        # loss is a batch mean; weight it by batch size so the epoch average is per-sample.
        running_loss += loss.item() * inputs.size(0)
        predicted = (outputs > 0.5).float()
        total += labels.size(0)
        correct += (predicted == targets).sum().item()
    train_loss = running_loss / len(train_dataset)
    train_acc = correct / total

    # ---- Validation pass (no gradients, eval-mode layers) ----
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            targets = labels.float().unsqueeze(1)
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            val_loss += loss.item() * inputs.size(0)
            predicted = (outputs > 0.5).float()
            total += labels.size(0)
            correct += (predicted == targets).sum().item()
    val_loss = val_loss / len(val_dataset)
    val_acc = correct / total

    # Append the training and validation accuracy and loss values
    train_losses.append(train_loss)
    train_accuracies.append(train_acc)
    val_losses.append(val_loss)
    val_accuracies.append(val_acc)

    print(f'Epoch {epoch+1}/{epochs} -- Training Loss: {train_loss:.4f} -- Training Accuracy: {train_acc:.4f} -- Validation Loss: {val_loss:.4f} -- Validation Accuracy: {val_acc:.4f}')

    # Advance the learning-rate schedule once per epoch. Without this call the
    # StepLR scheduler is never applied and the learning rate stays at its initial value.
    scheduler.step()


In [None]:
# Deterministic preprocessing for the held-out test images
test_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

test_dataset = ImageFolder(test_dir, transform=test_transforms)
# Evaluation metrics do not depend on sample order, so the test set is not shuffled.
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Flat lists of integer class ids, filled batch by batch for the confusion matrix
true_labels = []
predicted_labels = []

# Evaluate the model on the test set
model.eval()
test_loss = 0.0
correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        targets = labels.float().unsqueeze(1)
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        test_loss += loss.item() * inputs.size(0)
        # The model head already ends in Sigmoid, so `outputs` are probabilities.
        # Applying sigmoid again would squash them above 0.5 and make every
        # prediction 1; threshold the probabilities directly instead.
        predicted = (outputs > 0.5).float()
        total += labels.size(0)
        correct += (predicted == targets).sum().item()

        # Store 1-D integer labels (not nested [[x], ...] lists) for sklearn
        true_labels.extend(labels.tolist())
        predicted_labels.extend(predicted.view(-1).long().tolist())

test_loss = test_loss / len(test_dataset)
test_acc = correct / total
print(f'Test Loss: {test_loss:.4f} -- Test Accuracy: {test_acc:.4f}')

# X axis: one point per recorded epoch. Derived from the history itself so the
# plot still works if training stopped before `epochs` iterations.
epoch_axis = range(1, len(train_accuracies) + 1)

# Plot the training and validation accuracy curves
plt.figure()
plt.plot(epoch_axis, train_accuracies, label='Training Accuracy')
plt.plot(epoch_axis, val_accuracies, label='Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title('Training and Validation Accuracy')
plt.legend()
plt.show()

# Plot the training and validation loss curves
plt.figure()
plt.plot(epoch_axis, train_losses, label='Training Loss')
plt.plot(epoch_axis, val_losses, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Validation Loss')  # was mislabelled as "Accuracy"
plt.legend()
plt.show()

# Confusion matrix of the test-set predictions: rows are true classes, columns are predictions.
confusion_matrix_test = confusion_matrix(true_labels, predicted_labels)
print(confusion_matrix_test)

# Render the matrix as an annotated heatmap on an explicit figure/axes pair.
cm_fig, cm_ax = plt.subplots(figsize=(8, 6))
sns.heatmap(
    confusion_matrix_test,
    annot=True,
    cmap='Blues',
    fmt='g',
    ax=cm_ax,
)
cm_ax.set_title('Confusion Matrix (Test Set)')
cm_ax.set_xlabel('Predicted Labels')
cm_ax.set_ylabel('True Labels')
plt.show()

In [None]:
# Print torchsummary's layer-by-layer report of the model for a 3x224x224 input.
summary(model, input_size=(3,224,224))

In [None]:
# Save the model's state_dict (parameters and buffers, not the module object itself)
# as it stands at this point in the notebook, i.e. before the fine-tuning cell runs.
# NOTE(review): hard-coded absolute path; later checkpoints are written relative to the working directory.
torch.save(model.state_dict(), "C:/Users/Kranti/Desktop/EE541_project/model_state.pth")

In [None]:
# Visualize the conv1 layer's output feature maps before fine-tuning

In [None]:
def visualize_hook(module, input, output):
    """Forward hook that plots each channel of a layer's output as a grayscale image.

    Only the first sample of the batch is shown. Channels are laid out on a
    grid that is 8 columns wide, with as many rows as needed.

    Args:
        module: The layer the hook is attached to (unused).
        input: The inputs passed to the layer (unused).
        output: The layer's output tensor; dimension 1 is treated as the channel axis.
    """
    grid_cols = 8
    channel_count = output.size(1)
    grid_rows = (channel_count - 1) // grid_cols + 1  # ceiling division

    plt.figure(figsize=(15, 15))
    for channel in range(channel_count):
        feature_map = output[0, channel].detach().cpu().numpy()
        plt.subplot(grid_rows, grid_cols, channel + 1)
        plt.imshow(feature_map, cmap="gray")
        plt.axis("off")
    plt.show()

# Choose a specific layer and register the hook
layer_to_visualize = model.conv1
hook = layer_to_visualize.register_forward_hook(visualize_hook)

# The probe input must live on the same device as the model's weights; a CPU
# tensor fed to a model on the GPU raises a device-mismatch error.
model_device = next(model.parameters()).device
image = torch.randn(1, 3, 224, 224).to(model_device)  # Replace this with a real image from the dataset

# Eval mode keeps this probe pass from updating BatchNorm running statistics.
model.eval()
try:
    # Run a single image through the model; no gradients are needed for visualization.
    with torch.no_grad():
        _ = model(image)
finally:
    # Always detach the hook, even if the forward pass fails, so later
    # forward passes do not keep producing figures.
    hook.remove()

In [None]:
# Fine-tune the ResNet50 network trained above

In [None]:
from tqdm import tqdm
# Set the batch size and number of epochs
batch_size = 32
num_epochs = 10

# Update the dataset paths for finetuning and testing
finetuning_dir = 'C:/Users/Kranti/Desktop/EE541_project/archive_new/Testing/finetuning'
testing_dir = 'C:/Users/Kranti/Desktop/EE541_project/archive_new/Testing/testing'
train_dir = 'C:/Users/Kranti/Desktop/EE541_project/archive_new/Training'
val_dir = 'C:/Users/Kranti/Desktop/EE541_project/archive_new/Validation'

# Create datasets for the finetuning and testing data
finetuning_dataset = datasets.ImageFolder(finetuning_dir, transform=train_transforms)
testing_dataset = datasets.ImageFolder(testing_dir, transform=val_transforms)

# Create data loaders for the finetuning and testing data
finetuning_loader = DataLoader(finetuning_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
testing_loader = DataLoader(testing_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

# Define the learning rates and layers to unfreeze.
# layer_unfreeze is [0, -1, ..., -10]; each value i is used both as a slice start
# into list(model.parameters()) and as an index into learning_rates.
#   * Slice: i = 0 selects ALL parameter tensors; a negative i selects the last |i|
#     parameter tensors (weights and biases count separately, so this is not "layers").
#   * NOTE(review): learning_rates[i] with a negative i counts from the END of the
#     list, so i = -1 trains with 1e-12 and i = -10 with 1e-3. Confirm this pairing
#     is intended.
learning_rates = [1e-3, 1e-4, 1e-5, 1e-6, 1e-7, 1e-8, 1e-9, 1e-10, 1e-11, 1e-12]
layer_unfreeze = list(range(0, -11, -1))

# Move the model and loss function to the device
model = model.to(device)
criterion = criterion.to(device)

# Per-configuration history: one list of per-epoch values for every entry of layer_unfreeze
train_losses = {i: [] for i in layer_unfreeze}
val_losses = {i: [] for i in layer_unfreeze}
train_accs = {i: [] for i in layer_unfreeze}
val_accs = {i: [] for i in layer_unfreeze}

# Best validation accuracy seen so far for each layer configuration
best_val_acc = {i: 0.0 for i in layer_unfreeze}

# True and predicted labels for each layer configuration. Holds the labels of the
# most recent epoch only, so a confusion matrix built from them describes a single
# model state instead of a sum over all epochs.
true_labels = {i: [] for i in layer_unfreeze}
predicted_labels = {i: [] for i in layer_unfreeze}

# Train the model for each learning rate and layer unfreezing combination.
# The same `model` object is reused, so each configuration continues from the
# weights left by the previous one.
for i in layer_unfreeze:
    # Freeze everything, then re-enable gradients for the selected trailing parameters
    for param in model.parameters():
        param.requires_grad = False
    for param in list(model.parameters())[i:]:
        param.requires_grad = True

    # Fresh optimizer over the trainable parameters only
    optimizer = optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=learning_rates[i])

    # Train the model for the specified number of epochs
    for epoch in range(num_epochs):
        # Training loop
        running_train_loss = 0.0
        running_train_corrects = 0
        model.train()
        for inputs, labels in tqdm(finetuning_loader, desc=f'Train Epoch {i}.{epoch}'):
            inputs = inputs.to(device)
            # Float targets reshaped to a column so they match the model output for BCELoss
            labels = labels.to(device).float().view(-1, 1)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_train_loss += loss.item() * inputs.size(0)
            # .item() keeps the counter a plain Python int, so the stored accuracy is a
            # float rather than a (possibly CUDA) tensor that matplotlib cannot plot.
            running_train_corrects += (torch.round(outputs) == labels).sum().item()
        epoch_train_loss = running_train_loss / len(finetuning_loader.dataset)
        epoch_train_acc = running_train_corrects / len(finetuning_loader.dataset)
        train_losses[i].append(epoch_train_loss)
        train_accs[i].append(epoch_train_acc)

        # Validation loop. Uses the testing_loader built in this cell rather than a
        # loader left over from an earlier cell.
        running_val_loss = 0.0
        running_val_corrects = 0
        epoch_true_labels = []
        epoch_predicted_labels = []
        model.eval()
        with torch.no_grad():
            for inputs, labels in tqdm(testing_loader, desc=f'Val Epoch {i}.{epoch}'):
                inputs = inputs.to(device)
                labels = labels.to(device).float().view(-1, 1)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                preds = torch.round(outputs)
                running_val_loss += loss.item() * inputs.size(0)
                running_val_corrects += (preds == labels).sum().item()

                # Collect flat integer labels for this epoch
                epoch_true_labels.extend(labels.view(-1).long().tolist())
                epoch_predicted_labels.extend(preds.view(-1).long().tolist())

        epoch_val_loss = running_val_loss / len(testing_loader.dataset)
        epoch_val_acc = running_val_corrects / len(testing_loader.dataset)
        val_losses[i].append(epoch_val_loss)
        val_accs[i].append(epoch_val_acc)
        print(f'Val Loss: {epoch_val_loss:.4f} Val Acc: {epoch_val_acc:.4f}')

        # Replace (not extend) the stored labels with this epoch's results
        true_labels[i] = epoch_true_labels
        predicted_labels[i] = epoch_predicted_labels

        # Update best validation accuracy and checkpoint the improved weights
        if epoch_val_acc > best_val_acc[i]:
            best_val_acc[i] = epoch_val_acc
            torch.save(model.state_dict(), f'best_model_{i}.pth')

# One annotated confusion-matrix heatmap per unfreeze configuration,
# built from the labels collected during that configuration's validation passes.
for i in layer_unfreeze:
    confusion_matrix_val = confusion_matrix(true_labels[i], predicted_labels[i])

    cm_fig, cm_ax = plt.subplots(figsize=(8, 6))
    sns.heatmap(
        confusion_matrix_val,
        annot=True,
        fmt="d",
        cmap="Blues",
        cbar=False,
        ax=cm_ax,
    )
    cm_ax.set_title(f"Confusion Matrix (Layer Unfreeze: {i})")
    cm_ax.set_xlabel("Predicted Labels")
    cm_ax.set_ylabel("True Labels")
    plt.show()

In [None]:
# Plot the learning and accuracy curves for each layer configuration
for i in layer_unfreeze:
    # Report the learning rate that was actually passed to the optimizer for this
    # configuration. Training indexes learning_rates with i directly (negative i
    # counts from the end of the list), so the label must use the same index.
    lr_name = f'lr: {learning_rates[i]}'

    fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(10, 5))
    fig.suptitle(f'Layer Unfreeze {i} ({lr_name})')

    # Plot the learning curves
    ax[0].plot(train_losses[i], label='Train')
    ax[0].plot(val_losses[i], label='Validation')
    ax[0].set_xlabel('Epoch')
    ax[0].set_ylabel('Loss')
    ax[0].legend()
    ax[0].grid(True)

    # Plot the accuracy curves. float() accepts both plain numbers and 0-dim
    # tensors (including CUDA tensors, which matplotlib cannot convert itself).
    ax[1].plot([float(acc) for acc in train_accs[i]], label='Train')
    ax[1].plot([float(acc) for acc in val_accs[i]], label='Validation')
    ax[1].set_xlabel('Epoch')
    ax[1].set_ylabel('Accuracy')
    ax[1].legend()
    ax[1].grid(True)

    plt.show()

In [None]:
# Visualize the conv1 layer's output feature maps after fine-tuning

In [None]:
def visualize_hook(module, input, output):
    """Forward hook that plots each channel of a layer's output as a grayscale image.

    Only the first sample of the batch is shown. Channels are laid out on a
    grid that is 8 columns wide, with as many rows as needed.

    Args:
        module: The layer the hook is attached to (unused).
        input: The inputs passed to the layer (unused).
        output: The layer's output tensor; dimension 1 is treated as the channel axis.
    """
    grid_cols = 8
    channel_count = output.size(1)
    grid_rows = (channel_count - 1) // grid_cols + 1  # ceiling division

    plt.figure(figsize=(15, 15))
    for channel in range(channel_count):
        feature_map = output[0, channel].detach().cpu().numpy()
        plt.subplot(grid_rows, grid_cols, channel + 1)
        plt.imshow(feature_map, cmap="gray")
        plt.axis("off")
    plt.show()

# Choose a specific layer and register the hook
layer_to_visualize = model.conv1
hook = layer_to_visualize.register_forward_hook(visualize_hook)

# The probe input must live on the same device as the model's weights; a CPU
# tensor fed to a model on the GPU raises a device-mismatch error.
model_device = next(model.parameters()).device
image = torch.randn(1, 3, 224, 224).to(model_device)  # Replace this with a real image from the dataset

# Eval mode keeps this probe pass from updating BatchNorm running statistics.
model.eval()
try:
    # Run a single image through the model; no gradients are needed for visualization.
    with torch.no_grad():
        _ = model(image)
finally:
    # Always detach the hook, even if the forward pass fails, so later
    # forward passes do not keep producing figures.
    hook.remove()