In [2]:
from pyspark.sql.functions import rand
from pyspark.ml.feature import StringIndexer, IndexToString
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DoubleType, TimestampType

In [3]:
# Shell escape: list the contents of the current working directory.
! ls -la

total 1115132
drwxrwxr-x  6 big big      4096 May 15 21:12 .
drwxr-xr-x 33 big big      4096 May 12 14:59 ..
-rw-rw-r--  1 big big       662 May 15 17:03 derby.log
drwxrwxr-x  8 big big      4096 May 15 21:05 .git
-rw-rw-r--  1 big big         6 May  8 20:47 .gitignore
drwxrwxr-x  2 big big      4096 May 11 21:44 .ipynb_checkpoints
-rw-rw-r--  1 big big 570895494 May  8 17:13 local_part.csv
-rw-r--r--  1 big big 171002105 May 11 21:13 local_test.csv
-rw-r--r--  1 big big 399893389 May 11 21:13 local_train.csv
drwxrwxr-x  3 big big      4096 May 15 20:46 metastore_db
-rw-rw-r--  1 big big      3716 May 12 22:05 Preare_data.ipynb
-rw-rw-r--  1 big big     48031 May 15 21:12 Recommender.ipynb
drwxr-xr-x  3 big big      4096 May 15 20:08 spark-warehouse


In [4]:
# Reference: all field types available in pyspark.sql.types
#__all__ = [
#   "DataType", "NullType", "StringType", "BinaryType", "BooleanType", "DateType",
#   "TimestampType", "DecimalType", "DoubleType", "FloatType", "ByteType", "IntegerType",
#   "LongType", "ShortType", "ArrayType", "MapType", "StructField", "StructType"]

In [5]:
# Explicit schema for the event-log CSV, one add() per column in file order.
# Each call is add(column name, Spark type, nullable).
# NOTE: event_type is declared non-nullable here, yet the printSchema() output
# further down still reports it as nullable.
schema = (
    StructType()
    .add("event_time", TimestampType(), True)
    .add("event_type", StringType(), False)
    .add("product_id", IntegerType(), True)
    .add("category_id", LongType(), True)
    .add("category_code", StringType(), True)
    .add("brand", StringType(), True)
    .add("price", DoubleType(), True)
    .add("user_id", IntegerType(), True)
    .add("user_session", StringType(), True)
)

In [6]:
# Read the dataset using the explicit schema declared above.
# inferSchema is deliberately not set: it has no effect once a schema is supplied,
# so passing both only obscures which one defines the column types.
# NOTE(review): no `header` option is set, so the file is assumed to have no
# header row — confirm against how local_part.csv was produced.
df = spark.read.csv('local_part.csv', schema=schema)

In [7]:
# Print the column names, types and nullability of the loaded DataFrame.
df.printSchema()

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)



In [8]:
# Preview the first rows of the raw event data.
df.show()

+-------------------+----------+----------+-------------------+--------------------+---------+------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code|    brand| price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+---------+------+---------+--------------------+
|2019-10-01 01:00:01|      view|   1307067|2053013558920217191|  computers.notebook|   lenovo|251.74|550050854|7c90fc70-0e80-459...|
|2019-10-01 01:00:20|      view|   1003306|2053013555631882655|electronics.smart...|    apple|588.77|555446831|6ec635da-ea15-4a5...|
|2019-10-01 01:00:25|      view|  27500014|2053013554692358509|                null|  redmond| 37.98|555217733|74d40a28-41f9-432...|
|2019-10-01 01:00:31|      view|  28718079|2053013565362668491|  apparel.shoes.keds|  respect| 66.67|545323115|75fb5d0c-e907-429...|
|2019-10-01 01:00:31|      view|   3900746|2053013552326770905|applia

In [9]:
# Ten most active users, ranked by their number of events
(
    df.groupBy('user_id')
    .count()
    .orderBy(col('count').desc())
    .show(10, truncate=False)
)

+---------+-----+
|user_id  |count|
+---------+-----+
|512475445|806  |
|512365995|421  |
|526731152|306  |
|513021392|293  |
|512505687|283  |
|546270188|258  |
|546159478|257  |
|516308435|253  |
|514649263|240  |
|551211823|238  |
+---------+-----+
only showing top 10 rows



In [10]:
# Break down the events of the most active user (512475445) by event type
(
    df.where(col('user_id') == 512475445)
    .groupBy('event_type')
    .count()
    .orderBy(col('count').desc())
    .show(10, truncate=False)
)

+----------+-----+
|event_type|count|
+----------+-----+
|view      |806  |
+----------+-----+



In [11]:
# Ten brands with the most events (rows without a brand are counted under null)
(
    df.groupBy('brand')
    .count()
    .orderBy(col('count').desc())
    .show(10, truncate=False)
)

+-------+------+
|brand  |count |
+-------+------+
|null   |610994|
|samsung|527583|
|apple  |411810|
|xiaomi |308436|
|huawei |110888|
|lucente|65673 |
|lg     |56602 |
|bosch  |56118 |
|oppo   |48640 |
|sony   |45535 |
+-------+------+
only showing top 10 rows



In [12]:
# Number of events per event type, most frequent first
(
    df.groupBy('event_type')
    .count()
    .orderBy(col('count').desc())
    .show(10, truncate=False)
)

+----------+-------+
|event_type|count  |
+----------+-------+
|view      |4078750|
|cart      |92702  |
|purchase  |74196  |
+----------+-------+



In [13]:
# Convert event_type to a binary rating: purchase = 1, view and cart = 0.
# In other words, a user "rates" an item only by buying it.
dictionary = {"purchase": "1", "view": "0", "cart": "0"}

# `subset` has to be passed by keyword: the second positional argument of
# replace() is `value`, which is ignored when `to_replace` is a dict. Without an
# explicit subset the mapping would be applied to every string column.
df2 = df.na.replace(dictionary, subset=["event_type"])

# Cast the replaced labels to integers and keep only the columns the model needs.
df_data = df2.select(df2["event_type"].cast(IntegerType()), df2["user_id"], df2["product_id"])
df_data.show()



+----------+---------+----------+
|event_type|  user_id|product_id|
+----------+---------+----------+
|         0|550050854|   1307067|
|         0|555446831|   1003306|
|         0|555217733|  27500014|
|         0|545323115|  28718079|
|         0|555444559|   3900746|
|         0|515454339|  12712064|
|         0|551377651|   1003141|
|         0|519885473|   4100126|
|         0|555447577|  28717211|
|         0|512558158|   1004659|
|         0|525856698|  26500144|
|         0|544648245|   4300070|
|         0|513457407|   1004792|
|         0|550050854|   1306631|
|         0|514336739|   1004321|
|         0|537918940|   1004545|
|         0|555447570|  28715758|
|         0|519885473|   4100274|
|         0|516896785|   1004792|
|         0|555447748|  26201000|
+----------+---------+----------+
only showing top 20 rows



In [14]:
# Sanity check: only the ratings 0 and 1 should remain after the conversion
(
    df_data.groupBy('event_type')
    .count()
    .orderBy(col('count').desc())
    .show(10, truncate=False)
)

+----------+-------+
|event_type|count  |
+----------+-------+
|0         |4171452|
|1         |74196  |
+----------+-------+



In [52]:
# Number of unique users.
# Selecting the distinct user_id values directly avoids computing a per-user
# count that is never used; the resulting number is the same.
df_data.select('user_id').distinct().count()


1390837

In [15]:
# Split the interactions 80/20 into train and test sets; the seed is fixed at 10
train, test = df_data.randomSplit([0.8, 0.2], seed=10)

In [16]:
# Row counts of the train and test splits, one per line
print(train.count(), test.count(), sep='\n')

3395720
849928


In [17]:
# Configure the ALS recommender in implicit-feedback mode
recomend = ALS(
    # column mapping: user, item, and the 0/1 purchase flag used as the rating
    userCol='user_id',
    itemCol='product_id',
    ratingCol='event_type',
    # factorisation hyper-parameters
    rank=8,
    maxIter=3,
    regParam=0.5,
    # treat the ratings as implicit preferences, with confidence parameter alpha
    implicitPrefs=True,
    alpha=0.9,
    # constrain the factors to be non-negative
    nonnegative=True,
    # drop rows that would get a NaN prediction (user/item unseen in training)
    coldStartStrategy="drop",
)

In [18]:
# Train the model: fit the configured ALS estimator on the training split.
recomend_model=recomend.fit(train)

In [19]:
# Make predictions on the test split; transform() adds a `prediction` column.
predicted_ratings=recomend_model.transform(test)

In [20]:
# predicted_ratings.printSchema()

In [21]:
# Show a random sample of 100 predictions for rows that are actual purchases.
# rand() is unseeded, so a different sample is shown on every run.
(
    predicted_ratings
    .where(col('event_type') == 1)
    .orderBy(rand())
    .show(100)
)

+----------+---------+----------+------------+
|event_type|  user_id|product_id|  prediction|
+----------+---------+----------+------------+
|         1|512558706|   1005159|         0.0|
|         1|515242739|  18600019|         0.0|
|         1|519267944|   1005140|3.2751838E-4|
|         1|559524022|   1002524|         0.0|
|         1|556741155|   4804055| 0.001007902|
|         1|560775426|   1005238|         0.0|
|         1|562809255|   1004756|         0.0|
|         1|525020216|   1005100|         0.0|
|         1|557033537|  24600473|         0.0|
|         1|512597857|  13201074|2.1164446E-6|
|         1|551028149|   2702628|         0.0|
|         1|513894001|  12703493|         0.0|
|         1|512968819|   3600231|         0.0|
|         1|532099282|  11200192|         0.0|
|         1|514365999|   1306359|         0.0|
|         1|538708068|  26200589|         0.0|
|         1|517242367|   4804056|         0.0|
|         1|514202136|  28300545|         0.0|
|         1|5

In [22]:
# Root Mean Square Error evaluator: compares the `prediction` column
# against the 0/1 `event_type` label
evaluator = RegressionEvaluator(
    labelCol='event_type',
    predictionCol='prediction',
    metricName='rmse',
)

In [23]:
# RMSE restricted to the test rows that are purchases (event_type == 1)
# NOTE(review): a value noted here earlier ("implicit 0.13331617684682437") does
# not match this filtered metric; it looks like an overall RMSE from a previous
# run — confirm or remove.
rmse = evaluator.evaluate(predicted_ratings.where(col('event_type') == 1))
print(rmse)

0.9996355583647871


In [24]:
# RMSE restricted to the test rows that are not purchases (event_type == 0);
# this overwrites the value stored in `rmse` by the previous cell
rmse = evaluator.evaluate(predicted_ratings.where(col('event_type') == 0))

In [25]:
# Print the RMSE computed in the previous cell.
print(rmse)

0.0005144584109137656


In [26]:
# RMSE over all predicted test rows (no filter on event_type).
evaluator.evaluate(predicted_ratings)

0.13332779932204264

In [30]:
# Save the fitted ALS model for later use.
# overwrite() replaces anything already stored at the target path.

modelpath = "ALSmodel"
recomend_model.write().overwrite().save(modelpath)

In [31]:
# Shell escape: list the working directory after saving the model.
! ls -la

total 1117096
drwxrwxr-x  7 big big      4096 May 21 10:17 .
drwxr-xr-x 33 big big      4096 May 12 14:59 ..
drwxr-xr-x  5 big big      4096 May 21 10:17 ALSmodel
-rw-rw-r--  1 big big   2003444 May 21 10:05 derby.log
drwxrwxr-x  8 big big      4096 May 15 21:05 .git
-rw-rw-r--  1 big big         6 May  8 20:47 .gitignore
drwxrwxr-x  2 big big      4096 May 11 21:44 .ipynb_checkpoints
-rw-rw-r--  1 big big 570895494 May  8 17:13 local_part.csv
-rw-r--r--  1 big big 171002105 May 11 21:13 local_test.csv
-rw-r--r--  1 big big 399893389 May 11 21:13 local_train.csv
drwxrwxr-x  3 big big      4096 May 15 20:46 metastore_db
-rw-rw-r--  1 big big      3716 May 12 22:05 Preare_data.ipynb
-rw-rw-r--  1 big big     52950 May 21 10:06 Recommender.ipynb
drwxr-xr-x  3 big big      4096 May 15 20:08 spark-warehouse


In [27]:
# Take three distinct user ids from the training split.
# No ordering is applied before limit(3), so which users are picked is not guaranteed.
users = (
    train
    .select(recomend.getUserCol())
    .distinct()
    .limit(3)
)
# !!! time consuming operation !!!
# Top-2 item recommendations for each of the selected users
userSubsetRecs = recomend_model.recommendForUserSubset(users, numItems=2)

In [28]:
# !!! time consuming operation !!!
# Display the subset recommendations without truncating the cell contents.
userSubsetRecs.show(10, truncate=False)

+---------+--------------------------------+
|user_id  |recommendations                 |
+---------+--------------------------------+
|445162060|[[1002100, 0.0], [1002460, 0.0]]|
|369454898|[[1002100, 0.0], [1002460, 0.0]]|
|420412138|[[1002100, 0.0], [1002460, 0.0]]|
+---------+--------------------------------+



In [32]:
# Persist the subset recommendations as parquet, replacing any previous output
outputUserSubsetRecs = "userSubsetRecs.parquet"
userSubsetRecs.write.parquet(outputUserSubsetRecs, mode="overwrite")

In [33]:
# Shell escape: list the working directory after writing the parquet output.
! ls -la

total 1117104
drwxrwxr-x  8 big big      4096 May 21 10:26 .
drwxr-xr-x 33 big big      4096 May 12 14:59 ..
drwxr-xr-x  5 big big      4096 May 21 10:17 ALSmodel
-rw-rw-r--  1 big big   2003444 May 21 10:05 derby.log
drwxrwxr-x  8 big big      4096 May 15 21:05 .git
-rw-rw-r--  1 big big         6 May  8 20:47 .gitignore
drwxrwxr-x  2 big big      4096 May 11 21:44 .ipynb_checkpoints
-rw-rw-r--  1 big big 570895494 May  8 17:13 local_part.csv
-rw-r--r--  1 big big 171002105 May 11 21:13 local_test.csv
-rw-r--r--  1 big big 399893389 May 11 21:13 local_train.csv
drwxrwxr-x  3 big big      4096 May 15 20:46 metastore_db
-rw-rw-r--  1 big big      3716 May 12 22:05 Preare_data.ipynb
-rw-rw-r--  1 big big     54942 May 21 10:26 Recommender.ipynb
drwxr-xr-x  3 big big      4096 May 15 20:08 spark-warehouse
drwxr-xr-x  2 big big      4096 May 21 10:21 userSubsetRecs.parquet


In [None]:
# userSubsetRecs.write.mode("overwrite").saveAsTable("UserSubsetRecommendationsTable")

In [106]:
# !!! time consuming operation !!!

# The following code needs to be run on an AWS EMR cluster --------------------------------
# Top-10 item recommendations for every user known to the model.
userRecs = recomend_model.recommendForAllUsers(10)
# itemRecs = model.recommendForAllItems(10)

In [107]:
# !!! time consuming operation !!!
# Display the first 10 users' recommendations without truncating the cell contents.
userRecs.show(10, truncate=False)

+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user_id  |recommendations                                                                                                                                                           |
+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|240522111|[[12707130, 0.0], [12707140, 0.0], [12707250, 0.0], [12707290, 0.0], [12707300, 0.0], [12707310, 0.0], [12707320, 0.0], [12707340, 0.0], [12707370, 0.0], [12707380, 0.0]]|
|303418896|[[12707130, 0.0], [12707140, 0.0], [12707250, 0.0], [12707290, 0.0], [12707300, 0.0], [12707310, 0.0], [12707320, 0.0], [12707340, 0.0], [12707370, 0.0], [12707380, 0.0]]|
|357446328|[[12707130, 0.0], [12707140, 0.0], [12707250, 0.0], [12707290, 0.0], [1270

In [110]:
# !!! time consuming operation !!!
# Save the per-user recommendations as a table, overwriting any existing one.
# NOTE: the recorded output of this cell is a KeyboardInterrupt, i.e. the saved run was interrupted.
userRecs.write.mode("overwrite").saveAsTable("UserRecommendationsTable")
# After the tables have been saved on AWS, come back and make the recommendations in this notebook ------------------

KeyboardInterrupt: 

In [35]:
# Shell escape: list the working directory again.
! ls -la

total 1117104
drwxrwxr-x  8 big big      4096 May 21 10:50 .
drwxr-xr-x 33 big big      4096 May 12 14:59 ..
drwxr-xr-x  5 big big      4096 May 21 10:17 ALSmodel
-rw-rw-r--  1 big big   2003444 May 21 10:05 derby.log
drwxrwxr-x  8 big big      4096 May 15 21:05 .git
-rw-rw-r--  1 big big         6 May  8 20:47 .gitignore
drwxrwxr-x  2 big big      4096 May 11 21:44 .ipynb_checkpoints
-rw-rw-r--  1 big big 570895494 May  8 17:13 local_part.csv
-rw-r--r--  1 big big 171002105 May 11 21:13 local_test.csv
-rw-r--r--  1 big big 399893389 May 11 21:13 local_train.csv
drwxrwxr-x  3 big big      4096 May 15 20:46 metastore_db
-rw-rw-r--  1 big big      3716 May 12 22:05 Preare_data.ipynb
-rw-rw-r--  1 big big     56641 May 21 10:50 Recommender.ipynb
drwxr-xr-x  3 big big      4096 May 15 20:08 spark-warehouse
drwxr-xr-x  2 big big      4096 May 21 10:21 userSubsetRecs.parquet


In [36]:
# Reload the saved subset recommendations.
# The parquet format is named explicitly instead of relying on the session's
# default data source, so the read keeps working if that default is changed.
dfUserSubsetRec = spark.read.parquet(outputUserSubsetRecs)

In [39]:
# Preview the reloaded recommendations.
dfUserSubsetRec.show()

+---------+--------------------+
|  user_id|     recommendations|
+---------+--------------------+
|445162060|[[1002100, 0.0], ...|
|369454898|[[1002100, 0.0], ...|
|420412138|[[1002100, 0.0], ...|
+---------+--------------------+



In [42]:
# Number of distinct rows in the reloaded recommendations.
dfUserSubsetRec.distinct().count()

3

In [41]:
# Look up the stored recommendations for one user
user_id = 445162060
(
    dfUserSubsetRec
    .where(col('user_id') == user_id)
    .show(1, truncate=False)
)

+---------+--------------------------------+
|user_id  |recommendations                 |
+---------+--------------------------------+
|445162060|[[1002100, 0.0], [1002460, 0.0]]|
+---------+--------------------------------+



In [74]:
# TODO delete all below ====================================================================


# Number of unique products (items).
# The column is renamed to product_id_1 so it stays distinguishable from
# product_id when the two frames are joined further down.
unique_items = df_data.select(col('product_id').alias('product_id_1')).distinct()
print(unique_items.count())

125057


In [86]:
# User id to make recommendations for (replaces the value assigned to user_id earlier).
user_id=531210799

In [87]:
# Distinct products this user interacted with.
# NOTE: despite the name, this is not limited to purchases — df_data holds
# rows for every event type and no filter on event_type is applied here.
purchased_items = (
    df_data
    .where(col('user_id') == user_id)
    .select('product_id')
    .distinct()
)

In [88]:
# Number of distinct products the selected user interacted with.
print(purchased_items.count())

2


In [89]:
# Left-join all items against the items this user interacted with.
# Items without a match end up with a null product_id.
total_items = unique_items.join(
    purchased_items,
    on=unique_items.product_id_1 == purchased_items.product_id,
    how='left',
)
print(total_items.count())

125057


In [90]:
# Preview the joined frame; a null product_id marks an item with no match for this user.
total_items.show(10,False)

+------------+----------+
|product_id_1|product_id|
+------------+----------+
|1003938     |null      |
|1004666     |null      |
|1004739     |null      |
|1005158     |null      |
|1201512     |null      |
|1305803     |null      |
|1306176     |null      |
|1307005     |null      |
|1307184     |null      |
|1307463     |null      |
+------------+----------+
only showing top 10 rows



In [91]:
# Keep only the items that found no match on the user's side of the join,
# i.e. items the user has not interacted with
remaining_items = (
    total_items
    .filter(col("product_id").isNull())
    .select(unique_items.product_id_1)
    .distinct()
)
print(remaining_items.count())

125055


In [92]:
# Build the (product_id, user_id) candidate pairs for the model:
# rename product_id_1 back to product_id and attach the constant user id.
# NOTE: this cell reassigns remaining_items from itself, so it cannot be re-run
# without re-running the cell that creates remaining_items first.
remaining_items = remaining_items.select(
    col("product_id_1").alias("product_id"),
    lit(int(user_id)).alias("user_id"),
)

In [93]:
# Preview the candidate (product_id, user_id) pairs.
remaining_items.show(10,False)

+----------+---------+
|product_id|user_id  |
+----------+---------+
|1003938   |531210799|
|1004666   |531210799|
|1004739   |531210799|
|1005158   |531210799|
|1201512   |531210799|
|1305803   |531210799|
|1306176   |531210799|
|1307005   |531210799|
|1307184   |531210799|
|1307463   |531210799|
+----------+---------+
only showing top 10 rows



In [94]:
# Get the predictions: score every candidate pair and collect all resulting rows to the driver.
# NOTE(review): `predictions` is not used by any cell visible here, and the same
# transform is computed again in the next cell — consider dropping this collect().
predictions = recomend_model.transform(remaining_items).collect()

In [95]:
# Rank the candidate items for this user by predicted score, highest first
recommendations = (
    recomend_model
    .transform(remaining_items)
    .orderBy(col('prediction').desc())
)

In [96]:
# Show the 100 highest-scoring items for the selected user.
recommendations.show(100)

+----------+---------+------------+
|product_id|  user_id|  prediction|
+----------+---------+------------+
|   1004767|531210799|0.0066990647|
|   1004833|531210799|0.0036313545|
|   1005100|531210799| 0.002678161|
|   1004870|531210799|0.0024105322|
|   1004659|531210799| 0.002405511|
|   1004249|531210799| 0.002393179|
|   1002544|531210799|0.0023075289|
|   1004258|531210799|0.0021013005|
|   1002524|531210799|0.0020997336|
|   1004857|531210799| 0.002052007|
|   1004739|531210799|0.0020427264|
|   1004873|531210799|0.0019401289|
|   1005132|531210799|0.0018956043|
|   1005115|531210799|0.0018012377|
|   4804055|531210799|0.0017952044|
|   8700219|531210799|0.0017400818|
|   4804056|531210799|0.0017235356|
|  26300626|531210799|0.0016977447|
|   1005098|531210799|0.0016694906|
|   1004250|531210799|0.0016502171|
|   1004858|531210799|0.0016070176|
|   1005160|531210799|0.0016009217|
|   3901175|531210799|0.0015766728|
|   1004838|531210799|0.0015733948|
|   4803791|531210799|0.0015