<img src='https://spark.apache.org/images/spark-logo.png'>

## Что такое Apache spark

Библиотека для распределенных вычислений
- отказоустойчивая
- итеративная
- интерактивная
- интегрирована с hadoop
- написана на scala 
- очень популярная

## Как запускать pyspark notebook

In [None]:
import sys
import os

spark_home='/opt/spark-1.5.1-bin-hadoop2.6/'
# addin pyspark to current path
sys.path.append( spark_home+'/python' )
sys.path.append( spark_home+'/python/lib/py4j-0.8.2.1-src.zip' )

# Установка локальных переменных
os.environ["SPARK_HOME"] = spark_home
os.environ["HADOOP_HOME"] = '/opt/cloudera/parcels/CDH/lib/hadoop'
os.environ["HADOOP_YARN_HOME"] = '/opt/cloudera/parcels/CDH/lib/hadoop-yarn'
os.environ["YARN_CONF_DIR"] = '/etc/hadoop/conf.cloudera.yarn'
os.environ["SPARK_CLASSPATH"] = '/etc/hive/conf.cloudera.hive1'
os.environ["PYSPARK_SUBMIT_ARGS"] = '--master local[4] pyspark-shell'

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext

In [None]:
conf = SparkConf ().set( 'spark.app.name', 'test');
sc = SparkContext (conf= conf)

# Обработка данных на спарке

### Как выглядит программа на спарке

In [None]:
# Получаем данные
data = [1, 2, 3, 4, 5]
inputRDD = sc.parallelize(data)

# Преобразовываем данные
filteredRDD = inputRDD.filter(lambda x: x%2==0)

# Сохраняем / выдаем данные
print "число элементов в исходном датасете", inputRDD.count ()
print "число элементов в конечном датасете", filteredRDD.count()
print "конечный датасет:", filteredRDD.collect ()

In [None]:
print "тип спарковского датасета:", type (filteredRDD)

**RDD** (Resilient Distributed Dataset) - распределенные датасеты, с которыми работает spark

<img src='http://1.bp.blogspot.com/-GhyDINiSlRY/Uu9BH844SBI/AAAAAAAABDM/J12X3mHPw1E/s1600/P2.png' width='400'>

### Как заводить данные в spark

Загружать датасеты из локальной программы

In [None]:
data = [1, 2, 3, 4, 5]
inputRDD = sc.parallelize(data)
print "число элементов", inputRDD.count ()

Загружать датасетсы из hdfs / s3. TODO.  Разобраться с этим. возможно поднять hadoop кластер

In [None]:
inputRDD = sc.textFile("/testdata/splits_test")
print "число элементов", inputRDD.count ()

### Кеширование данных

RDD датасеты можно кешировать и хранить в оперативной памяти. 
Это позволяет
- делать итеративные операции над данными (напр. в машинном обучении)
- интерактивно исследовать данные

кеширование выполняется при помощи операции **cache** с датасетом

In [None]:
inputRDD.cache()

### Простые преобразования

Трансформации преобразовывают одни RDD в другие. Такие преобразования являются промежуточным, а не конечным результатом

<table>
<tr><td>**Преобразование**</td> <td>**Описание**</td></tr>

<tr><td>**map**(func)</td> <td>Вовзращает новый RDD полученный из применения функции funk к каждому элементу исходного датасета</td></tr>

<tr><td>**filter**(func)</td> <td>Возвращает датасет из элементов, для которых функция *funk* вернула *true*</td></tr>

<tr><td>**flatMap**(func)</td> <td> Аналогично функции **map** но каждый входной элемент может быть преобразован в 0 и более выходных элементов. (Поэтому функция должна возвращать список вместо одиночного объекта).</td></tr>

<tr><td>**distinct**([numTasks]))</td> <td>Возвращает новый датасет, который содержит разные элементы исходного датасета.</td></tr>

<tr><td>**sample**(withReplacement, fraction, seed)</td> <td>Выдает долю *fraction* данных с возвращением или без, используя заданный seed генератора случайных чисел</td></tr>

<tr><td>**union**(otherDataset)</td> <td>Возвращает новый датасет, который содерживт объединения элементов исходного датасета и датасета - аргумента </td></tr>

<tr><td>**intersection**(otherDataset)</td> <td>Вовзращает новый RDD, который содержит пересечение элементов исходного датасета с датасетом - аргументом</td></tr>

<tr><td>**cartesian**(otherDataset)</td> <td>Декартово произведение. При применении к датасетам типов T и U, возвращает датасет пар (T,U) (все возможные пары элементов)</td></tr>


</table>

Для примера будет разбирать лог действий пользователей в некоторой системе

In [None]:
# Будем разбирать вот этот датасет
for l in inputRDD.top(5): print l

**map** позволяет разделить строки на поля и выбрать только интересные нам

In [None]:
# Функция map позволяет разделить строки на поля и выбрать только интересные нам:
splittedRDD = inputRDD.map (lambda x: x.split(";")[:])

# Выводим на экран при помощи pandas data frame
import pandas
pandas.DataFrame(splittedRDD.top(5))

**filter** позволяет исключать из выборки строки, не попадающие под условие. 

Отфильтровываем заголовки

In [None]:
import re
filteredRDD = splittedRDD.filter (
lambda x: len (re.findall ("^\d{2}\.\d{2}.\d{4}.*", x[0])) != 0 ) 
pandas.DataFrame(filteredRDD.top(5))

**flatMap** позволяет разделить одну строку записей на несколько строк. 

Например если нам нужно выделить каждый параметр операции в отдельную строку

In [None]:
parsedParamsRDD = filteredRDD.flatMap (
lambda x: [[x[0],x[1],x[2],x[3],param] 
           for param in x[4].split(',')] )
pandas.DataFrame(parsedParamsRDD.top(5))

**sample** - простое решение задачи семплирования

In [None]:
print splittedRDD.count()
sampledRDD = splittedRDD.sample (False, 0.1, 3)
print sampledRDD.count()

**distinct** - способ удаления дублей

In [None]:
users = filteredRDD.map(lambda x: x[1])
distinct_users = users.distinct()
print "rows", users.count()
print "distinct users", distinct_users.count()

**union** и **intersection** - объединение / пересечение двух датасетов

In [None]:
uniqueRDD = inputRDD.distinct()
sample1 = uniqueRDD.sample(False, 0.1, 1)
sample2 = uniqueRDD.sample(False, 0.1, 2)

print "input size:         ", uniqueRDD.count()
print "sample 1 size:      ", sample1.count()
print "sample 2 size:      ", sample2.count()
print "union size:         ", sample1.union(sample2).count ()
print "intresection size:  ", sample1.intersection(sample2).count ()

## Действия

Действия превращают RDD во что-то другое. Например сохраняют в файл или возвращают значение в блокнот

<table>
<tr><td>**Action**</td> <td>**Meaning**</td></tr>
<tr><td>**reduce**(func)</td> <td>Аггрегирует элементы датасета, используя функцию *func* (которая принимает 2 аргумента и возвращает 1 значение). Функция должна быть коммутативной и ассоциативной так чтобы она могла правильно паралельно вычисляться</td></tr>

<tr><td>**collect**()</td> <td>
Вовзращат все элементы датасета как массив программе - драйверу. Это обычно полезно после фильтрации или другой операции, которая возвращает достаточно маленькое подмножество данных</td></tr>

<tr><td>**count**()</td> <td>Возвращает число элементов в датасете</td></tr>

<tr><td>**first**()</td> <td>Возвращает 1й элемент множество (аналогично *take (1)*)</td></tr>

<tr><td>**take**(n)</td> <td>
Возвращает массив первых *n* элементов в датасете. Помните, что сейчас этот метод выполняется не паралельно, а вместо этого программа-драйвер обсчитывает все элементы</td></tr>

<tr><td>**takeSample**(withReplacement, num, seed)</td> <td>
Возвращет массив со случайным семплом из num элементов датасета, с или без возвращения, используя заданный seed генератора случайных чисел
</td></tr>

<tr><td>**takeOrdered**(n, [ordering])</td> <td>
Возвращает первые *n* элементов RDD используя либо натуральный порядок или заданный компаратор</td></tr>

<tr><td>**saveAsTextFile**(path)</td> <td>
Пишет элементы датасета в текстовый файл (или набор текстовых файлов) в заданную дирекорию на логальной файловой системе, HDFS или любой другой файловой системе, поддерживаемой хадупом. Спарк будет вызывать *toString* для каждого элемента для преобразования его в строку теста в файле</td></tr>

<tr><td>**foreach**(func)</td> <td>
Выполняет функцию *funk* для каждого элемента в датасете. Это обычно делается для побочных действий, таких как обновления аккумуляторов или  взаимодействия с внешней системой храния</td></tr>

</table>

**reduce** позволяет посчитать какую-нибудь статистику по датасету.<br>
Например найти минимальное и максимальное время лога

In [None]:
times = filteredRDD.map(lambda x:x[0])
print times.reduce(min), "-", times.reduce(max)

<img src='http://1.bp.blogspot.com/-NhP-r7kTpIw/UBfkiMj_1lI/AAAAAAAAAJ0/CV1gFLfNLW0/s1600/Types%2Bof%2BNoSQL%2B-%2Bkey%2Bvalue%2Bstore.jpg'>

# Работа с датасетами пар ключ значение

Для чего надо выделять ключ для записей в датасете?

- для группировки данных по ключу
- для джойна датасетов по ключу
- для сортировки

В общем выделение ключа очень полезно!

### Как выделить ключ из датасета?

Выделить ключ из датасета, можно, превратив строки в списки из 2х и более значений

In [None]:
inputRDD = sc.textFile("/testdata/splits_test")
# splitted RDD уже имеет первичный ключ "Субъект", то естьтот, кто делал операцию
from operator import itemgetter
keyValueRDD = inputRDD.map (lambda x: itemgetter(1,2,3,4,0)(x.split(";")))
import pandas
pandas.DataFrame(keyValueRDD.top(5))

### Что можно делать с key-value датасетом

<table>
<tr><td>**Преобразование**</td> <td>**Описание**</td></tr>

<tr><td>**groupByKey**([numTasks])</td> <td>Если применить к датасету пар (K, V), вернет датасет пар (K, Iterable&lt;V&gt;). 
<br>**Note:** При группировке для рассчета аггрегатов (таких как сумма или среднее), рекоммендуется использовать reduceByKey или combineByKey для лучшей производительности

<br>**Note:** По умолчанию уровень паралелизма на выходе зависит от числа партиций в родительском RDD. Можно передать опциональный аргумент numTasks для установки другого числа задач</td></tr>

<tr><td>**reduceByKey**(func, [numTasks])</td> <td>
При применении к датасету пар (k,v), возвращает новый датасет пар (K,V), где значения каждого ключа саггрегированны, через заданную reduce функцию. Как в groupByKey, число reduce задач задается необязательным вторым аргументом</td></tr>

<tr><td>**sortByKey**([ascending], [numTasks])</td> <td>
При применении к датасету пар (K,V), где K может быть отсортирован, возвращает датасет пар (K,V), отсортированных по ключу в порядке убывания или возрастания, в зависимости от первого аргумента</td></tr>

<tr><td>**join**(otherDataset, [numTasks])</td> <td>
При применении к датасетам (K,V) и (K,W), возвращает датасет пар (K,(V,W)), со всеми парами элементов для каждого из ключей. Внешние джойны также поддерживаются через leftOuterJoin и rightOuterJoin</td></tr>

<tr><td>**cogroup**(otherDataset, [numTasks])</td> <td>
При примененеии к датасетам типов (K,V) и (K,W), возвращает датасет кортежей (K, Iterable &lt;V&gt;, Iterable &lt;W&gt;). Эта операция также называется groupWith</td></tr>

</table>

<table>
<tr><td>**Действие**</td> <td>**Описание**</td></tr>
<tr><td>**countByKey**()</td> <td>Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.</td></tr>
</table>

# Примеры операций

### groupByKey

Выпишем все действия для каждого из пользователей

In [None]:
groupedRDD = keyValueRDD.map(lambda x:(x[0],x[4])).groupByKey()
pandas.DataFrame(groupedRDD.top(5))

### reduceByKey

Найдем время начала и конца работы пользователя

In [None]:
def findMinMax (minMax, newValue):
    newMin = newValue[0] if newValue[0] < minMax[0] else minMax[0]
    newMax = newValue[1] if newValue[1] > minMax[1] else minMax[1]
    return (newMin, newMax)

minMaxRDD = keyValueRDD.map(lambda x: (x[0],(x[4],x[4]))).reduceByKey(findMinMax)
pandas.DataFrame(minMaxRDD.top(5))

### sortByKey

Сортируем датасет по разным ключам

In [None]:
sorted1RDD = keyValueRDD.map(lambda x: (x[1],x)).sortByKey ()
pandas.DataFrame(sorted1RDD.top(5))

In [None]:
import random
sorted4RDD = keyValueRDD.map(lambda x: (random.randint(0,1000000),x)).sortByKey (ascending=True)
pandas.DataFrame(sorted4RDD.top(5))

### join

Старый добрый джойн как в sql. Давайте пересечь два семпла датасета

In [None]:
distinctRDD = keyValueRDD.map(lambda x:tuple(x))\
                         .distinct()\
                         .map(lambda x: (x[0]+" - " + x[4], x))
        
sampledRDD1 = distinctRDD.sample (False, 0.1, 1)
sampledRDD2 = distinctRDD.sample (False, 0.1, 2)

joinedRDD = sampledRDD1.join (sampledRDD2)

# Сколько строк оказалось в обоих датасетах?
print "Размер перечесения двух выборок", joinedRDD.count()

# Сколько разных записей оказалось имеют один ключ
print "Число случаев, когда ключ оказался уникальным", 
print joinedRDD.filter(lambda x:x[1][0] == x[1][1]).count()

pandas.DataFrame(joinedRDD.map(lambda x:(x[0],x[1][0],x[1][1])).top(5))

### countByKey

In [None]:
# countByKeyTable - это уже локальный словарик
countByKeyTable = keyValueRDD.countByKey()
pandas.DataFrame(countByKeyTable.items()[:10])

<br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br>

# Литература

<a href='http://spark.apache.org/docs/latest/programming-guide.html'/>
<img src='http://spark.apache.org/docs/latest/img/spark-logo-hd.png' align='left' width=70> <br>&nbsp;&nbsp;&nbsp;Spark programming guide
</a>

<table border='0' color="#ff00ff">
<tr>
<td>
<a href="http://shop.oreilly.com/product/0636920028512.do">
<img src="http://akamaicovers.oreilly.com/images/0636920028512/cat.gif" />
**Holden Karau** Learning Spark
</a>
</td>
<td>
<a href="http://it-ebooks.info/book/5970/">
<img src="http://it-ebooks.info/images/ebooks/3/advanced_analytics_with_spark.jpg" />
**Sandy Ryza** Advanced Analytics with Spark
</a>
</td>
<td>
<a href="http://it-ebooks.info/book/6809/">
<img src="http://it-ebooks.info/images/ebooks/14/spark_for_python_developers.jpg" />
**Amit Nandi Spark** for Python Developers
</a>
</td>
</tr>
</table>
