# Q2 — People You Might Know: friend recommendations with PySpark

In [1]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq

openjdk-8-jdk-headless is already the newest version (8u382-ga-1~22.04.1).
0 upgraded, 0 newly installed, 0 to remove and 18 not upgraded.


In [2]:
import os
# Tell PySpark where the JDK installed in the first cell lives.
os.environ.update(JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64")

In [3]:
from pyspark import SparkConf, SparkContext
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

In [4]:
# Authenticate and create the PyDrive client used to download the input file.
# Order matters: authenticate_user() runs before get_application_default() so the
# application-default credentials exist when they are read and handed to PyDrive.
# NOTE(review): oauth2client is deprecated upstream — consider migrating when possible.
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

In [5]:
# Google Drive ID of the friendship edge-list file.
# Named file_id (not id) so the builtin id() is not shadowed in the notebook.
file_id = '18dmpXLkhaXA3o6eqckTeyp7exz_jsEFS'

# Download the file to 'ego-facebook.txt' in the working directory,
# where the Spark cell reads it from.
facebook_downloaded = drive.CreateFile({'id': file_id})
facebook_downloaded.GetContentFile('ego-facebook.txt')

In [6]:
# Initialize Spark. getOrCreate reuses an already-running context, so re-running
# this cell does not fail with "Cannot run multiple SparkContexts at once"
# (a fresh context is created after sc.stop() at the end of the notebook).
conf = SparkConf().setAppName("PeopleYouMightKnow")
sc = SparkContext.getOrCreate(conf=conf)

# Load the friendship data; each RDD element is one line of the input file.
data = sc.textFile("ego-facebook.txt")

In [7]:
def friend_pairs(line):
    """Parse one edge-list line into a ``(user1, user2)`` tuple of int user IDs.

    Only the first two whitespace-separated tokens are used; further columns
    are ignored. Malformed lines raise IndexError / ValueError.
    """
    split = line.split()
    user1 = int(split[0])
    user2 = int(split[1])
    friendship = (user1, user2)
    return friendship


def _is_edge_line(line):
    """Return True for lines that can hold an edge (not blank, not a '#' comment)."""
    stripped = line.strip()
    return bool(stripped) and not stripped.startswith("#")


# Transform the raw text RDD into (user1, user2) tuples, skipping blank and
# '#' comment lines that friend_pairs cannot parse.
friendship_data = data.filter(_is_edge_line).map(friend_pairs)

# Outgoing adjacency: user1 -> list of every user2 listed next to it.
friendship_grouped = friendship_data.groupByKey()
friendship_pairs = friendship_grouped.map(lambda x: (x[0], list(x[1]))).sortByKey()

# All distinct users on either side of an edge, and how many there are.
distinct_users = friendship_data.flatMap(lambda pair: (pair[0], pair[1])).distinct()
number_of_users = distinct_users.count()

# Incoming adjacency: user2 -> list of every user1 listed next to it.
# Flipping each edge inside Spark is O(edges) and works for any user IDs
# (0-based, gapped, ...), unlike scanning every adjacency list on the driver
# once per ID in 1..number_of_users.
direct_friends_rdd = (
    friendship_data
    .map(lambda pair: (pair[1], pair[0]))
    .groupByKey()
    .mapValues(list)
    .sortByKey()
)

# Full undirected adjacency list per user. sorted(set(...)) removes a friend that
# would appear twice if a friendship is listed in both directions in the input;
# such duplicates would double mutual-friend counts and make a user appear as
# their own candidate downstream.
combined_direct_friends_rdd = (
    direct_friends_rdd
    .union(friendship_pairs)
    .reduceByKey(lambda x, y: x + y)
    .mapValues(lambda friends: sorted(set(friends)))
    .sortByKey()
)

In [8]:
from itertools import combinations

def generate_user_connections(user_friends):
    """Expand one ``(user_id, friends)`` record into keyed connection markers.

    Every key is a canonical ``(smaller_id, larger_id)`` pair:
      * ``((user_id, friend), 0)`` for each direct friend — the pair is already friends;
      * ``((friend_a, friend_b), 1)`` for each unordered pair of the user's
        friends — user_id is one mutual friend of friend_a and friend_b.

    Returns a list with all 0-markers first (in friend order), followed by the
    1-markers in ``itertools.combinations`` order.
    """
    def _ordered_key(a, b):
        # Canonical (low, high) key so (a, b) and (b, a) group together.
        return (a, b) if a < b else (b, a)

    owner = user_friends[0]
    friend_list = user_friends[1]

    direct_markers = [(_ordered_key(owner, friend), 0) for friend in friend_list]
    mutual_markers = [
        (_ordered_key(first, second), 1)
        for first, second in combinations(friend_list, 2)
    ]
    return direct_markers + mutual_markers

# One (user, adjacency list) record fans out into many ((a, b), marker) records.
friend_connections = combined_direct_friends_rdd.flatMap(
    generate_user_connections
)

In [9]:
def _to_count_and_flag(marker):
    """Map one marker (0 = direct friendship, 1 = one mutual friend) to (count, is_direct)."""
    return (marker, marker == 0)


def _merge_count_and_flag(left, right):
    """Add the mutual-friend counts and OR the already-friends flags."""
    return (left[0] + right[0], left[1] or right[1])


# Collapse every (user_a, user_b) key to (mutual-friend count, already-friends flag).
# reduceByKey combines values map-side, unlike groupByKey + list(), which ships
# every single marker through the shuffle and materializes it in memory.
connection_summary = (
    friend_connections
    .mapValues(_to_count_and_flag)
    .reduceByKey(_merge_count_and_flag)
)

# Drop pairs that are already direct friends (any 0 marker) and keep only the
# mutual-friend count. 0-markers add nothing to the count, so this equals
# summing the markers of pairs that have no 0.
total_connections = (
    connection_summary
    .filter(lambda kv: not kv[1][1])
    .mapValues(lambda count_and_flag: count_and_flag[0])
)

In [11]:
def transform_connection_data(pair_count):
    """Fan a ``((user_a, user_b), count)`` record out to both endpoints.

    Returns ``[(user_a, (user_b, count)), (user_b, (user_a, count))]`` so each
    user sees the other as a candidate carrying the shared mutual-friend count.
    """
    (left_user, right_user), shared = pair_count
    return [(left_user, (right_user, shared)), (right_user, (left_user, shared))]

# Key every candidate pair by each of its two users: user -> (candidate, mutual count).
user_relationships_data = total_connections.flatMap(
    transform_connection_data
)

In [12]:
# Gather each user's (candidate_id, mutual_count) tuples from user_relationships_data.
grouped_recommendations = user_relationships_data.groupByKey()

# Convert the values from ResultIterable to Python lists.
grouped_recommendations_with_lists = grouped_recommendations.mapValues(list)

# Sort by user ID ascending so the saved output is ordered by user.
sorted_grouped_recommendations = grouped_recommendations_with_lists.sortByKey()

# Rank each user's candidate recommendations.
def custom_sort(record, top_n=10):
    """Turn ``(user_id, [(candidate_id, mutual_count), ...])`` into ``(user_id, [candidate_id, ...])``.

    Candidates are ordered by mutual-friend count (descending), ties broken by
    candidate ID (ascending), and truncated to the first ``top_n``; pass
    ``top_n=None`` to keep every candidate. Users with fewer than ``top_n``
    candidates keep all of them.
    """
    user_id, recommendations = record
    ranked = sorted(recommendations, key=lambda rec: (-rec[1], rec[0]))
    # Slicing already handles lists shorter than top_n, so no length check is needed.
    return (user_id, [rec[0] for rec in ranked[:top_n]])

# Reduce each user's candidate list to ranked recommendation IDs.
sorted_recommendations = sorted_grouped_recommendations.map(
    custom_sort
)

In [13]:
# # Filter the results for the desired user ID
# filtered_result = sorted_recommendations.filter(lambda x: x[0] in [1571])

# # Collect and print the filtered results
# filtered_data = filtered_result.collect()
# print(filtered_data)

In [14]:
def _format_recommendation_line(record):
    """Render ``(user_id, [rec_ids])`` as ``'user_id<TAB>id1,id2,...'`` (nothing after the tab if empty)."""
    user_id, recommended_ids = record
    return f"{user_id}\t{','.join(map(str, recommended_ids))}"


# Convert the results to the desired output format.
output_data = sorted_recommendations.map(_format_recommendation_line)

# coalesce(1) without a shuffle concatenates the key-sorted partitions in order,
# so the file stays sorted by user ID; coalesce(1, True) shuffles and does not
# guarantee that order.
# Note: saveAsTextFile writes a directory (with part-* files) at this path and
# fails if it already exists.
output_data.coalesce(1).saveAsTextFile("/content/test2.txt")

# Stop the Spark context.
sc.stop()