# Matrix Factorization

In [1]:
import findspark
findspark.init("/usr/spark-2.4.1")
import pyspark
from pyspark import SQLContext
from pyspark.sql.functions import  when, col


pyspark.SparkContext.setSystemProperty('spark.executor.memory', '14g')
sc = pyspark.SparkContext()
sql = SQLContext(sc)

In [2]:
datafile = "data/training_sample.tsv"

train_df = (sql.read
    .format("csv")
    .option("header", "false")
    .option("sep", "\x01")
    .load(datafile,  inferSchema="true")
    .toDF("text_tokens", "hashtags", "tweet_id", "present_media", "present_links", "present_domains","tweet_type", "language", "tweet_timestamp", "engaged_with_user_id", "engaged_with_user_follower_count","engaged_with_user_following_count", "engaged_with_user_is_verified", "engaged_with_user_account_creation",\
               "engaging_user_id", "engaging_user_follower_count", "engaging_user_following_count", "engaging_user_is_verified","engaging_user_account_creation", "engaged_follows_engaging", "reply_timestamp", "retweet_timestamp", "retweet_with_comment_timestamp", "like_timestamp"))

train_df = train_df.select("engaging_user_id", "tweet_id", "reply_timestamp", "retweet_timestamp", "retweet_with_comment_timestamp", "like_timestamp")


In [3]:
tweet2id = train_df.select("tweet_id").rdd.map(lambda x: x[0]).distinct().zipWithUniqueId()
user2id = train_df.select("engaging_user_id").rdd.map(lambda x: x[0]).distinct().zipWithUniqueId()

In [4]:
tweet2id = tweet2id.toDF().withColumnRenamed("_1", "tweet_id_str").withColumnRenamed("_2", "tweet")
user2id = user2id.toDF().withColumnRenamed("_1", "user_id_str").withColumnRenamed("_2", "user")

In [5]:
from pyspark.sql.functions import  when, col

train_df = train_df.join(tweet2id, col("tweet_id") == col("tweet_id_str"))
train_df = train_df.join(user2id, col("engaging_user_id") == col("user_id_str"))
train_df = train_df.select("user", "tweet", "reply_timestamp", "retweet_timestamp", "retweet_with_comment_timestamp", "like_timestamp")

In [6]:
train_df.take(1)

[Row(user=1192, tweet=6464, reply_timestamp=None, retweet_timestamp=None, retweet_with_comment_timestamp=None, like_timestamp=None)]

In [7]:
train_df = train_df.withColumnRenamed("engaging_user_id", "user")
train_df = train_df.withColumnRenamed("tweet_id","tweet" )


In [8]:
target_cols = ["reply_timestamp", "retweet_timestamp", "retweet_with_comment_timestamp", "like_timestamp"]

def encode_response(x):
    return when(col(x).isNull(), float(0)).otherwise(float(1))

for target_col in target_cols:
    train_df = train_df.withColumn(target_col[:-10], encode_response(target_col))


In [9]:
train_df = train_df.select("user", "tweet", "reply", "retweet", "retweet_with_comment", "like")

In [10]:
datafile_val = "data/competition_test.tsv"

test_df = (sql.read
    .format("csv")
    .option("header", "false")
    .option("sep", "\x01")
    .load(datafile_val,  inferSchema="true")
    .toDF("text_tokens", "hashtags", "tweet_id", "present_media", "present_links", "present_domains","tweet_type", "language", "tweet_timestamp", "engaged_with_user_id", "engaged_with_user_follower_count","engaged_with_user_following_count", "engaged_with_user_is_verified", "engaged_with_user_account_creation",\
               "engaging_user_id", "engaging_user_follower_count", "engaging_user_following_count", "engaging_user_is_verified","engaging_user_account_creation", "engaged_follows_engaging"))

test_df = test_df.select("tweet_id","engaging_user_id")

In [11]:
tweet2id_new = test_df.select("tweet_id").rdd.map(lambda x: x[0]).distinct().zipWithUniqueId()
user2id_new = test_df.select("engaging_user_id").rdd.map(lambda x: x[0]).distinct().zipWithUniqueId()
tweet2id_new = tweet2id_new.toDF().withColumnRenamed("_1", "tweet_id_str_new").withColumnRenamed("_2", "tweet_new")
user2id_new = user2id_new.toDF().withColumnRenamed("_1", "user_id_str_new").withColumnRenamed("_2", "user_new")

In [12]:
test_df = test_df.join(tweet2id_new, col("tweet_id") == col("tweet_id_str_new"), "left_outer")
test_df = test_df.join(user2id_new, col("engaging_user_id") == col("user_id_str_new"), "left_outer")

In [13]:
test_df = test_df.join(tweet2id, col("tweet_id") == col("tweet_id_str"), "left_outer")
test_df = test_df.join(user2id, col("engaging_user_id") == col("user_id_str"), "left_outer")

In [14]:
max_user_id = user2id.groupBy().max("user").collect()[0][0]
max_tweet_id = tweet2id.groupBy().max("tweet").collect()[0][0]

def create_index(old, new):
    if old == "user":
        max_val = max_user_id
    elif old == "tweet":
        max_val = max_tweet_id
    return when(col(old).isNull(), col(new) + max_val).otherwise(col(old))


test_df = test_df.withColumn("user", create_index("user", "user_new"))
test_df = test_df.withColumn("tweet", create_index("tweet", "tweet_new"))

In [15]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

models = {}

maxIter=20
regParam=0.001
rank=20

for target_col in target_cols:
    target_col = target_col[:-10]
    print("Training Model for {}".format(target_col))
    models[target_col] = ALS(maxIter=maxIter, regParam=regParam, rank=rank, 
          userCol="user", itemCol="tweet", ratingCol=target_col,
          coldStartStrategy="nan", implicitPrefs=True).fit(train_df)
    
    # Evaluate the model by computing the RMSE on the test data
    test_df = models[target_col].transform(test_df)
    test_df = test_df.withColumnRenamed("prediction", target_col )
    

Training Model for reply
Training Model for retweet
Training Model for retweet_with_comment
Training Model for like


In [16]:
test_df.take(10)

[Row(tweet_id='8B5438E60671925BC9FA37DBF81A1FF3', engaging_user_id='F5EA455081075054AF7F483503352B23', tweet_id_str_new='8B5438E60671925BC9FA37DBF81A1FF3', tweet_new=227, user_id_str_new='F5EA455081075054AF7F483503352B23', user_new=4235531, tweet_id_str=None, tweet=80332, user_id_str=None, user=4245928, reply=nan, retweet=nan, retweet_with_comment=nan, like=nan),
 Row(tweet_id='4EFDBBFC5DBF463C7965F239C466AD6D', engaging_user_id='A3A0F9071D457C09C3825B90A8A1C402', tweet_id_str_new='4EFDBBFC5DBF463C7965F239C466AD6D', tweet_new=346, user_id_str_new='A3A0F9071D457C09C3825B90A8A1C402', user_new=1432024, tweet_id_str=None, tweet=80451, user_id_str=None, user=1442421, reply=nan, retweet=nan, retweet_with_comment=nan, like=nan),
 Row(tweet_id='886998CD47370112AB6C905FC18CFC03', engaging_user_id='2A3EBD0405E9DFC1CEB50B72B486524E', tweet_id_str_new='886998CD47370112AB6C905FC18CFC03', tweet_new=474, user_id_str_new='2A3EBD0405E9DFC1CEB50B72B486524E', user_new=672346, tweet_id_str=None, tweet=805

In [26]:
from pyspark.sql.functions import rand,when,isnan

def fallback_prediction(x):
    return when(isnan(x), rand()).otherwise(col(x))

for target_col in target_cols:
        test_df = test_df.withColumn(target_col[:-10], fallback_prediction(target_col[:-10]))

In [27]:
test_df.take(10)

[Row(tweet_id='8B5438E60671925BC9FA37DBF81A1FF3', engaging_user_id='F5EA455081075054AF7F483503352B23', tweet_id_str_new='8B5438E60671925BC9FA37DBF81A1FF3', tweet_new=227, user_id_str_new='F5EA455081075054AF7F483503352B23', user_new=4235531, tweet_id_str=None, tweet=80332, user_id_str=None, user=4245928, reply=0.6175814194076299, retweet=0.36664128426510845, retweet_with_comment=0.9105523205447466, like=0.7404391343869123),
 Row(tweet_id='4EFDBBFC5DBF463C7965F239C466AD6D', engaging_user_id='A3A0F9071D457C09C3825B90A8A1C402', tweet_id_str_new='4EFDBBFC5DBF463C7965F239C466AD6D', tweet_new=346, user_id_str_new='A3A0F9071D457C09C3825B90A8A1C402', user_new=1432024, tweet_id_str=None, tweet=80451, user_id_str=None, user=1442421, reply=0.931873319102214, retweet=0.7289156038330726, retweet_with_comment=0.9639885931071754, like=0.9167910497795758),
 Row(tweet_id='886998CD47370112AB6C905FC18CFC03', engaging_user_id='2A3EBD0405E9DFC1CEB50B72B486524E', tweet_id_str_new='886998CD47370112AB6C905FC18

In [None]:
#from pyspark.mllib.evaluation import BinaryClassificationMetrics##

#metrics = {}

#for target_col in target_cols:
#    target_col = target_col[:-10]
#    predictionAndLabels = test.rdd.map(lambda r: (r[target_col+"_pred"], r[target_col]))
#    metric = BinaryClassificationMetrics(predictionAndLabels)
#    metrics[target_col] = metric.areaUnderPR
#    print("For {}: Area under PR = {}".format(target_col, metrics[target_col]))

In [None]:
#a = sc.parallelize(metrics.items())

#a.coalesce(1).saveAsTextFile("test")