In [31]:
import os
import pyspark
import sys
from pyspark.sql import SparkSession
from pyspark.mllib.recommendation import ALS
from os.path import join, isfile, dirname

In [40]:
def parseRating(line):
    """
    Parses a rating record in MovieLens format userId::movieId::rating::timestamp .
    """
    fields = line.strip().split("::")
    return int(fields[3]) % 10, (int(fields[0]), int(fields[1]), float(fields[2]))

In [41]:
def parseMovie(line):
    """
    Parses a movie record in MovieLens format movieId::movieTitle .
    """
    fields = line.strip().split("::")
    return int(fields[0]), fields[1]

In [42]:
def loadRatings(ratingsFile):
    """
    Load ratings from file.
    """
    if not isfile(ratingsFile):
        print("File %s does not exist." % ratingsFile)
        sys.exit(1)
    f = open(ratingsFile, 'r')
    ratings = filter(lambda r: r[2] > 0, [parseRating(line)[1] for line in f])
    f.close()
    if not ratings:
        print("No ratings provided.")
        sys.exit(1)
    else:
        return ratings

In [43]:
def computeRmse(model, data, n):
    """
    Compute RMSE (Root Mean Squared Error).
    """
    predictions = model.predictAll(data.map(lambda x: (x[0], x[1])))
    predictionsAndRatings = predictions.map(lambda x: ((x[0], x[1]), x[2])) \
      .join(data.map(lambda x: ((x[0], x[1]), x[2]))) \
      .values()
    return sqrt(predictionsAndRatings.map(lambda x: (x[0] - x[1]) ** 2).reduce(add) / float(n))

In [49]:
spark = SparkSession.builder \
    .master("local")\
    .appName("Movie Recommendation Engine")\
    .config("spark.executor.memory", "1gb")\
    .getOrCreate()

sc = spark.sparkContext

In [50]:
path = "C:/Users/AineR/Desktop/movielens/medium/"
ratings = sc.textFile(path + "ratings.dat")
movies = sc.textFile(path + "movies.dat")
users = sc.textFile(path + "users.dat")
myRatings = loadRatings(os.path.abspath("C:/Users/AineR/Desktop/movielens/personalRatings.txt"))
myRatingsRDD = sc.parallelize(myRatings, 1)

In [51]:
ratings = ratings.map(parseRating)
movies = movies.map(parseMovie).collect()

In [52]:
ratings.take(2)
movies.take(3)

AttributeError: 'list' object has no attribute 'take'