https://ieeexplore.ieee.org/document/7072954
TDIDF cosine similarity

https://towardsdatascience.com/calculating-document-similarities-using-bert-and-other-models-b2c1a29c9630
Calculating document similarities

https://edstem.org/courses/5492/discussion/469433
Usually would be using feature extraction on tweets themselves, but for sake of simplicity we are doing it on the document represention instead.

https://edstem.org/courses/5492/discussion/471511
NB: USE STRING REPRESENTATIONS OF THE DOCUMENT REPRESENTATION; (539, 47, 4) == "539 47 4"

In [1]:
from pyspark.sql import SparkSession, functions as f
from pyspark.sql.functions import split
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, Word2Vec
import random
from numpy import dot
from numpy.linalg import norm
import time
start = time.time()

## Workload 1

### Data-Loading and Creating Document Representation

In [2]:
# Begin Spark session.
spark = SparkSession \
    .builder \
    .appName("mdon9995 Assignment 2") \
    .getOrCreate()

sc = spark.sparkContext

# Load data from file.
file_path = "tweets.json"
tweets_data_raw = spark.read.option("multiline", "true").json(file_path)

# Show schema for later reference.
tweets_data_raw.printSchema()

# Create temporary view for SQL.
tweets_data_raw.createOrReplaceTempView("tweets")

root
 |-- created_at: string (nullable = true)
 |-- hash_tags: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- indices: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)
 |    |    |-- text: string (nullable = true)
 |-- id: long (nullable = true)
 |-- replyto_id: long (nullable = true)
 |-- replyto_user_id: long (nullable = true)
 |-- retweet_id: long (nullable = true)
 |-- retweet_user_id: long (nullable = true)
 |-- text: string (nullable = true)
 |-- user_id: long (nullable = true)
 |-- user_mentions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- indices: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)



In [3]:


# Retrieve user, reply and retweet ids per entry.
# By using concat_ws we also remove the NULLS that occur.
df = spark.sql("""
                    SELECT user_id, CONCAT_WS(" ", replyto_id, retweet_id) AS reply_retweet
                    FROM tweets
                    WHERE replyto_id IS NOT NULL
                    OR retweet_id IS NOT NULL
                    """)
# df.show(20, False)
print("Extracted {} entries from json file.".format(df.count()))
print("\n")

# Grouping by user_id, aggregate by concatenating across entries, with final column name of doc_rep
df_group = df.groupby("user_id")\
            .agg(f.concat_ws(" ", f.collect_list(df.reply_retweet))\
            .alias("doc_rep")).cache()
# df_group.show(20, False)
print("Grouped into {} entries.".format(df_group.count()))

Extracted 8833 entries from json file.


Grouped into 8223 entries.


### Generate an input user_id

In [4]:
# I recognise this takes a few extra computational steps, but generally user_id would be predetermined and wouldn't take time to run.


random.seed(430113983)
user_id = random.choice(df_group.select("user_id").collect())[0]
print('Randomly generated user id: {}'.format(user_id))

# Take input user_id and return its features.
def get_user(user_id, data, features):
    user_features = data.filter(data.user_id == user_id).select("user_id", features)
    return user_features

# Define cosine similarity function
def cosine(a,b):
    return dot(a, b)/(norm(a)*norm(b))

Randomly generated user id: 1205906586734125056


### TF-IDF Feature Extraction

In [5]:
# Tokenize the input 'sentences' to individual 'words'.
tokenizer = Tokenizer(inputCol="doc_rep", outputCol="id")
data_id = tokenizer.transform(df_group)

# Can introduce more for testing, but at a cost to computation. It seemed 20 is suggested as a default and it worked fine.
numFeatures = 50

# Create hashing table for term frequency count.
hashingTF = HashingTF(inputCol="id", outputCol="rawFeatures", numFeatures=numFeatures)
data_featurized = hashingTF.transform(data_id)
data_featurized.cache() # We will call this twice for TF-IDF

# Calculating IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")
idf_model = idf.fit(data_featurized)
data_rescaled = idf_model.transform(data_featurized)

# Run above function, extract features only.
input_user_features = get_user(user_id, data_rescaled, "features").collect()[0][1]

In [6]:
# Filter out input user from comparisons, then conver to rdd for mapping.
# I think udfs would work better here, but I was unable to implement due to time constraints.
v = data_rescaled.select("user_id", "features").filter(data_rescaled.user_id != user_id).rdd

# Computing cosine similarity between input user's features and the entire dataset.
similarities = v.map(lambda x: (x[0], cosine(input_user_features, x[1])))

# Sorting above.
top_five = similarities.sortBy(lambda x: x[1], ascending=False)

# Collect here due to multiple calls soon for printing output.
top_five_list = top_five.collect()

# I noticed a lot of tweets having the same similarities of 1, so the code below counts the amount.
# Extracting the fifth highest similarity that was included. Any similarities that are the same should thus also be included.
# [4] refers to the above, while [1] extracts the similarity score from the fifth most similar word.
top_similarity = top_five_list[4][1]
top_one_same = top_five.filter(lambda x: x[1] == top_similarity).collect()

In [7]:
# Returning results for TDIDF.
print('Using TD-IDF for feature extraction, the top 5 users with similar interests were:')

for i in range(5):
    print('user_id: {}, similarity score: {:.3f}'.format(top_five_list[i][0], top_five_list[i][1]))

if len(top_one_same) > 5:
    print('There were also {} omitted entries with a similarity score of {:.3f}'.format(len(top_one_same)-5, top_similarity))

Using TD-IDF for feature extraction, the top 5 users with similar interests were:
user_id: 302864870, similarity score: 1.000
user_id: 1214658729938997253, similarity score: 1.000
user_id: 1382024523097239556, similarity score: 1.000
user_id: 770655466590445568, similarity score: 1.000
user_id: 25576953, similarity score: 1.000
There were also 189 omitted entries with a similarity score of 1.000


### Word2Vec Feature Extraction

In [8]:


# type(doc_rep) = string, needed array<string> as per Word2Vec requirements
df_w2v = df_group.withColumn('split', split(df_group["doc_rep"], " ")).drop("doc_rep")

# Initialise word2vec model, I found having a larger vector size had an effect on the amount of similar vectors to a given user_id
# Therefore I kept it fairly large, as it did not have a noticeable effect on runtime on my machine.
word2vec = Word2Vec(vectorSize=50, minCount=0, inputCol="split", outputCol="result")

# Fit and Transform model on prepared document representation (with split)
model = word2vec.fit(df_w2v)
result = model.transform(df_w2v)

In [9]:
# Returning features for input user_id.
# [0]["result"] is for extraction from dataframe purposes.
input_user_features_w2v = get_user(user_id, result, "result")\
                            .collect()[0]["result"]

In [10]:
# Filter out input user, then rdd.map cosine similarity
similarities_w2v = result.filter(result.user_id != user_id).rdd.map(lambda x: (x[0], cosine(input_user_features_w2v, x[2])))

# Sorting above.
top_five_w2v = similarities_w2v.sortBy(lambda x: x[1], ascending=False)

# Collect here due to multiple calls soon for printing output.
top_five_list_w2v = top_five_w2v.collect()

# I noticed a lot of tweets having the same similarities of 1, so the code below counts the amount.
# Extracting the fifth highest similarity that was included. Any similarities that are the same should thus also be included.
# [4] refers to the above, while [1] extracts the similarity score from the fifth most similar word.
top_similarity_w2v = top_five_list_w2v[4][1]
top_one_same_w2v = top_five_w2v.filter(lambda x: x[1] == top_similarity_w2v).collect()

In [11]:
# Returning results for W2V.
print('Using Word2Vec for feature extraction, the top 5 users with similar interests were:')

for i in range(5):
    print('user_id: {}, similarity score: {:.3f}'.format(top_five_list_w2v[i][0], top_five_list_w2v[i][1]))

if len(top_one_same_w2v) > 5:
    print('There were also {} omitted entries with a similarity score of {:.3f}'.format(len(top_one_same_w2v)-5, top_similarity_w2v))

Using Word2Vec for feature extraction, the top 5 users with similar interests were:
user_id: 302864870, similarity score: 1.000
user_id: 1214658729938997253, similarity score: 1.000
user_id: 1382024523097239556, similarity score: 1.000
user_id: 770655466590445568, similarity score: 1.000
user_id: 25576953, similarity score: 1.000
There were also 131 omitted entries with a similarity score of 1.000


Currently takes just under 18 seconds to run on my machine. I believe the DataFrame -> RDD call (for mapping purposes) is not the most efficient way (I have included part of the code below), but due to time constraints I must stay with what works for now.

There is also the possibility of using udfs on the DataFrame, but again I was unable to get those to work (as of week 10).

In [12]:
# synonyms = model.findSynonyms(input_user_features_w2v, 5)
# for row in synonyms.collect():
#     print(row)
# # THIS SAVES A LOT OF TIME VS RDD
# # UNABLE TO GET THIS TO WORK WITHOUT ITERATING THROUGH 'RESULT' TO FIND USER_IDs THAT MATCH THE WORD. ALSO HAVE TO CONSIDER WHEN TO END.

In [13]:
end = time.time()
print('Total time taken for workload 1: {}'.format(end-start))

Total time taken for workload 1: 21.52614665031433


# Workload 2

In [14]:
from pyspark.sql.functions import split, explode
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.mllib.recommendation import Rating
from pyspark.ml.recommendation import ALS

In [15]:
start = time.time()
# Playing around with concat_ws to disentangle the array of user_mentions.
# For ease of splitting later on, as it creates an array of strings from array of bigint
# No filtering as we need to give recommendations for every user.

df = spark.sql("""
                    SELECT user_id, concat_ws(",", user_mentions.id) AS user_mentions
                    FROM tweets
                    """)
# df.show(10, False)

In [16]:
# Exploding above arrays, user mentions are split over rows if in array.
df_split = df.withColumn("user_mention", explode(split(df["user_mentions"], ","))).drop("user_mentions")

# df_split.show(10, False)

# Group array with count, thus creating the rating column we need.
df_group = df_split.groupBy("user_id", "user_mention").count()

NB: There are user_ids who have not mentioned any other users, these have been left blank. For the purposes of converting from bigint to int (via indexing), as well as to pass onto the Collaborative Filtering framework, these nulls were converted to an index of 2 (denoting 3rd most common user_mention). I initially aimed to filter it out at the end, but it seems the recommendation system knew to remove them automatically, as it seems no one is ever suggested 2 in the final result.

In [17]:
# Define indexer parameters.
# A pipeline method was used to index over both user_id and user_mention to ensure no OOV elements can occur during recommendation

indexers = [StringIndexer(inputCol="user_id", outputCol="user_id_index") ,\
            StringIndexer(inputCol="user_mention", outputCol="user_mention_index")]
pipeline = Pipeline(stages=indexers)

# Execute indexing
df_index = pipeline.fit(df_group).transform(df_group).cache()

# From https://stackoverflow.com/questions/36942233/apply-stringindexer-to-several-columns-in-a-pyspark-dataframe

In [18]:
# Cast indexed columns to correct datatypes for Collaborative Filtering
df_ratings = df_index.withColumn("user", df_index["user_id_index"].cast('int')).drop("user_id_index")\
                    .withColumn("product", df_index["user_mention_index"].cast('int')).drop("user_mention_index")\
                    .withColumn("rating", df_index["count"].cast('float')).drop("count")


# Defining parameters for ALS
topx = 5
als = ALS(maxIter=10, regParam=0.01, userCol="user", itemCol="product", ratingCol="rating")

# Fit ALS
model = als.fit(df_ratings)

# Final Output
# User Recommendations for all users, in indexed form. An example function is written below to return recommended
# mention users. An alternative is to use a further mapping to restore all initial values.
userRecs = model.recommendForAllUsers(topx).cache()

For an input user_id, this sample code returns the top 5. There is a noticeable lag due to the index2user function that calls back to df_index.

This ultimately solve the big_int issue via indexing but is a bit slow. I'm sure there is a better way, but due to time constraints this works for now.

In [19]:
# # Defining functions for user_id retrieval and index retrieval for final output
# # Filter first then distinct was markedly quicker than the other way around.
# def user2index(user_id):
#     return df_index.select("user_id_index").filter(df_index.user_id == user_id).distinct().collect()[0][0]

# def index2user(index_id):
#     return df_index.select("user_mention").filter(df_index.user_mention_index == index_id).distinct().collect()[0][0]

# def recommend2user(user_id):    
#     user_index = user2index(user_id)
#     user_rec = userRecs.filter(userRecs.user == user_index).collect()
#     print('The top {} user recommendations for user {} are:'.format(topx, user_id))
#     i = 1
#     for row in user_rec[0][1]:
#         print('{}: {}'.format(i, index2user(row['product'])))
#         i += 1
        
# recommend2user(user_id)

The top 5 user recommendations for user 1205906586734125056 are:
1: 14335586
2: 254999238
3: 40851610
4: 23424533
5: 22053725


In [20]:
# #Collect top 5 mention indices and convert to userIds
# def rec_mention(line):
#     mentions = []
#     user, recommended_mention_indices = line
#     user = index2user(user)
#     for i in range(len(recommended_mention_indices)):
#         mention_index = recommended_mention_indices[i][0]
#         mention = index2user(mention_index)
#         mentions.append(mention)
#     return (user, mentions)

# userRecs.rdd.map(lambda row: rec_mention(row)).take(5)

In [21]:
# Following RecommendForAllUsers from:
# https://towardsdatascience.com/collaborative-filtering-in-pyspark-52617dd91194

# TODO: Create rating column; need to aggregate by? DONE
# should be able to pass above into the collaborative filtering algorithm for result DONE?
# TODO: Run on EMR cluster
# TODO: Report writeup

In [22]:
# Extracting user_ids only (still indexed).
user_rec = userRecs.select("user", userRecs.recommendations[0]["product"].alias("Top 1")\
                          , userRecs.recommendations[1]["product"].alias("Top 2")\
                          , userRecs.recommendations[2]["product"].alias("Top 3")\
                          , userRecs.recommendations[3]["product"].alias("Top 4")\
                          , userRecs.recommendations[4]["product"].alias("Top 5"))


# Apply index to string to revert all indices to original format. I apologise in advance for how hardcoded this is.
# Time-constraints and a goal of using as little python data structures as possible led to a bit of a mess.
tmp = df_index.select(df_index.user_id_index.alias("user"), df_index.user_id.alias("user_id")).distinct()
user_rec_id = user_rec.join(tmp, on=['user'], how = 'left').drop("user").cache()

tmp = df_index.select(df_index.user_mention_index.alias("Top 1"), df_index.user_mention.alias("Top 1_id")).distinct()
user_rec_id = user_rec_id.join(tmp, on=['Top 1'], how = 'left').drop("Top 1").cache()

tmp = df_index.select(df_index.user_mention_index.alias("Top 2"), df_index.user_mention.alias("Top 2_id")).distinct()
user_rec_id = user_rec_id.join(tmp, on=['Top 2'], how = 'left').drop("Top 2").cache()

tmp = df_index.select(df_index.user_mention_index.alias("Top 3"), df_index.user_mention.alias("Top 3_id")).distinct()
user_rec_id = user_rec_id.join(tmp, on=['Top 3'], how = 'left').drop("Top 3").cache()

tmp = df_index.select(df_index.user_mention_index.alias("Top 4"), df_index.user_mention.alias("Top 4_id")).distinct()
user_rec_id = user_rec_id.join(tmp, on=['Top 4'], how = 'left').drop("Top 4").cache()

tmp = df_index.select(df_index.user_mention_index.alias("Top 5"), df_index.user_mention.alias("Top 5_id")).distinct()
user_rec_id = user_rec_id.join(tmp, on=['Top 5'], how = 'left').drop("Top 5").cache()

In [23]:
# Printing final output.

print("Printing first 50 rows of output:")
user_rec_id.show(50, False)

Printing first 50 rows of output:
+-------------------+-------------------+------------------+-------------------+-------------------+-------------------+
|user_id            |Top 1_id           |Top 2_id          |Top 3_id           |Top 4_id           |Top 5_id           |
+-------------------+-------------------+------------------+-------------------+-------------------+-------------------+
|1135586475024822275|14296273           |757303975         |198118653          |200737738          |45935021           |
|250843319          |23922797           |370219796         |1078401427347857408|157981564          |114968487          |
|2806767541         |15012486           |29780473          |24259259           |2704294333         |1353769946556325889|
|422496868          |85583894           |33584794          |4970411            |65201417           |4207961            |
|54579715           |17154865           |254999238         |130557513          |854725669          |24259259           

In [24]:
end = time.time()
print('Total time taken for workload 2: {}'.format(end-start))

Total time taken for workload 2: 11.753997802734375
