In [8]:
!pip install pyspark
!pip install graphframes



In [9]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, SparkSession
from graphframes import GraphFrame

# Spark configuration: the application name plus the GraphFrames package coordinates
# passed through `spark.jars.packages`.
conf = SparkConf().setAppName('graph_processing').set('spark.jars.packages', 'graphframes:graphframes:0.8.1-spark3.0-s_2.12')

# getOrCreate() reuses a session that is already running, so re-executing this cell no
# longer fails the way a second `SparkContext(conf=conf)` call does.
# NOTE: if a session already exists, its configuration is kept; restart the kernel to
# apply a changed `conf`.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

# Kept (bound to the same session) so code that still refers to `sqlContext` keeps working.
sqlContext = SQLContext(sc, spark)


In [10]:
# Read both inputs as CSV with a header row, letting Spark infer the column types.
user_data = spark.read.options(header=True, inferSchema=True).csv('/content/user.txt')
relationship_data = spark.read.options(header=True, inferSchema=True).csv('/content/relationships.txt')

## 1. Build the graph and find directed triangles

In [11]:
# The loaded frames are used as-is: the previous `withColumnRenamed(x, x)` calls renamed
# each column to its own name and therefore changed nothing.
vertices = user_data
edges = relationship_data

# Graph over the user vertices and the relationship edges.
g = GraphFrame(vertices, edges)

vertices.show()
edges.show()

# Motif search for three edges that close a directed cycle a -> b -> c -> a.
# Each cycle can match once per starting vertex; dropDuplicates() only removes
# fully identical result rows.
triangles = g.find("(a)-[e1]->(b); (b)-[e2]->(c); (c)-[e3]->(a)").dropDuplicates()

triangles.show(truncate=False)


+---+-----+
| id| name|
+---+-----+
|  1| John|
|  2| Mary|
|  3|Steve|
|  4|Karen|
|  5| Paul|
|  6|Alice|
|  7|  Bob|
|  8|  Eve|
|  9| Mike|
+---+-----+

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  1|  2|     friends|
|  2|  3|     friends|
|  3|  4|     friends|
|  4|  5|     friends|
|  5|  1|     friends|
|  1|  6|     friends|
|  6|  7|     friends|
|  7|  8|     friends|
|  8|  1|     friends|
|  9|  6|     friends|
|  1|  3|     follows|
|  2|  4|     follows|
|  3|  5|     follows|
|  4|  1|     follows|
|  3|  1|     follows|
|  5|  2|     follows|
|  5|  1|     follows|
|  6|  8|     follows|
|  7|  9|     follows|
|  8|  1|     follows|
+---+---+------------+
only showing top 20 rows

+----------+---------------+----------+---------------+----------+---------------+
|a         |e1             |b         |e2             |c         |e3             |
+----------+---------------+----------+---------------+----------+---------------+
|{5, Paul} |{5, 

## 2. Two-hop chains where both edges are `follows`

In [12]:
# Two consecutive edges a -> b -> c, kept only when both edges are 'follows'.
# The motif does not require a and c to differ, so mutual follows (a -> b -> a) match too.
chain_patterns = (
    g.find("(a)-[e1]->(b); (b)-[e2]->(c)")
    .where("e1.relationship = 'follows' and e2.relationship = 'follows'")
    .distinct()
)

chain_patterns.show(truncate=False)


+----------+---------------+----------+---------------+----------+
|a         |e1             |b         |e2             |c         |
+----------+---------------+----------+---------------+----------+
|{5, Paul} |{5, 1, follows}|{1, John} |{1, 3, follows}|{3, Steve}|
|{9, Mike} |{9, 7, follows}|{7, Bob}  |{7, 9, follows}|{9, Mike} |
|{4, Karen}|{4, 1, follows}|{1, John} |{1, 3, follows}|{3, Steve}|
|{6, Alice}|{6, 8, follows}|{8, Eve}  |{8, 1, follows}|{1, John} |
|{3, Steve}|{3, 1, follows}|{1, John} |{1, 3, follows}|{3, Steve}|
|{1, John} |{1, 3, follows}|{3, Steve}|{3, 1, follows}|{1, John} |
|{8, Eve}  |{8, 1, follows}|{1, John} |{1, 3, follows}|{3, Steve}|
|{2, Mary} |{2, 4, follows}|{4, Karen}|{4, 1, follows}|{1, John} |
|{3, Steve}|{3, 5, follows}|{5, Paul} |{5, 1, follows}|{1, John} |
|{3, Steve}|{3, 5, follows}|{5, Paul} |{5, 2, follows}|{2, Mary} |
|{7, Bob}  |{7, 9, follows}|{9, Mike} |{9, 7, follows}|{7, Bob}  |
|{1, John} |{1, 3, follows}|{3, Steve}|{3, 5, follows}|{5, Pau

## 3. One-way follows (follows that are not reciprocated)

In [25]:
from pyspark.sql.functions import col

follows_edges = edges.where(col('relationship') == 'follows')

# Self-join the 'follows' edges: `e1` is the edge being tested, `e2` a candidate
# edge pointing the opposite way.
forward_edges = follows_edges.alias('e1')
reverse_edges = follows_edges.alias('e2')
is_reciprocated = (
    (col('e1.src') == col('e2.dst'))
    & (col('e1.dst') == col('e2.src'))
    & (col('e2.relationship') == 'follows')
)

# The anti join keeps only the e1 rows for which no reverse edge exists.
one_way_follow = forward_edges.join(reverse_edges, is_reciprocated, 'left_anti').select(
    'e1.src', 'e1.dst', 'e1.relationship'
)

one_way_follow.show(truncate=False)


+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|2  |4  |follows     |
|3  |5  |follows     |
|4  |1  |follows     |
|5  |2  |follows     |
|5  |1  |follows     |
|6  |8  |follows     |
|8  |1  |follows     |
+---+---+------------+



## 4. User(s) with the most followers

In [26]:
from pyspark.sql.functions import col, desc

follows_edges = edges.where(col('relationship') == 'follows')

# Number of incoming 'follows' edges per destination user.
followers_count = follows_edges.groupBy('dst').count()

# Global maximum of those counts (single-row aggregate, first column).
max_followers = followers_count.groupBy().max('count').first()[0]

# Keep every user that reaches the maximum, so ties are all reported.
users_with_max_followers = followers_count.where(col('count') == max_followers)

users_with_max_followers.show(truncate=False)

+---+-----+
|dst|count|
+---+-----+
|1  |4    |
+---+-----+



## 5. User(s) with the fewest friends

In [27]:
from pyspark.sql.functions import col

friends_edges = edges.filter(col('relationship') == 'friends')

# A 'friends' edge counts for both of its endpoints, so tally it once per side.
friends_count_src = friends_edges.groupBy('src').count().withColumnRenamed('count', 'friends_count')
friends_count_dst = friends_edges.groupBy('dst').count().withColumnRenamed('count', 'friends_count')

# Name the endpoint column `id` on both sides before the union; union() matches columns
# by position, so this makes explicit that src and dst ids are being combined.
endpoint_counts = friends_count_src.withColumnRenamed('src', 'id').union(
    friends_count_dst.withColumnRenamed('dst', 'id')
)
friends_per_user = endpoint_counts.groupBy('id').agg({'friends_count': 'sum'})\
    .withColumnRenamed('sum(friends_count)', 'total_friends')

# Outer-join against the vertex ids so users without any 'friends' edge are included
# with a total of 0 (previously they were absent and could never be the minimum).
# The outer join also keeps ids that only occur in the edge list.
friends_count = vertices.select('id')\
    .join(friends_per_user, 'id', 'outer')\
    .fillna(0, subset=['total_friends'])

min_friends = friends_count.agg({'total_friends': 'min'}).collect()[0][0]

# Keep every user that ties for the minimum.
users_with_min_friends = friends_count.filter(col('total_friends') == min_friends)

users_with_min_friends.show(truncate=False)


+---+-------------+
|id |total_friends|
+---+-------------+
|9  |1            |
+---+-------------+



## 6. Community detection with label propagation

In [28]:
# Run GraphFrames label propagation on the whole graph for at most 5 iterations,
# then display the label assigned to each vertex id.
result = g.labelPropagation(maxIter=5)

result.select(['id', 'label']).show(truncate=False)

+---+-----+
|id |label|
+---+-----+
|4  |8    |
|1  |8    |
|6  |8    |
|3  |8    |
|7  |8    |
|9  |7    |
|8  |1    |
|5  |8    |
|2  |8    |
+---+-----+

