# Install Dependencies

In [None]:
# System packages installed via apt-get (xvfb, python3-opengl, ffmpeg); all output is discarded.
!apt-get install -y xvfb python3-opengl ffmpeg > /dev/null 2>&1
# Gym with its classic-control extra.
!pip install gym[classic_control]
# cmake via apt-get; all output is discarded.
!apt-get install cmake > /dev/null 2>&1
# Upgrade setuptools; stderr is merged into the visible output.
!pip install --upgrade setuptools 2>&1
# Remaining Python packages; all output is discarded, so install failures are silent here.
!pip install ez_setup gym pyvirtualdisplay tensorflow ffmpeg imageio-ffmpeg > /dev/null 2>&1

The action is an ndarray with shape (1,) which can take values {0, 1} indicating pushing the cart to the left or right, respectively. Note that the velocity that is reduced or increased by the applied force is not fixed and it depends on the angle the pole is pointing. The center of gravity of the pole varies the amount of energy needed to move the cart underneath it.

In [None]:
import os
import gym
from gym import logger as gymlogger
from gym.wrappers.record_video import RecordVideo
gymlogger.set_level(40) #error only
import tensorflow as tf
import numpy as np
import random
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline
import math
import glob
import io
import base64
from IPython.display import HTML
from IPython import display as ipythondisplay
import time
from random import randint

In [None]:
# Embeds a recorded episode directly in the notebook output.
def show_video():
    """Display the first recorded ``.mp4`` from the ``video`` folder inline.

    The file is read, base64-encoded and embedded in an HTML5 ``<video>``
    element as a ``data:`` URI, so no separate file server is needed.
    Candidates are sorted by name so the same file is picked on every run.
    Prints "Could not find video" when the folder holds no ``.mp4`` file.
    """
    mp4list = sorted(glob.glob('video/*.mp4'))
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        # Read-only binary mode; the context manager releases the handle.
        with io.open(mp4, 'rb') as video_file:
            video = video_file.read()
        encoded = base64.b64encode(video)
        # {0} is replaced by the base64 payload of the video file.
        ipythondisplay.display(HTML(data='''<video alt="recorded episode" autoplay loop controls style="height: 400px;">
    <source src="data:video/mp4;base64,{0}" type="video/mp4" />
</video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")

# Action space

The action is an ndarray with shape (1,) which can take values {0, 1} indicating pushing the cart to the left or right, respectively. Note that the velocity that is reduced or increased by the applied force is not fixed and it depends on the angle the pole is pointing. The center of gravity of the pole varies the amount of energy needed to move the cart underneath it.

# Load cartpole env

In [None]:
# Create the CartPole-v1 environment; `env` is reused by the exploratory cells below.
env = gym.make("CartPole-v1")

Check that the environment exposes 2 valid discrete actions.

In [None]:
# Print the environment's action space.
print(env.action_space)

Display the observation space. Each observation is an array of four values, in this order:

- cart position
- cart velocity
- pole angle
- pole angular velocity

In [None]:
# Print the environment's observation space.
print(env.observation_space)

In [None]:
# With the 5-value step API used in this notebook (Gym >= 0.26), reset()
# returns an (observation, info) pair rather than the bare observation.
observation, info = env.reset()
print("Initial observations:", observation)

In [None]:
# step() returns (observation, reward, terminated, truncated, info), in that order.
observation, reward, terminated, truncated, info = env.step(0)
# The round is over when the pole fell/cart left the track OR the time limit hit.
done = terminated or truncated
# Each field reads its own slot of the observation, shown with 2 decimals.
print(f"""Position: {observation[0]:.2f}, 
Velocity: {observation[1]:.2f}, 
Angle: {observation[2]:.2f}, 
Angular velocity: {observation[3]:.2f}""")
print("Reward for this step:", reward)
print("Is this round done?", done)

Simple game where the policy is to always choose 0.

In [None]:
# Play one full episode with the constant policy "always push left" (action 0).
observation, info = env.reset()  # reset() returns (observation, info)
cumulative_reward = 0
done = False
count = 0
while not done:
    count += 1
    # step() returns (observation, reward, terminated, truncated, info).
    observation, reward, terminated, truncated, info = env.step(0)
    # Stop on either ending condition; stepping past termination is undefined.
    done = terminated or truncated
    cumulative_reward += reward

print(count)
# Each field reads its own slot of the final observation, shown with 2 decimals.
print(f"""Position: {observation[0]:.2f}, 
Velocity: {observation[1]:.2f}, 
Angle: {observation[2]:.2f}, 
Angular velocity: {observation[3]:.2f}""")
print("Cumulative reward for this round:", cumulative_reward)

# Task 1

Development of an RL agent. Demonstrate the correctness of the implementation by sampling a random state from the cart pole environment, inputting to the agent, and outputting a chosen action. Print the values of the state and chosen action in Jupyter notebook.

In [None]:
class CartpoleWorld():
    """Wrapper around a Gym ``CartPole-v1`` environment that remembers the latest transition.

    The wrapper stores the most recent observation, reward and ending flags so
    an agent can query them between steps.
    """

    def __init__(self) -> None:
        """Create the underlying environment and start the first episode."""
        self.__env = gym.make("CartPole-v1")
        self.reset()

    def reset(self) -> np.ndarray:
        """Start a new episode and return its initial observation.

        Also clears the stored reward and ending flags, so ``isEnd()`` is
        ``False`` and ``get_observation()`` is valid before the first step.
        """
        # reset() returns (observation, info); the info dict is not needed here.
        self.__observation, _ = self.__env.reset()
        self.__reward = 0.0
        self.__done = False
        self.__truncated = False
        return self.__observation

    def get_observation(self) -> np.ndarray:
        """Return the most recent observation."""
        return self.__observation

    def update_world(self, action) -> float:
        """Apply ``action`` to the environment and return the reward of that step."""
        # step() returns (observation, reward, terminated, truncated, info).
        (self.__observation, self.__reward,
         self.__done, self.__truncated, _) = self.__env.step(action)
        return self.__reward

    def isEnd(self) -> bool:
        """Return True once the current episode has terminated or been truncated."""
        return self.__done or self.__truncated

    def get_reward(self):
        """Return the reward of the most recent step (0.0 right after a reset)."""
        return self.__reward
    

In [None]:
from abc import ABC, abstractmethod

class RLAgent(ABC):
    """Abstract base class for agents acting in a ``CartpoleWorld``.

    Subclasses implement ``get_optimal_action``; ``move`` then performs one
    environment step with the chosen action and accumulates the reward.
    """

    def __init__(self, env: "CartpoleWorld") -> None:
        """Bind the agent to ``env`` and start with zero accumulated reward."""
        self.__env = env
        self.__total_reward: float = 0

    @property
    def total_reward(self) -> float:
        """Sum of the rewards returned by every ``move`` call so far."""
        return self.__total_reward

    @abstractmethod
    def get_optimal_action(self, s: np.ndarray):
        """Return the action to take for observation ``s``."""
        pass

    def move(self) -> float:
        """Take one step in the environment and return the reward received.

        Raises:
            RuntimeError: if the current episode has already ended.
        """
        # Acting is only valid while the episode is still running.
        if self.__env.isEnd():
            raise RuntimeError("Episode already terminated")
        action = self.get_optimal_action(self.__env.get_observation())
        reward = self.__env.update_world(action)
        # update reward
        self.__total_reward += reward
        return reward

In [None]:
from typing import Dict, List, Tuple

class QLearningAgent(RLAgent):
    """Tabular Q-learning agent for CartPole with an epsilon-soft behaviour policy.

    Continuous observations are mapped onto a coarse grid so they can be used
    as dictionary keys. Q-values default to 0 for unseen (state, action) pairs.
    """

    # Clipping range and bin count per observation dimension, in the order
    # (position, velocity, angle, angular velocity). The position/angle limits
    # follow the termination thresholds in the Gym CartPole documentation; the
    # velocity limits and bin counts are tunable heuristics, since velocities
    # are unbounded.
    _OBS_LOW = np.array([-2.4, -3.0, -0.21, -3.5])
    _OBS_HIGH = np.array([2.4, 3.0, 0.21, 3.5])
    _OBS_BINS = np.array([6, 6, 12, 12])

    def __init__(self, env:CartpoleWorld) -> None:
        """Bind the agent to ``env`` and initialise empty Q and policy tables."""
        super().__init__(env)
        # RLAgent stores the environment under its own name-mangled attribute,
        # which is not reachable as self.__env here; keep a local reference.
        self.__env = env
        self.__learning_rate = 0.1
        # defined for epsilon soft policy
        self.__epsilon = 0.1
        # dictionary of (discrete state, action) -> quality
        self.__q_table : Dict[Tuple[Tuple[int, ...], int], float] = dict()
        # dictionary of discrete state -> last action chosen in that state
        self.__pi_table : Dict[Tuple[int, ...], int] = dict()
        # [left, right] action set
        self.__actions = [0,1]
        self.__discounted_reward = 0.9

    def get_optimal_action(self, s: np.ndarray):
        """Choose an action for observation ``s`` under the epsilon-soft policy.

        With probability epsilon a uniformly random action is taken, otherwise
        the greedy one. The greedy action therefore has total probability
        1 - epsilon + epsilon/|A|. The choice is recorded in the policy table
        so ``update_q_table`` knows which action was taken.
        """
        s = self.discretise_observation(s)

        # Uniform draw in [0, 1) so the comparison is a true probability.
        if np.random.rand() < self.__epsilon:
            # pick random
            self.__pi_table[s] = self.get_random_action()
        else:
            # pick optimal: a* is the argmax_a Q(s,a)
            self.__pi_table[s] = self.argmax_a_Q(s, self.__actions)
        return self.__pi_table[s]

    def main(self):
        """Play the current episode to its end, learning after every step.

        Returns:
            The cumulative reward collected during this call.
        """
        episode_reward = 0.0
        while (not self.__env.isEnd()):
            s = self.__env.get_observation()
            R = self.move()
            s_prime = self.__env.get_observation()
            # The environment only exposes a combined end flag, so truncation
            # is treated like termination (no bootstrapping from s_prime).
            self.update_q_table(s, R, s_prime, terminal=self.__env.isEnd())
            episode_reward += R
        return episode_reward

    def update_q_table(self, s: np.ndarray, R: float, s_prime: np.ndarray,
                       terminal: bool = False):
        """Apply one Q-learning update for the transition (s, a, R, s_prime).

        ``a`` is the action last recorded for ``s`` by ``get_optimal_action``.
        Q(s,a) <- Q(s,a) + lr * (R + gamma * max_a' Q(s',a') - Q(s,a)), where
        the max term is dropped when ``terminal`` is True.

        Raises:
            KeyError: if no action has been chosen for ``s`` yet.
        """
        s_key = self.__as_key(s)
        action = self.__pi_table[s_key]
        if terminal:
            best_next = 0.0
        else:
            # Bootstrap from the best VALUE in the next state, not its index.
            best_next = max(self.Q(s_prime, a) for a in self.__actions)
        Q_S_A = self.Q(s_key, action)
        # Store the result; a local-only update would never change the table.
        self.__q_table[s_key, action] = Q_S_A + self.__learning_rate * \
                (R + self.__discounted_reward * best_next - Q_S_A)

    def Q(self, state: np.ndarray, action: int) -> float:
        """Return the current estimate Q(state, action), 0.0 if never updated.

        ``state`` may be a raw observation or an already discretised tuple.
        """
        return self.__q_table.get((self.__as_key(state), action), 0.0)

    def argmax_a_Q(self, state: np.ndarray, action_set: List[int]) -> int:
        """Return the action in ``action_set`` with the highest Q-value.

        Ties are resolved in favour of the action listed first.
        """
        return max([(action,self.Q(state,action)) for action in action_set],\
                        key=lambda item:item[1])[0]

    def get_random_action(self) -> int:
        """Return an action drawn uniformly from the action set."""
        return int(np.random.choice(self.__actions))

    def discretise_observation(self, observation: np.ndarray) -> Tuple[int, ...]:
        """Map a 4-value observation to a hashable tuple of bin indices.

        Each dimension is clipped to its configured range and split into
        equal-width bins; values on or beyond the upper bound fall in the
        last bin.
        """
        obs = np.asarray(observation, dtype=float)
        span = self._OBS_HIGH - self._OBS_LOW
        ratios = (np.clip(obs, self._OBS_LOW, self._OBS_HIGH) - self._OBS_LOW) / span
        indices = np.minimum((ratios * self._OBS_BINS).astype(int), self._OBS_BINS - 1)
        return tuple(int(i) for i in indices)

    def __as_key(self, state) -> Tuple[int, ...]:
        """Return ``state`` unchanged if already discretised, else discretise it."""
        if isinstance(state, tuple):
            return state
        return self.discretise_observation(state)

# Task 2:

Demonstrate the effectiveness of the RL agent. Run for 100 episodes (reset the environment at the beginning of each episode) and plot the cumulative reward against all episodes in Jupyter. Print the average reward over the 100 episodes. The average reward should be larger than 195.

In [None]:
# NOTE(review): these values are drawn from np.random.randint, not measured
# from an agent; replace with real per-episode rewards before reporting.
episode_results = np.random.randint(150, 250, size=100)

# Explicit figure/axes interface instead of the pyplot state machine.
fig, ax = plt.subplots()
ax.plot(episode_results)
ax.set_title('Cumulative reward for each episode')
ax.set_ylabel('Cumulative reward')
ax.set_xlabel('episode')
plt.show()

Print the average reward over the 100 episodes.

In [None]:
# Compute the mean once and reuse it for both the report and the threshold check.
average_reward = episode_results.mean()
print("Average cumulative reward:", average_reward)
print("Is my agent good enough?", average_reward > 195)

# Task 3:
Render one episode played by the developed RL agent on Jupyter. Please refer to the sample code link for rendering code

In [None]:
import os
# Show the current working directory as the cell's output; the relative
# "video" folder used in this notebook is resolved against it.
os.getcwd()

In [None]:
import os
# Resolve the recording folder to an absolute path and show where it points.
video_folder = os.path.abspath("video")
print(video_folder)

# Warn when recordings from an earlier run are already present in that folder.
if os.path.isdir(video_folder):
    print(
        f"Overwriting existing videos at {video_folder} folder "
        f"(try specifying a different `video_folder` for the `RecordVideo` wrapper if this is not desired)"
    )

In [None]:
from IPython.display import HTML
from IPython import display as ipythondisplay
from gym import make
from gym.wrappers.record_video import RecordVideo
# Wrap the environment so every second episode (0, 2, 4, ...) is recorded
# into the "video" folder.
env = make("CartPole-v1", render_mode="rgb_array_list")
env = RecordVideo(env, video_folder="video", name_prefix = "rl-video", episode_trigger = lambda x: x % 2 == 0)

try:
    for i in range (10):
        print(f"Currently at {i} iteration")
        env.reset()
        while True:
            env.render()

            #your agent goes here
            #use random policy for now
            action = randint(0, 1) #rand_policy_agent(observation)
            # step() returns (observation, reward, terminated, truncated, info).
            observation, reward, terminated, truncated, info = env.step(action)
            # End the episode on failure OR on reaching the time limit.
            if terminated or truncated:
                break
finally:
    # Close once, after all episodes; closing inside the loop would shut the
    # recorder down after the first episode.
    env.close()
show_video()

# Task 4:

Format the Jupyter notebook by including step-by-step instruction and explanation, such that the notebook is easy to follow and run (refer to the tutorial section in the sample notebook). Include text explanation to demonstrate the originality of your implementation and your understanding of the code. For example, for each task, explain your approach and analyze the output; if you improve an existing approach, explain your improvements.