# Solving the Pick-and-Place Environment in Robosuite

## Abstract

## Introduction




## Environment

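This notebook uses robosuite to simulate the `PickPlace` manipulation task and Stable-Baselines3 to train and run the reinforcement-learning agent (PPO or DDPG, selected in `config.yaml`).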


## Implementation

1. Import dependencies and load the configuration file



In [1]:
import yaml
import numpy as np
import os
import robosuite as suite


from robosuite import load_controller_config
from robosuite.environments.base import register_env
from robosuite.controllers import load_controller_config
from stable_baselines3.common.noise import NormalActionNoise
from stable_baselines3.common.save_util import save_to_zip_file, load_from_zip_file
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize, SubprocVecEnv
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.utils import set_random_seed
from stable_baselines3.common.callbacks import EvalCallback, CheckpointCallback
from robosuite.wrappers import GymWrapper

from stable_baselines3 import PPO, DDPG

# Read the run settings from config.yaml into the module-level `config` dict.
# A YAML parse error is printed and then re-raised: the statement below and
# every later cell index into `config`, so carrying on with the empty default
# would only fail afterwards with an unrelated KeyError.
config = {}
print('Load configuration file config.yaml')
with open("config.yaml") as stream:
    try:
        config = yaml.safe_load(stream)
    except yaml.YAMLError as e:
        print(e)
        raise

print("Running environment with training =", str(config["training"]), " and simulation =", str(config["simulation"]))



Load configuration file config.yaml
Running environment with training = True  and simulation = True


2. Create Environment

In [2]:
# create environment instance


def make_env(env_id, options, rank, seed=0):
    """
    Build a factory for a single monitored robosuite environment.

    ``set_random_seed(seed)`` is called immediately; the environment itself is
    only constructed when the returned callable is invoked.

    :param env_id: (str) environment name handed to ``suite.make``
    :param options: (dict) keyword arguments forwarded to ``suite.make``
    :param rank: (int) index of this environment, added to ``seed`` for the reset
    :param seed: (int) base seed for the RNG
    :return: (callable) zero-argument function that creates, wraps and resets the environment
    """
    set_random_seed(seed)

    def _init():
        raw_env = suite.make(env_id, **options)
        wrapped_env = GymWrapper(raw_env)
        wrapped_env.render_mode = 'mujoco'
        monitored_env = Monitor(wrapped_env)
        # Each rank resets with its own offset from the base seed.
        monitored_env.reset(seed=seed + rank)
        return monitored_env

    return _init

# Controller settings are looked up by the controller name given in the configuration.
controller_config = load_controller_config(default_controller=config["robot_controller"])

# Keyword arguments passed to suite.make for every environment built via make_env.
env_options = dict(
    robots=config["robot_name"],
    controller_configs=controller_config,
    has_renderer=True,
    has_offscreen_renderer=False,
    use_camera_obs=False,
    reward_shaping=True,
)


3. Create Reinforcement Learning Model

In [3]:
if config["training"]:

    if config["multiprocessing"]:
        env = SubprocVecEnv([make_env("PickPlace", env_options, i, config["seed"]) for i in range(config["num_envs"])])
    else:
        env = DummyVecEnv([make_env("PickPlace", env_options, i, config["seed"]) for i in range(config["num_envs"])])

    
    if not os.path.isfile(config["model_file_name"] + ".zip"):
        print("No model found, creating a new one")
        
        if config["normalize"]:
            # Normalize environment
            env = VecNormalize(env)

        # Create model
        match config["algorithm"]:
            case "PPO":
                model = PPO("MlpPolicy", env, verbose=1)
            case "DDPG":
                n_actions = env.action_space.shape[-1]
                action_noise = NormalActionNoise(mean=np.zeros(n_actions), sigma=0.1 * np.ones(n_actions))
                model = DDPG("MlpPolicy", env, action_noise=action_noise, verbose=1)
        
        print("Created a new model")
    else:
        print("Loading existing model")
        continue_training_model_path = os.path.join(".", config["model_file_name"] + '.zip')
        continue_training_vecnormalize_path = os.path.join(".", 'vec_normalize_' + config["model_file_name"] + '.pkl')

        if config["normalize"]:
            env = VecNormalize.load(continue_training_vecnormalize_path, env)
        
        match config["algorithm"]:
            case "PPO":
                model = PPO.load(continue_training_model_path, env=env)
            case "DDPG":
                model = DDPG.load(config["model_file_name"] + '.zip', env=env)
        

Loading existing model


4. Train Model

In [4]:
# Optional periodic checkpointing (currently disabled):
#checkpoint_callback = CheckpointCallback(save_freq=1000000, save_path='./checkpoints/', 
#                        name_prefix="model", verbose=2)

# Train in several rounds; after each round the model is written to disk,
# together with the normalisation statistics when normalisation is enabled.
if config["training"]:
    for round_idx in range(config["training_repetitions"]):
        print(f"Starting learning iteration {round_idx}")
        model.learn(total_timesteps=config["training_total_timesteps"])

        model_path = config["model_file_name"] + '.zip'
        model.save(model_path)
        if config["normalize"]:
            stats_path = 'vec_normalize_' + config["model_file_name"] + '.pkl'
            env.save(stats_path)

        print(f"Saved learing iteration {round_idx} successfully")

Starting learning iteration 0
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 1e+03    |
|    ep_rew_mean     | 0.296    |
| time/              |          |
|    episodes        | 4        |
|    fps             | 53       |
|    time_elapsed    | 187      |
|    total_timesteps | 10000    |
| train/             |          |
|    actor_loss      | -0.171   |
|    critic_loss     | 0.00013  |
|    learning_rate   | 0.001    |
|    n_updates       | 989      |
---------------------------------
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 1e+03    |
|    ep_rew_mean     | 0.296    |
| time/              |          |
|    episodes        | 8        |
|    fps             | 53       |
|    time_elapsed    | 187      |
|    total_timesteps | 10000    |
---------------------------------
Saved learing iteration 0 successfully


5. Apply Model

In [5]:
if config["simulation"]:

    env = SubprocVecEnv([make_env("PickPlace", env_options, i, config["seed"]) for i in range(config["num_envs"])])

    if config["normalize"]:
        env = VecNormalize.load('vec_normalize_' + config["model_file_name"] +'.pkl', env)
        
    match config["algorithm"]:
        case "PPO":
            model = PPO.load(config["model_file_name"] + '.zip', env=env)
        case "DDPG":
            model = DDPG.load(config["model_file_name"] + '.zip', env=env)

    env.training = False
    env.norm_reward = False
    
    obs = env.reset()
    for i in range(10000):
        action, _states = model.predict(obs)
        obs, reward, done, info = env.step(action)
        env.render()
env.close

Qt: Session management error: Could not open network socket
Qt: Session management error: Could not open network socket
Qt: Session management error: Could not open network socket
Qt: Session management error: Could not open network socket
Qt: Session management error: Could not open network socket
Qt: Session management error: Could not open network socket
Qt: Session management error: Could not open network socket
Qt: Session management error: Could not open network socket
Qt: Session management error: Could not open network socket
Qt: Session management error: Could not open network socket


KeyboardInterrupt: 