In [1]:
# Exercise 7
# TensorFlow implementation

In [2]:
import random
from copy import deepcopy
import time
import numpy as np
import gymnasium as gym

# whatever framework you fancy
from collections import namedtuple, deque 
import tensorflow as tf
from  tensorflow.keras.layers import Dense
from  tensorflow.keras import Sequential,Input
import random


In [3]:
# Build the discrete-action LunarLander environment: reduced gravity, wind disabled.
env_kwargs = dict(
    id="LunarLander-v2",
    continuous=False,
    gravity=-8.0,
    enable_wind=False,
)
env = gym.make(**env_kwargs)


In [4]:
# Inspect the environment's interface: the observation space with its upper/lower
# bounds, and the action space. The sizes shown here are the input/output
# dimensions the Q-network has to match.
print(f'observation space: {env.observation_space}, \nhigh: {env.observation_space.high}, \nlow: {env.observation_space.low}')
print(f'action space: {env.action_space}')

observation space: Box([-1.5       -1.5       -5.        -5.        -3.1415927 -5.
 -0.        -0.       ], [1.5       1.5       5.        5.        3.1415927 5.        1.
 1.       ], (8,), float32), 
high: [1.5       1.5       5.        5.        3.1415927 5.        1.
 1.       ], 
low: [-1.5       -1.5       -5.        -5.        -3.1415927 -5.
 -0.        -0.       ]
action space: Discrete(4)


In [57]:
# Implement replay buffer
class ReplayBuffer:
    """Fixed-size FIFO store of transitions used for experience replay.

    Once ``capacity`` transitions are held, pushing another one drops the oldest.

    Args:
        capacity (int): maximum number of transitions kept in the buffer
    """

    def __init__(self, capacity):
        self.capacity = capacity
        # A bounded deque evicts from the left (oldest entry) on overflow.
        self.buffer = deque(maxlen=capacity)

    def push(self, transition):
        """Append one transition to the buffer."""
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` transitions uniformly at random, with replacement."""
        drawn = random.choices(self.buffer, k=batch_size)
        return drawn

    def __len__(self):
        """Return the number of transitions currently stored."""
        stored = self.buffer
        return len(stored)

In [58]:
# implement deep Q-network

class DQN:
    """Builder for a feed-forward Q-network for Gym classic-control style envs.

    The network produced by :meth:`create_model` has one hidden ``Dense`` layer
    with ReLU activation followed by a linear ``Dense`` output layer that emits
    one value per action.

    Args:
        input_dim (int): dimension of input, size of the flat observation vector
        output_dim (int): dimension of output, number of possible actions
        hidden_dim (int): number of units in the hidden layer
        loss: loss passed to ``Model.compile``
        optimizer: optimizer passed to ``Model.compile``. When ``None`` (default)
            a fresh ``Adam(learning_rate=0.001)`` is created for this instance.

    """

    def __init__(self, input_dim, output_dim, hidden_dim=128, loss="mse", optimizer=None):
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.hidden_dim = hidden_dim
        self.loss = loss
        # Build the default optimizer per instance. A default created in the
        # signature would be evaluated once and shared (with its internal state)
        # by every DQN ever constructed.
        if optimizer is None:
            optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
        self.optimizer = optimizer

    def create_model(self):
        """Build and compile the Keras model, store it on ``self.model`` and return it."""
        self.model = Sequential([
            # shape must be a tuple; ``(n)`` without the comma is just the int n
            Input(shape=(self.input_dim,)),
            Dense(self.hidden_dim, activation="relu"),
            Dense(self.output_dim),
        ])
        self.model.compile(loss=self.loss, optimizer=self.optimizer)
        return self.model





In [97]:

class Trainer:
    """Trains a Keras Q-network with DQN on a discrete-action Gymnasium environment.

    Each training iteration plays one epsilon-greedy episode, pushes its
    transitions into a replay buffer and then performs a single batch update.

    Args:
        env_kwargs (dict):  keyword arguments forwarded to ``gym.make`` (must contain ``id``)
        model:  compiled Keras model mapping a batch of observations to one Q-value per action
        batch_size (int):   size of mini batch
        gamma (float):  discount factor gamma for MDP
        use_target (bool):  bootstrap from a periodically synced copy of the model
        target_update_iters (int):  sync the target network every target_update_iters episodes
        capacity (int): capacity of replay buffer
        epsilon_start (float):  starting value of epsilon
        epsilon_end (float):    final value of epsilon
        epsilon_decay_iters (int or float):  time constant of the exponential epsilon decay,
            counted in calls to ``get_action`` (i.e. environment steps)
        lr (float): stored on the instance only; the model arrives already compiled,
            so this value is NOT applied to its optimizer
        clip_grad (int or float):   stored on the instance only; not applied to the optimizer
        report_iters (int): report mean results every report_iters episodes
        seed (int): seed for the exploration RNG and for the first environment reset
    """
    def __init__(self,
                 env_kwargs,
                 model, 
                 batch_size=64, 
                 gamma=0.99,
                 use_target=True, 
                 target_update_iters=20, 
                 capacity=500,
                 epsilon_start=1.0, 
                 epsilon_end=0.01, 
                 epsilon_decay_iters=4000,
                 lr=0.005, 
                 clip_grad=1.0, 
                 report_iters=50,
                 seed: int = 1):
        # env params
        self.env_kwargs = env_kwargs
        self.env = gym.make(**self.env_kwargs)
        self.n_actions = self.env.action_space.n

        # model params
        self.model = model
        self.use_target = use_target
        self.target_update_iters = target_update_iters
        if self.use_target:
            # clone_model rebuilds the architecture with fresh weights, so the
            # online weights are copied over explicitly afterwards.
            self.target = tf.keras.models.clone_model(self.model)
            self.target.set_weights(self.model.get_weights())

        self.replay = ReplayBuffer(capacity=capacity)

        # meta params
        self.batch_size = batch_size
        self.gamma = gamma
        self.epsilon_start = epsilon_start
        self.epsilon_end = epsilon_end
        self.epsilon_decay = epsilon_decay_iters
        self.lr = lr
        self.clip_grad = clip_grad
        self.report_iters = report_iters
        self.seed = seed
        self._rnds = np.random.RandomState(seed)
        self._step = 0
        # The env is seeded on its first reset only; re-seeding on every reset
        # would replay the identical start state each episode.
        self._env_seeded = False

    def train(self, num_episodes):
        """Run ``num_episodes`` episodes, doing one batch update after each."""
        print(f'Start training model for {num_episodes}:')
        reward_report = deque(maxlen=self.report_iters)
        loss_report = deque(maxlen=self.report_iters)
        for i in range(num_episodes):

            reward_report.append(self.do_episode())
            loss = self.update_model()
            # update_model returns None while the buffer is still warming up
            if loss is not None:
                loss_report.append(loss)

            # update target network
            if self.use_target and i % self.target_update_iters == 0:
                self.update_target()

            # report
            if i % self.report_iters == 0:
                mean_r = np.mean(reward_report)
                mean_l = np.mean(loss_report) if loss_report else 0.0
                print(f'eps: {i:04} - mean reward/loss over last {self.report_iters} episodes: '
                      f'{mean_r:.1f}/{mean_l:.4f}')

        print('training finished.')

    def _decay_eps_threshold(self):
        """Exponentially decayed epsilon for the current step counter."""
        return self.epsilon_end + \
               (self.epsilon_start - self.epsilon_end) * \
               np.exp(-1. * self._step / self.epsilon_decay)

    def get_action(self, state):
        """Choose an action epsilon-greedily.

        Args:
            state: batch holding a single observation, shape (1, obs_dim)

        Returns:
            int: index of the chosen action
        """
        eps = self._decay_eps_threshold()   # current exploration probability
        draw = self._rnds.rand()            # sample from [0, 1)
        self._step += 1
        if draw < eps:   # explore: uniform random action from the seeded RNG
            return int(self._rnds.randint(self.n_actions))

        # exploit: action with the highest predicted Q-value
        q_values = np.asarray(self.model(state))
        return int(np.argmax(q_values, axis=-1)[0])

    def _shape_reward(self, next_state, reward):
        """Add the tiered bonus based on the first observation component.

        NOTE(review): the thresholds reward a large positive first coordinate.
        This looks carried over from a different environment - confirm that it
        is the intended shaping for the env configured in ``env_kwargs``.
        """
        position = next_state[0]
        # Check the largest threshold first so every tier is reachable.
        if position >= 0.5:
            return reward + 100
        if position >= 0.25:
            return reward + 20
        if position >= 0.1:
            return reward + 10
        return reward

    def do_episode(self, render=False):
        """Play one episode, store its transitions and return the total (shaped) reward."""
        if render:
            # swap in a human-rendered env and release the one it replaces
            self.env.close()
            self.env = gym.make(**self.env_kwargs, render_mode="human")
            self._env_seeded = False

        if self._env_seeded:
            obs, _ = self.env.reset()
        else:
            obs, _ = self.env.reset(seed=self.seed)
            self._env_seeded = True
        obs = np.asarray(obs, dtype=np.float32)

        stop = False
        total_reward = 0.0
        while not stop:
            # the model expects a batch dimension: (obs_dim,) -> (1, obs_dim)
            state = tf.convert_to_tensor(obs.reshape(1, -1), dtype=tf.float32)
            action = self.get_action(state)
            next_obs, reward, term, trunc, _info = self.env.step(action)
            next_obs = np.asarray(next_obs, dtype=np.float32)
            stop = term or trunc

            # amend reward --> reward shaping
            reward = self._shape_reward(next_obs, float(reward))

            # render env
            if render:
                self.env.render()
                time.sleep(0.05)

            total_reward += reward

            # save transition; only true termination is stored as "done" so a
            # time-limit truncation still bootstraps from the next state
            self.replay.push((obs, action, next_obs, reward, bool(term)))

            obs = next_obs

        return total_reward

    def update_model(self):
        """Fit the model on one replay batch.

        Returns:
            float or None: the batch loss, or None when the buffer does not yet
            hold more than ``batch_size`` transitions.
        """
        if len(self.replay) <= self.batch_size:
            return None

        # get transitions from buffer
        batch = self.replay.sample(self.batch_size)
        states = np.stack([np.asarray(t[0], dtype=np.float32) for t in batch])
        actions = np.asarray([int(t[1]) for t in batch], dtype=np.int64)
        next_states = np.stack([np.asarray(t[2], dtype=np.float32) for t in batch])
        rewards = np.asarray([float(t[3]) for t in batch], dtype=np.float32)
        done = np.asarray([bool(t[4]) for t in batch])

        # Q(s_t, .) of the online model; used as regression target so that only
        # the entry of the action actually taken contributes to the loss
        targets = np.array(self.model.predict(states, verbose=0), dtype=np.float32)

        # V(s_{t+1}) either with target network or original model
        bootstrap_net = self.target if self.use_target else self.model
        max_next_q = np.asarray(bootstrap_net(next_states)).max(axis=-1)

        # mask final states and write the expected Q-values
        expected_q = rewards + self.gamma * max_next_q * np.logical_not(done)
        targets[np.arange(len(batch)), actions] = expected_q

        # update model params
        history = self.model.fit(states, targets, batch_size=self.batch_size, verbose=0, shuffle=False)
        return float(history.history['loss'][0])

    def update_target(self):
        """Update target network with parameters of model (no-op without a target network)."""
        if not self.use_target:
            return None
        # get_weights() already returns fresh arrays, so no extra copy is needed
        return self.target.set_weights(self.model.get_weights())



In [None]:
SEED = 123
N = 3000            # number of training episodes
OBS_DIM = 8         # size of the observation vector, see the printed observation space
N_ACTIONS = 4       # size of the discrete action space, see the printed action space
env_kwargs = {
    "id": "LunarLander-v2",
    "continuous": False,
    "gravity" : -8.0,
    "enable_wind": False,
}

# Seed every global RNG the run draws from so a fresh-kernel re-run is
# repeatable: `random` (replay sampling), NumPy, and TensorFlow.
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

# initialize model
model = DQN(OBS_DIM, N_ACTIONS).create_model()

# initialize trainer
# NOTE(review): Trainer does not forward `lr` / `clip_grad` to the optimizer of the
# already-compiled model; the learning rate in effect is the one DQN compiled with.
trainer = Trainer(env_kwargs, model, batch_size=256, seed=SEED, epsilon_decay_iters=N*0.95, lr=0.001, clip_grad=0.1)

# train...
trainer.train(N)