Environment setup

In [1]:
import typing as tp
import numpy as np
import gymnasium as gym

cards = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 10, 10]
card_names = ['A', '2', '3', '4', '5', '6', '7', '8', '9', '10']

player_scores_axis = np.arange(12, 22, 1)
dealer_card_offset = 1
player_scores_offset = 12

def default_player_policy_func(dealer_card: int, player_scores: int, player_has_ace: bool) -> int:
    if player_has_ace:
        if player_scores < 20:
            return 1
    else:
        if player_scores < 16:
            return 1
    return 0

policy_grids: tp.Dict[str, np.ndarray] = {
    'no_ace': np.zeros((10, 10), dtype=int),
    'ace': np.zeros((10, 10), dtype=int)
}

for dealer_card in range(1, 10):
    for player_scores in range(12, 22):
        cell_idx = (dealer_card - dealer_card_offset, player_scores - player_scores_offset)
        policy_grids['no_ace'][cell_idx] = default_player_policy_func(dealer_card, player_scores, False)
        policy_grids['ace'][cell_idx] = default_player_policy_func(dealer_card, player_scores, True)
        
def take_action_from_policy_grid(dealer_card: int, player_scores: int, player_has_ace: bool) -> int:
    global policy_grids
    if player_scores < player_scores_offset:
        return 1
    cell_idx = (dealer_card - dealer_card_offset, player_scores - player_scores_offset)
    assert cell_idx[0] >= 0 and cell_idx[1] >= 0
    action = policy_grids['ace' if player_has_ace else 'no_ace'][cell_idx]
    return action


In [2]:
from tqdm import tqdm
import cv2
import imageio

def estimate_policy(n_experiments: int = 100_000, out_file_name = None, render_every_n_episode: int = 1_000) -> float:
    
    env = gym.make("Blackjack-v1", sab=True, render_mode='rgb_array' if out_file_name is not None else None)
    
    frames = []
    
    sum_reward = 0
    for ep_idx in tqdm(range(n_experiments)):
        
        if out_file_name is not None and ep_idx % render_every_n_episode == 0:
            render_flag = True
            cur_avg_reward = (sum_reward / ep_idx) if ep_idx > 0 else 0
        else:
            render_flag = False
    
        obs, info = env.reset()
        done = False
        ep_reward = 0

        # play one episode
        while not done:
            player_scores, dealer_card, player_has_ace = obs
            
            action = take_action_from_policy_grid(dealer_card, player_scores, player_has_ace)
            next_obs, reward, terminated, truncated, info = env.step(action)
            ep_reward += reward
            
            done = terminated or truncated
            obs = next_obs
            
            if render_flag:
                frame = env.render()
                cv2.putText(frame, f'Mean reward: {cur_avg_reward:.2f}', (230, 40),
                            cv2.FONT_HERSHEY_SIMPLEX, 1.1, (255, 255, 0), 2, 2)
                frames.append(frame)
        sum_reward += ep_reward
    
    if out_file_name is not None:
        imageio.mimsave(out_file_name, frames)
    
    return sum_reward / n_experiments

default_policy_reward = estimate_policy(out_file_name="default_policy_game.mp4")
default_policy_reward

100%|██████████| 100000/100000 [00:13<00:00, 7578.71it/s]


-0.08682

In [3]:
from tqdm import tqdm
from pathlib import Path
import shutil
import imageio
import plotly.graph_objects as go
from plotly.subplots import make_subplots

camera = dict(
    up=dict(x=0, y=0, z=1),
    center=dict(x=0, y=0, z=0),
    eye=dict(x=-1.25, y=1.25, z=1.25)
)

def draw_value_function(title: str, values: tp.Dict[str, np.ndarray]) -> go.Figure:

    fig = make_subplots(rows=1, cols=2,
                        shared_xaxes=False,
                        specs=[[{'type': 'surface'}, {'type': 'surface'}]],
                        subplot_titles=["No ace", "Ace"])

    fig.add_trace(go.Surface(y=card_names, x=player_scores_axis,
                             z=values['no_ace'], colorscale='YlGnBu'), col=1, row=1)
    fig.layout.scene1.camera = camera
    fig.layout.scene1.xaxis.nticks = 9
    fig.layout.scene1.yaxis.nticks = 10
    fig.add_trace(go.Surface(y=cards, x=player_scores_axis, z=values['ace'], colorscale='YlGnBu'), col=2, row=1)
    fig.layout.scene2.camera = camera
    fig.layout.scene2.xaxis.nticks = 10
    fig.layout.scene2.yaxis.nticks = 10
    fig.update_layout(scene_camera=camera, title=title,
                      margin=dict(r=25, l=25, b=10, t=80),
                      width=1000,
                      showlegend=False)
    fig.update_scenes(xaxis_title_text='Player',  
                      yaxis_title_text='Dealer',  
                      zaxis_title_text='Reward')
    return fig


def estimate_value_function(num_episodes: int = 300_000, frame_step: int = 3_000):
    
    env = gym.make("Blackjack-v1", sab=True)

    temp_dir = Path('tmp')
    if temp_dir.exists():
        shutil.rmtree(temp_dir)
    temp_dir.mkdir()
    
    value_grid_count: tp.Dict[str, np.ndarray] = {
        'no_ace': np.zeros((10, 10), dtype=int),
        'ace': np.zeros((10, 10), dtype=int)
    }
    value_grid: tp.Dict[str, np.ndarray] = {
        'no_ace': np.zeros((10, 10), dtype=float),
        'ace': np.zeros((10, 10), dtype=float)
    }
    
    # fill a state with a score of 21 as a winning position
    value_grid['no_ace'][:, -1] = 1
    value_grid['ace'][:, -1] = 1
    
    image_paths = []
    
    for ep_idx in tqdm(range(num_episodes + frame_step)):
    
        obs, info = env.reset()
        done = False

        # play one episode
        while not done:
            player_scores, dealer_card, player_has_ace = obs
            action = take_action_from_policy_grid(dealer_card, player_scores, player_has_ace)
            
            next_obs, reward, terminated, truncated, info = env.step(action)
            
            done = terminated or truncated
            
            if player_scores >= player_scores_offset:
                
                v_g_count = value_grid_count['ace' if player_has_ace else 'no_ace']
                v_g = value_grid['ace' if player_has_ace else 'no_ace']
                
                cell_idx = (dealer_card - dealer_card_offset, player_scores - player_scores_offset)
                
                N = v_g_count[cell_idx] + 1
                v_g_count[cell_idx] = N
                if done:
                    td_target = reward
                else:
                    next_player_scores, next_dealer_card, next_player_has_ace = next_obs
                    next_v_g = value_grid['ace' if next_player_has_ace else 'no_ace']
                
                    next_cell_idx = (next_dealer_card - dealer_card_offset, next_player_scores - player_scores_offset)
                    td_target = reward + next_v_g[next_cell_idx]
                v_g[cell_idx] += (td_target - v_g[cell_idx]) / N
                
            obs = next_obs
        
        if ep_idx % frame_step == 0:
            ep_name = str(ep_idx) if ep_idx > 0 else '1'
            fig = draw_value_function(f'Value function on step: {ep_name}', value_grid)

            image_path = temp_dir / f'{ep_name}.png'
            fig.write_image(image_path)
            image_paths.append(imageio.imread(image_path))
            
    imageio.mimsave('blackjack_default_value_function.mp4', image_paths, fps=20)

estimate_value_function()



100%|██████████| 303000/303000 [02:54<00:00, 1733.53it/s]


In [12]:
import math
import random
    
q_grid: tp.Dict[str, np.ndarray] = {
    'no_ace': np.zeros((10, 10, 2), dtype=float),
    'ace': np.zeros((10, 10, 2), dtype=float)
}
    
# fill a state with a score of 21 as a winning position
q_grid['no_ace'][:, -1, :] = [1, 0]
q_grid['ace'][:, -1, :] = [1, 0]

def estimate_q_function(update_factor: float = 1e-3, num_episodes: int = 10_000_000, frame_step: int = 100_000):
    
    global q_grid
    
    env = gym.make("Blackjack-v1", sab=True)

    temp_dir = Path('tmp')
    if temp_dir.exists():
        shutil.rmtree(temp_dir)
    temp_dir.mkdir()
    
    eps_from = 1.0
    eps_to = 1e-2
    n_epochs_of_decays = math.ceil(num_episodes * 0.95)
    
    image_paths = []
    
    for ep_idx in tqdm(range(num_episodes + frame_step)):
        if ep_idx > n_epochs_of_decays:
            eps_greedy_coeff = 0.0
        else:
            step_coeff = min(max(ep_idx / n_epochs_of_decays, 0.0), 1.0)
            eps_greedy_coeff = eps_from * math.exp(math.log(eps_to / eps_from) * step_coeff)
    
        obs, info = env.reset()
        done = False

        while not done:
            player_scores, dealer_card, player_has_ace = obs
            cell_idx = (dealer_card - dealer_card_offset, player_scores - player_scores_offset)
            q = q_grid['ace' if player_has_ace else 'no_ace']
            if random.uniform(0, 1) < eps_greedy_coeff:
                action = np.random.choice([0, 1])
            else:
                action = q[cell_idx].argmax()
            
            next_obs, reward, terminated, truncated, info = env.step(action)
            
            done = terminated or truncated
            
            if player_scores >= player_scores_offset:
                if terminated:
                    td_target = reward
                else:
                    next_player_scores, next_dealer_card, next_player_has_ace = next_obs
                    next_cell_idx = (next_dealer_card - dealer_card_offset, next_player_scores - player_scores_offset)
                    next_q = q_grid['ace' if next_player_has_ace else 'no_ace']
                    
                    td_target = reward + next_q[next_cell_idx].max()
                q[cell_idx][action] += (td_target - q[cell_idx][action]) * update_factor
                
            obs = next_obs
        
        if ep_idx % frame_step == 0:
            ep_name = str(ep_idx) if ep_idx > 0 else '1'
            
            values = {
                'no_ace': q_grid['no_ace'].max(axis=2),
                'ace': q_grid['ace'].max(axis=2)
            }
            fig = draw_value_function(f'Value-function from Q-function on step: {ep_name}', values)

            image_path = temp_dir / f'{ep_name}.png'
            fig.write_image(image_path)
            image_paths.append(imageio.imread(image_path))
            
    imageio.mimsave('blackjack_optimal_value_function.mp4', image_paths)

estimate_q_function()



100%|██████████| 10100000/10100000 [22:42<00:00, 7415.55it/s] 


In [13]:
policy_grids['no_ace'] = q_grid['no_ace'].argmax(axis=2)
policy_grids['ace'] = q_grid['ace'].argmax(axis=2)

new_policy_reward = estimate_policy(out_file_name='optimal_policy_game.mp4')
new_policy_reward

100%|██████████| 100000/100000 [00:12<00:00, 7919.74it/s]


-0.04101

In [22]:
import plotly.express as px

fig = make_subplots(rows=1, cols=2, subplot_titles=("No ace", "Ace"))
dealer_axis = np.linspace(1, 10, 10)
colorscale = [[0, "rgb(150,150,150)"], [1, "rgb(155,230,155)"]]

annotations = ['Stick', 'Hit']
policy_texts = {
    'no_ace': [[annotations[policy_grids['no_ace'][i, j]] for j in range(10)] for i in range(10)],
    'ace': [[annotations[policy_grids['ace'][i, j]] for j in range(10)] for i in range(10)]
}

fig.add_trace(go.Heatmap(x=player_scores_axis, y=card_names, z=policy_grids['no_ace'],
                         colorscale=colorscale,
                         text=policy_texts['no_ace'], texttemplate="%{text}"), 1, 1)
fig.add_trace(go.Heatmap(x=player_scores_axis, y=card_names, z=policy_grids['ace'],
                         colorscale=colorscale,
                         text=policy_texts['no_ace'], texttemplate="%{text}"), 1, 2)
fig.update_layout(width=800, height=400)
fig.update_scenes(xaxis_title_text='Dealer', yaxis_title_text='Player')
fig.update_traces(showscale=False)
fig.update_xaxes(showline=True, linewidth=1, title='Player scores', dtick=1)
fig.update_yaxes(showline=True, linewidth=1, title='Dealer scores', dtick=1)


fig.show()
