<a href="https://colab.research.google.com/github/JulieLiu99/Data-Mining/blob/main/hw1_q1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Set up Spark on the Colab environment.

In [None]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq

openjdk-8-jdk-headless is already the newest version (8u312-b07-0ubuntu1~18.04).
0 upgraded, 0 newly installed, 0 to remove and 37 not upgraded.


In [None]:
import os

# Export JAVA_HOME for this process so Java-dependent tools can locate the JDK.
os.environ.update({"JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64"})

Authenticate a Google Drive client to download the file we will be processing in our Spark job.

In [None]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate and create the PyDrive client.
# The statements below are order-dependent: the Colab user is authenticated first,
# then the application-default credentials are attached to a GoogleAuth object,
# and finally that object is used to build the GoogleDrive client.
# NOTE(review): presumably authenticate_user() triggers an interactive Colab
# sign-in prompt -- confirm before running unattended.
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
# NOTE(review): `drive` is not referenced by any later cell visible here.
drive = GoogleDrive(gauth)

In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext
import pandas as pd

# Obtain the Spark session (an existing one is reused if present, otherwise a
# new one is created) and keep a handle on its context for the RDD API.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)
sc = spark.sparkContext

In [None]:
# Display the session object as the cell output to check the current Spark
# version and get the link to the web interface.
spark

In [None]:
# Load the adjacency list as an RDD with one element per text line. Each line
# has the form "<user>\t<friend1>,<friend2>,...", as the sample output shows.
# The path is relative to the working directory.
# NOTE(review): no visible cell downloads this file (the Drive client above is
# never used) -- confirm how it is placed under data/ on a fresh runtime.
data = sc.textFile('data/soc-LiveJournal1Adj.txt') # read the file into an RDD of lines
data.take(5) # show the first 5 elements of the RDD as a sanity check

['0\t1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94',
 '1\t0,5,20,135,2409,8715,8932,10623,12347,12846,13840,13845,14005,20075,21556,22939,23520,28193,29724,29791,29826,30691,31232,31435,32317,32489,34394,35589,35605,35606,35613,35633,35648,35678,38737,43447,44846,44887,49226,49985,623,629,4999,6156,13912,14248,15190,17636,19217,20074,27536,29481,29726,29767,30257,33060,34250,34280,34392,34406,34418,34420,34439,34450,34651,45054,49592',
 '2\t0,117,135,1220,2755,12453,24539,24714,41456,45046,49927,6893,13795,16659,32828,41878',
 '3\t0,12,41,55,1532,12636,13185,27552,38737',
 '4\t0,8,14,15,18,27,72,80,15326,19068,19079,24596,42697,46126,74,77,33269,38792,38822']

In [None]:
# import itertools

# def make_tuples(line): # gives error, needs cleaning

#   user, friends = line.split('\t')
#   friends = friends.split(',')

#   already_friends = [((user, friend), 0) for friend in friends]
#   have_mutual_friends = [(pair, 1) for pair in itertools.permutations(friends, 2)]

#   return already_friends + have_mutual_friends
def make_tuples(line):
  """Turn one adjacency-list line into keyed friendship / mutual-friend records.

  Args:
    line: A string of the form "<user>\\t<friend1>,<friend2>,...".

  Returns:
    A list of ((a, b), flag) tuples with integer ids:
      * ((user, friend), 0) for every friend of `user` (already friends);
      * ((f1, f2), 1) and ((f2, f1), 1) for every pair of distinct positions
        in the friend list (they share `user` as a mutual friend).
    An empty list is returned when the line has no tab or no user id.

  Raises:
    ValueError: if the user id or a non-empty friend token is not an integer.
  """
  tokens = line.split('\t')

  if len(tokens) == 1 or tokens[0] == '':
    return []

  user = int(tokens[0])

  # Drop empty tokens (e.g. from an empty friend list or a trailing comma) and
  # convert to int once, so both record types are built from the same cleaned
  # list. Previously only the first loop skipped '' and the pair loop could
  # crash on int('').
  friends = [int(friend) for friend in tokens[1].split(',') if friend != '']

  already_friends = [((user, friend), 0) for friend in friends]

  # Emit both orderings of each pair so that each user is a key for the other.
  have_mutual_friends = []
  for i in range(len(friends) - 1):
    for j in range(i + 1, len(friends)):
      have_mutual_friends.append(((friends[i], friends[j]), 1))
      have_mutual_friends.append(((friends[j], friends[i]), 1))

  return already_friends + have_mutual_friends

In [None]:
# Expand every input line into its ((a, b), flag) records and flatten them into one RDD.
all_pairs = data.flatMap(make_tuples)

In [None]:
# Random sample of 5 records without replacement: takeSample(withReplacement, num, seed).
# A fixed seed keeps the displayed sample reproducible across runs.
all_pairs.takeSample(False, 5, seed=42)

[((14049, 13993), 1),
 ((34304, 11384), 1),
 ((10097, 44081), 1),
 ((16861, 16984), 1),
 ((27484, 12943), 1)]

In [None]:
# Records flagged 0 are existing friendships. They are selected with a separate
# filter because flatMap yields a single RDD, which cannot be unpacked into two
# variables ("cannot unpack non-iterable PipelinedRDD object").
already_friends = all_pairs.filter(lambda keyed_flag: keyed_flag[1] == 0)

In [None]:
# Show five of the existing-friendship records; each should carry the flag 0.
already_friends.take(5)

[((0, 1), 0), ((0, 2), 0), ((0, 3), 0), ((0, 4), 0), ((0, 5), 0)]

In [None]:
# Build the per-user candidate lists in four steps:
#   1. drop every pair (A, B) whose key is an existing friendship
#   2. sum the 1s per pair                -> ((A, B), n)
#   3. re-key by the first user of a pair -> (A, (B, n))
#   4. gather the candidates per user     -> (A, [(rec1, n1), (rec2, n2), ...])
mutual_friends_pairs = (
    all_pairs
    .subtractByKey(already_friends)
    .reduceByKey(lambda left, right: left + right)
    .map(lambda pair_count: (pair_count[0][0], (pair_count[0][1], pair_count[1])))
    .groupByKey()
    .mapValues(list)
)

In [None]:
# Inspect one (user, [(candidate, n_mutual), ...]) record. The candidate list
# is printed in full, so the output can be long.
mutual_friends_pairs.take(1)

[(35224,
  [(32742, 1),
   (32386, 1),
   (37966, 1),
   (7738, 1),
   (38082, 1),
   (35250, 1),
   (4482, 1),
   (35270, 1),
   (35226, 1),
   (35206, 1),
   (35262, 2),
   (9734, 1),
   (32782, 2),
   (35222, 1),
   (4414, 1),
   (12162, 1),
   (32806, 1),
   (35178, 1),
   (35186, 7),
   (30162, 1),
   (32766, 1),
   (7018, 1),
   (32774, 1),
   (35274, 4),
   (35254, 1),
   (20142, 1),
   (42530, 1),
   (41906, 1),
   (32818, 1),
   (35266, 3),
   (35246, 1),
   (35238, 1),
   (4402, 1),
   (35218, 8),
   (3606, 1),
   (40818, 1),
   (35190, 1),
   (9810, 1),
   (35258, 1),
   (37954, 2),
   (35234, 5),
   (46282, 1),
   (8278, 1),
   (32798, 1),
   (32574, 1),
   (32814, 1),
   (23794, 1),
   (49618, 1),
   (39070, 2),
   (6978, 1),
   (35202, 3),
   (36354, 2),
   (35242, 1),
   (35194, 1),
   (35210, 3),
   (32801, 1),
   (35257, 1),
   (4433, 1),
   (16893, 1),
   (32737, 1),
   (4397, 1),
   (5249, 1),
   (35185, 1),
   (35205, 1),
   (35245, 3),
   (33429, 1),
   (37945, 2),

In [None]:
def recommend_new_friends(user_recommends, K = 10):
  """Pick the top-K friend recommendations for one user.

  Args:
    user_recommends: A (user, recommends) pair, where `recommends` is an
      iterable of (candidate, n_mutual) tuples.
    K: Maximum number of candidates to return (default 10).

  Returns:
    A (user, candidates) tuple. `candidates` holds at most K candidate ids,
    ordered by descending number of mutual friends, ties broken by ascending
    candidate id.
  """
  user, recommends = user_recommends

  # Negate the count so that a single ascending sort gives "most mutual friends
  # first, smallest id first among ties".
  ranked = sorted(recommends, key = lambda candidate_n_mutual: (-candidate_n_mutual[1], candidate_n_mutual[0]))[:K]

  # Use a distinct loop name: reusing `user` here would overwrite the user this
  # record belongs to, and the function would return the last candidate instead.
  result = [candidate for candidate, _ in ranked]

  return user, result

In [None]:
# Rank the candidates for every user, render each record as the text line
# "user: rec1,rec2,...", and bring all lines back to the driver as a list.
result = (
    mutual_friends_pairs
    .map(recommend_new_friends)
    .map(lambda user_recs: "{}: {}".format(user_recs[0], ",".join(str(rec) for rec in user_recs[1])))
    .collect()
)

In [None]:
IDs = ['924', '8941', '8942', '9019', '9020', '9021', '9022', '9990', '9992', '9993']

# Index the lines of the requested users in a single pass over `result`,
# instead of rescanning and re-splitting every line once per requested ID.
wanted = set(IDs)
lines_by_user = {}
for line in result:
  user = line.split(': ')[0]
  if user in wanted:
    lines_by_user.setdefault(user, []).append(line)

# Print in the order the IDs are listed; an ID without any line prints nothing.
for user_id in IDs:  # named `user_id` so the builtin `id` is not shadowed
  for line in lines_by_user.get(user_id, []):
    print(line)

9022: 317,353,4801,9016,9017,9018,9019,9020,9021,9022


In [None]:
# Stop the Spark context now that all results have been collected and printed.
sc.stop()