# Dependencies

In [44]:
# %pip install opencv-python
# %pip install mediapipe
# %pip install tensorflow
# %pip install matplotlib
# %pip install scikit-learn
# %pip install pillow

In [45]:
import cv2
import mediapipe as mp
import numpy as np
import os
from matplotlib import pyplot as plt
import winsound
from PIL import Image
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score
from scipy import stats
from __future__ import print_function
import torch
import torch.nn.functional as F
from torchvision import transforms



# Feature Extraction
<img src="https://www.researchgate.net/profile/Jungpil-Shin/publication/362351225/figure/fig3/AS:1183521695645696@1659184969422/Mediapipe-detects-33-nodes-of-the-human-pose.ppm" width="400px">

In [46]:
# Feed input source: the value handed to cv2.VideoCapture when a capture is opened.
# NOTE(review): an integer is presumably a camera device index (1 = second camera) — confirm per machine.
feed = 1

In [47]:
# Short aliases for the MediaPipe sub-modules used throughout the notebook.
mp_holistic = mp.solutions.holistic # Holistic model module (source of Holistic and POSE_CONNECTIONS)
mp_drawing = mp.solutions.drawing_utils # Drawing utilities (draw_landmarks)

In [48]:
def mediapipe_detection(image, model):
    """Run a MediaPipe model on a single OpenCV frame.

    Args:
        image: Frame in BGR channel order (it is converted with COLOR_BGR2RGB
            before being processed).
        model: Object exposing ``process(rgb_image)``; this notebook passes an
            ``mp_holistic.Holistic`` instance.

    Returns:
        Tuple ``(image, results)``: the frame converted back to BGR after the
        round trip, and whatever ``model.process`` returned.
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # The frame is flagged read-only for the duration of the model call and
    # made writeable again afterwards.
    rgb_frame.flags.writeable = False
    results = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    bgr_frame = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)
    return bgr_frame, results

In [49]:
def draw_landmarks(image, results):
    """Draw the detected pose landmarks and their connections on ``image``.

    Args:
        image: Frame handed to ``mp_drawing.draw_landmarks`` as the drawing target.
        results: Detection output whose ``pose_landmarks`` attribute is drawn.

    Nothing is returned; the caller displays ``image`` afterwards, so the
    drawing is expected to happen in place.
    """
    pose_landmarks = results.pose_landmarks
    pose_connections = mp_holistic.POSE_CONNECTIONS
    mp_drawing.draw_landmarks(image, pose_landmarks, pose_connections)

In [50]:
#Silhouette V1
def sil_preprocess(frame,fgbg,blurValue,binThreshold):
    """Silhouette V1: background subtraction -> median blur -> binarisation.

    Args:
        frame: Image passed to ``fgbg.apply``.
        fgbg: Background-subtractor object exposing ``apply(frame)``.
        blurValue: Kernel size forwarded to ``cv2.medianBlur``.
        binThreshold: Threshold forwarded to ``cv2.threshold`` with
            ``THRESH_BINARY`` and a maximum value of 255.

    Returns:
        The thresholded mask (second element of ``cv2.threshold``'s result).
    """
    # Foreground mask from the background model.
    foreground = fgbg.apply(frame)

    # Median filtering for noise reduction (a Gaussian blur was tried as an
    # alternative in an earlier revision).
    smoothed = cv2.medianBlur(foreground, blurValue)

    # Keep only the mask; the threshold value returned first is not needed.
    return cv2.threshold(smoothed, binThreshold, 255, cv2.THRESH_BINARY)[1]

In [51]:
# Pretrained DeepLabV3 (ResNet-101) segmentation network, fetched through torch.hub.
model = torch.hub.load('pytorch/vision:v0.6.0', 'deeplabv3_resnet101', pretrained=True)
model.eval()  # evaluation (inference) mode
print("Model has been loaded.")

# Class index matched against the per-pixel argmax in makeSegMask: only people
# are segmented, for the purpose of human silhouette extraction.
people_class = 15

# 3x3 smoothing kernel whose weights sum to 1, shaped (1, 1, 3, 3) so it can be
# passed to F.conv2d as a single-channel filter.
blur = torch.FloatTensor([[[
    [1.0, 2.0, 1.0],
    [2.0, 4.0, 2.0],
    [1.0, 2.0, 1.0],
]]]) / 16.0

# Input preprocessing: per-channel normalisation with the listed mean/std.
# NOTE(review): these look like the usual ImageNet RGB statistics — confirm.
preprocess = transforms.Compose([
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
])

# Move the network and the kernel to the GPU when one is available.
if torch.cuda.is_available():
    model.to('cuda')
    blur = blur.to('cuda')

Using cache found in C:\Users\lance/.cache\torch\hub\pytorch_vision_v0.6.0


Model has been loaded.


In [52]:
#Silhouette V2
def makeSegMask(img):
    """Silhouette V2: binary person mask from the segmentation network.

    Relies on the module-level ``model``, ``preprocess``, ``blur`` and
    ``people_class`` defined in the model-loading cell.

    Args:
        img: Image array indexed (row, column, channel); it is divided by 255
            and permuted to channel-first before normalisation.
            NOTE(review): the normalisation statistics suggest RGB channel
            order is expected — confirm against callers.

    Returns:
        The array produced by ``cv2.threshold(..., 127, 255, THRESH_BINARY)``
        applied to the 3-channel 8-bit combined mask.
    """
    # Scale, reorder to channel-first, normalise, and add the batch axis.
    scaled = torch.FloatTensor(img) / 255.0
    batch = preprocess(scaled.permute(2, 0, 1)).unsqueeze(0)

    # Use the GPU when available, matching where the model was placed.
    if torch.cuda.is_available():
        batch = batch.to('cuda')

    with torch.no_grad():
        scores = model(batch)['out'][0]

    # Per-pixel winning class.
    labels = scores.argmax(0)

    # Soft weight computed from the channel-0 scores (called "bgOut", i.e.
    # background, in the original code), keeping a leading axis of size 1.
    channel0_scores = scores[0:1]
    weight = (1.0 - F.relu(torch.tanh(channel0_scores * 0.30 - 1.0))).pow(0.5) * 2.0

    # 1.0 where the winning class is people_class, 0.0 elsewhere; reshaped to
    # (1, 1, H, W) for conv2d and smoothed with three passes of the blur kernel.
    person_mask = labels.eq(people_class).float().unsqueeze(0).unsqueeze(0)
    for _ in range(3):
        person_mask = F.conv2d(person_mask, blur, stride=1, padding=1)

    # Combine both masks and limit the product to [0, 1].
    merged = F.relu(F.hardtanh(weight * person_mask.squeeze().pow(1.5)))
    merged = merged.expand(1, 3, -1, -1)

    # Back to an H x W x 3 8-bit array on the CPU, then binarise at 127.
    mask_u8 = (merged * 255.0).cpu().squeeze().byte().permute(1, 2, 0).numpy()
    return cv2.threshold(mask_u8, 127, 255, cv2.THRESH_BINARY)[1]

In [53]:
# Live preview: shows Silhouette V1, Silhouette V2 and the pose overlay side by
# side for every frame read from `feed`. Press Esc (key code 27) to stop.
cap = cv2.VideoCapture(feed)
fgbg = cv2.createBackgroundSubtractorKNN()
# fgbg = cv2.createBackgroundSubtractorKNN(history=200, dist2Threshold=30)

# Silhouette V1 parameters; constant for the whole session, so set once.
blurValue = 13
binThreshold = 128

try:
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        while cap.isOpened():
            ret, frame = cap.read()
            # A failed read (camera unplugged, end of stream) yields no frame;
            # stop instead of passing None into the processing steps below.
            if not ret or frame is None:
                break

            #Silhouette V1
            binary_mask = sil_preprocess(frame, fgbg, blurValue, binThreshold)
            cv2.imshow('Gait ID (Silhouette V1)', binary_mask)

            #Silhouette V2
            # OpenCV delivers BGR frames; convert to RGB before segmentation,
            # as the offline silhouette-export cell does, so the per-channel
            # normalisation in `preprocess` is applied to the right channels.
            sil = makeSegMask(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            cv2.imshow('Gait ID (Silhouette V2)', sil)

            #RGB
            image, results = mediapipe_detection(frame, holistic)
            draw_landmarks(image, results)
            # NOTE(review): the window title reads "RBG"; probably meant "RGB".
            cv2.imshow('Gait ID (RBG)', image)

            if cv2.waitKey(10) & 0xFF == 27:
                break
finally:
    # Always free the camera and close the windows, even if a step above raised.
    cap.release()
    cv2.destroyAllWindows()

In [None]:
def extract_keypoints(results):
    """Flatten the pose landmarks of a detection result into a 1-D array.

    Args:
        results: Object with a ``pose_landmarks`` attribute. When that
            attribute is truthy, its ``landmark`` iterable supplies items with
            ``x``, ``y``, ``z`` and ``visibility`` attributes.

    Returns:
        1-D NumPy array of ``x, y, z, visibility`` per landmark, in landmark
        order; a zero vector of length 33*4 when no pose landmarks are present.
    """
    pose_landmarks = results.pose_landmarks
    if not pose_landmarks:
        # No detection: keep the feature length fixed with an all-zero vector.
        return np.zeros(33*4)

    rows = [[point.x, point.y, point.z, point.visibility] for point in pose_landmarks.landmark]
    return np.array(rows).flatten()

In [None]:
# result_test = extract_keypoints(results)
# result_test

# Setup Folders

In [None]:
# Dataset locations, given as relative paths.
# Alternative dataset used during development: 'Temp_Dataset'
DATA_PATH = 'Custom_Gait_Dataset_V.4'
EXPORTED = 'Exported'

# Identities the system tries to recognise.
# For a single-person run use: np.array(['Lance Moheb'])
persons = np.array([
    'Hamza Tharwat',
    'Lance Moheb',
    'Mina Nady',
    'Mohmed Youssef',
    'Omar Amin',
    'Romario Nagy',
    'Seif Ibrahim',
    'Ziad Diaa',
])

# Number of videos (sequences) per person.
no_sequences = 10

# Number of frames in each video.
sequence_length = 90

# Derived totals.
persons_len = persons.shape[0]
total_sequences = persons_len * no_sequences

# Capturing Dataset

In [None]:
# for person in persons:
#     for sequence in range(no_sequences):
#         try: 
#             os.makedirs(os.path.join(EXPORTED, person, str(sequence)))
#         except:
#             pass

In [None]:
# cap = cv2.VideoCapture(feed)
# # Set mediapipe model 
# with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    
#     # NEW LOOP
#     # Loop through persons
#     for person in persons:
#         for sequence in range(no_sequences):
#             for frame_num in range(sequence_length):

#                 # Read feed
#                 ret, frame = cap.read()

#                 # Make detections
#                 image, results = mediapipe_detection(frame, holistic)

#                 # Draw landmarks
#                 draw_landmarks(image, results)
                
#                 # NEW Apply wait logic
#                 if frame_num == 0: 
#                     cv2.putText(image, 'STARTING COLLECTION', (120,200), 
#                             cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255, 0), 1, cv2.LINE_AA)
#                     cv2.putText(image, 'Collecting frames for {} Video, frame Number #{}#'.format(person, sequence), (15,12), 
#                             cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
#                     winsound.Beep(1000,1000)
#                     cv2.waitKey(5000)
#                     winsound.Beep(1000,500)
#                     winsound.Beep(1000,500)
#                 else: 
#                     cv2.putText(image, 'Collecting frames for {} Video, frame Number #{}#'.format(person, sequence), (15,12), 
#                             cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                    
#                 # NEW Export keypoints
#                 keypoints = extract_keypoints(results)
#                 png_path = f"{DATA_PATH}/{person}/{sequence}/{frame_num}"
#                 img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB), 'RGB')
#                 img.save(f'{png_path}.jpg')

                
#                 cv2.imshow('Gait ID', image)

#                 # Break gracefully
#                 if cv2.waitKey(10) & 0xFF == 27:
#                     break
#         if cv2.waitKey(10) & 0xFF == 27:
#             continue
#         else:
#             break
            
#     cap.release()
#     cv2.destroyAllWindows()

# Pre-Processing

* A) Importing Dataset

In [None]:
# label_map = {label:num for num, label in enumerate(persons)}
# print(label_map)

In [None]:
# sequences, labels = [], []
# for person in persons:
#     for sequence in np.array(os.listdir(os.path.join(DATA_PATH, person))).astype(int):
#         window = []
#         for frame_num in range(sequence_length):

#             image_path = os.path.join(DATA_PATH, person, str(sequence), f"{frame_num}.jpg")
#             print (image_path)
#             res = cv2.imread(image_path)
#             window.append(res)
#         sequences.append(window)
#         labels.append(label_map[person])

In [None]:
# seqq=np.array(sequences)
# print(seqq.shape)
# show_test = cv2.cvtColor(seqq[1][40], cv2.COLOR_BGR2RGB)
# plt.imshow(show_test)

* B) Converting to Silhouette Using:
    * Background Subtraction
    * Noise Reduction
    * Binarization

In [None]:
# exported = []
# fgbg = cv2.createBackgroundSubtractorKNN(detectShadows=False)
# # for sequence in  range(total_sequences):
# for sequence in  range(4):
#     for frame_num in range(sequence_length):

#         frame = cv2.cvtColor(seqq[sequence][frame_num], cv2.COLOR_BGR2RGB)

#         sil = makeSegMask(frame)

#         exported.append(sil)

In [None]:
# plt.imshow(exported[140],cmap='gray')

In [None]:
# # Assuming each person has 10 sequences, and each sequence has 90 frames
# total_sequences = 10
# sequence_length = 90

# # Define the base directory where the images will be saved
# base_dir = "image_exports"

# # Create the base directory if it doesn't exist
# if not os.path.exists(base_dir):
#     os.makedirs(base_dir)

# # Iterate through each person
# for person_name in persons:
#     person_dir = os.path.join(base_dir, person_name)
#     # Create a directory for the person if it doesn't exist
#     if not os.path.exists(person_dir):
#         os.makedirs(person_dir)
    
#     # Iterate through each sequence for the person
#     for sequence in range(total_sequences):
#         sequence_dir = os.path.join(person_dir, f"sequence_{sequence+1}")
#         # Create a directory for the sequence if it doesn't exist
#         if not os.path.exists(sequence_dir):
#             os.makedirs(sequence_dir)
        
#         # Iterate through each frame in the sequence
#         for frame_num in range(sequence_length):
#             # Generate the filename for the frame
#             filename = f"frame_{frame_num+1}.png"
#             filepath = os.path.join(sequence_dir, filename)
            
#             # Check if there are still images in the exported array
#             if len(exported) > 0:
#                 # Retrieve and save the next image from the exported array
#                 cv2.imwrite(filepath, exported.pop(0))
#             else:
#                 # If there are no more images in the exported array, break the loop
#                 break

* C) Removing Unwanted Black Regions

In [None]:
# # Function to crop silhouette images to the person
# def crop_to_person(images):
#     cropped_images = []
#     for image in images:
#         # Find contours
#         contours, _ = cv2.findContours(image, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        
#         # Find the bounding box enclosing the contour
#         if contours:
#             x, y, w, h = cv2.boundingRect(contours[0])
#             cropped_image = image[y:y+h, x:x+w]
#         else:
#             # If no contours found, return original image
#             cropped_image = image
        
#         cropped_images.append(cropped_image)
    
#     return cropped_images
# cropped_images = crop_to_person(exported)


In [None]:
# plt.imshow(cropped_images[130],cmap='gray')

* E) GEI (Gait Energy Image) Generation

In [None]:
# def compute_gei(images):
#     # Convert images to numpy array
#     images_array = np.array(images)

#     # Compute GEI by averaging along the first axis (assuming images are stacked along the first axis)
#     gei = np.mean(images_array, axis=0)

#     return gei

In [None]:
# for frame in range(sequence_length):
#     temp = compute_gei(resized_images)
#     plt.imshow(temp,cmap='gray')

# Train

A) Splitting Dataset

In [None]:
# X = np.array(sequences)
# y = to_categorical(labels).astype(int)

In [None]:
# X_train, X_test, y_train, y_test = train_test_split(X, y)
# print(X_test.shape)
# print(y_test.shape)

B) Building CNN Architecture

In [None]:
# log_dir = os.path.join('Logs')
# tb_callback = TensorBoard(log_dir=log_dir)

In [None]:
# from tensorflow.keras.models import Sequential
# from tensorflow.keras.layers import LSTM, Dense
# from tensorflow.keras.callbacks import TensorBoard

In [None]:
# model = Sequential()
# model.add(LSTM(64, return_sequences=True, activation='relu', input_shape=(60,132)))
# model.add(LSTM(128, return_sequences=True, activation='relu'))
# model.add(LSTM(64, return_sequences=False, activation='relu'))
# model.add(Dense(64, activation='relu'))
# model.add(Dense(32, activation='relu'))
# model.add(Dense(persons.shape[0], activation='softmax'))

In [None]:
# #Compilation
# model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])

In [None]:
# model.fit(X_train, y_train, epochs=200, callbacks=[tb_callback])

C) Training Results

In [None]:
# model.summary()

In [None]:
# print("Evaluate on test data")
# results = model.evaluate(X_test, y_test, batch_size=128)
# print("test loss, test acc:", results)

In [None]:
# res = model.predict(X_test)

In [None]:
# persons[np.argmax(res[4])]

In [None]:
# persons[np.argmax(y_test[4])]

In [None]:
# model.save('train4.h5')

In [None]:
# yhat = model.predict(X_test)

In [None]:
# ytrue = np.argmax(y_test, axis=1).tolist()
# yhat = np.argmax(yhat, axis=1).tolist()

# multilabel_confusion_matrix(ytrue, yhat)

# accuracy_score(ytrue, yhat)

# Detection

In [None]:
# colors = [(245,117,16),(245,117,16),(245,117,16),(245,117,16),(245,117,16),(245,117,16),(245,117,16),(245,117,16),(245,117,16),(245,117,16),(245,117,16),(245,117,16)]
# def prob_viz(res, actions, input_frame, colors):
#     output_frame = input_frame.copy()
#     for num, prob in enumerate(res):
#         cv2.rectangle(output_frame, (0,60+num*40), (int(prob*100), 90+num*40), colors[num], -1)
#         cv2.putText(output_frame, actions[num], (0, 85+num*40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 1, cv2.LINE_AA)
        
#     return output_frame

In [None]:
# # 1. New detection variables
# sequence = []
# sentence = []
# predictions = []
# threshold = 0.3

# cap = cv2.VideoCapture(feed)
# # Set mediapipe model
# with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
#     while cap.isOpened():

#         # Read feed
#         ret, frame = cap.read()

#         # Make detections
#         image, results = mediapipe_detection(frame, holistic)
#         print(results)
        
#         # Draw landmarks
#         draw_landmarks(image, results)
        
#         # 2. Prediction logic
#         keypoints = extract_keypoints(results)
#         sequence.append(keypoints)
#         sequence = sequence[-60:]
        
#         if len(sequence) == 60:
#             res = model.predict(np.expand_dims(sequence, axis=0))[0]
#             print(persons[np.argmax(res)])
#             predictions.append(np.argmax(res))
            
            
#         #3. Viz logic
#             if np.unique(predictions[-10:])[0]==np.argmax(res): 
#                 if res[np.argmax(res)] > threshold: 
                    
#                     if len(sentence) > 0: 
#                         if persons[np.argmax(res)] != sentence[-1]:
#                             sentence.append(persons[np.argmax(res)])
#                     else:
#                         sentence.append(persons[np.argmax(res)])

#             if len(sentence) > 5: 
#                 sentence = sentence[-5:]

#             # Viz probabilities
#             image = prob_viz(res, persons, image, colors)
            
#         cv2.rectangle(image, (0,0), (640, 40), (245, 117, 16), -1)
                
#         cv2.putText(image, ' '.join(sentence), (3,30), 
#                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
        
#         # Show to screen
#         cv2.imshow('Gait ID', image)

#         # Break gracefully
#         if cv2.waitKey(10) & 0xFF == 27:
#             break
#     cap.release()
#     cv2.destroyAllWindows()