In [2]:
from pyspark.sql import SparkSession
from graphframes import GraphFrame

# Reuse the active Spark session if one exists, otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

# Input CSV locations and the output directory for this exercise.
vertexesPath, edgesPath, outputPath = (
    "Spark/Exercises/Ex_55/vertexes.csv",
    "Spark/Exercises/Ex_55/edges.csv",
    "Spark/Exercises/Ex_55/res",
)

In [3]:
# Load the vertex list: the first CSV row holds the column names and
# Spark infers the column types from the data.
vertexesDF = spark.read.csv(vertexesPath, header=True, inferSchema=True)
vertexesDF.show()

+---+----------+--------+
| id|entityName|    name|
+---+----------+--------+
| V1|      user|   Paolo|
| V2|     topic|     SQL|
| V3|      user|   David|
| V4|     topic|Big Data|
| V5|      user|    John|
+---+----------+--------+



In [4]:
# Load the edge list: the first CSV row holds the column names and
# Spark infers the column types from the data.
edgesDF = spark.read.csv(edgesPath, header=True, inferSchema=True)
edgesDF.show()

+---+---+----------+
|src|dst|  linktype|
+---+---+----------+
| V1| V2|      like|
| V1| V3|    follow|
| V1| V4|    follow|
| V3| V2|    follow|
| V3| V4|    follow|
| V5| V2|  expertOf|
| V2| V4|correlated|
| V4| V2|correlated|
+---+---+----------+



In [5]:
# Keep only the edges whose link type is "follow"; every other
# relationship (like, expertOf, correlated, ...) is discarded.
filteredEdgesDF = edgesDF.where(edgesDF["linktype"] == "follow")
filteredEdgesDF.show()

+---+---+--------+
|src|dst|linktype|
+---+---+--------+
| V1| V3|  follow|
| V1| V4|  follow|
| V3| V2|  follow|
| V3| V4|  follow|
+---+---+--------+



In [6]:
# Build the graph from all vertexes and only the "follow" edges, so every
# edge traversed later is guaranteed to be a follow relationship.
g = GraphFrame(v=vertexesDF, e=filteredEdgesDF)

In [7]:
# Motif matching one directed edge from v1 to v2. The edge is left
# anonymous ("[]"), so the result only has the v1 and v2 struct columns.
singleHopMotif = "(v1)-[]->(v2)"
pathsDF = g.find(singleHopMotif)
pathsDF.show()

+-----------------+--------------------+
|               v1|                  v2|
+-----------------+--------------------+
|[V1, user, Paolo]|   [V3, user, David]|
|[V1, user, Paolo]|[V4, topic, Big D...|
|[V3, user, David]|    [V2, topic, SQL]|
|[V3, user, David]|[V4, topic, Big D...|
+-----------------+--------------------+



In [10]:
# Keep only the pairs where a user (source vertex) follows a topic
# (destination vertex); user -> user follows are dropped.
topicPathsDF = (
    pathsDF
    .where("v1.entityName == 'user'")
    .where("v2.entityName == 'topic'")
)
topicPathsDF.show()

+-----------------+--------------------+
|               v1|                  v2|
+-----------------+--------------------+
|[V1, user, Paolo]|[V4, topic, Big D...|
|[V3, user, David]|    [V2, topic, SQL]|
|[V3, user, David]|[V4, topic, Big D...|
+-----------------+--------------------+



In [11]:
# Project the nested vertex structs down to two flat columns:
# the follower's name and the name of the followed topic.
finalDF = topicPathsDF.select(
    topicPathsDF["v1.name"].alias("username"),
    topicPathsDF["v2.name"].alias("topic"),
)
finalDF.show()

+--------+--------+
|username|   topic|
+--------+--------+
|   Paolo|Big Data|
|   David|     SQL|
|   David|Big Data|
+--------+--------+



In [12]:
# Persist the (username, topic) pairs as CSV with a header row.
# mode="overwrite" replaces any previous result: the default save mode
# ("errorifexists") makes this cell fail on every re-run of the notebook
# because outputPath already exists after the first execution.
finalDF.write.csv(outputPath, header=True, mode="overwrite")