In [None]:
import tensorflow as tf
import numpy as np
import os
import cv2
from tqdm.notebook import tqdm

In [None]:
# --- Filesystem locations -------------------------------------------------
path_to_train_dir = '/home/kpst/dataset/OTB'
path_to_test = '/home/kpst/PycharmProjects/yolov1/data/test/test1.npy'
# File / sub-folder names looked up inside every sequence folder.
name_label_file = 'groundtruth_rect.txt'
name_train_file = 'yolo_out'

# --- Data layout ----------------------------------------------------------
image_size = (480, 640)  # not read by the cells shown here
# Sequence length, box size (x, y, w, h) and width of the detector slot.
n_step, n_output, n_predict = 3, 4, 6
n_feature = 4096
# One input vector = feature part followed by the detector slot.
n_input = n_feature + n_predict
n_heatmap = 1024  # not read by the cells shown here

# --- Optimisation ---------------------------------------------------------
batch_size = 16
EPOCH = 200
learning_rate = 1e-5

# --- Mutable containers filled by later cells -------------------------------
train_data, labels = [], []
datas, name_data = [], []

In [None]:
def load_label(path_to_label, scale):
    """Read a ground-truth box file and return normalized, center-based boxes.

    Every non-blank line is read as four numbers ``x, y, w, h``. Fields may be
    separated by commas, tabs or spaces (optionally mixed with extra
    whitespace), and may be integers or decimals. Each box is converted to
    ``[x + w/2, y + h/2, w, h]`` and divided by the image size taken from
    ``scale``.

    Args:
        path_to_label: Path of the text file to read.
        scale: Indexable image shape; ``scale[0]`` divides the vertical
            values (y, h) and ``scale[1]`` divides the horizontal ones (x, w).

    Returns:
        A list with one ``[cx, cy, w, h]`` list of floats per non-blank line.

    Raises:
        ValueError: If a non-blank line has fewer than four fields or a field
            is not a number.
    """
    import re

    labels = []
    with open(path_to_label, 'r') as fr:
        for line_no, raw_line in enumerate(fr, start=1):
            line = raw_line.strip()
            if not line:
                # Blank / whitespace-only lines (e.g. a trailing newline).
                continue

            # Accept ',', '\t' and ' ' as separators in a single pass.
            fields = [field for field in re.split(r'[,\s]+', line) if field]
            if len(fields) < 4:
                raise ValueError(
                    '{}:{}: expected 4 values, got {!r}'.format(path_to_label, line_no, line))

            x, y, w, h = (float(field) for field in fields[:4])
            labels.append([
                (x + w / 2) / scale[1],
                (y + h / 2) / scale[0],
                w / scale[1],
                h / scale[0],
            ])
    return labels

In [None]:
# Load every sequence folder under `path_to_train_dir` into `datas` as a
# (features, labels, names) triple and count the total number of samples.
# Both accumulators are reset here so that re-running this cell does not
# append the same sequences a second time.
datas = []
data_len = 0
for folder in sorted(os.listdir(path_to_train_dir)):
    base_path = os.path.join(path_to_train_dir, folder)
    if not os.path.isdir(base_path):
        # Stray files next to the sequence folders are not sequences.
        continue
    path_to_label = os.path.join(base_path, name_label_file)
    path_to_train = os.path.join(base_path, name_train_file)
    path_to_img   = os.path.join(base_path, 'img')

    # The shape of one image of the sequence is used to normalize its labels.
    first_image = os.path.join(path_to_img, os.listdir(path_to_img)[0])
    frame = cv2.imread(first_image)
    if frame is None:
        # cv2.imread returns None instead of raising when it cannot decode.
        print(folder, 'unreadable image:', first_image)
        continue
    scale = frame.shape

    data_label = load_label(path_to_label, scale)
    train      = os.listdir(path_to_train)
    # Skip sequences whose label count does not match the feature count, and
    # empty sequences (np.stack below cannot stack an empty list).
    if len(data_label) != len(train) or len(train) == 0:
        print(folder, len(data_label), len(train))
        continue

    data_feature = []
    data_name = []
    for sample in sorted(train):
        feature = np.load(os.path.join(path_to_train, sample))
        # Zero two columns of the stored vector before training.
        # NOTE(review): 4096 and 4101 look like the first and last entry of
        # the 6-value detector slot (n_feature .. n_input-1) — confirm.
        feature[0][4096] = 0
        feature[0][4101] = 0
        data_feature.append(feature)
        # Feature file name joined onto the image folder; get_path_image()
        # later swaps the extension for '.jpg'.
        data_name.append(os.path.join(path_to_img, sample))

    data_len    += len(data_name)
    data_name    = np.stack(data_name)
    data_label   = np.stack(data_label)
    data_feature = np.stack(data_feature)
    datas.append((data_feature, data_label, data_name))
data_len

In [None]:
# How many (batch_size x n_step)-sample batches fit into all loaded samples.
STEP_PER_EPOCH = data_len // (n_step * batch_size)
# 90 % of that count, truncated to an integer; shown as the cell output.
TEST = int(0.9 * STEP_PER_EPOCH)
TEST

In [None]:
# Cut every sequence into arrays shaped
#   features (n_batches, batch_size, n_step, n_input)
#   labels   (n_batches, batch_size, n_step, n_output)
#   names    (n_batches, batch_size, n_step)
# NOTE(review): the last entry of `datas` is never batched (datas[:-1]) —
# confirm that this is intended.
batch_data = []
samples_per_batch = batch_size * n_step
for data in datas[:-1]:
    feat, label, name = data
    n_samples = feat.shape[0]
    if n_samples < samples_per_batch:
        # Too short to fill even one batch: the tail slice below would start
        # at a negative index and the reshape would fail.
        print('skipping sequence with', n_samples, 'samples; need at least', samples_per_batch)
        continue

    end = (n_samples // samples_per_batch) * samples_per_batch
    if end != n_samples:
        # Keep the remainder by appending the final `samples_per_batch`
        # samples as one extra batch (it overlaps the previous batch).
        feat  = np.concatenate((feat[:end],  feat[-samples_per_batch:]),  axis=0)
        label = np.concatenate((label[:end], label[-samples_per_batch:]), axis=0)
        name  = np.concatenate((name[:end],  name[-samples_per_batch:]),  axis=0)

    feat_bs  = np.reshape(feat,  (-1, batch_size, n_step, n_input))
    label_bs = np.reshape(label, (-1, batch_size, n_step, n_output))
    name_bs  = np.reshape(name,  (-1, batch_size, n_step))
    batch_data.append((feat_bs, label_bs, name_bs))

In [None]:
# Hold out the last batched sequence for evaluation; train on all the others.
test_data = batch_data[-1]
train_data = batch_data[:-1]

In [None]:
from tensorflow.keras.layers import LSTM, Dense

# Sequence regressor: two stacked LSTMs over the n_step inputs, a per-step
# Dense projection, then a fully connected head that emits one box of
# n_output values squashed into [0, 1] by a sigmoid.
X_in = tf.keras.layers.Input((n_step, n_input))
X = LSTM(512, return_sequences=True)(X_in)
X = LSTM(512, return_sequences=True)(X)
X = tf.keras.layers.TimeDistributed(Dense(256))(X)
X = tf.keras.layers.Flatten()(X)
# ReLU is applied after flattening the per-step projections.
X = tf.keras.layers.Activation('relu')(X)
X = Dense(256, activation='relu')(X)
X = tf.keras.layers.Dropout(0.3)(X)
X = Dense(128, activation='relu')(X)
X = Dense(n_output, activation='sigmoid')(X)
model = tf.keras.models.Model(X_in, X)
# Use the configured learning rate instead of repeating the literal here.
model.compile(loss='mse', optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate))
model.summary()

In [None]:
# Manual training loop with a step-wise learning-rate schedule: when the
# mean epoch loss has not improved for 15 epochs, the learning rate is
# divided by 10.
orgi_loss = float('inf')   # best (lowest) mean epoch loss seen so far
down = 0                   # number of epochs since the last improvement
lr = learning_rate
for epoch in range(EPOCH):
    print('Epoch {}:'.format(epoch+1))
    step = 0
    epoch_losses = []
    for feature, label, _ in train_data:
        for idx in range(label.shape[0]):
            batch_x = feature[idx]
            # Target = box of the last time step of every sequence in the batch.
            batch_y = label[idx, :, n_step - 1]
            epoch_losses.append(float(model.train_on_batch(batch_x, batch_y)))

            step += 1
            if step%5 == 0:
                print('.', end="")

    if not epoch_losses:
        raise ValueError('train_data contains no batches; nothing to train on')

    # Judge the epoch by its mean loss; the loss of the final mini-batch
    # alone is too noisy to drive the learning-rate schedule.
    loss = sum(epoch_losses) / len(epoch_losses)
    if orgi_loss >= loss:
        orgi_loss = loss
        down = 0
    else:
        down += 1
    if down == 15:
        down = 0
        print('\n#### learning rate drop #####')
        lr /= 10
        model.optimizer.learning_rate.assign(lr)
    print("\tLoss: ", loss)

In [None]:
# Display the optimizer's current learning-rate variable; the training cell
# reassigns it on every learning-rate drop.
model.optimizer.learning_rate

In [None]:
# Write the model's current weights to "model_new1.h5".
# NOTE(review): the following cell loads 'model_new.h5', a different file
# name — confirm which checkpoint is meant to be evaluated.
model.save_weights("model_new1.h5")

In [None]:
# Replace the model's weights with those stored in 'model_new.h5'.
# NOTE(review): the previous cell saves to "model_new1.h5", so this does not
# reload what was just trained — confirm the file name.
model.load_weights('model_new.h5')

In [None]:
# Predict on the stand-alone test file.
x_test = np.load(path_to_test)
# The model input is (n_step, n_input); reshaping to n_feature (4096) gives
# a last dimension the model rejects.
x_test = np.reshape(x_test, (-1, n_step, n_input))
# Apply the same preprocessing as the training features, which have
# columns 4096 and 4101 zeroed when they are loaded.
# NOTE(review): assumes test1.npy uses the same vector layout as the
# per-frame training files — confirm.
x_test[..., 4096] = 0
x_test[..., 4101] = 0
pred = model.predict(x_test)
pred

In [None]:
# test_data is one (features, labels, names) tuple, so test_data[0][0] is
# the feature array of its first batch.
pred = model.predict(test_data[0][0])
pred

In [None]:
def save_video(frames, size, path='video/walk-1.avi', fps=20):
    """Write ``frames`` to an MJPG-encoded video file.

    Args:
        frames: Iterable of images passed one by one to ``VideoWriter.write``.
        size: Frame size handed to ``cv2.VideoWriter`` (the caller in this
            notebook passes ``(width, height)``).
        path: Output file; its parent directory is created when missing.
        fps: Frame rate of the written video.

    Raises:
        IOError: If the video writer cannot be opened for ``path``.
    """
    out_dir = os.path.dirname(path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    out = cv2.VideoWriter(path, cv2.VideoWriter_fourcc(*'MJPG'), fps, size)
    # Check explicitly: an unopened writer would otherwise drop every frame
    # without reporting anything.
    if not out.isOpened():
        out.release()
        raise IOError('could not open video writer for {!r}'.format(path))
    try:
        for img in frames:
            out.write(img)
    finally:
        # Release even when a write fails so the file handle is not leaked.
        out.release()

In [None]:
def get_path_image(test_name):
    """Map a feature-file path to the matching ``.jpg`` image path.

    Only the file name is cut at its first dot; the directory part is kept
    untouched, so dots in folder names (``./img``, ``otb.v1``) no longer
    truncate the path.

    Args:
        test_name: Path string such as ``'.../img/0001.npy'``.

    Returns:
        The same path with everything after the first dot of the file name
        replaced by ``'.jpg'``.
    """
    filename = os.path.basename(test_name)
    # Slice rather than re-join so the directory prefix is kept verbatim.
    directory_prefix = test_name[:len(test_name) - len(filename)]
    return directory_prefix + filename.split('.')[0] + '.jpg'

In [None]:
def convert_coord(pred, scale):
    """Turn a normalized center box into an integer pixel box.

    Args:
        pred: Four values ``(cx, cy, w, h)``.
        scale: Indexable image shape; ``scale[0]`` multiplies the vertical
            values and ``scale[1]`` the horizontal ones.

    Returns:
        ``(x, y, w, h)`` as ints, where ``x`` / ``y`` are the center shifted
        back by the floor of half the scaled width / height.
    """
    center_x, center_y, box_w, box_h = pred

    # Scale to pixels.
    center_x = center_x * scale[1]
    center_y = center_y * scale[0]
    box_w = box_w * scale[1]
    box_h = box_h * scale[0]

    # Move from the center to the corner, then truncate to int.
    corner_x = int(center_x - box_w // 2)
    corner_y = int(center_y - box_h // 2)
    return corner_x, corner_y, int(box_w), int(box_h)

In [None]:
import matplotlib.pyplot as plt


def draw_bbox(img, X1, X2, X3):
    """Draw three boxes and a matching colour legend on ``img``.

    Args:
        img: Image array; it is drawn on in place and also returned.
        X1: ``(x, y, w, h)`` box drawn in ``color[0]``, legend entry 'rolo'.
        X2: ``(x, y, w, h)`` box drawn in ``color[1]``, legend entry
            'ground truth'.
        X3: ``(x, y, w, h)`` box drawn in ``color[2]``, legend entry 'yolo'.

    Returns:
        The same ``img`` object with the rectangles and legend drawn on it.
    """
    color = [(255, 0, 0), (0, 255, 0), (0, 0, 255)]
    legend = ('rolo', 'ground truth', 'yolo')

    for idx, (x, y, w, h) in enumerate((X1, X2, X3)):
        cv2.rectangle(img, (x, y), (x+w, y+h), color[idx], 2)

    # Draw the legend once, after the boxes so it stays on top. putText
    # anchors text at its bottom-left corner, so the rows are spaced 15 px
    # apart starting at y=15 to keep the first one inside the image.
    for idx, text in enumerate(legend):
        cv2.putText(img, text, (5, 15 * (idx + 1)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color[idx], 1)
    return img

In [None]:
def calculate_dist(X1, X2):
    """Euclidean distance between the centers of two ``(x, y, w, h)`` boxes.

    Each center is ``(x + w // 2, y + h // 2)``, i.e. the corner shifted by
    the floor of half the size.
    """
    centers = []
    for left, top, width, height in (X1, X2):
        centers.append(np.array((left + width // 2, top + height // 2)))
    first_center, second_center = centers
    return np.linalg.norm(first_center - second_center)

In [None]:
def print_result(batch, n):
    """Predict one batch, print and draw the boxes, and measure center error.

    For every sequence of batch ``batch`` in ``batch_data[n]`` the model
    prediction, the label box and the detector box stored in the input
    vector are converted to pixel boxes for the image of time-step index 2,
    printed, and drawn onto that image.

    Args:
        batch: Index of the batch inside ``batch_data[n]``.
        n: Index into the global ``batch_data`` list.

    Returns:
        ``(dists, video)``: the center distances between prediction and
        label, and the annotated images, one entry per sequence.

    Raises:
        FileNotFoundError: If an image cannot be read.
    """
    feats, boxes, names = batch_data[n]
    batch_feat, batch_box, batch_name = feats[batch], boxes[batch], names[batch]

    pred = model.predict(batch_feat)
    dists = []
    video = []
    for idx, p in enumerate(pred):
        # Index 2 is the third time step (the last one when n_step == 3).
        path = get_path_image(batch_name[idx][2])
        img = cv2.imread(path)
        if img is None:
            # cv2.imread returns None instead of raising on a bad path.
            raise FileNotFoundError('cannot read image: {}'.format(path))
        scale = img.shape

        box_pred = convert_coord(p, scale)
        box_true = convert_coord(batch_box[idx][2], scale)
        # NOTE(review): columns 4097:4101 of the input vector are treated
        # as the detector's (cx, cy, w, h) — confirm against the extractor.
        box_yolo = convert_coord(batch_feat[idx][2][4097:4101], scale)

        dists.append(calculate_dist(box_pred, box_true))
        video.append(draw_bbox(img, box_pred, box_true, box_yolo))
        print("Predict: ", *box_pred)
        print("True:    ", *box_true)
        print("Yolo:    ", *box_yolo)
    return dists, video

In [None]:
# Show predictions for batch 4 of the last entry of batch_data (n=-1), which
# is the entry assigned to test_data.
print_result(4, -1)

In [None]:
# Run print_result over every batch of the held-out sequence and collect
# all center distances and annotated frames in order.
dists = []
video = []
for i in range(len(test_data[0])):
    dis, vid = print_result(i, -1)
    dists.extend(dis)
    video.extend(vid)

In [None]:
# The first two entries of a frame's shape, reversed, give the size tuple
# that is handed to save_video.
size = tuple(reversed(video[0].shape[:2]))
save_video(video, size)

In [None]:
# Mean center distance between prediction and label over all collected
# samples (unweighted average).
dists = np.asarray(dists)
dists.mean()

In [None]:
def iou(box1, box2):
    """Intersection over union of two center-format boxes.

    Args:
        box1: ``(cx, cy, w, h)``; indices 0/1 are the center, 2/3 the size.
        box2: Second box in the same format.

    Returns:
        The ratio intersection / union, or ``0.0`` when the union has no
        area (e.g. both boxes are degenerate), instead of dividing by zero.
    """
    # Overlap along axis 0 (index 0 +/- half of index 2).
    overlap_w = (min(box1[0] + 0.5 * box1[2], box2[0] + 0.5 * box2[2])
                 - max(box1[0] - 0.5 * box1[2], box2[0] - 0.5 * box2[2]))
    # Overlap along axis 1 (index 1 +/- half of index 3).
    overlap_h = (min(box1[1] + 0.5 * box1[3], box2[1] + 0.5 * box2[3])
                 - max(box1[1] - 0.5 * box1[3], box2[1] - 0.5 * box2[3]))

    if overlap_w < 0 or overlap_h < 0:
        intersection = 0
    else:
        intersection = overlap_w * overlap_h

    union = box1[2] * box1[3] + box2[2] * box2[3] - intersection
    if union <= 0:
        return 0.0
    return intersection / union