# Sparkathon 

## Real time machine learning

### 14.02.2018

### Test środowiska

In [None]:
sc

In [None]:
spark

### Agenda

1. Trochę teorii o przetwarzaniu strumieni w Sparku
2. Dwa różnych API
3. Estymatory dostępne w Sparku w trybie real time
4. Przykład pracującego w trybie Real Time Estymatora
6. Serving vs training


## SPARK (STRUCTURED) STREAMING
### Trochę teorii o przetwarzaniu strumieni w Sparku

##### Batch vs Streaming

Przy przekształcaniu batch'owym dla nas dostępny "komplet" danych do obróbki.

Głowna idea w przekształceniu strumieniowym w tym żeby traktować dane jak tabełę do którę są dodawane ciągle są dodawane nowe linii.


Jakie są różnicy pomiędzy tymi trybami przekształeń danych:

**Stream**

1. Robi przekształcenie albo na jednym elemncie albo na kilku elementach (na oknie) który sytem dostał
2. Przekształcenia zazwyczaj proste (ze wzgłędu na potrzebę szybkiej odpowiedzi)
3. Obliczenia zazyczaj niezależne (w kontekscie okna lub rekordu)
4. Asynchroniczne zazwyczaj źródło danych nie interesuje się u nas czy otrzymaliśmy dane :)

**Batch**

1. Ma dostęp do wszystkich danych
2. Może sobie pozwolić na przekształcenia o dowolnym skomplikowaniu, Bo:
3. Job może trwać minuty (i godziny ;)


### Dwa różnych API dla Streaming'u i Dwa różnych API dla Machine Learning'u

Spark ma bogatą historię wsparcia przetwarzania strumieniowego

W 2012 do Spark'u dołączył projekt SPark Streaming ze Swoim **DStreams** API.
To było jedno z pierwszych API (przynajmniej tak wyszło z mojego researchu :) którę dawało developerom możliwość 
użycia funkcji wysokopoziomowych, jakich jak _map_ i _reduce_.

Dotychczas jest Spark Streaming jest używany w wielu organizacji (patrząc na ilość pytań na StackOverflow).

Jednak _DStreams_ jak i RDD opierają się na obiektach Java/Python co w praktyce zawęża pole możliwych optymalizacji).


W 2016 więc powstał project Structured Streaming który używa DataFrame'y i Datasety. Structured Streaming API jest oznaczone jako stabilne w wersji Spark'a 2.2.0 więc jest gotowy do komercyjnego użycia.


## SPARK MACHINE LEARNING (ML/MLlib)
### Klasa ***Pipeline*** i jej działanie

_Pipeline_ używa *Dataframe*'a jako źrodła danych. Dataframe może mieć różne dane: tekst, wertory, labeli wektorów wejściowych, predykcję, etc.

_Pipeline_ to jest klasa którą reprezentuję sekwencję przekstałceń danych albo _Transformer_ albo _Estimator_.



Pipeline ma metodę _fit()_ którą dopasowywuje ten Pipeline do danych. Po wykonaniu Metody _fit()_ Pipeline się dopasowuje do danych i wyniku otrzymujemy _PipelineModel_

Kiedy metoda _fit()_ jest wołana:
    * Dla każdego _Transformer'a_ w pipelinie wykonuje się metoda transform na tym Dataframe.
    * Dla _Estimator'ów_ wykonuje się metoda _fit()_ która przekształca ten Estymator w Transformer. Jeżeli Pipeline ma więcej niż jeden Estymator, to na nie ostatnich też jest wykonywana _transform()_
    
    
#### Przykład Pipeline
przed wywołaniem metody _fit()_:

![przed wykonaniem fit()](https://spark.apache.org/docs/latest/img/ml-Pipeline.png)

1. Tokenizer — dzieli tekst na słowe
2. HashingTF — konwertuje słowa na wektory
3. Logistic regression — klasyfikuje wektor wejściowy


po wykonaniu metody _fit()_:
![po wykonaniu fit()](https://spark.apache.org/docs/latest/img/ml-PipelineModel.png)


#### Przykładowy Pipeline

### Klasy i pojęcia używane w Machine Learning'u w Spark'u

* Estimator
    * Predictor (coś co daję predykcję, czyli zwraca klasę lub prawdopodobieństwo dla wertora wejściowego)
        * Classifier
            * Probabilistic
            * OneVsRest
            * ...
        * Regressor
            * LinearRegressor
            * GeneralizedLR
        * Random Forest/Tree
    * **ALS model** (omówiliśmy na poprzednim meetupie, materiały dostępne są [tu](https://github.com/addmeaning/sparkathon-als)
    * Pipeline (jedno lub więcej przekstałceń, albo Estimator albo Transformer)
    * MinMaxScaler, LDA, etc
* Transformer (przekstałca DataFrame na DataFrame (zazwyczaj wzbogacony o kolumnę)
    * Model (= algoritm + dane)
        * PipelineModel (fitted model)
    * One-Hot encoder, etc

In [None]:
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// Prepare training documents from a list of (id, text, label) tuples.
val training = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")

val hashingTF = new HashingTF().setNumFeatures(1000).setInputCol(tokenizer.getOutputCol).setOutputCol("features")

val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.001)

val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))

// Fit the pipeline to training documents.
val model = pipeline.fit(training.asInstanceOf[Dataset[_]])

// Now we can optionally save the fitted pipeline to disk
model.write.overwrite().save("/tmp/spark-logistic-regression-model")

// We can also save this unfit pipeline to disk
pipeline.write.overwrite().save("/tmp/unfit-lr-model")

// And load it back in during production
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")

// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq((4L, "spark i j k"),
  (5L, "l m n"),
  (6L, "spark hadoop spark"),
  (7L, "apache hadoop")
)).toDF("id", "text")

// Make predictions on test documents.
model.transform(test.asInstanceOf[Dataset[_]]).select("id", "text", "probability", "prediction").collect().foreach {
    case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
        println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  }


### Estymatory w Sparku dostępne w trybie Real Time

Używając API Dataframe/Pipeline wszystkie Estymatory którę dobrzę się skladają w Pipeline (czyli te, których dodanie kolumny z predykcją następuje po wywołaniu metody _transform()_ mogą służyć dla tego żeby używać ich w czasie rzeczywistym :)

### Czas na przykład
![](https://upload.wikimedia.org/wikipedia/commons/thumb/7/70/Solid_white.svg/768px-Solid_white.svg.png)

### Serving vs training
Na poprzednim przykładzie widać że algorytm może być stosowany dla danych streaming'owych natomiast nie jest update'owany na bierząco.

Na drugim przykładzie pokaże mock systemu, który mogł pomóc w rozwiązaniu tego problemu.


### Ograniczenia

API cały czas się ewoluuje i jak widzicie nie wszystkie możliwości są dostępnę (szczególnie w DataFrame/Structured Streaming API).

Natomiast są też w starym API Spark'u algorytmy którę mogą updejtować swoje koeficienty.

To są: (TODO)
    * First
    * Second
    * Third


### A jak u konkurencji

* Flink: biblioteka ml działa wyłącznie dla dla DataSet API (czyli batch). Nie można konwertować DataStream na DataSet. Też zapłanowany development 

* Amazon Machine learning (co prawda to serwis a nie framework) nie ma retraining'u

* Oryx 2 (To w sumie Spark) używa rozwiązania która opiera się na logice 2 przykładu (ale oczywiście tam jest wszystko poprawnie rozbudowano)

* Mahout ma Streaming Kmeans (spark też ma)

* Tensorlow no online retraining
    