
## Домашнее задание №7

    Выбрать по 6 предсказаний для пользователей
    Удалить фейковый элемент
    Оставить 5 остортированных предсказаний для каждого пользователя
    Посчитать метрику (map@5, precision@5)

Определите сильно ли отличается качество ALS из implicit и pyspark, сравнивайте по метрикам map@5, precision@5


In [1]:
import pandas as pd
import numpy as np

from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
import pyspark.sql.functions as sf

# Для работы с матрицами
from scipy.sparse import csr_matrix, coo_matrix

# Матричная факторизация
from implicit.als import AlternatingLeastSquares
from implicit.nearest_neighbours import bm25_weight, tfidf_weight


# Функции из 1-ого вебинара
import os, sys

module_path = os.path.abspath(os.path.join(os.pardir))
if module_path not in sys.path:
    sys.path.append(module_path)
    
from best_rec_lib.metrics import precision_at_k, ap_k, recall_at_k
from best_rec_lib.utils import prefilter_items

import warnings
warnings.filterwarnings("ignore")

In [2]:
data = pd.read_csv('retail_train.csv')
item_features = pd.read_csv('product.csv')
user_features = pd.read_csv('hh_demographic.csv')

# column processing
item_features.columns = [col.lower() for col in item_features.columns]
user_features.columns = [col.lower() for col in user_features.columns]

item_features.rename(columns={'product_id': 'item_id'}, inplace=True)
user_features.rename(columns={'household_key': 'user_id'}, inplace=True)

# train test split
test_size_weeks = 3

data_train = data[data['week_no'] < data['week_no'].max() - test_size_weeks]
data_test = data[data['week_no'] >= data['week_no'].max() - test_size_weeks]

data_train.head(2)

Unnamed: 0,user_id,basket_id,day,item_id,quantity,sales_value,store_id,retail_disc,trans_time,week_no,coupon_disc,coupon_match_disc
0,2375,26984851472,1,1004906,1,1.39,364,-0.6,1631,1,0.0,0.0
1,2375,26984851472,1,1033142,1,0.82,364,0.0,1631,1,0.0,0.0


In [3]:
n_items_before = data_train['item_id'].nunique()

data_train = prefilter_items(data_train, 5000, item_features)

n_items_after = data_train['item_id'].nunique()

In [4]:
user_item_matrix = pd.pivot_table(data_train, 
                                  index='user_id', columns='item_id', 
                                  values='quantity', # Можно пробоват ьдругие варианты
                                  aggfunc='count', 
                                  fill_value=0
                                 )

user_item_matrix = user_item_matrix.astype(float) # необходимый тип матрицы для implicit

# переведем в формат saprse matrix
sparse_user_item = csr_matrix(user_item_matrix).tocsr()

user_item_matrix.head(2)

item_id,25671,26081,26093,26190,26355,26426,26540,26601,26636,26691,...,17328742,17329473,17329749,17330255,17330511,17381856,17382205,17383227,17827644,17829232
user_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
1,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [5]:
data_test = data_test[data_test['item_id'].isin(data_train['item_id'].unique())]

In [6]:
result = data_test.groupby('user_id')['item_id'].unique().reset_index()
result.columns=['user_id', 'actual']
result.head(2)

Unnamed: 0,user_id,actual
0,1,"[821867, 834484, 856942, 865456, 889248, 90795..."
1,3,"[835476, 851057, 872021, 878302, 879948, 90963..."


In [7]:
userids = user_item_matrix.index.values
itemids = user_item_matrix.columns.values

matrix_userids = np.arange(len(userids))
matrix_itemids = np.arange(len(itemids))

id_to_itemid = dict(zip(matrix_itemids, itemids))
id_to_userid = dict(zip(matrix_userids, userids))

itemid_to_id = dict(zip(itemids, matrix_itemids))
userid_to_id = dict(zip(userids, matrix_userids))

In [8]:
session = (SparkSession.builder.config('spark.executor.memory', "1500mb")
    .config("spark.sql.shuffle.partitions", "100")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.driver.host", "localhost")
    .config('spark.executor.instances', 2)
    .config('spark.executor.cores', 2)
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .master("local[*]")
    .enableHiveSupport()
    .getOrCreate()
    )

In [9]:
session

In [10]:
df_to_spark = data_train[["user_id", "item_id", "quantity"]]

In [11]:
spark_data_train = session.createDataFrame(df_to_spark)

In [12]:
spark_data_train = spark_data_train.withColumnRenamed("quantity", "relevance")

In [13]:
def get_recommendations_spark(df, N=5):
    spam = list(df.item_id)
    if 999999 in spam:
        spam.remove(999999)
    
    return spam[:N]

In [14]:
def als_spark(prec_met, map_met, result, spark_data_train, factors, reg_st, iterations, alpha, N=5):
    i = 1
    for fact in factors:
        for reg in reg_st:
            for itrn in iterations:
                for alp in alpha:
                    
                    print(f'{i}/{len(factors)*len(reg_st)*len(iterations)*len(alpha)}', fact, reg, itrn, alp)

                    model = ALS(rank=fact, 
                                userCol="user_id",
                                itemCol="item_id", 
                                ratingCol="relevance", 
                                implicitPrefs=True, # тип ALS
                                coldStartStrategy="drop",
                                maxIter=itrn, 
                                alpha=alp, 
                                regParam=reg,
                                seed=42
                    ).fit(spark_data_train)

                    # предсказания для всех пользователей
                    recs_als = model.recommendForAllUsers(6)

                    # Разворачиваем рекомендации через функцию explode
                    recs_als = (recs_als
                                .withColumn("recommendations", sf.explode("recommendations"))
                                .withColumn("item_id", sf.col("recommendations.item_id"))
                                .withColumn("relevance", sf.col("recommendations.rating").cast(DoubleType()),)
                                .select("user_id", "item_id", "relevance")
                        )

                    recs_als = recs_als.toPandas()
                    users = list(set(recs_als.user_id))
                    new_test_users = set(result['user_id']) - set(users)
                    
                    if new_test_users:
                        result = result[~result['user_id'].isin(new_test_users)]


                    result[f'als_spark_{fact}_{reg}_{alp}_{itrn}'] = result['user_id'].map(lambda x: get_recommendations_spark(recs_als.loc[recs_als['user_id'] == x], N))
                    prec_met[f'als_spark_{fact}_{reg}_{alp}_{itrn}'] = result.apply(lambda row: precision_at_k(row[f'als_spark_{fact}_{reg}_{alp}_{itrn}'], row['actual']), axis=1).mean()
                    map_met[f'als_spark_{fact}_{reg}_{alp}_{itrn}'] = result.apply(lambda row: ap_k(row[f'als_spark_{fact}_{reg}_{alp}_{itrn}'], row['actual']), axis=1).mean()
                    
                    i += 1
                    
    return result, prec_met, map_met

In [15]:
# Словари метрик
prec_met = dict()
map_met = dict()

# Списки гиперпараметров для моделей
factors = [50, 150, 200]
reg_st = [0.005, 0.01, 0.03]
alpha = [0.25, 0.5, 1.0, 1.5]
iterations = [1, 2, 5, 8, 10]

In [16]:
%%time
result, prec_met, map_met = als_spark(prec_met, map_met, result, spark_data_train, factors, reg_st, iterations, alpha, N=5)

1/180 50 0.005 1 0.25
2/180 50 0.005 1 0.5
3/180 50 0.005 1 1.0
4/180 50 0.005 1 1.5
5/180 50 0.005 2 0.25
6/180 50 0.005 2 0.5
7/180 50 0.005 2 1.0
8/180 50 0.005 2 1.5
9/180 50 0.005 5 0.25
10/180 50 0.005 5 0.5
11/180 50 0.005 5 1.0
12/180 50 0.005 5 1.5
13/180 50 0.005 8 0.25
14/180 50 0.005 8 0.5
15/180 50 0.005 8 1.0
16/180 50 0.005 8 1.5
17/180 50 0.005 10 0.25
18/180 50 0.005 10 0.5
19/180 50 0.005 10 1.0
20/180 50 0.005 10 1.5
21/180 50 0.01 1 0.25
22/180 50 0.01 1 0.5
23/180 50 0.01 1 1.0
24/180 50 0.01 1 1.5
25/180 50 0.01 2 0.25
26/180 50 0.01 2 0.5
27/180 50 0.01 2 1.0
28/180 50 0.01 2 1.5
29/180 50 0.01 5 0.25
30/180 50 0.01 5 0.5
31/180 50 0.01 5 1.0
32/180 50 0.01 5 1.5
33/180 50 0.01 8 0.25
34/180 50 0.01 8 0.5
35/180 50 0.01 8 1.0
36/180 50 0.01 8 1.5
37/180 50 0.01 10 0.25
38/180 50 0.01 10 0.5
39/180 50 0.01 10 1.0
40/180 50 0.01 10 1.5
41/180 50 0.03 1 0.25
42/180 50 0.03 1 0.5
43/180 50 0.03 1 1.0
44/180 50 0.03 1 1.5
45/180 50 0.03 2 0.25
46/180 50 0.03 2 0.5
47/

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "C:\Users\Alexis\Anaconda3\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "C:\Users\Alexis\Anaconda3\lib\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "C:\Users\Alexis\Anaconda3\lib\socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [17]:
data = [prec_met, map_met]
data = pd.DataFrame(data, index =['Precision@k', 'MAP@k'])
data = data.T

In [18]:
data.sort_values('Precision@k', ascending=False)

Unnamed: 0,Precision@k,MAP@k
als_spark_200_0.01_0.25_2,0.360020,0.293402
als_spark_200_0.005_0.25_2,0.358550,0.292600
als_spark_200_0.01_0.25_5,0.356492,0.287865
als_spark_200_0.005_0.25_5,0.355512,0.286577
als_spark_200_0.01_0.25_8,0.355316,0.286799
...,...,...
als_spark_50_0.01_1.0_1,0.142871,0.072796
als_spark_50_0.03_1.0_1,0.120039,0.054973
als_spark_50_0.005_1.5_1,0.110534,0.052252
als_spark_50_0.01_1.5_1,0.106712,0.049520


In [19]:
data.sort_values('MAP@k', ascending=False)

Unnamed: 0,Precision@k,MAP@k
als_spark_200_0.01_0.25_2,0.360020,0.293402
als_spark_200_0.005_0.25_2,0.358550,0.292600
als_spark_200_0.01_0.25_5,0.356492,0.287865
als_spark_200_0.01_0.25_8,0.355316,0.286799
als_spark_200_0.005_0.25_5,0.355512,0.286577
...,...,...
als_spark_50_0.01_1.0_1,0.142871,0.072796
als_spark_50_0.03_1.0_1,0.120039,0.054973
als_spark_50_0.005_1.5_1,0.110534,0.052252
als_spark_50_0.01_1.5_1,0.106712,0.049520


In [20]:
# Лучшие параметры по метрике Precision@5
data[data['Precision@k'] == data['Precision@k'].max()]

Unnamed: 0,Precision@k,MAP@k
als_spark_200_0.01_0.25_2,0.36002,0.293402


In [21]:
# Лучшие параметры по метрике MAP@5
data[data['MAP@k'] == data['MAP@k'].max()]

Unnamed: 0,Precision@k,MAP@k
als_spark_200_0.01_0.25_2,0.36002,0.293402


In [None]:
# c factors = 200 самые высокие метрики, но GridSearch считается вечность:( -- пришлось остановить

In [15]:
# Словари метрик
prec_met = dict()
map_met = dict()

# Списки гиперпараметров для моделей
factors = [250]#[250, 300]
reg_st = [0.01]#[0.01, 0.05]
alpha = [0.05]#[0.05, 0.1]
iterations = [2]

In [16]:
%%time
result, prec_met, map_met = als_spark(prec_met, map_met, result, spark_data_train, factors, reg_st, iterations, alpha, N=5)

1/1 250 0.01 2 0.05
Wall time: 4min 38s


In [17]:
data = [prec_met, map_met]
data = pd.DataFrame(data, index =['Precision@k', 'MAP@k'])
data = data.T

In [18]:
data.sort_values('Precision@k', ascending=False)

Unnamed: 0,Precision@k,MAP@k
als_spark_250_0.01_0.05_2,0.372562,0.309002


In [19]:
data.sort_values('MAP@k', ascending=False)

Unnamed: 0,Precision@k,MAP@k
als_spark_250_0.01_0.05_2,0.372562,0.309002


In [20]:
# Лучшие параметры по метрике Precision@5
data[data['Precision@k'] == data['Precision@k'].max()]

Unnamed: 0,Precision@k,MAP@k
als_spark_250_0.01_0.05_2,0.372562,0.309002


In [21]:
# Лучшие параметры по метрике MAP@5
data[data['MAP@k'] == data['MAP@k'].max()]

Unnamed: 0,Precision@k,MAP@k
als_spark_250_0.01_0.05_2,0.372562,0.309002


In [22]:
session.stop()

In [23]:
def get_recommendations(user, model, sparse_user_item, N=5):
    res = [id_to_itemid[rec] for rec in 
                    model.recommend(userid=userid_to_id[user], # можно вставить несколько пользователей
                                    user_items=sparse_user_item[userid_to_id[user]],   # на вход user-item matrix
                                    N=N, 
                                    filter_already_liked_items=False, 
                                    filter_items=[itemid_to_id[999999]], 
                                    recalculate_user=True)[0]] # recalculate_user=True это для перерасчета для новых users
    return res

In [24]:
%%time
bm25_user_item_matrix = bm25_weight(user_item_matrix.T).T.tocsr()

model = AlternatingLeastSquares(factors=250, 
                regularization=0.05,
                iterations=2,
                calculate_training_loss=True, 
                random_state=42)

model.fit(bm25_user_item_matrix, show_progress=False)

Wall time: 3.58 s


In [25]:
test_users = result.shape[0]
new_test_users = len(set(data_test['user_id']) - set(data_train['user_id']))

print('В тестовом дата сете {} юзеров'.format(test_users))
print('В тестовом дата сете {} новых юзеров'.format(new_test_users))

В тестовом дата сете 2041 юзеров
В тестовом дата сете 0 новых юзеров


In [26]:
# уберем пользователей, которых нет в трейне
new_test_users = set(data_test['user_id']) - set(data_train['user_id'])
result = result[~result['user_id'].isin(new_test_users)]

In [27]:
result['als_bm25_T'] = result['user_id'].map(lambda x: get_recommendations(x, model, sparse_user_item, 5))
print('Precision@k: ', result.apply(lambda row: precision_at_k(row[f'als_bm25_T'], row['actual']), axis=1).mean())
print('MAP@k: ', result.apply(lambda row: ap_k(row[f'als_bm25_T'], row['actual']), axis=1).mean())

KeyError: 999999

Вывод: модель ALS Spark показала значительно лучшие результаты