In [1]:
!pip install gymnasium



In [2]:
import os
import sys
import contextlib
import gymnasium as gym
import random
import matplotlib.pyplot as plt
from tensorflow.keras import layers, models, optimizers
from typing import Dict, List, Tuple, Union
import numpy as np

import moviepy.editor as mpy
from IPython.display import Video
from IPython.display import HTML
from base64 import b64encode

def render_as_image(env: gym.Env) -> None:
    """ Display the environment's current state as an image. """
    plt.imshow(env.render())
    plt.axis('off')
    plt.show()


In [3]:
class DQN:
    """ Class to initialize, train and run an agent to move in environment. """

    def __init__(self, number_of_actions, number_of_states) -> None:
        """ Initialize the agent with default settings and a given environment. """

        self.batch_size = 100
        self.learning_rate = 0.001
        self.number_of_actions = number_of_actions
        self.number_of_states = number_of_states
        self.gamma = 0.9

        self.epsilon = 1
        self.min_epsilon = 0.02
        self.epsilon_decay = 0.99

        self.t_steps_occurence = 5

        self.Q_network = models.Sequential([
            layers.Input(shape=(self.number_of_states,)),
            layers.Dense(24, activation='relu'),
            layers.Dense(24, activation='relu'),
            layers.Dense(self.number_of_actions, activation='linear')
        ])
        self.Q_network.summary()

        self.Target_network = models.Sequential([
            layers.Input(shape=(self.number_of_states,)),
            layers.Dense(24, activation='relu'),
            layers.Dense(24, activation='relu'),
            layers.Dense(self.number_of_actions, activation='linear')
        ])
        self.Target_network.summary()

        self.Q_network.compile(loss='mse', optimizer=optimizers.Adam(learning_rate=self.learning_rate))
        self.Target_network.compile(loss='mse', optimizer=optimizers.Adam(learning_rate=self.learning_rate))

        self.train_buffer_size = 1000
        # self.train_buffer = list()
        self.train_buffer = {
            'states': [],
            'actions': [],
            'rewards': [],
            'next_states': [],
            'terminateds': [],
        }

    def choose_learn_action_epsilon_greedy_policy(self, state: int) -> int:
        """ Choose an action based on the epsilon-greedy policy. """

        if np.random.uniform(0, 1) < self.epsilon:
            # Exploration: choose a random action
            # action = self.environment.action_space.sample()
            action = random.randint(0, self.number_of_actions - 1)
        else:
            # Exploitation: choose the action with the highest Q-value
            state_encoded = self.one_hot_encode(state).reshape(1, -1)  # example: [[1,0,0,0]] - vertically
            q_values = self.Q_network.predict(state_encoded, verbose=0)
            action = np.argmax(q_values)

        return action

    def update_exploration_rate(self):
        """ Update the exploration rate based on the decay rate. """
        if self.epsilon > self.min_epsilon:
            self.epsilon *= self.epsilon_decay
        else:
            self.epsilon = self.min_epsilon

    def store_episode(self, state, action, reward, next_state, terminated):
        """ Store an episode in the training buffer. """
        # self.train_buffer.append((state, action, reward, next_state, terminated))
        self.train_buffer['states'].append(state)
        self.train_buffer['actions'].append(action)
        self.train_buffer['rewards'].append(reward)
        self.train_buffer['next_states'].append(next_state)
        self.train_buffer['terminateds'].append(terminated)

        if len(self.train_buffer['states']) > self.train_buffer_size:
          # self.train_buffer.pop(0)
          self.train_buffer['states'].pop(0)
          self.train_buffer['actions'].pop(0)
          self.train_buffer['rewards'].pop(0)
          self.train_buffer['next_states'].pop(0)
          self.train_buffer['terminateds'].pop(0)

    def get_buffer_randomized(self):
        """ Get a random batch of episodes from the training buffer. """
        # random.shuffle(self.train_buffer)
        # return self.train_buffer[:self.batch_size]

        # random.shuffle(self.train_buffer)
        # # return self.train_buffer[:self.batch_size]
        # batch = self.train_buffer[:self.batch_size]
        # # Batch the states, actions, rewards, next_states, and terminateds
        # states = np.array([s for s, _, _, _, _ in batch])
        # actions = np.array([a for _, a, _, _, _ in batch])
        # rewards = np.array([r for _, _, r, _, _ in batch])
        # next_states = np.array([ns for _, _, _, ns, _ in batch])
        # terminateds = np.array([d for _, _, _, _, d in batch])
        # return states, actions, rewards, next_states, terminateds

        states = np.array(self.train_buffer['states'][:self.batch_size])
        actions = np.array(self.train_buffer['actions'][:self.batch_size])
        rewards = np.array(self.train_buffer['rewards'][:self.batch_size])
        next_states = np.array(self.train_buffer['next_states'][:self.batch_size])
        terminateds = np.array(self.train_buffer['terminateds'][:self.batch_size])
        return states, actions, rewards, next_states, terminateds

    def one_hot_encode(self, state):
        """ One-hot encode the input state. """
        one_hot = np.zeros(self.number_of_states)
        one_hot[state] = 1
        return one_hot


class Agent:
    """ Class to initialize, train and run an agent to move in environment. """

    def __init__(self, enable_training_render: bool = False, render_episodes_from: int = 0) -> None:
        """ Initialize the agent with default settings and a CliffWalking environment. """

        self.enable_training_render = enable_training_render
        # Declaring environment
        if self.enable_training_render:
            self.environment: gym.Env = gym.make('CliffWalking-v0', render_mode='rgb_array')
        else:
          self.environment: gym.Env = gym.make('CliffWalking-v0')

        self.render_episodes_from = render_episodes_from

        # Initial state
        self.current_state: Union[int, None] = self.environment.reset()[0]

        # DQN-learning models
        self.DQN = DQN(self.environment.action_space.n, self.environment.observation_space.n)

    def end_training(self) -> None:
        """ Close the environment after training. """
        self.environment.close()

    def playDQN_learning(self, number_of_episodes: int = 1) -> None:
        """ Train the agent using the DQN-learning. """

        episode_count = 0

        while episode_count < number_of_episodes:
            # terminated, truncated, reward = False, False, 0
            terminated = False
            # TODO: reset buffer?
            self.reset()
            t_step_count = 0
            cumulated_reward = 0

            if self.enable_training_render:
                self.environment.render()

            print(f"episode: {episode_count}, epsilon: {self.DQN.epsilon}, train_buffer: {len(self.DQN.train_buffer['states'])}")

            # while not terminated and not truncated and reward != -100:
            while not terminated:

                # PP Experience Replay gathers a training sample by
                # interacting with the environment and saves it as Training data

                # Choose an action using epsilon-greedy policy
                action = self.DQN.choose_learn_action_epsilon_greedy_policy(self.current_state)
                # Take the action and observe the result
                next_state, reward, terminated, truncated, info = self.environment.step(action)
                if self.enable_training_render and episode_count >= self.render_episodes_from:
                    render_as_image(self.environment)
                    print(f"episode: {episode_count}, epsilon: {self.DQN.epsilon}, train_buffer: {len(self.DQN.train_buffer['states'])} "
                          f"next_state: {next_state}, reward: {reward}, terminated: {terminated}, truncated: {truncated}, info: {info}")

                # Store episode values in buffer
                self.DQN.store_episode(
                    self.DQN.one_hot_encode(self.current_state),  # self.current_state,  # self.DQN.one_hot_encode(self.current_state),
                    action,
                    reward,
                    self.DQN.one_hot_encode(next_state),  # next_state,  # self.DQN.one_hot_encode(next_state),
                    terminated
                )

                # Update the agent state
                self.current_state = next_state

                # when the buffer has needed episode values amount
                if len(self.DQN.train_buffer['states']) > self.DQN.batch_size:

                    """
                    # PP Random batch of training data is input to both networks
                    pulled_batch = self.DQN.get_buffer_randomized()

                    for state, action, reward, next_state, terminated in pulled_batch:

                         # Q Network predicts Q Value
                        state_encoded = self.DQN.one_hot_encode(state).reshape(1, -1)
                        next_target_q_values = self.DQN.Q_network.predict(state_encoded, verbose=0)

                        if terminated:
                            next_target_q_values[0][action] = reward
                        else:
                            # Target Network predicts Target Q Value
                            next_state_encoded = self.DQN.one_hot_encode(next_state).reshape(1, -1)
                            current_target_q_values = self.DQN.Target_network.predict(next_state_encoded, verbose=0)

                            # Compute loss
                            next_target_q_values[0][action] = reward + self.DQN.gamma * np.amax(current_target_q_values)

                        # Train Q Network only. Target network remains fixed
                        self.DQN.Q_network.fit(state_encoded, next_target_q_values, epochs=1, verbose=0)
                    """

                    # PP Random batch of training data is input to both networks
                    states, actions, rewards, next_states, terminateds = self.DQN.get_buffer_randomized()

                    # Q Network predicts Q Value
                    next_q_values = self.DQN.Q_network.predict(next_states)

                    # Target Network predicts Target Q Value
                    next_target_q_values = self.DQN.Target_network.predict(next_states)

                    # Compute loss
                    for i in range(self.DQN.batch_size):
                        target = rewards[i]
                        if not terminateds[i]:
                            target += self.DQN.gamma * np.max(next_target_q_values[i])
                        next_target_q_values[i][actions[i]] = target

                    # Train Q Network only. Target network remains fixed
                    self.DQN.Q_network.fit(states, next_target_q_values, epochs=1, verbose=0)
                    print(f"episode {episode_count}, step {t_step_count}, train_buffer: {len(self.DQN.train_buffer['states'])}, epsilon {self.DQN.epsilon}")

            # Update epsilon
            self.DQN.update_exploration_rate()

            # Every T step, copy Q Network weights to Target Network
            if not episode_count % self.DQN.t_steps_occurence:
                self.DQN.Target_network.set_weights(self.DQN.Q_network.get_weights())

            # Set new episode
            episode_count += 1

        self.end_training()

    def follow_best_policy(self) -> None:
        """ Follow the best policy learned by the agent and render the environment at each step. """
        self.environment = gym.make('CliffWalking-v0', render_mode='rgb_array')

        self.current_state = self.environment.reset()[0]
        self.environment.render()
        frames = []
        i = 0
        r = 0.0
        terminated = False
        while not terminated:
            frames.append(self.environment.render())
            render_as_image(self.environment)

            # epsilon = 0 => strictly greedy policy
            action = self.choose_action_epsilon_greedy_policy(self.current_state, epsilon=0.0)
            next_state, reward, terminated, truncated, info = self.environment.step(action)
            print(f'Step {i}: observation={next_state}, reward={reward}, done={terminated}')

            r += float(reward)
            i += 1
            self.current_state = next_state

        clip = mpy.ImageSequenceClip(frames, fps=5)
        clip.write_videofile("cliff_walking_run.mp4")

        print(f"FINAL STATE".center(100, '-'))
        render_as_image(self.environment)

        print("Reward ", r)

    def choose_action_epsilon_greedy_policy(self, state: int, epsilon: float) -> int:
        """ Choose an action based on the epsilon-greedy policy. """

        if np.random.uniform(0, 1) < epsilon:
            # Exploration: choose a random action
            # action = self.environment.action_space.sample()
            action = random.randint(0, self.environment.action_space.n - 1)
        else:
            # Exploitation: choose the action with the highest Q-value
            state_encoded = self.DQN.one_hot_encode(state).reshape(1, -1)
            q_values = self.DQN.Target_network.predict(state_encoded)
            action = np.argmax(q_values)

        return action

    def reset(self) -> None:
        """ Reset the environment and traces for a new episode. """
        self.current_state = self.environment.reset()[0]

In [None]:
agent = Agent()
# agent = Agent(enable_training_render=True, render_episodes_from=0)
agent.playDQN_learning()

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 
episode 0, step 0, train_buffer: 1000, epsilon 1
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step 
episode 0, step 0, train_buffer: 1000, epsilon 1
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 
episode 0, step 0, train_buffer: 1000, epsilon 1
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 
episode 0, step 0, train_buffer: 1000, epsilon 1
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 
episode 0, step 0, train_buffer: 1000, epsilon 1
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m

In [None]:
agent.follow_best_policy()

video_path = 'cliff_walking_run.mp4'

with open(video_path, 'rb') as video_file:
    video_data = b64encode(video_file.read()).decode()

video_tag = f'''
<video width="640" height="480" controls>
    <source src="data:video/mp4;base64,{video_data}" type="video/mp4">
    Your browser does not support the video tag.
</video>'''
HTML(video_tag)