<a href="https://colab.research.google.com/github/SvetlanaTsim/recommendation_systems/blob/main/lesson_07/hw_07_recsys.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m5.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m23.2 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=40f3d83566c72c9798a3621ae9147b8888c07ae93d67969e8e4b5b20e5e5966a
  Stored in directory: /root/.cache/pip/wheels/b1/59/a0/a1a0624b5e865fd389919c1a10f53aec9b12195d6747710baf
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [2]:
!pip install implicit==0.6.0

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting implicit==0.6.0
  Downloading implicit-0.6.0-cp38-cp38-manylinux2014_x86_64.whl (18.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m18.6/18.6 MB[0m [31m59.8 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: implicit
Successfully installed implicit-0.6.0


In [4]:
import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
%matplotlib inline

from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
import pyspark.sql.functions as sf

# Для работы с матрицами
from scipy.sparse import csr_matrix, coo_matrix

from implicit.als import AlternatingLeastSquares
from implicit.nearest_neighbours import bm25_weight, tfidf_weight

In [5]:
#metrics

"""
Metrics for RecSys
"""
import numpy as np


def hit_rate(recommended_list, bought_list):
    bought_list = np.array(bought_list)
    recommended_list = np.array(recommended_list)
    flags = np.isin(bought_list, recommended_list)
    return (flags.sum() > 0) * 1


def hit_rate_at_k(recommended_list, bought_list, k=5):
    return hit_rate(recommended_list[:k], bought_list)


def precision(recommended_list, bought_list):
    bought_list = np.array(bought_list)
    recommended_list = np.array(recommended_list)
    flags = np.isin(bought_list, recommended_list)
    return flags.sum() / len(recommended_list)


def precision_at_k(recommended_list, bought_list, k=5):
    return precision(recommended_list[:k], bought_list)


def money_precision_at_k(recommended_list, bought_list, prices_recommended, k=5):
    recommended_list = np.array(recommended_list)[:k]
    prices_recommended = np.array(prices_recommended)[:k]
    flags = np.isin(recommended_list, bought_list)
    return np.dot(flags, prices_recommended).sum() / prices_recommended.sum()


def recall(recommended_list, bought_list):
    bought_list = np.array(bought_list)
    recommended_list = np.array(recommended_list)
    flags = np.isin(bought_list, recommended_list)
    return flags.sum() / len(bought_list)


def recall_at_k(recommended_list, bought_list, k=5):
    return recall(recommended_list[:k], bought_list)


def money_recall_at_k(recommended_list, bought_list, prices_recommended, prices_bought, k=5):
    bought_list = np.array(bought_list)
    recommended_list = np.array(recommended_list)[:k]
    prices_recommended = np.array(prices_recommended)[:k]
    prices_bought = np.array(prices_bought)
    flags = np.isin(recommended_list, bought_list)
    return np.dot(flags, prices_recommended).sum() / prices_bought.sum()


def ap_k(recommended_list, bought_list, k=5):
    bought_list = np.array(bought_list)
    recommended_list = np.array(recommended_list)
    recommended_list = recommended_list[recommended_list <= k]

    relevant_indexes = np.nonzero(np.isin(recommended_list, bought_list))[0]
    if len(relevant_indexes) == 0:
        return 0
    amount_relevant = len(relevant_indexes)


    sum_ = sum(
        [precision_at_k(recommended_list, bought_list, k=index_relevant + 1) for index_relevant in relevant_indexes])
    return sum_ / amount_relevant
     


In [6]:
# #utils

"""
Filters for RecSys
"""

import pandas as pd
import numpy as np


def prefilter_items(data, take_n_popular=5000, item_features=None):
    """Prefilter items and take top popular"""

    # Delete rare categories (department)
    if item_features is not None:
        department_size = pd.DataFrame(item_features. \
                                       groupby('department')['item_id'].nunique(). \
                                       sort_values(ascending=False)).reset_index()

        department_size.columns = ['department', 'n_items']
        rare_departments = department_size[department_size['n_items'] < 150].department.tolist()
        items_in_rare_departments = item_features[
            item_features['department'].isin(rare_departments)].item_id.unique().tolist()

        data = data[~data['item_id'].isin(items_in_rare_departments)]

    # Delete cheap items (non profit). Price one purchase from mailing is 1 dollar
    data['price'] = data['sales_value'] / (np.maximum(data['quantity'], 1))
    data = data[data['price'] > 2]

    # Delete expensive items
    data = data[data['price'] < 50]

    # Get top popular items
    popularity = data.groupby('item_id')['quantity'].sum().reset_index()
    popularity.rename(columns={'quantity': 'n_sold'}, inplace=True)
    top = popularity.sort_values('n_sold', ascending=False).head(take_n_popular).item_id.tolist()

    # Set a fake id for non popular items
    data.loc[~data['item_id'].isin(top), 'item_id'] = 999999

    return data


def postfilter_items(user_id, recommednations):
    """Postfilter items after fit models"""

    # What time to show?
    # How often to show?
    pass

In [9]:
data = pd.read_csv('retail_train.csv')
item_features = pd.read_csv('product.csv')
user_features = pd.read_csv('hh_demographic.csv')

# column processing
item_features.columns = [col.lower() for col in item_features.columns]
user_features.columns = [col.lower() for col in user_features.columns]

item_features.rename(columns={'product_id': 'item_id'}, inplace=True)
user_features.rename(columns={'household_key': 'user_id'}, inplace=True)



In [10]:
# train test split
test_size_weeks = 3

data_train = data[data['week_no'] < data['week_no'].max() - test_size_weeks]
data_test = data[data['week_no'] >= data['week_no'].max() - test_size_weeks]

data_train.head(2)

Unnamed: 0,user_id,basket_id,day,item_id,quantity,sales_value,store_id,retail_disc,trans_time,week_no,coupon_disc,coupon_match_disc
0,2375,26984850000.0,1.0,1004906.0,1.0,1.39,364.0,-0.6,1631.0,1.0,0.0,0.0
1,2375,26984850000.0,1.0,1033142.0,1.0,0.82,364.0,0.0,1631.0,1.0,0.0,0.0


In [11]:
popularity = data_train.groupby('item_id')['quantity'].sum().reset_index()
popularity.rename(columns={'quantity': 'n_sold'}, inplace=True)

top_5000 = popularity.sort_values('n_sold', ascending=False).head(5000).item_id.tolist()

In [12]:
# Заведем фиктивный item_id (если юзер покупал товары из топ-5000, то он "купил" такой товар)
data_train.loc[~data_train['item_id'].isin(top_5000), 'item_id'] = 999999

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  self._setitem_single_column(loc, value, pi)


In [13]:
user_item_matrix = pd.pivot_table(data_train, 
                                  index='user_id', columns='item_id', 
                                  values='quantity', # Можно пробоват ьдругие варианты
                                  aggfunc='count', 
                                  fill_value=0
                                 )

user_item_matrix = user_item_matrix.astype(float) # необходимый тип матрицы для implicit

In [14]:
# переведем в формат saprse matrix
sparse_user_item = csr_matrix(user_item_matrix).tocsr()

user_item_matrix.head(3)

item_id,202291.0,397896.0,420647.0,480014.0,545926.0,707683.0,731106.0,818980.0,819063.0,819255.0,...,15596488.0,15596515.0,15778533.0,15831255.0,15926844.0,15926886.0,15927403.0,15927661.0,15927850.0,16809471.0
user_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
1,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0
2,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [16]:
data_test = data_test[data_test['item_id'].isin(data_train['item_id'].unique())]

In [17]:
result = data_test.groupby('user_id')['item_id'].unique().reset_index()
result.columns=['user_id', 'actual']
result.head(2)

Unnamed: 0,user_id,actual
0,1,"[829323.0, 851515.0, 940947.0, 995242.0, 10552..."
1,2,"[821083.0, 828106.0, 830960.0, 833025.0, 83813..."


In [18]:
userids = user_item_matrix.index.values
itemids = user_item_matrix.columns.values

matrix_userids = np.arange(len(userids))
matrix_itemids = np.arange(len(itemids))

id_to_itemid = dict(zip(matrix_itemids, itemids))
id_to_userid = dict(zip(matrix_userids, userids))

itemid_to_id = dict(zip(itemids, matrix_itemids))
userid_to_id = dict(zip(userids, matrix_userids))

###SparkSession

In [19]:
# session = (
    
# )


session = SparkSession.builder\
        .master("local[*]")\
        .appName('PySpark_Tutorial')\
        .getOrCreate()

In [20]:
session 

In [24]:
!pip install pyarrow

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [21]:
spark_data_train = session.createDataFrame(data_train[['user_id', 'item_id', 'quantity']])

In [22]:
spark_data_train = spark_data_train.withColumnRenamed('quantity', 'relevance')

In [23]:
spark_data_train.show(10)

+-------+---------+---------+
|user_id|  item_id|relevance|
+-------+---------+---------+
|   2375|1004906.0|      1.0|
|   2375|1033142.0|      1.0|
|   2375|1036325.0|      1.0|
|   2375|1082185.0|      1.0|
|   2375|8160430.0|      1.0|
|   2375| 826249.0|      2.0|
|   2375| 999999.0|      1.0|
|   2375|1085983.0|      1.0|
|   2375| 999999.0|      1.0|
|   2375| 999999.0|      1.0|
+-------+---------+---------+
only showing top 10 rows



In [26]:
model = ALS(
    rank=30,
    userCol='user_id',
    itemCol='item_id',
    ratingCol='relevance',
    implicitPrefs=True,
    seed=42,
    coldStartStrategy='drop',
).fit(spark_data_train)

In [27]:
recs_als = model.recommendForAllItems(5)

In [28]:
recs_als.show()

+-------+--------------------+
|item_id|     recommendations|
+-------+--------------------+
| 202291|[{2498, 0.9832718...|
| 420647|[{1272, 1.2137792...|
| 480014|[{1988, 3.0684826...|
| 731106|[{1237, 0.9969104...|
| 818980|[{1020, 0.6730642...|
| 819304|[{19, 0.9638285},...|
| 819308|[{1111, 1.0787512...|
| 819643|[{2317, 0.7874579...|
| 819765|[{1959, 1.3689586...|
| 819927|[{1845, 1.2582301...|
| 820082|[{1248, 0.6003673...|
| 820165|[{1260, 1.416674}...|
| 820291|[{1430, 0.7399742...|
| 820301|[{222, 1.1165347}...|
| 820321|[{88, 0.8844796},...|
| 820518|[{1835, 0.808106}...|
| 820560|[{2076, 1.359143}...|
| 820895|[{2142, 0.7028105...|
| 821025|[{2322, 0.6525344...|
| 821200|[{149, 0.9699792}...|
+-------+--------------------+
only showing top 20 rows



In [37]:
recs_als.count()

5001

In [29]:
model.itemFactors.show()

+------+--------------------+
|    id|            features|
+------+--------------------+
|818980|[-0.06131731, 0.0...|
|819330|[0.19052994, -0.1...|
|819840|[0.15041186, 0.07...|
|820560|[0.81883645, -0.2...|
|821200|[0.07871767, 0.00...|
|821730|[-0.1441819, -0.3...|
|822140|[-0.008198519, 0....|
|823990|[-0.39197034, 0.0...|
|824180|[-0.29060042, -0....|
|825650|[0.16561802, -0.1...|
|825970|[-0.09399322, -0....|
|826790|[-0.050618473, -0...|
|826860|[-0.0052224435, -...|
|827180|[-0.06405448, -0....|
|827570|[0.15111898, -0.2...|
|830750|[-0.20631053, -0....|
|830960|[0.017235119, -0....|
|831390|[-0.038623985, -0...|
|832760|[0.039238922, -0....|
|833860|[-0.04058077, -0....|
+------+--------------------+
only showing top 20 rows



In [30]:
model.userFactors.show()

+---+--------------------+
| id|            features|
+---+--------------------+
| 10|[-0.06181012, -0....|
| 20|[-3.5484874E-4, -...|
| 30|[-0.08585472, -0....|
| 40|[-0.10397756, -0....|
| 50|[-0.030362777, -0...|
| 60|[0.01337036, -0.3...|
| 70|[-0.0978701, -0.1...|
| 80|[-0.13015419, -0....|
| 90|[-0.11980926, -0....|
|100|[0.035559118, -0....|
|110|[-0.10410147, -0....|
|120|[-0.03773101, -0....|
|130|[-0.12886226, -0....|
|140|[-0.10761751, -0....|
|150|[-0.06221408, -0....|
|160|[-0.16185153, -0....|
|170|[-0.0099004945, -...|
|180|[-0.08816957, -0....|
|190|[-0.04970625, -0....|
|200|[-0.041131075, -0...|
+---+--------------------+
only showing top 20 rows



In [33]:
users = pd.DataFrame([1, 2])
users

Unnamed: 0,0
0,1
1,2


In [34]:
session.createDataFrame(users).show()

+---+
|  0|
+---+
|  1|
|  2|
+---+



In [35]:
recs_subset = model.recommendForUserSubset(session.createDataFrame(users).withColumnRenamed('0', 'user_id'), 5)

In [36]:
recs_subset.show()

+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|      1|[{999999, 1.76383...|
|      2|[{999999, 1.75938...|
+-------+--------------------+



In [45]:
recs_users_als = model.recommendForAllUsers(6)

In [46]:
recs_users_als.show()

+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|      1|[{999999, 1.76383...|
|      3|[{999999, 1.55272...|
|      5|[{999999, 1.68676...|
|      6|[{1082185, 1.8150...|
|      9|[{999999, 1.45858...|
|     12|[{999999, 1.40141...|
|     13|[{999999, 1.67087...|
|     15|[{999999, 1.64558...|
|     16|[{999999, 1.75619...|
|     17|[{6544236, 2.1117...|
|     19|[{1068719, 1.7989...|
|     20|[{1404121, 1.8674...|
|     22|[{999999, 1.71348...|
|     26|[{999999, 1.65016...|
|     27|[{999999, 1.72187...|
|     28|[{999999, 1.72880...|
|     31|[{999999, 1.67803...|
|     34|[{999999, 1.35025...|
|     35|[{999999, 1.54782...|
|     37|[{999999, 1.52969...|
+-------+--------------------+
only showing top 20 rows



In [47]:
recs_users_als.count()

2499

In [48]:
recs_als = (recs_users_als.withColumn(
    'recommendations', sf.explode('recommendations')
  )
    .withColumn('item_id', sf.col('recommendations.item_id'))
    .withColumn('relevance', 
                sf.col('recommendations.rating').cast(DoubleType())
                )
    .select('user_id', 'item_id', 'relevance')
    )

In [49]:
recs_als

DataFrame[user_id: int, item_id: int, relevance: double]

In [50]:
recs_als.show()

+-------+-------+------------------+
|user_id|item_id|         relevance|
+-------+-------+------------------+
|      1| 999999|1.7638300657272339|
|      1|1082185| 1.750781536102295|
|      1|1029743|1.4700732231140137|
|      1| 995242|1.4617059230804443|
|      1| 981760| 1.459898591041565|
|      1| 856942| 1.371649146080017|
|      3| 999999|1.5527293682098389|
|      3|1029743|1.4677079916000366|
|      3|1106523|1.4580351114273071|
|      3| 951590|1.4193614721298218|
|      3|1082185|1.4147295951843262|
|      3| 883404|1.3634978532791138|
|      5| 999999|1.6867679357528687|
|      5|1082185| 1.447524905204773|
|      5|6534178|1.2370492219924927|
|      5|1029743|1.2242306470870972|
|      5| 995242|1.1465785503387451|
|      5|1106523|1.0603070259094238|
|      6|1082185|1.8150135278701782|
|      6| 999999| 1.741112232208252|
+-------+-------+------------------+
only showing top 20 rows



In [52]:
recs_als.toPandas()

Unnamed: 0,user_id,item_id,relevance
0,1,999999,1.763830
1,1,1082185,1.750782
2,1,1029743,1.470073
3,1,995242,1.461706
4,1,981760,1.459899
...,...,...,...
14989,2500,1082185,1.576736
14990,2500,1404121,1.504669
14991,2500,1029743,1.398164
14992,2500,981760,1.302276


In [53]:
recs_df = recs_als.toPandas()

In [54]:
result_als = recs_df.groupby('user_id')['item_id'].unique().reset_index()
result_als.columns=['user_id', 'recs_als']
result_als.head()

Unnamed: 0,user_id,recs_als
0,1,"[999999, 1082185, 1029743, 995242, 981760, 856..."
1,2,"[999999, 1082185, 1029743, 1106523, 995242, 98..."
2,3,"[999999, 1029743, 1106523, 951590, 1082185, 88..."
3,4,"[999999, 1082185, 1029743, 995242, 1106523, 98..."
4,5,"[999999, 1082185, 6534178, 1029743, 995242, 11..."


In [55]:
result_als['recs_als'] = result_als['recs_als'].apply(lambda x: x[1:])

result_als.head()

Unnamed: 0,user_id,recs_als
0,1,"[1082185, 1029743, 995242, 981760, 856942]"
1,2,"[1082185, 1029743, 1106523, 995242, 981760]"
2,3,"[1029743, 1106523, 951590, 1082185, 883404]"
3,4,"[1082185, 1029743, 995242, 1106523, 981760]"
4,5,"[1082185, 6534178, 1029743, 995242, 1106523]"


In [57]:
result = result.merge(result_als, on='user_id', how='left')
result.head()

Unnamed: 0,user_id,actual,recs_als
0,1,"[829323.0, 851515.0, 940947.0, 995242.0, 10552...","[1082185, 1029743, 995242, 981760, 856942]"
1,2,"[821083.0, 828106.0, 830960.0, 833025.0, 83813...","[1082185, 1029743, 1106523, 995242, 981760]"
2,4,"[831063.0, 883932.0, 891423.0, 908283.0, 95496...","[1082185, 1029743, 995242, 1106523, 981760]"
3,6,"[850102.0, 897088.0, 1062782.0, 1078346.0, 103...","[999999, 1026118, 878996, 1051516, 854852]"
4,7,"[922307.0, 965797.0, 1022003.0, 1064441.0, 108...","[1082185, 1029743, 995242, 1106523, 981760]"


In [58]:
result.apply(lambda row: precision_at_k(row['recs_als'], row['actual']), axis=1).mean()

0.18002183406113542

In [59]:
result.apply(lambda row:ap_k(row['recs_als'], row['actual']), axis=1).mean()

0.0

Вывод precision@5 получился сравнительно неплохой 0.18002183406113542 при применении ALS из pyspark. Сортировка (relevance) работает плохо - map@5 - 0.

###Implicit AlS

In [60]:
user_item_matrix_bm25 = bm25_weight(user_item_matrix.T).T  # Применяется к item-user матрице ! 

In [61]:
sparse_user_item = csr_matrix(user_item_matrix).tocsr()

In [68]:
def get_recommendations(user, model, N=5):
    res = [id_to_itemid[rec] for rec in 
                    model.recommend(userid=userid_to_id[user], 
                                    user_items=sparse_user_item[userid_to_id[user]],   # на вход user-item matrix
                                    N=N, 
                                    filter_already_liked_items=False,
                                    filter_items=[itemid_to_id[999999]], 
                                    recalculate_user=True)[0]]
    return res

In [69]:
model_b = AlternatingLeastSquares(factors=200, 
                                      regularization=0.001,
                                      iterations=30, 
                                      calculate_training_loss=True, 
                                      num_threads=4)
      
model_b.fit(user_item_matrix_bm25.T.T.tocsr(),  # На вход item-user matrix
          show_progress=True)
result['ALS_bm25'] = result['user_id'].apply(lambda x: get_recommendations(x, model=model_b, N=5))
result.apply(lambda row: precision_at_k(row['ALS_bm25'] , row['actual']), axis=1).mean()

  0%|          | 0/30 [00:00<?, ?it/s]

0.21364628820960702

In [70]:
result.apply(lambda row: ap_k(row['ALS_bm25'] , row['actual']), axis=1).mean()

0.0

###Вывод

При применении ALS из модуля implicit precision@5

0.21364628820960702.

map@5 - 0

При применении ALS из pyspark precision@5

0.18002183406113542. 

Сортировка (relevance) работает плохо - map@5 - 0.