# Intro

The task consists of classifying images into those that contain St. George and those that don't.
I will train several pre-trained models on the given data.

# Data Exploration

Let's take a look at the images

In [1]:
import pandas as pd
import requests
import itertools
import torch
import torch.nn as nn, torch.optim as optim
from torchvision import models, transforms, datasets
import albumentations as alb
import albumentations.augmentations.transforms as aat
import numpy as np
from PIL import Image
from sklearn.model_selection import train_test_split
from tqdm import tqdm

In [2]:
df_george = pd.read_csv("./image_urls/georges.csv")
df_george.head()

Unnamed: 0,https://i.pinimg.com/736x/17/0d/5b/170d5b93d80d247be60f22ca1216bef7.jpg
0,https://i.pinimg.com/736x/47/b9/9a/47b99a2ddcd...
1,https://i.pinimg.com/736x/90/e8/90/90e890f054b...
2,https://i.pinimg.com/736x/0a/71/6f/0a716f6f14e...
3,https://i.pinimg.com/736x/f1/95/be/f195bea0b78...
4,https://i.pinimg.com/736x/78/da/54/78da54b8fa9...


In [None]:
df_non_george = pd.read_csv("./image_urls/non_georges.csv")
df_non_george.head()

In [5]:
george_links = set(itertools.chain(*pd.read_csv("./image_urls/georges.csv").values.tolist()))
non_george_links = set(itertools.chain(*pd.read_csv("./image_urls/non_georges.csv").values.tolist()))

Let's check if there are some bad photos, (photos both classified as george appearing and george not appearing).

In [None]:
miss_labeled = george_links.intersection(non_george_links)
miss_labeled

We can notice there are few links in both classes. Let's ignore these links

In [7]:
george_links.difference_update(miss_labeled)
non_george_links.difference_update(miss_labeled)

assert len(set(george_links).intersection(set(non_george_links))) == 0

In [8]:
len(george_links), len(non_george_links)

(2349, 3328)

In [9]:
data = []
for link in george_links:
    data.append([link, 0])
for link in non_george_links:
    data.append([link, 1])
data = np.array(data)
data.shape

(5677, 2)

### Let's now save

In [None]:
! mkdir data
! mkdir data/train
! mkdir data/test
! mkdir data/train/george
! mkdir data/train/non_george
! mkdir data/test/george
! mkdir data/test/non_george

In [None]:
def save_image(path: str, url: str, idx: int) -> None:
    response = requests.get(url)
    if response.status_code == 200:
        image_data = response.content
        with open(f"{path}/img_{idx}.jpg", 'wb') as file:
            file.write(image_data)

In [None]:
train_data, test_data = train_test_split(data, random_state=42)

In [None]:
for idx, (link, label) in enumerate(train_data):
    if label == '0':
        save_image(path='data/train/george', url=link, idx=idx)
    elif label == '1':
        save_image(path='data/train/non_george', url=link, idx=idx)

In [None]:
for idx, (link, label) in enumerate(test_data):
    if label == '0':
        save_image(path='data/test/george', url=link, idx=idx)
    elif label == '1':
        save_image(path='data/test/non_george', url=link, idx=idx)

# Loading and Augmenting Data

Let's load the data!

In [10]:
transform = transforms.Compose([transforms.Resize(255),
                                transforms.CenterCrop(224),
                                transforms.ToTensor(),
                                transforms.Normalize((0.5,0.5,0.5), (0.5,0.5,0.5))])

In [11]:
class AlbuWrapper:  # typing: ignore
    def __init__(self, atrans: alb.BasicTransform):
        self.atrans = atrans

    def __call__(self, img: Image.Image) -> Image.Image:
        return self.atrans(image=np.array(img))["image"]

In [12]:
alb_transforms = alb.Compose(
    [
        alb.GaussNoise(),
        alb.OneOf(
            [alb.MotionBlur(p=0.2), alb.MedianBlur(blur_limit=3, p=0.1), alb.Blur(blur_limit=3, p=0.1)], p=0.2
        ),
        alb.OneOf([alb.OpticalDistortion(p=0.3), alb.GridDistortion(p=0.1), alb.PiecewiseAffine(p=0.3)], p=0.2),
        alb.OneOf([aat.CLAHE(clip_limit=2), alb.Sharpen(), alb.Emboss()], p=0.3),
        aat.HueSaturationValue(p=0.3),
        alb.HorizontalFlip(),
        aat.RGBShift(),
        aat.RandomBrightnessContrast(),
        aat.RandomGamma(),
        alb.CoarseDropout(2, 10, 10)
    ])

In [13]:
train_transforms = transforms.Compose(
    [AlbuWrapper(alb_transforms), transform])
test_transforms = transform

In [14]:
toPIL = transforms.ToPILImage()

In [15]:
train_dataset = datasets.ImageFolder('./data/train', transform=transform)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=True)

test_dataset = datasets.ImageFolder('./data/test', transform=transform)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=64, shuffle=True)

In [None]:
toPIL(next(iter(train_loader))[0][0][0])

# Training

Let's use a pretrained VGG16 model, and retrain it on our data.

In [17]:
dtype = torch.float

device_name = 'cpu'

if torch.cuda.is_available(): # NVIDIA GPU
    device_name = 'cuda'
elif torch.backends.mps.is_available() and torch.backends.mps.is_built(): # Apple Silicon GPU
    device_name = 'mps'

device = torch.device(device_name)
device

device(type='mps')

In [18]:
vgg_16_pretrained = models.vgg16(weights = models.VGG16_Weights.DEFAULT)
vgg_16_pretrained

VGG(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace=True)
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1

Let's change the classifier to `output_features = 2`, as we're doing binary classification.
Also, we can notice that the `avgpool` layer has an output size of `7x7`. Let's change it to 1, so that it takes less time to train.

In [19]:
vgg_16_pretrained.classifier = nn.Sequential(
    nn.Linear(in_features=512, out_features=512, bias=True),
    nn.ReLU(inplace=True),
    nn.Dropout(p=0.5, inplace=False),
    nn.Linear(in_features=512, out_features=512, bias=True),
    nn.ReLU(inplace=True),
    nn.Dropout(p=0.5, inplace=False),
    nn.Linear(in_features=512, out_features=2, bias=True)
)
vgg_16_pretrained.avgpool = nn.AdaptiveAvgPool2d(output_size=(1,1))
vgg_16_pretrained

VGG(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace=True)
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1

Let's define our training and testing helpers.

In [20]:
def train(model, train_loader, optimizer, loss_function):
    print('Training...')
    model.train()
    model = model.to(device)
    loss_function = loss_function.to(device)
    for batch_idx, (data, label) in tqdm(enumerate(train_loader)):
        data = data.to(device)
        label = label.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = loss_function(output, label)
        loss.backward()
        optimizer.step()

        if batch_idx % 10 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                       100. * batch_idx / len(train_loader), loss.item()))

In [21]:
def test(model, test_loader, loss_function):
    print(f'Validating ...')
    model.eval()

    test_loss = 0
    num_correct = 0
    cnt = 0

    model = model.to(device=device)
    loss_function = loss_function.to(device=device)

    with torch.no_grad():
        for data, label in tqdm(test_loader):
            data = data.to(device=device)
            label = label.to(device=device)
            output = model(data)
            test_loss += loss_function(output, label).item()
            pred = output.argmax(dim=1, keepdim=True)
            num_correct += pred.eq(label.view_as(pred)).sum().item()
            cnt += 1

    test_loss /= cnt

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, num_correct, len(test_loader.dataset),
        100. * num_correct / len(test_loader.dataset)))

For training, we'll first freeze the first few layers of the feature extractor.
And later on, we'll unfreeze them and continue training.

In [22]:
criterion = nn.CrossEntropyLoss()
criterion = criterion.to(device)
vgg_16_pretrained = vgg_16_pretrained.to(device)

In [None]:
for layer in vgg_16_pretrained.features[0:19]:
    for p in layer.parameters():
        p.requires_grad = False

optimizer = optim.Adam(params=vgg_16_pretrained.parameters(), lr=1e-4)

for epoch in range(0, 30):
    train(model=vgg_16_pretrained, train_loader=train_loader, optimizer=optimizer, loss_function=criterion)
    test(model=vgg_16_pretrained, test_loader=test_loader, loss_function=criterion)

We end up with $\approx$ 90% accuracy. Nice! Let's save the model at this state.

In [None]:
! mkdir checkpoints

In [None]:
state = {"model_state": vgg_16_pretrained.state_dict(),"optimizer_state": optimizer.state_dict(),"freeze":True}
torch.save(state['model_state'], "./checkpoints/conv_layers_frozen.pth")

In [None]:
test(vgg_16_pretrained, test_loader=test_loader, loss_function=criterion)

In [25]:
model = vgg_16_pretrained
model.load_state_dict(torch.load('./checkpoints/conv_layers_frozen.pth'))

<All keys matched successfully>

In [26]:
test(vgg_16_pretrained, test_loader=test_loader, loss_function=criterion)

Validating ...


100%|██████████| 23/23 [00:40<00:00,  1.76s/it]


Test set: Average loss: 0.7602, Accuracy: 1290/1420 (91%)






# Classification metrics report

In [27]:
true_labels = torch.tensor([]).to(device=device)
predicted_labels = torch.tensor([]).to(device=device)

with torch.no_grad():
    for data, label in tqdm(test_loader):
        data = data.to(device=device)
        label = label.to(device=device)
        output = model(data)
        pred = output.argmax(dim=1)
        true_labels = torch.cat([true_labels, label])
        predicted_labels = torch.cat([predicted_labels, pred])

100%|██████████| 23/23 [00:27<00:00,  1.18s/it]


In [28]:
true_labels = np.array(true_labels.cpu())
predicted_labels = np.array(predicted_labels.cpu())

In [29]:
from sklearn.metrics import classification_report

print(classification_report(y_true=true_labels, y_pred=predicted_labels))

              precision    recall  f1-score   support

         0.0       0.92      0.87      0.89       617
         1.0       0.90      0.94      0.92       803

    accuracy                           0.91      1420
   macro avg       0.91      0.90      0.91      1420
weighted avg       0.91      0.91      0.91      1420

