In [None]:
!apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
!tar -xvf spark-2.4.0-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.0-bin-hadoop2.7"
import findspark
findspark.init()

[33m0% [Working][0m            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
[33m0% [Connecting to archive.ubuntu.com] [1 InRelease 14.2 kB/88.7 kB 16%] [Connec[0m                                                                               Hit:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
[33m0% [Connecting to archive.ubuntu.com] [1 InRelease 43.1 kB/88.7 kB 49%] [Connec[0m[33m0% [2 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com] [1 InRelease 5[0m[33m0% [2 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com] [Connecting to[0m                                                                               Ign:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
[33m0% [2 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.152)[0m                                                                               Get:4 http://ppa.launchpad.net/c

In [None]:
# Mount Google Drive under /content/gdrive so the project folder and data files are reachable.
# NOTE(review): force_remount presumably re-mounts even when Drive is already mounted — confirm.
from google.colab import drive
drive.mount("/content/gdrive", force_remount = True)

Mounted at /content/gdrive


In [None]:
# Work from the project folder on the mounted Drive so the relative data path used later resolves.
%cd '/content/gdrive/My Drive/LDS9_K265_DinhVietHa/LDS9_K265_DinhVietHa_Cuoi_ky/'

/content/gdrive/My Drive/LDS9_K265_DinhVietHa/LDS9_K265_DinhVietHa_Cuoi_ky


In [None]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

In [None]:
# Create (or reuse) a local Spark session that runs on all available cores.
# Memory and shuffle settings are passed as plain config entries.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("amazon_office_product")
    .config("spark.memory.fraction", 0.8)
    .config("spark.executor.memory", "10g")
    .config("spark.driver.memory", "10g")
    .config("spark.sql.shuffle.partitions", "800")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "10g")
    .getOrCreate()
)
# Bare expression so the notebook renders the session summary.
spark

In [None]:
# Load the ratings CSV. The file is read without a header row, so Spark names the columns
# _c0.._c3; inferSchema=True lets Spark derive the column types from the data.
data = spark.read.csv(".//Du lieu cung cap/ratings_Office_Products.csv", inferSchema=True, header=None)

In [None]:
# Preview the first 5 rows without truncating cell contents.
data.show(5, truncate=False)

+--------------+----------+---+----------+
|_c0           |_c1       |_c2|_c3       |
+--------------+----------+---+----------+
|A2UESEUCI73CBO|0078800242|5.0|1374192000|
|A3BBNK2R5TUYGV|0113000316|5.0|1359417600|
|A5J78T14FJ5DU |0113000316|3.0|1318723200|
|A2P462UH5L6T57|043928631X|5.0|1356912000|
|A2E0X1MWNRTQF4|0439340039|1.0|1379721600|
+--------------+----------+---+----------+
only showing top 5 rows



In [None]:
# Inspect the column types Spark inferred for the raw file.
data.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: double (nullable = true)
 |-- _c3: integer (nullable = true)



In [None]:
# Total number of rows (one row per rating) in the raw data.
data.count()

1243186

Change column name to user,item,rating,timestamp

In [None]:
# Replace Spark's auto-generated column names with meaningful ones.
column_renames = {
    '_c0': 'user',
    '_c1': 'item',
    '_c2': 'rating',
    '_c3': 'timestamp',
}
for old_name, new_name in column_renames.items():
    data = data.withColumnRenamed(old_name, new_name)

In [None]:
# Confirm the new column names on the first 5 rows.
data.show(5)

+--------------+----------+------+----------+
|          user|      item|rating| timestamp|
+--------------+----------+------+----------+
|A2UESEUCI73CBO|0078800242|   5.0|1374192000|
|A3BBNK2R5TUYGV|0113000316|   5.0|1359417600|
| A5J78T14FJ5DU|0113000316|   3.0|1318723200|
|A2P462UH5L6T57|043928631X|   5.0|1356912000|
|A2E0X1MWNRTQF4|0439340039|   1.0|1379721600|
+--------------+----------+------+----------+
only showing top 5 rows



Build a model to predict the overall rating of products that a user has not yet rated.

In [None]:
from pyspark.sql.functions import col, udf
from pyspark.sql.functions import isnan, when, count, explode

In [None]:
# Per-column NULL tally: `when` yields a value only for NULL cells and `count` counts
# those values; the one-row result is transposed so each column becomes a row.
null_count_exprs = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in data.columns
]
data.select(null_count_exprs).toPandas().T

Unnamed: 0,0
user,0
item,0
rating,0
timestamp,0


There is no null data.

In [None]:
# Keep only the three columns used for modelling; the timestamp column is left out.
data_sub = data.select("user","item","rating")

In [None]:
# Total number of ratings (rows), then the number of distinct users and distinct products.
numerator = data_sub.count()
users = data_sub.select("user").distinct().count()
products = data_sub.select("item").distinct().count()

In [None]:
# Show, in order: number of ratings, number of distinct users, number of distinct products.
display(numerator, users, products)

1243186

909314

130006

In [None]:
# Number of ratings the user x product matrix would contain if it had no empty cells
denominator = users*products
denominator

118216275884

In [None]:
# Sparsity = share of empty cells in the user x product rating matrix.
sparsity = 1 - numerator / denominator
# Pass the value to print() itself. The previous `print(...), sparsity` form built the
# tuple (None, sparsity), so the label and the value were shown separately.
print("sparsity:", sparsity)

sparsity: 


(None, 0.9999894838000039)

High sparsity is a typical property of rating (sentiment) data: there is a large variety of products, but each customer rates only a few of them.

In [None]:
# Converting String to index
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

In [None]:
# Map the string item/user IDs to numeric index columns for the recommender.
indexer = StringIndexer(inputCol='item', outputCol='item_idx')
indexer1 = StringIndexer(inputCol='user', outputCol='user_idx')

# Items first: learn the item -> index mapping and append the `item_idx` column.
indexer_model = indexer.fit(data_sub)
data_indexed = indexer_model.transform(data_sub)

# Then users, fitted on the item-indexed frame, appending `user_idx`.
# `indexer1_model` is kept so user IDs can be indexed again later.
indexer1_model = indexer1.fit(data_indexed)
data_indexed = indexer1_model.transform(data_indexed)

In [None]:
# Check that the item_idx and user_idx columns were added.
data_indexed.show(5)

+--------------+----------+------+--------+--------+
|          user|      item|rating|item_idx|user_idx|
+--------------+----------+------+--------+--------+
|A2UESEUCI73CBO|0078800242|   5.0| 89108.0|161237.0|
|A3BBNK2R5TUYGV|0113000316|   5.0| 55258.0|280397.0|
| A5J78T14FJ5DU|0113000316|   3.0| 55258.0| 28050.0|
|A2P462UH5L6T57|043928631X|   5.0| 84359.0|865217.0|
|A2E0X1MWNRTQF4|0439340039|   1.0|117083.0|482385.0|
+--------------+----------+------+--------+--------+
only showing top 5 rows



In [None]:
# Re-run the per-column NULL tally on the indexed frame to confirm that indexing
# did not introduce missing values.
indexed_null_exprs = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in data_indexed.columns
]
data_indexed.select(indexed_null_exprs).toPandas().T

Unnamed: 0,0
user,0
item,0
rating,0
item_idx,0
user_idx,0


In [None]:
# Hold out 30% of the ratings for testing (70/30 split).
# A fixed seed keeps the split, and therefore the reported RMSE, reproducible across runs.
(training, testing) = data_indexed.randomSplit([0.7, 0.3], seed=2000)

In [None]:
# create ALS model and fitting data
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [None]:
# Explicit-feedback ALS on the indexed user/item columns.
# `alpha` is intentionally not set: it only affects the implicit-feedback variant
# (implicitPrefs=True), which is not enabled here, so passing it had no effect.
als = ALS(
    userCol='user_idx',
    itemCol='item_idx',
    ratingCol='rating',
    rank=15,
    maxIter=20,
    regParam=0.5,
    nonnegative=True,
    # Drop rows whose prediction is NaN (user or item unseen during training) so the
    # evaluation metric is not NaN.
    coldStartStrategy='drop',
    seed=2000,
)

In [None]:
# Train the ALS model on the training split.
model = als.fit(training)

In [None]:
# Score the held-out test split with the fitted model; the RMSE is computed from these predictions.
predictions = model.transform(testing)

In [None]:
#predictions.select(['item_idx','user_idx',
#                   'rating','prediction']).show(5)

In [None]:
# RMSE evaluator comparing the `prediction` column against the true `rating`.
evaluator = RegressionEvaluator(
    labelCol='rating',
    predictionCol='prediction',
    metricName='rmse',
)

In [None]:
# Compute the RMSE between the true ratings and the ALS predictions, then report it.
rmse = evaluator.evaluate(predictions)
print("Root mean squared error= {}".format(rmse))

Root mean squared error= 1.713459977320878


On average, the model's predicted ratings are about 1.71 away from the actual ratings (RMSE). This is not a very good result, but it is the best I have obtained so far after many attempts to tune this model.

### Providing recommendations: for 3 users

In [None]:
import numpy as np

In [None]:
from pyspark.sql.types import StringType, IntegerType, StructType

Get the indices of the 3 users.

In [None]:
# Raw user IDs (as they appear in the `user` column) to generate recommendations for.
three_users = ['A00473363TJ8YSZ3YAGG9','A335QXPTV1RIV1','ATIMW8SYGAASW']

In [None]:
# Wrap the IDs in a single-column string DataFrame; the column is named `value` by default.
three_users_df = spark.createDataFrame(three_users, StringType())
three_users_df.show()

+--------------------+
|               value|
+--------------------+
|A00473363TJ8YSZ3Y...|
|      A335QXPTV1RIV1|
|       ATIMW8SYGAASW|
+--------------------+



In [None]:
# Rename `value` to `user`, the input column the user indexer expects.
three_users_df = three_users_df.withColumnRenamed("value","user")

In [None]:
# Reuse the fitted user indexer so these IDs map to the same user_idx values used for training.
three_id_df = indexer1_model.transform(three_users_df)

In [None]:
# Show each user ID next to its numeric index.
three_id_df.show()

+--------------------+--------+
|                user|user_idx|
+--------------------+--------+
|A00473363TJ8YSZ3Y...| 12866.0|
|      A335QXPTV1RIV1|  2079.0|
|       ATIMW8SYGAASW|  2336.0|
+--------------------+--------+



In [None]:
# Ask the model for the top 5 item recommendations for each of the three users.
user_3_recs = model.recommendForUserSubset(three_id_df,5)

In [None]:
# Display the recommendations, truncating each cell to 50 characters.
user_3_recs.show(truncate=50)

+--------+--------------------------------------------------+
|user_idx|                                   recommendations|
+--------+--------------------------------------------------+
|    2336|[[64972, 4.7276554], [99249, 4.655932], [70216,...|
|   12866|[[70386, 4.4134884], [126736, 4.3012595], [7021...|
|    2079|[[75906, 4.3477488], [70216, 4.037935], [59015,...|
+--------+--------------------------------------------------+



The recommendations for the 3 users are products with high predicted ratings (> 4).

-> Our model is still able to give good recommendations to customers.