In [None]:
import io
import itertools
import json
import re
import os
import io
import sys
import functools
from contextlib import redirect_stdout
from pathlib import Path
from collections import defaultdict
from pprint import pprint
sys.path.append(f"..")
sys.path.append(r"../../../pmsat-inference")
sys.path.append(r"../../../AALpy")

import matplotlib.pyplot as plt
from aalpy import MooreMachine, MooreState
from aalpy.utils import load_automaton_from_file

from typing import Any, TypeAlias
from collections.abc import Callable
HeuristicFunction: TypeAlias = Callable[[MooreMachine, dict[str, Any], list[list[str | list[str]]]], float]

# TODO: outdated file, needs new pythonpath


# TODO: use heuristics from active_pmsatlearn/heuristics

In [None]:
EXAMPLES = "ping_pong_example", "simple_example_server", "simple_example_server_with_glitch"
DEFAULT_DATA_DIR = "../../generated_data_4"

In [None]:
def load_example(example: str) -> tuple[MooreMachine, tuple[MooreMachine, dict[str, Any]], dict[str, Any]]:
    """ Load the example from the pmsat-inference repo.
    
    Returns a tuple of:
        real model
        tuple or (learned model, pmsat info)
        info dict (containing used traces)
    """
    def load_automaton(*args, **kwargs) -> MooreMachine:
        """ Call aalpy.utils.load_automaton_from_file, but suppress it's print output 
        (aalpy prints a warning that our automata are not input complete) """
        with io.StringIO() as f, redirect_stdout(f):
            return load_automaton_from_file(*args, **kwargs)
    
    assert example in EXAMPLES
    example_dir = Path("../pmsat-inference/examples-results/" + example)
    real_model = load_automaton(example_dir / "RealModel.dot", "moore")
    
    pmsat_learned_models = {}
    pmsat_infos = {}
    base_pattern = "pmsatLearned-rc2-N(\d)-RN(2|3|4)"
    model_pattern = fr"{base_pattern}\.dot"
    info_pattern = fr"{base_pattern}\.json"
    for root, _, files in os.walk(example_dir):
        for file in files:
            if match := re.search(model_pattern, file):
                pmsat_learned_models[match.group(1)] = load_automaton(example_dir / file, "moore")
            elif match := re.search(info_pattern, file):
                with open(example_dir/file, "r") as f:
                    pmsat_infos[match.group(1)] = json.load(f)
    
    models_and_info = []
    for num_states_learned, model in pmsat_learned_models.items():
        pmsat_info = pmsat_infos[num_states_learned]
        models_and_info.append((model, pmsat_info))
        
    with open(example_dir / "info.json", "r") as f:
        info = json.load(f)
        
    return real_model, models_and_info, info

In [None]:
def load_data(model_name: str, models_dir: str = DEFAULT_DATA_DIR) -> tuple[MooreMachine, tuple[MooreMachine, dict[str, Any], dict[str, Any]], dict[str, Any]]:
    """ Load the data from one learned model.
    NOTE: This only returns actually learned models!
          If a model could not be learned with the given number of states, it is not included in learned_models!
          Therefore, no info about runs with too few states (not possible to learn) or too many states (timeout) is included!
    
    Returns a tuple of:
        real model
        list of tuples of (learned model, pmsat info, learning result)
        info dict of whole run (containing used traces)
    """
    def load_automaton(*args, **kwargs) -> MooreMachine:
        """ Call aalpy.utils.load_automaton_from_file, but suppress it's print output 
        (aalpy prints a warning that our automata are not input complete) """
        with io.StringIO() as f, redirect_stdout(f):
            return load_automaton_from_file(*args, **kwargs)
    
    model_dir = Path(models_dir) / model_name
    assert model_dir.exists(), f"{model_dir} does not exist!"
    real_model = load_automaton(model_dir / "RealModel.dot", "moore")
    
    pmsat_learned_models = {}
    pmsat_infos = {}
    learning_results = {}
    model_pattern = r"LearnedModel_(\d)States\.dot"
    info_pattern = r"LearningInfo_(\d)States\.json"
    result_pattern = r"LearningResults_(\d)States\.json"
    for root, _, files in os.walk(model_dir):
        for file in files:
            if match := re.search(model_pattern, file):
                pmsat_learned_models[match.group(1)] = load_automaton(model_dir / file, "moore")
            elif match := re.search(info_pattern, file):
                with open(model_dir/file, "r") as f:
                    pmsat_infos[match.group(1)] = json.load(f)
            elif match := re.search(result_pattern, file):
                with open(model_dir/file, "r") as f:
                    learning_results[match.group(1)] = json.load(f)
    
    models_and_info = []
    for num_states_learned, model in pmsat_learned_models.items():
        pmsat_info = pmsat_infos[num_states_learned]
        learning_result = learning_results[num_states_learned]
        models_and_info.append((model, pmsat_info, learning_result))
        
    with open(model_dir / "info.json", "r") as f:
        info = json.load(f)
        
    return real_model, models_and_info, info

In [None]:
def calculate_heurisic_scores(real_model: MooreMachine, learned_models: list[tuple[MooreMachine, dict, dict]], infos: dict, heuristic: HeuristicFunction) -> dict[int, float]:
    """ 
    Calculate the heuristic scores for multiple learned models of one single automaton.
    :param real_model: the original automaton
    :param learned_models: list of tuples of (learned model, pmsat info)
    :param infos: learning info dict for the whole run (e.g. containing used traces)
    :param heuristic: the heuristic function to calculate the scores with
    
    :return a dict of (distance_to_real_model -> heuristic score)
    """
    traces = infos["traces"]
    
    def distance(real_model: MooreMachine, learned_model: MooreMachine):
        return len(learned_model.states) - len(real_model.states)
    
    distance_to_heuristic = {
        distance(real_model, learned_model): heuristic(learned_model, learning_info, traces) for learned_model, learning_info, learning_result in learned_models
    }
    
    return distance_to_heuristic


def plot_heuristic_scores(real_model: MooreMachine, learned_models: list[tuple[MooreMachine, dict, dict]], infos: dict, heuristic: HeuristicFunction, new_plot: bool = True, show_plot: bool = True, as_line: bool = False, mark_best_model: bool=False):
    """ 
    Plot the heuristic scores of multiple learned models of one original automaton over distance to the original automaton.
    This calls calculate_heuristic_scores() to calculate the values.
    
    :param real_model: the original automaton
    :param learned_models: list of tuples of (learned model, pmsat info)
    :param infos: learning info dict for the whole run (e.g. containing used traces)
    :param heuristic: the heuristic function to calculate the scores with
    :param new_plot: whether to create a new plot, or to write to the current plot instance in plt
    :param show_plot: whether to show the plot directly
    :param as_line: Whether to connect all points in this plot by a line
    :param mark_best_model: whether to mark the best model in the plot by 
    """
    scores = calculate_heurisic_scores(real_model, learned_models, infos, heuristic)
    if new_plot:
        plt.figure()

    plt.axvline(0, color='red', linewidth=0.5, linestyle='--') # TODO this line is added many times
    
    if as_line:
        plt.plot(scores.keys(), scores.values())
    else:
        plt.scatter(scores.keys(), scores.values())
        
    if mark_best_model:
        best_key = max(scores, key=scores.get)
        best_value = scores[best_key]
        plt.scatter([best_key], [best_value], marker='*', facecolor='none', edgecolors="red", s=75, zorder=5)

    curr_ticks, curr_tick_labels = plt.xticks()
    if not set(scores.keys()).issubset(set(curr_ticks)) or any(int(ct) != ct for ct in curr_ticks):
        # add new ticks if (a) current keys are not in current ticks or (b) current ticks contains floats
        plt.xticks(list(scores.keys()))
        
    plt.xlabel("Distance to ground truth")
    plt.ylabel(f"{heuristic.__name__} score")
    plt.title(f"Scores of heuristic '{heuristic.__name__}'")
    
    if show_plot:
        plt.show()
    
    
def calculate_heuristic_scores_for_example(example: str, heuristic: HeuristicFunction) -> dict[int, float]:
    """ Calculate the heuristic scores of a given example from the pmsat-inference repo. """
    real_model, learned_models, infos = load_example(example)
    return calculate_heurisic_scores(real_model, learned_models, infos, heuristic)


def plot_heuristic_scores_for_example(example: str, heuristic: HeuristicFunction, **kwargs):
    """ Plot the heuristic scores of a given example from the pmsat-inference repo."""
    real_model, learned_models, infos = load_example(example)
    plot_heuristic_scores(real_model, learned_models, infos, heuristic, **kwargs)
    
    
def calculate_heuristic_scores_for_model(model: str, heuristic: HeuristicFunction, models_dir: str = DEFAULT_DATA_DIR) -> dict[int, float]:
    """ Calculate the heuristic scores of the learnt models of a given model from the given models directory"""
    real_model, learned_models, infos = load_data(model, models_dir)
    return calculate_heurisic_scores(real_model, learned_models, infos, heuristic)


def plot_heuristic_scores_for_model(model: str, heuristic: HeuristicFunction, models_dir: str = DEFAULT_DATA_DIR, **kwargs):
    """ Plot the heuristic scores of the learnt models of a given model from the given models directory"""
    real_model, learned_models, infos = load_data(model, models_dir)
    plot_heuristic_scores(real_model, learned_models, infos, heuristic, **kwargs)


def plot_heuristic_scores_for_all_models_in_dir(heuristic: HeuristicFunction, models_dir: str = DEFAULT_DATA_DIR, **kwargs):
    """ Plot the heuristic scores of learnt models of all models in the given models directory"""
    plt.figure()
    for model_name in os.listdir(models_dir):
        if not (Path(models_dir) / model_name).is_dir():
            continue
        plot_heuristic_scores_for_model(model_name, heuristic, models_dir=models_dir, new_plot=False, show_plot=False, **kwargs)
    plt.show()
    
def _calc_dist_to_num_best_models(heuristic: HeuristicFunction, models_dir=DEFAULT_DATA_DIR):
    dist_to_num_best_models_at_dist = defaultdict(int)
    for model_name in os.listdir(models_dir):
        if not (Path(models_dir) / model_name).is_dir():
            continue
        scores = calculate_heuristic_scores_for_model(model_name, heuristic, models_dir=models_dir)
        best_key = max(scores, key=scores.get)
        dist_to_num_best_models_at_dist[best_key] += 1
        
    return dict(dist_to_num_best_models_at_dist)

def plot_number_of_best_models_identified(heuristic: HeuristicFunction, models_dir: str = DEFAULT_DATA_DIR, new_plot=False, show_plot=True, as_line=False):
    """ 
    Plot the number of best models identified with the given heuristic, calculated on all models in the given models directory
    
    :param heuristic: The heuristic to use
    :param models_dir: The models directory
    :param new_plot: Whether to create a new plot instance or use the existing one
    :param show_plot: Whether to show the plot
    :param as_line: Whether to show the data points as a line or as disconnected points
    """
    dist_to_num_best_models_at_dist = _calc_dist_to_num_best_models(heuristic, models_dir=models_dir)
    if new_plot:
        plt.figure()
    
    label = heuristic.__name__    
    if as_line:
        keys = sorted(dist_to_num_best_models_at_dist.keys())
        vals = [dist_to_num_best_models_at_dist[k] for k in keys]
        plt.plot(keys, vals, label=label)
    else:
        plt.scatter(dist_to_num_best_models_at_dist.keys(), dist_to_num_best_models_at_dist.values(), label=label)
    
    plt.title(f"Number of best models identified at distance")
    plt.xlabel("Distance to ground truth")
    plt.ylabel("Number of best models identified")
    plt.legend(loc=(1.04, 1))
    
    if show_plot:
        plt.show()
    
def print_heuristic_stats_for_all_models_in_dir(*heuristics: HeuristicFunction, models_dir=DEFAULT_DATA_DIR, **kwargs):    
    from prettytable import PrettyTable
    
    num_models = sum(1 for m in os.listdir(models_dir) if (Path(models_dir) / m).is_dir())
    
    stats = {}
    for heuristic in heuristics:
        stats[heuristic.__name__] = _calc_dist_to_num_best_models(heuristic, models_dir=models_dir)

    all_distances = set()
    for dist_to_num_best in stats.values():
        all_distances.update(dist_to_num_best.keys())
    all_distances = sorted(all_distances)

    table = PrettyTable()
    table.field_names = ["Distance"] + [heuristic.__name__ for heuristic in heuristics]

    for distance in all_distances:
        row = [distance]
        for heuristic in heuristics:
            dist_to_num_best = stats[heuristic.__name__]
            num_best_models = dist_to_num_best.get(distance, 0)
            percent = (num_best_models / num_models * 100) if num_models > 0 else 0
            row.append(f"{num_best_models} ({percent:.1f}%)")
        table.add_row(row)

    print(table)
    

In [None]:
def compare_heuristics(heuristics: list[HeuristicFunction], models_dir: str = DEFAULT_DATA_DIR):
    for heuristic in heuristics:
        plot_heuristic_scores_for_all_models_in_dir(heuristic, models_dir=models_dir, as_line=True, mark_best_model=True)
    
    plt.figure()
    for heuristic in heuristics:
        plot_number_of_best_models_identified(heuristic, models_dir=models_dir, new_plot=False, show_plot=False, as_line=True)
    plt.show()    
    
    print_heuristic_stats_for_all_models_in_dir(*heuristics, models_dir=models_dir)

def _create_partial_heuristics(heuristic: HeuristicFunction, **kwargs):
    def to_str(x):
        if callable(x):
            return x.__name__
        return str(x)
    heuristic_functions = []
    all_combinations = list(itertools.product(*kwargs.values()))
    for combination in all_combinations:

        # round floats to 2 comma
        new_comb = []
        for component in combination:
            if isinstance(component, float):
                component = round(component, 2)
            new_comb.append(component)
        combination = new_comb

        kwarg_dict = dict(zip(kwargs.keys(), combination))
        fun = functools.partial(heuristic, **kwarg_dict)
        fun.__name__ = heuristic.__name__ + "(" + ", ".join(f"{key}={to_str(value)}" for key, value in kwarg_dict.items()) + ")"
        heuristic_functions.append(fun)
    return heuristic_functions
    

In [None]:
import numpy as np

def base_heuristic(learned_model: MooreMachine, learning_info: dict[str, Any], traces: list[list[str | list[str]]], *, stat_glitched=np.mean, stat_dominant=np.min):
    score = 0
        
    # glitched_trans_freq: use mean (or max) of find upper outliers, which should actually be dominant transitions
    # the higher the mean is, the more likely are glitched transitions which should be dominant transitions - i.e., too little states
    score += 1 / stat_glitched(learning_info["glitched_delta_freq"] or [1])
    
    # dominant_trans_freq: use min to find lower outliers, which could be glitches that were encoded as dominant transitions
    # the lower min is, the worse it is for us
    dominant_delta_freq = learning_info["dominant_delta_freq"]
    # remove frequency 0 from dominant_delta_freq # TODO actually, I think this should be removed by the pmsat alg?
    dominant_delta_freq = [freq for freq in dominant_delta_freq if freq > 0]
    score -= 1 / stat_dominant(dominant_delta_freq)
    
    return score

# plot_heuristic_scores_for_example("ping_pong_example", heuristic)
# plot_heuristic_scores_for_model("MooreMachine_4States_4Inputs_3Outputs_11c10030c44546dfaa0e3e6cc27ce858", heuristic, as_line=True)
plot_heuristic_scores_for_all_models_in_dir(base_heuristic, as_line=True, mark_best_model=True)
plot_number_of_best_models_identified(base_heuristic)
print_heuristic_stats_for_all_models_in_dir(base_heuristic)


In [None]:
def base_punish_unreachable(learned_model: MooreMachine, learning_info: dict[str, Any], traces: list[list[str | list[str]]], *, factor=0.5):
    assert 0 <= factor <= 1, "factor must be between 0 and 1"
    score = base_heuristic(learned_model, learning_info, traces)
    
    learned_model.compute_prefixes()
    unreachable_states = [s for s in learned_model.states if s.prefix is None]
    for state in unreachable_states:
        if score > 0:
            score *= factor
        elif score < 0:
            score *= (1+factor)
        else:
            score -= 1
            
    return score

In [None]:
# compare_heuristics([base_heuristic, *_create_partial_heuristics(base_punish_unreachable, factor=np.arange(0, 1, 0.25))])
compare_heuristics(_create_partial_heuristics(base_heuristic, stat_glitched=[np.mean, np.max], stat_dominant=[np.min]))

In [None]:
def quad_exp_penalty(learned_model: MooreMachine, learning_info: dict[str, Any], traces: list[list[str | list[str]]], *, weight_g=1.0, weight_d=1.0):
        
    g = np.array(learning_info["glitched_delta_freq"])
    d = np.array(learning_info["dominant_delta_freq"])
    
    if len(g) > 0:
        normalized_g = (g - np.min(g)) / (np.max(g) - np.min(g) + 1e-8)  # avoid division by zero
        g_penalty = np.mean(normalized_g ** 2)  # penalize high values in g (quadratic penalty)
    else:
        g_penalty = 0
    
    # penalize very small values in d
    normalized_d = (d - np.min(d)) / (np.max(d) - np.min(d) + 1e-8)
    d_penalty = np.mean(np.exp(-normalized_d))
    
    score = -weight_g * g_penalty + weight_d * (1 - d_penalty)
    return score

compare_heuristics([base_heuristic, *_create_partial_heuristics(quad_exp_penalty, weight_g=np.arange(0, 1.1, 0.2), weight_d=np.arange(0, 1.1, 0.2))])

In [None]:
def relative_penalty(learned_model: MooreMachine, learning_info: dict[str, Any], traces: list[list[str | list[str]]]):
    g = np.array(learning_info["glitched_delta_freq"])
    d = np.array(learning_info["dominant_delta_freq"])

    # penalize high values in `g` (relative to the mean of `g`)
    if len(g) > 0:
        g_penalty = np.mean(g) / (1 + np.max(g) - np.mean(g))
    else:
        g_penalty = 0
    
    # reward high values in `d`, penalizing small ones compared to the median
    d_reward = (np.min(d) / (1 + np.median(d))) if np.min(d) > 0 else 0
    
    score = d_reward - g_penalty
    return score


compare_heuristics([base_heuristic, relative_penalty])

In [None]:
def base_heuristic_parameterized(learned_model: MooreMachine, learning_info: dict[str, Any], traces: list[list[str | list[str]]], *, stat_glitched=np.mean, stat_dominant=np.min, numerator_g=1, numerator_d=1, log=False):
    if log:
        print_d = print
    else:
        print_d = lambda *_: None

    print_d("Calculating base heuristic | score = 0")

    score = 0
        
    glitched_delta_freq = learning_info["glitched_delta_freq"]
    if len(glitched_delta_freq) > 0:
        score += numerator_g / stat_glitched(glitched_delta_freq)
        print_d(f"{glitched_delta_freq=} | {stat_glitched.__name__}(glitched_delta_freq)={stat_glitched(glitched_delta_freq)} | {numerator_g}/{stat_glitched.__name__}(glitched_delta_freq)={numerator_g / stat_glitched(glitched_delta_freq)} | {score=}")
    else:
        score += 1
        print_d(f"No glitched delta frequencies | {score=}")

    
    dominant_delta_freq = learning_info["dominant_delta_freq"]
    dominant_delta_freq = [freq for freq in dominant_delta_freq if freq > 0]
    
    score -= numerator_d / stat_dominant(dominant_delta_freq)

    print_d(f"{dominant_delta_freq=} | {stat_dominant.__name__}(dominant_delta_freq)={stat_dominant(dominant_delta_freq)} | {numerator_d}/{stat_dominant.__name__}(dominant_delta_freq)={numerator_d / stat_dominant(dominant_delta_freq)} | {score=}")
    
    return score

# compare_heuristics([*_create_partial_heuristics(base_heuristic_parameterized, log=[True])])
# plot_heuristic_scores_for_example("ping_pong_example", base_heuristic_parameterized)
compare_heuristics([base_heuristic, *_create_partial_heuristics(base_heuristic_parameterized, numerator_g=[3], numerator_d=[1, 2, 3])])

In [None]:
def lower_10(lst: list[float]):
    return np.percentile(lst, 10)

def upper_10(lst: list[float]):
    return np.percentile(lst, 90)

compare_heuristics(_create_partial_heuristics(base_heuristic, stat_glitched=[np.max, upper_10], stat_dominant=[np.min, lower_10]))

In [None]:
def choose_threshold(data, k=2):
    mean = np.mean(data)
    std = np.std(data)
    return mean - k * std


def threshold_penalty(lst, weight=1):
    threshold = choose_threshold(lst)
    penalty = 0
    for x in lst:
        if x < threshold:
            penalty += weight * (threshold - x)  # Penalize based on how far below the threshold
    return penalty

def threshold_exp_penalty(lst, weight=1):
    threshold = choose_threshold(lst)
    penalty = 0
    for x in lst:
        if x < threshold:
            penalty += weight * (threshold - x) ** 2  # Exponential penalty
    return penalty

def threshold_rel_penalty(lst, weight=1):
    threshold = choose_threshold(lst)
    penalty = 0
    for x in lst:
        if x < threshold:
            penalty += weight * ((threshold - x) / threshold)  # Relative penalty
    return penalty


def threshold_heuristic(learned_model: MooreMachine, learning_info: dict[str, Any], traces: list[list[str | list[str]]], *, kind="normal"):
    match kind:
        case "normal":
            return -threshold_exp_penalty(learning_info["dominant_delta_freq"], weight=1)
        case "exp":
            return -threshold_exp_penalty(learning_info["dominant_delta_freq"], weight=1)
        case "rel":
            return -threshold_rel_penalty(learning_info["dominant_delta_freq"], weight=1)
        case _:
            raise NotImplementedError


compare_heuristics([base_heuristic, *_create_partial_heuristics(threshold_heuristic, kind=["normal", "exp", "rel"])])

In [None]:
def lower_10_sum(lst: list[float]):
    lt = np.percentile(lst, 10)
    return sum(x for x in lst if x <= lt)

def upper_10_sum(lst: list[float]):
    ut = np.percentile(lst, 90)
    return sum(x for x in lst if x >= ut)

def q1_sum(lst: list[float]):
    q1 = np.percentile(lst, 25)
    return sum(x for x in lst if x <= q1)

def q3_sum(lst: list[float]):
    q3 = np.percentile(lst, 75)
    return sum(x for x in lst if x >= q3)

compare_heuristics(_create_partial_heuristics(base_heuristic_parameterized, stat_glitched=[np.max, q3_sum, upper_10_sum], stat_dominant=[np.min, q1_sum, lower_10_sum]))


In [None]:
def punish_overlapping_values(learned_model: MooreMachine, learning_info: dict[str, Any], traces: list[list[str | list[str]]]):
        
    glitched_delta_freq = learning_info["glitched_delta_freq"]
    dominant_delta_freq = learning_info["dominant_delta_freq"]
    dominant_delta_freq = [freq for freq in dominant_delta_freq if freq > 0]
    
    score = base_heuristic_parameterized(learned_model, learning_info, traces)

    for g_freq in glitched_delta_freq:
        if min(dominant_delta_freq) <= g_freq <= max(dominant_delta_freq):
            score -= score / len(glitched_delta_freq)
    
    if len(glitched_delta_freq) > 0:
        for d_freq in dominant_delta_freq:
            if min(glitched_delta_freq) <= d_freq <= max(glitched_delta_freq):
                score -= score / len(dominant_delta_freq)
    return score


compare_heuristics([base_heuristic_parameterized, punish_overlapping_values])


In [None]:
import statistics

def compute_z_scores_rel_dominant(dominant_frequencies, glitched_frequencies):
    """
    Compute the z scores of the dominant frequencies (in relation to dominant frequencies) and
    of glitched frequencies (also in relation to dominant frequencies)
    """
    mean_dominant = statistics.mean(dominant_frequencies)
    std_dev_dominant = statistics.stdev(dominant_frequencies)

    if std_dev_dominant == 0:
        z_scores_dominant = [0] * len(dominant_frequencies)
        z_scores_glitched = [0] * len(glitched_frequencies)
    else:
        z_scores_dominant = [(f - mean_dominant) / std_dev_dominant for f in dominant_frequencies]
        z_scores_glitched = [(g - mean_dominant) / std_dev_dominant for g in glitched_frequencies]

    return z_scores_dominant, z_scores_glitched

def compute_z_scores(frequencies):
    """
    Compute the z scores of the given list of frequencies
    """
    if len(frequencies) < 2:
        return [0] * len(frequencies)

    mean = statistics.mean(frequencies)
    std_dev = statistics.stdev(frequencies)

    if std_dev == 0:
        z_scores = [0] * len(frequencies)
    else:
        z_scores = [(f - mean) / std_dev for f in frequencies]

    return z_scores


def zscores_sum(learned_model: MooreMachine, learning_info: dict[str, Any], traces: list[list[str | list[str]]], *, glitch_threshold=1, dominant_threshold=-1):
    score = 0

    glitched_delta_freq = learning_info["glitched_delta_freq"]
    dominant_delta_freq = learning_info["dominant_delta_freq"]
    dominant_delta_freq = [freq for freq in dominant_delta_freq if freq > 0]

    glitched_z_scores = compute_z_scores(glitched_delta_freq)
    dominant_z_scores = compute_z_scores(dominant_delta_freq)

    glitch_sum_above_threshold = sum(zscore for zscore in glitched_z_scores if zscore >= glitch_threshold)
    if glitch_sum_above_threshold > 0:
        score -= glitch_sum_above_threshold

    dominant_sum_below_threshold = sum(zscore for zscore in dominant_z_scores if zscore <= dominant_threshold)
    if dominant_sum_below_threshold > 0:
        score -= dominant_sum_below_threshold

    return score

def find_similar_frequencies(dominant_frequencies, glitched_frequencies, z_threshold=1.0):
    """
    Calculate the z-score of the dominant frequency relative to the glitched list to find dominant frequencies
    which are similar to glitched frequencies.
    """
    if len(glitched_frequencies) == 0:
        return []

    mean_glitched = np.mean(glitched_frequencies)
    std_glitched = np.std(glitched_frequencies)

    similar_frequencies = []
    for d in dominant_frequencies:
        z_score = (d - mean_glitched) / std_glitched if std_glitched != 0 else 0
        if abs(z_score) <= z_threshold:
            similar_frequencies.append(d)

    return similar_frequencies

def new_heuristic(learned_model: MooreMachine, learning_info: dict[str, Any], traces: list[list[str | list[str]]]):
    score = base_heuristic_parameterized(learned_model, learning_info, traces)
    # score = 0

    glitched_delta_freq = learning_info["glitched_delta_freq"]
    dominant_delta_freq = learning_info["dominant_delta_freq"]
    dominant_delta_freq = [freq for freq in dominant_delta_freq if freq > 0]

    # encoded_glitches = find_similar_frequencies(dominant_delta_freq, glitched_delta_freq)
    # if len(encoded_glitches) > 0:
    #     score += 1 / len(encoded_glitches)

    num_glitches = len(learning_info["glitch_steps"])
    # score -= num_glitches * len(learned_model.states)  # this works quite well on its own
    if num_glitches > 0:
        score += 1 / num_glitches

    learned_model.compute_prefixes()
    assert len(learned_model.states) == learning_info["num_states"]
    num_unreachable_states = sum(1 for state in learned_model.states if state.prefix is None)
    if num_unreachable_states > 0:
        score -= 1/num_unreachable_states

    return score

# compare_heuristics([base_heuristic_parameterized, *_create_partial_heuristics(zscores_sum, glitch_threshold=[1], dominant_threshold=[-3])])
compare_heuristics([new_heuristic, base_heuristic_parameterized])

In [None]:
def new_heuristic3(learned_model: MooreMachine, learning_info: dict[str, Any], traces: list[list[str | list[str]]], *, punish_zero=False):
    score = 0
        
    # reward lower mean of glitched frequencies
    glitched_delta_freq = learning_info["glitched_delta_freq"]
    if len(glitched_delta_freq) > 0:
        score += 1 / np.mean(glitched_delta_freq)
    else:
        score += 1  # this rewards "no glitches" a lot
        
    # reward lower numbers of glitches
    num_glitches = len(learning_info["glitch_steps"])
    if num_glitches > 0:
        score += 1 / num_glitches

    # penalize unreachable states (in absolute numbers!) a LOT
    learned_model.compute_prefixes()
    num_unreachable = sum(1 for state in learned_model.states if state.prefix is None)
    score -= num_unreachable
    
    dominant_delta_freq = learning_info["dominant_delta_freq"]
    dominant_delta_freq2 = [freq for freq in dominant_delta_freq if freq > 0]
    score -= 1 / np.min(dominant_delta_freq2)
    # score -= len(learned_model.states) / lower_10(dominant_delta_freq)
    
    if punish_zero:
        dom_zero_trans = [freq for freq in dominant_delta_freq if freq == 0]
        score -= len(dom_zero_trans)
    
    return score

compare_heuristics([*_create_partial_heuristics(new_heuristic3, punish_zero=[True, False]), base_heuristic], models_dir="../generated_data_1")

In [None]:
def simple_base_heuristic(learned_model: MooreMachine, learning_info: dict[str, Any], traces: list[list[str | list[str]]]):
    score = 0
        
    glitched_delta_freq = learning_info["glitched_delta_freq"]
    if len(glitched_delta_freq) > 0:
        score += 1 / np.mean(glitched_delta_freq)
    else:
        score += 1
    
    dominant_delta_freq = learning_info["dominant_delta_freq"]
    dominant_delta_freq = [freq for freq in dominant_delta_freq if freq > 0]
    
    score -= 1 / np.min(dominant_delta_freq)
    
    return score

In [None]:
def advanced_heuristic(learned_model: MooreMachine, learning_info: dict[str, Any], traces: list[list[str | list[str]]], *, reward_lower_glitch_mean=True, reward_lower_num_glitches=True, punish_unreachable_states=True, punish_low_dominant_freq=True, punish_input_incompleteness=True):
    score = 0
        
    glitched_delta_freq = learning_info["glitched_delta_freq"]
    dominant_delta_freq = learning_info["dominant_delta_freq"]
    
    # reward lower mean of glitched frequencies
    if reward_lower_glitch_mean:
        if len(glitched_delta_freq) > 0:
            score += 1 / np.mean(glitched_delta_freq)
        else:
            score += 1  # this rewards "no glitches" a lot
        
    # reward lower numbers of glitches
    if reward_lower_num_glitches:
        num_glitches = len(learning_info["glitch_steps"])
        if num_glitches > 0:
            score += 1 / num_glitches

    # penalize unreachable states (in absolute numbers!) a LOT
    if punish_unreachable_states:
        learned_model.compute_prefixes()
        num_unreachable = sum(1 for state in learned_model.states if state.prefix is None)
        score -= num_unreachable
    
    # penalize low dominant frequencies
    if punish_low_dominant_freq:
        dominant_delta_freq_without_zero = [freq for freq in dominant_delta_freq if freq > 0]
        score -= 1 / np.min(dominant_delta_freq_without_zero)
    
    # punish dominant frequencies that are 0 (-> not input complete)
    if punish_input_incompleteness:
        num_dominant_zero_freq = sum(1 for freq in dominant_delta_freq if freq == 0)
        score -= num_dominant_zero_freq
    
    return score

In [None]:
compare_heuristics([simple_base_heuristic, advanced_heuristic], models_dir="../generated_data_1")

In [None]:
compare_heuristics([
    simple_base_heuristic, 
    *_create_partial_heuristics(advanced_heuristic, 
                                reward_lower_glitch_mean=[True, False], 
                                reward_lower_num_glitches=[True, False], 
                                punish_unreachable_states=[True, False], 
                                punish_low_dominant_freq=[True, False], 
                                punish_input_incompleteness=[True, False])], 
    models_dir="../generated_data_1")

In [None]:
def _(func, **kwargs):
    f = functools.partial(func, **kwargs)
    f.__name__ = func.__name__ + "(" + ", ".join(f"{key}={value}" for key, value in kwargs.items()) + ")"
    return f

compare_heuristics(
    [
        simple_base_heuristic, 
        advanced_heuristic,  # all True
        _(advanced_heuristic, punish_input_incompleteness=False),
        _(advanced_heuristic, punish_unreachable_states=False),
        _(advanced_heuristic, punish_input_incompleteness=False,
                              punish_unreachable_states=False),
        _(advanced_heuristic, reward_lower_num_glitches=False),
        
    ],
    models_dir="../generated_data_1")

In [None]:
import numpy as np

In [None]:
def advanced_heuristic2(learned_model: MooreMachine, learning_info: dict[str, Any], traces: list[list[str | list[str]]], *, reward_lower_glitch_mean=True, reward_lower_num_glitches=True, punish_unreachable_states=True, punish_low_dominant_freq=True, punish_input_incompleteness=True):
    score = 0

    glitched_delta_freq = learning_info["glitched_delta_freq"]
    dominant_delta_freq = learning_info["dominant_delta_freq"]

    # reward lower mean of glitched frequencies
    # ADDS between 0 and 1
    if reward_lower_glitch_mean:
        if len(glitched_delta_freq) > 0:
            score += 1 / np.mean(glitched_delta_freq)

    # reward lower numbers of glitches
    # ADDS between 0 and 1
    if reward_lower_num_glitches:
        num_glitches = len(learning_info["glitch_steps"])
        if num_glitches > 0:
            score += 1 / num_glitches
        else:
            score += 1  # this rewards "no glitches" a lot

    # penalize low dominant frequencies
    # SUBTRACTS between 0 and 1
    if punish_low_dominant_freq:
        dominant_delta_freq_without_zero = [freq for freq in dominant_delta_freq if freq > 0]
        score -= 1 / np.min(dominant_delta_freq_without_zero)

    # # punish dominant frequencies that are 0 (-> not input complete)
    # if punish_input_incompleteness:
    #     num_dominant_zero_freq = sum(1 for freq in dominant_delta_freq if freq == 0)
    #     num_dominant_freq = len(dominant_delta_freq)
    #     score -= 1 / (1 + np.exp(-num_dominant_zero_freq))

    # penalize unreachable states
    # SETS score to -1 or does nothing
    # TODO: this does nothing to the distribution
    if punish_unreachable_states:
        learned_model.compute_prefixes()
        num_unreachable = sum(1 for state in learned_model.states if state.prefix is None)
        # num_states = len(learned_model.states)
        # percent = num_unreachable / num_states
        # score -= 1 / (1 + np.exp(-num_unreachable))
        # score -= np.exp(-num_unreachable)
    #     if num_unreachable > 0:
    #         score = -2

    return score

def advanced_heuristic3(*args, **kwargs):
    return advanced_heuristic(*args, **kwargs, punish_input_incompleteness=False, punish_unreachable_states=False)

compare_heuristics([simple_base_heuristic, advanced_heuristic, advanced_heuristic3], models_dir="../generated_data_1")

In [None]:
def plot_something(value_name: str, stat_method = np.sum, models_dir="../generated_data", new_plot=True, show_plot=True, as_line=False,
                   value_retriever_function=lambda learned_model, learning_info, learning_result, value_name: learning_result[value_name]):
    if new_plot:
        plt.figure()

    values_at_distance = defaultdict(list)
    for model_name in os.listdir(models_dir):
        if not (Path(models_dir) / model_name).is_dir():
            continue
        real_model, learned_models, infos = load_data(model_name, models_dir)
        for learned_model, learning_info, learning_result in learned_models:
            val = value_retriever_function(learned_model, learning_info, learning_result, value_name)
            values_at_distance[learning_result["distance"]].append(val)

    keys = sorted(values_at_distance.keys())
    vals = [stat_method(values_at_distance[k]) for k in keys]
    if as_line:
        plt.plot(keys, vals, label=value_name)
    else:
        plt.scatter(keys, vals, label=value_name)

    plt.title(f"{stat_method.__name__} of {value_name} at distance")
    plt.xlabel("Distance to ground truth")
    plt.ylabel(f"{stat_method.__name__} of {value_name}")
    plt.legend(loc=(1.04, 1))

    if show_plot:
        plt.show()

In [None]:
plot_something('num_states_unreachable', stat_method=np.mean, models_dir='../generated_data_1')

In [None]:
plot_something('dominant transition 0-frequencies', stat_method=np.mean, models_dir='../generated_data_1',
               value_retriever_function=lambda lm, li, lr, vn: sum(1 for freq in lr['dominant_delta_freq'] if freq == 0))

In [None]:
things = [
    "num_states_unreachable",
    "percent_states_unreachable",
    "num_glitches",
    "percent_glitches",
    "mean_glitch_trans_freq",
    "median_glitch_trans_freq",
    "min_glitch_trans_freq",
    "max_glitch_trans_freq",
    "mean_dominant_trans_freq",
    "median_dominant_trans_freq",
    "max_dominant_trans_freq",
    "min_dominant_trans_freq",
]

for thing in things:
    for stat_method in (np.mean, np.median):
        plot_something(thing, stat_method=stat_method, models_dir='../generated_data_1')

In [None]:
def test_heuristic(learned_model: MooreMachine, learning_info: dict[str, Any], traces: list[list[str | list[str]]]):
    score = 0

    glitched_delta_freq = learning_info["glitched_delta_freq"]
    dominant_delta_freq = learning_info["dominant_delta_freq"]

    num_dominant_zero_freq = sum(1 for freq in dominant_delta_freq if freq == 0)
    score = num_dominant_zero_freq

    return score

plot_heuristic_scores_for_all_models_in_dir(test_heuristic, models_dir='../generated_data_1')


In [None]:
from active_pmsatlearn.learnalgo import simple_heuristic, intermediary_heuristic
compare_heuristics([simple_heuristic, intermediary_heuristic], models_dir='../generated_data_1')

In [None]:
import random
import numpy as np
from active_pmsatlearn.defs import Trace

def _get_number_of_next_valid_steps(state: MooreState, trace: Trace) -> int:
    c = 0
    for inp, out in trace:
        next_state = state.transitions[inp]
        if next_state.output == out:
            c += 1
            state = next_state
        else:
            break

    return c

def approximate_glitch_frequencies(learned_model: MooreMachine, traces: list[Trace]) -> tuple[list[int], list[int]]:
    learned_model.make_input_complete()  # TODO: this might be the problem. If the glitch leads to an unreachable state, ...
    
    dominant_steps = defaultdict(int)
    glitch_steps = defaultdict(int)

    for trace in traces:
        learned_model.reset_to_initial()
        assert learned_model.current_state.output == trace[0]
        for step_index, (inp, out) in enumerate(trace[1:], start=1):
            old_state = learned_model.current_state
            model_out = learned_model.step(inp)
            new_state = learned_model.current_state
            if model_out != out:
                # this step (must|would) have been marked by pmsat as a glitch

                believe_states = {s: _get_number_of_next_valid_steps(s, trace[step_index+1:]) for s in learned_model.states if s.output == out}

                best_original_believe_state = max(believe_states, key=believe_states.get)
                learned_model.current_state = best_original_believe_state
                glitch_steps[(old_state, inp, best_original_believe_state)] += 1
            else:
                dominant_steps[(old_state, inp, new_state)] += 1

    return [d for d in dominant_steps.values()], [g for g in glitch_steps.values()]


def old_approximate_glitch_frequencies(learned_model: MooreMachine, traces: list[Trace]) -> list[float]:
    # learned_model.make_input_complete()  # TODO: this might be the problem. If the glitch leads to an unreachable state, ...
    
    glitch_steps = defaultdict(int)

    for trace in traces:
        learned_model.reset_to_initial()
        assert learned_model.current_state.output == trace[0]
        for step_index, (inp, out) in enumerate(trace[1:], start=1):
            old_state = learned_model.current_state
            model_out = learned_model.step(inp)
            if model_out != out:
                # this step (must|would) have been marked by pmsat as a glitch

                # dict of <first believe state> -> <current believe state>
                believe_states = {s: s for s in learned_model.states if s.output == out}
                while len(believe_states) > 1:

                    _inp, _out = trace[step_index]
                    curr_believe_states = list(believe_states.items())
                    for orig_state, curr_state in curr_believe_states:
                        if (nxt := curr_state.transitions[_inp]).output == _out:
                            believe_states[orig_state] = nxt
                        else:
                            if len(believe_states) >= 2:
                                believe_states.pop(orig_state)
                            else:
                                break

                    step_index += 1
                    if step_index == len(trace):
                        # we reached the end of the trace -> choose one state at random
                        while len(believe_states) > 1:
                            believe_states.pop(random.choice(list(believe_states.keys())))

                best_original_believe_state = list(believe_states.values())[0]
                learned_model.current_state = best_original_believe_state
                glitch_steps[(old_state, inp, best_original_believe_state)] += 1


    return list(glitch_steps.values())

def _compare(actuals: list, approxs: list):
    actuals = np.array(actuals)
    approxs = np.array(approxs)
    # Metrics
    mae = np.mean(np.abs(actuals - approxs))
    mse = np.mean((actuals - approxs) ** 2)
    rmse = np.sqrt(mse)
    mape = np.mean(np.abs((actuals - approxs) / actuals)) * 100
    r2 = 1 - (np.sum((actuals - approxs) ** 2) / np.sum((actuals - np.mean(actuals)) ** 2))

    # Print metrics
    print(f"mean absolute error: {mae}")
    print(f"mean squared error: {mse}")
    print(f"root mean squared error: {rmse}")
    print(f"mean absolute percentage error: {mape:.2f}%")
    print(f"R²: {r2:.2f}")

    # Visualization
    plt.figure(figsize=(15, 20))  # Adjusted figure size for vertical stacking

    # Scatter Plot
    plt.subplot(3, 1, 1)  # 3 rows, 1 column, 1st plot
    plt.scatter(actuals, approxs, color='blue', label='Approximations')
    plt.plot(actuals, actuals, color='red', linestyle='--', label='Perfect Fit (y=x)')
    plt.xlabel('Actuals')
    plt.ylabel('Approximations')
    plt.title('Scatter Plot')
    plt.legend()

    # Line Plot
    plt.subplot(3, 1, 2)  # 3 rows, 1 column, 2nd plot
    plt.plot(actuals, label='Actuals', marker='o')
    plt.plot(approxs, label='Approximations', marker='x')
    plt.xlabel('Index')
    plt.ylabel('Values')
    plt.title('Line Plot')
    plt.legend()

    # Error Plot
    plt.subplot(3, 1, 3)  # 3 rows, 1 column, 3rd plot
    errors = actuals - approxs
    plt.bar(range(len(errors)), errors, color='purple')
    plt.axhline(0, color='red', linestyle='--')
    plt.xlabel('Index')
    plt.ylabel('Error (Actual - Approx)')
    plt.title('Error Plot')

    # Adjust layout and show
    plt.tight_layout()
    plt.show()

def compare_actual_with_approx_freq(stat_method=np.mean, freqs="glitched", models_dir=DEFAULT_DATA_DIR):
    actuals = []
    approxs = []

    for model_name in os.listdir(models_dir):
        if not os.path.isdir(Path(models_dir) / model_name):
            continue
        ground_truth, learned_models, info = load_data(model_name, models_dir)
        for learned_model, learning_info, learning_result in learned_models:
            if not learned_model.is_input_complete():
                continue
            actual_freqs = learning_info["glitched_delta_freq"] if freqs == "glitched" else learning_info["dominant_delta_freq"]
            actual = stat_method(actual_freqs or [0])
            actuals.append(actual)

            approx_freqs = approximate_glitch_frequencies(learned_model, info["traces"])[1 if freqs == "glitched" else 0]
            approx = np.max(approx_freqs or [0])
            approxs.append(approx)
    
    print("-" * 20)
    print(f"{freqs.upper()}:")
    _compare(actuals, approxs)
    # diffs = []
    # for actual, approx in zip(actuals, approxs):
    #     diff = actual - approx
    #     print(f"{stat_method.__name__}: ACTUAL: {actual:.2f} | APPROX: {approx:.2f} | diff: {diff:.2f}")
    # stdev = np.std

compare_actual_with_approx_freq(stat_method=np.mean, freqs="glitched")
compare_actual_with_approx_freq(stat_method=np.mean, freqs="dominant")

In [None]:
from active_pmsatlearn.defs import SupportedAutomaton, PmSatLearningInfo


def approximate_advanced_heuristic(learned_model: SupportedAutomaton, learning_info: PmSatLearningInfo, traces: list[Trace],
                       *, reward_lower_glitch_mean=True, reward_lower_num_glitches=True, punish_unreachable_states=True,
                       punish_low_dominant_freq=True, punish_input_incompleteness=True):
    score = 0

    if learned_model is None:
        return -100

    dominant_delta_freq, glitched_delta_freq = approximate_glitch_frequencies(learned_model, traces)

    # reward lower mean of glitched frequencies
    # ADDS between 0 and 1
    if reward_lower_glitch_mean:
        if len(glitched_delta_freq) > 0:
            score += 1 / np.mean(glitched_delta_freq)

    # reward lower numbers of glitches
    # ADDS between 0 and 1
    if reward_lower_num_glitches:
        num_glitches = sum(glitched_delta_freq)
        if num_glitches > 0:
            score += 1 / num_glitches
        else:
            score += 1

    # penalize low dominant frequencies
    # SUBTRACTS between 0 and 1
    if punish_low_dominant_freq:
        dominant_delta_freq_without_zero = [freq for freq in dominant_delta_freq if freq > 0]
        score -= 1 / np.min(dominant_delta_freq_without_zero)

    # penalize unreachable states (in absolute numbers!) a LOT
    # SUBTRACTS between 0 and inf
    if punish_unreachable_states:
        learned_model.compute_prefixes()
        num_unreachable = sum(1 for state in learned_model.states if state.prefix is None)
        score -= num_unreachable

    # punish dominant frequencies that are 0 (-> not input complete)
    # SUBTRACTS between 0 and inf
    if punish_input_incompleteness:
        num_dominant_zero_freq = sum(1 for freq in dominant_delta_freq if freq == 0)
        score -= num_dominant_zero_freq

    return float(score)  # cast numpy away (nicer output)

from active_pmsatlearn.learnalgo import simple_heuristic
compare_heuristics([simple_heuristic, approximate_advanced_heuristic])

In [None]:
print("test")