In [17]:
import warnings
warnings.filterwarnings('ignore')

import numpy as np
import pandas as pd
pd.options.display.float_format = '{:.2f}'.format
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_colwidth', None)

from src.metrics import precision_at_k, recall_at_k
from src.recommenders import MainRecommender

from lightgbm import LGBMClassifier

import findspark
findspark.init()
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
import pyspark.sql.functions as sf

In [166]:
train = pd.read_csv('data/retail_train.csv')
test = pd.read_csv('data/retail_test.csv')
item_features = pd.read_csv('data/product.csv')
user_features = pd.read_csv('data/hh_demographic.csv')

In [292]:
#Внесем в датасет сведения о средней цене на товар
prices = pd.concat([train,test]).groupby('item_id', as_index=False).agg({'quantity' : sum, 'sales_value' : sum})
prices['price'] = prices.sales_value/prices.quantity
train = pd.merge(train,prices[['item_id', 'price']], on='item_id', how='left')
train

Unnamed: 0,user_id,basket_id,day,item_id,quantity,sales_value,store_id,retail_disc,trans_time,week_no,coupon_disc,coupon_match_disc,price_x,binary,price_y,price_x.1,price_y.1
0,2375,26984851472,1,1004906,1,1.39,364,-0.60,1631,1,0.00,0.00,2.39,1,2.40,2.40,2.40
1,2375,26984851472,1,1033142,1,0.82,364,0.00,1631,1,0.00,0.00,0.95,1,0.94,0.94,0.94
2,2375,26984851472,1,1036325,1,0.99,364,-0.30,1631,1,0.00,0.00,1.32,1,1.32,1.32,1.32
3,2375,26984851472,1,1082185,1,1.21,364,0.00,1631,1,0.00,0.00,0.96,1,0.96,0.96,0.96
4,2375,26984851472,1,8160430,1,1.50,364,-0.39,1631,1,0.00,0.00,1.68,1,1.74,1.74,1.74
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2396799,1613,41655820646,663,16102849,1,2.00,3262,-1.15,1231,95,0.00,0.00,2.00,1,2.00,2.00,2.00
2396800,1001,41655829421,663,13217063,1,1.69,3131,0.00,2231,95,0.00,0.00,1.69,1,1.14,1.14,1.14
2396801,1001,41655829421,663,13217800,1,1.69,3131,0.00,2231,95,0.00,0.00,1.69,1,1.34,1.34,1.34
2396802,1167,41656790510,663,6410462,22451,43.98,3385,-0.65,1059,95,0.00,0.00,0.00,1,0.00,0.00,0.00


In [169]:
#Сформируем необходимые данные для сравнения на валидационной выборке
result = test.groupby('user_id')['item_id'].unique().reset_index()
result.columns=['user_id', 'actual']
result.shape

(1885, 2)

In [170]:
#СОставим матрицу клиент-пользователь, взяв за основу бинарный принцип
recs = MainRecommender(train, values='binary')
recs.info()

В матрице пользователей - 2499, товаров - 89051, покупок- 1314372, плотность матрицы составляет 0.0059


In [171]:
#Классический ALS метод (со взвешиванием)
recs.fit()

In [182]:
p = recs.overall_top_purchases.item_id.tolist()
def calc_metrics(n):
    x = recs.predict(n=n, filter_already_liked_items=False)
    x = x.reset_index().rename(columns={'index' : 'user_id'})
    result1 = result.merge(x, on='user_id', how='left')
    result1 = result1.query('рекомендации == рекомендации')
    
    #Если число рекомендаций меньше требуемого, добавляем в них товары из списка самых популярных, но так, чтобы
    #товары не повторялись, до тех пор пока число товаров не станет равным требуемому
    result1.рекомендации = result1.рекомендации.apply(lambda x : [y for y in x if y != 'n/a'])
    for f in result1.рекомендации: 
        if len(f) < n:
                for item in p:
                    if len(f) < n and item not in f:
                            f.append(item)
    result1['common'] = result1.apply(lambda row: set(row.actual).intersection(set(row.рекомендации[:5])), axis=1)
    precision_at_5 = (result1.common.apply(lambda x : len(x))/5).mean()
    recall = result1.apply(lambda row : len(set(row.actual).intersection(set(row.рекомендации)))/len(row['actual']), axis=1).mean()
    result1['recs5_weighted_prices'] = result1.рекомендации.apply(lambda rec : sum([prices[prices.item_id == item].price.values[0] for item in rec[:5]]))
    result1['common_weighted_prices'] = result1.common.apply(lambda rec : sum([prices[prices.item_id == item].price.values[0] for item in rec]))
    money_precision_at_5= (result1.common_weighted_prices/result1.recs5_weighted_prices).mean()
    return result1, precision_at_5 , money_precision_at_5, recall
print(f'Precision at 5 составило {round(calc_metrics(20)[1],4)}, money precision at 5 -  {round(calc_metrics(20)[2],4)}')
print(f'Значение recall составило {calc_metrics(20)[3]}, {calc_metrics(50)[3]}, {calc_metrics(100)[3]}, {calc_metrics(200)[3]}, {calc_metrics(500)[3]}')
print('при числе кандидатов 20, 50, 100, 200 и 500 соотвественно')

Precision at 5 составило 0.1296, money precision at 5 -  0.1229
Значение recall составило 0.06898101225910765, 0.1249684627361574, 0.18224669349937406, 0.2537121811715705, 0.3714519816070413
при числе кандидатов 20, 50, 100, 200 и 500 соотвественно


In [187]:
recs.fit(weighted=False)
print(f'Precision at 5 составило {round(calc_metrics(20)[1],4)}, money precision at 5 -  {round(calc_metrics(20)[2],4)}')
print(f'Значение recall составило {calc_metrics(20)[3]}, {calc_metrics(50)[3]}, {calc_metrics(100)[3]}, {calc_metrics(200)[3]}, {calc_metrics(500)[3]}')
print('при числе кандидатов 20, 50, 100, 200 и 500 соотвественно')

Precision at 5 составило 0.1763, money precision at 5 -  0.1691
Значение recall составило 0.08483263377284067, 0.14578738852735673, 0.20498070607893384, 0.2745005699123277, 0.3810531039177072
при числе кандидатов 20, 50, 100, 200 и 500 соотвественно


In [185]:
#Метод рекомендаций на основе ранее купленных товаров. Опытным путем было установлено оптимальное число ближайших соседей (2)
recs.fit(own=True, knn=2)

In [186]:
print(f'Precision at 5 составило {round(calc_metrics(20)[1],4)}, money precision at 5 -  {round(calc_metrics(20)[2],4)}')
print(f'Значение recall составило {calc_metrics(20)[3]}, {calc_metrics(50)[3]}, {calc_metrics(100)[3]}, {calc_metrics(200)[3]}, {calc_metrics(500)[3]}')
print('при числе кандидатов 20, 50, 100, 200 и 500 соотвественно')

Precision at 5 составило 0.2536, money precision at 5 -  0.2442
Значение recall составило 0.09712697000008642, 0.13375984670443938, 0.1587473028966563, 0.1922246613461064, 0.2659120365787343
при числе кандидатов 20, 50, 100, 200 и 500 соотвественно


In [188]:
session = (SparkSession.builder.config('spark.driver.host', 'localhost').config('spark.sql.shuffle.partitions', '100').\
           config('spark.driver.bindAddress', '127.0.0.1').master('local[*]').enableHiveSupport().getOrCreate())

In [190]:
train['binary'] = 1
spark_data_train = session.createDataFrame(train[['user_id', 'item_id', 'binary']])
spark_data_train = spark_data_train.withColumnRenamed('binary', 'relevance')

In [223]:
model = ALS(rank=125, blockSize=1024, numUserBlocks=22,  userCol='user_id', alpha = 1, maxIter=12,itemCol='item_id',
            ratingCol='relevance', regParam=0.12,  implicitPrefs=True, seed=42, coldStartStrategy='drop').\
            fit(spark_data_train)
recs_als = model.recommendForAllUsers(100)
recs_als = (recs_als.withColumn('recommendations', sf.explode('recommendations')).withColumn('item_id', sf.col('recommendations.item_id')).\
    withColumn('relevance', sf.col('recommendations.rating')).select('user_id', 'item_id', 'relevance'))
preds_raw = recs_als.toPandas()

preds = preds_raw.groupby('user_id', as_index=False).agg({'item_id' : lambda x : list(x)}).rename(columns={'item_id' : 'рекомендации'})
result1 = result.merge(preds, on='user_id', how='inner')
result1['common'] = result1.apply(lambda row: set(row.actual).intersection(set(row.рекомендации[:5])), axis=1)
precision_at_5 = (result1.common.apply(lambda x : len(x))/5).mean()
recall = result1.apply(lambda row : len(set(row.actual).intersection(set(row.рекомендации)))/len(row['actual']), axis=1).mean()
result1['recs5_weighted_prices'] = result1.рекомендации.apply(lambda rec : sum([prices[prices.item_id == item].price.values[0] for item in rec[:5]]))
result1['common_weighted_prices'] = result1.common.apply(lambda rec : sum([prices[prices.item_id == item].price.values[0] for item in rec]))
money_precision_at_5= (result1.common_weighted_prices/result1.recs5_weighted_prices).mean()
print(f'Precision at 5 составило {round(precision_at_5,4)}, money precision at 5 -  {round(money_precision_at_5,4)}')
print(f'Значение recall при числе кандидатов 100 составило {round(recall,4)}')

Precision at 5 составило 0.2786, money precision at 5 -  0.2684
Значение recall при числе кандидатов 100 составило 0.2219


Было опробовано 4 модели: ALS, ALS со взвешиванием, Item-Item на сонове товаров, ранее купленных покупателем и ALS на основе PySpark. 

Из них наилучшие результаты (хотя и с наименьшей скоростью) показал PySpark, на 100 кандидатах recall - 0.22 и money_precision_at_5 - 0.28.

Возьмем эти результаты как исходные данные для классификации.

In [239]:
model_preds = result1[['user_id', 'actual',	'рекомендации']]

In [241]:
preds = model_preds.drop('actual',1).explode('рекомендации')
preds['predicted'] = 1
preds = preds.set_index(['user_id', 'рекомендации'])
actual = model_preds.drop('рекомендации',1).explode('actual').sort_values(['user_id', 'actual']).reset_index(drop=True)
actual['true'] = 1
actual = actual.set_index(['user_id', 'actual'])


In [229]:
preds_actual= pd.concat([preds,actual],1).fillna(0)
preds_actual['target'] = (preds_actual.predicted == preds_actual.true)*1
preds_actual = preds_actual.reset_index().drop(['predicted', 'true'],1).rename(columns={'level_1' : 'item_id'})
preds_actual

Unnamed: 0,user_id,item_id,target
0,1,856942,0
1,1,1049998,1
2,1,1082185,0
3,1,1074612,0
4,1,5577022,0
...,...,...,...
251711,2500,15801331,0
251712,2500,15831322,0
251713,2500,17169644,0
251714,2500,17328953,0


In [230]:
preds_actual.target.mean()

0.06338889860000954

In [231]:
#Посчитаем средний чек для каждого пользователя
cheque = train.groupby('user_id', as_index=False).agg({'sales_value' : ['sum', 'count']})
cheque.columns = ['user_id', 'summ', 'countt']
cheque['av_cheque'] = cheque.summ/cheque.countt
cheque= cheque[['user_id', 'av_cheque']]
cheque

Unnamed: 0,user_id,av_cheque
0,1,2.49
1,2,2.78
2,3,2.92
3,4,3.99
4,5,3.42
...,...,...
2494,2496,2.89
2495,2497,3.53
2496,2498,3.01
2497,2499,2.97


In [234]:
#Посчитаем средний чек в каждой категории для каждого пользователя
item_features.columns = [col.lower() for col in item_features.columns]
item_features = item_features.rename(columns={'product_id' : 'item_id'})
cats_raw = pd.merge(train, item_features, on='item_id')
cats = cats_raw.groupby(['user_id', 'commodity_desc'], as_index=False).sales_value.sum()
cats = cats.rename(columns={'sales_value' : 'av_cat_cheque'})
cats

Unnamed: 0,user_id,commodity_desc,av_cat_cheque
0,1,,0.00
1,1,AIR CARE,57.66
2,1,ANALGESICS,9.98
3,1,APPLES,24.89
4,1,BACON,11.29
...,...,...,...
280317,2500,VEGETABLES SALAD,0.99
280318,2500,VITAMINS,24.98
280319,2500,WAREHOUSE SNACKS,36.22
280320,2500,WATER - CARBONATED/FLVRD DRINK,10.28


In [235]:
#Посчитаем для каждого товара отношение его цены к средней цене по категории
cat_av_price = cats_raw.groupby('commodity_desc', as_index=False).price.mean().rename(columns= {'price' : 'cat_av_price'})
cat_av_price  = pd.merge(cats_raw[['item_id', 'commodity_desc', 'price']],cat_av_price, on='commodity_desc')
cat_av_price['price_to_av_cat_price'] = cat_av_price.price/cat_av_price.cat_av_price
cat_av_price = cat_av_price.groupby('item_id', as_index=False).price_to_av_cat_price.mean()
cat_av_price 

Unnamed: 0,item_id,price_to_av_cat_price
0,25671,1.61
1,26081,0.29
2,26093,0.86
3,26190,1.11
4,26355,0.45
...,...,...
89046,17991689,1.67
89047,17991691,1.67
89048,18000012,0.73
89049,18024155,1.68


In [237]:
#Добавим в модель рассчитанные ранее фичи
final = preds_actual.copy()
final = pd.merge(preds_actual, item_features[['item_id', 'manufacturer', 'department', 'commodity_desc', 'sub_commodity_desc']], on='item_id', how='left')
user_features.rename(columns = {'household_key' : 'user_id'}, inplace=True)

final = pd.merge(final, cheque, on='user_id', how='left').fillna(0)
final = pd.merge(final,cats,on=['user_id', 'commodity_desc'], how='left').fillna(0)
final = pd.merge(final,prices,on='item_id', how='left').fillna(0)
final = pd.merge(final,cat_av_price ,on='item_id', how='left').fillna(0)
final

Unnamed: 0,user_id,item_id,target,manufacturer,department,commodity_desc,sub_commodity_desc,av_cheque,av_cat_cheque,quantity,sales_value,price,price_to_av_cat_price
0,1,856942,0,159,GROCERY,BAKED BREAD/BUNS/ROLLS,FRUIT/BREAKFAST BREAD,2.49,294.02,312.00,868.30,2.78,1.87
1,1,1049998,1,348,DRUG GM,CANDY - PACKAGED,GUM (PACKAGED),2.49,205.02,60.00,107.04,1.78,0.97
2,1,1082185,0,2,PRODUCE,TROPICAL FRUIT,BANANAS,2.49,36.44,28384.00,27291.02,0.96,0.92
3,1,1074612,0,282,GROCERY,VEGETABLES - SHELF STABLE,BEANS GREEN: FS/WHL/CUT,2.49,80.38,155.00,152.93,0.99,1.22
4,1,5577022,0,1194,GROCERY,REFRGRATD JUICES/DRNKS,DAIRY CASE 100% PURE JUICE - O,2.49,129.81,193.00,527.20,2.73,1.41
...,...,...,...,...,...,...,...,...,...,...,...,...,...
251711,2500,15801331,0,194,GROCERY,BAKING MIXES,BROWNIE MIX,3.71,27.82,4.00,7.96,1.99,1.41
251712,2500,15831322,0,1011,GROCERY,PAPER HOUSEWARES,DESIGNER PAPER: MEDIUM WEIGHT,3.71,0.00,16.00,73.24,4.58,2.24
251713,2500,17169644,0,1136,GROCERY,HOUSEHOLD CLEANG NEEDS,TOOLS - FLOOR & FURNITURE,3.71,45.94,0.00,0.00,0.00,0.00
251714,2500,17328953,0,2236,DRUG GM,INFANT CARE PRODUCTS,FEEDING ACCESSORIES BOTTLES,3.71,68.71,0.00,0.00,0.00,0.00


In [242]:
X_train = final[['user_id', 'item_id', 'department', 'commodity_desc', 'sub_commodity_desc', 'av_cheque','price_to_av_cat_price']]
y_train = final.target
cat_feats = ['department','commodity_desc','sub_commodity_desc']
X_train[cat_feats] = X_train[cat_feats].astype('category')

In [323]:
lgb = LGBMClassifier(max_depth=20, n_estimators=1000, random_state=42,categorical_column=cat_feats)
lgb.fit(X_train, y_train)
preds = lgb.predict_proba(X_train)
final['preds'] = preds[:,1]
res = final[['user_id', 'item_id', 'preds']].sort_values(['user_id', 'preds'], ascending=[True, False]).groupby('user_id')['item_id'].unique().to_frame()
res['рекомендации']= res.item_id.apply(lambda x: x[:5])
res = res.reset_index().drop('item_id',1)

In [324]:
#Посмотрим, каких пользователей модель не смогла предсказать (т. к. мы поставили параметр coldStartStrategy == drop)
not_recognized_user = result[~result.user_id.isin(res.user_id)].user_id
not_recognized_user

1762    2325
Name: user_id, dtype: int64

In [325]:
#Для этого пользователя укажем в виде рекомендаций 5 наиболее популярных товаров
res.loc[len(res)] = [not_recognized_user.values[0], recs.overall_top_purchases.item_id[:5].to_list()]
res

Unnamed: 0,user_id,рекомендации
0,1,"[931136, 9527558, 961554, 1049998, 1075074]"
1,2,"[940947, 1133018, 1026118, 961554, 1082185]"
2,3,"[1133018, 1082185, 995242, 953476, 1029743]"
3,6,"[995242, 1137688, 1119051, 1133018, 986912]"
4,7,"[1013321, 938700, 5568378, 961554, 1003188]"
...,...,...
1880,2497,"[6534178, 1082185, 914190, 899624, 1074405]"
1881,2498,"[6534178, 914190, 1053690, 5568378, 1058997]"
1882,2499,"[914190, 947798, 961554, 5568378, 5569327]"
1883,2500,"[1058997, 995242, 1082185, 843756, 5569230]"


In [340]:
res.to_csv('preds.csv')

In [334]:
#Мы сделали предсказания для всех юзеров, имеющихся в таблице test
sorted(res.user_id.to_list()) == sorted(result.user_id.to_list()) 

True

In [337]:
result1 = result.merge(res, on='user_id', how='left')
result1 = result1.query('рекомендации == рекомендации')
result1['common'] = result1.apply(lambda row: set(row.actual).intersection(set(row.рекомендации[:5])), axis=1)
precision_at_5 = (result1.common.apply(lambda x : len(x))/5).mean()
recall = result1.apply(lambda row : len(set(row.actual).intersection(set(row.рекомендации)))/len(row['actual']), axis=1).mean()
result1['recs5_weighted_prices'] = result1.рекомендации.apply(lambda rec : sum([prices[prices.item_id == item].price.values[0] for item in rec[:5]]))
result1['common_weighted_prices'] = result1.common.apply(lambda rec : sum([prices[prices.item_id == item].price.values[0] for item in rec]))
money_precision_at_5= (result1.common_weighted_prices/result1.recs5_weighted_prices).mean()

In [338]:
precision_at_5, money_precision_at_5,recall

(0.5159681697612732, 0.5716466855569206, 0.09119125117331399)

В результате использования двухступенчатой модели удалось существенно повысить метрики качества 