# Installation

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!cp /content/drive/MyDrive/MMDS-data/spark-3.1.1-bin-hadoop3.2.tgz .
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
import os

import findspark

# Tell findspark/PySpark where the JDK and the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
})

# Make the pyspark package importable from SPARK_HOME.
findspark.init()

# Assignment

In [None]:
import pyspark
import os
import numpy as np
from pyspark.sql import SparkSession,\
                        types as T,\
                        functions as F,\
                        DataFrame,\
                        Row
from pyspark.mllib.linalg import distributed, SparseVector

## Define

In [None]:
class CollaborativeFiltering:
    """User-user collaborative filtering on top of Spark DataFrames.

    Each user is represented by a sparse vector of ratings indexed by item id.
    Similarity between two users is the Pearson correlation computed over the
    items both users rated, with each user's ratings centred on that user's
    mean over all items they rated.

    Attributes:
        N: Number of most similar users used as the neighbourhood.
        df: The raw ratings DataFrame passed to the constructor.
        user_column / item_column / rating_column: Column names in ``df``.
        df_user_vector: DataFrame with one row per user and the columns
            ``<user_column>`` and ``"vector"`` (a ``SparseVector``).
        df_n_similar_users: Set by :meth:`predict`; the neighbourhood found
            for the most recent query vector.
    """

    def __init__(self,
                 N: int,
                 df: DataFrame,
                 user_column: str="user",
                 item_column: str="item",
                 rating_column: str="rating") -> None:
        """Store the configuration and build the per-user rating vectors.

        Args:
            N: Number of similar users to keep when predicting.
            df: Ratings DataFrame containing the three named columns.
            user_column: Name of the user id column.
            item_column: Name of the item id column (non-negative integers,
                used directly as vector indices).
            rating_column: Name of the numeric rating column.
        """
        self.N = N
        self.df = df
        self.user_column = user_column
        self.item_column = item_column
        self.rating_column = rating_column

        self.df_user_vector = self._create_sparse_vectors(
            df=self.df,
            user_column=self.user_column,
            item_column=self.item_column,
            rating_column=self.rating_column)


    def _create_sparse_vectors(self,
                               df: DataFrame,
                               user_column: str="user",
                               item_column: str="item",
                               rating_column: str="rating") -> DataFrame:
        """Build one ``SparseVector`` of ratings per user.

        Args:
            df: Ratings DataFrame.
            user_column: Name of the user id column.
            item_column: Name of the item id column.
            rating_column: Name of the rating column.

        Returns:
            DataFrame with the columns ``user_column`` and ``"vector"``. The
            vector has size ``max(item id) + 1`` and holds the user's rating
            at the index of each rated item.
        """
        # Keep exactly one rating per (user, item): rows with missing values
        # are dropped and repeated ratings are averaged. SparseVector rejects
        # duplicate indices, so duplicates must be resolved before building it.
        ratings = df\
            .select(user_column, item_column, rating_column)\
            .dropna(subset=[user_column, item_column, rating_column])\
            .groupBy(user_column, item_column)\
            .agg(F.avg(F.col(rating_column)).alias(rating_column))

        # Collect (item, rating) pairs together so each rating stays attached
        # to its item. The order produced by collect_list after a shuffle is
        # not guaranteed, so the pairs are sorted when the vector is built.
        df_user_pairs = ratings\
            .groupBy(user_column)\
            .agg(F.collect_list(F.struct(F.col(item_column),
                                         F.col(rating_column))).alias("pairs"))

        # Item ids are used as indices, so the vector must be large enough for
        # the largest id (the number of distinct items is too small whenever
        # the ids are not contiguous).
        max_item = ratings.agg(F.max(F.col(item_column))).first()[0]
        num_items = 0 if max_item is None else int(max_item) + 1

        # Passing a list of (index, value) pairs lets SparseVector sort them
        # by index itself.
        df_user_vectors = df_user_pairs\
          .rdd\
          .map(lambda row: (row[user_column],
                            SparseVector(num_items,
                                         [(int(pair[0]), float(pair[1]))
                                          for pair in row["pairs"]])))\
          .toDF([user_column, "vector"])

        return df_user_vectors


    def _get_n_similar_users(self,
                             df: DataFrame,
                             user_vector: SparseVector,
                             n_user: int) -> DataFrame:
        """Return the ``n_user`` users most similar to ``user_vector``.

        Args:
            df: DataFrame with the columns ``self.user_column`` and
                ``"vector"`` (as produced by ``_create_sparse_vectors``).
            user_vector: Rating vector of the query user.
            n_user: Maximum number of users to return.

        Returns:
            DataFrame with the columns ``self.user_column``, ``"vector"`` and
            ``"coefficient"``, ordered by descending coefficient (ties broken
            by user id). Users whose similarity is undefined are left out.

        Note:
            If ``user_vector`` belongs to a user present in ``df``, that user
            is compared with itself and appears in the result.
        """
        def calculate_similarity(rx: SparseVector,
                                 ry: SparseVector) -> float:
            """Pearson correlation of two users over their co-rated items.

            Returns NaN when the correlation is undefined: a user without
            ratings, no co-rated items, or zero variance on the co-rated
            items. NaN rows are removed by the caller.
            """
            undefined = float("nan")

            if len(rx.values) == 0 or len(ry.values) == 0:
                return undefined

            # Mean rating of each user over everything that user rated
            m_rx = rx.values.sum() / len(rx.values)
            m_ry = ry.values.sum() / len(ry.values)

            # Sxy: positions (within each vector) of the items rated by both
            _, rx_common, ry_common = np.intersect1d(rx.indices,
                                                     ry.indices,
                                                     return_indices=True)
            if len(rx_common) == 0:
                return undefined

            # Mean-centred ratings restricted to the co-rated items
            sub_rx_s = rx.values[rx_common] - m_rx
            sub_ry_s = ry.values[ry_common] - m_ry

            denominator = np.linalg.norm(sub_rx_s) * np.linalg.norm(sub_ry_s)
            if denominator == 0:
                return undefined

            return float(np.dot(sub_rx_s, sub_ry_s) / denominator)

        #----------------------------------------------------
        # Copy into a local so the lambda below does not capture `self`
        # (which holds DataFrames and cannot be shipped to the executors).
        user_column = self.user_column

        n_similar_users = df\
            .rdd\
            .map(lambda row: (row[user_column],
                              row["vector"],
                              calculate_similarity(user_vector,
                                                   row["vector"])))\
            .toDF([user_column, "vector", "coefficient"])\
            .dropna(subset=["coefficient"])\
            .orderBy(F.col("coefficient").desc(), F.col(user_column).asc())\
            .limit(n_user)

        return n_similar_users


    def predict(self,
                user_vector: SparseVector,
                n_items: int) -> DataFrame:
        """Recommend up to ``n_items`` items the query user has not rated.

        The ``self.N`` most similar users are stored in
        ``self.df_n_similar_users``. For every item that at least one
        positively correlated neighbour rated and the query user did not, the
        predicted rating is the similarity-weighted average of the
        neighbours' ratings:

            r_xi = sum(s_xy * r_yi) / sum(s_xy)   over neighbours y who rated i

        Args:
            user_vector: Rating vector of the query user.
            n_items: Maximum number of items to return.

        Returns:
            DataFrame with the columns ``self.item_column`` and
            ``"prediction"``, ordered by descending prediction (ties broken by
            item id). Empty when no neighbour contributes an unrated item.
        """
        # Find N similar users
        self.df_n_similar_users = self._get_n_similar_users(
            self.df_user_vector,
            user_vector,
            self.N)

        # The neighbourhood has at most N rows, so it is aggregated on the driver.
        neighbours = self.df_n_similar_users\
            .select("vector", "coefficient")\
            .collect()

        # Prediction for items of user: (rx = user_vector)
        already_rated = {int(item) for item in user_vector.indices}
        weighted_sums = {}
        weight_totals = {}
        for neighbour in neighbours:
            similarity = float(neighbour["coefficient"])
            # Only positively correlated users are treated as evidence.
            if similarity <= 0.0:
                continue
            vector = neighbour["vector"]
            for item, rating in zip(vector.indices, vector.values):
                item = int(item)
                if item in already_rated:
                    continue
                weighted_sums[item] = \
                    weighted_sums.get(item, 0.0) + similarity * float(rating)
                weight_totals[item] = \
                    weight_totals.get(item, 0.0) + similarity

        predictions = sorted(
            ((item, weighted_sums[item] / weight_totals[item])
             for item in weighted_sums),
            key=lambda pair: (-pair[1], pair[0]))[:max(n_items, 0)]

        # An explicit schema keeps the result well-typed even when it is empty.
        schema = T.StructType([
            T.StructField(self.item_column, T.IntegerType(), False),
            T.StructField("prediction", T.DoubleType(), False)
        ])

        spark = SparkSession.builder.getOrCreate()
        return spark.createDataFrame(predictions, schema=schema)

## Test

### Create Spark Session

In [None]:
# Start (or reuse) a Spark session running in local mode.
ss = (
    SparkSession.builder
    .master("local[*]")
    .appName("Endterm Q3 - Collaborative Filtering")
    .getOrCreate()
)

In [None]:
# Display the session object to confirm it was created.
ss

### Set up and read data

In [None]:
# Make ratings2k.csv available in the working directory by symlinking it from
# Google Drive, unless it is already there.
inputFile = "ratings2k.csv"
inputPath = f"/content/drive/MyDrive/HK1 2024 - 2025/Xử lý dữ liệu lớn/Cuối kỳ/{inputFile}"

# A dangling link left by an earlier run (e.g. Drive was not mounted) makes
# os.path.exists() return False while os.symlink() would still fail with
# FileExistsError, so remove it first.
if os.path.islink(inputFile) and not os.path.exists(inputFile):
  os.remove(inputFile)

if not os.path.exists(inputFile):
  # Fail here with a clear message instead of creating a dangling link that
  # only breaks later when Spark tries to read the file.
  if not os.path.exists(inputPath):
    raise FileNotFoundError(
        f"Input data not found at '{inputPath}'. Is Google Drive mounted?")
  os.symlink(inputPath, inputFile)

In [None]:
# Load the ratings CSV with an explicit schema (the file has a header row).
schema = T.StructType()\
    .add("index", T.IntegerType())\
    .add("user", T.IntegerType())\
    .add("item", T.IntegerType())\
    .add("rating", T.DoubleType())

df = ss.read.csv(inputFile, schema=schema, header=True)

In [None]:
# Row counts: all rows, fully distinct rows, distinct (user, item, rating) triples.
(
    df.count(),
    df.distinct().count(),
    df.select("user", "item", "rating").distinct().count(),
)

(2365, 2365, 2365)

In [None]:
# Preview the ratings without truncating cell contents.
df.show(truncate=False)

+-----+----+----+------+
|index|user|item|rating|
+-----+----+----+------+
|0    |73  |52  |4.0   |
|1    |36  |239 |3.0   |
|2    |72  |26  |1.0   |
|3    |59  |430 |2.5   |
|4    |72  |284 |3.0   |
|5    |36  |277 |3.0   |
|6    |72  |426 |4.0   |
|7    |18  |163 |3.0   |
|8    |67  |93  |4.0   |
|9    |59  |22  |3.5   |
|10   |8   |174 |2.0   |
|11   |5   |149 |2.0   |
|12   |26  |322 |3.5   |
|13   |8   |416 |4.0   |
|14   |31  |25  |2.0   |
|15   |41  |83  |2.0   |
|16   |25  |321 |3.0   |
|17   |47  |193 |3.0   |
|18   |9   |455 |4.0   |
|19   |43  |216 |3.5   |
+-----+----+----+------+
only showing top 20 rows



In [None]:
# Confirm that the explicit schema was applied to the loaded data.
df.printSchema()

root
 |-- index: integer (nullable = true)
 |-- user: integer (nullable = true)
 |-- item: integer (nullable = true)
 |-- rating: double (nullable = true)



In [None]:
# Summary statistics for every column of the ratings DataFrame.
df.summary().show()

+-------+-----------------+------------------+------------------+------------------+
|summary|            index|              user|              item|            rating|
+-------+-----------------+------------------+------------------+------------------+
|  count|             2365|              2365|              2365|              2365|
|   mean|           1182.0|38.002536997885834|221.79957716701904| 3.641860465116279|
| stddev|682.8610156295838| 23.50116086683459|130.38801426427398|1.0067781732668075|
|    min|                0|                 1|                 0|               0.5|
|    25%|              591|                15|               114|               3.0|
|    50%|             1182|                36|               214|               4.0|
|    75%|             1773|                61|               324|               4.0|
|    max|             2364|                75|               466|               5.0|
+-------+-----------------+------------------+------------------+------------------+

### Test Collaborative Filtering

In [None]:
# Build the recommender with a neighbourhood of the 5 most similar users.
cf = CollaborativeFiltering(N=5, df=df)

In [None]:
# Inspect the per-user sparse rating vectors built by the constructor.
cf.df_user_vector.show(truncate=False)

+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Use the rating vector of user 53 as the query for the test below.
test_vector = cf.df_user_vector\
    .where(F.col("user") == 53)\
    .take(1)[0]["vector"]

In [None]:
# Run the recommender for the query vector, asking for 4 items.
# As a side effect this sets cf.df_n_similar_users.
cf.predict(test_vector, 4)

In [None]:
# Show the neighbourhood selected by the predict() call above.
cf.df_n_similar_users.show()

+----+--------------------+-----------+
|user|              vector|coefficient|
+----+--------------------+-----------+
|  64|(467,[0,89,144,17...|        1.0|
|  45|(467,[53,54,75,76...|        1.0|
|  15|(467,[67,77,119,1...|        1.0|
|  53|(467,[288,324,333...|        1.0|
|  37|(467,[25,34,36,61...|        1.0|
+----+--------------------+-----------+

