# Visualizer

In [None]:
def visualizeGraph(n, edges, clusters, n_colors=10, nodes_q=None, title='default'):
    G = nx.Graph()
    G.add_edges_from(edges)
    G.add_nodes_from(range(1, n + 1))
    colors = plt.cm.rainbow(np.linspace(0, 1, n_colors))
    colors = [mcolors.to_hex(color) for color in colors]
    colors_assign_community = {comm: colors[comm] for comm in set(clusters.values())}
    pos = nx.spring_layout(G)  # Use spring layout (force-directed)
    edge_trace = []
    for edge in G.edges():
        x0, y0 = pos[edge[0]]
        x1, y1 = pos[edge[1]]
        edge_trace.append(
            go.Scatter(
                x=[x0, x1, None], y=[y0, y1, None],
                line=dict(width=0.5, color="#888"),
                hoverinfo="none",
                mode="lines"
            )
        )

    # Create node traces grouped by community
    node_traces = []
    for community, color in colors_assign_community.items():
        community_nodes = [node for node in G.nodes if clusters[node] == community]
        node_x = [pos[node][0] for node in community_nodes]
        node_y = [pos[node][1] for node in community_nodes]

        if nodes_q is None:
            node_traces.append(
                go.Scatter(
                    x=node_x, y=node_y,
                    mode="markers",
                    marker=dict(size=40, color=color, line=dict(width=1)),
                    text=[f"Node {node}" for node in community_nodes],
                    hoverinfo="text"
                )
            )
        else:
                node_traces.append(
                    go.Scatter(
                        x=node_x, y=node_y,
                        mode="markers",
                        marker=dict(size=40, color=color, line=dict(width=1)),
                        text=[nodes_q[node - 1] for node in community_nodes],
                        hoverinfo="text"
                    )
                )

    # Combine traces
    fig = go.Figure(data=edge_trace + node_traces)
    fig.update_layout(
        showlegend=False,
        hovermode="closest",
        margin=dict(b=0, l=0, r=0, t=0),
        xaxis=dict(showgrid=False, zeroline=False),
        yaxis=dict(showgrid=False, zeroline=False)
    )

    fig.show()
    fig.write_html(f"./{title}.html")
    return colors_assign_community

In [1]:
from pyspark.sql import SparkSession

spark = (SparkSession.builder
        .appName("HotpotQA Clustering")
        .config('spark.executor.instances','2')
        .config('spark.executor.memory','12G')
        .config("spark.driver.memory", "4G")
        .config('spark.executor.cores','6')
        .config('spark.dynamicAllocation.enabled','false') # musai??
        .master('spark://master:7077')
        .getOrCreate())
sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/17 02:47:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
import sys
import os

def initialize_communities(num_nodes):
    all_nodes = [i for i in range(1, num_nodes + 1)]
    all_nodes = sc.parallelize(all_nodes)
    communities = all_nodes.map(lambda x_: (x_, x_)).collectAsMap()  # x = node
    return communities


def compute_modularity():
    communities = communities_broadcast.value
    A = bc_A.value
    k = bc_k.value
    total_weight = bc_total_weight.value
    node_pairs = sc.parallelize(bc_node_pairs.value)

    same_community = node_pairs.map(lambda e: (e[0], e[1], communities[e[0]], communities[e[1]])) \
        .filter(lambda x_: x_[2] == x_[3]) \
        .map(lambda x_: A.get((x_[0], x_[1]), 0) - (1 / (2 * total_weight)) * k[x_[0]] * k[x_[1]]) \
        .sum()
    return (1 / (2 * total_weight)) * same_community


def compute_delta_modularity(node, paired):
    bc_communities = communities_broadcast.value
    A = bc_A.value
    k = bc_k.value
    total_weight = bc_total_weight.value
    node_pairs = bc_node_pairs.value
    if bc_communities[node] != bc_communities[paired]:
        bc_communities[node] = bc_communities[paired]

    same_community = 0.0
    for e in node_pairs:
        if bc_communities[e[0]] == bc_communities[e[1]]:
            same_community += A.get((e[0], e[1]), 0) - (1 / (2 * total_weight)) * k[e[0]] * k[e[1]]
    mod = (1 / (2 * total_weight)) * same_community
    improved = False
    if bc_modularity.value < mod:
        improved = True
    return paired, mod, improved


old_dir = os.getcwd()
print(old_dir)
INPUT_PATH = '/home/ubuntu/data/graph-AS-129.txt'
IT_COUNT = 4

precision = 1e-6
data = sc.textFile(INPUT_PATH)
os.chdir('/home/ubuntu/jupyter/results/Clusterization Results')
metadata = data.zipWithIndex() \
    .filter(lambda x_: x_[1] == 0) \
    .map(lambda line: [info for info in line[0].split(' ')]) \
    .collect()
n = int(metadata[0][1])
m = int(metadata[0][2])
n_broadcast = sc.broadcast(n)
print(f"Number of nodes: {n}")
print(f"Number of edges: {m}")
edges = data.zipWithIndex() \
    .filter(lambda x_: n < x_[1] < n + m + 1) \
    .map(lambda line: [coordinate for coordinate in line[0].split(' ')[1:]]) \
    .map(lambda line: [(int(line[0]), int(line[1])), float(line[2])])
nodes = data.zipWithIndex() \
    .filter(lambda x_: 0 < x_[1] < n + 1) \
    .map(lambda line: " ".join(line[0].split(' ')[2:]))
nodes_q = nodes.collect()
# coordinates = data.zipWithIndex() \
#     .filter(lambda x: x[1] > m) \
#     .map(lambda line: [float(coordinate) for coordinate in line[0].split(' ')[1:]]) \
#     .collect()
# artificially extend dataset with self-loops
node_pairs = [((i, i), 0) for i in range(1, n + 1)]
self_loops = sc.parallelize(node_pairs)
edges = edges.union(self_loops)
# persist data
edges.persist()

for it in range(IT_COUNT):
    total_weight = edges.map(lambda e: e[1]).sum()
    bc_total_weight = sc.broadcast(total_weight)
    node_pairs_vec = [(i, j) for i in range(1, n + 1) for j in range(1, n + 1)]
    bc_node_pairs = sc.broadcast(node_pairs_vec)
    A = edges.flatMap(lambda e: (((e[0][0], e[0][1]), e[1]), ((e[0][1], e[0][0]), e[1]))).collectAsMap()
    k = edges.flatMap(lambda e: [(e[0][0], e[1]), (e[0][1], e[1])]) \
        .reduceByKey(lambda x_, y: x_ + y).collectAsMap()
    for node in range(1, n + 1):
        if node not in k.keys():
            k[node] = 0
    for pair in node_pairs_vec:
        if pair not in A.keys():
            A[pair] = 0.0
    bc_A = sc.broadcast(A)
    bc_k = sc.broadcast(k)
    new_communities = initialize_communities(n)
    # print(new_communities)
    communities_broadcast = sc.broadcast(new_communities)
    modularity = compute_modularity()
    bc_modularity = sc.broadcast(modularity)
    # print(modularity)

    # form communities from each node, compute modularity, choose best
    improvement = True
    global_improvement = False
    adjacency_list = edges.flatMap(lambda e: [(e[0][0], (e[0][0], e[0][1])), (e[0][1], (e[0][1], e[0][0]))]) \
        .groupByKey() \
        .collectAsMap()
    while improvement:
        improvement = False
        for node, pairs in adjacency_list.items():
            pairs = sc.parallelize(pairs)
            favourite_config = pairs.filter(lambda x_: x_[0] != x_[1]) \
                .map(lambda x_: compute_delta_modularity(x_[0], x_[1])) \
                .filter(lambda res: res[2] is True) \
                .map(lambda res: (res[0], res[1]))
            if not favourite_config.isEmpty():
                favourite_config = favourite_config.reduce(lambda key, value: max(value))
                if type(favourite_config) is tuple:
                    favourite_config = favourite_config[0]
                improvement = True
                global_improvement = True
                new_communities[node] = new_communities[favourite_config]
                communities_broadcast = sc.broadcast(new_communities)
                modularity = compute_modularity()
                bc_modularity = sc.broadcast(modularity)

    print("Here 1")
    # transition to clusters to provide input for visualization

    # change graph (by making use of communities)
    # change broadcast values
    new_edges = edges.map(lambda edge: ((new_communities[edge[0][0]], new_communities[edge[0][1]]), edge[1])) \
        .reduceByKey(lambda a, b: a + b)
    self_edges = new_edges.filter(lambda edge: edge[0][0] == edge[0][1]) \
        .map(lambda e: ((e[0][0], e[0][1]), 2 * e[1]))
    other_edges = new_edges.filter(lambda edge: edge[0][0] != edge[0][1])
    new_edges = other_edges.union(self_edges)
    new_nodes = new_edges.flatMap(lambda edge: [edge[0][0], edge[0][1]]).distinct()
    new_nodes_collection = new_nodes.collect()
    n_old = n
    n = new_nodes.count()
    n_broadcast = sc.broadcast(n)
    mapping = {}
    for index, x in enumerate(new_nodes_collection):
        mapping[x] = index + 1
    mapping = sc.broadcast(mapping).value

    # TODO: !!!!!!!!!!!!!!!!!
    _edges = edges.map(lambda e: (e[0][0], e[0][1])).collect()
    # visualizeGraph(n_old, _edges, new_communities)
    if it == 0:
        visualizeGraph(n_old, _edges, {key: mapping[value] for key, value in new_communities.items()},
                       nodes_q=nodes_q, title=f'{it}_AS_Louvain_1')
    else:
        visualizeGraph(n_old, _edges, {key: mapping[value] for key, value in new_communities.items()},
                      title=f'{it}_AS_Louvain_1')

    if not global_improvement:
        break

    edges = new_edges.map(lambda edge: ((mapping[edge[0][0]],
                                         mapping[edge[0][1]]),
                                        edge[1]))
    edges.persist()

    print("Here 2")
    # transition to clusters to provide input for visualization
    # TODO: !!!!!!!!!!!!!!!!!
    _edges = edges.map(lambda e: (e[0][0], e[0][1])).collect()
    # visualizeGraph(n, _edges, {(i + 1): value for i, value in enumerate(new_nodes_collection)})
    visualizeGraph(n, _edges, {(i + 1): (i + 1) for i in range(n)}, title=f'{it}_AS_Louvain_2')

os.chdir(old_dir)

/home/ubuntu/jupyter/results/Clusterization Results
Number of nodes: 129
Number of edges: 459
[(3, -0.010793569424865083), (15, -0.010793569424865083), (31, -0.010793569424865083), (33, -0.010793569424865083), (39, -0.010822048499864727), (43, -0.010793569424865082), (49, -0.010793569424865082), (53, -0.010935964799863302), (74, -0.010935964799863303), (78, -0.010793569424865083), (106, -0.010594215899867572), (120, -0.010793569424865077)]


TypeError: 'int' object is not iterable

# Same code works on my machine, on the same datasets...and gives valid results which are saved in /results/Clusterization Results/Louvain