In [1]:
from z3 import *

from verification_oracle import verification_oracle


z3_x = Int('x')
z3_y = Int('y')

z3_input_variable_list = [z3_x, z3_y]
z3_function_declaration = max2 = Function('max2', IntSort(), IntSort(), IntSort())
z3_constraint = And(max2(z3_x, z3_y) >= z3_x, max2(z3_x, z3_y) >= z3_y, Or(max2(z3_y, z3_x) == z3_x, max2(z3_y, z3_x) == z3_y))

verification_oracle_instance = verification_oracle(z3_input_variable_list, z3_function_declaration, z3_constraint)
next(verification_oracle_instance)

In [2]:
from sys import stderr

import numpy as np
from sympy import *

from recursive_default_dict import RecursiveDefaultDict


def calculate_entropy(
    subset_of_input_set,
    term_cover
):
    number_of_inputs_covered_by_terms_in_subset_of_input_set = [
        len(term_cover_of_term & subset_of_input_set)
        for term_cover_of_term in term_cover.values()
    ]

    probabilities_of_inputs_in_subset_of_input_set_labeled_by_terms = np.zeros((
        len(subset_of_input_set), len(term_cover)
    ))
    for i, input_in_subset_of_input_set in enumerate(subset_of_input_set):
        # Fill row (with add one smoothing)
        for j, (term, term_cover_of_term) in enumerate(term_cover.items()):
            if input_in_subset_of_input_set in term_cover_of_term:
                probabilities_of_inputs_in_subset_of_input_set_labeled_by_terms[i, j] = number_of_inputs_covered_by_terms_in_subset_of_input_set[j] + 1
            else:
                probabilities_of_inputs_in_subset_of_input_set_labeled_by_terms[i, j] = 1
        
        # Normalize row (with add one smoothing)
        probabilities_of_inputs_in_subset_of_input_set_labeled_by_terms[i, :] /= (np.sum(probabilities_of_inputs_in_subset_of_input_set_labeled_by_terms[i, :]) + len(term_cover))

    # Calculate the (equally weighted) sum of all rows, and normalize
    probabilities_of_any_input_in_subset_of_input_set_labeled_by_terms = np.sum(probabilities_of_inputs_in_subset_of_input_set_labeled_by_terms, axis=0)
    probabilities_of_any_input_in_subset_of_input_set_labeled_by_terms /= np.sum(probabilities_of_any_input_in_subset_of_input_set_labeled_by_terms)

    # Calculate entropy
    entropy = -np.sum(probabilities_of_any_input_in_subset_of_input_set_labeled_by_terms * np.log2(probabilities_of_any_input_in_subset_of_input_set_labeled_by_terms))

    return entropy


def calculate_information_gain(
    predicate,
    input_set,
    term_cover,
    predicate_cover
):
    predicate_cover_of_predicate = predicate_cover[predicate]
    other_points = input_set - predicate_cover_of_predicate

    return (len(predicate_cover_of_predicate) * calculate_entropy(predicate_cover_of_predicate, term_cover) + len(other_points) * calculate_entropy(other_points, term_cover)) / len(input_set)


class EmptyPredicateSetError(Exception):
    pass


def construct_decision_tree(
    input_set,
    term_cover,
    predicate_set,
    predicate_cover
):
    print('construct_decision_tree', input_set, term_cover, predicate_set, predicate_cover, file=stderr)
    
    terms_covering_all_inputs_list = [
        term
        for term, term_cover_of_term
        in term_cover.items()
        if input_set.issubset(term_cover_of_term)
    ]

    print('terms_covering_all_inputs_list', terms_covering_all_inputs_list, file=stderr)

    if terms_covering_all_inputs_list:
        return terms_covering_all_inputs_list.pop()
    
    if not predicate_set:
        raise EmptyPredicateSetError
    
    predicate_list = []
    information_gain_list = []

    for predicate in predicate_set:
        predicate_list.append(predicate)
        information_gain_list.append(calculate_information_gain(predicate, input_set, term_cover, predicate_cover))
    
    argmax = np.nanargmax(information_gain_list)
    predicate = predicate_list[argmax]

    print('predicate_list:', predicate_list, file=stderr)
    print('information_gain_list:', information_gain_list, file=stderr)

    left_subtree_input_set = predicate_cover[predicate]
    left_subtree_term_cover = {
        term: left_subtree_input_set & term_cover_of_term
        for term, term_cover_of_term
        in term_cover.items()
    }
    left_subtree_predicate_set = predicate_set - {predicate}
    left_subtree_predicate_cover = {
        left_subtree_predicate: left_subtree_input_set & predicate_cover[left_subtree_predicate]
        for left_subtree_predicate in left_subtree_predicate_set
    }
    left_subtree = construct_decision_tree(
        left_subtree_input_set,
        left_subtree_term_cover,
        left_subtree_predicate_set,
        left_subtree_predicate_cover
    )

    right_subtree_input_set = input_set - predicate_cover[predicate]
    right_subtree_term_cover = {
        term: right_subtree_input_set & term_cover_of_term
        for term, term_cover_of_term
        in term_cover.items()
    }
    right_subtree_predicate_set = predicate_set - {predicate}
    right_subtree_predicate_cover = {
        right_subtree_predicate: right_subtree_input_set & predicate_cover[right_subtree_predicate]
        for right_subtree_predicate in right_subtree_predicate_set
    }
    right_subtree = construct_decision_tree(
        right_subtree_input_set,
        right_subtree_term_cover,
        right_subtree_predicate_set,
        right_subtree_predicate_cover
    )

    return (predicate, left_subtree, right_subtree)

```
def divide_and_conquer_enumerative_approach(
    non_terminals,
    terminals,
    production_rules,
    term_non_terminal,
    predicate_non_terminal,
    inputs_and_outputs
):
    term_iterator = bottom_up_tree_search(non_terminals, terminals, production_rules, term_non_terminal)
    predicate_iterator = bottom_up_tree_search(non_terminals, terminals, production_rules, predicate_non_terminal)

    input_set = { input_ for input_, output in inputs_and_outputs }

    term_cover = dict()

    predicate_set = set()
    predicate_cover = dict()

    def generate_and_handle_new_term():
        nonlocal term_iterator, inputs_and_outputs, term_cover

        # Generate new term
        term = next(term_iterator)

        # Update term_cover
        term_cover_of_term = set()
        for (input_, output) in inputs_and_outputs:
            if term.subs(input_) == output:
                term_cover_of_term.add(input_)
        
        term_cover[term] = term_cover_of_term
    
    def generate_and_handle_new_predicate():
        nonlocal predicate_iterator, inputs_and_outputs, predicate_set, predicate_cover

        # Generate new predicate
        predicate = next(predicate_iterator)

        # Add to predicate_set
        predicate_set.add(predicate)

        # Update predicate_cover
        predicate_cover_of_predicate = set()
        for (input_, output) in inputs_and_outputs:
            if predicate.subs(input_):
                predicate_cover_of_predicate.add(input_)
        
        predicate_cover[predicate] = predicate_cover_of_predicate


    while set().union(*term_cover.values()) != input_set:
        generate_and_handle_new_term()
    
    print('term_cover:', term_cover, file=stderr)
    
    while True:
        generate_and_handle_new_term()
        generate_and_handle_new_predicate()

        print('term_cover:', term_cover, file=stderr)
        print('predicate_cover:', predicate_cover, file=stderr)

        # Construct a Decision Tree and yield
        try:
            yield construct_decision_tree(input_set, term_cover, predicate_set, predicate_cover)
        except Exception as e:
            # Write the class of the exception and the message of the exception to stderr
            print(type(e).__name__, str(e), file=stderr)

```

In [3]:
from sympy.core.add import Add
from sympy.core.function import Function
from sympy.core.mul import Mul
from sympy.core.numbers import Integer
from sympy.core.relational import Equality, GreaterThan, LessThan
from sympy.core.symbol import Symbol
from sympy.logic.boolalg import And, Or, Not
from sympy.functions.elementary.piecewise import ExprCondPair, Piecewise


S = Symbol('S')
B = Symbol('B')
x = Symbol('x')
y = Symbol('y')

non_terminals = { S, B }
terminals = { x, y }
# non_terminals to a set of tuples containing the production_rule in reverse_polish_notation
production_rules = {
  S: {
    # x
    (x,),
    # y
    (y,),
    # 0
    (Integer(0),),
    # 1
    (Integer(1),),
    # + S S
    (S, S, (Add, 2)),
    # - S S
    (S, Integer(-1), S, (Mul, 2), (Add, 2)),
    # ite B S S
    (S, B, (ExprCondPair, 2), S, True, (ExprCondPair, 2), (Piecewise, 2)),
  },
  B: {
    # and B B
    (B, B, (And, 2)),
    # or B B
    (B, B, (Or, 2)),
    # not B
    (B, (Not, 1)),
    # <= S S
    (S, S, (LessThan, 2)),
    # = S S
    (S, S, (Equality, 2)),
    # >= S S
    (S, S, (GreaterThan, 2)),
  }
}
start_symbol = S

function_declaration = max2 = Function('max2')
constraint = And(GreaterThan(max2(x, y), x), GreaterThan(max2(x, y), y), Or(Equality(max2(y, x), x), Equality(max2(y, x), y)))

In [4]:
from bottom_up_tree_search import *

term_iterator = enumeration(non_terminals, terminals, production_rules, start_symbol)
predicate_iterator = enumeration(non_terminals, terminals, production_rules, B)

In [5]:
term_cover = dict()
predicate_set = set()
predicate_cover = dict()

In [6]:
# Updated from verification oracle
counterexample_input_set = set()

In [7]:
def evaluate_candidate_program_on_input(
    function_declaration,
    constraint,
    candidate_program,
    counterexample_input
):
    return replace_function_declaration_in_constraint_with_candidate_program(
        counterexample_input,
        function_declaration,
        constraint,
        candidate_program
    ).subs(counterexample_input)

In [8]:
def generate_and_handle_new_term():
    global function_declaration, constraint, term_iterator, term_cover, counterexample_input_set

    # Generate new term
    term = next(term_iterator)

    # Update term_cover
    term_cover_of_term = set()
    for counterexample_input in counterexample_input_set:
        if evaluate_candidate_program_on_input(
            function_declaration,
            constraint,
            term,
            counterexample_input
        ):
            term_cover_of_term.add(counterexample_input)

    term_cover[term] = term_cover_of_term

In [9]:
def generate_and_handle_new_predicate():
    global function_declaration, constraint, predicate_iterator, predicate_set, predicate_cover, counterexample_input_set

    # Generate new predicate
    predicate = next(predicate_iterator)

    # Add to predicate_set
    predicate_set.add(predicate)

    # Update predicate_cover
    predicate_cover_of_predicate = set()
    for counterexample_input in counterexample_input_set:
        if predicate.subs(counterexample_input):
            predicate_cover_of_predicate.add(counterexample_input)
    
    predicate_cover[predicate] = predicate_cover_of_predicate

```python
    while set().union(*term_cover.values()) != input_set:
        generate_and_handle_new_term()
```

In [10]:
set().union(*term_cover.values())

set()

In [11]:
counterexample_input_set

set()

```python
    while True:
        generate_and_handle_new_term()
        generate_and_handle_new_predicate()

        print('term_cover:', term_cover, file=stderr)
        print('predicate_cover:', predicate_cover, file=stderr)

        # Construct a Decision Tree and yield
        try:
            yield construct_decision_tree(input_set, term_cover, predicate_set, predicate_cover)
        except Exception as e:
            # Write the class of the exception and the message of the exception to stderr
            print(type(e).__name__, str(e), file=stderr)
```

In [12]:
generate_and_handle_new_term()

In [13]:
term_cover

{0: set()}

In [14]:
generate_and_handle_new_predicate()

In [15]:
predicate_cover

{True: set()}

```python
def construct_decision_tree(
    counterexample_input_set,
    term_cover,
    predicate_set,
    predicate_cover
):
    print('construct_decision_tree', counterexample_input_set, term_cover, predicate_set, predicate_cover, file=stderr)
    
    terms_covering_all_inputs_list = [
        term
        for term, term_cover_of_term
        in term_cover.items()
        if counterexample_input_set.issubset(term_cover_of_term)
    ]

    print('terms_covering_all_inputs_list', terms_covering_all_inputs_list, file=stderr)

    if terms_covering_all_inputs_list:
        return terms_covering_all_inputs_list.pop()
    
    if not predicate_set:
        raise EmptyPredicateSetError
    
    predicate_list = []
    information_gain_list = []

    for predicate in predicate_set:
        predicate_list.append(predicate)
        information_gain_list.append(calculate_information_gain(predicate, counterexample_input_set, term_cover, predicate_cover))
    
    argmax = np.nanargmax(information_gain_list)
    predicate = predicate_list[argmax]

    print('predicate_list:', predicate_list, file=stderr)
    print('information_gain_list:', information_gain_list, file=stderr)

    left_subtree_input_set = predicate_cover[predicate]
    left_subtree_term_cover = {
        term: left_subtree_input_set & term_cover_of_term
        for term, term_cover_of_term
        in term_cover.items()
    }
    left_subtree_predicate_set = predicate_set - {predicate}
    left_subtree_predicate_cover = {
        left_subtree_predicate: left_subtree_input_set & predicate_cover[left_subtree_predicate]
        for left_subtree_predicate in left_subtree_predicate_set
    }
    left_subtree = construct_decision_tree(
        left_subtree_input_set,
        left_subtree_term_cover,
        left_subtree_predicate_set,
        left_subtree_predicate_cover
    )

    right_subtree_input_set = counterexample_input_set - predicate_cover[predicate]
    right_subtree_term_cover = {
        term: right_subtree_input_set & term_cover_of_term
        for term, term_cover_of_term
        in term_cover.items()
    }
    right_subtree_predicate_set = predicate_set - {predicate}
    right_subtree_predicate_cover = {
        right_subtree_predicate: right_subtree_input_set & predicate_cover[right_subtree_predicate]
        for right_subtree_predicate in right_subtree_predicate_set
    }
    right_subtree = construct_decision_tree(
        right_subtree_input_set,
        right_subtree_term_cover,
        right_subtree_predicate_set,
        right_subtree_predicate_cover
    )

    return (predicate, left_subtree, right_subtree)
```

In [16]:
terms_covering_all_inputs_list = [
    term
    for term, term_cover_of_term
    in term_cover.items()
    if counterexample_input_set.issubset(term_cover_of_term)
]

In [17]:
terms_covering_all_inputs_list

[0]