# ivi Big Data school

## pyspark

### Получение данных

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate() # гвоздь программы

### Создание pyspark DataFrame

In [2]:
# читаем из подготовленного CSV
# cache - означает, что данные будут не только считаны с диска, но и подгружены память
# по умолчанию, без кэширования, Spark работает так же, как и MapReduce - читает с диска и пишет на диск
df = spark.read.csv("content_watch.csv", inferSchema=True, header=True, nullValue="None").cache()
# смотрим, что получилось
df.show()

+----------+--------------------+----------+-------------+
|   user_id|       operator_name|content_id|show_duration|
+----------+--------------------+----------+-------------+
| 827375963|          Rostelecom|      9966|         1400|
| 890810589|          LLC TOMTEL|     10140|         1271|
| 929088258|OJS Moscow city t...|      9572|           30|
| 987577016|          Rostelecom|      8014|         1310|
| 573841374|OJS Moscow city t...|      9983|         3205|
| 757852413|Ekaterinburg-2000...|      9572|         3105|
| 991611208|RECONN. Operator ...|      9983|         2998|
| 914647005|          Rostelecom|      7097|         2870|
| 924157490|Multiservice Netw...|     10073|         2569|
|1005188286|   Prime-Service LLC|      9572|         3095|
| 956265133|OJS Moscow city t...|     10427|         3085|
| 736604024|     E-Light-Telecom|      9572|         3095|
| 852100063|     E-Light-Telecom|      9983|         3111|
| 966231004|Closed Joint Stoc...|      9195|            

### Первый взгляд на данные

In [3]:
print("только имена колонок:")
print(df.columns)
print("\nимена колонок с типами данных:")
print(df.dtypes)
print("\nполное описание типа данных строки в DataFrame в синтаксисе pyspark.sql.types:")
print(df.schema)
# метод describe возвращает DataFrame, а не просто печатает табличку
# поэтому мы ещё вызываем метод show, который просто печатает
print("\nпростейшая статистика:")
df.describe().show()

только имена колонок:
['user_id', 'operator_name', 'content_id', 'show_duration']

имена колонок с типами данных:
[('user_id', 'int'), ('operator_name', 'string'), ('content_id', 'int'), ('show_duration', 'int')]

полное описание типа данных строки в DataFrame в синтаксисе pyspark.sql.types:
StructType(List(StructField(user_id,IntegerType,true),StructField(operator_name,StringType,true),StructField(content_id,IntegerType,true),StructField(show_duration,IntegerType,true)))

простейшая статистика:
+-------+--------------------+--------------------+------------------+------------------+
|summary|             user_id|       operator_name|        content_id|     show_duration|
+-------+--------------------+--------------------+------------------+------------------+
|  count|               20000|               20000|             20000|             17928|
|   mean|   8.6525109116155E8|                null|        17391.6428|1272.5749665327978|
| stddev|1.6811632041240516E8|                nul

Обратите внимание, что:
* Spark обычно правильно определяет тип колонок в CSV (но это требует двойного прохода по данным во время чтения)
* чтобы избежать двойного чтения CSV, можно передать схему данных в метод csv
* у строк в DataFrame, вообще говоря, нет порядка - в production они хранятся распределённо на нескольких серверах
* поэтому сортировать строки в самом DataFrame _невозможно_
* но можно сортировать результаты различных операций с DataFrame

#### Доступ к данным

как брать различные срезы данных

In [4]:
# Срез по колонкам 
col_slice = ["user_id", "content_id"]
# можно просить показать разное количество строк
df[col_slice].show(3)

+---------+----------+
|  user_id|content_id|
+---------+----------+
|827375963|      9966|
|890810589|     10140|
|929088258|      9572|
+---------+----------+
only showing top 3 rows



Обратите внимание:
* в Spark строки DataFrame, вообще говоря, не хранятся в памяти одной машины в определённом порядке
* поэтому никаких loc/iloc, аналогичных pandas, в Spark нет

### Подготовка данных

In [5]:
# сколько пустых значений в разных колонках
from pyspark.sql.functions import isnull

def print_null_stat(df):
    for column_name in df.columns:
        print(column_name, df.where(isnull(column_name)).count())

# заполним пустые значения нулями
print("было")
print_null_stat(df)
df = df.fillna(0)
print("\nстало")
print_null_stat(df)

было
user_id 0
operator_name 0
content_id 0
show_duration 2072

стало
user_id 0
operator_name 0
content_id 0
show_duration 0


In [6]:
from pyspark.sql.functions import col

# другие способ получить логический срез
df[col("content_id") == 7029].show(3)
df[col("content_id") == 7029][["content_id"]].show(3)

+---------+-------------+----------+-------------+
|  user_id|operator_name|content_id|show_duration|
+---------+-------------+----------+-------------+
|603396239| Beeline Home|      7029|           65|
|985861659|   Skynet Ltd|      7029|           19|
|958739701|    VimpelCom|      7029|           11|
+---------+-------------+----------+-------------+
only showing top 3 rows

+----------+
|content_id|
+----------+
|      7029|
|      7029|
|      7029|
+----------+
only showing top 3 rows



In [7]:
# можно строить срезы по нескольким условиям
# в виде строки
df.where("operator_name == 'Beeline Home' and content_id == '7029'").show(3)
# или последовательно применяя фильтры
# использовать функцию col необязательно - Spark поддерживает много разных синтаксисов
df.where(df.operator_name == 'Beeline Home')\
    .where(df.content_id == '7029').show(3)

+----------+-------------+----------+-------------+
|   user_id|operator_name|content_id|show_duration|
+----------+-------------+----------+-------------+
| 603396239| Beeline Home|      7029|           65|
| 788312739| Beeline Home|      7029|            0|
|1008874267| Beeline Home|      7029|          126|
+----------+-------------+----------+-------------+
only showing top 3 rows

+----------+-------------+----------+-------------+
|   user_id|operator_name|content_id|show_duration|
+----------+-------------+----------+-------------+
| 603396239| Beeline Home|      7029|           65|
| 788312739| Beeline Home|      7029|            0|
|1008874267| Beeline Home|      7029|          126|
+----------+-------------+----------+-------------+
only showing top 3 rows



### Агрегация данных

In [8]:
from pyspark.sql.functions import mean

# Агрегаты можно вычислять по столбцам
df.agg(mean('show_duration')).show()
# можно добавить фильтрацию
df[df["operator_name"]=="Beeline Home"].agg(mean('show_duration')).show()

+------------------+
|avg(show_duration)|
+------------------+
|         1140.7362|
+------------------+

+------------------+
|avg(show_duration)|
+------------------+
|1124.2944693572497|
+------------------+



In [9]:
from pyspark.sql.functions import max, min

# продвинутая агрегация
df.groupby("user_id").agg(mean("show_duration"), max("show_duration"), min("show_duration")).show(3)

+----------+------------------+------------------+------------------+
|   user_id|avg(show_duration)|max(show_duration)|min(show_duration)|
+----------+------------------+------------------+------------------+
|1007882880|             240.0|               240|               240|
| 643879096|            3185.0|              3185|              3185|
| 897823457|3101.6666666666665|              3150|              3070|
+----------+------------------+------------------+------------------+
only showing top 3 rows



## SQL-style: джойны

в Spark можно объединять разные DataFrame друг с другом в SQL-подобном стиле

In [10]:
# подготовим данные для join
content_watch = df
content = spark.read.csv("content_title.csv", inferSchema=True, header=True, nullValue="None").cache()
content.show()

+----------+--------------------+
|content_id|       content_title|
+----------+--------------------+
|      9966|                Луна|
|     10140|             Солдаты|
|      9572|               Мажор|
|      8014|Клуб Винкс – Школ...|
|      9983|               Метод|
|      7097|      Закрытая школа|
|     10073|             Косатка|
|     10427|Петровка, 38. Ком...|
|      9195|Смерть шпионам: С...|
|      9786|        Женская доля|
|     10208|Даша-путешественница|
|     10590|            Чичилэнд|
|      9797|Новые приключения...|
|     10596|         Добрый Комо|
|     10956|             Подмена|
|     10275|            Три кота|
|      7384|На безымянной высоте|
|     10939|          Район тьмы|
|      7423|       Неудача Пуаро|
|      6793|     Битва за Москву|
+----------+--------------------+
only showing top 20 rows



In [11]:
content_watch.join(content, how="left", on="content_id").show()

+----------+----------+--------------------+-------------+--------------------+
|content_id|   user_id|       operator_name|show_duration|       content_title|
+----------+----------+--------------------+-------------+--------------------+
|      9966| 827375963|          Rostelecom|         1400|                Луна|
|     10140| 890810589|          LLC TOMTEL|         1271|             Солдаты|
|      9572| 929088258|OJS Moscow city t...|           30|               Мажор|
|      8014| 987577016|          Rostelecom|         1310|Клуб Винкс – Школ...|
|      9983| 573841374|OJS Moscow city t...|         3205|               Метод|
|      9572| 757852413|Ekaterinburg-2000...|         3105|               Мажор|
|      9983| 991611208|RECONN. Operator ...|         2998|               Метод|
|      7097| 914647005|          Rostelecom|         2870|      Закрытая школа|
|     10073| 924157490|Multiservice Netw...|         2569|             Косатка|
|      9572|1005188286|   Prime-Service 

# Домашнее задание

## Разминочная часть

In [12]:
# Ваш код здесь

У какого оператора больше всего пользователей?

In [13]:
# Ваш код здесь

Какой средний показатель по числу просмотренных фильмов среди мужчин - пользователе Ростелекома?

In [14]:
# Ваш код здесь

Построить по данным файла content_watch.csv матрицу user-item

В матрице должно быть 3 столбца

Первый столбец - user_id

Второй столбец - list из content_id, которые смотрел user_id. Оставлять нужно только уникальные id контента - повторов быть не должно


In [15]:
# Ваш код здесь