In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkConf

In [2]:
# Single source of truth for the MongoDB namespace: the connector reads from and
# writes to the same URI, so it is spelled out only once.
MONGO_URI = "mongodb://localhost:27017/amazon.reviews"

# Connector and serializer settings.
# spark.kryoserializer.buffer.max is only consulted when Kryo is the active
# serializer, so Kryo is switched on explicitly; under Spark's default Java
# serializer the 512m buffer setting would be silently ignored.
conf = (
    SparkConf()
    .set("spark.mongodb.input.uri", MONGO_URI)
    .set("spark.mongodb.output.uri", MONGO_URI)
    .set("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryoserializer.buffer.max", "512m")
)

# Create (or reuse) the Spark session.
# NOTE(review): getOrCreate() hands back an already-running session when one exists in
# this kernel; JVM-level settings such as driver memory presumably only take effect on a
# fresh kernel -- restart before changing them.
spark = (
    SparkSession.builder
    .appName("MongoDB to PySpark")
    .config("spark.driver.memory", "12g")
    .config("spark.executor.memory", "12g")
    .config(conf=conf)
    .getOrCreate()
)

In [3]:
# Load the collection configured through spark.mongodb.input.uri into a DataFrame
mongo_reader = spark.read.format("com.mongodb.spark.sql.DefaultSource")
df = mongo_reader.load()

# Preview the first 10 rows
df.show(n=10)

+--------------------+----------+-------+--------------------+--------------+--------+
|                 _id|      asin|overall|             summary|unixReviewTime|verified|
+--------------------+----------+-------+--------------------+--------------+--------+
|{645781fe5f85f503...|B017O9P72A|    1.0|very buggy doesnt...|    1449792000|   false|
|{645781fe5f85f503...|B017O9P72A|    4.0|      so far so good|    1449532800|   false|
|{645781fe5f85f503...|B017O9P72A|    1.0|         time waster|    1449446400|   false|
|{645781fe5f85f503...|B017O9P72A|    2.0|               buggy|    1449273600|   false|
|{645781fe5f85f503...|B017O9P72A|    1.0|     stopped working|    1517529600|   false|
|{645781fe5f85f503...|B017O9P72A|    5.0|               great|    1515974400|   false|
|{645781fe5f85f503...|B017O9P72A|    1.0|        returning to|    1515110400|   false|
|{645781fe5f85f503...|B017O9P72A|    1.0|can not connect t...|    1515024000|   false|
|{645781fe5f85f503...|B017O9P72A|    1.0|co

In [4]:
import nltk
from pyspark.sql.functions import udf
from pyspark.sql.functions import udf, when
from pyspark.sql.types import ArrayType, StringType

# Fetch the NLTK resources this notebook relies on: the stop-word corpus and the
# Punkt tokenizer data (presumably what word_tokenize needs at runtime).
for resource in ("stopwords", "punkt"):
    nltk.download(resource)

# Sentiment vocabularies: the summary preprocessing keeps a token only if it
# appears in one of these two lists.
positive_words = [
    "excellent",
    "awesome",
    "effective",
    "outstanding",
    "amazing",
    "good",
    "great",
    "love",
    "perfect",
    "devotion",
]
negative_words = [
    "terrible",
    "poor",
    "disappointing",
    "awful",
    "horrible",
    "dreadful",
    "unsatisfactory",
    "subpar",
    "lousy",
    "inferior",
]

# English stop words from the NLTK corpus, materialised once as a set so that the
# membership test inside the UDF is cheap.
# The UDF defined next lowercases and tokenizes a summary, removes stop words, and keeps
# only tokens that appear in the positive/negative word lists.
stopwords = set(nltk.corpus.stopwords.words('english'))
def preprocess_summary(summary):
    """Extract the sentiment keywords from a review summary.

    The summary is lowercased and tokenized with NLTK; a token is kept only when
    it is not an English stop word and appears in ``positive_words`` or
    ``negative_words``.

    Args:
        summary: Review summary text. May be ``None`` (a missing/NULL field) or
            an empty string.

    Returns:
        list[str]: The matching tokens in their original order (duplicates are
        preserved). An empty list when ``summary`` is ``None``/empty or when no
        token matches.
    """
    # A NULL column value reaches a Python UDF as None; without this guard
    # `summary.lower()` raises AttributeError and aborts the whole Spark job.
    if not summary:
        return []

    tokens = nltk.tokenize.word_tokenize(summary.lower())
    return [
        token
        for token in tokens
        if token not in stopwords
        and (token in positive_words or token in negative_words)
    ]

# Wrap the Python function as a Spark UDF that returns array<string>
preprocess_summary_udf = udf(preprocess_summary, ArrayType(StringType()))

# Encode "verified" as 1/0.
# The column is cast to boolean instead of being compared with the string "true" so the
# cell stays re-runnable: once the column already holds 0/1, a comparison against "true"
# no longer matches and a second run would reset every row to 0. The boolean cast accepts
# the original true/false values as well as the already-encoded 1/0; NULL still maps to 0.
df = df.withColumn("verified", when(df["verified"].cast("boolean"), 1).otherwise(0))

# Apply the UDF to the "summary" column and create a new column "processed_summary"
df = df.withColumn("processed_summary", preprocess_summary_udf(df['summary']))

# Show the first 10 rows without truncating the cell contents
df.show(10, False)

[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\PC\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\PC\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!


+--------------------------+----------+-------+-----------------------+--------------+--------+-----------------+
|_id                       |asin      |overall|summary                |unixReviewTime|verified|processed_summary|
+--------------------------+----------+-------+-----------------------+--------------+--------+-----------------+
|{645781fe5f85f503f9406617}|B017O9P72A|1.0    |very buggy doesnt work |1449792000    |0       |[]               |
|{645781fe5f85f503f9406618}|B017O9P72A|4.0    |so far so good         |1449532800    |0       |[good]           |
|{645781fe5f85f503f9406619}|B017O9P72A|1.0    |time waster            |1449446400    |0       |[]               |
|{645781fe5f85f503f940661a}|B017O9P72A|2.0    |buggy                  |1449273600    |0       |[]               |
|{645781fe5f85f503f940661b}|B017O9P72A|1.0    |stopped working        |1517529600    |0       |[]               |
|{645781fe5f85f503f940661c}|B017O9P72A|5.0    |great                  |1515974400    |0 

In [5]:
# Same preview as above, this time with Spark's default column truncation
df.show(n=10)

+--------------------+----------+-------+--------------------+--------------+--------+-----------------+
|                 _id|      asin|overall|             summary|unixReviewTime|verified|processed_summary|
+--------------------+----------+-------+--------------------+--------------+--------+-----------------+
|{645781fe5f85f503...|B017O9P72A|    1.0|very buggy doesnt...|    1449792000|       0|               []|
|{645781fe5f85f503...|B017O9P72A|    4.0|      so far so good|    1449532800|       0|           [good]|
|{645781fe5f85f503...|B017O9P72A|    1.0|         time waster|    1449446400|       0|               []|
|{645781fe5f85f503...|B017O9P72A|    2.0|               buggy|    1449273600|       0|               []|
|{645781fe5f85f503...|B017O9P72A|    1.0|     stopped working|    1517529600|       0|               []|
|{645781fe5f85f503...|B017O9P72A|    5.0|               great|    1515974400|       0|          [great]|
|{645781fe5f85f503...|B017O9P72A|    1.0|        return

In [6]:
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.ml.feature import StringIndexer

# Tag every row with a generated bigint id (unique per row; Spark documents these ids as
# increasing but not necessarily consecutive).
df = df.withColumn("id", monotonically_increasing_id())

# Encode each ASIN string as a numeric label index in "asin_index"
indexer = StringIndexer(inputCol="asin", outputCol="asin_index")
indexer_model = indexer.fit(df)
indexed_df = indexer_model.transform(df)

# Printing the DataFrame object shows its schema rather than its rows
print(indexed_df)

DataFrame[_id: struct<oid:string>, asin: string, overall: double, summary: string, unixReviewTime: int, verified: int, processed_summary: array<string>, id: bigint, asin_index: double]


In [7]:
# Preview the indexed rows, including the new "id" and "asin_index" columns
indexed_df.show(n=10)

+--------------------+----------+-------+--------------------+--------------+--------+-----------------+---+----------+
|                 _id|      asin|overall|             summary|unixReviewTime|verified|processed_summary| id|asin_index|
+--------------------+----------+-------+--------------------+--------------+--------+-----------------+---+----------+
|{645781fe5f85f503...|B017O9P72A|    1.0|very buggy doesnt...|    1449792000|       0|               []|  0|  241721.0|
|{645781fe5f85f503...|B017O9P72A|    4.0|      so far so good|    1449532800|       0|           [good]|  1|  241721.0|
|{645781fe5f85f503...|B017O9P72A|    1.0|         time waster|    1449446400|       0|               []|  2|  241721.0|
|{645781fe5f85f503...|B017O9P72A|    2.0|               buggy|    1449273600|       0|               []|  3|  241721.0|
|{645781fe5f85f503...|B017O9P72A|    1.0|     stopped working|    1517529600|       0|               []|  4|  241721.0|
|{645781fe5f85f503...|B017O9P72A|    5.0

In [8]:
# Drop the raw summary text, the Mongo object id and the ASIN string
# (the ASIN is already represented by "asin_index").
columns_to_drop = ["summary", "_id", "asin"]
df = indexed_df.drop(*columns_to_drop)
# Optional persistence step, currently disabled:
# df.write.parquet("data_for_training.parquet")
df.show(n=10)

+-------+--------------+--------+-----------------+---+----------+
|overall|unixReviewTime|verified|processed_summary| id|asin_index|
+-------+--------------+--------+-----------------+---+----------+
|    1.0|    1449792000|       0|               []|  0|  241721.0|
|    4.0|    1449532800|       0|           [good]|  1|  241721.0|
|    1.0|    1449446400|       0|               []|  2|  241721.0|
|    2.0|    1449273600|       0|               []|  3|  241721.0|
|    1.0|    1517529600|       0|               []|  4|  241721.0|
|    5.0|    1515974400|       0|          [great]|  5|  241721.0|
|    1.0|    1515110400|       0|               []|  6|  241721.0|
|    1.0|    1515024000|       0|               []|  7|  241721.0|
|    1.0|    1514592000|       0|               []|  8|  241721.0|
|    1.0|    1514505600|       0|               []|  9|  241721.0|
+-------+--------------+--------+-----------------+---+----------+
only showing top 10 rows



In [9]:
# Draw a seeded ~10% sample of the dataframe, without replacement
sampled_df = df.sample(fraction=0.1, seed=42, withReplacement=False)
sampled_df.show(n=10)


+-------+--------------+--------+-----------------+---+----------+
|overall|unixReviewTime|verified|processed_summary| id|asin_index|
+-------+--------------+--------+-----------------+---+----------+
|    1.0|    1515024000|       0|               []|  7|  241721.0|
|    1.0|    1510272000|       0|               []| 16|  241721.0|
|    1.0|    1505260800|       0|               []| 18|  241721.0|
|    1.0|    1482969600|       0|               []| 50|  241721.0|
|    1.0|    1482710400|       0|               []| 58|  241721.0|
|    4.0|    1481241600|       0|               []| 63|  241721.0|
|    1.0|    1480982400|       0|               []| 66|  241721.0|
|    5.0|    1477353600|       0|               []| 72|  241721.0|
|    1.0|    1476921600|       0|               []| 79|  241721.0|
|    5.0|    1472860800|       0|               []| 88|  241721.0|
+-------+--------------+--------+-----------------+---+----------+
only showing top 10 rows



In [17]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col

# Cast the "id" column to integer (ALS expects user/item ids within the integer range).
# NOTE(review): "id" was generated with monotonically_increasing_id(), i.e. one value per
# row, so every rating belongs to its own "user". With coldStartStrategy="drop" the test
# rows may then all be dropped from the predictions -- confirm whether a real reviewer id
# should be used as userCol instead.
# NOTE(review): the generated ids are 64-bit; values beyond the 32-bit range would not
# survive this cast intact -- verify on multi-partition data.
sampled_df = sampled_df.withColumn("id", col("id").cast("integer"))

# Split the data into training and test sets; seeded (same seed as the sampling step)
# so the split, and therefore the reported RMSE, is repeatable across runs.
(training, test) = sampled_df.randomSplit([0.8, 0.2], seed=42)

# Create the ALS model; the seed fixes the random factor initialisation
als = ALS(
    userCol="id",
    itemCol="asin_index",
    ratingCol="overall",
    coldStartStrategy="drop",
    seed=42,
)

# Fit the model to the training data
model = als.fit(training)

# Make predictions on the test data
predictions = model.transform(test)

# Evaluate the model using RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="overall", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Top-3 item recommendations for every user seen during training
userRecs = model.recommendForAllUsers(3)
# Show the recommendations for the first user
userRecs.filter(col("id") == 0).show()

# Overwrite any previous export: the default save mode raises when the path already
# exists, which made this cell fail on every re-run.
userRecs.write.mode("overwrite").parquet("user_recommendations.parquet")