In [1]:

import matplotlib.pyplot as plt
import numpy as np
from dm_control import viewer
from tqdm import tqdm
from simulation.dm_control.simulation_api import SimulationAPI
import simulation.dm_control.simulation_control.environments as environments
from simulation.dm_control.ddpg.ddpg import DDPGagent, OUNoise

ModuleNotFoundError: No module named 'simulation.dm_control.ddpg.ddpg'

In [2]:
# Seeded NumPy RNG. It is not referenced again in the visible cells.
random_state = np.random.RandomState(42)

# Run switches and hyper-parameters for the whole notebook.
# LOAD_MODEL: when true, agent.load(PATH_MODEL) is called right after the agent is built.
LOAD_MODEL = True
# RESUME_TRAINING: gates the training cell; when false only the viewer cell uses the agent.
RESUME_TRAINING = False
# PATH_MODEL: argument handed to agent.load() and agent.save().
PATH_MODEL = 'passive_hand'
# NUM_EPISODES: number of iterations of the outer training loop.
NUM_EPISODES = 1000
# BATCH_SIZE: passed to agent.update(); updates start once the memory holds more entries than this.
BATCH_SIZE = 128
# DURATION: maximum number of environment steps per training episode.
DURATION = 100
# The four values below are forwarded to DDPGagent as keyword arguments
# (actor_learning_rate, critic_learning_rate, gamma, tau).
# NOTE(review): gamma/tau are presumably the discount factor and the soft
# target-update rate - confirm against the DDPGagent implementation.
ACTOR_LEARNING_RATE = 1e-4
CRITIC_LEARNING_RATE = 1e-3
GAMMA = 0.99
TAU = 1e-2

# Simulation setup: rebuild the model XML through SimulationAPI, then load the
# 'passive_hand' / 'lift_sparse' environment. The rebuild runs before the load.
# NOTE(review): the "updated ..." lines in this cell's output presumably come
# from rebuild_XML() - confirm.
sapi = SimulationAPI()
sapi.rebuild_XML()
env = environments.load(domain_name='passive_hand', task_name='lift_sparse')
action_spec = env.action_spec()
# Action dimensionality is taken from the first axis of the action spec.
dim_action = action_spec.shape[0]
# Hard-coded observation size fed to the agent; must match the length of the
# vector returned by parse_obs().
# NOTE(review): presumably 3 (grip_pos) + 3 (object_pos) - confirm.
dim_obs = 6

updated object_translate
updated object_change_slope
updated robot_change_finger_length
updated robot_change_joint_stiffness
updated robot_change_finger_spring_default
updated robot_change_thumb_spring_default
updated robot_change_friction


In [3]:
def parse_obs(obs):
    """
    Build the agent's state vector from an observation mapping.

    Only the 'grip_pos' and 'object_pos' entries are used; each is
    flattened and the two are joined, in that order, into one 1-D array.
    """
    # The empty float array seeds the concatenation so the result dtype is
    # promoted exactly as with repeated np.append onto np.array([]).
    pieces = [np.array([])]
    for key in ('grip_pos', 'object_pos'):
        pieces.append(np.ravel(obs[key]))
    return np.concatenate(pieces)


# Construct the DDPG agent from the observation/action sizes and the
# hyper-parameters defined in the configuration cell.
agent = DDPGagent(
    dim_obs, dim_action,
    actor_learning_rate=ACTOR_LEARNING_RATE,
    critic_learning_rate=CRITIC_LEARNING_RATE,
    gamma=GAMMA, tau=TAU,
)

# Optionally call agent.load() with the configured model path
# (presumably restores a previously saved agent - confirm in DDPGagent).
if LOAD_MODEL:
    agent.load(PATH_MODEL)

# Noise helper built from the action dimension and the action bounds; its
# get_action() is applied to the agent's actions in the cells below.
noise = OUNoise(dim_action, action_spec.minimum, action_spec.maximum)


def denorm(a):  #  use on model output before passing to env
    """
    Rescale an action with the environment's action bounds.

    Applies the affine map that sends -1 to ``action_spec.minimum`` and
    +1 to ``action_spec.maximum`` (element-wise on array bounds).
    """
    lower, upper = action_spec.minimum, action_spec.maximum
    half_range = (upper - lower) / 2.
    midpoint = (upper + lower) / 2.
    return a * half_range + midpoint

In [4]:
if RESUME_TRAINING:
    # Train the agent for NUM_EPISODES episodes of at most DURATION steps,
    # save it to PATH_MODEL, then plot the per-episode reward together with
    # the running average over the last 10 episodes.
    rewards = []
    avg_rewards = []

    for episode in tqdm(range(NUM_EPISODES)):
        time_step = env.reset()
        state = parse_obs(time_step.observation)
        noise.reset()
        episode_reward = 0
        episode_reward_history = []
        for step in range(DURATION):
            action = agent.get_action(state)
            action = noise.get_action(action, step)
            try:
                time_step_2 = env.step(denorm(action))
            except Exception as exc:
                # Catch Exception rather than everything so that Ctrl-C
                # (KeyboardInterrupt) still stops a long training run, and
                # report what actually went wrong before ending the episode.
                print(f'Physics Error: {action} ({exc!r})')
                break
            state_2 = parse_obs(time_step_2.observation)
            reward = time_step_2.reward
            # NOTE(review): the last argument is a constant -1; presumably the
            # replay buffer's "done" slot - confirm against the memory class.
            agent.memory.push(state, action, reward, state_2, -1)
            state = state_2
            if len(agent.memory) > BATCH_SIZE:
                agent.update(BATCH_SIZE)
            episode_reward += reward
            episode_reward_history.append(reward)
            if time_step_2.last():
                # Stepping a dm_control environment past its final step
                # triggers an automatic reset whose reward is None, which
                # would break the reward accumulation above.
                break

        # Record the episode before computing the average so that the printed
        # value includes it; averaging the still-empty list on the first
        # episode used to print nan and emit a RuntimeWarning.
        rewards.append(episode_reward)
        avg_reward = np.mean(rewards[-10:])
        avg_rewards.append(avg_reward)
        print(f"episode: {episode}, "
              f"reward: {np.round(episode_reward, decimals=2)}, "
              f"average_reward: {avg_reward}")

    agent.save(PATH_MODEL)

    plt.plot(rewards, label='Episode reward')
    plt.plot(avg_rewards, label='Average reward (last 10 episodes)')
    plt.xlabel('Episode')
    plt.ylabel('Reward')
    plt.legend()
    plt.show()

In [5]:
# Step counter shared with policy(); it is passed to the noise helper and
# incremented once per policy call.
t = 0


def policy(time_step):
    """
    Viewer callback: turn a time step into an environment action.

    The observation is reduced with parse_obs(), the agent proposes an
    action, noise.get_action() is applied using the global counter ``t``,
    and the result is rescaled with denorm(). ``t`` is incremented on
    every call.
    """
    global t
    observation = parse_obs(time_step.observation)
    proposed = agent.get_action(observation)
    perturbed = noise.get_action(proposed, t)
    env_action = denorm(perturbed)
    t += 1
    return env_action


# Open the interactive viewer, driving the environment with the policy above.
viewer.launch(env, policy=policy)