In [1]:
import os

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

import itertools

In [2]:
# Spark configuration: set spark.ui.port (the application web UI port) to 4050.
conf = SparkConf().set("spark.ui.port", "4050")

# Reuse an already-running SparkContext instead of constructing a new one, so
# re-running this cell does not fail with "Cannot run multiple SparkContexts".
# NOTE: if a context already exists, `conf` is not re-applied to it.
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/01/18 15:53:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
# Load the adjacency list as an RDD of raw text lines, one user per line
# ("<user_id>\t<comma-separated friend ids>").
lines = sc.textFile("q1/data/soc-LiveJournal1Adj.txt")

In [7]:
# Peek at the first raw line to confirm the input format.
lines.take(num=1)

                                                                                

['0\t1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94']

In [8]:
def line_to_groups(line):
    """Parse one adjacency-list line into ``(user_id, [friend_id, ...])``.

    Example: ``"0<TAB>1,2,3"`` -> ``(0, [1, 2, 3])``.

    A line holding only a user id (no friend list) yields ``(user_id, [])``.
    Empty tokens in the comma-separated friend list (e.g. from a trailing
    comma) are skipped rather than passed to ``int``.

    Raises:
        IndexError: if the line is blank (no user id at all).
        ValueError: if the user id or a friend id is not an integer.
    """
    # Split the line once and reuse the fields.
    fields = line.split()
    user = int(fields[0])

    if len(fields) < 2:
        return user, []

    friends = [int(token) for token in fields[1].split(',') if token]
    return user, friends

In [9]:
# Parse every raw line into a (user_id, [friend_ids]) record.
groups = lines.map(f=line_to_groups)

In [10]:
def friend_ownership_to_connection(f_o):
    """Expand one ``(user_id, friends)`` record into scored user pairs.

    Every pair key is ordered with the smaller id first. The result holds:

    * ``(pair, 0)`` for each direct friendship between ``user_id`` and one of
      its friends (in friend-list order), followed by
    * ``(pair, 1)`` for each unordered pair of ``user_id``'s friends, in
      ``itertools.combinations`` order. These are two users who both have
      ``user_id`` as a friend.
    """
    owner, friend_ids = f_o[0], f_o[1]

    def ordered(a, b):
        # Swap only when a is strictly greater (equal ids keep (a, b)).
        # Deliberately avoids min/max, which are shadowed in this notebook by
        # `from pyspark.sql.functions import *`.
        return (b, a) if a > b else (a, b)

    direct = [(ordered(owner, fid), 0) for fid in friend_ids]
    shared = [(ordered(a, b), 1) for a, b in itertools.combinations(friend_ids, 2)]
    return direct + shared

In [11]:
# Fan each user's record out into (ordered_pair, 0|1) records.
scores = groups.flatMap(f=friend_ownership_to_connection)

In [12]:
# Peek at a sample of (pair, flag) records.
scores.take(num=10)

[((0, 1), 0),
 ((0, 2), 0),
 ((0, 3), 0),
 ((0, 4), 0),
 ((0, 5), 0),
 ((0, 6), 0),
 ((0, 7), 0),
 ((0, 8), 0),
 ((0, 9), 0),
 ((0, 10), 0)]

In [13]:
# Flag 0 marks a direct-friendship record (emitted by friend_ownership_to_connection).
already_friends = scores.filter(lambda scored_pair: scored_pair[1] == 0)

In [14]:
# Drop every pair that is already a direct friendship: subtractByKey removes
# all records whose key also appears in already_friends, leaving only the
# mutual-friend records for users who are not yet friends.
mutual_friends_pairs = scores.subtractByKey(other=already_friends)

In [15]:
# Sum the per-pair flags -> number of mutual friends the pair shares.
mutual_friends_counts = mutual_friends_pairs.reduceByKey(lambda left, right: left + right)

In [16]:
# Peek at a sample of (pair, mutual_friend_count) records.
mutual_friends_counts.take(num=10)

                                                                                

[((19083, 29251), 1),
 ((14581, 18585), 1),
 ((10, 16876), 1),
 ((1528, 40998), 1),
 ((35013, 42253), 1),
 ((168, 19150), 1),
 ((1220, 23922), 1),
 ((18309, 39825), 1),
 ((13928, 34986), 1),
 ((34290, 43728), 1)]

In [17]:
def mutual_connection_score(y):
    """Turn one scored pair into a recommendation record for each of its users.

    Example: ``((8, 16), 3)`` -> ``[(8, (16, 3)), (16, (8, 3))]``.

    Args:
        y: ``((user_a, user_b), score)``.

    Returns:
        Two records ``(user, (other_user, score))``, one keyed by each user.
    """
    pair, count = y[0], y[1]
    first, second = pair[0], pair[1]
    return [(first, (second, count)), (second, (first, count))]

In [18]:
def recommend_10(recs, k=10):
    """Return the ids of the top-``k`` recommended users.

    Args:
        recs: iterable of ``(candidate_id, score)`` pairs.
        k: maximum number of ids to return (default 10).

    Returns:
        List of at most ``k`` candidate ids, ordered by score descending.
        Ties are broken by candidate id ascending.

    The input is left unmodified.
    """
    import heapq  # local import keeps this cell self-contained

    # nsmallest with key (-score, id) is equivalent to sorted(recs, key=...)[:k],
    # without sorting the whole candidate list or mutating the caller's list.
    top = heapq.nsmallest(k, recs, key=lambda rec: (-rec[1], rec[0]))
    return [rec[0] for rec in top]

In [19]:
# Build, for every user, the ids of their top-10 non-friend candidates ranked
# by mutual-friend count.
# cache(): this RDD is consumed by several later actions (the spot check and
# the per-user loop). Without it, each action recomputes the whole pipeline
# from the text file.
recommendations = (
    mutual_friends_counts
    .flatMap(mutual_connection_score)
    .groupByKey()
    .mapValues(lambda candidates: recommend_10(list(candidates)))
    .cache()
)

In [20]:
# Users whose recommendations are printed below.
user_IDs = [
    924, 8941, 8942, 9019, 9020,
    9021, 9022, 9990, 9992, 9993,
]

In [21]:
# Spot-check a single user's recommendations.
print(recommendations.filter(lambda rec: 9021 == rec[0]).collect())



[(9021, [9020, 9016, 9017, 9022, 317, 9023])]


                                                                                

In [22]:
# Fetch all requested users with ONE Spark action. The previous loop ran a
# separate filter().collect() job (a full pipeline pass) for every user.
wanted_ids = set(user_IDs)
recs_by_user = dict(
    recommendations.filter(lambda rec: rec[0] in wanted_ids).collect()
)
for i in user_IDs:
    # Keep the original per-user output: a one-element list, or [] when the
    # user has no recommendations.
    print([(i, recs_by_user[i])] if i in recs_by_user else [])

                                                                                

[(924, [439, 2409, 6995, 11860, 15416, 43748, 45881])]


                                                                                

[(8941, [8943, 8944, 8940])]


                                                                                

[(8942, [8939, 8940, 8943, 8944])]


                                                                                

[(9019, [9022, 317, 9023])]


                                                                                

[(9020, [9021, 9016, 9017, 9022, 317, 9023])]


                                                                                

[(9021, [9020, 9016, 9017, 9022, 317, 9023])]


                                                                                

[(9022, [9019, 9020, 9021, 317, 9016, 9017, 9023])]


                                                                                

[(9990, [13134, 13478, 13877, 34299, 34485, 34642, 37941])]


                                                                                

[(9992, [9987, 9989, 35667, 9991])]




[(9993, [9991, 13134, 13478, 13877, 34299, 34485, 34642, 37941])]


                                                                                

In [None]:
# Optional: persist the recommendations RDD. It is left commented out so that
# re-running the notebook does not write output files.
# NOTE(review): Spark's saveAsTextFile treats its path as an output directory
# of part files, so "test1.txt" would become a directory. Confirm the intended name.
#recommendations.saveAsTextFile("test1.txt")