# Machine learning sur big data en spark et TensorFlow

## Scala, Accumulators, DataFrames and Datasets

### Intérêts de Scala

- robustesses des applications (i.e ne doit pas cacher de mauvaises suprrises), exemple python : typage dynamique donc erreurs potentielles à l'exécution (*runtime*) ;
- performances : JVM, compilation en byte code, optimisation, etc. ;
- concurrence : gestion des threads, etc. ;

### Typage dynamique vs statique

A la différence python qui est dynamiquement typé, Scala est statiquement typé.

En scala, lorsque je crée une variable `x`, je dois lui attribuer un type, par exemple `Int`, `Double`, `String`, etc.
Scala dispose d'un algorithme d'inférence de type qui permet de déduire le type de la variable `x` :

```scala
scala> val x = 42
x: Int = 42
```

### Typage en Scala

![Typage en scala.](typage_scala.png "Typage en scala.")

**Attention il manque des inclusions sur cette image, par exemple, `Int` est un sous-ensemble de `Double`.**

### `List`, `Array` and `Set` en Scala


```scala
scala> 8 * 5 + 2
res0: Int = 42
scala> 0.5 * res0
res57: Double = 21.0
scala> "Hello, " + res0
res58: String = Hello, 42
scala> 1 to 10
res0: scala.collection.immutable.Range.Inclusive = Range 1 to 10
// Range for populating sequences
scala> val x = (1 to 10).toList // equivalent range in python
x: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) //  List[int] : indique le type la valeur dans cette collection
scala> val x = (1 to 10).toArray // dans la descente du gradient, on utilise des arrays
x: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> val x = (1 to 10).toSet // Set : collection d'éléments uniques, en l'occurrence il n'y a pas de doublons mais si on ajoute des éléments en doubles, il n'en prendra qu'un seul
x: scala.collection.immutable.Set[Int] = Set(5, 10, 1, 6, 9, 2, 7, 3, 8, 4)
```
Différence entre `List`, `Array` et `Set` :
- le nombre d'éléments est fixe dans une `Array` et on peut accéder à un élément par son index ;
- les `array` sont stockés de façon contigue en mémoire, ce qui permet d'accéder à un élément en temps constant ;
- `List` occupe beaucoup plus d'espaces en mémoire que `Array` car chaque élément de la liste est un objet qui contient la valeur et une référence vers l'élément suivant ;
- Dans les `List` il faut suivres les pointeurs alors que pour `Array` on accède directement à l'élément ;
- `List` est une collection immuable, on ne peut pas ajouter ou supprimer des éléments, on peut seulement créer une nouvelle liste en ajoutant ou supprimant des éléments ;
- `List` : plus d'éfficacité pour ajouter ou supprimer des éléments en début de liste ;
- `Array` : plus d'éfficacité pour accéder à un élément par son index et donc sur le plan de la mémoire ;
- `Set` : collection d'éléments uniques, en l'occurrence il n'y a pas de doublons mais si on ajoute des éléments en doubles, il n'en prendra qu'un seul ;
- `Set` est immutable, ordonné, propriété d'appartenance (*membership*) : avec des `Set` on peut vérifier si un élément appartient à un ensemble en temps quasi constant ;
- **`Set` est plus rapide que `List` pour vérifier si un élément appartient à un ensemble ;**

Si jamais l'on dispose d'une collection avec de sdoublons et l'on a besoin de vérifier l'appartenance d'un élément à cette collection, dans ce cas on peut utiliser un dictionnaire en python (`dict`) ou un `Set` en Scala.
Les `Set` et les `dict` ont une structure équivalente, ils sont basés sur des tables de hachage (*hash tables*).

faire une boucle for sur la liste
Créer une liste de dictinnaires associés à chaque élément
parcourir la liste et incrémenter le compteur de chaque élément
```scala
scala> val x = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
x: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> val y = x.map(i => (i, 1))
y: List[(Int, Int)] = List((1,1), (2,1), (3,1), (4,1), (5,1), (6,1), (7,1), (8,1), (9,1), (10,1))
scala> val z = y.groupBy(_._1).mapValues(_.size)
z: scala.collection.immutable.Map[Int,Int] = Map(5 -> 1, 10 -> 1, 1 -> 1, 6 -> 1, 9 -> 1, 2 -> 1, 7 -> 1, 3 -> 1, 8 -> 1, 4 -> 1)
```

### Conditional expressions
```scala
scala> val x=1
x: Int = 1
scala> if (x > 0) 1 else -1
res1: Int = 1
scala> val s = if (x > 0) 1 else -1
s: Int = 1
```

### Block expressions and assignements
```scala
scala> { r = r * n; n -= 1 }
scala> var c = { r = r * n; n -= 1 }
c: Unit = ()
scala>
scala> var x0,y0=0.1
x0: Double = 0.1
y0: Double = 0.1
scala> { val dx = x - x0; val dy = y - y0; sqrt(dx * dx + dy * dy) } // le type de cette expression est le type associé à la dernière valeur de l'expression : dx * dx + dy * dy qui est une racine carrée donc Double
res40: Double = 1.2727922061357855
```

### Loops
```scala
scala> while (n > 0) { r=r* n
        | n -= 1
        | }
scala> for (i <- 1 to n) // il n'y pas de in comme en python mais plutôt un <- (on peut parcourir des List, Arrays, Sets, etc.). Il s'agit d'une structure parraisseuse (lazy). On peut aussi utiliser until pour exclure la dernière valeur.
        | r=r* i
scala> val s = "Hello"
s: String = Hello
scala> var sum = 0
sum: Int = 0
scala> for (i <- 0 until s.length) // Last value for i is s.length - 1
        | sum += s(i)
```

### Functions

```scala
scala> def abs(x: Double) = if (x >= 0) x else -x
abs: (x: Double)Double // Type de la fonction : Double et de la sortie de la fonction : Double. Remarque on peut mettre le type de sortie de la fonction mais il faut prendre garde à ce que le type de sortie soit le même que le type de la valeur de retour de la fonction. Si l'on met un int, cela fonctionnera car int est un sous-ensemble de double.
scala> def fac(n : Int) = {
      | var r = 1
      | for (i <- 1 to n) r = r * i
      | r
      | }
fac: (n: Int)Int
scala>
scala> def fac(n: Int):Int =
      | if (n <= 0) 1 else n * fac(n - 1) //fonction récursive
fac: (n: Int)Int
scala>
```

Lorsque l'on dispose d'une fonction récursive, le typage est obligatoire.

### Pattern matching

#### Rappels concernant les Patterns :
- Motifs ;
- En général lorsqu'il est question de pattern, il s'agit de valeurs que l'on peut réprésenter dans un langage de programation avec des parties qui ne sont pas complètement sépécifiées. Et qu'il faudra spécifier lorsque l'on fera le *pattern matching*.

**exemple :**

Ce pattern contient des éléments qui ne sont pas spécifiés. Si l'on dispose ce pattern : (a, 3), on va vérifier si le tuple : (7,3) correspond à ce pattern. Si c'est le cas, on va extraire la valeur de `a`. en scala, il est même possible de faire du pattern matching sur des types de données complexes comme des listes, des arbres, etc.

```scala
scala> val x = (1, 2, 3)
x: (Int, Int, Int) = (1,2,3)
scala> val (a, b, c) = x
a: Int = 1
b: Int = 2
c: Int = 3

scala> def fact(n: Int): Int = n match {
      | case 0 => 1 // si n = 0 on retourne 1
      | case n => n * fact(n - 1) // sinon on retourne n * fact(n - 1)
      | }
fact: (n: Int)Int
```

`case` va avec `match` mais la réciproque n'est pas garantie (*i.e.* `match` peut être utilisé sans `case`).

```scala
scala> r.map(c => c._1) //on veut retourner la première composante de c. equivalent en python : lambda c: c[0]
scala> (k, v).map(case(k,v) => k) // équivalent à r.map(c => c._1). Dans ce cas cela fonctionne systématiquement et renvoie le premier élément du tuple. Si la valeur d'entrée ne correspond pas au case un erreur est levée en indiquant qu'il manque un case.

scala> def f(x: Any): String = x match {
      | case i:Int => "integer: " + i // pattern qui décrit une composante typage. Cela match si x est un entier
      | case _:Double => "a double" // pattern qui match si x est un double
      | case s:String => "I want to say " + s // pattern qui match si x est une chaine de caractères
      | case _: Any => "I do not know"} // pattern qui match si x est de n'importe quel autre type que ceux précédemment cités
f: (x: Any)String
scala> f(3)
res2: String = integer: 3
scala> f(())
res3: String = I do not know
scala> f(8.6)
res4: String = a double
scala> f("Hey")
res5: String = I want to say Hey
```

### Variable-Length Arrays

```scala
scala> import scala.collection.mutable.ArrayBuffer
scala> val b = ArrayBuffer[Int]()
b: scala.collection.mutable.ArrayBuffer[Int] = ArrayBuffer()
scala> b += 1 // On ajoute un élément à l'ArrayBuffer b
res17: b.type = ArrayBuffer(1)
scala> b += (1, 2, 3, 5)
res18: b.type = ArrayBuffer(1, 1, 2, 3, 5)
scala> b ++= Array(8, 13, 21) // Ajout d'une séquence à une ArrayBuffer. En l'occurrence on ajoute un Array à l'ArrayBuffer. Dans ce cas c'est possible par l'intermédiaire de l'opérateur ++= mais ce n'est pas toujours le ca.
res19: b.type = ArrayBuffer(1, 1, 2, 3, 5, 8, 13, 21)
scala> b.trimEnd(5)
scala> b
res27: scala.collection.mutable.ArrayBuffer[Int] = ArrayBuffer(1, 1, 2)
```

### Traversing Arrays

```scala
scala> b ++= Array(8, 13, 21)
res19: b.type = ArrayBuffer(1, 1, 2, 3, 5, 8, 13, 21)
scala> b.trimEnd(5)
scala> for (i <- 0 until b.length)
      | println(i + ": " + b(i))
0: 1
1: 1
2: 2
scala> for (elem <- b)
      | println(elem)
1
1
2
```
### Transforming Arrays

```scala
scala> b ++= Array(8, 13, 21)
res19: b.type = ArrayBuffer(1, 1, 2, 3, 5, 8, 13, 21)
scala> b.trimEnd(5)
scala> for (elem <- b if elem % 2 == 0) yield 2 * elem //yield permet de retourner une valeur et génère à la demande cet élément. Il s'agit d'une opération lazy.
res25: scala.collection.mutable.ArrayBuffer[Int] = ArrayBuffer(4)
scala> b.filter(x => x % 2 == 0 ).map(x => 2 * x) // en remplaçant b par un rdd cela devient du code spark et le code sera exécuté de manière distribuée. Ce n'est pas la cas pour for (elem <- b if elem % 2 == 0) yield 2 * elem
res26: scala.collection.mutable.ArrayBuffer[Int] = ArrayBuffer(4)
scala> b.filter(_ % 2 == 0).map(2 * _) // Losqu'il s'agit d'un seul élément on peut utiliser _ à la place de x. Cela permet de simplifier le code.
res27: scala.collection.mutable.ArrayBuffer[Int] = ArrayBuffer(4)
```

Remarque : L'équivalent `lambda x :` est `x =>` en Scala.

### Quelques éléments supplémentaires

```scala
scala> // calculer la somme de toutes les valeurs de b
scala> b.reduce((x, y) => x + y) // en python lambda x, y : x + y. reduce est une opération qui prend une fonction à deux arguments et qui va appliquer cette fonction à chaque élément de la liste. Cela va donner un résultat final.
scala> b = ArrayBuffer(1, 1, 2)
scala> b.zip(b) // prende deux collections de même longueur est va créer une collection de tuples. Dans ce cas on a une collection de tuples de la forme (1, 1), (1, 1), (2, 2). Il faut que les collections aient la même longueur.
scala> b.zipWithIndex // va donner une collection de tuples de la forme (1, 0), (1, 1), (2, 2). Cela permet de donner un index à chaque élément de la collection.
```


## Descente du gradient

### Premiers pas

```scala
scala> val v1 = Array(1.0, 1.0)
scala> val v2 = Array(1.0, 1.0)
scala> // somme, produit, scalaire, soustraction, produit par un scalaire
scala> // Exemples :
scala> // Somme : sum(v1, v1) ----> (2.0, 2.0)
scala> def sum(v1: Array[Double], v2:Array[Double]):Array[Double] = {
  (v1 zip v2).map{case (a, b) => a + b}
}
scala> sum(v1, v1)
scala> res0: Array[Double] = Array(2.0, 2.0)
scala> // Produit : prodScal(v1,v2) ------> 2
scala> // sub(v1, v2) ------> (0,0)
scala> // prodByscal(5, v1) ------> (5.0, 5.0)


scala> val v1 = Array(1.0, 1.0)
scala> val v2 = Array(1.0, 1.0)
scala> // somme, produite, scalaire, soustraction, produit par un scalaire
scala> // Exemples :
scala> // Somme : sum(v1, v1) ----> (2.0, 2.0)
scala> def sum(v1: Array[Double], v2: Array[Double]) = {
    | val n = v1.length
    | val res = new Array[Double](n)
    | for (i <- 0 until n) 
    | | res(i) = v1(i) + v2(i)
    | | res
    | };
```

## Descente du gradient


![Régression linéaire.](Linear_regression.png "Régression linéaire.")

$f_{w}(x) = w \cdot \phi(x)$ (avec ou sans biais ou avec de l'*offset* ou sans *offset*).

Guider l'apprentissage de façon à ce ques les écarts entre les valeurs prédites et les valeurs réelles soient minimisés ($(w \cdot \phi(x)) - y$).

Régression $L_2$ :
$\text{Loss}_{\text{squared}}(x,y,\textbf{w}) = (w \cdot \phi(x) - y)^{2}$.
- La valeur de $\textbf{w}$ qui minimise la fonction de perte est la moyenne de $y$ ;
- La moyenne s'accomode à l'ensemble des exemples.

$\text{Loss}_{\text{absdev}}(x,y,\textbf{w}) = \vert w \cdot \phi(x) - y \vert$.
- La valeur de $\textbf{w}$ qui minimise la fonction de perte est la médiane de $y$ ;
- La médiane est plus robuste aux valeurs extrêmes.

### Gradient

$$
\begin{align*}
\text{TrainLoss}(\textbf{w}) &= \frac{1}{\vert D_{train} \vert} \sum_{i=1}^{n} \text{Loss}(x_i, y_i, \textbf{w}) \\
&= \min\limits_{\textbf{w} \in \mathbb{R}^{d}} \text{TrainLoss}(\textbf{w})
\end{align*}
$$

1. Initialize $\textbf{w} = [0, \cdots, 0]$ ;
2. For $\text{t} \in [1, \cdots, T]$:
    - $\textbf{w} \leftarrow \textbf{w} - \underbrace{\eta}_{\text{step size}} \nabla_{w} \underbrace{\text{TrainLoss}(\textbf{w})}_{\text{gradient}}$
    - $\eta$ : *step size* ;
    - $\nabla_{\textbf{w}} \text{TrainLoss}(\textbf{w})$ : *gradient* de la fonction de perte.

Spark permet de faire du parallélisme, donc la combinaison des hyperparamètres comme $\eta$, est possible.

Il est possible que la loss ne soit pas concave notamment en *deep learning*. C'est la raison pour laquelle est utilisée la descente du gradient stochastique (adaptatif).
Par exemple : *Adam* (qui regarde une partie du passé), *Adagrad*, *RMSprop*, etc.
    