<a href="https://colab.research.google.com/github/angadbawa/Who-is-Who-s-Friend-/blob/main/friend_recommend.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import heapq
import itertools
import sys

from pyspark import SparkConf, SparkContext


def line_to_friend_ownership(line):
    """
    Parses one line of the input file into a "friend ownership" item.

    A line holds a user id, optionally followed by whitespace and a comma-separated list of friend ids, for example
    ``"0 1,2,3"``. A line that holds only a user id yields an empty friend list. Empty entries in the friend list
    (such as the one left behind by a trailing comma) are skipped rather than aborting the whole job.

    :param line: one line of text from the input file
    :return: Tuple[int, List[int]] the user id and the ids of that user's friends
    :raises IndexError: if the line contains no fields at all
    :raises ValueError: if the user id or a non-empty friend entry is not an integer
    """
    fields = line.split()
    user_id = int(fields[0])

    if len(fields) == 1:
        return user_id, []

    # "1,2," splits into ['1', '2', '']; int('') would raise, so drop the empty pieces.
    friends = [int(token) for token in fields[1].split(',') if token]

    return user_id, friends


def friend_ownership_to_connection(f_o):
    """
    Expands a "friend ownership" item into a list of "connection" items.

    A "connection" has the form ``((low_id, high_id), value)``; the two ids in the key are ordered so that the
    same pair of users always produces the same key.

    * ``value == 0`` is emitted once per friend and pairs the user with that friend (an existing friendship).
    * ``value == 1`` is emitted once per pair of the user's friends (those two friends have this user in common).

    :param f_o: a friend ownership item ``(user_id, [friend_id_0, friend_id_1, ...])``
    :return: List[Tuple[Tuple[int, int], int]] direct connections first, then the friend-of-friend connections
    """
    owner, owned = f_o[0], f_o[1]

    # Existing friendships between the user and each friend
    direct = [
        ((other, owner) if owner > other else (owner, other), 0)
        for other in owned
    ]

    # Every two friends of the user share the user as a mutual friend
    through_owner = [
        ((right, left) if left > right else (left, right), 1)
        for left, right in itertools.combinations(owned, 2)
    ]

    return direct + through_owner


def mutual_friend_count_to_recommendation(m):
    """
    Turns one "mutual friend count" item into the two recommendations it implies.

    An item such as ``((0, 1), 21)`` says that users 0 and 1 have 21 friends in common, so each of the two users
    is recommended to the other. The result therefore always contains exactly two "recommendation" items of the
    form::
        (user_id, (recommended_user, mutual_friends_count))
    :param m: a mutual friend count item ``((user_a, user_b), count)``
    :return: List[Tuple[int, Tuple[int, int]]] the recommendation for ``user_a`` followed by the one for ``user_b``
    """
    pair, shared = m[0], m[1]
    first, second = pair[0], pair[1]

    return [
        (first, (second, shared)),
        (second, (first, shared)),
    ]


def recommendation_to_sorted_truncated(recs, limit=10):
    """
    Reduces one user's candidate recommendations to the ids of the ``limit`` best candidates.

    Candidates are ranked by mutual friend count, highest first; candidates with the same count are ordered by
    ascending user id. ``heapq.nsmallest`` selects and orders the best ``limit`` candidates without sorting the
    whole input, and applies the same ranking rule regardless of how many candidates there are. The input is
    not modified.

    :param recs: iterable of ``(recommended_user, mutual_friends_count)`` items
    :param limit: maximum number of user ids to return (defaults to 10)
    :return: List[int] the recommended user ids, best first, at most ``limit`` of them
    """
    # Negating the count makes "smallest key" mean "most mutual friends"; the user id breaks ties.
    best = heapq.nsmallest(limit, recs, key=lambda rec: (-rec[1], rec[0]))

    # Map every (user_id, mutual_count) to user_id
    return [rec[0] for rec in best]


# ============ #
# MAIN PROGRAM #
# ============ #

# Initialize spark configuration and context. getOrCreate() hands back the context that is already running when
# there is one (e.g. in a notebook or the pyspark shell) and creates a new one otherwise, so ``sc`` is always defined.
sc = SparkContext.getOrCreate(SparkConf())

try:
    # Read the input text file; every element of the resulting RDD is one line of the file
    lines = sc.textFile("FriendData.txt")

    # Map each line to the form: (user_id, [friend_id_0, friend_id_1, ...])
    friend_ownership = lines.map(line_to_friend_ownership)

    # Map each "friend ownership" to multiple instances of ((user_id_a, user_id_b), VALUE).
    # VALUE = 0 indicates that the two users are already friends.
    # VALUE = 1 indicates that the two users have one friend in common (one instance per common friend).
    friend_edges = friend_ownership.flatMap(friend_ownership_to_connection)
    friend_edges.cache()

    # Drop all pairs of users that are already friends (their group contains a 0), then sum the remaining "1"
    # values of each pair to get its mutual friend count.
    mutual_friend_counts = friend_edges.groupByKey() \
        .filter(lambda edge: 0 not in edge[1]) \
        .map(lambda edge: (edge[0], sum(edge[1])))

    # Create the recommendation objects, group them by user, then sort and truncate each user's recommendations
    # to the 10 most highly recommended.
    recommendations = mutual_friend_counts.flatMap(mutual_friend_count_to_recommendation) \
        .groupByKey() \
        .map(lambda m: (m[0], recommendation_to_sorted_truncated(list(m[1]))))

    # Save to output directory
    recommendations.saveAsTextFile("Result")
finally:
    # End the context even when the job fails part-way through
    sc.stop()

In [None]:
!pip install pyspark


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 310.8/310.8 MB 3.7 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=9018e32e130bc4d540968ecfacacb4889e0df17a6c9106e4e57670ba07d6edf1
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0
