This notebook implements a majority-voting fusion strategy that combines predictions from a Dynamic Time Warping (DTW) classifier and an LSTM model for hand trajectory classification. Instead of relying on a single model, the system decides the final predicted point by counting votes from both models and selecting the most frequent prediction.

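While a hand trajectory is being captured in real time, both the DTW and LSTM models produce predictions continuously, and these predictions are stored in separate lists (dtw_preds and lstm_preds). Once a sufficient number of predictions has been collected (the 5 most recent from each model), a combined list is formed. The most frequent label (the mode) of the combined predictions is selected as the final output. Note that the code below implements the smoothing with a single window, `recent_predictions`, which holds the 10 most recent fused labels; the displayed label is updated only when one label occurs at least 6 times in that window.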

In [1]:
from collections import defaultdict, deque, Counter
from fastdtw import fastdtw
from scipy.spatial.distance import euclidean
import numpy as np
import joblib
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing.sequence import pad_sequences
import cv2
import mediapipe as mp
import pyrealsense2 as rs
import time
from scipy.spatial.distance import euclidean

In [2]:
# Load trained LSTM model and label encoder
# NOTE(review): joblib.load unpickles the file, and unpickling can execute
# arbitrary code — only load label_encoder_new.pkl from a trusted source.
lstm_model = load_model("lstm_index_finger_model_new.keras")
label_encoder = joblib.load("label_encoder_new.pkl")

# Load DTW reference data: X holds the reference sequences and y holds one
# label per sequence (X[i] is paired with y[i] by setup_dtw_templates below).
# Presumably X is already padded and y is already label-encoded — confirm
# against the script that produced these .npy files.
X = np.load("X_dtw_new.npy")
y = np.load("y_dtw_new.npy")

# Create DTW templates from training data

def setup_dtw_templates(X, y):
    """Build one DTW template per label.

    Args:
        X: Indexable collection of sequences; X[i] belongs to label y[i].
        y: Iterable of hashable labels.

    Returns:
        A dict mapping each label to the first sequence in X that carries it.
        Keys appear in the order the labels are first seen in y.
    """
    first_by_label = {}
    for idx, label in enumerate(y):
        sample = X[idx]
        # Only the first sequence seen for a label is kept as its template.
        if label not in first_by_label:
            first_by_label[label] = sample
    return first_by_label

# One DTW template per label: the first sequence in X that carries that label.
templates = setup_dtw_templates(X, y)  # X and y are the arrays loaded from the .npy files above

# DTW classification
def classify_with_dtw(sequence, templates):
    """Return the label of the template closest to `sequence` under FastDTW.

    Args:
        sequence: The trajectory to classify; passed unchanged to `fastdtw`.
        templates: Mapping of label -> reference sequence.

    Returns:
        The label whose reference sequence has the smallest FastDTW distance
        (Euclidean point distance). On ties the first such label in iteration
        order wins. Returns None when `templates` is empty or no distance is
        strictly below infinity.
    """
    # Track (distance, label) of the best match seen so far.
    closest = (float('inf'), None)
    for candidate, reference in templates.items():
        distance, _path = fastdtw(sequence, reference, dist=euclidean)
        if distance < closest[0]:
            closest = (distance, candidate)
    return closest[1]

# Combined majority vote prediction
def majority_vote_predict(sequence, templates, max_len=50, lstm_min_confidence=0.0):
    """Predict the label of one trajectory by combining the LSTM and DTW models.

    With only two voters a disagreement is a tie, so a tie-break rule is
    required. The LSTM is the primary voter; the DTW classifier overrides it
    only when the LSTM's top score is below ``lstm_min_confidence``.

    NOTE: the earlier version of this function ran DTW on every call but
    returned the LSTM prediction from both branches of its if/else, so the DTW
    vote could never affect the result. The default ``lstm_min_confidence=0.0``
    reproduces those outputs exactly and skips the DTW search, which cannot
    change them. Pass a positive threshold to let DTW take part in the decision.

    Args:
        sequence: Trajectory to classify, as a sequence of points. It is
            passed unchanged to ``pad_sequences`` and ``classify_with_dtw``.
        templates: Mapping of label -> reference sequence used by DTW.
        max_len: Length the sequence is padded or truncated to (both at the
            end) before it is fed to the LSTM.
        lstm_min_confidence: If the largest LSTM output is strictly below
            this value, the DTW prediction is used instead (when DTW returns
            one). Assumes the model outputs per-class scores in [0, 1] —
            TODO confirm against the model's final layer.

    Returns:
        The decoded label, i.e. the first element of
        ``label_encoder.inverse_transform([final])``.
    """
    # NOTE(review): pad_sequences defaults to dtype='int32', which would
    # truncate fractional coordinates — confirm this matches how the training
    # data was padded before changing it.
    padded_seq = pad_sequences([sequence], maxlen=max_len, padding='post', truncating='post')

    # Predict with LSTM
    lstm_probs = lstm_model.predict(padded_seq, verbose=0)[0]
    lstm_pred = np.argmax(lstm_probs)
    lstm_confidence = np.max(lstm_probs)

    final = lstm_pred
    # Written as `<` so that a NaN score keeps the LSTM prediction.
    if lstm_confidence < lstm_min_confidence:
        # The LSTM is unsure: consult DTW. If both agree the result is the
        # same label; if they disagree DTW breaks the tie.
        # Assumes the template labels use the same integer encoding as the
        # LSTM output indices — TODO confirm.
        dtw_pred = classify_with_dtw(sequence, templates)
        if dtw_pred is not None:
            final = dtw_pred

    return label_encoder.inverse_transform([final])[0]


In [6]:
# ─────────────── Load Models and Encoders ───────────────
# NOTE(review): joblib.load unpickles the file, and unpickling can execute
# arbitrary code — only load label_encoder_new.pkl from a trusted source.
lstm_model = load_model("lstm_index_finger_model_new.keras")
label_encoder = joblib.load("label_encoder_new.pkl")
# DTW reference data: templates_X[i] is the sequence for label templates_y[i].
templates_X = np.load("X_dtw_new.npy")
templates_y = np.load("y_dtw_new.npy")

# DTW template setup
def setup_dtw_templates(X, y):
    """Pick the first sequence of every label as that label's DTW template.

    Args:
        X: Indexable collection of sequences; X[i] belongs to label y[i].
        y: Iterable of hashable labels.

    Returns:
        A dict of label -> first sequence in X with that label, with keys in
        the order the labels first occur in y.
    """
    templates_by_label = {}
    for position, label in enumerate(y):
        # setdefault stores the sequence only the first time a label is seen.
        templates_by_label.setdefault(label, X[position])
    return templates_by_label

# Module-level label -> reference-sequence mapping read by classify_with_dtw.
templates = setup_dtw_templates(templates_X, templates_y)

# DTW classification function
def classify_with_dtw(sequence):
    """Return the label of the module-level template closest to `sequence`.

    Distances come from `fastdtw` with a Euclidean point distance, compared
    against every entry of the global `templates` mapping.

    Args:
        sequence: The trajectory to classify; passed unchanged to `fastdtw`.

    Returns:
        The label with the smallest distance (first one wins on ties), or
        None when `templates` is empty or no distance is below infinity.
    """
    # Track (distance, label) of the best match seen so far.
    closest = (float('inf'), None)
    for candidate, reference in templates.items():
        distance, _path = fastdtw(sequence, reference, dist=euclidean)
        if distance < closest[0]:
            closest = (distance, candidate)
    return closest[1]

# Combined voting function
def majority_vote_predict(sequence, max_len=50, lstm_min_confidence=0.0):
    """Predict the label of one trajectory by combining the LSTM and DTW models.

    With only two voters a disagreement is a tie, so a tie-break rule is
    required. The LSTM is the primary voter; the DTW classifier overrides it
    only when the LSTM's top score is below ``lstm_min_confidence``.

    NOTE: the earlier version of this function ran DTW on every call but its
    expression ``lstm_pred if lstm_pred == dtw_pred else lstm_pred`` always
    yielded the LSTM prediction, so the DTW vote could never affect the
    result. The default ``lstm_min_confidence=0.0`` reproduces those outputs
    exactly and skips the DTW search, which cannot change them. Pass a
    positive threshold to let DTW take part in the decision.

    Args:
        sequence: Trajectory to classify, as a sequence of points. It is
            passed unchanged to ``pad_sequences`` and ``classify_with_dtw``.
        max_len: Length the sequence is padded or truncated to (both at the
            end) before it is fed to the LSTM.
        lstm_min_confidence: If the largest LSTM output is strictly below
            this value, the DTW prediction is used instead (when DTW returns
            one). Assumes the model outputs per-class scores in [0, 1] —
            TODO confirm against the model's final layer.

    Returns:
        The decoded label, i.e. the first element of
        ``label_encoder.inverse_transform([final_pred])``.
    """
    # NOTE(review): pad_sequences defaults to dtype='int32', which would
    # truncate fractional coordinates (e.g. depth in mm) — confirm this
    # matches how the training data was padded before changing it.
    padded = pad_sequences([sequence], maxlen=max_len, padding='post', truncating='post')
    lstm_probs = lstm_model.predict(padded, verbose=0)[0]
    lstm_pred = np.argmax(lstm_probs)
    lstm_confidence = np.max(lstm_probs)

    final_pred = lstm_pred
    # Written as `<` so that a NaN score keeps the LSTM prediction.
    if lstm_confidence < lstm_min_confidence:
        # The LSTM is unsure: consult DTW. If both agree the result is the
        # same label; if they disagree DTW breaks the tie.
        # Assumes the template labels use the same integer encoding as the
        # LSTM output indices — TODO confirm.
        dtw_pred = classify_with_dtw(sequence)
        if dtw_pred is not None:
            final_pred = dtw_pred

    return label_encoder.inverse_transform([final_pred])[0]

# ─────────────── Camera Setup ───────────────
# Length of the sliding trajectory window; prediction starts once this many
# points have been buffered, and the same value is the default max_len used
# when padding inside majority_vote_predict.
MAX_LEN = 50
# Buffered [cx, cy, z] fingertip samples (pixel x, pixel y, depth in mm).
trajectory = []
# The 10 most recent predicted labels, used to smooth the displayed output.
recent_predictions = deque(maxlen=10)
# Label currently shown on screen; stays None until one label dominates
# recent_predictions.
stable_label = None

# Start a RealSense pipeline with depth (16-bit) and color (BGR, 8-bit)
# streams, both at 640x480 and 30 fps.
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
pipeline.start(config)
# Align frames to the color stream so depth can be sampled at the pixel
# coordinates of the color image (see the get_distance call in the loop).
align = rs.align(rs.stream.color)

# ─────────────── MediaPipe Setup ───────────────
mp_hands = mp.solutions.hands
# Track a single hand; the thresholds are the minimum detection and tracking
# confidences passed to MediaPipe.
hands = mp_hands.Hands(max_num_hands=1, min_detection_confidence=0.7, min_tracking_confidence=0.5)
# Helper used to draw the hand landmarks and their connections on the frame.
mp_drawing = mp.solutions.drawing_utils

# Number of matching entries required in `recent_predictions` before the
# on-screen label is updated.
MIN_STABLE_VOTES = 6

# Main capture loop: grab aligned frames, track the fingertip, classify the
# sliding trajectory window and show the smoothed label. Press 'q' in the
# window (or interrupt the kernel) to stop.
try:
    while True:
        frames = pipeline.wait_for_frames()
        aligned = align.process(frames)
        depth_frame = aligned.get_depth_frame()
        color_frame = aligned.get_color_frame()

        # Skip iterations where either stream did not deliver a frame.
        if not depth_frame or not color_frame:
            continue

        color_image = np.asanyarray(color_frame.get_data())
        # The color stream is configured as BGR; MediaPipe is given RGB.
        frame_rgb = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB)
        results = hands.process(frame_rgb)

        if results.multi_hand_landmarks:
            for hand_landmarks in results.multi_hand_landmarks:
                mp_drawing.draw_landmarks(color_image, hand_landmarks, mp_hands.HAND_CONNECTIONS)
                h, w, _ = color_image.shape
                # Landmark 8 is the index fingertip in MediaPipe's hand model.
                lm = hand_landmarks.landmark[8]
                # Landmark coordinates are normalized; convert to pixels and
                # clamp into the image. Plain Python ints are kept (instead of
                # the NumPy scalars np.clip returns) because they are passed
                # on to the pyrealsense2 and OpenCV bindings.
                cx = min(max(int(lm.x * w), 0), w - 1)
                cy = min(max(int(lm.y * h), 0), h - 1)
                z = depth_frame.get_distance(cx, cy) * 1000  # mm
                trajectory.append([cx, cy, z])

                # Keep only the newest MAX_LEN points (sliding window).
                if len(trajectory) > MAX_LEN:
                    trajectory = trajectory[-MAX_LEN:]

                if len(trajectory) >= MAX_LEN:
                    label = majority_vote_predict(trajectory)
                    recent_predictions.append(label)
                    top_label, top_count = Counter(recent_predictions).most_common(1)[0]

                    # Update the displayed label only once one prediction
                    # clearly dominates the recent window, to avoid flicker.
                    if top_count >= MIN_STABLE_VOTES:
                        stable_label = top_label

                    # Compare against None so that falsy labels such as 0 or
                    # an empty string are still displayed.
                    if stable_label is not None:
                        cv2.putText(color_image, f"Predicted: {stable_label}", (10, 60),
                                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

                # Draw circle and depth
                cv2.circle(color_image, (cx, cy), 8, (0, 255, 0), -1)
                cv2.putText(color_image, f"{round(z)} mm", (cx + 10, cy - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 1)

        cv2.imshow("Real-Time Prediction", color_image)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

except KeyboardInterrupt:
    print("Interrupted")

finally:
    print("Exiting")
    try:
        pipeline.stop()
    finally:
        # Release the MediaPipe graph and close the preview window even if
        # stopping the camera pipeline fails.
        hands.close()
        cv2.destroyAllWindows()


Exiting
