
We begin by defining our environment as a class. The agent interacts with it through two methods: step, which induces a transition in the state, and reset, which restarts the environment after each episode. The state is a pair [position, time]: each action moves the position by one (up for action 1, down for action 0) and advances the time by one. step calls _reward, which calculates the reward for the transition that just occurred, and then returns the new state, the reward, and whether the episode has ended to the agent.

In [16]:
class excursions(object):
	"""A simple environment which provides rewards based on excursions."""

	def __init__(self, parameters):
		self.trajectory_length = parameters['trajectory_length']
		self.positivity_bias = parameters['positivity_bias']
		self.target_bias = parameters['target_bias']
		self.action = 0
		self.state = [0, 0]
		self.terminal_state = False

	def _reward(self):
		"""Calculates the reward for the last transition to occur."""
		if self.state[0] < 0:
			reward = -self.positivity_bias * abs(self.state[0])
		else:
			reward = 0
		if self.state[1] == self.trajectory_length:
			reward -= self.target_bias * abs(self.state[0])
		return reward

	def step(self, action):
		"""Updates the environment state based on the input action."""
		self.action = action
		self.state[0] += 2*action - 1
		self.state[1] += 1
		if self.state[1] == self.trajectory_length:
			self.terminal_state = True
		return self.state, self._reward(), self.terminal_state

	def reset(self):
		"""Resets the environment state and terminal boolean."""
		self.action = 0
		self.state = [0, 0]
		self.terminal_state = False
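
As a quick check of the reward rule, the sketch below replays a fixed action sequence and accumulates the return using the same logic as step and _reward, without depending on the class. The function name excursion_return and the parameter values are illustrative assumptions, not part of the notebook.

```python
def excursion_return(actions, trajectory_length, positivity_bias, target_bias):
    """Return of a fixed action sequence under the excursion reward rule.

    Hypothetical helper that mirrors `step` and `_reward` above.
    """
    position, time, total = 0, 0, 0.0
    for action in actions:
        position += 2 * action - 1  # action 1 moves up, action 0 moves down
        time += 1
        if position < 0:
            # Penalty for being below zero, proportional to the depth.
            total -= positivity_bias * abs(position)
        if time == trajectory_length:
            # Penalty for ending away from zero.
            total -= target_bias * abs(position)
    return total


# Up, up, down, down: never negative and ends at 0, so no penalty.
print(excursion_return([1, 1, 0, 0], 4, positivity_bias=1.0, target_bias=2.0))
# Down, down, up, up: visits -1, -2, -1 before returning to 0.
print(excursion_return([0, 0, 1, 1], 4, positivity_bias=1.0, target_bias=2.0))
```

Both trajectories end at zero, so only the second, which spends time below zero, is penalised.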

Next, we need a table to represent the policy, for which we import math and numpy.

In [17]:
import math
import numpy as np
import numpy.random

This table will act as the agent, so it needs to be able to output a randomly selected action. The action function calls the forward function, which returns the probability of action 1 in the current state; action 0 has the complementary probability.

To learn, the agent needs to be able to update these probabilities, which is done using the step function. To facilitate this, the eligibility (the derivative of the log of the policy with respect to the table entry for the current state) is output as a second return value from action, which the learning algorithm can pass to step when it is later required.

In [18]:
class two_action_policy_table(object):
	"""A tabular policy for environments where each state has two actions."""

	def __init__(self, dimensions, learning_rate):
		self.table = np.zeros(dimensions)
		self.learning_rate = learning_rate

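	def forward(self, state):
		"""Calculates the probability of action 1."""
		# A negative state[0] indexes from the end of the first axis (NumPy
		# wraparound), so negative positions use the last rows of the table.
		exponentiated_potential = math.exp(-self.table[state[0]][state[1]])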
		return 1/(exponentiated_potential+1)

	def action(self, state):
		"""Returns a random action according to the current policy."""
		action1_probability = self.forward(state)
		random = numpy.random.random() # pylint: disable = no-member
		if random < action1_probability:
			return 1, 1 - action1_probability
		else:
			return 0, -action1_probability
	
	def step(self, state, error, eligibility):
		"""Updates the potential for actions in the given state."""
		self.table[state[0]][state[1]] += self.learning_rate * error * eligibility
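
The two values returned alongside the action can be checked numerically. The sketch below is a standalone check under the assumption that the policy is the logistic function of a single potential, as in forward; the helper names are hypothetical. It compares the analytic eligibility with a central finite difference of the log-policy.

```python
import math


def action1_probability(potential):
    """Logistic policy: probability of action 1 for a given potential."""
    return 1 / (1 + math.exp(-potential))


def log_policy(action, potential):
    """Log-probability of the chosen action (0 or 1)."""
    p1 = action1_probability(potential)
    return math.log(p1 if action == 1 else 1 - p1)


def eligibility(action, potential):
    """Analytic derivative of the log-policy with respect to the potential."""
    p1 = action1_probability(potential)
    return 1 - p1 if action == 1 else -p1


potential, h = 0.3, 1e-6  # arbitrary test point and step size
for action in (0, 1):
    numeric = (log_policy(action, potential + h)
               - log_policy(action, potential - h)) / (2 * h)
    print(action, eligibility(action, potential), numeric)
```

The analytic and numerical columns should agree to several decimal places for both actions.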

Finally, we need to define our algorithm. The three algorithms we will consider in the tabular case share much of the same structure, so we use inheritance to keep the code compact and to highlight the distinctions between them. We therefore begin by defining a base class for episodic RL algorithms.

In [19]:
class episodic_algorithm(object):
	"""A wrapper for episodic RL algorithms."""

	def __init__(self, parameters):
		self.environment = parameters['environment']
		self.average_return = 0
		self.average_returns = []
		self.returns = []
		self.return_learning_rate = parameters['return_learning_rate']
		self.policy = parameters['policy']
		self.episode = 0
		self.past_state = self.environment.state.copy()
		self.action = 0
		self.current_state = self.environment.state.copy()
		self.reward = 0
		self.current_return = 0
		self.terminal_state = False
	
	def _transition(self):
		"""Samples an action, steps the environment and accumulates the reward."""
		self.past_state = self.current_state.copy()
		self.action, self.eligibility = self.policy.action(self.current_state)
		self.current_state, self.reward, self.terminal_state = self.environment.step(
			self.action)
		self.current_return += self.reward

	def _per_step(self):
		self._transition()

	def _per_episode(self):
		self.environment.reset()
		self.past_state = self.environment.state.copy()
		self.current_state = self.environment.state.copy()
		self.terminal_state = False

	def _episode(self):
		self.current_return = 0
		while not self.terminal_state:
			self._per_step()
		self._per_episode()
		self.average_return += self.return_learning_rate * (self.current_return 
															- self.average_return)
		self.episode += 1

	def train(self, episodes):
		self.episode = 0
		while self.episode < episodes:
			self._episode()
			self.average_returns.append(self.average_return)
			self.returns.append(self.current_return)

	def _sample(self):
		trajectory = []
		while not self.terminal_state:
			self._transition()
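			# Copy the state: the environment mutates its state list in place,
			# so appending it directly would store the same object every step.
			trajectory.append(self.current_state.copy())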
		self.environment.reset()
		self.past_state = self.environment.state.copy()
		self.current_state = self.environment.state.copy()
		self.terminal_state = False
		return trajectory

	def samples(self, sample_count):
		trajectories = []
		sample = 0
		while sample < sample_count:
			trajectory = self._sample()
			trajectories.append(trajectory)
			sample += 1
		return trajectories

	def _return_sample(self):
		self.current_return = 0
		while not self.terminal_state:
			self._transition()
		self.environment.reset()
		self.past_state = self.environment.state.copy()
		self.current_state = self.environment.state.copy()
		self.terminal_state = False

	def evaluate(self, sample_count, set_average=True):
		"""Estimates the average return over sample_count episodes, without training."""
		sample = 1
		average_return = 0
		while sample <= sample_count:
			self._return_sample()
			average_return += (self.current_return - average_return)/sample
			sample += 1
		if set_average:
			self.average_return = average_return
		return average_return
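
The base class uses two different running averages: evaluate computes an exact incremental sample mean, while _episode uses a fixed-step exponential moving average, which weights recent episodes more heavily. The sketch below contrasts the two on synthetic data; the returns are random numbers and the value of return_learning_rate is an illustrative assumption.

```python
import numpy as np

rng = np.random.default_rng(0)
returns = rng.normal(loc=-5.0, scale=1.0, size=1000)  # synthetic returns

# Incremental sample mean, as in `evaluate`.
sample_mean = 0.0
for sample, value in enumerate(returns, start=1):
    sample_mean += (value - sample_mean) / sample

# Exponential moving average, as in `_episode`.
return_learning_rate = 0.1  # illustrative value
moving_average = 0.0
for value in returns:
    moving_average += return_learning_rate * (value - moving_average)

# The incremental mean matches the ordinary arithmetic mean.
print(np.isclose(sample_mean, returns.mean()))
# The moving average fluctuates around the same value.
print(moving_average)
```

During training the policy keeps changing, so a moving average that forgets old episodes is the more natural choice there; for a fixed policy the sample mean is the better estimate.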

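On top of this base class we define our first policy gradient algorithm, REINFORCE, also known as Monte Carlo returns. At the end of each episode, the potential of every visited state is updated using the sum of the rewards received from that step to the end of the episode, multiplied by the eligibility stored at that step.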

In [20]:
class monte_carlo_returns(episodic_algorithm):
	"""A purely return based policy gradient algorithm."""

	def __init__(self, parameters):
		super().__init__(parameters)
		self.states = []
		self.rewards = []
		self.eligibilities = []

	def _per_step(self):
		self._transition()
		self.states.append(self.past_state)
		self.rewards.append(self.reward)
		self.eligibilities.append(self.eligibility)

	def _update(self):
		"""Updates the policy at each visited state using its return-to-go."""
		self.rewards = np.array(self.rewards)
		for index in range(len(self.states)):
			state_return = np.sum(self.rewards[index:])
			self.policy.step(self.states[index], state_return, self.eligibilities[index])

	def _per_episode(self):
		self._update()
		super()._per_episode()
		self.states = []
		self.rewards = []
		self.eligibilities = []
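
The per-step sums in _update can also be computed in a single pass. The sketch below, using made-up rewards, shows that a reversed cumulative sum gives the same returns-to-go as summing the tail of the reward array at every index; it is an optional alternative, not the notebook's implementation.

```python
import numpy as np

rewards = np.array([0.0, -1.0, -2.0, -1.0, 0.0])  # made-up rewards

# Direct form used in `_update`: one sum per time step.
direct = np.array([np.sum(rewards[index:]) for index in range(len(rewards))])

# Equivalent single pass: reverse, accumulate, reverse back.
returns_to_go = np.cumsum(rewards[::-1])[::-1]

print(direct)
print(returns_to_go)
```

For the short trajectories used here the difference in cost is negligible, but the cumulative form scales linearly rather than quadratically with the episode length.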

To test this, we begin by generating samples and calculating the average return for the initial policy, which chooses each action with probability 1/2 because the table starts at zero. We then train with the above algorithm, and repeat the process with the resulting policy.