# Importando bibliotecas necessárias

- Tensorflow e keras: Criação e treinamento do modelo
- Sklearn: Funções de processamento de dados
- matplotlib e seaborn: plotagem de gráficos do dataset
- pandas: Manipulação dos datasets em CSV

In [None]:
from tensorflow.keras.models import load_model
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Embedding, Input, Dot, Flatten, Dense, Dropout, Concatenate, Lambda, BatchNormalization
from tensorflow.keras.losses import Huber
from tensorflow.keras.callbacks import EarlyStopping

from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split

import matplotlib.pyplot as plt

import pandas as pd

from keras.regularizers import l2
from keras.src.utils import plot_model

import seaborn as sns

from sklearn.metrics.pairwise import cosine_similarity

# Pré-processamento de dataset

Vamos abrir e pré-processar os dados do nosso dataset

In [None]:
ratings_df = pd.read_csv('books/Ratings.csv')
books_df = pd.read_csv('books/Books.csv')
users_df = pd.read_csv('books/Users.csv')

## Analisando a distribuição das notas

Com o histograma, percebemos que existe uma grande predominancia de avaliações 0, que podem enviezar o nosso modelo.

In [None]:
plt.figure(figsize=(8, 5))
sns.histplot(ratings_df["Book-Rating"], bins=11, kde=False)
plt.title("Distribuição das notas")
plt.xlabel("Rating")
plt.ylabel("Frequência")
plt.grid(True)
plt.show()

Removemos então as avaliações nota 0.

In [None]:
ratings_df = ratings_df[ratings_df["Book-Rating"] > 0]

In [None]:
plt.figure(figsize=(8, 5))
sns.histplot(ratings_df["Book-Rating"], bins=10, kde=False)  # Ratings vão de 0 a 10
plt.title("Distribuição das notas")
plt.xlabel("Rating")
plt.ylabel("Frequência")
plt.grid(True)
plt.show()

## Codificar os IDs para índices inteiros discretos

Dados não-numéricos (ISBN do livro, nome do autor, etc) devem ser convertidos em valores discretos para input na rede

In [None]:
books_df = books_df[["ISBN", "Book-Author"]].dropna()
books_df["Book-Author"] = books_df["Book-Author"].str.strip().str.lower()

isbn_to_title = dict(zip(books_df["ISBN"], books_df["Book-Title"]))

def convert_isbn_to_title(isbn):
    return isbn, isbn_to_title.get(isbn, "Título não encontrado")

In [None]:
author2id = {author: idx for idx, author in enumerate(books_df["Book-Author"].unique())}
books_df["author_id"] = books_df["Book-Author"].map(author2id)
isbn_to_author_id = dict(zip(books_df["ISBN"], books_df["author_id"]))

In [None]:
ratings_df["Book-Author"] = ratings_df["ISBN"].map(isbn_to_author_id)

ratings_df = ratings_df.dropna(subset=["Book-Author"])
ratings_df["Book-Author"] = ratings_df["Book-Author"].astype(int)

A idade precisa ser normalizada e usuários sem idade informada, recebem a idade média do dataset para evitar valores nulos, uma vez que a idade é um atributo importante para classificar similaridade entre usuários

In [None]:
users_df["Age"] = users_df["Age"].fillna(users_df["Age"].median())

user_age_map = dict(zip(users_df["User-ID"], users_df["Age"]))
ratings_df["user_age"] = ratings_df["User-ID"].map(user_age_map)

ratings_df["user_age"] = ratings_df["user_age"].astype(float)

scaler_age = StandardScaler()
ratings_df["user_age_scaled"] = scaler_age.fit_transform(ratings_df[["user_age"]])

As notas também são normalizadas para mitigar possíveis viéses causados pela distribuição desigual de notas no dataset.

In [None]:
scaler = StandardScaler()
ratings_df["Book-Rating"] = scaler.fit_transform(ratings_df[["Book-Rating"]])


print(f"Média (do scaler): {scaler.mean_[0]:.2f}")
print(f"Desvio padrão (do scaler): {scaler.scale_[0]:.2f}")

In [None]:
user_encoder = LabelEncoder()
book_encoder = LabelEncoder()
author_encoder = LabelEncoder()

ratings_df["User-ID"] = user_encoder.fit_transform(ratings_df["User-ID"])
ratings_df["ISBN"] = book_encoder.fit_transform(ratings_df["ISBN"])
ratings_df["Book-Author"] = author_encoder.fit_transform(ratings_df["Book-Author"])

num_users = ratings_df["User-ID"].nunique()
num_books = ratings_df["ISBN"].nunique()
num_authors = ratings_df["Book-Author"].nunique()

In [None]:
train_df, val_df = train_test_split(ratings_df, test_size=0.2, random_state=42)

## Avaliando a esparcidade do dataset

Espacidade, ou a predominancia de valores ausentes em uma matriz de dados, é uma métrica útil para guiar decisões a respeito da arquitetura e tamanho do modelo

In [None]:
def compute_sparsity(df, user_col="User-ID", item_col="ISBN"):
    num_users = df[user_col].nunique()
    num_items = df[item_col].nunique()
    num_interactions = len(df)

    total_possible = num_users * num_items
    sparsity = 1 - (num_interactions / total_possible)

    print(f"Número de usuários: {num_users}")
    print(f"Número de livros: {num_items}")
    print(f"Número de interações: {num_interactions}")
    print(f"Total possível de interações: {total_possible}")
    print(f"Esparsidade: {sparsity:.4f} ({sparsity*100:.2f}%)")

    return sparsity

compute_sparsity(train_df)

# Criando o modelo

In [None]:
embedding_dim = 4

user_input = Input(shape=(1,), name="user_input")
book_input = Input(shape=(1,), name="book_input")
author_input = Input(shape=(1,), name="author_input")
age_input = Input(shape=(1,), name="user_age")

user_embedding = Embedding(num_users, embedding_dim, name="user_embedding", embeddings_regularizer=l2(1e-6))(user_input)
book_embedding = Embedding(num_books, embedding_dim, name="book_embedding", embeddings_regularizer=l2(1e-6))(book_input)
author_embedding = Embedding(num_authors, embedding_dim, name="author_embedding", embeddings_regularizer=l2(1e-6))(author_input)

user_vec = Flatten()(user_embedding)
book_vec = Flatten()(book_embedding)
author_vec = Flatten()(author_embedding)

x = Concatenate()([user_vec, book_vec, author_vec, age_input])

outputs = Dense(1)(x)

model = Model(inputs=[user_input, book_input, author_input, age_input], outputs=outputs)
model.compile(optimizer="adam", loss=Huber(), metrics=["mae"])
model.summary()

In [None]:
plot_model(model, to_file="model_plot.png", show_shapes=True, show_layer_names=True, dpi=300)

# Treinando a rede

In [None]:
early_stop = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)

In [None]:
history = model.fit(
    [train_df["User-ID"].values, train_df["ISBN"].values, train_df["Book-Author"].values, train_df["user_age_scaled"].values],
    train_df["Book-Rating"].values,
    epochs=50,
    verbose=1,
    batch_size=32,
    validation_data=(
        [val_df["User-ID"].values, val_df["ISBN"].values, val_df["Book-Author"].values, val_df["user_age_scaled"].values],
        val_df["Book-Rating"].values
    ),
    callbacks=[early_stop])

In [None]:
history.history['loss']

In [None]:
history.history['val_loss']

In [None]:
plt.plot(history.history['loss'])
plt.xlabel("Época")
plt.ylabel("Loss (Erro de Huber)")
plt.title("Loss vs Época")
plt.grid(True)
plt.show()

In [None]:
plt.plot(history.history['val_loss'])
plt.xlabel("Época")
plt.ylabel("Val Loss (Erro de Huber)")
plt.title("Val Loss vs Época")
plt.grid(True)
plt.show()

Podemos salvar o modelo para não precisar treiná-lo novamente toda vez

In [None]:
model.save("embeddings.keras")

# Resultados

Podemos abrir o nosso modelo salvo e utilizar os embeddings gerados para calcular similaridades

In [None]:
model = load_model("embeddings.keras")

In [None]:
book_embeddings = model.get_layer("book_embedding").get_weights()[0]
user_embeddings = model.get_layer("user_embedding").get_weights()[0]
author_embeddings =model.get_layer("author_embedding").get_weights()[0]

## Encontrando similares on-the-fly

A abordagem mais comum é gerar uma matriz de distancia com todos os itens que geraram embeddings (livros, autores, usuários). Porém essas matrizes seriam muito grandes devido a quantidade de itens que estamos lidando.
Portanto, utilizaremos uma função que calcula as distancias apenas de um único item determinado, ordenando os resultados de maneira decrescente para obter os itens mais próximos.

In [None]:
def recommend_similar_books_fast(book_id, top_n=5):
    try:
        book_idx = book_encoder.transform([book_id])[0]
    except:
        print("Book ID não encontrado.")
        return []

    target_embedding = book_embeddings[book_idx].reshape(1, -1)

    similarities = cosine_similarity(target_embedding, book_embeddings)[0]

    similar_indices = similarities.argsort()[::-1][1:top_n+1]

    similar_books = [book_encoder.inverse_transform([i])[0] for i in similar_indices]
    scores = [round(similarities[i], 4) for i in similar_indices]

    return list(zip(similar_books, scores))

In [None]:
def recommend_similar_users_fast(user_id, top_n=5):
    try:
        user_idx = user_encoder.transform([user_id])[0]
    except:
        print("User ID não encontrado.")
        return []

    target_embedding = user_embeddings[user_idx].reshape(1, -1)

    similarities = cosine_similarity(target_embedding, user_embeddings)[0]

    similar_indices = similarities.argsort()[::-1][1:top_n+1]

    similar_users = [user_encoder.inverse_transform([i])[0] for i in similar_indices]
    scores = [round(similarities[i], 4) for i in similar_indices]

    return list(zip(similar_users, scores))

In [None]:
def recommend_similar_authors_fast(author_name, top_n=5):
    try:
        author_idx = author2id[author_name.lower().strip()]
    except KeyError:
        print("Autor não encontrado.")
        return []

    target_embedding = author_embeddings[author_idx].reshape(1, -1)

    similarities = cosine_similarity(target_embedding, author_embeddings)[0]

    similar_indices = similarities.argsort()[::-1][1:top_n+1]

    id2author = {idx: author for author, idx in author2id.items()}
    similar_authors = [(id2author[i], round(similarities[i], 4)) for i in similar_indices]

    return similar_authors

## Visualizando os resultados

In [None]:
recommend_similar_users_fast(276729, top_n=5)

In [None]:
recommend_similar_authors_fast("Machado de Assis", top_n=5)

In [None]:
recommended_books = recommend_similar_books_fast("0385504209", top_n=5)
convert_isbn_to_title(recommended_books[1][0])