## Best practices and recommendations

### Регрессия. Использование классификационных моделей для регрессии.
    * взвешенный KNN
![](https://i.stack.imgur.com/gAILq.png)
    * Decision Tree
![](http://ogrisel.github.io/scikit-learn.org/sklearn-tutorial/_images/plot_tree_regression_1.png)
    * Boosting
![](http://scikit-learn.org/stable/_images/sphx_glr_plot_adaboost_regression_001.png)

### Best practices
1. Будет сложнее чем кажется.
2. Обещай меньше, делай больше.
3. Принцип Парето - 80% результата достигается за 20% времени. Имейте разумные ожидания и метрики когда нужно остановиться.
4. Знай свои данные
    * Что такое нули в данных?
    * Что означают отсутствующие данные?
    * Что означают статистические выбросы? Игнорировать или фокусироваться?
    * Откуда пришли данные для обучения, кто присваивал классы?
    * Имеет ли значение порядок строк в датасете?
5. Документировать всё!

![](http://scikit-learn.org/stable/_static/ml_map.png)

### Алгоритм построения модели

0. Выберите метрику для своей задачи!
1. Разделите данные на обучающую выборку и тестовую. 
2. Используя обучающую выборку - найдите оптимальные параметры с помощью кроссвалидации.
3. Используя оптимальные параметры - обучите модель на всей обучающей выборке.
4. Проверьте полученную модель используя выбранную метрику на тестовых данных - это итоговый результат.

### Нормализация
1. Будьте осторожны - не допускайте "протечек" информации из тестовой выборки.
2. Думайте о том, когда она полезна.
3. Нормализация после PCA может быть вредной. Нормализация ДО - необходима.
4. Экспериментируйте :)

### Несбалансированные данные

1. Subsample / oversample
2. Использование весов.
3. Stratified CV

![](http://scikit-learn.org/stable/_images/sphx_glr_plot_separating_hyperplane_unbalanced_001.png)

### Отсутствующие данные

1. Выкинуть? - может привести к слишком маленькому датасету
2. Заполнить средним? - не зависит от других значений в записи
3. Использовать регрессию для заполнения? - Затратно, не всегда возможно

### Рекоммендационные системы

Источники:
1. ["Collaborative Filtering Recommender Systems"](http://files.grouplens.org/papers/FnT%20CF%20Recsys%20Survey.pdf)
2. ["Collaborative Filtering" - Stanford slides](http://web.stanford.edu/~lmackey/papers/cf_slides-pml09.pdf)

Два с половиной подхода:
1. [Коллаборативная фильтрация](https://ru.wikipedia.org/wiki/%D0%9A%D0%BE%D0%BB%D0%BB%D0%B0%D0%B1%D0%BE%D1%80%D0%B0%D1%82%D0%B8%D0%B2%D0%BD%D0%B0%D1%8F_%D1%84%D0%B8%D0%BB%D1%8C%D1%82%D1%80%D0%B0%D1%86%D0%B8%D1%8F). Основная идея - людям, похожим на меня, нравятся вещи как мне. Люди, которым нравятся вещи как мне - похожи на меня. Абсолютно не зависит от данных пользователя. Работает, как по пользователям, так и по рекомендованным элементам. Требует некоторого времени/данных для настройки.
2. [Фильтрация на основании данных контента](http://recommender-systems.org/content-based-filtering/). Каждый элемент описывается параметрами. На основании параметров вычисляются "расстояния" между элементами. Рекомендуются элементы близкие к понравившимся пользователю (и далекие от непонравившихся). Работает даже для 1й записи.
3. В качестве простейшей рекоммендационной системы можно расценивать оценку как отсутствующие данные и действовать так же как в общем случае. 
    * Заполнить средним
    * Свести к регрессии. Чем хорош KNN? Что такое расстояние в данном контексте? Коэффициент Пирсона, [cosine similarity](https://ru.wikipedia.org/wiki/%D0%9A%D0%BE%D1%8D%D1%84%D1%84%D0%B8%D1%86%D0%B8%D0%B5%D0%BD%D1%82_%D0%9E%D1%82%D0%B8%D0%B0%D0%B8)
    * [SVD](https://en.wikipedia.org/wiki/Singular_value_decomposition)
    


# Big Data

## [MapReduce](https://en.wikipedia.org/wiki/MapReduce)

Что такое MapReduce?

* Модель программирования для больших наборов данных.
* Параллельный распределенный алгоритм
* Фреймворк для кластера
* Способ мышления

Изначально разработан гуглом.  
[Hadoop](https://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html) - Популярная имплементация MapReduce на Java  
MrJob - Python интерфейс

![](https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcTwjmNVU_9E_2AN4zC_pB7M8uT38Tuf71XlZRAl7gv7Oemzo928)

2 главных шага.

1. MAP - фильтрация и сортировка
2. REDUCE - аггрегация результатов




In [12]:
%%file wordcounter1.py
from mrjob.job import MRJob


class mrWordCount(MRJob):
    def mapper(self, key, line):
        for word in line.split(' '):
            yield word.lower(), 1

    def reducer(self, word, occurences):
        yield word, sum(occurences)


if __name__ == '__main__':
    mrWordCount.run()

Overwriting wordcounter1.py


In [13]:
!/home/modintsov/.virtualenvs/ds2017/bin/python wordcounter1.py < loremipsum.txt

No configs found; falling back on auto-configuration
Creating temp directory /tmp/wordcounter1.modintsov.20170228.085828.322637
Running step 1 of 1...
reading from STDIN
Streaming final output from /tmp/wordcounter1.modintsov.20170228.085828.322637/output...
""	4
"a"	3
"ac"	4
"adipiscing"	1
"aenean"	1
"aliquam"	2
"aliquam."	1
"aliquet"	2
"amet"	2
"amet,"	1
"at"	6
"auctor"	4
"auctor,"	1
"bibendum"	3
"bibendum,"	1
"blandit"	1
"condimentum"	1
"congue"	4
"consectetur"	3
"consequat"	1
"convallis"	1
"cras"	2
"curabitur"	2
"cursus"	2
"dapibus"	1
"diam"	1
"diam."	1
"dictum"	1
"dictumst."	1
"dignissim"	5
"dolor"	2
"dolor."	1
"donec"	4
"dui"	1
"duis"	3
"efficitur"	3
"egestas"	1
"eget"	6
"eget,"	2
"eget."	2
"eleifend."	1
"elementum"	1
"elementum."	1
"elit"	1
"elit,"	1
"elit."	1
"enim"	2
"enim."	3
"erat"	3
"erat."	1
"eros"	1
"eros,"	1
"est"	2
"est,"	1
"est."	1
"et"	4
"et,"	1
"et."	1
"etiam"	1
"eu"	3
"eu,"	1
"euismod"	2
"ex"	2
"ex."	2
"facilisis"	1
"faucibus"	3
"felis"	2
"felis."	1
"fermentum"	2
"f

![](http://1.bp.blogspot.com/-Nm8n33ZWE5o/ToCbSHYVwfI/AAAAAAAAQ2Q/9_I_1l0QpW8/s1600/education-trends.PNG)

In [14]:
%%file anagram.py
from mrjob.job import MRJob

class MRAnagram(MRJob):

    def mapper(self, _, line):
        # Convert word into a list of characters, sort them, and convert
        # back to a string.
        letters = list(line)
        letters.sort()

        # Key is the sorted word, value is the regular word.
        yield letters, line

    def reducer(self, _, words):
        # Get the list of words containing these letters.
        anagrams = [w for w in words]

        # Only yield results if there are at least two words which are
        # anagrams of each other.
        if len(anagrams) > 1:
            yield len(anagrams), anagrams


if __name__ == "__main__":
    MRAnagram.run()

Writing anagram.py


In [18]:
!/home/modintsov/.virtualenvs/ds2017/bin/python anagram.py < words.txt > anagrams.txt

No configs found; falling back on auto-configuration
Creating temp directory /tmp/anagram.modintsov.20170228.093500.829051
Running step 1 of 1...
reading from STDIN
Streaming final output from /tmp/anagram.modintsov.20170228.093500.829051/output...
Removing temp directory /tmp/anagram.modintsov.20170228.093500.829051...


In [19]:
!tail anagrams.txt

2	["st", "ts"]
2	["usw", "wus"]
2	["su", "us"]
2	["sv", "vs"]
2	["sw", "ws"]
2	["tty", "tyt"]
2	["tu", "ut"]
2	["tv", "vt"]
2	["ux", "xu"]
2	["", ""]


### Важность локальной аггрегации.

Идеальные характеристики масштабирования:

* В два раза больше данных - в два раза дольше вычисления
* В два раза больше ресурсов - в два раза короче вычисления

Почему это недостижимый идеал?

* Синхронизация требует коммуникации
* Коммуникация убивает производительность

Значит надо найти способ избежать коммуникаций!

* Уменьшаем количество промежуточных данных локальной аггрегацией.
* 2 варианта: combiners, in-mapper combining

### [Combiner](https://www.tutorialspoint.com/map_reduce/map_reduce_combiners.htm)

![](https://image.slidesharecdn.com/hadoopmr-150613152227-lva1-app6891/95/xml-parsing-with-map-reduce-35-638.jpg?cb=1434209024)

* Мини-reduce
* Получает данные только из своего маппера
* Значительно уменьшает сетевой траффик
* Не имеет доступа к другим мапперам
* Не гарантирует что имеет все данные по ключу
* Не гарантирует что будет запущен вовсе!
* Может быть запущен несколько раз!
* Формат результата должен соответствовать формату результата маппера.

In [16]:
%%file wordcounter2.py
from mrjob.job import MRJob


class mrWordCount(MRJob):
    def mapper(self, key, line):
        for word in line.split(' '):
            yield word.lower(), 1
            
    def combiner(self, word, occurences):
        yield word, sum(occurences)

    def reducer(self, word, occurences):
        yield word, sum(occurences)


if __name__ == '__main__':
    mrWordCount.run()

Writing wordcounter2.py


In [17]:
!/home/modintsov/.virtualenvs/ds2017/bin/python wordcounter2.py < loremipsum.txt

No configs found; falling back on auto-configuration
Creating temp directory /tmp/wordcounter2.modintsov.20170228.093333.815104
Running step 1 of 1...
reading from STDIN
Streaming final output from /tmp/wordcounter2.modintsov.20170228.093333.815104/output...
""	4
"a"	3
"ac"	4
"adipiscing"	1
"aenean"	1
"aliquam"	2
"aliquam."	1
"aliquet"	2
"amet"	2
"amet,"	1
"at"	6
"auctor"	4
"auctor,"	1
"bibendum"	3
"bibendum,"	1
"blandit"	1
"condimentum"	1
"congue"	4
"consectetur"	3
"consequat"	1
"convallis"	1
"cras"	2
"curabitur"	2
"cursus"	2
"dapibus"	1
"diam"	1
"diam."	1
"dictum"	1
"dictumst."	1
"dignissim"	5
"dolor"	2
"dolor."	1
"donec"	4
"dui"	1
"duis"	3
"efficitur"	3
"egestas"	1
"eget"	6
"eget,"	2
"eget."	2
"eleifend."	1
"elementum"	1
"elementum."	1
"elit"	1
"elit,"	1
"elit."	1
"enim"	2
"enim."	3
"erat"	3
"erat."	1
"eros"	1
"eros,"	1
"est"	2
"est,"	1
"est."	1
"et"	4
"et,"	1
"et."	1
"etiam"	1
"eu"	3
"eu,"	1
"euismod"	2
"ex"	2
"ex."	2
"facilisis"	1
"faucibus"	3
"felis"	2
"felis."	1
"fermentum"	2
"f

* Комбинатор должен иметь такой же формат вывода как маппер, но такую же сигнатуру (входящие параметры) как reducer. Иногда код для reducer совпадает с кодом комбинатора. Часто нет - проверяйте!
* Помните: комбинаторы это опциональная оптимизация. Наличие/отсутствие их не должно влиять на корректность алгоритма. Могут быть запущены 0, 1, X раз.

### [In-Mapper Combining](https://dzone.com/articles/designing-mapreduce-algorithms)

Комбинатор может быть пропущен, маппер всегда гарантировано запускается.  
Так давайте запихнем комбинатор в маппер!  
Используем stateful код для этого.  

Может запутать все еще сильнее :(

Преимущества:

* Скорость
* Гарантия исполнения

Недостатки:

* Требуется вручную работать с памятью
* Очень легко допустить ошибку
* "Закат солнца вручную"(с)


In [22]:
%%file wordcounter3.py
from mrjob.job import MRJob
from collections import defaultdict

class mrWordCount(MRJob):
    def __init__(self, *args, **kwargs):
        super(mrWordCount, self).__init__(*args, **kwargs)
        self.localWordCount = defaultdict(int)
    
    def mapper(self, key, line):
        if False:
            yield
        for word in line.split(' '):
            self.localWordCount[word.lower()] += 1
            
    def mapper_final(self):
        for (word, count) in self.localWordCount.iteritems():
            yield word, count

    def reducer(self, word, occurences):
        yield word, sum(occurences)


if __name__ == '__main__':
    mrWordCount.run()

Overwriting wordcounter3.py


In [23]:
!/home/modintsov/.virtualenvs/ds2017/bin/python wordcounter3.py < loremipsum.txt

No configs found; falling back on auto-configuration
Creating temp directory /tmp/wordcounter3.modintsov.20170228.095435.454158
Running step 1 of 1...
reading from STDIN
Streaming final output from /tmp/wordcounter3.modintsov.20170228.095435.454158/output...
""	4
"a"	3
"ac"	4
"adipiscing"	1
"aenean"	1
"aliquam"	2
"aliquam."	1
"aliquet"	2
"amet"	2
"amet,"	1
"at"	6
"auctor"	4
"auctor,"	1
"bibendum"	3
"bibendum,"	1
"blandit"	1
"condimentum"	1
"congue"	4
"consectetur"	3
"consequat"	1
"convallis"	1
"cras"	2
"curabitur"	2
"cursus"	2
"dapibus"	1
"diam"	1
"diam."	1
"dictum"	1
"dictumst."	1
"dignissim"	5
"dolor"	2
"dolor."	1
"donec"	4
"dui"	1
"duis"	3
"efficitur"	3
"egestas"	1
"eget"	6
"eget,"	2
"eget."	2
"eleifend."	1
"elementum"	1
"elementum."	1
"elit"	1
"elit,"	1
"elit."	1
"enim"	2
"enim."	3
"erat"	3
"erat."	1
"eros"	1
"eros,"	1
"est"	2
"est,"	1
"est."	1
"et"	4
"et,"	1
"et."	1
"etiam"	1
"eu"	3
"eu,"	1
"euismod"	2
"ex"	2
"ex.

Что лучше выбрать?

* Для больших словарей?
* Для словарей с неравномерным распределением?

Многие алгоритмы ML воспроизведены на архитектуре MapReduce.
* [SVM](http://www.sonaliagarwal.com/anu.pdf)
* [Random Forest](http://www.vision.cs.chubu.ac.jp/MPRG/C_group/C075_wakayama2015.pdf)  
...

## [Spark](https://en.wikipedia.org/wiki/Apache_Spark)

Зачем создавать сабсеты шаг за шагом по мере надобности? Мы можем создать их все, записать на диск и отдать параллельным процессам на обработку.  
Если мы работаем с одним куском данных - зачем копировать на диск - это медленно. Копируем кусок памяти и отдадим его.  
Если мы никогда ничего не пишем в данные - зачем копровать ВООБЩЕ? Отдадим многим процессам один общий кусок данных для чтения.  

Некоторые элементы Data Science ["Embarrasingly parallel"](https://en.wikipedia.org/wiki/Embarrassingly_parallel).

1. Cross Validation
2. Grid Search
3. Bagging/Random Forests
...

Если вы работаете только на одной машине - [multiprocessing](https://docs.python.org/2/library/multiprocessing.html) ваш друг.

[Мануал](https://mikecvet.wordpress.com/2010/07/02/parallel-mapreduce-in-python/) по работе с multiprocessing.

1. Мастер-процесс подготавливает данные
2. Постулируем, что эти данные будут исключительно read-only
3. Создадим пул рабочих подпроцессов, а в каждом из них - пул подпроцессов для grid search.
4. Каждый рабочий роцесс вычисляет validation score
5. Один "reducer" вычисляет средние значения по gridsearch процессам, другой - выбирает максимум из средних.
6. Отдаем параметры "наверх" в мастер процесс.

Hadoop, как и многие реализации MapReduce сохраняет результат каждой промежуточной операции на диск. Следующие шаги загружают с диска.
Почему? Операции с диском затратные!


Checkpoints! Шаг закончился успешно - сохрани. Не закончился - вернись к предыдущему этапу и повтори на новом процессе.  
Ошибки параллельного программирования - ад дебага.  
Машины в облаке умирают.  
Не всегда фиксить ошибки - лучшее решение. Даже гугл так делает.

Разработчики sklearn тоже это знают и понимают. [joblib](https://pypi.python.org/pypi/joblib) используется во многих внутренних процессах при n_jobs>1.

Что если ваши данные БОЛЬШИЕ?   
Запихнем их в кластер и используем MapReduce.  
Коммуникации медленные, работа с диском медленная :(

MapReduce - идея [функционального программирования](https://ru.wikipedia.org/wiki/%D0%A4%D1%83%D0%BD%D0%BA%D1%86%D0%B8%D0%BE%D0%BD%D0%B0%D0%BB%D1%8C%D0%BD%D0%BE%D0%B5_%D0%BF%D1%80%D0%BE%D0%B3%D1%80%D0%B0%D0%BC%D0%BC%D0%B8%D1%80%D0%BE%D0%B2%D0%B0%D0%BD%D0%B8%D0%B5).  
Никаких [побочных эффектов](https://ru.wikipedia.org/wiki/%D0%9F%D0%BE%D0%B1%D0%BE%D1%87%D0%BD%D1%8B%D0%B9_%D1%8D%D1%84%D1%84%D0%B5%D0%BA%D1%82_(%D0%BF%D1%80%D0%BE%D0%B3%D1%80%D0%B0%D0%BC%D0%BC%D0%B8%D1%80%D0%BE%D0%B2%D0%B0%D0%BD%D0%B8%D0%B5)), никакого состояния.  
Функции соединены потоком данных.  
Гарантия "восстанавливаемого" состояния в любой момент.

![](https://www.mapr.com/sites/default/files/blogimages/Spark-core-stack-DB.jpg)

![](http://spark.apache.org/docs/latest/img/cluster-overview.png)

![](https://image.slidesharecdn.com/numbaspark-160406151628/95/gpu-computing-with-apache-spark-and-python-30-638.jpg?cb=1459955819)

### Сравнение с Hadoop (почему Spark лучше)

1. Может работать в памяти
2. Данные кешируются (диск/память) для дальнейшего исполбзования
3. **Скорость** - сортировка 100TB занимает 23 минуты на 206 машинах в Spark, 72 минуты на 2100 машинах в Hadoop (эксперимент 2013 года).
4. [RDD](https://www.tutorialspoint.com/apache_spark/apache_spark_rdd.htm) (resilient distrivuted dataset) - абстракция поверх источника данных, оптимизированная для параллельных вычислений. API позволяет легко запрашивать данные с каждого этапа конвеера вычислений при этом избегая избыточных вычислений и затрат на репликацию.
5. Python/Java/Scala
6. Легче чем Hadoop во многих аспектах

Spark работает исполняя программу-драйвер, которая выполняет параллельные операции на RDD в кластере (или на локальной машине).  
Типы операций :
1. Transformations - создают новый RDD из существующего (помним, read-only!). Аналог map.
2. Actions - возвращают данные в драйвер. Аналог reduce.

[Короткие примеры](http://spark.apache.org/examples.html)

```
text_file = sc.textFile("hdfs://...")
# OR sc.parallelize(['list', 'of', 'values'])
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://...")
```

### Дополнительный функционал

1. Прямая работа с dataframes, файлами, базами данных...
2. Работа с RDD через SQL
3. Потоковая обработка и онлайн-алгоритмы
4. Библиотека MLLIB
5. Операции на графах (GraphX)
6. Коннекторы к другим облачным приложениям (YARN, MESOS)

### [Инструкция по установке на Linux](https://www.dataquest.io/blog/pyspark-installation-guide/)

У меня не завелось в юпитере :( скорее всего конфликт версий.

### Все в консоль! {spark_install_dir}/bin/pyspark

Примеры кода для тестов 

```
sc

sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).map(lambda x: x**2).sum()

wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
# Print out the type of wordsRDD
print type(wordsRDD)

# Laaaaazy...until 
wordsRDD.collect()

№ Cache on!
wordsRDD.cache()

def makePlural(word):
    return word + 's'
print makePlural('cat')

pluralRDD = wordsRDD.map(makePlural)
print pluralRDD.first()
print pluralRDD.take(2)

pluralRDD.collect()

wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
wordCountsCollected = (wordsRDD
                       .map(lambda w: (w, 1))
                       .reduceByKey(lambda x,y: x+y)
                       .collect())
print wordCountsCollected

print (wordsRDD
    .map(lambda w: (w, 1))
    .reduceByKey(lambda x,y: x+y)).toDebugString()
    
stopwords=[e.strip() for e in open("/home/modintsov/workspace/datascienceua_2017/lesson_12/english.stop.txt").readlines()]

juliusrdd=sc.textFile("/home/modintsov/workspace/datascienceua_2017/lesson_12/caesar.txt")
```

```
juliusrdd.flatMap(lambda line: line.split()).count()

juliusrdd.flatMap(lambda line: line.split()).map(lambda word: word.strip().lower()).take(20)

juliusrdd.flatMap(lambda line: line.split())\
    .map(lambda word: word.strip().lower())\
    .filter(lambda word: word not in stopwords).take(20)
    
juliusrdd.flatMap(lambda line: line.split())\
    .map(lambda word: word.strip().lower())\
    .filter(lambda word: word not in stopwords)\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda a, b: a + b).take(20)
    
juliusrdd.flatMap(lambda line: line.split())\
    .map(lambda word: word.strip().lower())\
    .filter(lambda word: word not in stopwords)\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda a, b: a + b)\
    .takeOrdered(20, lambda x: -x[1])
    
captions, counts=zip(*juliusrdd.flatMap(lambda line: line.split())\
    .map(lambda word: word.strip().lower())\
    .filter(lambda word: word not in stopwords)\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda a, b: a + b)\
    .takeOrdered(20, lambda x: -x[1]))

pos = np.arange(len(counts))
plt.bar(pos, counts)
plt.xticks(pos+0.4, captions, rotation=90)
plt.show()
```