In [2]:
from google.colab import drive
drive.mount('/content/drive')
from google.colab import auth
auth.authenticate_user()
import gspread
from google.auth import default
creds, _ = default()

gc = gspread.authorize(creds)

Mounted at /content/drive


In [3]:
main_path = '/content/drive/MyDrive/MyProject/videos/videos/' 
my_path = '/content/drive/MyDrive/MyProject/'
move_seq_path = my_path + 'MoveSeqs/'
holds_seq_path = my_path + 'HoldsSeqs/'

In [4]:
from keras.models import Model
from keras.layers import Input, LSTM, Dense

import numpy as np
import pandas as pd
import json

import sklearn as sk
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical
from random import randint
from numpy import array

import matplotlib.pyplot as plt

Get the pose estimation and move sequence to test the RNN on

In [5]:
worksheet = pd.read_csv(my_path + 'videos.csv')
worksheet = worksheet.iloc[:,1:]
worksheet

Unnamed: 0,0
0,IMG_1521.MOV_STAB.MOV
1,IMG_1522.MOV_STAB.MOV
2,IMG_1515.MOV_STAB.MOV
3,IMG_1513.MOV_STAB.MOV
4,IMG_1544.MOV_STAB.MOV
...,...
172,IMG_4054.MOV_STAB.MOV
173,IMG_6820.MOV_STAB.MOV
174,IMG_6821.MOV_STAB.MOV
175,IMG_6824.MOV_STAB.MOV


In [6]:
import itertools

def convert_df_into_array(df):
    """Concatenates all the rows of a dataframe into a big list of strings.
    WARNING: the column names are not registered, so the order has to be implicitly respected"""
    df_list = []

    for i in range(df.shape[0]):
        df_list.append(list(df.iloc[i]))

    df_list = list(itertools.chain.from_iterable(df_list))
    
    return np.array(df_list)

In [7]:
def pad_seq(move_seq, max_size = 100):
    if move_seq.shape[0] > max_size:
        print("Error")
        return 0
    last = np.repeat(['X'], repeats = max_size - move_seq.shape[0], axis = 0)
    
    move_seq = np.hstack([move_seq, last])

    return move_seq

In [8]:
def standardize_df(df, nb_decimals=2):
    return df.round(nb_decimals)

Prepare encoders for the holds sequence and the move sequence

In [9]:
nb_decimals = 1

def generate_encoders(nb_decimals=2):
    extrs = ['left_hand', 'right_hand', 'left_foot', 'right_foot']
    range = 2
    coords = np.round(np.linspace(-range,range,2*range*(1+10**nb_decimals)), decimals=nb_decimals)
    coords = [f'{a}_{b}' for a in coords for b in coords]

    # move_seq_classes = np.hstack([extrs, coords, ['0']])
    # holds_seq_classes = np.hstack([['X'], coords, ['0']])

    move_seq_classes = [f'{e}_{c}' for e in extrs for c in coords]
    move_seq_classes = np.hstack([move_seq_classes, ['0'],['X']])
    holds_seq_classes = np.hstack([['X'], coords, ['0']])

    voc_holds = len(holds_seq_classes)
    voc_move = len(move_seq_classes)

    encode_move_seq = LabelEncoder()
    encode_move_seq.fit(move_seq_classes)
    encode_holds_seq = LabelEncoder()
    encode_holds_seq.fit(holds_seq_classes)
    
    return encode_move_seq, encode_holds_seq, voc_holds, voc_move
    

encode_move_seq, encode_holds_seq, voc_holds, voc_move = generate_encoders(nb_decimals)
print(voc_holds, voc_move)


1938 7746


In [10]:
def format_seqs(holds_seq, move_seq):

  H = [f'{holds_seq[i]}_{holds_seq[i+1]}' for i in range(0,holds_seq.shape[0],2)]
  

  M = [f'{move_seq[i+2]}_{move_seq[i]}_{move_seq[i+1]}' for i in range(0,move_seq.shape[0],3)]

  return np.array(H), np.array(M)

In [11]:
def get_dataset(sheet, cardinality=1000, train_size=100, n_input = 200):
    global encode_pose_ests
    global encode_pose_ests
    X1, X2, y = list(), list(), list()
    for i in range(0, train_size):
      video = sheet.iloc[i,0]
      # print(video)
      try:
        holds_seq = pd.read_csv(holds_seq_path + video + '_HOLDS_SEQ.csv', dtype=float)
        move_seq = pd.read_csv(move_seq_path + video + '_MOVE_SEQ.csv')
      except FileNotFoundError:
        continue

      if(move_seq.shape[0] > 4 and holds_seq.shape[0] > 4):

        try:
          move_seq = standardize_df(move_seq.iloc[:,2:], nb_decimals)
          target = convert_df_into_array(move_seq)

          holds_seq = convert_df_into_array(standardize_df(holds_seq.iloc[:,1:], nb_decimals))

          # Format the sequences
          holds_seq, target = format_seqs(holds_seq, target)

          # Pad the sequences
          source = pad_seq(holds_seq)
          target = pad_seq(target, n_input)


          # create padded input target sequence
          target_in = np.concatenate(([0], target[:-1]), axis=0)

          # Encode all sequences into int
          source = encode_holds_seq.transform(source)
          target = encode_move_seq.transform(target)
          target_in = encode_move_seq.transform(target_in)
          # encode
          src_encoded = to_categorical([source], num_classes=voc_holds)
          tar_encoded = to_categorical([target], num_classes=voc_move)
          tar2_encoded = to_categorical([target_in], num_classes=voc_move)
          # store
          X1.append(src_encoded)
          X2.append(tar_encoded)
          y.append(tar2_encoded)

        except TypeError:
          continue

    X1 = np.squeeze(array(X1), axis=1) 
    X2 = np.squeeze(array(X2), axis=1) 
    y = np.squeeze(array(y), axis=1) 
    return X1, X2, y

# decode a one hot encoded string
def one_hot_decode(encoded_seq):
	return [np.argmax(vector) for vector in encoded_seq]

In [12]:
# generate training dataset
n_input = 30 * 2 # 30 holds with 2 coordinates
n_output = 30 * 2 # 30 moves with 2 coordinates
X1, X2, y = get_dataset(worksheet, train_size=150, n_input=n_input)
print(X1.shape,X2.shape,y.shape)

AxisError: ignored

In [None]:
# returns train, inference_encoder and inference_decoder models
def define_models(n_input, n_output, n_units):
	# define training encoder
	encoder_inputs = Input(shape=(None, n_input))
	encoder = LSTM(n_units, return_state=True)
	encoder_outputs, state_h, state_c = encoder(encoder_inputs)
	encoder_states = [state_h, state_c]
	# define training decoder
	decoder_inputs = Input(shape=(None, n_output))
	decoder_lstm = LSTM(n_units, return_sequences=True, return_state=True)
	decoder_outputs, _, _ = decoder_lstm(decoder_inputs, initial_state=encoder_states)
	decoder_dense = Dense(n_output, activation='softmax')
	decoder_outputs = decoder_dense(decoder_outputs)
	model = Model([encoder_inputs, decoder_inputs], decoder_outputs)
	# define inference encoder
	encoder_model = Model(encoder_inputs, encoder_states)
	# define inference decoder
	decoder_state_input_h = Input(shape=(n_units,))
	decoder_state_input_c = Input(shape=(n_units,))
	decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]
	decoder_outputs, state_h, state_c = decoder_lstm(decoder_inputs, initial_state=decoder_states_inputs)
	decoder_states = [state_h, state_c]
	decoder_outputs = decoder_dense(decoder_outputs)
	decoder_model = Model([decoder_inputs] + decoder_states_inputs, [decoder_outputs] + decoder_states)
	# return all models
	return model, encoder_model, decoder_model

In [None]:
n_features = 1000
n_units = 128
# define model
train, infenc, infdec = define_models(voc_holds, voc_move, n_units)
train.compile(optimizer='rmsprop', loss='categorical_crossentropy')

In [None]:
history = train.fit([X1, X2], y,
          batch_size=1,
          epochs=10,
          validation_split=0.2)

In [None]:
plt.plot(history.history['loss'], label='train')
plt.plot(history.history['val_loss'], label='test')
plt.legend()
plt.show()

In [None]:
# generate target given source sequence
def predict_sequence(infenc, infdec, source, n_steps, cardinality):
	# encode
	state = infenc.predict(source)
	# start of sequence input
	target_seq = array([0.0 for _ in range(cardinality)]).reshape(1, 1, cardinality)
	# collect predictions
	output = list()
	for t in range(n_steps):
		# predict next char
		yhat, h, c = infdec.predict([target_seq] + state)
		# store prediction
		output.append(yhat[0,0,:])
		# update state
		state = [h, c]
		# update target sequence
		target_seq = yhat
	return array(output)

In [None]:
def get_test_video(video, cardinality=1000):
    global encode_pose_ests
    global encode_pose_ests
    nb_decimals = 1
    X1, X2, y = list(), list(), list()
    print(video)
    holds_seq = pd.read_csv(holds_seq_path + video + '_HOLDS_SEQ.csv', dtype=float)
    move_seq = pd.read_csv(move_seq_path + video + '_MOVE_SEQ.csv')

    if(move_seq.shape[0] > 4 and holds_seq.shape[0] > 4):
        move_seq = standardize_df(move_seq.iloc[:,2:], nb_decimals)
        target = convert_df_into_array(move_seq)

        holds_seq = convert_df_into_array(standardize_df(holds_seq.iloc[:,1:], nb_decimals))

        # Format the sequences
        holds_seq, target = format_seqs(holds_seq, target)

        # Pad the sequences
        source = pad_seq(holds_seq)
        target = pad_seq(target, n_input)

        # create padded input target sequence
        target_in = np.concatenate(([0], target[:-1]), axis=0)

        # Encode all sequences into int
        source = encode_holds_seq.transform(source)
        target = encode_move_seq.transform(target)
        target_in = encode_move_seq.transform(target_in)
        # encode
        src_encoded = to_categorical([source], num_classes=voc_holds)
        tar_encoded = to_categorical([target], num_classes=voc_move)
        tar2_encoded = to_categorical([target_in], num_classes=voc_move)
        # store
        X1.append(src_encoded)
        X2.append(tar_encoded)
        y.append(tar2_encoded)

    X1 = np.squeeze(array(X1), axis=1) 
    X2 = np.squeeze(array(X2), axis=1) 
    y = np.squeeze(array(y), axis=1) 
    return X1, X2, y


In [None]:
video_test = worksheet.iloc[168,0]
X1_test, X2_test, y_test = get_test_video(video_test)
print(X1_test.shape, X2_test.shape, y_test.shape)

In [None]:
target = predict_sequence(infenc, infdec, X1_test, n_output, voc_move)

In [None]:
# Prediction
prediction = encode_move_seq.inverse_transform(one_hot_decode(target))
prediction[1:]

In [None]:
# True value

encode_move_seq.inverse_transform(one_hot_decode(y_test[0,1:]))

In [None]:
# def decode_sequence(input_seq):
#     # Encode the input as state vectors.
#     states_value = encoder_model.predict(input_seq)

#     # Generate empty target sequence of length 1.
#     target_seq = np.zeros((1, 1, num_decoder_tokens))
#     # Populate the first character of target sequence with the start character.
#     target_seq[0, 0, target_token_index['\t']] = 1.

#     # Sampling loop for a batch of sequences
#     # (to simplify, here we assume a batch of size 1).
#     stop_condition = False
#     decoded_sentence = ''
#     while not stop_condition:
#         output_tokens, h, c = decoder_model.predict(
#             [target_seq] + states_value)

#         # Sample a token
#         sampled_token_index = np.argmax(output_tokens[0, -1, :])
#         sampled_char = reverse_target_char_index[sampled_token_index]
#         decoded_sentence += sampled_char

#         # Exit condition: either hit max length
#         # or find stop character.
#         if (sampled_char == '\n' or
#            len(decoded_sentence) > max_decoder_seq_length):
#             stop_condition = True

#         # Update the target sequence (of length 1).
#         target_seq = np.zeros((1, 1, num_decoder_tokens))
#         target_seq[0, 0, sampled_token_index] = 1.
refer
#         # Update states
#         states_value = [h, c]

#     return decoded_sentence