##  Import libraries

In [1]:
import os
import csv
from random import choice
from PIL import Image

import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models
from torch.utils.data import Dataset, SubsetRandomSampler
from torchvision import transforms
from sklearn.metrics import f1_score

import albumentations

In [8]:
device = torch.device('cuda:0')

## Help objects

In [7]:
class HotdogOrNotDataset(Dataset):
    def __init__(self, folder, transform=None, use_albumentations=False):
        self.transform = transform
        self.folder = folder
        self.name_data = os.listdir(folder)
        self.use_albumentations = use_albumentations
    def __len__(self):
        return len(self.name_data)
    
    def __getitem__(self, index):        
        img_id = self.name_data[index]
        img = Image.open(os.path.join(self.folder, img_id))
        
        frame = img_id.find('_')
        name = img_id[:frame]
        y = 0
        if name in (set(['frankfurter', 'chili-dog', 'hotdog'])):
            y = 1

        if self.transform:
            if self.use_albumentations:
                image = self.transform(image=np.array(img))
                img = image['image']
            else:
                img = self.transform(img)
            
    
        return img, y, img_id

In [4]:
def visualize_samples(dataset, indices, title=None, count=10):
    plt.figure(figsize=(count*3,3))
    display_indices = indices[:count]
    if title:
        plt.suptitle("%s %s/%s" % (title, len(display_indices), len(indices)))        
    for i, index in enumerate(display_indices):    
        x, y, _ = dataset[index]
        plt.subplot(1,count,i+1)
        plt.title("Label: %s" % y)
        plt.imshow(x)
        plt.grid(False)
        plt.axis('off')

In [6]:
def compute_accuracy_and_f1(model, loader):
    model.eval()
    
    correct_samples = 0
    total_samples = 0
    total_f1 = 0
    y_full = []
    prediction_full = []
    for _, (x, y, _) in enumerate(loader):
        x_gpu = x.to(device)
        y_gpu = y.to(device)
        prediction = model(x_gpu)
        indices = torch.argmax(prediction, 1)
        correct_samples += torch.sum(indices == y_gpu)
        total_samples += y.shape[0]
        y_full.extend(y.tolist())
        prediction_full.extend(indices.tolist())
    val_accuracy = float(correct_samples) / total_samples
    total_f1 = f1_score(y_full, prediction_full)
    
    return val_accuracy, total_f1

In [5]:
def train_model(model, train_loader, val_loader, loss, optimizer, num_epochs, scheduler):    
    loss_history = []
    train_history = []
    val_history = []
    val_f1_history = []
    for epoch in range(num_epochs):
        print(f'--Epoch {epoch+1}/{num_epochs}, lr = {scheduler.get_last_lr()[-1]}')
        
        model.train()
        
        loss_accum = 0
        correct_samples = 0
        total_samples = 0
        for i_step, (x, y,_) in enumerate(train_loader):
          
            x_gpu = x.to(device)
            y_gpu = y.to(device)
            prediction = model(x_gpu)    
            loss_value = loss(prediction, y_gpu)
            optimizer.zero_grad()
            loss_value.backward()
            optimizer.step()
            
            _, indices = torch.max(prediction, 1)
            correct_samples += torch.sum(indices == y_gpu)
            total_samples += y.shape[0]
            
            loss_accum += loss_value
        scheduler.step()

        ave_loss = loss_accum / i_step
        train_accuracy = float(correct_samples) / total_samples
        val_accuracy, val_f1 = compute_accuracy_and_f1(model, val_loader)
        
        loss_history.append(float(ave_loss))
        train_history.append(train_accuracy)
        val_history.append(val_accuracy)
        val_f1_history.append(val_f1)
        
        print("----Average loss: %f, Train accuracy: %f, Val accuracy: %f, Val f1 score: %f" % (ave_loss, train_accuracy, val_accuracy, val_f1))
        
    return loss_history, train_history, val_history, val_f1_history

## Load and prepaire data

In [None]:
tfs_for_vis = albumentations.Compose([
    albumentations.RandomRotate90(p=0.3),
    albumentations.Blur(p=0.1),
    albumentations.RandomBrightness(p=0.3),
    albumentations.GaussNoise(p=0.3),
    albumentations.ShiftScaleRotate(p=0.3),
    albumentations.JpegCompression(quality_lower=90, p=0.3),
    ])

tfs = albumentations.Compose([
    albumentations.RandomRotate90(p=0.3),
    albumentations.Blur(p=0.1),
    albumentations.RandomBrightness(p=0.3),
    albumentations.GaussNoise(p=0.3),
    albumentations.ShiftScaleRotate(p=0.3),
    albumentations.JpegCompression(quality_lower=90, p=0.3),
    albumentations.Resize(224, 224),
    albumentations.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    albumentations.torch.ToTensor(),
    ])

In [None]:
train_dataset = HotdogOrNotDataset('data/train.lnk', 
                       transform=transforms.Compose([
                           transforms.Resize((224, 224)),
                           #transforms.ColorJitter(hue=.50, saturation=.50),
                           transforms.RandomVerticalFlip(),
                           transforms.RandomRotation(50),
                           transforms.ToTensor(),
                           # Use mean and std for pretrained models
                           # https://pytorch.org/docs/stable/torchvision/models.html
                           transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])                         
                       ])
                      )
test_dataset = HotdogOrNotDataset('data/test.lnk', 
                       transform=transforms.Compose([
                           transforms.Resize((224, 224)),
                           transforms.ToTensor(),
                           # Use mean and std for pretrained models
                           # https://pytorch.org/docs/stable/torchvision/models.html
                           transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])                         
                       ])
                      )

In [None]:
indices = np.random.choice(np.arange(len(train_dataset)), 10, replace=False)

visualize_samples(train_dataset, indices, "Samples")

In [None]:
batch_size = 8

data_size = len(train_dataset)
validation_fraction = .2


val_split = int(np.floor((validation_fraction) * data_size))
indices = list(range(data_size))
np.random.shuffle(indices)

val_indices, train_indices = indices[:val_split], indices[val_split:]

train_sampler = SubsetRandomSampler(train_indices)
val_sampler = SubsetRandomSampler(val_indices)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, 
                                           sampler=train_sampler)
val_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size,
                                         sampler=val_sampler)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size)

## Models

In [None]:
params = {'lr': [1e-2, 1e-3, 1e-4],
          'wd': [1e-2, 1e-3, 1e-4, 1e-5],
          'gamma': [0.6, 0.8, 0.9]}

iterations = 4
best_model = None
best_val_f1 = 0
best_lr = None
best_wd = None
best_gamma = None

### ResNet50

In [None]:
for i in range(iterations):
    lr = choice(params['lr'])
    wd = choice(params['wd'])
    gamma = choice(params['gamma'])
    
    print(f'Iteration {i+1}/{iterations}: lr = {lr}, wd = {wd}, gamma = {gamma}')
    
    model = models.resnte50(weights=models.ResNet50_Weights.IMAGENET1K_V2)
    # for param in model.parameters():
    #     param.requires_grad = False
        
    num_ft = model.fc.in_features
    
    model.fc = nn.Linear(num_ft, 2)
    model.to(device)
    model_params = model.fc.parameters()

    loss = nn.CrossEntropyLoss()
    optimizer = optim.Adam([
        {'params': [param for name, param in model.state_dict().items()
                    if not 'fc' in name]},
        {'params': model_params, 'lr': lr, 'weight_decay': wd}],
                           lr=lr*1e-2, weight_decay=wd*1e-2)
    scheduler = optim.lr_scheduler.StepLR(optimizer, 2, gamma=gamma)
    loss_history, train_history, val_history, val_f1_history = train_model(model, train_loader, val_loader, loss, optimizer, 3, scheduler)
    f1_model = val_f1_history[-1]
    if f1_model> best_val_f1:
        best_val_f1 = f1_model
        best_model = model
        best_lr = lr
        best_wd = wd
        best_gamma = gamma