In [1]:
# This notebook deals with creating an edgelist file containing all the edges in the nth largest
# connected component of the large edgelist (600 million edges). It does not rely on NetworkX
# or any other libraries other than Pandas to do this (since they are too computationally expensive)
import time
import dask.dataframe as dd
from fun.fun import *

Dask dataframe query planning is disabled because dask-expr is not installed.

You can install it with `pip install dask[dataframe]` or `conda install dask`.
This will raise in a future version.



In [2]:
class UnionFind:
    def __init__(self, n):
        self.parent = [i for i in range(n)]
        self.rank = [0] * n
        self.size = [1] * n

    def find(self, u):
        if self.parent[u] != u:
            self.parent[u] = self.find(self.parent[u])
        return self.parent[u]

    def union(self, u, v):
        pu, pv = self.find(u), self.find(v)
        if pu == pv:
            return
        if self.rank[pu] < self.rank[pv]:
            self.parent[pu] = pv
            self.size[pv] += self.size[pu]
        elif self.rank[pv] < self.rank[pu]:
            self.parent[pv] = pu
            self.size[pu] += self.size[pv]
        else:
            self.parent[pu] = pv
            self.rank[pv] += 1
            self.size[pv] += self.size[pu]

In [3]:
def initialize_unionfind(edges):
    print("Initializing UnionFind data structure ...")
    nodes = set(edges['source']).union(set(edges['target']))
    node_index = {node: i for i, node in enumerate(nodes)}
    n = len(nodes)
    print("Found {:_} unique nodes".format(n))
    uf = UnionFind(n)
    print("Computing union of all edges ...")
    handled, total = 0, len(edges)
    for _, row in edges.iterrows():
        uf.union(node_index[row['source']], node_index[row['target']])
        handled, perc  = track_progress(total, handled, "edges handled:", inc=100)
    print("\nDone.")
    return uf, node_index

In [5]:
def get_amount_of_components(uf, node_index):
    unique_parents = set()
    for i in range(len(node_index)):
        unique_parents.add(uf.find(i))
    return len(unique_parents)

In [38]:
# START
edges_fn = "../data/edges.parquet"
communities_fn = "../data/communities.csv"

In [7]:
# -> IN : Read edge list to dask df
print("reading edges ... ", end='')
start = time.time()
df = dd.read_parquet("../data/edges.parquet")
end = time.time()
print("read {:_} lines (took {:.1f}s)".format(len(df), (end-start)))
df.head()

reading edges ... read 684_732_453 lines (took 0.1s)


Unnamed: 0,source,target
0,13,103151
1,13,214293
2,103151,214293
3,13,138731
4,13,42023


In [8]:
# Initialize union find
uf, node_index = initialize_unionfind(df)

Initializing UnionFind data structure ...
Found 410_885 unique nodes
Computing union of all edges ...
 edges handled: 684_732_453/684_732_453 (100.00000%)

Done.


In [16]:
# create components by shared parent node (.find())
comps = {}
total, i = len(node_index), 0
for node, i in node_index.items():
    parent = uf.find(i)
    comps[parent] = comps.get(parent, []) + [node]
    i, perc = track_progress(total, i)
print("Done.")
print(len(comps))

 progress: 410_885/410_885 (100.00000%)

Done.
7746


In [None]:
# Sort keys by length of their group and generate group index
keys_sorted = sorted( comps.keys(), reverse=True, key=lambda parent: len(comps[parent]) )
group_index = { parent: (index+1) for index, parent in enumerate(keys_sorted) }
for k, i in group_index.items():
    print(k, i)

In [28]:
# Generate map from node to its group_i
node_groups = {}
for par, group in comps.items():
    group_i = group_index[par]
    for node in group:
        node_groups[node] = group_i

In [37]:
# <- OUT : Save node communities to file
import csv
file = open(communities_fn, 'w', newline='')
writer = csv.writer(file)
writer.writerow(['node', 'community'])
total, i = len(node_index), 0
for n in list(node_index.keys()):
    group_i = node_groups[n]
    writer.writerow([n, group_i])
    i, perc = track_progress(total, i, text="rows written:", inc=5)
file.close()

 rows written: 410_885/410_885 (100.00000%)
