In [None]:
# Import necessary libraries
from robopianist import music
import cv2
import numpy as np
from robopianist.suite.tasks import piano_with_one_shadow_hand
from mujoco_utils import composer_utils
from robopianist.models.hands import HandSide
from robopianist.models.piano import piano_constants as consts
from robopianist.music import midi_file
import mediapipe as mp
from utils import process_landmarks, create_detector, preprocess_frame, adjust_hand_action, val_hand_action, \
                    draw_landmarks_on_image, extract_finger_based_on_key, HandAction
import os
import pickle
from IPython.display import HTML
from base64 import b64encode
from robopianist.suite.tasks import piano_with_shadow_hands_res
from dm_env_wrappers import CanonicalSpecWrapper
from robopianist.wrappers import PianoSoundVideoWrapper
from robopianist.wrappers.deep_mimic import DeepMimicWrapper
from robopianist.wrappers.residual import ResidualWrapper
from robopianist.wrappers.dm2gym import Dm2GymWrapper
from dm_env_wrappers import SinglePrecisionWrapper
from dm_env_wrappers import DmControlWrapper
from robopianist.wrappers.evaluation import MidiEvaluationWrapper
from dm_control.mujoco.wrapper import mjbindings

## Step 1: Estimate the Homography Matrix

To transform video (pixel) coordinates to real-world coordinates, you'll need to estimate a **homography matrix** using known landmark points.

### 📸 Example

![Example of the landmarks](piano_example.jpg)

### 📝 Instructions
1. Replace `video_path` in the script with the path to your actual video file.
2. **Click on the landmark points** on a video frame:
    - **Blue points (left to right)** first
    - **Red points (left to right)** next
3. This step is **only required once** for videos from the same YouTube channel, since their viewpoints are usually consistent.
4. The homography matrix will be saved as `homography_matrix.npy`.


In [3]:

# Coordinate conventions for the homography estimated in this cell.
#
# The homography matrix H maps a pixel coordinate (x, y) to a world
# coordinate (x', y').  The world frame is different from the MuJoCo frame,
# so a second mapping from the world coordinate (x', y') to the MuJoCo
# coordinate (x'', y'') is needed:
#
#     x'' = -y'
#     y'' =  x'
#
# Pixel coordinate:  World coordinate:   Mujoco coordinate:
# |-------------> x  ^ y'                |--------------> y''
# |                  |                   |
# |                  |                   |
# |                  |                   |
# |                  |                   |
# \/                 |-------------> x'  \/
# y                                      x''
#
# (Kept as comments rather than a bare string literal: the diagram contains
# the sequence backslash-slash, which is an invalid escape in a normal
# string and triggers a SyntaxWarning on recent Python versions.)

# Specify the path to the input video file
video_path = 'Stan_1.mp4'  # TODO Replace with your video file's path

def click_event(event, x, y, flags, params):
    """Mouse callback that records one landmark per left-button click.

    Each accepted click appends ``(x, y)`` to the module-level
    ``pixel_points`` list, prints it, and draws a filled circle on the
    module-level ``image``.  When as many pixel points as
    ``real_world_points`` have been collected, the annotated image is written
    to ``Landmarks.png`` and the module-level flag ``estimate_homography`` is
    set so the main loop can compute the homography.

    Args:
        event: OpenCV mouse event code; only ``cv2.EVENT_LBUTTONDOWN`` is
            handled.
        x: Pixel column of the click.
        y: Pixel row of the click.
        flags: Unused; required by the OpenCV callback signature.
        params: Unused; required by the OpenCV callback signature.
    """
    global estimate_homography

    if event != cv2.EVENT_LBUTTONDOWN:
        return

    # Ignore surplus clicks: the main loop hands pixel_points and
    # real_world_points to cv2.findHomography, which needs one pixel point
    # per world point.  An extra click arriving before the loop consumes the
    # flag would otherwise leave the two lists with different lengths.
    if len(pixel_points) >= len(real_world_points):
        return

    # Record the pixel point
    pixel_points.append((x, y))
    print(f"Pixel Point: ({x}, {y})")

    # The first 14 clicks correspond to the 14 white-key points that are
    # appended to real_world_points first (7 key gaps x top/bottom); they are
    # drawn in blue (BGR).  All later clicks are drawn in red.
    color = (255, 0, 0) if len(pixel_points) <= 14 else (0, 0, 255)
    cv2.circle(image, (x, y), 5, color, -1)

    # Check if we have enough points to estimate homography
    if len(pixel_points) == len(real_world_points):
        estimate_homography = True
        # Save the annotated image for later reference
        cv2.imwrite('Landmarks.png', image)

# Collect pixel <-> world correspondences interactively and estimate the
# homography matrix, which is saved to 'homography_matrix.npy'.

# Lists to store the correspondence points
pixel_points = []
real_world_points = []
# NOTE(review): `hints` is not read anywhere in this cell, and its 24 entries
# do not line up with the 20 points collected below (the 31/32 and 38/39 gaps
# have no counterpart in `white_keys`).  Kept unchanged for reference.
hints = ["Between key 2 and key 3 top",
            "Between key 2 and key 3 bottom",
            "Between key 14 and key 15 top",
            "Between key 14 and key 15 bottom",
            "Between key 26 and key 27 top",
            "Between key 26 and key 27 bottom",
            "Between key 31 and key 32 top",
            "Between key 31 and key 32 bottom",
            "Between key 38 and key 39 top",
            "Between key 38 and key 39 bottom",
            "Between key 43 and key 44 top",
            "Between key 43 and key 44 bottom",
            "Between key 62 and key 63 top",
            "Between key 62 and key 63 bottom",
            "Between key 74 and key 75 top",
            "Between key 74 and key 75 bottom",
            "Between key 86 and key 87 top",
            "Between key 86 and key 87 bottom",
            "Between key 8 and 10 middle",
            "Between key 20 and 22 middle",
            "Between key 32 and 34 middle",
            "Between key 51 and 53 middle",
            "Between key 68 and 70 middle",
            "Between key 80 and 82 middle"
        ]

# The task/environment is only built here to read the key positions of the
# simulated piano model.
task = piano_with_one_shadow_hand.PianoWithOneShadowHand(
    hand_side=HandSide.LEFT,
    midi=music.load("TwinkleTwinkleRousseau"),
    disable_colorization=True,
    change_color_on_activation=True,
    trim_silence=True,
    control_timestep=0.01,
    )

env = composer_utils.Environment(
    recompile_physics=False, task=task, strip_singleton_obs_buffer_dim=True
)

# White-key landmarks: the gap between `key` and `key + 1`, at both ends of
# the white key length (two points per gap).
white_keys = [2, 14, 26, 43, 62, 74, 86]
for key in white_keys:
    y = consts.WHITE_KEY_LENGTH/2
    x = (env.task.piano._keys[key].pos[1] + env.task.piano._keys[key+1].pos[1])/2
    real_world_points.append((x, y))
    real_world_points.append((x, -y))

# Black-key landmarks: midway between the neighbours of `key`, one point each.
black_keys = [9, 21, 33, 52, 69, 81]
for key in black_keys:
    y = consts.WHITE_KEY_LENGTH/2 - consts.BLACK_KEY_LENGTH
    x = (env.task.piano._keys[key-1].pos[1] + env.task.piano._keys[key+1].pos[1])/2
    real_world_points.append((x, y))

# Flag to indicate when to perform homography estimation
estimate_homography = False

# Only the first frame of the video is needed, so grab it and release the
# capture right away.  Fail loudly if the frame cannot be obtained: without
# it `image` would be undefined (or None) and the display loop below would
# crash with an unrelated error.
cap = cv2.VideoCapture(video_path)
try:
    if not cap.isOpened():
        raise IOError(f"Error: Could not open video: {video_path}")
    ret, image = cap.read()
    if not ret:
        raise IOError(f"Error: Could not read a frame from video: {video_path}")
finally:
    cap.release()

# Create a window and set a callback function
cv2.namedWindow('Image')
cv2.setMouseCallback('Image', click_event)

try:
    while True:
        # Display the image (click_event draws the clicked points onto it)
        cv2.imshow('Image', image)

        # If the 'q' key is pressed, break from the loop
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

        # If we have enough points, estimate homography
        if estimate_homography:
            # Convert points to numpy arrays; truncate defensively so both
            # arrays always have the same number of rows.
            pts_src = np.array(pixel_points[:len(real_world_points)], dtype='float32')
            pts_dst = np.array(real_world_points, dtype='float32')

            # Estimate the homography
            H, status = cv2.findHomography(pts_src, pts_dst)
            if H is None:
                raise RuntimeError("Homography estimation failed; please re-run and click the landmarks again.")
            print("Homography Matrix:")
            print(H)
            # Save the homography matrix
            np.save('homography_matrix.npy', H)
            break
finally:
    # Always close the window, even on error or interrupt.
    cv2.destroyAllWindows()
    # Presumably needed on some platforms (e.g. macOS) so the GUI event loop
    # actually processes the close request.
    cv2.waitKey(1)
    


  '''
2025-04-03 22:40:56.964 Python[23606:3294427] +[IMKClient subclass]: chose IMKClient_Modern
2025-04-03 22:40:56.964 Python[23606:3294427] +[IMKInputSession subclass]: chose IMKInputSession_Modern
  '''


KeyboardInterrupt: 

## Step 2: Generate Fingering from Video Using Hand Tracking

In this step, you'll use hand tracking to generate the fingering information from the video.

### 🛠️ Instructions

1. **Replace** `TASK_NAME` in your script with the actual name of your task.
2. **Add the corresponding MIDI file** to the same folder as the video, named exactly the same as your video file (e.g. `Stan_1.mid` for `Stan_1.mp4`).

### 🎯 Output

- The generated fingering data is saved to a pickle file with the same name as the video file (e.g. `Stan_1.pkl`), and an annotated video is written to `Stan_1_fingering.mp4`.

In [None]:
# Generate per-note fingering from the video with hand tracking.
#
# The video is sampled once per control timestep; for every sampled frame the
# detected hand landmarks are matched against the keys active at that
# timestep, and the resulting fingering is written back onto the notes.
# Outputs: '<TASK_NAME>.pkl' (note trajectory with fingering) and
# '<TASK_NAME>_fingering.mp4' (annotated video).
CTRL_TIMESTEP = 0.05
TASK_NAME = "Stan_1" # TODO Replace with your task name

TASK_MIDI = "{}.mid".format(TASK_NAME)

midi = music.load(TASK_MIDI)
note_traj = midi_file.NoteTrajectory.from_midi(midi, CTRL_TIMESTEP)
note_traj = note_traj.trim_silence()
start_from = 0 # To align with the video (different for each video)
notes = note_traj.notes[start_from:]
sustains = note_traj.sustains[start_from:]
note_length = len(notes)

detector = create_detector()

# Specify the path to the input video file
video_path = '{}.mp4'.format(TASK_NAME)  # Replace with your video file's path
out_filename = 'out.mp4'

# Create a VideoCapture object to load the input video
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    cap.release()
    raise IOError(f"Error: Could not open video: {video_path}")

fps = cap.get(cv2.CAP_PROP_FPS)
print("FPS:", fps)

frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
print("Video width, height:", frame_width, frame_height)

# Load homography matrix
H = np.load('homography_matrix.npy')

# 'mp4v' is requested explicitly: an 'MJPG' tag is not supported inside an
# .mp4 container by the FFMPEG backend, which falls back to 'mp4v' anyway
# (with a warning).  The output is written at 10 fps.
out = cv2.VideoWriter(out_filename, cv2.VideoWriter_fourcc(*'mp4v'),
                      10, (frame_width, frame_height))

timestep = 0
last_fingering = []
last_keys = []
first_timestamp = None
try:
    # Loop through the video until every note timestep has been processed
    while timestep < note_length:
        # Read the next frame from the video
        ret, frame = cap.read()
        # Check if the video has ended
        if not ret:
            break

        timestamp = int(cap.get(cv2.CAP_PROP_POS_MSEC))
        if first_timestamp is None:
            first_timestamp = timestamp

        # Only process the frame once the video time has reached the start of
        # the current control timestep; earlier frames are skipped.
        if timestamp - first_timestamp >= CTRL_TIMESTEP * 1000 * timestep:
            keys = [note.key for note in notes[timestep]]

            original_frame = frame.copy()

            frame_eq, frame = preprocess_frame(frame) # Enhance the local contrast
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            frame.flags.writeable = False
            image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame)
            detection_result = detector.detect_for_video(image, timestamp)
            original_frame.flags.writeable = True

            original_frame = draw_landmarks_on_image(original_frame, detection_result)

            original_frame, fingering = extract_finger_based_on_key(
                original_frame, detection_result, keys, H,
                last_keys=last_keys, last_fingering=last_fingering)
            # object.__setattr__ is used to set the attribute directly
            # (presumably because the note objects are frozen — TODO confirm).
            # The notes are shared with note_traj, so the pickle written
            # below contains the updated fingering.
            for i, note in enumerate(notes[timestep]):
                object.__setattr__(note, 'fingering', fingering[i])
            last_fingering = fingering
            last_keys = keys

            # Write the frame to the output video file
            out.write(original_frame)

            # Display the frame (you can remove this line if you don't need to display the video)
            cv2.imshow('Video Frame', original_frame)
            timestep += 1

        # Exit the loop if the 'q' key is pressed
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Release the capture, finalize the output video and close the window.
    # The writer must be released before the file is renamed, otherwise the
    # video file may be incomplete.
    cap.release()
    out.release()
    cv2.destroyAllWindows()

# Save the updated notes to pickle file
file_name = '{}.pkl'.format(TASK_NAME)
with open(file_name, 'wb') as f:
    pickle.dump(note_traj, f)
# os.replace overwrites an existing target on every platform, so the cell can
# be re-run.
os.replace(out_filename, '{}_fingering.mp4'.format(TASK_NAME))


    

FPS: 60.0
Video width, height: 1280 720


INFO: Created TensorFlow Lite XNNPACK delegate for CPU.
OpenCV: FFMPEG: tag 0x47504a4d/'MJPG' is not supported with codec id 7 and format 'mp4 / MP4 (MPEG-4 Part 14)'
OpenCV: FFMPEG: fallback to use tag 0x7634706d/'mp4v'


## Step 3: Extract Fingertip Trajectory in MuJoCo Coordinates

In this step, you'll convert the detected human fingertip positions into **MuJoCo-compatible coordinates**.

### ⚙️ How It Works

- The system uses **heuristics** to ensure that the generated fingertip trajectory is properly **aligned with the MIDI notes** of the song using the previously estimated fingering.
- This alignment helps synchronize finger movements with keypress events.

### 💾 Output

- The fingertip trajectories are saved as `.npy` files, one per hand.
- Example output filenames:
  - `Stan_1_left_hand_action_list.npy`
  - `Stan_1_right_hand_action_list.npy`

These trajectory files can then be used to drive simulations or robots in the MuJoCo environment.


In [8]:
# Extract the fingertip trajectory of both hands from the video.
#
# The video is sampled once per control timestep; for every sampled frame the
# detected hands are converted to hand actions.  When a hand is missing, the
# last known action for that hand is reused.
# Outputs: '<TASK_NAME>_left_hand_action_list.npy',
# '<TASK_NAME>_right_hand_action_list.npy' and '<TASK_NAME>_mujoco.mp4'.
CTRL_TIMESTEP = 0.05

TASK_NAME = "Stan_1" # TODO Replace with your task name
TASK_VIDEO = "{}.mp4".format(TASK_NAME)
TASK_MIDI = "{}.mid".format(TASK_NAME)
# Load the note trajectory (with fingering) from the pickle file.
# NOTE: pickle must only be used on trusted files such as the one written by
# the previous step.
with open('{}.pkl'.format(TASK_NAME), 'rb') as f:
    note_traj = pickle.load(f)

start_from = 0 # To align with the video (different for each video)
notes = note_traj.notes[start_from:]
sustains = note_traj.sustains[start_from:]
note_length = len(notes)

detector = create_detector()

# Specify the path to the input video file
video_path = TASK_VIDEO  # Replace with your video file's path
out_filename = 'out.mp4'

# Create a VideoCapture object to load the input video
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    cap.release()
    raise IOError(f"Error: Could not open video: {video_path}")

fps = cap.get(cv2.CAP_PROP_FPS)
print("FPS:", fps)

frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
print("Video width, height:", frame_width, frame_height)

# Load homography matrix
H = np.load('homography_matrix.npy')

# 'mp4v' is requested explicitly: an 'MJPG' tag is not supported inside an
# .mp4 container by the FFMPEG backend, which falls back to 'mp4v' anyway
# (with a warning).  The output is written at 10 fps.
out = cv2.VideoWriter(out_filename, cv2.VideoWriter_fourcc(*'mp4v'),
                      10, (frame_width, frame_height))

timestep = 0
left_hand_initial_action_list = HandAction(*np.load('left_hand_initial_action_list.npy'))
right_hand_initial_action_list = HandAction(*np.load('right_hand_initial_action_list.npy'))

left_hand_action_list = []
right_hand_action_list = []
# Index 0 holds the left hand, index 1 the right hand.
last_hand_action_list = [left_hand_initial_action_list, right_hand_initial_action_list]
first_timestamp = None
try:
    # Loop through the video until every note timestep has been processed
    while timestep < note_length:
        # Read the next frame from the video
        ret, frame = cap.read()
        # Check if the video has ended
        if not ret:
            break

        timestamp = int(cap.get(cv2.CAP_PROP_POS_MSEC))
        if first_timestamp is None:
            first_timestamp = timestamp

        # Only process the frame once the video time has reached the start of
        # the current control timestep; earlier frames are skipped.
        if timestamp - first_timestamp >= CTRL_TIMESTEP * 1000 * timestep:
            keys = [note.key for note in notes[timestep]]
            fingering = [note.fingering for note in notes[timestep]]

            original_frame = frame.copy()

            frame_eq, frame = preprocess_frame(frame) # Enhance the local contrast
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            frame.flags.writeable = False
            image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame)
            detection_result = detector.detect_for_video(image, timestamp)
            original_frame.flags.writeable = True

            original_frame, hand_action_list, handedness_list = \
                process_landmarks(original_frame, detection_result, keys, fingering, H, timestep=timestep)

            num_hands = len(hand_action_list)
            if num_hands == 0:
                # No hand detected: reuse the previous actions for both hands
                hand_action_list = last_hand_action_list
            elif num_hands == 1:
                # Only one hand detected
                if handedness_list[0] == 'Left':
                    # Use the last right hand action
                    hand_action_list = [
                        adjust_hand_action(hand_action_list[0], last_hand_action_list[0]),
                        adjust_hand_action(last_hand_action_list[1]),
                    ]
                else:
                    # Use the last left hand action
                    hand_action_list = [
                        adjust_hand_action(last_hand_action_list[0]),
                        adjust_hand_action(hand_action_list[0], last_hand_action_list[1]),
                    ]
            elif last_hand_action_list:
                # Two hands detected.
                # NOTE(review): assumes process_landmarks returns the hands in
                # [left, right] order — TODO confirm.
                hand_action_list = [
                    adjust_hand_action(hand_action_list[0], last_hand_action_list[0]),
                    adjust_hand_action(hand_action_list[1], last_hand_action_list[1]),
                ]

            last_hand_action_list = hand_action_list
            # For testing
            val_hand_action(hand_action_list[0])
            val_hand_action(hand_action_list[1])
            left_hand_action_list.append(hand_action_list[0])
            right_hand_action_list.append(hand_action_list[1])

            # Write the frame to the output video file
            out.write(original_frame)

            # Display the frame (you can remove this line if you don't need to display the video)
            cv2.imshow('Video Frame', original_frame)
            timestep += 1

        # Exit the loop if the 'q' key is pressed
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Release the capture, finalize the output video and close the window.
    # The writer must be released before the file is renamed, otherwise the
    # video file may be incomplete.
    cap.release()
    out.release()
    cv2.destroyAllWindows()

# Write the hand action lists to file
left_hand_action_list = np.array(left_hand_action_list)
right_hand_action_list = np.array(right_hand_action_list)
np.save('{}_left_hand_action_list.npy'.format(TASK_NAME), left_hand_action_list)
np.save('{}_right_hand_action_list.npy'.format(TASK_NAME), right_hand_action_list)

# os.replace overwrites an existing target on every platform, so the cell can
# be re-run.
os.replace(out_filename, '{}_mujoco.mp4'.format(TASK_NAME))




FPS: 60.0
Video width, height: 1280 720


OpenCV: FFMPEG: tag 0x47504a4d/'MJPG' is not supported with codec id 7 and format 'mp4 / MP4 (MPEG-4 Part 14)'
OpenCV: FFMPEG: fallback to use tag 0x7634706d/'mp4v'


## Step 4: Control the Robot Using Inverse Kinematics (IK)

In this step, you'll use the previously generated fingertip trajectories to control a robot in a **MuJoCo simulation**.

### 🤖 What Happens Here

- The robot is controlled via **Inverse Kinematics (IK)** to **mimic the human hand movements**.
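- The input to the IK controller is the pair of `.npy` trajectory files (left and right hand) created in Step 3; the note trajectory is loaded from the pickle file created in Step 2.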

### 🎯 Output

- The robot's movements are recorded in a `.mp4` file (e.g., `Stan_1_demo.mp4`).


In [4]:
# Base name of the task; the rest of this cell uses it to build the names of
# the '<task_name>.pkl', '<task_name>_*_hand_action_list.npy' and
# '<task_name>_demo.mp4' files.
task_name = "Stan_1" # TODO Replace with your task name

# Handle to the MuJoCo bindings library.
# NOTE(review): `mjlib` does not appear to be referenced anywhere else in the
# visible notebook — confirm whether it is still needed.
mjlib = mjbindings.mjlib

def play_video(filename: str):
    """Return an inline HTML video player for an MP4 file.

    The file is read completely and embedded into the returned markup as a
    base64 ``data:`` URL, so the result does not depend on the file staying
    on disk.

    Args:
        filename: Path of the MP4 file to embed.

    Returns:
        An ``IPython.display.HTML`` object containing a ``<video>`` element.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    # Use a context manager so the file handle is closed deterministically.
    with open(filename, "rb") as video_file:
        mp4 = video_file.read()
    data_url = "data:video/mp4;base64," + b64encode(mp4).decode()

    return HTML(
        """
  <video controls>
        <source src="%s" type="video/mp4">
  </video>
  """
        % data_url
    )

# Replay the extracted fingertip trajectories in the MuJoCo simulation and
# record the result to '<task_name>_demo.mp4'.

# Load the note trajectory (with fingering) written by the fingering step.
# NOTE: pickle must only be used on trusted files such as the one generated
# earlier in this notebook.
with open('{}.pkl'.format(task_name), 'rb') as f:
    note_traj = pickle.load(f)


task = piano_with_shadow_hands_res.PianoWithShadowHandsResidual(
    note_trajectory=note_traj,
    change_color_on_activation=True,
    trim_silence=True,
    control_timestep=0.05,
    disable_hand_collisions=True,
    disable_forearm_reward=True,
    disable_fingering_reward=False,
    midi_start_from=0,
    n_steps_lookahead=10,
    gravity_compensation=True,
    residual_factor=0.03, # 0.03 for after NeverGonnaGiveYouUp
    shift=0,
)

# Load hand action trajectory
left_hand_action_list = np.load('{}_left_hand_action_list.npy'.format(task_name))
right_hand_action_list = np.load('{}_right_hand_action_list.npy'.format(task_name))

env = composer_utils.Environment(
    recompile_physics=False, task=task, strip_singleton_obs_buffer_dim=True
)

# Wrapper stack; the order is significant and kept as in the original.
env = PianoSoundVideoWrapper(
    env,
    record_every=1,
    camera_id="piano/top",
    record_dir=".",
)
env = DeepMimicWrapper(env,
                      demonstrations_lh=left_hand_action_list,
                      demonstrations_rh=right_hand_action_list,
                      remove_goal_observation=False,
                      mimic_z_axis=False,)
env = ResidualWrapper(env,
                      demonstrations_lh=left_hand_action_list,
                      demonstrations_rh=right_hand_action_list,
                      demo_ctrl_timestep=0.05,)
env = MidiEvaluationWrapper(
    environment=env, deque_size=1
)
env = CanonicalSpecWrapper(env, clip=True)

env = SinglePrecisionWrapper(env)
env = DmControlWrapper(env)

env = Dm2GymWrapper(env)
# Unwrap the gym adapter again: the rollout below uses the dm_env style API
# (reset() returns a timestep that is queried with .last()).
env = env.env

step = 0
reward = 0
timestep = env.reset()
action_shape = env.action_spec().shape
while not timestep.last():
    # An all-zero action is sent at every step.
    action = np.zeros(action_shape)
    timestep = env.step(action)
    step += 1
    reward += timestep.reward
print(f"Total steps: {step}")
print(f"Total reward: {reward}")
print(f"Metrics: {env.get_musical_metrics()}")

# Move the recording to its final name.  The recorder's own
# `latest_filename` is used instead of a hard-coded "./00001.mp4", and
# os.replace overwrites an existing target so the cell can be re-run.
demo_filename = "{}_demo.mp4".format(task_name)
os.replace(env.latest_filename, demo_filename)

# Must be the last expression of the cell: the notebook only renders the
# value of the final expression, so calling play_video earlier would discard
# the player.
play_video(demo_filename)

Total steps: 537
Total reward: 1748.7674869298935
Metrics: {'precision': 0.7895716945996276, 'recall': 0.6932650527622594, 'f1': 0.6899441340782123, 'sustain_precision': 1.0, 'sustain_recall': 1.0, 'sustain_f1': 1.0}
