# A. CODE
- Đây là code được xây dựng recommendations được sử dụng phương thức ALS trong PySpark và Jupyter Notebook

# B. DATA INFO
- Dữ liệu được cào từ dữ liệu Tiki và có trên 300k dòng với rất nhiều cột
- Chúng tôi sẽ recommender model sử dụng 75% của dữ liệu làm tập train và 25% con lại làm tập test

# C. THỰC HIỆN BÀI TOÁN

### 1.1. Import thư viện

In [1]:
import findspark
findspark.init()
# import libraries
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from datetime import datetime
from pyspark.sql.functions import to_date, dayofweek, to_timestamp
from pyspark.sql import types 
from pyspark.sql.types import DateType
from pyspark.sql.functions import year, month
from pyspark.sql.functions import dayofmonth, weekofyear
from pyspark.sql.functions import split, explode
from pyspark.sql.functions import coalesce, first, lit
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import Bucketizer
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.sql.functions import regexp_extract
from pyspark.sql.functions import datediff
from pyspark.sql.functions import when

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import LinearRegressionModel
from pyspark.sql.functions import isnan, when, count, col, rand, udf, mean, stddev, log

# Converting String to index
from pyspark.ml import Pipeline

# Create ALS model and fitting data
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
import itertools

### 1.2. Thực hiện bài toán

#### 1.2.1. Step 1: Create the SparkSession Object

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Recommender Systems - Hệ thống đề xuất').getOrCreate()

#### 1.2.2. Step 2: Read the Dataset
- Chung ta khi load và đọc dữ liệu với Spark phải sử dụng dataframe.

In [3]:
# Loads data
from pyspark.sql.types import StructType, IntegerType, StringType, DoubleType
file_path = 'data_analysis/data_merge.csv'
df = spark.read.csv(file_path,inferSchema=True,header=True,  encoding='utf-8')
df.show(5)

+---+-----------+--------------------+---------------+------------+--------------------+--------------------+-----+----------+-------+------+
|_c0|fea_item_id|            fea_name|fea_customer_id|fea_rating_y|               image|                 url|brand|list_price|  price|rating|
+---+-----------+--------------------+---------------+------------+--------------------+--------------------+-----+----------+-------+------+
|  0|   48102821|tai nghe bluetoot...|         159465|         4.0|https://salt.tiki...|https://tai-nghe-...|  OEM|  300000.0|77000.0|   4.0|
|  1|   48102821|tai nghe bluetoot...|       16122994|         5.0|https://salt.tiki...|https://tai-nghe-...|  OEM|  300000.0|77000.0|   4.0|
|  2|   48102821|tai nghe bluetoot...|       18720987|         2.0|https://salt.tiki...|https://tai-nghe-...|  OEM|  300000.0|77000.0|   4.0|
|  3|   48102821|tai nghe bluetoot...|       19631063|         4.0|https://salt.tiki...|https://tai-nghe-...|  OEM|  300000.0|77000.0|   4.0|
|  4| 

#### 1.2.3. Step 3: Exploratory Data Analysis
- Khám phá dữ liệu bới vì xem xét dữ liệu, điều kiện dữ liệu và đếm xem có bao nhiều mã hàng, bao nhiêu sao đánh giá...

In [4]:
df.count()

361087

In [5]:
df.columns

['_c0',
 'fea_item_id',
 'fea_name',
 'fea_customer_id',
 'fea_rating_y',
 'image',
 'url',
 'brand',
 'list_price',
 'price',
 'rating']

In [6]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- fea_item_id: integer (nullable = true)
 |-- fea_name: string (nullable = true)
 |-- fea_customer_id: integer (nullable = true)
 |-- fea_rating_y: double (nullable = true)
 |-- image: string (nullable = true)
 |-- url: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- list_price: double (nullable = true)
 |-- price: double (nullable = true)
 |-- rating: double (nullable = true)



##### Nhận xét
- Kiểu dữ liệu đã như mong muốn
- Chọn lại dữ liệu đúng với bài toán

In [7]:
lst = ['fea_item_id','fea_name','fea_customer_id','fea_rating_y','image','url','brand','list_price','price','rating']

In [8]:
data_sub = df.select(lst)

In [9]:
data_sub.printSchema()

root
 |-- fea_item_id: integer (nullable = true)
 |-- fea_name: string (nullable = true)
 |-- fea_customer_id: integer (nullable = true)
 |-- fea_rating_y: double (nullable = true)
 |-- image: string (nullable = true)
 |-- url: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- list_price: double (nullable = true)
 |-- price: double (nullable = true)
 |-- rating: double (nullable = true)



In [10]:
# Chuyển kiểu dữ liệu cho tập data
data_sub = data_sub.withColumn("fea_customer_id", data_sub["fea_customer_id"].cast(IntegerType()))
data_sub = data_sub.withColumn("fea_item_id", data_sub["fea_item_id"].cast(IntegerType()))
data_sub = data_sub.withColumn("fea_rating_y", data_sub["fea_rating_y"].cast(DoubleType()))

In [11]:
data_sub.printSchema()

root
 |-- fea_item_id: integer (nullable = true)
 |-- fea_name: string (nullable = true)
 |-- fea_customer_id: integer (nullable = true)
 |-- fea_rating_y: double (nullable = true)
 |-- image: string (nullable = true)
 |-- url: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- list_price: double (nullable = true)
 |-- price: double (nullable = true)
 |-- rating: double (nullable = true)



In [12]:
data_sub.orderBy(rand()).show(10, False)

+-----------+---------------------------------------------------------------------------------------------------------------------------------------------------+---------------+------------+-------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------+----------+----------+---------+------+
|fea_item_id|fea_name                                                                                                                                           |fea_customer_id|fea_rating_y|image                                                                                                        |url                                                                                                                                                       |brand     |list_price|price    |rating|
+---------

In [13]:
data_sub.where(col("fea_customer_id").isNull())

DataFrame[fea_item_id: int, fea_name: string, fea_customer_id: int, fea_rating_y: double, image: string, url: string, brand: string, list_price: double, price: double, rating: double]

In [14]:
data_sub.filter(data_sub.fea_customer_id.isNull()).count()

0

In [15]:
data_sub.groupBy('fea_customer_id').count().orderBy('count',ascending=False).show(10,False) 

+---------------+-----+
|fea_customer_id|count|
+---------------+-----+
|7737978        |50   |
|7280719        |40   |
|6106142        |37   |
|7377207        |35   |
|1064154        |34   |
|1425077        |32   |
|1046981        |31   |
|6177374        |30   |
|6844844        |29   |
|717732         |28   |
+---------------+-----+
only showing top 10 rows



In [16]:
data_sub.groupBy('fea_customer_id').count().orderBy('count',ascending=True).show(10,False)

+---------------+-----+
|fea_customer_id|count|
+---------------+-----+
|1367425        |1    |
|1694268        |1    |
|17634780       |1    |
|11569360       |1    |
|8068093        |1    |
|19850258       |1    |
|563859         |1    |
|20214885       |1    |
|466586         |1    |
|15220455       |1    |
+---------------+-----+
only showing top 10 rows



##### Nhận xét
- Ta thất khách hàng mua nhiều nhất là khách hàng 7737978 có 50 đơn hàng ít nhất là chỉ có 1 đơn hàng

In [17]:
data_sub.groupBy('fea_name').count().orderBy('count',ascending=False).show(10,False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|fea_name                                                                                                                                                    |count|
+------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|chuột không dây logitech m331 silenthàng chính hãng                                                                                                         |4715 |
|tai nghe nhét tai mi basic xiaomi hsej3jyhàng chính hãng                                                                                                    |2629 |
|pin sạc dự_phòng xiaomi redmi 2 mah pb2lmz tích_hợp cổng usb typec in hỗ_trợ sạc nhanh 18 whàng chính hãng                                                  |2419 |
|usb kings

##### Nhận xét
- Mã item 299461 nhiều đờn hàng nhất với 4715 đơn hàng

#### 1.2.4. Step 4: Feature Engineering
- Chúng tôi sẽ convert những column categorical sang numerical values using StringIndexex

In [18]:
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer, IndexToString

In [19]:
stringIndexer = StringIndexer(inputCol="fea_name", outputCol="title_new")

In [20]:
model = stringIndexer.fit(data_sub)

In [21]:
indexed = model.transform(data_sub)

In [22]:
indexed.show(10)

+-----------+--------------------+---------------+------------+--------------------+--------------------+-----+----------+-------+------+---------+
|fea_item_id|            fea_name|fea_customer_id|fea_rating_y|               image|                 url|brand|list_price|  price|rating|title_new|
+-----------+--------------------+---------------+------------+--------------------+--------------------+-----+----------+-------+------+---------+
|   48102821|tai nghe bluetoot...|         159465|         4.0|https://salt.tiki...|https://tai-nghe-...|  OEM|  300000.0|77000.0|   4.0|     23.0|
|   48102821|tai nghe bluetoot...|       16122994|         5.0|https://salt.tiki...|https://tai-nghe-...|  OEM|  300000.0|77000.0|   4.0|     23.0|
|   48102821|tai nghe bluetoot...|       18720987|         2.0|https://salt.tiki...|https://tai-nghe-...|  OEM|  300000.0|77000.0|   4.0|     23.0|
|   48102821|tai nghe bluetoot...|       19631063|         4.0|https://salt.tiki...|https://tai-nghe-...|  OEM| 

In [23]:
indexed.groupBy('fea_item_id').count().orderBy('count',ascending=False).show(10,False)

+-----------+-----+
|fea_item_id|count|
+-----------+-----+
|299461     |4715 |
|1600005    |2629 |
|47321729   |2419 |
|405243     |2316 |
|8141868    |2202 |
|389145     |2077 |
|487511     |2057 |
|547563     |2006 |
|591960     |1962 |
|43107185   |1933 |
+-----------+-----+
only showing top 10 rows



#### Nhận xét:
- Do model mã id là kiểu số nên không cần chuyển đổi qua mã sản phẩm

#### 1.2.5. Step 5: Splitting the Dataset
- Chung ta có thể so sánh dữ liệu xây dựng mô hình đề xuất, chung có thể chia dữ liệu theo dữ liệu training và test
- Chung ta có thể chia trong khoảng 75 đến 25 chỉ số train mode và test accuracy

In [24]:
train,test=data_sub.randomSplit([0.75,0.25])

In [25]:
train.count()

270675

In [26]:
test.count()

90412

#### 1.2.6. Step 6: Build and Train Recommender Model
- Chúng ta có thể import ALS thư viện Pyspark machine learning và xây dựng mô hình training dữ liệu
- Có rất nhiều parameter nhưng có tuned cải thiện hiệu suất mô hình
- Có 2 para quan trọng đó là: 
    + nonnegative =‘True’: Nó không thêm negative ratings in recommendations
    + coldStartStrategy=‘drop’ to prevent any NaN ratings predictions

In [27]:
from pyspark.ml.recommendation import ALS

In [28]:
rec=ALS(maxIter=15, userCol='fea_customer_id',itemCol='fea_item_id',ratingCol='fea_rating_y', nonnegative=True, coldStartStrategy="drop")

In [29]:
rec_model=rec.fit(train)

#### 1.2.7. Step 7: Predictions and Evaluation on Test Data
- Hoàn thành 1 phần bài tập là kiểm tra hiệu quả của mô hình hoặc test dữ liệu
- Chúng ta sử dụng các hàm làm dự đoán trên tập test data và RegressionEvaluate để check the RMSE value của tập dữ liệu

In [30]:
predicted_ratings=rec_model.transform(test)

In [31]:
predicted_ratings.printSchema()

root
 |-- fea_item_id: integer (nullable = true)
 |-- fea_name: string (nullable = true)
 |-- fea_customer_id: integer (nullable = true)
 |-- fea_rating_y: double (nullable = true)
 |-- image: string (nullable = true)
 |-- url: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- list_price: double (nullable = true)
 |-- price: double (nullable = true)
 |-- rating: double (nullable = true)
 |-- prediction: float (nullable = false)



In [32]:
predicted_ratings.orderBy(rand()).show(20)

+-----------+--------------------+---------------+------------+--------------------+--------------------+--------+----------+---------+------+----------+
|fea_item_id|            fea_name|fea_customer_id|fea_rating_y|               image|                 url|   brand|list_price|    price|rating|prediction|
+-----------+--------------------+---------------+------------+--------------------+--------------------+--------+----------+---------+------+----------+
|     249953|dây_cáp sạc micro...|       19021227|         5.0|https://salt.tiki...|https://day-cap-s...|   Anker|   99000.0|  69000.0|   4.7|  4.104461|
|   75643830|ốp lưng apple sil...|       11183810|         5.0|https://salt.tiki...|https://op-lung-a...|   Apple| 1200000.0| 490000.0|   4.8|   4.71597|
|     487524|thẻ nhớ micro sd ...|        9101139|         5.0|https://salt.tiki...|https://the-nho-m...| Samsung|  350000.0| 239000.0|   4.6| 3.9841497|
|     180912|bàn_phím bluetoot...|       18057770|         5.0|https://salt.

In [33]:
### Đánh giá
from pyspark.ml.evaluation import RegressionEvaluator

In [34]:
evaluator=RegressionEvaluator(metricName='rmse',predictionCol='prediction',labelCol='fea_rating_y')

In [35]:
rmse=evaluator.evaluate(predicted_ratings)

In [36]:
rmse

1.3823175203078157

#### Nhận xét:
- RMSE rất cao, cho nên cần xem và chọn tham số phù hợp với bài toán

#### 1.2.8. Step 8: Recommend Top item That Active User Might Like
- Sau khi kiểm tra hiểu quả về mô hình và tuning the hyperparameters
- Chúng ta có thể duy chuyển để xuất theo top items tới user id đó và họ có thể nhìn và thích

In [37]:
indexed.columns

['fea_item_id',
 'fea_name',
 'fea_customer_id',
 'fea_rating_y',
 'image',
 'url',
 'brand',
 'list_price',
 'price',
 'rating',
 'title_new']

In [38]:
lst_ = ['fea_item_id','fea_name','fea_customer_id','fea_rating_y','image','url','brand','list_price','price','rating']
unique_item=data_sub.select(lst_).distinct()

In [39]:
unique_item.show(10,False)

+-----------+------------------------------------------------------------------------------------------+---------------+------------+-------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------+--------+----------+--------+------+
|fea_item_id|fea_name                                                                                  |fea_customer_id|fea_rating_y|image                                                                                                        |url                                                                                                                              |brand   |list_price|price   |rating|
+-----------+------------------------------------------------------------------------------------------+---------------+------------+-----------------------------------------------

##### Vì vậy chúng ta thấy có tổng cộng 4,154 item trong dataframe

In [40]:
a = unique_item.alias('a')
a

DataFrame[fea_item_id: int, fea_name: string, fea_customer_id: int, fea_rating_y: double, image: string, url: string, brand: string, list_price: double, price: double, rating: double]

##### Chúng ta có thể chọn user không có dữ liệu, chúng ta có thể sử để xuất 1 item khác cho người dùng

In [41]:
fea_customer_id=16762580

##### Chúng ta sẽ lọc item người dùng đã mua và đánh giá tích cực nó

In [42]:
lst__ =['fea_item_id','image','url','brand','list_price','price','rating']
item_choose=data_sub.filter(data_sub['fea_customer_id'] ==fea_customer_id).select(lst__).distinct()

In [43]:
item_choose.count()

2

In [44]:
b=item_choose.alias('b')

##### Vì vậy, Có 48 item trong tổng số 4154 item đó khách hàng sản sàng đánh giá => Vì vậy Chung ta sẽ để xuất những item con lại cho khách hàng. Chúng ta sẽ combine 2 bảng và chung ta có thể lọc những để xuất có thể null và joined 2 bảng lại với nhau

In [45]:
total_item = a.join(b, a['fea_item_id'] == b['fea_item_id'],how='left')

In [46]:
total_item.show(10,False)

+-----------+-----------------------------+---------------+------------+-----------------------------------------------------------------------------------------------+--------------------------------------------------------------------+-----+----------+---------+------+-----------+-----+----+-----+----------+-----+------+
|fea_item_id|fea_name                     |fea_customer_id|fea_rating_y|image                                                                                          |url                                                                 |brand|list_price|price    |rating|fea_item_id|image|url |brand|list_price|price|rating|
+-----------+-----------------------------+---------------+------------+-----------------------------------------------------------------------------------------------+--------------------------------------------------------------------+-----+----------+---------+------+-----------+-----+----+-----+----------+-----+------+
|605223     |tivi sony hd

In [47]:
remaining_item=total_item.where(col("b.fea_item_id").isNull()).select(a.fea_item_id).distinct()

In [48]:
remaining_item.show(10,False)

+-----------+
|fea_item_id|
+-----------+
|605223     |
|1675793    |
|2069769    |
|2086067    |
|2774881    |
|3222489    |
|4193095    |
|5983423    |
|10723695   |
|11415485   |
+-----------+
only showing top 10 rows



In [49]:
remaining_item=remaining_item.withColumn("fea_customer_id",lit(int(fea_customer_id)))

In [50]:
remaining_item.show(10,False)

+-----------+---------------+
|fea_item_id|fea_customer_id|
+-----------+---------------+
|605223     |16762580       |
|1675793    |16762580       |
|2069769    |16762580       |
|2086067    |16762580       |
|2774881    |16762580       |
|3222489    |16762580       |
|4193095    |16762580       |
|5983423    |16762580       |
|10723695   |16762580       |
|11415485   |16762580       |
+-----------+---------------+
only showing top 10 rows



##### Kết thúc, chúng ta có thể dự đoán phần con lại dữ liệu item cho userid sử dụng để xuất 1 mô hình đó chúng ta có thể xây chúng 1 cách dễ dàng. Chúng ta có thể lọc 1 top đề xuất có đánh giá cao về dự đoán ratings

In [51]:
recommendations=rec_model.transform(remaining_item).orderBy('prediction',ascending=False)

In [52]:
recommendations.show(5,False)

+-----------+---------------+----------+
|fea_item_id|fea_customer_id|prediction|
+-----------+---------------+----------+
|55515547   |16762580       |6.143086  |
|62723100   |16762580       |6.1235785 |
|74893380   |16762580       |5.9154935 |
|74897040   |16762580       |5.8990493 |
|44349631   |16762580       |5.834206  |
+-----------+---------------+----------+
only showing top 5 rows



In [53]:
recommendations_ =recommendations.toPandas()
recommendations_.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4169 entries, 0 to 4168
Data columns (total 3 columns):
 #   Column           Non-Null Count  Dtype  
---  ------           --------------  -----  
 0   fea_item_id      4169 non-null   int32  
 1   fea_customer_id  4169 non-null   int32  
 2   prediction       4169 non-null   float32
dtypes: float32(1), int32(2)
memory usage: 49.0 KB


##### Nhận xét:
- Những item trên được đánh giá sao cao cho user 7737978. Chúng ta có thể trực qua hóa hơn bằng cách thêm tiêu đề cho item trở lại các khuyện nghị cho khách hàng. Chúng ta có thể sử dụng chức năng Indextostring  để bổ sung tiêu đề

In [54]:
item_title = IndexToString(inputCol="fea_item_id",outputCol="title",labels=model.labels)

In [55]:
final_recommendations=item_title.transform(recommendations)
final_recommendations

DataFrame[fea_item_id: int, fea_customer_id: int, prediction: float, title: string]

In [56]:
#final_recommendations.show(5,False)

In [57]:
import pickle
pickle.dump(rec_model,open('rec_model.pkl','wb'))

Py4JError: An error occurred while calling o226.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)



##### Vì vậy đề xuất cho khách hàng mua những sản phẩm trên.