In [66]:
! python3 -V

Python 3.10.12


In [67]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, explode, col, when

In [68]:
spark = SparkSession.builder.appName('SparkOperations').getOrCreate()

In [69]:
df = spark.read.json('movies.json')
df.show(5)

+-----------+----------+--------------------+--------------------+-----+--------------------+----------+--------------+
|helpfulness|product_id|        profile_name|              review|score|             summary|      time|       user_id|
+-----------+----------+--------------------+--------------------+-----+--------------------+----------+--------------+
|        7/7|B003AI2VGA|Brian E. Erland "...|Synopsis: On the ...|  3.0|"There Is So Much...|1182729600|A141HP4LYPWMSR|
|        4/4|B003AI2VGA|          Grady Harp|THE VIRGIN OF JUA...|  3.0|Worthwhile and Im...|1181952000|A328S9RN3U5M68|
|       8/10|B003AI2VGA|Chrissy K. McVay ...|The scenes in thi...|  5.0|This movie needed...|1164844800|A1I7QGUDP043DG|
|        1/1|B003AI2VGA|        golgotha.gov|THE VIRGIN OF JUA...|  3.0|distantly based o...|1197158400|A1M5405JH9THP9|
|        1/1|B003AI2VGA|KerrLines "&#34;M...|Informationally, ...|  3.0|"What's going on ...|1188345600| ATXL536YX71TR|
+-----------+----------+----------------

In [70]:
df

DataFrame[helpfulness: string, product_id: string, profile_name: string, review: string, score: double, summary: string, time: bigint, user_id: string]

In [71]:
from pyspark.ml.feature import StringIndexer

data = spark.read.json('movies.json')

# Initialize StringIndexer
indexer = StringIndexer(inputCol="product_id", outputCol="indexed_product_id")

# Fit and transform the data
data = indexer.fit(data).transform(data)

# Initialize StringIndexer
indexer = StringIndexer(inputCol="user_id", outputCol="indexed_user_id")

# Fit and transform the data
data = indexer.fit(data).transform(data)


# Prepare data (convert to appropriate types)
# For ALS, you need integer IDs, so ensure that userId and movieId are integers
data = data.withColumn("user_id", data["indexed_user_id"].cast("integer"))
data = data.withColumn("product_id", data["indexed_product_id"].cast("integer"))
data = data.withColumn("score", data["score"].cast("float"))

# Split the data into training and test sets
(training_data, test_data) = data.randomSplit([0.8, 0.2])

training_data.show(3)

+-----------+----------+-------------+--------------------+-----+--------------------+----------+-------+------------------+---------------+
|helpfulness|product_id| profile_name|              review|score|             summary|      time|user_id|indexed_product_id|indexed_user_id|
+-----------+----------+-------------+--------------------+-----+--------------------+----------+-------+------------------+---------------+
|        0/0|         6|      2Foxcee|I was in awe when...|  5.0|  Nostalgic Viewing!|1128988800|  27849|               6.0|        27849.0|
|        0/0|         6|A Buckeye Fan|How much did we l...|  5.0|            Memories|1203724800|   8280|               6.0|         8280.0|
|        0/0|         6|    A. Bilger|The songs I remem...|  5.0|Beat my high expe...|1263772800|  26952|               6.0|        26952.0|
+-----------+----------+-------------+--------------------+-----+--------------------+----------+-------+------------------+---------------+
only showing 

24/08/20 15:45:16 WARN DAGScheduler: Broadcasting large task binary with size 1434.9 KiB


In [72]:
data.select('user_id', 'product_id', 'score').show(50)

+-------+----------+-----+
|user_id|product_id|score|
+-------+----------+-----+
|     32|       731|  3.0|
|      3|       731|  3.0|
|    312|       731|  5.0|
|  10917|       731|  3.0|
|    173|       731|  3.0|
|  28065|       731|  2.0|
|  34353|       731|  1.0|
|  31316|       527|  5.0|
|  27884|       527|  5.0|
|  19575|       527|  5.0|
|  20592|       527|  4.0|
|  31841|       527|  5.0|
|   9472|       527|  5.0|
|  12215|       527|  5.0|
|  11514|       527|  4.0|
|  27333|       527|  5.0|
|  31888|       527|  5.0|
|  29767|       527|  5.0|
|   5346|       527|  5.0|
|   5905|       527|  5.0|
|  23015|       374|  5.0|
|   1208|       374|  5.0|
|  13062|       374|  5.0|
|  18159|       374|  5.0|
|   8881|       374|  5.0|
|  28273|       374|  5.0|
|  32273|       374|  5.0|
|     36|       374|  3.0|
|  10212|       374|  5.0|
|   3409|       374|  4.0|
|  30271|       374|  5.0|
|   7535|       374|  5.0|
|  34752|       374|  5.0|
|  27172|       374|  5.0|
|

24/08/20 15:45:16 WARN DAGScheduler: Broadcasting large task binary with size 1417.9 KiB


In [73]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Initialize ALS model
als = ALS(
    maxIter=10,
    regParam=0.01,
    userCol="user_id",
    itemCol="product_id",
    ratingCol="score",
    coldStartStrategy="drop"
)

# Fit the model on the training data
model = als.fit(training_data)

# Make predictions on the test data
predictions = model.transform(test_data)

print("Predictions on test-data:")
predictions.select('prediction').show(5)

# Evaluate the model using RMSE
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="score",
    predictionCol="prediction"
)
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

# Generate top 10 movie recommendations for each user
user_recs = model.recommendForAllUsers(10)
user_recs.show()

# Generate top 10 user recommendations for each movie
movie_recs = model.recommendForAllItems(10)
movie_recs.show()

# Stop the Spark session
spark.stop()

24/08/20 15:45:16 WARN DAGScheduler: Broadcasting large task binary with size 1445.6 KiB
24/08/20 15:45:17 WARN DAGScheduler: Broadcasting large task binary with size 1448.0 KiB
24/08/20 15:45:17 WARN DAGScheduler: Broadcasting large task binary with size 1449.5 KiB
24/08/20 15:45:17 WARN DAGScheduler: Broadcasting large task binary with size 1450.8 KiB
24/08/20 15:45:17 WARN DAGScheduler: Broadcasting large task binary with size 1449.7 KiB
24/08/20 15:45:17 WARN DAGScheduler: Broadcasting large task binary with size 1451.0 KiB
24/08/20 15:45:17 WARN DAGScheduler: Broadcasting large task binary with size 1451.8 KiB
24/08/20 15:45:17 WARN DAGScheduler: Broadcasting large task binary with size 1454.9 KiB
24/08/20 15:45:17 WARN DAGScheduler: Broadcasting large task binary with size 1456.3 KiB
24/08/20 15:45:17 WARN DAGScheduler: Broadcasting large task binary with size 1457.6 KiB
24/08/20 15:45:18 WARN DAGScheduler: Broadcasting large task binary with size 1459.0 KiB
24/08/20 15:45:18 WAR

Predictions on test-data:
+----------+
|prediction|
+----------+
| 0.7946394|
| 2.1851506|
|  2.264963|
|0.18443355|
|0.37264916|
+----------+
only showing top 5 rows



24/08/20 15:45:20 WARN DAGScheduler: Broadcasting large task binary with size 1433.1 KiB
24/08/20 15:45:21 WARN DAGScheduler: Broadcasting large task binary with size 1489.8 KiB
24/08/20 15:45:21 WARN DAGScheduler: Broadcasting large task binary with size 1488.4 KiB
24/08/20 15:45:21 WARN DAGScheduler: Broadcasting large task binary with size 1535.1 KiB
24/08/20 15:45:21 WARN DAGScheduler: Broadcasting large task binary with size 1536.9 KiB


Root-mean-square error = 3.705745542003063


24/08/20 15:45:23 WARN DAGScheduler: Broadcasting large task binary with size 1530.5 KiB
24/08/20 15:45:23 WARN DAGScheduler: Broadcasting large task binary with size 1536.9 KiB


+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|     31|[{295, 24.73688},...|
|     53|[{280, 34.535423}...|
|     65|[{524, 30.038858}...|
|     85|[{717, 29.74633},...|
|    133|[{609, 26.783337}...|
|    137|[{497, 18.771286}...|
|    148|[{409, 38.030365}...|
|    243|[{557, 21.46371},...|
|    251|[{524, 21.065498}...|
|    255|[{609, 40.45455},...|
|    296|[{485, 32.664505}...|
|    392|[{487, 21.62275},...|
|    451|[{484, 18.76271},...|
|    458|[{777, 12.584575}...|
|    463|[{540, 18.650095}...|
|    471|[{482, 20.19489},...|
|    472|[{540, 20.023144}...|
|    481|[{661, 22.829561}...|
|    496|[{280, 18.78143},...|
|    516|[{924, 20.769314}...|
+-------+--------------------+
only showing top 20 rows





+----------+--------------------+
|product_id|     recommendations|
+----------+--------------------+
|         1|[{114, 7.04719}, ...|
|         3|[{164, 8.059383},...|
|         6|[{53, 7.3535886},...|
|        12|[{274, 9.24094}, ...|
|        13|[{152, 7.7875967}...|
|        16|[{142, 9.762384},...|
|        20|[{255, 8.019079},...|
|        22|[{276, 8.496781},...|
|        26|[{244, 14.589832}...|
|        27|[{158, 8.411977},...|
|        28|[{291, 9.180564},...|
|        31|[{255, 10.677561}...|
|        34|[{255, 11.425577}...|
|        40|[{123, 14.355308}...|
|        44|[{357, 12.652631}...|
|        47|[{53, 17.900494},...|
|        52|[{276, 12.373009}...|
|        53|[{189, 12.72678},...|
|        54|[{214, 15.111511}...|
|        57|[{244, 18.820381}...|
+----------+--------------------+
only showing top 20 rows



24/08/20 15:45:24 WARN DAGScheduler: Broadcasting large task binary with size 1530.0 KiB
                                                                                