# Playground for Mutation Testing with Execution Taints

This jupyter notebook reimplements our prototype library for execution taints for python.
As multiprocessing is limited in jupyter notebooks only the non-forking version is shown.

We start by importing a few prerequisites.

In [None]:
import types
from typing import Any, Optional, Union, TypeVar, Tuple, Callable, Dict, Iterable
from copy import deepcopy
from itertools import chain
import traceback
from functools import wraps

We also use a logger for better messaging.

In [None]:
import logging
logger = logging.getLogger()

### Constants and Exceptions
#### MAINLINE

The _MAINLINE_ execution is the path taken by the original program. We define `MAINLINE` to be `0`.

In [None]:
MAINLINE = 0

#### PRIMITIVE_TYPES

This is used to decide what return values should be untainted when returning from a function.

In [None]:
PRIMITIVE_TYPES = [bool, int, float]

### Exceptions

#### ShadowExceptionStop
No more mutants alive, stop this execution

In [None]:
class ShadowExceptionStop(Exception): pass

#### ShadowException
Wraps a exception that happened during the run.

In [None]:
class ShadowException(Exception): pass

### Active and Killed Mutants

In [None]:
_LOGICAL_PATH = MAINLINE
_SELECTED_MUTANT: Union[int, None] = None
_STRONGLY_KILLED: set[int] = set()
_SEEN_MUTANTS: set[int] = set()
_FUNCTION_SEEN_MUTANTS: set[int] = set()
_FUNCTION_MASKED_MUTANTS: set[int] = set()
_MASKED_MUTANTS: set[int] = set()

def reinit_path(logical_path: Union[int, None]) -> None:
    global _LOGICAL_PATH
    global _SELECTED_MUTANT
    global _STRONGLY_KILLED
    global _NS_ACTIVE_MUTANTS
    global _SEEN_MUTANTS
    global _FUNCTION_SEEN_MUTANTS
    global _FUNCTION_MASKED_MUTANTS
    global _MASKED_MUTANTS
    global _SHARED_RESULTS_PATH

    if logical_path is not None:
        _LOGICAL_PATH = logical_path
    else:
        _LOGICAL_PATH = MAINLINE

    _SELECTED_MUTANT = None
    _STRONGLY_KILLED = set()

    _SEEN_MUTANTS = set()
    _FUNCTION_SEEN_MUTANTS = set()
    _FUNCTION_MASKED_MUTANTS = set()
    _MASKED_MUTANTS = set()

def get_logical_path() -> int:
    return _LOGICAL_PATH

def set_logical_path(path: int) -> None:
    global _LOGICAL_PATH
    _LOGICAL_PATH = path

def active_mutants() -> set[int]:
    global _SEEN_MUTANTS
    global _MASKED_MUTANTS
    return _SEEN_MUTANTS - _MASKED_MUTANTS

def try_next_logical_path() -> None:
    cur_active_mutants = active_mutants()
    if cur_active_mutants:
        set_logical_path(cur_active_mutants.pop())
    else:
        raise ShadowExceptionStop()

def add_strongly_killed(mut: int) -> None:
    global _MASKED_MUTANTS

    if mut in _STRONGLY_KILLED:
        logger.warning(f"redundant strongly killed: {mut}")
        assert False

    _MASKED_MUTANTS.add(mut)
    _STRONGLY_KILLED.add(mut)

    if get_logical_path() == mut:
        try_next_logical_path()


def get_selected_mutant() -> Union[int, None]:
    return _SELECTED_MUTANT

def set_selected_mutant(mut: Union[int, None]):
    global _SELECTED_MUTANT
    _SELECTED_MUTANT = mut

def get_strongly_killed() -> set[int]:
    return _STRONGLY_KILLED


def merge_strongly_killed(killed: set[int]):
    global _STRONGLY_KILLED
    _STRONGLY_KILLED |= killed


def get_seen_mutants() -> set[int]:
    return _SEEN_MUTANTS


def get_function_seen() -> set[int]:
    return _FUNCTION_SEEN_MUTANTS


def get_function_masked() -> set[int]:
    return _FUNCTION_MASKED_MUTANTS


def add_function_seen_mutants(muts: set[int]):
    global _FUNCTION_SEEN_MUTANTS
    _FUNCTION_SEEN_MUTANTS |= muts


def reset_function_seen() -> set[int]:
    global _FUNCTION_SEEN_MUTANTS
    _FUNCTION_SEEN_MUTANTS.clear()


def reset_function_masked() -> set[int]:
    global _FUNCTION_MASKED_MUTANTS
    _FUNCTION_MASKED_MUTANTS.clear()


def add_seen_mutants(muts: set[int]):
    global _SEEN_MUTANTS
    global _FUNCTION_SEEN_MUTANTS
    _SEEN_MUTANTS |= muts
    _FUNCTION_SEEN_MUTANTS |= muts


def get_masked_mutants() -> set[int]:
    return _MASKED_MUTANTS


def set_masked_mutants(masked: set[int]) -> None:
    global _MASKED_MUTANTS
    _MASKED_MUTANTS = masked


def add_masked_mutants(masked: set[int]):
    global _MASKED_MUTANTS
    global _FUNCTION_MASKED_MUTANTS
    _MASKED_MUTANTS |= masked
    _FUNCTION_MASKED_MUTANTS |= masked


def remove_masked_mutants(masked: set[int]):
    global _MASKED_MUTANTS
    _MASKED_MUTANTS -= masked


def t_get_killed() -> dict[str, Any]:
    res = {
        'strong': _STRONGLY_KILLED,
        'masked': _MASKED_MUTANTS,
        'seen': _SEEN_MUTANTS,
    }
    return res

### shadow_get()

When we want something from the shadow, if it is not there, get it from mainline.

In [None]:
def shadow_get(shadow, path) -> Any:
    if path in shadow:
        return shadow[path]
    elif MAINLINE in shadow:
        return shadow[MAINLINE]
    else:
        raise ValueError()

### The ShadowVariable Class

The `ShadowVariable` class is used to manage execution taints for variables of all types, be it primitive types, container, or objects created from wrapped classes.
The basic approach to create the execution traces, implemented by this class, is to create a independent copy of the variables for each stored taint. Accesses to the taint values, through attributes, methods or directly, are passed through to each wrapped taint value, the then resulting values for each access are combined in a ShadowVariable.

In [None]:
class ShadowVariable:
    _shadow: dict[int, Any]
    __slots__ = ['_shadow']

There are two ways to initialize a shadow variable. One where we produce a new shadow variable from a new object, and the second, when we produce a new shadow variable from a given mapping.

In [None]:
class ShadowVariable(ShadowVariable):
    def __init__(self, values: Any, from_mapping: bool):
        if not from_mapping:
            self._shadow = self._init_from_object(values)
        else:
            self._shadow = self._init_from_mapping(values)

In [None]:
SV = TypeVar('SV', bound="ShadowVariable")

#### Dunder methods
##### Allowed DUNDER

In [None]:
ALLOWED_UNARY_DUNDER_METHODS = {
    '__abs__':   lambda args, kwargs, a, _: abs(a, *args, **kwargs),
    '__round__': lambda args, kwargs, a, _: round(a, *args, **kwargs),
    '__neg__':   lambda args, kwargs, a, _: -a,
    '__len__':   lambda args, kwargs, a, _: len(a),
    '__index__': lambda args, kwargs, a, _: a.__index__(),
}

ALLOWED_BOOL_DUNDER_METHODS = {
    '__add__':        lambda args, kwargs, a, b: a + b,
    '__sub__':        lambda args, kwargs, a, b: a - b,
    '__truediv__':    lambda args, kwargs, a, b: a / b,
    '__floordiv__':   lambda args, kwargs, a, b: a // b,
    '__mul__':        lambda args, kwargs, a, b: a * b,
    '__pow__':        lambda args, kwargs, a, b: a ** b,
    '__mod__':        lambda args, kwargs, a, b: a % b,
    '__and__':        lambda args, kwargs, a, b: a & b,
    '__or__':         lambda args, kwargs, a, b: a | b,
    '__xor__':        lambda args, kwargs, a, b: a ^ b,
    '__lshift__':     lambda args, kwargs, a, b: a << b,
    '__rshift__':     lambda args, kwargs, a, b: a >> b,
    '__eq__':         lambda args, kwargs, a, b: a == b,
    '__ne__':         lambda args, kwargs, a, b: a != b,
    '__le__':         lambda args, kwargs, a, b: a <= b,
    '__lt__':         lambda args, kwargs, a, b: a < b,
    '__ge__':         lambda args, kwargs, a, b: a >= b,
    '__gt__':         lambda args, kwargs, a, b: a > b,

    # same methods but also do the reversed side
    '__radd__':       lambda args, kwargs, b, a: a + b,
    '__rsub__':       lambda args, kwargs, b, a: a - b,
    '__rtruediv__':   lambda args, kwargs, b, a: a / b,
    '__rfloordiv__':  lambda args, kwargs, b, a: a // b,
    '__rmul__':       lambda args, kwargs, b, a: a * b,
    '__rpow__':       lambda args, kwargs, b, a: a ** b,
    '__rmod__':       lambda args, kwargs, b, a: a % b,
    '__rand__':       lambda args, kwargs, b, a: a & b,
    '__ror__':        lambda args, kwargs, b, a: a | b,
    '__rxor__':       lambda args, kwargs, b, a: a ^ b,
    '__rlshift__':    lambda args, kwargs, b, a: a << b,
    '__rrshift__':    lambda args, kwargs, b, a: a >> b,
    '__req__':        lambda args, kwargs, b, a: a == b,
    '__rne__':        lambda args, kwargs, b, a: a != b,
    '__rle__':        lambda args, kwargs, b, a: a <= b,
    '__rlt__':        lambda args, kwargs, b, a: a < b,
    '__rge__':        lambda args, kwargs, b, a: a >= b,
    '__rgt__':        lambda args, kwargs, b, a: a > b,
}

##### Passthrough DUNDER

In [None]:
PASSTHROUGH_DUNDER_METHODS = {
    # '__init__' is already manually defined, after instantiation of SV
    # the passthrough will happen through __getattribute__
    '__iter__',
    '__next__',
    '__setitem__',
    '__getitem__',
}

##### Disallowed DUNDER

We have not implemented these yet, though many will just work if they are moved to the passthrough list.

In [None]:
DISALLOWED_DUNDER_METHODS = [
    '__aenter__', '__aexit__', '__aiter__', '__anext__', '__await__',
    '__bytes__', '__call__', '__cmp__', '__complex__', '__contains__',
    '__delattr__', '__delete__', '__delitem__', '__delslice__',  
    '__enter__', '__exit__', '__fspath__',
    '__get__', '__getslice__', 
    '__import__', '__imul__', 
    '__int__', '__invert__',
    '__ior__', '__ixor__', 
    '__nonzero__',
    '__pos__', '__prepare__', '__rdiv__',
    '__rdivmod__', '__repr__', '__reversed__',
    '__set__',
    '__setslice__', '__sizeof__', '__subclasscheck__', '__subclasses__',
    '__divmod__',
    '__div__',
]

These are not implemented because Python enforces that the specific type is returned in these cases, we cannot override these dunders to return a `ShadowVariable`. Hence, disallow these dunders to avoid accidentally losing taint info

In [None]:
DISALLOWED_DUNDER_METHODS.extend([
    '__bool__', '__float__',

    # ShadowVariable needs to be pickleable for caching to work.
    # so '__reduce__', '__reduce_ex__', '__class__' are implemented.
    # For debugging '__dir__' is not disallowed.
])

##### Ignored DUNDER

In [None]:
LIST_OF_IGNORED_DUNDER_METHODS = [
    '__new__', '__init__', '__init_subclass__', '__instancecheck__', '__getattribute__', 
    '__setattr__', '__str__', '__format__', 
    '__iadd__', '__getnewargs__', '__getnewargs_ex__', '__iand__', '__isub__', 
]

##### Defining DUNDER

Now use the previously defined lambdas to generate wrapped dunder methods for the `ShadowVariable`.

In [None]:
def not_allowed(obj, method, *args, **kwargs) -> None:
    logger.error("{method} %s %s %s", obj, args, kwargs)
    raise NotImplementedError("dunder method {method} is not allowed")

In [None]:
class ShadowVariable(ShadowVariable):
    for method in ALLOWED_UNARY_DUNDER_METHODS.keys():
        exec(f"""
    def {method}(self, *args, **kwargs):
        # assert len(args) == 0 and len(kwargs) == 0, f"{{len(args)}} == 0 and {{len(kwargs)}} == 0"
        return self._do_unary_op("{method}", *args, **kwargs)
        """.strip())

    # self.{method} = lambda other, *args, **kwargs: self._do_bool_op(other, *args, **kwargs)
    for method in ALLOWED_BOOL_DUNDER_METHODS.keys():
        exec(f"""
    def {method}(self, other, *args, **kwargs):
        assert len(args) == 0 and len(kwargs) == 0, f"{{len(args)}} == 0 and {{len(kwargs)}} == 0"
        return self._do_bool_op(other, "{method}", *args, **kwargs)
        """.strip())

    for method in DISALLOWED_DUNDER_METHODS:
        exec(f"""
    def {method}(self, *args, **kwargs):
        not_allowed(self, *args, **kwargs)
        """.strip())

    for method in PASSTHROUGH_DUNDER_METHODS:
        exec(f"""
    def {method}(self, *args, **kwargs):
        return self._callable_wrap("{method}", *args, **kwargs)
        """.strip())
        

##### Invoke DUNDER

We want to invoke the mutated expression safely, that is catching any exceptions and killing the mutation that was responsible for the exception without causing other mutants to fail.

We always execute the mainline first.

In [None]:
class ShadowVariable(ShadowVariable):
    def _do_op_safely(self, paths: Iterable[int],
            left: Callable[..., Any], right: Callable[..., Any],
            args: tuple[Any, ...], kwargs: dict[str, Any], op_func: Any) -> Any:

        fpart = [MAINLINE] if MAINLINE in paths else []
        spart = [p for p in set(paths) if p != MAINLINE and p in active_mutants()]
        
        res = {}
        for k in (*fpart, *spart):
            try:
                k_res = op_func(args, kwargs, left(k), right(k))
            except (ZeroDivisionError, OverflowError, TypeError, ValueError) as e:
                assert k != MAINLINE
                add_strongly_killed(k)
            except Exception as e:
                logger.error(f"Unknown Exception: {e}")
                raise e
            res[k] = k_res
        return res

In [None]:
class ShadowVariable(ShadowVariable):
    def _do_unary_op(self, op: str, *args: tuple[Any, ...], **kwargs: dict[str, Any]) -> SV:
        self_shadow = get_selected(self._shadow)
        res = self._do_op_safely(
            self_shadow.keys(),
            lambda k: self_shadow[k],
            lambda k: None,
            args,
            kwargs,
            ALLOWED_UNARY_DUNDER_METHODS[op],
        )
        return ShadowVariable(res, from_mapping=True)

    def _do_bool_op(self, other, op, *args, **kwargs):
        assert len(args) == 0 and len(kwargs) == 0, f"{args} {kwargs}"
        self_shadow = get_selected(self._shadow)
        if isinstance(other, ShadowVariable):
            other_shadow = get_selected(other._shadow)
            # notice that both self and other has taints.
            # the result we need contains taints from both.
            masked_shadows = get_masked_mutants() - set([MAINLINE])
            common_shadows = {k for k in self_shadow if k in other_shadow} - masked_shadows
            only_self_shadows = self_shadow.keys() - common_shadows - set([MAINLINE]) - masked_shadows
            only_other_shadows = other_shadow.keys() - common_shadows - set([MAINLINE]) - masked_shadows

            if only_self_shadows:
                if MAINLINE not in other_shadow:
                    raise ShadowException()
                other_main = other_shadow[MAINLINE]
                vs_ = self._do_op_safely(
                    only_self_shadows,
                    lambda k: self_shadow[k],
                    lambda k: other_main,
                    args,
                    kwargs,
                    ALLOWED_BOOL_DUNDER_METHODS[op],
                )
            else:
                vs_ = {}

            if only_other_shadows:
                if MAINLINE not in self_shadow:
                    raise ShadowException()
                self_main = self_shadow[MAINLINE]
                vo_ = self._do_op_safely(
                    only_other_shadows,
                    lambda k: self_main,
                    lambda k: other_shadow[k],
                    args,
                    kwargs,
                    ALLOWED_BOOL_DUNDER_METHODS[op],
                )
            else:
                vo_ = {}

            # if there was a pre-existing taint of the same name, this mutation was
            # already executed. So, use that value.
            cs_ = self._do_op_safely(
                common_shadows,
                lambda k: self_shadow[k],
                lambda k: other_shadow[k],
                args,
                kwargs,
                ALLOWED_BOOL_DUNDER_METHODS[op],
            )
            res = {**vs_, **vo_, **cs_}
        else:
            res = self._do_op_safely(
                self_shadow.keys(),
                lambda k: self_shadow[k],
                lambda k: other,
                args,
                kwargs,
                ALLOWED_BOOL_DUNDER_METHODS[op],
            )
        return ShadowVariable(res, from_mapping=True)

#### create_shadow_variable()

In [None]:
class ShadowVariable(ShadowVariable):
    def _from_container(self, obj: Any, converter: Any) ->  dict[int, Any]:
        # handle container values
        combined = {MAINLINE: []}
        for elem in obj:
            if isinstance(elem, ShadowVariable):
                # make a copy for each path that is new
                for path in elem._shadow:
                    if path not in combined:
                        combined[path] = deepcopy(combined[MAINLINE])

                # append the corresponding path value for each known path
                for path in combined:
                    if path in elem._shadow:
                        combined[path].append(elem._shadow[path])
                    else:
                        combined[path].append(elem._shadow[MAINLINE])

            else:
                for elems in combined.values():
                    elems.append(elem)
        # convert each path value back to a converter
        return {path:converter(combined[path]) for path in combined}

    def _from_dict(self, obj: Any) ->  dict[int, Any]:
        if not obj: return {MAINLINE: {}}
    
        # This is a bit tricky as dict can have untainted and ShadowVariables as key and value.
        # A few examples on the left the dict obj and on the right (->) the resulting ShadowVariable (sv):
        # {0: 0}                                       -> sv(0: {0: 0})
        # {0: sv(0: 1, 1: 2)}                          -> sv(0: {0: 1}, 1: {0: 2})
        # {sv(0: 0, 1: 1): 0}                          -> sv(0: {0: 0}, 1: {1: 0})
        # {sv(0: 0, 1: 1, 2: 2): sv(0: 0, 2: 2, 3: 3)} -> sv(0: {0: 0}, 1: {1: 0}, 2: {2: 2}, 3: {0: 3})
        #
        # There can also be multiple key value pairs.
        # {0: 0, sv(0: 1, 2: 2): 2}                    -> sv(0: {0: 0, 1: 2}, 2: {0: 0, 2: 2})

        # First expand all possible combinations for each key value pair.
        all_expanded = []
        for key, data in obj.items():
            # Get all paths for a key, value pair.
            expanded = {}
            if isinstance(key, ShadowVariable):
                key_shadow = key._shadow
                key_paths = set(key_shadow.keys())
            else:
                # If it is an untainted value, that is equivalent to the mainline path.
                key_paths = set([MAINLINE])

            if isinstance(data, ShadowVariable):
                data_shadow = data._shadow
                data_paths = set(data_shadow.keys())
            else:
                data_paths = set([MAINLINE])

            all_paths = key_paths | data_paths

            # Expand each combination.
            for path in all_paths:
                if isinstance(key, ShadowVariable):
                    if path in key_shadow:
                        path_key = key_shadow[path]
                    else:
                        path_key = key_shadow[MAINLINE]
                else:
                    path_key = key

                if isinstance(data, ShadowVariable):
                    if path in data_shadow:
                        path_data = data_shadow[path]
                    else:
                        path_data = data_shadow[MAINLINE]
                else:
                    path_data = data

                expanded[path] = (path_key, path_data)
            all_expanded.append(expanded)

        # Get all paths that are needed in the resulting ShadowVariable.
        combined = {}
        for path in chain(*[paths.keys() for paths in all_expanded]):
            combined[path] = {}
        # Build the resulting path dictionaries from the expanded combinations.
        for expanded in all_expanded:
            for src_path, (key, val) in expanded.items():
                # If the combination is for mainline add it to all dictionaries, if they do not
                # have an alternative value.
                if src_path == MAINLINE:
                    for trg_path, dict_values in combined.items():
                        if trg_path == MAINLINE or (
                            trg_path != MAINLINE and trg_path not in expanded
                        ):
                            assert key not in dict_values
                            dict_values[key] = val
                else:
                    path_dict = combined[src_path]
                    assert key not in path_dict
                    path_dict[key] = val
        return combined
    

In [None]:
class ShadowVariable(ShadowVariable):
    def _init_from_object(self, obj: Any) -> None:
        shadow = {}
        value_type = type(obj)
        # OOS: optionally make sure there are no nested shadow variables in the values
        if isinstance(obj, ShadowVariable):
            # Don't wrap an existing shadow 
            shadow = obj._shadow
        elif value_type is tuple:
            shadow = self._from_container(obj, tuple)
        elif value_type is list:
            shadow = self._from_container(obj, list)
        elif value_type is set:
            shadow = self._from_container(obj, set)
        elif value_type is dict:
            shadow = self._from_dict(obj)
        else:
            shadow[MAINLINE] = obj
        return shadow

In [None]:
class ShadowVariable(ShadowVariable):
    def _init_from_mapping(self, values: Dict[int, Any]) -> Dict[int, Any]:
        assert type(values) is dict
        combined = {}
        if MAINLINE in values:
            # Keep mainline as initial value and add other values from there.
            mainline_val = values[MAINLINE]
            if isinstance(mainline_val, ShadowVariable):
                combined = mainline_val._shadow
            else:
                combined = {MAINLINE: mainline_val}
        else:
            combined = {}

        for mut_id, val in values.items():
            if mut_id == MAINLINE:
                continue

            if isinstance(val, ShadowVariable):
                combined[mut_id] = shadow_get(val._shadow, mut_id)
            else:
                assert mut_id not in combined
                combined[mut_id] = val

        return combined

In [None]:
class ShadowVariable(ShadowVariable):
    def _duplicate_mainline(self, new_path: int) -> None:
        assert new_path not in self._shadow

        mainline_val = self._shadow[MAINLINE]
        # make a copy for all shadow variants that will be needed
        copy = object.__new__(type(mainline_val))

        queue = [(mainline_val, copy, var) for var in dir(mainline_val)]
        while queue:
            cur_main, cur_copy, var_name = queue.pop(0)
            if var_name.startswith('_'):
                continue

            to_be_copied_var = cur_main.__getattribute__(var_name)

            try:
                existing_copy_var = cur_copy.__getattribute__(var_name)
            except AttributeError:
                # Var does not exist yet, just assign it.
                try:
                    cur_copy.__setattr__(var_name, deepcopy(to_be_copied_var))
                except TypeError:
                    raise NotImplementedError("Can not deepcopy variable.")
                # All done with this var.
                continue

            # Skip if is the same bound method as for mainline.
            if callable(existing_copy_var) and hasattr(existing_copy_var, '__self__'):
                assert existing_copy_var.__func__ == to_be_copied_var.__func__
                continue

            # OOS Implement copying of objects where attributes already exist.
            raise NotImplementedError()

        self._shadow[new_path] = copy

In [None]:
class ShadowVariable(ShadowVariable):
    def __hash__(self) -> int:
        """For ShadowVariable wrapped object, there are two contexts where __hash__ can be used.
        One is during normal usage of hash(sv) or sv.__hash__(), the returned value should be a ShadowVariable.
        (Note that the first version checks that the return value is a int, making this impossible.)
        The second usage is during built-in functions such as initialization of set or dict objects. To support these
        objects it is necessary to return an actual hash. (Or alternatively create a substitution class, however,
        this requires a alternative syntax for dictionaries, for example: {key: val} -> ShadowVariableSet([(key, val), ..]).
        These are solvable problems but out of scope.)
        
        Currently only a combined hash of the different path ids and path values is returned and the context where a
        ShadowVariable should be returned is ignored."""
        # OOS: How to detect that __hash__ is called in a context that should return a SV instead of the current implementation?
        return hash(tuple(self._shadow.items()))

Take an object and the name of a method, return the method without having the object associated,
    a simple function that allows changing the 'self' parameter. Also return a boolean that is true if the method is
    built-in and false if not.


In [None]:
def convert_method_to_function(obj: object, method_name: str) -> Tuple[Callable[..., Any], bool]:
    method = obj.__getattribute__(method_name)
    assert callable(method)
    # convert bound method to free standing function, this allows changing the self argument
    if isinstance(method, types.BuiltinFunctionType):
        # If the called method is built there is no direct way to get the
        # underlying function using .__func__ or similar (at least after my research).
        # Instead use the object __getattribute__ on the type, this gives the function instead of the method.
        return object.__getattribute__(type(obj), method_name), True
    elif isinstance(method, types.MethodWrapperType):
        return object.__getattribute__(type(obj), method_name), True
    else:
        return method.__func__, False

In [None]:
class ShadowVariable(ShadowVariable):
    def _callable_wrap(self, name: str, *args: tuple[Any, ...], **kwargs: dict[str, Any]) -> Any:
        log_val = self._get_logical_res(get_logical_path())
        logical_func, is_builtin = convert_method_to_function(log_val, name)
        shadow = self._shadow

        if is_builtin:
            # Need some special handling if __next__ is called, it has influence on control flow.
            # StopIteration is raised once all elements during the iteration have been passed,
            # this can vary if we have different lengths for the wrapped variables.
            method_is_next = name == '__next__'
            if method_is_next:
                next_results: dict[int, bool] = {}

            # The method is a builtin, there can not be any mutations in the builtins.
            # Apply the method to each path value and combine the results into a ShadowValue and return that instead.
            untainted_args = untaint_args(*args, **kwargs)

            all_paths = set(shadow.keys()) | set(untainted_args.keys())
            results = {}
            for path in sorted(all_paths):  # Do mainline first.
                if path != MAINLINE and path not in active_mutants():
                    # Skip inactive mutants.
                    continue

                # Get the applicable path value.
                if path in shadow:
                    initial_path_val = shadow[path]
                else:
                    initial_path_val = log_val

                # Check that path and log would use the same function.
                path_func, _ = convert_method_to_function(initial_path_val, name)
                if path_func != logical_func:
                    raise NotImplementedError()

                # If the path value is the logical/mainline value copy it as it might be used several times.
                # Note that the copy step can change the actual function being called, for example a dict_keyiterator
                # will be turned into a list_iterator. For this reason, avoid copying for __next__.
                if not method_is_next:
                    path_val = deepcopy(initial_path_val)
                else:
                    path_val = initial_path_val

                # Get the arguments for the current path, otherwise use mainline.
                if path in untainted_args:
                    path_args, path_kwargs = deepcopy(untainted_args[path])
                else:
                    path_args, path_kwargs = deepcopy(untainted_args[MAINLINE])


                try:
                    results[path] = logical_func(path_val, *path_args, **path_kwargs)
                except IndexError as e:
                    if name == '__getitem__':
                        if path == MAINLINE:
                            if get_logical_path() == MAINLINE:
                                raise NotImplementedError()

                            # Not on mainline, kill all active mutants that do not have an alternative path here.
                            for mut in active_mutants() - all_paths:
                                add_strongly_killed(mut)
                        else:
                            add_strongly_killed(path)
                        continue
                    else:
                        message = traceback.format_exc()
                        logger.error(f"Error: {e} {message}")
                        raise NotImplementedError()
                except StopIteration as e:
                    if method_is_next:
                        next_results[path] = False
                        continue
                    else:
                        raise NotImplementedError()
                except Exception as e:
                    message = traceback.format_exc()
                    logger.error(f"Error: {message}")
                    raise NotImplementedError()

                if method_is_next:
                    next_results[path] = True

                if initial_path_val != path_val:
                    self._shadow[path] = path_val

            if method_is_next:
                # Handle different lengths of iterables.
                if t_cond(ShadowVariable(next_results, from_mapping=True)):
                    return ShadowVariable(results, from_mapping=True)
                else:
                    raise StopIteration

            return ShadowVariable(results, from_mapping=True)
        
        else:
            # Treat this method as a forkable function call, the self parameter is the ShadowVariable.
            diverging_mutants = []
            companion_mutants = []

            for path, val in self._shadow.items():
                if path == MAINLINE or path == get_logical_path():
                    continue

                val_func, _ = convert_method_to_function(val, name)
                if val_func == logical_func:
                    companion_mutants.append(path)
                else:
                    diverging_mutants.append(path)

            add_masked_mutants(set(diverging_mutants))

            wrapped_fun = t_wrap(logical_func)
            return wrapped_fun(self, *args, **kwargs)
        

In [None]:
class ShadowVariable(ShadowVariable):
    def __repr__(self):
        return f"ShadowVariable({self._shadow})"

    def __getstate__(self) -> dict[int, Any]:
        return self._shadow

    def __setstate__(self, attributes):
        self._shadow = attributes

    def _get_paths(self):
        return self._shadow.keys()

    def _keep_active(self, seen: set[int], masked: set[int]) -> None:
        self._shadow = get_active(self._shadow, seen, masked)

    def _get_logical_res(self, logical_path: int) -> Any:
        if logical_path in self._shadow:
            return self._shadow[logical_path]
        else:
            return self._shadow[MAINLINE]

    def _all_path_results(self, seen_mutants, masked_mutants):
        paths = seen_mutants - masked_mutants - set([MAINLINE])  # self._get_paths()

        if MAINLINE in self._shadow:
            yield MAINLINE, self._shadow[MAINLINE]

        for path in paths:
            yield path, shadow_get(self._shadow, path)

    def _maybe_untaint(self) -> Union[SV, Any]:
        mainline_type = type(self._shadow[MAINLINE])
        if mainline_type not in PRIMITIVE_TYPES: return self
    
        # Only return a untainted version if shadow only contains the 
        # mainline value and that value is a primitive type.
        if len(self._shadow) == 1 and MAINLINE in self._shadow:
            return self._shadow[MAINLINE]

        return self

    def _maybe_overwrite(self, other: Any, seen: set[int], masked: set[int], including_main: bool):
        if isinstance(other, ShadowVariable):
            for path, val in other._all_path_results(seen, masked):
                if path == MAINLINE and not including_main:
                    continue
                if shadow_get(self._shadow, path) != val:
                    self._shadow[path] = val
        elif type(other) is dict:
            assert False, f"merge with type not handled: {other}"
        else:
            if including_main:
                self._shadow[MAINLINE] = val
            for aa in seen - masked:
                assert aa != MAINLINE
                if shadow_get(self._shadow, aa) != other:
                    self._shadow[aa] = val

In [None]:
class ShadowVariable(ShadowVariable):
    def __setattr__(self, name: str, value: Any) -> Any:
        if name.startswith("_"):
            # logger.debug(f"super setattr {name, value}")
            return super(ShadowVariable, self).__setattr__(name, value)

        self_shadow = get_selected(self._shadow)

        # Need to handle ShadowVariable arguments differently than normal ones.
        if isinstance(value, ShadowVariable):
            other_shadow = get_selected(value._shadow)

            # duplicate mainline for all paths that are not already in mainline
            # note that this updates self_shadow, in particular also the keys used during the next _do_op_safely call
            for os in other_shadow:
                if os not in self_shadow:
                    self._duplicate_mainline(os)

            # Assign the respective value for each path separately.
            res = self._do_op_safely(
                self_shadow.keys(),
                lambda k: self_shadow[k],
                lambda k: shadow_get(other_shadow, k),
                tuple(),
                dict(),
                lambda _1, _2, obj, new_val: obj.__setattr__(name, new_val),
            )
        else:
            # Just assign the same value to all paths.
            res = self._do_op_safely(
                self_shadow.keys(),
                lambda k: self_shadow[k],
                lambda _: value,
                tuple(),
                dict(),
                lambda _1, _2, obj, new_val: obj.__setattr__(name, new_val),
            )

        return res

    def __getattribute__(self, name: str) -> Any:
        if name.startswith("_"):
            # __init__ is manually defined for ShadovVariable but can also be later called during usage of ShadowVariable.
            # In the second case we want to call __init__ on the path values instead.
            # This requires a special case here.
            if name != "__init__":
                return super(ShadowVariable, self).__getattribute__(name)
        
        log_res = self._get_logical_res(get_logical_path()).__getattribute__(name)
        
        if callable(log_res):
            # OOS returning a lambda here only works if the callable is only called
            # it does not work if the callable is compared to some other function or other edge cases
            # maybe return a dedicated object instead that raises errors for edge cases / implements them correctly
            return lambda *args, **kwargs: self._callable_wrap(name, *args, **kwargs)
            
        results = {}
        for path, val in self._shadow.items():
            try:
                res = val.__getattribute__(name)
            except:
                raise NotImplementedError()

            results[path] = res

        return ShadowVariable(results, from_mapping=True)


### t_class

#### _NEW_NO_INIT
When a wrapped class is copied init should not be called for the wrapped objects, otherwise there will be infinite recursion, as init is a wrapped function.
This variable indicates if init should be called when creating a new object or not. Also this is needed when loading results for  forks, same reason.

In [None]:
_NEW_NO_INIT = False

In [None]:
def set_new_no_init() -> None:
    global _NEW_NO_INIT
    _NEW_NO_INIT = True


def unset_new_no_init() -> None:
    global _NEW_NO_INIT
    _NEW_NO_INIT = False


def copy_args(args, kwargs):
    set_new_no_init()
    copied = deepcopy((args, kwargs))
    unset_new_no_init()
    return copied

In [None]:
def t_class(orig_class):
    orig_new = orig_class.__new__

    def wrap_new(cls, *args, **kwargs):
        new = orig_new(cls)

        if _NEW_NO_INIT:
            # If loading from pickle (happens when combining forks), no need to wrap in ShadowVariable
            return new

        else:
            # For actual usage wrap the object inside a ShadowVariable
            obj = ShadowVariable(new, False)

            # only call __init__ if instance of cls is returned
            # https://docs.python.org/3/reference/datamodel.html#object.__new__
            if isinstance(new, cls):
                obj.__init__(*args, **kwargs)

            return obj

    orig_class._orig_new = orig_new
    orig_class.__new__ = wrap_new
    return orig_class

In [None]:
def untaint_args(*args: tuple[Union[ShadowVariable, Any], ...], **kwargs: dict[str, Union[ShadowVariable, Any]]) -> dict[int, Any]:
    """Get a mapping of each path to args and kwargs for each path available from the ShadowVariable in the arguments."""
    all_muts = set([MAINLINE])
    for arg in args + tuple(kwargs.values()):
        if isinstance(arg, ShadowVariable):
            all_muts |= arg._get_paths()

    mainline_incomplete = False

    untainted_args = {}
    for mut in all_muts:

        mut_args = []
        for arg in args:
            if isinstance(arg, ShadowVariable):
                arg_shadow = arg._shadow
                if mut in arg_shadow:
                    mut_args.append(arg_shadow[mut])
                elif MAINLINE in arg_shadow:
                    mut_args.append(arg_shadow[MAINLINE])
                else:
                    mainline_incomplete = True
                    continue
            else:
                mut_args.append(arg)

        mut_kwargs = {}
        for name, arg in kwargs.items():
            if isinstance(arg, ShadowVariable):
                arg_shadow = arg._shadow
                if mut in arg_shadow:
                    mut_kwargs[name] = arg_shadow[mut]
                elif MAINLINE in arg_shadow:
                    mut_kwargs[name] = arg_shadow[MAINLINE]
                else:
                    mainline_incomplete = True
                    continue
            else:
                mut_kwargs[name] = arg

        untainted_args[mut] = (tuple(mut_args), dict(mut_kwargs))

    # Could not get a value for mainline for every argument. (Some sv do not have a mainline value)
    if mainline_incomplete:
        # This should only happen for non-mainline paths.
        assert get_logical_path() != MAINLINE
        del untainted_args[MAINLINE]

    return untainted_args


def get_selected(mutations: dict[int, Any]) -> dict[int, Any]:
    sm = get_selected_mutant()
    if sm is not None:
        return {path: val for path, val in mutations.items() if path in [MAINLINE, sm] }
    else:
        return mutations


def get_active(mutations: dict[int, Any], seen: set[int], masked: set[int]) -> dict[int, Any]:
    filtered_mutations = {path: val for path, val in mutations.items() if path in seen - masked }

    # logger.debug(f"log_path: {get_logical_path()}")
    if MAINLINE in mutations:
        filtered_mutations[MAINLINE] = mutations[MAINLINE]
    if get_logical_path() in mutations:
        filtered_mutations[get_logical_path()] = mutations[get_logical_path()]

    return filtered_mutations


def get_active_shadow(val: Any, seen: set[int], masked: set[int]) -> Union[dict[int, Any], None]:
    if isinstance(val, ShadowVariable):
        return get_active(val._shadow, seen, masked)
    else:
        return None


def t_combine_shadow(mutations: dict[int, Any]) -> Any:
    if get_logical_path() == MAINLINE:
        add_seen_mutants(set(mutations.keys()) - get_masked_mutants() - set([MAINLINE]))
        add_function_seen_mutants(set(mutations.keys()))

    evaluated_mutations = {}
    for mut, res in mutations.items():
        # Skip inactive mutants
        if (mut not in active_mutants()) and mut != MAINLINE:
            continue

        if type(res) != ShadowVariable and callable(res):
            if mut != MAINLINE:
                set_selected_mutant(mut)
            try:
                res = res()
            except ShadowExceptionStop as e:
                raise e
            except ShadowException as e:
                # Mainline value causes an exception
                if mut == MAINLINE:
                    if get_logical_path() == MAINLINE:
                        # The current path is mainline, in this case the original evaluation caused an exception.
                        # Meaning, the test suite is not green, just raise the error nothing that can be done.
                        raise e

                    # Not following mainline but mainline value fails, paths that do not have an own value in the
                    # mutation list would fail as well, mark them as killed.
                    for active_mut in active_mutants():
                        if active_mut not in mutations:
                            add_strongly_killed(active_mut)

                # Non-Mainline value causes an exception.
                else:
                    if mut not in get_strongly_killed():
                        # Just mark it as killed
                        add_strongly_killed(mut)

                continue
            except Exception as e:
                raise e
            finally:
                set_selected_mutant(None)

        evaluated_mutations[mut] = res

    res = ShadowVariable(evaluated_mutations, from_mapping=True)
    res._keep_active(get_seen_mutants(), get_masked_mutants())
    return res


def t_cond(cond: Any) -> bool:

    if isinstance(cond, ShadowVariable):
        diverging_mutants = []
        companion_mutants = []

        # get the logical path result, this is used to decide which mutations follow the logical path and which do not
        logical_result = cond._get_logical_res(get_logical_path())
        assert type(logical_result) == bool, f"{cond}"

        for path, val in cond._all_path_results(get_seen_mutants(), get_masked_mutants()):
            if path == MAINLINE or path == get_logical_path():
                continue
            assert type(val) == bool, f"{cond}"
            if val == logical_result:
                companion_mutants.append(path)
            else:
                diverging_mutants.append(path)

        # Follow the logical path, if that is not the same as mainline mark other mutations as inactive
        if diverging_mutants:
            add_masked_mutants(set(diverging_mutants))

        return logical_result

    elif type(cond) == bool:
        return cond
    
    else:
        raise ValueError(f"Unhandled t_cond type: {cond}")


def shadow_assert(cmp_result):
    if isinstance(cmp_result, ShadowVariable):
        # Do the actual assertion as would be done in the unchanged program but only for mainline execution
        if get_logical_path() == MAINLINE:
            # This assert should never fail for a green test suite
            assert cmp_result._shadow[MAINLINE] is True, f"{cmp_result}"

        for path, res in cmp_result._all_path_results(get_seen_mutants(), get_masked_mutants()):
            if path == MAINLINE and get_logical_path() != MAINLINE:
                continue
            assert type(res) == bool
            if not res: # assert fails for mutation
                add_strongly_killed(path)

    else:
        if not cmp_result is True:
            if get_logical_path() is not MAINLINE:
                # If we are not following mainline, mark all active mutants as killed
                for mut in active_mutants():
                    add_strongly_killed(mut)
            else:
                # If we are following mainline the test suite is not green
                assert cmp_result, f"Failed original assert"

#### Call Function

In [None]:
def call_maybe_cache(f, *args, **kwargs):
    if False:
        # Caching is implemented here in the library, however, it is omitted here.
        raise NotImplementedError()

    else:
        # no caching, just do it normally
        try:
            res = f(*args, **kwargs)
        except ShadowExceptionStop as e:
            raise e
        except ShadowException as e:
            raise e
        except Exception as e:
            message = f"Error: {e} {traceback.format_exc()}"
            logger.error(message)
            raise NotImplementedError(f"Exceptions in wrapped functions are not supported: {message}")

        res = ShadowVariable(res, from_mapping=False)
        res._keep_active(get_seen_mutants(), get_masked_mutants())
        return res

#### Wrapping Functions

In [None]:
def no_fork_wrap(f, *args, **kwargs):
    initial_args, initial_kwargs = copy_args(args, kwargs)
    before_logical_path = get_logical_path()
    before_masked = deepcopy(get_masked_mutants())

    remaining_paths = set([get_logical_path()])
    done_paths = set()

    for arg in chain(args, kwargs.values()):
        if isinstance(arg, ShadowVariable):
            remaining_paths |= arg._get_paths()

    remaining_paths -= before_masked
    if before_logical_path != MAINLINE and MAINLINE in remaining_paths:
        remaining_paths.remove(MAINLINE)

    tainted_return = {}
    while remaining_paths:
        if before_logical_path in remaining_paths:
            remaining_paths.remove(before_logical_path)
            next_path = before_logical_path
        else:
            next_path = remaining_paths.pop()

        set_masked_mutants((before_masked | done_paths | get_strongly_killed()) - set((next_path,)))
        set_logical_path(next_path)

        if get_logical_path() == before_logical_path:
            copied_args, copied_kwargs = args, kwargs
        else:
            copied_args, copied_kwargs = copy_args(initial_args, initial_kwargs)

        # Filter args and kwargs for currently available, they will be updated with the fork values.
        for arg in copied_args:
            if isinstance(arg, ShadowVariable):
                arg._keep_active(get_seen_mutants(), get_masked_mutants())

        for arg in copied_kwargs.values():
            if isinstance(arg, ShadowVariable):
                arg._keep_active(get_seen_mutants(), get_masked_mutants())

        try:
            res = call_maybe_cache(f, *copied_args, **copied_kwargs)
        except ShadowExceptionStop as e:
            remaining_paths -= get_strongly_killed()
            continue 
        except ShadowException as e:
            for mut in active_mutants():
                add_strongly_killed(mut)
            remaining_paths -= get_strongly_killed()
            continue 
        after_masked = deepcopy(get_masked_mutants())
        new_masked = after_masked - before_masked

        assert isinstance(res, ShadowVariable)
        shadow = res._shadow

        # Update results for the current execution.
        if get_logical_path() != MAINLINE and before_logical_path == get_logical_path() and MAINLINE in shadow:
            tainted_return[MAINLINE] = shadow[MAINLINE]

        for active_mut in active_mutants() | set([get_logical_path()]):
            assert active_mut not in tainted_return
            if active_mut in shadow:
                tainted_return[active_mut] = shadow[active_mut]
            elif MAINLINE in shadow:
                tainted_return[active_mut] = shadow[MAINLINE]
            else:
                # Do not add to done paths.
                continue
            done_paths.add(active_mut)

        # Update the args with the fork values, this is for functions that mutate the arguments.
        overwrite_main = before_logical_path == get_logical_path()
        for ii, val in enumerate(copied_args):
            arg = args[ii]
            if isinstance(arg, ShadowVariable):
                arg._maybe_overwrite(val, get_seen_mutants(), get_masked_mutants(), overwrite_main)

        for key, val in copied_kwargs.items():
            arg = kwargs[key]
            if isinstance(arg, ShadowVariable):
                arg._maybe_overwrite(val, get_seen_mutants(), get_masked_mutants(), overwrite_main)

        # Update remaining paths.
        remaining_paths |= new_masked
        remaining_paths -= (done_paths | get_strongly_killed())

    set_masked_mutants(before_masked | get_strongly_killed())

    # If there are no more active mutants and there is nothing to return just stop immediately.
    if len(tainted_return) == 0:
        assert get_logical_path() != MAINLINE
        assert len(active_mutants()) == 0
        try_next_logical_path()

    if before_logical_path == MAINLINE or before_logical_path in active_mutants():
        set_logical_path(before_logical_path)
    else:
        try_next_logical_path()

    res = ShadowVariable(tainted_return, from_mapping=True)
    return res._maybe_untaint()


def t_wrap(f):
    @wraps(f)
    def flow_wrapper(*args, **kwargs):
        return no_fork_wrap(f, *args, **kwargs)

    f._is_shadow_wrapped = True
    return flow_wrapper

#### The Functions to Provide the Functionality of the Library

In [None]:
def reinit(logical_path: Union[int, None]=None, execution_mode: Union[str, None]=None, no_atexit: bool=False) -> None:
    # initializing shadow
    reinit_path(logical_path)


def t_gather_results() -> Any:
    return t_get_killed()


def mark_current_execution_killed():
    # Current execution is crashing, mark all active mutants as strongly killed
    for mut in active_mutants():
        add_strongly_killed(mut)


def t_final_exception() -> None:
    mark_current_execution_killed()
    t_gather_results()


def t_final_exception_test() -> None:
    mark_current_execution_killed()


def t_assert(cmp_result):
    shadow_assert(cmp_result)


def t_logical_path():
    return get_logical_path()


def t_seen_mutants():
    return get_seen_mutants()


def t_masked_mutants():
    return get_masked_mutants()


def t_active_mutants():
    return active_mutants()


def untaint(obj):
    if hasattr(obj, '_shadow'):
        return obj._shadow[MAINLINE]
    return obj


def get_ns_active(mutations, active, masked):
    if active is not None:
        filtered_mutations = { path: val for path, val in mutations.items() if path in active }
    else:
        filtered_mutations = { path: val for path, val in mutations.items() if path not in masked }

    # logger.debug(f"log_path: {get_logical_path()}")
    filtered_mutations[MAINLINE] = mutations[MAINLINE]
    if get_logical_path() in mutations:
        filtered_mutations[get_logical_path()] = mutations[get_logical_path()]

    return filtered_mutations


def t_combine(mutations: dict[int, Any]) -> Any:
    return t_combine_shadow(mutations)


def t_sv(var: Any) -> ShadowVariable:
    return ShadowVariable(var, from_mapping=False)


# Init when importing shadow
reinit()


# Usage of The Library

Define some functions to make it easy to compare which mutations have been killed.

In [None]:
def gen_killed(strong):
    return {
        'strong': set(strong),
    }


def get_killed():
    results = t_get_killed()
    return {
        'strong': set(results['strong']),
    }

The basic component is a tainted variable (called a `ShadowVariable` internally), it combines the different values caused by mutations into one variable.

In [None]:
print("To implement the library transparently global state needs to be kept.")
print("Using the reinit function this global state can be reset.")
print("The 'shadow' execution mode enables the non-forking, non-memoizing execution mode for the library.")
print("Atexit is needed to correctly gather results on the end of programs, this is not needed here.")
reinit(execution_mode='shadow', no_atexit=True)

print()
print("The basic component is a tainted variable, let us create a simple one:")
tainted_int = t_combine({0: 0, 1: 1})
print(tainted_int)

print()
print("Assert that the expected value is zero, this is true for mainline (0) but not for mutation 1.")
t_assert(tainted_int == 0)

print()
print("This results in mutation 1 being marked as strongly killed:")
killed = get_killed()
print(killed)

assert killed == gen_killed({1})

Variables can also be simply wrapped adding only the mainline.

In [None]:
reinit(execution_mode='shadow', no_atexit=True)

print()
print("Wrapping a variable:")
tainted_int = t_sv(0)
print(tainted_int)
tainted_list = t_sv([])
print(tainted_list)

Functions need to be wrapped to correctly handle execution.

`@t_wrap` provides all functionality needed to wrap a function.

To correctly handle control flow we need to wrap conditionals, this functionality is provided by `t_cond`.
If the boolean values differ the execution will be forked in the forking version and re-execution of the function
is done in the non-forking version (which is used in this notebook).

In [None]:
@t_wrap
def add_to_list(data, val):
    ii = 0
    while t_cond(ii < val):
        data.append(ii)
        ii += 1
    print(f"Following mutation {t_logical_path()}, data at end of execution: {data}")

In the next example create a list and update it using the newly defined function.

In [None]:
reinit(execution_mode='shadow', no_atexit=True)

print("Initialize variables")
tainted_int = t_combine({0: 0, 1: 1, 2: 2})
tainted_list = t_sv([])
print(tainted_int)
print(tainted_list)

print()
print("Execute function:")
add_to_list(tainted_list, tainted_int)
print("The results are merged in the wrapper and list is updated:")
print(tainted_list)

Another more complex example illustrating the execution order for nested functions.

The number after the function name is the currently followed path. The inner function is re-executed until all currently active mutations are evaluated.

In [None]:
@t_wrap
def inner(tainted_int):
    print(f"inner {t_logical_path()}:                     {tainted_int}")
    if t_cond(tainted_int == 1):
        print(f"inner {t_logical_path()} is equal 1:          {tainted_int}")
        tainted_int -= 1
    else:
        print(f"inner {t_logical_path()} is not equal 1:      {tainted_int}")
        tainted_int += 1
    print(f"inner {t_logical_path()} res:                 {tainted_int}")
    return tainted_int

@t_wrap
def func(tainted_int):
    print(f"func  {t_logical_path()}:                     {tainted_int}")
    if t_cond(tainted_int <= 1):
        print(f"func  {t_logical_path()} is less equal 1:     {tainted_int}")
        tainted_int += 1
        tainted_int = inner(tainted_int)
    else:
        print(f"func  {t_logical_path()} is not less equal 1: {tainted_int}")
        tainted_int -= 1
        tainted_int = inner(tainted_int)
    print(f"func  {t_logical_path()} res:                 {tainted_int}")
    return tainted_int

reinit(execution_mode="shadow", no_atexit=True)
var = t_combine({0: 0, 1: 1, 2: 2, 3: 3})
res = func(var)
print(f"Final result: {res}")
t_assert(res == 0)
assert get_killed() == gen_killed([1, 3])

The prototype also supports user defined classes. This is supported by the `t_class` function wrapper, this transparently wraps attributes and method calls to created objects.

Currently the objects are duplicated for evaluation of further mutants, while computationally expensive this eases the implementation effort.
Performance can be improved for example by using a copy on write mechanism and copying fields individually.

In [None]:
@t_class
class BankAccount:
    balance: int
    overdrawn: bool

    def __init__(self, initial_balance: int):
        self.balance = t_combine({(0): lambda : initial_balance, (1): lambda : initial_balance != 1, (2): lambda : initial_balance + 1, (3): lambda : initial_balance * 2})
        self.overdrawn = t_combine({(0): lambda : False})
        self.update_overdrawn()

    def __repr__(self):
        return f"BankAccount(balance={self.balance}, overdrawn={self.overdrawn})"

    def update_overdrawn(self) ->None:
        if t_cond(self.balance >= 0):
            self.overdrawn = t_combine({(0): lambda : False})
        else:
            self.overdrawn = t_combine({(0): lambda : True, (10): lambda : True != 1})

    def deposit(self, amount: int) ->None:
        self.balance = t_combine({(0): lambda : self.balance + amount, (13): lambda : self.balance - amount, (14): lambda : self.balance * amount, (16): lambda : self.balance % amount, (17): lambda : self.balance << amount, (18): lambda : self.balance >> amount, (19): lambda : self.balance | amount, (20): lambda : self.balance ^ amount, (21): lambda : self.balance & amount, (22): lambda : self.balance // amount})
        self.update_overdrawn()

    def withdraw(self, amount: int) ->None:
        self.balance = t_combine({(0): lambda : self.balance - amount, (23): lambda : self.balance + amount, (24): lambda : self.balance * amount, (26): lambda : self.balance % amount, (27): lambda : self.balance << amount, (28): lambda : self.balance >> amount, (29): lambda : self.balance | amount, (30): lambda : self.balance ^ amount, (31): lambda : self.balance & amount, (32): lambda : self.balance // amount})
        self.update_overdrawn()

    def is_overdrawn(self) ->bool:
        return self.overdrawn


@t_wrap
def bank_example() ->None:
    my_account = BankAccount(10)
    t_assert(my_account.balance == 10)
    t_assert(my_account.overdrawn == False)
    print(my_account)
    print()

    my_account.deposit(5)
    t_assert(my_account.balance == 15)
    t_assert(my_account.overdrawn == False)
    print(my_account)
    print()

    my_account.withdraw(200)
    t_assert(my_account.balance == -185)
    t_assert(my_account.overdrawn == True)
    print(my_account)


reinit(execution_mode="shadow", no_atexit=True)
bank_example()
assert get_killed() == gen_killed([ 1, 2, 3, 10, 13, 14, 16, 17, 18, 21, 22, 23, 24, 26, 27, 28, 29, 30, 31, 32])