# Train a neural network to optimally play blackjack.
### Author: T Lindauer

In [67]:
import collections
import statistics
from typing import Any

import numpy as np
import tensorflow as tf

In [68]:
from BlackjackEnv import BlackjackEnv

Define first model used

In [230]:
modelOne = tf.keras.models.Sequential([
    tf.keras.layers.Flatten(input_shape=(40,)),
    tf.keras.layers.Dense(256, activation='relu'),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(1, activation='softmax')
])

Define second model used

In [231]:
modelTwo = tf.keras.models.Sequential([
    tf.keras.layers.Flatten(input_shape=(40,)),
    tf.keras.layers.Dense(256, activation='relu'),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(1, activation='softmax')
])

In [71]:
def reward(hand_score: int) -> float:
    return min(hand_score / 21, 0 if hand_score > 21 else 1)

In [72]:
def gen_sparse_matrix(num: int):
    return [0] * (num - 1) + [1] + [0] * (40 - num)

Training loop

In [235]:
def run_episode(env: BlackjackEnv, initial_state_0: tf.Tensor, initial_state_1: tf.Tensor):

    initial_state_shape = initial_state_0.shape
    state_0 = initial_state_0
    state_1 = initial_state_1

    action_probs_0 = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
    values_0 = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
    rewards_0 = tf.TensorArray(dtype=tf.int32, size=0, dynamic_size=True)

    action_probs_1 = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
    values_1 = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
    rewards_1 = tf.TensorArray(dtype=tf.int32, size=0, dynamic_size=True)

    for t in range(0, 3):
        # state_0 = tf.expand_dims(state_0, -1)
        # state_1 = tf.expand_dims(state_1, -1)

        state_0 = state_0[np.newaxis, :]
        state_1 = state_1[np.newaxis, :]

        print(state_0.shape)
        print(state_1.shape)

        action_logits_t_0, value_0 = modelOne(state_0, 1)
        action_logits_t_1, value_1 = modelTwo(state_1, 1)


        values_0 = values_0.write(t, tf.squeeze(value_0))
        values_1 = values_1.write(t, tf.squeeze(value_1))

        action_probs_t_0 = tf.nn.softmax(action_logits_t_0)
        action_probs_t_1 = tf.nn.softmax(action_logits_t_1)

        action_0 = tf.random.categorical(action_logits_t_0, 1)[0, 0]
        action_1 = tf.random.categorical(action_logits_t_1, 1)[0, 0]

        action_probs_0 = action_probs_0.write(t, action_probs_t_0[0, action_0])
        action_probs_1 = action_probs_1.write(t, action_probs_t_1[0, action_1])

        state_0.set_shape(initial_state_shape)
        state_1.set_shape(initial_state_shape)

        translated_action_0 = round(action_0)
        translated_action_1 = round(action_1)
        if translated_action_0:
            env.add_card(0)
        if translated_action_1:
            env.add_card(1)

        state_0 = gen_sparse_matrix(env.hands[0])
        state_1 = gen_sparse_matrix(env.hands[1])
        # state_0 = env.hands[0] / 40
        # state_1 = env.hands[1] / 40

        rewards_0 = rewards_0.write(t, reward(env.hands[0]))
        rewards_1 = rewards_1.write(t, reward(env.hands[1]))

    action_probs_0 = action_probs_0.stack()
    action_probs_1 = action_probs_1.stack()

    values_0 = values_0.stack()
    values_1 = values_1.stack()

    rewards_0 = rewards_0.stack()
    rewards_1 = rewards_1.stack()

    return action_probs_0, action_probs_1, values_0, values_1, rewards_0, rewards_1

In [193]:
eps = np.finfo(np.float32).eps.item()


def get_expected_return(
        rewards: tf.Tensor,
        gamma: float,
        standardize: bool = True) -> tf.Tensor:
    """Compute expected returns per timestep."""

    n = tf.shape(rewards)[0]
    returns = tf.TensorArray(dtype=tf.float32, size=n)

    # Start from the end of `rewards` and accumulate reward sums
    # into the `returns` array
    rewards = tf.cast(rewards[::-1], dtype=tf.float32)
    discounted_sum = tf.constant(0.0)
    discounted_sum_shape = discounted_sum.shape
    for i in tf.range(n):
        reward = rewards[i]
        discounted_sum = reward + gamma * discounted_sum
        discounted_sum.set_shape(discounted_sum_shape)
        returns = returns.write(i, discounted_sum)
    returns = returns.stack()[::-1]

    if standardize:
        returns = ((returns - tf.math.reduce_mean(returns)) /
                   (tf.math.reduce_std(returns) + eps))

    return returns

In [194]:
def compute_loss(
        action_probs: tf.Tensor,
        values: tf.Tensor,
        returns: tf.Tensor) -> tf.Tensor:
    """Computes the combined actor_critic loss."""
    huber_loss = tf.keras.losses.Huber(reduction=tf.keras.losses.Reduction.SUM)

    advantage = returns - values

    action_log_probs = tf.math.log(action_probs)
    actor_loss = -tf.math.reduce_sum(action_log_probs * advantage)

    critic_loss = huber_loss(values, returns)

    return actor_loss + critic_loss

In [195]:
@tf.function
def train_step(env: BlackjackEnv, initial_state_0: tf.Tensor, initial_state_1: tf.Tensor, optimizer_0: tf.keras.optimizers.Optimizer,
               optimizer_1: tf.keras.optimizers.Optimizer, gamma: float) -> tuple[Any, Any]:
    with tf.GradientTape() as tape:
        action_probs_0, action_probs_1, values_0, values_1, rewards_0, rewards_1 = run_episode(env, initial_state_0, initial_state_1)

        returns_0 = get_expected_return(rewards_0, gamma)
        returns_1 = get_expected_return(rewards_1, gamma)

        action_probs_0, values_0, returns_0 = [tf.expand_dims(x, 1) for x in [action_probs_0, values_0, returns_0]]
        action_probs_1, values_1, returns_1 = [tf.expand_dims(x, 1) for x in [action_probs_1, values_1, returns_1]]

        loss_0 = compute_loss(action_probs_0, values_0, returns_0)
        loss_1 = compute_loss(action_probs_1, values_1, returns_1)

    grads_0 = tape.gradient(loss_0, modelOne.trainable_variables)
    grads_1 = tape.gradient(loss_1, modelTwo.trainable_variables)

    optimizer_0.apply_gradients(zip(grads_0, modelOne.trainable_variables))
    optimizer_1.apply_gradients(zip(grads_1, modelTwo.trainable_variables))

    episode_reward_0 = tf.math.reduce_sum(rewards_0)
    episode_reward_1 = tf.math.reduce_sum(rewards_1)

    return episode_reward_0, episode_reward_1


In [196]:
n_training_iters = 10_000

In [197]:
optimizer_0 = tf.keras.optimizers.Adam(learning_rate=0.01)
optimizer_1 = tf.keras.optimizers.Adam(learning_rate=0.01)
gamma = 0.99

In [209]:
def train():
    performance_list_0 = []
    performance_list_1 = []
    episodes_reward_0: collections.deque = collections.deque(maxlen=5)
    episodes_reward_1: collections.deque = collections.deque(maxlen=5)

    for i in range(n_training_iters):
        if i / n_training_iters * 100 == 10:
            print(f"Percent completion: {i / n_training_iters * 100}")

        env = BlackjackEnv(2)

        # initial_state_0 = tf.constant(tf.expand_dims(gen_sparse_matrix(env.hands[0]), 0), dtype=tf.int32)
        # initial_state_1 = tf.constant(tf.expand_dims(gen_sparse_matrix(env.hands[1]), 0), dtype=tf.int32)
        initial_state_0 = tf.constant(gen_sparse_matrix(env.hands[0]), dtype=tf.float32)
        initial_state_1 = tf.constant(gen_sparse_matrix(env.hands[1]), dtype=tf.float32)

        episode_reward_0, episode_reward_1 = train_step(env, initial_state_0, initial_state_1, optimizer_0, optimizer_1, gamma)

        episodes_reward_0.append(int(episode_reward_0))
        episodes_reward_1.append(int(episode_reward_1))

        running_reward_0 = statistics.mean(episode_reward_0)
        running_reward_1 = statistics.mean(episode_reward_1)

        performance_list_0.append(running_reward_0)
        performance_list_1.append(running_reward_1)

In [236]:
train()

(1, 40)
(1, 40)


OperatorNotAllowedInGraphError: in user code:

    File "C:\Users\zenith\AppData\Local\Temp\ipykernel_57388\3042118400.py", line 5, in train_step  *
        action_probs_0, action_probs_1, values_0, values_1, rewards_0, rewards_1 = run_episode(env, initial_state_0, initial_state_1)
    File "C:\Users\zenith\AppData\Local\Temp\ipykernel_57388\2852600450.py", line 25, in run_episode  *
        action_logits_t_0, value_0 = modelOne(state_0, 1)

    OperatorNotAllowedInGraphError: Iterating over a symbolic `tf.Tensor` is not allowed: AutoGraph did convert this function. This might indicate you are trying to use an unsupported feature.
