# P2.1 Graph Creation
Artjom, Heidi, Kaja, Rasmus

This is the first notebook of the second project in the Big Data Management course. In this notebook we create the graph DataFrames, save them, and look at their basic properties.

Register the Delta tables in the metastore again whenever the cluster is started or restarted.

In [0]:
# Re-register the warehouse Delta tables in the metastore (IF NOT EXISTS makes the cell safe to re-run)
spark.sql("CREATE TABLE IF NOT EXISTS publication USING DELTA LOCATION 'dbfs:/tmp/data/warehouse/publication'")
spark.sql("CREATE TABLE IF NOT EXISTS author USING DELTA LOCATION 'dbfs:/tmp/data/warehouse/author'")
spark.sql("CREATE TABLE IF NOT EXISTS organization USING DELTA LOCATION 'dbfs:/tmp/data/warehouse/organization'")
spark.sql("CREATE TABLE IF NOT EXISTS venue USING DELTA LOCATION 'dbfs:/tmp/data/warehouse/venue'")
spark.sql("CREATE TABLE IF NOT EXISTS type USING DELTA LOCATION 'dbfs:/tmp/data/warehouse/type'")
spark.sql("CREATE TABLE IF NOT EXISTS fieldofstudies USING DELTA LOCATION 'dbfs:/tmp/data/warehouse/fieldofstudies'")
spark.sql("CREATE TABLE IF NOT EXISTS language USING DELTA LOCATION 'dbfs:/tmp/data/warehouse/language'")
spark.sql("CREATE TABLE IF NOT EXISTS keyword USING DELTA LOCATION 'dbfs:/tmp/data/warehouse/keyword'")
spark.sql("CREATE TABLE IF NOT EXISTS reference USING DELTA LOCATION 'dbfs:/tmp/data/warehouse/reference'")
spark.sql("CREATE TABLE IF NOT EXISTS authorrank USING DELTA LOCATION 'dbfs:/tmp/data/warehouse/authorrank'")

Out[1]: DataFrame[]

In [0]:
%sql
select 'publication' as tbl, count(*) from publication group by 1 union
select 'author' as tbl, count(*) from author group by 1 union
select 'organization' as tbl, count(*) from organization group by 1 union
select 'venue' as tbl, count(*) from venue group by 1 union
select 'type' as tbl, count(*) from type group by 1 union
select 'fieldofstudies' as tbl, count(*) from fieldofstudies group by 1 union
select 'language' as tbl, count(*) from language group by 1 union
select 'keyword' as tbl, count(*) from keyword group by 1 union
select 'reference' as tbl, count(*) from reference group by 1 union
select 'authorrank' as tbl, count(*) from authorrank group by 1

tbl,count(1)
publication,7168
author,15739
organization,3375
venue,3360
type,8
fieldofstudies,42
language,1
keyword,5127
reference,46929
authorrank,34222


In [0]:
import graphframes as gf
import pyspark.sql.functions as F
import networkx as nx
import matplotlib.pyplot as plt

spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)

### Importing references (skip this if the `reference` dimension already exists)

In our Extract step, the reference dimension was not implemented properly. Here we fix this by preprocessing and transforming our JSON splits in the same way as was done for the other dimension tables. The reference dimension now contains the warehouse publication ID (an integer) and the references of that publication, given as the original source IDs of the referenced publications (strings).
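The extract-and-merge logic of the function below can be illustrated without Spark. This is a minimal plain-Python sketch, not the notebook's implementation: `explode_references` and `merge_references` are hypothetical helpers, the toy records stand in for the DBLP JSON splits, and a Python set plays the role of the `reference` Delta table. It shows why re-running the load is harmless: the merge only inserts pairs that are not present yet.

```python
# Toy sketch of the reference ETL (hypothetical helpers and data, no Spark involved):
# explode each publication's reference list into pairs, then merge them into the
# target so that only unseen pairs are inserted (MERGE ... WHEN NOT MATCHED THEN INSERT).

def explode_references(records):
    """Yield one (source publication ID, referenced source ID) pair per reference."""
    for rec in records:
        pub_id = rec.get("_id")
        refs = rec.get("references")
        if pub_id is None or refs is None:  # mirrors the two isNotNull() filters
            continue
        for ref in refs:
            yield pub_id, ref


def merge_references(target, records, id_map):
    """Insert new (warehouse ID, referenced source ID) pairs into the target set.

    id_map maps source publication IDs to warehouse integer IDs; publications that
    are not in the warehouse are skipped, as in the MERGE source query.
    """
    inserted = 0
    for pub_src, ref_src in set(explode_references(records)):  # like dropDuplicates()
        if pub_src not in id_map:
            continue
        row = (id_map[pub_src], ref_src)
        if row not in target:  # WHEN NOT MATCHED THEN INSERT
            target.add(row)
            inserted += 1
    return inserted


records = [
    {"_id": "a1", "references": ["r1", "r2", "r2"]},
    {"_id": "a2", "references": None},
    {"_id": "zz", "references": ["r3"]},  # not loaded into the warehouse
]
id_map = {"a1": 1, "a2": 2}
reference = set()
print(merge_references(reference, records, id_map))  # 2
print(merge_references(reference, records, id_map))  # 0: re-running inserts nothing
```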

In [0]:
# Uncomment to delete the reference dimension before rebuilding it from scratch
#dbutils.fs.rm('dbfs:/tmp/data/warehouse/reference', recurse=True)

In [0]:
def get_references(filename):
    """Extract the references of one JSON split and merge them into the `reference` dimension.

    `df_schema` (the JSON schema of the splits) must be defined beforehand; it is not created in this notebook.
    """
    print(filename)
    # Copy the split from the driver's local disk to DBFS so that Spark can read it
    dbutils.fs.cp("file:/databricks/driver/data/" + filename, "dbfs:/tmp/data/source")
    filename = "dbfs:/tmp/data/source/" + filename

    print("Transform ...")
    data = spark.read.schema(df_schema).option("multiline", True).json(filename)
    data = data.select(
        F.col("_id").alias("PublicationID"),
        F.col("references").alias("References")
    )
    # One row per (publication, referenced publication) pair, stored in a staging table
    (data
     .filter(F.col("PublicationID").isNotNull())
     .filter(F.col("References").isNotNull())
     .select("PublicationID", F.explode("References").alias("RefPublicationID"))
     .dropDuplicates()
     .write.format("delta").mode("overwrite")
     .option("overwriteSchema", "true")
     .saveAsTable("sourceRef"))

    print("Load ...")
    query_ref = """
      CREATE TABLE IF NOT EXISTS reference (
      PublicationID INT, RefPublSourceID STRING)
      USING DELTA LOCATION 'dbfs:/tmp/data/warehouse/reference'"""
    spark.sql(query_ref)
    # Replace the citing publication's source ID with its warehouse ID and insert only new pairs;
    # the inner join drops publications that are not loaded into the warehouse
    query_merge_ref = """
      MERGE INTO reference tgt
      USING ( SELECT publ.ID AS PublicationID, ref.RefPublicationID AS RefPublSourceID
              FROM sourceRef ref
              JOIN publication publ ON publ.PublicationID = ref.PublicationID
            ) src
      ON tgt.PublicationID = src.PublicationID AND tgt.RefPublSourceID = src.RefPublSourceID
      WHEN NOT MATCHED THEN INSERT (PublicationID, RefPublSourceID)
      VALUES (src.PublicationID, src.RefPublSourceID)
      """
    spark.sql(query_merge_ref)


In [0]:
#for i in range(1,106):
#    get_references(f'dblpv13_22_{i}.json')

In [0]:
%sql select * from reference

PublicationID,RefPublSourceID
1626,53e9b93eb7602d970452bd3f
1751,53e9b4c3b7602d9703fe0e23
1265,53e9acb5b7602d9703690c92
1341,53e9ba76b7602d970469b44a
1018,53e9b7bbb7602d9704367142
1761,53e9b930b7602d970451cd21
1786,53e9a743b7602d970307c82e
1581,53e9bd87b7602d9704a28d02
1251,53e9b8e1b7602d97044c794e
1022,53e998fdb7602d970213ec22


Our reference table in the Delta warehouse stores the source IDs of the referenced publications, so that they are not lost even if those publications have not (yet) been loaded into the warehouse. To create publication-to-publication relationships, however, we need the warehouse ID of each referenced publication, which exists only for loaded publications. The following query shows how many references point to a publication that exists in the warehouse (Existing = 1) and how many do not (Existing = 0).

In [0]:
%sql 
select case when p.ID is null then 0 else 1 end as Existing, count(*)
from reference r 
left join publication p on p.PublicationID = r.RefPublSourceID
group by 1

Existing,count(1)
1,239
0,46690
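
The existence check is a left join followed by a null test. A toy plain-Python version, assuming a dictionary `source_to_dw` that maps source IDs to warehouse IDs (hypothetical data and helper names), counts both groups and builds `Cites` edges only for the references that resolve, which is what the Spark join further below does.

```python
# Toy sketch: resolve referenced source IDs to warehouse IDs (hypothetical data).
from collections import Counter


def existing_counts(references, source_to_dw):
    """Count references whose referenced publication is (1) or is not (0) in the warehouse."""
    return Counter(1 if ref_src in source_to_dw else 0 for _, ref_src in references)


def cites_edges(references, source_to_dw):
    """Build 'Cites' edges (P<citing ID>, P<cited ID>) for the references that resolve."""
    return [(f"P{pub}", f"P{source_to_dw[ref_src]}", "Cites")
            for pub, ref_src in references if ref_src in source_to_dw]


# (citing warehouse ID, referenced source ID) rows, as in the reference table
references = [(10, "s1"), (10, "s9"), (11, "s2"), (12, "s8")]
source_to_dw = {"s1": 11, "s2": 12}
print(existing_counts(references, source_to_dw))  # Counter({1: 2, 0: 2})
print(cites_edges(references, source_to_dw))  # [('P10', 'P11', 'Cites'), ('P11', 'P12', 'Cites')]
```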


Now we create our graph, consisting of the vertices and edges described below.

### Vertices

- Authors
- Organizations
- Field of Studies 
- Venues
- Publications

We create the corresponding DataFrames from our warehouse dimension tables and then union them all into the `df_vertices` DataFrame. In the dimension tables, entry IDs are integers. Below we turn these integer IDs into strings by adding a one-letter prefix (P, A, O, V or F). This lets us tell the vertex types apart by their IDs and keeps IDs from different dimension tables (e.g. author 1 and field of study 1) from colliding.
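A minimal sketch of this ID scheme in plain Python, assuming only the five prefixes used in the cells below. `make_vertex_id` and `parse_vertex_id` are hypothetical helpers; in the notebook the prefixing is done in Spark with `F.concat`.

```python
# Sketch of the prefixed vertex IDs (hypothetical helper names, prefixes as in the notebook)
PREFIXES = {
    "Publication": "P",
    "Author": "A",
    "Organization": "O",
    "Venue": "V",
    "FieldOfStudy": "F",
}
TYPES = {prefix: vtype for vtype, prefix in PREFIXES.items()}


def make_vertex_id(vtype, dw_id):
    """Turn an integer dimension ID into a graph vertex ID, e.g. ('Author', 1) -> 'A1'."""
    return f"{PREFIXES[vtype]}{dw_id}"


def parse_vertex_id(vertex_id):
    """Split a vertex ID back into (type, integer ID); every prefix is one letter."""
    return TYPES[vertex_id[0]], int(vertex_id[1:])


# The same integer ID in two dimension tables gives two distinct vertices
print(make_vertex_id("Author", 1), make_vertex_id("FieldOfStudy", 1))  # A1 F1
print(parse_vertex_id("P845"))  # ('Publication', 845)
```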

In [0]:
df_publ = (spark.read.format("delta")
       .load("dbfs:/tmp/data/warehouse/publication")
       .selectExpr("ID", "'P' as T", "Title", "'Publication' as type")
       .select(F.concat(F.col("T"),F.col("ID")).alias("id"),F.col("Title").alias("name"),"type")
      )
display(df_publ)

id,name,type
P845,Blind Estimation of MIMO Channels Using Genetic Algorithm,Publication
P846,Constrained k-closest pairs query processing based on growing window in crime databases,Publication
P847,Ranking Search Intents Underlying a Query.,Publication
P848,Threshold hypergraphs,Publication
P849,Probability propagation,Publication
P850,Minitrack Introduction,Publication
P851,Quantum Hypercomputation,Publication
P852,Probabilistic robotics,Publication
P853,A tangible user interface for multi-user awareness systems,Publication
P854,Height-based deformation and ray supersampling for colon unfolding,Publication


In [0]:
df_auth = (spark.read.format("delta")
       .load("dbfs:/tmp/data/warehouse/author")
       .selectExpr("*", "'A' as T", "' ' as S", "'Author' as type")
       .select(
           F.concat(F.col("T"),F.col("ID")).alias("id"),
           F.concat(F.col("FirstName"),F.col("S"),
                    F.when(F.col("MiddleName").isNotNull(),F.concat(F.col("MiddleName"),F.col("S"))).otherwise(""),
                    F.col("LastName")).alias("name"),
           "type")
      )
display(df_auth)

id,name,type
A1,Nicolò Cesa-Bianchi,Author
A2,Abdelwadood Mesleh,Author
A3,Kathryn Dempsey,Author
A4,Long Zhang,Author
A5,Tanja Schmedes,Author
A6,R. Van Buskirk,Author
A7,J A Foster,Author
A8,Hugh Couchman,Author
A9,Rainer Manthey,Author
A10,Jian-Ming Jin,Author


In [0]:
df_org = (spark.read.format("delta")
       .load("dbfs:/tmp/data/warehouse/organization")
       .selectExpr("ID", "'O' as T", "Organization", "'Organization' as type")
       .select(F.concat(F.col("T"),F.col("ID")).alias("id"),F.col("Organization").alias("name"),"type")
      )
display(df_org)

id,name,type
O660,"Institut für Informationswissenschaft,Universität Graz,Graz,Österreich",Organization
O661,"Department of Mathematics, California State College, Sonoma, Rohnert Park, California 94928 USA",Organization
O662,"Atostek Ltd., Hermiankatu 8 D, FIN-33720 Tampere, Finland",Organization
O663,MFF University Karlovy,Organization
O664,"University of Mannheim, Germany",Organization
O665,"GTE Labs., Inc., Waltham, MA",Organization
O666,"University of Newcastle, Callaghan, NSW, Australia",Organization
O667,"The Netherlands Media, The Netherlands",Organization
O668,"CMI, 39 rue Joliot-Curie, 13453 Marseille Cedex 13, France",Organization
O669,"University of Innsbruck,Innsbruck,Österreich",Organization


In [0]:
df_ven = (spark.read.format("delta")
       .load("dbfs:/tmp/data/warehouse/venue")
       .selectExpr("ID", "'V' as T", "VenueAbreviation", "'Venue' as type")
       .select(F.concat(F.col("T"),F.col("ID")).alias("id"),F.col("VenueAbreviation").alias("name"),"type")
      )
display(df_ven)

id,name,type
V569,PROGRAM-ELECTRONIC LIBRARY AND INFORMATION SYSTEMS,Venue
V570,Computers & Mathematics with Applications,Venue
V571,ICASSP (5),Venue
V572,Programming and Computer Software,Venue
V573,ICC,Venue
V574,"IEEE Transactions on Systems, Man, and Cybernetics, Part C",Venue
V575,"Combinatorics, Probability & Computing",Venue
V576,OOPSLA,Venue
V577,LATIN,Venue
V578,SIBGRAPI,Venue


In [0]:
df_fos = (spark.read.format("delta")
       .load("dbfs:/tmp/data/warehouse/fieldofstudies")
       .selectExpr("ID", "'F' as T", "FieldOfStudy", "'FieldOfStudy' as type")
       .select(F.concat(F.col("T"),F.col("ID")).alias("id"),F.col("FieldOfStudy").alias("name"),"type")
      )
display(df_fos)

id,name,type
F1,Information sciences,FieldOfStudy
F2,Mathematics,FieldOfStudy
F3,Biology Science,FieldOfStudy
F4,Earth Sciences,FieldOfStudy
F5,Computer Sciences,FieldOfStudy
F6,"Electrical, electronic and information engineering",FieldOfStudy
F7,Civil engineering,FieldOfStudy
F8,Physical sciences,FieldOfStudy
F9,Mechanical engineering,FieldOfStudy
F10,Chemical sciences,FieldOfStudy


In [0]:
df_vertices = (df_publ
               .union(df_auth)
               .union(df_org)
               .union(df_ven)
               .union(df_fos))
display(df_vertices)

id,name,type
P845,Blind Estimation of MIMO Channels Using Genetic Algorithm,Publication
P846,Constrained k-closest pairs query processing based on growing window in crime databases,Publication
P847,Ranking Search Intents Underlying a Query.,Publication
P848,Threshold hypergraphs,Publication
P849,Probability propagation,Publication
P850,Minitrack Introduction,Publication
P851,Quantum Hypercomputation,Publication
P852,Probabilistic robotics,Publication
P853,A tangible user interface for multi-user awareness systems,Publication
P854,Height-based deformation and ray supersampling for colon unfolding,Publication


In [0]:
df_vertices.groupBy("type").count().show()

+------------+-----+
|        type|count|
+------------+-----+
| Publication| 7168|
|      Author|15739|
|Organization| 3375|
|       Venue| 3360|
|FieldOfStudy|   42|
+------------+-----+



### Edges

- Authorship (Author, Publication)
- Co-authorship (Author, Author)
- WorksFor (Author, Organization)
- Specializations (Author, FieldOfStudy) 
- Cites (Publication, Publication)

We proceed as with the vertices: the prefixed string IDs identify the source and destination of each edge. Each edge also gets a type and a type-dependent rank:
- Authorship rank: we take the author's position r in the publication's author list, compute 10/r, round it down and add 1 (so the rank is never 0). The rank thus shows the "importance" of the author for the given publication: the maximum authorship rank (11) means that it is the first author of the publication.
- Co-authorship rank is the number of publications two authors have in common. Each pair of co-authors appears in both directions.
- WorksFor rank is the number of publications the author completed while affiliated with the given organization.
- Specialization rank is the number of publications the author wrote in the given field of study.
- Cites rank has no particular meaning.
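In the cell below the authorship rank is computed as `int((1/AuthorRank)*10+1)`. Because `int` truncates, for an author at position $r \ge 1$ this equals $\lfloor 10/r \rfloor + 1$. The plain-Python sketch below (the function name is hypothetical) uses integer division to avoid floating-point surprises and tabulates the first positions.

```python
# Authorship rank floor(10 / r) + 1, equivalent to int((1/r)*10+1) for positions r >= 1


def authorship_rank(position):
    """Rank of the author at 1-based `position` in the publication's author list."""
    if position < 1:
        raise ValueError("author positions start at 1")
    return 10 // position + 1


for r in range(1, 12):
    print(r, authorship_rank(r))
```

Positions 1 to 11 get ranks 11, 6, 4, 3, 3, 2, 2, 2, 2, 2, 1, and every later position gets 1, so the rank mainly separates the first few authors.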

In [0]:
df_auth_publ = (spark.read.format("delta")
       .load("dbfs:/tmp/data/warehouse/authorrank")
       .selectExpr("AuthorID", "'A' as T1", "PublicationID", "'P' as T2", "'Authorship' as type", "int((1/AuthorRank)*10+1) as AuthorRank")
       .select(F.concat(F.col("T1"),F.col("AuthorID")).alias("src"),
               F.concat(F.col("T2"),F.col("PublicationID")).alias("dst"),
               "type", F.col("AuthorRank").alias("rank"))
       .dropDuplicates()
      )
display(df_auth_publ)

src,dst,type,rank
A6111,P2495,Authorship,6
A5666,P2502,Authorship,2
A5745,P2503,Authorship,6
A7641,P2508,Authorship,11
A6415,P2510,Authorship,3
A5343,P2518,Authorship,6
A5271,P2521,Authorship,11
A6857,P2522,Authorship,6
A6836,P2539,Authorship,3
A5207,P2552,Authorship,11


In [0]:
df_auth_publ2 = df_auth_publ.select(F.col("src").alias("auth2"),F.col("dst").alias("publ2"))
df_auth_publ1 = df_auth_publ.select(F.col("src").alias("auth1"),F.col("dst").alias("publ1"))

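df_auth_auth = (df_auth_publ1
                # pair every author of a publication with every author of the same publication
                .join(df_auth_publ2, df_auth_publ1.publ1 == df_auth_publ2.publ2)
                .filter(F.col("auth1") != F.col("auth2"))  # drop self-pairs
                .selectExpr("*", "'Co-authorship' as type")
                # rank = number of shared publications; each pair appears in both directions
                .groupBy("auth1", "auth2", "type").count()
                .toDF("src", "dst", "type", "rank")
               )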
display(df_auth_auth)

src,dst,type,rank
A5666,A6381,Co-authorship,1
A7641,A7518,Co-authorship,1
A6836,A6335,Co-authorship,1
A6592,A7429,Co-authorship,1
A7120,A6545,Co-authorship,1
A7120,A6781,Co-authorship,1
A6722,A5453,Co-authorship,1
A6722,A4974,Co-authorship,1
A5230,A7110,Co-authorship,1
A5297,A7247,Co-authorship,1
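
The self-join above can be mirrored in plain Python to see what the rank counts. This toy sketch uses hypothetical data and a hypothetical helper name, and assumes each author is listed once per publication.

```python
# Toy mirror of the co-authorship self-join (hypothetical data)
from collections import Counter, defaultdict


def coauthorship_edges(authorships):
    """Return {(author1, author2): number of shared publications} for ordered pairs."""
    authors_by_pub = defaultdict(set)
    for author, pub in authorships:
        authors_by_pub[pub].add(author)
    counts = Counter()
    for authors in authors_by_pub.values():
        for a1 in authors:
            for a2 in authors:
                if a1 != a2:  # like filter(auth1 != auth2)
                    counts[(a1, a2)] += 1
    return counts


authorships = [("A1", "P1"), ("A2", "P1"), ("A1", "P2"), ("A2", "P2"), ("A3", "P2")]
edges = coauthorship_edges(authorships)
print(edges[("A1", "A2")], edges[("A2", "A1")], edges[("A1", "A3")])  # 2 2 1
print(len(edges))  # 6: each unordered pair appears in both directions
```

Because the join is symmetric, every co-author pair yields two directed edges, so the Co-authorship edge count is twice the number of distinct author pairs.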


In [0]:
df_auth_org = (spark.read.format("delta")
       .load("dbfs:/tmp/data/warehouse/authorrank")
       .selectExpr("PublicationID", "AuthorID", "'A' as T1", "OrganizationID", "'O' as T2", "'WorksFor' as type")
       .filter(F.col("OrganizationID").isNotNull())
       .select(F.concat(F.col("T1"),F.col("AuthorID")).alias("src"),
               F.concat(F.col("T2"),F.col("OrganizationID")).alias("dst"),
               "type", "PublicationID")
       .dropDuplicates()  # count each publication once per (author, organization), as for Specialization
       .groupBy("src","dst","type").count()
       .toDF("src","dst","type","rank")
      )
display(df_auth_org)

src,dst,type,rank
A5950,O793,WorksFor,1
A5950,O766,WorksFor,1
A7695,O1971,WorksFor,1
A7139,O2017,WorksFor,1
A6478,O1837,WorksFor,1
A6943,O2377,WorksFor,1
A6688,O2236,WorksFor,1
A6688,O1895,WorksFor,1
A5801,O228,WorksFor,1
A5691,O400,WorksFor,1


In [0]:
df_auth_fos = (spark.read.format("delta")
       .load("dbfs:/tmp/data/warehouse/authorrank")
       .selectExpr("PublicationID", "AuthorID", "'A' as T1", "FieldOfStudyID", "'F' as T2", "'Specialization' as type")
       .filter(F.col("FieldOfStudyID").isNotNull())
       .select(F.concat(F.col("T1"),F.col("AuthorID")).alias("src"),
               F.concat(F.col("T2"),F.col("FieldOfStudyID")).alias("dst"),
               "type", "PublicationID")
       .dropDuplicates()
       .groupBy("src","dst","type").count()
       .toDF("src","dst","type","rank")
      )
display(df_auth_fos)

src,dst,type,rank
A6857,F5,Specialization,1
A2094,F5,Specialization,1
A6550,F5,Specialization,1
A4974,F5,Specialization,1
A7150,F5,Specialization,1
A5842,F5,Specialization,1
A6325,F5,Specialization,1
A7055,F5,Specialization,1
A6680,F5,Specialization,1
A7037,F5,Specialization,1


In [0]:
# Map source publication IDs to warehouse IDs (separate name, so the vertex DataFrame df_publ is not overwritten)
df_publ_ids = (spark.read.format("delta")
                .load("dbfs:/tmp/data/warehouse/publication")
                .select(F.col("PublicationID").alias("RefPublSID"), F.col("ID").alias("RefPublicationID")))
df_ref = (spark.read.format("delta")
                .load("dbfs:/tmp/data/warehouse/reference"))

# Inner join: keep only references to publications that exist in the warehouse
df_publ_publ = (df_ref
                .join(df_publ_ids, df_publ_ids.RefPublSID == df_ref.RefPublSourceID)
                .selectExpr("PublicationID", "'P' as T1", "RefPublicationID", "'P' as T2", "'Cites' as type")
                .select(F.concat(F.col("T1"),F.col("PublicationID")).alias("src"),
                        F.concat(F.col("T2"),F.col("RefPublicationID")).alias("dst"),
                        "type")
                .groupBy("src","dst","type").count()
                .toDF("src","dst","type","rank")
               )
display(df_publ_publ)

src,dst,type,rank
P2501,P4073,Cites,1
P2627,P3089,Cites,1
P5042,P4114,Cites,1
P4205,P4467,Cites,1
P3483,P3483,Cites,1
P4289,P5650,Cites,1
P4868,P5305,Cites,1
P5045,P5055,Cites,1
P2993,P5148,Cites,1
P2734,P4710,Cites,1


In [0]:
print("Authorships: ", df_auth_publ.count())
print("Co-authorships: ", df_auth_auth.count())
print("WorksFor: ", df_auth_org.count())
print("Specializations: ", df_auth_fos.count())
print("Cites: ", df_publ_publ.count())

Authorships:  16984
Co-authorships:  40896
WorksFor:  17430
Specializations:  12529
Cites:  239


In [0]:
df_edges = (df_auth_publ
            .union(df_auth_auth)
            .union(df_auth_org)
            .union(df_auth_fos)
            .union(df_publ_publ)
           )
display(df_edges)

src,dst,type,rank
A6111,P2495,Authorship,6
A5666,P2502,Authorship,2
A5745,P2503,Authorship,6
A7641,P2508,Authorship,11
A6415,P2510,Authorship,3
A5343,P2518,Authorship,6
A5271,P2521,Authorship,11
A6857,P2522,Authorship,6
A6836,P2539,Authorship,3
A5207,P2552,Authorship,11


### Graph

Below we create the graph from the vertices and edges built above, save it, and look at its basic properties.

In [0]:
graph = gf.GraphFrame(df_vertices, df_edges)

In [0]:
dbutils.fs.ls('dbfs:/tmp/data')
dbutils.fs.mkdirs('dbfs:/tmp/data/graph')

Out[21]: True

In [0]:
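# Overwrite mode lets the cell be re-run without failing on existing output
graph.vertices.write.mode("overwrite").parquet("dbfs:/tmp/data/graph/vertices")
graph.edges.write.mode("overwrite").parquet("dbfs:/tmp/data/graph/edges")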

#### Properties:

Distributions of node degrees, indegrees and outdegrees
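To make the three degree notions concrete, here is a toy plain-Python sketch on a directed edge list (hypothetical data and helper names). As with GraphFrames' `degrees`, `inDegrees` and `outDegrees`, a vertex only appears if it has at least one matching edge. The helper `distribution` turns a degree table into a degree distribution (degree value to number of vertices).

```python
# Toy sketch of degree, in-degree, out-degree and the degree distribution
from collections import Counter


def degree_tables(edges):
    """Return (degree, in_degree, out_degree) Counters for a list of (src, dst) edges."""
    out_deg = Counter(src for src, _ in edges)
    in_deg = Counter(dst for _, dst in edges)
    degree = out_deg + in_deg  # total number of edges incident to each vertex
    return degree, in_deg, out_deg


def distribution(deg):
    """Map each degree value to the number of vertices that have it."""
    return Counter(deg.values())


edges = [("A1", "P1"), ("A2", "P1"), ("A1", "A2"), ("A2", "A1"), ("A1", "O1")]
degree, in_deg, out_deg = degree_tables(edges)
print(degree["A1"], in_deg["A1"], out_deg["A1"])  # 4 1 3
print(sorted(distribution(degree).items()))  # [(1, 1), (2, 1), (3, 1), (4, 1)]
```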

In [0]:
display(graph.degrees) # total number of edges incident to each vertex

id,degree
A6592,12
P2654,3
P2699,4
A6608,7
P2774,3
A7320,7
P2823,2
A6101,7
P2882,2
A7280,14


In [0]:
display(graph.inDegrees) # incoming edges

id,inDegree
P2654,3
P2699,4
P2774,3
P2823,2
P2882,2
P2891,6
P2969,3
P2998,4
P3002,2
P3039,3


In [0]:
display(graph.outDegrees) # outgoing edges

id,outDegree
A6592,7
A6608,5
A7320,6
A6101,5
A7280,9
A6068,5
A7294,7
A5905,3
A5755,11
A5534,9


Counting triangles
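A toy plain-Python sketch of per-vertex triangle counting, following the GraphFrames convention that edge direction is ignored and duplicate edges count once. Dropping self-loops is an assumption of this sketch, and the data and function name are hypothetical.

```python
# Toy per-vertex triangle count on an undirected view of the edges
from collections import defaultdict
from itertools import combinations


def triangle_counts(vertices, edges):
    """Return {vertex: number of triangles passing through it}."""
    nbrs = defaultdict(set)
    for src, dst in edges:
        if src != dst:  # assumption: self-loops are ignored
            nbrs[src].add(dst)  # undirected: store both directions
            nbrs[dst].add(src)
    counts = {v: 0 for v in vertices}
    for v in vertices:
        # every pair of adjacent neighbours of v closes one triangle through v
        for a, b in combinations(sorted(nbrs[v]), 2):
            if b in nbrs[a]:
                counts[v] += 1
    return counts


# Two co-authors of P1 (co-authorship edges in both directions) and a third author of P1
vertices = ["A1", "A2", "P1", "A3"]
edges = [("A1", "P1"), ("A2", "P1"), ("A1", "A2"), ("A2", "A1"), ("A3", "P1")]
print(triangle_counts(vertices, edges))  # {'A1': 1, 'A2': 1, 'P1': 1, 'A3': 0}
```

The toy case also suggests why publication vertices have non-zero counts in the full graph: all authors of a publication are linked by co-authorship edges, so a publication with k authors lies on at least k(k-1)/2 triangles.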

In [0]:
results = graph.triangleCount()
display(results.select("id", "count"))

id,count
P845,10
P847,36
P850,3
P855,0
P863,0
P866,6
P868,0
P883,0
P906,3
P910,1


In [0]:
results_co_auth = graph.filterEdges("type = 'Co-authorship'").triangleCount()
display(results_co_auth.select("id", "count").orderBy("count", ascending=False))

id,count
A214,499
A1288,499
A1427,496
A1383,496
A1514,496
A1120,496
A1307,496
A1306,496
A900,496
A1135,496


Who are the top two authors?

In [0]:
%sql select * from author where ID in (214,1288) 

ID,AuthorID,FirstName,MiddleName,LastName
214,53f32deddabfae9a8449d523,Martin,,Nordio
1288,53f430fbdabfaeb22f435e19,Bertrand,,Meyer


In [0]:
df_triplets = (graph.triplets
               .select(F.col("src.type").alias("src_type"),F.col("edge.type").alias("edge_type"),F.col("dst.type").alias("dst_type"))
               .groupby("src_type","edge_type","dst_type").count()
              )
display(df_triplets)

src_type,edge_type,dst_type,count
Author,Authorship,Publication,16984
Author,Co-authorship,Author,40896
Author,WorksFor,Organization,17430
Author,Specialization,FieldOfStudy,12529
Publication,Cites,Publication,239
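
The triplet summary can be mirrored in plain Python: look up the type of each edge's endpoints and count the (source type, edge type, destination type) combinations. This is a toy sketch with hypothetical data; it skips edges whose endpoints have no vertex, since `graph.triplets` joins the edges with their vertices. The triplet counts above match the per-type edge counts printed earlier, so no edge was lost in that join.

```python
# Toy sketch of counting (src_type, edge_type, dst_type) triplets
from collections import Counter


def triplet_summary(vertex_types, edges):
    """vertex_types: {id: type}; edges: iterable of (src, dst, edge_type)."""
    return Counter(
        (vertex_types[src], etype, vertex_types[dst])
        for src, dst, etype in edges
        if src in vertex_types and dst in vertex_types  # endpoints without a vertex are dropped
    )


vertex_types = {"A1": "Author", "A2": "Author", "P1": "Publication", "O1": "Organization"}
edges = [("A1", "P1", "Authorship"), ("A2", "P1", "Authorship"),
         ("A1", "A2", "Co-authorship"), ("A2", "A1", "Co-authorship"),
         ("A1", "O1", "WorksFor"), ("A9", "P1", "Authorship")]  # A9 has no vertex
for key, n in sorted(triplet_summary(vertex_types, edges).items()):
    print(*key, n)
```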
