### Binary Classifier on a single label using text_tokens as content

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import twitter_preproc

conf = SparkConf().setAll([
    ("num-executors", 4), 
    ("total-executor-cores", 16), 
    ("executor-memory", "8g"),
    ("spark.yarn.executor.memoryOverhead", "64g")])
sc = SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

In [2]:
#datapath = "///tmp/traintweet_10k.tsv"
datapath = "///tmp/traintweet_1000.tsv"
ENGAGEMENTS = ["like", "reply", "retweet", "retweet_with_comment"]

In [16]:
from pyspark.sql.types import *
from pyspark.sql.functions import when

SCHEMA = StructType([
        StructField("text_tokens", StringType()),
        StructField("hashtags", StringType()),
        StructField("tweet_id", StringType()),
        StructField("present_media", StringType()),
        StructField("present_links", StringType()),
        StructField("present_domains", StringType()),
        StructField("tweet_type", StringType()),
        StructField("language", StringType()),
        StructField("tweet_timestamp", LongType()),
        StructField("engaged_with_user_id", StringType()),
        StructField("engaged_with_user_follower_count", LongType()),
        StructField("engaged_with_user_following_count", LongType()),
        StructField("engaged_with_user_is_verified", BooleanType()),
        StructField("engaged_with_user_account_creation", LongType()),
        StructField("engaging_user_id", StringType()),
        StructField("engaging_user_follower_count", LongType()),
        StructField("engaging_user_following_count", LongType()),
        StructField("engaging_user_is_verified", BooleanType()),
        StructField("engaging_user_account_creation", LongType()),
        StructField("engaged_follows_engaging", BooleanType()),
        StructField("reply_timestamp", LongType()),
        StructField("retweet_timestamp", LongType()),
        StructField("retweet_with_comment_timestamp", LongType()),
        StructField("like_timestamp", LongType())       
    ])

raw = spark.read.csv(path=datapath, sep="\x01", header=False, schema=SCHEMA)
df = raw.select(["tweet_id","engaging_user_id",
                                    "retweet_timestamp","reply_timestamp",
                                    "retweet_with_comment_timestamp","like_timestamp","text_tokens"])

for engagement in ENGAGEMENTS:
            df = df.withColumn(engagement, when(df[engagement + "_timestamp"].isNotNull(), 1).cast(ByteType()))\
                .drop(engagement + "_timestamp")

df = df.fillna(0, subset=ENGAGEMENTS)
        
df.show(5)

+--------------------+--------------------+--------------------+----+-----+-------+--------------------+
|            tweet_id|    engaging_user_id|         text_tokens|like|reply|retweet|retweet_with_comment|
+--------------------+--------------------+--------------------+----+-----+-------+--------------------+
|E7D6C5094767223F6...|00000776B07587ECA...|101	1942	18628	15...|   0|    0|      0|                   0|
|129F4A868712BA2B9...|00000B85AAF7DE172...|101	56898	137	948...|   1|    0|      1|                   0|
|04C6C2175852CDBBC...|00000E0C9B364891C...|101	98377	22627	3...|   1|    0|      0|                   0|
|168157826315514C1...|00000F04EEDBCF3E1...|101	56898	137	111...|   1|    0|      0|                   0|
|B3E3673782A69D9D8...|000010088197DA00D...|101	100	119	6694	...|   1|    0|      0|                   0|
+--------------------+--------------------+--------------------+----+-----+-------+--------------------+
only showing top 5 rows



### Handle BERT Tokens like words, transform text_tokens into a sparse DF-IDF feature vector

In [17]:
from pyspark.ml.feature import RegexTokenizer,NGram,CountVectorizer,IDF,StringIndexer,Normalizer
from pyspark.ml import Pipeline

#stringIndexer = StringIndexer(inputCol="engaging_user_id", outputCol="engaging_user_id_idx")
regexTokenizer = RegexTokenizer(inputCol="text_tokens", outputCol="terms", pattern="\t")
cv = CountVectorizer(inputCol="terms", outputCol="vector")
idf = IDF(inputCol="vector", outputCol="features")
normalizer=Normalizer(inputCol="features",outputCol="normed_features")
pipeline = Pipeline(stages=[regexTokenizer, cv,idf,normalizer])

model = pipeline.fit(df)
data = model.transform(df)

In [18]:
data.printSchema()

root
 |-- tweet_id: string (nullable = true)
 |-- engaging_user_id: string (nullable = true)
 |-- text_tokens: string (nullable = true)
 |-- like: byte (nullable = true)
 |-- reply: byte (nullable = true)
 |-- retweet: byte (nullable = true)
 |-- retweet_with_comment: byte (nullable = true)
 |-- terms: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- vector: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- normed_features: vector (nullable = true)



### Just keep the necessary columns

In [20]:
data = data.select("normed_features","tweet_id","engaging_user_id","like","reply","retweet","retweet_with_comment")
data.printSchema()

root
 |-- normed_features: vector (nullable = true)
 |-- tweet_id: string (nullable = true)
 |-- engaging_user_id: string (nullable = true)
 |-- like: byte (nullable = true)
 |-- reply: byte (nullable = true)
 |-- retweet: byte (nullable = true)
 |-- retweet_with_comment: byte (nullable = true)



In [21]:
import scipy.sparse as sps
# Transform PySpark SparseVectors into scipy.sparse.csr_matrix
tf = data.rdd.map(lambda row: ((row.engaging_user_id,row.like),sps.csr_matrix(row.normed_features)))

### Aggregation

1. Create a new key with user_id,engagement
2. Perform a mapreduce on the sps.csr_matrix using the + operator
3. Keep a count value with increments on each new observation
4. After the mapreduce, divide the aggregated feature vectors with the respective count value. Also store the count value with the vector for later advantages

[How to Aggregate and Average with mapreduce](https://stackoverflow.com/questions/29930110/calculating-the-averages-for-each-key-in-a-pairwise-k-v-rdd-in-spark-with-pyth)

In [22]:
aTuple = (0,0)
aggregated = tf.aggregateByKey(aTuple,
                        lambda a,b: (a[0] + b,    a[1] + 1),
                        lambda a,b: (a[0] + b[0], a[1] + b[1]))

user_vectors = aggregated.mapValues(lambda v: (v[1],v[0]/v[1]))

Now, a tuple in the paired RDD has the form key:(user_id,engagement) value(count,features)

Sanity Description 
- tup[0][0] is user_id
- tup[0][1] is the engagement
- tup[1][0] is the count
- tup[1][1]is the feature vectors

My Reasoning:

I have to free the key from the engagement boolean, I do this because now I have exactly one, or 2 feature vectors corresponding to a single user. Ideally a single key holds 2 vectors for each (positive and negative feedback).

# TODO Sonntag
The when function in the lambda means, if the engagement is 0 than multiply the feature vector with -1, with that preperation we can later use the + operator to "substract" the positive with negative samples

In [23]:
user_vectors = user_vectors.map(lambda tup: (tup[0][0],(tup[0][1],tup[1][0],tup[1][1])))

In [None]:
#user_vectors = user_vectors.map(lambda tup: (tup[0][0],(tup[0][1],tup[1][0],tup[1][1])) if tup[0][1] == 1
                               #else (tup[0][0],(tup[0][1],tup[1][0],tup[1][1].multiply(-1))))

### Fat TODO:

Now I have to create as much negative feature vectors as there are positive vectors for a user to get a balanced user profile. I do this because there is much more negative feedback in the dataset (every obs with 0 as an engagement). 

### Here, the aggregated negatve features vectors are substracted from the aggregated positive vectors

The user_profiles rdd now holds a single user index with the final user profile.
This rdd is eventually the "model", from which the cosine distance to a new tweet can be calculated. The order of positive and negative feedback should be guaranteed from the steps above as the engagement were once in the key,

In [107]:
#user_vectors = user_vectors.mapValues(lambda v: (v[0],v[1],when(v[0] == 0,v[2].multiply(-1))))

In [24]:
user_profiles = user_vectors.reduceByKey(lambda accumulator,value:accumulator[2]+value[2])

In [25]:
u = user_profiles.take(1)
u


[('00000776B07587ECA9717BFC301F2D6E',
  (0, 1, <1x9846 sparse matrix of type '<class 'numpy.float64'>'
   	with 28 stored elements in Compressed Sparse Row format>))]

In [26]:
user_profiles = user_profiles.mapValues(lambda v: v[2])


### Evaluate User Profiles

In [15]:
u = user_profiles.take(1)
u

[(555.0, <1x9846 sparse matrix of type '<class 'numpy.float64'>'
  	with 28 stored elements in Compressed Sparse Row format>)]

In [None]:
testpath = 

raw = spark.read.csv(path=datapath, sep="\x01", header=False, schema=SCHEMA)
df = raw.select(["tweet_id","engaging_user_id",
                                    "retweet_timestamp","reply_timestamp",
                                    "retweet_with_comment_timestamp","like_timestamp","text_tokens"])

for engagement in ENGAGEMENTS:
            df = df.withColumn(engagement, when(df[engagement + "_timestamp"].isNotNull(), 1).cast(ByteType()))\
                .drop(engagement + "_timestamp")

df = df.fillna(0, subset=ENGAGEMENTS)

In [12]:
rows, cols = u[0][1][2].nonzero()
cols


array([  31,   74,   93,  161,  163,  193,  197,  219,  275,  289,  290,
        301,  372,  388,  402,  466,  654,  791,  829,  962, 1036, 1626,
       2226, 2319, 2341, 4938, 7309, 7836], dtype=int32)

In [13]:
u[0][1][2][:,93].toarray()

array([[0.22530648]])

In [11]:
aggregated = data.rdd.map(lambda row: ((row.engaging_user_id_idx,row.like),sps.csr_matrix(row.normed_features))).reduceByKey(lambda acc,v:acc+v)

In [100]:
from pyspark.sql.functions import udf
import scipy.sparse as sps
from scipy.spatial.distance import cosine

class CB_Recommender:
    def __init__(self, spark: SparkSession, sc: SparkContext,data:df):
        self.spark = spark
        self.sc = sc
        self.engagement = "like"
        d = data.select("tweet_id","normed_features","engaging_user_id_idx",self.engagement)
        
        self.data,self.test = d.randomSplit(([0.9,0.1]))
        self.engaged = self.data.filter(df[self.engagement]==1)
        self.not_engaged = self.data.filter(df[self.engagement]==0)
               
    def build_single_user_profile(self,user_id): 
        # Get every feature vector from each tweet with which the user had a positive interaction
        positive_vectors = self.engaged.where(self.engaged["engaging_user_id_idx"] == user_id)

        positive_count = positive_vectors.count()
        
        if(positive_count==0):
            return None

        # Same for the negative interactions
        negative_vectors = self.not_engaged.where(self.not_engaged["engaging_user_id_idx"] == user_id)

        # Usually there are much mure negative than positive sample, so we take random tweets to get a few negative samples
        # it can currently happen that a positive sample is randomly selected
        difference = positive_count - negative_vectors.count()
            
        if(difference > 0):
            sample = self.data.sample(False,0.1,seed=14).limit(difference)
            negative_vectors = negative_vectors.union(sample)
        
        pos_rdd = positive_vectors[["engaging_user_id_idx","normed_features"]].rdd  
        neg_rdd = negative_vectors[["engaging_user_id_idx","normed_features"]].rdd
        
        pos_count = pos_rdd.count()
        neg_count = neg_rdd.count()

        # Map: Transform every SparseVector into a scipy.sparse csr matrix, this allows us to aggretate the vectors using the + operator
        # Reduce: aggregate the sparse matrices

        pos_agg = pos_rdd.map(lambda row: (row.engaging_user_id_idx,sps.csr_matrix(row.normed_features))).reduceByKey(lambda acc,v:acc+v)
        neg_agg = neg_rdd.map(lambda row: (row.engaging_user_id_idx,sps.csr_matrix(row.normed_features))).reduceByKey(lambda acc,v:acc+v)

        positive_weight = 1
        negative_weight = 1

        # https://en.wikipedia.org/wiki/Rocchio_algorithm
        positive_feedback = positive_weight * (1 / pos_count) * pos_agg.collect()[0][1]
        negative_feedback = negative_weight * (1 / neg_count) * neg_agg.collect()[0][1] 

        user_profile = positive_feedback - negative_feedback

        # Result is a single sparse vector representing the users preferences
        return user_profile
    
    ## TODO, einfach alles im vorinhein berechnen und jedem user sein profile zuweisen
    def build_user_profiles(self):
        
        users = self.data.select("engaging_user_id_idx").distinct()

        # TODO make the function work in the rdd map
        user_profiles = users.rdd.map(lambda u:(u.engaging_user_id_idx,self.build_single_user_profile(u.engaging_user_id_idx)))

        return user_profiles
    
    def recommend(self,tweet_id,user_id):
        u = self.build_single_user_profile(user_id)
        
        if(u==None):
            return 1
      
        t = self.data.where(self.data.tweet_id == tweet_id)
        t = sps.csr_matrix(t.first().normed_features)
        
        return cosine(u.toarray(),t.toarray())
    
    # for every user in the training set make predictions on the test set
    def temp(self,user_id):
        tweets = self.test.select("normed_features","tweet_id").rdd.collect()
        preds = self.user_vs_all_tweets(user_id,tweets)
        users = self.data.select("engaging_user_id_idx").distinct().rdd.collect()
        
        for u in users:
            pass
            
        print(users.count())

In [101]:
cb = CB_Recommender(spark, sc,data)
user_profiles = cb.build_user_profiles().collect()
user_profiles

Traceback (most recent call last):
  File "/home/anaconda3/lib/python3.6/site-packages/pyspark/serializers.py", line 590, in dumps
    return cloudpickle.dumps(obj, 2)
  File "/home/anaconda3/lib/python3.6/site-packages/pyspark/cloudpickle.py", line 863, in dumps
    cp.dump(obj)
  File "/home/anaconda3/lib/python3.6/site-packages/pyspark/cloudpickle.py", line 260, in dump
    return Pickler.dump(self, obj)
  File "/home/anaconda3/lib/python3.6/pickle.py", line 409, in dump
    self.save(obj)
  File "/home/anaconda3/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/anaconda3/lib/python3.6/pickle.py", line 751, in save_tuple
    save(element)
  File "/home/anaconda3/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/anaconda3/lib/python3.6/site-packages/pyspark/cloudpickle.py", line 406, in save_function
    self.save_function_tuple(obj)
  File "/home/anacon

PicklingError: Could not serialize object: Py4JError: An error occurred while calling o130.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)



In [40]:
#1CF52497FFED4BB42FCAD52A244357B7 equal
#168157826315514C120494D4DF8E6216 irgendoana
cb = CB_Recommender(spark, sc,data)
sim = 1 - cb.recommend("1CF52497FFED4BB42FCAD52A244357B7",7000.0)
sim

0

In [35]:
test = preproc = twitter_preproc.twitter_preproc(spark, sc, "///tmp/test_1000/test.tsv", MF=True)
test

AnalysisException: 'Path does not exist: hdfs://nameservice1/tmp/test_1000/test.tsv;'

In [71]:
user_profiles

<1x31642 sparse matrix of type '<class 'numpy.float64'>'
	with 325 stored elements in Compressed Sparse Row format>

# Relevance Feedback with Rocchios method 

https://en.wikipedia.org/wiki/Rocchio_algorithm
7, S.36

1. Get every tweet with which a user has interacted
2. Split the user vectors into positive/negativ feedback.
3. Aggregate the respective vectors
4. Normalize them (1/N)
5. Multiply each with weights  __TODO__
6. Substract the negative feedback from the positive feedback
8. Multiply the resulting vector with the original Query Vector, __TODO__
7. ??? 
8. Profit

__TODO__ handle non existing feedback

In [53]:
import scipy.sparse as sps

# TODO: confidence weights for implicit feedback - > do we treat 0 as negative feedback?
def build_user_profile(df,user_id):  
    user_vectors = df.where(df["engaging_user_id_idx"] == user_id)
    
    positive_vectors = user_vectors.where(user_vectors["like"] == 1)
    negative_vectors = user_vectors.where(user_vectors["like"] == 0)
    
    difference = positive_vectors.count() - negative_vectors.count()
    
    # Take random tweets to fill negative samples, it can currently happen that a positive sample is randomly selected
    # double negative than positive samples
    sample = df.sample(False,0.1,seed=14).limit(difference * 2)
    
    negative_vectors = negative_vectors.union(sample)
    
    pos_rdd = positive_vectors[["engaging_user_id_idx","normed_features"]].rdd  
    neg_rdd = negative_vectors[["engaging_user_id_idx","normed_features"]].rdd
  
    pos_count = pos_rdd.count()
    neg_count = neg_rdd.count()
    
    # sum up the feature vectors with a mapreduce by transforming the sparse vectors into sparse matrices
    
    pos_agg = pos_rdd.map(lambda row: (row.engaging_user_id_idx,sps.csr_matrix(row.normed_features))).reduceByKey(lambda acc,v:acc+v)
    neg_agg = neg_rdd.map(lambda row: (row.engaging_user_id_idx,sps.csr_matrix(row.normed_features))).reduceByKey(lambda acc,v:acc+v)
    
    positive_weight = 1
    negative_weight = 1
    
    # https://en.wikipedia.org/wiki/Rocchio_algorithm
    positive_feedback = positive_weight * (1 / pos_count) * pos_agg.collect()[0][1]
    negative_feedback = negative_weight * (1 / neg_count) * neg_agg.collect()[0][1] 
    
    user_profile = positive_feedback - negative_feedback
      
    return user_profile

In [None]:
build_profile_udf = udf(lambda x: build_user_profile())

In [59]:
users = data.select("engaging_user_id_idx").distinct().rdd

user_profiles = users.map(lambda row: (row.engaging_user_id_idx,build_profiles(row.engaging_user_id_idx)))
print(user_profiles.collect())
                          

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 375.0 failed 4 times, most recent failure: Lost task 2.3 in stage 375.0 (TID 1239, c114.local, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in main
    process()
  File "/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 367, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 390, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-59-a86a17bf5e5c>", line 3, in <lambda>
NameError: name 'build_profiles' is not defined

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2121)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2121)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1890)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:929)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2111)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2049)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:740)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2081)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2102)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2121)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2146)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor78.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in main
    process()
  File "/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 367, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 390, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-59-a86a17bf5e5c>", line 3, in <lambda>
NameError: name 'build_profiles' is not defined

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2121)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2121)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [23]:
tweets = data.select("tweet_id","normed_features").rdd
tweet_features = tweets.map(lambda row: (row.tweet_id,sps.csc_matrix(row.normed_features)))

In [38]:
from scipy.spatial.distance import cosine

def recommend(df,tweet_id,user_id):
    u = build_user_profile(df,user_id)
    
    t = df.where(df["tweet_id"] == tweet_id)
    t = t.select("normed_features")
    t = sps.csc_matrix(t.first()[0])
    
    return cosine(u.toarray(),t.toarray())

In [44]:
recommend(data,"168157826315514C120494D4DF8E6216",600.0)

  (0, 0)	0.013845488990041259
  (0, 1)	0.1037001228911963
  (0, 2)	0.03465906704601485
  (0, 6)	0.03219979617852975
  (0, 7)	0.042707182116782164
  (0, 8)	0.03580897355378667
  (0, 9)	0.03609613951553191
  (0, 12)	0.05461235951258429
  (0, 18)	0.10389986122586105
  (0, 33)	0.11506590988176135
  (0, 47)	0.12169408024732281
  (0, 50)	0.14057545479780703
  (0, 72)	0.12927280178151165
  (0, 86)	0.13328953446406402
  (0, 94)	0.13572802356160849
  (0, 96)	0.1380532206938244
  (0, 367)	0.21522121140846792
  (0, 390)	0.22507047424705642
  (0, 744)	0.26057784325956845
  (0, 1192)	0.2755546241908368
  (0, 1789)	0.2916033205813707
  (0, 2065)	0.29705293353216194
  (0, 2115)	0.29705293353216194
  (0, 2837)	0.31540431372920796
  (0, 3174)	0.33164458503476435
  (0, 7624)	0.3702307841988475
  (0, 50)	0.07992438891110214
  (0, 101)	0.0895473623499102
  (0, 103)	0.0895473623499102
  (0, 113)	0.09339328673556299
  (0, 127)	0.19533118561151952
  (0, 139)	0.09912097231562234
  (0, 141)	0.1002636910332927


0.9772329220623733

In [80]:
users = data.select("engaging_user_id_idx").distinct().collect()


In [86]:
u = build_user_profile(data,1.0)

+--------------------+--------------------+----+-----+-------+--------------------+
|engaging_user_id_idx|            tweet_id|like|reply|retweet|retweet_with_comment|
+--------------------+--------------------+----+-----+-------+--------------------+
|                 1.0|012F5BF74EE06D38F...|   1|    0|      0|                   0|
|                 1.0|6494D28D09EE66C19...|   1|    0|      0|                   0|
|                 1.0|773040A6BB374BB0D...|   1|    0|      0|                   0|
|                 1.0|DB504512D01494FCE...|   1|    0|      0|                   0|
+--------------------+--------------------+----+-----+-------+--------------------+

+--------------------+--------+----+-----+-------+--------------------+
|engaging_user_id_idx|tweet_id|like|reply|retweet|retweet_with_comment|
+--------------------+--------+----+-----+-------+--------------------+
+--------------------+--------+----+-----+-------+--------------------+



In [35]:
t = tweet_features.map(lambda t: (t[0],cosine(t[1].toarray(),u.toarray())))

In [36]:
t.take(5)

[('E7D6C5094767223F6F8789A87A1937AB', 1.0),
 ('129F4A868712BA2B98D31AF98C3066E4', 0.9931761850625865),
 ('04C6C2175852CDBBC23B2446C7E7C22D', 0.9840771707836296),
 ('168157826315514C120494D4DF8E6216', 0.9871904330427985),
 ('B3E3673782A69D9D8A45D3B222F0B073', 0.9899149407959815)]

# TODO: Convert cosine distance to probability somehow?

In [9]:
sc.stop()

# Content Based approach (Memory-Based)

Our approach was to treat the BERT tokens like words and generate an id-idf feature vector. Every tweet is represented as a sparse feature vector, the user profile is generated with every tweet which the user has engaged with. 

We tried to use Rocchio's Method to generate relevance feedback for a user. This method was not feasable, due to the massive amount of features. For Rocchio's Method past rated items are split into two classes, positive feedback and negative feedback. To compute the user profile, one has to aggregate each feature vector from the positive and negative ones. This step was crucial for performance, because the aggreation of a sparse vector with ~32.000 features did not finish in a reasonable time.

## Vector aggregation in pyspark

It is neccessary in pyspark to transform a SparseVector into an intermediate format, in this case an array, to perform aggregate functions. So for the linear combination of our user feedback, we had to aggregate arrays instead of SparseVectors, which is also a factor for the poor performance. 

