In [7]:
import pyspark

In [8]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Build (or reuse, via getOrCreate) a Spark session named "HW3" running in
# local mode.
spark = (
    SparkSession.builder.appName("HW3")
    .master("local[*]")
    # NOTE(review): per the Spark configuration reference, 0 removes the cap on
    # the total size of results returned to the driver — confirm this is
    # intended, since later cells collect() entire RDDs.
    .config("spark.driver.maxResultSize", 0)
    .getOrCreate()
)

# Handle to the session's SparkContext; the RDD calls below go through it.
sc: SparkContext = spark.sparkContext

In [9]:
# Load the tweet dump as an RDD of raw text lines; each line is parsed as one
# JSON document in the next cell.
twit_rdd = sc.textFile("twitter_data.jsonl")

In [42]:
import json

# Decode every raw line into its parsed JSON object.
json_rdd = twit_rdd.map(json.loads)

In [43]:
def create_row_of_user_twit_matrix(x):
    """Turn one parsed tweet into a ``(user_id, [(twit_id, 1), ...])`` row.

    Which twit ids are credited to the author depends on ``x["tweet_type"]``:

    * ``"generated"`` - the tweet's own ``id``.
    * ``"replied"``   - the id in ``in_reply_to_status_id_str``.
    * ``"quoted"``    - the tweet's own ``id`` plus ``quoted_status["id"]``.
    * ``"retweeted"`` - the tweet's own ``id`` plus ``retweeted_status["id"]``.

    Any other tweet type yields an empty list. Every credited twit gets an
    initial score of 1.

    Ids that are missing *or* null are skipped, and an embedded status that is
    null / not an object is ignored, so a sparse record never produces a
    ``(None, 1)`` entry or a ``TypeError``.

    Args:
        x: dict parsed from one JSON line; must contain ``user.id`` and
            ``tweet_type``.

    Returns:
        ``(user_id, related_twit_ids)`` where ``related_twit_ids`` is a list of
        ``(twit_id, 1)`` tuples.

    Raises:
        KeyError: if ``user``, ``user.id`` or ``tweet_type`` is absent.
    """
    user_id = x["user"]["id"]
    tweet_type = x["tweet_type"]

    candidate_ids = []
    if tweet_type == "generated":
        candidate_ids.append(x.get("id"))
    elif tweet_type == "replied":
        candidate_ids.append(x.get("in_reply_to_status_id_str"))
    elif tweet_type in ("quoted", "retweeted"):
        candidate_ids.append(x.get("id"))
        embedded_key = "quoted_status" if tweet_type == "quoted" else "retweeted_status"
        embedded_status = x.get(embedded_key)
        # The embedded tweet may be absent or JSON null; only read it when it
        # is an actual object.
        if isinstance(embedded_status, dict):
            candidate_ids.append(embedded_status.get("id"))

    # Set the score of each twit to one, dropping ids that were missing/null.
    related_twit_ids = [
        (twit_id, 1) for twit_id in candidate_ids if twit_id is not None
    ]
    return (user_id, related_twit_ids)


# Key every tweet by its author, then concatenate the per-tweet lists so each
# user ends up with a single list of (twit_id, score) pairs.
user_twit_matrix_rdd = (
    json_rdd
    .map(create_row_of_user_twit_matrix)
    .reduceByKey(lambda left, right: left + right)
    .map(lambda row: (row[0], list(row[1])))
)

In [44]:
from collections import defaultdict


def update_scores_user_twit_matrix(x):
    """Merge repeated twit ids in one user's row by summing their scores.

    ``x`` is a ``(user_id, [(twit_id, score), ...])`` pair. The result keeps
    the same user id and holds one ``(twit_id, total_score)`` tuple per
    distinct twit id, in the order each id was first seen.
    """
    user_id, scored_twits = x[0], x[1]

    totals = {}
    for twit_id, score in scored_twits:
        totals[twit_id] = totals.get(twit_id, 0) + score

    return (user_id, [*totals.items()])


# Sum duplicate twit ids inside every user's row, then order the users by how
# many distinct twits they have (largest first).
updated_scores_user_twit_matrix_rdd = (
    user_twit_matrix_rdd
    .map(update_scores_user_twit_matrix)
    .sortBy(lambda row: len(row[1]), ascending=False)
)

                                                                                

In [None]:
import random

# Number of users sampled for testing the model.
number_of_samples = 100

# Only users with more than one twit are eligible, because half of a test
# user's twits get hidden later on. Sampling is done without replacement.
test_users_matrix = (
    updated_scores_user_twit_matrix_rdd
    .filter(lambda row: len(row[1]) > 1)
    .takeSample(False, number_of_samples)
)
test_users_ids = [row[0] for row in test_users_matrix]



# Hide half of the twits of every row that belongs to a test user.
def filter_test_users(x, test_ids=None):
    """Split one user row into a training row and its held-out twits.

    For a test user, ``len(twits) // 2`` twits are sampled; in the training
    row those twits keep their id but get score 0, while the remaining twits
    keep their original score. Rows of non-test users (or test users with
    fewer than two twits) are returned unchanged.

    The sample is drawn from a generator seeded with the user id, so calling
    this function repeatedly on the same row always hides the same twits. This
    matters because the row is mapped more than once (training matrix and
    held-out list) and Spark may re-evaluate a map lazily.

    Args:
        x: ``(user_id, [(twit_id, score), ...])``.
        test_ids: container of user ids selected for testing. Defaults to the
            notebook-level ``test_users_ids``.

    Returns:
        ``(training_row, held_out)`` where ``training_row`` is
        ``(user_id, twits)`` and ``held_out`` is
        ``(user_id, sampled_twits_with_original_scores)`` or ``None`` when
        nothing was hidden.
    """
    if test_ids is None:
        test_ids = test_users_ids

    user_id, scored_twits = x[0], x[1]
    if user_id not in test_ids:
        return x, None

    number_of_random_samples = len(scored_twits) // 2
    if number_of_random_samples == 0:
        return x, None

    # str seeds are hashed deterministically by ``random``, independent of
    # the process, so every executor picks the same twits for this user.
    rng = random.Random(str(user_id))
    sampled_twit_from_one_user = rng.sample(scored_twits, number_of_random_samples)

    # Exclude by *twit* id (the sampled ones), then re-add them with score 0.
    held_out_ids = {twit_id for twit_id, _ in sampled_twit_from_one_user}
    kept_twits = [item for item in scored_twits if item[0] not in held_out_ids]
    zeroed_twits = [(twit_id, 0) for twit_id, _ in sampled_twit_from_one_user]

    return (user_id, kept_twits + zeroed_twits), (user_id, sampled_twit_from_one_user)


# Split every row exactly once and keep the result cached, so the training
# matrix and the held-out list are two views of the SAME split. Calling
# filter_test_users in two separate maps would evaluate it twice per row.
# NOTE(review): cache() is best-effort — a partition evicted from memory is
# recomputed, so the split function itself should be deterministic.
test_split_rdd = updated_scores_user_twit_matrix_rdd.map(filter_test_users).cache()

# Training view: rows with the held-out twits zeroed.
user_twit_matrix_for_test_rdd = test_split_rdd.map(lambda split: split[0])

# Evaluation view: (user_id, held_out_twits) for the users that were split.
test_users_selected_twits_with_user_ids = (
    test_split_rdd.map(lambda split: split[1])
    .filter(lambda held_out: held_out is not None)
    .collect()
)

In [53]:
# Bring the complete user -> twit matrix onto the driver as a Python list; the
# main-user lookup in a later cell iterates over this list.
updated_scores_user_twit_matrix_list = updated_scores_user_twit_matrix_rdd.collect()
# NOTE(review): a bare expression renders the entire list as cell output —
# consider showing only a slice.
updated_scores_user_twit_matrix_list

                                                                                

[('728252911675944961',
  [('1719588219304693793', 1),
   ('1719605952310026748', 1),
   ('1719616655674728807', 1),
   ('1719629837386596514', 1),
   ('1719636834412642624', 1),
   ('1719699231752036545', 1),
   ('1719709925604016238', 1),
   ('1719711298391261254', 1),
   ('1719722407156404726', 1),
   ('1719734798745297201', 1),
   ('1719769872370950201', 1),
   ('1719783631214796822', 1),
   ('1719815104894554376', 1),
   ('1719856570530476052', 1),
   ('1719873531809308889', 1),
   ('1719883250422755339', 1),
   ('1719930338040861084', 1),
   ('1720002811767796204', 1),
   ('1720003834724925737', 1),
   ('1720005701429350638', 1),
   ('1720032654697857101', 1),
   ('1720071781594038347', 1),
   ('1720107158786682930', 1),
   ('1720108146213949897', 1),
   ('1720143502053450228', 1),
   ('1720151046566105314', 1),
   ('1720162395400614273', 1),
   ('1720173705278501233', 1),
   ('1720192643810971726', 1),
   ('1720200132027666753', 1),
   ('1720292656813666441', 1),
   ('1720304174

In [26]:
main_user_id = "931938860963061760"

# Copy the main user's (twit_id, score) list out of the collected matrix.
# Only the first matching row is used; the list stays empty if there is none.
main_user_twit_list = next(
    (
        list(row[1])
        for row in updated_scores_user_twit_matrix_list
        if row[0] == main_user_id
    ),
    [],
)

# Sum of squared scores, i.e. the squared length of the main user's vector.
abs_main_user_twit_list = sum(int(score) ** 2 for _, score in main_user_twit_list)
print(abs_main_user_twit_list)

285


In [28]:
import math


def calculate_cosine_similarity(x, main_twits=None):
    """Cosine similarity between one user's twit scores and the main user's.

    Both users are treated as sparse vectors indexed by twit id. The dot
    product only counts twit ids present in *both* lists (looked up by id in
    the main user's scores), so an id that merely repeats inside one user's
    own list is never mistaken for overlap.

    Args:
        x: ``(user_id, [(twit_id, score), ...])`` for the user to compare.
        main_twits: the reference user's ``[(twit_id, score), ...]`` list.
            Defaults to the notebook-level ``main_user_twit_list``.

    Returns:
        ``((user_id, cosine_similarity), twits)`` where ``twits`` is ``x[1]``
        unchanged. The similarity is ``0.0`` when either vector has zero
        length (empty list or all-zero scores) instead of raising
        ``ZeroDivisionError``.
    """
    if main_twits is None:
        main_twits = main_user_twit_list

    main_scores = dict(main_twits)
    vector_product = sum(
        main_scores.get(twit_id, 0) * score for twit_id, score in x[1]
    )

    # Squared vector lengths of both users.
    abs_main = sum(int(score) ** 2 for _, score in main_twits)
    abs_another_user_twit_list = sum(int(score) ** 2 for _, score in x[1])

    denominator = math.sqrt(abs_another_user_twit_list * abs_main)
    # A zero-length vector has no direction; report "no similarity".
    cosine_similarity = vector_product / denominator if denominator else 0.0
    return ((x[0], cosine_similarity), x[1])


def filter_similarity_result(x, threshold):
    """Return True when the similarity stored at ``x[0][1]`` is strictly above ``threshold``."""
    similarity = x[0][1]
    return bool(similarity > threshold)


# Score every user against the main user, order by similarity (highest
# first) and keep only the users whose similarity exceeds 0.03.
cosine_similarity_calculation = (
    updated_scores_user_twit_matrix_rdd
    .map(calculate_cosine_similarity)
    .sortBy(lambda scored: scored[0][1], ascending=False)
    .filter(lambda scored: filter_similarity_result(scored, 0.03))
)

# Materialise the surviving ((user_id, similarity), twits) rows on the driver.
similar_users_and_their_twits = cosine_similarity_calculation.collect()

                                                                                

[(('931938860963061760', 1.0),
  [('1719628248747847917', 1),
   ('1719660509169906096', 1),
   ('1719746724007170539', 1),
   ('1719757994445984132', 1),
   ('1719778692023402923', 1),
   ('1719780011069067609', 1),
   ('1719811900001698168', 1),
   ('1719992238430654912', 1),
   ('1720808657263202511', 1),
   ('1720854326946439591', 1),
   ('1720886713390686603', 1),
   ('1720907605717946428', 1),
   ('1720913278010843482', 1),
   ('1721107312746668343', 1),
   ('1721108958960570620', 1),
   ('1721118822092996935', 1),
   ('1721120454285377902', 1),
   ('1721146972160065667', 1),
   ('1721155219394887752', 1),
   ('1721243469807501770', 1),
   ('1721274036771041491', 1),
   ('1721436883916329289', 1),
   ('1721443290464047286', 1),
   ('1721453324354961563', 1),
   ('1721453435583603072', 1),
   ('1721465199985562051', 1),
   ('1721470356026736673', 1),
   ('1721472173720666258', 1),
   ('1721497317411320033', 1),
   ('1721512535898161633', 1),
   ('1721516042319466589', 1),
   ('172