In [None]:
# Copyright (c) 2021  IBM Corporation
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

In [None]:
%load_ext autoreload
%autoreload 2

import detectron2
from detectron2.utils.logger import setup_logger
setup_logger()

import numpy as np
import os, json, cv2, random

from detectron2 import model_zoo
from detectron2.engine import DefaultPredictor
from detectron2.config import get_cfg
from detectron2.utils.visualizer import Visualizer
from detectron2.data import MetadataCatalog, DatasetCatalog
from detectron2.data.datasets import register_coco_instances

import matplotlib.pyplot as plt

import visualization as vis

In [None]:
# COCO-format annotation file and the image root its file names are relative to.
path_to_training_json = 'dataset/annotations.json'
path_to_training_images = 'dataset/raw/images/'

# Register the dataset under the name referenced by cfg.DATASETS.TRAIN.
register_coco_instances(
    name="my_dataset_train",
    metadata={},
    json_file=path_to_training_json,
    image_root=path_to_training_images,
)

In [None]:
# Look up the metadata object registered under "my_dataset_train";
# it is displayed in a later cell.
dataset_train = MetadataCatalog.get("my_dataset_train")

In [None]:
from detectron2.engine import DefaultTrainer

# Fine-tune a COCO-pretrained RetinaNet (R50-FPN, 3x schedule) on the sign dataset.
cfg = get_cfg()
cfg.merge_from_file(model_zoo.get_config_file("COCO-Detection/retinanet_R_50_FPN_3x.yaml"))
cfg.DATASETS.TRAIN = ("my_dataset_train",)
cfg.DATASETS.TEST = ()  # no test dataset configured for the training run
cfg.DATALOADER.NUM_WORKERS = 1
# Initialise from the COCO checkpoint that matches the config file above.
cfg.MODEL.WEIGHTS = model_zoo.get_checkpoint_url("COCO-Detection/retinanet_R_50_FPN_3x.yaml")
cfg.SOLVER.IMS_PER_BATCH = 2
cfg.SOLVER.BASE_LR = 0.0025
cfg.SOLVER.MAX_ITER = 1000
cfg.SOLVER.STEPS = []  # no learning-rate decay steps

# RetinaNet takes its class count from MODEL.RETINANET, not MODEL.ROI_HEADS.
# Without this line the head keeps the default 80 COCO classes instead of the
# 4 classes of this dataset (text, symbol, direction text, direction arrow).
cfg.MODEL.RETINANET.NUM_CLASSES = 4

# ROI_HEADS options are kept from the original notebook so the cfg contents stay
# backward-compatible; they only affect R-CNN style models, not RetinaNet.
cfg.MODEL.ROI_HEADS.BATCH_SIZE_PER_IMAGE = 128
cfg.MODEL.ROI_HEADS.NUM_CLASSES = 4

os.makedirs(cfg.OUTPUT_DIR, exist_ok=True)
trainer = DefaultTrainer(cfg)
# resume=False: start from cfg.MODEL.WEIGHTS rather than resuming a previous run.
trainer.resume_or_load(resume=False)
trainer.train()

In [None]:
# Display the metadata registered for the dataset used for training
dataset_train

# Load trained detector model

In [None]:
from detectron2.utils.visualizer import ColorMode

In [None]:
# Build an inference-time predictor from the fine-tuned weights in cfg.OUTPUT_DIR.
cfg.MODEL.WEIGHTS = os.path.join(cfg.OUTPUT_DIR, "model_final.pth")

# Minimum detection score kept at test time. RetinaNet reads the RETINANET
# option; the ROI_HEADS one is kept from the original notebook.
cfg.MODEL.RETINANET.SCORE_THRESH_TEST = 0.4
cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.4

# DefaultPredictor looks up the metadata of the first TEST dataset name. Use the
# dataset registered in this notebook instead of the unregistered "balloon/val"
# tutorial leftover, so the predictor's metadata refers to this dataset.
cfg.DATASETS.TEST = ("my_dataset_train",)
predictor = DefaultPredictor(cfg)

# Load OCR

In [None]:
import easyocr

# OCR reader covering Japanese and English.
ocr_languages = ['ja', 'en']
reader_ocr = easyocr.Reader(ocr_languages)

# Load test image

In [None]:
filepath_sign = 'dataset/raw/images/20210312_1_Station/IMG_0620.JPG'

# cv2.imread returns None instead of raising when the file is missing or
# unreadable; fail here with a clear message rather than inside cv2.resize.
im = cv2.imread(filepath_sign)
if im is None:
    raise FileNotFoundError("Could not read image: {}".format(filepath_sign))

# Work at half resolution; keep RGB (for display/detection) and grayscale copies.
im = cv2.resize(im, (0, 0), fx=.5, fy=.5)
img_rgb = cv2.cvtColor(im, cv2.COLOR_BGR2RGB)
im_bw = cv2.cvtColor(im, cv2.COLOR_BGR2GRAY)

# Show the test image.
fig, axs = plt.subplots(1, 1, figsize=(15, 15))
axs.imshow(img_rgb)

In [None]:
from typing import Dict
from modules.group_signs import SignGroupConfigOption, detect_and_merge_results, compute_group_signs

In [None]:
# Configuration for sign grouping
config_options = SignGroupConfigOption()

# Dictionary mapping label text to the class id of [predictor], e.g., {'texts' : 1}.
config_options.label_dict = {
    'text': 0,
    'symbol': 1,
    'direction text': 2,
    'direction arrow': 3,
}

# A bbox is considered as having a cc_id only if at least 5% of that cc_id lies in it.
config_options.groupsign_thres_min_ratio_cc = 0.05

# Parameter to expand bbox.
config_options.groupsign_param_expand_bbox = 0.25

# Scaling of the image used for computing connected components.
config_options.conncomps_img_scale = 0.5

# Intensity difference for computing connected components after getting the Laplacian image.
config_options.conncomps_thres_same_color = 10

In [None]:
# Step 1: detect texts/symbols/boxes with detectron2 and easyocr, merged into one result.
dict_bboxes = detect_and_merge_results(img_rgb, predictor, reader_ocr, config_options)
bboxes = dict_bboxes['bboxes']
labels = dict_bboxes['labels']

# Step 2: group the detected signs and pull out the group information.
dict_group_signs = compute_group_signs(img_rgb, bboxes, labels, config_options)
output_block_labels = dict_group_signs['output_block_labels']

In [None]:
# Draw the detected boxes and their group assignment on top of the test image.
fig, axs = plt.subplots(nrows=1, ncols=1, figsize=(10, 10))
vis.visualize_groups(axs, img_rgb, bboxes, output_block_labels, None)
axs.axis('off')

# Evaluate sign grouping on the test dataset

In [None]:
from sign_dataset import SignDataset
from tqdm import tqdm
from modules import metrics

In [None]:
data_folder = 'dataset/processed_data/'

# categories.npy stores a pickled dict; .item() unwraps it from the 0-d object array.
# NOTE(review): allow_pickle=True executes pickle on load; only use with trusted files.
categories_path = os.path.join(data_folder, 'categories.npy')
dict_categories = np.load(categories_path, allow_pickle=True).item()
n_categories = len(dict_categories['cat_relabel'])

# Build the SignDataset object for the 'test' split, without crop augmentation.
test_dataset = SignDataset(data_folder, n_categories, 'test', augment_crop=False)

In [None]:
# Indices of the samples to evaluate (370..411 inclusive).
# NOTE(review): presumably the test portion of the dataset; confirm against SignDataset.
idx_data_test = np.arange(370, 412)

# Images are processed at half resolution, so the ground-truth boxes must be
# scaled by the same factor to stay aligned with the resized image.
img_scale = 0.5

# Per-sample clustering accuracy, keyed by sample index.
dict_clus_acc = {}

# Loop through the test data.
for it_data in tqdm(idx_data_test):

    sample = test_dataset[it_data]

    # Load the image; cv2.imread returns None instead of raising when the file
    # is missing or unreadable, so fail early with a clear message.
    filepath_img = os.path.join('dataset/raw/images', sample.details['filename'])
    im = cv2.imread(filepath_img)
    if im is None:
        raise FileNotFoundError("Could not read image: {}".format(filepath_img))
    im = cv2.resize(im, (0, 0), fx=img_scale, fy=img_scale)
    img_rgb = cv2.cvtColor(im, cv2.COLOR_BGR2RGB)

    # Ground-truth boxes (rescaled to the resized image) and their category ids.
    bboxes = sample.details['orig_segm'] * img_scale
    labels = np.array(sample.details['cat_id'])

    # Compute the groups from the ground-truth boxes.
    dict_group_signs = compute_group_signs(img_rgb, bboxes, labels, config_options)
    output_block_labels = dict_group_signs['output_block_labels']

    # Compare the predicted grouping with the annotated group ids.
    clus_acc_it = metrics.compute_cluster_accuracy(sample.node_group_id, output_block_labels)
    dict_clus_acc[it_data] = clus_acc_it

avg_clus_acc = np.mean(list(dict_clus_acc.values()))
print("Average clustering acc: {:.4f}".format(avg_clus_acc))

In [None]:
# Show the grouping for the variables left over from the evaluation loop,
# i.e. the last sample that was processed.
fig, axs = plt.subplots(nrows=1, ncols=1, figsize=(10, 10))
vis.visualize_groups(axs, img_rgb, bboxes, output_block_labels, None)
axs.axis('off')