# Course project - Database Management MAP543
***
In the following, we explore an efficient and scalable MapReduce approach to finding all the connected components of a given graph. Finding connected components is a well-known problem in a wide variety of application areas, such as social network analysis, data mining, and image processing.

• The algorithm is described in this [paper](https://www.cse.unr.edu/~hkardes/pdfs/ccf.pdf).

• The work consists of understanding the MapReduce algorithm and implementing it in Spark using both RDDs and DataFrames.

• Both Python and Scala implementations must be provided.

• An experimental analysis comparing the RDD and DataFrame versions has to be conducted on graphs of increasing size.

• For small graphs use Databricks; for bigger ones (<20GB) use the GC cluster.


We will also implement different techniques and compare them, based on this [video](https://www.youtube.com/watch?v=Io1x6mQlh1E).

A graph is a mathematical structure used to model relationships between objects. It consists of a set of nodes and a set of edges that connect pairs of nodes.

A graph is said to be connected if, for every pair of vertices, there exists a sequence of edges that forms a path between them. A connected component is a maximal connected subgraph, so every node belongs to exactly one component.
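To make the definition concrete, here is a minimal single-machine sketch that finds the components of a tiny undirected graph with a breadth-first search. The function name `connected_components` and the toy edge list are illustrative assumptions, not part of the project code, and this is not the MapReduce approach studied below.

```python
from collections import defaultdict, deque

def connected_components(edges):
    """Return a list of node sets, one per connected component."""
    # Build an undirected adjacency map from the edge list.
    adjacency = defaultdict(set)
    for u, v in edges:
        adjacency[u].add(v)
        adjacency[v].add(u)

    seen = set()
    components = []
    for start in adjacency:
        if start in seen:
            continue
        # Breadth-first search collects every node reachable from `start`.
        component = set()
        queue = deque([start])
        seen.add(start)
        while queue:
            node = queue.popleft()
            component.add(node)
            for neighbor in adjacency[node]:
                if neighbor not in seen:
                    seen.add(neighbor)
                    queue.append(neighbor)
        components.append(component)
    return components

# Hypothetical toy graph: nodes 1-2-3 form one component, 4-5 another.
toy_edges = [(1, 2), (2, 3), (4, 5)]
toy_components = connected_components(toy_edges)
print([sorted(c) for c in toy_components])  # prints: [[1, 2, 3], [4, 5]]
```

This works when the whole graph fits in memory on one machine, which is the limitation the distributed algorithms in this notebook aim to remove.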

We chose to use the [Web Google graph](http://snap.stanford.edu/data/web-Google.html) released in 2002 by Google. There are 875K nodes and 5.1M edges in this graph. Nodes represent web pages and directed edges represent hyperlinks between them.

In [2]:
import os
from pyspark import SparkContext, SparkConf
from pyspark.rdd import RDD
import time

# Set the absolute path to your JDK installation
os.environ['JAVA_HOME'] = '/Library/Java/JavaVirtualMachines/jdk-22.jdk/Contents/Home'

# Configure and initialize Spark
conf = SparkConf().setAppName("Connected_Components").setMaster("local")
sc = SparkContext(conf=conf)



# Import Data

In [3]:
file_path = "/Users/gaspardhassenforder/Documents/X/Database Management/Project_Connected_graphs/web-Google.txt.gz"
data = sc.textFile(file_path)

# clean the data
data = data.filter(lambda x: "#" not in x)
data = data.map(lambda x: x.split("\t")).map(lambda x: (int(x[0]), int(x[1])))
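The cleaning step above can be sketched without Spark to show what each record looks like before and after parsing. The helper name `parse_edge_lines` is hypothetical. The sample lines follow the tab-separated layout of the dataset file, with `#` marking header lines. Unlike the filter above, this sketch only skips lines that start with `#`, which is an assumption about where comment markers appear.

```python
def parse_edge_lines(lines):
    """Yield (source, target) integer pairs, skipping comment and blank lines."""
    for line in lines:
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        source, target = line.split("\t")
        yield int(source), int(target)

# A few lines in the same layout as web-Google.txt.
sample_lines = [
    "# FromNodeId\tToNodeId",
    "0\t11342",
    "0\t824020",
    "11342\t0",
]
parsed_edges = list(parse_edge_lines(sample_lines))
print(parsed_edges)  # prints: [(0, 11342), (0, 824020), (11342, 0)]
```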

# Techniques from paper

## CCF-Iterate

In [18]:
# Function corresponding to CCF-Iterate
def ccf_iterate(rdd):
    # Map part of function
    # transform (A -> B) to (A, B) and (B, A) since the graph is undirected
    mapped_values = rdd.flatMap(lambda x: [(x[0], x[1]), (x[1], x[0])]) # map part of the CCF-Iterate algorithm
    
    # Reduce Part of the function
    graph = mapped_values.groupByKey().mapValues(list) # (key, [neighbors])
    graph = graph.map(lambda x: (x[0], x[1], min(x[1]))) # for each pair get min neighbor (key, [neighbors], min neighbor)
    
    graph = graph.filter(lambda x: x[0] > x[2]) # we do nothing if key < min neighbor
    graph = graph.map(lambda x: (x[0], [val for val in x[1] if val != x[2]], x[2])) # get all non min neighbors, (key, [non min neighbors], min neighbor)
    
    # get the number of new pairs
    # new_pairs = graph.map(lambda x: len(x[1])).sum() 
    new_pairs = graph.flatMap(lambda x: [(x[0], x[2])] + [(val, x[2]) for val in x[1]]).distinct().count()
    
    edges_1 = graph.map(lambda x: (x[0], x[2])) # Create the edge (key, min neighbor)
    edges_2 = graph.flatMap(lambda x: [(val, x[2]) for val in x[1] if val != x[2]]) # Create the edge (non min neighbor, min neighbor)
    edges = edges_1.union(edges_2) # Union the two RDDs
    

    # Drop self-loops and keep plain (node, min neighbor) pairs so the result can feed the next iteration
    output_graph = edges.filter(lambda x: x[0] != x[1]).distinct()

    return output_graph, new_pairs


def ccf_iterate_and_dedup(rdd):
   
    # Map part of function
    # transform (A -> B) to (A, B) and (B, A) since the graph is undirected
    mapped_values = rdd.flatMap(lambda edge: [(edge[0], edge[1]), (edge[1], edge[0])])

    # Group nodes by their adjacency lists and convert to integer for processing
    adjacency_lists = mapped_values.groupByKey().mapValues(list)
    adjacency_lists = adjacency_lists.map(lambda x: (int(x[0]), sorted(set(map(int, x[1])))))

    # Emit new pairs: (node, min_connected_node) and (neighbor, min_connected_node)
    new_pairs_rdd = adjacency_lists.flatMap(
        lambda x: [(x[0], x[1][0])] + [(neighbor, x[1][0]) for neighbor in x[1] if neighbor != x[0]]
    ).distinct()

    # Count the new pairs for convergence assessment
    new_pairs_count = new_pairs_rdd.count()


    return new_pairs_rdd, new_pairs_count



def optimized_ccf_iterate(rdd):

    num_partitions = 200
    
    # Ensure the RDD is partitioned as specified for optimized processing
    rdd = rdd.partitionBy(num_partitions)
    
    # Emit pairs in both directions to make the graph undirected
    bidirectional_edges = rdd.flatMap(lambda edge: [(edge[0], edge[1]), (edge[1], edge[0])])
    
    # Reduce shuffling by using the same partitioner
    bidirectional_edges = bidirectional_edges.partitionBy(num_partitions)
    
    # Group by key and cache the result for performance improvement
    adjacency_lists = bidirectional_edges.groupByKey().mapValues(list).cache()
    
    # Emit new pairs: (node, min_neighbor) and (neighbor, min_neighbor)
    new_pairs_rdd = adjacency_lists.flatMap(
        lambda x: [(x[0], min(x[1]))] + [(n, min(x[1])) for n in x[1] if n != x[0]]
    ).distinct().partitionBy(num_partitions)
    
    # Count the new pairs for convergence assessment
    new_pairs_count = new_pairs_rdd.count()
    
    return new_pairs_rdd, new_pairs_count
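For reference, here is a small in-memory sketch of CCF-Iterate as we read its pseudocode in the paper. The map step emits each pair in both directions. The reduce step finds the smallest neighbor of each key and, only when it is smaller than the key, emits `(key, min)` plus `(value, min)` for every other neighbor, counting the latter as new pairs. Iteration stops when a round produces no new pair. The names `ccf_iterate_local` and `ccf_local` are hypothetical, the set-based output stands in for the deduplication job, and the Spark functions above differ from this sketch in several details.

```python
from collections import defaultdict

def ccf_iterate_local(pairs):
    """One CCF-Iterate round on an in-memory collection of (node, node) pairs."""
    # Map: emit every pair in both directions, grouped by key.
    groups = defaultdict(set)
    for a, b in pairs:
        groups[a].add(b)
        groups[b].add(a)

    # Reduce: attach the key and its other neighbors to the smallest neighbor,
    # but only when that neighbor is smaller than the key itself.
    output = set()  # a set removes duplicate pairs (stand-in for the dedup job)
    new_pair_count = 0
    for key, values in groups.items():
        smallest = min(values)
        if smallest < key:
            output.add((key, smallest))
            for value in values:
                if value != smallest:
                    output.add((value, smallest))
                    new_pair_count += 1
    return output, new_pair_count

def ccf_local(edges):
    """Repeat rounds until no new pair is produced."""
    pairs = set(edges)
    while True:
        pairs, new_pair_count = ccf_iterate_local(pairs)
        if new_pair_count == 0:
            return pairs

# Hypothetical toy graph: a path 1-2-3-4 and a separate edge 5-6.
toy_edges = [(1, 2), (2, 3), (3, 4), (5, 6)]
toy_result = ccf_local(toy_edges)
print(sorted(toy_result))  # prints: [(2, 1), (3, 1), (4, 1), (6, 5)]
```

At convergence each pair reads `(node, component id)`, where the component id is the smallest node of the component.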









## CCF-Iterate with secondary sorting

In [29]:
def ccf_iterate_sorting_and_dedup(rdd):
   
    # Emit edges in both directions to ensure the graph is undirected
    mapped_values = rdd.flatMap(lambda edge: [(edge[0], edge[1]), (edge[1], edge[0])])

    # Group nodes by their adjacency lists and convert to integer for processing
    adjacency_lists = mapped_values.groupByKey().mapValues(list)
    adjacency_lists = adjacency_lists.map(lambda x: (int(x[0]), sorted(set(map(int, x[1])))))

    # Emit new pairs: (node, min_connected_node) and (neighbor, min_connected_node)
    new_pairs_rdd = adjacency_lists.flatMap(
        lambda x: [(x[0], x[1][0])] + [(neighbor, x[1][0]) for neighbor in x[1] if neighbor != x[0]]
    ).distinct()

    # Count the new pairs for convergence assessment
    new_pairs_count = new_pairs_rdd.count()

    # Prepare for the next iteration by forming a new graph representation
    updated_graph = new_pairs_rdd

    return updated_graph, new_pairs_count


def ccf_iterate_with_secondary_sorting(rdd):
    """
    Performs an iteration of the CCF algorithm utilizing secondary sorting to optimize the process.
    
    Args:
    rdd (RDD): An RDD of tuples representing the edges of the graph (node1, node2)
    
    Returns:
    RDD: Updated graph representation after the iteration
    int: Count of new pairs formed during the iteration
    """
    num_partitions = 200
    
    # Emit pairs in both directions and ensure they are keyed for sorting
    def emit_and_prepare_for_sort(edge):
        return [(edge[0], (edge[1], 'a')), (edge[1], (edge[0], 'b'))]
    
    # Group by key and sort each value list: tuples compare on the neighbor id first, so the minimum neighbor ends up first
    sorted_edges = rdd.flatMap(emit_and_prepare_for_sort).groupByKey().mapValues(sorted)
    
    # Process each group to find the minimum and emit pairs accordingly
    def process_group(group):
        key, values = group
        min_value = values[0][0]  # Assumption: values are sorted, first value's neighbor is min
        new_pairs = [(key, min_value)]
        for value, tag in values:
            if value != min_value:
                new_pairs.append((value, min_value))
        return new_pairs
    
    # Flatten each group into its output pairs; the keys change here, so the input partitioning is not kept
    processed_edges = sorted_edges.flatMap(process_group)
    
    # Flatten, deduplicate, and count new pairs
    new_pairs_rdd = processed_edges.distinct().partitionBy(num_partitions)
    new_pairs_count = new_pairs_rdd.count()
    
    return new_pairs_rdd, new_pairs_count
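The idea behind secondary sorting is that, when the values of each key arrive already sorted, the first value is the minimum and the rest can be streamed without building a list in memory. The sketch below imitates this in plain Python: a global sort on `(key, value)` stands in for the shuffle with secondary sort, and `itertools.groupby` stands in for the reducer input. The name `ccf_iterate_sorted_local` is hypothetical, and the reduce rule is the one from the paper's pseudocode as we read it, not a copy of the Spark function above.

```python
from itertools import groupby
from operator import itemgetter

def ccf_iterate_sorted_local(pairs):
    """One CCF-Iterate round where each key's values are consumed in sorted order."""
    # Emit both directions; the set also removes duplicate records.
    records = set()
    for a, b in pairs:
        records.add((a, b))
        records.add((b, a))

    output = set()
    new_pair_count = 0
    # Sorting on (key, value) is a stand-in for the shuffle with secondary sort.
    for key, group in groupby(sorted(records), key=itemgetter(0)):
        values = (value for _, value in group)
        smallest = next(values)  # first value is the minimum thanks to the sort
        if smallest >= key:
            continue
        output.add((key, smallest))
        for value in values:  # streamed one by one, never stored as a list
            output.add((value, smallest))
            new_pair_count += 1
    return output, new_pair_count

# Hypothetical toy graph: a path 1-2-3-4 and a separate edge 5-6.
toy_edges = [(1, 2), (2, 3), (3, 4), (5, 6)]
round_one, round_one_new = ccf_iterate_sorted_local(toy_edges)
print(sorted(round_one), round_one_new)
```

The output of one round has the same meaning as in the unsorted variant; only the way the minimum is found changes.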

# Techniques from video

In [33]:
def small_star(graph):
    for edge in graph:
        yield min(edge), max(edge)

def large_star(graph):
    min_nodes = {node: min(edge) for edge in graph for node in edge}
    for edge in graph:
        yield min_nodes[edge[0]], min_nodes[edge[1]]
        
def find_connected_components(graph):
    while True:
        # Apply Small-Star operation
        small_star_graph = list(small_star(graph))
        
        # Apply Large-Star operation
        large_star_graph = list(large_star(small_star_graph))
        
        # Check if the graph has changed
        if set(large_star_graph) == set(graph):
            break
        
        # Update the graph
        graph = large_star_graph
    
    # Count the number of unique nodes
    num_components = len(set(node for edge in graph for node in edge))
    
    return num_components
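For comparison, the sketch below follows one common formulation of the two star operations, which may differ in detail from the video. Large-Star connects every strictly larger neighbor of a node to the minimum of that node's neighborhood (the node included). Small-Star connects a node and its smaller neighbors to the minimum among them. The two are alternated until the edge set stops changing. All function names here are hypothetical and the code runs in memory on a toy graph.

```python
from collections import defaultdict

def large_star_local(edges):
    """Link each strictly larger neighbor of u to the minimum of u's neighborhood."""
    neighbors = defaultdict(set)
    for u, v in edges:
        neighbors[u].add(v)
        neighbors[v].add(u)
    output = set()
    for u, adjacent in neighbors.items():
        smallest = min(adjacent | {u})
        for v in adjacent:
            if v > u:
                output.add((v, smallest))
    return output

def small_star_local(edges):
    """Link u and its smaller neighbors to the minimum among them."""
    smaller = defaultdict(set)
    for u, v in edges:
        high, low = (u, v) if u >= v else (v, u)
        smaller[high].add(low)
    output = set()
    for u, adjacent in smaller.items():
        smallest = min(adjacent | {u})
        for v in adjacent | {u}:
            if v != smallest:
                output.add((v, smallest))
    return output

def star_components(edges):
    """Alternate Large-Star and Small-Star until the edge set is stable."""
    # Store every edge as (larger node, smaller node) and drop self-loops.
    current = {(max(u, v), min(u, v)) for u, v in edges if u != v}
    while True:
        updated = small_star_local(large_star_local(current))
        if updated == current:
            return current
        current = updated

# Hypothetical toy graph: a path 1-2-3-4 and a separate edge 5-6.
toy_edges = [(1, 2), (2, 3), (3, 4), (5, 6)]
stars = star_components(toy_edges)
num_toy_components = len({low for _, low in stars})
print(sorted(stars), num_toy_components)
```

At convergence every component is a star centered on its smallest node, so the number of components is the number of distinct second elements.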

In [34]:
start_time = time.time()
numComponents = find_connected_components(data.collect())
end_time = time.time()
time_taken_star = end_time - start_time
print("Number of connected components: ", numComponents)
print("Elapsed time: ", time_taken_star)

                                                                                

Number of connected components:  610570
Elapsed time:  22.93051791191101


In [45]:
def perform_large_star_operation(edges_rdd):
    """
    Applies the Large-Star operation on an RDD representing a graph.
    """
    def to_min_connections(pair):
        node, neighbors = pair
        min_neighbor = min([node] + neighbors)
        return [(n, min_neighbor) for n in [node] + neighbors]

    # Assuming edges_rdd is an RDD of (node, neighbor)
    return edges_rdd \
        .groupByKey() \
        .mapValues(list) \
        .flatMap(to_min_connections) \
        .distinct() \
        .reduceByKey(min)

def perform_small_star_operation(edges_rdd):
    """
    Applies the Small-Star operation on an RDD representing a graph.
    """
    return edges_rdd \
        .map(lambda x: (x if x[0] < x[1] else (x[1], x[0]))) \
        .distinct() \
        .reduceByKey(min)


def find_connected_components_with_stats(edges_rdd):
    """
    Finds connected components, returns the number of distinct components, 
    and the size of the largest component.
    """
    prev_rdd = None
    curr_rdd = edges_rdd.cache()

    # Iterate until no further changes
    while prev_rdd is None or not curr_rdd.subtract(prev_rdd).isEmpty():
        if prev_rdd is not None:
            prev_rdd.unpersist()
        
        # Perform Large-Star Operation
        curr_rdd = perform_large_star_operation(curr_rdd).cache()
        
        # Perform Small-Star Operation
        curr_rdd = perform_small_star_operation(curr_rdd).cache()
        
        prev_rdd = curr_rdd

    # After finding connected components, calculate stats
    # Count the number of distinct components by looking at unique node connections to the smallest node
    num_components = curr_rdd.map(lambda x: x[1]).distinct().count()
    
    # Calculate the size of the largest component
    # First, map each edge to its component identifier, then count occurrences (sizes)
    component_sizes = curr_rdd.map(lambda x: (x[1], 1)).reduceByKey(lambda a, b: a + b)
    largest_component_size = component_sizes.map(lambda x: x[1]).max()

    return curr_rdd, num_components, largest_component_size

curr_rdd, num_components, largest_component_size = find_connected_components_with_stats(data)

print("Number of connected components: ", num_components)
print("Size of the largest component: ", largest_component_size)


                                                                                

Number of connected components:  163154
Size of the largest component:  1


In [46]:
# Assumes the SparkContext `sc` and the `data` RDD of (source, target) edge tuples created above.

def large_star_step(edge):
    # Order the pair so that the smaller node comes first
    node, neighbor = edge
    return (node, neighbor) if node < neighbor else (neighbor, node)

def small_star_step(key_val_pair):
    # Input is a joined record (key, (val1, val2))
    key, (val1, val2) = key_val_pair
    if val1 != val2:
        return (val1, val2)
    else:
        return (key, val1)

# 'data' already holds edges as tuples, so only the reverse direction has to be added
edges = data.flatMap(lambda e: [e, (e[1], e[0])])
edges.persist()

# Perform Large-Star operations until convergence
while True:
    min_edges = edges.reduceByKey(min).map(lambda v: (v[0], min(v[0], v[1])))
    large_star_edges = edges.filter(lambda e: e[0] < e[1])
    new_edges = large_star_edges.join(min_edges).map(lambda e: (e[1][0], e[1][1])).distinct()

    # Perform Small-Star operations
    small_star_edges = new_edges.map(large_star_step).distinct()
    small_star_edges.persist()
    min_small_star_edges = small_star_edges.reduceByKey(min).map(lambda v: (v[0], min(v[0], v[1])))
    updated_edges = small_star_edges.join(min_small_star_edges).map(small_star_step).distinct()

    # Check for convergence
    updated_bidirectional_edges = updated_edges.flatMap(lambda e: [(e[0], e[1]), (e[1], e[0])])
    if updated_bidirectional_edges.subtract(edges).isEmpty() and edges.subtract(updated_bidirectional_edges).isEmpty():
        break
    else:
        edges = updated_bidirectional_edges
        edges.persist()

# Finalize connected components representation
final_edges = updated_edges.map(large_star_step).distinct()
final_edges.persist()

# Example of how to use the final edges
print(final_edges.take(10))

component_groups = final_edges.map(lambda edge: edge[1]).distinct()

# Count the number of distinct connected components
num_connected_components = component_groups.count()

print(f"Number of Connected Components: {num_connected_components}")

                                                                                

[(58, 850832), (58, 580752), (58, 470672), (58, 629904), (58, 588688), (58, 212880), (58, 782736), (3898, 796304), (1338, 502160), (53306, 191376)]




Number of Connected Components: 663864


                                                                                


# Apply techniques

In [30]:
def find_connected_components(graph, secondary_sorting=False):
    new_pairs = 1
    old_pairs = 0
    iteration = 0  # avoid shadowing the built-in iter()
    while new_pairs != old_pairs:
        old_pairs = new_pairs
        
        if secondary_sorting:
            graph, new_pairs = ccf_iterate_with_secondary_sorting(graph)
        else:
            # Plain variant without secondary sorting
            graph, new_pairs = ccf_iterate_and_dedup(graph)
            
        print("Iteration: ", iteration, " - New pairs: ", new_pairs)
        
        # Stop as soon as the pair count no longer changes between two rounds
        if new_pairs == old_pairs:
            print("No change in new pairs; stopping iterations.")
            break
        
        iteration += 1
        
    # Each pair is (node, component id), so distinct second elements are the components
    n_components = graph.map(lambda x : x[1]).distinct().count()
    return graph, iteration - 1, n_components

In [31]:
start_time = time.time()
result, num_iter, n_components = find_connected_components(data, True)
end_time = time.time()
time_taken_ccf = end_time - start_time

print(f"Number of connected components: {n_components} - Number of iterations: {num_iter}")

Iteration:  0  - New pairs:  3715310
Iteration:  1  - New pairs:  3149998
Iteration:  2  - New pairs:  3050320
Iteration:  3  - New pairs:  1950247
Iteration:  4  - New pairs:  1748073
Iteration:  5  - New pairs:  1745934
Iteration:  6  - New pairs:  1745934
No change in new pairs; stopping iterations.

Number of connected components: 5492 - Number of iterations: 5

In [32]:
largest_component = result.groupBy(lambda x : x[1]).map(lambda x : len(x[1])).max()
print(f"\n The largest component has {largest_component} elements")




 The largest component has 855801 elements
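The cell above counts the pairs attached to each component id. As a cross-check on small inputs, the sketch below computes component sizes in plain Python. It assumes the converged output holds exactly one `(node, component id)` pair per node other than the component id itself, so it adds one to each count; that assumption may not hold for every variant in this notebook. The name `component_sizes` and the toy pairs are hypothetical.

```python
from collections import Counter

def component_sizes(pairs):
    """Map each component id to its node count, given (node, component_id) pairs."""
    sizes = Counter(component_id for _, component_id in pairs)
    # Assumption: the component id (smallest node) never appears as the first
    # element of a pair, so it is counted separately here.
    return {component_id: count + 1 for component_id, count in sizes.items()}

# Hypothetical converged output: component 1 holds nodes 1-4, component 5 holds 5-6.
toy_pairs = [(2, 1), (3, 1), (4, 1), (6, 5)]
toy_sizes = component_sizes(toy_pairs)
largest_id, largest_size = max(toy_sizes.items(), key=lambda item: item[1])
print(largest_id, largest_size)  # prints: 1 4
```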


                                                                                

# Plot

In [18]:
sc.stop()

In [29]:
for item in data.take(10):  # Use .take() to fetch a manageable number of elements
    print(item)

# Directed graph (each unordered pair of nodes is saved once): web-Google.txt 
# Webgraph from the Google programming contest, 2002
# Nodes: 875713 Edges: 5105039
# FromNodeId	ToNodeId
0	11342
0	824020
0	867923
0	891835
11342	0
11342	27469


                                                                                