## Self Balancing Robot in PyBullet
**Balance and control of a 2-wheeled robot simulated with PyBullet Physics library**
<br>V2: Everything Implemented in GYM 

Importing Required Libraries

In [1]:
import pybullet
import time
import pybullet_data
import numpy as np
import matplotlib.pyplot as plt
import gym
from gym import spaces

GYM Environment For Robot

In [8]:
class SelfBalancing(gym.Env):
    metadata = {'render.modes': ['human']}

    def __init__(self):
        super(SelfBalancing, self).__init__()
        # Define action and observation space
        self.action_space = spaces.Box(low=np.array([-1000,-0.01,-20]), high=np.array([+1000,+0.01,20]), dtype=np.float32)
        self.observation_space = spaces.Box(low=np.array([-np.pi/2,-1000]), high=np.array([+np.pi/2,+1000]), dtype=np.float32)
        """
            Action Space: action[0] -> kp, action[1] -> ki, action[2] -> kd
            Observation Space: torso_pitch orientation, torso linear speed
        """
        self.state = np.array([0.0,0.0])
        self.steps = 0
        self.max_episode_steps = 5000
        # Instantiate PyBullet
        phisycsClient = pybullet.connect(pybullet.GUI)
        pybullet.setAdditionalSearchPath(pybullet_data.getDataPath())
        # Spawn Robot
        self.robotID = None
        self.reset()
        # Initialize Controller Parameters
        self.integral = 0
        self.derivative = 0
        self.prev_error = 0
        
    def step(self, action):
        motion = self.controller(action)
        self.take_action(motion)
        ## Calculating reward
        reward = self.calculate_reward()
        obs = self.observe
        done = self.terminated()
        return obs, reward, done, {}
    
    def take_action(self,motion):
        # Takes a tuple as input
        # motion --> (left wheel speed, right wheel speed)
        pybullet.setJointMotorControl2(bodyUniqueId=self.robotID, 
                        jointIndex=0, 
                        controlMode=pybullet.VELOCITY_CONTROL,
                        targetVelocity = motion[0])
        pybullet.setJointMotorControl2(bodyUniqueId=self.robotID, 
                        jointIndex=1, 
                        controlMode=pybullet.VELOCITY_CONTROL,
                        targetVelocity = motion[1])
        pybullet.stepSimulation()
        time.sleep(1.0/240.0)
        self.steps += 1
    
    def observe(self):
        position, orientation = pybullet.getBasePositionAndOrientation(self.robotID)
        self.state[0] = np.array([pybullet.getEulerFromQuaternion(orientation)[0]])
        linear_vel, anagular_vel = pybullet.getBaseVelocity(self.robotID)
        self.state[1] = (linear_vel[0]**2 + linear_vel[1]**2 + linear_vel[2]**2) ** 0.5
        return self.state
    
    def calculate_reward(self):
        reward = - (self.observe()[0]**2 + self.observe()[1]**2)
        if self.terminated():
            reward += (self.steps - self.max_episode_steps) / 500
        return reward
    
    def controller(self,action):
        ## Simple PID
        error = self.observe()[0]
        self.integral += error
        self.derivative = error - self.prev_error
        self.prev_error = error
        
        motion = (action[0] * error + action[1] * self.integral + action[2] * self.derivative)
        return (motion,motion)
    
    def reset(self):
        pybullet.resetSimulation()
        planeID = pybullet.loadURDF("plane.urdf")
        pybullet.setGravity(0,0,-9.81)
        self.robotID = pybullet.loadURDF("robot.urdf",
                                 [0.0,0.0,0.0],pybullet.getQuaternionFromEuler([0.0,0.0,0.0]),useFixedBase = 0)
        pybullet.setRealTimeSimulation(0) # change to (1) for real time simulation
        self.state = self.observe()
    
    def terminated(self):
        ## If the robot tilt angle reaches 75 degrees or
        ## the simulation reaches its maximum time steps
        if self.steps > self.max_episode_steps or abs(self.state[0]) > (np.pi / (75/80)):
            return True
        else:
            return False

    def render(self, mode='human', close=False):
        pass
    
    def __del__(self):
        pybullet.disconnect()

Exception ignored in: <function SelfBalancing.__del__ at 0x000002247C4F7CA8>
Traceback (most recent call last):
  File "<ipython-input-2-fcaadefef76d>", line 96, in __del__
pybullet.error: Not connected to physics server.


Test Environment with (Kp = 650, Ki = 0.005, Kd = 10.0)

In [9]:
env = SelfBalancing()
while not env.terminated():
    env.step((650.0,0.005,10.0))
del env

Using COAX to Train an Agent

In [4]:
import gym
import coax
import optax
import haiku as hk
from jax.nn import relu

# pick environment
env = SelfBalancing()
env = coax.wrappers.TrainMonitor(env)

def func_v(S, is_training):
    # custom haiku function
    value = hk.Sequential([
                          hk.Linear(20), relu,
                          hk.Linear(20), relu,
                          hk.Linear(1),])
    return value(S)  # output shape: (batch_size,)

def func_pi(S, is_training):
    # custom haiku function (for discrete actions in this example)
    logits = hk.Sequential([
                          hk.Linear(20), relu,
                          hk.Linear(20), relu,
                          hk.Linear(3),])
    return {'logits': logits(S)}  # logits shape: (batch_size, num_actions)

# function approximators
v = coax.V(func_v, env)
pi = coax.Policy(func_pi, env)

# slow-moving avg of pi
pi_behavior = pi.copy()

# specify how to update policy and value function
ppo_clip = coax.policy_objectives.PPOClip(pi, optimizer=optax.adam(0.001))
simple_td = coax.td_learning.SimpleTD(v, optimizer=optax.adam(0.001))


# specify how to trace the transitions
tracer = coax.reward_tracing.NStep(n=5, gamma=0.9)
buffer = coax.experience_replay.SimpleReplayBuffer(capacity=256)

for ep in range(100):
    s = env.reset()

    for t in range(env.spec.max_episode_steps):
        a, logp = pi_behavior(s, return_logp=True)
        s_next, r, done, info = env.step(a)

        # add transition to buffer
        tracer.add(s, a, r, done, logp)
        while tracer:
            buffer.add(tracer.pop())

        # update
        if len(buffer) == buffer.capacity:
            for _ in range(4 * buffer.capacity // 32):  # ~4 passes
                transition_batch = buffer.sample(batch_size=32)
                metrics_v, td_error = simple_td.update(transition_batch, return_td_error=True)
                metrics_pi = ppo_clip.update(transition_batch, td_error)
                env.record_metrics(metrics_v)
                env.record_metrics(metrics_pi)

            buffer.clear()
            pi_behavior.soft_update(pi, tau=0.1)

        if done:
            break

        s = s_next

Checking Learned Policy

In [9]:
env = SelfBalancing()
while not env.terminated():
    s = env.observe()
    action = pi(s, return_logp=False)
    env.step(action)
del env