In [None]:
# Register the GraphFrames 0.8.2 jar (built for Spark 3.0 / Scala 2.12, per its file name)
# with the SparkContext.
# NOTE(review): the absolute path is specific to one cluster's filesystem — adjust it for your environment.
sc.addPyFile("/csproject/msbd5003/jars/graphframes-0.8.2-spark3.0-s_2.12.jar")

In [1]:
!pip install graphframes

Collecting graphframes
  Using cached graphframes-0.6-py2.py3-none-any.whl (18 kB)
Installing collected packages: graphframes
Successfully installed graphframes-0.6


In [2]:
from graphframes import *
from pyspark.sql.functions import *

In [3]:
# Vertex DataFrame: one row per person, keyed by a single-letter id.
v = spark.createDataFrame(
    [
        ("a", "Alice", 34),
        ("b", "Bob", 36),
        ("c", "Charlie", 37),
        ("d", "David", 29),
        ("e", "Esther", 32),
        ("f", "Fanny", 38),
        ("g", "Gabby", 60),
    ],
    schema=["id", "name", "age"],
)

# Edge DataFrame: directed edges (src -> dst) labelled with the relationship type.
e = spark.createDataFrame(
    [
        ("a", "b", "friend"),
        ("b", "c", "follow"),
        ("c", "b", "follow"),
        ("f", "c", "follow"),
        ("e", "f", "follow"),
        ("e", "d", "friend"),
        ("d", "a", "friend"),
        ("a", "e", "friend"),
        ("g", "e", "follow"),
    ],
    schema=["src", "dst", "relationship"],
)

# Combine both DataFrames into a GraphFrame and display its two tables.
g = GraphFrame(v, e)

for frame in (g.vertices, g.edges):
    frame.show()

                                                                                

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  a|  Alice| 34|
|  b|    Bob| 36|
|  c|Charlie| 37|
|  d|  David| 29|
|  e| Esther| 32|
|  f|  Fanny| 38|
|  g|  Gabby| 60|
+---+-------+---+

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
|  b|  c|      follow|
|  c|  b|      follow|
|  f|  c|      follow|
|  e|  f|      follow|
|  e|  d|      friend|
|  d|  a|      friend|
|  a|  e|      friend|
|  g|  e|      follow|
+---+---+------------+



In [5]:
# In-degree restricted to 'follow' edges: count the follow edges arriving at each
# dst, then expose dst under the name 'id' so it lines up with the vertex table.
follow_edges = g.edges.filter("relationship='follow'")
myInDegrees = follow_edges.groupBy('dst').count().withColumnRenamed('dst', 'id')
myInDegrees.show()

# Attach the vertex attributes and keep the names of people with at least two followers.
popular = myInDegrees.join(g.vertices, on='id').filter("count>=2")
popular.select("name").show()

+---+-----+
| id|count|
+---+-----+
|  c|    2|
|  b|    1|
|  f|    1|
|  e|    1|
+---+-----+

+-------+
|   name|
+-------+
|Charlie|
+-------+



In [8]:
# g.vertices and g.edges are ordinary DataFrames,
# so any DataFrame API can be applied to them.

# Show the edges whose source vertex is 'a'.
g.edges.filter("src = 'a'").show()

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
|  a|  e|      friend|
+---+---+------------+



In [None]:
# Number of edges leaving vertex 'a'.
g.edges.filter("src = 'a'").count()

2

In [25]:
# Count the followers of c, i.e. the 'follow' edges whose destination is 'c'.
# Only the edge DataFrame is queried; the vertex table is not needed.
followers_of_c = g.edges.filter("relationship = 'follow' and dst = 'c'")
print(followers_of_c.count())
followers_of_c.show()

2
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  b|  c|      follow|
|  f|  c|      follow|
+---+---+------------+



In [29]:
# Same filter as in the previous cell, left as the cell's final expression so the
# notebook renders the resulting DataFrame.
g.edges.filter("relationship = 'follow' and dst = 'c'")

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  b|  c|      follow|
|  f|  c|      follow|
+---+---+------------+



In [None]:
# Besides vertices and edges, a GraphFrame exposes derived DataFrames such as outDegrees.

# Out-degree (number of outgoing edges) per vertex id.
g.outDegrees.show()

+---+---------+
| id|outDegree|
+---+---------+
|  g|        1|
|  f|        1|
|  e|        2|
|  d|        1|
|  c|        1|
|  b|        1|
|  a|        2|
+---+---------+



In [38]:
# In-degree (number of incoming edges) per vertex id.
g.inDegrees.show()

+---+--------+
| id|inDegree|
+---+--------+
|  b|       2|
|  c|       2|
|  f|       1|
|  d|       1|
|  a|       1|
|  e|       2|
+---+--------+



In [None]:
# Vertices with in-degree 0 are absent from inDegrees: the plan is an aggregation
# keyed on the edge column dst, so only ids that occur as some edge's dst get a row.
g.inDegrees.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[dst#45], functions=[count(1)])
+- Exchange hashpartitioning(dst#45, 200), true, [id=#171]
   +- *(1) HashAggregate(keys=[dst#45], functions=[partial_count(1)])
      +- *(1) Project [dst#45]
         +- *(1) Scan ExistingRDD[src#44,dst#45,relationship#46]




In [72]:
# Find all people who are being followed by at least 2 people.
# Step 1: per-destination count of 'follow' edges, with dst renamed to id.
myInDegrees = (
    g.edges
    .filter("relationship='follow'")
    .groupBy('dst')
    .count()
    .withColumnRenamed('dst', 'id')
)
# Step 2: join with the vertex table to recover names and keep counts of 2 or more.
(
    myInDegrees
    .join(g.vertices, on='id')
    .filter("count>=2")
    .select("name")
    .show()
)

+-------+
|   name|
+-------+
|Charlie|
+-------+



In [None]:
# Hand-written equivalent of g.inDegrees: count edges per destination, then
# rename the columns to match the (id, inDegree) layout.
edge_counts = g.edges.groupBy('dst').count()
myInDegrees = edge_counts.withColumnRenamed('dst', 'id')
myInDegrees = myInDegrees.withColumnRenamed('count', 'inDegree')
myInDegrees.show()

+---+--------+
| id|inDegree|
+---+--------+
|  f|       1|
|  e|       2|
|  d|       1|
|  c|       2|
|  b|       2|
|  a|       1|
+---+--------+



In [None]:
# Physical plan of the hand-written version, for comparison with g.inDegrees.explain().
myInDegrees.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[dst#45], functions=[count(1)])
+- Exchange hashpartitioning(dst#45, 200), true, [id=#218]
   +- *(1) HashAggregate(keys=[dst#45], functions=[partial_count(1)])
      +- *(1) Project [dst#45]
         +- *(1) Scan ExistingRDD[src#44,dst#45,relationship#46]




In [None]:
# Storage level of g.inDegrees before cache() has been called on it.
print(g.inDegrees.storageLevel)

Serialized 1x Replicated


In [None]:
# Request caching of the in-degree DataFrame; the call returns a DataFrame,
# which is why the cell displays its schema.
g.inDegrees.cache()

DataFrame[id: string, inDegree: int]

In [None]:
# After cache(): kept in memory, falling back to disk when memory is insufficient.
print(g.inDegrees.storageLevel)

Disk Memory Deserialized 1x Replicated


In [None]:
# Storage level of the vertex DataFrame before the graph itself is cached.
print(g.vertices.storageLevel)

Serialized 1x Replicated


In [None]:
# Cache the graph as a whole; the next cell shows that both g.vertices and
# g.edges report a persisted storage level afterwards.
g.cache()

GraphFrame(v:[id: string, name: string ... 1 more field], e:[src: string, dst: string ... 1 more field])

In [None]:
# Storage levels of the two underlying DataFrames after g.cache().
print(g.vertices.storageLevel)
print(g.edges.storageLevel)

Disk Memory Deserialized 1x Replicated
Disk Memory Deserialized 1x Replicated


In [None]:
# A triplet view of the graph: one row per edge, carrying the full
# source-vertex, edge and destination-vertex records side by side.

g.triplets.show()



+----------------+--------------+----------------+
|             src|          edge|             dst|
+----------------+--------------+----------------+
|  {d, David, 29}|{d, a, friend}|  {a, Alice, 34}|
|{c, Charlie, 37}|{c, b, follow}|    {b, Bob, 36}|
|  {a, Alice, 34}|{a, b, friend}|    {b, Bob, 36}|
|  {f, Fanny, 38}|{f, c, follow}|{c, Charlie, 37}|
|    {b, Bob, 36}|{b, c, follow}|{c, Charlie, 37}|
| {e, Esther, 32}|{e, d, friend}|  {d, David, 29}|
|  {g, Gabby, 60}|{g, e, follow}| {e, Esther, 32}|
|  {a, Alice, 34}|{a, e, friend}| {e, Esther, 32}|
| {e, Esther, 32}|{e, f, follow}|  {f, Fanny, 38}|
+----------------+--------------+----------------+



In [None]:
# The plan shows how triplets are built: the edges are joined with the vertices
# twice, once on edge.src and once on edge.dst.
g.triplets.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [src#53, edge#51, dst#55]
   +- SortMergeJoin [edge#51.dst], [dst#55.id], Inner
      :- Sort [edge#51.dst ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(edge#51.dst, 200), ENSURE_REQUIREMENTS, [plan_id=342]
      :     +- SortMergeJoin [edge#51.src], [src#53.id], Inner
      :        :- Sort [edge#51.src ASC NULLS FIRST], false, 0
      :        :  +- Exchange hashpartitioning(edge#51.src, 200), ENSURE_REQUIREMENTS, [plan_id=335]
      :        :     +- Project [struct(src, src#6, dst, dst#7, relationship, relationship#8) AS edge#51]
      :        :        +- Filter (isnotnull(src#6) AND isnotnull(dst#7))
      :        :           +- Scan ExistingRDD[src#6,dst#7,relationship#8]
      :        +- Sort [src#53.id ASC NULLS FIRST], false, 0
      :           +- Exchange hashpartitioning(src#53.id, 200), ENSURE_REQUIREMENTS, [plan_id=336]
      :              +- Project [struct(id, id#0, name, name#1, a

### Motif Finding

In [9]:
# Motif finding (the most powerful technique here): describe a structural pattern
# and let GraphFrames search the graph for every match.
# Search for pairs of vertices with edges in both directions between them.
# The brackets [] are left empty because the matched edges do not need a name here.
# filter('a.id < b.id') keeps a single ordering of each pair; without it every pair
# would be reported twice, once as (a, b) and once as (b, a).
motifs = g.find("(a)-[]->(b); (b)-[]->(a)").filter('a.id < b.id')
motifs.show()
# A vertex name may be reused across terms of the pattern (as a src in one term and a dst in another).
# An edge can be named, e.g. [e], to refer to it later; different edges need different names.



+------------+----------------+
|           a|               b|
+------------+----------------+
|{b, Bob, 36}|{c, Charlie, 37}|
+------------+----------------+



In [13]:
# Find triangles: directed 3-cycles a -> b -> c -> a.
# Every cycle matches the pattern three times (once per choice of starting vertex);
# requiring a to carry the smallest id keeps exactly one of those rotations.
triangle_pattern = "(a)-[]->(b); (b)-[]->(c); (c)-[]->(a)"
triangles = g.find(triangle_pattern).filter("a.id < b.id AND a.id < c.id")
triangles.show()

+--------------+---------------+--------------+
|             a|              b|             c|
+--------------+---------------+--------------+
|{a, Alice, 34}|{e, Esther, 32}|{d, David, 29}|
+--------------+---------------+--------------+



In [22]:
# Names of the vertices c reachable from Alice by a two-edge path a -> b -> c.
g.find("(a)-[]->(b);(b)-[]->(c)").filter("a.name=='Alice'").select('c.name').show()


+-------+
|   name|
+-------+
|Charlie|
|  David|
|  Fanny|
+-------+



In [None]:
# Physical plan of the triangle query: a sequence of joins between edges and vertices.
triangles.explain()

== Physical Plan ==
*(6) Project [a#630, b#632, c#657]
+- *(6) BroadcastHashJoin [c#657.id, a#630.id], [__tmp-6526019406657860729#687.src, __tmp-6526019406657860729#687.dst], Inner, BuildRight
   :- *(6) Project [a#630, b#632, c#657]
   :  +- *(6) BroadcastHashJoin [__tmp-430217833014886237#655.dst], [c#657.id], Inner, BuildRight, (a#630.id < c#657.id)
   :     :- *(6) BroadcastHashJoin [b#632.id], [__tmp-430217833014886237#655.src], Inner, BuildRight
   :     :  :- *(6) Project [a#630, b#632]
   :     :  :  +- *(6) BroadcastHashJoin [__tmp-1043886091038848698#628.dst], [b#632.id], Inner, BuildRight, (a#630.id < b#632.id)
   :     :  :     :- *(6) BroadcastHashJoin [__tmp-1043886091038848698#628.src], [a#630.id], Inner, BuildRight
   :     :  :     :  :- *(6) Project [struct(src, src#44, dst, dst#45, relationship, relationship#46) AS __tmp-1043886091038848698#628]
   :     :  :     :  :  +- InMemoryTableScan [dst#45, relationship#46, src#44]
   :     :  :     :  :        +- InMemoryRel

In [None]:
# Negation (executed as an anti join): a term prefixed with ! must NOT be present.
# Result: pairs with an edge from a to b but no edge from b back to a.
oneway = g.find("(a)-[]->(b); !(b)-[]->(a)")
oneway.show()

+---------------+----------------+
|              a|               b|
+---------------+----------------+
| {a, Alice, 34}| {e, Esther, 32}|
|{e, Esther, 32}|  {d, David, 29}|
| {a, Alice, 34}|    {b, Bob, 36}|
| {g, Gabby, 60}| {e, Esther, 32}|
|{e, Esther, 32}|  {f, Fanny, 38}|
| {f, Fanny, 38}|{c, Charlie, 37}|
| {d, David, 29}|  {a, Alice, 34}|
+---------------+----------------+



In [23]:
# Two-hop targets c of Alice (a -> b -> c) that have no edge back to Alice.
g.find("(a)-[]->(b);(b)-[]->(c);!(c)-[]->(a)").filter("a.name=='Alice'").select('c.name').show()

+-------+
|   name|
+-------+
|  Fanny|
|Charlie|
+-------+



In [None]:
# The negated term appears in the plan as a LeftAnti join.
oneway.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#361, b#363], functions=[])
   +- Exchange hashpartitioning(a#361, b#363, 200), ENSURE_REQUIREMENTS, [plan_id=5226]
      +- HashAggregate(keys=[a#361, b#363], functions=[])
         +- SortMergeJoin [coalesce(a#361, [,,0]), isnull(a#361), coalesce(b#363, [,,0]), isnull(b#363)], [coalesce(a#410, [,,0]), isnull(a#410), coalesce(b#411, [,,0]), isnull(b#411)], LeftAnti
            :- Sort [coalesce(a#361, [,,0]) ASC NULLS FIRST, isnull(a#361) ASC NULLS FIRST, coalesce(b#363, [,,0]) ASC NULLS FIRST, isnull(b#363) ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(coalesce(a#361, [,,0]), isnull(a#361), coalesce(b#363, [,,0]), isnull(b#363), 200), ENSURE_REQUIREMENTS, [plan_id=5219]
            :     +- Project [a#361, b#363]
            :        +- SortMergeJoin [_extract_dst#434], [b#363.id], Inner
            :           :- Sort [_extract_dst#434 ASC NULLS FIRST], false, 0
            :    

In [None]:
# Negation on a longer pattern: two-edge paths a -> b -> c with no closing edge c -> a.
# Note that this rebinds the name `oneway` used by the earlier negation cell.
oneway = g.find("(a)-[]->(b); (b)-[]->(c); !(c)-[]->(a)")
oneway.show()

+----------------+----------------+----------------+
|               a|               b|               c|
+----------------+----------------+----------------+
|  {a, Alice, 34}| {e, Esther, 32}|  {f, Fanny, 38}|
|{c, Charlie, 37}|    {b, Bob, 36}|{c, Charlie, 37}|
|  {g, Gabby, 60}| {e, Esther, 32}|  {f, Fanny, 38}|
|  {d, David, 29}|  {a, Alice, 34}|    {b, Bob, 36}|
| {e, Esther, 32}|  {f, Fanny, 38}|{c, Charlie, 37}|
|  {f, Fanny, 38}|{c, Charlie, 37}|    {b, Bob, 36}|
|    {b, Bob, 36}|{c, Charlie, 37}|    {b, Bob, 36}|
|  {a, Alice, 34}|    {b, Bob, 36}|{c, Charlie, 37}|
|  {g, Gabby, 60}| {e, Esther, 32}|  {d, David, 29}|
+----------------+----------------+----------------+



In [None]:
# Find vertices without incoming edges: () is an anonymous vertex, and the
# negated term excludes every a that is the destination of some edge.
single = g.find("!()-[]->(a)")
single.show()

+--------------+
|             a|
+--------------+
|{g, Gabby, 60}|
+--------------+



In [None]:
# As before, the negation is planned as a LeftAnti join, here against the vertex table.
single.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#464], functions=[])
   +- Exchange hashpartitioning(a#464, 200), ENSURE_REQUIREMENTS, [plan_id=6090]
      +- HashAggregate(keys=[a#464], functions=[])
         +- Project [struct(id, id#0, name, name#1, age, age#2L) AS a#464]
            +- SortMergeJoin [coalesce(struct(id, id#0, name, name#1, age, age#2L), [,,0]), isnull(struct(id, id#0, name, name#1, age, age#2L))], [coalesce(a#478, [,,0]), isnull(a#478)], LeftAnti
               :- Sort [coalesce(struct(id, id#0, name, name#1, age, age#2L), [,,0]) ASC NULLS FIRST, isnull(struct(id, id#0, name, name#1, age, age#2L)) ASC NULLS FIRST], false, 0
               :  +- Exchange hashpartitioning(coalesce(struct(id, id#0, name, name#1, age, age#2L), [,,0]), isnull(struct(id, id#0, name, name#1, age, age#2L)), 200), ENSURE_REQUIREMENTS, [plan_id=6082]
               :     +- Scan ExistingRDD[id#0,name#1,age#2L]
               +- Sort [coalesce(a#478, [,,0]) ASC

In [15]:
# More meaningful queries can be expressed by applying filters to the motif result.
# Question: where in the plan is this filter applied?
# Author's note: with the graph cached, the filter is pushed all the way down to the
# vertex DataFrame — NOTE(review): confirm against the explain() output of the next cell.
# The pattern g.find("(a)-[e]->(b); (b)-[]->(a)") itself is evaluated as multiple joins.
g.find("(a)-[e]->(b); (b)-[]->(a)").filter("b.age > 36").show()
g.find("(a)-[e]->(b); (b)-[]->(a)").filter("b.age > 35").show()

+------------+--------------+----------------+
|           a|             e|               b|
+------------+--------------+----------------+
|{b, Bob, 36}|{b, c, follow}|{c, Charlie, 37}|
+------------+--------------+----------------+

+----------------+--------------+----------------+
|               a|             e|               b|
+----------------+--------------+----------------+
|{c, Charlie, 37}|{c, b, follow}|    {b, Bob, 36}|
|    {b, Bob, 36}|{b, c, follow}|{c, Charlie, 37}|
+----------------+--------------+----------------+



In [None]:
# Physical plan of the filtered motif query, to see where the b.age predicate ends up.
g.find("(a)-[]->(b); (b)-[]->(a)").filter("b.age > 36").explain()

== Physical Plan ==
*(4) Project [a#2584, b#2586]
+- *(4) BroadcastHashJoin [b#2586.id, a#2584.id], [__tmp2506060614762666678#2609.src, __tmp2506060614762666678#2609.dst], Inner, BuildRight
   :- *(4) Project [a#2584, b#2586]
   :  +- *(4) BroadcastHashJoin [__tmp-3851898762290097694#2582.dst], [b#2586.id], Inner, BuildRight
   :     :- *(4) BroadcastHashJoin [__tmp-3851898762290097694#2582.src], [a#2584.id], Inner, BuildRight
   :     :  :- *(4) Project [struct(src, src#44, dst, dst#45, relationship, relationship#46) AS __tmp-3851898762290097694#2582]
   :     :  :  +- InMemoryTableScan [dst#45, relationship#46, src#44]
   :     :  :        +- InMemoryRelation [src#44, dst#45, relationship#46], StorageLevel(disk, memory, deserialized, 1 replicas)
   :     :  :              +- *(1) Scan ExistingRDD[src#44,dst#45,relationship#46]
   :     :  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, struct<id:string,name:string,age:bigint>, false].id)), [id=#1356]
   :     :     +- 

In [30]:
# Find chains of 4 vertices such that at least 2 of the 3 edges are "friend" relationships.
# The when function plays the role of CASE WHEN in SQL.

# The inequality filter rejects chains that come back to a vertex after two or three
# hops, so that the four positions hold four different users.
chain_pattern = "(a)-[e1]->(b); (b)-[e2]->(c); (c)-[e3]->(d)"
chain4 = g.find(chain_pattern).where('a!=d AND a!=c AND b!=d')
chain4.show()


def friendTo1(e):
    """Return a 0/1 column: 1 when edge struct `e` has relationship 'friend', else 0."""
    return when(e['relationship'] == 'friend', 1).otherwise(0)


# One indicator column per edge of the chain. '*' keeps all existing columns; the
# final select names the ones to output, so the optimizer can prune the rest.
friend_flags = [
    friendTo1(chain4[edge_name]).alias(flag_name)
    for edge_name, flag_name in (('e1', 'f1'), ('e2', 'f2'), ('e3', 'f3'))
]
chain4.select('*', *friend_flags).where('f1 + f2 + f3 >= 2').select('a', 'b', 'c', 'd').show()

+---------------+--------------+---------------+--------------+----------------+--------------+----------------+
|              a|            e1|              b|            e2|               c|            e3|               d|
+---------------+--------------+---------------+--------------+----------------+--------------+----------------+
| {g, Gabby, 60}|{g, e, follow}|{e, Esther, 32}|{e, d, friend}|  {d, David, 29}|{d, a, friend}|  {a, Alice, 34}|
|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}|  {a, Alice, 34}|{a, b, friend}|    {b, Bob, 36}|
|{e, Esther, 32}|{e, f, follow}| {f, Fanny, 38}|{f, c, follow}|{c, Charlie, 37}|{c, b, follow}|    {b, Bob, 36}|
| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, b, friend}|    {b, Bob, 36}|{b, c, follow}|{c, Charlie, 37}|
| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|{e, f, follow}|  {f, Fanny, 38}|{f, c, follow}|{c, Charlie, 37}|
| {g, Gabby, 60}|{g, e, follow}|{e, Esther, 32}|{e, f, follow}|  {f, Fanny, 38}|{f, c, follow}|{

### Subgraphs

In [None]:
# Select the subgraph of users older than 30 connected by "friend" relationships,
# then drop isolated vertices (users that no remaining edge touches).
# Filtering vertices also removes the edges attached to the removed users: in the
# output no edge involving David (age 29) is left.
older_than_30 = g.filterVertices("age > 30")
friends_only = older_than_30.filterEdges("relationship = 'friend'")
g1 = friends_only.dropIsolatedVertices()

for part in (g1.vertices, g1.edges):
    part.show()

+---+------+---+
| id|  name|age|
+---+------+---+
|  e|Esther| 32|
|  b|   Bob| 36|
|  a| Alice| 34|
+---+------+---+

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  e|      friend|
|  a|  b|      friend|
+---+---+------------+



In [None]:
# Build a subgraph from the "follow" edges that point from a younger user "a"
# to an older user "b".

# Step 1: match every single edge together with its endpoints, then apply both conditions.
paths = (
    g.find("(a)-[e]->(b)")
    .where("e.relationship = 'follow'")
    .where("a.age < b.age")
)
paths.show()

# Step 2: "paths" still carries the vertex structs; flatten the edge struct back
# into plain src/dst/relationship columns.
e2 = paths.select("e.*")
e2.show()

# Step 3: pair the original vertices with the selected edges and drop the
# vertices that none of those edges touches.
g2 = GraphFrame(g.vertices, e2).dropIsolatedVertices()

for part in (g2.vertices, g2.edges):
    part.show()

+---------------+--------------+----------------+
|              a|             e|               b|
+---------------+--------------+----------------+
|[e, Esther, 32]|[e, f, follow]|  [f, Fanny, 38]|
|   [b, Bob, 36]|[b, c, follow]|[c, Charlie, 37]|
+---------------+--------------+----------------+

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  e|  f|      follow|
|  b|  c|      follow|
+---+---+------------+

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  f|  Fanny| 38|
|  e| Esther| 32|
|  c|Charlie| 37|
|  b|    Bob| 36|
+---+-------+---+

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  e|  f|      follow|
|  b|  c|      follow|
+---+---+------------+



In [37]:
# Find all people who follow Charlie: single-edge motif, restricted to 'follow'
# edges whose destination vertex is named Charlie; output the source names.
g.find("(a)-[e]->(b)").filter("e.relationship = 'follow' and b.name = 'Charlie'").select("a.name").show()

+-----+
| name|
+-----+
|Fanny|
|  Bob|
+-----+



### BFS

In [None]:
# Breadth-first search from vertex 'a', one DataFrame of vertex ids per BFS layer.
# BFS suits graphs with a short diameter, since every layer costs one loop iteration.
start = g.vertices.select('id').where("id = 'a'")
layers = [start]   # layers[k] holds the ids first reached after k hops
visited = start

# Stop as soon as the most recent layer is empty.
while layers[-1].count() > 0:
    frontier = layers[-1]
    # Follow every outgoing edge of the current layer to get its one-hop neighbours.
    hops = frontier.join(g.edges, frontier['id'] == g.edges['src'])
    # Rename dst to 'id', then remove already-visited vertices and the duplicates
    # produced when several frontier vertices share a neighbour.
    next_layer = (
        hops.select(hops['dst'].alias('id'))
        .subtract(visited)
        .distinct()
        .cache()
    )
    layers.append(next_layer)
    visited = visited.union(next_layer).cache()

In [None]:
# Layer 0: the start vertex itself.
layers[0].show()

+---+
| id|
+---+
|  a|
+---+



In [None]:
# Layer 1: vertices one hop away from 'a'.
layers[1].show()

+---+
| id|
+---+
|  e|
|  b|
+---+



In [None]:
# Layer 2: vertices first reached after two hops.
layers[2].show()

+---+
| id|
+---+
|  f|
|  d|
|  c|
+---+



In [None]:
# Layer 3 is empty, which is the condition that ended the BFS loop.
layers[3].show()

+---+
| id|
+---+
+---+



In [None]:
# GraphFrames provides its own BFS: the first expression selects the start
# vertices, the second the vertices to reach. Each result row is one path,
# laid out as from, e0, v1, e1, ..., to.
# Note that this rebinds the name `paths` used in the Subgraphs section.

paths = g.bfs("id = 'a'", "age > 36")
paths.show()

+--------------+--------------+---------------+--------------+----------------+
|          from|            e0|             v1|            e1|              to|
+--------------+--------------+---------------+--------------+----------------+
|{a, Alice, 34}|{a, b, friend}|   {b, Bob, 36}|{b, c, follow}|{c, Charlie, 37}|
|{a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|{e, f, follow}|  {f, Fanny, 38}|
+--------------+--------------+---------------+--------------+----------------+



### List Ranking

In [None]:
# Linked list encoded as (node, next) pairs; a next value of -1 marks the end of the list.
# Node 3 is the head (it is nobody's successor) and (5, -1) is the last node.
data = [(0, 5), (1, 0), (3, 4), (4, 6), (5, -1), (6,1)]
e = spark.createDataFrame(data, ['src', 'dst'])

# Initial distance d for list ranking: 0 for the node that points at the
# end marker, 1 for every other node.
points_to_end = e.dst == -1
initial_distance = when(points_to_end, 0).otherwise(1).alias('d')
v = e.select(col('src').alias('id'), initial_distance)

# Add the end marker itself as a vertex with distance 0.
v1 = spark.createDataFrame([(-1, 0)], ['id', 'd'])
v = v.union(v1)

v.show()
e.show()

+---+---+
| id|  d|
+---+---+
|  0|  1|
|  1|  1|
|  3|  1|
|  4|  1|
|  5|  0|
|  6|  1|
| -1|  0|
+---+---+

+---+---+
|src|dst|
+---+---+
|  0|  5|
|  1|  0|
|  3|  4|
|  4|  6|
|  5| -1|
|  6|  1|
+---+---+



In [None]:
# List ranking by pointer jumping: each round adds the successor's distance to
# every node and makes the node point at its successor's successor. The loop
# ends once every edge points at the end marker -1.
while e.filter('dst != -1').count() > 0:
    # Use a dedicated name for this graph so that the social graph `g`, which
    # other sections of the notebook rely on, is not overwritten.
    list_graph = GraphFrame(v, e)
    list_graph.cache()
    # New distance = own distance + distance stored at the current successor.
    # v1 re-adds the end marker (-1, 0): it has no outgoing edge, so it never
    # shows up as the source of a triplet.
    v = (
        list_graph.triplets
        .select(col('src.id').alias('id'),
                (col('src.d') + col('dst.d')).alias('d'))
        .union(v1)
    )
    # Pointer jump (node.next.next): a -> b -> c becomes a -> c. Edges that
    # already point at -1 have no two-edge path and would be lost by the motif,
    # so they are carried over unchanged.
    e = (
        list_graph.find('(a)-[]->(b); (b)-[]->(c)')
        .select(col('a.id').alias('src'), col('c.id').alias('dst'))
        .union(e.filter('dst = -1'))
    )
    e.show()
v.show()

+---+---+
|src|dst|
+---+---+
|  6|  0|
|  3|  6|
|  1|  5|
|  4|  1|
|  0| -1|
|  5| -1|
+---+---+

+---+---+
|src|dst|
+---+---+
|  3|  0|
|  4|  5|
|  6| -1|
|  1| -1|
|  0| -1|
|  5| -1|
+---+---+

+---+---+
|src|dst|
+---+---+
|  3| -1|
|  4| -1|
|  1| -1|
|  6| -1|
|  0| -1|
|  5| -1|
+---+---+

+---+---+
| id|  d|
+---+---+
|  0|  1|
|  1|  2|
|  3|  5|
|  4|  4|
|  5|  0|
|  6|  3|
| -1|  0|
+---+---+



### Message passing via AggregateMessages

In [None]:
from pyspark.sql.functions import coalesce, col, lit, sum, when, min, max
from graphframes.lib import AggregateMessages as AM

# AggregateMessages exposes four column handles: src, dst, edge and msg.
# Every edge sends its source's age to its destination, and each vertex sums
# what it receives — i.e. the summed ages of its in-neighbours.
# Passing sendToSrc=AM.dst['age'] as well would send ages along the reverse direction too.
# `g` must be the social graph here, since the messages read the 'age' attribute.
age_of_sender = AM.src['age']
total_received = sum(AM.msg).alias("summedAges")
agg = g.aggregateMessages(total_received, sendToDst=age_of_sender)
agg.show()

+---+----------+
| id|summedAges|
+---+----------+
|  f|        32|
|  e|        94|
|  d|        32|
|  c|        74|
|  b|        71|
|  a|        29|
+---+----------+



### The Pregel Model for Graph Computation

In [None]:
# Pagerank in the Pregel model

from pyspark.sql.functions import coalesce, col, lit, sum, when, min
from graphframes.lib import Pregel

# Need to set up a checkpoint directory for the Pregel computation.
sc.setCheckpointDir("checkpoint")

# The Pregel API uses a builder pattern to describe the operations.
# Call run() to start a run. It returns a DataFrame of vertices from the last iteration.
#
# When a run starts, it expands the vertices DataFrame using column expressions
# defined by withVertexColumn(). Those additional vertex properties can be
# changed during Pregel iterations. In each Pregel iteration, there are three
# phases:
#
# * Given each edge triplet, generate messages and specify target vertices to
#   send, described by sendMsgToDst() and sendMsgToSrc().
# * Aggregate messages by target vertex IDs, described by aggMsgs().
# * Update additional vertex properties based on aggregated messages and states
#   from previous iteration, described by withVertexColumn().

# The vertices only need id and outDegree for PageRank.
v = g.outDegrees
# Take the edges from g itself rather than from the global `e`, which the
# List Ranking section rebinds to its own edge list.
e = g.edges
g = GraphFrame(v, e)
ranks = (
    g.pregel
    .setMaxIter(5)
    # Each vertex splits its rank evenly over its outgoing edges
    # (sendMsgToSrc is available for the opposite direction).
    .sendMsgToDst(Pregel.src("rank") / Pregel.src("outDegree"))
    .aggMsgs(sum(Pregel.msg()))
    .withVertexColumn(
        "rank",
        # lit() turns a constant into a column: every vertex starts with rank 1.0.
        lit(1.0),
        # A vertex that received no message has a null msg; coalesce replaces it
        # with 0.0, so such a vertex ends up with rank 0.15.
        coalesce(Pregel.msg(), lit(0.0)) * lit(0.85) + lit(0.15),
    )
    .run()
)
ranks.show()

# pyspark.sql.functions.coalesce(*cols): returns the first column that is not null.

# Not to be confused with DataFrame.coalesce(numPartitions), which merges partitions.


+---+---------+-------------------+
| id|outDegree|               rank|
+---+---------+-------------------+
|  a|        2|    0.4758149609375|
|  b|        1| 2.2680220312499997|
|  c|        1|  2.780783203124999|
|  f|        1|0.41104330078124995|
|  e|        2| 0.5032932031249999|
|  d|        1|0.41104330078124995|
|  g|        1|               0.15|
+---+---------+-------------------+



In [None]:
# BFS in the Pregel model

# Relies on `v`, `e`, `Pregel`, `when`, `min` and `col` as set up by the PageRank cell.
g = GraphFrame(v,e)

dist = (
    g.pregel
    # Only active vertices send: they offer their own distance + 1 to each out-neighbour.
    .sendMsgToDst(when(Pregel.src('active'), Pregel.src('d') + 1))
    # A vertex keeps the smallest distance offered to it.
    .aggMsgs(min(Pregel.msg()))
    # d is the distance from 'a': 0 for the start vertex, 99999 (standing in for
    # "unreached") for the others; it is lowered whenever a smaller offer arrives.
    .withVertexColumn('d', when(v['id'] == 'a', 0).otherwise(99999),
        when(Pregel.msg() < col('d'), Pregel.msg()).otherwise(col('d')))
    # active marks the vertices whose distance has just improved; initially only 'a'.
    .withVertexColumn('active', when(v['id'] == 'a', True).otherwise(False),
        when(Pregel.msg() < col('d'), True).otherwise(False))
    .run()
)
dist.show()


+---+---------+-----+------+
| id|outDegree|    d|active|
+---+---------+-----+------+
|  a|        2|    0| false|
|  b|        1|    1| false|
|  c|        1|    2| false|
|  f|        1|    2| false|
|  e|        2|    1| false|
|  d|        1|    2| false|
|  g|        1|99999| false|
+---+---------+-----+------+

