In [1]:
import os

root_folder= 'D:/yogi/shoppin/ImageNet-Mini/augmented_images_200'
img_paths= []
labels= []
labels_set= set()

for dir, folder, files in os.walk(root_folder):
    if len(labels_set) >= 200:
        break

    for file in files:
        labels_set.add(os.path.basename(dir))
        img_path= os.path.join(dir, file)
        img_paths.append(img_path)
        labels.append(os.path.basename(dir))

In [2]:
from sklearn import preprocessing

label_encoder= preprocessing.LabelEncoder()
img_ids= label_encoder.fit_transform(labels)

### Dataset Split

In [3]:
from math import floor
import random

def split_data(img_paths, img_ids):

    # Combine data and labels into a list of tuples
    combined = list(zip(img_paths, img_ids))

    # Shuffle the combined list
    random.shuffle(combined)

    # Split the shuffled combined list into two lists
    split_index1 = floor(len(combined) * 0.8)
    split_index2 = floor(len(combined) * 0.9)

    train_combined  = combined[:split_index1]
    test_combined   = combined[split_index1:split_index2]
    enroll_combined = combined[split_index2:]

    # Unpack the combined tuples into separate paths and labels
    train_img_paths, train_img_ids   = zip(*train_combined)
    test_img_paths, test_img_ids     = zip(*test_combined)
    enroll_img_paths, enroll_img_ids = zip(*enroll_combined)

    return train_img_paths, train_img_ids, test_img_paths, test_img_ids, enroll_img_paths, enroll_img_ids


In [4]:
import torch

train_img_paths, train_img_ids, test_img_paths, test_img_ids, enroll_img_paths, enroll_img_ids= split_data(img_paths, img_ids)


### Dataloader

In [10]:
import os
import pandas as pd
from PIL import Image
from torchvision.io import read_image
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
import torch
import numpy as np
from ultralytics import YOLO
import cv2
import matplotlib.pyplot as plt

In [6]:
device= 'cuda' if torch.cuda.is_available() else 'cpu'

In [7]:
def transform_image():
  
  transform= transforms.Compose([
      transforms.RandomHorizontalFlip(),
      transforms.RandomResizedCrop(224),
      transforms.ToTensor(),
      transforms.Lambda(lambda img: img.repeat(3, 1, 1) if img.shape[0] == 1 else img),
      transforms.Normalize(mean=0.5,  # Standard ImageNet normalization
      std=0.5)
  ])

  return transform

In [8]:
# Load the YOLO segmentation model
seg_model = YOLO('yolov8n-seg.pt').to(device)

In [31]:
training_mask_data= []
transform= transform_image()
objects_detected= 0

for i, image_path in enumerate(train_img_paths):

    image = cv2.imread(image_path)
    # plt.imshow(image)
    # break
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Resize the image to a valid shape for YOLO (e.g., 640x640)
    image_resized = cv2.resize(image, (640, 640))

    # Convert to tensor and normalize
    image_tensor = torch.tensor(image_resized, dtype=torch.float32).permute(2, 0, 1) / 255.0  # Shape: (3, 640, 640)
    image_tensor = image_tensor.unsqueeze(0).to(device)  # Add batch dimension: (1, 3, 640, 640)

    # Perform segmentation
    results = seg_model(image)
    
    if results[0].masks is not None:
        # Extract masks
        masks = results[0].masks.data.cpu().numpy()

        # Combine masks (union of all masks)
        combined_mask = np.any(masks, axis=0)

        # Resize mask to match image dimensions
        original_height, original_width = image.shape[:2]
        resized_mask = cv2.resize(combined_mask.astype(np.uint8), (original_width, original_height), interpolation=cv2.INTER_NEAREST)
        resized_mask = resized_mask.astype(bool)  # Convert back to boolean

        # Invert mask to focus on unmasked areas
        inverted_mask = ~resized_mask

        # Apply mask to image
        masked_image = image.copy()
        masked_image[inverted_mask] = 0  # Set masked regions to black
        # masked_image = Image.fromarray(masked_image)
        # plt.imshow(masked_image)
        # break
        print(f'objects_detected: {objects_detected}')
        objects_detected += 1
    else:
        # If no masks are found, use the original image
        masked_image = image

    # Convert NumPy array to PIL Image
    masked_image = Image.fromarray(masked_image)
    # plt.imshow(masked_image)
    # break

    # Apply transformations (if any)
    
    masked_image = transform(masked_image)

    training_mask_data.append(masked_image)
    print(i)


0: 640x640 (no detections), 18.0ms
Speed: 5.9ms preprocess, 18.0ms inference, 0.6ms postprocess per image at shape (1, 3, 640, 640)
0

0: 640x640 (no detections), 14.4ms
Speed: 4.7ms preprocess, 14.4ms inference, 1.0ms postprocess per image at shape (1, 3, 640, 640)
1

0: 640x640 1 cat, 1 dog, 15.0ms
Speed: 6.0ms preprocess, 15.0ms inference, 3.0ms postprocess per image at shape (1, 3, 640, 640)
objects_detected: 0
2

0: 640x640 (no detections), 16.0ms
Speed: 4.0ms preprocess, 16.0ms inference, 0.6ms postprocess per image at shape (1, 3, 640, 640)
3

0: 640x640 1 tv, 14.0ms
Speed: 4.0ms preprocess, 14.0ms inference, 2.9ms postprocess per image at shape (1, 3, 640, 640)
objects_detected: 1
4

0: 640x640 (no detections), 12.4ms
Speed: 2.9ms preprocess, 12.4ms inference, 1.0ms postprocess per image at shape (1, 3, 640, 640)
5

0: 640x640 1 teddy bear, 21.3ms
Speed: 3.8ms preprocess, 21.3ms inference, 2.7ms postprocess per image at shape (1, 3, 640, 640)
objects_detected: 2
6

0: 640x640 

In [32]:
test_mask_data= []
transform= transform_image()

for i, image_path in enumerate(test_img_paths):

    image = cv2.imread(image_path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Resize the image to a valid shape for YOLO (e.g., 640x640)
    image_resized = cv2.resize(image, (640, 640))

    # Convert to tensor and normalize
    image_tensor = torch.tensor(image_resized, dtype=torch.float32).permute(2, 0, 1) / 255.0  # Shape: (3, 640, 640)
    image_tensor = image_tensor.unsqueeze(0).to(device)  # Add batch dimension: (1, 3, 640, 640)

    # Perform segmentation
    results = seg_model(image)
    
    if results[0].masks is not None:
        # Extract masks
        masks = results[0].masks.data.cpu().numpy()

        # Combine masks (union of all masks)
        combined_mask = np.any(masks, axis=0)

        # Resize mask to match image dimensions
        original_height, original_width = image.shape[:2]
        resized_mask = cv2.resize(combined_mask.astype(np.uint8), (original_width, original_height), interpolation=cv2.INTER_NEAREST)
        resized_mask = resized_mask.astype(bool)  # Convert back to boolean

        # Invert mask to focus on unmasked areas
        inverted_mask = ~resized_mask

        # Apply mask to image
        masked_image = image.copy()
        masked_image[inverted_mask] = 0  # Set masked regions to black
    else:
        # If no masks are found, use the original image
        masked_image = image

    # Convert NumPy array to PIL Image
    masked_image = Image.fromarray(masked_image)

    # Apply transformations (if any)
    
    masked_image = transform(masked_image)

    test_mask_data.append(masked_image)
    print(i)


0: 640x640 1 bird, 15.8ms
Speed: 5.2ms preprocess, 15.8ms inference, 3.0ms postprocess per image at shape (1, 3, 640, 640)
0

0: 640x640 1 dog, 1 bed, 23.0ms
Speed: 4.0ms preprocess, 23.0ms inference, 6.1ms postprocess per image at shape (1, 3, 640, 640)
1

0: 640x640 3 persons, 13.0ms
Speed: 4.0ms preprocess, 13.0ms inference, 3.0ms postprocess per image at shape (1, 3, 640, 640)
2

0: 640x640 (no detections), 13.6ms
Speed: 3.5ms preprocess, 13.6ms inference, 0.9ms postprocess per image at shape (1, 3, 640, 640)
3

0: 640x640 (no detections), 14.7ms
Speed: 4.0ms preprocess, 14.7ms inference, 0.3ms postprocess per image at shape (1, 3, 640, 640)
4

0: 640x640 (no detections), 13.4ms
Speed: 3.0ms preprocess, 13.4ms inference, 1.0ms postprocess per image at shape (1, 3, 640, 640)
5

0: 640x640 1 dog, 23.1ms
Speed: 3.9ms preprocess, 23.1ms inference, 3.3ms postprocess per image at shape (1, 3, 640, 640)
6

0: 640x640 (no detections), 14.7ms
Speed: 3.3ms preprocess, 14.7ms inference, 1.0m

In [33]:
class MaskedDataset(Dataset):
    def __init__(self, masked_image, labels, transform=None):
        # self.image_paths = image_paths
        self.masked_image= masked_image
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.masked_image)
        # return len(self.image_paths)

    def __getitem__(self, idx):
        # Load image
        # image_path = self.image_paths[idx]
        # image = cv2.imread(image_path)
        # image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        # image= image.to(device)

        # # Perform segmentation
        # results = seg_model(image)
        
        # if results[0].masks is not None:
        #     # Extract masks
        #     masks = results[0].masks.data.cpu().numpy()

        #     # Combine masks (union of all masks)
        #     combined_mask = np.any(masks, axis=0)

        #     # Resize mask to match image dimensions
        #     original_height, original_width = image.shape[:2]
        #     resized_mask = cv2.resize(combined_mask.astype(np.uint8), (original_width, original_height), interpolation=cv2.INTER_NEAREST)
        #     resized_mask = resized_mask.astype(bool)  # Convert back to boolean

        #     # Invert mask to focus on unmasked areas
        #     inverted_mask = ~resized_mask

        #     # Apply mask to image
        #     masked_image = image.copy()
        #     masked_image[~inverted_mask] = 0  # Set masked regions to black
        # else:
        #     # If no masks are found, use the original image
        #     masked_image = image

        # # Convert NumPy array to PIL Image
        # masked_image = Image.fromarray(masked_image)

        # # Apply transformations (if any)
        # if self.transform:
        #     masked_image = self.transform(masked_image)

        masked_image= self.masked_image[idx]



        # Get label
        label = self.labels[idx]

        return masked_image, label


In [34]:
training_data   = MaskedDataset(training_mask_data, train_img_ids, transform_image())
val_data        = MaskedDataset(test_mask_data, test_img_ids, transform_image())

train_dataloader = DataLoader(training_data, batch_size=64, shuffle=True)
val_dataloader = DataLoader(val_data, batch_size=64, shuffle=True)

In [35]:
# Access batches
for batch_idx, (images, labels) in enumerate(train_dataloader):
    print(f"Batch {batch_idx} - Images: {images.shape}, Labels: {labels.shape}")
    break

Batch 0 - Images: torch.Size([64, 3, 224, 224]), Labels: torch.Size([64])


### Model Configuration

In [37]:
import torch
from torchvision.models import resnet50, resnet34, resnet18, vit_b_16
import torch.nn as nn
from torch.optim import SGD
from torch.nn import CrossEntropyLoss

In [38]:
device= 'cuda' if torch.cuda.is_available() else 'cpu'

In [39]:
model= resnet34(weights= True)



In [40]:
in_features= 512
classes= 200

model.fc= nn.Sequential(
    nn.Linear(in_features, classes))

In [46]:
for param in model.parameters():
    assert param.requires_grad, "Model parameter does not require gradient!"

In [47]:
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-4)
criteria= CrossEntropyLoss()

### Training & Testing

In [48]:
model= model.to(device)

In [73]:
def Train(model, train_dataloader, epoch):
  model.train()
  loss_running= 0
  total= 0
  correct= 0

  for idx, (inputs, labels) in enumerate(train_dataloader):
    inputs= inputs.to(device)
    labels= labels.to(device)
    outputs= model(inputs)

    # print(outputs)
    # print(labels)

    # break

    loss = criteria(outputs, labels)
    # print(loss)
    # break
    loss_running += loss

    optimizer.zero_grad()
    # break
    loss.requires_grad = True
    # print(loss)
    loss.backward()
    # break
    optimizer.step()

    _, predicted = outputs.max(1)

    total += labels.size(0)
    correct += predicted.eq(labels).sum().item()

    # print(f'Epoch {epoch + 1}: Batch idx: {idx}/{len(train_dataloader)} train accuracy: {correct/total} | train loss: {loss}')

  print(f'Epoch {epoch + 1}: train accuracy: {correct/total} | train loss: {loss_running/len(train_dataloader)}')

In [74]:
def Test(model, test_dataloader, epoch=0):
    model.eval()
    total= 0
    correct= 0
    predictions= []
    true_labels= []

    for input, labels in test_dataloader:
        input= input.to(device)
        labels= labels.to(device)
        outputs= model(input)

        _, predicted = outputs.max(1)
        predictions.extend(predicted)
        true_labels.extend(labels)

        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()

    print(f'Epoch: {epoch + 1} | test accuracy: {correct/total}')

In [75]:
epochs= 10

for i in range(epochs):
    Train(model, train_dataloader, i)
    # break
    Test(model, val_dataloader, i)

Epoch 1: train accuracy: 0.0030262636452066147 | train loss: 5.542102336883545
Epoch: 1 | test accuracy: 0.002881844380403458
Epoch 2: train accuracy: 0.0029181828007349498 | train loss: 5.5429534912109375
Epoch: 2 | test accuracy: 0.00345821325648415
Epoch 3: train accuracy: 0.003710775660193825 | train loss: 5.541103363037109
Epoch: 3 | test accuracy: 0.002881844380403458
Epoch 4: train accuracy: 0.0032063983859927227 | train loss: 5.543647289276123
Epoch: 4 | test accuracy: 0.003170028818443804
Epoch 5: train accuracy: 0.002990236697049393 | train loss: 5.5418701171875
Epoch: 5 | test accuracy: 0.002881844380403458
Epoch 6: train accuracy: 0.0034585870230932738 | train loss: 5.542503356933594
Epoch: 6 | test accuracy: 0.003170028818443804
Epoch 7: train accuracy: 0.0032784522823071658 | train loss: 5.542421817779541
Epoch: 7 | test accuracy: 0.0025936599423631124
Epoch 8: train accuracy: 0.0028461289044205067 | train loss: 5.5414886474609375
Epoch: 8 | test accuracy: 0.0028818443804