In [1]:
import random
import findspark
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.ml.recommendation import ALS
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import rank, col, udf
from pyspark.ml.evaluation import RegressionEvaluator

findspark.init()
spark = SparkSession.builder.appName('TestRecommender')\
                    .config('spark.executors.cores', 4)\
                    .getOrCreate()
ratings = spark.read.option('header', 'true').option("inferSchema", "true").csv('ml-latest-small/ratings.csv')

In [2]:
ARHR = 0
hitCount = 0

def createSeed(x):
    return random.randint(1,21)*x

randomSeed=udf(lambda x: createSeed(x), IntegerType())

window = Window.partitionBy(ratings['userId']).orderBy(randomSeed('timestamp'))

onlyBestRatings = ratings.where('rating > 4.5')
leftOutTestDataset = onlyBestRatings.select('*', F.rank().over(window).alias('rank'))\
                                    .where((F.col('rank') <=1) )\
                                    .orderBy('userId').drop('rank').cache()

trainingDataset = ratings.subtract(leftOutTestDataset).cache()

als = ALS(maxIter=20, regParam=0.01, userCol='userId', itemCol='movieId', ratingCol='rating', coldStartStrategy='drop')
model = als.fit(trainingDataset)

leftOutPredictions = model.transform(leftOutTestDataset)

userRecs = model.recommendForAllUsers(30)

# PosExplode is important now!
topNRecommendations = userRecs.select('userId', F.posexplode('recommendations'))\
                              .select('userId',
                                      F.col('col.movieId').alias('movieId'),
                                      F.col('col.rating').alias('prediction'),
                                      (F.col('pos') + 1).alias('rank'))

hitRecommendations = topNRecommendations.alias('a')\
                                        .join(leftOutPredictions.alias('b'),
                                              (F.col('a.userId') == F.col('b.userId')) &
                                              (F.col('a.movieId') == F.col('b.movieId')))\
                                        .select('a.userId', 'a.movieId', 'b.prediction', 'rating', 'rank')\
                                        .withColumn('reciprocalHit', 1/F.col('rank'))


In [3]:
hitRecommendations.show()

+------+-------+----------+------+----+-------------------+
|userId|movieId|prediction|rating|rank|      reciprocalHit|
+------+-------+----------+------+----+-------------------+
|   577|   5618| 5.8065705|   5.0|  14|0.07142857142857142|
|   446|    318|  5.454803|   5.0|  30|0.03333333333333333|
|   514|   1111|  5.470815|   5.0|   6|0.16666666666666666|
|   520|   1221| 5.0117245|   5.0|  30|0.03333333333333333|
+------+-------+----------+------+----+-------------------+

