# WEEK 5: MULTI ARMED BANDITS IN TF-AGENTS

Reg No: 200968008<br>
Name: Aaron Dsouza

In [36]:
# Installing the required package
!pip install tf-agents

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [37]:
# Importing the Libraries
import numpy as np
import tensorflow as tf

from tf_agents.agents import tf_agent
from tf_agents.drivers import driver
from tf_agents.environments import py_environment
from tf_agents.environments import tf_environment
from tf_agents.environments import tf_py_environment
from tf_agents.policies import tf_policy
from tf_agents.specs import array_spec
from tf_agents.specs import tensor_spec
from tf_agents.trajectories import time_step as ts
from tf_agents.trajectories import trajectory
from tf_agents.trajectories import policy_step

nest = tf.nest

2.Exercise 1 -Create a  environment 
<ul>
<li>a. for  which  the  observation  is  a  random  integer  between -5  and  5,  there  are  3 possible actions (0, 1, 2), and the reward is the product of the action and the observation.</li>
</ul>

In [38]:
class BanditPyEnvironment(py_environment.PyEnvironment):
  def __init__(self, observation_spec, action_spec):
    self._observation_spec = observation_spec
    self._action_spec = action_spec
    super(BanditPyEnvironment, self).__init__()

  def action_spec(self):
    return self._action_spec

  def observation_spec(self):
    return self._observation_spec

  def _empty_observation(self):
    return tf.nest.map_structure(lambda x: np.zeros(x.shape, x.dtype),
                                self.observation_spec())
    
  def _reset(self):
    return ts.restart(self._observe(), batch_size = self.batch_size)


  def _step(self, action):
    reward = self._apply_action(action)
    return ts.termination(self._observe(), reward)

  # @abc.abstractmethod
  #   def _observe(self):


  # @abc.abstractmethod
  #   def _apply_action(self,action):

In [39]:
class SimplePyEnvironment(BanditPyEnvironment):

  def __init__(self):
    action_spec = array_spec.BoundedArraySpec(
        shape=(), dtype = np.int32, minimum=0, maximum=2, name='action'
    )
    observation_spec = array_spec.BoundedArraySpec(
        shape=(1,), dtype=np.int32, minimum=-5, maximum=5, name='observation'
    )
    super(SimplePyEnvironment, self).__init__(observation_spec, action_spec)

  # This function returns a random observation between -5 to 5
  def _observe(self):
    self._observation = np.random.randint(-5,6,(1,), dtype='int32')
    return self._observation

  # This function calculates and returns the reward for the specific observation
  def _apply_action(self, action):
    return action * self._observation

In [51]:
# Defining a BanditPYEnvironment
environment = SimplePyEnvironment()
observation = environment.reset().observation
print("observation: %d" % observation)

action = np.random.randint(0,3,(1,), dtype='int32')

print("action: %d" % action)
reward = environment.step(action).reward
print("reward: %f" % reward)

observation: -5
action: 1
reward: -5.000000


<ul>
<li>b. Define an optimal policy manually. The action only depends on the sign of the observation, 0 when is negative and 2 when is positive.</li>
</ul>

In [41]:
# Wrapping the BanditPyEnvironment defined before with TFPyEnvironment
tf_environment = tf_py_environment.TFPyEnvironment(environment)

In [42]:
class SignPolicy(tf_policy.TFPolicy):
  def __init__(self):
    observation_spec = tensor_spec.BoundedTensorSpec(
        shape=(1,), dtype=tf.int32, minimum=-5, maximum=5)
    time_step_spec = ts.time_step_spec(observation_spec)

    action_spec = tensor_spec.BoundedTensorSpec(
        shape=(), dtype = tf.int32, minimum=0, maximum=2)

    super(SignPolicy, self).__init__(time_step_spec=time_step_spec,
                                     action_spec =action_spec)
  def _distribution(self, time_step):
    pass

  def _variables(self):
    return ()

  def _action(self, time_step, policy_state, seed):
    observation_sign = tf.cast(tf.sign(time_step.observation), dtype=tf.int32)
    action = observation_sign + 1
    # return policy_step.PolicyStep(action, policy_step) this causes output structure mismatch
    return policy_step.PolicyStep(action, ())

In [43]:
sign_policy = SignPolicy()

current_time_step = tf_environment.reset()
print('Observation:')
# print (current_time_step.observation)
action = sign_policy.action(current_time_step).action
print('Action:')
print (action)
reward = tf_environment.step(action).reward
print('Reward:')
print(reward)

Observation:
Action:
tf.Tensor([[2]], shape=(1, 1), dtype=int32)
Reward:
tf.Tensor([[8.]], shape=(1, 1), dtype=float32)


<ul><li>c. Request  for  50  observations  from  the  environment,  compute  and  print  the total reward.</li></ul>

In [44]:
step = tf_environment.reset()
total_reward = 0
for _ in range(50):
  action_step = sign_policy.action(step).action
  reward = tf_environment.step(action_step).reward
  next_step = tf_environment.step(action_step)
  total_reward += reward
  step = next_step

print("Total Reward",np.array(total_reward)[0])

Total Reward [124.]


3.Exercise 2 –Create an environment 
<ul>
<li>a. Define an environment will either always give reward = observation * action or reward = -observation * action. This will be decided when the environment is initialized.</li>
</ul>

In [45]:
# Defining an Environment
class TwoWayPyEnvironment(BanditPyEnvironment):
  def __init__(self):
    action_spec = array_spec.BoundedArraySpec(
        shape=(), dtype=np.int32, minimum=0, maximum=2, name='Action'
    )
    observation_spec = array_spec.BoundedArraySpec(
        shape=(1,), dtype=np.int32, minimum=-5, maximum=5, name='Observation'
    )
    self._reward_sign = 2*np.random.randint(2)-1
    print("reward sign: ",self._reward_sign)
    super(TwoWayPyEnvironment, self).__init__(observation_spec, action_spec)

  def _observe(self):
    self._observation = np.random.randint(-5,6,(1,),dtype='int32')
    return self._observation

  def _apply_action(self, action):
    return self._reward_sign*action*self._observation[0]

In [46]:
two_way_tf_environment = tf_py_environment.TFPyEnvironment(TwoWayPyEnvironment())

reward sign:  -1


<ul>
<li>b. Define a policy that detects the behaviorof the underlying environment. There are three situations that the policy needs to handle:
<ul>
<li>i.The agent has not detected know yet which version of the environment is running.</li>
<li>ii.The  agent  detected  that  the  original  version  of  the  environment  is running.</li>
<li>iii.The  agent  detected  that  the  flipped  version  of  the  environment  is running.</li>
</ul></li>
</ul>

In [47]:
class TwoWaySignPolicy(tf_policy.TFPolicy):
  def __init__(self, situation):
    observation_spec = tensor_spec.BoundedTensorSpec(
        shape=(1,), dtype=tf.int32, minimum=-5, maximum=5
    )
    action_spec = tensor_spec.BoundedTensorSpec(
        shape=(), dtype=tf.int32, minimum=0, maximum=2
    )
    time_step_spec = ts.time_step_spec(observation_spec)
    self._situation = situation
    super(TwoWaySignPolicy, self).__init__(time_step_spec=time_step_spec,
                                           action_spec=action_spec)
  
  def _distribution(self, time_step):
    pass

  def _variables(self):
    return [self._situation]

  def _action(self, time_step, policy_state, seed):
    sign = tf.cast(tf.sign(time_step.observation[0,0]), dtype=tf.int32)

    # Case-1: The agent has not detected know yet which version of the environment is running.
    def case_unknown_fn():
      return tf.constant(1, shape=(1,))

    # Case-2: The agent detected that the original version of the environment is running.
    def case_normal_fn():
      return tf.constant(sign+1, shape=(1,))

    # Case-3: The agent detected that the flipped version of the environment is running.
    def case_flipped_fn():
      return tf.constant(1-sign, shape=(1,))

    cases = [(tf.equal(self._situation, 0), case_unknown_fn),
             (tf.equal(self._situation, 1), case_normal_fn),
             (tf.equal(self._situation, 2), case_flipped_fn)]

    action = tf.case(cases, exclusive=True)
    return policy_step.PolicyStep(action, policy_state)
              


<ul>
<li>c. Define the agent that detects the sign of the environment and sets the policy appropriately.
</li>
</ul>

In [48]:
# Defining the Agent
# variable 'situation' is shared by the agent and the policy.
class SignAgent(tf_agent.TFAgent):
  def __init__(self):
    self._situation = tf.Variable(0, dtype=tf.int32)
    policy = TwoWaySignPolicy(self._situation)
    time_step_spec = policy.time_step_spec
    action_spec = policy.action_spec
    super(SignAgent, self).__init__(time_step_spec=time_step_spec,
                                    action_spec=action_spec,
                                    policy=policy,
                                    collect_policy=policy,
                                    train_sequence_length=None)

  def _initialize(self):
    return tf.compat.v1.variables_initializer(self.variables)

  def _train(self, experience, weights=None):
    observation = experience.observation
    action = experience.action
    reward = experience.reward

    needs_action = tf.logical_and(tf.equal(self._situation, 0),
                                  tf.not_equal(reward, 0))


    def new_situation_fn():
      return (3 - tf.sign(tf.cast(observation[0, 0, 0], dtype=tf.int32) *
                          tf.cast(action[0, 0], dtype=tf.int32) *
                          tf.cast(reward[0, 0], dtype=tf.int32))) / 2

    new_situation = tf.cond(needs_action,
                            new_situation_fn,
                            lambda: self._situation)
    new_situation = tf.cast(new_situation, tf.int32)
    tf.compat.v1.assign(self._situation, new_situation)
    return tf_agent.LossInfo((), ())

sign_agent = SignAgent()

Trajectories are tuples that contain samples taken from the previous steps. These samples are then used by the agent to train and update the policy.

In [49]:
def trajectory_for_bandit(initial_step, action_step, final_step):
  return trajectory.Trajectory(observation=tf.expand_dims(initial_step.observation, 0),
                               action=tf.expand_dims(action_step.action, 0),
                               policy_info=action_step.info,
                               reward=tf.expand_dims(final_step.reward, 0),
                               discount=tf.expand_dims(final_step.discount, 0),
                               step_type=tf.expand_dims(initial_step.step_type, 0),
                               next_step_type=tf.expand_dims(final_step.step_type, 0))

<h3>Training the Agent</h3>

In [50]:
step = two_way_tf_environment.reset()
total_reward=0
for _ in range(20):
  action_step = sign_agent.collect_policy.action(step)
  total_reward += step.reward
  next_step = two_way_tf_environment.step(action_step.action)
  experience = trajectory_for_bandit(step, action_step, next_step)
  print(experience)
  sign_agent.train(experience)
  step = next_step

print("\n\nTotal Reward: ",np.array(total_reward)[0])

Trajectory(
{'action': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[1]], dtype=int32)>,
 'discount': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>,
 'next_step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>,
 'observation': <tf.Tensor: shape=(1, 1, 1), dtype=int32, numpy=array([[[1]]], dtype=int32)>,
 'policy_info': (),
 'reward': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[-1.]], dtype=float32)>,
 'step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[0]], dtype=int32)>})
Trajectory(
{'action': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>,
 'discount': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>,
 'next_step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>,
 'observation': <tf.Tensor: shape=(1, 1, 1), dtype=int32, numpy=array([[[-1]]], dtype=int32)>,
 'policy_info': (),
 'reward': <tf.Tensor: shape=(1, 1)

After some steps, we can see that the reward is always non-negative i.e, the agent detects the sign of the environment and sets the policy accordingly.