In [1]:
import os
os.environ["KERAS_BACKEND"] = "tensorflow"
import gymnasium as gym
import numpy as np
import tensorflow.keras as keras
from tensorflow.keras import ops, Model
from tensorflow.keras.layers import Dense, Input
import tensorflow as tf

In [3]:
# Create the Gymnasium 'Blackjack-v1' environment once; the training loop
# below drives it through the global `env` via `env.reset()` / `env.step()`.
env = gym.make('Blackjack-v1')

In [50]:
# --- Hyperparameters --------------------------------------------------------
NUM_INPUTS = 3          # length of the state vector fed to the network
NUM_ACTIONS = 2         # width of the softmax policy head
NUM_HIDDEN = 128        # units in the hidden layer shared by both heads
DISCOUNT_FACTOR = 1     # weight applied to future rewards when building returns
RUNNING_DISCOUNT = 0.5  # weight of the newest episode in the running reward
EPS = float(np.finfo(np.float32).eps)  # float32 machine epsilon as a Python float

# --- Actor-critic network ---------------------------------------------------
# One shared ReLU layer feeds two heads: `actor` emits a probability per
# action, `critic` emits a single unbounded scalar.
inputs = Input(shape=(NUM_INPUTS,))
common = Dense(units=NUM_HIDDEN, activation="relu")(inputs)
actor = Dense(units=NUM_ACTIONS, activation="softmax")(common)
critic = Dense(1)(common)

# Calling the model returns the pair (action probabilities, critic value).
model = Model(inputs, [actor, critic])

In [None]:
import matplotlib.pyplot as plt

# Number of episodes the training loop runs; also the length of the
# `episode_rewards` buffer allocated before the loop.
MAX_EPISODES = 100_000

def moving_average(x: np.ndarray, window: int) -> np.ndarray:
    """Trailing moving average of a 1-D sequence.

    Element ``i`` of the result is the mean of ``x[max(0, i - window + 1)]``
    through ``x[i]`` inclusive, so the first ``window - 1`` entries average
    over however many samples are available rather than being padded.

    Args:
        x: 1-D array-like of numeric values.
        window: Maximum number of trailing samples per average; must be >= 1.

    Returns:
        A float array with the same length as ``x`` (empty if ``x`` is empty).

    Raises:
        ValueError: If ``window`` is smaller than 1.
    """
    if window < 1:
        raise ValueError(f"window must be >= 1, got {window}")

    values = np.asarray(x, dtype=float)
    # Prefix sums with a leading 0 so that sum(values[a:b]) == csum[b] - csum[a];
    # this replaces the O(len(x) * window) nested Python loops with O(len(x)).
    csum = np.concatenate(([0.0], np.cumsum(values)))
    stop = np.arange(1, len(values) + 1)
    # Clamp the window start at 0 so early entries use a shorter window.
    start = np.maximum(stop - window, 0)
    return (csum[stop] - csum[start]) / (stop - start)
        
def _discounted_returns(rewards, discount):
    """Return the discounted reward-to-go for every step of one episode.

    Element ``t`` of the result equals ``rewards[t] + discount * result[t + 1]``,
    with the value after the last step taken as 0.
    """
    returns = [0.0] * len(rewards)
    running = 0.0
    for t in reversed(range(len(rewards))):
        running = rewards[t] + discount * running
        returns[t] = running
    return returns


# Actor-critic training: play one full episode under the current policy while
# recording the forward passes on a gradient tape, then take a single Adam
# step on (policy-gradient loss + Huber value loss) for that episode.
optimizer = keras.optimizers.Adam(learning_rate=0.01)
model.compile(optimizer=optimizer)
critic_loss = keras.losses.Huber()
running_reward = 0
episode_rewards = np.zeros(MAX_EPISODES)  # total reward collected per episode

for episode in range(MAX_EPISODES):
    state, _ = env.reset()
    action_probs_history = []      # log-probability of each action taken
    expected_return_history = []   # critic estimate at each visited state
    rewards_history = []           # reward received after each action
    with tf.GradientTape() as tape:
        done = False
        while not done:
            # Go through NumPy so the observation becomes one float32 vector
            # regardless of the Python types inside the observation tuple.
            state = tf.convert_to_tensor(np.asarray(state, dtype=np.float32))
            state = tf.expand_dims(state, 0)

            action_probs, expected_return = model(state)
            action = np.random.choice(NUM_ACTIONS, p=np.squeeze(action_probs))
            action_probs_history.append(ops.log(action_probs[0, action]))
            expected_return_history.append(expected_return[0, 0])

            # Gymnasium reports natural termination and truncation
            # separately; either one ends the episode.
            state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            rewards_history.append(float(reward))
            episode_rewards[episode] += reward

        # Raw returns are used on purpose. Standardising them per episode
        # subtracts the episode mean, which maps every return to 0 whenever
        # all returns in the episode are equal (always true for a one-step
        # episode, and for any episode whose only non-zero reward is the last
        # one when DISCOUNT_FACTOR is 1), leaving nothing to learn from.
        returns = _discounted_returns(rewards_history, DISCOUNT_FACTOR)

        actor_loss = 0.
        critic_loss_ = 0.
        for log_prob, expected, return_ in zip(
            action_probs_history, expected_return_history, returns
        ):
            # The advantage is a fixed weight for the policy gradient; without
            # stop_gradient the actor term would also push on the critic.
            advantage = return_ - ops.stop_gradient(expected)
            actor_loss -= log_prob * advantage
            # Use the backend op here: np.expand_dims would turn `expected`
            # into a NumPy array and cut it off from the gradient tape.
            critic_loss_ += critic_loss(
                ops.expand_dims(expected, 0),
                ops.expand_dims(return_, 0)
            )

        cost = actor_loss + critic_loss_

    # Exponentially weighted average of episode rewards, for logging only.
    running_reward = (
        RUNNING_DISCOUNT * episode_rewards[episode] +
        (1 - RUNNING_DISCOUNT) * running_reward
    )

    # Differentiate and update outside the tape context so the backward pass
    # and the optimizer step are not themselves recorded.
    grads = tape.gradient(cost, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

    if episode % 100 == 0:
        print(f'Episode {episode}: {running_reward}')




In [None]:
# Smooth the per-episode rewards with a trailing 1000-episode mean and plot it.
# NOTE(review): the slice stops before index `episode`, so the episode the loop
# variable last pointed at is excluded. That presumably keeps a partially
# played episode out of the plot when training is interrupted, but it also
# drops the final episode of a completed run; confirm this is intended.
avg_reward = moving_average(episode_rewards[:episode], 1000)

fig, ax = plt.subplots()
ax.plot(avg_reward)
ax.set(
    title='Blackjack actor-critic training',
    xlabel='Episode',
    ylabel='Mean reward (trailing 1000 episodes)',
)
plt.show()