# Benchmarking RL methods to solve BlackJack


Thomas Lemercier & Gaspard Berthelier

## Imports

In [1]:
from google.colab import drive
# Mount Google Drive inside the Colab runtime.
drive.mount("/content/drive")
# Switch the working directory to the project folder on Drive
# (presumably so that the `src` package imported below is found — TODO confirm).
%cd /content/drive/MyDrive/RL/blackjack

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
/content/drive/MyDrive/RL/blackjack


In [2]:
import gym
import time
from tqdm.notebook import tqdm

In [3]:
from src.utils.general import state_to_index, state_to_array_encoding, get_num_states
from src.utils.data_struct import Transition
from src.agents import Agent
from src.utils.visualization import plot_policy_simple_blackjack


def play_episode(env: gym.Env, agent: Agent, render: bool = False, array_encoding: bool = False):
    """Play one episode with the agent's best actions and return the final reward.

    Args:
        env: environment to play in. ``reset()`` must return the initial state and
            ``step(action)`` a 5-tuple whose first three items are
            ``(next_state, reward, terminated)``.
        agent: agent queried through ``get_best_action(state)`` at every step.
        render: when True, ``env.render()`` is called after the reset and after
            every step.
        array_encoding: when True, states are encoded with
            ``state_to_array_encoding``; otherwise with ``state_to_index``.

    Returns:
        The reward returned by the last ``env.step`` call of the episode.
    """
    # Pick the state encoder once instead of re-testing the flag at every step.
    encode = state_to_array_encoding if array_encoding else state_to_index

    state = env.reset()
    if render:
        env.render()
    state = encode(state, env.observation_space)

    terminated = False
    while not terminated:
        action = agent.get_best_action(state)
        next_state, reward, terminated, _, _ = env.step(action)
        if render:
            env.render()
        state = encode(next_state, env.observation_space)
    return reward

  and should_run_async(code)


In [4]:
# Number of episodes passed to the main_* training/benchmark loops.
n_episodes = 500_000
# Number of episodes passed to play_policy when evaluating an agent.
n_test_episodes = 10_000

def play_policy(env: gym.Env, agent: Agent, n_test_episodes: int, array_encoding: bool = False):
    """Evaluate the agent over ``n_test_episodes`` episodes and print win/draw rates.

    Every episode is played with ``play_episode``; only the last one is rendered.
    A final reward of 1 counts as a win and a final reward of 0 as a draw.
    """
    last_episode = n_test_episodes - 1
    n_wins = 0
    n_draws = 0
    for episode in range(n_test_episodes):
        reward = play_episode(
            env,
            agent,
            render=(episode == last_episode),
            array_encoding=array_encoding,
        )
        if reward == 1:
            n_wins += 1
        elif reward == 0:
            n_draws += 1
    print(f"Win rate: {n_wins / n_test_episodes:.2f}")
    print(f"Draw rate: {n_draws / n_test_episodes:.2f}")

  and should_run_async(code)


## Infinite Deck

Infinite deck, i.e. the probability of drawing each card remains constant.

In [5]:
from src.envs import InfiniteSimpleBlackjack
# Seeded infinite-deck environment shared by every cell of this section.
env = InfiniteSimpleBlackjack(seed=42)

### Random

A random agent that picks a random action at each step.

In [6]:
from src.agents import RandomAgent

def main_random(env: gym.Env, agent: RandomAgent, n_episodes: int):
    """Play ``n_episodes`` episodes with ``agent`` and print win/draw rates and timing.

    The agent is only queried through ``agent.act``. An episode counts as a win
    when its last reward is 1 and as a draw when it is 0.
    """
    start = time.time()
    n_wins, n_draws = 0, 0

    for _ in tqdm(range(n_episodes)):
        observation = env.reset()
        done = False
        while not done:
            observation, reward, done, _, _ = env.step(agent.act(observation))

        # `reward` now holds the reward of the episode's last step.
        n_wins += int(reward == 1)
        n_draws += int(reward == 0)

    print(f"Win rate: {n_wins / n_episodes:.2f}")
    print(f"Draw rate: {n_draws / n_episodes:.2f}")
    print(f"\nTime taken: {time.time() - start:.2f} seconds")

In [7]:
# Baseline: seeded random agent run on the current environment.
agent = RandomAgent(env.action_space, seed=42)
main_random(env, agent, n_episodes)

  0%|          | 0/500000 [00:00<?, ?it/s]

Win rate: 0.28
Draw rate: 0.03

Time taken: 10.42 seconds


### Sarsa

In [8]:
from src.agents import SarsaAgent

def main_sarsa(env: gym.Env, agent: SarsaAgent, n_episodes: int):
    """Run the SARSA agent for ``n_episodes`` episodes and print win/draw rates and timing.

    States are converted with ``state_to_index`` before being given to the agent.
    After each environment step the action for the next state is chosen first and
    then passed to ``agent.step`` together with the transition.
    """
    start = time.time()
    n_wins, n_draws = 0, 0
    for _ in tqdm(range(n_episodes)):
        state = state_to_index(env.reset(), env.observation_space)
        action = agent.act(state)
        terminated = False
        while not terminated:
            raw_next_state, reward, terminated, _, _ = env.step(action)
            next_state = state_to_index(raw_next_state, env.observation_space)

            transition = Transition(
                state=state,
                action=action,
                next_state=next_state,
                reward=reward,
                done=terminated,
            )

            # The next action is selected before the agent is stepped.
            next_action = agent.act(next_state)
            agent.step(transition, next_action)
            state, action = next_state, next_action

        # `reward` now holds the reward of the episode's last step.
        n_wins += int(reward == 1)
        n_draws += int(reward == 0)

    print(f"Win rate: {n_wins / n_episodes:.2f}")
    print(f"Draw rate: {n_draws / n_episodes:.2f}")
    print(f"\nTime taken: {time.time() - start:.2f} seconds")

In [9]:
from src.explorations import EpsilonGreedy, UCB
from src.utils.data_struct import SarsaParameters

# Alternative exploration strategy, kept for quick switching:
# exploration = EpsilonGreedy(epsilon=0.8, decay=0.999999, seed=42)
exploration = UCB(num_states=get_num_states(env.observation_space), num_actions=env.action_space.n, seed=42)
# SARSA parameters sized from the environment's observation and action spaces.
sarsa_parameters = SarsaParameters(num_states=get_num_states(env.observation_space), num_actions=env.action_space.n)
agent = SarsaAgent(sarsa_parameters, exploration)

main_sarsa(env, agent, n_episodes)

  0%|          | 0/500000 [00:00<?, ?it/s]

ValueError: invalid entry in coordinates array

In [None]:
# Inspect the observation space: number of values per dimension (high - low + 1).
# NOTE(review): presumably a debugging aid for the ValueError raised by the SARSA run above — confirm or remove.
env.observation_space.high - env.observation_space.low + 1

In [None]:
# Plot the policy obtained from the current agent.
policy = agent.get_policy()
plot_policy_simple_blackjack(policy, env.observation_space)

# Evaluate the agent over n_test_episodes episodes (win/draw rates are printed).
play_policy(env, agent, n_test_episodes)

### Q Learning

In [None]:
from src.agents import QlearningAgent

def main_qlearning(env: gym.Env, agent: QlearningAgent, n_episodes: int):
    """Run the Q-learning agent for ``n_episodes`` episodes and print win/draw rates and timing.

    States are converted with ``state_to_index`` before being given to the agent,
    and every transition is passed to ``agent.step``.
    """
    start = time.time()
    n_wins, n_draws = 0, 0
    for _ in tqdm(range(n_episodes)):
        state = state_to_index(env.reset(), env.observation_space)
        terminated = False
        while not terminated:
            action = agent.act(state)
            raw_next_state, reward, terminated, _, _ = env.step(action)
            next_state = state_to_index(raw_next_state, env.observation_space)

            agent.step(
                Transition(
                    state=state,
                    action=action,
                    next_state=next_state,
                    reward=reward,
                    done=terminated,
                )
            )
            state = next_state

        # `reward` now holds the reward of the episode's last step.
        n_wins += int(reward == 1)
        n_draws += int(reward == 0)

    print(f"Win rate: {n_wins / n_episodes:.2f}")
    print(f"Draw rate: {n_draws / n_episodes:.2f}")
    print(f"\nTime taken: {time.time() - start:.2f} seconds")

In [None]:
from src.utils.data_struct import QlearningParameters

# Alternative exploration strategy, kept for quick switching:
# exploration = EpsilonGreedy(epsilon=0.8, decay=0.999999, seed=42)
exploration = UCB(num_states=get_num_states(env.observation_space), num_actions=env.action_space.n, seed=42)
# Q-learning parameters sized from the environment's observation and action spaces.
qlearning_parameters = QlearningParameters(num_states=get_num_states(env.observation_space), num_actions=env.action_space.n)
agent = QlearningAgent(qlearning_parameters, exploration)

main_qlearning(env, agent, n_episodes)

In [None]:
# Plot the policy obtained from the current agent.
policy = agent.get_policy()
plot_policy_simple_blackjack(policy, env.observation_space)

# Evaluate the agent over n_test_episodes episodes (win/draw rates are printed).
play_policy(env, agent, n_test_episodes)

## Simple Finite

In [None]:
from src.envs import SimpleBlackjack
# Replaces the previous `env`: seeded SimpleBlackjack built with packs=1.
env = SimpleBlackjack(seed=42,packs=1)

### Random

In [None]:
# Baseline: seeded random agent run on the current environment.
agent = RandomAgent(env.action_space, seed=42)
main_random(env, agent, n_episodes)

In [None]:
# NOTE(review): play_episode calls agent.get_best_action — confirm RandomAgent provides it.
play_policy(env, agent, n_test_episodes)

### Sarsa

In [None]:
# Alternative exploration strategy, kept for quick switching:
# exploration = EpsilonGreedy(epsilon=0.8, decay=0.999999, seed=42)
exploration = UCB(num_states=get_num_states(env.observation_space), num_actions=env.action_space.n, seed=42)
# SARSA parameters sized from the environment's observation and action spaces.
sarsa_parameters = SarsaParameters(num_states=get_num_states(env.observation_space), num_actions=env.action_space.n)
agent = SarsaAgent(sarsa_parameters, exploration)

main_sarsa(env, agent, n_episodes)

In [None]:
# Plot the policy obtained from the current agent.
policy = agent.get_policy()
plot_policy_simple_blackjack(policy, env.observation_space)

# Evaluate the agent over n_test_episodes episodes (win/draw rates are printed).
play_policy(env, agent, n_test_episodes)

### Q Learning

In [None]:
# Alternative exploration strategy, kept for quick switching:
# exploration = EpsilonGreedy(epsilon=0.8, decay=0.999999, seed=42)
exploration = UCB(num_states=get_num_states(env.observation_space), num_actions=env.action_space.n, seed=42)
# Q-learning parameters sized from the environment's observation and action spaces.
qlearning_parameters = QlearningParameters(num_states=get_num_states(env.observation_space), num_actions=env.action_space.n)
agent = QlearningAgent(qlearning_parameters, exploration)

main_qlearning(env, agent, n_episodes)

In [None]:
# Plot the policy obtained from the current agent.
policy = agent.get_policy()
plot_policy_simple_blackjack(policy, env.observation_space)

# Evaluate the agent over n_test_episodes episodes (win/draw rates are printed).
play_policy(env, agent, n_test_episodes)

## Complete

In [None]:
from src.envs import Blackjack
# Replaces the previous `env`: seeded Blackjack built with packs=1.
env = Blackjack(seed=42,packs=1)

### Random

In [None]:
# Baseline: seeded random agent run on the current environment.
agent = RandomAgent(env.action_space, seed=42)
main_random(env, agent, n_episodes)

In [None]:
# NOTE(review): play_episode calls agent.get_best_action — confirm RandomAgent provides it.
play_policy(env, agent, n_test_episodes)

### Sarsa

In [None]:
# Alternative exploration strategy, kept for quick switching:
# exploration = EpsilonGreedy(epsilon=0.8, decay=0.999999, seed=42)
exploration = UCB(num_states=get_num_states(env.observation_space), num_actions=env.action_space.n, seed=42)
# SARSA parameters sized from the environment's observation and action spaces.
sarsa_parameters = SarsaParameters(num_states=get_num_states(env.observation_space), num_actions=env.action_space.n)
agent = SarsaAgent(sarsa_parameters, exploration)

main_sarsa(env, agent, n_episodes)

In [None]:
# Plot the policy obtained from the current agent.
# NOTE(review): the plotting helper is named for the simple blackjack env — confirm it
# supports the observation space of the complete Blackjack env used in this section.
policy = agent.get_policy()
plot_policy_simple_blackjack(policy, env.observation_space)

# Evaluate the agent over n_test_episodes episodes (win/draw rates are printed).
play_policy(env, agent, n_test_episodes)

### Q Learning

In [None]:
# Alternative exploration strategy, kept for quick switching:
# exploration = EpsilonGreedy(epsilon=0.8, decay=0.999999, seed=42)
exploration = UCB(num_states=get_num_states(env.observation_space), num_actions=env.action_space.n, seed=42)
# Q-learning parameters sized from the environment's observation and action spaces.
qlearning_parameters = QlearningParameters(num_states=get_num_states(env.observation_space), num_actions=env.action_space.n)
agent = QlearningAgent(qlearning_parameters, exploration)

main_qlearning(env, agent, n_episodes)

In [None]:
# Plot the policy obtained from the current agent.
# NOTE(review): the plotting helper is named for the simple blackjack env — confirm it
# supports the observation space of the complete Blackjack env used in this section.
policy = agent.get_policy()
plot_policy_simple_blackjack(policy, env.observation_space)

# Evaluate the agent over n_test_episodes episodes (win/draw rates are printed).
play_policy(env, agent, n_test_episodes)

### DQN

In [None]:
from src.agents.dqn import DQN
from src.utils.general import state_to_array_encoding, get_input_dim_encoding

def main_dqn(env: gym.Env, agent: DQN, n_episodes: int):
    """Run the DQN agent for ``n_episodes`` episodes and print win/draw rates and timing.

    States are encoded with ``state_to_array_encoding``. For every environment
    step, the action, reward and termination flag are wrapped in 1-element
    tensors and passed to ``agent.step`` inside a ``Transition``.

    Args:
        env: environment whose ``reset()`` returns the initial state and whose
            ``step(action)`` returns a 5-tuple starting with
            ``(next_state, reward, terminated)``.
        agent: agent exposing ``act(state)`` and ``step(transition)``.
        n_episodes: number of episodes to run.
    """
    start = time.time()
    n_wins = 0
    n_draws = 0
    for _ in tqdm(range(n_episodes)):
        state = state_to_array_encoding(env.reset(), env.observation_space)
        terminated = False
        while not terminated:
            action = agent.act(state)
            next_state, reward, terminated, _, _ = env.step(action)
            next_state = state_to_array_encoding(next_state, env.observation_space)

            # Tensors are built only for the transition; `reward` and `terminated`
            # keep the raw values from env.step so the loop condition and the
            # win/draw test below do not depend on tensor truthiness.
            transition = Transition(
                state=state,
                action=torch.Tensor([action]).long(),
                next_state=next_state,
                reward=torch.Tensor([reward]).float(),
                done=torch.Tensor([terminated]).float(),
            )

            state = next_state
            agent.step(transition)

        # `reward` holds the reward of the episode's last step.
        if reward == 1:
            n_wins += 1
        elif reward == 0:
            n_draws += 1

    print(f"Win rate: {n_wins / n_episodes:.2f}")
    print(f"Draw rate: {n_draws / n_episodes:.2f}")

    print(f"\nTime taken: {time.time() - start:.2f} seconds")

In [None]:
import torch
from src.utils.data_struct import DQNParameters
from src.networks import MLP
from src.utils.buffer import ReplayBuffer

params = DQNParameters()
# Online and target networks share the same architecture (hidden size 64) and live on params.device.
q_network = MLP(get_input_dim_encoding(env.observation_space), 64, env.action_space.n).to(params.device)
target_network = MLP(get_input_dim_encoding(env.observation_space), 64, env.action_space.n).to(params.device)
# Start the target network from the same weights as the online network.
target_network.load_state_dict(q_network.state_dict())
replay_buffer = ReplayBuffer(10_000)
# Only the online network's parameters are given to the optimizer.
optimizer = torch.optim.AdamW(q_network.parameters(), lr=0.001, weight_decay=0.0001)
criterion = torch.nn.MSELoss()
# NOTE(review): unlike the other exploration objects in this notebook, no seed is passed here — confirm this is intended.
exploration = EpsilonGreedy(0.3, 0.9)

agent = DQN(q_network, target_network, replay_buffer, optimizer, criterion, exploration, params)
main_dqn(env, agent, n_episodes)

In [None]:
# `policy` is not used further in this cell.
policy = agent.get_policy()
# Evaluate the DQN agent; states are array-encoded, matching main_dqn.
play_policy(env, agent, n_test_episodes, array_encoding=True)