## Full Body Trajectory for Walking
**Implmentation of an A2C Agent for learning Biped Gait** 

Importing Required Libraries

In [None]:
import pybullet
import time
import pybullet_data
import numpy as np
import matplotlib.pyplot as plt

import gym
from gym import spaces

from qibullet import SimulationManager
from qibullet import NaoVirtual

GYM Based Environment

In [None]:

"""
    TODO:
        0-Check Step Time and IMU Frequency
        1-Calculate Linear velocity of pelvis from IMU Data
        2-Complete observe()
        3-Form Reward Function 
"""

class NAO(gym.Env):
    #metadata = {'render.modes': ['human']}

    def __init__(self,imu_freq = 240,max_episode_legth = 10000):
        super(SelfBalancing, self).__init__()
        # Define action and observation space
        self.action_space = spaces.Box(low=np.array([-1.53589,-1.14529,-0.379435,-0.0923279,-1.18944,-0.397761, 
                                                    -1.53589,-1.14529,-0.79046,-0.0923279,-1.1863,-0.768992]),
                                       high=np.array([0.48398,0.740718,0.79046,2.11255,0.922581,0.768992,
                                                     0.48398,0.740718,0.379435,2.11255,0.932006,0.397761]),
                                       dtype=np.float64)
        self.observation_space = spaces.Box(low=np.array([-10,-10,-10,-10]), 
                                            high=np.array([10,10,10,10]),
                                           dtype=np.float64) 
        """""""""""""""""""""
            Action Space: 12 component for lower limb joint positions
                          LHipPitch,LHipPitchYaw,LHipRoll,LkneePitch,LAnklePitch,LAnkleRoll
                          RHipPitch,RHipPitchYaw,RHipRoll,RkneePitch,RAnklePitch,RAnkleRoll
        """"""""""""""""""
            Observation Space: LPelvis,RPelvis Poition (z component), 
                               Torso Orientation (y component),
                               Toros's linear velocity (normal x & y component)
        """""""""""""""""""""
        self.max_steps = max_episode_length
        self.steps = 0
        self.imu_freq = imu_freq
        
        self.velocity_x = 0
        self.simulation_manager,self.phisycsClient,self.nao, self.imu = self.reset()
        self.effort = 1.0
        
    def step(self,action):
        self.take_action(action)
        obs = self.observe()
        reward = self.calculate_reward(obs)
        done = self.is_terminated()
        return obs, reward, done, {}
    
    def take_action(self,action):
        self.nao.setAngles(["LHipPitch","LHipPitchYaw","LHipRoll","LkneePitch","LAnklePitch","LAnkleRoll", ,
                            "RHipPitch","RHipPitchYaw","RHipRoll","RkneePitch","RAnklePitch","RAnkleRoll"],
                          action, [self.effort]*12)
        pybullet.stepSimulation()
        pass
    
    def calculate_reward(self,obs):
        
        pass
    
    def observe(self):
        _, linear_acceleration = self.nao.getImuValues()
        a = linear_aceleration[0]
        self.velocity_x += (1/self.imu_freq) * a 
        
        torsoYaw = pybullet.getEulerFromQuaternion(nao.getLinkPosition("torso")[1])[2]
        torsoPitch = pybullet.getEulerFromQuaternion(nao.getLinkPosition("torso")[1])[1]
        torsoRoll = pybullet.getEulerFromQuaternion(nao.getLinkPosition("torso")[1])[0]
        
        pelvisHeight = 0.5*(self.nao.getLinkPosition(["LPelvis"])[0][2] + self.nao.getLinkPosition(["RPelvis"])[0][2])  
        return np.array([self.velocity_x,torso_Yaw , torsoPitch, torsoRoll, PelvisHeight])
    
    def is_terminated():
        if self.steps == self.max_steps:
            self.steps = 0
             self.simulation_manager,self.phisycsClient,self.nao, self.imu = self.reset()
            return True
        elif #robot colapses:
            self.simulation_manager,self.phisycsClient,self.nao, self.imu = self.reset()
            return True
        else:
            return False
    
    def reset(self):
        # Close Previous Environment
        simulation_manager.stopSimulation(phisycsClient)
        
        #Instantiate Phisycs Client
        phisycsClient = pybullet.connect(pybullet.GUI)
        pybullet.setAdditionalSearchPath(pybullet_data.getDataPath())
        
        #Spawn Robot
        simulation_manager = SimulationManager()
        simulation_manager.setGravity(phisycsClient, [0.0, 0.0, -9.81])
        nao = simulation_manager.spawnNao(phisycsClient, spawn_ground_plane=True) 
        pybullet.setRealTimeSimulation(0)
        imu = nao.getImu()
        nao.subscribeImu(frequency=self.imu_freq)
        self.velocity_x = 0
        return simulation_manager,phisycsClient,nao, imu
    
    def __close__(self):
        self.simulation_manager.stopSimulation(self.phisycsClient)

A2C Agent

In [None]:
import gym
import coax
import optax
import haiku as hk
import jax.numpy as jnp
import jax
from numpy import prod

# pick environment
name = 'A2C'
env = NAO()
env = coax.wrappers.TrainMonitor(env, name=name, tensorboard_dir=f"./data/tensorboard/{name}") 

def func_v(S, is_training):
    # custom haiku function
    value = hk.Sequential([
                          hk.Linear(20),
                          hk.Linear(20),
                          hk.Linear(1,w_init=jnp.zeros),jnp.ravel])
    return value(S)  # output shape: (batch_size,)

def func_pi(S, is_training):
    shared = hk.Sequential((
        hk.Linear(20), jax.nn.relu,
        hk.Linear(20), jax.nn.relu,
    ))
    mu = hk.Sequential((
        shared,
        hk.Linear(10), jax.nn.relu,
        hk.Linear(3, w_init=jnp.zeros),
        hk.Reshape(env.action_space.shape),
    ))
    logvar = hk.Sequential((
        shared,
        hk.Linear(8), jax.nn.relu,
        hk.Linear(3, w_init=jnp.zeros),
        hk.Reshape(env.action_space.shape),
    ))
    return {'mu': mu(S), 'logvar': logvar(S)}

# function approximators
v = coax.V(func_v, env)
pi = coax.Policy(func_pi, env)


# specify how to update policy and value function
vanilla_pg = coax.policy_objectives.VanillaPG(pi, optimizer=optax.adam(0.001))
simple_td = coax.td_learning.SimpleTD(v, optimizer=optax.adam(0.002))


# specify how to trace the transitions
tracer = coax.reward_tracing.NStep(n=5, gamma=0.9)
buffer = coax.experience_replay.SimpleReplayBuffer(capacity=256)


for ep in range(10):
    s = env.reset()

    for t in range(env.max_episode_steps):
        a, logp = pi(s, return_logp=True)
        s_next, r, done, info = env.step(a)

        # add transition to buffer
        # N.B. vanilla-pg doesn't use logp but we include it to make it easy to
        # swap in another policy updater that does require it, e.g. ppo-clip
        tracer.add(s, a, r, done, logp)
        while tracer:
            buffer.add(tracer.pop())

        # update
        if len(buffer) == buffer.capacity:
            for _ in range(4 * buffer.capacity // 32):  # ~4 passes
                transition_batch = buffer.sample(batch_size=32)
                metrics_v, td_error = simple_td.update(transition_batch, return_td_error=True)
                metrics_pi = vanilla_pg.update(transition_batch, td_error)
                env.record_metrics(metrics_v)
                env.record_metrics(metrics_pi)

            buffer.clear()

        if done:
            break

        s = s_next
del env