<a href="https://colab.research.google.com/github/ai-in-practice/youtube/blob/main/colab/ai_in_practice_intro_to_ai_agents_exploration_exploitation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#### **AI in practice: intro to AI agents - reinforcement learning notebook**

Let's get started with reinforcement learning!

This notebook contains a simple RL agent that needs to learn how to navigate a frozen lake:

<figure>
<center>
<img src='https://www.gymlibrary.dev/_images/frozen_lake.gif' />
<figcaption>Frozen Lake</figcaption></center>
</figure>

A full description of the `FrozenLake` environment [can be found here](https://www.gymlibrary.dev/environments/toy_text/frozen_lake/).

We're using OpenAI's [gym](https://github.com/openai/gym) Python package, which has an extensive collection of environments for reinforcement learning agents.


*To visualise the agent environments and agent actions in our Google Colab notebooks, we need to install a number of Python and Linux packages. I used a Colab notebook [by Jeff Heaton from Washington University in St. Louis](https://colab.research.google.com/github/jeffheaton/t81_558_deep_learning/blob/master/t81_558_class_12_01_ai_gym.ipynb) to figure out which packages needed to be installed.*

We first install the necessary Python packages:


In [None]:
!pip install gym pyvirtualdisplay seaborn

Next, we install the required Linux packages:

In [None]:
!apt-get update
!apt-get install -y xvfb python-opengl ffmpeg
!apt-get install -y cmake


With the required Linux packages installed, we need to update the Python runtime environment so that we can play video in the Colab notebook.

In [None]:
!pip install --upgrade setuptools
!pip install ez_setup
!pip install gym[atari]

With the installation done, we can now import the Python packages into our notebook environment.

In [None]:
import io
import gym
import glob
import base64
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

from gym.wrappers.record_video import RecordVideo

from IPython.display import HTML
from pyvirtualdisplay import Display
from IPython import display as ipythondisplay

We need some more code to make sure the videos we record are shown in our notebook (adapted from [Jeff Heaton](https://colab.research.google.com/github/jeffheaton/t81_558_deep_learning/blob/master/t81_558_class_12_01_ai_gym.ipynb)):

In [None]:
def show_video():
    # Look for recorded episodes in the folder used by wrap_env()
    mp4list = glob.glob('video/*.mp4')
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        # Read the file as bytes and embed it in the page as base64
        with open(mp4, 'rb') as f:
            video = f.read()
        encoded = base64.b64encode(video)
        ipythondisplay.display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")

In [None]:
def wrap_env(env):
    env = RecordVideo(env, './video')
    return env

In [None]:
import warnings
warnings.filterwarnings('ignore')

We need to add some code to evaluate how our agent is doing in reaching the treasure on the other side of the frozen lake.

In [None]:
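def evaluate_agent(env, q_table, num_episodes=100):
    """Return the average reward per episode when acting greedily."""
    total_rewards = 0
    for _ in range(num_episodes):
        state = env.reset()
        if isinstance(state, tuple):  # newer gym versions return (observation, info)
            state = state[0]
        done = False
        while not done:
            # Always pick the action with the highest Q-value
            action = np.argmax(q_table[state, :])
            step_return = env.step(action)
            if len(step_return) == 4:
                state, reward, done, _ = step_return
            else:  # newer gym versions return terminated and truncated separately
                state, reward, terminated, truncated, _ = step_return
                done = terminated or truncated
            total_rewards += reward
    return total_rewards / num_episodes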

In [None]:
def visualize_policy(q_table):
    policy = np.argmax(q_table, axis=1)
    actions = ["←", "↓", "→", "↑"]
    policy_chars = [actions[a] for a in policy]
    policy_chars = np.array(policy_chars).reshape(4, 4)
    print(policy_chars)

**Now we can finally run our environment! Let's see what we have going on:**

In [None]:
env = gym.make('FrozenLake-v1', is_slippery=False)

This is an environment in which the agent can take four different actions:

* 0: LEFT
* 1: DOWN
* 2: RIGHT
* 3: UP

For a full description, see the [environment documentation](https://www.gymlibrary.dev/environments/toy_text/frozen_lake/).
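
To make the action numbering concrete, here is a minimal sketch of how a move changes the state on the non-slippery lake. It assumes the default 4x4 map from the environment documentation and that states are numbered row by row (`state = row * 4 + col`). The names `LAKE`, `MOVES`, `move`, and `tile` are illustrative and not part of gym.

```python
# Default 4x4 FrozenLake map: S = start, F = frozen, H = hole, G = goal
LAKE = ["SFFF", "FHFH", "FFFH", "HFFG"]

# Action number -> (row change, column change)
MOVES = {0: (0, -1), 1: (1, 0), 2: (0, 1), 3: (-1, 0)}  # LEFT, DOWN, RIGHT, UP


def move(state, action, size=4):
    """Return the next state; moving into the edge of the grid leaves the agent in place."""
    row, col = divmod(state, size)
    d_row, d_col = MOVES[action]
    row = min(max(row + d_row, 0), size - 1)
    col = min(max(col + d_col, 0), size - 1)
    return row * size + col


def tile(state, size=4):
    """Return the map letter for a state index."""
    row, col = divmod(state, size)
    return LAKE[row][col]


print(move(0, 2))  # 1: one step RIGHT from the start tile
print(move(0, 0))  # 0: LEFT from the start bumps into the edge
```

This is why a policy that only ever goes left keeps the agent on the start tile.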

In [None]:
q_table_before_training = np.zeros([env.observation_space.n, env.action_space.n])

In [None]:
print(env.observation_space.n)

In [None]:
print(q_table_before_training)

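We initialise the Q-table with a value of zero for each of the four actions the agent can take (left, down, right, up) in each of the 16 states.

Rather than taking random actions, the agent starts out with a simple rule: 'always go left'. This happens because `np.argmax` returns the first index when all values are equal, and action 0 is LEFT.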
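
The 'always go left' starting policy can be reproduced without the environment. The sketch below builds a 16 x 4 table of zeros from plain Python lists. The helper `greedy_action` and the name `zero_table` are hypothetical; the helper mimics `np.argmax` by returning the lowest index among tied maxima.

```python
N_STATES, N_ACTIONS = 16, 4
ACTION_NAMES = ["LEFT", "DOWN", "RIGHT", "UP"]

# Every state starts with a Q-value of 0 for every action
zero_table = [[0.0] * N_ACTIONS for _ in range(N_STATES)]


def greedy_action(q_row):
    """Index of the largest value; ties go to the lowest index, like np.argmax."""
    return max(range(len(q_row)), key=lambda a: q_row[a])


policy = [greedy_action(row) for row in zero_table]
print({ACTION_NAMES[a] for a in policy})  # {'LEFT'}
```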

In [None]:
visualize_policy(q_table_before_training)

Let's see what this looks like on video:

In [None]:
video_env = wrap_env(env)

In [None]:
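observation = video_env.reset()

while True:
    video_env.render()
    action = 0  # Always go left
    step_return = video_env.step(action)
    # Older gym versions return 4 values, newer ones 5
    if len(step_return) == 4:
        observation, reward, done, info = step_return
    else:
        observation, reward, terminated, truncated, info = step_return
        done = terminated or truncated

    if done:
        break

video_env.close()
show_video()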

Not great! :) The agent just tries to go left at every step.

In [None]:
pre_training_score = evaluate_agent(env, q_table_before_training)
print(f"Average reward before training: {pre_training_score}")

As expected, the agent never reaches its goal - the treasure! - and reward is always 0.

Okay, so let's see if we can improve on this by training the agent using Q-learning.

### **Q-learning algorithm**

The Q-learning algorithm works by updating its action-value estimates (Q) in the following way:

$$Q(s, a) \leftarrow Q(s, a) + \alpha \left( r + \gamma \max_{a'} Q(s', a') - Q(s, a) \right)$$

where $Q(s, a)$ is the Q-value for state $s$ and action $a$, $\alpha$ is the learning rate, $r$ is the reward received after taking action $a$ from state $s$, $\gamma$ is the discount factor, and $\max_{a'} Q(s', a')$ is the maximum Q-value for the next state $s'$ over all possible actions $a'$.
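
As a worked example of the update rule, the sketch below applies it once. The numbers passed in are made up for illustration, and `q_update` and `q_update_blended` are hypothetical helpers, not gym functions. The second helper uses the rearranged form $(1 - \alpha) Q(s, a) + \alpha \left( r + \gamma \max_{a'} Q(s', a') \right)$, which is algebraically the same update.

```python
def q_update(old_q, reward, next_max, alpha, gamma):
    """One Q-learning update: move old_q towards the target by a fraction alpha."""
    td_target = reward + gamma * next_max
    return old_q + alpha * (td_target - old_q)


def q_update_blended(old_q, reward, next_max, alpha, gamma):
    """Equivalent form: a weighted average of the old value and the target."""
    return (1 - alpha) * old_q + alpha * (reward + gamma * next_max)


# Illustrative numbers: a small step penalty, and a next state whose best Q-value is 1.0
new_q = q_update(old_q=0.0, reward=-0.01, next_max=1.0, alpha=0.8, gamma=0.95)
print(round(new_q, 3))  # 0.752
```

With a learning rate of 0.8, the estimate moves 80% of the way from the old value (0.0) to the target (0.94) in one update.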

We set three hyperparameters, using values that are common starting points for tabular Q-learning on small environments like this one:

* Learning rate (α / alpha): Determines how much new information overrides the old information.
* Discount factor (γ / gamma): Determines the importance of future rewards.
* Exploration rate (ε / epsilon): Determines the probability of choosing a random action over the best-known action.
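
To illustrate the exploration rate, here is a minimal sketch of epsilon-greedy action selection using only the standard library. The function name `epsilon_greedy` and the example Q-values are assumptions made for illustration.

```python
import random


def epsilon_greedy(q_row, epsilon, rng):
    """With probability epsilon pick a random action, otherwise the best-known one."""
    if rng.random() < epsilon:
        return rng.randrange(len(q_row))  # explore
    return max(range(len(q_row)), key=lambda a: q_row[a])  # exploit


rng = random.Random(0)  # fixed seed so the run is repeatable
q_row = [0.1, 0.7, 0.3, 0.0]  # made-up Q-values for one state; action 1 (DOWN) is best

choices = [epsilon_greedy(q_row, epsilon=0.2, rng=rng) for _ in range(1000)]
print(choices.count(1) / len(choices))  # mostly action 1, with occasional random picks
```

The discount factor works in a similar spirit for rewards: with `gamma = 0.95`, a reward that arrives `k` steps in the future is weighted by `0.95 ** k`.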

In [None]:
# Create the environment
env = gym.make('FrozenLake-v1', is_slippery=False)

# Initialize Q-table
q_table = np.zeros([env.observation_space.n, env.action_space.n])

# Hyperparameters
alpha = 0.8
gamma = 0.95
epsilon = 0.2  # Explore with a random action 20% of the time
num_episodes = 1000
max_steps = 100  # Maximum steps per episode

# Keep track of the agent task completion rates
rewards = []
success_rate = []
step_count = []

for episode in range(num_episodes):
    state = env.reset()
    if isinstance(state, tuple):
        state = state[0]

    done = False
    total_reward = 0 # A reward is '1' if the agent reaches the treasure, 0 otherwise
    steps = 0 # The number of steps the agent needs to reach the treasure

    while not done and steps < max_steps:
        # Choose action
        if np.random.uniform(0, 1) < epsilon:
            # Exploration: choose a random action from 0-3 (left, down, right, up)
            action = env.action_space.sample()
        else:
            # Exploitation: use the learnt policy to select the next action
            action = np.argmax(q_table[state, :])

        # Take action (older gym versions return 4 values, newer ones 5)
        step_return = env.step(action)
        if len(step_return) == 4:
            next_state, reward, done, _ = step_return
        else:
            next_state, reward, terminated, truncated, _ = step_return
            done = terminated or truncated

        # Set up the reward structure
        if done and reward == 1:  # Reached the goal - found the treasure!
            adjusted_reward = 1
            step_count.append(steps + 1)
        elif done and reward == 0:  # Fell into a hole in the ice - game over!
            adjusted_reward = -1
        else:  # Small penalty for each step, continue playing
            adjusted_reward = -0.01

        total_reward += adjusted_reward

        old_q = q_table[state, action]
        next_max = np.max(q_table[next_state])
        new_q = (1 - alpha) * old_q + alpha * (adjusted_reward + gamma * next_max)
        q_table[state, action] = new_q

        state = next_state
        steps += 1

    rewards.append(total_reward)
    success_rate.append(1 if total_reward > 0 else 0)


# Set up the plotting style
sns.set_theme()  # The 'seaborn' style name is no longer available in newer matplotlib releases
sns.set_palette("deep")

# Create a figure with two subplots
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))
fig.suptitle('Q-Learning Performance on FrozenLake', fontsize=16)

# Plot rewards as a bar chart
successful_mask = np.array(rewards) > 0
ax1.bar(range(num_episodes), rewards, color=['#2ecc71' if s else '#e74c3c' for s in successful_mask], alpha=0.7)
ax1.set_title('Rewards over Episodes', fontsize=14)
ax1.set_xlabel('Episode', fontsize=12)
ax1.set_ylabel('Total Reward', fontsize=12)
ax1.grid(True, linestyle='--', alpha=0.7)

# Add a legend for the bar chart
from matplotlib.patches import Patch
legend_elements = [Patch(facecolor='#2ecc71', edgecolor='#2ecc71', label='Successful'),
                   Patch(facecolor='#e74c3c', edgecolor='#e74c3c', label='Unsuccessful')]
ax1.legend(handles=legend_elements, loc='upper right')

# Plot step count for successful runs
ax2.plot(step_count, alpha=0.7)
ax2.set_title('Steps for Successful Runs', fontsize=14)
ax2.set_xlabel('Successful Run', fontsize=12)
ax2.set_ylabel('Number of Steps', fontsize=12)
ax2.grid(True, linestyle='--', alpha=0.7)

# Add a trend line to the step count plot
z = np.polyfit(range(len(step_count)), step_count, 1)
p = np.poly1d(z)
ax2.plot(range(len(step_count)), p(range(len(step_count))), "r--", alpha=0.8)

plt.tight_layout()
plt.show()

# Print final Q-table
print("\nFinal Q-table:")
print(q_table)

# Note: env is not closed here because it is reused for evaluation and the demo below

Let's see how the agent performs after training:

In [None]:
post_training_score = evaluate_agent(env, q_table)
print(f"Average reward after training: {post_training_score}")


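That's a lot better! Because the lake is not slippery, the greedy policy behaves the same way in every evaluation episode, so a score of 1.0 means the agent reaches the treasure 🧳 every time!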

In [None]:
visualize_policy(q_table)

In [None]:
demo_env = wrap_env(env)

In [None]:
state = demo_env.reset()

if isinstance(state, tuple):
    state = state[0]

done = False
num_steps = 0

while not done:
    action = np.argmax(q_table[state, :])
    step_return = demo_env.step(action)
    if len(step_return) == 4:
        state, reward, done, _ = step_return
    else:
        state, reward, terminated, truncated, _ = step_return
        done = terminated or truncated
    num_steps += 1

demo_env.close()

if reward == 1:
    print(f'The agent reached the treasure in {num_steps} steps!')
else:
    print(f'Game over! The agent fell into a hole after {num_steps} steps!')
show_video()

All right! Looks like we've just trained our first AI agent, congratulations!

Feel free to play around with the environments, and look at some of the example agents implemented in the [practical RL course](https://github.com/VinF/practical_sessions_RL/tree/main) for inspiration!

Happy hacking!