<a href="https://colab.research.google.com/github/QuantumQist/Hands_on_RL_Lapan/blob/main/ch5_tabular_learning/ch5_frozenLake.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Adjusted source code to chapter 5 of the book**

In [27]:
#import gym # FIX BELOW + adjust for libraries available in Colab
try:
  import gymnasium as gym
except:
  !pip install gymnasium
  import gymnasium as gym
import collections
#from tensorboardX import SummaryWriter # FIX BELOW
from torch.utils.tensorboard import SummaryWriter

# ENV_NAME = "FrozenLake-v0" # Environment deprecated, FIX BELOW
ENV_NAME = "FrozenLake-v1"
GAMMA = 0.9
TEST_EPISODES = 20
# Add reward threshold beyond which training is completed
# Original code = 0.8
REWARD_THRESHOLD = 0.9

**Fixes in the code below**

1. `self.env.step(action)` (line 12) returns 5 outputs - [observation (ObsType), reward (SupportsFloat), terminated (bool), truncated (bool), info (dict)]. Original code assumes 4 outputs, we keep first 3 of them.
2. `self.env.reset()` (as in line 4) returns a tuple with [observation (ObsType), info (dictionary)]. We will keep only the first element to make it compatible with the rest of the code.

The book chapter descripes two classes of RL agent based on
* Value iteration
* Q-Learning

We implement both agents here in a single notebook

In [28]:
class Agent_V:
    """
    RL agent based on value iteration
    """
    def __init__(self):
        self.env = gym.make(ENV_NAME)
        self.state = self.env.reset()[0] #Fixed from `self.state = self.env.reset()`
        self.rewards = collections.defaultdict(float)
        self.transits = collections.defaultdict(collections.Counter)
        self.values = collections.defaultdict(float)

    def play_n_random_steps(self, count):
        for _ in range(count):
            action = self.env.action_space.sample()
            #new_state, reward, is_done, _ = self.env.step(action)
            new_state, reward, is_done, _, _ = self.env.step(action)
            self.rewards[(self.state, action, new_state)] = reward
            self.transits[(self.state, action)][new_state] += 1
            # Fix: in the line below we changed
            # `self.env.reset()` -> `self.env.reset()[0]`
            self.state = self.env.reset()[0] if is_done else new_state

    def calc_action_value(self, state, action):
        target_counts = self.transits[(state, action)]
        total = sum(target_counts.values())
        action_value = 0.0
        for tgt_state, count in target_counts.items():
            reward = self.rewards[(state, action, tgt_state)]
            action_value += (count / total) * (reward + GAMMA * self.values[tgt_state])
        return action_value

    def select_action(self, state):
        best_action, best_value = None, None
        for action in range(self.env.action_space.n):
            action_value = self.calc_action_value(state, action)
            if best_value is None or best_value < action_value:
                best_value = action_value
                best_action = action
        return best_action

    def play_episode(self, env):
        total_reward = 0.0
        state = env.reset()[0] # Fixed from `state = env.reset()`
        while True:
            action = self.select_action(state)
            # new_state, reward, is_done, _ = env.step(action) # Fixed below
            new_state, reward, is_done, _, _ = env.step(action)
            self.rewards[(state, action, new_state)] = reward
            self.transits[(state, action)][new_state] += 1
            total_reward += reward
            if is_done:
                break
            state = new_state
        return total_reward

    def value_iteration(self):
        for state in range(self.env.observation_space.n):
            state_values = [self.calc_action_value(state, action)
                            for action in range(self.env.action_space.n)]
            self.values[state] = max(state_values)


In [29]:
class Agent_Q:
    """
    RL agent based on Q learning
    """
    def __init__(self):
        self.env = gym.make(ENV_NAME)
        self.state = self.env.reset()[0] #Fixed from `self.state = self.env.reset()`
        self.rewards = collections.defaultdict(float)
        self.transits = collections.defaultdict(collections.Counter)
        self.values = collections.defaultdict(float)

    def play_n_random_steps(self, count):
        for _ in range(count):
            action = self.env.action_space.sample()
            #new_state, reward, is_done, _ = self.env.step(action)
            new_state, reward, is_done, _, _ = self.env.step(action)
            self.rewards[(self.state, action, new_state)] = reward
            self.transits[(self.state, action)][new_state] += 1
            # Fix: in the line below we changed
            # `self.env.reset()` -> `self.env.reset()[0]`
            self.state = self.env.reset()[0] if is_done else new_state

    def calc_action_value(self, state, action):
        target_counts = self.transits[(state, action)]
        total = sum(target_counts.values())
        action_value = 0.0
        for tgt_state, count in target_counts.items():
            reward = self.rewards[(state, action, tgt_state)]
            action_value += (count / total) * (reward + GAMMA * self.values[tgt_state])
        return action_value

    def select_action(self, state):
        best_action, best_value = None, None
        for action in range(self.env.action_space.n):
            action_value = self.values[(state, action)]
            if best_value is None or best_value < action_value:
                best_value = action_value
                best_action = action
        return best_action

    def play_episode(self, env):
        total_reward = 0.0
        state = env.reset()[0] # Fixed from `state = env.reset()`
        while True:
            action = self.select_action(state)
            # new_state, reward, is_done, _ = env.step(action) # Fixed below
            new_state, reward, is_done, _, _ = env.step(action)
            self.rewards[(state, action, new_state)] = reward
            self.transits[(state, action)][new_state] += 1
            total_reward += reward
            if is_done:
                break
            state = new_state
        return total_reward

    def value_iteration(self):
        for state in range(self.env.observation_space.n):
            for action in range(self.env.action_space.n):
                action_value = 0.0
                target_counts = self.transits[(state, action)]
                total = sum(target_counts.values())
                for tgt_state, count in target_counts.items():
                    reward = self.rewards[(state, action, tgt_state)]
                    best_action = self.select_action(tgt_state)
                    action_value += (count / total) * (reward + GAMMA * self.values[(tgt_state, best_action)])
                self.values[(state, action)] = action_value

Create a function that trains an agent.

In [30]:
def train_agent(agent_type: str,
                env_name: str = ENV_NAME,
                reward_threshold: float = REWARD_THRESHOLD):
    """
    Trains an agent.

    Parameters:
    - agent_type: str
        Type of agent. Must be either "V" or "Q"
        V - Value iteration
        Q - Q-learning
    - env_name: str
        Environment name. Example: "FrozenLake-v1"
    - reward_threshold: float
        Reward threshold beyond which training is completed.
    The default values for `env_name` and `reward_threshold` are given
    in the beginning of the notebook as hyperparameters.

    Returns a class with the trained agent.
    """
    # Setup the agent
    assert agent_type in ["V", "Q"], "`agent` must be either `V` or `Q`"
    agent = Agent_V() if agent_type=="V" else Agent_Q()
    # Create the envoronment
    test_env = gym.make(ENV_NAME)
    # Train the agent
    iter_no = 0
    best_reward = 0.0
    while True:
        iter_no += 1
        agent.play_n_random_steps(100)
        agent.value_iteration()

        reward = 0.0
        for _ in range(TEST_EPISODES):
            reward += agent.play_episode(test_env)
        reward /= TEST_EPISODES
        if reward > best_reward:
            print("Best reward updated %.3f -> %.3f" % (best_reward, reward))
            best_reward = reward
        if reward > reward_threshold:
            print("Solved in %d iterations!" % iter_no)
            break

    test_env.close()

    return agent

**Train both agents**

In [31]:
print("Training V-Agent")
v_agent = train_agent("V")
print("--------------\nTraining Q-Agent")
q_agent = train_agent("Q")

Training V-Agent
Best reward updated 0.000 -> 0.150
Best reward updated 0.150 -> 0.250
Best reward updated 0.250 -> 0.400
Best reward updated 0.400 -> 0.450
Best reward updated 0.450 -> 0.550
Best reward updated 0.550 -> 0.700
Best reward updated 0.700 -> 0.850
Best reward updated 0.850 -> 1.000
Solved in 29 iterations!
--------------
Training Q-Agent
Best reward updated 0.000 -> 0.500
Best reward updated 0.500 -> 0.550
Best reward updated 0.550 -> 0.600
Best reward updated 0.600 -> 0.650
Best reward updated 0.650 -> 0.800
Best reward updated 0.800 -> 0.850
Best reward updated 0.850 -> 0.950
Solved in 48 iterations!


Setup a function to visualize the trained agents.

In [32]:
def create_video(agent, folder_name: str = "recording", env_name: str = ENV_NAME):
    """
    Creates a video of a working trained agent.

    Parameters:
    - agent
        Class with the trained agent
    - folder_name: str, default value is "recording"
        Folder where the video will be saved
    - env_name: str
        Environment used to evaluate the agent.
        Default value is specified as hyperparameter in the beginning
        of the notebook.

    Returns nothing.
    """
    # Setup the environment
    video_env = gym.make(env_name, render_mode="rgb_array")
    video_env = gym.wrappers.RecordVideo(video_env, folder_name)
    # Make the recording
    video_env.reset()[0]
    video_env.start_video_recorder()
    agent.play_episode(env = video_env)
    # Close the recording and environment
    video_env.close_video_recorder()
    video_env.close()

Create videos of both agents.

In [33]:
print("Crating videos:\nV-Agent:")
create_video(agent = v_agent, folder_name = "recording_v_agent", env_name = ENV_NAME)
print("-----------\nQ-Agent:")
create_video(agent = q_agent, folder_name = "recording_q_agent", env_name = ENV_NAME)
print("Finished creating videos!")

Crating videos:
V-Agent:
Moviepy - Building video /content/recording_v_agent/rl-video-episode-0.mp4.
Moviepy - Writing video /content/recording_v_agent/rl-video-episode-0.mp4





Moviepy - Done !
Moviepy - video ready /content/recording_v_agent/rl-video-episode-0.mp4
Moviepy - Building video /content/recording_v_agent/rl-video-episode-0.mp4.
Moviepy - Writing video /content/recording_v_agent/rl-video-episode-0.mp4





Moviepy - Done !
Moviepy - video ready /content/recording_v_agent/rl-video-episode-0.mp4
-----------
Q-Agent:
Moviepy - Building video /content/recording_q_agent/rl-video-episode-0.mp4.
Moviepy - Writing video /content/recording_q_agent/rl-video-episode-0.mp4





Moviepy - Done !
Moviepy - video ready /content/recording_q_agent/rl-video-episode-0.mp4
Moviepy - Building video /content/recording_q_agent/rl-video-episode-0.mp4.
Moviepy - Writing video /content/recording_q_agent/rl-video-episode-0.mp4



                                                   

Moviepy - Done !
Moviepy - video ready /content/recording_q_agent/rl-video-episode-0.mp4
Finished creating videos!


