#Installation

1. Run all code below to set up the environment
2. Upload input data file to /content folder in Google colab repo

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
!wget -q https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz

In [3]:
!tar xf spark-3.3.2-bin-hadoop3.tgz

In [4]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop3"

In [5]:
!pip install -q findspark

In [6]:
os.environ["SPARK_HOME"]

'/content/spark-3.3.2-bin-hadoop3'

#Code for Question 1

Run Each cell sequentially.

**Some cells might take 30 sec - 2 min to complete running since RDD follows lazy execution.

In [7]:
import findspark
findspark.init()
from pyspark import SparkContext
from operator import add

In [8]:
def process_file():
  file1 = open("/content/soc-LiveJournal1Adj.txt", "r")
  friend_graph_dict = {}

  temp = file1.readline()
  while temp:
    key,value = temp.split("\t")
    friend_graph_dict[key] = value.replace("\n","").split(",")
    temp = file1.readline()
  
  file1.close()
  return bfs_dict(friend_graph_dict)

In [39]:
def bfs_dict(grph):
    
    key_list = []

    for node in grph.keys():
      stack = []
      exclude_list = set() 
      stack.append(node)
      exclude_list.add(node)

      for x in grph[node]:
        stack.append(x)
        exclude_list.add(x)

      while stack:
        root = stack[-1]
        try:
          for c in grph[root]:
            if c not in exclude_list:
              key_list.append((node, c))
          stack.pop()
        except KeyError:
          break
    
        
    return key_list

In [40]:
def get_recommended_users(out_tuple, limit=10):

  recommended_list = []
  sorted_recc = sorted(out_tuple[1], key=lambda x:(x[1],1/int(x[0])),reverse=True)
  if len(sorted_recc) <= limit:
    recommended_list.append([out_tuple[0],sorted_recc])
  else:
    recommended_list.append([out_tuple[0],sorted_recc[0:10]]) 

  return recommended_list


In [42]:
#Create the Spark Session
sc1 = SparkContext("local", "FriendRecommendation")

In [43]:
#Preprocess file and create Map RDD
dataset = process_file()
rdd1 = sc1.parallelize(dataset,100)

map_rdd = rdd1.map(lambda x: (x,1))

In [44]:
#Shuffle/sort RDD
shuffle_rdd = map_rdd.sortByKey()
print(shuffle_rdd.take(10))
print(shuffle_rdd.getNumPartitions())

[(('0', '10001'), 1), (('0', '1001'), 1), (('0', '10014'), 1), (('0', '10018'), 1), (('0', '10020'), 1), (('0', '10023'), 1), (('0', '10025'), 1), (('0', '10038'), 1), (('0', '10041'), 1), (('0', '10042'), 1)]
100


In [45]:
#Reduce
counts = shuffle_rdd.reduceByKey(add, 100)
print(counts.getNumPartitions())

100


In [46]:
print(len(counts.collect()))
#print(counts.take(10))

12054196


In [16]:
#make groups based on user_id
map_rdd2 = counts.map(lambda x: (x[0][0], (x[0][1],x[1])))

In [17]:
group_rdd = map_rdd2.groupByKey(500)

In [18]:
print(group_rdd.take(10))

[('10513', <pyspark.resultiterable.ResultIterable object at 0x7f26746274f0>), ('1083', <pyspark.resultiterable.ResultIterable object at 0x7f26746279d0>), ('1114', <pyspark.resultiterable.ResultIterable object at 0x7f267d03c9a0>), ('11190', <pyspark.resultiterable.ResultIterable object at 0x7f267d03ceb0>), ('11779', <pyspark.resultiterable.ResultIterable object at 0x7f267d03c430>), ('13035', <pyspark.resultiterable.ResultIterable object at 0x7f267d03ce50>), ('13516', <pyspark.resultiterable.ResultIterable object at 0x7f267460e7f0>), ('13977', <pyspark.resultiterable.ResultIterable object at 0x7f267460e670>), ('14121', <pyspark.resultiterable.ResultIterable object at 0x7f267460e4c0>), ('15523', <pyspark.resultiterable.ResultIterable object at 0x7f26e89fdc40>)]


##Query Users

In [19]:
#Querying User Recommendations
user_list = [924,8941,8942,9019,9020,9021,9022,9990,9992,9993]
user_list = list(map(str, [x for x in user_list]))

data = group_rdd.filter(lambda x:(x[0]in user_list)) 
print(data.take(10))

[('8941', <pyspark.resultiterable.ResultIterable object at 0x7f2674627f70>), ('9022', <pyspark.resultiterable.ResultIterable object at 0x7f2674627f40>), ('9020', <pyspark.resultiterable.ResultIterable object at 0x7f26746276a0>), ('9021', <pyspark.resultiterable.ResultIterable object at 0x7f2674627c10>), ('924', <pyspark.resultiterable.ResultIterable object at 0x7f2674627d30>), ('9993', <pyspark.resultiterable.ResultIterable object at 0x7f2674627a60>), ('9990', <pyspark.resultiterable.ResultIterable object at 0x7f2674627610>), ('9992', <pyspark.resultiterable.ResultIterable object at 0x7f2674627d00>), ('9019', <pyspark.resultiterable.ResultIterable object at 0x7f26746279d0>), ('8942', <pyspark.resultiterable.ResultIterable object at 0x7f2674627670>)]


In [32]:
top10 = sorted(data.flatMap(get_recommended_users).collect(), key=lambda x: int(x[0]))

#Print Results

In [47]:
#Print (user_id, [(recommended_user, number_mutual_friends)])
print(*top10,sep="\n")

['924', [('439', 1), ('2409', 1), ('6995', 1), ('11860', 1), ('15416', 1), ('43748', 1), ('45881', 1)]]
['8941', [('8943', 2), ('8944', 2), ('8940', 1)]]
['8942', [('8939', 3), ('8940', 1), ('8943', 1), ('8944', 1)]]
['9019', [('9022', 2), ('317', 1), ('9023', 1)]]
['9020', [('9021', 3), ('9016', 2), ('9017', 2), ('9022', 2), ('317', 1), ('9023', 1)]]
['9021', [('9020', 3), ('9016', 2), ('9017', 2), ('9022', 2), ('317', 1), ('9023', 1)]]
['9022', [('9019', 2), ('9020', 2), ('9021', 2), ('317', 1), ('9016', 1), ('9017', 1), ('9023', 1)]]
['9990', [('13134', 1), ('13478', 1), ('13877', 1), ('34299', 1), ('34485', 1), ('34642', 1), ('37941', 1)]]
['9992', [('9987', 4), ('9989', 4), ('35667', 3), ('9991', 2)]]
['9993', [('9991', 5), ('13134', 1), ('13478', 1), ('13877', 1), ('34299', 1), ('34485', 1), ('34642', 1), ('37941', 1)]]


In [49]:
#Print Recommendations in required format
file2 = open("top10_results", "w")
for line in top10:

  temp_str = line[0]+"\t"
  for i in range(0,len(line[1])):
    if i == len(line[1])-1:
      temp_str += line[1][i][0]+"\n"
    else:
      temp_str += line[1][i][0]+", "
  file2.write(temp_str)
  print(temp_str, end="")
file2.close()

924	439, 2409, 6995, 11860, 15416, 43748, 45881
8941	8943, 8944, 8940
8942	8939, 8940, 8943, 8944
9019	9022, 317, 9023
9020	9021, 9016, 9017, 9022, 317, 9023
9021	9020, 9016, 9017, 9022, 317, 9023
9022	9019, 9020, 9021, 317, 9016, 9017, 9023
9990	13134, 13478, 13877, 34299, 34485, 34642, 37941
9992	9987, 9989, 35667, 9991
9993	9991, 13134, 13478, 13877, 34299, 34485, 34642, 37941


In [41]:
#stop the SparkContext object
sc1.stop()