In [1]:
import os
# Создаем spark сессию
import findspark
findspark.init()
from pyspark.sql import SparkSession
import pyspark.sql.functions as sql_func

spark = (
    SparkSession
    .builder
    .config('spark.driver.memory', '16G')
    .config('spark.sql.analyzer.failAmbiguousSelfJoin', 'False')
    .master("local[*]")
    .getOrCreate()
)

In [2]:
DATA_DIR = 'D:/Datasets/ml-latest-small/'

In [3]:
import pandas as pd
df = pd.read_csv(DATA_DIR + 'ratings.csv')

In [4]:
df.head()

Unnamed: 0,userId,movieId,rating,timestamp
0,1,1,4.0,964982703
1,1,3,4.0,964981247
2,1,6,4.0,964982224
3,1,47,5.0,964983815
4,1,50,5.0,964982931


In [5]:
# Сначала посмотрим на общее распределение тегов
ratings = (
    spark
    .read
    .csv(
        os.path.join(DATA_DIR, 'ratings.csv'),
        header=True,
        inferSchema=True
    )
    .cache()
)

In [6]:
# Сначала посмотрим на общее распределение тегов
raw_tags = (
    spark
    .read
    .csv(
        os.path.join(DATA_DIR, 'tags.csv'),
        header=True,
        inferSchema=True
    )
    .cache()
)

In [7]:
movies = (
    spark
    .read
    .csv(
        os.path.join(DATA_DIR, 'movies.csv'),
        header=True,
        inferSchema=True
    )
    # если используется меньше памяти,
    # то здесь можно взять не все данные, а небольшую выборку
    # даже при fraction=.01 качественная картина не меняеся
    .withColumn('genres_list', sql_func.split('genres', '\|'))
    .select('movieId', 'title', 'genres_list')
)

In [8]:
# нас ек будет интересовать, какой именно пользователь поставил тег и когда это произошло
tags = (
    raw_tags
    # теги могут различатьтся только регистром
    # поэтому приведём их все к верхнему
    .select(
        sql_func.col("movieId"),
        sql_func.upper(sql_func.col("tag")).alias("tag")
    )
    .groupBy("movieId")
    .agg(sql_func.collect_list("tag").alias("tags_list"))
    .join(movies, "movieId", "right")
    .cache()
)

In [9]:
# объединим теги и жанры в единое пространство текстовых фич
from pyspark.sql.types import ArrayType, StringType

# В Spark нет некоторых полезных функций, но легко можно создать свои (UDF - user defined function)
#  в часности, мы хотим провести все жанры также к верхнему регистру
list_concat = sql_func.udf(
    lambda one_list, another_list:
        [str.upper(elem) for elem in one_list] + (another_list if another_list else []),
    returnType=ArrayType(StringType())
)

content_features = (
    tags
    .select(
        "movieId",
        "title",
        list_concat("genres_list", "tags_list").alias("content_features")
    )
    .cache()
)

In [10]:
# посчитаем частоты встречаемости для тегов для всех фильмов
from pyspark.ml.feature import HashingTF

term_frequencies = HashingTF(
    # от каждого тега будет вычислен хэш
    # и по факту мы будем считать частоты бакетов хэшей, а не для самих тэгов
    numFeatures=1024,
    inputCol="content_features",
    outputCol="term_frequencies"
).transform(content_features).cache()

In [11]:
# теперь сделаем поправку на частоту тегов в целом, чтобы убрать неинформативные теги
# это второй шаг TF-IDF (term frequency, inverted document frequency)
from pyspark.ml.feature import IDF

idf_model = IDF(
    inputCol="term_frequencies",
    outputCol="tf_idf",
    minDocFreq=2
).fit(term_frequencies)
tf_idf = (
    idf_model
    .transform(term_frequencies)
    .select("movieId", "title", "tf_idf")
    .cache()
)

In [12]:
from sklearn.linear_model import ElasticNet
import numpy as np
from pyspark.sql.types import FloatType, ArrayType

def sklearn_lr(spark_x: list, spark_y: list) -> list:
    """
        spark_x: список pyspark.ml.linalg,SparseVector - фичи для регрессии
        spark_y: список занчений целевой переменной регрессии
        return: список коэффициентов регресии
    """
    # переводим данные из формата spark  в формат, удобный sklearn
    numpy_x = np.array([vector.toArray() for vector in spark_x])
    numpy_y = np.array(spark_y).reshape(-1, 1)
    # применяем обычную модель из sklearn
    lr = ElasticNet().fit(numpy_x, numpy_y)
    # возвращаем в ответ плотный вектор коэффициентов регрессии
    return (lr.sparse_coef_.todense().tolist()[0], lr.intercept_.tolist())

# определяем Spark UDF, которая обучит регрессия на своих аргементах
reg_udf = sql_func.udf(
    sklearn_lr,
    returnType=ArrayType(ArrayType(FloatType()))
)

In [13]:
# рабираем полученые данные на обучающую и тестовую выборки
train_data, test_data = ratings.join(tf_idf, "movieId").randomSplit([.8, .2], seed=42)
train_data.cache()
test_data.cache()
# строим для каждого пользователя свою модель регессии
model_coef = (
    train_data
    .groupBy("userId")
    .agg(
        sql_func.collect_list("tf_idf").alias("x"),
        sql_func.collect_list("rating").alias("y")
    )
    .withColumn("model_coef", reg_udf("x", "y"))
    .cache()
)

In [14]:
# на всякий случай сохраним полученные коэффициенты на диск
model_coef.coalesce(1).write.mode("overwrite").parquet(os.path.join(DATA_DIR, "model_coef.parquet"))

In [15]:
model_coef.count()

610

In [16]:
model_coef.show()

+------+--------------------+--------------------+--------------------+
|userId|                   x|                   y|          model_coef|
+------+--------------------+--------------------+--------------------+
|   148|[(1024,[11,88,113...|[4.0, 4.0, 4.0, 3...|[[0.0, 0.0, 0.0, ...|
|   463|[(1024,[89,143,14...|[4.5, 4.0, 4.0, 4...|[[0.0, 0.0, 0.0, ...|
|   471|[(1024,[156,273,4...|[5.0, 3.0, 4.5, 5...|[[0.0, 0.0, 0.0, ...|
|   496|[(1024,[409,668,7...|[1.0, 5.0, 5.0, 4...|[[0.0, 0.0, 0.0, ...|
|   243|[(1024,[263,287,6...|[5.0, 5.0, 4.0, 5...|[[0.0, 0.0, 0.0, ...|
|   392|[(1024,[45,219,75...|[3.0, 1.0, 2.0, 3...|[[0.0, 0.0, 0.0, ...|
|   540|[(1024,[219,263,2...|[3.5, 5.0, 4.5, 3...|[[0.0, 0.0, 0.0, ...|
|    31|[(1024,[836,837,8...|[3.0, 4.0, 4.0, 3...|[[0.0, 0.0, 0.0, ...|
|   516|[(1024,[15,143,14...|[5.0, 4.0, 4.0, 3...|[[0.0, 0.0, 0.0, ...|
|    85|[(1024,[219,668],...|[5.0, 5.0, 5.0, 4...|[[0.0, 0.0, 0.0, ...|
|   137|[(1024,[89,143,14...|[4.0, 5.0, 3.5, 4...|[[0.0, 0.0, 0.

In [17]:
from pyspark.ml.linalg import SparseVector

def lr_apply(x: SparseVector, lr_coef: list) -> float:
    """
        param x: векторр фич для регрессии
        param lr_coef: 
        return предсказанное моделью регрессии значение
    """
    return float(np.array(x).dot(np.array(lr_coef[0])) + lr_coef[1][0])

lr_apply_udf = sql_func.udf(lr_apply, returnType=ArrayType(FloatType()))

In [18]:
from pyspark.sql import DataFrame

def get_prediction(data: DataFrame) -> float:
    return(
        data
        .join(model_coef, "userId")
        .select(
            "userId",
            "rating",
            "movieId",
            "tf_idf",
            lr_apply_udf("tf_idf", "model_coef").alias("prediction"))
        .cache()
    )

In [19]:
train_prediction = get_prediction(train_data)
(
    train_prediction.write.mode("overwrite")
    .parquet(os.path.join(DATA_DIR, "train_prediction.parquet"))
)

In [20]:
train_prediction.show()

+------+------+-------+--------------------+----------+
|userId|rating|movieId|              tf_idf|prediction|
+------+------+-------+--------------------+----------+
|     1|   4.0|      1|(1024,[156,273,42...|      null|
|     5|   4.0|      1|(1024,[156,273,42...|      null|
|    15|   2.5|      1|(1024,[156,273,42...|      null|
|    17|   4.5|      1|(1024,[156,273,42...|      null|
|    18|   3.5|      1|(1024,[156,273,42...|      null|
|    21|   3.5|      1|(1024,[156,273,42...|      null|
|    31|   5.0|      1|(1024,[156,273,42...|      null|
|    32|   3.0|      1|(1024,[156,273,42...|      null|
|    33|   3.0|      1|(1024,[156,273,42...|      null|
|    40|   5.0|      1|(1024,[156,273,42...|      null|
|    44|   3.0|      1|(1024,[156,273,42...|      null|
|    45|   4.0|      1|(1024,[156,273,42...|      null|
|    46|   5.0|      1|(1024,[156,273,42...|      null|
|    50|   3.0|      1|(1024,[156,273,42...|      null|
|    54|   3.0|      1|(1024,[156,273,42...|    

In [21]:
def evaluate_prediction(prediction: DataFrame) -> float:
    return np.sqrt(
        prediction
        .selectExpr("""
        CASE
            WHEN prediction > 5. THEN 5.
            WHEN prediction < .5 THEN .5
            ELSE prediction
        END AS prediction
        """, "rating")
        .select(
            sql_func.pow(sql_func.col("rating") - sql_func.col("prediction"), 2)
            .alias("squared error")
        )
        .agg(sql_func.avg("squared_error"))
        .first()[0]
    )

In [22]:
evaluate_prediction(train_prediction)

AnalysisException: cannot resolve '(`prediction` > 5BD)' due to data type mismatch: differing types in '(`prediction` > 5BD)' (array<float> and decimal(1,0)).; line 3 pos 17;
'Project [CASE WHEN (prediction#3302 > 5) THEN 5 WHEN (prediction#3302 < 0.5) THEN 0.5 ELSE prediction#3302 END AS prediction#3660, rating#18]
+- Project [userId#16, rating#18, movieId#17, tf_idf#1440, lr_apply(tf_idf#1440, model_coef#1814) AS prediction#3302]
   +- Project [userId#16, movieId#17, rating#18, timestamp#19, title#105, tf_idf#1440, x#1807, y#1809, model_coef#1814]
      +- Join Inner, (userId#16 = userId#3288)
         :- Sample 0.0, 0.8, false, 42
         :  +- Sort [movieId#17 ASC NULLS FIRST, userId#16 ASC NULLS FIRST, rating#18 ASC NULLS FIRST, timestamp#19 ASC NULLS FIRST, title#105 ASC NULLS FIRST, tf_idf#1440 ASC NULLS FIRST], false
         :     +- Project [movieId#17, userId#16, rating#18, timestamp#19, title#105, tf_idf#1440]
         :        +- Join Inner, (movieId#17 = movieId#104)
         :           :- Relation[userId#16,movieId#17,rating#18,timestamp#19] csv
         :           +- Project [movieId#104, title#105, tf_idf#1440]
         :              +- Project [movieId#104, title#105, content_features#214, term_frequencies#295, UDF(term_frequencies#295) AS tf_idf#1440]
         :                 +- Project [movieId#104, title#105, content_features#214, UDF(content_features#214) AS term_frequencies#295]
         :                    +- Project [movieId#104, title#105, <lambda>(genres_list#110, tags_list#124) AS content_features#214]
         :                       +- Project [movieId#104, tags_list#124, title#105, genres_list#110]
         :                          +- Join RightOuter, (movieId#61 = movieId#104)
         :                             :- Aggregate [movieId#61], [movieId#61, collect_list(tag#118, 0, 0) AS tags_list#124]
         :                             :  +- Project [movieId#61, upper(tag#62) AS tag#118]
         :                             :     +- Relation[userId#60,movieId#61,tag#62,timestamp#63] csv
         :                             +- Project [movieId#104, title#105, genres_list#110]
         :                                +- Project [movieId#104, title#105, genres#106, split(genres#106, \|, -1) AS genres_list#110]
         :                                   +- Relation[movieId#104,title#105,genres#106] csv
         +- Project [userId#3288, x#1807, y#1809, sklearn_lr(x#1807, y#1809) AS model_coef#1814]
            +- Aggregate [userId#3288], [userId#3288, collect_list(tf_idf#1440, 0, 0) AS x#1807, collect_list(rating#3290, 0, 0) AS y#1809]
               +- Sample 0.0, 0.8, false, 42
                  +- Sort [movieId#3289 ASC NULLS FIRST, userId#3288 ASC NULLS FIRST, rating#3290 ASC NULLS FIRST, timestamp#3291 ASC NULLS FIRST, title#105 ASC NULLS FIRST, tf_idf#1440 ASC NULLS FIRST], false
                     +- Project [movieId#3289, userId#3288, rating#3290, timestamp#3291, title#105, tf_idf#1440]
                        +- Join Inner, (movieId#3289 = movieId#104)
                           :- Relation[userId#3288,movieId#3289,rating#3290,timestamp#3291] csv
                           +- Project [movieId#104, title#105, tf_idf#1440]
                              +- Project [movieId#104, title#105, content_features#214, term_frequencies#295, UDF(term_frequencies#295) AS tf_idf#1440]
                                 +- Project [movieId#104, title#105, content_features#214, UDF(content_features#214) AS term_frequencies#295]
                                    +- Project [movieId#104, title#105, <lambda>(genres_list#110, tags_list#124) AS content_features#214]
                                       +- Project [movieId#104, tags_list#124, title#105, genres_list#110]
                                          +- Join RightOuter, (movieId#61 = movieId#104)
                                             :- Aggregate [movieId#61], [movieId#61, collect_list(tag#118, 0, 0) AS tags_list#124]
                                             :  +- Project [movieId#61, upper(tag#62) AS tag#118]
                                             :     +- Relation[userId#60,movieId#61,tag#62,timestamp#63] csv
                                             +- Project [movieId#104, title#105, genres_list#110]
                                                +- Project [movieId#104, title#105, genres#106, split(genres#106, \|, -1) AS genres_list#110]
                                                   +- Relation[movieId#104,title#105,genres#106] csv


In [None]:
test_prediction = get_prediction(test_data)
(
    test_prediction.write.mode("overwrite")
    .parquet(os.path.join(DATA_DIR, "test_prediction.parquet"))
)

In [None]:
evaluate_prediction(test_prediction)

In [None]:
movies.where("title LIKE 'Top gun%'").select("title").toPandas()