In [1]:
import os

from pyspark.sql import SparkSession
from graphframes import GraphFrame

In [2]:
# Build (or reuse) a Spark session on the local master using all available
# cores, and ask Spark to fetch the GraphFrames package at startup.
spark = (
    SparkSession.builder
    .appName("GraphFrames")
    .master("local[*]")
    .config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.0-s_2.12")
    .getOrCreate()
)
# Bare expression so the notebook renders the session summary.
spark

In [3]:
# Input CSV files, resolved relative to the notebook's working directory.
vertices_csv_path = "facebook_vertices.csv"
edges_csv_path = "facebook_edges.csv"

# Both files are read the same way: first row is the header, and column
# types are inferred from the data.
csv_read_options = {"header": True, "inferSchema": True}

vertices = spark.read.options(**csv_read_options).csv(vertices_csv_path)
edges = spark.read.options(**csv_read_options).csv(edges_csv_path)

In [4]:
# Assemble the graph: `v` is the vertex DataFrame, `e` is the edge DataFrame.
g = GraphFrame(v=vertices, e=edges)



In [5]:
# (a) Identifying influential users using PageRank
# Run PageRank with a 0.15 reset probability until the 0.01 tolerance is met.
pagerank_results = g.pageRank(resetProbability=0.15, tol=0.01)

# Highest PageRank first; display the top 10 vertices.
ranked_vertices = pagerank_results.vertices.sort("pagerank", ascending=False)
ranked_vertices.show(10)



+---+------------------+
| id|          pagerank|
+---+------------------+
|143|1.8688906932154679|
| 53| 1.862280827337311|
|301|1.6765667360032137|
| 42|1.6765667360032137|
|312|1.6596539296200101|
|343|1.5256099594417722|
|111|1.3903143531621407|
|115|  1.37232062377254|
|200|1.3449475249152258|
|326| 1.313344048132692|
+---+------------------+
only showing top 10 rows



In [10]:
# Connected components of the full graph.
# A Spark checkpoint directory is set before running the "graphframes"
# algorithm with a non-zero checkpointInterval. The location can be
# overridden with the SPARK_CHECKPOINT_DIR environment variable so the
# notebook is not tied to one machine; the original path stays the default.
checkpoint_dir = os.environ.get("SPARK_CHECKPOINT_DIR", "C:/Big Data/checkpoints")
spark.sparkContext.setCheckpointDir(checkpoint_dir)

components = g.connectedComponents(algorithm="graphframes", checkpointInterval=10)

# Show the first 10 vertices ordered by their component id.
components.select("id", "component").orderBy("component").show(10)



+---+---------+
| id|component|
+---+---------+
|266|        0|
|245|        0|
| 10|        0|
|264|        0|
|323|        0|
|346|        0|
| 72|        0|
| 37|        0|
|230|        0|
|272|        0|
+---+---------+
only showing top 10 rows



In [11]:
# Apply the label propagation algorithm, capped at 5 iterations.
label_propagation = g.labelPropagation(maxIter=5)

# Show the first 10 vertices ordered by assigned label, without truncating
# cell contents.
label_propagation.sort("label").show(n=10, truncate=False)

+---+-----+
|id |label|
+---+-----+
|201|8    |
|91 |8    |
|110|8    |
|193|8    |
|259|8    |
|264|8    |
|245|8    |
|99 |23   |
|267|23   |
|124|23   |
+---+-----+
only showing top 10 rows



In [12]:
# (d) Impact of removing a key user
def remove_user(graph, user_id):
    """Return a new GraphFrame without ``user_id`` and its incident edges.

    Args:
        graph: GraphFrame whose ``vertices`` have an ``id`` column and whose
            ``edges`` have ``src`` and ``dst`` columns.
        user_id: Identifier of the vertex to drop. It is compared in its
            string form, as in the original string-built filter.

    Returns:
        A GraphFrame built from the vertices whose ``id`` differs from
        ``user_id`` and the edges whose ``src`` and ``dst`` both differ
        from it.
    """
    # Compare against a literal value through Column expressions instead of
    # interpolating the id into SQL text, so ids containing quotes or
    # backslashes cannot break or alter the filter.
    user_id_literal = str(user_id)

    vertices_df = graph.vertices
    edges_df = graph.edges

    new_vertices = vertices_df.filter(vertices_df["id"] != user_id_literal)
    new_edges = edges_df.filter(
        (edges_df["src"] != user_id_literal) & (edges_df["dst"] != user_id_literal)
    )
    return GraphFrame(new_vertices, new_edges)

# Drop user '1' from the graph and report how many vertices and edges remain.
graph_without_user = remove_user(g, '1')

print("Graph size after removing user 1:")
remaining_vertex_count = graph_without_user.vertices.count()
print("Vertices:", remaining_vertex_count)
remaining_edge_count = graph_without_user.edges.count()
print("Edges:", remaining_edge_count)



Graph size after removing user 1:
Vertices: 350
Edges: 983


In [14]:
# Recompute connected components on the graph with the user removed.
# NOTE: this relies on the Spark checkpoint directory already having been
# set earlier in the session.
graph_without_user_connected_components = graph_without_user.connectedComponents(
    algorithm="graphframes",
    checkpointInterval=10,
)

print("Connected components after removing user 1:")
(
    graph_without_user_connected_components
    .select("id", "component")
    .sort("component")
    .show(10)
)



Connected components after removing user 1:
+---+---------+
| id|component|
+---+---------+
|266|        0|
|245|        0|
| 10|        0|
|264|        0|
|323|        0|
|346|        0|
| 72|        0|
| 37|        0|
|230|        0|
|272|        0|
+---+---------+
only showing top 10 rows

