In [None]:
# Install and set up spark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -O ./spark-3.3.2-bin-hadoop3.tgz  https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
!tar zxf ./spark-3.3.2-bin-hadoop3.tgz

# install and set up the graphframes library. It will be used to send messages between vertices.
!pip install graphframes 
!curl -L -o "/content/spark-3.3.2-bin-hadoop3/jars/graphframes-0.8.2-spark3.2-s_2.12.jar" https://repos.spark-packages.org/graphframes/graphframes/0.8.2-spark3.2-s_2.12/graphframes-0.8.2-spark3.2-s_2.12.jar

# networkx is used to generate sample graphs
!pip install networkx

--2023-02-22 13:21:33--  https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
Resolving dlcdn.apache.org (dlcdn.apache.org)... 151.101.2.132, 2a04:4e42::644
Connecting to dlcdn.apache.org (dlcdn.apache.org)|151.101.2.132|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 299360284 (285M) [application/x-gzip]
Saving to: ‘./spark-3.3.2-bin-hadoop3.tgz’


2023-02-22 13:21:35 (183 MB/s) - ‘./spark-3.3.2-bin-hadoop3.tgz’ saved [299360284/299360284]

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  242k  100  242k    0     0  1301k      0 --:--:-- --:--:-- --:--:-- 1301k
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
# Set up system paths to allow spark to operate correctly:
# point Spark at the JDK installed by apt-get and at the unpacked Spark distribution.
import os
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.3.2-bin-hadoop3",
})



In [None]:
# Confirm the location of spark
# findspark.init() makes the Spark installation importable from this kernel; findspark.find() returns the
# Spark home it resolved (the displayed path is expected to match the SPARK_HOME set in the previous cell).
!pip install findspark
import findspark
findspark.init()
display(findspark.find())


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


'/content/spark-3.3.2-bin-hadoop3'

In [None]:
from logging import Logger
_logger = Logger(__name__)

# Other imports
# ===============
import numpy as np
import pandas as pd
import random
from pathlib import Path
import json
import time
import zlib
import pickle



import pyspark
# from pyspark.rdd import RDD
# from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
# from pyspark.sql import SQLContext

from pyspark.sql.types import *
from pyspark.sql import Row
# from pyspark.sql.window import Window
import pyspark.sql.functions as f

# graphframes modules
from graphframes import GraphFrame
from graphframes.lib import *

import networkx as nx


In [None]:

# ===========================================
# * Define UDFs for searching incoming paths 
# ===========================================

# * Why we are using a udf and not SQL commands:
# ----------------------------------------------
# The rationale for this UDF is that graph.aggregateMessages() produces, for every vertex, a nested list of all
# incoming paths (one or more per neighbor). Processing the paths one by one would require unpacking them with f.explode().

# However, 'exploding' these rows adds a lot of DataFrame processing around each message iteration.

# Instead, we load the nested path arrays into a multidimensional NumPy array.
# We can then perform all cycle-search and outbound message construction tasks using vectorized code on the worker nodes.
# -----------------------------------------------

# #################################################################################################
# Analysis of NESTED paths
# #################################################################################################


# This schema defines the returned objects from the UDF
# Built with StructType.add(); equivalent to listing the two nullable StructFields explicitly.
nested_path_analysis_schema = (
    StructType()
    .add("cycles", ArrayType(ArrayType(IntegerType())), True)            # primary cycles reported by this vertex
    .add("outbound_message", ArrayType(ArrayType(IntegerType())), True)  # paths forwarded in the next iteration
)




# UDF for analyzing incoming NESTED graph paths
def analyze_nested_paths(vertex_id, path_array):
    '''
    Analyze every path that arrived at `vertex_id` in one vectorized pass.

    The incoming paths are loaded into a 2-D numpy array (one row per path) and checked for 3 conditions:

    (1)    :  Is the vertex present in the path? If yes, this path will not be sent onwards to neighbors.
    (2 + 3):  Is the vertex not only present, but also the first in the path and also the smallest id in the path?
                If so, the UDF will report the path as a cycle (these conditions avoid duplicate cycle reporting)

    Parameters
    ----------
    vertex_id  : id of the vertex that received the messages.
    path_array : list of equal-length paths (lists of vertex ids) received by this vertex.

    Returns
    -------
    Row(cycles, outbound_message):
        cycles           - list of paths reported as primary cycles, or None if there are none.
        outbound_message - list of forwarded paths, each extended with `vertex_id`, or None if nothing is forwarded.

    Raises
    ------
    Exception (chained to the original error) whose message includes the offending input, if the paths cannot
    be analyzed - e.g. when they do not form a rectangular 2-D array.
    '''
    cycles = None
    outbound_message = None
    # Bound before the try block so the error handler can always reference it, even when np.array() itself fails
    # (e.g. ragged input on recent numpy versions); otherwise the handler would raise UnboundLocalError instead.
    np_path_array = None

    try:
        # Initiate vectorized handling of path analysis - load incoming paths into a 2-D array.
        np_path_array = np.array(path_array)

        # Find primary cycles.
        # NOTE(review): paths of length 2 (a -> b -> a) never pass this check, whereas analyze_flat_path and the
        # 'dataframe_queries' method do report them - confirm whether excluding 2-cycles is intended.
        if np_path_array.shape[1] > 2:
            vertex_is_first_in_path_mask = np_path_array[:, 0] == vertex_id
            vertex_is_smallest_in_path_mask = np_path_array.min(axis=1) == vertex_id
            cycles_to_report_mask = vertex_is_first_in_path_mask & vertex_is_smallest_in_path_mask

            if cycles_to_report_mask.any():
                cycles = np_path_array[cycles_to_report_mask].tolist()

        # Prepare outbound messages for the next iteration: only paths that do not contain this vertex are forwarded.
        vertex_not_in_path_mask = ~(np_path_array == vertex_id).any(axis=1)
        if vertex_not_in_path_mask.any():
            paths_to_forward = np_path_array[vertex_not_in_path_mask]
            # Append this vertex as a new last column; an integer column avoids a float round-trip.
            vertex_id_column = np.full((paths_to_forward.shape[0], 1), vertex_id, dtype=paths_to_forward.dtype)
            outbound_message = np.hstack([paths_to_forward, vertex_id_column]).astype(int).tolist()

    except Exception as exp:
        shape = getattr(np_path_array, 'shape', None)
        msg = (f'nested UDF exception -  np_path_array.shape {shape}'
               f'\n\n np_path_array: {np_path_array} \n'
               f'Original path array: {path_array}\n'
               + str(exp))
        raise Exception(msg) from exp

    # return found cycles, and paths which will be forwarded to neighbors in the next iteration
    return Row('cycles', 'outbound_message')(cycles, outbound_message)


# define path analysis as a UDF - will be executed on the paths dataframe during each iteration
udf_analyze_nested_paths = f.udf(analyze_nested_paths, returnType=nested_path_analysis_schema)  # struct(cycles, outbound_message)

In [None]:

# #################################################################################################
# Analysis of FLAT paths
# #################################################################################################

# This schema defines the returned objects from the UDF
# Built with StructType.add(); equivalent to listing the two nullable StructFields explicitly.
flat_path_analysis_schema = (
    StructType()
    .add("cycle_flag", BooleanType(), True)                    # True when the path is a primary cycle
    .add("outbound_message", ArrayType(IntegerType()), True)   # the path to forward, with this vertex appended
)

def analyze_flat_path(vertex_id, path_array):
    '''
    Classify a single incoming path (one row after f.explode) for the vertex that received it.

    Outcomes:
      * the path starts at this vertex and this vertex has the smallest id in it -> primary cycle,
        reported with cycle_flag=True; nothing is forwarded;
      * the path starts at this vertex but a smaller id exists in it -> another vertex reports it; dropped;
      * the path contains this vertex at a later position -> secondary cycle; dropped;
      * otherwise -> forwarded with this vertex appended to the end.

    Returns Row(cycle_flag, outbound_message); fields that do not apply are None.
    '''
    path = np.array(path_array)
    make_result = Row('cycle_flag', 'outbound_message')

    if path[0] == vertex_id:
        # Only the smallest id on the cycle reports it, so each cycle is reported exactly once.
        return make_result(True if path.min() == vertex_id else None, None)

    if vertex_id in path:
        # Secondary cycle: drop the message.
        return make_result(None, None)

    # No cycle: forward the path after appending this vertex.
    return make_result(None, path.tolist() + [vertex_id])

# define path analysis as a UDF - will be executed on the paths dataframe during each iteration
udf_analyze_flat_path = f.udf(analyze_flat_path, returnType=flat_path_analysis_schema)  # struct(cycle_flag, outbound_message)

In [None]:
# Define UDFs for compressing and decompressing messages between vertices
# - the goal here is to avoid large message broadcasts that generate errors

# compression function using zlib
def compress(path_data):
    """
    Serialize and compress one outbound message before it is sent to neighboring vertices.

    `path_data` is whatever the vertex sends (a single path, or a list of paths in 'nested_paths_udf' mode).
    It is pickled and then zlib-compressed; `decompress` reverses this after aggregation.

    Returns
    -------
    bytes - the compressed payload (emitted by a BinaryType UDF).
    """
    # zlib.compress always returns bytes (it raises on failure rather than returning None),
    # so no post-check on the result is needed.
    return zlib.compress(pickle.dumps(path_data))


# decompression function using zlib
def decompress(compressed_data):
    """
    Decompress the messages a vertex received and return them as one flat list of paths.

    Each element of `compressed_data` is a zlib-compressed pickle produced by `compress`. Depending on the
    cycle-search method, a decoded message is either a single path (a list of vertex ids) or a list of paths
    (the 'nested_paths_udf' method forwards all of a vertex's surviving paths in one message). Both shapes are
    normalized into a single list of paths, matching the UDF's declared return type
    ArrayType(ArrayType(IntegerType())).

    The normalization is done message by message instead of stacking everything with np.array(): when every
    neighbor sends the same number of paths, stacking yields a 3-D array, and reducing it back to 2-D by
    indexing keeps only the first neighbor's paths. Recent numpy versions also reject ragged input.

    Security note: pickle.loads can execute arbitrary code on malicious input. This is acceptable only because
    the payloads are produced by `compress` inside the same Spark job - never feed external data to it.

    Returns
    -------
    list[list[int]] - every received path, in message order (empty if no messages).
    """
    paths = []
    for compressed_msg in compressed_data:
        message = pickle.loads(zlib.decompress(compressed_msg))
        if len(message) > 0 and isinstance(message[0], (list, tuple)):
            # A list of paths: unpack it so each path becomes its own element.
            paths.extend(list(path) for path in message)
        else:
            # A single path.
            paths.append(list(message))
    return paths



In [None]:

# ======================================================
# Rocha-Thatte: Cycle detection via message aggregation
# ======================================================
def find_cycles(spark_session : SparkSession, edges_list, cycle_search_method = 'dataframe_queries', verbose = False,
                max_iterations=10000, get_cached_dataframes = True, metadata = None):
    """
    An implementation of the Rocha-Thatte algorithm for large-scale sparse graphs.
    Credits: The algorithm is an implementation of Rocha-Thatte's paper -
            https://www.researchgate.net/publication/283642998_Distributed_cycle_detection_in_large-scale_sparse_graphs

    input:
    ======
    spark_session           - (mandatory) A live spark session object. Used to create dataframes and RDDs during this run.
    edges_list              - (mandatory) A list of tuples, containing source-destination integer pairs with all edges in the graph.
                                            Vertices will be inferred from this list. Self loops are ignored.
    cycle_search_method     - (optional) controls how paths are searched. 3 options:

                      * 'nested_paths_udf' (alias: 'nested_paths') a UDF-based method which uses a multidimensional numpy
                         array to process all paths from all source vertices. It's the fastest process this algorithm
                         implements. The downside is that it is difficult to debug and the code is not as transparent
                         as the dataframe-sql approach (below).

                      * 'flat_paths_udf' (alias: 'flat_paths') is a similar approach to the 'nested' one, but we first
                         'f.explode()' the incoming messages. This has the effect of placing each incoming path in its own
                         row of the dataframe. It is a much slower option, but the numpy UDF then only needs to scan a
                         single path - which is far simpler code.

                      * 'dataframe_queries' (default) uses sql-like commands directly on the DataFrame to implement the
                         algorithm. This method is the slowest, but does not involve using UDFs at all.
    verbose                 - (optional) Show/output all dataframes while running. May cause a large performance impact.
    max_iterations          - (optional) Maximum number of messaging iterations - a stopping condition to avoid exceeding
                                            a desired run-time.
    get_cached_dataframes   - (optional) Significantly accelerates iterative code over GraphFrames.
    metadata                - (optional) Information about the configuration of the cluster, for performance tracking purposes.
                                            This dictionary is also used internally to append runtime metrics, and is
                                            returned to caller. When omitted, a fresh dictionary with a conservative
                                            default cluster configuration ({'workers_count': 1, 'workers_vcpu_count': 2})
                                            is used. The cluster configuration is only recorded; this function does not
                                            use it to configure Spark.

    output:
    =======
    detected_cycles - a dataframe containing all detected cycles (can be empty)
    metadata        - the metadata dictionary, extended with 'edge_count', 'vertex_count', 'start_time', 'end_time' and
                      'iteration_metrics' (per iteration: iteration number, runtime and number of cycles found).

    raises:
    =======
    ValueError - if cycle_search_method is not one of the options above.
    """
    print(f"Starting cycle search. Search and messaging method is: '{cycle_search_method}'")

    # Validate the search method before doing any work. The short names are accepted as aliases.
    method_aliases = {'nested_paths': 'nested_paths_udf', 'flat_paths': 'flat_paths_udf'}
    valid_methods = ('nested_paths_udf', 'flat_paths_udf', 'dataframe_queries')
    cycle_search_method = method_aliases.get(cycle_search_method, cycle_search_method)
    if cycle_search_method not in valid_methods:
        raise ValueError(f"Unknown cycle_search_method {cycle_search_method!r}; expected one of {valid_methods} "
                         f"(or an alias: {tuple(method_aliases)})")

    # A fresh dictionary per call: a dict default argument would be shared (and mutated) across calls.
    if metadata is None:
        metadata = {'cluster_config': {'workers_count': 1, 'workers_vcpu_count': 2}}

    # Create an empty dataframe to collect all cycles as we find them.
    detected_cycles = spark_session.createDataFrame(data=spark_session.sparkContext.emptyRDD(),
                                                    schema=StructType([StructField("cycles", ArrayType(IntegerType()), True)]))

    # Build edges dataframe. Note we do not search for self loops in this algorithm.
    edges = spark_session.createDataFrame(edges_list, ["src", "dst"]).where(f.col("src") != f.col("dst"))

    # Infer vertices from edges. On the first iteration, every outbound message is a path holding only the vertex's own id.
    vertices = (edges.select("src").union(edges.select("dst")).distinct()
                     .withColumnRenamed('src', 'id')
                     .withColumn("outbound_message", f.array(f.col("id"))))

    # Messages are compressed before sending and decompressed (into a flat list of paths) after aggregation.
    compress_udf = f.udf(compress, BinaryType())
    decompress_udf = f.udf(decompress, ArrayType(ArrayType(IntegerType())))

    # Create graph with initial vertices and edges
    graph = GraphFrame(vertices, edges)

    # What is the size of the graph? (edge_count is the length of the input list, including any self loops.)
    metadata['edge_count'] = len(edges_list)
    metadata['vertex_count'] = vertices.count()
    print(
    'Graph contains:\n'
    '===============\n'
    f'Edges #: {metadata["edge_count"]}\n'
    f'Vertices #: {metadata["vertex_count"]}\n'
    '==============')

    # Performance tracking: overall start time plus one entry per iteration.
    metadata['start_time'] = time.time()
    iteration_metrics = []

    # Single-row DataFrame marked for broadcast. Counts below are taken by joining against it; per the original
    # design note this avoids a network-wide broadcast that limited the graph size that could be processed.
    broadcast_df = f.broadcast(spark_session.createDataFrame([(1, )], ["one"]))

    # Periodically cache all dataframes to avoid execution plans that are too big.
    cache_interval = 10

    # Iterate until no more messages are exchanged (or until max_iterations iterations have run).
    for iteration in range(1, max_iterations + 1):

        # Start measuring runtime - we track how long each iteration takes
        iteration_start_time = time.time()

        print(f'Iteration: {iteration}')

        # ===========================================================
        # * Send/Receive messages (BSP steps 1 and 2)
        # ===========================================================
        # graph.aggregateMessages() sends each vertex's 'outbound_message' (compressed) to its out-neighbors,
        # collects the messages arriving at each vertex with f.collect_set() and decompresses them into one
        # list of paths per vertex ('incoming_messages').
        # Note: graph.aggregateMessages(...) is a method of the graph; AggregateMessages is the static helper class
        # providing the column references (src, msg) and getCachedDataFrame().
        print('Commencing messaging between vertices:')

        incoming_msg_dataframe = graph.aggregateMessages(
                                aggCol=decompress_udf(f.collect_set(AggregateMessages.msg)).alias("incoming_messages"),
                                sendToDst=compress_udf(AggregateMessages.src["outbound_message"]),
                                )

        if get_cached_dataframes:
            incoming_msg_dataframe = AggregateMessages.getCachedDataFrame(incoming_msg_dataframe)

        # code to show progress - use the 'verbose' flag to control. Has a performance impact.
        if verbose:
            incoming_msg_dataframe.show(n=5, truncate=False, vertical=True)

        # BREAK condition: if no more messages are received, we're done.
        incoming_message_count = incoming_msg_dataframe.join(broadcast_df, broadcast_df.one == 1).count()
        if incoming_message_count == 0:
            print(f"No more messages receieved into any of the vertices. We're done.")
            break

        # ================================================================
        # * Processing step (BSP step 3):
        # 1. Detect whether a cycle has been found. If so, report it.
        # 2. Compose messages to outbound neighbors, where necessary.
        # ================================================================
        analyzed_paths_df = _analyze_incoming_paths(incoming_msg_dataframe, cycle_search_method)

        # Cached for every method: it is consumed twice below (cycles and outbound messages), and without caching
        # the UDF-based methods would evaluate their UDF twice per iteration.
        if get_cached_dataframes:
            analyzed_paths_df = AggregateMessages.getCachedDataFrame(analyzed_paths_df)

        if verbose:
            analyzed_paths_df.show(n=5, truncate=False, vertical=True)

        temp_cycles, outbound_messages = _split_cycles_and_messages(analyzed_paths_df, cycle_search_method)

        # The following code is shared by all cycle-search methods. It handles tracking of discovered cycles,
        # and preparation of the graph for the next iteration of search.
        if get_cached_dataframes:
            temp_cycles = AggregateMessages.getCachedDataFrame(temp_cycles)

        if verbose:
            temp_cycles.show(vertical=True, truncate=False, n=10)

        # Determine whether we found cycles in this iteration. If so, save them.
        temp_cycles_count = temp_cycles.join(broadcast_df, broadcast_df.one == 1).count()
        if temp_cycles_count > 0:
            detected_cycles = detected_cycles.union(temp_cycles)

            # resolve and cache the tracking DF to avoid memory issues
            if get_cached_dataframes:
                detected_cycles = AggregateMessages.getCachedDataFrame(detected_cycles)

        if verbose:
            outbound_messages.show(n=10, truncate=False, vertical=True)

        # In order to avoid memory issues the graph is instantiated with (optionally) cached copies of the dataframes
        if get_cached_dataframes:
            outbound_messages = AggregateMessages.getCachedDataFrame(outbound_messages)
            edges = AggregateMessages.getCachedDataFrame(graph.edges)

        graph = GraphFrame(outbound_messages, edges)

        # Perform periodic caching of all dataframes to avoid execution plans that are too big
        if iteration % cache_interval == 1:
            print('Commencing periodic caching of dataframes.....')
            graph.vertices.cache()
            incoming_msg_dataframe = incoming_msg_dataframe.cache()
            outbound_messages = outbound_messages.cache()
            analyzed_paths_df = analyzed_paths_df.cache()
            temp_cycles = temp_cycles.cache()
            detected_cycles = detected_cycles.cache()
            print('Caching done.')

        # collect iteration statistics
        iteration_metrics.append({
            'iteration_num' : iteration,
            'runtime'       : time.time() - iteration_start_time,
            'cycles_count'  : temp_cycles_count
        })

    # Gather run statistics
    metadata['end_time'] = time.time()
    metadata['iteration_metrics'] = iteration_metrics

    print(f"Cycle search total run time: {metadata['end_time'] - metadata['start_time']}")

    return detected_cycles, metadata


def _analyze_incoming_paths(incoming_msg_dataframe, cycle_search_method):
    """Processing step: annotate the incoming paths with what is needed to detect cycles (lazy, nothing executes).

    A "primary cycle" is a path that starts at the receiving vertex and in which that vertex has the smallest id,
    so exactly one vertex reports each cycle. A path that already contains the receiving vertex (primary cycles
    included) is never forwarded.

    Extra columns by method:
      * 'nested_paths_udf'  - 'analysis_result' struct (cycles, outbound_message), one row per vertex.
      * 'flat_paths_udf'    - one row per path ('flat_message') plus 'analysis_result' struct (cycle_flag, outbound_message).
      * 'dataframe_queries' - one row per path ('graph_paths') plus booleans 'primary_cycle_detected' and
                              'secondary_cycle_detected' (the latter is True whenever the path contains the vertex).
    """
    if cycle_search_method == 'nested_paths_udf':
        return incoming_msg_dataframe.withColumn(
            'analysis_result', udf_analyze_nested_paths(f.col('id'), f.col('incoming_messages')))

    if cycle_search_method == 'flat_paths_udf':
        return (incoming_msg_dataframe
                .withColumn('flat_message', f.explode(f.col('incoming_messages')))
                .withColumn('analysis_result', udf_analyze_flat_path(f.col('id'), f.col('flat_message'))))

    # 'dataframe_queries': explode so each incoming path gets its own row, then flag cycles with native expressions.
    return (incoming_msg_dataframe
            .withColumn("graph_paths", f.explode(f.col("incoming_messages")))
            .withColumn("primary_cycle_detected",
                        f.when((f.col("id") == f.element_at(f.col("graph_paths"), 1))
                               & (f.col("id") == f.array_min(f.col("graph_paths"))), True).otherwise(False))
            .withColumn("secondary_cycle_detected",
                        f.when(f.array_contains(f.col('graph_paths'), f.col('id')) == True, True).otherwise(False)))


def _split_cycles_and_messages(analyzed_paths_df, cycle_search_method):
    """Split an analyzed-paths DataFrame into (primary cycles, outbound messages for the next iteration).

    Returns
    -------
    temp_cycles       - DataFrame with a single 'cycles' column, one detected primary cycle per row.
    outbound_messages - DataFrame with 'id' and 'outbound_message' columns; it becomes the next graph's vertices.
    """
    if cycle_search_method == 'nested_paths_udf':
        temp_cycles = (analyzed_paths_df
                       .where(f.col('analysis_result.cycles').isNotNull())
                       .select(f.explode('analysis_result.cycles').alias('cycles')))
    elif cycle_search_method == 'flat_paths_udf':
        temp_cycles = (analyzed_paths_df
                       .where(f.col('analysis_result.cycle_flag') == True)
                       .select(f.col('flat_message').alias('cycles')))
    else:
        temp_cycles = (analyzed_paths_df
                       .where(f.col('primary_cycle_detected') == True)
                       .select(f.col('graph_paths').alias('cycles')))

    if cycle_search_method in ('nested_paths_udf', 'flat_paths_udf'):
        # The UDF already built the forwarded paths; selecting the struct field yields a column named 'outbound_message'.
        outbound_messages = (analyzed_paths_df
                             .where(f.col('analysis_result.outbound_message').isNotNull())
                             .select('id', 'analysis_result.outbound_message'))
    else:
        # Only paths that do not contain the receiving vertex are forwarded, extended with that vertex's id.
        outbound_messages = (analyzed_paths_df
                             .where(f.col('secondary_cycle_detected') == False)
                             .select(f.col('id'),
                                     f.concat(f.col('graph_paths'), f.array(f.col('id'))).alias('outbound_message')))

    return temp_cycles, outbound_messages

In [None]:

# Called before each cycle-analysis run to obtain a SparkSession (getOrCreate reuses a running one).
def get_spark_session(worker_vcpu_count = 4):
    """Build (or reuse) the SparkSession used by the cycle-search runs.

    Parameters
    ----------
    worker_vcpu_count : int or None, default 4
        Number of local worker threads, applied as ``master("local[N]")``.
        Pass ``None`` (or 0) to leave the master unset, e.g. on a managed cluster
        whose master is configured outside this notebook.

    Returns
    -------
    SparkSession
        The session returned by ``SparkSession.builder.getOrCreate()``.

    Notes
    -----
    If a session is already running, ``getOrCreate()`` reuses it. Settings fixed
    when the SparkContext starts (the master, ``spark.serializer``) then keep their
    original values; a note is printed when the master differs from the request.
    """
    builder = SparkSession.builder.appName("Rocha-Thatte")

    # Previously the .master() call was commented out, so this argument was ignored and
    # Spark ran as local[*] while the run metadata recorded the requested vCPU count.
    if worker_vcpu_count:
        builder = builder.master(f"local[{worker_vcpu_count}]")

    spark_session = (
        builder
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        # Arrow-based transfer for Spark <-> pandas conversions such as toPandas().
        .config("spark.sql.execution.arrow.pyspark.enabled", "true")
        # Optional tuning knobs, kept for reference:
        #   .config("spark.kryoserializer.buffer.max", "2047m")
        #   .config("spark.driver.memory", "10g")
        #   .config("spark.executor.memory", "10g")
        #   .config("spark.sql.pyspark.jvmStacktrace.enabled", "true")
        .getOrCreate()
    )

    actual_master = spark_session.sparkContext.master
    # A reused session keeps its original master; make that visible instead of silent.
    if worker_vcpu_count and actual_master != f"local[{worker_vcpu_count}]":
        print(f'Note: requested local[{worker_vcpu_count}] but the running session uses '
              f'{actual_master}; stop the session (or restart the kernel) to apply it.')

    # Output cluster status
    print(f'Resource manager: {actual_master}')
    print(f'Spark version: {spark_session.version}')

    return spark_session
             
    

spark = get_spark_session(2)

# Report a few session settings. spark.conf.get raises when a key is unset,
# so each probe is guarded and reports the failure instead of stopping the cell.
try:
    serializer = spark.conf.get("spark.serializer")
except Exception:
    print('Cannot get the spark.serializer configuration setting')
else:
    print("Current serializer: ", serializer)

try:
    jvmStacktrace = spark.conf.get("spark.sql.pyspark.jvmStacktrace.enabled")
except Exception:
    print('Cannot get the pyspark.jvmStacktrace configuration setting')
else:
    print("pyspark.jvmStacktrace: ", jvmStacktrace)

# Confirm whether the Arrow setting requested by the session builder is in effect.
arrow_enabled = spark.conf.get("spark.sql.execution.arrow.pyspark.enabled")
print("Arrow is enabled" if arrow_enabled == "true" else "Arrow is not enabled")

Resource manager: local[*]
Spark version: 3.3.2
Cannot get the spark.serializer configuration setting
pyspark.jvmStacktrace:  false
Arrow is not enabled


In [None]:

# ======================================
# Dynamic random sparse graph generator:
# ======================================
# Generate a directed graph of n vertices in which each ordered vertex pair has probability p of an edge.
def generate_sample_graph(n, p, seed=None, verbose=True):
    """Generate a random directed G(n, p) graph and return it as an edge list.

    Parameters
    ----------
    n : int
        Number of vertices.
    p : float
        Probability of a directed edge between any ordered pair of vertices.
    seed : int, optional
        Forwarded to ``nx.fast_gnp_random_graph`` so a test graph can be reproduced.
        ``None`` (the default) keeps the non-deterministic behaviour.
    verbose : bool, default True
        Print each vertex followed by its out-neighbours.

    Returns
    -------
    list[tuple[int, int]]
        Directed ``(source, target)`` edges, grouped by source vertex in the
        order networkx reports its adjacency.
    """
    gnp_graph = nx.fast_gnp_random_graph(n=n, p=p, seed=seed, directed=True)

    directed_edges = []
    if verbose:
        print('Generated the following random edge list:')

    # adjacency() yields (vertex, {successor: edge_data}) pairs, so no string parsing is
    # needed. The old string-based version printed `line[1:]`, which dropped only the first
    # *character*, showing multi-digit vertices with a bogus neighbour ("10 ---> 0 19 ...").
    for source, successors in gnp_graph.adjacency():
        targets = [int(target) for target in successors]
        if verbose:
            print(f'{source} ---> {" ".join(map(str, targets))}', flush=True)
        directed_edges.extend((int(source), target) for target in targets)

    # Done. Return graph edgelist to caller.
    return directed_edges


In [None]:

# Configure a series of tests.
# Each (num_vertices, probability_for_edge) tuple sets the graph size and the
# probability of a directed edge between any ordered pair of vertices.
# Larger alternative: [(50, 0.04), (60, 0.025), (70, 0.015)]
test_configs = [(30, 0.06), (40, 0.04)]

for test_config in test_configs:
    num_vertices, probability_for_edge = test_config

    print(f'Beginning test with {num_vertices} vertices and p={probability_for_edge} probability of edge per vertex-pair')

    # Build a fresh random sparse graph for this configuration.
    edge_list = generate_sample_graph(n = num_vertices, p = probability_for_edge)

    # Distributed cycle search. Supported 'cycle_search_method' values:
    # 'flat_paths_udf', 'nested_paths_udf', 'dataframe_queries'.
    # NOTE(review): `worker_vcpu_count` is not defined in this cell; it must already exist
    # as a global from an earlier cell - confirm before relying on Restart & Run All.
    cluster_config = {
        'provider': 'Colab',
        'workers_count': 1,
        'worker_vcpu_count': worker_vcpu_count,
        'worker_memory_gb': 12,
        'spark_ver': spark.version,
        'probability_for_edge': probability_for_edge,
    }
    detected_cycles, metadata = find_cycles(
        spark,
        edge_list,
        cycle_search_method='nested_paths_udf',
        verbose=True,
        get_cached_dataframes=True,
        metadata={'cluster_config': cluster_config},
    )

    print('Preparing Run summary....:')
    # Persist all data/metadata for later analysis, keyed by a timestamp-based run id.
    run_id = str(time.time())
    directory = './run_stats/'
    Path(directory).mkdir(exist_ok=True)

    # Discovered cycles: bring them to pandas once, show them, and save them.
    detected_cycles = detected_cycles.toPandas()
    display(detected_cycles)
    detected_cycles.to_csv(f'{directory}detected_cycles_{run_id}.csv')

    # Cluster and graph configuration (graph size, node count, CPU, memory, ...).
    with open(f'{directory}metadata_{run_id}.json', 'w') as fp:
        json.dump(metadata, fp)
    print(metadata)

    # Per-iteration metrics, widened with run-level and cluster-level columns.
    run_summary_df = pd.DataFrame(
        data=metadata['iteration_metrics'],
        columns=['iteration_num', 'runtime', 'cycles_count', 'edge_count', 'vertex_count',
                 'workers_count', 'worker_vcpu_count', 'provider'],
    ).assign(
        edge_count=metadata.get('edge_count', None),
        vertex_count=metadata.get('vertex_count', None),
        start_time=metadata.get('start_time', None),
        end_time=metadata.get('end_time', None),
        workers_count=metadata['cluster_config']['workers_count'],
        worker_vcpu_count=metadata['cluster_config']['worker_vcpu_count'],
        provider=metadata['cluster_config']['provider'],
    )
    run_summary_df.to_csv(f'{directory}run_summary_df_{run_id}.csv')
    display(run_summary_df)

    # Edge list, so search correctness can be cross-checked with other algorithms.
    pd.DataFrame(edge_list).to_csv(f'{directory}edge_list_df_{run_id}.csv')

print('Testing run is done.')


Beginning test with 30 vertices and p=0.06 probability of edge per vertex-pair
Generated the following random edge list:
0 --->  6 8 15 18
1 --->  2 24
2 ---> 
3 --->  28 2
4 --->  13 24
5 --->  13 24
6 --->  11 19 20 28
7 --->  9 18 23
8 --->  12 29
9 --->  11 16 21 3 6
10 ---> 0 19 21 23
11 ---> 1 0 7
12 ---> 2 24 8
13 ---> 3 16 5
14 ---> 4 21
15 ---> 5
16 ---> 6
17 ---> 7 21
18 ---> 8 19 27
19 ---> 9 23 27 8
20 ---> 0 25 12 16
21 ---> 1 15
22 ---> 2 29 1
23 ---> 3
24 ---> 4 0 1 2
25 ---> 5 9 11
26 ---> 6 23
27 ---> 7 17 19
28 ---> 8 0 24
29 ---> 9
Getting a spark session...
Resource manager: local[*]
Spark version: 3.3.2
Starting cycle search. Search and messaging method is: 'nested_paths_udf'




Graph contains:
Edges #: 58
Vertices #: 30
Iteration: 1
Commencing messaging between vertices:




-RECORD 0------------------------------------
 id                | 29                      
 incoming_messages | [[22], [8]]             
-RECORD 1------------------------------------
 id                | 19                      
 incoming_messages | [[27], [6], [10], [18]] 
-RECORD 2------------------------------------
 id                | 0                       
 incoming_messages | [[11], [28], [24]]      
-RECORD 3------------------------------------
 id                | 7                       
 incoming_messages | [[11]]                  
-RECORD 4------------------------------------
 id                | 25                      
 incoming_messages | [[20]]                  
only showing top 5 rows

-RECORD 0------------------------------------------------------------
 id                | 29                                              
 incoming_messages | [[22], [8]]                                     
 analysis_result   | {null, [[22, 29], [8, 29]]}                     
-RECO

Unnamed: 0,cycles
0,"[0, 6, 11]"
1,"[0, 6, 28]"
2,"[7, 9, 11]"
3,"[0, 8, 12, 24]"
4,"[0, 6, 28, 24]"
5,"[6, 11, 7, 9]"
6,"[6, 20, 25, 9]"
7,"[0, 6, 20, 12, 24]"
8,"[0, 6, 20, 25, 11]"
9,"[0, 6, 20, 25, 9, 11]"


{'cluster_config': {'provider': 'Colab', 'workers_count': 1, 'worker_vcpu_count': 2, 'worker_memory_gb': 12, 'spark_ver': '3.3.2', 'probability_for_edge': 0.06}, 'edge_count': 58, 'vertex_count': 30, 'start_time': 1677072152.0710416, 'end_time': 1677072216.14568, 'iteration_metrics': [{'iteration_num': 1, 'runtime': 7.629881143569946, 'cycles_count': 0}, {'iteration_num': 2, 'runtime': 6.204765796661377, 'cycles_count': 0}, {'iteration_num': 3, 'runtime': 3.494032859802246, 'cycles_count': 3}, {'iteration_num': 4, 'runtime': 3.0111794471740723, 'cycles_count': 4}, {'iteration_num': 5, 'runtime': 3.26934552192688, 'cycles_count': 2}, {'iteration_num': 6, 'runtime': 5.241308927536011, 'cycles_count': 2}, {'iteration_num': 7, 'runtime': 3.3535163402557373, 'cycles_count': 3}, {'iteration_num': 8, 'runtime': 2.7983014583587646, 'cycles_count': 2}, {'iteration_num': 9, 'runtime': 2.7841804027557373, 'cycles_count': 2}, {'iteration_num': 10, 'runtime': 3.4132351875305176, 'cycles_count': 2},

Unnamed: 0,iteration_num,runtime,cycles_count,edge_count,vertex_count,workers_count,worker_vcpu_count,provider,start_time,end_time
0,1,7.629881,0,58,30,1,2,Colab,1677072000.0,1677072000.0
1,2,6.204766,0,58,30,1,2,Colab,1677072000.0,1677072000.0
2,3,3.494033,3,58,30,1,2,Colab,1677072000.0,1677072000.0
3,4,3.011179,4,58,30,1,2,Colab,1677072000.0,1677072000.0
4,5,3.269346,2,58,30,1,2,Colab,1677072000.0,1677072000.0
5,6,5.241309,2,58,30,1,2,Colab,1677072000.0,1677072000.0
6,7,3.353516,3,58,30,1,2,Colab,1677072000.0,1677072000.0
7,8,2.798301,2,58,30,1,2,Colab,1677072000.0,1677072000.0
8,9,2.78418,2,58,30,1,2,Colab,1677072000.0,1677072000.0
9,10,3.413235,2,58,30,1,2,Colab,1677072000.0,1677072000.0


Beginning test with 40 vertices and p=0.04 probability of edge per vertex-pair
Generated the following random edge list:
0 --->  10 29
1 --->  32
2 --->  9 21 35
3 --->  26
4 --->  24
5 --->  17 36
6 --->  12 14 2 3
7 --->  23
8 --->  24
9 --->  25 35
10 ---> 0 15
11 ---> 1 20 23 8
12 ---> 2 24
13 ---> 3
14 ---> 4 17 20 26
15 ---> 5 29
16 ---> 6 20
17 ---> 7 18 24
18 ---> 8 22
19 ---> 9 33 0 10
20 ---> 0
21 ---> 1 6
22 ---> 2 26 11
23 ---> 3
24 ---> 4 38 16
25 ---> 5 26 0
26 ---> 6 30
27 ---> 7 35 5 14 21
28 ---> 8 6 9
29 ---> 9
30 ---> 0 35 13
31 ---> 1 0 9
32 ---> 2 12 16
33 ---> 3 10 29
34 ---> 4
35 ---> 5 4
36 ---> 6 18 24 35
37 ---> 7
38 ---> 8 6
39 ---> 9
Getting a spark session...
Resource manager: local[*]
Spark version: 3.3.2
Starting cycle search. Search and messaging method is: 'nested_paths_udf'




Graph contains:
Edges #: 61
Vertices #: 37
Iteration: 1
Commencing messaging between vertices:




-RECORD 0------------------------------------
 id                | 29                      
 incoming_messages | [[33], [0], [15]]       
-RECORD 1------------------------------------
 id                | 26                      
 incoming_messages | [[14], [3], [22], [25]] 
-RECORD 2------------------------------------
 id                | 0                       
 incoming_messages | [[19], [25], [31]]      
-RECORD 3------------------------------------
 id                | 22                      
 incoming_messages | [[18]]                  
-RECORD 4------------------------------------
 id                | 32                      
 incoming_messages | [[1]]                   
only showing top 5 rows

-RECORD 0------------------------------------------------------------
 id                | 29                                              
 incoming_messages | [[33], [0], [15]]                               
 analysis_result   | {null, [[33, 29], [0, 29], [15, 29]]}           
-RECO

Unnamed: 0,cycles
0,"[2, 21, 6]"
1,"[6, 12, 24, 38]"
2,"[6, 14, 17, 24, 38]"
3,"[2, 35, 4, 24, 38, 6]"
4,"[2, 9, 35, 4, 24, 38, 6]"
5,"[3, 26, 30, 35, 4, 24, 38, 6]"
6,"[4, 24, 38, 6, 14, 26, 30, 35]"
7,"[6, 14, 17, 18, 22, 11, 8, 24, 38]"
8,"[2, 9, 25, 26, 30, 35, 4, 24, 38, 6]"
9,"[4, 24, 38, 6, 14, 17, 18, 22, 26, 30, 35]"


{'cluster_config': {'provider': 'Colab', 'workers_count': 1, 'worker_vcpu_count': 2, 'worker_memory_gb': 12, 'spark_ver': '3.3.2', 'probability_for_edge': 0.04}, 'edge_count': 61, 'vertex_count': 37, 'start_time': 1677072218.180882, 'end_time': 1677072268.3836012, 'iteration_metrics': [{'iteration_num': 1, 'runtime': 2.0288143157958984, 'cycles_count': 0}, {'iteration_num': 2, 'runtime': 2.1710782051086426, 'cycles_count': 0}, {'iteration_num': 3, 'runtime': 3.108274221420288, 'cycles_count': 1}, {'iteration_num': 4, 'runtime': 3.604018211364746, 'cycles_count': 1}, {'iteration_num': 5, 'runtime': 2.1204700469970703, 'cycles_count': 1}, {'iteration_num': 6, 'runtime': 2.208019256591797, 'cycles_count': 1}, {'iteration_num': 7, 'runtime': 2.2003567218780518, 'cycles_count': 1}, {'iteration_num': 8, 'runtime': 2.510918140411377, 'cycles_count': 2}, {'iteration_num': 9, 'runtime': 3.533104419708252, 'cycles_count': 1}, {'iteration_num': 10, 'runtime': 2.8598947525024414, 'cycles_count': 1

Unnamed: 0,iteration_num,runtime,cycles_count,edge_count,vertex_count,workers_count,worker_vcpu_count,provider,start_time,end_time
0,1,2.028814,0,61,37,1,2,Colab,1677072000.0,1677072000.0
1,2,2.171078,0,61,37,1,2,Colab,1677072000.0,1677072000.0
2,3,3.108274,1,61,37,1,2,Colab,1677072000.0,1677072000.0
3,4,3.604018,1,61,37,1,2,Colab,1677072000.0,1677072000.0
4,5,2.12047,1,61,37,1,2,Colab,1677072000.0,1677072000.0
5,6,2.208019,1,61,37,1,2,Colab,1677072000.0,1677072000.0
6,7,2.200357,1,61,37,1,2,Colab,1677072000.0,1677072000.0
7,8,2.510918,2,61,37,1,2,Colab,1677072000.0,1677072000.0
8,9,3.533104,1,61,37,1,2,Colab,1677072000.0,1677072000.0
9,10,2.859895,1,61,37,1,2,Colab,1677072000.0,1677072000.0


Testing run is done.
