# Distributed Betweenness

Notebo

In [1]:
### Initialisation

In [1]:
from pyspark.sql import SparkSession
import os
import sys

SPARK_HOME="/home/human/l/rh/platform/spark-2.3.0-bin-hadoop2.7"
DATA = "data/"
OUTPUT = DATA + "out/"

## SparkSession is one main connection with the Spark driver
session = SparkSession \
    .builder \
    .appName("Distributed-Community-Scanner") \
    .config("spark.sql.autoBroadcastJoinThreshold", '-1') \
    .config("spark.ui.enabled", 'true')\
    .getOrCreate()

sc = session.sparkContext

In [2]:
# Add the parent project to the syspath to be able to import the modules
# https://stackoverflow.com/questions/34478398/import-local-function-from-a-module-housed-in-another-directory-with-relative-im

module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)
    
# Import modules
from inout.parser import parse
from inout.parser import get_headers
from view.visualiser import draw
from algorithms.shortest_path import single_source_shortest_paths_dijkstra
from distributed_algorithms.betweenness import compute_edge_betweenness

In [25]:
# edges_file = 'edges.csv'
edges_limit = 100  # maximum numbers of edges to be parsed.
edges_file = 'girvan_graph.csv'
edges_path = os.path.join(DATA, edges_file)
headers = get_headers(edges_path)
source_header, target_header, weight_header = headers

# Parse with edges limit:
# graph = parse(edges_path, edge_limit=edges_limit, 
#               source_header=source_header, target_header=target_header, weight_header=weight_header)

# Parse the entire graph (no 'edge_limit' parameter):
graph = parse(edges_path,  # edge_limit=edges_limit, 
              source_header=source_header, target_header=target_header, weight_header=weight_header)

len(list(graph.edges))

38

In [26]:
# draw(graph)

### Broadcast Graph and Algorithms to Workers

In [27]:
# Add dependencies to workers
dependencies_path = "community_scanner.zip"
sc.addPyFile(dependencies_path)

In [28]:
graphBC = sc.broadcast(graph)

In [29]:
def sssp(source):
    """
    Single-Source Shortest path function for the Spark workers.
    Prerrequisite: the 'algorithms' module needs to be added as dependency to all workers with sc.addPyFile  
    :param source: 
    :return: 
    """
    from algorithms.shortest_path import single_source_shortest_paths_dijkstra
    paths, distances = single_source_shortest_paths_dijkstra(graphBC.value, source=source)
    return (source, paths)

### Parallelise node IDs into RDDs

In [9]:
nodes_rdd = sc.parallelize(graph.nodes) 
nodes_rdd.take(5)

[1, 6, 8, 2, 3]

In [30]:
nodes_rdd.getNumPartitions()

8

In [31]:
# Get number of elements in each partition
nodes_rdd.glom().map(len).collect()

[2, 2, 4, 2, 2, 4, 2, 4]

### Compute shortests paths in workers

In [32]:
# shortest_paths_rdd := [(source, paths)]
# paths := {target: [shortest_path]}
# shortest_path := [node]

shortest_paths_rdd = nodes_rdd.map(sssp)
shortest_paths_rdd.cache()
shortest_paths_rdd.take(1)

[(1,
  {1: [[1]],
   2: [[1, 6, 2], [1, 8, 2]],
   3: [[1, 6, 2, 3], [1, 8, 2, 3], [1, 6, 4, 3]],
   4: [[1, 6, 4]],
   5: [[1, 6, 2, 5], [1, 8, 2, 5], [1, 8, 7, 5]],
   6: [[1, 6]],
   7: [[1, 8, 7]],
   8: [[1, 8]],
   9: [[1, 6, 12, 10, 9]],
   10: [[1, 6, 12, 10]],
   11: [[1, 6, 12, 15, 11]],
   12: [[1, 6, 12]],
   13: [[1, 6, 12, 15, 11, 13], [1, 6, 12, 15, 14, 13]],
   14: [[1, 6, 12, 15, 14]],
   15: [[1, 6, 12, 15]],
   16: [[1, 6, 12, 15, 16]],
   17: [[1, 6, 2, 3, 21, 17],
    [1, 8, 2, 3, 21, 17],
    [1, 6, 4, 3, 21, 17],
    [1, 6, 2, 5, 22, 17],
    [1, 8, 2, 5, 22, 17],
    [1, 8, 7, 5, 22, 17],
    [1, 6, 12, 10, 18, 17],
    [1, 6, 12, 15, 16, 17]],
   18: [[1, 6, 12, 10, 18]],
   19: [[1, 6, 2, 3, 21, 19],
    [1, 8, 2, 3, 21, 19],
    [1, 6, 4, 3, 21, 19],
    [1, 6, 12, 10, 18, 19]],
   20: [[1, 6, 12, 10, 18, 20]],
   21: [[1, 6, 2, 3, 21], [1, 8, 2, 3, 21], [1, 6, 4, 3, 21]],
   22: [[1, 6, 2, 5, 22], [1, 8, 2, 5, 22], [1, 8, 7, 5, 22]]})]

### Compute Betweeness

In [33]:
# shortest_paths_rdd := [(source, paths_dict)]
# paths_dict := {target: [shortest_path]}
# shortest_path := [node]


def map_paths_count(rdd_row):
    source, paths_dict = rdd_row
    
    paths_count = list()
    
    for (target, paths_list) in paths_dict.items():
        
        u, v = min(source, target), max(source, target)
        
        paths_count.append( 
            ((u, v), len(paths_list))
        )
        
    return paths_count


# paths_count_rdd := [((source, target), paths_count)]
paths_count_rdd = shortest_paths_rdd.flatMap(map_paths_count)
paths_count_rdd.cache()
paths_count_rdd.take(5)

[((1, 1), 1), ((1, 2), 2), ((1, 3), 3), ((1, 4), 1), ((1, 5), 3)]

In [14]:
def get_edge_count(paths_list):
    """

    :param paths_list:
    :return: key = edge, value = count of shortest paths through that edge to this target
    """
    edge_count = dict()

    for path in paths_list:
        for i in range(len(path) - 1):
            
            u, v = min(path[i], path[i + 1]), max(path[i], path[i + 1])
            
            edge = (u, v)
            edge_count[edge] = edge_count.get(edge, 0) + 1.0

    return edge_count


def map_edges_count(rdd_row):
    source, paths_dict = rdd_row
    
    edges_count_list = list()
    
    for (target, paths_list) in paths_dict.items():
        for (edge, edge_count) in get_edge_count(paths_list).items():
            
            u, v = min(source, target), max(source, target)
            
            edges_count_list.append((
                (u, v), (edge, edge_count)
            ))
     
    return edges_count_list


# edges_count_rdd := [((source, target), (edge, edge_count))]
# (source, target): define the nodes for shortest path computation, i.e. this is not an edge
# edge := (u, v)
# edge_count: number of shortest paths between 'source' and 'target' that goes through 'edge'
edges_count_rdd = shortest_paths_rdd.flatMap(map_edges_count)
edges_count_rdd.cache()
edges_count_rdd.take(5)

[((1, 2), ((2, 8), 1.0)),
 ((1, 2), ((1, 8), 1.0)),
 ((1, 2), ((2, 6), 1.0)),
 ((1, 2), ((1, 6), 1.0)),
 ((1, 3), ((1, 8), 1.0))]

In [34]:
# [((source, target), ((edge, edge_count), paths_count)]
edges_path_count_rdd = edges_count_rdd.join(paths_count_rdd)

# [(edge, betweenness)]
# this produces the edge betweennes per path, 
# i.e. one edge can appear several times in this rdd,
# one time per each shortest path
edges_betweenness_per_path_rdd = edges_path_count_rdd.map(lambda row: (
    row[1][0][0],  # edge 
    row[1][0][1] / row[1][1]  # edge_count / path_count  (sigma_st(e) / sigma_st in Girvan-Newman paper)  
))

# [(edge, betweenness)]
# aggregates (sums) the betweenness of the edges
# In this rdd each edge appears only once.

edge_betweenness_rdd = edges_betweenness_per_path_rdd.reduceByKey(
    lambda accumulator, edge_count: accumulator + edge_count
)

edge_betweenness_rdd.take(5)

[((2, 8), 69.9047619047619),
 ((6, 12), 191.4793650793651),
 ((11, 15), 106.36190476190477),
 ((5, 22), 135.8968253968254),
 ((13, 14), 45.63809523809524)]

In [35]:
assert edge_betweenness_rdd.count() == len(list(graph.edges))

In [36]:
edge_betweenness_rdd

PythonRDD[44] at RDD at PythonRDD.scala:48

In [37]:
nodes_rdd

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:175

In [21]:
shortest_paths_rdd

PythonRDD[4] at RDD at PythonRDD.scala:48