<a href="https://colab.research.google.com/github/yukinaga/object_detection/blob/main/section_2/01_faster_rcnn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### annotationデータ変換

In [None]:
import glob
import os
import json
import pandas as pd
import shutil

In [None]:
# Annotation CSV locations. `base_dir` provides the train/val/test bbox files;
# `org_dir` provides a second test-annotation file (presumably the original,
# unfiltered one -- confirm against how the datasets were prepared).
base_dir = 'datasets/annotation_bbox'
org_dir = 'datasets/annotations'
test_ann_path  = f'{base_dir}/test-annotations-bbox.csv'
train_ann_path = f'{base_dir}/train-annotations-bbox.csv'
val_ann_path   = f'{base_dir}/val-annotations-bbox.csv'
test_ann_path_org = f'{org_dir}/test-annotations-bbox.csv'
# Load every annotation file into its own DataFrame.
df_test  = pd.read_csv(test_ann_path)
df_train = pd.read_csv(train_ann_path)
df_val   = pd.read_csv(val_ann_path)
df_test_org = pd.read_csv(test_ann_path_org)

In [None]:
# Maps the label codes that appear in the annotation DataFrames to readable
# class names (the codes look like Open Images class IDs -- confirm).
# The values double as per-class directory names when images are collected.
label_dict = {
    "/m/01g317": "person",
    "/m/0k4j"  : "car",
    "/m/0c9ph5": "flower",
    "/m/0bt9lr": "dog",
    "/m/0cgh4" : "building"
}

In [None]:
# Swap every label code for its readable class name. Passing the mapping as a
# dict replaces all codes in one pass per DataFrame.
df_test = df_test.replace(label_dict)
df_train = df_train.replace(label_dict)
df_val = df_val.replace(label_dict)
df_test_org = df_test_org.replace(label_dict)

In [None]:
# Class name -> integer category ID used as the detection label.
# IDs start at 1; the model built later reserves one extra class for background.
label_dict4df = {
    "building": 1,
    "car": 2,
    "person": 3,
    "flower": 4,
    "dog": 5
}
# Lookup table with columns LabelName / CategoryID.
# NOTE: the DataFrame index is 0-based (0..4) while CategoryID is 1-based, so
# a row's position is NOT its CategoryID.
df_label = pd.DataFrame(label_dict4df.items(), columns=['LabelName', 'CategoryID'])
# Bare expression: displays the table when run as a notebook cell.
df_label

In [None]:
# Root directory of the downloaded images; per-class and `all` sub-directories
# are addressed relative to it.
open_images_dir = 'datasets/open_images_baobab'
# Dataset splits; each one is a sub-directory name under every class directory.
dataset_kinds = ['test', 'train', 'val']

In [None]:
# Image directory of each split (inside the merged `all` directory), keyed by split name
dataset_path = {
    kind: f'{open_images_dir}/all/{kind}'
    for kind in dataset_kinds
}

### 画像をallに移動

In [None]:
# Create the merged `all` directory and one sub-directory per split,
# skipping any directory that already exists.
all_dir_path = f'{open_images_dir}/all'
required_dirs = [all_dir_path]
required_dirs += [f'{all_dir_path}/{dataset_kind}' for dataset_kind in dataset_kinds]
for required_dir in required_dirs:
    if os.path.isdir(required_dir):
        continue
    os.mkdir(required_dir)

In [None]:
# Copy every class's JPEGs into the merged `all/<split>` directory
for category in label_dict.values():
    for dataset_kind in dataset_kinds:
        destination = f'{open_images_dir}/all/{dataset_kind}'
        pattern = f'{open_images_dir}/{category}/{dataset_kind}/image/*.jpg'
        for jpg_path in glob.glob(pattern):
            shutil.copy(jpg_path, destination)

In [None]:
# Print the number of JPEGs collected for each split (test, train, val)
for split in ('test', 'train', 'val'):
    jpg_paths = glob.glob(f'{all_dir_path}/{split}/*.jpg')
    print(len(jpg_paths))

### GoogleOpenImagesのアノテーションデータからFaster R-CNN用のアノテーションデータに変換

In [None]:
# Add the CategoryID column to every annotation DataFrame
def _attach_category_id(df):
    """Return `df` joined with `df_label` on LabelName, ordered by ImageID.

    The merge is an inner join, so rows whose LabelName is not listed in
    `df_label` are dropped. A single stable sort is used so that rows of the
    same image keep their merge order; the index is renumbered from 0.
    """
    merged = pd.merge(df, df_label, on='LabelName')
    return merged.sort_values('ImageID', kind='mergesort').reset_index(drop=True)


df_test = _attach_category_id(df_test)
df_train = _attach_category_id(df_train)
df_val = _attach_category_id(df_val)
df_test_org = _attach_category_id(df_test_org)

In [None]:
# Print how many distinct images each DataFrame annotates
# (order: test, train, val, test_org)
for frame in (df_test, df_train, df_val, df_test_org):
    distinct_ids = frame['ImageID'].unique()
    print(len(distinct_ids))

In [None]:
# Keep the annotation DataFrames in a dict keyed by split name so later cells
# can update and look them up by key. `test_org` is the DataFrame loaded from
# the second test-annotation file.
df_dict = {
    'test' : df_test,
    'train': df_train,
    'val'  : df_val,
    'test_org': df_test_org
}

In [None]:
# List the files with a given extension in a directory
def glob_file_names(dir_path, ext):
    """Return the paths matching ``<dir_path>/*.<ext>``.

    Backslashes in the returned paths are replaced by forward slashes so the
    result can be split on '/' regardless of platform. Order is whatever
    ``glob`` yields.
    """
    pattern = f'{dir_path}/*.{ext}'
    return [match.replace('\\', '/') for match in glob.glob(pattern)]

In [None]:
# Collect the JPEG paths of every split in a dict and report how many there are
files_dict = {
    kind: glob_file_names(dataset_path[kind], 'jpg')
    for kind in dataset_kinds
}
for kind, files in files_dict.items():
    print(f'{kind}:\t{len(files)}')
# `test_org` reads its images from the same directory as `test`
files_dict['test_org'] = glob_file_names(dataset_path['test'], 'jpg')
print(f'test_org: {len(files_dict["test_org"])}')

In [None]:
from PIL import Image

In [None]:
# Add each image's pixel size (Width / Height columns) to the annotation DataFrames
for dataset_kind, files in files_dict.items():
    img_size_list = []
    for file in files:
        # The file name without directory and extension is the ImageID.
        image_id = file.split('/')[-1].split('.')[0]
        # Open inside a context manager so the file handle is released right
        # away; only the size is needed here.
        with Image.open(file) as img:
            width, height = img.size
        img_size_list.append([image_id, width, height])
    df_size = pd.DataFrame(img_size_list, columns=['ImageID', 'Width', 'Height'])
    # Inner join: annotations without an image on disk (and vice versa) are dropped.
    df_dict[dataset_kind] = (
        pd.merge(df_dict[dataset_kind], df_size, on='ImageID')
        .sort_values('ImageID')
        .reset_index(drop=True)
    )

# Bare expression: displays the last processed DataFrame when run as a notebook cell.
df_dict[dataset_kind]

### カスタムデータセットを作成

In [None]:
import torch
from torch.utils.data import DataLoader

import torchvision
import torchvision.transforms as transforms
from torchvision.utils import draw_bounding_boxes
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

import numpy as np
import matplotlib.pyplot as plt

In [None]:
# Dataset that serves (image, target) pairs built from the annotation DataFrame
class GoogleOpenImagesDataset(torch.utils.data.Dataset):
    """Serve one image and its bounding-box target per distinct ImageID.

    Args:
        df: annotation DataFrame with the columns ImageID, XMin, YMin, XMax,
            YMax, Width, Height and CategoryID (one row per box).
        image_dir: directory containing ``<ImageID>.jpg`` files.
        transform: callable applied to the PIL image; when None,
            ``transforms.ToTensor()`` is used.
    """

    def __init__(self, df, image_dir, transform=None):
        super().__init__()
        self.image_ids = df['ImageID'].unique()
        self.df = df
        self.image_dir = image_dir
        self.transform = transform

    def _pixel_boxes(self, rows):
        """Return the boxes of `rows` as an int64 tensor of shape (N, 4).

        XMin/XMax are multiplied by the image Width and YMin/YMax by the
        Height (i.e. the stored values are treated as fractions of the image
        size), then truncated to integers. Boxes that end up with zero width
        or height are grown by one pixel on the max side.
        """
        # Explicit float copy: never write into the DataFrame's own buffer,
        # which may be read-only.
        coords = rows[['XMin', 'YMin', 'XMax', 'YMax']].to_numpy(dtype='float64', copy=True)
        width = float(rows['Width'].unique()[0])
        height = float(rows['Height'].unique()[0])
        scale = torch.tensor([width, height, width, height], dtype=torch.float64)
        boxes = (torch.tensor(coords) * scale).to(torch.int64)
        # Widen degenerate boxes (vectorized equivalent of a per-box check).
        boxes[:, 2] += (boxes[:, 2] == boxes[:, 0]).to(torch.int64)
        boxes[:, 3] += (boxes[:, 3] == boxes[:, 1]).to(torch.int64)
        return boxes

    def __getitem__(self, index):
        """Return ``(image, target)`` for the index-th distinct ImageID.

        ``target`` is a dict with ``boxes`` (int64, N x 4, pixel units) and
        ``labels`` (int64, N, CategoryID values).
        """
        # Resolve the default locally instead of mutating the instance.
        transform = self.transform
        if transform is None:
            transform = transforms.ToTensor()

        # Load the input image. Converting to RGB guarantees three channels
        # whatever the file's mode is; the context manager closes the file.
        image_id = self.image_ids[index]
        with Image.open(f'{self.image_dir}/{image_id}.jpg') as img:
            image = transform(img.convert('RGB'))

        # Build the target from all annotation rows of this image.
        df_unique_id = self.df[self.df['ImageID'] == image_id]
        target = {
            'boxes': self._pixel_boxes(df_unique_id),
            'labels': torch.tensor(df_unique_id['CategoryID'].values, dtype=torch.int64),
        }
        return image, target

    def __len__(self):
        """Return the number of distinct images."""
        return len(self.image_ids)

### データセットの読み込み

In [None]:
# One dataset per split, each pairing an annotation DataFrame with its image
# directory. `dataset_test_org` uses the `test_org` annotations but reads the
# images from the same directory as `dataset_test`.
dataset_train = GoogleOpenImagesDataset(df_dict['train'], dataset_path['train'])
dataset_val = GoogleOpenImagesDataset(df_dict['val'], dataset_path['val'])
dataset_test = GoogleOpenImagesDataset(df_dict['test'], dataset_path['test'])
dataset_test_org = GoogleOpenImagesDataset(df_dict['test_org'], dataset_path['test'])

### DataLoaderの設定

In [None]:
# Loaders yield one image per batch; downstream cells read element 0 of each
# batch. Only the training loader shuffles, so the other loaders return images
# in dataset order.
# NOTE(review): batch_size=1 presumably avoids collating images of different
# sizes -- confirm before raising it.
data_loader_train =  DataLoader(dataset_train, batch_size=1, shuffle=True)
data_loader_val =  DataLoader(dataset_val, batch_size=1, shuffle=False)
data_loader_test =  DataLoader(dataset_test, batch_size=1, shuffle=False)
data_loader_test_org =  DataLoader(dataset_test_org, batch_size=1, shuffle=False)

### ターゲットの表示

In [None]:
def show_boxes(image, boxes, names):
    """Draw `boxes` with their `names` onto `image` and display the result.

    `image` is passed straight to ``draw_bounding_boxes``; callers in this
    file convert it to uint8 (0-255) first.
    """
    annotated = draw_bounding_boxes(image, boxes, labels=names)
    # Move the channel axis last, the layout imshow expects.
    channels_last = np.transpose(annotated, (1, 2, 0))

    plt.figure(figsize=(16, 16))
    plt.imshow(channels_last)
    # Hide the axis ticks and tick labels.
    plt.tick_params(bottom=False, left=False, labelbottom=False, labelleft=False)
    plt.show()

In [None]:
# Re-running this cell restarts iteration from the first test image.
dataiter = iter(data_loader_test)  # iterator over the test loader

In [None]:
# Show the ground-truth boxes of the next test image.
# (Re-create `dataiter` with iter(data_loader_test) to start over.)
image, target = next(dataiter)  # take one batch; the iterator has no .next() in current PyTorch
print(target)

image = image[0]
image = (image * 255).to(torch.uint8)  # draw_bounding_boxes expects 0-255 input

boxes = target["boxes"][0]
labels = target["labels"][0]

# Look names up by CategoryID. Indexing df_label by row position would be off
# by one (index 0..4 vs. CategoryID 1..5) and fail for the highest ID.
category_names = dict(zip(df_label['CategoryID'].tolist(), df_label['LabelName'].tolist()))
names = [category_names[label.item()] for label in labels]

show_boxes(image, boxes, names)

### モデルの構築

In [None]:
# Start from a pretrained Faster R-CNN (ResNet-50 FPN) and replace its box
# predictor so that it outputs this dataset's classes.
# NOTE(review): newer torchvision releases prefer the `weights=` argument over
# `pretrained=True` -- confirm against the installed version.
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
num_classes=len(df_label)+1  # add 1 because background is classified as well
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
model.cuda()  # move to the GPU

### 学習

In [None]:
# Directory where model checkpoints and loss logs are saved
data_path = 'save_model'
# Create the directory if it does not exist yet
_dir_path = f'{data_path}'
if not os.path.exists(_dir_path):
    os.mkdir(_dir_path)
# Loss log files (per epoch / per iteration)
epoch_loss_path = f'{data_path}/losses_every_epoch.txt'
iter_loss_path = f'{data_path}/losses_every_iteration.txt'

In [None]:
# Optimizer: SGD over the trainable parameters only
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.001, momentum=0.9, weight_decay=0.0005)

loss_list = []      # mean training loss, one entry per epoch
val_loss_list = []  # mean validation loss, one entry per epoch
model.train()  # training mode: the model returns a dict of losses when given targets
epochs = 100
for epoch in range(epochs):
    # train
    losses = []
    for i, (image, target) in enumerate(data_loader_train):
        image = [img.cuda() for img in image]  # move to the GPU

        boxes = target["boxes"][0].cuda()
        labels = target["labels"][0].cuda()
        target = [{"boxes": boxes, "labels": labels}]  # targets are a list of dicts

        loss_dic = model(image, target)
        loss = sum(loss_dic.values())  # total of all loss components

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        losses.append(loss.item())

        if (i + 1) % 100 == 0:  # report progress every 100 iterations
            print("Train epoch:", epoch, "iteration:", i, "loss:", loss.item())

    # Record the training loss every epoch so loss_list stays aligned with
    # val_loss_list when the log is written below.
    loss_list.append(np.mean(losses))

    # Save a checkpoint every 10 epochs; the final one is model_epoch100.pth
    if (epoch + 1) % 10 == 0:
        model_path = f'{data_path}/model_epoch{epoch + 1}.pth'
        torch.save(model.state_dict(), model_path)

    # validation: the model stays in training mode so it still returns losses,
    # but no gradients are needed, so skip building the autograd graph.
    val_losses = []
    with torch.no_grad():
        for i, (val_image, val_target) in enumerate(data_loader_val):
            val_image = [img.cuda() for img in val_image]  # move to the GPU

            val_boxes = val_target["boxes"][0].cuda()
            val_labels = val_target["labels"][0].cuda()
            val_target = [{"boxes": val_boxes, "labels": val_labels}]

            val_loss_dic = model(val_image, val_target)
            val_loss = sum(val_loss_dic.values())  # total of all loss components

            val_losses.append(val_loss.item())

            if (i + 1) % 100 == 0:  # report progress every 100 iterations
                print("Val epoch:", epoch, "iteration:", i, "loss:", val_loss.item())

    val_loss_list.append(np.mean(val_losses))

    # Rewrite the per-epoch loss log after every epoch (one line per epoch)
    with open(epoch_loss_path, 'w') as f:
        for n, (train_loss, epoch_val_loss) in enumerate(zip(loss_list, val_loss_list)):
            f.write(f'{n} train_loss: {train_loss} val_loss: {epoch_val_loss}\n')

### 検出

In [None]:
# Rebuild the same architecture as used for training, then load the weights
# saved under `data_path` (the file named for epoch 100).
load_model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
num_classes=len(df_label)+1  # add 1 because background is classified as well
in_features = load_model.roi_heads.box_predictor.cls_score.in_features
load_model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
load_model_pth = f'{data_path}/model_epoch100.pth'
load_model.load_state_dict(torch.load(load_model_pth))
load_model.cuda()

In [None]:
# Re-running this cell restarts iteration from the first test image.
dataiter = iter(data_loader_test)  # iterator over the test loader

In [None]:
# Run detection on the next test image and display the confident detections
image, target = next(dataiter)  # take one batch; the iterator has no .next() in current PyTorch

image = [img.cuda() for img in image]  # move to the GPU

load_model.eval()
with torch.no_grad():  # inference only: no autograd graph needed
    predictions = load_model(image)
pred = predictions[0]

scores = pred['scores'].cpu().tolist()
boxes = pred['boxes'].cpu().tolist()
labels = pred['labels'].cpu().tolist()
# Look names up by CategoryID. Indexing df_label by row position would be off
# by one (index 0..4 vs. CategoryID 1..5) and fail for the highest ID.
category_names = dict(zip(df_label['CategoryID'].tolist(), df_label['LabelName'].tolist()))
names = [category_names[label] for label in labels]

# Despite the `iou50_` prefix, the filter below is on the confidence score.
iou50_scores = []
iou50_boxes = []
iou50_labels = []
iou50_names = []

for i, score in enumerate(scores):
    if score >= 0.5:
        iou50_scores.append(score)
        iou50_boxes.append(boxes[i])
        iou50_labels.append(labels[i])
        iou50_names.append(names[i])
        print(f'{names[i]} {labels[i]} {score:.02f} {boxes[i][0]:.01f} {boxes[i][1]:.01f} {boxes[i][2]:.01f} {boxes[i][3]:.01f}')
# reshape keeps the (N, 4) layout even when no detection passed the threshold
iou50_boxes = torch.tensor(iou50_boxes).reshape(-1, 4)

image = (image[0] * 255).to(torch.uint8).cpu()
show_boxes(image, iou50_boxes, iou50_names)


In [None]:
# Output directory for the detection result files
dets_dir_path = f'{data_path}/dets_result'
# Create the directory if it does not exist yet
if not os.path.exists(dets_dir_path):
    os.mkdir(dets_dir_path)
# Distinct image IDs taken from `df_test`.
# NOTE(review): `df_test` is not the DataFrame behind `dataset_test_org`, so
# these IDs may not line up with that dataset's image order -- verify before
# pairing them by position.
img_names = df_test['ImageID'].unique()

In [None]:
# Write the detections of every image in `dataset_test_org` to one text file per image
category_names = dict(zip(df_label['CategoryID'].tolist(), df_label['LabelName'].tolist()))
# Take the IDs from the dataset that is actually iterated: the loader does not
# shuffle and yields one image per batch, so position i is image_ids[i].
test_org_image_ids = dataset_test_org.image_ids

load_model.eval()
with torch.no_grad():  # inference only: no autograd graph needed
    for i, (image, target) in enumerate(data_loader_test_org):
        img_name = test_org_image_ids[i]
        # Skip the image (ImageID) that could not be processed properly.
        # The batch has already been loaded here; only inference and output
        # are skipped.
        if img_name == 'a8601676c86366e3':
            continue
        image = [img.cuda() for img in image]  # move to the GPU
        print(f'{i}: {img_name}')
        predictions = load_model(image)
        pred = predictions[0]

        scores = pred['scores'].cpu().tolist()
        boxes = pred['boxes'].cpu().tolist()
        labels = pred['labels'].cpu().tolist()
        # Names are looked up by CategoryID, not by df_label row position.
        names = [category_names[label] for label in labels]

        # Despite the `iou50_` prefix, the filter below is on the confidence score.
        iou50_scores = []
        iou50_boxes = []
        iou50_labels = []
        iou50_names = []

        dets_file_path = f'{dets_dir_path}/{img_name}.txt'
        with open(dets_file_path, 'w') as f:
            # `j` keeps the detection index separate from the image index `i`.
            for j, score in enumerate(scores):
                if score >= 0.5:
                    iou50_scores.append(score)
                    iou50_boxes.append(boxes[j])
                    iou50_labels.append(labels[j])
                    iou50_names.append(names[j])
                    x_min, y_min, x_max, y_max = (int(v) for v in boxes[j])
                    pred_data = f'{names[j]} {score:.03f} {x_min} {y_min} {x_max} {y_max}'
                    f.write(f'{pred_data}\n')
                    print(pred_data)