# Common Friends MapReduce PySpark

```python
from pyspark.sql import SparkSession

# Reuse the active Spark session (or start one) and grab its SparkContext
sc = SparkSession.builder.getOrCreate().sparkContext
```

Below is the data that we parallelize into a resilient distributed dataset (RDD). Each letter represents a person. In each tuple, the letter on the left is the person and the list on the right is that person's friends. The goal of the MapReduce job is to find the common friends of every pair of people who are friends with each other. Pairs that are not friends, such as A and E, never share a key and so do not appear in the output.

```python
rdd = sc.parallelize([
    ("A", ["B", "C", "D"]),
    ("B", ["A", "C", "D", "E"]),
    ("C", ["A", "B", "D", "E"]),
    ("D", ["A", "B", "C", "E"]),
    ("E", ["B", "C", "D"])
])
```
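The reducer relies on the friend lists being symmetric: if A lists B, then B lists A, so every pair key is emitted exactly twice, once from each person's record. A quick plain-Python check of that assumption on the same data (no Spark needed; `friends` and `is_symmetric` are illustrative names, not part of the original notebook):

```python
# Same adjacency data as the RDD above, as a plain dict (illustrative)
friends = {
    "A": ["B", "C", "D"],
    "B": ["A", "C", "D", "E"],
    "C": ["A", "B", "D", "E"],
    "D": ["A", "B", "C", "E"],
    "E": ["B", "C", "D"],
}


def is_symmetric(adjacency):
    """Return True if every listed friendship appears in both directions."""
    return all(
        person in adjacency.get(friend, [])
        for person, friend_list in adjacency.items()
        for friend in friend_list
    )


print(is_symmetric(friends))  # True
```

If the data were not symmetric, some keys would receive only one list and `reduceByKey` would pass that single list through unchanged instead of intersecting it.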

These are the custom map and reduce functions. The map takes one of the tuples in the list above and returns a list of key-value pairs: for each of the person's friends, the key is the pair (person, friend) and the value is the person's full friend list. For example:

The tuple <br>
B -> A C D E

is mapped to the following key-value pairs <br>
A B -> A C D E <br>
B C -> A C D E <br>
B D -> A C D E <br>
B E -> A C D E

*_Notice that each key is put in alphabetical order. That way, the key (A, B) emitted from A's list and the key (A, B) emitted from B's list are identical, so both values are sent to the same reducer task._*
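Here is the map step in isolation as a plain-Python sketch (no Spark), applied to B's tuple from the example. It builds each key with `tuple(sorted(...))`, which is equivalent to the explicit `if`/`else` comparison in `friends_map`; `map_pairs` is a hypothetical helper name.

```python
def map_pairs(record):
    """Emit ((p1, p2), friend_list) for each friend, with p1 < p2 alphabetically."""
    person, friend_list = record
    return [(tuple(sorted((person, friend))), friend_list) for friend in friend_list]


for key, value in map_pairs(("B", ["A", "C", "D", "E"])):
    print(key, "->", value)
# ('A', 'B') -> ['A', 'C', 'D', 'E']
# ('B', 'C') -> ['A', 'C', 'D', 'E']
# ('B', 'D') -> ['A', 'C', 'D', 'E']
# ('B', 'E') -> ['A', 'C', 'D', 'E']
```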

Conceptually, the reducer receives a key holding two people together with two lists, one friend list from each of them. It returns the intersection of the two lists, which is exactly their common friends. (In Spark, the function passed to `reduceByKey` receives only two values at a time, here the two friend lists, and never sees the key.) For example:

The shuffled input into a reducer might be <br>
A B -> [B C D] [A C D E]

which is reduced to <br>
A B -> C D
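The reduce step is a list intersection. A plain-Python sketch of the example above: `intersect` mirrors the list comprehension in `friends_reduce`, while `intersect_sets` is an alternative (not from the original notebook) that uses set intersection and needs `sorted` to give a deterministic order. Both names are illustrative.

```python
def intersect(x, y):
    """Keep the items of x that also appear in y, preserving x's order."""
    return [a for a in x if a in y]


def intersect_sets(x, y):
    """Set-based alternative; sorted() restores a deterministic order."""
    return sorted(set(x) & set(y))


a_friends = ["B", "C", "D"]
b_friends = ["A", "C", "D", "E"]
print(intersect(a_friends, b_friends))       # ['C', 'D']
print(intersect_sets(a_friends, b_friends))  # ['C', 'D']
```

The list version checks membership with `in` on a list, which is fine for short friend lists; the set version scales better when lists are long.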

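```python
def friends_map(x):
    """Emit ((p1, p2), friend_list) for each friend of x[0], with p1 < p2."""
    person, friend_list = x
    seq = []
    for friend in friend_list:
        # Order the pair alphabetically so both people produce the same key
        if friend < person:
            seq.append(((friend, person), friend_list))
        else:
            seq.append(((person, friend), friend_list))
    return seq


def friends_reduce(x, y):
    # Keep the friends in x that also appear in y (list intersection)
    return [a for a in x if a in y]
```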


Finally, we use the Spark API to pass our map and reduce functions to `flatMap` and `reduceByKey`, then sort the results by key with `sortByKey` and collect them to the driver.

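```python
similar_friends = (
    rdd.flatMap(friends_map)       # one (pair, friend_list) record per listed friend
    .reduceByKey(friends_reduce)   # intersect the two lists that share each pair key
    .sortByKey()                   # order the results by pair
)

similar_friends.collect()
```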

```
[(('A', 'B'), ['C', 'D']),
 (('A', 'C'), ['B', 'D']),
 (('A', 'D'), ['B', 'C']),
 (('B', 'C'), ['A', 'D', 'E']),
 (('B', 'D'), ['A', 'C', 'E']),
 (('B', 'E'), ['C', 'D']),
 (('C', 'D'), ['A', 'B', 'E']),
 (('C', 'E'), ['B', 'D']),
 (('D', 'E'), ['B', 'C'])]
```
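For readers without a Spark installation, the whole pipeline can be simulated in plain Python. This is an illustration of the data flow, not how Spark actually executes the job: `flatMap` becomes a nested comprehension, the shuffle is a `defaultdict` grouping values by key, `reduceByKey` becomes `functools.reduce` over each group, and `sortByKey` is `sorted`. The map and reduce logic is redefined here (equivalently) so the block runs on its own.

```python
from collections import defaultdict
from functools import reduce

data = [
    ("A", ["B", "C", "D"]),
    ("B", ["A", "C", "D", "E"]),
    ("C", ["A", "B", "D", "E"]),
    ("D", ["A", "B", "C", "E"]),
    ("E", ["B", "C", "D"]),
]


def friends_map(x):
    # Equivalent to the notebook's map: alphabetically ordered pair key
    return [(tuple(sorted((x[0], i))), x[1]) for i in x[1]]


def friends_reduce(x, y):
    # Intersection of two friend lists, keeping x's order
    return [a for a in x if a in y]


# "flatMap": apply the map to every record and flatten the results
mapped = [pair for record in data for pair in friends_map(record)]

# "Shuffle": group all values that share a key
groups = defaultdict(list)
for key, value in mapped:
    groups[key].append(value)

# "reduceByKey" then "sortByKey": fold each group, then sort by key
result = sorted((key, reduce(friends_reduce, values)) for key, values in groups.items())

for row in result:
    print(row)
```

This prints the same nine pairs as the Spark output above. Because every friend list in the data is already alphabetical and list intersection keeps the first list's order, the result does not depend on which of the two lists the reducer sees first.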