## Google Landmark Recognition 2021

https://www.kaggle.com/competitions/landmark-recognition-2021

In [None]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import cv2
from PIL import Image

from sklearn.model_selection import train_test_split

import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader

import torchvision
from torchvision import transforms
from torchinfo import summary

from tqdm.notebook import tqdm_notebook

import sys
sys.path.append('..')

from python_scripts import engine
from python_scripts.models import Metrics

import warnings
warnings.filterwarnings('ignore')

In [None]:
# Pick the compute device in order of preference: CUDA GPU, then Apple MPS,
# and finally the CPU. The MPS check is only evaluated when CUDA is absent.
device = torch.device(
    'cuda' if torch.cuda.is_available()
    else 'mps' if torch.backends.mps.is_available()
    else 'cpu'
)

In [None]:
# Root directory of the competition data. The trailing slash matters because
# file names are appended to it by plain string concatenation.
path = '../data/Landmark/'

# train.csv supplies the `id` / `landmark_id` columns used below;
# sample_submission.csv supplies the test ids and the `landmarks` column.
train_data, sample_csv = (
    pd.read_csv(path + csv_name)
    for csv_name in ('train.csv', 'sample_submission.csv')
)

In [None]:
# Plot up to 5 examples of images with the same landmark_id
def plot_examples(data=train_data, landmark_id=1):
    """Show up to five images from ``data`` that share one ``landmark_id``.

    Image files are read from ``path + 'train/'`` using the directory layout
    ``<c0>/<c1>/<c2>/<image_id>.jpg``, where ``c0..c2`` are the first three
    characters of the image id.

    Args:
        data: DataFrame with ``id`` and ``landmark_id`` columns.
        landmark_id: Landmark label whose images should be displayed.

    Returns:
        None. Prints ``'No images available'`` instead of plotting when no
        row of ``data`` carries ``landmark_id``.
    """
    # Read the ids from the DataFrame that was passed in, not from the
    # global table, so the function works for any compatible frame.
    image_ids = data.loc[data['landmark_id'] == landmark_id, 'id'].head(5).tolist()
    num_pic = len(image_ids)
    if num_pic == 0:
        print('No images available')
        return None

    # squeeze=False keeps `axs` a 2-D array even for a single subplot, so
    # ravel() also works when only one image matches.
    fig, axs = plt.subplots(1, num_pic, figsize=(5 * num_pic, 12), squeeze=False)
    fig.subplots_adjust(hspace=.2, wspace=.2)
    axs = axs.ravel()

    for ax, image_id in zip(axs, image_ids):
        subpath = '/'.join(image_id[:3])
        img = cv2.imread(path + 'train/' + subpath + '/' + image_id + '.jpg')

        ax.set_title('landmark_id: ' + str(landmark_id))
        ax.set_xticklabels([])
        ax.set_yticklabels([])

        if img is None:
            # cv2.imread signals an unreadable/missing file by returning None.
            ax.text(0.5, 0.5, 'image not found', ha='center', va='center',
                    transform=ax.transAxes)
            continue

        # OpenCV decodes to BGR while matplotlib expects RGB.
        ax.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))

# Preview the images of two landmark ids from the training table.
plot_examples(data=train_data, landmark_id=32349)
plot_examples(data=train_data, landmark_id=7)

In [None]:
# Keep a reproducible 1% subsample of the training ids and discard the rest.
train_val_list, _ = train_test_split(
    train_data['id'].tolist(),
    train_size=0.01,
    random_state=42,
)

# Split that subsample 80/20 into training and validation ids.
train_list, val_list = train_test_split(
    train_val_list,
    test_size=0.2,
    random_state=42,
)

# Test ids come from the sample submission, in its row order.
test_list = sample_csv['id'].tolist()

# Notebook display: sizes of the three id lists.
tuple(len(id_split) for id_split in (train_list, val_list, test_list))

In [None]:
# Side length (in pixels) images are resized to, and the mini-batch size
# used by the data loaders.
img_size, batch_size = 224, 64

In [None]:
# Deterministic pipeline for validation / test images: resize, then convert
# the PIL image to a tensor.
test_transforms = transforms.Compose([
    transforms.Resize(size=img_size),
    transforms.ToTensor(),
])

# Training pipeline: the same resize + tensor conversion, with a random
# rotation of up to 30 degrees applied first and a random horizontal flip
# (probability 0.5) applied before the tensor conversion.
train_transforms = transforms.Compose([
    transforms.RandomRotation(degrees=30),
    transforms.Resize(size=img_size),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ToTensor(),
])

In [None]:
class DatasetGenerator(Dataset):
    """Dataset that loads landmark images by id and, if labelled, their class index.

    Images are read from ``path + '<c0>/<c1>/<c2>/<image_id>.jpg'`` where
    ``c0..c2`` are the first three characters of the image id.

    Args:
        data_list: Sequence of image ids served by this dataset.
        path: Directory prefix of the image tree; it is concatenated as-is,
            so it must end with a path separator.
        transforms: Callable applied to the RGB PIL image; its result is
            returned as ``X``.
        data: Optional DataFrame with ``id`` and ``landmark_id`` columns.
            When given, ``classes`` holds the sorted unique landmark ids and
            ``class_to_idx`` maps each landmark id to its position in it.
        image_size: Side length of the square every image is resized to
            before ``transforms`` is applied.
    """

    def __init__(self, data_list, path, transforms, data=None, image_size=224):
        self.data_list = data_list
        self.path = path
        self.transforms = transforms
        self.image_size = image_size
        self.data = data
        self.classes = []
        self.class_to_idx = {}
        # image id -> class index, built once so __getitem__ does not have to
        # scan the whole DataFrame for every sample.
        self._label_index = {}
        if self.data is not None and 'landmark_id' in self.data.columns:
            self.classes = sorted(self.data['landmark_id'].unique())
            self.class_to_idx = dict(zip(self.classes, range(len(self.classes))))
            for image_id, landmark in zip(self.data['id'], self.data['landmark_id']):
                # setdefault keeps the first row of a duplicated id, matching
                # a "first match" lookup in the table.
                self._label_index.setdefault(image_id, self.class_to_idx[landmark])

    def __len__(self):
        return len(self.data_list)

    def _label_for(self, image_id):
        """Return the class index of ``image_id``, or None for unlabelled data.

        Raises:
            KeyError: If labels are available but ``image_id`` has none.
        """
        if self.data is None or 'landmark_id' not in self.data.columns:
            return None
        try:
            return self._label_index[image_id]
        except KeyError:
            raise KeyError(f'No landmark_id found for image id {image_id!r}') from None

    def __getitem__(self, index):
        """Return ``(X, y)`` for the image at position ``index``.

        ``y`` is the class index, or None when the dataset has no labels.

        Raises:
            IndexError: If ``index`` is out of range for ``data_list``.
            FileNotFoundError: If the image file cannot be read.
            KeyError: If the dataset is labelled but the id has no label.
        """
        image_id = self.data_list[index]
        file_path = self.path + '/'.join(image_id[:3]) + '/' + image_id + '.jpg'

        image = cv2.imread(file_path)
        if image is None:
            # cv2.imread returns None instead of raising on a missing file.
            raise FileNotFoundError(f'Could not read image: {file_path}')
        image = cv2.resize(image, (self.image_size, self.image_size))
        # OpenCV decodes to BGR; PIL and the transforms expect RGB.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        X = self.transforms(Image.fromarray(image))
        y = self._label_for(image_id)

        return X, y

In [None]:
# Training and validation ids both live in the train/ image tree and share
# the label table; only the id list and the transform pipeline differ.
train_dataset = DatasetGenerator(
    train_list,
    f'{path}train/',
    train_transforms,
    data=train_data,
    image_size=img_size,
)

val_dataset = DatasetGenerator(
    val_list,
    f'{path}train/',
    test_transforms,
    data=train_data,
    image_size=img_size,
)

# The test set has no label table, so its items carry None as the label.
test_dataset = DatasetGenerator(
    test_list,
    f'{path}test/',
    test_transforms,
    image_size=img_size,
)

# Notebook display: number of samples in each dataset.
tuple(len(dataset) for dataset in (train_dataset, val_dataset, test_dataset))

In [None]:
# Sorted unique landmark ids collected by the training dataset; a landmark's
# position in this list is its class index.
class_names = train_dataset.classes
# Notebook display: the first landmark id and the number of classes.
class_names[0], len(class_names)

In [None]:
# Training batches are reshuffled every epoch.
train_dataloader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    pin_memory=True,
    num_workers=1,
)

# Validation batches keep the dataset order.
val_dataloader = DataLoader(
    val_dataset,
    batch_size=batch_size,
    shuffle=False,
    pin_memory=True,
    num_workers=1,
)

In [None]:
# Preview training batches: one image grid per subplot (2 x 3 = 6 batches).
f, ax = plt.subplots(2, 3, figsize=(15, 12))
ax = ax.ravel()

# Pairing the loader with range(len(ax)) stops right after the last subplot
# has been filled, so no subplot index past the end is ever used.
for i, (data, label) in zip(range(len(ax)), train_dataloader):
    img = torchvision.utils.make_grid(data).numpy()
    img = np.transpose(img, (1, 2, 0))  # CHW -> HWC, as matplotlib expects

    # The dataset yields RGB images and ToTensor() scales them to [0, 1],
    # which imshow accepts directly: no rescaling or channel swap is needed.
    ax[i].imshow(np.clip(img, 0.0, 1.0))

plt.show()

In [None]:
# Hyperparameter candidates handed to engine.HP_tune_train; each list holds a
# single value here.
learning_rate_list = [1e-3] # training runs for 10 epochs per learning rate, one after another
weight_decay_list = [0]
epochs_list = [10]
batch_size_list = [64]

In [None]:
# Baseline for inspection only: a ResNet-50 built without requesting
# pretrained weights, with its classifier replaced by a 2048 -> 512 linear
# layer. `model` is rebound to Resnet50_Cos later in this cell.
model = torchvision.models.resnet50()
model.fc = nn.Linear(2048, 512, bias=True)
summary(model)

class Resnet50_Cos(nn.Module):
    """Pretrained ResNet-50 embedding network followed by a margin-based head.

    The backbone's classifier is replaced by a linear layer that outputs an
    ``embedding_dim``-dimensional embedding; ``Metrics.AddMarginProduct``
    turns the embedding and the target label into the training output.

    Args:
        num_classes: Number of output classes of the metric head. Defaults to
            ``len(class_names)`` (the notebook-level class list) when None.
        embedding_dim: Size of the embedding produced by the backbone.
        margin: Value passed as ``m`` to ``Metrics.AddMarginProduct``
            (presumably the additive cosine margin; confirm in
            python_scripts.models.Metrics).
    """

    def __init__(self, num_classes=None, embedding_dim=512, margin=0.4) -> None:
        super().__init__()
        if num_classes is None:
            # Resolved at construction time so the class list may be defined
            # (or changed) after this class definition.
            num_classes = len(class_names)

        weights = torchvision.models.ResNet50_Weights.DEFAULT
        self.backbone = torchvision.models.resnet50(weights=weights)
        # Read the feature width from the backbone instead of hard-coding it.
        self.backbone.fc = nn.Linear(
            in_features=self.backbone.fc.in_features,
            out_features=embedding_dim,
            bias=True,
        )
        self.metric = Metrics.AddMarginProduct(
            in_features=embedding_dim,
            out_features=num_classes,
            m=margin,
        )

    def forward(self, X, label):
        """Embed ``X`` with the backbone and apply the metric head with ``label``."""
        X = self.backbone(X)
        output = self.metric(X, label)
        return output

# Rebind `model` to the metric-learning network and print its layer summary.
model = Resnet50_Cos()
summary(model)

In [None]:
# Run the project's training / hyperparameter-search routine.
# NOTE(review): engine.HP_tune_train is project code not shown here; the
# grouping below follows the argument names only. Confirm their meaning
# against python_scripts/engine.
tuning_results = engine.HP_tune_train(
    # model
    model=model,
    model_generator=None,
    model_weights=None,
    model_name='ResNet50_Cos_Google_landmark',
    metric_learning=True,
    # data (the validation split is passed as the test dataset)
    train_dataset=train_dataset,
    test_dataset=val_dataset,
    # search space
    learning_rate_list=learning_rate_list,
    weight_decay_list=weight_decay_list,
    epochs_list=epochs_list,
    batch_size_list=batch_size_list,
    # runtime
    device=device,
    gradient_accumulation_num=1,
    is_tensorboard_writer=False,
)

In [None]:
# Nearest-neighbour inference: embed the training images as a gallery, then
# label each test image with the class of its most similar gallery embedding.
model.to(device)  # no-op if training already placed the model on `device`
model.eval()
train_features = []
train_labels = []
predictions = []

with torch.inference_mode():
    # NOTE(review): the gallery is built from train_dataloader, which
    # shuffles and applies random augmentation; features and labels are
    # collected in the same order so they stay aligned, but an unaugmented
    # loader would give more stable gallery embeddings.
    for X_batch_train, y_batch_train in tqdm_notebook(train_dataloader, desc='predict_1', leave=True):
        train_features.append(model.backbone(X_batch_train.to(device)).cpu())
        train_labels.append(y_batch_train.cpu())

    train_features = torch.cat(train_features, dim=0)
    train_labels = torch.cat(train_labels, dim=0)

    # Normalize and transpose the gallery once instead of once per test image.
    gallery = nn.functional.normalize(train_features, dim=1).T

    for X_test, _ in tqdm_notebook(test_dataset, desc='predict_2', leave=True):
        test_features = model.backbone(X_test.unsqueeze(0).to(device)).cpu()
        cos_sim = torch.mm(nn.functional.normalize(test_features, dim=1), gallery)

        test_pred = train_labels[torch.argmax(cos_sim, dim=1)]
        # torch.max(..., dim=1) returns (values, indices); the score is the
        # maximum softmax value, not the pair.
        test_score = torch.max(nn.functional.softmax(cos_sim, dim=1), dim=1).values

        category = class_names[test_pred[0].item()]
        score = test_score[0].item()
        predictions.append(str(category) + ' ' + str(score))

# test_dataset iterates the ids of sample_csv in row order, so the predictions
# line up with its rows. Assigning the whole column writes to the DataFrame
# itself; chained `.loc[i][col] = ...` would only modify a temporary copy.
sample_csv['landmarks'] = predictions

sample_csv.head()


In [None]:
# Write the submission table to submission.csv in the current working
# directory, without the DataFrame index column.
sample_csv.to_csv('submission.csv', index=False)