In [2]:
import random
import numpy as np

class WindyGridworld:
    """Square gridworld with one column in which wind can reverse a move.

    States are ``(row, col)`` tuples and actions are ``(d_row, d_col)``
    offsets; ``actions[i]`` is named ``action_names[i]``.  Every step yields
    a reward of -1, except the step that lands on ``goal``, which yields +1
    and ends the episode.

    Args:
        size: Side length of the grid; valid indices are ``0 .. size - 1``.
        start: State that ``reset()`` returns to.
        goal: The single terminal state.
        wind_column: Column index in which the wind acts on the agent.
    """

    def __init__(self, size=5, start=(4, 0), goal=(0, 4), wind_column=2):
        self.size = size
        self.start = start
        self.goal = goal
        self.wind_column = wind_column
        self.state = start
        self.actions = [(0, 1), (1, 0), (0, -1), (-1, 0)]
        self.action_names = ['E', 'S', 'W', 'N']

    @property
    def terminal_states(self):
        """Collection of states that end an episode (only the goal)."""
        # Computed on access so it stays in sync if ``goal`` is reassigned.
        return (self.goal,)

    def step(self, action):
        """Apply ``action`` to the current state.

        Returns:
            ``(new_state, reward, done)`` where ``done`` is True exactly when
            ``new_state`` equals the goal.
        """
        # Wind is decided by the column the agent is leaving, not entering.
        if self.state[1] == self.wind_column:
            action = self.apply_wind(action)

        new_state = (self.state[0] + action[0], self.state[1] + action[1])

        # Moves off the grid are clamped back onto the nearest edge cell.
        new_state = self.check_boundaries(new_state)

        done = new_state == self.goal
        reward = 1 if done else -1

        self.state = new_state
        return new_state, reward, done

    def apply_wind(self, action):
        """Return ``action`` possibly altered by the wind.

        One of three candidates is picked uniformly: the action itself, the
        action with its row component negated, or the action with its column
        component negated.  For the four cardinal moves one of the two
        negations changes nothing (negating a zero component), so in effect
        the move is kept with probability 2/3 and reversed with
        probability 1/3.
        """
        wind_effects = [action, (-action[0], action[1]), (action[0], -action[1])]
        return random.choice(wind_effects)

    def check_boundaries(self, state):
        """Clamp both coordinates of ``state`` into ``[0, size - 1]``."""
        x, y = state
        x = min(max(x, 0), self.size - 1)
        y = min(max(y, 0), self.size - 1)
        return (x, y)

    def reset(self):
        """Move the agent back to ``start`` and return that state."""
        self.state = self.start
        return self.state

# Monte Carlo Approach

Compared to Dynamic Programming, a model-free approach like Monte Carlo is what we would use when we don't know the environment dynamics.  Technically I could code this more efficiently with DP, but that would involve taking advantage of knowledge of how the wind works.  We want to show that a method can learn how to deal with randomized effects like the wind.

We'll use the first-visit MC approach for estimating state values.  After generating a set of episodes, the value of each state s is calculated from the episodes that pass through s.  MC updates for estimating the value function are based on the total return obtained in an episode starting from the first time that state s is visited.

In [None]:
def generate_episode(env):
    """Roll out one episode from ``env.reset()`` under a uniform random policy.

    Returns:
        A list of ``(state, action, reward)`` tuples, one per step, where
        ``state`` is the state the action was taken from.  The rollout stops
        after the first step for which ``env.step`` reports ``done``.
    """
    trajectory = []
    current = env.reset()
    finished = False
    while not finished:
        # Every action is equally likely, regardless of the current state.
        move = random.choice(env.actions)
        following, reward, finished = env.step(move)
        trajectory.append((current, move, reward))
        current = following
    return trajectory

def first_visit_mc(env, num_episodes, gamma=0.99):
    """Estimate state values with first-visit Monte Carlo.

    Episodes come from ``generate_episode(env)``.  For every episode, each
    state contributes one sample: the discounted return that follows the
    FIRST time step at which the state occurs.  A state's value is the
    average of its samples over all episodes seen so far.

    Args:
        env: Environment exposing ``size``; states are ``(row, col)`` tuples.
        num_episodes: Number of episodes to sample.
        gamma: Discount factor applied per step.

    Returns:
        A ``(size, size)`` float array of value estimates; states that were
        never visited keep the initial value 0.
    """
    grid_shape = (env.size, env.size)
    value_table = np.zeros(grid_shape)
    # Running totals replace per-state lists of returns: the mean is simply
    # sum / count, so nothing has to be re-averaged from scratch.
    return_sums = np.zeros(grid_shape)
    visit_counts = np.zeros(grid_shape, dtype=int)

    for _ in range(num_episodes):
        episode = generate_episode(env)

        # Time step of each state's first occurrence, found in one forward
        # pass instead of re-scanning the episode prefix at every step.
        first_visit = {}
        for step_index, (state, _action, _reward) in enumerate(episode):
            first_visit.setdefault(state, step_index)

        # Walking backwards lets the return be built incrementally:
        # G_t = reward_t + gamma * G_{t+1}.
        G = 0
        for step_index in range(len(episode) - 1, -1, -1):
            state, _action, reward = episode[step_index]
            G = gamma * G + reward
            if first_visit[state] == step_index:
                return_sums[state] += G
                visit_counts[state] += 1
                value_table[state] = return_sums[state] / visit_counts[state]

    return value_table

def derive_policy_from_value_table(env, value_table):
    """Build a greedy policy grid from a table of state values.

    For every non-goal cell, the in-grid neighbour with the strictly highest
    value wins; on ties the action listed first in ``env.actions`` is kept.

    Returns:
        A ``(size, size)`` array of one-character action names; the goal
        cell is left as the empty string.
    """
    side = env.size
    policy = np.zeros((side, side), dtype='U1')

    for row, col in np.ndindex(side, side):
        if (row, col) == env.goal:
            continue

        top_value = -float('inf')
        top_index = None
        for index, (d_row, d_col) in enumerate(env.actions):
            target_row = row + d_row
            target_col = col + d_col
            # Moves that would leave the grid are not candidates.
            if not (0 <= target_row < side and 0 <= target_col < side):
                continue
            candidate = value_table[target_row, target_col]
            if candidate > top_value:
                top_value, top_index = candidate, index

        policy[row, col] = env.action_names[top_index]

    return policy


# Demo: estimate state values from 100 sampled episodes, print the value
# table, then print the policy derived from that table.
env = WindyGridworld()
value_table = first_visit_mc(env, 100)
print(value_table)
policy = derive_policy_from_value_table(env, value_table)
print(policy)

## Exploratory start

If we wanted episodes to start at a random position (the exploratory start method), we'd just adjust the episode generator as follows:


In [None]:
def generate_episode_with_exploratory_start(env):
    """Roll out one random-policy episode from a random non-terminal cell.

    The start state is drawn uniformly from all grid cells that are not
    terminal.  Terminal cells are taken from ``env.terminal_states`` when the
    environment provides it, and otherwise from ``env.goal``.

    Returns:
        A list of ``(state, action, reward)`` tuples, one per step.

    Raises:
        ValueError: If every cell of the grid is terminal.
    """
    # Not every environment exposes ``terminal_states``; fall back to the
    # single goal cell instead of failing with AttributeError.
    terminal_states = getattr(env, "terminal_states", None)
    if terminal_states is None:
        terminal_states = (env.goal,)

    start_states = [
        (x, y)
        for x in range(env.size)
        for y in range(env.size)
        if (x, y) not in terminal_states
    ]
    if not start_states:
        raise ValueError("no non-terminal state available to start an episode from")

    state = random.choice(start_states)
    # Bypass reset(): place the agent directly on the sampled start cell.
    env.state = state

    episode = []
    while True:
        action = random.choice(env.actions)
        next_state, reward, done = env.step(action)
        episode.append((state, action, reward))
        if done:
            break
        state = next_state
    return episode

## $\epsilon$-greedy policy


In [30]:
def generate_episode(env, value_table, epsilon, max_steps=None):
    """Roll out one episode from ``env.reset()`` under an epsilon-greedy policy.

    Actions come from ``epsilon_greedy_policy(state, value_table, epsilon,
    env)``.  The loop normally ends only when ``env.step`` reports ``done``,
    so a policy that keeps cycling through the same cells (likely when
    ``epsilon`` is small and the greedy choice points back and forth between
    neighbours) can run for a very long time.  ``max_steps`` is an optional
    safety cap for that case.

    Args:
        env: Environment exposing ``reset()`` and ``step(action)``.
        value_table: Current state-value estimates used for greedy choices.
        epsilon: Exploration probability passed to the policy.
        max_steps: If given, stop after this many steps even when the episode
            has not terminated.  Returns computed from a truncated episode
            omit whatever reward would have followed the cut-off.

    Returns:
        A list of ``(state, action, reward)`` tuples, one per step.
    """
    episode = []
    state = env.reset()
    while max_steps is None or len(episode) < max_steps:
        action = epsilon_greedy_policy(state, value_table, epsilon, env)
        next_state, reward, done = env.step(action)
        episode.append((state, action, reward))
        if done:
            break
        state = next_state
    return episode

def first_visit_mc(env, num_episodes, gamma=0.99, epsilon=0.5):
    """Estimate state values with first-visit Monte Carlo, epsilon-greedy rollouts.

    Each episode is produced by ``generate_episode(env, value_table,
    epsilon)``, so the episode generator sees the current estimates.  For
    every episode, each state contributes one sample: the discounted return
    that follows the FIRST time step at which the state occurs.  A state's
    value is the average of its samples over all episodes seen so far.

    Args:
        env: Environment exposing ``size``; states are ``(row, col)`` tuples.
        num_episodes: Number of episodes to sample.
        gamma: Discount factor applied per step.
        epsilon: Exploration probability handed to the episode generator.

    Returns:
        A ``(size, size)`` float array of value estimates; states that were
        never visited keep the initial value 0.
    """
    grid_shape = (env.size, env.size)
    value_table = np.zeros(grid_shape)
    # Running totals replace per-state lists of returns: the mean is simply
    # sum / count, so nothing has to be re-averaged from scratch.
    return_sums = np.zeros(grid_shape)
    visit_counts = np.zeros(grid_shape, dtype=int)

    for _ in range(num_episodes):
        episode = generate_episode(env, value_table, epsilon)

        # Time step of each state's first occurrence, found in one forward
        # pass instead of re-scanning the episode prefix at every step.
        first_visit = {}
        for step_index, (state, _action, _reward) in enumerate(episode):
            first_visit.setdefault(state, step_index)

        # Walking backwards lets the return be built incrementally:
        # G_t = reward_t + gamma * G_{t+1}.
        G = 0
        for step_index in range(len(episode) - 1, -1, -1):
            state, _action, reward = episode[step_index]
            G = gamma * G + reward
            if first_visit[state] == step_index:
                return_sums[state] += G
                visit_counts[state] += 1
                value_table[state] = return_sums[state] / visit_counts[state]

    return value_table

def epsilon_greedy_policy(state, value_table, epsilon, env):
    """Pick an action for ``state``: explore with probability ``epsilon``.

    With probability ``epsilon`` a uniformly random action from
    ``env.actions`` is returned; otherwise the choice is delegated to
    ``derive_action_from_value_table``.

    NOTE(review): the greedy branch is the likely cause of the agent
    bouncing back and forth between cells; only an exploratory draw breaks
    such a cycle, so a small ``epsilon`` makes episodes long.
    """
    # Use the stdlib generator for the coin flip as well, so that a single
    # random.seed() call governs every random draw made by the policy.
    if random.random() < epsilon:
        return random.choice(env.actions)  # Explore
    # Exploit the current value estimates.
    return derive_action_from_value_table(state, value_table, env)

def derive_action_from_value_table(state, value_table, env):
    """Return a greedy action for ``state`` with ties broken at random.

    Each action in ``env.actions`` is scored by the value of the cell it
    would lead to; actions that would leave the grid are ignored.  One of
    the top-scoring actions is returned, chosen uniformly.
    """
    limit = env.size
    top_value = float('-inf')
    candidates = []

    for move in env.actions:
        row = state[0] + move[0]
        col = state[1] + move[1]
        # Skip moves that would step off the grid.
        if not (0 <= row < limit and 0 <= col < limit):
            continue

        score = value_table[row, col]
        if score == top_value:
            candidates.append(move)
        elif score > top_value:
            # Strictly better than everything so far: start a new tie group.
            top_value = score
            candidates = [move]

    return random.choice(candidates)

def action_to_action_name(action):
    """Translate a ``(d_row, d_col)`` move into its compass letter.

    Returns 'E', 'W', 'S' or 'N' for the four unit moves and ``None`` for
    anything else.
    """
    compass = (
        ((0, 1), 'E'),
        ((0, -1), 'W'),
        ((1, 0), 'S'),
        ((-1, 0), 'N'),
    )
    for offset, letter in compass:
        if action == offset:
            return letter
    return None

def derive_policy_from_value_table(env, value_table):
    """Build a greedy policy grid from a table of state values.

    Every non-goal cell gets the compass letter of the action chosen by
    ``derive_action_from_value_table``; the goal cell stays an empty string.

    Returns:
        A ``(size, size)`` array of one-character strings.
    """
    side = env.size
    policy = np.zeros((side, side), dtype='U1')

    for cell in np.ndindex(side, side):
        if cell == env.goal:
            continue
        greedy_move = derive_action_from_value_table(cell, value_table, env)
        policy[cell] = action_to_action_name(greedy_move)

    return policy

# Demo: estimate state values from 1000 sampled episodes (default gamma and
# epsilon), then print the policy derived from the resulting table.
env = WindyGridworld()
value_table = first_visit_mc(env, 1000)
policy = derive_policy_from_value_table(env, value_table)
print(policy)

[['E' 'E' 'E' 'E' '']
 ['E' 'E' 'E' 'E' 'N']
 ['E' 'E' 'E' 'N' 'N']
 ['N' 'N' 'E' 'N' 'N']
 ['N' 'N' 'E' 'N' 'N']]
