**Building a recommender system using a content-based filtering approach**

The following code:

* Builds weighted one-hot encoded item embeddings in the feature space
* Projects customers into the same embedding space (each customer is the sum of the one-hot vectors of the items they bought)
* Performs dimensionality reduction using PCA, keeping the first 100 principal components
* Finds the N most similar items using approximate nearest-neighbour search (`BucketedRandomProjectionLSH`) from Spark MLlib

Cold-start approach: customers without purchase history are recommended the 12 most frequently purchased items.

The input data is limited to 10,000 transactions due to memory constraints.

In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
     |████████████████████████████████| 281.4 MB 8.7 kB/s             
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
     |████████████████████████████████| 199 kB 44.3 MB/s            
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=4248697715cb37285103b8737717c3f2feb2766b567258c994dade3297f879a5
  Stored in directory: /root/.cache/pip/wheels/07/fb/67/b9f2c0242d156eaa136b45ae4fd99d3e7c0ecc2acfd26f47b9
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0.10.9.4
    Uninstalling py4j-0.10.9.4:
      Successfully uninstalled py4j-0.10.9.4
Successfully installed py4j-0.10.9.5 pyspark-

In [3]:
import numpy as np
import pandas as pd
from pyspark.sql.functions import udf
from pyspark.sql.types import *
import matplotlib.pyplot as plt
import plotly.graph_objects as go
from skimage import io
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import col, lit, lower
from pyspark.ml.feature import BucketedRandomProjectionLSH

In [4]:
from pyspark.sql.functions import col
from pyspark.sql import SparkSession

# NOTE(review): no .master() is set here, so this presumably runs Spark in local mode. In local
# mode the executor lives inside the driver JVM, so `spark.executor.memory` alone has no effect;
# the heap is bounded by `spark.driver.memory`, which only applies if set before the JVM starts
# (i.e. on a fresh kernel, before any other SparkSession was created).
spark = (
    SparkSession.builder
    .appName("HM-Recommendations")
    .config("spark.driver.memory", "14g")
    .config("spark.executor.memory", "14g")
    .getOrCreate()
)

DATA_DIR = '../input/h-and-m-personalized-fashion-recommendations'
# Number of transactions kept; the full file does not fit in memory.
N_TRANSACTIONS = 10000

# Article columns loaded from articles.csv.
features = ['article_id', 'prod_name', 'product_type_name', 'product_group_name', 'graphical_appearance_name',
            'colour_group_name', 'perceived_colour_value_name', 'perceived_colour_master_name', 'department_name',
            'index_name', 'index_group_name', 'section_name', 'garment_group_name', 'detail_desc']

# Categorical attributes that are one-hot encoded to build the item feature space.
feature_subset = ['product_group_name', 'graphical_appearance_name', 'colour_group_name', 'perceived_colour_value_name',
                  'perceived_colour_master_name', 'department_name', 'index_name', 'index_group_name', 'section_name',
                  'garment_group_name']

# Customers we must produce recommendations for.
rcmnds = spark.read.options(header=True).csv(f'{DATA_DIR}/sample_submission.csv').select('customer_id')
rcmnds.show(5)

# No schema inference: article_id stays a string (keeping its leading zero) in both CSVs,
# so the transactions/articles join keys match.
transactions = (
    spark.read.options(header=True).csv(f'{DATA_DIR}/transactions_train.csv')
    .drop('sales_channel_id', 'price')
    .limit(N_TRANSACTIONS)
)
transactions.show(5)

items = spark.read.options(header=True).csv(f'{DATA_DIR}/articles.csv').select(features)
items.show(5)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/27 05:58:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


                                                                                

+--------------------+
|         customer_id|
+--------------------+
|00000dbacae5abe5e...|
|0000423b00ade9141...|
|000058a12d5b43e67...|
|00005ca1c9ed5f514...|
|00006413d8573cd20...|
+--------------------+
only showing top 5 rows

+----------+--------------------+----------+
|     t_dat|         customer_id|article_id|
+----------+--------------------+----------+
|2018-09-20|000058a12d5b43e67...|0663713001|
|2018-09-20|000058a12d5b43e67...|0541518023|
|2018-09-20|00007d2de826758b6...|0505221004|
|2018-09-20|00007d2de826758b6...|0685687003|
|2018-09-20|00007d2de826758b6...|0685687004|
+----------+--------------------+----------+
only showing top 5 rows

+----------+-----------------+-----------------+------------------+-------------------------+-----------------+---------------------------+----------------------------+---------------+----------------+----------------+--------------------+------------------+--------------------+
|article_id|        prod_name|product_type_name|product_gr

In [5]:
# import numpy as np 
# import pandas as pd 
# import matplotlib.pyplot as plt
# import plotly.graph_objects as go
# from skimage import io

# df = pd.read_csv('../input/h-and-m-personalized-fashion-recommendations/transactions_train.csv', chunksize=100000)
# articles = pd.read_csv('../input/h-and-m-personalized-fashion-recommendations/articles.csv')
# users = next(df)
# df = users.merge(articles, on='article_id')
# df = df[['t_dat', 'customer_id', 'article_id', 'prod_name', 'product_type_name',
#        'product_group_name', 
#        'graphical_appearance_name', 'colour_group_name',
#        'perceived_colour_value_name',
#        'perceived_colour_master_name',
#        'department_name', 'index_name',
#        'index_group_name', 'section_name',
#        'garment_group_name', 'detail_desc']]

# feature_subset = ['product_group_name', 
#        'graphical_appearance_name', 'colour_group_name',
#        'perceived_colour_value_name',
#        'perceived_colour_master_name',
#        'department_name', 'index_name',
#        'index_group_name', 'section_name',
#        'garment_group_name']

# df.head()

# #Choose features to build feature space
# features = feature_subset
# df1 = df[['customer_id', 'article_id'] + features]
# dummies_df = pd.get_dummies(df1, columns=features)
# dummies_df

# minimum_items = 2
# groupby_customer = dummies_df.groupby('customer_id')

# l = []
# cutomer_ids = []
# article_ids = []
# for key in groupby_customer.groups.keys():
#     temp = groupby_customer.get_group(key)
#     if temp.article_id.nunique() >= minimum_items:
#         l.append(temp.drop('article_id', axis=1).sum(numeric_only=True).values)
#         cutomer_ids.append(key)
#         article_ids.extend(temp.article_id.values.tolist())

# user_feature = pd.DataFrame(l, columns = dummies_df.columns[2:])
# normalized_user_feature = user_feature.div(user_feature.sum(axis=1), axis=0)
# normalized_user_feature.insert(0, 'customer_id', cutomer_ids)
# normalized_user_feature = normalized_user_feature.set_index('customer_id')
# normalized_user_feature

# item_feature = dummies_df.drop_duplicates(subset='article_id')
# item_feature = item_feature[item_feature.article_id.isin(article_ids)].drop('customer_id', axis=1)
# item_feature = item_feature.set_index('article_id')
# item_feature

# scores = normalized_user_feature.dot(item_feature.T)
# scores

# def get_rcmnd(customer_id, scores):
#     cutomer_scores = scores.loc[customer_id]
#     customer_prev_items = groupby_customer.get_group(customer_id)['article_id']
#     prev_dropped = cutomer_scores.drop(customer_prev_items.values)
#     ordered = prev_dropped.sort_values(ascending=False)   
#     return ordered, customer_prev_items

# def plot_prev(prev_items):
#     fig = plt.figure(figsize=(20, 10))
#     for item, i in zip(prev_items, range(1, len(prev_items)+1)):
#         item = '0' + str(item)
#         sub = item[:3]
#         image = path + "/"+ sub + "/"+ item +".jpg"
#         image = plt.imread(image)
#         fig.add_subplot(1, 6, i)
#         plt.imshow(image)

# def plot_rcmnd(rcmnds):
#     fig = plt.figure(figsize=(20, 10))
#     for item, i in zip(rcmnds, range(1, k+1)):
#         item = '0' + str(item)
#         sub = item[:3]
#         image = path + "/"+ sub + "/"+ item +".jpg"
#         image = plt.imread(image)
#         fig.add_subplot(1, 6, i)
#         plt.imshow(image)

# from sklearn.decomposition import PCA
# pca = PCA(n_components=100)
# pca.fit(normalized_user_feature)
# pca.explained_variance_ratio_.sum()

# user_feature_pca = pd.DataFrame(pca.transform(normalized_user_feature), columns=['component_{}'.format(i) for i in range(1, 101)]).set_index(normalized_user_feature.index)
# item_feature_pca = pd.DataFrame(pca.transform(item_feature), columns=['component_{}'.format(i) for i in range(1, 101)]).set_index(item_feature.index)

# scores_pca = user_feature_pca.dot(item_feature_pca.T)



In [6]:
# features = feature_subset
# df1 = df.select(['customer_id', 'article_id'] + features)
# df1.show
#Choose features to build feature space
# features = feature_subset
# df1 = df.select('customer_id', 'article_id', *features)
# dummies_df = df1.select('customer_id', 'article_id', *[udf(lambda x: int(bool(x)), IntegerType())(col(c)).alias(c) for c in features])
# dummies_df = pd.get_dummies(dummies_df.toPandas(), columns=features)
# dummies_df

# minimum_items = 2
# groupby_customer = dummies_df.groupby('customer_id')

# l = []
# cutomer_ids = []
# article_ids = []
# for key in groupby_customer.groups.keys():
#     temp = groupby_customer.get_group(key)
#     if temp.article_id.nunique() >= minimum_items:
#         l.append(temp.drop('article_id', axis=1).sum(numeric_only=True).values)
#         cutomer_ids.append(key)
#         article_ids.extend(temp.article_id.values.tolist())
        
# user_feature = pd.DataFrame(l, columns = dummies_df.columns[2:])
# normalized_user_feature = user_feature.div(user_feature.sum(axis=1), axis=0)
# normalized_user_feature.insert(0, 'customer_id', cutomer_ids)
# normalized_user_feature = normalized_user_feature.set_index('customer_id')
# print(normalized_user_feature)

# item_feature = dummies_df.drop_duplicates(subset='article_id')
# item_feature = item_feature[item_feature.article_id.isin(article_ids)].drop('customer_id', axis=1)
# item_feature = item_feature.set_index('article_id')
# print(item_feature)

# scores = normalized_user_feature.dot(item_feature.T)
# print(scores)

# def get_rcmnd(customer_id, scores):
#     cutomer_scores = scores.loc[customer_id]
#     customer_prev_items = groupby_customer.get_group(customer_id)['article_id']
#     prev_dropped = cutomer_scores.drop(customer_prev_items.values)
#     ordered = prev_dropped.sort_values(ascending=False)
#     return ordered, customer_prev_items

# def plot_prev(prev_items):
#     fig = plt.figure(figsize=(20, 10))
#     for item, i in zip(prev_items, range(1, len(prev_items)+1)):
#         item = '0' + str(item)
#         sub = item[:3]
#         image = path + "/"+ sub + "/"+ item +".jpg"
#         image = plt.imread(image)
#         fig.add_subplot(1, 6, i)
#         plt.imshow(image)
    
# def plot_rcmnd(rcmnds):
#     fig = plt.figure(figsize=(20, 10))
#     for item, i in zip(rcmnds, range(1, k+1)):
#         item = '0' + str(item)
#         sub = item[:3]
#         image = path + "/"+ sub + "/"+ item +".jpg"
#         image = plt.imread(image)
#         fig.add_subplot(1, 6, i)
#         plt.imshow(image)
        
        
# from pyspark.ml.feature import PCA

# pca = PCA(k=100, inputCol='normalized_user_feature', outputCol='user_feature_pca')
# pca_model = pca.fit(user_feature)
# user_feature_pca = pca_model.transform(user_feature).select('user_feature_pca')

In [7]:
def to_lower(items):
    """Return `items` with every column listed in the global `feature_subset` lower-cased.

    Columns not in `feature_subset` are passed through unchanged and the column order is kept.
    """
    projection = [
        lower(col(name)).alias(name) if name in feature_subset else col(name)
        for name in items.columns
    ]
    return items.select(*projection)

def ohe(items, columns=None):
    """One-hot encode categorical item attributes into one row per `article_id`.

    For each column to encode, the items are grouped by `article_id`, pivoted on that
    column's distinct values and counted; missing combinations are filled with 0. The
    pivoted value columns are renamed positionally to `e_<column>_<i>` so arbitrary
    category strings never end up in column names. The per-column pivots are then
    inner-joined on `article_id`.

    Args:
        items: Spark DataFrame with an `article_id` column plus the columns to encode.
        columns: names of the categorical columns to encode; None (default) means the
            global `feature_subset`.

    Returns:
        Spark DataFrame with `article_id` followed by all `e_<column>_<i>` columns.

    Raises:
        ValueError: if there is no column to encode.
    """
    keys = ['article_id']
    columns = feature_subset if columns is None else columns
    if not columns:
        raise ValueError('ohe() needs at least one column to encode')

    encoded = []
    for pivot_col in columns:
        pivoted = items.groupBy(keys).pivot(pivot_col).count()
        value_cols = pivoted.columns[len(keys):]
        new_names = pivoted.columns[:len(keys)] + ['e_{0}_{1}'.format(pivot_col, i) for i in range(len(value_cols))]
        encoded.append(pivoted.toDF(*new_names).fillna(0))

    # Left fold of the joins: same column order as joining them pairwise in sequence.
    item_feature = encoded[0]
    for frame in encoded[1:]:
        item_feature = item_feature.join(frame, on=keys, how='inner')
    return item_feature

# New names instead of rebinding `items` / `transactions`, so re-running this cell does not
# lower-case or join an already-processed frame (a second join would duplicate every e_* column).
items_lower = to_lower(items)
item_feature = ohe(items_lower)

# One-hot columns = every item_feature column except the key, in item_feature order, so the user
# vectors built below line up dimension-by-dimension with the item vectors.
dummy_features = [c for c in item_feature.columns if c != 'article_id']

# Left join keeps transactions whose article is missing from `items`; their e_* values are
# null and are ignored by `sum`.
transactions_with_features = transactions.join(item_feature, on='article_id', how='left')

# A customer's profile is the per-attribute count over all of their purchases
# (output columns are named `sum(e_...)`, with `customer_id` first).
user_feature = transactions_with_features.groupBy('customer_id').sum(*dummy_features)

                                                                                

In [8]:
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA
from pyspark.ml import Pipeline

def _assemble(df, col):
    """Pack every column of `df` except the first into one vector column `sparse_features`.

    Keeps only the id column `col` and the new vector. Assumes the id is the first column of
    `df` (true for `user_feature` and `item_feature`).
    """
    assembler = VectorAssembler(inputCols=df.columns[1:], outputCol="sparse_features")
    return assembler.transform(df).select(col, "sparse_features")


def _standard_scaler():
    """Unfitted StandardScaler (defaults: unit variance, no centering) over `sparse_features`."""
    return StandardScaler(inputCol="sparse_features", outputCol="scaled_features")


def fit_scaler(df, col):
    """Fit a StandardScaler on the assembled feature vectors of `df` and return the model."""
    return _standard_scaler().fit(_assemble(df, col))


def scale(df, col, scaler_model=None):
    """Assemble the feature columns of `df` into a vector and standardise it.

    Args:
        df: DataFrame whose first column is the id `col`; the remaining columns are numeric features.
        col: name of the id column carried through to the output.
        scaler_model: an already-fitted StandardScalerModel to apply. When None (default) a new
            scaler is fitted on `df` itself.

    Returns:
        DataFrame with columns `col` and `scaled_features`.
    """
    feature_vectors = _assemble(df, col)
    if scaler_model is None:
        scaler_model = _standard_scaler().fit(feature_vectors)
    return scaler_model.transform(feature_vectors).select(col, "scaled_features")


def get_pca(df, col=None, k=100):
    """Fit a PCA with `k` components on the `scaled_features` column of `df` (output column `pca`).

    `col` is not used; it is kept so existing calls `get_pca(df, 'customer_id')` still work.
    """
    return PCA(k=k, inputCol="scaled_features", outputCol="pca").fit(df)


# Fit the scaler on customer profiles only and reuse it for the items. Scaling items with their own
# statistics would weight each attribute differently for items than for customers, so the
# user-fitted PCA would project the two into inconsistent spaces and distort user-item distances.
user_scaler = fit_scaler(user_feature, 'customer_id')
scaled_user_feature = scale(user_feature, 'customer_id', user_scaler)
scaled_item_feature = scale(item_feature, 'article_id', user_scaler)

pca_model = get_pca(scaled_user_feature, 'customer_id', k=100)
print(pca_model)
print('explained variance (sum over components):', float(pca_model.explainedVariance.toArray().sum()))

user_feature_pca = pca_model.transform(scaled_user_feature)
print(user_feature_pca)

item_feature_pca = pca_model.transform(scaled_item_feature)
print(item_feature_pca)

23/02/27 05:58:52 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


[Stage 118:>                                                        (0 + 1) / 1]

23/02/27 05:59:16 WARN DAGScheduler: Broadcasting large task binary with size 1090.9 KiB


                                                                                

23/02/27 05:59:58 WARN DAGScheduler: Broadcasting large task binary with size 1091.5 KiB




23/02/27 06:00:25 WARN DAGScheduler: Broadcasting large task binary with size 1241.9 KiB


[Stage 188:>                                                        (0 + 1) / 1]

23/02/27 06:00:30 WARN DAGScheduler: Broadcasting large task binary with size 1790.3 KiB


[Stage 213:>                                                        (0 + 1) / 1]

23/02/27 06:00:33 WARN DAGScheduler: Broadcasting large task binary with size 1904.1 KiB


[Stage 269:>                                                        (0 + 1) / 1]

23/02/27 06:00:50 WARN DAGScheduler: Broadcasting large task binary with size 1160.5 KiB




23/02/27 06:01:21 WARN DAGScheduler: Broadcasting large task binary with size 1196.2 KiB


[Stage 345:>                                                        (0 + 1) / 1]

23/02/27 06:01:37 WARN DAGScheduler: Broadcasting large task binary with size 1087.8 KiB


                                                                                

23/02/27 06:01:58 WARN DAGScheduler: Broadcasting large task binary with size 1088.5 KiB




23/02/27 06:02:16 WARN DAGScheduler: Broadcasting large task binary with size 1238.8 KiB


[Stage 415:>                                                        (0 + 1) / 1]

23/02/27 06:02:19 WARN DAGScheduler: Broadcasting large task binary with size 1667.8 KiB


                                                                                

23/02/27 06:02:20 WARN DAGScheduler: Broadcasting large task binary with size 1667.8 KiB


                                                                                

23/02/27 06:02:21 WARN DAGScheduler: Broadcasting large task binary with size 1669.8 KiB


                                                                                

23/02/27 06:02:24 WARN DAGScheduler: Broadcasting large task binary with size 1668.2 KiB


                                                                                

23/02/27 06:02:25 WARN DAGScheduler: Broadcasting large task binary with size 1668.9 KiB


                                                                                

23/02/27 06:02:27 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
23/02/27 06:02:27 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
PCAModel: uid=PCA_dbfd5acf29c6, k=100
DataFrame[customer_id: string, scaled_features: vector, pca: vector]
DataFrame[article_id: string, scaled_features: vector, pca: vector]


In [None]:
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.sql.functions import col, udf
import pyspark.sql.functions as F

# Split the submission customers into those with purchase history (present in `user_feature_pca`)
# and cold-start customers (no match in the left join -> null flag -> filled with False).
flagged = (
    rcmnds
    .join(user_feature_pca.withColumn('flag', F.lit(True)), 'customer_id', 'left')
    .fillna(False, subset=['flag'])
)
print(flagged)

cold_start = flagged.where(~F.col('flag')).drop('flag')
print(cold_start)

with_history = flagged.where(F.col('flag')).drop('flag')
print(with_history)

rows = with_history.collect()
if rows:
    print(rows[0])

# The LSH model does not depend on the customer being queried, so fit it once (same seed and
# parameters as before) instead of once per customer.
brp = BucketedRandomProjectionLSH(inputCol="pca", outputCol="hashes", seed=12345, bucketLength=1.0)
lsh_model = brp.fit(user_feature_pca)

# Hash the item catalogue once and cache it. approxNearestNeighbors reuses an existing `hashes`
# column, so the pivot/join/PCA lineage is no longer re-executed for every query.
hashed_items = lsh_model.transform(item_feature_pca.select('article_id', 'pca')).cache()


def get_rcmnds(customer, k=12, model=None, candidates=None):
    """Return the `k` items whose PCA vectors are approximately closest to `customer.pca`.

    Args:
        customer: a Row with a `pca` vector (e.g. a row of `with_history`).
        k: number of neighbours requested. With Spark's default single-probe search, fewer than
            `k` rows may come back when the customer's hash bucket holds too few items.
        model: fitted LSH model; defaults to the shared `lsh_model`.
        candidates: DataFrame to search, with `article_id` and `pca` (optionally precomputed
            `hashes`); defaults to the cached `hashed_items`.

    Returns:
        list of Rows with a single `article_id` field, nearest first.
    """
    model = lsh_model if model is None else model
    candidates = hashed_items if candidates is None else candidates
    return model.approxNearestNeighbors(candidates, customer.pca, k).select('article_id').collect()


# NOTE: `items` is rebound here to a list of space-separated recommendation strings (one per
# customer), shadowing the Spark `items` DataFrame loaded earlier; the next cell reads this list.
customers = []
items = []
for row in rows:
    neighbours = get_rcmnds(row)
    customers.append(row.customer_id)
    items.append(' '.join(n.article_id for n in neighbours))

# from pyspark.sql.functions import udf
# from pyspark.sql.types import StringType

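# # Define a user-defined function that gets the recommendations for each customer (NOTE: this cannot work as written — Spark models and DataFrames are only usable on the driver, not inside a UDF)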
# @udf(returnType=StringType())
# def get_rcmnds_udf(customer):
#     brp = BucketedRandomProjectionLSH(inputCol="pca", outputCol="hashes", seed=12345, bucketLength=1.0)
#     model = brp.fit(user_feature_pca)
#     temp = model.approxNearestNeighbors(item_feature_pca, customer.pca, k).select('article_id').collect()
#     return ' '.join([i[0] for i in temp])

# # Split the input data into small partitions to process them in parallel
# num_partitions = 4
# partitioned_data = rows.repartition(num_partitions)

# # Use the user-defined function and caching to store the computed results
# partitioned_data = partitioned_data.withColumn('items', get_rcmnds_udf(partitioned_data[0])).cache()

# # Collect the computed results and join them together
# customers = [row[0] for row in partitioned_data.collect()]
# items = [' '.join(row[1].split()) for row in partitioned_data.collect()]


DataFrame[customer_id: string, scaled_features: vector, pca: vector, flag: boolean]
DataFrame[customer_id: string, scaled_features: vector, pca: vector]
DataFrame[customer_id: string, scaled_features: vector, pca: vector]


[Stage 704:>                                                        (0 + 1) / 1]

23/02/27 06:07:54 WARN DAGScheduler: Broadcasting large task binary with size 1090.9 KiB


                                                                                

23/02/27 06:08:21 WARN DAGScheduler: Broadcasting large task binary with size 1091.5 KiB




23/02/27 06:08:42 WARN DAGScheduler: Broadcasting large task binary with size 1241.9 KiB


[Stage 774:>                                                        (0 + 1) / 1]

23/02/27 06:08:46 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB


                                                                                

Row(customer_id='000058a12d5b43e67d225668fa1f8d618c13dc232df0cad8ffe7ad4a1091e318', scaled_features=SparseVector(467, {16: 2.3241, 44: 0.9758, 50: 0.6153, 73: 2.609, 100: 0.4532, 101: 1.0403, 108: 0.6199, 119: 1.8555, 169: 3.4127, 192: 1.578, 382: 1.9109, 387: 0.8273, 436: 2.5315, 464: 2.1834}), pca=DenseVector([-1.5512, 0.7372, 1.9728, -2.3521, -2.3177, 0.2438, 1.5743, -1.258, -2.4537, -0.3919, 1.0904, -0.2851, -0.2111, 0.0758, 0.4033, 0.6975, 0.0137, 0.1488, 0.0697, -0.3862, -0.081, -0.149, -0.1779, -0.0541, -0.0647, 0.1924, 0.0005, 0.8418, 0.5, 0.0659, -0.0915, 0.1752, 0.2201, -0.2583, -0.3694, -0.4289, 0.2696, 0.2568, -0.212, -0.1437, 0.0077, 0.2443, 0.0297, 0.0317, 0.1871, -0.3317, -0.3604, -0.3497, -0.5827, 0.0597, 0.1464, 0.0459, -0.4952, 0.1354, 0.2018, 0.1793, 0.0304, -0.2281, -0.0038, 0.4217, -0.2414, 0.0497, -0.3532, -0.6692, -0.1722, -0.0783, 0.326, -0.689, -0.1874, -0.3543, 0.1921, -0.3941, 0.1569, 0.114, -0.5421, 0.0324, -0.6049, -0.9496, 0.1455, -0.5577, 0.406, -0.0216, 

[Stage 830:>                                                        (0 + 1) / 1]

23/02/27 06:09:06 WARN DAGScheduler: Broadcasting large task binary with size 1489.0 KiB


[Stage 881:>                                                        (0 + 1) / 1]

23/02/27 06:10:03 WARN DAGScheduler: Broadcasting large task binary with size 1489.0 KiB


[Stage 902:>                                                        (0 + 2) / 2]

In [None]:
import pandas as pd
with_history_df = pd.DataFrame({'customer_id': customers, 'items': items})

# Cold-start fallback: the 12 most frequently purchased articles in the sampled transactions.
most_freq = (
    transactions.groupBy('article_id').count()
    .sort(col('count').desc())
    .limit(12)
    .collect()
)
print(most_freq)

default = [row['article_id'] for row in most_freq]
print(default)

# New name instead of rebinding the Spark `cold_start` frame, so the cell can be re-run.
cold_start_df = (
    cold_start
    .withColumn('items', lit(' '.join(default)))
    .select('customer_id', 'items')
    .toPandas()
)
print(cold_start_df.head())

# DataFrame.append was deprecated in pandas 1.4 and removed in 2.0; concat is the replacement.
all_rcmnds = pd.concat([cold_start_df, with_history_df], ignore_index=True)
print(all_rcmnds.head())

# The former RegressionEvaluator/RMSE step is removed. Spark evaluators only accept Spark
# DataFrames, and `all_rcmnds` has no `count`/`prediction` columns, so that call could not run.
# Regression RMSE is also not a meaningful score for ranked article lists; a ranking metric such
# as MAP@k against held-out purchases would be needed. TODO: add a holdout split to evaluate.
assert all_rcmnds['customer_id'].is_unique, 'each customer must appear exactly once'
# all_rcmnds.to_csv('submission.csv', index=False)