# Tiling for RL Control

In [1]:
import abc
import copy
import functools
import itertools
import math
import random
from typing import Any, Optional, Sequence


In [2]:
import gym_electric_motor as gem
import gymnasium as gym
import numpy as np
from gym_electric_motor import reward_functions
from scipy import linalg
from sklearn import mixture


In [3]:
from rlplg.environments import gridworld, redgreen

## Tiling

In [4]:
"""
Tile Coding Software version 3.0beta
by Rich Sutton
based on a program created by Steph Schaeffer and others
External documentation and recommendations on the use of this code is available in the 
reinforcement learning textbook by Sutton and Barto, and on the web.
These need to be understood before this code is.

This software is for Python 3 or more.

This is an implementation of grid-style tile codings, based originally on
the UNH CMAC code (see http://www.ece.unh.edu/robots/cmac.htm), but by now highly changed. 
Here we provide a function, "tiles", that maps floating and integer
variables to a list of tiles, and a second function "tiles-wrap" that does the same while
wrapping some floats to provided widths (the lower wrap value is always 0).

The float variables will be gridded at unit intervals, so generalization
will be by approximately 1 in each direction, and any scaling will have 
to be done externally before calling tiles.

Num-tilings should be a power of 2, e.g., 16. To make the offsetting work properly, it should
also be greater than or equal to four times the number of floats.

The first argument is either an index hash table of a given size (created by (make-iht size)), 
an integer "size" (range of the indices from 0), or nil (for testing, indicating that the tile 
coordinates are to be returned without being converted to indices).
"""

basehash = hash


class IHT:
    """Index hash table that hands out dense indices and tolerates overflow.

    Each previously unseen key receives the next free integer index until
    ``size`` keys are stored. After that, new keys are mapped through
    ``basehash`` modulo ``size`` and may collide with existing indices.
    """

    def __init__(self, sizeval):
        self.size = sizeval
        self.overfullCount = 0
        self.dictionary = {}

    def __str__(self):
        """Summarise capacity, overflow count and number of stored keys."""
        return (
            f"Collision table: size:{self.size}"
            f" overfullCount:{self.overfullCount}"
            f" dictionary:{len(self.dictionary)} items"
        )

    def count(self):
        """Return how many keys currently own an index."""
        return len(self.dictionary)

    def fullp(self):
        """Return True once the table holds at least ``size`` keys."""
        return len(self.dictionary) >= self.size

    def getindex(self, obj, readonly=False):
        """Return the index for ``obj``, allocating one unless ``readonly``.

        Unknown keys yield ``None`` in read-only mode. When the table is
        full, the index is derived from the key's hash instead of being
        stored, and ``overfullCount`` is incremented.
        """
        table = self.dictionary
        known = table.get(obj)
        if known is not None:
            return known
        if readonly:
            return None

        used = self.count()
        if used < self.size:
            table[obj] = used
            return used

        # TODO: Fail
        if self.overfullCount == 0:
            print('IHT full, starting to allow collisions')
        self.overfullCount += 1
        return basehash(obj) % self.size

def hashcoords(coordinates, m, readonly=False):
    """Map tile coordinates to an index according to the kind of ``m``.

    * ``None``: the coordinates are returned unchanged.
    * ``IHT``: the index is looked up (or allocated) in the table.
    * ``int``: the coordinates are hashed into ``range(m)``.

    Any other ``m`` results in ``None``.
    """
    if m is None:
        return coordinates
    if isinstance(m, IHT):
        return m.getindex(tuple(coordinates), readonly)
    if isinstance(m, int):
        return basehash(tuple(coordinates)) % m
    return None


def tiles(ihtORsize, numtilings, floats, ints=(), readonly=False):
    """Return ``numtilings`` tile indices for the given floats and ints.

    Args:
        ihtORsize: An ``IHT``, an integer size, or ``None``; forwarded to
            ``hashcoords`` to turn coordinates into indices.
        numtilings: Number of tilings to generate.
        floats: Real-valued inputs; each is quantised to
            ``floor(f * numtilings)`` before being offset per tiling.
        ints: Extra integer coordinates appended to every tile. Defaults to
            an immutable empty tuple rather than a shared mutable list.
        readonly: Forwarded to ``hashcoords``.

    Returns:
        A list with one entry per tiling, as produced by ``hashcoords``.
    """
    qfloats = [math.floor(f * numtilings) for f in floats]
    indices = []
    for tiling in range(numtilings):
        coords = [tiling]
        # The offset starts at `tiling` and grows by 2 * tiling for every
        # further float dimension.
        offset = tiling
        for q in qfloats:
            coords.append((q + offset) // numtilings)
            offset += 2 * tiling
        coords.extend(ints)
        indices.append(hashcoords(coords, ihtORsize, readonly))
    return indices

def tileswrap(ihtORsize, numtilings, floats, wrapwidths, ints=(), readonly=False):
    """Return ``numtilings`` tile indices, wrapping some float coordinates.

    Args:
        ihtORsize: An ``IHT``, an integer size, or ``None``; forwarded to
            ``hashcoords`` to turn coordinates into indices.
        numtilings: Number of tilings to generate.
        floats: Real-valued inputs; each is quantised to
            ``floor(f * numtilings)`` before being offset per tiling.
        wrapwidths: Per-float wrap widths. A falsy width (``0``, ``None`` or
            a missing trailing entry) leaves that coordinate unwrapped.
        ints: Extra integer coordinates appended to every tile. Defaults to
            an immutable empty tuple rather than a shared mutable list.
        readonly: Forwarded to ``hashcoords``.

    Returns:
        A list with one entry per tiling, as produced by ``hashcoords``.
    """
    qfloats = [math.floor(f * numtilings) for f in floats]
    indices = []
    for tiling in range(numtilings):
        coords = [tiling]
        offset = tiling
        # zip_longest pads missing widths with None, i.e. "do not wrap".
        for q, width in itertools.zip_longest(qfloats, wrapwidths):
            c = (q + offset % numtilings) // numtilings
            coords.append(c % width if width else c)
            offset += 2 * tiling
        coords.extend(ints)
        indices.append(hashcoords(coords, ihtORsize, readonly))
    return indices


In [5]:
def pow2geq(lb):
    """Return the first power of two, starting from 2**1, that is >= ``lb``."""
    exponent = 1
    candidate = np.power(2, exponent)
    while candidate < lb:
        exponent += 1
        candidate = np.power(2, exponent)
    return candidate

In [6]:
def solve_least_squares(
    matrix: np.ndarray, rhs: np.ndarray
) -> np.ndarray:
    """Solve ``matrix @ x = rhs`` in the least-squares sense.

    Uses ``scipy.linalg.lstsq`` with the ``gelsy`` LAPACK driver and returns
    only the solution vector.

    Raises:
        ValueError: If the underlying solver raises ``LinAlgError``.
    """
    try:
        result = linalg.lstsq(a=matrix, b=rhs, lapack_driver="gelsy")
    except linalg.LinAlgError as err:
        # The computation failed, likely because the matrix is unsuitable
        # (no solution).
        raise ValueError("Failed to solve linear system") from err
    return result[0]  # type: ignore

In [7]:
def rmse(v_pred: np.ndarray, v_true: np.ndarray, axis: int):
    """Return the root-mean-square error between two arrays along ``axis``.

    Raises:
        ValueError: If the two inputs do not have the same shape.
    """
    pred_shape, true_shape = np.shape(v_pred), np.shape(v_true)
    if pred_shape != true_shape:
        raise ValueError(
            f"Tensors have different shapes: {pred_shape} != {true_shape}"
        )
    squared_error = np.power(v_pred - v_true, 2.0)
    mean_squared_error = np.sum(squared_error, axis=axis) / pred_shape[axis]
    return np.sqrt(mean_squared_error)

In [8]:
class Tiles:
    """Tile-code an (observation, action) pair into a binary feature vector.

    Observations are rescaled per dimension with ``obs_min``/``obs_max``,
    stretched to ``tiling_dim`` tiles and passed to ``tileswrap`` together
    with the action; the returned indices are set to 1 in the output.
    """

    def __init__(
        self, 
        obs_min: np.ndarray, 
        obs_max: np.ndarray,
        num_actions: Sequence[int],
        tiling_dim: int, num_tilings: Optional[int] = None
    ):
        """Set up the index hash table and the output dimension.

        Args:
            obs_min: Lower bound of every observation dimension.
            obs_max: Upper bound of every observation dimension.
            num_actions: Added to the flat tile count to obtain ``dim``.
            tiling_dim: Number of tiles per observation dimension; also used
                as the wrap width of every dimension.
            num_tilings: Number of tilings. When omitted, the first power of
                two that is at least four times the observation size.
        """
        assert isinstance(obs_min, np.ndarray)
        assert isinstance(obs_max, np.ndarray)
        self.obs_min = obs_min
        self.obs_max = obs_max
        self.tiling_dim = tiling_dim
        self.wrapwidths = [tiling_dim] * np.size(obs_min)
        self.num_actions = num_actions

        # The number of tilings should be a power of 2 and at least
        # 4 times greater than the number of dimensions.
        self.num_tilings = num_tilings or pow2geq(np.size(obs_min) * 4)
        self.max_size = (tiling_dim ** np.size(obs_min)) * self.num_tilings
        print("Num tilings", self.num_tilings, "\n", "Flat dim:", self.max_size)
        self.iht = IHT(self.max_size)
        # NOTE(review): this addition needs a numeric `num_actions`, although
        # the parameter is annotated as Sequence[int] -- confirm the intended
        # type.
        self.dim = self.max_size + self.num_actions

    def __call__(self, obs, action):
        """Return a length-``dim`` vector with ones at the active tiles.

        ``action`` is appended to the tile coordinates unless it is ``None``.
        """
        obs_scaled_01 = (obs - self.obs_min) / (self.obs_max - self.obs_min)
        features = np.zeros(shape=self.dim)
        active = tileswrap(
            self.iht,
            numtilings=self.num_tilings,
            floats=obs_scaled_01 * self.tiling_dim,
            wrapwidths=self.wrapwidths,
            # Compare against None explicitly: action 0 is a valid action
            # and must not be dropped by a truthiness test.
            ints=[action] if action is not None else [],
        )
        features[active] = 1
        return features


## Utility Functions

In [11]:
def collection_traj_data(env: gym.Env, steps: int):
    """Collect ``steps`` transitions by sampling actions from the action space.

    The environment is reset once up front and again whenever a step reports
    termination or truncation.

    Returns:
        A list of ``(obs, action, next_obs, reward)`` tuples.
    """
    transitions = []
    obs, _ = env.reset()
    for _step in range(steps):
        action = env.action_space.sample()
        next_obs, rew, term, trunc, _ = env.step(action)
        transitions.append((obs, action, next_obs, rew))
        if term or trunc:
            obs, _ = env.reset()
        else:
            obs = next_obs
    return transitions

## Feature transformation

In [12]:
def hashtrick(xs, dim: int):
    """Fold the positions where ``xs == 1`` into ``dim`` integer buckets.

    Every active position ``i`` increments bucket ``i % dim``; the result is
    an ``int32`` count vector of length ``dim``.
    """
    buckets = np.zeros(dim, dtype=np.int32)
    (active,) = np.where(xs == 1)
    for bucket in active % dim:
        buckets[bucket] += 1
    return buckets

In [13]:
class FeatTransform(abc.ABC):
    """Interface for mapping an (observation, action) pair to a feature vector."""

    def __init__(self):
        """Nothing to initialise at the interface level."""

    @abc.abstractmethod
    def transform(self, obs: Any, action: Any):
        """Return the feature representation of ``obs`` paired with ``action``."""

    @property
    @abc.abstractmethod
    def output_shape(self) -> int:
        """Length of the vectors produced by ``transform``."""


### Tile Transform

In [14]:
class TileTransform(FeatTransform):
    """Tile-coded binary features for a Box observation and Discrete action.

    Observations are rescaled with the bounds of the observation space,
    stretched to ``tiling_dim`` tiles per dimension and tile-coded with
    wrapping. The action is added as an integer coordinate, so each action
    activates its own set of tiles. Optionally the result is folded into
    ``hash_dim`` buckets with ``hashtrick``.
    """

    def __init__(self, env: gym.Env, tiling_dim: int, num_tilings: Optional[int] = None, hash_dim: Optional[int] = None):
        """Validate the spaces and size the index hash table.

        Args:
            env: Environment with a ``Box`` observation space and a
                ``Discrete`` action space.
            tiling_dim: Number of tiles per observation dimension; also used
                as the wrap width of every dimension.
            num_tilings: Number of tilings. When omitted, the first power of
                two that is at least four times the observation size.
            hash_dim: If set, length of the hashed output vector.

        Raises:
            ValueError: If either space has an unsupported type.
        """
        if not isinstance(env.observation_space, gym.spaces.Box):
            raise ValueError("env.observation_space must be `spaces.Box`")
        if not isinstance(env.action_space, gym.spaces.Discrete):
            raise ValueError("env.action_space must be `spaces.Discrete`")

        self.obs_space = env.observation_space
        self.tiling_dim = tiling_dim
        self.wrapwidths = [tiling_dim] * np.size(self.obs_space.low)
        self.num_actions = env.action_space.n

        # The number of tilings should be a power of 2 and at least
        # 4 times greater than the number of dimensions.
        self.num_tilings = num_tilings or pow2geq(np.size(self.obs_space.low) * 4)
        # One slot for every (tiling, wrapped tile, action) combination.
        self.max_size = (tiling_dim ** np.size(self.obs_space.low)) * self.num_tilings * self.num_actions
        print("Num tilings", self.num_tilings, "\n", "Flat dim:", self.max_size)
        self.iht = IHT(self.max_size)
        self.hash_dim = hash_dim

    def transform(self, obs: Any, action: Any):
        """Return the binary (or hashed count) feature vector for the pair.

        ``action`` is appended to the tile coordinates unless it is ``None``.
        """
        low, high = self.obs_space.low, self.obs_space.high
        obs_scaled_01 = (obs - low) / (high - low)
        features = np.zeros(shape=self.max_size)
        active = tileswrap(
            self.iht,
            numtilings=self.num_tilings,
            floats=obs_scaled_01 * self.tiling_dim,
            wrapwidths=self.wrapwidths,
            # Compare against None explicitly: action 0 is a valid action
            # and must not be dropped by a truthiness test.
            ints=[action] if action is not None else [],
        )
        features[active] = 1
        if self.hash_dim:
            return hashtrick(features, self.hash_dim)
        return features

    @property
    def output_shape(self) -> int:
        """Length of the vectors returned by ``transform``."""
        return self.hash_dim or self.max_size

In [15]:
# Demo: tile-code the initial MountainCar observation once per action.
env = gym.make("MountainCar-v0", max_episode_steps=10000)
transform = TileTransform(env, tiling_dim=8)
obs, _ = env.reset()
for demo_action in range(3):
    print(transform.transform(obs, demo_action))

Num tilings 8 
 Flat dim: 1536
[1. 1. 1. ... 0. 0. 0.]
[0. 0. 0. ... 0. 0. 0.]
[0. 0. 0. ... 0. 0. 0.]


### Scale Transform

In [16]:
class ScaleObsOheActTransform(FeatTransform):
    """Min-max scaled observation placed in the slot of the chosen action.

    The output has one block of ``obs_dim`` entries per action; only the
    block of the given action is filled, all others stay zero.
    """

    def __init__(self, env: gym.Env, hash_dim: Optional[int] = None):
        """Validate the spaces and record their sizes.

        Args:
            env: Environment with a ``Box`` observation space and a
                ``Discrete`` action space.
            hash_dim: Stored on the instance; ``transform`` does not use it.

        Raises:
            ValueError: If either space has an unsupported type.
        """
        if not isinstance(env.observation_space, gym.spaces.Box):
            raise ValueError("env.observation_space must be `spaces.Box`")
        if not isinstance(env.action_space, gym.spaces.Discrete):
            raise ValueError("env.action_space must be `spaces.Discrete`")

        self.obs_space = env.observation_space
        self.num_actions = env.action_space.n
        self.hash_dim = hash_dim
        self.obs_dim = np.size(self.obs_space.high)

    def transform(self, obs: Any, action: Any):
        """Return the scaled observation written into the action's block."""
        low, high = self.obs_space.low, self.obs_space.high
        obs_scaled_01 = (obs - low) / (high - low)
        output = np.zeros(shape=self.obs_dim * self.num_actions)
        start = self.obs_dim * action
        output[start:start + self.obs_dim] = obs_scaled_01
        return output

    @property
    def output_shape(self) -> int:
        """Length of the vectors returned by ``transform``."""
        # transform() never hashes its output, so reporting `hash_dim` here
        # would disagree with the actual vector length.
        return self.obs_dim * self.num_actions

In [17]:
# Demo: scaled one-hot-by-action features of the initial MountainCar state.
env = gym.make("MountainCar-v0", max_episode_steps=10000)
transform = ScaleObsOheActTransform(env)
obs, _ = env.reset()
for demo_action in range(3):
    print(transform.transform(obs, demo_action))

[0.38051811 0.5        0.         0.         0.         0.        ]
[0.         0.         0.38051811 0.5        0.         0.        ]
[0.         0.         0.         0.         0.38051811 0.5       ]


### Gaussian Mix Transform

In [18]:
class GaussianMixObsOheActTransform(FeatTransform):
    """Gaussian-mixture responsibilities placed in the slot of the action.

    A ``GaussianMixture`` is fitted on observations gathered from random
    interaction with the environment. ``transform`` writes the component
    membership probabilities of an observation into the block that belongs
    to the given action.
    """

    def __init__(self, env: gym.Env, params, sample_steps: int = 100_000):
        """Fit the mixture model on sampled observations.

        Args:
            env: Environment with a ``Box`` observation space and a
                ``Discrete`` action space.
            params: Keyword arguments passed to ``mixture.GaussianMixture``.
            sample_steps: Number of random steps collected for fitting.

        Raises:
            ValueError: If either space has an unsupported type.
        """
        if not isinstance(env.observation_space, gym.spaces.Box):
            raise ValueError("env.observation_space must be `spaces.Box`")
        if not isinstance(env.action_space, gym.spaces.Discrete):
            raise ValueError("env.action_space must be `spaces.Discrete`")

        self.obs_space = env.observation_space
        self.num_actions = env.action_space.n
        self._gm = mixture.GaussianMixture(**params)
        # Each collected transition is (obs, action, next_obs, reward); only
        # the leading observation is needed. Index it rather than unpacking a
        # fixed number of fields.
        self._gm.fit(
            [step[0] for step in collection_traj_data(env, steps=sample_steps)]
        )
        self.obs_dim = self._gm.n_components

    def transform(self, obs: Any, action: Any):
        """Return the component probabilities written into the action's block."""
        output = np.zeros(shape=self.obs_dim * self.num_actions)
        start = self.obs_dim * action
        output[start:start + self.obs_dim] = self._gm.predict_proba([obs])[0]
        return output

    @property
    def output_shape(self) -> int:
        """Length of the vectors returned by ``transform``."""
        return self.obs_dim * self.num_actions

In [19]:
# Demo: Gaussian-mixture features of the initial MountainCar state.
env = gym.make("MountainCar-v0", max_episode_steps=10000)
transform = GaussianMixObsOheActTransform(
    env,
    params={"n_components": 4, "covariance_type": "tied"},
)
obs, _ = env.reset()
for demo_action in range(3):
    print(transform.transform(obs, demo_action))

[2.45730030e-03 3.21516286e-01 6.75638843e-01 3.87570427e-04
 0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00
 0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00]
[0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00
 2.45730030e-03 3.21516286e-01 6.75638843e-01 3.87570427e-04
 0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00]
[0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00
 0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00
 2.45730030e-03 3.21516286e-01 6.75638843e-01 3.87570427e-04]


### Random Binary Transform

In [20]:
class RndBinaryTransform(FeatTransform):
    """Fixed random binary code per state, placed in the slot of the action.

    For every state, ``enc_size // 2`` positions in ``range(enc_size)`` are
    drawn at random (with replacement, so fewer distinct positions may be
    active). ``transform`` sets those positions to 1 inside the block of the
    given action.
    """

    def __init__(self, env: gym.Env, enc_size: int):
        """Draw the random code of every state.

        Args:
            env: Environment with a ``Discrete`` action space and a
                ``Discrete`` or single-element ``Box`` observation space.
            enc_size: Length of the per-action code block.

        Raises:
            ValueError: If a space has an unsupported type, or if the Box
                spans more than one element (raised by ``.item()``).
        """
        if not isinstance(env.action_space, gym.spaces.Discrete):
            raise ValueError("env.action_space must be `spaces.Discrete`")
        self.obs_space = env.observation_space
        if isinstance(self.obs_space, gym.spaces.Discrete):
            self.num_states = self.obs_space.n
        elif isinstance(self.obs_space, gym.spaces.Box):
            # The span was previously computed and thrown away, leaving
            # `num_states` undefined for Box spaces.
            # Assumes `high` is an exclusive upper bound on integer states --
            # TODO confirm against the wrappers that build such Box spaces.
            self.num_states = int((self.obs_space.high - self.obs_space.low).item())
        else:
            raise ValueError("Unsupport observation_space. Must be Box[1d] or Discrete")
        self.num_actions = env.action_space.n
        self.enc_size = enc_size
        codes = np.random.randint(
            0,
            high=self.enc_size,
            size=(self.num_states, self.enc_size // 2),
        )
        self._idx = [np.array(row) for row in codes]

    def transform(self, obs: Any, action: Any):
        """Return the state's code written into the action's block."""
        output = np.zeros(shape=self.enc_size * self.num_actions)
        offset = self.enc_size * action
        output[self._idx[obs] + offset] = 1
        return output

    @property
    def output_shape(self) -> int:
        """Length of the vectors returned by ``transform``."""
        return self.enc_size * self.num_actions

In [21]:
# Demo: random binary features of the initial CliffWalking state.
env = gym.make("CliffWalking-v0", max_episode_steps=10000)
transform = RndBinaryTransform(env, enc_size=6)
obs, _ = env.reset()
for demo_action in range(3):
    print(transform.transform(obs, demo_action))

[0. 1. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0.]


## Control with SARSA

In [22]:
def action_values(obs, actions: Sequence[int], weights, feat_transform: FeatTransform):
    """Evaluate the linear action-value estimate of every action in ``obs``.

    Returns:
        A pair ``(values, features)``: ``features`` holds one transformed row
        per action and ``values`` is its dot product with ``weights``.
    """
    features = np.zeros(shape=(len(actions), feat_transform.output_shape))
    for row, action in enumerate(actions):
        features[row, :] = feat_transform.transform(obs, action)
    values = np.dot(features, weights)
    return values, features

def _epsilon_greedy_action(env, qvalues, epsilon: float):
    """Sample a random action with probability ``epsilon``, else a greedy one.

    Ties between maximal action values are broken uniformly at random.
    """
    if random.random() <= epsilon:
        return env.action_space.sample()
    return np.random.choice(np.flatnonzero(qvalues == qvalues.max()))


def semi_gradient_sarsa(env, alpha: float, gamma: float, epsilon: float, num_episodes: int, feat_transform: FeatTransform):
    """Learn linear action-value weights with episodic semi-gradient SARSA.

    Args:
        env: Environment with a discrete action space (``action_space.n``).
        alpha: Step size of the weight update.
        gamma: Discount factor applied to the next action value.
        epsilon: Probability of taking a random action.
        num_episodes: Number of training episodes.
        feat_transform: Maps (observation, action) to a feature vector; the
            features double as the gradient of the linear estimate.

    Returns:
        The learned weight vector.
    """
    actions = tuple(range(env.action_space.n))
    weights = np.zeros(feat_transform.output_shape, dtype=np.float64)
    returns = []
    # Report progress five times per run. Clamp to 1 so that runs with fewer
    # than five episodes do not hit a modulo-by-zero.
    report_every = max(1, num_episodes // 5)

    for i in range(num_episodes):
        obs, _ = env.reset()
        state_qvalues, gradients = action_values(obs, actions, weights, feat_transform)
        rewards = 0
        action = _epsilon_greedy_action(env, state_qvalues, epsilon)

        while True:
            next_obs, reward, term, trunc, _ = env.step(action)
            rewards += reward

            if term or trunc:
                # Final transition: no bootstrap term. Truncation is handled
                # the same way as termination here.
                weights = weights + alpha * (reward - state_qvalues[action]) * gradients[action]
                break

            next_state_qvalues, next_gradients = action_values(next_obs, actions, weights, feat_transform)
            next_action = _epsilon_greedy_action(env, next_state_qvalues, epsilon)

            td_error = reward + gamma * next_state_qvalues[next_action] - state_qvalues[action]
            weights = weights + alpha * td_error * gradients[action]

            action = next_action
            state_qvalues = next_state_qvalues
            gradients = next_gradients
        returns.append(rewards)
        if (i + 1) % report_every == 0:
            print("Episode", i+1, "mean returns:", np.mean(returns))
    return weights


def play(env, weights, num_episodes: int, feat_transform):
    """Run greedy episodes with the given weights and return the mean return.

    In every step the action with the highest estimated value is taken, ties
    being broken uniformly at random. An episode ends when the environment
    reports termination or truncation.
    """
    actions = tuple(range(env.action_space.n))
    returns = []
    for _episode in range(num_episodes):
        obs, _ = env.reset()
        episode_return = 0
        done = False
        while not done:
            qvalues, _ = action_values(obs, actions, weights, feat_transform)
            best_actions = np.flatnonzero(qvalues == qvalues.max())
            action = np.random.choice(best_actions)
            obs, reward, term, trunc, _ = env.step(action)
            episode_return += reward
            done = term or trunc
        returns.append(episode_return)
    return np.mean(returns)

### Grid World

In [23]:
class GWObsWrapper(gym.ObservationWrapper):
    """Expose only the ``"agent"`` entry of the observation as an array."""

    def __init__(self, env):
        super().__init__(env)
        agent_space = env.observation_space["agent"]
        # Upper bounds are the sizes of the two agent sub-spaces.
        upper = np.array([agent_space[0].n, agent_space[1].n])
        lower = np.zeros(shape=2, dtype=np.int32)
        self.observation_space = gym.spaces.Box(low=lower, high=upper)

    def observation(self, obs):
        """Return ``obs["agent"]`` converted to a numpy array."""
        agent = obs["agent"]
        return np.array(agent)


# Grid world experiment: train with tile features folded into 512 buckets,
# then evaluate greedily with an episode length cap.
size, cliffs, exits, start = gridworld.parse_grid_from_text(
    [
        "oooooooooooo",
        "oooooooooooo",
        "oooooooooooo",
        "sxxxxxxxxxxg",
    ]
)
env = GWObsWrapper(gridworld.GridWorld(size, cliffs, exits, start))
tile_transform = TileTransform(env=env, tiling_dim=8, hash_dim=512)
weights = semi_gradient_sarsa(
    env=env,
    alpha=0.01,
    gamma=1.0,
    epsilon=0.1,
    num_episodes=2000,
    feat_transform=tile_transform,
)
play(
    env=gym.wrappers.TimeLimit(env, max_episode_steps=1000),
    weights=weights,
    num_episodes=20,
    feat_transform=tile_transform,
)

Num tilings 8 
 Flat dim: 2048
Episode 400 mean returns: -82.5475
Episode 800 mean returns: -53.83875
Episode 1200 mean returns: -43.365
Episode 1600 mean returns: -38.108125
Episode 2000 mean returns: -35.111


-15.0

### RedGreen

In [24]:
class RGObsWrapper(gym.ObservationWrapper):
    """Expose only the ``"pos"`` entry of the observation."""

    def __init__(self, env):
        super().__init__(env)
        # Upper bound is the size of the wrapped "pos" sub-space.
        num_positions = env.observation_space["pos"].n
        self.observation_space = gym.spaces.Box(
            low=np.array([0]),
            high=np.array([num_positions]),
        )

    def observation(self, obs):
        """Return ``obs["pos"]`` unchanged."""
        position = obs["pos"]
        return position

# RedGreen experiment: train with tile features, then evaluate greedily.
env = RGObsWrapper(
    redgreen.RedGreenSeq(["red", "green", "red", "green", "wait", "green"])
)
tile_transform = TileTransform(env=env, tiling_dim=4)
weights = semi_gradient_sarsa(
    env=env,
    alpha=0.01,
    gamma=1.0,
    epsilon=0.2,
    num_episodes=1000,
    feat_transform=tile_transform,
)
play(env=env, weights=weights, num_episodes=20, feat_transform=tile_transform)

Num tilings 4 
 Flat dim: 48
Episode 200 mean returns: -10.31
Episode 400 mean returns: -8.5675
Episode 600 mean returns: -8.011666666666667
Episode 800 mean returns: -7.72375
Episode 1000 mean returns: -7.557


-6.0

### Mountain Car

In [25]:
# Scratch check: repeat a feature vector once per action and append the
# action index as an extra column.
xs = np.array([1, 2, 3, 4])
# The target shape is passed positionally; the `newshape=` keyword of
# np.reshape is deprecated in recent NumPy releases.
xs_batch = np.reshape(np.tile(xs, reps=3), (-1, xs.size))
actions = np.array(range(3))
np.hstack((xs_batch, np.expand_dims(actions, -1)))

array([[1, 2, 3, 4, 0],
       [1, 2, 3, 4, 1],
       [1, 2, 3, 4, 2]])

In [26]:
# Mountain Car experiment: train with tile features, then evaluate greedily
# with a shorter episode length cap.
env = gym.make("MountainCar-v0", max_episode_steps=10000)
tile_transform = TileTransform(env=env, tiling_dim=4)
weights = semi_gradient_sarsa(
    env=env,
    alpha=0.01,
    gamma=1.0,
    epsilon=0.2,
    num_episodes=1000,
    feat_transform=tile_transform,
)
play(
    env=gym.wrappers.TimeLimit(env, max_episode_steps=1000),
    weights=weights,
    num_episodes=20,
    feat_transform=tile_transform,
)

Num tilings 8 
 Flat dim: 384
Episode 200 mean returns: -225.19
Episode 400 mean returns: -190.1525
Episode 600 mean returns: -178.24
Episode 800 mean returns: -172.3725
Episode 1000 mean returns: -169.316


-147.4

### GEM

In [27]:
class StrictWeightedSumOfErrors(reward_functions.WeightedSumOfErrors):
    """Weighted-sum-of-errors reward with fixed gamma, power and bias.

    The parent is always configured with ``gamma=1.0``, ``reward_power=1``
    and ``bias=0``. The reward adds the violation reward scaled by the
    violation degree to the weighted error term.
    """

    def __init__(
        self, reward_weights=None, normed_reward_weights=False, violation_reward=None
    ):
        super().__init__(
            reward_weights,
            normed_reward_weights,
            violation_reward,
            gamma=1.0,
            reward_power=1,
            bias=0,
        )

    def reward(self, state, reference, k=None, action=None, violation_degree=0.0):
        """Return the error reward plus the scaled violation reward.

        ``k`` and ``action`` are accepted but ignored.
        """
        del k, action
        tracking_reward = self._wse_reward(state, reference)
        violation_term = violation_degree * self._violation_reward
        return tracking_reward + violation_term


In [28]:
class GEMObsWrapper(gym.ObservationWrapper):
    """Turn a (state, reference) observation into error features plus a violation flag.

    The wrapped observation holds, for every referenced state, the normalised
    absolute difference between the state and the previously seen reference
    (raised to an exponent and shifted by a bias, both read from the reward
    function), followed by the value returned by the constraint monitor.
    """

    def __init__(self, env):
        super().__init__(env)
        # Selector of the state entries that have a reference value.
        self._mask = getattr(env.reference_generator, "referenced_states")
        # The wrapped observation space is a pair: (state space, reference space).
        state_obs_space, ref_state_obs_space = env.observation_space

        # Reward-function internals, restricted to the referenced states.
        # `_weights` is stored but not read anywhere else in this class.
        self._weights = getattr(env.reward_function, "_reward_weights")[self._mask]
        self._expo = getattr(env.reward_function, "_n")[self._mask]
        self._bias = getattr(env.reward_function, "_bias")
        # Width of each referenced state's range, used to normalise the error.
        self._denom = (state_obs_space.high - state_obs_space.low)[self._mask]
        # Reference seen in the previous call to `observation`; None until then.
        self._prev_ref_state = None  # np.zeros_like(state_obs_space.high[self._mask])

        # Absolute differences between every combination of state bound and
        # reference bound; their element-wise min/max give the new bounds.
        bounds = [
            np.abs(state_obs_space.high[self._mask] - ref_state_obs_space.low),
            np.abs(state_obs_space.high[self._mask] - ref_state_obs_space.high),
            np.abs(state_obs_space.low[self._mask] - ref_state_obs_space.high),
            np.abs(state_obs_space.low[self._mask] - ref_state_obs_space.low),
        ]
        obs_space_low = np.concatenate(
            [
                functools.reduce(np.minimum, bounds),
                # constraint violation
                np.array([0.0]),
            ]
        )
        obs_space_high = np.concatenate(
            [
                functools.reduce(np.maximum, bounds),
                # constraint violation
                np.array([1.0]),
            ]
        )
        self.observation_space = gym.spaces.Box(
            low=obs_space_low, high=obs_space_high, dtype=state_obs_space.dtype
        )
        # Callable used to obtain the constraint-violation value of a state.
        self._cvfn = getattr(self.env.constraint_monitor, "check_constraints")

    def observation(self, observation):
        """Return the error features of ``observation`` plus the violation value.

        The state is compared with the reference stored by the previous call;
        on the very first call the current reference is used instead.
        """
        # NOTE(review): `_prev_ref_state` is never cleared in this class, so
        # the first observation after a reset appears to be compared with the
        # reference from the previous episode -- confirm this is intended.
        prev_ref_state = copy.copy(self._prev_ref_state)
        next_state, ref_state = observation
        # The constraint check runs on the full state, before masking.
        cv = self._cvfn(next_state)
        next_state = next_state[self._mask]

        if prev_ref_state is None:
            prev_ref_state = ref_state

        wrapped_next_state = np.concatenate(
            [
                # Normalised absolute error, raised to the exponent, then
                # shifted by the bias.
                (abs(next_state - prev_ref_state) / self._denom) ** self._expo
                + self._bias,
                np.array([cv]),
            ]
        )
        # Remember the current reference for the next call.
        self._prev_ref_state = ref_state
        return wrapped_next_state



In [29]:
# GEM experiment: train on the wrapped PMSM current-control environment
# with scaled observation features, then evaluate greedily.
rf = StrictWeightedSumOfErrors(violation_reward=-10)
env = GEMObsWrapper(gem.make("Finite-CC-PMSM-v0", reward_function=rf))
scale_transform = ScaleObsOheActTransform(env)
weights = semi_gradient_sarsa(
    env=env,
    alpha=0.01,
    gamma=1.0,
    epsilon=0.2,
    num_episodes=1000,
    feat_transform=scale_transform,
)
play(env=env, weights=weights, num_episodes=20, feat_transform=scale_transform)

Episode 200 mean returns: -388.1668777278072
Episode 400 mean returns: -306.6296302554932
Episode 600 mean returns: -259.29285292812807
Episode 800 mean returns: -230.98656796703742
Episode 1000 mean returns: -210.8218189409695


-302.71159019963414