In [1]:
import numpy as np, pandas as pd

# Raw ratings ("::"-separated, no header row) plus a pre-computed train/test split.
df_full = pd.read_csv("ratings.dat", delimiter="::", engine="python", header=None)
df_train = pd.read_csv("train.csv")
df_test = pd.read_csv("test.csv")

# Name the headerless columns, then discard the timestamp.
df_full.columns = ["UserId", "ItemId", "Rating", "Timestamp"]
df_full = df_full.drop("Timestamp", axis=1)

# ALS needs integer ids, so replace raw ids with dense 0-based codes.
# The full set gets its own encoding. The test set reuses the train vocabularies
# (users_train / items_train) so train and test share one index space; ids that
# never occur in train are encoded as -1 by the categorical conversion.
df_full["UserId"] = pd.factorize(df_full["UserId"])[0]
df_full["ItemId"] = pd.factorize(df_full["ItemId"])[0]
df_train["UserId"], users_train = pd.factorize(df_train["UserId"])
df_train["ItemId"], items_train = pd.factorize(df_train["ItemId"])
df_test["UserId"] = df_test["UserId"].astype(pd.CategoricalDtype(users_train)).cat.codes
df_test["ItemId"] = df_test["ItemId"].astype(pd.CategoricalDtype(items_train)).cat.codes

### Spark does not perform mean centering, so it has to be done manually.
# The test ratings are shifted by the *train* mean so no test statistics leak in.
df_full["Rating"] = df_full["Rating"] - df_full["Rating"].mean()
train_mean = df_train["Rating"].mean()
df_train["Rating"] = df_train["Rating"] - train_mean
df_test["Rating"] = df_test["Rating"] - train_mean

print(df_full.shape, df_train.shape, df_test.shape, sep="\n")

(10000054, 3)
(8000043, 3)
(1999975, 3)


In [2]:
import findspark ### https://github.com/minrk/findspark
# Make the local Spark distribution importable as `pyspark` (path is relative to
# the notebook's working directory; adjust to your install).
findspark.init('spark-3.1.1-bin-hadoop2.7/')

import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS

# The next cell converts multi-million-row pandas frames with createDataFrame,
# which happens on the driver, so the driver is given generous memory.
spark = (
    SparkSession
        .builder
        .config("spark.driver.memory", "8g")
        .getOrCreate()
)
# Arrow speeds up pandas <-> Spark conversion (createDataFrame / toPandas).
# Spark 3.x uses this key; "spark.sql.execution.arrow.enabled" is deprecated.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [3]:
df_full_spark = spark.createDataFrame(df_full)
df_train_spark = spark.createDataFrame(df_train)
df_test_spark = spark.createDataFrame(df_test)

# Explicit-feedback ALS on the mean-centred ratings.
# coldStartStrategy='drop': test users/items that never appear in the training
# data (encoded as -1 when df_test was mapped onto the train vocabularies) would
# otherwise receive NaN predictions, and a single NaN turns the RMSE into NaN.
# Dropping those rows evaluates only the pairs the model can actually score.
# checkpointInterval=20 exceeds maxIter=15, so no checkpoint is ever taken; Spark
# also ignores it unless a checkpoint directory is set on the SparkContext.
als = ALS(rank=50, seed=1, maxIter=15, regParam=0.05,
          userCol='UserId', itemCol='ItemId', ratingCol='Rating',
          coldStartStrategy='drop',
          checkpointInterval=20)

In [4]:
import os
# NOTE: a former `os.environ['PYSPARK_SUBMIT_ARGS'] = ...` line was removed here.
# PySpark reads that variable only when it launches the JVM, and the SparkSession
# had already been created by getOrCreate() before this cell, so it had no effect.
# The package it requested (spark-sql-kafka built for Scala 2.11 / Spark 2.3.0) is
# also incompatible with the Spark 3.1.1 distribution initialised above. If a
# package is really needed, pass it via .config("spark.jars.packages", ...) on the
# SparkSession builder before getOrCreate().

In [5]:
%%time
model = als.fit(df_full_spark)

CPU times: user 15.1 ms, sys: 4.44 ms, total: 19.6 ms
Wall time: 1min 21s


In [6]:
def print_rmse(errors):
    """Print the root-mean-square of a collection of residuals.

    Parameters
    ----------
    errors : iterable of float
        Residuals, e.g. ``rating - prediction`` for each test pair.

    NaN residuals are excluded from the mean and their count is printed on a
    second line. Spark ALS with its default ``coldStartStrategy="nan"`` predicts
    NaN for user/item ids absent from training, and a single such row would
    otherwise make the whole RMSE NaN. If no non-NaN residuals remain (including
    empty input), an "undefined" message is printed instead of a number.
    """
    residuals = np.asarray(errors, dtype=float)
    is_nan = np.isnan(residuals)
    valid = residuals[~is_nan]
    if valid.size == 0:
        print("RMSE: undefined (no non-NaN errors)")
        return
    rmse = np.sqrt(np.mean(valid ** 2))
    print("RMSE: %f" % rmse)
    n_skipped = int(is_nan.sum())
    if n_skipped:
        print("(excluded %d NaN errors)" % n_skipped)

from pyspark.sql import functions as F

# Fit on the training split only and score the held-out test pairs.
model = als.fit(df_train_spark)

# Residuals (observed - predicted) are computed by Spark as a column expression
# and fetched with toPandas(), which uses Arrow when enabled, instead of running a
# Python lambda over every row of the RDD. The result is still a list of floats.
# NOTE: unless the estimator sets coldStartStrategy='drop', test pairs whose user
# or item id never appears in df_train_spark get a NaN prediction here.
errors = (
    model
        .transform(df_test_spark)
        .select((F.col("Rating") - F.col("prediction")).alias("error"))
        .toPandas()["error"]
        .tolist()
)
print_rmse(errors)

RMSE: 0.791316
