## Compare the results of handcrafted, baselines, and HumemAI


In [None]:
from typing import Literal
import shutil
import os
from glob import glob
from humemai.utils import read_yaml
import pandas as pd
import numpy as np
from tqdm.auto import tqdm

from typing import Literal
import pandas as pd


def get_handcrafted(
    size: Literal["xxs", "xs", "s", "m", "l", "xl", "xxl"],
    include_pretrain_semantic: bool = True,
    base_path: str = "training-results/non-equal-object-probs/",
) -> pd.DataFrame:
    """Load the hand-crafted results and keep the best row per capacity.

    room_size=xxs   num_obs=6.0     max_obs=6   min_obs=6
    room_size=xs    num_obs=6.52    max_obs=8   min_obs=5
    room_size=s     num_obs=5.64    max_obs=7   min_obs=5
    room_size=m     num_obs=6.3     max_obs=10  min_obs=5
    room_size=l     num_obs=5.32    max_obs=8   min_obs=5
    room_size=xl    num_obs=5.58    max_obs=7   min_obs=5
    room_size=xxl   num_obs=6.0     max_obs=8   min_obs=5

    Args:
        size: room size; selects the file
            ``handcrafted/hand-crafted-results-room_size={size}.csv``.
        include_pretrain_semantic: when False, only rows whose
            ``pretrain_semantic`` value is false are kept.
        base_path: directory that contains the ``handcrafted`` folder.

    Returns:
        One row per ``long_capacity`` value (the one with the highest
        ``test_mean``), with the columns renamed and ordered so that the frame
        can be concatenated with the DQN results.

    """
    csv_path = os.path.join(
        base_path, f"handcrafted/hand-crafted-results-room_size={size}.csv"
    )
    df = pd.read_csv(csv_path)

    if not include_pretrain_semantic:
        flag = df["pretrain_semantic"]
        # read_csv yields a bool column when every value is True/False, and the
        # ``.str`` accessor raises on non-string data. Going through
        # ``astype(str)`` handles bool, string and mixed columns alike.
        is_false = flag.eq(False) | flag.astype(str).str.lower().eq("false")
        # Copy so the column assignment below does not write into a view.
        df = df[is_false].copy()

    df["test_mean"] = pd.to_numeric(df["test_mean"], errors="coerce")

    # Sorting by descending score first makes ``first()`` pick the best row of
    # each capacity group.
    best = (
        df.sort_values(by=["long_capacity", "test_mean"], ascending=[True, False])
        .groupby("long_capacity")
        .first()
        .reset_index()
    )

    best = best.rename(
        columns={
            "long_capacity": "capacity",
            "test_mean": "test",
            "test_std": "std_test",
        }
    )

    best["room_size"] = size
    best["val"] = np.nan  # the hand-crafted CSV is given no validation score here
    best["agent_type"] = "handcrafted"
    best["#_runs"] = 5  # hard-coded by the original author

    return best[
        [
            "test",
            "val",
            "#_runs",
            "capacity",
            "agent_type",
            "pretrain_semantic",
            "semantic_decay_factor",
            "room_size",
            "mm_policy",
            "qa_function",
            "explore_policy",
        ]
    ]


def determine_hyper_parameters(train: dict) -> dict:
    """Extract the hyper parameters of one run from its training config.

    Args:
        train: training configuration; must provide every key read below
            (a missing key raises ``KeyError``).

    Returns:
        A flat dict of hyper parameters. ``agent_type`` is always ``"dqn"``.

    """
    # Key order is kept stable on purpose: callers in this file use
    # ``str(...)`` of the returned dict as a grouping key.
    return {
        "capacity": train["capacity"]["long"],
        "pretrain_semantic": train["pretrain_semantic"],
        "semantic_decay_factor": train["semantic_decay_factor"],
        "room_size": train["env_config"]["room_size"],
        "num_iterations": train["num_iterations"],
        "replay_buffer_size": train["replay_buffer_size"],
        "warm_start": train["warm_start"],
        "terminates_at": train["env_config"]["terminates_at"],
        "target_update": train["target_update_interval"],
        "min_epsilon": train["min_epsilon"],
        "gamma": train["gamma"],
        "agent_type": "dqn",
    }


def nanmean(data):
    """Return the rounded mean of ``data``, or None if any value is NaN."""
    if np.isnan(data).any():
        return None
    return round(np.mean(data))


def nanstd(data):
    """Return the rounded standard deviation of ``data``, or None on any NaN."""
    if np.isnan(data).any():
        return None
    return round(np.std(data))


def nanmax(data):
    """Return the rounded maximum of ``data``, or None if any value is NaN."""
    if np.isnan(data).any():
        return None
    return round(np.max(data))


def nanmin(data):
    """Return the rounded minimum of ``data``, or None if any value is NaN."""
    if np.isnan(data).any():
        return None
    return round(np.min(data))


def get_dataframe(
    room_size: Literal["xxs", "xs", "s", "m", "l", "xl", "xxl"],
    base_path: str = "training-results/non-equal-object-probs/",
) -> pd.DataFrame:
    """Aggregate the DQN results of one room size per hyper-parameter setting.

    Every ``results.yaml`` matching
    ``{base_path}/dqn/room_size={room_size}/*/*/explore/results.yaml`` is read
    together with the ``train.yaml`` next to it. Runs that share the same hyper
    parameters are grouped and their scores averaged.

    Args:
        room_size: room size used in the ``room_size=...`` directory name.
        base_path: directory that contains the ``dqn`` folder.

    Returns:
        One row per hyper-parameter setting, sorted by ascending ``capacity``
        and descending ``test``. An empty DataFrame if no result file is found.

    """
    paths = glob(
        os.path.join(base_path, f"dqn/room_size={room_size}/*/*/explore/results.yaml")
    )

    if not paths:
        return pd.DataFrame()

    final = {}
    for result_path in tqdm(paths):
        result = read_yaml(result_path)
        # NOTE(review): train.yaml is looked up inside ``explore/`` here, while
        # filter_paths in this file looks one directory above ``explore/`` for
        # the same dqn runs -- confirm which location is the right one.
        train = read_yaml(result_path.replace("results.yaml", "train.yaml"))

        # Best validation score over all recorded validation points.
        val_score = max(point["mean"] for point in result["validation_score"])
        test_score = result["test_score"]["mean"]

        hp = determine_hyper_parameters(train)

        # Run directory is two levels above results.yaml; keep only the part
        # after its last dot. Using os.path (instead of a fixed index into
        # ``split("/")``) keeps this independent of the depth of base_path.
        run_dir = os.path.basename(os.path.dirname(os.path.dirname(result_path)))
        run_id = run_dir.split(".")[-1]

        # The string form of the hyper parameters is the grouping key.
        group = final.setdefault(
            str(hp), {"val": [], "test": [], "hyper_parameters": hp, "path": []}
        )
        group["val"].append(val_score)
        group["test"].append(test_score)
        group["path"].append(run_id)

    df_list = []
    for data in final.values():
        hp = data["hyper_parameters"]
        df_list.append(
            {
                # Read the keys that are actually collected above ("test" and
                # "val"); the former "*_explore" keys were never populated.
                "test": nanmean(data["test"]),
                "val": nanmean(data["val"]),
                # No memory-management scores are collected in this function;
                # the columns are kept so the table layout stays the same.
                "test_mm": None,
                "val_mm": None,
                "#_runs": len(data["test"]),
                "capacity": hp.get("capacity", None),
                "agent_type": hp.get("agent_type", None),
                "pretrain_semantic": hp.get("pretrain_semantic", None),
                "semantic_decay_factor": hp.get("semantic_decay_factor", None),
                "room_size": hp.get("room_size", None),
                "mm_policy": hp.get("mm_policy", None),
                "qa_function": hp.get("qa_function", None),
                "explore_policy": hp.get("explore_policy", None),
                "num_iterations": hp.get("num_iterations", None),
                "replay_buffer_size": hp.get("replay_buffer_size", None),
                "warm_start": hp.get("warm_start", None),
                "terminates_at": hp.get("terminates_at", None),
                "target_update": hp.get("target_update", None),
                "min_epsilon": hp.get("min_epsilon", None),
                "gamma": hp.get("gamma", None),
                "path": data["path"],
            }
        )

    return pd.DataFrame(df_list).sort_values(
        by=["capacity", "test"],
        ascending=[True, False],
    )


def add_blank_rows_and_flag(df):
    """Insert a flagged blank row wherever the ``capacity`` value changes.

    Every original row is copied with ``flag = False``. Between two consecutive
    rows with different ``capacity`` a row of empty strings with ``flag = True``
    is inserted. The result is re-indexed from 0.
    """
    rows = []
    last_capacity = None

    for _, record in df.iterrows():
        capacity_changed = (
            last_capacity is not None and record["capacity"] != last_capacity
        )
        if capacity_changed:
            # Separator row: empty string in every original column.
            separator = pd.Series(dict.fromkeys(df.columns, ""))
            separator["flag"] = True
            rows.append(separator)

        flagged = record.copy()
        flagged["flag"] = False
        rows.append(flagged)

        last_capacity = record["capacity"]

    return pd.DataFrame(rows).reset_index(drop=True)


def highlight_blank_rows(row):
    """Return one CSS string per cell: yellow background when ``row.flag`` is set."""
    css = "background-color: yellow" if row.flag else ""
    return [css] * len(row)


def get_all_data(
    size: Literal["xxs", "xs", "s", "m", "l", "xl", "xxl"],
    include_pretrain_semantic: bool = True,
    base_path: str = "training-results/non-equal-object-probs/",
) -> "tuple[pd.DataFrame, pd.io.formats.style.Styler]":
    """Combine the hand-crafted and DQN results of one room size.

    Args:
        size: room size passed to both loaders.
        include_pretrain_semantic: forwarded to ``get_handcrafted``.
        base_path: directory that holds the result folders.

    Returns:
        A pair ``(df, styled)``: ``df`` is the combined table sorted by
        ascending ``capacity`` and descending ``test``; ``styled`` is the same
        data with a highlighted blank row between capacities, formatted with
        two decimals and ``"NaN"`` for missing values.

    """
    handcrafted = get_handcrafted(
        size, include_pretrain_semantic=include_pretrain_semantic, base_path=base_path
    )
    learned = get_dataframe(size, base_path=base_path)

    df = pd.concat([handcrafted, learned], ignore_index=True).sort_values(
        by=["capacity", "test"],
        ascending=[True, False],
    )

    # Shorter headers for display; names that are absent are simply ignored.
    df = df.rename(
        columns={
            "semantic_decay_factor": "sem_decay",
            "pretrain_semantic": "pretrain_sem",
            "history_block_size": "history",
        }
    )

    styled = (
        add_blank_rows_and_flag(df)
        .style.apply(highlight_blank_rows, axis=1)
        .format(na_rep="NaN", precision=2)
    )

    return df, styled


def filter_paths(
    room_size: Literal["xxs", "xs", "s", "m", "l", "xl", "xxl"],
    agent_type: Literal["baseline", "episodic", "semantic", "hybrid"],
    num_iterations: int,
    capacity: int,
    semantic_decay_factor: float,
    base_path: str,
) -> list:
    """Return the result files whose training config matches the given values.

    Candidates are the ``results.yaml`` files of the ``baselines`` and ``dqn``
    folders for ``room_size`` below ``base_path``. A candidate is kept when its
    hyper parameters equal ``num_iterations``, ``semantic_decay_factor``,
    ``agent_type`` and ``capacity``.

    NOTE(review): determine_hyper_parameters in this file always reports
    ``agent_type`` as "dqn", so any other requested agent type matches
    nothing -- confirm this is intended.
    """
    baseline_pattern = os.path.join(
        base_path, f"baselines/room_size={room_size}/*/*/results.yaml"
    )
    dqn_pattern = os.path.join(
        base_path, f"dqn/room_size={room_size}/*/*/explore/results.yaml"
    )
    candidates = glob(baseline_pattern) + glob(dqn_pattern)

    if not candidates:
        return []

    wanted = {
        "num_iterations": num_iterations,
        "semantic_decay_factor": semantic_decay_factor,
        "agent_type": agent_type,
        "capacity": capacity,
    }

    matches = []
    for candidate in tqdm(candidates):
        # Baseline runs keep train.yaml beside results.yaml; for the other
        # runs it sits one level above the ``explore`` directory.
        tail = "results.yaml" if "baseline" in candidate else "explore/results.yaml"
        hp = determine_hyper_parameters(
            read_yaml(candidate.replace(tail, "train.yaml"))
        )

        if any(hp.get(name) != value for name, value in wanted.items()):
            continue

        matches.append(candidate)

    return matches

In [None]:
# Disabled scratch snippets, kept for reference.
# filter_paths(
#     room_size="xl-different-prob",
#     num_iterations=10000,
#     agent_type="hybrid",
#     capacity=12,
#     semantic_decay_factor=0.99,
#     base_path="training-results/non-equal-object-probs/",
# )

# NOTE(review): `filter_paths_by_num_iterations` is not defined in the code
# shown above; the snippet below would delete run directories if enabled.
# for foo in filter_paths_by_num_iterations("xl-different-prob"):
#     dir_path = '/'.join(foo.split('/')[:-2])
#     shutil.rmtree(dir_path)

# # save dataframe as markdown
# df.to_markdown("./hp-tuning/xl-different-prob.md", index=False)

# Comparison table for room size "xxl-different-prob", keeping only the
# hand-crafted rows whose pretrain_semantic value is false.
df, df_styled = get_all_data(
    "xxl-different-prob",
    include_pretrain_semantic=False,
    base_path="training-results/non-equal-object-probs/",
)
# Last expression of the cell: the styled table is shown as the cell output.
df_styled

In [None]:
# Disabled scratch snippets, kept for reference.
# filter_paths(
#     room_size="xl-different-prob",
#     num_iterations=10000,
#     agent_type="hybrid",
#     capacity=12,
#     semantic_decay_factor=0.99,
#     base_path="training-results/non-equal-object-probs/",
# )

# NOTE(review): `filter_paths_by_num_iterations` is not defined in the code
# shown above; the snippet below would delete run directories if enabled.
# for foo in filter_paths_by_num_iterations("xl-different-prob"):
#     dir_path = '/'.join(foo.split('/')[:-2])
#     shutil.rmtree(dir_path)

# # save dataframe as markdown
# df.to_markdown("./hp-tuning/xl-different-prob.md", index=False)

# Same comparison table for room size "xxl-different-prob", this time with
# all hand-crafted rows (no filtering on pretrain_semantic).
df, df_styled = get_all_data(
    "xxl-different-prob",
    include_pretrain_semantic=True,
    base_path="training-results/non-equal-object-probs/",
)
# Last expression of the cell: the styled table is shown as the cell output.
df_styled