### Rate Movies
#### 1. Generate user input to test ML algorithm using provided rateMovies script

In [14]:
import sys
import os
from os import remove
from os.path import join, isfile
from time import time

topMovies = """1,Toy Story (1995)
780,Independence Day (a.k.a. ID4) (1996)
590,Dances with Wolves (1990)
1210,Star Wars: Episode VI - Return of the Jedi (1983)
648,Mission: Impossible (1996)
344,Ace Ventura: Pet Detective (1994)
165,Die Hard: With a Vengeance (1995)
153,Batman Forever (1995)
597,Pretty Woman (1990)
1580,Men in Black (1997)
231,Dumb & Dumber (1994)"""

parentDir = os.path.abspath('')
ratingsFile = join(parentDir, "personalRatings.txt")

if isfile(ratingsFile):
    r = input("Looks like you've already rated the movies. Overwrite ratings (y/N)? ")
    if r and r[0].lower() == "y":
        remove(ratingsFile)
    else:
        sys.exit()

prompt = "Please rate the following movie (1-5 (best), or 0 if not seen): "
print(prompt)

now = int(time())
n = 0

f = open(ratingsFile, 'w')
for text in topMovies.split("\n"):
    ls = text.strip().split(",")
    valid = False
    while not valid:
        rStr = input(ls[1] + ": ")
        r = int(rStr) if rStr.isdigit() else -1
        if r < 0 or r > 5:
            print(prompt)
        else:
            valid = True
            if r > 0:
                f.write("0::%s::%d::%d\n" % (ls[0], r, now))
                n += 1
f.close()

if n == 0:
    print("No rating provided!")

Please rate the following movie (1-5 (best), or 0 if not seen): 


### Load Data
#### 1. Create RDDs

In [15]:
import findspark
from pyspark.sql import SparkSession

findspark.init()
spark = SparkSession.builder.master("local")\
    .appName ("Movie Recommendation Engine")\
    .config("spark.executor.memory ", "1.5gb") \
    .getOrCreate()
sc = spark.sparkContext

movies_rdd = sc.textFile("movielens/medium/movies.dat")
ratings_rdd = sc.textFile("movielens/medium/ratings.dat")
personal_ratings_rdd = sc.textFile("personalRatings.txt")

### Data Exploration
#### 1. Split RDD lines on :: separators
#### 2. Convert RDDs to dataframes and set column headings
#### 3. Show first 3 rows of new dataframes to confirm correct format

In [16]:
from pyspark.sql import Row

print("Movies:")
movies_rdd = movies_rdd.map(lambda line: line.split("::"))
movies_df = movies_rdd.map(lambda line: Row(MovieID=line[0], Title=line[1], Genres=line[2])).toDF()
movies_df.show(3, truncate=False)

print("Ratings:")
ratings_rdd = ratings_rdd.map(lambda line: line.split("::"))
ratings_df = ratings_rdd.map(lambda line: Row(UserID=line[0], MovieID=line[1], Rating=line[2], Timestamp=line[3])).toDF()
ratings_df.show(3, truncate=False)

print("Personal Ratings:")
personal_ratings_rdd = personal_ratings_rdd.map(lambda line: line.split("::"))
personal_ratings_df = personal_ratings_rdd.map(lambda line: Row(UserID=line[0], MovieID=line[1], Rating=line[2], Timestamp=line[3])).toDF()
personal_ratings_df.show(3, truncate=False)

Movies:
+-------+-----------------------+----------------------------+
|MovieID|Title                  |Genres                      |
+-------+-----------------------+----------------------------+
|1      |Toy Story (1995)       |Animation|Children's|Comedy |
|2      |Jumanji (1995)         |Adventure|Children's|Fantasy|
|3      |Grumpier Old Men (1995)|Comedy|Romance              |
+-------+-----------------------+----------------------------+
only showing top 3 rows

Ratings:
+------+-------+------+---------+
|UserID|MovieID|Rating|Timestamp|
+------+-------+------+---------+
|1     |1193   |5     |978300760|
|1     |661    |3     |978302109|
|1     |914    |3     |978301968|
+------+-------+------+---------+
only showing top 3 rows

Personal Ratings:
+------+-------+------+----------+
|UserID|MovieID|Rating|Timestamp |
+------+-------+------+----------+
|0     |780    |5     |1633962673|
|0     |1210   |3     |1633962673|
|0     |648    |4     |1633962673|
+------+-------+------+---

#### 4. Check schemas and convert data types where appropriate

In [17]:
from pyspark.sql.types import IntegerType, FloatType

print("Movies")
movies_df.printSchema()
print("Ratings")
ratings_df.printSchema()
print("Personal Ratings")
personal_ratings_df.printSchema()

movies_df = movies_df.withColumn("MovieID", movies_df["MovieID"].cast(IntegerType()))

ratings_df = ratings_df.withColumn("UserID", ratings_df["UserID"].cast(IntegerType()))\
    .withColumn("MovieID", ratings_df["MovieID"].cast(IntegerType()))\
    .withColumn("Rating", ratings_df["Rating"].cast(IntegerType()))\
    .withColumn("Timestamp", ratings_df["Timestamp"].cast("bigint"))

personal_ratings_df = personal_ratings_df.withColumn("UserID", personal_ratings_df["UserID"].cast(IntegerType()))\
    .withColumn("MovieID", personal_ratings_df["MovieID"].cast(IntegerType()))\
    .withColumn("Rating", personal_ratings_df["Rating"].cast(FloatType()))\
    .withColumn("Timestamp", personal_ratings_df["Timestamp"].cast("bigint"))

Movies
root
 |-- MovieID: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Genres: string (nullable = true)

Ratings
root
 |-- UserID: string (nullable = true)
 |-- MovieID: string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Timestamp: string (nullable = true)

Personal Ratings
root
 |-- UserID: string (nullable = true)
 |-- MovieID: string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Timestamp: string (nullable = true)



### Data Pre-Processing
#### 1. Removes row with null values in selected columns

In [18]:
movies_df.na.drop("any")
ratings_df.na.drop(subset=["UserID", "MovieID", "Rating"])

DataFrame[UserID: int, MovieID: int, Rating: int, Timestamp: bigint]

#### 2. Convert unix timestamp data to date/time format

In [19]:
from pyspark.sql.functions import col
from pyspark.sql.functions import from_unixtime

ratings_df = ratings_df.withColumn("Timestamp", from_unixtime(col("Timestamp"), "dd-MM-yyyy HH:mm:ss"))
ratings_df = ratings_df.withColumnRenamed("Timestamp", "Date")

personal_ratings_df = personal_ratings_df.withColumn("Timestamp", from_unixtime(col("Timestamp"), "dd-MM-yyyy HH:mm:ss"))
personal_ratings_df = personal_ratings_df.withColumnRenamed("Timestamp", "Date")


#### 3. Add current user ratings to main ratings file
#### 4. Join movies to ratings file to show titles and genres

In [20]:
# add personal ratings from user input to existing ratings
ratings_df = personal_ratings_df.union(ratings_df)

# join ratings dataframe to movies dataframe
movie_ratings_df = ratings_df.join(movies_df, ["MovieID"])
movie_ratings_df.drop(movies_df.MovieID)

DataFrame[MovieID: int, UserID: int, Rating: float, Date: string, Title: string, Genres: string]

### Standardisation
#### 1.  Normalise data

In [21]:
from pyspark.sql.functions import col

# Divide all ratings values by 5 so each is a value between 0 and 1
movie_ratings_df = movie_ratings_df.withColumn("Rating", col("Rating")/5)
movie_ratings_df.show()

+-------+------+------+-------------------+--------------------+------+
|MovieID|UserID|Rating|               Date|               Title|Genres|
+-------+------+------+-------------------+--------------------+------+
|    148|    53|   1.0|28-12-2000 07:17:06|Awfully Big Adven...| Drama|
|    148|   216|   0.4|15-12-2000 08:53:59|Awfully Big Adven...| Drama|
|    148|   424|   0.8|18-07-2002 15:40:24|Awfully Big Adven...| Drama|
|    148|   482|   0.4|07-12-2000 20:12:34|Awfully Big Adven...| Drama|
|    148|   673|   1.0|30-11-2000 21:47:04|Awfully Big Adven...| Drama|
|    148|   752|   0.8|14-08-2002 08:12:15|Awfully Big Adven...| Drama|
|    148|   840|   0.2|08-12-2000 17:08:58|Awfully Big Adven...| Drama|
|    148|  1069|   0.4|23-11-2000 02:05:35|Awfully Big Adven...| Drama|
|    148|  1150|   0.4|22-11-2000 06:38:26|Awfully Big Adven...| Drama|
|    148|  1242|   0.6|22-11-2000 16:19:36|Awfully Big Adven...| Drama|
|    148|  1605|   0.4|22-11-2000 21:57:01|Awfully Big Adven...|

### Alternating Least Squares Algorithm
#### 1. Create Training and Testing Datasets

In [22]:
# create training and test datasets
(training_data_df, test_data_df) = movie_ratings_df.randomSplit([0.6, 0.4], seed=1234)

print('Training: {0}, test: {1}\n'.format(training_data_df.count(), test_data_df.count()))
training_data_df.show(3)
test_data_df.show(3)

Training: 600559, test: 399659

+-------+------+------+-------------------+--------------------+------+
|MovieID|UserID|Rating|               Date|               Title|Genres|
+-------+------+------+-------------------+--------------------+------+
|    148|   424|   0.8|18-07-2002 15:40:24|Awfully Big Adven...| Drama|
|    148|   482|   0.4|07-12-2000 20:12:34|Awfully Big Adven...| Drama|
|    148|   752|   0.8|14-08-2002 08:12:15|Awfully Big Adven...| Drama|
+-------+------+------+-------------------+--------------------+------+
only showing top 3 rows

+-------+------+------+-------------------+--------------------+------+
|MovieID|UserID|Rating|               Date|               Title|Genres|
+-------+------+------+-------------------+--------------------+------+
|    148|    53|   1.0|28-12-2000 07:17:06|Awfully Big Adven...| Drama|
|    148|   216|   0.4|15-12-2000 08:53:59|Awfully Big Adven...| Drama|
|    148|   673|   1.0|30-11-2000 21:47:04|Awfully Big Adven...| Drama|
+------

#### 2. Build and train model

In [23]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

# Create ALS and set parameters
als = ALS(maxIter=20, regParam=0.1, userCol="UserID", itemCol="MovieID", ratingCol="Rating",coldStartStrategy="drop",
          implicitPrefs=False, nonnegative=True, rank=2)

# train the model using training dataset
model = als.fit(training_data_df)

#test the model using the training dataset
predictions = model.transform(training_data_df)
regression_eval = RegressionEvaluator(metricName="rmse", labelCol="Rating", predictionCol="prediction")

# calculate RMSE
rmse = regression_eval.evaluate(predictions)
print("Training - Root Mean Squared Error: {}".format(str(rmse)))

Training - Root Mean Squared Error: 0.20614037522156928


#### 3. Test model

In [24]:
#test the model using the test dataset
predictions = model.transform(test_data_df)
regression_eval = RegressionEvaluator(metricName="rmse", labelCol="Rating", predictionCol="prediction")

# calculate RMSE
rmse = regression_eval.evaluate(predictions)
print("Testing - Root Mean Squared Error: {}".format(str(rmse)))

Testing - Root Mean Squared Error: 0.20825857531182482


#### 4. Generate predictions

In [25]:
from pyspark.sql.functions import explode

# generate predictions for user 0
user = personal_ratings_df.select(als.getUserCol()).distinct().sort("UserID", ascending=True).limit(1)
personal_recommendations = model.recommendForUserSubset(user, 5)
personal_recommendations.show()

output = personal_recommendations.select(col("UserID"),explode(col("recommendations")).alias("recommendations"))
output = output.withColumn("recommendations", output["recommendations"].getItem("MovieID")).drop("UserID")
output = output.join(movies_df, output.recommendations == movies_df.MovieID, "inner").select(movies_df.Title)

output_list = output.collect()

print("Movies recommended For you:")
i = 1
for movie in output_list:
    print("{i}. {movie}".format(i = i, movie = movie.Title))
    i = i +1

+------+--------------------+
|UserID|     recommendations|
+------+--------------------+
|     0|[{3382, 1.5766106...|
+------+--------------------+

Movies recommended For you:
1. Hour of the Pig, The (1993)
2. Foreign Student (1994)
3. Song of Freedom (1936)
4. JLG/JLG - autoportrait de d�cembre (1994)
5. Mamma Roma (1962)
