<a href="https://colab.research.google.com/github/shahriarivari/NoisyBirdClassification/blob/main/pre_training_cnn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# basic imports

In [None]:
import sys
import os
from typing import Tuple, List
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
import torch.nn.functional as F
from torch.utils.data import Dataset,DataLoader
from sklearn.metrics import f1_score
from huggingface_hub import snapshot_download
import matplotlib.pyplot as plt
from PIL import Image
import random

# downloading the dataset

In [None]:
# Run on the first CUDA GPU when one is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print(device)

In [None]:
# Hugging Face dataset repository and the local folder it is mirrored into.
dataset_id = "RayanAi/Noisy_birds"
local_dataset_dir = "./Noisy_birds"  # change this to store the dataset elsewhere

# Make sure the target folder exists before downloading into it.
os.makedirs(local_dataset_dir, exist_ok=True)

# Silence anything the download writes to stdout by temporarily pointing
# sys.stdout at the null device; the finally clause guarantees it is restored
# even if the download raises.
original_stdout = sys.stdout
with open(os.devnull, 'w') as fnull:
    sys.stdout = fnull
    try:
        snapshot_download(
            repo_id=dataset_id,
            local_dir=local_dataset_dir,
            repo_type="dataset",
        )
    finally:
        sys.stdout = original_stdout

print("Dataset downloaded completely.")

# Add up the size in bytes of every file below the dataset folder.
total_size = 0
for dirpath, dirnames, filenames in os.walk(local_dataset_dir):
    total_size += sum(
        os.path.getsize(os.path.join(dirpath, f)) for f in filenames
    )

# Report the total in MB.
print(f"Total size of downloaded files: {total_size / (1024 * 1024):.2f} MB")

# Report the absolute location of the dataset folder.
dataset_abs_path = os.path.abspath(local_dataset_dir)
print(f"Dataset has been saved at: [{dataset_abs_path}]")

In [None]:
# Extract the downloaded archive into the dataset folder
# (-q: quiet output, -o: overwrite existing files without prompting).
!unzip -qo ./Noisy_birds/Noisy_birds.zip -d ./Noisy_birds/

# creating dataset and dataloader objects

In [None]:
# SimCLR Dataset that returns two views of each image
class SimCLRDataset(torch.utils.data.Dataset):
    """Unlabelled image-folder dataset that yields two augmented views per image.

    Args:
        folder_path: Directory whose image files (directly inside it, not in
            sub-folders) make up the dataset.
        transform: Callable applied twice, independently, to each loaded RGB
            image. With a random transform the two calls produce two
            different views of the same image.
    """

    # File extensions treated as images; matched case-insensitively.
    IMAGE_EXTENSIONS = ('.jpg', '.png', '.jpeg')

    def __init__(self, folder_path, transform):
        self.folder_path = folder_path
        # os.listdir returns entries in arbitrary order, so sort to keep the
        # index -> image mapping reproducible across runs and machines.
        self.image_paths = sorted(
            os.path.join(folder_path, fname)
            for fname in os.listdir(folder_path)
            if fname.lower().endswith(self.IMAGE_EXTENSIONS)
        )
        self.transform = transform

    def __len__(self):
        """Return the number of image files found in the folder."""
        return len(self.image_paths)

    def __getitem__(self, index):
        """Return a tuple ``(view1, view2)`` for the image at ``index``."""
        # convert() produces an independent in-memory image, so the file
        # handle can be released as soon as the conversion is done.
        with Image.open(self.image_paths[index]) as raw:
            img = raw.convert("RGB")

        # Apply the same transform twice to get two views of the image
        img1 = self.transform(img)
        img2 = self.transform(img)

        return img1, img2

# Random augmentation pipeline; each call on an image yields a different view.
simclr_transform = transforms.Compose(
    [
        transforms.RandomResizedCrop(size=64),
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4, hue=0.4),
        transforms.RandomGrayscale(p=0.2),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

# Placeholder image folder -- replace with the real location of your images.
folder_path = 'path/to/your/images/folder'
dataset = SimCLRDataset(folder_path=folder_path, transform=simclr_transform)

# Shuffled mini-batches of 64 image pairs.
dataloader = DataLoader(dataset=dataset, batch_size=64, shuffle=True)

# creating the model

In [None]:
import torch.nn as nn
import torch.nn.functional as F

class SimpleCNN(nn.Module):
    """Small CNN encoder with a SimCLR projection head.

    Three conv + ReLU + 2x2 max-pool stages feed a linear layer that produces
    128-d encoder features, which are L2-normalised and passed through a
    2-layer MLP projecting to 64 dimensions.

    The linear layer is sized for a 16x16 feature map (what a 128x128 input
    yields after three halvings). The feature map is adaptively pooled to that
    size before flattening, so other input resolutions (e.g. 64x64 crops) no
    longer fail with a shape mismatch; for 128x128 inputs the pooling is an
    identity and the output is unchanged.
    """

    # Spatial size of the feature map that ``fc`` expects.
    _FEATURE_MAP_SIZE = (16, 16)

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc = nn.Linear(256 * 16 * 16, 128)  # Output of the CNN encoder

        # Projection head for SimCLR (2-layer MLP)
        self.projector = nn.Sequential(
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, 64),  # Projection to a lower dimension
        )

    def forward(self, x):
        """Map a batch of images ``(N, 3, H, W)`` to projections ``(N, 64)``."""
        # Forward pass through CNN encoder (each stage halves H and W)
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))

        # Bring the feature map to the size fc was built for; parameter-free,
        # so existing checkpoints still load.
        x = F.adaptive_avg_pool2d(x, self._FEATURE_MAP_SIZE)

        x = x.view(x.size(0), -1)  # Flatten the output
        x = self.fc(x)  # Pass through fully connected layer (output features)
        x = F.normalize(x, dim=1)  # Normalize the features

        # Apply projection head (used for SimCLR pretraining)
        x = self.projector(x)
        return x

# define the loss function

In [None]:
import torch

def info_nce_loss(features, batch_size, temperature, device):
    """Build InfoNCE logits and targets for a batch of two-view features.

    Args:
        features: Tensor of shape ``(2 * B, D)``: the ``B`` features of the
            first view followed by the ``B`` features of the second view, so
            rows ``i`` and ``i + B`` come from the same image.
        batch_size: Nominal batch size. Kept for interface compatibility; the
            effective ``B`` is derived from ``features`` so that a smaller
            final batch does not break the mask construction.
        temperature: Positive scalar the similarities are divided by.
        device: Device on which the masks and targets are created; expected
            to be the device ``features`` lives on.

    Returns:
        ``(logits, labels)`` where ``logits`` has shape ``(2B, 2B - 1)`` with
        the positive-pair similarity in column 0 followed by the negatives,
        and ``labels`` is an all-zero ``long`` tensor of shape ``(2B,)``,
        ready to be passed to a cross-entropy loss.

    Raises:
        ValueError: If ``features`` is empty or has an odd number of rows.
    """
    total = features.shape[0]
    if total == 0 or total % 2 != 0:
        raise ValueError(
            f"features must stack two views of a non-empty batch, got {total} rows"
        )
    # Use the real number of images in this batch rather than the nominal one.
    actual_batch = total // 2

    # Rows sharing the same image index are positives of each other.
    ids = torch.arange(actual_batch, device=device).repeat(2)
    positive_mask = ids.unsqueeze(0) == ids.unsqueeze(1)

    # Normalize the features so dot products are cosine similarities
    features = F.normalize(features, dim=1)

    # Compute the similarity matrix (dot product between all pairs of features)
    similarity_matrix = torch.matmul(features, features.T)

    # Remove self-similarity (diagonal elements); each row keeps 2B - 1 entries
    off_diagonal = ~torch.eye(total, dtype=torch.bool, device=device)
    positive_mask = positive_mask[off_diagonal].view(total, -1)
    similarity_matrix = similarity_matrix[off_diagonal].view(total, -1)

    # Select positive and negative pairs (exactly one positive per row)
    positives = similarity_matrix[positive_mask].view(total, -1)
    negatives = similarity_matrix[~positive_mask].view(total, -1)

    # Positive first, so the correct "class" for every row is index 0
    logits = torch.cat([positives, negatives], dim=1) / temperature
    labels = torch.zeros(total, dtype=torch.long, device=device)

    return logits, labels


# training loop

In [None]:
import torch.optim as optim

# Initialize model, optimizer, and loss function
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = SimpleCNN().to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

# Hyperparameters
num_epochs = 10
batch_size = 64  # nominal size; the final batch of an epoch may be smaller
temperature = 0.5

# Training loop
for epoch in range(num_epochs):
    model.train()

    # Accumulate the loss so the epoch report reflects every batch rather than
    # only the last one (and is still defined if the loader yields nothing).
    running_loss = 0.0
    num_batches = 0

    for img1, img2 in dataloader:
        # Move data to the device (GPU or CPU)
        img1, img2 = img1.to(device), img2.to(device)

        # Pass both views through the encoder and projection head
        features_img1 = model(img1)
        features_img2 = model(img2)

        # Concatenate features from both views
        features = torch.cat([features_img1, features_img2], dim=0)

        # Compute the InfoNCE logits/targets. Pass the size of *this* batch:
        # the loader does not drop the last partial batch, and the loss masks
        # must match the number of feature rows.
        logits, labels = info_nce_loss(
            features,
            batch_size=img1.size(0),
            temperature=temperature,
            device=device,
        )

        # Compute the cross-entropy loss
        loss = criterion(logits, labels)

        # Backpropagation and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        num_batches += 1

    # Mean loss over the epoch's batches (0.0 when there were none).
    epoch_loss = running_loss / max(num_batches, 1)
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss}')
