In [None]:
import os
from pathlib import Path
import sys
from typing import List

# Put the parent of the current working directory on the import path so the
# project-local imports below can be resolved when the notebook is run in place.
module_path = os.path.abspath(os.pardir)
if module_path not in sys.path:
    sys.path.append(module_path)

# write jsonl file
from ai_profiling.generators.planning_data_generator_datatypes import (
    ValidationLLMResponsePlanningData,
)
from ai_profiling.helpers.file_helper.file_helper import (
    get_base_model_from_json,
    get_files_in_folder,
)

!jupyter nbextension enable --py widgetsnbextension


# Evaluation records grouped by prompt style. The loading code further down in
# this cell fills "verbose" from the ".../long" folder and "concise" from the
# ".../short" folder.
planning_data = {"verbose": [], "concise": []}

def has_model_to_ignore(llm_response_json_evaluation_datum:ValidationLLMResponsePlanningData, to_ignore: List[str]) -> bool:
    """Tell whether the record was produced by a model that should be skipped.

    Args:
        llm_response_json_evaluation_datum: evaluation record to inspect.
        to_ignore: name fragments; matching is a case-insensitive substring
            test against the record's ``llm_model_id``.

    Returns:
        True when any fragment occurs in the model id. False when none does or
        when the record carries no ``llm_response_planning_data``.
    """
    response_data = llm_response_json_evaluation_datum.llm_response_planning_data
    if response_data is None:
        return False
    # The model id is looked up inside the generator so it is only touched when
    # there is at least one fragment to compare against.
    return any(
        fragment.lower() in response_data.llm_response.llm_model_id.lower()
        for fragment in to_ignore
    )

# Root folder that holds the evaluation JSON files. The default is the path the
# notebook was originally written against; set NL2FLOW_PLAN_EVAL_DIR to point
# somewhere else instead of editing this cell.
evaluation_root = Path(
    os.environ.get(
        "NL2FLOW_PLAN_EVAL_DIR",
        "/Users/jungkookang/Downloads/nl2flow/output/plan/llm_plan_evaluation",
    )
)

# prompt style (key of planning_data) -> sub-folder holding that style's results
prompt_style_folders = {"verbose": "long", "concise": "short"}

to_ignore=["codellama", "deepseek-coder"]

# One loop for both prompt styles instead of two copy-pasted ones.
for prompt_style, folder_name in prompt_style_folders.items():
    folder_path = evaluation_root / folder_name
    file_paths = get_files_in_folder(folder_path=folder_path, file_extension="json")

    for file_path in file_paths:
        llm_response_json_evaluation_datum = get_base_model_from_json(
            file_path=file_path, base_model=ValidationLLMResponsePlanningData
        )
        # Records produced by an ignored model are dropped at load time.
        if not has_model_to_ignore(llm_response_json_evaluation_datum, to_ignore):
            planning_data[prompt_style].append(llm_response_json_evaluation_datum)

print(f"Data points: Long: {len(planning_data['verbose'])}, Short: {len(planning_data['concise'])}")

In [None]:
# add all data to each llm model
from collections import defaultdict
from copy import deepcopy
import pprint
from typing import Dict, List

# Per prompt style ("verbose"/"concise") -> per model -> counters.
llm_bin_meta = {}  # every data point
llm_bin_meta_no_plan = {}  # only data points tagged with length_of_sequence == 0
llm_bin_meta_plan = {}  # only data points tagged with length_of_sequence != 0

# A generated plan with more steps than this is counted as a "long_plan".
LONG_PLAN_THRESHOLD = 10


def _report_is_positive(report) -> bool:
    """Return True when an evaluation report exists and its determination is truthy."""
    return report is not None and bool(report.determination)


long_plan_models = set()
for category in planning_data.keys():
    llm_bin: Dict[str, dict] = defaultdict(
        lambda: {"total": 0, "optimal": 0, "sound": 0, "valid": 0, "long_plan": 0, "plan_length": defaultdict(lambda: 0), "optimal_plan_length": defaultdict(lambda: 0), "prompt_length_total": 0}
    )
    llm_bin_no_plan: Dict[str, Dict[str, int]] = defaultdict(lambda: {"total": 0, "optimal": 0, "sound": 0, "valid": 0})
    llm_bin_plan: Dict[str, Dict[str, int]] = defaultdict(lambda: {"total": 0, "optimal": 0, "sound": 0, "valid": 0})

    for evaluation_unit in planning_data[category]:
        response_data = evaluation_unit.llm_response_planning_data
        if response_data is None:
            # has_model_to_ignore() lets records without planning data through,
            # so they can reach this loop; there is nothing to count for them.
            continue

        # Keep only the last path component of the model id and unify the
        # "3-1"/"3-3" spellings with "3.1"/"3.3".
        llm_model_id = response_data.llm_response.llm_model_id.split("/")[-1]
        llm_model_id = llm_model_id.replace("3-1", "3.1").replace("3-3", "3.3")

        model_bin = llm_bin[llm_model_id]
        model_bin["total"] += 1
        model_bin["prompt_length_total"] += len(response_data.planning_prompt)

        optimal_plan_length = response_data.pddl_generator_output.planning_datum_tag.length_of_sequence
        model_bin["optimal_plan_length"][optimal_plan_length] += 1

        # Every data point lands in exactly one of the two split bins.
        is_no_plan = optimal_plan_length == 0
        split_bin = (llm_bin_no_plan if is_no_plan else llm_bin_plan)[llm_model_id]
        split_bin["total"] += 1

        llm_plan = evaluation_unit.llm_plan
        if llm_plan is not None:
            plan_length = len(llm_plan)
            model_bin["plan_length"][plan_length] += 1
            if plan_length > LONG_PLAN_THRESHOLD:
                long_plan_models.add(llm_model_id)
                model_bin["long_plan"] += 1

        # soundness
        if _report_is_positive(evaluation_unit.report_soundness):
            model_bin["sound"] += 1
            split_bin["sound"] += 1

        if is_no_plan:
            # No plan is expected: an empty generated plan counts as both valid
            # and optimal; the validity/optimality reports are not consulted.
            if llm_plan is not None and len(llm_plan) == 0:
                for quality in ("valid", "optimal"):
                    model_bin[quality] += 1
                    split_bin[quality] += 1
        else:  # plan exists
            if _report_is_positive(evaluation_unit.report_validity):
                model_bin["valid"] += 1
                split_bin["valid"] += 1
            if _report_is_positive(evaluation_unit.report_optimality):
                model_bin["optimal"] += 1
                split_bin["optimal"] += 1

    # "total" is at least 1 for every model present, so the divisions are safe.
    for model_name in llm_bin.keys():
        llm_bin[model_name]["long_plan_fraction"] = llm_bin[model_name]["long_plan"] / llm_bin[model_name]["total"]
        llm_bin[model_name]["avg_prompt_length"] = llm_bin[model_name]["prompt_length_total"] / llm_bin[model_name]["total"]

    llm_bin_meta[category] = deepcopy(llm_bin)
    llm_bin_meta_no_plan[category] = deepcopy(llm_bin_no_plan)
    llm_bin_meta_plan[category] = deepcopy(llm_bin_plan)


pprint.pp(long_plan_models)

pprint.pp(llm_bin_meta)
# pprint.pp(llm_bin_meta_no_plan)
# pprint.pp(llm_bin_meta_plan)

In [None]:
from scipy import stats
from ai_profiling.helpers.file_helper.file_helper import write_json_from_dict

# Compare, per model, the distribution of generated plan lengths under the
# verbose prompt against the one under the concise prompt.
result_distributions = {}

verbose_bins = llm_bin_meta["verbose"]
concise_bins = llm_bin_meta["concise"]

# Only models present under both prompt styles can be compared. Use membership
# tests rather than indexing: the per-style bins are defaultdicts, so indexing
# an unknown model would insert an all-zero entry (total == 0) that the rate
# charts later divide by.
model_names = [name for name in verbose_bins.keys() if name in concise_bins]

for model_name in model_names:
    dists = []
    for style_bins in (verbose_bins, concise_bins):
        # Expand the {plan_length: frequency} histogram back into raw samples.
        sample = []
        for plan_length, frequency in style_bins[model_name]["plan_length"].items():
            sample += [plan_length] * frequency
        dists.append(sample)

    if not dists[0] or not dists[1]:
        # No plan was recorded for this model under one prompt style; there is
        # nothing to run a two-sample test on.
        continue

    statistic, p_value = stats.ks_2samp(dists[0], dists[1])
    u_statistic, u_p_value= stats.mannwhitneyu(dists[0], dists[1])
    result_distributions[model_name] = {"k-stat": float(statistic), "k-p_value": float(p_value), "u-stat": float(u_statistic), "u-p_value": float(u_p_value)}

distribution_path = Path(os.path.join("output", "distribution", "plan_length_distribution.json"))
# Create the target folder up front so a fresh checkout does not fail on write.
distribution_path.parent.mkdir(parents=True, exist_ok=True)
write_json_from_dict(file_path=distribution_path, dic=result_distributions)

In [None]:
import collections
from copy import deepcopy
from typing import Optional
import matplotlib.pyplot as plt
import numpy as np

def process_model_obj(model_input, to_ignore):
    """Normalise model names, drop ignored models and sort by name.

    Args:
        model_input: mapping of model name -> per-model data.
        to_ignore: name fragments; a model is dropped when any fragment occurs
            (case-insensitively) in its normalised name. ``None`` is treated as
            "ignore nothing".

    Returns:
        ``collections.OrderedDict`` keyed by the normalised name (last "/"
        component, stripped, lower-cased; "DeepSeek-V3" gets the suffix
        "(685b, fp8)"), sorted by name, with deep copies of the values.
    """
    # Lower-case the fragments once: the model name they are compared with is
    # lower-cased, so a mixed-case fragment would otherwise never match.
    ignored_fragments = [fragment.lower() for fragment in (to_ignore or ())]

    new_model_input = {}
    for model_name, val in model_input.items():
        new_model_name = model_name.split("/")[-1]
        if new_model_name == "DeepSeek-V3":
            new_model_name += "(685b, fp8)"
        new_name = new_model_name.strip().lower()

        if any(fragment in new_name for fragment in ignored_fragments):
            continue
        new_model_input[new_name] = deepcopy(val)

    return collections.OrderedDict(sorted(new_model_input.items(), key=lambda it: str(it[0])))

In [None]:
from math import ceil
from typing import Any, Set

from matplotlib.ticker import MaxNLocator


def make_len_length_chart_subplot(
    llm_bin_input,
    output_folder_path: str,
    max_y: int,
    to_ignore: Optional[List[str]] = None,
    skip_interval: int = 1,
    max_x: Optional[float] = None,
    file_extention: str = "png",
    select_idx: int = 0,
) -> None:
    """Plot plan-length histograms and save them under ``output_folder_path``.

    Two modes, chosen by ``select_idx``:

    * ``select_idx == -1``: a grid with one row per model and one column per
      prompt style, each panel showing how often the model produced a plan of
      each length. Saved as ``plan_length_comparison.<ext>``.
    * ``select_idx >= 0``: one histogram of the "optimal_plan_length" counts
      for the prompt style at that position in ``llm_bin_input``. Only the
      first model (by sorted name) is drawn. Saved as
      ``plan_length_optimal.<ext>``.

    Args:
        llm_bin_input: prompt style -> model name -> stats dict holding the
            "plan_length" and "optimal_plan_length" histograms. Not modified.
        output_folder_path: folder for the image; created when missing.
        max_y: upper limit of the y axis.
        to_ignore: model-name fragments to leave out (see ``process_model_obj``).
        skip_interval: unused; kept so existing calls keep working.
        max_x: upper limit of the x axis; defaults to the longest generated
            plan length + 1.
        file_extention: image file extension without the dot.
        select_idx: plot mode / prompt-style position, see above.

    Raises:
        ValueError: when no model is left to plot in comparison mode, or when
            ``select_idx`` does not address a prompt style.
    """

    def get_length_arr(tmp_dict, min_val, max_val):
        """Return the counts in ``tmp_dict`` for every length in [min_val, max_val]."""
        return np.array([tmp_dict.get(val, 0) for val in range(min_val, max_val + 1)])

    key_name = "plan_length"
    optimal_key_name = "optimal_plan_length"
    ignored = list(to_ignore) if to_ignore is not None else []
    llm_bin = {category: process_model_obj(val, ignored) for category, val in deepcopy(llm_bin_input).items()}

    # Longest generated / optimal plan length over all styles and models.
    max_plan_length = -1
    optimal_max_plan_length = -1
    for models in llm_bin.values():
        for val_dict in models.values():
            max_plan_length = max([max_plan_length, *val_dict[key_name].keys()])
            optimal_max_plan_length = max([optimal_max_plan_length, *val_dict[optimal_key_name].keys()])

    # Bar positions must cover the same range as the bar heights, which
    # get_length_arr builds up to max_length_observed; sizing them by
    # max_plan_length alone mismatches when the optimal length is larger.
    max_length_observed = max(max_plan_length, optimal_max_plan_length)
    x = np.arange(max_length_observed + 1)
    upper_x = int((max_plan_length + 1) if max_x is None else max_x)

    def style_axis(axis):
        """Apply the limits, labels and tick settings shared by every panel."""
        axis.set_ylim(0, max_y)
        axis.set_xlim(0, upper_x)
        axis.tick_params(axis="x", labelsize=12)
        axis.tick_params(axis="y", labelsize=12)
        axis.set_xlabel("Plan length", fontsize=12)
        axis.set_ylabel("Frequency", fontsize=12)
        axis.spines[["right", "top"]].set_visible(False)
        axis.xaxis.set_major_locator(MaxNLocator(integer=True))

    if output_folder_path:
        os.makedirs(output_folder_path, exist_ok=True)

    if select_idx == -1:  # length comparison chart
        model_names_list = sorted({model_name for models in llm_bin.values() for model_name in models})
        if not model_names_list:
            raise ValueError("No model left to plot after applying to_ignore.")

        rows = len(model_names_list)
        columns = max(2, len(llm_bin))
        # squeeze=False keeps a 2-D axes array even when there is one model.
        fig, ax = plt.subplots(rows, columns, figsize=(10, 20), squeeze=False)

        for col, (category, models) in enumerate(llm_bin.items()):
            for row, model_name in enumerate(model_names_list):
                axis = ax[row, col]
                if model_name in models:
                    plan_length_distribution = get_length_arr(
                        models[model_name][key_name], min_val=0, max_val=max_length_observed
                    )
                else:
                    # Model not evaluated under this prompt style: all zeros.
                    plan_length_distribution = np.zeros(len(x), dtype=int)

                rects = axis.bar(x, plan_length_distribution)
                axis.bar_label(rects, padding=3, fontsize=8)

                # Only the top row carries the prompt-style heading.
                pre_fix = category.capitalize() + " prompt style" + "\n\n" if row == 0 else ""
                axis.set_title(pre_fix + f"{model_name.capitalize()}", y=1)
                style_axis(axis)

        plt.tight_layout()
        plt.savefig(
            os.path.join(output_folder_path, ("plan_length_comparison" + "." + file_extention)), bbox_inches="tight"
        )
        plt.show()

    else:  # only for optimal plan length plot
        categories = list(llm_bin.keys())
        if not 0 <= select_idx < len(categories):
            # Previously this fell through and saved a blank figure.
            raise ValueError(
                f"select_idx must be -1 or an index into {categories}, got {select_idx}"
            )

        models = llm_bin[categories[select_idx]]
        fig, ax = plt.subplots(1, 1, figsize=(4, 4))

        # Only the first model is drawn (the original loop stopped after one).
        # NOTE(review): presumably every model saw the same problems, so the
        # optimal-length histogram is the same for each of them - confirm.
        first_model_stats = next(iter(models.values()), None)
        if first_model_stats is not None:
            measurement = get_length_arr(
                first_model_stats[optimal_key_name], min_val=0, max_val=max_length_observed
            )
            rects = ax.bar(x, measurement)
            ax.bar_label(rects, padding=3, fontsize=8)
            style_axis(ax)

        plt.tight_layout()
        plt.savefig(os.path.join(output_folder_path, ("plan_length_optimal" + "." + file_extention)), bbox_inches="tight")
        plt.show()

In [None]:
# Prompt effect on plan length
make_len_length_chart_subplot(llm_bin_input=llm_bin_meta, output_folder_path="output/plot", max_y=700, to_ignore=["codellama", "deepseek-coder"], max_x=15, select_idx=-1) # select_idx=-1 draws the per-model comparison across both prompt styles

In [None]:
# Optimal plan length
make_len_length_chart_subplot(llm_bin_input=llm_bin_meta, output_folder_path="output/plot", max_y=700, to_ignore=["codellama", "deepseek-coder"], max_x=15, select_idx=0) # a non-negative select_idx picks the prompt style by position in llm_bin_meta (0 is the first key)

In [None]:
def make_planning_chart_multi_rev(
    llm_bin_input,
    file_name_path: str,
    is_no_plan: bool = False,
) -> None:
    """Draw per-model success rates as grouped bars and save the figure.

    Each count ("sound", "valid", "optimal") is divided by the model's "total".
    With ``is_no_plan=False`` the figure has one row per quality (soundness,
    validity, optimality from top to bottom) and one column per prompt style.
    With ``is_no_plan=True`` only the "valid" rate is drawn, in a single row.

    Args:
        llm_bin_input: prompt style -> model name -> counters, including
            "total". Not modified.
        file_name_path: path of the image file; its folder is created when
            missing.
        is_no_plan: switch to the single-row "no plan" layout.

    Raises:
        ValueError: when the input holds no model with a non-zero total.
    """
    include_categories = {"optimal": "Optimality", "valid": "Validity", "sound": "Soundness"}
    row_of_quality = {"sound": 0, "valid": 1, "optimal": 2}

    # Re-nest as quality -> prompt style -> model -> rate.
    llm_bin2 = {}
    for prompt_type, obj in deepcopy(llm_bin_input).items():
        for model_name, obj2 in obj.items():
            total = obj2["total"]
            if total == 0:
                # An empty bin has no rate; skip it instead of dividing by zero.
                continue
            for quality, metric in obj2.items():
                if quality not in include_categories:
                    continue
                llm_bin2.setdefault(quality, {}).setdefault(prompt_type, {})[model_name] = metric / total

    if "valid" not in llm_bin2:
        raise ValueError("llm_bin_input contains no model with a non-zero total; nothing to plot.")

    rows = len(llm_bin2)
    cols = len(llm_bin2["valid"])
    plot_rows = 1 if is_no_plan else rows
    figsize = (10, 5) if is_no_plan else (10, 10)
    # squeeze=False always yields a 2-D axes array, so both layouts share the
    # same indexing below.
    fig, ax = plt.subplots(plot_rows, cols, figsize=figsize, squeeze=False)
    fig.tight_layout()
    handles = None
    labels = None

    width = 0.25  # the width of the bars
    x = np.arange(1)  # the label locations

    for quality, obj in llm_bin2.items():
        if is_no_plan and quality != "valid":
            continue
        row = 0 if is_no_plan else row_of_quality[quality]

        for prompt_type, obj2 in obj.items():
            # "concise" goes to the right-hand column, anything else to the left.
            col = 1 if prompt_type == "concise" else 0
            axis = ax[row, col]

            # One bar per model, in alphabetical order, shifted by one bar width.
            for position, model_name in enumerate(sorted(obj2)):
                rects = axis.bar(
                    x + width * position, [obj2[model_name]], width * 0.8, label=model_name.capitalize()
                )
                axis.bar_label(rects, padding=3, fontsize=12, fmt="%.2f")

            if row == 0:
                axis.set_title(f"{prompt_type.capitalize()} prompt", y=1, fontsize=12)
            axis.set_ylim(0, 1.0)
            axis.xaxis.set_visible(False)
            axis.tick_params(axis='y', labelsize=12)
            axis.spines[["right", "top"]].set_visible(False)
            if col == 0:
                if is_no_plan:
                    axis.set_ylabel("Correct \"no plan\" detection rate", fontsize=12)
                else:
                    axis.set_ylabel(f"{include_categories[quality]}", fontsize=12)

            # The legend entries are taken from the bottom-right panel.
            if row == plot_rows - 1 and col == cols - 1:
                handles, labels = axis.get_legend_handles_labels()

    if handles and labels:
        labels, handles = zip(*sorted(zip(labels, handles), key=lambda t: t[0]))
        if is_no_plan:
            fig.legend(handles, labels, bbox_to_anchor=(0.97, 0.9), fontsize="medium", loc='upper right', frameon=False)
        else:
            fig.legend(handles, labels,bbox_to_anchor=(1.15, 0.6), fontsize="large", loc='upper center', frameon=False)

    output_folder = os.path.dirname(file_name_path)
    if output_folder:
        os.makedirs(output_folder, exist_ok=True)

    plt.tight_layout()
    plt.savefig(file_name_path, bbox_inches="tight")
    plt.show()
 

# All Data

In [None]:
# Soundness / validity / optimality rates per model, computed over every data point.
make_planning_chart_multi_rev(llm_bin_input=llm_bin_meta, file_name_path="output/plot/planning_all_data.png", is_no_plan=False)

# No Plan Case

In [None]:
# Restricted to data points tagged with length_of_sequence == 0; is_no_plan=True plots only the "valid" rate.
make_planning_chart_multi_rev(llm_bin_input=llm_bin_meta_no_plan, file_name_path="output/plot/planning_no_plan.png", is_no_plan=True)

# There is a Plan

In [None]:
# Restricted to data points tagged with a non-zero length_of_sequence.
make_planning_chart_multi_rev(llm_bin_input=llm_bin_meta_plan, file_name_path="output/plot/planning_plan.png", is_no_plan=False)

# Create Regression Data

In [None]:
import statistics


# Per prompt style: feature rows (X) and outcome rows (Y) for the regressions,
# pooled over all models and split per model. The "_0" variants hold the data
# points tagged with length_of_sequence == 0 (no plan expected).
llm_bin_meta_reg = {}
prompt_stat = {"verbose": {"raw": []}, "concise": {"raw": []}}


def _determination_as_int(report) -> int:
    """Return 1 when the report exists and its determination is truthy, else 0."""
    return 1 if (report is not None and report.determination) else 0


for category in planning_data.keys():
    X = []
    Y = []
    X_model = {}
    Y_model = {}
    X_0 = []  # no plan case
    Y_0 = []  # no plan case
    X_model_0 = {}  # no plan case
    Y_model_0 = {}  # no plan case

    for evaluation_unit in planning_data[category]:
        response_data = evaluation_unit.llm_response_planning_data
        if response_data is None:
            # has_model_to_ignore() lets records without planning data through;
            # they carry no features, so skip them instead of crashing.
            continue

        # NOTE(review): the aggregation cell shortens and normalises the model
        # id (last "/" part, "3-1" -> "3.1"); here the raw id is the key, so
        # the two spellings stay separate models - confirm this is intended.
        llm_model_id = response_data.llm_response.llm_model_id
        if llm_model_id not in X_model:
            X_model[llm_model_id] = []
            Y_model[llm_model_id] = []
            X_model_0[llm_model_id] = []
            Y_model_0[llm_model_id] = []

        tag = response_data.pddl_generator_output.planning_datum_tag
        prompt_stat[category]["raw"].append(len(response_data.planning_prompt))

        # X
        # Features shared by both cases, in order:
        # actions (agents), input parameters, coupling, slot-fill, goals.
        shared_features = [
            float(tag.number_of_agents),
            float(tag.input_parameters_per_agent),
            float(tag.coupling_of_agents),
            float(response_data.pddl_generator_output.agent_info_generator_input.proportion_slot_fillable_variables),
            float(tag.number_of_goals),
        ]
        has_plan = tag.length_of_sequence > 0
        if has_plan:
            # The sequence length is only a feature when a plan exists.
            tmp_x = [float(tag.length_of_sequence)] + shared_features
            X.append(tmp_x)
            X_model[llm_model_id].append(tmp_x)
        else:
            tmp_x = shared_features
            X_0.append(tmp_x)
            X_model_0[llm_model_id].append(tmp_x)

        # Y
        sound_planning = _determination_as_int(evaluation_unit.report_soundness)
        if tag.length_of_sequence == 0:  # no plan case
            # An empty generated plan is the correct answer: valid and optimal.
            llm_plan = evaluation_unit.llm_plan
            answered_no_plan = int(llm_plan is not None and len(llm_plan) == 0)
            valid_planning = answered_no_plan
            optimal_planning = answered_no_plan
        else:  # plan exists
            valid_planning = _determination_as_int(evaluation_unit.report_validity)
            optimal_planning = _determination_as_int(evaluation_unit.report_optimality)

        tmp_y = [sound_planning, valid_planning, optimal_planning]

        if has_plan:
            Y.append(tmp_y)
            Y_model[llm_model_id].append(tmp_y)
        else:
            Y_0.append(tmp_y)
            Y_model_0[llm_model_id].append(tmp_y)

    llm_bin_meta_reg[category] = {
        "X_model": X_model,
        "Y_model": Y_model,
        "X": X,
        "Y": Y,
        "X_model_0": X_model_0,
        "Y_model_0": Y_model_0,
        "X_0": X_0,
        "Y_0": Y_0,
    }


for category in prompt_stat.keys():
    raw_lengths = prompt_stat[category]["raw"]
    # statistics.mean raises on an empty sequence; report NaN for an empty category.
    prompt_stat[category]["mean"] = statistics.mean(raw_lengths) if raw_lengths else float("nan")
    print(prompt_stat[category]["mean"])

In [None]:
from sklearn.preprocessing import normalize
import numpy as np
def _l2_normalize_columns(rows):
    """Scale every feature column of ``rows`` to unit L2 norm.

    Args:
        rows: list of equally long feature lists.

    Returns:
        A float64 array of the same shape, or an empty (0, 0) array when
        ``rows`` is empty (``normalize`` rejects empty input).
    """
    matrix = np.array(rows, dtype=np.float64)
    if matrix.size == 0:
        return matrix.reshape(0, 0)
    # axis=0 normalises each feature (column) rather than each sample (row).
    return normalize(matrix, axis=0, norm="l2")


for category in llm_bin_meta_reg.keys():
    tmp_data_dict = llm_bin_meta_reg[category]

    # Pooled feature matrices: plan case and no-plan ("_0") case.
    tmp_data_dict["X_n"] = _l2_normalize_columns(tmp_data_dict["X"])
    tmp_data_dict["X_n_0"] = _l2_normalize_columns(tmp_data_dict["X_0"])

    # Per-model matrices, each normalised on its own rows. Models without any
    # row in a group are left out so downstream loops never meet an empty matrix.
    tmp_data_dict["X_model_n"] = {
        llm_model_id: _l2_normalize_columns(data_x)
        for llm_model_id, data_x in tmp_data_dict["X_model"].items()
        if data_x
    }
    tmp_data_dict["X_model_n_0"] = {
        llm_model_id: _l2_normalize_columns(data_x)
        for llm_model_id, data_x in tmp_data_dict["X_model_0"].items()
        if data_x
    }


In [None]:
# choose Y
# Column positions inside each Y row, which the regression-data cell builds as
# [sound_planning, valid_planning, optimal_planning].
sound_idx = 0
valid_idx = 1
optimal_idx = 2

In [None]:
from typing import Any
import statsmodels.api as sm

from ai_profiling.helpers.file_helper.file_helper import write_txt_file


def get_logistic_regression(llm_bin_meta_reg: Any, group_name: str, output_folder_path: str, has_plan: bool):
    """Fit L1-regularised logistic regressions and write their summaries as CSV.

    For each outcome (Soundness, Validity, Optimality) and each prompt style,
    one model is fitted on the pooled data ("total") and one per LLM. Each
    summary is printed and written to
    ``<output_folder_path>/<group_name>/<outcome>/<prompt style>_<model>.csv``.

    Args:
        llm_bin_meta_reg: prompt style -> dict holding the normalised feature
            matrices ("X_n", "X_model_n" and their "_0" variants) and the
            outcome rows ("Y", "Y_model" and their "_0" variants).
        group_name: sub-folder name for this run's output.
        output_folder_path: root output folder.
        has_plan: True to use the plan-case data, False for the "_0" (no plan)
            data.
    """
    # Position in this list == column of the outcome inside each Y row.
    evaluation_names = ["Soundness", "Validity", "Optimality"]

    X_key = "X_n" if has_plan else "X_n_0"
    X_model_key = "X_model_n" if has_plan else "X_model_n_0"
    Y_key = "Y" if has_plan else "Y_0"
    Y_model_key = "Y_model" if has_plan else "Y_model_0"

    for evaluation_idx, evaluation_str in enumerate(evaluation_names):
        print(f"\n\n\n\n Evaluation Category: {evaluation_str} \n\n\n\n")

        for category in llm_bin_meta_reg.keys():
            print(f"Category: {category}")

            tmp = llm_bin_meta_reg[category]
            X_n = tmp[X_key]
            X_model_n = tmp[X_model_key]
            Y = tmp[Y_key]
            Y_model = tmp[Y_model_key]

            print("\n\n\nSubject: Total")
            if len(Y) == 0:
                print("Skipped: no data points in this group.")
            else:
                # NOTE(review): the features are used as-is, without an added
                # constant column, so the model has no intercept - confirm.
                logit = sm.Logit(np.array([row[evaluation_idx] for row in Y]), X_n)
                result = logit.fit_regularized(method="l1", alpha=1)
                # result = logit.fit(maxiter=1000)
                print(result.summary())
                name = f"{category}_total"
                # Written under the outcome's own folder, like the per-model
                # files. Without that path component the three outcomes
                # overwrote each other and only the last one survived.
                write_txt_file(
                    file_path=os.path.join(output_folder_path, group_name, evaluation_str, (name + ".csv")),
                    text=result.summary().as_csv(),
                )

            for llm_model_id in X_model_n.keys():
                print(f"\n\n\nSubject: {llm_model_id}")
                x_n = X_model_n[llm_model_id]
                y = np.array([row[evaluation_idx] for row in Y_model[llm_model_id]])
                if len(y) == 0:
                    print("Skipped: no data points for this model.")
                    continue
                logit = sm.Logit(y, x_n)
                result = logit.fit_regularized(method="l1", alpha=1)
                print(result.summary())
                tmp_model_name = llm_model_id.split("/")[-1]
                name = f"{category}_{tmp_model_name}"
                write_txt_file(
                    file_path=os.path.join(output_folder_path, group_name, evaluation_str, ( name + ".csv")),
                    text=result.summary().as_csv(),
                )

# Logistic Regression

In [None]:
# First call: plan-case data (has_plan=True), written under the "plan" sub-folder.
# Second call: no-plan data (has_plan=False), written under the "no_plan" sub-folder.
get_logistic_regression(llm_bin_meta_reg=llm_bin_meta_reg, group_name="plan", output_folder_path="output/regression_plan", has_plan=True)
get_logistic_regression(llm_bin_meta_reg=llm_bin_meta_reg, group_name="no_plan", output_folder_path="output/regression_plan", has_plan=False)