
# CS6220 Homework 2
# Map Reduce: Friends of Friends

Include your code in this file. Make sure the below piece of code is at the top, as we will use that variable for testing.

### Tips and tricks
* Besides the Spark documentation, use the REPL heavily: it lets you inspect objects and try out functions interactively.
* One function you may find useful is `collect()`, which gathers the RDD from all machines into the driver's memory. This is only feasible for small datasets, but it lets you debug effectively.
* You can keep the data file on Google Drive, mount the drive, and point `datapath` at it. That way you won't have to keep re-uploading it to Google Colab.
  * Try using the following code block:
  
  ```
     from google.colab import drive
     drive.mount('/content/drive')
  ```

* The total runtime is around 10 minutes, and you'll only notice it in the reduce step. Spark evaluates lazily, so the lag only appears when an action such as `collect` forces evaluation.

In [None]:
#@title Data path for file. We will use the variable `data_path` for grading.
# Path to the tab-separated adjacency-list input read by the Spark cells below.
# NOTE(review): the title above says `data_path`, but the variable defined
# here (and read later by `sc.textFile`) is `datapath`. Confirm which name
# the grader actually sets/reads.
datapath="/content/drive/MyDrive/path-to-soc-LiveJournal1Adj.txt" #@param 

In [None]:
# Mount Google Drive at /content/drive so the input file referenced by
# `datapath` (and the output path used at the end) are reachable.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Write a tiny 7-user adjacency list in the same "user<TAB>friend,friend,..."
# format as the real input, for quick local checks of the pipeline.
# Mode "w" truncates/overwrites any existing file (it does NOT append), and
# the `with` block guarantees the file is closed even if a write fails.
with open("small_sample.txt", "w", encoding="utf-8") as sample_data:
  sample_data.writelines([
      "1\t2,3,5\n",
      "2\t1,3,5,6\n",
      "3\t1,2,7\n",
      "4\t5,6,7\n",
      "5\t1,2,4\n",
      "6\t2,4,7\n",
      "7\t3,4,6\n",
  ])

In [None]:
#@title Your Code Below

# Environment setup (IPython shell magics, Colab only): install PySpark,
# PyDrive and a headless OpenJDK 8, then point JAVA_HOME at that JDK
# (presumably so PySpark can locate Java when it starts).
# NOTE(review): PyDrive does not appear to be used in the visible cells.
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m18.8 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=2b943f0f517c712b5bc827c35075fbab5caa882fa5d33e59499ab55e60f5a305
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [None]:
from pyspark.sql import *
# from pyspark.sql.functions import *
from pyspark import SparkContext
import pandas as pd
import itertools 

# Reuse the active SparkSession if one exists, otherwise start a new one.
spark = SparkSession \
    .builder \
    .getOrCreate()

# The RDD API used throughout this notebook is reached via the session's
# underlying SparkContext.
sc = spark.sparkContext

Read and preprocess the data file

In [None]:
# RDD of tab-split records; for well-formed input each is
# [user_id, "friend,friend,..."] (both still strings at this point).
lines = sc.textFile(datapath, minPartitions=1) \
    .map(lambda raw_line: raw_line.split("\t"))


In [None]:
def split_to_int(split):
  """Convert one tab-split adjacency record into integer ids.

  ``split`` is a line split on the tab character: element 0 is the user id,
  element 1 (if present) is a comma-separated list of friend ids. Ids are
  converted to ``int`` so later steps sort them numerically.

  Returns ``(user, friends)`` where ``friends`` is a list of ints. It is
  ``[]`` when the friend field is missing (no tab on the line) or empty.
  Blank tokens, e.g. from a trailing comma or a stray ``\\r``, are skipped
  instead of making ``int()`` raise.
  """
  user = int(split[0])
  if len(split) < 2:
    # No tab on the line at all: a user with no listed friends.
    return (user, [])
  friends = [int(token) for token in split[1].split(",") if token.strip()]
  return (user, friends)


def make_pair(friend_list):
  """Emit candidate-pair records for one ``(user, friends)`` tuple.

  Returns a list of ``((a, b), flag)`` records whose key is always ordered
  as ``(smaller id, larger id)``:

  * ``((a, b), 0)`` for each existing friendship ``user``-``friend``;
  * ``((a, b), 1)`` for each pair of this user's friends, since they share
    ``user`` as a mutual friend.
  """
  user = friend_list[0]
  friends = friend_list[1]

  # Normalize every key to (min, max). Otherwise the same pair can show up
  # as both (a, b) and (b, a) depending on list order. That splits its
  # mutual-friend count across two keys and yields duplicate
  # recommendations.
  already_friends = [((min(user, friend), max(user, friend)), 0)
                     for friend in friends]

  # sorted(set(...)) makes combinations() emit (smaller, larger) pairs and
  # drops repeated ids that would otherwise pair a friend with itself.
  unique_friends = sorted(set(friends))
  have_mutual_friend = [(pair, 1)
                        for pair in itertools.combinations(unique_friends, 2)]

  return already_friends + have_mutual_friend



def rec_for_both(rec):
  """Fan a scored pair out to both of its endpoints.

  ``rec`` is ``((a, b), count)``. The result is
  ``[(a, (b, count)), (b, (a, count))]``, so each user receives the other as
  a candidate carrying the same score.
  """
  pair, score = rec[0], rec[1]
  a, b = pair[0], pair[1]
  return [(src, (dst, score)) for src, dst in ((a, b), (b, a))]


def rec_to_sorted(recs, k=10):
  """Return the ids of the top-``k`` recommended candidates.

  ``recs`` is any iterable of ``(candidate_id, mutual_count)`` tuples.
  Candidates are ranked by mutual-friend count, highest first, with ties
  broken by ascending id. At most ``k`` ids are returned (default 10).
  The input is not modified.
  """
  ranked = sorted(recs, key=lambda rec: (-rec[1], rec[0]))
  return [rec[0] for rec in ranked[:k]]



# Pipeline overview:
#   1. parse each line into (user, [friends]) and emit pair records:
#      ((a, b), 0) for an existing friendship, ((a, b), 1) for each friend
#      that a and b share (see make_pair).
#   2. combine records per pair: a 0 marks an existing friendship and drops
#      the pair; otherwise the 1s add up to the mutual-friend count.
#   3. send each surviving pair to both users and keep each user's top
#      candidates (see rec_for_both / rec_to_sorted).

# RDD of ((a, b), flag) records. Deliberately not cached: it is consumed
# exactly once below, and make_pair emits one record per pair of a user's
# friends, so it grows quadratically with degree.
user_friend_pair = lines.map(split_to_int).flatMap(make_pair)


def _merge_pair_flags(left, right):
  """Combine two partial values for the same pair key.

  0 means "already friends" and absorbs any other value. Otherwise both
  sides are partial mutual-friend counts (always >= 1) and are summed. The
  operation is associative and commutative, as reduceByKey requires.
  """
  if left == 0 or right == 0:
    return 0
  return left + right


# reduceByKey pre-combines values within each partition before the shuffle,
# whereas groupByKey ships every individual record across the network.
mutual_friend_counts = user_friend_pair.reduceByKey(_merge_pair_flags) \
    .filter(lambda edge: edge[1] != 0)

# (user, [top candidate ids]) for every user with at least one candidate.
recommend_list = mutual_friend_counts.flatMap(rec_for_both) \
    .groupByKey() \
    .mapValues(lambda recs: rec_to_sorted(list(recs)))


sample_list = [11, 924, 8941, 8942, 9019, 9020, 9021, 9022, 9990, 9992, 9993]

sample_output = recommend_list.filter(lambda x: x[0] in sample_list)

sample_output.collect()



[(9021, [9020, 9016, 9017, 9022, 317, 9023]),
 (11, [27552, 7785, 27573, 27574, 27589, 27590, 27600, 27617, 27620, 27667]),
 (9990, [13134, 13478, 13877, 34299, 34485, 34642, 37941]),
 (8942, [8939, 8940, 8943, 8944]),
 (924, [439, 2409, 6995, 11860, 15416, 43748, 45881]),
 (9020, [9021, 9016, 9017, 9022, 317, 9023]),
 (9993, [9991, 13134, 13478, 13877, 34299, 34485, 34642, 37941]),
 (9992, [9987, 9989, 35667, 9989, 9991, 9991]),
 (9019, [9022, 317, 9023]),
 (9022, [9019, 9020, 9021, 317, 9016, 9017, 9023]),
 (8941, [8943, 8944, 8940])]

In [None]:
# Persist every (user, [recommendations]) record as its string form.
# Per the PySpark docs, this writes a directory of part files at the given
# path and fails if that path already exists.
recommend_list.saveAsTextFile(path="/content/drive/MyDrive/Output.txt")