In [1]:
# !hdfs dfs -put data/sample_movie_ratings.txt /sample_movie_ratings.txt

In [2]:
# Code from https://spark.apache.org/docs/2.2.0/ml-collaborative-filtering.html
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

lines = spark.read.option("header", "true").csv("data/interactions_train.csv").rdd
ratingsRDD = lines.map(lambda p: Row(userId=int(p[0]), recipeId=int(p[1]),
                                     rating=float(p[3])))
ratings = spark.createDataFrame(ratingsRDD)

In [3]:
(unnorm_training, unnorm_test) = ratings.randomSplit([0.8, 0.2])
mean = unnorm_training.agg({'rating': 'mean'}).collect()[0][0]
std = unnorm_training.agg({'rating': 'std'}).collect()[0][0]
print(mean, std)
training = unnorm_training.withColumn("rating",(col("rating") - mean) / std)
test = unnorm_test.withColumn("rating",(col("rating") - mean) / std)

4.574791656605009 0.958166372324364


In [4]:
best_model = None
best_rmse = 100
for alpha in [0.01, 0.05, 0.1, 0.5, 1, 2]:
    for reg in [0.05, 0.1]:
        # Build the recommendation model using ALS on the training data
        # Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
        als = ALS(rank=200, alpha=alpha, maxIter=10, regParam=reg, userCol="userId", itemCol="recipeId", ratingCol="rating",
                  coldStartStrategy="drop")
        model = als.fit(training)

        # Evaluate the model by computing the RMSE on the test data
        normalized_predictions = model.transform(test)
        predictions = normalized_predictions.withColumn(
            "rating",col("rating") * std + mean
        ).withColumn(
            "prediction",col("prediction") * std + mean
        )
        evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                        predictionCol="prediction")

        rmse = evaluator.evaluate(predictions)
        print(alpha, reg, rmse)
        if rmse < best_rmse:
            #print(f"Best model -> rank:{rank}, reg:{reg}, rmse:{rmse}")
            predictions.show()
            best_model = model
            best_rmse = rmse

        # Generate top 10 movie recommendations for each user
        #userRecs = model.recommendForAllUsers(10)
        # Generate top 10 user recommendations for each movie
        #recipeRecs = model.recommendForAllItems(10)

1 0.01 1.48371886416645
+------+--------+------+-------------------+
|userId|recipeId|rating|         prediction|
+------+--------+------+-------------------+
|  3794|    7508|   4.0| 10.241341926494322|
|  3794|   32045|   5.0|  4.067638789116292|
|  6357|     229|   4.0| 3.9965843885055836|
|  6357|    3722|   4.0|   4.42073889735041|
|  6357|    4174|   4.0| 4.7177544553732735|
|  6357|    5287|   5.0|  4.733217532082325|
|  6357|    8961|   0.0|-0.8306088962271225|
|  6357|    9233|   5.0|  6.697984928999947|
|  6357|   10392|   4.0|  4.731662295080575|
|  6357|   10455|   5.0| 5.7782395226640695|
|  6357|   11291|   4.0|    4.6651057272019|
|  6357|   11455|   5.0|  5.382089574138453|
|  6357|   11580|   5.0| 3.4035774470043627|
|  6357|   11642|   5.0|  4.670250358169597|
|  6357|   11644|   5.0|  4.497805618877775|
|  6357|   11749|   5.0|   4.52909787271883|
|  6357|   13366|   2.0| 3.7269355019082004|
|  6357|   13868|   4.0|  4.849276431205299|
|  6357|   14248|   5.0|  3.305

Py4JJavaError: An error occurred while calling o3444.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4351.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4351.0 (TID 8955) (aidans-air.lan executor driver): java.lang.OutOfMemoryError: GC overhead limit exceeded
	at org.apache.spark.ml.recommendation.ALS$CholeskySolver.solve(ALS.scala:792)
	at org.apache.spark.ml.recommendation.ALS$.$anonfun$computeFactors$4(ALS.scala:1766)
	at org.apache.spark.ml.recommendation.ALS$$$Lambda$3818/281597203.apply(Unknown Source)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$mapValues$3(PairRDDFunctions.scala:751)
	at org.apache.spark.rdd.PairRDDFunctions$$Lambda$3640/2119548997.apply(Unknown Source)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:156)
	at org.apache.spark.rdd.CoGroupedRDD.$anonfun$compute$4(CoGroupedRDD.scala:155)
	at org.apache.spark.rdd.CoGroupedRDD.$anonfun$compute$4$adapted(CoGroupedRDD.scala:154)
	at org.apache.spark.rdd.CoGroupedRDD$$Lambda$3766/635422166.apply(Unknown Source)
	at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985)
	at scala.collection.TraversableLike$WithFilter$$Lambda$133/7967307.apply(Unknown Source)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:984)
	at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:154)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1253)
	at org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:1075)
	at org.apache.spark.ml.recommendation.ALS.$anonfun$fit$1(ALS.scala:722)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.recommendation.ALS.fit(ALS.scala:704)
	at org.apache.spark.ml.recommendation.ALS.fit(ALS.scala:606)
	at sun.reflect.GeneratedMethodAccessor223.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
	at org.apache.spark.ml.recommendation.ALS$CholeskySolver.solve(ALS.scala:792)
	at org.apache.spark.ml.recommendation.ALS$.$anonfun$computeFactors$4(ALS.scala:1766)
	at org.apache.spark.ml.recommendation.ALS$$$Lambda$3818/281597203.apply(Unknown Source)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$mapValues$3(PairRDDFunctions.scala:751)
	at org.apache.spark.rdd.PairRDDFunctions$$Lambda$3640/2119548997.apply(Unknown Source)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:156)
	at org.apache.spark.rdd.CoGroupedRDD.$anonfun$compute$4(CoGroupedRDD.scala:155)
	at org.apache.spark.rdd.CoGroupedRDD.$anonfun$compute$4$adapted(CoGroupedRDD.scala:154)
	at org.apache.spark.rdd.CoGroupedRDD$$Lambda$3766/635422166.apply(Unknown Source)
	at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985)
	at scala.collection.TraversableLike$WithFilter$$Lambda$133/7967307.apply(Unknown Source)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:984)
	at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:154)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)


In [None]:
best_model.rank

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 53607)
Traceback (most recent call last):
  File "/Users/aidankeogh/opt/anaconda3/lib/python3.8/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/Users/aidankeogh/opt/anaconda3/lib/python3.8/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/Users/aidankeogh/opt/anaconda3/lib/python3.8/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/Users/aidankeogh/opt/anaconda3/lib/python3.8/socketserver.py", line 720, in __init__
    self.handle()
  File "/Users/aidankeogh/opt/anaconda3/lib/python3.8/site-packages/pyspark/accumulators.py", line 262, in handle
    poll(accum_updates)
  File "/Users/aidankeogh/opt/anaconda3/lib/python3.8/site-packages/pyspark/accumulators.py", line 235, in poll
    if func():
  Fi