# NAC coloring search

This notebook provides utilities for running benchmarks and experimenting with our code.

The first section defines utility functions and the second section loads/generates the benchmark data. After that, we run individual benchmarks on selected graph classes with selected algorithms; the algorithms are described in that section.

```bash
tensorboard --logdir benchmarks/logs/nac
```

In [None]:
from typing import *
from collections import defaultdict
import random
from random import Random
from enum import Enum

import matplotlib.pyplot as plt
import matplotlib_inline.backend_inline as backend_inline
from matplotlib.backends import backend_agg
from matplotlib.figure import Figure

import networkx as nx
import tensorflow as tf # unsed only for tensorboard
import os
import time
import datetime
import signal
import itertools

from tqdm import tqdm

import nac as nac
from benchmarks import dataset as dataset

seed=42
TEST=False

# Preparation

In [None]:
# https://stackoverflow.com/a/75898999
from typing import Callable, TypeVar, ParamSpec

P = ParamSpec("P")
T = TypeVar("T")

def copy_doc(wrapper: Callable[P, T]):
    """An implementation of functools.wraps."""

    def decorator(func: Callable) -> Callable[P, T]:
        func.__doc__ = wrapper.__doc__
        return func

    return decorator

In [None]:
@copy_doc(plt.figure)
def figure(num: Any = 1, *args, **kwargs) -> Figure:
    """
    Creates a ``Figure`` directly instead of going through the global plt state.

    NOTE: at definition time ``copy_doc`` replaces this docstring with the
    one of ``plt.figure``.
    """
    standalone = Figure(*args, **kwargs)

    def _show_inline():
        # A figure manager is created only for this one rendering and destroyed right after
        mgr = backend_agg.new_figure_manager_given_figure(num, standalone)
        rendered = mgr.canvas.figure
        display(
            rendered,
            metadata=backend_inline._fetch_figure_metadata(rendered),
        )
        mgr.destroy()

    # Replace the stock ``show`` so the figure can display itself inline
    standalone.show = _show_inline
    return standalone

In [None]:

def avg(l: List[int]) -> int:
    """
    Returns the integer (floor) average of the given values.
    An empty list averages to 0.
    """
    count = len(l)
    return sum(l) // count if count else 0

In [None]:
class SizeType(Enum):
    """
    The measure of graph size that benchmark results are grouped by.
    The values are human readable captions.
    """

    # grouping by the number of vertices
    VertexNo = "Vertex no."
    # grouping by the number of monochromatic classes
    MonoClassNo = "Monochromatic classes no."


# All size types in a fixed order: vertex number first, monochromatic classes second
SIZE_TYPES = [SizeType.VertexNo, SizeType.MonoClassNo]

In [None]:
def run_benchmarks[T](
    function: Callable[[T, int], None],
    params: List[T],
    rounds: int,
    after_benchmark: Callable[[T, int], None] = lambda x, y: None,
    limit_sec:int|None=15,
    seed: int|None = 42,
) -> Dict[T, int]:
    """
    Runs the given function multiple times for each parameter
    and measures the run time.
    Returns a dictionary mapping from the param set to mean runtime in miliseconds.
    """
    if seed is None:
        seed = random.randint(0, 2*32)

    def measure(param: T, seed: int) -> int:
        start = time.time()
        function(param, seed)
        end = time.time()
        return int((end - start) * 1000)

    results: Dict[T, int] = {}
    for param in tqdm(params):
        try:
            # signals are not exact, but generally work
            if (limit_sec):
                def timeout_handler(signum, frame):
                    raise TimeoutError("Benchmark timeout")
                signal.signal(signal.SIGALRM, timeout_handler)
                signal.alarm(limit_sec * rounds)

            rand = Random(seed)
            mean = sum(measure(param, rand.randint(0, 2*32)) for _ in range(rounds)) // rounds

            if (limit_sec):
                signal.alarm(0)
        except TimeoutError:
            mean = limit_sec*1000

        results[param] = mean
        after_benchmark(param, mean)

    return results

if TEST:
    # Smoke test: times summing a range twice and prints (param, mean ms) from the callback
    display(run_benchmarks(lambda x, y: sum(range(x)), [42**4], after_benchmark=lambda x, y: print(x, y), rounds=2))
    # Timeout variant; the benchmarked function is called with (param, seed):
    # display(run_benchmarks(lambda x, y: sum(range(x)), [42**5], rounds=2, limit_sec=1))

In [None]:
def plot_results(
    groups: List[Tuple[str, List[int]]],
    labels: List[str|int],
    title: str = "",
    x_label: str = "Size",
) -> Figure:
    """
    Draws one line per group of benchmark datapoints into a new figure.

    Each group is a pair of a name (used in the legend) and a list of
    datapoints; all datapoints of a group form a single line.
    ``labels`` are the tick labels of the x axis, one tick per label.
    ``title`` and ``x_label`` caption the plot and the x axis,
    the y axis is always captioned "Time (ms)".
    The function should be also able to accept pandas/numpy arrays.
    """
    fig = figure()
    axes = fig.add_subplot()

    # Captions
    axes.set_title(title)
    axes.set_xlabel(x_label)
    axes.set_ylabel("Time (ms)")

    # One tick per label, positioned at 0, 1, 2, ...
    tick_positions = range(len(labels))
    axes.set_xticks(tick_positions)
    axes.set_xticklabels(labels)

    for group in groups:
        series_name, datapoints = group[0], group[1]
        axes.plot(datapoints, label=series_name)

    axes.legend()

    return fig

if TEST:
    # Smoke test: two lines in one plot; "Group 2" has fewer datapoints than there are labels
    display(plot_results(
        groups = [
            ("Group 1", [10, 20, 25, 28]),
            ("Group 2", [15, 10, 20]),
        ],
        labels = ['A', 'B', 'C', 'D'],
        title="Test",
        x_label="Size |V|",
    ))

In [None]:
class TensorBoardLogger:
    """
    Stores test results into tensor board
    """

    PATH = os.path.join("benchmarks","logs")

    def __init__(
            self,
            test_name: str,
            variation: str,
            size_type: SizeType,
        ):
        self.test_name = test_name
        self.variation = variation
        self.size_type = size_type

        # Use regex and filesystem safe characters!
        current_time = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        test_name = test_name.replace(" ", "_")
        match size_type:
            case SizeType.VertexNo:
                size_dir = "vert"
            case SizeType.MonoClassNo:
                size_dir = "monc"
        self._log_dir = os.path.join(
            TensorBoardLogger.PATH,
            "nac",
            test_name,
            current_time,
            size_dir,
            variation,
        )
        self.writers: Dict[str, tf.summary.SummaryWriter] = {}

    def _get_writter(self, group_name: str|int):
        if group_name in self.writers:
            return self.writers[group_name]
        writer = tf.summary.create_file_writer(os.path.join(self._log_dir, str(group_name)))
        self.writers[group_name] = writer
        return writer

    def log(self, group_name: str, size: int, time: int):
        writer = self._get_writter(group_name)
        with writer.as_default():
            tf.summary.scalar(f"Search time ({self.test_name} {self.variation}) [{self.size_type.value}]", time, step=size)
        writer.flush()

if TEST:
    # Smoke test: logs two linear series of five points each.
    # The constructor requires the test name, the variation and the size type.
    logger = TensorBoardLogger("test", "test", SizeType.VertexNo)
    for i in range(5):
        logger.log("Test 1", i, (i+1)*3)
        logger.log("Test 2", i, (i+1)*2)

In [None]:
def nac_benchmark_core(
    test_name: str,
    size_type: SizeType,
    coloring_no_limit: int | None,
    rounds: int,
    relabel_strategies: List[str],
    split_strategies: List[str],
    merge_strategies: List[str],
    subgraph_sizes: List[int],
    dataset: Dict[int, List[nx.Graph]],
    verbose: bool = True,
    seed: int | None = 42,
) -> List[Dict[str, Dict[int, List[int]]]]:
    """
    Runs benchmarks for NAC coloring search

    For every graph size in ``dataset`` each combination of
    (relabel, split, merge, subgraph size) is timed using ``run_benchmarks``
    (with its default time limit). One measurement covers the search on
    all the graphs of that size.

    Parameters
    ----------
    test_name, size_type:
        Passed to the tensor board loggers.
    coloring_no_limit:
        Maximal number of colorings requested per graph,
        ``None`` means all colorings.
    rounds:
        Number of runs per parameter combination.
    relabel_strategies, split_strategies, merge_strategies, subgraph_sizes:
        The benchmarked values; their cartesian product is searched.
    dataset:
        Maps graph size to the graphs of that size.
    verbose:
        Prints progress when set.
    seed:
        Base seed passed to ``run_benchmarks``.

    Returns results grouped by relabel, split, merge and subgraph size strategies
    (a list of four dictionaries in this order). Each dictionary maps
    a strategy to a mapping from graph size to the list of mean times
    of all the parameter combinations that use the strategy.
    """
    baseLogger = TensorBoardLogger(test_name, "combined", size_type)
    loggers = [
        TensorBoardLogger(test_name, "relabel", size_type),
        TensorBoardLogger(test_name, "split", size_type),
        TensorBoardLogger(test_name, "merge", size_type),
        TensorBoardLogger(test_name, "subgraph", size_type),
    ]

    # for each strategy holds a dictionary mapping from graph size to list of results
    results: List[Dict[str, Dict[int, List[int]]]] = [
        defaultdict(lambda: defaultdict(list)) for _ in range(4)
    ]

    # The parameter grid does not depend on the graph size
    params = list(itertools.product(
        relabel_strategies, split_strategies, merge_strategies, subgraph_sizes
    ))

    def algo_name(param: Tuple[str, str, str, int]) -> str:
        # The relabel strategy is passed separately, it is not part of the algorithm name
        _, split, merge, subgraph = param
        return "subgraphs-{}-{}-{}-smart".format(
            merge, split, subgraph
        )

    for graph_size, graphs in dataset.items():
        if verbose:
            print(f"Starting with a new graphs size {graph_size} ({len(graphs)})")

        def find_colorings(param: Tuple[str, str, str, int], seed: int):
            relabel = param[0]
            for graph in graphs:
                colorings = nac.NAC_colorings(
                    graph=graph,
                    algorithm=algo_name(param),
                    relabel_strategy=relabel,
                    seed=seed,
                )
                # islice requests exactly ``coloring_no_limit`` colorings.
                # zip(colorings, range(limit)) would pull one more coloring
                # before stopping and the extra search would be measured too.
                for _ in itertools.islice(colorings, coloring_no_limit):
                    pass

        def callback(param: Tuple[str, str, str, int], elapsed: int):
            baseLogger.log(param[0] + '_' + algo_name(param), graph_size, elapsed)

        param_to_time = run_benchmarks(
            function=find_colorings,
            params=params,
            rounds=rounds,
            after_benchmark=callback,
            seed=seed,
        )

        # Times of this graph size only, grouped per strategy kind and strategy
        times_dicts = [defaultdict(list) for _ in range(4)]
        for param, elapsed in param_to_time.items():
            # param holds one strategy per kind, in the same order as results/times_dicts
            for name, dest, times_dict in zip(param, results, times_dicts):
                dest[name][graph_size].append(elapsed)
                times_dict[name].append(elapsed)
        for times_dic, logger in zip(times_dicts, loggers):
            for name, times in times_dic.items():
                logger.log(name, graph_size, avg(times))

    return results

if TEST:
    # Smoke test on two tiny path graphs; the dataset keys (20, 22) are used
    # as the graph sizes in the logs and results.
    # The result is kept in a global so that it can be plotted later.
    test_bench_result = nac_benchmark_core(
        test_name="Test run",
        size_type=SizeType.VertexNo,
        coloring_no_limit=None,
        rounds=2,
        relabel_strategies=["random"],
        split_strategies= ["none", "neighbors"],
        merge_strategies= ["linear"],
        subgraph_sizes = [4],
        dataset= {
            20: [nx.Graph([(1, 2), (2, 3)])],
            22: [nx.Graph([(1, 2), (2, 3), (3, 4)])],
        },
    )

In [None]:
def _transform_for_plotting(data: Dict[str, Dict[int, List[int]]]) -> Tuple[List[Tuple[str, List[int]]], List[str|int]]:
    """
    Converts benchmark results of one strategy kind into ``plot_results`` input.

    ``data`` maps a strategy name to a mapping from graph size to measurements.
    Returns a pair of

    - groups: for every strategy its name and the averaged measurements
      ordered by graph size,
    - labels: the sorted graph sizes of the strategy with the most sizes.
    """
    res: List[Tuple[str, List[int]]] = []
    sizes: List[str|int] = []
    for name, measurements_for_sizes in data.items():
        # Labels and datapoints must use the same (sorted) order of sizes,
        # the insertion order of the dictionary is not guaranteed to be sorted
        ordered_sizes = sorted(measurements_for_sizes)
        if len(ordered_sizes) > len(sizes):
            sizes = ordered_sizes

        res.append(
            (
                name,
                [avg(measurements_for_sizes[size]) for size in ordered_sizes],
            )
        )
    return res, sizes

def plot_benchmark_results(
    bench_result: List[Dict[str, Dict[int, List[int]]]],
    size_type: SizeType,
):
    """
    Displays one plot per strategy kind of a benchmark result.

    ``bench_result`` holds the results of the relabel, split, merge and
    subgraph size strategies in this order. A kind with at most one
    strategy is skipped as there is nothing to compare.
    """
    kind_titles = ["Relabel", "Split", "Merge", "Subgraph_size"]
    for kind_title, per_strategy in zip(kind_titles, bench_result):
        series, tick_labels = _transform_for_plotting(per_strategy)
        if len(series) > 1:
            caption = size_type.value
            display(plot_results(series, labels=tick_labels, title=f"{kind_title} ({caption})", x_label=caption))

if TEST:
    # Plots the result of the nac_benchmark_core smoke test (needs the test_bench_result global)
    plot_benchmark_results(test_bench_result, SizeType.VertexNo)

# Loading data

In [None]:
def group_by_vertex_no[T: nx.Graph](graphs: Iterable[T]) -> Dict[int, List[T]]:
    res: defaultdict[int, List[T]] = defaultdict(list)
    for graph in graphs:
        res[nx.number_of_nodes(graph)].append(graph)
    return res


def group_by_monochomatic_classes_no[T: nx.Graph ](graphs: Iterable[T],) -> Dict[int, List[T]]:
    res: defaultdict[int, List[T]] = defaultdict(list)
    for graph in graphs:
        classes = len(nac.find_triangle_components(graph)[1])
        res[classes].append(graph)
    return res

def group_dataset[T: nx.Graph](dataset: List[T]) -> Tuple[Dict[int, List[T]], Dict[int, List[T]]]:
    return group_by_vertex_no(dataset), group_by_monochomatic_classes_no(dataset)


In [None]:
class Graphs:
    """
    The benchmark datasets, loaded/generated when the class body is executed.

    Every attribute is the pair returned by ``group_dataset``:
    the graphs grouped by the number of vertices and
    the graphs grouped by the number of monochromatic classes.
    """
    laman = group_dataset(list(dataset.load_laman_graphs()))
    laman_deg_3_plus = group_dataset(list(dataset.load_laman_degree_3_plus()))
    # NOTE(review): unlike above, the two results below are passed without list();
    # confirm that these calls return re-iterable collections, group_dataset iterates twice
    no_3_nor_4_cycles = group_dataset(dataset.load_no_3_nor_4_cycle_graphs())
    sparse_graphs = group_dataset(dataset.generate_sparse_graphs(30, 40))


In [None]:
def print_graph_datasets():
    """
    Prints an overview of the datasets in ``Graphs``.

    For every dataset its title is followed by two sorted lists of
    (group key, number of graphs) pairs, one per grouping, and an empty line.
    """
    def summarize(title: str, classes: Tuple[Dict[int, List[nx.Graph]], Dict[int, List[nx.Graph]]]):
        print(title)
        for grouping in (classes[0], classes[1]):
            print(sorted((key, len(members)) for key, members in grouping.items()))
        print()

    overview = (
        ("Laman", Graphs.laman),
        ("Laman deg 3+", Graphs.laman_deg_3_plus),
        ("No 3 nor 4 cycles", Graphs.no_3_nor_4_cycles),
        ("Sparse", Graphs.sparse_graphs),
    )
    for title, classes in overview:
        summarize(title, classes)
print_graph_datasets()

In [None]:
def get_group(groups: Dict[int, List[nx.Graph]], group_id: int) -> Dict[int, List[nx.Graph]]:
    """
    Returns a new dictionary with the single group ``group_id`` of ``groups``.
    """
    selected = groups[group_id]
    return {group_id: selected}

# Benchmarks, Experiments

In [None]:
class Promissing:
    """
    Strategy names and sizes used as the default parameters of the benchmark runs.
    """

    # Vertex relabeling strategies
    RELABELING = ["none", "random", "bfs"]

    # Splitting strategies
    SPLITTING = ["none", "neighbors", "neighbors_degree"]

    # Merging strategies, default of the all colorings search
    MERGING_OFFLINE = ["linear", "log", "score", "shared_vertices"]

    # Merging strategies, default of the single coloring search
    MERGING_ONLINE = ["linear", "log", "shared_vertices"]

    # Subgraph sizes
    SIZES = [6]

def run_promissing_core(
    test_name: str,
    rounds: int,
    coloring_no_limit: int|None,
    group_limits: Tuple[List[Tuple[int, int, int]], List[Tuple[int, int, int]]],
    dataset: Tuple[Dict[int, List[nx.Graph]], Dict[int, List[nx.Graph]]],
    relabel_strategies:List[str],
    split_strategies:List[str],
    merge_strategies:List[str],
    subgraph_sizes:List[int],
    verbose: bool,
):
    """
    Runs the benchmark once per size type and plots the results.

    Parameters
    ----------
    group_limits:
        For each size type (in the order of ``SIZE_TYPES``) a list of
        ``(size_start, size_end, graph_no)`` triples. For every size in the
        inclusive range the first ``graph_no`` graphs are benchmarked;
        sizes with fewer graphs are skipped.
    dataset:
        The graphs grouped by size, one grouping per size type
        (in the order of ``SIZE_TYPES``).
    coloring_no_limit, rounds, test_name, verbose and the strategies
        are passed to ``nac_benchmark_core``.

    Returns a tuple of the ``nac_benchmark_core`` results, one per size type.
    """
    res = []
    for limit, right_dataset, size_type in zip(group_limits, dataset, SIZE_TYPES):
        if verbose:
            print(f"Starting search ({size_type.value})")
        updated_dataset: Dict[int, List[nx.Graph]] = defaultdict(list)
        for size_start, size_end, graph_no in limit:
            for size in range(size_start, size_end+1):
                # get() neither fails for plain dictionaries nor inserts
                # empty groups into the (shared) defaultdict datasets
                current = right_dataset.get(size, [])
                if len(current) == 0 or len(current) < graph_no:
                    continue
                updated_dataset[size] = current[:graph_no]

        results = nac_benchmark_core(
            test_name=test_name,
            size_type=size_type,
            coloring_no_limit=coloring_no_limit,
            relabel_strategies=relabel_strategies,
            split_strategies=split_strategies,
            merge_strategies=merge_strategies,
            subgraph_sizes=subgraph_sizes,
            dataset=updated_dataset,
            rounds=rounds,
            verbose=verbose,
        )
        res.append(results)

    # Plot only after all the searches have finished
    for results, result_size_type in zip(res, SIZE_TYPES):
        plot_benchmark_results(results, result_size_type)

    return tuple(res)

def run_promissing_all(
    test_name: str,
    rounds: int,
    group_limits: Tuple[List[Tuple[int, int, int]], List[Tuple[int, int, int]]],
    dataset: Tuple[Dict[int, List[nx.Graph]], Dict[int, List[nx.Graph]]],
    relabel_strategies:List[str]=Promissing.RELABELING,
    split_strategies:List[str]=Promissing.SPLITTING,
    merge_strategies:List[str]=Promissing.MERGING_OFFLINE,
    subgraph_sizes:List[int]=Promissing.SIZES,
    verbose: bool = True,
):
    """
    Benchmarks the search for all colorings.

    Calls ``run_promissing_core`` with no limit on the number of colorings
    (``coloring_no_limit=None``); all other arguments are passed unchanged.
    The strategies default to the lists in ``Promissing``,
    merging to ``Promissing.MERGING_OFFLINE``.
    Returns the result of ``run_promissing_core``.
    """
    return run_promissing_core(
        test_name=test_name,
        rounds=rounds,
        coloring_no_limit=None,
        group_limits=group_limits,
        dataset=dataset,
        relabel_strategies=relabel_strategies,
        split_strategies=split_strategies,
        merge_strategies=merge_strategies,
        subgraph_sizes=subgraph_sizes,
        verbose=verbose,
    )

def run_promissing_single(
    test_name: str,
    rounds: int,
    group_limits: Tuple[List[Tuple[int, int, int]], List[Tuple[int, int, int]]],
    dataset: Tuple[Dict[int, List[nx.Graph]], Dict[int, List[nx.Graph]]],
    relabel_strategies:List[str]=Promissing.RELABELING,
    split_strategies:List[str]=Promissing.SPLITTING,
    merge_strategies:List[str]=Promissing.MERGING_ONLINE,
    subgraph_sizes:List[int]=Promissing.SIZES,
    verbose: bool = True,
):
    """
    Benchmarks the search for a single coloring.

    Calls ``run_promissing_core`` with ``coloring_no_limit=1``;
    all other arguments are passed unchanged.
    The strategies default to the lists in ``Promissing``,
    merging to ``Promissing.MERGING_ONLINE``.
    Returns the result of ``run_promissing_core``.
    """
    return run_promissing_core(
        test_name=test_name,
        rounds=rounds,
        coloring_no_limit=1,
        group_limits=group_limits,
        dataset=dataset,
        relabel_strategies=relabel_strategies,
        split_strategies=split_strategies,
        merge_strategies=merge_strategies,
        subgraph_sizes=subgraph_sizes,
        verbose=verbose,
    )


## Laman

### Laman all colorings

In [None]:
# Smoke tests of both search variants on the Laman graphs.
# group_limits holds (size_start, size_end, graph_no) triples: the first list
# applies to the grouping by vertex number, the second one to the grouping
# by the number of monochromatic classes.
if TEST:
    # all colorings
    test_search_res = run_promissing_all(
        test_name="Test",
        rounds=1,
        group_limits=(
            [(10, 13, 8)],
            [(8, 11, 8)],
        ),
        dataset=Graphs.laman,
    )
if TEST:
    # a single coloring; rebinds test_search_res
    test_search_res = run_promissing_single(
        test_name="Test",
        rounds=3,
        group_limits=(
            [(10, 13, 8)],
            [(8, 11, 8)],
        ),
        dataset=Graphs.laman,
    )

In [None]:
# All colorings of Laman graphs: 12-17 vertices and 10-14 monochromatic classes,
# the first 20 graphs of each size (sizes with fewer graphs are skipped)
laman_res = run_promissing_all(
    test_name="Laman all",
    rounds=2,
    group_limits=(
        [(12, 17, 20)],
        [(10, 14, 20)],
    ),
    dataset=Graphs.laman,
)

In [None]:
# All colorings of larger Laman graphs: 17-20 vertices and 14-18 monochromatic classes,
# the first 5 graphs of each size (sizes with fewer graphs are skipped)
laman_res2 = run_promissing_all(
    test_name="Laman all",
    rounds=2,
    group_limits=(
        [(17, 20, 5)],
        [(14, 18, 5)],
    ),
    dataset=Graphs.laman,
)

In [None]:
# A single coloring of Laman graphs: 28-30 vertices (first 24 graphs of each size)
# and 50-60 monochromatic classes (first 11 graphs of each size)
# NOTE(review): this rebinds laman_res, which also holds the result of the
# "Laman all" run - confirm that overwriting it is intended
laman_res = run_promissing_single(
    test_name="Laman single",
    rounds=3,
    group_limits=(
        [(28, 30, 24)],
        [(50, 60, 11)],
    ),
    dataset=Graphs.laman,
)