In [1]:
# команды для того, чтобы не баговался импорт функций и классов из других файлов
%load_ext autoreload
%autoreload

In [2]:
# основной импорт
import os
import torch

# импорт для работы с нейронными сетями (nn) и для классов оптимизаторов (Adam, SGD)
from torch import nn, optim

# импорт функций, которые не имеют обучаемых параметров relu, siftmax и др
import torch.nn.functional as F

# импорт уже готовых датасетов и трансформаций (нормализация, обрезка и тд.)
from torchvision import datasets, transforms, models

# Dataset - базовый класс для работы с датасетами, можно создать свои, наследуясь от него
# DataLoader - для создания лоадеров - разделенных на батчи данных
from torch.utils.data import DataLoader, Dataset

# стандартная библиотека для работы со случайными числами
import random

# стандартный импорт нампая
import numpy as np

# библиотека OpenCV для обработки изображений
import cv2

# готовая функция для вычисления метрики IoU
# from torchmetrics.functional.detection.iou import intersection_over_union
from torchmetrics.functional import jaccard_index

# импорты для визуализации и стилизации
import matplotlib.pyplot as plt
# import mplcyberpunk
# plt.style.use("cyberpunk")

# torchutils для проверки модели на размерности
import torchutils as tu

In [3]:
# фикс рандомности
torch.manual_seed(52)

<torch._C.Generator at 0x105f8e410>

## 1. Зададим архитектуру модели

In [4]:
# The argument n_class specifies the number of classes for the segmentation task.
class UNet(nn.Module):
    def __init__(self, n_class):
        super().__init__()
        
        # Encoder
        # -------
        # Каждый блок в encoder состоит из двух сверточных слоев,
        # за которыми следует max-pooling, за исключением последнего блока.
        # ------- 
        # Используем padding=1 как best practice и для сохранения размеров карт 
        # признаков после сверток, облегчения реализации skip-connection, 
        # убираем необходимость пост-обработки выходного изображения (размеров)
        # -------
        # input: 640x640x3
        self.e11 = nn.Conv2d(3, 64, kernel_size=3, padding=1) # output: 640x640x64
        self.e12 = nn.Conv2d(64, 64, kernel_size=3, padding=1) # output: 640x640x64
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2) # output: 320x320x64

        # input: 320x320x64
        self.e21 = nn.Conv2d(64, 128, kernel_size=3, padding=1) # output: 320x320x128
        self.e22 = nn.Conv2d(128, 128, kernel_size=3, padding=1) # output: 320x320x128
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2) # output: 160x160x128

        # input: 160x160x128
        self.e31 = nn.Conv2d(128, 256, kernel_size=3, padding=1) # output: 160x160x256
        self.e32 = nn.Conv2d(256, 256, kernel_size=3, padding=1) # output: 160x160x256
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2) # output: 80x80x256

        # input: 80x80x256
        self.e41 = nn.Conv2d(256, 512, kernel_size=3, padding=1) # output: 80x80x512
        self.e42 = nn.Conv2d(512, 512, kernel_size=3, padding=1) # output: 80x80x512
        self.pool4 = nn.MaxPool2d(kernel_size=2, stride=2) # output: 40x40x512

        # input: 40x40x512
        self.e51 = nn.Conv2d(512, 1024, kernel_size=3, padding=1) # output: 40x40x1024
        self.e52 = nn.Conv2d(1024, 1024, kernel_size=3, padding=1) # output: 40x40x1024


        # Decoder
        # -------
        # Повышает размерность обратно до исходного изображения
        # Каждый блок в декодировщике состоит из слоя апсемплинга,
        # конкатенации с соответствующей картой признаков из encoder,
        # и двух сверточных слоев
        # -------
        self.upconv1 = nn.ConvTranspose2d(1024, 512, kernel_size=2, stride=2) # 40x40 -> 80x80
        self.d11 = nn.Conv2d(1024, 512, kernel_size=3, padding=1) # Concatenated: 512 + 512 = 1024
        self.d12 = nn.Conv2d(512, 512, kernel_size=3, padding=1)

        self.upconv2 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2) # 80x80 -> 160x160
        self.d21 = nn.Conv2d(512, 256, kernel_size=3, padding=1) # Concatenated: 256 + 256 = 512
        self.d22 = nn.Conv2d(256, 256, kernel_size=3, padding=1)

        self.upconv3 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2) # 160x160 -> 320x320
        self.d31 = nn.Conv2d(256, 128, kernel_size=3, padding=1) # Concatenated: 128 + 128 = 256
        self.d32 = nn.Conv2d(128, 128, kernel_size=3, padding=1)

        self.upconv4 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2) # 320x320 -> 640x640
        self.d41 = nn.Conv2d(128, 64, kernel_size=3, padding=1) # Concatenated: 64 + 64 = 128
        self.d42 = nn.Conv2d(64, 64, kernel_size=3, padding=1)

        # Output layer
        self.outconv = nn.Conv2d(64, n_class, kernel_size=1) # 640x640xn_class

    def forward(self, x):
        # Encoder
        xe11 = F.relu(self.e11(x)) # 640x640x64
        xe12 = F.relu(self.e12(xe11)) # 640x640x64
        xp1 = self.pool1(xe12) # 320x320x64

        xe21 = F.relu(self.e21(xp1)) # 320x320x128
        xe22 = F.relu(self.e22(xe21)) # 320x320x128
        xp2 = self.pool2(xe22)  # 160x160x128

        xe31 = F.relu(self.e31(xp2)) # 160x160x256
        xe32 = F.relu(self.e32(xe31)) # 160x160x256
        xp3 = self.pool3(xe32) # 80x80x256

        xe41 = F.relu(self.e41(xp3)) # 80x80x512
        xe42 = F.relu(self.e42(xe41)) # 80x80x512
        xp4 = self.pool4(xe42) # 40x40x512

        xe51 = F.relu(self.e51(xp4)) # 40x40x1024
        xe52 = F.relu(self.e52(xe51)) # 40x40x1024
        
        # Decoder
        xu1 = self.upconv1(xe52) # 80x80x512
        xu11 = torch.cat([xu1, xe42], dim=1) # 80x80x1024
        xd11 = F.relu(self.d11(xu11)) # 80x80x512
        xd12 = F.relu(self.d12(xd11)) # 80x80x512

        xu2 = self.upconv2(xd12) # 160x160x256
        xu22 = torch.cat([xu2, xe32], dim=1) # 160x160x512
        xd21 = F.relu(self.d21(xu22)) # 160x160x256
        xd22 = F.relu(self.d22(xd21)) # 160x160x256

        xu3 = self.upconv3(xd22) # 320x320x128
        xu33 = torch.cat([xu3, xe22], dim=1) # 320x320x256
        xd31 = F.relu(self.d31(xu33)) # 320x320x128
        xd32 = F.relu(self.d32(xd31)) # 320x320x128

        xu4 = self.upconv4(xd32) # 640x640x64
        xu44 = torch.cat([xu4, xe12], dim=1) # 640x640x128
        xd41 = F.relu(self.d41(xu44)) # 640x640x64
        xd42 = F.relu(self.d42(xd41)) # 640x640x64

        # Output layer
        out = self.outconv(xd42) # 640x640xn_class

        return out

### 1.1 Зададим фейковые данные для проверки корректности размерностей

In [5]:
# различные параметры для создания фейковых батчей данных
# стандартное определение девайса
# DEVICE = torch.device(
#     'cuda' if torch.cuda.is_available() else
#     'mps' if torch.backends.mps.is_available() else
#     'cpu'
# )
DEVICE = 'cpu'

# батчсайз определен 8 - больше не пролезало для теста на компуктере
BATCH_SIZE = 4

# каналов = 3 потому что цветное изображение
CHANNELS = 3

# высота изображения 640 - надо ресайзить к этому
HEIGHT = 640

# ширина изображения 640 - надо ресайзить к этому
WIDTH = 640

# количество классов в датасете, т.к. лес рисуется белым, а не лес - черным 
# всего два класса, лес и не лес
NUM_CLASSES = 1

# создание фейковых наборов данных
fake_pic = torch.randn(BATCH_SIZE, CHANNELS, HEIGHT, WIDTH, device=DEVICE)
# fake_label = torch.randint(0, NUM_CLASSES, (BATCH_SIZE,), device=DEVICE)
# fake_bbox = torch.rand(BATCH_SIZE, 4, device=DEVICE)

# # создание общего фейкового батча
# fake_batch = (fake_pic, fake_label, fake_bbox)

# # примечание 
# # попробуем пока что передавать просто fake_pic, т.к. у нас модель принимает
# # в функции forward только картинку (один тип элементов)

### 1.2 Проверим модель на корректность размерностей на всех этапах

In [6]:
# создадим экземпляр класса модели
model = UNet(n_class=NUM_CLASSES)
# и отправим ее сразу на девайс
model.to(DEVICE)

UNet(
  (e11): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (e12): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (pool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (e21): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (e22): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (pool2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (e31): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (e32): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (pool3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (e41): Conv2d(256, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (e42): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (pool4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (e51): Conv2d(512, 1024, kernel_size=(

In [7]:
# отправим в модель фейковую пачку данных и посмотрим результат
tu.get_model_summary(model, fake_pic)

Layer              Kernel               Output          Params               FLOPs
0_e11             [3, 64, 3, 3]    [4, 64, 640, 640]       1,792     2,936,012,800
1_e12            [64, 64, 3, 3]    [4, 64, 640, 640]      36,928    60,502,835,200
2_pool1                       -    [4, 64, 320, 320]           0                 0
3_e21           [64, 128, 3, 3]   [4, 128, 320, 320]      73,856    30,251,417,600
4_e22          [128, 128, 3, 3]   [4, 128, 320, 320]     147,584    60,450,406,400
5_pool2                       -   [4, 128, 160, 160]           0                 0
6_e31          [128, 256, 3, 3]   [4, 256, 160, 160]     295,168    30,225,203,200
7_e32          [256, 256, 3, 3]   [4, 256, 160, 160]     590,080    60,424,192,000
8_pool3                       -     [4, 256, 80, 80]           0                 0
9_e41          [256, 512, 3, 3]     [4, 512, 80, 80]   1,180,160    30,212,096,000
10_e42         [512, 512, 3, 3]     [4, 512, 80, 80]   2,359,808    60,411,084,800
11_p

## 2. Зададим оптимизатор и определим loss-функцию

In [8]:
# так как бинарная классификация, используем бинарную кросс-энтропию
# используем на логитах, потому что в архитектуре на выходном слое сети 
# отсутствует функция активации sigmoid, а обычный BCELoss() ожидает на вход
# диапазон чисел от 0 до 1. 
# Если брать BCEWithLogitsLoss() то там уже под капотом применяется сигмоида
criterion = nn.BCEWithLogitsLoss()

# оптимизатор - используем мессию
optimizer = optim.Adam(model.parameters(), lr=0.001)

## 3. Получим и подготовим данные для обучения модели

In [9]:
# подгрузим с помощью апи кагла

dataset_dir = '../data'
zip_file = '../data/augmented-forest-segmentation.zip'
train_img_dir = '../data/unet-for-train/images/train'
valid_img_dir = '../data/unet-for-train/images/valid'
train_mask_dir = '../data/unet-for-train/masks/train'
valid_mask_dir = '../data/unet-for-train/masks/valid'

if not os.path.exists(dataset_dir):
    # загрузим датасет с Kaggle
    !kaggle datasets download -d quadeer15sh/augmented-forest-segmentation -p {dataset_dir}/

    # распакуем zip-файл с помощью команды unzip
    !unzip -qq {zip_file} -d {dataset_dir}/

    # удалим zip-файл после распаковки
    !rm {zip_file}

    print(f'Датасет успешно скачан и распакован в папку {dataset_dir}')
else:
    print(f'Директория {dataset_dir} уже существует. Пропуск скачивания.')

Директория ../data уже существует. Пропуск скачивания.


In [10]:
# добавим необходимые импорты для работы с данными
import pandas as pd
from sklearn.model_selection import train_test_split
from PIL import Image

In [11]:
# Определяем директорию с данными
data_dir = '../data'

# Путь к CSV-файлу
csv_file = os.path.join(data_dir, 'meta_data.csv')

# Читаем CSV-файл в DataFrame
df = pd.read_csv(csv_file)

# Выводим первые несколько строк DataFrame
df.head()

Unnamed: 0,image,mask
0,10452_sat_08.jpg,10452_mask_08.jpg
1,10452_sat_18.jpg,10452_mask_18.jpg
2,111335_sat_00.jpg,111335_mask_00.jpg
3,111335_sat_01.jpg,111335_mask_01.jpg
4,111335_sat_02.jpg,111335_mask_02.jpg


In [12]:
# Разделяем DataFrame на обучающую и валидационную выборки (80% и 20% соответственно)
train_df, val_df = train_test_split(df, test_size=0.2, random_state=42)

# Сбрасываем индексы в DataFrame
train_df = train_df.reset_index(drop=True)
val_df = val_df.reset_index(drop=True)

# Выводим количество примеров в каждой выборке
print(f'Обучающая выборка: {len(train_df)} изображений с масками')
print(f'Валидационная выборка: {len(val_df)} изображений с масками')

Обучающая выборка: 4086 изображений с масками
Валидационная выборка: 1022 изображений с масками


### 3.1 Определение класса Dataset для загрузки изображений и масок

In [13]:
class SegmentationDataset(Dataset):
    def __init__(self, df, images_dir, masks_dir):
        self.df = df
        self.images_dir = images_dir
        self.masks_dir = masks_dir

        self.image_transform = transforms.Compose([
            transforms.Resize((640, 640)),
            transforms.ToTensor()  # Масштабирует значения в [0.0, 1.0]
        ])

        self.mask_transform = transforms.Compose([
            transforms.Resize((640, 640)),
            transforms.ToTensor()  # Масштабирует значения в [0.0, 1.0]
        ])

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        img_name = self.df.loc[idx, 'image']
        mask_name = self.df.loc[idx, 'mask']
        img_path = os.path.join(self.images_dir, img_name)
        mask_path = os.path.join(self.masks_dir, mask_name)

        image = Image.open(img_path).convert('RGB')
        mask = Image.open(mask_path).convert('L')

        image = self.image_transform(image)
        mask = self.mask_transform(mask)

        # Преобразуем маску в бинарный формат
        mask = (mask > 0).float()

        return image, mask

### 3.2 Создание объектов DataSet

In [14]:
images_dir = os.path.join(data_dir, 'Forest_Segmented/Forest_Segmented/images')
masks_dir = os.path.join(data_dir, 'Forest_Segmented/Forest_Segmented/masks')

train_dataset = SegmentationDataset(train_df, images_dir, masks_dir)
val_dataset = SegmentationDataset(val_df, images_dir, masks_dir)

### 3.3 Создание объектов DataLoader

In [15]:
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
valid_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=True)

## 4. Напишем цикл обучения модели

In [16]:
def fit(
    model: nn.Module,
    n_epochs: int,
    optimizer: torch.optim.Optimizer,
    train_loader: DataLoader,
    valid_loader: DataLoader,
    log=None
) -> dict:
    
    # если не приняли с вызовом функции нужный словарь, то инициализируем его
    if log is None:
        log = dict()
        log['epoch_train_loss'] = []
        log['epoch_valid_loss'] = []
        log['epoch_train_iou'] = []
        log['epoch_valid_iou'] = []
    
    # определяем текущую эпоху обучения
    start_epoch = len(log['epoch_train_loss'])
    # и запускаем цикл обучения
    for epoch in range(start_epoch+1, start_epoch+n_epochs+1):
        print(f'{"-"*47} Epoch {epoch} {"-"*47}')
        
        batch_loss = []
        batch_iou = []
        
        model.train()
        
        for images, masks in train_loader:
            images = images.to(DEVICE)
            masks = masks.to(DEVICE)
            
            logits = model(images)
            
            loss = criterion(logits, masks)
            
            # Применяем sigmoid и бинаризуем прогнозы
            preds = torch.sigmoid(logits)
            preds = (preds > 0.5).float()
            iou = jaccard_index(preds, masks,task='binary')
            
            batch_loss.append(loss.item())
            batch_iou.append(iou.item())
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
        log['epoch_train_loss'].append(np.mean(batch_loss))
        log['epoch_train_iou'].append(np.mean(batch_iou))
        
        batch_loss = []
        batch_iou = []
        
        model.eval()
        
        for images, masks in valid_loader:
            images = images.to(DEVICE)
            masks = masks.to(DEVICE)
            
            with torch.inference_mode():
                logits = model(images)
            
            loss = criterion(logits, masks)
            
            # Применяем sigmoid и бинаризуем прогнозы
            preds = torch.sigmoid(logits)
            preds = (preds > 0.5).float()
            iou = jaccard_index(preds, masks,task='binary')
           
            batch_loss.append(loss.item())
            batch_iou.append(iou.item())
        
        log['epoch_valid_loss'].append(np.mean(batch_loss))
        log['epoch_valid_iou'].append(np.mean(batch_iou))
        
        print(f"Train stage: loss: {log['epoch_train_loss'][-1]:.3f}, iou: {log['epoch_train_iou'][-1]:.3f}")
        print(f"Valid stage: loss: {log['epoch_valid_loss'][-1]:.3f}, iou: {log['epoch_valid_iou'][-1]:.3f}")
        
        # Отображение результатов после каждой эпохи
        # Получаем один батч из валидационного загрузчика
        images, masks = next(iter(valid_loader))
        images = images.to(DEVICE)
        masks = masks.to(DEVICE)
        
        with torch.inference_mode():
            logits = model(images)
            preds = torch.sigmoid(logits)
            preds = (preds > 0.5).float()
        
        # Случайный индекс картинки в батче
        random_img_index = np.random.randint(0, images.size(0))
        
        # Подготовка данных для отображения
        img = images[random_img_index].cpu().permute(1, 2, 0).numpy()
        img = (img * 255).astype(np.uint8)
        img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
        
        pred_mask = preds[random_img_index][0].cpu().numpy()
        true_mask = masks[random_img_index][0].cpu().numpy()
        
        # Отображение изображений
        fig, axs = plt.subplots(1, 3, figsize=(15, 5))
        axs[0].imshow(img)
        axs[0].set_title('Original Image')
        axs[0].axis('off')
        
        axs[1].imshow(pred_mask, cmap='gray')
        axs[1].set_title('Predicted Mask')
        axs[1].axis('off')
        
        axs[2].imshow(true_mask, cmap='gray')
        axs[2].set_title('True Mask')
        axs[2].axis('off')
        
        plt.show()
        
    return log

In [17]:
first_log = fit(
    model=model,
    n_epochs=1,
    optimizer=optimizer,
    train_loader=train_loader,
    valid_loader=valid_loader
)

----------------------------------------------- Epoch 1 -----------------------------------------------
