# Transitive Closure

In [None]:
import pycompss.interactive as ipycompss

In [None]:
ipycompss.start(graph=True)

In [None]:
from dds import DDS

In [None]:
import time
import random

In [None]:
def _generate_graph():
    """Generate graph.

    :return: Set of edges.
    """
    random.seed(1)
    num_edges = 10
    num_vertices = 5
    rand = random.Random(42)

    edges = set()
    while len(edges) < num_edges:
        src = rand.randrange(0, num_vertices)
        dst = rand.randrange(0, num_vertices)
        if src != dst:
            edges.add((src, dst))
    return edges

In [None]:
def transitive_closure(partitions=2):
    """Transitive closure.

    :param partitions: Number of partitions.
    :results: Transitive closure result.
    """
    print("--- TRANSITIVE CLOSURE ---")

    edges = _generate_graph()
    start_time = time.time()

    o_d = DDS().load(edges, partitions).collect(future_objects=True)

    # Because join() joins on keys, the edges are stored in reversed order.
    edges = DDS().load(o_d, -1).map(lambda x_y: (x_y[1], x_y[0]))

    next_count = DDS().load(o_d, -1).count()

    while True:
        old_count = next_count
        # Perform the join, obtaining an RDD of (y, (z, x)) pairs,
        # then project the result to obtain the new (x, z) paths.
        new_edges = (
            DDS()
            .load(o_d, -1)
            .join(edges)
            .map(lambda __a_b: (__a_b[1][1], __a_b[1][0]))
        )
        o_d = (
            DDS()
            .load(o_d, -1)
            .union(new_edges)
            .distinct()
            .collect(future_objects=True)
        )

        next_count = DDS().load(o_d, -1).count()

        if next_count == old_count:
            break

    print(f"- TC has {next_count} edges")
    print(f"- Elapsed Time: {time.time() - start_time} (s)")
    print("--------------------------")

    return next_count == 20

In [None]:
transitive_closure()

In [None]:
ipycompss.stop()