# 1. Spark application

## Запускаем спарк-сессию (SparkSession) с мастером YARN и 2-мя экзекьюторами

In [5]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

In [2]:
!hdfs dfsadmin -safemode leave

Safe mode is OFF


In [6]:
conf = SparkConf().set("spark.executor.instances", "2")

In [7]:
spark = SparkSession.builder.config(conf=conf).master("yarn").appName("samoilenko_spark").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/07 15:08:39 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


#### YARN

![YARN](../images/YARN.png)

#### Spark application UI

![Spark_UI](../images/Spark_UI.png)

## Читаем таблицы ratings, tags и отображаем количество строчек

In [9]:
!hdfs dfs -rm -r ml-latest-small
!hdfs dfs -put ml-latest-small .

rm: `ml-latest-small': No such file or directory


In [10]:
from pyspark.sql.types import *

In [148]:
ratings_schema = StructType(fields=[
    StructField("userId", IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("rating", DoubleType()),
    StructField("timestamp", LongType()),
])

tags_schema = StructType(fields=[
    StructField("userId", IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("tag", StringType()),
    StructField("timestamp", LongType()),
])

In [149]:
ratings_df = spark\
    .read\
    .format("csv")\
    .option("header", "True")\
    .schema(ratings_schema)\
    .load("ml-latest-small/ratings.csv")

tags_df = spark\
    .read\
    .format("csv")\
    .option("header", "True")\
    .schema(tags_schema)\
    .load("ml-latest-small/tags.csv")

In [27]:
import pyspark.sql.functions as f

In [150]:
ratings_df = ratings_df.withColumn('timestamp', f.from_unixtime(ratings_df.timestamp))
tags_df = tags_df.withColumn('timestamp', f.from_unixtime(tags_df.timestamp))

In [24]:
ratings_df.show(5)

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      1|   4.0|2000-07-30 22:45:03|
|     1|      3|   4.0|2000-07-30 22:20:47|
|     1|      6|   4.0|2000-07-30 22:37:04|
|     1|     47|   5.0|2000-07-30 23:03:35|
|     1|     50|   5.0|2000-07-30 22:48:51|
+------+-------+------+-------------------+


In [16]:
tags_df.show(5)

[Stage 1:>                                                          (0 + 1) / 1]

+------+-------+---------------+-------------------+
|userId|movieId|            tag|          timestamp|
+------+-------+---------------+-------------------+
|     2|  60756|          funny|2015-10-24 22:29:54|
|     2|  60756|Highly quotable|2015-10-24 22:29:56|
|     2|  60756|   will ferrell|2015-10-24 22:29:52|
|     2|  89774|   Boxing story|2015-10-24 22:33:27|
|     2|  89774|            MMA|2015-10-24 22:33:20|
+------+-------+---------------+-------------------+


                                                                                

In [28]:
ratings_df.count()

100836

In [21]:
tags_df.count()

3683

#### Spark job

Выполнено 2 стейджа и 2 таски (по 1 в каждом)

![Job_count](../images/Job_count.png)

# 2. Работа с данными

## • Посчитать количество уникальных фильмов и уникальных юзеров в таблице “ratings”

#### Уникальные фильмы

In [64]:
ratings_df.select(f.countDistinct("movieId")).first()[0]

9724

#### Уникальные пользователи

In [65]:
ratings_df.select(f.countDistinct("userId")).first()[0]

610

## • Посчитать, сколько было поставлено оценок >= 4.0

In [70]:
ratings_df.filter(f.col("rating") >= 4.0).count()

48580

## • Вывести топ-100 фильмов с самым высоким рейтингом

In [81]:
ratings_df\
    .groupBy("movieId")\
    .agg(f.avg(f.col("rating")).alias("avg_rating"))\
    .sort(f.desc("avg_rating"))\
    .limit(100)\
    .show(100)

+-------+----------+
|movieId|avg_rating|
+-------+----------+
|   2196|       5.0|
|  88448|       5.0|
| 172909|       5.0|
|  67618|       5.0|
|   3496|       5.0|
|    148|       5.0|
| 142444|       5.0|
|    496|       5.0|
|   8911|       5.0|
| 118834|       5.0|
| 173963|       5.0|
| 156025|       5.0|
|   5513|       5.0|
| 120130|       5.0|
|  26350|       5.0|
|   1349|       5.0|
| 147300|       5.0|
| 122092|       5.0|
|    633|       5.0|
|  71268|       5.0|
|    876|       5.0|
|  99636|       5.0|
|  53578|       5.0|
|   6086|       5.0|
| 160644|       5.0|
|  47736|       5.0|
|   5490|       5.0|
| 164367|       5.0|
| 172583|       5.0|
| 172589|       5.0|
| 126921|       5.0|
|   3473|       5.0|
|   3795|       5.0|
|  50999|       5.0|
|   3941|       5.0|
| 141718|       5.0|
|    626|       5.0|
|  85295|       5.0|
|  26928|       5.0|
| 173619|       5.0|
|  40491|       5.0|
|   2824|       5.0|
| 167064|       5.0|
|  26147|       5.0|
|  25906|    

## • Посчитать разницу во времени в секундах между временем тегирования пользователя данного фильма и временем, когда пользователь поставил оценку фильму

In [156]:
time_diff_df = tags_df.alias("t").join(ratings_df.alias("r"), ["userId", "movieId"], how="inner")

In [165]:
time_diff_df = time_diff_df.withColumn("time_diff", f.abs(f.unix_timestamp("t.timestamp") - f.unix_timestamp("r.timestamp")))

In [168]:
time_diff_df.show(10)

+------+-------+-----------------+-------------------+------+-------------------+---------+
|userId|movieId|              tag|          timestamp|rating|          timestamp|time_diff|
+------+-------+-----------------+-------------------+------+-------------------+---------+
|     2|  60756|     will ferrell|2015-10-24 22:29:52|   5.0|2015-10-24 22:29:40|       12|
|     2|  60756|  Highly quotable|2015-10-24 22:29:56|   5.0|2015-10-24 22:29:40|       16|
|     2|  60756|            funny|2015-10-24 22:29:54|   5.0|2015-10-24 22:29:40|       14|
|     2|  89774|        Tom Hardy|2015-10-24 22:33:25|   5.0|2015-10-24 22:33:09|       16|
|     2|  89774|              MMA|2015-10-24 22:33:20|   5.0|2015-10-24 22:33:09|       11|
|     2|  89774|     Boxing story|2015-10-24 22:33:27|   5.0|2015-10-24 22:33:09|       18|
|     2| 106782|  Martin Scorsese|2015-10-24 22:30:56|   5.0|2015-10-24 22:29:26|       90|
|     2| 106782|Leonardo DiCaprio|2015-10-24 22:30:51|   5.0|2015-10-24 22:29:26

#### Cредняя дельта по времени

In [169]:
time_diff_df\
    .select(f.avg("time_diff"))\
    .first()[0]

29203715.568469506

## • Посчитать среднюю оценку от каждого пользователя

In [170]:
avg_ratings_per_user = ratings_df\
    .groupBy("userId")\
    .agg(f.avg(f.col("rating")).alias("avg_rating"))

In [171]:
avg_ratings_per_user.show(10)

+------+------------------+
|userId|        avg_rating|
+------+------------------+
|   148|3.7395833333333335|
|   463| 3.787878787878788|
|   471|             3.875|
|   496| 3.413793103448276|
|   243| 4.138888888888889|
|   392|               3.2|
|   540|               4.0|
|    31|              3.92|
|   516|3.6923076923076925|
|    85|3.7058823529411766|
+------+------------------+


#### Cреднее от всех усредненных оценок всех пользователей

In [172]:
avg_ratings_per_user\
    .select(f.avg("avg_rating"))\
    .first()[0]

3.6572223377474016