In [None]:
import numpy as np
import pandas as pd
from copy import deepcopy

from tqdm.auto import tqdm
import matplotlib.pyplot as plt
from statistics import mean

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

In [None]:
import gym

In [None]:
class NeuralNet(nn.Module):
    """Small fully connected network mapping a state vector to per-action outputs.

    Architecture: ``input_dims -> 32 -> 16 -> n_actions`` with a ReLU after
    each of the two hidden layers and no activation on the output layer.
    """

    def __init__(self, input_dims, n_actions):
        """Create the three linear layers and the shared ReLU module.

        Args:
            input_dims: Number of input features (passed to ``nn.Linear``).
            n_actions: Number of outputs produced by the final layer.
        """
        super().__init__()

        self.layer1 = nn.Linear(input_dims, 32)
        self.layer2 = nn.Linear(32, 16)
        self.layer3 = nn.Linear(16, n_actions)
        self.activation = nn.ReLU()

    def forward(self, x):
        """Run ``x`` through both hidden layers and return the raw output layer values."""
        hidden = x
        # Hidden layers share the same activation; the output layer stays linear.
        for hidden_layer in (self.layer1, self.layer2):
            hidden = self.activation(hidden_layer(hidden))

        return self.layer3(hidden)

In [None]:
class ReplayBuffer():
    """Fixed-capacity circular store of (state, action, reward, next_state, flag) records.

    Records are written at ``mem_counter % mem_size``, so once the buffer is
    full the oldest entries are overwritten. The stored terminal flag is
    ``1 - int(done)``: it is 0 for terminal transitions and 1 otherwise, which
    lets callers use it directly as a multiplicative mask.
    """

    def __init__(self, mem_size, batch_size, input_dims):
        """Allocate the backing arrays.

        Args:
            mem_size: Maximum number of records kept.
            batch_size: Number of records returned by ``sample_buffer``.
            input_dims: Iterable of ints giving the shape of one state; it is
                unpacked after ``mem_size`` to form the state array shape.

        Raises:
            ValueError: If ``batch_size`` exceeds ``mem_size``. Sampling is done
                without replacement from at most ``mem_size`` records, so such
                a buffer could never produce a batch.
        """
        if batch_size > mem_size:
            raise ValueError(
                "batch_size (%s) cannot exceed mem_size (%s): batches are drawn "
                "without replacement" % (batch_size, mem_size)
            )

        self.mem_size = mem_size
        self.batch_size = batch_size

        # Total number of records ever stored (keeps growing past mem_size).
        self.mem_counter = 0

        self.state_mem = np.zeros((self.mem_size, *input_dims), dtype=np.float32)
        self.action_mem = np.zeros(self.mem_size, dtype=np.int32)
        self.reward_mem = np.zeros(self.mem_size, dtype=np.float32)
        self.next_state_mem = np.zeros((self.mem_size, *input_dims), dtype=np.float32)
        self.terminal_mem = np.zeros(self.mem_size, dtype=np.int32)

    def store_record(self, state, action, reward, state_, done):
        """Write one transition into the next slot, wrapping around when full."""
        index = self.mem_counter % self.mem_size

        self.state_mem[index] = state
        self.action_mem[index] = action
        self.reward_mem[index] = reward
        self.next_state_mem[index] = state_
        # Stored inverted: 0 marks a terminal transition, 1 a non-terminal one.
        self.terminal_mem[index] = 1 - int(done)
        self.mem_counter = self.mem_counter + 1

    def is_sampleable(self):
        """Return True once at least ``batch_size`` records have been stored."""
        return self.mem_counter >= self.batch_size

    def sample_buffer(self):
        """Draw ``batch_size`` distinct stored records uniformly at random.

        Returns:
            An empty list if fewer than ``batch_size`` records have been
            stored; otherwise a tuple
            ``(states, actions, rewards, next_states, terminals)`` of arrays
            indexed by the same random batch.
        """
        if not self.is_sampleable():
            return []

        # Only slots that have actually been written are eligible.
        max_mem = min(self.mem_size, self.mem_counter)

        batch = np.random.choice(max_mem, self.batch_size, replace=False)

        states = self.state_mem[batch]
        actions = self.action_mem[batch]
        rewards = self.reward_mem[batch]
        next_states = self.next_state_mem[batch]
        terminals = self.terminal_mem[batch]

        return states, actions, rewards, next_states, terminals

In [None]:
# Run on the GPU when torch reports CUDA as available, otherwise on the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

In [None]:
class dqn_agent():
    """Epsilon-greedy Q-learning agent backed by a replay buffer and one network.

    The same ``policy_network`` is used both for the predicted values and for
    the bootstrapped targets; no separate target network is kept.
    """

    def __init__(
        self,
        input_dims,
        n_actions,
        epsilon_decay=(1 - (1e-4)),
        gamma=0.99,
        lr=1e-4,
        mem_size=1024,
        batch_size=128,
    ):
        """Build the replay buffer, network, loss and optimizer.

        Args:
            input_dims: Shape of one state; its first entry is the network's
                input width and the full value sizes the replay buffer.
            n_actions: Number of discrete actions.
            epsilon_decay: Factor epsilon is multiplied by after each update.
            gamma: Discount applied to the bootstrapped next-state value.
            lr: Learning rate passed to Adam.
            mem_size: Replay buffer capacity.
            batch_size: Number of records sampled per update.
        """
        self.input_dims = input_dims[0]
        self.n_actions = n_actions

        # Exploration starts fully random and decays multiplicatively.
        self.epsilon_decay = epsilon_decay
        self.epsilon = 1

        self.gamma = gamma

        self.mem_size = mem_size
        self.batch_size = batch_size

        self.replay_mem = ReplayBuffer(
            mem_size=mem_size,
            batch_size=batch_size,
            input_dims=input_dims,
        )
        network = NeuralNet(input_dims=self.input_dims, n_actions=n_actions)
        self.policy_network = network.to(device)

        self.loss_function = nn.MSELoss()
        self.optimizer = torch.optim.Adam(self.policy_network.parameters(), lr=lr)

    def choose_action(self, state):
        """Pick an action: uniformly random with probability epsilon, else greedy."""
        explore = not (np.random.random() > self.epsilon)
        if explore:
            return np.random.randint(self.n_actions)

        with torch.no_grad():
            state_tensor = torch.tensor(state, device=device).float()
            network_out = self.policy_network(state_tensor)
        action_values = network_out.cpu().numpy()

        return np.argmax(action_values)

    def store_mem(self, state, action, reward, state_, done):
        """Forward one transition to the replay buffer."""
        self.replay_mem.store_record(state, action, reward, state_, done)

    def train(self):
        """Run one gradient step on a sampled batch.

        Returns:
            ``np.nan`` when the buffer cannot yet supply a batch (no update and
            no epsilon decay happen in that case); otherwise the scalar loss of
            the step as a Python float.
        """
        buffer = self.replay_mem
        if not buffer.is_sampleable():
            return np.nan

        # The fifth array is the buffer's stored flag (1 - done), i.e. a
        # "not terminal" mask rather than a done flag.
        obs, acts, rews, next_obs, not_terminal = buffer.sample_buffer()

        obs_t = torch.tensor(obs, device=device).float()
        acts_t = torch.tensor(acts, device=device).long().unsqueeze(1)
        rews_t = torch.tensor(rews, device=device).float()
        next_obs_t = torch.tensor(next_obs, device=device).float()
        mask_t = torch.tensor(not_terminal, device=device).long()

        # Value predicted for the action that was actually taken.
        predicted = self.policy_network(obs_t).gather(1, acts_t).squeeze(1)

        # Best next-state value, computed without tracking gradients.
        with torch.no_grad():
            next_best = self.policy_network(next_obs_t).max(dim=1).values

        # The mask zeroes the bootstrap term for terminal transitions.
        discounted = self.gamma * next_best
        targets = rews_t + discounted * mask_t

        loss = self.loss_function(predicted, targets)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        self.epsilon *= self.epsilon_decay

        return loss.item()

In [None]:
# Optimizer learning rate and reward discount factor.
lr, gamma = 1e-4, 0.99

# Per-update multiplicative epsilon decay and number of training episodes.
epsilon_decay = 1 - 3e-5
episodes = 700

In [None]:
# Replay buffer capacity and number of records sampled per update.
mem_size, batch_size = 100_000, 128

In [None]:
# Environment the agent is trained on; its observation/action spaces size the agent below.
env = gym.make('LunarLander-v2')

In [None]:
# Size the agent from the environment's spaces and the hyperparameters set above.
agent1 = dqn_agent(
    input_dims=env.observation_space.shape,
    n_actions=env.action_space.n,
    epsilon_decay=epsilon_decay,
    gamma=gamma,
    lr=lr,
    mem_size=mem_size,
    batch_size=batch_size,
)

In [None]:
# Per-episode history: total reward, epsilon at episode end, mean training loss.
scores, eps, losses = [], [], []

In [None]:
# Train for `episodes` episodes, recording score, epsilon and mean loss per episode.
pbar = tqdm(range(episodes))

try:
    for i in pbar:
        done = False
        score = 0
        state = env.reset()
        ep_loss = []

        while not(done):
            action = agent1.choose_action(state)

            new_state, reward, done, _ = env.step(action)
            env.render()

            score = score + reward

            agent1.store_mem(state, action, reward, new_state, done)

            state = deepcopy(new_state)

            loss = agent1.train()
            ep_loss.append(loss)

        scores.append(score)
        eps.append(agent1.epsilon)

        # agent1.train() returns NaN while the replay buffer is still filling.
        # statistics.mean would propagate a single NaN to the whole episode, so
        # average only the steps on which an update actually ran.
        valid_losses = [step_loss for step_loss in ep_loss if not np.isnan(step_loss)]
        losses.append(mean(valid_losses) if valid_losses else np.nan)

        pbar.set_description("current score = %s" % score)
finally:
    # Release the environment (and its render window) even if training is
    # interrupted or an exception is raised mid-episode.
    env.close()
