In [None]:
import gym
import numpy as np
import random, time
from collections import defaultdict

In [None]:
import matplotlib.pyplot as plt

def plot_values(V):
 # reshape value function
 V_sq = np.reshape(V, (4,4))
 # plot the state-value function
 fig = plt.figure(figsize=(6, 6))
 ax = fig.add_subplot(111)
 im = ax.imshow(V_sq, cmap='cool')
 for (j,i),label in np.ndenumerate(V_sq):
  ax.text(i, j, np.round(label, 5), ha='center', va='center', fontsize=14)
 plt.tick_params(bottom=False, left=False, labelbottom=False, labelleft=False)
 plt.title('State-Value Function')
 plt.show()

In [None]:
class FrozenLakeWrapper(gym.Env):
    def __init__(self, desc=None, map_name="4x4", is_slippery=True, gamma: float=1.0):
        self.__env = gym.make('FrozenLake-v0', desc=desc, map_name=map_name, is_slippery=is_slippery)
        self.observation_space = self.__env.observation_space
        self.action_space = self.__env.action_space
        self.reward_range = self.__env.reward_range
        self.__gamma = gamma

    def reset(self):
        return self.__env.reset()

    def step(self, action):
        return self.__env.step(action)

    def execute(self, state: int, action: int):
        next_state, reward, _, _ = self.__env.step(action)
        return next_state, reward

    def get_actions(self, state: int):
        return [action for action in range(self.action_space.n) if self.__env.P[state][action]]

    def get_transitions(self, state: int, action: int):
        return [(x[1],x[0]) for x in self.__env.P[state][action]]

    def get_initial_state(self):
        return 0

    def get_discount_factor(self):
        return self.__gamma

    def is_terminal(self, state: int):
        return self.__env.env.desc.flatten()[state] in b'GH'

    def close(self):
        return self.__env.close()

# Monte Carlo Tree Search

In [None]:
# def simulate_step(env, state: int, action: int=None):
#  # Save the current state
#  original_state = env.env.s

#  # Switch to given state
#  env.env.s = state

#  # Choose an action
#  action = action if action else env.action_space.sample()

#  # Simulate the step without modifying the original environment's state
#  next_state, reward, done, _ = env.env.step(action)

#  # Restore the original state
#  env.env.s = original_state

#  return next_state, reward, done

class Node:

 # Record a unique node id to distinguish duplicated states
 next_node_id = 0

 # Records the number of times states have been visited
 visits = defaultdict(lambda: 0)

 def __init__(self, env, Q, state=0, reward=0.0, action=None, parent=None, gamma=1.0):
  self.env    = env
  self.parent = parent
  self.state  = state
  self.reward = reward
  self.action = action
  self.gamma  = gamma
  self.children = {}
  self.id = Node.next_node_id
  self.Q  = Q

  Node.next_node_id += 1

 def get_nA(self):
  # All actions are possible for a given state
  return self.env.nA

 def get_actions(self):
  # All actions are possible for a given state
  return set(range(self.env.nA))

 def get_visits(self):
  return Node.visits[self.state]

 def is_fully_expanded(self):
   return True if self.get_nA() == len(self.children) else False

 def is_terminal(self, state: int=None):
  return self.env.desc.flatten()[state] in b'GH'

 """ Select a node that is not fully expanded """

 def select(self):
  node = self
  while node.is_fully_expanded():
   if not node.children or node.is_terminal():
    return node

   actions = list(node.children.keys())
   action  = random.choice(actions)
   node    = random.choice(node.children[action])[0]

  return node

 """ Expand a node if it is not a terminal node """

 def expand(self):

  if self.is_terminal():
   return self

  # Choose at random an action that has not been expanded
  actions = list(self.get_actions() - self.children.keys())
  action  = random.choice(actions)

  # Synchronize the state of the environment with the state of the node
  self.env.reset()
  self.env.env.s = self.state
  # Execute the action
  next_state, reward, done, info = self.env.env.step(action)

  # Initialize new child node
  child = Node(self.env, self.Q, next_state, reward, action, self, self.gamma)

  # Compute the correct transition probability
  transition = defaultdict(float)
  for prob, next_s, *rest in env.P[self.state][action]:
   transition[next_s] = prob

  # self.children[action] += [(child, info['prob'])]
  self.children[action] = [(child, transition[next_state])]
  return child

 """ Simulate until a terminal state """

 def simulate(self):

  # Set the inner state of the environment to the node state
  self.env.reset()
  self.env.env.s = self.state

  # Initialization
  reward = 0.0
  depth  = 0
  done   = False

  # Simulate reward
  while not done:

   # Choose an action to execute
   action = self.env.action_space.sample()

   # Execute the action
   _, r, done, _ = self.env.env.step(action)

   # Discount the reward
   reward += np.power(self.gamma, depth) * r
   depth  += 1

  return reward

 """ Backpropogate the reward back to the parent node """
 def backpropagate(self, reward, child):
  action = child.action

  node   = self
  while node is not None:
   Node.visits[node.state] += 1
   Node.visits[(node.state, action)] += 1

   Nsa   = Node.visits[(self.state, action)]
   Qsa   = self.Q[(self.state, action)]
   delta = 1.0 * (reward - Qsa) / Nsa

   self.Q[(self.state, action)] += delta

   node = node.parent
   if node is not None:
    node.reward += reward


#  def select(self):
#   if not self.is_fully_expanded() or self.is_terminal():
#    return self
#   else:
#    actions = list(self.children.keys())
#    action  = random.choice(actions)
#    return self.get_outcome_child(action).select()

#  def expand(self):

#   if self.is_terminal():
#    return self

#   actions = list(self.get_actions() - self.children.keys())
#   action  = random.choice(actions)
#   self.children[action] = []

#   return self.get_outcome_child(action)

#  def back_propagate(self, reward, child):
#   action = child.action

#   Node.visits[self.state] += 1
#   Node.visits[(self.state, action)] += 1

#   Nsa   = Node.visits[(self.state, action)]
#   Qsa   = self.Q[(self.state, action)]
#   delta = 1.0 * (reward - Qsa) / Nsa

#   self.Q[(self.state, action)] += delta

#   if self.parent != None:
#    self.parent.back_propagate(self.reward + reward, self)

#  """ Simulate the outcome of an action, and return the child node """

#  def get_outcome_child(self, action):

#   self.env.reset()
#   self.env.env.s = self.state
#   # Choose one outcome based on transition probabilities
#   next_state, reward, done, _ = self.env.env.step(action)

#   # Find the corresponding state and return if this already exists
#   for child, _ in self.children[action]:
#    if next_state == child.state:
#     return child

#   # This outcome has not occured from this state-action pair previously
#   new_child = Node(self.env, self.Q, next_state, reward, action, self, self.gamma)

#   # Find the probability of this outcome (only possible for model-based) for visualising tree
#   probability = 0.0
#   for probability, outcome, _, _ in self.env.P[self.state][action]:
#    if outcome == next_state:
#     self.children[action] += [(new_child, probability)]
#     return new_child

In [None]:
N = 200000
rewards = np.zeros(N)

Q = defaultdict(lambda: 0.0)
env = gym.make('FrozenLake-v0', desc=None, map_name="4x4", is_slippery=True)
env.reset()
node = Node(env, Q)

In [None]:
for _ in range(N):
 current = node.select()
 child   = current.expand()
 reward  = child.simulate()
 current.backpropagate(reward, child)

In [None]:
Qvec = np.zeros((env.nS, env.nA))
for (s, a), q in Q.items():
 Qvec[s,a] = q

In [None]:
plot_values(np.max(Qvec, axis=1))

## Simulation tests for gym env

In [None]:
N = 100000
rewards = np.zeros(N)

Q = defaultdict(lambda: 0.0)
node = Node(env, Q)

for i in range(N):
 rewards[i] = node.simulate()

np.sum(rewards)

In [None]:
for key, value in Q.items():
 print(key, value)

In [None]:
N = 100000
rewards = np.zeros(N)

for i in range(N):
 env.reset()
 done = False
 reward = 0.0
 state = env.env.s
 while not done:
  action = env.action_space.sample()
  # _, next_state, r, done = random.choice(env.P[state][action])
  next_state, r, done, _ = env.step(action)
  reward += r
 rewards[i] = reward

np.sum(rewards)

# Dynamic programming

In [None]:
import gym
import numpy as np

env = gym.make('FrozenLake-v0', desc=None, map_name="4x4", is_slippery=True)

In [None]:
import matplotlib.pyplot as plt

def plot_values(V):
 # reshape value function
 V_sq = np.reshape(V, (4,4))
 # plot the state-value function
 fig = plt.figure(figsize=(6, 6))
 ax = fig.add_subplot(111)
 im = ax.imshow(V_sq, cmap='cool')
 for (j,i),label in np.ndenumerate(V_sq):
  ax.text(i, j, np.round(label, 5), ha='center', va='center', fontsize=14)
 plt.tick_params(bottom=False, left=False, labelbottom=False, labelleft=False)
 plt.title('State-Value Function')
 plt.show()

In [None]:
# Dynamic programming
def dynamic_program(env, gamma=1.0, eps=1e-8):
 V = np.zeros(env.nS)

 while True:
  Q     = np.zeros((env.nS, env.nA))
  delta = 0.0

  for s in range(env.nS):
   Vs = 0.0

   for a in range(env.nA):
    for prob, next_state, reward, done in env.P[s][a]:
     # Vs += (1.0 / env.nA) * prob * (reward + gamma * V[next_state])
     Q[s,a] += prob * (reward + gamma * V[next_state])

   Vs    = np.sum((1.0 / env.nA) * Q[s,])
   delta = max(delta, np.abs(V[s]-Vs))
   V[s]  = Vs

  if delta < eps:
   break

 return V, Q

In [None]:
dynamic_program(env)

In [None]:
V, _ = dynamic_program(env)
# Extracting Q from V with respect to a policy
gamma = 1.0
Q = np.zeros((env.nS, env.nA))

for s in range(env.nS):
 for a in range(env.nA):
  for prob, next_state, reward, done in env.P[s][a]:
   Q[s,a] += prob * (reward + gamma * V[next_state])

Q

In [None]:
# Policy improvement
policy = np.ones([env.nS, env.nA]) / env.nA
Q = np.zeros((env.nS, env.nA))

for s in range(env.nS):
 for a in range(env.nA):
  for prob, next_state, reward, done in env.P[s][a]:
   Q[s,a] += prob * (reward + gamma * V[next_state])

 a_opt     = np.flatnonzero(Q[s,] == np.max(Q[s,]))
 policy[s] = np.zeros(env.nA)
 policy[s,a_opt] = 1.0 / len(a_opt)

policy

In [None]:
def policy_evaluation(env, policy, gamma=1.0, eps=1e-8):
 V     = np.zeros(env.nS)

 while True:
  delta = 0.0

  for s in range(env.nS):
   Vs = 0.0

   for a, prob_a in enumerate(policy[s]):
    for prob, next_state, reward, done in env.P[s][a]:
     Vs += prob_a * prob * (reward + gamma * V[next_state])

   delta = max(delta, np.abs(V[s]-Vs))
   V[s]  = Vs

  if delta < eps:
   break

 return V

In [None]:
random_policy = np.ones([env.nS, env.nA]) / env.nA

V = policy_evaluation(env, random_policy)

In [None]:
plot_values(V)

In [None]:
# Extracting Q from V with respect to a policy
def policy_extraction(env, V, s, gamma=1.0):
 qs = np.zeros(env.nA)

 for a in range(env.nA):
  for prob, next_state, reward, done in env.P[s][a]:
   qs[a] += prob * (reward + gamma * V[next_state])

 return qs

In [None]:
V = policy_evaluation(env, random_policy)
policy_extraction(env, V, 4)

In [None]:
def policy_improvement(env, V, gamma=1.0):
 policy = np.zeros([env.nS, env.nA]) / env.nA

 for s in range(env.nS):
  qs    = policy_extraction(env, V, s, gamma)
  a_opt = np.flatnonzero(qs == np.max(qs))
  # Initialize policy vector
  policy[s] = np.zeros(env.nA)
  # Uniform probability of optimal action(s)
  policy[s,a_opt] = 1.0 / len(a_opt)

 return policy

In [None]:
policy_improvement(env, V)

In [None]:
def policy_iteration(env, gamma=1.0, eps=1e-8):
 pi = np.ones([env.nS, env.nA]) / env.nA

 while True:
  V     = policy_evaluation(env, pi, gamma, eps)
  newpi = policy_improvement(env, V, gamma)

  if (newpi == pi).all():
   break

  pi[:] = newpi

 return pi, V

In [None]:
pi, V_pi = policy_iteration(env)

In [None]:
plot_values(V_pi)

In [None]:
def value_iteration(env, gamma=1.0, eps=1e-8):
 V = np.zeros(env.nS)

 while True:
  delta = 0.0

  for s in range(env.nS):
   v     = V[s]
   V[s]  = max(policy_extraction(env, V, s, gamma))
   delta = max(delta, abs(V[s]-v))

  if delta < eps:
   break

 # policy = policy_improvement(env, V, gamma)
 # return policy, V
 return V

In [None]:
V_pi = value_iteration(env)

In [None]:
plot_values(V_pi)