In [None]:
import random
import numpy as np
from IPython.display import clear_output

random.seed()

In [None]:
class BlackJack():
    def __init__(self, n_decks, n_players, shuffle_every_round=False, shoe_limit=0.3, interactive=True):
        self.deck_standard = [2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 10, 10, 11]
        self.n_decks = n_decks
        self.n_players = n_players
        self.deck_game_start = []
        for i in range(self.n_decks * 4):
            self.deck_game_start.extend(self.deck_standard)
        self.deck_game = self.deck_game_start.copy()
        random.shuffle(self.deck_game)
        self.shuffle_every_round = shuffle_every_round
        self.shoe_limit = shoe_limit
        self.interactive = interactive
        self.player_number = 1
    
    def get_score(self, hand):
        n_aces = hand.count(11)
        s = sum(hand)
        while s > 21 and n_aces > 0:
            s -= 10
            n_aces -= 1
        return s
    
    def deal_cards(self):
        self.hand_players = {i: [self.deck_game.pop() for k in range(2)] for i in range(1, self.n_players + 1)}
        self.hand_dealer = [self.deck_game.pop()]
        if self.interactive:
            print('Players\' hands:', self.hand_players)
            print('Dealer\'s hand:', self.hand_dealer)
    
    def player_choice(self, player, choice_ai='s'):
        self.scores[player] = self.get_score(self.hand_players[player + 1])
        if self.interactive: print('Hand:', self.hand_players[player + 1], ', Score:', self.scores[player])
        if self.scores[player] == 21:
            if self.interactive: print('BlackJack!')
            self.rewards[player] = 0
            return True
        if self.scores[player] > 21:
            if self.interactive: print('Player', player + 1, 'loses')
            self.game_result[player] = -1
            return True
        if self.interactive == True:
            choice = input()
        else:
            choice = choice_ai
        if choice == 'h':
            self.hand_players[player + 1].append(self.deck_game.pop())
            return False
        elif choice == 's':
            return True
        else:
            if self.interactive: print('invalid')
    
    def dealer_choice(self):
        self.scores[-1] = self.get_score(self.hand_dealer)
        if self.interactive:
            print('Dealer')
            print('Hand:', self.hand_dealer, ', Score:', self.scores[-1])
        while self.scores[-1] < 17:
            self.hand_dealer.append(self.deck_game.pop())
            self.scores[-1] = self.get_score(self.hand_dealer)
            if self.interactive: print('Hand:', self.hand_dealer, ', Score:', self.scores[-1])
        if self.scores[-1] > 21:
            if self.interactive: print('Dealer busts')
            self.game_result[-1] = -1
            for player in range(self.n_players):
                if self.game_result[player] != -1:
                    self.game_result[player] = 1
                    if self.rewards[player] != 0:
                        self.rewards[player] = 1
    
    def get_result(self):
        if self.game_result[-1] != -1:
            for player in range(self.n_players):
                if self.game_result[player] != -1 and self.scores[player] > self.scores[-1]:
                    self.game_result[player] = 1
                    if self.rewards[player] != 0:
                        self.rewards[player] = 1
                elif self.game_result[player] != -1 and self.scores[player] < self.scores[-1]:
                    self.game_result[player] = -1
                    if self.rewards[player] != 0:
                        self.rewards[player] = -1
                elif self.game_result[player] != -1 and self.scores[player] == self.scores[-1]:
                    self.rewards[player] = 0.5

        if self.interactive:
            for player in range(self.n_players):
                if self.game_result[player] == 1:
                    print('Player', player + 1, 'won')
                elif self.game_result[player] == -1:
                    print('Player', player + 1, 'lost')
                else:
                    print('Player', player + 1, 'tie')
        
    def start(self):
        self.scores = [0 for player in range(self.n_players + 1)]
        self.game_result = [0 for player in range(self.n_players + 1)]
        self.rewards = [-1 for player in range(self.n_players)]
        if self.shuffle_every_round or (len(self.deck_game) <= len(self.deck_game_start) * self.shoe_limit):
            self.deck_game = self.deck_game_start.copy()
            random.shuffle(self.deck_game)
        if self.interactive:
            print('Number of cards in the full deck:', len(game.deck_game_start))
            print('Current number of cards in the deck:', len(game.deck_game))
            print('-----------------------')
            print('Dealing')
        self.deal_cards()
        if self.interactive: print('-----------------------')

    def play(self):
        for player in range(self.n_players):
            if self.interactive:
                print('Player', player + 1)
                print('For hit type "h", for stand type "s"')
            while True:
                player_done = self.player_choice(player)
                if player_done == True:
                    break
            if self.interactive: print('-----------------------')
        if np.abs(sum(self.game_result)) < self.n_players:
            self.dealer_choice()
            if self.interactive: print('-----------------------')
        self.get_result()
        if self.interactive: print('-----------------------')

In [None]:
game = BlackJack(n_decks=1, n_players=1, shuffle_every_round=True, shoe_limit=0.3)
while True:
    clear_output(wait=True)
    game.start()
    game.play()
    print('Play again? (y/n)')
    if input() != 'y':
        break

Number of cards in the full deck: 52
Current number of cards in the deck: 52
-----------------------
Dealing
Players' hands: {1: [10, 5]}
Dealer's hand: [2]
-----------------------
Player 1
For hit type "h", for stand type "s"
Hand: [10, 5] , Score: 15
h
Hand: [10, 5, 2] , Score: 17
h
Hand: [10, 5, 2, 4] , Score: 21
BlackJack!
-----------------------
Dealer
Hand: [2] , Score: 2
Hand: [2, 10] , Score: 12
Hand: [2, 10, 10] , Score: 22
Dealer busts
-----------------------
Player 1 won
-----------------------
Play again? (y/n)
n


train

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# Configuration parameters for the whole setup
seed = 42
gamma = 0.99  # Discount factor for past rewards
max_steps_per_episode = 10000
game = BlackJack(n_decks=1, n_players=1, shuffle_every_round=True, shoe_limit=0.3, interactive=False)  # Create the environment
eps = np.finfo(np.float32).eps.item()  # Smallest number such that 1.0 + eps != 1.0

num_inputs = 7
num_actions = 2
num_hidden = 14

inputs = layers.Input(shape=(num_inputs,))
common = layers.Dense(num_hidden, activation="relu")(inputs)
action = layers.Dense(num_actions, activation="softmax")(common)
critic = layers.Dense(1)(common)

model_1 = keras.Model(inputs=inputs, outputs=[action, critic])

optimizer = keras.optimizers.Adam(learning_rate=0.1)
huber_loss = keras.losses.Huber()
action_probs_history = []
critic_value_history = []
rewards_history = []
running_reward = 0
episode_count = 0


while True:  # Run until solved
    game.start()
    
    episode_reward = 0

    with tf.GradientTape() as tape:
        for player in range(game.n_players):
            while True:
                state = [game.n_decks,
                        game.n_players,
                        int(game.shuffle_every_round),
                        sum(game.deck_game),
                        player,
                        sum(game.hand_players[game.player_number]),
                        game.hand_dealer[0]]
                state = tf.convert_to_tensor(state)
                state = tf.expand_dims(state, 0)
                action_probs, critic_value = model_1(state)
                critic_value_history.append(critic_value[0, 0])
                action = np.random.choice(num_actions, p=np.squeeze(action_probs))
                action_probs_history.append(tf.math.log(action_probs[0, action]))
                if action == 1:
                    ai_action = 'h'
                else:
                    ai_action = 's'
                player_done = game.player_choice(player=player, choice_ai=ai_action)
                if player_done == True: break
        if np.abs(sum(game.game_result)) < game.n_players:
            game.dealer_choice()
        game.get_result()
        rewards = game.rewards
        rewards_history.extend(rewards)
        episode_reward += sum(rewards)

        # Update running reward to check condition for solving
        running_reward = 0.05 * episode_reward + (1 - 0.05) * running_reward

        # Calculate expected value from rewards
        # - At each timestep what was the total reward received after that timestep
        # - Rewards in the past are discounted by multiplying them with gamma
        # - These are the labels for our critic
        returns = []
        discounted_sum = 0
        for r in rewards_history[::-1]:
            discounted_sum = r + gamma * discounted_sum
            returns.insert(0, discounted_sum)

        # Normalize
        returns = np.array(returns)
        returns = (returns - np.mean(returns)) / (np.std(returns) + eps)
        returns = returns.tolist()

        # Calculating loss values to update our network
        history = zip(action_probs_history, critic_value_history, returns)
        actor_losses = []
        critic_losses = []
        for log_prob, value, ret in history:
            # At this point in history, the critic estimated that we would get a
            # total reward = `value` in the future. We took an action with log probability
            # of `log_prob` and ended up recieving a total reward = `ret`.
            # The actor must be updated so that it predicts an action that leads to
            # high rewards (compared to critic's estimate) with high probability.
            diff = ret - value
            actor_losses.append(-log_prob * diff)  # actor loss

            # The critic must be updated so that it predicts a better estimate of
            # the future rewards.
            critic_losses.append(
                huber_loss(tf.expand_dims(value, 0), tf.expand_dims(ret, 0))
            )

        # Backpropagation
        loss_value = sum(actor_losses) + sum(critic_losses)
        grads = tape.gradient(loss_value, model_1.trainable_variables)
        optimizer.apply_gradients(zip(grads, model_1.trainable_variables))

        # Clear the loss and reward history
        action_probs_history.clear()
        critic_value_history.clear()
        # rewards_history.clear()

    # Log details
    episode_count += 1
    if episode_count % 50 == 0:
        template = "running reward: {:.2f} at episode {}"
        print(template.format(running_reward, episode_count))

    if running_reward > 100:  # Condition to consider the task solved
        print("Solved at episode {}!".format(episode_count))
        break
    
    if episode_count >= 1000:  # Condition to consider the task solved
        print("Solved at episode {}!".format(episode_count))
        break

running reward: -0.68 at episode 50
running reward: -0.78 at episode 100
running reward: -0.90 at episode 150
running reward: -0.91 at episode 200
running reward: -0.87 at episode 250
running reward: -0.89 at episode 300
running reward: -0.85 at episode 350
running reward: -0.85 at episode 400
running reward: -0.76 at episode 450
running reward: -0.78 at episode 500
running reward: -0.92 at episode 550
running reward: -0.74 at episode 600
running reward: -0.85 at episode 650
running reward: -0.86 at episode 700
running reward: -0.73 at episode 750
running reward: -0.93 at episode 800
running reward: -0.83 at episode 850
running reward: -0.84 at episode 900
running reward: -0.91 at episode 950
running reward: -0.78 at episode 1000
Solved at episode 1000!


In [None]:
print(*rewards_history)
print(*returns)

0 -1 -1 -1 0 -1 -1 -1 -1 -1 -1 -1 -1 -1 0 -1 0.5 -1 -1 -1 -1 -1 -1 -1 -1 -1 0 -1 -1 -1 -1 -1 -1 0 -1 0 -1 -1 -1 -1 -1 -1 -1 -1 0 0 -1 0.5 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 0 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 0 0 -1 -1 -1 0 -1 -1 -1 -1 0 0 -1 -1 -1 0 -1 -1 -1 -1 -1 0 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 0 -1 0.5 -1 -1 -1 -1 0 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 0 0 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 0 -1 -1 -1 -1 -1 -1 0 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 0 0 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 0 -1 -1 -1 -1 0.5 -1 -1 -1 -1 -1 -1 -1 -1 -1 0 -1 -1 -1 0 0 -1 0 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 0 -1 0 -1 0 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 0 -1 -1 -1 -1 -1 0 -1 -1 -1 -1 -1 -1 -1 0 -1 0 0 -1 0 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 0 -1 -1 -1 -1 0 -1 -1 0 0.5 -1 -1 -1 -1 -1 -1 0 -1 0 -1 -1 -1 0 -1 -1 -1 -1 0 0 -1 0 0 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 0 -1 -1 -1 -1 

In [None]:
def win_rate():
    i = 0
    results = []
    while i < 500:
        game.start()
        for player in range(game.n_players):
            while True:
                state = [game.n_decks,
                        game.n_players,
                        int(game.shuffle_every_round),
                        sum(game.deck_game),
                        player,
                        sum(game.hand_players[game.player_number]),
                        game.hand_dealer[0]]
                state = tf.convert_to_tensor(state)
                state = tf.expand_dims(state, 0)
                action = model_1.predict(state)[-1][-1][0]
                if action > 0.5:
                    ai_action = 'h'
                else:
                    ai_action = 's'
                player_done = game.player_choice(player=player, choice_ai=ai_action)
                if player_done == True: break
        if np.abs(sum(game.game_result)) < game.n_players:
            game.dealer_choice()
        game.get_result()
        results.extend(game.game_result[:-1])
        i += 1
        if i % 50 == 0:
            template = "step {}"
            print(template.format(i))
    return results

In [None]:
results = win_rate()

step 50
step 100
step 150
step 200
step 250
step 300
step 350
step 400
step 450
step 500


In [None]:
print('Agent in general')
print('Win rate:', results.count(1) / len(results) * 100)
print('Lose rate:', results.count(-1) / len(results) * 100)
print('Tie rate:', results.count(0) / len(results) * 100)

Agent in general
Win rate: 38.4
Lose rate: 56.599999999999994
Tie rate: 5.0


In [None]:
print('Win rate:', results[::4].count(1) / len(results[::4]) * 100)
print('Lose rate:', results[::4].count(-1) / len(results[::4]) * 100)
print('Tie rate:', results[::4].count(0) / len(results[::4]) * 100)

Win rate: 38.4
Lose rate: 56.8
Tie rate: 4.8


In [None]:
print('Win rate:', results[1::4].count(1) / len(results[1::4]) * 100)
print('Lose rate:', results[1::4].count(-1) / len(results[1::4]) * 100)
print('Tie rate:', results[1::4].count(0) / len(results[1::4]) * 100)

Win rate: 35.199999999999996
Lose rate: 60.0
Tie rate: 4.8


In [None]:
print('Win rate:', results[2::4].count(1) / len(results[2::4]) * 100)
print('Lose rate:', results[2::4].count(-1) / len(results[2::4]) * 100)
print('Tie rate:', results[2::4].count(0) / len(results[2::4]) * 100)

Win rate: 38.4
Lose rate: 52.800000000000004
Tie rate: 8.799999999999999


In [None]:
print('Win rate:', results[3::4].count(1) / len(results[3::4]) * 100)
print('Lose rate:', results[3::4].count(-1) / len(results[3::4]) * 100)
print('Tie rate:', results[3::4].count(0) / len(results[3::4]) * 100)

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# Configuration parameters for the whole setup
seed = 42
gamma = 0.99  # Discount factor for past rewards
max_steps_per_episode = 10000
game = BlackJack(n_decks=1, n_players=1, shuffle_every_round=True, shoe_limit=0.3, interactive=False)  # Create the environment
eps = np.finfo(np.float32).eps.item()  # Smallest number such that 1.0 + eps != 1.0

num_inputs = 7
num_actions = 2
num_hidden = 14

inputs = layers.Input(shape=(num_inputs,))
common = layers.Dense(num_hidden, activation="relu")(inputs)
action = layers.Dense(num_actions, activation="softmax")(common)
critic = layers.Dense(1)(common)

model_1 = keras.Model(inputs=inputs, outputs=[action, critic])

optimizer = keras.optimizers.Adam(learning_rate=0.1)
huber_loss = keras.losses.Huber()
action_probs_history = []
critic_value_history = []
rewards_history = []
running_reward = 0
episode_count = 0

In [None]:
game.start()

episode_reward = 0

with tf.GradientTape() as tape:
    for player in range(game.n_players):
        while True:
            state = [game.n_decks,
                    game.n_players,
                    int(game.shuffle_every_round),
                    sum(game.deck_game),
                    player,
                    sum(game.hand_players[game.player_number]),
                    game.hand_dealer[0]]
            state = tf.convert_to_tensor(state)
            state = tf.expand_dims(state, 0)
            action_probs, critic_value = model_1(state)
            critic_value_history.append(critic_value[0, 0])
            action = np.random.choice(num_actions, p=np.squeeze(action_probs))
            action_probs_history.append(tf.math.log(action_probs[0, action]))
            if action == 1:
                ai_action = 'h'
            else:
                ai_action = 's'
            player_done = game.player_choice(player=player, choice_ai=ai_action)
            if player_done == True: break
    if np.abs(sum(game.game_result)) < game.n_players:
        game.dealer_choice()
    game.get_result()
    rewards = game.rewards
    rewards_history.extend(rewards)
    episode_reward += sum(rewards)

    # Update running reward to check condition for solving
    running_reward = 0.05 * episode_reward + (1 - 0.05) * running_reward

    # Calculate expected value from rewards
    # - At each timestep what was the total reward received after that timestep
    # - Rewards in the past are discounted by multiplying them with gamma
    # - These are the labels for our critic
    returns = []
    discounted_sum = 0
    for r in rewards_history[::-1]:
        discounted_sum = r + gamma * discounted_sum
        returns.insert(0, discounted_sum)

    # Normalize
    returns = np.array(returns)
    returns = (returns - np.mean(returns)) / (np.std(returns) + eps)
    returns = returns.tolist()

    # Calculating loss values to update our network
    history = zip(action_probs_history, critic_value_history, returns)
    actor_losses = []
    critic_losses = []
    for log_prob, value, ret in history:
        # At this point in history, the critic estimated that we would get a
        # total reward = `value` in the future. We took an action with log probability
        # of `log_prob` and ended up recieving a total reward = `ret`.
        # The actor must be updated so that it predicts an action that leads to
        # high rewards (compared to critic's estimate) with high probability.
        diff = ret - value
        actor_losses.append(-log_prob * diff)  # actor loss

        # The critic must be updated so that it predicts a better estimate of
        # the future rewards.
        critic_losses.append(
            huber_loss(tf.expand_dims(value, 0), tf.expand_dims(ret, 0))
        )

    # Backpropagation
    loss_value = sum(actor_losses) + sum(critic_losses)
    grads = tape.gradient(loss_value, model_1.trainable_variables)
    optimizer.apply_gradients(zip(grads, model_1.trainable_variables))

    # Clear the loss and reward history
    # action_probs_history.clear()
    # critic_value_history.clear()
    # rewards_history.clear()

# Log details
episode_count += 1

print(game.hand_players, game.hand_dealer)
print('game.rewards\t', game.rewards)
print('state\t\t', state)
print('action\t\t', action)
print('critic_value_history', critic_value_history)
print('rewards\t\t', rewards)
print('episode_reward\t', episode_reward)
print('running_reward\t', running_reward)
print('returns\t\t', returns)
print('loss_value\t', loss_value)
print('episode_count\t', episode_count)

ValueError: ignored