In [None]:
!pip install pyspark
!pip install graphframes

Collecting graphframes
  Downloading graphframes-0.6-py2.py3-none-any.whl.metadata (934 bytes)
Collecting nose (from graphframes)
  Downloading nose-1.3.7-py3-none-any.whl.metadata (1.7 kB)
Downloading graphframes-0.6-py2.py3-none-any.whl (18 kB)
Downloading nose-1.3.7-py3-none-any.whl (154 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m154.7/154.7 kB[0m [31m14.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: nose, graphframes
Successfully installed graphframes-0.6 nose-1.3.7


In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, SparkSession
from graphframes import *
# Spark bootstrap for the notebook.
# `spark.jars.packages` makes Spark download the GraphFrames JVM package when
# the JVM is launched, so it has to be part of the configuration that creates
# the first session.
conf = SparkConf().setAppName('graph_processing').set(
    'spark.jars.packages',
    'graphframes:graphframes:0.8.1-spark3.0-s_2.12',
)

# getOrCreate() reuses an already-running session/context instead of building a
# second SparkContext, so re-running this cell in a live kernel no longer fails
# with "Cannot run multiple SparkContexts at once".
# NOTE(review): if a session already exists, the jars setting above is not
# re-applied to it -- restart the kernel after changing the package coordinates.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Legacy handles kept under their original names for any cell that still uses
# them; both are bound to the single session created above.
sc = spark.sparkContext
sqlContext = SQLContext(sc, sparkSession=spark)



# Assignment


In [None]:

# Both input files are CSV with a header row; column types are inferred by Spark.
csv_options = {"header": True, "inferSchema": True}

users_df = spark.read.options(**csv_options).csv("/content/user.txt")
relationships_df = spark.read.options(**csv_options).csv("/content/relationships.txt")

# Build the graph: users are the vertices, relationships are the directed edges.
# GraphFrame expects an "id" column on the vertices and "src"/"dst" columns on
# the edges.
g = GraphFrame(users_df, relationships_df)


# 1 Find all triadic closures (fully connected triplets)

In [None]:
# A directed triangle a -> b -> c -> a matches the motif three times (once per
# rotation of the starting vertex), and parallel edges produce further identical
# rows. Keep only the rotation whose first vertex has the smallest id and drop
# exact duplicates, so that every triangle is reported exactly once.
# The two orientations of a triangle (a->b->c->a vs. a->c->b->a) are different
# directed cycles and are both kept.
triadic_closures = (
    g.find("(a)-[]->(b); (b)-[]->(c); (c)-[]->(a)")
    .filter("a.id < b.id AND a.id < c.id")
    .distinct()
)
print("Triadic Closures:")
triadic_closures.show()

Triadic Closures:
+----------+----------+----------+
|         a|         b|         c|
+----------+----------+----------+
| {1, John}| {2, Mary}|{4, Karen}|
| {1, John}| {2, Mary}|{3, Steve}|
| {2, Mary}|{3, Steve}| {1, John}|
| {2, Mary}|{3, Steve}| {5, Paul}|
|{3, Steve}|{4, Karen}| {1, John}|
|{4, Karen}| {5, Paul}| {2, Mary}|
| {5, Paul}| {1, John}|{3, Steve}|
| {1, John}|{6, Alice}|  {8, Eve}|
| {1, John}|{6, Alice}|  {8, Eve}|
|{6, Alice}|  {7, Bob}| {9, Mike}|
|  {8, Eve}| {1, John}|{6, Alice}|
| {9, Mike}|{6, Alice}|  {7, Bob}|
| {1, John}|{3, Steve}| {5, Paul}|
| {1, John}|{3, Steve}| {5, Paul}|
| {1, John}|{3, Steve}|{4, Karen}|
| {2, Mary}|{4, Karen}| {1, John}|
| {2, Mary}|{4, Karen}| {5, Paul}|
|{3, Steve}| {5, Paul}| {1, John}|
|{3, Steve}| {5, Paul}| {2, Mary}|
|{3, Steve}| {5, Paul}| {1, John}|
+----------+----------+----------+
only showing top 20 rows



# 2 Identify chain patterns (A → B → C)

In [None]:
# Two consecutive directed edges: a -> b followed by b -> c.
# NOTE(review): motif matching does not appear to force a and c to be different
# vertices, and parallel edges yield repeated rows -- confirm whether such rows
# should be filtered out for this exercise.
chain_motif = "; ".join(["(a)-[]->(b)", "(b)-[]->(c)"])
chain_patterns = g.find(chain_motif)

print("Chain Patterns:")
chain_patterns.show()

Chain Patterns:
+----------+----------+----------+
|         a|         b|         c|
+----------+----------+----------+
| {1, John}| {2, Mary}|{4, Karen}|
| {1, John}| {2, Mary}|{3, Steve}|
| {2, Mary}|{3, Steve}| {1, John}|
| {2, Mary}|{3, Steve}| {5, Paul}|
| {2, Mary}|{3, Steve}|{4, Karen}|
|{3, Steve}|{4, Karen}| {1, John}|
|{3, Steve}|{4, Karen}| {5, Paul}|
|{4, Karen}| {5, Paul}| {1, John}|
|{4, Karen}| {5, Paul}| {2, Mary}|
|{4, Karen}| {5, Paul}| {1, John}|
| {5, Paul}| {1, John}|{3, Steve}|
| {5, Paul}| {1, John}|{6, Alice}|
| {5, Paul}| {1, John}| {2, Mary}|
| {1, John}|{6, Alice}|  {8, Eve}|
| {1, John}|{6, Alice}|  {7, Bob}|
|{6, Alice}|  {7, Bob}| {9, Mike}|
|{6, Alice}|  {7, Bob}|  {8, Eve}|
|  {7, Bob}|  {8, Eve}| {1, John}|
|  {7, Bob}|  {8, Eve}| {1, John}|
|  {8, Eve}| {1, John}|{3, Steve}|
+----------+----------+----------+
only showing top 20 rows



# 3 Identify one-way following patterns (A → B, but not B → A)

In [None]:
# Every existing follow edge as a (src, dst) pair. distinct() collapses parallel
# edges so that each follower/followee pair is listed once.
forward_follows = (
    g.find("(a)-[e]->(b)")
    .selectExpr("a.id as src", "b.id as dst")
    .distinct()
)

# For every edge b -> a, emit the pair (src=a, dst=b): "a's follow of b would be
# reciprocated". The projection must be swapped relative to the edge direction;
# projecting (b, a) here would simply reproduce forward_follows and make the
# anti-join below remove every row.
reverse_follows = g.find("(b)-[e2]->(a)").selectExpr("a.id as src", "b.id as dst")

# Keep only the follows that have no edge in the opposite direction.
one_way_follows = forward_follows.join(
    reverse_follows,
    on=["src", "dst"],
    how="left_anti"
)

print("One-way Follows:")
one_way_follows.show()

One-way Follows:
+---+---+
|src|dst|
+---+---+
+---+---+



# 4 Find the user with the highest followers (in-degree)

In [None]:
# In-degree = number of incoming edges, i.e. how many followers a user has.
# Sort from most-followed to least-followed and display only the top row.
in_degrees = g.inDegrees
max_followers = in_degrees.orderBy(in_degrees["inDegree"].desc())

print("User with Highest Followers:")
max_followers.show(1)

User with Highest Followers:
+---+--------+
| id|inDegree|
+---+--------+
|  1|       6|
+---+--------+
only showing top 1 row



# 5 Find the user with the lowest number of friends (out-degree)

In [None]:
# Out-degree = number of outgoing edges, i.e. how many friends a user has.
# g.outDegrees only lists vertices that have at least one outgoing edge, so a
# user with no friends at all would never be reported as the minimum. Start from
# the full vertex list, left-join the degrees and count missing ones as 0.
min_friends = (
    g.vertices.select("id")
    .join(g.outDegrees, on="id", how="left")
    .fillna(0, subset=["outDegree"])
    .orderBy("outDegree", ascending=True)
)
print("User with Lowest Number of Friends:")
min_friends.show(1)

User with Lowest Number of Friends:
+---+---------+
| id|outDegree|
+---+---------+
|  9|        2|
+---+---------+
only showing top 1 row



# 6 Detect communities using Label Propagation Algorithm (LPA)

In [None]:
# Register a checkpoint directory on the Spark context before running the
# graph algorithm.
checkpoint_dir = "/tmp/checkpoints"
spark.sparkContext.setCheckpointDir(checkpoint_dir)

# Run Label Propagation for a fixed number of iterations; the result is the
# vertex table with an extra "label" column identifying each user's community.
lpa_iterations = 5
lpa_result = g.labelPropagation(maxIter=lpa_iterations)

print("Communities Detected:")
lpa_result.show()

Communities Detected:
+---+-----+-----+
| id| name|label|
+---+-----+-----+
|  4|Karen|    8|
|  1| John|    8|
|  6|Alice|    8|
|  3|Steve|    8|
|  7|  Bob|    8|
|  9| Mike|    7|
|  8|  Eve|    1|
|  5| Paul|    8|
|  2| Mary|    8|
+---+-----+-----+

