In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop2.tgz
!tar xf spark-3.3.2-bin-hadoop2.tgz
!pip install -q findspark
!pip install graphframes

Collecting graphframes
  Downloading graphframes-0.6-py2.py3-none-any.whl (18 kB)
Collecting nose (from graphframes)
  Downloading nose-1.3.7-py3-none-any.whl (154 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/154.7 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m153.6/154.7 kB[0m [31m4.6 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m154.7/154.7 kB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: nose, graphframes
Successfully installed graphframes-0.6 nose-1.3.7


In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop2"

import findspark
findspark.init("spark-3.3.2-bin-hadoop2")# SPARK_HOME

import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
from graphframes import GraphFrame
import math
from collections import *

sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()

In [3]:
!wget https://github.com/sparkling-graph/sparkling-graph/raw/master/loaders/src/test/resources/simple.csv -O simple.csv
simple_data = spark.read.csv("simple.csv", header = True, inferSchema= True)
simple_data.printSchema()

--2024-04-08 04:28:51--  https://github.com/sparkling-graph/sparkling-graph/raw/master/loaders/src/test/resources/simple.csv
Resolving github.com (github.com)... 20.27.177.113
Connecting to github.com (github.com)|20.27.177.113|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/sparkling-graph/sparkling-graph/master/loaders/src/test/resources/simple.csv [following]
--2024-04-08 04:28:51--  https://raw.githubusercontent.com/sparkling-graph/sparkling-graph/master/loaders/src/test/resources/simple.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 49 [text/plain]
Saving to: ‘simple.csv’


2024-04-08 04:28:52 (2.35 MB/s) - ‘simple.csv’ saved [49/49]

root
 |-- v1: integer (nullable = true)
 |-- v2: i

In [4]:
file_path = "simple.csv"
edge_adjacency_list_all = defaultdict(list)

def csv_to_adjacency_list(file_path):
    # Read the CSV file
    df = spark.read.csv(file_path, header=True, inferSchema=True)

    # Group by src and aggregate dst into a list
    adjacency_list_df = df.groupBy("v1").agg(collect_list("v2").alias("adjacency_list"))

    # Handle the undirected nature of the graph
    df_swapped = df.selectExpr("v2 as v1", "v1 as v2")
    df_undirected = df.union(df_swapped)

    # Group by vertex and aggregate its neighbors to form the adjacency list
    adjacency_list_df = df_undirected.groupBy("v1").agg(collect_list("v2").alias("v_adjacency_list")).sort(asc("v1"))

    # Output the adjacency list for all vertices
    adjacency_list_df.show()

    adjacency_list =  {row['v1']: row['v_adjacency_list'] for row in adjacency_list_df.collect()}

    return adjacency_list

'''
PCSS Mapper:
Input: key is the input vertex, value is the adjacency list of
the input vertex
Output: < key, value >, where the key is the edge
connecting the input vertex and its neighbor and value is
the adjacency list of the input vertex
'''
def PCSS_Mapper(v, v_adjacency_list):
    edge_adjacency_pair = {}

    # For each neighbor vi in the adjacency list
    for vi in v_adjacency_list:
        if v < vi:
            # If v < vi, Take (v, vi) as key
            key = (v, vi)
        else:
            # else, Take (vi, v) as key
            key = (vi, v)
        # Take the adjacency list of vertex v as value
        value = v_adjacency_list
        edge_adjacency_pair[key] = value

    # Return {((v,vi), v_adjecency_list), (v,vj), v_adjecency_list)...}
    return edge_adjacency_pair

'''
PCSS_Combiner
Input: key is edge, value is the adjacency list of one vertex of the edge
Output: < key, value >, where key is edge, values is the adjacency list of both vertexes of the edge
'''
def PCSS_Combiner(dicts_list):
    for d in dicts_list:
        for k, v in d.items():
            if v not in edge_adjacency_list_all[k]:
                # Append a new list containing v for each dictionary
                edge_adjacency_list_all[k].append(v)
    return dict(edge_adjacency_list_all)

'''
PCSS_Reducer and Cutting_Edge
Input: key is one edge, values includes the adjacency lists
of the two vertices adjacent to the edge
Output: < key, value >, where the key is the edge and
value is structural similarity of the edge
'''
def PCSS_Reducer_Cutting_Edge(edge_neighbors_dict, epsilon):
    similarity_dict = defaultdict(list)
    for k,v in edge_neighbors_dict.items():
        #calculate structural similarity
        similarity = len(list(set(v[0])&set(v[1])))/math.sqrt(len(v[0])*len(v[1]))

        #prune the edges with only similarity is bigger than threshold
        if similarity >= epsilon:
            similarity_dict[k].append(similarity)
    return dict(similarity_dict)

'''
pre_LPCC_parser
Input: key is one edge, value is structural similarity of the edge
Output: < key, value >, where the key is the vertex and
value is the adjacency list of the input vertex
'''
def pre_LPCC_parser(reduced_simi_dict):
    reduced_edges = []
    for k,v in reduced_simi_dict.items():
        reduced_edges.append(k)

    # Parallelize the list to create an RDD
    edges_rdd = sc.parallelize(reduced_edges)

    # Generate (vertex, adjacent) pairs for both directions
    adjacency_pairs = edges_rdd.flatMap(lambda edge: [(edge[0], edge[1]), (edge[1], edge[0])])

    # Group by key (vertex) and map values to form the adjacency list
    adjacency_list = adjacency_pairs.groupByKey().mapValues(list)

    # return value as adjacency list in dict: {1: [2,3], 2:[1,3],3:[1,2]}
    result = adjacency_list.collectAsMap()
    sorted_result = {k: result[k] for k in sorted(result)}
    return sorted_result

'''
LPCC
Input: A network in the format of adjacency list
Output: All of the connected components in the network
'''
def LPCC(reduced_adj_list):
    # initialize the input to be (vertex ID, structure information),
    # in which the structure information includes status, label and adjacency list of the vertex
    stuct_info_v = {k: (1, k, v) for k, v in reduced_adj_list.items()}
    print(stuct_info_v)
    for k,v in stuct_info_v.items():
        kv = LPCC_Mapper(k,v)
        LPCC_Reducer(kv)

'''
LPCC_Mapper
Input: key is the input vertex ID, value is the structure
information (status, label and adjacency list) of the input
vertex
Output: < key, value >, where the key is a vertex ID
and value is the label or the structure information of the
input vertex
'''
def LPCC_Mapper(v,struct_info):
    status = struct_info[0]
    label = struct_info[1]
    adj_list = struct_info[2]

    #update v and its neighbors
    emit_list = {v:[]}
    # if v is activated, update its neighors:
    if status == 1:
        for v_i in adj_list:
            emit_list[v].append((v_i, label))
    # the structure information of the input vertex
    emit_list[v].append((struct_info))
    print('EMIT-list', emit_list)
    return emit_list

'''
LPCC_Reducer
Input: key is vertex ID, values includes the structure
information of the vertex and the labels from its neighbors.
Output: < key, value >, where the key is the vertex ID
and value is the updated structure information of the vertex
'''
def LPCC_Reducer(emit_list_from_mapper):
    # extract k-v pair from input, we know there is only one v
    v = next(iter(emit_list_from_mapper.keys()))
    values = next(iter(emit_list_from_mapper.values()))
    struct_info = list(values[-1])
    # Find the current label
    current_label = struct_info[1]

    # Find the smallest label from the neighbors
    neighbors = list(values[:-1])
    smallest_item = neighbors[0]
    for item in neighbors:
        if item[1] < smallest_item[1]:
            smallest_item = item

    # If the smallest label from its neighbors is less than its current label
    if smallest_item[1] < current_label:
        # Set the vertex as activated
        struct_info[0] = 1
        # update the label in the structure information as the smallest label
        struct_info[1] = smallest_item[1]
    else:
        struct_info[0] = 0

    updated_struct_info = {v: struct_info}
    print(updated_struct_info)
    return updated_struct_info

In [7]:
temp_dict = []
adjacency_list_all = csv_to_adjacency_list(file_path)
print(adjacency_list_all)

print(edge_adjacency_list_all)

for  key, value in adjacency_list_all.items():
    e_adjacency_list = PCSS_Mapper(key, value)
    print(e_adjacency_list)
    temp_dict.append(e_adjacency_list)

print('Temp_dict', temp_dict)

en_dict = PCSS_Combiner(temp_dict)
print('PCSS_dict', en_dict)

reduced_graph = PCSS_Reducer_Cutting_Edge(en_dict,0.2)
print('Reduced Graph', reduced_graph)

reduced_adj_list = pre_LPCC_parser(reduced_graph)
print(reduced_adj_list)

LPCC(reduced_adj_list)

+---+----------------+
| v1|v_adjacency_list|
+---+----------------+
|  1|       [2, 4, 3]|
|  2|          [3, 1]|
|  3|          [1, 2]|
|  4|             [1]|
|  5|             [6]|
|  6|             [5]|
+---+----------------+

{1: [2, 4, 3], 2: [3, 1], 3: [1, 2], 4: [1], 5: [6], 6: [5]}
defaultdict(<class 'list'>, {(1, 2): [[2, 4, 3], [3, 1]], (1, 4): [[2, 4, 3], [1]], (1, 3): [[2, 4, 3], [1, 2]], (2, 3): [[3, 1], [1, 2]], (5, 6): [[6], [5]]})
{(1, 2): [2, 4, 3], (1, 4): [2, 4, 3], (1, 3): [2, 4, 3]}
{(2, 3): [3, 1], (1, 2): [3, 1]}
{(1, 3): [1, 2], (2, 3): [1, 2]}
{(1, 4): [1]}
{(5, 6): [6]}
{(5, 6): [5]}
Temp_dict [{(1, 2): [2, 4, 3], (1, 4): [2, 4, 3], (1, 3): [2, 4, 3]}, {(2, 3): [3, 1], (1, 2): [3, 1]}, {(1, 3): [1, 2], (2, 3): [1, 2]}, {(1, 4): [1]}, {(5, 6): [6]}, {(5, 6): [5]}]
PCSS_dict {(1, 2): [[2, 4, 3], [3, 1]], (1, 4): [[2, 4, 3], [1]], (1, 3): [[2, 4, 3], [1, 2]], (2, 3): [[3, 1], [1, 2]], (5, 6): [[6], [5]]}
Reduced Graph {(1, 2): [0.4082482904638631], (1, 3): [0.40

In [None]:
'''
PSCAN: orignal code in SCALA, partial implementation from the authors
https://github.com/dawnranger/spark-pscan/blob/master/README.md

@inproceedings{zhaorepo,
  author = {Zhao, Weizhong},
  title = {https://github.com/dawnranger/spark-pscan},
  year = {2018},
  publisher = {GitHub},
  journal = {GitHub repository},
  howpublished = {\url{https://github.com/dawnranger/spark-pscan}},
  commit = {4e1cc87add0feae6c9027610d6ff789787234852}
}

'''

from graphframes import GraphFrame
from pyspark.sql.functions import col, explode, array, size, sqrt
from pyspark.sql import functions as F

def pscan(graph, epsilon=0.5):
    # Assume `graph` is a GraphFrame object

    # Extracting vertices and edges from the graph
    vertices = graph.vertices
    edges = graph.edges

    # Creating a DataFrame of neighbors
    edges_both_directions = edges.select(col("src").alias("id"), col("dst").alias("neighbor"))\
        .unionByName(edges.select(col("dst").alias("id"), col("src").alias("neighbor")))

    # Grouping neighbors together by vertex
    neighbors = edges_both_directions.groupBy("id").agg(F.collect_set("neighbor").alias("neighbors"))

    # Adding self to neighbor set
    vertices_with_neighbors = vertices.join(neighbors, "id", "left_outer")\
        .withColumn("neighbors", F.array_union(col("neighbors"), array(col("id"))))\
        .na.fill({"neighbors": []})

    # Joining neighbors information to edges for similarity calculation
    edges_with_neighbors = edges.join(vertices_with_neighbors.withColumnRenamed("id", "src").withColumnRenamed("neighbors", "src_neighbors"), "src")\
        .join(vertices_with_neighbors.withColumnRenamed("id", "dst").withColumnRenamed("neighbors", "dst_neighbors"), "dst")

    # Calculating similarity
    def calculate_similarity(src_neighbors, dst_neighbors):
        intersection = size(array_intersect(col(src_neighbors), col(dst_neighbors)))
        denominator = sqrt(size(col(src_neighbors)) * size(col(dst_neighbors)))
        return intersection / denominator

    edges_with_similarity = edges_with_neighbors.withColumn("similarity", calculate_similarity("src_neighbors", "dst_neighbors"))

    # Filtering edges based on similarity
    edges_filtered = edges_with_similarity.filter(col("similarity") >= epsilon)

    # Constructing a new graph with filtered edges
    graph_filtered = GraphFrame(vertices, edges_filtered.select(col("src"), col("dst")))

    # Finding connected components (clusters)
    components = graph_filtered.connectedComponents()

    # Joining the cluster IDs back to the original vertices
    vertices_with_components = vertices.join(components, "id")

    # Returning a new graph with vertices labeled by their component ID
    return GraphFrame(vertices_with_components, edges)



In [None]:
import sys
print(sys.version)

3.10.12 (main, Nov 20 2023, 15:14:05) [GCC 11.4.0]
