In [1]:
import os
from datetime import datetime
import time

import numpy as np
import matplotlib.pyplot as plt

import gym
from gym import Wrapper
import panda_gym
import numpy as np
from datetime import datetime
import sys
import cv2

from phase_1_awac_agent import AWACAgent
from utils_awac import ActionNormalizer, ResetWrapper, TimeFeatureWrapper, TimeLimitWrapper, RealRobotWrapper, save_results, reconstruct_state
from phase_1_func import CustomResetWrapper, choose_task, get_participant_configuration, initialize_envs_for_condition, get_base_env
from task_envs import Phase1TaskCentreEnv, Phase1TaskLeftEnv, Phase1TaskRightEnv
import imageio


# Directory the kernel was started in; every demo path below is resolved
# relative to its parent directory.
CURRENT_DIR = os.getcwd()
# os.path.split(...)[0] is the "head" component, i.e. the containing directory.
PARENT_DIR = os.path.split(CURRENT_DIR)[0]

# Echo both locations so a wrong working directory is visible immediately.
print(f"CURRENT_DIR: {CURRENT_DIR}")
print(f"PARENT_DIR: {PARENT_DIR}")



  fn()


PyTorch version: 2.4.0+cpu
PyTorch CUDA version: None
CUDA available: False
CUDA device count: 0
CUDA device name: No GPU found
CURRENT_DIR: c:\Users\Konstantin\Documents\Meta-learning-thesis\active_panda\algs
PARENT_DIR: c:\Users\Konstantin\Documents\Meta-learning-thesis\active_panda


In [2]:
def load_demo_data(task_name, parent_dir):
    """Read the recorded joystick demonstration tables for one task.

    Both files are looked up in
    ``<parent_dir>/demo_data/joystick_demo/<task_name>/`` and parsed as
    space-delimited numeric text.

    Args:
        task_name: Name of the task sub-directory that holds the demo files.
        parent_dir: Directory containing the ``demo_data`` folder.

    Returns:
        Tuple ``(state_trajs, action_trajs)`` of NumPy arrays read from
        ``demo_state_trajs.csv`` and ``demo_action_trajs.csv`` respectively.
    """
    joystick_demo_path = os.path.join(parent_dir, 'demo_data', 'joystick_demo', task_name)
    print(f"Looking for files in: {joystick_demo_path}")

    def _read_table(file_name):
        # Values in the demo files are separated by single spaces.
        return np.genfromtxt(os.path.join(joystick_demo_path, file_name), delimiter=' ')

    # States are read first, then actions (same order as the returned tuple).
    return _read_table('demo_state_trajs.csv'), _read_table('demo_action_trajs.csv')

In [3]:
def rollout_demo(task_name, env, parent_dir, step_delay=0.02):
    """Replay every recorded joystick demonstration of a task inside ``env``.

    The demo tables are loaded with ``load_demo_data``. A row of the state
    table whose first entry is infinite marks the start of a demonstration;
    the rows after it, up to the next marker or the end of the table, belong
    to that demonstration. For each demonstration the environment is reset
    using columns 6:9 of its first recorded state as the object position, and
    the recorded actions are then stepped one by one. Playback is purely
    visual: no transitions are recorded and nothing is returned.

    Args:
        task_name: Name of the task whose demo files should be replayed.
        env: Wrapped environment to replay in. It is closed before this
            function returns, including when an error is raised.
        parent_dir: Directory that contains the ``demo_data`` folder.
        step_delay: Seconds to sleep before each environment step so the
            replay can be followed by eye.
    """
    try:
        # Load pre-existing demo data.
        state_trajs, action_trajs = load_demo_data(task_name, parent_dir)

        # Row indices where demonstrations start, paired with the (exclusive)
        # row index where each one ends.
        num_rows = state_trajs.shape[0]
        starting_ids = np.flatnonzero(np.isinf(state_trajs[:, 0]))
        end_ids = np.append(starting_ids[1:], num_rows)

        base_env = get_base_env(env)
        base_env.enable_line_drawing = False

        input("Press Enter to continue: ")

        for demo_idx, (starting_id, end_id) in enumerate(zip(starting_ids, end_ids)):
            first_row = starting_id + 1
            if first_row >= end_id:
                # A marker followed directly by another marker (or by the end
                # of the table) carries no data: indexing its "first state"
                # would read a marker row or run past the end of the array.
                print(f"Skipping empty demonstration {demo_idx}")
                continue

            # Initial state and actions of the current demonstration.
            init_state = state_trajs[first_row]
            actions = action_trajs[first_row:end_id]

            goal_pos = env.task.goal_position
            object_pos = init_state[6:9]

            # Put the environment into the demonstration's starting layout.
            env.reset(goal_pos=goal_pos, object_pos=object_pos, object_color=env.object_color)

            for action in actions:
                time.sleep(step_delay)
                _, _, done, _ = env.step(action)

                if done:
                    env.clear_lines_gripper()
                    env.clear_lines()
                    break
    finally:
        # Always release the environment, even if loading or stepping failed.
        env.close()

In [4]:
def main():
    """Ask for a Phase-1 task, build its environment and replay its demos.

    The task is selected through ``choose_task``; the matching environment is
    created with rendering enabled, wrapped with the action-normalising,
    custom-reset, time-feature and time-limit wrappers, and handed to
    ``rollout_demo`` together with ``PARENT_DIR``.

    Raises:
        ValueError: If ``choose_task`` returns anything other than 1, 2 or 3.
    """
    # Episode length shared by the time-feature and time-limit wrappers.
    max_steps = 120

    # Task id -> (environment class, demo folder name). Classes are stored
    # un-instantiated so only the selected environment is ever created.
    task_table = {
        1: (Phase1TaskLeftEnv, "Phase1TaskLeft"),
        2: (Phase1TaskCentreEnv, "Phase1TaskCentre"),
        3: (Phase1TaskRightEnv, "Phase1TaskRight"),
    }

    task_type = choose_task()
    try:
        env_cls, task_name = task_table[task_type]
    except (KeyError, TypeError):
        # Without this check an unknown id would leave the environment
        # undefined and fail later with an unrelated-looking error.
        raise ValueError(
            f"Unknown task type {task_type!r}; expected 1 (left), 2 (centre) or 3 (right)"
        ) from None

    env = env_cls(render=True, reward_type="modified_sparse", control_type='ee')

    env = ActionNormalizer(env)
    env = CustomResetWrapper(env=env)
    env = TimeFeatureWrapper(env=env, max_steps=max_steps, test_mode=False)
    env = TimeLimitWrapper(env=env, max_steps=max_steps)

    # The environment is initialized here
    print("Environment initialized successfully.")

    # Pass the initialized environment (env) to rollout_demo
    rollout_demo(task_name, env, PARENT_DIR)

# Entry point: code run in a notebook kernel (or as a script) has
# __name__ == "__main__", so evaluating this cell starts the demo replay.
if __name__ == "__main__":
    main()

Choose a task type (1=left, 2=centre, 3=right):


  logger.warn(f"Box bound precision lowered by casting to {self.dtype}")


client id is: 0
no default max steps
Environment initialized successfully.
Looking for files in: c:\Users\Konstantin\Documents\Meta-learning-thesis\active_panda\demo_data\joystick_demo\Phase1TaskLeft
