In [2]:
#@title Download and install the PySpark packages

!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=d4f6b70fcaba2c8afea17a1c62620e9ecd31e79697ed88a89e154f5429a1d6f5
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0
The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra fonts-nanum fonts-ipafont-gothic
  fonts-ipafont-mincho fonts-wqy-microhei fonts-wqy-zenhei fonts-indic

In [3]:
#@title Setup Spark and Data

from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext
import pandas as pd

# create the Spark Session
spark = SparkSession.builder.getOrCreate()

# create the Spark Context
sc = spark.sparkContext

In [4]:
!wget https://course.ccs.neu.edu/cs6220/fall2023/homework-3/soc-LiveJournal1Adj.txt

--2024-02-06 02:24:52--  https://course.ccs.neu.edu/cs6220/fall2023/homework-3/soc-LiveJournal1Adj.txt
Resolving course.ccs.neu.edu (course.ccs.neu.edu)... 129.10.117.35
Connecting to course.ccs.neu.edu (course.ccs.neu.edu)|129.10.117.35|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4156181 (4.0M) [text/plain]
Saving to: ‘soc-LiveJournal1Adj.txt’


2024-02-06 02:24:53 (8.90 MB/s) - ‘soc-LiveJournal1Adj.txt’ saved [4156181/4156181]



In [5]:
def parse_line(line):
    parts = line.split('\t')
    user = int(parts[0])
    friends =  [] if parts[1]=='' else list(map(int, parts[1].split(',')))
    return user, friends

def mutual_friends(pair):
    (user, friend), mutual_friends = pair
    return user, (friend, mutual_friends)

# Load the data
raw_data = sc.textFile("soc-LiveJournal1Adj.txt")

# Convert data to key-value pairs (user, friends)
user_friends_rdd = raw_data.map(parse_line)

friend_pairs = user_friends_rdd\
.flatMap(lambda x: [(x[0], friend) for friend in x[1]])

#leave our all the pairs that are already taken to eradicate redundancy network counts
mutual_friends_pairs = friend_pairs\
.join(friend_pairs)\
.filter(lambda x: x[1][0] < x[1][1])

#filter out all the users that are direct friends.
mutual_friends_count = mutual_friends_pairs\
.map(lambda x: (x[1][0],x[1][1]))\
.join(user_friends_rdd)\
.filter(lambda x: x[1][0] not in x[1][1])\
.map(lambda x : ((x[0],x[1][0]),1))\
.reduceByKey(lambda x, y: x + y)


# Create a DataFrame from mutual_friends_count RDD
mutual_friends_df = mutual_friends_count.toDF(["user_pair", "mutual_friends_count"])
# Split user_pair column into two separate columns
mutual_friends_df = mutual_friends_df\
.withColumn("user1",mutual_friends_df.user_pair._1)\
.withColumn("user2", mutual_friends_df.user_pair._2)

# # Drop the user_pair column
mutual_friends_df = mutual_friends_df.drop("user_pair")

# # Group the data by user and create a list of recommendations
user_recommendations = mutual_friends_df.groupBy("user1").agg(
    collect_list(struct(col("user2"), col("mutual_friends_count"))).alias("mutual_friends")
)

In [6]:
# Function to extract top N recommendations based on the logic mentioned in the question
def get_top_recommendations(row, N=10):
    user = row[0]
    mutual_friends = sorted(row[1], key=lambda x: (-x[1], x[0]))[:N]
    recommendations = [friend for friend, _ in mutual_friends]
    return user, recommendations

# Apply the function to each row and collect the results
result = user_recommendations.rdd.map(get_top_recommendations)

In [17]:
#sorting by key to display the result in the text
final_result = result.sortByKey().collect()

#writing the results to the ouput file and named output.txt
with open("output.txt", "w") as file:
  for user, recommendations in final_result:
    file.write(f"{user}\t{','.join(map(str, recommendations))}"+"\n")

In [7]:
#printing the needed examples for the Document
for i in [ 924, 8941, 8942, 9019, 9020, 9021, 9022, 9990, 9992, 9993]:
  print(i, result.filter(lambda x : x[0]==i).take(1))

924 [(924, [2409, 6995, 11860, 15416, 43748, 45881])]
8941 [(8941, [8943, 8944])]
8942 [(8942, [8943, 8944])]
9019 [(9019, [9022, 9023])]
9020 [(9020, [9021, 9022, 9023])]
9021 [(9021, [9022, 9023])]
9022 [(9022, [9023])]
9990 [(9990, [13134, 13478, 13877, 34299, 34485, 34642, 37941])]
9992 [(9992, [35667])]
9993 [(9993, [13134, 13478, 13877, 34299, 34485, 34642, 37941])]
