In [0]:
# Dataset :  Epinions social network - https://snap.stanford.edu/data/soc-Epinions1.html

# Dataset statistics
# Nodes	75879
# Edges	508837
# Nodes in largest WCC	75877 (1.000)
# Edges in largest WCC	508836 (1.000)
# Nodes in largest SCC	32223 (0.425)
# Edges in largest SCC	443506 (0.872)
# Average clustering coefficient	0.1378
# Number of triangles	1624481
# Fraction of closed triangles	0.0229
# Diameter (longest shortest path)	14
# 90-percentile effective diameter	5

In [0]:
# Location of the SNAP Epinions edge list used by every cell below.
# Alternative location (disabled):
# file_path = "/FileStore/tables/soc_Epinions1.txt"
file_path = "s3://cs6350.001.f22/soc-Epinions1.txt"

In [0]:
# Peek at the raw text: pair each line with its 0-based position so the
# '#'-prefixed preamble lines at the top of the file can be located.
row_rdd = (
    spark.sparkContext
    .textFile(file_path)
    .zipWithIndex()
)
raw_preview_df = spark.createDataFrame(row_rdd)
display(raw_preview_df)

_1,_2
# Directed graph (each unordered pair of nodes is saved once): soc-Epinions1.txt,0
# Directed Epinions social network,1
# Nodes: 75879 Edges: 508837,2
# FromNodeId	ToNodeId,3
0	4,4
0	5,5
0	7,6
0	8,7
0	9,8
0	10,9


In [0]:
# Lines 0-2 of the file are free-text '#' comments. Line 3 holds the column
# names, so it is kept and handed to the CSV reader as the header row.
n_skip_rows = 3
row_rdd = (
    spark.sparkContext
    .textFile(file_path)
    .zipWithIndex()
    .filter(lambda line_and_index: line_and_index[1] >= n_skip_rows)
    .keys()  # drop the index again, keeping only the text of each line
)
df = (
    spark.read
    .options(header="true", inferSchema="true", delimiter="\t")
    .csv(row_rdd)
)
df.show(5)

+------------+--------+
|# FromNodeId|ToNodeId|
+------------+--------+
|           0|       4|
|           0|       5|
|           0|       7|
|           0|       8|
|           0|       9|
+------------+--------+
only showing top 5 rows



In [0]:
# The header row came from the "# FromNodeId<TAB>ToNodeId" line, so the first
# column name still carries the comment marker. Strip any leading '#'/space
# characters instead of slicing off a fixed two characters: a header that has
# no "# " prefix is then left intact rather than losing its first two letters.
column_name = df.columns[0]
data = df.withColumnRenamed(column_name, column_name.lstrip("# "))
data.printSchema()

root
 |-- FromNodeId: integer (nullable = true)
 |-- ToNodeId: integer (nullable = true)



In [0]:
# Confirm the first column is now named without the leading "# ".
data.show(n=5)

+----------+--------+
|FromNodeId|ToNodeId|
+----------+--------+
|         0|       4|
|         0|       5|
|         0|       7|
|         0|       8|
|         0|       9|
+----------+--------+
only showing top 5 rows



In [0]:
# A vertex is any node id that appears at either end of an edge.
vertices_FromNode = data.select(data["FromNodeId"].alias("id")).dropDuplicates()
vertices_ToNode = data.select(data["ToNodeId"].alias("id")).dropDuplicates()
# Merge both endpoint sets, remove ids present in both, and order by id.
vertices = (
    vertices_FromNode
    .union(vertices_ToNode)
    .dropDuplicates()
    .orderBy("id")
)
vertices.show(5)

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+
only showing top 5 rows



In [0]:
# Sanity check: the dataset statistics list 75879 nodes, so the number of
# distinct vertex ids should match that figure.
vertices.count()

Out[8]: 75879

In [0]:
# Edge table using the src/dst column names passed to GraphFrame below.
edges = data.selectExpr("FromNodeId AS src", "ToNodeId AS dst")
edges.show(5)

+---+---+
|src|dst|
+---+---+
|  0|  4|
|  0|  5|
|  0|  7|
|  0|  8|
|  0|  9|
+---+---+
only showing top 5 rows



In [0]:
# Sanity check: the dataset statistics list 508837 edges, so the edge table
# should contain exactly that many rows.
edges.count()

Out[10]: 508837

In [0]:
from graphframes import GraphFrame
from pyspark.sql.functions import count, desc, sum

# Build the graph from the vertex and edge tables and cache it, since every
# analysis cell below reuses it. The trailing expression echoes the graph.
epinionsGraph = GraphFrame(vertices, edges).cache()
epinionsGraph

Out[11]: GraphFrame(v:[id: int], e:[src: int, dst: int])

In [0]:
# a. Top 5 nodes by out-degree, together with their number of outgoing edges.
outDeg = epinionsGraph.outDegrees
outDeg.sort("outDegree", ascending=False).show(n=5, truncate=False)

+-----+---------+
|id   |outDegree|
+-----+---------+
|645  |1801     |
|763  |1669     |
|634  |1621     |
|71399|1128     |
|3924 |976      |
+-----+---------+
only showing top 5 rows



In [0]:
# b. Top 5 nodes by in-degree, together with their number of incoming edges.
inDeg = epinionsGraph.inDegrees
inDeg.sort("inDegree", ascending=False).show(n=5, truncate=False)

+---+--------+
|id |inDegree|
+---+--------+
|18 |3035    |
|143|1521    |
|737|1317    |
|790|1284    |
|136|1180    |
+---+--------+
only showing top 5 rows



In [0]:
# c. Compute PageRank for every node; the top 5 are displayed in the next cell.
# Parameters were free to choose: reset probability 0.15 and a fixed 10 iterations.
pagerank_params = {"resetProbability": 0.15, "maxIter": 10}
ranks = epinionsGraph.pageRank(**pagerank_params)

In [0]:
# Top 5 vertices by PageRank score.
ranked_vertices = ranks.vertices.sort(desc("pagerank"))
ranked_vertices.select("id", "pagerank").show(n=5)

+----+------------------+
|  id|          pagerank|
+----+------------------+
|  18|345.13733694791057|
| 737|240.25396712974688|
| 118| 162.0208569979244|
|1719|158.85345607508734|
| 136|151.67122319883188|
+----+------------------+
only showing top 5 rows



In [0]:
# d. Run a connected-components algorithm and report the 5 largest components.
# Register a checkpoint directory before running the component algorithms.
checkpoint_dir = "/tmp/checkpoints"
spark.sparkContext.setCheckpointDir(checkpoint_dir)

In [0]:
# The dataset is a directed graph, so connectedComponents() is not used here; the attempt below is kept commented out for reference.
# cc = epinionsGraph.connectedComponents()
# cc.show(5)
# filtered_cc = cc.where("component != 0")
# filtered_cc.show()
# filtered_cc.select("component").distinct().show()
# filtered_cc.groupBy("component").agg(count('id').alias('count')).orderBy(desc("count")).show()

In [0]:
# The dataset is a directed graph, so strongly connected components are
# computed instead. The algorithm is capped at 3 iterations.
scc_max_iter = 3
scc = epinionsGraph.stronglyConnectedComponents(maxIter=scc_max_iter)
scc.show(n=5)

+-----+---------+
|   id|component|
+-----+---------+
| 9200|        0|
| 6400|        0|
|42200|    42200|
|68200|    68200|
|68000|    68000|
+-----+---------+
only showing top 5 rows



In [0]:
# Size of each strongly connected component, largest first.
# The dataset statistics list 32223 nodes in the largest SCC.
component_sizes = scc.groupBy("component").agg(count("id").alias("count"))
component_sizes.sort(desc("count")).show(n=5)

+---------+-----+
|component|count|
+---------+-----+
|        0|32223|
|    47210|   15|
|    33367|    9|
|      515|    9|
|    35011|    8|
+---------+-----+
only showing top 5 rows



In [0]:
# Component 0 is itself a valid strongly connected component, so there is no need to filter it out.
# filtered_scc = scc.where("component != 0")
# filtered_scc.show(5)
# filtered_scc.groupBy("component").agg(count('id').alias('count')).orderBy(desc("count")).show(5)

In [0]:
# e. Count the triangles passing through each vertex; the top 5 are shown in
# the next cell. Ties may be broken arbitrarily.
tc = epinionsGraph.triangleCount()
tc.show(n=5)

+-----+---+
|count| id|
+-----+---+
|16511|  1|
|  302|  3|
| 4854|  5|
| 2716|  4|
| 3885|  2|
+-----+---+
only showing top 5 rows



In [0]:
# Top 5 vertices by the number of triangles they take part in.
tc.sort("count", ascending=False).show(n=5, truncate=False)

+-----+---+
|count|id |
+-----+---+
|48674|645|
|47203|18 |
|25817|27 |
|25230|634|
|24752|44 |
+-----+---+
only showing top 5 rows



In [0]:
# Cross-check against the dataset statistics (1624481 triangles). Each triangle
# is counted once at each of its three vertices, so the per-vertex counts
# should add up to 3 * 1624481 = 4873443.
expected_triangles = 1624481
print(expected_triangles * 3)
# `sum` is the pyspark.sql.functions aggregate imported earlier, not the builtin.
tc.select(sum("count").alias("totalCount")).show()

4873443
+----------+
|totalCount|
+----------+
|   4873443|
+----------+

