In [1]:
import os
import copy
import numpy as np
import pandas as pd
from tqdm.notebook import tqdm
tqdm.pandas()

In [2]:
pattern_date = '2023-12-24'
root_dir = os.getcwd()
annotation_path = f'{root_dir}/data/outputs/01.format_and_cv/annotation-{pattern_date}.csv'
helmet_path = f'{root_dir}/data/outputs/03.helmet_detection/03.eval_coatnet_cls7_use_anno/pred_head_class-{pattern_date}.csv'
highplace_path = f'{root_dir}/data/outputs/04.highplace_detection/06.kosmos2_pred_highplace_detection/pred_highplace_class-{pattern_date}.csv'
safetybelt_path = f'{root_dir}/data/outputs/05.safetybelt_detection/02.eval_coatnet_cls3_use_anno/pred_safetybelt_class-{pattern_date}.csv'

In [3]:
def calculate_intersection_per_label(pred_bbox, label_bbox):
    # バウンディングボックスの座標情報を取得
    x1_1, y1_1, x2_1, y2_1 = label_bbox
    x1_2, y1_2, x2_2, y2_2 = pred_bbox

    # 交差する領域を計算
    x_intersection = max(0, min(x2_1, x2_2) - max(x1_1, x1_2))
    y_intersection = max(0, min(y2_1, y2_2) - max(y1_1, y1_2))
    intersection_area = x_intersection * y_intersection

    # 正解ラベルの面積
    area_label_bbox = (x2_1 - x1_1) * (y2_1 - y1_1)

    # 交差する領域の面積 / 正解ラベルの面積
    return intersection_area / area_label_bbox

In [4]:
def calculate_iou(box1, box2):
    # バウンディングボックスの座標情報を取得
    x1_1, y1_1, x2_1, y2_1 = box1
    x1_2, y1_2, x2_2, y2_2 = box2

    # 交差する領域を計算
    x_intersection = max(0, min(x2_1, x2_2) - max(x1_1, x1_2))
    y_intersection = max(0, min(y2_1, y2_2) - max(y1_1, y1_2))

    # 交差領域の面積と各バウンディングボックスの面積を計算
    intersection_area = x_intersection * y_intersection
    area_box1 = (x2_1 - x1_1) * (y2_1 - y1_1)
    area_box2 = (x2_2 - x1_2) * (y2_2 - y1_2)

    # IoUを計算
    iou = intersection_area / (area_box1 + area_box2 - intersection_area)

    return iou

In [5]:
all_annotation_df = pd.read_csv(annotation_path)
all_helmet_df = pd.read_csv(helmet_path)
all_highplace_df = pd.read_csv(highplace_path)
all_safetybelt_df = pd.read_csv(safetybelt_path)

In [6]:
annotation_df = all_annotation_df[
    (all_annotation_df['unique_key'].str.contains('fixed-point-camera')) |
    (all_annotation_df['unique_key'].str.contains('for-learning/2023-11-19-omaezaki-500')) |
    (all_annotation_df['unique_key'].str.contains('for-learning/2023-11-23-mie-safetybelt')) |
    (all_annotation_df['unique_key'].str.contains('evaluation'))
]

helmet_df = all_helmet_df[
    (all_helmet_df['unique_key'].str.contains('fixed-point-camera')) |
    (all_helmet_df['unique_key'].str.contains('for-learning/2023-11-19-omaezaki-500')) |
    (all_helmet_df['unique_key'].str.contains('for-learning/2023-11-23-mie-safetybelt')) |
    (all_helmet_df['unique_key'].str.contains('evaluation'))
]

highplace_df = all_highplace_df[
    (all_highplace_df['unique_key'].str.contains('fixed-point-camera')) |
    (all_highplace_df['unique_key'].str.contains('for-learning/2023-11-19-omaezaki-500')) |
    (all_highplace_df['unique_key'].str.contains('for-learning/2023-11-23-mie-safetybelt')) |
    (all_highplace_df['unique_key'].str.contains('evaluation'))
]

safetybelt_df = all_safetybelt_df[
    (all_safetybelt_df['unique_key'].str.contains('fixed-point-camera')) |
    (all_safetybelt_df['unique_key'].str.contains('for-learning/2023-11-19-omaezaki-500')) |
    (all_safetybelt_df['unique_key'].str.contains('for-learning/2023-11-23-mie-safetybelt')) |
    (all_safetybelt_df['unique_key'].str.contains('evaluation'))
]

In [7]:
iou_threshold = 0.1
annotation_safetybelt_df = annotation_df[annotation_df["label"] == 'safety belt']

new_label_list = []
for i, info_df in enumerate(annotation_df.values):
    label = info_df[2]
    if label not in ['person', 'person in high place']:
        new_label_list += [label]
        continue

    unique_key = info_df[0]
    person_bbox = info_df[3:7]
    safetybelt_bboxes = annotation_safetybelt_df[
        annotation_safetybelt_df['unique_key'] == unique_key
    ][['left', 'top', 'right', 'bottom']].values

    is_label_safetybelt = False
    for safetybelt_bbox in safetybelt_bboxes:
        iou = calculate_iou(person_bbox, safetybelt_bbox)
        if iou > iou_threshold:
            new_label_list += [f'{label} - safetybelt']
            is_label_safetybelt = True
            break
    if not is_label_safetybelt:
        new_label_list += [f'{label} - no-safetybelt']

annotation_df['label'] = new_label_list

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  annotation_df['label'] = new_label_list


In [8]:
df_per_image = pd.DataFrame(annotation_df['unique_key'].unique(), columns=['unique_key'])

In [9]:
has_labels = []
for unique_key in annotation_df['unique_key'].unique():
    _annotation_df = annotation_df[annotation_df['unique_key'] == unique_key]

    has_unique_key_labels = []
    for pred in annotation_df['label'].unique():
        if pd.isna(pred):
            continue
        if len(_annotation_df[_annotation_df['label'] == pred]) > 0:
            has_unique_key_labels += [True]
        else:
            has_unique_key_labels += [False]
    has_labels += [has_unique_key_labels]

In [10]:
df_per_image[[f'label_{i}' for i in annotation_df['label'].unique() if pd.notna(i)]] = has_labels

In [11]:
has_helmet_pred = []
for unique_key in annotation_df['unique_key'].unique():
    _helmet_df = helmet_df[helmet_df['unique_key'] == unique_key]

    has_unique_key_labels = []
    for pred in helmet_df['pred_head_class'].unique():
        if len(_helmet_df[_helmet_df['pred_head_class'] == pred]) > 0:
            has_unique_key_labels += [True]
        else:
            has_unique_key_labels += [False]
    has_helmet_pred += [has_unique_key_labels]

In [12]:
df_per_image[[f'pred_helmet_{i}' for i in helmet_df['pred_head_class'].unique()]] = has_helmet_pred

In [13]:
highplace_safetybelt_df = pd.concat([highplace_df, safetybelt_df[['pred_safetybelt_class']]], axis=1)
highplace_safetybelt_df['pred_alart'] = highplace_safetybelt_df['pred_person_in_highplace_class'] + ' - ' + highplace_safetybelt_df['pred_safetybelt_class']


In [14]:
has_highplace_safetybelt_pred = []
for unique_key in annotation_df['unique_key'].unique():
    _highplace_safetybelt_df = highplace_safetybelt_df[highplace_safetybelt_df['unique_key'] == unique_key]

    has_unique_key_labels = []
    for pred in highplace_safetybelt_df['pred_alart'].unique():
        if len(_highplace_safetybelt_df[_highplace_safetybelt_df['pred_alart'] == pred]) > 0:
            has_unique_key_labels += [True]
        else:
            has_unique_key_labels += [False]
    has_highplace_safetybelt_pred += [has_unique_key_labels]

In [15]:
df_per_image[[f'pred_highsafe_{i}' for i in highplace_safetybelt_df['pred_alart'].unique()]] = has_highplace_safetybelt_pred

In [16]:
df_per_image = pd.merge(
    df_per_image, annotation_df[['unique_key', 'validation']].drop_duplicates(), on='unique_key'
)

In [17]:
def label_helmet_alart(row):
    if (
        row['label_no-helmet'] or
        row['label_unsafe-helmet (no chin strap)'] or
        row['label_unsafe-helmet (inadequate covering)']
    ):
        return True
    else:
        return False

def pred_helmet_alart(row):
    if (
        row['pred_helmet_no-helmet'] or
        row['pred_helmet_unsafe-helmet (no chin strap)'] or
        row['pred_helmet_unsafe-helmet (inadequate covering)']
    ):
        return True
    else:
        return False

def label_highplace_safetybelt_alart(row):
    if row['label_person in high place - no-safetybelt']:
        return True
    else:
        return False

def pred_highplace_safetybelt_alart(row):
    if row['pred_highsafe_person in high place - no-safetybelt']:
        return True
    else:
        return False

def label_alart(row):
    if row['label_helmet_alart'] or row['label_highplace_safetybelt_alart']:
        return True
    else:
        return False

def pred_alart(row):
    if row['pred_helmet_alart'] or row['pred_highplace_safetybelt_alart']:
        return True
    else:
        return False

In [18]:
evaluation_df = copy.deepcopy(df_per_image[['unique_key', 'validation']])
evaluation_df['label_helmet_alart'] = df_per_image.apply(label_helmet_alart, axis=1)
evaluation_df['pred_helmet_alart'] = df_per_image.apply(pred_helmet_alart, axis=1)
evaluation_df['label_highplace_safetybelt_alart'] = df_per_image.apply(label_highplace_safetybelt_alart, axis=1)
evaluation_df['pred_highplace_safetybelt_alart'] = df_per_image.apply(pred_highplace_safetybelt_alart, axis=1)
evaluation_df['label_alart'] = evaluation_df.apply(label_alart, axis=1)
evaluation_df['pred_alart'] = evaluation_df.apply(pred_alart, axis=1)

In [19]:
evaluation_val_df = evaluation_df[evaluation_df['validation'] != 999]
evaluation_test_df = evaluation_df[evaluation_df['validation'] == 999]

In [20]:
confusion_mat = np.zeros([2, 2])
for label, pred in evaluation_val_df[['label_alart', 'pred_alart']].values:
    if label == False and pred == False:
        confusion_mat[0][0] += 1
    if label == False and pred == True:
        confusion_mat[0][1] += 1
    if label == True and pred == False:
        confusion_mat[1][0] += 1
    if label == True and pred == True:
        confusion_mat[1][1] += 1

print('学習用データ')
print(f'precision: {confusion_mat[1][1] / (confusion_mat[0][1] + confusion_mat[1][1])}')
print(f'recall:    {confusion_mat[1][1] / (confusion_mat[1][0] + confusion_mat[1][1])}')
display(
    pd.DataFrame(
        confusion_mat,
        index = [f'正解 - {i}' for i in ['アラートなし', 'アラートあり']],
        columns=[f'予測 - {i}' for i in ['アラートなし', 'アラートあり']]
    )
)

confusion_mat = np.zeros([2, 2])
for label, pred in evaluation_test_df[['label_alart', 'pred_alart']].values:
    if label == False and pred == False:
        confusion_mat[0][0] += 1
    if label == False and pred == True:
        confusion_mat[0][1] += 1
    if label == True and pred == False:
        confusion_mat[1][0] += 1
    if label == True and pred == True:
        confusion_mat[1][1] += 1

print('評価用データ')
print(f'precision: {confusion_mat[1][1] / (confusion_mat[0][1] + confusion_mat[1][1])}')
print(f'recall:    {confusion_mat[1][1] / (confusion_mat[1][0] + confusion_mat[1][1])}')
display(
    pd.DataFrame(
        confusion_mat,
        index = [f'正解 - {i}' for i in ['アラートなし', 'アラートあり']],
        columns=[f'予測 - {i}' for i in ['アラートなし', 'アラートあり']]
    )
)

学習用データ
precision: 0.9842931937172775
recall:    0.35946462715105165


Unnamed: 0,予測 - アラートなし,予測 - アラートあり
正解 - アラートなし,453.0,6.0
正解 - アラートあり,670.0,376.0


評価用データ
precision: 1.0
recall:    0.26


Unnamed: 0,予測 - アラートなし,予測 - アラートあり
正解 - アラートなし,40.0,0.0
正解 - アラートあり,37.0,13.0


In [21]:
confusion_mat = np.zeros([2, 2])
for label, pred in evaluation_val_df[['label_helmet_alart', 'pred_helmet_alart']].values:
    if label == False and pred == False:
        confusion_mat[0][0] += 1
    if label == False and pred == True:
        confusion_mat[0][1] += 1
    if label == True and pred == False:
        confusion_mat[1][0] += 1
    if label == True and pred == True:
        confusion_mat[1][1] += 1

print('学習用データ: ヘルメットアラート')
print(f'precision: {confusion_mat[1][1] / (confusion_mat[0][1] + confusion_mat[1][1])}')
print(f'recall:    {confusion_mat[1][1] / (confusion_mat[1][0] + confusion_mat[1][1])}')
display(
    pd.DataFrame(
        confusion_mat,
        index = [f'正解 - {i}' for i in ['アラートなし', 'アラートあり']],
        columns=[f'予測 - {i}' for i in ['アラートなし', 'アラートあり']]
    )
)

confusion_mat = np.zeros([2, 2])
for label, pred in evaluation_test_df[['label_helmet_alart', 'pred_helmet_alart']].values:
    if label == False and pred == False:
        confusion_mat[0][0] += 1
    if label == False and pred == True:
        confusion_mat[0][1] += 1
    if label == True and pred == False:
        confusion_mat[1][0] += 1
    if label == True and pred == True:
        confusion_mat[1][1] += 1

print('評価用データ: ヘルメットアラート')
print(f'precision: {confusion_mat[1][1] / (confusion_mat[0][1] + confusion_mat[1][1])}')
print(f'recall:    {confusion_mat[1][1] / (confusion_mat[1][0] + confusion_mat[1][1])}')
display(
    pd.DataFrame(
        confusion_mat,
        index = [f'正解 - {i}' for i in ['アラートなし', 'アラートあり']],
        columns=[f'予測 - {i}' for i in ['アラートなし', 'アラートあり']]
    )
)

学習用データ: ヘルメットアラート
precision: 0.9842105263157894
recall:    0.38556701030927837


Unnamed: 0,予測 - アラートなし,予測 - アラートあり
正解 - アラートなし,529.0,6.0
正解 - アラートあり,596.0,374.0


評価用データ: ヘルメットアラート
precision: 1.0
recall:    0.1891891891891892


Unnamed: 0,予測 - アラートなし,予測 - アラートあり
正解 - アラートなし,53.0,0.0
正解 - アラートあり,30.0,7.0


In [22]:
confusion_mat = np.zeros([2, 2])
for label, pred in evaluation_val_df[['label_highplace_safetybelt_alart', 'pred_highplace_safetybelt_alart']].values:
    if label == False and pred == False:
        confusion_mat[0][0] += 1
    if label == False and pred == True:
        confusion_mat[0][1] += 1
    if label == True and pred == False:
        confusion_mat[1][0] += 1
    if label == True and pred == True:
        confusion_mat[1][1] += 1

print('学習用データ: 安全帯アラート')
print(f'precision: {confusion_mat[1][1] / (confusion_mat[0][1] + confusion_mat[1][1])}')
print(f'recall:    {confusion_mat[1][1] / (confusion_mat[1][0] + confusion_mat[1][1])}')
display(
    pd.DataFrame(
        confusion_mat,
        index = [f'正解 - {i}' for i in ['アラートなし', 'アラートあり']],
        columns=[f'予測 - {i}' for i in ['アラートなし', 'アラートあり']]
    )
)

confusion_mat = np.zeros([2, 2])
for label, pred in evaluation_test_df[['label_highplace_safetybelt_alart', 'pred_highplace_safetybelt_alart']].values:
    if label == False and pred == False:
        confusion_mat[0][0] += 1
    if label == False and pred == True:
        confusion_mat[0][1] += 1
    if label == True and pred == False:
        confusion_mat[1][0] += 1
    if label == True and pred == True:
        confusion_mat[1][1] += 1

print('評価用データ: 安全帯アラート')
print(f'precision: {confusion_mat[1][1] / (confusion_mat[0][1] + confusion_mat[1][1])}')
print(f'recall:    {confusion_mat[1][1] / (confusion_mat[1][0] + confusion_mat[1][1])}')
display(
    pd.DataFrame(
        confusion_mat,
        index = [f'正解 - {i}' for i in ['アラートなし', 'アラートあり']],
        columns=[f'予測 - {i}' for i in ['アラートなし', 'アラートあり']]
    )
)

学習用データ: 安全帯アラート
precision: 1.0
recall:    0.022727272727272728


Unnamed: 0,予測 - アラートなし,予測 - アラートあり
正解 - アラートなし,1417.0,0.0
正解 - アラートあり,86.0,2.0


評価用データ: 安全帯アラート
precision: 0.6666666666666666
recall:    0.16666666666666666


Unnamed: 0,予測 - アラートなし,予測 - アラートあり
正解 - アラートなし,64.0,2.0
正解 - アラートあり,20.0,4.0
