In [1]:
#from https://towardsdatascience.com/introduction-to-augmented-random-search-d8d7b55309bd

import os
import numpy as np
import gym
from gym import wrappers
import safety_gym

import time

In [33]:
class HyperParameters():
    """Container for the settings used by the random-search training loop.

    All values default to the ones originally hard-coded in this notebook, so
    ``HyperParameters()`` behaves as before; individual settings can now be
    overridden per experiment via keyword arguments.

    Attributes:
        n_iter: number of training iterations (policy updates).
        episode_length: maximum number of environment steps per episode.
        step_size: scale applied to the policy update.
        n_directions: number of random perturbations sampled per iteration.
        n_best_directions: number of perturbations kept for the update;
            must satisfy ``1 <= n_best_directions <= n_directions``.
        noise: scale of the perturbation added to / subtracted from the weights.
        seed: seed passed to ``np.random.seed`` by the run cell.
        env_name: gym environment id passed to ``gym.make``.

    Raises:
        ValueError: if ``n_best_directions`` is not in ``[1, n_directions]``.
    """

    def __init__(self, n_iter=1000, episode_length=1000, step_size=0.02,
                 n_directions=16, n_best_directions=16, noise=0.03, seed=1,
                 env_name='HalfCheetah-v2'):
        # Explicit check instead of `assert`, which is stripped under `python -O`.
        if not 1 <= n_best_directions <= n_directions:
            raise ValueError(
                "n_best_directions must be between 1 and n_directions "
                "(got n_best_directions=%r, n_directions=%r)"
                % (n_best_directions, n_directions)
            )

        self.n_iter = n_iter
        self.episode_length = episode_length
        self.step_size = step_size
        self.n_directions = n_directions
        self.n_best_directions = n_best_directions
        self.noise = noise
        self.seed = seed
        # Other environment ids previously tried in this notebook:
        # 'Safexp-PointGoal0-v0', 'Humanoid-v2'
        self.env_name = env_name

In [3]:
#Normalize states
class Normalizer():
    """Running per-feature mean/variance tracker used to standardise states.

    Statistics are updated incrementally, one observation at a time, so no
    history of past states has to be stored.
    """

    def __init__(self, n_inputs):
        """Allocate zeroed statistics for ``n_inputs`` features."""
        self.n, self.mean, self.mean_diff, self.var = (
            np.zeros(n_inputs) for _ in range(4)
        )

    def observe(self, x):
        """Fold the observation ``x`` into the running statistics."""
        self.n += 1
        # Deviation from the mean *before* this observation is absorbed.
        delta_before = x - self.mean
        self.mean += delta_before / self.n
        # Accumulate the sum of squared deviations using the old and new means.
        self.mean_diff += delta_before * (x - self.mean)
        # Floor the variance at 1e-2 so normalise() never divides by ~0.
        self.var = np.clip(self.mean_diff / self.n, 1e-2, None)

    def normalise(self, inputs):
        """Return ``inputs`` centred by the running mean and scaled by the running std."""
        return (inputs - self.mean) / np.sqrt(self.var)

In [4]:
class Policy():
    """Linear policy ``action = theta . state`` trained by random search.

    Args:
        input_size: dimensionality of the (normalised) state vector.
        output_size: dimensionality of the action vector.
        hyper_params: optional object exposing ``noise``, ``n_directions``,
            ``step_size`` and ``n_best_directions``. When omitted, the
            module-level ``hp`` object is looked up at call time, which is
            how this notebook originally worked.
    """

    def __init__(self, input_size, output_size, hyper_params=None):
        # Weight matrix, initialised to zero.
        self.theta = np.zeros((output_size, input_size))
        self._hyper_params_override = hyper_params

    def _hyper_params(self):
        """Return the explicit hyper-parameters, else the notebook-global ``hp``."""
        if self._hyper_params_override is not None:
            return self._hyper_params_override
        return hp  # resolved lazily so the global may be defined after this class

    def evaluate(self, inputs, delta=None, direction=None):
        """Compute the action for ``inputs``.

        Args:
            inputs: state vector.
            delta: perturbation with the same shape as ``theta``; only used
                when ``direction`` is given.
            direction: ``None`` for the unperturbed policy, ``'positive'`` for
                ``theta + noise * delta``; any other value selects
                ``theta - noise * delta``.

        Returns:
            The action vector produced by the (possibly perturbed) weights.
        """
        if direction is None:
            return self.theta.dot(inputs)
        noise = self._hyper_params().noise
        if direction == 'positive':
            return (self.theta + noise * delta).dot(inputs)
        return (self.theta - noise * delta).dot(inputs)

    def sample_deltas(self):
        """Draw ``n_directions`` standard-normal perturbations shaped like ``theta``."""
        n_directions = self._hyper_params().n_directions
        return [np.random.randn(*self.theta.shape) for _ in range(n_directions)]

    def update(self, rollouts, sigma_r):
        """Move ``theta`` along the reward-weighted sum of perturbations.

        Args:
            rollouts: iterable of ``(reward_positive, reward_negative, delta)``.
            sigma_r: standard deviation of the collected rewards, used to
                scale the step.

        If ``sigma_r`` is zero or not finite the update is skipped: dividing
        by it would fill ``theta`` with NaN/inf. A zero deviation over the
        rewards means they are all equal, so there is nothing to learn from.
        """
        if sigma_r == 0 or not np.isfinite(sigma_r):
            return

        params = self._hyper_params()
        step = np.zeros(self.theta.shape)
        for rpos, rneg, d in rollouts:
            step += (rpos - rneg) * d

        self.theta += params.step_size / (params.n_best_directions * sigma_r) * step

In [5]:
#Explore policy on one specific direction over one episode

def explore(env, normalizer, policy, direction=None, delta=None, episode_length=None):
    """Run a single episode and return its total clipped reward.

    Args:
        env: environment exposing ``reset()`` and ``step(action)`` returning
            ``(state, reward, done, info)``.
        normalizer: object with ``observe(state)`` and ``normalise(state)``;
            its running statistics are updated with every visited state.
        policy: object with ``evaluate(state, delta, direction)``.
        direction: forwarded to ``policy.evaluate`` (``None`` evaluates the
            unperturbed policy).
        delta: perturbation forwarded to ``policy.evaluate``.
        episode_length: maximum number of steps to play. Defaults to
            ``hp.episode_length`` from the notebook-level ``hp`` object.

    Returns:
        Sum over the episode of the per-step rewards, each clipped to [-1, 1].
    """
    if episode_length is None:
        # Fall back to the notebook-global hyper-parameters.
        episode_length = hp.episode_length

    state = env.reset()
    done = False
    num_plays = 0
    sum_rewards = 0

    while not done and num_plays < episode_length:
        # Update the running statistics first, then standardise with them.
        normalizer.observe(state)
        state = normalizer.normalise(state)

        action = policy.evaluate(state, delta, direction)
        state, reward, done, _ = env.step(action)

        # Clip each step's reward to [-1, 1] before accumulating.
        sum_rewards += max(min(reward, 1), -1)
        num_plays += 1

    return sum_rewards

In [6]:
#Training

def train(env, policy, normalizer, hp):
    """Train ``policy`` with random search for ``hp.n_iter`` iterations.

    Each iteration samples perturbations, runs one episode per perturbation
    in the positive and in the negative direction, keeps the
    ``hp.n_best_directions`` perturbations with the highest
    ``max(reward_positive, reward_negative)``, and updates the policy from
    them. After the update, one unperturbed episode is played and its reward
    is printed.

    Args:
        env: environment passed through to ``explore``.
        policy: object with ``sample_deltas()`` and ``update(rollouts, sigma_r)``.
        normalizer: state normalizer passed through to ``explore``.
        hp: hyper-parameters exposing ``n_iter``, ``n_directions`` and
            ``n_best_directions``.
    """
    for i in range(hp.n_iter):
        # Sample the perturbations for this iteration.
        deltas = policy.sample_deltas()

        # All positive rollouts first, then all negative ones; the order
        # matters because every rollout also updates the normalizer.
        positive_rewards = [
            explore(env, normalizer, policy, direction="positive", delta=deltas[k])
            for k in range(hp.n_directions)
        ]
        negative_rewards = [
            explore(env, normalizer, policy, direction="negative", delta=deltas[k])
            for k in range(hp.n_directions)
        ]

        # Standard deviation over every collected reward, used to scale the step.
        all_rewards = np.array(positive_rewards + negative_rewards)
        sigma_r = all_rewards.std()

        # Rank each direction by the better of its two rollouts and keep the
        # top ones (highest score first).
        scores = [
            max(r_pos, r_neg)
            for r_pos, r_neg in zip(positive_rewards, negative_rewards)
        ]
        order = sorted(
            range(len(scores)), key=scores.__getitem__, reverse=True
        )[:hp.n_best_directions]
        rollouts = [[positive_rewards[k], negative_rewards[k], deltas[k]] for k in order]

        policy.update(rollouts, sigma_r)

        # Report the reward of the updated, unperturbed policy.
        reward_evaluation = explore(env, normalizer, policy)
        print('Step: ',i,' Reward :', reward_evaluation)


In [7]:
# Make directory for video

def mkdir(base, name):
    """Ensure the directory ``base/name`` exists and return its path.

    Args:
        base: parent directory.
        name: directory name (may itself contain sub-directories).

    Returns:
        The joined path ``os.path.join(base, name)``.

    Raises:
        FileExistsError: if the path already exists but is not a directory.
    """
    path = os.path.join(base, name)
    # exist_ok avoids the check-then-create race of `if not exists: makedirs`.
    os.makedirs(path, exist_ok=True)
    return path

In [34]:
#Running

# Output directories for this experiment and for the Monitor recordings.
work_dir = mkdir('exp', 'blah')
monitor_dir = mkdir(work_dir, 'monitor')

hp = HyperParameters()
np.random.seed(hp.seed)

env = gym.make(hp.env_name)
# Monitor writes its recordings into monitor_dir; force=True lets it replace
# files left there by an earlier run.
env = wrappers.Monitor(env, monitor_dir, force=True)
try:
    # Seed the environment too, so runs are reproducible and not only the
    # numpy-sampled perturbations.
    env.seed(hp.seed)

    nb_inputs = env.observation_space.shape[0]
    nb_outputs = env.action_space.shape[0]
    policy = Policy(nb_inputs, nb_outputs)
    normalizer = Normalizer(nb_inputs)
    train(env, policy, normalizer, hp)
finally:
    # Always release the environment (and let the Monitor flush its files),
    # including when training is stopped with a KeyboardInterrupt.
    env.close()



Step:  0  Reward : 950.0444984335867
Step:  1  Reward : 972.8008196189537
Step:  2  Reward : 983.6285977892312


KeyboardInterrupt: 