## Map Reduce in Spark - Hui Hu
Write a Spark program that implements a simple “People You Might Know” social network friendship recommendation algorithm. The key idea is that if two people have a lot of mutual friends, then the system should recommend that they connect with each other.

## Pipeline Sketch
Use Spark to break each user's friend list into pairs of users, and count the number of mutual friends for each pair. Then, for each user, sort the candidate friends by the number of mutual friends and output the top 10 recommendations.
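One way to state the ranking rule compactly (the notation is introduced here, not taken from the assignment): with $N(u)$ the set of friends of user $u$, each candidate $v$ for user $u$ is scored by the number of mutual friends,

$$
\operatorname{score}(u, v) = \lvert N(u) \cap N(v) \rvert, \qquad v \neq u,\ v \notin N(u),
$$

and the 10 candidates with the highest positive score are recommended, with ties broken by the smaller user ID.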

## Data
- The data file is [soc-LiveJournal1Adj.txt](https://course.ccs.neu.edu/cs6220/fall2023/homework-3/) in the homework 3 folder.
- The file contains the adjacency list and has multiple lines in the following format: `<User><TAB><Friends>`
- Here, `<User>` is a unique integer ID corresponding to a unique user, and `<Friends>` is a comma-separated list of unique IDs corresponding to the friends of the user with the unique ID `<User>`. Note that the friendships are mutual (i.e., edges are undirected): if A is a friend of B, then B is also a friend of A. The data are consistent with that rule: there is an explicit entry for each side of each edge.
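The undirected-edge property can be checked on a small sample before running the full job. This is a minimal plain-Python sketch, assuming the adjacency list has already been parsed into a dict; the function name `asymmetric_edges` and the toy data are hypothetical and not part of the assignment.

```python
from typing import Dict, List, Tuple


def asymmetric_edges(adjacency: Dict[int, List[int]]) -> List[Tuple[int, int]]:
    """Return every (user, friend) edge whose reverse (friend, user) is missing."""
    # Build a set of directed edges so each reverse lookup is O(1).
    edges = {(user, friend) for user, friends in adjacency.items() for friend in friends}
    return sorted((u, v) for (u, v) in edges if (v, u) not in edges)


# Hypothetical toy adjacency list (not taken from soc-LiveJournal1Adj.txt).
toy = {0: [1, 2], 1: [0, 2], 2: [0, 1, 3], 3: [2]}
print(asymmetric_edges(toy))  # → []
```

An empty result means every edge is listed from both sides, which is the property the pair-extraction step relies on.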

```python
# Page hosting the data file
datalink = "https://course.ccs.neu.edu/cs6220/fall2023/homework-3/"
# Local copy of the adjacency list
datapath = "data/soc-LiveJournal1Adj.txt"
```

```python
import heapq
import os
from typing import List, Tuple

import pyspark

# Set the Python hash seed before the SparkContext starts its Python workers
os.environ["PYTHONHASHSEED"] = "0"
sc = pyspark.SparkContext()
```


### Read data and extract user IDs and friends
- Read data from the text file and convert the data into a key-value pair RDD, where the key is the userId and the value is the list of friends of the user.
- Both user IDs and friend IDs are integers.
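The line parsing used in the next cell can be tested on single lines without starting Spark. This is a sketch in which `parse_line` is a hypothetical helper that mirrors the lambda used in the notebook; it shows how a user with an empty friend list is handled.

```python
from typing import List, Tuple


def parse_line(line: str) -> Tuple[int, List[int]]:
    """Parse '<User><TAB><Friends>' into (user_id, [friend_ids])."""
    # split() with no argument splits on any whitespace, including the TAB;
    # a user with no friends leaves only one field.
    fields = line.split()
    user_id = int(fields[0])
    friends = [int(f) for f in fields[1].split(",")] if len(fields) > 1 else []
    return user_id, friends


print(parse_line("1\t0,5,20"))  # → (1, [0, 5, 20])
print(parse_line("7\t"))        # → (7, [])
```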

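```python
# Parse each line "<User><TAB><Friends>" into (user_id, [friend_ids]).
# A user with no friends has only one field, so the friend list is empty.
lines = sc.textFile(datapath, 1).map(lambda line: line.split()).map(
    lambda x: (int(x[0]), [int(i) for i in x[1].split(",")] if len(x) > 1 else [])
)
lines.take(5)
```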
```
[(0,
  [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
   21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40,
   41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60,
   61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80,
   81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94]),
 (1,
  [0, 5, 20, 135, 2409, 8715, 8932, 10623, 12347, 12846, 13840, 13845, 14005,
   20075, 21556, 22939, 23520, 28193, 29724, 29791, 29826, 30691, 31232, 31435,
   32317, 32489, 34394, 35589, 35605, 35606, 35613, 35633, 35648, 35678, 38737,
   43
```

### Extract pairs of mutual friends
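If A and B share a mutual friend C, then A and B both appear in C's friend list. Therefore, every pair of users in one friend list has that list's owner as a mutual friend: we emit each such pair with a count of 1 and later sum the counts per pair. We also emit every existing friendship with the value negative infinity, so that after summing, pairs who are already friends can be recognized and removed.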
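The same emission logic can be written more compactly with `itertools.combinations` and `min`/`max`. This is a stylistic variant of `extract_friend_pairs` for a single adjacency entry (the name `emit_pairs` is hypothetical), not a different algorithm; it runs in plain Python without Spark.

```python
from itertools import combinations
from typing import List, Tuple


def emit_pairs(user_id: int, friends: List[int]) -> List[Tuple[Tuple[int, int], float]]:
    out: List[Tuple[Tuple[int, int], float]] = []
    # Existing friendships are tagged with -inf so any sum involving them stays -inf.
    for friend in friends:
        out.append(((min(user_id, friend), max(user_id, friend)), float("-inf")))
    # Every pair inside this friend list shares user_id as a mutual friend.
    for a, b in combinations(friends, 2):
        out.append(((min(a, b), max(a, b)), 1))
    return out


for pair in emit_pairs(2, [0, 1, 3]):
    print(pair)
```

For user 2 with friends `[0, 1, 3]`, this prints the three existing edges with `-inf` followed by the candidate pairs `(0, 1)`, `(0, 3)` and `(1, 3)`, each with count 1.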

```python
def extract_friend_pairs(params: Tuple[int, List[int]]) -> List[Tuple[Tuple[int, int], float]]:
    user_id, friends = params

    n = len(friends)
    ans = []

    # Existing friendships: key is (smaller userId, larger userId);
    # value -inf marks the pair as already connected, so its sum can never become positive.
    for friend in friends:
        if user_id < friend:
            ans.append(((user_id, friend), float("-inf")))
        else:
            ans.append(((friend, user_id), float("-inf")))

    # Candidate pairs: any two friends of user_id share user_id as a mutual friend.
    for i in range(n):
        for j in range(i + 1, n):
            if friends[i] < friends[j]:
                ans.append(((friends[i], friends[j]), 1))
            else:
                ans.append(((friends[j], friends[i]), 1))

    return ans


friend_pairs = lines.flatMap(extract_friend_pairs)
friend_pairs.take(5)
```

```
[((0, 1), -inf),
 ((0, 2), -inf),
 ((0, 3), -inf),
 ((0, 4), -inf),
 ((0, 5), -inf)]
```

### Find potential friends
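- First, sum the emitted values for each pair to get the number of mutual friends.
- Then, filter out the pairs that are already friends (their sum is negative infinity).
- Finally, emit each remaining pair in both directions, since (A, B) is a recommendation for A and (B, A) is a recommendation for B.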
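The three steps above can be simulated locally, with a dictionary standing in for `reduceByKey`. This is a plain-Python sketch on a hypothetical 4-cycle graph (edges 0-1, 1-3, 3-2, 2-0); the emitted list is written out by hand to match what `extract_friend_pairs` would produce for it, and `count_candidates` is an illustrative name.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Pair = Tuple[int, int]
NEG_INF = float("-inf")


def count_candidates(emitted: List[Tuple[Pair, float]]) -> Dict[Pair, int]:
    # Local stand-in for reduceByKey(lambda x, y: x + y).
    totals: Dict[Pair, float] = defaultdict(float)
    for key, value in emitted:
        totals[key] += value
    # Drop existing friendships (sum is -inf) and emit both directions.
    result: Dict[Pair, int] = {}
    for (a, b), total in totals.items():
        if total > 0:
            result[(a, b)] = int(total)
            result[(b, a)] = int(total)
    return result


emitted = [
    ((0, 1), NEG_INF), ((0, 2), NEG_INF), ((1, 2), 1),  # user 0: [1, 2]
    ((0, 1), NEG_INF), ((1, 3), NEG_INF), ((0, 3), 1),  # user 1: [0, 3]
    ((0, 2), NEG_INF), ((2, 3), NEG_INF), ((0, 3), 1),  # user 2: [0, 3]
    ((1, 3), NEG_INF), ((2, 3), NEG_INF), ((1, 2), 1),  # user 3: [1, 2]
]
print(count_candidates(emitted))
```

Users 1 and 2 share friends 0 and 3, and users 0 and 3 share friends 1 and 2, so each of those pairs gets a count of 2 in both directions; the four existing edges sum to `-inf` and are dropped.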

```python
new_friend_pairs = (
    friend_pairs.reduceByKey(lambda x, y: x + y)
    # Keep pairs that have mutual friends and are not already friends
    .filter(lambda x: x[1] > 0)
    # Although (A, B) and (B, A) describe the same pair,
    # they are treated separately when recommending friends to A and to B
    .flatMap(lambda x: [x, ((x[0][1], x[0][0]), x[1])])
)

# key: (userId, candidate userId)
# value: number of mutual friends
new_friend_pairs.take(5)
```

```
[((7, 22), 1), ((22, 7), 1), ((13, 50), 1), ((50, 13), 1), ((32, 93), 1)]
```

### Collect all potential friends for each user
- For each user, we will collect all potential friends and sort them by frequency in descending order. If ties, sort by userId in ascending order.
- Since each user has 245 potential friends on average, we use a min-heap holding at most 10 entries, so only the top 10 candidates are kept for each user instead of sorting every full candidate list.
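For comparison, the standard library's `heapq.nlargest` accepts a `key` function and can express the same ordering (highest count first, smaller ID on ties). This is a sketch with a hypothetical helper `top_n` that works on one user's list of `(friend_id, mutual_count)` tuples; it is an alternative to the hand-written heap loop below, not what the notebook runs.

```python
import heapq
from typing import List, Tuple


def top_n(candidates: List[Tuple[int, int]], n: int = 10) -> List[int]:
    """candidates: (friend_id, mutual_count). Highest count first; ties by smaller id."""
    # Larger count wins; for equal counts, a larger -friend_id means a smaller id.
    best = heapq.nlargest(n, candidates, key=lambda c: (c[1], -c[0]))
    return [friend_id for friend_id, _ in best]


print(top_n([(5, 2), (3, 2), (9, 1), (1, 7), (4, 2)], n=3))  # → [1, 3, 4]
```

Inside Spark this could be applied with something like `potential_friends_with_frequency.mapValues(top_n)`, though that variant has not been run on the full dataset here.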

```python
# Regroup by user: (userId, [(candidate userId, number of mutual friends), ...])
potential_friends_with_frequency = new_friend_pairs.map(
    lambda x: (x[0][0], [(x[0][1], x[1])])
).reduceByKey(lambda x, y: x + y)

# key: userId
# value: recommended friends and frequency
potential_friends_with_frequency.take(5)
```

```
[(49245,
  [(18523, 1), (24249, 1), (49240, 5), (12020, 1),
   (37308, 1), (23298, 1), (6461, 1), (38929, 1),
   (49211, 4), (25145, 1), (43572, 1), (15314, 1),
   (22279, 1), (13557, 1), (30762, 1), (19881, 1),
   (14472, 1), (49985, 1), (27656, 1), (43606, 1),
   (45372, 1), (1661, 1), (35841, 1), (36754, 1),
   (49241, 5), (13298, 1), (35692, 1), (41622, 1),
   (43253, 1), (49293, 14), (9650, 1), (33507, 1),
   (34188, 1), (49212, 7), (49255, 5), (16404, 1),
   (49272, 6), (15994, 1), (44846, 1), (41477, 1),
   (49207, 5), (33121, 1), (8047, 1), (40997, 1),
   (41406, 1), (12207, 1), (49256, 5), (27655, 1),
   (41900, 1), (34395, 1), (44898, 1), (29940, 1),
   (1099, 1), (1384, 1), (17677, 1), (49230, 6),
   (15370, 1), (24796, 1), (19365, 1), (11615, 1),
   (32377, 1), (41631, 1), (37068, 1), (49280, 1),
   (33119, 3), (16747, 1),
   (13
```

```python
def extract_n_friends_by_frequency(
    params: Tuple[int, List[Tuple[int, int]]], n: int = 10
) -> Tuple[int, List[int]]:
    user_id, friends_list = params

    # Min-heap of (frequency, -friend_id). The root is the weakest candidate kept so far:
    # lowest frequency and, among equal frequencies, the largest friend ID.
    heap = []
    for index, (friend_id, frequency) in enumerate(friends_list):
        if index < n:
            heapq.heappush(heap, (frequency, -friend_id))
        else:
            # Push the new candidate and evict the weakest one, keeping the heap at size n
            heapq.heappushpop(heap, (frequency, -friend_id))

    # Popping yields candidates from weakest to strongest; prepend to get descending order
    friends = []
    while heap:
        friends.insert(0, -heapq.heappop(heap)[1])

    return (user_id, friends)


potential_friends = potential_friends_with_frequency.map(extract_n_friends_by_frequency)

# key: userId
# value: the top 10 recommended friends
potential_friends.take(5)
```

```
[(36794,
  [10058, 36808, 37150, 36687, 36698, 36708, 36735, 36763, 36854, 36989]),
 (13344, [7661, 16163, 0, 36, 284, 311, 575, 1347, 1371, 1390]),
 (45855,
  [16869, 16963, 3736, 11236, 13576, 15657, 16878, 16883, 16906, 30015]),
 (35630, [9194, 13870, 16894, 25199, 35562, 42051, 1, 1031, 16862, 23690]),
 (20439, [19, 998, 1174, 3360, 3883, 3895, 4839, 6168, 7655, 11341])]
```

```python
user11 = potential_friends.lookup(11)
print(user11)
# expected 27552, 7785, 27573, 27574, 27589, 27590, 27600, 27617, 27620, 27667
```

```
[[27552, 7785, 27573, 27574, 27589, 27590, 27600, 27617, 27620, 27667]]
```


```python
user_list = [924, 8941, 8942, 9019, 9020, 9021, 9022, 9990, 9992, 9993]
ans = potential_friends.filter(lambda x: x[0] in user_list).collect()
for user in ans:
    print(f"user {user[0]}", user[1])
```

```
user 924 [439, 2409, 6995, 11860, 15416, 43748, 45881]
user 9020 [9021, 9016, 9017, 9022, 317, 9023]
user 9019 [9022, 317, 9023]
user 9993 [9991, 13134, 13478, 13877, 34299, 34485, 34642, 37941]
user 9022 [9019, 9020, 9021, 317, 9016, 9017, 9023]
user 8941 [8943, 8944, 8940]
user 9992 [9987, 9989, 35667, 9991]
user 9021 [9020, 9016, 9017, 9022, 317, 9023]
user 9990 [13134, 13478, 13877, 34299, 34485, 34642, 37941]
user 8942 [8939, 8940, 8943, 8944]
```