In [1]:
!pip install pygame pyvirtualdisplay

Collecting pyvirtualdisplay
  Downloading PyVirtualDisplay-3.0-py3-none-any.whl.metadata (943 bytes)
Downloading PyVirtualDisplay-3.0-py3-none-any.whl (15 kB)
Installing collected packages: pyvirtualdisplay
Successfully installed pyvirtualdisplay-3.0


In [2]:
!apt install swig && pip install gymnasium box2d box2d-kengz

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  swig4.0
Suggested packages:
  swig-doc swig-examples swig4.0-examples swig4.0-doc
The following NEW packages will be installed:
  swig swig4.0
0 upgraded, 2 newly installed, 0 to remove and 35 not upgraded.
Need to get 1,116 kB of archives.
After this operation, 5,542 kB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy/universe amd64 swig4.0 amd64 4.0.2-1ubuntu1 [1,110 kB]
Get:2 http://archive.ubuntu.com/ubuntu jammy/universe amd64 swig all 4.0.2-1ubuntu1 [5,632 B]
Fetched 1,116 kB in 1s (1,216 kB/s)
Selecting previously unselected package swig4.0.
(Reading database ... 126308 files and directories currently installed.)
Preparing to unpack .../swig4.0_4.0.2-1ubuntu1_amd64.deb ...
Unpacking swig4.0 (4.0.2-1ubuntu1) ...
Selecting previously unselected package swig.
Preparing to unpack .../swig_4.0.2-1ubu

In [3]:
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
from IPython.display import display, HTML
import base64
import os

In [84]:
env = gym.make("Blackjack-v1")
print(env.observation_space)
print(env.action_space)

Tuple(Discrete(32), Discrete(11), Discrete(2))
Discrete(2)


In [82]:
np.asarray(env.observation_space.sample())

array(5)

In [83]:
env.action_space.sample()

np.int64(0)

In [15]:
from itertools import combinations_with_replacement

def polynomial_features(state, degree=1):
    """
    Generate a polynomial basis feature vector from a state.

    Parameters:
        state: array-like
        degree: int, the maximum total degree of polynomial terms

    Returns:
        feature_vector: np.ndarray, shape (num_features,)
    """
    state = np.asarray(state)
    size = state.shape[0]

    # Create index tuples for all monomials up to the given degree
    feature_vector = [1.0]  # bias term
    for d in range(1, degree + 1):
        for idxs in combinations_with_replacement(range(size), d):
            term = 1.0
            for i in idxs:
                term *= state[i]
            feature_vector.append(term)

    return np.array(feature_vector).reshape(-1)


In [88]:
from math import isinf
import torch
import torch.nn as nn
import torch.nn.functional as F

alpha_sv_estimates = .001
alpha_policy_updates = .0001
gamma = 0.75 #discount factor on future rewards -- discounting pretty heavily
# final episode reward should be allocated primarily to the last few state/action pairs and not earlier ones
num_episodes = 20000
dim = polynomial_features(env.observation_space.sample()).shape[0]
wts = np.zeros(dim)
num_actions = 2
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
epochs_per_batch = 5
epsilon = .03


class PolicyNetwork(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim, 6)
        self.fc2 = nn.Linear(6, 6)
        self.fc3 = nn.Linear(6, action_dim)


    def forward(self, state):

        if torch.isnan(state).any():
          print("NaNs in features!")
        if torch.isinf(state).any():
          print("Infs in features!")

        x = F.relu(self.fc1(state))

        if torch.isnan(x).any():
          print("NaNs detected in output of layer 1!")

        if torch.isinf(x).any():
          print("Infs detected in output of layer 1!")

        y = F.relu(self.fc2(x))

        if torch.isnan(y).any():
          print("NaNs detected in output of layer 2!")

        if torch.isinf(y).any():
          print("Infs detected in output of layer 2!")

        logits = self.fc3(y)

        if torch.isnan(logits).any():
          print("NaNs detected in logits!")

        if torch.isinf(logits).any():
          print("Infs detected in logits!")


        probs = F.softmax(logits, dim=-1)
        probs = probs / probs.sum(dim=-1, keepdim=True)  # ensures it sums to 1
        return probs


def gradient_descent_sv_weights(w, G, s):
  features = polynomial_features(s)
  w += alpha_sv_estimates * (G - np.dot(w, features)) * features
  return w

def select_action(policy_net, state):
    features = polynomial_features(state)
    if np.isnan(features).any():
      print("NaNs in features!")
    if torch.isinf(torch.tensor(features)).any():
      print("Infs in features!")
    input = torch.tensor(features, dtype=torch.float32).to(device)
    probs = policy_net(input)
    m = torch.distributions.Categorical(probs)
    action = m.sample()
    log_prob = m.log_prob(action)
    return action.item(), log_prob

def get_log_prob(s, a):
  features = polynomial_features(s)
  if np.isnan(features).any():
    print("NaNs in features!")
  if torch.isinf(torch.tensor(features)).any():
    print("Infs in features!")
  input = torch.tensor(features, dtype=torch.float32, device=device)
  probs = policy_network(input)
  m = torch.distributions.Categorical(probs)
  log_prob = m.log_prob(torch.tensor(a, dtype=torch.long, device=device))
  return log_prob


def safe_normalize(tensor: torch.Tensor, eps=1e-8):
    if torch.isnan(tensor).any():
        return tensor  # skip normalization
    if torch.isinf(tensor).any():
        return tensor  # skip normalization

    mean = tensor.mean()
    std = tensor.std()

    if torch.isnan(mean) or torch.isnan(std) or std < eps:
        return tensor  # skip normalization

    return (tensor - mean) / (std + eps)

In [89]:
policy_network = PolicyNetwork(dim, num_actions).to(device)
optimizer = torch.optim.Adam(policy_network.parameters(), lr=alpha_policy_updates)
episode_rewards = []
for episode_idx in range(num_episodes):
    rewards = [] #stores rewards of each time step in episode
    actions = [] #stores actions taken in episode under behavior policy
    states = [] #stores states visited in episode by behavior policy
    probs = [] #stores pi(a|s) for each state,action pair in episode under behavior policy; detached from computation graph

    obs, _ = env.reset()
    undiscounted_reward_sum = 0
    done = False

    while not done:
        states.append(obs)
        action, log_prob = select_action(policy_network, obs) #forward pass thru policy network using cur state (obs) and returns action (discrete idx) and log_prob of that action
        prob = torch.exp(log_prob).detach() #probability of the discrete action BUT NOT the sampled action; need to divide
        probs.append(prob)
        obs, reward, terminated, truncated, _ = env.step(action)


        actions.append(action) #store the discrete action index

        done = terminated or truncated
        if not done:
          rewards.append(reward) #store reward from this step
        else: #end of blackjack round
          if action == 1: #last action was hit means that you busted
            rewards.append(-1)
          else: #last action was a stick
            if obs[0] <= 11: #stuck with <= 11 is an awful move
              rewards.append(-5)
            else:
              rewards.append(reward)
        undiscounted_reward_sum += rewards[-1]
    episode_rewards.append(undiscounted_reward_sum)

    if (episode_idx+1) % 100 == 0:
        print('Episode: {} moving avg reward: '.format(episode_idx+1), np.mean(episode_rewards[-20:]))

    # Compute discounted returns
    returns = []
    G = 0
    for r in reversed(rewards):
        G = r + gamma * G
        returns.insert(0, G)

    # Compute advantages and update baseline weights
    advantages = []
    for i in range(len(states)):
        cur_state = states[i]
        cur_return = returns[i]
        feat_np = polynomial_features(cur_state)
        baseline = np.dot(wts, feat_np)
        adv = cur_return - baseline
        advantages.append(adv)


        #wts = gradient_descent_sv_weights(wts, cur_return, cur_state)

    advantages = torch.tensor(advantages, dtype=torch.float32, device=device)
    safe_normalize(advantages)

    for i in range(epochs_per_batch):
      clipped_vals = []
      unclipped_vals = []

      state_features_np = np.array([polynomial_features(s) for s in states])  # shape: [batch_size, feature_dim]
      states_tensor = torch.tensor(state_features_np, dtype=torch.float32, device=device)
      actions_tensor = torch.tensor(actions, dtype=torch.long, device=device)
      old_probs_tensor = torch.tensor(probs, dtype=torch.float32, device=device)

      logits = policy_network(states_tensor) #forward pass using visited states thru the policy network which outputs a distribution over the discrete action space for each state (ie: batch size x 10 ** 4)
      dists = torch.distributions.Categorical(logits)
      new_log_probs = dists.log_prob(actions_tensor)  # shape: [batch]; log_prob of the discrete action index!
      new_probs = torch.exp(new_log_probs)

      # Compute ratios
      ratios = new_probs / (old_probs_tensor + 1e-8)  # shape: [batch]


      # PPO clipped objective (vectorized)
      clipped_ratios = torch.clamp(ratios, 1 - epsilon, 1 + epsilon)
      clipped_obj = clipped_ratios * advantages
      unclipped_obj = ratios * advantages

      loss_per_step = -torch.min(unclipped_obj, clipped_obj)  # shape: [batch]
      loss = loss_per_step.mean()

      if torch.isnan(loss):
        print("NaN loss detected")
        print("ratios:", ratios)
        print("advantages:", advantages)
        print("unclipped:", unclipped_obj)
        print("clipped:", clipped_obj)

      optimizer.zero_grad()
      loss.backward()

      for name, param in policy_network.named_parameters():
        if param.grad is not None and torch.isnan(param.grad).any():
          print(f"NaN in gradient of {name}")
        if param.grad is not None and torch.norm(param.grad) > 1e6:
          print(f"Exploding gradient in {name}:", torch.norm(param.grad).item())
      torch.nn.utils.clip_grad_norm_(policy_network.parameters(), max_norm=1.0)
      optimizer.step()

    for i in range(len(states)):
        cur_state = states[i]
        cur_return = returns[i]
        wts = gradient_descent_sv_weights(wts, cur_return, cur_state)



  std = tensor.std()


Episode: 100 moving avg reward:  -0.9
Episode: 200 moving avg reward:  -1.2
Episode: 300 moving avg reward:  -1.1
Episode: 400 moving avg reward:  -0.8
Episode: 500 moving avg reward:  -0.45
Episode: 600 moving avg reward:  -0.5
Episode: 700 moving avg reward:  -0.7
Episode: 800 moving avg reward:  -0.45
Episode: 900 moving avg reward:  -0.9
Episode: 1000 moving avg reward:  -0.75
Episode: 1100 moving avg reward:  -0.75
Episode: 1200 moving avg reward:  -0.95
Episode: 1300 moving avg reward:  -0.6
Episode: 1400 moving avg reward:  -0.85
Episode: 1500 moving avg reward:  -0.7
Episode: 1600 moving avg reward:  -1.3
Episode: 1700 moving avg reward:  -0.8
Episode: 1800 moving avg reward:  -0.55
Episode: 1900 moving avg reward:  -0.8
Episode: 2000 moving avg reward:  -0.85
Episode: 2100 moving avg reward:  -1.65
Episode: 2200 moving avg reward:  -1.15
Episode: 2300 moving avg reward:  -1.05
Episode: 2400 moving avg reward:  -0.8
Episode: 2500 moving avg reward:  -1.25
Episode: 2600 moving a

In [91]:
rewards = []
for episode_idx in range(1000):
    obs, _ = env.reset()
    done = False

    while not done:
        action, log_prob = select_action(policy_network, obs)
        obs, reward, terminated, truncated, _ = env.step(action)

        done = terminated or truncated
        if done:
          rewards.append(reward)

print(sum(rewards)/len(rewards))


-0.064


Lose a tad more more than we win -- which is not unexpected in Blackjack

In [93]:
def get_prob(s):
  features = polynomial_features(s)
  if np.isnan(features).any():
    print("NaNs in features!")
  if torch.isinf(torch.tensor(features)).any():
    print("Infs in features!")
  input = torch.tensor(features, dtype=torch.float32, device=device)
  probs = policy_network(input)
  return probs

ct = 0
while True:
  state = env.observation_space.sample()
  if state[0] <= 20:
    ct +=1
    print(state, get_prob(state), sep = ': ')
    if ct == 50:
      break
  else:
    continue



(np.int64(20), np.int64(2), np.int64(0)): tensor([0.9883, 0.0117], device='cuda:0', grad_fn=<DivBackward0>)
(np.int64(0), np.int64(2), np.int64(1)): tensor([0.0074, 0.9926], device='cuda:0', grad_fn=<DivBackward0>)
(np.int64(19), np.int64(7), np.int64(0)): tensor([0.9754, 0.0246], device='cuda:0', grad_fn=<DivBackward0>)
(np.int64(8), np.int64(9), np.int64(0)): tensor([0.0021, 0.9979], device='cuda:0', grad_fn=<DivBackward0>)
(np.int64(0), np.int64(2), np.int64(1)): tensor([0.0074, 0.9926], device='cuda:0', grad_fn=<DivBackward0>)
(np.int64(1), np.int64(6), np.int64(0)): tensor([0.0057, 0.9943], device='cuda:0', grad_fn=<DivBackward0>)
(np.int64(9), np.int64(2), np.int64(0)): tensor([0.0108, 0.9892], device='cuda:0', grad_fn=<DivBackward0>)
(np.int64(16), np.int64(1), np.int64(0)): tensor([0.7634, 0.2366], device='cuda:0', grad_fn=<DivBackward0>)
(np.int64(10), np.int64(7), np.int64(1)): tensor([0.0270, 0.9730], device='cuda:0', grad_fn=<DivBackward0>)
(np.int64(7), np.int64(0), np.int

seems like a decent policy is learned