## Установка пакетов и импорт библиотек

In [None]:
!pip install torchmetrics[detection]

Collecting torchmetrics[detection]
  Downloading torchmetrics-1.2.0-py3-none-any.whl (805 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m805.2/805.2 kB[0m [31m7.0 MB/s[0m eta [36m0:00:00[0m
Collecting lightning-utilities>=0.8.0 (from torchmetrics[detection])
  Downloading lightning_utilities-0.9.0-py3-none-any.whl (23 kB)
Installing collected packages: lightning-utilities, torchmetrics
Successfully installed lightning-utilities-0.9.0 torchmetrics-1.2.0


In [None]:
from torchmetrics.detection import MeanAveragePrecision
import os
import pandas as pd
import numpy as np
from matplotlib import pyplot as plt
from pycocotools.coco import COCO
import time
from tqdm import tqdm
from PIL import Image
import warnings
warnings.filterwarnings("ignore")

import torch
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
from torch.optim import lr_scheduler
from torchvision import datasets, models
import torchvision.transforms.v2 as transforms

from torchvision.utils import draw_bounding_boxes
from torchvision.io import read_image, ImageReadMode

from torchvision.models.detection.ssd import SSDClassificationHead
from torchvision.models.detection import _utils
from torchvision.models.detection import SSD300_VGG16_Weights
from torch import nn

plt.ion()

<contextlib.ExitStack at 0x7d07b81bdea0>

In [None]:
# Pick the compute backend: CUDA if present, otherwise Apple MPS, otherwise CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(f"Using {device} device")

Using cpu device


## Глобальные переменные и пути к данным

In [None]:
# Fraction of the dataset that goes into the training split; the rest is used for validation.
TRAIN_SIZE = 0.75
# Number of images per batch for both data loaders.
BATCH_SIZE = 8

# Directory with the training images (the COCO annotation file lives in its "annotations" subfolder).
dataDir = 'drive/MyDrive/Colab Notebooks/samolet/data/train/images/'
# Directory where the best checkpoint is written and later read back from.
modelDir = 'drive/MyDrive/Colab Notebooks/samolet/data/train/model/'
dataType = 'default'
annFile = f'{dataDir}annotations/instances_{dataType}.json'
# NOTE(review): ann_path spells out the same file as annFile; only annFile is used in the visible cells.
ann_path = 'drive/MyDrive/Colab Notebooks/samolet/data/train/images/annotations/instances_default.json'

In [None]:
# Mount Google Drive under /content/drive; the 'drive/MyDrive/...' paths in this notebook
# presumably resolve relative to /content — confirm the working directory when running elsewhere.
# NOTE(review): Colab-only import, kept outside the main import cell.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Загрузка аннотаций из Coco датасета

In [None]:
# Parse the COCO annotation file and print the category names it declares.
coco = COCO(annFile)
cats = coco.loadCats(coco.getCatIds())
print(f"COCO categories: \n{' '.join(cat['name'] for cat in cats)}\n")

# Distinct supercategories of the declared categories (not used by the later cells shown here).
nms = {cat['supercategory'] for cat in cats}

loading annotations into memory...
Done (t=4.63s)
creating index...
index created!
COCO categories: 
window empty filled



## Трансформации и аугментации



In [None]:
# Image preprocessing/augmentation pipeline handed to BuildingsDataset.
# Steps: random horizontal flip (p=0.5) -> tensor conversion -> mild random perspective
# warp (p=0.5) -> per-channel normalization with the commonly used ImageNet mean/std.
#
# NOTE(review): BuildingsDataset calls this pipeline on the image only
# (`img = self.transforms(img)`), so the flip and the perspective warp move pixels
# while the bounding boxes keep their original coordinates. Either drop the geometric
# augmentations or pass image and boxes through the v2 transforms together.
# NOTE(review): the same pipeline is attached to the dataset that is later split into
# train and validation, so validation images are randomly augmented as well.
# NOTE(review): torchvision detection models presumably normalize their inputs
# internally too — confirm that normalizing here as well is intended.
transformer = transforms.Compose([
    transforms.RandomHorizontalFlip(0.5),
    transforms.ToTensor(),
    #transforms.Resize((320,320)),
    transforms.RandomPerspective(distortion_scale=0.1, p=0.5),
    #transforms.RandomPerspective(distortion_scale=0.25, p=0.5),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    #transforms.RandomPhotometricDistort()
    ])

## Класс датасета

In [None]:
class BuildingsDataset(torch.utils.data.Dataset):
    """Detection dataset backed by a pycocotools ``COCO`` index.

    Each item is ``(image, target)`` where ``target`` is a dict with
    ``"boxes"`` (float32 tensor of shape ``(N, 4)`` in ``[xmin, ymin, xmax, ymax]``
    order) and ``"labels"`` (int64 tensor of shape ``(N,)``).

    Args:
        root: directory that contains the image files named in the annotations.
        coco: loaded ``COCO`` object providing ``imgs``, ``getAnnIds``,
            ``loadAnns`` and ``loadImgs``.
        transform: optional callable applied to the image only.
    """

    def __init__(self, root, coco, transform=None):
        self.root = root
        self.transforms = transform
        self.coco = coco
        # Sorted image ids give a stable index -> image mapping.
        self.ids = list(sorted(self.coco.imgs.keys()))

    @staticmethod
    def _boxes_xyxy(annotations):
        """Convert COCO ``[xmin, ymin, width, height]`` boxes to an ``(N, 4)`` xyxy tensor.

        The explicit reshape keeps the ``(N, 4)`` shape even when ``annotations``
        is empty; without it an image with no objects would produce a tensor of
        shape ``(0,)``, which detection models reject.
        """
        boxes = []
        for ann in annotations:
            bbox = ann['bbox']
            boxes.append([bbox[0], bbox[1], bbox[0] + bbox[2], bbox[1] + bbox[3]])
        return torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)

    def __getitem__(self, index):
        """Load the image at ``index`` together with its boxes and labels."""
        img_id = self.ids[index]
        ann_ids = self.coco.getAnnIds(imgIds=img_id)
        coco_annotation = self.coco.loadAnns(ann_ids)

        path = self.coco.loadImgs(img_id)[0]["file_name"]
        # Force three channels (grayscale/RGBA files would otherwise break the
        # 3-channel Normalize step) and close the file handle right away.
        with Image.open(os.path.join(self.root, path)) as raw:
            img = raw.convert("RGB")

        # NOTE(review): category ids are shifted down by one. torchvision detection
        # models presumably reserve label 0 for background, so the category with
        # id 1 would be trained as background — confirm this is intended.
        labels = torch.as_tensor(
            [ann["category_id"] for ann in coco_annotation], dtype=torch.int64
        ) - 1

        new_annotation = {
            "boxes": self._boxes_xyxy(coco_annotation),
            "labels": labels,
        }

        # NOTE(review): only the image is transformed; geometric augmentations in
        # `transform` therefore leave the boxes out of sync with the pixels.
        if self.transforms is not None:
            img = self.transforms(img)

        return img, new_annotation

    def __len__(self):
        return len(self.ids)

# Full annotated dataset; the train/validation split is taken from this single object,
# so both subsets share the same `transformer`.
imgs = BuildingsDataset(root=dataDir, coco=coco, transform=transformer)

## Загрузчики

In [None]:
# Reproducible train/validation split: a seeded generator makes the partition
# identical across runs.
train_size = int(TRAIN_SIZE * len(imgs))
valid_size = len(imgs) - train_size
generator = torch.Generator().manual_seed(2109)
train_dataset, valid_dataset = torch.utils.data.random_split(
    imgs, [train_size, valid_size], generator=generator
)
dataset_list = {'train': train_dataset, 'val': valid_dataset}
dataset_sizes = {'train': train_size, 'val': valid_size}


def collate_detection_batch(batch):
    """Turn a list of ``(image, target)`` pairs into ``(images, targets)`` tuples.

    Images are kept as separate items instead of being stacked, since the
    dataset applies no resize and per-image box counts differ. A named function
    (unlike a lambda) stays picklable if ``num_workers`` is raised later.
    """
    return tuple(zip(*batch))


# Only the training loader is shuffled; validation runs in a fixed order so
# evaluation is repeatable.
dataloaders = {
    split: torch.utils.data.DataLoader(
        dataset_list[split],
        batch_size=BATCH_SIZE,
        shuffle=(split == 'train'),
        num_workers=0,
        collate_fn=collate_detection_batch,
    )
    for split in ['train', 'val']
}


## Загрузка и кастомизация модели
**Подходят любые модели, которые понимают вот такой формат данных на входе:**

```
        boxes (FloatTensor[N, 4]): the ground-truth boxes in [x1, y1, x2, y2] format, with 0 <= x1 < x2 <= W and 0 <= y1 < y2 <= H.

        labels (Int64Tensor[N]): the class label for each ground-truth box

```

**Faster-RCNN**

In [None]:
#from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
#def create_model(num_classes):
#    model = models.detection.fasterrcnn_resnet50_fpn(weights=None, weights_backbone=None)

#    model.roi_heads.box_predictor.cls_score = nn.Linear(1024,num_classes)

#    return model

**SSD300-VGG16**

In [None]:
#def create_model(num_classes=91, size=300):
#    model = models.get_model("ssd300_vgg16", weights=SSD300_VGG16_Weights.COCO_V1)
#    in_channels = _utils.retrieve_out_channels(model.backbone, (size, size))
#    num_anchors = model.anchor_generator.num_anchors_per_location()
#    model.head.classification_head = SSDClassificationHead(
#        in_channels=in_channels,
#        num_anchors=num_anchors,
#        num_classes=num_classes,
#    )
    # Image size for transforms.
#    model.transform.min_size = (size,)
#    model.transform.max_size = size
#    return model

#model = create_model(3, 300)
#print(model)

**SSD-Mobilenet**

In [None]:
# SSDlite320 detector with a MobileNetV3-Large backbone and a 3-class prediction head.
# No detection weights are requested; the recorded cell output shows only the
# mobilenet_v3_large backbone checkpoint being downloaded.
# NOTE(review): torchvision presumably counts background inside num_classes — with the
# three categories printed earlier (window, empty, filled) confirm that 3 is the intended value.
model = models.detection.ssdlite320_mobilenet_v3_large(num_classes=3)

Downloading: "https://download.pytorch.org/models/mobilenet_v3_large-8738ca79.pth" to /root/.cache/torch/hub/checkpoints/mobilenet_v3_large-8738ca79.pth
100%|██████████| 21.1M/21.1M [00:00<00:00, 51.6MB/s]


In [None]:
#inp = read_image(dataDir+'0000000154building.jpg').to(torch.float32)
#predictions = model.eval()([inp, ])

## Вспомогательные классы для осреднения результата и сохранения лучшей модели

In [None]:
class Averager:
    """Running arithmetic mean of the values passed to ``send`` since the last ``reset``."""

    def __init__(self):
        self.reset()

    def send(self, value):
        """Add one observation to the running total."""
        self.current_total += value
        self.iterations += 1

    @property
    def value(self):
        """Mean of the observations so far; ``0`` when nothing has been sent yet."""
        if self.iterations != 0:
            return 1.0 * self.current_total / self.iterations
        return 0

    def reset(self):
        """Forget all observations."""
        self.current_total = 0.0
        self.iterations = 0.0

class SaveBestModel:
    """Checkpoint the model whenever the validation loss improves.

    Keeps track of the lowest validation loss seen so far. When called with a
    strictly lower loss, it records the new best value and writes the model's
    ``state_dict`` to ``mobile_net.pth`` inside the module-level ``modelDir``.

    Args:
        best_valid_loss: initial best loss; the default of infinity makes the
            first finite loss count as an improvement.
    """

    def __init__(
        self, best_valid_loss=float('inf')
    ):
        self.best_valid_loss = best_valid_loss

    def __call__(
        self, current_valid_loss,
        epoch, model, optimizer
    ):
        """Save ``model`` if ``current_valid_loss`` beats the best loss so far.

        Args:
            current_valid_loss: validation loss of the epoch that just finished.
            epoch: zero-based epoch index (printed as ``epoch + 1``).
            model: module whose ``state_dict`` is written on improvement.
            optimizer: accepted for call-site compatibility; not used.
        """
        if current_valid_loss < self.best_valid_loss:
            self.best_valid_loss = current_valid_loss
            print(f"\nBest validation loss: {self.best_valid_loss}")
            print(f"\nSaving best model for epoch: {epoch+1}\n")
            # Create the target directory up front so a missing folder does not
            # throw away a finished epoch at save time.
            os.makedirs(modelDir or '.', exist_ok=True)
            torch.save(model.state_dict(), os.path.join(modelDir, 'mobile_net.pth'))

## Функции для обучения и валидации

In [None]:
def train(train_data_loader, model):
  """Run one optimisation pass over ``train_data_loader``.

  Relies on module-level state: ``optimizer``, ``device``, ``train_loss_hist``,
  ``train_itr`` and ``train_loss_list``. Every batch loss is appended to
  ``train_loss_list`` and sent to ``train_loss_hist``; ``train_itr`` is
  incremented once per batch.

  Returns:
    The global ``train_loss_list`` (losses of all iterations so far).
  """
  global train_itr
  global train_loss_list

  model.train()
  print('Training')

  # tqdm progress bar over the batches
  prog_bar = tqdm(train_data_loader, total=len(train_data_loader))

  for batch in prog_bar:
    optimizer.zero_grad()
    images, targets = batch

    images = [image.to(device) for image in images]
    targets = [
        {key: tensor.to(device) for key, tensor in target.items()}
        for target in targets
    ]

    # In train mode the model returns a dict of loss components; optimise their sum.
    loss_dict = model(images, targets)
    losses = sum(loss_dict.values())
    loss_value = losses.item()

    train_loss_list.append(loss_value)
    train_loss_hist.send(loss_value)

    losses.backward()
    optimizer.step()
    train_itr += 1

    # show the current batch loss next to the progress bar
    prog_bar.set_description(desc=f"Loss: {loss_value:.4f}")
  return train_loss_list

In [None]:
def _train_mode_with_frozen_norm(model):
  """Put ``model`` in train mode but keep batch-norm and dropout layers in eval mode.

  The loss dict is only obtained from the training-mode forward pass, yet
  batch-norm layers in train mode update their running statistics even under
  ``torch.no_grad()``; freezing them keeps validation data out of the model state.
  """
  model.train()
  frozen_types = (
      torch.nn.modules.batchnorm._BatchNorm,
      torch.nn.modules.dropout._DropoutNd,
  )
  for module in model.modules():
    if isinstance(module, frozen_types):
      module.eval()


def validate(valid_data_loader, model):
  """Compute validation losses and mAP@50 over ``valid_data_loader``.

  Relies on module-level state: ``device``, ``val_loss_hist``, ``val_itr`` and
  ``val_loss_list``. Every batch loss is appended to ``val_loss_list`` and sent
  to ``val_loss_hist``; ``val_itr`` is incremented once per batch. Predictions
  of all batches are accumulated in a single metric object, and mAP@50 for the
  whole pass is printed once at the end. The model is left in eval mode after
  each batch.

  Returns:
    The global ``val_loss_list`` (losses of all iterations so far).
  """
  print('Validating')
  global val_itr
  global val_loss_list

  # One metric for the whole pass so mAP reflects the full validation set,
  # not a single batch.
  metric = MeanAveragePrecision(iou_type="bbox")
  saw_batches = False

  # initialize tqdm progress bar
  prog_bar = tqdm(valid_data_loader, total=len(valid_data_loader))

  for images, targets in prog_bar:
    saw_batches = True
    images = [image.to(device) for image in images]
    targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

    with torch.no_grad():
      # Training-mode forward yields the losses ...
      _train_mode_with_frozen_norm(model)
      loss_dict = model(images, targets)
      # ... eval-mode forward yields the detections (targets are not needed).
      model.eval()
      output = model(images)

    # Accumulate on CPU so stored predictions do not pile up in accelerator memory.
    metric.update(
        [{k: v.detach().cpu() for k, v in pred.items()} for pred in output],
        [{k: v.detach().cpu() for k, v in t.items()} for t in targets],
    )

    losses = sum(loss_dict.values())
    loss_value = losses.item()
    val_loss_list.append(loss_value)
    val_loss_hist.send(loss_value)
    val_itr += 1
    # update the loss value beside the progress bar for each iteration
    prog_bar.set_description(desc=f"Loss: {loss_value:.4f}")

  if saw_batches:
    map_50 = metric.compute()['map_50'].item()
    print(f"Validation mAP50: {map_50:.4f}")
  return val_loss_list

## Обучение

In [None]:
# Training driver. The names defined here (optimizer, train_loss_hist, val_loss_hist,
# train_itr, val_itr, train_loss_list, val_loss_list) are read as globals by
# train() and validate(), so they must exist before the loop starts.
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9, weight_decay=0.0005)
#exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)
model = model.to(device)
# per-epoch running means of the batch losses (reset at the start of every epoch)
train_loss_hist = Averager()
val_loss_hist = Averager()
#map_hist = Averager()
# global iteration counters, incremented once per batch
train_itr = 1
val_itr = 1
# per-iteration loss values, accumulated across all epochs
# (kept until the end so they can be plotted afterwards)
train_loss_list = []
val_loss_list = []
#map_list = []
# writes a checkpoint whenever the epoch's mean validation loss improves
save_best_model = SaveBestModel()
# number of passes over the training set
NUM_EPOCHS = 50
# start the training epochs
for epoch in range(NUM_EPOCHS):
  print(f"\nEPOCH {epoch+1} of {NUM_EPOCHS}")
  # reset the training and validation loss histories for the current epoch
  train_loss_hist.reset()
  val_loss_hist.reset()
  # start timer and carry out training and validation
  start = time.time()
  # the returned lists are the same objects as the global loss lists above
  train_loss = train(dataloaders['train'], model)
  val_loss = validate(dataloaders['val'], model)
  print(f"Epoch #{epoch+1} train loss: {train_loss_hist.value:.3f}")
  print(f"Epoch #{epoch+1} validation loss: {val_loss_hist.value:.3f}")
  end = time.time()
  print(f"Took {((end - start) / 60):.3f} minutes for epoch {epoch+1}")
  # checkpoint the model if this epoch has the lowest mean validation
  # loss seen so far
  save_best_model(
            val_loss_hist.value, epoch, model, optimizer
        )

## Загрузка весов лучшей из моделей

In [None]:
# Reload the best checkpoint saved during training.
# NOTE: trained_model is an alias of `model`, not a copy, so loading the state dict
# overwrites the weights of `model` as well.
# map_location makes the checkpoint tensors load onto the CPU when the file is read.
best_model_params_path = os.path.join(modelDir, 'mobile_net.pth')
trained_model = model
trained_model.load_state_dict(torch.load(best_model_params_path, map_location=torch.device('cpu')))

<All keys matched successfully>

## Функция для рисования картинок и границ объектов

In [None]:
def draw(path, model, score_threshold=0.0):
  """Run the detector on one image file and show it with the predicted boxes.

  The image is preprocessed the same way as the training data (scaled to
  [0, 1] and normalized with the mean/std used in the training `transformer`),
  so the model sees inputs from the distribution it was trained on. Boxes are
  drawn on the original, unmodified pixels.

  Relies on the module-level ``device``.

  Args:
    path: path to the image file.
    model: detection model returning a list of dicts with ``"boxes"`` and
      ``"scores"`` in eval mode.
    score_threshold: only detections with a score at or above this value are
      drawn; the default of 0.0 draws everything the model returns.
  """
  # Always decode to 3-channel uint8 so both the normalization and the drawing
  # helper get the layout they expect.
  image = read_image(path, mode=ImageReadMode.RGB)

  # Keep these constants in sync with the Normalize step of the training pipeline.
  mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1)
  std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1)
  x = ((image.to(torch.float32) / 255.0 - mean) / std).to(device)

  model.eval()
  with torch.no_grad():
    pred = model([x, ])[0]

  #pred_score = [f"value: {score:.3f}" for score in pred["scores"]]
  #pred_label = [f"value: {label}" for label in pred["labels"]]
  keep = pred["scores"] >= score_threshold
  pred_boxes = pred["boxes"][keep].long()

  if pred_boxes.numel() == 0:
    # Nothing to draw; show the plain image.
    output_image = image
  else:
    output_image = draw_bounding_boxes(image, pred_boxes, fill=True, width=3)

  plt.figure(figsize=(12, 12))
  plt.imshow(output_image.permute(1, 2, 0))
  plt.show()

## Проверка на тестовых изображениях

In [None]:
testDir = 'drive/MyDrive/Colab Notebooks/samolet/data/test/'
# every entry of the test directory, in the order os.listdir reports them
tests = [filename for filename in os.listdir(testDir)]

# Show predictions for three randomly drawn entries (indices are sampled
# with replacement, so the same file can appear more than once).
for idx in np.random.randint(0, len(tests), 3):
  draw(testDir + tests[idx], trained_model)