[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/Denis-R-V/TSR/blob/main/simple_detector_v2.ipynb)

# Система распознавания дорожных знаков на датасете RTSD

In [76]:
import os
import json
import numpy as np 
import pandas as pd 

from PIL import Image
import matplotlib.pyplot as plt
import matplotlib.patches as patches

from pycocotools.coco import COCO
import fiftyone as fo
import time
import torch
import torchvision
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection import FasterRCNN
import cv2

## Обучение модели

In [1]:

import os
import torch
import torch.utils.data
import torchvision
from PIL import Image
from pycocotools.coco import COCO

In [3]:

class RTSD_dataset(torch.utils.data.Dataset):
    def __init__(self, root, annotation, transforms=None):
        super().__init__()
        self.root = root
        self.transforms = transforms
        self.coco = COCO(annotation)
        self.ids = list(sorted(self.coco.imgs.keys()))

    def __getitem__(self, index):
        # Own coco file
        coco = self.coco
        # Image ID
        img_id = self.ids[index]
        # List: get annotation id from coco
        ann_ids = coco.getAnnIds(imgIds=img_id)
        # Dictionary: target coco_annotation file for an image
        coco_annotation = coco.loadAnns(ann_ids)
        # path for input image
        path = coco.loadImgs(img_id)[0]['file_name']
        # open the input image
        img = Image.open(os.path.join(self.root, path))

        # number of objects in the image
        num_objs = len(coco_annotation)

        # Bounding boxes for objects
        # In coco format, bbox = [xmin, ymin, width, height]
        # In pytorch, the input should be [xmin, ymin, xmax, ymax]
        boxes = []
        for i in range(num_objs):
            xmin = coco_annotation[i]['bbox'][0]
            ymin = coco_annotation[i]['bbox'][1]
            xmax = xmin + coco_annotation[i]['bbox'][2]
            ymax = ymin + coco_annotation[i]['bbox'][3]
            boxes.append([xmin, ymin, xmax, ymax])
        boxes = torch.as_tensor(boxes, dtype=torch.float32)
        # Labels (In my case, I only one class: target class or background)
        labels = torch.ones((num_objs,), dtype=torch.int64)
        # Tensorise img_id
        img_id = torch.tensor([img_id])
        # Size of bbox (Rectangular)
        areas = []
        for i in range(num_objs):
            areas.append(coco_annotation[i]['area'])
        areas = torch.as_tensor(areas, dtype=torch.float32)
        # Iscrowd
        iscrowd = torch.zeros((num_objs,), dtype=torch.int64)

        # Annotation is in dictionary format
        my_annotation = {}
        my_annotation["boxes"] = boxes
        my_annotation["labels"] = labels
        my_annotation["image_id"] = img_id
        my_annotation["area"] = areas
        my_annotation["iscrowd"] = iscrowd

        if self.transforms is not None:
            img = self.transforms(img)

        return img, my_annotation

    def __len__(self):
        return len(self.ids)

In [2]:
# In my case, just added ToTensor
def get_transform():
    custom_transforms = []
    custom_transforms.append(torchvision.transforms.ToTensor())
    return torchvision.transforms.Compose(custom_transforms)

In [4]:
# Data scaled and normalization for training and testing
#data_transforms = {
#    'train': transforms.Compose([
#        transforms.RandomResizedCrop(32),
##        transforms.ToTensor(),
 #       transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])
 #   ]),
 #   'test': transforms.Compose([
 #       transforms.RandomResizedCrop(32),
 #       transforms.ToTensor(),
 #       transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])
 #   ]),
#}
 

In [8]:
# path to your own data and coco file
train_data_dir = 'data'
train_coco = 'data/train_anno_reduced_bin_class.json'

# create own Dataset
my_dataset = RTSD_dataset(root=train_data_dir,
                          annotation=train_coco,
                          transforms=get_transform()
                          )

# collate_fn needs for batch
def collate_fn(batch):
    return tuple(zip(*batch))

# Batch size
train_batch_size = 4

# own DataLoader
data_loader = torch.utils.data.DataLoader(my_dataset,
                                          batch_size=train_batch_size,
                                          shuffle=True,
                                          #num_workers=1,
                                          collate_fn=collate_fn)

loading annotations into memory...
Done (t=0.09s)
creating index...
index created!


In [9]:
# select device (whether GPU or CPU)
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('mps')

# DataLoader is iterable over Dataset
#for imgs, annotations in data_loader:
#    imgs = list(img.to(device) for img in imgs)
#    annotations = [{k: v.to(device) for k, v in t.items()} for t in annotations]
#    print(annotations)

In [4]:
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

def get_model_instance_segmentation(num_classes):
    # load an instance segmentation model pre-trained pre-trained on COCO
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights=None)
    # get number of input features for the classifier
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    # replace the pre-trained head with a new one
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

    return model

In [10]:
# 2 classes; Only target class or background
num_classes = 2
num_epochs = 1
model = get_model_instance_segmentation(num_classes)

# move model to the right device
model.to(device)
    
# parameters
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)

len_dataloader = len(data_loader)

for epoch in range(num_epochs):
    model.train()
    i = 0    
    for imgs, annotations in data_loader:
        i += 1
        imgs = list(img.to(device) for img in imgs)
        annotations = [{k: v.to(device) for k, v in t.items()} for t in annotations]
        loss_dict = model(imgs, annotations)
        losses = sum(loss for loss in loss_dict.values())

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        print(f'Iteration: {i}/{len_dataloader}, Loss: {losses}')

Iteration: 1/473, Loss: 1.4519239664077759
Iteration: 2/473, Loss: 1.2817339897155762
Iteration: 3/473, Loss: 1.132690191268921
Iteration: 4/473, Loss: 0.9888722896575928
Iteration: 5/473, Loss: 0.9863886833190918
Iteration: 6/473, Loss: 0.9339880347251892
Iteration: 7/473, Loss: 0.6847689151763916
Iteration: 8/473, Loss: 0.5321398377418518
Iteration: 9/473, Loss: 0.36475276947021484
Iteration: 10/473, Loss: 0.46246615052223206
Iteration: 11/473, Loss: 0.29618746042251587
Iteration: 12/473, Loss: 0.30712226033210754
Iteration: 13/473, Loss: 0.24444308876991272
Iteration: 14/473, Loss: 0.30756765604019165
Iteration: 15/473, Loss: 0.14773672819137573
Iteration: 16/473, Loss: 0.2400197833776474
Iteration: 17/473, Loss: 0.1496814340353012
Iteration: 18/473, Loss: 0.12262721359729767
Iteration: 19/473, Loss: 0.24144229292869568
Iteration: 20/473, Loss: 0.14327022433280945
Iteration: 21/473, Loss: 0.08851338922977448
Iteration: 22/473, Loss: 0.22065573930740356
Iteration: 23/473, Loss: 0.148

In [13]:
torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            #'lr_scheduler_state_dict': lr_scheduler.state_dict(),
            #'loss_train': train_loss,
            #'loss_val': val_loss
            #}, os.path.join(dataset_path, f'./checkpoints/model_detector_resnet50_{epoch}.pth'))
            }, f'./checkpoints/model_detector_resnet50_{epoch}.pth')

In [5]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('mps')
num_classes = 2
num_epochs = 1
model = get_model_instance_segmentation(num_classes)
model.to(device)
checkpoint = torch.load('checkpoints/model_detector_resnet50_0.pth')
model.load_state_dict(checkpoint['model_state_dict'])

<All keys matched successfully>

In [56]:
def get_prediction(img_path, img_name, threshold):
    model.eval()
    img = Image.open(os.path.join(img_path, img_name))
    transforms=get_transform()
    img = transforms(img).to(device)
    prediction = model([img])
    pred_boxes = [[i[0], i[1], i[2], i[3]] for i in list(prediction[0]['boxes'].detach().cpu().numpy())]
    pred_labels = list(prediction[0].get('labels').cpu().numpy())
    pred_scores = list(prediction[0].get('scores').detach().cpu().numpy())
    pred_tr = [pred_scores.index(x) for x in pred_scores if x > threshold][-1]
    pred_labels = pred_labels[:pred_tr+1]
    pred_boxes = pred_boxes[:pred_tr+1]
    pred_scores = pred_scores[:pred_tr+1]
    
    #boxes = []
    #for i in range(num_objs):
    #    min = coco_annotation[i]['bbox'][0]
    #    ymin = coco_annotation[i]['bbox'][1]
    #    xmax = xmin + coco_annotation[i]['bbox'][2]
    #    ymax = ymin + coco_annotation[i]['bbox'][3]
    #    boxes.append([xmin, ymin, xmax, ymax])
    
    return pred_boxes, pred_labels, pred_scores



img_path = 'data'
img_name = 'rtsd-frames/autosave16_10_2012_08_58_25_5.jpg'
threshold = 0.1
prediction = get_prediction(img_path, img_name, threshold)
prediction

([[1144.7318, 41.82097, 1278.5386, 121.2058],
  [1152.7927, 5.567641, 1246.3629, 134.04039]],
 [1, 1],
 [0.41962793, 0.3895109])

In [109]:
def get_predict_anno(img_path, img_name, threshold):

    pred_boxes, pred_labels, pred_scores = get_prediction(img_path, img_name, threshold)

    anno = {}

    images = []
    image_item = {}
    image_item['id'] = 0
    img = Image.open(os.path.join(img_path, img_name))
    transforms=get_transform()
    img = transforms(img).to(device)
    image_item['width'] = img.shape[2]
    image_item['height'] = img.shape[1]
    image_item['file_name'] = img_name
    images.append(image_item)
    anno['images'] = images

    annotations = []
    for i in range(len(pred_boxes)):
        anno_item = {}
        anno_item['id'] = i
        anno_item['image_id'] = 0
        anno_item['category_id'] = int(pred_labels[i])
        anno_item['area'] = (float(pred_boxes[i][2]) - float(pred_boxes[i][0]))*(float(pred_boxes[i][3]) - float(pred_boxes[i][1]))
        anno_item['bbox'] = [float(pred_boxes[i][0]), float(pred_boxes[i][1]), float(pred_boxes[i][2]) - float(pred_boxes[i][0]), float(pred_boxes[i][3]) - float(pred_boxes[i][1])]
        anno_item['iscrowd'] = 0
        annotations.append(anno_item)
    anno['annotations'] = annotations
    
    anno['categories'] = [{'id': 1, 'name': 'sign'}]

    with open(os.path.join(img_path, 'predicted_anno.json'), 'w') as f:
        json.dump(anno, f)#ensure_ascii=False, indent=4)
    f.close()



    # загрузка изображения в fiftyone
    data_path = img_path      # можно не указывать, если в JSON путь совпадает

    # The path yo the COCO labels JSON file
    labels_file = "train_anno_bin_class.json"
    labels_path = os.path.join(img_path, 'predicted_anno.json')


    # Import the dataset
    dataset = fo.Dataset.from_dir(
        dataset_type=fo.types.COCODetectionDataset,
        data_path=data_path,
        labels_path=labels_path
    )
    # Визуализация набора данных
    session = fo.launch_app(dataset)


    return anno


img_path = 'data'
img_name = 'rtsd-frames/autosave23_10_2012_11_27_23_1.jpg'
threshold = 0.6
prediction_anno = get_predict_anno(img_path, img_name, threshold)

 100% |█████████████████████| 1/1 [19.0ms elapsed, 0s remaining, 56.4 samples/s] 


In [8]:

# data loader
batch_size = 1
num_workers = 4

data_loader = torch.utils.data.DataLoader(my_dataset,
                                          batch_size=batch_size,
                                          shuffle=False,
                                          num_workers=num_workers
                                         )


import numpy as np
import matplotlib.pyplot as plt

for images, labels in data_loader:
    # image shape is [batch_size, 3 (due to RGB), height, width]
    img = transforms.ToPILImage()(images[0])
    plt.imshow(img)
    plt.show()
    print(labels)

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/opt/anaconda3/envs/ds_env/lib/python3.10/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/opt/anaconda3/envs/ds_env/lib/python3.10/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'RTSD_dataset' on <module '__main__' (built-in)>


KeyboardInterrupt: 

In [41]:
torch.cuda.empty_cache()