In [2]:
import torch
import torch.utils.data
from torchvision import transforms
from torchvision.datasets import CIFAR100
import numpy as np
from torchvision.models import resnet18, ResNet18_Weights
from tqdm import tqdm
!pip install torchmetrics
from torchmetrics import Accuracy
import matplotlib.pyplot as plt
import seaborn as sns




[notice] A new release of pip is available: 24.1.1 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip
  from .autonotebook import tqdm as notebook_tqdm


In [15]:
class DataModule():
    """Builds the image-folder datasets and data loaders used for training and validation.

    Expected call order: construct, ``prepare_data()``, ``setup()``, then read the
    loaders through ``train_dataloader()`` / ``val_dataloader()``.

    Args:
        batch_size: Number of samples per batch for both loaders.
        data_dir: Directory containing the ``train`` and ``val`` sub-folders.
            Defaults to ``DEFAULT_DATA_DIR`` (the location originally hard-coded here).
    """

    # Location used when no data_dir is supplied.
    DEFAULT_DATA_DIR = "C:/Users/nehan/Downloads/archive/Dataset"
    # Per-channel mean / std passed to Normalize in both pipelines.
    NORM_MEAN = (0.485, 0.456, 0.406)
    NORM_STD = (0.229, 0.224, 0.225)

    def __init__(self, batch_size, data_dir=None):
        super().__init__()
        self.batch_size = batch_size
        self.data_dir = self.DEFAULT_DATA_DIR if data_dir is None else data_dir

        # Augmentations applied during training only.
        self.train_transform = transforms.Compose([
            transforms.Resize(256),
            transforms.RandomCrop(224, 4),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize(self.NORM_MEAN, self.NORM_STD),
        ])
        # Validation is deterministic: resize, centre-crop and normalise.
        # Resize(224) with an int only fixes the shorter edge, so CenterCrop(224)
        # is required for every sample to have the same size when batched.
        self.val_transform = transforms.Compose([
            transforms.Resize(224),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize(self.NORM_MEAN, self.NORM_STD),
        ])

    def prepare_data(self):
        """Create ``self.train_set`` and ``self.val_set`` from ``<data_dir>/train`` and ``<data_dir>/val``."""
        import os
        from torchvision import datasets

        self.train_set = datasets.ImageFolder(
            root=os.path.join(self.data_dir, "train"),
            transform=self.train_transform,
        )
        # The validation split must use the deterministic pipeline; it previously
        # received the training augmentations.
        self.val_set = datasets.ImageFolder(
            root=os.path.join(self.data_dir, "val"),
            transform=self.val_transform,
        )

    def setup(self):
        """Create the data loaders; requires ``prepare_data()`` to have been called first."""
        self.train_data_loader = torch.utils.data.DataLoader(
            self.train_set, batch_size=self.batch_size, shuffle=True
        )
        self.val_data_loader = torch.utils.data.DataLoader(
            self.val_set, batch_size=self.batch_size, shuffle=False
        )

    def train_dataloader(self):
        """Return the training data loader created by ``setup()``."""
        return self.train_data_loader

    def val_dataloader(self):
        """Return the validation data loader created by ``setup()``."""
        return self.val_data_loader

def load_image_from_url(path):
    """Load one image and return it as a single-item batch.

    Despite the name, the argument is handed straight to ``Image.open``; this
    function performs no download itself.

    NOTE(review): ``Image`` (presumably ``PIL.Image``) and ``transform`` are read
    from module scope and are not defined in the visible cells - confirm they are
    defined before this function is called.

    Args:
        path: Value passed to ``Image.open``.

    Returns:
        The result of ``transform`` applied to the RGB image, with a new leading
        dimension added by ``unsqueeze(0)``.
    """
    # Convert inside the context manager so the source file handle is released
    # as soon as the pixel data has been copied into the RGB image.
    with Image.open(path) as source_image:
        rgb_image = source_image.convert('RGB')  # Ensure 3 channels for ResNet
    batch = transform(rgb_image)
    return batch.unsqueeze(0)


In [4]:
import torch
import torch.nn as nn
from torchvision.models import resnet50, ResNet50_Weights
from torchmetrics import Accuracy

class DLModel(nn.Module):
    """ResNet50 backbone followed by two extra convolutions and an MLP classifier.

    The class also owns its loss, accuracy metrics, optimizer and scheduler, and
    exposes per-batch / per-epoch hooks meant to be driven by an external loop.

    Args:
        num_classes: Number of output classes.
        pretrained: If True, load ``ResNet50_Weights.DEFAULT`` and freeze the
            backbone; otherwise initialise the backbone randomly and train all of it.
        num_unfreeze_layers: Only used when ``pretrained`` is True. Number of
            trailing backbone layers to make trainable again (see
            ``_unfreeze_last_layers`` for the exact counting rule).
    """

    def __init__(self, num_classes=8, pretrained=True, num_unfreeze_layers=0):
        super().__init__()
        if pretrained:
            self.backbone = resnet50(weights=ResNet50_Weights.DEFAULT)
            self.backbone.requires_grad_(False)
            if num_unfreeze_layers > 0:
                self._unfreeze_last_layers(self.backbone, num_unfreeze_layers)
        else:
            # Train ResNet50 from scratch.
            self.backbone = resnet50(weights=None)
            self.backbone.requires_grad_(True)

        # Top-1 accuracy trackers for the two phases.
        self.train_acc1 = Accuracy(task="multiclass", num_classes=num_classes)
        self.val_acc1 = Accuracy(task="multiclass", num_classes=num_classes)

        # Drop the original classification layer; the backbone then returns features.
        in_features = self.backbone.fc.in_features
        self.backbone.fc = nn.Identity()

        self.additional_convs, self.classifier = self._build_head(in_features, num_classes)
        self.criterion = nn.CrossEntropyLoss()

    @staticmethod
    def _unfreeze_last_layers(backbone, num_unfreeze_layers):
        """Re-enable gradients for the trailing layers of ``backbone``.

        Conv2d, Linear and BatchNorm2d modules are counted in ``modules()`` order.
        Every module visited once the running count has reached
        ``total - num_unfreeze_layers`` is made trainable. The threshold layer
        itself is included, so ``num_unfreeze_layers + 1`` counted layers are
        unfrozen. ``__init__`` calls this before replacing ``backbone.fc``, so the
        last counted layer is the ``fc`` layer that is subsequently discarded.
        """
        layer_types = (nn.Conv2d, nn.Linear, nn.BatchNorm2d)
        total_layers = sum(1 for m in backbone.modules() if isinstance(m, layer_types))
        first_unfrozen = total_layers - num_unfreeze_layers
        seen = 0
        for module in backbone.modules():
            if isinstance(module, layer_types):
                seen += 1
            if seen >= first_unfrozen:
                module.requires_grad_(True)

    @staticmethod
    def _build_head(in_features, num_classes):
        """Return ``(additional_convs, classifier)`` for features with ``in_features`` channels."""
        additional_convs = nn.Sequential(
            nn.Conv2d(in_channels=in_features, out_channels=1024, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(1024),
            nn.Conv2d(in_channels=1024, out_channels=512, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            # Must match the 512 channels produced by the convolution above.
            nn.BatchNorm2d(512),
        )
        classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(0.5),  # Dropout to reduce overfitting
            nn.Linear(256, num_classes),
        )
        return additional_convs, classifier

    def configure_optimizers(self, lr, momentum, max_epochs):
        """Create the SGD optimizer and a cosine-annealing scheduler with ``T_max=max_epochs``.

        No method of this class calls ``self.scheduler.step()``; stepping the
        scheduler is left to the caller.
        """
        self.optimizer = torch.optim.SGD(self.parameters(), lr=lr, momentum=momentum)
        self.scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(self.optimizer, T_max=max_epochs)

    def forward(self, x):
        """Return class logits for a batch of images ``x``."""
        # With fc replaced by Identity the backbone yields one feature vector per sample.
        features = self.backbone(x)
        # Add two trailing singleton dimensions so the Conv2d layers accept the features.
        features = features.unsqueeze(-1).unsqueeze(-1)
        features = self.additional_convs(features)
        # The classifier flattens before its linear layers.
        return self.classifier(features)

    def training_step(self, x, y):
        """Run one optimisation step on batch ``(x, y)`` and return the loss as a float."""
        self.optimizer.zero_grad()
        preds = self.forward(x)
        self.train_acc1.update(preds, y)
        loss = self.criterion(preds, y)
        loss.backward()
        self.optimizer.step()
        return loss.item()

    def on_training_epoch_end(self, loss, epoch):
        """Print the epoch's training loss / accuracy and return the accuracy."""
        acc1 = self.train_acc1.compute().item()
        print(f"Epoch No: {epoch+1}\nTraining Loss: {loss}\n Training Accuracy: {acc1}")
        return acc1

    def validation_step(self, x, y):
        """Evaluate batch ``(x, y)`` without tracking gradients and return the loss as a float.

        This method does not switch the module to eval mode; the caller decides
        whether ``eval()`` is active.
        """
        # No backward pass happens here, so skip building the autograd graph.
        with torch.no_grad():
            preds = self.forward(x)
            self.val_acc1.update(preds, y)
            loss = self.criterion(preds, y)
        return loss.item()

    def on_validation_epoch_end(self, loss, epoch):
        """Print the epoch's validation loss / accuracy and return the accuracy."""
        acc1 = self.val_acc1.compute().item()
        print(f"Validation Loss: {loss}\nValidation Accuracy: {acc1} (Top-1)")
        return acc1

    def reset_metrics(self):
        """Clear the accumulated state of both accuracy metrics."""
        self.train_acc1.reset()
        self.val_acc1.reset()
