# Подготовка тестируемого видео для детекции на нем светофоров

In [1]:
import cv2 as cv
import matplotlib.pyplot as plt
import numpy as np
import torch
import os
import pandas as pd
from torchvision import transforms
import torchvision

In [2]:
cap = cv.VideoCapture("phase_1/video_0.MP4") # Вывод с видео файла
length = int(cap.get(cv.CAP_PROP_FRAME_COUNT))
width  = int(cap.get(cv.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv.CAP_PROP_FRAME_HEIGHT))
print(length)
print(width)
print(height)

1203
1280
720


## Создание генератора датасета

### Создание класса датасета

In [442]:
class LISADataset(object):
    def __init__(self):
        # упорядоченный список названий кадров из одной папки (пока что)
        self.imgs = list(sorted(os.listdir(os.path.join("LISA\dayTrain\dayTrain\dayClip1", "frames"))))
        # загрузка датасета аннотаций для bounding box'ов
        self.df = pd.read_csv("LISA\Annotations\Annotations\dayTrain\dayClip1\\frameAnnotationsBOX.csv", sep = ';')

    def __getitem__(self, idx):
        # load images
        img_path = os.path.join("LISA\dayTrain\dayTrain\dayClip1", "frames", self.imgs[idx])
        img = cv.imread(img_path)
        img = cv.cvtColor(img, cv.COLOR_BGR2RGB)

        # get bounding box coordinates for each mask
        num_objs = len(self.df[self.df["Origin frame number"]==idx])
        boxes = []
        for i in range(num_objs):
            x_left = list(self.df[self.df["Origin frame number"]==idx]["Upper left corner X"])[i]
            x_right = list(self.df[self.df["Origin frame number"]==idx]["Lower right corner X"])[i]
            y_left = list(self.df[self.df["Origin frame number"]==idx]["Upper left corner Y"])[i]
            y_right = list(self.df[self.df["Origin frame number"]==idx]["Lower right corner Y"])[i]
            boxes.append([x_left, y_left, x_right, y_right])

        # convert everything into a torch.Tensor
        boxes = torch.as_tensor(boxes, dtype=torch.float32)
        # there is only one class
        labels = torch.ones((num_objs,), dtype=torch.int64)

        image_id = torch.tensor([idx])
        if num_objs == 0:
            print(idx)
            boxes = torch.as_tensor([[0,0,0,0]], dtype=torch.float32)
        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])

        target = {}
        target["boxes"] = boxes
        target["labels"] = labels
        target["image_id"] = image_id
        target["area"] = area
        
        img = torch.tensor(img)
        return img, target

    def __len__(self):
        return len(self.imgs)

In [443]:
def collate_fn(batch):
    return tuple(zip(*batch))

In [444]:
dataset = LISADataset()
data_loader = torch.utils.data.DataLoader(dataset, batch_size=20, shuffle=True, num_workers=0, collate_fn=collate_fn)

In [466]:
images,targets = next(iter(data_loader))

1453


In [431]:
device = torch.device('cuda:0')

In [467]:
images = list(torch.reshape(image, (3, image.shape[0], image.shape[1])).to(device) for image in images)
targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

In [468]:
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

In [469]:
import torchvision
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator

# load a pre-trained model for classification and return
# only the features
backbone = torchvision.models.mobilenet_v2(pretrained=True).features
# FasterRCNN needs to know the number of
# output channels in a backbone. For mobilenet_v2, it's 1280
# so we need to add it here
backbone.out_channels = 1280

# let's make the RPN generate 5 x 3 anchors per spatial
# location, with 5 different sizes and 3 different aspect
# ratios. We have a Tuple[Tuple[int]] because each feature
# map could potentially have different sizes and
# aspect ratios
anchor_generator = AnchorGenerator(sizes=((32, 64, 128, 256, 512),),
                                   aspect_ratios=((0.5, 1.0, 2.0),))

# let's define what are the feature maps that we will
# use to perform the region of interest cropping, as well as
# the size of the crop after rescaling.
# if your backbone returns a Tensor, featmap_names is expected to
# be [0]. More generally, the backbone should return an
# OrderedDict[Tensor], and in featmap_names you can choose which
# feature maps to use.
roi_pooler = torchvision.ops.MultiScaleRoIAlign(featmap_names=["0"],
                                                output_size=7,
                                                sampling_ratio=2)

# put the pieces together inside a FasterRCNN model
model = FasterRCNN(backbone,
                   num_classes=2,
                   rpn_anchor_generator=anchor_generator,
                   box_roi_pool=roi_pooler)

In [470]:
model.to(device)
model.train()

FasterRCNN(
  (transform): GeneralizedRCNNTransform(
      Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
      Resize(min_size=(800,), max_size=1333, mode='bilinear')
  )
  (backbone): Sequential(
    (0): ConvBNReLU(
      (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU6(inplace=True)
    )
    (1): InvertedResidual(
      (conv): Sequential(
        (0): ConvBNReLU(
          (0): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=32, bias=False)
          (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (2): ReLU6(inplace=True)
        )
        (1): Conv2d(32, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (2): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
    )
    (2): InvertedResidual(
      (conv): Se

In [471]:
output = model(images,targets)

ValueError: All bounding boxes should have positive height and width. Found invalid box [0.0, 0.0, 0.0, 0.0] for target at index 11.

# Test

In [9]:
images,targets = next(iter(data_loader))

0 1397
1 1397
2 1397
0 1277
1 1277
2 1277


In [None]:
for epoch in range(2):
    
    start_time = time()
    model.train()
    lossHist.reset()
    
    for images, targets, image_ids in tqdm(trainDataLoader):
        
        images = torch.stack(images).to(device)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
        
        bs = images.shape[0]
        
        loss_dict = model(images, targets)
        
        totalLoss = sum(loss for loss in loss_dict.values())
        lossValue = totalLoss.item()
        
        lossHist.update(lossValue,bs)

        optimizer.zero_grad()
        totalLoss.backward()
        optimizer.step()
    
    if lr_scheduler is not None:
        lr_scheduler.step(totalLoss)

    print(f"[{str(datetime.timedelta(seconds = time() - start_time))[2:7]}]")
    print(f"Epoch {epoch}/{EPOCHS}")
    print(f"Train loss: {lossHist.avg}")

# Примеры

In [None]:
ret, frame = cap.read()

In [8]:
frame.shape

(720, 1280, 3)

In [5]:
cv.imshow("frame", frame)
cv.waitKey()
cv.destroyAllWindows()