Implementing an Algorithm in Katana Python
==========================================

In [None]:
import numpy as np
import timeit

from katana.local import InsertBag, Graph
from katana import do_all, do_all_operator

import katana.local
# Initialize katana.local before the cells below use Katana.
# NOTE(review): presumably this must run before any InsertBag/Graph is created — confirm.
katana.local.initialize()

In [2]:
# Constants
# Sentinel distance marking a node as "not reached yet": 2**31 - 1,
# the largest signed 32-bit integer.
INFINITY = (1 << 31) - 1

In [3]:
def bfs(graph: Graph, source):
    """
    Compute the BFS distance from source to every node of the graph.

    The algorithm runs in a bulk-synchronous fashion, one level at a time:
    every node on the current level is processed (in parallel) before the
    next level is started.

    :param graph: The graph to use.
    :param source: The source node for the traversal.
    :return: A uint32 array of distances, indexed by node ID. Nodes that were
        never reached keep the value INFINITY.
    """
    # Every node starts out "unreached" (INFINITY), except the source, which
    # is at distance 0 from itself.
    distance = np.full((graph.num_nodes(),), INFINITY, dtype=np.uint32)
    distance[source] = 0

    # Katana concurrent bags holding the level being processed (frontier) and
    # the nodes discovered for the level after it (discovered).
    frontier = InsertBag[np.uint32]()
    discovered = InsertBag[np.uint32]()

    # The traversal is seeded with only the source node.
    discovered.push(source)

    level = 0
    while True:
        # Stop once the previous level discovered no new nodes.
        if discovered.empty():
            break

        # What was discovered during the previous level becomes the level to
        # process now; the other bag is emptied so it can collect the nodes
        # found in this round.
        frontier, discovered = discovered, frontier
        discovered.clear()
        level += 1

        # Apply bfs_operator to every element of the frontier in parallel.
        # Calling bfs_operator here only binds its leading arguments; do_all
        # supplies the node ID for each work item.
        do_all(frontier, bfs_operator(graph, discovered, level, distance))

    return distance

In [4]:
# Marking this function as a Katana operator means it gets compiled to native
# code and prepared so that Katana do_all can call it.
@do_all_operator()
def bfs_operator(graph: Graph, next_level_worklist, next_level_number, distance, node_id):
    """
    Process a single node of the current level's work list.

    bfs binds the first 4 arguments; node_id is taken from the work list and
    supplied by do_all.

    :param graph: The graph being traversed.
    :param next_level_worklist: The work list to add next nodes to.
    :param next_level_number: The level to assign to nodes we find.
    :param distance: The distance array to fill with data.
    :param node_id: The node we are processing.
    :return: Nothing; results are written into distance and next_level_worklist.
    """
    # Walk over every out edge of the node being processed.
    for edge in graph.out_edge_ids_for_node(node_id):
        # The node on the far end of this edge.
        neighbor = graph.out_edge_dst(edge)

        # Skip destinations that already have a level assigned.
        #
        # This check-then-write is racy, but safely so: if several operator
        # calls reach the same destination at once, they all write the same
        # level. The destination may then be pushed (and processed in the next
        # level) more than once, which is extra work, but no atomic operations
        # are needed, so this can still be a win in low-degree graphs.
        if distance[neighbor] != INFINITY:
            continue

        # First visit: record its level and queue it so that its out edges
        # are processed in the next level.
        distance[neighbor] = next_level_number
        next_level_worklist.push(neighbor)

In [None]:
from katana.example_data import get_rdg_dataset

# Download the input: the "rmat15_cleaned_symmetric" example dataset.
# The returned value is passed to Graph below as the location to load from.
rmat15_cleaned_symmetric_path = get_rdg_dataset("rmat15_cleaned_symmetric")

# Load our graph; later cells run BFS on this object.
graph = Graph(rmat15_cleaned_symmetric_path)

In [6]:
# Run our algorithm with node 0 as the source.
# distances is indexed by node ID; unreached nodes hold INFINITY.
distances = bfs(graph, 0)

In [7]:
# Look at some arbitrary results: the distances of nodes 0-19 and of nodes 490-509
distances[:20], distances[490:510]

(array([0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
       dtype=uint32),
 array([1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 2, 1, 1],
       dtype=uint32))

In [8]:
# Maximum distance to a reached node (i.e., nodes that do not have infinite distance).
# Unreached nodes still hold INFINITY, so they are masked out before taking the max.
np.max(distances[distances < INFINITY])

2

In [9]:
# Number of reached nodes, i.e. entries that are not the INFINITY sentinel.
# The source itself (distance 0) is included in this count.
np.count_nonzero(distances != INFINITY)

29352

Even algorithms written in Python this way perform reasonably well.

In [10]:
# Time 100 complete BFS runs from node 0 and report the mean time per run.
# timeit.timeit returns the total time in seconds for all runs:
# dividing by 100 gives the per-run average, multiplying by 1000 converts to ms.
print("Average algorithm runtime (over 100 runs):")
print(timeit.timeit(lambda: bfs(graph, 0), number=100) / 100 * 1000, "ms")

Average algorithm runtime (over 100 runs):
1.6204886100604199 ms
