# Projet KMEANS en Spark
##### Daniel OMOLA & Hy-Boui CHANG

L'objectif du projet est d'explorer les différentes possibilités d'améliorer un programme de KMeans sous Spark.

Pour cela, nous utilisons un jeu de données très connu sur les fleurs d'IRIS, et d'appliquer l'algorithme KMeans dessus.  
Une première version du programme est fourni, en RDD sous Python.




## Chapitre 0 : Description du jeu de données
Le jeu de données est disponible sous l'adresse suivante : https://www.dropbox.com/s/9kits2euwawcsj0/iris.data.txt.
Ce jeu de données comprend un total de 150 observations, réparties de manière égale entre les trois espèces de fleurs d'iris (setosa, virginica et versicolor). Quatre caractéristiques sont mesurées pour chaque observation (il s'agit de la longueur et la largeur du sépale et du pétale, en centimètres).


## Chapitre 1 : analyse du programme de départ (RDD Python)

Afin de décrire le programme KMeans fourni en Python RDD, nous le lançons en local, étape par étape (et non le fichier kmeans-dario-x.py dans son ensemble).

Nous avons téléchargé le jeu de données et nous l'avons placé en local.
Après avoir lancé Pyspark, il convient de charger le jeu de données :

In [None]:
lines = sc.textFile("iris.data.txt")

Le jeu de données est modifié / préparé : 
 -  étape 1 : séparer les éléments identifiés par une virgule
 -  étape 2 : mettre les 4 premiers éléments en nombre flottant, puis le type d'iris. Le tout est regroupé ensemble, il s'agit de x[0]
 -  étape 3 : ajouter un index à la fin de chaque donnée (début index : 0). On obtient ici x[1]
 -  étape 4 : on met permute l'index et les données : donc on inverse x[0] avec x[1]
 
 Voici un aperçu de ce que l'on obtient : __[(104, [6.5, 3.0, 5.8, 2.2, 'Iris-virginica']), ....__

In [None]:
data = lines.map(lambda x: x.split(','))\
            .map(lambda x: [float(i) for i in x[:4]] + [x[4] ])\
            .zipWithIndex()\
            .map(lambda x: (x[1],x[0]))

Dans notre jeu de données, nous savons ici par avance qu'il y a 3 clusters à trouver : setosa, virginica et versicolor.
Il s'agit alors ici de choisir au hasard 3 points comme centroïdes potentiels, et d'effectuer les calculs d'ajustement après.
A nouveau, on ajouter ici un index commençant par 0, qu'on déplace au début des lignes de données.

On obtient par exemple : 
__[(0, [5.2, 4.1, 1.5, 0.1]),
 (1, [6.4, 2.9, 4.3, 1.3]),
 (2, [5.1, 3.7, 1.5, 0.4])]__

Remarque 1 : la fonction "takeSample" admet le paramètre "withoutReplacement" car il s'agit de choisir 3 centroïdes distincts.

Remarque 2 : si nous ne savions pas à l'avance que notre jeu de données était optimum avec 3 clusters, il aurait fallu lancer .......XXXXX......

In [None]:
nb_clusters=3
centroides = sc.parallelize(data.takeSample('withoutReplacment',nb_clusters))\
              .zipWithIndex()\
              .map(lambda x: (x[1],x[0][1][:-1]))

Nous allons effectuer un premier calcul : le produit cartésian. L'objectif est de coupler chaque ligne de données (150 lignes) avec chacun des 3 centroïdes.
Nous remarquons dores et déjà que cette opération prend beaucoup de temps.

In [None]:
joined = data.cartesian(centroides)

Nous allons ensuite calculer la distance euclidienne entre chaque point (chaque donnée avec ses 4 valeurs) et les centroïdes.

Nous commençons par définir la fonction de calcul de la distance euclienne, puis nous appliquons cette fonction à notre RDD. Prenons par exemple le point suivant :
__((10, [5.4, 3.7, 1.5, 0.2, 'Iris-setosa']), (0, [5.4, 3.4, 1.5, 0.4]))__, 

et décortiquons la commande
__dist = joined.map(lambda x: (x[0][0],(x[1][0], computeDistance(x[0][1][:-1], x[1][1]))))__

 - premier élément : x[0][0] = 10 (index de la ligne considérée)
 - second élément :   
1ère partie : x[1][0] = 0 (c'est l'index du centroïde)  
2nde partie : computeDistance(x[0][1][:-1], x[1][1]) (distance euclidienne des 4 chiffres)  

A la fin, on obtient en sortie : __(10, (0, 0.360555))__


In [None]:
from math import sqrt

def computeDistance(x,y):
    return sqrt(sum([(a - b)**2 for a,b in zip(x,y)]))

dist = joined.map(lambda x: (x[0][0],(x[1][0], computeDistance(x[0][1][:-1], x[1][1]))))

La prochaine étape consiste à grouper par l'index de chaque donnée.
Pour chaque donnée, on obtient les distances pour les 3 centroïde.

Voici un exemple de ce qu'on obtient :
__(0, [(0, 0.866025403784438), (1, 3.7), (2, 0.5385164807134504)])__

In [None]:
dist_list = dist.groupByKey().mapValues(list)

Nous allons maintenant calculer le centroïde le plus proche de chaque point.  
Pour ce faire, une fonction "closestCluster" est créée et son but est de garder pour chaque point, les données du centroïde le plus proche.

Nous appliquons la fonction closestCluster à dist_list (la liste des 3 distances). Voici le résultat sur le point d'index 0 :
 - en entrée : __(0, [(0, 0.866025403784438), (1, 3.7), (2, 0.5385164807134504)])__
 - en sortie : __(0, (2, 0.5385164807134504))__

In [None]:
def closestCluster(dist_list):
    cluster = dist_list[0][0]  # 0
    min_dist = dist_list[0][1] # ? j'aurais mis dist-list[1][1] ?
    for elem in dist_list:     # ? j'aurais mis "for elem in dist_list[1]"
        if elem[1] < min_dist:
            cluster = elem[0]
            min_dist = elem[1]
    return (cluster,min_dist)

min_dist = dist_list.mapValues(closestCluster)

Notre objectif est de pouvoir recalculer les 3 nouveaux centroïdes, sachant les points liés aux centroïdes actuels.  
Pour cela, nous allons récupérer l'index du centroïde, et les coordonnées de chaque point associés.  

assignment ==> nous rapratrions les données complètes du point  
clusters ==> nous ne garderons que l'index du centroïde, et les 4 valeurs du point associé (de la ligne de donnée concernée)

Voici un exemple de ce que l'on obtient :__(2, [5.1, 3.5, 1.4, 0.2])__  
Le centroïde d'index 2, est lié à un point dont les valeurs sont (5.1, 3.5, 1.4, 0.2).  
Nous avons toujours nos 150 lignes de données de départ.



In [None]:
assignment = min_dist.join(data)
clusters = assignment.map(lambda x: (x[1][0][0], x[1][1][:-1]))

Nous allons pouvoir calculer les nouveaux centroïdes (qui doivent être au barycentre de points assignés à chaque centroïde).

count ==> fonction qui ajoute un "1" à chaque occurence du même centroïde, et qui les compte, ce que revient à comptabiliser le nbre de lignes de données pour chaque centroïde  
somme ==> fonction qui additionne les 4 valeurs d'une donnée, 1 à 1 et par centroide

moyenneList ==> appliquée dans le code générant "centroidesCluster", il permet de recalculer les "baricentres" de l'ensemble des données associés à un centroïde.


In [None]:
count = clusters.map(lambda x: (x[0],1)).reduceByKey(lambda x,y: x+y)

def sumList(x,y):
    return [x[i]+y[i] for i in range(len(x))]

somme = clusters.reduceByKey(sumList)

def moyenneList(x,n):
    return [x[i]/n for i in range(len(x))]

centroidesCluster = somme.join(count).map(lambda x : (x[0],moyenneList(x[1][0],x[1][1])))

## Chapitre 2 : Optimisations

### Modification des paramètres

In [None]:
# graphe pour déterminer le nbre d'itérations optimal
# ne pas prendre les 4 données mais peut être que 1 car elles sont toutes correlées ... ==> a faire si on a le temps
# hyperparamètres ?? on laisse tomber
# passer en array ??

### JOIN et CARTESIAN prennent du temps de calcul

In [None]:
# filtrer avant de JOIN ou Cartesian

### Programmer en SCALA pour gagner en rapidité d'exécution
Spark est implémenté en Java. L’API Python permet de faire beaucoup de choses mais :

Elle ne sera jamais aussi complète que l’API Java.

Elle sera plus lente que l’API Java ou Scala (car Scala est une surcouche fonctionnelle de Java).

In [None]:
val lines = sc.textFile("iris.data.txt")




### Passage en DATAFRAME

les RDD (Resilient Distributed Data) sont immutables et compile-time type-safe. They can be used on unstructured data. Lazy.
Offre controle et flexibilité.
Low-level API
Type-safe.
RDD : on dit comment faire (et pas ce qu'on veut faire).


In [None]:
# predicate pushdown
# QBO
# CBO

### Passage en DATASET

In [None]:
# DatFrame = dataset[Row]
# que sur scala ??

## Data Serialisation Format

In [None]:
# Par défaut, Spark utiliser "Java serialization" qui est lent.
# alternative : utiliser Kryo (plus rapide et plus compact) ou encore mieux on sait que les dataFrame et DataSet utilisent la sérialisation TUNGSTEN.

val sparkConf : SparkCond = new SparkConf()
  .set("spark.serializer", "org.apach.spark.serializer.KryoSerializer")

val sparkSession : SparkSession = SparkSession
  .builder()
  .config(sparkConf)
  .getOrCreate()

# enregistrement d'une classe custum avec Krio
sparkConf.registerKryoClasses(Array(classOf[MyCustomeClass]))

## Format de stockage

In [None]:
# utilisation d'un format binaire (plutôt que txt ou csv) 
# ==> par exemple, utiliser Apache Parquet, Apach Avro


## Broadcast Join

In [None]:
val sparkConf : SparkConf = new SparkConf()
  .set("spark.sql.autoBroadcastJoinThreshold", "2147483648")
  .set("spark.sql.broadcasTimeout", "900") # modifier le tiemOut par défaut de 300 s

# forcer le broadcast
val result=bigDataFrame.join(broadcast(smallDataFrame))

## Amélioration de la mémoire

In [None]:
# utilisation du cache()
# utilisation de persist()

With cache(), you use only the default storage level :
   MEMORY_ONLY for RDD
   MEMORY_AND_DISK for Dataset
   With persist(), you can specify which storage level you want for both RDD and Dataset.

Use persist() if you want to assign a storage level other than :
   MEMORY_ONLY to the RDD
   or MEMORY_AND_DISK for Dataset

Which Storage Level to Choose?
If your RDDs fit comfortably with the default storage level (MEMORY_ONLY), leave them that way. This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.
If not, try using MEMORY_ONLY_SER and selecting a fast serialization library to make the objects much more space-efficient, but still reasonably fast to access. (Java and Scala)

## Lancement dans le cloud : hardware tuning

In [None]:
# https://youtu.be/1fEXmuaEGjQ vers 16min
# mettre assez d'executor
# trop de executor cores ==> trop de parallelisme ==> reduire
# pas assez de partitions
# persist in memory and disk with serialization
# off head memory for caching
# pb de données non uniformément distribuées dans les partitions ==> au moment du join, on a un soucis



# modification du nbre d'éxecutors
--num-executors = 5
--executor-cores=14
--executor-memory=18GB

# dynamic allocation : utiliser autant de ressources que possible selon les besoins

# nbre de partitions pour le parallelism

# GC Tuning

# data locality

# dr elephant
