In [None]:
###########################  Workload 1 ################################

In [None]:
############## 1.1 import file and build document representation ##############

In [113]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StringType, ArrayType, NullType, IntegerType, FloatType
from pyspark.ml.linalg import Vectors, VectorUDT, DenseVector
from pyspark.ml.feature import VectorAssembler, Word2Vec, HashingTF, IDF

In [141]:
# load tweets.json
spark = SparkSession.builder.appName("5349_a2").getOrCreate()
tweets_url = 'tweets.json'
tweets = spark.read.option("multiline","true").json(tweets_url).cache()

In [146]:
# create Document Representation
df = tweets.select("user_id", F.concat_ws(',',tweets.replyto_id, tweets.retweet_id).alias("rp_rt"))
df = df.groupby("user_id").agg(F.collect_list(df.rp_rt).alias("Document Representation"))

In [147]:
# delete Null value of document Representation
def block_transf(a):
    for i in a:
        if len(i) == 0:
            return None
        else:
            return a
none_udf = F.udf(block_transf, ArrayType(StringType()))
df = df.select("user_id", none_udf(df['Document Representation']).alias("Document Representation")).na.drop()
df.show(20,False)

+----------+---------------------------------------------------------------------------------------------------------+
|user_id   |Document Representation                                                                                  |
+----------+---------------------------------------------------------------------------------------------------------+
|15466159  |[1390027514332991489]                                                                                    |
|19652471  |[1390023742194061312]                                                                                    |
|30616018  |[1390026843068239874]                                                                                    |
|32947971  |[1390027514332991489]                                                                                    |
|43301934  |[1390022155350446082]                                                                                    |
|51797430  |[1390023742194061312]               

In [None]:
############## 1.2 Word2vec ##############

In [158]:
# feature extractor word2Vec
w2v = Word2Vec(vectorSize=100, inputCol="Document Representation", outputCol= "word2vec")
w2vmodel = w2v.fit(df)
df_w2v = w2vmodel.transform(df)
df_w2v.show(5)

+--------+-----------------------+--------------------+
| user_id|Document Representation|            word2vec|
+--------+-----------------------+--------------------+
|15466159|   [1390027514332991...|[0.08526910096406...|
|19652471|   [1390023742194061...|[-7.6902564615011...|
|30616018|   [1390026843068239...|[0.00153636932373...|
|32947971|   [1390027514332991...|[0.08526910096406...|
|43301934|   [1390022155350446...|[-9.3807739904150...|
+--------+-----------------------+--------------------+
only showing top 5 rows



In [159]:
# find target user
user_w2v = df_w2v.where("user_id == 202170318").collect()
for i in user_w2v:
    tweet_w2v = i['word2vec']

In [160]:
# find top five similarity user 
top_similarity_w2v = df_w2v.select("user_id","word2vec").rdd\
        .mapValues(lambda a: DenseVector(a).dot(tweet_w2v)/(DenseVector(a).norm(2)*tweet_w2v.norm(2)))\
        .sortBy(lambda x: x[1], False).take(6)

In [161]:
top_similarity_w2v[1:6]

[(1318440355, 0.8149268487958068),
 (40496226, 0.8149268487958068),
 (14250699, 0.8149268487958068),
 (3016373503, 0.8149268487958068),
 (3101168904, 0.8149268487958068)]

In [None]:
############## 1.3 TF-IDF ##############

In [162]:
hashingTF = HashingTF(inputCol="Document Representation", outputCol="rawFeatures", numFeatures=100) 
featurizedData = hashingTF.transform(df)

idf = IDF(inputCol="rawFeatures", outputCol="tf_idf")
idfModel = idf.fit(featurizedData)
df_tfidf = idfModel.transform(featurizedData)
df_tfidf.show(5)

+--------+-----------------------+----------------+--------------------+
| user_id|Document Representation|     rawFeatures|              tf_idf|
+--------+-----------------------+----------------+--------------------+
|15466159|   [1390027514332991...|(100,[36],[1.0])|(100,[36],[1.1972...|
|19652471|   [1390023742194061...|(100,[81],[1.0])|(100,[81],[1.3429...|
|30616018|   [1390026843068239...|(100,[96],[1.0])|(100,[96],[6.3039...|
|32947971|   [1390027514332991...|(100,[36],[1.0])|(100,[36],[1.1972...|
|43301934|   [1390022155350446...|(100,[38],[1.0])|(100,[38],[2.4884...|
+--------+-----------------------+----------------+--------------------+
only showing top 5 rows



In [163]:
# find target user
user_tfidf = df_tfidf.where("user_id == 202170318").collect()
for i in user_tfidf:
    tweet_tfidf = i['tf_idf']

In [164]:
# find top five similarity user 
top_similarity_tfidf = df_tfidf.select("user_id","tf_idf").rdd\
        .mapValues(lambda a: DenseVector(a).dot(tweet_tfidf)/(DenseVector(a).norm(2)*tweet_tfidf.norm(2)))\
        .sortBy(lambda x: x[1], False).take(6)

In [165]:
df_tfidf.select("user_id","tf_idf").rdd\
        .mapValues(lambda a: DenseVector(a).dot(tweet_tfidf)/(DenseVector(a).norm(2)*tweet_tfidf.norm(2)))\
        .sortBy(lambda x: x[1], False).take(20)

[(202170318, 1.0000000000000002),
 (1016974586, 0.5939757471583065),
 (712790648, 0.5562358036167038),
 (521300849, 0.5562358036167038),
 (429898007, 0.5562358036167038),
 (14202167, 0.5562358036167038),
 (17479538, 0.5562358036167038),
 (110504574, 0.5562358036167038),
 (918929317, 0.5562358036167038),
 (727437799897468928, 0.5562358036167038),
 (216123608, 0.5562358036167038),
 (65091071, 0.5562358036167038),
 (2713172484, 0.5562358036167038),
 (2903825767, 0.5562358036167038),
 (1234011788401725440, 0.5562358036167038),
 (985849749319356417, 0.5562358036167038),
 (35124792, 0.5562358036167038),
 (16422863, 0.5562358036167038),
 (3368587707, 0.5562358036167038),
 (321084701, 0.5562358036167038)]

In [166]:
top_similarity_tfidf[1:6]

[(1016974586, 0.5939757471583065),
 (712790648, 0.5562358036167038),
 (521300849, 0.5562358036167038),
 (429898007, 0.5562358036167038),
 (14202167, 0.5562358036167038)]

In [None]:
###########################  Workload 2 ################################

In [112]:
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row, Catalog
from pyspark.sql.types import ArrayType, IntegerType
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer, IndexToString

In [127]:
df2 = tweets.select(tweets["user_id"].alias("user"), F.explode(tweets['user_mentions.id']).alias('item'))\
                .groupBy(["user","item"]).count()
df2 = df2.select("user","item",df2["count"].alias("rating"))
df2.show(5)
df2.printSchema

+-------------------+-------------------+------+
|               user|               item|rating|
+-------------------+-------------------+------+
| 739595086044876800|         2207905453|     1|
|         4464358941|             807095|     1|
| 728324764113211394|1349149096909668363|     1|
|          186593993|          133081348|     1|
|1179209439087333376|           26574283|     1|
+-------------------+-------------------+------+
only showing top 5 rows



<bound method DataFrame.printSchema of DataFrame[user: bigint, item: bigint, rating: bigint]>

In [128]:
user_indexer = StringIndexer(inputCol='user', outputCol='user_2')
user_model = user_indexer.fit(df2)
inderer = user_model.transform(df2)

item_indexer = StringIndexer(inputCol='item', outputCol='item_2')
item_model = item_indexer.fit(inderer)
inderer_2 = item_model.transform(inderer)
inderer_2.show()

+-------------------+-------------------+------+------+------+
|               user|               item|rating|user_2|item_2|
+-------------------+-------------------+------+------+------+
| 739595086044876800|         2207905453|     1|  22.0| 512.0|
|         4464358941|             807095|     1| 731.0|   0.0|
| 728324764113211394|1349149096909668363|     1|7045.0|   9.0|
|          186593993|          133081348|     1|3605.0|   6.0|
|1179209439087333376|           26574283|     1|1690.0|   1.0|
|          807532302|           19881665|     1|7407.0|  33.0|
|1385343098566307841|1349149096909668363|     1| 494.0|   9.0|
|1289041202487021571|           15115280|     1|2282.0|   5.0|
|          226035819|          133081348|     1|4040.0|   6.0|
|          288840037|           15115280|     1|4895.0|   5.0|
|          357832718|            4970411|     1|5603.0|  24.0|
|         4586004016|           26574283|     1| 735.0|   1.0|
|          139668106|          191807697|     1|2925.0|

In [132]:
als = ALS(userCol='user_2', itemCol='item_2',numItemBlocks=5, regParam=0.01)
model = als.fit(inderer_2)

# recommand 5 items for each user
recommend = model.recommendForAllUsers(5)
recommendations = recommend.select("user_2","recommendations.item_2")
recommendations.show()

+------+--------------------+
|user_2|              item_2|
+------+--------------------+
|  1580|[72, 141, 42, 128...|
|  4900|[82, 33, 595, 206...|
|  5300|[82, 33, 595, 206...|
|  6620|[82, 33, 595, 206...|
|  7240|[158, 82, 85, 252...|
|  7340|[239, 196, 105, 5...|
|  7880|[72, 141, 42, 128...|
|   471|[47, 72, 85, 141,...|
|  1591|[82, 33, 595, 206...|
|  4101|[82, 33, 595, 206...|
|  1342|[82, 33, 595, 206...|
|  2122|[82, 33, 595, 206...|
|  2142|[173, 106, 124, 1...|
|  7982|[72, 141, 42, 128...|
|   463|[72, 141, 42, 128...|
|   833|[33, 115, 491, 22...|
|  5803|[99, 51, 30, 84, ...|
|  7253|[72, 141, 42, 128...|
|  7833|[150, 93, 168, 65...|
|  7993|[72, 141, 42, 128...|
+------+--------------------+
only showing top 20 rows



In [144]:
# transform user_2 to user
inverter = IndexToString(inputCol="user_2", outputCol="user", labels=user_model.labels)
itd = inverter.transform(recommendations)

# transform item_2 to item
itd_2 = itd.select("user", F.explode(itd['item_2']).alias('item_2'))

inverter_2 = IndexToString(inputCol="item_2", outputCol="item", labels=item_model.labels)
itd_3 = inverter_2.transform(itd_2)

# The final results
itd_4 = itd_3.select("user","item").groupBy("user").agg(F.collect_list("item"))
recommand_users = itd_4.select("user", itd_4['collect_list(item)'].alias('item'))
recommand_users.show(20,False)

+-------------------+-----------------------------------------------------------------+
|user               |item                                                             |
+-------------------+-----------------------------------------------------------------+
|492247078          |[14335586, 231510077, 33584794, 12133382, 738767160395321345]    |
|315386202          |[587591389, 398878622, 138203134, 14434063, 29780473]            |
|602280399          |[587591389, 398878622, 138203134, 14434063, 29780473]            |
|1339354662923808780|[587591389, 398878622, 138203134, 14434063, 29780473]            |
|828160430          |[14260960, 138203134, 398878622, 15907183, 9300262]              |
|1372166406972661770|[327449246, 94482117, 14167059, 91905327, 22053725]              |
|65201417           |[65201417, 17154865, 807357676300730368, 4970411, 15907183]      |
|17922098           |[20759034, 1643123766, 947605221416538112, 19881665, 298217736]  |
|46103045           |[29780473, 