In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline


# Create a Spark session with increased resources
spark = SparkSession.builder \
    .appName("Amazon Recommendation Model with ALS") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .getOrCreate()

# Read data from MongoDB and cache it
df = spark.read.format("mongo") \
    .option("uri", "mongodb://localhost:27017/amazon.model") \
    .load() \
    .dropna()

In [2]:
df = df.withColumn("overall", when(df.overall > 4, 1).otherwise(0))
sampled_data = df.sampleBy("overall", fractions={0: 0.001, 1: 0.001}, seed=42)
sampled_data.cache()
sampled_data.describe().show()


+-------+--------------------+-------------------+--------------------+
|summary|                asin|            overall|          reviewerID|
+-------+--------------------+-------------------+--------------------+
|  count|              232108|             232108|              232108|
|   mean|1.1571355080580082E9| 0.6372249125407138|                null|
| stddev|1.1955291145389128E9|0.48080174639693984|                null|
|    min|          0001046314|                  0|A0003492LQH8LJXPWDMZ|
|    max|          B01HJG3UQI|                  1|         AZZZZJYGA32|
+-------+--------------------+-------------------+--------------------+



In [3]:
from pyspark.ml.feature import StringIndexer
import pickle


sampled_data.cache()

# Index the reviewerID, asin, and overall columns
reviewer_indexer = StringIndexer(inputCol="reviewerID", outputCol="reviewer_index").fit(sampled_data)
reviewer_indexer.save("reviewer_indexer")

asin_indexer = StringIndexer(inputCol="asin", outputCol="asin_index").fit(sampled_data)
asin_indexer.save("asin_indexer")


overall_indexer = StringIndexer(inputCol="overall", outputCol="overall_index").fit(sampled_data)
overall_indexer.save("overall_indexer")


# Apply the indexers to the data
indexed_data = reviewer_indexer.transform(sampled_data)
indexed_data = asin_indexer.transform(indexed_data)
indexed_data = overall_indexer.transform(indexed_data)

(training_data, test_data) = indexed_data.randomSplit([0.7, 0.3], seed=42)

# Train the recommendation model
from pyspark.ml.recommendation import ALS

als = ALS(userCol="reviewer_index", itemCol="asin_index", ratingCol="overall",
         coldStartStrategy="drop", nonnegative=True, implicitPrefs=False,maxIter= 3,regParam=0.09,rank=8)
model = als.fit(training_data)



In [4]:
from pyspark.ml.evaluation import RegressionEvaluator

# Make predictions on the test data
predictions = model.transform(test_data)

# Evaluate the model using RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="overall",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.7066874033473669


In [5]:
from pyspark.sql.functions import col, expr
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName="rmse", labelCol="overall",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

print("Root-mean-square error = " + str(rmse))
predictions = predictions.withColumn("prediction", predictions["prediction"].cast(DoubleType()))
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
multi_evaluator = MulticlassClassificationEvaluator(labelCol='overall',metricName='accuracy')
print("accuracy : " , multi_evaluator.evaluate(predictions))
# Filter out rows where the "prediction" column is null or not a number
#f1 = MulticlassClassificationEvaluator(labelCol='overall',metricName='f1')
#print("f1 score  : " , f1.evaluate(predictions))


evaluator = MulticlassClassificationEvaluator(labelCol="overall",
                                              predictionCol="prediction",
                                              metricName="f1")

f1_score = evaluator.evaluate(predictions)

print("F1 score = " + str(f1_score))
evaluator = MulticlassClassificationEvaluator(labelCol="overall",
                                              predictionCol="prediction",
                                              metricName="weightedRecall")



print("recall : ",str(evaluator.evaluate(predictions)))










Root-mean-square error = 0.7066874033473669
accuracy :  0.2577565632458234
F1 score = 0.19250991440414877
recall :  0.2577565632458234


In [6]:
model.save("als_modelFinal")
    