In [None]:
# -*- coding: utf-8 -*-

import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
# import random

class DQNLunarLander():
    """Deep Q-learning agent for gym's ``LunarLander-v2`` environment.

    Two networks are kept: ``Q1`` is the action-value network being trained
    and ``Q2`` is the target network used to build bootstrap targets. ``Q2``
    is refreshed from ``Q1`` every ``C`` environment steps. Every transition
    is stored in ``replay_memory`` and a random minibatch is replayed after
    each environment step.

    The environment is driven through the classic gym interface, i.e.
    ``reset()`` returns the observation and ``step()`` returns the 4-tuple
    ``(next_state, reward, done, info)``.
    """

    def __init__(self, training_episodes):
        """Create the environment, both networks and the optimizer.

        Args:
            training_episodes: Number of episodes run by ``train``.
        """
        # Kept on the instance so every tensor fed to the networks can be
        # placed on the same device as the network weights.
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

        self.training_episodes = training_episodes
        self.env = gym.make('LunarLander-v2')
        self.epsilon = 0.15  # probability of taking a random action
        self.replay_memory = []  # (state, action, reward, next_state, done)
        self.batch_size = 5
        self.epochs = 1
        self.gamma = 0.99  # discount factor
        self.seed = 0
        self.Q1 = NeuralNetwork(seed=0, action_size=4, state_size=8).to(self.device)
        self.Q2 = NeuralNetwork(seed=0, action_size=4, state_size=8).to(self.device)
        # Make the "same weights" requirement explicit instead of relying on
        # both networks having been built from the same seed.
        self._sync_target_network()
        # The target network is only ever evaluated, never trained.
        self.Q2.eval()
        self.learning_rate = 1e-5
        self.C = 100  # environment steps between target-network refreshes

        # Built once: re-creating the optimizer on every step would discard
        # any state it keeps between updates.
        self.optimizer = torch.optim.SGD(self.Q1.parameters(), lr=self.learning_rate)
        # Environment steps taken across all episodes; drives the C-step sync.
        self.total_steps = 0

    def _select_action(self, state):
        """Return an epsilon-greedy action for ``state``.

        With probability ``epsilon`` a random action is sampled from the
        environment's action space; otherwise the action with the highest
        ``Q1`` value is returned as a plain ``int``.
        """
        if np.random.uniform(0, 1) < self.epsilon:
            return self.env.action_space.sample()

        state_tensor = torch.as_tensor(
            np.asarray(state, dtype=np.float32), device=self.device
        )
        # Evaluate without dropout noise and without building a graph, then
        # restore whatever mode the network was in.
        was_training = self.Q1.training
        self.Q1.eval()
        with torch.no_grad():
            q_values = self.Q1(state_tensor)
        self.Q1.train(was_training)
        return int(q_values.argmax().item())

    def _sample_minibatch(self):
        """Return up to ``batch_size`` distinct transitions from replay memory."""
        memory_size = len(self.replay_memory)
        count = min(self.batch_size, memory_size)
        picked = np.random.choice(memory_size, size=count, replace=False)
        return [self.replay_memory[i] for i in picked]

    def _learn(self, batch):
        """Run one gradient-descent step of ``Q1`` on a minibatch.

        Args:
            batch: List of ``(state, action, reward, next_state, done)``.

        Returns:
            The minibatch mean-squared-error loss as a Python float.
        """
        states = torch.as_tensor(
            np.asarray([t[0] for t in batch], dtype=np.float32), device=self.device
        )
        actions = torch.as_tensor(
            np.asarray([t[1] for t in batch], dtype=np.int64), device=self.device
        ).unsqueeze(1)
        rewards = torch.as_tensor(
            np.asarray([t[2] for t in batch], dtype=np.float32), device=self.device
        )
        next_states = torch.as_tensor(
            np.asarray([t[3] for t in batch], dtype=np.float32), device=self.device
        )
        dones = torch.as_tensor(
            np.asarray([t[4] for t in batch], dtype=np.float32), device=self.device
        )

        # y_j = R_j                              if S_{j+1} is terminal
        # y_j = R_j + gamma * max_a Q2(S_{j+1})  otherwise
        # Targets are constants with respect to Q1, hence no_grad.
        with torch.no_grad():
            best_next_values = self.Q2(next_states).max(dim=1)[0]
            targets = rewards + self.gamma * best_next_values * (1.0 - dones)

        # Q1(S_j, A_j): the value of the action that was actually taken. This
        # tensor stays attached to Q1's graph so backward() reaches its weights.
        taken_values = self.Q1(states).gather(1, actions).squeeze(1)

        loss = F.mse_loss(taken_values, targets)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item()

    def _sync_target_network(self):
        """Copy the current ``Q1`` weights into the target network ``Q2``."""
        self.Q2.load_state_dict(self.Q1.state_dict())

    def execute_episode(self, initial_state):
        """Play one episode, learning after every environment step.

        Args:
            initial_state: Observation returned by ``env.reset()``.

        Returns:
            The sum of the rewards collected during the episode.
        """
        state = initial_state
        rewards = 0
        done = False

        while not done:
            action = self._select_action(state)

            # Execute the action and store the transition in replay memory.
            next_state, reward, done, info = self.env.step(action)
            self.replay_memory.append((state, action, reward, next_state, done))
            rewards += reward

            # Replay a random minibatch and take one gradient step on Q1.
            self._learn(self._sample_minibatch())

            # Every C steps, refresh the target network from Q1.
            self.total_steps += 1
            if self.total_steps % self.C == 0:
                self._sync_target_network()

            state = next_state

        return rewards

    def train(self, verbose = 0):
        """Run ``training_episodes`` episodes.

        Args:
            verbose: When truthy, print the index of each episode as it starts.

        Returns:
            A list with the total reward of each episode, in order.
        """
        rewards = []
        for ep in range(self.training_episodes):
            if verbose:
                print("Episode ", ep)
            initial_state = self.env.reset()
            reward = self.execute_episode(initial_state)
            rewards.append(reward)
        return rewards

class NeuralNetwork(nn.Module):
    """Fully connected network producing one output per action.

    The input passes through three 128-unit linear layers, each followed by
    dropout (p=0.6) and then ReLU, and finally through a linear layer with
    ``action_size`` outputs.
    """

    def __init__(self, seed, action_size, state_size):
        """Build the layers.

        Args:
            seed: Value passed to ``torch.manual_seed`` before the layers are
                created, so two networks built with the same seed start with
                identical weights.
            action_size: Number of outputs of the final layer.
            state_size: Number of input features of the first layer.
        """
        super().__init__()
        hidden_units = 128
        drop_prob = 0.6

        # Note: this seeds torch's global RNG, not just this module.
        self.seed = torch.manual_seed(seed)

        # Layers are created in forward order so the seeded initial weights
        # land on the same layers every time.
        self.fc1 = nn.Linear(state_size, hidden_units)
        self.dropout1 = nn.Dropout(p=drop_prob)
        self.fc2 = nn.Linear(hidden_units, hidden_units)
        self.dropout2 = nn.Dropout(p=drop_prob)
        self.fc3 = nn.Linear(hidden_units, hidden_units)
        self.dropout3 = nn.Dropout(p=drop_prob)
        self.fc4 = nn.Linear(hidden_units, action_size)

    def forward(self, state):
        """Return the output of the final linear layer for ``state``."""
        hidden_stages = (
            (self.fc1, self.dropout1),
            (self.fc2, self.dropout2),
            (self.fc3, self.dropout3),
        )
        activations = state
        for linear, dropout in hidden_stages:
            # linear -> dropout -> ReLU, in that order
            activations = F.relu(dropout(linear(activations)))
        return self.fc4(activations)

print("Executing")
episodes = 10
agent = DQNLunarLander(episodes)
rewards = agent.train(verbose=1)

# One point per episode. The x values must be a sequence as long as
# `rewards`; passing the scalar episode count makes matplotlib raise a
# shape-mismatch error.
plt.plot(range(len(rewards)), rewards)
plt.xlabel("Episode")
plt.ylabel("Total reward")
plt.title("DQN on LunarLander-v2: reward per training episode")
plt.show()

Executing
Episode  0
Episode  1
Episode  2
Episode  3
Episode  4
Episode  5
Episode  6
Episode  7
Episode  8
Episode  9


In [2]:
# `rewards` is assigned by the training cell above; on a fresh kernel that
# cell has to run first, otherwise this line raises NameError.
print(rewards)

NameError: name 'rewards' is not defined