<div style="background-color:rgb(0, 55, 207); padding: 30px; border-radius: 20px; box-shadow: 0 4px 15px rgba(105, 195, 255, 0.3); color:rgb(187, 201, 248); font-family: 'Times New Roman', serif;">

<h1 style="text-align: center; font-size: 38px; color: white; font-weight: bold;">Digital Twin Integration</h1>

<h3 style="font-size: 22px; color: white; font-weight: bold;">Libraries</h3>

In [1]:
# %pip install -r requirements.txt

In [2]:
import os
import cv2
import numpy as np
import mediapipe as mp
import torch
import requests
import time
from model.st_gcn import Model
from transformers import pipeline

  from .autonotebook import tqdm as notebook_tqdm


<h3 style="font-size: 22px; color: white; font-weight: bold;">Configuration</h3>

In [3]:
# Path to the fine-tuned ST-GCN checkpoint (swap in your own trained model).
MODEL_WEIGHTS = './Best_fine_tuned_model/st-gcn.pt'

# Size of the classifier's output layer; adjust to your number of classes.
NUM_CLASSES = 90

# Human-readable label for every class index, in index order (one row per
# initial letter). Adjust to your class names.
CLASS_NAMES = [
    'all', 'almost', 'approve',
    'before', 'boss', 'break', 'business', 'busy', 'but', 'buy',
    'can', 'change', 'clock', 'computer',
    'deaf', 'decide', 'delay', 'different', 'discuss', 'drink',
    'eat', 'email', 'evaluate', 'explain',
    'family', 'fine', 'finish', 'forget', 'full',
    'give', 'goal',
    'have', 'hearing', 'help', 'how',
    'idea', 'improve', 'inform',
    'last', 'later', 'leader', 'like',
    'manager', 'many', 'meet', 'meeting', 'money', 'month',
    'need', 'no', 'now',
    'office',
    'paper', 'plan', 'policy', 'presentation', 'problem', 'professional', 'provide',
    'responsibility', 'result', 'role',
    'same', 'schedule', 'secretary', 'sell', 'show', 'sorry', 'study', 'support',
    'table', 'take', 'team', 'time', 'trade',
    'understand',
    'vacation', 'visit',
    'wait', 'want', 'week', 'what', 'who', 'why', 'with', 'work', 'workshop',
    'year', 'yes', 'yesterday',
]

# MediaPipe hand-landmark detector, configured for a video stream
# (static_image_mode=False) rather than independent still images.
mp_hands = mp.solutions.hands
hand_detector = mp_hands.Hands(
    static_image_mode=False,
    max_num_hands=2,
    model_complexity=1,
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5,
)



<h3 style="font-size: 22px; color: white; font-weight: bold;">Model-GPU</h3>

In [4]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Build the ST-GCN classifier: 6 input channels per joint (predict_sign feeds
# x, y, z plus their frame-to-frame differences), 21 joints, one person.
model = Model(in_channels=6, num_class=NUM_CLASSES, num_point=21, num_person=1,
              graph="graph.mediapipe_asl.Graph", graph_args={"layout":"mediapipe_asl", "strategy":"spatial"})

# Load the fine-tuned weights. weights_only=True restricts unpickling to
# tensors and plain containers, so a tampered checkpoint file cannot execute
# arbitrary code while being loaded.
model.load_state_dict(torch.load(MODEL_WEIGHTS, map_location=device, weights_only=True))

# Move the model to the selected device and switch to inference mode.
model = model.to(device)
model.eval()


Model(
  (st_gcn_networks): ModuleList(
    (0): STGCNBlock(
      (gcn): Conv2d(6, 64, kernel_size=(1, 3), stride=(1, 1), padding=(0, 1))
      (tcn): Sequential(
        (0): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (1): ReLU(inplace=True)
        (2): Conv2d(64, 64, kernel_size=(9, 1), stride=(1, 1))
        (3): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (residual): Sequential(
        (0): Conv2d(6, 64, kernel_size=(1, 1), stride=(1, 1))
        (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (relu): ReLU(inplace=True)
    )
    (1-2): 2 x STGCNBlock(
      (gcn): Conv2d(64, 64, kernel_size=(1, 3), stride=(1, 1), padding=(0, 1))
      (tcn): Sequential(
        (0): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (1): ReLU(inplace=True)
        (2): Conv2d(64, 64, kernel_size=(9, 1), stride=(1, 

<h3 style="font-size: 22px; color: white; font-weight: bold;">Open Web Cam</h3>

In [5]:
def capture_sign_from_camera(camera_index=0, max_duration=5.0, window_name="Camera"):
    """Interactively record one sign from a webcam.

    A mirrored preview is shown in an OpenCV window. Pressing 's' starts
    recording, 'e' stops it, and 'q' aborts. Recording also stops on its own
    once ``max_duration`` seconds have elapsed.

    Args:
        camera_index: Index passed to ``cv2.VideoCapture`` (0 = default camera).
        max_duration: Maximum recording length in seconds before auto-stop.
        window_name: Title of the preview window.

    Returns:
        list | None: The recorded frames (mirrored BGR images, overlay-free),
        possibly empty if the camera stopped delivering frames before any were
        recorded; ``None`` if the camera could not be opened or the user
        pressed 'q'.
    """
    cap = cv2.VideoCapture(camera_index)
    if not cap.isOpened():
        cap.release()
        print("Error: Cannot access camera.")
        return None

    print("Press 's' to start recording the sign, 'e' to end recording, or 'q' to quit.")
    recorded_frames = []
    recording = False
    start_time = None

    # try/finally guarantees the camera and the preview window are released on
    # every exit path, including exceptions and a kernel interrupt.
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break  # camera read error

            # Mirror the image so the preview behaves like a mirror.
            frame = cv2.flip(frame, 1)

            if recording:
                # Store a copy *before* drawing the overlay so the saved
                # frames stay clean.
                recorded_frames.append(frame.copy())
                cv2.circle(frame, (30, 30), 10, (0, 0, 255), -1)  # red dot
                cv2.putText(frame, "Recording...", (50, 35), cv2.FONT_HERSHEY_SIMPLEX,
                            0.8, (0, 0, 255), 2)
                # Auto-stop so a forgotten recording cannot grow without bound.
                # monotonic() is immune to wall-clock adjustments.
                if time.monotonic() - start_time > max_duration:
                    print(f"Auto-stopped recording after {max_duration:g} seconds.")
                    break
            else:
                # Not recording yet: overlay instructions.
                cv2.putText(frame, "Press 's' to start recording a sign", (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
                cv2.putText(frame, "Press 'q' to quit", (10, 60),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)

            cv2.imshow(window_name, frame)
            key = cv2.waitKey(1) & 0xFF  # read keyboard input

            if key == ord('q'):
                # Abort: the caller treats None as "quit requested".
                return None
            if key == ord('s') and not recording:
                print("Recording started. Perform the sign and press 'e' when done.")
                recording = True
                start_time = time.monotonic()
            elif key == ord('e') and recording:
                print("Recording stopped.")
                break
    finally:
        cap.release()
        try:
            cv2.destroyWindow(window_name)
        except cv2.error:
            # The window was never created (e.g. the very first read failed);
            # there is nothing to close.
            pass

    return recorded_frames


<h3 style="font-size: 22px; color: white; font-weight: bold;">Translation (sign to text)</h3>

In [6]:

def predict_sign(frames):
    """Classify a recorded sign and return its label.

    Each BGR frame is run through the module-level MediaPipe ``hand_detector``;
    the first detected hand contributes 21 (x, y, z) landmarks, and frames
    without a detection contribute all-zero landmarks. Positions are paired
    with their frame-to-frame differences to form a 6-channel sequence that is
    fed to ``model``.

    Args:
        frames: List of BGR images making up one sign.

    Returns:
        The ``CLASS_NAMES`` entry for the highest-scoring class, the class
        index as a string if it lies beyond ``CLASS_NAMES``, or ``None`` when
        ``frames`` is empty/None.
    """
    if not frames:
        return None

    # Placeholder landmarks for frames where no hand was found.
    missing_hand = [[0.0, 0.0, 0.0]] * 21

    per_frame_points = []
    for bgr_image in frames:
        # MediaPipe expects RGB input.
        detection = hand_detector.process(cv2.cvtColor(bgr_image, cv2.COLOR_BGR2RGB))
        if detection.multi_hand_landmarks:
            # Only the first detected hand is used.
            first_hand = detection.multi_hand_landmarks[0]
            points = [[p.x, p.y, p.z] for p in first_hand.landmark]
        else:
            points = missing_hand
        per_frame_points.append(points)

    # (T, 21, 3) -> (3, T, 21): channels first, then time, then joints.
    coords = np.transpose(np.array(per_frame_points), (2, 0, 1))

    if coords.shape[1] > 1:
        # Velocity is the difference between consecutive frames; the final
        # frame is dropped from the positions so both halves have T-1 steps.
        motion = coords[:, 1:, :] - coords[:, :-1, :]
        features = np.concatenate((coords[:, :-1, :], motion), axis=0)  # (6, T-1, 21)
    else:
        # Single frame: keep it and use zero velocity.
        features = np.concatenate((coords, np.zeros_like(coords)), axis=0)  # (6, 1, 21)

    # Prepend the batch axis and hand the tensor to the model's device.
    batch = torch.from_numpy(features[np.newaxis, ...]).float().to(device)

    with torch.no_grad():
        scores = model(batch)  # one row of class scores
        best_index = int(torch.max(scores, dim=1)[1].item())

    # Fall back to the raw index when the model has more classes than names.
    return CLASS_NAMES[best_index] if best_index < len(CLASS_NAMES) else str(best_index)

<h3 style="font-size: 22px; color: white; font-weight: bold;">Words to Sentence (GenAI)</h3>

In [7]:
# Text-to-text generation pipeline (FLAN-T5 large) that turns the accepted
# sign words into a sentence.
nlp = pipeline(
    task="text2text-generation",
    model="google/flan-t5-large",
)

def generate_sentence_better(accepted_words):
    """Ask the module-level ``nlp`` pipeline for a sentence built from the given words.

    Args:
        accepted_words: Iterable of word strings to include in the sentence.

    Returns:
        The ``generated_text`` of the pipeline's first result.
    """
    word_list = ', '.join(accepted_words)
    input_prompt = f"Create a meaningful English sentence using the following words: {word_list}."

    # Greedy decoding (do_sample=False) with the output capped at max_length=50.
    results = nlp(input_prompt, max_length=50, do_sample=False)
    return results[0]['generated_text']




Device set to use cpu


<h3 style="font-size: 22px; color: white; font-weight: bold;">Avatar API call</h3>

In [8]:

# def generate_avatar_video(text):
#     """Call the Digital Twin API to generate the avatar video. Returns the video URL."""
#     if text is None:
#         return None
#     print(f"Sending text to Digital Twin API: '{text}'")
#     try:
#         # Call local Node API
#         response = requests.post("http://localhost:3000/generate", json={"text": text}, timeout=60)
#         response.raise_for_status()
#     except requests.RequestException as e:
#         print("Error communicating with Digital Twin API:", e)
#         return None
#     data = response.json()
#     video_url = data.get("videoUrl") or data.get("url")
#     if video_url:
#         print("Received video URL:", video_url)
#     else:
#         print("No video URL received. Response:", data)
#     return video_url


<h3 style="font-size: 22px; color: white; font-weight: bold;">Show results</h3>

In [9]:
# def play_video_from_url(video_url):
#     if not video_url:
#         return

#     # === Download and save the video ===
#     try:
#         print("Downloading avatar video...")
#         video_data = requests.get(video_url, timeout=60).content
#         video_path = "avatar_output.mp4"
#         with open(video_path, "wb") as f:
#             f.write(video_data)
#         print(f"Video saved as {video_path}")
#     except Exception as e:
#         print("Failed to download video:", e)
#         return

#     while True:
#         # === Play the saved video with VLC ===
#         print("Opening avatar video with VLC...")
#         os.system(f'start vlc --play-and-exit {video_path}')

#         # === Ask user if they want to replay ===
#         print("Press 'r' to replay the avatar, any other key to continue...")
#         key = input().strip().lower()
#         if key == 'r':
#             continue  # Replay again
#         else:
#             break  # Exit playing


<h3 style="font-size: 22px; color: white; font-weight: bold;">Main Code</h3>

In [None]:
# import ipywidgets as widgets
# from IPython.display import display, clear_output

# Interactive driver: record signs one at a time, let the user accept or
# reject each predicted word, then turn the accepted words into a sentence.
if __name__ == "__main__":
    print("Starting Sign Language Translation with Digital Twin Integration...")

    # Cleared when the camera yields nothing or the user presses 'q', so the
    # session can actually end instead of reopening the camera forever.
    keep_running = True
    while keep_running:
        accepted_words = []

        # Collect words until the user finishes the sentence or quits.
        while True:
            frames = capture_sign_from_camera()
            if frames is None or len(frames) == 0:
                print("No frames captured or quit requested.")
                keep_running = False
                break

            text = predict_sign(frames)
            print(f"Predicted sign text: {text}")   # 🔥 PRINT IMMEDIATELY

            # Re-ask on a typo rather than discarding the prediction and
            # forcing the user to record the sign again.
            decision = input("Accept word? [y = yes, n = no, f = finish]: ").strip().lower()
            while decision not in ('y', 'n', 'f'):
                print("❓ Invalid input. Press 'y', 'n', or 'f'.")
                decision = input("Accept word? [y = yes, n = no, f = finish]: ").strip().lower()

            if decision == 'y':
                accepted_words.append(text)
                print(f"✅ Added '{text}'.")
            elif decision == 'n':
                print("🔁 Retry recording.")
            else:  # 'f': optionally keep the last word, then close the sentence
                finish_decision = input(f"Do you want to add '{text}' before finishing? [y/n]: ").strip().lower()
                if finish_decision == 'y':
                    accepted_words.append(text)
                    print(f"✅ Added '{text}' before finishing.")
                else:
                    print("⏩ Word skipped before finishing.")
                break

        if not accepted_words:
            print("Prediction failed.")
            continue  # leaves the outer loop when keep_running was cleared

        # Words accepted before a quit are still turned into a sentence.
        generated_sentence = generate_sentence_better(accepted_words)  # using better sentence generator
        print(f"📝 Generated sentence: {generated_sentence}")

        # video_url = generate_avatar_video(text)
        # if video_url:
        #     play_video_from_url(video_url)
        # another = input("Start another translation? [y/n]: ").strip().lower()
        # if another != 'y':
        #     print("👋 Exiting. Goodbye!")
        #     break


Starting Sign Language Translation with Digital Twin Integration...
Press 's' to start recording the sign, 'e' to end recording, or 'q' to quit.
Recording started. Perform the sign and press 'e' when done.
Auto-stopped recording after 5 seconds.
Predicted sign text: help
