# Intel D435 prediction cam codes

In [None]:
# Institution: Carleton University
# Course: OSS4900 Capstone 
# Term: F22 - W23
#
# Filename: 1 - MAIN - Intel D435_HG_recognition.ipynb
#
# Students: Adam Thompson, Philippe Beaulieu
# Advisor:  Dr. Marzieh Amini
#
# Description: This program loads the Mediapipe .csv dataset to be used with the RGB stream, and
#              loads the light CNN model to be used with the DEPTH stream. If the DEPTH prediction
#              confidence falls under the threshold (here 60%), the Mediapipe hand model result is
#              used instead.
#
#              In this project, Mediapipe serves as a fallback when the CNN model goes below a set threshold.
#     

setup

In [None]:
import pathlib
import os
import json

import pyrealsense2 as rs
import cv2

import matplotlib.pyplot as plt
import numpy as np
import PIL
import tensorflow as tf
import mediapipe as mp

import time
import pandas as pd

from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier # KNN

from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential


import warnings
warnings.filterwarnings('ignore')


Load the trained model

In [None]:
# Path to the saved TensorFlow Lite model (relative path, resolved from the working directory).
TF_MODEL_FILE_PATH = 'model.tflite'

# Load the model and allocate its tensors; allocation is needed before execution.
interpreter = tf.lite.Interpreter(model_path=TF_MODEL_FILE_PATH)
interpreter.allocate_tensors()

# Query the signature list once and print it. The input/output names it reports are the
# ones used when the signature runner is called later in the notebook.
signatures = interpreter.get_signature_list()
print(signatures)


In [None]:
# MediaPipe handles: drawing styles, drawing utilities and the hands solution.
mp_drawing_styles = mp.solutions.drawing_styles
mp_drawing        = mp.solutions.drawing_utils
mp_hands          = mp.solutions.hands

# Landmark dataset: the first 63 columns are the features, the 'target' column is the label.
data = pd.read_csv("dataset3.csv", index_col=0)
X = data.iloc[:, :63]
Y = data['target']

# K-nearest-neighbours classifier with default settings, fitted on the full dataset.
# Alternative left disabled by the authors: SVC(kernel = 'rbf').
model = KNeighborsClassifier()
model.fit(X, Y)

# Read the stored class names and show them.
with open("class_name.json", 'r') as class_file:
    class_name = json.load(class_file)

print(class_name)


Using Intel D435 pipeline input for images

In [None]:
# Control actions shown on screen, looked up by predicted class index.
# This assignment replaces the class_name list read from class_name.json.
class_name = [
    'Volume Up',
    'Volume Down',
    'Previous Song',
    'Play / Pause',
    'Next Song',
]

# Label text: `asan` is the text drawn on the frame, `asan2` holds the KNN label.
asan = asan2 = ""

# Size used when resizing the depth colormap for the CNN.
# Earlier values kept for reference:
#img_height = 270
#img_width = 480
img_height, img_width = 120, 160

# Colour tuples passed to OpenCV drawing calls on the bgr8 colour frame.
blue        = (255, 127, 0)
dark_blue   = (127, 20, 0)
green       = (127, 255, 0)
light_green = (127, 233, 100)
yellow      = (0, 255, 255)
pink        = (255, 0, 255)
red         = (50, 50, 255)

# Current text colours: `colors` for the displayed label, `color2` for the KNN label.
colors = color2 = green

# Configure depth and color streams
pipeline = rs.pipeline()
config = rs.config()

# Resolve the connected device so a supported colour resolution can be chosen
# from its product line.
pipeline_wrapper = rs.pipeline_wrapper(pipeline)
pipeline_profile = config.resolve(pipeline_wrapper)
device = pipeline_profile.get_device()
device_product_line = str(device.get_info(rs.camera_info.product_line))

# The colour stream is required by the MediaPipe branch; abort if the device has no RGB sensor.
found_rgb = any(
    sensor.get_info(rs.camera_info.name) == 'RGB Camera'
    for sensor in device.sensors
)
if not found_rgb:
    # Raise instead of calling exit(0): inside a notebook kernel exit() does not reliably
    # stop the running cell, so the stream setup below could still execute.
    raise RuntimeError("This code requires Depth camera with Color sensor")

config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)

# The L500 product line gets a 960x540 colour stream; every other device uses 640x480.
if device_product_line == 'L500':
    config.enable_stream(rs.stream.color, 960, 540, rs.format.bgr8, 30)
else:
    config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)

# Start streaming
pipeline.start(config)

# CNN confidence (in percent) below which the MediaPipe/KNN label is shown instead.
CNN_CONFIDENCE_THRESHOLD = 60


def _color_for_class(class_index):
    """Return the label colour for a predicted class index.

    Indices 0-3 map to green, light_green, yellow and pink; any other index
    maps to red.
    """
    palette = (green, light_green, yellow, pink)
    if 0 <= class_index < len(palette):
        return palette[class_index]
    return red


try:
    # Depth post-processing filters, built once instead of on every frame.
    # NOTE(review): with decimation magnitude 1 and smooth_alpha 1 the first two stages are
    # expected to be close to pass-through - confirm against the RealSense filter docs.
    decimation = rs.decimation_filter()
    decimation.set_option(rs.option.filter_magnitude, 1)

    spatial = rs.spatial_filter()
    spatial.set_option(rs.option.filter_magnitude, 5)
    spatial.set_option(rs.option.filter_smooth_alpha, 1)
    spatial.set_option(rs.option.filter_smooth_delta, 50)

    hole_filling = rs.hole_filling_filter()

    # The signature runner does not change between frames, so fetch it once.
    classify_lite = interpreter.get_signature_runner('serving_default')

    cv2.namedWindow('RealSense', cv2.WINDOW_AUTOSIZE)

    with mp_hands.Hands(max_num_hands=1,
                        model_complexity=1,
                        min_detection_confidence=0.6,
                        min_tracking_confidence=0.6) as hands:

        while True:

            # Wait for a coherent pair of frames: depth and color
            frames = pipeline.wait_for_frames()
            depth_frame = frames.get_depth_frame()
            color_frame = frames.get_color_frame()
            if not depth_frame or not color_frame:
                continue

            # Chain the filters so each stage feeds the next. Previously hole filling was
            # applied to the raw depth frame and the first two results were discarded.
            decimated_depth = decimation.process(depth_frame)
            filtered_depth = spatial.process(decimated_depth)
            filled_depth = hole_filling.process(filtered_depth)

            # Convert images to numpy arrays
            depth_image = np.asanyarray(filled_depth.get_data())
            color_image = np.asanyarray(color_frame.get_data())

            # Apply colormap on depth image (image must be converted to 8-bit per pixel first)
            depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(depth_image, alpha=0.15), cv2.COLORMAP_JET)

            depth_colormap_dim = depth_colormap.shape
            color_colormap_dim = color_image.shape

            # ---- CNN prediction on the depth colormap ----
            # NOTE(review): cv2.resize takes dsize as (width, height), so passing
            # (img_height, img_width) produces an array of shape (img_width, img_height, 3).
            # Left unchanged because the model's expected input shape is not visible here -
            # confirm against the training notebook.
            img_array = tf.keras.utils.img_to_array(cv2.resize(depth_colormap, (img_height, img_width)))
            img_array = tf.expand_dims(img_array, 0) # Create a batch

            # 'rescaling_input' / 'dense_4' are the input and output names of this model's signature.
            predictions_lite = classify_lite(rescaling_input=img_array)['dense_4']
            score_lite = tf.nn.softmax(predictions_lite)

            cnn_class = int(np.argmax(score_lite))
            cnn_confidence = 100 * float(np.max(score_lite))

            # Label and text colour for the CNN prediction
            colors = _color_for_class(cnn_class)
            asan = 'CNN ' + class_name[cnn_class]

            # ---- MediaPipe + KNN prediction on the colour frame ----
            # Reset the KNN label each frame so a result from an earlier frame is never
            # shown when no hand is detected in the current one.
            asan2 = ""
            color2 = colors

            # MediaPipe expects RGB input while the stream delivers bgr8; convert a copy and
            # keep drawing on the BGR frame that is displayed.
            rgb_image = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB)
            results = hands.process(rgb_image)
            if results.multi_hand_landmarks:
                ttemp = []
                for hand_landmarks in results.multi_hand_landmarks:
                    mp_drawing.draw_landmarks(color_image, hand_landmarks, mp_hands.HAND_CONNECTIONS,
                                              mp_drawing_styles.get_default_hand_landmarks_style(),
                                              mp_drawing_styles.get_default_hand_connections_style())

                    # Flatten the landmarks into [x, y, z, x, y, z, ...] for the classifier
                    for point in mp_hands.HandLandmark:
                        nLandmark = hand_landmarks.landmark[point]
                        ttemp.extend((nLandmark.x, nLandmark.y, nLandmark.z))

                # KNN prediction; predict() returns an array, so take its single element
                # explicitly rather than calling int() on the array.
                preds = model.predict([ttemp])
                knn_class = int(preds[0])
                asan2 = 'KNN ' + class_name[knn_class]
                color2 = _color_for_class(knn_class)

            # For a single estimation result: fall back to the KNN label when the CNN
            # confidence is below the threshold.
            if cnn_confidence < CNN_CONFIDENCE_THRESHOLD:
                asan = asan2
                colors = color2

            cv2.putText(color_image, asan, (300,50), cv2.FONT_HERSHEY_SIMPLEX,1,colors,3)

            # If depth and color resolutions are different, resize color image to match depth image for display
            if depth_colormap_dim != color_colormap_dim:
                resized_color_image = cv2.resize(color_image, dsize=(depth_colormap_dim[1], depth_colormap_dim[0]), interpolation=cv2.INTER_AREA)
                images = np.hstack((resized_color_image, depth_colormap))
            else:
                images = np.hstack((color_image, depth_colormap))

            # Show images
            cv2.imshow('RealSense', images)

            # Quit on 'q' or Esc
            key = cv2.waitKey(1) & 0xFF
            if key == ord('q') or key == 27:
                break

finally:

    # Stop streaming and close the preview window on every exit path,
    # including exceptions and kernel interrupts.
    try:
        pipeline.stop()
    finally:
        cv2.destroyAllWindows()
    