In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
from pyspark.sql import functions as F
from functools import reduce

In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Amazon Book Recommendation") \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.memory.offHeap.size", "50g") \
    .getOrCreate()


In [6]:
file_path = r"C:\Users\abdel\Downloads\BigData_Project\Sales_Data\Books_rating.csv"
rating_df = spark.read.csv(file_path, header=True, inferSchema=True)
columns = ['Id','User_id','review/score','Title','review/text']
new_names = ['book_id','user_id','rating','title','review']
rating_df = rating_df.select([col for col in columns]).toDF(*new_names)
rating_df = rating_df.withColumn("rating", rating_df["rating"].cast("float"))

In [7]:
rating_df = rating_df.na.drop("any")

In [8]:
rating_df.printSchema()
print("Size of values: ",rating_df.count())
rating_df.show(5)

root
 |-- book_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: float (nullable = true)
 |-- title: string (nullable = true)
 |-- review: string (nullable = true)

Size of values:  2420256
+----------+--------------+------+--------------------+--------------------+
|   book_id|       user_id|rating|               title|              review|
+----------+--------------+------+--------------------+--------------------+
|1882931173| AVCGYZL8FQQTD|   4.0|Its Only Art If I...|This is only for ...|
|0826414346|A30TK6U7DNS82R|   5.0|Dr. Seuss: Americ...|I don't care much...|
|0826414346|A3UH4UZ4RSVO82|   5.0|Dr. Seuss: Americ...|"If people become...|
|0826414346|A2MVUWT453QH61|   4.0|Dr. Seuss: Americ...|Theodore Seuss Ge...|
|0826414346|A22X4XUPKF66MR|   4.0|Dr. Seuss: Americ...|"Philip Nel - Dr....|
+----------+--------------+------+--------------------+--------------------+
only showing top 5 rows



In [10]:
sampled_rating = rating_df.sample(withReplacement=False, fraction=0.1, seed=42)

# Show the sampled data
sampled_rating.show()
sampled_rating.count()

+----------+--------------+------+--------------------+--------------------+
|   book_id|       user_id|rating|               title|              review|
+----------+--------------+------+--------------------+--------------------+
|0826414346|A2RSSXTDZDUSH4|   5.0|Dr. Seuss: Americ...|"When I recieved ...|
|0595344550| AUR0VA5H0C66C|   1.0|Whispers of the W...|"This is a self-p...|
|0595344550| ACO23CG8K8T77|   5.0|Whispers of the W...|I read the review...|
|0802841899|A2H2LORTA5EZY2|   4.0|The Church of Chr...|This is a very us...|
|0854968350| ATDE9JYCPI0L1|   2.0|Muslim Women's Ch...|"I was excited to...|
|0918973031|A32ZQ5DEXBL60Z|   5.0|Dramatica for Scr...|"I think the hard...|
|0918973031|A1X1CW1GXKC50V|   5.0|Dramatica for Scr...|This is the way t...|
|0792391810| A29Z0B2L367ZO|   5.0|Vector Quantizati...|It seems somebody...|
|0974289108|A3AJA5ADM3Q8LM|   5.0|"The Ultimate Gui...|Boy am I lucky to...|
|B000NKGYMK|A22T74YNRM8NTK|   5.0|    Alaska Sourdough|Make the most sub...|

242687

In [11]:
product_id_threshold = 200
user_id_threshold = 10
product_id_counts = sampled_rating.groupBy("book_id").count().filter(f"count >= {product_id_threshold}")
user_id_counts = sampled_rating.groupBy("user_id").count().filter(f"count >= {user_id_threshold}")

In [12]:
filtered_df = sampled_rating.join(product_id_counts, on="book_id", how="inner") \
                .join(user_id_counts, on="user_id", how="inner") \
                .drop("count")

In [13]:
string_indexer_product = StringIndexer(inputCol="book_id", outputCol="BookIdIndex")
string_indexer_user = StringIndexer(inputCol="user_id", outputCol="UserIdIndex")

In [14]:
pipeline = Pipeline(stages=[string_indexer_product, string_indexer_user])
indexed_df = pipeline.fit(filtered_df).transform(filtered_df)

In [18]:
(train_data, test_data) = indexed_df.randomSplit([0.8, 0.2], seed=42)

In [19]:
als = ALS(maxIter=5, regParam=0.01, userCol="UserIdIndex", itemCol="BookIdIndex", ratingCol="rating")
model = als.fit(train_data)

In [20]:
predictions = model.transform(test_data)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

Root Mean Squared Error (RMSE): nan


In [21]:
# Generate recommendations
user_recs = model.recommendForAllUsers(5)

In [22]:
try:
    book_id_to_title = dict(
        indexed_df.select("BookIdIndex", "title")
        .rdd.map(lambda r: (r.BookIdIndex, r.title))
        .collect()
    )
    print("Lookup dictionary created successfully!")
except Exception as e:
    print("Error:", e)
    # Add additional error handling or logging as needed

Error: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 223.0 failed 1 times, most recent failure: Lost task 3.0 in stage 223.0 (TID 1064) (reem executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:192)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:166)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runT