In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType 
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains
from pyspark.sql import SQLContext 
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import udf,col,when
from pyspark.sql.functions import to_timestamp,date_format
import numpy as np
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import *

sc = SparkSession.builder.appName("Recommendations").config("spark.sql.files.maxPartitionBytes", 5000000).getOrCreate()
spark = SparkSession(sc)

In [3]:
transaction = spark.read.option("header",True) \
              .csv("transactions_train.csv")
transaction.printSchema()

root
 |-- t_dat: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- article_id: string (nullable = true)
 |-- price: string (nullable = true)
 |-- sales_channel_id: string (nullable = true)



In [4]:
from pyspark.sql.functions import min, max
from pyspark.sql.functions import unix_timestamp, lit
min_date, max_date = transaction.select(min("t_dat"), max("t_dat")).first()
min_date, max_date

('2018-09-20', '2020-09-22')

In [5]:
hm =  transaction.withColumn('t_dat', transaction['t_dat'].cast('string'))
hm = hm.withColumn('date', from_unixtime(unix_timestamp('t_dat', 'yyyy-MM-dd')))
hm = hm.withColumn('year', year(col('date')))
hm = hm.withColumn('month', month(col('date')))
hm = hm.withColumn('day', date_format(col('date'), "d"))

hm = hm[hm['year'] == 2020]
hm = hm[hm['month'] == 9]
hm = hm[hm['day'] == 22]
transaction.unpersist()

# Prepare the dataset
hm = hm.groupby('customer_id', 'article_id').count()
hm.show()

+--------------------+----------+-----+
|         customer_id|article_id|count|
+--------------------+----------+-----+
|00f7bc5c0df4c615b...|0780418013|    1|
|02094817e46f3b692...|0791587001|    1|
|0333e5dda0257e9f4...|0839332002|    2|
|07c7a1172caf8fb97...|0573085043|    1|
|081373184e601470c...|0714790020|    1|
|09bec2a61046ccbea...|0860336002|    1|
|0be4f1ecce204ee32...|0573085028|    1|
|0c4b30343292b5101...|0918522001|    1|
|0e10e02358875468b...|0579541001|    1|
|0fc371e67e61a31d7...|0907170001|    1|
|10817b19177f6a53e...|0718278019|    1|
|10ac90988da6052dd...|0934212003|    1|
|14a298482fa7f9d52...|0894353002|    1|
|14f4b1b17991c32d2...|0803685001|    1|
|1601fa3c3f39aa623...|0730683001|    1|
|164e1a251f0e3d764...|0831267001|    1|
|165d2c0b0128d5619...|0909081004|    1|
|166546028742fe655...|0767423013|    1|
|189b7275c513a84c8...|0877711001|    1|
|1918933afff08e955...|0914672001|    1|
+--------------------+----------+-----+
only showing top 20 rows



In [5]:
print((hm.count(), len(hm.columns)))

(29486, 3)


In [6]:
# Count the total number of article count in the dataset
numerator = hm.select("count").count()

# Count the number of distinct customerid and distinct articleid
num_users = hm.select("customer_id").distinct().count()
num_articles = hm.select("article_id").distinct().count()

# Set the denominator equal to the number of customer multiplied by the number of articles
denominator = num_users * num_articles

# Divide the numerator by the denominator
sparsity = (1.0 - (numerator *1.0)/denominator)*100
print("Sparsity: ", "%.2f" % sparsity + "%.")

Sparsity:  99.96%.


In [7]:
userId_count = hm.groupBy("customer_id").count().orderBy('count', ascending=False)
userId_count.show()

+--------------------+-----+
|         customer_id|count|
+--------------------+-----+
|30b6056bacc5f5c9d...|   28|
|5e8fb4d457fdffc61...|   28|
|dc1b173e541f8d3c1...|   27|
|6335d496ef463bc40...|   25|
|1796e87fd2e88932b...|   25|
|f50287d9cf052d4b4...|   24|
|54e8ebd39543b5a4d...|   23|
|fd5ce8716faf00f6a...|   23|
|850ec77661a417d27...|   22|
|ad3663a848dccbdda...|   21|
|32f3a6a7ce63d302c...|   21|
|b606fe5786c00151a...|   21|
|298523b6637340717...|   21|
|b49647f84a99ced53...|   21|
|fc783381f1ea2174c...|   21|
|a08e284bb18add2d7...|   21|
|383e1b07e2c1fe169...|   21|
|3ca77aab50ae4532b...|   20|
|2a721767cd9864ed5...|   20|
|af5166e0f89b0d433...|   19|
+--------------------+-----+
only showing top 20 rows



In [8]:
articleId_count = hm.groupBy("article_id").count().orderBy('count', ascending=False)
articleId_count.show()

+----------+-----+
|article_id|count|
+----------+-----+
|0924243002|   91|
|0918522001|   88|
|0866731001|   78|
|0751471001|   75|
|0448509014|   73|
|0714790020|   72|
|0762846027|   68|
|0928206001|   67|
|0893432002|   66|
|0918292001|   65|
|0915529005|   64|
|0788575004|   63|
|0915529003|   63|
|0863583001|   60|
|0930380001|   59|
|0573085028|   59|
|0919273002|   58|
|0850917001|   57|
|0573085042|   56|
|0874110016|   53|
+----------+-----+
only showing top 20 rows



In [9]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [10]:
#Converting String to index
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
indexer = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in list(set(hm.columns)-set(['count'])) ]
pipeline = Pipeline(stages=indexer)
transformed = pipeline.fit(hm).transform(hm)
transformed.show()

+--------------------+----------+-----+-----------------+----------------+
|         customer_id|article_id|count|customer_id_index|article_id_index|
+--------------------+----------+-----+-----------------+----------------+
|00f7bc5c0df4c615b...|0780418013|    1|            783.0|          2237.0|
|02094817e46f3b692...|0791587001|    1|            785.0|            35.0|
|0333e5dda0257e9f4...|0839332002|    2|           4098.0|           732.0|
|07c7a1172caf8fb97...|0573085043|    1|           1702.0|            44.0|
|081373184e601470c...|0714790020|    1|           4146.0|             5.0|
|09bec2a61046ccbea...|0860336002|    1|           6792.0|          2368.0|
|0be4f1ecce204ee32...|0573085028|    1|            799.0|            14.0|
|0c4b30343292b5101...|0918522001|    1|           6825.0|             1.0|
|0e10e02358875468b...|0579541001|    1|           2689.0|            53.0|
|0fc371e67e61a31d7...|0907170001|    1|           1737.0|          1978.0|
|10817b19177f6a53e...|071

In [11]:
(training,test)=transformed.randomSplit([0.8, 0.2])

In [12]:
#Creating ALS model and fitting data
als=ALS(maxIter=5,regParam=0.09,rank=25,userCol="customer_id_index",itemCol="article_id_index",ratingCol="count",coldStartStrategy="drop",nonnegative=True)
model=als.fit(training)

In [13]:
evaluator=RegressionEvaluator(metricName="rmse",labelCol="count",predictionCol="prediction")
predictions=model.transform(test)
rmse=evaluator.evaluate(predictions)


In [14]:
print("RMSE="+str(rmse))

RMSE=0.47922604279966746


In [15]:
predictions.show()

+--------------------+----------+-----+-----------------+----------------+----------+
|         customer_id|article_id|count|customer_id_index|article_id_index|prediction|
+--------------------+----------+-----+-----------------+----------------+----------+
|1f4d4f43ace92c96f...|0902419001|    1|            148.0|           280.0| 1.1043952|
|1f4d4f43ace92c96f...|0910448001|    1|            148.0|          2605.0|0.82738125|
|1f4d4f43ace92c96f...|0922037001|    1|            148.0|          1624.0| 0.8225643|
|47b77e73dd60193bc...|0826498001|    1|           1959.0|          1182.0|0.93602675|
|6e31154ec977d9074...|0579541072|    1|           1342.0|          1365.0|0.78117466|
|6e31154ec977d9074...|0898350001|    1|           1342.0|          2537.0| 0.5135857|
|c61f3119ea64671ca...|0875217002|    1|            496.0|           874.0| 0.7637671|
|f33b01d62c213df7e...|0598755001|    1|           3997.0|           522.0| 0.7098505|
|0066eb74327937182...|0850917001|    1|            540

In [16]:
#Providing Recommendations by Article id
user_recs=model.recommendForAllItems(10).show(10)



+----------------+--------------------+
|article_id_index|     recommendations|
+----------------+--------------------+
|               1|[{4907, 4.023111}...|
|              12|[{4907, 5.1407924...|
|              13|[{4907, 4.8948035...|
|              22|[{4907, 4.967992}...|
|              26|[{4907, 5.1711617...|
|              27|[{9001, 4.4441276...|
|              28|[{4907, 4.238902}...|
|              31|[{4907, 5.0039315...|
|              34|[{9001, 5.5608296...|
|              44|[{9001, 4.0315247...|
+----------------+--------------------+
only showing top 10 rows



In [17]:
#Providing Recommendations by Customer id
item_recs=model.recommendForAllUsers(10).show(10)

+-----------------+--------------------+
|customer_id_index|     recommendations|
+-----------------+--------------------+
|                1|[{6383, 3.007121}...|
|               12|[{1891, 3.2939425...|
|               13|[{1661, 3.1329327...|
|               22|[{5040, 5.125327}...|
|               26|[{4146, 2.883401}...|
|               27|[{4146, 2.9115942...|
|               28|[{4146, 2.9942784...|
|               31|[{4146, 2.6066234...|
|               34|[{6383, 2.5930874...|
|               44|[{5040, 2.973228}...|
+-----------------+--------------------+
only showing top 10 rows



In [18]:
%%time
userRecsDf = model.recommendForAllUsers(10).cache()
userRecsDf.count()

Wall time: 3min 47s


9656

In [19]:
userRecsDf.printSchema()

root
 |-- customer_id_index: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- article_id_index: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [20]:
userRecsDf.select("customer_id_index","recommendations.article_id_index").show(10,False)

+-----------------+------------------------------------------------------------+
|customer_id_index|article_id_index                                            |
+-----------------+------------------------------------------------------------+
|1580             |[4146, 6383, 5040, 1891, 1661, 3018, 4405, 1035, 5111, 1221]|
|5300             |[4146, 1891, 1661, 3018, 5040, 6383, 4405, 3870, 5891, 1221]|
|6620             |[5040, 6383, 1891, 1661, 3018, 4146, 3870, 5111, 4405, 1035]|
|7340             |[4146, 1661, 3018, 1891, 6383, 5040, 4405, 5111, 1035, 3031]|
|7880             |[3013, 4146, 7828, 6874, 1661, 354, 7398, 2126, 2979, 1221] |
|9900             |[1891, 5040, 6383, 7093, 7092, 2151, 2917, 690, 1661, 294]  |
|471              |[5040, 6383, 1891, 1661, 3018, 4146, 3870, 2511, 4910, 3869]|
|1591             |[5040, 6383, 4146, 1661, 1891, 3018, 4405, 2511, 1035, 3870]|
|4101             |[1891, 3013, 6383, 5040, 6874, 3018, 1035, 4862, 7707, 3750]|
|1342             |[5040, 63

In [21]:
import gc #This is to free up the memory
gc.collect()

430

In [22]:
#Converting back to string form
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
import pandas as pd
recs=model.recommendForAllUsers(10).toPandas()
nrecs=recs.recommendations.apply(pd.Series) \
            .merge(recs, right_index = True, left_index = True) \
            .drop(["recommendations"], axis = 1) \
            .melt(id_vars = ['customer_id_index'], value_name = "recommendations") \
            .drop("variable", axis = 1) \
            .dropna() 
nrecs=nrecs.sort_values('customer_id_index')
nrecs=pd.concat([nrecs['recommendations'].apply(pd.Series), nrecs['customer_id_index']], axis = 1)


In [23]:
nrecs.columns = ['ArticleID_index','count','UserID_index']
md=transformed.select(transformed['article_id'],transformed['article_id_index'],transformed['customer_id'],transformed['customer_id_index'])
md=md.toPandas()


In [24]:
dict1 =dict(zip(md['article_id_index'],md['article_id']))
dict2=dict(zip(md['customer_id_index'],md['customer_id']))
nrecs['article_id']=nrecs['ArticleID_index'].map(dict1)
nrecs['customer_id']=nrecs['UserID_index'].map(dict2)

In [25]:
nrecs=nrecs.sort_values('customer_id')
nrecs.reset_index(drop=True, inplace=True)
new=nrecs[['customer_id','article_id','count']]
new['recommendations'] = list(new.article_id)
res=new[['customer_id','recommendations']]  
res_new=res['recommendations'].groupby([res.customer_id]).apply(list).reset_index()
#print(res_new)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  new['recommendations'] = list(new.article_id)


In [26]:
#Final result Recommendations by Customer id
res_new[:2000]

Unnamed: 0,customer_id,recommendations
0,0003e867a930d0d6842f923d6ba7c9b77aba33fe2a0fbf...,"[0742561003, 0857347002, 0750330003, 087163800..."
1,000525e3fe01600d717da8423643a8303390a055c578ed...,"[0757971006, 0871638002, 0316441001, 085734700..."
2,0010e8eb18f131e724d6997909af0808adbba057529edb...,"[0872901005, 0750481010, 0316441001, 057104800..."
3,001436e2c83cda28548dd668cfc7d621d70d2baf6f6cf0...,"[0316441001, 0516000087, 0297078008, 074256100..."
4,0026ebdd70715d8fa2befa14dfed317a1ffe5451aba839...,"[0872901005, 0750481010, 0857347002, 029707800..."
...,...,...
1995,3372b6226d27ab39d62a1f31e69e761b34f1684f886f93...,"[0894481001, 0571048002, 0742561003, 075797100..."
1996,3376dc15b643b7294a24fd4cc31f75fd9fb6811cb1dfb2...,"[0903735002, 0883808001, 0757971006, 087290100..."
1997,337780f0c7153a7ca8cf56acc6c86e041d6e983526830d...,"[0883808001, 0297078008, 0857347002, 087163800..."
1998,33848752d4704c79d4b9a11acf939fd8cf6b39db5ea4e3...,"[0883808001, 0571048002, 0297078008, 031644100..."
