In [1]:
from glob import glob
import os
from collections import defaultdict
from pathlib import Path
from enum import Enum
import json
import pandas as pd
import seaborn as sns

In [3]:
UNATE_SYNTHESIS_PATH = '../experiments/synthesis/unate/'
DEPS_SYNTHESIS_PATH = '../experiments/synthesis/dependency/'
# UNATE_AND_DEPS_SYNTHESIS_PATH = '../experiments/synthesis/unates_and_dependency/'
PLAIN_SYNTHESIS_PATH = '../experiments/synthesis/plain/'

In [4]:
class Status(Enum):
    ERROR = "Error"
    SUCCESS = "Success"
    NOT_FOUND = "Not Found"
    TIMEOUT = "Timeout"
    
    def __repr__(self):
        return str(self.value)
    
    def __str__(self):
        return str(self.value)

In [5]:
def get_all_benchmarks(path):
    hoa_files = glob(os.path.join(path, "*.hoa"))
    names = [
        Path(f).stem
        for f in hoa_files
    ]
    return names

print("Total benchmark with Unates Synthesis" , len(get_all_benchmarks(UNATE_SYNTHESIS_PATH)))
print("Total benchmark with Deps Synthesis" , len(get_all_benchmarks(DEPS_SYNTHESIS_PATH)))
# print("Total benchmark with Unates & Deps Synthesis" , len(get_all_benchmarks(UNATE_AND_DEPS_SYNTHESIS_PATH)))
print("Total benchmark with Plain Synthesis" , len(get_all_benchmarks(PLAIN_SYNTHESIS_PATH)))

Total benchmark with Unates Synthesis 202
Total benchmark with Deps Synthesis 202
Total benchmark with Plain Synthesis 202


In [6]:
def extract_vars_from_str(x):
    if len(x) == 0:
        return []
    return x.split(",")

In [7]:
def get_benchmark_name(idx):
    path = "../tools/scripts/benchmarks/{}.txt".format(idx)
    with open(path, 'r') as file:
        name = file.readline().replace("\n", "")
    return name
print(get_benchmark_name(2))

EnemeyModule


In [8]:
def benchmark_loader(path: str, idx: str):
    base = {
        'id': idx,
        'Name': get_benchmark_name(idx),
    }
    
    hoa_path = os.path.join(path, idx + ".hoa")
    json_path = os.path.join(path, idx + ".json")
    
    if not os.path.exists(hoa_path):
        return {
            **base,
            'Status': Status.NOT_FOUND
        }
        
    if not os.path.exists(json_path):
        with open(hoa_path, 'r') as f:
            content = f.readlines()
            return {
                **base,
                'Status': Status.ERROR,
                'Error': content[0]
                
            }
    
    with open(json_path, 'r') as f:
        content = f.readlines()
        try:
            benchmark_json = json.loads(content[0])
        except Exception as e:
            return {
                **base,
                'Status': Status.ERROR,
                'Error': content
            }
    
    # Generic Benchmark Name
    output_vars = benchmark_json['output_vars']
    is_completed = benchmark_json['is_completed']
    total_duration = benchmark_json['total_time']
    
    if not is_completed:
        return {
            **base,
            'Status': Status.TIMEOUT,
            'Is Completed': is_completed,
            'Total Duration': total_duration
        }
    
    # Automaton Build
    is_automaton_build = benchmark_json['automaton']['is_built']
    automaton_build_duration = benchmark_json['automaton']['build_duration']
    total_states = benchmark_json['automaton']['prune_total_states']
    total_edges = benchmark_json['automaton']['total_edges']
    total_output_vars = len(benchmark_json['output_vars'])
    
    # Synthesis
    realizability = benchmark_json['synthesis']['independent_strategy']['realizability']
    indeps_synthesis_duration = benchmark_json['synthesis']['independent_strategy']['duration']
    deps_synthesis_duration = benchmark_json['synthesis']['dependent_strategy']['duration']
    
    # Unateness
    skipped_unate = benchmark_json['unate']['skipped_unate']
    if skipped_unate:
        total_unate_duration = None
        automaton_postprocessing_duration = None
        removed_states = None
        removed_edges = None
    else:
        total_unate_duration = benchmark_json['unate']['total_unate_duration']
        automaton_postprocessing_duration = benchmark_json['unate']['automaton_postprocess_duration']
        removed_states = total_states - benchmark_json['unate']['total_states_after_unate']
        removed_edges = total_edges - benchmark_json['unate']['total_edges_after_unate']

    vars_positive_unates = { var: 0 for var in output_vars }
    vars_negative_unates = { var: 0 for var in output_vars }
    vars_unknown_unates = { var: 0 for var in output_vars }
    state_positive_unates = { state: 0 for state in range(total_states) }
    state_negative_unates = { state: 0 for state in range(total_states) }
    states_not_unates = { state: 0 for state in range(total_states) }
    states_unknown_unates = { state: 0 for state in range(total_states) }
    states_complement_duration = { state: 0 for state in range(total_states) }
    states_complement_failed = { state: None for state in range(total_states) }
    states_removed_edges = { state: 0 for state in range(total_states) }
    states_impacted_edges = { state: 0 for state in range(total_states) }
    
    for state_unateness in benchmark_json['unate'].get('unate_states',[]):
        state = state_unateness['state']
        state_positive_vars = extract_vars_from_str(state_unateness['positive_unate_variables'])
        state_negative_vars = extract_vars_from_str(state_unateness['negative_unate_variables'])
        state_not_unates = extract_vars_from_str(state_unateness['not_unate_variables'])
        state_unknown_unates = extract_vars_from_str(state_unateness['unknown_unate_variables'])
        assert set(state_positive_vars).isdisjoint(set(state_negative_vars))
        
        states_removed_edges[state] = state_unateness['removed_edges']
        states_impacted_edges[state] = state_unateness['impacted_edges']
        
        states_complement_duration[state] = state_unateness['complement_duration']
        states_complement_failed[state] = not state_unateness['complement_succeeded']
        
        for var in state_positive_vars:
            vars_positive_unates[var] += 1
            state_positive_unates[state] += 1
        
        for var in state_negative_vars:
            vars_negative_unates[var] += 1
            state_negative_unates[state] += 1
            
        for var in state_unknown_unates:
            vars_unknown_unates[var] += 1
            states_unknown_unates[state] += 1
    
    # Dependency
    skipped_dependency = benchmark_json['dependency']['skipped_dependencies']
    find_dependency_duration = benchmark_json['dependency']['total_duration']
    total_dependent_vars = 0
    total_independent_vars = 0
    for tested_dependency in benchmark_json["dependency"]['tested_dependencies']:
        is_dependent = tested_dependency['is_dependent']
        if is_dependent:
            total_dependent_vars += 1
        else:
            total_independent_vars += 1
    
    return {
        # Group 1: General
        **base,
        'Status': Status.SUCCESS,
        'Is Completed': is_completed,
        'Total Duration': total_duration,
        'Realizability': realizability,
        'Applied Unate': not skipped_unate,
        'Applied Dependency': not skipped_dependency,
        'Independent Strategy Gates': benchmark_json['synthesis']['independent_strategy']['total_gates'],
        'Independent Synthesis Duration': indeps_synthesis_duration,
        'Dependent Synthesis Duration': deps_synthesis_duration,
        
        # Group 2: Automaton
        'Is Automaton Built': is_automaton_build,
        'Automaton Build Duration': automaton_build_duration,
        'Total Output Vars': total_output_vars,
        'Original Total States': total_states,
        'Original Total Edges': total_edges,
        
        # Unate Impactness
        'Total Unate Duration': total_unate_duration,
        'Automaton Postprocessing Duration': automaton_postprocessing_duration,
        'Total Impacted Edges': sum(states_impacted_edges.values()),
        'Total Removed Edges': sum(states_removed_edges.values()),

        # Group 3: Unates by States
        'States Failed by Complement': sum(1 for val in states_complement_failed.values() if val),
        'Total Complement Duration': sum(val for val in states_complement_duration.values()),
        'Total State with Positive Unates': sum(1 for val in state_positive_unates.values() if val > 0),
        'Total State with Negative Unates': sum(1 for val in state_negative_unates.values() if val > 0),
        'Total State with Unknown Unates': sum(1 for val in states_unknown_unates.values() if val > 0),
        
        # Group 4: Unates by Vars
        'Total Positive Unates Vars (At least 1)': sum(1 for val in vars_positive_unates.values() if val > 0),
        'Total Negative Unates Vars (At least 1)': sum(1 for val in vars_negative_unates.values() if val > 0),
        'Total Positive Unates Vars (All)': sum(1 for val in vars_positive_unates.values() if val == total_states),
        'Total Negative Unates Vars (All)': sum(1 for val in vars_negative_unates.values() if val == total_states),
        
        # Dependency
        'Total Dependent Variables': total_dependent_vars,
        'Total Independent Variables': total_independent_vars,
        'Dependency Ratio': total_dependent_vars / total_output_vars,
        'Find Dependency Duration': find_dependency_duration,
        'Find Dependency Completed': find_dependency_duration != -1,
    }

In [9]:
unates_synthesis_df = pd.DataFrame([
    benchmark_loader(UNATE_SYNTHESIS_PATH, name)
    for name in get_all_benchmarks(UNATE_SYNTHESIS_PATH)
])
unates_synthesis_df.sort_values(by=['Name'], ascending=True, inplace=True)
unates_synthesis_df
unates_synthesis_df.to_csv('./unates.csv', index=False)

In [10]:
dependents_synthesis_df = pd.DataFrame([
    benchmark_loader(DEPS_SYNTHESIS_PATH, name)
    for name in get_all_benchmarks(DEPS_SYNTHESIS_PATH)
])
dependents_synthesis_df.head(5)
dependents_synthesis_df.to_csv('./dependents.csv', index=False)

In [11]:
plain_synthesis_df = pd.DataFrame([
    benchmark_loader(PLAIN_SYNTHESIS_PATH, name)
    for name in get_all_benchmarks(PLAIN_SYNTHESIS_PATH)
])
plain_synthesis_df.sort_values(by=['Name'], ascending=True, inplace=True)
plain_synthesis_df
plain_synthesis_df.to_csv('./plain.csv', index=False)

In [151]:
# unates_and_deps_synthesis_df = pd.DataFrame([
#     benchmark_loader(UNATE_AND_DEPS_SYNTHESIS_PATH, name)
#     for name in get_all_benchmarks(UNATE_AND_DEPS_SYNTHESIS_PATH)
# ])
# unates_and_deps_synthesis_df.head(5)
# unates_and_deps_synthesis_df.to_csv('./unates_and_deps.csv', index=False)

# Sanity Checks

In [12]:
unates_synthesis_df[unates_synthesis_df["Total Removed Edges"] > 0]

Unnamed: 0,id,Name,Status,Is Completed,Total Duration,Realizability,Applied Unate,Applied Dependency,Independent Strategy Gates,Independent Synthesis Duration,...,Total Positive Unates Vars (At least 1),Total Negative Unates Vars (At least 1),Total Positive Unates Vars (All),Total Negative Unates Vars (All),Total Dependent Variables,Total Independent Variables,Dependency Ratio,Find Dependency Duration,Find Dependency Completed,Error


## Automaton Build Duration

In [13]:
# automaton_build_duration_df = pd.merge(
#     pd.merge(
#         plain_synthesis_df[["Name", "Automaton Build Duration"]],
#         unates_and_deps_synthesis_df[["Name", "Automaton Build Duration"]],
#         suffixes=('_plain', '_all'),
#         on='Name'
#     ),
#     pd.merge(
#         unates_synthesis_df[["Name", "Automaton Build Duration"]],
#         dependents_synthesis_df[["Name", "Automaton Build Duration"]],
#         suffixes=('_unates', '_deps'),
#         on='Name'
#     ),
#     on='Name'
# )
# automaton_build_duration_df
# automaton_build_duration_df.to_csv('./automaton-build-duration.csv')

automaton_build_duration_df = pd.merge(
    plain_synthesis_df[["Name", "Automaton Build Duration"]],
    dependents_synthesis_df[["Name", "Automaton Build Duration"]],
    suffixes=(' Dependencies', ' unates'),
    how="outer",
    on='Name'
)
automaton_build_duration_df

Unnamed: 0,Name,Automaton Build Duration Dependencies,Automaton Build Duration unates
0,01,5.0,5.0
1,02,13.0,13.0
2,03,85.0,83.0
3,05,882.0,1021.0
4,06,,
...,...,...,...
197,ltl2dpa20,4.0,
198,ltl2dpa21,3.0,
199,ltl2dpa22,,
200,ltl2dpa23,1.0,


## No Realiability Contradication

In [17]:
# realizability_df = pd.merge(
#     pd.merge(
#         plain_synthesis_df[["Name", "Original Total States", "Total Output Vars", "Realizability"]],
#         unates_and_deps_synthesis_df[["Name", "Realizability"]],
#         suffixes=('_plain', '_all'),
#         on='Name'
#     ),
#     pd.merge(
#         unates_synthesis_df[["Name", "Realizability", "Total Impacted Edges", ]],
#         dependents_synthesis_df[["Name", "Realizability", "Total Dependent Variables"]],
#         suffixes=('_unates', '_deps'),
#         on='Name'
#     ),
#     on='Name'
# )
# realizability_df

realizability_df = pd.merge(
    plain_synthesis_df[["Name", "Realizability"]],
    dependents_synthesis_df[["Name", "Realizability"]],
    suffixes=('_plain', '_dependents'),
    on='Name'
)
realizability_df

Unnamed: 0,Name,Realizability_plain,Realizability_dependents
0,01,REALIZABLE,REALIZABLE
1,02,REALIZABLE,REALIZABLE
2,03,REALIZABLE,REALIZABLE
3,05,REALIZABLE,REALIZABLE
4,06,,
...,...,...,...
197,ltl2dpa20,REALIZABLE,
198,ltl2dpa21,REALIZABLE,
199,ltl2dpa22,,
200,ltl2dpa23,REALIZABLE,


In [18]:
common_realizable = realizability_df[(realizability_df["Realizability_plain"] == "REALIZABLE") & (realizability_df["Realizability_dependents"] == "REALIZABLE")]
len(common_realizable)

81

In [19]:
not_common_realizable = realizability_df[(realizability_df["Realizability_plain"] == "REALIZABLE") & (realizability_df["Realizability_dependents"] != "REALIZABLE")]
not_common_realizable

Unnamed: 0,Name,Realizability_plain,Realizability_dependents
12,ActionConverter,REALIZABLE,
14,Automata16S,REALIZABLE,
15,Automata32S,REALIZABLE,
16,Button,REALIZABLE,
19,EscalatorBidirectional,REALIZABLE,
21,EscalatorCounting,REALIZABLE,
22,EscalatorCountingInit,REALIZABLE,
27,Gamemodule,REALIZABLE,
29,KitchenTimerV0,REALIZABLE,
31,KitchenTimerV10,REALIZABLE,


In [157]:
rows_not_in_plain = unates_synthesis_df[~unates_synthesis_df['Name'].isin(plain_synthesis_df['Name'])]
rows_not_in_plain

Unnamed: 0,id,Name,Status,Is Completed,Total Duration,Realizability,Applied Unate,Applied Dependency,Independent Strategy Gates,Independent Synthesis Duration,...,Total Positive Unates Vars (At least 1),Total Negative Unates Vars (At least 1),Total Positive Unates Vars (All),Total Negative Unates Vars (All),Total Dependent Variables,Total Independent Variables,Dependency Ratio,Find Dependency Duration,Find Dependency Completed,Error


## All DataFrame has same order

In [158]:
def verify_name_order(*dataframes):
    for i in range(1, len(dataframes)):
        if not dataframes[i - 1]['Name'].equals(dataframes[i]['Name']):
            return False
    return True

In [159]:
verify_name_order(
    # unates_and_deps_synthesis_df,
    plain_synthesis_df,
    # dependents_synthesis_df,
    unates_synthesis_df
)

True