In [1]:
import os
import time
import gymnasium as gym
import gymnasium_robotics
from gymnasium.wrappers import RecordVideo
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize
from sb3_contrib import TQC

In [2]:
# from multi_fetch_env import MultiObjectFetchPickAndPlaceEnv
import my_envs
from wrappers import ActiveObjectWrapper

gym.register_envs(gymnasium_robotics)

LOG_DIR = 'logs_testing'
# RUN_ID = os.getenv('RESUME_ID', None)
# assert RUN_ID is not None,  "Set RESUME_ID to the run folder name (e.g., 08vger36)"

ENV_ID = 'MultiObjectFetchPickAndPlace-v0'
# MultiObjectFetchPickAndPlace-v0

# ENV_ID = 'FetchPickAndPlace-v4'
MODEL_PATH = os.path.join('models', 'tqcdense_model.zip')
VECNORM_PATH = os.path.join('models', 'tqcdense_vecnorm.pkl')
VIDEO_DIR = os.path.join(LOG_DIR, "inference_videos")


def make_eval_env():
    env = gym.make(
        ENV_ID,
        render_mode = 'human', 
        reward_type = 'dense', 
        max_episode_steps=50, 
        n_objects = 4
    )
    # os.makedirs(VIDEO_DIR, exist_ok=True)

    # env = RecordVideo(
    #     env,
    #     video_folder=VIDEO_DIR,
    #     episode_trigger=lambda ep: True,  # record every episode
    #     name_prefix="eval",
    # )
    env = ActiveObjectWrapper(env, 0, 5)
    # env = Monitor(env)  # for episode stats
    
    return env

eval_env = DummyVecEnv([make_eval_env])

if os.path.exists(VECNORM_PATH):
    eval_env = VecNormalize.load(VECNORM_PATH, eval_env)
else:
    raise FileNotFoundError(f'Missing Vecnormalize file : {VECNORM_PATH}')

eval_env.training = False
eval_env.norm_reward = False

model = TQC.load(MODEL_PATH, env = eval_env)



In [29]:
import numpy as np
import time
import copy

def surgical_routine(vec_env, model, final_target):
    # 1. Reset Standard Wrapper
    # This initializes the wrappers (Monitor, VideoRecorder, etc.) correctly
    vec_env.reset()
    
    # 2. Access 'Unwrapped' for Physics/Logic checks (Read-Only)
    # We use this to know where things *really* are in meters
    env_stack = vec_env.envs[0] #.unwrapped
    
    raw_env = env_stack
    while not hasattr(raw_env, "set_active_object"):
        if hasattr(raw_env, "env"):
            raw_env = raw_env.env
        else:
            raise ValueError("ActiveObjectWrapper not found in env stack!")
    
    # Setup Waypoints (using raw env to get initial positions)
    initial_obs = raw_env._get_obs() # Get raw dict (meters)
    obj_pos = initial_obs['observation'][3:6].copy()
    final_target = obj_pos = initial_obs['desired_goal'].copy()
    # print(final_target).exit()
    
    hover_point = obj_pos.copy()
    hover_point[2] += 0.10  # 25cm above
    
    state = "HOVER"
    gripper_closed = False 
    # open_grip = False
    
    print(f"--- Starting Surgical Routine ---")
    
    while True:
        # --- A. READ REALITY ---
        # Get a fresh RAW observation from the internal physics
        # We do this every step to get the latest coordinates in meters
        raw_obs = raw_env._get_obs()
        current_grip = raw_obs['observation'][:3]
        current_obj  = raw_obs['observation'][3:6]
        
        # --- B. STATE MACHINE (Decide the Goal) ---
        # We modify 'raw_obs' to trick the agent into doing what we want
        
        if state == "HOVER":
            # TRICK: Tell agent "I am already holding the object"
            # Set Object Pos = Gripper Pos. 
            # This makes the policy drive the Gripper to the Goal.
            # raw_obs['observation'][3:6] = current_grip
            # raw_obs['observation'][6:9] = [0, 0, 0] # Relative dist is 0
            
            # STRATEGY: "The Ghost Object"
            # We tell the agent the object is AT the hover point.
            # The agent will try to fly there to "pick it up".
            
            # 1. Lie: Object is at Hover Point
            raw_obs['observation'][3:6] = hover_point
            # 2. Lie: Relative Distance = Hover Point - Current Gripper
            # (Standard subtraction the agent expects)
            raw_obs['observation'][6:9] = raw_obs['observation'][3:6] - current_grip
            # Set Goal = Hover Point
            raw_obs['desired_goal'] = hover_point
            
            # Force Gripper Open (Agent expects open gripper during approach)
            override_gripper = 1.0
            
            # Check success (in meters)
            dist = np.linalg.norm(current_grip - hover_point)
            print(f'dist:{dist} ; grip_pos: {current_grip} ; hover: {hover_point}')

            if dist < 0.05:
                print(">> Hover Reached. Descending...")
                state = "DESCEND"

        elif state == "DESCEND":
            # # TRICK: Still pretend we hold it, but move goal to object
            # raw_obs['observation'][3:6] = current_grip
            # raw_obs['observation'][6:9] = [0, 0, 0]
            
            # Goal = The real object position
            raw_obs['desired_goal'] = obj_pos
            
            override_gripper = 1.0
            
            # Check proximity to grasp
            dist = np.linalg.norm(current_grip - obj_pos)
            if dist < 0.03: # Very close
                print(">> In Position. Grasping...")
                state = "GRASP"

        elif state == "GRASP":
            # REALITY: Give the agent the TRUE observation now.
            # It sees the object right in front of it and will naturally close gripper.
            # raw_obs['desired_goal'] = obj_pos
             
            override_gripper = -1.0
            # We can force the gripper closed in the action later, 
            # or trust the agent. TQC usually closes instantly here.
            
            # Check if lifted (Z height increased)
            if current_obj[2] > obj_pos[2] + 0.02:
                print(">> Object Moving! Lifting...")
                state = "MOVE"

        # elif state == "LIFT":
        #     # REALITY: Object is held. Set Goal High.
        #     lift_target = obj_pos.copy()
        #     lift_target[2] += 0.4
        #     raw_obs['desired_goal'] = lift_target
            
        #     if current_obj[2] > obj:
        #         print(">> Lifted. Moving to Drop...")
        #         state = "MOVE"

        elif state == "MOVE":
            # REALITY: Set Goal to Final Target
            raw_obs['desired_goal'] = final_target
            
            dist = np.linalg.norm(current_obj - final_target)
            if dist < 0.05:
                print(">> MISSION COMPLETE")
                break

        # --- C. NORMALIZE MANUALLY ---
        # The model needs Normalized data. We use the wrapper to convert our
        # hacked raw_obs into something the model understands.
        
        # 1. Add Batch Dimension (VecEnv expects (1, Dim))
        obs_vec = {key: val[np.newaxis, ...] for key, val in raw_obs.items()}
        
        # 2. Normalize using the Env's running stats
        # This is the "Magic" bridge between your Meter logic and the Agent's Sigma logic
        obs_norm = vec_env.normalize_obs(obs_vec)
        
        # --- D. PREDICT & STEP ---
        # Predict using our crafted input
        action, _ = model.predict(obs_norm, deterministic=True)
        
        # Optional: Manual Gripper Override (Safety)
        # If we are in Hover/Descend, force Open (1.0)
        # If we are in Lift/Move, force Closed (-1.0)
        # if state in ["HOVER", "DESCEND"]:
        # if open_grip:
        #     action[0, 3] = 1.0
        #     open_grip=False
        # elif state in ["LIFT", "MOVE"]:
        #     action[0, 3] = -1.0

        # Execute standard step (maintains recording, timing, rendering)
        # We discard the 'obs' returned here because we generate our own next loop
        _, _, done, info = vec_env.step(action)
        
        time.sleep(0.02)
        vec_env.render()
        
        if done:
            break

# Run it
eval_env.env_method("set_active_object", 2)

target = np.array([1.45, 0.75, 0.45]) # Example Shelf
surgical_routine(eval_env, model, target)

AttributeError: 'ActiveObjectWrapper' object has no attribute '_get_obs'

In [30]:
import numpy as np
import time
import copy

def surgical_routine(vec_env, model, final_target):
    # 1. Reset Standard Wrapper
    # This initializes the wrappers (Monitor, VideoRecorder, etc.) correctly
    vec_env.reset()
    
    # 2. Access 'Unwrapped' for Physics/Logic checks (Read-Only)
    # We use this to know where things *really* are in meters
    env_stack = vec_env.envs[0] #.unwrapped
    
    raw_env = env_stack
    while not hasattr(raw_env, "set_active_object"):
        if hasattr(raw_env, "env"):
            raw_env = raw_env.env
        else:
            raise ValueError("ActiveObjectWrapper not found in env stack!")
    
    # Setup Waypoints (using raw env to get initial positions)
    # initial_obs = raw_env._get_obs() # Get raw dict (meters)
    raw_obs = raw_env.get_current_obs()
    obj_pos_start = raw_obs['observation'][3:6].copy()
    final_target = obj_pos = raw_obs['desired_goal'].copy()
    # print(final_target).exit()
    
    hover_point = obj_pos_start.copy()
    hover_point[2] += 0.10  # 25cm above
    
    # Define a Safe Z Height for transporting (e.g., 60cm high)
    SAFE_Z_HEIGHT = 0.65
    
    state = "HOVER"
    override_gripper = None # Default Open
    # open_grip = False
    
    print(f"--- Starting Surgical Routine ---")
    
    while True:
        # --- A. READ REALITY ---
        # Get a fresh RAW observation from the internal physics
        # We do this every step to get the latest coordinates in meters
        raw_obs = raw_env.get_current_obs()
        current_grip = raw_obs['observation'][:3]
        current_obj  = raw_obs['observation'][3:6]
        
        # --- B. STATE MACHINE (Decide the Goal) ---
        # We modify 'raw_obs' to trick the agent into doing what we want
        
        if state == "HOVER":
            
            # STRATEGY: "The Ghost Object"
            # We tell the agent the object is AT the hover point.
            # The agent will try to fly there to "pick it up".
            
            # 1. Lie: Object is at Hover Point
            raw_obs['observation'][3:6] = hover_point
            # 2. Lie: Relative Distance = Hover Point - Current Gripper
            # (Standard subtraction the agent expects)
            raw_obs['observation'][6:9] = raw_obs['observation'][3:6] - current_grip
            # Set Goal = Hover Point
            raw_obs['desired_goal'] = hover_point
            
            # Force Gripper Open (Agent expects open gripper during approach)
            # override_gripper = 1.0
            
            # Check success (in meters)
            dist = np.linalg.norm(current_grip - hover_point)
            print(f'dist:{dist} ; grip_pos: {current_grip} ; hover: {hover_point}')

            if dist < 0.05:
                print(">> Hover Reached. Descending...")
                state = "DESCEND"

        elif state == "DESCEND":
            # # TRICK: Still pretend we hold it, but move goal to object
            # raw_obs['observation'][3:6] = current_grip
            # raw_obs['observation'][6:9] = [0, 0, 0]
            
            # Goal = The real object position
            raw_obs['desired_goal'] = obj_pos_start
            
            # override_gripper = 1.0
            
            # Check proximity to grasp
            dist = np.linalg.norm(current_grip - obj_pos_start)
            if dist < 0.03: # Very close
                print(">> In Position. Grasping...")
                state = "GRASP"

        elif state == "GRASP":
            # REALITY: Give the agent the TRUE observation now.
            # It sees the object right in front of it and will naturally close gripper.
            # raw_obs['desired_goal'] = obj_pos
             
            override_gripper = -1.0
            # We can force the gripper closed in the action later, 
            # or trust the agent. TQC usually closes instantly here.
            
            # Check if lifted (Z height increased)
            if current_obj[2] > obj_pos_start[2] + 0.02:
                print(">> Object Moving! Lifting...")
                state = "LIFT"

        elif state == "LIFT":
            # --- THE NEW ELEVATOR LOGIC ---
            
            # 1. Trick: "I have it" (Object is at Gripper)
            raw_obs['observation'][3:6] = current_grip
            raw_obs['observation'][6:9] = [0, 0, 0]
            
            # 2. Goal: LOCK X/Y, INCREASE Z
            # We construct a target that is exactly where we are now, but higher.
            # This prevents the arm from drifting sideways into neighbors.
            elevator_target = current_grip.copy()
            elevator_target[2] = SAFE_Z_HEIGHT 
            
            raw_obs['desired_goal'] = elevator_target
            
            override_gripper = -1.0 # Close
            
            # 3. Transition: When we are high enough
            if current_grip[2] > SAFE_Z_HEIGHT - 0.05:
                print(f">> Lifted Safe (Z={current_grip[2]:.2f}). Moving to Target...")
                state = "MOVE"
                
        elif state == "MOVE":
            # REALITY: Set Goal to Final Target
            raw_obs['desired_goal'] = final_target
            override_gripper = -1.0
            
            dist = np.linalg.norm(current_obj - final_target)
            if dist < 0.02:
                print(">> MISSION COMPLETE")
                break

        # --- C. NORMALIZE MANUALLY ---
        # The model needs Normalized data. We use the wrapper to convert our
        # hacked raw_obs into something the model understands.
        
        # 1. Add Batch Dimension (VecEnv expects (1, Dim))
        obs_vec = {key: val[np.newaxis, ...] for key, val in raw_obs.items()}
        
        # 2. Normalize using the Env's running stats
        # This is the "Magic" bridge between your Meter logic and the Agent's Sigma logic
        obs_norm = vec_env.normalize_obs(obs_vec)
        
        # --- D. PREDICT & STEP ---
        # Predict using our crafted input
        action, _ = model.predict(obs_norm, deterministic=True)
        
        if override_gripper is not None:
            action[0, 3] = override_gripper
        
        _, _, done, info = vec_env.step(action)
        
        time.sleep(0.02)
        vec_env.render()
        
        if done:
            break

# Run it
eval_env.env_method("set_active_object", 3)

target = np.array([[1.4, 0.65, 0.43]]) # Example Shelf
surgical_routine(eval_env, model, target)

--- Starting Surgical Routine ---
dist:0.09679288247314226 ; grip_pos: [1.34193475 0.74910105 0.53472519] ; hover: [1.2652443  0.69360772 0.55492239]
dist:0.09270660050753887 ; grip_pos: [1.31115907 0.77366754 0.5636844 ] ; hover: [1.2652443  0.69360772 0.55492239]
dist:0.056985097231171275 ; grip_pos: [1.28377786 0.74543069 0.5696936 ] ; hover: [1.2652443  0.69360772 0.55492239]
dist:0.019309433710078733 ; grip_pos: [1.26777152 0.71227603 0.55068427] ; hover: [1.2652443  0.69360772 0.55492239]
>> Hover Reached. Descending...
>> In Position. Grasping...
>> Object Moving! Lifting...
>> Lifted Safe (Z=0.61). Moving to Target...
>> MISSION COMPLETE


In [31]:
eval_env.close()

In [70]:
# import numpy as np
# import time

# def run_surgical_pick_and_place(env, model, final_target):
#     # 1. Setup
#     obs = eval_env.reset()
    
#     # Get the REAL position of the object we want to pick
#     # We extract this from the observation (indices 3:6 are object pos)
#     real_obj_pos = obs['observation'][0, 3:6].copy()
#     # print(real_obj_pos)
    
#     # Define our Waypoints
#     hover_point = real_obj_pos.copy()
#     print(hover_point)
#     hover_point[2] += 0.20  # 20cm above object
    
#     lift_point = real_obj_pos.copy()
#     lift_point[2] += 0.30   # Lift high before moving
    
#     state = "HOVER"
#     gripper_closed = False
    
#     print(f"--- Starting Sequence for Object  ---")
    
#     while True:
#         # --- STATE MACHINE LOGIC ---
        
#         current_gripper_pos = obs['observation'][0, 0:3]
        
#         if state == "HOVER":
#             # THE TRICK: Tell agent the object is at the hover point
#             # We construct a "Ghost Observation"
#             ghost_obs = obs.copy()
#             ghost_obs['observation'] = obs['observation'].copy()
            
#             # Fake the object position to be the hover point
#             ghost_obs['observation'][0, 3:6] = hover_point.ravel()
#             # Fake the relative distance (Ghost - Gripper)
#             ghost_obs['observation'][0, 6:9] = hover_point.ravel() - current_gripper_pos
            
#             # Set the goal to the hover point too, so it thinks it just needs to go there
#             ghost_obs['desired_goal'] = np.expand_dims(hover_point.ravel(), axis=0)
            
            
#             # Predict action based on the LIE
#             action, _ = model.predict(ghost_obs, deterministic=True)
            
#             # print(f'action:{action}')
#             # Force Gripper OPEN (1.0)
            
#             # Check if we arrived
#             dist = np.linalg.norm(current_gripper_pos - hover_point)
#             print(f'dist:{dist} ; grip_pos: {current_gripper_pos} ; hover: {hover_point}')
#             if dist < 0.05:
#                 print(">> Hover Reached. Descending...")
#                 state = "GRASP"
#                 action[0, 3] = 1.0 


#         elif state == "GRASP":
#             # REVEAL THE TRUTH: Use the real observation
#             # The agent will naturally try to go to the real object
            
#             # We allow the agent to decide when to close the gripper, 
#             # BUT usually TQC is eager. Let's just run the model normally.
#             action, _ = model.predict(obs, deterministic=True)
            
#             # Monitor if the agent has grasped it.
#             # (In Fetch, grip state < 0 usually means closing)
#             # A simple check: Is the object moving up?
#             obj_z = obs['observation'][5] # Z height of object
            
#             if obj_z > real_obj_pos[2] + 0.05: # If lifted 5cm
#                 print(">> Object Grasped! Lifting...")
#                 state = "LIFT"

#         elif state == "LIFT":
#             # Override the Goal to be high up
#             obs['desired_goal'] = lift_point
#             action, _ = model.predict(obs, deterministic=True)
            
#             # Keep Gripper Closed (safety override)
#             if action[3] > 0: action[3] = -1.0
            
#             # Check height
#             obj_z = obs['observation'][5]
#             if obj_z > lift_point[2] - 0.05:
#                 print(">> Lift Complete. Moving to Target...")
#                 state = "MOVE"

#         elif state == "MOVE":
#             # Set the Final Target
#             obs['desired_goal'] = final_target
#             action, _ = model.predict(obs, deterministic=True)
            
#             # Keep Gripper Closed until very close
#             dist_to_target = np.linalg.norm(obs['achieved_goal'] - final_target)
#             if dist_to_target > 0.05:
#                 if action[3] > 0: action[3] = -1.0
            
#             # Success Check
#             info = env.get_wrapper_attr("compute_reward")(obs['achieved_goal'], final_target, {})
#             # Note: reward is usually -1 or 0. Success is 0.
#             if dist_to_target < 0.05:
#                 print(">> MISSION COMPLETE")
#                 break
        
#         # --- EXECUTE STEP ---
#         obs, reward, done, info = env.step(action)
#         time.sleep(0.05) # Slow down for visualization
#         env.render()
        
#         if done:
#             break

# # --- USAGE ---


In [None]:
# target_pos = np.array([1.43710006, 0.89503746, 0.42489224]) # Final shelf location
# run_surgical_pick_and_place(eval_env, model, final_target=target_pos)

[ 0.6088694  -0.28861788 -0.05878199]
--- Starting Sequence for Object  ---
dist:0.7434815168380737 ; grip_pos: [-0.02822911 -0.0232845  -0.13531129] ; hover: [ 0.6088694  -0.28861788  0.14121802]
dist:0.9194920063018799 ; grip_pos: [-0.23844746  0.06065339  0.06685882] ; hover: [ 0.6088694  -0.28861788  0.14121802]
dist:0.7129766345024109 ; grip_pos: [-0.05906625 -0.1180447  -0.04072295] ; hover: [ 0.6088694  -0.28861788  0.14121802]
dist:0.49186351895332336 ; grip_pos: [ 0.16640362 -0.18366925 -0.04623748] ; hover: [ 0.6088694  -0.28861788  0.14121802]
dist:0.3035851716995239 ; grip_pos: [ 0.3872642  -0.27144578 -0.06556741] ; hover: [ 0.6088694  -0.28861788  0.14121802]
dist:0.3641887307167053 ; grip_pos: [ 0.5989419  -0.2575634  -0.22150846] ; hover: [ 0.6088694  -0.28861788  0.14121802]
dist:0.5343226194381714 ; grip_pos: [ 0.604176   -0.22332798 -0.38907984] ; hover: [ 0.6088694  -0.28861788  0.14121802]
dist:0.45727968215942383 ; grip_pos: [ 0.51227134 -0.20915245 -0.29862145] ;

In [43]:
eval_env.close()

In [None]:
# class RobotCurriculum():
#     def __init__(self, env, model, ):
#         self.env = env
#         self.model = model
#         self.current_obs = None
        
#     def get_current_obs(self):
#         """ for getting current observation """
#         return self.current_obs
    
#     def hover_free(self, hover_point):
        
#         modified_obs = self.current_obs.copy()
#         modified_obs['observations'] = self.current_obs['observations'].copy
        
#         current_gripper = self.current_obs['observations']
        
#         assert isinstance(hover_point, np.ndarray) and hover_point.shape == (3,)
#         modified_obs[3:6] = hover_point
        
#         modified_obs[6:9] = hover_point - current_gripper
        
        
        
        
        

In [None]:
# import numpy as np
# import time

# def surgical_routine(vec_env, model, final_target, active_obj_idx, reset_env=False):
#     """
#     Executes a Pick-and-Place operation on the specified object index.
#     Includes: Hover -> Grasp -> Lift -> Move -> Place -> Retract.
#     """
    
#     # 1. HANDLE RESET & TIMER
#     if reset_env:
#         # Full Reset (Only do this for the VERY FIRST move)
#         vec_env.reset()
#     else:
#         # MAGIC HACK: Reset the hidden "Time Limit" counter
#         # This prevents the environment from auto-resetting in the middle of your swap
#         current_env = vec_env.envs[0]
#         while hasattr(current_env, "env"):
#             if hasattr(current_env, "_elapsed_steps"):
#                 current_env._elapsed_steps = 0 # Reset the timer to 0
#             current_env = current_env.env
    
#     # 1. Access Wrapper
#     env_stack = vec_env.envs[0]
#     wrapper = env_stack
#     while not hasattr(wrapper, "set_active_object"):
#         if hasattr(wrapper, "env"):
#             wrapper = wrapper.env
#         else:
#             raise ValueError("ActiveObjectWrapper not found in env stack!")
            
#     # 2. Activate Target Object
#     wrapper.set_active_object(active_obj_idx)
    
#     # 3. Optional Reset (Only for the very first move)
#     # if reset_env:
#     #     vec_env.reset()
    
#     # --- SETUP ---
#     # Get fresh observation via wrapper
#     raw_obs = wrapper.get_current_obs()
    
#     # Start Position
#     obj_pos_start = raw_obs['observation'][3:6].copy()
    
#     # Calculated Waypoints
#     hover_point = obj_pos_start.copy()
#     hover_point[2] += 0.10  # 15cm above object
    
#     SAFE_Z_HEIGHT = 0.65
    
#     state = "HOVER"
#     override_gripper = 1.0 # Open
    
#     print(f"\n--- Moving Object {active_obj_idx} ---")
    
#     # Loop until complete
#     while True:
#         # A. READ REALITY
#         raw_obs = wrapper.get_current_obs()
#         current_grip = raw_obs['observation'][:3]
#         current_obj  = raw_obs['observation'][3:6]
        
#         # B. STATE MACHINE
#         if state == "HOVER":
#             # Goal: Hover point
#             # Trick: Ghost Object at Hover
#             raw_obs['observation'][3:6] = hover_point
#             raw_obs['observation'][6:9] = hover_point - current_grip
#             raw_obs['desired_goal'] = hover_point
#             override_gripper = 1.0
            
#             if np.linalg.norm(current_grip - hover_point) < 0.05:
#                 state = "DESCEND"

#         elif state == "DESCEND":
#             # Goal: Real Object
#             raw_obs['desired_goal'] = obj_pos_start
#             override_gripper = 1.0
            
#             if np.linalg.norm(current_grip - obj_pos_start) < 0.03:
#                 state = "GRASP"

#         elif state == "GRASP":
#             # Goal: Hold
#             override_gripper = -1.0 # Close
            
#             # Trick: "I have it"
#             raw_obs['observation'][3:6] = current_grip
#             raw_obs['observation'][6:9] = [0,0,0]
#             raw_obs['desired_goal'] = current_grip # Stay put
            
#             # Simple timer or check for closure could go here
#             # We assume instant grasp and move to lift
#             state = "LIFT"

#         elif state == "LIFT":
#             # Goal: Elevator Up
#             # Trick: "I have it"
#             raw_obs['observation'][3:6] = current_grip
#             raw_obs['observation'][6:9] = [0,0,0]
            
#             elevator_target = current_grip.copy()
#             elevator_target[2] = SAFE_Z_HEIGHT
#             raw_obs['desired_goal'] = elevator_target
            
#             override_gripper = -1.0 # Keep Closed!
            
#             if current_grip[2] > SAFE_Z_HEIGHT - 0.05:
#                 state = "MOVE"

#         elif state == "MOVE":
#             # Goal: Final Target
#             # Trick: "I have it"
#             raw_obs['observation'][3:6] = current_grip
#             raw_obs['observation'][6:9] = [0,0,0]
            
#             # Modify target Z to be slightly above table to avoid smashing
#             place_target = final_target.copy()
#             place_target[2] = max(place_target[2], 0.45) 
            
#             raw_obs['desired_goal'] = place_target
#             override_gripper = -1.0
            
#             # Check 2D distance (XY) mostly, Z can be approximate
#             dist_xy = np.linalg.norm(current_obj[:2] - place_target[:2])
#             if dist_xy < 0.04:
#                 print(">> Target Reached. Placing...")
#                 state = "PLACE"

#         elif state == "PLACE":
#             # Goal: Stay put, Open Gripper
#             raw_obs['desired_goal'] = current_grip
#             override_gripper = 1.0 # OPEN!
            
#             # Wait for object to detach (simple logic: just wait a few frames)
#             # or check if we can move up
#             state = "RETRACT"

#         elif state == "RETRACT":
#             # Goal: Go UP to Safe Height again
#             # Real Object stays on table, we leave it.
            
#             retract_target = current_grip.copy()
#             retract_target[2] = SAFE_Z_HEIGHT
            
#             raw_obs['desired_goal'] = retract_target
#             override_gripper = 1.0 # Keep Open
            
#             if current_grip[2] > SAFE_Z_HEIGHT - 0.05:
#                 print(">> Operation Complete.")
#                 return # EXIT FUNCTION

#         # C. NORMALIZE & STEP
#         obs_vec = {key: val[np.newaxis, ...] for key, val in raw_obs.items()}
#         obs_norm = vec_env.normalize_obs(obs_vec)
#         action, _ = model.predict(obs_norm, deterministic=True)
        
#         if override_gripper is not None:
#             action[0, 3] = override_gripper

#         _, _, done, info = vec_env.step(action)
        
#         time.sleep(0.02)
#         vec_env.render()
        
#         if done:
#             break

In [4]:
import numpy as np
import time

def surgical_routine(vec_env, model, final_target, active_obj_idx, reset_env=False):
    """
    Executes a Pick-and-Place operation on the specified object index.
    """
    
    # --- FIX 1: FREEZE THE BRAIN ---
    # Stop the normalizer from learning from our 'hacked' observations.
    # If we don't do this, the running mean/std will drift, making the 
    # agent fail on the 2nd and 3rd objects.
    if hasattr(vec_env, "training"):
        vec_env.training = False 

    # --- FIX 2: ROBUST TIMER RESET ---
    if reset_env:
        print(">> FULL RESET (Starting fresh)")
        vec_env.reset()
    # else:
    #     # Recursively hunt for the TimeLimit wrapper and reset it
    #     current_env = vec_env.envs[0]
    #     timer_found = False
    #     while hasattr(current_env, "env"):
    #         if hasattr(current_env, "_elapsed_steps"):
    #             current_env._elapsed_steps = 0
    #             timer_found = True
    #         current_env = current_env.env
        
    #     if not timer_found:
    #         print("WARNING: Could not find '_elapsed_steps'. Auto-reset might occur!")

    # 1. Access Wrapper
    env_stack = vec_env.envs[0]
    wrapper = env_stack
    while not hasattr(wrapper, "set_active_object"):
        if hasattr(wrapper, "env"):
            wrapper = wrapper.env
        else:
            raise ValueError("ActiveObjectWrapper not found in env stack!")
            
    # 2. Activate Target Object
    wrapper.set_active_object(active_obj_idx)
    
    # --- SETUP ---
    raw_obs = wrapper.get_current_obs()
    obj_pos_start = raw_obs['observation'][3:6].copy()
    
    # Waypoints
    hover_point = obj_pos_start.copy()
    hover_point[2] += 0.15  # 15cm above
    
    SAFE_Z_HEIGHT = 0.65
    
    state = "HOVER"
    override_gripper = None
    
    print(f"\n--- Moving Object {active_obj_idx} ---")
    
    while True:
        # A. READ REALITY
        raw_obs = wrapper.get_current_obs()
        current_grip = raw_obs['observation'][:3]
        current_obj  = raw_obs['observation'][3:6]
        
        # B. STATE MACHINE
        if state == "HOVER":
            raw_obs['observation'][3:6] = hover_point
            raw_obs['observation'][6:9] = hover_point - current_grip
            raw_obs['desired_goal'] = hover_point
            # override_gripper = 1.0
            
            if np.linalg.norm(current_grip - hover_point) < 0.05:
                state = "DESCEND"
                print(">> Hover Reached. Descending...")

        elif state == "DESCEND":
            raw_obs['desired_goal'] = obj_pos_start
            # override_gripper = 1.0
            
            if np.linalg.norm(current_grip - obj_pos_start) < 0.03:
                print(">> In Position. Grasping...")
                state = "GRASP"

        elif state == "GRASP":
            # override_gripper = -1.0 # Close
            
            # Trick: "I have it" - prevents panic
            # raw_obs['observation'][3:6] = current_grip
            # raw_obs['observation'][6:9] = [0,0,0]
            # raw_obs['desired_goal'] = current_grip 
            
            if current_obj[2] > obj_pos_start[2] + 0.02:
                print(">> Object Moving! Lifting...")
                state = "LIFT"
            

        elif state == "LIFT":
            raw_obs['observation'][3:6] = current_grip
            raw_obs['observation'][6:9] = [0,0,0]
            
            elevator_target = current_grip.copy()
            elevator_target[2] = SAFE_Z_HEIGHT
            raw_obs['desired_goal'] = elevator_target
            # override_gripper = -1.0 
            
            if current_grip[2] > SAFE_Z_HEIGHT - 0.05:
                print(f">> Lifted Safe (Z={current_grip[2]:.2f}). Moving to Target...")
                state = "MOVE"

        elif state == "MOVE":
            # raw_obs['observation'][3:6] = current_grip
            # raw_obs['observation'][6:9] = [0,0,0]
            
            # # Ensure target Z is safe (not inside table)
            # place_target = final_target.copy()
            # place_target[2] = max(place_target[2], 0.45) 
            
            # raw_obs['desired_goal'] = place_target
            # override_gripper = -1.0
            
            # dist_xy = np.linalg.norm(current_obj[:2] - place_target[:2])
            # if dist_xy < 0.04:
            #     print(">> Target Reached. Placing...")
            #     state = "PLACE"
            raw_obs['desired_goal'] = final_target
            
            dist = np.linalg.norm(current_obj - final_target)
            if dist < 0.02:
                print(">> target reached.placing...")
                break

        elif state == "PLACE":
            # raw_obs['desired_goal'] = current_grip
            override_gripper = 1.0 # OPEN!
            state = "RETRACT"

        elif state == "RETRACT":
            retract_target = current_grip.copy()
            retract_target[2] = SAFE_Z_HEIGHT
            
            raw_obs['desired_goal'] = retract_target
            override_gripper = 1.0 
            
            if current_grip[2] >= SAFE_Z_HEIGHT :
                print(">> Operation Complete.")
                return # SUCCESS

        # C. NORMALIZE & STEP
        obs_vec = {key: val[np.newaxis, ...] for key, val in raw_obs.items()}
        obs_norm = vec_env.normalize_obs(obs_vec)
        action, _ = model.predict(obs_norm, deterministic=True)
        
        if override_gripper is not None:
            action[0, 3] = override_gripper

        _, _, done, info = vec_env.step(action)
        
        # --- FIX 3: IGNORE 'DONE' ---
        # If the environment auto-resets despite our hack, we don't want to break the loop immediately.
        # However, if it DOES reset, physics are broken anyway. 
        # But this prevents a premature exit if 'done' signals success instead of timeout.
        if done:
            # We don't break here, we trust our state machine to return.
            # But if physics reset, the next loops will look weird.
            pass
        
        time.sleep(0.01)
        vec_env.render()

# The swap_objects function remains the same, 
# but ensure you use the updated surgical_routine above.

In [5]:
def swap_objects(vec_env, model, obj_id_A, obj_id_B):
    """
    Swaps Object A and Object B using a buffer position.
    """
    
    # 1. Initialization (Reset ONCE here)
    vec_env.reset()
    
    # Access wrapper to read positions
    env_stack = vec_env.envs[0]
    wrapper = env_stack
    while not hasattr(wrapper, "set_active_object"):
        if hasattr(wrapper, "env"):
            wrapper = wrapper.env
        else:
            raise ValueError("Wrapper not found")

    # 2. Get Initial Positions
    # We must set active object to read its true data correctly using your helper
    wrapper.set_active_object(obj_id_A)
    pos_A = wrapper.get_current_obs()['observation'][3:6].copy()
    
    wrapper.set_active_object(obj_id_B)
    pos_B = wrapper.get_current_obs()['observation'][3:6].copy()
    
    # Define Buffer Position (Empty spot on the table)
    # Assuming table center is roughly 1.3, 0.75
    buffer_pos = np.array([1.40, 0.64, 0.43]) 
    
    print(f"Plan: A({obj_id_A}) -> Buffer")
    print(f"Plan: B({obj_id_B}) -> A's Spot")
    print(f"Plan: A({obj_id_A}) -> B's Spot")
    
    # --- STEP 1: Move A to Buffer ---
    surgical_routine(
        vec_env, model, 
        final_target=buffer_pos, 
        active_obj_idx=obj_id_A, 
        reset_env=False  # Do NOT reset, we just did
    )
    
    # --- STEP 2: Move B to A's old spot ---
    surgical_routine(
        vec_env, model, 
        final_target=pos_A, 
        active_obj_idx=obj_id_B, 
        reset_env=False
    )
    
    # --- STEP 3: Move A (from Buffer) to B's old spot ---
    surgical_routine(
        vec_env, model, 
        final_target=pos_B, 
        active_obj_idx=obj_id_A, 
        reset_env=False
    )
    
    print("SWAP COMPLETE!")

# --- EXECUTE ---
# Swap Object 0 and Object 1
swap_objects(eval_env, model, 1, 2)

Plan: A(1) -> Buffer
Plan: B(2) -> A's Spot
Plan: A(1) -> B's Spot

--- Moving Object 1 ---
>> Hover Reached. Descending...
>> In Position. Grasping...
>> Object Moving! Lifting...
>> Lifted Safe (Z=0.61). Moving to Target...
>> target reached.placing...

--- Moving Object 2 ---
>> Hover Reached. Descending...
>> In Position. Grasping...
>> Object Moving! Lifting...
>> Lifted Safe (Z=0.62). Moving to Target...
>> target reached.placing...

--- Moving Object 1 ---
>> Hover Reached. Descending...
>> In Position. Grasping...
>> Object Moving! Lifting...
>> Lifted Safe (Z=0.61). Moving to Target...
>> target reached.placing...
SWAP COMPLETE!


In [10]:
eval_env.close()