In [3]:
from classes import *
from utils import *
import math

from typing import List, Set, Tuple, Any, Dict, DefaultDict, Optional
from collections import defaultdict
import linearize_io_helper as hp

%load_ext nb_black
%load_ext autoreload
%autoreload 2

# make the font of the plots bigger
plt.rcParams.update({'font.size': 14})


The nb_black extension is already loaded. To reload it, use:
  %reload_ext nb_black
The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


<IPython.core.display.Javascript object>

In [None]:
def linearize_io(spec: List[Call], verbose=False):
    sort_by_var: DefaultDict[int, List[Call]] = defaultdict(list)
    false_cases: List[CallCAS] = []
    true_cases: List[CallCAS] = []

    hp.populate_call_bins(spec, sort_by_var, true_cases, false_cases)

    writes = hp.basic_io_checks(sort_by_var)
    if writes is None:
        if verbose:
            print("basic_io_checks failed")
        return False

    if hp.isAny_cas_intersect_write(false_cases, writes):
        raise Exception("Assumption Violation: CAS intersects Write")

    if not hp.basic_true_cas_checks(true_cases):
        if verbose:
            print("basic_true_cas_checks failed")
        return False

    hp.topological_true_cas_sort(true_cases)

    true_cas_var_groups = hp.make_true_cas_var_groups(true_cases)

    if not hp.intra_group_check(sort_by_var, true_cas_var_groups):
        if verbose:
            print("intra_group_check failed")
        return False

    if not hp.inter_group_check(sort_by_var, true_cas_var_groups):
        if verbose:
            print("inter_group_check failed")
        return False

    intervals: Dict[int, I] = hp.make_intervals(sort_by_var)

    if not hp.io_check(intervals):
        if verbose:
            print("io_check failed")
        return False

    blocks = hp.make_blocks(sort_by_var, intervals, true_cas_var_groups)

    false_cas_var_resolver = hp.get_false_cas_resolvers(sort_by_var, false_cases, blocks, writes, intervals)

    if not hp.false_cas_group_check(false_cas_var_resolver, writes):
        if verbose:
            print("false_cas_group_check failed")
        return False

    if verbose:
        print(blocks)
        print({f"{k} ({round(k.start, 2)} - {round(k.end,2)})": v for k, v in false_cas_var_resolver.items()})

    for false_cas in false_cases:
        if len(false_cas_var_resolver[false_cas]) == 0:
            if verbose:
                print("false_cas_var_resolver[false_cas] == 0")
            return False

    # it is linearizable
    for false_cas in false_cases:
        v = false_cas_var_resolver[false_cas].pop()
        sort_by_var[v].append(false_cas)

    hp.set_order(sort_by_var, true_cas_var_groups)

    return True


In [84]:
test = load_test("testio.pkl")

5000it [00:00, 27680.75it/s]


<IPython.core.display.Javascript object>

In [None]:
def run_test(testsample: List[Tuple[List[Call], bool]]):
    wrong_test_no = []
    for i in tqdm.tqdm(range(len(testsample))):
        testcase, res = copy.deepcopy(testsample[i])
        try:
            sol = linearize_io(testcase)
            s2 = (sol is True)
            if res ^ s2:
                wrong_test_no.append(i)

        except AssertionError as e:
            raise e

    print(f"Tests failed: {len(wrong_test_no)}")
    if len(wrong_test_no) == 0:
        print("All tests passed")
    else:
        print(f"First failed: {str(wrong_test_no[:10]).strip('[]')} ...")
    
    return wrong_test_no

In [4]:
test = load_test("6.pkl")
test *= 1
for testcase, res in tqdm.tqdm(test):
    linearize_generic(testcase, StateIO())

for testcase, res in tqdm.tqdm(test):
    linearize_io(testcase)


1000it [00:00, 20484.80it/s]
100%|██████████| 1000/1000 [00:52<00:00, 18.95it/s]
100%|██████████| 1000/1000 [00:00<00:00, 11566.19it/s]


<IPython.core.display.Javascript object>

In [None]:
wrong_test_no = run_test(test)

In [None]:
i = 725564
testcase, res = copy.deepcopy(test[i])
print(f"Testcase {i} is {res}")
print(linearize_io(testcase, verbose=False))
# sol = linearize_generic(testcase, StateIO())
# visualize_history(testcase)
visualize_history(testcase)

In [None]:
testcase, res = copy.deepcopy(test[i])
print(f"Testcase {i} is {res}")
visualize_history(testcase)

In [None]:
for wrong_test in no[:10]:
    testcase, res = copy.deepcopy(test[wrong_test])
    print(f"Testcase {wrong_test} is {res}")
    print(linearize_io(testcase, verbose=True))
    visualize_history(testcase)