In [None]:
#@title Data path for file.
# Location of the adjacency-list input file, under the Drive mount point /content/drive.
# The cells below split each line on whitespace into a user id and an optional
# comma-separated list of friend ids, so the file is expected to follow that layout.
datapath="/content/drive/MyDrive/soc-LiveJournal1Adj.txt" #@param

In [None]:
#@title Install all the appropriate packages

# PySpark supplies the SparkContext / RDD API used by the rest of the notebook.
# NOTE(review): no version is pinned, so a fresh runtime may install a different release — consider pinning.
!pip install pyspark
# NOTE(review): PyDrive is not referenced by any cell visible here — presumably a leftover; confirm before removing.
!pip install -U -q PyDrive
# Java 8 JDK for Spark; the Spark setup cell points JAVA_HOME at a Java 8 path.
!apt install openjdk-8-jdk-headless -qq

In [None]:
#@title Import and create Spark Context

# Let's import the libraries we will need
import itertools

import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
import os
# Tell Spark which JDK to use; set before the context is created so the JVM
# that Spark starts can be located.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# Spark configuration: serve the Spark UI on port 4050.
conf = SparkConf().set("spark.ui.port", "4050")

# Create the context, or reuse the one that is already running. Calling the
# SparkContext constructor directly raises ValueError when this cell is run a
# second time in the same kernel, because only one context may be active.
# Note: when a context already exists, `conf` is not re-applied to it.
sc = pyspark.SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.getOrCreate()

In [None]:
# mount drive
# Makes Google Drive available under /content/drive. Both the input file
# (datapath) and the output file written at the end live under this mount,
# so this cell has to run before the data is read.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Read the data in.
# Each line is split on whitespace, giving [user_id] or [user_id, "f1,f2,..."].
# A blank line splits into an empty list, which would make the later x[0]
# lookup raise IndexError, so empty records are dropped here.
lines = (
  sc.textFile(datapath, 1)
  .map(lambda line: line.split())
  .filter(lambda fields: len(fields) > 0)
)

## Functions Used

## 1. function: fmt
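This function creates two kinds of pairs from one user's friend list: pairs of potential friends (two users who both appear in this user's friend list, i.e. who share this user as a mutual friend), each with the value 1, and pairs of existing friends, each with the value -inf so that their summed score can never be positive.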
ex: x = ['1', ['2','4','5']] <br>
we can create pairs of potential friends:<br>
(('2','4'), 1), (('2','5'), 1), (('4','5'), 1)<br>
(('4','2'), 1), (('5','2'), 1), (('5','4'), 1)<br>
we can also create pairs of existing friends: <br>
(('1','2'), -inf), (('1','4'), -inf), (('1','5'), -inf)

In [None]:
def fmt(x):
  """Expand one (user, friends) record into keyed pair entries.

  For every two positions in the friend list, both orderings of the pair are
  emitted with the value 1: the two users share this user as a mutual friend.
  Every (user, friend) pair is emitted with the value -inf, marking it as an
  existing friendship.

  Args:
    x: an indexable pair whose first item is the user id and whose second
      item is the list of that user's friend ids.

  Returns:
    A list of ((a, b), value) tuples: all candidate pairs first, followed by
    the existing-friend pairs.
  """
  user, friends = x[0], x[1]
  pairs = []
  # combinations() walks positions i < j in the same order as a nested index loop.
  for first, second in itertools.combinations(friends, 2):
    pairs += [((first, second), 1), ((second, first), 1)]
  pairs += [((user, friend), float('-inf')) for friend in friends]
  return pairs



## 2. function : generate_recommendation_list <br>
If there are more than 10 recommendations, only the first ten are kept.
The recommendations are stored as a comma-separated list.<br>
ex: ['1', '2,3,4'] <br>
This means for user 1, the recommendation is 2, 3 and 4.

In [None]:
def generate_recommendation_list(x, max_recommendations=10):
  """Format a user's ranked candidates as a comma-separated recommendation string.

  Args:
    x: an indexable pair (user_id, candidates) where candidates is an iterable
      of entries whose first item is the recommended user's id (a string).
      The candidates are taken in the order given; no sorting happens here.
    max_recommendations: maximum number of ids to keep. Defaults to 10.

  Returns:
    A two-element list [user_id, "id1,id2,..."]; the second element is the
    empty string when there are no candidates.

  Raises:
    ValueError: if max_recommendations is negative.
  """
  if max_recommendations < 0:
    # A negative bound would silently slice from the end of the list instead.
    raise ValueError("max_recommendations must be non-negative")
  user_id = x[0]
  candidate_ids = [candidate[0] for candidate in x[1]]
  return [user_id, ','.join(candidate_ids[:max_recommendations])]

## 3. Function: sort_value
ex: x = ('1', [('2', 3), ('4', 8), ('3', 3)])<br>
Here user 1 has 3 mutual friends with user 2, 8 with user 4, and 3 with user 3. <br>
We sort the list x[1] in descending order of the number of mutual friends; when the number of mutual friends is the same, we sort by user id in numerically ascending order.

In [None]:
def sort_value(x):
  """Rank a user's candidates by mutual-friend count, then by numeric id.

  Args:
    x: an indexable pair (user_id, candidates) where each candidate is
      indexable as (candidate_id, mutual_friend_count) and candidate_id is a
      string that int() can parse.

  Returns:
    A tuple (user_id, ranked) where ranked is a new list ordered by descending
    count, with ties broken by ascending integer value of the candidate id.
  """
  def ranking_key(candidate):
    # Negating the count gives descending order; int() avoids '10' sorting before '9'.
    return (-candidate[1], int(candidate[0]))

  ranked = list(x[1])
  ranked.sort(key=ranking_key)
  return (x[0], ranked)

## 4. Final output

In [19]:
output = (
  lines
  # Parse each record into (user, [friend ids]); a record without a friend column gets [].
  .map(lambda rec: (rec[0], rec[1].split(',')) if len(rec) > 1 else (rec[0], []))
  # Emit ((a, b), 1) for users sharing a friend and ((user, friend), -inf) for existing friendships.
  .flatMap(fmt)
  # Sum per ordered pair: the mutual-friend count, or -inf if the two are already friends.
  .reduceByKey(lambda left, right: left + right)
  # Keep only genuine candidates. Users left with no candidate pair do not appear in the result.
  .filter(lambda scored: scored[1] > 0)
  # Re-key by the first user of the pair and gather all of that user's (candidate, count) entries.
  .map(lambda scored: (scored[0][0], [(scored[0][1], scored[1])]))
  .reduceByKey(lambda left, right: left + right)
  # Rank the candidates, then format the top ones as a comma-separated string.
  .map(sort_value)
  .map(generate_recommendation_list)
  .collect()
)

## 5. Sort the output according to the user id.

In [None]:
# User ids are strings, so compare them as integers to get numeric rather than lexicographic order.
output = sorted(output, key=lambda row: int(row[0]))

## 6. Save the output into "output.txt"

In [None]:
# Write one "<user id><TAB><comma-separated recommendations>" line per row.
with open("/content/drive/MyDrive/output.txt", 'w') as out_file:
  for row in output:
    out_file.write(row[0] + '\t' + row[1] + '\n')