<a href="https://colab.research.google.com/github/ThomasWong-ST/Intro-to-RL/blob/main/On_Policy_Control_with_Approximation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np

In [None]:
class MountainCar:
    """
    Mountain Car environment with a continuous state and discrete actions.

    The state is a float32 array ``[position, velocity]``. The three actions
    are -1 (reverse), 0 (coast) and +1 (forward). Every step yields a reward
    of -1; an episode is finished once the position reaches
    ``goal_position``.
    """
    def __init__(self):
        # Bounds applied to the state after every update
        self.min_position = -1.2
        self.max_position = 0.6
        self.max_speed = 0.07
        self.goal_position = 0.5

        # Per-step engine push and the coefficient of the cos(3 * position) slope term
        self.force = 0.001
        self.gravity = 0.0025

        self.actions = [-1, 0, 1]

        # Stays None until reset() is called
        self.state = None

    def reset(self):
        """
        Start a new episode.

        The position is drawn uniformly from [-0.6, -0.4) and the velocity is
        set to 0.

        Returns:
            The initial state as a float32 array ``[position, velocity]``.
        """
        position = np.random.uniform(-0.6, -0.4)
        velocity = 0.0
        self.state = np.array([position, velocity], dtype=np.float32)
        return self.state

    def step(self, action):
        """
        Advance the physics by one time step.

        Args:
            action: One of -1, 0, 1; multiplied by ``force`` to push the car.

        Returns:
            ``(state, reward, done, info)`` where ``state`` is the new float32
            array, ``reward`` is always -1.0, ``done`` is True once the
            position is at or beyond ``goal_position``, and ``info`` is an
            empty dict.

        Raises:
            RuntimeError: If called before ``reset()``.
        """
        if self.state is None:
            # Without this guard the unpacking below fails with an opaque TypeError
            raise RuntimeError("MountainCar.step() called before reset()")

        position, velocity = self.state

        # physics update: velocity first, then position, clipping both to their bounds
        velocity += (self.force * action) - (self.gravity * np.cos(3 * position))
        velocity = np.clip(velocity, -self.max_speed, self.max_speed)
        position += velocity
        position = np.clip(position, self.min_position, self.max_position)

        # if the car hits the left bound while still moving left, stop it
        if position <= self.min_position and velocity < 0:
            velocity = 0.0

        # terminal once the goal position has been reached
        done = bool(position >= self.goal_position)

        # reward: -1 per step until goal
        reward = -1.0

        self.state = np.array([position, velocity], dtype=np.float32)
        return self.state, reward, done, {}


In [None]:
# Smoke test: take up to 10 uniformly random actions and print every transition.
env = MountainCar()
s = env.reset()
for t in range(10):
    # Sample one of the environment's discrete actions at random
    a = np.random.choice(env.actions)
    s_next, r, done, _ = env.step(a)
    print(f"Step {t}: state={s_next}, reward={r}, done={done}")
    # Stop early if the goal was reached
    if done:
        break

Step 0: state=[-0.49008355 -0.00126034], reward=-1.0, done=False
Step 1: state=[-0.49259484 -0.00251128], reward=-1.0, done=False
Step 2: state=[-0.49533832 -0.00274348], reward=-1.0, done=False
Step 3: state=[-0.4972935  -0.00195518], reward=-1.0, done=False
Step 4: state=[-0.49944577 -0.00215226], reward=-1.0, done=False
Step 5: state=[-0.50077903 -0.00133325], reward=-1.0, done=False
Step 6: state=[-0.5022833  -0.00150427], reward=-1.0, done=False
Step 7: state=[-0.5049473  -0.00266402], reward=-1.0, done=False
Step 8: state=[-0.50875115 -0.00380384], reward=-1.0, done=False
Step 9: state=[-0.51366633 -0.00491516], reward=-1.0, done=False


In [None]:
numtilings = 8
iht = IHT(4096)

# Each integer in the returned list is the index of the tile that the
# state [-0.5, 0.0] falls into within one of the 8 tilings, given action 1.
# This list of 8 indices is the feature representation x(s, a) for that
# specific state-action pair.
get_tiles(-0.5, 0.0, 1)

<bound method MountainCar.reset of <__main__.MountainCar object at 0x7a0f645d09e0>>


[0, 1, 2, 3, 4, 5, 6, 7]

In [None]:
def get_tiles(position, velocity, action):
  """
  Return the active tile indices for one state-action pair.

  Position and velocity are each rescaled so that their valid range
  ([-1.2, 0.6] and [-0.07, 0.07]) spans [0, numtilings]; the action is
  handed to the tile coder as an integer feature. Reads the module-level
  `iht` and `numtilings`.
  """
  pos_lo, pos_hi = -1.2, 0.6
  vel_lo, vel_hi = -0.07, 0.07

  # Continuous features, rescaled as described above
  scaled_state = [
      (position - pos_lo) / (pos_hi - pos_lo) * numtilings,
      (velocity - vel_lo) / (vel_hi - vel_lo) * numtilings,
  ]

  # The discrete action travels as the single integer feature
  return tiles(iht, numtilings, scaled_state, [action])

def get_q_value(state_tiles, w):
  """
  Return q_hat(s, a, w) for a state-action pair.

  state_tiles: indices of the active tiles for the pair.
  w: weight vector (NumPy array) indexed by tile index.
  """
  # Features are binary, so the linear value is just the sum of the active weights
  active_weights = w[state_tiles]
  return np.sum(active_weights)

def choose_action(position, velocity, w, epsilon=0.1):
  """
  Choose an action with an epsilon-greedy policy over q_hat.

  position, velocity: the current state.
  w: weight vector used to evaluate q_hat(s, a, w).
  epsilon: probability of picking a uniformly random action.

  Returns one of -1, 0, 1 as a plain Python int.
  """
  possible_actions = [-1, 0, 1]

  if np.random.rand() < epsilon:
    # Exploration: no action values are needed, so none are computed
    return int(np.random.choice(possible_actions))

  # Exploitation: evaluate q_hat for every action and take the best one.
  # np.argmax returns the first maximum, so ties go to the earliest action.
  q_values = [
      get_q_value(get_tiles(position, velocity, action), w)
      for action in possible_actions
  ]
  return possible_actions[int(np.argmax(q_values))]

In [None]:
"""
Size: The vector x(s, a) has a large number of components, one for every possible tile index (4096 in our example).

Activation: For a specific state s and action a, the get_tiles function tells us which few tiles are "active" (e.g., it gives us a list of 8 indices).

Binary & Sparse: The feature vector x(s, a) is binary (contains only 0s and 1s) and sparse (mostly 0s).

It has a 1 at the component corresponding to each active tile index found by get_tiles.

It has a 0 at all other components.

So, for any given state-action pair, x(s, a) is a long vector with exactly numtilings (8 in our case) components equal to 1,
and all the rest (4096 - 8 = 4088) equal to 0. This sparsity is what makes calculating the action value q̂(s, a, w) efficient.
"""

In [None]:
def semi_gradient_sarsa_mountain_car(env, num_episodes=1000, max_steps_per_episode=500,
                                    gamma=1.0, epsilon=0.1, alpha=None, num_weights=4096):
    """
    Episodic semi-gradient Sarsa for Mountain Car with tile-coded features.

    Args:
        env: Environment exposing reset() -> state and
            step(action) -> (state, reward, done, info).
        num_episodes: Number of training episodes.
        max_steps_per_episode: Step cap after which an episode is cut off.
        gamma: Discount factor.
        epsilon: Exploration probability passed to choose_action.
        alpha: Step size. Defaults to 0.1 / 8, i.e. 0.1 divided by the
            number of tilings.
        num_weights: Length of the weight vector; it should match the size
            of the index hash table that get_tiles uses.

    Returns:
        (w, episode_rewards): the learned weight vector and the list of
        total rewards, one entry per episode.
    """
    if alpha is None:
        numtilings = 8
        alpha = 0.1 / numtilings

    w = np.zeros(num_weights)  # Initialize weights to zero
    episode_rewards = []  # To track learning progress

    for ep in range(num_episodes):
        position, velocity = env.reset()
        action = choose_action(position, velocity, w, epsilon)
        active_tiles_current = get_tiles(position, velocity, action)

        current_episode_reward = 0

        for _ in range(max_steps_per_episode):
            next_state, reward, done, _info = env.step(action)
            current_episode_reward += reward

            q_value_current = get_q_value(active_tiles_current, w)

            if done:
                # Terminal transition: the value of the next state is 0,
                # so the TD target is just the reward.
                w[active_tiles_current] += alpha * (reward - q_value_current)
                break

            # Choose the next action A' based on the next state S'
            next_position, next_velocity = next_state
            next_action = choose_action(next_position, next_velocity, w, epsilon)
            active_tiles_next = get_tiles(next_position, next_velocity, next_action)

            # TD error for a non-terminal transition
            td_target = reward + gamma * get_q_value(active_tiles_next, w)
            td_error = td_target - q_value_current

            # With binary features the gradient is 1 on the active tiles and 0
            # elsewhere, so only the active weights of (S, A) are updated.
            w[active_tiles_current] += alpha * td_error

            # Move on to (S', A')
            action = next_action
            active_tiles_current = active_tiles_next

        episode_rewards.append(current_episode_reward)
        if (ep + 1) % 100 == 0:
            print(f"Episode {ep + 1}/{num_episodes} completed. Average reward (last 100): {np.mean(episode_rewards[-100:]):.2f}")

    return w, episode_rewards

def generate_episode_with_learned_policy(env, w, max_steps=1000):
    """
    Roll out one episode with the greedy policy derived from weights w.

    Args:
        env: Environment exposing reset() and step(action).
        w: Weight vector used to evaluate action values.
        max_steps: Step cap that prevents an endless loop when the policy
            never reaches the goal.

    Returns:
        (episode_trajectory, steps): a list with one
        {"state", "action", "reward"} dict per step taken, and the number
        of steps taken.
    """
    episode_trajectory = []
    state = env.reset()
    done = False
    steps = 0

    while not done and steps < max_steps:
        position, velocity = state
        # Choose action greedily (epsilon = 0)
        action = choose_action(position, velocity, w, epsilon=0)

        next_state, reward, done, _ = env.step(action)

        # Record the state the action was taken from, not the resulting state
        episode_trajectory.append({"state": state, "action": action, "reward": reward})

        state = next_state
        steps += 1

    return episode_trajectory, steps

In [None]:
# Training run: fresh environment plus the module-level tile-coder settings
# (numtilings and iht) that get_tiles reads.
env = MountainCar()
numtilings = 8
iht = IHT(4096)
# w is the learned weight vector; rewards holds the total reward of each episode
w, rewards = semi_gradient_sarsa_mountain_car(env, num_episodes=1000)

Episode 100/1000 completed. Average reward (last 100): -340.73
Episode 200/1000 completed. Average reward (last 100): -196.95
Episode 300/1000 completed. Average reward (last 100): -163.51
Episode 400/1000 completed. Average reward (last 100): -149.48
Episode 500/1000 completed. Average reward (last 100): -147.48
Episode 600/1000 completed. Average reward (last 100): -139.59
Episode 700/1000 completed. Average reward (last 100): -139.57
Episode 800/1000 completed. Average reward (last 100): -141.89
Episode 900/1000 completed. Average reward (last 100): -131.72
Episode 1000/1000 completed. Average reward (last 100): -125.60


In [None]:
# Roll out one greedy episode with the trained weights; yields (trajectory, steps)
generate_episode_with_learned_policy(env, w)

([{'state': array([-0.47911423,  0.        ], dtype=float32),
   'action': -1,
   'reward': -1.0},
  {'state': array([-0.48044688, -0.00133264], dtype=float32),
   'action': -1,
   'reward': -1.0},
  {'state': array([-0.48310226, -0.00265538], dtype=float32),
   'action': -1,
   'reward': -1.0},
  {'state': array([-0.4870606 , -0.00395836], dtype=float32),
   'action': -1,
   'reward': -1.0},
  {'state': array([-0.49229246, -0.00523185], dtype=float32),
   'action': -1,
   'reward': -1.0},
  {'state': array([-0.49875876, -0.0064663 ], dtype=float32),
   'action': -1,
   'reward': -1.0},
  {'state': array([-0.5064112 , -0.00765243], dtype=float32),
   'action': -1,
   'reward': -1.0},
  {'state': array([-0.51519245, -0.00878128], dtype=float32),
   'action': -1,
   'reward': -1.0},
  {'state': array([-0.52503675, -0.00984432], dtype=float32),
   'action': -1,
   'reward': -1.0},
  {'state': array([-0.5358703 , -0.01083353], dtype=float32),
   'action': -1,
   'reward': -1.0},
  {'state'

# Tile Coding

In [None]:
basehash = hash


class IHT:
    """Index hash table: hands out consecutive indices until it is full."""

    def __init__(self, sizeval):
        self.size = sizeval
        self.overfullCount = 0
        self.dictionary = {}

    def __str__(self):
        """Return a one-line summary of size, overflow count and item count."""
        return (
            f"Collision table: size:{self.size!s}"
            f" overfullCount:{self.overfullCount!s}"
            f" dictionary:{len(self.dictionary)!s} items"
        )

    def count(self):
        """Return the number of objects that have been assigned an index."""
        return len(self.dictionary)

    def fullp(self):
        """Return True once the table holds at least `size` objects."""
        return self.size <= len(self.dictionary)

    def getindex(self, obj, readonly=False):
        """
        Return the index for obj, assigning the next free one if needed.

        With readonly=True an unknown obj yields None and nothing is stored.
        Once the table is full, unknown objects are not stored; their index
        is basehash(obj) modulo the table size, so collisions are possible.
        """
        table = self.dictionary
        if obj in table:
            return table[obj]
        if readonly:
            return None

        used = self.count()
        if used < self.size:
            # Still room: the next free index equals the current item count
            table[obj] = used
            return used

        # Table is full: announce it the first time, then fall back to hashing
        if self.overfullCount == 0:
            print('IHT full, starting to allow collisions')
        self.overfullCount += 1
        return basehash(obj) % self.size

def hashcoords(coordinates, m, readonly=False):
    """
    Map tile coordinates to an index according to the kind of m.

    m is None: return the coordinates unchanged.
    m is an IHT: look the coordinates up in (or add them to) the table.
    m is an int: hash the coordinates into the range [0, m).

    Raises TypeError for any other m instead of silently returning None.
    """
    if m is None:
        return coordinates
    if isinstance(m, IHT):
        return m.getindex(tuple(coordinates), readonly)
    if isinstance(m, int):
        return basehash(tuple(coordinates)) % m
    raise TypeError(
        "m must be an IHT, an int size, or None, not " + type(m).__name__
    )

from math import floor, log
from itertools import zip_longest

def tiles(ihtORsize, numtilings, floats, ints=(), readonly=False):
    """
    Return numtilings tile indices corresponding to the floats and ints.

    ihtORsize: an IHT, an int table size, or None (forwarded to hashcoords).
    numtilings: number of tilings; one index is produced per tiling.
    floats: continuous features.
    ints: extra integer features appended to every tile's coordinates.
    readonly: forwarded to hashcoords.
    """
    qfloats = [floor(f * numtilings) for f in floats]
    # Materialise once so any iterable (even a one-shot one) serves every tiling
    int_coords = tuple(ints)

    indices = []
    for tiling in range(numtilings):
        # Offset starts at `tiling` and grows by 2 * tiling per float dimension
        offset = tiling
        coords = [tiling]
        for q in qfloats:
            coords.append((q + offset) // numtilings)
            offset += 2 * tiling
        coords.extend(int_coords)
        indices.append(hashcoords(coords, ihtORsize, readonly))
    return indices

def tileswrap(ihtORsize, numtilings, floats, wrapwidths, ints=(), readonly=False):
    """
    Return numtilings tile indices for the floats and ints, wrapping some floats.

    ihtORsize: an IHT, an int table size, or None (forwarded to hashcoords).
    numtilings: number of tilings; one index is produced per tiling.
    floats: continuous features.
    wrapwidths: per-float wrap width; a falsy entry (or a missing trailing
        entry) leaves that float unwrapped.
    ints: extra integer features appended to every tile's coordinates.
    readonly: forwarded to hashcoords.
    """
    qfloats = [floor(f * numtilings) for f in floats]
    # Materialise once so any iterable (even a one-shot one) serves every tiling
    int_coords = tuple(ints)

    indices = []
    for tiling in range(numtilings):
        # Offset starts at `tiling` and grows by 2 * tiling per float dimension
        offset = tiling
        coords = [tiling]
        # zip_longest pads a short wrapwidths with None, meaning "no wrap"
        for q, width in zip_longest(qfloats, wrapwidths):
            c = (q + offset % numtilings) // numtilings
            coords.append(c % width if width else c)
            offset += 2 * tiling
        coords.extend(int_coords)
        indices.append(hashcoords(coords, ihtORsize, readonly))
    return indices