### 1. Getting Started

In [0]:
from graphframes import *

In [0]:
from pyspark import *
from pyspark.sql import *

In [0]:
spark = SparkSession.builder.appName('fun').getOrCreate()
spark

In [0]:
vertices = spark.createDataFrame([('1', 'Carter', 'Derrick', 50), 
                                  ('2', 'May', 'Derrick', 26),
                                 ('3', 'Mills', 'Jeff', 80),
                                  ('4', 'Hood', 'Robert', 65),
                                  ('5', 'Banks', 'Mike', 93),
                                 ('98', 'Berg', 'Tim', 28),
                                 ('99', 'Page', 'Allan', 16)],
                                 ['id', 'name', 'firstname', 'age'])

In [0]:
vertices

Out[5]: DataFrame[id: string, name: string, firstname: string, age: bigint]

In [0]:
edges = spark.createDataFrame([('1', '2', 'friend'), 
                               ('2', '1', 'friend'),
                              ('3', '1', 'friend'),
                              ('1', '3', 'friend'),
                               ('2', '3', 'follows'),
                               ('3', '4', 'friend'),
                               ('4', '3', 'friend'),
                               ('5', '3', 'friend'),
                               ('3', '5', 'friend'),
                               ('4', '5', 'follows'),
                              ('98', '99', 'friend'),
                              ('99', '98', 'friend')],
                              ['src', 'dst', 'type'])

In [0]:
edges

Out[7]: DataFrame[src: string, dst: string, type: string]

In [0]:
g = GraphFrame(vertices, edges)



In [0]:
# Take a look at the DataFrames
g.vertices.show()
g.edges.show()

+---+------+---------+---+
| id|  name|firstname|age|
+---+------+---------+---+
|  1|Carter|  Derrick| 50|
|  2|   May|  Derrick| 26|
|  3| Mills|     Jeff| 80|
|  4|  Hood|   Robert| 65|
|  5| Banks|     Mike| 93|
| 98|  Berg|      Tim| 28|
| 99|  Page|    Allan| 16|
+---+------+---------+---+

+---+---+-------+
|src|dst|   type|
+---+---+-------+
|  1|  2| friend|
|  2|  1| friend|
|  3|  1| friend|
|  1|  3| friend|
|  2|  3|follows|
|  3|  4| friend|
|  4|  3| friend|
|  5|  3| friend|
|  3|  5| friend|
|  4|  5|follows|
| 98| 99| friend|
| 99| 98| friend|
+---+---+-------+



In [0]:
# Check the number of edges of each vertex
g.degrees.show()

+---+------+
| id|degree|
+---+------+
|  1|     4|
|  2|     3|
|  3|     7|
|  4|     3|
|  5|     3|
| 98|     2|
| 99|     2|
+---+------+



### 2. Directed vs undirected edges

In [0]:
copy = edges
from pyspark.sql.functions import udf
@udf("string")
def to_undir(src, dst):
    if src >= dst:
        return 'Delete'
    else : 
        return 'Keep'
copy.withColumn('undir', to_undir(copy.src, copy.dst))\
.filter('undir == "Keep"').drop('undir').show()

+---+---+-------+
|src|dst|   type|
+---+---+-------+
|  1|  2| friend|
|  1|  3| friend|
|  2|  3|follows|
|  3|  4| friend|
|  3|  5| friend|
|  4|  5|follows|
| 98| 99| friend|
+---+---+-------+



### 3. Filtering and connected components

In [0]:
g.vertices.filter("age > 30").show()
g.inDegrees.filter("inDegree >= 2").sort("inDegree", ascending=False).show()
g.edges.filter('type == "friend"')

+---+------+---------+---+
| id|  name|firstname|age|
+---+------+---------+---+
|  1|Carter|  Derrick| 50|
|  3| Mills|     Jeff| 80|
|  4|  Hood|   Robert| 65|
|  5| Banks|     Mike| 93|
+---+------+---------+---+

+---+--------+
| id|inDegree|
+---+--------+
|  3|       4|
|  1|       2|
|  5|       2|
+---+--------+

Out[11]: DataFrame[src: string, dst: string, type: string]

In [0]:
sc.setCheckpointDir('https://community.cloud.databricks.com/?o=291647158364918#folder/86354967488104')

In [0]:
g.connectedComponents().show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
[0;32m<command-86354967488105>[0m in [0;36m<cell line: 1>[0;34m()[0m
[0;32m----> 1[0;31m [0mg[0m[0;34m.[0m[0mconnectedComponents[0m[0;34m([0m[0;34m)[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/jars/spark--maven-trees--ml--11.x--graphframes--org.graphframes--graphframes_2.12--org.graphframes__graphframes_2.12__0.8.2-db1-spark3.2.jar/graphframes/graphframe.py[0m in [0;36mconnectedComponents[0;34m(self, algorithm, checkpointInterval, broadcastThreshold, optStartIter, intermediateStorageLevel, sparsityThreshold)[0m
[1;32m    336[0m         """
[1;32m    337[0m         [0mjavaIntermediateStorageLevel[0m [0;34m=[0m [0mself[0m[0;34m.[0m[0m_sc[0m[0;34m.[0m[0m_getJavaStorageLevel[0m[0;34m([0m[0mintermediateStorageLevel[0m[0;

### 4. Motif finding

In [0]:
g.find("(a)-[e]->(b); (b)-[e2]->(a)").show()

+--------------------+----------------+--------------------+----------------+
|                   a|               e|                   b|              e2|
+--------------------+----------------+--------------------+----------------+
|{2, May, Derrick,...|  {2, 1, friend}|{1, Carter, Derri...|  {1, 2, friend}|
|{1, Carter, Derri...|  {1, 3, friend}|{3, Mills, Jeff, 80}|  {3, 1, friend}|
|{1, Carter, Derri...|  {1, 2, friend}|{2, May, Derrick,...|  {2, 1, friend}|
|{3, Mills, Jeff, 80}|  {3, 1, friend}|{1, Carter, Derri...|  {1, 3, friend}|
|{4, Hood, Robert,...|  {4, 3, friend}|{3, Mills, Jeff, 80}|  {3, 4, friend}|
|{3, Mills, Jeff, 80}|  {3, 4, friend}|{4, Hood, Robert,...|  {4, 3, friend}|
|{3, Mills, Jeff, 80}|  {3, 5, friend}|{5, Banks, Mike, 93}|  {5, 3, friend}|
|{5, Banks, Mike, 93}|  {5, 3, friend}|{3, Mills, Jeff, 80}|  {3, 5, friend}|
| {98, Berg, Tim, 28}|{98, 99, friend}|{99, Page, Allan,...|{99, 98, friend}|
|{99, Page, Allan,...|{99, 98, friend}| {98, Berg, Tim, 28}|{98,

In [0]:
mutualFriends = g.find("(a)-[]->(b); (b)-[]->(c); (c)-[]->(b); (b)-[]->(a)")\
.dropDuplicates()

In [0]:
mutualFriends.filter('a.id == 2 and c.id == 3').show()

+--------------------+--------------------+--------------------+
|                   a|                   b|                   c|
+--------------------+--------------------+--------------------+
|{2, May, Derrick,...|{1, Carter, Derri...|{3, Mills, Jeff, 80}|
+--------------------+--------------------+--------------------+



### 5. TriangleCount and PageRank

In [0]:
g.triangleCount().show()

+-----+---+------+---------+---+
|count| id|  name|firstname|age|
+-----+---+------+---------+---+
|    1|  1|Carter|  Derrick| 50|
|    1|  2|   May|  Derrick| 26|
|    2|  3| Mills|     Jeff| 80|
|    1|  4|  Hood|   Robert| 65|
|    1|  5| Banks|     Mike| 93|
|    0| 98|  Berg|      Tim| 28|
|    0| 99|  Page|    Allan| 16|
+-----+---+------+---------+---+



In [0]:
pr = g.pageRank(resetProbability=0.15, tol=0.01)
## look at the pagerank score for every vertex
pr.vertices.show()
## look at the weight of every edge
pr.edges.show()

+---+------+---------+---+------------------+
| id|  name|firstname|age|          pagerank|
+---+------+---------+---+------------------+
|  3| Mills|     Jeff| 80| 1.853919642738813|
| 98|  Berg|      Tim| 28|1.0225331112091938|
| 99|  Page|    Allan| 16|1.0225331112091938|
|  5| Banks|     Mike| 93|0.9703579134677663|
|  1|Carter|  Derrick| 50|0.9055074972891308|
|  4|  Hood|   Robert| 65|0.6873519241384106|
|  2|   May|  Derrick| 26|0.5377967999474921|
+---+------+---------+---+------------------+

+---+---+-------+------------------+
|src|dst|   type|            weight|
+---+---+-------+------------------+
|  2|  1| friend|               0.5|
|  3|  1| friend|0.3333333333333333|
|  1|  2| friend|               0.5|
|  1|  3| friend|               0.5|
|  5|  3| friend|               1.0|
|  4|  3| friend|               0.5|
|  2|  3|follows|               0.5|
|  3|  4| friend|0.3333333333333333|
|  4|  5|follows|               0.5|
|  3|  5| friend|0.3333333333333333|
| 99| 98| fr