In [1]:
import tempfile
from subprocess import run
from pathlib import Path
from collections import defaultdict
import re
import pydot
import networkx as nx


def create_cfg_dots(bc_path, out_dir):
    """Run LLVM ``opt`` with the ``dot-cfg`` pass on a bitcode file.

    ``out_dir`` is created if it does not exist and is used as the working
    directory of the ``opt`` process; ``opt`` is additionally asked to write
    textual IR to ``bitcode.ll`` (relative to that directory).

    The exit status of ``opt`` is not inspected.
    """
    Path(out_dir).mkdir(parents=True, exist_ok=True)
    opt_command = [
        "opt",
        "-passes=dot-cfg",
        "-debug-pass-manager",
        str(bc_path),
        "-S",
        "-o",
        "bitcode.ll",
    ]
    run(opt_command, cwd=out_dir)


def get_dot_file_graph(path):
    """Parse the DOT file at ``path`` with pydot and return its first graph."""
    return pydot.graph_from_dot_file(path)[0]

def label_lines(label):
    """Split a DOT node label into cleaned-up text lines.

    The first and last character of ``label`` are dropped, the DOT escapes
    ``\\l...``, ``\\l``, ``\\<`` and ``\\>`` are resolved (in that order), the
    text is split into lines, and every `` #<digits>`` reference (with an
    optional trailing comma) is removed from each line.
    """
    text = label[1:-1]
    # Order matters: '\\l...' must be handled before the plain '\\l'.
    substitutions = (
        ('\\l...', ''),
        ("\\l", "\n"),
        ("\\<", "<"),
        ("\\>", ">"),
    )
    for escaped, plain in substitutions:
        text = text.replace(escaped, plain)

    attribute_ref = re.compile(' #[0-9]+,?')
    return [attribute_ref.sub('', line) for line in text.splitlines()]
    

def add_function_cfg(cfg_graph, path):
    """Add the basic blocks and edges of one function's CFG dot file.

    The dot file at ``path`` is parsed and every node (except the ``"\\n"``
    pseudo-node that can appear in the parsed graph) becomes a node of
    ``cfg_graph`` carrying a ``bb_lines`` attribute. The first real node is
    named after the function itself; every other node is named
    ``"<function>-<block label>"``. Each block line is also recorded in
    ``cfg_graph.graph['instr'][function][line]`` pointing at its node name.

    Args:
        cfg_graph: graph to extend; must provide ``graph['instr']`` as a
            mapping of mappings, plus ``add_nodes_from``/``add_edges_from``.
        path: path of the dot file to load.

    Raises:
        AssertionError: if the dot file does not describe a ``digraph``.
    """
    graph_dot = get_dot_file_graph(path)

    # The graph name is expected to contain the function name in single
    # quotes; the text between the first pair of quotes is used.
    function_name = graph_dot.get_name().split("'")[1]
    type_ = graph_dot.get_type()
    if type_ != "digraph":
        # Raised explicitly so the check also runs under ``python -O``.
        raise AssertionError(type_)

    def block_label(dot_node):
        # First label line with its first and last character removed
        # (presumably "{<name>:" -> "<name>").
        return label_lines(dot_node.get_label())[0][1:-1]

    def graph_node_name(label):
        # The entry block is addressed by the bare function name.
        if label == entry_label:
            return function_name
        return f"{function_name}-{label}"

    # Label of the first real node. Tracked explicitly (instead of relying on
    # the enumerate index) so that a leading '"\\n"' pseudo-node cannot make
    # the function lose its entry block.
    entry_label = None

    new_nodes = []
    for nn in graph_dot.get_nodes():
        if nn.get_name() == '"\\n"':
            continue

        lines = label_lines(nn.get_label())
        bb_label = lines[0][1:-1]
        # Drop the first (block label) and the last label line.
        bb_lines = lines[1:-1]

        if entry_label is None:
            entry_label = bb_label
        bb_node_name = graph_node_name(bb_label)

        for ll in bb_lines:
            if '#' in ll:
                # Diagnostic output: a '#' survived the cleanup in label_lines.
                print(ll)
            # NOTE: identical instruction text in several blocks of the same
            # function overwrites earlier entries (last block wins).
            cfg_graph.graph['instr'][function_name][ll] = bb_node_name
        new_nodes.append((bb_node_name, {'bb_lines': bb_lines}))

    cfg_graph.add_nodes_from(new_nodes)

    new_edges = []
    for ee in graph_dot.get_edges():
        # Edge endpoints may carry a ":port" suffix; only the node id is used.
        source_node = graph_dot.get_node(ee.get_source().split(":")[0])[0]
        dest_node = graph_dot.get_node(ee.get_destination().split(":")[0])[0]
        new_edges.append((
            graph_node_name(block_label(source_node)),
            graph_node_name(block_label(dest_node)),
        ))

    cfg_graph.add_edges_from(new_edges)


def create_initial_graph(path):
    """Build a directed graph from every ``*.dot`` file directly in ``path``.

    The returned graph carries three graph-level attributes: ``instr``
    (a ``defaultdict(dict)``), ``node_to_muts`` (a ``defaultdict(list)``)
    and ``mut_to_node`` (a plain dict). Each dot file is merged in via
    ``add_function_cfg``.
    """
    graph = nx.DiGraph()
    graph.graph.update(
        instr=defaultdict(dict),
        node_to_muts=defaultdict(list),
        mut_to_node={},
    )

    dot_files = Path(path).glob("*.dot")
    for dot_file in dot_files:
        add_function_cfg(graph, dot_file)
    return graph


def add_function_call_edges(cfg_graph):
    """Add an edge from every calling block to the node of the called function.

    Every ``bb_lines`` entry containing ``" call "`` is inspected:

    * static calls (``@name(``) add an edge from the block's node to the node
      named ``name`` when such a node exists; names starting with ``llvm.``
      are skipped;
    * dynamic calls (``%name(``) are recognized but not handled yet;
    * anything else is treated as an error.

    Args:
        cfg_graph: graph whose nodes carry a ``bb_lines`` attribute.

    Raises:
        AssertionError: if a call line matches neither call pattern.
    """
    # Raw strings: "\S" and "\(" are invalid escapes in normal string literals.
    static_call = re.compile(r".* call .*@(\S+)\(.*")
    dynamic_call = re.compile(r".*call.*%(\S+)\(.*")

    # Snapshot the node names so adding edges cannot disturb the iteration.
    for nn in list(cfg_graph.nodes):
        for ll in cfg_graph.nodes[nn]['bb_lines']:
            if " call " not in ll:
                continue

            if mm := static_call.match(ll):
                func_name = mm.group(1)
                if func_name.startswith("llvm."):
                    continue
                # Functions without a node in the graph are ignored.
                if func_name in cfg_graph.nodes:
                    cfg_graph.add_edge(nn, func_name)
            elif dynamic_call.match(ll):
                pass  # TODO: dynamic calls
            else:
                # Explicit raise so the check is not stripped under -O.
                raise AssertionError(
                    f"unrecognized call instruction in node {nn!r}: {ll!r}"
                )


def load_mutations(cfg_graph, mutations):
    """Attach mutations to the graph nodes containing their instruction.

    For every ``(mutation_id, funname, instr)`` triple the instruction text is
    normalized the same way as the block lines (`` #<digits>`` references are
    removed) and looked up in ``cfg_graph.graph['instr'][funname]``. The
    resulting node name is recorded in ``graph['node_to_muts']`` and
    ``graph['mut_to_node']``.

    Args:
        cfg_graph: object whose ``graph`` mapping holds the ``instr``,
            ``node_to_muts`` and ``mut_to_node`` tables.
        mutations: iterable of ``(mutation_id, funname, instr)`` triples.

    Raises:
        ValueError: if an instruction is not known for its function.
    """
    attribute_ref = re.compile(' #[0-9]+,?')
    instr_table = cfg_graph.graph['instr']

    for mutation_id, funname, instr in mutations:
        instr = attribute_ref.sub('', instr)
        # .get() avoids inserting an empty entry for unknown functions when
        # the table is a defaultdict.
        known_instrs = instr_table.get(funname, {})
        if instr not in known_instrs:
            raise ValueError(
                f"instruction {instr!r} of mutation {mutation_id!r} not found "
                f"in function {funname!r} "
                f"({len(known_instrs)} known instructions)"
            )
        node = known_instrs[instr]
        cfg_graph.graph['node_to_muts'][node].append(mutation_id)
        cfg_graph.graph['mut_to_node'][mutation_id] = node


def is_reachable(tc_cfg_graph, cur_supermutant, candidate):
    """Tell whether ``candidate`` conflicts with any member of a supermutant.

    Returns True when the candidate's node is the same as, has an edge to, or
    has an edge from the node of at least one mutation in ``cur_supermutant``;
    otherwise False. Nodes are looked up through
    ``tc_cfg_graph.graph['mut_to_node']``.
    """
    mut_to_node = tc_cfg_graph.graph['mut_to_node']
    candidate_node = mut_to_node[candidate]

    def connected(member):
        member_node = mut_to_node[member]
        if candidate_node == member_node:
            return True
        if tc_cfg_graph.has_edge(candidate_node, member_node):
            return True
        return tc_cfg_graph.has_edge(member_node, candidate_node)

    return any(connected(member) for member in cur_supermutant)


def get_reachable_mutants(tc_cfg_graph, entry_node):
    """Collect the mutations attached to every direct successor of a node.

    For each out-edge of ``entry_node`` the mutation list stored under the
    edge's target in ``tc_cfg_graph.graph['node_to_muts']`` is appended to the
    result, in edge order. The graph is presumably a transitive closure, so
    direct successors correspond to everything reachable -- verify against the
    caller.
    """
    return [
        mutation
        for edge in tc_cfg_graph.out_edges(entry_node)
        for mutation in tc_cfg_graph.graph['node_to_muts'][edge[1]]
    ]


def get_supermutants(tc_cfg_graph, reachable_muts):
    """Greedily partition mutations into supermutants.

    A supermutant is a list of mutations in which no member conflicts with
    another according to ``is_reachable``. Each round takes the last pending
    mutation as a seed, adds every pending mutation that does not conflict
    with the members chosen so far, and defers the rest to the next round.

    Args:
        tc_cfg_graph: graph passed through to ``is_reachable``.
        reachable_muts: iterable of hashable mutation ids; it is not modified.
            Duplicate ids are only considered once.

    Returns:
        A list of supermutants (lists of mutation ids); every distinct input
        id appears in exactly one of them.
    """
    # Order-preserving de-duplication, so an id can never end up in two
    # supermutants.
    muts_todo = list(dict.fromkeys(reachable_muts))
    supermutants = []
    while muts_todo:
        cur_supermutant = [muts_todo.pop()]
        still_todo = []
        for candidate in muts_todo:
            if is_reachable(tc_cfg_graph, cur_supermutant, candidate):
                # Conflicts with a current member: retry in a later round.
                still_todo.append(candidate)
            else:
                cur_supermutant.append(candidate)

        # Everything that joined the supermutant is done; only the deferred
        # candidates remain (replaces per-item list.remove with a swallowed
        # exception for the already-popped seed).
        muts_todo = still_todo
        supermutants.append(cur_supermutant)
    return supermutants

In [46]:
import cfg_supermutants
# Driver cell: generate CFG dot files inside the mutation container, build the
# CFG graph from them and compute supermutants.
# NOTE(review): mutation_locations_graph_path, prog_info, json,
# indirect_call_info, mutations, start_mutation_container and
# run_exec_in_container are not defined in the visible part of this notebook.
entry_node = 'LLVMFuzzerTestOneInput'
cfg_base_dir = Path('tmp/cfgs')

with open(mutation_locations_graph_path(prog_info), 'rt') as f:
    call_graph = json.load(f)

call_info = indirect_call_info(call_graph)

# Reduce every mutation record to a (mutation_id, funname, instr) triple, the
# shape unpacked by load_mutations.
muts = [
    (mm[0], mm[3][mm[0]]['funname'], mm[3][mm[0]]['instr'])
    for mm in mutations
]

cfg_base_dir.mkdir(parents=True, exist_ok=True)
with tempfile.TemporaryDirectory(dir=cfg_base_dir) as tmp_dir:
    with start_mutation_container(None, None) as container:
        bc_path_in_container = Path("/home/mutator", prog_info['orig_bc'])
        # presumably tmp/cfgs is mounted at /home/mutator/tmp/cfgs inside the
        # container, so the dot files show up in tmp_dir on the host -- confirm
        tmp_dir_in_container = Path("/home/mutator/tmp/cfgs", Path(tmp_dir).name)
        run_exec_in_container(
            container, raise_on_error=True,
            cmd=["opt", "-passes=dot-cfg", "-debug-pass-manager", str(bc_path_in_container), "-S"],
            exec_args=['--workdir', str(tmp_dir_in_container)],
        )
        # Must run before the temporary directory is removed.
        cfg_graph = cfg_supermutants.create_initial_graph(tmp_dir)
print(cfg_graph)

# NOTE(review): called with two arguments here, while the
# add_function_call_edges definitions in this notebook accept only cfg_graph;
# the cfg_supermutants module version presumably differs -- confirm.
cfg_supermutants.add_function_call_edges(cfg_graph, call_info)
print(cfg_graph)

cfg_supermutants.load_mutations(cfg_graph, muts)

tc_cfg_graph = cfg_supermutants.transitive_closure(cfg_graph)
print(tc_cfg_graph)

reachable_muts = cfg_supermutants.get_reachable_mutants(tc_cfg_graph, entry_node)
supermutants = cfg_supermutants.get_supermutants(tc_cfg_graph, reachable_muts)

print(supermutants)

print(f"Generated {len(supermutants)} supermutants out of {len(reachable_muts)} reachable mutants ", end='')
# NOTE(review): divides by len(supermutants), which is zero when no mutant is
# reachable; 'mut_failed' is not set by the create_initial_graph shown in this
# notebook -- confirm where it is populated.
print(f"a reduction of {len(reachable_muts) / len(supermutants)}. Failed muts: {len(tc_cfg_graph.graph['mut_failed'])}.")

# NOTE(review): top-level return -- this cell looks like the body of a
# function pasted into the notebook.
return supermutants, None

Running pass: VerifierPass on [module]
Running analysis: VerifierAnalysis on [module]
Running analysis: InnerAnalysisManagerProxy<AnalysisManager<Function>, Module> on [module]
Running pass: CFGPrinterPass on LLVMFuzzerTestOneInput
Running analysis: BlockFrequencyAnalysis on LLVMFuzzerTestOneInput
Running analysis: LoopAnalysis on LLVMFuzzerTestOneInput
Running analysis: DominatorTreeAnalysis on LLVMFuzzerTestOneInput
Running analysis: BranchProbabilityAnalysis on LLVMFuzzerTestOneInput
Running analysis: PostDominatorTreeAnalysis on LLVMFuzzerTestOneInput
Running analysis: TargetLibraryAnalysis on LLVMFuzzerTestOneInput
Writing '.LLVMFuzzerTestOneInput.dot'...
Running pass: CFGPrinterPass on default_malloc
Running analysis: BlockFrequencyAnalysis on default_malloc
Running analysis: LoopAnalysis on default_malloc
Running analysis: DominatorTreeAnalysis on default_malloc
Running analysis: BranchProbabilityAnalysis on default_malloc
Running analysis: PostDominatorTreeAnalysis on default_m

In [2]:
# Local experiment cell: build the CFG graph from previously generated dot
# files and keep a pristine copy for later cells. The remaining pipeline steps
# are kept below as commented-out code.
entry_node = 'LLVMFuzzerTestOneInput'

# create_cfg_dots('/home/philipp/Sync/git/llvm-mutation-tool/tmp/llvm-graph/cares_name.bc', '/home/philipp/Sync/git/llvm-mutation-tool/tmp/llvm-graph')

cfg_graph = create_initial_graph('/home/philipp/Sync/git/llvm-mutation-tool/tmp/llvm-graph')
print(cfg_graph)

# Copy taken before any call edges are added, so later cells can restart from
# the intra-function CFG only.
backup_cfg_graph = cfg_graph.copy()

# add_function_call_edges(cfg_graph)
# print(cfg_graph)

# import sqlite3
# c = sqlite3.connect("data/dev/stats_all.db")
# mutations = c.execute("select mutation_id, funname, instr from mutations")
# load_mutations(cfg_graph, mutations)

# tc_cfg_graph = nx.transitive_closure(cfg_graph, reflexive=None)
# reachable_muts = get_reachable_mutants(tc_cfg_graph, entry_node)
# supermutants = get_supermutants(tc_cfg_graph, reachable_muts)

# print(supermutants)

# print(f"Generated {len(supermutants)} supermutants out of {len(reachable_muts)} reachable mutants")
# print(f"reduction of {len(reachable_muts) / len(supermutants)}.")


DiGraph with 2684 nodes and 4253 edges


In [71]:

def add_function_call_edges(cfg_graph):
    """Add static call edges and print the signature of every dynamic call.

    Every ``bb_lines`` entry containing ``" call "`` is inspected:

    * static calls (``@name(``) add an edge from the block's node to the node
      named ``name`` when such a node exists; names starting with ``llvm.``
      are skipped;
    * dynamic calls (``call <type> ... %name(<args>)``) print the instruction
      followed by ``[return_type, arg_type, ...]``; no edge is added yet;
    * anything else is treated as an error.

    Args:
        cfg_graph: graph whose nodes carry a ``bb_lines`` attribute.

    Raises:
        AssertionError: if a call line matches neither call pattern.
    """
    # Raw strings: "\S" and "\(" are invalid escapes in normal string literals.
    static_call = re.compile(r".* call .*@(\S+)\(.*")
    # "(.*)" instead of "(.+)" so indirect calls without arguments, e.g.
    # "call void %5()", are recognized instead of hitting the error branch.
    dynamic_call = re.compile(r".*call (\S+) .*%(?:\S+)\((.*)\)")

    # Snapshot the node names so adding edges cannot disturb the iteration.
    for nn in list(cfg_graph.nodes):
        for ll in cfg_graph.nodes[nn]['bb_lines']:
            if " call " not in ll:
                continue

            if mm := static_call.match(ll):
                func_name = mm.group(1)
                if func_name.startswith("llvm."):
                    continue
                # Functions without a node in the graph are ignored.
                if func_name in cfg_graph.nodes:
                    cfg_graph.add_edge(nn, func_name)
            elif mm := dynamic_call.match(ll):
                return_type = mm.group(1)
                args = mm.group(2)
                print(ll)
                # The first whitespace-separated token of each argument is
                # taken as its type.
                # NOTE(review): argument types that themselves contain commas
                # would be split incorrectly here.
                arg_types = [
                    aa.strip().split(' ')[0]
                    for aa in args.split(',')
                    if aa.strip()
                ]
                func_args = [return_type] + arg_types
                print(func_args)
            else:
                # Explicit raise so the check is not stripped under -O.
                raise AssertionError(
                    f"unrecognized call instruction in node {nn!r}: {ll!r}"
                )

# Restart from the saved copy of the graph so repeated runs of this cell do
# not accumulate call edges.
cfg_graph = backup_cfg_graph.copy()

# Prints every dynamic call together with its extracted signature (see the
# output below).
add_function_call_edges(cfg_graph)

  call void %288(i8* %291, i32 %285, i32 %292, %struct.hostent* %284) !dbg !1978
['void', 'i8*', 'i32', 'i32', '%struct.hostent*']
  call void %296(i8* %299) !dbg !1983
['void', 'i8*']
  call void %300(i8* nonnull %0) !dbg !1985
['void', 'i8*']
  tail call void %318(i8* %321, i32 16, i32 %18, %struct.hostent* null) !dbg !2004
['void', 'i8*', 'i32', 'i32', '%struct.hostent*']
  tail call void %322(i8* %325) !dbg !2005
['void', 'i8*']
  tail call void %326(i8* nonnull %0) !dbg !2007
['void', 'i8*']
  %3 = tail call i8* %2(i64 40) !dbg !1643
['i8*', 'i64']
  call void %166(i8* %168, i32 0, i32 %170, %struct.hostent* %164) !dbg !1905
['void', 'i8*', 'i32', 'i32', '%struct.hostent*']
  call void %174(i8* %175) !dbg !1910
['void', 'i8*']
  call void %182(i8* %184, i32 4, i32 %186, %struct.hostent* null) !dbg !1921
['void', 'i8*', 'i32', 'i32', '%struct.hostent*']
  call void %187(i8* %188) !dbg !1922
['void', 'i8*']
  tail call void %7(i8* %9) !dbg !1658
['void', 'i8*']
  tail call void %10(