<a href="https://colab.research.google.com/github/Chrispako990210/GRO860/blob/master/pacc2101_blackjack.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Download packages to session

!pip install gymnasium
!pip install numpy
!pip install stable-baselines3


Collecting gymnasium
  Downloading gymnasium-1.0.0-py3-none-any.whl.metadata (9.5 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Downloading gymnasium-1.0.0-py3-none-any.whl (958 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m958.1/958.1 kB[0m [31m17.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-1.0.0
Collecting stable-baselines3
  Downloading stable_baselines3-2.4.0-py3-none-any.whl.metadata (4.5 kB)
Downloading stable_baselines3-2.4.0-py3-none-any.whl (183 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m183.9/183.9 kB[0m [31m11.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: stable-baselines3
Successfully installed stable-baselines3-2.4.0


In [2]:
import gymnasium as gym
import random
import numpy as np
from collections import deque

# Useful functions for card manipulations and calculations

def compare(a, b):
    """Return the signed outcome of the agent's score ``a`` against the dealer's ``b``.

    Returns 1.0 when ``a`` beats ``b`` and -1.0 when it does not; a tie is
    deliberately scored as a loss for the agent (the dealer wins ties).
    Operands that are neither greater, smaller nor equal (e.g. NaN) give 0.0.
    """
    if a > b:
        return 1.0
    if a < b or a == b:
        # Ties count against the agent, exactly like a lower score.
        return -1.0
    return 0.0

def has_usable_ace(hand):
    """Return 1 if ``hand`` holds an ace (stored as 1) that can count as 11, else 0.

    The ace is usable when adding 10 to the plain sum keeps the hand at 21 or less.
    """
    if 1 not in hand:
        return 0
    soft_total = sum(hand) + 10
    return int(soft_total <= 21)

def get_hand_value(hand):
    """Return the value of ``hand``, counting one ace as 11 when that is usable."""
    hard_total = sum(hand)
    return hard_total + 10 if has_usable_ace(hand) else hard_total

def is_bust(hand):
    """Return True when the value of ``hand`` is above 21."""
    value = get_hand_value(hand)
    return value > 21

def score(hand):
    """Return the comparable score of ``hand``: 0 for a bust, its value otherwise."""
    if is_bust(hand):
        return 0
    return get_hand_value(hand)

# Creating our custom environment
class CustomBlackjack(gym.Env):
    """Multi-hand blackjack environment with betting and a running card count.

    An episode is a sequence of hands played from a multi-deck shoe. Every hand
    has two phases:

    1. Betting round (``betting_round == 1``): ``action[0]`` is mapped to the
       nearest allowed bet. A valid bet fixes the stake for the hand and deals
       fresh cards to the agent and the dealer; an invalid bet truncates the
       episode with a penalty.
    2. Play round (``betting_round == 0``): ``action[1]`` is mapped to a move
       (0 = hit, 1 = stand, 2 = double down). The bet component of the action
       is ignored during this phase.

    The episode terminates when the bankroll reaches 0 or reaches ``target``.

    Args:
        n_decks: Number of 52-card decks in the shoe.
        n_shuffle: Fraction of the shoe; the shoe is rebuilt and reshuffled
            once the number of remaining cards drops to this fraction.
    """

    def __init__(self, n_decks=3, n_shuffle=0.25):
        super().__init__()
        self.n_decks = n_decks
        self.deck = self.initialize_deck()
        self.min_cards_left = int(n_shuffle * len(self.deck))
        self.initial_bankroll = 200.0
        self.target = self.initial_bankroll * 1.5
        self.min_bet = 0
        self.max_bet = 50
        self.penality = self.max_bet * 2.0
        self.card_count = 0
        self.card_norm = 50

        self.bets = list(range(self.min_bet, self.max_bet + 1, 10))
        self.moves = [0, 1, 2]  # Hit, Stand, Double Down

        self.games_played = 0
        self.episode_count = 0

        # Per-episode state; reset() overwrites these, but defining them here
        # keeps the instance coherent if it is inspected before the first reset.
        self.bankroll = self.initial_bankroll
        self.current_bet = 0
        self.betting_round = 1
        self.agent_hand = []
        self.dealer_hand = []

        # Action: [bet amount, move]; both are continuous and snapped to the
        # nearest valid value by _map_action().
        self.action_space = gym.spaces.Box(
            low=np.array([self.min_bet, 0], dtype=np.float32),
            high=np.array([self.max_bet, 2], dtype=np.float32),
            shape=(2,),
            dtype=np.float32,
        )

        # Observations are the normalised float32 vector produced by
        # _normalize_observation():
        # [is_betting_round, dealer card / 10, agent sum / 21, usable_ace,
        #  bankroll / target, card_count / card_norm, max_bet / target]
        # A shoe holds 20 * n_decks cards of each counted sign; one extra unit
        # of margin covers a hole card revealed after a reshuffle.
        count_bound = max(1.0, (20 * self.n_decks + 1) / self.card_norm)
        # The bankroll is below target before a hand and a hand wins at most
        # a doubled maximum bet.
        bankroll_bound = (self.target + 2 * self.max_bet) / self.target
        self.observation_space = gym.spaces.Box(
            low=np.array([0, 0, 0, 0, 0, -count_bound, 0], dtype=np.float32),
            high=np.array(
                [1, 1, 1, 1, bankroll_bound, count_bound, 1], dtype=np.float32
            ),
            shape=(7,),
            dtype=np.float32,
        )

    def _map_action(self, action):
        """Map a raw action to ``(bet, move)``: nearest allowed bet, rounded move in 0..2."""
        bet = min(self.bets, key=lambda x: abs(x - action[0]))
        move = int(np.round(np.clip(action[1], 0, 2)))
        return bet, move

    def reset(self, seed=None, options=None):
        """Start a new episode with a fresh shoe and the initial bankroll.

        Args:
            seed: Forwarded to ``gym.Env.reset``.
            options: Accepted for Gymnasium API compatibility; unused.

        Returns:
            ``(observation, info)`` where ``info`` carries the new episode
            number, the number of hands played in the previous episode and the
            starting bankroll.
        """
        super().reset(seed=seed)
        self.deck = self.initialize_deck()

        # Reset bankroll and game variables
        self.bankroll = self.initial_bankroll
        self.current_bet = 0
        self.betting_round = 1

        # No cards are on the table until the first bet has been placed.
        self.agent_hand = []
        self.dealer_hand = []

        # Update counters
        self.episode_count += 1
        # Info for debugging; hands_played refers to the episode that just ended.
        info = {"episode_number": self.episode_count,
                "hands_played": self.games_played,
                "bankroll": self.bankroll}
        self.games_played = 0

        return self.get_observations(is_start=True), info

    def _normalize_observation(self, observation):
        """Scale a raw 6-element observation and append the normalised max bet.

        Args:
            observation: ``[betting_round, dealer card, agent sum, usable_ace,
                bankroll, card_count]`` in raw units.

        Returns:
            A 7-element ``float32`` array.
        """
        # Betting round: already binary, no change needed
        betting_round = observation[0]

        # Dealer card: normalize from [0, 10] to [0, 1]
        dealer_card = observation[1] / 10

        # Agent sum: normalize from [0, 21] to [0, 1]
        agent_sum = observation[2] / 21

        # Usable ace: already binary, no change needed
        usable_ace = observation[3]

        # Bankroll: expressed as a fraction of the profit target
        bankroll = observation[4] / self.target

        # Card count: scaled by card_norm
        card_count = observation[5] / self.card_norm

        return np.array([
            betting_round,
            dealer_card,
            agent_sum,
            usable_ace,
            bankroll,
            card_count,
            (self.max_bet / (self.target))
        ], np.float32)

    def get_observations(self, is_start=False):
        """Return the normalised observation of the current state.

        During a betting round the dealer card, agent sum and usable-ace flag
        are reported as 0 because no hand is in play.

        Args:
            is_start: Kept for backward compatibility; the start-of-episode
                observation is now derived from the real state like any other.
        """
        in_play = not self.betting_round
        obs = np.array([
            self.betting_round,
            self.dealer_hand[0] if in_play else 0,
            get_hand_value(self.agent_hand) if in_play else 0,
            has_usable_ace(self.agent_hand) if in_play else 0,
            self.bankroll,
            self.card_count,
        ])
        return self._normalize_observation(obs)

    def initialize_deck(self):
        """Build and shuffle a new shoe, reset the running count, and return the shoe.

        The new shoe is also stored on ``self.deck``. ``games_played`` is left
        untouched so a mid-episode reshuffle does not lose the hand counter.
        """
        self.card_count = 0
        cards = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 10, 10] * 4 * self.n_decks
        random.shuffle(cards)  # Shuffle the deck at the beginning
        self.deck = deque(cards)
        return self.deck

    def draw_card(self, count_card=True):
        """Draw the next card, rebuilding the shoe first if it has run low.

        Args:
            count_card: When True the card is added to the running count.
        """
        if len(self.deck) <= self.min_cards_left:
            self.deck = self.initialize_deck()
        card = self.deck.popleft()
        if count_card:
            self.update_count(card)
        return card

    def draw_hand(self, count_card=True):
        """Draw two cards; the first is always counted, the second only if ``count_card``."""
        return [self.draw_card(), self.draw_card(count_card)]

    def update_count(self, card):
        """Update the running count: +1 for cards 2-6, -1 for tens and aces."""
        if card in [2, 3, 4, 5, 6]:
            self.card_count += 1
        elif card in [10, 1]:
            self.card_count -= 1

    # Not implemented, could help exploration and learning of game mechanics
    def card_count_reward_watchdog(self, move):
        """Return a shaping reward for bet sizing relative to the running count.

        Not called anywhere in this class. High bets on a high count (and low
        bets on a low count) are rewarded; the opposite combinations are
        penalised, with larger magnitudes for double downs.
        """
        bet_high_i = int((2/3) * len(self.bets)) - 1
        bet_low_i = int((1/3) * len(self.bets)) - 1
        high_bet = self.bets[bet_high_i]
        low_bet = self.bets[bet_low_i]
        tc_threshold = 5
        # Good decisions
        if self.card_count >= tc_threshold and self.current_bet >= high_bet:
            if move in [0, 1]:
                return 1.0  # Reward for placing a correct bet
            elif move == 2:
                return 2.0  # Reward for placing a correct bet and doubling down
        elif self.card_count <= -tc_threshold and self.current_bet <= low_bet:
            return 1.0  # Reward for placing a correct bet

        # Bad decisions
        if self.card_count >= tc_threshold and self.current_bet <= low_bet:
            if move in [0, 1]:
                return -1.0  # Penalty for placing a wrong bet
            elif move == 2:
                return -2.0
        elif self.card_count <= -tc_threshold and self.current_bet >= high_bet:
            if move in [0, 1]:
                return -2.0
            elif move == 2:
                return -4.0
        return 0.0

    def _settle_against_dealer(self):
        """Play out the dealer's hand, settle the current bet and return the payout.

        The dealer draws until reaching 17 or more, the hole card is added to
        the running count, and the bankroll is adjusted by the signed bet.
        """
        while get_hand_value(self.dealer_hand) < 17:
            self.dealer_hand.append(self.draw_card())
        # update the count for the hidden card that is now revealed
        self.update_count(self.dealer_hand[1])
        result = compare(score(self.agent_hand), score(self.dealer_hand))
        payout = result * self.current_bet
        self.bankroll += payout
        return payout

    def step(self, action):
        """Advance the game by one decision.

        Args:
            action: ``[bet, move]``; only ``bet`` is used during a betting
                round and only ``move`` during a play round.

        Returns:
            ``(observation, reward, terminated, truncated, info)``. ``info``
            has an ``"outcome"`` key for invalid bets/moves, bankruptcy and
            reaching the profit target.
        """
        bet, move = self._map_action(action)
        terminated = False
        truncated = False
        reward = 0.0
        info = {}

        if self.betting_round:
            if bet > min(self.max_bet, self.bankroll):
                truncated = True
                reward -= self.penality
                info = {"outcome": "Invalid bet"}
            else:
                # The stake is fixed here and cannot be changed once the
                # cards are visible.
                self.current_bet = bet
                # Deal a fresh hand for every round; the dealer's second card
                # stays out of the count until it is revealed.
                self.agent_hand = self.draw_hand()
                self.dealer_hand = self.draw_hand(count_card=False)
                self.betting_round = 0
            return self.get_observations(), reward, terminated, truncated, info

        if move == 0:  # Hit
            self.agent_hand.append(self.draw_card())
            if is_bust(self.agent_hand):  # Bust
                reward -= self.current_bet
                self.bankroll -= self.current_bet
                self.end_game()

        elif move == 1:  # Stand
            reward += self._settle_against_dealer()
            self.end_game()

        elif move == 2:  # Double Down
            # Clip the doubled stake to the bankroll so the agent doesn't
            # have to worry about over-betting.
            self.current_bet = min(self.current_bet * 2, self.bankroll)
            self.agent_hand.append(self.draw_card())
            if is_bust(self.agent_hand):  # Bust
                reward -= self.current_bet
                self.bankroll -= self.current_bet
            else:
                reward += self._settle_against_dealer()
            self.end_game()
        else:
            # Defensive only: _map_action() clips the move into 0..2.
            truncated = True
            reward -= self.penality
            info = {"outcome": "Invalid move"}
        # We might want to implement additional termination conditions and rewards here
        if self.bankroll <= 0:
            reward -= self.current_bet
            terminated = True
            info = {"outcome": "Bankrupt"}
        elif self.bankroll >= self.target:
            reward += self.current_bet
            terminated = True
            info = {"outcome": "Profit"}

        return self.get_observations(), reward, terminated, truncated, info

    def end_game(self):
        """Close the current hand: count it and return to the betting round."""
        self.games_played += 1
        self.betting_round = 1

In [3]:
# Training section
from stable_baselines3 import PPO

from stable_baselines3.common.vec_env import SubprocVecEnv
from stable_baselines3.common.vec_env import VecMonitor
from stable_baselines3.common.vec_env import DummyVecEnv

from stable_baselines3.common.callbacks import EvalCallback
from stable_baselines3.common.utils import set_random_seed
import os

# Rollout layout: number of parallel environments and steps collected per
# environment between PPO updates.
n_envs, n_steps = 4, 1024
# Training budget: 150 rollouts across all environments.
total_timesteps = 150 * n_steps * n_envs
# PPO entropy coefficient and discount factor.
entropy, gamma = 0.005, 0.999

def make_env(rank, seed):
    """Return a zero-argument factory that builds one seeded CustomBlackjack env.

    Args:
        rank: Index of the worker; added to ``seed`` so each worker differs.
        seed: Base random seed shared by all workers.
    """
    worker_seed = seed + rank

    def _init():
        # The environment is constructed first and the global RNGs are seeded
        # afterwards, in that order.
        blackjack_env = CustomBlackjack()
        set_random_seed(worker_seed)
        return blackjack_env

    return _init

seed = 42

# Training environments: one subprocess per worker, wrapped for episode stats.
env = VecMonitor(SubprocVecEnv([make_env(rank, seed) for rank in range(n_envs)]))

model = PPO(
    "MlpPolicy",
    env,
    learning_rate=0.0001,
    n_steps=n_steps,
    gamma=gamma,
    ent_coef=entropy,
    tensorboard_log="./logs_ppo/",
    device="cpu",
    verbose=1,
)

# Directory that receives the best model found during evaluation.
model_dir = os.path.join(os.getcwd(), "models")
os.makedirs(model_dir, exist_ok=True)

# Single in-process environment for evaluation.
eval_env = VecMonitor(DummyVecEnv([CustomBlackjack]))
eval_cb = EvalCallback(
    eval_env,
    best_model_save_path=model_dir,
    log_path="./logs_ppo/eval_logs",
    eval_freq=500,
    deterministic=True,
    render=False,
    verbose=1,
)

model.learn(callback=eval_cb, log_interval=10, total_timesteps=total_timesteps)

# Save the final model (the best evaluated one is saved by the callback).
model.save("ppo_mlp_blackjack")

Using cpu device
Logging to ./logs_ppo/PPO_1


KeyboardInterrupt: 

In [None]:
# Load the trained agent and perform inference to evaluate its performance
# based on the termination conditions.
path = os.path.join(os.getcwd(), "models", "best_model.zip")
model = PPO.load(path, device="cpu")

# Counters for each way an episode can end, plus per-episode returns
bankrupt, doubled, bad_move, bad_bet = 0, 0, 0, 0
total_rewards = []
env = CustomBlackjack()

n_episodes = 1000
for episode in range(n_episodes):
    obs, _ = env.reset()
    done = False
    truncated = False
    episode_reward = 0
    while not done and not truncated:
        # Query the policy, then advance the environment by one step
        action, _states = model.predict(obs, deterministic=True)
        obs, reward, done, truncated, info = env.step(action)
        episode_reward += reward

        # Tally the reported outcome, if any
        outcome = info.get("outcome")
        if outcome == "Bankrupt":
            bankrupt += 1
        elif outcome == "Invalid move":
            bad_move += 1
        elif outcome == "Invalid bet":
            bad_bet += 1
        elif outcome == "Profit":
            doubled += 1

    total_rewards.append(episode_reward)

# Calculate and display statistics
print(f"Evaluation over {n_episodes} episodes:")
print(f"Bankroll lost: {bankrupt} times")
print(f"Bankroll x1.5: {doubled} times")
print(f"Invalid moves: {bad_move} times")
print(f"Invalid bets: {bad_bet} times")
print(f"Average reward per episode: {np.mean(total_rewards):.2f}")
print(f"Total rewards per episode: {total_rewards}")