In [1]:
#!pip install torch
#!pip install torchvision
#!pip install imgaug
import torch      #pytorch
import torch.nn as nn     #pytorch network
from torch.utils.data import Dataset, DataLoader      #pytorch dataset
from torch.utils.tensorboard import SummaryWriter     #tensorboard
import torchvision      #torchvision
import torch.optim as optim     #pytorch optimizer
import numpy as np      #numpy
import matplotlib.pyplot as plt     #matplotlib(이미지 표시를 위해 필요)
from collections import OrderedDict     #python라이브러리 (라벨 dictionary를 만들 때 필요)
import os     #os
import xml.etree.ElementTree as Et      #Pascal xml을 읽어올 때 필요
from xml.etree.ElementTree import Element, ElementTree
import cv2      #opencv (box 그리기를 할 때 필요)
from PIL import Image     #PILLOW (이미지 읽기)
import time     #time
import imgaug as ia     #imgaug
from imgaug import augmenters as iaa
from torchvision import transforms      #torchvision transform
import pandas as pd
# Select the compute device: the first CUDA GPU when one is available, otherwise the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(device)

cpu


In [2]:
from utils import CLASS_NAME_TO_ID, visualize

In [3]:
# Root folder of the detection data.
data_dir = './DRIVING-DATASET/Detection/' # contains image, train, val and df.csv; to be revised once the data is split
# Annotation table, read from the current working directory.
# NOTE(review): Detection_dataset reads 'all_data.csv' from inside data_dir instead — confirm both paths point at the same file.
data_df = pd.read_csv('all_data.csv')

In [4]:
# Show every annotation row whose 'filename' contains this capture timestamp.
data_df[data_df['filename'].str.contains('2012-09-11_15_16_58')]

Unnamed: 0,filename,center_x,center_y,width,height,angle,occupied,x_max,x_min,y_max,y_min
0,2012-09-11_15_16_58_1,300,207,55,32,-74,1.0,324,278,230,185
1,2012-09-11_15_16_58_2,332,209,56,33,-77,0.0,355,310,233,185
2,2012-09-11_15_16_58_3,366,208,52,32,-77,1.0,388,345,233,185
3,2012-09-11_15_16_58_4,398,207,54,36,-79,0.0,421,375,232,184
4,2012-09-11_15_16_58_5,430,210,50,31,-75,1.0,452,409,232,187
...,...,...,...,...,...,...,...,...,...,...,...
95,2012-09-11_15_16_58_96,441,530,67,53,-77,0.0,475,408,561,504
96,2012-09-11_15_16_58_97,494,537,70,54,-79,0.0,527,461,567,507
97,2012-09-11_15_16_58_98,549,534,64,55,-82,0.0,581,518,563,504
98,2012-09-11_15_16_58_99,602,536,64,47,-85,0.0,629,576,567,505


In [None]:
# NOTE(review): `image_file` is not defined in any visible cell, and this path starts with '../'
# while the other cells use './' — confirm this cell still runs after Restart & Run All.
image_path = os.path.join('../DRIVING-DATASET/Detection/images/', image_file) # store the path of the first file among all jpg files

In [9]:
class Detection_dataset(Dataset) :
    """Detection dataset pairing the images of one phase folder with CSV box annotations.

    Images are read from ``<data_dir>/<phase>/*.jpg`` and annotations from
    ``<data_dir>/all_data.csv``. Each item is ``(image, target, filename)``
    where ``target['boxes']`` is a float tensor of shape (N, 4) holding
    ``[x_min, y_min, x_max, y_max]`` and ``target['labels']`` is a long tensor
    of shape (N,) taken from the ``occupied`` column.

    NOTE(review): torchvision's Faster R-CNN reserves label 0 for background,
    while ``occupied == 0`` rows become label 0 here — confirm this is intended.
    """

    def __init__(self, data_dir, phase, transformer = None) :
        """
        Args:
            data_dir: root folder holding the phase sub-folders and ``all_data.csv``.
            phase: sub-folder name to read images from (e.g. ``'train'`` or ``'val'``).
            transformer: optional callable applied to the RGB image array.
        """
        self.data_dir = data_dir
        self.phase = phase
        self.transformer = transformer
        self.data_df = pd.read_csv(os.path.join(self.data_dir, 'all_data.csv'))
        # os.listdir() order is arbitrary; sort so that an index always maps to the same file.
        self.image_files = sorted(
            fn for fn in os.listdir(os.path.join(self.data_dir, phase)) if fn.endswith('jpg')
        )

    def __len__(self) :
        """Return the number of images in this phase."""
        return len(self.image_files)

    def __getitem__(self, index) :
        """Return ``(image, target, filename)`` for the image at ``index``."""
        filename, image = self.get_image(index) # file name and RGB image for this index
        bboxes, class_ids = self.get_label(filename) # [x_min, y_min, x_max, y_max] rows and class ids

        # reshape keeps an (0, 4) shape for images without any annotation row.
        boxes = torch.as_tensor(bboxes, dtype = torch.float32).reshape(-1, 4)
        labels = torch.as_tensor(class_ids).long()

        orig_h, orig_w = image.shape[:2]
        if self.transformer :
            image = self.transformer(image)
            # The CSV boxes are in original-image pixels; when the transformer changes the
            # spatial size (e.g. Resize) the boxes must be scaled by the same factors.
            if isinstance(image, torch.Tensor) and image.dim() >= 2 :
                new_h, new_w = image.shape[-2:]
                if (new_h, new_w) != (orig_h, orig_w) :
                    boxes[:, [0, 2]] *= new_w / orig_w
                    boxes[:, [1, 3]] *= new_h / orig_h

        target = {}
        target['boxes'] = boxes
        target['labels'] = labels
        return image, target, filename # target carries the box coordinates and the labels

    def get_image(self, index) :
        """Read the image at ``index`` and return ``(filename, rgb_image)``.

        Raises:
            FileNotFoundError: if OpenCV cannot read the file.
        """
        filename = self.image_files[index]
        image_path = os.path.join(self.data_dir, self.phase, filename)
        image = cv2.imread(image_path)
        if image is None :
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read image: {image_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        return filename, image

    def get_label(self, filename) :
        """Return ``(bboxes, class_ids)`` for every CSV row whose 'filename' contains the image id."""
        image_id = os.path.splitext(filename)[0]
        # Plain substring match: the id must not be interpreted as a regular expression,
        # and missing filenames must not poison the boolean mask.
        matches = self.data_df['filename'].str.contains(image_id, regex = False, na = False)
        meta_data = self.data_df[matches]
        class_ids = meta_data['occupied'].values
        bboxes = meta_data[['x_min', 'y_min', 'x_max', 'y_max']].values
        return bboxes, class_ids # box coordinates and labels (1, 0) of all boxes in one picture

In [None]:
# Training split without any transform, i.e. items keep the raw image array.
data_dir = './DRIVING-DATASET/Detection/'
dataset = Detection_dataset(data_dir = data_dir, phase = 'train', transformer=None)

In [None]:
# Number of images found in the training folder.
len(dataset)

In [None]:
# Side length (pixels) every image is resized to before it is given to the model.
IMAGE_SIZE = 448 

# Preprocessing pipeline: array -> tensor, resize to a square, then per-channel normalisation.
# NOTE(review): the mean/std values look like the usual ImageNet statistics — confirm.
transformer = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize(size = (IMAGE_SIZE, IMAGE_SIZE)),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

In [None]:
# Same training split as before, now with the preprocessing pipeline applied to each image.
data_dir = './DRIVING-DATASET/Detection/'
transformed_dataset = Detection_dataset(data_dir=data_dir, phase = 'train', transformer=transformer)

In [None]:
from torchvision.utils import make_grid

In [None]:
# Fetch one sample explicitly so this cell does not rely on `image` / `target`
# left over from an earlier (no longer present) cell.
index = 0
image, target, filename = transformed_dataset[index]

# Undo the normalisation for display and convert CHW tensor -> HWC numpy image.
np_image = make_grid(image, normalize = True).cpu().permute(1, 2, 0).numpy()

boxes = target['boxes'].numpy()
class_ids = target['labels'].numpy()

# Convert [x_min, y_min, x_max, y_max] into [x_center, y_center, width, height]
# before handing the boxes to visualize().
n_obj = boxes.shape[0]
bboxes = np.zeros(shape = (n_obj, 4), dtype = np.float32)
bboxes[:, 0:2] = (boxes[:, 0:2] + boxes[:, 2:4]) / 2
bboxes[:, 2:4] = boxes[:, 2:4] - boxes[:, 0:2]

canvas = visualize(np_image, bboxes, class_ids)

plt.figure(figsize = (6, 6))
plt.imshow(canvas)
plt.show()

In [None]:
from torch.utils.data import DataLoader

In [None]:
def collate_fn(batch) :
    """Regroup a batch of ``(image, target, filename)`` samples into three parallel lists.

    Nothing is stacked, so every sample may carry a different number of boxes.

    Returns:
        ``(image_list, target_list, filename_list)``
    """
    # Unpack every sample once; a sample that is not a 3-tuple raises here.
    samples = [(image, target, filename) for image, target, filename in batch]
    image_list = [sample[0] for sample in samples]
    target_list = [sample[1] for sample in samples]
    filename_list = [sample[2] for sample in samples]
    return image_list, target_list, filename_list

In [None]:
data_dir = './DRIVING-DATASET/Detection'
BATCH_SIZE = 6

# Shuffled training loader; collate_fn keeps images/targets as lists instead of stacking them.
trainset = Detection_dataset(data_dir = data_dir, phase = 'train', transformer = transformer)
trainloader = DataLoader(trainset, batch_size = BATCH_SIZE, shuffle = True, collate_fn = collate_fn)

In [None]:
# Pull a single batch from the training loader as a smoke test of the data pipeline.
# Each batch is the (images, targets, filenames) triple produced by collate_fn.
for images, targets, filenames in trainloader :
    break

In [None]:
def build_dataloader(data_dir, batch_size = 4, image_size = 448) :
    """Build the train and validation DataLoaders for the detection dataset.

    Args:
        data_dir: dataset root passed on to ``Detection_dataset``.
        batch_size: number of samples per batch for both loaders.
        image_size: side length every image is resized to.

    Returns:
        dict with the keys ``'train'`` and ``'val'`` mapping to the DataLoaders.
    """
    transformer = transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize(size = (image_size, image_size)),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

    dataloaders = {}
    for phase in ('train', 'val') :
        phase_dataset = Detection_dataset(data_dir, phase, transformer)
        dataloaders[phase] = DataLoader(
            phase_dataset,
            batch_size = batch_size,
            # Only training benefits from shuffling; a fixed validation order keeps
            # evaluation and "first N images" inspection reproducible.
            shuffle = (phase == 'train'),
            collate_fn = collate_fn,
        )

    return dataloaders

In [None]:
data_dir = './DRIVING-DATASET/Detection/'
dloaders = build_dataloader(data_dir, batch_size = 4, image_size = 448)

# Smoke test: read one batch per phase and print how many samples it holds.
for phase in ['train', 'val'] :
    for images, targets, filenames in dloaders[phase] :
        print(len(filenames))
        break

# 블로그에서 복사해온 모델링 코드 부분

```
# Reference snippet: Faster R-CNN assembled by hand on top of a VGG16 backbone.

# VGG16 feature extractor without its last layer (the final max-pooling, per the notes below).
backbone = torchvision.models.vgg16(pretrained=True).features[:-1]
backbone_out = 512
# FasterRCNN reads the number of backbone output channels from this attribute.
backbone.out_channels = backbone_out

# One feature map, three anchor sizes x three aspect ratios.
anchor_generator = torchvision.models.detection.rpn.AnchorGenerator(sizes=((128, 256, 512),),aspect_ratios=((0.5, 1.0, 2.0),))

# Each proposal is pooled to a resolution x resolution grid.
resolution = 7
roi_pooler = torchvision.ops.MultiScaleRoIAlign(featmap_names=['0'], output_size=resolution, sampling_ratio=2)

# Box head (two-layer MLP on the flattened pooled features) and the final predictor.
box_head = torchvision.models.detection.faster_rcnn.TwoMLPHead(in_channels= backbone_out*(resolution**2),representation_size=4096) 
box_predictor = torchvision.models.detection.faster_rcnn.FastRCNNPredictor(4096,21) # 21 classes

model = torchvision.models.detection.FasterRCNN(backbone, num_classes=None,
                   min_size = 600, max_size = 1000,
                   rpn_anchor_generator=anchor_generator,
                   rpn_pre_nms_top_n_train = 6000, rpn_pre_nms_top_n_test = 6000,
                   rpn_post_nms_top_n_train=2000, rpn_post_nms_top_n_test=300,
                   rpn_nms_thresh=0.7,rpn_fg_iou_thresh=0.7,  rpn_bg_iou_thresh=0.3,
                   rpn_batch_size_per_image=256, rpn_positive_fraction=0.5,
                   box_roi_pool=roi_pooler, box_head = box_head, box_predictor = box_predictor,
                   box_score_thresh=0.05, box_nms_thresh=0.7,box_detections_per_img=300,
                   box_fg_iou_thresh=0.5, box_bg_iou_thresh=0.5,
                   box_batch_size_per_image=128, box_positive_fraction=0.25
                 )
# num_classes is set to None because a custom ROI head (box_predictor) is supplied

# Re-initialise every RPN parameter from N(0, 0.01).
for param in model.rpn.parameters():
  torch.nn.init.normal_(param,mean = 0.0, std=0.01)

# ROI heads: bbox_pred parameters use std 0.001, other weights std 0.01;
# the separate `if` then zeroes every bias, including the bbox_pred bias.
for name, param in model.roi_heads.named_parameters():
  if "bbox_pred" in name:
    torch.nn.init.normal_(param,mean = 0.0, std=0.001)
  elif "weight" in name:
    torch.nn.init.normal_(param,mean = 0.0, std=0.01)
  if "bias" in name:
    torch.nn.init.zeros_(param)
```

# 코드에 관한 설명

### 모델을 만드는 코드입니다. backbone으로 VGG16을 사용하며 마지막 max pooling층은 제거해 줍니다. Faster RCNN을 사용하기 위해서는 fully connected layer를 만들기 위해 최종 backbone output채널이 512임을 알려주어야 합니다.
### 이후 anchor generator, roi pooler, box head, box predictor를 각각 만들어 줍니다.
### box head는 Fast RCNN에서 처음 두 FC layer에 해당하는 층이고 box predictor는 예측을 하는 FC layer입니다.

### 모델은 torchvision.models.detection에 있는 FasterRCNN을 사용합니다. 입력해야 하는 값은 point에 있는 값들을 참고하여 입력하면 됩니다. default로 None인 항목 중 point에 없는 항목은 그대로 두어도 논문과 같거나 큰 영향이 없는 값들입니다.
### 마지막으로 weight와 bias를 초기화합니다. 참고로 box_score_thresh와 box_nms_thresh는 예측 때 필요한 값이므로 일단 임의의 값을 입력합니다.

In [None]:
from torchvision import models
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor # 모델의 헤더를 수정할 수 있게 해주는 클래스

In [None]:
# Input feature size of the current box classifier, needed to build a replacement head.
# NOTE(review): no executed cell above appears to create `model` (the VGG16 snippet sits in a
# markdown code fence) — confirm this cell still runs after Restart & Run All.
in_features = model.roi_heads.box_predictor.cls_score.in_features

In [None]:
# Swap the prediction head for one that outputs `num_classes` classes.
# NOTE(review): relies on `model` and `in_features` from earlier cells — see the note on `model` above its first use.
num_classes = 2
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

In [None]:
def build_model(num_classes) :
    """Create a pretrained Faster R-CNN (ResNet50-FPN) whose box predictor outputs ``num_classes`` classes.

    Args:
        num_classes: number of outputs of the new classification head.

    Returns:
        The detection model with its ``roi_heads.box_predictor`` replaced.
    """
    detector = models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
    roi_heads = detector.roi_heads
    # The new head must accept the same feature size as the head it replaces.
    feature_dim = roi_heads.box_predictor.cls_score.in_features
    roi_heads.box_predictor = FastRCNNPredictor(feature_dim, num_classes)
    return detector

In [None]:
# Number of outputs of the replaced box predictor head.
NUM_CLASSES = 2
model = build_model(num_classes = NUM_CLASSES)

In [None]:
# Run one training-mode forward pass on the first batch to inspect the loss dictionary.
phase = 'train'
model.train() # training mode: the model is called with targets and its result is kept as `loss`

for index, batch in enumerate(dloaders[phase]) :
    images, targets, filenames = batch[0], batch[1], batch[2]

    # Shallow copies, so the loader's own containers are left untouched.
    images = list(images)
    targets = [dict(t) for t in targets]

    loss = model(images, targets)
    if index == 0 :
        break

In [None]:
# Display the loss dictionary returned by the forward pass above.
loss
# checks which object it is (the top two entries)
# checks whether an object exists (the bottom two entries)

In [None]:
from collections import defaultdict

def train_one_epoch(dataloaders, model, optimizer, device, verbose_freq = None) :
    """Run one training pass followed by one validation pass and return the mean losses.

    The model stays in training mode for both phases because it is called with
    targets and its returned loss dictionary is needed in both; gradients are
    simply disabled during validation.

    Args:
        dataloaders: mapping with ``'train'`` and ``'val'`` iterables that yield
            ``(images, targets, filenames)`` batches and support ``len()``.
        model: detection model called as ``model(images, targets)`` and returning
            a dict of loss tensors.
        optimizer: optimizer stepped once per training batch.
        device: device the images and target tensors are moved to.
        verbose_freq: print the running losses every this many training batches.
            Defaults to the module-level ``VERBOSE_FREQ`` when it exists, else 200.

    Returns:
        ``(train_loss, val_loss)``: two ``defaultdict(float)`` holding the
        per-batch mean of every loss term plus ``'total_loss'``.
    """
    if verbose_freq is None :
        # Backward compatible with the notebook-level constant the function used to read.
        verbose_freq = globals().get('VERBOSE_FREQ', 200)

    epoch_loss = {'train' : defaultdict(float), 'val' : defaultdict(float)}

    model.train()

    for phase in ['train', 'val'] :
        is_train = phase == 'train'
        n_batches = len(dataloaders[phase])

        for index, batch in enumerate(dataloaders[phase]) :
            # Move the inputs to the same device as the model.
            images = [image.to(device) for image in batch[0]]
            targets = [{k : (v.to(device) if hasattr(v, 'to') else v) for k, v in t.items()}
                       for t in batch[1]]

            with torch.set_grad_enabled(is_train) :
                loss = model(images, targets)
                total_loss = sum(each_loss for each_loss in loss.values())

            if is_train :
                optimizer.zero_grad()
                total_loss.backward()
                optimizer.step()

                if index > 0 and index % verbose_freq == 0 :
                    terms = ", ".join(f"{k}: {v.item():.4f}" for k, v in loss.items())
                    print(f"{index}/{n_batches} - {terms}")

            for k, v in loss.items() :
                epoch_loss[phase][k] += v.item()
            epoch_loss[phase]['total_loss'] += total_loss.item()

        # Normalise each phase by its own batch count; an empty loader has nothing to average.
        if n_batches > 0 :
            for k in epoch_loss[phase] :
                epoch_loss[phase][k] /= n_batches

    return epoch_loss['train'], epoch_loss['val']

In [None]:
from utils import save_model

In [None]:
# Training configuration: data location, hyper-parameters, model and optimizer.
data_dir = './DRIVING-DATASET/Detection/'
is_cuda = True

NUM_CLASSES = 2
IMAGE_SIZE = 448
BATCH_SIZE = 6
# Read as a global inside train_one_epoch: progress is printed every VERBOSE_FREQ training batches.
VERBOSE_FREQ = 200
# NOTE(review): the disabled line below references torch.cuda.is_available without calling it.
# DEVICE = torch.device('cuda' if torch.cuda.is_available and is_cuda else 'cpu')
# DEVICE is pinned to the CPU, so `is_cuda` has no effect in this cell.
DEVICE = 'cpu'
dataloaders = build_dataloader(data_dir = data_dir, batch_size = BATCH_SIZE, image_size = IMAGE_SIZE)
model = build_model(num_classes = NUM_CLASSES)
model = model.to(DEVICE)
optimizer = torch.optim.SGD(model.parameters(), lr = 0.001, momentum = 0.9)

In [None]:
num_epochs = 30
train_losses = []
val_losses = []

# Epochs are numbered 1..num_epochs so the printed counter, the checkpoint names and the
# number of epochs actually trained all agree (the previous range skipped one epoch).
for epoch in range(1, num_epochs + 1) :

    train_loss, val_loss = train_one_epoch(dataloaders, model, optimizer, DEVICE)
    train_losses.append(train_loss)
    val_losses.append(val_loss)

    print(f"{epoch}/{num_epochs} - Train Loss : {train_loss['total_loss']:.4f}, Val_Loss : {val_loss['total_loss']:.4f}")

    # Checkpoint every 10th epoch.
    if epoch % 10 == 0 :
        save_model(model.state_dict(), f'model_{epoch}.pth')

In [None]:
# Split the per-epoch loss dictionaries into one list per loss term, ready for plotting.
tr_loss_classifier = [tr_loss['loss_classifier'] for tr_loss in train_losses]
tr_loss_box_reg = [tr_loss['loss_box_reg'] for tr_loss in train_losses]
tr_loss_objectness = [tr_loss['loss_objectness'] for tr_loss in train_losses]
tr_loss_rpn_box_reg = [tr_loss['loss_rpn_box_reg'] for tr_loss in train_losses]
tr_loss_total = [tr_loss['total_loss'] for tr_loss in train_losses]

val_loss_classifier = [vl_loss['loss_classifier'] for vl_loss in val_losses]
val_loss_box_reg = [vl_loss['loss_box_reg'] for vl_loss in val_losses]
val_loss_objectness = [vl_loss['loss_objectness'] for vl_loss in val_losses]
val_loss_rpn_box_reg = [vl_loss['loss_rpn_box_reg'] for vl_loss in val_losses]
val_loss_total = [vl_loss['total_loss'] for vl_loss in val_losses]

In [None]:
# Plot every training and validation loss curve over the epochs in one figure.
plt.figure(figsize=(8, 4))

loss_curves = [
    (tr_loss_total, "train_total_loss"),
    (tr_loss_classifier, "train_loss_classifier"),
    (tr_loss_box_reg, "train_loss_box_reg"),
    (tr_loss_objectness, "train_loss_objectness"),
    (tr_loss_rpn_box_reg, "train_loss_rpn_box_reg"),
    (val_loss_total, "val_total_loss"),
    (val_loss_classifier, "val_loss_classifier"),
    (val_loss_box_reg, "val_loss_box_reg"),
    (val_loss_objectness, "val_loss_objectness"),
    (val_loss_rpn_box_reg, "val_loss_rpn_box_reg"),
]
for curve_values, curve_label in loss_curves:
    plt.plot(curve_values, label=curve_label)

plt.xlabel("epoch")
plt.ylabel("loss")
plt.grid("on")
plt.legend(loc='upper right')
plt.tight_layout()

In [None]:
def load_model(ckpt_path, num_classes, device) :
    """Rebuild the detector, load saved weights into it and return it in evaluation mode.

    Args:
        ckpt_path: path of the saved ``state_dict``.
        num_classes: passed to ``build_model``.
        device: device the weights are mapped to and the model is moved to.

    Returns:
        The model on ``device`` with ``eval()`` applied.
    """
    state_dict = torch.load(ckpt_path, map_location = device)
    detector = build_model(num_classes = num_classes)
    detector.load_state_dict(state_dict)
    # to() and eval() both return the module itself, so the calls can be chained.
    return detector.to(device).eval()

In [None]:
is_cuda = True

NUM_CLASSES = 2
# Use the GPU only when it is both requested and available, otherwise fall back to the CPU.
# (The value is no longer overwritten with a hard-coded 'cpu' afterwards.)
DEVICE = torch.device('cuda' if is_cuda and torch.cuda.is_available() else 'cpu')
data_dir = './DRIVING-DATASET/Detection/'
# Batch size 1: the inference cells below look at one image at a time.
dataloaders = build_dataloader(data_dir, batch_size=1)
num_classes = len(CLASS_NAME_TO_ID)

model = load_model(ckpt_path='./trained_model/model_30.pth', num_classes = NUM_CLASSES, device = DEVICE)

In [None]:
from torchvision.ops import nms

In [None]:
def postprocess(prediction, conf_thres = 0.2, IoU_threshold = 0.1) :
    """Filter one image's raw detections by confidence and then by non-maximum suppression.

    Args:
        prediction: dict with the tensors ``'boxes'``, ``'labels'`` and ``'scores'``.
        conf_thres: detections with a score not above this value are dropped.
        IoU_threshold: IoU threshold handed to ``nms``.

    Returns:
        numpy array of shape (K, 6) with the columns
        ``[x_min, y_min, x_max, y_max, score, label]``.
    """
    pred_box = prediction['boxes'].cpu().detach().numpy()
    pred_label = prediction['labels'].cpu().detach().numpy()
    pred_conf = prediction['scores'].cpu().detach().numpy()

    # Confidence filter — uses the caller's threshold (it used to be overwritten with 0.2).
    confident = pred_conf > conf_thres
    pred_box = pred_box[confident]
    pred_label = pred_label[confident]
    pred_conf = pred_conf[confident]

    # Suppress overlapping boxes; nms returns the indices of the boxes to keep.
    keep = nms(torch.tensor(pred_box.astype(np.float32)), torch.tensor(pred_conf), IoU_threshold).numpy()
    pred_box = pred_box[keep]
    pred_conf = pred_conf[keep]
    pred_label = pred_label[keep]

    return np.concatenate((pred_box, pred_conf[:, np.newaxis], pred_label[:, np.newaxis]), axis = 1)

In [None]:
# Run the model on the first two validation images and collect the displayable image plus
# its detections as [x_center, y_center, width, height, class_id] rows.
pred_images = []
pred_labels = []
for index, (images, _, filenames) in enumerate(dataloaders["val"]):
    images = [image.to(DEVICE) for image in images]
    filename = filenames[0]
    # De-normalise for display: CHW tensor -> HWC uint8 array.
    image = make_grid(images[0].cpu().detach(), normalize=True).permute(1,2,0).numpy()
    image = (image * 255).astype(np.uint8)
    with torch.no_grad():
        prediction = model(images)
    prediction = postprocess(prediction[0])
    # ndarray.clip returns a new array, so the result has to be assigned back.
    img_h, img_w = image.shape[:2]
    prediction[:, [0, 2]] = prediction[:, [0, 2]].clip(min=0, max=img_w)
    prediction[:, [1, 3]] = prediction[:, [1, 3]].clip(min=0, max=img_h)
    # Corner format -> centre format.
    xc = (prediction[:, 0] + prediction[:, 2])/2
    yc = (prediction[:, 1] + prediction[:, 3])/2
    w = prediction[:, 2] - prediction[:, 0]
    h = prediction[:, 3] - prediction[:, 1]
    cls_id = prediction[:, 5]
    prediction_yolo = np.stack([xc,yc, w,h, cls_id], axis=1)
    pred_images.append(image)
    pred_labels.append(prediction_yolo)
    if index == 1:
        break

# 모델 테스트 부분 일단 복사만 해옴 

In [None]:
from time import time

In [None]:
# Video file the detector is run on frame by frame in the cells below.
video_path = './DRIVING-DATASET/sample_video.mp4'

In [None]:
# Preprocessing used for single frames: tensor conversion, square resize, normalisation.
transformer = transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize(size=(IMAGE_SIZE, IMAGE_SIZE)),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
@torch.no_grad()
def model_predict(image, model):
    """Preprocess one image, move it to ``DEVICE`` and return the model's output for it.

    Gradients are disabled for the whole call by the decorator.
    """
    single_image_batch = [transformer(image).to(DEVICE)]
    return model(single_image_batch)

In [None]:
# Run the detector on every frame of the video and show the annotated frames.
# ESC quits, 's' pauses until another key is pressed.
vid = cv2.VideoCapture(video_path)
try:
    while vid.isOpened():
        ret, frame = vid.read()
        if not ret:
            # End of stream or read failure: stop instead of spinning forever.
            break
        since = time()
        ori_h, ori_w = frame.shape[:2]
        image = cv2.resize(frame, dsize=(IMAGE_SIZE, IMAGE_SIZE))
        # OpenCV delivers BGR frames, while the training images were converted to RGB.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        prediction = model_predict(image, model)
        prediction = postprocess(prediction[0])
        # Scale the boxes from the resized image back to the original frame size.
        prediction[:, [0,2]] *= (ori_w/IMAGE_SIZE)
        prediction[:, [1,3]] *= (ori_h/IMAGE_SIZE)
        # ndarray.clip returns a new array, so the result has to be assigned back.
        prediction[:, [0,2]] = prediction[:, [0,2]].clip(min=0, max=ori_w)
        prediction[:, [1,3]] = prediction[:, [1,3]].clip(min=0, max=ori_h)
        # Corner format -> centre format.
        xc = (prediction[:, 0] + prediction[:, 2])/2
        yc = (prediction[:, 1] + prediction[:, 3])/2
        w = prediction[:, 2] - prediction[:, 0]
        h = prediction[:, 3] - prediction[:, 1]
        cls_id = prediction[:, 5]
        prediction_yolo = np.stack([xc,yc, w,h, cls_id], axis=1)
        text= f"{(time() - since)*1000:.0f}ms/image"
        canvas = visualize(frame, prediction_yolo[:, 0:4], prediction_yolo[:, 4])
        cv2.putText(canvas, text, (20, 40), cv2.FONT_HERSHEY_PLAIN, 2, (255, 255, 255), 2)
        cv2.imshow('camera', canvas)
        key = cv2.waitKey(1)
        if key == 27:
            break
        if key == ord('s'):
            cv2.waitKey()
finally:
    # Always free the capture and close the windows, even if a frame raised an error.
    vid.release()
    cv2.destroyAllWindows()