In [1]:
pip install gymnasium

Note: you may need to restart the kernel to use updated packages.


In [23]:
import gymnasium as gym
import numpy as np
from collections import defaultdict
import sys
import numpy as np
from mpl_toolkits.mplot3d import Axes3D
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable


In [2]:
env = gym.make("Blackjack-v1", natural=True, sab=False)

In [4]:
def basic_policy(observation):
    score, dealer_score, usable_ace = observation
    return 0 if score >= 20 else 1


In [5]:
def basic_policy(observation):
    score = observation[0]# Extract the first element from the observation
    return 0 if score >= 20 else 1

In [6]:
total_episodes = 10
for episode in range(total_episodes):
    observation = env.reset()[0]
    for t in range(100):
        print(f"Observation: {observation}")
        action = basic_policy(observation)
        print(f"Taking action: {action}")
        observation, reward, done, term, info = env.step(action)
        if done:
            print(f"Game ended! Reward: {reward}")
            print('You won :)\n') if reward > 0 else print('You lost :(\n')
            break


Observation: (21, 8, 1)
Taking action: 0
Game ended! Reward: 1.5
You won :)

Observation: (12, 4, 0)
Taking action: 1
Observation: (15, 4, 0)
Taking action: 1
Game ended! Reward: -1.0
You lost :(

Observation: (10, 8, 0)
Taking action: 1
Observation: (18, 8, 0)
Taking action: 1
Game ended! Reward: -1.0
You lost :(

Observation: (11, 2, 0)
Taking action: 1
Observation: (17, 2, 0)
Taking action: 1
Game ended! Reward: -1.0
You lost :(

Observation: (14, 4, 0)
Taking action: 1
Observation: (15, 4, 0)
Taking action: 1
Game ended! Reward: -1.0
You lost :(

Observation: (18, 9, 0)
Taking action: 1
Game ended! Reward: -1.0
You lost :(

Observation: (20, 10, 0)
Taking action: 0
Game ended! Reward: 1.0
You won :)

Observation: (18, 1, 0)
Taking action: 1
Game ended! Reward: -1.0
You lost :(

Observation: (10, 7, 0)
Taking action: 1
Observation: (18, 7, 0)
Taking action: 1
Game ended! Reward: -1.0
You lost :(

Observation: (14, 4, 0)
Taking action: 1
Observation: (15, 4, 0)
Taking action: 1
Obser

### MONTECARLO

In [5]:
# Initialize Q(s, a) and N(s, a) arbitrarily
Q = defaultdict(lambda: np.zeros(env.action_space.n))
N = defaultdict(lambda: np.zeros(env.action_space.n))

In [6]:
num_episodes = 100000


In [10]:
# Implement the Monte Carlo method with exploring start
for i in range(num_episodes):
    # Initialize the episode with an exploring start
    state = env.reset()
    action = np.random.choice(env.action_space.n)

    episode = []
    done = False
    while not done:
        next_state, reward, done, _, _ = env.step(action)
        episode.append((state, action, reward))
        state = next_state
        action = np.random.choice(env.action_space.n)

# Update Q(s, a) and N(s, a) for each state-action pair in the episode
    visited_states_actions = set()
    for t, (state, action, _) in enumerate(episode):
        if (state, action) not in visited_states_actions:
            visited_states_actions.add((state, action))
            G = sum([step[2] for step in episode[t:]])
            N[state][action] += 1
            Q[state][action] += (G - Q[state][action]) / N[state][action]

# Extract the optimal policy
optimal_policy = {}
for state, actions in Q.items():
    optimal_policy[state] = np.argmax(actions)

print("Optimal Policy:", optimal_policy)


TypeError: unhashable type: 'dict'

In [12]:
def generate_episode_from_Q(env, Q, epsilon, nA):
    """ generates an episode from following the epsilon-greedy policy """
    episode = []
    state = env.reset()[0]
    print("state = ", state)
    while True:
        state_tuple = tuple(state)  # Convert the state to a tuple
        if state_tuple in Q:
            action = np.random.choice(np.arange(nA), p=get_probs(Q[state_tuple], epsilon, nA)) 
        else: 
            action = env.action_space.sample()
        next_state, reward, done, info, _ = env.step(action)
        episode.append((state_tuple, action, reward))  # Use the converted state tuple
        state = next_state
        if done:
            break
    return episode


def get_probs(Q_s, epsilon, nA):
    """ obtains the action probabilities corresponding to epsilon-greedy policy """
    policy_s = np.ones(nA) * epsilon / nA
    best_a = np.argmax(Q_s)
    policy_s[best_a] = 1 - epsilon + (epsilon / nA)
    return policy_s

def update_Q(env, episode, Q, alpha, gamma):
    """ updates the action-value function estimate using the most recent episode """
    states, actions, rewards = zip(*episode)
    # prepare for discounting
    discounts = np.array([gamma**i for i in range(len(rewards)+1)])
    for i, state in enumerate(states):
        old_Q = Q[state][actions[i]] 
        Q[state][actions[i]] = old_Q + alpha*(sum(rewards[i:]*discounts[:-(1+i)]) - old_Q)
    return Q

In [13]:
def mc_control(env, num_episodes, alpha, gamma=1.0, eps_start=1.0, eps_decay=.99999, eps_min=0.05):
    nA = env.action_space.n
    # initialize empty dictionary of arrays
    Q = defaultdict(lambda: np.zeros(nA))
    epsilon = eps_start
    # loop over episodes
    for i_episode in range(1, num_episodes+1):
        # monitor progress
        if i_episode % 1000 == 0:
            print("\rEpisode {}/{}.".format(i_episode, num_episodes), end="")
            sys.stdout.flush()
        # set the value of epsilon
        epsilon = max(epsilon*eps_decay, eps_min)
        # generate an episode by following epsilon-greedy policy
        episode = generate_episode_from_Q(env, Q, epsilon, nA)
        # update the action-value function estimate using the episode
        Q = update_Q(env, episode, Q, alpha, gamma)
    # determine the policy corresponding to the final action-value function estimate
    policy = dict((k,np.argmax(v)) for k, v in Q.items())
    return policy, Q

In [17]:
policy, Q = mc_control(env, 100000, 0.02)

state =  (12, 1, 0)
state =  (12, 7, 0)
state =  (20, 5, 0)
state =  (16, 8, 0)
state =  (20, 10, 0)
state =  (21, 10, 1)
state =  (18, 4, 0)
state =  (14, 4, 0)
state =  (18, 10, 0)
state =  (14, 10, 0)
state =  (13, 7, 0)
state =  (18, 4, 1)
state =  (18, 1, 0)
state =  (11, 10, 0)
state =  (21, 9, 1)
state =  (19, 10, 0)
state =  (14, 5, 1)
state =  (15, 1, 0)
state =  (15, 9, 0)
state =  (14, 2, 0)
state =  (14, 9, 0)
state =  (19, 4, 0)
state =  (19, 7, 1)
state =  (15, 1, 0)
state =  (8, 10, 0)
state =  (10, 6, 0)
state =  (9, 6, 0)
state =  (17, 10, 0)
state =  (17, 6, 0)
state =  (17, 3, 0)
state =  (17, 4, 1)
state =  (21, 5, 1)
state =  (12, 5, 0)
state =  (19, 9, 0)
state =  (20, 6, 0)
state =  (19, 9, 0)
state =  (14, 10, 0)
state =  (13, 10, 0)
state =  (18, 7, 0)
state =  (14, 10, 0)
state =  (5, 10, 0)
state =  (20, 10, 0)
state =  (12, 10, 0)
state =  (7, 2, 0)
state =  (19, 10, 0)
state =  (19, 4, 0)
state =  (17, 8, 0)
state =  (21, 9, 1)
state =  (13, 5, 0)
state =  

In [18]:
print (policy)

{(12, 1, 0): 1, (18, 1, 0): 0, (12, 7, 0): 1, (15, 7, 0): 1, (20, 5, 0): 0, (16, 8, 0): 1, (20, 10, 0): 0, (21, 10, 1): 0, (16, 10, 0): 0, (18, 4, 0): 0, (14, 4, 0): 0, (18, 10, 0): 0, (14, 10, 0): 1, (13, 7, 0): 1, (18, 4, 1): 0, (20, 4, 1): 0, (11, 10, 0): 1, (21, 9, 1): 0, (19, 10, 0): 0, (14, 5, 1): 1, (15, 1, 0): 1, (15, 9, 0): 1, (14, 2, 0): 0, (14, 9, 0): 0, (19, 4, 0): 0, (19, 7, 1): 0, (8, 10, 0): 1, (10, 6, 0): 1, (20, 6, 0): 0, (9, 6, 0): 1, (17, 10, 0): 0, (17, 6, 0): 0, (17, 3, 0): 0, (17, 4, 1): 1, (21, 5, 1): 0, (12, 5, 0): 0, (16, 5, 0): 0, (19, 9, 0): 0, (21, 6, 0): 0, (21, 10, 0): 0, (13, 10, 0): 1, (18, 7, 0): 0, (5, 10, 0): 0, (12, 10, 0): 1, (7, 2, 0): 1, (17, 2, 0): 0, (17, 8, 0): 0, (21, 8, 0): 0, (13, 5, 0): 0, (20, 1, 0): 0, (12, 3, 0): 0, (13, 3, 0): 0, (14, 2, 1): 1, (19, 2, 1): 0, (19, 2, 0): 0, (15, 8, 0): 0, (12, 6, 0): 0, (14, 8, 0): 1, (13, 1, 0): 1, (16, 3, 0): 0, (21, 3, 0): 0, (20, 7, 0): 0, (17, 4, 0): 0, (9, 3, 0): 1, (16, 2, 0): 0, (20, 8, 0): 0, (

In [20]:
total_episodes = 10
for episode in range(total_episodes):
    observation = env.reset()[0]
    for t in range(100):
        print(f"Observation: {observation}")
        action = policy[observation]
        print(f"Taking action: {action}")
        observation, reward, done, term, info = env.step(action)
        if done:
            print(f"Game ended! Reward: {reward}")
            print('You won :)\n') if reward > 0 else print('You lost :(\n')
            break


Observation: (11, 10, 0)
Taking action: 1
Observation: (20, 10, 0)
Taking action: 0
Game ended! Reward: 1.0
You won :)

Observation: (18, 9, 0)
Taking action: 0
Game ended! Reward: 1.0
You won :)

Observation: (15, 2, 0)
Taking action: 0
Game ended! Reward: -1.0
You lost :(

Observation: (18, 10, 0)
Taking action: 0
Game ended! Reward: -1.0
You lost :(

Observation: (12, 5, 0)
Taking action: 0
Game ended! Reward: 1.0
You won :)

Observation: (15, 10, 0)
Taking action: 1
Observation: (16, 10, 0)
Taking action: 0
Game ended! Reward: 1.0
You won :)

Observation: (14, 10, 0)
Taking action: 1
Observation: (18, 10, 0)
Taking action: 0
Game ended! Reward: -1.0
You lost :(

Observation: (20, 6, 0)
Taking action: 0
Game ended! Reward: 0.0
You lost :(

Observation: (10, 6, 0)
Taking action: 1
Observation: (20, 6, 0)
Taking action: 0
Game ended! Reward: 1.0
You won :)

Observation: (20, 10, 0)
Taking action: 0
Game ended! Reward: 0.0
You lost :(



In [27]:
def plot_policy(policy):

    def get_Z(x, y, usable_ace):
        if (x,y,usable_ace) in policy:
            return policy[x,y,usable_ace]
        else:
            return 1

    def get_figure(usable_ace, ax):
        y_range = np.arange(11, 22)  # Inverted axis
        x_range = np.arange(10, 0, -1)  # Inverted axis
        Y, X = np.meshgrid(y_range, x_range)  # Transposed meshgrid
        Z = np.array([[get_Z(x, y, usable_ace) for x in x_range] for y in y_range])
        surf = ax.imshow(Z, cmap=plt.get_cmap('Pastel2', 2), vmin=0, vmax=1, extent=[0.5, 10.5, 10.5, 21.5])  # Inverted extent
        plt.xticks(y_range)  # Inverted ticks
        plt.yticks(x_range)  # Inverted ticks
        plt.gca().invert_xaxis()  # Invert the x-axis
        ax.set_xlabel('Dealer\'s Showing Card')  # Inverted label
        ax.set_ylabel('Player\'s Current Sum')  # Inverted label
        ax.grid(color='w', linestyle='-', linewidth=1)
        divider = make_axes_locatable(ax)
        cax = divider.append_axes("right", size="5%", pad=0.1)
        cbar = plt.colorbar(surf, ticks=[0, 1], cax=cax)
        cbar.ax.set_yticklabels(['0 (STICK)', '1 (HIT)'])


            
    fig = plt.figure(figsize=(15, 15))
    ax = fig.add_subplot(121)
    ax.set_title('Usable Ace')
    get_figure(True, ax)
    ax = fig.add_subplot(122)
    ax.set_title('No Usable Ace')
    get_figure(False, ax)
    plt.show()