In [1]:
# imports
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam,SGD
from collections import deque
import random
from matplotlib import pyplot as plt
import copy
import numpy as np
import gym
from torchsummary import summary
import warnings
warnings.filterwarnings('ignore')

# Seed every RNG this notebook draws from, not just torch, so runs are repeatable.
# NOTE(review): gym's own sampler (env.action_space.sample) is not seeded here.
SEED = 33
random.seed(SEED)     # replay-buffer sampling uses random.choices
np.random.seed(SEED)  # epsilon-greedy coin flips use np.random.random
torch.manual_seed(SEED)  # network weight initialisation; kept last so the cell still displays the Generator

<torch._C.Generator at 0x7fb0c6914110>

In [2]:
# Build the environment once and echo its repr as the cell output.
ENV_ID = "MountainCar-v0"
env = gym.make(ENV_ID)
env

<TimeLimit<MountainCarEnv<MountainCar-v0>>>

In [3]:
# Inspect the observation and action spaces exposed by the environment.
obs_space, act_space = env.observation_space, env.action_space
print(f"Observation space: {obs_space}")
print(f"Action space: {act_space}")

Observation space: Box([-1.2  -0.07], [0.6  0.07], (2,), float32)
Action space: Discrete(3)


In [4]:
# Smoke test: reset, draw one random action, and take a single step.
obs = env.reset()
act = env.action_space.sample()
transition = env.step(act)  # (next observation, reward, done flag, info)
ns, r, d, _ = transition
print(f"Initial observation: {obs}\nAction: {act}\nNew observation: {ns,r,d}")

Initial observation: [-0.45576322  0.        ]
Action: 1
New observation: (array([-0.4562685 , -0.00050526], dtype=float32), -1.0, False)


In [11]:
# create NN for model; defaults give 2 inputs and 3 outputs
class Net(nn.Module):
    """Small fully connected Q-network: input -> hidden -> hidden -> outputs.

    Args:
        n_inputs: Size of the input vector (default 2).
        n_hidden: Width of both hidden layers (default 35).
        n_actions: Number of output values, one per action (default 3).

    The attribute names ``fc1``, ``fc5`` and ``fc4`` and their creation order
    are kept as they were so that ``state_dict`` keys and seeded weight
    initialisation stay the same for the default sizes.
    """

    def __init__(self, n_inputs=2, n_hidden=35, n_actions=3):
        super().__init__()
        self.fc1 = nn.Linear(n_inputs, n_hidden)
        self.fc5 = nn.Linear(n_hidden, n_hidden)
        self.fc4 = nn.Linear(n_hidden, n_actions)

    def forward(self, x):
        """Return the raw (un-activated) output of the last layer for ``x``."""
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc5(x))
        return self.fc4(x)

In [12]:
# Online (learning) network plus a deep copy, switched to eval mode, used as the target network.
DQNet = Net()
TNet = copy.deepcopy(DQNet).eval()  # eval() returns the module itself
TNet

Net(
  (fc1): Linear(in_features=2, out_features=35, bias=True)
  (fc5): Linear(in_features=35, out_features=35, bias=True)
  (fc4): Linear(in_features=35, out_features=3, bias=True)
)

In [13]:
# check summary of nets
# torchsummary defaults to device="cuda" and builds CUDA inputs whenever a GPU is
# visible; the nets here are never moved off the CPU, so request CPU explicitly.
summary(TNet,(1,2),device="cpu")
summary(DQNet,(1,2),device="cpu")

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Linear-1                [-1, 1, 35]             105
            Linear-2                [-1, 1, 35]           1,260
            Linear-3                 [-1, 1, 3]             108
Total params: 1,473
Trainable params: 1,473
Non-trainable params: 0
----------------------------------------------------------------
Input size (MB): 0.00
Forward/backward pass size (MB): 0.00
Params size (MB): 0.01
Estimated Total Size (MB): 0.01
----------------------------------------------------------------
----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Linear-1                [-1, 1, 35]             105
            Linear-2                [-1, 1, 35]           1,260
            Linear-3                 [-1, 1, 3]             108
Total params: 1,473
Trainable params: 1,473
N

In [14]:
# Sanity check: feed the same input to both networks and compare the printed outputs.
test_t = torch.tensor([1.0, 2.0])
print(f"test tensor: {test_t}\nDQNet output: {DQNet(test_t)}\nTNet output: {TNet(test_t)}")

test tensor: tensor([1., 2.])
DQNet output: tensor([-0.1599,  0.0847,  0.0474], grad_fn=<AddBackward0>)
TNet output: tensor([-0.1599,  0.0847,  0.0474], grad_fn=<AddBackward0>)


In [15]:
# ---- hyperparameters and bookkeeping containers ----

# optimisation
lr = 1e-3
optimizer = Adam(DQNet.parameters(), lr=lr)
loss_fn = nn.MSELoss()
gamma = 0.95  # discount factor used in the Bellman target

# epsilon-greedy schedule: eps is multiplied by eps_decay after each training call
eps_start = 1
eps_end = 1e-4
eps_decay = 0.9999

# run length
episodes = 2_200
# try smaller mini batch size; original 128
mini_batch_size = 72
# maybe change update freq to smaller value ~15
update_freq = 20  # target net is re-synced on episodes where i % update_freq == 0

# bookkeeping
loss_value, rewards = 0, 0
loss_history, scores = [], []

In [16]:
# training function
def training(Qnet,t_net,replay_memory,optimizer,loss_fn,mini_batch_size=32,discount=None):
    """Run one round of per-sample Q-learning updates on ``Qnet``.

    ``mini_batch_size`` transitions are drawn from ``replay_memory`` with
    replacement, and the optimizer takes one step per drawn transition.

    Args:
        Qnet: Network being trained; called on a single state vector.
        t_net: Target network; called on the next state with a leading batch
            dimension of 1. It is only read, never updated.
        replay_memory: Non-empty sequence of
            ``(state, action, reward, next_state, done)`` transitions.
        optimizer: Optimizer that steps the parameters of ``Qnet``.
        loss_fn: Callable ``loss_fn(prediction, target)`` returning a scalar tensor.
        mini_batch_size: Number of transitions to draw (at least 1).
        discount: Discount factor for the bootstrap term. ``None`` (default)
            falls back to the notebook-level ``gamma``.

    Returns:
        ``(Qnet, loss_value)`` where ``loss_value`` is the float loss of the
        last transition processed.

    Raises:
        ValueError: If ``replay_memory`` is empty or ``mini_batch_size < 1``.
    """
    if len(replay_memory) == 0:
        raise ValueError("replay_memory is empty; collect transitions before training")
    if mini_batch_size < 1:
        raise ValueError("mini_batch_size must be at least 1")
    if discount is None:
        discount = gamma  # notebook-level hyperparameter

    # sample random transitions from the buffer to break temporal correlation
    observations = random.choices(replay_memory,k=mini_batch_size)

    Qnet.train()

    for state, action, reward, next_state, done in observations:
        state = torch.as_tensor(state, dtype=torch.float32)
        # force float32 so an integer reward cannot produce an int64 target
        reward = torch.tensor(float(reward), dtype=torch.float32)

        # Q(s, a) for the action that was actually taken
        expected_value = Qnet(state)[int(action)]

        # Bellman target; built without autograd so backward() never reaches t_net
        if done:
            target_value = reward
        else:
            with torch.no_grad():
                next_state = torch.as_tensor(next_state, dtype=torch.float32).unsqueeze(0)
                target_value = reward + discount * t_net(next_state).max()

        loss = loss_fn(expected_value, target_value)
        loss_value = loss.item()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    return Qnet, loss_value

In [17]:
# explore or exploit
REPLAY_CAPACITY = 100000 # maximum number of transitions kept in the replay buffer
MIN_REPLAY_SIZE = 5000 # training starts once the buffer holds more than this


def save_checkpoint(net, prefix):
    """Save ``net`` as a pickled module and, separately, as a ``state_dict``.

    Writes ``<prefix>mountaincarDQN.pth`` (full module) and
    ``<prefix>mountaincarDQN_parameters.pth`` (parameters only), so the two
    saves can never overwrite each other.
    """
    torch.save(net, f'{prefix}mountaincarDQN.pth')
    torch.save(net.state_dict(), f'{prefix}mountaincarDQN_parameters.pth')


eps = eps_start # initial epsilon value
replay = [] # replay buffer
count_good = 0 # count how many successful trials for training (not updated in this cell)

for i in range(episodes):
    # reset env for a new start state; cast so the net always sees float32
    s = torch.from_numpy(env.reset()).float()

    done = False # bool for terminal state
    rewards = 0

    while not done: # while not in terminal state
        if np.random.random() < eps:
            # explore: random action
            a = int(env.action_space.sample())
        else:
            # exploit: greedy action; no autograd graph is needed for acting
            DQNet.eval() # test mode
            with torch.no_grad():
                a = int(torch.argmax(DQNet(s)))

        new_state,reward,done,_ = env.step(a)

        # actions are stored as plain ints so every replay entry has the same layout
        replay.append((s.tolist(),a,reward,new_state.tolist(),done))

        # limit replay buffer length
        if len(replay) > REPLAY_CAPACITY:
            replay.pop(0)

        # accumulate rewards
        rewards += reward

        s = torch.from_numpy(new_state).float()

        # re-sync the target net at the end of every update_freq-th episode
        if done and i % update_freq == 0:
            TNet = copy.deepcopy(DQNet)
            TNet.eval()
            print('TNET UPDATED')

        if len(replay) > MIN_REPLAY_SIZE:
            DQNet, loss = training(DQNet,TNet,replay,optimizer,loss_fn,mini_batch_size)
            loss_history.append(loss)
            eps = max(eps*eps_decay,eps_end)

    position_final = new_state[0]

    scores.append(rewards)

    # checkpoint by score band; an episode above -75 is saved as both 'super' and 'sub100'
    if rewards > -75:
        save_checkpoint(DQNet,'super')
        print(i)
    if rewards > -100:
        save_checkpoint(DQNet,'sub100')
        print(i)
    if -200 < rewards <= -150:
        save_checkpoint(DQNet,'sub200')
        print(i)
    if -150 < rewards <= -100:
        save_checkpoint(DQNet,'sub150')
        print(i)

    print(f"Episode{i}/{episodes} Rewards: {rewards} Position: {position_final} Buffer Length: {len(replay)} and eps: {eps}")

TNET UPDATED
Episode0/2200 Rewards: -200.0 Position: -0.574011504650116 Buffer Length: 200 and eps: 1
Episode1/2200 Rewards: -200.0 Position: -0.5553432703018188 Buffer Length: 400 and eps: 1
Episode2/2200 Rewards: -200.0 Position: -0.45360806584358215 Buffer Length: 600 and eps: 1
Episode3/2200 Rewards: -200.0 Position: -0.4906008541584015 Buffer Length: 800 and eps: 1
Episode4/2200 Rewards: -200.0 Position: -0.6190565824508667 Buffer Length: 1000 and eps: 1
Episode5/2200 Rewards: -200.0 Position: -0.5925874710083008 Buffer Length: 1200 and eps: 1
Episode6/2200 Rewards: -200.0 Position: -0.6824590563774109 Buffer Length: 1400 and eps: 1
Episode7/2200 Rewards: -200.0 Position: -0.6409192085266113 Buffer Length: 1600 and eps: 1
Episode8/2200 Rewards: -200.0 Position: -0.5596449971199036 Buffer Length: 1800 and eps: 1
Episode9/2200 Rewards: -200.0 Position: -0.7457220554351807 Buffer Length: 2000 and eps: 1
Episode10/2200 Rewards: -200.0 Position: -0.5493726134300232 Buffer Length: 2200 

KeyboardInterrupt: 

In [11]:
# # Test the model (disabled; uncomment to run 10 greedy episodes with the checkpoint loaded below)
# env = gym.make('MountainCar-v0')
# model_test = torch.load('sub150mountaincarDQN.pth')
# for e in range(10):
#     sta = env.reset()
#     sta = torch.from_numpy(sta)
#     done = False
#     i = 0
#     while not done:
#         env.render()
#         action = torch.argmax(model_test(sta))
#         new_sta, rew, done, _ = env.step(action.item())
#         sta = torch.from_numpy(new_sta)
#         i += 1
#         if done:
#             print(e, i)
#             #break


0 200
1 200
2 200
3 200
4 200
5 200
6 200
7 200
8 200
9 200
