In [1]:
import pandas as pd
import numpy as np

from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
import pyspark.sql.functions as sf

# Для работы с матрицами
from scipy.sparse import csr_matrix, coo_matrix

# Матричная факторизация
from implicit.als import AlternatingLeastSquares
from implicit.nearest_neighbours import bm25_weight, tfidf_weight


# Функции из 1-ого вебинара
import os, sys

module_path = os.path.abspath(os.path.join(os.pardir))
if module_path not in sys.path:
    sys.path.append(module_path)
    
from best_rec_lib.metrics import precision_at_k, ap_k, recall_at_k
from best_rec_lib.utils import prefilter_items

import warnings
warnings.filterwarnings("ignore")

In [2]:
data = pd.read_csv('../retail_train.csv')
item_features = pd.read_csv('../product.csv')
user_features = pd.read_csv('../hh_demographic.csv')

# column processing
item_features.columns = [col.lower() for col in item_features.columns]
user_features.columns = [col.lower() for col in user_features.columns]

item_features.rename(columns={'product_id': 'item_id'}, inplace=True)
user_features.rename(columns={'household_key': 'user_id'}, inplace=True)

# train test split
test_size_weeks = 3

data_train = data[data['week_no'] < data['week_no'].max() - test_size_weeks]
data_test = data[data['week_no'] >= data['week_no'].max() - test_size_weeks]

data_train.head(2)

Unnamed: 0,user_id,basket_id,day,item_id,quantity,sales_value,store_id,retail_disc,trans_time,week_no,coupon_disc,coupon_match_disc
0,2375,26984851472,1,1004906,1,1.39,364,-0.6,1631,1,0.0,0.0
1,2375,26984851472,1,1033142,1,0.82,364,0.0,1631,1,0.0,0.0


In [3]:
n_items_before = data_train['item_id'].nunique()

data_train = prefilter_items(data_train, 5000, item_features)

n_items_after = data_train['item_id'].nunique()
print('Decreased # items from {} to {}'.format(n_items_before, n_items_after))

Decreased # items from 86865 to 5001


In [4]:
user_item_matrix = pd.pivot_table(data_train, 
                                  index='user_id', columns='item_id', 
                                  values='quantity', # Можно пробоват ьдругие варианты
                                  aggfunc='count', 
                                  fill_value=0
                                 )

user_item_matrix = user_item_matrix.astype(float) # необходимый тип матрицы для implicit

# переведем в формат saprse matrix
sparse_user_item = csr_matrix(user_item_matrix).tocsr()

user_item_matrix.head(2)

item_id,117847,279994,818981,819255,819308,819400,819487,819590,819594,819840,...,15926775,15926844,15926886,15972074,15972298,15972565,15972790,16100266,16729299,16729415
user_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
1,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [5]:
data_test = data_test[data_test['item_id'].isin(data_train['item_id'].unique())]

In [6]:
result = data_test.groupby('user_id')['item_id'].unique().reset_index()
result.columns=['user_id', 'actual']
result.head(2)

Unnamed: 0,user_id,actual
0,1,"[856942, 865456, 951954, 971585, 979707, 99065..."
1,3,[920626]


In [7]:
userids = user_item_matrix.index.values
itemids = user_item_matrix.columns.values

matrix_userids = np.arange(len(userids))
matrix_itemids = np.arange(len(itemids))

id_to_itemid = dict(zip(matrix_itemids, itemids))
id_to_userid = dict(zip(matrix_userids, userids))

itemid_to_id = dict(zip(itemids, matrix_itemids))
userid_to_id = dict(zip(userids, matrix_userids))

##### Spark

In [8]:
session = (SparkSession.builder.config('spark.executor.memory', "1500mb")
    .config("spark.sql.shuffle.partitions", "100")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.driver.host", "localhost")
    .config('spark.executor.instances', 4)
    .config('spark.executor.cores', 4)
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .master("local[*]")
    .enableHiveSupport()
    .getOrCreate()
    )

In [9]:
session

In [10]:
# session.stop()

In [11]:
df_to_spark = data_train[["user_id", "item_id", "quantity"]]

In [12]:
spark_data_train = session.createDataFrame(df_to_spark)

In [13]:
spark_data_train = spark_data_train.withColumnRenamed("quantity", "relevance")

In [14]:
def get_recommendations_spark(df, N=5):
    spam = list(df.item_id)
    if 999999 in spam:
        spam.remove(999999)
    
    return spam[:N]

In [15]:
def als_spark(rec_met, result, spark_data_train, factors, reg_st, iterations, alpha, N=5):
    i = 1
    for fact in factors:
        for reg in reg_st:
            for itrn in iterations:
                for alp in alpha:
                    
                    print(f'{i}/{len(factors)*len(reg_st)*len(iterations)*len(alpha)}', fact, reg, itrn, alp)

                    model = ALS(rank=fact, 
                                userCol="user_id",
                                itemCol="item_id", 
                                ratingCol="relevance", 
                                implicitPrefs=True, # тип ALS
                                coldStartStrategy="drop",
                                maxIter=itrn, 
                                alpha=alp, 
                                regParam=reg,
                                seed=42
                    ).fit(spark_data_train)

                    # предсказания для всех пользователей
                    recs_als = model.recommendForAllUsers(6)

                    # Разворачиваем рекомендации через функцию explode
                    recs_als = (recs_als
                                .withColumn("recommendations", sf.explode("recommendations"))
                                .withColumn("item_id", sf.col("recommendations.item_id"))
                                .withColumn("relevance", sf.col("recommendations.rating").cast(DoubleType()),)
                                .select("user_id", "item_id", "relevance")
                        )

                    recs_als = recs_als.toPandas()
                    users = list(set(recs_als.user_id))
                    new_test_users = set(result['user_id']) - set(users)
                    
                    if new_test_users:
                        result = result[~result['user_id'].isin(new_test_users)]


                    result[f'als_spark_{fact}_{reg}_{itrn}'] = result['user_id'].map(lambda x: get_recommendations_spark(recs_als.loc[recs_als['user_id'] == x], N))
                    rec_met[f'als_spark_{fact}_{reg}_{itrn}'] = result.apply(lambda row: recall_at_k(row[f'als_spark_{fact}_{reg}_{itrn}'], row['actual'], N), axis=1).mean()
                    
                    i += 1
                    
    return result, rec_met

In [16]:
# Словарь метрик
rec_met = dict()

# Списки гиперпараметров для моделей
factors = [200, 250, 300, 350, 500]
reg_st = [0.03, 0.05, 0.08]
alpha = [0.05, 0.1, 0.25, 0.5]
iterations = [5, 8, 10, 15, 20]

In [17]:
%%time
result, rec_met = als_spark(rec_met, result, spark_data_train, factors, reg_st, iterations, alpha, N=500)

1/300 200 0.03 5 0.05
2/300 200 0.03 5 0.1
3/300 200 0.03 5 0.25
4/300 200 0.03 5 0.5
5/300 200 0.03 8 0.05
6/300 200 0.03 8 0.1
7/300 200 0.03 8 0.25
8/300 200 0.03 8 0.5
9/300 200 0.03 10 0.05
10/300 200 0.03 10 0.1
11/300 200 0.03 10 0.25
12/300 200 0.03 10 0.5
13/300 200 0.03 15 0.05
14/300 200 0.03 15 0.1
15/300 200 0.03 15 0.25
16/300 200 0.03 15 0.5
17/300 200 0.03 20 0.05
18/300 200 0.03 20 0.1
19/300 200 0.03 20 0.25
20/300 200 0.03 20 0.5
21/300 200 0.05 5 0.05
22/300 200 0.05 5 0.1
23/300 200 0.05 5 0.25
24/300 200 0.05 5 0.5
25/300 200 0.05 8 0.05
26/300 200 0.05 8 0.1
27/300 200 0.05 8 0.25
28/300 200 0.05 8 0.5
29/300 200 0.05 10 0.05
30/300 200 0.05 10 0.1
31/300 200 0.05 10 0.25
32/300 200 0.05 10 0.5
33/300 200 0.05 15 0.05
34/300 200 0.05 15 0.1
35/300 200 0.05 15 0.25
36/300 200 0.05 15 0.5
37/300 200 0.05 20 0.05
38/300 200 0.05 20 0.1
39/300 200 0.05 20 0.25
40/300 200 0.05 20 0.5
41/300 200 0.08 5 0.05
42/300 200 0.08 5 0.1
43/300 200 0.08 5 0.25
44/300 200 0.08 5

Py4JJavaError: An error occurred while calling o24462.fit.
: org.apache.spark.SparkException: Job 6497 cancelled because SparkContext was shut down
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1(DAGScheduler.scala:1188)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1$adapted(DAGScheduler.scala:1186)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:1186)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2887)
	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2784)
	at org.apache.spark.SparkContext.$anonfun$stop$11(SparkContext.scala:2095)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1484)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:2095)
	at org.apache.spark.SparkContext.$anonfun$new$35(SparkContext.scala:660)
	at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2066)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
	at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	at java.util.concurrent.FutureTask.run(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2323)
	at org.apache.spark.rdd.RDD.$anonfun$aggregate$1(RDD.scala:1200)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1193)
	at org.apache.spark.ml.recommendation.ALS$.computeYtY(ALS.scala:1777)
	at org.apache.spark.ml.recommendation.ALS$.computeFactors(ALS.scala:1699)
	at org.apache.spark.ml.recommendation.ALS$.$anonfun$train$8(ALS.scala:1015)
	at org.apache.spark.ml.recommendation.ALS$.$anonfun$train$8$adapted(ALS.scala:1011)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:1011)
	at org.apache.spark.ml.recommendation.ALS.$anonfun$fit$1(ALS.scala:722)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.recommendation.ALS.fit(ALS.scala:704)
	at sun.reflect.GeneratedMethodAccessor210.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)


In [None]:
result

In [18]:
# data = [rec_met]
data = pd.DataFrame(rec_met, index =['Recall@k'])
data = data.T

In [23]:
data.sort_values("Recall@k", ascending=False)

Unnamed: 0,Recall@k
als_spark_350_0.03_5,0.103738
als_spark_350_0.03_10,0.103149
als_spark_350_0.03_8,0.103097
als_spark_350_0.05_8,0.103009
als_spark_350_0.05_10,0.102947
als_spark_350_0.05_15,0.102747
als_spark_300_0.03_20,0.102709
als_spark_350_0.03_15,0.102623
als_spark_300_0.03_5,0.102584
als_spark_350_0.03_20,0.102512


In [20]:
# Лучшие параметры по метрике Recall@5
data[data['Recall@k'] == data['Recall@k'].max()]

Unnamed: 0,Recall@k
als_spark_350_0.03_5,0.103738


In [63]:
session.stop()