In [None]:
%pip install -r requirements.txt
%pip install pyreadline
%config Completer.use_jedi = False
import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy
import random
from gym import wrappers
import torch.optim as optim
import math
from torch.autograd import Variable
import matplotlib.pyplot as plt
%matplotlib inline


In [None]:
# Gym environment id handed to gym.make() below.
Env_Name='CartPole-v1'

# Number of transitions drawn from replay memory for each learn() call.
Batch_Size=64
# Capacity handed to ReplayMemory.
Replay_Memory_Size=1000000
# NOTE(review): not referenced anywhere in the visible code -- no separate
# target network is created or synchronised. Confirm whether one was intended.
Target_Network_Update_Frequency=10
# Weight applied to the maximum next-state Q-value when building the target.
Discount_Factor=0.95
# Step size handed to the Adam optimizer.
Learning_Rate=0.001
# Starting value of the exploration rate (epsilon) used by select_action().
Initial_Exploration=1.0
# Lower bound that epsilon is clamped to.
Final_Exploration=0.005
# Decay factor applied to the exploration rate.
Exploration_Decay=0.9999
# Number of episodes run by the training loop.
Episodes=5000
# Default width of the first hidden layer of DQN; later layers are derived from it.
Hidden_Layer_Size=256


# Use CUDA when torch reports it as available, otherwise fall back to the CPU.
device=torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("device=",device)

# Single environment instance shared by the whole notebook.
env=gym.make(Env_Name)
#env=wrappers.Monitor(env,'./tmp/cartpole-v0-0',force=True)
#observation=env.reset()

# Length of the first axis of the observation space; used as the network input size.
Input_Shape=env.observation_space.shape[0]
# `n` of the action space; used as the network output size.
# (presumably a Discrete space, where `n` is the number of actions -- verify if Env_Name changes)
Action_Shape=env.action_space.n
#print('Input Shape=',Input_Shape)
#print('Action Shape=',Action_Shape)


In [None]:

class ReplayMemory:
    """Fixed-capacity store of transitions for experience replay.

    Transitions are kept in ``self.memory`` (a plain list). While the buffer
    is below ``capacity`` new items are appended; once it is full each push
    overwrites the oldest stored item in place. This keeps ``push`` O(1)
    instead of shifting the whole list with ``del self.memory[0]``.

    Note: after the buffer has wrapped around, ``self.memory`` is no longer
    in insertion order. Sampling is uniform, so this does not affect it.
    """

    def __init__(self,capacity):
        """Create an empty buffer that retains at most ``capacity`` items."""
        self.capacity=capacity
        self.memory=[]
        # Slot holding the oldest item; only meaningful once the buffer is full.
        self._oldest=0

    def push(self,transition):
        """Store ``transition``, evicting the oldest entry when full."""
        if self.capacity<=0:
            # A non-positive capacity never retains anything.
            return
        if len(self.memory)<self.capacity:
            self.memory.append(transition)
            return
        # Full: overwrite the oldest slot and advance the ring pointer.
        self.memory[self._oldest]=transition
        self._oldest+=1
        if self._oldest>=len(self.memory):
            self._oldest=0

    def sample(self,batch_size):
        """Return ``batch_size`` distinct stored transitions chosen at random.

        Raises:
            ValueError: if ``batch_size`` exceeds the number of stored items
                (propagated from ``random.sample``).
        """
        return random.sample(self.memory,batch_size)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

In [None]:

class DQN(nn.Module):
    """Fully connected Q-network with four ReLU hidden layers.

    The first hidden layer has ``hidden_layer_shape`` units and each following
    hidden layer has half the units of the previous one (integer division).
    The output layer is linear and emits ``action_shape`` values.
    """

    def __init__(self,input_shape=4,hidden_layer_shape=Hidden_Layer_Size,action_shape=2):
        super().__init__()
        # Successively halved hidden widths.
        half=hidden_layer_shape//2
        quarter=half//2
        eighth=quarter//2
        # Layers keep the names l1..l5 and are built in that order.
        self.l1=nn.Linear(input_shape,hidden_layer_shape)
        self.l2=nn.Linear(hidden_layer_shape,half)
        self.l3=nn.Linear(half,quarter)
        self.l4=nn.Linear(quarter,eighth)
        self.l5=nn.Linear(eighth,action_shape)

    def forward(self,x):
        """Map a batch of observations to one value per action."""
        hidden=x
        for layer in (self.l1,self.l2,self.l3,self.l4):
            hidden=F.relu(layer(hidden))
        # No activation on the output layer.
        return self.l5(hidden)

# class DQN(nn.Module):
#     def __init__(self,input_shape=4,action_shape=2):
#         super(DQN,self).__init__()
#         self.l1=nn.Linear(input_shape,24)
#         self.l2=nn.Linear(24,24)
#         self.l3=nn.Linear(24,action_shape)
    
#     def forward(self,x):
#         x=F.relu(self.l1(x))
#         x=F.relu(self.l2(x))
#         x=self.l3(x)
#         return x

In [None]:

# Q-network sized from the environment's spaces, moved to the selected device.
model=DQN(input_shape=Input_Shape,action_shape=Action_Shape).to(device)

# Adam over all network parameters.
optimizer=optim.Adam(model.parameters(),lr=Learning_Rate)

# Replay buffer shared by run_episode() (writer) and learn() (reader).
memory=ReplayMemory(capacity=Replay_Memory_Size)

# Bookkeeping: per-episode step counts and the number of select_action() calls.
episode_durations=[]
steps_done=0


In [None]:
def select_action(state):
    """Pick an action for ``state`` using an epsilon-greedy policy.

    Epsilon starts at ``Initial_Exploration`` and is multiplied by
    ``Exploration_Decay`` once per call (counted in the global
    ``steps_done``), clamped below by ``Final_Exploration``. With probability
    epsilon a uniformly random action is returned; otherwise the action with
    the highest value predicted by ``model``.

    Args:
        state: a single observation, in any form ``torch.as_tensor`` accepts
            (e.g. a NumPy array).

    Returns:
        A ``(1, 1)`` ``int64`` tensor on ``device`` holding the action index.
    """
    global steps_done
    sample=random.random()

    # The previous formula, Initial_Exploration*Exploration_Decay, ignored
    # steps_done, so epsilon stayed at a constant ~1.0 and the agent never
    # stopped exploring. Apply the decay once per step taken so far.
    eps_threshold=Initial_Exploration*(Exploration_Decay**steps_done)
    eps_threshold=max(eps_threshold,Final_Exploration)
    steps_done+=1

    if sample>eps_threshold:
        # Greedy branch: inference only, so skip autograd bookkeeping.
        with torch.no_grad():
            state_batch=torch.as_tensor(state,dtype=torch.float32,device=device).unsqueeze(0)
            return model(state_batch).max(1)[1].view(1,1)

    # Exploration branch: draw from every available action rather than a
    # hard-coded count of 2.
    return torch.tensor([[random.randrange(Action_Shape)]],dtype=torch.int64,device=device)

In [None]:

def run_episode(e,env,render=True):
    """Play one episode, storing each transition and training after every step.

    Each step pushes ``(state, action, next_state, reward)`` into ``memory``
    as tensors on ``device`` and then calls ``learn()``. On the step that ends
    the episode the reward is overwritten with ``-1``. When the episode ends
    its length is appended to ``episode_durations`` and the plot is refreshed.

    Args:
        e: episode index; used only in the progress message.
        env: Gym environment to interact with.
        render: when true (the default, matching the previous behaviour)
            ``env.render()`` is called before every step. Pass ``False`` to
            train without rendering.
    """
    reset_result=env.reset()
    # Newer Gym releases return (observation, info) from reset(); older ones
    # return the observation alone.
    state=reset_result[0] if isinstance(reset_result,tuple) else reset_result
    steps=0
    while True:
        if render:
            env.render()
        action=select_action(state)
        step_result=env.step(action[0,0].item())
        if len(step_result)==5:
            # Newer Gym API: separate terminated / truncated flags.
            next_state,reward,terminated,truncated,_=step_result
            done=terminated or truncated
        else:
            next_state,reward,done,_=step_result

        if done:
            reward=-1

        # Convert each observation directly (always a copy) and add the batch
        # axis afterwards; wrapping the array in a list first goes through
        # PyTorch's slow list-of-ndarray path on every step.
        memory.push((torch.tensor(state,dtype=torch.float32,device=device).unsqueeze(0),
                    action.clone().detach(),
                    torch.tensor(next_state,dtype=torch.float32,device=device).unsqueeze(0),
                    torch.tensor([reward],dtype=torch.float32,device=device)))
        learn()
        state=next_state
        steps+=1
        if done:
            print("{2} Episode {0} finished after {1} steps".format(e,steps,'\033[92m' if steps>=195
                                                                    else '\033[99m'))
            episode_durations.append(steps)
            plot_durations()
            break


In [None]:
def learn():
    """Run one optimisation step on a random minibatch from replay memory.

    Returns immediately while ``memory`` holds fewer than ``Batch_Size``
    transitions. Otherwise samples a batch, regresses the Q-value of each
    taken action towards ``reward + Discount_Factor * max_a Q(next_state, a)``
    using the smooth L1 loss, and applies one optimizer update.
    """
    if len(memory)<Batch_Size:
        return

    # Turn the list of (state, action, next_state, reward) tuples into one
    # tuple per field, then stack each field along the batch axis.
    states,actions,next_states,rewards=zip(*memory.sample(Batch_Size))
    batch_state=torch.cat(states)
    batch_action=torch.cat(actions)
    batch_next_state=torch.cat(next_states)
    batch_reward=torch.cat(rewards)

    # Q-value the network currently assigns to the action actually taken.
    current_q_values=model(batch_state).gather(1,batch_action).squeeze(1)

    # NOTE(review): the target is computed with the same `model` being trained
    # (no separate target network), and bootstraps from next_state for every
    # transition -- the stored tuple carries no end-of-episode flag. Confirm
    # whether this is intended.
    with torch.no_grad():
        max_next_q_values=model(batch_next_state).max(1)[0]
    expected_q_values=batch_reward+(Discount_Factor*max_next_q_values)

    loss=F.smooth_l1_loss(current_q_values,expected_q_values)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

In [None]:
def plot_durations():
    """Redraw figure 2 with episode lengths and their 100-episode moving mean.

    Reads the global ``episode_durations``. The moving mean is drawn only once
    at least 100 episodes are recorded, and is left-padded with zeros so it
    lines up with the raw curve.
    """
    window=100
    durations=torch.tensor(episode_durations,dtype=torch.float32)

    plt.figure(2)
    plt.clf()
    plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations.numpy())

    if len(durations)>=window:
        # One mean per sliding window of `window` consecutive episodes.
        rolling=durations.unfold(0,window,1).mean(1).view(-1)
        padded=torch.cat((torch.zeros(window-1),rolling))
        plt.plot(padded.numpy())

    # Brief pause so the figure gets a chance to refresh.
    plt.pause(0.001)


In [None]:
# Train for the configured number of episodes. The cleanup sits in `finally`
# so the environment is closed and the figure is finalised even when training
# is interrupted (e.g. Kernel -> Interrupt) or an episode raises.
try:
    for e in range(Episodes):
        run_episode(e, env)

    print('Complete')
    env.render()
finally:
    env.close()
    plt.ioff()
    plt.show()