In [1]:
'''import os
from PIL import Image

dataset_path = r"C:/Users/nehan/Downloads/archive/Dataset/train"  # Replace with your dataset path

for root, _, files in os.walk(dataset_path):
    for file in files:
        if file.endswith(('.jpg', '.jpeg', '.png')):  # Add more extensions if needed
            file_path = os.path.join(root, file)
            try:
                img = Image.open(file_path)
                img.verify()  # Verify the image is not corrupted
            except (IOError, SyntaxError) as e:
                print(f'Corrupted image found: {file_path}')
                # Optionally, remove the corrupted image
                os.remove(file_path)
dataset_path = r"C:/Users/nehan/Downloads/archive/Dataset/val"  # Replace with your dataset path

for root, _, files in os.walk(dataset_path):
    for file in files:
        if file.endswith(('.jpg', '.jpeg', '.png')):  # Add more extensions if needed
            file_path = os.path.join(root, file)
            try:
                img = Image.open(file_path)
                img.verify()  # Verify the image is not corrupted
            except (IOError, SyntaxError) as e:
                print(f'Corrupted image found: {file_path}')
                # Optionally, remove the corrupted image
                os.remove(file_path)'''
from PIL import ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True
import pickle

In [2]:
import torch
import torch.utils.data
from torchvision import transforms
from torchvision.datasets import CIFAR100
import numpy as np
from torchvision.models import resnet18, ResNet18_Weights
from tqdm import tqdm
!pip install torchmetrics
from torchmetrics import Accuracy
import matplotlib.pyplot as plt
import seaborn as sns



In [3]:
class DataModule():
    def __init__(self,batch_size):
        super().__init__()
        self.batch_size=batch_size
        # We define some augmentations that we would like to apply during training
        self.train_transform = transforms.Compose([
            transforms.Resize(256),
            transforms.RandomCrop(224, 4),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
        ])
        # During validation we need to only normalize and resize
        self.val_transform = transforms.Compose([
            transforms.Resize(224),
            transforms.ToTensor(),
            transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
        ])
        # This function sets up our datasets
    # which includes downloading and applying the augmentations
    def prepare_data(self):
        from torchvision import datasets, transforms
        #transform = transforms.Compose([
         #   transforms.Resize((224, 224)),
          #  transforms.ToTensor(),
        #])
        self.train_set = datasets.ImageFolder(root=r"C:\Users\mclor\Desktop\final_dataset\train" ,transform=self.train_transform)
        self.val_set = datasets.ImageFolder(root=r"C:\Users\mclor\Desktop\final_dataset\val" ,transform=self.train_transform)

    # This functions sets up the data loaders
    def setup(self):
        self.train_data_loader = torch.utils.data.DataLoader(self.train_set, batch_size=self.batch_size, shuffle=True)
        self.val_data_loader = torch.utils.data.DataLoader(self.val_set, batch_size=self.batch_size, shuffle=False)

    # This is simply a getter function for the training data loader
    def train_dataloader(self):
        return self.train_data_loader

    # This is simply a getter function for the validation data loader
    def val_dataloader(self):
        return self.val_data_loader

def load_image_from_url(path):
            img = Image.open(path)  # Load image directly from URL
            img = img.convert('RGB')  # Ensure 3 channels for ResNet
            img = transform(img)  # Apply transformations
            img = img.unsqueeze(0).to("cuda")
            return img


In [12]:
import torch
import torch.nn as nn
from torchvision.models import resnet50, ResNet50_Weights
from torchmetrics import Accuracy

class DLModel(nn.Module):
    def __init__(self, num_classes=8, pretrained=True, num_unfreeze_layers=0):
        super().__init__()
        # Load the ResNet50 backbone, pretrained if specified
        if pretrained:
            self.backbone = resnet50(weights=ResNet50_Weights.DEFAULT)
            self.backbone.requires_grad_(False)
            
            # Unfreeze some layers if requested
            if num_unfreeze_layers > 0:
                num_layers = 0
                for name, module in self.named_modules():
                    if isinstance(module, nn.Conv2d) or isinstance(module, nn.Linear) or isinstance(module, nn.BatchNorm2d):
                        num_layers += 1
                start_unfreezing_counter, counter = num_layers - num_unfreeze_layers, 0
                for name, module in self.named_modules():
                    if isinstance(module, nn.Conv2d) or isinstance(module, nn.Linear) or isinstance(module, nn.BatchNorm2d):
                        counter += 1
                    if counter >= start_unfreezing_counter:
                        module.requires_grad_(True)
        else:
            # Initialize ResNet50 from scratch
            self.backbone = resnet50(weights=None)
            self.backbone.requires_grad_(True)
        self.train_acc1, self.val_acc1 = Accuracy(task="multiclass", num_classes=num_classes), Accuracy(task="multiclass", num_classes=num_classes)
        #self.train_acc5, self.val_acc5 = Accuracy(task="multiclass", num_classes=num_classes, top_k=5), Accuracy(task="multiclass", num_classes=num_classes, top_k=5)
        # Remove the original FC layer (ResNet50 output size is 2048)
        in_features = self.backbone.fc.in_features
        self.backbone.fc = nn.Identity()  # Remove the original FC layer
        
        # Add more Conv2D layers after the ResNet backbone
        self.additional_convs = nn.Sequential(
            nn.Conv2d(in_channels=2048, out_channels=1024, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(1024),
            nn.Conv2d(in_channels=1024, out_channels=512, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(512),
            # Remove AdaptiveAvgPool2d here
        )

        
        # Fully connected layer for classification
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(512, 128),  # Linear layer with 512 input (from conv output) to 256 neurons
            nn.ReLU(),
            nn.Dropout(0.5),  # Dropout to prevent overfitting
            nn.Linear(128, num_classes)  # Final output layer for classification into 'num_classes'
        )

        # Define loss function and metrics
        self.criterion = nn.CrossEntropyLoss()
        self.train_acc1 = Accuracy(task="multiclass", num_classes=num_classes)
        self.val_acc1 = Accuracy(task="multiclass", num_classes=num_classes)
    
    def configure_optimizers(self, lr, momentum, max_epochs):
        self.optimizer = torch.optim.SGD(self.parameters(), lr=lr, momentum=momentum)
        self.scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(self.optimizer, T_max=max_epochs)

    def forward(self, x):
        # Forward pass through the ResNet backbone
        x = self.backbone(x)  # Output shape: [batch_size, 2048]
        
        # Reshape to [batch_size, 2048, 1, 1] for additional conv layers
        x = x.unsqueeze(-1).unsqueeze(-1)  # Add spatial dimensions
    
        # Forward pass through additional conv layers
        x = self.additional_convs(x)
    
        # Flatten and pass through classifier
        out = self.classifier(x)
        return out



    def training_step(self, x, y):
        self.optimizer.zero_grad()
        preds = self.forward(x)
        self.train_acc1.update(preds, y)
        loss = self.criterion(preds, y)
        loss.backward()
        self.optimizer.step()
        return loss.item()

    def on_training_epoch_end(self, loss, epoch):
        acc1 = self.train_acc1.compute().item()
        print(f"Epoch No: {epoch+1}\nTraining Loss: {loss}\n Training Accuracy: {acc1}")
        return acc1

    def validation_step(self, x, y):
        preds = self.forward(x)
        self.val_acc1.update(preds, y)
        loss = self.criterion(preds, y)
        return loss.item()

    def on_validation_epoch_end(self, loss, epoch):
        acc1 = self.val_acc1.compute().item()
        print(f"Validation Loss: {loss}\nValidation Accuracy: {acc1} (Top-1)")
        return acc1

    def reset_metrics(self):
        self.train_acc1.reset(), self.val_acc1.reset()


In [24]:
# First define some static variables
num_classes = 7
num_epochs = 5
batch_size = 50
# Fine-tuning and training from scratch require different sets of learning rates
lr_finetune, lr_scratch = 0.001, 0.1
momentum = 0.9
device = torch.device("cuda")
# This variable will be used to save the per-epoch validation accuracy
shallow_finetuning_val_acc = list()
# This variable will be used to save the per-epoch training loss
shallow_finetuning_train_loss = list()

# Define the data
data_module = DataModule(batch_size=batch_size)
data_module.prepare_data()
data_module.setup()
train_loader, val_loader = data_module.train_dataloader(), data_module.val_dataloader()

model1 = DLModel(num_classes=num_classes, pretrained=True, num_unfreeze_layers=0).to(device)
model1.configure_optimizers(lr=lr_finetune, momentum=momentum, max_epochs=num_epochs)
for epoch in range(num_epochs):
    # This is the training cycle
    model1.train()
    avg_loss = 0
    # Iterate over each batch and update the model
    for x, y in tqdm(train_loader, total=len(train_loader)):
        x, y = x.to(device), y.to(device)
        avg_loss+= model1.training_step(x, y)
    avg_loss/=len(train_loader)
    shallow_finetuning_train_loss.append(avg_loss)
    acc1 = model1.on_training_epoch_end(avg_loss, epoch)

    # This is the validation cycle
    model1.eval()
    avg_loss = 0
    # Iterate over each batch and get the predictions
    for x, y in tqdm(val_loader, total=len(val_loader)):
        x, y = x.to(device), y.to(device)
        avg_loss+= model1.validation_step(x, y)
    avg_loss/=len(val_loader)
    acc1 = model1.on_validation_epoch_end(avg_loss, epoch)
    shallow_finetuning_val_acc.append(acc1)

    # Finally reset the metrics before going on to the next epoch
    model1.reset_metrics()

100%|██████████████████████████████████████| 64/64 [00:36<00:00,  1.77it/s]


Epoch No: 1
Training Loss: 0.7692635653074831
 Training Accuracy: 0.8139899373054504


100%|██████████████████████████████████████| 14/14 [00:08<00:00,  1.60it/s]


Validation Loss: 0.4069340207747051
Validation Accuracy: 0.9256449341773987 (Top-1)


100%|██████████████████████████████████████| 64/64 [00:35<00:00,  1.82it/s]


Epoch No: 2
Training Loss: 0.2449793970445171
 Training Accuracy: 0.9504391551017761


100%|██████████████████████████████████████| 14/14 [00:08<00:00,  1.60it/s]


Validation Loss: 0.20026149467698165
Validation Accuracy: 0.9317147135734558 (Top-1)


100%|██████████████████████████████████████| 64/64 [00:35<00:00,  1.79it/s]


Epoch No: 3
Training Loss: 0.16025684587657452
 Training Accuracy: 0.9642409086227417


100%|██████████████████████████████████████| 14/14 [00:08<00:00,  1.65it/s]


Validation Loss: 0.1886711467855743
Validation Accuracy: 0.9438543319702148 (Top-1)


100%|██████████████████████████████████████| 64/64 [00:35<00:00,  1.79it/s]


Epoch No: 4
Training Loss: 0.11037120409309864
 Training Accuracy: 0.9780426621437073


100%|██████████████████████████████████████| 14/14 [00:08<00:00,  1.59it/s]


Validation Loss: 0.17073394398071937
Validation Accuracy: 0.9408194422721863 (Top-1)


100%|██████████████████████████████████████| 64/64 [00:35<00:00,  1.80it/s]


Epoch No: 5
Training Loss: 0.10009059248841368
 Training Accuracy: 0.9767879843711853


100%|██████████████████████████████████████| 14/14 [00:08<00:00,  1.58it/s]

Validation Loss: 0.14501503934817656
Validation Accuracy: 0.9514415860176086 (Top-1)





In [9]:
with open('model1.pkl','wb') as model_file:
    pickle.dump(model1,model_file)

In [28]:
from PIL import Image
import torch.nn.functional as F
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize the image to the input size required by the model
    transforms.ToTensor(),  # Convert the image to a tensor
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalize the image
])
image=r"C:\Users\mclor\Desktop\final_dataset\train\cloths\clothes21.jpg"
img1=load_image_from_url(image)
class_names = ['class_1', 'class_2', 'class_3', 'class_4', 'class_5', 'class_6', 'class_7']
with torch.no_grad():
    output = model1.forward(img1)
    print(output)
    probabilities = F.softmax(output, dim=1)  
    predicted_class_idx = torch.argmax(probabilities, dim=1).item() 
    predicted_class_name = class_names[predicted_class_idx] 
    
    print(f"Predicted class: {predicted_class_name}, Probability: {probabilities[0][predicted_class_idx].item()}")

tensor([[-1.6452,  6.2281, -1.4315, -1.6041, -0.6796, -1.0085,  0.1944]],
       device='cuda:0')
Predicted class: class_2, Probability: 0.9946631193161011


In [None]:
torch.save(model1.state_dict(), 'model_weights1.pth')