For dynamic programming, we had to know all the states, be able to put the agent into any state, know the transition probabilities, etc.  It is a great solution technique when you know everything about the environment.  Monte Carlo (MC) methods instead learn only from experience.  In RL, the random quantity is the return $G$.  Instead of calculating the expectation of $G$, we estimate it with a sample mean.  This only works for episodic tasks, because the episode must terminate before we can calculate returns.  The methods are similar to the multi-armed bandit: each state behaves like its own bandit problem.  We start with prediction and then move to control.
$$
V_\pi(s) \approx \frac{1}{N} \sum_i G_{i,s}
$$
We generate $G$ by playing a bunch of episodes, logging states and rewards.  We calculate $G$ by
$$
G(t)= r(t+1)+\gamma G(t+1),
$$
iterating through states in reverse order, and once we have all (s,G) pairs we average for each s.


What happens if you visit a state more than once in an episode (e.g. at $t=1,3$)?  Either you use first-visit MC (only $G(1)$) or every-visit MC (both $G(1)$ and $G(3)$); both lead to the same answer for large $N$.

Note that because we are using a sample mean the central limit theorem still applies

In [16]:
import numpy as np


class Grid:  # Environment
    """Grid-world environment that tracks a single agent position.

    The position is kept as (i, j) = (row, column).  The reward table and
    the table of legal actions are attached afterwards through ``set``.
    ``width`` and ``height`` are stored as given and are not used by the
    class itself.
    """

    # (row delta, column delta) applied by each action label
    _STEP = {'U': (-1, 0), 'D': (1, 0), 'R': (0, 1), 'L': (0, -1)}

    def __init__(self, width, height, start):
        self.width = width
        self.height = height
        self.i, self.j = start[0], start[1]

    def set(self, rewards, actions):
        """Attach the reward and action tables.

        rewards: dict of (i, j) -> reward received on arriving at that cell
        actions: dict of (i, j) -> collection of actions allowed in that cell
        """
        self.rewards, self.actions = rewards, actions

    def set_state(self, s):
        """Teleport the agent to state ``s`` = (i, j)."""
        self.i, self.j = s[0], s[1]

    def current_state(self):
        """Return the agent position as an (i, j) tuple."""
        return (self.i, self.j)

    def is_terminal(self, s):
        """A state is terminal when it has no entry in the action table."""
        return s not in self.actions

    def move(self, action):
        """Apply ``action`` if it is legal here and return the reward.

        An action that is not listed for the current state leaves the agent
        where it is.  The reward is looked up for the resulting position and
        defaults to 0 when that position has no reward entry.
        """
        here = (self.i, self.j)
        if action in self.actions[here]:
            di, dj = self._STEP.get(action, (0, 0))
            self.i += di
            self.j += dj
        return self.rewards.get((self.i, self.j), 0)

    def undo_move(self, action):
        """Step in the direction opposite to ``action``."""
        di, dj = self._STEP.get(action, (0, 0))
        self.i -= di
        self.j -= dj
        # sanity check: we must land on a known state (should never fail)
        assert self.current_state() in self.all_states()

    def game_over(self):
        """Return True when no action is possible from the current position."""
        return self.is_terminal(self.current_state())

    def all_states(self):
        """Return the set of every state that has actions or a reward.

        Simple but possibly incomplete: a cell with neither legal actions
        nor a reward entry is not reported.
        """
        return set(self.actions) | set(self.rewards)


def standard_grid():
    """Build the 3x4 grid world used by the examples in this file.

    Layout (x = blocked cell, s = start, number = reward for arriving there):

        .  .  .  1
        .  x  . -1
        s  .  .  .

    Returns a Grid positioned at (2, 0) with the two terminal rewards and the
    legal actions of every non-terminal cell already attached.
    """
    terminal_rewards = {(0, 3): 1, (1, 3): -1}
    legal_moves = {
        # top row
        (0, 0): ('D', 'R'),
        (0, 1): ('L', 'R'),
        (0, 2): ('L', 'D', 'R'),
        # middle row ((1, 1) is blocked, (1, 3) is terminal)
        (1, 0): ('U', 'D'),
        (1, 2): ('U', 'D', 'R'),
        # bottom row
        (2, 0): ('U', 'R'),
        (2, 1): ('L', 'R'),
        (2, 2): ('L', 'R', 'U'),
        (2, 3): ('L', 'U'),
    }
    world = Grid(3, 4, (2, 0))
    world.set(terminal_rewards, legal_moves)
    return world


def negative_grid(step_cost=-0.1):
    """Return the standard grid with a per-step penalty added.

    Every state that has legal actions (the nine non-terminal cells of
    ``standard_grid``) gets ``step_cost`` as its arrival reward, so shorter
    paths are preferred.  The terminal rewards are left untouched.
    """
    world = standard_grid()
    for state in world.actions:
        world.rewards[state] = step_cost
    return world



In [17]:
SMALL_ENOUGH = 10 ** -4


def print_values(V, g):
    """Print a table of values, one text row per grid row.

    V: dict of (i, j) -> number; missing cells are shown as 0
    g: object exposing ``width`` (rows iterated as i) and ``height``
       (columns iterated as j)
    """
    for row in range(g.width):
        print("-------------------------")
        cells = []
        for col in range(g.height):
            value = V.get((row, col), 0)
            # non-negative numbers get a leading space so columns line up
            # with the minus sign of negative ones
            template = " %.2f|" if value >= 0 else "%.2f|"
            cells.append(template % value)
        print("".join(cells))
        
def print_policy(P, g):
    """Print a table of actions, one text row per grid row.

    P: dict of (i, j) -> action label; missing cells are shown as a blank
    g: object exposing ``width`` (rows iterated as i) and ``height``
       (columns iterated as j)

    Each row is preceded by an empty line and a separator; no newline is
    written after the last row.
    """
    for row in range(g.width):
        print("")
        print("----------------")
        rendered = "".join(
            " %s |" % P.get((row, col), ' ') for col in range(g.height)
        )
        print(rendered, end="")

In [3]:
ALL_POSSIBLE_ACTIONS = ('U', 'D', 'L', 'R')
GAMMA = 0.9


# Monte Carlo policy evaluation: episode generator
def play_game(grid, policy):
    """Play one episode under ``policy`` and return (state, return) pairs.

    The episode starts from a uniformly random non-terminal state (any key of
    ``grid.actions``) and follows ``policy`` until ``grid.game_over()``.

    Returns a list of (state, G) tuples in the order the states were visited,
    where G is the discounted return (discount GAMMA) collected after leaving
    that state.  The terminal state is not included.
    """
    candidates = list(grid.actions.keys())
    grid.set_state(candidates[np.random.choice(len(candidates))])

    # trajectory holds (state, reward received on arriving in that state)
    state = grid.current_state()
    trajectory = [(state, 0)]
    while not grid.game_over():
        reward = grid.move(policy[state])
        state = grid.current_state()
        trajectory.append((state, reward))

    # Walk the trajectory backwards, accumulating G(t) = r(t+1) + GAMMA*G(t+1).
    ret = 0
    pairs = []
    last = len(trajectory) - 1
    for idx in range(last, -1, -1):
        state, reward = trajectory[idx]
        if idx != last:
            # the final entry is the terminal state, whose value is 0 by
            # definition, so it is skipped
            pairs.append((state, ret))
        ret = reward + GAMMA * ret
    return pairs[::-1]

        

In [4]:
# First-visit Monte Carlo prediction for a fixed deterministic policy on the
# standard grid: play episodes, and estimate V(s) as the mean of the returns
# observed the first time s appears in each episode.
grid = standard_grid()

# print rewards
print("rewards:")
print_values(grid.rewards, grid)

# state -> action (the policy being evaluated)
policy = {
(2, 0): 'U',
(1, 0): 'U',
(0, 0): 'R',
(0, 1): 'R',
(0, 2): 'R',
(1, 2): 'R',
(2, 1): 'R',
(2, 2): 'R',
(2, 3): 'U',
}

V = {}
returns = {} # dictionary of state -> list of returns we've received
states = grid.all_states()
for s in states:
    if s in grid.actions.keys():
        # non-terminal state: start with an empty sample list
        returns[s] = []
    else:
        # terminal state or state we can't otherwise get to: value fixed at 0
        V[s] = 0
# now that everything is initialized, repeatedly sample episodes and update V
for t in range(100):
    # generate one episode as a list of (state, return) pairs
    state_return=play_game(grid,policy)
    # states already updated in this episode (first-visit MC)
    seen_states=set()
    for s,G in state_return:
        if s not in seen_states:
            returns[s].append(G)
            # sample mean over every return recorded for s so far
            V[s]=np.mean(returns[s])
            seen_states.add(s)

print("values")
print("")
print_values(V,grid)
print("policy")
print_policy(policy,grid)
        



rewards:
-------------------------
 0.00| 0.00| 0.00| 1.00|
-------------------------
 0.00| 0.00| 0.00|-1.00|
-------------------------
 0.00| 0.00| 0.00| 0.00|
values

-------------------------
 0.81| 0.90| 1.00| 0.00|
-------------------------
 0.73| 0.00|-1.00| 0.00|
-------------------------
 0.66|-0.81|-0.90|-1.00|
policy

----------------
 R | R | R |   |
----------------
 U |   | R |   |
----------------
 U | R | R | U |

In [5]:
# Windy gridworld: the action actually taken may differ from the one the
# policy chose (see random_action below), so transitions are stochastic.

grid = standard_grid()
def random_action(a):
    """Return ``a`` half of the time, otherwise one of the other actions.

    With probability 0.5 the requested action is kept; otherwise an action is
    drawn uniformly from ALL_POSSIBLE_ACTIONS with ``a`` removed.  Raises
    ValueError if ``a`` is not one of ALL_POSSIBLE_ACTIONS and the second
    branch is taken.
    """
    if np.random.random() < .5:
        return a
    alternatives = list(ALL_POSSIBLE_ACTIONS)
    # drop the requested action so the "wind" always changes the move
    alternatives.pop(alternatives.index(a))
    return np.random.choice(alternatives)
    

ALL_POSSIBLE_ACTIONS = ('U', 'D', 'L', 'R')
GAMMA = 0.9


# Monte Carlo policy evaluation: episode generator for the windy grid
def play_game(grid, policy):
    """Play one windy episode under ``policy`` and return (state, return) pairs.

    Same as the deterministic version except that every action chosen by the
    policy is passed through ``random_action`` before being applied.

    The episode starts from a uniformly random non-terminal state (any key of
    ``grid.actions``).  Returns a list of (state, G) tuples in visiting
    order, where G is the return discounted by GAMMA; the terminal state is
    not included.
    """
    candidates = list(grid.actions.keys())
    grid.set_state(candidates[np.random.choice(len(candidates))])

    # trajectory holds (state, reward received on arriving in that state)
    state = grid.current_state()
    trajectory = [(state, 0)]
    while not grid.game_over():
        taken = random_action(policy[state])  # the wind may override the policy
        reward = grid.move(taken)
        state = grid.current_state()
        trajectory.append((state, reward))

    # Walk the trajectory backwards, accumulating G(t) = r(t+1) + GAMMA*G(t+1).
    ret = 0
    pairs = []
    last = len(trajectory) - 1
    for idx in range(last, -1, -1):
        state, reward = trajectory[idx]
        if idx != last:
            # the final entry is the terminal state, whose value is 0
            pairs.append((state, ret))
        ret = reward + GAMMA * ret
    return pairs[::-1]

        
    
# First-visit Monte Carlo prediction on the windy grid: same procedure as the
# deterministic case, but more episodes are sampled because returns are noisy.

# print rewards
print("rewards:")
print_values(grid.rewards, grid)

# state -> action (the policy being evaluated)
policy = {
    (2, 0): 'U',
    (1, 0): 'U',
    (0, 0): 'R',
    (0, 1): 'R',
    (0, 2): 'R',
    (1, 2): 'U',
    (2, 1): 'L',
    (2, 2): 'U',
    (2, 3): 'L',
  }

V = {}
returns = {} # dictionary of state -> list of returns we've received
states = grid.all_states()
for s in states:
    if s in grid.actions.keys():
        # non-terminal state: start with an empty sample list
        returns[s] = []
    else:
        # terminal state or state we can't otherwise get to: value fixed at 0
        V[s] = 0
# now that everything is initialized, repeatedly sample episodes and update V
for t in range(5000):
    # generate one episode as a list of (state, return) pairs
    state_return=play_game(grid,policy)
    # states already updated in this episode (first-visit MC)
    seen_states=set()
    for s,G in state_return:
        if s not in seen_states:
            returns[s].append(G)
            # sample mean over every return recorded for s so far
            V[s]=np.mean(returns[s])
            seen_states.add(s)

print("values")
print("")
print_values(V,grid)
print("policy")
print_policy(policy,grid)
        


rewards:
-------------------------
 0.00| 0.00| 0.00| 1.00|
-------------------------
 0.00| 0.00| 0.00|-1.00|
-------------------------
 0.00| 0.00| 0.00| 0.00|
values

-------------------------
 0.43| 0.56| 0.72| 0.00|
-------------------------
 0.33| 0.00| 0.21| 0.00|
-------------------------
 0.25| 0.19| 0.12|-0.15|
policy

----------------
 R | R | R |   |
----------------
 U |   | U |   |
----------------
 U | L | U | L |

MC for the control problem 

The key is to choose $\text{argmax}_a Q(s,a)$, so episodes now return triples $(s,a,G)$.  This means we must estimate $|S|\times |A|$ values instead of just $|S|$, so we need many more iterations of MC.  Further, with a fixed policy we only take one action per state, so most $(s,a)$ pairs would never be sampled.  We can fix this by using the "exploring starts" method, in which we choose a random initial state and a random initial action, and thereafter follow the policy.  This makes sense given our definition of $Q$:
$$
Q_\pi(s,a)=E_\pi \left[ G(t) \, | \, S_t=s, A_t=a\right]
$$
Then we deal with the control problem by alternating between policy evaluation and greedy policy improvement. The improvement is the same as before:
$$
\pi(s) = \text{argmax}_a Q(s,a)
$$

Like in value iteration, we do not start a fresh MC evaluation each round, but instead keep updating the same $Q$, doing policy improvement after each episode.
One side note: we avoid getting stuck in a state (e.g. repeatedly bumping into a wall) by giving a reward of -100 and ending the episode whenever an action leaves us in a state we have already visited.  Although it has not been formally proven, we achieve stability when both the value function and the policy converge to the optimal value function and the optimal policy.


In [19]:
ALL_POSSIBLE_ACTIONS = ('U', 'D', 'L', 'R')
GAMMA = 0.9


# Monte Carlo control: episode generator with exploring starts
def play_game_es(grid, policy):
    """Play one exploring-starts episode and return (state, action, return).

    The episode begins in a uniformly random non-terminal state (any key of
    ``grid.actions``) with a uniformly random first action drawn from
    ALL_POSSIBLE_ACTIONS; afterwards ``policy`` is followed.

    The episode ends when the grid reports game over, or as soon as the agent
    lands on a state it has already occupied in this episode (which includes
    bumping into a wall and staying put).  In the latter case the final
    reward is -100 so that looping / illegal actions are learned to be bad.

    Returns a list of (state, action, G) triples in the order visited, where
    G is the return discounted by GAMMA that followed taking ``action`` in
    ``state``.  The final state of the episode is not included.
    """
    start_states = list(grid.actions.keys())
    start_idx = np.random.choice(len(start_states))
    grid.set_state(start_states[start_idx])

    s = grid.current_state()
    a = np.random.choice(ALL_POSSIBLE_ACTIONS)  # THIS IS WHAT MAKES IT EXPLORING START

    # The start state counts as visited.  Without it, a wall bump on the very
    # first action (or a loop back to the start) went undetected, so the same
    # (state, action) pair received -100 mid-episode but an ordinary return
    # as an exploring start, and the two were averaged into one Q value.
    seen_states = {s}
    state_action_reward = [(s, a, 0)]  # (state, action taken there, reward on arrival)
    while True:
        r = grid.move(a)
        s = grid.current_state()
        if s in seen_states:
            # revisited a state: penalise and stop to avoid endless episodes
            state_action_reward.append((s, None, -100))
            break
        elif grid.game_over():
            state_action_reward.append((s, None, r))
            break
        else:
            a = policy[s]
            state_action_reward.append((s, a, r))
        seen_states.add(s)

    # Walk the episode backwards, accumulating G(t) = r(t+1) + GAMMA*G(t+1).
    G = 0
    state_action_return = []
    first = True
    for s, a, r in reversed(state_action_reward):
        if first:
            # the last entry is the end of the episode; nothing follows it,
            # so it contributes no (state, action, return) sample
            first = False
        else:
            state_action_return.append((s, a, G))
        G = r + GAMMA * G
    state_action_return.reverse()

    return state_action_return

def max_dict(d):
    """Return the (key, value) pair with the largest value in ``d``.

    Ties go to the key encountered first.  For an empty dict (or one whose
    values never exceed negative infinity) the result is (None, -inf).
    """
    best = (None, float('-inf'))
    for key, val in d.items():
        if val > best[1]:
            best = (key, val)
    return best

In [23]:
# Monte Carlo control with exploring starts on the step-penalised grid:
# alternate one episode of first-visit Q evaluation with a greedy policy
# improvement, keeping the same Q table across episodes.
grid = negative_grid()
#print rewards
print("rewards:")
print_values(grid.rewards, grid)

# start from a random policy over the non-terminal states
policy={}
for s in grid.actions.keys():
    policy[s]=np.random.choice(ALL_POSSIBLE_ACTIONS)
    
# Q[s][a] is the current estimate; returns[(s, a)] holds every sampled return
Q={}
returns={}
states = grid.all_states()
#this is just initialization
for s in states:
    if s in grid.actions.keys():
        Q[s]={}
        for a in ALL_POSSIBLE_ACTIONS:
            Q[s][a]=0
            returns[(s,a)]=[]
    else:
        pass #terminal state or state cannot reach
    
#Now we start
deltas=[] #for debugging, but could be used to test for convergence
for t in range(20000):
    # progress indicator
    if t % 1000 ==0:
        print(t)
    #generate episode using current policy
    biggest_change=0
    state_action_return=play_game_es(grid,policy)
    # (state, action) pairs already updated in this episode
    seen_SAR=set()
    #update samples of return function
    for (s,a,G) in state_action_return:
        #check if seen bc using first-visit policy
        sa=(s,a)
        if sa not in seen_SAR:
            old_q=Q[s][a]
            returns[sa].append(G)
            # sample mean over every return recorded for this pair so far
            Q[s][a]=np.mean(returns[sa])
            biggest_change=max(biggest_change,abs(old_q-Q[s][a]))
            seen_SAR.add(sa)
    deltas.append(biggest_change)
    
    #now update policy: act greedily with respect to the current Q
    
    for s in policy.keys():
        policy[s]=max_dict(Q[s])[0] #taking the action with max value
        
#now find values V
V={}
for s, Qs in Q.items():
    V[s]=max_dict(Q[s])[1] #get value of best action
    

print("values")
print("")
print_values(V,grid)
print("policy")
print_policy(policy,grid)

rewards:
-------------------------
-0.10|-0.10|-0.10| 1.00|
-------------------------
-0.10| 0.00|-0.10|-1.00|
-------------------------
-0.10|-0.10|-0.10|-0.10|
0
1000
2000
3000
4000
5000
6000
7000
8000
9000
10000
11000
12000
13000
14000
15000
16000
17000
18000
19000
values

-------------------------
 0.62| 0.80| 1.00| 0.00|
-------------------------
 0.42| 0.00| 0.80| 0.00|
-------------------------
 0.25| 0.46| 0.62| 0.46|
policy

----------------
 R | R | R |   |
----------------
 U |   | U |   |
----------------
 R | R | U | L |

Reinforcement learning without exploring starts: exploring starts might not work unless you can control which state you are in ("god mode").  In this case, we will be epsilon-greedy instead of greedy.  Thus we ensure that every action has a chance of being selected: $\pi(a|s)\geq \epsilon / |A(s)|$.  Many iterations are required to reach states far from the greedy policy's path (e.g. a state $k$ exploratory steps away is reached with probability on the order of $\left(\epsilon/|A(s)|\right)^k$).

In [25]:
ALL_POSSIBLE_ACTIONS = ('U', 'D', 'L', 'R')
GAMMA = 0.9


def random_action(a, eps=.1):
    """Epsilon-soft action selection.

    With probability ``1 - eps`` the given action ``a`` is returned;
    otherwise an action is drawn uniformly from ALL_POSSIBLE_ACTIONS (which
    may happen to be ``a`` again).
    """
    explore = not (np.random.random() < (1 - eps))
    if explore:
        return np.random.choice(ALL_POSSIBLE_ACTIONS)
    return a
    


#policy evaluation with MC
def play_game_no_es(grid, policy):
    """Play one epsilon-soft episode from the fixed start state (2, 0).

    Every action is the policy's choice passed through ``random_action``, so
    exploration comes from the action noise rather than from a random start.

    Returns a list of (state, action, G) triples in the order visited, where
    G is the return discounted by GAMMA that followed taking ``action`` in
    ``state``.  The terminal state is not included.
    """
    state = (2, 0)
    grid.set_state(state)
    action = random_action(policy[state])

    # trajectory holds (state, action taken there, reward received on arrival)
    trajectory = [(state, action, 0)]
    while True:
        reward = grid.move(action)
        state = grid.current_state()
        if grid.game_over():
            trajectory.append((state, None, reward))
            break
        action = random_action(policy[state])
        trajectory.append((state, action, reward))

    # Walk the episode backwards, accumulating G(t) = r(t+1) + GAMMA*G(t+1).
    ret = 0
    triples = []
    for idx, (state, action, reward) in enumerate(reversed(trajectory)):
        if idx:
            # idx 0 is the terminal state, whose value is 0, so it is skipped
            triples.append((state, action, ret))
        ret = reward + GAMMA * ret
    triples.reverse()
    return triples


In [26]:
# Monte Carlo control without exploring starts on the step-penalised grid:
# episodes always begin at the fixed start state and exploration comes from
# the epsilon-soft action selection inside play_game_no_es.
grid = negative_grid()
#print rewards
print("rewards:")
print_values(grid.rewards, grid)

# start from a random policy over the non-terminal states
policy={}
for s in grid.actions.keys():
    policy[s]=np.random.choice(ALL_POSSIBLE_ACTIONS)
    
# Q[s][a] is the current estimate; returns[(s, a)] holds every sampled return
Q={}
returns={}
states = grid.all_states()
#this is just initialization
for s in states:
    if s in grid.actions.keys():
        Q[s]={}
        for a in ALL_POSSIBLE_ACTIONS:
            Q[s][a]=0
            returns[(s,a)]=[]
    else:
        pass #terminal state or state cannot reach
    
#Now we start
deltas=[] #for debugging, but could be used to test for convergence
for t in range(5000):
    # progress indicator
    if t % 1000 ==0:
        print(t)
    #generate episode using current policy
    biggest_change=0
    state_action_return=play_game_no_es(grid,policy)
    # (state, action) pairs already updated in this episode
    seen_SAR=set()
    #update samples of return function
    for (s,a,G) in state_action_return:
        #check if seen bc using first-visit policy
        sa=(s,a)
        if sa not in seen_SAR:
            old_q=Q[s][a]
            returns[sa].append(G)
            # sample mean over every return recorded for this pair so far
            Q[s][a]=np.mean(returns[sa])
            biggest_change=max(biggest_change,abs(old_q-Q[s][a]))
            seen_SAR.add(sa)
    deltas.append(biggest_change)
    
    #now update policy: act greedily with respect to the current Q
    
    for s in policy.keys():
        policy[s]=max_dict(Q[s])[0] #taking the action with max value
        
#now find values V
V={}
for s, Qs in Q.items():
    V[s]=max_dict(Q[s])[1] #get value of best action
    

print("values")
print("")
print_values(V,grid)
print("policy")
print_policy(policy,grid)

rewards:
-------------------------
-0.10|-0.10|-0.10| 1.00|
-------------------------
-0.10| 0.00|-0.10|-1.00|
-------------------------
-0.10|-0.10|-0.10|-0.10|
0
1000
2000
3000
4000
5000
6000
7000
8000
9000
10000
11000
12000
13000
14000
15000
16000
17000
18000
19000
values

-------------------------
 0.54| 0.77| 1.00| 0.00|
-------------------------
 0.24| 0.00| 0.77| 0.00|
-------------------------
 0.22| 0.37| 0.54| 0.37|
policy

----------------
 R | R | R |   |
----------------
 U |   | U |   |
----------------
 R | R | U | L |