In [None]:
from pyspark.sql import SparkSession

In [None]:
# Local Spark session named 'pyspark' on master 'local[2]'.
# `SparkSession.builder` is the documented entry point for building sessions;
# getOrCreate() hands back an existing session if one is already running.
spark = (
    SparkSession.builder
    .appName('pyspark')
    .master('local[2]')
    .getOrCreate()
)

In [None]:
def read_csv(path):
    """Read CSV data at ``path`` through the notebook-level ``spark`` session.

    The reader is configured with the ``header`` option set to ``'True'``
    before ``csv(path)`` is called.

    Args:
        path: location passed straight to ``spark.read...csv``.

    Returns:
        Whatever ``csv(path)`` returns on the configured reader.
    """
    reader = spark.read.option('header', 'True')
    return reader.csv(path)

In [None]:
# Load the label data via read_csv. The path contains a `*` wildcard, so it
# presumably matches several label-*.csv files under ./public — confirm.
df = read_csv('./public/label-*.csv')

In [None]:
# Display the number of rows that were loaded into df.
df.count()

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [None]:
# Print df's column names and types. read_csv sets only the `header` option
# (no schema / inferSchema), so the columns are expected to be strings —
# to_ranking casts them with int()/float() accordingly.
df.printSchema()

In [None]:
# 80/20 train/test split. The explicit seed pins the random sampling so that
# re-running the notebook over the same input yields the same split instead of
# a different one on every execution.
training_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

In [None]:
def to_ranking(row, num_slots=28):
    """Expand one wide record into one rating Row per time slot.

    Args:
        row: record indexable by column name, providing 'user_id' and
            'time_slot_0' ... 'time_slot_<num_slots - 1>'. The values must be
            convertible with int() / float() respectively.
        num_slots: how many consecutive time-slot columns to read, starting
            at index 0. Defaults to 28, the count previously hard-coded here.

    Returns:
        A list of ``Row(userId=..., movieId=..., rating=...)``, one per slot.
        ``movieId`` carries the slot index (the field name matches the
        itemCol the ALS estimator is configured with).

    Raises:
        KeyError / ValueError / TypeError: propagated from the lookups and
        int()/float() conversions when a column is missing or not numeric.
    """
    user_id = int(row['user_id'])
    return [
        Row(
            userId=user_id,
            movieId=slot,
            rating=float(row['time_slot_{}'.format(slot)]),
        )
        for slot in range(num_slots)
    ]

In [None]:
# Reshape both splits from wide rows into long (userId, movieId, rating)
# records: flatMap applies to_ranking to every row and flattens the lists.
training = spark.createDataFrame(training_data.rdd.flatMap(to_ranking))

test = spark.createDataFrame(test_data.rdd.flatMap(to_ranking))

In [None]:
# ALS estimator in implicit-feedback mode over the userId / movieId / rating
# columns. coldStartStrategy="drop" asks Spark to discard rows whose prediction
# would be NaN (users/items unseen during fitting).
als = ALS(
    maxIter=5,
    regParam=0.01,
    implicitPrefs=True,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    seed=42,  # fixed seed so the fitted factors are reproducible across runs
)
model = als.fit(training)

In [None]:
# Generate predictions for the test set with the fitted model; the RMSE
# itself is computed from `predictions` in a later cell.
predictions = model.transform(test)

In [None]:
# Display up to 10 rows of the predictions for a quick visual check.
predictions.show(10)

In [None]:
# Compare the "prediction" column against the "rating" label using RMSE.
# NOTE(review): the model is fitted with implicitPrefs=True, so predictions are
# presumably preference scores rather than raw ratings — confirm that RMSE
# against `rating` is the intended metric.
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="rating",
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))