In [14]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext
import pandas as pd

import itertools

#if __name__ == '__main__':
# create the Spark Session
spark = SparkSession.builder.getOrCreate()

sc = SparkContext.getOrCreate()
lines = sc.textFile('data/soc-LiveJournal1Adj.txt')
lines = lines.map(lambda line:line.split())
friends = lines.filter(lambda x:len(x)==2).map(lambda x:(x[0],x[1].split(','))) # map friends to user

# create a list of direct friends and mutual friends
directFriends = friends.flatMap(lambda data:[((data[0],friend), -1000000) for friend in data[1]]) # -1000000 is used to differentiate direct friends from mutual friends
mutualFriends = friends.flatMap(lambda data: [(pair, 1) for pair in itertools.permutations(data[1], 2)]) # map pairs of friends
fullList = directFriends.union(mutualFriends) # combine direct and mutual friends

# count the number of mutual friends
fullList = fullList.reduceByKey(lambda x,y: x+y)
mutualCount = fullList.filter(lambda x:x[1]>0) # filter out direct friends
mutualCount = mutualCount.map(lambda x:(x[0][0],(x[0][1], x[1]))) # map to user and number of mutual friends
mutualCount = mutualCount.groupByKey().mapValues(list) # group by user

# sort the list of recommendations
# If there are recommended users with the same number of mutual friends, then output those user IDs in numerically ascending order.
# First sort by number of mutual friends, then by user ID
mutualCount = mutualCount.mapValues(lambda recommendations: sorted(recommendations, key=lambda x: (-x[1], x[0]))[:10]) # sort by number of mutual friends

#mutualCount_sort = mutualCount_sort.map(lambda x:(x[0], [i[0] for i in x[1]])) 
active = mutualCount.collect()  # collect the list of recommendations

# output the result
for i in range(len(active)):
        active[i] = str(active[i][0]) + "\t" + ",".join(str(item) for item in active[i][1])
lonely = lines.filter(lambda x:len(x)==1).flatMap(lambda x:x).collect()
complete = active + lonely
sc.parallelize(complete).repartition(1).saveAsTextFile('output')
sc.stop()


Exception in thread "serve RDD 13" java.net.SocketTimeoutException: Accept timed out
	at java.net.PlainSocketImpl.socketAccept(Native Method)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:535)
	at java.net.ServerSocket.implAccept(ServerSocket.java:545)
	at java.net.ServerSocket.accept(ServerSocket.java:513)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:65)
                                                                                

In [6]:
# Clear Spark Context
sc.stop()

# Stop Spark Session
spark.stop()

In [15]:
# Read output
output = pd.read_csv('output/part-00000', sep='\t', header=None)
output.columns = ['User', 'Recommendations']
output.head()

Unnamed: 0,User,Recommendations
0,39721,"('22589', 2),('3937', 2),('45992', 2),('4738',..."
1,37010,"('10072', 13),('36679', 12),('36905', 12),('10..."
2,12628,"('12626', 2),('12631', 2),('12641', 2),('12649..."
3,10240,"('1100', 14),('27736', 11),('439', 9),('23305'..."
4,1698,"('1703', 8),('7466', 6),('909', 6),('24456', 5..."


In [16]:
# Extract recommendations for the following users: 924, 8941, 8942, 9019, 9020, 9021, 9022, 9990, 9992, 9993
users = [924, 8941, 8942, 9019, 9020, 9021, 9022, 9990, 9992, 9993, 11]
selected = output[output['User'].isin(users)]
# Output USER ID == 11
selected[selected['User'] == 11]['Recommendations'].values

array(["('27552', 4),('27573', 3),('27574', 3),('27589', 3),('27590', 3),('27600', 3),('27617', 3),('27620', 3),('27667', 3),('32072', 3)"],
      dtype=object)

In [19]:
selected[selected['User'] == 9021]['Recommendations'].values

array(["('9020', 3),('9016', 2),('9017', 2),('9022', 2),('317', 1),('9023', 1)"],
      dtype=object)