In [54]:
import gym

# Create the Blackjack environment
env = gym.make('Blackjack-v1')


  deprecation(
  deprecation(


In [55]:
def basic_policy(state):
    """
    A simple policy that hits if the player's sum is less than 20, else holds.
    """
    player_sum, dealer_sum, usable_ace = state
    return 0 if player_sum >= 20 else 1  # 0 = hold, 1 = hit

def play_episode(policy, env):
    """
    Plays an episode of Blackjack using the given policy.

    Args:
        policy: A function that takes a state and returns an action (0=hold, 1=hit).
        env: The Blackjack environment.

    Returns:
        A tuple (states, actions, rewards) for the episode.
    """
    states = []
    actions = []
    rewards = []

    state = env.reset()
    while True:
        states.append(state)
        action = policy(state)
        actions.append(action)
        next_state, reward, done, _ = env.step(action)
        rewards.append(reward)
        state = next_state
        if done:
            break

    return states, actions, rewards


In [56]:
from collections import defaultdict

def monte_carlo_policy_evaluation(policy, env, num_episodes, discount_factor=1.0):
    """
    Evaluate a policy using Monte Carlo sampling.

    Args:
        policy: A function that takes a state and returns an action.
        env: The environment.
        num_episodes: Number of episodes to sample.
        discount_factor: Discount factor for future rewards.

    Returns:
        A dictionary mapping state to value.
    """
    # Store returns for each state
    returns_sum = defaultdict(float)
    returns_count = defaultdict(int)
    V = defaultdict(float)

    for _ in range(num_episodes):
        # Generate an episode using the policy
        episode = play_episode(policy, env)
        states, _, rewards = episode

        # Calculate returns
        G = 0
        for t in reversed(range(len(states))):
            G = rewards[t] + discount_factor * G
            state = states[t]
            # First visit Monte Carlo: only consider first time state is visited in episode
            if state not in states[:t]:
                returns_sum[state] += G
                returns_count[state] += 1
                V[state] = returns_sum[state] / returns_count[state]

    return V

# Example usage: Evaluate the initial policy with many episodes
V = monte_carlo_policy_evaluation(basic_policy, env, num_episodes=5000)


  if not isinstance(terminated, (bool, np.bool8)):


In [57]:
V

defaultdict(float,
            {(18, 2, False): -0.631578947368421,
             (13, 2, False): -0.5555555555555556,
             (19, 1, False): -0.8333333333333334,
             (16, 1, False): -0.8793103448275862,
             (17, 9, False): -0.7746478873239436,
             (11, 9, False): 0.03571428571428571,
             (6, 9, False): -0.42857142857142855,
             (18, 10, False): -0.7188940092165899,
             (11, 10, False): -0.08421052631578947,
             (20, 2, False): 0.7076923076923077,
             (10, 2, False): -0.2777777777777778,
             (8, 2, False): -0.5555555555555556,
             (20, 9, False): 0.7195121951219512,
             (20, 5, False): 0.6582278481012658,
             (17, 8, False): -0.56,
             (21, 9, True): 1.0,
             (15, 10, False): -0.6740331491712708,
             (21, 8, False): 0.9347826086956522,
             (14, 8, False): -0.3333333333333333,
             (18, 3, False): -0.84,
             (19, 9, False):

In [58]:
def greedy_policy_from_value_function(V, env):
    """
    Create a greedy policy based on the given value function.

    Args:
        V: A dictionary mapping state to value.
        env: The environment.

    Returns:
        A policy function that maps state to action.
    """
    def policy(state):
        # Get the player's sum and other details from the state
        player_sum, dealer_card, usable_ace = state

        # Calculate the value of holding (i.e., not taking another card)
        hold_value = V[state] if state in V else 0

        # Calculate the value of hitting (i.e., taking another card)
        hit_value = float('-inf')  # Start with the lowest possible value

        if player_sum < 21:  # If the player's sum is less than 21, consider hitting
            for card_value in range(1, 11):  # Possible values of the next card
                next_player_sum = player_sum + card_value

                # If adding the card causes the player to go bust (over 21) with a usable ace, reduce the sum by 10
                if next_player_sum > 21 and usable_ace:
                    next_player_sum -= 10
                    usable_ace = False

                # Check the value of the next state
                next_state = (next_player_sum, dealer_card, usable_ace)
                if next_state in V:
                    hit_value = max(hit_value, V[next_state])

        # Return 1 for "hit" if hitting provides a higher value, otherwise return 0 for "hold"
        return 1 if hit_value > hold_value else 0

    return policy


In [59]:
def evaluate_policy(policy, env, num_episodes):
    """
    Evaluate the performance of a policy by running multiple episodes.

    Args:
        policy: A policy function that maps state to action.
        env: The environment.
        num_episodes: Number of episodes to evaluate.

    Returns:
        The average return over the episodes.
    """
    total_return = 0

    for _ in range(num_episodes):
        states, _, rewards = play_episode(policy, env)
        total_return += sum(rewards)

    return total_return / num_episodes

# Evaluate the optimal policy
num_test_episodes = 1000
average_return = evaluate_policy(improved_policy, env, num_test_episodes)

print(f"Average return of the optimal policy over {num_test_episodes} episodes: {average_return:.2f}")


Average return of the optimal policy over 1000 episodes: -0.53


In [61]:
def monte_carlo_policy_iteration(env, num_episodes, discount_factor=1, tol=1e-10):
    """
    Perform Monte Carlo Policy Iteration to find an optimal policy.

    Args:
        env: The environment.
        num_episodes: Number of episodes to sample for policy evaluation.
        discount_factor: Discount factor for future rewards.
        tol: Convergence tolerance for policy improvement.

    Returns:
        A tuple (optimal_policy, optimal_value_function).
    """
    # Initialize a random policy
    policy = basic_policy  # initial policy
    V = defaultdict(float)
    counter = 0
    while True:
        counter += 1

        average_return = evaluate_policy(policy, env, num_test_episodes)

        print(f"Iteration number {counter} trained model for {num_episodes} episodes, Tested for {num_test_episodes} has average return {average_return:.2f}")
        if(counter >100):
          break
        # Policy Evaluation
        V_new = monte_carlo_policy_evaluation(policy, env, num_episodes, discount_factor)

        # Policy Improvement
        new_policy = greedy_policy_from_value_function(V_new, env)



        # Check for convergence
        if max(abs(V_new[state] - V[state]) for state in V_new) < tol:
            break


        # Update policy and value function
        policy = new_policy
        V = V_new

    return policy, V

# Perform Monte Carlo Policy Iteration
improved_policy, optimal_value_function = monte_carlo_policy_iteration(env, num_episodes=5000)


Iteration number 1 trained model for 5000 episodes, Tested for 1000 has average return -0.34
Iteration number 2 trained model for 5000 episodes, Tested for 1000 has average return -0.63
Iteration number 3 trained model for 5000 episodes, Tested for 1000 has average return -0.62
Iteration number 4 trained model for 5000 episodes, Tested for 1000 has average return -0.62
Iteration number 5 trained model for 5000 episodes, Tested for 1000 has average return -0.66
Iteration number 6 trained model for 5000 episodes, Tested for 1000 has average return -0.65
Iteration number 7 trained model for 5000 episodes, Tested for 1000 has average return -0.63
Iteration number 8 trained model for 5000 episodes, Tested for 1000 has average return -0.67
Iteration number 9 trained model for 5000 episodes, Tested for 1000 has average return -0.65
Iteration number 10 trained model for 5000 episodes, Tested for 1000 has average return -0.67
Iteration number 11 trained model for 5000 episodes, Tested for 1000 

In [62]:
V

  and should_run_async(code)


defaultdict(float,
            {(18, 2, False): -0.631578947368421,
             (13, 2, False): -0.5555555555555556,
             (19, 1, False): -0.8333333333333334,
             (16, 1, False): -0.8793103448275862,
             (17, 9, False): -0.7746478873239436,
             (11, 9, False): 0.03571428571428571,
             (6, 9, False): -0.42857142857142855,
             (18, 10, False): -0.7188940092165899,
             (11, 10, False): -0.08421052631578947,
             (20, 2, False): 0.7076923076923077,
             (10, 2, False): -0.2777777777777778,
             (8, 2, False): -0.5555555555555556,
             (20, 9, False): 0.7195121951219512,
             (20, 5, False): 0.6582278481012658,
             (17, 8, False): -0.56,
             (21, 9, True): 1.0,
             (15, 10, False): -0.6740331491712708,
             (21, 8, False): 0.9347826086956522,
             (14, 8, False): -0.3333333333333333,
             (18, 3, False): -0.84,
             (19, 9, False):