<a href="https://colab.research.google.com/github/Aditya0996/PeopleYouMightKnow-FriendRecommendation/blob/main/%E2%80%9CPeople_You_Might_Know%E2%80%9D_social_network_friendship_recommendation_algorithm.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Import Data and Libraries

In [None]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os

# PySpark needs a JVM: point JAVA_HOME at the Java 8 runtime installed by the apt command above.
os.environ.update({"JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64"})

openjdk-8-jdk-headless is already the newest version (8u382-ga-1~22.04.1).
0 upgraded, 0 newly installed, 0 to remove and 18 not upgraded.


In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext
import pandas as pd
import itertools

# Reuse the active Spark session if there is one; otherwise start a new one.
session_builder = SparkSession.builder
spark = session_builder.getOrCreate()

# Keep a handle on the session's SparkContext for the RDD-level steps.
sc = spark.sparkContext

In [None]:
from pyspark import SparkConf

# Build a configuration that asks for 8 GB of memory per executor.
# NOTE(review): nothing visible in this notebook passes `conf` to the Spark
# session, which has already been created at this point -- confirm whether
# this setting is actually meant to take effect.
conf = SparkConf()
conf.set("spark.executor.memory", "8g")

Please upload the input file `soc-LiveJournal1Adj.txt` to the `sample_data` folder before running!

In [None]:
# Load the adjacency-list file from sample_data; each input line becomes one row
# in the "value" column.
input_path = "/content/sample_data/soc-LiveJournal1Adj.txt"
txt = spark.read.text(input_path)

In [None]:
# Each line is "<user><TAB><friends>": split on the tab once, then store the
# first piece as "user" and the second as "friends".
fields = split(col("value"), "\t")
txt = txt.withColumn("user", fields[0]).withColumn("friends", fields[1])

In [None]:
# Turn the comma-separated "friends" string into an array of friend ids.
friend_ids = split(col("friends"), ",")
txt = txt.withColumn("friends", friend_ids)

In [None]:
# Drop the raw line; only the parsed columns are used from here on.
txt = txt.select("user", "friends")

In [None]:
# Preview the flattened data: one (user, friend) row per entry in the friends array.
user_friend_pairs = txt.select(col("user"), explode(col("friends")).alias("friend"))
user_friend_pairs.show(truncate=False)

+----+------+
|user|friend|
+----+------+
|0   |1     |
|0   |2     |
|0   |3     |
|0   |4     |
|0   |5     |
|0   |6     |
|0   |7     |
|0   |8     |
|0   |9     |
|0   |10    |
|0   |11    |
|0   |12    |
|0   |13    |
|0   |14    |
|0   |15    |
|0   |16    |
|0   |17    |
|0   |18    |
|0   |19    |
|0   |20    |
+----+------+
only showing top 20 rows



In [None]:
# One row per (user, friend) pair, kept for the direct-friend filtering below.
friends = txt.select(col("user"), explode(col("friends")).alias("friend"))

In [None]:
# Direct friendships get a large negative weight so that, once summed with the
# mutual-friend counts, those pairs can be filtered out of the recommendations.
map_friend = friends.rdd.map(
    lambda pair: ((pair["user"], pair["friend"]), -9999999)
)
# Every ordered pair drawn from one user's friend list shares that user as a
# mutual friend, so emit each such pair with a count of 1.
map_mutual = txt.rdd.flatMap(
    lambda entry: [
        (ordered_pair, 1)
        for ordered_pair in itertools.permutations(entry["friends"], 2)
    ]
)

Source for flatMap: https://stackoverflow.com/questions/61053329/how-to-convert-rdd-list-of-lists-into-one-list-in-pyspark

In [None]:
# Sum the 1s per ordered pair to get the number of occurrences, i.e. how many
# mutual friends the pair has.
map_mutual = map_mutual.reduceByKey(lambda running, extra: running + extra)

In [None]:
# map_mutual.take(10)

In [None]:
# Combine the mutual-friend counts with the direct-friend entries
# (`+` on two RDDs is shorthand for RDD.union).
map_mutual = map_mutual + map_friend

Source for union command: https://stackoverflow.com/questions/27395420/concatenating-datasets-of-different-rdds-in-apache-spark-using-scala

In [None]:
# Sum per pair once more: for direct friends the large negative weight drives
# the total below zero.
reduced = map_mutual.reduceByKey(lambda acc, value: acc + value)

In [None]:
# Keep only pairs whose total is positive, which removes direct friends.
reduced = reduced.filter(lambda pair_total: pair_total[1] > 0)

In [None]:
# reduced.take(10)

In [None]:
# Re-key by user: ((user, mutual), count) -> (user, (count, mutual))
new_mutualFriends = reduced.map(
    lambda item: (item[0][0], (item[1], item[0][1]))
)

In [None]:
# Collect every (count, candidate) tuple of a user into one Python list.
grouped_by_user = new_mutualFriends.groupByKey()
new_mutualFriends = grouped_by_user.mapValues(list)

Source for sorting list of tuples: https://stackoverflow.com/questions/34618029/how-to-sort-rdd-of-nested-list-structure-by-value-in-spark

In [None]:
# Rank each user's candidates: highest mutual-friend count first, and for equal
# counts the numerically smaller user id first.
new_mutualFriends = new_mutualFriends.map(
    lambda entry: (
        entry[0],
        sorted(entry[1], key=lambda cand: (-cand[0], int(cand[1]))),
    )
)

In [None]:
# Keep at most the first n ranked candidates for each user.
n = 20
new_mutualFriends = new_mutualFriends.mapValues(lambda ranked: ranked[:n])

In [None]:
# Drop the counts: (user, [(count, candidate), ...]) -> (user, [candidate, ...])
recommendations = new_mutualFriends.mapValues(
    lambda ranked: [candidate for _count, candidate in ranked]
)

In [None]:
# Collect the (user, [recommended ids]) pairs from the RDD into a Python list.
recommendationList = recommendations.collect()

In [None]:
# Find users with no friends or no mutual friends: these are the user ids that
# appear in the original file but did not receive any recommendation.
txt1 = spark.read.text("/content/sample_data/soc-LiveJournal1Adj.txt")
txt1 = txt1.withColumn("values", split(txt1.value, "\t")[0])
empty = txt1.rdd.map(lambda row: (row.values, 1))
notEmpty = recommendations.map(lambda row: (row[0], 1))
# subtractByKey keeps exactly the pairs of `empty` whose key is absent from
# `notEmpty`. Combining the two with union + reduceByKey(a - b) is avoided
# because subtraction is neither associative nor commutative (which
# reduceByKey requires), and a key present only in `notEmpty` would wrongly
# come out as "empty".
emptyList = (
    empty.subtractByKey(notEmpty)
    .map(lambda x: (x[0], []))
    .collect()
)

In [None]:
# Add the empty list and recommendation list to get the final list. Also, sorting the list with users in ascending order.
def getkey(x):
    """Return the first element of *x* (the user id) converted to an int, for use as a sort key."""
    user_id = x[0]
    return int(user_id)
# Merge users with and without recommendations, ordered by numeric user id.
finalRecommendation = sorted(recommendationList + emptyList, key=getkey)

In [None]:
# Number of entries in the final list (displayed as the cell output).
len(finalRecommendation)

49995

In [None]:
# Print the recommendations for a fixed set of user ids as a spot check.
spot_check_ids = {11, 924, 8941, 8942, 9019, 9020, 9021, 9022, 9990, 9992, 9993}
for user_id, picks in finalRecommendation:
    if int(user_id) in spot_check_ids:
        print(user_id, "\t", ','.join(picks))

11 	 27552,7785,27573,27574,27589,27590,27600,27617,27620,27667,32072,33192,10,12,110,638,1797,2141,5784,6893
924 	 439,2409,6995,11860,15416,43748,45881
8941 	 8943,8944,8940
8942 	 8939,8940,8943,8944
9019 	 9022,317,9023
9020 	 9021,9016,9017,9022,317,9023
9021 	 9020,9016,9017,9022,317,9023
9022 	 9019,9020,9021,317,9016,9017,9023
9990 	 13134,13478,13877,34299,34485,34642,37941
9992 	 9987,9989,35667,9991
9993 	 9991,13134,13478,13877,34299,34485,34642,37941


In [None]:
# Write output.txt with all recommendations, one line per user:
# "<user>\t<comma-separated recommended ids>".
# The with-block closes the file when done (even if a write fails), so every
# buffered line is flushed to disk.
with open('/content/sample_data/output.txt', 'w') as f:
    for line in finalRecommendation:
        f.write(line[0] + "\t" + ','.join(line[1]) + "\n")