# Prepare environment

In [1]:
!apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"
import findspark
findspark.init()

Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [696 B]
Hit:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release.gpg [836 B]
Get:8 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:10 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:11 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Packages [829 kB]
Get:12 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:13 http://ppa.lau

In [2]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [3]:
%cd '/content/gdrive/MyDrive/LDS02_k271_NguyenMinhQuan/Problem_3_RecomendationSystem_TikiOnlineShopping'

/content/gdrive/MyDrive/LDS02_k271_NguyenMinhQuan/Problem_3_RecomendationSystem_TikiOnlineShopping


In [4]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import datetime
import statistics as stats
import math

# Import PySpark related modules
import pyspark
from pyspark.rdd import RDD
from pyspark.sql import Row
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import functions
from pyspark.sql.functions import lit, desc, col, size, array_contains,\
isnan, udf, hour, array_min, array_max, countDistinct, regexp_extract, count, when
from pyspark.sql.functions import avg, struct, sum, explode
from pyspark.sql.types import *

from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import QuantileDiscretizer
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.feature import StandardScaler
from pyspark.ml.stat import ChiSquareTest

#Pysark ml
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier
from pyspark.ml import Pipeline

from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator


#Support function
def show_head(df):
  display(df.limit(5).toPandas())

def check_null(df):
    df.select([count(when(isnan(c) | df[c].isNull(), c)).alias(c) for c in df.columns]).show()


In [5]:
MAX_MEMORY = '12G'
# Initialize a spark session.
conf = pyspark.SparkConf().setMaster("local[*]") \
        .set('spark.executor.heartbeatInterval', 10000) \
        .set('spark.network.timeout', 10005) \
        .set("spark.core.connection.ack.wait.timeout", "3600") \
        .set("spark.executor.memory", MAX_MEMORY) \
        .set("spark.driver.memory", MAX_MEMORY)
def init_spark(appName):
    spark = SparkSession \
        .builder \
        .appName(appName) \
        .config(conf=conf) \
        .getOrCreate()
    spark.sparkContext.setCheckpointDir('/content/gdrive/MyDrive/LDS02_k271_NguyenMinhQuan/Problem_3_RecomendationSystem_TikiOnlineShopping/spark_checkpoints')
    return spark

spark = init_spark("Tiki collaborative filtering")
spark

# Bước 1: Hiểu vấn đề, ngữ cảnh

### Ngữ cảnh
Giả sử doanh nghiệp thương mại điện tử top 2 Việt Name, tên là Tiki, chưa xây dựng hệ thống gợi ý sản phẩm tiêu dùng. Hiện Tiki muốn xây dựng hệ thống gợi ý này, để phát triển doanh nghiệp và cạnh tranh với các doanh nghiêp khác.

Và chúng ta là người được mời để phát triển hệ thống này.

### Xác định vấn đề
#### Mục tiêu:
Xây dựng hệ thống gợi ý sản phẩm tiêu dùng cho doanh nghiệp thương mại điện tử.
#### Giải pháp:
+ Content base recomendation (Gợi ý dựa trên sự tương đồng giữa các sản phẩm)
+ Collaborative filtering recomendation (Gợi ý dựa trên sự tương đồng và hành vi của các khách hàng với nhau)

Trong notebook này sẽ xây dựng hệ thống gợi ý dựa trên collaborative filtering

# Bước 2: Thu thập, hiểu dữ liệu

Dữ liệu đã được cấp sẵng với các thông tin như sau:

Dữ liệu được cung cấp sẵn gồm có các tập tin:
ReviewRaw.csv chứa thông tin sản phẩm,
review và rating cho các sản phẩm thuộc các nhóm hàng
hóa như Mobile_Tablet, TV_Audio, Laptop, Camera,
Accessory.

Gồm các cột như sau: 
+ customer_id: mã khách hàng
+ product_id: mã sản phẩm 
+ name: tên khách hàng
+ full_name: họ tên của khách hàng
+ create_time: thời điểm phản hồi
+ rating: số sao đánh giá sản phẩm (1: tệ, đến 5: tốt)
+ title: tiêu đề đánh giá
+ content: nội dung đánh giá

In [6]:
data = spark.read.csv('/content/gdrive/MyDrive/LDS02_k271_NguyenMinhQuan/Problem_3_RecomendationSystem_TikiOnlineShopping/ReviewRaw.csv',
                      header=True, inferSchema=True).cache()
data.show(5)

+-----------+----------+-----------------+----------------+-------------------+------+------------------+--------------------+
|customer_id|product_id|             name|       full_name|       created_time|rating|             title|             content|
+-----------+----------+-----------------+----------------+-------------------+------+------------------+--------------------+
|     709310|  10001012| Lân Nguyễn Hoàng|Lân Nguyễn Hoàng|               null|     3|Ko dùng đc thẻ nhớ|Lúcđầu quên thông...|
|   10701688|  10001012| Nguyễn Khánh Hòa|Nguyễn Khánh Hòa|               null|     5|   Cực kì hài lòng|Tiki giao hàng nh...|
|   11763074|  10001012|  Toàn Phạm Khánh| Toàn Phạm Khánh|2019-04-17 15:42:45|     5|   Cực kì hài lòng|chất lượng camera...|
|    9909549|  10001012|Nguyen Quang Minh|            null|               null|     5|      Rất hài lòng|Hàng được đóng gó...|
|    1827148|  10001012|      Phạm Bá Đức|     Phạm Bá Đức|               null|     5|   Cực kì hài lòng|dễ cài

# Bước 3: Tiền xử lý dữ liệu
Vì sẽ dùng thuật toán ALS để xây dựng hệ thống gợi ý sản phẩm, nên chỉ cần select các features: customer_id, product_id, rating.

Các bước tiền xử lý sẽ tiến hành như sau:  
+ Loại bỏ dữ liệu trùng lắp (một khách hàng chỉ rating duy nhất một sản phẩm)
+ Loại bỏ dữ liệu thiếu 
+ Chuyển kiểu dữ liệu cho chính xác


In [7]:
#Select useful columns for collaborative filtering  algothirm
data = data[['customer_id', 'product_id', 'rating' ]]
data.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- rating: string (nullable = true)



In [8]:

print(f'There are total {data.count()} records')

There are total 365821 records


In [9]:
#Drop duplicate data
data = data.drop_duplicates(subset=['customer_id', 'product_id'])

In [10]:

print(f'There are total {data.count()} records after drop duplicates')

There are total 361121 records after drop duplicates


In [11]:
#Detect missing values
check_null(data)

+-----------+----------+------+
|customer_id|product_id|rating|
+-----------+----------+------+
|          1|       783|  1618|
+-----------+----------+------+



In [12]:
#Drop misssing values
data = data.dropna()
check_null(data)

+-----------+----------+------+
|customer_id|product_id|rating|
+-----------+----------+------+
|          0|         0|     0|
+-----------+----------+------+



In [13]:
#Cast columns type
data = data.withColumn('rating', data['rating'].cast(FloatType()))
#Overview data
data.describe().show()

+-------+--------------------+--------------------+------------------+
|summary|         customer_id|          product_id|            rating|
+-------+--------------------+--------------------+------------------+
|  count|              359503|              359503|            359451|
|   mean|   9170476.046532072|2.4342564193709295E7| 4.473725208721078|
| stddev|   6307651.621988519|2.3756596933877297E7|1.0184288775309063|
|    min|- 4/12/2020: Tiki...| 13.000mAh thì mì...|               1.0|
|    max|        Ưu điểm: Nhỏ|             9996258|               5.0|
+-------+--------------------+--------------------+------------------+



In [14]:
#Select records where customer_id and product_id is not numberical
temp = data.filter(
    (data.customer_id.rlike('\D+')) | 
     (data.product_id.rlike('\D+'))
     )

print(f"There are total {temp.count()} invalid records")
temp.show(52)

There are total 52 invalid records
+--------------------+--------------------+------+
|         customer_id|          product_id|rating|
+--------------------+--------------------+------+
|Bàm phím theo mìn...|      không ồn nhiều|  null|
|- Đầu tiên là về ...| 13.000mAh thì mì...|  null|
|Vẫn thiết kế nhỏ ...| vừa không gây bí...|  null|
|Sau một thời gian...| mình thấy ổ cứng...|  null|
|             Tóm lại| ước gì mình đã k...|  null|
|  Về thiết kế sp đẹp| nhìn khá sang trọng|  null|
|Con này là bản A2...| đẹp hơn hộp đợt ...|  null|
|Giá quá ưu đãi ch...|      dung lượng cao|  null|
|Xét tổng quan thì...| gia công tốt bên...|  null|
| Bao da tặng kèm đẹp| bỏ ổ cứng vô nhì...|  null|
|Tai nghe này hình...|             gọn nhẹ|  null|
|   Về mặt dung lượng| 500gb không nhiề...|  null|
|Về chất lượng âm ...|                 5tr|  null|
|2. Về Anker: cục ...|                 đẹp|  null|
|           - Giá rẻ |     dung lượng lớn |  null|
|Bàn phím phổ thôn...| tuy nhien nút En...|  nu

In [15]:
data = data.filter(
    ~(data.customer_id.rlike('\D+')) &
     ~(data.product_id.rlike('\D+'))
     )
data.describe().show()

+-------+-----------------+--------------------+------------------+
|summary|      customer_id|          product_id|            rating|
+-------+-----------------+--------------------+------------------+
|  count|           359451|              359451|            359451|
|   mean|9170476.046532072|2.4342564193709295E7| 4.473725208721078|
| stddev|6307651.621988519|2.3756596933877297E7|1.0184288775309063|
|    min|               10|            10001012|               1.0|
|    max|          9999890|             9996258|               5.0|
+-------+-----------------+--------------------+------------------+



In [16]:
users_count = data.select('customer_id').distinct().count()
items_count = data.select('product_id').distinct().count()
numerator = data.count()

print(f"Sparsity: {1-(numerator/users_count/items_count):.2f}")
print(f'Number of users: {users_count}')
print(f'Number of items: {items_count}')

Sparsity: 1.00
Number of users: 251467
Number of items: 4218


Số lượng khách hàng gấp 5 lần số lượng sản phẩm, chứng tỏ là, những sản phẩm này rất đắc khách.

In [17]:
#indexer string columns
data = data.withColumn('customer_id', data['customer_id'].cast(LongType()))
data = data.withColumn('product_id', data['product_id'].cast(LongType()))
data.printSchema()

root
 |-- customer_id: long (nullable = true)
 |-- product_id: long (nullable = true)
 |-- rating: float (nullable = true)



In [18]:
user_col = 'customer_id'
item_col = 'product_id'
rating_col = 'rating'

# Bước 4: Xây dựng model

## Tunning model

In [19]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel, TrainValidationSplit

In [20]:
#Take a piece of data for tunning, because fit all dataset will break my ram
train, test = data.select([user_col, item_col, rating_col]).sample(fraction = 0.8, seed = 2021).randomSplit([0.7, 0.2], seed = 2021)

In [21]:
#Init model
als = ALS(userCol = user_col, itemCol = item_col, ratingCol = rating_col,
    nonnegative=True, coldStartStrategy='drop', implicitPrefs = False
)
#Defind params grid
paramsGrid = ParamGridBuilder()\
                        .addGrid(als.rank, [10, 50, 100]) \
                        .addGrid(als.maxIter, [10, 50]) \
                        .addGrid(als.regParam, [.01, .05, .1]) \
                        .build()


In [22]:

tvs = TrainValidationSplit(estimator=als,
                           estimatorParamMaps=paramsGrid,
                           evaluator=RegressionEvaluator(labelCol='rating'),
                           trainRatio=0.7,
                           parallelism=65)
model = tvs.fit(train)

Total estimation time was 42 mins

In [24]:
predictions = model.bestModel.transform(text)
show_head(predictions)

Unnamed: 0,customer_id,product_id,rating,prediction
0,7523016,1675793,5.0,3.968034
1,11840091,2069769,3.0,3.432528
2,7681066,2086067,5.0,3.990786
3,12577003,2086067,5.0,3.867913
4,5235,2774881,5.0,4.46608


In [49]:
#Evaluate model
evaluator = RegressionEvaluator(labelCol=rating_col)
print(f"RMSE: {evaluator.evaluate(predictions):.2f}")

RMSE: 1.18


In [31]:
#Statistic overview rating columns
text.select('rating').describe().show()

+-------+------------------+
|summary|            rating|
+-------+------------------+
|  count|             64031|
|   mean| 4.476066280395433|
| stddev|1.0139667440765376|
|    min|               1.0|
|    max|               5.0|
+-------+------------------+



Độ lệch chuẩn trong dữ liệu thực tế là 1.0, trong khi, độ sai số của model dự đoán là 1.18. Với điểm số này, model trên có thể nói là hoạt động tương đối tốt.


In [44]:
model.bestModel.rank

100

In [45]:
(model.bestModel
    ._java_obj     # Get Java object
    .parent()      # Get parent (ALS estimator)
    .getMaxIter()) # Get maxIter

50

In [47]:
(model.bestModel
    ._java_obj     # Get Java object
    .parent()      # Get parent (ALS estimator)
    .getRegParam()) # Get getRegParam

0.1

In [50]:
#Fit model to all dataset with best param
als = ALS(
    userCol = user_col,
    itemCol = item_col,
    ratingCol = rating_col,
    nonnegative=True,
    coldStartStrategy='drop',
    implicitPrefs = False,
    rank = 100, regParam = 0.1, maxIter = 50
).fit(data)

In [51]:
test_data = data.sample(False,0.0001, 2022)
test_data.count()

38

In [52]:
recommends = als.recommendForUserSubset(test_data, numItems=5)
recommends.show( truncate=False)

+-----------+---------------------------------------------------------------------------------------------------------------------+
|customer_id|recommendations                                                                                                      |
+-----------+---------------------------------------------------------------------------------------------------------------------+
|16133138   |[{74227763, 4.986838}, {76732229, 4.941821}, {19395453, 4.909876}, {48520298, 4.871111}, {67985251, 4.84216}]        |
|15808261   |[{25171293, 1.9515743}, {15287960, 1.8828714}, {19420381, 1.8732599}, {76732229, 1.8670377}, {187278, 1.848959}]     |
|5650336    |[{76732229, 5.0044746}, {11191145, 4.888131}, {19395453, 4.747975}, {67985251, 4.7458663}, {50592901, 4.696976}]     |
|6325137    |[{76732229, 4.753544}, {1819263, 4.66037}, {19395453, 4.6191115}, {13392137, 4.5540023}, {53751834, 4.526345}]       |
|15533534   |[{76732229, 5.14967}, {19395453, 5.031311}, {2383179, 5.001319}

In [53]:
product_recommeds = als.recommendForItemSubset(test_data, numUsers=5)
product_recommeds.show( truncate=False)

+----------+------------------------------------------------------------------------------------------------------------------+
|product_id|recommendations                                                                                                   |
+----------+------------------------------------------------------------------------------------------------------------------+
|56799621  |[{5501942, 5.159139}, {1230935, 5.0629935}, {16284452, 5.0166187}, {16876819, 5.0166187}, {19323492, 4.9607573}]  |
|249953    |[{5501942, 5.677355}, {18185771, 5.413895}, {1230935, 5.3004174}, {5758917, 5.286437}, {10770051, 5.242272}]      |
|11191145  |[{18185771, 5.017337}, {5501942, 5.017262}, {6437047, 4.926277}, {1230935, 4.922798}, {15103000, 4.888131}]       |
|25171293  |[{14857161, 4.8801656}, {13537371, 4.8801656}, {15668016, 4.8801656}, {16459380, 4.878936}, {16554170, 4.878936}] |
|11794382  |[{5501942, 4.9889393}, {18185771, 4.957399}, {2007830, 4.892455}, {1013140, 4.892455}, {6159

# Step 5: Report
Hệ thống gợi ý sản phẩm dựa trên ALS model bởi pyspark được xây dựng.

Model có thể gợi ý sản phẩm cho từng khách hàng và ngược lại khách hàng cho từng sản phẩm.

Đa phần các sản phẩm đều có điểm đánh giá khá cao, trung bình là trên 4.0.

Khi gợi ý, model cần có mã khách hàng với định dạng là 'customer_id' để gợi ý sản phẩm cho khách hàng. Sau khi gợi ý model sẽ xuất ra một danh sách các sản phẩm và rating dự đoán, bằng định dạng pair value {mã sản phẩm, rating}

Ngoài ra model chỉ gợi ý được cho các khách hàng đã có  trong tập dữ liệu reviewRaw.csv.



In [54]:
#Save model for later use
als.save('/content/gdrive/MyDrive/LDS02_k271_NguyenMinhQuan/Problem_3_RecomendationSystem_TikiOnlineShopping/collaborative_filtering_model')


In [56]:
#Save clean data
data.toPandas().to_csv("/content/gdrive/MyDrive/LDS02_k271_NguyenMinhQuan/Problem_3_RecomendationSystem_TikiOnlineShopping/review_clean.csv")

In [57]:
spark.stop()