In [18]:
from torchvision import transforms, utils
from torchvision.transforms import Compose, RandomResizedCrop, RandomHorizontalFlip, ColorJitter, ToTensor, Normalize
from transformers import AutoImageProcessor, AutoModelForImageClassification
from glob import glob
from PIL import Image
from tqdm.notebook import tqdm
import torch
import torch.optim as optim


from Data_Setup import setup_data_loaders, id2label, label2id

In [2]:
import os
import platform
import warnings

# Known dataset roots, keyed by the value returned by platform.system().
_DEFAULT_DATA_PATHS = {
    'Darwin': "/Users/maltegenschow/Documents/Uni/Thesis/Data.nosync",
    'Linux': "/pfs/work7/workspace/scratch/tu_zxmav84-thesis/Data.nosync",
}

# An explicit THESIS_DATA_PATH environment variable takes precedence, so the
# notebook can run on another machine without editing this cell.
_data_path = os.environ.get("THESIS_DATA_PATH") or _DEFAULT_DATA_PATHS.get(platform.system())

if _data_path is not None:
    DATA_PATH = _data_path
else:
    # On any other platform DATA_PATH used to stay undefined without a hint;
    # it is still left undefined here (so nothing silently points at a wrong
    # directory), but the reason is now reported.
    warnings.warn(
        f"No default DATA_PATH for platform {platform.system()!r}; "
        "set the THESIS_DATA_PATH environment variable."
    )

In [3]:
# Run on the GPU when CUDA is usable, otherwise stay on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

### Initialize Model and Dataloaders

In [12]:
# Display the index -> class-name mapping imported from Data_Setup.
id2label

{0: 'casual_dresses',
 1: 'jersey_dresses',
 2: 'evening_dresses',
 3: 'knitted_dresses',
 4: 'maxi_dresses',
 5: 'shift_dresses',
 6: 'occasion_dresses',
 7: 'denim_dresses'}

In [13]:
# %%
# Load the DINOv2 image processor and a classification model whose head is
# configured from the label maps, then place the model on the selected device.
model_name = "facebook/dinov2-base"
processor = AutoImageProcessor.from_pretrained(model_name)
model = AutoModelForImageClassification.from_pretrained(
    model_name,
    id2label=id2label,
    label2id=label2id,
).to(device)

# Take the normalisation statistics and the resampling filter from the processor.
mean, std = processor.image_mean, processor.image_std
interpolation = processor.resample

# Training pipeline: random flip and colour jitter, then tensor conversion
# and normalisation.
train_transform = Compose(
    [
        # Random resized cropping is currently switched off:
        #RandomResizedCrop(size=(224, 224), scale=(0.08, 1.0), ratio=(0.75, 1.3333), interpolation=interpolation),
        RandomHorizontalFlip(p=0.5),
        ColorJitter(
            brightness=(0.6, 1.4),
            contrast=(0.6, 1.4),
            saturation=(0.6, 1.4),
        ),
        ToTensor(),
        Normalize(mean=mean, std=std),
    ]
)

# Evaluation pipeline: no augmentation, only tensor conversion and normalisation.
test_transform = Compose(
    [
        ToTensor(),
        Normalize(mean=mean, std=std),
    ]
)

# Build both loaders with one sample per batch.
train_loader, test_loader = setup_data_loaders(train_transform, test_transform, batch_size=1)

Some weights of Dinov2ForImageClassification were not initialized from the model checkpoint at facebook/dinov2-base and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


### Test Pretrained Embeddings

In [5]:
# Disabled experiment: run the first 300 square dress images through the
# backbone (`model.dinov2`) and store each image's pooled output, keyed by path.
# NOTE(review): the loop stores the device-moved batch as `inputs` but calls the
# backbone with `input`; presumably `**inputs` was intended -- confirm before re-enabling.
# imgs = glob(f'{DATA_PATH}/Zalando_Germany_Dataset/dresses/images/square_images/*.jpg')

# embeddings = {}
# for path in tqdm(imgs[:300]):
#     img = Image.open(path).convert("RGB").resize((224, 224))
#     input = processor(img, return_tensors="pt")
#     inputs = input.to(device)
#     outputs = model.dinov2(**input)
#     embeddings[path] = outputs.pooler_output

In [6]:
# Disabled experiment: pick a random embedded image, score every embedding against
# it with cosine similarity, and plot the sample next to its closest match.
# NOTE(review): `.detach().numpy()` presumably needs the embeddings on the CPU; if they
# were produced on a GPU a `.cpu()` call would be required -- confirm before re-enabling.
# # sample random item from embeddings dict
# import numpy as np
# sample = np.random.choice(list(embeddings.keys()))

# # Calculate cosine similarity between sample and all other embeddings
# from sklearn.metrics.pairwise import cosine_similarity
# cosine_sim = {}
# for path, embedding in embeddings.items():
#     cosine_sim[path] = cosine_similarity(embeddings[sample].detach().numpy(), embedding.detach().numpy())

# del cosine_sim[sample]

# # Get the most similar remaining item (the sample itself was removed above)
# import operator
# max_sim = max(cosine_sim.items(), key=operator.itemgetter(1))[0]
# # Display images
# import matplotlib.pyplot as plt
# fig, ax = plt.subplots(1, 2)
# ax[0].imshow(Image.open(sample))
# ax[0].set_title("Sample")
# ax[1].imshow(Image.open(max_sim))
# ax[1].set_title("Most similar")
# plt.show()

### Training Loop

In [8]:
# Define Hyperparameters
NUM_EPOCHS = 1  # presumably the number of training epochs -- no epoch loop is visible in these cells
NU_EPOCHS = NUM_EPOCHS  # original (misspelled) name, kept as an alias so existing references keep working
LR = 0.001  # learning rate passed to the Adam optimizer
BATCH_SIZE = 8  # NOTE(review): the visible loader cells pass their own literal batch sizes instead of this
NUM_WORKERS = 0  # NOTE(review): not referenced in the visible cells -- confirm where it is consumed

# When True, the parameters of `model.dinov2` have their gradients switched off.
BACKBONE_FROZEN = True

In [21]:
# Freeze or unfreeze the DINOv2 backbone according to BACKBONE_FROZEN.
# The flag is applied in both directions (instead of only ever switching
# gradients off) so the cell is idempotent: setting BACKBONE_FROZEN = False and
# re-running it makes the backbone trainable again rather than leaving it
# frozen from an earlier run. Only `model.dinov2` is touched; the remaining
# parameters of `model` keep their current setting.
model.dinov2.requires_grad_(not BACKBONE_FROZEN)

model = model.to(device)

In [19]:
# Define Loss and optimizers
# Cross-entropy is applied to the model's logits; Adam is handed every
# parameter of the model together with the configured learning rate.
loss_fn = torch.nn.CrossEntropyLoss()
optimizer = optim.Adam(
    params=model.parameters(),
    lr=LR,
)

In [41]:
def train_epoch(model, train_loader, loss_fn, optimizer, report_interval=10):
    """Train `model` for one pass over `train_loader` and return the mean batch loss.

    Args:
        model: Module whose forward pass returns an object with a `.logits`
            attribute.
        train_loader: Sized iterable of batches; each batch is a mapping with an
            'image' entry (model input) and a 'label' entry (loss target).
        loss_fn: Callable taking `(logits, labels)` and returning a scalar loss
            tensor.
        optimizer: Optimizer that updates the model's parameters.
        report_interval: Every `report_interval` batches, print the mean loss of
            the batches seen since the previous report.

    Returns:
        float: Mean loss over all batches of the epoch, or 0.0 when the loader
        yields no batches.
    """
    model.train()
    interval_loss = 0.0  # reset after every progress report
    epoch_loss = 0.0     # never reset; basis of the returned epoch average
    num_batches = 0
    for i, data in enumerate(tqdm(train_loader)):
        inputs = data['image'].to(device)
        # The targets must live on the same device as the logits, otherwise the
        # loss computation fails as soon as the model runs on a GPU.
        labels = data['label'].to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = loss_fn(outputs.logits, labels)
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        interval_loss += batch_loss
        epoch_loss += batch_loss
        num_batches += 1

        if i % report_interval == report_interval - 1:
            print(f'[{i + 1}/{len(train_loader)}] loss: {interval_loss / report_interval}')
            interval_loss = 0.0

    # Average over the whole epoch. The reporting accumulator cannot be used
    # here because it only holds the batches since the last report.
    if num_batches == 0:
        return 0.0
    return epoch_loss / num_batches

def test_model(model, test_loader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for data in tqdm(test_loader):
            inputs, labels = data['image'], data['label']
            inputs = inputs.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs.logits, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    return correct / total

In [42]:
# Measure accuracy on the test loader with the model's current weights.
# NOTE(review): the classifier head was reported as newly initialised when the
# checkpoint was loaded, so this is presumably an untrained baseline -- confirm.
test_model(model, test_loader)

  0%|          | 0/703 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [33]:
# Rebuild the loaders with four samples per batch for a quick forward-pass check.
train_loader, test_loader = setup_data_loaders(train_transform, test_transform, batch_size=4)

# Draw ONE batch and take both fields from it. Calling next(iter(train_loader))
# twice builds two independent iterators, so images and labels would come from
# two separate draws and need not belong together (e.g. if the loader shuffles).
batch = next(iter(train_loader))
# Keep the tensors on the same device as the model so the forward pass and the
# loss below also work when the model lives on a GPU.
img, label = batch['image'].to(device), batch['label'].to(device)

output = model(img)

In [34]:
# Cross-entropy between the logits from the forward pass in the previous cell and `label`.
loss_fn(output.logits, label)

tensor(2.7542, grad_fn=<NllLossBackward0>)