In [10]:
import pyspark.sql.functions as F
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder.appName("BookRecommendation").getOrCreate()

# Load data into a DataFrame (assuming it's a CSV file)
df = spark.read.csv("filtered_books.csv", header=True, inferSchema=True)

# Filter out missing values if needed
df = df.filter(df.title.isNotNull() & df.average_rating.isNotNull())

# Select relevant columns (excluding shelves)
df = df.select("asin", "title", "author_name", "average_rating", "description")

# Tokenize title and description for text processing
tokenizer_title = Tokenizer(inputCol="title", outputCol="title_tokens")
df = tokenizer_title.transform(df)

tokenizer_description = Tokenizer(inputCol="description", outputCol="description_tokens")
df = tokenizer_description.transform(df)

                                                                                

In [11]:
from pyspark.ml.feature import HashingTF, IDF

# TF-IDF Vectorizer for title tokens
hashingTF = HashingTF(inputCol="title_tokens", outputCol="title_tf", numFeatures=10000)
df = hashingTF.transform(df)

idf = IDF(inputCol="title_tf", outputCol="title_tfidf")
idfModel = idf.fit(df)
df = idfModel.transform(df)


                                                                                

In [23]:
from pyspark.sql.functions import col

def cosine_similarity(v1, v2):
    if v1 is None or v2 is None:
        return float(0)  # or return None, depending on your requirements
    return float(np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2)))

cosine_sim_udf = udf(cosine_similarity, DoubleType())


In [22]:
# Check for null values in features

df.filter(df.features.isNull()).show()


+----+-----+-----------+--------------+-----------+------------+------------------+--------+-----------+--------+
|asin|title|author_name|average_rating|description|title_tokens|description_tokens|title_tf|title_tfidf|features|
+----+-----+-----------+--------------+-----------+------------+------------------+--------+-----------+--------+
+----+-----+-----------+--------------+-----------+------------+------------------+--------+-----------+--------+



In [None]:
# Check for null values in features
df.filter(df.features.isNull()).show()

# Confirm the existence of the book with asin "10"
df.filter(df.asin == "10").show()

# Self-join the dataframe to calculate similarity with all other books
df_with_sim = df.alias("i").join(df.alias("j"), F.col("i.asin") != F.col("j.asin"))

# Filter out rows with null features
df_with_sim = df_with_sim.filter(F.col("i.features").isNotNull() & F.col("j.features").isNotNull())

# UDF for Cosine Similarity
def cosine_similarity(v1, v2):
    return float(np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2)))

cosine_sim_udf = udf(cosine_similarity, DoubleType())

# Calculate similarity
df_with_sim = df_with_sim.withColumn("similarity", cosine_sim_udf(F.col("i.features"), F.col("j.features")))

# Get top 5 similar books for a given book
top5_books = df_with_sim.filter(F.col("i.asin") == "10").orderBy(F.desc("similarity")).limit(5)
top5_books.show()


24/09/25 16:42:53 ERROR Executor: Exception in task 0.0 in stage 38.0 (TID 177)
org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`Tokenizer$$Lambda$3600/0x00000008414cd040`: (string) => array<string>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apac

Py4JJavaError: An error occurred while calling o885.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 38.0 failed 1 times, most recent failure: Lost task 0.0 in stage 38.0 (TID 177) (192.168.29.248 executor driver): org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`Tokenizer$$Lambda$3600/0x00000008414cd040`: (string) => array<string>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.foreach(WholeStageCodegenEvaluatorFactory.scala:41)
	at org.apache.spark.sql.execution.joins.UnsafeCartesianRDD.compute(CartesianProductExec.scala:46)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4333)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3539)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`Tokenizer$$Lambda$3600/0x00000008414cd040`: (string) => array<string>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.foreach(WholeStageCodegenEvaluatorFactory.scala:41)
	at org.apache.spark.sql.execution.joins.UnsafeCartesianRDD.compute(CartesianProductExec.scala:46)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: java.lang.NullPointerException


In [35]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, Tokenizer, HashingTF, IDF
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType
import numpy as np
from pyspark.ml.linalg import Vectors
import h5py

# Initialize Spark session
spark = SparkSession.builder \
    .appName("BookRecommendationSystem") \
    .getOrCreate()

# Load data into a DataFrame (assuming it's a CSV file)
df = spark.read.csv("filtered_books.csv", header=True, inferSchema=True)

# Filter out missing values if needed
df = df.filter(df.title.isNotNull() & df.average_rating.isNotNull())

# Select relevant columns (excluding shelves)
df = df.select("asin", "title", "author_name", "average_rating", "description")

# Tokenize the title and description
title_tokenizer = Tokenizer(inputCol="title", outputCol="title_tokens")
df = title_tokenizer.transform(df)

# Compute term frequency for title
hashingTF_title = HashingTF(inputCol="title_tokens", outputCol="title_tf", numFeatures=10000)
df = hashingTF_title.transform(df)

# Compute IDF for title
idf_title = IDF(inputCol="title_tf", outputCol="title_tfidf")
idf_model = idf_title.fit(df)
df = idf_model.transform(df)

# Combine the features into one feature vector
assembler = VectorAssembler(inputCols=["title_tfidf"], outputCol="features")
df = assembler.transform(df)

# Check for null values in features
null_features = df.filter(df.features.isNull())
if null_features.count() > 0:
    print("Rows with null features:")
    null_features.show()

# UDF for Cosine Similarity
def cosine_similarity(v1, v2):
    if v1 is None or v2 is None:
        return float(0)
    v1_array = v1.toArray()
    v2_array = v2.toArray()
    return float(np.dot(v1_array, v2_array) / (np.linalg.norm(v1_array) * np.linalg.norm(v2_array)))

cosine_sim_udf = udf(cosine_similarity, DoubleType())

# Filter out null features before joining
df_filtered = df.filter(df.features.isNotNull())

# Self-join the DataFrame to calculate similarity with all other books
df_with_sim = df_filtered.alias("i").join(df_filtered.alias("j"), col("i.asin") != col("j.asin"))

# Calculate similarity
df_with_sim = df_with_sim.withColumn("similarity", cosine_sim_udf(col("i.features"), col("j.features")))

# Function to get top N similar books using KNN
def get_top_n_similar_books(df_sim, asin, title, n=5):
    return df_sim.filter(
        (col("i.asin") == asin) & 
        (col("j.asin") != asin) & 
        (col("i.title") != col("j.title"))  # Ensure titles are not the same
    ).orderBy(col("similarity").desc()).limit(n)


# Example usage
top5_books = get_top_n_similar_books(df_with_sim, "67", 5)
#convert top5_books to dict

# Save model as dictionary
top5_books_dict = top5_books.select("j.asin", "j.title", "similarity").rdd.map(lambda x: x.asDict()).collect()
# Show the results
print(top5_books_dict)
top5_books.select("j.asin", "j.title", "similarity").show()

# Save model as H5 file (optional)
with h5py.File('book_recommendation_model.h5', 'w') as hf:
    hf.create_dataset("book_similarities", data=top5_books.select("similarity").rdd.flatMap(lambda x: x).collect())
    hf.create_dataset("book_ids", data=top5_books.select("j.asin").rdd.flatMap(lambda x: x).collect())


                                                                                

[{'asin': '11685', 'title': 'Poems of the Known World', 'similarity': 0.8378670690517136}, {'asin': '7420', 'title': 'The Basic Bakunin', 'similarity': 0.6779153380617311}, {'asin': '72', 'title': 'Artesia: Adventures in the Known World', 'similarity': 0.6254417483401823}, {'asin': '</p><p>The case quickly becomes the new Trial of the Century. Mafia don Dominic Cavello', 'title': ' known as the Electrician', 'similarity': 0.5313938082301664}, {'asin': '100346', 'title': 'Three Books of Known Space', 'similarity': 0.5133964614194417}]


                                                                                

+--------------------+--------------------+------------------+
|                asin|               title|        similarity|
+--------------------+--------------------+------------------+
|               11685|Poems of the Know...|0.8378670690517136|
|                7420|   The Basic Bakunin|0.6779153380617311|
|                  72|Artesia: Adventur...|0.6254417483401823|
|</p><p>The case q...| known as the Ele...|0.5313938082301664|
|              100346|Three Books of Kn...|0.5133964614194417|
+--------------------+--------------------+------------------+



                                                                                

In [49]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, Tokenizer, HashingTF, IDF
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType
import numpy as np
import h5py
from sklearn.metrics import precision_score, recall_score, f1_score
from sklearn.neighbors import NearestNeighbors

# Initialize Spark session
spark = SparkSession.builder \
    .appName("BookRecommendationSystem") \
    .master("local[*]") \
    .getOrCreate()


# Load data into a DataFrame (assuming it's a CSV file)
df = spark.read.csv("filtered_books.csv", header=True, inferSchema=True)

# Filter out missing values if needed
df = df.filter(df.title.isNotNull() & df.average_rating.isNotNull())

# Select relevant columns (excluding shelves)
df = df.select("asin", "title", "author_name", "average_rating", "description")

# Tokenize the title and description
title_tokenizer = Tokenizer(inputCol="title", outputCol="title_tokens")
df = title_tokenizer.transform(df)

# Compute term frequency for title
hashingTF_title = HashingTF(inputCol="title_tokens", outputCol="title_tf", numFeatures=10000)
df = hashingTF_title.transform(df)

# Compute IDF for title
idf_title = IDF(inputCol="title_tf", outputCol="title_tfidf")
idf_model = idf_title.fit(df)
df = idf_model.transform(df)

# Combine the features into one feature vector
assembler = VectorAssembler(inputCols=["title_tfidf"], outputCol="features")
df = assembler.transform(df)

# Check for null values in features
null_features = df.filter(df.features.isNull())
if null_features.count() > 0:
    print("Rows with null features:")
    null_features.show()

# UDF for Cosine Similarity
def cosine_similarity(v1, v2):
    if v1 is None or v2 is None:
        return float(0)
    v1_array = v1.toArray()
    v2_array = v2.toArray()
    return float(np.dot(v1_array, v2_array) / (np.linalg.norm(v1_array) * np.linalg.norm(v2_array)))

cosine_sim_udf = udf(cosine_similarity, DoubleType())

# Filter out null features before joining
df_filtered = df.filter(df.features.isNotNull())

# Self-join the DataFrame to calculate similarity with all other books
df_with_sim = df_filtered.alias("i").join(df_filtered.alias("j"), col("i.asin") != col("j.asin"))

# Calculate similarity using Cosine Similarity
df_with_sim = df_with_sim.withColumn("similarity", cosine_sim_udf(col("i.features"), col("j.features")))

# Function to get top N similar books using Cosine Similarity
def get_top_n_similar_books(df_sim, asin, n=5):
    return df_sim.filter(
        (col("i.asin") == asin) & 
        (col("j.asin") != asin) & 
        (col("i.title") != col("j.title"))  # Ensure titles are not the same
    ).orderBy(col("similarity").desc()).limit(n)

# Get recommendations using Cosine Similarity
asin_example = "67"  # Example ASIN
top5_books_cosine = get_top_n_similar_books(df_with_sim, asin_example, 5)

# Convert top5_books to dict for Cosine Similarity
top5_books_dict_cosine = top5_books_cosine.select("j.asin", "j.title", "similarity").rdd.map(lambda x: x.asDict()).collect()
print("Cosine Similarity Recommendations:", top5_books_dict_cosine)

# KNN implementation using scikit-learn
# Get the features as a numpy array for scikit-learn
features_array = np.array(df.select("features").rdd.map(lambda x: x.features.toArray()).collect())

# Train the KNN model using scikit-learn
knn = NearestNeighbors(n_neighbors=5, algorithm='auto')
knn.fit(features_array)

# Example: Get the 5 nearest neighbors for the first book
distances, indices = knn.kneighbors(features_array[0].reshape(1, -1))

# Collect the recommended ASINs and their distances
knn_recommendations = []
for i in range(len(indices[0])):
    asin_knn = df.select("asin").collect()[indices[0][i]].asin
    knn_recommendations.append({'asin': asin_knn, 'distance': distances[0][i]})

print("KNN Recommendations:", knn_recommendations)

# Function to evaluate recommendations
def evaluate_recommendations(true_recommendations, predicted_recommendations):
    y_true = np.array(true_recommendations)
    y_pred = np.array(predicted_recommendations)

    precision = precision_score(y_true, y_pred, average='binary')
    recall = recall_score(y_true, y_pred, average='binary')
    f1 = f1_score(y_true, y_pred, average='binary')

    return precision, recall, f1

# Placeholder for actual relevant recommendations
true_recommendations = [...]  # Actual ASINs of books that are relevant

# Get recommended ASINs from the results
predicted_recommendations_cosine = [book['asin'] for book in top5_books_dict_cosine]
predicted_recommendations_knn = [book['asin'] for book in knn_recommendations]

# Evaluate the recommendations
precision_cosine, recall_cosine, f1_cosine = evaluate_recommendations(true_recommendations, predicted_recommendations_cosine)
precision_knn, recall_knn, f1_knn = evaluate_recommendations(true_recommendations, predicted_recommendations_knn)

# Print evaluation results
print(f"Cosine Similarity - Precision: {precision_cosine}, Recall: {recall_cosine}, F1 Score: {f1_cosine}")
print(f"KNN - Precision: {precision_knn}, Recall: {recall_knn}, F1 Score: {f1_knn}")

# Save model as H5 file (optional)
with h5py.File('book_recommendation_model.h5', 'w') as hf:
    hf.create_dataset("book_similarities", data=top5_books_cosine.select("similarity").rdd.flatMap(lambda x: x).collect())
    hf.create_dataset("book_ids", data=top5_books_cosine.select("j.asin").rdd.flatMap(lambda x: x).collect())


ConnectionRefusedError: [Errno 111] Connection refused