#### Executing this notebook depends on the Delta tables from project 1 being saved in DBFS!
In project 1, make sure to run chapters 1, 2, 3 fully to save the Delta tables.

#### Also make sure you have GraphFrames installed on the cluster!
https://spark-packages.org/package/graphframes/graphframes

(Choose the correct versions of Spark and Scala!)

In [None]:
# For visualization, also install NetworkX.
# Installing NetworkX via jar didn't work for me so here I install it via pip.
%pip install networkx

Python interpreter will be restarted.
Collecting networkx
  Downloading networkx-2.8.2-py3-none-any.whl (2.0 MB)
Installing collected packages: networkx
Successfully installed networkx-2.8.2
Python interpreter will be restarted.


In [None]:
import pyspark.sql.functions as F
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

import matplotlib.pyplot as plt
import graphframes as gf
import networkx as nx

spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism) #shuffle partitions for faster processing of graph algorithms

Assignment for consistency!

In [None]:
author_df = authors_df
org_df = orgs_df
venue_df = venues_df

I'll skip this and use the created dataframs instead!

In [None]:
dblp_df = spark.read.load('dbfs:/user/dblpv13/dblp_table')
author_df = spark.read.load('dbfs:/user/dblpv13/author_table')
org_df = spark.read.load('dbfs:/user/dblpv13/org_table')
venue_df = spark.read.load('dbfs:/user/dblpv13/venue_table')

logger.info(f'Read table row counts:\n\tDBLP fact table: {dblp_df.count()}\n\tAuthor table: {author_df.count()}\n\tVenue table: {venue_df.count()}\n\tOrganization table: {org_df.count()}')

INFO:__main__:Read table row counts:
	DBLP fact table: 275859
	Author table: 406699
	Venue table: 13567
	Organization table: 8882


In [None]:
def lim_print(df, lim):
    display(df.limit(lim))

### Vertices
* Needs to contain column **id**

To each DF, add the vertex type. We will have 4 types of vertices: publications, authors, organizations, venues.

In [None]:
# Add type to each df and rename column 'ID' to 'id'.
dblp_df = dblp_df.withColumn('Type', F.lit('publication')).withColumnRenamed('ID', 'id')
author_df = author_df.withColumn('Type', F.lit('author')).withColumnRenamed('ID', 'id')
org_df = org_df.withColumn('Type', F.lit('org')).withColumnRenamed('ID', 'id')
venue_df = venue_df.withColumn('Type', F.lit('venue')).withColumnRenamed('ID', 'id')

# Union the relevant columns of each df into a single df.
_vertices = (dblp_df
            .select('id', 'Type')
            .union(author_df
                   .select('id', 'Type'))
            .union(org_df
                   .select('id', 'Type'))
            .union(venue_df
                   .select('id', 'Type')))

In [None]:
lim_print(_vertices, 10)

id,Type
53e99784b7602d9701f3f5fe,publication
53e99792b7602d9701f5af35,publication
53e99792b7602d9701f5b0ed,publication
53e99792b7602d9701f5b119,publication
53e99792b7602d9701f5b140,publication
53e99792b7602d9701f5b19a,publication
53e99792b7602d9701f5b1ba,publication
53e99792b7602d9701f5b1e7,publication
53e99792b7602d9701f5b2b3,publication
53e99792b7602d9701f5b2bc,publication


### Edges
* Need to contain columns **src** and **dst**

We will have 4 kinds of relationships: 
* isAuthorOf (Author -> Publication)
* cites (Publication -> Publication)
* hosts (Venue -> Publication)
* worksFor (Author -> Organization)

#### isAuthorOf

In [None]:
# Source is exploded dblp.Authors
# Destination is dblp.id
isAuthorOf_edges = (dblp_df
                    .withColumn('src', F.explode(dblp_df.Authors))
                    .withColumnRenamed('id', 'dst')
                    .withColumn('Type', F.lit('isAuthorOf'))
                    .select('src', 'Type', 'dst'))

In [None]:
lim_print(isAuthorOf_edges, 10)

src,Type,dst
53f46a22dabfaee0d9c3d5e5,isAuthorOf,53e99784b7602d9701f3f5fe
53f43a51dabfaec22baa659b,isAuthorOf,53e99792b7602d9701f5af35
53f3b3ffdabfae4b34b2dae9,isAuthorOf,53e99792b7602d9701f5af35
53f4333fdabfaeb22f451979,isAuthorOf,53e99792b7602d9701f5af35
542a6734dabfae646d55cc87,isAuthorOf,53e99792b7602d9701f5b0ed
5630ff9645cedb3399c3ca55,isAuthorOf,53e99792b7602d9701f5b119
53f4371cdabfaec22ba8766f,isAuthorOf,53e99792b7602d9701f5b119
54867430dabfae9b40133dc3,isAuthorOf,53e99792b7602d9701f5b119
56017d4445cedb3395e638f7,isAuthorOf,53e99792b7602d9701f5b140
54301e81dabfaeca69bca10d,isAuthorOf,53e99792b7602d9701f5b19a


#### cites

In [None]:
# Source is dblp.id
# Destination is exploded dblp.References
cites_edges = (dblp_df
               .withColumn('dst', F.explode(dblp_df.References))
               .withColumnRenamed('id', 'src')
               .withColumn('Type', F.lit('cites'))
               .select('src', 'Type', 'dst'))

In [None]:
lim_print(cites_edges, 10)

src,Type,dst
53e99784b7602d9701f3f5fe,cites,53e9a073b7602d9702957efa
53e99784b7602d9701f3f5fe,cites,53e9ad87b7602d970377bfb5
53e99784b7602d9701f3f5fe,cites,53e9be51b7602d9704b11381
53e99784b7602d9701f3f5fe,cites,53e9be04b7602d9704abb31d
53e99784b7602d9701f3f5fe,cites,53e9992bb7602d9702169236
53e99784b7602d9701f3f5fe,cites,53e998cdb7602d97021044db
53e99784b7602d9701f3f5fe,cites,53e9afa6b7602d97039f6054
53e99784b7602d9701f3f5fe,cites,53e99822b7602d9702044e60
53e99792b7602d9701f5af35,cites,53e9b6eeb7602d970427df40
53e99792b7602d9701f5af35,cites,53e9b6eeb7602d9704283b9f


#### hosts

In [None]:
# Source is dblp.Venue
# Destination is dblp.id
hosts_edges = (dblp_df
               .withColumnRenamed('id', 'dst')
               .withColumnRenamed('Venue', 'src')
               .withColumn('Type', F.lit('hosts'))
               .select('src', 'Type', 'dst'))

In [None]:
lim_print(hosts_edges, 10)

src,Type,dst
572de199d39c4f49934b3d5c,hosts,53e99784b7602d9701f3f5fe
54825226582fc50b5e05610e,hosts,53e99792b7602d9701f5af35
53a727f720f7420be8ba3092,hosts,53e99792b7602d9701f5b0ed
0377-2217,hosts,53e99792b7602d9701f5b119
5550376d7cea80f9541873d5,hosts,53e99792b7602d9701f5b140
555036db7cea80f9541603d7,hosts,53e99792b7602d9701f5b19a
53a7310820f7420be8d1bc69,hosts,53e99792b7602d9701f5b1ba
555037837cea80f95418b43e,hosts,53e99792b7602d9701f5b1e7
555037227cea80f95417540f,hosts,53e99792b7602d9701f5b2b3
53907df520f770854f6106bd,hosts,53e99792b7602d9701f5b2bc


#### worksFor

In [None]:
# Source is the first author of each publication
# Destination is the org of that publication
worksFor_edges = (dblp_df
                  .withColumn('src', dblp_df.Authors.getItem(0))
                  .withColumnRenamed('Org', 'dst')
                  .withColumn('Type', F.lit('worksFor'))
                  .select('src', 'Type', 'dst'))

In [None]:
lim_print(worksFor_edges, 10)

src,Type,dst
53f46a22dabfaee0d9c3d5e5,worksFor,5f71b2e91c455f439fe3f23f
53f43a51dabfaec22baa659b,worksFor,5f71b2bd1c455f439fe3dea6
542a6734dabfae646d55cc87,worksFor,5f71b6101c455f439fe555a5
5630ff9645cedb3399c3ca55,worksFor,5f71b29c1c455f439fe3d0d7
56017d4445cedb3395e638f7,worksFor,5f71b2f61c455f439fe3f847
54301e81dabfaeca69bca10d,worksFor,5f71b2841c455f439fe3c6c8
53f42c98dabfaeb22f3fc92d,worksFor,5f71b57c1c455f439fe515f1
53f43685dabfaec09f17df79,worksFor,5f71b4501c455f439fe491ff
53f427b6dabfaec09f0d9c8a,worksFor,5f71b2961c455f439fe3ce44
54096bf9dabfae8faa68e261,worksFor,5f71b2aa1c455f439fe3d5c6


Let's combine the edges into one DF.

In [None]:
_edges = isAuthorOf_edges.union(cites_edges).union(hosts_edges).union(worksFor_edges)

In [None]:
lim_print(_edges, 10)

src,Type,dst
53f46a22dabfaee0d9c3d5e5,isAuthorOf,53e99784b7602d9701f3f5fe
53f43a51dabfaec22baa659b,isAuthorOf,53e99792b7602d9701f5af35
53f3b3ffdabfae4b34b2dae9,isAuthorOf,53e99792b7602d9701f5af35
53f4333fdabfaeb22f451979,isAuthorOf,53e99792b7602d9701f5af35
542a6734dabfae646d55cc87,isAuthorOf,53e99792b7602d9701f5b0ed
5630ff9645cedb3399c3ca55,isAuthorOf,53e99792b7602d9701f5b119
53f4371cdabfaec22ba8766f,isAuthorOf,53e99792b7602d9701f5b119
54867430dabfae9b40133dc3,isAuthorOf,53e99792b7602d9701f5b119
56017d4445cedb3395e638f7,isAuthorOf,53e99792b7602d9701f5b140
54301e81dabfaeca69bca10d,isAuthorOf,53e99792b7602d9701f5b19a


### Build the GraphFrame

In [None]:
_graph = gf.GraphFrame(_vertices, _edges)

### Queries (Motif-finding)

Co-authorship network of an author. Given an Author's id, find all of the authors they have published some work with.

TODO: color nodes according to schema.

In [None]:
net = (_graph
       .filterEdges('Type = "isAuthorOf"')
       .find('(a)-[e1]->(b); (c)-[e2]->(b)')
       .filter('a.id == "542a67e9dabfae646d55d787" and c.id != a.id')) # Put the author ID here

In [None]:
display(net)

INFO:py4j.java_gateway:Received command  on object id 
INFO:py4j.java_gateway:Closing down callback connection


In [None]:
# Functions for drawing nice graphs.

def find_color(id2Type, color_map, special_vertex, v):
    if (special_vertex is not None) and v == special_vertex:
        return color_map['special']
    
    type = id2Type[id2Type['id'] == v]['Type'].values[0]
    
    return color_map[type]

def add_edges(g, df, edge_colnames):
    df_pandas = df.toPandas()

    # Iterate over the columns and rows.
    # From each row, add an edge with the src and dst.
    for colname in edge_colnames:
        for row in df_pandas[colname]:
            src, dst = row['src'], row['dst']
            g.add_edge(row['src'], row['dst'])
    
    
# Based on https://stackoverflow.com/a/57784880
def plot_graph(df, edge_colnames, special_vertex = None):
    """ Inputs: 
            df - DataFrame of the graph you want to plot 
            edge_colnames - iterable of df column names that hold the edges of the graph
            special_vertex - id of the vertex we want to color yellow (special color)
    """
    g = nx.DiGraph()
    
    add_edges(g, df, edge_colnames)
       
    # Get only the vertices columns from the df.
    vertices = df.select([c for c in df.columns if c not in edge_colnames])
    
    id2Type = (vertices
               .withColumn('collected', F.array(vertices.columns))
               .withColumn('exploded', F.explode(F.col('collected')))
               .withColumn('id', F.col('exploded.id'))
               .withColumn('Type', F.col('exploded.Type'))
               .select('id', 'Type')
               .distinct()).toPandas()
    
    color_map = {'publication': '#f54040', 
                 'venue': '#d40ee6', 
                 'org': '#77e637', 
                 'special': '#ffc61c', 
                 'author': '#1da6f5'}
            
    plt.figure(figsize=(13, 13))  
    nx.draw(g, node_color = [find_color(id2Type, color_map, special_vertex, v) for v in g])

**Legend for visualizations:**
* Red - publications
* Purple - venues
* Blue - authors
* Green - organizations
* Yellow - special vertex (vertex that the query started from)

Given an org ID, which authors work for that org. - Martin

In [None]:
author_org = (_graph
             .filterEdges('Type = "worksFor"')
             .find('(a)-[e]->(b)')
             .filter("a.Type = 'author'"))

In [None]:
lim_print(author_org, 10)

INFO:py4j.java_gateway:Received command  on object id 
INFO:py4j.java_gateway:Closing down callback connection


a,e,b
"List(53f32d2cdabfae9a84499372, author)","List(53f32d2cdabfae9a84499372, worksFor, 5f71b2811c455f439fe3c5a5)","List(5f71b2811c455f439fe3c5a5, org)"
"List(54857187dabfae8a11fb29ae, author)","List(54857187dabfae8a11fb29ae, worksFor, 5f71b2811c455f439fe3c5a5)","List(5f71b2811c455f439fe3c5a5, org)"
"List(53f46a91dabfaedd74e7a89b, author)","List(53f46a91dabfaedd74e7a89b, worksFor, 5f71b2811c455f439fe3c5a5)","List(5f71b2811c455f439fe3c5a5, org)"
"List(53f43842dabfaedce5540585, author)","List(53f43842dabfaedce5540585, worksFor, 5f71b2811c455f439fe3c5a5)","List(5f71b2811c455f439fe3c5a5, org)"
"List(53f46409dabfaee2a1da57b9, author)","List(53f46409dabfaee2a1da57b9, worksFor, 5f71b2811c455f439fe3c5a5)","List(5f71b2811c455f439fe3c5a5, org)"
"List(53f431e3dabfaee2a1cb630b, author)","List(53f431e3dabfaee2a1cb630b, worksFor, 5f71b2811c455f439fe3c5a5)","List(5f71b2811c455f439fe3c5a5, org)"
"List(53f43315dabfaeb1a7bd2a13, author)","List(53f43315dabfaeb1a7bd2a13, worksFor, 5f71b2811c455f439fe3c5a5)","List(5f71b2811c455f439fe3c5a5, org)"
"List(53f42e74dabfaedf435248b2, author)","List(53f42e74dabfaedf435248b2, worksFor, 5f71b2811c455f439fe3c5a5)","List(5f71b2811c455f439fe3c5a5, org)"
"List(56029d9845cedb3395ff8e95, author)","List(56029d9845cedb3395ff8e95, worksFor, 5f71b2811c455f439fe3c5a5)","List(5f71b2811c455f439fe3c5a5, org)"
"List(53f47d54dabfaee43ed4ccff, author)","List(53f47d54dabfaee43ed4ccff, worksFor, 5f71b2811c455f439fe3c5a5)","List(5f71b2811c455f439fe3c5a5, org)"


Given a publication ID, what organizations have the authors of this publication worked for. - Peep

In [None]:
net = (_graph
       .filterEdges('Type = "hosts" | Type = "isAuthorOf"')
       #.find('(a)-[e1]->(b); (c)-[e2]->(b)')
       #.filter('a.id == "542a67e9dabfae646d55d787" and c.id != a.id')) # Put the author ID here
      )

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
[0;32m<command-4160452806164338>[0m in [0;36m<module>[0;34m[0m
[0;32m----> 1[0;31m net = (_graph
[0m[1;32m      2[0m        [0;34m.[0m[0mfilterEdges[0m[0;34m([0m[0;34m'Type = "hosts" | Type = "isAuthorOf"'[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m      3[0m        [0;31m#.find('(a)-[e1]->(b); (c)-[e2]->(b)')[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[1;32m      4[0m        [0;31m#.filter('a.id == "542a67e9dabfae646d55d787" and c.id != a.id')) # Put the author ID here[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[1;32m      5[0m       )

[0;32m/local_disk0/spark-a397fe1e-d531-46cb-a45c-d11048cf569f/userFiles-d2d0bf1c-e6e9-4dcc-9269-5ba236be23e3/addedFile797076123881782270180adf294_e3f9_420a_8cfd_49bd87fe6264_graphframes_0_8_2_spark3_2_s_2_12_2e19d-cc0e1.jar/graphframes/graphframe.py[0m i

Given a Venue ID, find all authors who have been hosted by that venue. - Martin

In [None]:
hostedAuthor = (_graph.find('(a)-[e1]->(b); (c)-[e2]->(b)')
                .select("a", "b", "c")
                .where("a.id != c.id")
                .where("a.Type = 'venue'")
                .filter("c.Type = 'author'"))

In [None]:
lim_print(hostedAuthor, 10)

INFO:py4j.java_gateway:Received command  on object id 
INFO:py4j.java_gateway:Closing down callback connection


a,b,c
"List(53a7310820f7420be8d1bc69, venue)","List(53e99792b7602d9701f5b1ba, publication)","List(53f42c98dabfaeb22f3fc92d, author)"
"List(53a7310820f7420be8d1bc69, venue)","List(53e99792b7602d9701f5b1ba, publication)","List(53f64e31dabfae6a71b6029f, author)"
"List(53a7310820f7420be8d1bc69, venue)","List(53e99792b7602d9701f5b1ba, publication)","List(53f47307dabfaee02adc6671, author)"
"List(53a72dfa20f7420be8c7a2e9, venue)","List(53e99796b7602d9701f5c118, publication)","List(53f436aedabfaedd74da3090, author)"
"List(53a72dfa20f7420be8c7a2e9, venue)","List(53e99796b7602d9701f5c118, publication)","List(53f43588dabfaee02acc2bf2, author)"
"List(53a72dfa20f7420be8c7a2e9, venue)","List(53e99796b7602d9701f5c118, publication)","List(53f42dd3dabfaeb22f40e049, author)"
"List(53a72dfa20f7420be8c7a2e9, venue)","List(53e99796b7602d9701f5c118, publication)","List(53f44b34dabfaec09f1dc245, author)"
"List(54824c66582fc50b5e008eb0, venue)","List(53e99796b7602d9701f61877, publication)","List(5447fe7fdabfae87b7dbe090, author)"
"List(54824c66582fc50b5e008eb0, venue)","List(53e99796b7602d9701f61877, publication)","List(53f45b89dabfaee1c0b408d5, author)"
"List(54824c66582fc50b5e008eb0, venue)","List(53e99796b7602d9701f61877, publication)","List(562055d045cedb3398260a20, author)"


### Analytics

PageRank - P

connected components - M

In [None]:
#Connected component membership of each node
sc.setCheckpointDir("/tmp/cc-checkpoint-dir") #Connected Components algorithm requires setting a Spark checkpoint directory to improve performance
cc = _graph.connectedComponents()

INFO:py4j.java_gateway:Received command  on object id 
INFO:py4j.java_gateway:Closing down callback connection
Out[273]: DataFrame[id: string, Type: string, component: bigint]

In [None]:
lim_print((cc.where("component != 0").orderBy(F.desc("component"))), 10)

id,Type,component
53f438cedabfaec09f194061,author,34359758087
53f45616dabfaec09f20571a,author,34359758087
5f71b2b11c455f439fe3d93f,org,34359758087
53f43275dabfaee02aca134f,author,34359758087
53907b3e20f770854f5e7c6e,venue,34359758087
53e999b4b7602d97021f9ebf,publication,34359758087
53e999f5b7602d970223c6e5,publication,25769832585
53f43575dabfaedf43577a1a,author,25769832585
5405947adabfae8faa5e4af7,author,25769832585
5390795020f770854f5b658f,venue,25769832585


bfs between two publications: returns a path from one node to another. To find out if a publication indirectly references another publication. -P

in-degrees of a publication. How many authors this publication has. -M

In [None]:
#inDegrees for the authors
#Given a publication ID, we can count the number of edges in
authorInDeg = (_graph
               ._edges
               .where("Type = 'isAuthorOf'")
               .groupBy("src", "dst")
               .count()
               .orderBy(F.desc("count")))

In [None]:
lim_print(authorInDeg, 10)

src,dst,count
5433d319dabfaebba582a12f,53e99967b7602d97021a6cc0,3
53f47214dabfaec22bb52474,53e9987db7602d97020b7f77,3
53f44978dabfaec22bad49a6,53e99a85b7602d97022f6eae,3
53f468ecdabfaeecd6a16d70,53e998dbb7602d9702114407,2
5408d43edabfae450f445750,53e9997eb7602d97021bcb6f,2
53f4d27bdabfaef0eff8123d,53e9989bb7602d97020d3f6e,2
5602514845cedb3395f6a356,53e99998b7602d97021dd412,2
53f438d7dabfaefedbae0141,53e9990db7602d970214c07e,2
53f432eddabfaee2a1cc29ae,53e99953b7602d970219466f,2
53f44adadabfaeee22a093cf,53e9997eb7602d97021bcb6f,2


In [None]:
inDeg = (_graph.inDegrees
        .orderBy(F.desc("inDegree")))

In [None]:
lim_print(inDeg, 10)

id,inDegree
5f71b5c41c455f439fe532ea,827
5f71b2811c455f439fe3c5cb,729
5f71b2861c455f439fe3c771,690
5f71b28b1c455f439fe3c989,630
5f71b2841c455f439fe3c67b,567
5f71b2831c455f439fe3c634,511
5f71b29a1c455f439fe3d023,459
5f71b2841c455f439fe3c6b7,454
5f71b2831c455f439fe3c633,445
5f71b5e01c455f439fe53f9a,443


out-degrees of a venue. How many publications has a venue hosted. -P

traingle count -M

In [None]:
#Compute number of triangles passing through each node
triangle_count = _graph.triangleCount()
lim_print((triangle_count.select("id", "count")), 10)

INFO:py4j.java_gateway:Received command  on object id 
INFO:py4j.java_gateway:Closing down callback connection


id,count
53e99792b7602d9701f5b1ba,1
53e99792b7602d9701f5b140,0
53e99792b7602d9701f5b2b3,0
53e99792b7602d9701f5af35,0
53e99792b7602d9701f5b19a,0
53e99784b7602d9701f3f5fe,0
53e99792b7602d9701f5b2bc,3
53e99792b7602d9701f5b119,0
53e99792b7602d9701f5b1e7,0
53e99792b7602d9701f5b0ed,0


triplets -P