#### Q 3.1 Provide a short description (not more than 3-4 lines) sketching your spark pipeline


Given the input format, pipeline starts by first parsing the text to (key, value) RDD where key is user_id and value is the list of ids for all friends. In order to make recommendtions, we shall know the number of friends shared by any two users. For that, pipeline first constructs pairs of users and then calculate mutual friends count. As a final step, this data is transformed for appropriate representation. Every step is performed using RDD API

##### Q 3.2 Recommendation Pipeline

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark import SparkContext, SparkConf
from itertools import combinations

In [3]:
conf = SparkConf()
sc = SparkContext(conf=conf)

In [4]:
# input
rdd = sc.textFile('friends.txt')

In [5]:
rdd.take(1)

['0\t1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94']

In [6]:
def split_line(line):
    
    splitted = line.split()
    user = int(splitted[0])
    
    try:
        friends = list(map(int, splitted[1].strip().split(',')))
    except IndexError as e:
        friends = []
    
    return user, friends

friends_rdd = rdd.map(split_line)

In [7]:
# form pairs, each pair represents a connection (first degree) or a possible recommendation

def form_pairs(data):
    user = data[0]
    friends = data[1]
    
    pairs = []
    
    for friend in friends:
        p = sorted((user, friend))
        pairs.append((p, 0))
        
    mutual_friend_pairs = combinations(friends, 2)
    
    for mfp in mutual_friend_pairs:
#         sorting would have been needed in friends list weren't sorted already.
#         p = sorted(mfp)
        pairs.append((mfp, 1))
        
    return pairs

pairs_rdd = friends_rdd.flatMap(form_pairs)

In [8]:
# filter out first degree connections & calculate count

agg_rdd = pairs_rdd.filter(lambda pair: pair[1] == 1).reduceByKey(lambda x, y: x + y)

In [9]:
agg_rdd.cache()

PythonRDD[7] at RDD at PythonRDD.scala:53

In [10]:
agg_rdd.take(10)

[((1, 3), 2),
 ((1, 5), 5),
 ((1, 7), 1),
 ((1, 9), 1),
 ((1, 11), 1),
 ((1, 13), 2),
 ((1, 15), 1),
 ((1, 17), 1),
 ((1, 19), 2),
 ((1, 21), 1)]

In [11]:
# change data represention, row for each friend in pair
mutual_friends_rdd = agg_rdd.flatMap(lambda pair_: ((pair_[0][0], (pair_[0][1], pair_[1])), (pair_[0][1], (pair_[0][0], pair_[1]))))

In [12]:
mutual_friends_rdd.take(10)

[(1, (3, 2)),
 (3, (1, 2)),
 (1, (5, 5)),
 (5, (1, 5)),
 (1, (7, 1)),
 (7, (1, 1)),
 (1, (9, 1)),
 (9, (1, 1)),
 (1, (11, 1)),
 (11, (1, 1))]

In [13]:
# sort and get recommendations

def get_recommendations(network):
    network.sort(key=lambda x: (-x[1], x[0]))
    
    rec = [r[0] for r in network[:10]]
    
    rec_ = [str(r_) for r_ in rec]
    
    return ','.join(rec_)


recommendations = mutual_friends_rdd.groupByKey().map(lambda x: (x[0], get_recommendations(list(x[1]))))

In [14]:
recommendations.cache()

PythonRDD[14] at RDD at PythonRDD.scala:53

In [15]:
# sanity check
recommendations.filter(lambda x: x[0] == 11).take(10)

[(11, '27552,7785,27573,27574,27589,27590,27600,27617,27620,27667')]

In [20]:
# write to file
def toTSVLine(data):
    return '\t'.join(str(d) for d in data)

lines = recommendations.map(toTSVLine)

lines.coalesce(1).saveAsTextFile('recommendation.txt')

##### Q3.3 Provide recommendations for user IDs: 924, 8941, 8942, 9019, 9020, 9021, 9022, 9990, 9992, 9993

In [16]:
recommendations.filter(lambda x: x[0] in [924, 8941, 8942, 9019, 9020, 9021, 9022, 9990, 9992, 9993]).take(15)

[(9020, '9021,320,9016,9017,9018,9019,9022,317,9023'),
 (9022, '9019,9020,9021,317,320,9016,9017,9018,9023'),
 (924, '439,2409,6995,11860,15416,43748,45881'),
 (8942, '8939,8938,8941,8945,8946,8938,8940,8941,8943,8944'),
 (9990, '9987,9989,9993,9994,9988,35667,9991,9992,9988,9992'),
 (9992, '9987,9988,9989,9993,9994,35667,9990,9989,9990,9991'),
 (9019, '320,9018,9016,9017,9020,9021,9022,317,9023'),
 (9021, '9020,320,9016,9017,9018,9019,9022,317,9023'),
 (8941, '8938,8946,8939,8942,8943,8944,8945,8940,8942'),
 (9993, '9990,9994,9987,9988,9989,9991,9992,35667,13134,13478')]