In [None]:
import json
import random
import glob
import os
import shutil
from collections import defaultdict
import time

import cv2
from PIL import Image, ImageDraw, ImageFont
import numpy as np
import matplotlib.pyplot as plt
import tqdm

# from ultralytics import RTDETR
from ultralytics import YOLO

# 전역변수 설정
# 색상(list) (총 24개)
COLORS = [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0), (255, 0, 255),
          (0, 255, 255), (255, 128, 0), (128, 0, 255), (0, 255, 128), (255, 128, 128),
          (128, 255, 128), (128, 128, 255), (128, 128, 0), (128, 0, 128), (0, 128, 128),
          (192, 64, 0), (192, 192, 64), (64, 192, 192), (64, 64, 192), (192, 64, 192),
          (64, 192, 64), (255, 192, 128), (128, 255, 192), (128, 192, 255)]

# 라벨이름(dict) : area_code 및 PM_code (총 35개)
# 주의 : 원본데이터에서 라벨 34는 제외되어있음. 즉 1부터36까지 34제외하고 35개
LABEL_NAMES = ['인도', '횡단보도', '자전거 도로', '교차로', '중앙 차선', '안전지대',
              '정지선', '정지선 위반 판별구역', '보행자 신호등 녹색', '보행자 신호등 적색',
              '차량 신호등 녹색', '차량 신호등 적색', '오토바이', '오토바이_보행자도로 통행위반',
              '오토바이_안전모 미착용', '오토바이_무단횡단', '오토바이_신호위반', '오토바이_정지선위반',
              '오토바이_횡단보도 주행위반', '자전거', '자전거 캐리어', '자전거_보행자도로 통행위반',
              '자전거_안전모 미착용', '자전거_무단횡단', '자전거_신호위반', '자전거_정지선위반',
              '자전거_횡단보도 주행위반', '킥보드', '킥보드 캐리어', '킥보드_보행자도로 통행위반',
              '킥보드_안전모 미착용', '킥보드_무단횡단', '킥보드_신호위반', '킥보드_횡단보도 주행위반',
              '킥보드_동승자 탑승위반']
LABEL_NAME = {str(k):v for k, v in zip(list(range(1,34)) + [35, 36], LABEL_NAMES)}

# 블랙박스에서 위반사항만 검출하기위한 라벨
LABEL_NAMES2 = ['오토바이_보행자도로 통행위반', '오토바이_안전모 미착용', '오토바이_무단횡단',
                '오토바이_신호위반', '오토바이_정지선위반', '오토바이_횡단보도 주행위반',
                '자전거 캐리어', '자전거_보행자도로 통행위반', '자전거_안전모 미착용',
                '자전거_무단횡단', '자전거_신호위반', '자전거_정지선위반', '자전거_횡단보도 주행위반',
                '킥보드 캐리어', '킥보드_보행자도로 통행위반', '킥보드_안전모 미착용', '킥보드_무단횡단',
                '킥보드_신호위반', '킥보드_횡단보도 주행위반', '킥보드_동승자 탑승위반']

# data폴더 상대경로
FILE_PATH = 'data'

os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE" # kernel crash 방지


# 샘플 annot과 img확인 (파일없으면 랜덤으로)
def random_sample(file=None):
    """
    랜덤 이미지와 그에 맞는 라벨을 뽑아주는 함수
    """
    if file:
        choice = glob.glob(FILE_PATH + f'/라벨링데이터/**/{file}', recursive=True)
        choice = choice[0]
    else:
        # file이 입력이 없으면 data폴더안에 아무 json 선택
        choice = random.choice(
            glob.glob(FILE_PATH + '/**/*.json', recursive=True)
        )
    # json과 그에 맞는 jpg 불러오기
    with open(choice, 'r') as f:
        annot = json.load(f)

    choice = choice.replace('라벨링데이터', '원천데이터').replace('.json', '.jpg')
    img = Image.open(choice)

    return annot, img


# annotation 포함된 이미지 확인
def pic_with_annotation(annot, img):
    """
    라벨(Seg, bbox)을 이미지에 올려주는 함수
    """
    draw = ImageDraw.Draw(img, 'RGBA')
    font = ImageFont.truetype("./batang.ttc", 30)

    # polygon은 1부터 12까지 12종류의 고유한 area_code를 가진다.
    # 그에 맞는 고유 색상 매핑
    poly_color = {str(k):(r, g, b, 70) for k, (r, g, b) in \
                  zip(range(1,13), COLORS)}
    # 위반코드는 13부터 36까지 (34제외하고) 23종류의 고유한 PM_code를 가진다.
    # 그에 맞는 색상 매핑
    box_color = {str(k):(r, g, b, 255) for k, (r, g, b) in \
                  zip(range(13, 37), COLORS)}

    # polygon 그리기
    for seg in annot['annotations']['environment']:
        area_code = seg['area_code']
        # 원본 좌표가 y,x 로 되어있으므로 뒤집기
        points = [(x,y) for [y, x] in seg['points']]

        draw.polygon(points, fill=poly_color[area_code])
    
    # bbox 그리기
    for box in annot['annotations']['PM']:
        PM_code = box['PM_code']
        # points 양식은 [left, top, width, height] 이다.
        # PIL 양식은 [x0, y0, x1, y1] 이므로 이에 맞게 변환
        # 이때 x0, y0 : 왼쪽상단, x1, y1 : 오른쪽하단
        left, top, width, height = box['points']
        points = [left, top, left+width, top+height]

        draw.rectangle(points, outline=box_color[PM_code], width=3)

        text = LABEL_NAME[PM_code]
        tbbox = draw.textbbox([points[0], points[1]-31], text, font=font)
        draw.rectangle(tbbox, fill=box_color[PM_code])
        draw.text([points[0], points[1]-31], text, font=font, fill='black')


# yolo format의 Groud Truth 확인
def yolo_gt(file=None):
    if not file:
        file = random.choice(glob.glob('data/coco/test/*.txt'))

    with open(file, 'r') as f:
        annot = f.read()

    img = Image.open(file.replace('.txt', '.jpg'))
    draw = ImageDraw.Draw(img, 'RGB')

    for line in annot.split('\n'):
        width = img.size[0]
        height = img.size[1]
        bbox = list(map(float, line.split(' ')))[1:]
        x0 = (bbox[0] - bbox[2]/2) * width
        y0 = (bbox[1] - bbox[3]/2) * height
        x1 = (bbox[0] + bbox[2]/2) * width
        y1 = (bbox[1] + bbox[3]/2) * height
        draw.rectangle([x0,y0,x1,y1], outline='red', width=3)
    return img


In [None]:
# COCO 형식에서 YOLO에 맞는 형태로 변환
# YOLOv5 라벨형식 : class_label, x_mid(0~1), y_mid(0~1), width(0~1), height(0~1) (txt파일)
# 그림 사이즈는 (1920*1080), (3072*1728) 로 두개이다.
def coco_to_yolo(file, mode='train'):
    """
    coco annotation을 YOLOv5 format으로 변경해누즌 함수
    Bounding Box와 라벨만 변환
    """
    # json dict가 아니라 경로로 되있으면 dict로 변환
    if isinstance(file, str):
        with open(file, 'r') as f:
            file = json.load(f)


    print(f'{mode} annotation 변환')
    # image_id 별로 annotation추가 (string형식)
    yolo_annots = {}
    for annot in tqdm.tqdm(file['annotations'], desc='annot처리중'):
        img_id = annot['image_id']
        # YOLO의 라벨은 0부터 시작한다! COCO는 1부터 시작하므로 1빼기
        label = annot['category_id'] - 1  
        left = annot['bbox'][0]
        top = annot['bbox'][1]
        width = annot['bbox'][2]
        height = annot['bbox'][3]

        x = (left + width/2)
        y = (top + height/2)

        width = width
        height = height

        yolo_annots.setdefault(img_id, '')
        yolo_annots[img_id] += f'{label} {x:.5f} {y:.5f} {width:.5f} {height:.5f}\n'

    # image 별로 annotation을 txt파일로 저장
    for image in tqdm.tqdm(file['images'], desc='.txt로 저장중'):
        file_name = image['file_name'].replace('.jpg', '.txt')
        img_width = image['width']
        img_height = image['height']
        img_id = image['id']

        # 이미지 크기로 bbox정규화
        reg_annots = ''
        for line in yolo_annots[img_id][:-1].split('\n'):
            ls = line.split(' ')
            ls[1] = float(ls[1]) / img_width
            ls[2] = float(ls[2]) / img_height
            ls[3] = float(ls[3]) / img_width
            ls[4] = float(ls[4]) / img_height

            # 이상하면 그림 불러와서 사이즈 재확인
            if ls[1] > 1 or ls[2] > 1 or ls[3] > 1 or ls[4] > 1:
                with Image.open(os.path.join('data/coco/', mode, file_name.replace('.txt', '.jpg'))) as img:
                    real_width, real_height = img.size

                ls[1] = ls[1] * img_width / real_width
                ls[2] = ls[2] * img_height / real_height
                ls[3] = ls[3] * img_width / real_width
                ls[4] = ls[4] * img_height / real_height
                print(f'파일 {file_name} 표시 {img_width}*{img_height}\t실제 {real_width}*{real_height}로 정규화됨')
            # nan있으면 넘어가기
            if any(np.isnan(i) for i in ls[1:]):
                print(f'파일 {file_name} nan발생 {ls[1]}, {ls[2]}, {ls[3]}, {ls[4]}')
                continue
                
            assert ls[1] <= 1, f'x이상 {ls[1]}, width {img_width} img_id {img_id} file name {file_name}'
            assert ls[2] <= 1, f'y이상 {ls[2]}, height {img_height} img_id {img_id} file name {file_name}'
            assert ls[3] <= 1, f'w이상 {ls[3]}, width {img_width} img_id {img_id} file name {file_name}'
            assert ls[4] <= 1, f'h이상 {ls[4]}, height {img_height} img_id {img_id} file name {file_name}'
            reg_annots += f'{ls[0]} {ls[1]:.5f} {ls[2]:.5f} {ls[3]:.5f} {ls[4]:.5f}\n'
        reg_annots = reg_annots[:-1]

        # 이미지와 같은 디렉토리에 txt저장
        with open(os.path.join(FILE_PATH, 'coco', mode, file_name), 'w') as f:
            f.write(reg_annots)
        
    return 

# 특정경로의 txt 모두 제거
def txt_remove(path): 
    for file in glob.glob(os.path.join(path, '*.txt')):
        os.remove(file)

# coco_to_yolo('data/coco/coco_annotations/test_annotations.json', mode='test')
# coco_to_yolo('data/coco/coco_annotations/valid_annotations.json', mode='val')
# coco_to_yolo('data/coco/coco_annotations/train_annotations.json', mode='train')

In [None]:
# 블랙박스용으로, 훈련용 블랙박스데이터 11.5GB, 검증용 1.3GB 변환

def to_YOLO_blackbox(mode='train'):
    # 블랙박스 데이터 폴더 생성
    os.makedirs(FILE_PATH + '/blackbox_yolo', exist_ok=True)
    os.makedirs(FILE_PATH + '/blackbox_yolo/val', exist_ok=True)
    os.makedirs(FILE_PATH + '/blackbox_yolo/train', exist_ok=True)

    # x-labeller포맷변환
    path = '/블랙박스/라벨링데이터/**/*.json' if mode=='train' else '/라벨링데이터/**/*_B_*.json'
    jsons = glob.glob(FILE_PATH + path, recursive=True)
    yolo_label_dict = dict(zip([i for i in range(13, 37) if i not in [13, 20, 28, 34]], range(len(LABEL_NAMES2))))

    for file in tqdm.tqdm(jsons, desc=f'블랙박스 {mode} 변환중'):
        with open(file, 'r') as f:
            annot = json.load(f)

        jpg_file = file.replace('라벨링데이터', '원천데이터').replace('.json', '.jpg')
        jpg = Image.open(jpg_file)
        img_width, img_height = jpg.size

        # bbox와 카테고리 변환
        string = ''
        
        for seg in annot['annotations']['PM']:
            # 오토바이(13), 자전거(20), 킥보드(28)는 위반사항이 아니므로 넘어가기
            # 이후 14~36까지 되있는 라벨을 0~19까지로 변환
            if (label:=int(seg['PM_code'])) in [13, 20, 28]:
                continue
            else:
                label = yolo_label_dict[label]

            left, top, width, height = seg['points']
            # 0~1 사이로 정규화
            x = (left + width/2) / img_width
            y = (top + height/2) / img_height
            width = width / img_width
            height = height / img_height

            string += f'{label} {x:.5f} {y:.5f} {width:.5f} {height:.5f}\n'
        string = string[:-1]

        # blackbox_yolo/train에 img옮기고 txt추가
        shutil.copy(jpg_file, FILE_PATH + f'/blackbox_yolo/{mode}/{os.path.basename(jpg_file)}')
        # 있는라벨들을 제거했기때문에, bbox가 없는 이미지들이 있다.
        # 해당이미지들은 배경이미지로 학습되도록 annotation파일 만들지 않기
        if bool(string) is True:
            with open(FILE_PATH + f'/blackbox_yolo/{mode}/' + os.path.basename(file).replace('.json', '.txt'), 'w') as f:
                f.write(string)
    

# to_YOLO_blackbox(mode='val')
# to_YOLO_blackbox(mode='train')
# train 23187개
# val 2732개
# 이중에 txt로 annotation이 있는게 절반정도.. 나머진 위반사항 없이 자전거, 오토바이, 킥보드만 있다.

In [None]:
import re
# 데이터 비율확인

# train 39496
# valid 11284
# test 5643

files = glob.glob('data/라벨링데이터/**/*.json', recursive=True)
n_all = len(files)
n_cctv = len([1 for file in files if bool(re.search(r'_C_', file))])
n_black = len([1 for file in files if bool(re.search(r'_B_', file))])
n_day = len([1 for file in files if bool(re.search(r'_D_', file))])
n_night = len([1 for file in files if bool(re.search(r'_N_', file))])
n_fine = len([1 for file in files if bool(re.search(r'_F_', file))])
n_rain = len([1 for file in files if bool(re.search(r'_R_', file))])

print(f'훈련\t: 39496')
print(f'검증\t: 11284')
print(f'테스트\t: 5643')
print(f'총합\t: {39496+11284+5643}\n')
print(f"CCTV\t: {n_cctv}\t\t비율 : {n_cctv/n_all:.3f}")
print(f"블랙박스\t: {n_black}\t\t비율 : {n_black/n_all:.3f}")
print(f"주간\t: {n_day}\t\t비율 : {n_day/n_all:.3f}")
print(f"야간\t: {n_night}\t\t비율 : {n_night/n_all:.3f}")
print(f"맑음\t: {n_fine}\t\t비율 : {n_fine/n_all:.3f}")
print(f"우천\t: {n_rain}\t\t비율 : {n_rain/n_all:.3f}")

plt.figure(figsize=(14,4))
plt.suptitle('데이터세트 비율', fontsize=20)

plt.subplot(1,4,1)
plt.pie([39496, 11284, 5643],labels=['훈련', '검증','테스트'], autopct='%.1f%%',
        shadow=True, explode=[0, 0.1, 0.1])
plt.title('훈련 vs 검증 vs 테스트')

plt.subplot(1,4,2)
plt.pie([n_cctv, n_black],labels=['CCTV', '블랙박스'], autopct='%.1f%%',
        shadow=True, explode=[0, 0.3])
plt.title('CCTV vs 블랙박스')

plt.subplot(1,4,3)
plt.pie([n_day, n_night],labels=['주간', '야간'], autopct='%.1f%%',
        shadow=True, explode=[0, 0.3])
plt.title('주간 vs 야간')

plt.subplot(1,4,4)
plt.pie([n_fine, n_rain],labels=['맑음', '우천'], autopct='%.1f%%',
        shadow=True, explode=[0, 0.3])
plt.title('맑음 vs 우천')
plt.tight_layout()
plt.show()

## YOLOv8n 훈련 및 평가

In [None]:
# YAML파일 작성

yaml = f"""# YOLOv8 설정파일
# 경로설정
path: ../{FILE_PATH}/coco/ # root (루트이전에 기본 datasets폴더를 가정하므로 ..로 한번 나가기)
train: train
val: val
test: test

# 클래스 설정
nc: 23
names: {LABEL_NAMES[12:]}
"""

with open('yolo_train.yaml', 'w') as f:
    f.write(yaml)

In [None]:
# nano모델 훈련

model = YOLO('yolov8n.pt')

os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE" # kernel crash 방지
# 디버깅 옵션
# os.environ['CUDA_LAUNCH_BLOCKING'] = "1"
# os.environ["CUDA_VISIBLE_DEVICES"] = "0"

# 모델 학습 (파라미터 3M)
# results = model.train(data='yolo_train.yaml',
#                       epochs=50,
#                       batch=32,
#                       imgsz=640,
#                       close_mosaic=0,
#                       workers=4,
#                       save_period=5)

# 사이트 패키지 설치장소 Lib\site-packages\ultralytics\utils\plotting.py
# 에 plt.rcParams['font.family'] = 'Gulim' 적용해야 한글라벨 정상출력
# close_mosaic=0 으로 설정해서 mosaic augumentation 적용해제 가능

In [None]:
# 훈련된 모델 불러와서 검증
model = YOLO('YOLOv8/train/weights/best.pt')

# metrics = model.val(data='yolo_train.yaml') 

In [None]:
print(f'mAP@50-95   : {metrics.box.map:.3f}')
print(f'mAP@50      : {metrics.box.map50:.3f}')
print(f'mAP@75      : {metrics.box.map75:.3f}')
print(f'inferece speed : {metrics.speed["inference"]:.3f}ms')

cls_maps = [metrics.maps[i] for i in range(23)][::-1]
cls_names = [metrics.names[i] for i in range(23)][::-1]
c = ['C1' if i==max(cls_maps) else 'C2' if i==min(cls_maps) else 'C0' for i in cls_maps]
plt.figure(figsize=(10,8))
plt.barh(cls_names, cls_maps, color=c)
plt.title('YOLOv8n 29Epochs 클래스별 mAP@50-95', fontsize=20)
plt.show()

# 오토바이_정지선위반이 가장 정확도가 낮다. (~0.72)
# 오토바이_횡단보도 주행위반이 가장 정확도가 낮다. (~0.14)

## RT-DETR 훈련 및 평가

In [None]:
# YAML파일 작성

yaml = f"""# RT-DETR-l 설정파일
# 경로설정
path: ../{FILE_PATH}/blackbox_yolo/ # root (루트이전에 기본 datasets폴더를 가정하므로 ..로 한번 나가기)
train: train
val: val
test: test

# 클래스 설정
nc: 23
names: {LABEL_NAMES[12:]}
"""

with open('rtdetr_train.yaml', 'w') as f:
    f.write(yaml)

In [None]:

# 훈련시간이 길것으로 예상..

model2 = RTDETR('rtdetr-l.pt')

os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE" # kernel crash 방지
# 디버깅 옵션
# os.environ['CUDA_LAUNCH_BLOCKING'] = "1"
# os.environ["CUDA_VISIBLE_DEVICES"] = "0"

# 모델 학습 (파라미터 32M)
# results = model2.train(data='rtdetr_train.yaml',
#                        epochs=50,
#                        batch=4,
#                        imgsz=640,
#                        close_mosaic=2,
#                        workers=4,
#                        project='RTDETR',
#                        name='train',
#                        freeze=1)

In [None]:
# 코랩에서 50 epoch 학습시킨 모델 불러와서 평가
model2 = RTDETR('RTDETR/train/weights/best.pt')
# metrics = model2.val()

In [None]:
print(f'mAP@50-95   : {metrics.box.map:.3f}')
print(f'mAP@50      : {metrics.box.map50:.3f}')
print(f'mAP@75      : {metrics.box.map75:.3f}')
print(f'inferece speed : {metrics.speed["inference"]:.3f}ms')

cls_maps = [metrics.maps[i] for i in range(20)][::-1]
cls_names = [metrics.names[i] for i in range(20)][::-1]
c = ['C1' if i==max(cls_maps) else 'C2' if i==min(cls_maps) else 'C0' for i in cls_maps]
plt.figure(figsize=(10,8))
plt.barh(cls_names, cls_maps, color=c)
plt.title('RT-DETR-l 50Epochs 클래스별 mAP@50-95', fontsize=20)
plt.show()

# 오토바이_정지선위반이 가장 정확도가 낮다. (~0.9)
# 오토바이_횡단보도 주행위반이 가장 정확도가 낮다. (~0.15)

## YOLOv8m 훈련 및 평가
* 기존 RT-DETR과 YOLOv8n의 트래킹성능이 별로다.
    1. 클래스의 형상이 비슷해서, 하나의 객체에 여러 클래스가 잡히는경우있음
    2. 클래스 정확도는 낮고, bbox 위치정확도는 높음.  
>
* YOLOv8m 으로 학습 (파라미터 25M)
* 추론시 agnostic-NMS를 사용한다. 또, cls_loss를 높이고 box_loss를 낮추어 학습
* RT-DETR의 데이터 적용사항 롤백. (다시 23개 클래스사용,  블박, CCTV 전부 포함하여 학습)

In [None]:
# YOLOv8m YAML파일 작성
FILE_PATH = '/data/coco'

data_yaml = f"""# YOLOv8m 설정파일
# 경로설정
path: ..{FILE_PATH} # root (루트이전에 기본 datasets폴더를 가정하므로 ..로 한번 나가기)
train: train
val: val
test: test

# 클래스 설정
nc: 23
names: {LABEL_NAMES[12:]}
"""

train_yaml = f"""# 학습 및 추론환경 설정
# 현재 데이터는 사실상 오토바이, 자전거, 킥보드 3개의 형상밖에 없다.
# 비슷한 형상에, 여러가지 위반사항이 존재한다.
# 따라서 형상 위치를 찾는것(localization)은 쉽지만, 어떤 위반인지 판별(classification)이 힘들다.
# 위반사항의 불균형이 있으며, 이에따라 AP의 차이가 있는편이다.

# val & pred  ------------------------------------------------
agnostic_nms: True  # 추론시 클래스 상관없이 NMS
iou: 0.8 # default 0.7. agnostic-NMS인만큼 IoU 기준을 높임 (근처의 다른 위반행위 삭제 방지)
max_det: 50 # default 300. 일반적으로 도로에 오토바이, 자전거, 킥보드수가 많진 않으므로 최대 50개 탐지
half: True # 기본적으로 반정밀도 사용

# train ------------------------------------------------
close_mosaic: 20 # 학습종료 20epoch 전부터 모자이크 증강 해제
box: 5.0 # default 7.5. bbox좌표오차 패널티 완화
cls: 3.0 # default 0.5. 클래스 오분류 패널티 강화
dfl: 2.0 # default 1.5. 클래스 불균형을 해결하기위해 살짝 증가
label_smoothing: 0.1 # 예측확률 신뢰도 증가 및 일반화성능 증가를 위해 라벨 스무딩
cache: True

# augmentation ------------------------------------------------
hsv_s: 0.2  # default 0.7(frac). 채도 변경비율 낮추기
hsv_v: 0.2  # default 0.4(frac). 명도 변경비율 낮추기
scale: 0.1  # default 0.5(gain). 이미지 스케일 변경 범위 낮추기
mosaic: 0.5 # default 1(prob). 모자이크 확률 낮추기
"""

with open('yolov8m_data.yaml', 'w') as f:
    f.write(data_yaml)
    print('data yaml 추가완료')

with open('yolov8m_train.yaml', 'w') as f:
    f.write(train_yaml)
    print('train yaml 추가완료')

In [None]:
# YOLOv8m 모델 훈련 (코랩 A100으로 진행)
model3 = YOLO('yolov8m.pt')
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE" # kernel crash 방지
# results = model3.train(data='yolov8m_data.yaml',
#                        cfg='yolov8m_train.yaml',
#                        epochs=100,
#                        patience=10,
#                        batch=-1, # 자동배치
#                        imgsz=640,
#                        project='drive/MyDrive/YOLOv8m',
#                        name='train')

In [None]:
# 훈련된 모델 불러오기
model3 = YOLO('YOLOv8m/train/weights/last.pt')
metrics = model3.val(project='YOLOv8m', name='val2')

In [None]:
%matplotlib inline
print(f'mAP@50-95   : {metrics.box.map:.3f}')
print(f'mAP@50      : {metrics.box.map50:.3f}')
print(f'mAP@75      : {metrics.box.map75:.3f}')
print(f'inferece speed : {metrics.speed["inference"]:.3f}ms')

cls_maps = [metrics.maps[i] for i in range(20)][::-1]
cls_names = [metrics.names[i] for i in range(20)][::-1]
c = ['C1' if i==max(cls_maps) else 'C2' if i==min(cls_maps) else 'C0' for i in cls_maps]
plt.figure(figsize=(10,8))
plt.barh(cls_names, cls_maps, color=c)
plt.title('YOLOv8m 78Epochs 클래스별 mAP@50-95', fontsize=20)
plt.show()


In [None]:
# 추가로 훈련한 모델 평가
# 모자이크 증강없애고, 스케일 증강 확대, AdamW 옵티마이저로 변경, 배치사이즈 16으로 줄여서 40Epoch 추가로 학습

# 훈련된 모델 불러오기
model4 = YOLO('YOLOv8m/second_train/weights/last.pt')
# metrics = model4.val(project='YOLOv8m', name='train2_val', verbose=False)

print(f'mAP@50-95   : {metrics.box.map:.3f}')
print(f'mAP@50      : {metrics.box.map50:.3f}')
print(f'mAP@75      : {metrics.box.map75:.3f}')
print(f'inferece speed : {metrics.speed["inference"]:.3f}ms')

cls_maps = [metrics.maps[i] for i in range(20)][::-1]
cls_names = [metrics.names[i] for i in range(20)][::-1]
c = ['C1' if i==max(cls_maps) else 'C2' if i==min(cls_maps) else 'C0' for i in cls_maps]
plt.figure(figsize=(10,8))
plt.barh(cls_names, cls_maps, color=c)
plt.title('YOLOv8m 118Epochs 클래스별 mAP@50-95', fontsize=20)
plt.show()

## 모델 성능비교

In [None]:
model_names = ['Faster_RCNN (5epoch)', 'RTMDet (10epoch)', 'YOLOv3mobile (3epoch)', 'RT-DETR-l (50epoch)',
               'YOLOv8n (50epoch)', 'YOLOv8s (50epoch)', 'YOLOv8m (78epoch)']
model_mAP5095 = [0.35, 0.23, 0.24, 0.596, 0.57, 0.62, 0.61]

plt.barh(model_names, model_mAP5095,
         color=['C1' if i==max(model_mAP5095) else 'C0' for i in model_mAP5095])
plt.title('val mAP50-95 of 7 models', fontsize=20)
plt.plot()

## 비디오 트래킹

In [None]:
model4 = YOLO('YOLOv8m/second_train/weights/last.pt')
                                  
# Open the video file
cap = cv2.VideoCapture('data/example2.mkv')
video_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
video_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
out = cv2.VideoWriter('example.mp4',cv2.VideoWriter_fourcc(*'DIVX'), 20, (video_width, video_height))

# Loop through the video frames
while cap.isOpened():
    # Read a frame from the video
    success, frame = cap.read()

    if success:
        # Run YOLOv8 tracking on the frame, persisting tracks between frames
        results = model4.predict(frame,
                                 conf=0.1,
                                 iou=0.7,
                                 agnostic_nms=True)

        # Visualize the results on the frame
        annotated_frame = results[0].plot()

        # Display the annotated frame
        cv2.namedWindow('YOLOv8m Tracking', cv2.WINDOW_NORMAL)
        cv2.imshow("YOLOv8m Tracking", annotated_frame)

        out.write(annotated_frame)
        
        # Break the loop if 'q' is pressed
        if cv2.waitKey(1) & 0xFF == ord("q"):
            cv2.destroyAllWindows()
            break
            
    else:
        # Break the loop if the end of the video is reached
        cv2.destroyAllWindows()
        break

cap.release()
out.release()
cv2.destroyAllWindows()


In [None]:
model4 = YOLO('YOLOv8m/second_train/weights/last.pt')

# 비디오 열기
video_path = "data/example3.mp4"
cap = cv2.VideoCapture(video_path)
video_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
video_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
out = cv2.VideoWriter('example4_track.mp4',cv2.VideoWriter_fourcc(*'DIVX'), 30, (video_width, video_height))

# 트래킹 히스토리 딕셔너리 선언
track_history = defaultdict(lambda: [])

while cap.isOpened():
    success, frame = cap.read()
    
    if success:
        # 프레임별로 트래킹
        results = model4.track(frame, persist=True, tracker='botsort.yaml', imgsz=640, agnostic_nms=True)

        # bbox와 id 획득
        boxes = results[0].boxes.xywh.cpu()
        try:
            track_ids = results[0].boxes.id.int().cpu().tolist()
        except:
            track_ids = [None] * len(boxes)

        # bbox 및 id추론 표시된 프레임
        annotated_frame = results[0].plot()

        # bbox 중앙에 흰점으로 트래킹 결과 표시
        for box, track_id in zip(boxes, track_ids):
            x, y, w, h = box
            track = track_history[track_id]
            track.append((float(x), float(y)))
            if len(track) > 30:  # 30프레임 이상 유지시 처음 트랙히스토리 삭제
                track.pop(0)

            # 흰점 표시
            points = np.hstack(track).astype(np.int32).reshape((-1, 1, 2))
            cv2.polylines(annotated_frame, [points], isClosed=False, color=(230, 230, 230), thickness=10)

        # 추론결과 표시 (저장만할시 주석처리)
        cv2.namedWindow('YOLOv8 Tracking', cv2.WINDOW_NORMAL)
        cv2.imshow("YOLOv8 Tracking", annotated_frame)

        # 추론결과 저장
        out.write(annotated_frame)

        # q 누르면 강제종료
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break
    else:
        break

# 비디오 리더와 라이터 객체, 창 닫기
cap.release()
out.release()
cv2.destroyAllWindows()


## 모델 ONNX export 및 ONNX 추론

In [None]:
model4 = YOLO('YOLOv8m/second_train/weights/last.pt')
# opset 17 Export
# model4.export(format='ONNX', imgsz=640, simplify=True) # CPU용 Export
# model4.export(format='ONNX', imgsz=640, half=True) # GPU용 Export

In [None]:
def choose_img(file=None):
    if not file:
        file = random.choice(glob.glob('data/coco/test/*.jpg'))
    array = cv2.imread(file)
    # BGR 형태로 반환
    return array

test_img = choose_img()

In [None]:
import cv2.dnn
import PIL
import time

onnx_model = cv2.dnn.readNetFromONNX('YOLOv8m/second_train/weights/YOLOv8m.onnx')

def inference(model:cv2.dnn.Net, img, conf=0.25, nms_th=0.8):
    """
    ONNX 모델 받아서 추론
    
    returns:
        result : bbox표기된 이미지 (width, height, RGB)
        infer_time : 추론시간 (ms)
    """
    CLASS_NAMES = LABEL_NAMES[12:]
    [height, width, _] = img.shape
    length = max((height, width))
    resized_img = np.zeros((length, length, 3), np.uint8)
    resized_img[0:height, 0:width] = img
    scale = length / 640

    blob = cv2.dnn.blobFromImage(resized_img, scalefactor=1 / 255, size=(640, 640), swapRB=True)
    model.setInput(blob)

    # 출력 형태 (batch, 27, 8400)
    # 여기서 27은 차례대로 bbox좌표4개(x,y,w,h) + 클래스 23개
    # bbox의 숫자 8400개
    t1 = time.time()
    outputs = model.forward()
    t2 = time.time()
    infer_time = round((t2-t1)*1000, 2)
    # 출력형태 (8400, 27)로 변환 후 agnostic NMS 진행
    outputs = outputs[0].transpose()

    boxes = [] 
    cls_ids = []
    scores = []
    # 기준신뢰도 이상의 box만 추출
    for row in outputs:
        cls_score = row[4:]
        x, y, w, h = row[:4]
        # 최대 최소 및 최대 index 구하기
        minScore, maxScore, (_, minClassLoc), (_, maxClassIndex) = cv2.minMaxLoc(cls_score)
        if maxScore <= conf: # 기준 신뢰도 이하면 버리고 다음 row
            continue
        # left, top, width, height로 변환 (NMSBoxes input format)
        x1, y1= x-(0.5*w), y-(0.5*h)
        boxes.append([x1,y1,w,h])
        scores.append(maxScore)
        cls_ids.append(maxClassIndex)
    
    # agnostic NMS (선택된 bbox index list출력)
    nms_indices = cv2.dnn.NMSBoxes(boxes, scores, conf, nms_th)

    # 선택된 bbox 그리기 (cv2는 한글폰트 적용 x => PIL 사용)
    img = PIL.Image.fromarray(img[..., ::-1])
    font = PIL.ImageFont.truetype("./batang.ttc", 30)
    draw = PIL.ImageDraw.Draw(img, 'RGB')
    for idx in nms_indices:
        cls_name = CLASS_NAMES[cls_ids[idx]]
        score = scores[idx]
        x1, y1, w, h = np.array(boxes[idx]) * scale # 640*640에 원본스케일 곱하기
        color = COLORS[cls_ids[idx]]

        draw.rectangle((x1,y1,x1+w,y1+h), outline=color, width=5)
        tbbox = draw.textbbox([x1, y1-30], f'{score:.2f} '+cls_name, font=font)
        draw.rectangle(tbbox, fill=color)
        draw.text([x1, y1-30], f'{score:.2f} '+cls_name, font=font, fill='black')

    # 640*640*RGB 행렬 반환
    result = np.asarray(img)
    return result, infer_time

In [None]:
import onnxruntime

onnxrt_model = onnxruntime.InferenceSession('YOLOv8m.onnx')


def onnxrt_inference(model, img, conf=0.25, nms_th=0.8):
    """
    ONNX 모델 받아서 추론 (ONNX runtime 버전)
    
    returns:
        result : bbox표기된 이미지 (width, height, RGB)
        infer_time : 추론시간 (ms)
    """
    CLASS_NAMES = LABEL_NAMES[12:]
    [height, width, _] = img.shape
    length = max((height, width))
    resized_img = np.zeros((length, length, 3), np.float32)
    resized_img[0:height, 0:width] = img / 255
    resized_img = cv2.resize(resized_img, (640,640))[np.newaxis,...].transpose(0, 3, 1, 2)
    scale = length / 640

    input_name = model.get_inputs()[0].name

    # 출력 형태 (batch, 27, 8400)
    # 여기서 27은 차례대로 bbox좌표4개(x,y,w,h) + 클래스 23개
    # bbox의 숫자 8400개
    t1 = time.time()
    outputs = model.run(None, {input_name:resized_img})
    t2 = time.time()
    infer_time = round((t2-t1)*1000, 2)
    # 출력형태 (8400, 27)로 변환 후 agnostic NMS 진행
    outputs = outputs[0].transpose().squeeze()
    
    boxes = [] 
    cls_ids = []
    scores = []
    # 기준신뢰도 이상의 box만 추출
    for row in outputs:
        cls_score = row[4:]
        x, y, w, h = row[:4]
        # 최대 최소 및 최대 index 구하기
        minScore, maxScore, (_, minClassLoc), (_, maxClassIndex) = cv2.minMaxLoc(cls_score)
        if maxScore <= conf: # 기준 신뢰도 이하면 버리고 다음 row
            continue
        # left, top, width, height로 변환 (NMSBoxes input format)
        x1, y1= x-(0.5*w), y-(0.5*h)
        boxes.append([x1,y1,w,h])
        scores.append(maxScore)
        cls_ids.append(maxClassIndex)
    
    # agnostic NMS (선택된 bbox index list출력)
    nms_indices = cv2.dnn.NMSBoxes(boxes, scores, conf, nms_th)

    # 선택된 bbox 그리기 (cv2는 한글폰트 적용 x => PIL 사용)
    img = PIL.Image.fromarray(img[..., ::-1])
    font = PIL.ImageFont.truetype("./batang.ttc", 30)
    draw = PIL.ImageDraw.Draw(img, 'RGB')
    for idx in nms_indices:
        cls_name = CLASS_NAMES[cls_ids[idx]]
        score = scores[idx]
        x1, y1, w, h = np.array(boxes[idx]) * scale # 640*640에 원본스케일 곱하기
        color = COLORS[cls_ids[idx]]

        draw.rectangle((x1,y1,x1+w,y1+h), outline=color, width=5)
        tbbox = draw.textbbox([x1, y1-30], f'{score:.2f} '+cls_name, font=font)
        draw.rectangle(tbbox, fill=color)
        draw.text([x1, y1-30], f'{score:.2f} '+cls_name, font=font, fill='black')

    # 640*640*RGB 행렬 반환
    result = np.asarray(img)
    return result, infer_time

In [None]:
test_img = choose_img()
result, infer_time = onnxrt_inference(onnxrt_model, test_img, conf=0.5)
plt.figure(figsize=(10,10))
plt.imshow(result)
plt.title(f'cv2.dnn 추론시간 {infer_time}ms', fontsize=15)
plt.axis('off')
plt.show()

result, infer_time = inference(onnx_model, test_img, conf=0.5)
plt.figure(figsize=(10,10))
plt.imshow(result)
plt.title(f'ONNX runtime 추론시간 {infer_time}ms', fontsize=15)
plt.axis('off')
plt.show()

# cv2.dnn보다 onnxruntime이 추론시간이 매우빠르다.
# 런타임 패키지도 포함해서 서버백엔드 구성하는편이 좋다.

In [None]:
cap = cv2.VideoCapture('data/example3.mp4')

while cap.isOpened():
    # 프레임 읽기
    success, frame = cap.read()

    # 성공시 프레임 추론
    if success:
        results, infer_time = inference(onnx_model, frame, conf=0.2)

        cv2.namedWindow('YOLOv8m Tracking', cv2.WINDOW_NORMAL)
        cv2.imshow("YOLOv8m Tracking", results[..., ::-1]) # RGB => BGR
        print(f'추론시간 {infer_time}ms')

        # 키보드 q 누르면 중간 종료
        if cv2.waitKey(1) & 0xFF == ord("q"):
            cv2.destroyAllWindows()
            cap.release()
            break
    
    # 프레임 전부 읽어들이면 종료
    else:
        cv2.destroyAllWindows()
        cap.release()
        break