Load datasets
-----------------

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, IntegerType
from pyspark.sql.functions import Column, col, count, mean, udf, UserDefinedFunction
import re

genresList = ["Crime", "Romance", "Thriller", "Adventure", "Drama", "War", "Documentary", "Fantasy", "Mystery", \
                  "Musical", "Animation", "Film-Noir", "(no genres listed)", "IMAX", "Horror", "Western", \
                  "Comedy", "Children", "Action", "Sci-Fi"]

spark = SparkSession.builder.appName("Recommendation ALS").config("spark.executor.memory", "3g")\
    .config("spark.driver.cores", "4").getOrCreate()

# do something to prove it works
movies_df = spark.read.option("header", "true").csv("data/movies.csv", inferSchema=True)
links_df = spark.read.option("header", "true").csv("data/links.csv", inferSchema=True).cache()
movies_df = movies_df.join(links_df, on = ['movieId']).cache()
ratings_df = spark.read.option("header", "true").csv("data/ratings.csv", inferSchema=True).cache()
tags_df = spark.read.option("header", "true").csv("data/tags.csv", inferSchema=True).cache()

def setGenresMatrix(genres):
    movieGenresMatrix = []
    movieGenresList = genres.split('|')
    for x in genresList:
        if (x in movieGenresList):
            movieGenresMatrix.append(1)
        else:
            movieGenresMatrix.append(0) 
    return movieGenresMatrix

udf_parse_genres = UserDefinedFunction(lambda str: setGenresMatrix(str), ArrayType(IntegerType()))


movies_df = movies_df.withColumn("genresMatrix", udf_parse_genres(col("genres")))
ratings_df = ratings_df.groupBy("movieId").agg(mean("rating").alias("mean_rating"), count("rating")\
                                               .alias("count_rating"))
movies_df = movies_df.join(ratings_df, on = "movieId").select("movieId", "title", "genresMatrix", \
                                                              "mean_rating", "count_rating")

def getYear(title):
    result = re.search(r'\(\d{4}\)', title)
    if result:
        found = result.group(0).strip('(').strip(')')
    else: 
        found = 0
    return int(found)

udf_parse_year = udf(lambda str: getYear(str), IntegerType())
movies_df = movies_df.withColumn("year", udf_parse_year(col("title")))

from pyspark.sql.functions import regexp_replace
movies_df = movies_df.withColumn("title", regexp_replace("title", "\(\d{4}\)", ""))

In [2]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.functions import concat, collect_set, udf, when
from pyspark.sql.types import ArrayType, StringType
from functools import reduce

# Tokenize text
tokenizer = Tokenizer(inputCol='tag', outputCol='tags_token')
df_words_token = tokenizer.transform(tags_df).select('movieId', 'tags_token')

# Remove stop words
remover = StopWordsRemover(inputCol='tags_token', outputCol='tags_clean')
df_words_no_stopw = remover.transform(df_words_token).select("movieId", "tags_clean")

def fudf(val):
    return reduce (lambda x, y:x+y, val)

flattenUdf = udf(fudf, ArrayType(StringType()))

df_words_no_stopw = df_words_no_stopw.groupBy("movieId").agg(collect_set("tags_clean")).select("movieId", \
                        flattenUdf("collect_set(tags_clean)").alias("tags_clean"))

def null1_as_empty_list(x):
    return when(col(x) != None, col(x)).otherwise([])

fillNullUdf = udf(null1_as_empty_list, ArrayType(StringType()))

movies_df = movies_df.join(df_words_no_stopw, on="movieId", how="left").cache()
movies_df.show()

+-------+--------------------+--------------------+------------------+------------+----+--------------------+
|movieId|               title|        genresMatrix|       mean_rating|count_rating|year|          tags_clean|
+-------+--------------------+--------------------+------------------+------------+----+--------------------+
|      1|          Toy Story |[0, 0, 0, 1, 0, 0...|3.9209302325581397|         215|1995|        [fun, pixar]|
|      2|            Jumanji |[0, 0, 0, 1, 0, 0...|3.4318181818181817|         110|1995|[magic, board, ga...|
|      3|   Grumpier Old Men |[0, 1, 0, 0, 0, 0...|3.2596153846153846|          52|1995|        [moldy, old]|
|      4|  Waiting to Exhale |[0, 1, 0, 0, 1, 0...| 2.357142857142857|           7|1995|                null|
|      5|Father of the Bri...|[0, 0, 0, 0, 0, 0...|3.0714285714285716|          49|1995| [pregnancy, remake]|
|      6|               Heat |[1, 0, 1, 0, 0, 0...| 3.946078431372549|         102|1995|                null|
|      7| 

Compute the item feature vector
------

In [3]:
genresSimilarityWeight = 0.8
tagsSimilarityWeight = 2
tagsSimilarityWeight = 2
yearDistanceWeight = 0.1
ratingAvgWeight = 0.2

Check similarity of movies
------

Step 4. Compute the cosine similarities and predict item ratings
--------

In [4]:
movieId = 9

basisGenres = movies_df.filter(movies_df['movieId'] == movieId).select("genresMatrix").collect()[0][0]
basisYear = movies_df.filter(movies_df['movieId'] == movieId).select('year').collect()[0][0]
basisRatingAvg = movies_df.filter(movies_df['movieId'] == movieId).select('mean_rating').collect()[0][0]

from scipy.spatial.distance import cosine
from pyspark.sql.functions import col, udf, abs
from pyspark.sql.types import DoubleType

def consineFunc(genresVal):
    return float(cosine(basisGenres, genresVal))

consineUdf = udf(consineFunc, DoubleType())

movies_df.show()

from pyspark.ml.feature import Word2Vec
w2v = Word2Vec(vectorSize=100, minCount=1, inputCol='tags_clean', outputCol = 'vector')
model = w2v.fit(movies_df)
model.show()

moviesWithSim = movies_df.withColumn("similarity", consineUdf("genresMatrix") * genresSimilarityWeight + \
                                    abs(basisRatingAvg - col("mean_rating")) * ratingAvgWeight + \
                                    abs(basisYear - col("year")) / 100 * yearDistanceWeight + \
)

moviesWithSim.sort("similarity", ascending = False).select("movieId", "title", "similarity").show(10)

+-------+--------------------+--------------------+------------------+------------+----+--------------------+
|movieId|               title|        genresMatrix|       mean_rating|count_rating|year|          tags_clean|
+-------+--------------------+--------------------+------------------+------------+----+--------------------+
|      1|          Toy Story |[0, 0, 0, 1, 0, 0...|3.9209302325581397|         215|1995|        [fun, pixar]|
|      2|            Jumanji |[0, 0, 0, 1, 0, 0...|3.4318181818181817|         110|1995|[magic, board, ga...|
|      3|   Grumpier Old Men |[0, 1, 0, 0, 0, 0...|3.2596153846153846|          52|1995|        [moldy, old]|
|      4|  Waiting to Exhale |[0, 1, 0, 0, 1, 0...| 2.357142857142857|           7|1995|                null|
|      5|Father of the Bri...|[0, 0, 0, 0, 0, 0...|3.0714285714285716|          49|1995| [pregnancy, remake]|
|      6|               Heat |[1, 0, 1, 0, 0, 0...| 3.946078431372549|         102|1995|                null|
|      7| 

Py4JJavaError: An error occurred while calling o239.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 18.0 failed 1 times, most recent failure: Lost task 0.0 in stage 18.0 (TID 416, localhost, executor driver): java.lang.NullPointerException
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:191)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.mllib.feature.Word2Vec.learnVocab(Word2Vec.scala:196)
	at org.apache.spark.mllib.feature.Word2Vec.fit(Word2Vec.scala:309)
	at org.apache.spark.ml.feature.Word2Vec.fit(Word2Vec.scala:186)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:191)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


References
--------------

* [Content Based Recommender System in Python](https://medium.com/@tomar.ankur287/content-based-recommender-system-in-python-2e8e94b16b9e)

* [Data Science Series: Content-based Recommender System using Azure Databricks](https://visualbi.com/blogs/business-intelligence/data-science/data-science-series-content-based-recommender-system-using-azure-databricks/)

* [Movie Recommendation Algorithm](https://www.kaggle.com/bakostamas/movie-recommendation-algorithm)