In [6]:
from luna.state_abstraction_utils import AbstractStateExtraction
from luna.probabilistic_abstraction_model import (
    HmmModel,
    DtmcModel,
)
from types import SimpleNamespace
import numpy as np

llm = "alpaca_7B_with_semantics"
dataset = "truthful_qa"
result_save_path = "../../../data/songda"
extract_block_idx = 31
info_type = "hidden_states"
abstraction_method = "KMeans"
model_type = "HMM"
hmm_n_comp = 100
abstract_state_num = 15
pca_dim = 5
grid_history_dependency_num = 1

state_abstract_args = {
    "llm_name": llm,
    "result_save_path": result_save_path,
    "dataset": dataset,
    "test_ratio": 0.2,
    "extract_block_idx": extract_block_idx,
    "info_type": info_type,
    "is_attack_success": 1,
    "cluster_method": abstraction_method,
    "abstract_state": abstract_state_num,
    "pca_dim": pca_dim,
    "grid_history_dependency_num": grid_history_dependency_num,
    "result_eval_path": "{}/eval/{}".format(result_save_path, llm),
}


prob_args = {
    "llm_name": llm,
    "result_save_path": result_save_path,
    "dataset": dataset,
    "test_ratio": 0.2,
    "extract_block_idx": extract_block_idx,
    "info_type": info_type,
    "is_attack_success": 1,
    "iter_num": 30,
    "cluster_method": abstraction_method,
    "abstract_state": abstract_state_num,
    "pca_dim": pca_dim,
    "model_type": model_type,
    "hmm_components_num": hmm_n_comp if hmm_n_comp else "",
    "grid_history_dependency_num": grid_history_dependency_num
    if grid_history_dependency_num
    else "",
}

stat_dict = {
    "proper_stopped_and_true": 0,
    "proper_stopped_and_false": 0,
    "loop_generated_and_true": 0,
    "loop_generated_and_false": 0,
}

abs_args = SimpleNamespace(**state_abstract_args)

# abstractStateExtraction = AbstractStateExtraction(
#     abs_args, None, None, None
# )

hmm_model = HmmModel(
    prob_args["dataset"],
    prob_args["extract_block_idx"],
    prob_args["info_type"],
    prob_args["cluster_method"],
    prob_args["abstract_state"],
    prob_args["pca_dim"],
    prob_args["test_ratio"],
    prob_args["hmm_components_num"],
    prob_args["iter_num"],
    prob_args["is_attack_success"],
    prob_args["grid_history_dependency_num"],
    state_abstract_args["result_eval_path"],
)
(
    dtmc_transition_aucroc,
    dtmc_transition_fpr,
    dtmc_transition_tpr,
) = hmm_model.get_aucroc_by_transition_binding()

prob_model = hmm_model
test_abstract_traces = hmm_model.test_traces
val_abstract_traces = hmm_model.val_traces
train_abstract_traces = hmm_model.train_traces

train_data_points = [{} for _ in range(len(train_abstract_traces))]
for i, one_trace in enumerate(train_abstract_traces):
    train_data_points[i]["step_by_step_analyzed_trace"] = one_trace
    train_data_points[i]["label"] = hmm_model.train_groundtruths[i]


hmm_model.get_aucroc_by_state_binding()

all_instances = (
    hmm_model.train_instances + hmm_model.val_instances + hmm_model.test_instances
)
for instance in all_instances:
    classification = instance["is_loop_generated"]
    if classification == 0:
        if instance["binary_label"] >= 0.5:
            stat_dict["proper_stopped_and_true"] += 1
        else:
            stat_dict["proper_stopped_and_false"] += 1
    else:
        if instance["binary_label"] >= 0.5:
            stat_dict["loop_generated_and_true"] += 1
        else:
            stat_dict["loop_generated_and_false"] += 1

# calculate stat_dict percentage
total = sum(stat_dict.values())
for key in stat_dict:
    stat_dict[key] /= total

train_transition_matrix = hmm_model.train_transition_probs
semantic_value_dict = hmm_model.get_semantic_state_model()

print("train_transition_matrix")
print(train_transition_matrix)
print("state_semantics")
print(semantic_value_dict)
# save matrix
np.save(
    "train_transition_matrix.npy",
    train_transition_matrix,
)

FileNotFoundError: [Errno 2] No such file or directory: '../../../data/songda/eval/alpaca_7B_with_semantics/truthful_qa/31/hidden_states_KMeans_15_5_0.2.pkl'

In [3]:
semantic_dataset_dict = {
    "truthful_qa": "truth_probability",
    "sst2": "is_ood",
    "advglue++": "is_adversarial",
}


def dtmc_to_prism_updated(
    dtmc_dict,
    semantic_value_dict,
    output_file_path,
    llm_name,
    dataset,
    semantic_dataset_dict,
):
    # Increment state numbers by 1 to free up state 0
    incremented_dtmc_dict = {
        start_state
        + 1: {end_state + 1: prob for end_state, prob in transitions.items()}
        for start_state, transitions in dtmc_dict.items()
    }

    semantic_value_dict = {
        state + 1: semantic_value for state, semantic_value in semantic_value_dict.items()
    }

    # Find the maximum state number for the state range after incrementing
    max_state = max(incremented_dtmc_dict.keys())

    # Start writing the PRISM model file content
    prism_content = f"dtmc\n\nmodule {llm_name}\n\n"

    # Add the state declarations
    prism_content += f"// local state\nstate : [0..{max_state}] init 0;\n"
    prism_content += f"{semantic_dataset_dict[dataset]} : [0..100] init 0;\n\n"

    # Add the initial transition from state 0 to state 1 with probability 1
    prism_content += "// Initial transition from state 0 to state 1\n"
    prism_content += (
        f"[] state=0 -> 1 : (state'=1) & ({semantic_dataset_dict[dataset]}'={semantic_value_dict[1]});\n"
    )

    # Sort the states in ascending order and iterate over each start state in the incremented DTMC dictionary
    for start_state in sorted(incremented_dtmc_dict.keys()):

        transitions = incremented_dtmc_dict[start_state]
        if start_state == 19:
            print(transitions)

        # Skip the initial state since it has been already handled
        if start_state == 0:
            continue

        # Start the transitions for this state
        transitions_str = (
            f"// Transitions from state {start_state}\n[] state={start_state} -> "
        )

        # Gather the transition probabilities and next states
        transition_parts = []
        for end_state, probability in sorted(transitions.items()):
            # Format each transition part
            transition_parts.append(
                f"{probability} : (state'={end_state}) & ({semantic_dataset_dict[dataset]}'={semantic_value_dict[end_state]})"
            )

        # Concatenate transition parts with '+' and add to the transitions string
        transitions_str += " + ".join(transition_parts) + ";\n"

        # Add the transitions to the PRISM model content
        prism_content += transitions_str

    # End the module
    prism_content += "\nendmodule\n"

    # Write the content to the output file
    with open(output_file_path, "w") as f:
        f.write(prism_content)


# Specify the path for the updated output PRISM model file
updated_output_prism_file_path = "eval/prism/{}_{}_{}_{}_{}.pm".format(
    dataset, llm, model_type, abstract_state_num, pca_dim
)

# Convert the DTMC model dictionary to the updated PRISM format and write to a file
if dataset == "truthful_qa":
    train_state_matrix = prob_model.state_positive_prob_map
    dtmc_to_prism_updated(
        train_transition_matrix,
        semantic_value_dict,
        updated_output_prism_file_path,
        llm,
        dataset,
        semantic_dataset_dict,
    )
else:
    dtmc_to_prism_updated(
        train_transition_matrix,
        semantic_value_dict,
        updated_output_prism_file_path,
        llm,
        dataset,
        semantic_dataset_dict,
    )

{12: 0.8333333333333334, 274: 0.06060606060606061, 297: 0.007575757575757576, 136: 0.045454545454545456, 7: 0.007575757575757576, 202: 0.015151515151515152, 148: 0.007575757575757576, 20: 0.007575757575757576, 228: 0.007575757575757576, 242: 0.007575757575757576}


FileNotFoundError: [Errno 2] No such file or directory: 'eval/prism/truthful_qa_alpaca_7B_with_semantics_HMM_400_500.pm'