## Lunar Lander using DDQN

In [2]:
import numpy as np
from tensorflow.keras.layers import *
from tensorflow.keras.models import load_model, Sequential
from tensorflow.keras.optimizers import Adam
from collections import deque
import random
import matplotlib.pyplot as plt
plt.style.use('seaborn')
import gym

## Memory class for storing and retrieving previous transition details

In [None]:
class Storage(object):
    """Fixed-capacity circular buffer of environment transitions.

    Each slot holds one (state, action, reward, next state, done) tuple.
    Once ``max_size`` transitions have been written, new ones overwrite the
    oldest slots.

    Args:
        max_size: Number of slots in the buffer.
        n_states: Length of a state vector.
        n_actions: Width of an action row.
        discrete: When true, actions are given as indices and stored as
            one-hot ``int8`` rows; otherwise they are stored as ``float32``
            rows exactly as passed in.
    """

    def __init__(self, max_size, n_states, n_actions, discrete=False):
        self.mem_size = max_size
        self.mem_cntr = 0  # total number of writes so far (never wraps)
        self.discrete = discrete

        state_shape = (self.mem_size, n_states)
        self.state_memory = np.zeros(state_shape)
        self.new_state_memory = np.zeros(state_shape)

        action_dtype = np.float32
        if self.discrete:
            action_dtype = np.int8

        self.action_memory = np.zeros((self.mem_size, n_actions),
                                      dtype=action_dtype)
        self.reward_memory = np.zeros(self.mem_size)
        self.terminal_memory = np.zeros(self.mem_size, dtype=np.float32)

    def store_transition(self, state, action, reward, state_, done):
        """Write one transition into the next slot, wrapping when full."""
        slot = self.mem_cntr % self.mem_size

        self.state_memory[slot] = state
        self.new_state_memory[slot] = state_

        if not self.discrete:
            self.action_memory[slot] = action
        else:
            # Turn the action index into a one-hot row before storing it.
            one_hot = np.zeros(self.action_memory.shape[1])
            one_hot[action] = 1.0
            self.action_memory[slot] = one_hot

        self.reward_memory[slot] = reward
        self.terminal_memory[slot] = done
        self.mem_cntr += 1

    def sample(self, batch_size):
        """Draw ``batch_size`` random transitions from the filled slots.

        Indices come from ``np.random.choice`` with its default settings,
        so the same slot can appear more than once in a batch.

        Returns:
            Tuple ``(states, actions, rewards, next_states, terminals)``.
        """
        filled = min(self.mem_cntr, self.mem_size)
        picks = np.random.choice(filled, batch_size)

        return (
            self.state_memory[picks],
            self.action_memory[picks],
            self.reward_memory[picks],
            self.new_state_memory[picks],
            self.terminal_memory[picks],
        )

## Double Deep Q-Learning (DDQN) Agent Class

In [None]:

class DDQNAgent(object):
    """Double DQN agent with epsilon-greedy exploration.

    Two networks of identical architecture are kept: ``q_eval`` (trained on
    every learning step and used to pick actions) and ``q_target`` (used to
    evaluate the value of the next state, refreshed from ``q_eval`` every
    ``replace_target`` learning steps).

    Args:
        n_actions: Number of discrete actions.
        n_states: Length of the observation vector.
        gamma: Discount factor applied to the bootstrapped next-state value.
        batch_size: Number of transitions sampled per learning step.
        epsilon: Initial probability of taking a random action.
        epsilon_decay: Factor ``epsilon`` is multiplied by after each
            learning step while it is above ``epsilon_min``.
        epsilon_min: Threshold below which ``epsilon`` stops decaying.
        mem_size: Capacity of the replay buffer.
        lr: Learning rate handed to the Adam optimizer.
        replace_target: Number of learning steps between target-network
            refreshes.
        discrete: Forwarded to the replay buffer; ``learn`` relies on the
            one-hot action rows that the buffer stores when this is true.
    """

    def __init__(self, n_actions, n_states, gamma, batch_size, epsilon = 1.0,
                 epsilon_decay = 0.996, epsilon_min = 0.01, mem_size = 1000000,
                 lr = 0.01, replace_target = 100, discrete = True):
        self.action_space = [i for i in range(n_actions)]
        self.state_space = [i for i in range(n_states)]
        self.n_actions = n_actions
        self.n_states = n_states
        self.gamma = gamma
        self.batch_size = batch_size
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        self.memory_size = mem_size
        self.replace_target = replace_target
        self.discrete = discrete
        self.memory = Storage(self.memory_size, self.n_states, self.n_actions, self.discrete)
        self.lr = lr
        self.q_eval = self.build_model()
        self.q_target = self.build_model()
        # Number of completed learning steps; drives the target refresh.
        self._counter = 0

    def remember(self, state, action, reward, new_state, done):
        """Store one transition in the replay buffer."""
        self.memory.store_transition(state, action, reward, new_state, done)

    def build_model(self):
        """Build and compile the Q-network.

        Returns:
            A compiled ``Sequential`` model with two 256-unit ReLU layers and
            a linear output of ``n_actions`` units, trained with Adam on MSE.
        """
        model = Sequential()
        model.add(Dense(256, input_shape = (self.n_states,), activation = 'relu'))
        model.add(Dense(256, activation = 'relu'))
        model.add(Dense(self.n_actions))
        model.compile(optimizer = Adam(learning_rate = self.lr), loss = 'mse')
        return model

    def choose_action(self, state):
        """Pick an action epsilon-greedily for a single observation.

        With probability ``epsilon`` a uniformly random action is returned;
        otherwise the action with the highest ``q_eval`` prediction.
        """
        rand = np.random.random()
        state = np.reshape(state, (1, self.n_states))
        if rand < self.epsilon:
            action = random.choice(self.action_space)
        else:
            action = np.argmax(self.q_eval.predict(state))

        return action

    def learn(self):
        """Run one Double DQN update on a sampled minibatch.

        Does nothing until the buffer has received more than ``batch_size``
        transitions. After the fit, ``epsilon`` is decayed and the target
        network is refreshed on every ``replace_target``-th learning step
        (including the very first one).
        """
        if self.memory.mem_cntr <= self.batch_size:
            return

        states, actions, rewards, next_states, dones = self.memory.sample(self.batch_size)

        # Actions come back as one-hot rows; the dot product with the index
        # vector recovers the integer action taken in each transition.
        action_values = np.array(self.action_space, dtype=np.int8)
        action_indices = np.dot(actions, action_values).astype(int)

        # 1 for non-terminal transitions, 0 for terminal ones, so the
        # bootstrapped term vanishes at episode end.
        not_done = 1 - dones

        next_q_by_target = self.q_target.predict(next_states)
        next_q_by_eval = self.q_eval.predict(next_states)
        q_target = self.q_eval.predict(states)

        # Double DQN: q_eval selects the next action, q_target evaluates it.
        best_next_actions = np.argmax(next_q_by_eval, axis = 1)
        batch_indices = np.arange(self.batch_size, dtype = int)

        # Only the entry of the action actually taken is changed, so the
        # loss on the remaining outputs is zero.
        q_target[batch_indices, action_indices] = (
            rewards
            + self.gamma * next_q_by_target[batch_indices, best_next_actions] * not_done
        )

        self.q_eval.fit(states, q_target, verbose = 0)

        if self.epsilon > self.epsilon_min:
            self.epsilon = self.epsilon * self.epsilon_decay

        if self._counter % self.replace_target == 0:
            self.update_q_target()
        # Without this increment the modulo above is always 0 and the target
        # network would be overwritten on every step.
        self._counter += 1

    def update_q_target(self):
        """Copy the current ``q_eval`` weights into ``q_target``."""
        self.q_target.set_weights(self.q_eval.get_weights())

    def save_model(self, fname):
        """Save ``q_eval`` to ``fname``."""
        self.q_eval.save(fname)

    def load_model(self, fname):
        """Replace ``q_eval`` with the model stored at ``fname``.

        When exploration is already at or below ``epsilon_min`` the target
        network is synced to the loaded weights as well.
        """
        self.q_eval = load_model(fname)
        if self.epsilon <= self.epsilon_min:
            self.update_q_target()

## Making environment and defining state and action size

In [5]:
# Create the LunarLander-v2 environment through gym's registry; the same
# `env` object is reused by the random-play, training and replay cells below.
env = gym.make('LunarLander-v2')

In [6]:
# Length of the observation vector (first entry of the observation space's shape).
state_size = env.observation_space.shape[0]
# Number of discrete actions reported by the action space.
action_size = env.action_space.n

### Random Play

In [None]:
# Baseline: play a few episodes with uniformly random actions and print the
# total reward of each one.
n_random_episodes = 10
try:
    for eno in range(n_random_episodes):
        env.reset()
        done = False
        score = 0
        while not done:
            env.render()
            action = env.action_space.sample()
            observation, reward, done, other_info = env.step(action)
            score += reward
        # Report against the real episode count rather than a hard-coded total.
        print("Episode %d/%d: score : %d" % (eno + 1, n_random_episodes, score))
    print('finished')
finally:
    # Close the environment even if the loop is interrupted.
    env.close()

## Training Model

In [None]:
import os

# Hyperparameters and bookkeeping for the training run.
batch_size = 64
agent = DDQNAgent(n_actions = action_size, n_states = state_size, gamma = 0.99, batch_size = batch_size, epsilon = 1.0,
                 epsilon_decay = 0.996, epsilon_min = 0.01, mem_size = 1000000,
                 lr = 0.0005, replace_target = 100)
n_episodes = 4000
scores = []       # total reward of every finished episode
avg_scores = []   # running mean over the last (up to) 100 episodes

try:
    # Make sure the checkpoint/plot directories exist; otherwise the first
    # save raises an OSError and training stops at episode 0.
    os.makedirs('./LunarLander_graphs', exist_ok=True)
    os.makedirs('./LunarLander_weights', exist_ok=True)

    for e in range(n_episodes):

        state = env.reset()
        score = 0
        done = False
        t = 0
        while not done:
            env.render()

            action = agent.choose_action(state)
            next_state, reward, done, other_info = env.step(action)

            agent.remember(state, action, reward, next_state, done)
            state = next_state
            score += reward

            # One learning step per environment step.
            agent.learn()
            t += 1

        scores.append(score)
        avg_score = np.mean(scores[-100:])
        avg_scores.append(avg_score)

        print("Episode: {}/{}, score : {:.6}, avg_score : {:.6} Exploration: {:.2}, Time States: {}".format(e, n_episodes, score, avg_score, agent.epsilon, t))

        # Every 50 episodes: show progress plots, save a large average-score
        # figure and checkpoint the online network.
        if e % 50 == 0:
            plt.plot(scores[-100:])
            plt.show()
            plt.plot(avg_scores)
            plt.show()
            plt.figure(figsize=(30, 20))
            plt.plot(avg_scores)
            plt.title('Avg_Score')
            # `quality` was a JPEG-only savefig argument and is not accepted
            # by current matplotlib; it has no meaning for a PNG.
            plt.savefig(f'./LunarLander_graphs/graph_{e}.png')
            plt.close()
            agent.save_model(f"./LunarLander_weights/q_eval_{e}.h5")

    print('Model Trained')
except OSError as err:
    print("OS error: {0}".format(err))
    print("Closing Environment")
finally:
    # Runs on success, on OSError and on interruption (e.g. KeyboardInterrupt).
    env.close()

## Replay model with trained weights

In [9]:
# Load a saved q_eval network from disk for replay. The file name follows the
# `q_eval_{episode}.h5` pattern the training loop writes, so this is
# presumably the episode-3650 checkpoint.
model = load_model('./LunarLander_weights/q_eval_3650.h5')

In [8]:
# Replay: run the loaded network greedily (no exploration) for a few episodes
# and print the total reward of each one.
n_replay_episodes = 10
try:
    for eno in range(n_replay_episodes):
        state = env.reset()
        done = False
        score = 0
        while not done:
            env.render()
            # Shape the observation as a single-row batch; -1 lets numpy infer
            # the observation length instead of hard-coding it.
            state = np.reshape(state, (1, -1))
            action = np.argmax(model.predict(state))
            next_state, reward, done, other_info = env.step(action)
            state = next_state
            score += reward
        # Report against the real episode count rather than a hard-coded total.
        print("Episode %d/%d: score : %d" % (eno + 1, n_replay_episodes, score))
    print('finished')
finally:
    # Close the environment even if the replay is interrupted.
    env.close()

Episode 1/20: score : 263
Episode 2/20: score : 240


KeyboardInterrupt: 