#  Вебинар 7. Рекомендательные системы в бизнесе
## Домашнее задание

Установить pySpark и попробовать параметры для ALS.

1. Выбрать по 6 предсказаний для пользователей
2. Удалить фейковый элемент
3. Оставить 5 отсортированных предсказаний для каждого пользователя
4. Посчитать метрики (map@5, precision@5)

Определите сильно ли отличается качество ALS из implicit и pyspark, сравнивайте по метрикам map@5, precision@5

In [1]:
# ! pip install pyspark
# ! pip install pyarrow

In [2]:
import pandas as pd
import numpy as np

from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
import pyspark.sql.functions as sf


# Для работы с матрицами
from scipy.sparse import csr_matrix, coo_matrix


# Матричная факторизация
from implicit.als import AlternatingLeastSquares
# from implicit.nearest_neighbours import bm25_weight, tfidf_weight

import os, sys

module_path = os.path.abspath(os.path.join(os.pardir))
if module_path not in sys.path:
    sys.path.append(module_path)
    
from best_lib.metrics import precision_at_k, ap_k
from best_lib.utils import prefilter_items


In [3]:
PATH_DATA = "../data"
# PATH_DATA = "/content/drive/MyDrive/Colab Notebooks/21_рекомендательные_системы/data"

In [4]:
data = pd.read_csv(os.path.join(PATH_DATA,'retail_train.csv'))
item_features = pd.read_csv(os.path.join(PATH_DATA,'product.csv'))
user_features = pd.read_csv(os.path.join(PATH_DATA,'hh_demographic.csv'))

In [5]:
# column processing
item_features.columns = [col.lower() for col in item_features.columns]
user_features.columns = [col.lower() for col in user_features.columns]

item_features.rename(columns={'product_id': 'item_id'}, inplace=True)
user_features.rename(columns={'household_key': 'user_id'}, inplace=True)

In [6]:
# Используем train-test split по времени, а не случайно. Возьмем последние 3 недели в качестве теста

test_size_weeks = 3

data_train = data[data['week_no'] < data['week_no'].max() - test_size_weeks]
data_test = data[data['week_no'] >= data['week_no'].max() - test_size_weeks]

data_train.head(2)

Unnamed: 0,user_id,basket_id,day,item_id,quantity,sales_value,store_id,retail_disc,trans_time,week_no,coupon_disc,coupon_match_disc
0,2375,26984851472,1,1004906,1,1.39,364,-0.6,1631,1,0.0,0.0
1,2375,26984851472,1,1033142,1,0.82,364,0.0,1631,1,0.0,0.0


In [7]:
n_items_before = data_train['item_id'].nunique()

# data_train = prefilter_items(data_train, 5000, item_features)

n_items_after = data_train['item_id'].nunique()
print('Decreased # items from {} to {}'.format(n_items_before, n_items_after))

Decreased # items from 86865 to 86865


In [8]:
popularity = data_train.groupby('item_id')['quantity'].sum().reset_index()
popularity.rename(columns={'quantity': 'n_sold'}, inplace=True)

top_5000 = popularity.sort_values('n_sold', ascending=False).head(5000).item_id.tolist()

In [9]:
# Заведем фиктивный item_id (если юзер покупал товары из топ-5000, то он "купил" такой товар)

data_train.loc[~data_train['item_id'].isin(top_5000), 'item_id'] = 999999

user_item_matrix = pd.pivot_table(data_train,
                                 index='user_id',
                                 columns='item_id',
                                 values='quantity', # Можно пробовать другие варианты
                                 aggfunc='count',
                                 fill_value=0
                                 )

user_item_matrix = user_item_matrix.astype(float) # необходимый тип матрицы для implicit

# переведем в формат saprse matrix (разреженной матрицы)

sparse_user_item = csr_matrix(user_item_matrix).tocsr()

user_item_matrix.head(3)

item_id,202291,397896,420647,480014,545926,707683,731106,818980,819063,819227,...,15778533,15831255,15926712,15926775,15926844,15926886,15927403,15927661,15927850,16809471
user_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
1,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0
2,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [10]:
data_test = data_test[data_test['item_id'].isin(data_train['item_id'].unique())]

In [11]:
result = data_test.groupby('user_id')['item_id'].unique().reset_index()
result.columns = ['user_id', 'actual']
result.head(2)

Unnamed: 0,user_id,actual
0,1,"[821867, 834484, 856942, 865456, 914190, 95804..."
1,3,"[851057, 872021, 878302, 879948, 909638, 91320..."


In [12]:
# заведём словари между соответствующими идентификаторами user item 
userids = user_item_matrix.index.values
itemids = user_item_matrix.columns.values

matrix_userids = np.arange(len(userids))
matrix_itemids = np.arange(len(itemids))

id_to_itemid = dict(zip(matrix_itemids, itemids))
id_to_userid = dict(zip(matrix_userids, userids))

itemid_to_id = dict(zip(itemids, matrix_itemids))
userid_to_id = dict(zip(userids, matrix_userids))

## SparkSession

In [13]:
session = (
    SparkSession.builder.config("spark.driver.memory", "8g")
    .config("spark.sql.shuffle.partitions", "50")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.driver.host", "localhost")
    .master("local[*]")
    .enableHiveSupport()
    .getOrCreate()
)

22/12/25 02:32:34 WARN Utils: Your hostname, gans-System resolves to a loopback address: 127.0.1.1; using 192.168.1.71 instead (on interface wlp6s1)
22/12/25 02:32:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/25 02:32:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [14]:
session

In [15]:
spark_data_train=session.createDataFrame(data_train[["user_id", "item_id", "quantity"]])

In [16]:
spark_data_train=spark_data_train.withColumnRenamed("quantity", "relevance")

In [17]:
spark_data_train.show(10)

22/12/25 02:33:19 WARN TaskSetManager: Stage 0 contains a task of very large size (3321 KiB). The maximum recommended task size is 1000 KiB.


[Stage 0:>                                                          (0 + 1) / 1]

+-------+-------+---------+
|user_id|item_id|relevance|
+-------+-------+---------+
|   2375|1004906|        1|
|   2375|1033142|        1|
|   2375|1036325|        1|
|   2375|1082185|        1|
|   2375|8160430|        1|
|   2375| 826249|        2|
|   2375| 999999|        1|
|   2375|1085983|        1|
|   2375| 999999|        1|
|   2375| 999999|        1|
+-------+-------+---------+
only showing top 10 rows



                                                                                

In [18]:
model = ALS(
    rank=30,
    userCol="user_id",
    itemCol="item_id",
    ratingCol="relevance", 
    maxIter = 10,
    alpha = 1.0,
    regParam = 0.1,
    implicitPrefs=True,
    seed=42,
    coldStartStrategy="drop",
).fit(spark_data_train)

22/12/25 02:33:20 WARN TaskSetManager: Stage 1 contains a task of very large size (3321 KiB). The maximum recommended task size is 1000 KiB.
22/12/25 02:33:21 WARN TaskSetManager: Stage 2 contains a task of very large size (3321 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

In [19]:
model.userFactors.show()

+---+--------------------+
| id|            features|
+---+--------------------+
| 10|[-0.042885095, -0...|
| 20|[0.02070611, -0.5...|
| 30|[-0.07214545, -0....|
| 40|[-0.09899722, -0....|
| 50|[-0.020767767, -0...|
| 60|[0.024843518, -0....|
| 70|[-0.083744325, -0...|
| 80|[-0.12929422, -0....|
| 90|[-0.098686494, -0...|
|100|[0.05294045, -0.3...|
|110|[-0.08737936, -0....|
|120|[-0.029122217, -0...|
|130|[-0.10858478, -0....|
|140|[-0.061264124, -0...|
|150|[-0.04698381, -0....|
|160|[-0.17012869, -0....|
|170|[0.0129210865, -0...|
|180|[-0.077923544, -0...|
|190|[-0.033504475, -0...|
|200|[-0.0054225125, -...|
+---+--------------------+
only showing top 20 rows



In [20]:
# users = pd.DataFrame([1, 2])
# users

In [21]:
# session.createDataFrame(users).show()

In [22]:
# recs_als = model.recommendForUserSubset(session.createDataFrame(users).withColumnRenamed("0", "user_id"),6)

In [23]:
recs_als = model.recommendForAllUsers(6)

In [24]:
recs_als.show()



+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|      3|[{999999, 1.55052...|
|      4|[{999999, 1.56255...|
|      5|[{999999, 1.68301...|
|      7|[{999999, 1.74501...|
|      8|[{999999, 1.79657...|
|     11|[{999999, 0.89170...|
|     20|[{1404121, 2.0185...|
|     23|[{999999, 1.77064...|
|     28|[{999999, 1.73617...|
|     31|[{999999, 1.70128...|
|     32|[{999999, 1.73797...|
|     33|[{999999, 1.49812...|
|     34|[{999999, 1.35699...|
|     39|[{999999, 1.77272...|
|     40|[{1082185, 1.8518...|
|     48|[{999999, 1.62534...|
|     49|[{6544236, 1.8535...|
|     51|[{999999, 1.67037...|
|     53|[{999999, 1.71819...|
|     54|[{999999, 0.93344...|
+-------+--------------------+
only showing top 20 rows



                                                                                

In [25]:
recs_als=(recs_als.withColumn(
                    "recommendations", sf.explode("recommendations"))
              .withColumn("item_id", sf.col("recommendations.item_id"))
              .withColumn("relevance", sf.col("recommendations.rating").cast(DoubleType()),)
              .select("user_id", "item_id", "relevance")
         ) 

In [26]:
recs_als.show()



+-------+-------+------------------+
|user_id|item_id|         relevance|
+-------+-------+------------------+
|      3| 999999|1.5505210161209106|
|      3|1029743| 1.469294786453247|
|      3|1106523|1.4576222896575928|
|      3|1082185| 1.422029972076416|
|      3| 951590|1.4106340408325195|
|      3| 883404|1.3670177459716797|
|      4| 999999|1.5625593662261963|
|      4|1082185|1.3673204183578491|
|      4|1029743| 1.233508825302124|
|      4| 995242|1.1131083965301514|
|      4|1106523|1.1031150817871094|
|      4| 981760|1.0232075452804565|
|      5| 999999|1.6830198764801025|
|      5|1082185|1.4448405504226685|
|      5|1029743|1.2216860055923462|
|      5|6534178|1.2168898582458496|
|      5| 995242|1.1506799459457397|
|      5|1106523|1.0580171346664429|
|      7| 999999|1.7450158596038818|
|      7|1082185|1.6288162469863892|
+-------+-------+------------------+
only showing top 20 rows



                                                                                

In [27]:
df = recs_als.toPandas()
df = df[df.item_id != 999999]
df.drop(['relevance'], axis=1)
df = df.groupby('user_id')['item_id'].unique().reset_index()
df.columns=['user_id', 'item_id']
df.head(10)

                                                                                

Unnamed: 0,user_id,item_id
0,1,"[1082185, 1029743, 981760, 995242, 862349]"
1,2,"[1082185, 1029743, 1106523, 995242, 981760]"
2,3,"[1029743, 1106523, 1082185, 951590, 883404]"
3,4,"[1082185, 1029743, 995242, 1106523, 981760]"
4,5,"[1082185, 1029743, 6534178, 995242, 1106523]"
5,6,"[1082185, 1026118, 1051516, 854852, 878996]"
6,7,"[1082185, 1029743, 995242, 1106523, 981760]"
7,8,"[1082185, 1029743, 1106523, 981760, 883404]"
8,9,"[1082185, 1029743, 995242, 1106523, 6534178]"
9,10,"[1082185, 1029743, 6534178, 995242, 1106523]"


In [28]:

df = df.rename(columns={'item_id': 'pyspark_alt'})
df.head(10)

Unnamed: 0,user_id,pyspark_alt
0,1,"[1082185, 1029743, 981760, 995242, 862349]"
1,2,"[1082185, 1029743, 1106523, 995242, 981760]"
2,3,"[1029743, 1106523, 1082185, 951590, 883404]"
3,4,"[1082185, 1029743, 995242, 1106523, 981760]"
4,5,"[1082185, 1029743, 6534178, 995242, 1106523]"
5,6,"[1082185, 1026118, 1051516, 854852, 878996]"
6,7,"[1082185, 1029743, 995242, 1106523, 981760]"
7,8,"[1082185, 1029743, 1106523, 981760, 883404]"
8,9,"[1082185, 1029743, 995242, 1106523, 6534178]"
9,10,"[1082185, 1029743, 6534178, 995242, 1106523]"


In [29]:
result = result.merge(df, left_on='user_id', right_on='user_id')
# result.head(10)

## ALS implicit

In [30]:
%%time

model_als = AlternatingLeastSquares(factors=100, 
                                regularization=0.001,
                                # alpha=0.5,
                                iterations=15, 
                                calculate_training_loss=True, 
                                num_threads=4,                                
                                random_state=42)

model_als.fit(sparse_user_item,  # На вход item-user matrix
          show_progress=True)

  0%|          | 0/15 [00:00<?, ?it/s]

CPU times: user 1.82 s, sys: 108 ms, total: 1.93 s
Wall time: 1.92 s


In [31]:
recs = model_als.recommend(userid=userid_to_id[2],  # userid - id от 0 до N
                        user_items=sparse_user_item,   # на вход user-item matrix
                        N=5, # кол-во рекомендаций 
                        filter_already_liked_items=False, 
                        filter_items=[itemid_to_id[999999]], 
                        recalculate_user=False)

In [32]:
[id_to_itemid[rec] for rec in recs[0]]

[1133018, 1106523, 5569230, 1082185, 1053690]

In [33]:
def get_recommendations(user, model_als, sparse_user_item, N=5):
    res = [id_to_itemid[rec] for rec in 
                    model_als.recommend(userid=userid_to_id[user], 
                                    user_items=sparse_user_item[userid_to_id[user]],   # на вход user-item matrix
                                    N=N, 
                                    filter_already_liked_items=False, 
                                    filter_items=[itemid_to_id[999999]], 
                                    recalculate_user=False)[0]]
    return res

In [34]:
%%time
    
result['implicit_als'] = result['user_id'].map(lambda x: get_recommendations(x, model_als, sparse_user_item, N=5))
result.head(10)

CPU times: user 527 ms, sys: 0 ns, total: 527 ms
Wall time: 526 ms


Unnamed: 0,user_id,actual,pyspark_alt,implicit_als
0,1,"[821867, 834484, 856942, 865456, 914190, 95804...","[1082185, 1029743, 981760, 995242, 862349]","[1033142, 995242, 1029743, 5569374, 979707]"
1,3,"[851057, 872021, 878302, 879948, 909638, 91320...","[1029743, 1106523, 1082185, 951590, 883404]","[5568378, 965766, 5569327, 1106523, 951590]"
2,6,"[920308, 926804, 1017061, 1078346, 1120741, 82...","[1082185, 1026118, 1051516, 854852, 878996]","[1051516, 1007195, 866211, 923746, 1024306]"
3,7,"[840386, 889774, 898068, 909714, 953476, 97699...","[1082185, 1029743, 995242, 1106523, 981760]","[1133018, 938700, 1082185, 1058997, 826249]"
4,8,"[835098, 872137, 910439, 924610, 1041259, 5569...","[1082185, 1029743, 1106523, 981760, 883404]","[1053690, 938700, 1004906, 930917, 913785]"
5,9,"[864335, 1029743, 9297474, 889692, 919427, 995...","[1082185, 1029743, 995242, 1106523, 6534178]","[1082185, 1029743, 995242, 981760, 849843]"
6,13,"[6534178, 840361, 862070, 884897, 920308, 9504...","[1082185, 1029743, 1404121, 981760, 1106523]","[859075, 918335, 999714, 1106523, 5568378]"
7,14,"[840601, 933067, 951590, 952408, 965693, 98176...","[1082185, 1029743, 1106523, 981760, 995242]","[844165, 951590, 826249, 965766, 6534178]"
8,15,"[910439, 1082185, 959076, 1023958, 1082310, 82...","[1082185, 1029743, 995242, 1106523, 981760]","[1082185, 1127831, 878996, 981760, 1029743]"
9,16,"[1062973, 1082185]","[1082185, 6534178, 1029743, 995242, 1106523]","[1082185, 6534178, 981760, 995242, 1029743]"


In [35]:
print(f"Значение метрики precision@5 для ALS implicit: {result.apply(lambda row: precision_at_k(row['implicit_als'], row['actual']), axis=1).mean()}")
print(f"Значение метрики map@5 для ALS implicit: {result.apply(lambda row: ap_k(row['implicit_als'], row['actual']), axis=1).mean() * 100}")

Значение метрики precision@5 для ALS implicit: 0.18419452887537752
Значение метрики map@5 для ALS implicit: 31.32176629517061


In [36]:
print(f"Значение метрики precision@5 для ALS pyspark: {result.apply(lambda row: precision_at_k(row['pyspark_alt'], row['actual']), axis=1).mean()}")
print(f"Значение метрики map@5 для ALS pyspark: {result.apply(lambda row: ap_k(row['pyspark_alt'], row['actual']), axis=1).mean() * 100}")

Значение метрики precision@5 для ALS pyspark: 0.22684903748733218
Значение метрики map@5 для ALS pyspark: 43.832953394123855


__Вывод:__  
Сравнив метрики map@5, precision@5 можно сделать вывод, что ALS в pyspark показал себя лучше precision@5 больше на 0.04, а map@5 на 12,5 по сравнению с ALS в implicit.