In [1]:
!pip install gym==0.10.5
# !pip install pybullet # 3.0.8
!pip install pybullet==2.0.8
!pip install ffmpeg-python # 0.2.0

In [2]:
import numpy as np # 1.19.4
import operator as op
from itertools import chain
from collections import defaultdict

In [3]:
# import gym # 0.18.0
import gym
import pybullet_envs

In [4]:
import warnings

# Silence every warning category for the rest of the session.
# NOTE: the original also called warnings.filterwarnings('ignore'); with no
# message/module pattern that call installs the same catch-all filter, so a
# single simplefilter call is sufficient.
warnings.simplefilter('ignore')

In [5]:
class AugmentedRS(object):
    """Augmented Random Search (ARS) with a linear policy.

    The policy is a matrix ``theta`` of shape ``(no_outputs, no_inputs)``;
    an action is ``theta.dot(normalised_state)``. Each training step samples
    random perturbations of ``theta``, runs one episode with ``theta + noise``
    and one with ``theta - noise`` per perturbation, keeps the perturbations
    whose better rollout scored highest, and moves ``theta`` along the
    reward-weighted sum of those perturbations.

    Args:
        env: Environment object exposing ``reset()`` (returns a state) and
            ``step(action)`` (returns ``state, reward, done, info``).
        no_inputs: Length of the state vector.
        no_outputs: Length of the action vector.
    """

    def __init__(self, env, no_inputs, no_outputs):
        super(AugmentedRS, self).__init__()
        # Seeds NumPy's *global* RNG so the sampled perturbations repeat
        # from run to run. Kept global for backward compatibility.
        np.random.seed(1)
        self.episode_length = 1000          # max env steps per rollout
        self.env = env
        self.no_inputs = no_inputs
        self.no_outputs = no_outputs

        self.step_size = 50                 # number of training iterations
        self.no_directions = 18             # perturbations sampled per iteration
        self.sigma_exploration_noise = 0.02  # scale applied to each perturbation
        self.no_best_directions = 16        # perturbations kept for the update
        self.learning_rate = 0.01

        # Running statistics used by norm_state (one entry per state component).
        self.n = np.zeros(self.no_inputs)
        self.mean = np.zeros(self.no_inputs)
        self.var_num = np.zeros(self.no_inputs)
        self.variance = np.zeros(self.no_inputs)

        # Linear policy weights, initialised to zero.
        self.theta = np.zeros((self.no_outputs, self.no_inputs))

    def norm_state(self, state):
        """Fold ``state`` into the running mean/variance and return it normalised.

        The running statistics are updated in place on every call, so the
        result depends on all states seen so far.

        Args:
            state: Observation vector of length ``no_inputs``.

        Returns:
            ``(state - running_mean) / sqrt(running_variance)``.
        """
        prev_mean = self.mean.copy()
        self.mean = (state + (self.n * prev_mean)) / (self.n + 1)
        # Incremental sum of squared deviations (uses old and new mean).
        self.var_num += (state - prev_mean) * (state - self.mean)
        # Floor the variance at 1e-2 so the division below stays bounded.
        self.variance = (self.var_num / (self.n + 1)).clip(min=1e-2)
        self.n += 1
        return (state - self.mean) / np.sqrt(self.variance)

    def evaluate(self, inputs, direction=None, delta=None):
        """Compute the action for ``inputs`` under the (optionally perturbed) policy.

        Args:
            inputs: Normalised state vector.
            direction: ``None`` for the unperturbed policy, otherwise a binary
                callable (``op.add`` / ``op.sub`` in ``train``) combining
                ``theta`` with the scaled perturbation.
            delta: Perturbation with the same shape as ``theta``; only read
                when ``direction`` is given.

        Returns:
            The action vector ``policy.dot(inputs)``.
        """
        if direction is None:
            return self.theta.dot(inputs)
        perturbed = direction(self.theta, self.sigma_exploration_noise * delta)
        return perturbed.dot(inputs)

    def update(self, rollouts, stddev_reward):
        """Apply one ARS step to ``theta``.

        Args:
            rollouts: Iterable of ``(positive_reward, negative_reward, delta)``.
            stddev_reward: Standard deviation of the collected rewards, used
                to scale the step.

        If ``stddev_reward`` is zero, negative or not finite (e.g. every
        rollout returned the same reward) the update is skipped; dividing by
        it would otherwise turn ``theta`` into NaN/inf.
        """
        if not np.isfinite(stddev_reward) or stddev_reward <= 0:
            return
        step = np.zeros(self.theta.shape)
        for preward, nreward, delta in rollouts:
            step += (preward - nreward) * delta
        self.theta += (self.learning_rate / (self.no_best_directions * stddev_reward)) * step

    def explore(self, direction=None, delta=None):
        """Run one episode and return its total clipped reward.

        Args:
            direction: Forwarded to ``evaluate``.
            delta: Forwarded to ``evaluate``.

        Returns:
            Sum of per-step rewards, each clipped to ``[-1, 1]``, over at most
            ``episode_length`` steps or until the environment reports ``done``.
        """
        state = self.env.reset()
        frames, done, total_reward = 0, False, 0
        while not done and frames < self.episode_length:
            state = self.norm_state(state)
            action = self.evaluate(state, direction, delta)
            state, reward, done, info = self.env.step(action)
            # Clip so a single extreme step reward cannot dominate the episode.
            total_reward += max(min(reward, 1), -1)
            frames += 1
        return total_reward

    def train(self, verbose=True):
        """Run ``step_size`` ARS iterations, updating ``theta`` in place.

        Args:
            verbose: When true, run one unperturbed episode after every
                iteration and print its total reward.
        """
        for step in range(self.step_size):
            # Zero-mean Gaussian perturbations: theta + d and theta - d then
            # probe symmetric points around the current policy.
            deltas = [np.random.randn(*self.theta.shape) for _ in range(self.no_directions)]

            # One (positive, negative) reward pair per perturbation.
            scores = []
            for delta in deltas:
                positive_reward = self.explore(op.add, delta)
                negative_reward = self.explore(op.sub, delta)
                scores.append((positive_reward, negative_reward))

            # Deviation over all 2 * no_directions rollout rewards.
            stddev_reward = np.array(scores).std()

            # Keep the directions whose better rollout scored highest.
            ranked = sorted(range(len(scores)), key=lambda k: max(scores[k]), reverse=True)
            best = ranked[:self.no_best_directions]
            rollouts = [[scores[k][0], scores[k][1], deltas[k]] for k in best]

            self.update(rollouts, stddev_reward)
            if verbose:
                print('step:', step, 'total_reward:', self.explore())

In [6]:
# Directory handed to the Monitor wrapper for its output.
video_path = '/content/ARS_Videos'

# Build the HalfCheetah task (the id is presumably registered by the
# `pybullet_envs` import - confirm) and wrap it in gym's Monitor.
# NOTE(review): force=True presumably lets Monitor reuse/overwrite an
# existing output directory - confirm against the gym 0.10.5 docs.
env = gym.wrappers.Monitor(
    gym.make('HalfCheetahBulletEnv-v0'),
    video_path,
    force=True,
)

# Policy dimensions: first axis of the observation and action spaces.
no_inputs = env.observation_space.shape[0]
no_outputs = env.action_space.shape[0]

In [7]:
# Create the ARS agent for the wrapped environment, then train it.
# train() prints one evaluation reward per iteration (verbose defaults to True).
ars = AugmentedRS(env=env, no_inputs=no_inputs, no_outputs=no_outputs)
ars.train(verbose=True)

In [8]:
# Show the learned linear policy weights, shape (no_outputs, no_inputs).
print(ars.theta)

References

1. https://towardsdatascience.com/introduction-to-augmented-random-search-d8d7b55309bd