In [1]:
import re
import csv
import time
import gc
import math
import numpy as np

import findspark
# Find Spark Locally
location = findspark.find()
findspark.init(location, edit_rc=True)

import pyspark as ps    # for the pyspark suite
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import IntegerType, StringType, FloatType, DateType, TimestampType
import pyspark.sql.functions as F

spark = ps.sql.SparkSession.builder \
            .master("local[4]") \
            .appName("anime recommender") \
            .getOrCreate()

sc = spark.sparkContext

from pyspark.ml.recommendation import ALS
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, lower
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

## Load data
---

In [2]:
anime_filename = '../data/anime.csv'
ratings_filename = '../data/rating.csv'

In [3]:
# anime_raw = sc.textFile(anime_filename)
# ratings_raw = sc.textFile(ratings_filename)
anime_raw = spark.read.load(anime_filename,format='csv',header=True,inferSchema=True)
ratings_raw = spark.read.load(ratings_filename,format='csv',header=True,inferSchema=True)

In [4]:
anime_df = anime_raw.select(['anime_id','name'])

In [32]:
ratings_df = ratings_raw.select(['user_id','anime_id','rating'])

In [34]:
ratings_df = ratings_df.withColumn("rating", ratings_df["rating"].cast(FloatType()).alias("rating"))

In [35]:
ratings_df

DataFrame[user_id: int, anime_id: int, rating: float]

In [None]:
# ratings_df = ratings_df.withColumnRenamed('user_id','user')

In [59]:
# Define ALS model
model = ALS(
    userCol = 'user_id',
    itemCol = 'anime_id',
    ratingCol = 'rating',
    coldStartStrategy = 'drop'
)

In [60]:
# Split data
train,val,test = ratings_df.randomSplit((0.6,0.2,0.2))

### Running Methods / ALS Fitting

#### 1) tune model

In [8]:
maxIter = 10
ranks = np.arange(1, 11, 1).tolist()
regParams = np.arange(.1,1.1,0.1).tolist()

In [None]:
model

In [61]:
best_model = tune_ALS(model,train,val,10,regParams,ranks)

1 latent factors and regularization = 0.1: validation RMSE is 2.2011464978950492
1 latent factors and regularization = 0.2: validation RMSE is 2.202827835923233
1 latent factors and regularization = 0.30000000000000004: validation RMSE is 2.2096377309066715
1 latent factors and regularization = 0.4: validation RMSE is 2.2234510200660518
1 latent factors and regularization = 0.5: validation RMSE is 2.2445293865526303
1 latent factors and regularization = 0.6: validation RMSE is 2.271520275949202
1 latent factors and regularization = 0.7000000000000001: validation RMSE is 2.3025550723490604
1 latent factors and regularization = 0.8: validation RMSE is 2.3363427072759335
1 latent factors and regularization = 0.9: validation RMSE is 2.372644176592594
1 latent factors and regularization = 1.0: validation RMSE is 2.411826121534815
2 latent factors and regularization = 0.1: validation RMSE is 2.1367807788500066
2 latent factors and regularization = 0.2: validation RMSE is 2.1379416558518836
2

10 latent factors and regularization = 0.9: validation RMSE is 2.371091980684274
10 latent factors and regularization = 1.0: validation RMSE is 2.4113934214006436

The best model has 10 latent factors and regularization = 0.2


In [62]:
predictions = best_model.transform(test)

In [64]:
predictions = predictions.na.drop()
evaluator = RegressionEvaluator(metricName="rmse",
                                    labelCol="rating",
                                    predictionCol="prediction")

rmse = evaluator.evaluate(predictions)
print('The out-of-sample RMSE of the best tuned model is:', rmse)

The out-of-sample RMSE of the best tuned model is: 2.0566319906497097


In [71]:
del train, val, test, predictions, evaluator
gc.collect()


4877

In [72]:
best_model

ALSModel: uid=ALS_aeaa57d5214b, rank=10

In [38]:
als=ALS(userCol='user_id',itemCol='anime_id',rank=1,maxIter=10,regParam=.1)

In [39]:
model = als.fit(train)

In [18]:
model

ALSModel: uid=ALS_f9f1e73d6b49, rank=1

In [40]:
predictions = model.transform(val)

In [41]:
predictions

DataFrame[user_id: int, anime_id: int, rating: float, prediction: float]

In [42]:
predictions.take(2)

[Row(user_id=28932, anime_id=148, rating=7.0, prediction=6.485733509063721),
 Row(user_id=7266, anime_id=148, rating=10.0, prediction=6.97218132019043)]

In [None]:
# Check if there are NANs in predictions (debugging why my rmse is returning NaN)

In [48]:
predictions.select([F.count(F.when(F.isnan(c), c)).alias(c) for c in predictions.columns]).show()

+-------+--------+------+----------+
|user_id|anime_id|rating|prediction|
+-------+--------+------+----------+
|      0|       0|     0|      1393|
+-------+--------+------+----------+



In [53]:
predictions = predictions.na.drop()

In [54]:
predictions.select([F.count(F.when(F.isnan(c), c)).alias(c) for c in predictions.columns]).show()

+-------+--------+------+----------+
|user_id|anime_id|rating|prediction|
+-------+--------+------+----------+
|      0|       0|     0|         0|
+-------+--------+------+----------+



In [55]:
evaluator = RegressionEvaluator(metricName="rmse",labelCol="rating",predictionCol="prediction")

In [56]:
evaluator.evaluate(predictions)

2.200517412304992

In [11]:
def test_run_tuning(model,training_data,validation_data,maxIter,regParam,rank):
    min_error = math.inf
    best_rank = -1
    best_regularization = 0
    best_model = None
    
    als=ALS(userCol='user_id',itemCol='anime_id',rank=rank,maxIter=maxIter,regParam=regParam)
    model = als.fit(training_data)
    predictions = model.transform(validation_data)
    evaluator = RegressionEvaluator(metricName="rmse",labelCol="rating",predictionCol="prediction")
    rmse = evaluator.evaluate(predictions)
    print(rmse)
    if rmse < min_error:
        min_error = rmse
        best_rank = rank
        best_regularization = reg
        best_model = model
        
    return best_model

#### Methods

In [57]:
def tune_ALS(model,training_data, validation_data, maxIter, regParams, ranks):
    
    min_error = math.inf
    best_rank = -1
    best_regularization = 0
    best_model = None
    
    for rank in ranks:
        for reg in regParams:
            # get ALS model
            als = ALS(userCol='user_id',itemCol='anime_id',rank=rank,maxIter=maxIter,regParam=reg)
            # train ALS model
            model = als.fit(training_data)
            # evaluate the model by computing the RMSE on the validation data
            predictions = model.transform(validation_data)
            # drop na in predictions
            predictions = predictions.na.drop()
            
            evaluator = RegressionEvaluator(metricName="rmse",
                                            labelCol="rating",
                                            predictionCol="prediction")
            rmse = evaluator.evaluate(predictions)
            print('{} latent factors and regularization = {}: '
                  'validation RMSE is {}'.format(rank, reg, rmse))
            if rmse < min_error:
                min_error = rmse
                best_rank = rank
                best_regularization = reg
                best_model = model
    print('\nThe best model has {} latent factors and '
          'regularization = {}'.format(best_rank, best_regularization))
    return best_model    

In [None]:
def tune_model(maxIter,regParams,ranks,split_ratio=(0.6,0.2,0.2)):
    train, val, test = ratings_df.randomSplit(split_ratio)
    # tune model to get best model for predictions
    tuned_model = tune_ALS(model, train, val, maxIter, regParams, ranks)
    
    # test model
    predictions = tuned_model.transform(test)
    evaluator = RegressionEvaluator(metricName="rmse",
                                    labelCol="rating",
                                    predictionCol="prediction")
    rmse = evaluator.evaluate(predictions)
    print('The out-of-sample RMSE of the best tuned model is:', rmse)
    # clean up
    del train, val, test, predictions, evaluator
    gc.collect()

In [66]:
# Regex matching closest name to animes
def regex_matching(fav_anime):
    print('You have input anime:', fav_anime)
    matches_df = anime_df \
        .filter(
            lower(
                col('name')
            ).like('%{}%'.format(fav_anime.lower()))
        ) \
        .select('anime_id', 'name')
    if not len(matches_df.take(1)):
        print('Oops! No match is found')
    else:
        anime_ids = matches_df.rdd.map(lambda r: r[0]).collect()
        names = matches_df.rdd.map(lambda r: r[1]).collect()
        print('Found possible matches in our database: '
              '{0}\n'.format([x for x in names]))
        return anime_ids

In [67]:
# Append a user's anime ratings to ratings_df
def append_ratings(user_id,anime_ids):
     # create new user rdd
    user_rdd = self.sc.parallelize(
        [(user_id, anime_id, 5.0) for anime_id in anime_ids])
    # transform to user rows
    user_rows = user_rdd.map(
        lambda x: Row(
            user_id=int(x[0]),
            anime_id=int(x[1]),
            rating=float(x[2])
        )
    )
    # transform rows to spark DF
    user_df = spark.createDataFrame(user_rows) \
        .select(ratings_df.columns)
    # append to ratingsDF
    ratings_df = ratings_df.union(user_df)

In [68]:
def create_inference_data(user_id, anime_ids):
    """
    input:
        user_id: int
        anime_ids: list
        
    return:
        inference_df: dataframe
    """
    
    other_anime_ids = anime_df \
        .filter(~col('anime_id').isin(anime_ids)) \
        .select(['anime_id']) \
        .rdd.map(lambda r: r[0]) \
        .collect()
    
    # create inference rdd
    inference_rdd = sc.parallelize(
        [(user_id, anime_id) for anime_id in other_anime_ids]
    ).map(
        lambda x: Row(
            user_id=int(x[0]),
            anime_id=int(x[1]),
        )
    )
    # transform to inference DF
    inference_df = spark.createDataFrame(inference_rdd) \
        .select(['user_id', 'anime_id'])
    
    return inference_df

In [75]:
def make_inference(model,fav_anime,n_recommendations):
    # create a userId
    user_id = ratings_df.agg({"user_id": "max"}).collect()[0][0] + 1
    # get movieIds of favorite movies
    anime_ids = regex_matching(fav_anime)
    # append new user with his/her ratings into data
    append_ratings(user_id, anime_ids)
    # matrix factorization
    model = model.fit(ratings_df)
    # get data for inferencing
    inference_df = create_inference_data(user_id, anime_ids)
    # make inference
    return model.transform(inference_df) \
        .select(['anime_id', 'prediction']) \
        .orderBy('prediction', ascending=False) \
        .rdd.map(lambda r: (r[0], r[1])) \
        .take(n_recommendations)

In [73]:
def make_recommendations(fav_anime,n_recommendations):
    print('Recommendation system start to make inference ...')
    t0 = time.time()
    raw_recommends = \
        make_inference(best_model, fav_anime, n_recommendations)
    anime_ids = [r[0] for r in raw_recommends]
    scores = [r[1] for r in raw_recommends]
    print('It took my system {:.2f}s to make inference \n\
          '.format(time.time() - t0))
    # get movie titles
    anime_titles = anime_df \
        .filter(col('anime_id').isin(anime_ids)) \
        .select('name') \
        .rdd.map(lambda r: r[0]) \
        .collect()
    # print recommendations
    print('Recommendations for {}:'.format(fav_anime))
    for i in range(len(anime_titles)):
        print('{0}: {1}, with rating '
              'of {2}'.format(i+1, anime_titles[i], scores[i]))

In [76]:
make_recommendations('naruto',10)

Recommendation system start to make inference ...
You have input anime: naruto


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8840.0 failed 1 times, most recent failure: Lost task 0.0 in stage 8840.0 (TID 74372, c02wj19whtd5.local, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 477, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 3.7 than that in driver 3.9, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2139)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2164)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:168)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 477, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 3.7 than that in driver 3.9, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2139)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


---

In [None]:
new_ratings = ratings_raw.filter(lambda line: line != header) \
            .map(lambda line: line.split(",")) \
            .map(lambda tokens: (int(tokens[0]), int(tokens[1]), float(tokens[2])))

In [None]:
anime_id, name, genre, type, episodes, rating, members = [ '{}'.format(x) for x in list(csv.reader([input_string], delimiter=',', quotechar='"'))[0] ]

In [None]:
header = ratings_RDD.take(1)[0]
        return ratings_RDD \
            .filter(lambda line: line != header) \
            .map(lambda line: line.split(",")) \
            .map(lambda tokens: (int(tokens[0]), int(tokens[1]), float(tokens[2])))

In [None]:
def clean_anime_data(input_string):
    anime_id, name, genre, type, episodes, rating, members = [ '{}'.format(x) for x in list(csv.reader([input_string], delimiter=',', quotechar='"'))[0] ]
    anime_id = int(anime_id)
    episodes = int(episodes)
    rating = float(rating)
    members = int(members)
    return [(anime_id, name, type,rating,members, token) for token in genre.split(',')]

In [None]:
anime_clean = anime_raw.flatMap(clean_anime_data)

In [None]:
print(anime_clean.take(10))

In [None]:
anime_schema = StructType( [
    StructField('anime_id',IntegerType(),True),
    StructField('name',StringType(),True),
    StructField('type',StringType(),True),
    StructField('rating',FloatType(),True),
    StructField('members',IntegerType(),True),
    StructField('genre',StringType(),True) ] )

anime = spark.createDataFrame(anime_clean, anime_schema)

In [None]:
anime

In [None]:
# pivot movie genres
anime = anime.groupBy("anime_id", "name", "type","rating","members")\
               .pivot("genre")\
               .agg(F.count(F.col('genre')))\
               .na.fill(0)

anime.show(5)
anime.printSchema()