In [1]:
import findspark
findspark.init()

In [2]:
from pyspark import SparkConf, SparkContext

In [3]:
from collections import defaultdict

import operator
import re

In [6]:
conf = SparkConf()
sc = SparkContext(conf=conf)

In [7]:
# input_file = "./data/soc-LiveJournal1Adj.txt" # argv[1]
input_file = "./data/testcase_input.txt" # argv[1]

In [8]:
lines = sc.textFile(input_file) # argv[1]

In [9]:
# Solution

In [15]:
def get_user_friend_pairs(line):
    toks = re.split(r'\t+', line)
    user = int(toks[0])
    friends_str = toks[1]

    if not friends_str:
        return []

    friends = [int(x) for x in friends_str.split(",")]

    if not friends_str:
        return []

    return [(user, friend) for friend in friends]


# This contains every user <-> friend pair.
user_friends = lines.flatMap(get_user_friend_pairs)

# Every user needs the friends of each of the user's friends. In other words, since every friend also has this user in their list of friends (bidirectional), each of this user, would want to see all the friends of this user.
# This contains, each (friend, list of user's friends)
def get_second_order_friends(line):
    toks = re.split(r'\t+', line)
    user = int(toks[0])
    friends_str = toks[1]

    if not friends_str:
        return []

    friends = [int(x) for x in friends_str.split(",")]

    result_list = []
    for friend in friends:
        result_list.append((friend, friends))

    return result_list


second_order_friends = lines.flatMap(get_second_order_friends)

# Now, we can append all second order friends into a single list. Then we can count the number of times each friend occurs (this is the number of times this friend occurs in second order friends; like number of documents a unique word is present in).
# However, we need to filter out the user, and user's first-order friends, because we dont want to recommend those who are already friends. Spark provides, nice reduce-like operation "cogroup" which can help us do filter and count. Further, once the candidate mutual friends for recommendation has been created, we would sort it based on number of mutual friends (secondary key being the user id).
cogrouped_mutual_friends = second_order_friends.cogroup(user_friends)

def filter_count_sort(x):
    user, cogrouped_value = x
    second_order_friends, own_friends = cogrouped_value
    second_ordered_friends_flattened = [friend for cur_second_order_friends in second_order_friends for friend in cur_second_order_friends]

    # filter and count
    filtered_mutual_friends = defaultdict(int)
    for f in second_ordered_friends_flattened:
        if f != user and f not in own_friends:
            filtered_mutual_friends[f] += 1

    # sort
    secondary_sorted = sorted(filtered_mutual_friends.items(), key=operator.itemgetter(0))
    sorted_mutual_friends = sorted(secondary_sorted, key=operator.itemgetter(1), reverse=True)

    num_recommendations = 10
    return (user, sorted_mutual_friends[:num_recommendations])


mutual_friends_ordered = cogrouped_mutual_friends.map(filter_count_sort)

# For recommendations, we do not need the counts.
non_zero_friends_user_recommendations = mutual_friends_ordered.map(lambda k_v: (k_v[0], [friend_count[0] for friend_count in k_v[1]]))

# We can process the users without friends separately.
def process_zero_friends_users(line):
    toks = re.split(r'\t+', line)
    user = int(toks[0])
    friends_str = toks[1]

    if friends_str:
        return []

    return [(user, [])]


# This contains every user <-> friend pair.
zero_friends_user_recommendations = lines.flatMap(process_zero_friends_users)

# We can take union to a single RDD of recommendations.
recommendations = non_zero_friends_user_recommendations.union(zero_friends_user_recommendations)

# For each of seeing the output, sorting the recommendations by user id. This may not be required, and it might save considerable amount of time if skipped.
recommendations = recommendations.sortByKey()

# Formatting and saving the output.
formatted_recommendations = recommendations.map(lambda x: "".join([str(x[0]), "\t", ",".join(map(str, x[1]))]))
formatted_recommendations.coalesce(1).saveAsTextFile("testinput_out")

In [12]:
formatted_recommendations.collect()

['0\t4,6,7,8',
 '1\t5,6,7',
 '2\t4,5',
 '3\t5,6,7',
 '4\t0,2,9,8',
 '5\t1,3,2,9',
 '6\t0,1,3,8',
 '7\t0,1,3,8',
 '8\t0,4,6,7',
 '9\t4,5']

In [14]:
formatted_recommendations.saveAsTextFile("testinput_out")

In [10]:
mutual_friends_ordered.collect()

[(0, [(4, 3), (6, 1), (7, 1), (8, 1)]),
 (4, [(0, 3), (2, 2), (9, 2), (8, 1)]),
 (8, [(0, 1), (4, 1), (6, 1), (7, 1)]),
 (1, [(5, 2), (6, 1), (7, 1)]),
 (5, [(1, 2), (3, 2), (2, 1), (9, 1)]),
 (9, [(4, 2), (5, 1)]),
 (2, [(4, 2), (5, 1)]),
 (6, [(0, 1), (1, 1), (3, 1), (8, 1)]),
 (3, [(5, 2), (6, 1), (7, 1)]),
 (7, [(0, 1), (1, 1), (3, 1), (8, 1)])]

In [11]:
sorted_recommendations.collect()

[(0, [4, 6, 7, 8]),
 (1, [5, 6, 7]),
 (2, [4, 5]),
 (3, [5, 6, 7]),
 (4, [0, 2, 9, 8]),
 (5, [1, 3, 2, 9]),
 (6, [0, 1, 3, 8]),
 (7, [0, 1, 3, 8]),
 (8, [0, 4, 6, 7]),
 (9, [4, 5])]

In [17]:
mutual_friends_ordered.count()

10

In [19]:
mutual_friends_ordered.sortByKey().collect()

[(0, [(4, 3), (6, 1), (7, 1), (8, 1)]),
 (1, [(5, 2), (6, 1), (7, 1)]),
 (2, [(4, 2), (5, 1)]),
 (3, [(5, 2), (6, 1), (7, 1)]),
 (4, [(0, 3), (2, 2), (9, 2), (8, 1)]),
 (5, [(1, 2), (3, 2), (2, 1), (9, 1)]),
 (6, [(0, 1), (1, 1), (3, 1), (8, 1)]),
 (7, [(0, 1), (1, 1), (3, 1), (8, 1)]),
 (8, [(0, 1), (4, 1), (6, 1), (7, 1)]),
 (9, [(4, 2), (5, 1)])]

In [12]:
cogrouped_mutual_friends.count()

20

In [15]:
cogrouped_mutual_friends.collect()

[('4',
  (<pyspark.resultiterable.ResultIterable at 0x20ca5886d30>,
   <pyspark.resultiterable.ResultIterable at 0x20ca5886dd8>)),
 (0,
  (<pyspark.resultiterable.ResultIterable at 0x20ca5886da0>,
   <pyspark.resultiterable.ResultIterable at 0x20ca5886e48>)),
 (4,
  (<pyspark.resultiterable.ResultIterable at 0x20ca5886e80>,
   <pyspark.resultiterable.ResultIterable at 0x20ca5886eb8>)),
 (8,
  (<pyspark.resultiterable.ResultIterable at 0x20ca5886e10>,
   <pyspark.resultiterable.ResultIterable at 0x20ca5886f28>)),
 ('3',
  (<pyspark.resultiterable.ResultIterable at 0x20ca5886ef0>,
   <pyspark.resultiterable.ResultIterable at 0x20ca58862b0>)),
 ('6',
  (<pyspark.resultiterable.ResultIterable at 0x20ca5886d68>,
   <pyspark.resultiterable.ResultIterable at 0x20ca5893470>)),
 ('7',
  (<pyspark.resultiterable.ResultIterable at 0x20ca5893048>,
   <pyspark.resultiterable.ResultIterable at 0x20ca5893080>)),
 (1,
  (<pyspark.resultiterable.ResultIterable at 0x20ca5886fd0>,
   <pyspark.resultitera

In [13]:
user_friends.count()

40

In [14]:
second_order_friends.count()

40

In [None]:
# Solution that would work even for undirected friends, i guess.

In [20]:
def get_user_friend_pairs(line):
    toks = re.split(r'\t+', line)
    user = int(toks[0])
    friends_str = toks[1]
    friends = friends_str.split(",")

    #if not friends_str:
    #    return [] # TODO?

    return [(user, int(friend)) for friend in friends]

user_friends = lines.flatMap(get_user_friend_pairs)

In [22]:
user_friends_inv = user_friends.map(lambda k_v: (k_v[1], k_v[0]))

In [23]:
cogrouped_friends = user_friends.cogroup(user_friends_inv)

In [44]:
def mapper_2(x):
    key, cogrouped_value = x
    return_values = []
    mutual_friends, users = cogrouped_value
    for user in users:
        return_values.append((user, list(mutual_friends)))
    
    return return_values

mutual_friends_unfiltered = cogrouped_friends.flatMap(mapper_2)

In [45]:
mutual_friends_agregated_unfiltered = mutual_friends_unfiltered.reduceByKey(lambda a, b: a + b)

In [46]:
mutual_friends_agregated_unfiltered_cogroup = mutual_friends_agregated_unfiltered.cogroup(user_friends)

In [64]:
def filter_count_sort(x):
    user, cogrouped_value = x
    mutual_friends, own_friends = cogrouped_value
    mutual_friends = list(mutual_friends)[0]

    # filter and count
    filtered_mutual_friends = defaultdict(int)
    for f in mutual_friends:
        if f != user and f not in own_friends:
            filtered_mutual_friends[f] += 1

    # sort
    secondary_sorted = sorted(filtered_mutual_friends.items(), key=operator.itemgetter(0))
    sorted_mutual_friends = sorted(secondary_sorted, key=operator.itemgetter(1), reverse=True)

    return (user, sorted_mutual_friends)

mutual_friends_ordered = mutual_friends_agregated_unfiltered_cogroup.flatMap(filter_count_sort)

In [24]:
user_friends.take(10)

[(0, 1),
 (0, 2),
 (0, 3),
 (0, 5),
 (0, 9),
 (1, 0),
 (1, 2),
 (1, 3),
 (1, 4),
 (1, 9)]

In [25]:
user_friends_inv.take(10)

[(1, 0),
 (2, 0),
 (3, 0),
 (5, 0),
 (9, 0),
 (0, 1),
 (2, 1),
 (3, 1),
 (4, 1),
 (9, 1)]

In [28]:
cogrouped_friends.take(10)

[(0,
  (<pyspark.resultiterable.ResultIterable at 0x1c47e597710>,
   <pyspark.resultiterable.ResultIterable at 0x1c47e597d30>)),
 (4,
  (<pyspark.resultiterable.ResultIterable at 0x1c47e597c50>,
   <pyspark.resultiterable.ResultIterable at 0x1c47e597748>)),
 (8,
  (<pyspark.resultiterable.ResultIterable at 0x1c47e597fd0>,
   <pyspark.resultiterable.ResultIterable at 0x1c47e597780>)),
 (1,
  (<pyspark.resultiterable.ResultIterable at 0x1c47e597208>,
   <pyspark.resultiterable.ResultIterable at 0x1c47e597668>)),
 (5,
  (<pyspark.resultiterable.ResultIterable at 0x1c47e597d68>,
   <pyspark.resultiterable.ResultIterable at 0x1c47e5975c0>)),
 (9,
  (<pyspark.resultiterable.ResultIterable at 0x1c47e5978d0>,
   <pyspark.resultiterable.ResultIterable at 0x1c47e597a20>)),
 (2,
  (<pyspark.resultiterable.ResultIterable at 0x1c47e597630>,
   <pyspark.resultiterable.ResultIterable at 0x1c47e597b38>)),
 (6,
  (<pyspark.resultiterable.ResultIterable at 0x1c47e597cf8>,
   <pyspark.resultiterable.Resu

In [31]:
cogrouped_friends.map(lambda x : (x[0], [list(x[1][0]), list(x[1][1])])).collect()

[(0, [[1, 2, 3, 5, 9], [1, 2, 3, 5, 9]]),
 (4, [[1, 3, 5, 6, 7], [1, 3, 5, 6, 7]]),
 (8, [[5], [5]]),
 (1, [[0, 2, 3, 4, 9], [0, 2, 3, 4, 9]]),
 (5, [[0, 4, 6, 7, 8], [0, 4, 6, 7, 8]]),
 (9, [[0, 1, 2, 3], [0, 1, 2, 3]]),
 (2, [[0, 1, 3, 9], [0, 1, 3, 9]]),
 (6, [[4, 5, 7], [4, 5, 7]]),
 (3, [[0, 1, 2, 4, 9], [0, 1, 2, 4, 9]]),
 (7, [[4, 5, 6], [4, 5, 6]])]

In [49]:
mutual_friends_agregated_unfiltered.count()

10

In [50]:
mutual_friends_agregated_unfiltered_cogroup.count()

10

In [52]:
mutual_friends_agregated_unfiltered_cogroup.map(lambda x : (x[0], [list(x[1][0]), list(x[1][1])])).collect()

[(0,
  [[[0, 2, 3, 4, 9, 0, 4, 6, 7, 8, 0, 1, 2, 3, 0, 1, 3, 9, 0, 1, 2, 4, 9]],
   [1, 2, 3, 5, 9]]),
 (6, [[[1, 3, 5, 6, 7, 0, 4, 6, 7, 8, 4, 5, 6]], [4, 5, 7]]),
 (1,
  [[[1, 2, 3, 5, 9, 1, 3, 5, 6, 7, 0, 1, 2, 3, 0, 1, 3, 9, 0, 1, 2, 4, 9]],
   [0, 2, 3, 4, 9]]),
 (7, [[[1, 3, 5, 6, 7, 0, 4, 6, 7, 8, 4, 5, 7]], [4, 5, 6]]),
 (8, [[[0, 4, 6, 7, 8]], [5]]),
 (2,
  [[[1, 2, 3, 5, 9, 0, 2, 3, 4, 9, 0, 1, 2, 3, 0, 1, 2, 4, 9]], [0, 1, 3, 9]]),
 (9,
  [[[1, 2, 3, 5, 9, 0, 2, 3, 4, 9, 0, 1, 3, 9, 0, 1, 2, 4, 9]], [0, 1, 2, 3]]),
 (3,
  [[[1, 2, 3, 5, 9, 1, 3, 5, 6, 7, 0, 2, 3, 4, 9, 0, 1, 2, 3, 0, 1, 3, 9]],
   [0, 1, 2, 4, 9]]),
 (4,
  [[[0, 2, 3, 4, 9, 0, 4, 6, 7, 8, 4, 5, 7, 0, 1, 2, 4, 9, 4, 5, 6]],
   [1, 3, 5, 6, 7]]),
 (5, [[[1, 2, 3, 5, 9, 1, 3, 5, 6, 7, 5, 4, 5, 7, 4, 5, 6]], [0, 4, 6, 7, 8]])]

In [65]:
mutual_friends_ordered.count()

20

In [69]:
mutual_friends_ordered.collect()

[0,
 [(4, 3), (6, 1), (7, 1), (8, 1)],
 6,
 [(0, 1), (1, 1), (3, 1), (8, 1)],
 1,
 [(5, 2), (6, 1), (7, 1)],
 7,
 [(0, 1), (1, 1), (3, 1), (8, 1)],
 8,
 [(0, 1), (4, 1), (6, 1), (7, 1)],
 2,
 [(4, 2), (5, 1)],
 9,
 [(4, 2), (5, 1)],
 3,
 [(5, 2), (6, 1), (7, 1)],
 4,
 [(0, 3), (2, 2), (9, 2), (8, 1)],
 5,
 [(1, 2), (3, 2), (2, 1), (9, 1)]]

In [None]:
t1 = user_friends.map(lambda (user, friend): ((user, friend), [user]))
t2 = t1.reduceByKey(lambda a, b: a + b)
t3 = t2.

#user_friends.map(lambda (user, friend): ((user, friend), [user])).reduceByKey(lambda a, b: a + b)

In [43]:
# Solution: 2
def mapper_1(line):
    toks = re.split(r'\t+', line)
    user = int(toks[0])
    friends_str = toks[1]

    if not friends_str:
        return [(user, [])]

    return (user, [int(x) for x in friends_str.split(",")])

user_friends = lines.flatMap(mapper_1)
all_mutual_friends = user_friends.cartesan(user_friends)

AttributeError: 'PipelinedRDD' object has no attribute 'cartesan'

In [7]:
# misc
line = lines.take(1)

In [8]:
line

['0\t1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94']

In [10]:
re.split(r'\t+', line[0])

['0',
 '1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94']

In [13]:
line = '0\t'

In [14]:
re.split(r'\t+', line)

['0', '']

In [38]:
# normal solution
from collections import defaultdict
import operator

friends_dict = {}

input_file = "./data/testcase_input.txt"
with open(input_file) as fp:
    for line in fp.readlines():
        line = line.strip()
        u, friends_str = re.split(r'\t+', line)
        friends = friends_str.split(",")
        friends_dict[int(u)] = [int(x) for x in friends]

mutual_friends_dict = {}

for user, friends in friends_dict.items():
    fq = defaultdict(int)
    grouped_mutual_friends = []
    for f in friends:
        grouped_mutual_friends += friends_dict[f]
        
    for f in grouped_mutual_friends:
        if f != user and f not in friends_dict[user]:
            fq[f] += 1

    t1 = sorted(fq.items(), key=operator.itemgetter(0))
    mutual_friends_dict[user] = sorted(t1, key=operator.itemgetter(1), reverse=True)

sorted(mutual_friends_dict.items(), key=operator.itemgetter(0))

[(0, [(4, 3), (6, 1), (7, 1), (8, 1)]),
 (1, [(5, 2), (6, 1), (7, 1)]),
 (2, [(4, 2), (5, 1)]),
 (3, [(5, 2), (6, 1), (7, 1)]),
 (4, [(0, 3), (2, 2), (9, 2), (8, 1)]),
 (5, [(1, 2), (3, 2), (2, 1), (9, 1)]),
 (6, [(0, 1), (1, 1), (3, 1), (8, 1)]),
 (7, [(0, 1), (1, 1), (3, 1), (8, 1)]),
 (8, [(0, 1), (4, 1), (6, 1), (7, 1)]),
 (9, [(4, 2), (5, 1)])]

In [18]:
friends_dict

{'0': ['1,2,3,5,9'],
 '1': ['0,2,3,4,9'],
 '2': ['0,1,3,9'],
 '3': ['0,1,2,4,9'],
 '4': ['1,3,5,6,7'],
 '5': ['0,4,6,7,8'],
 '6': ['4,5,7'],
 '7': ['4,5,6'],
 '8': ['5'],
 '9': ['0,1,2,3']}

In [35]:
t1

[(4, 3), (6, 1), (7, 1), (8, 1)]

In [36]:
dict(t1)

{4: 3, 6: 1, 7: 1, 8: 1}

In [37]:
fq

defaultdict(int, {4: 3, 6: 1, 7: 1, 8: 1})

In [53]:
# testing filter_count_sort
x = (0,
  [[[0, 2, 3, 4, 9, 0, 4, 6, 7, 8, 0, 1, 2, 3, 0, 1, 3, 9, 0, 1, 2, 4, 9]],
   [1, 2, 3, 5, 9]])

In [60]:
user, cogrouped_value = x
mutual_friends, own_friends = cogrouped_value
mutual_friends = mutual_friends[0]

# filter and count
filtered_mutual_friends = defaultdict(int)
for f in mutual_friends:
    if f != user and f not in own_friends:
        filtered_mutual_friends[f] += 1

# sort
secondary_sorted = sorted(filtered_mutual_friends.items(), key=operator.itemgetter(0))
sorted_mutual_friends = sorted(secondary_sorted, key=operator.itemgetter(1), reverse=True)

In [61]:
sorted_mutual_friends

[(4, 3), (6, 1), (7, 1), (8, 1)]