In [1]:
!pip install pyspark

Collecting py4j==0.10.9 (from pyspark)
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m198.6/198.6 kB[0m [31m6.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: py4j
  Attempting uninstall: py4j
    Found existing installation: py4j 0.10.9.7
    Uninstalling py4j-0.10.9.7:
      Successfully uninstalled py4j-0.10.9.7
Successfully installed py4j-0.10.9
[0m

In [2]:
!pip install --upgrade py4j

Collecting py4j
  Using cached py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Installing collected packages: py4j
  Attempting uninstall: py4j
    Found existing installation: py4j 0.10.9
    Uninstalling py4j-0.10.9:
      Successfully uninstalled py4j-0.10.9
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
pyspark 3.1.3 requires py4j==0.10.9, but you have py4j 0.10.9.7 which is incompatible.[0m[31m
[0mSuccessfully installed py4j-0.10.9.7
[0m

In [3]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType 
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains
from pyspark.sql import SQLContext 
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import udf,col,when
from pyspark.sql.functions import to_timestamp,date_format
import numpy as np
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import *

sc = SparkSession.builder.appName("Recommendations").config("spark.sql.files.maxPartitionBytes", 5000000).getOrCreate()
spark = SparkSession(sc)



In [4]:
transaction = spark.read.option("header",True) \
              .csv("gs://hnmt/transactions.csv")
transaction.printSchema()

                                                                                

root
 |-- t_dat: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- article_id: string (nullable = true)
 |-- price: string (nullable = true)
 |-- sales_channel_id: string (nullable = true)



In [5]:
from pyspark.sql.functions import min, max
from pyspark.sql.functions import unix_timestamp, lit
min_date, max_date = transaction.select(min("t_dat"), max("t_dat")).first()
min_date, max_date

                                                                                

('2018-09-20', '2020-09-22')

In this transaction dataset we have 31,788,324 rows and 5 columns.Let's capture first what are the most recently bought articles.For recommendation I am selecting only date 2020-09-22 which is the last transaction date.</h1>

In [6]:

hm =  transaction.withColumn('t_dat', transaction['t_dat'].cast('string'))
hm = hm.withColumn('date', from_unixtime(unix_timestamp('t_dat', 'yyyy-MM-dd')))
hm = hm.withColumn('year', year(col('date')))
hm = hm.withColumn('month', month(col('date')))
hm = hm.withColumn('day', date_format(col('date'), "d"))

hm = hm[hm['year'] == 2020]
hm = hm[hm['month'] == 9]
hm = hm[hm['day'] == 22]
transaction.unpersist()

# Prepare the dataset
hm = hm.groupby('customer_id', 'article_id').count()
hm.show(5)



+--------------------+----------+-----+
|         customer_id|article_id|count|
+--------------------+----------+-----+
|82be633d4ca5ea541...| 927172004|    1|
|3ec69c320b6aced81...| 631536021|    1|
|a2be13b3998897084...| 828991003|    1|
|1b2867a6205a7a528...| 894788003|    1|
|d2f89ce024ac8d4fb...| 849738002|    1|
+--------------------+----------+-----+
only showing top 5 rows



                                                                                

In [7]:
print((hm.count(), len(hm.columns)))



(1592, 3)


                                                                                

In [8]:
# Count the total number of article in the dataset
numerator = hm.select("count").count()

# Count the number of distinct customerid and distinct articleid
num_users = hm.select("customer_id").distinct().count()
num_articles = hm.select("article_id").distinct().count()

# Set the denominator equal to the number of customer multiplied by the number of articles
denominator = num_users * num_articles

# Divide the numerator by the denominator
sparsity = (1.0 - (numerator *1.0)/denominator)*100
print("Sparsity: ", "%.2f" % sparsity + "%.")



Sparsity:  99.91%.


                                                                                

In [9]:
userId_count = hm.groupBy("customer_id").count().orderBy('count', ascending=False)
userId_count.show()



+--------------------+-----+
|         customer_id|count|
+--------------------+-----+
|af189ae7ead49625d...|    5|
|a88ade5b98f2bbd4d...|    3|
|17f39ee02b4dd5207...|    3|
|a3e49519308109be0...|    3|
|b3d8252651bfc39d7...|    3|
|02557f2324a3d792b...|    3|
|3bc6e152ae9934244...|    3|
|82f6ae6750bf405f3...|    3|
|54e8ebd39543b5a4d...|    3|
|cdced65c2f229cc01...|    3|
|c987d6ac4fef3ea1e...|    3|
|25f205769ce0472f2...|    3|
|dc1b173e541f8d3c1...|    3|
|ba06d797232ec40e0...|    3|
|1a42bca32c5816966...|    3|
|77f0df38f68e62211...|    3|
|92713e3aa7d55dd3d...|    3|
|a865b2486c390f9fe...|    3|
|64e77fb36ec90ef3b...|    3|
|7b3787571cdfa173e...|    3|
+--------------------+-----+
only showing top 20 rows



                                                                                

In [10]:
articleId_count = hm.groupBy("article_id").count().orderBy('count', ascending=False)
articleId_count.show()



+----------+-----+
|article_id|count|
+----------+-----+
| 791587001|    6|
| 787946002|    6|
| 573085028|    6|
| 886566001|    6|
| 929275001|    6|
| 866731001|    6|
| 915529005|    5|
| 898692006|    5|
| 915529003|    5|
| 714790020|    5|
| 930380001|    5|
| 924243002|    5|
| 923340001|    4|
| 905957007|    4|
| 896169002|    4|
| 714790028|    4|
| 863583001|    4|
| 788575002|    4|
| 903420001|    4|
| 909014001|    4|
+----------+-----+
only showing top 20 rows



                                                                                

In [11]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [12]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
indexer = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in list(set(hm.columns)-set(['count'])) ]
pipeline = Pipeline(stages=indexer)
transformed = pipeline.fit(hm).transform(hm)
transformed.show()



+--------------------+----------+-----+----------------+-----------------+
|         customer_id|article_id|count|article_id_index|customer_id_index|
+--------------------+----------+-----+----------------+-----------------+
|82be633d4ca5ea541...| 927172004|    1|          1180.0|            810.0|
|3ec69c320b6aced81...| 631536021|    1|           327.0|            490.0|
|a2be13b3998897084...| 828991003|    1|           136.0|            973.0|
|1b2867a6205a7a528...| 894788003|    1|           951.0|            292.0|
|d2f89ce024ac8d4fb...| 849738002|    1|           707.0|            130.0|
|6350568da716f096e...| 448509014|    1|            81.0|             69.0|
|91fdfb07f31100dd6...| 842360001|    1|           692.0|            896.0|
|0003e867a930d0d68...| 827487003|    1|           642.0|            151.0|
|2f03d37985cfef3cf...| 908584001|    1|           203.0|            393.0|
|b49647f84a99ced53...| 905660002|    1|          1027.0|           1044.0|
|5252a97cea20c3f4d...| 79

                                                                                

In [13]:
(training,test)=transformed.randomSplit([0.8, 0.2])

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


#create ALS model
als=ALS(userCol="customer_id_index",itemCol="article_id_index",ratingCol="count",coldStartStrategy="drop",nonnegative=True)

#tune model using ParamGridBuilder
param_grid = ParamGridBuilder()\
            .addGrid(als.rank, [15,20,25])\
            .addGrid(als.maxIter,[5,10,15])\
            .addGrid(als.regParam,[0.09,0.14,0.19])\
            .build()
#define evaluator as RMSE
evaluator = RegressionEvaluator(metricName = "rmse",labelCol = 'count', predictionCol = 'prediction')

#Build cross validation using CrossValidator
cv = CrossValidator(estimator=als,estimatorParamMaps=param_grid, evaluator=evaluator,numFolds=3)


#Fit ALS model to training data
model = cv.fit(training)

                                                                                

In [None]:
als=ALS(maxIter=5,regParam=0.09,rank=25,userCol="customer_id_index",itemCol="article_id_index",ratingCol="count",coldStartStrategy="drop",nonnegative=True)


In [None]:
#Extract best model from the tuning exercise using ParamGridBuilder
best_model = model.bestModel

#Generate predictions and evaluate using RMSE
predictions = best_model.transform(test)
rmse = evaluator.evaluate(predictions)

                                                                                

In [None]:
#print evaluation metrics and model parameters
print("RMSE =" + str(rmse))
print("**Best Model**")
print("Rank : {}".format(best_model.rank))
print("MaxIter: {}".format(best_model._java_obj.parent().getMaxIter()))
print("RegParam: {}".format(best_model._java_obj.parent().getRegParam()))

RMSE =0.6909940669462347
**Best Model**
Rank : 25
MaxIter: 5
RegParam: 0.09


In [None]:
pip install --upgrade pyspark

Collecting pyspark
  Using cached pyspark-3.4.0-py2.py3-none-any.whl
Installing collected packages: pyspark
  Attempting uninstall: pyspark
    Found existing installation: pyspark 3.1.3
    Can't uninstall 'pyspark'. No files were found to uninstall.
Successfully installed pyspark-3.4.0
[0mNote: you may need to restart the kernel to use updated packages.


In [None]:
pip install --upgrade py4j

[0mNote: you may need to restart the kernel to use updated packages.


In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("my-app") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()


In [None]:
user_recs = best_model.recommendForAllItems(10)
user_recs.show(10)



+----------------+--------------------+
|article_id_index|     recommendations|
+----------------+--------------------+
|               1|[{545, 1.8978157}...|
|              12|[{545, 1.3193089}...|
|              22|[{1417, 0.9236878...|
|              26|[{8, 0.9273679}, ...|
|              27|[{75, 0.9286343},...|
|              28|[{68, 0.92859066}...|
|              31|[{36, 0.92532855}...|
|              34|[{1288, 0.9250935...|
|              44|[{79, 0.9222984},...|
|              47|[{545, 0.9665778}...|
+----------------+--------------------+
only showing top 10 rows



                                                                                

In [None]:
df_recom = best_model.recommendForAllUsers(10)
df_recom.show(10)



+-----------------+--------------------+
|customer_id_index|     recommendations|
+-----------------+--------------------+
|                1|[{1135, 0.9607761...|
|               12|[{377, 0.92105657...|
|               22|[{1100, 0.947925}...|
|               26|[{825, 0.9317713}...|
|               27|[{1192, 0.97896},...|
|               28|[{241, 0.92162794...|
|               31|[{1135, 1.9058945...|
|               34|[{659, 0.94389564...|
|               44|[{435, 0.9251446}...|
|               47|[{558, 1.1301042}...|
+-----------------+--------------------+
only showing top 10 rows



                                                                                

In [None]:
df_recom = df_recom.select("customer_id_index","recommendations.article_id_index")
df_recom.show(10)
df_recom = df_recom.toPandas()

                                                                                

+-----------------+--------------------+
|customer_id_index|    article_id_index|
+-----------------+--------------------+
|                1|[1135, 1192, 659,...|
|               12|[644, 865, 377, 8...|
|               22|[1100, 252, 82, 5...|
|               26|[825, 52, 987, 11...|
|               27|[1192, 825, 1135,...|
|               28|[241, 1135, 1192,...|
|               31|[1135, 11, 825, 6...|
|               34|[659, 215, 300, 1...|
|               44|[435, 699, 89, 36...|
|               47|[558, 1219, 1100,...|
+-----------------+--------------------+
only showing top 10 rows



                                                                                

In [None]:
df_recom.sort_values('customer_id_index')

Unnamed: 0,customer_id_index,article_id_index
861,0,"[129, 1132, 751, 638, 1135, 222, 1, 188, 313, ..."
0,1,"[1135, 1192, 659, 259, 267, 825, 558, 170, 19,..."
862,2,"[3, 715, 27, 6, 994, 393, 35, 214, 479, 337]"
288,3,"[659, 1135, 701, 633, 1101, 124, 561, 392, 0, ..."
577,4,"[441, 957, 558, 368, 104, 237, 938, 737, 599, ..."
...,...,...
860,1414,"[1192, 1209, 825, 1, 1033, 217, 50, 12, 1110, ..."
285,1415,"[659, 1074, 1135, 671, 1100, 145, 183, 546, 22..."
1139,1416,"[1192, 586, 659, 1135, 825, 758, 1143, 532, 38..."
286,1417,"[22, 1192, 1135, 659, 1210, 993, 164, 825, 110..."
