In [37]:
import numpy as np

In [38]:
class Grid:
    def __init__(self, height, width):
        self.width = width
        self.height = height
        
    def configure(self, rewards, actions, transitions, start):
        self.rewards = rewards
        self.actions = actions
        self.transitions = transitions
        self.start = start
        self.restart()
        
    def restart(self, start=None, random=False):
        if random: 
            K = list(self.actions.keys())
            start = K[np.random.choice(len(K))]
        elif start is None: 
            start = self.start
        self.state = start
        
    def is_terminal_state(self, s):
        return s not in self.actions
    
    def is_game_over(self):
        return self.is_terminal_state(self.state)
    
    def all_states(self):
        for r in range(self.height):
            for c in range(self.width):
                yield (r,c)
    
    def all_actions(self):
        return list(set(s for a in std_grid.actions.values() for s in a))
    
    def move(self, a):
        if self.is_game_over(): return 0.0
        
        # get the transition distribution from the current state with action a
        targets = self.transitions(self.state, a)        
        if not targets: return 0 # move is not possible
        
        print (targets)
        K = list(targets.keys())
        V = list(targets.values())
        self.state = K[np.random.choice(len(K), p=V)]
        
        return self.rewards.get(self.state, 0.0)

In [39]:
def build_standard_grid():
    g = Grid(5,5)
    
    rewards = { (4,4): 10.0, 
                (2,2): -10.0,
                (4,1): 0.3
              }
    
    actions = { (0,0): ['R', 'D'],
                (0,1): ['L', 'D'],
                (0,3): ['D', 'R'],
                (0,4): ['D', 'L'],
                (1,0): ['U', 'D','R'],
                (1,1): ['U', 'D', 'R', 'L'],
                (1,2): ['D', 'R', 'L'],
                (1,3): ['U', 'D', 'R','L'],
                (1,4): ['D', 'L', 'U'],
                (2,0): ['U', 'D', 'R'],
                (2,1): ['U', 'D','L','R'],
                #(2,2): ['U', 'D', 'R', 'L'],
                (2,3): ['U', 'D', 'R','L'],
                (2,4): ['U', 'D','L'],
                (3,0): ['U', 'D', 'R'],
                (3,1): ['U', 'D', 'R','L'],
                (3,2): ['U', 'D', 'R','L'],
                (3,3): ['U', 'D', 'R','L'],
                (3,4): ['U', 'D','L'],
                (4,0): ['U', 'R'],
                (4,1): ['U', 'R','L'],
                (4,2): ['U', 'R','L'],
                (4,3): ['U', 'R','L']
              }
    
    def transitions(s, a):
        if s not in actions:return {}
        if a not in actions[s]: return {}
        if a=='D': return {(s[0]+1, s[1]  ): 1.0}
        if a=='U': return {(s[0]-1, s[1]  ): 1.0}
        if a=='R': return {(s[0]  , s[1]+1): 1.0}
        if a=='L': return {(s[0]  , s[1]-1): 1.0}

    
    g.configure(rewards, actions, transitions, (0.0))
    return g

In [40]:
def print_value(V, g):
    print ("Value function")
    for c in range(g.width):
        print("+-------", end='')
    print("+")
    for r in range(g.height):
        for c in range(g.width):
            print (f"| {V.get((r,c),0):+0.2f} ", end='')
        print("|")
        for c in range(g.width):
            print("+-------", end='')
        print("+")
        
        
def print_policy(pi, g):
    print ("Policy")
    
    for c in range(g.width):
        print("+-------------", end='')
    print("+")
    
    for r in range(g.height):
        for c in range(g.width):
            s = (r,c)
            print (f"|    {pi(g, 'U', s):+0.2f}    ", end='')
        print("|")
        for c in range(g.width):
            s = (r,c)
            print (f"| {pi(g, 'L', s):+0.2f} {pi(g, 'R', s):+0.2f} ", end='')
        print("|")
        for c in range(g.width):
            s = (r,c)
            print (f"|    {pi(g, 'D', s):+0.2f}    ", end='')
        print("|")
        for c in range(g.width):
            print("+-------------", end='')
        print("+")

In [41]:
def iterative_policy_evaluation(grid, pi, gamma, debug=False):
    
    # initialize the value function to 0
    V = { s:0.0 for s in grid.all_states() }
    
    
    # loop until stabilization
    loop = 0
    while True:
        if debug: print (f"--- Loop #{loop}")
        loop += 1
        
        delta = 0
        
        # enumerate all states
        for s in grid.all_states():
            if debug: print (f"   State {s}")
            
            # if terminal, the value is still zero
            if grid.is_terminal_state(s):
                if debug: print (f"   -> terminal")
                continue
                
                
            # Bellman equation : sum over all available actions
            v = 0
            for a in grid.actions[s]:
                if debug: print (f"      Action: {a}")
                
                # get the probability of taking action 'a' while in state 's'
                pa = pi(grid, a, s)
                if debug: print (f"          pa={pa}")
                if pa == 0: continue
                
                # get the distribution of possible targets if we take action 'a' from state 's'
                targets = grid.transitions(s,a)
                if debug: print (f"          targets={targets}")
                if not targets: continue
                
                for next_state, prob in targets.items():
                    if debug: print (f"              next_state={next_state}, prob={prob}")
                    if debug: print (f"              V[next_state]={V[next_state]}")
                    v += pa * prob * (grid.rewards.get(next_state, 0.0) + gamma * V[next_state])

            
            delta = max(delta, abs(v - V[s]))
            if debug: print (f"   Vs={V[s]} -> {v} : delta={delta}")
                    
            V[s] = v
            
        if (delta < 1e-3):
            break
                
    return V

In [42]:
def random_policy(grid, a, s):
    if s not in grid.actions: return 0.0
    if a not in grid.actions[s]: return 0.0
    return 1.0 / len(grid.actions[s])

std_grid = build_standard_grid()

V = iterative_policy_evaluation(std_grid, random_policy, 0.9)

print_policy(random_policy, std_grid)
print_value (V, std_grid)

Policy
+-------------+-------------+-------------+-------------+-------------+
|    +0.00    |    +0.00    |    +0.00    |    +0.00    |    +0.00    |
| +0.00 +0.50 | +0.50 +0.00 | +0.00 +0.00 | +0.00 +0.50 | +0.50 +0.00 |
|    +0.50    |    +0.50    |    +0.00    |    +0.50    |    +0.50    |
+-------------+-------------+-------------+-------------+-------------+
|    +0.33    |    +0.25    |    +0.00    |    +0.25    |    +0.33    |
| +0.00 +0.33 | +0.25 +0.25 | +0.33 +0.33 | +0.25 +0.25 | +0.33 +0.00 |
|    +0.33    |    +0.25    |    +0.33    |    +0.25    |    +0.33    |
+-------------+-------------+-------------+-------------+-------------+
|    +0.33    |    +0.25    |    +0.00    |    +0.25    |    +0.33    |
| +0.00 +0.33 | +0.25 +0.25 | +0.00 +0.00 | +0.25 +0.25 | +0.33 +0.00 |
|    +0.33    |    +0.25    |    +0.00    |    +0.25    |    +0.33    |
+-------------+-------------+-------------+-------------+-------------+
|    +0.33    |    +0.25    |    +0.25    |    +0.25    |

In [43]:
def fixed_policy(grid, a, s):
    fixed = { (0,0):'D', (1,0):'D', (2,0):'D', (3,0):'D',
              (4,0):'R', (4,1):'R', (4,2):'R', (4,3):'R',
              (0,4):'D', (1,4):'D', (2,4):'D', (3,4):'D',
             (0,1):'D', (1,1):'R', (1,2):'R', (1,3): 'R',
             (2,1):'R',(2,2):'R', (2,3):'R', (0,3) :'R',
             (3,1):'R',(3,2):'R', (3,3):'R'
            }
    return 1.0 if s in fixed and fixed[s] == a else 0.0


V = iterative_policy_evaluation(std_grid, fixed_policy, 0.9)

print_policy(fixed_policy, std_grid)
print_value (V, std_grid)

Policy
+-------------+-------------+-------------+-------------+-------------+
|    +0.00    |    +0.00    |    +0.00    |    +0.00    |    +0.00    |
| +0.00 +0.00 | +0.00 +0.00 | +0.00 +0.00 | +0.00 +1.00 | +0.00 +0.00 |
|    +1.00    |    +1.00    |    +0.00    |    +0.00    |    +1.00    |
+-------------+-------------+-------------+-------------+-------------+
|    +0.00    |    +0.00    |    +0.00    |    +0.00    |    +0.00    |
| +0.00 +0.00 | +0.00 +1.00 | +0.00 +1.00 | +0.00 +1.00 | +0.00 +0.00 |
|    +1.00    |    +0.00    |    +0.00    |    +0.00    |    +1.00    |
+-------------+-------------+-------------+-------------+-------------+
|    +0.00    |    +0.00    |    +0.00    |    +0.00    |    +0.00    |
| +0.00 +0.00 | +0.00 +1.00 | +0.00 +1.00 | +0.00 +1.00 | +0.00 +0.00 |
|    +1.00    |    +0.00    |    +0.00    |    +0.00    |    +1.00    |
+-------------+-------------+-------------+-------------+-------------+
|    +0.00    |    +0.00    |    +0.00    |    +0.00    |

In [44]:
def build_fixed_policy(state_action_map):
    return lambda grid, a, s: 1.0 if s in state_action_map and state_action_map[s] == a else 0.0

def policy_iteration(grid, gamma, debug=False):
    # randomly initialize the actions at each state
    V = { s:np.random.random() for s in grid.all_states() }
    A = { s:np.random.choice([a for a in grid.all_actions()]) for s in grid.all_states() }
    
    # create a policy function
    pi = build_fixed_policy(A)

    loop = 0
    while True:
        if debug: print (f"Loop #{loop}: A={A}")
        loop+=1
        
        # policy evaluation
        V = iterative_policy_evaluation(grid, pi, 0.9)
        if debug: print (f"         V={V}")
        
        # policy improvement
        policy_changed = False
        for s in grid.all_states():
            if debug: print (f"    State {s}")
            
            # init max with current values
            maxV = float('-inf')
            maxA = ''
                        
            for a in grid.all_actions():
                
                # get p(s′,r|s,a), the distribution of possible targets if we take action 'a' from state 's'
                targets = grid.transitions(s,a)
                if not targets: continue
                
                v = 0
                for next_state, prob in targets.items():
                    v += prob * (grid.rewards.get(next_state, 0.0) + gamma * V[next_state])
                
                if debug: print (f"      {a} -> {v}")
                if v >= maxV:
                    maxV = v
                    maxA = a

            if maxA != A[s]:
                A[s] = maxA
                policy_changed = True
            
        if not policy_changed:
            break
            
    # last value evaluation
    V = iterative_policy_evaluation(grid, pi, 0.9)

    return A, V

In [45]:
def build_negative_grid(step_cost=-0.1):
    g = build_standard_grid()
    g.rewards.update({ s:step_cost for s in g.actions.keys() })
    g.rewards.update({ (0,0):2*step_cost })
    return g
    
neg_grid = build_negative_grid(-0.1)

A,V = policy_iteration(neg_grid, 0.9)

print_value (neg_grid.rewards, neg_grid)
print_policy(random_policy, neg_grid)
print_value (V, neg_grid)

Value function
+-------+-------+-------+-------+-------+
| -0.20 | -0.10 | +0.00 | -0.10 | -0.10 |
+-------+-------+-------+-------+-------+
| -0.10 | -0.10 | -0.10 | -0.10 | -0.10 |
+-------+-------+-------+-------+-------+
| -0.10 | -0.10 | -10.00 | -0.10 | -0.10 |
+-------+-------+-------+-------+-------+
| -0.10 | -0.10 | -0.10 | -0.10 | -0.10 |
+-------+-------+-------+-------+-------+
| -0.10 | -0.10 | -0.10 | -0.10 | +10.00 |
+-------+-------+-------+-------+-------+
Policy
+-------------+-------------+-------------+-------------+-------------+
|    +0.00    |    +0.00    |    +0.00    |    +0.00    |    +0.00    |
| +0.00 +0.50 | +0.50 +0.00 | +0.00 +0.00 | +0.00 +0.50 | +0.50 +0.00 |
|    +0.50    |    +0.50    |    +0.00    |    +0.50    |    +0.50    |
+-------------+-------------+-------------+-------------+-------------+
|    +0.33    |    +0.25    |    +0.00    |    +0.25    |    +0.33    |
| +0.00 +0.33 | +0.25 +0.25 | +0.33 +0.33 | +0.25 +0.25 | +0.33 +0.00 |
|    +0.33

In [46]:
wind_grid = build_windy_grid(-0.1)

A,V = policy_iteration(wind_grid, 0.5)

print_value (wind_grid.rewards, wind_grid)
print_policy(fixed_policy, wind_grid)
print_value (V, wind_grid)

Value function
+-------+-------+-------+-------+-------+
| -0.20 | -0.10 | +0.00 | -0.10 | -0.10 |
+-------+-------+-------+-------+-------+
| -0.10 | -0.10 | -0.10 | -0.10 | -0.10 |
+-------+-------+-------+-------+-------+
| -0.10 | -0.10 | -10.00 | -0.10 | -0.10 |
+-------+-------+-------+-------+-------+
| -0.10 | -0.10 | -0.10 | -0.10 | -0.10 |
+-------+-------+-------+-------+-------+
| -0.10 | -0.10 | -0.10 | -0.10 | +10.00 |
+-------+-------+-------+-------+-------+
Policy
+-------------+-------------+-------------+-------------+-------------+
|    +0.00    |    +0.00    |    +0.00    |    +0.00    |    +0.00    |
| +0.00 +0.00 | +0.00 +0.00 | +0.00 +0.00 | +0.00 +1.00 | +0.00 +0.00 |
|    +1.00    |    +1.00    |    +0.00    |    +0.00    |    +1.00    |
+-------------+-------------+-------------+-------------+-------------+
|    +0.00    |    +0.00    |    +0.00    |    +0.00    |    +0.00    |
| +0.00 +0.00 | +0.00 +1.00 | +0.00 +1.00 | +0.00 +1.00 | +0.00 +0.00 |
|    +1.00

In [47]:
def build_windy_grid(step_cost=-0.1):
    g = build_negative_grid(step_cost)
    
    def transitions(s, a):
        
        if s not in g.actions: return {}
        if a not in g.actions[s]: return {}
        
        n = len(g.actions[s]) - 1
        
        go_u = ( min(g.height-1,s[0]+1), s[1]                )
        go_d = ( max(0, s[0]-1)        , s[1]                )
        go_r = ( s[0]                  , min(g.width-1,s[1]+1) )
        go_l = ( s[0]                  , max(0, s[1]-1)      )
        
        if a=='U': return {go_u: 0.8 , go_d:0, go_r:0.2, go_l:0 }
        if a=='D': return {go_u: 0, go_d: 0.8 , go_r:0, go_l:0.2 }
        if a=='R': return {go_u: 0, go_d:0.2, go_r: 0.8, go_l:0 }
        if a=='L': return {go_u: 0.2, go_d:0, go_r:0, go_l: 0.8  }

    g.transitions = transitions
    return g

wind_grid = build_windy_grid()

A,V = policy_iteration(wind_grid, 0.9, debug=False)

print_value (wind_grid.rewards, wind_grid)
print_policy(build_fixed_policy(A), wind_grid)
print_value (V, wind_grid)

Value function
+-------+-------+-------+-------+-------+
| -0.20 | -0.10 | +0.00 | -0.10 | -0.10 |
+-------+-------+-------+-------+-------+
| -0.10 | -0.10 | -0.10 | -0.10 | -0.10 |
+-------+-------+-------+-------+-------+
| -0.10 | -0.10 | -10.00 | -0.10 | -0.10 |
+-------+-------+-------+-------+-------+
| -0.10 | -0.10 | -0.10 | -0.10 | -0.10 |
+-------+-------+-------+-------+-------+
| -0.10 | -0.10 | -0.10 | -0.10 | +10.00 |
+-------+-------+-------+-------+-------+
Policy
+-------------+-------------+-------------+-------------+-------------+
|    +0.00    |    +0.00    |    +0.00    |    +0.00    |    +0.00    |
| +0.00 +1.00 | +1.00 +0.00 | +0.00 +0.00 | +0.00 +1.00 | +1.00 +0.00 |
|    +0.00    |    +0.00    |    +0.00    |    +0.00    |    +0.00    |
+-------------+-------------+-------------+-------------+-------------+
|    +1.00    |    +1.00    |    +0.00    |    +1.00    |    +1.00    |
| +0.00 +0.00 | +0.00 +0.00 | +0.00 +1.00 | +0.00 +0.00 | +0.00 +0.00 |
|    +0.00

In [48]:
def value_iteration(grid, gamma, debug=False):
    
    # initialize the value function to 0
    V = { s:0.0 for s in grid.all_states() }
    
    
    # loop until stabilization
    loop = 0
    while True:
        if debug: print (f"--- Loop #{loop}")
        loop += 1
        
        delta = 0
        
        # enumerate all states
        for s in grid.all_states():
            if debug: print (f"   State {s}")
            
            # if terminal, the value is still zero
            if grid.is_terminal_state(s):
                if debug: print (f"   -> terminal")
                continue
                
                
            maxV = float('-inf')
            for a in grid.all_actions():
                
                # get p(s′,r|s,a), the distribution of possible targets if we take action 'a' from state 's'
                targets = grid.transitions(s,a)
                if not targets: continue
                
                v = 0
                for next_state, prob in targets.items():
                    v += prob * (grid.rewards.get(next_state, 0.0) + gamma * V[next_state])
                
                if debug: print (f"      {a} -> {v}")
                if v >= maxV:
                    maxV = v
            
            delta = max(delta, abs(maxV - V[s]))
            if debug: print (f"   Vs={V[s]} -> {maxV} : delta={delta}")
                    
            V[s] = maxV
            
        if (delta < 1e-3):
            break
    
    for s in grid.all_states():
        if debug: print (f"    State {s}")

        maxV = float('-inf')
        maxA = ''

        for a in grid.all_actions():

            # get p(s′,r|s,a), the distribution of possible targets if we take action 'a' from state 's'
            targets = grid.transitions(s,a)
            if not targets: continue

            v = 0
            for next_state, prob in targets.items():
                v += prob * (grid.rewards.get(next_state, 0.0) + gamma * V[next_state])

            if debug: print (f"      {a} -> {v}")
            if v >= maxV:
                maxV = v
                maxA = a

        A[s] = maxA

    return A, V

In [49]:
A,V = value_iteration(wind_grid, 0.9, debug=False)

print_value (wind_grid.rewards, wind_grid)
print_policy(build_windy_policy(A), std_grid)
print_value (V, std_grid)

Value function
+-------+-------+-------+-------+-------+
| -0.20 | -0.10 | +0.00 | -0.10 | -0.10 |
+-------+-------+-------+-------+-------+
| -0.10 | -0.10 | -0.10 | -0.10 | -0.10 |
+-------+-------+-------+-------+-------+
| -0.10 | -0.10 | -10.00 | -0.10 | -0.10 |
+-------+-------+-------+-------+-------+
| -0.10 | -0.10 | -0.10 | -0.10 | -0.10 |
+-------+-------+-------+-------+-------+
| -0.10 | -0.10 | -0.10 | -0.10 | +10.00 |
+-------+-------+-------+-------+-------+


NameError: name 'build_windy_policy' is not defined

In [50]:

def first_visit_mc_predition(grid, pi, N, gamma):

    states_and_returns = {} # key:state - value:array of returns
    V = {}
    
    for i in range(N):
        snr = play_episode(grid, pi, gamma)
        seen_states = set()
        for (s,g) in snr:
            if s not in seen_states:
                seen_states.add(s)
                if s in states_and_returns:
                    states_and_returns[s].append(g)
                else:
                    states_and_returns[s] = [g]
        
        for s, returns in states_and_returns.items():
            V[s] = np.mean(returns)
            
    return V

In [None]:
# select an action to do from state s according to the policy pi
def select_action(grid, pi, s):
    A = grid.actions[s]
    p = [pi(grid, a, s) for a in A]
    return np.random.choice(A, p=p)
    

def play_episode(grid, pi, gamma):
    grid.restart(random=True)
    
    # play until gme is over
    s = grid.state 
    states_and_rewards = [(s,0)]
    while not grid.is_game_over():    
        a = select_action(grid, pi, s)
        print(s,"->", a)
        r = grid.move(a)
        s = grid.state
        states_and_rewards.append((s,r))
        
    # iteratively compute returns from rewards
    G = 0
    states_and_returns = []
    for s,r in reversed(states_and_rewards):
        states_and_returns.append((s,G))
        G = r + gamma * G

    states_and_returns.reverse()

    return states_and_returns

In [51]:

V = first_visit_mc_predition(wind_grid, fixed_policy, 100, 0.9)

print_policy(fixed_policy, wind_grid)
print_value (V, std_grid)

(3, 1) -> R
{(4, 1): 0, (2, 1): 0.2, (3, 2): 0.8, (3, 0): 0}
(3, 2) -> R
{(4, 2): 0, (2, 2): 0.2, (3, 3): 0.8, (3, 1): 0}
(3, 3) -> R
{(4, 3): 0, (2, 3): 0.2, (3, 4): 0.8, (3, 2): 0}
(3, 4) -> D
{(4, 4): 0, (2, 4): 0.8, (3, 4): 0, (3, 3): 0.2}
(3, 3) -> R
{(4, 3): 0, (2, 3): 0.2, (3, 4): 0.8, (3, 2): 0}
(3, 4) -> D
{(4, 4): 0, (2, 4): 0.8, (3, 4): 0, (3, 3): 0.2}
(2, 4) -> D
{(3, 4): 0, (1, 4): 0.8, (2, 4): 0, (2, 3): 0.2}
(1, 4) -> D
{(2, 4): 0, (0, 4): 0.8, (1, 4): 0, (1, 3): 0.2}
(0, 4) -> D
{(1, 4): 0, (0, 4): 0, (0, 3): 0.2}


ValueError: probabilities do not sum to 1