In [1]:
# Install/upgrade gdown (Google Drive downloader); pip's stdout is discarded
# so only errors show up in the cell output.
!pip install --upgrade --no-cache-dir gdown >/dev/null
# Fetch the archive from Google Drive by file id. In the recorded run it was
# saved as /content/bigdata_hw1_files.zip.
!gdown 1-OCBGBtKoY_PadKHcXDyWxHQ2BS8Nulo

Downloading...
From: https://drive.google.com/uc?id=1-OCBGBtKoY_PadKHcXDyWxHQ2BS8Nulo
To: /content/bigdata_hw1_files.zip
100% 38.9M/38.9M [00:00<00:00, 144MB/s]


In [2]:
# Extract the downloaded archive into the working directory. It unpacks to
# hw1-files/, including q1/dataset1.txt which the cells below read.
!unzip bigdata_hw1_files.zip

Archive:  bigdata_hw1_files.zip
   creating: hw1-files/
   creating: hw1-files/q3/
  inflating: hw1-files/q3/patches.csv  
  inflating: hw1-files/q3/lsh.py     
   creating: hw1-files/q1/
  inflating: hw1-files/q1/dataset1.txt  
   creating: hw1-files/q1/.ipynb_checkpoints/
   creating: hw1-files/q2/
  inflating: hw1-files/q2/games_library.txt  
   creating: hw1-files/.ipynb_checkpoints/


In [3]:
# Install PySpark into the runtime. The version is not pinned; the recorded
# run resolved pyspark 3.2.1 with py4j 0.10.9.3.
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 26 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 52.0 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=ee3057050fa4fdc6fb0a68a4af3f046a5524f912f32928a7571587f6507d8c4e
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [14]:
from pyspark import SparkContext

# Maximum number of recommendations kept per user.
N = 10

# Users for whom recommendations are computed and printed.
INPUT_USERS = (
    98,
    135,
    117,
    911,
    8804,
)

# Input adjacency list; map_func parses each line as
# "<user id>\t<friend id>,<friend id>,...".
INPUT_FILE = 'hw1-files/q1/dataset1.txt'

In [16]:
def map_func(line, input_users=None):
    """Turn one adjacency-list line into scored (target, other) pairs.

    Each line is expected to look like ``"<user>\\t<friend>,<friend>,..."``.
    Only pairs whose first element is a target user are emitted:

    * ``((target, other), 0)`` when ``target`` and ``other`` are already
      friends on this line (in either direction);
    * ``((target, other), 1)`` when ``target`` and ``other`` both appear in
      this line's friend list, i.e. the line's user is a mutual friend.

    Args:
        line: One raw text line of the input file.
        input_users: Collection of target user ids. Defaults to the
            notebook-level ``INPUT_USERS`` when omitted, so the function can
            still be passed directly to ``flatMap``.

    Returns:
        A list of ``((target, other), flag)`` tuples; empty for lines that do
        not contain a tab separator.

    Raises:
        ValueError: If the user id or a non-blank friend id is not an integer.
    """
    # Resolved at call time so the default always follows the current constant.
    targets = INPUT_USERS if input_users is None else input_users

    # Split user id and friend ids
    tokens = line.split('\t')

    # Skip bad lines
    if len(tokens) < 2:
        return []

    # Parse line
    user = int(tokens[0])
    friends = [int(f) for f in tokens[1].split(',') if f.strip() != '']
    pairs = []

    # Mark existing friendships with 0
    user_is_target = user in targets
    for f in friends:
        if user_is_target:
            pairs.append(((user, f), 0))
        if f in targets:
            pairs.append(((f, user), 0))

    # Mark mutual friendships with 1.
    # Only friends that are targets can start a pair, so expand just those
    # instead of walking every pair of friends. Positions (not values) are
    # compared so the emitted pairs match a pairwise walk over the list.
    for i, a in enumerate(friends):
        if a not in targets:
            continue
        for j, b in enumerate(friends):
            if j != i:
                pairs.append(((a, b), 1))

    return pairs

In [18]:
# Create context and read input file
sc = SparkContext.getOrCreate()
rdd = sc.textFile(INPUT_FILE)

# Mapping step
# Cached because this RDD is consumed twice below (filter + subtractByKey).
map_result = rdd.flatMap(map_func).cache()

# Fetch existing friendship pairs.
# They were marked with value 0 in the mapping step.
already_friends = map_result.filter(lambda x: x[1] == 0)

# Reducing step
# Drop every (user, candidate) key that is already a friendship, so users are
# never recommended someone they already know.
# Then sum the remaining 1-markers per key to get the mutual-friend count.
reduce_result = map_result \
    .subtractByKey(already_friends) \
    .reduceByKey(lambda a, b: a + b)

# Transform mapping format
# Re-key by user id with (candidate id, mutual count) as value.
# Group by user id.
# Sort inside each group AFTER grouping: groupByKey shuffles, so an ordering
# established before it is not guaranteed to survive. Candidates are ordered by
# mutual count descending, ties broken by ascending candidate id so the output
# is deterministic, and only the top N are kept.
# Finally sort by user id in ascending order for printing.
recoms = reduce_result \
    .map(lambda x: (x[0][0], (x[0][1], x[1]))) \
    .groupByKey() \
    .mapValues(lambda cands: sorted(cands, key=lambda p: (-p[1], p[0]))[:N]) \
    .sortByKey()

# Print recommendations for each input user
for r in recoms.collect():
    recom_list = [str(p[0]) for p in r[1]]
    print(r[0], '\t', ','.join(recom_list))

98 	 18560,16324,30691,2554,30134,16350,13654,168,5924,5052
117 	 34164,13793,34169,15314,23510,12519,34207,23507,34140,34220
135 	 33060,13792,34151,25256,34164,629,19217,34441,45054,5490
911 	 24456,40560,39540,30995,37875,30984,30996,41352,30993,33333
8804 	 34332,34179,3230,34174,13182,29745,8677,34233,13872,11400
