# IS622 Week 14 Discussion
## Counting triangles with PySpark / MapReduce
### Brian Chu | Nov. 29, 2015

In [1]:
import glob
import os
import sys

# Root of the Spark 1.5.1 distribution. An existing SPARK_HOME environment variable
# takes precedence, so the notebook can run on another machine without editing this cell.
SPARK_HOME = os.environ.get(
    'SPARK_HOME', "/home/brian/workspace/cuny_msda_is622/spark-1.5.1-bin-hadoop2.6")
os.environ['SPARK_HOME'] = SPARK_HOME

# Make pyspark and its bundled py4j importable. The py4j zip is located by pattern
# instead of a hard-coded version number, so other Spark releases also work.
SPARK_PYTHON = os.path.join(SPARK_HOME, "python")
sys.path.append(SPARK_PYTHON)
sys.path.extend(sorted(glob.glob(os.path.join(SPARK_PYTHON, "lib", "py4j-*-src.zip"))))

# Launch Spark: run the pyspark shell bootstrap in this namespace; per the banner it
# prints, it makes `sc` and `sqlContext` available. exec(compile(...)) replaces the
# Python-2-only execfile() and works on both Python 2 and 3.
SHELL_PY = os.path.join(SPARK_PYTHON, "pyspark", "shell.py")
with open(SHELL_PY) as shell_file:
    exec(compile(shell_file.read(), SHELL_PY, 'exec'))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.1
      /_/

Using Python version 2.7.10 (default, Oct 14 2015 16:09:02)
SparkContext available as sc, HiveContext available as sqlContext.


In [2]:
# Explicit SQLContext, used below to turn the edge RDD into a DataFrame.
# NOTE(review): the shell startup already exposes a HiveContext as `sqlContext`
# (see the banner above); this creates a second, plain SQLContext on the same `sc`.
from pyspark.sql import SQLContext
sqlc = SQLContext(sc)

### Define edges and alias them for the upcoming self-join

In [3]:
# Figure 10-1 example
e1 = [('A','B'), ('A','C'), ('B','C'), ('B','D'), ('D','E'), ('D','F'), ('D','G'), ('E','F'), ('G','F')]
e1dist = sc.parallelize(e1)

In [4]:
# Convert to PySpark RDDs
# e1dist is already an RDD; round-tripping it through a DataFrame turns each
# ('A', 'B') tuple into a Row(_1=u'A', _2=u'B'), as the outputs below show.
e1df = sqlc.createDataFrame(e1dist)
e1rdd = e1df.rdd
# Alias, not a copy: the same edge RDD plays both sides of the self-join.
e2rdd = e1rdd

### Define node key to join edge pairs
Some theory and source code borrowed or modified from:  
https://class.coursera.org/datasci-001/lecture/85  
http://dataorigami.net/blogs/napkin-folding/18433407-joins-in-mapreduce-pt-2-generalizing-joins  

In [5]:
# Key each edge by the node it contributes to the join:
# e1 edges by their source (first column), e2 edges by their target (second column).
e1key = e1rdd.keyBy(lambda edge: edge[0])
e2key = e2rdd.keyBy(lambda edge: edge[1])

In [6]:
# Examine values
e1key.collect()

[(u'A', Row(_1=u'A', _2=u'B')),
 (u'A', Row(_1=u'A', _2=u'C')),
 (u'B', Row(_1=u'B', _2=u'C')),
 (u'B', Row(_1=u'B', _2=u'D')),
 (u'D', Row(_1=u'D', _2=u'E')),
 (u'D', Row(_1=u'D', _2=u'F')),
 (u'D', Row(_1=u'D', _2=u'G')),
 (u'E', Row(_1=u'E', _2=u'F')),
 (u'G', Row(_1=u'G', _2=u'F'))]

In [7]:
# Examine values
e2key.collect()

[(u'B', Row(_1=u'A', _2=u'B')),
 (u'C', Row(_1=u'A', _2=u'C')),
 (u'C', Row(_1=u'B', _2=u'C')),
 (u'D', Row(_1=u'B', _2=u'D')),
 (u'E', Row(_1=u'D', _2=u'E')),
 (u'F', Row(_1=u'D', _2=u'F')),
 (u'G', Row(_1=u'D', _2=u'G')),
 (u'F', Row(_1=u'E', _2=u'F')),
 (u'F', Row(_1=u'G', _2=u'F'))]

### Tag each tuple with an additional x/y value to record which relation the edge came from

In [8]:
# Tag every edge with the relation it came from ('x' = e1, 'y' = e2).
# mapValues keeps the node key untouched and, unlike the `lambda (k, v):`
# tuple-parameter form (removed in Python 3), runs on both Python 2 and 3.
e1map = e1key.mapValues(lambda v: ('x', v))
e2map = e2key.mapValues(lambda v: ('y', v))

In [9]:
e1map.collect()

[(u'A', ('x', Row(_1=u'A', _2=u'B'))),
 (u'A', ('x', Row(_1=u'A', _2=u'C'))),
 (u'B', ('x', Row(_1=u'B', _2=u'C'))),
 (u'B', ('x', Row(_1=u'B', _2=u'D'))),
 (u'D', ('x', Row(_1=u'D', _2=u'E'))),
 (u'D', ('x', Row(_1=u'D', _2=u'F'))),
 (u'D', ('x', Row(_1=u'D', _2=u'G'))),
 (u'E', ('x', Row(_1=u'E', _2=u'F'))),
 (u'G', ('x', Row(_1=u'G', _2=u'F')))]

In [10]:
e2map.collect()

[(u'B', ('y', Row(_1=u'A', _2=u'B'))),
 (u'C', ('y', Row(_1=u'A', _2=u'C'))),
 (u'C', ('y', Row(_1=u'B', _2=u'C'))),
 (u'D', ('y', Row(_1=u'B', _2=u'D'))),
 (u'E', ('y', Row(_1=u'D', _2=u'E'))),
 (u'F', ('y', Row(_1=u'D', _2=u'F'))),
 (u'G', ('y', Row(_1=u'D', _2=u'G'))),
 (u'F', ('y', Row(_1=u'E', _2=u'F'))),
 (u'F', ('y', Row(_1=u'G', _2=u'F')))]

### Combine all tuples and group by the node key

In [11]:
# Put both tagged relations into one RDD so a single groupByKey can gather
# every edge that meets at the same node key.
unioned = e1map.union(e2map)
unioned.collect()

[(u'A', ('x', Row(_1=u'A', _2=u'B'))),
 (u'A', ('x', Row(_1=u'A', _2=u'C'))),
 (u'B', ('x', Row(_1=u'B', _2=u'C'))),
 (u'B', ('x', Row(_1=u'B', _2=u'D'))),
 (u'D', ('x', Row(_1=u'D', _2=u'E'))),
 (u'D', ('x', Row(_1=u'D', _2=u'F'))),
 (u'D', ('x', Row(_1=u'D', _2=u'G'))),
 (u'E', ('x', Row(_1=u'E', _2=u'F'))),
 (u'G', ('x', Row(_1=u'G', _2=u'F'))),
 (u'B', ('y', Row(_1=u'A', _2=u'B'))),
 (u'C', ('y', Row(_1=u'A', _2=u'C'))),
 (u'C', ('y', Row(_1=u'B', _2=u'C'))),
 (u'D', ('y', Row(_1=u'B', _2=u'D'))),
 (u'E', ('y', Row(_1=u'D', _2=u'E'))),
 (u'F', ('y', Row(_1=u'D', _2=u'F'))),
 (u'G', ('y', Row(_1=u'D', _2=u'G'))),
 (u'F', ('y', Row(_1=u'E', _2=u'F'))),
 (u'F', ('y', Row(_1=u'G', _2=u'F')))]

In [12]:
# Shuffle step: group all tagged edges by their shared node key.
grouped = unioned.groupByKey()
# Each value is a pyspark ResultIterable whose repr (below) hides its contents;
# grouped.mapValues(list).collect() would show the grouped edges themselves.
grouped.collect()

[(u'A', <pyspark.resultiterable.ResultIterable at 0x7fa0fac5e590>),
 (u'C', <pyspark.resultiterable.ResultIterable at 0x7fa0fabf0290>),
 (u'E', <pyspark.resultiterable.ResultIterable at 0x7fa0fabf0690>),
 (u'G', <pyspark.resultiterable.ResultIterable at 0x7fa0fabf0710>),
 (u'B', <pyspark.resultiterable.ResultIterable at 0x7fa0fabf0750>),
 (u'D', <pyspark.resultiterable.ResultIterable at 0x7fa0fabf0790>),
 (u'F', <pyspark.resultiterable.ResultIterable at 0x7fa0fabf07d0>)]

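### Join function: pair up edges that meet at a common node key
#### This produces the join condition e2.B = e1.A (with A and B the first and second column of each edge: an e2 edge ends where an e1 edge starts)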

In [13]:
def joined(seq):
    """Reduce-side join for the tagged edges of a single node key.

    `seq` is an iterable of (tag, edge) pairs. In this notebook, tag 'x' marks an
    edge keyed by its source node and tag 'y' an edge keyed by its target node;
    pairs with any other tag are ignored.

    Returns a list with one (y_edge, x_edge) tuple per combination, ordered by
    x_edge (outer) and then y_edge (inner), in the order they appear in `seq`.
    """
    tagged = list(seq)  # scanned twice below, so materialize it once
    sources = [edge for tag, edge in tagged if tag == 'x']
    targets = [edge for tag, edge in tagged if tag == 'y']

    pairs = []
    for right in sources:
        for left in targets:
            pairs.append((left, right))
    return pairs

In [14]:
# Reduce step: expand each node's group into its (e2 edge, e1 edge) pairs;
# flatMapValues keeps the node as the key of every emitted pair.
result = grouped.flatMapValues(joined)
sorted(result.collect())

[(u'B', (Row(_1=u'A', _2=u'B'), Row(_1=u'B', _2=u'C'))),
 (u'B', (Row(_1=u'A', _2=u'B'), Row(_1=u'B', _2=u'D'))),
 (u'D', (Row(_1=u'B', _2=u'D'), Row(_1=u'D', _2=u'E'))),
 (u'D', (Row(_1=u'B', _2=u'D'), Row(_1=u'D', _2=u'F'))),
 (u'D', (Row(_1=u'B', _2=u'D'), Row(_1=u'D', _2=u'G'))),
 (u'E', (Row(_1=u'D', _2=u'E'), Row(_1=u'E', _2=u'F'))),
 (u'G', (Row(_1=u'D', _2=u'G'), Row(_1=u'G', _2=u'F')))]

### Filter (reduce) down to the pairs closed by a remaining edge
#### This keeps the pairs where e3.A = e2.A and e3.B = e1.B, i.e. an e3 edge runs from the start of the e2 edge to the end of the e1 edge

In [15]:
# Third relation: a candidate path (a, b), (b, c) is a triangle when the closing
# edge (a, c) is also in the edge list. A set gives O(1) membership per candidate
# instead of scanning the whole list each time.
e3 = set(e1)
matching_edges = result.filter(lambda x: (x[1][0][0], x[1][1][1]) in e3).collect()
sorted(matching_edges)

[(u'B', (Row(_1=u'A', _2=u'B'), Row(_1=u'B', _2=u'C'))),
 (u'E', (Row(_1=u'D', _2=u'E'), Row(_1=u'E', _2=u'F'))),
 (u'G', (Row(_1=u'D', _2=u'G'), Row(_1=u'G', _2=u'F')))]

### Print resulting triangles

In [16]:
# Each matching path (a, b), (b, c) becomes the triangle a-b-c.
tris = sorted((x[1][0][0], x[1][0][1], x[1][1][1]) for x in matching_edges)
# print(...) with a single argument prints the same list on Python 2 and 3.
print([str('-'.join(t)) for t in tris])

['A-B-C', 'D-E-F', 'D-G-F']


## Put all of the above into one function to test other graphs and evaluate runtime

In [17]:
# Input list of edges (as tuples)
def triangles(edges1):
    """Print the triangles of an undirected graph, found with a MapReduce-style self-join.

    Parameters
    ----------
    edges1 : list of 2-element sequences
        Edge list, e.g. [('A', 'B'), ('A', 'C'), ...]. Edges are treated as
        undirected: their orientation, duplicate or reversed copies, and
        self-loops do not change which triangles are reported.

    Prints a sorted list of 'a-b-c' strings, one per triangle, and returns None.
    Relies on the notebook globals `sc` (SparkContext) and `joined`.
    """
    # Plain RDD of edge tuples. The earlier DataFrame round-trip only wrapped them
    # in Rows, and it fails on an empty edge list.
    edges = sc.parallelize(edges1)

    # Map phase: tag each edge with its side of the join and key it by the node that
    # side shares -- the source node for 'x', the target node for 'y'.
    as_x = edges.keyBy(lambda r: r[0]).mapValues(lambda v: ('x', v))
    as_y = edges.keyBy(lambda r: r[1]).mapValues(lambda v: ('y', v))

    # Shuffle + reduce phase: group both sides by node b and pair every (a, b) with
    # every (b, c), i.e. every directed path a -> b -> c.
    paths = as_x.union(as_y).groupByKey().flatMapValues(joined)

    # Closing-edge lookup as a set: O(1) per candidate instead of a list scan.
    edge_set = set(tuple(e) for e in edges1)

    def closes_triangle(keyed_path):
        """True when path a -> b -> c has three distinct nodes and an a-c edge."""
        first, second = keyed_path[1]
        a, b, c = first[0], first[1], second[1]
        if len({a, b, c}) < 3:  # self-loops / a->b->a back-and-forth paths
            return False
        # Check both orientations: in a cyclically listed triangle (a->b, b->c, c->a)
        # only the reverse closing edge exists.
        return (a, c) in edge_set or (c, a) in edge_set

    found = paths.filter(closes_triangle).collect()

    # A cyclic triangle is reached along three paths, and duplicate or reversed edges
    # repeat paths too; keep the first (in sorted order) path per set of nodes.
    candidates = sorted((p[1][0][0], p[1][0][1], p[1][1][1]) for p in found)
    seen = set()
    tris = []
    for t in candidates:
        nodes = frozenset(t)
        if nodes not in seen:
            seen.add(nodes)
            tris.append(t)

    print(['-'.join(str(n) for n in t) for t in tris])


### Figure 10-1

In [18]:
fig10_1 = [('A','B'), ('A','C'), ('B','C'), ('B','D'), ('D','E'), ('D','F'), ('D','G'), ('E','F'), ('G','F')]
triangles(fig10_1)

['A-B-C', 'D-E-F', 'D-G-F']


In [19]:
%%capture
%timeit -n 5 -o triangles(fig10_1)

<TimeitResult : 5 loops, best of 3: 726 ms per loop>

### Figure 10-9

In [20]:
fig10_9 = [('A','B'), ('A','C'), ('B','C'), ('B','H'), ('C','D'), ('D','E'), ('D','F'),
           ('E','F'), ('G','H'), ('G','I'), ('H','I')]
triangles(fig10_9)

['A-B-C', 'D-E-F', 'G-H-I']


In [21]:
%%capture
%timeit -n 5 -o triangles(fig10_9)

<TimeitResult : 5 loops, best of 3: 664 ms per loop>

### Figure 10-2

In [22]:
fig10_2 = [('U1','T1'), ('U1','T2'), ('U1','T3'), ('U1','W1'), ('U1','W2'),
           ('U2','T2'), ('U2','T3'), ('U2','T4'), ('U2','W2'), ('U2','W3'),
           ('T1','W1'), ('T2','W1'), ('T2','W2'), ('T2','W3'),
           ('T3','W2'), ('T4','W2'), ('T4','W3')]

triangles(fig10_2)

['U1-T1-W1', 'U1-T2-W1', 'U1-T2-W2', 'U1-T3-W2', 'U2-T2-W2', 'U2-T2-W3', 'U2-T3-W2', 'U2-T4-W2', 'U2-T4-W3']


In [23]:
%%capture
%timeit -n 5 -o triangles(fig10_2)

<TimeitResult : 5 loops, best of 3: 710 ms per loop>

### Conclusion

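This feels like a fairly clunky and inefficient function with excessive maps and joins. Sure enough, my R function did the triangle counts in roughly 2-3 ms, which is significantly faster than the PySpark function above. The PySpark runtime barely changes between the 9-, 11- and 17-edge graphs (roughly 660-730 ms). That suggests the time is dominated by Spark's fixed job-scheduling overhead rather than by the triangle computation itself. With much larger graphs, that overhead would be amortized, and a distributed PySpark cluster may become the more favorable option.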