#### Introduction :

Les objets de la technologie Spark et leur utilisation à l’aide de commandes en Python, plus
précisément en utilisant l’API pyspark, puis execution des algorithmes d’apprentissage avec la librairie MLlib. 

#### Rappelons le contexte :
Les méthodes d’apprentissage statistique supervisée ou non déploient des algorithmes itératifs dont l’exécution sur des données gérées dans un cadre Hadoop provoquent des lectures / écritures à chaque itération. Les
temps d’exécution s’en trouvent fortement pénalisés.

#### Spark

• La technologie Spark y remédie en intégrant le concept de tables de données distribuées résilientes (Resilient Distributed Dataset ou RDD) 

==> Très schématiquement, chaque partition de données reste en mémoire sur son serveur de calcul entre deux itérations tout en gérant les principes de tolérance aux pannes.


• Les commandes spécifiques de Spark s’exécutent en Java, Scala et aussi pour certaines en Python. D’où l’intérêt de l’apprentissage de Python qui permet sans "trop" d’effort de franchir le changement d’échelle en volume, notamment par l’emploi de la librairie ou plutôt de l’interface de programmation (Application Interface programmation ou API) dédiée PySpark ; cette API propose une utilisation interactive.


• Spark intègre deux principales librairies :
    
    - SQL pour du requêtage dans des données volumineuses et structurées,
    - MLlib avec les principaux algorithmes d’apprentissage et méthodes statistique.
Deux autres librairies sont disponibles pour traiter des données en flux continu (streaming) ou celles de graphes et réseaux.


• La principale motivation pour utiliser Spark est que les mêmes programmes ou commandes sont utilisées pour exécuter des algorithmes d’apprentissage (librairie MLlib), que ce soit sur un poste isolé, sur un cluster, un ensemble de machines virtuelles sur un serveur Amazon, Google cloud, Azure ..., sur des données stockées dans un fichier ou distribuées dans un système Hadoop.

In [1]:
import time
import pyspark
import os
import csv
from numpy import array
from pyspark.mllib.regression import LabeledPoint
from pyspark import SparkContext, SparkConf

L’environnement utilisé est décrit par la commande SparkContext initialisant un objet SparkConf qui définit la configuration utilisée comme par exemple l’URL du nœud "maître" (driver) du cluster de calcul utilisé, le
nombre de nœuds "esclaves" ou workers, leur espace mémoire réservé à chacun dans le cas de machines virtuelles.

In [None]:
config = SparkConf().setAll([('spark.cores.max', '3')])
sc = SparkContext(conf=config)

#### Nous manipulerons un RDD

In [3]:
# lecture et distribution du fichier
data = sc.textFile("HistorCommande.csv").map(lambda line: line.split(",")).map(lambda record: (record[0], record[1], record[2]))

In [8]:
data.take(5)

[('Albert', 'chocolat', '3.27'),
 ('Albert', 'cassoulet', '2.45'),
 ('Julie', 'coca', '3.20'),
 ('Dominique', 'tarte', '1.50'),
 ('Paul', 'cassoulet', '5.40')]

In [9]:
# nombre total de commandes
NbCommande=data.count()
print("Nb de commandes: %d" % NbCommande)

Nb de commandes: 5


In [10]:
# Nombre de clients uniques
ClientUnique = data.map(lambda record:record[0]).distinct().count()
print("Nb clients: %d" % ClientUnique)

Nb clients: 4


In [11]:
# Total des commandes
TotalCom = data.map(lambda record:float(record[2])).sum()
print("Total des commandes: %2.2f" % TotalCom)

Total des commandes: 15.82


In [12]:
# Produit le plus commandé
produits = data.map(lambda record:(record[1], 1.0)).reduceByKey(lambda a, b: a + b).collect()
plusFreq = sorted(produits, key=lambda x: x[1],reverse=True)[0]
print ("Produit le plus populaire: %s avec %d commandes" % (plusFreq[0],plusFreq[1]))

Produit le plus populaire: cassoulet avec 2 commandes


Vector

Des vecteurs numériques sont distribuables sur les nœuds sous deux formats : dense ou creux. dans le dernier cas, seules les coordonnées non nulles
sont enregistrées.

Créations de vecteurs denses contenant les valeurs nulles :

In [13]:
from numpy import array
from pyspark.mllib.linalg import Vectors
# vecteur "dense"
# à partir de numpy
denseVec1=array([1.0,0.0,2.0,4.0,0.0])

# en utilisant la classe Vectors
denseVec2=Vectors.dense([1.0,0.0,2.0,4.0,0.0])

Créations de vecteurs creux, seules les valeurs non nulles sont identifiées et stockées. Il faut préciser la taille du vecteur et les coordonnées de ces valeurs non nulles. C’est défini par un dictionnaire ou par une liste d’indices et de valeurs.

In [14]:
sparseVec1 = Vectors.sparse(5, {0: 1.0, 2: 2.0,3: 4.0})
sparseVec2 = Vectors.sparse(5, [0, 2, 3], [1.0,2.0, 4.0])

In [19]:
sparseVec1.toArray(),sparseVec2.toArray()

(array([1., 0., 2., 4., 0.]), array([1., 0., 2., 4., 0.]))

#### MLlib

Exemple d'utilisation de la librairie ML lib

Générer le fichier ci-dessous dans le répertoire courant :

    0.0 0.0 0.0
    0.1 0.1 0.1
    0.2 0.2 0.2
    9.0 9.0 9.0
    9.1 9.1 9.1
    9.2 9.2 9.2

In [20]:
# Déclaration des fonctions
from pyspark.mllib.clustering import KMeans
from numpy import array
from math import sqrt
# Lire et "distribuer" les données
data = sc.textFile("data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
parsedData.collect()

[array([0., 0., 0.]),
 array([0.1, 0.1, 0.1]),
 array([0.2, 0.2, 0.2]),
 array([9., 9., 9.]),
 array([9.1, 9.1, 9.1]),
 array([9.2, 9.2, 9.2])]

In [27]:
# Recherche des 2 classes
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")

# Qualité de la classification
def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sum([x**2 for x in (point - center)])

Inert = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
dist_moy=Inert/parsedData.count()
print("Distance au centre du cluster = ", str(dist_moy))

Distance au centre du cluster =  0.01999999999999993


In [28]:
clusters.predict([ 9., 9., 9.]),clusters.predict([ 0.1, 0.1, 0.1])

(1, 0)

In [29]:
# fonction lambda dans map pour "prédire" tous les vecteurs
parsedData.map(lambda point: clusters.predict(point)).collect()

[0, 0, 0, 1, 1, 1]

#### Resilient Distributred Datasets (RDD)

La notion de table de données résiliente est au cœur des fonctionnalités de Spark. Il s’agit d’un ensemble d’enregistrement ou objets d’un type spécifique, partitionnés ou distribués sur plusieurs nœuds du cluster. 

Cet objet est tolérant aux pannes, si un nœud est touché par une défaillance matérielle ou de réseau, la table résiliente est reconstruite automatiquement sur les autres nœuds et la tâche achevée.

Les opérations sur les RDD se déclinent "normalement" pour des données distribuées en étapes Map et Reduce.

La principale propriété des RDD de Spark est la possibilité de les cacher en mémoire (RAM) de chaque nœud. C’est ce qui permet d’économiser énormément d’accès disques qui sont le principal verrou, en terme de temps de calcul,
lors de l’exécution d’algorithmes itératifs.



#### Random Forest

In [31]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import RandomForest

# construction des données
data = [
LabeledPoint(0.0, [0.0]),
LabeledPoint(0.0, [1.0]),
LabeledPoint(1.0, [2.0]),
LabeledPoint(1.0, [3.0])]

# distribution de la table
trainingData=sc.parallelize(data)
trainingData.collect()


[LabeledPoint(0.0, [0.0]),
 LabeledPoint(0.0, [1.0]),
 LabeledPoint(1.0, [2.0]),
 LabeledPoint(1.0, [3.0])]

In [32]:
# Estimation du modèle
model = RandomForest.trainClassifier(trainingData, 2, {}, 30, seed=42)
model.numTrees(),model.totalNumNodes()

(30, 82)

In [33]:
# "Affichage" de la forêt
print (model.toDebugString())

TreeEnsembleModel classifier with 30 trees

  Tree 0:
    Predict: 1.0
  Tree 1:
    If (feature 0 <= 1.5)
     Predict: 0.0
    Else (feature 0 > 1.5)
     Predict: 1.0
  Tree 2:
    If (feature 0 <= 1.5)
     Predict: 0.0
    Else (feature 0 > 1.5)
     Predict: 1.0
  Tree 3:
    If (feature 0 <= 0.5)
     Predict: 0.0
    Else (feature 0 > 0.5)
     Predict: 1.0
  Tree 4:
    If (feature 0 <= 0.5)
     Predict: 0.0
    Else (feature 0 > 0.5)
     Predict: 1.0
  Tree 5:
    Predict: 0.0
  Tree 6:
    If (feature 0 <= 1.5)
     Predict: 0.0
    Else (feature 0 > 1.5)
     Predict: 1.0
  Tree 7:
    If (feature 0 <= 1.5)
     Predict: 0.0
    Else (feature 0 > 1.5)
     Predict: 1.0
  Tree 8:
    If (feature 0 <= 1.5)
     Predict: 0.0
    Else (feature 0 > 1.5)
     Predict: 1.0
  Tree 9:
    If (feature 0 <= 0.5)
     Predict: 0.0
    Else (feature 0 > 0.5)
     Predict: 1.0
  Tree 10:
    If (feature 0 <= 0.5)
     Predict: 0.0
    Else (feature 0 > 0.5)
     Predict: 1.0
  Tree 11:

In [34]:
# Prévision d’un échantillon
rdd = sc.parallelize([[3.0], [1.0]])
model.predict(rdd).collect()

[1.0, 0.0]