In [None]:
import numpy as np
import gym
import torch.nn as nn
import torch
from collections import deque
import json
import os
import pygame
import random
import matplotlib.pyplot as plt
from gym.utils.play import play

In [None]:
from torch.utils.tensorboard import SummaryWriter
# TensorBoard writer used by the training loop's add_scalar calls; event files
# are written under runs/cart_pole_dqn.
writer = SummaryWriter('runs/cart_pole_dqn')

In [None]:
# Run on the GPU when CUDA is available, otherwise on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
device

In [None]:
class VfApproxModel(nn.Module):
    """
    Neural Network for Value Function Approximation\n
    Three fully connected layers:
    n_observations -> hidden_size -> hidden_size -> n_actions
    (4 -> 30 -> 30 -> 2 with the default arguments).

    Args:
        n_observations: number of input features per state.
        n_actions: number of outputs, one value per action.
        hidden_size: width of both hidden layers.
    """
    def __init__(self, n_observations=4, n_actions=2, hidden_size=30):
        super().__init__()
        self.layer_1 = nn.Linear(in_features=n_observations, out_features=hidden_size)
        self.layer_2 = nn.Linear(in_features=hidden_size, out_features=hidden_size)
        self.layer_3 = nn.Linear(in_features=hidden_size, out_features=n_actions)  # one output per action
        self.relu = nn.ReLU()

    def forward(self, features):
        """
        Returns the raw action values for `features`.

        The last dimension of `features` must have `n_observations` entries;
        the result has the same leading dimensions and `n_actions` entries last.
        """
        out = self.relu(self.layer_1(features))
        out = self.relu(self.layer_2(out))
        # No activation on the output layer: a ReLU here would clamp every
        # action value to >= 0 and block the gradient for clamped outputs.
        return self.layer_3(out)

In [None]:
# Two networks of the same architecture: q_update reads target_policy (without
# gradients) for the next-state value, while learning_policy is the network
# whose parameters are handed to the optimizer.
target_policy = VfApproxModel().to(device)
learning_policy = VfApproxModel().to(device)

In [None]:
# Start the target network as an exact copy of the learning network's weights
target_policy.load_state_dict(learning_policy.state_dict())

In [None]:
# Smoke test: forward one hand-written 4-element state through the learning
# network and display the largest of its outputs.
state = torch.tensor([0,0,4,0],dtype=torch.float32,device=device)
learning_policy(state).max(0)[0]

In [None]:
def update_target_policy(TAU):
    """
    Soft update of the target network's weights\n
    θ′ ← τ θ + (1 − τ) θ′\n
    where θ are the weights of `learning_policy`, θ′ the weights of
    `target_policy`, and τ is `TAU`.
    """
    online_weights = learning_policy.state_dict()
    blended_weights = target_policy.state_dict()
    for name, online_value in online_weights.items():
        # Move every target entry a fraction TAU of the way toward the learning network.
        blended_weights[name] = online_value * TAU + blended_weights[name] * (1 - TAU)

    target_policy.load_state_dict(blended_weights)

In [None]:
# --- Training schedule ---
EPISODES = 2000  # number of training episodes (10 * 200)
GAMMA = 0.99  # discount factor

# --- Optimisation and exploration ---
ALPHA = 1e-4  # learning rate handed to the optimizer
MAX_EXP_RATE = 1  # exploration rate at the start of the decay schedule
MIN_EXP_RATE = 0.2  # floor the exploration rate decays toward
EPSILON = 1  # current exploration rate (recomputed after every episode)
EPSILON_DECAY = 0.002  # rate of the exponential decay, per episode

# --- Replay memory and target network ---
REPLAY_LENGTH = 9000  # maximum number of stored experiences
REPLAY_BATCH = 100  # experiences sampled per replay-training pass
TAU = 0.003  # soft-update factor passed to update_target_policy

In [None]:
# Smooth L1 loss between the predicted and the target action value.
criterion = nn.SmoothL1Loss()
criterion = criterion.to(device)

# Plain SGD over the learning network only; the target network is never
# given to an optimizer.
opt = torch.optim.SGD(
    learning_policy.parameters(),
    lr=ALPHA,
)

In [None]:
def q_update(state,new_state,reward,running,action=None):
    """
    Performs one gradient step of the learning network on a single transition.

    The target is `reward + GAMMA * max_a' Q_target(new_state, a')`; the
    bootstrapped term is dropped when the transition ended the episode, so a
    final transition is trained toward `reward` alone.

    Args:
        state: un-batched (1-D) state tensor the action was taken from.
        new_state: un-batched (1-D) state tensor reached after the action.
        reward: reward received for the transition.
        running: despite its name, truthy when the transition ENDED the
            episode (no bootstrapping from `new_state`), falsy otherwise.
        action: optional index of the action that was actually taken. When
            given, the prediction is Q(state, action); when omitted the
            previous behaviour is kept and max_a Q(state, a) is used instead.

    Returns:
        The loss tensor of this step (call `.item()` for a Python float).
    """
    q_values = learning_policy(state)
    if action is None:
        # Backward-compatible fallback: callers that do not say which action
        # was taken get the greedy value as the prediction.
        state_action_value = q_values.max(0)[0]
    else:
        state_action_value = q_values[action]

    with torch.no_grad():
        next_state_value = target_policy(new_state).max(0)[0]

    # Mask only the bootstrapped term. Masking the whole target (reward
    # included) would train terminal transitions toward 0 instead of `reward`.
    bootstrap_mask = 0.0 if running else 1.0
    expected_state_action_value = (next_state_value * GAMMA * bootstrap_mask) + reward

    loss = criterion(state_action_value,expected_state_action_value)

    opt.zero_grad()
    loss.backward()
    # Clip each gradient element to [-100, 100] before the optimizer step.
    torch.nn.utils.clip_grad_value_(learning_policy.parameters(), 100)
    opt.step()

    return loss

In [None]:
class ReplayMemory:
    """
        Replay Memory for storing Experience\n
        A bounded FIFO buffer: once `length` experiences are stored, adding a
        new one discards the oldest.
    """
    def __init__(self,length,batch_size):
        """
            length: maximum number of experiences kept\n
            batch_size: number of experiences drawn by train_on_replay
        """
        self.replay_memory = deque(maxlen=length)
        self.batch_size = batch_size

    def __len__(self):
        """
            Number of experiences currently stored
        """
        return len(self.replay_memory)

    def add_experience(self,new_state,reward,running,state,action):
        """
            Adds Experience into replay_memory\n
            new_state and state both are torch tensors
        """
        self.replay_memory.append((new_state,reward,running,state,action))

    def train_on_replay(self):
        """
            Training on Replay memory\n
            Draws up to `batch_size` stored experiences at random (fewer when
            the memory holds fewer) and runs q_update on each of them.\n
            Returns the loss of the last sampled experience.\n
            Raises ValueError when there is nothing to sample.
        """
        # Clamp so an under-filled memory trains on what it has instead of
        # failing inside random.sample.
        sample_size = min(self.batch_size, len(self.replay_memory))
        if sample_size <= 0:
            # Without this guard `loss` below would never be bound.
            raise ValueError("cannot train on replay: no experience to sample")

        batch = random.sample(self.replay_memory, sample_size)

        for new_state,reward,running,state,action in batch:
            loss = q_update(state,new_state,reward,running)

        return loss # final loss of replay batch


In [None]:
# Shared experience buffer filled and sampled by the training loop.
replay_memory = ReplayMemory(length=REPLAY_LENGTH, batch_size=REPLAY_BATCH)

In [None]:
env = gym.make("CartPole-v1")
try:
    for episode in range(EPISODES):
        terminated = False
        truncated = False
        state,info = env.reset()
        reward_per_episode = 0
        # Observations are network inputs, not trainable values, so they do not
        # need gradients (and the replay memory should not hold on to them).
        state = torch.tensor(state,dtype=torch.float32,device=device)
        while not (terminated or truncated):
            exploration_rate_threshold = random.uniform(0,1)
            # E-greedy for exploration vs exploitation
            if exploration_rate_threshold > EPSILON:
                with torch.no_grad():
                    action = learning_policy(state).max(0)[1].item()
            else:
                action = random.randint(0,1)

            new_state,reward,terminated,truncated,info = env.step(action)

            new_state = torch.tensor(new_state,dtype=torch.float32,device=device)
            # Terminal flag for the value update. Only a true terminal state
            # counts: a time-limit truncation ends the episode loop, but the
            # state reached still has future value to bootstrap from.
            running = terminated

            replay_memory.add_experience(new_state,reward,running,state,action)

            loss = q_update(state,new_state,reward,running)

            state = new_state
            reward_per_episode += reward

        print(f"-------Episode:[{episode+1}/{EPISODES}]--------")
        print("Reward per Episode: ",reward_per_episode)
        print("Loss per Episode",loss.item())

        # Log the rate that was used during this episode, then decay it
        # exponentially from MAX_EXP_RATE toward MIN_EXP_RATE for the next one.
        writer.add_scalar("Exploration Rate",EPSILON,episode)
        EPSILON = (MAX_EXP_RATE-MIN_EXP_RATE) * np.exp(-EPSILON_DECAY*episode) + MIN_EXP_RATE
        writer.add_scalar("Reward per Episode",reward_per_episode,episode)
        writer.add_scalar("Loss per Episode",loss.item(),episode)

        # Replay training starts once more than one batch of experience is stored.
        if len(replay_memory.replay_memory) > REPLAY_BATCH:
            replay_loss = replay_memory.train_on_replay()
            writer.add_scalar("Replay Loss",replay_loss.item(),episode)

        if episode % 10 == 0:
            print("[Updating Policy]")
            update_target_policy(TAU)
finally:
    # Runs on normal completion, on errors and on a kernel interrupt alike, so
    # the environment is always released and pending scalars reach disk.
    writer.flush()
    env.close()

In [None]:
# Shut down pygame's display module.
# NOTE(review): presumably this closes a render window left open by the
# environment; no window is opened in the cells visible here, so confirm
# it is still needed.
pygame.display.quit() 