In [None]:
import os
import tensorflow as tf
from tensorflow.keras import layers, models, losses, optimizers

class Estimator:
    """
    Q-Value Estimator neural network.
    This network is used for both the Q-Network and the Target Network.

    Attributes:
      scope: Name used to label this estimator (also used in the summary directory name).
      num_actions: Number of outputs of the final dense layer (one Q-value per action).
      model: The Keras model mapping a batch of states to Q-values.
      optimizer: Adam optimizer used by `update`.
      summary_writer: TensorBoard file writer, or None when `summaries_dir` is not given.
    """

    def __init__(self, scope="estimator", summaries_dir=None, num_actions=4, learning_rate=0.001):
        """
        Args:
          scope: Label for this estimator.
          summaries_dir: Optional directory; summaries are written to
            `<summaries_dir>/summaries_<scope>`.
          num_actions: Number of actions, i.e. the size of the output layer.
          learning_rate: Learning rate passed to the Adam optimizer.
        """
        self.scope = scope
        self.num_actions = num_actions
        self.model = self._build_model()  # Build the Keras model
        self.optimizer = optimizers.Adam(learning_rate=learning_rate)
        # Built once here instead of on every call to `update`.
        self.loss_fn = losses.MeanSquaredError()
        self.summary_writer = None

        if summaries_dir:
            summary_dir = os.path.join(summaries_dir, f"summaries_{scope}")
            os.makedirs(summary_dir, exist_ok=True)
            self.summary_writer = tf.summary.create_file_writer(summary_dir)

    def _build_model(self):
        """
        Builds the convolutional Q-network.

        Returns:
          A Sequential model taking inputs of shape [batch_size, 84, 84, 4] and
          producing `num_actions` values per input.
        """
        model = models.Sequential([
            layers.Input(shape=(84, 84, 4), name="X"),
            layers.Conv2D(32, (8, 8), strides=(4, 4), activation='relu'),
            layers.Conv2D(64, (4, 4), strides=(2, 2), activation='relu'),
            layers.Conv2D(64, (3, 3), strides=(1, 1), activation='relu'),
            layers.Flatten(),
            layers.Dense(512, activation='relu'),
            layers.Dense(self.num_actions, name="predictions")
        ])
        return model

    def predict(self, s):
        """
        Predicts action values.

        Args:
          s: State input of shape [batch_size, 84, 84, 4]

        Returns:
          Tensor of shape [batch_size, num_actions] containing the estimated action values.
        """
        # Inputs are divided by 255 before being fed to the network (same as in `update`).
        s = tf.convert_to_tensor(s, dtype=tf.float32) / 255.0
        return self.model(s)

    def update(self, s, a, y):
        """
        Updates the estimator towards the given targets.

        Args:
          s: State input of shape [batch_size, 84, 84, 4]
          a: Chosen actions of shape [batch_size]
          y: Targets of shape [batch_size]

        Returns:
          The calculated loss on the batch.
        """
        s = tf.convert_to_tensor(s, dtype=tf.float32) / 255.0
        a = tf.convert_to_tensor(a, dtype=tf.int32)
        y = tf.convert_to_tensor(y, dtype=tf.float32)

        with tf.GradientTape() as tape:
            # Predict Q-values for all actions
            q_values = self.model(s)
            # Select Q-value of chosen actions: pair each row index with its action
            indices = tf.stack([tf.range(tf.shape(a)[0]), a], axis=1)
            predicted_q = tf.gather_nd(q_values, indices)
            # Compute loss
            loss = self.loss_fn(y, predicted_q)

        # Backpropagation
        grads = tape.gradient(loss, self.model.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.model.trainable_variables))

        # Log the loss for TensorBoard
        if self.summary_writer:
            # Use the optimizer's own update counter as the step. The previous
            # global-step variable was never incremented by this class, so every
            # loss value was written at the same step.
            step = int(self.optimizer.iterations.numpy())
            with self.summary_writer.as_default():
                tf.summary.scalar("loss", loss, step=step)

        return loss.numpy()


In [None]:
import tensorflow as tf
import numpy as np
from tensorflow.keras import layers, models
import gym

class StateProcessor:
    """
    Turns raw environment frames into the format the model consumes
    (grayscale conversion followed by a resize to 84x84).
    """
    def __init__(self):
        # Keras layer that resizes its input to 84 rows by 84 columns.
        self.resizer = tf.keras.layers.Resizing(height=84, width=84)

    def process(self, observation):
        """
        Converts one observation frame to grayscale and resizes it.

        Args:
          observation: A raw RGB frame from the environment.

        Returns:
          The processed frame as a numpy array of shape (84, 84, 1).
        """
        grayscale_frame = tf.image.rgb_to_grayscale(observation)
        return self.resizer(grayscale_frame).numpy()

# Initialize objects
sp = StateProcessor()
e = Estimator(num_actions=4)

# NOTE(review): the recorded output of this cell is a NameNotFound error for
# "Breakout", which suggests the Atari environments are not registered in this
# kernel (presumably the optional Atari extras are missing) - confirm the install.
env = gym.envs.make("Breakout-v0")

# Newer gym releases return (observation, info) from reset(); older ones return
# the observation alone. Accept both.
reset_result = env.reset()
observation = reset_result[0] if isinstance(reset_result, tuple) else reset_result

observation_p = sp.process(observation)

# Create a stack of 4 identical frames as input.
# process() returns (84, 84, 1), so join the copies along the existing channel
# axis to get (84, 84, 4); np.stack would yield (84, 84, 4, 1) instead, which
# does not match the model's (84, 84, 4) input.
observation = np.concatenate([observation_p] * 4, axis=2)
observations = np.array([observation] * 2)

predicted_q_values = e.predict(observations)
print("Predicted Q-values:", predicted_q_values)

# Test training step
targets = np.array([10.0, 10.0])
actions = np.array([1, 3])
loss = e.update(observations, actions, targets)
print("Loss after update:", loss)

NameNotFound: Environment Breakout doesn't exist. 

In [None]:
def copy_model_parameters(sess, estimator1, estimator2):
    """
    Copies the model parameters of one estimator to another.

    Two kinds of estimators are supported:

    * Keras-backed estimators exposing a `model` attribute (such as the
      `Estimator` class in this notebook): the weights are copied directly with
      `get_weights` / `set_weights` and `sess` is not used (it may be None).
    * Legacy graph-mode estimators identified only by their `scope`: trainable
      variables whose names start with each scope are paired in name order and
      assigned through `sess`.

    Args:
      sess: Tensorflow session instance (only used for legacy estimators)
      estimator1: Estimator to copy the parameters from
      estimator2: Estimator to copy the parameters to
    """
    if hasattr(estimator1, "model") and hasattr(estimator2, "model"):
        # Name-scope filtering cannot tell two Keras models apart, so copy the
        # weight values directly instead.
        estimator2.model.set_weights(estimator1.model.get_weights())
        return

    # Legacy path. `trainable_variables` is reached through tf.compat.v1
    # (already used elsewhere in this notebook) rather than the top-level
    # `tf` namespace.
    all_variables = tf.compat.v1.trainable_variables()
    e1_params = sorted(
        (v for v in all_variables if v.name.startswith(estimator1.scope)),
        key=lambda v: v.name)
    e2_params = sorted(
        (v for v in all_variables if v.name.startswith(estimator2.scope)),
        key=lambda v: v.name)

    update_ops = [e2_v.assign(e1_v) for e1_v, e2_v in zip(e1_params, e2_params)]

    sess.run(update_ops)

In [None]:
def make_epsilon_greedy_policy(estimator, nA):
    """
    Creates an epsilon-greedy policy based on a given Q-function approximator and epsilon.

    Args:
        estimator: An estimator that returns q values for a given state
        nA: Number of actions in the environment.

    Returns:
        A function that takes the (sess, observation, epsilon) as an argument and returns
        the probabilities for each action in the form of a numpy array of length nA.
        Pass `sess=None` for estimators whose `predict` takes only the state batch
        (such as the Keras `Estimator` in this notebook); when a session is given it
        is forwarded as the first argument of `predict`, as before.

    """
    def policy_fn(sess, observation, epsilon):
        # Every action starts with probability epsilon / nA ...
        A = np.full(nA, epsilon / nA, dtype=float)
        batch = np.expand_dims(observation, 0)
        if sess is None:
            q_batch = estimator.predict(batch)
        else:
            q_batch = estimator.predict(sess, batch)
        # np.asarray also converts eager tensors to a plain array before indexing.
        q_values = np.asarray(q_batch)[0]
        best_action = np.argmax(q_values)
        # ... and the greedy action receives the remaining 1 - epsilon.
        A[best_action] += (1.0 - epsilon)
        return A
    return policy_fn

In [None]:
def deep_q_learning(sess,
                    env,
                    q_estimator,
                    target_estimator,
                    state_processor,
                    num_episodes,
                    experiment_dir,
                    replay_memory_size=500000,
                    replay_memory_init_size=50000,
                    update_target_estimator_every=10000,
                    discount_factor=0.99,
                    epsilon_start=1.0,
                    epsilon_end=0.1,
                    epsilon_decay_steps=500000,
                    batch_size=32,
                    record_video_every=50):
    """
    Q-Learning algorithm for off-policy TD control using Function Approximation.
    Finds the optimal greedy policy while following an epsilon-greedy policy.

    This is a generator: nothing runs until it is iterated.

    NOTE(review): this function uses the session-based API throughout
    (`tf.train.Saver`, `tf.Summary`, `tf.contrib`, `sess`-first calls on the
    estimators and the state processor) and relies on the names `plotting` and
    `VALID_ACTIONS`, which are not defined in the visible part of this notebook.
    Confirm they are provided before running.

    Args:
        sess: Tensorflow Session object
        env: OpenAI environment
        q_estimator: Estimator object used for the q values
        target_estimator: Estimator object used for the targets
        state_processor: A StateProcessor object
        num_episodes: Number of episodes to run for
        experiment_dir: Directory to save Tensorflow summaries in
        replay_memory_size: Size of the replay memory
        replay_memory_init_size: Number of random experiences to sample when initializing
          the replay memory.
        update_target_estimator_every: Copy parameters from the Q estimator to the
          target estimator every N steps
        discount_factor: Gamma discount factor
        epsilon_start: Chance to sample a random action when taking an action.
          Epsilon is decayed over time and this is the start value
        epsilon_end: The final minimum value of epsilon after decaying is done
        epsilon_decay_steps: Number of steps to decay epsilon over
        batch_size: Size of batches to sample from the replay memory
        record_video_every: Record a video every N episodes

    Yields:
        After every episode, a tuple `(total_t, stats)` where `stats` is an
        EpisodeStats object with two numpy arrays for episode_lengths and
        episode_rewards, truncated to the episodes finished so far.

    Returns:
        The full EpisodeStats object (available as the generator's StopIteration value).
    """
    # Standard-library helpers used below; imported locally so this cell does not
    # depend on an import made in some other cell.
    import itertools
    import random
    import sys
    from collections import namedtuple

    Transition = namedtuple("Transition", ["state", "action", "reward", "next_state", "done"])

    # The replay memory
    replay_memory = []

    # Keeps track of useful statistics
    stats = plotting.EpisodeStats(
        episode_lengths=np.zeros(num_episodes),
        episode_rewards=np.zeros(num_episodes))

    # Create directories for checkpoints and summaries
    checkpoint_dir = os.path.join(experiment_dir, "checkpoints")
    checkpoint_path = os.path.join(checkpoint_dir, "model")
    monitor_path = os.path.join(experiment_dir, "monitor")

    os.makedirs(checkpoint_dir, exist_ok=True)
    os.makedirs(monitor_path, exist_ok=True)

    saver = tf.train.Saver()
    # Load a previous checkpoint if we find one
    latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
    if latest_checkpoint:
        print("Loading model checkpoint {}...\n".format(latest_checkpoint))
        saver.restore(sess, latest_checkpoint)

    # Get the current time step
    total_t = sess.run(tf.contrib.framework.get_global_step())

    # The epsilon decay schedule
    epsilons = np.linspace(epsilon_start, epsilon_end, epsilon_decay_steps)

    # The policy we're following
    policy = make_epsilon_greedy_policy(
        q_estimator,
        len(VALID_ACTIONS))

    # Populate the replay memory with initial experience
    print("Populating replay memory...")
    state = env.reset()
    state = state_processor.process(sess, state)
    state = np.stack([state] * 4, axis=2)
    # total_t does not change during warm-up. Clamp the schedule index exactly as
    # the training loop does, so that resuming from a checkpoint whose step count
    # is past the decay horizon does not raise an IndexError.
    warmup_epsilon = epsilons[min(total_t, epsilon_decay_steps-1)]
    for i in range(replay_memory_init_size):
        action_probs = policy(sess, state, warmup_epsilon)
        action = np.random.choice(np.arange(len(action_probs)), p=action_probs)
        next_state, reward, done, _ = env.step(VALID_ACTIONS[action])
        next_state = state_processor.process(sess, next_state)
        # Drop the oldest frame and append the newest one along the last axis
        next_state = np.append(state[:,:,1:], np.expand_dims(next_state, 2), axis=2)
        replay_memory.append(Transition(state, action, reward, next_state, done))
        if done:
            state = env.reset()
            state = state_processor.process(sess, state)
            state = np.stack([state] * 4, axis=2)
        else:
            state = next_state

    # Record videos
    env.monitor.start(monitor_path,
                      resume=True,
                      video_callable=lambda count: count % record_video_every == 0)

    for i_episode in range(num_episodes):

        # Save the current checkpoint
        saver.save(tf.get_default_session(), checkpoint_path)

        # Reset the environment
        state = env.reset()
        state = state_processor.process(sess, state)
        state = np.stack([state] * 4, axis=2)
        loss = None

        # One step in the environment
        for t in itertools.count():

            # Epsilon for this time step
            epsilon = epsilons[min(total_t, epsilon_decay_steps-1)]

            # Add epsilon to Tensorboard
            episode_summary = tf.Summary()
            episode_summary.value.add(simple_value=epsilon, tag="epsilon")
            q_estimator.summary_writer.add_summary(episode_summary, total_t)

            # Maybe update the target estimator
            if total_t % update_target_estimator_every == 0:
                copy_model_parameters(sess, q_estimator, target_estimator)
                print("\nCopied model parameters to target network.")

            # Print out which step we're on, useful for debugging.
            print("\rStep {} ({}) @ Episode {}/{}, loss: {}".format(
                    t, total_t, i_episode + 1, num_episodes, loss), end="")
            sys.stdout.flush()

            # Take a step
            action_probs = policy(sess, state, epsilon)
            action = np.random.choice(np.arange(len(action_probs)), p=action_probs)
            next_state, reward, done, _ = env.step(VALID_ACTIONS[action])
            next_state = state_processor.process(sess, next_state)
            next_state = np.append(state[:,:,1:], np.expand_dims(next_state, 2), axis=2)

            # If our replay memory is full, pop the first element
            if len(replay_memory) == replay_memory_size:
                replay_memory.pop(0)

            # Save transition to replay memory
            replay_memory.append(Transition(state, action, reward, next_state, done))

            # Update statistics
            stats.episode_rewards[i_episode] += reward
            stats.episode_lengths[i_episode] = t

            # Sample a minibatch from the replay memory
            samples = random.sample(replay_memory, batch_size)
            states_batch, action_batch, reward_batch, next_states_batch, done_batch = map(np.array, zip(*samples))

            # Calculate q values and targets
            # This is where Double Q-Learning comes in!
            # The online network picks the best next action, the target network evaluates it.
            q_values_next = q_estimator.predict(sess, next_states_batch)
            best_actions = np.argmax(q_values_next, axis=1)
            q_values_next_target = target_estimator.predict(sess, next_states_batch)
            # Terminal transitions (done == True) contribute only their reward.
            targets_batch = reward_batch + np.invert(done_batch).astype(np.float32) * \
                discount_factor * q_values_next_target[np.arange(batch_size), best_actions]

            # Perform gradient descent update
            states_batch = np.array(states_batch)
            loss = q_estimator.update(sess, states_batch, action_batch, targets_batch)

            if done:
                break

            state = next_state
            total_t += 1

        # Add summaries to tensorboard
        episode_summary = tf.Summary()
        episode_summary.value.add(simple_value=stats.episode_rewards[i_episode], node_name="episode_reward", tag="episode_reward")
        episode_summary.value.add(simple_value=stats.episode_lengths[i_episode], node_name="episode_length", tag="episode_length")
        q_estimator.summary_writer.add_summary(episode_summary, total_t)
        q_estimator.summary_writer.flush()

        yield total_t, plotting.EpisodeStats(
            episode_lengths=stats.episode_lengths[:i_episode+1],
            episode_rewards=stats.episode_rewards[:i_episode+1])

    env.monitor.close()
    return stats

In [None]:
class ExpReplay():
    """
    Fixed-capacity experience replay buffer.

    Experiences are kept in `self.exp`, a dict of five parallel lists
    ('state', 'action', 'reward', 'next_state', 'done'); entry i of every list
    belongs to the same transition. When the buffer is full, the oldest
    transition is dropped before a new one is stored.
    """
    def __init__(self, e_max=15000, e_min=100):
        """
        Args:
            e_max: Maximum number of transitions kept in the buffer.
            e_min: Value reported by `get_min` (callers use it as the minimum
              fill level before training starts).
        """
        self._max = e_max
        self._min = e_min
        self.exp = {'state':[], 'action':[], 'reward':[], 'next_state':[], 'done':[]} # total experiences the Agent stored

    def get_max(self):
        """Returns the configured maximum number of stored transitions."""
        return self._max

    def get_min(self):
        """Returns the configured minimum fill level."""
        return self._min

    def get_num(self):
        """Returns the number of transitions currently stored."""
        return len(self.exp['state'])

    def get_batch(self, batch_size=64):
        """
        Samples `batch_size` distinct transitions uniformly at random.

        Args:
            batch_size: Number of transitions to draw; must not exceed `get_num()`
              because sampling is done without replacement.

        Returns:
            A tuple (state, action, reward, next_state, done) where `state` and
            `next_state` are numpy arrays and the other three are lists, all
            ordered by the same sampled indices.

        Raises:
            ValueError: If `batch_size` is larger than the number of stored transitions.
        """
        idx = np.random.choice(self.get_num(), size=batch_size, replace=False)
        state = np.array([self.exp['state'][i] for i in idx])
        action = [self.exp['action'][i] for i in idx]
        reward = [self.exp['reward'][i] for i in idx]
        next_state = np.array([self.exp['next_state'][i] for i in idx])
        done = [self.exp['done'][i] for i in idx]
        return state, action, reward, next_state, done

    def add(self, state, action, reward, next_state, done):
        """
        Stores one transition, evicting the oldest one first if the buffer is full.

        Args:
            state: State before the action.
            action: Action taken.
            reward: Reward received.
            next_state: State after the action.
            done: Whether the episode ended with this transition.
        """
        # Evict with `>=` so the buffer never holds more than `_max` items
        # (a strict `>` let it grow to `_max + 1`). The emptiness check keeps a
        # non-positive capacity from deleting from empty lists.
        while self.exp['state'] and self.get_num() >= self.get_max():
            for column in self.exp.values():
                del column[0]

        self.exp['state'].append(state)
        self.exp['action'].append(action)
        self.exp['reward'].append(reward)
        self.exp['next_state'].append(next_state)
        self.exp['done'].append(done)

In [None]:
class TNET():
    """
    Target network: a two-hidden-layer tanh network whose output `q` is used to
    look up the estimated Q-value of a given action.
    """
    def __init__(self, in_units, out_units, hidden_units=250):
        self.in_units, self.out_units, self.hidden_units = in_units, out_units, hidden_units
        self._model()

    def _model(self):
        """Builds the placeholder, the variables and the forward pass under the 'tnet' scope."""
        n_in, n_hidden, n_out = self.in_units, self.hidden_units, self.out_units

        def weight(name, rows, cols):
            # Weight matrix drawn from a random-normal initializer.
            return tf.get_variable(name, shape=(rows, cols),
                                   initializer=tf.random_normal_initializer())

        def bias(name):
            # Zero-initialized bias, one entry per hidden unit.
            return tf.get_variable(name, shape=(n_hidden),
                                   initializer=tf.zeros_initializer())

        with tf.variable_scope('tnet'):
            # Network input: one row of `in_units` features per sample.
            self.x = tf.placeholder(tf.float32, shape=(None, n_in))

            # Variables are created in the order W1, W2, W3, b1, b2; `self.params`
            # below lists them as collected from the 'tnet' scope.
            w_in = weight('W1', n_in, n_hidden)          # input    -> hidden 1
            w_hidden = weight('W2', n_hidden, n_hidden)  # hidden 1 -> hidden 2
            w_out = weight('W3', n_hidden, n_out)        # hidden 2 -> output
            b_first = bias('b1')
            b_second = bias('b2')

            # Forward pass through the two tanh hidden layers.
            hidden_first = tf.nn.tanh(tf.matmul(self.x, w_in)+b_first)
            hidden_second = tf.nn.tanh(tf.matmul(hidden_first, w_hidden)+b_second)

            # Linear output layer (no bias): the Q-values.
            self.q = tf.matmul(hidden_second, w_out)
            self.params = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope='tnet')

In [None]:
class QNET():
    def batch_train(self, batch_size=64, discount=0.95):
        """
        Implement Double DQN Algorithm, batch training.

        Draws a batch from `self.exp`, lets this network choose the best action
        for each next state, evaluates that action with the target network
        (`self.tnet`) and runs one training step towards the resulting targets.
        Does nothing while the replay buffer holds fewer than `get_min()`
        transitions.

        Args:
            batch_size: Number of transitions to sample for this update.
            discount: Discount factor applied to the target network's estimate
              for non-terminal transitions (default 0.95, the value that was
              previously hard-coded).

        Returns:
            None
        """
        if self.exp.get_num() < self.exp.get_min():
            return

        state, action, reward, next_state, done = self.exp.get_batch(batch_size)
        state = state.reshape(batch_size, self.in_units)
        next_state = next_state.reshape(batch_size, self.in_units)
        rows = np.arange(batch_size)

        # Action selection: greedy actions according to this (online) network.
        qnet_q_values = self.session.run(self.q, feed_dict={self.x:next_state})
        qnet_actions = np.argmax(qnet_q_values, axis=1)

        # Action evaluation: Q-values of those actions according to the target network.
        tnet_q_values = self.session.run(self.tnet.q, feed_dict={self.tnet.x:next_state})
        tnet_q = np.asarray(tnet_q_values)[rows, qnet_actions]

        # Terminal transitions use the reward alone; the others add the
        # discounted target estimate.
        reward = np.asarray(reward, dtype=float)
        done = np.asarray(done, dtype=bool)
        qnet_update_q = np.where(done, reward, reward + discount * tnet_q)

        # (row, action) pairs selecting the Q-value that each target applies to.
        indices = np.stack([rows, np.asarray(action, dtype=np.int64).reshape(batch_size)], axis=1)
        feed_dict = {self.x:state, self.target:qnet_update_q, self.selected_idx:indices}
        self.session.run(self.train_opt, feed_dict)

In [None]:
class Agent():
    """
    Couples an environment with a Q-network and a replay buffer and runs the
    episode loop that collects experience and trains the network.
    """
    def __init__(self, env):
        # Training-loop settings.
        self.max_episodes = 10000
        self.max_actions = 10000
        self.exploration_rate = 1.0
        self.exploration_decay = 0.0001

        # Environment and the sizes read from its spaces.
        self.env = env
        self.states = env.observation_space.shape[0]
        self.actions = env.action_space.n

        # Replay buffer shared with the Q-network.
        self.exp = ExpReplay()

        self.batch_size = 64

        # Deep Q Network: build it first, then initialize its variables in a
        # fresh interactive session and hand that session to the network.
        self.qnet = QNET(self.states, self.actions, self.exp)
        session = tf.InteractiveSession()
        session.run(tf.global_variables_initializer())
        self.qnet.set_session(session)

    def train(self):
        """
        Runs `max_episodes` episodes of at most `max_actions` steps each.

        Every step is rendered, stored in the replay buffer and followed by one
        `batch_train` call; the transition that ends an episode is stored with
        its reward reduced by 100. `qnet.update()` is called on every 25th step
        (except step 0) of an episode. The exploration rate is decayed after
        each episode, and the mean episode reward since the last report is
        printed every 100 episodes.
        """
        n_episodes, n_steps = self.max_episodes, self.max_actions
        epsilon, decay = self.exploration_rate, self.exploration_decay
        batch = self.batch_size

        recent_returns = []
        for episode in range(n_episodes):
            episode_return = 0
            obs = self.env.reset()
            obs = obs.reshape(1, self.states)

            for step in range(n_steps):
                self.env.render()
                chosen = self.qnet.get_action(obs, epsilon)
                next_obs, reward, done, _info = self.env.step(chosen)
                next_obs = next_obs.reshape(1, self.states)
                episode_return += reward

                # The terminal transition is stored with a penalty of 100.
                stored_reward = (reward-100) if done else reward
                self.exp.add(obs, chosen, stored_reward, next_obs, done)
                self.qnet.batch_train(batch)
                if done:
                    break

                if step > 0 and step % 25 == 0:
                    self.qnet.update()
                obs = next_obs

            recent_returns.append(episode_return)
            # Pull epsilon towards its floor of 0.01; the pull grows with the episode index.
            epsilon = 0.01 + (epsilon-0.01)*np.exp(-decay*(episode+1))

            if episode > 0 and episode % 100 == 0:
                average_rewards = np.mean(np.array(recent_returns))
                recent_returns = []
                print("episodes: %i to %i, average_reward: %.3f, exploration: %.3f" %(episode-100, episode, average_rewards, epsilon))


In [None]:
import tensorflow as tf
import tensorflow.keras.layers as kl


class DuelingQNetwork(tf.keras.Model, SamplingMixin):
    """
    Convolutional Q-network with separate value and advantage heads.

    Three convolution layers feed a flattened feature vector into two
    independent 512-unit branches: one ends in a single value output, the
    other in one advantage output per action. The Q-values are the value plus
    the advantages centered on their per-sample mean.

    NOTE(review): `SamplingMixin` is not defined in the visible part of this
    notebook (the recorded output of this cell is a NameError); it must be
    defined or imported before this class statement runs.
    """

    def __init__(self, actions_space):
        """
        Args:
            actions_space: Number of outputs of the advantage head.
        """
        super().__init__()
        self.action_space = actions_space

        # Settings shared by every ReLU layer below.
        relu_he = dict(activation="relu", kernel_initializer="he_normal")

        # Convolutional trunk.
        self.conv1 = kl.Conv2D(32, 8, strides=4, **relu_he)
        self.conv2 = kl.Conv2D(64, 4, strides=2, **relu_he)
        self.conv3 = kl.Conv2D(64, 3, strides=1, **relu_he)
        self.flatten1 = kl.Flatten()

        # Value branch: hidden layer followed by a single linear output.
        self.dense1 = kl.Dense(512, **relu_he)
        self.value = kl.Dense(1, kernel_initializer="he_normal")

        # Advantage branch: hidden layer followed by one linear output per action.
        self.dense2 = kl.Dense(512, **relu_he)
        self.advantages = kl.Dense(self.action_space,
                                   kernel_initializer="he_normal")

    @tf.function
    def call(self, x):
        """
        Computes Q-values for a batch of inputs.

        Args:
            x: Batch of inputs for the convolutional trunk.

        Returns:
            Tensor with one row per sample and one Q-value per action.
        """
        features = self.flatten1(self.conv3(self.conv2(self.conv1(x))))

        state_value = self.value(self.dense1(features))

        raw_advantages = self.advantages(self.dense2(features))
        # Center the advantages on their mean over the action axis.
        mean_advantage = tf.reduce_mean(raw_advantages, axis=1, keepdims=True)
        centered_advantages = raw_advantages - mean_advantage

        return state_value + centered_advantages

NameError: name 'SamplingMixin' is not defined