# Graph Frames

In [0]:
import uuid
import graphframes as gf
from functools import reduce
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, ArrayType, StringType, LongType, StructField, IntegerType
from typing import List
from pyspark.sql.functions import udf

# `spark` and `sc` are assumed to be the SparkSession / SparkContext injected by
# the Databricks notebook runtime (they are not created in this notebook).
# Size shuffles to the cluster's default parallelism instead of Spark's fixed
# default for spark.sql.shuffle.partitions.
spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)

# Root directory (not a single file) containing the Delta tables loaded below.
# Each table is read from f"{delta_dir}<TableName>", so keep the trailing slash.
delta_dir = "dbfs:/delta/tables/"

In [0]:
def _read_delta(table_name):
    """Return the Delta table stored at ``f'{delta_dir}{table_name}'`` as a DataFrame."""
    return spark.read.format('delta').load(f'{delta_dir}{table_name}')


# Fact table plus the dimension tables it is joined against below.
factTable = _read_delta('Fact_Table')
authors = _read_delta('Author')
publications = _read_delta('Publication')
venues = _read_delta('Venue')
domains = _read_delta('FieldOfStudy')
institutions = _read_delta('Organization')

In [0]:
# One row per (fact row, cited publication id) in column `cites`.
# `explode_outer` is used instead of `explode` so that rows whose `references`
# array is null or empty are kept (with cites = null) instead of being silently
# dropped. Consumers that need real citations must filter out null `cites`.
citation_pubs = (factTable
                 .withColumnRenamed("fos", "Field_of_Study")
                 .withColumn("cites", F.explode_outer("references"))
                 .drop("references")
                 )
citation_pubs.show(5)

+--------------------+----------------+--------------------+--------------------+----------+---------+--------------------+
|      Publication_ID|  Field_of_Study|            Venue_ID|             Lang_ID|AuthorRank|Author_ID|               cites|
+--------------------+----------------+--------------------+--------------------+----------+---------+--------------------+
|5b66d491ab2dfb459...|Computer science|5390acc420f70186a...|4d1b04c0-ca9c-42a...|         1|     null|53e9a20fb7602d970...|
|5b66d491ab2dfb459...|Computer science|5390acc420f70186a...|4d1b04c0-ca9c-42a...|         1|     null|57a4e91aac44365e3...|
|5b66d491ab2dfb459...|Computer science|5390acc420f70186a...|4d1b04c0-ca9c-42a...|         1|     null|53e9b2d1b7602d970...|
|5b66d491ab2dfb459...|Computer science|5390acc420f70186a...|4d1b04c0-ca9c-42a...|         1|     null|5a260c3b17c44a4ba...|
|5b66d491ab2dfb459...|Computer science|5390acc420f70186a...|4d1b04c0-ca9c-42a...|         1|     null|53e9ba8fb7602d970...|
+-------

In [0]:
# Vertex set of the knowledge graph: every id that can appear as an edge
# endpoint. GraphFrames motif queries join edges to vertices, so an edge whose
# src/dst is missing from this set silently drops out of the results.
_vertex_id_columns = [
    (factTable, F.col("Publication_ID")),      # publications
    (factTable, F.col("Venue_ID")),            # venues
    (factTable, F.col("fos")),                 # fields of study
    (factTable, F.col("Author_ID")),           # authors (authorship / co-authorship / specialization src)
    (factTable, F.explode("references")),      # cited publications (citation dst)
    (authors, F.col("_id")),                   # authors (works_for src)
    (institutions, F.col("id")),               # organizations (works_for dst)
]

knowledge_graph_v = (
    reduce(
        lambda left, right: left.union(right),
        [frame.select(column.alias("id")) for frame, column in _vertex_id_columns],
    )
    .na.drop("any")
    .distinct()
)

knowledge_graph_v.show(5)

+--------------------+
|                  id|
+--------------------+
|5b66d491ab2dfb459...|
|5b66d491ab2dfb459...|
|5b66d492ab2dfb459...|
|5b66d491ab2dfb459...|
|5b66d491ab2dfb459...|
+--------------------+
only showing top 5 rows



In [0]:
# Edge list of the knowledge graph. Every edge frame has the same three columns
# in the same order, (src, dst, relationship), because they are combined with a
# positional `union`.

# Author -> Publication. Built from factTable directly (not the exploded
# citation_pubs) so a publication's authors are kept even when it has no
# references, and rows are not multiplied once per reference.
authorship_e = (factTable
                .select(F.col("Author_ID").alias("src"), F.col("Publication_ID").alias("dst"))
                .withColumn("relationship", F.lit("Authorship"))
                .distinct()
                )

# Author -> Author for each pair of *different* authors of the same
# publication. Selecting Author_ID as both src and dst only produces
# self-loops. Requiring src < dst emits each unordered pair once.
_author_pubs = (factTable
                .select("Publication_ID", "Author_ID")
                .where(F.col("Author_ID").isNotNull())
                .distinct()
                )
co_authorship_e = (_author_pubs.alias("a")
                   .join(_author_pubs.alias("b"),
                         F.col("a.Publication_ID") == F.col("b.Publication_ID"))
                   .where(F.col("a.Author_ID") < F.col("b.Author_ID"))
                   .select(F.col("a.Author_ID").alias("src"), F.col("b.Author_ID").alias("dst"))
                   .withColumn("relationship", F.lit("Co-authorship"))
                   .distinct()
                   )

# Author -> Field of study (column "fos" in factTable).
specialization_e = (factTable
                    .select(F.col("Author_ID").alias("src"), F.col("fos").alias("dst"))
                    .withColumn("relationship", F.lit("Specialization"))
                    .distinct()
                    )

# Author -> Organization.
# FIXME(review): this join has no condition, so it is a cartesian product that
# links EVERY author to EVERY organization (or raises an AnalysisException when
# spark.sql.crossJoin.enabled is false). The author->organization key column is
# not visible in this notebook; add it as the join condition.
association_e = (authors
                 .join(institutions.withColumnRenamed("id", "Org_ID"))
                 .select(F.col("_id").alias("src"), F.col("Org_ID").alias("dst"))
                 .withColumn("relationship", F.lit("works_for"))
                 .distinct()
                 )

# Publication -> cited publication. Null `cites` are filtered explicitly so this
# does not depend on whether citation_pubs was built with explode or
# explode_outer.
citation_e = (citation_pubs
              .where(F.col("cites").isNotNull())
              .select(F.col("Publication_ID").alias("src"), F.col("cites").alias("dst"))
              .withColumn("relationship", F.lit("Cites"))
              .distinct()
              )

# Drop edges with a missing endpoint (e.g. rows whose Author_ID is null).
knowledge_graph_e = (authorship_e
                     .union(co_authorship_e)
                     .union(specialization_e)
                     .union(association_e)
                     .union(citation_e)
                     .na.drop("any")
                     )

knowledge_graph_e.show(5)

+--------------------+--------------------+------------+
|                 src|                 dst|relationship|
+--------------------+--------------------+------------+
|542cee34dabfae4bb...|5b66d491ab2dfb459...|  Authorship|
|562c7d1945cedb339...|5b66d491ab2dfb459...|  Authorship|
|53f49857dabfaeb9c...|5b66d491ab2dfb459...|  Authorship|
|5631a6cd45cedb339...|5b66d492ab2dfb459...|  Authorship|
|53f42afddabfaec09...|5b66d493ab2dfb459...|  Authorship|
+--------------------+--------------------+------------+
only showing top 5 rows



In [0]:
# Assemble the property graph from the vertex and edge DataFrames built above.
# The bare expression on the last line renders the graph's schema.
knowledge_graph = gf.GraphFrame(v=knowledge_graph_v, e=knowledge_graph_e)
knowledge_graph

Out[131]: GraphFrame(v:[id: string], e:[src: string, dst: string ... 1 more field])

In [0]:
# Motif: two edges of the same relationship type ending at the same vertex `b`
# (e.g. two authors of one publication, or two papers citing the same paper).
# Require a != c; otherwise every edge also matches itself (e1 == e2), which
# floods the result with trivial pairs.
motifs = (knowledge_graph
          .find("(a)-[e1]->(b); (c)-[e2]->(b)")
          .filter(F.col("e1.relationship") == F.col("e2.relationship"))
          .filter(F.col("a.id") != F.col("c.id"))
          )
motifs.show()

In [0]:
# GraphFrames' default connected-components algorithm checkpoints intermediate
# results and fails if no Spark checkpoint directory is configured, so set one
# explicitly. This lets the notebook run on a fresh cluster.
sc.setCheckpointDir("dbfs:/tmp/graphframes/checkpoints")

# Adds a `component` column to each vertex of the knowledge graph.
result = knowledge_graph.connectedComponents()
display(result)