## Question 1 

In [1]:
# Import the libraries we will need
import pandas as pd
import numpy as np

import findspark
findspark.init()

import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import StringType
import pyspark.pandas as ps

# Start (or reuse) the Spark session that every later cell reads from.
spark = (
    SparkSession.builder
    .appName("Q1")
    .getOrCreate()
)



In [2]:
# Load the tab-separated adjacency list: one row per user, with the user's
# friends given as a comma-separated list of ids in the second field.
friend = (
    spark.read.csv("hw1-bundle/hw1-bundle/q1/data/soc-LiveJournal1Adj.txt", sep='\t')
    .toDF("User", "Friends")
    .withColumn("User", col("User").cast("int"))
    .withColumn("Friends", split(col("Friends"), ",").cast("array<int>"))
)

# A user without friends has an empty second field, which the CSV reader loads
# as NULL, so "Friends" is NULL for them. Test for NULL directly instead of
# `size(...) == -1`: size() only returns -1 for NULL input under the legacy
# sizeOfNull behaviour and returns NULL when ANSI mode is on, in which case
# both filters would silently drop these users.
no_friend = (
    friend
    .filter(col("Friends").isNull())
    .withColumnRenamed("Friends", "Recommendations")  # users with no friends get no recommendations
)

friend = friend.filter(col("Friends").isNotNull())

In [3]:
# Build a one-row DataFrame whose single "All Users" column holds every user id
# (of users that have friends). The aggregation runs on the cluster, so the ids
# are never pulled to the driver and shipped back, and the name is bound to a
# DataFrame only (not first to a Python list).
all_user_ids = friend.agg(collect_list("User").alias("All Users"))

# Attach that array to every user row. Selecting User/Friends first keeps this
# cell re-runnable: a second run does not add a duplicate "All Users" column.
# The right side is a single row, so hint Spark to broadcast it.
friend = friend.select("User", "Friends").crossJoin(broadcast(all_user_ids))

In [4]:
# Candidate pairs: for each user, every known user id that is not already in
# the user's friend list, one row per (User, Unfriend) pair.
not_yet_friends = array_except(col("All Users"), col("Friends"))
unfriend = (
    friend
    .select("User", not_yet_friends.alias("Unfriends"))
    .withColumn("Unfriend", explode(col("Unfriends")))
    .withColumn("Unfriend", col("Unfriend").cast("int"))
    .where(col("User") != col("Unfriend"))  # a user is never a candidate for themselves
)

# Drop the helper "All Users" column again; later cells only need these two.
friend = friend.select("User", "Friends")

In [5]:
# Count mutual friends for every candidate pair: look up the friend list of
# both sides of the pair and measure the size of their intersection.
mutual_friend = (
    unfriend
    .join(friend.withColumnRenamed("Friends", "User's Friends"), on="User", how="left")
    .join(
        friend
        .withColumnRenamed("User", "Unfriend")
        .withColumnRenamed("Friends", "Unfriend's Friends"),
        on="Unfriend",
        how="left",
    )
    .withColumn(
        "Num of Mutual Friends",
        size(array_intersect(col("User's Friends"), col("Unfriend's Friends"))),
    )
    .select("User", "Unfriend", "Num of Mutual Friends")
)


In [6]:
# Per-user ranking order: most mutual friends first, ties broken by the
# smaller candidate id.
window_spec = (
    Window
    .partitionBy("User")
    .orderBy(col("Num of Mutual Friends").desc(), col("Unfriend").asc())
)

In [7]:
# Keep, per user, the 10 best candidates that share at least one friend.
# The "> 0" filter runs BEFORE the window: zero-count rows sort last within
# each partition, so removing them first leaves the rank of every remaining
# row unchanged while sparing the window from sorting pairs that could never
# be recommended.
mutual_friend = (
    mutual_friend
    .filter(col("Num of Mutual Friends") > 0)
    .withColumn("rank", row_number().over(window_spec))
    .filter(col("rank") <= 10)
)

In [8]:
# Fold each user's ranked candidates into one "Recommendations" array.
# collect_list() does not guarantee element order after the groupBy shuffle,
# so collect (rank, Unfriend) structs, sort them (structs order by their first
# field, i.e. rank) and only then project out the candidate ids.
mutual_friend = (
    mutual_friend
    .groupby("User")
    .agg(sort_array(collect_list(struct("rank", "Unfriend"))).alias("ranked"))
    .select("User", col("ranked.Unfriend").alias("Recommendations"))
)

In [9]:
# Assemble one row per user, sorted by user id.
# - Start from every user that has friends and LEFT join the recommendations,
#   so a user whose candidates all had zero mutual friends still appears
#   (with NULL "Recommendations", the same value friendless users carry)
#   instead of disappearing from the result.
#   NOTE(review): assumes each user id occurs once in `friend` - confirm.
# - unionByName matches columns by name rather than by position.
# - Sort on "User" with its real capitalisation so the lookup does not depend
#   on case-insensitive column resolution.
final = (
    friend.select("User")
    .join(mutual_friend, on="User", how="left")
    .unionByName(no_friend)
    .sort(asc("User"))
)

In [10]:
# Spot-check the recommendations for a fixed set of ten user ids.
final.filter(
    final["User"].isin(924, 8941, 8942, 9019, 9020, 9021, 9022, 9990, 9992, 9993)
).take(10)

[Row(User=924, Recommendations=[439, 2409, 6995, 11860, 15416, 43748, 45881]),
 Row(User=8941, Recommendations=[8943, 8944, 8940]),
 Row(User=8942, Recommendations=[8939, 8940, 8943, 8944]),
 Row(User=9019, Recommendations=[9022, 317, 9023]),
 Row(User=9020, Recommendations=[9021, 9016, 9017, 9022, 317, 9023]),
 Row(User=9021, Recommendations=[9020, 9016, 9017, 9022, 317, 9023]),
 Row(User=9022, Recommendations=[9019, 9020, 9021, 317, 9016, 9017, 9023]),
 Row(User=9990, Recommendations=[13134, 13478, 13877, 34299, 34485, 34642, 37941]),
 Row(User=9992, Recommendations=[9987, 9989, 35667, 9991]),
 Row(User=9993, Recommendations=[9991, 13134, 13478, 13877, 34299, 34485, 34642, 37941])]