# Task: discover user communities in the user-user network

In Task 2 you produced the list of edges.

Now you need to transform those edges into two additional data structures:
* a set of graph nodes
* a set of adjacency lists

To understand their formats, consider this small example:
edges RDD:  `([('u1','u2','w1'),('u2','u3','w2'),('u2','u4','w3'),('u1','u5','w4')])`

* the nodes set contains the unique userIds that are found in the edges RDD:
  `['u3', 'u4', 'u2', 'u5', 'u1']`
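* the adjacency lists form a Python dictionary of dictionaries: each node maps to a dictionary of its neighbours and the corresponding edge weights. While still held in an RDD, they appear as `(node, dictionary)` pairs, as in this example: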
```
[('u3', {'u2': 'w2'}),
 ('u2', {'u3': 'w2', 'u1': 'w1', 'u4': 'w3'}),
 ('u4', {'u2': 'w3'}),
 ('u5', {'u1': 'w4'}),
 ('u1', {'u5': 'w4', 'u2': 'w1'})]
 ```
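The transformation can be sketched in plain Python before moving it to RDDs. This is only an illustration on the toy edge list above: the helper name `build_nodes_and_adjacency` is hypothetical, and the nodes are sorted purely to make the output order predictable (an RDD gives no ordering guarantee).

```python
from collections import defaultdict

# Toy edge list from the example above: (user1, user2, weight)
edges = [('u1', 'u2', 'w1'), ('u2', 'u3', 'w2'), ('u2', 'u4', 'w3'), ('u1', 'u5', 'w4')]

def build_nodes_and_adjacency(edge_list):
    """Return (sorted node list, dict of dicts) for an undirected weighted graph.

    Hypothetical helper for illustration only; it is not part of comscan.
    """
    adjacency = defaultdict(dict)
    for u, v, w in edge_list:
        adjacency[u][v] = w  # the edge as seen from u
        adjacency[v][u] = w  # the same edge as seen from v
    return sorted(adjacency), dict(adjacency)

nodes, adj_lists = build_nodes_and_adjacency(edges)
print(nodes)
print(adj_lists['u2'])
```

Each edge is recorded twice, once per endpoint, because the network is undirected. The same idea carries over to Spark by emitting one pair per direction and merging the dictionaries per key.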

Once you have produced these data structures, add them to a `Graph` object. This graph is then passed to a library that computes single-source shortest paths (SSSP):
`graph = Graph('user-user-network', allNodes.collect(), allAdjLists)`

## available methods
The following library methods are provided so that you can implement the Girvan-Newman algorithm, using this Graph as input:

### SSSP

This method is available in the module
`from comscan.algorithms.shortest_path.single_source_shortest_paths_dijkstra`

and is defined as:

```
def single_source_shortest_paths_dijkstra(graph, source, cutoff=None):
    """
    Provides Single Source Shortest Path implementation for a weighted graph
    :param graph: Weighted graph implemented with adjacency list.
    Edges in the graph are in the form of (u,v,w) where u,v are long and w is float
    :param source: source node
    :param cutoff: threshold value of largest possible distance in shortest paths.
    :return: all possible shortest paths from source to every other node in the graph
    """
```
This returns the shortest paths from a **specific source node** to every other node in the graph, e.g.

`paths, distances = single_source_shortest_paths_dijkstra(graph, source=<source node>)`
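To get a feel for what a single-source computation produces, here is a minimal Dijkstra sketch over a dictionary-of-dictionaries graph. It is not the comscan implementation: the function name `dijkstra_sketch`, the numeric toy weights, and the shape of the returned `paths` and `distances` dictionaries are all assumptions made for illustration. It also keeps only one shortest path per target, whereas the library may return every tied path.

```python
import heapq

def dijkstra_sketch(adj, source):
    """Return (paths, distances) from source over a dict-of-dicts graph.

    Illustrative only: keeps a single shortest path per reachable node.
    """
    distances = {source: 0.0}
    paths = {source: [source]}
    heap = [(0.0, source)]
    visited = set()
    while heap:
        dist, node = heapq.heappop(heap)
        if node in visited:
            continue  # stale heap entry, node already settled
        visited.add(node)
        for neighbour, weight in adj[node].items():
            candidate = dist + weight
            if neighbour not in distances or candidate < distances[neighbour]:
                distances[neighbour] = candidate
                paths[neighbour] = paths[node] + [neighbour]
                heapq.heappush(heap, (candidate, neighbour))
    return paths, distances

# Toy graph with made-up numeric weights (assumption, not real data)
toy_adj = {
    'u1': {'u2': 1.0, 'u5': 4.0},
    'u2': {'u1': 1.0, 'u3': 2.0, 'u4': 3.0},
    'u3': {'u2': 2.0},
    'u4': {'u2': 3.0},
    'u5': {'u1': 4.0},
}
paths, distances = dijkstra_sketch(toy_adj, 'u1')
print(paths['u4'], distances['u4'])
```

Running one such computation per source node is what makes the problem easy to parallelise: each source is independent of the others.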

### computing the betweenness of each edge in the graph, given the shortest paths:

`compute_edge_betweenness(shortest_paths_rdd: RDD) -> RDD` computes the edge betweenness centrality of a graph in a distributed fashion, given the shortest paths.
 
Using these methods and your knowledge of the Girvan-Newman algorithm, implement a Spark version of it in which the work is parallelised through RDDs wherever possible, so that as much of the computation as possible occurs in parallel.

### removing edges from the graph:
`graph.remove_edge(u,v)`, where `(u,v)` is an edge. It works by side effect, so there is no need to reassign the graph variable.

### calculating the number of connected components in the graph once edges are removed (removing edges may split the graph into multiple connected components)
`ncc = number_connected_components(graph)`
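The effect of removing an edge on the number of connected components can be seen on a toy adjacency dictionary. The two helpers below, `remove_edge_in_place` and `count_components`, are hypothetical stand-ins written for this sketch; they are not the comscan functions and make no claim about how the library works internally.

```python
from collections import deque

def remove_edge_in_place(adj, u, v):
    """Delete the undirected edge (u, v) by mutating adj (no return value)."""
    adj[u].pop(v, None)
    adj[v].pop(u, None)

def count_components(adj):
    """Count connected components with a breadth-first search from each unseen node."""
    seen = set()
    components = 0
    for start in adj:
        if start in seen:
            continue
        components += 1
        seen.add(start)
        queue = deque([start])
        while queue:
            node = queue.popleft()
            for neighbour in adj[node]:
                if neighbour not in seen:
                    seen.add(neighbour)
                    queue.append(neighbour)
    return components

# Toy graph with made-up numeric weights (assumption, not real data)
toy_adj = {
    'u1': {'u2': 1.0, 'u5': 4.0},
    'u2': {'u1': 1.0, 'u3': 2.0, 'u4': 3.0},
    'u3': {'u2': 2.0},
    'u4': {'u2': 3.0},
    'u5': {'u1': 4.0},
}
before = count_components(toy_adj)
remove_edge_in_place(toy_adj, 'u1', 'u2')  # cut the bridge between the two groups
after = count_components(toy_adj)
print(before, after)
```

Cutting the edge between `u1` and `u2` splits the toy graph into the groups `{u1, u5}` and `{u2, u3, u4}`, which is the kind of split the algorithm looks for.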

### other utility functions
`get_top_k_edges_with_highest_betweenness(sc, graph, k)` 

This returns the top-k edges with the highest betweenness. You will look for connected components (communities) by repeatedly removing high-betweenness edges from the original graph.

`_remove_self_loops(graph)` updates `graph` by removing all loops from a node to itself

Please note:
* You may ask workers to call the shortest-path method. If you do so, be careful: the method requires access to the entire graph, so you must **broadcast** the graph data structure to the workers before letting them work on it.
* The total betweenness of an edge can be computed *piecemeal*, because it can be obtained by adding up partial betweenness values.
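The piecemeal property can be illustrated with a simplified betweenness that just counts how many shortest paths cross each edge. This is a sketch under assumptions: the helper names are hypothetical, the paths are hand-written for a toy graph, and the library's actual definition may differ (for example in how it normalises or treats tied paths).

```python
from collections import Counter

def edges_on_path(path):
    """Return the undirected edges along a path, each as a sorted tuple."""
    return [tuple(sorted(pair)) for pair in zip(path, path[1:])]

def partial_betweenness(paths):
    """Count how many of the given paths cross each edge."""
    counts = Counter()
    for path in paths:
        counts.update(edges_on_path(path))
    return counts

# Hand-written shortest paths from two different source nodes (toy data)
paths_from_u1 = [['u1', 'u2'], ['u1', 'u2', 'u3'], ['u1', 'u5']]
paths_from_u3 = [['u3', 'u2'], ['u3', 'u2', 'u1']]

# Adding the partial counts gives the same result as counting all paths at once
total = partial_betweenness(paths_from_u1) + partial_betweenness(paths_from_u3)
combined = partial_betweenness(paths_from_u1 + paths_from_u3)
print(total)
```

Because the partial counts combine by plain addition, each worker can score the paths of its own source nodes and the per-edge totals can be summed afterwards, for instance with a `reduceByKey` that adds values.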

You will produce a set of communities. Each of them is represented by a set of node IDs, so you end up with a list of lists:

```[ [u,v,...], [u2, v2, ...] ]```

Save the result to file as indicated below. Don't forget to use your `<id>` to make your file name unique.

In [2]:
import pandas as pd
import pyspark.sql.functions as f
import os,sys

In [3]:
# IN:
EDGES_SMALL_PARQUET = "/FileStore/tables/edges-small.parquet"
# EDGES_100K_PARQUET = "/FileStore/tables/edges-100K.parquet"

# ALL_NODES_100K_TEXT = "/FileStore/tables/nodes-100K.txt"
# ALL_ADJLIST_100K_TEXT =  "/FileStore/tables/adjlist-100K.txt"

# Checkpoint files (to facilitate partial rerun during debugging)
ALL_NODES_SMALL_TEXT = "/FileStore/tables/180571930/nodes-small.txt"
ALL_ADJLIST_SMALL_TEXT =  "/FileStore/tables/180571930/adjlist-small.txt"
SHORTEST_PATH_SMALL =   "/FileStore/tables/180571930/shortest-paths-small.txt"

# SHORTEST_PATH_50K =   "/FileStore/tables/shortest-paths-50k.txt"
# SHORTEST_PATH_100K =   "/FileStore/tables/shortest-paths-100k.txt"

# OUT . the output you should produce containing all discovered communities (list of lists of graph nodes)
# CC_SMALL = "/FileStore/tables/ConnectedComponents-small.txt"
CC_SMALL = "/FileStore/tables/180571930/ConnectedComponents-small.txt"

In [4]:
# Import modules
from comscan.model.graph import Graph
from comscan.algorithms.connected import connected_components
from comscan.algorithms.connected import number_connected_components
from comscan.distributed_algorithms.betweenness import compute_edge_betweenness
from comscan.distributed_algorithms.shortest_paths import compute_shortest_paths

In [5]:
def _remove_self_loops(graph):
    for n, neighbors in graph.adj_list.items():
        if n in neighbors:
            graph.remove_edge(n, n)

In [6]:
def get_top_k_edges_with_highest_betweenness(sc, graph, k):
  shortest_paths_rdd = compute_shortest_paths(sc, graph)
  edge_betweenness_rdd = compute_edge_betweenness(shortest_paths_rdd)
  top_k_betweenness = edge_betweenness_rdd.sortBy(lambda x: x[1], ascending=False).take(k)
  # print("top_k_betweenness" + str(top_k_betweenness.collect()))
  # print("top_k_betweenness" + str(top_k_betweenness.take(100)))
  top_k_edges = list(map(lambda x: x[0],top_k_betweenness))
  return top_k_edges

# your code here.
Once you have created:
* nodes
* adjLists

as explained above, you *may* save them here for checkpointing.
Note, however, that pickle files have no overwrite mode. Please ask a demonstrator how to delete files from Databricks' DBFS.

In [8]:
# 1. Create a Graph
# 1.1 Edge: Convert dataframe to rdd
edges = spark.read.parquet(EDGES_SMALL_PARQUET)  # DataFrame
edgesRdd = edges.rdd                             
# edgesRdd.collect()

In [9]:
edges.printSchema()

In [10]:
edges.agg(f.countDistinct('userId1')).show() # 609
edges.agg(f.countDistinct('userId2')).show() # 609

In [11]:
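# 1.2 Create the node set: every distinct user ID that appears at either end of an edge
# (keys() yields row[0] and values() yields row[1], so the weight column is ignored)
nodesRdd = edgesRdd.keys().union(edgesRdd.values()).distinct()
nodesRdd.count() # 610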

In [12]:
# 1.3 Create adjacency lists as (node, {neighbour: weight, ...}) pairs
# Emit each edge once per direction, because the network is undirected
edges_Rdd = edgesRdd.map(lambda row : (row[0], {row[1]:row[2]}))             # [(1, {76: 0.0344}),...]
edges_RRdd = edgesRdd.map(lambda row : (row[1], {row[0]:row[2]}))            # [(76, {1: 0.0344}),...]
# Merge the single-entry dictionaries of each node into one dictionary
adjLists = edges_Rdd.union(edges_RRdd).reduceByKey(lambda x, y: {**x, **y})  # [(1, {76: 0.0344, 510: 0.0384, ...}), ...]
# adjLists.count() # 610
adjLists.take(10)

### checkpoint save nodes and lists

In [14]:
%fs
rm -r "/FileStore/tables/180571930/nodes-small.txt"

In [15]:
%fs
rm -r "/FileStore/tables/180571930/adjlist-small.txt"

In [16]:
# note this will fail if the files already exist
nodesRdd.saveAsPickleFile(ALL_NODES_SMALL_TEXT)
adjLists.saveAsPickleFile(ALL_ADJLIST_SMALL_TEXT) 

### checkpoint. load nodes and adjlist from file

In [18]:
## load up all nodes from file, as a RDD. However the Graph object expects to see a list, thus we use collect() here
nodes_rdd = sc.pickleFile(ALL_NODES_SMALL_TEXT)
nodes = nodes_rdd.collect()

## adjLists is an RDD representing a dictionary of dictionaries (the adjlist). the collectAsMap() method reconstructs the original dictionary
adjLists = sc.pickleFile(ALL_ADJLIST_SMALL_TEXT).collectAsMap()

In [19]:
type(adjLists)

Create a `Graph()` object and add all nodes and adjacency lists.

In [21]:
# 1.4 Create a graph
graph = Graph('user-user-network', nodes, adjLists)
# type(graph) # comscan.model.graph.Graph

In [22]:
# graph.nodes          # [200, 400, 600, 401, 1, 601, 201, 2, 602, 202]
# graph.edges          # <generator object edges at 0x7f40627c4e60>

# your Girvan-Newman implementation goes here

In [24]:
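# 2. Girvan-Newman algorithm
# 2.1 Broadcast the graph data structures to the workers before they work on them.
# Keep the returned handles: worker code reads the data through their .value attribute.
nodesBc = sc.broadcast(nodes)
adjListsBc = sc.broadcast(adjLists)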

In [25]:
# 2.2 Removing all loops from a node to itself
_remove_self_loops(graph)

In [26]:
k = 10000        # Number of highest-betweenness edges to remove per iteration
targetNCC = 15   # Target number of connected components
count = 0        # Number of edges removed so far
ncc = number_connected_components(graph)
cc = list(connected_components(graph))

while ncc < targetNCC:
  # 2.3 Get the top-k edges with the highest betweenness
  top_k_edges = get_top_k_edges_with_highest_betweenness(sc, graph, k)
  if not top_k_edges:
    break  # no edges left to remove, so the target cannot be reached

  # 2.4 Remove those edges from the graph
  for edge in top_k_edges:
    graph.remove_edge(edge[0], edge[1])
  count += len(top_k_edges)  # take(k) may return fewer than k edges

  # 2.5 Calculate the number of connected components
  newNcc = number_connected_components(graph)

  # 2.6 Calculate the connected components
  newCc = list(connected_components(graph))

  # 2.7 Record the new communities when the component count changes
  if newNcc != ncc:
    ncc = newNcc
    cc = newCc
    print("Detected new communities after removing %i edges" % count)

In [27]:
# 3 Save file 
# 3.1 list->rdd
ccSmallRdd = sc.parallelize(cc)
# ccSmallRdd.collect()

In [28]:
%fs
rm -r "/FileStore/tables/180571930/ConnectedComponents-small.txt"

In [29]:
ccSmallRdd.saveAsPickleFile(CC_SMALL)

In [30]:
# 3.2 Test 
test = sc.pickleFile(CC_SMALL)
test.collect()