In [None]:
import torch 
from torch import optim, nn
from torch.utils.data import DataLoader, random_split
from tqdm import tqdm
from tensorflow import keras
from maskrcnn_dataset import MaskRCNNDataset
import torchvision
import numpy as np
from torchvision.models.detection import MaskRCNN
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor
import torchvision.transforms as T
from PIL import Image
import os
from torchvision import transforms
from engine import train_one_epoch, evaluate
import utils

%matplotlib inline
import matplotlib.pyplot as plt

import cv2
import random
import warnings
warnings.filterwarnings('ignore')

## Создание модели

In [2]:
# Two target classes: background and rats.
num_classes = 2

# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

def build_model(num_classes):
    """Build a Mask R-CNN (ResNet-50 FPN) with heads resized to `num_classes`.

    The network is created with the default pre-trained weights, then both
    the box classifier and the mask predictor are replaced by freshly
    initialised heads that output `num_classes` classes.

    Args:
        num_classes: number of output classes, background included.

    Returns:
        The modified torchvision Mask R-CNN model (not yet moved to a device).
    """
    # `weights="DEFAULT"` is the current spelling of the deprecated
    # `pretrained=True` flag and loads the same pre-trained checkpoint.
    model = torchvision.models.detection.maskrcnn_resnet50_fpn(weights="DEFAULT")

    # Swap the box-classification head, keeping its input feature size.
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

    # (Fine-tuning a plain Faster R-CNN would stop here.)

    # Swap the mask head, keeping the channel count it receives.
    in_features_mask = model.roi_heads.mask_predictor.conv5_mask.in_channels
    hidden_layer = 256
    model.roi_heads.mask_predictor = MaskRCNNPredictor(in_features_mask,
                                                       hidden_layer,
                                                       num_classes)

    return model

# Instantiate the network and place it on the selected device in one step.
model = build_model(num_classes).to(device)

## Функции для предсказаний

In [3]:
def make_prediction(model, img):
    """Run the model on a single image and return the first binary mask.

    The callers in this notebook switch the model to eval mode before calling
    this function; it does not change the model's mode itself.

    Args:
        model: detection model called as `model([tensor])` and returning a
            list of dicts that contain a 'masks' entry.
        img: image accepted by `torchvision.transforms.ToTensor`
            (PIL image or HxWxC numpy array).

    Returns:
        A boolean numpy array of shape (1, H, W): the first predicted mask
        thresholded at 0.5, or an all-False array of the input's spatial size
        when the model returns no masks.
    """
    transform = T.Compose([T.ToTensor()])
    img = transform(img).to(device)

    # Inference only: do not build an autograd graph for every frame.
    with torch.no_grad():
        pred = model([img])

    masks = pred[0]['masks']  # assumed (N, 1, H, W) per torchvision's Mask R-CNN output
    if masks.shape[0] == 0:
        # No detections: return an empty mask so downstream code can still index [0].
        return np.zeros((1,) + tuple(img.shape[-2:]), dtype=bool)

    # Keep only the first mask and always return the same (1, H, W) numpy
    # layout, no matter how many instances were detected.
    return (masks[0] > 0.5).cpu().numpy()

In [4]:
def change_mask_format(mask):
    """Convert a channel-first mask (1, H, W) to a channel-last float mask (H, W, 1).

    Args:
        mask: array-like (numpy array or torch tensor) whose first entry along
            axis 0 is the 2-D mask to convert.

    Returns:
        A new float numpy array of shape (H, W, 1) holding `mask[0]`.
    """
    # Accept torch tensors as well as numpy arrays.
    if hasattr(mask, 'detach'):
        mask = mask.detach().cpu().numpy()
    # np.array copies, so the result never aliases the input.
    first_mask = np.array(mask[0], dtype=float)
    return first_mask[:, :, np.newaxis]

In [5]:
def upsize_mask(mask, size=1):
    """Grow the foreground (pixels equal to 1) of a mask in place.

    Every pixel whose Euclidean distance to a foreground pixel is strictly
    less than `size` is set to 1. With the default `size=1` only the
    foreground pixels themselves qualify, so the mask is left unchanged;
    pass a larger `size` to actually dilate.

    Args:
        mask: numpy array indexed as mask[row, col]; modified in place.
        size: dilation radius in pixels (strict inequality).

    Returns:
        The same `mask` object, after modification.
    """
    # Take the foreground coordinates up front so that freshly grown pixels
    # are not expanded again, and no fractional marker value is ever written
    # (it would be truncated to 0 in an integer mask).
    foreground = np.nonzero(np.asarray(mask == 1))
    rows, cols = foreground[0], foreground[1]
    if rows.size == 0:
        return mask

    n_rows, n_cols = mask.shape[0], mask.shape[1]
    reach = int(size)
    for d_row in range(-reach, reach + 1):
        for d_col in range(-reach, reach + 1):
            if np.sqrt(d_row ** 2 + d_col ** 2) >= size:
                continue
            new_rows = rows + d_row
            new_cols = cols + d_col
            # Drop neighbours that fall outside the image.
            inside = ((new_rows >= 0) & (new_rows < n_rows)
                      & (new_cols >= 0) & (new_cols < n_cols))
            mask[new_rows[inside], new_cols[inside]] = 1
    return mask

## Загрузка обученной модели
#### Если надо сразу сегментировать видео без повторного обучения модели на данных, то после этого раздела надо сразу перейти в конец к разделу "Сегментация видео"

In [6]:
# Load the fine-tuned weights onto the device selected earlier. Mapping to
# `device` instead of a hard-coded 'cuda' keeps this cell working on
# CPU-only machines.
# NOTE(review): torch.load unpickles the file; only load checkpoints you trust.
state_dict = torch.load('resized1_maskrcnn_lr=0.0001_ep=70.pth', map_location=device)
model.load_state_dict(state_dict)
model = model.to(device)

## Обучение и валидация модели

In [5]:
# Training hyper-parameters and paths.
LEARNING_RATE = 1e-4
BATCH_SIZE = 8
EPOCHS = 70
DATA_PATH = "data"
MODEL_SAVE_PATH = f"resized1_maskrcnn_lr={LEARNING_RATE}_ep={EPOCHS}.pth"

train_dataset = MaskRCNNDataset(DATA_PATH)

# Fixed seed so the 90/10 train/validation split is reproducible.
generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset = random_split(train_dataset, [0.9, 0.1], generator=generator)

train_dataloader = DataLoader(dataset=train_dataset,
                            batch_size=BATCH_SIZE,
                            shuffle=True,
                            collate_fn=utils.collate_fn)
# Validation does not need shuffling; a fixed order keeps evaluation
# runs comparable with each other.
val_dataloader = DataLoader(dataset=val_dataset,
                            batch_size=BATCH_SIZE,
                            shuffle=False,
                            collate_fn=utils.collate_fn)

In [13]:
# AdamW over every model parameter, starting at the configured learning rate.
optimizer = optim.AdamW(params=model.parameters(), lr=LEARNING_RATE)

# Multiply the learning rate by 0.7 after every 5 scheduler steps.
lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.7)

In [None]:
# One pass per epoch: train, advance the LR schedule, then evaluate on the
# validation loader.
for epoch_idx in range(EPOCHS):
    train_one_epoch(model, optimizer, train_dataloader, device, epoch_idx, print_freq=4)
    lr_scheduler.step()
    evaluate(model, val_dataloader, device=device)

# Persist only the weights, once all epochs have finished.
final_weights = model.state_dict()
torch.save(final_weights, MODEL_SAVE_PATH)

In [22]:
def validate(model, img_path, mask_path=None):
    """Print mean pixel-wise recall and precision over a folder of images.

    For every file in `img_path` the model's first predicted mask is compared
    with the ground-truth mask of the same file name found in `mask_path`.

    Args:
        model: segmentation model passed on to `make_prediction`.
        img_path: directory containing the test images.
        mask_path: directory containing the ground-truth masks; defaults to
            `img_path` with a '_masks' suffix
            (e.g. 'data/manual_test' -> 'data/manual_test_masks').

    Raises:
        FileNotFoundError: if a ground-truth mask cannot be read.
    """
    if mask_path is None:
        mask_path = img_path.rstrip('/\\') + '_masks'

    model.eval()
    images = os.listdir(img_path)
    recalls = []
    precisions = []
    for img_name in images:
        # Read from the directory that was passed in, not a hard-coded one.
        image = Image.open(os.path.join(img_path, img_name))

        mask_file = os.path.join(mask_path, img_name)
        orig_mask = cv2.imread(mask_file)
        if orig_mask is None:
            # cv2.imread returns None instead of raising on a missing/unreadable file.
            raise FileNotFoundError(f'Could not read ground-truth mask: {mask_file}')
        orig_mask = cv2.cvtColor(orig_mask, cv2.COLOR_BGR2RGB)
        orig_mask = orig_mask / 255
        # Keep a single channel so the shape is (H, W, 1), like the prediction.
        orig_mask = orig_mask[:, :, :1]

        pred_mask = change_mask_format(make_prediction(model, image))

        # Fresh metric objects per image, so each value is a per-image score.
        recall = keras.metrics.Recall()
        precision = keras.metrics.Precision()
        recalls += [recall(y_true=orig_mask, y_pred=pred_mask)]
        precisions += [precision(y_true=orig_mask, y_pred=pred_mask)]
    print(f'Mean recall: {np.mean(recalls)}')
    print(f'Mean precision: {np.mean(precisions)}')

In [23]:
validate(model, 'data/manual_test')

Mean recall: 0.9996873736381531
Mean precision: 0.8088012337684631


## Сегментация видео

In [7]:
from moviepy.editor import VideoFileClip

In [8]:
def for_video(frame):
    """Turn a single-channel [0, 1] mask into a 3-channel uint8 video frame.

    Args:
        frame: array of shape (H, W, C); only channel 0 is used.

    Returns:
        A uint8 array of shape (H, W, 3) in which every channel equals
        `int(255 * frame[..., 0])`.
    """
    # Scale and truncate towards zero, exactly as int() does per pixel.
    gray = (255 * np.asarray(frame)[:, :, 0]).astype(int)
    # Replicate the gray plane into three identical colour channels.
    return np.repeat(gray[:, :, np.newaxis], 3, axis=2).astype('uint8')

In [10]:
def predict_on_video(video_file_path, output_file_path):
    """Segment every frame of a video and write the masks as a new video.

    Uses the notebook-level `model`. Each frame is passed through
    `make_prediction`, `change_mask_format` and `for_video`, and the resulting
    frame is appended to the output file. The frame index is printed every
    100 frames as a progress indicator.

    Args:
        video_file_path: path of the input video.
        output_file_path: path of the video to create.

    Raises:
        OSError: if the input video cannot be opened.
    """
    video_reader = cv2.VideoCapture(video_file_path)
    if not video_reader.isOpened():
        video_reader.release()
        raise OSError(f'Could not open video file: {video_file_path}')

    fps = video_reader.get(cv2.CAP_PROP_FPS)
    fourcc = cv2.VideoWriter_fourcc(*"MP4V")
    # The writer is created from the first predicted frame so that its size
    # always matches what is written; frames of a different size than the
    # writer's would otherwise be dropped.
    video_writer = None

    model.eval()
    idx = 0
    try:
        while True:
            ok, frame = video_reader.read()
            if not ok:
                break
            if idx % 100 == 0:
                print(idx)
            # NOTE(review): cv2 yields BGR frames while validate() feeds PIL (RGB)
            # images to the model — confirm which channel order it was trained on.
            predicted_frame = for_video(change_mask_format(make_prediction(model, frame)))
            if video_writer is None:
                height, width = predicted_frame.shape[:2]
                video_writer = cv2.VideoWriter(output_file_path, fourcc, fps, (width, height))
            video_writer.write(predicted_frame)
            idx += 1
    finally:
        # Always release the handles, even if prediction fails midway.
        video_reader.release()
        if video_writer is not None:
            video_writer.release()

In [11]:
# Source clip, and the name under which its segmented version is written.
input_video_file_path = 'OFT_TBI_01$000159&03_.m4v'
output_video_file_path = f'predicted_{input_video_file_path}'

predict_on_video(video_file_path=input_video_file_path,
                 output_file_path=output_video_file_path)

0
100
200
300
400
500
600
700
800
900
1000
1100
1200
1300
1400
1500
1600
1700
1800
1900
2000
2100
2200
2300
2400
2500
2600
2700
2800
2900
3000
3100
3200
3300
3400
3500
3600
3700
3800
3900
4000
4100
4200
4300
4400


In [12]:
# Open the segmented video and embed it in the notebook (clips up to 180 s).
predicted_clip = VideoFileClip(output_video_file_path)
predicted_clip.ipython_display(maxduration=180)

Moviepy - Building video __temp__.mp4.
Moviepy - Writing video __temp__.mp4



                                                                  

Moviepy - Done !
Moviepy - video ready __temp__.mp4
