In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Flatten
import numpy as np
from tensorflow.keras.models import Model
from tensorflow.keras.layers import LSTM, Dense, Dropout, Input, Concatenate
import pandas as pd
import matplotlib.pyplot as plt
import os, glob
import pickle
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.metrics import confusion_matrix, classification_report
import itertools

In [None]:
def plot_confusion_matrix(cm, classes, normalize=False, name='lstm_conf'):
    """Draw a confusion matrix as an annotated heat map and save it as a PNG.

    Parameters
    ----------
    cm : array-like of shape (n_classes, n_classes)
        Confusion matrix. Rows are drawn on the "True label" axis and columns
        on the "Predicted label" axis.
    classes : sequence
        Tick labels, one per class, used on both axes.
    normalize : bool, default False
        If True, every row is divided by its own sum, so each cell shows the
        fraction of that row. Rows that sum to zero are left as all zeros.
    name : str, default 'lstm_conf'
        Printed to stdout and used as the file stem: the figure is written to
        ``f'{name}.png'`` relative to the current working directory.

    Returns
    -------
    None
        The figure is left open so that it is still rendered inline.
    """
    cm = np.asarray(cm)

    if normalize:
        row_totals = cm.sum(axis=1)[:, np.newaxis]
        # A class without any true samples has an all-zero row. Dividing by 0
        # would turn that row into NaN (and break the colour threshold below),
        # so divide by 1 instead and keep the row at zero.
        row_totals = np.where(row_totals == 0, 1, row_totals)
        cm = cm.astype('float') / row_totals
    print(name)
    plt.figure(figsize=(30, 30))
    plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title('Confusion matrix', size=20)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=90, fontsize=20)
    plt.yticks(tick_marks, classes, fontsize=20)

    # Counts are printed as integers, normalised values with two decimals.
    fmt = '.2f' if normalize else 'd'
    # Cells darker than half of the maximum get white text for contrast.
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black", fontsize=20)
    plt.ylabel('True label', fontsize=20)
    plt.xlabel('Predicted label', fontsize=20)
    # Compute the layout only once the axis labels exist; doing it earlier
    # leaves no room for them and they can be clipped in the saved image.
    plt.tight_layout()
    plt.savefig(f'{name}.png')


def multi_modal_seperate_network(output_shape, input_shape_pose, input_shape_lip):
    """Build a two-input LSTM classifier with one sub-network per modality.

    Each modality goes through its own stack
    ``Input -> LSTM(64, return_sequences=True) -> LSTM(64) -> Dropout(0.5)``.
    The two resulting feature vectors are concatenated and passed to a single
    softmax layer.

    Parameters
    ----------
    output_shape : int
        Number of units of the final softmax layer.
    input_shape_pose : tuple
        Shape of one hand/pose sample, passed to ``Input(shape=...)``.
    input_shape_lip : tuple
        Shape of one lip sample, passed to ``Input(shape=...)``.

    Returns
    -------
    Model
        Model taking ``[hand_pose_input, lip_input]``. It is returned
        uncompiled; the caller has to call ``compile`` on it.
    """

    def _lstm_branch(sample_shape):
        # One modality-specific encoder; returns its input tensor and features.
        branch_input = Input(shape=sample_shape)
        hidden = LSTM(64, return_sequences=True)(branch_input)
        hidden = LSTM(64)(hidden)
        return branch_input, Dropout(0.5)(hidden)

    # Build the hand/pose branch first, then the lip branch.
    pose_in, pose_features = _lstm_branch(input_shape_pose)
    lip_in, lip_features = _lstm_branch(input_shape_lip)

    # Late fusion: join both feature vectors and classify.
    fused = Concatenate()([pose_features, lip_features])
    class_probabilities = Dense(output_shape, activation='softmax')(fused)

    return Model(inputs=[pose_in, lip_in], outputs=class_probabilities)




In [1]:
### Dataset preparation

In [None]:
folder = 'NEW_MANUAL'

# Layout of one frame vector, as implied by the slicing further down:
#   [ first 33*4 values | 468*3 values, reshaped to (468, 3) | last 21*3*2 values ]
# NOTE(review): this looks like MediaPipe Holistic output (pose, face mesh,
# two hands) -- confirm against the script that produced the .npy files.
POSE_DIMS = 33 * 4
FACE_POINTS = 468
HAND_DIMS = 21 * 3 * 2

# --- Load every recording, grouped by sign (class) name --------------------
# Expected tree: <folder>/<signer>/<sign>/*.npy
# Directory listings are sorted so that the class order (and therefore the
# integer label of each class) does not depend on the file system.
npys = dict()
for fol_singer in sorted(os.listdir(folder)):
    singer_dir = os.path.join(folder, fol_singer)
    if not os.path.isdir(singer_dir):
        # Stray files next to the signer folders (e.g. .DS_Store) are ignored.
        continue
    for fol_sign in sorted(os.listdir(singer_dir)):
        if fol_sign == 'P2':
            # 'P2' is deliberately left out of the dataset.
            continue
        sign_dir = os.path.join(singer_dir, fol_sign)
        if not os.path.isdir(sign_dir):
            continue
        for file in sorted(glob.glob(os.path.join(sign_dir, '*.npy'))):
            npys.setdefault(fol_sign, []).append(np.load(file))


data = npys.copy()
# Class name -> integer id, in order of first appearance.
data_labels = {sign: idx for idx, sign in enumerate(data)}

print('len: ', len(data.keys()))
print('Classes', list(data.keys()))


# Histogram of extracted sequence shapes: shape tuple -> number of sequences.
info = {}
# Window kept from each recording, relative to the first frame x that
# contains hand data: frames [x - c, x + b).
c, b = 10, 50
# Rows of the (468, 3) face block that are kept as lip features.
lip_indexes = [0, 13, 14, 17, 37, 39, 40, 61, 78, 80, 81, 82, 84, 87,
               88, 91, 95, 146, 178, 181, 185, 191, 267, 269, 270, 291,
               308, 310, 311, 312, 314, 317, 318, 321, 324, 375, 402, 405,
               409, 415]

general_data = {}
general_lip_data = {}

for let, recordings in data.items():
    let_sequence = []
    let_lip_sequence = []
    for recording in recordings:
        if recording.ndim != 2 or recording.shape[0] == 0:
            # Empty or malformed recording: there is nothing to window.
            continue

        # Index of the first frame whose hand block is not all zeros.
        hand_frames = np.flatnonzero(np.any(recording[:, -HAND_DIMS:], axis=1))
        if hand_frames.size == 0:
            # No hand data in this recording. Skip it instead of silently
            # treating the last frame as the start of the sign.
            continue

        start = int(hand_frames[0]) - c
        if start < 0:
            # Fewer than `c` frames of context before the hands appear.
            continue

        # May be shorter than c + b when the recording ends early; such
        # sequences are still stored here and counted in `info`.
        window = recording[start:start + c + b]
        n_frames = window.shape[0]

        # Pose + hand features: leading pose block followed by trailing hand block.
        sequence = np.concatenate(
            [window[:, :POSE_DIMS], window[:, -HAND_DIMS:]], axis=1)

        # Lip features: selected face rows, flattened per frame.
        face_landmarks = window[:, POSE_DIMS:-HAND_DIMS].reshape(n_frames, FACE_POINTS, 3)
        lip_sequence = face_landmarks[:, lip_indexes, :].reshape(
            n_frames, len(lip_indexes) * 3)

        let_sequence.append(sequence)
        let_lip_sequence.append(lip_sequence)
        info[sequence.shape] = info.get(sequence.shape, 0) + 1

    general_data[let] = let_sequence
    general_lip_data[let] = let_lip_sequence

In [None]:
def _collect_fixed_length(sequences_by_class, n_frames):
    """Gather every sequence with at least `n_frames` frames, cut to that length.

    Classes are visited in the key order of `general_data`. Returns a pair
    ``(samples, label_ids)`` of NumPy arrays, where `label_ids` holds the
    `data_labels` id of each kept sample.
    """
    samples = []
    label_ids = []
    for sign in general_data:
        for seq in sequences_by_class[sign]:
            if seq.shape[0] < n_frames:
                # Too short to fill a full window: drop it.
                continue
            samples.append(seq[:n_frames])
            label_ids.append(data_labels[sign])
    return np.array(samples), np.array(label_ids)


# Pose/hand samples and lip samples, both restricted to full c + b frame windows.
final_data, final_label = _collect_fixed_length(general_data, c + b)
final_data_lip, final_label_lip = _collect_fixed_length(general_lip_data, c + b)

In [None]:
# Both modalities were filtered with the same length rule, so sample k of
# `final_data` and of `final_data_lip` must belong to the same recording.
assert np.array_equal(final_label, final_label_lip), 'pose and lip samples are misaligned'

# NOTE(review): the train/test variables used below were not defined anywhere
# in this notebook, so it could not run on a fresh kernel. The split settings
# here are placeholders -- adjust them to the intended protocol.
TEST_SIZE = 0.2
SEED = 42

# One-hot labels (required by the categorical cross-entropy loss). The number
# of classes is fixed explicitly so that it does not shrink when the class
# with the highest id has no sample left after filtering.
labels_onehot = to_categorical(final_label, num_classes=len(data_labels))

# Split all three arrays with the same shuffled indices so they stay aligned.
(x_train, x_test,
 x_train_lip, x_test_lip,
 y_train, y_test) = train_test_split(
    final_data, final_data_lip, labels_onehot,
    test_size=TEST_SIZE, random_state=SEED)

# Model dimensions derived from the data.
output_shape = y_test.shape[1]           # number of classes
input_shape_pose = x_test.shape[1:]      # per-sample shape of the pose/hand input
input_shape_lip = x_test_lip.shape[1:]   # per-sample shape of the lip input

In [None]:
# Train and evaluate the model

In [None]:
# Build, train and evaluate the two-branch model on the first GPU.
with tf.device('/device:GPU:0'):
    model = multi_modal_seperate_network(output_shape, input_shape_pose, input_shape_lip)
    model.compile(optimizer=tf.keras.optimizers.legacy.Adam(),
                  loss='categorical_crossentropy',
                  metrics=['categorical_accuracy'])

    # Training stops after 100 epochs without improvement of the monitored
    # validation metric. Without `restore_best_weights` the model would keep
    # the weights of the last epoch, i.e. 100 epochs past its best point, and
    # the test score below would be measured on that degraded model.
    early_stopping = EarlyStopping(patience=100, restore_best_weights=True)

    history = model.fit([x_train, x_train_lip], y_train,
                        epochs=1000,
                        batch_size=32,
                        validation_split=0.1,  # held out from the training set
                        callbacks=[early_stopping])

    test_loss, test_acc = model.evaluate([x_test, x_test_lip], y_test)
    print(f'Test accuracy: {test_acc}')