SIMPLE TESTS

In [2]:
import os

os.environ["PYSPARK_PYTHON"] = "python"

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .master("local[*]")\
    .getOrCreate()

In [4]:
from pyspark.sql.functions import col

csv_anime = spark.read.format("csv")\
    .option("header", "true")\
    .option("escape", "\"")\
    .option("inferSchema", "true")\
    .load("../csv/data.csv")\
    .withColumn("score", col("score").cast("double"))

In [5]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler()\
    .setInputCols([col for col in csv_anime.columns if col != "score"])\
    .setOutputCol("features")

data_df = assembler\
    .transform(csv_anime)\
    .select("features", "score")

In [6]:
train_df, test_df = data_df.randomSplit([0.8, 0.2])

In [76]:
from pyspark.ml.regression import (
    DecisionTreeRegressor,
    RandomForestRegressor,
    GBTRegressor,
)

print("Decision Tree Regressor")
model_tree = DecisionTreeRegressor()\
    .setLabelCol("score")\
    .setFeaturesCol("features")\
    .fit(train_df)

print("Random Forest Regressor")
model_random_forest = RandomForestRegressor()\
    .setLabelCol("score")\
    .setFeaturesCol("features")\
    .setNumTrees(10)\
    .fit(train_df)

print("Gradient Boosted Tree Regressor")
model_gbt = GBTRegressor()\
    .setLabelCol("score")\
    .setFeaturesCol("features")\
    .fit(train_df)


In [77]:
predictions_tree = model_tree.transform(test_df)
predictions_random_forest = model_random_forest.transform(test_df)
predictions_gbt = model_gbt.transform(test_df)

In [78]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    labelCol="score",
    predictionCol="prediction",
    metricName="rmse",
)

error_tree = evaluator.evaluate(predictions_tree)
print(f"Decision tree: rmse = {error_tree}")

error_random_forest = evaluator.evaluate(predictions_random_forest)
print(f"Random forest: rmse = {error_random_forest}")

error_gbt = evaluator.evaluate(predictions_gbt)
print(f"Gradient Boosted Tree: rmse = {error_gbt}")

Decision tree: rmse = 3.044417074181475
Random forest: rmse = 3.0435001772367203
Gradient Boosted Tree: rmse = 2.9433101966278787


MATRIX FACTORIZATION

In [10]:
from pyspark.mllib.recommendation import ALS, Rating

from pyspark.sql.functions import col

csv_anime = spark.read.format("csv")\
    .option("header", "true")\
    .option("escape", "\"")\
    .option("inferSchema", "true")\
    .load("../csv/data2.csv")\
    .withColumn("rating", col("rating").cast("double"))

In [20]:
ratings = csv_anime.rdd.map(lambda row: Rating(row.user, row.product, row.rating))

In [22]:
rank = 10
numIterations = 10
model = ALS.train(ratings, rank, numIterations)

In [24]:
testdata = ratings.map(lambda p: (p[0], p[1]))

In [27]:
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))

In [30]:
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)

In [31]:
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()

In [32]:
import math

RMSE = math.sqrt(MSE)

In [33]:
print("RMSE: " + str(RMSE))

RMSE: 0.30479637608477866
