In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
spark = SparkSession.builder.appName("CollaborativeFilteringExample").getOrCreate()

In [3]:
path = r"C:\Users\ASUS\Documents\12_FALL 23\CSE488-Big Data Analytics\Project\Book reviews\Book reviews\BX-Book-Ratings.csv"


In [4]:
data = spark.read.csv(path, header=True, inferSchema=True)


In [5]:
data.head()

Row("User-ID";"ISBN";"Book-Rating"='"276725";"034545104X";"0"')

In [6]:



user_indexer = StringIndexer(inputCol="User-ID", outputCol="userIndex").fit(data)
isbn_indexer = StringIndexer(inputCol="ISBN", outputCol="isbnIndex").fit(data)

# Apply indexing to the DataFrame
data = user_indexer.transform(data)
data = isbn_indexer.transform(data)

# Split the data into training and testing sets
(training, test) = data.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS
als = ALS(maxIter=5, regParam=0.01, userCol="userIndex", itemCol="isbnIndex", ratingCol="Book-Rating", coldStartStrategy="drop")
model = als.fit(training)

# Generate top-k recommendations for each user
userRecs = model.recommendForAllUsers(10)  # You can change 10 to the desired value of k

# Create a feature vector for k-NN clustering
feature_cols = ["userIndex", "isbnIndex", "Book-Rating"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
data_for_knn = assembler.transform(data)

# Perform k-NN clustering using KMeans
kmeans = KMeans(k=5, seed=1, featuresCol="features", predictionCol="cluster")
knn_model = kmeans.fit(data_for_knn)
clustered_data = knn_model.transform(data_for_knn)

# Show the clustering results
clustered_data.select("User-ID", "ISBN", "Book-Rating", "cluster").show()

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="Book-Rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) = {rmse}")


Py4JJavaError: An error occurred while calling o33.fit.
: org.apache.spark.SparkException: Input column User-ID does not exist.
	at org.apache.spark.ml.feature.StringIndexerBase.$anonfun$validateAndTransformSchema$2(StringIndexer.scala:128)
	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
	at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:198)
	at org.apache.spark.ml.feature.StringIndexerBase.validateAndTransformSchema(StringIndexer.scala:123)
	at org.apache.spark.ml.feature.StringIndexerBase.validateAndTransformSchema$(StringIndexer.scala:115)
	at org.apache.spark.ml.feature.StringIndexer.validateAndTransformSchema(StringIndexer.scala:145)
	at org.apache.spark.ml.feature.StringIndexer.transformSchema(StringIndexer.scala:252)
	at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:71)
	at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:237)
	at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:145)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)
