# Friend Recommendation System using Common Neighbors

# Data Description
The data ([SNAP ego-Facebook dataset](https://snap.stanford.edu/data/ego-Facebook.html)) was collected from survey participants using a Facebook app developed by Stanford University. It contains the friend networks of 4,031 users; the real user IDs have been replaced with anonymized virtual user IDs.

# Data Structure
user1   friend_1<br>
user1   friend_2<br>
...<br>
user4031 friend_k

# Task: Friend Recommendation based on Common Friends
Suggest friends to each user based on similarity, where the similarity between two users is estimated by the number of friends they have in common.

# Library Installation

In [None]:
!pip install pyspark findspark
import findspark
findspark.init()
from pyspark.sql import SparkSession

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 4.4 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=ec2de98332350ab807d98a69f6cfb9ac5cb401876c6067e49d7db5391866adcc
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: findspark, pyspark
Successfully installed findspark-2.0.1 pyspark-3.5.1


In [None]:
def parse_line(line):
  """Turn one edge-list record into a ``(node_id, friend_id)`` tuple.

  The record is split on whitespace; the first two fields are converted to
  integers and any further fields are ignored.
  """
  tokens = line.split()
  source, target = int(tokens[0]), int(tokens[1])
  return source, target

# Main Steps

In [None]:
class FriendRecommendation:
  """Recommend new friends to every node by counting common friends.

  The input is a whitespace-separated edge list with one
  ``node_id friend_id`` record per line. For each node, every other node is
  scored by the number of friends the two have in common; nodes that are
  already friends are dropped and the ``k`` best-scoring candidates are kept.
  """

  def __init__(self, spark, file_path):
    """Store the Spark session and the location of the edge list.

    Args:
      spark: An active ``SparkSession``.
      file_path: Path of the edge-list text file read by
        ``run_recommendation``.
    """
    self.spark = spark
    self.sc = spark.sparkContext  # SparkContext exposes the RDD API
    self.file_path = file_path

  @staticmethod
  def _parse_line(line):
    """Return ``(node_id, friend_id)`` parsed from one edge-list record."""
    parts = line.split()
    return (int(parts[0]), int(parts[1]))

  @staticmethod
  def _rank_candidates(scored_candidates, existing_friends, k):
    """Return the ids of the ``k`` best candidates for one node.

    Args:
      scored_candidates: Iterable of ``(candidate_id, n_common_friends)``.
      existing_friends: Container of ids that are already friends of the
        node; these are never recommended.
      k: Maximum number of ids to return.

    Returns:
      Candidate ids ordered by descending number of common friends. Ties
      are broken by ascending candidate id so the result is reproducible.
    """
    ranked = sorted(
      (pair for pair in scored_candidates if pair[0] not in existing_friends),
      key=lambda pair: (-pair[1], pair[0]),
    )
    return [candidate for candidate, _ in ranked[:k]]

  def run_recommendation(self, k = 10, output_file = False):
    """Compute the top-``k`` friend recommendations for every node.

    Args:
      k: Number of recommendations to keep per node.
      output_file: When true, the collected result is also written to
        ``Recommendation based on Common Neighbors.txt`` in the current
        working directory, one ``(node_id, [frd1, ..., frd_k])`` per line.

    Returns:
      An RDD of ``(node_id, [frd1, ..., frd_k])`` sorted by ``node_id``.
      Candidates with no common friends score 0 and therefore only appear
      after all candidates that share at least one friend.
    """
    # Bind the helpers to locals: the closures shipped to the executors must
    # not capture ``self``, because it holds the (unpicklable) SparkContext.
    parse_line = self._parse_line
    rank_candidates = self._rank_candidates

    # Blank lines (e.g. a trailing newline) carry no edge and would otherwise
    # make the parser fail with an IndexError.
    edges_rdd = (
      self.sc.textFile(self.file_path)
      .filter(lambda line: line.strip())
      .map(parse_line)
    )  # [(node_id, friend_id), ...]

    # Build each friend set exactly once instead of once per node pair.
    # The RDD is reused by both sides of the cartesian product and by the
    # collectAsMap below, so it is cached.
    neighbor_sets_rdd = edges_rdd.groupByKey().mapValues(frozenset).cache()

    # All ordered pairs of distinct nodes, scored by shared friends:
    # [(node1, (node2, #common_frds)), ...]
    scored_rdd = (
      neighbor_sets_rdd.cartesian(neighbor_sets_rdd)
      .filter(lambda pair: pair[0][0] != pair[1][0])
      .map(lambda pair: (pair[0][0], (pair[1][0], len(pair[0][1] & pair[1][1]))))
    )

    # Existing friends are needed per node when ranking; ship them to the
    # executors once as a broadcast variable of sets (O(1) membership tests).
    friends_by_node = self.sc.broadcast(neighbor_sets_rdd.collectAsMap())

    # groupByKey gives no ordering guarantee inside a group, so the ranking
    # is done per node *after* grouping rather than by a global sort before.
    new_rdd = (
      scored_rdd.groupByKey()
      .map(lambda item: (
        item[0],
        rank_candidates(item[1], friends_by_node.value[item[0]], k),
      ))
      .sortBy(lambda item: item[0])
    )  # [(node_id, [frd1, ..., frd_k]), ...]

    # Optionally dump the recommendation result to a text file.
    if output_file:
      with open("Recommendation based on Common Neighbors.txt", "w") as f:
        f.writelines(f"{line}\n" for line in new_rdd.collect())

    return new_rdd

In [None]:
# Instantiate (or reuse) a SparkSession that runs locally on all available
# threads and register the application name.
spark = (
  SparkSession.builder
  .master("local[*]")
  .appName("Friends' Recommendation")
  .getOrCreate()
)

try:
  # Create an instance of the class and run the recommendation: recommend
  # k friends to each node and write the output to a text file.
  recommendation = FriendRecommendation(spark, "/content/sample_data/facebook_entire.txt")
  result_rdd = recommendation.run_recommendation(k = 10, output_file = True)
finally:
  # Always terminate the SparkSession, even when the job above fails.
  # NOTE: result_rdd can no longer be evaluated once the session is stopped.
  spark.stop()

# Output File
The output file is available on [Google Drive](https://drive.google.com/file/d/1OysBhMlYp7xDmboTYtWfFVmD4TofiZB-/view?usp=sharing).