# Library Imports

In [30]:
# General imports
import numpy as np
import matplotlib.pyplot as plt
import os
import csv
import ast
import cv2
import pandas as pd
# Pytorch imports
import torch
import torchvision
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as transforms

# Prep Dataset

In [31]:
# Index -> name lookup tables used to decode predicted class indices.
#   shapes: square = 0, circle = 1, triangle = 2
#   colors: red = 0, green = 1, blue = 2
# The trailing 'NONE' entry (index 3) is the class used for empty slots.
LABELS = [
    'square',
    'circle',
    'triangle',
    'NONE',
]
COLORS = [
    'red',
    'green',
    'blue',
    'NONE',
]

def customCollate(batch):
    """Collate ``(image, label_dict)`` samples into batched tensors.

    Args:
        batch: sequence of samples; ``sample[0]`` is an image tensor and
            ``sample[1]`` is a dict with tensor values under the keys
            ``"shape"`` and ``"color"``.

    Returns:
        ``(images, {"shape": shapes, "color": colors})`` where every tensor is
        the per-sample tensors stacked along a new leading batch dimension.
    """
    image_parts = []
    shape_parts = []
    color_parts = []
    for sample in batch:
        target = sample[1]
        image_parts.append(sample[0])
        shape_parts.append(target["shape"])
        color_parts.append(target["color"])

    stacked_images = torch.stack(image_parts)
    stacked_targets = {
        "shape": torch.stack(shape_parts),
        "color": torch.stack(color_parts),
    }
    return stacked_images, stacked_targets


class CustomDataset(Dataset):
    """Training dataset pairing every image in a directory with CSV labels.

    Images are ordered by the integer found between the first ``_`` and the
    following ``.`` of the file name (e.g. ``img_12.png`` -> 12). Labels are
    parsed with ``ast.literal_eval`` from the second column of ``label_csv``
    (the first row is treated as a header) and are matched to images purely
    by position, so row ``i`` of the CSV describes the ``i``-th image in that
    numeric order.

    Each sample is ``(image, {"shape": t, "color": t})`` where both tensors
    are ``torch.long`` of length ``max_shapes``. Slots without a labelled
    object are filled with ``NONE_CLASS``; labels beyond ``max_shapes`` are
    dropped.

    Args:
        input_dir: directory containing the image files.
        label_csv: path of the CSV file holding the labels.
        transform: callable applied to the RGB image array; ``None`` (the
            default) uses ``transforms.ToTensor()``.
        max_shapes: fixed number of label slots per image.

    Raises:
        ValueError: if the CSV has fewer label rows than there are images.
    """

    # Class index for empty slots; equals the position of 'NONE' in the
    # module-level LABELS and COLORS lists.
    NONE_CLASS = 3

    def __init__(self, input_dir, label_csv, transform=None, max_shapes=10):
        self.max_shapes = max_shapes
        # Resolve the default here instead of in the signature so no
        # transform object is created at class-definition time.
        self.transform = transforms.ToTensor() if transform is None else transform
        self.label_to_num = {"square" : 0, "circle" : 1, "triangle" : 2}
        self.color_to_num = {"red" : 0, "green" : 1, "blue" : 2}

        files = sorted(os.listdir(input_dir), key=lambda x: int(x.split("_")[1].split(".")[0]))
        self.img_filenames = [os.path.join(input_dir, img) for img in files]

        # newline='' is what the csv module expects for files it reads.
        with open(label_csv, 'r', newline='') as file:
            reader = csv.reader(file)
            next(reader, None)  # skip the header row
            # Blank lines come back as empty rows; skip them.
            self.img_labels = [ast.literal_eval(row[1]) for row in reader if row]

        # Fail fast: a short label file would otherwise only surface as an
        # IndexError somewhere in the middle of an epoch.
        if len(self.img_labels) < len(self.img_filenames):
            raise ValueError(
                f"{label_csv} has {len(self.img_labels)} label rows but "
                f"{input_dir} contains {len(self.img_filenames)} images"
            )

    def __len__(self):
        return len(self.img_filenames)

    def _encode_labels(self, labels):
        """Convert ``[(shape, color), ...]`` into fixed-length index tensors."""
        labels = labels[:self.max_shapes]
        padding = [self.NONE_CLASS] * (self.max_shapes - len(labels))
        shape_ids = [self.label_to_num[shape] for shape, _ in labels] + padding
        color_ids = [self.color_to_num[col] for _, col in labels] + padding
        # Explicit int64: nn.CrossEntropyLoss needs long targets, while the
        # default NumPy integer dtype is platform dependent.
        return {
            "shape": torch.tensor(shape_ids, dtype=torch.long),
            "color": torch.tensor(color_ids, dtype=torch.long),
        }

    def __getitem__(self, idx):
        img_filename = self.img_filenames[idx]

        image = cv2.imread(img_filename)
        if image is None:
            # cv2.imread signals failure by returning None rather than raising.
            raise OSError(f"Could not read image file: {img_filename}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = self.transform(image)

        return image, self._encode_labels(self.img_labels[idx])

class CustomTestSet(Dataset):
    """Unlabelled test dataset yielding ``(image, submission_path)`` pairs.

    Images are ordered by the integer found between the first ``_`` and the
    following ``.`` of the file name (e.g. ``img_12.png`` -> 12).

    Args:
        input_dir: directory containing the test images.
        transform: callable applied to the RGB image array; ``None`` (the
            default) uses ``transforms.ToTensor()``.
        max_shapes: stored on the instance; not used by this class itself.
    """

    def __init__(self, input_dir, transform=None, max_shapes=10):
        self.image_dir = input_dir
        self.max_shapes = max_shapes
        # Resolve the default here instead of in the signature so no
        # transform object is created at class-definition time.
        self.transform = transforms.ToTensor() if transform is None else transform

        files = sorted(os.listdir(input_dir), key=lambda x: int(x.split("_")[1].split(".")[0]))
        self.img_filenames = [os.path.join(input_dir, img) for img in files]

    def __len__(self):
        return len(self.img_filenames)

    def _submission_path(self, img_filename):
        """Return the ``test_dataset/<name>`` identifier for an image path."""
        relative_path = os.path.relpath(img_filename, start=self.image_dir)
        # Normalise Windows separators so the identifier is the same on
        # every platform.
        relative_path = relative_path.replace("\\", "/")
        return f"test_dataset/{relative_path}"

    def __getitem__(self, idx):
        img_filename = self.img_filenames[idx]

        image = cv2.imread(img_filename)
        if image is None:
            # cv2.imread signals failure by returning None rather than raising.
            raise OSError(f"Could not read image file: {img_filename}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = self.transform(image)

        return image, self._submission_path(img_filename)


In [32]:
# Build the paths with os.path.join: a hard-coded backslash is only a path
# separator on Windows, so the previous literals did not resolve elsewhere.
train_dataset = CustomDataset(
    os.path.join("dataset_v3", "train_dataset"),
    os.path.join("dataset_v3", "train.csv"),
)
# customCollate stacks the per-sample label dicts into batched tensors.
train_dataloader = DataLoader(
    train_dataset,
    batch_size=32,
    shuffle=True,
    num_workers=0,
    collate_fn=customCollate,
    pin_memory=True,
)

# Custom Loss

In [33]:
def customLoss(pred, labels):
    """Return the mean of the shape and color cross-entropy losses.

    Args:
        pred: dict with ``"shape"`` and ``"color"`` logit tensors whose last
            dimension is the number of classes (the model in this file emits
            ``(batch, max_shapes, 4)``).
        labels: dict with ``"shape"`` and ``"color"`` integer class-index
            tensors holding one index per logit row.

    Returns:
        Scalar tensor ``(shape_loss + color_loss) / 2``; every slot,
        including padded ones, contributes equally to each term.
    """
    def slot_loss(logits, targets):
        # Flatten all leading dimensions so each slot is one classification
        # example. The class count is read from the logits instead of being
        # hard-coded, and targets are cast to long because cross_entropy
        # rejects other integer dtypes for class indices.
        flat_logits = logits.reshape(-1, logits.shape[-1])
        flat_targets = targets.reshape(-1).long()
        return nn.functional.cross_entropy(flat_logits, flat_targets)

    shape_loss = slot_loss(pred["shape"], labels["shape"])
    color_loss = slot_loss(pred["color"], labels["color"])

    return (shape_loss + color_loss) / 2.0


# Model Architecture

In [None]:
def kaimingInitWeights(seq_layer):
    """Initialise an ``nn.Linear`` layer in place; ignore any other module.

    The weight is drawn with Kaiming-normal initialisation configured for
    ReLU, and the bias (when the layer has one) is set to zero. Written to be
    passed to ``nn.Module.apply``.
    """
    if not isinstance(seq_layer, nn.Linear):
        return

    nn.init.kaiming_normal_(seq_layer.weight, nonlinearity='relu')

    bias = seq_layer.bias
    if bias is None:
        return
    nn.init.constant_(bias, 0)


class ShapeColorModel(nn.Module):
    """ResNet-50 backbone feeding two MLP heads, one for shape and one for color.

    Each head emits ``max_shapes * 4`` logits, which ``forward`` views as
    ``(batch_size, max_shapes, 4)``: four class scores for every slot.
    """

    def __init__(self, max_shapes=10):
        super().__init__()
        self.max_shapes = max_shapes

        # Pretrained ResNet-50 with its classification layer swapped for an
        # identity, so the backbone output has size (batch, 2048).
        resnet = torchvision.models.resnet50(weights=torchvision.models.ResNet50_Weights.DEFAULT)
        resnet.fc = nn.Identity()
        self.backbone = resnet
        self.backbone_dim = 2048

        def build_head():
            # 2048 -> 1024 -> 512 -> max_shapes * 4, ReLU between the layers.
            return nn.Sequential(
                nn.Linear(self.backbone_dim, 1024),
                nn.ReLU(),
                nn.Linear(1024, 512),
                nn.ReLU(),
                nn.Linear(512, self.max_shapes * 4),
            )

        # Both heads share one architecture but have separate parameters.
        self.shape_head = build_head()
        self.color_head = build_head()

        for head in (self.shape_head, self.color_head):
            head.apply(kaimingInitWeights)

    def forward(self, x):
        """Return ``{"shape": logits, "color": logits}`` for an image batch."""
        feats = self.backbone(x)

        # Unflatten each head's output to (batch_size, max_shapes, 4).
        slot_dims = (-1, self.max_shapes, 4)
        shape_logits = self.shape_head(feats).view(*slot_dims)
        color_logits = self.color_head(feats).view(*slot_dims)

        return {"shape" : shape_logits, "color" : color_logits}


# Training Loop

In [35]:
NUM_EPOCHS = 50

# Use the GPU when one is present, otherwise fall back to the CPU instead of
# failing on the first .cuda() call.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = ShapeColorModel()
model.to(device)

# Sanity check: report where the head parameters actually live.
print(next(model.shape_head.parameters()).device)

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# Cosine decay of the learning rate from 1e-3 down to 1e-5 across all epochs.
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=NUM_EPOCHS, eta_min=0.00001)

model.train()

for epoch in range(NUM_EPOCHS):
    epoch_loss = []
    for batch, labels in train_dataloader:
        batch = batch.to(device, non_blocking=True)
        labels = {k: v.to(device, non_blocking=True) for k, v in labels.items()}

        optimizer.zero_grad()
        pred = model(batch)
        loss = customLoss(pred, labels)
        loss.backward()
        optimizer.step()

        # Keep a plain Python float: appending the tensor itself would hold a
        # graph-attached device tensor per batch for the whole epoch and make
        # the average below print as a tensor instead of a number.
        epoch_loss.append(loss.item())

    # The scheduler is stepped once per epoch (T_max is counted in epochs).
    scheduler.step()
    print(f"Avg Epoch Loss: {sum(epoch_loss) / len(epoch_loss)}")


cuda:0
Avg Epoch Loss: 0.168692484498024
Avg Epoch Loss: 0.07702013850212097
Avg Epoch Loss: 0.06421952694654465
Avg Epoch Loss: 0.05819423496723175
Avg Epoch Loss: 0.05389714241027832
Avg Epoch Loss: 0.04817719757556915
Avg Epoch Loss: 0.06601420044898987
Avg Epoch Loss: 0.04684426262974739
Avg Epoch Loss: 0.04212377220392227
Avg Epoch Loss: 0.03737744688987732
Avg Epoch Loss: 0.03427338972687721
Avg Epoch Loss: 0.03379887342453003
Avg Epoch Loss: 0.03246501088142395
Avg Epoch Loss: 0.03112662024796009
Avg Epoch Loss: 0.03348517790436745
Avg Epoch Loss: 0.022238869220018387
Avg Epoch Loss: 0.013855098746716976
Avg Epoch Loss: 0.013589863665401936
Avg Epoch Loss: 0.02160891890525818
Avg Epoch Loss: 0.009305385872721672
Avg Epoch Loss: 0.007834656164050102
Avg Epoch Loss: 0.004741713870316744
Avg Epoch Loss: 0.0030881718266755342
Avg Epoch Loss: 0.0018660166533663869
Avg Epoch Loss: 0.0013848316157236695
Avg Epoch Loss: 0.00436432333663106
Avg Epoch Loss: 0.0016415377613157034
Avg Epoch

# Inference

In [37]:
# os.path.join keeps the path valid on every platform (a literal backslash
# is only a separator on Windows).
test_dataset = CustomTestSet(os.path.join("dataset_v3", "test_dataset"))
test_dataloader = DataLoader(test_dataset, batch_size=1, shuffle=False, num_workers=0, pin_memory=True)

model.eval()
model.to(device)

# Class indices that mean "no object in this slot".
none_shape = LABELS.index('NONE')
none_color = COLORS.index('NONE')

predictions = []

# Inference needs no gradients; no_grad avoids building an autograd graph
# for every test image.
with torch.no_grad():
    for batch, img_paths in test_dataloader:
        batch = batch.to(device, non_blocking=True)

        pred = model(batch)
        # batch_size is 1: drop the batch dimension, then take the best class
        # for each slot.
        shape_preds = pred["shape"].squeeze(0).argmax(dim=1).cpu().tolist()
        color_preds = pred["color"].squeeze(0).argmax(dim=1).cpu().tolist()

        label_list = []
        for shape, color in zip(shape_preds, color_preds):
            # A slot counts as empty if either head predicts NONE.
            if shape == none_shape or color == none_color:
                continue
            pair = (LABELS[shape], COLORS[color])
            # Each (shape, color) combination is reported at most once.
            if pair in label_list:
                continue
            label_list.append(pair)
        # The default collate wraps the single path string in a sequence.
        predictions.append((img_paths[0], str(label_list)))

with open("test_predictions.csv", "w", newline='') as f:
    writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
    writer.writerow(["img_path", "label"])
    for img_path, label_string in predictions:
        writer.writerow([img_path, label_string])