In [1]:
## Retrieved from http://towardsdatascience.com/graphframes-in-jupyter-a-practical-guide-9b3b346cebc5
from pyspark import *
from pyspark.sql import *
from graphframes import *

In [2]:
# Reuse the active Spark session if there is one; otherwise start one named 'fun'.
spark = (
    SparkSession.builder
    .appName('fun')
    .getOrCreate()
)

In [3]:
# Vertex table: one row per person. The 'id' values are the keys that the
# edge table's 'src' and 'dst' columns refer to.
vertices = spark.createDataFrame(
    data=[
        ('1', 'Carter', 'Derrick', 50),
        ('2', 'May', 'Derrick', 26),
        ('3', 'Mills', 'Jeff', 80),
        ('4', 'Hood', 'Robert', 65),
        ('5', 'Banks', 'Mike', 93),
        ('98', 'Berg', 'Tim', 28),
        ('99', 'Page', 'Allan', 16),
    ],
    schema=['id', 'name', 'firstname', 'age'],
)

In [4]:
# Edge table: one row per directed relationship (src -> dst) with its type.
# 'friend' edges are listed in both directions; 'follows' edges in one only.
edges = spark.createDataFrame(
    data=[
        ('1', '2', 'friend'),
        ('2', '1', 'friend'),
        ('3', '1', 'friend'),
        ('1', '3', 'friend'),
        ('2', '3', 'follows'),
        ('3', '4', 'friend'),
        ('4', '3', 'friend'),
        ('5', '3', 'friend'),
        ('3', '5', 'friend'),
        ('4', '5', 'follows'),
        ('98', '99', 'friend'),
        ('99', '98', 'friend'),
    ],
    schema=['src', 'dst', 'type'],
)

In [5]:
# Build the graph from the vertex and edge DataFrames created in the previous cells.
g = GraphFrame(vertices, edges)

In [19]:
## Take a look at the schemas of the two DataFrames (vertices first, then edges)
vertices.printSchema()
edges.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- firstname: string (nullable = true)
 |-- age: long (nullable = true)

root
 |-- src: string (nullable = true)
 |-- dst: string (nullable = true)
 |-- type: string (nullable = true)



In [6]:
## Take a look at the DataFrames held by the graph (vertices first, then edges)
g.vertices.show()
g.edges.show()

+---+------+---------+---+
| id|  name|firstname|age|
+---+------+---------+---+
|  1|Carter|  Derrick| 50|
|  2|   May|  Derrick| 26|
|  3| Mills|     Jeff| 80|
|  4|  Hood|   Robert| 65|
|  5| Banks|     Mike| 93|
| 98|  Berg|      Tim| 28|
| 99|  Page|    Allan| 16|
+---+------+---------+---+

+---+---+-------+
|src|dst|   type|
+---+---+-------+
|  1|  2| friend|
|  2|  1| friend|
|  3|  1| friend|
|  1|  3| friend|
|  2|  3|follows|
|  3|  4| friend|
|  4|  3| friend|
|  5|  3| friend|
|  3|  5| friend|
|  4|  5|follows|
| 98| 99| friend|
| 99| 98| friend|
+---+---+-------+



In [7]:
## Check the number of edges touching each vertex (incoming and outgoing counted together)
g.degrees.show()

+---+------+
| id|degree|
+---+------+
|  3|     7|
| 98|     2|
| 99|     2|
|  5|     3|
|  1|     4|
|  4|     3|
|  2|     3|
+---+------+



In [8]:
## Directed vs undirected edges
##
## Start from the directed edge list and derive an undirected one from it

In [9]:
# NOTE: plain assignment -- `copy` is a second name for the same `edges`
# DataFrame object; no data is duplicated.
copy = edges

In [12]:
from pyspark.sql.functions import udf

In [18]:
@udf("string")
def to_undir(src, dst):
    """Label a directed edge so that one direction per vertex pair can be dropped.

    Returns 'Delete' when ``src >= dst`` (this includes ``src == dst``) and
    'Keep' otherwise. The comparison is Python's ordering on whatever values
    the two columns hold.
    """
    return 'Delete' if src >= dst else 'Keep'

In [20]:
# Tag every edge with the UDF, keep the rows tagged 'Keep', then drop the
# helper column before displaying the result.
(
    copy
    .withColumn('undir', to_undir(copy.src, copy.dst))
    .filter('undir =="Keep"')
    .drop('undir')
    .show()
)

+---+---+-------+
|src|dst|   type|
+---+---+-------+
|  1|  2| friend|
|  1|  3| friend|
|  2|  3|follows|
|  3|  4| friend|
|  3|  5| friend|
|  4|  5|follows|
| 98| 99| friend|
+---+---+-------+



In [21]:
## For efficiency, it's better to avoid UDFs where possible
## and use the built-in pyspark.sql.functions instead

In [22]:
## Filtering and connected components

In [23]:
# Vertices whose age is greater than 30.
g.vertices.filter("age>30").show()

+---+------+---------+---+
| id|  name|firstname|age|
+---+------+---------+---+
|  1|Carter|  Derrick| 50|
|  3| Mills|     Jeff| 80|
|  4|  Hood|   Robert| 65|
|  5| Banks|     Mike| 93|
+---+------+---------+---+



In [24]:
# Vertices with at least two incoming edges, highest in-degree first.
(
    g.inDegrees
    .filter("inDegree >= 2")
    .sort("inDegree", ascending=False)
    .show()
)

+---+--------+
| id|inDegree|
+---+--------+
|  3|       4|
|  5|       2|
|  1|       2|
+---+--------+



In [26]:
# Keep only the edges whose type is 'friend'.
g.edges.filter('type =="friend"').show()

+---+---+------+
|src|dst|  type|
+---+---+------+
|  1|  2|friend|
|  2|  1|friend|
|  3|  1|friend|
|  1|  3|friend|
|  3|  4|friend|
|  4|  3|friend|
|  5|  3|friend|
|  3|  5|friend|
| 98| 99|friend|
| 99| 98|friend|
+---+---+------+



In [27]:
# Set the checkpoint directory before running connectedComponents() in the
# next cell. Take the SparkContext from the `spark` session created at the
# top of the notebook: a bare `sc` is never defined in this notebook, so it
# would only work when the environment happens to inject it.
spark.sparkContext.setCheckpointDir('graphframes_cps')

In [28]:
# Add a 'component' column: vertices in the same connected component share the same value.
g.connectedComponents().show()

+---+------+---------+---+------------+
| id|  name|firstname|age|   component|
+---+------+---------+---+------------+
|  1|Carter|  Derrick| 50|154618822656|
|  2|   May|  Derrick| 26|154618822656|
|  3| Mills|     Jeff| 80|154618822656|
|  4|  Hood|   Robert| 65|154618822656|
|  5| Banks|     Mike| 93|154618822656|
| 98|  Berg|      Tim| 28|317827579904|
| 99|  Page|    Allan| 16|317827579904|
+---+------+---------+---+------------+



In [29]:
## Motif finding

In [30]:
# Motif: pairs of vertices (a, b) joined by an edge in each direction
# (e goes a -> b, e2 goes b -> a). Each pair appears once per ordering.
g.find("(a)-[e]->(b); (b)-[e2]->(a)").show()

+--------------------+----------------+--------------------+----------------+
|                   a|               e|                   b|              e2|
+--------------------+----------------+--------------------+----------------+
| [98, Berg, Tim, 28]|[98, 99, friend]|[99, Page, Allan,...|[99, 98, friend]|
|[2, May, Derrick,...|  [2, 1, friend]|[1, Carter, Derri...|  [1, 2, friend]|
|[99, Page, Allan,...|[99, 98, friend]| [98, Berg, Tim, 28]|[98, 99, friend]|
|[3, Mills, Jeff, 80]|  [3, 5, friend]|[5, Banks, Mike, 93]|  [5, 3, friend]|
|[1, Carter, Derri...|  [1, 3, friend]|[3, Mills, Jeff, 80]|  [3, 1, friend]|
|[3, Mills, Jeff, 80]|  [3, 1, friend]|[1, Carter, Derri...|  [1, 3, friend]|
|[5, Banks, Mike, 93]|  [5, 3, friend]|[3, Mills, Jeff, 80]|  [3, 5, friend]|
|[4, Hood, Robert,...|  [4, 3, friend]|[3, Mills, Jeff, 80]|  [3, 4, friend]|
|[1, Carter, Derri...|  [1, 2, friend]|[2, May, Derrick,...|  [2, 1, friend]|
|[3, Mills, Jeff, 80]|  [3, 4, friend]|[4, Hood, Robert,...|  [4, 3, friend]|
+--------------------+----------------+--------------------+----------------+

In [32]:
# Motif: b is connected to both a and c in both directions, i.e. b is a
# friend that a and c have in common. The pattern itself does not require
# a and c to be different vertices. Unnamed edges ([]) are not returned as
# columns, so duplicate (a, b, c) rows are removed afterwards.
mutualFriends = (
    g.find("(a)-[]->(b); (b)-[]->(c); (c)-[]->(b); (b)-[]->(a)")
    .dropDuplicates()
)

In [33]:
# Which vertex b do vertex 2 (as a) and vertex 3 (as c) have in common?
mutualFriends.filter('a.id == 2 and c.id == 3').show()

+--------------------+--------------------+--------------------+
|                   a|                   b|                   c|
+--------------------+--------------------+--------------------+
|[2, May, Derrick,...|[1, Carter, Derri...|[3, Mills, Jeff, 80]|
+--------------------+--------------------+--------------------+



In [34]:
## Triangle Count and Page Rank

In [35]:
# Add a 'count' column with the number of triangles each vertex takes part in.
g.triangleCount().show()

+-----+---+------+---------+---+
|count| id|  name|firstname|age|
+-----+---+------+---------+---+
|    2|  3| Mills|     Jeff| 80|
|    0| 98|  Berg|      Tim| 28|
|    0| 99|  Page|    Allan| 16|
|    1|  5| Banks|     Mike| 93|
|    1|  1|Carter|  Derrick| 50|
|    1|  4|  Hood|   Robert| 65|
|    1|  2|   May|  Derrick| 26|
+-----+---+------+---------+---+



In [36]:
# PageRank with a reset probability of 0.15, run to a tolerance of 0.01.
# The result is a new graph whose vertices carry a 'pagerank' column and
# whose edges carry a 'weight' column (displayed in the following cells).
pr = g.pageRank(resetProbability=0.15, tol=0.01)

In [37]:
## Look at the PageRank score of every vertex
pr.vertices.show()

+---+------+---------+---+------------------+
| id|  name|firstname|age|          pagerank|
+---+------+---------+---+------------------+
|  1|Carter|  Derrick| 50|0.9055074972891308|
|  3| Mills|     Jeff| 80| 1.853919642738813|
|  2|   May|  Derrick| 26|0.5377967999474921|
|  4|  Hood|   Robert| 65|0.6873519241384106|
| 98|  Berg|      Tim| 28|1.0225331112091938|
|  5| Banks|     Mike| 93|0.9703579134677663|
| 99|  Page|    Allan| 16|1.0225331112091938|
+---+------+---------+---+------------------+



In [38]:
## Look at the weight attached to every edge in the PageRank result
pr.edges.show()

+---+---+-------+------------------+
|src|dst|   type|            weight|
+---+---+-------+------------------+
|  1|  2| friend|               0.5|
| 99| 98| friend|               1.0|
|  1|  3| friend|               0.5|
|  4|  5|follows|               0.5|
|  5|  3| friend|               1.0|
| 98| 99| friend|               1.0|
|  3|  5| friend|0.3333333333333333|
|  4|  3| friend|               0.5|
|  2|  1| friend|               0.5|
|  3|  4| friend|0.3333333333333333|
|  3|  1| friend|0.3333333333333333|
|  2|  3|follows|               0.5|
+---+---+-------+------------------+

