In [None]:
import tensorflow as tf
# Probe GPU visibility. The first two calls only matter for their log output /
# cell display; the three assignments keep the results for later inspection.
# NOTE(review): TensorFlow documents tf.test.is_gpu_available as deprecated in
# favour of tf.config.list_physical_devices('GPU') - confirm for the installed version.
tf.test.is_gpu_available()

tf.config.list_physical_devices('GPU')

# Any GPU at all.
gpu_available = tf.test.is_gpu_available()
# CUDA GPUs only.
is_cuda_gpu_available = tf.test.is_gpu_available(cuda_only=True)
# CUDA GPUs with compute capability of at least 3.0.
is_cuda_gpu_min_3 = tf.test.is_gpu_available(
    cuda_only=True, min_cuda_compute_capability=(3, 0))


In [None]:
import tensorflow as tf
# Report, in order: whether a GPU is usable, then whether this TensorFlow
# build was compiled with CUDA support.
for _check in (tf.test.is_gpu_available, tf.test.is_built_with_cuda):
    print(_check())


In [None]:
import tensorflow as tf
import sklearn
import cv2
import numpy as np
import os 
from matplotlib import pyplot as plt
import time
import mediapipe as mp
import pandas as pd

In [None]:
# Short aliases for the two MediaPipe solution modules used throughout the notebook.
# Drawing utilities (landmark/connection rendering helpers).
mp_drawing = mp.solutions.drawing_utils
# Holistic solution (model class and POSE_CONNECTIONS).
mp_holistic = mp.solutions.holistic

def mediapipe_detection(image, model):
    """Run ``model.process`` on a BGR frame and return ``(bgr_image, results)``.

    The frame is converted BGR -> RGB, flagged read-only while the model
    processes it, made writeable again and converted back RGB -> BGR.
    The input ``image`` itself is not modified; the conversions produce new arrays.
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Read-only for the duration of the inference call.
    rgb_frame.flags.writeable = False
    results = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    return cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR), results

def draw_landmarks(image, results):
    """Draw the pose landmarks and their connections on ``image`` using the default drawing style."""
    pose = results.pose_landmarks
    skeleton = mp_holistic.POSE_CONNECTIONS
    mp_drawing.draw_landmarks(image, pose, skeleton)
    
def draw_styled_landmarks(image, results):
    """Draw the pose landmarks on ``image`` with custom colours and sizes.

    Landmark points and connection lines each get their own DrawingSpec.
    """
    point_style = mp_drawing.DrawingSpec(color=(80,110,76), thickness=8, circle_radius=8)
    line_style = mp_drawing.DrawingSpec(color=(121,44,250), thickness=8, circle_radius=4)
    mp_drawing.draw_landmarks(
        image,
        results.pose_landmarks,
        mp_holistic.POSE_CONNECTIONS,
        point_style,
        line_style,
    )

def extract_keypoints(results, num_landmarks=33):
    """Flatten the pose landmarks of one frame into a 1-D feature vector.

    Args:
        results: object exposing ``pose_landmarks``; when present, its
            ``landmark`` sequence provides ``x``, ``y`` and ``z`` per landmark.
        num_landmarks: number of landmarks assumed when no pose was detected.
            Defaults to 33, the size of the MediaPipe pose landmark set, so
            the zero fallback has the same length (99) as a detected pose.

    Returns:
        ``np.ndarray`` of shape ``(n_landmarks * 3,)`` laid out as
        ``[x0, y0, z0, x1, y1, z1, ...]``; all zeros when no pose was detected.
    """
    if not results.pose_landmarks:
        # The original fallback was 23*3 = 69 values, which did not match the
        # 99 values produced for a detected pose and made the per-frame
        # vectors impossible to stack into one array.
        return np.zeros(num_landmarks * 3)
    return np.array(
        [[lm.x, lm.y, lm.z] for lm in results.pose_landmarks.landmark]
    ).flatten()



In [None]:
# Source videos: one sub-folder per class.
path = r'D:\Final Year Project\Dataset'
# Root of the per-video keypoint folders created below.
temp_root = r'D:\Final Year Project\TempExtractDataFP'

# Raw string: the original non-raw literal relied on '\F' and '\D' being
# left alone as invalid escape sequences.
os.chdir(path)

# exist_ok keeps the cell re-runnable; os.mkdir / os.makedirs without it
# raise FileExistsError on the second run.
os.makedirs(temp_root, exist_ok=True)

# One output folder per video, numbered from 1 within each class folder.
for folder in os.listdir(path):
    # List with the full path so the loop does not depend on the current directory.
    video_count = len(os.listdir(os.path.join(path, folder)))
    for file_num in range(video_count):
        os.makedirs(os.path.join(temp_root, folder, str(file_num + 1)), exist_ok=True)


In [None]:
video_temp = r'D:\Final Year Project\Classified Data'
# Raw string avoids the invalid '\F' / '\C' escapes of the original literal.
os.chdir(video_temp)

# Report every video that has 160 frames or more.
for folder in os.listdir(video_temp):
    for file in os.listdir(os.path.join(video_temp, folder)):
        cap = cv2.VideoCapture(os.path.join(video_temp, folder, file))
        total_frame = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Release the handle; the original leaked one capture per video.
        cap.release()
        if total_frame >= 160:
            print(folder, file)

data_path = r'D:\Final Year Project\TempExtractDataFP'
desired_size = 154

# Pad every sample folder with all-zero frames up to desired_size frames.
for folder in os.listdir(data_path):
    print(folder)
    # The original listed `folder` relative to the current directory
    # (the video tree), not relative to data_path.
    for file in os.listdir(os.path.join(data_path, folder)):
        print(file)
        sample_dir = os.path.join(data_path, folder, file)
        frame_size = len(os.listdir(sample_dir))
        if frame_size < desired_size:
            # 33 pose landmarks * (x, y, z) = 99 values, the length
            # extract_keypoints produces for a detected pose and the feature
            # size of the (154, 99) models below. The original padded with
            # 1503 values, which cannot be stacked with 99-value frames.
            keypoints = np.zeros(33 * 3)
            for frame_index in range(frame_size + 1, desired_size + 1):
                # np.save appends the '.npy' extension.
                np.save(os.path.join(sample_dir, str(frame_index)), keypoints)
 


In [None]:
# video path
video_temp = r'D:\Final Year Project\Dataset'

# data path
temp_data1 = r'D:\Final Year Project\ExtractData'

for folder in os.listdir(video_temp):
    print(folder)
    # Videos are numbered from 1 within each class folder.
    for file_num, file in enumerate(os.listdir(os.path.join(video_temp, folder)), start=1):
        print(file)

        # np.save does not create missing directories.
        out_dir = os.path.join(temp_data1, folder, str(file_num))
        os.makedirs(out_dir, exist_ok=True)

        # The original joined on an undefined name `dataset` (NameError).
        cap = cv2.VideoCapture(os.path.join(video_temp, folder, file))
        try:
            total_frame = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            # set mediapipe model
            with mp_holistic.Holistic(min_detection_confidence=0.7, min_tracking_confidence=0.5) as holistic:
                # NOTE(review): range(1, total_frame) processes total_frame - 1
                # frames, as in the original - confirm the last frame is meant
                # to be skipped.
                for frame_index in range(1, total_frame):
                    # read frame
                    ret, frame = cap.read()
                    if not ret:
                        # The reported frame count can exceed the decodable
                        # frames; a failed read yields frame=None, which would
                        # crash the colour conversion.
                        break

                    # make detections
                    image, results = mediapipe_detection(frame, holistic)

                    # Export keypoints (np.save appends '.npy'). The annotated
                    # image was never displayed or stored, so drawing is skipped.
                    keypoints = extract_keypoints(results)
                    np.save(os.path.join(out_dir, str(frame_index)), keypoints)
        finally:
            # Always free the capture, even if a frame fails to process.
            cap.release()
    
        


In [None]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

In [None]:
# Engagement classes; the position in this array is the integer class id.
actions = np.array(['Highly Engaged', 'Moderate Engaged', 'Not Engaged', 'Undefined'])

# class name -> class id
label_map = dict(zip(actions, range(len(actions))))

In [None]:
sequences, labels = [], []

# Every sample is expected to hold frames 1..total_frame as '<n>.npy'.
total_frame = 154
data_root = r'D:\Final Year Project\Data\ExtractData'

for action in actions:
    print(action)
    action_dir = os.path.join(data_root, action)
    # Sample folders are named with integers.
    for file in np.array(os.listdir(action_dir)).astype(int):
        print(file)
        window = [
            np.load(os.path.join(action_dir, str(file), '{}.npy'.format(frame_index)))
            for frame_index in range(1, total_frame + 1)
        ]
        sequences.append(window)
        labels.append(label_map[action])

# The original called train_test_split on X and Y without ever defining them.
# X: (n_samples, total_frame, n_features); Y: one-hot rows, one column per class.
X = np.array(sequences)
Y = to_categorical(labels, num_classes=len(actions)).astype(int)

# Fixed seed so the train/test split is reproducible across kernel restarts.
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.25, random_state=42)

In [None]:
from tensorflow.keras.models import Sequential 
from tensorflow.keras.layers import Dense

In [None]:
# 1-D CNN: two Conv1D/MaxPooling stages followed by a dense classifier head
# ending in a 4-way softmax.
# NOTE(review): input_shape=(154, 69) expects 69 features per frame, while the
# later Conv1D/LSTM models in this notebook use (154, 99) - confirm which
# feature layout this model is meant for.
model = Sequential([
    tf.keras.layers.Conv1D(128, 3, activation='relu', input_shape=(154, 69)),
    tf.keras.layers.MaxPooling1D(pool_size=2, strides=2, padding='valid'),
    tf.keras.layers.Dropout(.25),
    tf.keras.layers.Conv1D(256, 3, activation='relu'),
    tf.keras.layers.Dropout(.40),
    tf.keras.layers.MaxPooling1D(pool_size=2, strides=2, padding='valid'),
    tf.keras.layers.Flatten(),
    Dense(256, activation='relu'),
    tf.keras.layers.Dropout(.50),
    Dense(128, activation='relu'),
    tf.keras.layers.Dropout(.50),
    Dense(64, activation='relu'),
    tf.keras.layers.Dropout(.50),
    Dense(4, activation='softmax'),
])

model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])

In [None]:
# Compile model (compile() returns None, so its result is not kept).
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['categorical_accuracy'])

# Fit on the training split only. The original fitted on the full X/Y, which
# contains X_test, so the later evaluation on X_test measured data the model
# had already been trained on.
history = model.fit(X_train, Y_train, validation_split=0.25, epochs=100, batch_size=64, verbose=1)

# list all data in history
print(history.history.keys())

# One figure per metric: training curve against the validation curve.
# The second curve comes from validation_split, so it is labelled
# 'validation' rather than 'test'.
for metric, title in (('categorical_accuracy', 'model accuracy'), ('loss', 'model loss')):
    plt.plot(history.history[metric])
    plt.plot(history.history['val_' + metric])
    plt.title(title)
    plt.ylabel(metric)
    plt.xlabel('epoch')
    plt.legend(['train', 'validation'], loc='upper left')
    plt.show()

In [None]:
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score
from sklearn.metrics import roc_auc_score

# Predict once and reuse: class probabilities for ROC AUC, argmax for accuracy.
yprob = model.predict(X_test)

# Convert one-hot targets and probabilities to class ids.
ytrue = np.argmax(Y_test, axis=1).tolist()
yhat = np.argmax(yprob, axis=1).tolist()

# Only the last expression of a cell is displayed, so the accuracy has to be
# printed explicitly; the original computed it and discarded it.
print("Accuracy", accuracy_score(ytrue, yhat))

# One-vs-rest multi-class ROC AUC on the predicted probabilities.
roc_auc_score(ytrue, yprob, multi_class='ovr')

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn

# seaborn.set only configures the theme and returns None, so it cannot
# provide the axes; the heatmap call below returns them.
seaborn.set(color_codes=True)
plt.figure(1, figsize=(9, 6))

# labels= pins the matrix to 4x4 even if a class is absent from the test split.
cm = confusion_matrix(ytrue, yhat, labels=[0, 1, 2, 3])

# annot=True annotates the cells; fmt='g' disables scientific notation.
# The module is imported as `seaborn`; the original used the undefined alias `sns`.
ax = seaborn.heatmap(cm, annot=True, fmt='g')

# labels, title and ticks
ax.set_xlabel('Predicted labels')
ax.set_ylabel('True labels')
ax.set_title('Confusion Matrix')
# The original x tick list was ['0','1','2' '3']: the missing comma merged
# the last two entries into '23', giving three labels for four classes.
ax.xaxis.set_ticklabels(['0', '1', '2', '3'])
ax.yaxis.set_ticklabels(['0', '1', '2', '3']);

In [None]:
# Evaluate on the held-out split; evaluate() returns [loss, categorical_accuracy]
# for a model compiled with that single metric.
loss_and_metrics = model.evaluate(X_test, Y_test, verbose=1)

for _caption, _value in zip(("Test Loss", "Test Accuracy"), loss_and_metrics):
    print(_caption, _value)

In [None]:
# Fully connected baseline.
# The original called the module itself (`tf.keras.layers(64, ...)`), which
# raises TypeError; the arguments (units, activation) match Dense, so Dense is
# assumed here - TODO confirm the intended layer type.
# The original input_shape=(None, 10626) suggests a flattened sequence
# (10626 = 154 * 69); an explicit Flatten sized from the training data feeds
# the first Dense layer one vector per sample and is a no-op on 2-D input.
model = Sequential([
    tf.keras.layers.Flatten(input_shape=X_train.shape[1:]),
    Dense(64, activation='relu'),
    tf.keras.layers.Dropout(.25),
    Dense(128, activation='relu'),
    tf.keras.layers.Dropout(.25),
    Dense(256, activation='relu'),
    tf.keras.layers.Dropout(.25),
    Dense(128, activation='relu'),
    tf.keras.layers.Dropout(.25),
    Dense(32, activation='relu'),
    Dense(4, activation='softmax'),
])

# Compile model (compile() returns None, so its result is not kept).
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['categorical_accuracy'])

# Fit on the training split only; fitting on the full X/Y would include
# X_test and invalidate the later test evaluation.
history = model.fit(X_train, Y_train, validation_split=0.25, epochs=100, batch_size=64, verbose=1)

# list all data in history
print(history.history.keys())

# One figure per metric: training curve against the validation curve.
for metric, title in (('categorical_accuracy', 'model accuracy'), ('loss', 'model loss')):
    plt.plot(history.history[metric])
    plt.plot(history.history['val_' + metric])
    plt.title(title)
    plt.ylabel(metric)
    plt.xlabel('epoch')
    plt.legend(['train', 'validation'], loc='upper left')
    plt.show()

In [None]:
# Evaluate on the held-out split; evaluate() returns [loss, categorical_accuracy]
# for a model compiled with that single metric.
loss_and_metrics = model.evaluate(X_test, Y_test, verbose=1)

for _caption, _value in zip(("Test Loss", "Test Accuracy"), loss_and_metrics):
    print(_caption, _value)

In [None]:
# 1-D CNN over (154 frames, 99 features): three tanh Conv1D/MaxPooling stages
# followed by a ReLU dense head and a 4-way softmax.
model = Sequential([
    tf.keras.layers.Conv1D(64, 3, activation='tanh', input_shape=(154, 99)),
    tf.keras.layers.MaxPooling1D(pool_size=2, strides=2, padding='valid'),
    tf.keras.layers.Dropout(.25),
    tf.keras.layers.Conv1D(128, 3, activation="tanh"),
    tf.keras.layers.Dropout(.25),
    tf.keras.layers.MaxPooling1D(pool_size=2, strides=2, padding='valid'),
    tf.keras.layers.Conv1D(256, 3, activation='tanh'),
    tf.keras.layers.Dropout(.25),
    tf.keras.layers.MaxPooling1D(pool_size=2, strides=2, padding='valid'),
    tf.keras.layers.Flatten(),
    Dense(256, activation='relu'),
    tf.keras.layers.Dropout(.25),
    Dense(128, activation='relu'),
    tf.keras.layers.Dropout(.25),
    Dense(64, activation='relu'),
    tf.keras.layers.Dropout(.25),
    Dense(4, activation='softmax'),
])

In [None]:
# Compile model (compile() returns None, so its result is not kept).
model.compile(loss='categorical_crossentropy', optimizer='RMSprop', metrics=['categorical_accuracy'])

# Fit on the training split only. The original fitted on the full X/Y, which
# contains X_test, so the later evaluation on X_test measured data the model
# had already been trained on.
history = model.fit(X_train, Y_train, validation_split=0.25, epochs=100, batch_size=64, verbose=1)

# list all data in history
print(history.history.keys())

# One figure per metric: training curve against the validation curve.
# The second curve comes from validation_split, so it is labelled
# 'validation' rather than 'test'.
for metric, title in (('categorical_accuracy', 'model accuracy'), ('loss', 'model loss')):
    plt.plot(history.history[metric])
    plt.plot(history.history['val_' + metric])
    plt.title(title)
    plt.ylabel(metric)
    plt.xlabel('epoch')
    plt.legend(['train', 'validation'], loc='upper left')
    plt.show()

In [None]:
# Evaluate on the held-out split; evaluate() returns [loss, categorical_accuracy]
# for a model compiled with that single metric.
loss_and_metrics = model.evaluate(X_test, Y_test, verbose=1)

for _caption, _value in zip(("Test Loss", "Test Accuracy"), loss_and_metrics):
    print(_caption, _value)

In [None]:
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score, classification_report

# Persist the trained model in HDF5 format.
model.save("engagement.h5")

# Class ids for the test split: predictions via argmax over the model output,
# targets via argmax over the one-hot rows.
yhat = np.argmax(model.predict(X_test), axis=1).tolist()
ytrue = np.argmax(Y_test, axis=1).tolist()

# Per-class precision / recall / F1 summary.
report = classification_report(ytrue, yhat)
print(report)

In [None]:
from sklearn.metrics import roc_auc_score

# One-vs-rest multi-class ROC AUC, computed from the model outputs on the test split.
class_scores = model.predict(X_test)
roc_auc_score(ytrue, class_scores, multi_class='ovr')

In [None]:
# class id -> human-readable engagement label
labels = {0 : "Highly Engaged", 1 : "Moderate Engaged", 2: "Not Engaged", 3: "Undefined"}
yhat = model.predict(X_test)
predicted_label = np.argmax(yhat, axis=1).tolist()
ytrue = np.argmax(Y_test, axis=1).tolist()

# Walk the true and predicted ids in lockstep. The original zipped against
# range(true_labels), an undefined name (NameError).
for true_label, label in zip(ytrue, predicted_label):
    print(f"the true label is {true_label} And The predicted engagement label is {labels[label]}")

In [None]:
# `yhat` holds the raw model.predict output at this point, which
# confusion_matrix cannot take; `predicted_label` holds the argmax class ids.
# labels= pins the matrix to 4x4 so it always fits the DataFrame below.
data = confusion_matrix(ytrue, predicted_label, labels=list(range(4)))
plt.figure(1, figsize=(12, 9))
DetaFrame_cm = pd.DataFrame(data, range(4), range(4))
class_names = ['Highly Engaged', 'Moderate Engaged', 'Not Engaged', 'Undefined']
ax = seaborn.heatmap(DetaFrame_cm, annot=True, cmap='Greens_r', fmt='d', cbar=True,
                     linewidths=3, linecolor='g', square=True,
                     xticklabels=class_names, yticklabels=class_names)

# Rows are the true classes, columns the predicted classes.
ax.set(ylabel="True Label", xlabel="Predicted Label")

plt.savefig('confusion_matrix.png', bbox_inches='tight', dpi=600)
plt.show()
plt.close()

In [None]:
# Stacked LSTM classifier over (154 frames, 99 features) with a dense head and
# one softmax output per entry in `actions`.
# Only Dense is imported by name in this notebook, so a bare `LSTM` raised
# NameError; the layer is referenced through tf.keras.layers like the other
# non-Dense layers.
model = Sequential()
model.add(tf.keras.layers.LSTM(128, return_sequences=True, activation='tanh', input_shape=(154, 99)))
model.add(tf.keras.layers.LSTM(512, return_sequences=True, activation='tanh'))
model.add(tf.keras.layers.Dropout(.25))
# Last recurrent layer returns only its final output, giving one vector per sample.
model.add(tf.keras.layers.LSTM(512, return_sequences=False, activation='tanh'))
model.add(Dense(256, activation='relu'))
model.add(Dense(128, activation='relu'))
model.add(Dense(64, activation='relu'))
model.add(Dense(actions.shape[0], activation='softmax'))

In [None]:
# Compile model (compile() returns None, so its result is not kept).
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['categorical_accuracy'])

# Fit on the training split only. The original fitted on the full X/Y, which
# contains X_test, so any later evaluation on X_test measured data the model
# had already been trained on.
history = model.fit(X_train, Y_train, validation_split=0.25, epochs=100, batch_size=64, verbose=1)

# list all data in history
print(history.history.keys())

# One figure per metric: training curve against the validation curve.
# The second curve comes from validation_split, so it is labelled
# 'validation' rather than 'test'.
for metric, title in (('categorical_accuracy', 'model accuracy'), ('loss', 'model loss')):
    plt.plot(history.history[metric])
    plt.plot(history.history['val_' + metric])
    plt.title(title)
    plt.ylabel(metric)
    plt.xlabel('epoch')
    plt.legend(['train', 'validation'], loc='upper left')
    plt.show()

# convert the history.history dict to a pandas DataFrame:
hist_df = pd.DataFrame(history.history)

# Hand pandas the path and let it open the file itself: a handle opened
# without newline='' can produce blank rows between records on Windows.
hist_csv_file = 'LSTM_history4.csv'
hist_df.to_csv(hist_csv_file)

# The original re-compiled the model here with the same loss and metric;
# that second compile is dropped because the model is already compiled above.

In [None]:
# Recompute class ids for the current model; the original used `y_pred`,
# which is never defined in this notebook.
ytrue = np.argmax(Y_test, axis=1).tolist()
y_pred = np.argmax(model.predict(X_test), axis=1).tolist()

# confusion_matrix takes (y_true, y_pred): rows are true classes and columns
# are predictions, matching the axis labels set below. The original passed
# them swapped. labels= pins the matrix to 4x4 for the DataFrame below.
data = confusion_matrix(ytrue, y_pred, labels=list(range(4)))

plt.figure(1, figsize=(12, 9))
DetaFrame_cm = pd.DataFrame(data, range(4), range(4))
class_names = ['Highly Engaged', 'Moderate Engaged', 'Not Engaged', 'Undefined']
ax = seaborn.heatmap(DetaFrame_cm, annot=True, cmap='Greens_r', fmt='d', cbar=True,
                     linewidths=3, linecolor='g', square=True,
                     xticklabels=class_names, yticklabels=class_names)

ax.set(ylabel="True Label", xlabel="Predicted Label")

# NOTE(review): same file name as the earlier confusion-matrix cell, so this
# overwrites that figure - confirm whether a separate file is wanted.
plt.savefig('confusion_matrix.png', bbox_inches='tight', dpi=600)
plt.show()
plt.close()