<a href="https://colab.research.google.com/github/alirezasakhaei/RL_Course2023_Homeworks/blob/main/HW01/6_CartPole.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Installations and Imports

In [26]:
# Environment setup: system packages and Python dependencies installed via shell commands.
# Refresh the apt package index before installing system packages.
!sudo apt-get update
# imageio is pinned to 2.4.0.
# NOTE(review): the reason for this exact pin is not recorded here; confirm before upgrading.
!pip install 'imageio==2.4.0'
# xvfb and ffmpeg: presumably needed for off-screen rendering and MP4 encoding; verify.
!sudo apt-get install -y xvfb ffmpeg
# Gymnasium with its classic-control extra (the notebook uses the CartPole-v1 environment).
!pip3 install gymnasium[classic_control]

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com (185.10% [Connecting to archive.ubuntu.com (91.189.91.38)] [Waiting for headers] [Wai                                                                               Ign:2 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu2004/x86_64  InRelease
0% [Connecting to archive.ubuntu.com (91.189.91.38)] [Waiting for headers] [Con                                                                               Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  InRelease
0% [Waiting for headers] [Waiting for headers] [Connecting to ppa.launchpad.net                                                                               Hit:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu2004/x86_64  Release
0% [Waiting for headers] [Waiting fo

In [103]:
import math
import base64
import random
import imageio
import IPython
import matplotlib
import gymnasium as gym
from itertools import count
import matplotlib.pyplot as plt
from collections import namedtuple, deque
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import warnings
# Silence every Python warning for the rest of the session.
# NOTE(review): this filter is global, so it also hides warnings that may point at real problems.
warnings.filterwarnings('ignore')

# Compute device used by the rendering helper: CUDA when PyTorch can see a GPU, otherwise the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Utility functions for rendering the environment

In [28]:
def embed_mp4(filename):
    """Return an inline HTML5 video player for the MP4 file at ``filename``.

    The file is read in full, base64-encoded and embedded as a ``data:`` URI,
    so the returned HTML is self-contained.

    Args:
        filename: Path of the MP4 file to embed.

    Returns:
        An ``IPython.display.HTML`` object wrapping a ``<video>`` tag.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    # The context manager closes the file handle even if read() fails;
    # the previous bare open(...).read() left it to the garbage collector.
    with open(filename, 'rb') as video_file:
        video = video_file.read()
    b64 = base64.b64encode(video)
    tag = '''
    <video width="640" height="480" controls>
    <source src="data:video/mp4;base64,{0}" type="video/mp4">
    Your browser does not support the video tag.
    </video>'''.format(b64.decode())

    return IPython.display.HTML(tag)

In [113]:
def create_policy_eval_video(env, policy, filename, num_episodes=1, fps=30):
    """Record ``policy`` acting in ``env`` and return the clip as embedded HTML.

    Args:
        env: Environment whose ``reset``/``step``/``render`` methods are called;
            each ``render()`` result is appended to the video as one frame.
        policy: Callable that receives the current observation as a tensor with
            a leading batch axis of size 1 (moved to ``DEVICE``) and returns a
            tensor whose ``.item()`` is the action to take.
        filename: Output path without extension; ``".mp4"`` is appended.
        num_episodes: Number of episodes to record back to back.
        fps: Frame rate handed to the video writer.

    Returns:
        Whatever ``embed_mp4`` returns for the written file.
    """
    video_path = filename + ".mp4"
    with imageio.get_writer(video_path, fps=fps) as video:
        for _ in range(num_episodes):
            observation, _info = env.reset()
            # First frame shows the state right after the reset.
            video.append_data(env.render())
            episode_over = False
            while not episode_over:
                obs_batch = torch.from_numpy(observation).unsqueeze(0).to(DEVICE)
                chosen = policy(obs_batch)
                observation, _reward, terminated, truncated, _ = env.step(chosen.item())
                video.append_data(env.render())
                episode_over = terminated or truncated
    return embed_mp4(video_path)

# Replay Memory and Q-Network

In [149]:
# One environment transition: (state, action, next_state, reward).
SARS = namedtuple('SARS', ['state', 'action', 'next_state', 'reward'])


class ReplayMemory:
    """Bounded buffer of transitions with uniform random sampling."""

    def __init__(self, capacity):
        # A deque with maxlen drops its oldest entry once `capacity` is reached.
        self.memory = deque(maxlen=capacity)

    def push(self, transition):
        """Store one transition at the newest end of the buffer."""
        self.memory.append(transition)

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored transitions chosen at random."""
        return random.sample(self.memory, k=batch_size)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

In [107]:
# Complete the Q-Network below. 
# The Q-Network takes a state as input and the output is a vector so that each element is the q-value for an action.

class DQN(nn.Module):
    """Fully connected Q-network: observation in, one Q-value per action out.

    Two hidden layers (64 and 32 units) with ReLU activations feed a linear
    output layer of size ``n_actions``.
    """

    def __init__(self, n_observations, n_actions):
        super().__init__()
        # ==================================== Your Code (Begin) ====================================
        hidden_a, hidden_b = 64, 32
        self.lin0 = nn.Linear(n_observations, hidden_a)
        self.lin1 = nn.Linear(hidden_a, hidden_b)
        self.lin2 = nn.Linear(hidden_b, n_actions)
        self.relu = nn.ReLU()
        # ==================================== Your Code (End) ====================================

    def forward(self, x):
        """Map observations ``x`` to Q-values along the last axis."""
        # ==================================== Your Code (Begin) ====================================
        # Both hidden layers are followed by ReLU; the output layer stays linear.
        for hidden_layer in (self.lin0, self.lin1):
            x = self.relu(hidden_layer(x))
        return self.lin2(x)
        # ==================================== Your Code (End) ====================================

# Policies

We now define two policies: a greedy policy, used for evaluation, and an ε-greedy policy, used during training.

In [140]:
# This function takes in a state and returns the best action according to your q-network.
# Don't forget "torch.no_grad()". We don't want gradient flowing through our network. 

# state shape: (state_size,) or (1, state_size) -> output: 0-dim int64 tensor holding the action index
def greedy_policy(qnet, state):
    """Return the action with the highest Q-value under ``qnet``.

    Args:
        qnet: Q-network. It must be callable on a state tensor and expose a
            ``lin0`` layer, whose weight supplies the dtype and device the
            state is converted to.
        state: A single observation shaped ``(state_size,)`` or
            ``(1, state_size)``, given as a tensor or any array-like. A batch
            shaped ``(B, state_size)`` is accepted as well.

    Returns:
        An int64 tensor on the network's device: 0-dim for a single state,
        shape ``(B,)`` for a batch of ``B > 1`` states.
    """
    # as_tensor converts array-likes and re-casts existing tensors without the
    # redundant copy (and copy-construct warning) of torch.tensor(tensor).
    state = torch.as_tensor(state, dtype=qnet.lin0.weight.dtype, device=qnet.lin0.weight.device)
    # ==================================== Your Code (Begin) ====================================
    with torch.no_grad():
        output = qnet(state)
        # Arg-max over the action axis only, so every state in a batch gets its
        # own action; squeeze() collapses the size-1 batch axis to a 0-dim tensor.
        best_action = output.argmax(dim=-1).squeeze()
    return best_action
    # ==================================== Your Code (End) ====================================

In [141]:
# state shape: (state_size,) or (1, state_size) -> output: 0-dim int64 tensor holding the action index
# Don't forget "torch.no_grad()". We don't want gradient flowing through our network.

def e_greedy_policy(qnet, state, current_timestep):
    """Pick an action epsilon-greedily with an exponentially decaying epsilon.

    Epsilon starts at ``EPS_START`` and decays towards ``EPS_END`` with time
    constant ``EPS_DECAY`` (all module-level constants). The random action is
    drawn from the module-level ``env``'s action space.

    Args:
        qnet: Q-network passed on to ``greedy_policy``; its ``lin0`` weight
            supplies the device for the returned tensor.
        state: Current observation, forwarded unchanged to ``greedy_policy``.
        current_timestep: Counter that drives the epsilon decay.

    Returns:
        An int64 tensor holding the chosen action index.
    """
    eps_threshold = EPS_END + (EPS_START - EPS_END) * math.exp(-1. * current_timestep / EPS_DECAY)
    # ==================================== Your Code (Begin) ====================================
    # With probability "eps_threshold" choose a random action
    # and with probability 1-"eps_threshold" choose the best action according to your Q-Network.

    # Exploit: the network is only evaluated on this branch, so exploration
    # steps no longer pay for a forward pass whose result would be discarded.
    if np.random.uniform(0, 1) > eps_threshold:
        return greedy_policy(qnet, state)
    # Explore: uniform random action from the environment's action space.
    return torch.tensor(env.action_space.sample(), dtype=torch.int64, device=qnet.lin0.weight.device)
    # ==================================== Your Code (End) ====================================

# Initial setup

In [145]:
# Hyperparameters
BATCH_SIZE = 128         # transitions per optimisation step
GAMMA = 0.99             # discount factor
EPS_START = 0.9          # initial exploration rate
EPS_END = 0.05           # final exploration rate
EPS_DECAY = 1000         # time constant of the exponential epsilon decay
TAU = 0.005              # soft-update rate of the target network
LR = 1e-4                # Adam learning rate
MIN_MEMORY = 500         # transitions to collect before optimisation starts
MEMORY_CAPACITY = 10000  # maximum number of transitions kept in the replay memory
SEED = 0                 # seed shared by every random number generator below

# Seed every RNG the notebook draws from so a fresh "Run All" is repeatable.
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Single source of truth for the compute device; `device` is kept as an alias
# of DEVICE so the networks and the rendering helper cannot end up on different devices.
device = DEVICE
env = gym.make("CartPole-v1", render_mode='rgb_array')
env.action_space.seed(SEED)  # makes env.action_space.sample() reproducible
n_actions = env.action_space.n
state, info = env.reset(seed=SEED)
n_observations = len(state)
q_network = DQN(n_observations, n_actions).to(device)
target_network = DQN(n_observations, n_actions).to(device)

# The target network starts as an exact copy of the online network.
target_network.load_state_dict(q_network.state_dict())

optimizer = optim.Adam(q_network.parameters(), lr=LR)
memory = ReplayMemory(MEMORY_CAPACITY)

# Baseline video: the greedy policy of the still-untrained network.
create_policy_eval_video(env, lambda s: greedy_policy(q_network, s), "random_agent")



[-0.02057094  0.01676947 -0.01563998 -0.02650603]


# Training

In [None]:
num_episodes = 200
episode_returns = []
episode_durations = []
# Environment steps taken across all episodes. This drives the epsilon decay:
# EPS_DECAY is far larger than the number of episodes, so feeding the episode
# index to the schedule would leave the agent almost fully random all run long.
total_steps = 0

for i_episode in range(num_episodes):

    # ==================================== Your Code (Begin) ====================================
    # 1. Start a new episode
    state, info = env.reset()
    discounted_return = 0.0
    done = False
    i = 0
    while not done:
        # 2. Run the environment for 1 step using e-greedy policy
        action = e_greedy_policy(q_network, state, total_steps)
        total_steps += 1
        next_state, reward, terminated, truncated, _ = env.step(int(action.item()))
        done = terminated or truncated

        # 3. Add the (state, action, next_state, reward) to replay memory
        # Everything is stored on the network's device. Actions stay int64
        # because they are used as gather indices during optimisation.
        train_dev = q_network.lin0.weight.device
        train_dtype = q_network.lin0.weight.dtype
        state_t = torch.tensor(state, device=train_dev, dtype=train_dtype)
        action_t = action.detach().reshape(()).to(device=train_dev, dtype=torch.int64)
        reward_t = torch.tensor(reward, device=train_dev, dtype=train_dtype)
        # A terminal transition stores None as next_state so the update below
        # knows not to bootstrap from it. A truncated episode (time limit) is
        # not terminal, so its next_state is kept.
        if terminated:
            next_state_t = None
        else:
            next_state_t = torch.tensor(next_state, device=train_dev, dtype=train_dtype)
        memory.push(SARS(state_t, action_t, next_state_t, reward_t))

        if len(memory) > MIN_MEMORY:
            # 4. Optimize your q_network for 1 iteration
            #       4.1 Sample one batch from replay memory
            sample = memory.sample(BATCH_SIZE)
            # zip(*sample) turns a list of transitions into one tuple per field.
            batch = SARS(*zip(*sample))
            state_batch = torch.stack(batch.state)                 # (B, n_observations)
            action_batch = torch.stack(batch.action).unsqueeze(1)  # (B, 1)
            reward_batch = torch.stack(batch.reward)               # (B,)
            non_final_mask = torch.tensor(
                [s is not None for s in batch.next_state],
                device=train_dev, dtype=torch.bool)

            #       4.2 Compute predicted state-action values using q_network
            predicted_q = q_network(state_batch).gather(1, action_batch).squeeze(1)

            #       4.3 Compute expected state-action values using target_network (Don't forget "no_grad" because we don't want gradient through target_network)
            with torch.no_grad():
                # Terminal states keep a value of zero.
                next_q = torch.zeros(len(sample), device=train_dev, dtype=train_dtype)
                if non_final_mask.any():
                    non_final_next_states = torch.stack(
                        [s for s in batch.next_state if s is not None])
                    next_q[non_final_mask] = target_network(non_final_next_states).max(dim=1).values
                expected_q = reward_batch + GAMMA * next_q

            #       4.4 Compute loss function and optimize q_network for 1 step
            loss = F.smooth_l1_loss(predicted_q, expected_q)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # 5. Soft update the weights of target_network
            #       θ′ ← τ θ + (1 −τ )θ′
            #       θ   is q_network weights
            #       θ′  is target_network weights
            theta = q_network.state_dict()
            theta_prime = target_network.state_dict()
            for key in theta_prime:
                theta_prime[key] = TAU*theta[key] + (1-TAU)*theta_prime[key]
            target_network.load_state_dict(theta_prime)

        # 6. Keep track of the total reward for each episode to plot later
        # Accumulated as a plain float so the list does not fill up with device tensors.
        discounted_return += (GAMMA ** i) * float(reward)
        i += 1
        # Advance to the new observation; without this the agent would keep
        # acting on (and storing) the first state of the episode.
        state = next_state

    episode_returns.append(discounted_return)
    episode_durations.append(i)
    # ==================================== Your Code (End) ====================================

print('Complete')
plt.plot(range(1, num_episodes+1), episode_durations)
plt.xlabel('Episode')
plt.ylabel('Episode duration (steps)')
plt.title('DQN training on CartPole-v1')

In [None]:
# Render trained model
# Records one episode of the greedy policy of `q_network` to "trained_agent.mp4"
# and displays the clip inline.

create_policy_eval_video(env, lambda s: greedy_policy(q_network, s), "trained_agent")

In [156]:
# Scratch demo: zip(*pairs) transposes a list of tuples into one tuple per position.
# This is the idiom `SARS(*zip(*sample))` in the training loop uses to regroup a
# list of transitions into per-field tuples.
list(zip(*[('a', 1), ('b', 2), ('c', 3), ('d', 4)]))

[('a', 'b', 'c', 'd'), (1, 2, 3, 4)]

In [None]:
# Scratch demo: `*` spreads the list into separate positional arguments.
# A starred expression cannot stand alone as a statement (SyntaxError), so it is
# shown here inside a call.
print(*[('a', 1), ('b', 2), ('c', 3), ('d', 4)])