# Visão Geral: Filtragem Colaborativa Item-Item com Agrupamento

Este notebook apresenta um sistema de recomendação baseado em filtragem colaborativa item-item que utiliza **clustering KMeans** para melhorar a escalabilidade e eficiência. A abordagem foi projetada para recomendar filmes aos users com base na similaridade entre itens dentro de clusters.

## Abordagem

1. **Preparação dos Dados**: Carregamento do dataset MovieLens e divisão em conjuntos de treino e teste.
2. **Vetorização dos Itens**: Cada filme é representado como um vetor esparso das avaliações dos users.
3. **Normalização**: Os vetores dos itens são normalizados utilizando a norma L2 para garantir comparações justas de similaridade.
4. **Agrupamento com KMeans**: Os itens são agrupados em clusters usando o KMeans. Isso restringe o cálculo de similaridade apenas aos itens do mesmo cluster, reduzindo o custo computacional.
  - O número de clusters pode ser ajustado conforme o tamanho do dataset e o nível de granularidade desejado.
  - Cada item é atribuído a um cluster, que será usado para limitar a busca por itens similares.

5. **Similaridade Cosseno dentro dos Clusters**: Para cada par de itens no mesmo cluster, calcula-se a similaridade cosseno:
  
  $$
  \text{cosine\_sim}(i, j) = \frac{\vec{v}_i \cdot \vec{v}_j}{\|\vec{v}_i\| \|\vec{v}_j\|}
  $$
  onde $\vec{v}_i$ e $\vec{v}_j$ são os vetores normalizados dos itens $i$ e $j$.

6. **Predição**: Para cada usuário e filme alvo no conjunto de teste, prevê-se a nota usando uma média ponderada das avaliações do usuário para filmes similares no mesmo cluster:
  
  $$
  \hat{r}_{ui} = \frac{\sum_{j \in N(i)} s_{ij} \cdot r_{uj}}{\sum_{j \in N(i)} |s_{ij}|}
  $$
  - $N(i)$: conjunto de vizinhos do filme $i$ (dentro do mesmo cluster)
  - $s_{ij}$: similaridade entre o filme $i$ e o filme $j$
  - $r_{uj}$: avaliação dada pelo usuário $u$ ao filme $j$

7. **Avaliação**: As predições são avaliadas utilizando RMSE e MAE para medir a precisão das recomendações.

## Por que usar clustering?

Ao agrupar os itens, limitamos a busca por similaridade a grupos menores, acelerando o processo e tornando-o viável para grandes volumes de dados. Essa abordagem equilibra a qualidade das recomendações e a eficiência computacional.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list, struct, sum as sql_sum, udf
from pyspark.ml.linalg import Vectors
from pyspark.sql import functions as F
from pyspark.sql import Row
from pyspark.ml.feature import Normalizer
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import RegressionEvaluator
import pandas as pd
import time
import math
import os

In [None]:
start_time = time.time() # para controlor o tempo de duracao

In [None]:
spark = SparkSession.builder \
    .appName("ItemItemCF") \
    .config("spark.driver.memory", "16g") \
    .config("spark.executor.memory", "16g") \
    .config("spark.executor.cores", "4") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

25/05/31 00:16:18 WARN Utils: Your hostname, cristianonicolau.local resolves to a loopback address: 127.0.0.1; using 192.168.1.122 instead (on interface en0)
25/05/31 00:16:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/31 00:16:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/05/31 00:16:19 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
df ={
    "small": "./data/ml-latest-small/ratings.csv",
    "ml-1m": "./data/ml-1m/ratings.csv",
    "ml-10m": "./data/ml-10m/ratings.csv",
    "ml-20m": "./data/ml-20m/ratings.csv",
    "ml-25m": "./data/ml-25m/ratings.csv",
}

data_sel = "ml-1m" 

Abaixo defino os caminhos para diferentes datasets do MovieLens. A ideia é quando na primeira exucaçao para dar download aos datasets usar o codigo abaixo

In [5]:
# dataset_links = {
#     "small": "https://files.grouplens.org/datasets/movielens/ml-latest-small.zip",
#     "ml-1m": "https://files.grouplens.org/datasets/movielens/ml-1m.zip",
#     "ml-10m": "https://files.grouplens.org/datasets/movielens/ml-10m.zip",
#     "ml-20m": "https://files.grouplens.org/datasets/movielens/ml-20m.zip",
#     "ml-25m": "https://files.grouplens.org/datasets/movielens/ml-25m.zip"
# }

# def download_and_extract_dataset(dataset_name):
#     if dataset_name not in dataset_links:
#         raise ValueError(f"Dataset '{dataset_name}' not found. Available datasets: {list(dataset_links.keys())}")

#     url = dataset_links[dataset_name]
#     os.system(f"wget {url} -O {dataset_name}.zip")
#     os.system(f"unzip {dataset_name}.zip -d {dataset_name}")
#     print(f"Downloaded and extracted {dataset_name} dataset.")

In [6]:
PATH = df[data_sel]
# download_and_extract_dataset("ml-20m")  # Change to desired dataset
# PATH = "ml-20m/ml-20m/ratings.csv"  # Adjust path based on the dataset
data = spark.read.csv(PATH, header=True, inferSchema=True) \
            .select("userId", "movieId", "rating")

print("=== DATASET STATISTICS ===")
print("Number of ratings: ", data.count())
print("Average rating: ", data.agg(F.avg("rating")).first()[0])
print("Minimum rating: ", data.agg(F.min("rating")).first()[0])
print("Maximum rating: ", data.agg(F.max("rating")).first()[0])
print("Number of users: ", data.select("userId").distinct().count())
print("Number of movies: ", data.select("movieId").distinct().count())

data.take(5)

                                                                                

=== DATASET STATISTICS ===
Number of ratings:  1000209
Average rating:  3.581564453029317
Minimum rating:  1
Maximum rating:  5
Number of users:  6040
Number of movies:  3706


[Row(userId=1, movieId=1193, rating=5),
 Row(userId=1, movieId=661, rating=3),
 Row(userId=1, movieId=914, rating=3),
 Row(userId=1, movieId=3408, rating=4),
 Row(userId=1, movieId=2355, rating=5)]

In [None]:
# split dos dados em treino e teste, 90 e 10 respetivamente
ratings, test = data.randomSplit([0.9, 0.1], seed=42)
print(f"Training set size: {ratings.count()}")
print(f"Test set size: {test.count()}")
ratings.cache()
test.cache()

                                                                                

Training set size: 899812
Test set size: 100397


DataFrame[userId: int, movieId: int, rating: int]

### Criação dos Vetores dos Itens
A eguir transformo as avaliações dos users em vetores esparsos, onde cada dimensão representa um user e o valor é a nota atribuída ao item, é necessartio para calcular similaridades entre os itens de forma eficiente.

In [None]:
def to_sparse_vector(user_ratings, size):
    # Sort by userId garantindo que os indices estao ordenados
    sorted_pairs = sorted(user_ratings, key=lambda x: x.userId)
    indices = [x.userId - 1 for x in sorted_pairs] # Assuming userIds start at 1
    values = [float(x.rating) for x in sorted_pairs]
    return Vectors.sparse(size, indices, values)

In [9]:
spark.udf.register("to_sparse_vector", to_sparse_vector)

item_user = ratings.groupBy("movieId") \
    .agg(collect_list(struct("userId", "rating")).alias("user_ratings"))

### Agrupamento das Avaliações por Item
Registramos a função de conversão para vetor esparso como UDF no Spark e agrupamos as avaliações por `movieId`, preparando os dados para a criação dos vetores de características dos itens.

### Criação dos Vetores de Características dos Itens
Para cada item, criamos um vetor esparso representando as avaliações dos users. Esses vetores serão usados para calcular similaridades entre itens e para o agrupamento via clustering.

In [10]:
num_users = ratings.select("userId").distinct().count()

item_vectors_rdd = item_user.rdd.map(
    lambda row: Row(
        movieId=row["movieId"],
        features=to_sparse_vector(row["user_ratings"], num_users)
    )
)
item_vectors = spark.createDataFrame(item_vectors_rdd)
item_vectors.cache()
print("=== ITEM VECTORS ===")
item_vectors.show(5, truncate=False)

                                                                                

=== ITEM VECTORS ===


[Stage 44:>                                                         (0 + 8) / 8]

+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

### Normalização dos Vetores
Normalizamos os vetores dos itens utilizando a norma L2. Isso permite que a distância Euclidiana aproxime a similaridade cosseno, facilitando o uso de clustering e cálculo de similaridades.

In [11]:
normalizer = Normalizer(inputCol="features", outputCol="norm_features", p=2.0)
normalized_item_vectors = normalizer.transform(item_vectors)
normalized_item_vectors.cache() # Cache for LSH

print("Normalized Item Vectors (sample):")
normalized_item_vectors.show(5, truncate=False)

Normalized Item Vectors (sample):
+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

### Agrupamento dos Itens com KMeans
Aplicamos o algoritmo de clustering KMeans sobre os vetores normalizados dos itens. Cada item é atribuído a um cluster, o que permite restringir a busca de vizinhos apenas aos itens do mesmo cluster, tornando o processo mais eficiente.

In [12]:
num_items = item_vectors.count()
k = max(10, min(100, int(math.sqrt(num_items))))
kmeans = KMeans(k=k, seed=42, featuresCol="norm_features", predictionCol="cluster")
kmeans_model = kmeans.fit(normalized_item_vectors)
clustered_items = kmeans_model.transform(normalized_item_vectors).select("movieId", "cluster", "features")


25/05/31 00:16:30 WARN Instrumentation: [0b12aa93] Input vectors will be blockified to blocks, and then cached during training. Be careful of double caching!
25/05/31 00:16:31 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/05/31 00:16:33 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB
25/05/31 00:16:33 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB


In [13]:
from pyspark.ml.linalg import DenseVector
from pyspark.sql.types import DoubleType
def cosine_sim(v1, v2):
    return float(v1.dot(v2)) / (float(v1.norm(2)) * float(v2.norm(2)))

cosine_sim_udf = udf(lambda x, y: float(cosine_sim(x, y)), returnType=DoubleType())


### Definição da Similaridade Cosseno
Definimos uma função para calcular a similaridade cosseno entre dois vetores. Essa métrica será utilizada para identificar itens similares dentro de cada cluster.

### Procuramos por Itens Similares dentro de Clusters
Realizamos um cross join entre os itens de cada cluster e calculamos a similaridade cosseno entre eles. Apenas pares distintos são considerados. Os pares são duplicados para garantir simetria.

In [None]:
# Primeiro, vamos fazer o cross join dos itens que pertencem ao mesmo cluster
cross_joined = clustered_items.alias("a").join(
    clustered_items.alias("b"),
    (col("a.cluster") == col("b.cluster")) & (col("a.movieId") < col("b.movieId"))
)

# Em seguida, calculamos a similaridade cosseno entre os pares de itens
similarities = cross_joined.withColumn(
    "cosine_sim", cosine_sim_udf(col("a.features"), col("b.features"))
).select(
    col("a.movieId").alias("i_mv"),
    col("b.movieId").alias("j_mv"),
    "cosine_sim"
)

# Agora, vamos unir as similaridades para garantir que cada par (i, j) e (j, i) apareça
similarities = similarities.union(
    similarities.selectExpr("j_mv as i_mv", "i_mv as j_mv", "cosine_sim")
).cache()


In [None]:
# Começamos por unir os dados de teste com as similaridades
test_neighbors = test.alias("t") \
    .join(similarities.alias("s"), col("t.movieId") == col("s.i_mv"))

# Em seguida, unimos com os ratings para obter as classificações dos vizinhos
test_with_ratings = test_neighbors \
    .join(ratings.alias("r"), (col("t.userId") == col("r.userId")) & (col("s.j_mv") == col("r.movieId"))) \
    .select(
        col("t.userId"),
        col("t.movieId").alias("target_movie"),
        col("s.j_mv").alias("neighbor_movie"),
        col("s.cosine_sim"),
        col("r.rating").alias("neighbor_rating")
    )

print("Test Data with Neighbor Ratings (sample):")
test_with_ratings.show(5, truncate=False)

Test Data with Neighbor Ratings (sample):


25/05/31 00:16:34 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB
25/05/31 00:16:34 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB
25/05/31 00:16:34 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB
25/05/31 00:16:34 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB
25/05/31 00:16:35 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB
25/05/31 00:17:19 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB
25/05/31 00:17:22 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
[Stage 168:>                                                        (0 + 1) / 1]

+------+------------+--------------+-------------------+---------------+
|userId|target_movie|neighbor_movie|cosine_sim         |neighbor_rating|
+------+------------+--------------+-------------------+---------------+
|1     |588         |531           |0.25318648459870136|4              |
|1     |595         |531           |0.2682293510466324 |4              |
|1     |588         |594           |0.45936688757507194|4              |
|1     |595         |594           |0.5126766114118322 |4              |
|1     |588         |1566          |0.42012325590165744|4              |
+------+------------+--------------+-------------------+---------------+
only showing top 5 rows



                                                                                

## Predição das Avaliações
Para prever a nota que um user daria a um filme, procuramos os vizinhos do item alvo e as avaliações do user nesses vizinhos. 

In [None]:
# calculamos a soma ponderada das classificações dos vizinhos
weighted_sums = test_with_ratings.groupBy("userId", "target_movie").agg(
    sql_sum(col("cosine_sim") * col("neighbor_rating")).alias("weighted_rating_sum"),
    sql_sum(F.abs(col("cosine_sim"))).alias("similarity_sum") 
)

# Agora, calculamos a previsão da classificação para cada user e filme alvo
predictions = weighted_sums.withColumn(
    "pred_rating",
    F.when(
        col("similarity_sum") > 0,
        col("weighted_rating_sum") / col("similarity_sum")
    ).otherwise(None) 
).filter(col("pred_rating").isNotNull()) \
 .select("userId", "target_movie", "pred_rating")

print("Predicted Ratings (sample):")
predictions.show(5, truncate=False)


Predicted Ratings (sample):


25/05/31 00:17:23 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB
25/05/31 00:17:25 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
25/05/31 00:17:29 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB


+------+------------+------------------+
|userId|target_movie|pred_rating       |
+------+------------+------------------+
|26    |2403        |2.948758554673438 |
|58    |32          |3.964580125015903 |
|97    |1248        |4.667354193446279 |
|123   |2502        |2.726767936516566 |
|131   |628         |3.4406109335616875|
+------+------------+------------------+
only showing top 5 rows



                                                                                

In [None]:
# Por fim, unimos as previsões com os dados de teste para obter as classificações reais
final_results = predictions.alias("p") \
    .join(test.alias("t"), (col("p.userId") == col("t.userId")) & (col("p.target_movie") == col("t.movieId"))) \
    .select(
        col("p.userId"),
        col("p.target_movie"),
        col("p.pred_rating"),
        col("t.rating").alias("actual_rating")
    )

final_results_filtered = final_results.filter(col("pred_rating").isNotNull()) # garante que apenas previsões válidas sejam mantidas
final_results_filtered.cache()

print("Final Results with Predictions and Actuals (sample):")
final_results_filtered.show(10, truncate=False)


Final Results with Predictions and Actuals (sample):


25/05/31 00:17:30 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB
25/05/31 00:17:32 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
25/05/31 00:17:35 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB


+------+------------+------------------+-------------+
|userId|target_movie|pred_rating       |actual_rating|
+------+------------+------------------+-------------+
|26    |2403        |2.948758554673438 |3            |
|58    |32          |3.964580125015903 |5            |
|97    |1248        |4.667354193446279 |5            |
|123   |2502        |2.726767936516566 |3            |
|131   |628         |3.4406109335616875|3            |
|148   |1894        |3.49470001270962  |4            |
|166   |1297        |3.8032702823273605|3            |
|187   |1057        |4.348987812110765 |5            |
|195   |1573        |3.9175901147874392|3            |
|219   |480         |3.7172059356197384|3            |
+------+------------+------------------+-------------+
only showing top 10 rows



25/05/31 00:17:36 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
                                                                                

## Avaliação das Predições
Comparamos as predições com as avaliações reais do conjunto de teste utilizando as métricas RMSE (Root Mean Squared Error) e MAE (Mean Absolute Error), que quantificam o erro das recomendações.

In [18]:
# calculate RMSE
evaluator = RegressionEvaluator(
    labelCol="actual_rating",
    predictionCol="pred_rating",
    metricName="rmse"
)
rmse = evaluator.evaluate(final_results_filtered)
print(f"Root Mean Squared Error (RMSE): {rmse}")

#calculate MAE
mae_evaluator = RegressionEvaluator(
    labelCol="actual_rating",
    predictionCol="pred_rating",
    metricName="mae"
)
mae = mae_evaluator.evaluate(final_results_filtered)
print(f"Mean Absolute Error (MAE): {mae}")

25/05/31 00:17:36 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
25/05/31 00:17:36 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
25/05/31 00:17:36 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB


Root Mean Squared Error (RMSE): 0.9614173994293513
Mean Absolute Error (MAE): 0.7502739194296247


25/05/31 00:17:36 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB


In [None]:
# Tempo total de execução
end_time = time.time()
execution_time = round(end_time - start_time, 2)

summary = pd.DataFrame([{
    'Dataset': data_sel,
    'Train Size': ratings.count(),
    'Test Size': test.count(),
    'Similarity': 'Cosine (within cluster)',
    'Clustering': f"KMeans (k={k})",
    'RMSE': rmse,
    'MAE': mae,
    'Execution Time (s)': round(time.time() - start_time, 2)
}])


# Salvar o resumo
output_path = './output/item_item_cf_summary.csv'
if os.path.exists(output_path):
    summary.to_csv(output_path, mode='a', header=False, index=False)
else:
    summary.to_csv(output_path, index=False)

print("Resultados salvos em './output/'")


25/05/31 00:17:37 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB


Resultados salvos em './output/'


In [20]:
spark.stop()