In [0]:
# dbfs:/FileStore/Slashdot0811.txt

# 1. Assignment

In [0]:
%pip install graphframes

Python interpreter will be restarted.
Collecting graphframes
  Using cached graphframes-0.6-py2.py3-none-any.whl (18 kB)
Collecting nose
  Using cached nose-1.3.7-py3-none-any.whl (154 kB)
Installing collected packages: nose, graphframes
Successfully installed graphframes-0.6 nose-1.3.7
Python interpreter will be restarted.


In [0]:
from pyspark.sql import functions as F
from graphframes import GraphFrame
from graphframes import *

In [0]:
# GraphFrames' connected-components run needs a checkpoint directory to be
# registered on the SparkContext before it is called.
checkpoint_dir = "/tmp/graphframe‑chkpt"
spark.sparkContext.setCheckpointDir(checkpoint_dir)

# Three times the cluster's default parallelism.
# NOTE(review): N is not referenced by any later cell shown here - confirm it is still needed.
N = 3 * spark.sparkContext.defaultParallelism

In [0]:
# Bare expression: the notebook renders the object's repr as the cell output.
# NOTE(review): `sc` is not defined in this notebook - presumably the
# runtime-provided SparkContext; looks like a leftover sanity check.
sc

In [0]:
file_path = "dbfs:/FileStore/Slashdot0811.txt"

# Tab-delimited edge list with no header row; lines beginning with '#' are
# skipped as comments. The two columns are renamed to src / dst, the names
# GraphFrames expects for an edge DataFrame.
edges_raw = (
    spark.read
    .options(sep="\t", header="false", comment="#")
    .csv(file_path)
    .toDF("src", "dst")
)

In [0]:
# Drop self-loops: keep only rows whose source and destination ids differ.
edges = edges_raw.where(F.col("src") != F.col("dst"))

In [0]:
# Preview the first 20 edges (the defaults of show(), spelled out).
edges.show(n=20, truncate=True)

+---+---+
|src|dst|
+---+---+
|  0|  1|
|  0|  2|
|  0|  3|
|  0|  4|
|  0|  5|
|  0|  6|
|  0|  7|
|  0|  8|
|  0|  9|
|  0| 10|
|  0| 11|
|  0| 12|
|  0| 13|
|  0| 14|
|  0| 15|
|  0| 16|
|  0| 17|
|  0| 18|
|  0| 19|
|  0| 20|
+---+---+
only showing top 20 rows



In [0]:
# Vertex set = every id that appears as either endpoint of an edge, de-duplicated.
src_ids = edges.select(F.col("src").alias("id"))
dst_ids = edges.select(F.col("dst").alias("id"))
vertices = src_ids.union(dst_ids).distinct()

In [0]:
# Preview the first 20 vertex ids (the defaults of show(), spelled out).
vertices.show(n=20, truncate=True)

+---+
| id|
+---+
|  7|
| 15|
| 11|
|  3|
|  8|
| 16|
|  0|
|  5|
| 18|
| 17|
|  6|
| 19|
|  9|
|  1|
| 20|
| 10|
|  4|
| 12|
| 13|
| 14|
+---+
only showing top 20 rows



In [0]:
# Each count() triggers a Spark job; compute them once, then report with thousands separators.
n_vertices = vertices.count()
n_edges = edges.count()
print(f"Vertices: {n_vertices:,}  |  Edges: {n_edges:,}")

Vertices: 77,360  |  Edges: 828,161


In [0]:
# Directed graph over the de-duplicated vertex ids and the self-loop-free edge list.
g = GraphFrame(v=vertices, e=edges)

### a. Top‑5 out‑degree nodes

In [0]:
# Rank vertices by number of outgoing edges and keep the five largest.
out_ranked = g.outDegrees.sort(F.col("outDegree").desc())
out_top5 = out_ranked.limit(5)
out_top5.display()

# Persist as a single CSV part file with a header row, replacing any previous run's output.
(
    out_top5.coalesce(1)
    .write.mode("overwrite")
    .csv("/FileStore/graphx‑slashdot/outdegree_top5", header=True)
)

id,outDegree
2481,2507
4675,2209
394,2198
377,1732
225,1696


### b. Top‑5 in‑degree nodes

In [0]:
# Rank vertices by number of incoming edges and keep the five largest.
in_ranked = g.inDegrees.sort(F.col("inDegree").desc())
in_top5 = in_ranked.limit(5)
in_top5.display()

# Persist as a single CSV part file with a header row, replacing any previous run's output.
(
    in_top5.coalesce(1)
    .write.mode("overwrite")
    .csv("/FileStore/graphx‑slashdot/indegree_top5", header=True)
)

id,inDegree
2481,2539
394,2326
4675,2239
377,1740
225,1716


### c. PageRank (top-5)

In [0]:
# PageRank run for a fixed 10 iterations with a reset probability of 0.15
# (no convergence tolerance is passed).
pr = g.pageRank(maxIter=10, resetProbability=0.15)

# Five vertices with the highest PageRank score.
pr_ranked = pr.vertices.select("id", "pagerank").sort(F.col("pagerank").desc())
pr_top5 = pr_ranked.limit(5)
pr_top5.display()

# Persist as a single CSV part file with a header row, replacing any previous run's output.
(
    pr_top5.coalesce(1)
    .write.mode("overwrite")
    .csv("/FileStore/graphx‑slashdot/pagerank_top5", header=True)
)

id,pagerank
2481,228.2540271970992
394,207.63135297704093
34,187.00779399283363
377,180.91047469348936
4675,164.83431863398914


### d. Connected components on the degree ≥ 2 subgraph (top‑5 largest)

In [0]:
# Ids of the vertices whose total degree in g is at least 2.
core_v = g.degrees.where(F.col("degree") >= 2).select("id")

In [0]:
# Keep only the edges whose source AND destination both survive the degree
# filter: two inner joins against the surviving ids, one per endpoint column.
core_src = core_v.select(F.col("id").alias("src"))
core_dst = core_v.select(F.col("id").alias("dst"))
core_e = g.edges.join(core_src, on="src").join(core_dst, on="dst")

In [0]:
# Subgraph restricted to the degree >= 2 vertices and the edges between them.
g_core = GraphFrame(v=core_v, e=core_e)

In [0]:
# Connected components of the degree >= 2 subgraph; the result is grouped by
# its "component" column in the next cell.
# NOTE: the name `cc` is reassigned later by the full-graph run.
cc = g_core.connectedComponents()

In [0]:
# Number of vertices per component, largest first; keep the five biggest.
component_sizes = cc.groupBy("component").count()
cc_sizes = component_sizes.sort(F.col("count").desc()).limit(5)

In [0]:
# Show the five largest components of the degree >= 2 subgraph.
cc_sizes.display()

component,count
0,72075


In [0]:
# Persist the degree >= 2 subgraph result under its own path. A later cell
# writes the full-graph component sizes to
# /FileStore/graphx‑slashdot/connected_components_top5 in overwrite mode, so
# sharing that path would silently discard this result.
cc_sizes.coalesce(1).write.mode("overwrite").csv("/FileStore/graphx‑slashdot/connected_components_core_top5", header=True)

### d2. Connected components on the full graph (top‑5 largest)

In [0]:
# Connected components of the full graph g.
# NOTE: this rebinds `cc`, which previously held the degree >= 2 subgraph result.
cc = g.connectedComponents()

In [0]:
# Number of vertices per component, largest first; keep the five biggest.
component_sizes = cc.groupBy("component").count()
cc_sizes = component_sizes.sort(F.col("count").desc()).limit(5)

In [0]:
# Show the five largest connected components of the full graph.
cc_sizes.display()

component,count
0,77360


In [0]:
# Persist as a single CSV part file with a header row, replacing whatever is already at this path.
(
    cc_sizes.coalesce(1)
    .write.mode("overwrite")
    .csv("/FileStore/graphx‑slashdot/connected_components_top5", header=True)
)

### e. Triangle count (top‑5 vertices)

In [0]:
# Per-vertex triangle counts on the full graph; the result is read through its
# "id" and "count" columns in the next cell.
tri = g.triangleCount()

In [0]:
# Five vertices with the highest triangle count.
tri_ranked = tri.select("id", "count").sort(F.col("count").desc())
tri_top5 = tri_ranked.limit(5)

In [0]:
# Show the five vertices with the highest triangle count.
tri_top5.display()

id,count
46,14888
192,11742
1712,10942
394,10820
337,10534


In [0]:
# Persist as a single CSV part file with a header row, replacing any previous run's output.
(
    tri_top5.coalesce(1)
    .write.mode("overwrite")
    .csv("/FileStore/graphx‑slashdot/trianglecount_top5", header=True)
)