- The objective of the agent is to maximize its total reward over a certain period of time, but it does not know the true distributions of the different actions in advance. 
- The agent must explore by trying different actions and collecting data on their payoffs, while also exploiting the knowledge it has gained so far to choose the best action based on the available information.
- This trade-off between exploration and exploitation is the main challenge of the MAB problem.

In [1]:
pip install tf-agents

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting tf-agents
  Downloading tf_agents-0.16.0-py3-none-any.whl (1.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m53.4 MB/s[0m eta [36m0:00:00[0m
Collecting pygame==2.1.3
  Downloading pygame-2.1.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (13.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.7/13.7 MB[0m [31m67.3 MB/s[0m eta [36m0:00:00[0m
Collecting gym<=0.23.0,>=0.17.0
  Downloading gym-0.23.0.tar.gz (624 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m624.4/624.4 KB[0m [31m53.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Building wheels for collected packages: gym
  Building wheel for gym (pyproject.toml) ... [?

### Exercise-1


#### Create an environment in which the observation is a random integer between -5 and 5 (inclusive), there are 3 possible actions (0, 1, 2), and the reward is the product of the action and the observation.





In [2]:
import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp
from tf_agents.agents import TFAgent
from tf_agents.drivers import dynamic_episode_driver
from tf_agents.environments import py_environment
from tf_agents.policies import random_py_policy, tf_policy
from tf_agents.specs import array_spec
from tf_agents.specs import tensor_spec
from tf_agents.trajectories import policy_step
from tf_agents.trajectories import time_step as ts
from tf_agents.utils import common



class MyEnvironment(py_environment.PyEnvironment):
  """One-step bandit environment where reward = observation * action.

  `_reset` draws an integer observation uniformly from [-5, 5] and starts a
  new episode. The next `_step(action)` pays `observation * action` and
  terminates the episode, so every episode is exactly one decision.
  """

  def __init__(self):
    # Run the base-class constructor so PyEnvironment's own bookkeeping is
    # set up before reset()/step() are used.
    super().__init__()
    self._observation_spec = array_spec.BoundedArraySpec(
        shape=(), dtype=np.int32, minimum=-5, maximum=5, name='observation')    # observation range from -5 to +5
    self._action_spec = array_spec.BoundedArraySpec(
        shape=(), dtype=np.int32, minimum=0, maximum=2, name='action')          # action range from 0 to 2
    self._episode_ended = False
    self._observation = None    # observation of the current episode
    self._reward = None         # reward paid by the most recent step

  def action_spec(self):
    """Return the spec of valid actions (scalar int32 in [0, 2])."""
    return self._action_spec

  def observation_spec(self):
    """Return the spec of observations (scalar int32 in [-5, 5])."""
    return self._observation_spec

  def _reset(self):
    """Draw a fresh observation and return the first time step of an episode."""
    # randint's upper bound is exclusive, hence 6 to include +5.
    self._observation = np.random.randint(-5, 6)
    self._episode_ended = False
    return ts.restart(np.array(self._observation, dtype=np.int32))

  def _step(self, action):
    """Apply `action` to the current observation and end the episode.

    Args:
      action: integer action, expected to be 0, 1 or 2.

    Returns:
      A terminal time step carrying `observation * action` as reward, or the
      first time step of a new episode when the previous episode has already
      ended or `action` is out of range.
    """
    if self._episode_ended:
      # The previous step was terminal: ignore the action and start over.
      return self.reset()

    if action < 0 or action > 2:
      # Out-of-range action: no reward is paid, a new episode is started.
      return self.reset()

    self._reward = self._observation * action
    self._episode_ended = True
    return ts.termination(np.array(self._observation, dtype=np.int32), reward=self._reward)



#### Define an optimal policy manually. The action depends only on the sign of the observation: 0 when it is negative and 2 when it is positive.

In [None]:
class OptimalPolicy(tf.Module):
    """Hand-written policy for the `reward = observation * action` bandit.

    The choice depends only on the sign of the observation: the smallest
    action (0) for a negative observation, the largest action (2) otherwise.
    """

    def __init__(self):
        # The base constructor must run; the previous `pass` body skipped
        # tf.Module's own initialisation entirely.
        super().__init__()

    def action(self, observation):
        """Return the action for `observation` wrapped in a PolicyStep.

        Args:
            observation: scalar observation; only its comparison with 0 is used.

        Returns:
            `PolicyStep` whose `action` is 0 if `observation < 0`, else 2, with
            an empty policy state.
        """
        action = 0 if observation < 0 else 2
        return policy_step.PolicyStep(action=action, state=())

#### Request 50 observations from the environment, then compute and print the total reward.

In [None]:
# Evaluate the hand-written policy over 50 separate reset/step rounds.
env, policy = MyEnvironment(), OptimalPolicy()

total_reward = 0
for _ in range(50):
  # reset() supplies the observation the policy has to act on.
  time_step = env.reset()
  action_step = policy.action(time_step.observation)
  # The time step returned by step() carries the reward for that action.
  next_time_step = env.step(action_step.action)
  total_reward = total_reward + next_time_step.reward

print("Total reward:", total_reward)


Total reward: 136.0


### Exercise-2

#### Define an environment that will either always give reward = observation * action or always give reward = -observation * action. Which of the two applies is decided when the environment is initialized.


In [None]:
class MyEnvironment(py_environment.PyEnvironment):
    """One-step bandit whose reward sign is fixed when it is constructed.

    For its whole lifetime an instance pays either `observation * action` or
    `-observation * action`; which one is drawn at random in `__init__` and
    never changes afterwards. Each episode is one decision: `_reset` draws an
    observation from [-5, 5] and the following `_step` pays the reward and
    terminates the episode.
    """

    def __init__(self):
        super(MyEnvironment, self).__init__()
        self._action_spec = array_spec.BoundedArraySpec(
            shape=(), dtype=np.int32, minimum=0, maximum=2, name='action')
        self._observation_spec = array_spec.BoundedArraySpec(
            shape=(), dtype=np.int32, minimum=-5, maximum=5, name='observation')
        # Kept from the original definition; no method of this class returns it.
        self._reward_spec = array_spec.BoundedArraySpec(
            shape=(), dtype=np.float32, minimum=-25, maximum=25, name='reward')
        self._observation = None
        self._reward = None
        self._episode_ended = False
        # Drawn once and never modified again. NOTE: despite the name, True
        # selects the un-negated reward (observation * action) and False the
        # negated one, matching the original branch in `_step`.
        self._flip = np.random.choice([True, False])

    def action_spec(self):
        """Return the spec of valid actions (scalar int32 in [0, 2])."""
        return self._action_spec

    def observation_spec(self):
        """Return the spec of observations (scalar int32 in [-5, 5])."""
        return self._observation_spec

    def _reset(self):
        """Draw a fresh observation and return the first time step of an episode."""
        # randint's upper bound is exclusive, hence 6 to include +5.
        self._observation = np.random.randint(low=-5, high=6)
        # No action has been taken in this episode yet, so there is no reward.
        self._reward = None
        self._episode_ended = False
        return ts.restart(
            observation=np.array(self._observation, dtype=np.int32))

    def _step(self, action):
        """Pay the (possibly negated) reward for `action` and end the episode.

        Args:
            action: integer action, expected to be 0, 1 or 2.

        Returns:
            A terminal time step whose reward is `observation * action` or
            `-observation * action`, depending on the sign chosen in
            `__init__`. When the previous episode has already ended, or
            `action` is out of range, a new episode is started instead and
            its first time step is returned.
        """
        if self._episode_ended:
            # The previous step was terminal: ignore the action and start over.
            return self.reset()
        if action < 0 or action > 2:
            # Out-of-range action: no reward is paid, a new episode is started.
            return self.reset()
        if self._flip:
            self._reward = self._observation * action
        else:
            self._reward = -self._observation * action
        # The sign must stay fixed for the lifetime of the environment, so
        # `_flip` is deliberately NOT toggled here.
        self._episode_ended = True
        return ts.termination(
            observation=np.array(self._observation, dtype=np.int32),
            reward=self._reward)


#### Define a policy that detects the behavior of the underlying environment. 
- There are three situations that the policy needs to handle

  - The agent has not yet detected which version of the environment 
  is running.
  - The agent detected that the original version of the environment is 
  running.
  - The agent detected that the flipped version of the environment is 
  running.

In [None]:
class DetectPolicy(tf.Module):
    """Policy that infers the reward sign of the environment and then exploits it.

    The environment pays either `observation * action` ("original") or
    `-observation * action` ("flipped"). The policy is in one of three states,
    stored in `detected_environment`:

    * `None`       - version unknown: actions are drawn uniformly at random.
    * `"original"` - play the largest action for a positive observation,
                     otherwise action 0.
    * `"flipped"`  - play the largest action for a negative observation,
                     otherwise action 0.

    Detection uses the previous (observation, action, reward) triple: with a
    non-zero observation and a non-zero action, the sign of
    `reward * observation * action` is positive in the original environment
    and negative in the flipped one.
    """

    def __init__(self, num_actions):
        """Create the policy.

        Args:
            num_actions: number of discrete actions; actions are
                `0 .. num_actions - 1`.
        """
        super().__init__()
        self.num_actions = num_actions
        # Per-action pull counts and running mean rewards. They are kept as
        # diagnostics; the decision rule itself does not depend on them.
        self.counts = np.zeros((num_actions,))
        self.values = np.zeros((num_actions,))
        self.total_counts = 0
        self.last_action = None
        # Observation that `last_action` was chosen for.
        self.last_observation = None
        self.detected_environment = None

    def __call__(self, time_step):
        """Consume the reward in `time_step` and choose the next action.

        Args:
            time_step: object exposing `reward` (feedback for the previously
                returned action) and `observation` (context for the next one).

        Returns:
            `PolicyStep` with the chosen action and an empty policy state.
        """
        if self.last_action is not None:
            self.update(time_step.reward)

        observation = time_step.observation
        if self.detected_environment is None:
            action = np.random.randint(self.num_actions)
        else:
            # Reward is sign * observation * action, so the largest action is
            # best when sign * observation is positive and action 0 otherwise.
            sign = 1 if self.detected_environment == "original" else -1
            action = self.num_actions - 1 if sign * observation > 0 else 0

        self.last_action = action
        self.last_observation = observation
        return policy_step.PolicyStep(action=action, state=())

    def update(self, reward):
        """Record `reward` for the last action and try to detect the version.

        Args:
            reward: reward received for `self.last_action`.
        """
        self.counts[self.last_action] += 1
        self.total_counts += 1
        # Incremental mean: step size 1/n for the n-th sample of this action.
        alpha = 1.0 / self.counts[self.last_action]
        self.values[self.last_action] += alpha * (reward - self.values[self.last_action])

        if self.detected_environment is not None or self.last_observation is None:
            return

        # Zero whenever the action, the observation or the reward was zero;
        # such a sample carries no sign information and is skipped.
        evidence = reward * self.last_observation * self.last_action
        if evidence > 0:
            self.detected_environment = "original"
        elif evidence < 0:
            self.detected_environment = "flipped"


#### Define the agent that detects the sign of the environment and sets the policy appropriately

In [None]:
class SignDetectionAgent(TFAgent):
    """Agent that probes an environment once and then swaps in a constant policy.

    The constructor registers `policy` as both the policy and the collect
    policy. `_initialize` later replaces it with `ZeroPolicy` or
    `DoublePolicy` depending on an observation obtained from the environment.
    """

    def __init__(self, environment, policy):
        """Store `environment` and `policy` and configure the TFAgent base class.

        Args:
            environment: object providing `time_step_spec()`, `action_spec()`,
                `reset()` and `step()`.
            policy: policy used for both acting and data collection.
        """
        self._environment = environment
        self._policy = policy
        super().__init__(
            time_step_spec=environment.time_step_spec(),
            action_spec=environment.action_spec(),
            policy=policy,
            collect_policy=policy,
            train_sequence_length=2,
            num_outer_dims=1)
        
    def _initialize(self):
        """Run the environment until a last step and pick a constant policy."""
        # Run an episode to learn the sign of the environment
        # NOTE(review): relies on `action_spec()` returning an object with a
        # `sample()` method - confirm against the spec class in use.
        self._environment.reset()
        time_step = self._environment.step(self._environment.action_spec().sample())
        # NOTE(review): this loop only ends if the environment eventually
        # returns a time step for which `is_last()` is true.
        while not time_step.is_last():
            time_step = self._environment.step(self._environment.action_spec().sample())
        observation = time_step.observation
        
        # Set the policy based on the sign of the observation
        # NOTE(review): the reward is not inspected here; the sign of a single
        # observation presumably says nothing about whether the environment
        # negates its rewards - confirm the intended detection rule.
        if observation < 0:
            self._policy = ZeroPolicy()
        else:
            self._policy = DoublePolicy()
        
        # Update the policy for the agent
        # NOTE(review): `_set_policy` is not defined anywhere in this file -
        # confirm that the TFAgent base class provides it.
        self._set_policy(self._policy)
        
    def train(self, experience, **kwargs):
        """Initialise lazily if no policy is set, then delegate to the base class.

        Args:
            experience: training data forwarded unchanged to `TFAgent.train`.
            **kwargs: extra keyword arguments forwarded unchanged.

        Returns:
            Whatever `TFAgent.train` returns.
        """
        # NOTE(review): `_policy` is assigned in `__init__`, so this branch
        # runs only when the stored policy is falsy.
        if not self._policy:
            self._initialize()
        return super().train(experience, **kwargs)
    
class ZeroPolicy(tf_policy.TFPolicy):
    """Stateless policy that always selects action 0.

    Observations are scalar int32 values and actions are scalar int32 values
    in [0, 2], as declared by the specs passed to the base class.
    """

    def __init__(self):
        super().__init__(
            time_step_spec=ts.time_step_spec(tensor_spec.TensorSpec([], tf.int32)),
            action_spec=tensor_spec.BoundedTensorSpec([], tf.int32, 0, 2),
            policy_state_spec=(),
            info_spec=())

    def _variables(self):
        """Return the policy's variables; a constant policy has none."""
        return []

    def _distribution(self, time_step, policy_state=()):
        """Return a deterministic distribution concentrated on action 0.

        Args:
            time_step: time step whose observation determines the output shape.
            policy_state: policy state handed in by `TFPolicy`; passed through
                unchanged. Defaults to the empty state so the method can still
                be called with `time_step` alone.

        Returns:
            `PolicyStep` whose `action` is a `Deterministic` distribution at 0.
        """
        # The observation spec is scalar, so the observation's shape is the
        # batch shape; emit one constant action per batch entry.
        action = tf.zeros_like(time_step.observation, dtype=tf.int32)
        # TFPolicy reads `.action` from the result, so the distribution is
        # wrapped in a PolicyStep rather than returned bare.
        return policy_step.PolicyStep(
            action=tfp.distributions.Deterministic(loc=action),
            state=policy_state,
            info=())
    
class DoublePolicy(tf_policy.TFPolicy):
    """Stateless policy that always selects action 2.

    Observations are scalar int32 values and actions are scalar int32 values
    in [0, 2], as declared by the specs passed to the base class.
    """

    def __init__(self):
        super().__init__(
            time_step_spec=ts.time_step_spec(tensor_spec.TensorSpec([], tf.int32)),
            action_spec=tensor_spec.BoundedTensorSpec([], tf.int32, 0, 2),
            policy_state_spec=(),
            info_spec=())

    def _variables(self):
        """Return the policy's variables; a constant policy has none."""
        return []

    def _distribution(self, time_step, policy_state=()):
        """Return a deterministic distribution concentrated on action 2.

        Args:
            time_step: time step whose observation determines the output shape.
            policy_state: policy state handed in by `TFPolicy`; passed through
                unchanged. Defaults to the empty state so the method can still
                be called with `time_step` alone.

        Returns:
            `PolicyStep` whose `action` is a `Deterministic` distribution at 2.
        """
        # The observation spec is scalar, so the observation's shape is the
        # batch shape; emit one constant action per batch entry.
        action = tf.fill(
            tf.shape(time_step.observation), tf.constant(2, dtype=tf.int32))
        # TFPolicy reads `.action` from the result, so the distribution is
        # wrapped in a PolicyStep rather than returned bare.
        return policy_step.PolicyStep(
            action=tfp.distributions.Deterministic(loc=action),
            state=policy_state,
            info=())
