# Install the required packages (needed on Google Colab; also works in a local kernel)

In [1]:
# Install required packages (cross-platform)
# %pip installs into the environment of the running kernel; -q keeps the output short.
# gym, Box2D and pyglet are pinned to exact versions; pyopengl, imageio and
# imageio-ffmpeg are left unpinned.
%pip install -q "gym==0.26.2" "Box2D==2.3.10" "pyglet==1.5.27" "pyopengl" "imageio" "imageio-ffmpeg"

Note: you may need to restart the kernel to use updated packages.




In [None]:
pip install moviepy

Collecting moviepy
  Using cached moviepy-2.2.1-py3-none-any.whl.metadata (6.9 kB)
Collecting proglog<=1.0.0 (from moviepy)
  Using cached proglog-0.1.12-py3-none-any.whl.metadata (794 bytes)
Using cached moviepy-2.2.1-py3-none-any.whl (129 kB)
Using cached proglog-0.1.12-py3-none-any.whl (6.3 kB)
Installing collected packages: proglog, moviepy

   -------------------- ------------------- 1/2 [moviepy]
   -------------------- ------------------- 1/2 [moviepy]
   -------------------- ------------------- 1/2 [moviepy]
   -------------------- ------------------- 1/2 [moviepy]
   -------------------- ------------------- 1/2 [moviepy]
   -------------------- ------------------- 1/2 [moviepy]
   -------------------- ------------------- 1/2 [moviepy]
   -------------------- ------------------- 1/2 [moviepy]
   -------------------- ------------------- 1/2 [moviepy]
   -------------------- ------------------- 1/2 [moviepy]
   -------------------- ------------------- 1/2 [moviepy]
   -----------



In [0]:
# Headless rendering: a Linux session without a DISPLAY variable gets SDL's
# "dummy" video driver. Nothing is changed on Windows/macOS or when a display exists.
import os
import platform

if not os.environ.get("DISPLAY") and platform.system() == "Linux":
    os.environ["SDL_VIDEODRIVER"] = "dummy"

# Augmented Random Search

In [0]:
import os
import warnings

import numpy as np
import gym

In [0]:
class HP():
    """Hyperparameter bundle for an Augmented Random Search run.

    Each constructor argument is stored unchanged on the instance under the
    same name.

    Args:
        nb_steps: Number of training iterations.
        episode_length: Upper bound on the number of environment steps per episode.
        learning_rate: Step size of the policy update.
        num_deltas: Number of random perturbations sampled per iteration.
        num_best_deltas: Number of best-scoring perturbations kept for the
            update; must not exceed ``num_deltas``.
        noise: Scale applied to a perturbation when exploring.
        seed: Seed for NumPy's global random generator.
        env_name: Gym environment id.
        record_every: An evaluation video is requested every this many iterations.
        use_tanh_actions: Whether the policy output is squashed with ``tanh``.
        rff_dim: Number of random Fourier features.
        rff_scale: Scale of the random projection behind the features.
        learning_rate_decay: Multiplicative per-iteration decay of ``learning_rate``.
        noise_decay: Multiplicative per-iteration decay of ``noise``.
        min_learning_rate: Floor for the decayed learning rate.
        min_noise: Floor for the decayed noise.

    Raises:
        AssertionError: If ``num_best_deltas > num_deltas``.
    """

    def __init__(
        self,
        nb_steps=1000,
        episode_length=2000,
        learning_rate=0.02,
        num_deltas=16,
        num_best_deltas=16,
        noise=0.03,
        seed=1,
        env_name='BipedalWalker-v3',
        record_every=50,
        use_tanh_actions=True,
        rff_dim=256,
        rff_scale=1.0,
        learning_rate_decay=0.995,
        noise_decay=0.995,
        min_learning_rate=1e-4,
        min_noise=0.005,
    ):
        # Cannot keep more directions than were sampled.
        assert num_best_deltas <= num_deltas

        # Run setup
        self.seed = seed
        self.env_name = env_name
        self.record_every = record_every

        # Loop sizes
        self.nb_steps = nb_steps
        self.episode_length = episode_length
        self.num_deltas = num_deltas
        self.num_best_deltas = num_best_deltas

        # Step size / exploration noise and their schedules
        self.learning_rate = learning_rate
        self.learning_rate_decay = learning_rate_decay
        self.min_learning_rate = min_learning_rate
        self.noise = noise
        self.noise_decay = noise_decay
        self.min_noise = min_noise

        # Policy shape
        self.use_tanh_actions = use_tanh_actions
        self.rff_dim = rff_dim
        self.rff_scale = rff_scale

In [0]:
class Normalizer():
    """Running per-dimension standardizer for observations.

    Keeps a running mean and a running sum of squared deviations, updated one
    observation at a time, and uses them to shift and scale inputs.

    Attributes are NumPy arrays of length ``nb_inputs``:
        n: Number of observations seen (same value in every entry).
        mean: Running mean.
        mean_diff: Running sum of squared deviations from the mean.
        var: Variance estimate, floored at ``MIN_VAR`` once ``observe`` has run.
    """

    # Lower bound on the variance so that near-constant dimensions are not
    # blown up by a division by a tiny standard deviation.
    MIN_VAR = 1e-2

    def __init__(self, nb_inputs):
        """Create a normalizer for observations with ``nb_inputs`` entries."""
        self.n = np.zeros(nb_inputs)
        self.mean = np.zeros(nb_inputs)
        self.mean_diff = np.zeros(nb_inputs)
        self.var = np.zeros(nb_inputs)

    def observe(self, x):
        """Fold one observation ``x`` into the running statistics (in place)."""
        self.n += 1.0
        last_mean = self.mean.copy()
        self.mean += (x - self.mean) / self.n
        # Incremental update: uses the mean before and after including x.
        self.mean_diff += (x - last_mean) * (x - self.mean)
        self.var = (self.mean_diff / self.n).clip(min=self.MIN_VAR)

    def normalize(self, inputs):
        """Return ``(inputs - mean) / std`` using the current statistics.

        The variance floor is re-applied here so that calling this before any
        ``observe`` (when ``var`` is still all zeros) yields finite values
        instead of dividing by zero. After ``observe`` the floor is already in
        place, so the result is unchanged.
        """
        obs_std = np.sqrt(self.var.clip(min=self.MIN_VAR))
        return (inputs - self.mean) / obs_std

In [0]:
class Policy():
    """Linear policy on top of a fixed random Fourier feature map.

    The feature map (``rff_W``, ``rff_b``) is drawn once from NumPy's global
    random generator and never trained; only ``theta`` (shape
    ``(output_size, hp.rff_dim)``, initialised to zeros) is updated.
    """

    def __init__(self, input_size, output_size, hp):
        """Draw the random feature map and zero-initialise the weights.

        Args:
            input_size: Length of the state vector fed to ``featurize``.
            output_size: Length of the action vector.
            hp: Hyperparameter object; ``rff_dim`` and ``rff_scale`` are read
                here, the remaining fields are read by the other methods.
        """
        self.hp = hp
        n_features = hp.rff_dim
        projection_scale = hp.rff_scale / np.sqrt(input_size)
        # Draw order (projection first, then phases) is kept so that a fixed
        # seed keeps producing the same feature map.
        self.rff_W = np.random.randn(n_features, input_size) * projection_scale
        self.rff_b = 2 * np.pi * np.random.rand(n_features)
        self.theta = np.zeros((output_size, n_features))

    def featurize(self, x):
        """Map state ``x`` to ``sqrt(2 / rff_dim) * cos(rff_W @ x + rff_b)``."""
        amplitude = np.sqrt(2.0 / self.hp.rff_dim)
        phase = np.dot(self.rff_W, x) + self.rff_b
        return amplitude * np.cos(phase)

    def evaluate(self, input, delta = None, direction = None):
        """Compute the action for a state, optionally with perturbed weights.

        Args:
            input: State vector.
            delta: Perturbation with the shape of ``theta``; only used when
                ``direction`` is ``"+"`` or ``"-"``.
            direction: ``"+"`` evaluates ``theta + noise * delta``, ``"-"``
                evaluates ``theta - noise * delta``; any other value
                (including ``None``) evaluates the unperturbed ``theta``.

        Returns:
            The action vector, passed through ``tanh`` when
            ``hp.use_tanh_actions`` is set.
        """
        features = self.featurize(input)
        weights = self.theta
        if direction == "+":
            weights = self.theta + self.hp.noise * delta
        elif direction == "-":
            weights = self.theta - self.hp.noise * delta
        action = weights.dot(features)
        return np.tanh(action) if self.hp.use_tanh_actions else action

    def sample_deltas(self):
        """Return ``hp.num_deltas`` standard-normal perturbations shaped like ``theta``."""
        shape = self.theta.shape
        deltas = []
        for _ in range(self.hp.num_deltas):
            deltas.append(np.random.randn(*shape))
        return deltas

    def update(self, rollouts, sigma_rewards):
        """Move ``theta`` (in place) along the reward-weighted perturbations.

        Args:
            rollouts: Iterable of ``(r_pos, r_neg, delta)`` triples.
            sigma_rewards: Standard deviation of the rewards, used to scale
                the step; a small constant is added before dividing.
        """
        direction_sum = np.zeros(self.theta.shape)
        for reward_plus, reward_minus, perturbation in rollouts:
            direction_sum += (reward_plus - reward_minus) * perturbation
        scale = self.hp.learning_rate / (self.hp.num_best_deltas * (sigma_rewards + 1e-8))
        self.theta += scale * direction_sum

In [0]:
class ARSTrainer():
    """Augmented Random Search training loop around a Gym environment.

    Builds the environment (optionally wrapped for video recording), the
    observation normalizer and the policy, then alternates exploration
    roll-outs with policy updates in ``train``.

    Note that the trainer mutates the ``hp`` object it is given:
    ``episode_length`` may be replaced by the environment's own step limit,
    and ``learning_rate`` / ``noise`` are decayed during training.

    Attributes:
        hp: Hyperparameters.
        env: The (possibly wrapped) Gym environment.
        normalizer: Running observation normalizer.
        policy: The policy being trained.
        record_video: Flag read by the video wrapper's trigger; only true
            while a to-be-recorded evaluation episode is played.
        best_theta: Copy of the policy weights with the best evaluation
            reward so far, or ``None`` before the first evaluation.
        best_eval_reward: That best evaluation reward.
    """

    def __init__(self,
                 hp=None,
                 input_size=None,
                 output_size=None,
                 normalizer=None,
                 policy=None,
                 monitor_dir=None):
        """Create the environment and every component not supplied.

        Args:
            hp: Hyperparameters; a default ``HP()`` is created when omitted.
            input_size: Observation length; taken from the environment when omitted.
            output_size: Action length; taken from the environment when omitted.
            normalizer: Observation normalizer; a fresh one is created when omitted.
            policy: Policy to train; a fresh one is created when omitted.
            monitor_dir: Folder for recorded videos; recording is off when ``None``.
        """
        self.hp = hp or HP()
        np.random.seed(self.hp.seed)
        # Defined before the video wrapper exists, because the wrapper's
        # trigger reads this attribute.
        self.record_video = False
        self.best_theta = None
        self.best_eval_reward = -np.inf

        # Create env with render_mode for video compatibility (Gym 0.26+)
        try:
            self.env = gym.make(self.hp.env_name, render_mode="rgb_array")
        except TypeError:
            # Older Gym
            self.env = gym.make(self.hp.env_name)
        if monitor_dir is not None:
            self.env = self._wrap_with_recorder(self.env, monitor_dir)

        # Set episode length from env if available
        max_steps = None
        if hasattr(self.env, "spec") and getattr(self.env.spec, "max_episode_steps", None) is not None:
            max_steps = self.env.spec.max_episode_steps
        elif hasattr(self.env, "_max_episode_steps"):
            max_steps = self.env._max_episode_steps
        self.hp.episode_length = max_steps or self.hp.episode_length

        self.input_size = input_size or self.env.observation_space.shape[0]
        self.output_size = output_size or self.env.action_space.shape[0]
        self.normalizer = normalizer or Normalizer(self.input_size)
        self.policy = policy or Policy(self.input_size, self.output_size, self.hp)

    def _wrap_with_recorder(self, env, monitor_dir):
        """Return ``env`` wrapped for video recording, or ``env`` itself on failure.

        Tries ``RecordVideo`` first and the older ``Monitor`` wrapper second.
        Recording stays best-effort: when neither works a warning is issued
        (instead of failing silently) and the unwrapped env is returned.
        """
        # Record only while the trainer has raised its flag.
        should_record = lambda episode_id: self.record_video
        try:
            # Prefer RecordVideo (Gym 0.26+)
            from gym.wrappers import RecordVideo
            return RecordVideo(env, video_folder=monitor_dir, episode_trigger=should_record)
        except Exception as record_video_error:
            # Fallback to older Monitor API if available
            try:
                from gym import wrappers as gym_wrappers
                return gym_wrappers.Monitor(env, monitor_dir, video_callable=should_record, force=True)
            except Exception as monitor_error:
                warnings.warn(
                    f"Video recording disabled: RecordVideo failed ({record_video_error!r}) "
                    f"and Monitor failed ({monitor_error!r}).",
                    RuntimeWarning,
                    stacklevel=3,
                )
                return env

    # Explore the policy on one specific direction and over one episode
    def explore(self, direction=None, delta=None):
        """Play one episode and return the sum of its rewards.

        Args:
            direction: ``"+"`` or ``"-"`` to play with perturbed weights,
                ``None`` to play the current policy.
            delta: Perturbation used together with ``direction``.

        Returns:
            The undiscounted, unclipped episode return as a float.

        Every visited state is also folded into the normalizer's statistics.
        """
        state = self.env.reset()
        # Gym 0.26+ returns (obs, info)
        if isinstance(state, tuple):
            state = state[0]
        done = False
        num_plays = 0.0
        sum_rewards = 0.0
        while not done and num_plays < self.hp.episode_length:
            self.normalizer.observe(state)
            state = self.normalizer.normalize(state)
            action = self.policy.evaluate(state, delta, direction)
            # Clip to action space bounds when available; deliberately
            # best-effort, so spaces without usable bounds pass through.
            try:
                action = np.clip(action, self.env.action_space.low, self.env.action_space.high)
            except (AttributeError, TypeError, ValueError):
                pass
            step_result = self.env.step(action)
            if len(step_result) == 5:
                # Gym 0.26+: (obs, reward, terminated, truncated, info)
                state, reward, terminated, truncated, _ = step_result
                done = terminated or truncated
            else:
                # Older Gym: (obs, reward, done, info)
                state, reward, done, _ = step_result
            # Do not clip reward; preserve informative signals
            sum_rewards += float(reward)
            num_plays += 1
        return sum_rewards

    def train(self):
        """Run ``hp.nb_steps`` ARS iterations, printing one evaluation reward per step.

        Each iteration samples perturbations, plays one episode per
        perturbation sign, updates the policy from the best-scoring
        directions, decays learning rate and noise, and evaluates the updated
        policy (recording a video every ``hp.record_every`` steps).

        The best-evaluated weights are restored into the policy when the loop
        ends, including when it is cut short by an exception such as
        ``KeyboardInterrupt`` (which is then re-raised).
        """
        try:
            for step in range(self.hp.nb_steps):
                # initialize the random noise deltas and the positive/negative rewards
                deltas = self.policy.sample_deltas()
                positive_rewards = [0] * self.hp.num_deltas
                negative_rewards = [0] * self.hp.num_deltas

                # play an episode each with positive deltas and negative deltas, collect rewards
                for k in range(self.hp.num_deltas):
                    positive_rewards[k] = self.explore(direction="+", delta=deltas[k])
                    negative_rewards[k] = self.explore(direction="-", delta=deltas[k])

                # Compute the standard deviation of all rewards
                # NOTE(review): the ARS paper appears to use only the rewards of the
                # selected directions here; identical when num_best_deltas == num_deltas
                # (the default). Confirm the intended variant.
                sigma_rewards = np.array(positive_rewards + negative_rewards).std() + 1e-8

                # Sort the rollouts by the max(r_pos, r_neg) and select the deltas with best rewards
                scores = {k:max(r_pos, r_neg) for k,(r_pos,r_neg) in enumerate(zip(positive_rewards, negative_rewards))}
                order = sorted(scores.keys(), key = lambda x:scores[x], reverse = True)[:self.hp.num_best_deltas]
                rollouts = [(positive_rewards[k], negative_rewards[k], deltas[k]) for k in order]

                # Update the policy
                self.policy.update(rollouts, sigma_rewards)

                # Decay learning rate and noise (without extending steps)
                self.hp.learning_rate = max(self.hp.min_learning_rate, self.hp.learning_rate * self.hp.learning_rate_decay)
                self.hp.noise = max(self.hp.min_noise, self.hp.noise * self.hp.noise_decay)

                # Only record video during evaluation, every n steps
                if step % self.hp.record_every == 0:
                    self.record_video = True
                # Play an episode with the new weights and print the score
                reward_evaluation = self.explore()
                print('Step: ', step, 'Reward: ', reward_evaluation)
                self.record_video = False

                # Track best policy and keep it if improved
                if reward_evaluation > self.best_eval_reward:
                    self.best_eval_reward = reward_evaluation
                    self.best_theta = self.policy.theta.copy()
        finally:
            # An interrupted evaluation must not leave recording switched on.
            self.record_video = False
            # Restore best policy at the end, also when training was interrupted
            if self.best_theta is not None:
                self.policy.theta = self.best_theta.copy()

In [0]:
def mkdir(base, name):
    """Create the directory ``base/name`` if needed and return its path.

    Args:
        base: Parent directory.
        name: Name of the directory to create below ``base``.

    Returns:
        The joined path ``os.path.join(base, name)``.

    Raises:
        FileExistsError: If the path already exists but is not a directory.
    """
    path = os.path.join(base, name)
    # exist_ok avoids the check-then-create race and still fails loudly when
    # a regular file occupies the path.
    os.makedirs(path, exist_ok=True)
    return path

In [10]:
# Gym environment id; also used as the name of the per-environment video sub-folder.
ENV_NAME = 'BipedalWalker-v3'

# Create ./videos/<ENV_NAME> (if missing) as the target folder for recorded episodes.
videos_dir = mkdir('.', 'videos')
monitor_dir = mkdir(videos_dir, ENV_NAME)

# Default hyperparameters for the chosen environment; passing monitor_dir enables video recording.
hp = HP(env_name=ENV_NAME)
trainer = ARSTrainer(hp=hp, monitor_dir=monitor_dir)
# Long-running: executes the whole training loop and prints one evaluation reward per step.
trainer.train()

MoviePy - Building video c:\Users\bonit\Downloads\MACHINE LEARNING DAY BY DAY\DAY 5- RL Bipedal Walker\videos\BipedalWalker-v3\rl-video-episode-32.mp4.
MoviePy - Writing video c:\Users\bonit\Downloads\MACHINE LEARNING DAY BY DAY\DAY 5- RL Bipedal Walker\videos\BipedalWalker-v3\rl-video-episode-32.mp4



                                                                         

MoviePy - Done !
MoviePy - video ready c:\Users\bonit\Downloads\MACHINE LEARNING DAY BY DAY\DAY 5- RL Bipedal Walker\videos\BipedalWalker-v3\rl-video-episode-32.mp4
Step:  0 Reward:  -93.01364912948547
Step:  1 Reward:  -92.8513136246571
Step:  2 Reward:  -92.71392349333215
Step:  3 Reward:  -93.04321505908383
Step:  4 Reward:  -92.32015878345558
Step:  5 Reward:  -93.5140540862218
Step:  6 Reward:  -92.86960345481216
Step:  7 Reward:  -92.01505805020305
Step:  8 Reward:  -91.99871290152512
Step:  9 Reward:  -11.133469509041973
Step:  10 Reward:  -13.26068095742212
Step:  11 Reward:  -11.156611242381485
Step:  12 Reward:  -4.825312277134151
Step:  13 Reward:  -12.467204531119938
Step:  14 Reward:  -11.291375852246665
Step:  15 Reward:  -11.449609151715567
Step:  16 Reward:  -10.433582417174813
Step:  17 Reward:  -5.637453716506404
Step:  18 Reward:  -5.749648916964636
Step:  19 Reward:  -7.075518985324694
Step:  20 Reward:  -11.6282785642106
Step:  21 Reward:  -7.840517091489374
Step: 

                                                                           

MoviePy - Done !
MoviePy - video ready c:\Users\bonit\Downloads\MACHINE LEARNING DAY BY DAY\DAY 5- RL Bipedal Walker\videos\BipedalWalker-v3\rl-video-episode-1682.mp4
Step:  50 Reward:  3.0628345076946246
Step:  51 Reward:  6.311172832817678
Step:  52 Reward:  3.00887079479312
Step:  53 Reward:  4.772360242036146
Step:  54 Reward:  5.646951315868726
Step:  55 Reward:  6.190813795889733
Step:  56 Reward:  4.330555365597287
Step:  57 Reward:  6.071632907145179
Step:  58 Reward:  4.383939051223819
Step:  59 Reward:  6.516155554703882
Step:  60 Reward:  3.8239836948532075
Step:  61 Reward:  6.125271852059448
Step:  62 Reward:  6.1711947296721466
Step:  63 Reward:  6.361444179465918
Step:  64 Reward:  4.237889190271267
Step:  65 Reward:  6.692508226099239
Step:  66 Reward:  5.251593937091923
Step:  67 Reward:  4.959414200036497
Step:  68 Reward:  6.286100222510372
Step:  69 Reward:  5.258159806959852
Step:  70 Reward:  4.904990620928347
Step:  71 Reward:  5.097261970667521
Step:  72 Reward:

KeyboardInterrupt: 

# List the recorded episode videos

In [11]:
import os
from pathlib import Path

# Print every file name in videos/<ENV_NAME>, sorted, one per line.
folder = Path('videos') / ENV_NAME
if not folder.exists():
    print(f'No video folder at {folder}')
else:
    print(*sorted(entry.name for entry in folder.iterdir()), sep='\n')

rl-video-episode-11582.meta.json
rl-video-episode-11582.mp4
rl-video-episode-13232.meta.json
rl-video-episode-13232.mp4
rl-video-episode-14882.meta.json
rl-video-episode-14882.mp4
rl-video-episode-16532.meta.json
rl-video-episode-16532.mp4
rl-video-episode-1682.meta.json
rl-video-episode-1682.mp4
rl-video-episode-32.meta.json
rl-video-episode-32.mp4
rl-video-episode-3332.meta.json
rl-video-episode-3332.mp4
rl-video-episode-4982.meta.json
rl-video-episode-4982.mp4
rl-video-episode-6632.meta.json
rl-video-episode-6632.mp4
rl-video-episode-8282.meta.json
rl-video-episode-8282.mp4
rl-video-episode-9932.meta.json
rl-video-episode-9932.mp4


In [0]:
# Recorded videos live locally in videos/<ENV_NAME>; open that folder to watch them.
# This cell previews at most the first ten file names (sorted).
import itertools
import os

video_dir = os.path.join('videos', ENV_NAME)
if not os.path.isdir(video_dir):
    print(f"No local video directory found at {video_dir}")
else:
    for name in itertools.islice(sorted(os.listdir(video_dir)), 10):
        print(name)

rl-video-episode-11582.meta.json
rl-video-episode-11582.mp4
rl-video-episode-13232.meta.json
rl-video-episode-13232.mp4
rl-video-episode-14882.meta.json
rl-video-episode-14882.mp4
rl-video-episode-16532.meta.json
rl-video-episode-16532.mp4
rl-video-episode-1682.meta.json
rl-video-episode-1682.mp4
