In [None]:
## library import
import tensorflow as tf
import pandas as pd
import numpy as np
import os
import re
from PIL import Image
from glob import glob
import shutil
import xml.etree.ElementTree as et
import random
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle

In [None]:
## oxford_pet.zip이 보이는지 확인
glob('oxford_pet.zip')

In [None]:
## 압축풀기
if not os.path.exists('./oxford_pet'):
  !unzip -q oxford_pet.zip -d oxford_pet

In [None]:
## 압축이 풀린 directory 확인
!ls oxford_pet

In [None]:
## directory 정보
cur_dir = os.getcwd()
data_dir = os.path.join(cur_dir, 'oxford_pet')
image_dir = os.path.join(data_dir, 'images')
bbox_dir = os.path.join(data_dir, 'annotations', 'xmls')  # BBOX 정보
seg_dir = os.path.join(data_dir, 'annotations', 'trimaps')  # Segmentation 정보

In [None]:
print('data_dir:' ,data_dir)
print('image_dir:' ,image_dir)
print('bbox_dir:', bbox_dir)
print('seg_dir:', seg_dir)

In [None]:
## image file 수 확인
image_files = [fname for fname in glob(image_dir +  '/*.jpg') if os.path.splitext(fname)[-1] == '.jpg']
print(len(image_files))

In [None]:
## localization을 위한 annotation이 되어 있는 file의 수 확인
## 위의 이미지 갯수보다 annotation XML 파일 갯수가 적다. annotation XML 파일을 기준으로 해야함.

bbox_files = [fname for fname in glob(bbox_dir +  '/*.xml') if os.path.splitext(fname)[-1] == '.xml']
len(bbox_files)

In [None]:
## 600 bbox와 매칭되는 이미지를 모우기 위한 새로운 폴더(new_images) 생성

new_image_dir = os.path.join(data_dir, 'new_images')
os.makedirs(new_image_dir, exist_ok=True)

print('new_images:', new_image_dir)

In [None]:
## 600 bbox 리스트 읽어 bbox 이름과 같은 이미지를 new_images 폴더에 복사한다.

for bbox_filename in bbox_files:
  bbox_filename = bbox_filename.split('/')[-1]
  image_name = os.path.splitext(bbox_filename)[0]
  image_file = image_dir + '/' + image_name + '.jpg'
  # print(glob(image_file))
  shutil.copy(image_file, new_image_dir)

In [None]:
## new_images 폴더에 복사된 이미지 건수를 카운트한다.

new_image_files = glob(new_image_dir + '/*')
len(new_image_files)

In [None]:
## 600개의 새로 복사된 이미지 리스트

new_image_files[:10]

In [None]:
## DataFrame 만들기

pets_df = pd.DataFrame(new_image_files)
pets_df

In [None]:
## 컬럼명 입력

pets_df.columns = ['full_path']
pets_df.head(3)

In [None]:
## full_path 컬럼에서 이미지 이름을 분리하여 file_name 컬럼명에 저장

pets_df['file_name'] = pets_df['full_path'].str.split('/').str[-1]
pets_df.head(3)

In [None]:
## file_name 컬럼에서 라벨 분리하여 label 컬럼에 저장

pets_df['label'] = pets_df['file_name'].str.replace('_\d+','').str.split('.').str[0]

In [None]:
pets_df.head(10)

In [None]:
## 이미지 파일명을 입력으로 받아, 같은 이름과 xml 확장자로, 그리고 xml 위치로 변경해서 리턴

def name_convert(col):
  bbox_fname = bbox_dir + '/' + col.replace('jpg','xml')
  return bbox_fname

In [None]:
## name_convert 함수 호출해서 이미지 파일명과 같은 이름의 xml 확장자로 만든다.

pets_df['bbox_full_path'] = pets_df['file_name'].apply(name_convert)
pets_df.head(3)

In [None]:
## pets_df 데이터 구조 파악

pets_df.info()

In [None]:
## label 분포 확인
## 각 class마다 200개 이미지 구성

pets_df['label'].value_counts()

In [None]:
## bbox_full_path 컬럼의 첫번째 데이터 가져오고 해당 내용 보기

sample_xml_file = pets_df.loc[0, 'bbox_full_path']
print(sample_xml_file)

!cat /content/oxford_pet/annotations/xmls/boxer_190.xml

In [None]:
## 1. xml Annotation 파일 읽어 이지미 크기와 Bounding box 위치을 파악
## 2. xmin, ymin, xmax, ymax 형태를  x, y(중앙), w, h 형태로 변환하여 저장

def xml_annot_getxywh(xmlfile):
  tree = et.parse(xmlfile)

  width = float(tree.find('./size/width').text)
  height = float(tree.find('./size/height').text)
  xmin = float(tree.find('./object/bndbox/xmin').text)
  ymin = float(tree.find('./object/bndbox/ymin').text)
  xmax = float(tree.find('./object/bndbox/xmax').text)
  ymax = float(tree.find('./object/bndbox/ymax').text)
  xc = (xmin + xmax) / 2.
  yc = (ymin + ymax) / 2.
  x = xc / width
  y = yc / height
  w = (xmax - xmin) / width
  h = (ymax - ymin) / height

  return x, y, w, h

In [None]:
## 샘플 xml 파일에 대한 x, y, w, h 얻어오기

xml_annot_getxywh(sample_xml_file)

In [None]:
pets_df

In [None]:
## xml Annot 파일의 x, y, w, h 위치 정보를 가져와 DataFrame에 저장한다.

pets_df['xywh'] = pets_df['bbox_full_path'].apply(xml_annot_getxywh)
pets_df.head()

In [None]:
## 튜플로 저장된 xywh 컬럼 값들을 하나씩 x, y, w, h 컬럼에 저장한다.

pets_df['x'] = pets_df['xywh'].str[0]
pets_df['y'] = pets_df['xywh'].str[1]
pets_df['w'] = pets_df['xywh'].str[2]
pets_df['h'] = pets_df['xywh'].str[3]

In [None]:
pets_df

In [None]:
## label에 대한 라벨인코딩 수행
from sklearn.preprocessing import LabelEncoder

le = LabelEncoder()
pets_df['label_conv'] = le.fit_transform(pets_df['label'])
le.classes_

In [None]:
pets_df.head(3)

In [None]:
## class와 idx 간의 변환

class2idx = { label : idx for idx, label in enumerate(le.classes_) }
idx2class = { idx :label for idx, label in enumerate(le.classes_) }

In [None]:
print(class2idx)
print(idx2class)

In [None]:
## 이미지 패스 리스트 만들기

images_list = pets_df['full_path'].values

In [None]:
## 이미지 파일 리스트에서 이미지 패스로 해당 이미지 읽고 리스트에 넣는다.
## 이미지 resize로 맞추지 않으면 뒤쪽에서 Tensor에러 발생한다.

x_image_list = []

for fname in images_list:
  image = Image.open(fname)
  image = image.resize((224,224))
  image = np.array(image)

  x_image_list.append(image)

x_image_list = np.array(x_image_list)

In [None]:
## 전체 이미지 리스트 형태 보기

x_image_list.shape, type(x_image_list)

In [None]:
## 이미지 라벨값과 bbox 위치값을 같이 저장한다.

y1_label = pets_df[['x', 'y', 'w', 'h', 'label_conv']].values

In [None]:
y1_label[:4]

In [None]:
IMG_SIZE = 224
N_BBOX = len(x_image_list)
N_TRAIN = 500
N_VAL = N_BBOX - N_TRAIN

In [None]:
## Annotation XML 파일 갯수로 리스트 만들고 Shuffle하고 500개 Train set과 100개 Valid set으로 나눈다.

shuffle_list = list(range(N_BBOX))
random.shuffle(shuffle_list)

train_idx_list = shuffle_list[:N_TRAIN]
val_idx_list = shuffle_list[N_TRAIN:]

In [None]:
## Train set idx 보기
train_idx_list[:10]

In [None]:
## train 데이터셋 만들기

train_image_list = x_image_list[train_idx_list]
train_label = y1_label[train_idx_list]

In [None]:
train_image_list.shape, train_label.shape

In [None]:
## valid 데이터셋 만들기

valid_image_list = x_image_list[val_idx_list]
valid_label = y1_label[val_idx_list]

In [None]:
valid_image_list.shape, valid_label.shape

In [None]:
## Train / Valid Dataset 만들기

train_dataset_image = tf.data.Dataset.from_tensor_slices((train_image_list, train_label))
train_dataset = train_dataset_image.batch(16).shuffle(1000).repeat()

valid_dataset_image = tf.data.Dataset.from_tensor_slices((valid_image_list, valid_label))
valid_dataset = valid_dataset_image.batch(16).repeat()

In [None]:
## Hyper Parameters

N_CLASS = len(class2idx)
N_EPOCHS = 40
N_BATCH = 16
IMG_SIZE = 224
learning_rate = 0.0001

steps_per_epoch = N_TRAIN / N_BATCH
validation_steps = int(np.ceil(N_VAL / N_BATCH))

In [None]:
## valid_dataset에서 1개 가져와 분석해 보자
## 이미지와 라벨로 받고 라벨은 다시 실제 라벨과 x,y,w,h 위치 값으로 나뉜다.

for image, label in valid_dataset.take(1):
  print(image.shape)
  print(label.shape)
  print(label[:, :4].shape)
  print(label[:, -1].shape)
  break


In [None]:
## valid dataset에서 1개의 image와 bbox, label 를 읽어서 확인

cnt = 0
for image, label in valid_dataset.take(1):

    ''' matplotlib Rectangle 이용하여 사각형 그릴 경우,
    그림을 그리기 위해서 bbox의 왼쪽 아래 (xmin, ymin) 꼭지점 좌표를 계산하고,
    xmin, ymin, w, h 각각을 image size에 맞게 scaling'''

    # x, y(중앙), w, h 형태를 xmin, ymin, w, h 형태로 변환해야 함.

    x = label[:,0]
    y = label[:,1]
    w = label[:,2]
    h = label[:,3]
    classes = label[:,4].numpy()

    xmin = x[0].numpy() - w[0].numpy()/2.
    ymin = y[0].numpy() - h[0].numpy()/2.
    rect_x = int(xmin * IMG_SIZE)
    rect_y = int(ymin * IMG_SIZE)
    rect_w = int(w[0].numpy() * IMG_SIZE)
    rect_h = int(h[0].numpy() * IMG_SIZE)

    ## 그림 그리기
    rect = Rectangle((rect_x, rect_y), rect_w, rect_h, fill=False, color='red')
    plt.axes().add_patch(rect)
    plt.title(f'{classes[0]}, {idx2class[classes[0]]}')
    plt.imshow(image[0])
    plt.show()

In [None]:
# Functional API를 사용하여 모델 생성
# 입력과 출력 레이어에 이름 붙여주자!!!

def create_model():

    # 입력
    inputs = tf.keras.layers.Input(shape=(IMG_SIZE, IMG_SIZE, 3), name='inputs')

    # 컨볼루션
    conv = tf.keras.layers.Conv2D(filters=256, kernel_size=3, activation='relu', padding='SAME', name='conv2d_layer_1')(inputs)
    pool = tf.keras.layers.MaxPooling2D((2, 2), padding='SAME', name='maxpool_layer_1')(conv)

    conv = tf.keras.layers.Conv2D(filters=128, kernel_size=3, activation='relu', padding='SAME', name='conv2d_layer_2')(inputs)
    pool = tf.keras.layers.MaxPooling2D((2, 2), padding='SAME', name='maxpool_layer_2')(conv)

    conv = tf.keras.layers.Conv2D(filters=64, kernel_size=3, activation='relu', padding='SAME', name='conv2d_layer_3')(inputs)
    pool = tf.keras.layers.MaxPooling2D((2, 2), padding='SAME', name='maxpool_layer_3')(conv)

    conv = tf.keras.layers.Conv2D(filters=64, kernel_size=3, activation='relu', padding='SAME', name='conv2d_layer_4')(inputs)
    pool = tf.keras.layers.MaxPooling2D((2, 2), padding='SAME', name='maxpool_layer_4')(conv)
    flat = tf.keras.layers.Flatten(name='flatten_layer')(pool)

    # 출력
    dense1 = tf.keras.layers.Dense(128, activation='relu')(flat)
    outputs_xywh = tf.keras.layers.Dense(4, activation='sigmoid', name='get_xywh')(dense1)  # 4개 X, Y, W, H 좌표
    outputs_classes = tf.keras.layers.Dense(N_CLASS, activation='softmax', name='get_classes')(dense1)  # 6개 클래스 레이블

    concat = tf.keras.layers.Concatenate()([outputs_xywh, outputs_classes])  # 총 10개의 출력

    # 모델
    model = tf.keras.models.Model(inputs=inputs, outputs=concat)

    return model


In [None]:
## Create model, compile & summary
model = create_model()
model.summary()

In [None]:
## 모델의 입력과 출력을 나타내는 텐서
## model.output : (None, 10) ? --> 4개의 x, y, w, h 와 class 6개 one-hot-encoding

print(model.input)
print(model.output)

In [None]:
# 커스텀 Loss Function
# 자동으로 y_true와 y_pred 두개 인자가 들어옴.
# 정답과 예측값은 10개의 값들로 구성됨 : 앞 4개(X, Y, H, W) + 뒤 6개(원핫인코딩된 Class 종류)
# 앞 4개(X, Y, H, W)와 뒤 6개(원핫인코딩된 Class 종류) 각각에 대해서 Loss 함수를 구하고 합쳐야 한다.
# cls_labels는 정답으로 숫자 1자리로 되어 있어 one-hot-encoding 되어야 함(Sparse_categorical_crossentropy 필요)
# 하지만, y_pred 경우 이미 6개 Class에 대해 one_hot_encoding 된 상태로 예측값을 주므로 인덱스 위치를 잘 찾아 비교해야함.

def loss_fn(y_true, y_pred):
  loc_labels = y_true[:,:4] # y_true[:,:4] -> 정답 bbox 4개 위치 값
  # Selects all rows but only the first four columns of y_true
  loc_preds = y_pred[:,:4] # y_pred[:,:4] -> 예측값 bbox 4개 위치 값

  cls_labels = tf.cast(y_true[:,4:], tf.int64) # y_true[:, 4:] -> 정답 class 1개 값이어서 원핫인코딩 필요
  cls_preds = y_pred[:,4:] # y_pred[:,4:] -> 예측값 class 1개값에 대해 이미 6개로 one-hot-encoding 되어 있음

  loc_loss = tf.keras.losses.MeanSquaredError()(loc_labels, loc_preds)  # 회귀 : MSE
  cls_loss = tf.keras.losses.SparseCategoricalCrossentropy()(cls_labels, cls_preds)  # 분류 : Crossentropy

  # 2개의 loss 함쳐 리터
  return cls_loss + 5*loc_loss

In [None]:
# 모델 컴파일

## learning rate scheduing
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(initial_learning_rate=learning_rate,
                                                          decay_steps=steps_per_epoch*10,
                                                          decay_rate=0.5,
                                                          staircase=True)

## optimizer는 RMSprop, loss는 mean squared error 사용
model.compile(tf.keras.optimizers.RMSprop(lr_schedule, momentum=0.9), loss=loss_fn, metrics=['accuracy'])

In [None]:
## callbacks : EarlyStopping, ModelCheckpoint
es = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3, verbose=1)
mc = tf.keras.callbacks.ModelCheckpoint('best_model_{val_loss:.2f}.h5', monitor='val_loss', save_best_only=True, verbose=1)

In [None]:
## Train!
model.fit(train_dataset, steps_per_epoch=steps_per_epoch,
         epochs=N_EPOCHS,
         validation_data=valid_dataset,
         validation_steps=validation_steps,
         callbacks=[es, mc])


### Pretrained model(MobileNetv2)

In [None]:
from tensorflow.keras import models
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2
from tensorflow.keras.layers import Conv2D, ReLU, MaxPooling2D, Dense, BatchNormalization, GlobalAveragePooling2D

In [None]:
mobilenetv2 = MobileNetV2(weights='imagenet', include_top=False, input_shape=(IMG_SIZE, IMG_SIZE, 3))

In [None]:
mobilenetv2.summary()

In [None]:
# Functional API를 사용하여 모델 생성
# 입력과 출력 레이어에 이름 붙여주자!!!
# Sigmoid: Maps  any real-valued input to a value between 0 and 1
    # Using in binary-classificatiob tasks
# Sigmoid squashes the network output into the 0 - 1 range, which matches the usual normalization of bounding-box values

def create_mv_model():

    # mobilenetv2
    globalavgpool = tf.keras.layers.GlobalAveragePooling2D()(mobilenetv2.output)

    # 출력
    dense1 = tf.keras.layers.Dense(128, activation='relu')(globalavgpool)
    outputs_xywh = tf.keras.layers.Dense(4, activation='sigmoid', name='get_xywh')(dense1)
    outputs_classes = tf.keras.layers.Dense(N_CLASS, activation='softmax', name='get_classes')(dense1)

    concat = tf.keras.layers.Concatenate()([outputs_xywh, outputs_classes])

    # 모델
    model = tf.keras.models.Model(inputs=mobilenetv2.input, outputs=concat)

    return model

In [None]:
## Create model, compile & summary
model = create_mv_model()
model.summary()

In [None]:
# 커스텀 Loss Function
# 자동으로 y_true와 y_pred 두개 인자가 들어옴.
# 정답과 예측값은 10개의 값들로 구성됨 : 앞 4개(X, Y, H, W) + 뒤 6개(원핫인코딩된 Class 종류)
# 앞 4개(X, Y, H, W)와 뒤 6개(원핫인코딩된 Class 종류) 각각에 대해서 Loss 함수를 구하고 합쳐야 한다.
# cls_labels는 정답으로 숫자 1자리로 되어 있어 one-hot-encoding 되어야 함(Sparse_categorical_crossentropy 필요)
# 하지만, y_pred 경우 이미 6개 Class에 대해 one_hot_encoding 된 상태로 예측값을 주므로 인덱스 위치를 잘 찾아 비교해야함.

def loss_fn(y_true, y_pred):
  loc_labels = y_true[:,:4] # y_true[:,:4] -> 정답 bbox 4개 위치 값
  loc_preds = y_pred[:,:4] # y_pred[:,:4] -> 예측값 bbox 4개 위치 값

  cls_labels = tf.cast(y_true[:,4:], tf.int64) # y_true[:, 4:] -> 정답 class 1개 값이어서 원핫인코딩 필요
  cls_preds = y_pred[:,4:] # y_pred[:,4:] -> 예측값 class 1개값에 대해 이미 6개로 one-hot-encoding 되어 있음

  loc_loss = tf.keras.losses.MeanSquaredError()(loc_labels, loc_preds)  # 회귀 : MSE
  cls_loss = tf.keras.losses.SparseCategoricalCrossentropy()(cls_labels, cls_preds)  # 분류 : Crossentropy

  # 2개의 loss 함쳐 리터
  return cls_loss + 5*loc_loss

In [None]:
# 모델 컴파일

## learning rate scheduing
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(initial_learning_rate=learning_rate,
                                                          decay_steps=steps_per_epoch*10,
                                                          decay_rate=0.5,
                                                          staircase=True)

## optimizer는 RMSprop, loss는 mean squared error 사용
model.compile(tf.keras.optimizers.RMSprop(lr_schedule, momentum=0.9), loss=loss_fn, metrics=['accuracy'])

In [None]:
## callbacks : EarlyStopping, ModelCheckpoint
es = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3, verbose=1)
mc = tf.keras.callbacks.ModelCheckpoint('best_model_{val_loss:.2f}.h5', monitor='val_loss', save_best_only=True, verbose=1)

In [None]:
## Train!
model.fit(train_dataset, steps_per_epoch=steps_per_epoch,
         epochs=N_EPOCHS,
         validation_data=valid_dataset,
         validation_steps=validation_steps,
         callbacks=[es, mc])


In [None]:
# validation data 일부 읽어와 예측해 보고 Class와 Bbox에 대한 정답과 예측 확인
## 정답은 빨간색 box, 예측은 파란색 box

idx = 2
num_imgs = validation_steps

# val_dataset 포맷: (None, 224, 224, 3), (None, 5)
for val_data, val_gt in valid_dataset.take(num_imgs):

    ## 정답 box 그리기
    x = val_gt[:,0]
    y = val_gt[:,1]
    w = val_gt[:,2]
    h = val_gt[:,3]
    gt_class = val_gt[:,4]

    gt_class_num = gt_class[idx]
    xmin = x[idx].numpy() - w[idx].numpy()/2.
    ymin = y[idx].numpy() - h[idx].numpy()/2.
    rect_x = int(xmin * IMG_SIZE)
    rect_y = int(ymin * IMG_SIZE)
    rect_w = int(w[idx].numpy() * IMG_SIZE)
    rect_h = int(h[idx].numpy() * IMG_SIZE)

    rect = Rectangle((rect_x, rect_y), rect_w, rect_h, fill=False, color='red')
    plt.axes().add_patch(rect)

    ## 예측 box 그리기
    ## validation set에 대해서 bounding box 예측
    prediction = model.predict(val_data) # prediction.shape : (None, 10)
    pred_x = prediction[:,0]
    pred_y = prediction[:,1]
    pred_w = prediction[:,2]
    pred_h = prediction[:,3]
    pred_class = np.argmax(prediction[:,4:], axis=1)

    pred_class_num = pred_class[idx]
    pred_xmin = pred_x[idx] - pred_w[idx]/2.
    pred_ymin = pred_y[idx] - pred_h[idx]/2.
    pred_rect_x = int(pred_xmin * IMG_SIZE)
    pred_rect_y = int(pred_ymin * IMG_SIZE)
    pred_rect_w = int(pred_w[idx] * IMG_SIZE)
    pred_rect_h = int(pred_h[idx] * IMG_SIZE)

    pred_rect = Rectangle((pred_rect_x, pred_rect_y), pred_rect_w, pred_rect_h,
                         fill=False, color='blue')
    plt.axes().add_patch(pred_rect)

    ## image와 bbox 함께 출력
    plt.title(f'GT : {gt_class_num}, Pred : {pred_class_num}')
    plt.imshow(val_data[idx])
    plt.show()