In [1]:
import os

import pandas as pd
import numpy as np

import time

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, PCA
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col
import pyspark.sql.functions as sf
import pyspark.sql.types as st
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, NaiveBayes, FMClassifier
from pyspark.ml import Pipeline, PipelineModel
from sim4rec.modules import Simulator

from replay.metrics import NDCG, Precision, RocAuc, Metric
from sklearn.metrics import roc_auc_score, precision_score, recall_score, accuracy_score, f1_score
from replay.data_preparator import Indexer

from sim4rec.utils import pandas_to_spark, VectorElementExtractor
from sim4rec.modules import RealDataGenerator, SDVDataGenerator
from sim4rec.modules import evaluate_synthetic, EvaluateMetrics
from sim4rec.response import ParametricResponseFunction, BernoulliResponse, NoiseResponse

from replay.models import UCB, ItemKNN
from replay.models import RandomRec

%matplotlib inline
import matplotlib.pyplot as plt

import warnings
warnings.filterwarnings("ignore")

# Local Spark session for the full MovieLens data. Automatic broadcast joins
# are disabled (threshold -1), so joins here run as shuffle joins over the
# 192 configured partitions.
# NOTE(review): 256g driver memory is machine-specific — adjust for your host.
_SPARK_CONF = {
    'spark.sql.shuffle.partitions': '192',
    'spark.default.parallelism': '192',
    'spark.driver.extraJavaOptions': '-XX:+UseG1GC',
    'spark.executor.extraJavaOptions': '-XX:+UseG1GC',
    'spark.sql.autoBroadcastJoinThreshold': '-1',
    'spark.driver.memory': '256g',
}

_builder = SparkSession.builder.appName('simulator_movielens').master('local[*]')
# dict preserves insertion order, so the options are applied in the order listed
for _conf_key, _conf_value in _SPARK_CONF.items():
    _builder = _builder.config(_conf_key, _conf_value)
spark = _builder.getOrCreate()
# Quiet everything below ERROR from here on (startup warnings are already printed).
spark.sparkContext.setLogLevel('ERROR')

def calc_metric(response_df, user_col="user_id"):
    """Mean number of positive responses per user.

    Groups ``response_df`` by ``user_col``, sums its ``response`` column
    within each group, and returns the average of those per-user sums as a
    Python scalar (``None`` when ``response_df`` is empty).

    Args:
        response_df: Spark DataFrame with ``user_col`` and a numeric
            ``response`` column.
        user_col: user key column. Defaults to ``user_id`` because every frame
            in this notebook renames ``user_idx`` to ``user_id`` right after
            loading; grouping by ``user_idx`` would not resolve on them.
    """
    return (
        response_df
        .groupBy(user_col)
        .agg(sf.sum("response").alias("num_positive"))
        .select(sf.mean("num_positive"))
        .collect()[0][0]
    )

  from .autonotebook import tqdm as notebook_tqdm


24/04/09 09:43:18 WARN Utils: Your hostname, ecs-syudosaev-big resolves to a loopback address: 127.0.1.1; using 10.11.12.124 instead (on interface eth0)
24/04/09 09:43:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/04/09 09:43:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/04/09 09:43:19 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
# Prefixes that tell user-side and item-side feature columns apart once both
# feature tables are joined onto the same interaction row.
USER_PREFIX = 'user_'
ITEM_PREFIX = 'item_'

N_GENRES = 19
N_W2V = 300


def _double_fields(template, count):
    """Return DoubleType StructFields named ``template.format(i)`` for i in range(count)."""
    return [st.StructField(template.format(i), st.DoubleType()) for i in range(count)]


# Spark applies a user-supplied CSV schema by position (enforceSchema defaults
# to true), so the field order below must match the column order in the files.
USER_SCHEMA = st.StructType(
    [st.StructField('user_idx', st.IntegerType())]
    + _double_fields('genre{}', N_GENRES)
    + [st.StructField('rating_avg', st.DoubleType())]
    + _double_fields('w2v_{}', N_W2V)
)
ITEM_SCHEMA = st.StructType(
    [
        st.StructField('item_idx', st.IntegerType()),
        st.StructField('year', st.IntegerType()),
        st.StructField('rating_avg', st.DoubleType()),
    ]
    + _double_fields('genre{}', N_GENRES)
    + _double_fields('w2v_{}', N_W2V)
)
# Interaction log; `relevance` is read as a double and binarised after loading.
LOG_SCHEMA = st.StructType([
    st.StructField(field_name, field_type)
    for field_name, field_type in (
        ('user_idx', st.IntegerType()),
        ('item_idx', st.IntegerType()),
        ('relevance', st.DoubleType()),
        ('timestamp', st.IntegerType()),
    )
])

# Train split: load, unify key names, keep only interactions whose user and
# item both have a feature row, prefix the feature columns, binarise ratings.
users_df_train = spark.read.csv('train/users.csv', header=True, schema=USER_SCHEMA)
items_df_train = spark.read.csv('train/items.csv', header=True, schema=ITEM_SCHEMA)
log_df_train   = spark.read.csv('train/rating.csv', header=True, schema=LOG_SCHEMA)

users_df_train = users_df_train.withColumnRenamed("user_idx", "user_id")
items_df_train = items_df_train.withColumnRenamed("item_idx", "item_id")
log_df_train = (
    log_df_train
    .withColumnRenamed("user_idx", "user_id")
    .withColumnRenamed("item_idx", "item_id")
)

# leftsemi joins filter the log without adding any feature columns to it.
log_df_train = log_df_train.join(users_df_train, log_df_train['user_id'] == users_df_train['user_id'], 'leftsemi')
log_df_train = log_df_train.join(items_df_train, log_df_train['item_id'] == items_df_train['item_id'], 'leftsemi')

# Prefix every feature column (all but the key in position 0). A single toDF
# call replaces ~320 chained withColumnRenamed calls, each of which adds a
# projection to the logical plan and slows analysis on frames this wide.
_user_cols = users_df_train.columns
users_df_train = users_df_train.toDF(_user_cols[0], *[USER_PREFIX + c for c in _user_cols[1:]])
_item_cols = items_df_train.columns
items_df_train = items_df_train.toDF(_item_cols[0], *[ITEM_PREFIX + c for c in _item_cols[1:]])

# Ratings strictly above 3 are positives; everything else (including nulls) is 0.
log_df_train = log_df_train.withColumn(
    'relevance', sf.when(sf.col('relevance') > 3, 1).otherwise(0))

users_df_train = users_df_train.cache()
items_df_train = items_df_train.cache()
log_df_train = log_df_train.cache()

# The counts also materialise the caches above.
print(users_df_train.count())
print(items_df_train.count())
print(log_df_train.count())

                                                                                

80650


                                                                                

27278




10000132


                                                                                

In [3]:
# Validation split: same preparation as the train split.
users_df_val = spark.read.csv('val/users.csv', header=True, schema=USER_SCHEMA)
items_df_val = spark.read.csv('val/items.csv', header=True, schema=ITEM_SCHEMA)
log_df_val   = spark.read.csv('val/rating.csv', header=True, schema=LOG_SCHEMA)

users_df_val = users_df_val.withColumnRenamed("user_idx", "user_id")
items_df_val = items_df_val.withColumnRenamed("item_idx", "item_id")
log_df_val = (
    log_df_val
    .withColumnRenamed("user_idx", "user_id")
    .withColumnRenamed("item_idx", "item_id")
)

# leftsemi joins filter the log without adding any feature columns to it.
log_df_val = log_df_val.join(users_df_val, log_df_val['user_id'] == users_df_val['user_id'], 'leftsemi')
log_df_val = log_df_val.join(items_df_val, log_df_val['item_id'] == items_df_val['item_id'], 'leftsemi')

# Prefix every feature column (all but the key in position 0) in one toDF call
# instead of hundreds of chained withColumnRenamed projections.
_user_cols_val = users_df_val.columns
users_df_val = users_df_val.toDF(_user_cols_val[0], *[USER_PREFIX + c for c in _user_cols_val[1:]])
_item_cols_val = items_df_val.columns
items_df_val = items_df_val.toDF(_item_cols_val[0], *[ITEM_PREFIX + c for c in _item_cols_val[1:]])

# Ratings strictly above 3 are positives; everything else (including nulls) is 0.
log_df_val = log_df_val.withColumn(
    'relevance', sf.when(sf.col('relevance') > 3, 1).otherwise(0))

users_df_val = users_df_val.cache()
items_df_val = items_df_val.cache()
log_df_val = log_df_val.cache()

# The counts also materialise the caches above.
print(users_df_val.count())
print(items_df_val.count())
print(log_df_val.count())

                                                                                

106573
27278




5000065


                                                                                

In [4]:
# Test split: only the interaction log is kept. The user/item tables here are
# used solely to drop interactions without a feature row (features for test_df
# are joined later from the val-split tables).
# NOTE(review): the saved output under this cell shows three counts, but the
# cell prints only one — the output predates an edit; re-run to refresh it.
users_df_test = (
    spark.read.csv('test/users.csv', header=True, schema=USER_SCHEMA)
    .withColumnRenamed("user_idx", "user_id")
)
items_df_test = (
    spark.read.csv('test/items.csv', header=True, schema=ITEM_SCHEMA)
    .withColumnRenamed("item_idx", "item_id")
)
log_df_test = (
    spark.read.csv('test/rating.csv', header=True, schema=LOG_SCHEMA)
    .withColumnRenamed("user_idx", "user_id")
    .withColumnRenamed("item_idx", "item_id")
    .join(users_df_test, 'user_id', 'leftsemi')
    .join(items_df_test, 'item_id', 'leftsemi')
    # ratings strictly above 3 -> 1, otherwise 0
    .withColumn('relevance', sf.when(sf.col('relevance') > 3, 1).otherwise(0))
    .cache()
)

print(log_df_test.count())

                                                                                

138493


                                                                                

27125




5000066


                                                                                

In [5]:
# Compress all prefixed user feature columns (everything except user_id) into
# 9 principal components; the last line displays the variance they retain.
# NOTE(review): features are not standardised first, so columns with larger
# scale dominate the components — consider a StandardScaler if unintended.
va = VectorAssembler(outputCol='features', inputCols=users_df_train.columns[1:])
pca = PCA(k=9, inputCol='features', outputCol='pca_features')
user_features_assembled = va.transform(users_df_train)
model = pca.fit(user_features_assembled)
sum(model.explainedVariance)

                                                                                

0.9216818424353502

In [6]:
def _pca_feature_columns(users_df, assembler=va, pca_model=model):
    """Project ``users_df`` with the fitted PCA and expand the vector into columns.

    Returns ``user_id`` plus one numeric column per principal component, named
    by Spark after ``col('user_feature')[i]`` (e.g. ``user_feature[0]``); later
    cells feed these columns to a VectorAssembler.

    The component count comes from the fitted model rather than a literal 9,
    so changing ``k`` in the PCA cell cannot desynchronise this projection.
    ``assembler``/``pca_model`` are bound at definition time because ``va`` is
    reassigned further down the notebook.
    """
    n_components = pca_model.getK()
    projected = pca_model.transform(assembler.transform(users_df)).select(['user_id', 'pca_features'])
    return (
        projected
        .withColumn('user_feature', vector_to_array('pca_features'))
        .select(['user_id'] + [col('user_feature')[i] for i in range(n_components)])
    )


pca_users = _pca_feature_columns(users_df_train)
pca_users_val = _pca_feature_columns(users_df_val)

# Обучение генератора

Также в ноутбуке movielens_embeddings.ipynb был сделан вывод о том, что модель генерации GaussianCopula является наиболее подходящей в случае датасета MovieLens и эмбеддингов на основе PCA. Поэтому здесь будет использована именно эта модель.

In [7]:
GEN_SEED = 1234

# Items are replayed from the real catalogue; users are synthesised by an SDV
# GaussianCopula fitted on the PCA user components (user_id excluded).
items_generator = RealDataGenerator(label='items_real', seed=GEN_SEED)

users_generator = SDVDataGenerator(
    label='synth',
    id_column_name='user_id',
    model_name='gaussiancopula',
    parallelization_level=4,
    device_name='cpu',
    seed=GEN_SEED
)

users_generator.fit(pca_users.drop('user_id'))
items_generator.fit(items_df_train)

# The synthetic population is sized from a 12% sample of real users. The sample
# is seeded like the generators; unseeded, its size (and hence the population)
# changed on every run.
real_users = pca_users.sample(0.12, seed=GEN_SEED)
syn_users = users_generator.generate(real_users.count())
_ = items_generator.generate(items_df_train.select('item_id').distinct().count())

# Classifier training data: val-split interactions with train-split user PCA
# components and item features; rows with any missing value are dropped.
train_df = log_df_val.join(pca_users, 'user_id', 'left')\
                     .join(items_df_train, 'item_id', 'left')\
                     .drop('timestamp')
train_df = train_df.na.drop()
train_df.count()

                                                                                

574910

In [8]:
# Attach the per-item / per-user tables from item_svd.csv and user_svd.csv
# (SVD features, judging by the names). The inner joins drop interactions whose
# user or item has no row there.
# NOTE(review): the VectorAssembler in the classifier cell does not list these
# columns, so they currently only filter rows — confirm that is intended.
item_svd = spark.read.csv('item_svd.csv', header=True, inferSchema=True)
user_svd = spark.read.csv('user_svd.csv', header=True, inferSchema=True)

train_df = (
    train_df
    .join(item_svd, on='item_id', how='inner')
    .join(user_svd, on='user_id', how='inner')
)

# Evaluation frame: test-split interactions with the val-split user PCA
# components and item features, filtered the same way as train_df.
test_df = (
    log_df_test
    .join(pca_users_val, 'user_id', 'left')
    .join(items_df_val, 'item_id', 'left')
    .drop('timestamp')
    .na.drop()
    .join(item_svd, on='item_id', how='inner')
    .join(user_svd, on='user_id', how='inner')
)

test_df.count()

                                                                                

89829

In [9]:
def _print_interaction_stats(df):
    """Print rows, distinct users, distinct items and density of an interaction frame.

    Density is rows / (distinct users * distinct items). Each Spark count runs
    exactly once; re-issuing the same count/distinct jobs for the density line
    would recompute the whole join pipeline behind ``df`` again. Prints ``nan``
    for density if there are no users or items instead of raising.
    """
    n_rows = df.count()
    n_users = df.select('user_id').distinct().count()
    n_items = df.select('item_id').distinct().count()
    print(n_rows)
    print(n_users)
    print(n_items)
    n_cells = n_users * n_items
    print(n_rows / n_cells if n_cells else float('nan'))


_print_interaction_stats(train_df)
print()
_print_interaction_stats(test_df)

                                                                                ]

402391


                                                                                

4207


                                                                                

7816


                                                                                

0.012237457481182967



                                                                                

89829


                                                                                

1709


                                                                                

6748




0.007789317893380107


                                                                                

In [12]:
# Classifier input: user PCA components plus every item column except item_id.
va = VectorAssembler(
    inputCols=pca_users.columns[1:] + items_df_train.columns[1:],
    outputCol='features'
)

fm = FMClassifier(
    featuresCol='features',
    labelCol='relevance',
    probabilityCol='proba'
)

rf = RandomForestClassifier(
    featuresCol='features',
    labelCol='relevance',
    probabilityCol='proba'
)

lr = LogisticRegression(
    featuresCol='features',
    labelCol='relevance',
    probabilityCol='proba'
)

# Assemble and cache the training features once, so the join pipeline behind
# train_df is evaluated once and shared by all three fits.
train_features = va.transform(train_df).cache()
lr_model = lr.fit(train_features)
rf_model = rf.fit(train_features)
fm_model = fm.fit(train_features)
train_features.unpersist()

# Response chain: positive-class probability (index 1 of `proba`) -> `scores`
# -> ParametricResponseFunction with weight 0.25 -> seeded Bernoulli draw.
vee = VectorElementExtractor(inputCol='proba', outputCol='scores', index=1)
mc = ParametricResponseFunction(inputCols=['scores'], outputCol='__pr', weights=[0.25])
br = BernoulliResponse(inputCol='__pr', outputCol='response', seed=1234)
pipeline_lr = PipelineModel(stages=[va, lr_model, vee, mc, br])
pipeline_rf = PipelineModel(stages=[va, rf_model, vee, mc, br])
pipeline_fm = PipelineModel(stages=[va, fm_model, vee, mc, br])

# Random baseline: Bernoulli draw on Gaussian noise instead of a classifier.
# Seeded like the classifier pipelines so runs are comparable.
# TODO(review): NoiseResponse is still unseeded — pass a seed if sim4rec's
# NoiseResponse accepts one.
noise_resp = NoiseResponse(mu=0.5, sigma=0.2, outputCol='__noise')
br_rand = BernoulliResponse(inputCol='__noise', outputCol='response', seed=1234)
pipeline_rand = PipelineModel(stages=[noise_resp, br_rand])



In [14]:
def get_baseline(test_df, threshold=0.5):
    """Print classification metrics for a score based only on the item's average rating.

    ``item_rating_avg`` is min-max scaled to [0, 1] over ``test_df`` itself and
    used as the score for ROC AUC; thresholding it gives the binary prediction
    for precision / recall / accuracy / F1.

    Args:
        test_df: Spark DataFrame with a 0/1 ``relevance`` label and an
            ``item_rating_avg`` column; it is collected to pandas.
        threshold: cut-off on the scaled score (strictly greater -> 1).
    """
    test_df = test_df.toPandas()
    rating = test_df['item_rating_avg']
    rating_min = rating.min()
    rating_range = rating.max() - rating_min
    if rating_range > 0:
        test_df['baseline'] = (rating - rating_min) / rating_range
    else:
        # Constant ratings: min-max scaling would divide by zero and yield NaN,
        # which roc_auc_score rejects. A constant score carries no ranking signal.
        test_df['baseline'] = 0.0
    test_df['baseline_bin'] = np.where(test_df['baseline'] > threshold, 1, 0)
    print(f"ROC AUC (baseline): {roc_auc_score(test_df.relevance, test_df.baseline)}")
    print(f"Precision (baseline): {precision_score(test_df.relevance, test_df.baseline_bin)}")
    print(f"Recall (baseline): {recall_score(test_df.relevance, test_df.baseline_bin)}")
    print(f"Accuracy (baseline): {accuracy_score(test_df.relevance, test_df.baseline_bin)}")
    print(f"F1 (baseline): {f1_score(test_df.relevance, test_df.baseline_bin)}")
    print()

def assess_models(model, test_df, threshold=0.5):
    """Print classification metrics for a response pipeline on ``test_df``.

    ``model`` must add a ``scores`` column (the positive-class probability in
    the pipelines built above). ``scores`` is used directly for ROC AUC and is
    thresholded for precision / recall / accuracy / F1.

    Args:
        model: fitted Spark transformer (e.g. ``pipeline_lr``).
        test_df: Spark DataFrame with ``user_id``, ``item_id``, ``relevance``
            and the model's input features; predictions are collected to pandas.
        threshold: cut-off on ``scores`` (strictly greater -> 1).
    """
    pred_df = model.transform(test_df).select("user_id", "item_id", "relevance", "scores").toPandas()
    pred_df['response_bin'] = np.where(pred_df['scores'] > threshold, 1, 0)
    print(f"ROC AUC (classificator): {roc_auc_score(pred_df.relevance, pred_df.scores)}")
    print(f"Precision (classificator): {precision_score(pred_df.relevance, pred_df.response_bin)}")
    print(f"Recall (classificator): {recall_score(pred_df.relevance, pred_df.response_bin)}")
    print(f"Accuracy (classificator): {accuracy_score(pred_df.relevance, pred_df.response_bin)}")
    print(f"F1 (classificator): {f1_score(pred_df.relevance, pred_df.response_bin)}")
    print()

# Baseline first, then each classifier-based response pipeline (lr, rf, fm).
# NOTE(review): the saved output below shows only two classifier blocks for
# three pipelines — it looks stale or truncated; re-run to refresh it.
get_baseline(test_df)
for _response_pipeline in (pipeline_lr, pipeline_rf, pipeline_fm):
    assess_models(_response_pipeline, test_df)

                                                                                ]]

ROC AUC (baseline): 0.722652072451
Precision (baseline): 0.6595070712780113
Recall (baseline): 0.9842190355686771
Accuracy (baseline): 0.6681138607799263
F1 (baseline): 0.7897902344438569



                                                                                ]]

ROC AUC (classificator): 0.7605809344563693
Precision (classificator): 0.7461095100864553
Recall (classificator): 0.8644559257697174
Accuracy (classificator): 0.7277939195582718
F1 (classificator): 0.8009345946562026





ROC AUC (classificator): 0.7233617927263982
Precision (classificator): 0.680325600120779
Recall (classificator): 0.9502846900042177
Accuracy (classificator): 0.6856471740751873
F1 (classificator): 0.7929583247793061



                                                                                

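В качестве бейзлайна будем использовать рекомендательную систему, которая выдаёт пользователю случайные рекомендации. Примечание: в коде ниже случайный бейзлайн реализован через симулятор `sim_rand`, в котором рекомендации по-прежнему строит UCB, а отклики пользователей генерируются шумом (`NoiseResponse` + `BernoulliResponse`), а не обученным классификатором.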

In [15]:
# Map raw user/item ids to RePlay's internal indices. Users come from the
# synthetic population (users_generator.sample(1.0)), items from the train catalogue.
indexer = Indexer(user_col='user_id', item_col='item_id')
indexer.fit(users=users_generator.sample(1.0), items=items_df_train)

# One-row log (user 1, item 1, relevance 0.0) used to initialise each UCB
# before any simulated feedback exists.
dummy_log = pandas_to_spark(pd.DataFrame({'user_id' : [1], 'item_id' : [1], 'relevance' : [0.0]}))


def _ucb_fitted_on_dummy_log():
    """Return a new UCB fitted on the indexed dummy log."""
    bandit = UCB()
    bandit.fit(indexer.transform(dummy_log))
    return bandit


# A separate bandit per response model, so their feedback never mixes.
ucb_lr = _ucb_fitted_on_dummy_log()
ucb_rf = _ucb_fitted_on_dummy_log()
ucb_fm = _ucb_fitted_on_dummy_log()
ucb_rand = _ucb_fitted_on_dummy_log()

                                                                                

In [16]:
# RePlay ranking metrics at cutoff 5. Ground truth is the simulated `response`
# (positives = 1.0); `relevance` is presumably the recommender's score carried
# through from the recommendations — confirm against sim4rec's EvaluateMetrics.
_replay_metrics_at_5 = {NDCG(): 5, Precision(): 5, RocAuc(): 5}

evaluator = EvaluateMetrics(
    userKeyCol='user_id',
    itemKeyCol='item_id',
    predictionCol='relevance',
    labelCol='response',
    replay_label_filter=1.0,
    replay_metrics=_replay_metrics_at_5,
)

In [19]:
def calc_metric(response_df):
    """Average over users of each user's summed ``response`` (keyed on ``user_id``).

    Returns a Python scalar, or ``None`` when ``response_df`` is empty.

    NOTE(review): this redefines the ``calc_metric`` from the first cell; being
    later in the notebook, this is the version ``do_a_cycle`` calls.
    """
    per_user = response_df.groupBy("user_id").agg(sf.sum("response").alias("num_positive"))
    summary_row = per_user.select(sf.mean("num_positive")).collect()[0]
    return summary_row[0]

In [20]:
def do_a_cycle(simul, model, pipeline, iteration, metrics,
               k=5, user_frac=0.5, item_frac=0.2):
    """Run one simulation round: recommend, collect responses, log them, refit.

    Steps:
      1. Sample ``user_frac`` of users and ``item_frac`` of items from ``simul``
         (rows with nulls dropped) and fetch those users' logged history;
         the global ``dummy_log`` stands in when ``get_log`` returns ``None``.
      2. Ask ``model`` for the top-``k`` unseen items per sampled user (ids go
         through the global ``indexer`` and back).
      3. Score the recommendations with the response ``pipeline`` and append
         them to the simulator log under ``iteration``.
      4. Append ``[evaluator(resp), calc_metric(resp)]`` to ``metrics``.
      5. Refit ``model`` on the full simulator log, using the simulated
         ``response`` as the relevance label.

    Args:
        simul: sim4rec ``Simulator`` for this experiment.
        model: RePlay recommender (a UCB here); refit in place.
        pipeline: response PipelineModel that adds a ``response`` column.
        iteration: round index passed to ``simul.update_log``.
        metrics: list, mutated in place.
        k: number of recommendations per user (default 5).
        user_frac, item_frac: sampling fractions (defaults 0.5 / 0.2).

    Every DataFrame cached here is unpersisted in ``finally``, so a failure
    mid-round no longer leaves cached frames behind.
    """
    cached = []

    def _cache(df):
        # Cache df and remember it for the cleanup in `finally`.
        df = df.cache()
        cached.append(df)
        return df

    try:
        users = _cache(simul.sample_users(user_frac).dropna())
        items = _cache(simul.sample_items(item_frac).dropna())
        log = simul.get_log(user_df=users)
        log = _cache(dummy_log if log is None else log)

        recs = model.predict(
            log=indexer.transform(log),
            k=k,
            users=indexer.transform(users),
            items=indexer.transform(items),
            filter_seen_items=True
        )
        recs = _cache(indexer.inverse_transform(recs))

        resp = _cache(simul.sample_responses(
            recs_df=recs,
            user_features=users,
            item_features=items,
            action_models=pipeline
        ).select('user_id', 'item_id', 'relevance', 'response'))
        simul.update_log(resp, iteration=iteration)

        metrics.append([evaluator(resp), calc_metric(resp)])

        # Private RePlay API; presumably releases the model's cached frames
        # before refitting — confirm against the installed RePlay version.
        model._clear_cache()
        ucb_train_log = _cache(simul.log)
        model.fit(log=indexer.transform(
            ucb_train_log.select('user_id', 'item_id', 'response').withColumnRenamed('response', 'relevance')
        ))
    finally:
        for df in cached:
            df.unpersist()

def _make_simulator(run_name):
    """Build a Simulator over the shared user/item generators, checkpointing to checkpoints/<run_name>."""
    return Simulator(users_generator, items_generator, 'checkpoints/' + run_name, None, 'user_id', 'item_id', spark)


# One simulator (and one checkpoint directory) per response model.
sim_lr = _make_simulator('lr')
sim_rf = _make_simulator('rf')
sim_fm = _make_simulator('fm')
sim_rand = _make_simulator('rand')

# Per-round [evaluator result, mean positives per user], filled by do_a_cycle.
lr_metrics, rf_metrics, fm_metrics, rnd_metrics = [], [], [], []

N_ITERATIONS = 50

# Each round advances all four simulations by one step and reports its wall time.
for i in range(N_ITERATIONS):
    print(f'------------------------Stage {i}------------------------')
    # perf_counter is monotonic: the measured interval cannot jump or go
    # negative if the system clock is adjusted during a long run (time.time can).
    start_iter = time.perf_counter()

    do_a_cycle(sim_lr, ucb_lr, pipeline_lr, i, lr_metrics)
    do_a_cycle(sim_rf, ucb_rf, pipeline_rf, i, rf_metrics)
    do_a_cycle(sim_fm, ucb_fm, pipeline_fm, i, fm_metrics)
    do_a_cycle(sim_rand, ucb_rand, pipeline_rand, i, rnd_metrics)

    end_iter = time.perf_counter()
    print(f"Time of {i+1} iteration: ")
    print(end_iter - start_iter)

------------------------Stage 0------------------------


                                                                                