# Import Library

In [None]:
!pip install -q tensorflow-ranking
!pip install -q tensorflow-recommenders

In [None]:
!sudo apt-get install tesseract-ocr
!pip install pytesseract

In [None]:
import os
import csv
import pprint
import tempfile
import shutil
import kagglehub
import pickle
from typing import Dict, Text
import ipywidgets as widgets
import random
from IPython.display import display, HTML

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.preprocessing.text import tokenizer_from_json
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Embedding, Flatten, Concatenate, Dense
from tensorflow.keras.optimizers import Adam
import regex as re
import string
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

In [None]:
os.environ['TF_USE_LEGACY_KERAS'] = '1'

In [None]:
import tensorflow_recommenders as tfrs
import tensorflow_ranking as tfr

In [None]:
from PIL import Image
import pytesseract

# Import Dataset

In [None]:
mohamedbakhet_amazon_books_reviews_path = kagglehub.dataset_download('mohamedbakhet/amazon-books-reviews')
print('Data source import complete')

In [None]:
# Make folder 'data'
data_folder = 'data'
os.makedirs(data_folder, exist_ok=True)
shutil.move(mohamedbakhet_amazon_books_reviews_path, os.path.join(data_folder, 'amazon-books-reviews'))
print(f'Dataset has been moved to folder {data_folder}')

In [None]:
reviews_file_path = 'data/amazon-books-reviews/Books_rating.csv'
books_details_file_path = 'data/amazon-books-reviews/books_data.csv'

# Load the reviews file
reviews_df = pd.read_csv(reviews_file_path)

# Load the books details file
books_details_df = pd.read_csv(books_details_file_path)

In [None]:
# Explore the reviews dataset
print("Reviews Dataset:")
print(reviews_df.info())

# Explore the books details dataset
print("\nBooks Details Dataset:")
print(books_details_df.info())

# Preprocessing Dataset

In [None]:
# Check for missing values in reviews dataset
reviews_missing = reviews_df.isnull().sum()

# Check for missing values in books details dataset
books_missing = books_details_df.isnull().sum()

# Display missing values
print("Reviews Missing Values:")
print(reviews_missing)

print("\nBooks Details Missing Values:")
print(books_missing)

In [None]:
# Drop rows with missing 'Title' and 'User_id'
reviews_df = reviews_df.dropna(subset=['Title', 'User_id'])

# Drop the column we won't be using
reviews_df = reviews_df.drop(columns=['profileName'])

# Fill missing values in 'review/summary' and 'review/text' with empty strings
reviews_df['review/summary'] = reviews_df['review/summary'].fillna('')
reviews_df['review/text'] = reviews_df['review/text'].fillna('')

# Display updated information about missing values
reviews_missing_values = reviews_df.isnull().sum()
print("Reviews Missing Values After Handling:")
print(reviews_missing_values)

In [None]:
# Drop rows with missing 'Title'
books_details_df = books_details_df.dropna(subset=['Title'])

# Impute missing values in 'ratingsCount' with the median
books_details_df['ratingsCount'] = books_details_df['ratingsCount'].fillna(books_details_df['ratingsCount'].median())

# Fill missing values in textual columns with empty strings
textual_columns = ['description', 'authors', 'publisher', 'publishedDate', 'categories']
books_details_df[textual_columns] = books_details_df[textual_columns].fillna('')

# Dropping columns we are not going to use
# books_details_df = books_details_df.drop(columns=['image', 'previewLink', 'infoLink'])

# Display updated information about missing values
books_details_missing_values = books_details_df.isnull().sum()
print("Books Details Missing Values After Handling:")
print(books_details_missing_values)

In [None]:
def preprocess_genre_column(df, column_name):
    """Strip list-literal punctuation from a stringified list column.

    A value such as "['Fiction']" becomes "Fiction". The column is rewritten
    in place on ``df`` and the same DataFrame is returned.

    Parameters:
    - df: DataFrame holding the column to clean (modified in place).
    - column_name: Name of the column to clean.

    Returns:
    - df: The same DataFrame object, with the cleaned column.
    """
    def process_genre(genre):
        # Missing values (NaN/None) have no string methods; map them to ''
        # instead of failing with AttributeError.
        if not isinstance(genre, str):
            return ''
        # Remove the surrounding square brackets and every single quote
        return genre.strip("[]").replace("'", "").strip()

    # Apply the processing function to the specified column
    df[column_name] = df[column_name].apply(process_genre)
    return df

In [None]:
books_details_df = preprocess_genre_column(books_details_df, 'categories')

In [None]:
books_details_df = preprocess_genre_column(books_details_df, 'authors')

In [None]:
reviews_df.head()

In [None]:
books_details_df.head()

In [None]:
merged_df = pd.merge(reviews_df, books_details_df, on='Title', how='left')
merged_df.head()

In [None]:
len(merged_df)

In [None]:
merged_df = merged_df.drop_duplicates()

In [None]:
len(merged_df)

In [None]:
merged_df_missing_values = merged_df.isnull().sum()
print("Merged Dataframe Missing Values:")
print(merged_df_missing_values)

## Sampling Dataset

In [None]:
# Get the 10 most frequent genres in the 'categories' column.
# Books without a category were filled with '' earlier, so that bucket is
# dropped before taking the top 10; otherwise it can occupy one of the slots.
category_counts = merged_df['categories'].value_counts()
top_10_frequent_genres = category_counts[category_counts.index != ''].head(10)

print("Top 10 most frequent genres in the 'categories' column:")
print(top_10_frequent_genres)

In [None]:
categories = [
    'Fiction',
    'Juvenile Fiction',
    'Biography & Autobiography',
    'Religion',
    'History',
    'Business & Economics',
    'Computers',
    'Social Science',
    'Cooking',
    'Self-Help'
]

# First, drop rows where 'image', 'previewLink', 'infoLink' or 'Price' is NaN
filtered_df = merged_df.dropna(subset=['image', 'previewLink', 'infoLink', 'Price'])

# Then keep only rows whose 'categories' value is in the list above
filtered_df = filtered_df[filtered_df['categories'].isin(categories)]

# Sample up to 5000 rows per category. DataFrame.sample raises ValueError when
# n exceeds the number of rows, so n is capped at the size of each group.
# Iterating the groupby (instead of groupby.apply) keeps the 'categories'
# column in every sampled frame.
sampled_groups = [
    group.sample(n=min(len(group), 5000), random_state=42)
    for _, group in filtered_df.groupby('categories')
]
if sampled_groups:
    sampled_df = pd.concat(sampled_groups)
else:
    # No row survived the filters: keep an empty frame with the same columns
    sampled_df = filtered_df.iloc[0:0].copy()

# Reset the index after sampling
sampled_df.reset_index(drop=True, inplace=True)

In [None]:
# Display the result
sampled_df['categories'].value_counts()

In [None]:
sampled_df.head()

In [None]:
sampled_df.info()

In [None]:
sampled_df.to_csv('sampled_categories.csv', index=False)

In [None]:
capstone_path = kagglehub.dataset_download("dikiiwahyudi/capstone-literify")
print('Data source import complete')

In [None]:
# Make folder 'data'
data_folder = 'data'
os.makedirs(data_folder, exist_ok=True)
shutil.move(capstone_path, os.path.join(data_folder, 'full-data'))
print(f'Dataset has been moved to folder {data_folder}')

In [None]:
df = pd.read_csv("/content/data/full-data/sampled_categories.csv", on_bad_lines='skip')

In [None]:
df.head()

In [None]:
df.info()

In [None]:
df['categories'].value_counts()

In [None]:
duplicate_rows = df[df.duplicated()]
print(duplicate_rows)

# Model 1: Without Genre

## Preparing Dataset

In [None]:
df = df.rename(columns={'review/score': 'user_rating', 'User_id': 'user_id', 'Title': 'book_title'})

In [None]:
df[['user_id', 'book_title', 'user_rating']].head()

In [None]:
unique_books_df = df[['book_title']].drop_duplicates()
unique_books_df.head()

In [None]:
ratings = tf.data.Dataset.from_tensor_slices(dict(df[['user_id', 'book_title', 'user_rating']]))
books = tf.data.Dataset.from_tensor_slices(dict(unique_books_df[['book_title']]))

ratings = ratings.map(lambda x: {
    "book_title": x["book_title"],
    "user_id": x["user_id"],
    "rating": float(x["user_rating"])
})

books = books.map(lambda x: x["book_title"])

In [None]:
tf.random.set_seed(42)

# Calculate the total number of elements in the dataset
total_count = len(ratings)

# Determine the size of the training and testing datasets
train_size = int(0.8 * total_count)
test_size = total_count - train_size

# Shuffle the dataset
shuffled = ratings.shuffle(total_count, seed=42, reshuffle_each_iteration=False)

# Split the dataset into 80% training and 20% testing
train = shuffled.take(train_size)
test = shuffled.skip(train_size).take(test_size)

In [None]:
book_titles = books.batch(1_000)
user_ids = ratings.batch(1_000).map(lambda x: x["user_id"])

unique_book_titles = np.unique(np.concatenate(list(book_titles)))
unique_user_ids = np.unique(np.concatenate(list(user_ids)))

print('Unique books: {}'.format(len(unique_book_titles)))
print('Unique users: {}'.format(len(unique_user_ids)))

## Architecture

In [None]:
class BookModel(tfrs.models.Model):
    """Multi-task recommender that learns rating prediction and retrieval jointly.

    User IDs and book titles are each mapped to an embedding. The two
    embeddings feed the retrieval task directly and, concatenated, a small
    dense network that predicts the rating. The training loss is a weighted
    sum of the two task losses.

    Relies on the module-level ``unique_book_titles``, ``unique_user_ids`` and
    ``books`` objects built in the preceding cells.
    """

    def __init__(self, rating_weight: float, retrieval_weight: float,
                 embedding_dimension: int = 32) -> None:
        """Create the sub-models and tasks.

        Parameters:
        - rating_weight: Multiplier applied to the rating (ranking) loss.
        - retrieval_weight: Multiplier applied to the retrieval loss.
        - embedding_dimension: Width of the user and book embeddings
          (default 32, the value previously hard-coded).
        """
        super().__init__()

        # User and book towers: string -> integer index -> embedding.
        # The embedding tables get one row more than the vocabulary, for the
        # out-of-vocabulary index produced by StringLookup.
        self.book_model: tf.keras.layers.Layer = tf.keras.Sequential([
            tf.keras.layers.StringLookup(vocabulary=unique_book_titles, mask_token=None),
            tf.keras.layers.Embedding(len(unique_book_titles) + 1, embedding_dimension)
        ])
        self.user_model: tf.keras.layers.Layer = tf.keras.Sequential([
            tf.keras.layers.StringLookup(vocabulary=unique_user_ids, mask_token=None),
            tf.keras.layers.Embedding(len(unique_user_ids) + 1, embedding_dimension)
        ])

        # Rating model: maps the concatenated embeddings to one scalar
        self.rating_model = tf.keras.Sequential([
            tf.keras.layers.Dense(128, activation="relu"),
            tf.keras.layers.Dense(1),
        ])

        # Ranking task: mean squared error loss, RMSE reported as metric
        self.rating_task: tf.keras.layers.Layer = tfrs.tasks.Ranking(
            loss=tf.keras.losses.MeanSquaredError(),
            metrics=[tf.keras.metrics.RootMeanSquaredError()],
        )
        # Retrieval task: top-K metrics are computed against every book title,
        # embedded with this model's own book tower
        self.retrieval_task: tf.keras.layers.Layer = tfrs.tasks.Retrieval(
            metrics=tfrs.metrics.FactorizedTopK(
                candidates=books.batch(248).map(self.book_model)
            )
        )

        # Loss weights
        self.rating_weight = rating_weight
        self.retrieval_weight = retrieval_weight

    def call(self, features: Dict[Text, tf.Tensor]) -> tf.Tensor:
        """Embed a batch and predict its ratings.

        Parameters:
        - features: Dict with at least the keys "user_id" and "book_title".

        Returns:
        - Tuple ``(user_embeddings, book_embeddings, rating_predictions)``.
        """
        user_embeddings = self.user_model(features["user_id"])
        book_embeddings = self.book_model(features["book_title"])

        return (
            user_embeddings,
            book_embeddings,
            self.rating_model(
                tf.concat([user_embeddings, book_embeddings], axis=1)
            ),
        )

    def compute_loss(self, features: Dict[Text, tf.Tensor], training=False) -> tf.Tensor:
        """Return the weighted sum of the rating loss and the retrieval loss.

        Parameters:
        - features: Dict with the keys "user_id", "book_title" and "rating".
        - training: Accepted for the Keras/TFRS interface; not used here.
        """
        # Read the label without popping it: the dict belongs to the caller,
        # and call() only looks at "user_id" and "book_title".
        ratings = features["rating"]
        user_embeddings, book_embeddings, rating_predictions = self(features)

        # Compute loss for each task
        rating_loss = self.rating_task(labels=ratings, predictions=rating_predictions)
        retrieval_loss = self.retrieval_task(user_embeddings, book_embeddings)

        # Combine losses using weights
        return (self.rating_weight * rating_loss + self.retrieval_weight * retrieval_loss)

## Fitting and Evaluating

In [None]:
# Instantiate model
model = BookModel(rating_weight=1.0, retrieval_weight=1.0)
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001))

In [None]:
cached_train = train.shuffle(1_000).batch(248).cache()
cached_test = test.batch(248).cache()

In [None]:
# Early Stopping callback.
# Monitor the validation RMSE (Keras logs validation metrics with a "val_"
# prefix when fit() receives validation_data) so that training stops, and the
# best weights are restored, based on held-out error rather than training error.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor="val_root_mean_squared_error",
    patience=2,
    restore_best_weights=True
)

In [None]:
model.fit(cached_train, validation_data=cached_test, epochs=10, callbacks=[early_stopping])

In [None]:
metrics = model.evaluate(cached_test, return_dict=True)

print(f"\nRetrieval top-100 accuracy: {metrics['factorized_top_k/top_100_categorical_accuracy']:.3f}")
print(f"Ranking RMSE: {metrics['root_mean_squared_error']:.3f}")

## Save Model

In [None]:
model.save_weights('model_recomendation_weights.h5')

In [None]:
unique_user_ids[:5]

In [None]:
with open('user_ids.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['user_id']) # Write the header
    for user_id in unique_user_ids:
        writer.writerow([user_id.decode('utf-8')]) # Decode the byte string to a regular string and write

## Prediction

In [None]:
model.load_weights("/content/model_recomendation_weights.h5")

In [None]:
content_df = pd.read_csv('/content/content_df.csv')

In [None]:
content_df.head()

In [None]:
user_ids_df = pd.read_csv('/content/user_ids.csv')

In [None]:
user_ids_df.head()

In [None]:
# Function to display catalog-style recommendations
def display_catalog(recommendations, top_n=3):
    """Render book recommendations as a catalog of HTML cards in the notebook.

    Parameters:
    - recommendations: DataFrame with one row per book. Each row must provide
      'book_title', 'authors', 'genre', 'publisher', 'Price', 'description',
      'image', 'previewLink' and 'infoLink'.
    - top_n: Number shown in the heading; the rows themselves are not truncated.

    Returns:
    - None. The HTML is displayed as a side effect.
    """
    # Function-scope import keeps this cell self-contained
    from html import escape

    def esc(value):
        # Dataset text may contain '<', '&' or quotes that would otherwise
        # break the markup (or inject tags), so every value is escaped.
        return escape(str(value), quote=True)

    # Collect the fragments and join once instead of growing a string in a loop
    parts = [f"<h3>Top {top_n} Recommendations:</h3><br>"]

    for _, row in recommendations.iterrows():
        title = esc(row['book_title'])
        parts.append(f"""
            <div style="border: 1px solid #ddd; padding: 10px; margin-bottom: 10px;">
                <h4>{title}</h4>
                <p><strong>Authors:</strong> {esc(row['authors'])}</p>
                <p><strong>Genre:</strong> {esc(row['genre'])}</p>
                <p><strong>Publisher:</strong> {esc(row['publisher'])}</p>
                <p><strong>Price:</strong> {esc(row['Price'])}</p>
                <p><strong>Description:</strong> {esc(row['description'])}</p>
                <img src="{esc(row['image'])}" alt="{title}" width="100" height="150" style="display:block; margin-top: 10px;">
                <a href="{esc(row['previewLink'])}" target="_blank">Preview</a> |
                <a href="{esc(row['infoLink'])}" target="_blank">More Info</a>
            </div>
        """)

    # Display the catalog-style information
    display(HTML("".join(parts)))

In [None]:
def predict_book_recomendation(user, filtered_books_df, top_n=3):
    """Show the top-N books retrieved for a user from a candidate table.

    Uses the user and book towers of the module-level ``model`` to rank the
    titles in ``filtered_books_df`` for ``user`` and displays the best ones.

    Parameters:
    - user: User ID; converted to ``str`` before lookup.
    - filtered_books_df: DataFrame of candidate books. Needs a 'book_title'
      column plus the columns that ``display_catalog`` reads.
    - top_n: Number of recommendations to show.

    Returns:
    - Whatever ``display_catalog`` returns.
    """
    # Index each title once; repeated titles would otherwise fill several
    # result slots with the same book.
    candidates_df = filtered_books_df.drop_duplicates(subset='book_title')

    # Never request more results than there are candidates
    k = min(top_n, len(candidates_df))
    if k <= 0:
        return display_catalog(candidates_df.iloc[0:0], top_n=top_n)

    titles_ds = tf.data.Dataset.from_tensor_slices(dict(candidates_df[['book_title']]))
    titles_ds = titles_ds.map(lambda x: x["book_title"])

    # Retrieval index that takes raw user IDs as queries. k is passed
    # explicitly so that top_n values above the layer default still work.
    index = tfrs.layers.factorized_top_k.BruteForce(model.user_model, k=k)

    # Index (title, title-embedding) pairs for the candidate books
    index.index_from_dataset(
        tf.data.Dataset.zip((titles_ds.batch(100), titles_ds.batch(100).map(model.book_model)))
    )

    # Get recommendations, best first
    _, titles = index(tf.constant([str(user)]))
    recommended_titles = [title.decode("utf-8") for title in titles[0, :k].numpy()]

    # Look the details up in ranked order (isin() would return table order)
    recommendations = (
        candidates_df.set_index('book_title', drop=False)
        .loc[recommended_titles]
        .reset_index(drop=True)
    )
    return display_catalog(recommendations, top_n=top_n)
def predict_rating(user, book):
    """Print the rating the module-level ``model`` predicts for a user/book pair.

    Parameters:
    - user: User ID; converted to ``str`` before being passed to the model.
    - book: Book title.
    """
    # BookModel.call returns (user_embeddings, book_embeddings, ratings);
    # only the rating is needed here.
    _, _, predicted_rating = model({
          "user_id": np.array([str(user)]),
          "book_title": np.array([book])
      })
    print("Predicted rating for {}: {}".format(book, predicted_rating.numpy()[0][0]))

In [None]:
predict_book_recomendation("AA", content_df, 5)

In [None]:
predict_rating("123","Minions")

In [None]:
def predict_book(user_ids, content_df, model, predict_genre, top_n=3):
    """Recommend books to a randomly chosen user, optionally limited by genre.

    Parameters:
    - user_ids: DataFrame with a 'user_id' column to draw the user from.
    - content_df: DataFrame of candidate books (needs a 'genre' column).
    - model: Not used inside this function; ``predict_book_recomendation``
      reads the module-level ``model`` instead.
    - predict_genre: Collection of genres kept when the user answers "yes".
    - top_n: Number of recommendations to show.

    Returns:
    - None. The recommendations are displayed as a side effect.
    """
    # Pick the user first, then ask how the candidates should be filtered
    user = random.choice(user_ids['user_id'].tolist())
    answer = input("Would you like to see recommendations based on a predicted genre? (yes/no): ").strip().lower()

    # Only an exact "yes" restricts the candidates to the predicted genres
    if answer == "yes":
        candidate_rows = content_df[content_df['genre'].isin(predict_genre)]
    else:
        candidate_rows = content_df

    # Both paths drop exact duplicate rows and renumber the index
    filtered_books_df = candidate_rows.drop_duplicates().reset_index(drop=True)
    predict_book_recomendation(user, filtered_books_df, top_n)

In [None]:
# Example Usage
predict_genre = ["Fiction"]
predict_book(user_ids_df, content_df, model, predict_genre, top_n=3)

# Model 2: With Genre

## Preparing Dataset

In [None]:
merged_df = merged_df.rename(columns={'review/score': 'user_rating', 'User_id': 'user_id', 'Title': 'book_title', 'categories': 'genre'})

In [None]:
merged_df[['user_id', 'book_title', 'user_rating', 'genre']].head()

In [None]:
def preprocess_genre_column(df, column_name):
    """Normalise a stringified genre list into a sorted, lowercase genre string.

    "['Fiction & Fantasy']" becomes "fantasy, fiction": brackets and single
    quotes are removed, '&' is treated as a separator, and the pieces are
    lowercased, sorted and joined with ', '. The column is rewritten in place
    on ``df`` and the same DataFrame is returned.

    Parameters:
    - df: DataFrame holding the column to clean (modified in place).
    - column_name: Name of the column to clean.

    Returns:
    - df: The same DataFrame object, with the cleaned column.
    """
    def process_genre(genre):
        # Rows without a genre (NaN/None, e.g. unmatched rows of a left merge)
        # have no string methods; map them to '' instead of raising.
        if not isinstance(genre, str):
            return ''
        # Remove square brackets and quotes
        cleaned_genre = genre.strip("[]").replace("'", "").strip()
        # Replace '&' with ',' and split by ','
        genres = cleaned_genre.replace('&', ',').split(',')
        # Strip whitespace, convert to lowercase, and sort the genres alphabetically
        sorted_genres = sorted(g.strip().lower() for g in genres)
        # Join back with ', '
        return ', '.join(sorted_genres)

    # Apply the processing function to the specified column
    df[column_name] = df[column_name].apply(process_genre)
    return df

In [None]:
merged_df = preprocess_genre_column(merged_df, 'genre')

In [None]:
merged_df['genre'].head()

In [None]:
merged_df['genre'].nunique()

In [None]:
merged_df_model = pd.DataFrame(merged_df, columns=['user_id', 'book_title', 'genre', 'user_rating'])

In [None]:
# Split the data into train and test sets (80/20 split)
train_df, test_df = train_test_split(merged_df_model, test_size=0.2, random_state=42)

In [None]:
len(train_df), len(test_df)

In [None]:
def input_model(df):
    """Split a ratings DataFrame into the four parallel arrays used for modelling.

    Parameters:
    - df: DataFrame with the columns 'user_id', 'book_title', 'genre' and
      'user_rating'.

    Returns:
    - Tuple ``(user_ids, book_titles, genres, ratings)`` holding the
      ``.values`` array of each column, in that order.
    """
    column_order = ('user_id', 'book_title', 'genre', 'user_rating')
    return tuple(df[column].values for column in column_order)

In [None]:
train_user_ids, train_book_titles, train_genres, train_ratings = input_model(train_df)
test_user_ids, test_book_titles, test_genres, test_ratings = input_model(test_df)

In [None]:
len(train_user_ids), len(train_book_titles), len(train_genres), len(train_ratings)

In [None]:
len(test_user_ids), len(test_book_titles), len(test_genres), len(test_ratings)

## Vocabulary

In [None]:
user_vocab = tf.keras.layers.StringLookup(vocabulary=merged_df_model['user_id'].astype(str).unique(), mask_token=None)
book_vocab = tf.keras.layers.StringLookup(vocabulary=merged_df_model['book_title'].astype(str).unique(), mask_token=None)
genre_vocab = tf.keras.layers.StringLookup(vocabulary=merged_df_model['genre'].astype(str).unique(), mask_token=None)

In [None]:
# Get the vocabulary from StringLookup
user_vocab_list = user_vocab.get_vocabulary()
book_vocab_list = book_vocab.get_vocabulary()
genre_vocab_list = genre_vocab.get_vocabulary()

# Save the vocabulary to .pkl files
with open("user_vocab.pkl", "wb") as uv_file:
    pickle.dump(user_vocab_list, uv_file)

with open("book_vocab.pkl", "wb") as bv_file:
    pickle.dump(book_vocab_list, bv_file)

with open("genre_vocab.pkl", "wb") as gv_file:
    pickle.dump(genre_vocab_list, gv_file)

In [None]:
train_user_ids = user_vocab(train_user_ids)
train_book_titles = book_vocab(train_book_titles)
train_genres = genre_vocab(train_genres)
test_user_ids = user_vocab(test_user_ids)
test_book_titles = book_vocab(test_book_titles)
test_genres = genre_vocab(test_genres)

## Architecture

In [None]:
# Define the hybrid collaborative filtering + content-based model
def create_hybrid_model(user_vocab, movie_vocab, genre_vocab, embedding_dim=64):
    """Build and compile the hybrid (collaborative + content-based) rating model.

    Three integer-encoded inputs (user, book, genre) are each embedded,
    flattened and concatenated, then passed through a dense layer to predict
    a single rating.

    Parameters:
    - user_vocab: Lookup layer for user IDs; only ``vocabulary_size()`` is used.
    - movie_vocab: Lookup layer for book titles (the parameter name is kept
      for backward compatibility); only ``vocabulary_size()`` is used.
    - genre_vocab: Lookup layer for genres; only ``vocabulary_size()`` is used.
    - embedding_dim: Width of every embedding (default 64).

    Returns:
    - model: Compiled Keras model whose inputs are ordered [user, book, genre].
    """
    def embed_id_input(vocab):
        # One scalar ID per example -> embedding -> flat vector
        id_input = Input(shape=(1,))
        embedded = Embedding(vocab.vocabulary_size() + 1, embedding_dim)(id_input)
        return id_input, Flatten()(embedded)

    # User input (Collaborative Filtering part)
    user_input, user_embedding = embed_id_input(user_vocab)

    # Book input (Collaborative Filtering part).
    # Sized from the vocabulary passed in, not from a global of a similar name.
    book_input, book_embedding = embed_id_input(movie_vocab)

    # Genre input (Content-based Filtering part)
    genre_input, genre_embedding = embed_id_input(genre_vocab)

    # Combine collaborative filtering and content-based filtering
    combined = Concatenate()([user_embedding, book_embedding, genre_embedding])

    # Fully connected layer for prediction
    dense_layer = Dense(128, activation='relu')(combined)
    output = Dense(1)(dense_layer)

    model = Model(inputs=[user_input, book_input, genre_input], outputs=output)
    model.compile(optimizer=Adam(learning_rate=0.001), loss='mean_squared_error', metrics=['RootMeanSquaredError'])

    return model

## Fitting and Evaluating

In [None]:
# Create the hybrid model
model = create_hybrid_model(user_vocab, book_vocab, genre_vocab, embedding_dim=64)
model.summary()

In [None]:
model.fit(
    [train_user_ids, train_book_titles, train_genres], train_ratings,
    epochs=3, batch_size=2048,
    validation_data=([test_user_ids, test_book_titles, test_genres], test_ratings)
    )

## Save Model

In [None]:
model.save('recommender_model.keras') # Saves the whole model to a .keras file, not only its weights
# NOTE(review): the message below says "weights" although model.save() was called above
print("Model weights saved successfully!")

In [None]:
# Save only the model weights
model.save_weights('hybrid_model_weights.h5')

In [None]:
model.load_weights("/content/hybrid_model_weights.h5")

## Prediction

In [None]:
def load_resources(user_vocab_path, book_vocab_path, genre_vocab_path):
    """
    Load the three pickled vocabularies from disk.

    Parameters:
    - user_vocab_path: Path to the user vocabulary pickle file (.pkl).
    - book_vocab_path: Path to the book vocabulary pickle file (.pkl).
    - genre_vocab_path: Path to the genre vocabulary pickle file (.pkl).

    Returns:
    - user_vocab, book_vocab, genre_vocab: The unpickled objects, in that order.

    Only point this at files you created yourself: unpickling runs arbitrary
    code embedded in the file.
    """
    def read_pickle(path):
        with open(path, 'rb') as handle:
            return pickle.load(handle)

    # Files are read in argument order: user, book, genre
    return tuple(
        read_pickle(path)
        for path in (user_vocab_path, book_vocab_path, genre_vocab_path)
    )

In [None]:
# Path to saved files
user_vocab_path = "user_vocab.pkl"
book_vocab_path = "book_vocab.pkl"
genre_vocab_path = "genre_vocab.pkl"

# Load resources
user_vocab, book_vocab, genre_vocab = load_resources(user_vocab_path, book_vocab_path, genre_vocab_path)

In [None]:
books = tf.data.Dataset.from_tensor_slices(dict(merged_df[['book_title']]))

In [None]:
books = books.map(lambda x: {
    "book_title": x["book_title"]
})

In [None]:
print(type(user_vocab))
print(type(book_vocab))
print(type(genre_vocab))

In [None]:
import tensorflow as tf

# Safe lookup function for lists
def safe_lookup(vocab_list, value, default=0):
    """Return the position of ``value`` in ``vocab_list``, or ``default`` if absent.

    Parameters:
    - vocab_list: Sequence offering ``.index()`` (e.g. a list of tokens).
    - value: Item to look up.
    - default: Result when ``value`` is not present (default 0).
    """
    try:
        position = vocab_list.index(value)
    except ValueError:
        # .index() signals a missing item with ValueError
        position = default
    return position

# Example user, genre, and book title data
user_ids = ["42"]  # The single user to score for
genres = ["religion"]  # Genre in the cleaned form the genre vocabulary was built from
book_titles = ["The Bible", "Introduction to Programming", "The Catcher in the Rye"]  # Candidate book titles

# Encode the inputs using the vocabularies. The model scores one
# (user, book, genre) triple per row, so the single user and genre are
# repeated to line up with every candidate book.
num_candidates = len(book_titles)
encoded_book_titles = [safe_lookup(book_vocab, title) for title in book_titles]
encoded_user_id = [safe_lookup(user_vocab, user_ids[0])] * num_candidates
encoded_genre = [safe_lookup(genre_vocab, genres[0])] * num_candidates

# Convert them to (num_candidates, 1) tensors, in the order the model's
# inputs were declared in create_hybrid_model: user, book, genre
inputs = [
    tf.reshape(tf.convert_to_tensor(encoded_user_id), [-1, 1]),      # user_id (numerical)
    tf.reshape(tf.convert_to_tensor(encoded_book_titles), [-1, 1]),  # book_title (numerical)
    tf.reshape(tf.convert_to_tensor(encoded_genre), [-1, 1]),        # genre (numerical)
]

# Get book recommendations for user 42
scores = model(inputs)
# sort_by_scores works on [batch_size, list_size] tensors, so the candidates
# are treated as one list: scores and titles are both shaped (1, num_candidates)
titles = tfr.utils.sort_by_scores(
    tf.reshape(scores, [1, -1]), [tf.convert_to_tensor([book_titles])]
)[0]
print(f"Top 5 recommendations for user 42: {titles[0, :5]}")

In [None]:
for input_layer in model.inputs:
    print(input_layer.name)

In [None]:
def get_book_recommendations(model, genre_vocab, books_df, user_id, user_genre_preferences, batch_size=2000):
    """
    Generate book recommendations for a specific user based on their genre preferences.

    Parameters:
    - model: Trained recommendation model. It is called with a dict holding the
      keys "user_id", "book_title" and "genre".
    - genre_vocab: Genre vocabulary to map genres to integer IDs. Must offer a
      dict-style ``.get(genre, default)`` method.
    - books_df: DataFrame containing the book dataset; its 'Title' column
      supplies the candidate titles.
    - user_id: ID of the user (e.g., 42).
    - user_genre_preferences: List of genres the user likes (e.g., ['Fiction', 'Science Fiction']).
    - batch_size: Number of books to process per batch (default: 2000).

    Returns:
    - top_recommendations: The first 5 entries of the per-batch sorted titles.

    NOTE(review): ``genre_vocab.get`` needs a mapping. The vocabularies pickled
    earlier in this notebook come from ``get_vocabulary()`` — presumably plain
    lists, which have no ``.get``; confirm what callers pass.
    NOTE(review): each batch is sorted on its own and appended, so with more
    than one batch the first 5 entries all come from the first batch, not from
    a global ranking.
    NOTE(review): the model's expected input keys and shapes are not visible
    here — confirm they match the dict built below.
    """
    # Convert genre preferences to tensor using genre_vocab
    # (genres missing from the vocabulary are encoded as -1)
    genre_ids = [genre_vocab.get(genre, -1) for genre in user_genre_preferences]
    genre_tensor = tf.convert_to_tensor(genre_ids, dtype=tf.int32)

    # Create a TensorFlow dataset of book titles for batching
    book_titles = books_df['Title'].values
    book_titles = tf.data.Dataset.from_tensor_slices(book_titles)

    # Batch the dataset to process in chunks
    book_titles = book_titles.batch(batch_size)

    # Initialize a list to store all recommendations
    all_recommendations = []

    # Process all batches
    for batch_titles in book_titles:
        # Generate the input for the user (including genre preferences).
        # Every entry gets a leading axis of size 1 via expand_dims.
        # NOTE(review): repeating the whole genre tensor yields
        # len(genre_ids) * batch entries, unlike the user/title entries — verify.
        inputs = {
            "user_id": tf.expand_dims(tf.repeat(user_id, repeats=batch_titles.shape[0]), axis=0),  # User ID repeated for each book in the batch
            "book_title": tf.expand_dims(batch_titles, axis=0),  # Book titles as input for the model
            "genre": tf.expand_dims(tf.repeat(genre_tensor, repeats=batch_titles.shape[0], axis=0), axis=0)  # User genre preferences
        }

        # Get book recommendations for the user
        scores = model(inputs)

        # Sort the results by the generated scores
        titles = tfr.utils.sort_by_scores(scores, [tf.expand_dims(batch_titles, axis=0)])[0]

        # Store the recommendations for this batch
        all_recommendations.extend(titles[0].numpy().tolist())  # Convert to list of titles and add to all recommendations

    # Extract the top 5 recommendations from all batches
    # (takes the head of the concatenated per-batch lists)
    top_recommendations = all_recommendations[:5]

    return top_recommendations

In [None]:

# Example usage:
# Assuming you have the model, genre_vocab, and the books table loaded already

user_id = "42"  # Example user ID
user_genre_preferences = ['Fiction', 'Science Fiction', 'Fantasy']  # Example genre preferences

# Get book recommendations for the user.
# The books table loaded at the top of the notebook is `books_details_df`.
top_recommendations = get_book_recommendations(
    model, genre_vocab, books_details_df, user_id, user_genre_preferences
)

print(f"Top 5 recommendations for user {user_id}: {top_recommendations}")

In [None]:
# Example usage:
user_id = "42"  # Example user ID
user_genre_preferences = ['Fiction', 'Science Fiction', 'Fantasy']  # Example genre preferences

# Get book recommendations for the user
top_recommendations = get_book_recommendations(
    model, genre_vocab, books_details_df, user_id, user_genre_preferences
)

print(f"Top 5 recommendations for user {user_id}: {top_recommendations}")

In [None]:
merged_df['genre'].unique()

In [None]:
# Example
user_id = "A30TK6U7DNS82R"
favorite_genres = ['religion']

# Book recommendations
# NOTE(review): `recommend_books` and `recommender_model` are not defined in
# the part of the notebook visible here — confirm they exist before this cell runs.
recommended_books = recommend_books(
    user_id=user_id,
    favorite_genres=favorite_genres,
    model=recommender_model,
    user_vocab=user_vocab,
    book_vocab=book_vocab,
    genre_vocab=genre_vocab,
    num_books=200000,
    top_n=3
)

# Print the recommendation results
print("Top 3 Recommended Books:", recommended_books)

In [None]:
titles_to_search = ['Wuthering Heights (College classics in English)', 'The Scarlet Letter (Courage Unabridged Classics)']
filtered_df = merged_df[merged_df['book_title'].isin(titles_to_search)][['genre', 'book_title', 'user_rating']]
filtered_df

# Model 3: Content-Based Filtering

## Modelling

In [None]:
df = df.rename(columns={'review/score': 'user_rating', 'User_id': 'user_id', 'Title': 'book_title', 'categories': 'genre'})

In [None]:
df.head()

In [None]:
df['genre'].value_counts()

In [None]:
summary_df = df.groupby('book_title').agg(
    average_rating=('user_rating', 'mean'),  # Calculate average of ratings
    rating_count=('user_rating', 'size')     # Count the number of ratings (i.e., number of entries)
).reset_index()

In [None]:
summary_df.head()

In [None]:
duplicate_rows = summary_df[summary_df.duplicated()]
print(duplicate_rows)

In [None]:
df_unique = df[["book_title", "description", "authors", "genre", "publisher", "Price", "image", "previewLink", "infoLink"]].drop_duplicates()
df_unique.head()

In [None]:
join_df = df_unique.merge(summary_df, on='book_title', how='left')

In [None]:
join_df.head()

In [None]:
duplicate_rows = join_df[join_df.duplicated()]
print(duplicate_rows)

In [None]:
R = join_df['average_rating']
v = join_df['rating_count']
# m is the rating count at the 80th percentile of all books; it acts as the "minimum votes" weight in the weighted average below
m = join_df['rating_count'].quantile(0.8)
C = join_df['average_rating'].mean()

join_df['weighted_average'] = (R*v + C*m)/(v+m)

In [None]:
scaler = MinMaxScaler()
scaled = scaler.fit_transform(join_df[['weighted_average']])
weighted_df = pd.DataFrame(scaled, columns=['weighted_average'])

weighted_df.index = join_df['book_title']

In [None]:
weighted_df_sorted = weighted_df.sort_values(by='weighted_average', ascending=False)
weighted_df_sorted.head(10)

In [None]:
# Translation tables that delete ASCII punctuation / ASCII digits. Built once
# here instead of on every call.
_PUNCTUATION_TABLE = str.maketrans('', '', string.punctuation)
_DIGITS_TABLE = str.maketrans('', '', string.digits)


def remove_punc1(text):
    """Return ``text`` lowercased, with ASCII punctuation and digits removed.

    Parameters:
    - text: Value to clean. Anything that is not a ``str`` (NaN, None,
      numbers) yields an empty string.

    Returns:
    - The cleaned string. Whitespace is left untouched.
    """
    if not isinstance(text, str):
        return ''  # Return an empty string if the value is not a string
    # Same order as before: strip punctuation, lowercase, then strip digits
    return text.translate(_PUNCTUATION_TABLE).lower().translate(_DIGITS_TABLE)


def remove_punc2(text):
    """Same cleaning as ``remove_punc1``; kept as a separate name for existing callers."""
    return remove_punc1(text)

In [None]:
# Take a copy so the column assignments that follow modify an independent
# frame instead of a slice of df_unique (avoids SettingWithCopyWarning).
content_df = df_unique[['book_title', 'description', 'authors', 'publisher', 'genre']].copy()

In [None]:
# content_df['Title_Content'] = content_df['Title'].apply(remove_punc1)
content_df['description'] = content_df['description'].apply(remove_punc1)
content_df['authors'] = content_df['authors'].apply(remove_punc2)
content_df['publisher'] = content_df['publisher'].apply(remove_punc1)
content_df['genre'] = content_df['genre'].apply(remove_punc2)
content_df['bag_of_words'] = ''
content_df['bag_of_words'] = content_df[content_df.columns[1:]].apply(lambda x: ' '.join(x), axis=1)
content_df.set_index('book_title', inplace=True)

content_df = content_df[['bag_of_words']]
content_df.head()

In [None]:
content_df = weighted_df_sorted.merge(content_df, left_index=True, right_index=True, how='left')

tfidf = TfidfVectorizer(stop_words='english', min_df=5)
tfidf_matrix = tfidf.fit_transform(content_df['bag_of_words'])
tfidf_matrix.shape

In [None]:
cos_sim = cosine_similarity(tfidf_matrix)
cos_sim.shape

In [None]:
content_df = content_df.reset_index()
content_df.head()

In [None]:
join_df = join_df.merge(content_df, on='book_title', how='left')
join_df.head()

In [None]:
join_df.info()

In [None]:
join_df = join_df.drop(columns=['weighted_average_x'])

In [None]:
join_df = join_df.rename(columns={'weighted_average_y': 'weighted_average'})

In [None]:
join_df.head()

In [None]:
join_df.info()

In [None]:
duplicate_rows = join_df[join_df.duplicated()]
duplicate_rows

In [None]:
join_df = join_df.drop_duplicates()

In [None]:
duplicate_rows = join_df[join_df.duplicated()]
duplicate_rows

## Save Model

In [None]:
join_df.to_csv('content_df.csv', index=False)

In [None]:
# NOTE(review): the file is named 'cosine_similarity.pkl' but the object stored
# is the TF-IDF matrix, not `cos_sim` — confirm which one the loading code expects.
# The context manager closes the file even if pickling fails.
with open('cosine_similarity.pkl', 'wb') as similarity_file:
    pickle.dump(tfidf_matrix, similarity_file)

## Prediction

In [None]:
# Function to display catalog-style recommendations
def display_catalog(recommendations, top_n=3):
    """Render recommended books as a catalog of HTML cards in the notebook output.

    Args:
        recommendations: DataFrame with one row per book and the columns
            'book_title', 'authors', 'genre', 'publisher', 'Price',
            'description', 'image', 'previewLink' and 'infoLink'.
        top_n: Number shown in the heading. It does not limit the rows:
            every row of ``recommendations`` is rendered.

    Returns:
        None. The HTML is sent to the notebook output with ``display``.
    """
    # Local import: only this function needs it.
    from html import escape

    def cell(row, column):
        # Dataset text is untrusted: escape it so stray markup or quotes cannot
        # break out of the surrounding tag or attribute.
        return escape(str(row[column]), quote=True)

    # Build one card per book and join once instead of growing a string in a loop.
    cards = [
        f"""
            <div style="border: 1px solid #ddd; padding: 10px; margin-bottom: 10px;">
                <h4>{cell(row, 'book_title')}</h4>
                <p><strong>Authors:</strong> {cell(row, 'authors')}</p>
                <p><strong>Genre:</strong> {cell(row, 'genre')}</p>
                <p><strong>Publisher:</strong> {cell(row, 'publisher')}</p>
                <p><strong>Price:</strong> {cell(row, 'Price')}</p>
                <p><strong>Description:</strong> {cell(row, 'description')}</p>
                <img src="{cell(row, 'image')}" alt="{cell(row, 'book_title')}" width="100" height="150" style="display:block; margin-top: 10px;">
                <a href="{cell(row, 'previewLink')}" target="_blank">Preview</a> |
                <a href="{cell(row, 'infoLink')}" target="_blank">More Info</a>
            </div>
        """
        for _, row in recommendations.iterrows()
    ]
    display_str = f"<h3>Top {top_n} Recommendations:</h3><br>" + "".join(cards)

    # Display the catalog-style information
    display(HTML(display_str))

In [None]:
def predict(title, data, cos_sim, similarity_weight=0.7, top_n=3):
    """Display the ``top_n`` books most relevant to ``title``.

    Each book is scored as
    ``weighted_average * (1 - similarity_weight) + similarity * similarity_weight``
    and the highest-scoring books are passed to ``display_catalog``.

    Args:
        title: Value to look up in ``data['book_title']``.
        data: DataFrame holding 'book_title', 'weighted_average' and the catalog
            columns selected below. Its index labels are used as row numbers
            into ``cos_sim``.
        cos_sim: 2-D array; row ``i`` holds the similarity of book ``i`` to every book.
        similarity_weight: Weight of the similarity term in the final score.
        top_n: Number of recommendations to show.

    Returns:
        Whatever ``display_catalog`` returns.

    Raises:
        ValueError: If ``title`` does not occur in ``data['book_title']``.
    """
    matching_rows = data.index[data['book_title'] == title]
    if len(matching_rows) == 0:
        raise ValueError(f"Book title not found: {title!r}")

    # A title may occur more than once; use the first match so the similarity is
    # always a single column.
    similarity = np.asarray(cos_sim[matching_rows[0]]).ravel()

    sim_df = pd.DataFrame({'similarity': similarity})
    final_df = pd.concat([data, sim_df], axis=1)

    final_df['final_score'] = final_df['weighted_average']*(1-similarity_weight) + final_df['similarity']*similarity_weight

    # The selected book is trivially the most similar to itself; do not recommend it.
    final_df = final_df[final_df['book_title'] != title]

    final_df_sorted = final_df.sort_values(by='final_score', ascending=False).head(top_n)
    final_df_sorted_show = final_df_sorted[['book_title', 'description', 'authors', 'genre', 'publisher', 'Price', 'image', 'previewLink', 'infoLink']]
    # Forward the caller's top_n so the heading matches the number of rows requested.
    return display_catalog(final_df_sorted_show, top_n=top_n)

In [None]:
# Load the content_df from CSV
content_df = pd.read_csv('/content/content_df.csv')

# The pickle written in the "Save Model" section holds the TF-IDF matrix (one row per
# book), not pairwise similarities, despite its file name.
# NOTE: unpickling can execute arbitrary code; only load files created by this notebook.
with open('/content/cosine_similarity.pkl', 'rb') as pkl_file:
    cos_sim = pickle.load(pkl_file)

# predict() looks up one row per book and expects one score per book in it, so build
# the book-by-book cosine-similarity matrix from the TF-IDF rows instead of merely
# densifying the TF-IDF matrix.
cos_sim_dense = cosine_similarity(cos_sim)

In [None]:
# Create a dropdown widget for book titles
dropdown = widgets.Dropdown(
    options=content_df['book_title'].tolist(),
    description='Book Title:',
    disabled=False
)

# Callback that requests 5 recommendations when a book is selected
def on_select(change):
    """Show recommendations for the title that was just picked in the dropdown.

    Args:
        change: Change notification from the widget; ``change.new`` is the
            newly selected title.
    """
    selected_title = change.new
    # predict() renders the catalog itself and returns None, so its result must not
    # be wrapped in HTML()/display() again.
    predict(selected_title, content_df, cos_sim_dense, similarity_weight=0.7, top_n=5)

# Attach the function to the dropdown widget
dropdown.observe(on_select, names='value')

# Display the dropdown widget
display(dropdown)

# Model 4: Text Classification

## Preparing Dataset

In [None]:
# Rename the raw dataset columns to the names used in the rest of this section.
df = df.rename(columns={'review/score': 'user_rating', 'User_id': 'user_id', 'Title': 'book_title', 'categories': 'genre'})

In [None]:
df[['book_title', 'authors', 'genre']].head()

In [None]:
# One row per distinct (title, authors, genre) combination.
unique_books_df = df[['book_title', 'authors', 'genre']].drop_duplicates()
unique_books_df.head()

In [None]:
len(unique_books_df)

In [None]:
# Model input text: title and authors joined by a space.
# The result is NaN whenever either part is missing; those rows are dropped in the next cell.
unique_books_df['title_author'] = unique_books_df['book_title'] + " " + unique_books_df['authors']

In [None]:
# Keep only the books that have an input text.
unique_books_df = unique_books_df.dropna(subset=['title_author'])

In [None]:
len(unique_books_df)

In [None]:
from sklearn.preprocessing import LabelEncoder

# Instantiate the LabelEncoder
label_encoder = LabelEncoder()

In [None]:
# Map each genre to an integer class id; the fitted encoder is reused later to decode predictions.
# NOTE(review): only title_author was checked for missing values above — confirm 'genre' has none.
unique_books_df['genre_encoded'] = label_encoder.fit_transform(unique_books_df['genre'])

In [None]:
# Save the fitted label_encoder to a file; the Prediction section reloads it from
# label_encoder.pkl to turn class ids back into genre names.
with open('label_encoder.pkl', 'wb') as file:
    pickle.dump(label_encoder, file)

In [None]:
unique_books_df.head()

In [None]:
unique_books_df['genre_encoded'].value_counts()

## Parameters

In [None]:
# Number of examples to use for training
# NOTE(review): not referenced in the visible part of this notebook — confirm it is still needed.
TRAINING_SIZE = 20000

# Vocabulary size of the tokenizer (also the input_dim of the embedding layer)
VOCAB_SIZE = 10000

# Maximum length of the padded sequences (also the model's input length)
MAX_LENGTH = 32

# Type of padding: 'pre' is passed as the `padding` argument of pad_sequences
PADDING_TYPE = 'pre'

# Specifies how to truncate the sequences: 'post' is passed as the `truncating` argument of pad_sequences
TRUNC_TYPE = 'post'

## Split Dataset

In [None]:
# Built once at definition time; a frozenset gives O(1) membership tests instead of
# rebuilding and scanning a list for every word of every sentence.
_STOPWORDS = frozenset([
    "a", "about", "above", "after", "again", "against", "all", "am", "an", "and",
    "any", "are", "as", "at", "be", "because", "been", "before", "being", "below",
    "between", "both", "but", "by", "could", "did", "do", "does", "doing", "down",
    "during", "each", "few", "for", "from", "further", "had", "has", "have",
    "having", "he", "he'd", "he'll", "he's", "her", "here", "here's", "hers",
    "herself", "him", "himself", "his", "how", "how's", "i", "i'd", "i'll", "i'm",
    "i've", "if", "in", "into", "is", "it", "it's", "its", "itself", "let's", "me",
    "more", "most", "my", "myself", "nor", "of", "on", "once", "only", "or",
    "other", "ought", "our", "ours", "ourselves", "out", "over", "own", "same",
    "she", "she'd", "she'll", "she's", "should", "so", "some", "such", "than",
    "that", "that's", "the", "their", "theirs", "them", "themselves", "then",
    "there", "there's", "these", "they", "they'd", "they'll", "they're",
    "they've", "this", "those", "through", "to", "too", "under", "until", "up",
    "very", "was", "we", "we'd", "we'll", "we're", "we've", "were", "what",
    "what's", "when", "when's", "where", "where's", "which", "while", "who",
    "who's", "whom", "why", "why's", "with", "would", "you", "you'd", "you'll",
    "you're", "you've", "your", "yours", "yourself", "yourselves",
])


def remove_stopwords(sentence):
    """Return ``sentence`` lower-cased with all stop words removed.

    Args:
        sentence: Text to clean.

    Returns:
        The remaining words joined by single spaces; an empty string when
        nothing is left.
    """
    # Lower-case first because the stop-word set is all lower case, then split on
    # runs of whitespace.
    words = [w for w in sentence.lower().split() if w not in _STOPWORDS]

    # Reconstruct the sentence from the words that were kept.
    return " ".join(words)

In [None]:
# Hold out 20% of the books for evaluation, stratified on the encoded genre so both
# parts keep the same class proportions.
model_columns = ["title_author", "genre_encoded"]
genre_codes = unique_books_df["genre_encoded"]
train_df, test_df = train_test_split(
    unique_books_df[model_columns],
    test_size=0.2,
    random_state=42,
    stratify=genre_codes,
)

# Report the class proportions of each split.
for heading, split_df in (
    ("Training set genre distribution:\n", train_df),
    ("\nTesting set genre distribution:\n", test_df),
):
    print(heading, split_df["genre_encoded"].value_counts(normalize=True))

In [None]:
def _sentences_and_labels(split_df):
    """Return (stop-word-free sentences, encoded genre labels) of a split as arrays."""
    cleaned = split_df['title_author'].apply(remove_stopwords)
    return cleaned.values, split_df['genre_encoded'].values


train_sentences, train_labels = _sentences_and_labels(train_df)
test_sentences, test_labels = _sentences_and_labels(test_df)

## Data Preprocessing

In [None]:
# Instantiate the vectorization layer; max_tokens is set to VOCAB_SIZE
vectorize_layer = tf.keras.layers.TextVectorization(max_tokens=VOCAB_SIZE)

# Generate the vocabulary based on the training inputs only (the test sentences are not passed here)
vectorize_layer.adapt(train_sentences)

In [None]:
# Read the vocabulary back from the adapted vectorization layer
vocabulary = vectorize_layer.get_vocabulary()

# Write the vocabulary to vectorizer_vocab.pkl with pickle
# (presumably so the layer can be rebuilt without re-running adapt() — nothing in the visible code reads it back)
with open('vectorizer_vocab.pkl', 'wb') as file:
    pickle.dump(vocabulary, file)

In [None]:
# Put the sentences and labels in a tf.data.Dataset: one (sentence, label) element per example
train_dataset = tf.data.Dataset.from_tensor_slices((train_sentences,train_labels))
test_dataset = tf.data.Dataset.from_tensor_slices((test_sentences,test_labels))

In [None]:
def preprocessing_fn(dataset):
  '''Turns a dataset of (text, label) pairs into one of (padded token ids, label) pairs.

  Uses the module-level vectorize_layer for tokenization and MAX_LENGTH,
  TRUNC_TYPE and PADDING_TYPE for padding.
  '''

  def to_token_ids(text, label):
    # Only the text is transformed; the label passes through untouched.
    return (vectorize_layer(text), label)

  tokenized = dataset.map(to_token_ids)

  # Sequences differ in length, so gather the whole dataset into one ragged batch
  # and pull that batch out as tensors.
  single_batch = tokenized.ragged_batch(batch_size=tokenized.cardinality())
  token_ids, label_tensor = single_batch.get_single_element()

  # Bring every sequence to exactly MAX_LENGTH entries.
  fixed_length_ids = tf.keras.utils.pad_sequences(
      token_ids.numpy(),
      maxlen=MAX_LENGTH,
      truncating=TRUNC_TYPE,
      padding=PADDING_TYPE
      )

  # Re-pair sequences and labels element by element in a new dataset.
  return tf.data.Dataset.zip(
      tf.data.Dataset.from_tensor_slices(fixed_length_ids),
      tf.data.Dataset.from_tensor_slices(label_tensor)
      )

In [None]:
# Preprocess the train and test data.
# Dataset.apply hands the whole dataset to preprocessing_fn and uses the dataset it returns.
train_dataset_vectorized = train_dataset.apply(preprocessing_fn)
test_dataset_vectorized = test_dataset.apply(preprocessing_fn)

In [None]:
# View 2 training sequences and its labels
for example in train_dataset_vectorized.take(2):
  print(example)
  print()

In [None]:
SHUFFLE_BUFFER_SIZE = 1000
PREFETCH_BUFFER_SIZE = tf.data.AUTOTUNE
BATCH_SIZE = 32

# Optimize and batch the datasets for training.
# prefetch() is the last transformation so that complete batches, not single
# examples, are prepared in the background while the model is busy with the
# current step. The elements produced are unchanged.
train_dataset_final = (train_dataset_vectorized
                       .cache()
                       .shuffle(SHUFFLE_BUFFER_SIZE)
                       .batch(BATCH_SIZE)
                       .prefetch(PREFETCH_BUFFER_SIZE)
                       )

# The evaluation data is not shuffled.
test_dataset_final = (test_dataset_vectorized
                      .cache()
                      .batch(BATCH_SIZE)
                      .prefetch(PREFETCH_BUFFER_SIZE)
                      )

## Plot Utility

In [None]:
def plot_loss_acc(history):
  '''Draws accuracy (left) and loss (right) curves for training and validation.

  history must expose a .history mapping with the keys 'accuracy',
  'val_accuracy', 'loss' and 'val_loss'.
  '''
  # Read every series up front so a missing key fails before any figure is created.
  curves = {
      key: history.history[key]
      for key in ('accuracy', 'val_accuracy', 'loss', 'val_loss')
  }
  epochs = range(len(curves['accuracy']))

  fig, ax = plt.subplots(1,2, figsize=(12, 6))

  # (metric key, training label, validation label, panel title) for each panel
  panels = (
      ('accuracy', 'Training accuracy', 'Validation accuracy', 'Training and validation accuracy'),
      ('loss', 'Training Loss', 'Validation Loss', 'Training and validation loss'),
  )
  for panel, (metric, train_label, val_label, title) in zip(ax, panels):
    panel.plot(epochs, curves[metric], 'bo', label=train_label)
    panel.plot(epochs, curves['val_' + metric], 'b', label=val_label)
    panel.set_title(title)
    panel.set_xlabel('epochs')
    panel.set_ylabel(metric)
    panel.legend()

  plt.show()

## Build and Compile the Model

In [None]:
# Parameters
EMBEDDING_DIM = 16
LSTM_DIM = 32
DENSE_DIM = 24

# One output unit per genre known to the fitted label encoder. The labels fed to the
# loss are the encoder's class ids, so the output width must equal the class count
# rather than a hard-coded number.
NUM_CLASSES = len(label_encoder.classes_)

# Model definition with LSTM:
# token ids -> embedding -> bidirectional LSTM -> dense -> softmax over genres
model_lstm = tf.keras.Sequential([
    tf.keras.Input(shape=(MAX_LENGTH,)),
    tf.keras.layers.Embedding(input_dim=VOCAB_SIZE, output_dim=EMBEDDING_DIM),
    tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(LSTM_DIM)),
    tf.keras.layers.Dense(DENSE_DIM, activation='relu'),
    tf.keras.layers.Dense(NUM_CLASSES, activation='softmax')
])

# Set the training parameters (sparse loss: labels are integer class ids, not one-hot)
model_lstm.compile(loss='sparse_categorical_crossentropy',optimizer='adam',metrics=['accuracy'])

# Print the model summary
model_lstm.summary()

## Train the Model

In [None]:
# Number of passes over the training data
NUM_EPOCHS = 10

# Train the model. The test split doubles as validation data, so the val_* metrics
# in the history are measured on the test set.
history_lstm = model_lstm.fit(train_dataset_final, epochs=NUM_EPOCHS, validation_data=test_dataset_final)

In [None]:
# Plot the accuracy and loss
plot_loss_acc(history_lstm)

## Save Model

In [None]:
# Save the model weights only; loading them back requires model_lstm to be defined first
model_lstm.save_weights('model_genre_classification_weights.h5')

## Prediction

In [None]:
# Load the model weights (this assumes the model is already defined)
# NOTE(review): this reads from /content, while the save cell writes to the current
# working directory — the two match only when the notebook runs from /content.
model_lstm.load_weights('/content/model_genre_classification_weights.h5')

In [None]:
# Text preprocessing and genre prediction for a single input string
def predict_genre(text):
    """Return the genre name the LSTM model predicts for ``text``.

    The text goes through the same steps as the training data: stop-word
    removal, vectorization with ``vectorize_layer`` and padding to
    ``MAX_LENGTH``. The most probable class id is decoded with
    ``label_encoder``.
    """
    cleaned = remove_stopwords(text)

    # The vectorization layer receives a batch, hence the one-element list.
    token_ids = vectorize_layer([cleaned])

    model_input = tf.keras.preprocessing.sequence.pad_sequences(
        token_ids,
        maxlen=MAX_LENGTH,
        padding=PADDING_TYPE,
        truncating=TRUNC_TYPE,
    )

    # One row of class probabilities per input; there is exactly one input here.
    probabilities = model_lstm.predict(model_input)
    best_class = np.argmax(probabilities, axis=1)[0]

    # Decode the class id back into its genre name.
    return label_encoder.inverse_transform([best_class])[0]

In [None]:
# Reload the fitted label encoder; predict_genre uses it to decode class ids.
# NOTE: unpickling can execute arbitrary code; only load files created by this notebook.
with open('label_encoder.pkl', 'rb') as file:
    label_encoder = pickle.load(file)

In [None]:
input_text = "I'll Be Seeing You"
predicted_genre = predict_genre(input_text)
print(f'The predicted genre for the "{input_text}" text is: "{predicted_genre}"')

# Model 5: OCR

In [None]:
def extract_text_from_image(image_path):
    """Run OCR on an image and let the user confirm or correct the result.

    Args:
        image_path: Path of the image file to read.

    Returns:
        The extracted text with all whitespace runs collapsed to single
        spaces, or the text typed by the user when they answer 'n' to the
        confirmation prompt. None if any step raises an exception.
    """
    try:
        # Open the image in a context manager so the underlying file is closed
        # as soon as OCR is done, even if Tesseract fails.
        with Image.open(image_path) as img:
            # Extract text using Tesseract
            extracted_text = pytesseract.image_to_string(img)

        # Clean the extracted text: collapse newlines and repeated spaces
        preprocessed_text = ' '.join(extracted_text.split())

        # Display the extracted text
        print("Extracted text from the image:")
        print(preprocessed_text)

        # Ask the user if the result is correct
        is_correct = input("\nIs the extracted text correct? (y/n): ").strip().lower()

        # If incorrect, allow manual input; any other answer keeps the OCR text
        if is_correct == 'n':
            preprocessed_text = input("Please enter the text manually: ").strip()

        # Return the final text
        return preprocessed_text

    except Exception as e:
        # Best-effort helper: report the problem and signal failure with None.
        print(f"An error occurred: {e}")
        return None

In [None]:
# Example usage
image_path = '/content/book-covers-big-2019101610.jpg'
result_text = extract_text_from_image(image_path)
print("\nFinal processed text:")
print(result_text)