In [None]:
#Friend Recommendation using Mutual Friends
# I used Spark based MapReduce algorithm to generate friend recommendation for users. The recommendations are based on the number of mutual friends.

In [None]:
# First we have to read in the dataset which contains: (UserID,  List of friends’ UserIDs separated by commas)
# Note: The friendship relationship is undirected i.e. if X is a friend of Y, then Y is also a friend of X.
# Define the path to the text file
text_file_path = "dbfs:/FileStore/Mutual_Friends.txt"

# Read the text file into an RDD
original_friends_data_rdd = sc.textFile(text_file_path)

In [None]:
friends_data_rdd = original_friends_data_rdd
print(friends_data_rdd.take(2))

['0\t1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94', '1\t0,5,20,135,2409,8715,8932,10623,12347,12846,13840,13845,14005,20075,21556,22939,23520,28193,29724,29791,29826,30691,31232,31435,32317,32489,34394,35589,35605,35606,35613,35633,35648,35678,38737,43447,44846,44887,49226,49985,623,629,4999,6156,13912,14248,15190,17636,19217,20074,27536,29481,29726,29767,30257,33060,34250,34280,34392,34406,34418,34420,34439,34450,34651,45054,49592']


In [None]:
# Process each line to extract user ID and friends
friends_data_rdd = friends_data_rdd.map(lambda line: line.split("\t")).map(lambda pair: (pair[0], pair[1].split(",")))
print(friends_data_rdd.take(5))

[('0', ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '40', '41', '42', '43', '44', '45', '46', '47', '48', '49', '50', '51', '52', '53', '54', '55', '56', '57', '58', '59', '60', '61', '62', '63', '64', '65', '66', '67', '68', '69', '70', '71', '72', '73', '74', '75', '76', '77', '78', '79', '80', '81', '82', '83', '84', '85', '86', '87', '88', '89', '90', '91', '92', '93', '94']), ('1', ['0', '5', '20', '135', '2409', '8715', '8932', '10623', '12347', '12846', '13840', '13845', '14005', '20075', '21556', '22939', '23520', '28193', '29724', '29791', '29826', '30691', '31232', '31435', '32317', '32489', '34394', '35589', '35605', '35606', '35613', '35633', '35648', '35678', '38737', '43447', '44846', '44887', '49226', '49985', '623', '629', '4999', '6156', '13912', '14248', '15190', '17636', '19217', '20074', 

In [None]:
# ---Our Friend Recommendation Algorithm---
# To provide friend recommendations to a given user we will:
# 1. get the number of mutual friends between every pair of users
# 2. we will rank those number of mutual friends in descending order
# 3. We will recommend the Users that have the largest number of mutual friends with our User but is not our User's friend

In [None]:
# data = [
#     (0, [1, 3, 4, 6]),
#     (1, [0, 2, 4]),
#     (2, [1, 3, 5]),
#     (3, [0, 2]),
#     (4, [0, 1]),
#     (5, [2]),
#     (6, [0])
# ]

# rdd = sc.parallelize(data)

In [None]:
rdd = friends_data_rdd

In [None]:
from itertools import permutations

# generate pairs of tuples for the user and his friends
users_friends_rdd = rdd.flatMap(lambda x: [((x[0], friend), float("-inf")) for friend in x[1]])
#print(users_friends_rdd.collect())

# Generate pairs of permutations between friends of user because that means they have user as a mutual friend
common_friends_rdd = rdd.flatMap(lambda x: [(pair, 1) for pair in permutations(x[1], 2)])
#print(common_friends_rdd.collect())

combined_pairs_rdd = users_friends_rdd.union(common_friends_rdd)
print(combined_pairs_rdd.collect())

[(('0', '1'), -inf), (('0', '2'), -inf), (('0', '3'), -inf), (('0', '4'), -inf), (('0', '5'), -inf), (('0', '6'), -inf), (('0', '7'), -inf), (('0', '8'), -inf), (('0', '9'), -inf), (('0', '10'), -inf), (('0', '11'), -inf), (('0', '12'), -inf), (('0', '13'), -inf), (('0', '14'), -inf), (('0', '15'), -inf), (('0', '16'), -inf), (('0', '17'), -inf), (('0', '18'), -inf), (('0', '19'), -inf), (('0', '20'), -inf), (('0', '21'), -inf), (('0', '22'), -inf), (('0', '23'), -inf), (('0', '24'), -inf), (('0', '25'), -inf), (('0', '26'), -inf), (('0', '27'), -inf), (('0', '28'), -inf), (('0', '29'), -inf), (('0', '30'), -inf), (('0', '31'), -inf), (('0', '32'), -inf), (('0', '33'), -inf), (('0', '34'), -inf), (('0', '35'), -inf), (('0', '36'), -inf), (('0', '37'), -inf), (('0', '38'), -inf), (('0', '39'), -inf), (('0', '40'), -inf), (('0', '41'), -inf), (('0', '42'), -inf), (('0', '43'), -inf), (('0', '44'), -inf), (('0', '45'), -inf), (('0', '46'), -inf), (('0', '47'), -inf), (('0', '48'), -inf), 

In [None]:
combined_pairs_count_rdd = combined_pairs_rdd.reduceByKey(lambda x, y: x + y)
print(combined_pairs_count_rdd.collect())

[(('26', '18071'), -inf), (('28', '89'), -inf), (('31', '29688'), -inf), (('43', '23768'), -inf), (('2409', '4583'), -inf), (('23520', '32968'), -inf), (('28193', '26110'), -inf), (('28193', '2697'), -inf), (('32317', '35685'), -inf), (('44887', '1'), -inf), (('49226', '25178'), -inf), (('12636', '40062'), -inf), (('41457', '34161'), -inf), (('41457', '49992'), -inf), (('8795', '6'), -inf), (('13793', '31435'), -inf), (('13793', '34330'), -inf), (('11142', '11'), -inf), (('11142', '6468'), -inf), (('29379', '25205'), -inf), (('32064', '27653'), -inf), (('89', '28'), -inf), (('42704', '20599'), -inf), (('2411', '18'), -inf), (('2644', '33109'), -inf), (('2659', '2682'), -inf), (('2659', '8565'), -inf), (('3734', '23550'), -inf), (('3734', '37006'), -inf), (('7463', '44257'), -inf), (('9892', '6941'), -inf), (('10240', '22101'), -inf), (('13076', '31756'), -inf), (('18163', '10119'), -inf), (('19388', '27578'), -inf), (('19388', '27617'), -inf), (('23512', '23593'), -inf), (('25195', '16

In [None]:
filtered_pairs_rdd = combined_pairs_count_rdd.filter(lambda x: x[1] > 0)
print(filtered_pairs_rdd.collect())

[(('7', '43'), 1), (('37', '89'), 1), (('20070', '13661'), 1), (('7184', '1648'), 1), (('4124', '1913'), 1), (('11387', '6893'), 1), (('34304', '33554'), 2), (('34762', '3307'), 1), (('1782', '1359'), 2), (('11399', '34466'), 15), (('30782', '9659'), 1), (('34297', '34154'), 1), (('19920', '29719'), 2), (('31223', '27479'), 1), (('22611', '27551'), 1), (('16918', '9194'), 1), (('43432', '32631'), 1), (('7812', '41433'), 1), (('27550', '9715'), 2), (('16865', '16894'), 3), (('16865', '41438'), 1), (('23586', '34485'), 2), (('82', '27553'), 1), (('35630', '34539'), 1), (('41418', '45868'), 1), (('17106', '27555'), 1), (('1156', '2656'), 1), (('5158', '8492'), 1), (('2', '33079'), 1), (('2273', '6'), 1), (('19539', '9841'), 1), (('15174', '34413'), 3), (('1741', '33767'), 1), (('28348', '33307'), 1), (('12272', '17576'), 2), (('14457', '45588'), 1), (('17910', '40870'), 1), (('33325', '7831'), 2), (('23830', '25228'), 1), (('24101', '17890'), 1), (('19920', '106'), 1), (('1528', '2764'), 

In [None]:
# reorder to (UserID, (UserFriend, Mutual_Friend_Count))
pairs_rdd = filtered_pairs_rdd.map(lambda x: (x[0][0], (x[0][1], x[1])))
print(pairs_rdd.collect())

[('13', ('52', 1)), ('13', ('92', 1)), ('16', ('22', 1)), ('27', ('92', 1)), ('31', ('80', 1)), ('36', ('15', 1)), ('36', ('32', 2)), ('36', ('42', 1)), ('40', ('45', 1)), ('44', ('9', 1)), ('54', ('45', 1)), ('62', ('81', 1)), ('63', ('74', 1)), ('72', ('39', 1)), ('75', ('37', 1)), ('84', ('4', 1)), ('90', ('76', 1)), ('0', ('15190', 1)), ('5', ('2409', 1)), ('13840', ('135', 1)), ('14005', ('12846', 1)), ('14005', ('38737', 2)), ('23520', ('13912', 1)), ('29791', ('49592', 1)), ('35606', ('29481', 1)), ('35648', ('29481', 1)), ('35678', ('5', 2)), ('4999', ('34280', 1)), ('6156', ('34450', 1)), ('33060', ('35613', 2)), ('34406', ('5', 2)), ('34450', ('49226', 3)), ('45054', ('623', 3)), ('12453', ('2755', 1)), ('6893', ('2755', 1)), ('21', ('18625', 1)), ('8795', ('35785', 1)), ('18625', ('35822', 2)), ('9172', ('19051', 3)), ('16519', ('30071', 1)), ('30062', ('38', 1)), ('150', ('20016', 2)), ('12570', ('26', 1)), ('49678', ('0', 3)), ('439', ('2062', 1)), ('1694', ('13076', 1)), 

In [None]:
def process_suggestions(user_suggestions, N):
    # Count occurrences of each suggestion
    suggestion_counts = {}
    for suggestion in user_suggestions:
        suggestion_counts[suggestion] = suggestion_counts.get(suggestion, 0) + 1
    
    # Get the N most common suggestions
    most_common_suggestions = sorted(suggestion_counts.items(), key=lambda x: -x[1])[:N]
    return most_common_suggestions

# Group by key (groups all of the friends and the common mutual friend count for all keys)
grouped_rdd = pairs_rdd.groupByKey()

# Now let's print the result
print(grouped_rdd.collect())

# Example usage:
N = 10  # Define the value of N
result_rdd = grouped_rdd.map(lambda x: (x[0], process_suggestions(x[1], N)))
result_collected = result_rdd.collect()
print(result_collected)

result_collected_dict = dict(result_collected)
print(result_collected_dict)


[('7001', <pyspark.resultiterable.ResultIterable object at 0x7f207023db20>), ('4124', <pyspark.resultiterable.ResultIterable object at 0x7f2070205c40>), ('34154', <pyspark.resultiterable.ResultIterable object at 0x7f2070205460>), ('34356', <pyspark.resultiterable.ResultIterable object at 0x7f2070205d90>), ('34453', <pyspark.resultiterable.ResultIterable object at 0x7f2070205070>), ('13870', <pyspark.resultiterable.ResultIterable object at 0x7f1f9743ad90>), ('4074', <pyspark.resultiterable.ResultIterable object at 0x7f207023d100>), ('49900', <pyspark.resultiterable.ResultIterable object at 0x7f207023dee0>), ('5143', <pyspark.resultiterable.ResultIterable object at 0x7f207023df70>), ('121', <pyspark.resultiterable.ResultIterable object at 0x7f207023d610>), ('157', <pyspark.resultiterable.ResultIterable object at 0x7f1f9739b970>), ('31251', <pyspark.resultiterable.ResultIterable object at 0x7f1f9739b640>), ('128', <pyspark.resultiterable.ResultIterable object at 0x7f1f9739b580>), ('2446',

In [None]:
# Sample random userIDs from the RDD
import random

# Count the number of rows in the original RDD
#num_rows = original_friends_data_rdd.count()
num_rows = original_friends_data_rdd.count()

# Generate a list of 10 random numbers between 0 and the count of the original RDD
random_userIds = ["6872", "6637", "1465", "36868", "31872", "38673", "21297", "13322", "38978", "7696"]

print(random_userIds)

['6872', '6637', '1465', '36868', '31872', '38673', '21297', '13322', '38978', '7696']


In [None]:
user_ids = []
userid_recommendations = []
for userid in random_userIds:
    user_ids.append(userid)

    if userid in result_collected_dict:
        recommendations = result_collected_dict[userid]

        recommendations = sorted(recommendations)

        result = [pair[0][0] for pair in recommendations]

        userid_recommendations.append(result)
    else:
        userid_recommendations.append([])



In [None]:
import pandas as pd

pd.set_option('display.max_colwidth', None)

# Display all rows
pd.set_option('display.max_rows', None)

# create a dataframe of the results
userIDs_recommendations_df = pd.DataFrame({"Random_UserIDs": user_ids, "Recommended_Friends": userid_recommendations})

In [None]:
userIDs_recommendations_df

Unnamed: 0,Random_UserIDs,Recommended_Friends
0,6872,"[1809, 6852, 6855, 6865, 6867, 6868, 6870, 6875, 6877, 6878]"
1,6637,"[11791, 17649, 29529, 37851, 39140, 39394, 529, 6553, 6580, 6608]"
2,1465,"[134, 13820, 14043, 14064, 1731, 18539, 2011, 24814, 34145, 34325]"
3,36868,"[10119, 36893, 37053, 37061, 37093, 37506, 37814, 38505, 44061, 44125]"
4,31872,"[15640, 30430, 31821, 31839, 31840, 31853, 31864, 31870, 31897, 31909]"
5,38673,"[38629, 38636, 38641, 38645, 38648, 38653, 38656, 38672, 40091, 42520]"
6,21297,"[16920, 20500, 21286, 25974, 27514, 32641, 37867, 48076, 7173, 7566]"
7,13322,"[13215, 13276, 13311, 13323, 15129, 20676, 20678, 20679, 20684, 44892]"
8,38978,"[17829, 24563, 26887, 28845, 38952, 38954, 38973, 38980, 43190, 43654]"
9,7696,"[12985, 12987, 15984, 17695, 23508, 23804, 33327, 4858, 7444, 7571]"
