# Graph Processing with PySpark

## Graphs
<a href='https://photos.google.com/share/AF1QipMejtpuky8_1NPYB9dtsCAupkdjc2XemFIPCDBVplUkNLe3goIyK14PHPlmyl2nSw?key=Q3JQTUwxdnNqVDNSalBZX0xwcFczeGdOSWJwVS1R&source=ctrlq.org'><img src='https://lh3.googleusercontent.com/J5uyrKLu98hMGu-RFY3rFh2fjG9NqedC-5uJQtdhO1SiItr4x67qkd0Eu3EB4H8_GKSs6g26xpPARWwIJM1KJrT1IV5jmti_Zva5dCUg4D8nGVm1tO685CssYK7OS9YexYN9AKVaFpw=w2400' /></a>
<a href='https://photos.google.com/share/AF1QipNQedye6kYIw0b6i3SsOpt7PH0lh0VElgwRNtMkpWZZkm6KGFEtRoZletAvSacFzw?key=bE4tVXg0VUNZZmgtOE5aN0tJN2NBSFpfdXJjOHFR&source=ctrlq.org'><img src='https://lh3.googleusercontent.com/fKwYuJ3--I-9ck7aKm-Pq0FGRShMqsk5aFAelPar5y_7DIfytdxLw4XhARxHPM_N5PknomhV7QDqFvOkXfL1Jzov1kqHsK-cBDO3J93aWqHVrG8TfGg1RtBoByRM0NyIRfWp9kD3V7g=w2400' /></a>
<a href='http://www.mathcs.emory.edu/~cheung/Courses/171/Syllabus/11-Graph/FIGS/Dijkstra/weight01.gif'><img src='http://www.mathcs.emory.edu/~cheung/Courses/171/Syllabus/11-Graph/FIGS/Dijkstra/weight01.gif' style="width:500px;"/></a>

## GraphFrames

In [0]:
from graphframes import *

<a href='https://photos.google.com/share/AF1QipNQedye6kYIw0b6i3SsOpt7PH0lh0VElgwRNtMkpWZZkm6KGFEtRoZletAvSacFzw?key=bE4tVXg0VUNZZmgtOE5aN0tJN2NBSFpfdXJjOHFR&source=ctrlq.org'><img src='https://lh3.googleusercontent.com/fKwYuJ3--I-9ck7aKm-Pq0FGRShMqsk5aFAelPar5y_7DIfytdxLw4XhARxHPM_N5PknomhV7QDqFvOkXfL1Jzov1kqHsK-cBDO3J93aWqHVrG8TfGg1RtBoByRM0NyIRfWp9kD3V7g=w2400' /></a>

### Create Graph

In [0]:
# Vertex table of the demo social graph: one (id, name, age) row per person.
# The ids defined here are the values the edge table refers to in src/dst.
vertices = sqlContext.createDataFrame(
    data=[
        ("a", "Alice", 34),
        ("b", "Bob", 36),
        ("c", "Charlie", 30),
        ("d", "David", 29),
        ("e", "Esther", 32),
        ("f", "Faith", 36),
        ("g", "Gabby", 60),
    ],
    schema=["id", "name", "age"],
)

In [0]:
# Edge table of the demo social graph: each row is (src, dst, relationship),
# where src and dst are vertex ids and relationship is 'friend' or 'follow'.
edges = sqlContext.createDataFrame(
    data=[
        ("a", "b", "friend"),
        ("b", "c", "follow"),
        ("c", "b", "follow"),
        ("f", "c", "follow"),
        ("e", "f", "follow"),
        ("e", "d", "friend"),
        ("d", "a", "friend"),
        ("a", "e", "friend"),
    ],
    schema=["src", "dst", "relationship"],
)

In [0]:
# Assemble the graph from the vertex and edge DataFrames, then print its
# summary (the schemas of the two underlying tables).
g = GraphFrame(v=vertices, e=edges)
print(g)

GraphFrame(v:[id: string, name: string ... 1 more field], e:[src: string, dst: string ... 1 more field])


### Common Graph Operations

<a href='https://photos.google.com/share/AF1QipNQedye6kYIw0b6i3SsOpt7PH0lh0VElgwRNtMkpWZZkm6KGFEtRoZletAvSacFzw?key=bE4tVXg0VUNZZmgtOE5aN0tJN2NBSFpfdXJjOHFR&source=ctrlq.org'><img src='https://lh3.googleusercontent.com/fKwYuJ3--I-9ck7aKm-Pq0FGRShMqsk5aFAelPar5y_7DIfytdxLw4XhARxHPM_N5PknomhV7QDqFvOkXfL1Jzov1kqHsK-cBDO3J93aWqHVrG8TfGg1RtBoByRM0NyIRfWp9kD3V7g=w2400' /></a>

In [0]:
# Show the graph's vertex DataFrame (columns: id, name, age).
display(g.vertices)

id,name,age
a,Alice,34
b,Bob,36
c,Charlie,30
d,David,29
e,Esther,32
f,Faith,36
g,Gabby,60


In [0]:
# Show the graph's edge DataFrame (columns: src, dst, relationship).
display(g.edges)

src,dst,relationship
a,b,friend
b,c,follow
c,b,follow
f,c,follow
e,f,follow
e,d,friend
d,a,friend
a,e,friend


In [0]:
# Number of incoming edges per vertex. Vertices without any incoming edge
# (here "g") do not appear in the result.
display(g.inDegrees)

id,inDegree
b,2
c,2
f,1
d,1
a,1
e,1


In [0]:
# Number of outgoing edges per vertex. Vertices without any outgoing edge
# (here "g") do not appear in the result.
display(g.outDegrees)

id,outDegree
a,2
b,1
c,1
f,1
e,2
d,1


In [0]:
# Total number of edges touching each vertex (incoming plus outgoing).
# Vertices with no edges at all (here "g") do not appear in the result.
display(g.degrees)

id,degree
b,3
a,3
c,3
f,2
e,3
d,2


### Queries on Graph

<a href='https://photos.google.com/share/AF1QipNQedye6kYIw0b6i3SsOpt7PH0lh0VElgwRNtMkpWZZkm6KGFEtRoZletAvSacFzw?key=bE4tVXg0VUNZZmgtOE5aN0tJN2NBSFpfdXJjOHFR&source=ctrlq.org'><img src='https://lh3.googleusercontent.com/fKwYuJ3--I-9ck7aKm-Pq0FGRShMqsk5aFAelPar5y_7DIfytdxLw4XhARxHPM_N5PknomhV7QDqFvOkXfL1Jzov1kqHsK-cBDO3J93aWqHVrG8TfGg1RtBoByRM0NyIRfWp9kD3V7g=w2400' /></a>

In [0]:
# Keep only the edges whose relationship is 'follow', then count them.
# NOTE: despite its name, numFollows holds the filtered edge DataFrame; the
# actual number of 'follow' edges is the value of the last expression.
numFollows = g.edges.where("relationship = 'follow'")
numFollows.count()

Out[20]: 4

In [0]:
# Motif query: match every pair of vertices (a, b) joined by an edge in each
# direction -- e1 goes a -> b and e2 goes b -> a. Each match is reported once
# per orientation, so a mutual pair shows up as two rows.
motifs = g.find(pattern="(a)-[e1]->(b); (b)-[e2]->(a)")
display(motifs)

a,e1,b,e2
"List(c, Charlie, 30)","List(c, b, follow)","List(b, Bob, 36)","List(b, c, follow)"
"List(b, Bob, 36)","List(b, c, follow)","List(c, Charlie, 30)","List(c, b, follow)"


In [0]:
# Narrow the motif matches by age: keep a row when vertex b is older than 38
# or vertex a is older than 30.
filtered = motifs.where("b.age > 38 or a.age > 30")
display(filtered)

a,e1,b,e2
"List(b, Bob, 36)","List(b, c, follow)","List(c, Charlie, 30)","List(c, b, follow)"


In [0]:
# Derive a subgraph of g in three steps:
#   1. keep only the 'friend' edges,
#   2. keep only the vertices with age > 30,
#   3. drop every vertex that is left without an edge.
g2 = (
    g.filterEdges("relationship = 'friend'")
    .filterVertices("age > 30")
    .dropIsolatedVertices()
)

In [0]:
# Show the vertices that remain in the filtered subgraph g2.
display(g2.vertices)

id,name,age
a,Alice,34
b,Bob,36
e,Esther,32


In [0]:
# Show the edges that remain in the filtered subgraph g2.
display(g2.edges)

src,dst,relationship
a,b,friend
a,e,friend


### Shortest Path

<a href='https://networkx.guide/assets/images/shortest-path-example-510eddb7868bfa24c0ea29f009894d0a.jpg'><img src='https://networkx.guide/assets/images/shortest-path-example-510eddb7868bfa24c0ea29f009894d0a.jpg' style="width:600px;"/></a>

<a href='https://photos.google.com/share/AF1QipNQedye6kYIw0b6i3SsOpt7PH0lh0VElgwRNtMkpWZZkm6KGFEtRoZletAvSacFzw?key=bE4tVXg0VUNZZmgtOE5aN0tJN2NBSFpfdXJjOHFR&source=ctrlq.org'><img src='https://lh3.googleusercontent.com/fKwYuJ3--I-9ck7aKm-Pq0FGRShMqsk5aFAelPar5y_7DIfytdxLw4XhARxHPM_N5PknomhV7QDqFvOkXfL1Jzov1kqHsK-cBDO3J93aWqHVrG8TfGg1RtBoByRM0NyIRfWp9kD3V7g=w2400' /></a>

In [0]:
# Computes shortest paths from each vertex to the given set of landmark
# vertices, where landmarks are specified by vertex ID. Note that this takes
# edge direction into account.
# In the output, "distances" maps each reachable landmark to the length of
# the shortest path to it; it is empty ({}) for vertices that cannot reach
# the landmark.

results = g.shortestPaths(landmarks=["e"])
results.select("id", "distances").show()

+---+---------+
| id|distances|
+---+---------+
|  g|       {}|
|  f|       {}|
|  e| {e -> 0}|
|  d| {e -> 2}|
|  c|       {}|
|  b|       {}|
|  a| {e -> 1}|
+---+---------+



### Community Detection Algorithm: Label Propagation (LPA)

<a href='https://neo4j.com/docs/graph-data-science/current/_images/louvain-multilevel-graph.svg'><img src='https://neo4j.com/docs/graph-data-science/current/_images/louvain-multilevel-graph.svg' style="width:600px;"/></a>

<a href='https://photos.google.com/share/AF1QipNQedye6kYIw0b6i3SsOpt7PH0lh0VElgwRNtMkpWZZkm6KGFEtRoZletAvSacFzw?key=bE4tVXg0VUNZZmgtOE5aN0tJN2NBSFpfdXJjOHFR&source=ctrlq.org'><img src='https://lh3.googleusercontent.com/fKwYuJ3--I-9ck7aKm-Pq0FGRShMqsk5aFAelPar5y_7DIfytdxLw4XhARxHPM_N5PknomhV7QDqFvOkXfL1Jzov1kqHsK-cBDO3J93aWqHVrG8TfGg1RtBoByRM0NyIRfWp9kD3V7g=w2400' /></a>

In [0]:
# Run label propagation on g with maxIter=5 and show the label assigned to
# each vertex. Presumably vertices that share a label form one community
# (per the section heading) -- confirm against the GraphFrames documentation.
result = g.labelPropagation(maxIter=5)
result.select("id", "label").show()

+---+-------------+
| id|        label|
+---+-------------+
|  g| 146028888064|
|  f|1382979469312|
|  e|1460288880640|
|  d|1460288880640|
|  c|1382979469312|
|  b|1047972020224|
|  a|1382979469312|
+---+-------------+



<a href='https://neo4j.com/docs/graph-data-science/current/_images/example-graphs/label-propagation-graph.svg'><img src='https://neo4j.com/docs/graph-data-science/current/_images/example-graphs/label-propagation-graph.svg' /></a>

In [0]:
# Second example graph for label propagation.
# It is built under its own names (lpa_vertices / lpa_edges) instead of
# rebinding `vertices` and `edges`: those two DataFrames back the first graph
# `g`, and overwriting them would make a re-run of the earlier
# `g = GraphFrame(vertices, edges)` cell silently build this graph instead.
lpa_vertices = sqlContext.createDataFrame([
  ("a", "alice"),
  ("mi", "micheal"),
  ("b", "bridget"),
  ("c", "charles"),
  ("d", "doug"),
  ("ma", "mark"),
  ], ["id", "name"])
# 'follow' edges between the vertices above, given as (src, dst, relationship).
lpa_edges = sqlContext.createDataFrame([
  ("a", "mi", "follow"),
  ("mi", "a", "follow"),
  ("mi", "b", "follow"),
  ("b", "mi", "follow"),
  ("a", "b", "follow"),
  ("b", "a", "follow"),
  ("a", "c", "follow"),
  ("c", "d", "follow"),
  ("ma", "d", "follow"),
  ("d", "ma", "follow"),
], ["src", "dst", "relationship"])

g3 = GraphFrame(lpa_vertices, lpa_edges)
print(g3)
# Run label propagation with maxIter=5 and show the label given to each vertex.
result = g3.labelPropagation(maxIter=5)
result.select("id", "label").show()

GraphFrame(v:[id: string, name: string], e:[src: string, dst: string ... 1 more field])
+---+-------------+
| id|        label|
+---+-------------+
|  d|1142461300736|
|  c|1460288880640|
| ma| 807453851648|
|  b|1460288880640|
| mi|1382979469312|
|  a|1382979469312|
+---+-------------+



### Breadth-First Search

<a href='https://cdn.educba.com/academy/wp-content/uploads/2020/03/Breadth-First-Search-3.jpg'><img src='https://cdn.educba.com/academy/wp-content/uploads/2020/03/Breadth-First-Search-3.jpg' style="width:600px;"/></a>

<a href='https://photos.google.com/share/AF1QipNQedye6kYIw0b6i3SsOpt7PH0lh0VElgwRNtMkpWZZkm6KGFEtRoZletAvSacFzw?key=bE4tVXg0VUNZZmgtOE5aN0tJN2NBSFpfdXJjOHFR&source=ctrlq.org'><img src='https://lh3.googleusercontent.com/fKwYuJ3--I-9ck7aKm-Pq0FGRShMqsk5aFAelPar5y_7DIfytdxLw4XhARxHPM_N5PknomhV7QDqFvOkXfL1Jzov1kqHsK-cBDO3J93aWqHVrG8TfGg1RtBoByRM0NyIRfWp9kD3V7g=w2400' /></a>

In [0]:
# Breadth-first search: find paths from the vertex named 'Esther' to
# vertices with age < 32.
paths = g.bfs(fromExpr="name = 'Esther'", toExpr="age < 32")
paths.show()

# Same search, restricted to edges that are not 'friend' edges and to paths
# of at most 3 edges.
paths = g.bfs(
    fromExpr="name = 'Esther'",
    toExpr="age < 32",
    edgeFilter="relationship != 'friend'",
    maxPathLength=3,
)
paths.show()

+---------------+--------------+--------------+
|           from|            e0|            to|
+---------------+--------------+--------------+
|{e, Esther, 32}|{e, d, friend}|{d, David, 29}|
+---------------+--------------+--------------+

+---------------+--------------+--------------+--------------+----------------+
|           from|            e0|            v1|            e1|              to|
+---------------+--------------+--------------+--------------+----------------+
|{e, Esther, 32}|{e, f, follow}|{f, Faith, 36}|{f, c, follow}|{c, Charlie, 30}|
+---------------+--------------+--------------+--------------+----------------+



### Triangle Counting

<a href='https://photos.google.com/share/AF1QipNQedye6kYIw0b6i3SsOpt7PH0lh0VElgwRNtMkpWZZkm6KGFEtRoZletAvSacFzw?key=bE4tVXg0VUNZZmgtOE5aN0tJN2NBSFpfdXJjOHFR&source=ctrlq.org'><img src='https://lh3.googleusercontent.com/fKwYuJ3--I-9ck7aKm-Pq0FGRShMqsk5aFAelPar5y_7DIfytdxLw4XhARxHPM_N5PknomhV7QDqFvOkXfL1Jzov1kqHsK-cBDO3J93aWqHVrG8TfGg1RtBoByRM0NyIRfWp9kD3V7g=w2400' /></a>

In [0]:
# Run triangle counting on g and show, for every vertex id, the resulting
# "count" column (per the section heading: the number of triangles the
# vertex takes part in).
results = g.triangleCount()
results.select("id", "count").show()

+---+-----+
| id|count|
+---+-----+
|  a|    1|
|  b|    0|
|  c|    0|
|  d|    1|
|  e|    1|
|  f|    0|
|  g|    0|
+---+-----+



<a href='https://photos.google.com/share/AF1QipMejtpuky8_1NPYB9dtsCAupkdjc2XemFIPCDBVplUkNLe3goIyK14PHPlmyl2nSw?key=Q3JQTUwxdnNqVDNSalBZX0xwcFczeGdOSWJwVS1R&source=ctrlq.org'><img src='https://lh3.googleusercontent.com/J5uyrKLu98hMGu-RFY3rFh2fjG9NqedC-5uJQtdhO1SiItr4x67qkd0Eu3EB4H8_GKSs6g26xpPARWwIJM1KJrT1IV5jmti_Zva5dCUg4D8nGVm1tO685CssYK7OS9YexYN9AKVaFpw=w2400' /></a>

In [0]:
# Second example graph for triangle counting.
# It is built under its own names (tri_vertices / tri_edges) instead of
# rebinding `vertices` and `edges`: those names back the first graph `g`, and
# overwriting them would make a re-run of the earlier
# `g = GraphFrame(vertices, edges)` cell silently build this graph instead.
tri_vertices = sqlContext.createDataFrame([
  ("a", "alice"),
  ("b", "bob"),
  ("c", "chris"),
  ("d", "doris"),
  ("e", "emily"),
  ("f", "fiona")], ["id", "name"])

# Every connection is listed once per direction, e.g. ("a", "b") and
# ("b", "a"): 8 connected pairs, 16 edge rows.
tri_edges = sqlContext.createDataFrame([
  ("a", "b"),
  ("a", "c"),
  ("a", "d"),
  ("b", "a"),
  ("b", "d"),
  ("b", "e"),
  ("c", "a"),
  ("c", "d"),
  ("d", "a"),
  ("d", "b"),
  ("d", "c"),
  ("d", "e"),
  ("d", "f"),
  ("e", "b"),
  ("e", "d"),
  ("f", "d")], ["src", "dst"])

g4 = GraphFrame(tri_vertices, tri_edges)
print(g4)

# Count the triangles each vertex of g4 takes part in.
results = g4.triangleCount()
results.select("id", "count").show()

GraphFrame(v:[id: string, name: string], e:[src: string, dst: string])
+---+-----+
| id|count|
+---+-----+
|  a|    2|
|  b|    2|
|  c|    1|
|  d|    3|
|  e|    1|
|  f|    0|
+---+-----+

