#### Читаем данные:

In [1]:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

val spark = org.apache.spark.sql.SparkSession.builder
        .master("local")
        .appName("Spark CSV Reader")
        .getOrCreate;

 val recipes = spark.read
        .format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load("recipes.csv")

 val interactions = spark.read
        .format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load("interactions.csv")

Intitializing Scala interpreter ...

Spark Web UI available at http://7024ae5bbc52:4040
SparkContext available as 'sc' (version = 3.0.1, master = local[*], app id = local-1604926017613)
SparkSession available as 'spark'


import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@35157d6d
recipes: org.apache.spark.sql.DataFrame = [name: string, id: int ... 10 more fields]
interactions: org.apache.spark.sql.DataFrame = [user_id: int, recipe_id: int ... 3 more fields]


0. Посмотрите на данные

In [2]:
recipes.printSchema

root
 |-- name: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- minutes: integer (nullable = true)
 |-- contributor_id: integer (nullable = true)
 |-- submitted: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- nutrition: string (nullable = true)
 |-- n_steps: integer (nullable = true)
 |-- steps: string (nullable = true)
 |-- description: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- n_ingredients: string (nullable = true)



In [3]:
recipes.show(1)

+--------------------+------+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+
|                name|    id|minutes|contributor_id| submitted|                tags|           nutrition|n_steps|               steps|         description|         ingredients|n_ingredients|
+--------------------+------+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+
|arriba   baked wi...|137739|     55|         47892|2005-09-16|['60-minutes-or-l...|[51.5, 0.0, 13.0,...|     11|['make a choice a...|autumn is my favo...|['winter squash',...|            7|
+--------------------+------+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+
only showing top 1 row



In [4]:
interactions.printSchema

root
 |-- user_id: integer (nullable = true)
 |-- recipe_id: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review: string (nullable = true)



In [5]:
interactions.show(5)

+-------+---------+----------+------+--------------------+
|user_id|recipe_id|      date|rating|              review|
+-------+---------+----------+------+--------------------+
|  38094|    40893|2003-02-17|     4|Great with a sala...|
|1293707|    40893|2011-12-21|     5|So simple, so del...|
|   8937|    44394|2002-12-01|     4|This worked very ...|
| 126440|    85009|2010-02-27|     5|I made the Mexica...|
|  57222|    85009|2011-10-01|     5|Made the cheddar ...|
+-------+---------+----------+------+--------------------+
only showing top 5 rows



1. Найдите пользователя с максимальным количеством отзывов

In [6]:
val max_reviews_user = interactions
    .groupBy("user_id")
    .agg(count("*") as "reviews_count")
    .orderBy(-$"reviews_count")
    .take(1)

max_reviews_user: Array[org.apache.spark.sql.Row] = Array([424680,7671])


2. Найдите пользователя, который потратил больше всех времени на готовку (придумайте как это примерно посчитать с имеющимися данными)

В данных есть рецепт с id, равным 261647, и временем изготовления в 2147483647 минут (максимальный Int), что кажется странным. Поэтому отфильтруем этот рецепт. Будем считать, что пользователь суммарное потраченное пользователем время на готовку равно сумме всех времен на готовку из рецептов, по которым пользователь оставлял оценки или отзывы (из таблички interactions).

In [7]:
val recipesWithTime = recipes
    .filter($"id" !== 261647)
    .select($"id" as "recipe_id", $"minutes")

val max_cooking_time_user = interactions
    .select($"user_id", $"recipe_id")
    .join(recipesWithTime, Seq("recipe_id"))
    .groupBy("user_id")
    .agg(sum($"minutes") as "total_cooking_time")
    .orderBy(-$"total_cooking_time")
    .take(1)

recipesWithTime: org.apache.spark.sql.DataFrame = [recipe_id: int, minutes: int]
max_cooking_time_user: Array[org.apache.spark.sql.Row] = Array([1072593,1296802])


3. Найдите пользователя, который дольше всего пользуется сайтом (придумайте как это примерно посчитать с имеющимися данными)


Не до конца понятно, что подразумевается под наиболее долгим пользованием сайтом. Посмотрим на самые ранние добавления рецептов от пользователей и на самые ранние взаимодействия пользователей (по таблицам recipes и interactions соответственно). Будем считать, что кто раньше всех "оставил свой след", тот и дольше всего пользуется сайтом.

In [8]:
val earliest_contributions = recipes
    .select($"contributor_id", to_timestamp($"submitted") as "date")
    .groupBy("contributor_id")
    .agg(min($"date") as "earliest_contribution")
    .orderBy($"earliest_contribution")
    .show(10)

+--------------+---------------------+
|contributor_id|earliest_contribution|
+--------------+---------------------+
|          1571|  1999-08-06 00:00:00|
|          1543|  1999-08-06 00:00:00|
|          1533|  1999-08-06 00:00:00|
|          1580|  1999-08-06 00:00:00|
|        115621|  1999-08-06 00:00:00|
|          1535|  1999-08-06 00:00:00|
|         39547|  1999-08-07 00:00:00|
|          1587|  1999-08-07 00:00:00|
|          1604|  1999-08-07 00:00:00|
|        174711|  1999-08-08 00:00:00|
+--------------+---------------------+
only showing top 10 rows



earliest_contributions: Unit = ()


In [9]:
val earliest_interactions = interactions
    .select($"user_id", to_timestamp($"date") as "date")
    .groupBy("user_id")
    .agg(min($"date") as "earliest_interaction")
    .orderBy($"earliest_interaction")
    .show(10)

+-------+--------------------+
|user_id|earliest_interaction|
+-------+--------------------+
|   2008| 2000-01-25 00:00:00|
|   2046| 2000-02-25 00:00:00|
|   1773| 2000-03-13 00:00:00|
|   2156| 2000-06-02 00:00:00|
|   1986| 2000-08-26 00:00:00|
|   2585| 2000-09-05 00:00:00|
|   2625| 2000-09-11 00:00:00|
|   2312| 2000-09-12 00:00:00|
|   2536| 2000-09-12 00:00:00|
|   2033| 2000-09-20 00:00:00|
+-------+--------------------+
only showing top 10 rows



earliest_interactions: Unit = ()


Видно, что самые ранние добавления рецептов были раньше, чем самые ранние взаимодействия, поэтому результат будет таковым:

In [10]:
val longest_time_user = recipes
    .select($"contributor_id", to_timestamp($"submitted") as "date")
    .groupBy("contributor_id")
    .agg(min($"date") as "earliest_contribution")
    .orderBy($"earliest_contribution")
    .take(1)

longest_time_user: Array[org.apache.spark.sql.Row] = Array([1580,1999-08-06 00:00:00.0])


4. Найдите пользователя, который берёт только новые рецепты (придумайте как это примерно посчитать с имеющимися данными)

Будем считать определенную дату за барьер, после которого рецепты считаются новыми. А сравнивать пользователей будем по отношению новых рецептов ко всем, которыми он "пользовался" (оставил отзыв, т.е. провзаимодейстовал).

In [11]:
recipes.printSchema
interactions.printSchema

root
 |-- name: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- minutes: integer (nullable = true)
 |-- contributor_id: integer (nullable = true)
 |-- submitted: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- nutrition: string (nullable = true)
 |-- n_steps: integer (nullable = true)
 |-- steps: string (nullable = true)
 |-- description: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- n_ingredients: string (nullable = true)

root
 |-- user_id: integer (nullable = true)
 |-- recipe_id: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review: string (nullable = true)



In [12]:
val usersRecipes = interactions
    .select($"user_id" as "interaction_user_id", $"recipe_id")

val result = recipes
    .select($"id" as "recipe_id", to_timestamp($"submitted") as "submitted")
    .join(usersRecipes, Seq("recipe_id"))
    .groupBy("interaction_user_id")
    .agg(
        count("*") as "total_interacted_recipes",
        sum(when(datediff($"submitted", to_timestamp(lit("2018-01-01"))) > 1, lit(1)).otherwise(lit(0))) as "new_recipes"
    )
    .orderBy(-($"new_recipes".cast("double") / $"total_interacted_recipes"))
    .show()

+-------------------+------------------------+-----------+
|interaction_user_id|total_interacted_recipes|new_recipes|
+-------------------+------------------------+-----------+
|         2002276513|                       1|          1|
|            2389450|                       1|          1|
|         2002026576|                       1|          1|
|         2002326807|                       1|          1|
|         2002053137|                       1|          1|
|         2002259490|                       1|          1|
|         2001842075|                       1|          1|
|         1802507836|                       1|          1|
|         2002277099|                       1|          1|
|         2001936405|                       1|          1|
|         2001900143|                       1|          1|
|         2002162063|                       1|          1|
|         2002203938|                       1|          1|
|         2002225247|                       1|          

usersRecipes: org.apache.spark.sql.DataFrame = [interaction_user_id: int, recipe_id: int]
result: Unit = ()


Видно, что в топе полученной таблицы в основном пользователи с одним отзывом, который сделан на новый рецепт. Наверное возможно использовать какой-то другой способ сравнения, но я выбрал этот.

5. Найдите пользователя, который выбирает самые сложные рецепты. Метрика: 0.2 * (медианное количество шагов в выбранном рецепте) + 0.3 * (среднее количество ингредиентов) + (средняя длина описания)

Будем считать среднюю метрику по всем рецептам пользователя для решения задачи.

In [13]:
val recipesExtended = recipes
    .withColumn("median_steps", callUDF("percentile_approx", col("n_steps"), lit(0.5)).over(Window.partitionBy($"name")))
    .withColumn("avg_ingredients", avg($"n_ingredients".cast("int")).over(Window.partitionBy($"name")))
    .withColumn("avg_desc_len", avg(length($"description")).over(Window.partitionBy($"name")))

recipesExtended: org.apache.spark.sql.DataFrame = [name: string, id: int ... 13 more fields]


In [14]:
val userComplicatedRecipes = recipesExtended
    .select($"id" as "recipe_id", $"name", $"median_steps", $"avg_ingredients", $"avg_desc_len")
    .join(usersRecipes, Seq("recipe_id"))
    .withColumn("complexity_metric", 
                lit(0.2) * when($"median_steps".isNull, lit(0)).otherwise($"median_steps")
                + lit(0.3) * when($"avg_ingredients".isNull, lit(0)).otherwise($"avg_ingredients")
                + when($"avg_desc_len".isNull, lit(0)).otherwise($"avg_desc_len")
    )
    .groupBy("interaction_user_id")
    .agg(avg($"complexity_metric") as "avg_complexity")
    .orderBy(-$"avg_complexity")
    .take(1)

userComplicatedRecipes: Array[org.apache.spark.sql.Row] = Array([1802742723,6323.4])


6. Найдите количество пользователей, которые выбирают наиболее разнообразную пищу. 2 вариации:
    1. На основании тэгов
    2. На основании лемматизированных описаний и TF-IDF

`A.` Считаем, что чем больше различных тэгов использует пользователь, тем более разнообразную пищу он выбирает.

In [73]:
val unqTags = recipes
    .select($"id" as "recipe_id", explode(split(expr("substring(tags, 2, length(tags) - 1)"), ",")) as "tags")
    .join(interactions.select("user_id", "recipe_id"), Seq("recipe_id"))
    .groupBy("user_id")
    .agg(countDistinct($"tags") as "n_unq_tags")
    .orderBy(-$"n_unq_tags")
    .cache

val maxNumTags = unqTags.select(max("n_unq_tags")).take(1)(0)(0)

unqTags: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [user_id: int, n_unq_tags: bigint]
maxNumTags: Any = 643


In [74]:
unqTags
    .withColumn("max_num_tags", lit(maxNumTags))
    .filter($"n_unq_tags" === $"max_num_tags")
    .show

+-------+----------+------------+
|user_id|n_unq_tags|max_num_tags|
+-------+----------+------------+
| 424680|       643|         643|
+-------+----------+------------+



7. 


A. Найдите медианное значение калорийности блюд, приготовленных пользователями с более чем 5-ю отзывами

B. В качестве веса в медиане используйте количество отзывов на блюдо. Взвешенную медиану необходимо реализовать через оконные функции

8. Найдите слова в отзыве, которые наиболее коррелируют с хорошим рейтингом (4-5). Отфильтруйте слишком редкие слова

In [84]:
val min_freq = 150

val wordsRatings = interactions
    .select($"rating", explode(split(lower(regexp_replace($"review", "[^a-zA-Z ]", "")), " ")) as "word")
    .filter($"word" !== "")
    .withColumn("cnt_word", count("*") over Window.partitionBy($"word"))
    .filter($"cnt_word" >= min_freq && $"rating" >= 4)
    .withColumn("cnt_word_with_high_rating", count("*") over Window.partitionBy($"word"))
    .drop("rating")
    .distinct
    .select($"word", $"cnt_word_with_high_rating" / $"cnt_word" as "freq_in_high_rating", $"cnt_word")
    .orderBy(-$"freq_in_high_rating")
    .show

+-------------------+-------------------+--------+
|               word|freq_in_high_rating|cnt_word|
+-------------------+-------------------+--------+
|          honkitten|                1.0|     348|
|         keepermade|                1.0|     177|
|           gailanng|                1.0|     222|
|      perfectthanks|                1.0|     177|
|wholemealwholegrain|                1.0|     187|
|           adoptees|  0.998158379373849|    1086|
|         aussiekiwi| 0.9960474308300395|     253|
|          againmade| 0.9957264957264957|     234|
|             boomie| 0.9956709956709957|     231|
|          travelers| 0.9955555555555555|     225|
|          recipenap| 0.9952380952380953|     210|
|               pets| 0.9951923076923077|     208|
|               kitz|  0.995049504950495|     202|
|              katie| 0.9948717948717949|     195|
|            dragons| 0.9942528735632183|     174|
|            witchin| 0.9938650306748467|     163|
|           nibbling| 0.9937888

min_freq: Int = 150
wordsRatings: Unit = ()


9. Найдите теги, блюда с которыми имеют в среднем худшие отзывы

In [75]:
val tagsWorstReviews = recipes
    .select($"id" as "recipe_id", explode(split(expr("substring(tags, 2, length(tags) - 1)"), ", ")) as "tag")
    .withColumn("tag", trim($"tag", "' []"))
    .join(interactions.select("recipe_id", "rating"), Seq("recipe_id"))
    .groupBy("tag")
    .agg(avg("rating") as "avg_rating")
    .orderBy("avg_rating")
    .show

+--------------------+------------------+
|                 tag|        avg_rating|
+--------------------+------------------+
|    shrimp-main-dish|               0.0|
|lamb-sheep-main-dish|               0.0|
|           pot-roast|               0.0|
|   main-dish-seafood|               0.0|
|      beef-crock-pot|               0.0|
|           bean-soup|               3.0|
|      main-dish-beef|               3.0|
|     black-bean-soup|               3.0|
|       stews-poultry|3.3333333333333335|
|                bear|3.6666666666666665|
|    pressure-canning|3.6742424242424243|
|       sugar-cookies| 3.707070707070707|
|            honduran|3.7982456140350878|
| unprocessed-freezer| 3.824915824915825|
|fillings-and-fros...|3.9347826086956523|
|            birthday| 3.944741532976827|
|             jellies|3.9685714285714284|
|          water-bath|3.9867256637168142|
|served-hot-new-years|               4.0|
|      pork-crock-pot|               4.0|
+--------------------+------------

tagsWorstReviews: Unit = ()


10. Найдите contributor'а, который прислал:
    1. Наиболее используемый рецепт
    2. Наиболее высоко оцененный рецепт

`A.`

In [64]:
val mostUsedRecipe = interactions
    .groupBy("recipe_id")
    .agg(count("*") as "num_of_uses")
    .orderBy(-$"num_of_uses")
    .take(1)

mostUsedRecipe: Array[org.apache.spark.sql.Row] = Array([2886,1613])


In [65]:
recipes
    .filter($"id" === mostUsedRecipe(0)(0))
    .select($"contributor_id", $"id" as "recipe_id")
    .show()

+--------------+---------+
|contributor_id|recipe_id|
+--------------+---------+
|          1762|     2886|
+--------------+---------+



`B.` В данных много рецептов, в которых средний рейтинг рецепта равен 5.0 (их больше 100000). Поэтому еще посчитаем количество отзывов на рецепты, отфильтруем те, у которых рейтинг равен 5.0, и будем считать наиболее высоко оцененным рецептом тот, у которого рейтинг 5.0 и максимальное количество отзывов.

In [66]:
val mostRatedRecipe = interactions
    .groupBy("recipe_id")
    .agg(avg($"rating") as "avg_rating", count("*") as "num_of_uses")
    .filter($"avg_rating" === 5.0)
    .orderBy(-$"num_of_uses")
    .take(1)

mostRatedRecipe: Array[org.apache.spark.sql.Row] = Array([55309,5.0,52])


In [67]:
recipes
    .filter($"id" === mostRatedRecipe(0)(0))
    .select($"contributor_id", $"id" as "recipe_id")
    .show()

+--------------+---------+
|contributor_id|recipe_id|
+--------------+---------+
|         63098|    55309|
+--------------+---------+



11. Опишите процесс работы broadcast join

Насколько я понял, broadcast нужен для того, чтобы можно было эффективно выполнять join-ы больших датафреймов с небольшими. То есть Spark может распространить маленький датафрейм на все ноды кластера, а затем уже join может быть выполнен более эффективно на каждом из executor-ов между соответствующими партициями большого датафрейма и всем маленьким датафреймом (тогда требуется минимальный shuffle данных).

12. Опишите процесс работы cache и причины его использования

Cache нужен для переиспользования вычислений, которые предшествуют текущему. Потенциально это может ускорить другие запросы, использующие эти же данные. В DataFrame API есть два метода `cache()` и `persist()`, которые могут использоваться для кэширования. Различие в том, что во втором методе есть возможность выставить storageLevel, который будет указывать, где хранить эти данные (дефолтное значение — MEMORY_AND_DISK). Кэширование является lazy transformation, поэтому сразу после вызова функции с данными ничего не происходит, но query plan обновится с помощью добавления оператора InMemoryRelation. Spark будет искать данные из кэша и считывать их оттуда, если они доступны. Если он не найдет данные в кэше (например, при первом запуске запроса), то он сохранит эти данные там и будет использовать их сразу же после этого.