# GraphGeeks - Knowledge Graph Construction with GraphFrames Demo

This is a demonstration of the key sourcery of GraphFrames for building large knowledge graphs. 

In [1]:
import findspark
findspark.init()

In [2]:
import pyspark.sql.functions as F
from pyspark import SparkConf
from pyspark.sql import DataFrame, SparkSession
from graphframes import GraphFrame

### Configure Paths for Data Loading

In [3]:
# Change me if you download a different stackexchange site
STACKEXCHANGE_SITE: str = "stats.meta.stackexchange.com"
BASE_PATH: str = f"python/graphframes/tutorials/data/{STACKEXCHANGE_SITE}"
NODE_PATH: str = f"{BASE_PATH}/Nodes.parquet"
EDGE_PATH: str = f"{BASE_PATH}/Edges.parquet"

### Setup our `SparkSession`

In [66]:
# Create SparkConf with memory settings
conf = (
    SparkConf()
    .setAppName("GraphGeeks Demo")
    .set("spark.driver.memory", "32g")
    .set("spark.executor.memory", "4g")
    .set("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12")
    .set("spark.checkpoint.dir", "/tmp/spark-checkpoint")
    .set("spark.sql.caseSensitive", True)
)


# Create SparkSession with SparkConf
spark = SparkSession.builder \
    .config(conf=conf) \
    .getOrCreate()

spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoint")

### Load the Stack Exchange Archive for `stats.meta.stackexchange.com`

In [67]:
# Load our nodes and edges... in GraphFrames we call nodes vertices
vertex_df: DataFrame = spark.read.parquet(NODE_PATH)
edge_df: DataFrame = spark.read.parquet(EDGE_PATH)

### Instantiate a GraphFrame

In [68]:
# Instantiate our GraphFrame
g: GraphFrame = GraphFrame(vertex_df, edge_df)

### How much data we got?

In [69]:
# Count the nodes and edges
print(f"Vertex count: {g.vertices.count():,}")
print(f"Edge count: {g.edges.count():,}")

Vertex count: 129,751
Edge count: 97,104


### What types of nodes do we have?

In [70]:
# What kind of nodes we do we have to work with?
node_counts = (
    vertex_df
    .select("id", F.col("Type").alias("Node Type"))
    .groupBy("Node Type")
    .count()
    .orderBy(F.col("count").desc())
    # Add a comma formatted column for display
    .withColumn("count", F.format_number(F.col("count"), 0))
)

# You can use .show() or .toPandas() for small tables
node_counts.toPandas()

Unnamed: 0,Node Type,count
0,Badge,43029
1,Vote,42593
2,User,37709
3,Answer,2978
4,Question,2025
5,PostLinks,1274
6,Tag,143


In [71]:
# What kind of edges do we have to work with?
edge_counts = (
    g.edges
    .select("src", "dst", F.col("relationship").alias("Edge Type"))
    .groupBy("Edge Type")
    .count()
    .orderBy(F.col("count").desc())
    # Add a comma formatted column for display
    .withColumn("count", F.format_number(F.col("count"), 0))
)
edge_counts.toPandas()

Unnamed: 0,Edge Type,count
0,Earns,43029
1,CastFor,40701
2,Tags,4427
3,Answers,2978
4,Posts,2767
5,Asks,1934
6,Links,1180
7,Duplicates,88


## A Battery of Graph Algorithms!

### PageRank

This week, I will add weighted PageRank - the `weight` column already gets used, it just can't exist at the start... :)

In [12]:
g2 = g.pageRank(resetProbability=0.15, maxIter=10)

g2.vertices.select("id", "Type", "pagerank").toPandas()

25/03/25 07:33:21 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Unnamed: 0,id,Type,pagerank
0,0e0f90e5-f19e-41b0-a51c-28d1ec143a28,User,0.555685
1,0e9226a0-39bd-4209-9227-8e3904a1b595,Vote,0.555685
2,be615286-8adb-4159-9372-ae4fa1054053,Vote,0.555685
3,1db69632-4e45-42f2-be93-9dec9c7e76bd,Badge,0.713129
4,58af9aca-66f4-4336-b0b4-77bb72b2710b,Badge,1.028017
...,...,...,...
129746,1ce0d944-7ff7-4ff9-a048-304ca5a2eda8,Vote,0.555685
129747,b464f70a-f4ba-4bd9-b744-ccf532cfa6e4,Badge,0.608166
129748,f8b45f59-9498-4f29-8f0e-0dd75595254d,Vote,0.555685
129749,16d081a4-fba9-41e3-b10d-fc4c3b4fec35,Answer,1.028017


### Connected Components - Drives Many Entity Resolution Processes!

In [42]:
component_node_df = g.connectedComponents().select("id", "component")

25/03/25 08:10:14 WARN CacheManager: Asked to cache already cached data.
                                                                                

### How many connected components of what size?

In [43]:
component_node_df.groupBy("component").count().orderBy(F.col("count").desc()).limit(20).toPandas()

                                                                                

Unnamed: 0,component,count
0,1,56442
1,17179869740,18
2,34359738458,18
3,103079215642,18
4,111669150065,17
5,77,17
6,420906795038,17
7,8589934811,16
8,480,16
9,17179869561,16


### And Strongly Connected Components?

In [24]:
strong_component_node_df = g.stronglyConnectedComponents(maxIter=3).select("id", "component")

25/03/25 08:00:51 WARN BlockManager: Block rdd_5740_1 already exists on this machine; not re-adding it
25/03/25 08:00:51 WARN BlockManager: Block rdd_5740_3 already exists on this machine; not re-adding it
25/03/25 08:00:53 WARN BlockManager: Block rdd_5772_2 already exists on this machine; not re-adding it
25/03/25 08:01:15 WARN BlockManager: Block rdd_6229_1 already exists on this machine; not re-adding it
25/03/25 08:01:16 WARN BlockManager: Block rdd_6245_5 already exists on this machine; not re-adding it
25/03/25 08:01:17 WARN BlockManager: Block rdd_6261_4 already exists on this machine; not re-adding it
                                                                                

In [44]:
strong_component_node_df.groupBy("component").count().orderBy(F.col("count").desc()).limit(20).toPandas()

                                                                                

Unnamed: 0,component,count
0,42949673589,19
1,8589934622,14
2,146028888392,5
3,274877907107,4
4,60129542384,4
5,249108103608,4
6,68719476888,3
7,154618822779,3
8,111669149904,3
9,446676599104,3


### Network Motif Finding

A network motif is a pattern in a network that is over or under abundant compared to a random graph model of your network :)

In [34]:
# G5: Divergent Triangles
paths = g.find("(a)-[e1]->(b); (a)-[e2]->(c); (c)-[e3]->(b)")

paths.show()



+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                   a|                  e1|                   b|                  e2|                   c|                  e3|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|{61ca9985-f61d-49...|{61ca9985-f61d-49...|{e72170f9-8c4d-4a...|{61ca9985-f61d-49...|{07138d15-965f-4f...|{07138d15-965f-4f...|
|{0ca0fcd6-5658-47...|{0ca0fcd6-5658-47...|{e72170f9-8c4d-4a...|{0ca0fcd6-5658-47...|{07138d15-965f-4f...|{07138d15-965f-4f...|
|{510b3597-50fa-44...|{510b3597-50fa-44...|{6a5a8f11-011d-45...|{510b3597-50fa-44...|{077183d3-c8a7-42...|{077183d3-c8a7-42...|
|{73468dd7-c49c-4c...|{73468dd7-c49c-4c...|{5efe239d-6add-4f...|{73468dd7-c49c-4c...|{0b24dc87-b9ef-4a...|{0b24dc87-b9ef-4a...|
|{0ca0fcd6-5658-47...|{0ca0fcd6-5658-47...|{1cc99853-02ab-4f...|{0ca0fcd6-5658-47...|{0ba2d12b-40fa-4e..

                                                                                

In [35]:
graphlet_type_df = paths.select(
    F.col("a.Type").alias("A_Type"),
    F.col("e1.relationship").alias("(a)-[e1]->(b)"),
    F.col("b.Type").alias("B_Type"),
    F.col("e2.relationship").alias("(a)-[e2]->(c)"),
    F.col("c.Type").alias("C_Type"),
    F.col("e3.relationship").alias("(c)-[e3]->(b)"),
)

graphlet_count_df = (
    graphlet_type_df.groupby(
        "A_Type", "(a)-[e1]->(b)", "B_Type", "(a)-[e2]->(c)", "C_Type", "(c)-[e3]->(b)"
    )
    .count()
    .orderBy(F.col("count").desc())
    # Add a comma formatted column for display
    .withColumn("count", F.format_number(F.col("count"), 0))
)
graphlet_count_df.show()

+--------+-------------+--------+-------------+--------+-------------+-----+
|  A_Type|(a)-[e1]->(b)|  B_Type|(a)-[e2]->(c)|  C_Type|(c)-[e3]->(b)|count|
+--------+-------------+--------+-------------+--------+-------------+-----+
|     Tag|         Tags|Question|         Tags|Question|        Links|1,775|
|    User|         Asks|Question|        Posts|  Answer|      Answers|  274|
|Question|        Links|Question|        Links|Question|        Links|  236|
|     Tag|         Tags|Question|         Tags|Question|   Duplicates|  140|
|    User|         Asks|Question|         Asks|Question|        Links|  103|
|Question|        Links|Question|        Links|Question|   Duplicates|   14|
|Question|   Duplicates|Question|        Links|Question|        Links|   13|
|Question|        Links|Question|   Duplicates|Question|        Links|   12|
|    User|         Asks|Question|         Asks|Question|   Duplicates|    8|
|Question|   Duplicates|Question|        Links|Question|   Duplicates|    8|

### Motif Path Interpretation

1. `(Tag)-[Tags]->(Question B); (Tag)-[Tags]->(Question C); (Question C)-[Links]->(Question B)` - “A tag is used on a question, that tag is used on another question, and the two questions are linked.” It makes sense that questions sharing tags are often linked.

2. `(User)-[Asks]->(Question B); (User)-[Posts]->(Answer C); (Answer C)-[Answers]->(Question B)` - or “A user answers their own question.”

3. A triangle of linked questions.

4. `(Tag)-[Tags]->(Question B); (Tag)-[Tags]->(Question C); (Question B)-[Duplicates]->(Question C)` - or “A tag appears for a pair of duplicate answers.”

5. A user asks linked questions.

#### For More Motif Work - check out the [Network Motif Finding Tutorial](https://graphframes.github.io/graphframes/docs/_site/motif-tutorial.html)

### Filtering to get Subgraphs

In [37]:
from graphframes.examples import Graphs

g_friends = Graphs(spark).friends()

In [38]:
g1_friends = g_friends.filterVertices("age > 30").filterEdges("relationship = 'friend'").dropIsolatedVertices()

### A Complex Filter to Build a Subgraph

In [39]:
# Select subgraph based on edges "e" of type "follow"
# pointing from a younger user "a" to an older user "b"
paths = g_friends.find("(a)-[e]->(b)")\
  .filter("e.relationship = 'follow'")\
  .filter("a.age < b.age")

# "paths" contains vertex info. Extract the edges
e2 = paths.select("e.src", "e.dst", "e.relationship")

# In Spark 1.5+, the user may simplify this call
# val e2 = paths.select("e.*")
# Construct the subgraph
g2 = GraphFrame(g_friends.vertices, e2)

### Breadth First Search

In [40]:
# Search from "Esther" for users of age < 32
paths = g_friends.bfs("name = 'Esther'", "age < 32")
paths.show()

The history saving thread hit an unexpected error (OperationalError('attempt to write a readonly database')).History will not be written to the database.
+---------------+--------------+--------------+
|           from|            e0|            to|
+---------------+--------------+--------------+
|{e, Esther, 32}|{e, d, friend}|{d, David, 29}|
+---------------+--------------+--------------+



In [45]:
# Specify edge filters or max path lengths
g_friends.bfs("name = 'Esther'", "age < 32",\
  edgeFilter="relationship != 'friend'", maxPathLength=3).toPandas()

Unnamed: 0,from,e0,v1,e1,to
0,"(e, Esther, 32)","(e, f, follow)","(f, Fanny, 36)","(f, c, follow)","(c, Charlie, 30)"


### Label Propagation

In [53]:
result = g_friends.labelPropagation(maxIter=5)
result.select("id", "label").show()

25/03/25 08:19:07 WARN BlockManager: Block rdd_7856_0 already exists on this machine; not re-adding it


+---+-------------+
| id|        label|
+---+-------------+
|  f|1460288880640|
|  e|1382979469312|
|  d|1460288880640|
|  c|1382979469312|
|  b|1047972020224|
|  a|1382979469312|
+---+-------------+



### Triangle Counts

Counting the triangles a node participates in helps compute its local clustering coefficient - which measures how embedded it is in the surrounding network.

In [72]:
results = g.triangleCount()
results.select("id", "count").show()

25/03/25 08:34:30 WARN CacheManager: Asked to cache already cached data.
                                                                                9]]]

+--------------------+-----+
|                  id|count|
+--------------------+-----+
|1d9f8360-580e-457...|    3|
|ac563ff8-d787-484...|    1|
|e1ed7468-f6c1-424...|    0|
|713c6cd6-b3b7-4f1...|    3|
|39beb7dc-6c4b-4d6...|    0|
|8bcef202-e698-49c...|    2|
|ac110948-11c8-43b...|    0|
|a14b44eb-5a1c-412...|   11|
|72151c16-1aa8-452...|    0|
|2c23cf34-61ac-407...|    3|
|c5782a2b-04ad-497...|    0|
|4da74ba2-ad76-4cd...|    2|
|e9ce1087-db4c-48e...|    0|
|13010ce4-7e07-4fe...|    1|
|6d2269e4-3a21-490...|    0|
|c89382be-270d-4f8...|    2|
|6539d85d-d474-43d...|    6|
|5b400a95-2ce7-4d5...|    0|
|b4047440-0be5-406...|   22|
|cd361cc9-4281-455...|    0|
+--------------------+-----+
only showing top 20 rows



### Pregel in GraphFrames - Message passing via AggregateMessages

GraphFrames contains the highly scalable Pregel system, which is Bulk Synchronous Parallel (BSP). It passes messages from each and every node once in a cycle, before repeating this operation. Nodes can transmit messages and summarize their combined inputs to do so. This is a lot like the way Graph Neural Network (GNN) message passing works.

In [74]:
from graphframes.lib import AggregateMessages as AM
from graphframes.examples import Graphs
from pyspark.sql.functions import sum as sqlsum


g = Graphs(spark).friends()  # Get example graph

# For each user, sum the ages of the adjacent users
msgToSrc = AM.dst["age"]
msgToDst = AM.src["age"]
agg = g.aggregateMessages(
    sqlsum(AM.msg).alias("summedAges"),
    sendToSrc=msgToSrc,
    sendToDst=msgToDst)
agg.show()

+---+----------+
| id|summedAges|
+---+----------+
|  f|        62|
|  e|        65|
|  d|        66|
|  c|       108|
|  b|        94|
|  a|        65|
+---+----------+

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
|  b|  c|      follow|
|  c|  b|      follow|
|  f|  c|      follow|
|  e|  f|      follow|
|  e|  d|      friend|
|  d|  a|      friend|
+---+---+------------+



## Mercury Graph

Another library that you can use to build networks in PySpark. Contains a Node2Vec, Louvain and Spectral Clustering algorithms... I really want to add `struc2vec` as well.

In [55]:
from pyspark.ml.functions import vector_to_array
from mercury.graph.core import Graph  # type: ignore
from mercury.graph.embeddings import SparkNode2Vec  # type: ignore


DIMENSIONS = 16

In [56]:
# Compute Node2Vec embeddings - Build the Graph.
gm = Graph(
    data=edge_df.withColumn("weight", F.lit(1.0)),
    nodes=vertex_df,
)

n2v = SparkNode2Vec(dimension=DIMENSIONS, num_paths_per_node=4, num_epochs=3, w2v_min_count=1)
n2v.fit(gm)

# Convert the VectorUDF to an ArrayType of DoubleTypes
signal_df: DataFrame = n2v.embedding().select(
    F.col("word").alias("id"), vector_to_array(F.col("vector")).alias("node2vec")
)

                                                                                