In [1]:
import tensorflow as tf
import tensorflow_hub as hub
import cv2
import numpy as np
import matplotlib.pyplot as plt
import random
from collections import deque
import tensorflow.keras as keras

2024-06-27 08:15:02.926899: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2024-06-27 08:15:02.976491: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2024-06-27 08:15:02.977197: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
# Load the MoveNet model from TensorFlow Hub
movenet_model = hub.load(
    "https://tfhub.dev/google/movenet/multipose/lightning/1"
)
# The 'serving_default' signature is the callable used for inference below.
movenet = movenet_model.signatures["serving_default"]

In [3]:
# Define the labels for keypoints
# Order matters: estimate_pose looks names up by position (label[i // 3]).
label = [
    "nose",
    "left eye", "right eye",
    "left ear", "right ear",
    "left shoulder", "right shoulder",
    "left elbow", "right elbow",
    "left wrist", "right wrist",
    "left hip", "right hip",
    "left knee", "right knee",
    "left ankle", "right ankle",
]

# Pose classes the agent chooses between (the action space).
pose_label = [
    "standing",
    "sitting",
    "walking",
]

# Minimum keypoint score; compared against scores after they are multiplied by 256.
score_threshold = 25

In [4]:
# Parameters for DQN
# -- optimisation --
alpha = 1e-3          # Learning rate
gamma = 0.9           # Discount factor

# -- epsilon-greedy exploration schedule --
epsilon = 1.0         # Exploration rate (starting value)
epsilon_min = 0.01    # Floor below which epsilon is no longer decayed
epsilon_decay = 0.995 # Multiplicative decay applied per step

# -- experience replay --
batch_size = 32       # Experiences drawn per update
memory_size = 2_000   # Capacity of the replay buffer

In [5]:

# Neural network for Q-value approximation
def build_model(input_shape, output_shape):
    """Build and compile the fully connected Q-network.

    Args:
        input_shape: Number of features in one flattened state vector.
        output_shape: Number of actions; the network emits one Q-value per action.

    Returns:
        A compiled ``keras.Sequential`` model with two 64-unit ReLU hidden
        layers and a linear output layer, trained with MSE loss and Adam at
        the module-level learning rate ``alpha``.
    """
    model = keras.Sequential([
        keras.layers.Dense(64, input_shape=(input_shape,), activation='relu'),
        keras.layers.Dense(64, activation='relu'),
        # Linear head: Q-values are unbounded regression targets.
        keras.layers.Dense(output_shape, activation='linear')
    ])
    # `learning_rate` is the supported keyword; the old `lr` alias is
    # deprecated in TF 2.x and rejected by newer Keras releases.
    model.compile(optimizer=keras.optimizers.Adam(learning_rate=alpha), loss='mse')
    return model

In [6]:
# Experience replay buffer
class ReplayBuffer:
    """Bounded store of past experiences with uniform random sampling."""

    def __init__(self, max_size):
        # A deque with maxlen silently drops the oldest entry once it is full.
        self.buffer = deque(maxlen=max_size)

    def add(self, experience):
        """Append one experience to the end of the buffer."""
        self.buffer.append(experience)

    def sample(self, batch_size):
        """Return `batch_size` distinct stored experiences chosen at random.

        Sampling is without replacement, so numpy raises ValueError when
        `batch_size` exceeds the number of stored experiences.
        """
        positions = np.arange(len(self.buffer))
        chosen = np.random.choice(positions, size=batch_size, replace=False)
        batch = []
        for position in chosen:
            batch.append(self.buffer[position])
        return batch

In [7]:
# Function to perform pose estimation on a single frame
def estimate_pose(frame, movenet, label):
    """Run MoveNet on one frame and return the most confident person's keypoints.

    The model output is read as ``(1, people, 56)``: per person, 17 triples of
    (y, x, score) followed by 5 trailing values, the last of which (index 55)
    is used to pick the most confident detection.

    Args:
        frame: Image array of shape (height, width, channels).
        movenet: Callable model signature; its result must expose 'output_0'.
        label: Keypoint names, indexed by keypoint position.

    Returns:
        A tuple ``(keypoints_dict, keypoints)``:
          * ``keypoints_dict`` maps keypoint name -> ``[x, y, score]`` with
            x/y in pixels of the original frame and score on a 0-256 scale;
            only keypoints scoring above the module-level ``score_threshold``
            are included.
          * ``keypoints`` is the raw, unmodified model output array.
    """
    input_size = 256
    original_height, original_width, _ = frame.shape

    input_image = tf.expand_dims(frame, axis=0)
    input_image = tf.cast(tf.image.resize_with_pad(input_image, input_size, input_size), dtype=tf.int32)

    # resize_with_pad preserves aspect ratio: the frame is shrunk by one
    # uniform factor and then centred with padding on the shorter axis.
    # Mapping back therefore needs the same single scale plus the pad offset;
    # scaling each axis independently misplaces points on non-square frames.
    scale = max(original_height, original_width) / input_size
    pad_top = int((input_size - original_height / scale) / 2)
    pad_left = int((input_size - original_width / scale) / 2)

    outputs = movenet(input_image)
    keypoints = outputs['output_0'].numpy()

    # Pick the detection with the highest instance score.
    best_person = keypoints[0, :, 55].argmax()

    # Work in pixels of the padded model input; scores become 0-256 too,
    # which is the scale score_threshold is compared on.
    max_points = (keypoints[0, best_person, :] * input_size).astype(float)

    keypoints_dict = {}
    # Skip the 5 trailing non-keypoint values and walk the (y, x, score) triples.
    for i in range(0, len(max_points) - 5, 3):
        score = max_points[i + 2]
        if score > score_threshold:
            y = (max_points[i] - pad_top) * scale
            x = (max_points[i + 1] - pad_left) * scale
            # Plain Python ints so the coordinates can be handed to OpenCV directly.
            keypoints_dict[label[i // 3]] = [int(x), int(y), score]

    return keypoints_dict, keypoints

In [8]:

# Function to draw the predicted keypoints and connections on the frame
def draw_pose(frame, keypoints_dict, predicted_label):
    """Draw keypoints, skeleton lines and the predicted pose label on a frame.

    Args:
        frame: Image array. It is channel-swapped (COLOR_RGB2BGR) before
            drawing and swapped back (COLOR_BGR2RGB) afterwards, so the
            result has the same channel order as the input.
        keypoints_dict: Mapping of keypoint name -> ``[x, y, score]``.
        predicted_label: Text shown after "Predicted Pose: ".

    Returns:
        The annotated image produced by the final colour conversion.
    """
    frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

    for key, (x, y, score) in keypoints_dict.items():
        # Only plot keypoints whose score exceeds the shared score_threshold.
        if score > score_threshold:
            center = (int(x), int(y))
            cv2.circle(frame, center, 5, (0, 255, 255), -1)
            cv2.putText(frame, key, (center[0] + 5, center[1] + 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 1, cv2.LINE_AA)

    # Define connections between keypoints
    connections = [
        ('nose', 'left eye'), ('left eye', 'left ear'), ('nose', 'right eye'), ('right eye', 'right ear'),
        ('nose', 'left shoulder'), ('left shoulder', 'left elbow'), ('left elbow', 'left wrist'),
        ('nose', 'right shoulder'), ('right shoulder', 'right elbow'), ('right elbow', 'right wrist'),
        ('left shoulder', 'left hip'), ('right shoulder', 'right hip'), ('left hip', 'right hip'),
        ('left hip', 'left knee'), ('right hip', 'right knee'), ('left knee', 'left ankle'), ('right knee', 'right ankle')
    ]

    for start_key, end_key in connections:
        # A bone is drawn only when both of its endpoints were detected.
        if start_key in keypoints_dict and end_key in keypoints_dict:
            # Coerce to Python ints, as done for the circles above, so OpenCV
            # always receives plain integer pixel coordinates.
            start_point = tuple(int(v) for v in keypoints_dict[start_key][:2])
            end_point = tuple(int(v) for v in keypoints_dict[end_key][:2])
            cv2.line(frame, start_point, end_point, (0, 255, 255), 2)

    # Add the predicted label text to the frame
    cv2.putText(frame, f"Predicted Pose: {predicted_label}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)

    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    return frame


In [9]:
# Helper: turn the raw model output into the state vector fed to the Q-network
def _keypoints_to_state(keypoints, num_values=51):
    """Return the most confident detection's keypoint values as a float vector.

    `keypoints` is the raw array returned by estimate_pose, read the same way
    estimate_pose reads it: index 55 of each detection is its score and the
    leading `num_values` entries are the keypoint triples. 51 matches the
    input size the Q-network is built with in this notebook.
    """
    people = keypoints[0]
    best_person = people[:, 55].argmax()
    return people[best_person, :num_values].astype(np.float32)


# Helper: interpret what the user typed as an index into the pose labels
def _parse_correct_label(raw, labels):
    """Map user input (an index or a label name) to an index, or None if invalid."""
    text = raw.strip()
    try:
        index = int(text)
    except ValueError:
        lowered = text.lower()
        return labels.index(lowered) if lowered in labels else None
    return index if 0 <= index < len(labels) else None


# Function to capture frames from the camera and perform pose estimation
def pose_estimation_camera(movenet, label, model, target_model, buffer, gamma, epsilon, epsilon_min, epsilon_decay, batch_size):
    """Interactive webcam loop: predict a pose, ask for the truth, train the DQN.

    For every captured frame the function estimates keypoints, picks a pose
    label with an epsilon-greedy policy, asks the user for the correct label
    on stdin (either its index or its name from ``pose_label``), stores the
    resulting experience and fits the network on a replay mini-batch.
    The loop ends when the camera stops delivering frames or 'q' is pressed
    in the preview window.

    Args:
        movenet: Pose model callable forwarded to estimate_pose.
        label: Keypoint names forwarded to estimate_pose.
        model: Online Q-network (trained in place).
        target_model: Target Q-network used for bootstrapped targets.
        buffer: Replay buffer exposing ``add``, ``sample`` and ``buffer``.
        gamma: Discount factor.
        epsilon: Initial exploration rate (decayed locally only).
        epsilon_min: Lower bound after which epsilon stops decaying.
        epsilon_decay: Multiplicative decay applied once per frame.
        batch_size: Number of experiences per replay update.

    Returns:
        None.
    """
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("Error: Could not open webcam.")
        return

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # OpenCV captures in BGR order; the pose model and draw_pose work on RGB.
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # Perform pose estimation
            keypoints_dict, keypoints = estimate_pose(frame_rgb, movenet, label)

            # Keep the float values: casting to int would zero out the
            # normalised coordinates, and the full output does not match
            # the network's input size.
            state = _keypoints_to_state(keypoints)

            # Choose an action (label) with an epsilon-greedy policy
            if np.random.rand() < epsilon:
                action_idx = random.choice(range(len(pose_label)))
            else:
                q_values = model.predict(state[np.newaxis], verbose=0)
                action_idx = int(np.argmax(q_values[0]))

            predicted_idx = action_idx
            print(action_idx)

            # Get the correct label from the user. input() returns a string,
            # so it must be converted before comparing with the integer action.
            correct_idx = _parse_correct_label(input("Enter the correct label: "), pose_label)

            if correct_idx is None:
                print(f"Unrecognized label; expected an index 0-{len(pose_label) - 1} or one of {pose_label}. Skipping update.")
            else:
                # Calculate the reward
                reward = 1 if predicted_idx == correct_idx else -1

                # Grab a genuinely new frame for the successor state.
                ret_next, next_frame = cap.read()
                if not ret_next:
                    break
                next_frame_rgb = cv2.cvtColor(next_frame, cv2.COLOR_BGR2RGB)
                _, keypoints_next = estimate_pose(next_frame_rgb, movenet, label)
                next_state = _keypoints_to_state(keypoints_next)

                # Add experience to replay buffer
                buffer.add((state, action_idx, reward, next_state))

                # Update Q-values using mini-batch from replay buffer
                if len(buffer.buffer) >= batch_size:
                    minibatch = buffer.sample(batch_size)
                    for s, a, r, s_next in minibatch:
                        target = r
                        # An all-zero successor is treated as terminal (no bootstrap).
                        if not (s_next == 0).all():
                            target = r + gamma * np.amax(target_model.predict(s_next[np.newaxis], verbose=0)[0])
                        target_f = model.predict(s[np.newaxis], verbose=0)
                        target_f[0][a] = target
                        model.fit(s[np.newaxis], target_f, epochs=1, verbose=0)

                    # Update the target model
                    # NOTE(review): syncing after every update makes the target
                    # network track the online one exactly; consider a less
                    # frequent sync.
                    target_model.set_weights(model.get_weights())

            # Decay epsilon
            if epsilon > epsilon_min:
                epsilon *= epsilon_decay

            # Draw keypoints and connections on the frame
            frame_with_pose = draw_pose(frame_rgb, keypoints_dict, pose_label[predicted_idx])

            # Display the frame with pose estimation (imshow expects BGR)
            cv2.imshow('Pose Estimation', cv2.cvtColor(frame_with_pose, cv2.COLOR_RGB2BGR))

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Always free the camera and close the window, even if an error or
        # KeyboardInterrupt interrupts the loop.
        cap.release()
        cv2.destroyAllWindows()

In [10]:
# Initialize DQN
input_shape = 17 * 3  # Assuming 17 keypoints with three values each
output_shape = len(pose_label)

# Online and target networks share one architecture and start with equal weights.
model, target_model = (
    build_model(input_shape, output_shape),
    build_model(input_shape, output_shape),
)
target_model.set_weights(model.get_weights())



In [11]:
# Initialize replay buffer
# Capacity comes from the DQN parameters cell (memory_size).
buffer = ReplayBuffer(max_size=memory_size)

In [12]:
# Run the camera pose estimation function with DQN
pose_estimation_camera(
    movenet=movenet,
    label=label,
    model=model,
    target_model=target_model,
    buffer=buffer,
    gamma=gamma,
    epsilon=epsilon,
    epsilon_min=epsilon_min,
    epsilon_decay=epsilon_decay,
    batch_size=batch_size,
)

1


qt.qpa.plugin: Could not find the Qt platform plugin "wayland" in "/home/utkarsh-ranjan/Documents/shibata-labs/shibata-lab-pose-estimation/movenet/py3.8/lib/python3.8/site-packages/cv2/qt/plugins"


1


: 