In [None]:
%pip install gym

In [None]:
import numpy as np
import gym
import tensorflow as tf
from tensorflow.keras import layers, Model, optimizers
import matplotlib.pyplot as plt
from sklearn.model_selection import ParameterGrid

# Set up the Blackjack environment.
# This single `env` instance is shared by the grid search, the final training
# run and the evaluation loop in the cells below.
env = gym.make('Blackjack-v1')

In [13]:
# PPO Actor-Critic Model
class PPOModel(Model):
    """Actor-critic network with one shared hidden layer and two output heads.

    The shared layer is a 128-unit ReLU `Dense`. The actor head is a softmax over
    `num_actions` (so it emits probabilities, not raw logits) and the critic head
    is a single linear unit.
    """

    def __init__(self, num_actions):
        """Create the shared layer and both heads for `num_actions` discrete actions."""
        super().__init__()
        self.common = layers.Dense(128, activation='relu')
        self.actor = layers.Dense(num_actions, activation='softmax')
        self.critic = layers.Dense(1)

    def call(self, inputs):
        """Return `(action_probabilities, state_value)` for a batch of inputs."""
        hidden = self.common(inputs)
        action_probabilities = self.actor(hidden)
        state_value = self.critic(hidden)
        return action_probabilities, state_value

In [14]:
# Function to compute discounted rewards
def compute_returns(rewards, dones, gamma=0.99):
    """Compute the discounted return for every step of a trajectory.

    Args:
        rewards: Sequence of per-step rewards, in time order.
        dones: Sequence of per-step terminal flags, aligned with `rewards`.
        gamma: Discount factor applied to the return of the following step.

    Returns:
        List with one return per step, in the same order as `rewards`. A step
        whose `done` flag is set starts a fresh accumulation, so returns never
        leak across episode boundaries.
    """
    returns = []
    discounted_sum = 0
    # Walk backwards so each return can reuse the one that follows it.
    for reward, done in zip(reversed(rewards), reversed(dones)):
        if done:
            discounted_sum = 0
        discounted_sum = reward + gamma * discounted_sum
        returns.append(discounted_sum)
    # Appending and reversing once is linear; inserting at index 0 on every
    # step would shift the whole list each time.
    returns.reverse()
    return returns

# Normalize advantages
def normalize(x):
    """Return `x` shifted to zero mean and scaled to (approximately) unit std.

    Args:
        x: Array-like of numbers (ndarray, list, ...). It is not modified.

    Returns:
        A new floating-point ndarray. Non-float input is converted to float64
        first, so integer arrays and plain lists are accepted. A small constant
        is added to the standard deviation so constant input yields zeros
        instead of a division by zero.
    """
    values = np.asarray(x)
    if not np.issubdtype(values.dtype, np.floating):
        values = values.astype(np.float64)
    # Build a new array rather than using -= / /=, which would overwrite the
    # caller's data and fail on integer arrays.
    return (values - np.mean(values)) / (np.std(values) + 1e-8)

In [19]:
# Updated train_ppo function to accept hyperparameters
def train_ppo(env, model, actor_optimizer, critic_optimizer, num_episodes=1000, clip_epsilon=0.2, update_epochs=10, gamma=0.99):
    """Train `model` with a clipped-surrogate (PPO-style) objective, one episode per batch.

    For every episode the current policy is sampled until the episode ends, the
    discounted returns are computed, and `update_epochs` gradient steps are taken
    on that single episode.

    Args:
        env: Gym environment with a discrete action space. Both the older API
            (`reset() -> obs`, 4-tuple `step`) and the newer one
            (`reset() -> (obs, info)`, 5-tuple `step`) are accepted.
        model: Callable returning `(action_probabilities, state_value)` and
            exposing `common`, `actor` and `critic` sub-layers.
        actor_optimizer: Optimizer applied to the `common` and `actor` variables.
        critic_optimizer: Optimizer applied to the `critic` variables.
        num_episodes: Number of episodes to collect and train on.
        clip_epsilon: Half-width of the clipping interval around a ratio of 1.
        update_epochs: Gradient steps taken on each collected episode.
        gamma: Discount factor passed to `compute_returns`.

    Returns:
        Tuple `(all_rewards, actor_losses, critic_losses)` with one entry per
        episode: the summed episode reward and the actor / critic loss of the
        last update epoch (NaN when `update_epochs` is 0).
    """
    num_actions = env.action_space.n
    all_rewards = []
    actor_losses = []
    critic_losses = []

    for episode in range(num_episodes):
        print(f"{episode} out of {num_episodes} training episodes complete")
        observation = env.reset()
        # Newer gym versions return (observation, info) from reset().
        if isinstance(observation, tuple) and len(observation) == 2 and isinstance(observation[1], dict):
            observation = observation[0]
        state = np.asarray(observation, dtype=np.float32).reshape(1, -1)
        episode_states = []
        episode_actions = []
        episode_rewards = []
        episode_dones = []

        while True:
            # The actor head is a softmax, so this is already a probability vector.
            probs, _ = model(state)
            action = np.random.choice(num_actions, p=np.squeeze(probs))
            step_result = env.step(action)
            if len(step_result) == 5:
                # Newer gym API: separate terminated / truncated flags.
                next_observation, reward, terminated, truncated, _ = step_result
                done = terminated or truncated
            else:
                next_observation, reward, done, _ = step_result

            episode_states.append(state)
            episode_actions.append(action)
            episode_rewards.append(reward)
            episode_dones.append(done)

            state = np.asarray(next_observation, dtype=np.float32).reshape(1, -1)

            if done:
                break

        # Keep returns 1-D so they line up element-wise with the flattened values;
        # a (T, 1) column against a (T,) row would broadcast to a (T, T) matrix.
        returns = tf.constant(compute_returns(episode_rewards, episode_dones, gamma), dtype=tf.float32)
        states = np.vstack(episode_states)
        actions = np.array(episode_actions)
        action_mask = tf.one_hot(actions, num_actions)

        # Probabilities of the taken actions under the policy that collected the
        # data. They are computed once, outside the tape, so they stay fixed while
        # the model is updated; reusing the live probabilities would pin the ratio
        # to 1 and give the actor a zero gradient.
        behaviour_probs, _ = model(states)
        old_action_probs = tf.stop_gradient(tf.reduce_sum(action_mask * behaviour_probs, axis=1))

        # Variables exist only after the model has been called at least once.
        actor_variables = model.common.trainable_variables + model.actor.trainable_variables
        critic_variables = model.critic.trainable_variables

        last_actor_loss = None
        last_critic_loss = None
        for _ in range(update_epochs):
            with tf.GradientTape() as tape:
                probs, values = model(states)
                values = tf.reshape(values, [-1])
                # The advantage is a fixed weight for the policy term; without
                # stop_gradient the actor loss would also push the value estimates.
                advantage = tf.stop_gradient(returns - values)

                action_probs = tf.reduce_sum(action_mask * probs, axis=1)

                # Ratio of new to old action probabilities
                ratios = tf.exp(tf.math.log(action_probs + 1e-10) - tf.math.log(old_action_probs + 1e-10))
                surrogate1 = ratios * advantage
                surrogate2 = tf.clip_by_value(ratios, 1 - clip_epsilon, 1 + clip_epsilon) * advantage

                # Clipped surrogate objective (negated because optimizers minimise)
                actor_loss = -tf.reduce_mean(tf.minimum(surrogate1, surrogate2))
                # Mean squared error between returns and value estimates
                critic_loss = tf.reduce_mean(tf.square(returns - values))

                total_loss = actor_loss + 0.5 * critic_loss

            # Ask for the gradients grouped per optimizer instead of slicing a flat
            # list, so the split does not depend on variable ordering.
            actor_grads, critic_grads = tape.gradient(total_loss, [actor_variables, critic_variables])
            actor_optimizer.apply_gradients(zip(actor_grads, actor_variables))
            critic_optimizer.apply_gradients(zip(critic_grads, critic_variables))
            last_actor_loss = actor_loss
            last_critic_loss = critic_loss

        all_rewards.append(np.sum(episode_rewards))
        actor_losses.append(float('nan') if last_actor_loss is None else last_actor_loss.numpy())
        critic_losses.append(float('nan') if last_critic_loss is None else last_critic_loss.numpy())

    return all_rewards, actor_losses, critic_losses



In [None]:
# Function to train PPO with given hyperparameters
def train_ppo_with_params(env, params, num_episodes=500):
    """Train a fresh `PPOModel` for one hyperparameter setting and score it.

    Args:
        env: Environment handed to `train_ppo`; its action count sizes the model.
        params: Mapping that must contain 'actor_learning_rate',
            'critic_learning_rate', 'clip_epsilon' and 'update_epochs'.
            Any other keys are not read by this function.
        num_episodes: Number of training episodes.

    Returns:
        Mean episode reward over the last 100 training episodes (or over all of
        them when fewer than 100 were run).
    """
    actor_opt = optimizers.Adam(learning_rate=params['actor_learning_rate'])
    critic_opt = optimizers.Adam(learning_rate=params['critic_learning_rate'])
    agent = PPOModel(env.action_space.n)

    # Only the reward history is needed to score a configuration.
    episode_rewards, _, _ = train_ppo(
        env,
        agent,
        actor_opt,
        critic_opt,
        num_episodes=num_episodes,
        clip_epsilon=params['clip_epsilon'],
        update_epochs=params['update_epochs'],
    )

    recent_rewards = episode_rewards[-100:]
    return np.mean(recent_rewards)

# Define the grid of hyperparameters.
# 'gamma' is deliberately not listed: train_ppo_with_params never reads a
# 'gamma' key, so sweeping it only retrained every other combination three
# times (243 runs instead of 81) without changing what was being trained.
param_grid = {
    'actor_learning_rate': [0.0003, 0.001, 0.003],
    'critic_learning_rate': [0.0003, 0.001, 0.003],
    'clip_epsilon': [0.1, 0.2, 0.3],
    'update_epochs': [5, 10, 20]
}

# Grid search for hyperparameter tuning
best_avg_reward = -float('inf')
best_params = None

# Evaluate each combination of hyperparameters and keep the best-scoring one
for params in ParameterGrid(param_grid):
    print(f"Evaluating hyperparameters: {params}")
    avg_reward = train_ppo_with_params(env, params)
    if avg_reward > best_avg_reward:
        best_avg_reward = avg_reward
        best_params = params

print(f"Best Parameters: {best_params}, Best Average Reward: {best_avg_reward}")

In [21]:
# Visualizations
def plot_training_statistics(rewards, actor_losses, critic_losses, wins, draws, losses):
    """Plot rewards, losses and running win/draw/loss percentages over training.

    Args:
        rewards: Total reward per episode.
        actor_losses: Actor loss per episode.
        critic_losses: Critic loss per episode.
        wins: Number of wins recorded for each episode.
        draws: Number of draws recorded for each episode.
        losses: Number of losses recorded for each episode.

    All six sequences are plotted against `range(len(rewards))`, so they must
    have the same length. The outcome panel shows cumulative percentages up to
    each episode; when every episode holds a single game, per-episode
    percentages can only be 0 or 100 and carry no trend information.
    """
    episodes = range(len(rewards))

    cumulative_wins = np.cumsum(wins)
    cumulative_draws = np.cumsum(draws)
    cumulative_losses = np.cumsum(losses)
    cumulative_games = cumulative_wins + cumulative_draws + cumulative_losses

    def running_percentage(cumulative_count):
        # Episodes before the first recorded game have no denominator; show 0 %.
        share = np.zeros(len(cumulative_games), dtype=float)
        np.divide(cumulative_count, cumulative_games, out=share, where=cumulative_games > 0)
        return share * 100

    win_percentages = running_percentage(cumulative_wins)
    draw_percentages = running_percentage(cumulative_draws)
    loss_percentages = running_percentage(cumulative_losses)

    plt.figure(figsize=(20, 10))

    plt.subplot(2, 3, 1)
    plt.plot(episodes, rewards)
    plt.xlabel('Episodes')
    plt.ylabel('Total Rewards')
    plt.title('Total Rewards per Episode')

    plt.subplot(2, 3, 2)
    plt.plot(episodes, actor_losses)
    plt.xlabel('Episodes')
    plt.ylabel('Actor Loss')
    plt.title('Actor Loss per Episode')

    plt.subplot(2, 3, 3)
    plt.plot(episodes, critic_losses)
    plt.xlabel('Episodes')
    plt.ylabel('Critic Loss')
    plt.title('Critic Loss per Episode')

    plt.subplot(2, 3, 4)
    plt.plot(episodes, win_percentages, label='Win %')
    plt.plot(episodes, draw_percentages, label='Draw %')
    plt.plot(episodes, loss_percentages, label='Loss %')
    plt.xlabel('Episodes')
    plt.ylabel('Percentage')
    plt.title('Cumulative Win/Draw/Loss Percentages')
    plt.legend()

    plt.tight_layout()
    plt.show()

In [28]:
# Train the agent with best hyperparameters and track win/draw/loss percentages
def train_ppo_with_tracking(env, model, actor_optimizer, critic_optimizer, num_episodes=1000, clip_epsilon=0.2, update_epochs=10, gamma=0.99):
    """Train `model` with a clipped-surrogate (PPO-style) objective and record outcomes.

    For every episode the current policy is sampled until the episode ends, the
    outcome is classified from the final reward, the discounted returns are
    computed, and `update_epochs` gradient steps are taken on that episode.

    Args:
        env: Gym environment with a discrete action space. Both the older API
            (`reset() -> obs`, 4-tuple `step`) and the newer one
            (`reset() -> (obs, info)`, 5-tuple `step`) are accepted.
        model: Callable returning `(action_probabilities, state_value)` and
            exposing `common`, `actor` and `critic` sub-layers.
        actor_optimizer: Optimizer applied to the `common` and `actor` variables.
        critic_optimizer: Optimizer applied to the `critic` variables.
        num_episodes: Number of episodes to collect and train on.
        clip_epsilon: Half-width of the clipping interval around a ratio of 1.
        update_epochs: Gradient steps taken on each collected episode.
        gamma: Discount factor passed to `compute_returns`.

    Returns:
        Tuple `(model, all_rewards, actor_losses, critic_losses, win_counts,
        draw_counts, loss_counts)`. Every list has one entry per episode; the
        losses are those of the last update epoch (NaN when `update_epochs` is 0)
        and the three count lists hold 0 or 1 depending on the episode outcome.
    """
    num_actions = env.action_space.n
    all_rewards = []
    actor_losses = []
    critic_losses = []
    win_counts = []
    draw_counts = []
    loss_counts = []

    for episode in range(num_episodes):
        print(f"{episode} out of {num_episodes} training episodes complete.")
        observation = env.reset()
        # Newer gym versions return (observation, info) from reset().
        if isinstance(observation, tuple) and len(observation) == 2 and isinstance(observation[1], dict):
            observation = observation[0]
        state = np.asarray(observation, dtype=np.float32).reshape(1, -1)
        episode_states = []
        episode_actions = []
        episode_rewards = []
        episode_dones = []

        wins = 0
        draws = 0
        losses = 0

        while True:
            # The actor head is a softmax, so this is already a probability vector.
            probs, _ = model(state)
            action = np.random.choice(num_actions, p=np.squeeze(probs))
            step_result = env.step(action)
            if len(step_result) == 5:
                # Newer gym API: separate terminated / truncated flags.
                next_observation, reward, terminated, truncated, _ = step_result
                done = terminated or truncated
            else:
                next_observation, reward, done, _ = step_result

            episode_states.append(state)
            episode_actions.append(action)
            episode_rewards.append(reward)
            episode_dones.append(done)

            state = np.asarray(next_observation, dtype=np.float32).reshape(1, -1)

            if done:
                # The sign of the final reward decides the outcome of the game.
                if reward > 0:
                    wins += 1
                elif reward == 0:
                    draws += 1
                else:
                    losses += 1
                break

        # Keep returns 1-D so they line up element-wise with the flattened values;
        # a (T, 1) column against a (T,) row would broadcast to a (T, T) matrix.
        returns = tf.constant(compute_returns(episode_rewards, episode_dones, gamma), dtype=tf.float32)
        states = np.vstack(episode_states)
        actions = np.array(episode_actions)
        action_mask = tf.one_hot(actions, num_actions)

        # Probabilities of the taken actions under the policy that collected the
        # data. They are computed once, outside the tape, so they stay fixed while
        # the model is updated; reusing the live probabilities would pin the ratio
        # to 1 and give the actor a zero gradient.
        behaviour_probs, _ = model(states)
        old_action_probs = tf.stop_gradient(tf.reduce_sum(action_mask * behaviour_probs, axis=1))

        # Variables exist only after the model has been called at least once.
        actor_variables = model.common.trainable_variables + model.actor.trainable_variables
        critic_variables = model.critic.trainable_variables

        last_actor_loss = None
        last_critic_loss = None
        for _ in range(update_epochs):
            with tf.GradientTape() as tape:
                probs, values = model(states)
                values = tf.reshape(values, [-1])
                # The advantage is a fixed weight for the policy term; without
                # stop_gradient the actor loss would also push the value estimates.
                advantage = tf.stop_gradient(returns - values)

                # Probabilities of the actions that were actually taken
                action_probs = tf.reduce_sum(action_mask * probs, axis=1)

                # Ratio of new to old action probabilities
                ratios = tf.exp(tf.math.log(action_probs + 1e-10) - tf.math.log(old_action_probs + 1e-10))
                surrogate1 = ratios * advantage
                surrogate2 = tf.clip_by_value(ratios, 1 - clip_epsilon, 1 + clip_epsilon) * advantage

                # Clipped surrogate objective (negated because optimizers minimise)
                actor_loss = -tf.reduce_mean(tf.minimum(surrogate1, surrogate2))
                # Mean squared error between returns and value estimates
                critic_loss = tf.reduce_mean(tf.square(returns - values))

                total_loss = actor_loss + 0.5 * critic_loss

            # Ask for the gradients grouped per optimizer instead of slicing a flat
            # list, so the split does not depend on variable ordering.
            actor_grads, critic_grads = tape.gradient(total_loss, [actor_variables, critic_variables])
            actor_optimizer.apply_gradients(zip(actor_grads, actor_variables))
            critic_optimizer.apply_gradients(zip(critic_grads, critic_variables))
            last_actor_loss = actor_loss
            last_critic_loss = critic_loss

        all_rewards.append(np.sum(episode_rewards))
        actor_losses.append(float('nan') if last_actor_loss is None else last_actor_loss.numpy())
        critic_losses.append(float('nan') if last_critic_loss is None else last_critic_loss.numpy())
        win_counts.append(wins)
        draw_counts.append(draws)
        loss_counts.append(losses)

    return model, all_rewards, actor_losses, critic_losses, win_counts, draw_counts, loss_counts

In [None]:
# Hyperparameters for the final run. This assignment replaces whatever value
# `best_params` held before this cell was executed.
best_params = {
    'actor_learning_rate': 0.0003,
    'clip_epsilon': 0.1,
    'critic_learning_rate': 0.0003,
    'update_epochs': 10,
}

# One Adam optimizer per learning rate, plus a freshly initialised model
best_actor_optimizer = optimizers.Adam(learning_rate=best_params['actor_learning_rate'])
best_critic_optimizer = optimizers.Adam(learning_rate=best_params['critic_learning_rate'])

best_model = PPOModel(env.action_space.n)

# Train for 1000 episodes; `model` is bound to the first element of the
# returned tuple and is what the evaluation loop calls afterwards.
(
    model,
    best_rewards,
    best_actor_losses,
    best_critic_losses,
    win_counts,
    draw_counts,
    loss_counts,
) = train_ppo_with_tracking(
    env,
    best_model,
    best_actor_optimizer,
    best_critic_optimizer,
    num_episodes=1000,
    clip_epsilon=best_params['clip_epsilon'],
    update_epochs=best_params['update_epochs'],
)

# Plot the results
plot_training_statistics(
    best_rewards,
    best_actor_losses,
    best_critic_losses,
    win_counts,
    draw_counts,
    loss_counts,
)

In [30]:
# Parameters for testing
games = 10
wins = 0
losses = 0
draws = 0
verbose = True

# Play `games` greedy games with the trained `model` and tally the outcomes.
for i in range(games):
    state = env.reset()
    # Newer gym versions return (observation, info) from reset(); keep the observation.
    if isinstance(state, tuple) and len(state) == 2 and isinstance(state[1], dict):
        state = state[0]
    done = False
    rewards = 0
    player_hand = []
    dealer_hand = []

    # Per the Gym Blackjack documentation, state[0] is the player's current hand
    # total (not an individual card) and state[1] is the dealer's visible card
    # (1-10 where 1 is ace). `player_hand` therefore records successive totals.
    player_hand.append(state[0])
    dealer_visible_card = state[1]
    dealer_hand.append(dealer_visible_card)

    while not done:
        state_array = np.array(state, dtype=np.float32).reshape(1, -1)
        action_probs, _ = model(state_array)
        # Evaluation is greedy: always take the most probable action.
        action = np.argmax(action_probs.numpy()[0])
        step_result = env.step(action)
        if len(step_result) == 5:
            # Newer gym API: separate terminated / truncated flags.
            state, reward, terminated, truncated, _ = step_result
            done = terminated or truncated
        else:
            state, reward, done, _ = step_result
        rewards += reward

        # After a hit, record the player's new hand total
        if action == 1:  # Hit
            player_hand.append(state[0])

    if verbose:
        # Print the result of the game
        print("Player's hand:", player_hand)
        print("Dealer's visible hand:", dealer_visible_card)

    if rewards > 0:
        print("Win! :) ", rewards, "\n")
        wins += 1
    elif rewards == 0:
        print("Draw! :| ", rewards, "\n")
        draws += 1
    else:
        print("Loss! :( ", rewards, "\n")
        losses += 1

print("Win percentage: ", (wins/games)*100, "%")
print("Draw percentage: ", (draws/games)*100, "%")
print("Loss percentage: ", (losses/games)*100, "%")


Player's hand: [17]
Dealer's visible hand: 10
Loss! :(  -1.0 

Player's hand: [13]
Dealer's visible hand: 7
Loss! :(  -1.0 

Player's hand: [15]
Dealer's visible hand: 2
Win! :)  1.0 

Player's hand: [14]
Dealer's visible hand: 2
Loss! :(  -1.0 

Player's hand: [16]
Dealer's visible hand: 7
Loss! :(  -1.0 

Player's hand: [10]
Dealer's visible hand: 6
Loss! :(  -1.0 

Player's hand: [19]
Dealer's visible hand: 8
Win! :)  1.0 

Player's hand: [14]
Dealer's visible hand: 1
Loss! :(  -1.0 

Player's hand: [10]
Dealer's visible hand: 9
Loss! :(  -1.0 

Player's hand: [5, 16]
Dealer's visible hand: 10
Loss! :(  -1.0 

Win percentage:  20.0 %
Draw percentage:  0.0 %
Loss percentage:  80.0 %
