In [19]:
import pandas as pd

In [20]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col, sum

# Tạo SparkSession
spark = SparkSession.builder.appName("Prepare ALS Data")\
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "8g") \
    .getOrCreate()

# Load dữ liệu clickstream
clickstream = spark.read.csv("../data/processed/event.csv", header=True, inferSchema=True)

# Gán điểm dựa trên event_name
clickstream = clickstream.withColumn("score", 
    when(col("event_name") == "ADD_TO_CART", 1)
    .when(col("event_name") == "BOOKING", 3)
    .otherwise(0))

# Tổng hợp điểm cho từng customer_id và product_id
als_data = clickstream.groupBy("customer_id", "product_id") \
    .agg(sum("score").alias("preference"))

# Lưu dữ liệu đã xử lý
# als_data.write.csv("als_data.csv", header=True)

In [21]:
als_data.show()

+-----------+----------+----------+
|customer_id|product_id|preference|
+-----------+----------+----------+
|    78414.0|   36359.0|         4|
|    15605.0|   45906.0|         4|
|    27858.0|   41824.0|         1|
|    89236.0|   31286.0|         4|
|    55709.0|   29026.0|         4|
|    43060.0|   53918.0|         1|
|    16585.0|   19629.0|         4|
|    32020.0|   53961.0|         4|
|    22787.0|   10088.0|         1|
|    46177.0|   32946.0|         5|
|    56319.0|   39177.0|         4|
|    56319.0|    7332.0|         4|
|    31173.0|   10176.0|         1|
|    68475.0|   24312.0|         4|
|    49171.0|   26611.0|         4|
|    97824.0|    7626.0|         4|
|    55538.0|   46780.0|         4|
|    89560.0|   59676.0|         1|
|    72152.0|   32066.0|         4|
|    21727.0|   22913.0|         1|
+-----------+----------+----------+
only showing top 20 rows



In [22]:
# Tìm giá trị cao nhất trong cột 'score'
from pyspark.sql.functions import max, min

max_score = als_data.agg(max("preference")).collect()[0][0]
min_score = als_data.agg(min("preference")).collect()[0][0]
print(f"Maximum score: {max_score}")
print(f"Minimum score: {min_score}")

Maximum score: 12
Minimum score: 1


In [23]:
df_train, df_test = als_data.randomSplit([0.7,0.3], seed=96)

In [24]:
from pyspark.ml.recommendation import ALS

als = ALS(
    userCol='customer_id',
    itemCol='product_id',
    ratingCol='preference',
    coldStartStrategy='drop'
)

model = als.fit(df_train)

In [25]:
# prediction on test dataset:
df_test_prd = model.transform(df_test)
df_test_prd.show()

+-----------+----------+----------+----------+
|customer_id|product_id|preference|prediction|
+-----------+----------+----------+----------+
|      496.0|   12917.0|         4| 3.9842336|
|      496.0|   46830.0|         4| 3.1914058|
|      833.0|    3187.0|         4| 3.0874274|
|      833.0|    5504.0|         4| 3.9122088|
|      833.0|   57664.0|         4| 4.0283165|
|     1088.0|   12089.0|         4| 3.6327577|
|     1088.0|   25565.0|         1| 1.5979598|
|     1088.0|   29065.0|         4|  2.172713|
|     1088.0|   46823.0|         1|  2.650305|
|     1088.0|   47343.0|         1| 1.7782638|
|     1088.0|   47627.0|         1| 1.8678033|
|     1088.0|   49719.0|         4|  3.829351|
|     1238.0|   15630.0|         4|  4.063924|
|     1829.0|   15790.0|         4| 3.7018147|
|     1829.0|   18755.0|         1| 1.1800195|
|     1829.0|   28300.0|         4| 3.6398869|
|     1959.0|   48099.0|         4| 3.8311071|
|     2142.0|   25178.0|         1| 1.6189456|
|     2142.0|

In [26]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    metricName='rmse',
    labelCol='preference',
    predictionCol='prediction'
)

In [27]:
rmse = evaluator.evaluate(df_test_prd.na.drop())
print(rmse)

0.9316944520991911


In [32]:
spark.stop()

In [31]:
model_path = "../models/als/"
model.write().overwrite().save(model_path)

Py4JJavaError: An error occurred while calling o625.save.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:106)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1091)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1089)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1062)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1027)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1009)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1008)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:965)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:963)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1623)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1623)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1609)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1609)
	at org.apache.spark.ml.util.DefaultParamsWriter$.saveMetadata(ReadWrite.scala:413)
	at org.apache.spark.ml.recommendation.ALSModel$ALSModelWriter.saveImpl(ALS.scala:549)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:168)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
	at org.apache.hadoop.mapred.OutputCommitter.commitJob(OutputCommitter.java:291)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
	at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$3(SparkHadoopWriter.scala:100)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:552)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:100)
	... 51 more


In [14]:
model.userFactors.show(5, False)

+---+----------------------------------------------------------------------------------------------------------------------------+
|id |features                                                                                                                    |
+---+----------------------------------------------------------------------------------------------------------------------------+
|20 |[0.6807362, 0.31011897, 0.12753394, 0.45364946, -0.3580801, -0.012967266, 1.3934131, 0.19310178, -1.082696, 0.2812993]      |
|40 |[-0.13662344, 0.26364323, 0.07466937, 0.2693569, 0.029666223, -0.2907011, 1.2891877, -0.1529404, -0.9466689, -0.06268269]   |
|50 |[0.009621896, 0.28241834, 0.16843934, 0.7165552, 0.12422713, -1.4388404, 1.25759, 1.0114642, -0.50662875, -0.15065888]      |
|90 |[0.2194312, -0.4601535, -0.45368207, -0.041310523, 0.16983491, -0.11305215, 1.3755858, -0.66210586, -1.0669324, -0.23809767]|
|100|[0.1997282, 0.10376924, 0.12997991, 0.177001, -0.049248595, 0.08038205, 0.2750

In [15]:
rec_all_users = model.recommendForAllUsers(5).cache()
rec_all_users.show(5, False)

+-----------+-------------------------------------------------------------------------------------------------+
|customer_id|recommendations                                                                                  |
+-----------+-------------------------------------------------------------------------------------------------+
|463        |[{41262, 4.930607}, {28110, 4.895175}, {37234, 4.754526}, {41900, 4.7408643}, {56468, 4.697444}] |
|496        |[{6991, 6.5411954}, {59130, 5.3151345}, {25364, 5.2484584}, {22171, 5.238108}, {5994, 5.114468}] |
|833        |[{6991, 6.3968453}, {6455, 5.2672105}, {59130, 5.2389708}, {46813, 5.199222}, {42946, 5.1221056}]|
|1088       |[{47665, 5.419608}, {45643, 5.4180202}, {7953, 5.269348}, {1755, 5.147212}, {41262, 5.100894}]   |
|1238       |[{6991, 6.441902}, {6455, 5.1088386}, {59130, 5.1075234}, {46813, 5.080995}, {42946, 5.049011}]  |
+-----------+-------------------------------------------------------------------------------------------