# Преобразования и действия RDD

В этой лекции мы начнем углубляться в использование Spark и Python. Пожалуйста, просмотрите лекцию с полным объясненим.

## Важные термины

Давайте быстро рассмотрим некоторые важные термины:

Термин                 |Определение
----                   |-------
RDD                    |Устойчивый распределенный набор данных
Transformation (преобразование)        |Spark операция, которая создает RDD
Action (действие)                |Spark операция которая создает локальный объект
Spark Job              |Последовательность преобразований данных с конечным действием

## Создание RDD

Есть два обычных способа создания RDD:

Метод                      |Результат
----------                               |-------
`sc.parallelize(array)`                  |Создать RDD  элементов массива (или листа)
`sc.textFile(path/to/file)`                      |Создать RDD строк из файла 

## Преобразования RDD 

Мы можем использовать преобразования для создания набора инструкций, которые мы хотим выполнить на RDD (прежде чем мы вызовем действие и фактически выполним их).

Пример  преобразования                       |Результат
----------                               |-------
`filter(lambda x: x % 2 == 0)`           |Отбросить нечетные элементы
`map(lambda x: x * 2)`                   |Умножить каждый элемент RDD на `2`
`map(lambda x: x.split())`               |Разделить каждую строку на слова 
`flatMap(lambda x: x.split())`           |Разделить каждую строку на слова и сгладить последовательность
`sample(withReplacement=True,0.25)`      |Создать образец 25% элементов с заменой
`union(rdd)`                             |Добавить `rdd` к существующему RDD
`distinct()`                             |Удалить дубликаты в  RDD
`sortBy(lambda x: x, ascending=False)`   |Сортировать элементы по убыванию

## RDD действия

Когда у вас есть готовый «рецепт» преобразований, вы будете выполнять их, вызывая действие. Вот некоторые общепринятые действия:

Действие                             |Результат
----------                             |-------
`collect()`                            |Конвертировать RDD в список в памяти 
`take(3)`                              |Первые 3 элемента RDD 
`top(3)`                               |Топ-3 элемента of RDD
`takeSample(withReplacement=True,3)`   |Создать образец 3х элементов с заменой
`sum()`                                |Найти сумму элементов (предполагаются числовые элементы)
`mean()`                               |Найти среднее значение элементов (предполагаются числовые элементы)
`stdev()`                              |Найти отклонение элементов (предполагаются числовые элементы)

______
# Примеры

Лучший способ показать все - это пройтись по примерам! Для начала мы рассмотрим создание и работу с простым текстовым файлом, а затем перейдем к более реалистичным данным, как например данные о клиентах и продажах.

### Создание RDD из текстового файла:

** Создание текстового файла **

In [1]:
%%writefile example2.txt
first 
second line
the third line
then a fourth line

Writing example2.txt


Теперь давайте выполним некоторые преобразования и действия над этим текстовым файлом:

In [2]:
from pyspark import SparkContext

In [3]:
sc = SparkContext()

In [4]:
# Покажите RDD
sc.textFile('example2.txt')

MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2

In [5]:
# Сохраните ссылку на этот RDD
text_rdd = sc.textFile('example2.txt')

In [7]:
# Сопоставить функцию (или lambda-выражению) с каждой строкой
# Затем соберите результаты.
text_rdd.map(lambda line: line.split()).collect()

[['first'],
 ['second', 'line'],
 ['the', 'third', 'line'],
 ['then', 'a', 'fourth', 'line']]

## Map против flatMap

In [8]:
# Соберите все в единый flat map
text_rdd.flatMap(lambda line: line.split()).collect()

['first',
 'second',
 'line',
 'the',
 'third',
 'line',
 'then',
 'a',
 'fourth',
 'line']

# RDD и пары ключ-значение

Теперь, когда мы поработали с RDDs и с тем как агрегировать значения с ними, мы можем начать работать с парами ключ-значение (Key Value Pairs). Чтобы сделать это, давайте создадим некоторые фиктивные данные в виде нового текстового файла.

Эти данные представляют некоторые сервисы, проданные клиентам для некоего SAAS бизнеса.

In [9]:
%%writefile services.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
201       10/13/2017      100       NY       131          100.00
204       10/18/2017      700       TX       129          450.00
202       10/15/2017      203       CA       121          200.00
206       10/19/2017      202       CA       131          500.00
203       10/17/2017      101       NY       173          750.00
205       10/19/2017      202       TX       121          200.00

Writing services.txt


In [10]:
services = sc.textFile('services.txt')

In [11]:
services.take(2)

['#EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00']

In [12]:
services.map(lambda x: x.split())

PythonRDD[10] at RDD at PythonRDD.scala:43

In [13]:
services.map(lambda x: x.split()).take(3)

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00']]

Давайте удалим этот первый хэш-тег!

In [26]:
services.map(lambda x: x[1:] if x[0]=='#' else x).collect()

['EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00',
 '204       10/18/2017      700       TX       129          450.00',
 '202       10/15/2017      203       CA       121          200.00',
 '206       10/19/2017      202       CA       131          500.00',
 '203       10/17/2017      101       NY       173          750.00',
 '205       10/19/2017      202       TX       121          200.00']

In [27]:
services.map(lambda x: x[1:] if x[0]=='#' else x).map(lambda x: x.split()).collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

## Использование пар ключ-значение  для операций

Теперь начнем использовать методы, которые объединяют lambda-выражения, использующие аргумент ByKey. Эти методы ByKey предполагают,что ваши данные находятся в форме Key,Value (ключ, значение).

Например, давайте узнаем общий объем продаж по штатам: 

In [28]:
# Из прошлого
cleanServ = services.map(lambda x: x[1:] if x[0]=='#' else x).map(lambda x: x.split())

In [29]:
cleanServ.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [52]:
# Начнем с практики захвата полей
cleanServ.map(lambda lst: (lst[3],lst[-1])).collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [43]:
# Продолжайте с reduceByKey
# Заметьте как он предполагает, что первый элемент является ключом!
cleanServ.map(lambda lst: (lst[3],lst[-1]))\
         .reduceByKey(lambda amt1,amt2 : amt1+amt2)\
         .collect()

[('State', 'Amount'),
 ('NY', '100.00750.00'),
 ('TX', '450.00200.00'),
 ('CA', '200.00500.00')]

Ой! Кажется мы забыли что суммы (amounts) все еще строки! Давайте исправим:

In [42]:
# Продолжайте с reduceByKey
# Заметьте как он предполагает, что первый элемент является ключом!
cleanServ.map(lambda lst: (lst[3],lst[-1]))\
         .reduceByKey(lambda amt1,amt2 : float(amt1)+float(amt2))\
         .collect()

[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]

Мы можем продолжить наш анализ, сортируя этот вывод:

In [69]:
# Извлеките state и amounts
# Добавьте их
# Избавьтесь от ('State','Amount')
# Отсортируйте их по значению amount
cleanServ.map(lambda lst: (lst[3],lst[-1]))\
.reduceByKey(lambda amt1,amt2 : float(amt1)+float(amt2))\
.filter(lambda x: not x[0]=='State')\
.sortBy(lambda stateAmount: stateAmount[1], ascending=False)\
.collect()

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]

** Не забудьте попробовать использовать распаковку для удобства чтения. Например: **

In [78]:
x = ['ID','State','Amount']

In [79]:
def func1(lst):
    return lst[-1]

In [83]:
def func2(id_st_amt):
    # Распакуйте Values
    (Id,st,amt) = id_st_amt
    return amt

In [84]:
func1(x)

'Amount'

In [85]:
func2(x)

'Amount'

# Прекрасная работа!