In [None]:
import gym
#import atari_py
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple
from itertools import count
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T  

from collections import deque

In [None]:
# True when matplotlib is using a backend whose name contains 'inline'.
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    # Only imported for inline backends; plot() uses it to clear the cell output.
    from IPython import display

In [None]:
# Gym environment id passed to gym.make() for evaluation.
TEST_ENV_NAME = "CartPole-v1"

# Training-only settings that no code visible in this notebook references;
# they stay disabled.
#NUM_EPISODES = 1000
#TARGET_UPDATE =25

# Read by Agent_class.experience_replay_n_train. They must be defined, otherwise
# calling that method raises NameError.
GAMMA = 0.99            # discount factor applied to the next-state value
LEARNING_RATE = 0.001   # learning rate handed to the Adam optimizer
BATCH_SIZE = 64         # number of transitions sampled per training step

# Maximum number of transitions kept in the agent's replay deque.
MEMORY_SIZE = 1000000

# Exploration schedule read by Agent_class.act:
#   rate = END + (START - END) * exp(-step * DECAY)
# With START == END == 0.0 the rate is 0 at every step, so actions are always greedy.
EXPLORATION_START = 0.0
EXPLORATION_END = 0.0
EXPLORATION_DECAY = 0.001

In [None]:
class DQN(nn.Module):
    """Fully connected Q-network: observation -> 128 -> 64 -> one value per action."""

    def __init__(self, observation_space_size, action_space_size):
        """Create the three linear layers.

        Args:
            observation_space_size: number of input features of the first layer.
            action_space_size: number of outputs of the last layer.
        """
        super().__init__()
        # Attribute names (fc1, fc2, out) are the parameter keys of the module,
        # and the creation order fixes which random numbers initialise which layer.
        self.fc1 = nn.Linear(observation_space_size, 128)
        self.fc2 = nn.Linear(128, 64)
        self.out = nn.Linear(64, action_space_size)

    def forward(self, t):
        """Return the network output for a batch of observations.

        Args:
            t: input tensor; every dimension after the first is flattened
               into one feature dimension before the first layer.

        Returns:
            Tensor with one row per input row and `action_space_size` columns.
        """
        hidden = t.flatten(start_dim=1)
        for layer in (self.fc1, self.fc2):
            hidden = F.relu(layer(hidden))
        return self.out(hidden)
    

In [None]:
def plot(moving_avg, values):
    """Redraw the training curve on figure 2 and print the latest statistics.

    Args:
        moving_avg: sequence drawn as the second line; its last element is printed.
        values: sequence drawn as the first line; its length and last element
            are printed, so it must not be empty.
    """
    plt.figure(2)
    plt.clf()
    plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    # Raw values first, then the moving average, so the line colours stay in that order.
    for series in (values, moving_avg):
        plt.plot(series)
    plt.pause(0.001)

    episode_count = len(values)
    latest_reward = str(values[-1])
    latest_average = moving_avg[-1]
    print("Episode:", episode_count, "\t Reward", latest_reward,
          "\n 100 episode moving avg:", latest_average)
    if is_ipython:
        display.clear_output(wait=True)

In [None]:
def get_moving_average(period, values):
    """Compute a trailing moving average over `values`.

    Args:
        period: window length.
        values: sequence of numbers.

    Returns:
        A float32 numpy array with one entry per input value. Entry i holds the
        mean of the `period` values ending at i; the first `period - 1` entries
        are 0. When fewer than `period` values are given, every entry is 0.
    """
    series = torch.tensor(values, dtype=torch.float)
    n_values = len(series)

    # Not enough data for a single full window yet.
    if n_values < period:
        return torch.zeros(n_values).numpy()

    windows = series.unfold(dimension=0, size=period, step=1)
    window_means = windows.mean(dim=1).flatten(start_dim=0)
    # Left-pad with zeros so the result lines up with the input positions.
    padded = torch.cat((torch.zeros(period - 1), window_means))
    return padded.numpy()

In [None]:
class Agent_class():
    """DQN agent: epsilon-greedy action selection plus replay-buffer training.

    Reads these module-level constants:
      * MEMORY_SIZE                                  (construction)
      * EXPLORATION_START / EXPLORATION_END / EXPLORATION_DECAY   (act)
      * BATCH_SIZE / GAMMA / LEARNING_RATE           (experience_replay_n_train)
    """

    def __init__(self, policy_net, target_net, observation_space_size, action_space_size):
        """Store the networks and create an empty replay buffer.

        Args:
            policy_net: network used to pick actions and the one that is trained.
            target_net: network used to evaluate next states during training.
            observation_space_size: stored on the agent; not used by its methods.
            action_space_size: number of discrete actions for random exploration.
        """
        self.policy_net = policy_net
        self.target_net = target_net
        self.observation_space_size = observation_space_size
        self.action_space_size = action_space_size
        self.current_step = 0
        self.memory = deque(maxlen=MEMORY_SIZE)
        # Built lazily on the first training step so that an evaluation-only
        # agent does not require LEARNING_RATE to be defined.
        self.optimizer = None

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay buffer (oldest entries drop off when full)."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Pick an action for `state` with an exponentially decaying epsilon-greedy rule.

        Args:
            state: input accepted by `policy_net`.

        Returns:
            A random action index while exploring, otherwise the argmax of the
            policy network's output (as returned by `np.argmax`).
        """
        exploration_rate = EXPLORATION_END + (EXPLORATION_START - EXPLORATION_END) \
            * math.exp(-1.0 * self.current_step * EXPLORATION_DECAY)
        self.current_step += 1
        if np.random.rand() < exploration_rate:
            return random.randrange(self.action_space_size)
        # Pure inference: no autograd graph is needed.
        with torch.no_grad():
            q_values = self.policy_net(state)
        # .cpu() keeps this working when the network and state live on a GPU,
        # where a direct .numpy() call raises.
        return np.argmax(q_values.detach().cpu().numpy())

    def experience_replay_n_train(self):
        """Run one optimisation step on a random mini-batch from the replay buffer.

        Does nothing until the buffer holds at least BATCH_SIZE transitions.
        Only `policy_net` is updated; `target_net` supplies the bootstrap values.
        """
        # random.sample raises ValueError when asked for more items than exist.
        if len(self.memory) < BATCH_SIZE:
            return

        state, action, reward, next_state, done = zip(*random.sample(self.memory, BATCH_SIZE))

        # Put the batch on the same device as the network being trained.
        device = next(self.policy_net.parameters()).device
        state = torch.as_tensor(np.concatenate(state), dtype=torch.float32, device=device)
        next_state = torch.as_tensor(np.concatenate(next_state), dtype=torch.float32, device=device)
        action = torch.as_tensor(np.asarray(action), dtype=torch.int64, device=device)
        reward = torch.as_tensor(np.asarray(reward), dtype=torch.float32, device=device)
        done = torch.as_tensor(np.asarray(done), dtype=torch.float32, device=device)

        # Use the agent's own networks rather than same-named globals.
        q_values = self.policy_net(state)
        q_value = q_values.gather(1, action.unsqueeze(1)).squeeze(1)

        # The target is a constant for the gradient step.
        with torch.no_grad():
            next_q_value = self.target_net(next_state).max(1)[0]
            # (1 - done) removes the bootstrap term for terminal transitions.
            expected_q_value = reward + GAMMA * next_q_value * (1 - done)

        # Reuse one optimizer across calls; rebuilding Adam on every step would
        # throw away its running moment estimates.
        optimizer = getattr(self, 'optimizer', None)
        if optimizer is None:
            optimizer = optim.Adam(params=self.policy_net.parameters(), lr=LEARNING_RATE)
            self.optimizer = optimizer

        loss = F.mse_loss(q_value, expected_q_value)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

In [None]:
# SECURITY: torch.load unpickles the file, which can execute arbitrary code.
# Only load checkpoints you created yourself or otherwise trust.
# NOTE(review): presumably this file holds a fully pickled DQN instance, so the
# DQN class must already be defined when this cell runs; recent PyTorch releases
# may also need weights_only=False to load such a file - confirm for your version.
MODEL_PATH = './CartPole-v0_or_v1_direct_values.pth'

# map_location lets a checkpoint saved on a GPU load on a CPU-only machine and
# places the tensors on the same kind of device the evaluation inputs use.
test_net = torch.load(
    MODEL_PATH,
    map_location=torch.device("cuda" if torch.cuda.is_available() else "cpu"),
)

In [None]:
# Build the evaluation environment and choose the compute device.
test_env = gym.make(TEST_ENV_NAME)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Every observation in the episode loop is moved to `device`, so the loaded
# network has to live there as well; otherwise its forward pass fails with a
# device-mismatch error.
test_net.to(device)
test_net.eval()  # inference only

test_observation_space_size = test_env.observation_space.shape[0]
test_action_space_size = test_env.action_space.n

# Nothing is trained here, so the same network fills both the policy and target slots.
test_agent = Agent_class(test_net, test_net, test_observation_space_size, test_action_space_size)

In [None]:
# Run a single rendered evaluation episode and report how many steps it lasted.
# NOTE(review): written against the Gym API where reset() returns only the
# observation and step() returns a 4-tuple - confirm the installed gym version.
test_timestep = 0
try:
    test_state = test_env.reset()
    # The network expects a batch dimension: (1, observation_size).
    test_state = test_state.reshape(1, test_observation_space_size)
    test_state = torch.from_numpy(test_state).float().to(device)

    while True:
        test_timestep += 1
        test_env.render()
        test_action = test_agent.act(test_state)

        test_next_state, test_reward, test_done, _ = test_env.step(test_action)
        # NOTE(review): the sign flip on the terminal reward is never read in this
        # cell; it looks like a leftover from the training loop.
        test_reward = test_reward if not test_done else -test_reward

        test_next_state = test_next_state.reshape(1, test_observation_space_size)
        test_next_state = torch.from_numpy(test_next_state).float().to(device)
        test_state = test_next_state
        if test_done:
            print("the agent got a reward of "+str(test_timestep))
            break
finally:
    # Always release the environment (and its render window), even when a step
    # raises or the cell is interrupted.
    test_env.close()