In [None]:
# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round, monotonically_increasing_id, col
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark import SparkFiles
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
from pyspark.ml.feature import Normalizer, BucketedRandomProjectionLSH
from sentence_transformers import SentenceTransformer
from sparknlp.base import DocumentAssembler, Finisher
from sparknlp.annotator import UniversalSentenceEncoder
from pyspark.ml import Pipeline
from pyspark.sql.functions import concat_ws, col
from pyspark.ml.feature import Normalizer, BucketedRandomProjectionLSH

# import modules using an alias
import pyspark.sql.types as T
import pyspark.sql.functions as F
import requests
import matplotlib.pyplot as plt
import time
import csv
import os
import pandas as pd



# Base URL of the raw dataset CSV files on GitHub. The value ends with "/", and
# the download loop further down builds each file URL as f"{GITHUB_URL}/{filename}.csv".
GITHUB_URL = "https://raw.githubusercontent.com/farhodibr/datasets/heads/main/books_recommender/"

In [None]:
from pyspark import SparkConf

# Assemble the Spark configuration in one fluent chain (each setter returns the conf).
# NOTE(review): with a local[*] master the work presumably runs inside the driver JVM,
# so "spark.executor.memory" may have no effect here - confirm whether
# "spark.driver.memory" was intended.
conf = (
    SparkConf()
    .setMaster("local[*]")
    .setAppName("MySparkApp")
    .set("spark.executor.memory", "10g")
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()

# DataFrame.checkpoint() is used later in the notebook and needs this directory.
spark.sparkContext.setCheckpointDir("checkpoint_dir_als")


In [None]:
#spark._jvm.org.apache.hadoop.util.VersionInfo.getVersion()
#import subprocess
#subprocess.run(["winutils","ls"], check=True)



In [None]:

books_file_name = "Books"
ratings_file_name = "Ratings"
users_file_name = "Users"
filenames = [books_file_name, ratings_file_name, users_file_name]

# GITHUB_URL already ends with "/"; strip it before joining so the request path
# does not contain a doubled slash ("...books_recommender//Books.csv").
base_url = GITHUB_URL.rstrip("/")

# Register every CSV with Spark so SparkFiles.get() can resolve the local copy.
for filename in filenames:
    download_url = f"{base_url}/{filename}.csv"
    spark.sparkContext.addFile(download_url)


def _read_downloaded_csv(name):
    """Read one of the CSVs registered above into a Spark DataFrame.

    Args:
        name (str): File name without the ".csv" extension.

    Returns:
        DataFrame: The parsed file, header row used for column names and
        column types inferred by Spark.
    """
    return spark.read.csv(
        SparkFiles.get(f"{name}.csv"),
        header=True,
        inferSchema=True,
        sep=",",
    )


ratings_df = _read_downloaded_csv(ratings_file_name)
books_df = _read_downloaded_csv(books_file_name)
users_df = _read_downloaded_csv(users_file_name)

ratings_df.show(5)

In [None]:
# ALS data preparation - ALS is a good fit for sparse data.
# Each distinct User-ID / ISBN gets a generated integer id. The frames are forced
# into a single partition first so that ids are not generated per partition, and
# persisted so the generated values stay the same when the frames are reused.
users = (
    users_df.select("User-ID")
    .distinct()
    .coalesce(1)
    .withColumn("userIntId", monotonically_increasing_id())
    .persist()
)

books = (
    ratings_df.select("ISBN")
    .distinct()
    .coalesce(1)
    .withColumn("bookIntId", monotonically_increasing_id())
    .persist()
)

# Attach both generated ids to every rating; left joins keep all rating rows.
ratings_df_int_ids = (
    ratings_df
    .join(users, on="User-ID", how="left")
    .join(books, on="ISBN", how="left")
)
ratings_df_int_ids.show()

In [None]:
# Keep only the three columns ALS consumes, renamed to userId / bookId / rating.
ratings = ratings_df_int_ids.select(
    *[
        col(source_name).alias(target_name)
        for source_name, target_name in (
            ("userIntId", "userId"),
            ("bookIntId", "bookId"),
            ("Book-Rating", "rating"),
        )
    ]
)
ratings.show()

In [None]:
# 70/30 train/test split with a fixed seed.
training_data, test_data = ratings.randomSplit([0.7, 0.3], seed=42)

# ALS estimator and its hyperparameters.
als = ALS(
    userCol="userId",
    itemCol="bookId",
    ratingCol="rating",
    rank=10,
    maxIter=15,
    regParam=0.1,
    coldStartStrategy="drop",
    nonnegative=True,
    implicitPrefs=False,
)

# Train on the training split, then score the held-out split.
model = als.fit(training_data)
test_predictions = model.transform(test_data)

In [None]:
# Preview the predictions the fitted ALS model produced for test_data.
test_predictions.show()

In [None]:
# NOTE: this whole cell is a bare triple-quoted string, so none of the code inside it
# runs. It keeps the cross-validated ALS tuning for reference; `evaluator` and
# `best_model` are only assigned inside this string.
""" # Hyperparameter tuning

# Takes too long at the moment
# param_grid = ParamGridBuilder() \
#            .addGrid(als.rank, [10, 50, 75, 100]) \
#            .addGrid(als.maxIter, [5, 50, 100, 200]) \
#            .addGrid(als.regParam, [.01, .05, .1, .15]) \
#            .build()

param_grid = ParamGridBuilder() \
           .addGrid(als.rank, [10, 50]) \
           .addGrid(als.maxIter, [5, 50]) \
           .addGrid(als.regParam, [.01, .05]) \
           .build()

# Define evaluator as RMSE and print length of evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# Use cross validation
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

# Checkpoint the training data to truncate its lineage.
# This is a lazy operation, it will be triggered by the .fit() call.
training_data_chkp = training_data.checkpoint()

# Fit the cross validator on the CHECKPOINTED DataFrame.
model = cv.fit(training_data_chkp)

# --- MODIFICATION END ---

best_model = model.bestModel


print (f"{len(param_grid)} models tested") """

In [None]:
# NOTE: disabled - this cell is a bare string literal, so nothing inside it runs.
# The code it holds reads `best_model`.
""" # Print "Rank"
print("  Rank:", best_model.rank)
# Print "RegParam"
print("  params:", best_model.params) """

In [None]:
# NOTE: disabled - this cell is a bare string literal, so nothing inside it runs.
# The code it holds reads `best_model` and `evaluator`.
""" test_data_cached = test_data.cache()
test_predictions = best_model.transform(test_data_cached)

# Calculate the RMSE of test_predictions
RMSE = evaluator.evaluate(test_predictions)
print(RMSE) """

In [None]:
def create_k_core_subset(data, min_user_ratings=10, min_item_ratings=5):
    """
    Iteratively filter a Spark ratings DataFrame down to its k-core.

    Each pass first drops every book ('ISBN') that has fewer than
    ``min_item_ratings`` rows, then drops every user ('User-ID') that has fewer
    than ``min_user_ratings`` rows among what is left. Removing users can push
    books back under their threshold (and vice versa), so passes repeat until
    the row count stops changing.

    Args:
        data (pyspark.sql.DataFrame): Ratings with one row per rating and at
            least the columns 'User-ID' and 'ISBN'.
        min_user_ratings (int): The minimum number of ratings a user must have.
        min_item_ratings (int): The minimum number of ratings an item must have.

    Returns:
        pyspark.sql.DataFrame: The checkpointed k-core subset, with the same
        columns as ``data``.

    Note:
        ``DataFrame.checkpoint()`` is called on every pass, so a checkpoint
        directory must have been set on the SparkContext beforehand.
    """
    print("Starting k-core filtering...")
    k_core_data = data.alias("new_df")

    # Count once up front; every later pass reuses the count it computed at its
    # end instead of counting the same DataFrame a second time.
    current_rows = k_core_data.count()

    while True:
        # Remember the size at the start of the pass to detect convergence
        original_rows = current_rows
        print(f"Current dataset size: {original_rows} ratings")

        # Keep only books that still have enough ratings
        item_counts = k_core_data.groupBy('ISBN').count()
        items_to_keep = item_counts.filter(F.col("count") >= min_item_ratings)
        k_core_data = k_core_data.join(items_to_keep.select('ISBN'), 'ISBN', 'inner')

        # Keep only users that still have enough ratings after the book filter
        user_counts = k_core_data.groupBy('User-ID').count()
        users_to_keep = user_counts.filter(F.col("count") >= min_user_ratings)
        k_core_data = k_core_data.join(users_to_keep.select('User-ID'), 'User-ID', 'inner')

        # Checkpointing breaks the long lineage chain in iterative algorithms
        # to prevent StackOverflow errors
        k_core_data = k_core_data.checkpoint()

        current_rows = k_core_data.count()
        # A pass that removed nothing means the k-core has been reached
        if current_rows == original_rows:
            print(f"\nK-core filtering complete. Final dataset size: {current_rows} ratings")
            break

    return k_core_data

# Build the dense subset: users with at least 15 ratings and books with at least 10.
# This takes a while - every filtering pass counts and checkpoints the data.
dense_subset = create_k_core_subset(ratings_df_int_ids, min_user_ratings=15, min_item_ratings=10)

In [None]:
def create_head_tail_slice(ratings_df, n_head_users, n_tail_users, n_head_books, n_tail_books, seed=123):
    """
    Performs head/tail sampling on a Spark DataFrame to create a representative subset.

    Users are picked first (the ``n_head_users`` most active plus ``n_tail_users``
    drawn at random from the rest). Books are then picked the same way, counting
    only ratings made by the selected users. The result contains the ratings that
    involve both a selected user and a selected book.

    Args:
        ratings_df (DataFrame): The input ratings Spark DataFrame. Must contain 'User-ID' and 'ISBN'.
        n_head_users (int): Number of most active users to select.
        n_tail_users (int): Number of random less-active users to select.
        n_head_books (int): Number of most popular books to select from the user slice.
        n_tail_books (int): Number of random less-popular books to select.
        seed (int): Random seed for the tail draws (``seed`` for users, ``seed + 1`` for books).

    Returns:
        DataFrame: The final sampled slice (lazy; nothing is computed until an
        action is run on it).

    Note:
        The intermediate user slice is marked for caching and is deliberately NOT
        unpersisted here: the returned DataFrame is lazy, so unpersisting before
        the caller runs an action would discard the cache before it is ever used.
        Use ``spark.catalog.clearCache()`` to release it when no longer needed.
    """
    # Ratings per user
    user_counts = ratings_df.groupBy("User-ID").count().withColumnRenamed("count", "user_rating_count")

    # Head users (most active). User-ID breaks ties so the same users are picked
    # each time this lazy plan is evaluated (it feeds both the anti-join and the union).
    head_users_df = user_counts.orderBy(F.desc("user_rating_count"), F.col("User-ID")).limit(n_head_users)

    # Tail candidates: every user that is not in the head list
    tail_candidates_df = user_counts.join(head_users_df, "User-ID", "left_anti")

    # Tail users: attach a random number and take the first N by that number
    tail_users_df = tail_candidates_df.withColumn("random", F.rand(seed)).orderBy("random").limit(n_tail_users)

    # Combine head and tail users
    selected_users_df = head_users_df.select("User-ID").union(tail_users_df.select("User-ID"))

    # Restrict ratings to only the selected users (the user slice)
    user_slice_df = ratings_df.join(selected_users_df, "User-ID", "inner")

    # The user slice feeds both the book counts and the final join, so cache it.
    # cache() is lazy: it only takes effect once an action runs on a dependent frame.
    user_slice_df.cache()

    # Ratings per book within the user slice
    book_counts_sub = user_slice_df.groupBy("ISBN").count().withColumnRenamed("count", "book_rating_count")

    # Head books (most rated within the slice), with ISBN as a deterministic tie-breaker
    head_books_sub_df = book_counts_sub.orderBy(F.desc("book_rating_count"), F.col("ISBN")).limit(n_head_books)

    # Tail book candidates: every book that is not in the head list
    tail_candidates_sub_df = book_counts_sub.join(head_books_sub_df, "ISBN", "left_anti")

    # Tail books: random draw with a different seed than the user draw
    tail_books_sub_df = tail_candidates_sub_df.withColumn("random", F.rand(seed + 1)).orderBy("random").limit(n_tail_books)

    # Combine head and tail books
    selected_books_df = head_books_sub_df.select("ISBN").union(tail_books_sub_df.select("ISBN"))

    # Final slice: ratings by selected users on selected books
    final_slice_df = user_slice_df.join(selected_books_df, "ISBN", "inner")

    return final_slice_df


In [None]:
# Sizes for the "long-tail" dataset: 300 head + 800 tail users, and
# 300 head + 800 tail books.
n_head_users = 300
n_tail_users = 800
n_head_books = 300
n_tail_books = 800

# Create the final slice from the dense k-core subset.
final_slice = create_head_tail_slice(
    dense_subset,
    n_head_users=n_head_users,
    n_tail_users=n_tail_users,
    n_head_books=n_head_books,
    n_tail_books=n_tail_books,
)

# The slice is counted three times below and reused afterwards, so cache it.
final_slice.cache()

display(
    "Final slice shape:",
    (final_slice.count(), len(final_slice.columns)),
)
display(
    "Selected users:",
    final_slice.select("User-ID").distinct().count(),
)
display(
    "Selected books:",
    final_slice.select("ISBN").distinct().count(),
)

In [None]:
# "\n" (not "\\n") so a blank line is printed instead of a literal backslash-n.
print("\nCalculating long-tail distribution...")

# Ratings per book in the slice, most-rated first.
bc_df = final_slice.groupBy("ISBN").count().orderBy(F.desc("count"))

# Collect the result to the driver as a pandas DataFrame
bc_pandas_df = bc_df.toPandas()

# Cumulative share of all ratings covered by the top-k books (frame is sorted descending).
cum = bc_pandas_df['count'].cumsum() / bc_pandas_df['count'].sum()
# Number of top books whose cumulative share stays within 80% of all ratings.
n80 = (cum <= 0.8).sum()

# The denominator is the total number of unique books in the slice
total_books = len(bc_pandas_df)
print(f"{n80}/{total_books} books cover 80% → {n80/total_books:.1%}")

# Histogram of how many ratings each book received.
plt.hist(bc_pandas_df['count'].values, bins=20)
plt.title(f"{total_books}-book slice: ratings per book")
plt.xlabel("Ratings per book")
plt.ylabel("Number of books")
plt.show()

In [None]:
# Remote locations of the pre-built book metadata.
GITHUB_RAW_BASE = "https://raw.githubusercontent.com/farhodibr/PROJECT5/main"
# One combined CSV ...
SINGLE_URL = GITHUB_RAW_BASE + "/book_metadata/metadata_all.csv"
# ... or numbered parts; call PARTS_URL_FMT.format(part_number) to get a part's URL.
PARTS_URL_FMT = GITHUB_RAW_BASE + "/book_metadata_parts/metadata_part_{}.csv"

# Local directory for metadata files written by this notebook.
OUTPUT_DIR = "book_metadata_parts"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# test if a raw GitHub URL exists 
def remote_exists(url: str) -> bool:
    """Return True when a HEAD request to ``url`` answers with HTTP 200.

    Any ``requests.RequestException`` raised by the request is treated as
    "does not exist" and yields False. The request times out after 5 seconds.
    """
    try:
        response = requests.head(url, timeout=5)
    except requests.RequestException:
        return False
    return response.status_code == 200

# Schema of the book metadata: every field is nullable, and all are strings
# except the page count.
final_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("ISBN", StringType()),
        ("title", StringType()),
        ("description", StringType()),
        ("authors", StringType()),
        ("publish_date", StringType()),
        ("publisher", StringType()),
        ("number_of_pages", IntegerType()),
        ("subjects", StringType()),
    )
])

# Try loading the single combined CSV from GitHub 
if remote_exists(SINGLE_URL):
    print("✅ Found metadata_all.csv on GitHub. Loading via pandas → Spark.")
    pdf = pd.read_csv(SINGLE_URL)
    # ensure integer pages
    pdf['number_of_pages'] = pdf['number_of_pages'].fillna(0).astype(int)
    metadata_df = (
        spark.createDataFrame(
            # pandas marks missing text as NaN (a float), which does not fit a
            # StringType field; hand Spark None instead. `pdf` itself is left untouched.
            pdf.astype(object).where(pdf.notna(), None),
            schema=final_schema,
        )
        .cache()
    )

# Else try loading numbered parts from GitHub
elif remote_exists(PARTS_URL_FMT.format(1)):
    print("found metadata parts on GitHub. Loading all parts via pandas → Spark.")
    # Parts are numbered from 1; stop at the first number that does not exist.
    parts_urls = []
    i = 1
    while remote_exists(PARTS_URL_FMT.format(i)):
        parts_urls.append(PARTS_URL_FMT.format(i))
        i += 1

    pdfs = []
    for url in parts_urls:
        part = pd.read_csv(url)
        part['number_of_pages'] = part['number_of_pages'].fillna(0).astype(int)
        pdfs.append(part)

    all_pdf = pd.concat(pdfs, ignore_index=True).drop_duplicates("ISBN")
    metadata_df = (
        spark.createDataFrame(
            # NaN -> None so missing text values fit the StringType fields.
            all_pdf.astype(object).where(all_pdf.notna(), None),
            schema=final_schema,
        )
        .cache()
    )

# Otherwise fall back to fetching via Google Books API
else:
    print("No GitHub metadata found. Fetching via Google Books API…")
    # Read the key from the environment variable the error message refers to;
    # a hard-coded empty string made this branch fail unconditionally.
    GOOGLE_API_KEY = os.environ.get("GOOGLE_BOOKS_API_KEY", "")
    if not GOOGLE_API_KEY:
        raise RuntimeError("Please set the GOOGLE_BOOKS_API_KEY env var.")

    def build_url(isbn: str) -> str:
        """Return the Google Books volumes query URL for one ISBN."""
        return (
            f"https://www.googleapis.com/books/v1/volumes"
            f"?q=isbn:{isbn}&key={GOOGLE_API_KEY}"
        )

    def fetch_single_book_metadata(isbn, max_retries=3, initial_delay=1):
        """Fetch metadata for one ISBN, retrying with exponential backoff.

        Args:
            isbn: The ISBN to look up.
            max_retries: Maximum number of request attempts.
            initial_delay: Seconds to wait after the first failure; doubled
                after every failed attempt.

        Returns:
            dict: Always contains "ISBN". The remaining metadata keys are only
            present when the API returned a volume; after ``max_retries``
            failed attempts "number_of_pages" is set to 0.
        """
        url   = build_url(isbn)
        delay = initial_delay
        for attempt in range(1, max_retries+1):
            # The request itself is inside the try: a timeout or connection error
            # is retried like any other failure instead of aborting the batch.
            try:
                resp = requests.get(url, timeout=15)
            except requests.RequestException as e:
                print(f"[ERROR] {isbn} attempt {attempt}: {e}")
                time.sleep(delay)
                delay *= 2
                continue
            if resp.status_code == 429:
                ra = resp.headers.get("Retry-After")
                # Retry-After may also be an HTTP date; fall back to our own delay then.
                try:
                    wait = float(ra) if ra else delay
                except ValueError:
                    wait = delay
                print(f"[429] {isbn}, retry {attempt}/{max_retries} in {wait}s")
                time.sleep(wait)
                delay *= 2
                continue
            try:
                resp.raise_for_status()
                data = resp.json()
            except Exception as e:
                print(f"[ERROR] {isbn} attempt {attempt}: {e}")
                time.sleep(delay)
                delay *= 2
                continue
            items = data.get("items")
            if not items:
                # The API answered but knows no volume for this ISBN.
                return {"ISBN": isbn}
            info  = items[0].get("volumeInfo", {})
            pages = info.get("pageCount")
            try:
                pages = int(pages) if pages is not None else 0
            except (TypeError, ValueError):
                pages = 0
            # Small pause after each successful call to stay under the rate limit.
            time.sleep(0.3)
            return {
                "ISBN":            isbn,
                "title":           info.get("title", ""),
                "description":     info.get("description", ""),
                "authors":         ", ".join(info.get("authors", [])),
                "publish_date":    info.get("publishedDate", ""),
                "publisher":       info.get("publisher", ""),
                "number_of_pages": pages,
                "subjects":        ", ".join(info.get("categories", []))
            }
        print(f"[FAILED] ISBN {isbn} after {max_retries} attempts.")
        return {"ISBN": isbn, "number_of_pages": 0}

    def chunker(seq, size):
        """Yield consecutive slices of ``seq`` holding at most ``size`` items each."""
        for i in range(0, len(seq), size):
            yield seq[i:i+size]

    isbn_list = [r.ISBN for r in final_slice.select("ISBN").distinct().collect()]
    batches   = list(chunker(isbn_list, 100))

    all_results = []
    for idx, batch in enumerate(batches, 1):
        print(f"Batch {idx}/{len(batches)}: fetching {len(batch)} ISBNs…")
        with ThreadPoolExecutor(max_workers=2) as exe:
            futures = [exe.submit(fetch_single_book_metadata, isbn) for isbn in batch]
            for fut in as_completed(futures):
                all_results.append(fut.result())
        # Pause between batches; no need to wait after the last one.
        if idx < len(batches):
            time.sleep(10)

    # save and load via Spark
    local_csv = f"{OUTPUT_DIR}/metadata_all_local.csv"
    # The result dicts do not all carry the same keys. Pin the column set and
    # order to the schema, because Spark applies an explicit schema by position.
    column_names = [field.name for field in final_schema.fields]
    results_pdf = pd.DataFrame(all_results, columns=column_names)
    # Missing page counts would otherwise turn the column into floats ("0.0").
    results_pdf["number_of_pages"] = results_pdf["number_of_pages"].fillna(0).astype(int)
    results_pdf.to_csv(local_csv, index=False)
    metadata_df = (
        spark.read
             .schema(final_schema)
             .option("header", True)
             # pandas quotes fields with '"', doubles embedded quotes and keeps
             # line breaks inside quoted descriptions; tell Spark to parse that.
             .option("multiLine", True)
             .option("quote", '"')
             .option("escape", '"')
             .csv(local_csv)
             .cache()
    )


print("Metadata schema:")
metadata_df.printSchema()

preview_cols = ["ISBN", "title", "authors", "publish_date", "number_of_pages"]

# Build the preview from metadata_df, which every loading branch assigns.
# The pandas frame `pdf` only exists when the single combined CSV was found,
# so reading it here raised a NameError for the other two branches.
preview_pdf = metadata_df.select(*preview_cols, "description").limit(5).toPandas()

print("Sample metadata (subset of columns):")
print(preview_pdf[preview_cols].to_string(index=False))

print("\n Description snippet (first 150 chars):")
snippet = preview_pdf[["ISBN", "description"]].copy()
snippet["desc_snippet"] = snippet["description"].str.slice(0, 150)
print(snippet[["ISBN", "desc_snippet"]].to_string(index=False))


In [None]:
from pyspark.sql import SparkSession
import os

# Define a compatible set of versions
spark_nlp_version = "5.3.3" # A stable version for Spark 3.5.x
spark_version_scope = "3.5"  # Spark line this notebook targets (informational only)
# Spark NLP publishes its CPU build under the artifact id "spark-nlp_2.12";
# the Spark version is not part of the Maven coordinate.
nlp_package = f"com.johnsnowlabs.nlp:spark-nlp_2.12:{spark_nlp_version}"

print(f"Configuring Spark Session to use Spark NLP package: {nlp_package}")

# Stop any existing session to ensure a clean start. DataFrames created or
# cached with the previous session cannot be used after this point.
try:
    SparkSession.builder.getOrCreate().stop()
    print("Stopped existing Spark session.")
except Exception as exc:
    # Best effort only: report the problem instead of hiding it, then carry on.
    print(f"Could not stop an existing Spark session: {exc}")

# Build a new session with the correct Spark NLP package included.
# NOTE(review): "spark.jars.packages" and "spark.driver.memory" are presumably only
# applied when the JVM is launched. If an earlier cell already started Spark in
# this kernel they may be ignored - confirm, and restart the kernel if the
# Spark NLP classes turn out to be missing.
spark = SparkSession.builder \
    .appName("SparkNLP_Book_Recommender") \
    .master("local[*]") \
    .config("spark.driver.memory", "16G") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryoserializer.buffer.max", "2000M") \
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.jars.packages", nlp_package) \
    .getOrCreate()

print("Spark session configured and started successfully.")

import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.ml import Pipeline

import sparknlp
from sparknlp.base import *
from sparknlp.annotator import UniversalSentenceEncoder
from pyspark.ml.feature import Normalizer, BucketedRandomProjectionLSH

# Report the Spark NLP and Spark versions used by the content-based recommender.
print("\n--- Content-Based Recommender Setup ---")
print(f"Spark NLP version: {sparknlp.version()}")
print(f"Apache Spark version: {spark.version}")


import requests
from sparknlp.base import EmbeddingsFinisher

metadata_path = "https://raw.githubusercontent.com/farhodibr/PROJECT5/refs/heads/main/book_metadata/metadata_all.csv"

# os.path.exists() is always False for a URL, so the original check could never
# pass. Download a remote file once to a local copy and read that instead.
if metadata_path.startswith(("http://", "https://")):
    local_metadata_path = os.path.join("book_metadata", "metadata_all.csv")
    if not os.path.exists(local_metadata_path):
        os.makedirs(os.path.dirname(local_metadata_path), exist_ok=True)
        try:
            response = requests.get(metadata_path, timeout=60)
            response.raise_for_status()
        except requests.RequestException as exc:
            print(f"Download of '{metadata_path}' failed: {exc}")
        else:
            with open(local_metadata_path, "wb") as out_file:
                out_file.write(response.content)
else:
    local_metadata_path = metadata_path

if not os.path.exists(local_metadata_path):
    print(f"ERROR: '{metadata_path}' not found. Please ensure the file is in the correct directory.")
else:
    final_schema = StructType([
        StructField("ISBN", StringType(), True),
        StructField("title", StringType(), True),
        StructField("description", StringType(), True),
        StructField("authors", StringType(), True),
        StructField("publish_date", StringType(), True),
        StructField("publisher", StringType(), True),
        StructField("number_of_pages", IntegerType(), True),
        StructField("subjects", StringType(), True)
    ])

    metadata_df = (
        spark.read
             .schema(final_schema)
             .option("header", True)
             # Descriptions can contain commas, doubled quotes and line breaks
             # inside quoted fields; parse them as one field.
             # NOTE(review): assumes the CSV uses '"' quoting with doubled quotes
             # (what pandas.to_csv writes) - confirm for this file.
             .option("multiLine", True)
             .option("quote", '"')
             .option("escape", '"')
             .csv(local_metadata_path)
             .na.fill({"title": "No Title", "description": "No Description"}) # Handle potential nulls
             .cache()
    )
    print("✅ Book metadata loaded successfully.")
    metadata_df.select("ISBN", "title").show(5, truncate=False)

    # We will convert book titles and descriptions into numerical vectors (embeddings).
    # Combine title and description into a single feature column, dropping rows
    # where both were missing (i.e. both placeholders were filled in above).
    df_for_nlp = metadata_df.withColumn(
        "text",
        F.concat_ws(" ", F.col("title"), F.col("description"))
    ).filter(F.col("text") != "No Title No Description")

    # Define the Spark NLP pipeline stages
    document_assembler = DocumentAssembler() \
        .setInputCol("text") \
        .setOutputCol("document")

    use = UniversalSentenceEncoder.pretrained() \
        .setInputCols(["document"]) \
        .setOutputCol("sentence_embeddings")

    # EmbeddingsFinisher (not Finisher) is the stage that turns embedding
    # annotations into Spark ML vectors via setOutputAsVector.
    finisher = EmbeddingsFinisher() \
        .setInputCols(["sentence_embeddings"]) \
        .setOutputCols(["finished_embeddings"]) \
        .setOutputAsVector(True)

    nlp_pipeline = Pipeline(stages=[document_assembler, use, finisher])

    # Generate the embeddings
    print("\nGenerating book embeddings with Spark NLP (this may take a few minutes)...")
    embeddings_df = nlp_pipeline.fit(df_for_nlp).transform(df_for_nlp)

    # The finisher emits an array of vectors per row; keep the first one
    embeddings_df = embeddings_df.withColumn("embedding", F.col("finished_embeddings")[0])
    print("✅ Embeddings generated.")


    # Use LSH to Find Similar Books
    # Locality Sensitive Hashing (LSH) is an efficient way to find nearest neighbors.

    # Normalize embeddings (L2) for LSH
    normalizer = Normalizer(inputCol="embedding", outputCol="norm_embedding", p=2.0)
    normalized_df = normalizer.transform(embeddings_df)

    # Create and train the LSH model
    brp = BucketedRandomProjectionLSH(
        inputCol="norm_embedding",
        outputCol="hashes",
        bucketLength=2.0,
        numHashTables=5
    )

    print("\nTraining Locality Sensitive Hashing (LSH) model...")
    lsh_model = brp.fit(normalized_df)
    print("✅ LSH model trained.")


    # Create the Recommendation Function
    # This function takes a book title and finds similar books.

    def get_content_based_recs(book_title: str, num_recs: int = 10) -> DataFrame:
        """
        Finds and returns content-based book recommendations for a given book title.

        Args:
            book_title: Exact title to look up in ``normalized_df``.
            num_recs: Number of recommendations to display.

        Returns:
            DataFrame with columns ``title``, ``authors`` and ``distance``,
            ordered by ascending distance and excluding rows whose title equals
            ``book_title``. An empty DataFrame with the same columns is returned
            when the title is not in the dataset.
        """
        print("\n" + "="*50)
        print(f"Finding recommendations for: '{book_title}'")
        print("="*50)

        # Look up the embedding of the requested book; first() yields None when
        # no row matches, which is checked explicitly.
        matching_row = (
            normalized_df
            .filter(F.col("title") == book_title)
            .select("norm_embedding")
            .first()
        )
        if matching_row is None:
            print(f"Could not find the book '{book_title}' in the dataset.")
            # DOUBLE matches the type of the LSH distance column returned below.
            return spark.createDataFrame([], schema="title STRING, authors STRING, distance DOUBLE")
        book_embedding = matching_row["norm_embedding"]

        # Use the LSH model to find the most similar items. One extra neighbor is
        # requested because the query book itself is part of the dataset.
        print(f"Searching for the top {num_recs} most similar books...")
        recommendations_df = lsh_model.approxNearestNeighbors(normalized_df, book_embedding, num_recs + 1)

        # Format and display the results
        recs = recommendations_df.select("title", "authors", F.col("distCol").alias("distance")) \
                                 .filter(F.col("title") != book_title) \
                                 .orderBy("distance")

        if recs.count() == 0:
            print("No similar books found.")
        else:
            print("\n--- Recommendations ---")
            recs.show(num_recs, truncate=False)

        return recs

    # Test the content-based recommender directly.
    lost_world_recs = get_content_based_recs("The Lost World")
    beach_recs = get_content_based_recs("The Beach")
