# Homework 3

Submit your `*.ipynb` file through Gradescope (download it via `File` ⇒ `Download` ⇒ `Download .ipynb`), together with your PDF and a link to your repository.

### Setup

In [1]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

openjdk-8-jdk-headless is already the newest version (8u392-ga-1~22.04).
0 upgraded, 0 newly installed, 0 to remove and 32 not upgraded.


In [1]:
#@title Import PySpark and create SparkContext

import itertools
import pyspark
# NOTE(review): these wildcard imports can shadow Python builtins (e.g. sum, max,
# min, abs, round) with Spark column functions, depending on the PySpark version.
# Prefer explicit names such as `from pyspark.sql import functions as F` in new code.
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
import os
# Spark launches a JVM, so JAVA_HOME must point at the JDK before a context exists.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# Spark configuration: serve the Spark web UI on port 4050.
conf = SparkConf().set("spark.ui.port", "4050")

# Create (or reuse) the context. getOrCreate makes this cell safe to re-run;
# constructing a second SparkContext directly raises
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)
# The session picks up the active SparkContext created above.
spark = SparkSession.builder.getOrCreate()

### Mount Google Drive

In [2]:
# Mount Google Drive; the final output file is written under /content/drive/MyDrive.
from google.colab import drive

drive.mount(mountpoint='/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
#@title Download the data from the course website
!wget https://course.ccs.neu.edu/cs6220/fall2023/homework-3/soc-LiveJournal1Adj.txt

--2024-02-08 21:35:30--  https://course.ccs.neu.edu/cs6220/fall2023/homework-3/soc-LiveJournal1Adj.txt
Resolving course.ccs.neu.edu (course.ccs.neu.edu)... 129.10.117.35
Connecting to course.ccs.neu.edu (course.ccs.neu.edu)|129.10.117.35|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4156181 (4.0M) [text/plain]
Saving to: ‘soc-LiveJournal1Adj.txt.6’


2024-02-08 21:35:30 (20.8 MB/s) - ‘soc-LiveJournal1Adj.txt.6’ saved [4156181/4156181]



### Load the data in!

In [4]:
# [0] Read the data in: each text line is split on whitespace, giving records
#     shaped like [user_id, "comma,separated,friend,ids"].
lines = (
    sc.textFile("soc-LiveJournal1Adj.txt", minPartitions=1)
    .map(lambda text: text.split())
)

In [5]:
# Peek at the first two parsed records.
lines.take(num=2)

[['0',
  '1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94'],
 ['1',
  '0,5,20,135,2409,8715,8932,10623,12347,12846,13840,13845,14005,20075,21556,22939,23520,28193,29724,29791,29826,30691,31232,31435,32317,32489,34394,35589,35605,35606,35613,35633,35648,35678,38737,43447,44846,44887,49226,49985,623,629,4999,6156,13912,14248,15190,17636,19217,20074,27536,29481,29726,29767,30257,33060,34250,34280,34392,34406,34418,34420,34439,34450,34651,45054,49592']]

### Step 1 & 2: Parse input data, create friend pairs

In [6]:
# [Step 1] Parse input data to a list of [user, friends]
# [Step 2] Create friend pairs

def create_friend_pairs(line):
    """Emit ((user_id, user_id), value) pairs for one parsed adjacency record.

    ``line`` is a parsed record ``[user, "f1,f2,..."]``. Two kinds of pairs
    are produced:

    * ``((user, friend), -inf)`` for every direct friend: A and B are
      already friends, so the pair must never become a recommendation;
    * ``((a, b), 1)`` and ``((b, a), 1)`` for every two distinct positions in
      the friend list: a and b are potential friends sharing ``user`` as one
      mutual friend.

    Example for ``A | B, C``::

        [((A, B), -inf), ((A, C), -inf), ((B, C), 1), ((C, B), 1)]

    Records with fewer than two fields (no friend list) yield ``[]``.
    """
    if len(line) < 2:
        return []

    user = line[0]
    friends = line[1].split(',')
    already_friends = float('-inf')

    pairs = [((user, friend), already_friends) for friend in friends]
    # combinations walks index pairs (i, j) with i < j, in the same order as a
    # nested i/j loop; emit both directions so each user sees the candidate.
    for first, second in itertools.combinations(friends, 2):
        pairs.append(((first, second), 1))
        pairs.append(((second, first), 1))
    return pairs

### Step 3: Get potential friend pairs

In [7]:
# [Step 3] Filter result and get potential friend pairs
#
# 1. Sum up and reduce: add together every value emitted for the same
#    (user, candidate) key, giving totals such as
#      ((A, B), -inf)   <- A and B are already friends
#      ((A, C), 3)      <- A and C share 3 mutual friends
#      ((A, D), 8)
#      ((B, G), 7)
#      ...
#    A single -inf contribution keeps the total at -inf.
# 2. Filter out the negative: keep only pairs with a positive total, which
#    drops every pair of users who are already friends.
flat_result = (
    lines
    .flatMap(create_friend_pairs)
    .reduceByKey(lambda left, right: left + right)
)
filtered_result = flat_result.filter(lambda kv: kv[1] > 0)

### Step 4: Sort result and get the Top N

In [8]:
# [Step 4] Sort result and get the top N.
# 1. Change data format: ((user, friend), count) ---> (user, (friend, count))
# 2. Group the result by user
# 3. Sort each user's result first by decreasing count, then by increasing
#    *numeric* friend ID, and keep the first TOP_N entries.
TOP_N = 10


def _top_recommendations(candidates, n=TOP_N):
    """Return the n best (friend_id, count) pairs for one user.

    Most mutual friends come first; ties are broken by ascending numeric
    friend ID. IDs are strings, so they are compared as ints — plain string
    comparison would put '100' before '99'.
    """
    return sorted(candidates, key=lambda fc: (-fc[1], int(fc[0])))[:n]


group_result = filtered_result \
                .map(lambda pair: (pair[0][0], (pair[0][1], int(pair[1])))) \
                .groupByKey() \
                .mapValues(_top_recommendations)

group_result.lookup('11')

[[('27552', 4),
  ('27573', 3),
  ('27574', 3),
  ('27589', 3),
  ('27590', 3),
  ('27600', 3),
  ('27617', 3),
  ('27620', 3),
  ('27667', 3),
  ('32072', 3)]]

### Step 5: Save output

In [14]:
# [Step 5] Save output: one line per user, "<user>\t<comma-separated recommended IDs>",
# ordered by numeric user ID.
file_path = "/content/drive/MyDrive/CS6220/output.txt"

# group_result only holds users with at least one candidate, so users with no
# friends (or whose candidates were all filtered out) would be missing from the
# file. Seed every user ID from the input with an empty recommendation list.
all_user_ids = lines.filter(lambda record: len(record) > 0) \
                    .map(lambda record: record[0]) \
                    .collect()
recommendations_by_user = {user: [] for user in all_user_ids}
recommendations_by_user.update(group_result.collect())
results = sorted(recommendations_by_user.items(), key=lambda item: int(item[0]))

# open(..., 'w') fails if the Drive folder does not exist yet.
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, 'w') as f:
    for user, recommendations in results:
        recommendations_str = ','.join(str(friend) for friend, _ in recommendations)
        f.write(f"{user}\t{recommendations_str}\n")