In [None]:
# Base URL of the challenge server; it ends with a slash so endpoint names can be appended directly.
# NOTE(review): the functions/classes below hard-code the same URL instead of reading this constant.
host_domain = "https://berghain.challenges.listenlabs.ai/"

In [None]:
def start_new_game(scenario: int, player_id: str = "5f840748-3dda-4641-bf7c-0bc4c9f5b219"):
  """
  Ask the challenge server to start a new game and return its id.

  :param scenario: scenario number sent as the ``scenario`` query parameter
  :param player_id: player identifier sent as the ``playerId`` query parameter
  :return: the ``gameId`` field of the server's JSON response
  :raises requests.exceptions.RequestException: on connection problems, timeouts,
      or a non-2xx HTTP status
  :raises KeyError: if the response JSON carries no ``gameId``
  """
  # Let requests build (and escape) the query string instead of formatting it by hand.
  resp = requests.get(
      "https://berghain.challenges.listenlabs.ai/new-game",
      params={"scenario": scenario, "playerId": player_id},
      timeout=30,  # never hang the notebook forever on a dead connection
  )
  # Surface HTTP errors explicitly rather than failing later on a missing key.
  resp.raise_for_status()
  return resp.json()["gameId"]

In [None]:
%%capture
!pip install stable-baselines3[extra]

In [None]:
# gym stuff
import gymnasium
from gymnasium import Env
from gymnasium.spaces import Discrete, Box, Dict, Tuple, MultiBinary, MultiDiscrete

# helpers
import numpy as np
import random
from IPython.display import clear_output
import os

# stable baselines stuff
from stable_baselines3 import PPO, DQN
from stable_baselines3.common.vec_env import VecFrameStack
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.callbacks import BaseCallback, EvalCallback
from stable_baselines3.common.env_util import make_vec_env

# import pandas
import pandas as pd

# typing
from typing import Optional, Union, Any

# visualizations
import matplotlib.pyplot as plt

# torch stuff for optuna
import torch
import torch.nn as nn

import copy

import time

import requests

# Model Definition

In [None]:

class BouncerEnv(Env):
  """
  Gymnasium environment for the accept/reject "bouncer" puzzle.

  The environment runs in one of two modes, selected by the type of ``df``:

  - online  (``df`` is a ``str``): people are fetched one at a time from the
    challenge HTTP API and every decision is sent back to the server.
  - offline (``df`` is a ``pd.DataFrame``): rows of the frame are used as people;
    the frame is reshuffled on every reset.

  Actions: ``Discrete(2)`` - a truthy action accepts the current person, a falsy
  one rejects them.

  ``game_kwargs`` keys read by this class: ``rel_frequencies``, ``cat_mins``,
  ``max_accepted``, ``max_rejected``, ``scenario`` (online only) and the optional
  ``player_id`` (online only, defaults to ``DEFAULT_PLAYER_ID``).

  ``reward_weights_kwargs`` keys read by this class: ``max_budget_needed``,
  ``budget_scale``, ``rarity_exponent``, ``potential_discount_factor``,
  ``potential_shaping_scalar``, ``final_bonus`` and ``final_penalty``.
  """

  # Attribute keys read from each person returned by the API, in observation order.
  ATTRIBUTE_NAMES = ("techno_lover", "well_connected", "creative", "berlin_local")
  # Base URL of the challenge API (ends with a slash).
  HOST_DOMAIN = "https://berghain.challenges.listenlabs.ai/"
  # Player id used when game_kwargs does not provide "player_id".
  DEFAULT_PLAYER_ID = "5f840748-3dda-4641-bf7c-0bc4c9f5b219"

  def __init__(self,
               df: pd.DataFrame | str,
               game_kwargs: dict,
               reward_weights_kwargs: dict):
    # df, sample and sample index
    self.df = df
    self.cur_sample_idx = 0
    # all relevant game vars
    self.game_kwargs = game_kwargs
    # reward weights
    self.reward_weights_kwargs = reward_weights_kwargs
    self.status = "null"
    # Defined up front so both flags exist in every mode and on every failure path.
    self.failed = False
    self.accept_all = False
    self.init_game_vars(game_kwargs)
    # Actions we can take, accept or reject
    self.action_space = Discrete(2, start=0)
    # Subgroup categories of sample, cats_accepted, accepted and rejected
    self.init_observation_space()
    # Set starting state, from first sample
    self.init_state()

  def _is_online(self):
    """Return True when people come from the HTTP API (``df`` is a string)."""
    return isinstance(self.df, str)

  def _empty_sample(self):
    """All-zero placeholder person, used when no real person is available."""
    return np.zeros(self.num_cats, dtype=np.int8)

  def _parse_person(self, person):
    """Convert an API ``nextPerson`` payload into an int8 attribute vector."""
    attributes = person["attributes"]
    return np.array([attributes[name] for name in self.ATTRIBUTE_NAMES], dtype=np.int8)

  def init_game_vars(self, game_kwargs):
    """
    Reset counters and limits, then prepare the source of people.

    Online: unless a game is currently "running", open a fresh HTTP session and
    request a new game. On failure ``self.failed`` is set and the session is closed.
    Offline: reshuffle the DataFrame and rewind the row cursor.
    """
    self.rel_freqs = game_kwargs["rel_frequencies"]     # list[float num_cats]
    self.num_cats = len(self.rel_freqs)
    self.cat_mins = game_kwargs["cat_mins"]             # list[int num_cats]
    self.max_accepted = game_kwargs["max_accepted"]     # int
    self.max_rejected = game_kwargs["max_rejected"]     # int
    self.cats_accepted = [0] * self.num_cats
    self.total_accepted = 0
    self.total_rejected = 0

    if not self._is_online():
      self.df = self.df.sample(frac=1).reset_index(drop=True) # Reshuffle and reset index
      self.cur_sample_idx = 0
      return

    # only reset environment on state: null | completed | failed
    if self.status == "running":
      return

    # Reset the flags BEFORE talking to the server so they are valid even if it fails.
    self.failed = False
    self.accept_all = False

    # Release the connection pool of the previous game, if any.
    previous_session = getattr(self, "session", None)
    if previous_session is not None:
      previous_session.close()

    # Add a requests session with retries, to reduce tcp overhead (no handshake every request)
    self.session = requests.Session()
    adapter = requests.adapters.HTTPAdapter(max_retries=5)
    self.session.mount('http://', adapter)
    self.session.mount('https://', adapter)
    self.hostDomain = self.HOST_DOMAIN
    player_id = game_kwargs.get("player_id", self.DEFAULT_PLAYER_ID)
    game_url = f"{self.hostDomain}new-game?scenario={game_kwargs['scenario']}&playerId={player_id}"
    try:
      resp = self.session.get(game_url).json()
      self.gameId = resp["gameId"]
    except (requests.exceptions.RequestException, ValueError, KeyError) as e:
      # Network error, non-JSON body, or a JSON body without a game id.
      print(f"failed to connect on error: {e}")
      self.failed = True
      self.session.close()
      return
    print(self.gameId)

  def init_observation_space(self):
    """Declare the dict observation space (current person plus progress ratios)."""
    self.observation_space = Dict(
        {
          "cur_sample": MultiBinary(self.num_cats),
          "rel_freqs": Box(0, 1, shape=(self.num_cats,), dtype=np.float32),
          "percentage_cats_filled": Box(0, 1, shape=(self.num_cats,), dtype=np.float32),
          "percentage_accepted": Box(0, 1, shape=(1,), dtype=np.float32),
          "percentage_rejected": Box(0, 1, shape=(1,), dtype=np.float32)
        }
        )

  def init_state(self):
    """Build the initial observation from the first person and zeroed progress."""
    first_sample = self.get_first_sample()
    self.state = {
        "cur_sample": copy.deepcopy(first_sample),
        "rel_freqs": np.array(self.rel_freqs, dtype=np.float32),
        "percentage_cats_filled": np.array([0.0]*self.num_cats, dtype=np.float32),
        "percentage_accepted": np.array([0.0], dtype=np.float32),
        "percentage_rejected": np.array([0.0], dtype=np.float32)
    }

  def get_first_sample(self):
    """
    Return the first person of an episode as an int8 attribute vector.

    Online: fetches person 0 from the server; returns an all-zero vector and sets
    ``self.failed`` when the game could not be started, the request fails, or the
    server returns no person. Offline: returns row 0 of the DataFrame.
    """
    if not self._is_online():
      return np.array(self.df.iloc[0], dtype = np.int8)

    # if failed to connect
    if self.failed:
      return self._empty_sample()
    # else try to keep game going
    try:
      resp = self.session.get(f"{self.hostDomain}decide-and-next?gameId={self.gameId}&personIndex=0")
    except requests.exceptions.RequestException as e:
      print(f"failed to connect on error {e}")
      self.failed = True
      return self._empty_sample()
    resp = resp.json()
    self.status = resp["status"]
    next_person = resp["nextPerson"]
    if not next_person:
      # The server gave us nobody to decide on: there is no playable game.
      self.failed = True
      return self._empty_sample()
    self.next_index = next_person["personIndex"]
    return self._parse_person(next_person)

  def req_satisfied(self):
    """Return True when every category has reached its minimum accepted count."""
    return all(
        self.cats_accepted[i] >= self.cat_mins[i]
        for i in range(self.num_cats)
    )

  def next_sample(self, accepted):
    """
    Submit the decision for the current person and return the next one.

    Online: sends the decision (always "accept" once ``accept_all`` is set) and
    returns the next person's attribute vector, or ``None`` (also setting
    ``self.failed``) when the server returns no further person.
    Offline: advances the row cursor and returns that DataFrame row.

    :raises RuntimeError: online, when there is no active game to play
    """
    if not self._is_online():
      self.cur_sample_idx += 1
      return np.array(self.df.iloc[self.cur_sample_idx], dtype = np.int8)

    if self.failed:
      # Either the game never started or it already ended; reset() is required first.
      raise RuntimeError("no active game: call reset() before stepping (self.failed is True)")

    decision = "true" if (self.accept_all or accepted) else "false"
    req = f"{self.hostDomain}decide-and-next?gameId={self.gameId}&personIndex={self.next_index}&accept={decision}"
    resp = self.session.get(req).json()

    self.status = resp["status"]
    next_person = resp["nextPerson"]
    if not next_person:
      self.failed = True
      return None
    self.next_index = next_person["personIndex"]
    return self._parse_person(next_person)

  def calc_potential(self, cat_percentages, percentage_accepted):
    """
    Calculates the potential of a given state (defined by category counts).

    The potential increases as the categorical minimums are met,
    and is capped once a minimum is reached.
    Weighted by rarity to prioritize rare attributes.

    When framed as 'negative distance to goal', we nudge the model towards
    approaching the goal.

    PBRS provides dense rewards to nudge the model along the way.

    Current notes:

    - level-adaptive potential scaling and lower rejection penalties
      - This allowed the model to understand that as levels progress, selectiveness is more important
      - max level: 15

    - power > 1.0 for categorical filling potential, I applied this to the progress*weight attribute
      - power = 2.0: seems to weigh too heavily on rareness, causing rejections to grow a ton, to about 10000
      - max level: 18, pretty solid... Though rejections are huge

    - SPARSE REWARDS AS LEVEL COMPLETION
      - Upon objective completion, reward sparse rewards

    No change for rejections, hence rejections are neutral

    Positive for goal approaching if accept and has category

    No change for accept with no categories
      Incentivizes approaching the goal

    Positive for budget if accept and category count outweighs decreased budget

    Negative for budget if accept and category count does not outweigh decreased budget
      -   incentivises selectiveness

    :param cat_percentages: per-category fill ratios (accepted / minimum, capped at 1.0)
    :param percentage_accepted: accepted / max_accepted for the same state
    :return: the (non-positive while categories are unfilled) potential value
    """
    weights = self.reward_weights_kwargs

    # budget signals are very clear to model learning
    # include budget here, distances increase as budget is depleted and a category remains unfilled (percentage of budget needed grows)
    # Independent of the category, so computed once outside the loop.
    n_budget_left = (1.0 - percentage_accepted) * self.max_accepted

    potential = 0.0
    for i in range(self.num_cats):
      # Weight by rarity
      # This makes progress on rare categories contribute more to the potential
      # implication: distance is larger for rarer categories, budget is tighter for rarer categories
      rarity_weight = (1.0 / self.rel_freqs[i])

      n_left = (1.0 - cat_percentages[i]) * self.cat_mins[i]

      # logic: it costs way more of a budget to accept a rarer category than it does to accept a common category
      # The share of the remaining budget needed to fill this category is capped at
      # 'max_budget_needed' (hyperparameter); a low cap signals an always-winnable
      # state, which may be detrimental to learning.
      if n_budget_left > 0:
        fraction_needed = min(n_left / n_budget_left, weights['max_budget_needed'])
      else:
        # No budget left: avoid dividing by zero; unfilled categories sit at the cap.
        fraction_needed = weights['max_budget_needed'] if n_left > 0 else 0.0

      # budget weighting as this is a key signal for deciding next step, coupled with distance, hyperparameter
      p_budget_needed_to_fill = -1.0 * fraction_needed * weights['budget_scale']

      # As distance becomes smaller, potential grows larger
      # Distance weighted by rarity and translated by budget
      # Signals importance of certain categories, and current budget left to acquire them
      potential += (p_budget_needed_to_fill)*(rarity_weight**weights['rarity_exponent'])

    return potential

  # This is your main reward calculation function
  def calc_reward(self, action, last_state_dict, current_state_dict, done):
    """
    Calculate the total reward for a step using potential-based shaping + final episodic rewards.

    :param action: the decision taken (not used by the calculation itself)
    :param last_state_dict: observation before the decision was applied
    :param current_state_dict: observation after the decision was applied
    :param done: whether the episode ended with this step
    :return: the reward as a Python float
    """
    weights = self.reward_weights_kwargs

    shaping_term = 0
    final_reward = 0
    if done:
      # --- Final/Episodic Reward ---
      # Sparsest rewards upon done state; no potential rewards here as the potential
      # of any terminal state should be zero (nowhere to move to...)
      if self.req_satisfied():
        # beat game
        final_reward += weights['final_bonus']
      else:
        # lost game: always a deduction, whichever sign the penalty was configured with
        final_reward -= abs(weights['final_penalty'])
    else:
      # --- Potential-Based Shaping Term as Distance From Goal (gamma * Phi(s') - Phi(s)) ---
      last_potential = self.calc_potential(
          last_state_dict['percentage_cats_filled'],
          last_state_dict['percentage_accepted'][0])
      current_potential = self.calc_potential(
          current_state_dict['percentage_cats_filled'],
          current_state_dict['percentage_accepted'][0])
      # > 0 only when distance from goal decreases
      shaping_term = (weights['potential_discount_factor'] * current_potential - last_potential) * weights['potential_shaping_scalar']

      # Online game: bonus on the step where the accept-all stage is entered
      # (step() returns early, without calling this, on later accept-all steps).
      if self._is_online() and self.accept_all:
        final_reward += weights['final_bonus']

    # --- Combine all reward components ---
    return float(shaping_term + final_reward)

  def increment_counts(self, accepted):
    """Update accepted/rejected totals (and per-category counts) for the current person."""
    if accepted:
      # increment categorical counts
      for i in range(self.num_cats):
          if self.state["cur_sample"][i]:
            self.cats_accepted[i] +=  1
      # increment total accepted
      self.total_accepted += 1
    else:
      self.total_rejected += 1

  def calculate_state_percentages(self, accepted):
    """Refresh the progress ratios in ``self.state`` that the last decision changed."""
    # update state percentages on each decision
    # conditional here to save unneeded computations
    if accepted:
      self.state["percentage_accepted"][0] = self.total_accepted / self.max_accepted
      for i in range(self.num_cats):
        # cap it at 1.0, helps for reward signaling as well
        self.state["percentage_cats_filled"][i] = min(self.cats_accepted[i] / self.cat_mins[i], 1.0)
    else:
      self.state["percentage_rejected"][0] = self.total_rejected / self.max_rejected

  def step(self, action: bool):
    """
    Apply a decision to the current person and move on to the next one.

    :param action: truthy to accept the current person, falsy to reject
    :return: ``(observation, reward, done, truncated, info)``; ``truncated`` is
        always False and ``info`` is always empty
    """
    online = self._is_online()
    if online:
      # add sleep aspect to decrease chances of sending too many requests
      # (pointless offline, where it would only slow training down)
      time.sleep(0.01)
      if self.accept_all:
        action = True

    # for keeping track of goal progression
    last_state_dict = copy.deepcopy(self.state)
    self.increment_counts(action)
    self.calculate_state_percentages(action)

    # truncated, not relevant here due to no timelimit and not being able to go out of bounds
    truncated = False
    done = False

    # Stopping condition, relies upon next_sample != None for the online puzzle
    # sample next person in line
    next_person = self.next_sample(action)
    if online:
      if next_person is None:
        done = True
      elif self.accept_all:
        # early break to return needed processing information and keep accepting
        self.state["cur_sample"] = next_person
        return self.state, 0.0, False, truncated, {}
      elif self.req_satisfied():
        # if not done and not yet on accept all stage
        self.accept_all = True
    else:
      budget_spent = (self.total_accepted == self.max_accepted
                      or self.total_rejected == self.max_rejected)
      done = budget_spent or self.req_satisfied()

    current_state_dict = copy.deepcopy(self.state)

    # calculate reward before showing the next person in the observation
    reward = self.calc_reward(action, last_state_dict, current_state_dict, done)

    if next_person is not None:
      self.state["cur_sample"] = next_person

    # Placeholder for info
    info = {}

    return self.state, reward, done, truncated, info

  def render(self):
    """No visualization is implemented."""
    pass

  def reset(self, *, seed: Optional[int] = None, options: Optional[dict] = None) -> tuple[dict[str, np.ndarray], dict]:
    """
    Start a new episode and return ``(observation, info)``.

    ``seed`` is forwarded to ``Env.reset`` as the Gymnasium API expects.
    NOTE(review): the offline DataFrame shuffle does not use that seed.
    """
    super().reset(seed=seed)
    # reset environment
    self.init_game_vars(self.game_kwargs)
    self.init_state()
    # Placeholder for info
    info = {}
    return self.state, info

# Per-category relative frequencies and minimum accepted counts.
# Index i of both lists refers to the same category.
relative_frequencies = [0.6265, 0.47, 0.06227, 0.398]
cat_mins = [650, 450, 300, 750]

# Game limits and scenario selection passed to BouncerEnv.
game_kwargs = {
    "rel_frequencies": relative_frequencies,
    "cat_mins": cat_mins,
    "max_accepted": 1000,
    "max_rejected": 20000,
    "max_hardness": 1,
    "max_hardness_completions": 1,
    "scenario": 2
}

# Reward-shaping hyperparameters passed to BouncerEnv.
reward_weights_kwargs = {
    'rarity_exponent': 0.3, # how important is the rarity of a sample?
    'budget_scale': 20.0, # how important is budget?
    'max_budget_needed': 2.0, # how bad is a losing state?
    'final_bonus': 1000.0, # added to the reward when the game is won
    # Magnitude SUBTRACTED from the reward when the game is lost. It must be
    # positive: the environment does `final_reward -= final_penalty`, so the old
    # value of -500.0 turned the penalty into a +500 bonus for losing.
    'final_penalty': 500.0,
    'potential_shaping_scalar': 1.0, # Weight for the potential shaping term
    'potential_discount_factor': 0.99999 # Discount factor for the potential shaping term
}

# Constructor arguments for BouncerEnv; a string `df` selects the online (HTTP API) mode.
ENV_KWARGS = {
    "df": "",
    "game_kwargs": game_kwargs,
    "reward_weights_kwargs": reward_weights_kwargs
}

# Eval Callback

In [None]:
class CustomEvalCallback(BaseCallback):
  """
  Callback that periodically plays evaluation episodes on a separate online
  ``BouncerEnv`` and records the outcome of each episode.

  Evaluation runs once when training starts and then every ``eval_freq`` calls
  of ``_on_step``. Results are appended to ``reward_results``,
  ``tot_accepted_results``, ``tot_rejected_results`` and
  ``tot_cats_accepted_results`` and can be plotted with ``visualize``.

  :param train_env: the training environment (stored, not otherwise used here)
  :param verbose: Verbosity level: 0 for no output, 1 for info messages, 2 for debug messages
  :param eval_freq: run an evaluation every ``eval_freq`` callback steps
  """
  def __init__(self, train_env, verbose: int = 0, eval_freq: int = 500):
    super().__init__(verbose)
    # The base class already provides: self.model, self.training_env, self.n_calls,
    # self.num_timesteps, self.locals, self.globals, self.logger and self.parent.
    self.eval_rounds = 1
    self.eval_freq = eval_freq
    self.train_env = train_env
    # Dedicated evaluation environment; a string `df` selects the online mode,
    # so constructing it already contacts the game server.
    self.test_env = BouncerEnv(
        df = "",
        game_kwargs=game_kwargs,
        reward_weights_kwargs=reward_weights_kwargs
        )

    # Per-trial result buffers (not filled by this class).
    self.reward_results_trial = []
    self.tot_accepted_results_trial = []
    self.tot_rejected_results_trial = []
    self.tot_cats_accepted_results_trial: list[list[int]] = []

    # One entry per finished evaluation episode.
    self.reward_results = []
    self.tot_accepted_results = []
    self.tot_rejected_results = []
    self.tot_cats_accepted_results = []
    self.max_level = []

  def eval_model(self):
    """
    Play up to ``eval_rounds`` deterministic episodes on the test environment.

    Prints a summary of each episode, records its results, and stops early after
    an unsuccessful episode or when no game could be started.
    """
    attempts = 0
    max_attempts = 1
    for episode in range(self.eval_rounds):
      obs, _ = self.test_env.reset()
      # ensure hardness not reset until next evaluation
      # NOTE(review): BouncerEnv does not read `reset_hardness` - confirm it is still needed.
      self.test_env.reset_hardness = False

      if getattr(self.test_env, "failed", False):
        # No game could be started; stepping the environment would only raise.
        print('Evaluation skipped: the test environment could not start a game')
        break

      done = False
      score = 0
      while not done:
        action, _state = self.model.predict(obs, deterministic=True)
        obs, reward, done, _, info = self.test_env.step(action)
        score += reward

      print('Episode:{} \nScore:{}'.format(episode, score))
      print('Terminal Observation:')
      print(f'    Fulfilled Categories: {obs["percentage_cats_filled"]}')
      print(f'    Accepted: {obs["percentage_accepted"]}')
      print(f'    Rejected: {obs["percentage_rejected"]}')
      succeeded = self.test_env.req_satisfied()
      print(f'Succeeded: {succeeded}')

      # Record the episode so visualize() has something to plot.
      self.reward_results.append(score)
      self.tot_accepted_results.append(self.test_env.total_accepted)
      self.tot_rejected_results.append(self.test_env.total_rejected)
      self.tot_cats_accepted_results.append(list(self.test_env.cats_accepted))

      attempts = 0 if succeeded else attempts + 1
      if attempts >= max_attempts:
        break

  def visualize(self, trial=False):
    """
    Plot the recorded evaluation results in a 2x2 grid.

    :param trial: accepted for compatibility; not used
    """
    if len(self.tot_cats_accepted_results) == 0:
      # Indexing columns of an empty array would raise; there is nothing to draw.
      print('No evaluation results recorded yet; nothing to plot.')
      return

    cats_accepted = np.array(self.tot_cats_accepted_results)
    fig, axs = plt.subplots(nrows = 2, ncols = 2, sharex=False, sharey=False, figsize=(5,5))
    axs[0,0].plot(self.reward_results)
    axs[0,0].set_title("Rewards Per Update")
    axs[0,1].plot(self.tot_accepted_results)
    axs[0,1].set_title("Total Accepted Per Update")
    axs[1,0].plot(self.tot_rejected_results)
    axs[1,0].set_title("Total Rejected Per Update")
    # One line per category, in the order tl, wc, c, bl.
    for column, color in enumerate(('tab:orange', 'tab:green', 'tab:red', 'tab:blue')):
      axs[1,1].plot(cats_accepted[:, column], color)
    axs[1,1].legend(['tl', 'wc', 'c', 'bl'])
    axs[1,1].set_title("Total Categories Accepted Per Update")
    plt.show()
    print(f'MAX LEVELS: {self.max_level}')

  def _on_training_start(self) -> None:
    """
    This method is called before the first rollout starts.
    Runs a baseline evaluation of the untrained model.
    """
    super()._on_training_start()
    self.eval_model()

  def _on_step(self) -> bool:
    """Evaluate every ``eval_freq`` calls; always returns True so training continues."""
    super()._on_step()
    if self.n_calls % self.eval_freq == 0:
      self.eval_model()
    return True

  def _on_training_end(self) -> None:
    """
    Training Visualization (currently disabled).
    """
    # self.visualize()
    pass

# Run the model

In [None]:
# Build the training environment in this cell: `vec_env` is otherwise only created
# by the "Run from load" cell further down, so this cell failed on a fresh kernel.
vec_env = make_vec_env(BouncerEnv, n_envs = 1, env_kwargs=ENV_KWARGS)

model = DQN(
    "MultiInputPolicy",
    env=vec_env,
    verbose=0,
    seed = 1234963,
    learning_rate=1e-5
    )

model.learn(total_timesteps=1_000_000, progress_bar = True,)

save_path = "/content/drive/MyDrive/rl_stuff"
os.makedirs(save_path, exist_ok=True) # make sure the target folder exists before saving
model_file_path = os.path.join(save_path, "ll_ai_puzzle_DQN")

model.save(model_file_path)
print(f"Model saved to {model_file_path}.zip")

Output()

KeyboardInterrupt: 

In [None]:
# Points `model_file_path` at a different checkpoint name inside `save_path`.
# NOTE(review): depends on `save_path` from an earlier cell, and the load cell below
# reassigns `model_file_path` before it is read - TODO confirm this cell is still needed.
model_file_path = os.path.join(save_path, "ll_puzzle_2_082525_0")

# Run from load

In [None]:
# Restore a saved DQN agent and attach it to a freshly built environment.
save_path = "/content/drive/MyDrive/rl_stuff"
# No-op when the directory is already present.
os.makedirs(save_path, exist_ok=True)
model_file_path = os.path.join(save_path, "bouncer_dqn_400k")
# Single wrapped BouncerEnv, configured from ENV_KWARGS.
vec_env = make_vec_env(env_id=BouncerEnv, n_envs=1, env_kwargs=ENV_KWARGS)
loaded_model = DQN.load(path=model_file_path, env=vec_env)
print("Model loaded successfully!")



In [None]:
# Read the `failed` attribute of every wrapped environment (one list entry per env).
# BouncerEnv sets it to True when it cannot reach the game server or no next person is returned.
vec_env.get_attr('failed')

[True]

In [None]:
# Play up to 10 games with the loaded model, always taking its deterministic action.
for episode in range(10):
  state = vec_env.reset()
  done = False
  score = 0.0
  # Stop when the game finished or the environment reports it has no playable game.
  while not done and not vec_env.get_attr('failed')[0]:
    action, _state = loaded_model.predict(state, deterministic=True)
    try:
      time.sleep(0.01)  # small pause between requests to go easy on the server
      state, rewards, dones, info = vec_env.step(action)
    except Exception as exc:
      # Report the failure instead of hiding it. A bare `except:` would also
      # swallow KeyboardInterrupt and make the loop impossible to stop by hand.
      print(f"Episode {episode} aborted: {exc!r}")
      break
    # The vectorised env returns one entry per wrapped env; there is exactly one here.
    score += float(rewards[0])
    done = bool(dones[0])
  print(f"Episode {episode} score: {score}")



















