In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col, exp, lit, expr, array, size, when, array_intersect
from pyspark.sql.types import IntegerType
from sklearn.model_selection import train_test_split
from lightgbm import LGBMRegressor
from sklearn.metrics import mean_squared_error
import numpy as np
import pandas as pd

In [2]:
spark = SparkSession.builder.appName("MovieLensALS").getOrCreate()

data_path = '../data/ml-100k/u.data'
columns = ["userId", "movieId", "rating", "timestamp"]
ratings = spark.read.csv(data_path, sep="\t", inferSchema=True).toDF(*columns)

ratings = ratings.withColumn("userId", col("userId").cast(IntegerType()))
ratings = ratings.withColumn("movieId", col("movieId").cast(IntegerType()))

max_timestamp = ratings.agg({'timestamp': 'max'}).collect()[0][0]
ratings = ratings.withColumn('weight', exp(-(lit(max_timestamp) - col('timestamp')) / lit(10**6)))

24/11/27 14:00:29 WARN Utils: Your hostname, MacBook-Air-Nasdorm.local resolves to a loopback address: 127.0.0.1; using 192.168.31.71 instead (on interface en0)
24/11/27 14:00:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/27 14:00:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
movies_path = '../data/ml-100k/u.item'

movies_columns = [
    "movieId", "title", "release_date", "video_release_date", "imdb_url",
    "unknown", "Action", "Adventure", "Animation", "Children's", "Comedy", "Crime", "Documentary", "Drama", "Fantasy",
    "Film-Noir", "Horror", "Musical", "Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western"
]

movies = spark.read.csv(movies_path, sep="|", inferSchema=True).toDF(*movies_columns)

movies.show(5)


+-------+-----------------+------------+------------------+--------------------+-------+------+---------+---------+----------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+
|movieId|            title|release_date|video_release_date|            imdb_url|unknown|Action|Adventure|Animation|Children's|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|
+-------+-----------------+------------+------------------+--------------------+-------+------+---------+---------+----------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+
|      1| Toy Story (1995)| 01-Jan-1995|              NULL|http://us.imdb.co...|      0|     0|        0|        1|         1|     1|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       0|  0|      0|
|      2| GoldenEye (1995)| 01-Jan-1995|              NULL|h

In [4]:
user_activity = ratings.groupBy("userId").count()
user_activity.orderBy("count", ascending=False).show(5)

movie_popularity = ratings.groupBy("movieId").count()
movie_popularity.orderBy("count", ascending=False).show(5)


active_users = user_activity.filter(col("count") >= 5).select("userId")
filtered_ratings = ratings.join(active_users, on="userId", how="inner")

popular_movies = movie_popularity.filter(col("count") >= 5).select("movieId")
filtered_ratings = filtered_ratings.join(popular_movies, on="movieId", how="inner")


print(f"Количество записей после фильтрации: {filtered_ratings.count()}")
filtered_ratings.show(5)



+------+-----+
|userId|count|
+------+-----+
|   405|  737|
|   655|  685|
|    13|  636|
|   450|  540|
|   276|  518|
+------+-----+
only showing top 5 rows

+-------+-----+
|movieId|count|
+-------+-----+
|     50|  583|
|    258|  509|
|    100|  508|
|    181|  507|
|    294|  485|
+-------+-----+
only showing top 5 rows

Количество записей после фильтрации: 99287
+-------+------+------+---------+--------------------+
|movieId|userId|rating|timestamp|              weight|
+-------+------+------+---------+--------------------+
|    242|   196|     3|881250949|5.928798377342577E-6|
|    302|   186|     3|891717742| 0.20827499106941064|
|    377|    22|     1|878887116|5.576568655535087E-7|
|     51|   244|     2|880606923|3.113649646606838E-6|
|    346|   166|     1|886397596|0.001018889469706...|
+-------+------+------+---------+--------------------+
only showing top 5 rows



In [5]:
avg_ratings = filtered_ratings.groupBy("userId").avg("rating").withColumnRenamed("avg(rating)", "avg_user_rating")
normalized_ratings = filtered_ratings.join(avg_ratings, on="userId")
normalized_ratings = normalized_ratings.withColumn("normalized_rating", col("rating") - col("avg_user_rating"))


In [6]:
# Split data into training and testing sets
(train_data, test_data) = normalized_ratings.randomSplit([0.8, 0.2], seed=42)

# Build ALS model
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop"
)

# Define parameter grid for hyperparameter tuning
paramGrid = ParamGridBuilder() \
    .addGrid(als.rank, [10, 20, 30]) \
    .addGrid(als.maxIter, [10, 15]) \
    .addGrid(als.regParam, [0.01, 0.1, 0.5]) \
    .build()

# Define evaluator
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)

# Set up cross-validation
crossval = CrossValidator(
    estimator=als,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3
)

cv_model = crossval.fit(train_data)
best_model = cv_model.bestModel

# Print best hyperparameters
print(f"Лучший ранг: {best_model.rank}")
print(f"Лучший параметр регуляризации: {best_model._java_obj.parent().getRegParam()}")
print(f"Лучшее число итераций: {best_model._java_obj.parent().getMaxIter()}")

predictions = best_model.transform(test_data)

rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error: {rmse}")


24/11/27 14:00:37 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/11/27 14:00:37 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
24/11/27 14:00:37 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
24/11/27 14:00:49 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


Лучший ранг: 30
Лучший параметр регуляризации: 0.1
Лучшее число итераций: 15
Root-mean-square error: 0.9146304444654756


In [7]:
# Combining ALS predictions with movie data
predictions_with_content = best_model.transform(normalized_ratings).join(movies, on="movieId", how="left")

# Add the popularity of movies
movie_popularity = ratings.groupBy("movieId").count().withColumnRenamed("count", "popularity")
predictions_with_content = predictions_with_content.join(movie_popularity, on="movieId", how="left")

# Build final dataset
feature_columns = [
    "prediction", "popularity", "Action", "Adventure", "Animation", "Children's", "Comedy",
    "Crime", "Documentary", "Drama", "Fantasy", "Film-Noir", "Horror", "Musical",
    "Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western"
]

# Combine all features in vector
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
final_data = assembler.transform(predictions_with_content)

# Checking final structure
final_data.select("features", "rating").show(5, truncate=False)


24/11/27 14:01:51 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+--------------------------------------------------------------+------+
|features                                                      |rating|
+--------------------------------------------------------------+------+
|(20,[0,1,2,7,9],[3.3366076946258545,413.0,1.0,1.0,1.0])       |1     |
|(20,[0,1,6,7],[4.144967555999756,241.0,1.0,1.0])              |5     |
|(20,[0,1,2,3,16,17],[3.726996898651123,151.0,1.0,1.0,1.0,1.0])|4     |
|(20,[0,1,2,3,6,15],[4.275229454040527,324.0,1.0,1.0,1.0,1.0]) |5     |
|(20,[0,1,4,6],[4.048789978027344,66.0,1.0,1.0])               |4     |
+--------------------------------------------------------------+------+
only showing top 5 rows



In [8]:
final_data_pd = final_data.select("features", "rating").toPandas()

X = np.vstack(final_data_pd["features"].values)
y = final_data_pd["rating"].values


X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# LightGBM
model = LGBMRegressor()
model.fit(X_train, y_train)

# Predict and score
y_pred = model.predict(X_test)
rmse = np.sqrt(mean_squared_error(y_test, y_pred))
print(f"RMSE для гибридной модели: {rmse}")


[LightGBM] [Info] Auto-choosing row-wise multi-threading, the overhead of testing was 0.004210 seconds.
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Total Bins 528
[LightGBM] [Info] Number of data points in the train set: 79429, number of used features: 20
[LightGBM] [Info] Start training from score 3.535988
RMSE для гибридной модели: 0.6610522241515715


In [9]:
# Test date for user 
test_user = 10
user_data = final_data.filter(col("userId") == test_user).select("features", "movieId")

# Predict Hybrid model (LightGBM)
user_data_pd = user_data.toPandas()
X_user = np.vstack(user_data_pd["features"].values)
predicted_ratings = model.predict(X_user)

# Add predict LightGBM in DataFrame
lightgbm_predictions = pd.DataFrame({
    "movieId": user_data_pd["movieId"],
    "lightgbm_predicted_rating": predicted_ratings
})

# ALS predict for user
als_predictions = predictions_with_content.filter(col("userId") == test_user).select("movieId", "prediction")

# Convert predict ALS in Pandas
als_predictions_pd = als_predictions.toPandas()

# Merge ALS и LightGBM on movieId
final_predictions = pd.merge(
    als_predictions_pd,
    lightgbm_predictions,
    on="movieId"
)

# Weighing predictions
alpha = 0.7
final_predictions["final_predicted_rating"] = (
    alpha * final_predictions["prediction"] + (1 - alpha) * final_predictions["lightgbm_predicted_rating"]
)

# Sorting movies according final predicted rating 
final_recommendations = final_predictions.sort_values(by="final_predicted_rating", ascending=False)

# Print Top-10 movies
print("Топ-10 фильмов для пользователя:")
print(final_recommendations.head(10))


Топ-10 фильмов для пользователя:
     movieId  prediction  lightgbm_predicted_rating  final_predicted_rating
78       483    4.854363                   4.916336                4.872955
81       127    4.725800                   4.930145                4.787103
94       357    4.734693                   4.904951                4.785770
120      134    4.728677                   4.898746                4.779697
29        64    4.711306                   4.914461                4.772252
128      513    4.687511                   4.891517                4.748713
80       603    4.685423                   4.883041                4.744708
117      178    4.663743                   4.858371                4.722132
40       498    4.655439                   4.862816                4.717652
83        98    4.651610                   4.831943                4.705710
