<div style="font-size:18pt;padding-top:20px; text-align:center">Домашнее задание 4. <b>Рекомендательные системы и Spark MLlib</b> </div><hr>
<div style="text-align:right;">Куценко А. А <span style="font-style: italic;font-weight: bold;">(ftruf357ft@gmail.com)</span></div>

In [1]:
# Вариант
eu_position = 8
eu_position % 3

2

## Задание 2. Коллаборативная фильтрация

- Вариант 1. По схожести пользователей
- Вариант 2. По схожести объектов

Этапы:
1. Разделите данные с рейтингами на обучающее (train_init - 0.8) и тестовое подмножества (test - 0.2), определите среднее значение рейтинга в обучающем подмножестве и вычислите `rmse` для тестового подмножества, если для всех значений из test предсказывается среднее значение рейтинга
2. Реализуйте коллаборативную фильтрацию в соответствии с вариантом. Для определения схожести используйте train_init, для расчета `rmse` - test
3. Определите `rmse` для тестового подмножества

Environment setup

In [2]:
import os
import sys

os.environ["SPARK_HOME"]="/home/ubuntu/BigData/spark"
os.environ["PYSPARK_PYTHON"]="/home/ubuntu/ML/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"]="/home/ubuntu/ML/anaconda3/bin/python"

spark_home = os.environ.get("SPARK_HOME")
sys.path.insert(0, os.path.join(spark_home, "python"))
sys.path.insert(0, os.path.join(spark_home, "python/lib/py4j-0.10.7-src.zip"))

In [3]:
from pyspark import Row, RDD
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
from pyspark.sql.window import Window

In [4]:
class ItemBasedRecommend:
    """
    Main class that implements the item-based collaborative filtering.

    The approach includes:
    - cosine similarity calculation between items
    - weighted sum calculation for rating prediction

    Parameters
    ----------
    top_N_similarities : int, optional (default=20)
        Number of top similarities for a given item pair that will compose
        a similarity matrix. It is used in the train phase
    top_N_ratings : int, optional (default=10)
        Number of top ratings that return as a result of prediction for a
        given user

    Attributes
    ----------
    spark : SparkSession
    top_N_similarities : int
    top_N_ratings : int
    df_train : DataFrame
        Ratings of users in the following format: [usedId, movieId, rating]
    br_similarity : Broadcast
        Dictionary of similarities of item pairs
    br_P : Broadcast
        Set of items that contains df_train
    br_U : Broadcast
        Set of users  that contains df_train

    """
    def __init__(self, spark, top_N_similarities=20, top_N_ratings=None):
        self.spark = spark
        self.top_N_similarities = int(top_N_similarities)
        self.top_N_ratings = int(top_N_ratings) if top_N_ratings else None
        
    def train(self, df_train, top_N=None, user_column="user", item_column="item", rating_column="rating"):
        top_N = int(top_N or self.top_N_similarities)

        df = df_train.select(
            F.col(user_column).alias("user"),
            F.col(item_column).alias("item"),
            F.col(rating_column).alias("rating")
        ).repartition("user").cache()

        # Нормы
        df_norm = df.groupBy("item").agg(F.sqrt(F.sum(F.col("rating")**2)).alias("norm"))

        # Перемножение рейтингов
        df_dot = (
            df.alias("a")
            .join(df.alias("b"), "user")
            .where(F.col("a.item") < F.col("b.item"))
            .groupBy(F.col("a.item").alias("p1"), F.col("b.item").alias("p2"))
            .agg(F.sum(F.col("a.rating") * F.col("b.rating")).alias("dot"))
        )

        # Broadcast join с нормами
        df_norm_cached = df_norm.persist()

        df_sim = (
            df_dot
            .join(F.broadcast(df_norm_cached.withColumnRenamed("item", "p1").withColumnRenamed("norm", "n1")), "p1")
            .join(F.broadcast(df_norm_cached.withColumnRenamed("item", "p2").withColumnRenamed("norm", "n2")), "p2")
            .select(
                "p1", "p2",
                (F.col("dot") / (F.col("n1") * F.col("n2"))).alias("sim")
            )
        )

        # top-N
        window = Window.partitionBy("p1").orderBy(F.col("sim").desc())
        df_topN = df_sim.withColumn("rn", F.row_number().over(window)).filter(F.col("rn") <= top_N)

        # Сохраняем результат
        df_topN.write.mode("overwrite").parquet("hdfs:///models/similarity_matrix")
        
        P = set(row["item"] for row in df.select("item").distinct().collect())
        U = set(row["user"] for row in df.select("user").distinct().collect())

        self.br_P = self.spark.sparkContext.broadcast(P)
        self.br_U = self.spark.sparkContext.broadcast(U)
        self.df_train = df
        self.top_N_similarities = top_N
        return self
        
#     def train(self, df_train, top_N=None, user_column_name="user", item_column_name="item",
#               rating_column_name="rating"):
#         """
#         Calculate cosine similarities between all item pairs

#         Parameters
#         ----------
#         df_train : DataFrame
#             Ratings of users in the following format: [usedId, movieId, rating]
#         top_N : int or None
#             Number of top similarities for a given item pair that will compose
#             a similarity matrix. It is used in the train phase
#         rating_column_name : str
#         user_column_name : str
#         item_column_name : str

#         Returns
#         -------
#         self
#         """
#         top_N = int(top_N) if top_N else self.top_N_similarities
#         user_column_name = str(user_column_name)
#         item_column_name = str(item_column_name)
#         rating_column_name = str(rating_column_name)

#         clmn_names = [F.col(user_column_name).alias("user"),
#                       F.col(item_column_name).alias("item"),
#                       F.col(rating_column_name).alias("rating")]

#         df_train = df_train.select(clmn_names)

#         left_clmn_names = [F.col("user").alias("u"),
#                    F.col("item").alias("p1"),
#                    F.col("rating").alias("v1")]

#         right_clmn_names = [F.col("user").alias("u"),
#                     F.col("item").alias("p2"),
#                     F.col("rating").alias("v2")]
        
#         # Step 1. Create dot products
        
#         df_dot = df_train.select(left_clmn_names)\
#             .join(df_train.select(right_clmn_names), on="u")\
#             .where(F.col("p1") < F.col("p2"))\
#             .groupBy([F.col("p1"), F.col("p2")])\
#             .agg(F.sum(F.col("v1") * F.col("v2")).alias("dot"))

#         # Step 2. Calculate norms
        
#         df_norm = df_train.select(left_clmn_names)\
#             .groupBy(F.col("p1"))\
#             .agg(F.sqrt(F.sum(F.col("v1") * F.col("v1"))).alias("norm"))

#         similarity_clmns = [F.col("p1"), F.col("p2"), (F.col("dot")/F.col("n1")/F.col("n2")).alias("sim")]
        
#         # Step 3. Calculate similarities
        
#         df_similarity = df_dot.join(df_norm.select(F.col("p1"), F.col("norm").alias("n1")), on="p1")\
#                     .join(df_norm.select(F.col("p1").alias("p2"), F.col("norm").alias("n2")), on="p2")\
#                     .select(similarity_clmns)
        
#         # Step 4. Truncate similarities

#         window = Window.partitionBy(df_similarity["p1"]).orderBy(df_similarity["sim"].desc())
#         df_similarity_N = df_similarity.select("*", F.rank().over(window).alias("rank"))\
#                     .filter(F.col("rank") <= top_N)
        
#         # Step 5. Collect data (similarities, users, products) on driver
        
#         df_similarity_pn = df_similarity_N.toPandas()
#         dict_similarity = df_similarity_pn.set_index(["p1", "p2"]).to_dict()["sim"]
        
#         P = {el["item"] for el in df_train[[F.col("item")]].distinct().collect()}
#         U = {el["user"] for el in df_train[[F.col("user")]].distinct().collect()}
        
#         # Step 6. Broadcast data

#         self.br_similarity = self.spark.sparkContext.broadcast(dict_similarity)
#         self.br_P = self.spark.sparkContext.broadcast(P)
#         self.br_U = self.spark.sparkContext.broadcast(U)
        
#         self.top_N_similarities = top_N

#         self.df_train = df_train.persist()

#         return self
        
    def predict(self, user_id, item_id):
        """Predict rating for a given user and item"""
        raise Exception("Not implemented.")
    
    def recommend(self, user_ids, top_N_ratings=None, partition_num=20, grouped=True):
        """
        Recommend top N items to given users.

        Note: Returns non-zero evaluated (predicted) ratings

        Parameters
        ----------
        user_ids : list or set
            List of users for which we want to get recommendations
        top_N_ratings : int
            Number of top ratings that return as a result of prediction for a
            given user
        partition_num : int
            Number of partitions (tasks) to use to calculate predictions
        grouped : boolean
            If True then the method returns value will be a RDD with the following
            format:
                [(used_id : [(movieId, evaluated rating),.. ]),.. ]
            Otherwise a result will be DataFrame with the following format:
                [userId, movieId, evaluated ratings]
        Returns
        -------
        RDD or DataFrame
        """

        top_N_ratings = int(top_N_ratings) if top_N_ratings else self.top_N_ratings

        if not hasattr(self, "df_train"):
            raise Exception("It seems you haven't trained the model.")

        # Remove reference to the current class instance
        _predict_per_partition = ItemBasedRecommend._predict_per_partition
        br_similarity = self.br_similarity
        br_P = self.br_P

        # Predict ratings through the chain of transformations
        rdd_ratings_pred = self.df_train.where(F.col("user").isin(user_ids))\
                .repartition(partition_num, F.col("user"))\
                .sortWithinPartitions(F.col("user")).rdd\
                .mapPartitions(_predict_per_partition(br_similarity, br_P, top_N_ratings, grouped))
        
        return rdd_ratings_pred if grouped else rdd_ratings_pred.toDF()

    @staticmethod
    def _predict_per_partition(br_similarity, br_P, top_N_ratings, grouped):
        """Wrapper for prediction of ratings per partition"""

        def _predict(p_i, PR_u):
            """Predict a rating for a given user and item"""
            wsum_num = 0.0
            wsum_den = 0.0

            for p_j_r, r_j_r in PR_u.items():
                if p_j_r > p_i and (p_i, p_j_r) in br_similarity.value:
                    sim = br_similarity.value[(p_i, p_j_r)]
                    wsum_num += sim * r_j_r
                    wsum_den += sim
                elif p_j_r < p_i and (p_j_r, p_i) in br_similarity.value:
                    sim = br_similarity.value[(p_j_r, p_i)]
                    wsum_num += sim * r_j_r
                    wsum_den += sim

            return wsum_num/wsum_den if wsum_den > 0 else 0.0
        
        def _predict_per_user(PR_u):
            """
            Predict ratings for a given user

            Parameters
            ----------
            PR_u : list or set
                List of tuple (item, rating), where items are those to which the user has set ratings

            Returns
            -------
                List of evaluated ratings

            """
            result = list()
            for p_i in br_P.value:
                if p_i not in PR_u:
                    pred = _predict(p_i, PR_u)
                    if pred > 0:
                        result.append((p_i, pred))
            return result
        
        def _predict_per_partition_inner_grouped(ratings):
            """
            Predict per partition if grouped option is enabled

            Returns
            -------
            RDD
            """
            prev_user = None
            curr_user = None
            for r in ratings:
                curr_user = r["user"]
                if prev_user == curr_user:
                    PR_u[r["item"]] = r["rating"]
                else:
                    if prev_user:
                        yield prev_user, sorted(_predict_per_user(PR_u), key=lambda x: -x[1])[:top_N_ratings]
                    PR_u = dict()
                    PR_u[r["item"]] = r["rating"]
                    prev_user = curr_user
            # Emit values of the last user in the partition
            if curr_user and curr_user == prev_user:
                yield prev_user, sorted(_predict_per_user(PR_u), key=lambda x: -x[1])[:top_N_ratings]

        def _predict_per_partition_inner(ratings):
            """
            Predict per partition if grouped option is disabled

            Returns
            -------
            DataFrame
            """
            prev_user = None
            curr_user = None
            for r in ratings:
                curr_user = r["user"]
                if prev_user == curr_user:
                    PR_u[r["item"]] = r["rating"]
                else:
                    if prev_user:
                        for el in sorted(_predict_per_user(PR_u), key=lambda x: -x[1])[:top_N_ratings]:
                            yield Row(user=prev_user, item=el[0], rating_pred=el[1])
                    PR_u = dict()
                    PR_u[r["item"]] = r["rating"]
                    prev_user = curr_user
            # Emit values of the last user in the partition
            if curr_user and curr_user == prev_user:
                for el in sorted(_predict_per_user(PR_u), key=lambda x: -x[1])[:top_N_ratings]:
                    yield Row(user=prev_user, item=el[0], rating_pred=el[1])
        
        if grouped:
            return _predict_per_partition_inner_grouped
        
        return _predict_per_partition_inner

    def save(self):
        """Save similarities or ratings in external storage"""
        raise Exception("Not implemented.")
        
    def evaluate(self, df_test):
        """RMSE for test data (optimized for large datasets)"""
        br_similarity = getattr(self, "br_similarity", None)
        df_train_local = self.df_train

        # user -> list(item, rating)
        df_user_rats = (
            df_train_local.groupBy("user")
            .agg(F.collect_list(F.struct("item", "rating")).alias("ur_list"))
        )

        df_join = df_test.join(df_user_rats, on="user", how="inner")

        if br_similarity is not None and len(br_similarity.value) < 5_000_000:
            # ========== broadcast быстрый режим ==========
            def _predict_partition(rows):
                sim_dict = br_similarity.value
                for r in rows:
                    ur = r["ur_list"]
                    target_item = r["item"]
                    ws_sum = ws_den = 0.0
                    for x in ur:
                        j_item, r_val = x["item"], x["rating"]
                        if target_item == j_item:
                            continue
                        k1, k2 = (target_item, j_item), (j_item, target_item)
                        sim = sim_dict.get(k1, sim_dict.get(k2, 0.0))
                        if sim > 0:
                            ws_sum += sim * r_val
                            ws_den += sim
                    if ws_den > 0:
                        diff2 = (r["rating"] - ws_sum / ws_den) ** 2
                        yield (1, diff2)

            rdd = df_join.repartition(200).rdd.mapPartitions(_predict_partition)
            sum_count = rdd.reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]))
            return (sum_count[1] / sum_count[0]) ** 0.5 if sum_count[0] else None

        # ========== distributed режим ==========
        if getattr(self, "df_similarity", None) is None:
            self.df_similarity = self.spark.read.parquet("hdfs:///models/similarity_matrix").cache()

        df_sim = self.df_similarity  # [p1, p2, sim]

        # Чёткие алиасы, чтобы не было двусмысленности
        df_test_expanded = (
            df_test
            .select(
                F.col("user"),
                F.col("item").alias("p1_t"),
                F.col("rating").alias("true_rating")
            )
        )

        df_user_items = (
            df_train_local
            .select(
                F.col("user"),
                F.col("item").alias("p2_u"),
                F.col("rating").alias("rating_u")
            )
        )

        # similarity также переименуем
        df_sim_named = df_sim.select(
            F.col("p1").alias("p1_s"),
            F.col("p2").alias("p2_s"),
            F.col("sim")
        )

        # выполняем join по явным алиасам
        df_pred = (
            df_test_expanded.alias("t")
            .join(df_user_items.alias("u"), "user")
            .join(
                df_sim_named.alias("s"),
                (F.col("t.p1_t") == F.col("s.p1_s")) &
                (F.col("u.p2_u") == F.col("s.p2_s")),
                "left"
            )
            .select(
                F.col("t.user"),
                F.col("t.p1_t").alias("item"),
                F.col("t.true_rating"),
                (F.col("s.sim") * F.col("u.rating_u")).alias("weighted_r"),
                F.col("s.sim")
            )
            .groupBy("user", "item", "true_rating")
            .agg(
                F.sum("weighted_r").alias("sum_wr"),
                F.sum("sim").alias("sum_sim")
            )
            .withColumn("pred", F.when(F.col("sum_sim") > 0, F.col("sum_wr") / F.col("sum_sim")))
            .filter(F.col("pred").isNotNull())
            .withColumn("diff2", (F.col("true_rating") - F.col("pred")) ** 2)
        )

        stat = df_pred.agg(F.count("*").alias("n"), F.sum("diff2").alias("s")).collect()[0]
        return (stat["s"] / stat["n"]) ** 0.5 if stat["n"] else None
    
    def evaluate_1(self, df_test):
        """RMSE for test data"""
        
        # Разделяем данные на партиции, чтобы каждая считала свои результаты
        br_similarity = self.br_similarity
        df_train_local = self.df_train
        top_items = self.br_P.value

        # Подготовим удобный dataframe user-item-rating для справки
        df_user_rats = df_train_local.groupBy("user").agg(
            F.collect_list(F.struct("item", "rating")).alias("ur_list")
        )
        df_join = df_test.join(df_user_rats, on="user", how="inner")

        # Создадим функцию для оценки на каждой партиции
        def _predict_partition(rows):
            for r in rows:
                ur = r["ur_list"]
                target_item = r["item"]
                ws_sum = 0.0
                ws_den = 0.0
                for x in ur:
                    j_item, r_val = x["item"], x["rating"]
                    if target_item == j_item:
                        continue
                    k1, k2 = (target_item, j_item), (j_item, target_item)
                    sim = 0.0
                    if k1 in br_similarity.value:
                        sim = br_similarity.value[k1]
                    elif k2 in br_similarity.value:
                        sim = br_similarity.value[k2]
                    if sim > 0:
                        ws_sum += sim * r_val
                        ws_den += sim
                pred = ws_sum / ws_den if ws_den > 0 else None
                if pred is not None:
                    diff2 = (r["rating"] - pred) ** 2
                    yield (1, diff2)  # счетчик и сумма квадратов ошибок

        # Применяем mapPartitions → редукция
        rdd = df_join.repartition(200).rdd.mapPartitions(_predict_partition)
        sum_count = rdd.reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]))
        if sum_count[0] == 0:
            return None
        rmse = (sum_count[1] / sum_count[0]) ** 0.5
        return rmse

    def recommend_and_show(self, num_rows_to_display=None, **kwargs):
        """
        Launch the action operation to calculate recommendations, collect data
        on the driver and display them.
        """
        if num_rows_to_display:
            num_rows_to_display = int(num_rows_to_display)

        result = self.recommend(**kwargs)

        if isinstance(result, RDD):
            return result.take(num_rows_to_display) if num_rows_to_display else result.collect()
        elif isinstance(result, DataFrame):
            return result.show(num_rows_to_display) if num_rows_to_display else result.show()


In [5]:
from pyspark.sql import SparkSession

In [6]:
spark = SparkSession.builder \
        .master("local[*]") \
        .appName("itemBasedRecommendationTest") \
        .config("spark.driver.memory", "32g") \
        .config("spark.executor.memory", "24g") \
        .config("spark.memory.fraction", "0.8") \
        .config("spark.memory.storageFraction", "0.3") \
        .config("spark.sql.shuffle.partitions", "2000") \
        .getOrCreate()

In [7]:
#!sudo apt install tree
!tree dataset
!ls -lah dataset/ml-latest-small/ratings.csv
!ls -lah dataset/ml-latest/ratings.csv

[01;34mdataset[00m
├── [01;34mml-latest[00m
│   ├── genome-scores.csv
│   ├── genome-tags.csv
│   ├── links.csv
│   ├── movies.csv
│   ├── ratings.csv
│   ├── README.txt
│   └── tags.csv
└── [01;34mml-latest-small[00m
    ├── links.csv
    ├── movies.csv
    ├── ratings.csv
    ├── README.txt
    └── tags.csv

2 directories, 12 files
-rw-r--r-- 1 ubuntu ubuntu 2.4M Sep 26  2018 dataset/ml-latest-small/ratings.csv
-rw-rw-r-- 1 ubuntu ubuntu 891M Jul 20  2023 dataset/ml-latest/ratings.csv


In [8]:
BASE_PATH = "/home/ubuntu/Documents/BigDataHWs/hw4/dataset"

SMALL_RATINGS_FILE = f"file://{BASE_PATH}/ml-latest-small/ratings.csv"
FULL_RATINGS_FILE = f"file://{BASE_PATH}/ml-latest/ratings.csv"

RATINGS_FILE = SMALL_RATINGS_FILE

In [9]:
df_rating = spark.read.load(RATINGS_FILE,
                          format="csv",
                          header="true",
                          inferSchema="true",
                          sep=",")

df_rating.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [10]:
df_rating_true = df_rating.\
    drop("timestamp").\
    withColumnRenamed("userId", "user").\
    withColumnRenamed("movieId", "item")

print("Total number of ratings:", df_rating_true.count())
df_rating_true.show(5)

Total number of ratings: 100836
+----+----+------+
|user|item|rating|
+----+----+------+
|   1|   1|   4.0|
|   1|   3|   4.0|
|   1|   6|   4.0|
|   1|  47|   5.0|
|   1|  50|   5.0|
+----+----+------+
only showing top 5 rows



In [11]:
df_train, df_test = df_rating_true.randomSplit([0.8, 0.2], seed=7)
df_train.persist(); df_test.persist()

DataFrame[user: int, item: int, rating: double]

In [12]:
df_train.show(5)

+----+----+------+
|user|item|rating|
+----+----+------+
|   1|   3|   4.0|
|   1|   6|   4.0|
|   1|  47|   5.0|
|   1|  50|   5.0|
|   1|  70|   3.0|
+----+----+------+
only showing top 5 rows



In [13]:
mean_rating = df_train.agg(F.avg('rating')).collect()[0][0]

print(f'Средний рейтинг фильмов в обучающем множестве: {mean_rating}')

Средний рейтинг фильмов в обучающем множестве: 3.5033925166530473


In [14]:
df_test_predict_mean = df_test.\
                        withColumn('pred', F.lit(mean_rating)).\
                        withColumn('SquaredError', (F.col("rating") - F.col("pred")) ** 2)

df_test_predict_mean.show(5)

+----+----+------+------------------+------------------+
|user|item|rating|              pred|      SquaredError|
+----+----+------+------------------+------------------+
|   1|   1|   4.0|3.5033925166530473|0.2466189925161939|
|   1| 157|   5.0|3.5033925166530473|2.2398339592100993|
|   1| 235|   4.0|3.5033925166530473|0.2466189925161939|
|   1| 260|   5.0|3.5033925166530473|2.2398339592100993|
|   1| 362|   5.0|3.5033925166530473|2.2398339592100993|
+----+----+------+------------------+------------------+
only showing top 5 rows



In [15]:
rmse_base = (df_test_predict_mean.agg(F.sum('SquaredError')).collect()[0][0] / df_test_predict_mean.count()) ** 0.5

print(f"RMSE, когда все предсказания равны среднему рейтингу: {rmse_base}")

RMSE, когда все предсказания равны среднему рейтингу: 1.0500201634697948


## TEST

In [16]:
top_N = 20

user_column_name="user"
item_column_name="item"
rating_column_name="rating"

clmn_names = [F.col(user_column_name).alias("user"),
                      F.col(item_column_name).alias("item"),
                      F.col(rating_column_name).alias("rating")]

clmn_names

[Column<b'user AS `user`'>,
 Column<b'item AS `item`'>,
 Column<b'rating AS `rating`'>]

In [17]:
df_train = df_train.select(clmn_names)
df_train.persist()
df_train.show(5)

+----+----+------+
|user|item|rating|
+----+----+------+
|   1|   3|   4.0|
|   1|   6|   4.0|
|   1|  47|   5.0|
|   1|  50|   5.0|
|   1|  70|   3.0|
+----+----+------+
only showing top 5 rows



In [18]:
left_clmn_names = [F.col("user").alias("u"),
           F.col("item").alias("p1"),
           F.col("rating").alias("v1")]

right_clmn_names = [F.col("user").alias("u"),
            F.col("item").alias("p2"),
            F.col("rating").alias("v2")]

In [22]:
# Step 1. Create dot products

df_dot = df_train.select(left_clmn_names) \
    .join(df_train.select(right_clmn_names), on="u") \
    .where(F.col("p1") < F.col("p2")) \
    .groupBy([F.col("p1"), F.col("p2")]) \
    .agg(F.sum(F.col("v1") * F.col("v2")).alias("dot"))

df_dot.show(5)

+----+----+------+
|  p1|  p2|   dot|
+----+----+------+
| 101|2033|  35.5|
| 590|2174|349.25|
|1031|3439|  32.0|
|1282|2450|  56.5|
|1617|2916| 349.0|
+----+----+------+
only showing top 5 rows



In [23]:
# Step 2. Calculate norms

df_norm = df_train \
    .withColumn("sq", F.col("rating") ** 2) \
    .groupBy("item") \
    .agg(F.sqrt(F.sum("sq")).alias("norm")) \
    .withColumnRenamed("item", "p1")

# df_norm = df_train.select(left_clmn_names) \
#     .groupBy(F.col("p1")) \
#     .agg(F.sqrt(F.sum(F.col("v1") * F.col("v1"))).alias("norm"))

df_norm.show(5)

+----+------------------+
|  p1|              norm|
+----+------------------+
|2366|15.827191791344413|
|1088|21.418449990603897|
|4519| 9.772410142846033|
|1591|12.338962679253067|
|3918|10.307764064044152|
+----+------------------+
only showing top 5 rows



In [25]:
similarity_clmns = [F.col("p1"), F.col("p2"), (F.col("dot")/F.col("n1")/F.col("n2")).alias("sim")]

similarity_clmns

[Column<b'p1'>, Column<b'p2'>, Column<b'((dot / n1) / n2) AS `sim`'>]

In [34]:
%time
# Step 3. Calculate similarities

# # Проверим фактические имена столбцов в df_norm
# print(df_norm.columns)
# # Допустим, вывод: ['p1', 'norm'] (если столбец уже p1),
# # либо ['item', 'norm'] — подставляем автоматически.

# # Унифицируем во временном df_norm_fixed с нужными именами:
# if "item" in df_norm.columns:
#     df_norm_fixed = df_norm.select(
#         F.col("item").alias("item_key"),
#         F.col("norm")
#     )
# else:
#     df_norm_fixed = df_norm.select(
#         F.col("p1").alias("item_key"),
#         F.col("norm")
#     )

# # Один широковещательный экземпляр, чтобы не гнать shuffle
# df_norm_b = F.broadcast(df_norm_fixed)

# # Явные объекты для join с уникальными именами
# norm_p1 = df_norm_b.select(
#     F.col("p1").alias("p1_key"),
#     F.col("norm").alias("n1")
# )
# norm_p2 = df_norm_b.select(
#     F.col("p2").alias("p2_key"),
#     F.col("norm").alias("n2")
# )

# # Основной расчёт
# df_similarity = (
#     df_dot
#     .join(norm_p1, df_dot.p1 == norm_p1.p1_key, "inner")
#     .join(norm_p2, df_dot.p2 == norm_p2.p2_key, "inner")
#     .select(
#         df_dot.p1.alias("p1"),
#         df_dot.p2.alias("p2"),
#         (F.col("dot") / (F.col("n1") * F.col("n2"))).alias("sim")
#     )
# )

# df_similarity.persist()

df_norm_cached = df_norm.persist()

df_similarity = df_dot \
    .join(df_norm_cached.select(F.col("p1"), F.col("norm").alias("n1")), on="p1") \
    .join(df_norm_cached.select(F.col("p1").alias("p2"), F.col("norm").alias("n2")), on="p2") \
    .select(similarity_clmns)

df_similarity.persist()

df_similarity.show(5)

CPU times: user 3 µs, sys: 0 ns, total: 3 µs
Wall time: 5.72 µs
+----+----+-------------------+
|  p1|  p2|                sim|
+----+----+-------------------+
| 101|2033|0.23471106442350215|
| 590|2174|0.24817296741926048|
|1031|3439|0.20931133554085943|
|1282|2450|0.28442234751656525|
|1617|2916| 0.3069735548256318|
+----+----+-------------------+
only showing top 5 rows



In [35]:
df_similarity.count()

9765973

In [36]:
# Step 4. Truncate similarities

# window = Window.partitionBy(df_similarity["p1"]).orderBy(df_similarity["sim"].desc())
# df_similarity_N = df_similarity.select("*", F.rank().over(window).alias("rank"))\
#             .filter(F.col("rank") <= top_N)

# df_similarity_N.where(F.col("p1") == 5943).show(100)

In [44]:
# prediction

user = 123
movieId = 96829

df_sim_filtered = df_similarity\
    .filter((F.col("p1") == movieId) | (F.col('p2') == movieId)).sort('p2', ascending=True)

df_predict_example = df_train\
    .where(F.col('user') == user)\
    .join(df_similarity, ((df_train.item == df_sim_filtered.p1) & (df_sim_filtered.p2 == movieId)) | 
                         ((df_train.item == df_sim_filtered.p2) & (df_sim_filtered.p1 == movieId))) \
    .sort('sim', ascending=False) \
    .limit(top_N)

df_predict_example2 = df_train.where(F.col('user') == user).join(df_norm, df_norm.p1 == df_train.item)

w_sum = df_predict_example\
    .select(F.sum(F.col("rating") * F.col("sim")).alias("weighted_sum"))\
    .collect()[0]["weighted_sum"]

w_norms = df_predict_example.agg(F.sum('sim')).collect()[0][0]

print(f"Prediction of rating for user {user} for movie {movieId}:")
print(w_sum / w_norms)

df_predict_example.show(100)

Prediction of rating for user 123 for movie 96829:
4.1205706441353716
+----+------+------+-----+------+-------------------+
|user|  item|rating|   p1|    p2|                sim|
+----+------+------+-----+------+-------------------+
| 123|104879|   4.0|96829|104879| 0.3209761539224515|
| 123|112552|   3.5|96829|112552| 0.2997193396242945|
| 123| 68554|   4.5|68554| 96829|0.25986315314949343|
| 123| 91542|   4.0|91542| 96829| 0.2505657519209857|
| 123|109487|   4.5|96829|109487|0.24915196995329322|
| 123| 51540|   4.0|51540| 96829|0.22468608298819884|
| 123| 99114|   4.5|96829| 99114|0.22222097541033137|
| 123|  2959|   4.5| 2959| 96829|0.21772448600571395|
| 123| 58803|   4.5|58803| 96829|0.20076712259557772|
| 123|  6016|   3.5| 6016| 96829| 0.1981394871073563|
| 123|112556|   4.5|96829|112556|0.19346644054835044|
| 123| 60684|   4.0|60684| 96829|  0.186581493359926|
| 123| 68157|   4.0|68157| 96829|0.17774415423474987|
| 123| 91500|   3.5|91500| 96829|0.17316948846420208|
| 123|115569

In [38]:
df_test.where(F.col('user') == 123).show()

+----+-----+------+
|user| item|rating|
+----+-----+------+
| 123|  293|   3.5|
| 123| 2329|   4.5|
| 123|96829|   4.5|
| 123|97923|   4.0|
+----+-----+------+



In [46]:
class ItemBasedCF:
    
    def __init__(self, spark, top_N_similarities=20, top_N_ratings=None):
        self.spark = spark
        self.top_N_similarities = int(top_N_similarities)
        self.top_N_ratings = int(top_N_ratings) if top_N_ratings else None
        
    def train(self, df_train, top_N=None, user_column_name="user", item_column_name="item",
              rating_column_name="rating"):
        top_N = int(top_N) if top_N else self.top_N_similarities
        user_column_name = str(user_column_name)
        item_column_name = str(item_column_name)
        rating_column_name = str(rating_column_name)
        
        clmn_names = [F.col(user_column_name).alias("user"),
                      F.col(item_column_name).alias("item"),
                      F.col(rating_column_name).alias("rating")]
        
        df_train = df_train.select(clmn_names)
        df_train.persist()
        
        left_clmn_names = [F.col("user").alias("u"),
           F.col("item").alias("p1"),
           F.col("rating").alias("v1")]

        right_clmn_names = [F.col("user").alias("u"),
                    F.col("item").alias("p2"),
                    F.col("rating").alias("v2")]
        
        # Step 1. Create dot products
        df_dot = df_train.select(left_clmn_names) \
            .join(df_train.select(right_clmn_names), on="u") \
            .where(F.col("p1") < F.col("p2")) \
            .groupBy([F.col("p1"), F.col("p2")]) \
            .agg(F.sum(F.col("v1") * F.col("v2")).alias("dot"))
        
        # Step 2. Calculate norms
        df_norm = df_train \
            .withColumn("sq", F.col("rating") ** 2) \
            .groupBy("item") \
            .agg(F.sqrt(F.sum("sq")).alias("norm")) \
            .withColumnRenamed("item", "p1")
        
        similarity_clmns = [F.col("p1"), F.col("p2"), (F.col("dot")/F.col("n1")/F.col("n2")).alias("sim")]
        
        # Step 3. Calculate similarities
        df_norm_cached = F.broadcast(df_norm)

        df_similarity = df_dot \
            .join(df_norm_cached.select(F.col("p1"), F.col("norm").alias("n1")), on="p1") \
            .join(df_norm_cached.select(F.col("p1").alias("p2"), F.col("norm").alias("n2")), on="p2") \
            .select(similarity_clmns)

        df_similarity.persist()
        
        self.df_train = df_train
        
        return self

    def predict(self, user_id, item_id):
        """Predict rating for a given user and item"""
        user = user_id
        movieId = item_id

        df_sim_filtered = df_similarity\
            .filter((F.col("p1") == movieId) | (F.col('p2') == movieId)).sort('p2', ascending=True)

        df_predict_example = df_train\
            .where(F.col('user') == user)\
            .join(df_similarity, ((df_train.item == df_sim_filtered.p1) & (df_sim_filtered.p2 == movieId)) | 
                                 ((df_train.item == df_sim_filtered.p2) & (df_sim_filtered.p1 == movieId))) \
            .sort('sim', ascending=False) \
            .limit(top_N)

        df_predict_example2 = df_train.where(F.col('user') == user).join(df_norm, df_norm.p1 == df_train.item)

        w_sum = df_predict_example\
            .select(F.sum(F.col("rating") * F.col("sim")).alias("weighted_sum"))\
            .collect()[0]["weighted_sum"]

        w_norms = df_predict_example.agg(F.sum('sim')).collect()[0][0]
        
        return w_sum / w_norms
    
     def evaluate(self, df_test):

### Обучение модели

In [None]:
model = ItemBasedRecommend(spark)

In [None]:
model.train(df_train)

In [None]:
rmse_cf = model.evaluate(df_test)
print(f"RMSE модели item-based CF: {rmse_cf:.4f}")

In [None]:
model = ItemBasedRecommend(spark)

In [None]:
model.train(df_train)

In [None]:
model.recommend_and_show(user_ids=model.br_U.value, grouped=False)

In [None]:
spark.stop()