In [35]:
# workload 1 

In [36]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType,IntegerType, FloatType

In [37]:
# create a SparkSession, The entry point to programming Spark with the Dataset and DataFrame API.
spark = SparkSession \
    .builder \
    .appName("workload1_Ass2") \
    .getOrCreate()

In [38]:
# load JSON file into Spark data frame 
tweets_df = spark.read.option("multiLine","true").json('tweets.json')
tweets_df.show(1)

+-------------------+---------+-------------------+----------+---------------+-------------------+---------------+--------------------+--------+-------------------+
|         created_at|hash_tags|                 id|replyto_id|replyto_user_id|         retweet_id|retweet_user_id|                text| user_id|      user_mentions|
+-------------------+---------+-------------------+----------+---------------+-------------------+---------------+--------------------+--------+-------------------+
|2021-05-05 23:37:51|     null|1390088382659895296|      null|           null|1390027514332991489|         807095|RT @nytimes: Brea...|17799542|[{807095, [3, 11]}]|
+-------------------+---------+-------------------+----------+---------------+-------------------+---------------+--------------------+--------+-------------------+
only showing top 1 row



In [39]:
# retweed_id is rt, replyto_id is rp
tweets_df.select("user_id","retweet_id","replyto_id").show()

+-------------------+-------------------+----------+
|            user_id|         retweet_id|replyto_id|
+-------------------+-------------------+----------+
|           17799542|1390027514332991489|      null|
|         1166466828|1390022155350446082|      null|
|1343606436149022723|1390050885229817856|      null|
| 930226031276982273|1390066365046865929|      null|
| 920858307392192513|1390027514332991489|      null|
|           21458110|1390025466539614212|      null|
| 787062740183552000|1390023742194061312|      null|
|          392646132|               null|      null|
|         2955789098|1390027514332991489|      null|
| 792380204287164416|               null|      null|
|          198453947|1390027514332991489|      null|
|         1431726547|1390066365046865929|      null|
|1245145031045980163|1390023742194061312|      null|
|         2181244875|1390039923588206598|      null|
|           34865264|1390023742194061312|      null|
|          179912903|1390087644235902979|     

In [40]:
# concat retweet_id and replyto_id within each retweet object, add into a new column 
import pyspark.sql.functions as F

def myConcat(*cols):
    return F.concat(*[F.coalesce(c, F.lit("")) for c in cols])

# create a new data frame contains only user_id with rt_rp_id.
df = tweets_df.withColumn("rt_rp_id", myConcat("retweet_id","replyto_id")).select("user_id","rt_rp_id")

In [41]:
# group by user_id and get document representation 
df_new = df.groupby("user_id").agg(F.collect_set("rt_rp_id")).withColumnRenamed("collect_set(rt_rp_id)", "Document_Representation")

In [42]:
from pyspark.sql.functions import *

dataframe = df_new.withColumn('Document_Representation', concat_ws(',', 'Document_Representation'))

In [43]:
dataframe.show(truncate=0) #complete version

+---------+---------------------------------------------------------------------------------------------------+
|user_id  |Document_Representation                                                                            |
+---------+---------------------------------------------------------------------------------------------------+
|15466159 |1390027514332991489                                                                                |
|19652471 |1390023742194061312                                                                                |
|30616018 |1390026843068239874                                                                                |
|32947971 |1390027514332991489                                                                                |
|33868781 |                                                                                                   |
|43301934 |1390022155350446082                                                                          

In [44]:
# Apply the feature extractor TF-IDF on the document representations.
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

tokenizer = Tokenizer(inputCol="Document_Representation", outputCol="words")
wordsData = tokenizer.transform(dataframe)
#wordsData.show(truncate=0)

# HashingTF is a Transformer, which converts wordsData into fixed-length feature vectors
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
#featurizedData.show(truncate=0)

# IDF is an Estimator which is fit on a dataset and produces an IDFModel.
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
# The IDFModel takes feature vectors and scales each feature 
# down-weights features which appear frequently in a corpus
rescaledData = idfModel.transform(featurizedData)
feature_df = rescaledData.select("user_id","features")
feature_df.show(truncate=0)

+---------+------------------------------+
|user_id  |features                      |
+---------+------------------------------+
|15466159 |(20,[16],[1.3580036303746605])|
|19652471 |(20,[1],[1.4810437488986479]) |
|30616018 |(20,[16],[1.3580036303746605])|
|32947971 |(20,[16],[1.3580036303746605])|
|33868781 |(20,[12],[1.959048944935679]) |
|43301934 |(20,[18],[2.5104734307600074])|
|51797430 |(20,[1],[1.4810437488986479]) |
|54579715 |(20,[16],[1.3580036303746605])|
|106456868|(20,[19],[3.919602900950915]) |
|109826391|(20,[16],[1.3580036303746605])|
|111249239|(20,[1],[1.4810437488986479]) |
|147126487|(20,[15],[4.509565845275629]) |
|157101980|(20,[16],[1.3580036303746605])|
|202170318|(20,[6],[3.4208058530505316]) |
|228531805|(20,[16],[1.3580036303746605])|
|235000057|(20,[12],[1.959048944935679]) |
|257121078|(20,[1],[1.4810437488986479]) |
|302864870|(20,[8],[3.3883354680197475]) |
|404262940|(20,[12],[1.959048944935679]) |
|417365324|(20,[3],[4.570190467092065])  |
+---------+

In [45]:
from pyspark.sql.types import *
# convert sparse vector to normal vector
vector_udf = udf(lambda vector: vector.toArray().tolist(), ArrayType(DoubleType()))
vector_df = feature_df.select("user_id",vector_udf('features').alias('features'))

In [46]:
vector_df.show()
vector_df.cache()

+---------+--------------------+
|  user_id|            features|
+---------+--------------------+
| 15466159|[0.0, 0.0, 0.0, 0...|
| 19652471|[0.0, 1.481043748...|
| 30616018|[0.0, 0.0, 0.0, 0...|
| 32947971|[0.0, 0.0, 0.0, 0...|
| 33868781|[0.0, 0.0, 0.0, 0...|
| 43301934|[0.0, 0.0, 0.0, 0...|
| 51797430|[0.0, 1.481043748...|
| 54579715|[0.0, 0.0, 0.0, 0...|
|106456868|[0.0, 0.0, 0.0, 0...|
|109826391|[0.0, 0.0, 0.0, 0...|
|111249239|[0.0, 1.481043748...|
|147126487|[0.0, 0.0, 0.0, 0...|
|157101980|[0.0, 0.0, 0.0, 0...|
|202170318|[0.0, 0.0, 0.0, 0...|
|228531805|[0.0, 0.0, 0.0, 0...|
|235000057|[0.0, 0.0, 0.0, 0...|
|257121078|[0.0, 1.481043748...|
|302864870|[0.0, 0.0, 0.0, 0...|
|404262940|[0.0, 0.0, 0.0, 0...|
|417365324|[0.0, 0.0, 0.0, 4...|
+---------+--------------------+
only showing top 20 rows



DataFrame[user_id: bigint, features: array<double>]

In [47]:
test_item = vector_df.take(1)
test_input = spark.createDataFrame(test_item)
test_input.show()
test_value = test_input.select("features").collect()[0][0]
print(test_value)

+--------+--------------------+
| user_id|            features|
+--------+--------------------+
|15466159|[0.0, 0.0, 0.0, 0...|
+--------+--------------------+

[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.3580036303746605, 0.0, 0.0, 0.0]


In [48]:
# Calculate cosine similarity between Vectors and test_input
import numpy as np
def cos_similarity(vec):
    sim = np.dot(test_value,vec)/(np.linalg.norm(test_value)*np.linalg.norm(vec))
    return sim.tolist()
    
cos_sim_udf = udf(cos_similarity, FloatType())

df_cos = vector_df.withColumn('cos_sim', cos_sim_udf('features')).select("user_id","features","cos_sim")
df_cos.show()
df_cos.cache()

+---------+--------------------+-------+
|  user_id|            features|cos_sim|
+---------+--------------------+-------+
| 15466159|[0.0, 0.0, 0.0, 0...|    1.0|
| 19652471|[0.0, 1.481043748...|    0.0|
| 30616018|[0.0, 0.0, 0.0, 0...|    1.0|
| 32947971|[0.0, 0.0, 0.0, 0...|    1.0|
| 33868781|[0.0, 0.0, 0.0, 0...|    0.0|
| 43301934|[0.0, 0.0, 0.0, 0...|    0.0|
| 51797430|[0.0, 1.481043748...|    0.0|
| 54579715|[0.0, 0.0, 0.0, 0...|    1.0|
|106456868|[0.0, 0.0, 0.0, 0...|    0.0|
|109826391|[0.0, 0.0, 0.0, 0...|    1.0|
|111249239|[0.0, 1.481043748...|    0.0|
|147126487|[0.0, 0.0, 0.0, 0...|    0.0|
|157101980|[0.0, 0.0, 0.0, 0...|    1.0|
|202170318|[0.0, 0.0, 0.0, 0...|    0.0|
|228531805|[0.0, 0.0, 0.0, 0...|    1.0|
|235000057|[0.0, 0.0, 0.0, 0...|    0.0|
|257121078|[0.0, 1.481043748...|    0.0|
|302864870|[0.0, 0.0, 0.0, 0...|    0.0|
|404262940|[0.0, 0.0, 0.0, 0...|    0.0|
|417365324|[0.0, 0.0, 0.0, 4...|    0.0|
+---------+--------------------+-------+
only showing top

DataFrame[user_id: bigint, features: array<double>, cos_sim: float]

In [49]:
# top 5 users with similar interest as a given user id, sorting by Descending order
top5users = df_cos.select('user_id','cos_sim').orderBy('cos_sim', ascending=False).limit(5).collect()
top_5_user_id = []
for x in top5users:
    top_5_user_id.append(x[0])
print("Using TF-IDF as feature extractor:")
print("The top 5 users with similar interest as a given user id:",top_5_user_id)

Using TF-IDF as feature extractor:
The top 5 users with similar interest as a given user id: [54579715, 3304296668, 109826391, 30616018, 157101980]


In [50]:
# using Word2Vec to extract features

In [51]:
# Apply the feature extractor Word2Vec on the document representations.
from pyspark.ml.feature import Word2Vec

word2Vec = Word2Vec(vectorSize=10, minCount=0, inputCol="Document_Representation", outputCol="result")
model = word2Vec.fit(df_new)
# The Word2VecModel transforms each document into a vector using the average of all words in the document
result = model.transform(df_new)
Word2Vec_result = result.collect()

#result.show(truncate=0)
for x1 in Word2Vec_result:
    print("Document Representation: {} => \nVector: {}\n".format(x1[1], x1[2]))

Document Representation: ['1390027514332991489'] => 
Vector: [-0.4883058965206146,-0.3890067934989929,-0.4080418348312378,-0.516539990901947,0.13311640918254852,0.29462137818336487,1.1575371026992798,0.582281231880188,-0.20210590958595276,0.8797650337219238]

Document Representation: ['1390023742194061312'] => 
Vector: [-0.06441959738731384,-0.02104375511407852,0.006334134843200445,0.004689395427703857,0.07248742133378983,-0.029715076088905334,0.2732863426208496,0.0945764109492302,-0.12920114398002625,0.10261623561382294]

Document Representation: ['1390026843068239874'] => 
Vector: [0.030103761702775955,0.03530328348278999,0.04215181991457939,0.022291850298643112,-0.03548181802034378,-0.0405881404876709,-0.023633623495697975,0.01391149777919054,0.03359648585319519,-0.03750377148389816]

Document Representation: ['1390027514332991489'] => 
Vector: [-0.4883058965206146,-0.3890067934989929,-0.4080418348312378,-0.516539990901947,0.13311640918254852,0.29462137818336487,1.1575371026992798,0

In [52]:
# test data 
test_item1 = result.take(1)
test_input1 = spark.createDataFrame(test_item1)
test_input1.cache()
test_input1.show()
test_value1 = test_input1.select("result").collect()[0][0]
print(test_value1)

+--------+-----------------------+--------------------+
| user_id|Document_Representation|              result|
+--------+-----------------------+--------------------+
|15466159|   [1390027514332991...|[-0.4883058965206...|
+--------+-----------------------+--------------------+

[-0.4883058965206146,-0.3890067934989929,-0.4080418348312378,-0.516539990901947,0.13311640918254852,0.29462137818336487,1.1575371026992798,0.582281231880188,-0.20210590958595276,0.8797650337219238]


In [53]:
# Calculate the cosine similarities using the values obtained in previous step 
from pyspark.sql.functions import udf, log
import numpy as np
def cos_similarity(vec):
    sim = np.dot(test_value1,vec)/(np.linalg.norm(test_value1)*np.linalg.norm(vec))
    return sim.tolist()
    
cos_sim_udf1 = udf(cos_similarity, FloatType())

df_cos1 = result.withColumn('cos_sim_Word2Vec', cos_sim_udf1('result')).select("user_id","result","cos_sim_Word2Vec")
df_cos1.show()
df_cos1.cache()

+---------+--------------------+----------------+
|  user_id|              result|cos_sim_Word2Vec|
+---------+--------------------+----------------+
| 15466159|[-0.4883058965206...|             1.0|
| 19652471|[-0.0644195973873...|       0.8111986|
| 30616018|[0.03010376170277...|     -0.69705015|
| 32947971|[-0.4883058965206...|             1.0|
| 33868781|[0.01977297663688...|      0.90047044|
| 43301934|[0.00894099567085...|     -0.39432248|
| 51797430|[-0.0644195973873...|       0.8111986|
| 54579715|[-0.4883058965206...|             1.0|
|106456868|[0.00344814918935...|      0.67461216|
|109826391|[-0.4883058965206...|             1.0|
|111249239|[-0.0644195973873...|       0.8111986|
|147126487|[0.01505714096128...|      0.35010928|
|157101980|[-0.0154105350375...|      -0.1579794|
|202170318|[-0.0146485993638...|       0.5105926|
|228531805|[-0.4883058965206...|             1.0|
|235000057|[0.01977297663688...|      0.90047044|
|257121078|[-0.0644195973873...|       0.8111986|


DataFrame[user_id: bigint, result: vector, cos_sim_Word2Vec: float]

In [54]:
# top 5 users with similar interest as a given user id, sorting by Descending order
top5users_Word2Vec = df_cos1.select('user_id','cos_sim_Word2Vec').orderBy('cos_sim_Word2Vec', ascending=False).limit(5).collect()
top_5_user_id_Word2Vec = []
for x in top5users_Word2Vec:
    top_5_user_id_Word2Vec.append(x[0])
print("Using Word2Vec as feature extractor:")
print("The top 5 users with similar interest as a given user id:", top_5_user_id_Word2Vec)

Using Word2Vec as feature extractor:
The top 5 users with similar interest as a given user id: [159323186, 14439894, 474917204, 15390287, 2255763426]


Workload 2

In [55]:
# prepare the raw data in the format as required by the collaborative filter algorithm
# show user_id, user_mentions
df_wk2 = tweets_df.select("user_id","user_mentions")

df_wk2.show(5)

+-------------------+--------------------+
|            user_id|       user_mentions|
+-------------------+--------------------+
|           17799542| [{807095, [3, 11]}]|
|         1166466828|[{380648579, [3, ...|
|1343606436149022723|[{191807697, [3, ...|
| 930226031276982273|[{15115280, [3, 1...|
| 920858307392192513| [{807095, [3, 11]}]|
+-------------------+--------------------+
only showing top 5 rows



In [56]:
from pyspark.sql.functions import col, explode

df_mention = df_wk2.withColumn("user_mentions", explode("user_mentions")) \
                .select("*", col("user_mentions")["id"].alias("mention_id")) \
                .select("user_id","mention_id")
df_mention.show(5)

+-------------------+----------+
|            user_id|mention_id|
+-------------------+----------+
|           17799542|    807095|
|         1166466828| 380648579|
|1343606436149022723| 191807697|
| 930226031276982273|  15115280|
| 920858307392192513|    807095|
+-------------------+----------+
only showing top 5 rows



In [57]:
# count rating: number of times a tweet user mentions a mention user.
from pyspark.sql.functions import *
df_rating = df_mention.groupBy("user_id","mention_id") \
            .agg(count("mention_id")).select("user_id","mention_id","count(mention_id)")

df_rating.show(5)

+-------------------+-------------------+-----------------+
|            user_id|         mention_id|count(mention_id)|
+-------------------+-------------------+-----------------+
| 739595086044876800|         2207905453|                1|
|         4464358941|             807095|                1|
| 728324764113211394|1349149096909668363|                1|
|          186593993|          133081348|                1|
|1179209439087333376|           26574283|                1|
+-------------------+-------------------+-----------------+
only showing top 5 rows



In [58]:
# prepare the raw data in the format as required by the collaborative filter algorithm
raw_data = df_rating.select("user_id", "mention_id", col("count(mention_id)").alias("rating"))
raw_data.show(5)
raw_data.cache()

+-------------------+-------------------+------+
|            user_id|         mention_id|rating|
+-------------------+-------------------+------+
| 739595086044876800|         2207905453|     1|
|         4464358941|             807095|     1|
| 728324764113211394|1349149096909668363|     1|
|          186593993|          133081348|     1|
|1179209439087333376|           26574283|     1|
+-------------------+-------------------+------+
only showing top 5 rows



DataFrame[user_id: bigint, mention_id: bigint, rating: bigint]

In [59]:
raw_dataframe = raw_data.withColumn("user_id_INT", raw_data["user_id"].cast(IntegerType())) \
                        .withColumn("mention_id_INT", raw_data["mention_id"].cast(IntegerType())) 

raw_dataframe.cache()

DataFrame[user_id: bigint, mention_id: bigint, rating: bigint, user_id_INT: int, mention_id_INT: int]

In [60]:
# Build a model to perform the recommendation.
# Alternating Least Squares (ALS) is a the model I'll use to fit my data and find similarities.
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

(training, test) = raw_dataframe.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="user_id_INT", itemCol="mention_id_INT", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)

In [61]:
predictions = model.transform(test)
predictions.show(5)

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

+-------------------+-------------------+------+-----------+--------------+----------+
|            user_id|         mention_id|rating|user_id_INT|mention_id_INT|prediction|
+-------------------+-------------------+------+-----------+--------------+----------+
|1149658282421837824|           17112878|     1| -522788864|      17112878| 0.9570924|
| 739595086044876800|          130557513|     2| -313241600|     130557513|0.67659044|
|          284383568|          130557513|     1|  284383568|     130557513| 0.4038978|
|          210620557|          185025785|     1|  210620557|     185025785|0.89816856|
|           60790159|1385797144209096705|     1|   60790159|     257261569|0.33871594|
+-------------------+-------------------+------+-----------+--------------+----------+
only showing top 5 rows

Root-mean-square error = 0.7848935311166044


In [62]:
# Generate top 5 mentioned users recommendations for each tweeter user
Rec = model.recommendForAllUsers(5)

In [63]:
Recommendation = Rec.join(predictions, "user_id_INT").select("user_id","recommendations")
Recommendation.show(truncate=0)
# in recommendations sections, {mention_user_id, predicted_mentioned_times}

+-------------------+-------------------------------------------------------------------------------------------------------------------------+
|user_id            |recommendations                                                                                                          |
+-------------------+-------------------------------------------------------------------------------------------------------------------------+
|143138173          |[{281877818, 1.8315904}, {15781769, 1.6910799}, {96900937, 1.5438443}, {65201417, 1.534229}, {22053725, 1.4049186}]      |
|6320792            |[{23424533, 1.4287063}, {61863570, 1.3783333}, {-1054571062, 1.0145872}, {90918200, 0.9939678}, {20298671, 0.9922338}]   |
|46058741           |[{1402351624, 2.8357756}, {14296273, 2.7093263}, {931571402, 2.5442154}, {59159771, 2.4908698}, {20759034, 2.3473892}]   |
|962749781536792577 |[{2729061, 3.5671666}, {309705905, 3.4268131}, {17674244, 3.3684978}, {1402351624, 2.743431}, {14296273, 2.678756}]