# Привязка снимков к местности

Импортируем библиотеки

In [None]:
import pandas as pd 
import numpy as np
import glob
from tqdm import tqdm
import os
import json

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torchvision import datasets, models, transforms
from torchvision.models import resnet50, ResNet50_Weights

from PIL import Image
import matplotlib.pyplot as plt
from IPython.display import clear_output

from math import sin, cos

Создаем таблицу с ответами из файлов json для тренировочного датасета

In [None]:
json_dir = "/train/json/"


data_df = pd.DataFrame({"id": [], 
                        "left_top_x": [], "left_top_y": [],
                        "right_top_x": [], "right_top_y": [],
                        "left_bottom_x": [], "left_bottom_y": [],
                        "right_bottom_x": [], "right_bottom_y": [],
                        "angle": []})

json_true = []
for _, _, files in os.walk(json_dir):
    for x in files:
        if x.endswith(".json"):
            data = json.load(open(json_dir + x))
            new_row = {"id": x.split(".")[0]+".png", 
                       "left_top_x":data["left_top"][0], "left_top_y":data["left_top"][1],
                       "right_top_x":data["right_top"][0], "right_top_y":data["right_top"][1],
                       "left_bottom_x": data["left_bottom"][0], "left_bottom_y": data["left_bottom"][1],
                       "right_bottom_x": data["right_bottom"][0], "right_bottom_y": data["right_bottom"][1],
                       "angle": data["angle"]}
            data_df = data_df.append(new_row, ignore_index=True)
data_df.head(5)

Создадим два класса: для создания патчей из очень больших космических снимков, чтобы расширить тренировочный набор данных, и для составления собственно датасета из тренировочных данных. 

Для обучения модели вместо 8 координат 4 углов изображения будет использоваться только одна пара координат центра патча, что позволит облегчить процесс обучения модели за счет уменьшения количества предсказываемых параметров: их останется 3 (координаты центра и угол поворота). Поскольку нам известна форма патчей (квадрат) и их размер (1024 на 1024), мы сможем восстановить координаты углов по ответам модели.

В генератор датасета также вшито вычисление координат центра патча.

In [None]:
class Patcher(Dataset):
    def __init__(self, path, transform, base_size, seed=42):
        super().__init__()
        self.transform = transform
        self.imgs = []
        for img_name in os.listdir(path):
            self.imgs.append(Image.open(path + img_name).resize((base_size // 4, base_size // 4)))
        self.rng = np.random.RandomState(seed)

    def crop(self, img, center_x, center_y, size):
        left = center_x - size / 2
        top = center_y - size / 2
        right = center_x + size / 2
        bottom = center_y + size / 2
        return img.crop((left, top, right, bottom))

    def __getitem__(self, indx):
        img = self.imgs[self.rng.randint(0, len(self.imgs))]
        x_center = self.rng.randint(512, base_size - 512)
        y_center = self.rng.randint(512, base_size - 512)
        angle = self.rng.randint(0, 360)

        cropped = self.crop(img, x_center // 4, y_center // 4, 1024 * 3 // 2 // 4)
        rotated = cropped.rotate(angle)
        fixed_size = self.crop(rotated, 1024 * 3 // 4 // 4, 1024 * 3 // 4 // 4, 1024 // 4)
        return self.transform(fixed_size), torch.tensor(np.array([x_center / base_size, y_center / base_size, angle / 360])).float()

    def __len__(self):
        return 10000


class ImageDataset(Dataset):
    def __init__(self, df, path, transform):
        super().__init__()
        self.transform = transform
        self.imgs = []
        for name in tqdm(df['id'].values):
            self.imgs.append(self.transform(Image.open(f'{path}{name}').resize((1024 // 4, 1024 // 4))))
        xs = ['left_top_x', 'right_top_x', 'right_bottom_x', 'left_bottom_x']
        ys = ['left_top_y', 'right_top_y', 'right_bottom_y', 'left_bottom_y']
        self.x_centers = (df[xs].max(axis=1) + df[xs].min(axis=1)) / 2 / base_size
        self.y_centers = (df[ys].max(axis=1) + df[ys].min(axis=1)) / 2 / base_size
        self.angles = df['angle'].values

    def __getitem__(self, indx):
        return self.imgs[indx], torch.tensor(np.array([self.x_centers[indx], self.y_centers[indx], self.angles[indx] / 360])).float()

    def __len__(self):
        return len(self.imgs)

Задаем трансформации. Мы не можем менять геометрию изображений, поэтому не будем добавлять различные афинные преобразования. Но мы можем немного менять контраст изображения и размывать его блюром. Зачем? Возможно, это может немного сымитировать дымчатые снимки (со слабой облачностью). 

Для моделей, которые уже есть внутри Pytorch, размер изображений должен быть не менее 224 по одной из сторон, а также они должны были нормированы определенным образом. Эти значения приведены на сайте Pytorch. Несмотря на то, что патчи изначально немаленькие (со стороной 1024), визуальный анализ показывает, что уменьшение их в 4 раза (до 256) не приводит к потере значимой информации.

In [None]:
train_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize(256),
    transforms.RandomAutocontrast(p=0.3),
    transforms.GaussianBlur(kernel_size=(3,3), sigma=(0.01, 2.0)),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                          std=[0.229, 0.224, 0.225]),
])

valid_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize(256),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                          std=[0.229, 0.224, 0.225]),
])

Чтобы увеличить объем тренировочных данных, при помощи ПО SAS.Planet были выгружены мозаики спутниковых снимков сверхвыского разрешения (того же размера, охвата и в той же проекции, что и оригинальное изображение), представленные в поисковых системах Google, Yandex, Bing, а также в Here, Mapbox, Esri. Поскольку все они безоблачные, а и тренировочные, и тестовые данные имеют облачные патчи, в Photoshop на некоторые из изображений были добавлены фейковые облака.
Кроме того, у нас есть оригинальное изображение.

In [None]:
base_size = 10496 # Height of the original and other big images
base_path = "/bigs/" # big images
img_dir = "/train/img/" # train images

На основе нарезки на патчи больших изображений создается тренировочный датасет. На основе тренировочных из задачи — валидационный.

In [None]:
train_dataset = Patcher(base_path, train_transform, base_size)
valid_dataset = ImageDataset(data_df, img_dir, valid_transform)

На их основе подготовим лоадеры

In [None]:
train_loader = DataLoader(dataset=train_dataset,
                          batch_size=25,
                          shuffle=True)

valid_loader = DataLoader(dataset=valid_dataset,
                          batch_size=25,
                          shuffle=False)

Зададим функцию построения графика обучения

In [None]:
def plot_history(train_history, val_history, title='loss'):
    plt.figure()
    plt.title('{}'.format(title))
    plt.plot(train_history, label='train', zorder=1)
    
    points = np.array(val_history)
    steps = list(range(0, len(train_history) + 1, int(len(train_history) / len(val_history))))[1:]
    
    plt.scatter(steps, val_history, marker='x', s=180, c='orange', label='val', zorder=2)
    plt.xlabel('train steps')
    
    plt.legend(loc='best')
    plt.grid()

    plt.show()

Также определим функцию вычисления метрики на основе той, что описана в тексте задания

In [None]:
def my_loss(output, target):
    pos_err = (torch.abs(target[:,0] - output[:,0]) + torch.abs(target[:,1] - output[:,1])) / 2
    ang_diff = torch.abs(target[:,2] - output[:,2])
    ang_err = torch.minimum(ang_diff, 1 - ang_diff)

    loss = torch.mean(0.7 * pos_err + 0.3 * ang_err)
    return loss

Напишем функцию обучения модели

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
def train(model, criterion, optimizer, train_loader, val_loader, epochs=50):
    train_loss_log = []
    val_loss_log = []  
    
    for epoch in tqdm(range(epochs)):
        
        model.train()
        
        train_loss = 0

        for imgs, labels in train_loader:
            optimizer.zero_grad()

            imgs = imgs.to(device)
            labels = labels.to(device)

            outputs = model(imgs)

            loss = criterion(outputs, labels)
            loss.backward()
            
            train_loss += loss.item()
            train_loss_log.append(loss.data.cpu().detach().numpy())
            
            optimizer.step()

        val_loss = 0
        
        model.eval()
        
        with torch.no_grad():
            for imgs, labels in val_loader:
                
                imgs = imgs.to(device)
                labels = labels.to(device)
                
                pred = model(imgs)
                loss = criterion(pred, labels)
                
                val_loss += loss.item()

        val_loss_log.append(val_loss)

        clear_output()
        plot_history(train_loss_log, val_loss_log, 'loss')

        print('Train loss:', train_loss / len(train_loader))
        print('Val loss:', val_loss / len(val_loader))
        
    return train_loss_log, val_loss_log

Подгружаем модель. Здесь будет использоваться модель ResNet50 с предобученными весами и измененным выходным слоем, поскольку нам нужно 3 предсказания. Данная модель представляет собой сверточную нейронную сеть, отличительной особенностью которой является наличие остаточных слоев.

В качестве функции потерь используется описанная выше функция на основе метрики из задачи, учитывающая ошибку в определении центра патча и угла его поворота. 

В качестве оптимизатора выбран адаптивный метод Adam, как один из самых эффективных.

In [None]:
print(torch.cuda.is_available()) # check GPU

In [None]:
torch.cuda.empty_cache()

model = models.resnet50(weights=ResNet50_Weights.IMAGENET1K_V2)
model.fc = torch.nn.Sequential(torch.nn.Linear(2048, 3), torch.nn.Sigmoid())

model = model.to(device)
criterion = my_loss

optimizer = torch.optim.Adam(model.fc.parameters(), lr=0.001)

Обучение модели на описанных ранее данных в течение 50 эпох.

In [None]:
train_loss_log, val_loss_log = train(model, 
                                     criterion,
                                     optimizer, 
                                     train_loader, 
                                     valid_loader,
                                     50)

Далее дообучим модель на собственно тренировочных данных. Они не участвовали до этого момента в обучении модели.

Для обучения датасет, который до этого был валидационным, придется разделить на тренировочный и валидационный. Определим долю последнего как 10 %.

In [None]:
train_size = int(0.9 * len(valid_dataset))
test_size = len(valid_dataset) - train_size
train_set, val_set = torch.utils.data.random_split(valid_dataset, [train_size, test_size])

In [None]:
tr_loader = DataLoader(dataset=train_set,
                          batch_size=25,
                          shuffle=True)

val_loader = DataLoader(dataset=val_set,
                          batch_size=25,
                          shuffle=False)

Продолжим обучение модели. Вновь на 50 эпохах.

In [None]:
train_loss_log, val_loss_log = train(model, 
                                     criterion, 
                                     optimizer, 
                                     tr_loader, 
                                     val_loader,
                                     50)

Итак, следующий шаг — финальный. Оценка тестового набора данных силами обученной выше модели. Однако напомним, что модель предсказывает не те ответы, которые нужны на выходе, поэтому их еще нужно преобразовать, вычислив на основе имеющихся данных координаты углов изображений.

In [None]:
def new_x(x, y, angle): # x coordinate after rotation
    angle = angle * np.pi / 180 # from degrees to radians
    return x * np.cos(angle) - y * np.sin(angle)

def new_y(x, y, angle): # y coordinate after rotation
    angle = angle * np.pi / 180 
    return x * np.sin(angle) + y * np.cos(angle)

# size of a patch = 1024. Half size = 512
# if central point has coordinates (0, 0) and angle = 0...
left_top_x = -512
left_top_y = -512
right_top_x = 512
right_top_y = -512
left_bottom_x = -512
left_bottom_y = 512
right_bottom_x = 512
right_bottom_y = 512

In [None]:
test_images_dir = '/test_dataset_test/'
f_names = os.listdir(test_images_dir)

sub_dir = "/submission/"
if not os.path.exists(sub_dir):
    os.makedirs(sub_dir)

model.eval()

for name in f_names:
    with open(f'{sub_dir}{name[:-4]}.json', 'w') as f:
        img = valid_transform(Image.open(f'{test_images_dir}{name}').resize((1024 // 4, 1024 // 4)))
        res = model(img.to(device).unsqueeze(0))
        res = res.cpu().detach().numpy()[0]
        res_coords = res[:2] * base_size
        res_ang = res[2] * 360
        const_pred = {"left_top": [round(new_x(left_top_x, left_top_y, res_ang) + res_coords[0]), 
                                   round(new_y(left_top_x, left_top_y, res_ang) + res_coords[1])], 
                      "right_top": [round(new_x(right_top_x, right_top_y, res_ang) + res_coords[0]), 
                                    round(new_y(right_top_x, right_top_y, res_ang) + res_coords[1])],
                      "left_bottom": [round(new_x(left_bottom_x, left_bottom_y, res_ang) + res_coords[0]), 
                                      round(new_y(left_bottom_x, left_bottom_y, res_ang) + res_coords[1])], 
                      "right_bottom": [round(new_x(right_bottom_x, right_bottom_y, res_ang) + res_coords[0]), 
                                       round(new_y(right_bottom_x, right_bottom_y, res_ang) + res_coords[1])],
                      "angle": round(res_ang)}
        json.dump(const_pred, f)