In [None]:
!pip install implicit
!pip install pyspark

In [None]:
import pandas as pd
import numpy as np

from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
import pyspark.sql.functions as sf

from scipy.sparse import csr_matrix
from implicit.als import AlternatingLeastSquares

import os, sys
module_path = os.path.abspath(os.path.join(os.pardir))
if module_path not in sys.path:
    sys.path.append(module_path)

# from content.src.metrics import map_k, precision_at_k, recall_at_k
from content.src.utils import prefilter_items
# from content.src.recommenders import MainRecommender

import warnings
warnings.simplefilter('ignore')

In [3]:
data = pd.read_csv('retail_train.csv')
item_features = pd.read_csv('product.csv')
user_features = pd.read_csv('hh_demographic.csv')

item_features.columns = [col.lower() for col in item_features.columns]
user_features.columns = [col.lower() for col in user_features.columns]

item_features.rename(columns={'product_id': 'item_id'}, inplace=True)
user_features.rename(columns={'household_key': 'user_id'}, inplace=True)

test_size_weeks = 3

data_train = data[data['week_no'] < data['week_no'].max() - test_size_weeks]
data_test = data[data['week_no'] >= data['week_no'].max() - test_size_weeks]

data_train.head(2)

Unnamed: 0,user_id,basket_id,day,item_id,quantity,sales_value,store_id,retail_disc,trans_time,week_no,coupon_disc,coupon_match_disc
0,2375,26984851472,1,1004906,1,1.39,364,-0.6,1631,1,0.0,0.0
1,2375,26984851472,1,1033142,1,0.82,364,0.0,1631,1,0.0,0.0


In [4]:
n_items_before = data_train['item_id'].nunique()
n_items_after = data_train['item_id'].nunique()
data_tarin = prefilter_items(data_train, 5000, item_features)

print(f'Decreased items before {n_items_before} and {n_items_after}')

Decreased items before 86865 and 86865


In [5]:
popularity = data_tarin.groupby('item_id')['quantity'].sum().reset_index()
popularity.rename(columns={'quantity': 'n_sold'}, inplace=True)

top_5000 = popularity.sort_values('n_sold', ascending=False).head(5000).item_id.tolist()

In [6]:
data_tarin.loc[~data_tarin['item_id'].isin(top_5000), 'item_id'] = 999999

In [7]:
user_item_matrix = pd.pivot_table(data_train, 
                                  index='user_id',
                                  columns='item_id', 
                                  values='quantity',
                                  aggfunc='count', 
                                  fill_value=0
                                 )

user_item_matrix = user_item_matrix.astype(float)
sparse_user_item = csr_matrix(user_item_matrix).tocsr()

user_item_matrix.tail(2)

item_id,25671,26081,26093,26190,26355,26426,26540,26601,26636,26691,...,17328742,17329473,17329749,17330255,17330511,17381856,17382205,17383227,17827644,17829232
user_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2499,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2500,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [8]:
data_test = data_test[data_test['item_id'].isin(data_train['item_id'].unique())]

In [9]:
result = data_test.groupby('user_id')['item_id'].unique().reset_index()
result.columns=['user_id', 'actual']
result.head(3)

Unnamed: 0,user_id,actual
0,1,"[821867, 834484, 856942, 865456, 889248, 90795..."
1,3,"[835476, 851057, 872021, 878302, 879948, 90963..."
2,6,"[920308, 926804, 946489, 1006718, 1017061, 107..."


In [10]:
userids = user_item_matrix.index.values
itemids = user_item_matrix.columns.values

matrix_userids = np.arange(len(userids))
matrix_itemids = np.arange(len(itemids))

id_to_itemid = dict(zip(matrix_itemids, itemids))
id_to_userid = dict(zip(matrix_userids, userids))

itemid_to_id = dict(zip(itemids, matrix_itemids))
userid_to_id = dict(zip(userids, matrix_userids))

# Spark Session

In [11]:
session = (
    SparkSession.builder.config("spark.driver.memory", "1g")
    .config("spark.sql.shuffle.partions", "50")
    .config("spark.driver.binAddress", "127.0.0.1")
    .config("spark.driver.host", "localhost")
    .master("local[1]")
    .enableHiveSupport()
    .getOrCreate()
          )

In [12]:
session

In [13]:
data_train['item_idx'] = data_train['item_id'].map(lambda x: itemid_to_id[x])
data_train['user_idx'] = data_train['user_id'].map(lambda x: userid_to_id[x])

In [14]:
spark_data_train = session.createDataFrame(data_train[['user_idx', 'item_idx', 'quantity']])

In [15]:
spark_data_train = spark_data_train.withColumnRenamed('quantity', 'relevance')

In [16]:
spark_data_train.show(10)

+--------+--------+---------+
|user_idx|item_idx|relevance|
+--------+--------+---------+
|    2373|   26161|        1|
|    2373|   29258|        1|
|    2373|   29609|        1|
|    2373|   34707|        1|
|    2373|   61355|        1|
|    2373|    6361|        2|
|    2373|   30403|        1|
|    2373|   35122|        1|
|    2373|   36914|        1|
|    2373|   54571|        1|
+--------+--------+---------+
only showing top 10 rows



In [17]:
model = ALS(
    rank=30,
    userCol='user_idx',
    itemCol='item_idx',
    ratingCol='relevance',
    implicitPrefs=True,
    seed=42,
    coldStartStrategy='drop'
    ).fit(spark_data_train)

In [18]:
recs_als = model.recommendForAllUsers(5)

In [19]:
recs_als.show()

+--------+--------------------+
|user_idx|     recommendations|
+--------+--------------------+
|       0|[{34707, 1.494534...|
|       1|[{34707, 1.183496...|
|       2|[{55576, 1.570052...|
|       3|[{28895, 0.600019...|
|       4|[{34707, 0.308095...|
|       5|[{43620, 2.684124...|
|       6|[{34707, 1.192634...|
|       7|[{34707, 1.691280...|
|       8|[{34707, 0.702424...|
|       9|[{37343, 0.263664...|
|      10|[{34707, 0.256787...|
|      11|[{34707, 0.431734...|
|      12|[{53118, 1.706271...|
|      13|[{34707, 1.449051...|
|      14|[{34707, 1.218879...|
|      15|[{54554, 1.000049...|
|      16|[{34707, 1.546676...|
|      17|[{34707, 1.510011...|
|      18|[{43620, 1.979471...|
|      19|[{34707, 1.555927...|
+--------+--------------------+
only showing top 20 rows



In [20]:
recs_als = (recs_als.withColumn("recommendations", sf.explode("recommendations"))
                    .withColumn("item_idx", sf.col("recommendations.item_idx"))
                    .withColumn("relevance", sf.col("recommendations.rating")
                    .cast(DoubleType()),)
                    .select("user_idx", "item_idx", "relevance")
            )

In [21]:
recs_als.show()

+--------+--------+------------------+
|user_idx|item_idx|         relevance|
+--------+--------+------------------+
|       0|   34707|1.4945346117019653|
|       0|    9799|1.4480890035629272|
|       0|   10399|  1.35391366481781|
|       0|   12927|1.3443881273269653|
|       0|   33415|1.3291007280349731|
|       1|   34707|1.1834964752197266|
|       1|   28895| 1.099549412727356|
|       1|   37343| 1.045573115348816|
|       1|   23590|    1.039306640625|
|       1|   25064|1.0116292238235474|
|       2|   55576|1.5700528621673584|
|       2|   28895|1.5342798233032227|
|       2|   37343|1.5120460987091064|
|       2|   20372|1.4370718002319336|
|       2|   12722| 1.432856798171997|
|       3|   28895| 0.600019097328186|
|       3|   20372|0.5996979475021362|
|       3|   60529|0.5773027539253235|
|       3|   37343|0.5562776923179626|
|       3|   51478|  0.54885333776474|
+--------+--------+------------------+
only showing top 20 rows



In [23]:
recs_als.toPandas().tail(4)

Unnamed: 0,user_idx,item_idx,relevance
12491,2498,28895,1.293806
12492,2498,43620,1.247538
12493,2498,23590,1.191971
12494,2498,39528,1.188851
