In [37]:
import gymnasium as gym
import numpy as np
from collections import defaultdict
from typing import Any
import math

env = gym.make("Blackjack-v1")

In [38]:
# Definition of a random starting policy
example_policy = defaultdict(lambda: {env.action_space.sample(): 1.0})

# Retrieval function for policies
def get_action_from_policy(policy: dict[Any, dict[Any, float]], state: Any) -> float:
    return np.random.choice(a=list(policy[state].keys()),p=list(policy[state].values()))

In [39]:
# Example execution loop
done = False
total_reward = 0

observation, _ = env.reset()

while not done:
    action = get_action_from_policy(example_policy, observation)

    observation, reward, terminated, truncated, info = env.step(action)
    total_reward += reward

    done = terminated or truncated

total_reward

1.0

In [40]:
# Some utils
def generate_episode(env: gym.Env, policy):
    done = False
    state, _ = env.reset()
    tuples = []
    while not done:
        action = get_action_from_policy(policy, state)
        state_, reward, terminated, truncated, info = env.step(action)

        tuples.append((state, int(action), reward))

        state = state_
        done = terminated or truncated
    
    return tuples

def generate_episode_exploring_starts(env: gym.Env, policy):
    env.reset()
    env.unwrapped.s = env.observation_space.sample()

    initial_state = env.unwrapped.s
    initial_action = env.action_space.sample()

    tuples = []
    state = initial_state
    action = initial_action

    while True:
        next_state, reward, terminated, truncated, _ = env.step(action)
        tuples.append((state,action,reward))

        if terminated or truncated:
            break

        state = next_state
        action = get_action_from_policy(policy, state)

    return tuples

In [41]:
def first_visit_MC_prediction(env: gym.Env, policy, episodes:int=10_000, gamma: float = 0.9):
    values = defaultdict(float)
    counts = defaultdict(int)

    for _ in range(episodes):
        episode = generate_episode(env, policy)
        g = 0

        # Pre-calculate first indices
        first_visit_idx = {}
        for idx, (state,_,_) in enumerate(episode):
            if state not in first_visit_idx:
                first_visit_idx[state] = idx

        for i in range(len(episode) -1, -1, -1):
            state, action, reward = episode[i]
            g = gamma * g + reward

            # Checks if this is truly first visit
            if i == first_visit_idx.get(state):
                counts[state] += 1

                # Avg mean update, like in multi armed bandits
                values[state] += (g - values[state]) / counts[state] 

    return values

first_visit_MC_prediction(env, example_policy)

defaultdict(float,
            {(5, 10, 0): -0.8235294117647058,
             (10, 1, 0): -0.6774193548387097,
             (12, 1, 0): -0.7777777777777777,
             (15, 5, 0): -0.30357142857142844,
             (17, 1, 0): -0.6233766233766231,
             (19, 5, 0): 0.4583333333333333,
             (16, 2, 0): -0.74,
             (14, 10, 0): -0.6616965174129352,
             (16, 6, 0): 0.040000000000000015,
             (13, 9, 0): -0.3975903614457832,
             (12, 9, 0): -0.4896341463414634,
             (7, 10, 0): -0.633333333333333,
             (21, 10, 0): 0.8820960698689954,
             (8, 10, 0): -0.4468085106382978,
             (15, 10, 0): -0.7259235127478753,
             (20, 10, 0): -0.8448543689320387,
             (18, 1, 1): -0.07692307692307693,
             (21, 9, 0): 0.8666666666666666,
             (15, 9, 0): -0.4495833333333335,
             (21, 2, 0): -1.0,
             (21, 2, 1): -0.48857142857142866,
             (17, 10, 0): -0.76932584269

In [42]:
# Exploring Starts and following GPI: Evaluation -> Improvement
def monte_carlo_ES(env: gym.Env, episodes:int=10_000, gamma: float = 0.9):
    policy = defaultdict(lambda: {env.action_space.sample(): 1.0})
    q_values = defaultdict(float)
    counts = defaultdict(int)

    for _ in range(episodes):
        episode = generate_episode_exploring_starts(env, policy)
        g = 0

        # Pre-calculate first indices
        first_visit_idx = {}
        for idx, (state,action,_) in enumerate(episode):
            if (state,action) not in first_visit_idx:
                first_visit_idx[(state,action)] = idx

        for i in range(len(episode) -1, -1, -1):
            state, action, reward = episode[i]
            g = gamma * g + reward

            # Checks if this is truly first visit
            if i == first_visit_idx.get((state,action)):
                counts[(state,action)] += 1

                # Avg mean update, like in multi armed bandits
                q_values[(state,action)] += (g - q_values[(state,action)]) / counts[(state,action)] 

                # Policy improvement step
                max_a = env.action_space.sample()
                max_q_val = q_values[(state,max_a)]
                for a in range(env.action_space.n):
                    if q_values[(state,a)] > max_q_val:
                        max_q_val = q_values[(state,a)]
                        max_a = a

                policy[state] = {max_a: 1.0}

    return policy

monte_carlo_ES_optimized_policy = monte_carlo_ES(env, episodes=1000_000, gamma=1.0)

In [43]:
# We benchmark policies
def get_avg_reward(env: gym.Env, policy, iterations=100_000):
    total_reward = 0
    for _ in range(iterations):
        done = False
        observation, _ = env.reset()
        while not done:
            action = get_action_from_policy(policy, observation)
            observation, reward, terminated, truncated, info = env.step(action)
            total_reward += reward
            done = terminated or truncated
    return total_reward / iterations

random_policy_avg_reward = get_avg_reward(env, example_policy) 
print(f"Got {random_policy_avg_reward:.3} from random policy")

MC_ES_policy_avg_reward = get_avg_reward(env, monte_carlo_ES_optimized_policy) 
print(f"Got {MC_ES_policy_avg_reward:.3} from monte carlo exploring starts optimized policy")


Got -0.364 from random policy
Got -0.156 from monte carlo exploring starts optimized policy


In [44]:
# Util for generating a random epsilon-soft policy
def generate_epsilon_soft_policy_on_state(env: gym.Env, epsilon: float):
    policy_on_state = {
        action: epsilon / env.action_space.n for action in range(env.action_space.n)
    }
    policy_on_state[np.random.choice(range(env.action_space.n))] += 1 - epsilon

    return policy_on_state

def assign_epsilon_soft_policy_on_state(env: gym.Env, epsilon: float, action: Any):
    policy_on_state = {
        action: epsilon / env.action_space.n for action in range(env.action_space.n)
    }
    policy_on_state[action] += 1 - epsilon

    return policy_on_state

generate_epsilon_soft_policy_on_state(env, 0.1)

{0: np.float64(0.9500000000000001), 1: np.float64(0.05)}

In [45]:
# An algorithm for getting the optimal epsilon-soft policy using MC control
def on_policy_first_visit_MC_control(env: gym.Env, episodes:int=10_000, gamma: float = 0.9, epsilon: float = 0.1):
    policy = defaultdict(lambda: generate_epsilon_soft_policy_on_state(env, epsilon))
    q_values = defaultdict(float)
    counts = defaultdict(int)

    for _ in range(episodes):
        episode = generate_episode(env, policy)
        g = 0

        # Pre-calculate first indices
        first_visit_idx = {}
        for idx, (state,action,_) in enumerate(episode):
            if (state,action) not in first_visit_idx:
                first_visit_idx[(state,action)] = idx

        for i in range(len(episode) -1, -1, -1):
            state, action, reward = episode[i]
            g = gamma * g + reward

            # Checks if this is truly first visit
            if i == first_visit_idx.get((state,action)):
                counts[(state,action)] += 1

                # Avg mean update, like in multi armed bandits
                q_values[(state,action)] += (g - q_values[(state,action)]) / counts[(state,action)] 

                # Policy improvement step
                max_a = env.action_space.sample()
                max_q_val = q_values[(state,max_a)]
                for a in range(env.action_space.n):
                    if q_values[(state,a)] > max_q_val:
                        max_q_val = q_values[(state,a)]
                        max_a = a

                policy[state] = assign_epsilon_soft_policy_on_state(env, epsilon, max_a)

    return policy

on_policy_MC_control_optimized_policy = on_policy_first_visit_MC_control(env, episodes=1000_000, gamma=.9)

In [46]:
random_policy_avg_reward = get_avg_reward(env, example_policy) 
print(f"Got {random_policy_avg_reward:.3} from random policy")

MC_ES_policy_avg_reward = get_avg_reward(env, monte_carlo_ES_optimized_policy) 
print(f"Got {MC_ES_policy_avg_reward:.3} from monte carlo exploring starts optimized policy")

on_policy_MC_policy_avg_reward = get_avg_reward(env, on_policy_MC_control_optimized_policy)
print(f"Got {on_policy_MC_policy_avg_reward:.3} from monte carlo control on-policy optimized policy")

Got -0.368 from random policy
Got -0.155 from monte carlo exploring starts optimized policy
Got -0.0766 from monte carlo control on-policy optimized policy


In [47]:
def off_policy_mc_prediction(env: gym.Env, b_policy, target_policy, gamma:float=0.9, episodes:int=1000):
    q_values = defaultdict(float)
    c = defaultdict(lambda: 0.0)

    for _ in range(episodes):
        episode = generate_episode(env, b_policy)
        g = 0
        w = 1

        for i in range(len(episode) -1, -1, -1):
            state, action, reward = episode[i]
            g = gamma * g + reward
            c[(state,action)] += w
            q_values[(state,action)] += (w / c[(state,action)]) * (g - q_values[(state,action)])
            w = w * (target_policy[state][action] / b_policy[state][action])
        
    return q_values

example_policy = defaultdict(lambda: generate_epsilon_soft_policy_on_state(env, epsilon=0.1))

off_policy_mc_prediction(env,b_policy=example_policy,target_policy=on_policy_MC_control_optimized_policy, episodes=1000)

defaultdict(float,
            {((18, 10, 0), 0): -0.11999999999999997,
             ((21, 9, 0), 1): -1.0,
             ((14, 9, 0), 1): np.float64(-0.9974358974358974),
             ((18, 4, 0), 0): 0.4,
             ((15, 8, 0), 0): -0.3333333333333333,
             ((21, 8, 1), 1): np.float64(0.6545454545454548),
             ((18, 7, 0), 1): np.float64(-0.5615384615384615),
             ((21, 5, 1), 0): 1.0,
             ((13, 5, 1), 1): np.float64(0.9),
             ((19, 10, 0), 0): -0.21739130434782608,
             ((9, 10, 0), 1): np.float64(-0.3325324675324676),
             ((17, 5, 0), 0): -0.27272727272727276,
             ((6, 10, 0), 0): -1.0,
             ((12, 9, 0), 0): -0.8333333333333334,
             ((15, 9, 1), 1): np.float64(-0.7757142857142858),
             ((20, 6, 0), 1): -1.0,
             ((20, 10, 0), 1): -1.0,
             ((17, 10, 0), 1): np.float64(-0.8826219512195121),
             ((17, 10, 1), 1): np.float64(-0.855),
             ((13, 8, 0), 0): 

In [48]:
actions = [0,1] # Hardcoded for Blackjack-v1
def q_argmax(q_values: dict[Any, float], state: Any):
    max_a = actions[0]
    max_q = -math.inf
    for i in actions: 
        if q_values[i] > max_q:
            max_q = q_values[i]
            max_a = i
    
    return {
        max_a: 1.0
    }

def off_policy_mc_control(env: gym.Env, gamma:float=0.9, episodes:int=1000):
    q_values = defaultdict(float)
    c = defaultdict(lambda: 0.0)
    pi: dict[Any, dict[Any, float]] = {}

    for _ in range(episodes):
        b = defaultdict(lambda: generate_epsilon_soft_policy_on_state(env, epsilon=0.1))
        episode = generate_episode(env, b)
        g = 0
        w = 1

        for i in range(len(episode) -1, -1, -1):
            state, action, reward = episode[i]
            g = gamma * g + reward
            c[(state,action)] += w
            q_values[(state,action)] += (w / c[(state,action)]) * (g - q_values[(state,action)])

            pi[state] = q_argmax(q_values, state) # TODO
            if action != pi[state]:
                break

            w = w * (target_policy[state][action] / b_policy[state][action])
        
    return pi

off_policy_MC_control_optimized_policy = off_policy_mc_control(env, episodes=1000_000)

In [49]:
random_policy_avg_reward = get_avg_reward(env, example_policy) 
print(f"Got {random_policy_avg_reward:.3} from random policy")

MC_ES_policy_avg_reward = get_avg_reward(env, monte_carlo_ES_optimized_policy) 
print(f"Got {MC_ES_policy_avg_reward:.3} from monte carlo exploring starts optimized policy")

on_policy_MC_policy_avg_reward = get_avg_reward(env, on_policy_MC_control_optimized_policy)
print(f"Got {on_policy_MC_policy_avg_reward:.3} from monte carlo control on-policy optimized policy")

off_policy_MC_policy_avg_reward = get_avg_reward(env, off_policy_MC_control_optimized_policy)
print(f"Got {off_policy_MC_policy_avg_reward:.3} from monte carlo control off-policy optimized policy")

Got -0.447 from random policy
Got -0.159 from monte carlo exploring starts optimized policy
Got -0.0813 from monte carlo control on-policy optimized policy
Got -0.185 from monte carlo control off-policy optimized policy
