## Workload 1

In [1]:
# Import all necessary libraries and setup the environment for matplotlib
from pyspark.sql import SparkSession, Row
from pyspark.ml.feature import PCA
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, Normalizer, VectorAssembler
from pyspark.ml.linalg import *
from pyspark.sql.types import * 
from pyspark.sql.functions import *
import pyspark.sql.functions as f
import numpy as np
import matplotlib.pyplot as plt

In [2]:
def blank_as_null(x):
    return when(col(x) != "", col(x)).otherwise(None)

In [3]:
spark = SparkSession \
    .builder \
    .appName("Assignment 2") \
    .getOrCreate()
tweeets_data = spark.read.option('multiline','true').json('tweets.json')

In [4]:
tweets_agg = tweeets_data.groupby("user_id").agg(f.concat_ws(" ", f.collect_list(tweeets_data.id)).alias('agg_tweets'),
                                    f.concat_ws(" ", f.collect_list(tweeets_data.retweet_user_id)).alias('agg_retweet_users'),
                                    f.concat_ws(" ", f.collect_list(tweeets_data.retweet_id)).alias('agg_retweets'),
                                    f.concat_ws(" ", f.collect_list(tweeets_data.replyto_user_id)).alias('agg_reply_users'),
                                    f.concat_ws(" ", f.collect_list(tweeets_data.replyto_id)).alias('agg_replies'))

In [5]:
tweets_agg = tweets_agg.withColumn("agg_reply_users", blank_as_null("agg_reply_users"))
tweets_agg = tweets_agg.withColumn("agg_retweet_users", blank_as_null("agg_retweet_users"))
tweets_agg = tweets_agg.withColumn("agg_retweets", blank_as_null("agg_retweets"))
tweets_agg = tweets_agg.withColumn("agg_replies", blank_as_null("agg_replies"))

In [6]:
tweets_processed = tweets_agg.select('*',concat_ws(' ','agg_retweets','agg_replies').alias('agg_tweet_respond'))

In [7]:
tokenizer = Tokenizer(inputCol='agg_tweet_respond',
    outputCol="vectors")
tweets_vectors = tokenizer.transform(tweets_processed)

In [8]:
hashingTF = HashingTF(inputCol="vectors", outputCol="tf")
tf = hashingTF.transform(tweets_vectors)

idf = IDF(inputCol="tf", outputCol="feature").fit(tf)
tfidf = idf.transform(tf)

In [9]:
normalizer = Normalizer(inputCol="feature", outputCol="norm")
tf_data = normalizer.transform(tfidf)

In [10]:
selected_id = 202170318
tweets_user_filtered = tf_data.where(f'user_id = {selected_id}')
compare_vector = tweets_user_filtered.first()['norm']

In [11]:
def cos_sim(a,b=compare_vector):
    return float(a.dot(b) / (a.norm(2) * b.norm(2)))
cos_function = udf(cos_sim, FloatType())

In [12]:
tf_data = tf_data.withColumn("CosineSim",cos_function('norm'))
tf_data = tf_data.where(f'user_id <> {selected_id}')

In [13]:
sorted_output_tf = tf_data.filter(tf_data.CosineSim > 0).sort(col('CosineSim').desc())
sorted_output_tf.show(5,truncate=False)

+-------------------+-------------------+-----------------+-------------------+---------------+-----------+-------------------+---------------------+-----------------------+-------------------------------------+-----------------------+----------+
|user_id            |agg_tweets         |agg_retweet_users|agg_retweets       |agg_reply_users|agg_replies|agg_tweet_respond  |vectors              |tf                     |feature                              |norm                   |CosineSim |
+-------------------+-------------------+-----------------+-------------------+---------------+-----------+-------------------+---------------------+-----------------------+-------------------------------------+-----------------------+----------+
|727437799897468928 |1390068292220506112|96900937         |1390068042474917888|null           |null       |1390068042474917888|[1390068042474917888]|(262144,[232689],[1.0])|(262144,[232689],[7.188628509504587])|(262144,[232689],[1.0])|0.55613005|
|2903825767 

In [14]:
from pyspark.ml.feature import CountVectorizer

cv = CountVectorizer(inputCol="vectors", outputCol="features")

tweets_cv = cv.fit(tweets_vectors)

cv_data = tweets_cv.transform(tweets_vectors)

In [15]:
selected_id = 202170318
tweets_user_filtered = cv_data.where(f'user_id = {selected_id}')
compare_vector = tweets_user_filtered.first()['features']

In [16]:
def cos_sim(a,b=compare_vector):
    return float(a.dot(b) / (a.norm(2) * b.norm(2)))
cos_function = udf(cos_sim, FloatType())

In [17]:
cv_data = cv_data.withColumn("CosineSim",cos_function('features'))
cv_data = cv_data.where(f'user_id <> {selected_id}')

In [18]:
sorted_output_cv = cv_data.filter(cv_data.CosineSim > 0).sort(col('CosineSim').desc())
sorted_output_cv.show(5,truncate=False)

+----------+---------------------------------------+-------------------+---------------------------------------+---------------+-----------+---------------------------------------+------------------------------------------+----------------------+---------+
|user_id   |agg_tweets                             |agg_retweet_users  |agg_retweets                           |agg_reply_users|agg_replies|agg_tweet_respond                      |vectors                                   |features              |CosineSim|
+----------+---------------------------------------+-------------------+---------------------------------------+---------------+-----------+---------------------------------------+------------------------------------------+----------------------+---------+
|3101168904|1390055316683853824 1390055278486310914|380648579 380648579|1390047008971444231 1390022155350446082|null           |null       |1390047008971444231 1390022155350446082|[1390047008971444231, 1390022155350446082]|(656,[

## Workload 2

In [19]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [20]:
tweeets_data.show(5,truncate=False)

+-------------------+---------------------+-------------------+----------+---------------+-------------------+---------------+------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+----------------------+
|created_at         |hash_tags            |id                 |replyto_id|replyto_user_id|retweet_id         |retweet_user_id|text                                                                                                                                            |user_id            |user_mentions         |
+-------------------+---------------------+-------------------+----------+---------------+-------------------+---------------+------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+----------------------+
|2021-05-05 23:37:51|null                 |139008838265

In [21]:
wl2 = tweeets_data.withColumn("mentioned_users", tweeets_data["user_mentions"].getField('id'))
wl2.show()

+-------------------+--------------------+-------------------+----------+---------------+-------------------+------------------+--------------------+-------------------+--------------------+--------------------+
|         created_at|           hash_tags|                 id|replyto_id|replyto_user_id|         retweet_id|   retweet_user_id|                text|            user_id|       user_mentions|     mentioned_users|
+-------------------+--------------------+-------------------+----------+---------------+-------------------+------------------+--------------------+-------------------+--------------------+--------------------+
|2021-05-05 23:37:51|                null|1390088382659895296|      null|           null|1390027514332991489|            807095|RT @nytimes: Brea...|           17799542| [{807095, [3, 11]}]|            [807095]|
|2021-05-05 23:37:45|[{[9, 18], BREAKI...|1390088354717474822|      null|           null|1390022155350446082|         380648579|RT @AFP: #BREAKIN...|   

In [47]:
wl2_users = wl2.select(col('user_id'),col('mentioned_users'))

In [48]:
wl2_users = wl2_users.withColumn("mentioned_users", explode("mentioned_users"))

In [49]:
wl2_users_agg = wl2_users.groupBy(col('user_id'),col('mentioned_users')).count()

In [64]:
wl2_users_agg.orderBy(wl2_users_agg["count"].desc()).show()

+-------------------+------------------+-----+
|            user_id|   mentioned_users|count|
+-------------------+------------------+-----+
|         3094649957|          24259259|    6|
| 898230537244073986|         281877818|    3|
|         3094649957|        3094649957|    3|
|1286417148848480257|          55060090|    3|
|          138011877|         380648579|    2|
|1343557199394525186|         380648579|    2|
|1379485472158842887|          56488059|    2|
| 722337933907398656|         380648579|    2|
|1343362946852937728|        1643123766|    2|
|1345652009618255872|        1643123766|    2|
|          175604092|842678603305377793|    2|
|           36968645|          22429979|    2|
|          473714282|          19881665|    2|
|1331611799053955073|          17674244|    2|
|          480875170|         380648579|    2|
| 918084577377136640|            807095|    2|
|          109025128|          61863570|    2|
|          205463662|         380648579|    2|
|           9

In [81]:
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer, IndexToString

stringIndexer_uid = StringIndexer(inputCol="user_id", outputCol="user_id_indexed",stringOrderType="frequencyDesc")
model_uid = stringIndexer_uid.fit(wl2_users_agg)

stringIndexer_mentioned = StringIndexer(inputCol="mentioned_users", outputCol="mentioned_users_indexed",stringOrderType="frequencyDesc")
model_mu = stringIndexer_mentioned.fit(wl2_users_agg)

td = model_uid.transform(wl2_users_agg)
wl2_users_transformed = model_mu.transform(td)

In [114]:
wl2_users_transformed.count()

9792

In [119]:
converter = IndexToString(inputCol="user_id_indexed", outputCol="original_user_id_indexed")
converted = converter.transform(wl2_users_transformed)

In [120]:
converted.show()

+-------------------+-------------------+-----+---------------+-----------------------+------------------------+
|            user_id|    mentioned_users|count|user_id_indexed|mentioned_users_indexed|original_user_id_indexed|
+-------------------+-------------------+-----+---------------+-----------------------+------------------------+
| 739595086044876800|         2207905453|    1|           22.0|                  512.0|      739595086044876800|
|         4464358941|             807095|    1|          731.0|                    0.0|              4464358941|
| 728324764113211394|1349149096909668363|    1|         7045.0|                    9.0|      728324764113211394|
|          186593993|          133081348|    1|         3605.0|                    6.0|               186593993|
|1179209439087333376|           26574283|    1|         1690.0|                    1.0|     1179209439087333376|
|          807532302|           19881665|    1|         7407.0|                   33.0|         

In [92]:
wl2_users_transformed

DataFrame[user_id: bigint, mentioned_users: bigint, count: bigint, user_id_indexed: double, mentioned_users_indexed: double]

In [73]:
als = ALS(maxIter=5, regParam=0.01, userCol="user_id_indexed", itemCol="mentioned_users_indexed", ratingCol="count",
          coldStartStrategy="drop")
model = als.fit(wl2_users_transformed)

In [79]:
model_recs = model.recommendForAllUsers(5)

In [104]:
model_recs = model_recs.withColumn("user_id_indexed", model_recs["user_id_indexed"].cast("double"))

In [117]:
model_recs.show()

+---------------+--------------------+
|user_id_indexed|     recommendations|
+---------------+--------------------+
|         1580.0|[{80, 3.1511617},...|
|         4900.0|[{82, 4.5872145},...|
|         5300.0|[{82, 4.5872145},...|
|         6620.0|[{82, 4.5872145},...|
|         7240.0|[{55, 2.980116}, ...|
|         7340.0|[{72, 2.7744668},...|
|         7880.0|[{80, 3.1511617},...|
|          471.0|[{99, 2.0167801},...|
|         1591.0|[{82, 4.5872145},...|
|         4101.0|[{82, 4.5872145},...|
|         1342.0|[{82, 4.5872145},...|
|         2122.0|[{82, 4.5872145},...|
|         2142.0|[{235, 0.9960255}...|
|         7982.0|[{80, 3.1511617},...|
|          463.0|[{53, 3.2644851},...|
|          833.0|[{187, 1.5952177}...|
|         5803.0|[{99, 0.9981464},...|
|         7253.0|[{80, 3.1511617},...|
|         7833.0|[{138, 2.5645926}...|
|         7993.0|[{80, 3.1511617},...|
+---------------+--------------------+
only showing top 20 rows



In [118]:
labelReverse = IndexToString().setInputCol("user_id_indexed")
labelReverse.transform(model_recs).show()

Py4JJavaError: An error occurred while calling o1454.transform.
: java.lang.ClassCastException: class org.apache.spark.ml.attribute.UnresolvedAttribute$ cannot be cast to class org.apache.spark.ml.attribute.NominalAttribute (org.apache.spark.ml.attribute.UnresolvedAttribute$ and org.apache.spark.ml.attribute.NominalAttribute are in unnamed module of loader 'app')
	at org.apache.spark.ml.feature.IndexToString.transform(StringIndexer.scala:605)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:832)
