In [1]:
# import packages here
import cv2
import numpy as np
import matplotlib.pyplot as plt
import glob
import random
import time

import torch
import torchvision
import torchvision.transforms as transforms

from torch.autograd import Variable
import torch.nn as nn
import torch.nn.functional as F
import albumentations
import torchvision.datasets as datasets
from torch.utils.data import DataLoader
import torch.optim as optim
import os
from google.colab import drive

  check_for_updates()


In [2]:
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [3]:
data_dir = '/content/gdrive/My Drive/data/'

In [4]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [5]:
# Define the transformations for testing
test_transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Assuming you have a test/validation dataset in the same format as your training data
test_dataset = datasets.ImageFolder(root=os.path.join(data_dir, 'test/'), transform=test_transform)

# Create DataLoader for test/validation
testloader = DataLoader(test_dataset, batch_size=64, shuffle=False, num_workers=8)

In [6]:
# Define the MoCo model
class MoCo(nn.Module):
    def __init__(self, base_encoder, dim=128, K=131072, m=0.999, T=0.07):
        super(MoCo, self).__init__()

        self.K = K  # Size of the queue
        self.m = m  # Momentum coefficient
        self.T = T  # Temperature for contrastive loss

        # Create the query encoder
        self.encoder_q = base_encoder(num_classes=dim)
        self.encoder_k = base_encoder(num_classes=dim)

        # Initialize the momentum encoder with the same weights
        self._init_momentum_encoder()

        # Create the queue
        self.register_buffer("queue", torch.randn(dim, K))
        self.queue = nn.functional.normalize(self.queue, dim=0)
        self.register_buffer("queue_ptr", torch.zeros(1, dtype=torch.long))

    def _init_momentum_encoder(self):
        # Initialize momentum encoder parameters to match the query encoder
        for param_q, param_k in zip(self.encoder_q.parameters(), self.encoder_k.parameters()):
            param_k.data.copy_(param_q.data)  # Copy weights
            param_k.requires_grad = False  # Do not update momentum encoder with gradients

    @torch.no_grad()
    def _momentum_update_key_encoder(self):
        # Update the momentum encoder with momentum
        for param_q, param_k in zip(self.encoder_q.parameters(), self.encoder_k.parameters()):
            param_k.data = param_k.data * self.m + param_q.data * (1. - self.m)

    @torch.no_grad()
    def _dequeue_and_enqueue(self, keys):
        # Update the queue
        batch_size = keys.shape[0]  # Use the actual batch size

        ptr = int(self.queue_ptr)
        assert self.K % batch_size == 0 or batch_size <= self.K  # Ensure the batch size fits in the queue

        # Replace the keys at ptr (dequeue and enqueue), handling small batches correctly
        end_ptr = min(ptr + batch_size, self.K)
        effective_batch_size = end_ptr - ptr

        self.queue[:, ptr:end_ptr] = keys[:effective_batch_size].T  # Dequeue and enqueue the keys
        ptr = (ptr + effective_batch_size) % self.K  # Move the pointer

        self.queue_ptr[0] = ptr

    def forward(self, im_q, im_k):
        # Compute query features
        q = self.encoder_q(im_q)  # Query image
        q = F.normalize(q, dim=1)

        # Compute key features using momentum encoder
        with torch.no_grad():
            self._momentum_update_key_encoder()  # Update the key encoder
            k = self.encoder_k(im_k)  # Key image
            k = F.normalize(k, dim=1)

        # Compute logits and contrastive loss
        # Positive logits: dot product between query and key
        l_pos = torch.einsum('nc,nc->n', [q, k]).unsqueeze(-1)

        # Negative logits: dot product between query and all negatives in the queue
        l_neg = torch.einsum('nc,ck->nk', [q, self.queue.clone().detach()])

        # Logits: [N, 1 + K]
        logits = torch.cat([l_pos, l_neg], dim=1)

        # Apply temperature scaling
        logits /= self.T

        # Labels: positive key is the first in the logit list
        labels = torch.zeros(logits.shape[0], dtype=torch.long).to(im_q.device)

        # Dequeue and enqueue the current mini-batch of keys
        self._dequeue_and_enqueue(k)

        return logits, labels


In [7]:
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Skip connection
        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        identity = self.shortcut(x)
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += identity  # Add skip connection (residual)
        return F.relu(out)

# Define the ResNetCNN model with Dropout (used as the base encoder)
class ResNetCNN(nn.Module):
    def __init__(self, num_classes=16):  # Adjust num_classes based on your dataset
        super(ResNetCNN, self).__init__()
        self.layer1 = ResidualBlock(3, 64)  # Start with 3 channels (RGB input)
        self.layer2 = ResidualBlock(64, 128, stride=2)
        self.layer3 = ResidualBlock(128, 256, stride=2)
        self.layer4 = ResidualBlock(256, 512, stride=2)
        self.fc = nn.Linear(512 * 16 * 16, 128)  # MoCo latent dimension
        self.classifier = nn.Sequential(
            nn.Linear(128, num_classes),  # Add classifier layer
            nn.Dropout(0.5)  # 50% dropout to avoid overfitting
        )

    def forward(self, x):
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = x.view(x.size(0), -1)  # Flatten
        x = self.fc(x)  # Pass through MoCo latent fc layer
        return x

    def forward_for_classification(self, x):
        x = self.forward(x)  # Get the output from the encoder
        x = self.classifier(x)  # Pass through the classification layer
        return x

In [8]:
moco_model = MoCo(ResNetCNN, dim=128, K=131072, m=0.999, T=0.07)
criterion = nn.CrossEntropyLoss()

moco_model.load_state_dict(torch.load('/content/gdrive/My Drive/data/moco_model.pth'))
moco_model.to(device)

  moco_model.load_state_dict(torch.load('/content/gdrive/My Drive/data/moco_model.pth'))


MoCo(
  (encoder_q): ResNetCNN(
    (layer1): ResidualBlock(
      (conv1): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (shortcut): Sequential(
        (0): Conv2d(3, 64, kernel_size=(1, 1), stride=(1, 1))
        (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
    )
    (layer2): ResidualBlock(
      (conv1): Conv2d(64, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
      (bn1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (bn2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (shor

In [9]:
# Evaluation function
def evaluate_model(model, testloader, criterion):
    model.eval()  # Set model to evaluation mode
    running_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():  # Disable gradient calculation for inference
        for inputs, labels in testloader:
            inputs, labels = inputs.to(device), labels.to(device)

            # Forward pass through the query encoder and classification layer
            outputs = model.encoder_q.forward_for_classification(inputs)

            # Compute loss
            loss = criterion(outputs, labels)
            running_loss += loss.item()

            # Compute accuracy
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    avg_loss = running_loss / len(testloader)
    accuracy = 100 * correct / total
    print(f'Test Loss: {avg_loss:.4f}, Test Accuracy: {accuracy:.2f}%')
    return avg_loss, accuracy

evaluate_model(moco_model, testloader, criterion)

Test Loss: 1.2948, Test Accuracy: 67.75%


(1.2947775295802526, 67.75)