In [None]:
# Only needed on colab or on a fresh setup
# Won't work on Windows!
!pip install -r requirements.txt
!apt-get install -y xvfb python-opengl ffmpeg > /dev/null 2>&1

# Environments and Feature Engineering

As we will discuss later, there is a plethora of reinforcement learning frameworks, with a new library appearing almost every week.
Fortunately, when it comes to environments, one standard is far more common than any other: OpenAI's gym.

If we don't count the inbuilt environments, gym is a rather small package with a few simple helpers
for creating custom environments.
The documentation on these helpers is, somewhat surprisingly, non-existent. Also, best practices for feature engineering and
general processes concerning environments are not easy to come by.

We suspect that this lack of best practices and the general non-industrial focus of most RL tools
can be attributed to a stark difference between the goals of RL research and those of RL in industry:

The main RL research goal is to find algorithms that perform well across multiple environments.
Improvements in performance due to feature engineering or specific learning architectures are
not the overarching aim of most researchers, and hence not a focus of most tools either. Instead, the tools
focus on being well integrated with some setup of multiple environments and on clarity of the implementations
(often sacrificing modularity because researchers are expected to modify the source code directly).

On the other hand, the RL industry application goal is usually to solve some specific problem with
the help of RL. Whether the solution of this problem is useful in other circumstances is of lesser
importance. Thus, there is a need for feature engineering, for incorporating problem-specific tricks
into the learning algorithms, for parallelization, for support of common logging and tracking frameworks
as well as for improved training speed and so on. Moreover, in such projects the developers would
typically not want to modify source code of a tool they are using.
Unfortunately, these needs are rarely met by RL libraries that focus on the research question.


In this notebook we introduce you to gym's basic interfaces and to strategies for feature engineering
and reward shaping. As you will see, the software design is quite different to non-RL machine learning.
The topics presented here will be relevant for almost any kind of RL project.

## Basic interaction with environments

In [None]:
import sys

# rendering envs directly in notebook is recommended only on colab
in_notebook = "google.colab" in sys.modules

In [None]:
import logging

from gym.envs.classic_control import PendulumEnv
from gym.wrappers import TimeLimit
from visualization import demo_model, ModelProtocol

In [None]:
env = PendulumEnv(g=10)
env = TimeLimit(env, max_episode_steps=200)

In [None]:
import numpy as np


class PushRightModel(ModelProtocol):
    def predict(self, observation: np.ndarray, **kwargs):
        return np.array([env.max_torque]), None

In [None]:
demo_model(env, PushRightModel(), num_steps=800, in_notebook=in_notebook)

One can easily define human-designed agents. They can serve as simple benchmarks, and it is often a good idea to start an RL project with them.

## Manipulating environments

Let us see what happens if we increase gravity and keep pushing to the right with the maximal torque.

In [None]:
print("initial g: " + str(env.g))
env.g = 20
print("modified g: " + str(env.g))

In [None]:
demo_model(env, PushRightModel(), num_steps=800, in_notebook=in_notebook)

Huh, nothing changed? Gotcha!
The reason is a rather surprising behaviour of gym wrappers that took me hours to track down the first time: `__getattr__` is overridden but `__setattr__` is not!
Assigning to an attribute on the wrapper therefore creates a new attribute on the wrapper instance, which shadows the (still unchanged) attribute of the wrapped environment.

The quick solution here: unwrap and adjust g. The real solution in custom environments is to define setters like

```python
    def set_g(self, g: float):
        self.g = g
```
for all quantities of interest and never to set attributes directly.
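
The shadowing effect and the setter-based fix can be reproduced without gym. The following is a minimal sketch, assuming only that the wrapper forwards failed attribute lookups through `__getattr__` and does nothing special on assignment. `ToyEnv` and `ToyWrapper` are hypothetical stand-ins and not gym classes.

```python
class ToyEnv:
    """Hypothetical stand-in for an environment with a tunable parameter."""

    def __init__(self, g: float = 10.0):
        self.g = g

    def set_g(self, g: float) -> None:
        self.g = g


class ToyWrapper:
    """Hypothetical stand-in for a wrapper that only overrides __getattr__."""

    def __init__(self, env):
        self.env = env

    def __getattr__(self, name):
        # Called only when normal lookup on the wrapper itself fails
        return getattr(self.env, name)


wrapped = ToyWrapper(ToyEnv())

# Assignment is not forwarded: it creates a new attribute on the wrapper
wrapped.g = 20.0
shadowed = (wrapped.g, wrapped.env.g)

# A setter is found via __getattr__ and therefore runs on the inner env
del wrapped.g  # remove the shadowing attribute again
wrapped.set_g(20.0)
forwarded = (wrapped.g, wrapped.env.g)

print(shadowed)   # (20.0, 10.0)
print(forwarded)  # (20.0, 20.0)
```

Because method lookup goes through `__getattr__`, the setter keeps working no matter how many such wrappers are stacked on top of the environment.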

In [None]:
env.env.g = 20
demo_model(env, PushRightModel(), num_steps=800, in_notebook=in_notebook)

## Processing and enriching observations

In non-RL machine learning, data processing usually happens in an abstraction bound to the model. Examples of this are scikit-learn
pipelines and similar architectures allowing the implementation of a `.predict()` method acting on raw or slightly processed inputs. Such models
can then be easily deployed to user-facing applications.

In RL, on the other hand, data processing is bound to the environment and not to the agent - everything happens with wrappers.
Let's say we want to add the square of the angular velocity to the observations. For that, we will use an observation wrapper.

In [None]:
from gym import ObservationWrapper
from gym.spaces import Box


class AddThetadotSquaredWrapper(ObservationWrapper):
    def __init__(self, env: PendulumEnv):
        super().__init__(env)
        high, low = list(env.observation_space.high), list(env.observation_space.low)
        max_speed = high[-1]
        high.append(max_speed ** 2)
        low.append(0)
        self.observation_space = Box(
            low=np.array(low), high=np.array(high), dtype=env.observation_space.dtype
        )

    def observation(self, observation):
        thetadot = observation[-1]
        result = list(observation)
        result.append(thetadot ** 2)
        return np.array(result)

In [None]:
extended_env = AddThetadotSquaredWrapper(env)
extended_env.reset()

This suggests the following architecture: one should have a basic environment that makes as few assumptions about data processing
as possible and then do all processing through a combination of observation wrappers when the environment is instantiated.

How to do the latter is not entirely straightforward. Since the wrappers contain an instance of the environment, they can be easily
chained but not concatenated (i.e. executed simultaneously, possibly in parallel).
It is thus not directly possible to create multiple feature extractors and select which ones should be
used when the environment is created. However, this kind of flexible feature extractor design is often *crucial* for
a project! Rarely do we know in advance which combination of features will work best for solving the task.

This problem can be circumvented by an architectural trick - we create an observation class that is not bound to
the environment and can be concatenated, and use it within a single observation wrapper. Here is a sketch
of what such a software design could look like:

In [None]:
extended_env

In [None]:
import gym
from typing import List
from abc import ABC, abstractmethod


class ScalarObservation(ABC):
    """
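    Base class for observations that compute a flat array of scalar features from an environment's raw observation.
    Implementations are not bound to an environment instance and can therefore be concatenated.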
    """

    @property
    @abstractmethod
    def observation_space(self) -> Box:
        pass

    @abstractmethod
    def observation(self, arr: np.ndarray) -> np.ndarray:
        pass

    def __str__(self):
        name = self.__class__.__name__
        if name.endswith("Observation"):
            name = name.replace("Observation", "")
        return name


class ScalarObservationWrapper(ObservationWrapper):
    """
    Wraps an environment and a ScalarObservation into a single, gym-compatible object. See the documentation of
    ScalarObservation for more details about the intended usage of this class.
    """

    def __init__(
        self,
        env: gym.Env,
        scalar_observation: ScalarObservation,
    ):
        super().__init__(env)
        self.scalar_observation = scalar_observation
        self.observation_space = self.scalar_observation.observation_space

    def observation(self, arr: np.ndarray):
        return self.scalar_observation.observation(arr)

    def __str__(self):
        return f"<{self.scalar_observation}{self.env}>"


def concatenate_boxes(boxes: List[Box]):
    result_lows = []
    result_highs = []
    for b in boxes:
        if len(b.shape) != 1:
            raise ValueError(f"Can only concatenate flat boxes but got shape {b.shape}")
        result_lows.extend(b.low)
        result_highs.extend(b.high)
    return Box(np.array(result_lows), np.array(result_highs))


class ConcatenatedScalarObservation(ScalarObservation):
    def __init__(self, *observations: ScalarObservation):
        self.observations = observations
        self._observation_space = concatenate_boxes(
            [obs.observation_space for obs in self.observations]
        )

    @property
    def observation_space(self) -> Box:
        return self._observation_space

    def observation(self, arr: np.ndarray) -> np.ndarray:
        return np.concatenate([obs.observation(arr) for obs in self.observations])

    def __str__(self):
        return "_".join([obs.__str__() for obs in self.observations])

Let's look at this pattern in action:

In [None]:
class BaseObservation(ScalarObservation):
    @property
    def observation_space(self) -> Box:
        return env.observation_space

    def observation(self, arr: np.ndarray) -> np.ndarray:
        return arr


class ThetadotSquaredObservation(ScalarObservation):
    @property
    def observation_space(self) -> Box:
        return Box(low=0, high=env.max_speed ** 2, shape=(1,))

    def observation(self, arr: np.ndarray):
        thetadot = arr[-1]
        return np.array([thetadot ** 2])


class ThetadotCubeObservation(ScalarObservation):
    @property
    def observation_space(self) -> Box:
        # the cube of a negative velocity is negative, so the lower bound is not 0
        return Box(low=-env.max_speed ** 3, high=env.max_speed ** 3, shape=(1,))

    def observation(self, arr: np.ndarray):
        thetadot = arr[-1]
        return np.array([thetadot ** 3])

In [None]:
obs1 = BaseObservation()
obs2 = ThetadotSquaredObservation()
obs3 = ThetadotCubeObservation()

full_obs = ConcatenatedScalarObservation(obs1, obs2, obs3)
enhanced_env = ScalarObservationWrapper(env, full_obs)

In [None]:
enhanced_env

In [None]:
enhanced_env.observation_space

## Reward shaping

As discussed before, finding a good reward is often the most important as well as the most challenging part of an RL project.
Therefore, a lot of engineering and experimenting with different rewards might be needed. Defining new rewards should be as painless
as possible.

One way to do reward engineering is to use gym's `RewardWrapper`:

In [None]:
from gym import RewardWrapper


class CustomPendulumReward(RewardWrapper):
    def reward(self, reward):
        #      do something with the reward
        return reward

This has two downsides:

    1. as with observations, reward wrappers cannot be concatenated
    2. the reward of the base env will always be computed, which might be expensive

The first downside might not be a problem if concatenation of different rewards is not something you need. However,
the second point is more serious and has no solution within the realm of RewardWrappers.
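
The second downside follows directly from how such a wrapper operates: it post-processes the result of the inner `step()`. The sketch below illustrates this with hypothetical stand-ins (`ToyEnv`, `ToyRewardWrapper`) instead of gym classes, assuming the wrapper's `step()` calls the inner `step()` first and only then transforms the returned reward.

```python
class ToyEnv:
    """Hypothetical environment whose reward is costly to compute."""

    def __init__(self):
        self.reward_calls = 0

    def _expensive_reward(self) -> float:
        self.reward_calls += 1
        return -1.0

    def step(self, action):
        observation = 0.0
        return observation, self._expensive_reward(), False, {}


class ToyRewardWrapper:
    """Hypothetical stand-in mirroring how a reward wrapper post-processes step()."""

    def __init__(self, env):
        self.env = env

    def step(self, action):
        observation, reward, done, info = self.env.step(action)
        return observation, self.reward(reward), done, info

    def reward(self, reward: float) -> float:
        # Replaces the base reward entirely, yet the base reward was already computed
        return 1.0


base_env = ToyEnv()
wrapped_env = ToyRewardWrapper(base_env)
for _ in range(3):
    _, reward, _, _ = wrapped_env.step(action=None)

print(reward, base_env.reward_calls)  # 1.0 3
```

Even though the wrapper discards the base reward, the inner environment computed it on every step.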


When rewards can be computed from observations in a straightforward manner, modularity in reward shaping may be
achieved by passing a `reward_function` to the environment. To get even more generality and maintainability, we propose using
the [strategy pattern](https://en.wikipedia.org/wiki/Strategy_pattern) for injecting rewards into environments. Due to Python's
great flexibility, one can in fact have both at the same time. Below we sketch what the strategy pattern could look like for rewards.

In [None]:
from gym.envs.classic_control.pendulum import angle_normalize
from typing import Tuple, Callable


class PendulumRewardMetric(ABC):
    """
    RewardMetrics follow a strategy pattern. They are injected into environments and are used for computing
    the reward from an environment's current state.
    """

    @abstractmethod
    def __call__(self, env: PendulumEnv, action: np.ndarray) -> float:
        pass

    @property
    @abstractmethod
    def range(self) -> Tuple[float, float]:
        pass


class ModularPendulumEnv(PendulumEnv):
    def __init__(
        self, reward_metric: Callable[[PendulumEnv, np.ndarray], float] = None, **kwargs
    ):
        super().__init__(**kwargs)
        if reward_metric is None:
            reward_metric = ModularPendulumEnv._default_reward_metric
        self.reward_metric = reward_metric
        if hasattr(reward_metric, "range"):
            # expose the metric's range through gym's standard reward_range attribute
            self.reward_range = reward_metric.range

    @staticmethod
    def _default_reward_metric(env: PendulumEnv, action: np.ndarray):
        th, thdot = env.state  # th := theta
        u = action
        costs = angle_normalize(th) ** 2 + 0.1 * thdot ** 2 + 0.001 * (u ** 2)
        return -costs

    def step(self, u):
        th, thdot = self.state  # th := theta

        g = self.g
        m = self.m
        l = self.l
        dt = self.dt

        u = np.clip(u, -self.max_torque, self.max_torque)[0]
        self.last_u = u  # for rendering
        reward = self.reward_metric(self, u)

        newthdot = (
            thdot
            + (-3 * g / (2 * l) * np.sin(th + np.pi) + 3.0 / (m * l ** 2) * u) * dt
        )
        newth = th + newthdot * dt
        newthdot = np.clip(newthdot, -self.max_speed, self.max_speed)

        self.state = np.array([newth, newthdot])
        return self._get_obs(), reward, False, {}

With this kind of flexibility, we can easily define and inject all kinds of rewards. For example, a sparse reward giving 1 only if
the pendulum is within a certain angle range:

In [None]:
class WithinRegionRewardMetric(PendulumRewardMetric):
    def __init__(self, theta_min: float, theta_max: float):
        self.theta_max = theta_max
        self.theta_min = theta_min

    def __call__(self, env: PendulumEnv, action: np.ndarray) -> float:
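        theta, thetadot = env.state
        # env.state stores an unbounded angle, so normalize it to [-pi, pi) first
        theta = angle_normalize(theta)
        return 1.0 if self.theta_min < theta < self.theta_max else 0.0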

    @property
    def range(self) -> Tuple[float, float]:
        return 0, 1


within_region_reward = WithinRegionRewardMetric(-np.pi / 4, np.pi / 4)

target_region_pendulum_env = ModularPendulumEnv(reward_metric=within_region_reward)

The same kind of pattern can be applied to episode termination criteria, which sometimes form an important part of the environment.
For the pendulum environment they play no significant role, so we will not cover them in detail here.
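
For completeness, here is a minimal sketch of what an injected termination criterion could look like. It deliberately uses a toy counter environment instead of the pendulum; `TerminationCriterion`, `MaxStepsCriterion` and `CounterEnv` are hypothetical names introduced only for illustration.

```python
from abc import ABC, abstractmethod


class TerminationCriterion(ABC):
    """Hypothetical strategy deciding whether an episode is over."""

    @abstractmethod
    def __call__(self, env) -> bool:
        pass


class MaxStepsCriterion(TerminationCriterion):
    def __init__(self, max_steps: int):
        self.max_steps = max_steps

    def __call__(self, env) -> bool:
        return env.n_steps >= self.max_steps


class CounterEnv:
    """Hypothetical toy environment; only the injection mechanism matters here."""

    def __init__(self, termination_criterion: TerminationCriterion):
        self.termination_criterion = termination_criterion
        self.n_steps = 0

    def reset(self):
        self.n_steps = 0
        return self.n_steps

    def step(self, action):
        self.n_steps += 1
        # The environment delegates the decision to the injected strategy
        done = self.termination_criterion(self)
        return self.n_steps, 0.0, done, {}


toy_env = CounterEnv(MaxStepsCriterion(max_steps=3))
toy_env.reset()
dones = [toy_env.step(action=None)[2] for _ in range(4)]
print(dones)  # [False, False, True, True]
```

As with rewards, the environment only fixes the call signature, so criteria can be swapped or combined without touching the environment's code.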

### Composite rewards (and some syntactic sugar)

Once the strategy pattern is implemented, it is easy to build new reward metrics out of old ones because the rewards are callables with the same,
fixed signature. Thus, they can be multiplied, added, negated and so on. Since python allows operator overloading, this becomes particularly
neat. Here is a small example for implementing combinations of reward metrics.

*NB*: At some point I got sad that operator overloading is only allowed for regular objects but not functions, and that despite functions being
"first class citizens" in python... So I implemented a way around it by coercing functions into classes and thereby permitting operations on them
(prior to execution, of course). Have a look at [this gist](https://gist.github.com/MischaPanch/30b25d82093cdef6577146af75badcff)
if you are interested.

In [None]:
from functools import reduce
from numbers import Number
from typing import TypeVar, Generic, Sequence

log = logging.getLogger(__name__)

EnvType = TypeVar("EnvType", bound=gym.Env)


class RewardMetric(ABC, Generic[EnvType]):
    """
    RewardMetrics follow a strategy pattern. They are injected into environments and are used for computing
    the reward from an environment's current state.
    """

    @abstractmethod
    def __call__(self, env: EnvType, action: np.ndarray) -> float:
        pass

    @property
    @abstractmethod
    def range(self) -> Tuple[float, float]:
        pass

    def __mul__(self, other):
        if isinstance(other, Number):
            other = ConstantRewardMetric(other)
        if not isinstance(other, RewardMetric):
            raise ValueError(
                f"Can only multiply a RewardMetric with another RewardMetric but got: {other.__class__.__name__}"
            )
        return RewardMetricProduct([self, other])

    def __rmul__(self, other):
        return self.__mul__(other)

    def __add__(self, other):
        if not isinstance(other, RewardMetric):
            raise ValueError(
                f"Can only add a RewardMetric to another RewardMetric but got: {other.__class__.__name__}"
            )
        # The factor 2 is necessary because RewardMetricSum normalizes its weights and thus computes an average.
        # We need to undo that to get the expected behaviour of a plain sum
        return 2 * RewardMetricSum([self, other])

    def __abs__(self):
        return AbsRewardMetricWrapper(self)

    def __neg__(self):
        return (-1) * self


class RewardMetricSum(RewardMetric):
    def __call__(self, env: EnvType, action: np.ndarray) -> float:
        result = 0
        for metric, weight in zip(self.reward_metrics, self.weights):
            if weight > 0:
                result += weight * metric(env, action)
        return result

    @property
    def range(self) -> Tuple[float, float]:
        return self._range

    def __init__(
        self, reward_metrics: List[RewardMetric], weights: Sequence[float] = None
    ):
        self.reward_metrics = reward_metrics
        n_metrics = len(reward_metrics)
        weights = np.array(weights) if weights is not None else np.ones(n_metrics)
        if not np.all(weights >= 0):
            raise ValueError(f"Weights should be greater than or equal to zero but got: {weights}")
        self.weights = weights / np.sum(weights)
        lower_range, upper_range = 0, 0
        for metric, weight in zip(self.reward_metrics, self.weights):
            if weight > 0:
                lower, upper = metric.range
                lower_range += weight * lower
                upper_range += weight * upper
            else:
                log.debug(
                    f"Ignoring {metric.__class__.__name__} for sum of rewards since weight is 0"
                )
        self._range = (lower_range, upper_range)


class ConstantRewardMetric(RewardMetric):
    def __init__(self, value: Number):
        self.value = float(value)

    def __call__(self, env: EnvType, action: np.ndarray):
        return self.value

    @property
    def range(self) -> Tuple[float, float]:
        return self.value, self.value


class AbsRewardMetricWrapper(RewardMetric):
    def __init__(self, reward_metric: RewardMetric):
        self.reward_metric = reward_metric
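        low, high = self.reward_metric.range
        if low <= 0 <= high:
            # the range contains zero, so the smallest absolute value is zero
            new_low, new_high = 0, max(abs(low), abs(high))
        else:
            new_low, new_high = sorted([abs(low), abs(high)])
        self._range = (new_low, new_high)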

    def __call__(self, env: EnvType, action: np.ndarray) -> float:
        return abs(self.reward_metric(env, action))

    @property
    def range(self) -> Tuple[float, float]:
        return self._range


class RewardMetricProduct(RewardMetric):
    def __init__(self, reward_metrics: List[RewardMetric]):
        self.reward_metrics = reward_metrics

        # NOTE: finding the right range in the general case requires quite complicated logic b/c of changes in sign.
        # E.g. multiplying rewards with ranges (-3, 0), (-1, 2) results in the range (-6, 3)
        def get_product_range(range1, range2):
            range1, range2 = np.array(range1), np.array(range2)
            boundary_candidates_matrix = range1.reshape((2, 1)) * range2
            return boundary_candidates_matrix.min(), boundary_candidates_matrix.max()

        ranges = [m.range for m in reward_metrics]
        self._range = reduce(get_product_range, ranges)

    def __call__(self, env: EnvType, action: np.ndarray) -> float:
        result = 1
        for reward_metric in self.reward_metrics:
            value = reward_metric(env, action)
            if value == 0:
                return 0
            result *= value
        return result

    @property
    def range(self) -> Tuple[float, float]:
        return self._range
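
The range bookkeeping in the classes above can be checked by hand with plain tuples. The following sketch repeats the arithmetic without the classes; `product_range` and `weighted_sum_range` are hypothetical helper names used only for this illustration.

```python
from itertools import product
from typing import Sequence, Tuple

Range = Tuple[float, float]


def product_range(range1: Range, range2: Range) -> Range:
    # The extremes of a product of two intervals are attained at boundary combinations
    candidates = [a * b for a, b in product(range1, range2)]
    return min(candidates), max(candidates)


def weighted_sum_range(ranges: Sequence[Range], weights: Sequence[float]) -> Range:
    # Weights are normalized to sum to one, as in RewardMetricSum
    total = sum(weights)
    lower = sum(w / total * low for (low, _), w in zip(ranges, weights))
    upper = sum(w / total * high for (_, high), w in zip(ranges, weights))
    return lower, upper


# The example from the comment in RewardMetricProduct
print(product_range((-3, 0), (-1, 2)))  # (-6, 3)

# Adding two metrics: the normalized sum is an average, the factor 2 undoes that
average = weighted_sum_range([(0, 1), (-2, 0)], weights=[1, 1])
plain_sum = product_range((2, 2), average)
print(average)    # (-1.0, 0.5)
print(plain_sum)  # (-2.0, 1.0)
```

The last result agrees with adding the two ranges bound by bound, which is what one would expect from `metric_a + metric_b`.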

## Exercises

Since this notebook was mainly an introduction to useful ideas for approaching an RL project, the constructions
here can be applied in many situations. You could for example:

    1. Make an abstract base environment class with modularity along the lines of `ModularPendulumEnv` permitting simple injection
       of rewards as well as injection of episode termination criteria.
    2. Extend the concatenation of observations to images. Implement this for an Atari game - one could envision that adding
       additional channels to the input (e.g. mirroring or passing through an edge detector) could boost the performance of
       RL agents acting directly on pixels.
    3. Try reproducing some gym environments with the modular interfaces.
    4. Think about how a curriculum could be added to an environment - an adaptation of the task based on time or on the agent's
       current performance.
    5. Try to develop a scheme for creating environments, including observations, rewards and so on, from configuration
       (json, yaml or some format of your choice). How would you approach it? Think about versioning of configuration schemas
       and issues of backwards compatibility.
