In [16]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [17]:
# install PySpark for this notebook

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
!rm "spark-3.0.1-bin-hadoop2.7.tgz"
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"
import findspark
findspark.init("/content/spark-3.0.1-bin-hadoop2.7")

In [18]:
# !rm -rf "spark-3.0.1-bin-hadoop2.7"

In [19]:
from pyspark.sql import Row
from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS, Rating

import numpy as np
import pandas as pd

In [20]:
PROJECT_DIR = "/content/drive/My Drive/Project_EnsembleLearning/"

In [21]:
# create spark context

sc = SparkContext.getOrCreate(SparkConf())
sc.setCheckpointDir(PROJECT_DIR + "spark_checkpoint_dir")

In [22]:
# user defined function to parse the loaded file line by file

def parseline(line):
    fields = line.split(',')
    userId = fields[0]
    movieId = fields[1]
    rating = fields[2]
    return (userId, movieId, rating)

In [23]:
# load training dataset

lines = sc.textFile(PROJECT_DIR + 'dataset_split/training_set.csv')
parsedlines = lines.map(parseline)
header = parsedlines.first()

# remove the header and make sure the rest of the parsed lines are correct
parsedlines = parsedlines.filter(lambda line: line != header)

training_set = parsedlines.map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])) ).cache()
training_set.take(5)

[Rating(user=1, product=481, rating=3.5),
 Rating(user=1, product=1591, rating=1.5),
 Rating(user=1, product=2478, rating=4.0),
 Rating(user=1, product=2840, rating=3.0),
 Rating(user=1, product=3698, rating=3.5)]

In [24]:
# load validation dataset

lines = sc.textFile(PROJECT_DIR + 'dataset_split/validation_set.csv')
parsedlines = lines.map(parseline)
header = parsedlines.first()

# remove the header and make sure the rest of the parsed lines are correct
parsedlines = parsedlines.filter(lambda line: line != header)

validation_set_with_true_ratings = parsedlines.map(
    lambda l: (int(l[0]), int(l[1]), float(l[2]))).cache()

validation_set = validation_set_with_true_ratings.map(lambda t: (t[0], t[1]))

In [25]:
# Training the model using Alternating Least Squares

rank = 5
numIterations = 100
model = ALS.train(training_set, rank, numIterations)

In [26]:
# predict ratings for the validation set using the trained ALS model

predictions = model.predictAll(validation_set).collect()

In [27]:
predictions[0]

Rating(user=9591, product=32196, rating=4.228439292409687)

In [28]:
df_validate = pd.DataFrame(validation_set_with_true_ratings.collect())

df_result_als = pd.DataFrame(predictions)
df_result_als.columns = ['userId', 'movieId', 'predicted_rating']
              
df_result_als['rating'] = df_validate.iloc[:, 2]

df_result_als.head()

Unnamed: 0,userId,movieId,predicted_rating,rating
0,9591,32196,4.228439,3.5
1,5219,32196,2.83522,2.5
2,4796,91902,2.468013,4.5
3,4979,91902,1.387823,3.5
4,3846,68522,5.526903,4.5


In [29]:
# save prediction to file

df_result_als.to_csv(
    PROJECT_DIR + 'predictions/prediction_als.csv', index=False)