# Start and settings

In [15]:
# Техно-библиотеки без которых никуда
import os
from IPython.display import clear_output
from tqdm.notebook import tqdm, trange
import numpy as np

# Компьютерное зрение и визуализация
%matplotlib inline
import matplotlib.pyplot as plt
import cv2
import albumentations as A

# Нейросетки
import torch
from torchvision import models
from torch import nn

# Диск
from google.colab import drive
drive.mount('/content/drive')

# Сажаем на свой диск
%cd '/content/drive/MyDrive/DataScience[Практика]/Цифровой прорыв/'

# DEVICE = device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device = 'cpu'
classifier_weights = 'checkpoints/ResNet101-best.pth'
detector_weights = 'checkpoints/bear_detect_model.pth'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
/content/drive/MyDrive/DataScience[Практика]/Цифровой прорыв


In [16]:
def to_torch_image(image):
    image = np.moveaxis(image, -1, 0)
    return torch.from_numpy(image)

def to_numpy_image(image):
    image = np.array(image)
    image = np.moveaxis(image, 0, -1)
    return image

def draw_bboxes(image, bboxes, thickness=5):
    for bear in bboxes:
        x1, y1, x2, y2 = bear
        cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), thickness)
    return image

normalize = A.Compose([A.RandomCrop(224, 224), 
                       A.Normalize()], bbox_params={'format':'pascal_voc'})

# Classificator

In [17]:
class ResNet(nn.Module):
    def __init__(self):
        super().__init__()

        model = models.resnet101(pretrained=True)
        for param in model.parameters():
            param.requires_grad = False
        for param in model.layer4.parameters():
            param.requires_grad = True
        for param in model.fc.parameters():
            param.requires_grad = True

        self.model = model
        self.end_layer = nn.Sequential(
            nn.Linear(1000, 128),
            nn.Linear(128, 1)
        )

    def forward(self, X):
        return self.end_layer(self.model(X))
    
    def __call__(self, X):
        return self.forward(X)

classifier = ResNet().to(device)
classifier.load_state_dict(torch.load(classifier_weights))

<All keys matched successfully>

# Detector

In [18]:
!pip install pyyaml==5.1
# workaround: install old version of pytorch since detectron2 hasn't released packages for pytorch 1.9 (issue: https://github.com/facebookresearch/detectron2/issues/3158)
!pip install torch==1.8.0+cu101 torchvision==0.9.0+cu101 -f https://download.pytorch.org/whl/torch_stable.html

# install detectron2 that matches pytorch 1.8
# See https://detectron2.readthedocs.io/tutorials/install.html for instructions
!pip install detectron2 -f https://dl.fbaipublicfiles.com/detectron2/wheels/cu101/torch1.8/index.html
# exit(0)  # After installation, you need to "restart runtime" in Colab. This line can also restart runtime
import torch, torchvision
print(torch.__version__, torch.cuda.is_available())
assert torch.__version__.startswith("1.8")   # please manually install torch 1.8 if Colab changes its default version
# Some basic setup:
# Setup detectron2 logger
import detectron2
from detectron2.utils.logger import setup_logger
setup_logger()
clear_output()
# import some common libraries
import numpy as np
import os, json, cv2, random
from google.colab.patches import cv2_imshow

# import some common detectron2 utilities
from detectron2 import model_zoo
from detectron2.engine import DefaultPredictor
from detectron2.config import get_cfg
from detectron2.utils.visualizer import Visualizer
from detectron2.data import MetadataCatalog, DatasetCatalog
from detectron2.engine import DefaultTrainer
clear_output()

cfg = get_cfg()
cfg.merge_from_file(model_zoo.get_config_file("COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_3x.yaml"))
cfg.MODEL.WEIGHTS = model_zoo.get_checkpoint_url("COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_3x.yaml")  # Let training initialize from model zoo
cfg.MODEL.ROI_HEADS.NUM_CLASSES = 1  # only has one class (ballon). (see https://detectron2.readthedocs.io/tutorials/datasets.html#update-the-config-for-new-datasets)
cfg.MODEL.WEIGHTS = detector_weights  # path to the model we just trained
cfg.MODEL.DEVICE = 'cpu'
detector = DefaultPredictor(cfg)

# Data prepairing

In [21]:
class Data:
    def __init__(self, data):
        self.data = data
        self.length = len(data)

    def __getitem__(self, i):
        if isinstance(self.data, str): # Путь до картнки
            image = cv2.imread(self.data[i])
        elif isinstance(self.data, torch.Tensor):
            if len(self.data.shape) == 3: # Картинка
                image = to_numpy_image(self.data[i])
            elif len(self.data.shape) == 4: # Батч картинок
                image = to_numpy_image(self.data[i])
        elif isinstance(self.data, list):
            if isinstance(self.data[i], str):
                image = cv2.imread(self.data[i]) # Если путь
            else:
                image = self.data[i]
        else:
            image = self.data[i]
        return image
    
    def __len__(self):
        return self.length
    
    def __iter__(self):
        self.n = 0
        return self
    
    def __next__(self):
        x = self.__getitem__(self.n)
        self.n += 1
        if self.n < self.length:
            return x
        raise StopIteration


class Cutter:
    def __init__(self, data, size=224, step_size=200):
        self.size = size
        self.step_size = step_size
        self.data = data
        self.length = len(self.data)
    
    def __getitem__(self, i):
        image = self.data[i]
        images = []
        for r in range(0, image.shape[0], self.step_size):
            for c in range(0, image.shape[1], self.step_size):
                new_image = image[r:r + self.size, c:c + self.size, :]
                if new_image.shape == (self.size, self.size, 3):
                    images.append([new_image, [c, r, c, r]])
        return images
    
    def __len__(self):
        return self.length

    def __iter__(self):
        self.n = 0
        return self
    
    def __next__(self):
        x = self.__getitem__(self.n)
        self.n += 1
        if self.n < self.length:
            return x
        raise StopIteration

In [22]:
class Predictor:
    def __init__(self, model, raw_data):
        data = Data(raw_data)
        self.model = model
        self.cutter = Cutter(data)
        self.length = len(self.cutter)

    def __getitem__(self, i):
        self.model.eval()
        images =  self.cutter[i]
        for cutted_image in tqdm(images):
            image, coords = cutted_image
            preds = self.model(image)
            if not isinstance(preds, list):
                output = preds["instances"][preds['instances'].get_fields()['scores'] > 0.9]
                preds = list(list(output.get_fields()['pred_boxes']))
                preds = [list(map(lambda x: x.item(), list(preds[i]))) for i in range(len(preds))]
                coords = np.array(coords)
                coords = np.array([int(coords[1]), int(coords[0]),
                                int(coords[1]), int(coords[0])])
                preds = np.array(preds, dtype=np.int)

            if len(preds) > 0:
                preds = preds + coords
        
    def __len__(self):
        return self.length
    
    def __iter__(self):
        self.n = 0
        return self
    
    def __next__(self):
        x = self.__getitem__(self.n)
        self.n += 1
        if self.n < self.length:
            return x
        raise StopIteration

# BearCatcher

In [23]:
class BearCatcher(nn.Module):
    def __init__(self, classifier, detector):
        super().__init__()
        self.classifier = classifier
        self.detector = detector
    
    def forward(self, X):
        detection = []
        normed = normalize(image=X, bboxes=[])['image']
        z = to_torch_image(normed)
        z = z.unsqueeze(0).to(device)
        z = self.classifier(z)
        if not (torch.sigmoid(z) > 0.95):
            return detection
        X = np.array(X, dtype=np.uint8)
        detection = self.detector(X)
        return detection


    def __call__(self, X):
        return self.forward(X)

# Playground

In [24]:
model = BearCatcher(classifier, detector).to(device)
clear_output()

In [19]:
images = [cv2.imread('data/images/_2016-05-12 14-20-10_2145_2R.JPG')]

In [20]:
predictor = Predictor(model, images)
for image in predictor:
    print(image.shape)

HBox(children=(FloatProgress(value=0.0, max=864.0), HTML(value='')))




In [26]:
torch.save(model.state_dict(), 'checkpoints/BearCatcher.pth')

In [25]:
import pickle
pickle.dump(model, open('checkpoints/BearCatcher.pkl', 'wb'))