# Spark SQL

Before you begin, make sure that you've installed spark of version `2.3` or higher (see `README.md`)

**Links**

* https://spark.apache.org/docs/latest/sql-getting-started.html

In [1]:
!pwd && ls -lah | grep README

/workspace/spark_sql/notebooks
-rwxr-xr-x 1 root root 1.2K Feb  3 20:03 README.md


## Запуск spark

In [2]:
%matplotlib inline
%config InlineBackend.figure_format ='retina'

import os
import sys
import glob
import pickle
import seaborn as sns
import matplotlib.pyplot as plt

import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession


spark = SparkSession \
    .builder \
    .master('local[*]') \
    .appName("spark_sql_examples") \
    .config("spark.executor.memory", "10g") \
    .getOrCreate()

sc = spark.sparkContext
sqlContext = SQLContext(sc)

In [3]:
! spark-submit --version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/
                        
Using Scala version 2.11.12, OpenJDK 64-Bit Server VM, 1.8.0_242
Branch 
Compiled by user  on 2019-08-27T21:31:02Z
Revision 
Url 
Type --help for more information.


In [4]:
DATA_PATH = '/workspace/data/ml-25m'

RATINGS_PATH = os.path.join(DATA_PATH, 'ratings.csv')
MOVIES_PATH = os.path.join(DATA_PATH, 'movies.csv')
TAGS_PATH = os.path.join(DATA_PATH, 'tags.csv')

---
## DataFrame creation

DataFrame можно создать несколькими способами:

* из файла
* из существующего RDD
* из другого DataFrame'a

### From file

В случае с созданием из csv файла, может понадобится указать схему

In [5]:
from pyspark.sql.types import *


schema = StructType([
    StructField('user_id', IntegerType()),
    StructField('movie_id', IntegerType()),
    StructField('rating', FloatType()),
    StructField('timestamp', IntegerType())
])

ratings_df = sqlContext.read \
    .format('com.databricks.spark.csv') \
    .schema(schema) \
    .options(header='true', delimiter=',') \
    .load('file:///' + RATINGS_PATH)

In [6]:
ratings_df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- movie_id: integer (nullable = true)
 |-- rating: float (nullable = true)
 |-- timestamp: integer (nullable = true)



In [7]:
ratings_df.take(5)

[Row(user_id=1, movie_id=296, rating=5.0, timestamp=1147880044),
 Row(user_id=1, movie_id=306, rating=3.5, timestamp=1147868817),
 Row(user_id=1, movie_id=307, rating=5.0, timestamp=1147868828),
 Row(user_id=1, movie_id=665, rating=5.0, timestamp=1147878820),
 Row(user_id=1, movie_id=899, rating=3.5, timestamp=1147868510)]

### From RDD

Для того чтобы построить DataFrame из RDD нужно у RDD вызвать метод `toDF`.

*Remark:* RDD можно получить из DataFrame с помощью аттрибута `rdd`.

In [8]:
ratings = sc.textFile('file:///' + RATINGS_PATH)

In [9]:
ratings.take(5)

['userId,movieId,rating,timestamp',
 '1,296,5.0,1147880044',
 '1,306,3.5,1147868817',
 '1,307,5.0,1147868828',
 '1,665,5.0,1147878820']

In [10]:
from pyspark.sql import Row


ratings_df = ratings \
    .map(lambda s: s.split(',')) \
    .filter(lambda arr: arr[0].isdigit()) \
    .map(lambda arr: Row(user_id=int(arr[0]), 
                         movie_id=int(arr[1]), 
                         rating=float(arr[2]), 
                         timestamp=int(arr[3])))\
    .toDF()

In [11]:
ratings_df.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- user_id: long (nullable = true)



---
## DataFrame API

In [12]:
import pyspark.sql.functions as F


movies_df = sqlContext.read.format("com.databricks.spark.csv") \
    .option("delimiter", ",") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load('file:///' + MOVIES_PATH)

In [13]:
movies_df.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



### Фильмы с наибольшим средним рейтингом

Найти 10 фильмов с наибольшим средним рейтингом. Вывести их названия и средний рейтинг.

Сравните код ниже с кодом, использующим RDD API (см. `apache_spark/notebooks/spark_examples.ipynb`):

```
ratings \
    .map(lambda r: (r.movie_id, (r.rating, 1))) \
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
    .mapValues(lambda ratings: ratings[0] / ratings[1]) \
    .join(movies) \
    .sortBy(lambda key_value: key_value[1][0], ascending=False) \
    .take(10)
```

In [None]:
ratings_df \
    .groupby('movie_id') \
    .agg(F.mean('rating').alias('mean_rating'), 
         F.count('rating').alias('ratings_count')) \
    .join(movies_df, ratings_df['movie_id'] == movies_df['movieId'], how='inner') \
    .sort(F.col('mean_rating').desc()) \
    .take(10)

### Фильмы с наибольшим числом оценок

Найти 10 фильмов с наибольшим числом оценок. Вывести их названия и число оценок

In [None]:
movies_frequency_pdf = ratings_df \
    .groupby('movie_id') \
    .count() \
    .join(movies_df, ratings_df['movie_id'] == movies_df['movieId']) \
    .sort(F.col('count').desc()) \
    .toPandas()

In [None]:
movies_frequency_pdf.head(10)

---
## SQL

In [None]:
ratings_df.createTempView('ratings')
movies_df.createTempView('movies')

In [None]:
query = """
    SELECT movie_id, COUNT(*), first(title) as title
    FROM ratings INNER JOIN movies ON ratings.movie_id == movies.movieId
    WHERE movies.title LIKE '%(1994)%'
    GROUP BY movie_id
    ORDER BY COUNT(*) DESC
"""

movies_frequency = spark.sql(query)
movies_frequency.show()

In [None]:
spark.sql(query).explain(True)

---
## User Defined Function (UDF)


### Количество вышедших фильмов по годам

Мы знаем, что название фильма содержит информацию о дате выхода

In [None]:
import re


def get_release_year(title):
    result = re.match(r'.*(\(\d+\))', title)
    return int(result.group(1)[1:-1]) if result is not None else None


get_release_year_udf = F.udf(get_release_year, IntegerType())

In [None]:
relase_count_by_year = movies_df \
    .withColumn('year', get_release_year_udf('title')) \
    .filter((F.col('year').isNotNull()) & (F.col('year') < 2025)) \
    .groupby('year') \
    .count() \
    .sort(F.col('year')) \
    .toPandas()

In [None]:
relase_count_by_year.head()

In [None]:
plt.figure(figsize=(12, 5))
relase_count_by_year.set_index('year')['count'].plot()
plt.show()

---
## Window Functions

Хотим понять, сколько времени проходит между последовательными оценками для пользователей, более формально:

Для пользователя $u$ есть последовательность $(m_1, r_1, t_1), \ldots, (m_n, r_n, t_n)$, где $t_i \leq t_{i+1}$. Рассмотрим последовательность $\Delta_i = t_{i+1} - t_i$, для $i=1,\ldots,n-1$.

Хотим построить распределение величины $\Delta_i$ используя информацию обо всех пользователях.

In [31]:
from pyspark.sql.window import Window


user_window = Window.orderBy('timestamp').partitionBy('user_id')

ratings_df \
    .withColumn('next_timestamp', F.lead('timestamp').over(user_window)) \
    .filter(F.col('next_timestamp').isNotNull()) \
    .take(10)

[Row(user_id=964, movie_id=1641, rating=5.0, timestamp=898993065, next_timestamp=898993065),
 Row(user_id=964, movie_id=1704, rating=4.0, timestamp=898993065, next_timestamp=898993065),
 Row(user_id=964, movie_id=1721, rating=3.0, timestamp=898993065, next_timestamp=898993197),
 Row(user_id=964, movie_id=1639, rating=3.0, timestamp=898993197, next_timestamp=898993197),
 Row(user_id=964, movie_id=1653, rating=5.0, timestamp=898993197, next_timestamp=898993197),
 Row(user_id=964, movie_id=1727, rating=4.0, timestamp=898993197, next_timestamp=898993275),
 Row(user_id=964, movie_id=1682, rating=4.0, timestamp=898993275, next_timestamp=898993275),
 Row(user_id=964, movie_id=2801, rating=5.0, timestamp=898993275, next_timestamp=898993309),
 Row(user_id=964, movie_id=1057, rating=4.0, timestamp=898993309, next_timestamp=898993310),
 Row(user_id=964, movie_id=1407, rating=4.0, timestamp=898993310, next_timestamp=898993337)]

In [None]:
DAY = 24 * 60 * 60

ratings_df \
    .withColumn('next_timestamp', F.lead('timestamp').over(user_window)) \
    .filter(F.col('next_timestamp').isNotNull()) \
    .withColumn('delta', (F.col('next_timestamp') - F.col('timestamp')) / F.lit(DAY)) \
    .select('delta') \
    .filter(F.col('delta') <= 365) \
    .sample(False, 0.1) \
    .toPandas().plot.hist(bins=100, logy=True)

---
# Упражнения. Recommender System

Пусть $U$ - множество пользователей, $I$ - множество фильмов, и $R = (r_{ui})$ - матрица рейтингов. 

Через $R_u$ будем обозначать строку матрицы $R$, соотвествующую пользователю $u$.

## Most similar items

Для каждого фильма $i$ определим множество $U(i) = \{ u \in U \mid r_{ui} \neq 0 \}$ - множество пользователей, поставивших фильму $i$ оценку.

Тогда мы можем определить множество $I(i) = \left\{ i' \in I ~\Big|~ sim(i, i') = \frac{|U(i) ~\cap~ U(i')|}{|U(i) ~\cup~ U(i')|} > \delta \right\}$ - множество похожих фильмов.

Можно отсортировать элементы множества $I(i)$ по неубыванию $sim(i, i')$.

Реализуйте функцию, которая на вход получает `movie_id` и возвращает топ `N` фильмов отранжированных по $sim$.

выберем произвольных 10к пользователей

In [14]:
n_users = 10000
selected_users = ratings_df \
    .select('user_id') \
    .distinct() \
    .sample(False, 0.01) \
    .limit(n_users)
# from here: https://stackoverflow.com/a/32837900

In [15]:
ratings_df = ratings_df \
    .join(selected_users, 'user_id')

In [16]:
ratings_df.persist()
ratings_df.count()

261425

In [17]:
movies_df = movies_df.withColumnRenamed("movieId", "movie_id")

In [18]:
movies_df.limit(3).toPandas()

Unnamed: 0,movie_id,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance


In [19]:
def sim_films(ratings_df, movies_df, movie_id, N=10):
    movie_reviewers = ratings_df \
        .filter(F.col('movie_id') == movie_id) \
        .select("user_id")    
    movie_reviewers_count = movie_reviewers.count()
    
    intersections = ratings_df \
        .join(movie_reviewers, 'user_id') \
        .groupby('movie_id') \
        .count() \
        .withColumnRenamed('count', 'intersection')
    
    movie_count = ratings_df \
        .groupby('movie_id') \
        .count()
    
    result = intersections \
      .join(movie_count, 'movie_id')
    
    sim = result \
        .withColumn('sim', result['intersection'] / (movie_reviewers_count + result['count'] - result['intersection'])) \
        .sort(F.col('sim').desc()) \
        .select('movie_id', 'sim') \
        .limit(N) \
        .join(movies_df, "movie_id") \
        .sort(F.col('sim').desc())
    
    return sim

In [22]:
sim = sim_films(ratings_df, movies_df, 356)

In [23]:
sim.toPandas()

Unnamed: 0,movie_id,sim,title,genres
0,356,1.0,Forrest Gump (1994),Comedy|Drama|Romance|War
1,480,0.53401,Jurassic Park (1993),Action|Adventure|Sci-Fi|Thriller
2,296,0.519177,Pulp Fiction (1994),Comedy|Crime|Drama|Thriller
3,318,0.499094,"Shawshank Redemption, The (1994)",Crime|Drama
4,593,0.489202,"Silence of the Lambs, The (1991)",Crime|Horror|Thriller
5,110,0.471679,Braveheart (1995),Action|Drama|War
6,589,0.469806,Terminator 2: Judgment Day (1991),Action|Sci-Fi
7,527,0.447475,Schindler's List (1993),Drama|War
8,2571,0.435514,"Matrix, The (1999)",Action|Sci-Fi|Thriller
9,150,0.433298,Apollo 13 (1995),Adventure|Drama|IMAX


Построим несколько вариантов простой Рекомендательной Системы.

*Основная цель:* порекомендовать фильм пользователю. (Для простоты опустим информацию о самих рейтингах)

## Methods

При разработке сложных методов, сначала нужно выбрать несколько простотых методов (`Baseline`), относительно которых мы будем сравнивать новый метод.

### POP

Будем всегда рекомендовать самые популярные фильмы (с наибольшим цислом оценок пользователей).

Не смотря на свою простоту, в ряде задач показывает себя достаточно хорошо.

In [24]:
import abc

In [25]:
class Recommender(metaclass=abc.ABCMeta):
    @abc.abstractmethod
    def recommend(self, users): raise NotImplementedError

In [69]:
class Popular(Recommender):
    def __init__(self, ratings_df, N=10):
        self.N = N
        self.pop = ratings_df \
                    .groupby('movie_id') \
                    .count() \
                    .sort(F.col('count').desc()) \
                    .limit(N) \
                    .select('movie_id') \
                    .withColumn("rank", F.monotonically_increasing_id()) \
                    .persist()
    
    def recommend(self, users):
        # TODO: возможно, стоит учитывать какие фильмы уже посмотрел пользователь
        return users \
                .crossJoin(self.pop) \
                .sort(F.col('user_id'), F.col('rank'))

In [71]:
p = Popular(ratings_df, N=4)

In [72]:
p.recommend(selected_users.limit(100)).limit(10).toPandas()

Unnamed: 0,user_id,movie_id,rank
0,287,356,0
1,287,318,1
2,287,296,2
3,287,593,3
4,964,356,0
5,964,318,1
6,964,296,2
7,964,593,3
8,1636,356,0
9,1636,318,1


### User-based Collaborative Filtering

1. Для пользователя $u$ определим множество похожих пользователей как 

$$U(u) = \left\{ u' \in U \mid sim(u, u') > \alpha \right\},$$

где $sim(u, u')$ — одна из возможных мер близости $u'$ к $u$, например, косинусная близость между $R_{u}$ и $R_{u'}$

2. $$I(u) = \left\{ i \in I ~\Big|~ B(i) = \frac{|U(u) ~\cap~ U(i)|}{|U(u) ~\cup~ U(i)|} > 0 \right\}$$


3. Отсортировать $i \in I(u)$ по убыванию $B(i)$, взять top $N$

In [55]:
class UCF(Recommender):
    def __init__(self, ratings_df):
        self.pop = ratings_df

### Item-kNN

1. Для пользователя $u$ мы знаем $R(u) = \{ i \in I \mid r_{ui} \neq 0 \} $


2. Для каждого $i \in R(u)$ построим $I(i)$ - множество похожих фильмов


3. Отсортируем элементы множества $ \bigcup_{i \in R(u)} I(i) $, возьмем top $N$

In [None]:
######################################
######### YOUR CODE HERE #############
######################################

## Evaluation Protocol

Теперь опишем, как будем оценивать качество рассматриваемых методов.

### Data Splits

Так как в данных `ratings` есть поле `timestamp`, то для каждого пользователя отсортируем его рейтинги по времени и первые $80\%$ рейтингов отнесем в `Train`, еще $10\%$ в `Validation`, и остальное в `Test`.


In [32]:
def train_val_test_split(df, train_size=0.8, valid_size=0.1):
    # https://stackoverflow.com/a/51773836
    user_window = Window.orderBy('timestamp').partitionBy('user_id')
    
    df_with_ranks = df \
      .withColumn('rank', F.percent_rank().over(user_window))

    train = df_with_ranks \
      .filter(df_with_ranks['rank'] < train_size) \
      .drop('rank')

    valid = df_with_ranks \
      .filter((train_size <= df_with_ranks['rank'])  \
                          & (df_with_ranks['rank'] < train_size + valid_size)) \
      .drop('rank')

    test = df_with_ranks \
      .filter(train_size + valid_size <= df_with_ranks['rank']) \
      .drop('rank')
    
    return train, valid, test

In [33]:
train, valid, test = train_val_test_split(ratings_df)

In [34]:
train.limit(3).toPandas()

Unnamed: 0,user_id,movie_id,rating,timestamp
0,964,1641,5.0,898993065
1,964,1704,4.0,898993065
2,964,1721,3.0,898993065


### Metrics

В `Test` для каждого пользователя есть `user_id, R_u`, где $R_u$ - множество фильмов, которым он поставил оценку. Для того чтобы оценить качество рассматриваемых методов будем для пользователя строить список из $N$ рекомендаций $\hat{R}_{1\colon N}$ и считать метрики.

* $$ Precision@N = \frac{|R_u \cap \hat{R}_{1\colon N} |}{N} $$
* $$ Recall@N = \frac{|R_u \cap \hat{R}_{1\colon N} |}{|R_u|} $$
* $Map@N$

где $N \in \{1,5,10\}$

https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html#ranking-systems

In [80]:
def calculate_metrics(predicted, test, n):    
    true_positive_counts = predicted \
        .join(test, on=['user_id', 'movie_id']) \
        .count()
    
    precision = true_positive_counts / (n * test.count())
    recall = true_positive_counts / test.count()
    
    return precision, recall

## Experimental Results

Посмотрим на результаты

*Remark* скорее всего посчитать метрики для всех пользователей будет довольно долго - можно ограничится, например, случайными 10к пользователями

**Pop**

In [78]:
N = [1, 5, 10]

In [82]:
for n in N:
    pop = Popular(train, N=n)
    
    users_to_predict = valid \
                            .select('user_id') \
                            .distinct()
    
    predictions = pop.recommend(users_to_predict)
    
    precision, recall = calculate_metrics(predictions, valid, n)
    
    print("N={}, precision={}, recall={}".format(n, precision, recall))

N=1, precision=0.0013907668533899942, recall=0.0013907668533899942
N=5, precision=0.0014834846436159938, recall=0.007417423218079969
N=10, precision=0.0014216727834653275, recall=0.014216727834653274
