# Pipeline Sketch

## Please provide a description of how you used Spark to solve this problem.

For every user, we have a list of first-degree friends. Any two users in that list have at least one mutual friend: the user who owns the list. A user's direct friends, however, cannot be recommended to that user, so every (user, direct friend) pair must be excluded from the recommendations.

### For example,
```
User A: User B, User C, User D
User B: User A, User D, User C
```

Every pair drawn from one user's friend list has at least one mutual friend: the owner of that list (User A for the first row, User B for the second). We don't need to keep track of who the mutual friend is, so we emit each pair with a count of 1, indicating one mutual friend.

#### Mutual Friend List
```
  User 1, User 2
((User B, User C), 1)
((User B, User D), 1)
((User C, User D), 1)
((User A, User D), 1)
((User A, User C), 1)
((User D, User C), 1)
```
#### Direct Friend list
```
(User A, User B)
(User A, User C)
(User A, User D)
(User B, User A)
(User B, User D)
(User B, User C)
```
Since User B and User C are direct friends, C can't be recommended to B and vice versa. However, User D can be recommended to User C, because the direct friend list contains no (User C, User D) pair. And since each (User 1, User 2) pair is only used to recommend User 2 to User 1, we also need every pair in reverse so that User 1 can be recommended to User 2.

#### Updated Mutual Friend List for recommendation

```
  User 1, User 2
((User B, User C), 1)
((User B, User D), 1)
((User C, User D), 1)
((User A, User D), 1)
((User A, User C), 1)
((User D, User C), 1)
((User C, User B), 1)
((User D, User B), 1)
((User D, User C), 1)
((User D, User A), 1)
((User C, User A), 1)
((User C, User D), 1)
```

When the same (User 1, User 2) pair appears more than once, we add its counts together to get the number of mutual friends. We then sort by the number of mutual friends in descending order, breaking ties by the potential recommendation (User 2) in ascending order, and format each entry as

```
(User 1, (User 2, mutual_count))...
```
Then, again map to:
```
(User 1, User 2)
```
Finally, group by User 1 and keep at most 10 recommendations per user to get the result:
```
User\t'recommended users'
```
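The steps above can be sketched in plain Python on the toy example, without Spark. This is only an illustration: the names `recommend`, `adjacency` and `limit` are hypothetical, and I assume the friendship graph is symmetric, so rows for User C and User D are added to the example.

```python
from collections import Counter
from itertools import permutations


def recommend(adjacency, limit=10):
    """adjacency: dict mapping each user to the list of their direct friends."""
    # Every (user, direct friend) pair; these can never be recommendations.
    direct = {(user, frd) for user, friends in adjacency.items() for frd in friends}

    # Each ordered pair inside one friend list shares the list owner as a mutual friend.
    counts = Counter()
    for friends in adjacency.values():
        for pair in permutations(friends, 2):
            if pair not in direct:
                counts[pair] += 1

    # Regroup as user -> [(candidate, mutual_count), ...].
    by_user = {}
    for (user1, user2), mutual_count in counts.items():
        by_user.setdefault(user1, []).append((user2, mutual_count))

    # Most mutual friends first, ties broken by ascending candidate, capped at `limit`.
    return {
        user: [cand for cand, _ in sorted(cands, key=lambda c: (-c[1], c[0]))[:limit]]
        for user, cands in by_user.items()
    }


# Toy graph from the example, completed with rows for C and D (an assumption).
toy = {
    "A": ["B", "C", "D"],
    "B": ["A", "D", "C"],
    "C": ["A", "B"],
    "D": ["A", "B"],
}
result = recommend(toy)
```

On this graph the only pair that is not already a direct friendship is C and D, who share both A and B as mutual friends.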




In [124]:
#@title Datapath for the file
datapath = '/content/drive/MyDrive/6220/assignment/soc-LiveJournal1Adj.txt' #@param

# Install appropriate packages

In [125]:
#@title Install all appropriate packages

!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq

openjdk-8-jdk-headless is already the newest version (8u382-ga-1~22.04.1).
0 upgraded, 0 newly installed, 0 to remove and 18 not upgraded.


# Import and create Spark Context

In [126]:
#@title Import and create Spark Context

# import necessary libraries
import os

import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession

# point Spark at the Java 8 installation from the previous cell
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# configure the Spark UI port
conf = SparkConf().set("spark.ui.port", "4050")

# create the context, then the session that wraps it
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

In [127]:
# mount drive
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Read the data

## Read the data and split each line at the tab
### The format:

```
(<User1>, <Friend1,Friend2,...>), (<User2>, <Friend1,Friend2,...>)
```
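As a small illustration of this format, here is a sketch of parsing a single line in plain Python. The function name `parse_line` is hypothetical, and I assume each line looks like `<User><Tab><Friend1,Friend2,...>`, with nothing after the tab when a user has no friends.

```python
def parse_line(line):
    """Split one input line into (user, [friends])."""
    fields = line.split("\t")
    user = fields[0]
    # A missing or empty second field means the user has no friends.
    friends = fields[1].split(",") if len(fields) > 1 and fields[1] else []
    return user, friends


with_friends = parse_line("0\t1,2,3")
without_friends = parse_line("5\t")
```

Returning an empty list for users without friends keeps the second element the same type in every record.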



### For testing purposes, I used samples of 20 to 1000 lines.

In [128]:
# limited_lines = sc.textFile(datapath, 1).take(100)
# lines = sc.parallelize(limited_lines)
# friendship = lines.map(lambda line: line.split()).zipWithIndex().map(lambda x: x[0]).map(lambda x: (x[0], x[1].split(',') if x and len(x) > 1 else ''))
# friendship.collect()

In [129]:
lines = sc.textFile(datapath, 1)
# each line is "<user><TAB><friend1,friend2,...>"; a user with no friends gets an empty list
friendship = lines.map(lambda line: line.split()).map(lambda x: (x[0], x[1].split(',') if len(x) > 1 else []))

## Helper function to generate every ordered pair of users who share a mutual friend

In [130]:
def mutual_frd(frd_list):
  res = []
  for i in range(len(frd_list) - 1):
    for j in range(i + 1, len(frd_list)):
      res.append(((frd_list[i], frd_list[j]), 1))
      res.append(((frd_list[j], frd_list[i]), 1))
  return res
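For comparison, the same ordered pairs can be produced with `itertools.permutations` from the standard library. The sketch below repeats `mutual_frd` so that it runs on its own; `mutual_frd_permutations` is a hypothetical name, and the pairs come out in a different order, which does not matter once they are counted per key.

```python
from itertools import permutations


def mutual_frd(frd_list):
    """Copy of the helper above: both orderings of every pair in one friend list."""
    res = []
    for i in range(len(frd_list) - 1):
        for j in range(i + 1, len(frd_list)):
            res.append(((frd_list[i], frd_list[j]), 1))
            res.append(((frd_list[j], frd_list[i]), 1))
    return res


def mutual_frd_permutations(frd_list):
    """Same pairs, generated as all length-2 permutations of the friend list."""
    return [(pair, 1) for pair in permutations(frd_list, 2)]


friends_of_a = ["B", "C", "D"]
same_pairs = sorted(mutual_frd(friends_of_a)) == sorted(mutual_frd_permutations(friends_of_a))
```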

## List of the direct friends

In [131]:
direct_frd = friendship.flatMap(lambda x: [(x[0], frd) for frd in x[1]]).collect()
direct_frd = set(direct_frd)

## Apply the mutual-friend helper and the direct-friend set.
### Pairs with mutual friends are kept only if they are not also direct friends (filter), and are then counted (reduceByKey).

In [132]:
mutual_frd_list = friendship.flatMap(lambda x: mutual_frd(x[1])).filter(lambda x: x[0] not in direct_frd).reduceByKey(lambda x, y: x + y)

## Format the mutual friend list as

```
(key, (value, mutual_count)), ...
```
## Then sort by the number of mutual friends in descending order, breaking ties by the candidate's ID in ascending order, and map each entry to

```
user (key), potential recommendation (value)
```


In [133]:
format_mutual_list = mutual_frd_list.map(lambda x: (int(x[0][0]), (int(x[0][1]), x[1]))).sortBy(lambda x: (-x[1][1], x[1][0])).map(lambda x: (x[0], x[1][0]))

## Generate a list of recommended friends

In [134]:
recommendation = format_mutual_list.groupByKey().mapValues(lambda x: list(x)[:10])
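One caveat: as far as I know, `groupByKey` shuffles the data and does not promise to keep values in the order an earlier `sortBy` produced. A more defensive variant, sketched below, keeps the counts until after grouping and ranks inside each group. `top_recommendations` and its parameter `k` are hypothetical names, and the Spark lines in the comment are an untested assumption about how it would plug into this pipeline.

```python
def top_recommendations(candidates, k=10):
    """candidates: iterable of (candidate_id, mutual_count) for one user."""
    # Most mutual friends first; ties broken by ascending candidate ID.
    ranked = sorted(candidates, key=lambda c: (-c[1], c[0]))
    return [candidate_id for candidate_id, _ in ranked[:k]]


# Hypothetical use in place of the sortBy/groupByKey steps (not run here):
# recommendation = (mutual_frd_list
#     .map(lambda x: (int(x[0][0]), (int(x[0][1]), x[1])))
#     .groupByKey()
#     .mapValues(top_recommendations))

example = top_recommendations([(5, 1), (3, 2), (4, 2)])
```

Sorting inside each group also avoids a global sort over every pair, since only one user's candidates are ordered at a time.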

## Format output as required

```
<User><Tab><Recommendation>...
```

In [135]:
check = {11, 924, 8941, 8942, 9019, 9020, 9021, 9022, 9990, 9992, 9993}
res = recommendation.filter(lambda entry: entry[0] in check).map(lambda entry: str(entry[0]) + '\t' + ','.join(str(elt) for elt in entry[1]))
res_list = res.collect()

In [136]:
# Specify the output file path
output_file_path = '/content/drive/MyDrive/6220/assignment/output.txt'

# Write the formatted recommendations to the output file
with open(output_file_path, 'w') as output_file:
    for item in res_list:
        output_file.write(item + '\n')

print(f"Recommendations have been written to {output_file_path}")

Recommendations have been written to /content/drive/MyDrive/6220/assignment/output.txt


## Stop SparkContext after completion

In [137]:
sc.stop()