In [1]:
import os
import numpy as np
import pandas as pd
import torch
import gymnasium as gym
from stable_baselines3 import PPO
from sb3_contrib import RecurrentPPO
import edss

class BayesEstimator():
    def __init__(self, env: gym.Env, transform=None) -> None:
        self.buildActions: list[edss.Action] = []
        self.testActions: list[edss.Action] = []
        for action in env.unwrapped.actions:
            match action.type:
                case "build":
                    self.buildActions.append(action)
                case "test":
                    self.testActions.append(action)
                case _:
                    pass
        self.transform = transform
        self.features = np.zeros_like(env.unwrapped.features, dtype=np.single)

    def estimate(self, observation):
        self.features = np.zeros_like(self.features)
        for i, b in enumerate(self.buildActions):
            feature = b.feature
            self.features[feature] = max(self.features[feature], observation["Build_progress"][i])
        
        match self.transform:
            case "arctan":
                self.features = 2*np.arctan(self.features)/np.pi 
            case "tanh":
                self.features = np.tanh(0.55 * self.features)   # Note: ln(3)/2 ~ 0.55
            case _: 
                self.features = np.clip(self.features,1,0.5)
        
        for i, t in enumerate(self.testActions):
            test = observation["Test_progress"][i]
            sen = t.attr2
            spc = t.attr1
            feature = t.feature
            prior = self.features[feature]
            if test == -1:
                pass
            elif test == 1:
                self.features[feature] = (sen * prior) / (sen * prior + (1-spc) * (1 - prior))
            else: # test == 0
                self.features[feature] = ((1-sen) * prior) / ((1-sen) * prior + spc * (1-prior))

        return {
            "Budget": observation["Budget"],
            "Features": self.features,
            "Build_progress": observation["Build_progress"],
            "Test_progress": observation["Test_progress"]
        }

def evaluateDesign(design, driver):
    env = edss.EDSSEnv(edss.design_from_yaml(design))
    bayes1 = BayesEstimator(env)
    bayes2 = BayesEstimator(env, transform="arctan")
    bayes3 = BayesEstimator(env, transform="tanh")

    exp_dir = design.removesuffix('.yaml')
    mdp_agent = PPO.load(os.path.join(exp_dir, "mdp_agent"))
    pomdp_agent = PPO.load(os.path.join(exp_dir, "pomdp_agent"))
    #bayes1_agent = PPO.load(os.path.join(exp_dir, "bayes_step_agent"))
    bayes2_agent = PPO.load(os.path.join(exp_dir, "bayes_agent"))
    #bayes3_agent = PPO.load(os.path.join(exp_dir, "bayes_tanh_agent"))
    recurrent_agent = RecurrentPPO.load(os.path.join(exp_dir, "recurrent_agent"))

    obs, _ = env.reset()
    episode_starts = torch.ones((0,), dtype=bool)
    _, recurrent_states = recurrent_agent.predict({"Budget": obs["Budget"], "Build_progress": obs["Build_progress"], "Test_progress": obs["Test_progress"]}, episode_start=np.ones((1,),dtype=bool))
    recurrent_states_tensor = (torch.tensor(recurrent_states[0]), torch.tensor(recurrent_states[1]))
    done = False
    #idx = ["MDP", "POMDP", "Bayes step", "Bayes arctan", "Bayes tahn", "Recurrent"]
    idx = ["MDP", "POMDP", "Bayes arctan", "Recurrent"]
    columns = []
    fea_cols = []
    bel1_cols = []
    bel2_cols = []
    bel3_cols = []
    bi_cols = []
    ti_cols = []
    for i in range(len(obs["Features"])):
        fea_cols.append(f'FS{i+1}')
        bel1_cols.append(f'BsS{i+1}')
        bel2_cols.append(f'BaS{i+1}')
        bel3_cols.append(f'BtS{i+1}')
    for i in range(len(obs["Build_progress"])):
        columns.append(f'BA{i+1}')
        bi_cols.append(f'BP{i+1}')
    for i in range(len(obs["Test_progress"])):
        columns.append(f'VA{i+1}')
        ti_cols.append(f'VP{i+1}')
    columns.append('Term')

    with torch.no_grad():
        while not done:
            p_obs = {"Budget": obs["Budget"], "Build_progress": obs["Build_progress"], "Test_progress": obs["Test_progress"]}

            t_obs, _ = mdp_agent.policy.obs_to_tensor(obs)
            mdp_dist = mdp_agent.policy.get_distribution(t_obs).distribution.probs
        
            t_obs, _ = pomdp_agent.policy.obs_to_tensor(p_obs)
            pomdp_dist = pomdp_agent.policy.get_distribution(t_obs).distribution.probs

            #t_obs, _ = bayes1_agent.policy.obs_to_tensor(bayes1.estimate(p_obs))
            #bayes1_dist = bayes1_agent.policy.get_distribution(t_obs).distribution.probs

            t_obs, _ = bayes2_agent.policy.obs_to_tensor(bayes2.estimate(p_obs))
            bayes2_dist = bayes2_agent.policy.get_distribution(t_obs).distribution.probs

            #t_obs, _ = bayes3_agent.policy.obs_to_tensor(bayes3.estimate(p_obs))
            #bayes_dist = bayes3_agent.policy.get_distribution(t_obs).distribution.probs

            t_obs, _ = recurrent_agent.policy.obs_to_tensor(p_obs)
            recurrent_dist, recurrent_states_tensor = recurrent_agent.policy.get_distribution(t_obs, recurrent_states_tensor, episode_starts)
            recurrent_dist = recurrent_dist.distribution.probs

            budget_frame = pd.DataFrame([obs["Budget"]], columns=["Budget"])
            feature_frame = pd.DataFrame([obs["Features"]], columns=fea_cols)
            #belief1_frame = pd.DataFrame([bayes1.estimate(p_obs)["Features"]], columns=bel1_cols)
            belief2_frame = pd.DataFrame([bayes2.estimate(p_obs)["Features"]], columns=bel2_cols)
            #belief3_frame = pd.DataFrame([bayes3.estimate(p_obs)["Features"]], columns=bel3_cols)
            build_frame = pd.DataFrame([obs["Build_progress"]], columns=bi_cols)
            test_frame = pd.DataFrame([obs["Test_progress"]], columns=ti_cols)
            df = budget_frame.join(feature_frame).join(belief2_frame).join(build_frame).join(test_frame)
            display(df.style.hide(axis="index"))
            #df = pd.DataFrame([mdp_dist[0].numpy(), pomdp_dist[0].numpy(), bayes1_dist[0].numpy(), bayes2_dist[0].numpy(), bayes3_dist[0].numpy(), recurrent_dist[0].numpy()], index=idx, columns=columns)
            df = pd.DataFrame([mdp_dist[0].numpy(), pomdp_dist[0].numpy(), bayes2_dist[0].numpy(), recurrent_dist[0].numpy()], index=idx, columns=columns)
            s = df.style.highlight_max(axis=1, color="grey")
            display(s)
            print()

            match(driver):
                case "mdp":
                     action, _ = mdp_agent.predict(obs, deterministic=True)
                case "pomdp":
                     action, _ = pomdp_agent.predict(p_obs, deterministic=True)
                #case "bayes1":
                #    action, _ = bayes1_agent.predict(bayes1.estimate(p_obs), deterministic=True)
                case "bayes2":
                     action, _ = bayes2_agent.predict(bayes2.estimate(p_obs), deterministic=True)
                #case "bayes3":
                #     action, _ = bayes3_agent.predict(bayes3.estimate(p_obs), deterministic=True)
                case "recurrent":
                     action, recurrent_states = recurrent_agent.predict(p_obs, state=recurrent_states, deterministic=True)

            obs, _, done, _, _ = env.step(action)


In [None]:
# Design file to evaluate; swap in one of the alternatives below as needed.
design = "experiments/test1.yaml"
#design = "experiments/build_pref.yaml"
#design = "experiments/test_pref.yaml"

# Replay the same design once per driving agent, in this fixed order.
for driver in ("mdp", "pomdp", "bayes2", "recurrent"):
    evaluateDesign(design=design, driver=driver)