In [None]:
import cv2
import glob
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import random
import shutil
import tensorflow as tf

from PIL import Image
from tqdm import tqdm
from tensorflow.keras.layers import Conv2D, MaxPooling2D, BatchNormalization, concatenate, Input, Flatten, Dense
from tensorflow.keras import Model

In [None]:
os.chdir('/content/drive/MyDrive/Colab/Dacon/Motion_Keypoint/data') 

In [None]:
train = pd.read_csv('./train.csv')
valid = pd.read_csv('./valid.csv')
display(train.head(2))
print()
display(valid.head(2))

In [None]:
# train.isnull().sum()

In [None]:
train_paths = glob.glob('./train/*.jpg')
valid_paths = glob.glob('./valid/*.jpg')
test_paths = glob.glob('./test_imgs/*.jpg')
print(len(train_paths), len(valid_paths), len(test_paths))

In [None]:
train_paths.sort()
valid_paths.sort()

### Train, Valid Split

In [None]:
# root_dir = '/content/drive/My Drive/Colab/Dacon/Motion_Keypoint/data'

# os.makedirs(root_dir +'/train')
# os.makedirs(root_dir +'/val')

src = "train_imgs"
all_filename = os.listdir(src)
valid_filename = random.sample(all_filename, int(len(train_all) * 0.1))
train_filename = [x for x in all_filename if x not in valid_filename]

print(len(train_filename), len(valid_filename))

train_filename = [src+'/'+ name for name in train_filename]
valid_filename = [src+'/' + name for name in valid_filename]

print('Total images: ', len(all_filename))
print('Training: ', len(train_filename))
print('Validation: ', len(valid_filename))

# Copy-pasting images
for name in tqdm(train_filename):
    shutil.copy(name, 'train')

for name in tqdm(valid_filename):
    shutil.copy(name, 'val')

In [None]:
train_filename = []
valid_filename = []

for t_paths in tqdm(train_paths):
    filename = t_paths.split('/')[-1]
    train_filename.append(filename)

for v_paths in tqdm(valid_paths):
    filename = v_paths.split('/')[-1]
    valid_filename.append(filename)

In [None]:
train_df = train[train['image'].isin(train_filename)]
train_df.reset_index(inplace=True, drop=True)

valid_df = train[train['image'].isin(valid_filename)]
valid_df.reset_index(inplace=True, drop=True)

train_df.to_csv('train.csv', index=False)
valid_df.to_csv('valid.csv', index=False)

### 시각화

In [None]:
plt.figure(figsize=(40,20))
count=1

for i in np.random.randint(0,len(train_paths),5):
    
    plt.subplot(5,1, count)
    
    img_sample_path = train_paths[i]
    img = Image.open(img_sample_path)
    img_np = np.array(img)

    keypoint = train.iloc[:,1:49] #위치 키포인트 하나씩 확인
    keypoint_sample = keypoint.iloc[i, :]
    
    for j in range(0,len(keypoint.columns),2):
        plt.plot(keypoint_sample[j], keypoint_sample[j+1],'rx')
        plt.imshow(img_np)
    
    count += 1

In [None]:
train['path'] = train_paths
# train
valid['path'] = valid_paths
print(len(train['path']))
print(len(valid['path']))

### Augmentation

In [None]:
def left_right_flip(images, keypoints):
    flipped_keypoints = []
    flipped_images = np.flip(images, axis=1)
    for idx, sample_keypoints in enumerate(keypoints):
        if idx%2 == 0:
            flipped_keypoints.append(480.-sample_keypoints)
        else:
            flipped_keypoints.append(sample_keypoints)
    return flipped_images, flipped_keypoints

In [None]:
def trainGenerator():
    # 원본 이미지 resize
    for i in range(len(train)):
        img = tf.io.read_file(train['path'][i]) # path(경로)를 통해 이미지 읽기
        img = tf.image.decode_jpeg(img, channels=3) # 경로를 통해 불러온 이미지를 tensor로 변환
        img = tf.image.resize(img, [270,480]) # 이미지 resize 
        img = img/255
        target = train.iloc[:,1:49].iloc[i,:] # keypoint 뽑아주기
        target = target/4

        yield (img, target)
    
    # horizontal flip
    for i in range(len(train)):
        img = tf.io.read_file(train['path'][i]) # path(경로)를 통해 이미지 읽기
        img = tf.image.decode_jpeg(img, channels=3) # 경로를 통해 불러온 이미지를 tensor로 변환
        img = tf.image.resize(img, [270,480]) # 이미지 resize 
        img = img/255
        target = train.iloc[:,1:49].iloc[i,:] # keypoint 뽑아주기
        target = target/4
        img, target = left_right_flip(img, target)
        
        yield (img, target)

In [None]:
def validGenerator():
    # 원본 이미지 resize
    for i in range(len(valid)):
        img = tf.io.read_file(valid['path'][i]) # path(경로)를 통해 이미지 읽기
        img = tf.image.decode_jpeg(img, channels=3) # 경로를 통해 불러온 이미지를 tensor로 변환
        img = tf.image.resize(img, [270,480]) # 이미지 resize 
        img = img/255
        target = valid.iloc[:,1:49].iloc[i,:] # keypoint 뽑아주기
        target = target/4

        yield (img, target)
    
    # horizontal flip
    for i in range(len(valid)):
        img = tf.io.read_file(valid['path'][i]) # path(경로)를 통해 이미지 읽기
        img = tf.image.decode_jpeg(img, channels=3) # 경로를 통해 불러온 이미지를 tensor로 변환
        img = tf.image.resize(img, [270,480]) # 이미지 resize 
        img = img/255
        target = valid.iloc[:,1:49].iloc[i,:] # keypoint 뽑아주기
        target = target/4
        img, target = left_right_flip(img, target)
        
        yield (img, target)

In [None]:
train_dataset = tf.data.Dataset.from_generator(trainGenerator, (tf.float32, tf.float32), (tf.TensorShape([270,480,3]),tf.TensorShape([48])))
train_dataset = train_dataset.batch(32).prefetch(1)
valid_dataset = tf.data.Dataset.from_generator(validGenerator, (tf.float32, tf.float32), (tf.TensorShape([270,480,3]),tf.TensorShape([48])))
valid_dataset = valid_dataset.batch(32).prefetch(1)

In [None]:
a = list(train_dataset)[0][0]
t = list(train_dataset)[0][1]

In [None]:
plt.imshow(a)
plt.scatter(t[0::2], t[1::2], marker='x')

In [None]:
horizontal_flip = True
rotation_augmentation = True
brightness_augmentation = True
shift_augmentation = True
random_noise_augmentation = True

sample_image_index = 20    # Index of sample train image used for visualizing various augmentations

rotation_angles = [12]    # Rotation angle in degrees (includes both clockwise & anti-clockwise rotations)
pixel_shifts = [12]    # Horizontal & vertical shift amount in pixels (includes shift from all 4 corners)

NUM_EPOCHS = 80
BATCH_SIZE = 64

In [None]:
from keras.layers.advanced_activations import LeakyReLU
from keras.models import Sequential, Model
from keras.layers import Activation, Convolution2D, MaxPooling2D, BatchNormalization, Flatten, Dense, Dropout, Conv2D,MaxPool2D, ZeroPadding2D

In [None]:
model = Sequential()

# Input dimensions: (None, 96, 96, 1)
model.add(Convolution2D(32, (3,3), padding='same', use_bias=False, input_shape=(270,480,3)))
model.add(LeakyReLU(alpha = 0.1))
model.add(BatchNormalization())
# Input dimensions: (None, 96, 96, 32)
model.add(Convolution2D(32, (3,3), padding='same', use_bias=False))
model.add(LeakyReLU(alpha = 0.1))
model.add(BatchNormalization())
model.add(MaxPool2D(pool_size=(2, 2)))

# Input dimensions: (None, 48, 48, 32)
model.add(Convolution2D(64, (3,3), padding='same', use_bias=False))
model.add(LeakyReLU(alpha = 0.1))
model.add(BatchNormalization())
# Input dimensions: (None, 48, 48, 64)
model.add(Convolution2D(64, (3,3), padding='same', use_bias=False))
model.add(LeakyReLU(alpha = 0.1))
model.add(BatchNormalization())
model.add(MaxPool2D(pool_size=(2, 2)))

# Input dimensions: (None, 24, 24, 64)
model.add(Convolution2D(96, (3,3), padding='same', use_bias=False))
model.add(LeakyReLU(alpha = 0.1))
model.add(BatchNormalization())
# Input dimensions: (None, 24, 24, 96)
model.add(Convolution2D(96, (3,3), padding='same', use_bias=False))
model.add(LeakyReLU(alpha = 0.1))
model.add(BatchNormalization())
model.add(MaxPool2D(pool_size=(2, 2)))

# Input dimensions: (None, 12, 12, 96)
model.add(Convolution2D(128, (3,3),padding='same', use_bias=False))
model.add(LeakyReLU(alpha = 0.1))
model.add(BatchNormalization())
# Input dimensions: (None, 12, 12, 128)
model.add(Convolution2D(128, (3,3),padding='same', use_bias=False))
model.add(LeakyReLU(alpha = 0.1))
model.add(BatchNormalization())
model.add(MaxPool2D(pool_size=(2, 2)))

# Input dimensions: (None, 6, 6, 128)
model.add(Convolution2D(256, (3,3),padding='same',use_bias=False))
model.add(LeakyReLU(alpha = 0.1))
model.add(BatchNormalization())
# Input dimensions: (None, 6, 6, 256)
model.add(Convolution2D(256, (3,3),padding='same',use_bias=False))
model.add(LeakyReLU(alpha = 0.1))
model.add(BatchNormalization())
model.add(MaxPool2D(pool_size=(2, 2)))

# Input dimensions: (None, 3, 3, 256)
model.add(Convolution2D(512, (3,3), padding='same', use_bias=False))
model.add(LeakyReLU(alpha = 0.1))
model.add(BatchNormalization())
# Input dimensions: (None, 3, 3, 512)
model.add(Convolution2D(512, (3,3), padding='same', use_bias=False))
model.add(LeakyReLU(alpha = 0.1))
model.add(BatchNormalization())

# Input dimensions: (None, 3, 3, 512)
model.add(Flatten())
model.add(Dense(512,activation='relu'))
model.add(Dropout(0.1))
model.add(Dense(48))
model.summary()

In [None]:
from tensorflow.keras.optimizers import Adam

model.compile(optimizer=Adam(learning_rate=0.001), 
              loss='mean_squared_error',
              metrics=['mae'])

In [None]:
history = model.fit(train_dataset,
                    epochs=5,
                    validation_data=valid_dataset,
                    verbose=1)

In [None]:
test_paths.sort()
X_test=[]

for test_path in tqdm(test_paths):
    img=tf.io.read_file(test_path)
    img=tf.image.decode_jpeg(img, channels=3)
    img=tf.image.resize(img, [270,480])
    img=img/255
    X_test.append(img)

In [None]:
X_test=tf.stack(X_test, axis=0)
X_test.shape

In [None]:
pred=model.predict(X_test)

In [None]:
submission = pd.read_csv('./sample_submission.csv')
submission.iloc[:,1:]=pred * 4
submission

In [None]:
test_paths[0]

In [None]:
bb = submission.iloc[1000,1:49]
bb = np.array(bb)
aa = Image.open(test_paths[1000])
plt.imshow(aa)
plt.scatter(bb[0::2], bb[1::2], marker='x')

In [None]:
submission.to_csv('baseline_submission.csv', index=False)