# **SYDE 552 Notebook**

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')
%cd 'gdrive'/'MyDrive'/'4th Year'/'SYDE 552'/'SYDE 552 Project'

Mounted at /content/gdrive
/content/gdrive/MyDrive/4th Year/SYDE 552/SYDE 552 Project


In [None]:
# System packages: ffmpeg, the freeglut development files and the Xvfb
# virtual X server (started later in this notebook before recording videos).
!apt-get install -qq ffmpeg freeglut3-dev xvfb  # For visualization

# Stable-Baselines3 installed straight from its GitHub repository, including
# the package's "extra" optional dependencies.
!pip install -q git+https://github.com/DLR-RM/stable-baselines3#egg=stable-baselines3[extra]

In [None]:
import gym
import numpy as np
import torch as th
import matplotlib.pyplot as plt

from stable_baselines3 import DQN, A2C, PPO
from stable_baselines3.common.evaluation import evaluate_policy

In [None]:
import gym
import numpy as np
import torch as th
import matplotlib.pyplot as plt

from stable_baselines3 import A2C, DQN, PPO
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.vec_env import VecFrameStack
from stable_baselines3.common.env_util import make_atari_env

from stable_baselines3.common import results_plotter
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.results_plotter import load_results, ts2xy, plot_results

# Train an A2C agent on the stock CartPole-v1 task and report its score.
# Training statistics are written under `tensorboard_log`.
tensorboard_log = "data_temp/tb/"
env = gym.make("CartPole-v1")

model = A2C(
    policy="MlpPolicy",
    env=env,
    learning_rate=4e-3,
    seed=2,
    verbose=0,
    tensorboard_log=tensorboard_log,
)

timesteps = int(3e5)
model.learn(total_timesteps=timesteps, log_interval=10)

# Evaluate the trained policy greedily on the model's own environment.
mean_reward, std_reward = evaluate_policy(
    model,
    model.get_env(),
    n_eval_episodes=20,
    deterministic=True,
)
print(f"mean_reward:{mean_reward:.2f} +/- {std_reward:.2f}")

In [None]:
import math
import gym
from gym import spaces, logger
from gym.utils import seeding
import numpy as np

class CartPoleEnv(gym.Env):
    """Cart-pole balancing environment with a configurable gravity constant.

    The state is ``(x, x_dot, theta, theta_dot)``. Every call to ``step``
    applies a force of ``+force_mag`` (action 1) or ``-force_mag`` (any other
    valid action) to the cart and advances the state by one explicit Euler
    step of length ``tau``.

    An episode is flagged done as soon as ``|x| > x_threshold`` or
    ``|theta| > theta_threshold_radians``. The reward is 1.0 for every step up
    to and including the one that first reports done, and 0.0 for any step
    taken after that without calling ``reset``.

    Args:
        gravity: Gravitational acceleration used in the pole dynamics.
            Defaults to 9.8, the value that was previously hard-coded.
    """

    metadata = {
        'render.modes': ['human', 'rgb_array'],
        'video.frames_per_second': 50,
    }

    def __init__(self, gravity=9.8):
        self.gravity = gravity
        self.masscart = 1.0
        self.masspole = 0.1
        self.total_mass = self.masspole + self.masscart
        self.length = 0.5  # actually half the pole's length
        self.polemass_length = self.masspole * self.length
        self.force_mag = 10.0
        self.tau = 0.02  # seconds between state updates

        # Angle at which to fail the episode (12 degrees, in radians)
        self.theta_threshold_radians = 12 * 2 * math.pi / 360
        self.x_threshold = 2.4

        # Position/angle limits are set to twice the failure thresholds so the
        # failing observation is still within bounds; velocities are
        # effectively unbounded. The array is built as float32 up front so
        # Box does not have to down-cast float64 bounds.
        high = np.array(
            [
                self.x_threshold * 2,
                np.finfo(np.float32).max,
                self.theta_threshold_radians * 2,
                np.finfo(np.float32).max,
            ],
            dtype=np.float32,
        )

        self.action_space = spaces.Discrete(2)
        self.observation_space = spaces.Box(-high, high, dtype=np.float32)

        self.seed()
        self.viewer = None
        self.state = None

        # None while the episode is running; counts the steps taken after the
        # first `done` otherwise.
        self.steps_beyond_done = None

    def seed(self, seed=None):
        """(Re)create the environment's random generator.

        Returns:
            A one-element list holding the seed that was actually used.
        """
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def step(self, action):
        """Apply ``action`` for one time step of length ``tau``.

        Args:
            action: Member of ``action_space``; 1 pushes with ``+force_mag``,
                anything else with ``-force_mag``.

        Returns:
            ``(observation, reward, done, info)`` where ``observation`` is the
            new state as an array and ``info`` is an empty dict.

        Raises:
            AssertionError: If ``action`` is not contained in ``action_space``.
        """
        assert self.action_space.contains(action), "%r (%s) invalid" % (action, type(action))
        x, x_dot, theta, theta_dot = self.state
        force = self.force_mag if action == 1 else -self.force_mag
        costheta = math.cos(theta)
        sintheta = math.sin(theta)

        # Accelerations of the pole angle and the cart position.
        temp = (force + self.polemass_length * theta_dot * theta_dot * sintheta) / self.total_mass
        thetaacc = (self.gravity * sintheta - costheta * temp) / (
            self.length * (4.0 / 3.0 - self.masspole * costheta * costheta / self.total_mass)
        )
        xacc = temp - self.polemass_length * thetaacc * costheta / self.total_mass

        # Explicit Euler update: positions advance with the *old* velocities.
        x = x + self.tau * x_dot
        x_dot = x_dot + self.tau * xacc
        theta = theta + self.tau * theta_dot
        theta_dot = theta_dot + self.tau * thetaacc
        self.state = (x, x_dot, theta, theta_dot)

        done = bool(
            x < -self.x_threshold
            or x > self.x_threshold
            or theta < -self.theta_threshold_radians
            or theta > self.theta_threshold_radians
        )

        if not done:
            reward = 1.0
        elif self.steps_beyond_done is None:
            # Pole just fell!
            self.steps_beyond_done = 0
            reward = 1.0
        else:
            if self.steps_beyond_done == 0:
                logger.warn("You are calling 'step()' even though this environment has already returned done = True. You should always call 'reset()' once you receive 'done = True' -- any further steps are undefined behavior.")
            self.steps_beyond_done += 1
            reward = 0.0

        return np.array(self.state), reward, done, {}

    def reset(self):
        """Start a new episode and return the initial observation.

        All four state variables are drawn uniformly between -0.05 and 0.05.
        """
        self.state = self.np_random.uniform(low=-0.05, high=0.05, size=(4,))
        self.steps_beyond_done = None
        return np.array(self.state)

    def render(self, mode='human'):
        """Draw the cart, pole, axle and track for the current state.

        The viewer and its geometry are created on the first call and reused
        on later calls.

        Args:
            mode: ``'rgb_array'`` asks the viewer to return the frame as an
                array; any other value renders without requesting one.

        Returns:
            ``None`` if no state has been set yet, otherwise the result of
            the viewer's ``render`` call.
        """
        screen_width = 600
        screen_height = 400

        world_width = self.x_threshold * 2
        scale = screen_width / world_width
        carty = 100  # TOP OF CART
        polewidth = 10.0
        polelen = scale * 1.0
        cartwidth = 50.0
        cartheight = 30.0

        if self.viewer is None:
            # Imported lazily so the environment can be used without a display.
            from gym.envs.classic_control import rendering
            self.viewer = rendering.Viewer(screen_width, screen_height)
            l, r, t, b = -cartwidth / 2, cartwidth / 2, cartheight / 2, -cartheight / 2
            axleoffset = cartheight / 4.0
            cart = rendering.FilledPolygon([(l, b), (l, t), (r, t), (r, b)])
            self.carttrans = rendering.Transform()
            cart.add_attr(self.carttrans)
            self.viewer.add_geom(cart)
            l, r, t, b = -polewidth / 2, polewidth / 2, polelen - polewidth / 2, -polewidth / 2
            pole = rendering.FilledPolygon([(l, b), (l, t), (r, t), (r, b)])
            pole.set_color(.8, .6, .4)
            self.poletrans = rendering.Transform(translation=(0, axleoffset))
            pole.add_attr(self.poletrans)
            pole.add_attr(self.carttrans)
            self.viewer.add_geom(pole)
            self.axle = rendering.make_circle(polewidth / 2)
            self.axle.add_attr(self.poletrans)
            self.axle.add_attr(self.carttrans)
            self.axle.set_color(.5, .5, .8)
            self.viewer.add_geom(self.axle)
            self.track = rendering.Line((0, carty), (screen_width, carty))
            self.track.set_color(0, 0, 0)
            self.viewer.add_geom(self.track)

        if self.state is None:
            return None

        state = self.state
        cartx = state[0] * scale + screen_width / 2.0  # MIDDLE OF CART
        self.carttrans.set_translation(cartx, carty)
        self.poletrans.set_rotation(-state[2])

        return self.viewer.render(return_rgb_array=mode == 'rgb_array')

    def close(self):
        """Close the viewer, if one was created, and forget it.

        Resetting ``self.viewer`` lets a later ``render`` call build a fresh
        viewer instead of reusing the closed one.
        """
        if self.viewer:
            self.viewer.close()
            self.viewer = None

In [None]:
# Environment used by the model cell below. Exactly one assignment is active;
# the other candidates are kept commented out.
# env = CartPoleEnv()
# env = gym.make("custom-cart-pole-v0")
env = gym.make("CartPole-v1")
# env = gym.make("LunarLanderContinuous-v2")
# env = gym.make("BreakoutNoFrameskip-v4")

In [None]:
# Directory handed to the model constructor as `tensorboard_log` and to the
# `%tensorboard --logdir` magic further down.
tensorboard_log = "data_cart_custom/tb/"

In [None]:
# Agent definition. A2C is the active choice; the DQN and PPO configurations
# are kept commented out as alternatives for the same `env`.

# model = DQN("MlpPolicy",
#             env,
#             verbose=0,
#             train_freq=16,
#             gradient_steps=8,
#             gamma=0.99,
#             exploration_fraction=0.2,
#             exploration_final_eps=0.07,
#             target_update_interval=600,
#             learning_starts=1000,
#             buffer_size=10000,
#             batch_size=128,
#             learning_rate=4e-3,
#             policy_kwargs=dict(net_arch=[256, 256]),
#             tensorboard_log=tensorboard_log,
#             seed=2)

model = A2C(
    policy="MlpPolicy",
    env=env,
    learning_rate=4e-3,
    seed=2,
    verbose=0,
    tensorboard_log=tensorboard_log,
)

# model = PPO("MlpPolicy",
#             env,
#             verbose=1,
#             learning_rate=4e-3,
#             tensorboard_log=tensorboard_log,
#             seed=2)

In [None]:
# Score the current `model` on its own environment over 20 greedy episodes.
# In notebook order this cell comes before the `model.learn` cell.
mean_reward, std_reward = evaluate_policy(
    model,
    model.get_env(),
    n_eval_episodes=20,
    deterministic=True,
)

print(f"mean_reward:{mean_reward:.2f} +/- {std_reward:.2f}")

mean_reward:9.25 +/- 0.89


In [None]:
# Optional: Monitor training in tensorboard
# `$tensorboard_log` is expanded by IPython to the value of the Python
# variable `tensorboard_log` defined in an earlier cell.
%load_ext tensorboard
%tensorboard --logdir $tensorboard_log

In [None]:
# Train the agent for 300,000 timesteps.
model.learn(total_timesteps=int(3e5), log_interval=20)

In [None]:
# Quick check on a single greedy episode. With only one episode the
# reported spread is necessarily 0.00.
mean_reward, std_reward = evaluate_policy(
    model,
    model.get_env(),
    n_eval_episodes=1,
    deterministic=True,
)

print(f"mean_reward:{mean_reward:.2f} +/- {std_reward:.2f}")

In [None]:
# Set up fake display; otherwise rendering will fail
import os
# Launch the Xvfb virtual X server as display :1 with a 1024x768, 24-bit
# screen; the trailing `&` runs it in the background so the cell returns.
os.system("Xvfb :1 -screen 0 1024x768x24 &")
# Point DISPLAY at the display number given to Xvfb above.
os.environ['DISPLAY'] = ':1'
import base64
from pathlib import Path
from IPython import display as ipythondisplay

def show_videos(video_path='', prefix=''):
    """Embed every ``<prefix>*.mp4`` file from ``video_path`` in the cell output.

    Each matching file is read, base64-encoded and inlined in an HTML
    ``<video>`` element; the elements are joined with ``<br>`` and shown with
    IPython's display helpers. If nothing matches, an empty HTML snippet is
    displayed.

    Args:
        video_path: Directory to search (not recursive). The default ``''``
            resolves to the current working directory.
        prefix: Only files whose name starts with this string are shown.
    """
    html = []
    # Path.glob yields matches in arbitrary order; sort them so the videos
    # always appear in the same sequence.
    for mp4 in sorted(Path(video_path).glob("{}*.mp4".format(prefix))):
        video_b64 = base64.b64encode(mp4.read_bytes())
        html.append('''<video alt="{}" autoplay 
                    loop controls style="height: 400px;">
                    <source src="data:video/mp4;base64,{}" type="video/mp4" />
                </video>'''.format(mp4, video_b64.decode('ascii')))
    ipythondisplay.display(ipythondisplay.HTML(data="<br>".join(html)))

In [None]:
from stable_baselines3.common.vec_env import VecVideoRecorder, DummyVecEnv

def record_video(env_id, model, video_length=500, prefix='', video_folder='videos/'):
    """Roll out ``model`` in an environment and save the run as a video.

    Args:
        env_id: Either an already constructed environment instance (the way
            this notebook calls it) or a registered environment id string,
            which is then built with ``gym.make``.
        model: Object exposing ``predict(obs, deterministic=...)`` that
            returns ``(action, state)``.
        video_length: Number of steps to roll out and record.
        prefix: Passed to the recorder as ``name_prefix``.
        video_folder: Passed to the recorder as the output folder.
    """
    def _make_env():
        # Accept both an id string and a ready-made environment.
        return gym.make(env_id) if isinstance(env_id, str) else env_id

    eval_env = DummyVecEnv([_make_env])
    # Start the video at step=0 and record `video_length` steps
    eval_env = VecVideoRecorder(
        eval_env,
        video_folder=video_folder,
        record_video_trigger=lambda step: step == 0,
        video_length=video_length,
        name_prefix=prefix,
    )

    try:
        obs = eval_env.reset()
        for _ in range(video_length):
            # Actions are sampled (deterministic=False), so repeated
            # recordings of the same model can differ.
            action, _state = model.predict(obs, deterministic=False)
            obs, _, _, _ = eval_env.step(action)
    finally:
        # Close the video recorder even if the rollout raised.
        eval_env.close()

In [None]:
# powers_step = [9.8, 12, 14, 16, 30, 50, 100, 150]
# powers_step = [1, 3, 5, 7, 9.8, 100, 150]
# Gravity values to evaluate the trained `model` under, in sweep order.
powers_step = [150, 100, 50, 40, 30, 16, 12]


class CartPoleEnv(gym.Env):
    """Cart-pole balancing environment with a configurable gravity constant.

    The state is ``(x, x_dot, theta, theta_dot)``. Every call to ``step``
    applies a force of ``+force_mag`` (action 1) or ``-force_mag`` (any other
    valid action) to the cart and advances the state by one explicit Euler
    step of length ``tau``.

    An episode is flagged done as soon as ``|x| > x_threshold`` or
    ``|theta| > theta_threshold_radians``. The reward is 1.0 for every step up
    to and including the one that first reports done, and 0.0 for any step
    taken after that without calling ``reset``.

    Args:
        gravity: Gravitational acceleration used in the pole dynamics. The
            default is the last entry of ``powers_step``: the class used to
            be re-declared inside the sweep loop and read the loop variable
            at construction time, so a plain ``CartPoleEnv()`` created after
            the sweep ended up with that value. The explicit default keeps
            that behaviour without depending on a global.
    """

    metadata = {
        'render.modes': ['human', 'rgb_array'],
        'video.frames_per_second': 50,
    }

    def __init__(self, gravity=powers_step[-1]):
        self.gravity = gravity
        self.masscart = 1.0
        self.masspole = 0.1
        self.total_mass = self.masspole + self.masscart
        self.length = 0.5  # actually half the pole's length
        self.polemass_length = self.masspole * self.length
        self.force_mag = 10.0
        self.tau = 0.02  # seconds between state updates

        # Angle at which to fail the episode (12 degrees, in radians)
        self.theta_threshold_radians = 12 * 2 * math.pi / 360
        self.x_threshold = 2.4

        # Position/angle limits are set to twice the failure thresholds so the
        # failing observation is still within bounds; velocities are
        # effectively unbounded. The array is built as float32 up front so
        # Box does not have to down-cast float64 bounds.
        high = np.array(
            [
                self.x_threshold * 2,
                np.finfo(np.float32).max,
                self.theta_threshold_radians * 2,
                np.finfo(np.float32).max,
            ],
            dtype=np.float32,
        )

        self.action_space = spaces.Discrete(2)
        self.observation_space = spaces.Box(-high, high, dtype=np.float32)

        self.seed()
        self.viewer = None
        self.state = None

        # None while the episode is running; counts the steps taken after the
        # first `done` otherwise.
        self.steps_beyond_done = None

    def seed(self, seed=None):
        """(Re)create the environment's random generator.

        Returns:
            A one-element list holding the seed that was actually used.
        """
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def step(self, action):
        """Apply ``action`` for one time step of length ``tau``.

        Args:
            action: Member of ``action_space``; 1 pushes with ``+force_mag``,
                anything else with ``-force_mag``.

        Returns:
            ``(observation, reward, done, info)`` where ``observation`` is the
            new state as an array and ``info`` is an empty dict.

        Raises:
            AssertionError: If ``action`` is not contained in ``action_space``.
        """
        assert self.action_space.contains(action), "%r (%s) invalid" % (action, type(action))
        x, x_dot, theta, theta_dot = self.state
        force = self.force_mag if action == 1 else -self.force_mag
        costheta = math.cos(theta)
        sintheta = math.sin(theta)

        # Accelerations of the pole angle and the cart position.
        temp = (force + self.polemass_length * theta_dot * theta_dot * sintheta) / self.total_mass
        thetaacc = (self.gravity * sintheta - costheta * temp) / (
            self.length * (4.0 / 3.0 - self.masspole * costheta * costheta / self.total_mass)
        )
        xacc = temp - self.polemass_length * thetaacc * costheta / self.total_mass

        # Explicit Euler update: positions advance with the *old* velocities.
        x = x + self.tau * x_dot
        x_dot = x_dot + self.tau * xacc
        theta = theta + self.tau * theta_dot
        theta_dot = theta_dot + self.tau * thetaacc
        self.state = (x, x_dot, theta, theta_dot)

        done = bool(
            x < -self.x_threshold
            or x > self.x_threshold
            or theta < -self.theta_threshold_radians
            or theta > self.theta_threshold_radians
        )

        if not done:
            reward = 1.0
        elif self.steps_beyond_done is None:
            # Pole just fell!
            self.steps_beyond_done = 0
            reward = 1.0
        else:
            if self.steps_beyond_done == 0:
                logger.warn("You are calling 'step()' even though this environment has already returned done = True. You should always call 'reset()' once you receive 'done = True' -- any further steps are undefined behavior.")
            self.steps_beyond_done += 1
            reward = 0.0

        return np.array(self.state), reward, done, {}

    def reset(self):
        """Start a new episode and return the initial observation.

        All four state variables are drawn uniformly between -0.05 and 0.05.
        """
        self.state = self.np_random.uniform(low=-0.05, high=0.05, size=(4,))
        self.steps_beyond_done = None
        return np.array(self.state)

    def render(self, mode='human'):
        """Draw the cart, pole, axle and track for the current state.

        The viewer and its geometry are created on the first call and reused
        on later calls.

        Args:
            mode: ``'rgb_array'`` asks the viewer to return the frame as an
                array; any other value renders without requesting one.

        Returns:
            ``None`` if no state has been set yet, otherwise the result of
            the viewer's ``render`` call.
        """
        screen_width = 600
        screen_height = 400

        world_width = self.x_threshold * 2
        scale = screen_width / world_width
        carty = 100  # TOP OF CART
        polewidth = 10.0
        polelen = scale * 1.0
        cartwidth = 50.0
        cartheight = 30.0

        if self.viewer is None:
            # Imported lazily so the environment can be used without a display.
            from gym.envs.classic_control import rendering
            self.viewer = rendering.Viewer(screen_width, screen_height)
            l, r, t, b = -cartwidth / 2, cartwidth / 2, cartheight / 2, -cartheight / 2
            axleoffset = cartheight / 4.0
            cart = rendering.FilledPolygon([(l, b), (l, t), (r, t), (r, b)])
            self.carttrans = rendering.Transform()
            cart.add_attr(self.carttrans)
            self.viewer.add_geom(cart)
            l, r, t, b = -polewidth / 2, polewidth / 2, polelen - polewidth / 2, -polewidth / 2
            pole = rendering.FilledPolygon([(l, b), (l, t), (r, t), (r, b)])
            pole.set_color(.8, .6, .4)
            self.poletrans = rendering.Transform(translation=(0, axleoffset))
            pole.add_attr(self.poletrans)
            pole.add_attr(self.carttrans)
            self.viewer.add_geom(pole)
            self.axle = rendering.make_circle(polewidth / 2)
            self.axle.add_attr(self.poletrans)
            self.axle.add_attr(self.carttrans)
            self.axle.set_color(.5, .5, .8)
            self.viewer.add_geom(self.axle)
            self.track = rendering.Line((0, carty), (screen_width, carty))
            self.track.set_color(0, 0, 0)
            self.viewer.add_geom(self.track)

        if self.state is None:
            return None

        state = self.state
        cartx = state[0] * scale + screen_width / 2.0  # MIDDLE OF CART
        self.carttrans.set_translation(cartx, carty)
        self.poletrans.set_rotation(-state[2])

        return self.viewer.render(return_rgb_array=mode == 'rgb_array')

    def close(self):
        """Close the viewer, if one was created, and forget it.

        Resetting ``self.viewer`` lets a later ``render`` call build a fresh
        viewer instead of reusing the closed one.
        """
        if self.viewer:
            self.viewer.close()
            self.viewer = None


# Mean evaluation reward per gravity value, in the same order as powers_step.
total_mean_rewards = []

for each_power_step in powers_step:
    # Evaluate the already-trained `model` (no re-training) on a fresh
    # environment that uses this gravity value.
    mean_reward, std_reward = evaluate_policy(
        model,
        CartPoleEnv(gravity=each_power_step),
        deterministic=True,
        n_eval_episodes=20,
    )

    total_mean_rewards.append(mean_reward)
    print("Gravity: ", each_power_step, "Mean Reward: ", mean_reward, "Std Deviation: ", std_reward)

print("Step: ", powers_step)
print("Mean Rewards: ", total_mean_rewards)

In [None]:
# Record a 500-step rollout of `model` in the custom CartPoleEnv and embed
# the resulting video(s) in the notebook.
# `CartPoleEnv` is whichever definition was executed last; after the gravity
# sweep cell, an instance built without arguments uses the gravity of the
# last sweep iteration.
# NOTE(review): `env` is assigned here but not passed on; record_video
# receives a second, separate CartPoleEnv instance.
env = CartPoleEnv()
# NOTE(review): the file prefix says "dqn", while the active model cell in
# this notebook builds an A2C agent (DQN is commented out) -- confirm naming.
record_video(CartPoleEnv(), model, video_length=500, prefix='dqn-cartpole')
show_videos('videos', prefix='dqn-cartpole')