# Выделение маски леса на снимке Sentinel-1 SAR: Тестирование моделей

Блокнот разбит на 4 части: оценка сверточной сети с тремя сверточными слоями, обучение ResNet-7, обучение U-Net и обучение Random Forest. 

В ходе оценки качества моделей производится предсказание ответов для всего снимка, формирование маски леса для всего снимка, оценка метрик для полученной маски, вычисление разницы между оригинальной и сгенерированной масками, а также экспорт маски в виде растра формата geotiff (tiff, содержащий информацию о пространственном расположении снимка на местности).

## Подготовка данных

Загрузка библиотек

In [None]:
# basic
import os
import joblib
import numpy as np
from osgeo import gdal
from tqdm import tqdm
from tqdm import trange
import random
import matplotlib.pyplot as plt

# preprocessing
from sklearn.preprocessing import StandardScaler

# dl
import torch
from torch import nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torchvision import transforms

# classic ml
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier

# metrics
from sklearn.metrics import roc_auc_score
from sklearn.metrics import confusion_matrix, matthews_corrcoef
from sklearn.metrics import balanced_accuracy_score

# seed
torch.manual_seed(42)
random.seed(42)
np.random.seed(42)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.cuda.is_available()

Функции

In [None]:
# расчет метрики Intersection over Union
def IoU(tp, fp, fn):
    return tp / (tp + fp + fn)

In [None]:
# Вычисление метрик
def print_metrics(labels, outputs):
    print('Доля залесеных территорий на оригинальной маске')
    print(f'{labels[labels == 1].size / labels.size :.2}')
    print('Доля залесеных территорий на результате работы модели')
    print(f'{outputs[outputs == 1].size / outputs.size :.2}')
    
    if len(labels.shape) > 1:
        labels = labels.flatten()
        outputs = outputs.flatten()

    print(f"Matthews correlation coefficient={matthews_corrcoef(labels, outputs):.2}")
    print(f"ROC_AUC={roc_auc_score(labels, outputs):.2}")
    print(f"Balanced accuracy score={balanced_accuracy_score(labels, outputs):.2}")

    tn, fp, fn, tp = confusion_matrix(labels, outputs).ravel()
    iou = IoU(tp, fp, fn)

    print(f"Intersection over Union = {iou:.2}")

In [None]:
# геопривязка классифицированного изображения
def createGeotiff(out_raster, data, geo_transform, projection):
    driver = gdal.GetDriverByName('GTiff')
    rows, cols = data.shape
    rasterDS = driver.Create(out_raster, cols, rows, 1, gdal.GDT_Byte) 
    rasterDS.SetGeoTransform(geo_transform)
    rasterDS.SetProjection(projection)
    band = rasterDS.GetRasterBand(1)
    band.WriteArray(data)
    rasterDS = None

### Загрузка и предобработка

In [None]:
!gdown --id 18RnYJKsWTqHfdYZ1Ej_EY44FpJntrKae # forest_mask
!gdown --id 18jTiMnaGNneKwCLaThF4tEZWXKdT5fF5 # sentinel-1 sar image

In [None]:
raster_path = "/content/subalos_S1B_20191021_.tif"
mask_path = "/content/forest_mask_.tif"

image = gdal.Open(raster_path, gdal.GA_ReadOnly)

# получаем инфо о пространственной привязке
geo_transform = image.GetGeoTransform()
projection = image.GetProjectionRef()

image_array = image.ReadAsArray()

mask = gdal.Open(mask_path, gdal.GA_ReadOnly)
mask_array = mask.ReadAsArray().astype(np.int8)

image = None 
mask = None

print(image_array.shape)
print(mask_array.shape)

In [None]:
band_1 = image_array[0]
band_2 = image_array[1]

band_1 = StandardScaler().fit_transform(band_1)
band_2 = StandardScaler().fit_transform(band_2)

image_norm = np.stack((band_1, band_2), axis=-1)
image_norm.shape

Чтобы не забывать границу, после которой можно оценивать метрики

In [None]:
bound_test = 2700

Готовые модели также можно скачать, однако необходимо будет изменить пути на загружаемые модели, потому что они имеют отличные названия

In [None]:
!gdown --id 1toFZ-cKZowAvnZHNWqhzPWF8dgSUORnA # cnn
!gdown --id 1tFHnTMZH7z_2K8y9Huv1q4bpuK1xKzZr # resnet
!gdown --id 1EF0AO0sWjBHeep6NPjqby8IG7bfrygdl # unet
# готовая модель random forest весит 3 ГБ и у меня нет возможности ее загрузить на диск :(

Готовые маски также можно скачать и использовать для оценки качества

In [None]:
!gdown --id 1GT4_eYq-130UXZXJA30J5D6nTgITqI81 # cnn
!gdown --id 12kWhnVvfQ5TbbSkCuMLkymQKY2SpaZKk # resnet
!gdown --id 1n1im0gdadqvHKlQb36bYjQ8-FC2IMpwl # unet
!gdown --id 1pPidsiMsX19hM3frVq9qIFpwqWq8cMnQ # rf

## Сверточные сети для классификации

In [None]:
# генератор патчей
# так как мы классифицируем центральный пиксел, размер стороны патча должен быть нечетным
class Patcher(Dataset):
    def __init__(self, image, mask, transform, patch_size):
        super().__init__()
              
        assert patch_size % 2, "Нечетные патчи, пожалуйста!"
        self.image = image
        self.mask = mask
        self.transform = transform
        self.patch_size = patch_size
        self.im_h, self.im_w = image.shape[0], image.shape[1]
    
        half_patch = self.patch_size // 2
        # координаты центрального пиксела для восстановления маски
        coord_list = list()
        for central_x in trange(half_patch, self.im_w - half_patch): 
            for central_y in range(half_patch, self.im_h - half_patch):
                # создаем патч, только если он не нулевой
                if (self.image[central_y - half_patch:central_y + half_patch + 1,
                               central_x - half_patch:central_x + half_patch + 1] != 0).all():
                    coord_list.append([central_y, central_x])
        self.coords = np.array(coord_list)
        self.size = len(self.coords)

    def __getitem__(self, indx):
        central_x = self.coords[indx, 1] # на основе координат центрального пиксела
        central_y = self.coords[indx, 0]
        
        half_patch = self.patch_size // 2
        # вырезаем патч
        patch = self.image[central_y - half_patch:central_y + half_patch + 1, 
                           central_x - half_patch:central_x + half_patch + 1]
        
        # определяем класс
        label = self.mask[central_y][central_x]
        return self.transform(patch), torch.tensor(label), indx 
    
    def __len__(self):
        return self.size

In [None]:
# функция для валидации на тесте 
def final_validate(model, 
                  criterion,
                  test_loader):
    model.eval()
    outputs = []
    coords = []
    with torch.no_grad():
        for batch in test_loader:
            patch, label, coord = batch
            patch, label = patch.to(device), label.to(device)
            y_pred = model(patch) # get predictions
            y_pred = y_pred.squeeze().cpu()
            y_pred = torch.where(y_pred > 0.5, 1, 0)
            outputs.append(y_pred.numpy())
            coords.append(coord)
        
    outputs = np.concatenate(outputs, axis=0)
    coords = np.concatenate(coords, axis=0)
    
    
    return outputs, coords

In [None]:
# заполнение болванки изображения ответами
def fill_image(coord, dataset, zero_image, outputs):
    for indx in trange(len(coord)):
        coord_x, coord_y = dataset.coords[coord[indx]]
        zero_image[coord_x, coord_y] = outputs[indx]
        
    return zero_image

In [None]:
def generate_result_image(patch_size, 
                          img, lbl, 
                          transform, batch_size, 
                          model, criterion):
    
    dataset = Patcher(img, lbl, transform, patch_size) # without noise
        
    loader = DataLoader(dataset=dataset,
                        batch_size=batch_size,
                        shuffle=False)
    
    outputs, coords = final_validate(model, 
                                     criterion,
                                     loader)

    result_image = np.zeros_like(lbl)
    
    result_image = fill_image(coord=coords, 
                              dataset=dataset, 
                              zero_image=result_image, 
                              outputs=outputs)


    result_image[result_image > 1] = 1
    
    return result_image

Так как мы только оцениваем модели, нам не нужны аугментации

In [None]:
valid_transform = transforms.Compose([
    transforms.ToTensor()])

### Simple CNN

In [None]:
class CNN_s1(nn.Module):
    def __init__(self, patch_size: int = 5):
        super().__init__()
        self.conv_stack = nn.Sequential(
            nn.Conv2d(2, 32, 3, stride=1, padding=1), # shape: [32,patch_size,patch_size]
            nn.ReLU(),
            nn.Conv2d(32, 64, 3, stride=1, padding=1), # shape: [64,patch_size,patch_size] 
            nn.ReLU(),
            nn.Conv2d(64, 128, 3, stride=1, padding=1), # shape: [128,patch_size,patch_size] 
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(128*patch_size*patch_size, 1000),
            nn.Dropout(0.25),
            nn.ReLU(), 
            nn.Linear(1000, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        scores = self.conv_stack(x)
        return scores

In [None]:
patch_size = 15
batch_size = 2048

model_cnn = torch.load('/content/model_cnn.pt')
model_cnn.eval()
model_cnn = model_cnn.to(device)
criterion = nn.BCELoss()

In [None]:
result_image = generate_result_image(patch_size=patch_size, 
                                     img=image_norm, lbl=mask_array, 
                                     transform=valid_transform, batch_size=batch_size, 
                                     model=model_cnn, criterion=criterion)

__Метрики__

In [None]:
print('Метрики для всего изображения')
print()
print_metrics(mask_array, result_image)

In [None]:
print('Метрики для тестовой части')
print()
print_metrics(mask_array[bound_test:], result_image[bound_test:])

__Оригинальная и смоделированная маски__

In [None]:
fig, axis = plt.subplots(1, 2, figsize=(20, 30))
axis[0].imshow(mask_array, cmap='Greys_r')
axis[1].imshow(result_image, cmap='Greys_r')
axis[0].title.set_text('Original')
axis[1].title.set_text('Parody')
for a in axis:
    a.axis('off') 

Фрагмент

In [None]:
start_x = 3500
step_x = 1000
start_y = 100
step_y = 4900

figure1 = mask_array[start_x:start_x + step_x, start_y:start_y + step_y]
figure2 = result_image[start_x:start_x + step_x, start_y:start_y + step_y]

fig, axis = plt.subplots(2, 1, figsize=(15, 7))
axis[0].imshow(figure1, cmap='Greys_r')
axis[1].imshow(figure2, cmap='Greys_r')
axis[0].title.set_text('Original')
axis[1].title.set_text('Parody')
for a in axis:
    a.axis('off') 

__Разница между оригинальной маской и смоделированной__

In [None]:
diff_mask = mask_array - result_image

fig = plt.figure(figsize=(10, 15))
plt.imshow(diff_mask, cmap='bwr')
plt.axis('off')
plt.show()

Белый - нет разницы

Красный - необнаруженный лес

Синий - лишний лес

__Экспорт результата__

In [None]:
out_raster = "/content/result_cnn.tif"

# экспорт результата
createGeotiff(out_raster, result_image, geo_transform, projection)

### ResNet

In [None]:
class CustomResnet(nn.Module):
    def __init__(self, class_nums = 1, patch_size=5):
        super(CustomResnet, self).__init__()
        self.activation = nn.ReLU()

        resnet_module = nn.Sequential(
            nn.Conv2d(2, 64, 3, stride=1, padding=1),
            self.activation,
            BasicBlock(64, 64, 1),
            BasicBlock(64, 128, 2),
            BasicBlock(128, 128, 1),
            nn.AdaptiveAvgPool2d((1,1))
        )

        dummy_imput = torch.rand(1, 2, patch_size, patch_size)
        out = resnet_module(dummy_imput)

        self.resnet = resnet_module
        self.fc = nn.Sequential(
            nn.Linear(out.shape[1], class_nums),
            nn.Sigmoid()
        )

    def forward(self, x):
        x = self.resnet(x)
        x = nn.Flatten()(x)
        scores = self.fc(x)
        return scores

class BasicBlock(nn.Module):
    def __init__(self, conv_in, conv_out, stride_first, activation = nn.ReLU):
        super(BasicBlock, self).__init__()
        self.activation = activation()

        if stride_first == 2:
            downs_module = nn.Sequential(
            nn.Conv2d(conv_in, conv_out, 1, stride=2),
            nn.BatchNorm2d(conv_out)
            )
            self.downsample = downs_module
        else:
            self.downsample = None

        bb_module = nn.Sequential(
            nn.Conv2d(conv_in, conv_out, 3, stride=stride_first, padding=1),
            nn.BatchNorm2d(conv_out),
            self.activation,
            nn.Conv2d(conv_out, conv_out, 3, stride=1, padding=1),
            nn.BatchNorm2d(conv_out)
        )
        self.bb = bb_module

    def forward(self, x):
        x_identity = x
        out = self.bb(x)

        if self.downsample is not None: 
            x_identity = self.downsample(x) 

        out += x_identity
        out = self.activation(out)

        return out

In [None]:
patch_size = 15
batch_size = 2048

model_resnet = torch.load('/content/model_resnet.pt')
model_resnet.eval()
model_resnet = model_resnet.to(device)
criterion = nn.BCELoss()

In [None]:
result_image = generate_result_image(patch_size=patch_size, 
                                     img=image_norm, lbl=mask_array, 
                                     transform=valid_transform, batch_size=batch_size, 
                                     model=model_resnet, criterion=criterion)

__Метрики__

In [None]:
print('Метрики для всего изображения')
print()
print_metrics(mask_array, result_image)

In [None]:
print('Метрики для тестовой части')
print()
print_metrics(mask_array[bound_test:], result_image[bound_test:])

__Оригинальная и смоделированная маски__

In [None]:
fig, axis = plt.subplots(1, 2, figsize=(20, 30))
axis[0].imshow(mask_array, cmap='Greys_r')
axis[1].imshow(result_image, cmap='Greys_r')
axis[0].title.set_text('Original')
axis[1].title.set_text('Parody')
for a in axis:
    a.axis('off') 

Фрагмент

In [None]:
start_x = 3500
step_x = 1000
start_y = 100
step_y = 4900

figure1 = mask_array[start_x:start_x + step_x, start_y:start_y + step_y]
figure2 = result_image[start_x:start_x + step_x, start_y:start_y + step_y]

fig, axis = plt.subplots(2, 1, figsize=(15, 7))
axis[0].imshow(figure1, cmap='Greys_r')
axis[1].imshow(figure2, cmap='Greys_r')
axis[0].title.set_text('Original')
axis[1].title.set_text('Parody')
for a in axis:
    a.axis('off') 

__Разница между оригинальной маской и смоделированной__

In [None]:
diff_mask = mask_array - result_image

fig = plt.figure(figsize=(10, 15))
plt.imshow(diff_mask, cmap='bwr')
plt.axis('off')
plt.show()

Белый - нет разницы

Красный - необнаруженный лес

Синий - лишний лес

__Экспорт результата__

In [None]:
out_raster = "/content/result_resnet.tif"

# экспорт результата
createGeotiff(out_raster, result_image, geo_transform, projection)

## Сверточная сеть U-Net для сегментации

In [None]:
# new patches
class Patcher_UNet(Dataset):
    def __init__(self, image, mask, transform, patch_size=256, train_part=True):
        super().__init__()
              
        self.image = image
        self.mask = mask
        self.transform = transform
        self.patch_size = patch_size
        self.im_h, self.im_w = image.shape[0], image.shape[1]
        self.train_part = train_part
    
        coord_list = list()
        for corner_x in trange(0, self.im_w // self.patch_size * self.patch_size, self.patch_size): 
            for corner_y in range(0,  self.im_h // self.patch_size * self.patch_size, self.patch_size):
                if (self.image[corner_y:corner_y + self.patch_size,
                               corner_x:corner_x + self.patch_size] != 0).all():
                    coord_list.append([corner_y, corner_x])
        if not train_part:
            corner_x = self.im_w - self.patch_size
            for corner_y in range(0,  self.im_h // self.patch_size * self.patch_size, self.patch_size):
                coord_list.append([corner_y, corner_x])
            
        self.coords = np.array(coord_list)
        self.size = len(self.coords)

    def __getitem__(self, indx):
        corner_x = self.coords[indx, 1]
        corner_y = self.coords[indx, 0]
        
        patch = self.image[corner_y:corner_y + self.patch_size, 
                           corner_x:corner_x + self.patch_size]
        label = self.mask[corner_y:corner_y + self.patch_size, 
                           corner_x:corner_x + self.patch_size]
        
        if self.train_part:
            trans = transforms.Compose([transforms.ToTensor()])
            concat = torch.cat((trans(patch), trans(label)))
            concat_transformed = self.transform(concat)
            patch, label = torch.split(concat_transformed, 2)
        else:
            patch, label = self.transform(patch), self.transform(label)
        return patch, label, indx # dataset.coords[indx]
    
    def __len__(self):
        return self.size

In [None]:
# заполнение болванки изображения ответами
def fill_image_Unet(coord, dataset, zero_image, outputs, patch_size):
    for indx in trange(len(coord)):
        coord_x, coord_y = dataset.coords[coord[indx]]
        zero_image[coord_x:coord_x + patch_size, coord_y:coord_y + patch_size] = outputs[indx]
        
    return zero_image

In [None]:
def generate_result_image_unet(patch_size, 
                          img, lbl, 
                          transform, batch_size, 
                          model, criterion):
    
    dataset = Patcher_UNet(img, lbl, transform, patch_size, train_part = False) # without noise
        
    loader = DataLoader(dataset=dataset,
                        batch_size=batch_size,
                        shuffle=False)
    model.eval()
    outputs = []
    coords = []
    with torch.no_grad():
        for batch in loader:
            imgs, labels, coord = batch
            imgs, labels = imgs.to(device), labels.to(device)
            y_pred = model(imgs)
            y_pred = y_pred.squeeze().cpu()
            y_pred = torch.where(y_pred > 0.5, 1, 0)
            outputs.append(y_pred.numpy())
            coords.append(coord)
    outputs = np.concatenate(outputs, axis=0)
    coords = np.concatenate(coords, axis=0)
        

    result_image = np.zeros_like(lbl)
    
    result_image = fill_image_Unet(coord=coords, 
                                   dataset=dataset, 
                                   zero_image=result_image, 
                                   outputs=outputs,
                                   patch_size=patch_size)


    result_image[result_image > 1] = 1
    
    return result_image

In [None]:
# loss
class BinaryDiceLoss(nn.Module):
    def __init__(self, p=2, epsilon=1e-6):
        super().__init__()
        self.p = p  # pow degree
        self.epsilon = epsilon

    def forward(self, predict, target):
        predict = predict.flatten(1)
        target = target.flatten(1)

        num = torch.sum(torch.mul(predict, target), dim=1) + self.epsilon
        den = torch.sum(predict.pow(self.p) + target.pow(self.p), dim=1) + self.epsilon
        loss = 1 - 2 * num / den

        return loss.mean()  # over batch

In [None]:
valid_transform = transforms.Compose([
    transforms.ToTensor()])

In [None]:
patch_size = 256
batch_size = 4

In [None]:
model_unet = torch.hub.load('mateuszbuda/brain-segmentation-pytorch', 'unet',
                       in_channels=2, out_channels=1, init_features=32, pretrained=False)
model_unet = torch.load("/model_unet.pkl")
model_unet.eval()
model_unet = model_unet.to(device)
criterion = BinaryDiceLoss()

In [None]:
result_image = generate_result_image_unet(patch_size=patch_size, 
                                         img=image_norm, lbl=mask_array, 
                                         transform=valid_transform, batch_size=batch_size, 
                                         model=model_unet, criterion=criterion)

__Метрики__

In [None]:
print('Метрики для всего изображения')
print()
print_metrics(mask_array, result_image)

In [None]:
print('Метрики для тестовой части')
print()
print_metrics(mask_array[bound_test:], result_image[bound_test:])

__Оригинальная и смоделированная маски__

In [None]:
fig, axis = plt.subplots(1, 2, figsize=(20, 30))
axis[0].imshow(mask_array, cmap='Greys_r')
axis[1].imshow(result_image, cmap='Greys_r')
axis[0].title.set_text('Original')
axis[1].title.set_text('Parody')
for a in axis:
    a.axis('off')

Фрагмент

In [None]:
start_x = 3500
step_x = 1000
start_y = 100
step_y = 4900

figure1 = mask_array[start_x:start_x + step_x, start_y:start_y + step_y]
figure2 = result_image[start_x:start_x + step_x, start_y:start_y + step_y]

fig, axis = plt.subplots(2, 1, figsize=(15, 7))
axis[0].imshow(figure1, cmap='Greys_r')
axis[1].imshow(figure2, cmap='Greys_r')
axis[0].title.set_text('Original')
axis[1].title.set_text('Parody')
for a in axis:
    a.axis('off') 

__Разница между оригинальной маской и смоделированной__

In [None]:
diff_mask = mask_array - result_image

fig = plt.figure(figsize=(10, 15))
plt.imshow(diff_mask, cmap='bwr')
plt.axis('off')
plt.show()

Белый - нет разницы

Красный - необнаруженный лес

Синий - лишний лес

__Экспорт результата__

In [None]:
out_raster = "/content/result_unet.tif"

# экспорт результата
createGeotiff(out_raster, result_image, geo_transform, projection)

## Random Forest

In [None]:
rf = joblib.load("/content/random_forest.joblib")

In [None]:
full_image_rf = np.reshape(image_norm, [image_norm.shape[0] * image_norm.shape[1], image_norm.shape[2]])
full_labels_rf = mask_array.flatten()

result_image = rf.predict(full_image_rf)
result_image = np.reshape(result_image, (image_norm.shape[0], image_norm.shape[1]))
result_image.shape

__Метрики__

In [None]:
print('Метрики для всего изображения')
print()
print_metrics(mask_array, result_image)

In [None]:
print('Метрики для тестовой части')
print()
print_metrics(mask_array[bound_test:], result_image[bound_test:])

__Оригинальная и смоделированная маски__

In [None]:
fig, axis = plt.subplots(1, 2, figsize=(20, 30))
axis[0].imshow(mask_array, cmap='Greys_r')
axis[1].imshow(result_image, cmap='Greys_r')
axis[0].title.set_text('Original')
axis[1].title.set_text('Parody')
for a in axis:
    a.axis('off')

Фрагмент

In [None]:
start_x = 3500
step_x = 1000
start_y = 100
step_y = 4900

figure1 = mask_array[start_x:start_x + step_x, start_y:start_y + step_y]
figure2 = result_image[start_x:start_x + step_x, start_y:start_y + step_y]

fig, axis = plt.subplots(2, 1, figsize=(15, 7))
axis[0].imshow(figure1, cmap='Greys_r')
axis[1].imshow(figure2, cmap='Greys_r')
axis[0].title.set_text('Original')
axis[1].title.set_text('Parody')
for a in axis:
    a.axis('off') 

__Разница между оригинальной маской и смоделированной__

In [None]:
diff_mask = mask_array - result_image

fig = plt.figure(figsize=(10, 15))
plt.imshow(diff_mask, cmap='bwr')
plt.axis('off')
plt.show()

Белый - нет разницы

Красный - необнаруженный лес

Синий - лишний лес

__Экспорт результата__

In [None]:
out_raster = "/content/result_rf.tif"

# экспорт результата
createGeotiff(out_raster, result_image, geo_transform, projection)