## Preparations

In [1]:
! pip install rs-datasets

%load_ext autoreload
%autoreload 2

%config Completer.use_jedi = False

import warnings
from optuna.exceptions import ExperimentalWarning
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=ExperimentalWarning)

import logging
import time

from pyspark.sql import functions as sf, types as st
from pyspark.sql.types import IntegerType

from replay.data_preparator import DataPreparator, Indexer
from replay.experiment import Experiment
from replay.metrics import Coverage, HitRate, MRR, MAP, NDCG, Surprisal
from replay.models import (
    ALSWrap, 
    ADMMSLIM, 
    ItemKNN,
    LightFMWrap, 
    MultVAE, 
    NeuroMF, 
    SLIM, 
    PopRec, 
    RandomRec,
    UCB,
    KL_UCB,
    Wilson, 
    Word2VecRec,
)

from replay.models.base_rec import HybridRecommender
from replay.session_handler import State
from replay.splitters import DateSplitter
from replay.utils import get_log_info
from rs_datasets import MovieLens



In [2]:
spark = State().session
spark

spark.sparkContext.setLogLevel('ERROR')

logger = logging.getLogger("replay")

K = 10
K_list_metrics = [1, 5, 10]
BUDGET = 20
BUDGET_NN = 10
SEED = 12345

23/06/19 16:18:52 WARN Utils: Your hostname, UX430 resolves to a loopback address: 127.0.1.1; using 192.168.1.64 instead (on interface wlp2s0)
23/06/19 16:18:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/06/19 16:18:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
23/06/19 16:18:53 WARN DependencyUtils: Local jar /home/arqa/MyRePlay/my_experiments/jars/replay_2.12-0.1.jar does not exist, skipping.
23/06/19 16:18:53 INFO SparkContext: Running Spark version 3.1.3
23/06/19 16:18:53 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
23/06/19 16:18:53 INFO ResourceUtils: No custom resources configured for spark.driver.
23/06/19 16:18:53 INFO SparkContext: Submitted application: p

In [3]:
data = MovieLens("100k")
data.info()

ratings


Unnamed: 0,user_id,item_id,rating,timestamp
0,196,242,3,881250949
1,186,302,3,891717742
2,22,377,1,878887116



users


Unnamed: 0,user_id,gender,age,occupation,zip_code
0,1,24,M,technician,85711
1,2,53,F,other,94043
2,3,23,M,writer,32067



items


Unnamed: 0,item_id,title,release_date,imdb_url,unknown,Action,Adventure,Animation,Children's,Comedy,...,Fantasy,Film-Noir,Horror,Musical,Mystery,Romance,Sci-Fi,Thriller,War,Western
0,1,Toy Story (1995),01-Jan-1995,http://us.imdb.com/M/title-exact?Toy%20Story%2...,False,False,False,True,True,True,...,False,False,False,False,False,False,False,False,False,False
1,2,GoldenEye (1995),01-Jan-1995,http://us.imdb.com/M/title-exact?GoldenEye%20(...,False,True,True,False,False,False,...,False,False,False,False,False,False,False,True,False,False
2,3,Four Rooms (1995),01-Jan-1995,http://us.imdb.com/M/title-exact?Four%20Rooms%...,False,False,False,False,False,False,...,False,False,False,False,False,False,False,True,False,False





In [4]:
preparator = DataPreparator()

log = preparator.transform(columns_mapping={'user_id': 'user_id',
                                      'item_id': 'item_id',
                                      'relevance': 'rating',
                                      'timestamp': 'timestamp'
                                     }, 
                           data=data.ratings)

log.show(2)

19-Jun-23 16:18:59, replay, INFO: Columns with ids of users or items are present in mapping. The dataframe will be treated as an interactions log.
                                                                                

+-------+-------+---------+-------------------+
|user_id|item_id|relevance|          timestamp|
+-------+-------+---------+-------------------+
|    196|    242|      3.0|1997-12-04 18:55:49|
|    186|    302|      3.0|1998-04-04 23:22:22|
+-------+-------+---------+-------------------+
only showing top 2 rows



In [5]:
# will consider ratings >= 3 as positive feedback. A positive feedback is treated with relevance = 1
only_positives_log = log.filter(sf.col('relevance') >= 3).withColumn('relevance', sf.lit(1))
only_positives_log.count()

# we will use only algorithms which do not require user and item features and thus set feature dataframes to None
user_features=None
item_features=None

In [6]:
indexer = Indexer(user_col='user_id', item_col='item_id')

indexer.fit(users=log.select('user_id'),
           items=log.select('item_id'))

log_replay = indexer.transform(df=only_positives_log)
log_replay.show(2)

                                                                                

+--------+--------+---------+-------------------+
|user_idx|item_idx|relevance|          timestamp|
+--------+--------+---------+-------------------+
|     645|     287|        1|1997-12-04 18:55:49|
|     382|      37|        1|1998-04-04 23:22:22|
+--------+--------+---------+-------------------+
only showing top 2 rows



In [7]:
# train/test split 
train_spl = DateSplitter(
    test_start=0.2,
    drop_cold_items=True,
    drop_cold_users=True,

)
train, test = train_spl.split(log_replay)
print('train info:\n', get_log_info(train))
print('test info:\n', get_log_info(test))

print("train is cashed:", train.is_cached)

# train/test split for hyperparameters selection
opt_train, opt_val = train_spl.split(train)
opt_train.count(), opt_val.count()

print("opt train is cashed: ", opt_train.is_cached)

train info:
 total lines: 66016, total users: 752, total items: 1501
test info:
 total lines: 2279, total users: 104, total items: 824
train is cashed: True
opt train is cashed:  True


In [8]:
# negative feedback will be used for Wilson and UCB models
only_negatives_log = indexer.transform(df=log.filter(sf.col('relevance') < 3).withColumn('relevance', sf.lit(0.)))
test_start = test.agg(sf.min('timestamp')).collect()[0][0]

# train with both positive and negative feedback
pos_neg_train=(train
              .withColumn('relevance', sf.lit(1.))
              .union(only_negatives_log.filter(sf.col('timestamp') < test_start))
             )
pos_neg_train.cache()
pos_neg_train.count()

train.show(2)

+--------+--------+---------+-------------------+
|user_idx|item_idx|relevance|          timestamp|
+--------+--------+---------+-------------------+
|     645|     287|        1|1997-12-04 18:55:49|
|     283|     124|        1|1998-01-07 17:20:06|
+--------+--------+---------+-------------------+
only showing top 2 rows



## KL-UCB

In [9]:
log = pos_neg_train

In [10]:
items_counts_aggr = log.groupby("item_idx").agg(
    sf.sum("relevance").alias("pos"),
    sf.count("relevance").alias("total"),
)

full_count = log.count()
print(full_count)
items_counts_aggr.show(3)

80253
+--------+-----+-----+
|item_idx|  pos|total|
+--------+-----+-----+
|      18|276.0|  310|
|      14|301.0|  313|
|     403| 58.0|   64|
+--------+-----+-----+
only showing top 3 rows



### to/from pandas dataframe

In [33]:
from scipy.optimize import brentq
import numpy as np
coef = 0.0

def get_ucb(row) :
    p = row.pos / row.total
    eps = 1e-12
    rhs = np.log(full_count) + coef * np.log(np.log(full_count))
    
    if (p == 0) :
        def problem(q) : 
            return np.log(1/(1-q)) - rhs
        ucb = brentq(problem, 0, 1-eps)
        return ucb
    
    if (p == 1) :
        def problem(q) : 
            return np.log(1/q) - rhs
        ucb = brentq(problem, 0+eps, 1)
        return ucb
        
    def Bernoulli_KL(p,q) :
        return p * np.log(p/q) + (1-p) * np.log((1-p)/(1-q))
    
    def problem(q) :
        return row.total * Bernoulli_KL(p, q) - rhs
    
    ucb = brentq(problem, p, 1-eps)
    return ucb

In [35]:
%%timeit
pd_df = items_counts_aggr.toPandas()
pd_df['relevance'] = pd_df[['pos', 'total']].apply(get_ucb, axis=1)
sp_df = State().session.createDataFrame(pd_df[['item_idx', 'relevance']])
sp_df.show(3)

+--------+------------------+
|item_idx|         relevance|
+--------+------------------+
|      18|0.9557570882379833|
|      14| 0.993369593851037|
|     403| 0.994167574847216|
+--------+------------------+
only showing top 3 rows

+--------+------------------+
|item_idx|         relevance|
+--------+------------------+
|      18|0.9557570882379833|
|      14| 0.993369593851037|
|     403| 0.994167574847216|
+--------+------------------+
only showing top 3 rows

+--------+------------------+
|item_idx|         relevance|
+--------+------------------+
|      18|0.9557570882379833|
|      14| 0.993369593851037|
|     403| 0.994167574847216|
+--------+------------------+
only showing top 3 rows

+--------+------------------+
|item_idx|         relevance|
+--------+------------------+
|      18|0.9557570882379833|
|      14| 0.993369593851037|
|     403| 0.994167574847216|
+--------+------------------+
only showing top 3 rows

+--------+------------------+
|item_idx|         relevance|


### PySpark UDF

In [11]:
from scipy.optimize import brentq
import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
coef = 0.0

@udf(returnType=DoubleType())
def get_ucb(pos, total) :
    p = pos / total
    eps = 1e-12
    rhs = np.log(full_count) + coef * np.log(np.log(full_count))
    
    if (p == 0) :
        def problem(q) : 
            return np.log(1/(1-q)) - rhs
        ucb = brentq(problem, 0, 1-eps)
        return ucb
    
    if (p == 1) :
        def problem(q) : 
            return np.log(1/q) - rhs
        ucb = brentq(problem, 0+eps, 1)
        return ucb
        
    def Bernoulli_KL(p,q) :
        return p * np.log(p/q) + (1-p) * np.log((1-p)/(1-q))
    
    def problem(q) :
        return total * Bernoulli_KL(p, q) - rhs
    
    ucb = brentq(problem, p, 1-eps)
    return ucb

In [12]:
#%%timeit
items_counts = items_counts_aggr.withColumn(
            "relevance", get_ucb("pos", "total")
        )

item_popularity = items_counts.drop("pos", "total")
item_popularity.cache().count()
item_popularity.show()



+--------+------------------+
|item_idx|         relevance|
+--------+------------------+
|      18|0.9557570882379833|
|      14| 0.993369593851037|
|     403| 0.994167574847216|
|     365|0.7888825885230141|
|      38|0.9205038472286439|
|     348|0.9723769262045451|
|     225|0.9894810088361161|
|      46|0.9991579806365702|
|     343|0.9645615995280864|
|     406|0.9709802035192856|
|     475|0.9999999014167881|
|     257|0.9592150887384809|
|     687| 0.914985202296716|
|     280|0.9060944651219291|
|     300| 0.979202371585167|
|     619|0.7458957668891314|
|     161|0.9847566821177637|
|     263|0.8682098609240727|
|     443|0.9966422673284837|
|     748|0.9998869021701624|
+--------+------------------+
only showing top 20 rows



                                                                                

### PySpark's Pandas UDF (DOES NOT WORK, idk why)

In [23]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType
from typing import Iterator, Tuple
import numpy as np
import pandas as pd
from scipy.optimize import brentq

coef = 0.0
eps = 1e-12
rhs = np.log(full_count) + coef * np.log(np.log(full_count))

def Bernoulli_KL(p,q) :
    return p * np.log(p/q) + (1-p) * np.log((1-p)/(1-q))

@pandas_udf("double")
def get_ucb(iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    ucb = 0.0
    for pos, total in iterator:
        p = pos / total
        ucb = brentq(lambda q: np.log(1/(1-q)) - rhs, 0, 1-eps)
        #p = pos / total
        #if   (p == 0) :
        #    ucb = brentq(lambda q: np.log(1/(1-q)) - rhs, 0, 1-eps)
        #elif (p == 1) :
        #    ucb = brentq(lambda q: np.log(1/q) - rhs    , 0+eps, 1)
        #else :
        #    ucb = brentq(lambda q: total * Bernoulli_KL(p, q) - rhs, p, 1-eps)
        yield ucb

In [24]:

items_counts = items_counts_aggr.withColumn(
            "relevance", get_ucb("pos", "total")
        )

item_popularity = items_counts.drop("pos", "total")
item_popularity.show(3)

23/06/19 16:23:38 ERROR Executor: Exception in task 0.0 in stage 105.0 (TID 860)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/arqa/MyRePlay/.venv/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/home/arqa/MyRePlay/.venv/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/arqa/MyRePlay/.venv/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 273, in dump_stream
    return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
  File "/home/arqa/MyRePlay/.venv/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 81, in dump_stream
    for batch in iterator:
  File "/home/arqa/MyRePlay/.venv/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/home/arqa/MyRePlay/.venv/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/home/arqa/MyRePlay/.venv/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/arqa/MyRePlay/.venv/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 273, in dump_stream
    return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
  File "/home/arqa/MyRePlay/.venv/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 81, in dump_stream
    for batch in iterator:
  File "/home/arqa/MyRePlay/.venv/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 266, in init_stream_yield_batches
    for series in iterator:
  File "/home/arqa/MyRePlay/.venv/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 356, in func
    for result_batch, result_type in result_iter:
  File "/home/arqa/MyRePlay/.venv/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 114, in verify_result_type
    raise TypeError("Return type of the user-defined function should be "
TypeError: Return type of the user-defined function should be Pandas.Series, but is <class 'float'>


### PySpark's Pandas UDF + SciPy Multivariate Optimization

In [None]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType
import pandas as pd

@pandas_udf("double")
def get_ucb(pos: pd.Series, total: pd.Series) -> pd.Series:
    # NOT IMPLEMENTED