ID: V01053626
Name: Newsha Bahardoost

Sketch:This PySpark code recommends friends by analyzing mutual connections. It loads user-friend data, extracts individual relationships, and finds pairs of users who share mutual friends. After counting shared connections, it filters out existing friendships and ranks the top 10 potential friends for each user. Finally, it selects and displays recommendations for specific target users.

In [4]:
from pyspark.sql import SparkSession #Read data, Process data, and save the processed data
from pyspark.sql.functions import col, split, explode, collect_list, row_number, array_join #imports useful functions from PySpark's functions
from pyspark.sql.window import Window # The Window function enables operations that depend on the relationships between rows like ranking
from google.colab import drive

# Mount Google Drive
drive.mount('/content/drive')


spark = SparkSession.builder \
    .appName("MutualFriendRecommendation") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

raw_data = spark.read.text("/content/p1-users.txt")

user_friends = raw_data.select(
    split(col("value"), "\t").getItem(0).cast("int").alias("user"),
    split(split(col("value"), "\t").getItem(1), ",").cast("array<int>").alias("friends")
).cache()

user_friend_pairs = user_friends \
    .select("user", explode("friends").alias("friend")) \
    .cache()

mutual_friends = user_friend_pairs.alias("uf1") \
    .join(
        user_friend_pairs.alias("uf2"),
        (col("uf1.friend") == col("uf2.friend")) &
        (col("uf1.user") < col("uf2.user"))
    ) \
    .select(
        col("uf1.user").alias("user1"),
        col("uf2.user").alias("user2"),
        col("uf1.friend").alias("mutual_friend")
    )

mutual_friend_counts = mutual_friends \
    .groupBy("user1", "user2") \
    .count()

existing_friends = user_friend_pairs \
    .select(
        col("user").alias("user1"),
        col("friend").alias("user2")
    ).distinct()

recommendations = mutual_friend_counts \
    .join(
        existing_friends,
        ["user1", "user2"],
        "left_anti"
    )

window_spec = Window.partitionBy("user1").orderBy(col("count").desc(), col("user2").asc())


final_recommendations = recommendations \
    .withColumn("rank", row_number().over(window_spec)) \
    .filter(col("rank") <= 10) \
    .groupBy("user1") \
    .agg(collect_list("user2").alias("recommendations")) \
    .select(
        col("user1").alias("user"),
       array_join(col("recommendations"), ",").alias("recommendations")
    )

target_users = [11, 924, 8941, 8942, 9019, 9020, 9021, 9022, 9990, 9992, 9993]

result = final_recommendations \
   .filter(col("user").isin(target_users)) \
   .orderBy("user") \
   .select("user", "recommendations")


for row in result.collect():
    user = row["user"]
    recommendations = row["recommendations"]
    print(f"{user}\t{recommendations}")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
11	27552,7785,27573,27574,27589,27590,27600,27617,27620,27667
924	2409,6995,11860,15416,43748,45881
8941	8943,8944
8942	8943,8944
9019	9022,9023
9020	9021,9022,9023
9021	9022,9023
9022	9023
9990	13134,13478,13877,34299,34485,34642,37941
9992	35667
9993	13134,13478,13877,34299,34485,34642,37941
