<center>
<img src="../../img/ml_theme.png">
# Дополнительное профессиональное <br> образование НИУ ВШЭ
#### Программа "Машинное обучение и майнинг данных"
<img src="../../img/faculty_logo.jpg" height="240" width="240">
## Автор материала: преподаватель Факультета Компьютерных Наук НИУ ВШЭ Кашницкий Юрий
</center>
Материал распространяется на условиях лицензии <a href="https://opensource.org/licenses/MS-RL">Ms-RL</a>. Можно использовать в любых целях, кроме коммерческих, но с обязательным упоминанием автора материала.

# Занятие 9. Машинное обучение с Apache Spark
## Часть 1. Введение в Apache Spark MLLib

## Прдеставление обучающей выборки в Apache Spark

#### Методом `parallelize()` уже пользовались

In [2]:
data = sc.parallelize([[2.11, 5.012, 1.5, 2.6],
                       [-21.11, 7.042, 2.5, 2.8]])

В MLLib есть плотные и разреженные вектора: **Vectors.dense** и **Vectors.sparse**. На практике в случае больших массивов данных используются разреженные вектора. Чтоб создать разреженный вектор, надо указать общее число элементов и перечислить индексы и значения ненулевых элементов.

In [3]:
from pyspark.mllib.linalg import Vectors
dense_vector = Vectors.dense ([2.11, 5.012, 1.5, 2.6])
sparse_vector = Vectors.sparse(10, [0,2,5,8], 
                               [2.11, 5.012, 1.5, 2.6]) # номера ненулевых признаков и их значения
print(dense_vector, sparse_vector)

(DenseVector([2.11, 5.012, 1.5, 2.6]), SparseVector(10, {0: 2.11, 2: 5.012, 5: 1.5, 8: 2.6}))


#### Для помеченных точек обучающей выборки есть специальный класс **LabeledPoint**
- LabeledPoint.features - любой из векторов выше<br>
- LabeledPoint.label - имеет тип float и может принимать значения (0.0, 1.0, 2.0 - для классификации, и любое число - для регрессии)

In [5]:
from pyspark.mllib.regression import LabeledPoint

LabeledPoint(1.0, sparse_vector)

LabeledPoint(1.0, (10,[0,2,5,8],[2.11,5.012,1.5,2.6]))

## Загрузка данных в Spark

#### Можно сгенерировать **LabeledPoint** из примеров самому

In [8]:
iris = sc.textFile ("../../data/iris.txt")

iris.map(lambda x: [float(v) for v in x.split(",")]).map(lambda x: LabeledPoint (x[-1], x[:-1])).take(10)

[LabeledPoint(0.0, [5.1,3.5,1.4,0.2]),
 LabeledPoint(0.0, [4.9,3.0,1.4,0.2]),
 LabeledPoint(0.0, [4.7,3.2,1.3,0.2]),
 LabeledPoint(0.0, [4.6,3.1,1.5,0.2]),
 LabeledPoint(0.0, [5.0,3.6,1.4,0.2]),
 LabeledPoint(0.0, [5.4,3.9,1.7,0.4]),
 LabeledPoint(0.0, [4.6,3.4,1.4,0.3]),
 LabeledPoint(0.0, [5.0,3.4,1.5,0.2]),
 LabeledPoint(0.0, [4.4,2.9,1.4,0.2]),
 LabeledPoint(0.0, [4.9,3.1,1.5,0.1])]

#### Для данных в  формате **LibSVM** есть модуль **MLUtils**

In [10]:
from pyspark.mllib.util import MLUtils
diabetes = MLUtils.loadLibSVMFile(sc, "../../data/diabetes.libsvm")
diabetes.take(2)

[LabeledPoint(-1.0, (8,[0,1,2,3,4,5,6,7],[6.0,148.0,72.0,35.0,0.0,33.599998,0.627,50.0])),
 LabeledPoint(1.0, (8,[0,1,2,3,4,5,6,7],[1.0,85.0,66.0,29.0,0.0,26.6,0.351,31.0]))]

## Работа с признаками

- базовые статистики, распределения,..

Прчитаем еще раз данные и отделим признаки от меток целевого класса.

In [12]:
from pyspark.mllib.util import MLUtils

diabetes = MLUtils.loadLibSVMFile(sc, "../../data/diabetes.libsvm")

features = diabetes.map(lambda x:x.features)
labels = diabetes.map(lambda x:x.label)
print("Features: ", features.take(2))
print("Labels: ", labels.take(2))

('Features: ', [SparseVector(8, {0: 6.0, 1: 148.0, 2: 72.0, 3: 35.0, 4: 0.0, 5: 33.6, 6: 0.627, 7: 50.0}), SparseVector(8, {0: 1.0, 1: 85.0, 2: 66.0, 3: 29.0, 4: 0.0, 5: 26.6, 6: 0.351, 7: 31.0})])
('Labels: ', [-1.0, 1.0])


#### Есть класс **Statistics** из **pyspark.mllib.stat**, с помощью которого можно получить базовую информацию о признаках.

In [13]:
from pyspark.mllib.stat import Statistics
summary = Statistics.colStats(features)

In [14]:
summary.numNonzeros()[:6]
summary.mean()
summary.min()
summary.max()

array([  17.      ,  199.      ,  122.      ,   99.      ,  846.      ,
         67.099998,    2.42    ,   81.      ])

### Корреляции

In [15]:
Statistics.corr(features)

array([[ 1.        ,  0.12945867,  0.14128198, -0.08167177, -0.07353461,
         0.0176831 , -0.03352267,  0.54434123],
       [ 0.12945867,  1.        ,  0.15258959,  0.05732789,  0.33135711,
         0.22107107,  0.1373373 ,  0.26351432],
       [ 0.14128198,  0.15258959,  1.        ,  0.20737054,  0.08893338,
         0.28180529,  0.04126495,  0.23952795],
       [-0.08167177,  0.05732789,  0.20737054,  1.        ,  0.43678257,
         0.3925732 ,  0.18392757, -0.11397026],
       [-0.07353461,  0.33135711,  0.08893338,  0.43678257,  1.        ,
         0.19785907,  0.18507093, -0.04216295],
       [ 0.0176831 ,  0.22107107,  0.28180529,  0.3925732 ,  0.19785907,
         1.        ,  0.14064695,  0.03624187],
       [-0.03352267,  0.1373373 ,  0.04126495,  0.18392757,  0.18507093,
         0.14064695,  1.        ,  0.03356131],
       [ 0.54434123,  0.26351432,  0.23952795, -0.11397026, -0.04216295,
         0.03624187,  0.03356131,  1.        ]])

## Семплирование
#### Не входит в MLlib, но это тоже можно делать: как случайные семплы, так и со стратификацией (для устранения перекоса в обучающей выборке)

#### Обычное сэмплирование. Аналог train_test_split из scikit-learn

In [16]:
(train, test) = diabetes.randomSplit([0.8, 0.2], 1344)

<h4>Стратифицированные семплы</h4>

- когда есть перекос в данных и нужно этот перекос исправить - поэтому это работает только с парами <ключ, значение><br>

fractions = {"a": 0.5, "b": 0.2}<br>
RDD.sampleByKey(withReplacement, fractions, seed)

In [17]:
# Выделяем целевую функцию в качестве ключа, считаем частоты
diabetes.map(lambda x: (x.label, x)).countByKey()

(diabetes
    .map (lambda x: (x.label, x))
    .sampleByKey(False, {-1:0.25, 1:0.5}, 41)
    .countByKey())

defaultdict(<type 'int'>, {1.0: 252, -1.0: 77})

<h3>Распределения</h3>
Можно генерировать случайные распределения: <br>
**from pyspark.mllib.random import RandomRDDs** <br><br>
- uniformRDD (sc, size, numPartitions, seed) <br>
- normalRDD (sc, size, numPartitions, seed) <br>
- poissonRDD (sc, size, numPartitions, seed) <br>

<h3>Масштабирование признаков</h3> 

Особенно важно в алгоритмах, в которых используется градиентный спуск (лог-регрессия, SVM, бустинг). Класс **StandardScaler** работает только с плотными векторами (!)

In [18]:
from pyspark.mllib.feature import StandardScaler

scaler1 = StandardScaler(withMean=True, withStd=True).fit(features)

scaler1.transform (features.map(lambda x:x.toArray())).take(10)

[DenseVector([0.6395, 0.8478, 0.1495, 0.9067, -0.6924, 0.2039, 0.4682, 1.4251]),
 DenseVector([-0.8443, -1.1227, -0.1604, 0.5306, -0.6924, -0.684, -0.3648, -0.1905]),
 DenseVector([1.2331, 1.9425, -0.2638, -1.2874, -0.6924, -1.1025, 0.604, -0.1055]),
 DenseVector([-0.8443, -0.9976, -0.1604, 0.1544, 0.1232, -0.4937, -0.9202, -1.0409]),
 DenseVector([-1.1411, 0.5037, -1.5037, 0.9067, 0.7653, 1.4088, 5.4813, -0.0205]),
 DenseVector([0.3428, -0.1531, 0.2529, -1.2874, -0.6924, -0.8108, -0.8175, -0.2756]),
 DenseVector([-0.2508, -1.3416, -0.9871, 0.7186, 0.0712, -0.1259, -0.6757, -0.6157]),
 DenseVector([1.8266, -0.1844, -3.5703, -1.2874, -0.6924, 0.4195, -1.0198, -0.3606]),
 DenseVector([-0.5476, 2.3803, 0.0462, 1.5336, 4.0193, -0.1893, -0.9473, 1.6802]),
 DenseVector([1.2331, 0.1284, 1.3895, -1.2874, -0.6924, -4.0578, -0.724, 1.7652])]

<h1>Классификация и регрессия</h1>

In [19]:
from pyspark.mllib.util import MLUtils
diabetes = MLUtils.loadLibSVMFile(sc, "../../data/diabetes.libsvm")

# делим выборку на обучающую и тестовую
(train, test) = diabetes.randomSplit([0.8, 0.2], seed=1)

In [20]:
train.take(5)

[LabeledPoint(1.0, (8,[0,1,2,3,4,5,6,7],[1.0,85.0,66.0,29.0,0.0,26.6,0.351,31.0])),
 LabeledPoint(-1.0, (8,[0,1,2,3,4,5,6,7],[8.0,183.0,64.0,0.0,0.0,23.299999,0.672,32.0])),
 LabeledPoint(1.0, (8,[0,1,2,3,4,5,6,7],[1.0,89.0,66.0,23.0,94.0,28.1,0.167,21.0])),
 LabeledPoint(-1.0, (8,[0,1,2,3,4,5,6,7],[0.0,137.0,40.0,35.0,168.0,43.099998,2.288,33.0])),
 LabeledPoint(1.0, (8,[0,1,2,3,4,5,6,7],[5.0,116.0,74.0,0.0,0.0,25.6,0.201,30.0]))]

<h3>Линейные классификаторы</h3>

В линейных классификаторах решается по сути задача оптимизации:<br>
<br>
$$f(w) = \lambda R(w) + \frac{1}{n} \sum_{i=1}^n L(w;x_i,y_i) $$<br>
<br>
$L(w;x,y)$ - функция потерь<br>
$R(w)$ - функция регуляризации<br><br>
- Для SVM: $L(w;x,y) = \max\{0,1-yw^Tx\}, y \in \{-1, +1\}$<br><br>
- Для логистической регрессии: $L(w;x,y) = log(1+\exp(-yw^Tx)), y \in \{-1,+1\}$<br><br>


В Mllib все делается методом стохастического градиентного спуска SGD:<br>
 - pyspark.mllib.classification.SVMWithSGD<br>
 - pyspark.mllib.classification.LogisticRegressionWithSGD<br>
<br><br>
**classifier.train(trainDataset)** - обучает модель, среди параметров, как и в scikit-learn:<br>
- data – обучающая выборка RDD of LabeledPoint.<br>
- iterations – число итераций (по умолчанию: 100).<br>
- step – шаг SGD (по умолчанию: 1.0).<br>
- miniBatchFraction – доля данных которая используется на каждой SGD итерации<br>
- initialWeights – начальные веса (по умолчанию None)<br>
- regParam – параметр регуляризации (по умолчанию: 0.01)<br>
- regType – тип регуляризатора (по умолчанию "l2")

**Градиентный спуск**

При стандартном (или «пакетном», «batch») градиентном спуске для корректировки параметров модели используется градиент. Градиент обычно считается как сумма градиентов, вызванных каждым элементом обучения. Вектор параметров изменяется в направлении антиградиента с заданным шагом. Поэтому стандартному градиентному спуску требуется один проход по обучающим данным до того, как он сможет менять параметры.

<img src='../../img/grad_descent.png'>

**Стохастический градиентный спуск**

При стохастическом (или «оперативном») градиентном спуске значение градиента аппроксимируются градиентом функции потерь, вычисленном только на одном элементе обучения. 
То есть а каждом шаге случайно выбирается $x_i$, и "шаг" делается в сторону уменьшения функции по переменной $x_i$. 
Затем параметры изменяются пропорционально приближенному градиенту. Таким образом, параметры модели изменяются после каждого объекта обучения. Для больших массивов данных стохастический градиентный спуск может дать значительное преимущество в скорости по сравнению со стандартным градиентным спуском.
Между этими двумя видами градиентного спуска существует компромисс, называемый иногда «mini-batch». В этом случае градиент аппроксимируется суммой для небольшого количества обучающих примеров.
Стохастический градиентный спуск является одной из форм стохастического приближения. Теория стохастических приближений даёт условия сходимости метода стохастического градиентного спуска.

<img src='../../img/stochastic.png'>

In [21]:
import numpy as np
import pyspark.mllib.classification as cl

def acc_score(actual, predicted):
    actual, predicted = np.array(actual), np.array(predicted)
    return np.sum(actual == predicted) / float(actual.shape[0])

def change_label(lp):
    """
    Substitute label -1 to 0 for Logit.
    """
    label, features = lp.label, lp.features
    if label == -1:
        label = 0 
    return LabeledPoint(label, features)

diabet_train, diabet_test = diabetes.map(lambda lp: change_label(lp)).randomSplit([0.8, 0.2], seed=13)
diabet_test_labels = diabet_test.map(lambda lp: lp.label).collect()
model = cl.LogisticRegressionWithSGD.train(diabet_train, iterations=500, regType="l1")
diabet_test_predicted = diabet_test.map(lambda x: model.predict(x.features)).collect()
print("Accuracy: ", acc_score(diabet_test_labels, diabet_test_predicted))

('Accuracy: ', 0.62666666666666671)


<h3>Линейная регрессия</h3>

Тут, фактически, решается та же задача оптимизации, только с другой функцией потерь:<br>
<br>
$$\Large f(w) = \lambda R(w) + \frac{1}{n} \sum_{i=1}^n L(w;x_i,y_i) $$<br>

$$\Large L(w;x,y) = \frac{1}{2} (w^Tx-y)^2, y \in \mathbb{R}$$<br>

- Если $R(w)=0$, то это **линейная регрессия** - LinearRegressionWithSGD
- Если $R(w)=||w||_2$ - **лассо-регерессия** -  LassoWithSGD
- Если $R(w)=\frac{1}{2}||w||_2^2$ - **Ridge-регрессия** - RidgeRegressionWithSGD

Параметры такие же, как и у линейных классификаторов (см. выше).

In [22]:
import pyspark.mllib.regression as reg
model = reg.RidgeRegressionWithSGD.train(train)

<h3>Наивный Байес</h3>

Наивный Баейс принимает на вход всего **2** параметра: **обучающую выборку** и **параметр сглаживания**<br>

In [23]:
from pyspark.mllib.classification import NaiveBayes

model = NaiveBayes.train(diabet_train)
diabet_test_predicted = diabet_test.map(lambda x: model.predict(x.features)).collect()
print("Accuracy: ", acc_score(diabet_test_labels, diabet_test_predicted))

('Accuracy: ', 0.56666666666666665)


## Дерево решений

В Spark это **DecisionTree.trainClassifier** и **DecisionTree.trainRegressor**

- **data** – обучающая выборка, RDD
- **numClasses** – число классов (только для классификатора)
- **categoricalFeaturesInfo** – указание категорий для категориальных признаков
- **impurity** – критерий качества разбиения. Значения:
 - “gini” (для классификации, рекомендуется)
 - “entropy” (для классификации)
 - "variance" (для регрессии)
- **maxDepth** – макс глубина дерева
- ...

In [64]:
from pyspark.mllib.tree import DecisionTree

model = DecisionTree.trainClassifier(diabet_train, numClasses=2, 
                                     categoricalFeaturesInfo={}, 
                                     impurity='entropy', 
                                     maxDepth=5)
diabet_test_predicted = model.predict(diabet_test.map(lambda lp: lp.features))
print("Accuracy: ", acc_score(diabet_test_labels, diabet_test_predicted.collect()))

('Accuracy: ', 0.66000000000000003)


## Случайный лес

- **trainClassifier** для обучения классификатора 
- **trainRegression** для обучения функции регрессии

Параметры:

- **data** – обучающая выборка, RDD LabeledPoint. Классов может быть несколько
- **numClasses** – число классов для классификации
- **categoricalFeaturesInfo** – информация по категориальным признакам. Запись (n -> k) показывает, что признак n - категориальный и имеет k категорий, начинащихся с 0: $\{0, 1, ..., k-1\}$.
- **numTrees** – число деревьев в лесу
- **featureSubsetStrategy** - число признаков, которые рассматриваются для разбиения в каждом узле. Возможные варианты:
 - **“auto”** (по умолчанию, если одно дерево - выбираются все признаки, иначе - берется корень из числа признаков)
 - **“all”**
 - **“sqrt”**
 - **“log2”**
 - **“onethird”**
 
- **impurity** – критерий качества разбиения. Значения:
 - “gini” (для классификации, рекомендуется)
 - “entropy” (для классификации)
 - "variance" (для регрессии)
- **maxDepth** – максимальная глубина дерева
- **maxBins** – максимальное число корзин, используемое для разделения признаков (по умолчанию 100)
- **seed** – для воспроизводимой случайности bootstrap и выбора подмножества признаков

In [65]:
from pyspark.mllib.tree import RandomForest
model = RandomForest.trainClassifier(diabet_train, numClasses=2, 
                                     categoricalFeaturesInfo={}, numTrees = 100, 
                                     featureSubsetStrategy="auto",
                                     impurity='gini', maxDepth=20)

In [66]:
diabet_test_predicted = model.predict(diabet_test.map(lambda lp: lp.features))
print("Accuracy: ", acc_score(diabet_test_labels, diabet_test_predicted.collect()))

('Accuracy: ', 0.69333333333333336)


## Кластеризация KMeans

Обучение модели - KMeans.train с параметрами
- **dataset** - датасет векторов
- **k** - число кластеров
- **maxIterations** - число итераций сходимости
- **initializationMode** - дотупные варианты "k-means||" (по умолчанию), "random"
- **runs** - (по умолчанию 1)

In [67]:
from pyspark.mllib.clustering import KMeans

iris = sc.textFile ("../../data/iris.txt")

iris_features ,iris_labels = iris.map(lambda x: Vectors.dense([float(v) for v in x.split(",")[:-1]])), \
                            iris.map(lambda x: int(x.split(",")[-1])).collect()

# Build the model (cluster the data)
clusters = KMeans.train(iris_features, 3, maxIterations = 100,
        runs=10, initializationMode="random")

cluster_labels = iris_features.map(lambda vec: clusters.predict(vec)).collect()