In [1]:
import gymnasium as gym
import pygame
from gymnasium.utils.play import play
# from gynasium.utils.play import pl
import torch
import torch.nn as nn
import numpy as np
import torch.nn.functional as F
from torch.utils.data import DataLoader
import matplotlib
import matplotlib.pyplot as plt
import random
%matplotlib inline

# Use a white background for every matplotlib figure drawn in this notebook.
matplotlib.rcParams['figure.facecolor'] = '#ffffff'


In [2]:
# Create a CartPole-v1 environment that renders to a window (render_mode="human").
# NOTE(review): train() and test() each build their own environment, so this
# module-level instance appears unused in the visible cells -- confirm before removing.
env = gym.make("CartPole-v1",render_mode="human")

In [3]:
class Q_Network(nn.Module):
    """Small fully connected network mapping a state vector to one Q-value per action.

    With the default arguments the architecture is
    Linear(4, 6) -> ReLU -> Linear(6, 4) -> ReLU -> Linear(4, 2), and the
    parameters keep the ``network.<index>.weight/bias`` names, so previously
    saved state dicts still load.

    Args:
        state_dim: Number of features in the input state (default 4).
        hidden_dims: Widths of the hidden layers, in order (default (6, 4)).
            An empty sequence yields a single linear layer.
        n_actions: Number of output Q-values (default 2).
    """

    def __init__(self, state_dim=4, hidden_dims=(6, 4), n_actions=2):
        super().__init__()
        layers = []
        in_features = state_dim
        for width in hidden_dims:
            layers.append(nn.Linear(in_features, width))
            layers.append(nn.ReLU())
            in_features = width
        # No activation after the last layer: Q-values are unbounded.
        layers.append(nn.Linear(in_features, n_actions))
        self.network = nn.Sequential(*layers)

    def forward(self, state):
        """Return the Q-values for ``state``; the last dimension must be ``state_dim``."""
        return self.network(state)
            
        

In [4]:
def mse(t1,t2):
    """Return the mean squared difference between two tensors as a 0-dim tensor.

    Each squared residual is divided by the element count before the values
    are summed.
    """
    residual = t1 - t2
    scaled_squares = residual.mul(residual).div(residual.numel())
    return scaled_squares.sum()

In [9]:
def train(epochs,lr,model,rm_size,batch_size,max_steps,gamma,opt_func=torch.optim.SGD,
          target_sync_every=100,render_mode="human"):
    """Train ``model`` on CartPole-v1 with DQN (replay memory + target network).

    Each environment step stores a transition, then performs one optimizer
    step on a minibatch drawn uniformly from the replay memory. Exploration is
    epsilon-greedy with epsilon decayed exponentially per episode from 1.0
    towards 0.05.

    Args:
        epochs: Number of episodes to run.
        lr: Learning rate handed to ``opt_func``.
        model: Q-network to optimise in place; maps a state to per-action Q-values.
        rm_size: Replay-memory capacity; the oldest transition is evicted when full.
        batch_size: Number of transitions per training minibatch. Training
            starts once the memory holds at least this many transitions.
        max_steps: Upper bound on environment steps per episode.
        gamma: Discount factor for the bootstrapped next-state value.
        opt_func: Optimizer factory called as ``opt_func(model.parameters(), lr)``.
        target_sync_every: Number of optimizer steps between copies of the
            online weights into the frozen target network (minimum 1).
        render_mode: Passed to ``gym.make``; use ``None`` to train without a window.

    Returns:
        None. The reward of every episode is printed as ``<epoch> : <reward>``.
    """
    # Local imports keep this cell self-contained without touching the import cell.
    import copy
    from collections import deque

    max_epsilon = 1.0             # Exploration probability at start
    min_epsilon = 0.05            # Floor the exploration probability decays towards
    decay_rate = 0.0005           # Exponential decay rate, applied per episode
    epsilon = max_epsilon

    optimizer = opt_func(model.parameters(),lr)
    # deque(maxlen=...) drops the oldest transition automatically once full.
    replay_memory = deque(maxlen=rm_size)

    # A real copy of the network: assigning `model` itself would only alias it,
    # so the "target" would move with every gradient step.
    target_model = copy.deepcopy(model)
    target_model.eval()
    for frozen_param in target_model.parameters():
        frozen_param.requires_grad_(False)
    sync_interval = max(1, int(target_sync_every))
    updates_done = 0

    env = gym.make("CartPole-v1",render_mode=render_mode)
    try:
        for epoch in range(epochs):
            observation, info = env.reset()
            state = torch.as_tensor(observation, dtype=torch.float32)
            total_reward = 0

            for step in range(max_steps):
                # Sampling phase: epsilon-greedy action selection.
                if random.uniform(0, 1) > epsilon:
                    with torch.no_grad():  # acting needs no autograd graph
                        action = torch.argmax(model(state)).item()
                else:
                    action = int(env.action_space.sample())

                observation, reward, done, truncated, info = env.step(action)
                new_state = torch.as_tensor(observation, dtype=torch.float32)
                total_reward += reward

                # Store the termination flag so terminal transitions are not
                # bootstrapped. A time-limit truncation is not a true terminal state.
                replay_memory.append((state, action, float(reward), new_state, bool(done)))

                # Training phase: runs before the episode-end check so the final
                # (failing) transition is also learned from.
                if batch_size > 0 and len(replay_memory) >= batch_size:
                    batch = random.sample(replay_memory, batch_size)
                    states, actions, rewards, next_states, terminals = zip(*batch)

                    state_batch = torch.stack(states)
                    action_batch = torch.tensor(actions, dtype=torch.int64).unsqueeze(1)
                    reward_batch = torch.tensor(rewards, dtype=torch.float32)
                    next_state_batch = torch.stack(next_states)
                    not_terminal = 1.0 - torch.tensor(terminals, dtype=torch.float32)

                    # The TD target is a constant for the update: no gradient
                    # may flow through it.
                    with torch.no_grad():
                        next_q = target_model(next_state_batch).max(dim=1).values
                        target_q = reward_batch + gamma * not_terminal * next_q

                    # Q-value of the action that was actually taken.
                    predicted_q = model(state_batch).gather(1, action_batch).squeeze(1)
                    loss = F.mse_loss(predicted_q, target_q)

                    # One step on the mean minibatch loss.
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()

                    updates_done += 1
                    if updates_done % sync_interval == 0:
                        target_model.load_state_dict(model.state_dict())

                if done or truncated:
                    break

                # State change
                state = new_state

            # Printed for every episode, including those that hit max_steps.
            print(epoch,":",total_reward)
            epsilon = min_epsilon + (max_epsilon - min_epsilon)*np.exp(-decay_rate*epoch)
    finally:
        # Release the render window even if training is interrupted.
        env.close()

In [6]:
# Instantiate the Q-network; the bare `model` expression echoes its layer
# structure as the cell output.
model = Q_Network()
model

Q_Network(
  (network): Sequential(
    (0): Linear(in_features=4, out_features=6, bias=True)
    (1): ReLU()
    (2): Linear(in_features=6, out_features=4, bias=True)
    (3): ReLU()
    (4): Linear(in_features=4, out_features=2, bias=True)
  )
)

In [7]:
# Hyperparameters passed positionally to train().
epochs=500        # number of training episodes
# The exploration schedule is hard-coded inside train(); the commented-out
# values below are not read anywhere.
# epsilon=1.0
# max_epsilon = 1.0             # Exploration probability at start
# min_epsilon = 0.2 
max_steps=300     # cap on environment steps per episode
gamma=0.7         # discount factor applied to the bootstrapped next-state value
lr = 0.003        # optimizer learning rate
rm_size=60        # replay-memory capacity (oldest transition is evicted when full)
batch_size=10     # transitions drawn from the replay memory after each environment step

In [8]:

# Train `model` in place with the default optimizer (torch.optim.SGD);
# train() prints the reward of each episode as "<epoch> : <reward>".
train(epochs,lr,model,rm_size,batch_size,max_steps,gamma)

0 : 21.0
1 : 9.0
2 : 20.0
3 : 12.0
4 : 20.0
5 : 40.0
6 : 14.0
7 : 26.0
8 : 38.0
9 : 29.0
10 : 25.0
11 : 20.0
12 : 14.0
13 : 20.0
14 : 20.0
15 : 34.0
16 : 25.0
17 : 27.0
18 : 35.0
19 : 18.0
20 : 12.0
21 : 14.0
22 : 24.0
23 : 51.0
24 : 15.0
25 : 74.0
26 : 40.0
27 : 10.0
28 : 18.0
29 : 10.0
30 : 15.0
31 : 19.0
32 : 24.0
33 : 18.0
34 : 38.0
35 : 33.0
36 : 46.0
37 : 48.0
38 : 30.0
39 : 25.0
40 : 13.0
41 : 20.0
42 : 36.0
43 : 32.0
44 : 24.0
45 : 40.0
46 : 10.0
47 : 22.0
48 : 9.0
49 : 49.0
50 : 12.0
51 : 14.0
52 : 25.0
53 : 11.0
54 : 20.0
55 : 19.0
56 : 15.0
57 : 22.0
58 : 18.0
59 : 15.0
60 : 13.0
61 : 16.0
62 : 24.0
63 : 24.0
64 : 10.0
65 : 17.0
66 : 9.0
67 : 31.0
68 : 10.0
69 : 10.0
70 : 15.0
71 : 29.0
72 : 25.0
73 : 14.0
74 : 23.0
75 : 43.0
76 : 13.0
77 : 15.0
78 : 29.0
79 : 15.0
80 : 11.0
81 : 22.0
82 : 17.0
83 : 11.0
84 : 19.0
85 : 27.0
86 : 14.0
87 : 26.0
88 : 14.0
89 : 25.0
90 : 12.0
91 : 29.0
92 : 15.0
93 : 10.0
94 : 17.0
95 : 36.0
96 : 10.0
97 : 19.0
98 : 21.0
99 : 26.0
100 : 22.0
10

In [12]:
def test(model,episodes):
    """Run one greedy (no exploration) CartPole-v1 episode and report its reward.

    Args:
        model: Q-network mapping a state tensor to per-action Q-values.
        episodes: Maximum number of environment steps to take. Despite the
            name, a single episode is played; the name is kept for existing callers.

    Returns:
        The total reward collected, which is also printed.
    """
    env = gym.make("CartPole-v1",render_mode="human")
    total_reward = 0
    try:
        observation, info = env.reset()
        state = torch.from_numpy(observation)
        for step in range(episodes):
            # Evaluation only: skip building an autograd graph.
            with torch.no_grad():
                action = torch.argmax(model(state)).item()
            observation, reward, done, truncated, info = env.step(action)
            total_reward += reward
            if done or truncated:
                break
            state = torch.from_numpy(observation)
    finally:
        # Always release the render window; otherwise every call leaks one.
        env.close()
    # Printed even when the step cap is reached before the episode ends.
    print(total_reward)
    return total_reward
    

In [15]:
# Evaluate the greedy policy 20 times. Each test() call plays a single episode;
# its second argument (100) is the cap on environment steps, not an episode count.
for i in range(20):
    print(i)
    test(model,100)


0
9.0
1
9.0
2
8.0
3
9.0
4
12.0
5
9.0
6
9.0
7
10.0
8
10.0
9
10.0
10
9.0
11
9.0
12
10.0
13
9.0
14
8.0
15
10.0
16
10.0
17
8.0
18
10.0
19
8.0
