In [1]:
import numpy as np
import torch
import matplotlib.pyplot as plt
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import gym

In [2]:
class DeepQLearning(nn.Module):
    def __init__(self, lr, input_dims, fc1_dims, fc2_dims, n_actions):
        super(DeepQLearning, self).__init__()
        self.lr = lr
        self.input_dims = input_dims
        self.fc1_dims = fc1_dims
        self.fc2_dims = fc2_dims
        self.n_actions = n_actions
        self.fc1 = nn.Linear(*self.input_dims, self.fc1_dims)
        self.fc2 = nn.Linear(self.fc1_dims, self.fc2_dims)
        self.fc3 = nn.Linear(self.fc2_dims, n_actions)
        self.optimizer = optim.SGD(self.parameters(), lr = self.lr)
        self.loss = nn.MSELoss()
        self.device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
        self.to(self.device)
        
    def forward(self, state):
        x = F.relu(self.fc1(state))
        x = F.relu(self.fc2(x))
        actions = self.fc3(x)
        
        return actions

In [6]:
class Agent():
    def __init__(self, gamma, epsilon, lr, input_dims, batch_size, n_actions, 
                 max_memory_size = 100000, eps_min = 0.01, eps_dec = 5e-4):
        self.gamma = gamma
        self.epsilon = epsilon
        self.lr = lr
        self.batch_size = batch_size
        self.action_space = [i for i in range(n_actions)]
        self.memory_size = max_memory_size
        self.batch_size = batch_size
        self.eps_min = eps_min
        self.eps_dec = eps_dec
        
        self.Q_learn = DeepQLearning(self.lr, input_dims = input_dims, n_actions = n_actions,
                                     fc1_dims = 256, fc2_dims = 256)
        self.mem_counter = 0
        
        self.state_memory = np.zeros((self.memory_size,*input_dims), dtype=np.float32)
        self.new_state_memory = np.copy(self.state_memory)
        
        self.action_memory = np.zeros(self.memory_size, dtype=np.int32)
        self.reward_memory = np.zeros(self.memory_size, dtype=np.float32)
        self.condition_memory = np.zeros(self.memory_size, dtype=bool)
        
    def _store(self, state, action, reward, state_new, done):
        idx = self.mem_counter%self.memory_size
        self.state_memory[idx] = state
        self.action_memory[idx] = action
        self.reward_memory[idx] = reward
        self.new_state_memory[idx] = state_new
        self.condition_memory[idx] = done
        
        self.mem_counter += 1
        
    def predict(self, obs):
        if np.random.random() > self.epsilon:
            state = torch.tensor([obs]).to(self.Q_learn.device)
            act = self.Q_learn.forward(state)
            action = torch.argmax(act).item()
        else:   
            action = np.random.choice(self.action_space)
            
        return action
    
    def learn(self):
        if self.mem_counter < self.batch_size:
            return
        
        self.Q_learn.optimizer.zero_grad()
        
        max_memory = min(self.mem_counter, self.memory_size)
        batch = np.random.choice(max_memory, self.batch_size, replace=False)
        batch_idx = np.arange(self.batch_size, dtype=np.int32)
        
        state_batch = torch.tensor(self.state_memory[batch]).to(self.Q_learn.device)
        new_state_batch = torch.tensor(self.new_state_memory[batch]).to(self.Q_learn.device)
        reward_batch = torch.tensor(self.reward_memory[batch]).to(self.Q_learn.device)
        condition_batch = torch.tensor(self.condition_memory[batch]).to(self.Q_learn.device)
        
        action_batch = self.action_memory[batch]
        
        q_eval = self.Q_learn.forward(state_batch)[batch_idx, action_batch]
        q_next = self.Q_learn.forward(new_state_batch)
        q_next[condition_batch] = 0.0
        
        q_target = reward_batch + self.gamma*torch.max(q_next, dim=1)[0]
        
        loss = self.Q_learn.loss(q_target, q_eval).to(self.Q_learn.device)
        loss.backward()
        self.Q_learn.optimizer.step()
        
        self.epsilon = self.epsilon - self.eps_dec if self.epsilon > self.eps_min \
                       else self.eps_min

In [7]:
train_env = gym.make('LunarLander-v2')
agent = Agent(gamma = 0.99, 
              epsilon = 1.0, 
              batch_size = 64, 
              n_actions = 4, 
              eps_min = 0.01,
              input_dims = [8],
              lr = 0.003)
n_episode = 500

In [9]:
scores, epsilon_history = [], []
for i in range(n_episode):
    score = 0
    done = False
    obs = train_env.reset()[0]
    while not done:
        action = agent.predict(obs)
        obs_new, reward, done, _, info = train_env.step(action)
        score += reward
        agent._store(obs, action, reward, obs_new, done)
        agent.learn()
        obs = obs_new
        
    scores.append(score)
    epsilon_history.append(agent.epsilon)
    
    avg_score = np.mean(scores)
    print(f"Episode: {i}, score: {score}, average score: {avg_score}\
           epsilon: {agent.epsilon}")

Episode: 0, score: -262.6716093868959, average score: -262.6716093868959           epsilon: 0.01
Episode: 1, score: -78.63229203011721, average score: -170.65195070850655           epsilon: 0.01
Episode: 2, score: -101.89039214701998, average score: -147.73143118801102           epsilon: 0.01
Episode: 3, score: 15.331178097233305, average score: -106.96577886669994           epsilon: 0.01
Episode: 4, score: -54.71834919683396, average score: -96.51629293272674           epsilon: 0.01
Episode: 5, score: 2.0993690658067976, average score: -80.08034926630448           epsilon: 0.01
Episode: 6, score: 94.0421041253517, average score: -55.205713067496454           epsilon: 0.01
Episode: 7, score: -111.30596991720031, average score: -62.21824517370944           epsilon: 0.01
Episode: 8, score: -70.89594128625677, average score: -63.18243363065915           epsilon: 0.01
Episode: 9, score: -447.0592649324501, average score: -101.57011676083825           epsilon: 0.01
Episode: 10, score: 158.7

KeyboardInterrupt: 

In [13]:
test_env = gym.make('LunarLander-v2', render_mode='human')

for i in range(5):
    obs = test_env.reset()[0]
    done = False
    score = 0
    
    while not done:
        test_env.render()
        action = agent.predict(obs)
        obs, reward, done, _, info = train_env.step(action)
        score += reward
        
    print(f"Episode: {i+1}, score: {score}")
    
test_env.close()

Episode: 1, score: 99.10264916927697
Episode: 2, score: 98.72437845937839
Episode: 3, score: 99.22752320890349
Episode: 4, score: 99.16180215615377
Episode: 5, score: 99.09039753788743
