In [None]:
import os, zipfile

os.environ['KAGGLE_USERNAME'] = os.environ['KAGGLE_USERNAME']
os.environ['KAGGLE_KEY'] = os.environ['KAGGLE_KEY']

!kaggle datasets download -d paramaggarwal/fashion-product-images-small

In [None]:
# Unpack the downloaded archive.
# NOTE(review): later cells read 'data/styles.csv' and 'data/images' relative to
# the working directory, while this cell extracts into '/content/' — confirm the
# archive layout / working directory actually line up.
zip_path = 'fashion-product-images-small.zip'
extract_to = '/content/'

# exist_ok avoids the check-then-create race and is a no-op when the directory exists.
os.makedirs(extract_to, exist_ok=True)

with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_to)

In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms, models
from PIL import Image
from sklearn.metrics import f1_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MultiLabelBinarizer
from transformers import ViTModel, ViTConfig
from pathlib import Path
import albumentations as A
from albumentations.pytorch import ToTensorV2
import random
import os
import zipfile
from torchvision.models import ResNet50_Weights
from torch.utils.tensorboard import SummaryWriter
from torch.optim.lr_scheduler import StepLR
import matplotlib.pyplot as plt
import random
from tabulate import tabulate
import time

In [None]:
# Seed every random number generator used by the notebook (Python, NumPy,
# PyTorch CPU and CUDA) with the same value so that runs are repeatable.
SEED = 42
for _seed_fn in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed):
    _seed_fn(SEED)

# Request deterministic cuDNN algorithms.
torch.backends.cudnn.deterministic = True

In [None]:
class FashionDataset(Dataset):
    """Image dataset pairing each product photo with a multi-hot label vector.

    Every row of ``df`` identifies an image file ``<img_dir>/<id>.jpg`` and carries
    the categorical columns listed in ``included_cols``; those values are encoded
    with ``label_binarizer``.

    Args:
        df: DataFrame with an ``id`` column plus the ``included_cols`` columns.
        img_dir: Directory holding the ``<id>.jpg`` files.
        transform: Optional callable invoked as ``transform(image=ndarray)`` that
            returns a mapping with an ``'image'`` entry.
        label_binarizer: Fitted encoder exposing ``transform``. It defaults to
            ``None`` but is required by ``__getitem__``.
    """

    def __init__(self, df, img_dir, transform=None, label_binarizer=None):
        self.data_frame = df
        self.img_dir = Path(img_dir)
        self.transform = transform
        self.included_cols = ['gender', 'masterCategory', 'subCategory', 'articleType']
        self.label_binarizer = label_binarizer

    def __len__(self):
        """Return the number of rows (samples) in the underlying DataFrame."""
        return len(self.data_frame)

    def _image_path(self, row):
        """Build the path of the JPEG that belongs to ``row`` (keyed by its ``id``)."""
        return self.img_dir / f"{row['id']}.jpg"

    @staticmethod
    def _to_float_tensor(image):
        """Return ``image`` as a floating-point channel-first tensor.

        * Non-tensor input is treated as a height x width x channel array and is
          moved to channel-first layout.
        * uint8 pixel data is rescaled from [0, 255] to [0, 1].
        * Data that is already floating point keeps its values.
        """
        if isinstance(image, torch.Tensor):
            tensor = image
        else:
            tensor = torch.from_numpy(np.ascontiguousarray(image)).permute(2, 0, 1)

        if tensor.dtype == torch.uint8:
            return tensor.float() / 255.0
        return tensor.float()

    def __getitem__(self, idx):
        """Load sample ``idx``.

        Returns:
            ``(image, labels)`` where ``image`` is a float channel-first tensor
            (also when no transform is configured, so samples can be batched and
            fed to a model) and ``labels`` is a ``torch.FloatTensor`` multi-hot
            vector. If the image cannot be opened, the error is printed and
            ``(None, None)`` is returned.
        """
        row = self.data_frame.iloc[idx]
        img_name = self._image_path(row)

        try:
            # The context manager releases the file handle as soon as the pixels are decoded.
            with Image.open(img_name) as handle:
                image = handle.convert('RGB')
        except OSError:  # covers IOError and FileNotFoundError
            print(f"Error opening image: {img_name}")
            return None, None

        labels = row[self.included_cols].values.tolist()
        encoded_labels = self.label_binarizer.transform([labels])[0]

        pixels = np.array(image)
        if self.transform:
            pixels = self.transform(image=pixels)['image']

        return self._to_float_tensor(pixels), torch.FloatTensor(encoded_labels)

    def print_picture(self, idx):
        """Show sample ``idx`` (original and, if configured, transformed) side by side.

        Returns:
            The list of raw label values of the sample, in ``included_cols`` order.
            If the image cannot be opened, the error is printed and
            ``(None, None)`` is returned.
        """
        row = self.data_frame.iloc[idx]
        img_name = self._image_path(row)

        try:
            with Image.open(img_name) as handle:
                original_image = handle.convert('RGB')
        except OSError:  # covers IOError and FileNotFoundError
            print(f"Error opening original image: {img_name}")
            return None, None

        labels = [row[col] for col in self.included_cols]

        transformed_image = None
        if self.transform:
            transformed_image = self.transform(image=np.array(original_image))['image']
            if isinstance(transformed_image, torch.Tensor):
                # Tensors are channel-first; matplotlib expects channel-last arrays.
                transformed_image = transformed_image.permute(1, 2, 0).cpu().numpy()

        # Display original image
        plt.subplot(1, 2, 1)
        plt.imshow(original_image)
        plt.title("Original Image")
        plt.axis('off')

        # Display transformed image
        if transformed_image is not None:
            plt.subplot(1, 2, 2)
            plt.imshow(transformed_image)
            plt.title("Transformed Image")
            plt.axis('off')

        plt.show()

        return labels

In [None]:
# Load the product metadata; malformed CSV rows are skipped rather than aborting the read.
df = pd.read_csv('data/styles.csv', on_bad_lines='skip')

# Class counts are computed once and reused to keep only well-populated classes.
sub_category_counts = df['subCategory'].value_counts()
article_type_counts = df['articleType'].value_counts()

# Allowed values per label column: fixed lists for the coarse columns,
# frequency thresholds (> 450 / > 500 rows) for the fine-grained ones.
filters = {
    'gender': ['Men', 'Women', 'Unisex'],
    'masterCategory': ['Apparel', 'Accessories', 'Footwear'],
    'subCategory': sub_category_counts[sub_category_counts > 450].index,
    'articleType': article_type_counts[article_type_counts > 500].index
}

# A row survives only if every filtered column holds an allowed value ...
criteria = df[list(filters)].isin(filters).all(axis=1)
df = df[criteria]
# ... and its image file is actually present on disk.
image_found = df['id'].apply(lambda x: (Path('data/images') / f"{x}.jpg").exists())
df = df[image_found]

# 80/20 train/validation split, seeded for repeatability.
train_df, val_df = train_test_split(df, test_size=0.2, random_state=SEED)
included_cols = ['gender', 'masterCategory', 'subCategory', 'articleType']
all_labels = pd.concat([train_df[included_cols], val_df[included_cols]])

# The label vocabulary is learned from the training rows only.
label_binarizer = MultiLabelBinarizer().fit(train_df[included_cols].values)

In [None]:
BATCH_SIZE = 64
num_epochs = 1

# Use the GPU when one is available and fall back to the CPU otherwise, so the
# notebook still runs on machines without CUDA.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Clean train/validation datasets (no augmentation) and their loaders.
train_dataset = FashionDataset(train_df, img_dir='data/images', transform=None, label_binarizer=label_binarizer)
val_dataset = FashionDataset(val_df, img_dir='data/images', transform=None, label_binarizer=label_binarizer)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
test_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

# Perturbation pipeline applied to the validation images: noise, compression
# artefacts, blur, rotation and colour jitter, then conversion to a tensor.
# NOTE(review): JpegCompression (and GaussNoise's var_limit) are reported as
# deprecated in recent albumentations releases — confirm against the installed version.
noise_transform = A.Compose([
    A.GaussNoise(var_limit=(10.0, 50.0)),
    A.JpegCompression(quality_lower=50, quality_upper=100, p=0.5),
    A.Blur(blur_limit=(3, 7), p=0.3),
    A.Rotate(limit=30, p=0.5),
    A.RandomBrightnessContrast(p=0.5),
    A.HueSaturationValue(hue_shift_limit=20, sat_shift_limit=30, val_shift_limit=20, p=0.5),
    ToTensorV2()
])

# Same validation rows as val_dataset, but seen through the noisy pipeline.
noisy_dataset = FashionDataset(val_df, img_dir='data/images', transform=noise_transform, label_binarizer=label_binarizer)
noisy_loader = DataLoader(noisy_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

# Preview the first five validation images next to their perturbed versions.
for _preview_idx in range(5):
    noisy_dataset.print_picture(_preview_idx)

In [None]:
# --- Dataset overview: size, one sample, class distributions, summary table ---
print(f"Number of images: {len(df)}")

columns = train_dataset.included_cols

# Show one training sample and print its raw labels.
print(train_dataset.print_picture(253))
print()

# One bar chart per label column; the two fine-grained columns are limited to
# their ten most frequent values.
fig, axs = plt.subplots(2, 2, figsize=(12, 8))
_panels = [
    ('masterCategory', axs[0, 0], None),
    ('gender', axs[0, 1], None),
    ('subCategory', axs[1, 0], 10),
    ('articleType', axs[1, 1], 10),
]
for _col, _panel_ax, _top_n in _panels:
    _counts = df[_col].value_counts()
    _title = f'Number of images per {_col}'
    if _top_n is not None:
        _counts = _counts.nlargest(_top_n)
        _title = f'{_title} (top {_top_n})'
    _counts.plot(kind='bar', ax=_panel_ax, title=_title, ylabel='Number of images', xlabel=_col)
plt.tight_layout()
plt.show()

# Render the describe() summary of the label columns as a table image.
summary_stats = df[columns].describe(include='all')

fig, ax = plt.subplots(figsize=(8, 6))

# Hide the axes so that only the table itself is visible.
for _axis in (ax.xaxis, ax.yaxis):
    _axis.set_visible(False)
ax.set_frame_on(False)

_col_widths = [0.2 for _ in summary_stats.columns]
table = pd.plotting.table(ax, summary_stats, loc='center', cellLoc='center', colWidths=_col_widths)

# Fixed font size and a slight upscale for readability.
table.auto_set_font_size(False)
table.set_fontsize(10)
table.scale(1.2, 1.2)

plt.savefig('summary_statistics_table.png', bbox_inches='tight', pad_inches=0.05)
plt.show()

In [None]:
# Residual Network CNN model
class ResNet(nn.Module):
    """ResNet-50 whose final fully connected layer emits ``num_classes`` logits.

    Args:
        num_classes: Number of output logits (one per label).
        weights: Forwarded to ``torchvision.models.resnet50``. The default
            ``None`` keeps the previous behaviour of a randomly initialised
            backbone; pass e.g. ``ResNet50_Weights.DEFAULT`` to start from
            pretrained weights.
    """

    def __init__(self, num_classes, weights=None):
        super().__init__()
        self.resnet = models.resnet50(weights=weights)
        # Swap the stock classification head for one sized to our label set.
        self.resnet.fc = nn.Linear(self.resnet.fc.in_features, num_classes)

    def forward(self, x):
        """Return the raw (pre-sigmoid) logits for a batch of images."""
        return self.resnet(x)

In [None]:
# Vision Transformer model
class ViT(nn.Module):
    """Vision Transformer encoder followed by a linear multi-label head.

    The encoder is built from a configuration object (no pretrained weights are
    loaded here) and the classifier reads the first token of the last hidden state.

    Args:
        num_classes: Number of output logits (one per label).
        config: Optional ``ViTConfig``; defaults to ``ViTConfig()`` as before.

    NOTE(review): the default ViTConfig presumably targets 224x224 inputs —
    confirm the images fed to this model are resized accordingly.
    """

    def __init__(self, num_classes, config=None):
        super().__init__()
        self.vit = ViTModel(config if config is not None else ViTConfig())
        # Size the head from the encoder's hidden width instead of hard-coding 768.
        self.classifier = nn.Linear(self.vit.config.hidden_size, num_classes)

    def forward(self, x):
        """Return the raw (pre-sigmoid) logits for a batch of images."""
        hidden_states = self.vit(pixel_values=x).last_hidden_state
        cls_embedding = hidden_states[:, 0, :]  # first token of every sequence
        return self.classifier(cls_embedding)

In [None]:
# One output logit per class known to the label binarizer.
num_labels = len(label_binarizer.classes_)

resnet_model = ResNet(num_classes=num_labels)
vit_model = ViT(num_classes=num_labels)

# Number of positive occurrences of every class over train + validation labels.
# A single vectorised transform replaces one transform call per row and yields
# the same counts. Zero counts are replaced by a small epsilon.
# NOTE(review): label_frequency is not referenced again in the visible notebook.
label_frequency = label_binarizer.transform(all_labels.values).sum(axis=0).astype(float)

epsilon = 1e-5
label_frequency[label_frequency == 0] = epsilon

# Independent sigmoid + binary cross-entropy per label (multi-label setting).
criterion = nn.BCEWithLogitsLoss()

# Each model gets its own optimizer and learning-rate schedule.
optimizer_resnet = optim.Adam(resnet_model.parameters(), lr=0.001)
optimizer_vit = optim.Adam(vit_model.parameters(), lr=0.001)

lr_scheduler_resnet = StepLR(optimizer_resnet, step_size=3, gamma=0.1)
lr_scheduler_vit = StepLR(optimizer_vit, step_size=2, gamma=0.1)

In [None]:
class EarlyStopping:
    """Stop training when a lower-is-better validation metric stops improving.

    Call the instance once per epoch with the current metric and the model. The
    model's ``state_dict`` is written to ``path`` on the first call and whenever
    the metric improves by more than ``delta``; ``early_stop`` becomes ``True``
    after ``patience`` consecutive calls without improvement.

    Args:
        patience: Number of consecutive non-improving calls tolerated.
        verbose: If ``True``, print a message each time a checkpoint is written.
        delta: Margin added to the best score when comparing against a new score.
        path: File the best model's ``state_dict`` is saved to.
    """

    def __init__(self, patience=7, verbose=False, delta=0, path='checkpoint.pt'):
        self.patience = patience
        self.verbose = verbose
        self.delta = delta
        self.best_score = None              # negated metric of the best epoch so far
        self.early_stop = False
        self.counter = 0                    # consecutive calls without improvement
        self.path = path
        self.val_metric_min = float('inf')  # metric value stored in the latest checkpoint

    def __call__(self, val_metric, model):
        """Record this epoch's ``val_metric`` and checkpoint ``model`` if it improved."""
        score = -val_metric  # negate so that "higher score" means "better"

        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_metric, model)
        elif score < self.best_score + self.delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            if score > self.best_score + self.delta:
                self.save_checkpoint(val_metric, model)
            self.best_score = score
            self.counter = 0

    def save_checkpoint(self, val_metric, model):
        """Persist ``model.state_dict()`` to ``path``; the log line is optional, the save is not."""
        if self.verbose:
            print(f'Validation metric decreased ({self.val_metric_min:.6f} --> {val_metric:.6f}).  Saving model ...')
        # Saving must not depend on verbosity, otherwise quiet runs never write a checkpoint.
        torch.save(model.state_dict(), self.path)
        self.val_metric_min = val_metric


# Separate stopper (and checkpoint file) per model.
early_stopping_resnet = EarlyStopping(patience=5, verbose=True, path='resnet_checkpoint.pt')
early_stopping_vit = EarlyStopping(patience=5, verbose=True, path='vit_checkpoint.pt')

In [None]:
# Per-epoch training history, appended to by train_model. Each list starts with
# one placeholder entry (loss 1, F1 0).
losses_resnet = [1]
losses_vit = [1]

f1_scores_resnet = [0]
f1_scores_vit = [0]

def train_model(model, dataloader, criterion, optimizer, scheduler, epoch, device, threshold=0.3):
    """Train ``model`` for one epoch and record its average loss and micro-F1.

    Args:
        model: Network to train; it is moved to ``device``.
        dataloader: Yields ``(images, labels)`` batches.
        criterion: Loss applied to the raw model outputs and the labels.
        optimizer: Optimizer that updates the parameters of ``model``.
        scheduler: Learning-rate scheduler, stepped once per call (i.e. per epoch).
        epoch: Zero-based epoch index, used for the progress line only.
        device: Device the batches and the model are moved to.
        threshold: Sigmoid probability above which a label counts as predicted.

    Returns:
        ``(avg_loss, epoch_f1)`` for this epoch. The same values are appended to
        the module-level ResNet history lists when ``model`` is ``resnet_model``,
        and to the ViT lists otherwise.

    Relies on the notebook-level names ``num_epochs`` and ``resnet_model``.
    """
    model.to(device)
    model.train()
    total_loss = 0.0
    batch_targets = []
    batch_predictions = []

    for batch_idx, (images, labels) in enumerate(dataloader):
        images = images.to(device)
        labels = labels.to(device)

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        predictions = (torch.sigmoid(outputs.detach()) > threshold).float()

        batch_targets.append(labels.cpu().numpy())
        batch_predictions.append(predictions.cpu().numpy())

        # doing \r escape just to not have huge stdout
        print(f'\rEpoch [{epoch+1}/{num_epochs}], Batch [{batch_idx+1}/{len(dataloader)}]', end='')

    # The schedule is defined in epochs, so advance it once per epoch; stepping
    # it per batch would shrink the learning rate after only a few batches.
    scheduler.step()

    # Keep the (samples, labels) layout: on flattened arrays micro-averaging
    # also counts every true negative, which turns the score into plain
    # element-wise accuracy rather than F1.
    y_true = np.concatenate(batch_targets, axis=0).astype(int)
    y_pred = np.concatenate(batch_predictions, axis=0).astype(int)

    epoch_f1 = f1_score(y_true, y_pred, average="micro")

    avg_loss = total_loss / len(dataloader)

    # Identity check: the history lists are keyed on the global ResNet instance.
    if model is resnet_model:
        losses_resnet.append(avg_loss)
        f1_scores_resnet.append(epoch_f1)
    else:
        losses_vit.append(avg_loss)
        f1_scores_vit.append(epoch_f1)

    return avg_loss, epoch_f1

def evaluate_model(model, dataloader, criterion, device, epoch, threshold=0.3):
    """Evaluate ``model`` on ``dataloader`` without updating its weights.

    Args:
        model: Network to evaluate; it is moved to ``device`` and put in eval mode.
        dataloader: Yields ``(images, labels)`` batches.
        criterion: Loss applied to the raw model outputs and the labels.
        device: Device the batches and the model are moved to.
        epoch: Accepted for signature compatibility; not used.
        threshold: Sigmoid probability above which a label counts as predicted.

    Returns:
        ``(avg_val_loss, f1)``: mean batch loss and micro-averaged F1 over all
        samples and labels. Both are also printed.
    """
    model.to(device)
    model.eval()
    batch_targets = []
    batch_predictions = []
    val_loss = 0.0

    with torch.no_grad():
        for images, labels in dataloader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            val_loss += criterion(outputs, labels).item()

            predictions = (torch.sigmoid(outputs) > threshold).float()

            batch_targets.append(labels.cpu().numpy())
            batch_predictions.append(predictions.cpu().numpy())

    # Keep the (samples, labels) layout: on flattened arrays micro-averaging
    # also counts every true negative, which turns the score into plain
    # element-wise accuracy rather than F1.
    y_true = np.concatenate(batch_targets).astype(int)
    y_pred = np.concatenate(batch_predictions).astype(int)

    f1 = f1_score(y_true, y_pred, average="micro")

    avg_val_loss = val_loss / len(dataloader)
    print(f'Validation Loss: {avg_val_loss:.4f}, F1 Score: {f1:.4f}')

    return avg_val_loss, f1

def train_and_evaluate_model(model, train_loader, test_loader, criterion, optimizer, scheduler, early_stopping, num_epochs, device):
    """Run up to ``num_epochs`` rounds of training followed by validation.

    Args:
        model: Network to train and evaluate.
        train_loader: Loader passed to ``train_model``.
        test_loader: Loader passed to ``evaluate_model``.
        criterion: Loss function shared by training and evaluation.
        optimizer: Optimizer for ``model``.
        scheduler: Learning-rate scheduler for ``optimizer``.
        early_stopping: Callable invoked as ``early_stopping(val_loss, model)``
            after every epoch; training ends once its ``early_stop`` attribute
            is true. Pass ``None`` to disable.
        num_epochs: Maximum number of epochs.
        device: Device used for training and evaluation.
    """
    for epoch in range(num_epochs):
        train_model(model, train_loader, criterion, optimizer, scheduler, epoch, device)
        val_loss, f1_score_val = evaluate_model(model, test_loader, criterion, device, epoch)

        # The stopper was previously accepted but never consulted.
        if early_stopping is not None:
            early_stopping(val_loss, model)
            if early_stopping.early_stop:
                print(f'Early stopping triggered after epoch {epoch + 1}')
                break

In [None]:
# Train and validate the ResNet, reporting the wall-clock duration of the run.
start_time = time.time()

print("Training and Evaluating ResNet")
train_and_evaluate_model(
    model=resnet_model,
    train_loader=train_loader,
    test_loader=test_loader,
    criterion=criterion,
    optimizer=optimizer_resnet,
    scheduler=lr_scheduler_resnet,
    early_stopping=early_stopping_resnet,
    num_epochs=num_epochs,
    device=device,
)

end_time = time.time()
execution_time = round(end_time - start_time, 2)
print(f"Execution time: {execution_time} seconds")

In [None]:
# Train and validate the ViT, reporting the wall-clock duration of the run.
start_time = time.time()

print("Training and Evaluating ViT")
# Use the ViT's own optimizer, scheduler and early-stopping state. The ResNet
# optimizer only holds the ResNet parameters, so passing it here would leave
# the ViT weights untouched.
train_and_evaluate_model(vit_model, train_loader, test_loader, criterion, optimizer_vit, lr_scheduler_vit, early_stopping_vit, num_epochs, device)

end_time = time.time()
execution_time = round(end_time - start_time, 2)
print(f"Execution time: {execution_time} seconds")

In [None]:
# Plot the per-epoch training loss and training F1 of both models.
# Each history list holds a placeholder at index 0 followed by one value per
# epoch, so valid x positions run from 0 to num_epochs inclusive.
_curves = [
    ('Training Loss', 'Loss', losses_resnet, 'ResNet Training Loss', losses_vit, 'ViT Training Loss'),
    ('F1 Score', 'F1', f1_scores_resnet, 'ResNet F1 Score', f1_scores_vit, 'ViT F1 Score'),
]

for _title, _ylabel, _resnet_values, _resnet_label, _vit_values, _vit_label in _curves:
    fig, ax = plt.subplots()
    ax.set_title(_title)
    ax.set_ylim(0, 1)
    ax.set_xlim(0, num_epochs)
    ax.plot(_resnet_values, label=_resnet_label)
    ax.plot(_vit_values, label=_vit_label)
    ax.set_xlabel('Epochs')
    ax.set_ylabel(_ylabel)
    # + 1 so that the last epoch gets a tick as well.
    ax.set_xticks(np.arange(0, num_epochs + 1, 1))
    ax.legend()
    plt.show()

In [None]:
def get_random_images(dataset, num_images=10):
    """Draw ``num_images`` distinct samples from ``dataset`` at random.

    Indices are chosen without replacement via ``random.sample`` and the
    corresponding items are returned in the order they were drawn.
    """
    chosen_indices = random.sample(range(len(dataset)), num_images)
    samples = []
    for index in chosen_indices:
        samples.append(dataset[index])
    return samples

def predict_labels(model, image, device, threshold=0.3):
    """Predict a boolean label mask for a single image.

    The model is switched to eval mode, the image is moved to ``device`` and
    given a leading batch dimension, and the sigmoid of the model output is
    compared against ``threshold``.

    Returns:
        NumPy boolean array for the single image in the batch.
    """
    model.eval()
    batch = image.to(device).unsqueeze(0)

    with torch.no_grad():
        probabilities = torch.sigmoid(model(batch))

    above_threshold = probabilities > threshold
    return above_threshold.cpu().numpy()[0]

def map_labels_to_categories(labels, dataset):
    """Assign each label to the first dataset column whose values contain it.

    Args:
        labels: Iterable of label values.
        dataset: Object exposing ``included_cols`` (column names, checked in
            that order) and ``data_frame`` (the DataFrame holding them).

    Returns:
        Dict with the keys ``masterCategory``, ``subCategory``, ``articleType``
        and ``gender``; each value is the matching label or ``None``.

    A label present in several columns is credited to the first of them only;
    if several labels match one column, the last one wins.
    """
    label_map = {
        "masterCategory" : None,
        "subCategory" :  None,
        "articleType" : None,
        "gender" : None
    }

    # Build the per-column value sets once instead of scanning a whole column
    # for every (label, column) pair.
    known_values = {
        col: set(dataset.data_frame[col].unique()) for col in dataset.included_cols
    }

    for label in labels:
        for col, values in known_values.items():
            if label in values:
                label_map[col] = label
                break

    return label_map

def get_category_label_string(label_map):
    """Format a category -> label mapping as one ``category: label`` line per entry."""
    lines = []
    for category, label in label_map.items():
        lines.append(f"{category}: {label}")
    return '\n'.join(lines)

def plot_images_with_predictions(dataset, label_binarizer, num_images=5, device='cuda', models=None):
    """Show random samples next to their ground truth and each model's prediction.

    Every row of the figure holds: the image, its ground-truth labels and one
    column per model with the predicted labels and the number of categories
    that agree with the ground truth.

    Args:
        dataset: Dataset yielding ``(image, multi_hot_labels)`` and exposing
            ``included_cols`` / ``data_frame``.
        label_binarizer: Fitted encoder whose ``classes_`` names each label column.
        num_images: Number of random samples (figure rows).
        device: Device on which the predictions are computed.
        models: Optional mapping ``display name -> model``. Defaults to the
            notebook-level ``resnet_model`` and ``vit_model``.
    """
    if models is None:
        models = {"ResNet": resnet_model, "ViT": vit_model}

    # squeeze=False keeps the axes two-dimensional even for a single row.
    fig, axs = plt.subplots(num_images, 2 + len(models), figsize=(15, 4 * num_images), squeeze=False)
    fig.tight_layout()

    for i, (image, ground_truth) in enumerate(get_random_images(dataset, num_images)):
        ground_truth_np = ground_truth.cpu().numpy()
        ground_truth_labels = label_binarizer.classes_[ground_truth_np.astype(bool)]

        # Normalise whatever the dataset returned to a float channel-first
        # tensor in [0, 1]: non-tensor images are treated as height x width x
        # channel pixel arrays, and integer pixel data is rescaled from [0, 255].
        if not isinstance(image, torch.Tensor):
            image = torch.from_numpy(np.array(image)).permute(2, 0, 1)
        if not torch.is_floating_point(image):
            image = image.float() / 255.0
        image = image.clamp(0.0, 1.0)

        axs[i, 0].imshow(image.permute(1, 2, 0).cpu().numpy())
        axs[i, 0].axis('off')

        ground_truth_info = map_labels_to_categories(ground_truth_labels, dataset)

        axs[i, 1].text(0.5, 0.6, "Ground Truth", ha='center', va='center', wrap=True, fontweight="bold")
        axs[i, 1].text(0.5, 0.4, f"{get_category_label_string(ground_truth_info)}", ha='center', va='center', wrap=True)
        axs[i, 1].axis('off')

        # One column per model, all rendered by the same code path.
        for col, (model_name, model) in enumerate(models.items(), start=2):
            predicted_mask = predict_labels(model, image, device)
            predicted_labels = [label_binarizer.classes_[j] for j, pred in enumerate(predicted_mask) if pred]
            predicted_info = map_labels_to_categories(predicted_labels, dataset)
            matching = sum(
                1 for key, value in ground_truth_info.items()
                if key in predicted_info and predicted_info[key] == value
            )

            axs[i, col].text(0.5, 0.6, f"{model_name} Prediction", ha='center', va='center', wrap=True, fontweight="bold")
            axs[i, col].text(0.5, 0.4, f"{get_category_label_string(predicted_info)}", ha='center', va='center', wrap=True)
            axs[i, col].text(0.5, 0.2, f"Matches: {matching} / {len(ground_truth_info)}", ha='center', va='center', wrap=True)
            axs[i, col].axis('off')

    plt.show()

# Compare both models on clean and on perturbed validation images. The
# notebook-wide `device` is passed explicitly instead of relying on the
# function's 'cuda' default, so inference uses the device chosen for training.
plot_images_with_predictions(val_dataset, label_binarizer, device=device)
print()
plot_images_with_predictions(noisy_dataset, label_binarizer, device=device)