In [1]:
from pyspark.sql import SparkSession

# Local Spark session for the MovieLens ALS experiments.
# `local[*]` runs in-process using all available cores; the SQL warehouse dir is
# redirected to /tmp instead of the default location.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("MovieLens-ALS")
    .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse")
    .getOrCreate()
)

spark

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/11 12:10:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Load MovieLens 100k ratings (tab-separated: user, item, rating, timestamp).
# An explicit schema replaces inferSchema: it avoids an extra pass over the file
# just to guess column types, and it pins user/item to INT as ALS requires.
# FAILFAST makes a malformed line raise an error instead of silently becoming a
# row of nulls (the default PERMISSIVE mode).
ratings = (spark.read
           .schema("user INT, item INT, rating INT, timestamp LONG")
           .option("delimiter", "\t")
           .option("mode", "FAILFAST")
           .csv("data/u.data"))

ratings.show(5)

+----+----+------+---------+
|user|item|rating|timestamp|
+----+----+------+---------+
| 196| 242|     3|881250949|
| 186| 302|     3|891717742|
|  22| 377|     1|878887116|
| 244|  51|     2|880606923|
| 166| 346|     1|886397596|
+----+----+------+---------+
only showing top 5 rows


In [4]:
# Reproducible 80/20 split (fixed seed).
# Both halves are cached because the baseline fit and every fit in the grid
# search below reuse them. Without caching, each Spark action would re-scan
# data/u.data and recompute the split.
train, test = ratings.randomSplit([0.8, 0.2], seed=42)
train = train.cache()
test = test.cache()

# The counts also materialise the caches.
train.count(), test.count()

(79901, 20099)

In [7]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Baseline ALS model, trained on `train` and applied to `test`.
als = ALS(
    # Column mapping onto the ratings DataFrame.
    userCol="user", itemCol="item", ratingCol="rating",
    # Model hyperparameters.
    rank=20, regParam=0.1, maxIter=10, nonnegative=True,
    # Per the Spark ALS docs, "drop" removes rows whose prediction is NaN
    # (users/items unseen during training), so RMSE is computed on scorable pairs only.
    coldStartStrategy="drop",
)

model = als.fit(train)
pred = model.transform(test)


In [10]:

pred.select(["user", "item", "rating", "prediction"]).show()  # true rating next to ALS prediction

+----+----+------+----------+
|user|item|rating|prediction|
+----+----+------+----------+
| 148|   8|     4| 3.7730875|
| 148|  56|     5| 3.4646716|
| 148|  71|     5| 3.3524654|
| 148| 133|     5| 3.1331747|
| 148| 169|     5| 4.8557777|
| 148| 172|     5| 4.2456045|
| 148| 194|     5| 3.9707322|
| 148| 222|     4| 3.9441469|
| 148| 357|     5| 3.2085667|
| 148| 432|     5| 3.8137212|
| 148| 474|     5|  4.074557|
| 148| 529|     5| 3.8872902|
| 148| 588|     4| 3.5053844|
| 148| 713|     3| 3.5958986|
| 463|  24|     3| 2.5844703|
| 463|  50|     4| 3.6943824|
| 463| 117|     3| 2.6383386|
| 463| 124|     5| 3.5298643|
| 463| 147|     3| 1.8913786|
| 463| 149|     2| 3.0361662|
+----+----+------+----------+
only showing top 20 rows


In [11]:
# Root-mean-squared error of the baseline predictions against the true ratings.
evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)
rmse = evaluator.evaluate(pred)

rmse

0.9142874727436217

Креирање на for циклус кој итерира низ различни комбинации на хиперпараметрите (rank, regParam, maxIter), за да може да се изврши експериментирање и споредба на добиените резултати (RMSE).

In [14]:

from itertools import product

# Hyperparameter grid: 3 x 3 x 3 = 27 ALS fits.
ranks = [10, 20, 50]
regParams = [0.01, 0.1, 1.0]
maxIters = [5, 10, 20]

evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)

# NOTE(review): every configuration is scored on `test`, so choosing the best one
# here leaks test information and the reported best RMSE is optimistic. A fair
# protocol tunes on a validation split carved out of `train` (for example with
# pyspark.ml.tuning.TrainValidationSplit) and evaluates on `test` only once,
# for the chosen configuration.
results = []  # one (rank, regParam, maxIter, rmse) tuple per configuration

# The loop uses grid_* names so it does not overwrite `als`, `model` and `rmse`
# from the baseline cells. Otherwise `model` would silently become the grid's
# last fit (rank=50, regParam=1.0, maxIter=20) instead of the baseline.
for r, reg, it in product(ranks, regParams, maxIters):
    grid_als = ALS(
        userCol="user",
        itemCol="item",
        ratingCol="rating",
        rank=r,
        regParam=reg,
        maxIter=it,
        coldStartStrategy="drop",
        nonnegative=True
    )
    grid_model = grid_als.fit(train)
    grid_rmse = evaluator.evaluate(grid_model.transform(test))

    results.append((r, reg, it, grid_rmse))
    print(f"rank={r}, regParam={reg}, maxIter={it} → RMSE={grid_rmse:.4f}")

# Select the winner in code rather than reading it off the (possibly truncated) log.
best_rank, best_reg, best_iter, best_rmse = min(results, key=lambda row: row[3])
print(f"best: rank={best_rank}, regParam={best_reg}, maxIter={best_iter} → RMSE={best_rmse:.4f}")

rank=10, regParam=0.01, maxIter=5 → RMSE=1.0070
rank=10, regParam=0.01, maxIter=10 → RMSE=1.0090
rank=10, regParam=0.01, maxIter=20 → RMSE=1.0105
rank=10, regParam=0.1, maxIter=5 → RMSE=0.9213
rank=10, regParam=0.1, maxIter=10 → RMSE=0.9148
rank=10, regParam=0.1, maxIter=20 → RMSE=0.9132
rank=10, regParam=1.0, maxIter=5 → RMSE=1.3664
rank=10, regParam=1.0, maxIter=10 → RMSE=1.3668
rank=10, regParam=1.0, maxIter=20 → RMSE=1.3668
rank=20, regParam=0.01, maxIter=5 → RMSE=1.0489
rank=20, regParam=0.01, maxIter=10 → RMSE=1.0711
rank=20, regParam=0.01, maxIter=20 → RMSE=1.0868
rank=20, regParam=0.1, maxIter=5 → RMSE=0.9200
rank=20, regParam=0.1, maxIter=10 → RMSE=0.9143
rank=20, regParam=0.1, maxIter=20 → RMSE=0.9118
rank=20, regParam=1.0, maxIter=5 → RMSE=1.3669
rank=20, regParam=1.0, maxIter=10 → RMSE=1.3668
rank=20, regParam=1.0, maxIter=20 → RMSE=1.3668
rank=50, regParam=0.01, maxIter=5 → RMSE=1.0786
rank=50, regParam=0.01, maxIter=10 → RMSE=1.1033
rank=50, regParam=0.01, maxIter=20 → RM

Овој дел претставува дополнителна споредба (baseline) со Python библиотеката Surprise, која е специјализирана за recommender системи. Целта е да се споредат резултатите од Spark ALS со алтернативни модели како SVD и KNN.

In [2]:
import pandas as pd
from surprise import Dataset, Reader, SVD, KNNWithMeans
from surprise.model_selection import train_test_split
from surprise.accuracy import rmse

In [3]:
# Surprise needs a Reader that declares the rating scale (1–5 stars here).
reader = Reader(rating_scale=(1, 5))

# Same tab-separated file as in the Spark section. The timestamp column is read
# but not passed to Surprise.
df = pd.read_csv("data/u.data", sep="\t", header=None,
                 names=["user", "item", "rating", "timestamp"])
data = Dataset.load_from_df(df.loc[:, ["user", "item", "rating"]], reader)

# 80/20 split with a fixed seed. NOTE: this is Surprise's own shuffle, so the
# test set is not the same as the Spark `test` DataFrame.
trainset, testset = train_test_split(data, random_state=42, test_size=0.2)

In [4]:
# Surprise SVD baseline. The fixed random_state keeps the RMSE reproducible.
svd = SVD(random_state=42)
predictions_svd = svd.fit(trainset).test(testset)  # fit() returns the fitted algorithm

# Surprise's rmse() also prints "RMSE: ..." on its own.
rmse_svd = rmse(predictions_svd)

RMSE: 0.9352


In [5]:
# Item-based (user_based=False) neighbourhood model with cosine similarity.
knn = KNNWithMeans(sim_options=dict(name="cosine", user_based=False))
predictions_knn = knn.fit(trainset).test(testset)  # fit() returns the fitted algorithm

# Surprise's rmse() also prints "RMSE: ..." on its own.
rmse_knn = rmse(predictions_knn)

Computing the cosine similarity matrix...
Done computing similarity matrix.
RMSE: 0.9402


Со Spark ALS беше постигната најниска RMSE вредност од 0.9106 при параметри
rank = 50, regParam = 0.1, maxIter = 20.

Конфигурацијата rank = 20, regParam = 0.1, maxIter = 10–20 дава слични резултати
(RMSE ≈ 0.91) со помала пресметковна сложеност.

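Ниската регуларизација (regParam = 0.01) доведува до overfitting и повисока грешка
(RMSE > 1.0), која дополнително расте со зголемување на rank и maxIter
(од 1.0070 за rank = 10, maxIter = 5 до 1.1033 за rank = 50, maxIter = 10).
Високата регуларизација (regParam = 1.0) резултира со underfitting
(RMSE ≈ 1.37), речиси независно од rank и maxIter.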

Во бонус анализата со Surprise, SVD моделот постигна RMSE ≈ 0.9352,
а KNN моделот (item-based KNNWithMeans со cosine сличност) RMSE ≈ 0.9402.

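Spark ALS моделот постигна пониска RMSE од Surprise моделите на податочното множество MovieLens 100k,
со стабилни резултати при regParam = 0.1 (RMSE ≈ 0.91–0.92 за сите испитани rank и maxIter).

Споредбата сепак е приближна:
(1) Spark и Surprise користат различни случајни поделби на train/test множествата;
(2) со coldStartStrategy = "drop", Spark ги отфрла тест-паровите за корисници/филмови кои ги нема во тренинг множеството, додека Surprise дава предвидување за сите парови;
(3) хиперпараметрите на ALS се избрани според RMSE на тест множеството, што дава оптимистичка проценка.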