# Graphs in Spark

## Setup

In [0]:
import os
java8_location= '/usr/lib/jvm/java-8-openjdk-amd64'
os.environ['JAVA_HOME'] = java8_location

In [2]:
!pip install pyspark==2.3.0
!pip install graphframes

Collecting pyspark==2.3.0
[?25l  Downloading https://files.pythonhosted.org/packages/58/49/45370cc153a6adcf2c304a3c06e801ed3c9650d0f852e7fde04bd8ffb534/pyspark-2.3.0.tar.gz (211.9MB)
[K     |████████████████████████████████| 211.9MB 64.5MB/s 
[?25hCollecting py4j==0.10.6 (from pyspark==2.3.0)
[?25l  Downloading https://files.pythonhosted.org/packages/4a/08/162710786239aa72bd72bb46c64f2b02f54250412ba928cb373b30699139/py4j-0.10.6-py2.py3-none-any.whl (189kB)
[K     |████████████████████████████████| 194kB 41.8MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.3.0-py2.py3-none-any.whl size=212271917 sha256=9f64e8e05a95268aa89f8d8607ad6eaff9328cba19ce9b2eb260ead8df0634b8
  Stored in directory: /root/.cache/pip/wheels/d9/db/ff/e6f3a8a564163ea64bc2072357e77b3404d10f91be48352796
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed

In [0]:
os.environ["PYSPARK_SUBMIT_ARGS"] = ("--packages graphframes:graphframes:0.6.0-spark2.3-s_2.11 pyspark-shell")

In [0]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf = conf)

### Creating RDD from file 
- A recap for Google Colab 

In [0]:
rdd_from_file = sc.textFile('/content/alice.txt', 4)
rdd_from_file.take(3)

['CHAPTER I. Down the Rabbit-Hole',
 '',
 'Alice was beginning to get very tired of sitting by her sister on the']

## Triangle Counting (A Naive implementation)
<a href='https://photos.google.com/share/AF1QipMejtpuky8_1NPYB9dtsCAupkdjc2XemFIPCDBVplUkNLe3goIyK14PHPlmyl2nSw?key=Q3JQTUwxdnNqVDNSalBZX0xwcFczeGdOSWJwVS1R&source=ctrlq.org'><img src='https://lh3.googleusercontent.com/J5uyrKLu98hMGu-RFY3rFh2fjG9NqedC-5uJQtdhO1SiItr4x67qkd0Eu3EB4H8_GKSs6g26xpPARWwIJM1KJrT1IV5jmti_Zva5dCUg4D8nGVm1tO685CssYK7OS9YexYN9AKVaFpw=w2400' /></a>

In [0]:
edges = sc.parallelize([('alice','bob'),('alice','doris'),('bob','emily'),
                        ('chris','alice'),('chris','doris'),('doris','bob'),
                        ('doris','emily'),('fiona','doris')])
edges.collect()

[('alice', 'bob'),
 ('alice', 'doris'),
 ('bob', 'emily'),
 ('chris', 'alice'),
 ('chris', 'doris'),
 ('doris', 'bob'),
 ('doris', 'emily'),
 ('fiona', 'doris')]

In [0]:
edges2nodes = edges.flatMap(lambda x: x)
edges2nodes.collect()

['alice',
 'bob',
 'alice',
 'doris',
 'bob',
 'emily',
 'chris',
 'alice',
 'chris',
 'doris',
 'doris',
 'bob',
 'doris',
 'emily',
 'fiona',
 'doris']

In [0]:
nodes = edges2nodes.distinct()
nodes.collect()

['alice', 'bob', 'doris', 'emily', 'chris', 'fiona']

In [0]:
t1 = edges.join(edges)
t1.collect()

[('bob', ('emily', 'emily')),
 ('doris', ('bob', 'bob')),
 ('doris', ('bob', 'emily')),
 ('doris', ('emily', 'bob')),
 ('doris', ('emily', 'emily')),
 ('alice', ('bob', 'bob')),
 ('alice', ('bob', 'doris')),
 ('alice', ('doris', 'bob')),
 ('alice', ('doris', 'doris')),
 ('chris', ('alice', 'alice')),
 ('chris', ('alice', 'doris')),
 ('chris', ('doris', 'alice')),
 ('chris', ('doris', 'doris')),
 ('fiona', ('doris', 'doris'))]

In [0]:
t2 = t1.filter(lambda x:x[1][0] !=x[1][1])
t2.collect()

[('doris', ('bob', 'emily')),
 ('doris', ('emily', 'bob')),
 ('alice', ('bob', 'doris')),
 ('alice', ('doris', 'bob')),
 ('chris', ('alice', 'doris')),
 ('chris', ('doris', 'alice'))]

In [0]:
def order_nodes(x):
  if x[0] < x[1]:
    return (x[0],x[1])
  else:
    return (x[1],x[0])

## Why do we need `order_nodes`?

- `('bob', ('emily', 'doris'))` & `('bob', ('doris', 'emily'))` are the same.
- changing them both to `('bob', ('doris', 'emily'))` allows us to eliminate duplicates

In [0]:
t3 = t2.mapValues(order_nodes)
t3.collect()

[('doris', ('bob', 'emily')),
 ('doris', ('bob', 'emily')),
 ('alice', ('bob', 'doris')),
 ('alice', ('bob', 'doris')),
 ('chris', ('alice', 'doris')),
 ('chris', ('alice', 'doris'))]

In [0]:
t4 = t3.distinct()
t4.collect()

[('chris', ('alice', 'doris')),
 ('doris', ('bob', 'emily')),
 ('alice', ('bob', 'doris'))]

In [0]:
count_triangles = t4.count()
count_triangles # count() is an action

3

### A Fast Triangle Counting Algorithm (Heavy-hitter nodes)

<a href='https://photos.google.com/share/AF1QipOVDr6R-9AaJnMWdvtMOrYAPQuxrbw3Ibk3CcItK5WBCFGeMNY_GEOwqCHYa1yR1A?key=b1ZQbWdjYzM0YmhKNkF6TEl2Um04YlIyYWxrVDJn&source=ctrlq.org'><img src='https://lh3.googleusercontent.com/vCwj_9zWYYmGcs_24HfqmnO1X38pCkxm-It-fCfRE6AXZvfy4HYWUe6iBVzE6ew8Iov2371YoQ_WfvNwd_tHN3Dp6Zgvqh2RUDaOhmMSSolADK1tJrW-w1Pngt72F-4Vde7FWVrMX98=w2400' /></a>

In [0]:
t_graph = sc.parallelize([(1,2),(1,4),(1,5),(2,3),(2,4),(2,6),(3,4),(3,5),(3,6)]) 
                          # undirected graph
                          # n nodes, m edges, m>=n, nodes are numbered (integers)
t_graph.collect()

[(1, 2), (1, 4), (1, 5), (2, 3), (2, 4), (2, 6), (3, 4), (3, 5), (3, 6)]

In [0]:
nodes = t_graph.flatMap(lambda x: x).distinct()
nodes.collect()

[1, 2, 4, 5, 3, 6]

In [0]:
degrees = t_graph.flatMap(lambda x: x).map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y)
degrees.collect()

[(1, 3), (2, 4), (4, 3), (5, 2), (3, 4), (6, 2)]

In [0]:
nodes_sorted_by_degrees = degrees.sortBy(lambda x: x[1])
nodes_sorted_by_degrees.collect()

[(5, 2), (6, 2), (1, 3), (4, 3), (2, 4), (3, 4)]

In [0]:
import math
cutoff = math.sqrt(t_graph.count()) # heavy hitter nodes: nodes with degree >= sqrt(m)
cutoff

3.0

In [0]:
heavy_hitter_nodes_with_degrees = nodes_sorted_by_degrees.filter(lambda x: x[1]>=cutoff)
heavy_hitter_nodes_with_degrees.collect()

[(1, 3), (4, 3), (2, 4), (3, 4)]

## GraphFrames

### Basic Operations

In [0]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

<a href='https://photos.google.com/share/AF1QipNQedye6kYIw0b6i3SsOpt7PH0lh0VElgwRNtMkpWZZkm6KGFEtRoZletAvSacFzw?key=bE4tVXg0VUNZZmgtOE5aN0tJN2NBSFpfdXJjOHFR&source=ctrlq.org'><img src='https://lh3.googleusercontent.com/fKwYuJ3--I-9ck7aKm-Pq0FGRShMqsk5aFAelPar5y_7DIfytdxLw4XhARxHPM_N5PknomhV7QDqFvOkXfL1Jzov1kqHsK-cBDO3J93aWqHVrG8TfGg1RtBoByRM0NyIRfWp9kD3V7g=w2400' /></a>

In [0]:
vertices = sqlContext.createDataFrame([("a", "Alice", 34), ("b", "Bob", 36), ("c", "Charlie", 30),
                                       ("d", "David", 29), ("e", "Esther", 32), ("f", "Faith", 36),
                                       ("g", "Gabby", 60)],
                                      ["id", "name", "age"])

In [0]:
edges = sqlContext.createDataFrame([("a", "b", "friend"), ("b", "c", "follow"), ("c", "b", "follow"),
                                    ("f", "c", "follow"), ("e", "f", "follow"), ("e", "d", "friend"),
                                    ("d", "a", "friend"), ("a", "e", "friend")],
                                   ["src", "dst", "relationship"])

In [0]:
from graphframes import GraphFrame
g = GraphFrame(vertices, edges)

In [0]:
g

GraphFrame(v:[id: string, name: string ... 1 more field], e:[src: string, dst: string ... 1 more field])

In [0]:
g.vertices.show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  a|  Alice| 34|
|  b|    Bob| 36|
|  c|Charlie| 30|
|  d|  David| 29|
|  e| Esther| 32|
|  f|  Faith| 36|
|  g|  Gabby| 60|
+---+-------+---+



In [0]:
g.edges.show()

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
|  b|  c|      follow|
|  c|  b|      follow|
|  f|  c|      follow|
|  e|  f|      follow|
|  e|  d|      friend|
|  d|  a|      friend|
|  a|  e|      friend|
+---+---+------------+



In [17]:
g.inDegrees.show()

+---+--------+
| id|inDegree|
+---+--------+
|  1|       1|
|  3|       1|
|  4|       1|
|  2|       1|
+---+--------+



In [18]:
g.outDegrees.show()

+---+---------+
| id|outDegree|
+---+---------+
|  1|        1|
|  4|        1|
|  2|        2|
+---+---------+



In [0]:
follows = g.edges.filter("relationship = 'follow'")
follows.collect()

[Row(src='b', dst='c', relationship='follow'),
 Row(src='c', dst='b', relationship='follow'),
 Row(src='f', dst='c', relationship='follow'),
 Row(src='e', dst='f', relationship='follow')]

In [0]:
num_follows = follows.count()
num_follows

4

### Triangle Counting (triangles passing through vertices)

In [19]:
triangles_in_g = g.triangleCount()
triangles_in_g.collect()

[Row(count=1, id=1),
 Row(count=0, id=3),
 Row(count=1, id=4),
 Row(count=1, id=2)]

### Finding Motifs

<a href='https://photos.google.com/share/AF1QipMZdJym8QBDFa7uZQefYozKdR04YVCuvOkMEHWsCifBYRvkOP0_kZN1_fb0EY5cug?key=cUdLa2dFdnNsRVNqR0lCZ0FVTU5DckVMQ3dmOVNR&source=ctrlq.org'><img src='https://lh3.googleusercontent.com/mIqUCR37-QvGiYkipJCOQeAZgeXY4v3aqAD_JZtdJhUL4d-USrHu0AWF0sAmDh_dEW0kgl8ZxiX_VM1Xo9sC1e4VBEvbKwkvW1Z8Ly5IGqpCz8sPgQWKAmmhvBYgKK5BmTzDBymhXCc=w2400' /></a>

(Image Source: Wikipedia)

In [0]:
motifs = g.find("(v1)-[e1]->(v2); (v2)-[e2]->(v1)")
motifs.show()

+----------------+--------------+----------------+--------------+
|              v1|            e1|              v2|            e2|
+----------------+--------------+----------------+--------------+
|[c, Charlie, 30]|[c, b, follow]|    [b, Bob, 36]|[b, c, follow]|
|    [b, Bob, 36]|[b, c, follow]|[c, Charlie, 30]|[c, b, follow]|
+----------------+--------------+----------------+--------------+



In [0]:
one_sided = g.find("(v1)-[e1]->(v2); !(v2)-[]->(v1)")
one_sided.show()

+---------------+--------------+----------------+
|             v1|            e1|              v2|
+---------------+--------------+----------------+
| [a, Alice, 34]|[a, e, friend]| [e, Esther, 32]|
| [a, Alice, 34]|[a, b, friend]|    [b, Bob, 36]|
| [f, Faith, 36]|[f, c, follow]|[c, Charlie, 30]|
|[e, Esther, 32]|[e, d, friend]|  [d, David, 29]|
|[e, Esther, 32]|[e, f, follow]|  [f, Faith, 36]|
| [d, David, 29]|[d, a, friend]|  [a, Alice, 34]|
+---------------+--------------+----------------+



In [0]:
directed_triangles = g.find("(v1)-[e1]->(v2); (v2)-[e2]->(v3); (v3)-[e3]->(v1)")
directed_triangles.show()

+---------------+--------------+---------------+--------------+---------------+--------------+
|             v1|            e1|             v2|            e2|             v3|            e3|
+---------------+--------------+---------------+--------------+---------------+--------------+
| [d, David, 29]|[d, a, friend]| [a, Alice, 34]|[a, e, friend]|[e, Esther, 32]|[e, d, friend]|
| [a, Alice, 34]|[a, e, friend]|[e, Esther, 32]|[e, d, friend]| [d, David, 29]|[d, a, friend]|
|[e, Esther, 32]|[e, d, friend]| [d, David, 29]|[d, a, friend]| [a, Alice, 34]|[a, e, friend]|
+---------------+--------------+---------------+--------------+---------------+--------------+



### Breadth First Search (BFS)

In [0]:
path_age = g.bfs("name = 'Esther'", "age < 32") # find path between vertex 
                                                # with name "Esther" to a vertex with age < 32
path_age.show()

+---------------+--------------+--------------+
|           from|            e0|            to|
+---------------+--------------+--------------+
|[e, Esther, 32]|[e, d, friend]|[d, David, 29]|
+---------------+--------------+--------------+



In [0]:
paths_follow = g.bfs("name = 'Esther'", "age < 32", edgeFilter="relationship != 'friend'")
paths_follow.show()

+---------------+--------------+--------------+--------------+----------------+
|           from|            e0|            v1|            e1|              to|
+---------------+--------------+--------------+--------------+----------------+
|[e, Esther, 32]|[e, f, follow]|[f, Faith, 36]|[f, c, follow]|[c, Charlie, 30]|
+---------------+--------------+--------------+--------------+----------------+



### Shortest Path

In [0]:
shortest_paths = g.shortestPaths(landmarks=["a"]) # a is the destination
shortest_path_results = shortest_paths.select("id", "distances")
shortest_path_results.show()

+---+---------+
| id|distances|
+---+---------+
|  g|       []|
|  b|       []|
|  e| [a -> 2]|
|  a| [a -> 0]|
|  f|       []|
|  d| [a -> 1]|
|  c|       []|
+---+---------+



### PageRank (static)

In [0]:
pageranker = g.pageRank(resetProbability=0.01, maxIter=3) # resetProbability: probability of resetting to a random vertex
pageranks = pageranker.vertices.select("id", "pagerank")
pageranks.show()

+---+--------------------+
| id|            pagerank|
+---+--------------------+
|  g|0.011647254575707157|
|  b|   2.600278993344426|
|  e| 0.30565336938435944|
|  a| 0.31141876039933447|
|  f|  0.3027995008319468|
|  d|  0.3027995008319468|
|  c|   3.165402620632279|
+---+--------------------+



In [0]:
sorted_pageranks = pageranks.orderBy("pagerank", ascending=False)
sorted_pageranks.show()

+---+--------------------+
| id|            pagerank|
+---+--------------------+
|  c|   3.165402620632279|
|  b|   2.600278993344426|
|  a| 0.31141876039933447|
|  e| 0.30565336938435944|
|  f|  0.3027995008319468|
|  d|  0.3027995008319468|
|  g|0.011647254575707157|
+---+--------------------+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType
spark = SparkSession.builder.appName("Spark").master("local[4]").getOrCreate()

In [0]:
schema = StructType([
    StructField("src", IntegerType(), True),
    StructField("dst", IntegerType(), True)])
edges = spark.read.csv("/content/net.txt",schema=schema)

In [0]:
vertices = edges.select('src').union(edges.select('dst')).withColumnRenamed('src','id').distinct()

In [0]:
from graphframes import GraphFrame
g = GraphFrame(vertices, edges)

In [39]:
g

GraphFrame(v:[id: int], e:[src: int, dst: int])

In [40]:
g.vertices.show()

+---+
| id|
+---+
|  1|
|  3|
|  5|
|  4|
|  2|
+---+



In [41]:
g.edges.show()

+---+---+
|src|dst|
+---+---+
|  1|  2|
|  2|  3|
|  2|  4|
|  4|  1|
|  3|  5|
|  2|  5|
+---+---+



In [32]:
g.inDegrees()

TypeError: ignored

In [0]:
mutual = g.find("(v1)-[e1]->(v2);(v2)-[e2]->(v3);!(v3)-[]->(v1);!(v1)-[]->(v3)")

In [46]:
mutual.show()

+---+------+---+------+---+
| v1|    e1| v2|    e2| v3|
+---+------+---+------+---+
|[1]|[1, 2]|[2]|[2, 5]|[5]|
|[1]|[1, 2]|[2]|[2, 3]|[3]|
+---+------+---+------+---+



In [48]:
mutual.select("v1.id","v3.id").show()

+---+---+
| id| id|
+---+---+
|  1|  5|
|  1|  3|
+---+---+

