In [1]:
from pyspark.sql import SparkSession

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

from MovieLens import MovieLens

In [2]:
if __name__ == "__main__":
    # Train an ALS recommender on the MovieLens "latest-small" ratings, report
    # RMSE on a held-out 20% split, then print the top-10 titles for user 85.

    # NOTE(review): machine-specific absolute path - anyone else running this
    # notebook has to point it at their own copy of ml-latest-small/ratings.csv.
    ratings_path = "/Users/nick.aristidou@convexin.com/Documents/Projects/Python/py.rec.sys/ml-latest-small/ratings.csv"

    spark = SparkSession\
        .builder\
        .appName("ALSExample")\
        .getOrCreate()

    try:
        # Parse the CSV straight into typed columns inside Spark rather than
        # reading strings and converting each row in a Python lambda.
        # Columns are applied by position (userId, movieId, rating, timestamp).
        # FAILFAST keeps the "bad row -> error" behaviour of the int()/float()
        # conversions this replaces instead of silently producing nulls.
        ratings = (
            spark.read
            .option("header", "true")
            .option("mode", "FAILFAST")
            .schema("userId BIGINT, movieId BIGINT, rating DOUBLE, timestamp BIGINT")
            .csv(ratings_path)
        )

        # Pin the seeds so the train/test split and the ALS initialisation are
        # repeatable and the RMSE can be compared between runs.
        (training, test) = ratings.randomSplit([0.8, 0.2], seed=42)

        # coldStartStrategy="drop" removes rows whose prediction is NaN (users
        # or movies unseen in training) so they do not turn the RMSE into NaN.
        als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
                  coldStartStrategy="drop", seed=42)
        model = als.fit(training)

        predictions = model.transform(test)
        evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                        predictionCol="prediction")
        rmse = evaluator.evaluate(predictions)
        print("Root-mean-square error = " + str(rmse))

        userRecs = model.recommendForAllUsers(10)

        # collect() pulls user 85's recommendations onto the driver, so they
        # stay usable after the session is stopped below.
        user85Recs = userRecs.filter(userRecs['userId'] == 85).collect()
    finally:
        # Always release the Spark session, even if a step above raised.
        spark.stop()

    ml = MovieLens()
    ml.loadMovieLensLatestSmall()

    for row in user85Recs:
        for rec in row.recommendations:
            print(ml.getMovieName(rec.movieId))

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/11/09 14:41:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/11/09 14:41:33 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
21/11/09 14:41:33 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
21/11/09 14:41:33 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


Root-mean-square error = 1.1037331264504389




American Sniper (2014)
Hachiko: A Dog's Story (a.k.a. Hachi: A Dog's Tale) (2009)
Zero Dark Thirty (2012)
Fistful of Dollars, A (Per un pugno di dollari) (1964)
Cabin Boy (1994)
Event Horizon (1997)
Way of the Gun, The (2000)
Interiors (1978)
Treasure Planet (2002)
Last Temptation of Christ, The (1988)


## Using the MovieLens 20M dataset

In [3]:
from pyspark.sql import SparkSession

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

import csv

In [4]:
import multiprocessing

# Show how many CPUs this machine reports (bare last expression, so the
# notebook displays the value as the cell output).
multiprocessing.cpu_count()

12

In [None]:
def loadMovieNames(moviesPath="../py.rec.sys/ml-20m/movies.csv"):
    """Build a lookup from movie ID to movie title.

    Args:
        moviesPath: CSV file with one header row, followed by rows whose first
            column is an integer movie ID and whose second column is the
            title. Defaults to the ml-20m movies file (previously this read
            ratings.csv, whose first two columns are user ID and movie ID, so
            the lookup never contained titles).

    Returns:
        dict mapping int movie ID -> title string. Empty if the file has no
        data rows.

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if a row's first column is not an integer.
    """
    movieID_to_name = {}
    with open(moviesPath, newline='', encoding='ISO-8859-1') as csvfile:
        movieReader = csv.reader(csvfile)
        next(movieReader, None)  # Skip header line; tolerate an empty file
        for row in movieReader:
            if not row:
                # csv.reader yields [] for blank lines; nothing to record.
                continue
            movieID = int(row[0])
            movieName = row[1]
            movieID_to_name[movieID] = movieName
    return movieID_to_name

if __name__ == "__main__":
    # Train an ALS recommender on the MovieLens 20M ratings, report RMSE on a
    # held-out 20% split, then print the top-10 titles for user 85.
    ratings_path = "../py.rec.sys/ml-20m/ratings.csv"

    spark = SparkSession\
        .builder\
        .appName("ALSExample")\
        .config("spark.executor.cores", '4')\
        .getOrCreate()

    try:
        # Parse the CSV straight into typed columns inside Spark rather than
        # reading strings and converting every row in a Python lambda, which
        # is costly at this dataset size. Columns are applied by position
        # (userId, movieId, rating, timestamp). FAILFAST keeps the
        # "bad row -> error" behaviour of the int()/float() conversions this
        # replaces instead of silently producing nulls.
        ratings = (
            spark.read
            .option("header", "true")
            .option("mode", "FAILFAST")
            .schema("userId BIGINT, movieId BIGINT, rating DOUBLE, timestamp BIGINT")
            .csv(ratings_path)
        )

        # Pin the seeds so the train/test split and the ALS initialisation are
        # repeatable and the RMSE can be compared between runs.
        (training, test) = ratings.randomSplit([0.8, 0.2], seed=42)

        # coldStartStrategy="drop" removes rows whose prediction is NaN (users
        # or movies unseen in training) so they do not turn the RMSE into NaN.
        als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
                  coldStartStrategy="drop", seed=42)
        model = als.fit(training)

        predictions = model.transform(test)
        evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                        predictionCol="prediction")
        rmse = evaluator.evaluate(predictions)
        print("Root-mean-square error = " + str(rmse))

        userRecs = model.recommendForAllUsers(10)

        # collect() pulls user 85's recommendations onto the driver, so they
        # stay usable after the session is stopped below.
        user85Recs = userRecs.filter(userRecs['userId'] == 85).collect()
    finally:
        # Always release the Spark session, even if a step above raised.
        spark.stop()

    movieID_to_name = loadMovieNames()

    for row in user85Recs:
        for rec in row.recommendations:
            # Recommendations whose ID is missing from the lookup are skipped.
            if rec.movieId in movieID_to_name:
                print(movieID_to_name[rec.movieId])