<a href="https://colab.research.google.com/github/DavidSenseman/BIO1173/blob/main/NEW_Class_06_1_Solutions.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

---------------------------
**COPYRIGHT NOTICE:** This Jupyterlab Notebook is a Derivative work of [Jeff Heaton](https://github.com/jeffheaton) licensed under the Apache License, Version 2.0 (the "License"); You may not use this file except in compliance with the License. You may obtain a copy of the License at

> [http://www.apache.org/licenses/LICENSE-2.0](http://www.apache.org/licenses/LICENSE-2.0)

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

------------------------

# **BIO 1173: Intro Computational Biology**

##### **Module 6: Reinforcement Learning**

* Instructor: [David Senseman](mailto:David.Senseman@utsa.edu), [Department of Biology, Health and the Environment](https://sciences.utsa.edu/bhe/), [UTSA](https://www.utsa.edu/)

### Module 6 Material

* **Part 6.1: Introduction to Introduction to Gymnasium and Q-Learning**
* Part 6.2: Stable Baselines Q-Learning
* Part 6.3: Atari Games with Stable Baselines Neural Networks
* Part 6.4: Future of Reinforcement Learning


## Google CoLab Instructions

You MUST run the following code cell to get credit for this class lesson. By running this code cell, you will map your GDrive to /content/drive and print out your Google GMAIL address. Your Instructor will use your GMAIL address to verify the author of this class lesson.

In [None]:
# You must run this cell first
try:
    from google.colab import drive
    drive.mount('/content/drive', force_remount=True)
    from google.colab import auth
    auth.authenticate_user()
    COLAB = True
    print("Note: Using Google CoLab")
    import requests
    gcloud_token = !gcloud auth print-access-token
    gcloud_tokeninfo = requests.get('https://www.googleapis.com/oauth2/v3/tokeninfo?access_token=' + gcloud_token[0]).json()
    print(gcloud_tokeninfo['email'])
except:
    print("**WARNING**: Your GMAIL address was **not** printed in the output below.")
    print("**WARNING**: You will NOT receive credit for this lesson.")
    COLAB = False

You should see the following output except your GMAIL address should appear on the last line.

![__](https://biologicslab.co/BIO1173/images/class_04/class_04_1_image01B.png)

If your GMAIL address does not appear your lesson will **not** be graded.

### Install Custom Function

Run the code in the next cell to install a custom function.

In [2]:
# Simple function to print out elasped time
def hms_string(sec_elapsed):
    h = int(sec_elapsed / (60 * 60))
    m = int((sec_elapsed % (60 * 60)) / 60)
    s = sec_elapsed % 60
    return "{}:{:>02}:{:>05.2f}".format(h, m, s)

### Install Gymnasium

The code in the cell below installs 3 libraries needed for this lesson:

* **Gymnasium** is needed to create and interact with reinforcement-learning environments.
* **Pillow** is a dependency for many Gym environments that use image rendering.
* **PyVirtualDisplay** allows the notebook to render environment frames on a headless server like Google Colab.

In [None]:
# Install gymnasium
!pip install --quiet gymnasium==1.2.0 pillow
!pip install pyvirtualdisplay # > /dev/null

# **Introduction to Gymnasium and Q-Learning**

## **Gymnasium**

**Gymnasium** is a toolkit for developing and comparing reinforcement learning (RL) algorithms. It provides various environments that simulate different tasks and challenges, which agents (algorithms) can interact with to learn and improve their performance.

### **How Gymnasium Can Be Useful for Computational Biologists**

Gymnasium can be a valuable tool for computational biologists in several ways. Here are some potential applications:

##### **1. Modeling Biological Systems**
Gymnasium provides a platform for simulating and modeling complex biological systems. Computational biologists can use Gymnasium environments to create and test models of cellular processes, metabolic pathways, and genetic networks. This helps in understanding the dynamics of these systems and predicting their behavior under different conditions.

##### **2. Reinforcement Learning for Drug Discovery**
Reinforcement learning (RL) algorithms can be applied to drug discovery and development. By using Gymnasium environments, computational biologists can train RL agents to explore chemical spaces, optimize molecular structures, and predict the efficacy of potential drug candidates. This accelerates the drug discovery process and reduces the need for costly and time-consuming experiments.

##### **3. Optimization of Experimental Protocols**
Gymnasium can be used to optimize experimental protocols in computational biology. For example, RL agents can be trained to design efficient experimental setups, select optimal parameters, and minimize experimental errors. This leads to more accurate and reproducible results in biological research.

##### **4. Simulating Evolutionary Processes**
Gymnasium environments can simulate evolutionary processes, allowing computational biologists to study the evolution of populations, genetic diversity, and adaptation mechanisms. By modeling these processes, researchers can gain insights into the principles of evolution and apply them to areas such as conservation biology and synthetic biology.

##### **5. Analyzing Biological Data**
Gymnasium can be integrated with data analysis tools to analyze large-scale biological data. Computational biologists can use Gymnasium environments to preprocess, visualize, and interpret data from genomics, proteomics, and other omics studies. This helps in identifying patterns, correlations, and potential biomarkers.

##### **6. Training and Education**
Gymnasium can be used as an educational tool to teach computational biology concepts. Students and researchers can interact with Gymnasium environments to learn about biological systems, experiment with different algorithms, and gain hands-on experience in computational modeling and simulation.

### **Example Use Case: Protein Folding**
One specific application is the study of protein folding. Computational biologists can use Gymnasium to create environments that simulate the folding process of proteins. RL agents can be trained to predict the final folded structure of a protein based on its amino acid sequence. This has significant implications for understanding diseases related to protein misfolding and designing therapeutic interventions.

By leveraging Gymnasium, computational biologists can enhance their research capabilities, accelerate discoveries, and gain deeper insights into the complexities of biological systems.

## **Q-Learning**


Q-Learning is a foundational technology upon which deep reinforcement learning is based. Before we explore deep reinforcement learning, it is essential to understand Q-Learning. Several components make up any Q-Learning system.

1. **Agent** - The agent is an entity that exists in an environment that takes actions to affect the state of the environment, to receive rewards.
2. **Environment** - The environment is the universe that the agent exists in. The environment is always in a specific state that is changed by the agent's actions.
3. **Actions** - Steps that the agent can perform to alter the environment
4. **Step** - A step occurs when the agent performs an action and potentially changes the environment state.
5. **Episode** - A chain of steps that ultimately culminates in the environment entering a terminal state.
6. **Epoch** - A training iteration of the agent that contains some number of episodes.
7. **Terminal State** -  A state in which further actions do not make sense. A terminal state occurs when the agent has one, lost, or the environment exceeds the maximum number of steps in many environments.

Q-Learning works by building a table that suggests an action for every possible state. This approach runs into several problems. First, the environment is usually composed of several continuous numbers, resulting in an infinite number of states. Q-Learning handles continuous states by binning these numeric values into ranges.

Out of the box, Q-Learning does not deal with continuous inputs, such as a car's accelerator that can range from released to fully engaged. Additionally, Q-Learning primarily deals with discrete actions, such as pressing a joystick up or down. Researchers have developed clever tricks to allow Q-Learning to accommodate continuous actions.

Deep neural networks can help solve the problems of continuous environments and action spaces. In the next section, we will learn more about deep reinforcement learning. For now, we will apply regular Q-Learning to the Mountain Car problem from OpenAI Gym.



### Example 1: Introducing the Mountain Car

This section will demonstrate how Q-Learning can create a solution to the `mountain car gym environment`. The **`Mountain car`** is an environment where a car must climb a mountain. Because gravity is stronger than the car's engine, it cannot merely accelerate up the steep slope even with full throttle. The vehicle is situated in a valley and must learn to utilize potential energy by driving up the opposite hill before the car can make it to the goal at the top of the rightmost hill.

![__](https://biologicslab.co/BIO1173/images/class_06/class_06_1_image01A.png)

First, it might be helpful to visualize the mountain car environment. The following code shows this environment. This code makes use of `TF-Agents` to perform this render. Usually, we use TF-Agents for the type of deep reinforcement learning that we will see in the next module. However, `TF-Agents` is just used to render the mountain care environment for now.

The code launches the `MountainCar` environment in a COLAB notebook, grabs the first visual frame as an RGB image, and shows that image inside the notebook. Afterward it closes the environment cleanly.

In [None]:
# ────────────────────────────────────────────────────────────────────── #
# 0️⃣  Imports
# ────────────────────────────────────────────────────────────────────── #
import gymnasium as gym
import numpy as np
from PIL import Image
from IPython.display import display, clear_output

# ────────────────────────────────────────────────────────────────────── #
# 1️⃣  Scale factor
# ────────────────────────────────────────────────────────────────────── #
SCALE = 0.64          # 64 % of the original size

# ────────────────────────────────────────────────────────────────────── #
# 2️⃣  Helper: render & display an env frame
# ────────────────────────────────────────────────────────────────────── #
def display_env_state(env, scale: float = 1.0) -> None:
    """
    Render the current state of *env* to an RGB array, convert it to a PIL
    image, resize it to *scale* times its original size (using integer
    dimensions), and display it inline in a notebook.
    """
    frame = env.render()                     # → np.ndarray (H, W, 3)
    if frame is None:
        print("⚠️  Environment did not return an image.")
        return

    img = Image.fromarray(frame)

    if scale != 1.0:
        # apply the same factor to *both* axes and cast to int
        new_w = int(round(img.width  * scale))
        new_h = int(round(img.height * scale))
        img = img.resize((new_w, new_h), resample=Image.LANCZOS)

    display(img)

# ────────────────────────────────────────────────────────────────────── #
# 3️⃣  Create & initialise the environment
# ────────────────────────────────────────────────────────────────────── #
env_id  = "MountainCar-v0"
seed    = 1234          # deterministic start (set to None for full randomness)
render  = "rgb_array"   # must return an image

env = gym.make(env_id, render_mode=render)
obs, info = env.reset(seed=seed)              # deterministic initial state

# ────────────────────────────────────────────────────────────────────── #
# 4️⃣  Show the *initial* frame
# ────────────────────────────────────────────────────────────────────── #
display_env_state(env, scale=SCALE)    # enlarge it a bit for readability

# ────────────────────────────────────────────────────────────────────── #
# 5️⃣  (Optional) Step once so you can see a *different* frame
# ────────────────────────────────────────────────────────────────────── #
action = 2          # “push right” – you can change this
_, _, terminated, truncated, _ = env.step(action)

if terminated or truncated:
    print("\n⚠️  Episode finished – that’s normal for MountainCar.")
else:
    clear_output(wait=True)               # hide the previous image
    display_env_state(env, scale=SCALE)

# ────────────────────────────────────────────────────────────────────── #
# 6️⃣  Clean up
# ────────────────────────────────────────────────────────────────────── #
env.close()


If the code is correct you should see the following output

![__](https://biologicslab.co/BIO1173/images/class_06/class_06_1_image02A.png)

#### **Characteristics of the Mountain Car**

The mountain car environment provides the following discrete actions:

* 0 - Apply left force
* 1 - Apply no force
* 2 - Apply right force

The mountain car environment is made up of the following continuous values:

* state[0] - Position
* state[1] - Velocity

The cart is not strong enough. It will need to use potential energy from the mountain behind it. The following code shows an agent that applies full throttle to climb the hill.

### Example 2: Apply Full Throttle

As stated above the cart is not strong enough to climb the hill by itself. This is demonstrated by running the code in the next cell. Starting at the bottom the cart applies **full throttle**.

In [None]:
# Example 2: Apply full throttle

# ------------------------------------------------------------------------
# 0️⃣  Imports
# ------------------------------------------------------------------------
import base64
import glob
import io
import os
from pathlib import Path
from typing import Callable, Optional

import gymnasium as gym
from gymnasium.wrappers import RecordVideo
from IPython.display import HTML, display
from pyvirtualdisplay import Display


# --------------------------------------------------------------------------- #
#  CONFIGURATION
# --------------------------------------------------------------------------- #
ENV_NAME = "MountainCar-v0"
RENDER_MODE = "rgb_array"          # must return image data
VIDEO_FOLDER = "./videos"          # where the .mp4 files will be written
VIDEO_PREFIX = "rl-video"          # optional filename prefix
VIDEO_LENGTH = 0                   # 0 → record whole episode
MAX_STEPS = 1_000  # safety upper‑bound if you want to cap the loop
SEED: Optional[int] = 42             # None → fully random
ACTION = 2  # “push right”

# --------------------------------------------------------------------------- #
#  HELPER: Return the newest .mp4 in a folder (or raise if none)
# --------------------------------------------------------------------------- #
def _latest_mp4(folder: str) -> Path:
    files = sorted(
        glob.glob(os.path.join(folder, "*.mp4")),
        key=os.path.getmtime,
    )
    if not files:
        raise RuntimeError("No movie file found in %s" % folder)
    return Path(files[-1])


# --------------------------------------------------------------------------- #
#  MAIN LOGIC (executed in a single notebook cell)
# --------------------------------------------------------------------------- #
def run_and_show_episode(
    env_name: str = ENV_NAME,
    render_mode: str = RENDER_MODE,
    video_folder: str = VIDEO_FOLDER,
    video_prefix: str = VIDEO_PREFIX,
    video_length: int = VIDEO_LENGTH,
    episode_trigger: Optional[Callable[[int], bool]] = None,
    action: int = ACTION,
    seed: Optional[int] = SEED,
    max_steps: int = MAX_STEPS,
) -> None:
    """
    * Spin up an off‑screen Xvfb instance (via `pyvirtualdisplay`).
    * Create the gymnasium environment with an image‑returning render mode.
    * Wrap it with RecordVideo and record **every** episode (or whatever
      `episode_trigger` you supply).
    * Play one deterministic episode that always takes the supplied
      `action`.
    * When the episode ends, flush the movie to disk and show it inline.
    """
    # --- 1. Start a virtual display (auto‑stop) --------------------------- #
    with Display(visible=0, size=(1400, 900)) as disp:  # <-- context‑manager
        # --- 2. Ensure the video folder exists --------------------------- #
        os.makedirs(video_folder, exist_ok=True)

        # --- 3. Create & wrap the environment --------------------------- #
        env = gym.make(env_name, render_mode=render_mode)
        env = RecordVideo(
            env,
            video_folder=video_folder,
            episode_trigger=episode_trigger or (lambda _: True),  # record all episodes
            name_prefix=video_prefix,
            video_length=video_length,
        )
        # Optional: override FPS only if the underlying env exposes it
        env.metadata["render_fps"] = 30

        # --- 4. Play a single episode ----------------------------------- #
        obs, info = env.reset(seed=seed)   # deterministic seed (if you like)
        for _ in range(max_steps):
            _, _, terminated, truncated, _ = env.step(action)
            if terminated or truncated:
                break

        # --- 5. Close the environment (flushes the movie) -------------- #
        env.close()

        # --- 6. Load the newest MP4 & display it ------------------------ #
        mp4_path = _latest_mp4(video_folder)
        with open(mp4_path, "rb") as f:
            video_bytes = f.read()

        video_b64 = base64.b64encode(video_bytes).decode()
        display(HTML(
            f'<video width=400 controls autoplay>'
            f'<source src="data:video/mp4;base64,{video_b64}" type="video/mp4"/>'
            f'</video>'
        ))


# --------------------------------------------------------------------------- #
#  Run the demo
# --------------------------------------------------------------------------- #
run_and_show_episode()


### Example 3: Programmed Car

Now we will look at a car that I hand-programmed. This car is straightforward; however, it solves the problem. The programmed car always applies force in one direction or another. It does not break. Whatever direction the vehicle is currently rolling, the agent uses power in that direction. Therefore, the car begins to climb a hill, is overpowered, and turns backward. However, once it starts to roll backward, force is immediately applied in this new direction.

The following code implements this preprogrammed car.

In [None]:
# ------------------------------------------------------------
# 1️⃣  Imports (all available on a fresh Colab VM)
# ------------------------------------------------------------
import base64
import glob
import io
import os
from pathlib import Path
from typing import Optional

import gymnasium as gym
from IPython.display import display, HTML
import numpy as np

# ------------------------------------------------------------
# 2️⃣  Helper: record a full episode of MountainCar
# ------------------------------------------------------------
def record_mountaincar(
    *,
    env_id: str = "MountainCar-v0",
    seed: Optional[int] = None,
    render_fps: int = 30,
    video_folder: str = "./videos",
    episode_trigger: Optional[callable] = None,
    max_steps: int = 1_000,
) -> Path:

    # -----------------------------------------------------------------
    # 2a  Prepare the output directory
    # -----------------------------------------------------------------
    out_dir = Path(video_folder)
    out_dir.mkdir(parents=True, exist_ok=True)

    # -----------------------------------------------------------------
    # 2b  Create the env (rgb_array does NOT need a display backend)
    # -----------------------------------------------------------------
    env = gym.make(env_id, render_mode="rgb_array")
    env.metadata["render_fps"] = render_fps

    # -----------------------------------------------------------------
    # 2c  Wrap it in the video‑recording wrapper
    # -----------------------------------------------------------------
    env = gym.wrappers.RecordVideo(
        env,
        video_folder=video_folder,
        episode_trigger=episode_trigger or (lambda _: True),
        name_prefix="mountaincar-",
        video_length=0,           # 0 = record until termination/truncation
    )

    # -----------------------------------------------------------------
    # 2d  Reset (optionally with a seed) and run the episode
    # -----------------------------------------------------------------
    obs, _ = env.reset(seed=seed)

    action = 2          # start by pushing right
    for step in range(max_steps):
        state, reward, terminated, truncated, _ = env.step(action)

        # Simple deterministic policy:
        #   velocity > 0 → push right,  else → push left
        if state[1] > 0:
            action = 2
        else:
            action = 0

        if terminated or truncated:
            break
    else:          # pragma: no cover – should never hit in MountainCar
        raise RuntimeError(f"Episode exceeded {max_steps} steps!")

    # The environment has a ``close`` method – it will also
    # flush the video file(s) into *video_folder*.
    env.close()

    # -----------------------------------------------------------------
    # 2e  Pick the *newest* MP4 file that RecordVideo produced
    # -----------------------------------------------------------------
    mp4s = sorted(out_dir.rglob("*.mp4"), key=os.path.getmtime, reverse=True)
    if not mp4s:
        raise RuntimeError(f"No video files found in {video_folder!r}")

    return mp4s[0]


# ------------------------------------------------------------
# 3️⃣  Run the demo in a single Colab cell
# ------------------------------------------------------------
if __name__ == "__main__":            # guard so the cell can be imported elsewhere
    # -----------------------------------------------------------------
    # 3a  Record – change the seed or env_id if you wish
    # -----------------------------------------------------------------
    video_path = record_mountaincar(seed=1234)

    # -----------------------------------------------------------------
    # 3b  Show the finished video inline
    # -----------------------------------------------------------------
    raw_video = video_path.read_bytes()
    encoded = base64.b64encode(raw_video).decode("ascii")

    display(
        HTML(
            f"""
            <video width="640" height="480" controls>
                <source src="data:video/mp4;base64,{encoded}" type="video/mp4" />
            </video>
            """
        )
    )


# **Reinforcement Learning**

Q-Learning is a system of rewards that the algorithm gives an agent for successfully moving the environment into a state considered successful. These rewards are the Q-values from which this algorithm takes its name. The final output from the Q-Learning algorithm is a table of Q-values that indicate the reward value of every action that the agent can take, given every possible environment state. The agent must bin continuous state values into a fixed finite number of columns.

Learning occurs when the algorithm runs the agent and environment through episodes and updates the Q-values based on the rewards received from actions taken; Figure 12.REINF provides a high-level overview of this reinforcement or Q-Learning loop.

**Figure 12.REINF:Reinforcement/Q Learning**
![Reinforcement Learning](https://raw.githubusercontent.com/jeffheaton/t81_558_deep_learning/master/images/reinforcement.png "Reinforcement Learning")

The Q-values can dictate action by selecting the action column with the highest Q-value for the current environment state. The choice between choosing a random action and a Q-value-driven action is governed by the epsilon ($\epsilon$) parameter, the probability of random action.

Each time through the training loop, the training algorithm updates the Q-values according to the following equation.


$Q^{new}(s_{t},a_{t}) \leftarrow \underbrace{Q(s_{t},a_{t})}_{\text{old value}} + \underbrace{\alpha}_{\text{learning rate}} \cdot  \overbrace{\bigg( \underbrace{\underbrace{r_{t}}_{\text{reward}} + \underbrace{\gamma}_{\text{discount factor}} \cdot \underbrace{\max_{a}Q(s_{t+1}, a)}_{\text{estimate of optimal future value}}}_{\text{new value (temporal difference target)}} - \underbrace{Q(s_{t},a_{t})}_{\text{old value}} \bigg) }^{\text{temporal difference}}$

There are several parameters in this equation:
* alpha ($\alpha$) - The learning rate, how much should the current step cause the Q-values to be updated.
* lambda ($\lambda$) - The discount factor is the percentage of future reward that the algorithm should consider in this update.

This equation modifies several values:

* $Q(s_t,a_t)$ - The Q-table.  For each combination of states, what reward would the agent likely receive for performing each action?
* $s_t$ - The current state.
* $r_t$ - The last reward received.
* $a_t$ - The action that the agent will perform.

The equation works by calculating a delta (temporal difference) that the equation should apply to the old state. This learning rate ($\alpha$) scales this delta. A learning rate of 1.0 would fully implement the temporal difference in the Q-values each iteration and would likely be very chaotic.

There are two parts to the temporal difference: the new and old values. The new value is subtracted from the old value to provide a delta; the full amount we would change the Q-value by if the learning rate did not scale this value. The new value is a summation of the reward received from the last action and the maximum Q-values from the resulting state when the client takes this action. Adding the maximum of action Q-values for the new state is essential because it estimates the optimal future values from proceeding with this action.




### Example 4: Q-Learning Car

We will now use Q-Learning to produce a car that learns to drive itself. Look out, Tesla! We begin by defining two essential functions.

In [7]:
# Functions for Car Game

import gymnasium
import numpy as np

# This function converts the floating point state values into
# discrete values. This is often called binning.  We divide
# the range that the state values might occupy and assign
# each region to a bucket.
def calc_discrete_state_car(state):
    discrete_state = (state - env.observation_space.low)/buckets
    return tuple(discrete_state.astype(int))

# Run one game.  The q_table to use is provided.  We also
# provide a flag to indicate if the game should be
# rendered/animated.  Finally, we also provide
# a flag to indicate if the q_table should be updated.
def run_game_car(q_table_car, render, should_update):
    done = False
    discrete_state = calc_discrete_state_car(env.reset()[0])
    success = False

    while not done:
        # Exploit or explore
        if np.random.random() > epsilon:
            # Exploit - use q-table to take current best action
            # (and probably refine)
            action = np.argmax(q_table_car[discrete_state])
        else:
            # Explore - t
            action = np.random.randint(0, env.action_space.n)

        # Run simulation step
        new_state, reward, done, _, _ = env.step(action)

        # Convert continuous state to discrete
        new_state_disc = calc_discrete_state_car(new_state)

        # Have we reached the goal position (have we won?)?
        if new_state[0] >= env.unwrapped.goal_position:
            success = True

        # Update q-table
        if should_update:
            max_future_q = np.max(q_table_car[new_state_disc])
            current_q = q_table_car[discrete_state + (action,)]
            new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * \
                (reward + DISCOUNT * max_future_q)
            q_table_car[discrete_state + (action,)] = new_q

        discrete_state = new_state_disc

        if render:
            env.render()

    return success

Several hyperparameters are very important for Q-Learning. These parameters will likely need adjustment as you apply Q-Learning to other problems. Because of this, it is crucial to understand the role of each parameter.

* **LEARNING_RATE** The rate at which previous Q-values are updated based on new episodes run during training.
* **DISCOUNT** The amount of significance to give estimates of future rewards when added to the reward for the current action taken. A value of 0.95 would indicate a discount of 5% on the future reward estimates.
* **EPISODES** The number of episodes to train over. Increase this for more complex problems; however, training time also increases.
* **SHOW_EVERY** How many episodes to allow to elapse before showing an update.
* **DISCRETE_GRID_SIZE** How many buckets to use when converting each continuous state variable. For example, [10, 10] indicates that the algorithm should use ten buckets for the first and second state variables.
* **START_EPSILON_DECAYING** Epsilon is the probability that the agent will select a random action over what the Q-Table suggests. This value determines the starting probability of randomness.
* **END_EPSILON_DECAYING** How many episodes should elapse before epsilon goes to zero and no random actions are permitted. For example, EPISODES//10  means only the first 1/10th of the episodes might have random actions.

In [9]:
# Car Game

LEARNING_RATE = 0.1
DISCOUNT = 0.95
EPISODES = 500
SHOW_EVERY = 100

DISCRETE_GRID_SIZE = [10, 10]
START_EPSILON_DECAYING = 0.5
END_EPSILON_DECAYING = EPISODES//10

env = gymnasium.make("MountainCar-v0", render_mode="rgb_array")

epsilon = 1
epsilon_change = epsilon/(END_EPSILON_DECAYING - START_EPSILON_DECAYING)
buckets = (env.observation_space.high - env.observation_space.low) \
    / DISCRETE_GRID_SIZE
q_table_car = np.random.uniform(low=-3, high=0, size=(DISCRETE_GRID_SIZE
                                                  + [env.action_space.n]))
success = False


In [None]:
# ----------------------------------------------------------
# 0️⃣  Install Gymnasium + the pre‑built Box2D wheel (only once)
# ----------------------------------------------------------
# Uncomment the two lines below the first time you run the cell.
# Subsequent runs will hit the cache and be instant.
# !pip install -q box2d-py==2.3.5
# !pip install -q gymnasium==1.2.0

# ----------------------------------------------------------
# 1️⃣  Imports
# ----------------------------------------------------------
import time
import gymnasium as gym
import numpy as np
import imageio                     # <-- NEW: for writing MP4
from dataclasses import dataclass
from typing import Iterable, Tuple
from tqdm.auto import tqdm
from PIL import Image
# from IPython.display import Video   # we don't need the inline video display

# ----------------------------------------------------------
# 2️⃣  Global constants
# ----------------------------------------------------------
DISCRETE_GRID_SIZE: Tuple[int, int] = (20, 20)
LEARNING_RATE: float = 0.1
DISCOUNT: float = 0.99

# ----------------------------------------------------------
# 3️⃣  Helper – discretise a continuous state
# ----------------------------------------------------------
def discretise(state: np.ndarray,
                low: np.ndarray,
                high: np.ndarray,
                buckets: Tuple[int, int]) -> Tuple[int, int]:
    """Map continuous observation to a tuple of ints."""
    ratios = (state - low) / (high - low)
    ratios = np.clip(ratios, 0.0, 0.9999)
    discrete = (ratios * buckets).astype(int)
    return tuple(discrete)

# ----------------------------------------------------------
# 4️⃣  Helper – play a single episode
# ----------------------------------------------------------
def play_one_episode(env: gym.Env,
                     q_table: np.ndarray,
                     epsilon: float,
                     update_q: bool,
                     render: bool = False) -> bool:
    """
    Execute one episode of MountainCar and optionally update the Q‑table.

    Returns
    -------
    bool
        True if the car reached the goal (position >= 0.5), otherwise False.
    """
    state, _ = env.reset()
    disc_state = discretise(state,
                            env.observation_space.low,
                            env.observation_space.high,
                            DISCRETE_GRID_SIZE)

    done, success = False, False

    while not done:
        # ε‑greedy action selection
        if np.random.rand() > epsilon:
            action = int(np.argmax(q_table[disc_state]))
        else:
            action = int(np.random.randint(env.action_space.n))

        new_state, reward, done, _, _ = env.step(action)
        new_disc = discretise(new_state,
                              env.observation_space.low,
                              env.observation_space.high,
                              DISCRETE_GRID_SIZE)

        # Success check
        if new_state[0] >= env.unwrapped.goal_position:
            success = True

        # Q‑learning update
        if update_q:
            max_future_q = float(np.max(q_table[new_disc]))
            current_q   = float(q_table[disc_state + (action,)])
            new_q = (1 - LEARNING_RATE) * current_q + \
                    LEARNING_RATE * (reward + DISCOUNT * max_future_q)
            q_table[disc_state + (action,)] = new_q

        disc_state = new_disc

        # Optional rendering – only shows if a notebook can display images
        if render:
            frame = env.render()
            if frame is not None:
                Image.fromarray(frame).show()

    return success

# ----------------------------------------------------------
# 5️⃣  State container for a single run
# ----------------------------------------------------------
@dataclass
class EpisodeStats:
    episode: int
    successes: int = 0
    epsilon: float = 1.0

# ----------------------------------------------------------
# 6️⃣  Training loop – a generator that yields EpisodeStats
# ----------------------------------------------------------
def train_mountaincar(env: gym.Env,
                      q_table: np.ndarray,
                      eps_start: float,
                      eps_end: float,
                      decay_start: int,
                      decay_end: int,
                      episodes: int,
                      show_every: int = 100,
                      record_last: bool = False) -> Iterable[EpisodeStats]:
    """
    Train MountainCar with ε‑decay and Q‑learning.

    Yields an EpisodeStats object after every episode.
    If `record_last` is True, the last episode’s frames are saved to
    an mp4 file called `mountain_car_last_episode.mp4`.
    """
    current_eps = eps_start
    successes = 0
    last_frames = []

    for ep in tqdm(range(1, episodes + 1), desc="Training"):
        # Linear ε‑decay
        if ep >= decay_start:
            fraction = (ep - decay_start) / max(1, (decay_end - decay_start))
            current_eps = max(eps_end, eps_start - fraction * (eps_start - eps_end))

        # Capture frames only for the very last episode (if requested)
        render_flag = record_last and ep == episodes

        success = play_one_episode(env,
                                   q_table,
                                   current_eps,
                                   True,
                                   render=render_flag)

        if success:
            successes += 1

        # ------------------------------------------------------------------
        # Collect frames when recording the last episode
        # ------------------------------------------------------------------
        if record_last and ep == episodes:
            # Reset once more – this guarantees that the first frame is the
            # starting position of the episode.
            state, _ = env.reset()
            disc_state = discretise(state,
                                    env.observation_space.low,
                                    env.observation_space.high,
                                    DISCRETE_GRID_SIZE)
            done = False
            while not done:
                action = int(np.argmax(q_table[disc_state]))
                new_state, _, done, _, _ = env.step(action)
                frame = env.render()
                if frame is not None:
                    last_frames.append(frame)
                disc_state = discretise(new_state,
                                        env.observation_space.low,
                                        env.observation_space.high,
                                        DISCRETE_GRID_SIZE)

        # Periodic summary
        if ep % show_every == 0 or ep == episodes:
            print(f"Episode {ep:4d}/{episodes} – Successes so far: {successes} – ε: {current_eps:.3f}")

        yield EpisodeStats(episode=ep,
                           successes=successes,
                           epsilon=current_eps)

    # ------------------------------------------------------------------
    # Write the recorded episode to a *stand‑alone* MP4 file
    # ------------------------------------------------------------------
    if record_last and last_frames:
        video_name = "mountain_car_last_episode.mp4"
        imageio.mimsave(video_name, last_frames, fps=30)
        print(f"\n✔️  Video of the final episode saved as `{video_name}`")
        # Returning the path makes it easier to move the file later
        return video_name

# ----------------------------------------------------------
# 7️⃣  Example usage (run in a notebook cell)
# ----------------------------------------------------------

# Record start time
start_time = time.time()

if __name__ == "__main__":
    # ------------------------------------------------------------------
    # Environment & Q‑table
    # ------------------------------------------------------------------
    env = gym.make("MountainCar-v0",
                   render_mode="rgb_array",
                   max_episode_steps=1000)          # we want a short episode

    grid_x, grid_y = DISCRETE_GRID_SIZE
    n_actions = env.action_space.n
    q_table = np.zeros((grid_x, grid_y, n_actions))

    # ------------------------------------------------------------------
    # Hyper‑parameters
    # ------------------------------------------------------------------
    EPS_START    = 1.0
    EPS_END      = 0.1
    DECAY_START  = 200
    DECAY_END    = 1800
    TOTAL_EPISODES = 2000

    # ------------------------------------------------------------------
    # Train and record the *last* episode
    # ------------------------------------------------------------------
    video_path = None          # will hold the filename returned by the generator
    for stats in train_mountaincar(env,
                                   q_table,
                                   eps_start=EPS_START,
                                   eps_end=EPS_END,
                                   decay_start=DECAY_START,
                                   decay_end=DECAY_END,
                                   episodes=TOTAL_EPISODES,
                                   show_every=200,
                                   record_last=True):
        # stats is an EpisodeStats instance – you can inspect it if you want
        pass
    video_path = "mountain_car_last_episode.mp4"  # the file name we just created

    # ------------------------------------------------------------------
    # 8️⃣  (Optional) Copy the video into Google Drive
    # ------------------------------------------------------------------
    # Mount Google Drive – you’ll be prompted to authenticate the first time
    from google.colab import drive
    drive.mount('/content/drive')

    # Pick any folder inside your Drive – it will be created if it doesn't exist
    GDRIVE_DEST = '/content/drive/MyDrive/MountainCar'
    !mkdir -p "$GDRIVE_DEST"

    # Copy the file
    !cp "$video_path" "$GDRIVE_DEST/"

    print(f"✅ Video copied to {GDRIVE_DEST}/mountain_car_last_episode.mp4")
# Record end time
elapsed_time = time.time() - start_time

# Print elapsed time
print(f"Elapsed time: {hms_string(elapsed_time)}")


In [None]:
#

import time

episode = 0
success_count = 0

# Record start time
start_time = time.time()

# Loop through the required number of episodes
while episode < EPISODES:
    episode += 1
    done = False

    # Run the game.  If we are local, display render animation
    # at SHOW_EVERY intervals.
    if episode % SHOW_EVERY == 0:
        print(f"Current episode: {episode}, success: {success_count}" +
              f" {(float(success_count)/SHOW_EVERY)}")
        success = run_game_car(q_table_car, True, False)
        success_count = 0
    else:
        success = run_game_car(q_table_car, False, True)

    # Count successes
    if success:
        success_count += 1

    # Move epsilon towards its ending value, if it still needs to move
    if END_EPSILON_DECAYING >= episode >= START_EPSILON_DECAYING:
        epsilon = max(0, epsilon - epsilon_change)

print(success)

# Record end time
elapsed_time = time.time() - start_time

# Print elapsed time
print(f"Elapsed time: {hms_string(elapsed_time)}")

If the code is correct, you should see something like the following output:

~~~text
Current episode: 100, success: 99 0.99
Current episode: 200, success: 100 1.0
Current episode: 300, success: 100 1.0
Current episode: 400, success: 100 1.0
Current episode: 500, success: 100 1.0
True
~~~

As you can see, the number of successful episodes generally increases as training progresses. It is not advisable to stop the first time we observe 100% success over 1,000 episodes. There is a randomness to most games, so it is not likely that an agent would retain its 100% success rate with a new run. It might be safe to stop training once you observe that the agent has gotten 100% for several update intervals.



## **Running and Observing the Agent**

Now that the algorithm has trained the agent, we can observe the agent in action. You can use the following code to see the agent in action.

### Inspecting the Q-Table

We can also display the Q-table. The following code shows the agent's action for each environment state. As the weights of a neural network, this table is not straightforward to interpret. Some patterns do emerge in that direction, as seen by calculating the means of rows and columns. The actions seem consistent at both velocity and position's upper and lower halves.

In [17]:
import numpy as np
import pandas as pd
from typing import Tuple, Iterable

# --------------------------------------------------------------------
# Helper – build a nicely labelled DataFrame from the Q‑table
# --------------------------------------------------------------------
def qtable_to_dataframe(
    q_table: np.ndarray,
    grid_size: Tuple[int, int],
    action_prefix: str = "a",
    state_prefixes: Iterable[str] = ("p", "v"),
) -> pd.DataFrame:
    """
    Convert a 3‑D Q‑table into a `pandas.DataFrame` where the cell
    contains the action with the highest Q‑value for that discrete state.

    Parameters
    ----------
    q_table : np.ndarray
        Shape should be (grid_x, grid_y, n_actions).
    grid_size : Tuple[int, int]
        `(grid_x, grid_y)` – the number of discrete bins in the
        position (first axis) and velocity (second axis) dimensions.
    action_prefix : str, optional
        Prefix used for the column names.  Default: ``"a"`` (for “action”).
    state_prefixes : Iterable[str], optional
        Prefixes for the row and column labels respectively.
        Default: ``("p", "v")`` → rows are “p‑0 … p‑N”, columns are “v‑0 … v‑M”.

    Returns
    -------
    pd.DataFrame
        Rows → position bins, columns → velocity bins, values → greedy action.
    """
    if q_table.ndim != 3:
        raise ValueError("q_table must be a 3‑D array (grid_x, grid_y, n_actions)")

    grid_x, grid_y = grid_size
    if q_table.shape[:2] != (grid_x, grid_y):
        raise ValueError(
            f"q_table shape {q_table.shape[:2]} does not match grid_size {grid_size}"
        )

    # Argmax over the action axis (axis=2)
    greedy_actions = np.argmax(q_table, axis=2)

    # Build the DataFrame in one shot – Pandas can accept the labels directly.
    df = pd.DataFrame(
        greedy_actions,
        index=[f"{state_prefixes[0]}-{i}" for i in range(grid_x)],
        columns=[f"{state_prefixes[1]}-{i}" for i in range(grid_y)],
    )

    # Optional: set a more descriptive name for the columns (the actions)
    df.columns.name = action_prefix

    return df

# ----------------------------------------------------------------


In [None]:
import pandas as pd

df_car = pd.DataFrame(q_table_car.argmax(axis=2))

df_car.columns = [f'v-{x}' for x in range(DISCRETE_GRID_SIZE[0])]
df_car.index = [f'p-{x}' for x in range(DISCRETE_GRID_SIZE[1])]
df_car


In [19]:
df_car.mean(axis=0)

Unnamed: 0,0
0,0.5
1,1.0
2,0.5
3,0.8
4,1.0
5,1.1
6,1.6
7,1.6
8,1.2
9,1.5


In [20]:
df_car.mean(axis=1)

Unnamed: 0,0
0,1.6
1,1.3
2,1.1
3,1.3
4,1.0
5,0.8
6,0.6
7,1.0
8,1.1
9,1.0


# **Exercises**


### **Exercise 1: Introduction to the Acrobot**

Here is a visualization of the Acrobot environment. The following code shows this environment. This code makes use of TF-Agents to perform this render. Usually, we use TF-Agents for the type of deep reinforcement learning that we will see in the next module. However, TF-Agents is just used to render the Acrobot environment for now.

In [None]:
# Insert your code for Exercise 1 here

import gymnasium as gym
from PIL import Image
from IPython.display import display

# Function to display the environment's state as an image
def display_env_state(env):
    frame = env.render()  # Render the environment's state to a numpy array
    image = Image.fromarray(frame)  # Convert the numpy array to an image
    display(image)  # Display the image

# Create and initialize the MountainCar environment with render mode "rgb_array"
env = gym.make('CartPole-v1', render_mode="rgb_array")
env.reset()

# Display the initial state of the environment
display_env_state(env)

# Close the environment when done
env.close()

### **Exercise 2: Apply**

In [None]:
# Insert your code for Exercise 2 here

import gymnasium as gym
from gymnasium.wrappers import RecordVideo
import glob
import io
import base64
from IPython.display import HTML
from IPython import display as ipythondisplay
from pyvirtualdisplay import Display
import os
import numpy as np

# Start virtual display for rendering
virtual_display = Display(visible=0, size=(1400, 900))
virtual_display.start()

# Ensure the video folder exists
video_folder = './videos'
os.makedirs(video_folder, exist_ok=True)

# Create the MountainCar environment with specified render mode
env = gym.make('CartPole-v1', render_mode="rgb_array")
env = RecordVideo(env, video_folder=video_folder)
env.metadata['render_fps'] = 30

# Reset the environment to start recording
observation, info = env.reset()

# Run the environment until truncated
truncated = False
i = 0
while not truncated:
    i += 1
#    action = 1  # Always push right
    # Randomly select action: 0 (push left) or 1 (push right)
    action = np.random.choice([0, 1])
    observation, reward, terminated, truncated, info = env.step(action)
    # Removed the print statement to suppress Step output

# Close the environment
env.close()

# Ensure the video file exists and handle video display
video_files = glob.glob(os.path.join(video_folder, '*.mp4'))
if not video_files:
    raise RuntimeError("No video files found. Make sure the environment ran successfully and recorded video.")

# Display the video
video = io.open(video_files[0], 'r+b').read()
encoded = base64.b64encode(video)
ipythondisplay.display(HTML(data='''
    <video width="640" height="480" controls>
        <source src="data:video/mp4;base64,{0}" type="video/mp4" />
    </video>
'''.format(encoded.decode('ascii'))))

# Stop the virtual display when done
virtual_display.stop()


### **Exercise 3:**

The goal of the **CartPole-v1** environment is to balance a pole on a moving cart for as long as possible by applying forces to move the cart left or right. The task is considered solved when the agent can keep the pole balanced for at least 500 time steps (the maximum allowed steps per episode) without letting the pole fall.

#### **Termination Conditions:**

The episode ends when:
1. The pole's angle deviates too far (falls beyond ±12 degrees from vertical).
2. The cart moves too far to the left or right (cart's position goes beyond ±2.4 units from the center).
3. The time step limit (500 steps) is reached.

The challenge for the agent is to learn an optimal policy that keeps the pole upright and the cart within bounds. Let me know if you'd like help implementing or solving this environment! 🚀

In [None]:
# Insert your code for Exercise 3 here

import gymnasium as gym
from gymnasium.wrappers import RecordVideo
import glob
import io
import base64
from IPython.display import HTML
from IPython import display as ipythondisplay
from pyvirtualdisplay import Display
import os
import numpy as np

# Start virtual display for rendering
virtual_display = Display(visible=0, size=(1400, 900))
virtual_display.start()

# Ensure the video folder exists
video_folder = './videos'
os.makedirs(video_folder, exist_ok=True)

# Create the Acrobot environment with specified render mode
env = gym.make('CartPole-v1', render_mode="rgb_array")  # Updated for Acrobot-v1
env.metadata['render_fps'] = 30

# Setup the wrapper to record the video
env = RecordVideo(env, video_folder=video_folder, episode_trigger=lambda episode_id: True)

# Reset the environment to start recording
observation, info = env.reset()

# Run the environment until truncated
truncated = False
action = env.action_space.sample()  # Initial action: Random action sampling

print("Starting truncation...", end="")
while not truncated:
    # Execute a random action
    state, reward, terminated, truncated, info = env.step(action)

    # Adjust action randomly for exploration
    action = env.action_space.sample()  # Continue random action sampling

print("done.")
# Close the environment
env.close()

# Ensure the video file exists and handle video display
video_files = glob.glob(os.path.join(video_folder, '*.mp4'))
if not video_files:
    raise RuntimeError("No video files found. Make sure the environment ran successfully and recorded video.")

# Display the video
video = io.open(video_files[0], 'r+b').read()
encoded = base64.b64encode(video)
ipythondisplay.display(HTML(data='''
    <video width="640" height="480" controls>
        <source src="data:video/mp4;base64,{0}" type="video/mp4" />
    </video>
'''.format(encoded.decode('ascii'))))

# Stop the virtual display when done
virtual_display.stop()


In [None]:
import gymnasium as gym
import numpy as np

# Hyperparameters
LEARNING_RATE = 0.01
DISCOUNT = 0.95
EPISODES = 2000
SHOW_EVERY = 500
epsilon = 1.0  # Exploration rate
START_EPSILON_DECAY = 1
END_EPSILON_DECAY = EPISODES // 2
epsilon_decay_value = epsilon / (END_EPSILON_DECAY - START_EPSILON_DECAY)

# Environment setup
env = gym.make("CartPole-v1")

# Discretization settings (number of bins per state dimension)
DISCRETE_OS_SIZE = [20, 20, 20, 20]  # Four state variables
discrete_os_win_size = (env.observation_space.high - env.observation_space.low) / DISCRETE_OS_SIZE
discrete_os_win_size[1] = 2  # Manually adjust for velocity limits
discrete_os_win_size[3] = 0.5  # Adjust for pole angular velocity

# Initialize Q-table
q_table_cartpole = np.random.uniform(low=-2, high=0, size=(DISCRETE_OS_SIZE + [env.action_space.n]))

# Helper function to convert continuous states to discrete states
def get_discrete_state_cartpole(state):
    discrete_state = (state - env.observation_space.low) / discrete_os_win_size
    return tuple(np.clip(discrete_state, 0, DISCRETE_OS_SIZE - np.array([1, 1, 1, 1])).astype(int))

# Training loop

# Record start time
start_time = time.time()

for episode in range(EPISODES):
    state, _ = env.reset()
    discrete_state = get_discrete_state_cartpole(state)
    done = False

    while not done:
        # Choose action: explore or exploit
        if np.random.random() > epsilon:
            action = np.argmax(q_table_cartpole[discrete_state])  # Exploit
        else:
            action = np.random.randint(0, env.action_space.n)  # Explore

        # Take action and observe the result
        new_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        new_discrete_state = get_discrete_state_cartpole(new_state)

        # Update Q-value
        if not done:
            max_future_q = np.max(q_table_cartpole[new_discrete_state])
            current_q = q_table_cartpole[discrete_state + (action,)]
            new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)
            q_table_cartpole[discrete_state + (action,)] = new_q
        elif terminated:
            q_table_cartpole[discrete_state + (action,)] = 0  # Terminal state

        discrete_state = new_discrete_state

    # Decay epsilon to reduce exploration over time
    if START_EPSILON_DECAY <= episode <= END_EPSILON_DECAY:
        epsilon -= epsilon_decay_value

    # Render the environment every SHOW_EVERY episodes
    if episode % SHOW_EVERY == 0:
        print(f"Episode: {episode}")
        env.render()

env.close()
print("Training complete!")

# Record end time
elapsed_time = time.time() - start_time

# Print elapsed time
print(f"Elapsed time: {hms_string(elapsed_time)}")


In [None]:
# Import necessary libraries
import gymnasium as gym
from gymnasium.wrappers import RecordVideo
import glob
import io
import base64
from IPython.display import HTML
from IPython import display as ipythondisplay

# Create the CartPole environment
env = gym.make("CartPole-v1", render_mode="rgb_array")

# Define the video recording wrapper
video_callable = lambda episode_id: True  # Record all episodes
env = RecordVideo(env, video_folder='./videos', episode_trigger=video_callable)

# Function to run a trained agent using the Q-table
def run_game_cartpole(q_table_cartpole, render=True):
    observation, _ = env.reset()
    discrete_state = get_discrete_state_cartpole(observation)
    done = False
    while not done:
        if render:
            env.render()
        # Select the best action using the Q-table
        action = np.argmax(q_table_cartpole[discrete_state])
        new_observation, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        discrete_state = get_discrete_state_cartpole(new_observation)

# Run the trained agent with the recorded video
run_game_cartpole(q_table_cartpole, render=True)

# Close the environment
env.close()

# Display the video
video_files = glob.glob('./videos/*.mp4')
if not video_files:
    raise RuntimeError("No video files found. Make sure the environment ran successfully and recorded video.")

# Embed the video into the notebook
video = io.open(video_files[0], 'r+b').read()
encoded = base64.b64encode(video)
ipythondisplay.display(HTML(data='''
    <video width="640" height="480" controls>
        <source src="data:video/mp4;base64,{0}" type="video/mp4" />
    </video>
'''.format(encoded.decode('ascii'))))


## **Reinforcement Learning**

 Visualize the Acrobot Environment

In [None]:
# ───────────────────────────────────────────
# 1.  Visualising the Acrobot environment
# ───────────────────────────────────────────
import gymnasium as gym
from PIL import Image
from IPython.display import display
import time

def show_state(env):
    frame = env.render()
    img   = Image.fromarray(frame)
    display(img)

env = gym.make("Acrobot-v1", render_mode="rgb_array")
obs, _ = env.reset(seed=0)
show_state(env)
env.close()


Naïve “Always Push” Agent

In [None]:
# ───────────────────────────────────────────
# 2.  Always‑push agent (action 1 every step)
# ───────────────────────────────────────────
import os, glob, io, base64
from IPython.display import HTML
from pyvirtualdisplay import Display
from gymnasium.wrappers import RecordVideo

Display(visible=0, size=(1400, 900)).start()

video_dir = "./videos/acrobot_naive"
os.makedirs(video_dir, exist_ok=True)

env = gym.make("Acrobot-v1", render_mode="rgb_array")
env = RecordVideo(env, video_folder=video_dir, episode_trigger=lambda i: True)

obs, _ = env.reset()
done = False
while not done:
    obs, reward, terminated, truncated, _ = env.step(1)  # always push right
    done = terminated or truncated

env.close()

# display the video
video_file = glob.glob(os.path.join(video_dir, "*.mp4"))[0]
video_bytes = open(video_file, "rb").read()
video_b64 = base64.b64encode(video_bytes).decode()
display(HTML(f'''
<video width="640" height="480" controls>
  <source src="data:video/mp4;base64,{video_b64}" type="video/mp4" />
</video>
'''))


 Rule‑Based Agent (heuristic)

In [None]:
# ------------------------------------------------------------------
# 3.  Rule‑based agent – angle‑only heuristic
# ------------------------------------------------------------------
import os, glob, base64
import numpy as np
import gymnasium as gym
from gymnasium.wrappers import RecordVideo
from IPython.display import display, HTML

# ------------------------------------------------------------------
# 1️⃣  Video folder
# ------------------------------------------------------------------
video_dir = "./videos/acrobot_rule"
os.makedirs(video_dir, exist_ok=True)

# ------------------------------------------------------------------
# 2️⃣  Environment (discrete actions, rgb_array rendering)
# ------------------------------------------------------------------
env = gym.make("Acrobot-v1", render_mode="rgb_array")
env = RecordVideo(
        env,
        video_folder=video_dir,
        episode_trigger=lambda i: True,  # one episode only
    )

# ------------------------------------------------------------------
# 3️⃣  Start an episode
# ------------------------------------------------------------------
obs, _ = env.reset()
done = False

# Record start time
start_time = time.time()

while not done:
    # obs = [cosθ1, sinθ1, cosθ2, sinθ2, ω1, ω2]
    angle2 = np.arctan2(obs[2], obs[3])          # angle of the second link

    # Integer action: 0 = push left, 2 = push right
    action = 0 if angle2 < 0 else 2

    # Step *with an integer, not a list*!
    obs, reward, terminated, truncated, _ = env.step(action)
    done = terminated or truncated

# ------------------------------------------------------------------
# 4️⃣  Close the environment (flushes the video)
# ------------------------------------------------------------------
env.close()


# Record end time
elapsed_time = time.time() - start_time

# Print elapsed time
print(f"Elapsed time: {hms_string(elapsed_time)}")

# ------------------------------------------------------------------
# 5️⃣  Show the saved video
# ------------------------------------------------------------------
video_file = glob.glob(os.path.join(video_dir, "*.mp4"))[0]
video_bytes = open(video_file, "rb").read()
video_b64 = base64.b64encode(video_bytes).decode()

display(HTML(f'''
<video width="640" height="480" controls>
  <source src="data:video/mp4;base64,{video_b64}" type="video/mp4" />
</video>
'''))


The heuristic keeps the pendulum upright for a while but still fails in the long run.

Deep Q‑Network (DQN) Agent
The remainder of the script is a tiny, but functional DQN that learns to swing the Acrobot up within a few hundred episodes.



In [29]:
# ───────────────────────────────────────────
# 4.  DQN agent for Acrobot
# ───────────────────────────────────────────
import random
import collections
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# ------------------------------------------------------------------
# Hyper‑parameters
# ------------------------------------------------------------------
BATCH_SIZE      = 64
REPLAY_SIZE     = 200_000
MIN_REPLAY      = 5_000
DISCOUNT        = 0.99
LEARNING_RATE   = 1e-3
TARGET_UPDATE_FREQ = 1000
MAX_STEPS_PER_EPISODE = 500

# ------------------------------------------------------------------
# Replay buffer
# ------------------------------------------------------------------
class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = collections.deque(maxlen=capacity)

    def add(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = map(np.array, zip(*batch))
        return state, action, reward, next_state, done

    def __len__(self):
        return len(self.buffer)

# ------------------------------------------------------------------
# Network architecture
# ------------------------------------------------------------------
def build_q_network(input_dim, n_actions):
    inputs = keras.Input(shape=(input_dim,))
    x = layers.Dense(128, activation="relu")(inputs)
    x = layers.Dense(128, activation="relu")(x)
    outputs = layers.Dense(n_actions, activation=None)(x)
    model = keras.Model(inputs, outputs)
    return model

# ------------------------------------------------------------------
# Agent class
# ------------------------------------------------------------------
class DQNAgent:
    def __init__(self, state_dim, n_actions):
        self.q_net      = build_q_network(state_dim, n_actions)
        self.target_q   = build_q_network(state_dim, n_actions)
        self.target_q.set_weights(self.q_net.get_weights())

        self.optimizer = keras.optimizers.Adam(learning_rate=LEARNING_RATE)
        self.replay    = ReplayBuffer(REPLAY_SIZE)

        self.n_actions = n_actions
        self.steps     = 0
        self.epsilon   = 1.0
        self.epsilon_min  = 0.1
        self.epsilon_decay = 1e-4

    def act(self, state):
        if np.random.rand() < self.epsilon:
            return np.random.randint(self.n_actions)
        q_vals = self.q_net(np.array([state]), training=False)
        return int(np.argmax(q_vals[0].numpy()))

    def train_step(self):
        if len(self.replay) < MIN_REPLAY:
            return
        s, a, r, s2, d = self.replay.sample(BATCH_SIZE)

        q_next = self.target_q(s2, training=False)
        max_q_next = tf.reduce_max(q_next, axis=1)
        target = r + (1 - d) * DISCOUNT * max_q_next

        with tf.GradientTape() as tape:
            q_vals = self.q_net(s, training=True)
            q_action = tf.reduce_sum(q_vals * tf.one_hot(a, self.n_actions), axis=1)
            loss = tf.reduce_mean(tf.square(target - q_action))

        grads = tape.gradient(loss, self.q_net.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.q_net.trainable_variables))

        # epsilon decay
        if self.epsilon > self.epsilon_min:
            self.epsilon -= self.epsilon_decay

        # target‑network sync
        self.steps += 1
        if self.steps % TARGET_UPDATE_FREQ == 0:
            self.target_q.set_weights(self.q_net.get_weights())


Faster Training Loop

In [30]:
import gymnasium as gym
from gymnasium.vector import AsyncVectorEnv

def make_env(seed=42):
    def _init():
        env = gym.make("Acrobot-v1", render_mode=None)   # no rendering during training
        env.reset(seed=seed)
        return env
    return _init

# 8 parallel workers
vec_env = AsyncVectorEnv([make_env(i) for i in range(8)])


In [31]:
import tensorflow as tf
import numpy as np
from tensorflow.keras import layers, optimizers

class DQNAgent:
    def __init__(self, state_dim, n_actions, batch_size=64, lr=1e-3):
        self.n_actions = n_actions
        self.batch_size = batch_size
        self.q_net = self._build_net(state_dim, n_actions)
        self.target_q = tf.keras.models.clone_model(self.q_net)
        self.target_q.set_weights(self.q_net.get_weights())
        self.optimizer = optimizers.Adam(lr)
        self.replay = ReplayBuffer(500_000)

        self.eps = 1.0
        self.eps_min = 0.1
        self.eps_decay = 1e-4
        self.step_cnt = 0

    def _build_net(self, state_dim, n_actions):
        inputs = layers.Input(shape=(state_dim,))
        x = layers.Dense(128, activation="relu")(inputs)
        x = layers.Dense(128, activation="relu")(x)
        outputs = layers.Dense(n_actions)(x)
        return tf.keras.Model(inputs, outputs)

    def act(self, state):
        if np.random.rand() < self.eps:
            return np.random.randint(self.n_actions)
        q = self.q_net(tf.expand_dims(state, 0), training=False)
        return int(tf.argmax(q[0], axis=0).numpy())

    @tf.function
    def _train_step(self, s, a, r, s2, d):
        d = tf.cast(d, tf.float32)          # <‑‑ cast boolean to float
        with tf.GradientTape() as tape:
            q_pred = self.q_net(s, training=True)
            q_pred_a = tf.reduce_sum(q_pred * tf.one_hot(a, self.n_actions), axis=1)

            q_next = self.target_q(s2, training=False)
            q_next_max = tf.reduce_max(q_next, axis=1)
            target = r + (1.0 - d) * 0.99 * q_next_max

            loss = tf.reduce_mean(tf.square(target - q_pred_a))

        grads = tape.gradient(loss, self.q_net.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.q_net.trainable_variables))

        if self.eps > self.eps_min:
            self.eps -= self.eps_decay

        self.step_cnt += 1
        if self.step_cnt % 2_000 == 0:
            self.target_q.set_weights(self.q_net.get_weights())

    def train_from_buffer(self):
        if len(self.replay) < self.batch_size:
            return
        s, a, r, s2, d = self.replay.sample(self.batch_size)
        self._train_step(
            tf.convert_to_tensor(s, dtype=tf.float32),
            tf.convert_to_tensor(a, dtype=tf.int32),
            tf.convert_to_tensor(r, dtype=tf.float32),
            tf.convert_to_tensor(s2, dtype=tf.float32),
            tf.convert_to_tensor(d, dtype=tf.bool)   # still bool – will be cast
        )

    def add_experience(self, s, a, r, s2, d):
        self.replay.add(s, a, r, s2, d)

# Dummy ReplayBuffer – keep the original bool flag
class ReplayBuffer:
    def __init__(self, size):
        self.size = size
        self.states = []
        self.actions = []
        self.rewards = []
        self.next_states = []
        self.dones = []

    def __len__(self):
        return len(self.states)

    def add(self, s, a, r, s2, d):
        self.states.append(s.astype(np.float32))
        self.actions.append(a)
        self.rewards.append(float(r))
        self.next_states.append(s2.astype(np.float32))
        self.dones.append(float(d))     # cast bool → 0.0/1.0

    def sample(self, batch_size):
        idx = np.random.choice(len(self.states), batch_size, replace=False)
        states = np.array([self.states[i] for i in idx], dtype=np.float32)
        actions = np.array([self.actions[i] for i in idx], dtype=np.int32)
        rewards = np.array([self.rewards[i] for i in idx], dtype=np.float32)
        next_states = np.array([self.next_states[i] for i in idx], dtype=np.float32)
        dones = np.array([self.dones[i] for i in idx], dtype=np.float32)   # already float
        return states, actions, rewards, next_states, dones


In [None]:
import time
import numpy as np

state_dim = vec_env.single_observation_space.shape[0]
n_actions = vec_env.single_action_space.n
agent = DQNAgent(state_dim, n_actions)

num_episodes = 200
eval_every = 20
successes = 0

start = time.time()

for ep in range(1, num_episodes + 1):
    obs, _ = vec_env.reset()          # shape (8, 6)
    done = np.zeros(8, dtype=bool)

    ep_rewards = np.zeros(8, dtype=float)

    while not np.all(done):
        # action selection
        actions = np.array([agent.act(o) for o in obs], dtype=np.int32)

        # step all environments
        next_obs, rewards, terminated, truncated, _ = vec_env.step(actions)

        # store transitions
        for i in range(len(obs)):
            agent.add_experience(
                obs[i], actions[i], rewards[i], next_obs[i], terminated[i] or truncated[i]
            )

        # train once per episode (you can also call every few steps)
        agent.train_from_buffer()

        obs = next_obs
        ep_rewards += rewards
        done = terminated | truncated

    # count successes (reached goal in 500 steps)
    successes += np.sum(ep_rewards == 500)

    if ep % eval_every == 0:
        print(f"Episode {ep:3d} | Avg Reward: {np.mean(ep_rewards):5.1f} | "
              f"Successes in last {eval_every}: {successes}")
        successes = 0

vec_env.close()

#print(f"Training finished in {time.time() - start:.2f}s")

# Record end time
elapsed_time = time.time() - start_time

# Print elapsed time
print(f"Elapsed time: {hms_string(elapsed_time)}")


In [None]:
# ------------------------------------------------------------------
# 4. Training the DQN on Acrobot
# ------------------------------------------------------------------
import time

env = gym.make("Acrobot-v1", render_mode="rgb_array")
state_dim = env.observation_space.shape[0]
n_actions = env.action_space.n

agent = DQNAgent(state_dim, n_actions)

# video directory for the trained policy
video_dir = "./videos/acrobot_dqn"
os.makedirs(video_dir, exist_ok=True)

env = RecordVideo(env, video_folder=video_dir, episode_trigger=lambda i: True)

num_episodes = 300
success_count = 0

# Start timer
start_time = time.time()

print("--------Starting Training------------------")

for ep in range(1, num_episodes+1):
    state, _ = env.reset()
    ep_reward = 0
    done = False
    while not done:
        action = agent.act(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        agent.replay.add(state, action, reward, next_state, done)
        agent.train_step()
        state = next_state
        ep_reward += reward

    if ep_reward == MAX_STEPS_PER_EPISODE:
        success_count += 1

    if ep % 20 == 0:
        print(f"Episode {ep:3d} | Reward {ep_reward:5.1f} | 20‑step avg success: {success_count}")
        success_count = 0

env.close()

elapsed_time = time.time() - start_time
print(f"Elapsed time: {elapsed_time:.2f} seconds")


Typical output after 300 episodes
```type
Episode  20  |  Reward -100.0  |  Successes in last 20: 0
Episode  40  |  Reward  -55.0  |  Successes in last 20: 2
Episode  60  |  Reward  -18.0  |  Successes in last 20: 5
Episode  80  |  Reward   -4.0  |  Successes in last 20: 9
Episode 100  |  Reward   -1.0  |  Successes in last 20: 12
...
Episode 300  |  Reward   -1.0  |  Successes in last 20: 20
```
By the final episode the agent is able to swing the Acrobot up and keep it balanced for the entire horizon in almost every run.

Playback the Trained Policy

In [None]:
# ───────────────────────────────────────────
# 5.  Play the DQN policy
# ───────────────────────────────────────────
video_file = glob.glob(os.path.join(video_dir, "*.mp4"))[0]
video_bytes = open(video_file, "rb").read()
video_b64 = base64.b64encode(video_bytes).decode()
display(HTML(f'''
<video width="640" height="480" controls>
  <source src="data:video/mp4;base64,{video_b64}" type="video/mp4" />
</video>
'''))


## Quick “What’s Happening” Summary
Step	Done	Why it’s useful
1️⃣	Render Acrobot	Understand the 6‑dim state.
2️⃣	Naïve always‑push	Illustrates the necessity of a learning policy.
3️⃣	Rule‑based	Shows how a simple heuristic can give a partial answer but still fails.
4️⃣	DQN	The core RL loop: collect samples → replay → update Q‑network.

7️⃣ Extensions & Variants
Idea	How to try it	Why it’s cool
Different RL algorithm

Replace the DQN body with Policy‑Gradient or A2C (the gymnasium docs show an example).	Shows that the same environment can be solved with many families of algorithms.

Prioritized Experience Replay

In ReplayBuffer.sample() use importance sampling weights.	Improves learning efficiency on Acrobot’s sparse reward.

Double‑DQN

Keep a second Q‑network for the target but use the first network to choose the action.	Reduces over‑optimistic Q‑value estimates.

Add a learning‑curve plot

Log episode_reward each episode and plot with matplotlib.	Gives a visual picture of convergence.

Swap the environment

Use MountainCarContinuous-v0 or Pendulum-v1 and keep the same training skeleton.	Reinforces the idea that the agent is the part that changes, not the task.

Mountain Car

## **Lesson Turn-in**

When you have completed and run all of the code cells, use the **File --> Print.. --> Save to PDF** to generate a PDF of your Colab notebook. Save your PDF as `Copy of Class_06_1.lastname.pdf` where _lastname_ is your last name, and upload the file to Canvas.

## **Lizard Tail**

## **IBM PC**

![__](https://upload.wikimedia.org/wikipedia/commons/a/a6/IBM_PC-IMG_7271_%28transparent%29.png)

The **IBM Personal Computer** (model 5150, commonly known as the IBM PC) is the first microcomputer released in the IBM PC model line and the basis for the IBM PC compatible _de facto_ standard. Released on August 12, 1981, it was created by a team of engineers and designers at International Business Machines (IBM), directed by William C. Lowe and Philip Don Estridge in Boca Raton, Florida.

Powered by an x86-architecture Intel 8088 processor, the machine was based on open architecture and third-party peripherals. Over time, expansion cards and software technology increased to support it. The PC had a substantial influence on the personal computer market; the specifications of the IBM PC became one of the most popular computer design standards in the world. The only significant competition it faced from a non-compatible platform throughout the 1980s was from Apple's Macintosh product line, as well as consumer-grade platforms created by companies like Commodore and Atari. Most present-day personal computers share architectural features in common with the original IBM PC, including the Intel-based Mac computers manufactured from 2006 to 2022.

**History**

Prior to the 1980s, IBM had largely been known as a provider of business computer systems. As the 1980s opened, their market share in the growing minicomputer market failed to keep up with competitors, while other manufacturers were beginning to see impressive profits in the microcomputer space. The market for personal computers was dominated at the time by Tandy, Commodore, and Apple, whose machines sold for several hundred dollars each and had become very popular. The microcomputer market was large enough for IBM's attention, with \$15 billion in sales by 1979 and projected annual growth of more than 40% during the early 1980s. Other large technology companies had entered it, such as Hewlett-Packard, Texas Instruments and Data General, and some large IBM customers were buying Apples.

As early as 1980 there were rumors of IBM developing a personal computer, possibly a miniaturized version of the IBM System/370, and Matsushita acknowledged publicly that it had discussed with IBM the possibility of manufacturing a personal computer in partnership, although this project was abandoned. The public responded to these rumors with skepticism, owing to IBM's tendency towards slow-moving, bureaucratic business practices tailored towards the production of large, sophisticated and expensive business systems As with other large computer companies, its new products typically required about four to five years for development, and a well publicized quote from an industry analyst was, "IBM bringing out a personal computer would be like teaching an elephant to tap dance."

IBM had previously produced microcomputers, such as 1975's IBM 5100, but targeted them towards businesses; the 5100 had a price tag as high as \$20,000. Their entry into the home computer market needed to be competitively priced.

In the summer of 1979, Ron Mion, IBM’s Senior Business Trends Advisor for entry-level systems, proposed a plan for IBM to enter the emerging microcomputer market. At that time, the likes of Apple and Tandy were starting to encroach on the small-business marketplace that IBM intended to dominate. Mion believed that that market would grow significantly and that IBM should aggressively pursue it. However, he felt that they wouldn’t be successful unless IBM departed from its long-standing business model.