In [42]:
import numpy as np
import gym
import torch
import torch.nn as nn

In [43]:
class Agent:
    """Epsilon-greedy DQN agent that learns from episodes stored in a ReplayBuffer.

    The replay buffer holds whole episodes (dicts with per-step lists under the
    keys "states", "new_states", "actions" and "rewards"). `step` draws
    individual transitions out of those episodes to build a training batch.

    Args:
        action_space: Number of discrete actions.
        state_space: Length of the observation vector.
        batch_size: Transitions per gradient step; also the minimum number of
            stored episodes required before learning starts.
        learning_rate: Learning rate handed to the DQN's optimizer.
        RB_size: Maximum number of episodes the replay buffer keeps.
        gamma: Discount factor.
        epsilon: Initial exploration probability.
        epsilon_min: Lower bound for the exploration probability.
        epsilon_decrement: Amount subtracted from epsilon per learning step.
        hidden_size: Width of the DQN's hidden layers.
    """

    def __init__(self, action_space, state_space, batch_size=256, learning_rate=0.01,
                 RB_size=100000, gamma=0.99, epsilon=1, epsilon_min=0.01,
                 epsilon_decrement=1e-4, hidden_size=256):
        # Hyperparameters (defaults match the previously hard-coded values).
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.RB_size = RB_size
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decrement = epsilon_decrement

        self.action_space = np.arange(action_space)

        # Deep Q Network
        self.dqn = DQN(self.learning_rate, action_space, state_space, hidden_size)
        # Replay Buffer
        self.rb = ReplayBuffer(self.RB_size)

    def choose_action(self, state):
        """Return an action for `state`: random with probability epsilon, else greedy.

        Args:
            state: A single observation (array-like of length `state_space`).

        Returns:
            The index of the chosen action.
        """
        if np.random.random() < self.epsilon:
            return np.random.choice(self.action_space)

        # Tensor.to() needs a device/dtype, not the module itself.
        state_ = torch.as_tensor(
            np.asarray(state), dtype=torch.float32, device=self.dqn.device
        ).unsqueeze(0)
        # Action selection needs no autograd graph.
        with torch.no_grad():
            actions = self.dqn.forward(state_)
        return torch.argmax(actions).item()

    def step(self):
        """Run one gradient step on a batch of transitions, then decay epsilon.

        Does nothing until the replay buffer holds at least `batch_size`
        episodes, or when the selected episodes contain no transitions.
        """
        if len(self.rb) < self.batch_size:
            return

        episodes = self.rb.get_nbest(self.batch_size)

        # Episodes have different lengths, so they cannot be stacked directly.
        # Sample flat transition indices over all selected episodes and map
        # each one back to an (episode, step) pair.
        lengths = np.array([len(ep["actions"]) for ep in episodes], dtype=np.int64)
        ends = np.cumsum(lengths)
        total = int(ends[-1]) if len(ends) else 0
        if total == 0:
            return
        flat_idx = np.random.randint(0, total, self.batch_size)
        ep_idx = np.searchsorted(ends, flat_idx, side="right")
        step_idx = flat_idx - (ends[ep_idx] - lengths[ep_idx])
        picks = [(episodes[e], t) for e, t in zip(ep_idx.tolist(), step_idx.tolist())]

        states = np.array([ep["states"][t] for ep, t in picks], dtype=np.float32)
        new_states = np.array([ep["new_states"][t] for ep, t in picks], dtype=np.float32)
        rewards = np.array([ep["rewards"][t] for ep, t in picks], dtype=np.float32)
        actions = np.array([ep["actions"][t] for ep, t in picks], dtype=np.int64)
        # The last stored step of an episode is treated as terminal.
        # NOTE(review): the buffer does not record whether an episode ended by
        # termination or by time-limit truncation, so truncated episodes are
        # also treated as terminal here.
        dones = np.array(
            [t == len(ep["actions"]) - 1 for ep, t in picks], dtype=np.float32
        )

        device = self.dqn.device
        state_batch = torch.as_tensor(states, device=device)
        new_state_batch = torch.as_tensor(new_states, device=device)
        reward_batch = torch.as_tensor(rewards, device=device)
        action_batch = torch.as_tensor(actions, device=device)
        done_batch = torch.as_tensor(dones, device=device)

        # Q-value of the action that was actually taken in each transition.
        q_eval = self.dqn.forward(state_batch).gather(1, action_batch.unsqueeze(1)).squeeze(1)

        # The TD target is a constant w.r.t. the parameters being optimized,
        # and there is nothing to bootstrap from after the final step.
        with torch.no_grad():
            q_next = self.dqn.forward(new_state_batch).max(dim=1)[0]
            q_target = reward_batch + self.gamma * q_next * (1.0 - done_batch)

        self.dqn.optim.zero_grad()
        loss = self.dqn.loss(q_eval, q_target)
        loss.backward()
        self.dqn.optim.step()

        # Linear decay, clamped so epsilon never drops below its floor.
        self.epsilon = max(self.epsilon_min, self.epsilon - self.epsilon_decrement)



In [44]:
class ReplayBuffer:
    """Bounded store of whole episodes, ranked by their total reward.

    Each entry is a dict with the keys "states", "new_states", "actions",
    "rewards" and "summed_rewards". The buffer is only trimmed to `max_size`
    when `sort` runs (which every getter calls), so it can temporarily hold
    more than `max_size` episodes between reads.

    Args:
        max_size: Maximum number of episodes kept after sorting.
    """

    def __init__(self, max_size):
        self.max_size = max_size
        self.buffer = []

    def add_sample(self, current_state, new_state, action, rewards):
        """Append one episode; `rewards` must be summable (its total is cached)."""
        episode = {
            "states": current_state,
            "new_states": new_state,
            "actions": action,
            "rewards": rewards,
            "summed_rewards": sum(rewards),
        }
        self.buffer.append(episode)

    def sort(self):
        """Order episodes by total reward (best first) and drop the overflow."""
        # In-place sort and trim avoid allocating two new lists on every call.
        self.buffer.sort(key=lambda episode: episode["summed_rewards"], reverse=True)
        del self.buffer[self.max_size:]

    def get_random_samples(self, batch_size):
        """Return `batch_size` episodes drawn uniformly with replacement.

        Returns an empty list when the buffer is empty.
        """
        self.sort()
        if not self.buffer:
            # np.random.randint(0, 0, ...) would raise ValueError.
            return []
        idxs = np.random.randint(0, len(self.buffer), batch_size)
        return [self.buffer[idx] for idx in idxs]

    def get_nbest(self, n):
        """Return up to `n` episodes with the highest total reward."""
        self.sort()
        return self.buffer[:n]

    def __len__(self):
        return len(self.buffer)

In [45]:
class DQN(nn.Module):
    """Fully connected Q-network that carries its own loss, optimizer and device.

    Args:
        learning_rate: Learning rate for the Adam optimizer.
        action_space: Number of discrete actions (size of the output layer).
        state_space: Length of the observation vector (size of the input layer).
        hidden_size: Width of both hidden layers.
    """

    def __init__(self, learning_rate, action_space, state_space, hidden_size):
        super().__init__()
        self.actions = np.arange(action_space)

        # Two hidden layers followed by a linear head with one output per action.
        self.fc1 = nn.Linear(state_space, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, action_space)

        self.loss = nn.MSELoss()
        self.optim = torch.optim.Adam(self.parameters(), lr=learning_rate)

        # Use the first CUDA device when one is available, otherwise the CPU.
        if torch.cuda.is_available():
            self.device = torch.device('cuda:0')
        else:
            self.device = torch.device('cpu')
        self.to(self.device)

    def forward(self, state):
        """Map `state` through the network and return one value per action."""
        hidden = state
        for layer in (self.fc1, self.fc2):
            hidden = torch.relu(layer(hidden))
        return self.fc3(hidden)

In [47]:
# Train the agent on CartPole-v1 and report the mean episode return per
# window of REPORT_EVERY episodes.
N_EPISODES = 20000
REPORT_EVERY = 100

env = gym.make("CartPole-v1")
action_space = env.action_space.n
state_space = env.observation_space.shape[0]

final_rewards = []

agent = Agent(action_space, state_space)
for i in range(N_EPISODES):
    states, new_states, actions, rewards = [], [], [], []
    truncated = False
    terminated = False
    # With the 5-value step() API used below, reset() returns (observation, info);
    # only the observation belongs in the state history.
    observation, info = env.reset()
    while not (truncated or terminated):
        action = agent.choose_action(observation)
        next_observation, reward, terminated, truncated, info = env.step(action)
        states.append(observation)
        new_states.append(next_observation)
        actions.append(action)
        rewards.append(reward)
        agent.step()
        observation = next_observation
    # The buffer stores whole episodes, so the episode is added once it ends.
    agent.rb.add_sample(states, new_states, actions, rewards)
    final_rewards.append(sum(rewards))

env.close()

# Mean return over consecutive windows of REPORT_EVERY episodes
# (a trailing partial window, if any, is dropped).
n_windows = N_EPISODES // REPORT_EVERY
avg_per_100 = (
    np.asarray(final_rewards[:n_windows * REPORT_EVERY], dtype=float)
    .reshape(n_windows, REPORT_EVERY)
    .mean(axis=1)
)
print(avg_per_100)