In [1]:
#Note: this code is written as an assignment in a Helsinki Uni course on Deep Learning and is heavily influenced by
#starter code provided by the lecturers Hande Celikkanat and Roman Yangarber

In [2]:
#Got a working training session from start to finish
#Good result but this used pre-trained model

In [3]:
import os
import pandas as pd
import numpy as np
import random

In [4]:
#Custom functions to read in our data from internet
#Skips if data already exists

from src import data_download
data_download.fetch_data()

Folder already has folders 'annotations' and 'images'.
Assuming you already have the data and skipping fetch.


In [5]:
import torch
from torchvision import transforms, datasets
import torch.nn as nn
from torchvision.io import read_image, ImageReadMode
from torch.utils.data import Dataset
import torch.optim as optim

In [6]:
DATA_DIR = 'data/images'

In [7]:
#These are custom made functions to handle our data
#Maybe more documentation later
#The function used here can split our data to different sets

from src import data_handling
train, test, val = data_handling.get_target_dfs(train=0.6, test=0.2, val=0.2)

In [8]:
#this code is an edited version of the code found at https://pytorch.org/tutorials/beginner/basics/data_tutorial.html

class CustomImageDataset(Dataset):
    def __init__(self, labels_df, img_dir, transform=None, transform_rate=0.1):
        self.img_labels = labels_df
        self.img_dir = img_dir
        self.transform = transform
        self.transform_rate = transform_rate

    def __len__(self):
        return len(self.img_labels)

    def __getitem__(self, idx):
        img_idx = self.img_labels.index[idx]
        img_path = os.path.join(self.img_dir, "im"+str(img_idx)+".jpg")
        image = read_image(img_path, ImageReadMode.RGB)#.float()
        labels = torch.from_numpy(self.img_labels.iloc[idx].values).float()
        if torch.cuda.is_available():
            image = image.to("cuda")
            labels = labels.to("cuda")
        if self.transform:
            image = self._random_transform(image)
        image = image.float()
        return image, labels
    
    def _random_transform(self, image):
        for transform in self.transform:
            if random.random() < self.transform_rate:
                image = transform(image)
        return image

In [9]:
### Info About Custom Dataset:

## Using Data and Indexing System:
#The label data is kept in one-hotted Pandas dataframe "labels_df". This does not contain the image data. Label data is fairly small so can be kept directly in memory.
#Dataframes have two indexing systems. Hidden internal index (iloc, or .index[]), which always goes from 0 to len-1.
#Second indexing system is the visible index, which might be different. For our dataframe, the visible index follows image indexing, which can be used to load in image data.
#Pytorch DataLoaders call dataset __getitem__ method with idx values from 0 to __len__()-1. This corresponds to our dataframe hidden indexing.
#For one item within __getitem__ method, we are dealing with a single in our dataframe
#To get image index from hidden index we set img_idx = self.img_labels.index[idx]. This is used to get image data for the item.
#Corresponding one-hotted label data is obtained with hidden index: self.img_labels.iloc[idx].values

## Reading Image
#We use torchvision read_image method
#We force every image to be read in as color images with ImageReadMode.RGB
#This way every image has 3 channels, otherwise gray images have 1 channels and Dataloader fails
#Other way would be to grayscale everything

## Other
#Additional image transformers in self.transform are only applied if they exists (not None)
#For now it is hardcoded in that the data get sent to cuda device. If there's no cuda available, this class most likely fails. Maybe I'll turn this to dynamic version later

In [10]:
transform_mix = [transforms.ColorJitter(brightness=.5, hue=.3), transforms.RandomPerspective(distortion_scale=0.6, p=1.0),
                transforms.RandomAdjustSharpness(sharpness_factor=2)]

train_loader = torch.utils.data.DataLoader(dataset=CustomImageDataset(train, DATA_DIR, transform=transform_mix), batch_size=50, shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset=CustomImageDataset(test, DATA_DIR, transform=None), batch_size=50, shuffle=True)
val_loader = torch.utils.data.DataLoader(dataset=CustomImageDataset(val, DATA_DIR, transform=None), batch_size=50, shuffle=True)

In [11]:
#This is a pretrained base-model
#DEFAULT weights = "best" weights

from torchvision.models import resnet50, ResNet50_Weights

In [12]:
#The rest of the code is a pretty standard simple Pytorch setup

In [13]:
class MultiLabelClassifier(nn.Module):
    def __init__(self, num_labels=14):
        super(MultiLabelClassifier, self).__init__()
        self.base_model = resnet50(weights=ResNet50_Weights.DEFAULT)
        self.classifier = nn.Sequential(
            nn.Linear(1000, 256),
            nn.ReLU(),
            nn.Linear(256, num_labels),
            nn.Sigmoid()
        )

    def forward(self, x):
        features = self.base_model(x)
        out = self.classifier(features)
        return out

In [14]:
#added a simple home-made CNN for comparing to resnet
class CNN(nn.Module):
    def __init__(self, num_classes=14, dropout_rate=0.1):
        super(CNN, self).__init__()
        self.dropout = nn.Dropout(0.05)
        self.features = nn.Sequential(
          nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1),
          nn.ReLU(inplace=True),
          nn.MaxPool2d(kernel_size=2, stride=2),
          nn.BatchNorm2d(16),
          nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1),
          nn.ReLU(inplace=True),
          nn.MaxPool2d(kernel_size=2, stride=2),
          nn.BatchNorm2d(32)
        )

        self.classify = nn.Sequential(
          nn.Linear(32 * 32 * 32, 60),
          nn.ReLU(),
          nn.Dropout(p=dropout_rate),
          nn.Linear(60, num_classes),
          nn.Sigmoid()
        )

    def forward(self, x):
        #print("forwarding")
        x = self.features(x)
        #print(x.shape)
        x = x.view(-1, 32 * 32 * 32)
        x = self.classify(x)
        return x

In [24]:
if torch.cuda.is_available():
    print("Found cuda device")
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

Found cuda device


In [34]:
#comment out the model you don't want to use
#model = MultiLabelClassifier().to(device)
model = CNN().to(device)

In [35]:
loss_function = nn.BCEWithLogitsLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

In [36]:
epochs=10
early_stop_patience = 1 # How many epochs to go without improvement

best_val_accuracy = 0.0
postpone_early_stop = early_stop_patience

for epoch in range(epochs):
    model.train() #Enables dropout layer
    print(f'Epoch {epoch+1}')
    train_loss = 0
    train_correct = 0
    total = 0
    for batch_num, (data, target) in enumerate(train_loader):
        optimizer.zero_grad()
        output = model(data)
        loss = loss_function(output, target)
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item()
    
    print(f"Epoch {epoch+1} done. Training loss {train_loss/(batch_num+1):.3f}")
    
    ### VALIDATION
    model.eval() #Disables dropout layer
    val_accuracy = 0
    with torch.no_grad():
        for batch_num, (inputs, labels) in enumerate(val_loader):
            outputs = model(inputs)
            predicted_labels = (outputs > 0.5).int()
            val_accuracy += (predicted_labels == labels).float().mean().item()
    val_accuracy = val_accuracy / len(val_loader)
    print(f"Validation accuracy was: {val_accuracy}")
    
    ### EARLY STOP
    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        postpone_early_stop = early_stop_patience #Reset patience
        torch.save(model.state_dict(), 'best_model_state.pt') #Save best model state
        continue
        
    #allowing for some epoch to have worse accuracy than the one before
    elif postpone_early_stop > 0:
        postpone_early_stop -= 1 
        print("Postponing early-stopping")
        continue
    else:
        print("Breaking loop due to early-stopping")
        model.load_state_dict(torch.load('best_model_state.pt'))
        break

Epoch 1
Epoch 1 done. Training loss 0.776
Validation accuracy was: 0.9281535671081071
Epoch 2
Epoch 2 done. Training loss 0.698
Validation accuracy was: 0.9283163672612037
Epoch 3
Epoch 3 done. Training loss 0.695
Validation accuracy was: 0.9282512414602586
Postponing early-stopping
Epoch 4
Epoch 4 done. Training loss 0.694
Validation accuracy was: 0.9282186840787346
Breaking loop due to early-stopping


In [37]:
#The output returns a probability array for every label
#Probability is the probability of label=1 (image has the specific label)
#These are turned to actual predictions with predicted_labels = (outputs > 0.5).int()
#This means that if it's more likely than not that image has a certain label, then it gets assigned the label
#Otherwise the image will not have the label

with torch.no_grad():
    model.eval() #Disables dropout layer
    accuracy = 0
    for inputs, labels in test_loader:
        outputs = model(inputs)
        predicted_labels = (outputs > 0.5).int()
        accuracy += (predicted_labels == labels).float().mean().item()
accuracy = accuracy / len(test_loader)
print(f"Test accuracy: {accuracy:.3f}")

Test accuracy: 0.927


In [41]:
#Note about predictions:
#We're not actually told whats the exact evaluation metric for "competition"
#Is it the one used here? So for every every label a predicion of 0 or 1
#Could be also that missed labels get penalized differently than extra labels not in ground truth