In [None]:
import torchvision
import os
import glob
import random
import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.transforms as transforms
import scipy.io as scp
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import skimage.io as skio
import scipy.io as scp
from torch.utils.data import Dataset, DataLoader, Subset

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
class CNNBlock(nn.Module):
    expansion = 1

    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1):
        super(CNNBlock, self).__init__()
        padding = (kernel_size - 1) // 2  # Calculate padding to maintain the same output size

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, stride=stride, padding=padding, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=kernel_size, stride=1, padding=padding, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != self.expansion * out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, self.expansion * out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion * out_channels)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out

In [None]:
class MyCNN(nn.Module):
    def __init__(self, block, num_blocks, num_classes=102):
        super(MyCNN, self).__init__()
        self.in_channels = 64
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.layer5 = self._make_layer(block, 512, num_blocks[4], stride=2)
        self.layer6 = self._make_layer(block, 512, num_blocks[5], stride=2)
        self.adaptive_pool = nn.AdaptiveAvgPool2d((1, 1))  # Ensures output is (1, 1)
        self.fc = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, out_channels, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_channels, out_channels, 3, stride))
            self.in_channels = out_channels * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.layer5(x)
        x = self.layer6(x)
        x = self.adaptive_pool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

In [68]:
IMAGE_W = 208
IMAGE_H = 208
BATCH_SIZE = 64
NUM_EPOCHS = 100
LR = 0.0001

# Device
device = "cuda" if torch.cuda.is_available() else "cpu"

model = MyCNN(CNNBlock, [2, 2, 2, 2, 2, 2], num_classes=102)
model = model.to(device)

# Optimizer
optimizer = optim.Adam(model.parameters(), lr = LR)

# Loss Function
criterion = nn.CrossEntropyLoss()


In [66]:
flower_transform = transforms.Compose([
    transforms.Resize((IMAGE_W, IMAGE_H)),
    transforms.ToTensor(),
    transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
])

augmented_transform = transforms.Compose([
    transforms.Resize((IMAGE_W, IMAGE_H)),
    transforms.RandomHorizontalFlip(p=0.5),  # Flip the images randomly with a probability of 0.5
    transforms.RandomRotation(15),  # Randomly rotate images in the range (-15, 15) degrees
    transforms.ColorJitter(brightness=0.2, contrast=0.2),  # Randomly change brightness and contrast
    transforms.ToTensor(),
    transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
])

root = '/content/drive/MyDrive/ActualFlowers/jpg'

train_set = torchvision.datasets.Flowers102(root = root, split = 'train', transform = augmented_transform, target_transform = None, download = False)
test_set = torchvision.datasets.Flowers102(root = root, split = 'test', transform = flower_transform, target_transform = None, download = False)
validation_set = torchvision.datasets.Flowers102(root = root, split = 'val', transform = flower_transform, target_transform = None, download = False)

train_loader = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_set, batch_size=BATCH_SIZE, shuffle=True)
validation_loader = DataLoader(validation_set, batch_size=BATCH_SIZE, shuffle=True)

In [None]:
def check_accuracy(loader, model, num_classes=102):
    num_correct = 0
    num_samples = 0
    model.eval()  # Set the model to evaluation mode

    # Initialize the confusion matrix
    confusion_matrix = torch.zeros(num_classes, num_classes, dtype=torch.int64)

    with torch.no_grad():  # Do not calculate gradients
        for x, y in tqdm.tqdm(loader, desc="Evaluating"):  # Wrap the loader with tqdm
            x = x.to(device)  # Move data to the device
            y = y.to(device)  # Move labels to the device
            scores = model(x)  # Compute model output
            _, predictions = scores.max(1)  # Get the predicted classes
            num_correct += (predictions == y).sum()
            num_samples += predictions.size(0)

            # Update confusion matrix
            for t, p in zip(y.view(-1), predictions.view(-1)):
                confusion_matrix[t.long(), p.long()] += 1

    model.train()  # Set the model back to training mode
    accuracy = float(num_correct) / num_samples  # Calculate accuracy

    # Print overall accuracy
    print(f"Got {num_correct} / {num_samples} with accuracy {accuracy * 100:.2f}%")

    # Return accuracy and confusion matrix for further analysis
    return accuracy, confusion_matrix

In [None]:
def evaluate(model, loader, device):
    model.eval()  # Set the model to evaluation mode
    total_loss = 0
    correct_predictions = 0
    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            total_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            correct_predictions += (predicted == labels).sum().item()

    avg_loss = total_loss / len(loader)
    accuracy = correct_predictions / len(loader.dataset)
    model.train()  # Set the model back to training mode
    return avg_loss, accuracy

In [None]:
best_val_loss = float('inf')
epochs_no_improve = 0
n_epochs_stop = 10
for epoch in range(NUM_EPOCHS):
    running_loss = 0
    with tqdm.tqdm(train_loader, unit='batch') as tepoch:
        for index, (x, y) in enumerate(tepoch):
            x = x.to(device)
            y = y.to(device)

            # Forward pass
            y_hat = model(x)
            loss = criterion(y_hat, y)
            running_loss += loss.item()

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            tepoch.set_postfix(loss=loss.item())
    #scheduler.step()
    # Compute average training loss
    avg_training_loss = running_loss / len(train_loader)
    print(f"Epoch {epoch}: Training loss: {running_loss:.4f}")

    # Evaluate on the validation set
    if epoch >= 10:
      validation_loss, validation_accuracy = evaluate(model, validation_loader, device)
      print(f"Epoch {epoch}: Validation loss: {validation_loss:.4f}, Validation accuracy: {validation_accuracy*100:.4f}%")
      # Check for early stopping
      if validation_loss < best_val_loss:
          best_val_loss = validation_loss
          epochs_no_improve = 0
          # Save the model if validation loss improves
          torch.save(model.state_dict(), 'best_model_v4.pth')
      else:
          epochs_no_improve += 1
          print(f"No improvement in validation loss for {epochs_no_improve} epochs.")

      # Early stopping condition
      if epochs_no_improve == n_epochs_stop and epoch > 30:
          print("Early stopping triggered")
          break

In [None]:
model.load_state_dict(torch.load('best_model_v4.pth'))

<All keys matched successfully>

In [None]:
check_accuracy(test_loader, model)

Evaluating: 100%|██████████| 97/97 [01:02<00:00,  1.55it/s]

Got 2500 / 6149 with accuracy 40.66%





(0.4065701740120345,
 tensor([[ 6,  0,  0,  ...,  0,  0,  0],
         [ 0, 23,  0,  ...,  0,  0,  1],
         [ 0,  0,  4,  ...,  0,  0,  0],
         ...,
         [ 0,  0,  0,  ..., 12,  0,  4],
         [ 0,  0,  0,  ...,  1,  8,  0],
         [ 0,  0,  0,  ...,  0,  0, 21]]))