In [2]:
import os
import random
import functools
from functools import partial
import PIL

import numpy as np 
import pandas as pd

from tqdm.notebook import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader

import timm

In [3]:
class SoftMaxDS(Dataset):
    def __init__(self, data, images_path, return_triplet = True):
        super().__init__()
        self.imgs = data['image'].tolist()
        self.unique_labels = data['label_group'].unique().tolist()
        self.labels = data['label_group'].astype('category')
        self.label_codes = self.labels.cat.codes
        
        self.images_path = images_path
        
    def __getitem__(self, idx):
        
        img = self._get_item(idx)
        label = self.label_codes.iloc[idx]
        return img, label
    def __len__(self):
        return len(self.imgs)
    
    def _get_item(self, idx):
        im = PIL.Image.open(os.path.join(self.images_path, self.imgs[idx]))
        im = torch.tensor(np.array(im) / 255.0, dtype = torch.float).permute(2,0,1)
        return im

In [4]:
# load in data

df = pd.read_csv('data/train.csv')
small_images_dir = 'data/small_train_images/'
n_classes = df['label_group'].nunique()
np.random.seed(1337)

# train val split

train_perc = 0.7
n_train_examples = int(train_perc * len(df))

train_df = df.iloc[:n_train_examples]
val_df = df.iloc[n_train_examples:]

In [5]:
# creating dataloaders

vision_model = 'resnet50'

bs = 64
tr_ds = SoftMaxDS(df, small_images_dir)
tr_dl = DataLoader(tr_ds, batch_size = bs, shuffle = True, pin_memory = True)

device = torch.device('cuda')

In [6]:
class EMBCLass(nn.Module) :
    def __init__(self, pretrained_image_embedor='resnet50',
                output_dim=512) :
        super(EMBCLass, self).__init__()
        self.image_embedor = timm.create_model(pretrained_image_embedor, pretrained=True)
        self.image_pool = nn.AdaptiveAvgPool2d((1,1))
        self.head = nn.Sequential(nn.Linear(2048, output_dim), 
                                  #nn.ReLU(),
                                  )
        
        for m in self.head.modules():
            if isinstance(m, nn.Linear):
                sz = m.weight.data.size(-1)
                m.weight.data.normal_(mean=0.0, std=1/np.sqrt(sz))
            elif isinstance(m, (nn.LayerNorm, nn.BatchNorm1d)):
                m.bias.data.zero_()
                m.weight.data.fill_(1.0)
                m.bias.data.zero_()
            if isinstance(m, nn.Linear) and m.bias is not None:
                m.bias.data.zero_()
    
    def _get_embs(self, x) :
        images = x
        out_images = self.image_embedor.forward_features(images)
        out_images = self.image_pool(out_images).squeeze()
        #return F.normalize(out_images, dim=-1)
        return out_images
    
    def forward(self, x) :
        out_images = self._get_embs(x)
        
        return self.head(out_images)

In [7]:
model = EMBCLass(vision_model, output_dim=n_classes).to(device)

In [8]:

normalize = transforms.Normalize(mean=(0.485, 0.456, 0.406),
                                 std=(0.229, 0.224, 0.225))

train_transforms = transforms.Compose([transforms.ColorJitter(.3,.3,.3),
                                       transforms.RandomRotation(5),
                                       transforms.RandomCrop(224),
                                       transforms.RandomHorizontalFlip(),
                                       normalize
                                       ])

val_transforms = transforms.Compose([transforms.Resize((224,224)),
                                     normalize
                                     ])

n_epochs = 30

lf = nn.CrossEntropyLoss()

lr = 1e-2
wd = 0
no_decay = ["bias", "BatchNorm2d.weight", "BatchNorm2d.bias", "LayerNorm.weight", 'LayerNorm.bias',
            "BatchNorm1d.weight", "BatchNorm1d.bias"]

optimizer_grouped_parameters = [
    {
        "params": [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)],
        "weight_decay": wd,
    },
    {
        "params": [p for n, p in  model.named_parameters() if any(nd in n for nd in no_decay)],
        "weight_decay": 0.0,
    },
]
optimizer = torch.optim.AdamW(optimizer_grouped_parameters, lr=lr)

# learning rate scheduler
sched = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr =lr, pct_start = 0.3, #anneal_strategy = 'linear',
                                            total_steps = int(n_epochs * len(tr_dl)))

In [None]:
tr_losses = []
val_losses = []
for ep in tqdm(range(n_epochs)):
    model.train()
    tr_loss = []
    pbar = tqdm(tr_dl)
    for imgs, labels in pbar:
        
        imgs = train_transforms(imgs.to(device))
        
        optimizer.zero_grad()
        out = model(imgs)
        loss = lf(out, labels.long().to(device))
            
        loss.backward()
        optimizer.step()
        sched.step()
        
        tr_loss.append(loss.item())
        pbar.set_description(f"Train loss: {round(np.mean(tr_loss),3)}")
    
    if ep%2==0 :
        torch.save(model.state_dict(), 'data/tests_model_image/model_class_ep_{}.pth'.format(ep))
    model.eval()
    tr_losses.append(tr_loss)
    summary = f"Ep {ep}: Train loss {tr_loss}"
    print(summary) 
    

HBox(children=(FloatProgress(value=0.0, max=30.0), HTML(value='')))

HBox(children=(FloatProgress(value=0.0, max=536.0), HTML(value='')))