In [1]:
import matplotlib.pyplot as plt
import torchvision
from torchvision.io import read_video
from torchvision.utils import save_image
from typing import List, OrderedDict, Tuple
from torchvision.models.detection import fasterrcnn_resnet50_fpn_v2
from PIL import Image, ImageDraw, ImageFont
from torchvision import transforms
import torchinfo
import torch
from pathlib import Path
import cv2
from enum import Enum
import xml.etree.ElementTree as ET 
from typing import Dict
import pandas as pd
import json

In [14]:
COCO_INSTANCE_CATEGORY_NAMES = [
    '__background__', 'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus',
    'train', 'truck', 'boat', 'traffic light', 'fire hydrant', 'N/A', 'stop sign',
    'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow',
    'elephant', 'bear', 'zebra', 'giraffe', 'N/A', 'backpack', 'umbrella', 'N/A', 'N/A',
    'handbag', 'tie', 'suitcase', 'frisbee', 'skis', 'snowboard', 'sports ball',
    'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard', 'tennis racket',
    'bottle', 'N/A', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl',
    'banana', 'apple', 'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza',
    'donut', 'cake', 'chair', 'couch', 'potted plant', 'bed', 'N/A', 'dining table',
    'N/A', 'N/A', 'toilet', 'N/A', 'tv', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone',
    'microwave', 'oven', 'toaster', 'sink', 'refrigerator', 'N/A', 'book',
    'clock', 'vase', 'scissors', 'teddy bear', 'hair drier', 'toothbrush'
]

In [59]:
def load_frcnn_model():
  frcnn = fasterrcnn_resnet50_fpn_v2()
  frcnn = frcnn.eval().cuda()
  torchinfo.summary(frcnn)
  return frcnn

class ObjectType(Enum):
  CONSTRUCTION = 1
  CYCLIST = 2
  VEHICLE = 3
  PEDESTRIAN = 4
  BUS = 5

class DetectedObject:
  def __init__(self, xmin, ymin, xmax, ymax):
    self.xmin = xmin
    self.ymin = ymin
    self.xmax = xmax
    self.ymax = ymax
  
  def __str__(self):
    return f'({self.xmin}, {self.ymin}, {self.xmax}, {self.ymax})'
    
class AnnotationObject:
  def __init__(self, object: ET.Element):
    self.type: ObjectType = ObjectType.__dict__[object.find("./name").text.upper()]
    self.xmin = float(object.find("./bndbox/xmin").text)
    self.xmax = float(object.find("./bndbox/xmax").text)
    self.ymin = float(object.find("./bndbox/ymin").text)
    self.ymax = float(object.find("./bndbox/ymax").text)
    
  def __str__(self):
    return f'({self.xmin}, {self.ymin}, {self.xmax}, {self.ymax})'
  
  @property
  def area(self):
    return (self.ymax-self.ymin)*(self.xmax-self.xmin)
    
  def intersect_area(self, other):
    dx = min(other.xmax, self.xmax) - max(other.xmin, self.xmin)
    dy = min(other.ymax, self.ymax) - max(other.ymin, self.ymin)
    if (dx>=0) and (dy>=0):
      return dx*dy
  
  def difference(self, other):
    intersect = self.intersect_area(other)
    if intersect is None:
      return 1
    my_area = self.area
    other_area = (other.ymax-other.ymin)*(other.xmax-other.xmin)
    score = ((my_area+other_area) - (2*intersect))/my_area
    return score if score<1 else 1

class Annotation:
  def __init__(self, tree: ET.ElementTree):
    root = tree.getroot()
    self.image_filename = root.find("filename").text
    self.objects: List[AnnotationObject] = []
    self.types = set()
    for object in root.findall("./object"):
      anot = AnnotationObject(object)
      self.objects.append(anot)
      self.types.add(anot.type)
    

def load_canada_dataset():
  parent_dir = Path("canada-dataset")
  annotation_dir = parent_dir.joinpath("Annotations")
  image_dir = parent_dir.joinpath("JPEGImages")
  
  annotations: Dict[str,Annotation] = {}
  for annotation in annotation_dir.glob("*.xml"):
    tree = ET.parse(annotation)
    a = Annotation(tree)
    annotations[a.image_filename] = a
  
  images: Dict[str,Image.Image] = {}
  for img_file in image_dir.glob("*.jpeg"):
    images[img_file.name] = Image.open(img_file)
    images[img_file.name].load()
  
  return annotations, images

def show_rectangle(img: Image.Image, xmin, ymin, xmax, ymax):
  tmp_img = img.copy()
  draw = ImageDraw.Draw(tmp_img)
  draw.rectangle([(xmin,ymin),(xmax,ymax)], outline="red", width=1)
  tmp_img.show()
  print(tmp_img.size)
  print(xmin, ymin, xmax, ymax)

def show_annotations(img: Image.Image, annotation: Annotation):
  tmp_img = img.copy()
  draw = ImageDraw.Draw(tmp_img)
  for obj in annotation.objects:
    print((obj.xmin,obj.ymin),(obj.xmax,obj.ymax))
    draw.rectangle([(obj.xmin,obj.ymin),(obj.xmax,obj.ymax)], outline="red", width=1)
  tmp_img.show()

def apply_annotations(
  img: Image.Image, true_annotations: List[AnnotationObject], 
  predicted_annotations, other_annotations, dif_scores
):
  tmp_img = img.copy()
  draw = ImageDraw.Draw(tmp_img)
  for i, true_annot in enumerate(true_annotations):
    draw.rectangle([(true_annot.xmin,true_annot.ymin),(true_annot.xmax,true_annot.ymax)], outline="green", width=1)
  for p_annot in predicted_annotations:
    draw.rectangle([(p_annot.xmin,p_annot.ymin),(p_annot.xmax,p_annot.ymax)], outline="orange", width=1)
  for p_annot in other_annotations:
    draw.rectangle([(p_annot.xmin,p_annot.ymin),(p_annot.xmax,p_annot.ymax)], outline="blue", width=1)
  for i, true_annot in enumerate(true_annotations):
    draw.text((true_annot.xmin,true_annot.ymin), f'{dif_scores[i]:.2f}', "red", font=ImageFont.truetype("arial.ttf", 15))
  return tmp_img

def extract_frcnn_detections(pred_bboxes):
  detections = []
  for box in pred_bboxes:
    detections.append(DetectedObject(int(box[1]), int(box[0]), int(box[3]), int(box[2])))
  return detections

In [3]:
annotations, images = load_canada_dataset()
frcnn = load_frcnn_model().eval()

In [78]:
frcnn = frcnn.cpu()

In [75]:
transform = transforms.Compose([
    transforms.ToTensor(),
])
img_raw = images['000001.jpeg']
img_raw = img_raw.resize((int(img_raw.size[0]*(800/img_raw.size[1])),800))
img = transform(img_raw).unsqueeze(0).cuda()
print(img.shape)
print(img_raw.size)
outputs = frcnn(img)
pred_scores = outputs[0]['scores'].detach().cpu().numpy()
pred_bboxes = outputs[0]['boxes'].detach().cpu().numpy()
labels = outputs[0]['labels']
boxes = extract_frcnn_detections(pred_bboxes)

torch.Size([1, 3, 800, 977])
(977, 800)


In [76]:
[str(box) for box in boxes]

['(12, 70, 37, 105)',
 '(0, 45, 23, 71)',
 '(0, 39, 18, 58)',
 '(16, 51, 45, 95)',
 '(6, 33, 20, 54)',
 '(7, 33, 36, 59)',
 '(19, 13, 61, 50)',
 '(7, 26, 27, 53)',
 '(23, 27, 43, 57)',
 '(13, 51, 31, 89)',
 '(19, 83, 38, 119)',
 '(10, 41, 32, 76)',
 '(18, 61, 34, 92)',
 '(7, 66, 23, 91)',
 '(16, 38, 42, 58)',
 '(24, 77, 48, 121)',
 '(4, 38, 29, 63)',
 '(11, 74, 30, 97)',
 '(38, 58, 53, 86)',
 '(40, 53, 54, 74)',
 '(4, 30, 7, 68)',
 '(22, 64, 41, 100)',
 '(10, 61, 27, 86)',
 '(10, 41, 23, 64)',
 '(45, 87, 54, 136)',
 '(8, 30, 11, 62)',
 '(49, 18, 68, 38)',
 '(0, 43, 4, 75)',
 '(41, 69, 57, 94)',
 '(7, 36, 11, 65)',
 '(9, 22, 14, 61)',
 '(39, 40, 56, 59)',
 '(12, 40, 15, 80)',
 '(6, 40, 9, 74)',
 '(29, 84, 38, 128)',
 '(4, 58, 8, 91)',
 '(7, 65, 11, 94)',
 '(1, 52, 5, 85)',
 '(6, 51, 17, 75)',
 '(8, 72, 12, 101)',
 '(45, 82, 57, 108)',
 '(9, 10, 17, 64)',
 '(49, 85, 60, 114)',
 '(7, 76, 13, 110)',
 '(0, 58, 16, 72)',
 '(6, 66, 10, 102)',
 '(10, 60, 15, 116)',
 '(7, 79, 22, 94)',
 '(14, 8

In [None]:
transform = transforms.Compose([
    transforms.ToTensor(),
])
def test_frcnn(out_dir: str, model):
  out_dir = Path(out_dir)
  out_dir.mkdir(exist_ok=True)
  track_list = set([ObjectType.BUS.name, ObjectType.VEHICLE.name, ObjectType.PEDESTRIAN.name, ObjectType.CYCLIST.name])
  detected_classes = set()
  summary = {}
  for tot in track_list:
    summary[f'total_{tot}_misses'] = 0
    summary[f'total_{tot}_loss_sum'] = 0
    summary[f'total_{tot}_objects'] = 0
  summary[f'total_VEHICLE_detections'] = 0
  summary[f'total_objects'] = 0
  summary[f'total_PERSON_detections'] = 0
      
  for img_file, annotation in annotations.items():
    img_raw = images[img_file]
    img = transform(img_raw).unsqueeze(0).cpu()
    out = model(img)[0]['boxes'].detach().cpu().numpy()
    detected_objects = extract_frcnn_detections(out)
    
    detected_interests = []
    detected_car_count = 0
    detected_person_count = 0
    detected_other = []
    for index, object in enumerate(detected_objects):
      # Collect all detected objects for image annotations
      detected_car_count += 1
      summary[f'total_VEHICLE_detections'] += 1
      detected_interests.append(object)
      
    dif_scores = []
    score_sum = 0
    miss_count = 0
    true_annotations = []
    summary['total_objects'] += len(annotation.objects)
    for true_obj in annotation.objects:
      tot = true_obj.type.name
      if tot in track_list:
        true_annotations.append(true_obj)
        summary[f'total_{tot}_objects'] += 1
        best = 1
        for object in detected_interests:
          dif = true_obj.difference(object)
          if dif is not None:
            if dif < best:
              best = dif
        dif_scores.append(best)
        score_sum += best
        summary[f'total_{tot}_loss_sum'] += best
        if best == 1:
          miss_count += 1
          summary[f'total_{tot}_misses'] += 1
    
    avg_score = score_sum/len(dif_scores) if len(dif_scores)>0 else 0
    annotated = apply_annotations(img_raw, true_annotations, detected_interests, detected_other, dif_scores)
    annotated.save(out_dir.joinpath(f'{img_file.rstrip(".jpeg")}-{avg_score:.2f}-{miss_count}-{len(dif_scores)}.jpeg'))
    
  total_score_sum = 0
  total_objects = 0
  total_found = 0
  for tot in track_list:
    score = summary[f'total_{tot}_loss_sum']
    objects = summary[f'total_{tot}_objects']
    misses = summary[f'total_{tot}_misses']
    summary[f'total_{tot}_avg_loss'] = score/objects if objects > 0 else 0
    summary[f'total_{tot}_pct_found'] = (objects-misses)/objects if objects > 0 else 1
    total_found += (objects-misses)
    total_score_sum += score
    total_objects += objects
  summary[f'total_avg_loss'] = total_score_sum/total_objects
  summary[f'total_pct_found'] = total_found/total_objects
  summary[f'detected_classes'] = list(detected_classes)
  with open(out_dir.joinpath('summary.json'), 'w') as outfile:
    json.dump(summary, outfile)
    
#test_yolo('test-small')
test_frcnn('test-frcnn', frcnn)