In [60]:
sc.addPyFile("/home/bigbenchung/spark-3.3.2-bin-hadoop3/jars/graphframes-0.8.2-spark3.0-s_2.12.jar")

24/05/19 22:08:26 WARN SparkContext: The path /home/bigbenchung/spark-3.3.2-bin-hadoop3/jars/graphframes-0.8.2-spark3.0-s_2.12.jar has been added already. Overwriting of added paths is not supported in the current version.


In [61]:
from graphframes import *
from pyspark.sql.functions import *

In [62]:
# Vertics DataFrame
v = spark.createDataFrame([
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 37),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 38),
  ("g", "Gabby", 60)
], ["id", "name", "age"])

# Edges DataFrame
e = spark.createDataFrame([
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend"),
  ("g", "e", "follow")
], ["src", "dst", "relationship"])

# Create a GraphFrame
g = GraphFrame(v, e)

g.vertices.show()
g.edges.show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  a|  Alice| 34|
|  b|    Bob| 36|
|  c|Charlie| 37|
|  d|  David| 29|
|  e| Esther| 32|
|  f|  Fanny| 38|
|  g|  Gabby| 60|
+---+-------+---+

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
|  b|  c|      follow|
|  c|  b|      follow|
|  f|  c|      follow|
|  e|  f|      follow|
|  e|  d|      friend|
|  d|  a|      friend|
|  a|  e|      friend|
|  g|  e|      follow|
+---+---+------------+



In [63]:
# Starting vertex is 'a'
layers = [g.vertices.select('id').where("id = 'a'")]
visited =  layers[0]

while layers[-1].count() > 0:
    # From the current layer, get all the one-hop neighbors
    d1 = layers[-1].join(g.edges, layers[-1]['id'] == g.edges['src'])
    # Rename the column as 'id', and remove visited verices and duplicates
    d2 = d1.select(d1['dst'].alias('id')) \
           .subtract(visited).distinct().cache()
    layers += [d2]
    visited = visited.union(layers[-1]).cache()

                                                                                

In [64]:
two_hop_neighbours = layers[1].union(layers[2]).cache()

In [65]:
dst_a = g.edges.filter(f"dst = 'a'")
dst_a_list = dst_a.select("src").rdd.flatMap(lambda x: x).collect()

two_hop_neighbours.rdd.filter(lambda row: row["id"] not in dst_a_list).collect()

                                                                                

[Row(id='e'), Row(id='b'), Row(id='f'), Row(id='c')]

In [66]:
g.edges.filter("dst = 'c' and relationship = 'follow'").show()

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  b|  c|      follow|
|  f|  c|      follow|
+---+---+------------+



In [67]:
g.edges.filter("relationship = 'follow'").groupBy('dst').count().filter("count >= 2").show()

# myInDegrees = g.edges.groupBy('dst').count()\
#                .withColumnRenamed('dst', 'id').withColumnRenamed('count', 'inDegree')

+---+-----+
|dst|count|
+---+-----+
|  c|    2|
+---+-----+



In [68]:
oneway = g.find("(a)-[]->(b); (b)-[]->(c); !(c)-[]->(a)")

In [69]:
oneway.show()

+----------------+----------------+----------------+
|               a|               b|               c|
+----------------+----------------+----------------+
|  {a, Alice, 34}| {e, Esther, 32}|  {f, Fanny, 38}|
|{c, Charlie, 37}|    {b, Bob, 36}|{c, Charlie, 37}|
|  {g, Gabby, 60}| {e, Esther, 32}|  {f, Fanny, 38}|
|  {d, David, 29}|  {a, Alice, 34}|    {b, Bob, 36}|
| {e, Esther, 32}|  {f, Fanny, 38}|{c, Charlie, 37}|
|  {f, Fanny, 38}|{c, Charlie, 37}|    {b, Bob, 36}|
|    {b, Bob, 36}|{c, Charlie, 37}|    {b, Bob, 36}|
|  {a, Alice, 34}|    {b, Bob, 36}|{c, Charlie, 37}|
|  {g, Gabby, 60}| {e, Esther, 32}|  {d, David, 29}|
+----------------+----------------+----------------+



In [70]:
# g.vertices and g.edges are just DataFrames
# You can use any DataFrame API on them

g.edges.filter("src = 'a'").show()

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
|  a|  e|      friend|
+---+---+------------+



In [71]:
g.edges.filter("src = 'a'").count()

2

In [72]:
# Count the number of followers of c.
# This queries the edge DataFrame.
print(g.edges.filter("relationship = 'follow' and dst = 'c'").count())

2


In [73]:
# A GraphFrame has additional attributes

g.outDegrees.show()

+---+---------+
| id|outDegree|
+---+---------+
|  a|        2|
|  c|        1|
|  b|        1|
|  f|        1|
|  e|        2|
|  d|        1|
|  g|        1|
+---+---------+



In [74]:
g.inDegrees.show()

+---+--------+
| id|inDegree|
+---+--------+
|  b|       2|
|  c|       2|
|  f|       1|
|  d|       1|
|  a|       1|
|  e|       2|
+---+--------+



In [75]:
g.inDegrees.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[dst#13695], functions=[count(1)])
   +- Exchange hashpartitioning(dst#13695, 200), ENSURE_REQUIREMENTS, [plan_id=18741]
      +- HashAggregate(keys=[dst#13695], functions=[partial_count(1)])
         +- Project [dst#13695]
            +- Scan ExistingRDD[src#13694,dst#13695,relationship#13696]




In [76]:
myInDegrees = g.edges.groupBy('dst').count()\
               .withColumnRenamed('dst', 'id').withColumnRenamed('count', 'inDegree')
myInDegrees.show()

+---+--------+
| id|inDegree|
+---+--------+
|  b|       2|
|  c|       2|
|  f|       1|
|  d|       1|
|  a|       1|
|  e|       2|
+---+--------+



In [77]:
myInDegrees.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[dst#13695], functions=[count(1)])
   +- Exchange hashpartitioning(dst#13695, 200), ENSURE_REQUIREMENTS, [plan_id=18800]
      +- HashAggregate(keys=[dst#13695], functions=[partial_count(1)])
         +- Project [dst#13695]
            +- Scan ExistingRDD[src#13694,dst#13695,relationship#13696]




In [78]:
print(g.inDegrees.storageLevel)

Serialized 1x Replicated


In [79]:
g.inDegrees.cache()

DataFrame[id: string, inDegree: int]

In [80]:
print(g.inDegrees.storageLevel)

Disk Memory Deserialized 1x Replicated


In [81]:
print(g.vertices.storageLevel)

Serialized 1x Replicated


In [82]:
g.cache()

GraphFrame(v:[id: string, name: string ... 1 more field], e:[src: string, dst: string ... 1 more field])

In [83]:
print(g.vertices.storageLevel)
print(g.edges.storageLevel)

Disk Memory Deserialized 1x Replicated
Disk Memory Deserialized 1x Replicated


In [84]:
# A triplet view of the graph

g.triplets.show()

+----------------+--------------+----------------+
|             src|          edge|             dst|
+----------------+--------------+----------------+
|  {d, David, 29}|{d, a, friend}|  {a, Alice, 34}|
|{c, Charlie, 37}|{c, b, follow}|    {b, Bob, 36}|
|  {a, Alice, 34}|{a, b, friend}|    {b, Bob, 36}|
|  {f, Fanny, 38}|{f, c, follow}|{c, Charlie, 37}|
|    {b, Bob, 36}|{b, c, follow}|{c, Charlie, 37}|
| {e, Esther, 32}|{e, d, friend}|  {d, David, 29}|
|  {g, Gabby, 60}|{g, e, follow}| {e, Esther, 32}|
|  {a, Alice, 34}|{a, e, friend}| {e, Esther, 32}|
| {e, Esther, 32}|{e, f, follow}|  {f, Fanny, 38}|
+----------------+--------------+----------------+



In [85]:
g.triplets.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [src#14565, edge#14563, dst#14567]
   +- BroadcastHashJoin [edge#14563.dst], [dst#14567.id], Inner, BuildRight, false
      :- BroadcastHashJoin [edge#14563.src], [src#14565.id], Inner, BuildRight, false
      :  :- Project [struct(src, src#13694, dst, dst#13695, relationship, relationship#13696) AS edge#14563]
      :  :  +- Filter (isnotnull(src#13694) AND isnotnull(dst#13695))
      :  :     +- InMemoryTableScan [dst#13695, relationship#13696, src#13694], [isnotnull(src#13694), isnotnull(dst#13695)]
      :  :           +- InMemoryRelation [src#13694, dst#13695, relationship#13696], StorageLevel(disk, memory, deserialized, 1 replicas)
      :  :                 +- *(1) Scan ExistingRDD[src#13694,dst#13695,relationship#13696]
      :  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct<id:string,name:string,age:bigint>, false].id),false), [plan_id=19150]
      :     +- Project [struct(id, id#13688, n

### Motif Finding

In [86]:
# Search for pairs of vertices with edges in both directions between them.
motifs = g.find("(a)-[]->(b); (b)-[]->(a)").filter('a.id < b.id')
motifs.show()

+------------+----------------+
|           a|               b|
+------------+----------------+
|{b, Bob, 36}|{c, Charlie, 37}|
+------------+----------------+



In [87]:
# Find triangles

triangles = g.find("(a)-[]->(b); (b)-[]->(c); (c)-[]->(a)")
triangles = triangles.filter("a.id < b.id AND a.id < c.id")
triangles.show()

+--------------+---------------+--------------+
|             a|              b|             c|
+--------------+---------------+--------------+
|{a, Alice, 34}|{e, Esther, 32}|{d, David, 29}|
+--------------+---------------+--------------+



In [88]:
triangles.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [a#15309, b#15311, c#15334]
   +- BroadcastHashJoin [c#15334.id, a#15309.id], [_extract_src#16026, _extract_dst#16027], Inner, BuildRight, false
      :- Project [a#15309, b#15311, c#15334]
      :  +- BroadcastHashJoin [_extract_dst#16029], [c#15334.id], Inner, BuildRight, (a#15309.id < c#15334.id), false
      :     :- Project [a#15309, b#15311, _extract_dst#16029]
      :     :  +- BroadcastHashJoin [b#15311.id], [_extract_src#16030], Inner, BuildRight, false
      :     :     :- Project [a#15309, b#15311]
      :     :     :  +- BroadcastHashJoin [_extract_dst#16032], [b#15311.id], Inner, BuildRight, (a#15309.id < b#15311.id), false
      :     :     :     :- Project [_extract_dst#16032, a#15309]
      :     :     :     :  +- BroadcastHashJoin [_extract_src#16033], [a#15309.id], Inner, BuildLeft, false
      :     :     :     :     :- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true]),false),

In [89]:
# Negation
oneway = g.find("(a)-[]->(b); !(b)-[]->(a)")
oneway.show()

+---------------+----------------+
|              a|               b|
+---------------+----------------+
| {a, Alice, 34}| {e, Esther, 32}|
|{e, Esther, 32}|  {d, David, 29}|
| {a, Alice, 34}|    {b, Bob, 36}|
| {g, Gabby, 60}| {e, Esther, 32}|
|{e, Esther, 32}|  {f, Fanny, 38}|
| {f, Fanny, 38}|{c, Charlie, 37}|
| {d, David, 29}|  {a, Alice, 34}|
+---------------+----------------+



In [90]:
oneway.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#16126, b#16128], functions=[])
   +- Exchange hashpartitioning(a#16126, b#16128, 200), ENSURE_REQUIREMENTS, [plan_id=20780]
      +- HashAggregate(keys=[a#16126, b#16128], functions=[])
         +- SortMergeJoin [coalesce(a#16126, [,,0]), isnull(a#16126), coalesce(b#16128, [,,0]), isnull(b#16128)], [coalesce(a#16175, [,,0]), isnull(a#16175), coalesce(b#16176, [,,0]), isnull(b#16176)], LeftAnti
            :- Sort [coalesce(a#16126, [,,0]) ASC NULLS FIRST, isnull(a#16126) ASC NULLS FIRST, coalesce(b#16128, [,,0]) ASC NULLS FIRST, isnull(b#16128) ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(coalesce(a#16126, [,,0]), isnull(a#16126), coalesce(b#16128, [,,0]), isnull(b#16128), 200), ENSURE_REQUIREMENTS, [plan_id=20773]
            :     +- Project [a#16126, b#16128]
            :        +- BroadcastHashJoin [_extract_dst#16934], [b#16128.id], Inner, BuildRight, false
            :    

In [91]:
# Negation
oneway = g.find("(a)-[]->(b); (b)-[]->(c); !(c)-[]->(a)")
oneway.show()

+----------------+----------------+----------------+
|               a|               b|               c|
+----------------+----------------+----------------+
|  {a, Alice, 34}| {e, Esther, 32}|  {f, Fanny, 38}|
|{c, Charlie, 37}|    {b, Bob, 36}|{c, Charlie, 37}|
|  {g, Gabby, 60}| {e, Esther, 32}|  {f, Fanny, 38}|
|  {d, David, 29}|  {a, Alice, 34}|    {b, Bob, 36}|
| {e, Esther, 32}|  {f, Fanny, 38}|{c, Charlie, 37}|
|  {f, Fanny, 38}|{c, Charlie, 37}|    {b, Bob, 36}|
|    {b, Bob, 36}|{c, Charlie, 37}|    {b, Bob, 36}|
|  {a, Alice, 34}|    {b, Bob, 36}|{c, Charlie, 37}|
|  {g, Gabby, 60}| {e, Esther, 32}|  {d, David, 29}|
+----------------+----------------+----------------+



In [92]:
# Find vertices without incoming edges:
single = g.find("!()-[]->(a)")
single.show()

+--------------+
|             a|
+--------------+
|{g, Gabby, 60}|
+--------------+



In [93]:
# More meaningful queries can be expressed by applying filters.
# Question: where is this filter applied?

g.find("(a)-[e]->(b); (b)-[]->(a)").filter("b.age > 36").show()

+------------+--------------+----------------+
|           a|             e|               b|
+------------+--------------+----------------+
|{b, Bob, 36}|{b, c, follow}|{c, Charlie, 37}|
+------------+--------------+----------------+



In [94]:
g.find("(a)-[]->(b); (b)-[]->(a)").filter("b.age > 36").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [a#18595, b#18597]
   +- BroadcastHashJoin [b#18597.id, a#18595.id], [_extract_src#18758, _extract_dst#18759], Inner, BuildRight, false
      :- Project [a#18595, b#18597]
      :  +- BroadcastHashJoin [_extract_dst#18761], [b#18597.id], Inner, BuildRight, false
      :     :- Project [_extract_dst#18761, a#18595]
      :     :  +- BroadcastHashJoin [_extract_src#18762], [a#18595.id], Inner, BuildLeft, false
      :     :     :- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true]),false), [plan_id=22520]
      :     :     :  +- Project [dst#13695 AS _extract_dst#18761, src#13694 AS _extract_src#18762]
      :     :     :     +- Filter (isnotnull(src#13694) AND isnotnull(dst#13695))
      :     :     :        +- InMemoryTableScan [dst#13695, src#13694], [isnotnull(src#13694), isnotnull(dst#13695)]
      :     :     :              +- InMemoryRelation [src#13694, dst#13695, relationship#13696], Storag

In [95]:
# Find chains of 4 vertices such that at least 2 of the 3 edges are "friend" relationships.
# The when function is similar to the CASE WHEN in SQL

chain4 = g.find("(a)-[e1]->(b); (b)-[e2]->(c); (c)-[e3]->(d)").where('a!=d AND a!=c AND b!=d')
chain4.show()

friendTo1 = lambda e: when(e['relationship'] == 'friend', 1).otherwise(0)

chain4.select('*',friendTo1(chain4['e1']).alias('f1'), \
                  friendTo1(chain4['e2']).alias('f2'), \
                  friendTo1(chain4['e3']).alias('f3')) \
      .where('f1 + f2 + f3 >= 2').select('a', 'b', 'c', 'd').show()

+---------------+--------------+---------------+--------------+----------------+--------------+----------------+
|              a|            e1|              b|            e2|               c|            e3|               d|
+---------------+--------------+---------------+--------------+----------------+--------------+----------------+
|{e, Esther, 32}|{e, f, follow}| {f, Fanny, 38}|{f, c, follow}|{c, Charlie, 37}|{c, b, follow}|    {b, Bob, 36}|
|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}|  {a, Alice, 34}|{a, b, friend}|    {b, Bob, 36}|
| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, e, friend}| {e, Esther, 32}|{e, f, follow}|  {f, Fanny, 38}|
| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, b, friend}|    {b, Bob, 36}|{b, c, follow}|{c, Charlie, 37}|
| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|{e, f, follow}|  {f, Fanny, 38}|{f, c, follow}|{c, Charlie, 37}|
| {g, Gabby, 60}|{g, e, follow}|{e, Esther, 32}|{e, d, friend}|  {d, David, 29}|{d, a, friend}| 

In [96]:
# Find all people who follow Charlie
g.find("(a)-[e]->(b)").filter("e.relationship = 'follow' AND b.name = 'Charlie'").select('a.name').show()

+-----+
| name|
+-----+
|  Bob|
|Fanny|
+-----+



In [97]:
# Find Alice's two-hop neighbors' names, regardless of the edge type
g.find('(a)-[]->(b); (b)-[]->(c)').filter('a.name="Alice"').select('c.name').show()

+-------+
|   name|
+-------+
|Charlie|
|  David|
|  Fanny|
+-------+



In [98]:
# Alice's two-hop neighbors excluding those who have an edge back to Alice.
g.find('(a)-[]->(b); (b)-[]->(c); !(c)-[]->(a)').filter('a.name="Alice"').select('c.name').show()

+-------+
|   name|
+-------+
|  Fanny|
|Charlie|
+-------+



In [99]:
# all people who are being followed by at least 2 people
g.find('()-[e]->(a)').filter('e.relationship="follow"').groupBy('a').count().filter('count>=2').select('a.name').show()

+-------+
|   name|
+-------+
|Charlie|
+-------+



### Subgraphs

In [100]:
# Select subgraph of users older than 30, and relationships of type "friend".
# Drop isolated vertices (users) which are not contained in any edges (relationships).

g1 = g.filterVertices("age > 30").filterEdges("relationship = 'friend'") \
      .dropIsolatedVertices()

g1.vertices.show()
g1.edges.show()

+---+------+---+
| id|  name|age|
+---+------+---+
|  a| Alice| 34|
|  b|   Bob| 36|
|  e|Esther| 32|
+---+------+---+

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
|  a|  e|      friend|
+---+---+------------+



In [101]:
# Select subgraph based on edges "e" of type "follow"
# pointing from a younger user "a" to an older user "b".

paths = g.find("(a)-[e]->(b)")\
  .filter("e.relationship = 'follow'")\
  .filter("a.age < b.age")

paths.show()
# "paths" contains vertex info. Extract the edges.

e2 = paths.select("e.*")
e2.show()

# Construct the subgraph
g2 = GraphFrame(g.vertices, e2).dropIsolatedVertices()

g2.vertices.show()
g2.edges.show()

+---------------+--------------+----------------+
|              a|             e|               b|
+---------------+--------------+----------------+
|   {b, Bob, 36}|{b, c, follow}|{c, Charlie, 37}|
|{e, Esther, 32}|{e, f, follow}|  {f, Fanny, 38}|
+---------------+--------------+----------------+

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  b|  c|      follow|
|  e|  f|      follow|
+---+---+------------+

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  b|    Bob| 36|
|  c|Charlie| 37|
|  e| Esther| 32|
|  f|  Fanny| 38|
+---+-------+---+

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  b|  c|      follow|
|  e|  f|      follow|
+---+---+------------+



### BFS

In [102]:
# Starting vertex is 'a'
layers = [g.vertices.select('id').where("id = 'a'")]
visited =  layers[0]

while layers[-1].count() > 0:
    # From the current layer, get all the one-hop neighbors
    d1 = layers[-1].join(g.edges, layers[-1]['id'] == g.edges['src'])
    # Rename the column as 'id', and remove visited verices and duplicates
    d2 = d1.select(d1['dst'].alias('id')) \
           .subtract(visited).distinct().cache()
    layers += [d2]
    visited = visited.union(layers[-1]).cache()

24/05/19 22:11:33 WARN CacheManager: Asked to cache already cached data.
24/05/19 22:11:33 WARN CacheManager: Asked to cache already cached data.


                                                                                

24/05/19 22:11:34 WARN CacheManager: Asked to cache already cached data.
24/05/19 22:11:34 WARN CacheManager: Asked to cache already cached data.


                                                                                

24/05/19 22:11:35 WARN CacheManager: Asked to cache already cached data.
24/05/19 22:11:35 WARN CacheManager: Asked to cache already cached data.


                                                                                

In [103]:
layers[0].show()

+---+
| id|
+---+
|  a|
+---+



In [104]:
layers[1].show()

+---+
| id|
+---+
|  e|
|  b|
+---+



In [105]:
layers[2].show()

+---+
| id|
+---+
|  f|
|  d|
|  c|
+---+



In [106]:
layers[3].show()

+---+
| id|
+---+
+---+



In [107]:
# GraphFrames provides own BFS:

paths = g.bfs("id = 'a'", "id='f'")
paths.show()

+--------------+--------------+---------------+--------------+--------------+
|          from|            e0|             v1|            e1|            to|
+--------------+--------------+---------------+--------------+--------------+
|{a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|{e, f, follow}|{f, Fanny, 38}|
+--------------+--------------+---------------+--------------+--------------+



### List Ranking

In [108]:
# -1 denotes end of list
data = [(0, 5), (1, 0), (3, 4), (4, 6), (5, -1), (6,1)]
e = spark.createDataFrame(data, ['src', 'dst'])
v = e.select(col('src').alias('id'), when(e.dst == -1, 0).otherwise(1).alias('d'))
v1 = spark.createDataFrame([(-1, 0)], ['id', 'd'])
v = v.union(v1)
v.show()
e.show()

+---+---+
| id|  d|
+---+---+
|  0|  1|
|  1|  1|
|  3|  1|
|  4|  1|
|  5|  0|
|  6|  1|
| -1|  0|
+---+---+

+---+---+
|src|dst|
+---+---+
|  0|  5|
|  1|  0|
|  3|  4|
|  4|  6|
|  5| -1|
|  6|  1|
+---+---+



In [109]:
while e.filter('dst != -1').count() > 0:
    g = GraphFrame(v, e)
    g.cache()
    v = g.triplets.select(col('src.id').alias('id'), 
                          (col('src.d') + col('dst.d')).alias('d')) \
         .union(v1)
    e = g.find('(a)-[]->(b); (b)-[]->(c)') \
         .select(col('a.id').alias('src'), col('c.id').alias('dst')) \
         .union(e.filter('dst = -1'))
    e.show()
v.show()



+---+---+
|src|dst|
+---+---+
|  0| -1|
|  6|  0|
|  4|  1|
|  1|  5|
|  3|  6|
|  5| -1|
+---+---+



                                                                                

+---+---+
|src|dst|
+---+---+
|  6| -1|
|  1| -1|
|  3|  0|
|  4|  5|
|  0| -1|
|  5| -1|
+---+---+



                                                                                ]

+---+---+
|src|dst|
+---+---+
|  3| -1|
|  4| -1|
|  6| -1|
|  1| -1|
|  0| -1|
|  5| -1|
+---+---+



                                                                                

+---+---+
| id|  d|
+---+---+
|  1|  2|
|  4|  4|
|  0|  1|
|  6|  3|
|  3|  5|
|  5|  0|
| -1|  0|
+---+---+



### Message passing via AggregateMessages

In [111]:
from pyspark.sql.functions import coalesce, col, lit, sum, when, min, max
from graphframes.lib import AggregateMessages as AM

# AggregateMessages has the following members: src, dst, edge, msg
# For each user, sum the ages of the adjacent users.
agg = g.aggregateMessages(
    sum(AM.msg).alias("summedAges"),
#     sendToSrc = AM.dst['age'],
    sendToDst = AM.src['age']
)
agg.show()

AnalysisException: No such struct field age in id, d

### The Pregel Model for Graph Computation

In [56]:
# Pagerank in the Pregel model 

from pyspark.sql.functions import coalesce, col, lit, sum, when, min
from graphframes.lib import Pregel

# Need to set up a directory for Pregel computation
sc.setCheckpointDir("checkpoint")

'''
Use builder pattern to describe the operations.
Call run() to start a run. It returns a DataFrame of vertices from the last iteration.

When a run starts, it expands the vertices DataFrame using column expressions 
defined by withVertexColumn(). Those additional vertex properties can be 
changed during Pregel iterations. In each Pregel iteration, there are three 
phases:

* Given each edge triplet, generate messages and specify target vertices to 
  send, described by sendMsgToDst() and sendMsgToSrc().
* Aggregate messages by target vertex IDs, described by aggMsgs().
* Update additional vertex properties based on aggregated messages and states 
  from previous iteration, described by withVertexColumn().
'''
v = g.outDegrees
g = GraphFrame(v,e)
ranks = g.pregel \
        .setMaxIter(5) \
        .withVertexColumn("rank", lit(1.0), \
            coalesce(Pregel.msg(), lit(0.0)) * lit(0.85) + lit(0.15)) \
        .sendMsgToDst(Pregel.src("rank") / Pregel.src("outDegree")) \
        .aggMsgs(sum(Pregel.msg())) \
        .run()
ranks.show()

# pyspark.sql.functions.coalesce(*cols): Returns the first column that is not null.
# Not to be confused with spark.sql.coalesce(numPartitions)


ModuleNotFoundError: No module named 'graphframes.lib.Pregel'

In [54]:
# BFS in the Pregel model

g = GraphFrame(v,e)

dist = g.pregel \
        .withVertexColumn('d', when(v['id'] == 'a', 0).otherwise(99999), \
            when(Pregel.msg() < col('d'), Pregel.msg()).otherwise(col('d'))) \
        .withVertexColumn('active', when(v['id'] == 'a', True).otherwise(False), \
            when(Pregel.msg() < col('d'), True).otherwise(False)) \
        .sendMsgToDst(when(Pregel.src('active'), Pregel.src('d') + 1)) \
        .aggMsgs(min(Pregel.msg())) \
        .run()
dist.show()


AttributeError: 'GraphFrame' object has no attribute 'pregel'