# danbooru tagging with efficientnet

In [1]:
import numpy as np
import pandas as pd

import os
import shutil
from pathlib import Path
from PIL import Image 

import torch
from torch import nn
from torch.nn import functional as F
from torch import optim
from torch.utils.data import Dataset, DataLoader

import torchvision
from torchvision import transforms as T

from torchinfo import summary

In [2]:
dev = torch.device(('cpu', 'cuda')[torch.cuda.is_available()])

## dataset

see dataprep dir for full dataprep process

In [3]:
img_url, top_url = ['/'.join([
    'https:/',
    'raw.githubusercontent.com',
    'Morshay',
    'tag-ur-it',
    'main',
    f'{f}_tags.csv'
]) for f in ['img', 'top']]

all_labels = pd.read_csv(img_url, converters={'tags': eval})
label_converter = pd.read_csv(top_url).squeeze()

### helper defs

In [4]:
def lbls2proba(labels):
    return torch.FloatTensor(
        label_converter.apply(
            lambda name:
            .9 if name in labels else .1
        )
    )

In [5]:
preprocess = T.Compose([
    T.Resize(224),
    T.ToTensor(),
    T.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    ),
])

### dataset class

In [6]:
class DanbooruDataset(Dataset):

    def __init__(self, label_data, img_dir,
                 transform=preprocess,
                 target_transform=lbls2proba):
        
        self.label_data = label_data
        self.img_dir = img_dir
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        return len(self.label_data)

    def __getitem__(self, idx):
        img_id = self.label_data.iloc[idx, 0]
        img_path = Path(self.img_dir) / f'{img_id}.jpg'
        image = self.transform(Image.open(img_path))        
        labels = self.target_transform(self.label_data.iloc[idx, 1])
        return image.to(dev), labels.to(dev)

### instantiate

In [7]:
train_ids = [int(f.stem) for f in Path('train').glob('*')]
train_labels = all_labels[all_labels.id.isin(train_ids)]
train_ds = DanbooruDataset(label_data=train_labels, img_dir='train')

In [8]:
val_ids = [int(f.stem) for f in Path('val').glob('*')]
val_labels = all_labels[all_labels.id.isin(val_ids)]
val_ds = DanbooruDataset(label_data=val_labels, img_dir='val')

## model

pretty much everything was taken from [RF5](https://github.com/RF5/danbooru-pretrained) and [anthony](https://github.com/anthony-dipofi/danbooru-tagger). i just edited them together.

### defs

In [9]:
def bn_drop_lin(in_size, out_size):
    return nn.Sequential(
        nn.BatchNorm1d(
            in_size,
            eps=1e-05,
            momentum=0.1,
            affine=True,
            track_running_stats=True),
        nn.Dropout(p=0.25, inplace=False),
        nn.Linear(in_size, out_size)
    )

In [10]:
class EffnetTagger(nn.Module):
    def __init__(self,
                 out_classes=3773,
                 base_model='efficientnet_b4',  # effnet_v2_s SOON™
                 effnet_out_features=1792):
        super(EffnetTagger, self).__init__()
        
        self.out_classes=out_classes
        self.effnet_out_features = effnet_out_features

        net = torch.hub.load('pytorch/vision:v0.12.0',
                             base_model, pretrained=True)
        
        self.effnet = nn.Sequential(*list(net.children())[:-1])
        self.out_1 = bn_drop_lin(self.effnet_out_features, 512)
        self.out_2 = bn_drop_lin(512, self.out_classes)

    def forward(self, t_in):

        t = F.leaky_relu(self.effnet(t_in))[:, :, 0, 0]

        t1 = F.leaky_relu(self.out_1(t))
        t2 = self.out_2(t1)
        
        t_rs = t2.reshape([len(t), self.out_classes])
        t_cl = torch.clamp(t_rs, -10, 10)
        
        t_out = torch.sigmoid(t_cl)

        return t_out

### summary

something with torchinfo if i don't forget to read how to do it

In [11]:
EffnetTagger()

Using cache found in C:\Users\Morshay/.cache\torch\hub\pytorch_vision_v0.12.0


EffnetTagger(
  (effnet): Sequential(
    (0): Sequential(
      (0): ConvNormActivation(
        (0): Conv2d(3, 48, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (1): BatchNorm2d(48, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): SiLU(inplace=True)
      )
      (1): Sequential(
        (0): MBConv(
          (block): Sequential(
            (0): ConvNormActivation(
              (0): Conv2d(48, 48, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=48, bias=False)
              (1): BatchNorm2d(48, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
              (2): SiLU(inplace=True)
            )
            (1): SqueezeExcitation(
              (avgpool): AdaptiveAvgPool2d(output_size=1)
              (fc1): Conv2d(48, 12, kernel_size=(1, 1), stride=(1, 1))
              (fc2): Conv2d(12, 48, kernel_size=(1, 1), stride=(1, 1))
              (activation): SiLU(inplace=True)
              (scale_activation):

## training model

### defs

In [12]:
def get_data(train_ds, val_ds, bs):
    return (
        DataLoader(train_ds, batch_size=bs, shuffle=True),
        DataLoader(val_ds, batch_size=bs * 2)
    )

In [13]:
def get_model():
    model = EffnetTagger().to(dev)
    optimizer = optim.AdamW(
        model.parameters(),
        lr=0.001,
        betas=(0.9, 0.999),
        eps=1e-08,
        weight_decay=0.01,
        amsgrad=False
    )
    
    return model, optimizer

In [14]:
def loss_batch(model, loss_func, xb, yb, opt=None):
    loss = loss_func(model(xb), yb)

    if opt is not None:
        loss.backward()
        opt.step()
        opt.zero_grad()

    return loss.item(), len(xb)

In [15]:
def fit(model, opt, train_dl, val_dl, epochs=2, loss_func=nn.MSELoss()):
    
    loss_data = {
        'train': [],
        'val': []
    }
    
    for epoch in range(epochs):
        model.train()
        for xb, yb in train_dl:
            xb.to(dev)
            yb.to(dev)
            losses, nums = zip(
                *[loss_batch(model, loss_func, xb, yb, opt)
                  for xb, yb in train_dl]
            )
        train_loss = np.sum(np.multiply(losses, nums)) / np.sum(nums)
        loss_data['train'].append(train_loss)
        
        model.eval()
        with torch.no_grad():
            losses, nums = zip(
                *[loss_batch(model, loss_func, xb, yb)
                  for xb, yb in val_dl]
            )
        val_loss = np.sum(np.multiply(losses, nums)) / np.sum(nums)
        loss_data['val'].append(val_loss)

        print(
            f'epoch: {epoch} ; train MSE: {train_loss:.4f} ; val MSE: {val_loss:.4f}')
        
    return pd.DataFrame(loss_data)

### loop

In [16]:
train_dl, val_dl = get_data(train_ds, val_ds, 2)
model, opt = get_model()
res = pd.DataFrame()

Using cache found in C:\Users\Morshay/.cache\torch\hub\pytorch_vision_v0.12.0


In [17]:
res = pd.concat([res, fit(model, opt, train_dl, val_dl)])

epoch: 0 ; train MSE: 0.1766 ; val MSE: 0.1565
epoch: 1 ; train MSE: 0.1608 ; val MSE: 0.3002
