In [None]:
# Mount Google Drive at /content/drive (Google Colab only).
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Set the working directory to the competition folder on the mounted Drive.
import os
os.chdir('/content/drive/My Drive/Colab Notebooks/모션키포인트검출AI경진대회')

# Motion Keypoint Baseline

### Module Mount & Data Load

In [None]:
import cv2
import glob
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import random
random.seed(2021)
import shutil

import tensorflow as tf
from tensorflow.keras.layers import LeakyReLU, PReLU
from math import cos, sin, pi
from PIL import Image
from tqdm import tqdm
from tensorflow.keras import Sequential, Model
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.layers import Activation, Convolution2D, MaxPool2D, MaxPooling2D, BatchNormalization, Flatten, Dense, Dropout, Conv2D, ZeroPadding2D, AveragePooling2D, GlobalAveragePooling2D, Conv2DTranspose, Input
from tensorflow.keras.models import load_model
from tensorflow.keras.optimizers import Adam

### Train, Valid Split

In [None]:
# # train, val folder 생성
# root_dir = './data'

# os.makedirs(root_dir +'/train')
# os.makedirs(root_dir +'/valid')

# # validation용 파일은 10% 비율로 random sampling
# # random.seed() 넣으시면 복원 가능
# src = "data/train_imgs"
# all_filename = os.listdir(src)

In [None]:
# valid_filename = random.sample(all_filename, int(len(all_filename) * 0.1))
# train_filename = [x for x in all_filename if x not in valid_filename]

# print(len(train_filename), len(valid_filename))

In [None]:
# train_filename = [src+'/'+ name for name in train_filename]
# valid_filename = [src+'/' + name for name in valid_filename]

# # copy & paste images
# for name in tqdm(train_filename):
#     shutil.copy(name, 'data/train')

# for name in tqdm(valid_filename):
#     shutil.copy(name, 'data/valid')
    
# print('Total images: ', len(all_filename))
# print('Training: ', len(train_filename))
# print('Validation: ', len(valid_filename))

In [None]:
# # train, valid folder 속 모든 이미지 파일 read & sort
# train_paths = glob.glob('./data/train/*.jpg')
# valid_paths = glob.glob('./data/valid/*.jpg')
# train_paths.sort()
# valid_paths.sort()

# train_filename = []
# valid_filename = []

# for t_paths in tqdm(train_paths):
#     filename = t_paths.split('/')[-1].split('\\')[1]
#     train_filename.append(filename)

# for v_paths in tqdm(valid_paths):
#     filename = v_paths.split('/')[-1].split('\\')[1]
#     valid_filename.append(filename)

In [None]:
# # 각각의 train, valid 이미지들의 정보만을 담고 있는 DataFrame 생성
# train = pd.read_csv('data/train_df.csv')
# train_df = train[train['image'].isin(train_filename)]
# train_df.reset_index(inplace=True, drop=True)

# valid_df = train[train['image'].isin(valid_filename)]
# valid_df.reset_index(inplace=True, drop=True)

# train_df.to_csv('data/train.csv', index=False)
# valid_df.to_csv('data/valid.csv', index=False)

### Load Data

In [None]:
# Run this cell only after the "Train, Valid Split" section above has produced
# data/train.csv, data/valid.csv and the data/train, data/valid image folders.
train = pd.read_csv('./data/train.csv')
valid = pd.read_csv('./data/valid.csv')

train_paths = sorted(glob.glob('./data/train/*.jpg'))
valid_paths = sorted(glob.glob('./data/valid/*.jpg'))
test_paths = sorted(glob.glob('./data/test_imgs/*.jpg'))
print(len(train_paths), len(valid_paths), len(test_paths))

# Attach each row's image path by file name instead of by position, so a
# difference in ordering (or a stray file in the folder) can never pair an
# image with another image's keypoints.
# Assumes both CSVs keep the 'image' file-name column that the split cells
# above filter on.
for frame, paths, split in ((train, train_paths, 'train'), (valid, valid_paths, 'valid')):
    path_by_name = {os.path.basename(p): p for p in paths}
    frame['path'] = frame['image'].map(path_by_name)
    missing = frame.loc[frame['path'].isna(), 'image'].tolist()
    if missing:
        raise FileNotFoundError(
            f"{len(missing)} image(s) listed in {split}.csv were not found in "
            f"./data/{split}/ (first: {missing[0]})")

## 시각화

In [None]:
# Show five randomly chosen training images with their labelled keypoints.
plt.figure(figsize=(40,20))

# Keypoint columns (x, y interleaved) do not change per sample: slice them once.
keypoint = train.iloc[:,1:49]

for count, i in enumerate(np.random.randint(0, len(train), 5), start=1):
    plt.subplot(5,1, count)

    # Read the path from the same row as the keypoints so the two always match.
    img_sample_path = train['path'].iloc[i]
    img_np = np.array(Image.open(img_sample_path))
    plt.imshow(img_np)  # draw the image once, not once per keypoint

    # Convert to a plain array so positional slicing is unambiguous; integer
    # keys on a label-indexed Series are deprecated in recent pandas.
    keypoint_sample = keypoint.iloc[i, :].to_numpy()
    plt.plot(keypoint_sample[0::2], keypoint_sample[1::2], 'rx')

### Augmentation

In [None]:
# Augmentation settings
# Pixel offsets iterated by shift_images (applied along both axes).
pixel_shifts = [12]
# Rotation magnitudes in degrees; rotate_augmentation applies each as +angle and -angle.
rotation_angles = [12]
# Multipliers applied to pixel values by alter_brightness (brighter, darker).
inc_brightness_ratio, dec_brightness_ratio = 1.2, 0.8
# Scale factor applied to the standard-normal noise in add_noise.
noise_ratio = 0.008

In [None]:
# Horizontal (left-right) flip
def left_right_flip(images, keypoints):
    """Mirror an image along axis 1 and remap its keypoints to match.

    Args:
        images: array-like image flipped with ``np.flip(images, axis=1)``;
            the size of axis 1 is used as the width for mirroring x values.
        keypoints: flat iterable of interleaved coordinates
            ``[x0, y0, x1, y1, ...]`` (even positions are x, odd are y).

    Returns:
        ``(flipped_images, flipped_keypoints)`` where ``flipped_keypoints`` is
        a Python list with every x replaced by ``width - x`` and the paired
        left/right entries exchanged.
    """
    flipped_images = np.flip(images, axis=1)

    # Mirror x about the width of the axis that was actually flipped, so the
    # image and its labels cannot disagree (480 for the 270x480 inputs used here).
    width = float(np.shape(images)[1])
    flipped_keypoints = [
        width - value if idx % 2 == 0 else value
        for idx, value in enumerate(keypoints)
    ]

    # left_right_keypoints_convert: after mirroring, adjacent (x, y) pairs swap
    # roles. Pairs start at flat indices 2, 6, ..., 30 plus 36 and 44; each is
    # exchanged with the (x, y) pair that immediately follows it.
    # NOTE(review): which body parts these indices correspond to depends on the
    # CSV column order, which is not visible here -- confirm against the data.
    for left in (*range(2, 34, 4), 36, 44):
        right = left + 2
        flipped_keypoints[left:right], flipped_keypoints[right:right + 2] = (
            flipped_keypoints[right:right + 2],
            flipped_keypoints[left:right],
        )

    return flipped_images, flipped_keypoints

In [None]:
# Simultaneous vertical and horizontal translation.
# Keeping only shift_x or only shift_y in the inner loop restricts this to a
# purely horizontal or purely vertical shift.
def shift_images(images, keypoints):
    """Translate an image diagonally and move its keypoints by the same offset.

    For every value in the module-level ``pixel_shifts`` the image is shifted in
    the four diagonal directions. A shifted sample is kept only when all of its
    keypoints stay strictly inside the 480x270 frame.

    Args:
        images: object exposing ``.numpy()`` that yields the image array.
        keypoints: flat iterable of interleaved ``[x0, y0, x1, y1, ...]`` values.

    Returns:
        ``(shifted_images, shifted_keypoints)``: two parallel lists holding the
        kept images (shape ``(270, 480, 3)``) and float64 keypoint arrays.
    """
    # tensor -> numpy
    image_array = images.numpy()
    # Fresh float64 copy, so the caller's keypoints are never modified.
    base_points = np.array(keypoints, dtype=np.float64)
    is_x = np.arange(base_points.size) % 2 == 0

    shifted_images, shifted_keypoints = [], []
    for shift in pixel_shifts:
        directions = ((-shift, -shift), (-shift, shift), (shift, -shift), (shift, shift))
        for shift_x, shift_y in directions:
            # Affine matrix for a pure translation
            M = np.float32([[1, 0, shift_x], [0, 1, shift_y]])
            shifted_image = cv2.warpAffine(image_array, M, (480, 270), flags=cv2.INTER_CUBIC)

            # Move x entries by shift_x and y entries by shift_y
            moved = base_points + np.where(is_x, shift_x, shift_y)
            xs, ys = moved[is_x], moved[~is_x]

            # Drop the sample when any keypoint leaves the image
            x_inside = np.all(0.0 < xs) and np.all(xs < 480)
            y_inside = np.all(0.0 < ys) and np.all(ys < 270)
            if not (x_inside and y_inside):
                continue

            shifted_images.append(shifted_image.reshape(270, 480, 3))
            shifted_keypoints.append(moved)

    return shifted_images, shifted_keypoints

In [None]:
# Image rotation
def rotate_augmentation(images, keypoints):
    """Rotate an image about its centre and rotate its keypoints with it.

    Every magnitude in the module-level ``rotation_angles`` is applied twice,
    as ``+angle`` and ``-angle``.

    Args:
        images: object exposing ``.numpy()`` that yields an ``(H, W, C)`` image.
        keypoints: flat array-like of interleaved ``[x0, y0, x1, y1, ...]``
            values in the image's pixel coordinates.

    Returns:
        ``(rotated_images, rotated_keypoints)``: two parallel lists, one entry
        per applied angle. Keypoints are returned as new float64 numpy arrays;
        the input is not modified.
    """
    # tensor -> numpy
    image_array = images.numpy()
    height, width = image_array.shape[:2]
    # (240, 135) for the 270x480 inputs used in this notebook
    center_x, center_y = width / 2, height / 2

    # Coordinates relative to the rotation centre, computed once for all angles.
    coords = np.asarray(keypoints, dtype=np.float64)
    rel_x = coords[0::2] - center_x
    rel_y = coords[1::2] - center_y

    rotated_images = []
    rotated_keypoints = []

    for magnitude in rotation_angles:
        for angle in (magnitude, -magnitude):
            # Build the rotation matrix
            M = cv2.getRotationMatrix2D((center_x, center_y), angle, 1.0)
            # Original note: cv2_imshow displayed the result fine, but a later
            # plt.imshow showed a black screen, so the matrix is converted to
            # an ndarray as a precaution.
            M = np.array(M, dtype=np.float32)
            rotated_images.append(cv2.warpAffine(image_array, M, (width, height)))

            # The sign is negated so this rotation matches the cv2 matrix in
            # image coordinates (y axis pointing down).
            angle_rad = -angle * pi / 180
            cos_a, sin_a = cos(angle_rad), sin(angle_rad)

            # Both outputs are computed from the ORIGINAL (rel_x, rel_y). The
            # previous in-place loop overwrote x first and then used that new
            # x to compute y, which gave wrong y coordinates.
            rotated = np.empty_like(coords)
            rotated[0::2] = rel_x * cos_a - rel_y * sin_a + center_x
            rotated[1::2] = rel_x * sin_a + rel_y * cos_a + center_y
            rotated_keypoints.append(rotated)

    return rotated_images, rotated_keypoints

In [None]:
# Brightness adjustment (the original header said "resolution", but the code
# scales pixel intensity)
def alter_brightness(images):
    """Return a brightened and a darkened variant of ``images``.

    The input is multiplied by the module-level ``inc_brightness_ratio`` and
    ``dec_brightness_ratio`` and each result is clipped to ``[0.0, 1.0]``.

    Returns:
        A two-element list: ``[brightened, darkened]``.
    """
    return [
        np.clip(images * ratio, 0.0, 1.0)
        for ratio in (inc_brightness_ratio, dec_brightness_ratio)
    ]

In [None]:
# Add random noise
def add_noise(images):
    """Add scaled standard-normal noise to a 270x480x3 image.

    Args:
        images: object exposing ``.numpy()`` that yields the image array.

    Returns:
        The result of ``cv2.add(image, noise)``, where ``noise`` is float32
        standard-normal noise multiplied by the module-level ``noise_ratio``.
    """
    image_array = images.numpy()
    gaussian = np.random.randn(270, 480, 3)
    noise = (noise_ratio * gaussian).astype(np.float32)
    # Add the generated noise to the original image
    return cv2.add(image_array, noise)

### Generator

In [None]:
def _load_train_sample(i, keypoints):
    """Load training row ``i`` as an (image, target) pair at 270x480 resolution.

    Args:
        i: row label used with ``train['path'][i]`` and row position in ``keypoints``.
        keypoints: the keypoint columns of ``train`` (``train.iloc[:, 1:49]``).

    Returns:
        ``(img, target)``: the decoded image resized to 270x480 and divided by
        255, and the row's keypoints divided by 4.
    """
    img = tf.io.read_file(train['path'][i])      # read the file at the stored path
    img = tf.image.decode_jpeg(img, channels=3)  # decode the JPEG bytes into a tensor
    img = tf.image.resize(img, [270,480])        # resize the image
    img = img/255                                # rescale pixel values
    # Per the original note the source images are 1920x1080, so resizing to
    # 480x270 requires dividing the keypoint coordinates by 4 as well.
    target = keypoints.iloc[i, :] / 4
    return img, target


def trainGenerator():
    """Yield training samples: every original image, then every flipped image.

    Yields:
        ``(img, target)`` pairs. The first pass yields the resized originals;
        the second pass yields the output of ``left_right_flip`` for each row.
    """
    # Slice the keypoint columns once instead of once per sample.
    keypoints = train.iloc[:, 1:49]

    # Resized originals
    for i in range(len(train)):
        yield _load_train_sample(i, keypoints)

    # Horizontal flip
    for i in range(len(train)):
        img, target = _load_train_sample(i, keypoints)
        yield left_right_flip(img, target)

    # # Horizontal & Vertical shift
    # for i in range(len(train)):
    #     img = tf.io.read_file(train['path'][i])
    #     img = tf.image.decode_jpeg(img, channels=3)
    #     img = tf.image.resize(img, [270,480])
    #     img = img/255
    #     target = train.iloc[:,1:49].iloc[i,:]
    #     target = target/4
    #     img_list, target_list = shift_images(img, target)
    #     for shifted_img, shifted_target in zip(img_list, target_list):
            
    #         yield (shifted_img, shifted_target)

    # # Rotation
    # for i in range(len(train)):
    #     img = tf.io.read_file(train['path'][i])
    #     img = tf.image.decode_jpeg(img, channels=3)
    #     img = tf.image.resize(img, [270,480])
    #     img = img/255
    #     target = train.iloc[:,1:49].iloc[i,:]
    #     target = target/4
    #     img_list, target_list = rotate_augmentation(img, target)
    #     for rotated_img, rotated_target in zip(img_list, target_list):
            
    #         yield (rotated_img, rotated_target)

    # # Alter_Brightness
    # for i in range(len(train)):
    #     img = tf.io.read_file(train['path'][i])
    #     img = tf.image.decode_jpeg(img, channels=3)
    #     img = tf.image.resize(img, [270,480])
    #     img = img/255
    #     target = train.iloc[:,1:49].iloc[i,:]
    #     target = target/4
    #     img_list = alter_brightness(img)
    #     for altered_brightness_images in img_list:
            
    #         yield (altered_brightness_images, target)

    # # Adding_Noise
    # for i in range(len(train)):
    #     img = tf.io.read_file(train['path'][i])
    #     img = tf.image.decode_jpeg(img, channels=3)
    #     img = tf.image.resize(img, [270,480])
    #     img = img/255
    #     target = train.iloc[:,1:49].iloc[i,:]
    #     target = target/4
    #     noisy_img = add_noise(img)

    #     yield (noisy_img, target)

In [None]:
def validGenerator():
    """Yield validation samples as (image, target) pairs without augmentation.

    Yields:
        ``(img, target)``: the image resized to 270x480 and divided by 255,
        and the row's keypoints divided by 4.
    """
    image_paths = valid['path']
    keypoint_columns = valid.iloc[:, 1:49]

    for row in range(len(valid)):
        raw = tf.io.read_file(image_paths[row])          # read the file at the stored path
        decoded = tf.image.decode_jpeg(raw, channels=3)  # decode the JPEG bytes into a tensor
        img = tf.image.resize(decoded, [270,480]) / 255  # resize, then rescale pixel values
        # Per the original note the images go from 1920x1080 to 480x270, so
        # the keypoints are divided by 4 to match.
        target = keypoint_columns.iloc[row, :] / 4

        yield (img, target)

In [None]:
batch_size = 16

# Wrap each generator in a tf.data pipeline: float32 images of shape
# (270, 480, 3) paired with float32 targets of length 48, batched, with one
# batch prefetched.
train_dataset = (
    tf.data.Dataset.from_generator(
        trainGenerator,
        (tf.float32, tf.float32),
        (tf.TensorShape([270,480,3]), tf.TensorShape([48])),
    )
    .batch(batch_size)
    .prefetch(1)
)
valid_dataset = (
    tf.data.Dataset.from_generator(
        validGenerator,
        (tf.float32, tf.float32),
        (tf.TensorShape([270,480,3]), tf.TensorShape([48])),
    )
    .batch(batch_size)
    .prefetch(1)
)

### Pre-trained Model

In [None]:
# tensorflow.keras.applications 사용 예시
from tensorflow.keras import models
from tensorflow.keras.applications import ResNet152V2

# Stop training after 2 epochs without improvement in the monitored metric.
earlystop = EarlyStopping(patience=2)
# learning_rate_reduction = ReduceLROnPlateau(
#                         monitor= "val_loss", 
#                         patience = 2, 
#                         factor = 0.85, 
#                         min_lr = 1e-7,
#                         verbose = 1)

# Checkpoint to ./resnet152.h5; with save_best_only=True the file is only
# overwritten when val_loss improves.
model_check = ModelCheckpoint(
    filepath="./resnet152.h5",
    monitor='val_loss',
    save_best_only=True,
)

callbacks = [earlystop, model_check]



# https://paperswithcode.com/paper/evopose2d-pushing-the-boundaries-of-2d-human
# https://arxiv.org/pdf/2011.08446v1.pdf
# https://github.com/wmcnally/evopose2d
# https://www.tensorflow.org/api_docs/python/tf/keras/applications

In [None]:
# ResNet152V2 backbone with ImageNet weights and no classification head,
# taking 270x480 RGB input.
resnet152 = ResNet152V2(weights ='imagenet', include_top = False, input_shape = (270,480,3))

# Mark every backbone layer trainable so the whole network is fine-tuned.
for layer in resnet152.layers:
    layer.trainable = True

# Regression head: two conv/pool stages on top of the backbone features, then
# dense layers ending in 48 linear outputs (one per keypoint coordinate).
model = Sequential([
    resnet152,
    BatchNormalization(),
    MaxPooling2D(pool_size=(2, 2)),

    Convolution2D(2048, (3,3), padding='same', use_bias=False),
    LeakyReLU(alpha = 0.1),
    BatchNormalization(),
    MaxPooling2D(pool_size=(2, 2)),

    Convolution2D(4096, (3,3), padding='same', use_bias=False),
    LeakyReLU(alpha = 0.1),
    BatchNormalization(),
    MaxPooling2D(pool_size=(2, 2)),

    Flatten(),
    Dense(1024, activation='relu'),
    BatchNormalization(),
    Dropout(0.1),
    Dense(256, activation='relu'),
    BatchNormalization(),
    Dropout(0.1),
    Dense(48),
])

# summary() prints the table itself and returns None, so it is called directly
# rather than wrapped in print(), which added a stray "None" line.
model.summary()

In [None]:
# Train with Adam on mean squared error; mean absolute error is reported as a metric.
model.compile(
    optimizer='adam',
    loss='mean_squared_error',
    metrics=['mae'],
)

In [None]:
# Optional: start from previously saved weights.
# model.load_weights('resnet152.h5')
history = model.fit(
    train_dataset,
    validation_data=valid_dataset,
    epochs=100,
    callbacks=callbacks,
)

### Load TestSet & Predict

In [None]:
# Load every test image with the same preprocessing as the training images
# (resize to 270x480, divide by 255) and stack them into one tensor.
test_paths = sorted(glob.glob('./data/test_imgs/*.jpg'))

X_test = []
for test_path in tqdm(test_paths):
    raw = tf.io.read_file(test_path)
    decoded = tf.image.decode_jpeg(raw, channels=3)
    X_test.append(tf.image.resize(decoded, [270,480]) / 255)

X_test = tf.stack(X_test, axis=0)
X_test.shape

In [None]:
# Load the weights stored in resnet152.h5, then predict keypoints for every test image.
model.load_weights('resnet152.h5')
pred = model.predict(X_test)

### Submission

In [None]:
# Fill every column after the first of the sample submission with the predictions.
submission = pd.read_csv('./data/sample_submission.csv')
submission.iloc[:,1:] = pred * 4     # predictions are at 480x270 (downscaled from 1920x1080), so multiply by 4
# submission

In [None]:
# Write the submission file without the DataFrame index column.
submission.to_csv('resnet152.csv', index=False)

### 예측 결과 시각화

In [None]:
# Visualize the predicted keypoints for one randomly chosen test image.
# randrange(len(...)) always gives a valid index; the previous
# random.randint(0, 1600) includes 1600 itself and hard-coded the test-set
# size, so it could index past the end of test_paths.
n = random.randrange(len(test_paths))
predicted_keypoint = np.array(submission.iloc[n,1:49])
img = Image.open(test_paths[n])
plt.imshow(img)
plt.scatter(predicted_keypoint[0::2], predicted_keypoint[1::2], marker='x')

In [None]:
# https://dacon.io/competitions/official/235701/codeshare/2383?page=1&dtype=recent&ptype=pub
# https://www.kaggle.com/gauravrajpal/facial-keypoint-detection-vgg16