Задача № 1:  3

Вариант 3. Thriller, Sci-Fi, Adventure

Один фильм может принадлежать разным жанрам

In [12]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.functions import mean, lit, sum, count
from pyspark.ml.evaluation import RegressionEvaluator

sc = SparkContext.getOrCreate()
sqlc = SQLContext(sc)

small_ratings_path = "/user/cloudera/hw2/task1/small/ratings.csv"

df_small_ratings = sqlContext.read.load(small_ratings_path, 
                          format="com.databricks.spark.csv", 
                          header="true", 
                          inferSchema="true", sep=",")
df_small_ratings.persist()
df_small_ratings.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [13]:
df_train, df_test = df_small_ratings.randomSplit([0.8, 0.2], seed=12)
df_train.persist(); df_test.persist()

df_train.count(), df_test.count()

(80555, 20281)

In [17]:
mean_movie_rating = df_train.select(mean("rating").alias("avr")).collect()[0]["avr"]
print(f'Mean rating: {small_mean_movie_rating}\n')

Mean rating: 3.501936565079759



In [18]:
rmse_evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')

df_test_with_prediction = df_test.withColumn('prediction', F.lit(mean_movie_rating))
df_test_with_prediction.show(5)

print(f'RMSE: {rmse_evaluator.evaluate(df_test_with_prediction)}\n')

+------+-------+------+---------+-----------------+
|userId|movieId|rating|timestamp|       prediction|
+------+-------+------+---------+-----------------+
|     1|     70|   3.0|964982400|3.501936565079759|
|     1|    157|   5.0|964984100|3.501936565079759|
|     1|    163|   5.0|964983650|3.501936565079759|
|     1|    260|   5.0|964981680|3.501936565079759|
|     1|    441|   4.0|964980868|3.501936565079759|
+------+-------+------+---------+-----------------+
only showing top 5 rows

RMSE: 1.0374281641248655



In [20]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Параметры
ranks = (5, 10, 15)  # Kоличество факторов
regParams = (0.001, 0.01, 0.1, 1, 10)  # Pегуляризация
kfolds = 4

als = ALS(
    seed=123,
    maxIter=10,
    numUserBlocks=10,
    numItemBlocks=10,
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    coldStartStrategy='drop',
)
paramsGrid = (
    ParamGridBuilder()
        .addGrid(als.rank, ranks)
        .addGrid(als.regParam, regParams)
        .build()
)
cross_validator = CrossValidator(estimator=als,
                                 estimatorParamMaps=paramsGrid,
                                 evaluator=rmse_evaluator,
                                 numFolds=kfolds)

def calculate_rmse_for_best_als_model(df_train, df_test):
    cv_model = cross_validator.fit(df_train)
    print(f'Best rank: {cv_model.bestModel.rank}')
    # FIXME(a.telyshev): py4j.Py4JException: Method getRegParam([]) does not exist
    # print(f'Best regularization: {cv_model.bestModel._java_obj.getRegParam()}')
    
    predictions = cv_model.transform(df_test)
    predictions.show(5)
    
    return rmse_evaluator.evaluate(predictions)

In [21]:
print('Small data processing...')
print(f'RMSE: {calculate_rmse_for_best_als_model(df_train, df_test)}\n')

Small data processing...
Best rank: 5
+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|   133|    471|   4.0| 843491793| 2.5265322|
|   182|    471|   4.5|1054779644| 4.1597166|
|   218|    471|   4.0|1111624874| 3.3507972|
|   474|    471|   3.0| 974668858| 2.8759074|
|   387|    471|   3.0|1139047519| 3.3282547|
+------+-------+------+----------+----------+
only showing top 5 rows

RMSE: 0.8828629996870051

