# Testing Model Notebook
---

## Import Dependencies

In [1]:
import mediapipe as mp
import tensorflow as tf
import tensorflow_addons as tfa

import torch
import torch.nn.functional as F

import pandas as pd

import numpy as np
import cv2
import h5py
import os
import time


TensorFlow Addons (TFA) has ended development and introduction of new features.
TFA has entered a minimal maintenance and release mode until a planned end of life in May 2024.
Please modify downstream libraries to take dependencies from other repositories in our TensorFlow community (e.g. Keras, Keras-CV, and Keras-NLP). 

For more information see: https://github.com/tensorflow/addons/issues/2807 

 The versions of TensorFlow you are currently using is 2.9.0 and is not supported. 
Some things might work, some things might not.
If you were to encounter a bug, do not file an issue.
If you want to make sure you're using a tested and supported configuration, either change the TensorFlow version or the TensorFlow Addons's version. 
You can find the compatibility matrix in TensorFlow Addon's readme:
https://github.com/tensorflow/addons


## Helper Functions

In [2]:
# Shortcut to MediaPipe's holistic solution module; the cells below build
# `mp_holistic.Holistic(...)` models from it.
mp_holistic = mp.solutions.holistic

# 543 = 468 face + 33 pose + 21 + 21 hand landmarks, i.e. the sizes used by
# preprocessing_landmark's defaults. Not referenced in the visible cells.
ROWS_PER_FRAME = 543
# Number of frames every clip is resampled to before it is passed to a model.
FIXED_FRAMES = 34

# Presumably the first row of each component in a face -> pose -> right hand
# -> left hand layout (0, 468, 468 + 33 = 501, 501 + 21 = 522) — TODO confirm.
# None of these offsets is used in the visible cells.
RH_IDX = 501
LH_IDX = 522
POSE_IDX = 468
FACE_IDX = 0

# Face landmark indices around the mouth, grouped into four contours.
# NOTE(review): the Upper/Lower labels have not been checked against
# MediaPipe's own lip contour tables — confirm before relying on the names.
lips_UpperOuter = [185, 40, 39, 37, 0, 267, 269, 270, 409]
lips_LowerOuter = [61, 146, 91, 181, 84, 17, 314, 405, 321, 375, 291]
lips_UpperInner = [78, 95, 88, 178, 87, 14, 317, 402, 318, 324, 308]
lips_LowerInner = [191, 80, 81, 82, 13, 312, 311, 310, 415]
# Passed as `faceIDX` to preprocessing_landmark; the concatenation order is the
# order in which the lip landmarks appear in the feature vector.
LIPS_IDX = np.concatenate(
    [lips_UpperOuter, lips_LowerOuter, lips_UpperInner, lips_LowerInner]
)

# Pose landmarks 0..24, passed as `poseIDX` to preprocessing_landmark
# (presumably the upper-body subset of the 33 pose landmarks — TODO confirm).
UPPER_BODY_IDX = np.arange(0, 25)

In [3]:
def mp_detection(frame, mp_model):
    """Run `mp_model` on one BGR frame and return whatever it produces.

    The frame is converted from BGR to RGB and the converted copy is marked
    read-only before it is handed to `mp_model.process`.
    """
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    # Only the converted array is flagged; the caller's frame is left as it was.
    rgb_frame.flags.writeable = False
    return mp_model.process(rgb_frame)

In [4]:
def zsc(data, mean, std):
    """Z-score `data`: subtract `mean`, then divide by `std` (element-wise, with broadcasting)."""
    centered = data - mean
    return centered / std

In [5]:
def _landmark_matrix(landmark_list, fields, idx=None, n_missing=21):
    """Return an (n_landmarks, len(fields)) array for one landmark list.

    A falsy `landmark_list` (component not detected) yields an all-zero array
    with `len(idx)` rows, or `n_missing` rows when no index selection is given.
    """
    if not landmark_list:
        n_rows = n_missing if idx is None else len(idx)
        return np.zeros((n_rows, len(fields)))
    points = landmark_list.landmark
    chosen = points if idx is None else [points[i] for i in idx]
    return np.array([[getattr(point, field) for field in fields] for point in chosen])


def _normalize_component(coords, stats, name, idx=None):
    """Z-score one component with the `<name>/mean` and `<name>/std` datasets and flatten it."""
    mean = np.array(stats[f'{name}/mean'])
    std = np.array(stats[f'{name}/std'])
    if idx is not None:
        # Keep only the statistics columns of the selected landmarks.
        mean, std = mean[:, idx], std[:, idx]
    # Statistics are laid out as (values, landmarks), hence the transposes.
    return np.array(zsc(coords.T, mean, std)).T.flatten()


def preprocessing_landmark(landmarks, faceIDX = np.arange(0,468), poseIDX = np.arange(0,33), component = ['face', 'pose', 'right_hand', 'left_hand'], param_path = 'param.h5'):
    """Turn one holistic detection result into a flat, z-scored feature vector.

    Parameters
    ----------
    landmarks : object exposing `face_landmarks`, `pose_landmarks`,
        `right_hand_landmarks` and `left_hand_landmarks`; each is either falsy
        (not detected) or has a `.landmark` sequence of points.
    faceIDX, poseIDX : index sequences selecting which face / pose landmarks
        (and the matching statistics columns) are used.
    component : collection of component names to include. Excluded components
        contribute nothing to the output. The default list is never mutated.
    param_path : path of the HDF5 file holding `<component>/mean` and
        `<component>/std` datasets (defaults to the previously hard-coded
        'param.h5').

    Returns
    -------
    1-D float array: face (x, y, z per landmark), pose (x, y, z, visibility),
    right hand (x, y, z), left hand (x, y, z), in that order, restricted to the
    requested components. Undetected components are filled with zeros *before*
    normalisation. An empty array is returned when no component is requested.

    Raises
    ------
    KeyError if a required statistics dataset is missing from the file.

    Note: the parameter file is opened on every call, i.e. once per frame.
    """
    xyz = ('x', 'y', 'z')
    parts = []
    with h5py.File(param_path, 'r') as hf:
        if 'face' in component:
            face = _landmark_matrix(landmarks.face_landmarks, xyz, faceIDX)
            parts.append(_normalize_component(face, hf, 'face', faceIDX))

        if 'pose' in component:
            pose = _landmark_matrix(landmarks.pose_landmarks, xyz + ('visibility',), poseIDX)
            parts.append(_normalize_component(pose, hf, 'pose', poseIDX))

        if 'right_hand' in component:
            rh = _landmark_matrix(landmarks.right_hand_landmarks, xyz)
            parts.append(_normalize_component(rh, hf, 'right_hand'))

        if 'left_hand' in component:
            lh = _landmark_matrix(landmarks.left_hand_landmarks, xyz)
            parts.append(_normalize_component(lh, hf, 'left_hand'))

    if not parts:
        # np.concatenate rejects an empty list; keep the "empty float array" result.
        return np.empty(0, dtype='float')
    return np.concatenate(parts).astype('float')

In [6]:
# Maps the arg-max index of a model's output to its class label.
decoder = dict(enumerate([
    'Batuk',
    'Demam',
    'Gigi',
    'Kepala',
    'Minum',
    'Obat',
    'Perut',
    'Resep',
    'Sakit',
]))

In [7]:
def predict_perf(model, input):
    """Call `model.predict(input)` and time it.

    Returns a `(prediction, seconds)` tuple, where `seconds` is the wall-clock
    duration of the `predict` call measured with `time.perf_counter`.
    """
    t_begin = time.perf_counter()
    prediction = model.predict(input)
    t_end = time.perf_counter()
    return prediction, t_end - t_begin

## Testing

In [8]:
# Saved LSTM checkpoints, one per dataset variant: lstm[i] is the model used in
# the "Trained w/ Dataset i+1" section below.
# NOTE(review): the directory / filename fragments (models1..3, 64/32/16,
# 0.0001) presumably encode training runs and hyper-parameters — TODO confirm.
lstm = [
    'models1/64/lstm_dset1_0.0001.h5',
    'models3/64/lstm_dset2_0.0001.h5',
    'models1/64/lstm_dset3_0.0001.h5',
    'models3/16/lstm_dset4_0.0001.h5', 
    'models2/32/lstm_dset5_0.0001.h5',
]

# Saved BiLSTM checkpoints, indexed the same way as `lstm`.
# Note that the last entry's filename ends in 0.001 rather than 0.0001.
bilstm = [
    'models1/32/bilstm_dset1_0.0001.h5',
    'models2/64/bilstm_dset2_0.0001.h5',
    'models3/v2/64/bilstm_dset3_0.0001.h5',
    'models2/32/bilstm_dset4_0.0001.h5',
    'models3/32/bilstm_dset5_0.001.h5',
]

In [9]:
# os.listdir() returns entries in arbitrary order, so sort before taking the
# first one; otherwise the selected test folder can change between runs/machines.
MAIN_PATH = f'test/{sorted(os.listdir("test/"))[0]}'
MAIN_PATH

'test/age'

In [10]:
# Sort the listing so the same video is picked on every run
# (os.listdir() makes no ordering guarantee).
fPATH = f'{MAIN_PATH}/{sorted(os.listdir(MAIN_PATH))[0]}'
fPATH

'test/age/Batuk.MOV'

### Trained w/ Dataset 1

In [11]:
# Load the first LSTM (model1) and BiLSTM (model2) checkpoints from `lstm` / `bilstm`.
# custom_objects supplies a TensorFlow Addons F1Score under the serialized name
# 'Addons>F1Score'; the 9 presumably matches the nine labels in `decoder` — TODO confirm.
model1 = tf.keras.models.load_model(lstm[0], custom_objects={'Addons>F1Score': tfa.metrics.F1Score(9)})
model2 = tf.keras.models.load_model(bilstm[0], custom_objects={'Addons>F1Score': tfa.metrics.F1Score(9)})

In [12]:
# Build one feature vector per frame of `fPATH` using the default settings
# of preprocessing_landmark (all face, pose and hand landmarks).
sequence_landmark = []
total_frame = 0

with mp_holistic.Holistic(min_detection_confidence=.5, min_tracking_confidence=.5) as holistic_model:
    cap = cv2.VideoCapture(fPATH)
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            mp_results = mp_detection(frame, holistic_model)
            sequence_landmark.append(preprocessing_landmark(mp_results))
            total_frame += 1
    finally:
        # Release the capture handle even if detection / preprocessing raises.
        cap.release()
        cv2.destroyAllWindows()

if total_frame == 0:
    # Without this guard the resampling below fails with an obscure shape error.
    raise RuntimeError(f'No frames could be read from {fPATH}')

# Shape (1, total_frame, n_features); resample the time axis to FIXED_FRAMES.
sequence_landmark = np.expand_dims(np.array(sequence_landmark), axis=0)
if total_frame > FIXED_FRAMES:
    # Too long: keep FIXED_FRAMES evenly spaced frames.
    selected_idx = np.linspace(0, total_frame-1, FIXED_FRAMES, dtype=int)
    sequence_landmark = sequence_landmark[:,selected_idx,:]
elif total_frame < FIXED_FRAMES:
    # Too short: nearest-neighbour upsampling along time (interpolate works on the last axis).
    sequence_landmark = torch.from_numpy(sequence_landmark)
    sequence_landmark = F.interpolate(sequence_landmark.permute(0,2,1), size=(FIXED_FRAMES), mode= 'nearest-exact').permute(0,2,1).numpy()

result, elapsed = predict_perf(model1, sequence_landmark)
print(f'perdict: {decoder[result.argmax()]} \nelapsed: {elapsed}s\n')
result, elapsed = predict_perf(model2, sequence_landmark)
print(f'perdict: {decoder[result.argmax()]} \nelapsed: {elapsed}s\n')

perdict: Batuk 
elapsed: 14.970698899999661s

perdict: Batuk 
elapsed: 1.2157256000000416s



### Trained w/ Dataset 2

In [None]:
# Load the second LSTM (model1) and BiLSTM (model2) checkpoints from `lstm` / `bilstm`.
# custom_objects supplies a TensorFlow Addons F1Score under the serialized name
# 'Addons>F1Score'; the 9 presumably matches the nine labels in `decoder` — TODO confirm.
model1 = tf.keras.models.load_model(lstm[1], custom_objects={'Addons>F1Score': tfa.metrics.F1Score(9)})
model2 = tf.keras.models.load_model(bilstm[1], custom_objects={'Addons>F1Score': tfa.metrics.F1Score(9)})

In [41]:
# Build one feature vector per frame of `fPATH`, restricting the face
# landmarks to LIPS_IDX.
sequence_landmark = []
total_frame = 0

with mp_holistic.Holistic(min_detection_confidence=.5, min_tracking_confidence=.5) as holistic_model:
    cap = cv2.VideoCapture(fPATH)
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            mp_results = mp_detection(frame, holistic_model)
            sequence_landmark.append(preprocessing_landmark(mp_results, faceIDX=LIPS_IDX))
            total_frame += 1
    finally:
        # Release the capture handle even if detection / preprocessing raises.
        cap.release()
        cv2.destroyAllWindows()

if total_frame == 0:
    # Without this guard the resampling below fails with an obscure shape error.
    raise RuntimeError(f'No frames could be read from {fPATH}')

# Shape (1, total_frame, n_features); resample the time axis to FIXED_FRAMES.
sequence_landmark = np.expand_dims(np.array(sequence_landmark), axis=0)
if total_frame > FIXED_FRAMES:
    # Too long: keep FIXED_FRAMES evenly spaced frames.
    selected_idx = np.linspace(0, total_frame-1, FIXED_FRAMES, dtype=int)
    sequence_landmark = sequence_landmark[:,selected_idx,:]
elif total_frame < FIXED_FRAMES:
    # Too short: nearest-neighbour upsampling along time (interpolate works on the last axis).
    sequence_landmark = torch.from_numpy(sequence_landmark)
    sequence_landmark = F.interpolate(sequence_landmark.permute(0,2,1), size=(FIXED_FRAMES), mode= 'nearest-exact').permute(0,2,1).numpy()

result, elapsed = predict_perf(model1, sequence_landmark)
print(f'perdict: {decoder[result.argmax()]} \nelapsed: {elapsed}s\n')
result, elapsed = predict_perf(model2, sequence_landmark)
print(f'perdict: {decoder[result.argmax()]} \nelapsed: {elapsed}s\n')

perdict: demam 
elapsed: 1.8414400000037858s

perdict: demam 
elapsed: 2.1403086000063922s

perdict: demam 
elapsed: 1.128532400005497s

perdict: sakit 
elapsed: 2.079010699999344s



### Trained w/ Dataset 3

In [None]:
# Load the third LSTM (model1) and BiLSTM (model2) checkpoints from `lstm` / `bilstm`.
# custom_objects supplies a TensorFlow Addons F1Score under the serialized name
# 'Addons>F1Score'; the 9 presumably matches the nine labels in `decoder` — TODO confirm.
model1 = tf.keras.models.load_model(lstm[2], custom_objects={'Addons>F1Score': tfa.metrics.F1Score(9)})
model2 = tf.keras.models.load_model(bilstm[2], custom_objects={'Addons>F1Score': tfa.metrics.F1Score(9)})

In [42]:
# Build one feature vector per frame of `fPATH`, restricting the pose
# landmarks to UPPER_BODY_IDX.
sequence_landmark = []
total_frame = 0

with mp_holistic.Holistic(min_detection_confidence=.5, min_tracking_confidence=.5) as holistic_model:
    cap = cv2.VideoCapture(fPATH)
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            mp_results = mp_detection(frame, holistic_model)
            sequence_landmark.append(preprocessing_landmark(mp_results, poseIDX=UPPER_BODY_IDX))
            total_frame += 1
    finally:
        # Release the capture handle even if detection / preprocessing raises.
        cap.release()
        cv2.destroyAllWindows()

if total_frame == 0:
    # Without this guard the resampling below fails with an obscure shape error.
    raise RuntimeError(f'No frames could be read from {fPATH}')

# Shape (1, total_frame, n_features); resample the time axis to FIXED_FRAMES.
sequence_landmark = np.expand_dims(np.array(sequence_landmark), axis=0)
if total_frame > FIXED_FRAMES:
    # Too long: keep FIXED_FRAMES evenly spaced frames.
    selected_idx = np.linspace(0, total_frame-1, FIXED_FRAMES, dtype=int)
    sequence_landmark = sequence_landmark[:,selected_idx,:]
elif total_frame < FIXED_FRAMES:
    # Too short: nearest-neighbour upsampling along time (interpolate works on the last axis).
    sequence_landmark = torch.from_numpy(sequence_landmark)
    sequence_landmark = F.interpolate(sequence_landmark.permute(0,2,1), size=(FIXED_FRAMES), mode= 'nearest-exact').permute(0,2,1).numpy()

result, elapsed = predict_perf(model1, sequence_landmark)
print(f'perdict: {decoder[result.argmax()]} \nelapsed: {elapsed}s\n')
result, elapsed = predict_perf(model2, sequence_landmark)
print(f'perdict: {decoder[result.argmax()]} \nelapsed: {elapsed}s\n')

perdict: kepala 
elapsed: 1.2758823000040138s

perdict: demam 
elapsed: 2.1433032999993884s

perdict: kepala 
elapsed: 1.1492311999973026s

perdict: obat 
elapsed: 2.127744099998381s



### Trained w/ Dataset 4

In [None]:
# Load the fourth LSTM (model1) and BiLSTM (model2) checkpoints from `lstm` / `bilstm`.
# custom_objects supplies a TensorFlow Addons F1Score under the serialized name
# 'Addons>F1Score'; the 9 presumably matches the nine labels in `decoder` — TODO confirm.
model1 = tf.keras.models.load_model(lstm[3], custom_objects={'Addons>F1Score': tfa.metrics.F1Score(9)})
model2 = tf.keras.models.load_model(bilstm[3], custom_objects={'Addons>F1Score': tfa.metrics.F1Score(9)})

In [24]:
# Build one feature vector per frame of `fPATH`, restricting the face
# landmarks to LIPS_IDX and the pose landmarks to UPPER_BODY_IDX.
sequence_landmark = []
total_frame = 0

with mp_holistic.Holistic(min_detection_confidence=.5, min_tracking_confidence=.5) as holistic_model:
    cap = cv2.VideoCapture(fPATH)
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            mp_results = mp_detection(frame, holistic_model)
            sequence_landmark.append(preprocessing_landmark(mp_results, LIPS_IDX, UPPER_BODY_IDX))
            total_frame += 1
    finally:
        # Release the capture handle even if detection / preprocessing raises.
        cap.release()
        cv2.destroyAllWindows()

if total_frame == 0:
    # Without this guard the resampling below fails with an obscure shape error.
    raise RuntimeError(f'No frames could be read from {fPATH}')

# Shape (1, total_frame, n_features); resample the time axis to FIXED_FRAMES.
sequence_landmark = np.expand_dims(np.array(sequence_landmark), axis=0)
if total_frame > FIXED_FRAMES:
    # Too long: keep FIXED_FRAMES evenly spaced frames.
    selected_idx = np.linspace(0, total_frame-1, FIXED_FRAMES, dtype=int)
    sequence_landmark = sequence_landmark[:,selected_idx,:]
elif total_frame < FIXED_FRAMES:
    # Too short: nearest-neighbour upsampling along time (interpolate works on the last axis).
    sequence_landmark = torch.from_numpy(sequence_landmark)
    sequence_landmark = F.interpolate(sequence_landmark.permute(0,2,1), size=(FIXED_FRAMES), mode= 'nearest-exact').permute(0,2,1).numpy()

result, elapsed = predict_perf(model1, sequence_landmark)
print(f'perdict: {decoder[result.argmax()]} \nelapsed: {elapsed}s\n')
result, elapsed = predict_perf(model2, sequence_landmark)
print(f'perdict: {decoder[result.argmax()]} \nelapsed: {elapsed}s\n')

perdict: batuk 
elapsed: 1.4101594000021578s

perdict: batuk 
elapsed: 2.687711599999602s

perdict: batuk 
elapsed: 1.1585707000012917s

perdict: batuk 
elapsed: 2.2014583999989554s



### Trained w/ Dataset 5

In [None]:
# Load the fifth LSTM (model1) and BiLSTM (model2) checkpoints from `lstm` / `bilstm`.
# custom_objects supplies a TensorFlow Addons F1Score under the serialized name
# 'Addons>F1Score'; the 9 presumably matches the nine labels in `decoder` — TODO confirm.
model1 = tf.keras.models.load_model(lstm[4], custom_objects={'Addons>F1Score': tfa.metrics.F1Score(9)})
model2 = tf.keras.models.load_model(bilstm[4], custom_objects={'Addons>F1Score': tfa.metrics.F1Score(9)})

In [27]:
# Build one feature vector per frame of `fPATH` from the two hand
# components only (face and pose are left out).
sequence_landmark = []
total_frame = 0

with mp_holistic.Holistic(min_detection_confidence=.5, min_tracking_confidence=.5) as holistic_model:
    cap = cv2.VideoCapture(fPATH)
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            mp_results = mp_detection(frame, holistic_model)
            sequence_landmark.append(preprocessing_landmark(mp_results, component=['right_hand','left_hand']))
            total_frame += 1
    finally:
        # Release the capture handle even if detection / preprocessing raises.
        cap.release()
        cv2.destroyAllWindows()

if total_frame == 0:
    # Without this guard the resampling below fails with an obscure shape error.
    raise RuntimeError(f'No frames could be read from {fPATH}')

# Shape (1, total_frame, n_features); resample the time axis to FIXED_FRAMES.
sequence_landmark = np.expand_dims(np.array(sequence_landmark), axis=0)
if total_frame > FIXED_FRAMES:
    # Too long: keep FIXED_FRAMES evenly spaced frames.
    selected_idx = np.linspace(0, total_frame-1, FIXED_FRAMES, dtype=int)
    sequence_landmark = sequence_landmark[:,selected_idx,:]
elif total_frame < FIXED_FRAMES:
    # Too short: nearest-neighbour upsampling along time (interpolate works on the last axis).
    sequence_landmark = torch.from_numpy(sequence_landmark)
    sequence_landmark = F.interpolate(sequence_landmark.permute(0,2,1), size=(FIXED_FRAMES), mode= 'nearest-exact').permute(0,2,1).numpy()

result, elapsed = predict_perf(model1, sequence_landmark)
print(f'perdict: {decoder[result.argmax()]} \nelapsed: {elapsed}s\n')
result, elapsed = predict_perf(model2, sequence_landmark)
print(f'perdict: {decoder[result.argmax()]} \nelapsed: {elapsed}s\n')

perdict: gigi 
elapsed: 1.3343352999982017s

perdict: gigi 
elapsed: 2.3062443000017083s

perdict: gigi 
elapsed: 1.1710155000000668s

perdict: gigi 
elapsed: 2.547043799997482s



## Purge (batch evaluation over the whole test set)

In [None]:
# Top-1 accuracy of every LSTM checkpoint over all videos found under
# test/<folder>/<Label>.<ext>; the expected label is the file name up to its
# first dot.
acc = []

# preprocessing_landmark keyword arguments per checkpoint, in the same order
# as `lstm` (the same landmark subsets as the per-dataset cells above).
lstm_preprocess_kwargs = [
    {},
    {'faceIDX': LIPS_IDX},
    {'poseIDX': UPPER_BODY_IDX},
    {'faceIDX': LIPS_IDX, 'poseIDX': UPPER_BODY_IDX},
    {'component': ['right_hand', 'left_hand']},
]

for model_path, landmark_kwargs in zip(lstm, lstm_preprocess_kwargs):
    model3 = tf.keras.models.load_model(model_path, custom_objects={'Addons>F1Score': tfa.metrics.F1Score(9)})
    true = 0
    n_videos = 0

    for p in sorted(os.listdir("test")):
        for vid in sorted(os.listdir(f'test/{p}')):
            # Local name so the notebook-level `fPATH` is not overwritten.
            video_path = f'test/{p}/{vid}'
            sequence_landmark = []
            total_frame = 0

            # A fresh Holistic model per video, as before.
            with mp_holistic.Holistic(min_detection_confidence=.5, min_tracking_confidence=.5) as holistic_model:
                cap = cv2.VideoCapture(video_path)
                try:
                    while True:
                        ret, frame = cap.read()
                        if not ret:
                            break

                        mp_results = mp_detection(frame, holistic_model)
                        sequence_landmark.append(preprocessing_landmark(mp_results, **landmark_kwargs))
                        total_frame += 1
                finally:
                    # Release the capture handle even if a frame fails to process.
                    cap.release()
                    cv2.destroyAllWindows()

            if total_frame == 0:
                # Without this guard the resampling below fails with an obscure shape error.
                raise RuntimeError(f'No frames could be read from {video_path}')

            # Shape (1, total_frame, n_features); resample the time axis to FIXED_FRAMES.
            sequence_landmark = np.expand_dims(np.array(sequence_landmark), axis=0)
            if total_frame > FIXED_FRAMES:
                selected_idx = np.linspace(0, total_frame-1, FIXED_FRAMES, dtype=int)
                sequence_landmark = sequence_landmark[:,selected_idx,:]
            elif total_frame < FIXED_FRAMES:
                sequence_landmark = torch.from_numpy(sequence_landmark)
                sequence_landmark = F.interpolate(sequence_landmark.permute(0,2,1), size=(FIXED_FRAMES), mode= 'nearest-exact').permute(0,2,1).numpy()

            result, elapsed = predict_perf(model3, sequence_landmark)

            n_videos += 1
            if decoder[result.argmax()] == vid.split('.')[0]:
                true += 1

    # Divide by the number of videos actually evaluated instead of a fixed 90.
    acc.append(true / n_videos if n_videos else float('nan'))

print(acc)

In [None]:
# Predicted label of every BiLSTM checkpoint for every video under
# test/<folder>/; `res` holds one list of labels per checkpoint.
res = []

# preprocessing_landmark keyword arguments per checkpoint, in the same order
# as `bilstm` (the same landmark subsets as the per-dataset cells above).
bilstm_preprocess_kwargs = [
    {},
    {'faceIDX': LIPS_IDX},
    {'poseIDX': UPPER_BODY_IDX},
    {'faceIDX': LIPS_IDX, 'poseIDX': UPPER_BODY_IDX},
    {'component': ['right_hand', 'left_hand']},
]

# Iterate over all five checkpoints: the previous range stopped at index 3, so
# the fifth BiLSTM model (hands only) was never evaluated.
for model_path, landmark_kwargs in zip(bilstm, bilstm_preprocess_kwargs):
    model3 = tf.keras.models.load_model(model_path, custom_objects={'Addons>F1Score': tfa.metrics.F1Score(9)})

    temp = []
    # Sorted listings keep the order of the predictions reproducible.
    for p in sorted(os.listdir("test")):
        for vid in sorted(os.listdir(f'test/{p}')):
            # Local name so the notebook-level `fPATH` is not overwritten.
            video_path = f'test/{p}/{vid}'
            sequence_landmark = []
            total_frame = 0

            # A fresh Holistic model per video, as before.
            with mp_holistic.Holistic(min_detection_confidence=.5, min_tracking_confidence=.5) as holistic_model:
                cap = cv2.VideoCapture(video_path)
                try:
                    while True:
                        ret, frame = cap.read()
                        if not ret:
                            break

                        mp_results = mp_detection(frame, holistic_model)
                        sequence_landmark.append(preprocessing_landmark(mp_results, **landmark_kwargs))
                        total_frame += 1
                finally:
                    # Release the capture handle even if a frame fails to process.
                    cap.release()
                    cv2.destroyAllWindows()

            if total_frame == 0:
                # Without this guard the resampling below fails with an obscure shape error.
                raise RuntimeError(f'No frames could be read from {video_path}')

            # Shape (1, total_frame, n_features); resample the time axis to FIXED_FRAMES.
            sequence_landmark = np.expand_dims(np.array(sequence_landmark), axis=0)
            if total_frame > FIXED_FRAMES:
                selected_idx = np.linspace(0, total_frame-1, FIXED_FRAMES, dtype=int)
                sequence_landmark = sequence_landmark[:,selected_idx,:]
            elif total_frame < FIXED_FRAMES:
                sequence_landmark = torch.from_numpy(sequence_landmark)
                sequence_landmark = F.interpolate(sequence_landmark.permute(0,2,1), size=(FIXED_FRAMES), mode= 'nearest-exact').permute(0,2,1).numpy()

            result, elapsed = predict_perf(model3, sequence_landmark)
            temp.append(decoder[result.argmax()])

    res.append(temp)

print(res)

In [None]:
# Show the collected predictions, one entry of `res` per line.
for model_predictions in res:
    print(model_predictions)