In [0]:
from graphframes import *
from pyspark.sql.functions import desc

In [0]:
path = "/FileStore/tables/Wiki_Vote.txt"
output_path = "/FileStore/tables/output_3_2.txt"

In [0]:
# defining the graph edges
edges = spark.read.option("header", "true").option("inferedSchema","true").option("delimiter","\t").csv(path)
edges = edges.withColumnRenamed("FromNodeId", "src").withColumnRenamed("ToNodeId", "dst")
display(edges)
edges.count()

src,dst
30,1412
30,3352
30,5254
30,5543
30,7478
3,28
3,30
3,39
3,54
3,108


Out[4]: 103689

In [0]:
# defining the vertices of our graph
vertices = edges.select("src").union(edges.select("dst")).distinct().withColumnRenamed("src","id")
display(vertices)
vertices.count()

id
296
467
675
691
1159
1090
1436
1512
1572
2069


Out[5]: 7115

In [0]:
# Creating the graph frame from vertices and edges
graph = GraphFrame(vertices, edges)
graph

Out[6]: GraphFrame(v:[id: string], e:[src: string, dst: string])

In [0]:
#1 Find the top 5 nodes with the highest outdegree and find the count of the number of outgoing edges in each
out_degree = graph.outDegrees.orderBy(desc("outDegree"))
display(out_degree.take(5))

text = 'Q1 (id, outDegree)' + '\n'
for line in out_degree.take(5):
    text += str(line[0]) + ' ' + str(line[1])
    text += '\n'
text += '\n'

id,outDegree
2565,893
766,773
11,743
457,732
2688,618


In [0]:
#2 Find the top 5 nodes with the highest indegree and find the count of the number of incoming edges in each
in_degree = graph.inDegrees.orderBy(desc("inDegree"))
display(in_degree.take(5))

text += 'Q2 (id, inDegree)' + '\n'
for line in in_degree.take(5):
    text += str(line[0]) + ' ' + str(line[1])
    text += '\n'
text += '\n'

id,inDegree
4037,457
15,361
2398,340
2625,331
1297,309


In [0]:
#3 Calculate PageRank for each of the nodes and output the top 5 nodes with the highest PageRank values. You are free to define any suitable parameters.
page_ranks = graph.pageRank(resetProbability=0.15, maxIter=10)
page_ranks_ordered = page_ranks.vertices.orderBy(desc("pagerank")).select("id","pagerank").distinct()
page_ranks_5 = page_ranks_ordered.orderBy(desc("pagerank"))
display(page_ranks_5.take(5))

text += 'Q3 (id, pageRank)' + '\n'
for line in page_ranks_5.take(5):
    text += str(line[0]) + ' ' + str(line[1])
    text += '\n'
text += '\n'

id,pagerank
4037,32.761392590350795
15,26.25300495761947
6634,26.16452443488649
2625,23.51151593302638
2398,18.728389390669683


In [0]:
#4 Run the connected components algorithm on it and find the top 5 components with the largest number of nodes.
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")
connected_component = graph.connectedComponents()
connected_component_5 = connected_component.groupBy("component").count().orderBy(desc("count"))
display(connected_component_5.take(5))

component,count
0,7066
532575944741,3
592705486870,3
936302870556,3
884763263008,2


In [0]:
text += 'Q4 (component, countOfNodes)' + '\n'
for line in connected_component_5.take(5):
    text += str(line[0]) + ' ' + str(line[1])
    text += '\n'
text += '\n'

In [0]:
#5 Run the triangle counts algorithm on each of the vertices and output the top 5 vertices with the largest triangle count. In case of ties, you can randomly select the top 5 vertices.
triangle_counts = graph.triangleCount()
triangle_counts_5 = triangle_counts.select("id", "count").distinct().orderBy(desc("count"))
display(triangle_counts_5.take(5))
text += 'Q5 (id, triangleCount)' + '\n'
for line in triangle_counts_5.take(5):
    text += str(line[0]) + ' ' + str(line[1])
    text += '\n'

id,count
2565,30940
1549,22003
766,18204
1166,17361
2688,14220


In [0]:
print(text)

Q1 (id, outDegree)
2565 893
766 773
11 743
457 732
2688 618

Q2 (id, inDegree)
4037 457
15 361
2398 340
2625 331
1297 309

Q3 (id, pageRank)
4037 32.761392590350795
15 26.25300495761947
6634 26.16452443488649
2625 23.51151593302638
2398 18.728389390669683

Q4 (component, countOfNodes)
0 7066
532575944741 3
592705486870 3
936302870556 3
884763263008 2

Q5 (id, triangleCount)
2565 30940
1549 22003
766 18204
1166 17361
2688 14220



In [0]:
dbutils.fs.put(output_path, text)

Wrote 431 bytes.
Out[15]: True

In [0]:
# Viewing the outputt.txt file
op = sc.textFile("dbfs:/FileStore/tables/output_3_2.txt")
op.collect()

Out[16]: ['Q1 (id, outDegree)',
 '2565 893',
 '766 773',
 '11 743',
 '457 732',
 '2688 618',
 '',
 'Q2 (id, inDegree)',
 '4037 457',
 '15 361',
 '2398 340',
 '2625 331',
 '1297 309',
 '',
 'Q3 (id, pageRank)',
 '4037 32.761392590350795',
 '15 26.25300495761947',
 '6634 26.16452443488649',
 '2625 23.51151593302638',
 '2398 18.728389390669683',
 '',
 'Q4 (component, countOfNodes)',
 '0 7066',
 '532575944741 3',
 '592705486870 3',
 '936302870556 3',
 '884763263008 2',
 '',
 'Q5 (id, triangleCount)',
 '2565 30940',
 '1549 22003',
 '766 18204',
 '1166 17361',
 '2688 14220']