# Affinity Propagation en pySpark

## Qu'est-ce que c'est ?
L'_Affinity Propagation_ est un algorithme de clustering (apprentissage non supervisé) ne requérant pas pour l'utilisateur de choisir le nombre initial de partitions souhaitées. Cela est particulièrement utile lorsque qu'il n'existe pas d'_a priori_ concernant le nombre de clusters.
En cela, l'_Affinity propagation_ s'oppose aux méthodes classiques (K-means, mean-shift, etc.).

<ol> 
    <li><span style="position:center; top:10px; right:5px; width:100px; height:90px; margin:10px;">[Test_de_l'affinity_propagation_de_Scikit_Learn](#Test-de-l'affinity-propagation-de-Scikit-learn)</span></li>
    
    <li><span style="position:center; top:10px; right:5px; width:100px; height:90px; margin:10px;">[Lecture_du_dataset_via_Spark_et_conversion](#Lecture-du-dataset-via-Spark-et-conversion)</span></li>
    <li><span style="position:center; top:10px; right:5px; width:100px; height:90px; margin:10px;">[Définition_des_préférences](#Définition-des-préférences)</span></li>
    <li><span style="position:center; top:10px; right:5px; width:100px; height:90px; margin:10px;">[Définition_des_similarités](#Définition-des-similarités)</span></li>
        <li><span style="position:center; top:10px; right:5px; width:100px; height:90px; margin:10px;">[Actualisation_des_matrices_Availabilities_et_Responsabilities](#Actualisation-des-matrices-Availabilities-et-Responsabilities)</span></li>
        <li><span style="position:center; top:10px; right:5px; width:100px; height:90px; margin:10px;">[Test_sur_les_données_simulées](#Tests-sur-les-données-simulées)</span></li>
        <li><span style="position:center; top:10px; right:5px; width:100px; height:90px; margin:10px;">[Application_avec_les_Iris_de_Fisher](#Application-avec-les-Iris-de-Fisher)</span></li>
        <li><span style="position:center; top:10px; right:5px; width:100px; height:90px; margin:10px;">[Conclusions_et_critiques_du_travail_rendu](#Conclusions-et-critiques-du-travail-rendu)</span></li>
        <li><span style="position:center; top:10px; right:5px; width:100px; height:90px; margin:10px;">[Bibliographie](#Bibliographie)</span></li>


</ol>



Commençons par importer les packages nécessaires au projet.

In [2]:
from pyspark import SparkConf, SparkContext
sc = SparkContext.getOrCreate()
import pandas as pd
import numpy as np
import time as t
import os
chemin = '/Users/pierredesmet/Documents/Documents Word/Etudes/UTT/ENSAE/Semestre 1/Éléments logiciels pour le traitement des données massives'
os.chdir(chemin)

from sklearn.cluster import AffinityPropagation
from sklearn import metrics
from sklearn.datasets.samples_generator import make_blobs

## Test de l'_affinity propagation_ de Scikit-learn

Nous essayons dans un premier temps l'algorithme tel qu'il est implémenté dans Scikit-Learn (<a href = 'http://scikit-learn.org/stable/auto_examples/cluster/plot_affinity_propagation.html'>ici</a>), par curiosité.

Nous générons pour cela un data set composé de 1000 individus décrits par 2 attributs. Ces données sont générées à partir de 3 centres, en dimension 2.

In [3]:
# Générons des données à partir de 3 centres initiaux
centers = [[1, 1], [-1, -1], [1, -1]]
X, labels_true = make_blobs(n_samples=50, centers=centers, cluster_std=0.5,
                            random_state=0)

In [4]:
print(X[:4])
print(labels_true[:4])

[[ 1.10413749 -0.51168048]
 [ 1.76638961  1.73467938]
 [ 1.00525001 -0.10706475]
 [-1.33623022 -1.17977658]]
[2 0 2 1]


À partir de ces données, nous pouvons lancer l'algorithme de Scikit-Learn, lequel propose effectivement **3 centres** pour segmenter les donneés :

In [5]:
af = AffinityPropagation().fit(X) # @param : preference=-50
cluster_centers_indices = af.cluster_centers_indices_
labels = af.labels_
print('Estimated number of clusters: %d' % len(cluster_centers_indices))
print("Homogeneity: %0.3f" % metrics.homogeneity_score(labels_true, labels))

Estimated number of clusters: 3
Homogeneity: 0.931


In [6]:
print('Les centres retenus sont :\n',X[cluster_centers_indices])

Les centres retenus sont :
 [[ 1.06345605 -0.79900532]
 [-1.31716105 -1.18137058]
 [ 1.38051886  1.06083751]]


Notons que si l'on augmentait à 1000 le nombre d'individus générés, cet algorithme proposerait beaucoup plus de partitions (258).

## Lecture du dataset _via_ Spark et conversion
Comme précédemment, nous allons générer un jeu de données avec un peu plus d'individus cette fois.

In [7]:
centers = [[4, 4], [-5, -5], [1, 1]]
X, labels_true = make_blobs(n_samples=1500, centers=centers, cluster_std=0.5, random_state=0)

In [8]:
pd.DataFrame(X).to_csv('Dataset.csv', index = False, header = False) # enregistrement des enregistrements X...
data = sc.textFile('Dataset.csv').cache()# ...et lecture de ces derniers avec Spark

In [9]:
data = data.map(lambda line: line.split(","))

In [10]:
# Conversion en float des strings
n_obs = data.count() 
n_cols = len(data.take(1)[0])
data = data.map(lambda line : [float(line[i]) for i in range(n_cols)])

In [11]:
# Print des 3 premières observations
data.take(3)

[[-4.143847389328352, -5.396057510282575],
 [1.4130628542155965, 0.9711217208106436],
 [-4.770207547582972, -4.85997106998116]]

Lorsque les données ne sont pas sur la même échelle, il vaut mieux les centrer et les réduire. Ici ce n'est pas nécessaire, mais faisons-le tout de même pour l'exemple.

**Remarque** : nous aurions aussi pu utiliser la fonction `sklearn.preprocessing.scale` de Scikit-learn.

In [12]:
# La transposée d'un RDD sert pour centrer-réduire
def transpose_rdd(rdd):
    return sc.parallelize(np.array(rdd.collect()).transpose())  

def centre_reduit(rdd):
    data_transposed = transpose_rdd(rdd)
    data_processed_rdd = data_transposed.map(lambda record : (record-np.mean(record)) / np.std(record))
    return transpose_rdd(data_processed_rdd)

In [13]:
# On centre et on réduit toutes les données
data = centre_reduit(data)
data.take(3)

[array([-1.09866357, -1.43086088]),
 array([ 0.38183502,  0.25986154]),
 array([-1.26554143, -1.28851003])]

## Définition des préférences

In [14]:
# Cette fonction retourne un vecteur composé des moyennes (resp. médianes) de toutes les données
def preference(rdd,fonction):
    if fonction == 'moyenne':
        vecteur = rdd.map(lambda feature : np.mean(feature))
        return [vecteur.mean()]*rdd.count()
    elif fonction == 'médiane':
        median = np.median(pd.DataFrame(rdd.collect()))
        return [median]*rdd.count()
# test
pref = preference(data,'moyenne')
pref[:3]

[3.6711374680938509e-17, 3.6711374680938509e-17, 3.6711374680938509e-17]

## Définition des similarités
L'algorithme d'_affinity propagation_ utilise une matrice de similarité, dont les coefficients $c_{i,j}$ correspondent à la similarité (mesure euclidienne négative) entre une observation $x_i$ et une observation $x_j$.

Pour calculer cette matrice, nous définirons plusieurs fonctions dont les dépendances fonctionnelles et les rôles sont précisés ci-dessous.
<img src = 'http://img11.hostingpics.net/pics/942659Capturedcran20170119173305.png'/ width = 800px>

### Accès à des élements d'une RDD _via_ leur indice
Par la suite, il sera pratique d'accéder aux éléments des _Resilient Distributed Dataset_ par leur indice. Créons la fonction permettant cela.

In [15]:
# Retourne la valeur associée à un indice
def index_to_value(rdd,index):
    keyval = rdd.zipWithIndex().map(lambda item : (item[1],item[0]))
    return keyval.lookup(index-1)
# test
rdd_test = sc.parallelize(['5','6','7'])
index_to_value(rdd_test,2)

['6']

### Distance euclidienne négative

In [16]:
# Retourne la distance euclidienne négative entre deux vecteurs x1 et x2
def similarity(x1,x2):
    sim = np.linalg.norm(np.array(x1)-np.array(x2))
    return(-sim*sim)
# Test
similarity([1,2,3],[3,2,3])

-4.0

Pour des besoins futurs, on souhaite connaître "la similarité d'une chaîne de caractère", p. ex. "12.34". Celle-ci correspond à la distance euclidienne négative entre l'observation $x_{12}$ et l'observation $x_{34}$.

In [17]:
def retourne_similarite(chaine):
    ligne_num = int(chaine.split('.')[0])
    colonne_num = int(chaine.split('.')[1])
    return similarity(X[ligne_num-1], X[colonne_num-1]) # utilise un df pandas
    #return similarity(index_to_value(data,ligne_num),index_to_value(data,colonne_num)) #utilise un rdd pyspark
# Test
retourne_similarite("12.34")

-0.13186194667095397

### Matrice d'index

La méthodologie que nous adopterons est la suivante. Nous souhaitons définir une "matrice d'index" (à gauche), laquelle permettra de calculer de façon parallèle la matrice de similarité (à droite), ici dans le cas de $n$ = 300 individus.
<img src = "http://img11.hostingpics.net/pics/226582Capturedcran20170119171946.png"/>

**Remarque** : le coût algorithmique pour générer cette matrice est grand, $o(n^2)$. On le considèrera cependant comme tolérable, puisque qu'il n'est nécessaire de l'exécuter qu'une seule fois. En effet, une fois le fichier Excel créé, les exécutions ultérieures du programme n'ont plus qu'à importer le classeur. Si l'on dispose de ce classeur, il est donc possible de passer directement au bloc suivant.

In [18]:
#####################################################
#     Attention : portion de code chronophage       #
#####################################################
# Création d'une matrice d'index
debut = t.time()
mat = [[None]*data.count()]*data.count()
#mat = pd.DataFrame(mat)
for i in np.arange(0,data.count()):
    for j in np.arange(0,data.count()):
        mat[i][j] = str(i+1)+'.'+str(j+1)
print(t.time()-debut)

#Il peut être intéressant de sauvegarder cette matrice pour la charger plus rapidement par la suite.
#pd.DataFrame(mat).to_csv("Matrice_index.csv") TODO : decommenter cette ligne dangeureuse à la toute fin

68.05421900749207


In [19]:
# Chargeons la matrice d'index (déjà) créée dans un RDD 
#On ne conserve que le nombre de colonnes nécessaire :
mat_rdd = sc.textFile("Matrice_index.csv").map(lambda line: line.split(";")).map(lambda ligne : ligne[:n_obs])#.filter(lambda line: len(line)<=n_obs)

# On ne conserve que le nombre de lignes nécessaire :
mat_rdd = sc.parallelize(mat_rdd.take(n_obs)) # TODO : à optimiser, pas besoin de repasser par python normalement

# Test
mat_rdd.count() 

1500

La matrice d'index est telle que l'élément en position (i,j) soit la chaîne de caractère "i+1.j+1" comme on le constate ci-dessous. Ceci est lié à l'indexation des RDD qui commence en 1, et non en 0 comme les DataFrames Pandas.

**Remarque** : une autre façon de procéder est de créer la matrice d'index _via_ Excel, en copiant dans chaque cellule d'une plage `=CONCATENER(LIGNE();'.';COLONNE())`

### Calcul de la matrice de similarité entre les observations
Nous disposons à présent de toutes les fonctions nécessaires pour calculer la matrice des similarités. Dans un premier temps, on initialise cette matrice avec des valeurs nulles.

In [20]:
def fill_similarity_row(row):
    modified_row = [0]*len(row)
    for (i,item) in zip(range(0,len(row)),row):
        modified_row[i] = str(retourne_similarite(item)) #if retourne_similarite(item)!=0 else str(pref[0])
    return modified_row
# test
distances_row = fill_similarity_row(['1.1','1.2','1.3'])
#distances_row = fill_similarity_row(mat_rdd.take(1))
distances_row

['-0.0', '-71.4202228157', '-0.679715719324']

In [21]:
data = pd.DataFrame(data.collect()) #Nécessaire car Spark ne peut travailler qu'avec un RDD à la fois.

Jusque là nous avons essentiellement défini les fontions utiles au programme. À présent, nous "mappons" la matrice d'index à l'aide de ces fonctions.
De part la _lasy evaluation_, la portion de code suivante n'est exécutée que lorsque la fonction `collect()` est appelée, au bloc suivant.

In [22]:
mat_similarites = mat_rdd.map(lambda ligne : fill_similarity_row(ligne))

In [23]:
import time as t
debut = t.time()
mat_similarites_df = pd.DataFrame(mat_similarites.collect())
fin = t.time()
print('Temps d\'exécution :%.2f'%(fin-debut))
#mat_similarites.saveAsTextFile('matrice_similarites.txt') # Does'nt work...?
mat_similarites_df.to_csv("matrice_similarites.csv", sep = ';')

Temps d'exécution :26.98


**Remarque : ** vu que la matrice de similarité est symétrique, trouver un moyen de ne remplir que la moitié supérieure permettrait de diviser par 2 le temps de calcul. En python 'pur', cela pourrait se faire avec la fonction <a href = 'https://docs.scipy.org/doc/numpy/reference/generated/numpy.tril.html'> `np.tril()`</a>.

## Actualisation des matrices _Availabilities_ et _Responsabilities_

In [24]:
S = pd.read_csv("matrice_similarites.csv", delimiter=";", index_col=False)
S.drop(S.columns[[0]], axis=1, inplace=True) # suppression de la 1ère colonne 
S.to_csv('matrice_sim.csv', index = False, header = False)

Srdd = sc.textFile("matrice_sim.csv").cache()
S    = S.as_matrix() # matrix version python
rdd  = sc.parallelize(S) #version rdd à gérer plus tard

In [25]:
def actualise_A_and_R(S,R,A,coefficient_lissage,suivi_iteration):
    if (suivi_iteration):
        print("Rancien","\n",R)
        print("S","\n",S)
        print("A actuel","\n",A)
    nbre_points = S.shape[0] # Nombre de points
    
    ###################################
    ##  Calcul des responsabilities  ##
    ###################################
    
    # On stocke la matrice de resultat intermédiaire dans la matrice resultat
    resultat = np.zeros((nbre_points , nbre_points ))
    indices_des_points=np.arange(nbre_points)
    # On commence par ajouter la matrice A et la matrice S
    np.add(A,S,resultat) 
    # On recupère les indices des valeurs maximales sur chaque colonne
    I = np.argmax(resultat, axis=1) 

    # On recupère les valeurs maximales sur chaque colonne
    Y = resultat[indices_des_points, I]   
    resultat[indices_des_points, I] = -np.inf
    Y2 = np.max(resultat, axis=1)

    # On soustrait ces valeurs à la matrice de similarité
    np.subtract(S, Y[:, None], resultat)

    resultat[indices_des_points, I] = S[indices_des_points, I] - Y2

    # Lissage de la matrice R
    resultat=(1-coefficient_lissage)*resultat
    R=coefficient_lissage*R + resultat
    
    if(suivi_iteration):
        print("R actualisé","\n",R)
    
    ###################################
    ###   Calcul des availibities   ###
    ###################################

    np.maximum(R, 0, resultat) #On recupère le maximum de chaque ligne

    resultat.flat[::nbre_points + 1] = R.flat[::nbre_points + 1]

    resultat -= np.sum(resultat, axis=0)

    self_availabilities = np.diag(resultat).copy()

    resultat.clip(0, np.inf, resultat)

    resultat.flat[::nbre_points + 1] = self_availabilities

    # lissage de la matrice des availibilities
    A=coefficient_lissage*A - (1-coefficient_lissage)*resultat
    if(suivi_iteration):
        print("A actualisé",A)
        
    return(R,A) #on retourne les deux matrices actualisées

In [50]:
def run_affinity_clustering(S, pref, coefficient_lissage, nb_iterations, suivi_iteration = False):
    # On place la valeur de preference sur la diagonale de la matrice ( on prend le minimum par défaut)
    nbre_points = S.shape[0] # Nombre de points
    S.flat[::(nbre_points + 1)] = pref
    
    ### Initialisation des deux matrices Availabilities
    A = np.zeros((nbre_points,nbre_points))
    R = np.zeros((nbre_points,nbre_points))
    iteration=1
    while (iteration<=nb_iterations):
        R,A=actualise_A_and_R(S=S,R=R,A=A,coefficient_lissage=0.5,suivi_iteration=suivi_iteration)
        iteration=iteration+1
        
    # Affectation des clusters
    E = R + A
    is_cluster = np.where(np.diag(E)>0)[0] #is.cluster renvoie les indices des points representant chaque cluster
    K = is_cluster.size # Nombre de clusters
    # On renumérote les clusters de 0,1,2..
    numeros_clusters = S[:,is_cluster].argmax(1) 
    numeros_clusters[is_cluster] = np.arange(K)
    #On affecte chaque point à son cluster
    cluster_affecte = is_cluster[numeros_clusters]
   
    # Affichage des résultats
    print("Nombre_de_clusters trouvés:",K)
    print("\t")
    clusters_tries = [(list(cluster_affecte).count(x),x) for x in is_cluster]
    clusters_tries.sort(reverse=True)

    for numero_cluster,(nb_membres, representant) in enumerate(clusters_tries) :
        print ('Cluster n°%d  contenant %d points -->Représentant: point numéro %d'%(numero_cluster,nb_membres,representant))
        idxs = filter(lambda x: x[1] == representant, zip(range(len(cluster_affecte)),cluster_affecte))
        print ("Membres:",[x[0] for x in idxs])
        print("\t")
    return(R,A,K,is_cluster,cluster_affecte,numeros_clusters)

## Tests sur les données simulées

In [51]:
R,A,K,is_cluster,cluster_affecte,numero_clusters=run_affinity_clustering(S, pref = pref, coefficient_lissage=0.5,nb_iterations=500,suivi_iteration = False)

Nombre_de_clusters trouvés: 118
	
Cluster n°0  contenant 5 points -->Représentant: point numéro 95
Membres: [35, 95, 110, 127, 130]
	
Cluster n°1  contenant 4 points -->Représentant: point numéro 126
Membres: [74, 83, 126, 147]
	
Cluster n°2  contenant 3 points -->Représentant: point numéro 141
Membres: [13, 31, 141]
	
Cluster n°3  contenant 3 points -->Représentant: point numéro 136
Membres: [113, 136, 149]
	
Cluster n°4  contenant 3 points -->Représentant: point numéro 132
Membres: [12, 50, 132]
	
Cluster n°5  contenant 3 points -->Représentant: point numéro 121
Membres: [99, 121, 140]
	
Cluster n°6  contenant 3 points -->Représentant: point numéro 96
Membres: [67, 96, 111]
	
Cluster n°7  contenant 3 points -->Représentant: point numéro 87
Membres: [87, 90, 138]
	
Cluster n°8  contenant 3 points -->Représentant: point numéro 68
Membres: [68, 88, 146]
	
Cluster n°9  contenant 3 points -->Représentant: point numéro 36
Membres: [3, 36, 71]
	
Cluster n°10  contenant 3 points -->Représent

Après avoir implémenté l'affinity propagation sur des données simulées, essayons cet algorithme sur des données réelles : les Iris de Fisher.

## Application avec les Iris de Fisher
On ne présente plus le jeu de données _Iris_ (mais faisons-le quand même), largement utilisé en classification. 

150 fleurs décrites par 4 variables (la longueur et la largeur des sépales et pétales en cm) appartiennent à 3 espèces d'iris : setosa, virginica et versicolor. Le nombre "logique" de groupes que devrait trouver l'affinity propagation correspond donc au nombre d'espèces, 3.

In [18]:
from sklearn import datasets
iris = pd.DataFrame(datasets.load_iris().data)
iris.tail()

Unnamed: 0,0,1,2,3
145,6.7,3.0,5.2,2.3
146,6.3,2.5,5.0,1.9
147,6.5,3.0,5.2,2.0
148,6.2,3.4,5.4,2.3
149,5.9,3.0,5.1,1.8


In [9]:
iris.to_csv('iris.csv', index = False, header = False) # enregistrement des échantillons d'iris...
data = sc.textFile('iris.csv').cache()# ...et lecture de ces derniers avec Spark

In [10]:
data = data.map(lambda line: line.split(","))

In [11]:
n_cols = len(data.take(1)[0])
n_obs  = data.count()
# Conversion en float des strings
data   = data.map(lambda line : [float(line[i]) for i in range(n_cols)])
data.take(3)

[[5.1, 3.5, 1.4, 0.2], [4.9, 3.0, 1.4, 0.2], [4.7, 3.2, 1.3, 0.2]]

In [54]:
# Récupération du nombre de colonnes et d'observations
n_cols = len(data.take(1)[0])
n_obs  = data.count()
# Conversion en float des strings
data   = data.map(lambda line : [float(line[i]) for i in range(n_cols)])
# Centrage-réduction des données
data = centre_reduit(data)
# Création du vecteur de préférences
pref = preference(data,'moyenne')
# Chargement de la matrice d'index...
mat_rdd = sc.textFile("Matrice_index.csv").map(lambda line: line.split(";")).map(lambda ligne : ligne[:n_obs])
# ...et rognage pour qu'elle soit aux dimensions d'Iris
mat_rdd = sc.parallelize(mat_rdd.take(n_obs)) # TODO : à optimiser, pas besoin de repasser par python normalement
# Collecte des données
data = pd.DataFrame(data.collect())
# Et c'est parti ! 
mat_similarites = mat_rdd.map(lambda ligne : fill_similarity_row(ligne))
mat_similarites_df = pd.DataFrame(mat_similarites.collect())
mat_similarites_df.to_csv("matrice_similarites_iris.csv",sep = ';')

In [55]:
S = pd.read_csv("matrice_similarites_iris.csv", delimiter=";", index_col=False)
S.drop(S.columns[[0]], axis=1, inplace=True) # suppression de la 1ère colonne 
S.to_csv('matrice_sim.csv', index = False, header = False)

Srdd = sc.textFile("matrice_sim.csv").cache()
S    = S.as_matrix() # matrix version python
rdd  = sc.parallelize(S) #version rdd à gérer plus tard

In [63]:
R,A,K,is_cluster,cluster_affecte,numero_clusters=run_affinity_clustering(S, pref = -100, coefficient_lissage=0.5,nb_iterations=1000)

Nombre_de_clusters trouvés: 3
	
Cluster n°0  contenant 54 points -->Représentant: point numéro 54
Membres: [0, 2, 10, 11, 14, 19, 33, 34, 37, 40, 43, 44, 45, 48, 54, 55, 56, 63, 66, 72, 73, 74, 75, 76, 78, 79, 83, 84, 87, 89, 90, 91, 94, 98, 102, 105, 107, 108, 113, 116, 117, 118, 122, 126, 131, 133, 134, 136, 137, 138, 143, 145, 147, 149]
	
Cluster n°1  contenant 50 points -->Représentant: point numéro 49
Membres: [3, 4, 5, 7, 8, 12, 15, 16, 20, 21, 22, 23, 24, 26, 30, 36, 39, 41, 42, 46, 49, 50, 51, 57, 59, 64, 67, 69, 71, 77, 85, 86, 96, 97, 99, 100, 101, 106, 109, 111, 115, 120, 121, 123, 124, 129, 132, 140, 142, 144]
	
Cluster n°2  contenant 46 points -->Représentant: point numéro 128
Membres: [1, 6, 9, 13, 17, 18, 25, 27, 28, 29, 31, 32, 35, 38, 47, 52, 53, 58, 60, 61, 62, 65, 68, 70, 80, 81, 82, 88, 92, 93, 95, 103, 104, 110, 112, 114, 119, 125, 127, 128, 130, 135, 139, 141, 146, 148]
	


## Conclusions et critiques du travail rendu

- Tout n'est pas parallélisé
- Comment jouer avec la préférence pour influencer le nombre de clusters ?

- Ouverture : possibilité d'utiliser cet algo avant de faire tourner les k-means (gain de robustesse).

## Bibliographie

[1] W.C. Hung, C.Y. Chu and Y.L. Wu, "Map Reduce Affinity Propagation Clustering Algorithm", _International Journal of Electronics and Electrical Engineering, Vol. 3_, 4 August 2015.

[2] B. J. Frey and D. Dueck, "Clustering by passing Messages between Data Points", _ScienceMag, Vol. 315_, 16 February 2007.

[3] P. Redmond, J. A. Trono and D. Kronenberg, "Affinity Propagation and other Data Clustering Techniques", _SMC Clustering Paper Patrick Redmond_.

[4] X. Dupré, "Premiers pas avec Spark", <a href = "http://www.xavierdupre.fr/app/ensae_teaching_cs/helpsphinx2/notebooks/spark_first_steps.html">
*http://www.xavierdupre.fr/app/ensae_teaching_cs/helpsphinx2/notebooks/spark_first_steps.html*</a>

___

# Résolution distribuée d'un Sudoku 
Cette partie est un bonus, indépendant de l'algorithme présenté précédemment. L'objectif est de résoudre un sudoku de trois façons : en python, en programmation fonctionnelle, puis en utilisant une librairie _ad hoc_ de calcul parallèle. Nous comparerons alors les performances de ces 3 modes en termes de temps d'exécution.

<ol> 
    <li><span style="position:center; top:10px; right:5px; width:100px; height:90px; margin:10px;">[Résolution_séquentielle_en_python](#Résolution-séquentielle-en-python)</span></li>
    <ul>
        <li><span style="position:center; top:10px; right:5px; width:100px; height:90px; margin:10px;">[Chargement_du_puzzle_a_résoudre_en_mémoire](#Chargement-du-puzzle-à-résoudre-en-mémoire)</span></li> 
        <li> <span style="position:center; top:10px; right:5px; width:100px; height:90px; margin:10px;">[Définition_des_fonctions_de_recherche](#Définition-des-fonctions-de-recherche)</span></li>
        <li> <span style="position:center; top:10px; right:5px; width:100px; height:90px; margin:10px;">[Résolution](#Résolution)</span> </li>
        </ul>
<br/>
    <li><span style="position:center; top:10px; right:5px; width:100px; height:90px; margin:10px;">[Version_parallélisée](#Version-parallélisée)</span></li>
    <ul>
        <li><span style="position:center; top:10px; right:5px; width:100px; height:90px; margin:10px;">[Programmation_fonctionnelle:_map()](#Programmation-fonctionnelle-:-map()</span> </li>
        <li><span style="position:center; top:10px; right:5px; width:100px; height:90px; margin:10px;">[Parallélisation_avec_Joblib](#Parallélisation-avec-Joblib)</span> </li>
        </ul> 
</ol>

**Remarque** : Le présent programme ne résout que des sudokus 'simples', i.e. pour lesquels il n'est pas nécessaire de faire d'hypothèse et dont la solution est unique. L'exécution globale du programme est rapide, ne pas hésiter à le rejouer.




In [34]:
import csv
from functools import reduce
import time as t
os.chdir('/Users/pierredesmet/Documents/Documents Word/Etudes/UTT/ENSAE/Semestre 1/Éléments logiciels pour le traitement des données massives/Partie Sudoku')

## Résolution séquentielle en python

Prenons le sudoku suivant qui nous servira d'exemple de base. Il est tiré de la <a href = "https://fr.wikipedia.org/wiki/Sudoku"> page wikipédia </a>dédiée aux sudoku.
<img src="https://upload.wikimedia.org/wikipedia/commons/f/ff/Sudoku-by-L2G-20050714.svg" alt="sdk_wikipedia" align = 'center' style="width:200px;height:200px;hspace=10">

Pour traduire cette image en données compréhensibles par une machine, nous renseignons manuellement les données dans un fichier Excel comme suit :
<img src="http://img11.hostingpics.net/pics/501441Capturedcran20161227172935.png" alt = "sdk_csv" style = "height:180px">

### Chargement du puzzle à résoudre en mémoire
Commençons par charger le sudoku à résoudre sous forme de Dataframe Pandas. Celui-ci est un fichier csv dont les cases non-résolues sont remplies avec des zéros.

In [35]:
with open('sudoku1.csv', 'r', encoding='utf-8',newline = "") as f :
    sdk = pd.read_csv(f, sep=';',
    header=None,
    index_col=False
    )
sdk

Unnamed: 0,0,1,2,3,4,5,6,7,8
0,5,3,0,0,7,0,0,0,0
1,6,0,0,1,9,5,0,0,0
2,0,9,8,0,0,0,0,6,0
3,8,0,0,0,6,0,0,0,3
4,4,0,0,8,0,3,0,0,1
5,7,0,0,0,2,0,0,0,6
6,0,6,0,0,0,0,2,8,0
7,0,0,0,4,1,9,0,0,5
8,0,0,0,0,8,0,0,7,9


### Définition des fonctions de recherche
Définissons tout d'abord 2 fonctions donnant les chiffres possibles pour une ligne unique (resp. une colonne unique).

In [36]:
def dispo_sur_ligne(ligne,sudoku):
    posibilites = np.setdiff1d([1,2,3,4,5,6,7,8,9],sudoku.iloc[ligne],assume_unique = True)
    return posibilites
def dispo_sur_colonne(colonne,sudoku):
    posibilites = np.setdiff1d([1,2,3,4,5,6,7,8,9],sudoku[colonne],assume_unique = True)
    return posibilites
# Test : quelles sont les possibilités sur la première ligne ?
dispo_sur_ligne(0,sdk)

array([1, 2, 4, 6, 8, 9])

Pour les régions c'est un peu plus compliqué. Essayons déjà de connaître la région associée à une cellule donnée :

In [37]:
def give_region(ligne,colonne):
    idx_ligne,idx_colonne = (0,0)
    if (ligne<=8) & (colonne <= 8):
        idx_ligne = ligne//3
        idx_colonne = colonne//3
    return((idx_ligne,idx_colonne))
# Test : quelle est la région de la cellule (2,7) ?
print('Région de la cellule (2,7) :',give_region(2,7))

Région de la cellule (2,7) : (0, 2)


In [38]:
# Recherche des éléments de la région associée à la cellule (i,j)
def give_presents(i,j,sdk):
    liste = []
    liste.append(sdk.loc[i,j])
    liste.append(sdk.loc[i,j+1])
    liste.append(sdk.loc[i,j+2])
    liste.append(sdk.loc[i+1,j])
    liste.append(sdk.loc[i+1,j+1])
    liste.append(sdk.loc[i+1,j+2])
    liste.append(sdk.loc[i+2,j])
    liste.append(sdk.loc[i+2,j+1])
    liste.append(sdk.loc[i+2,j+2])
    return liste
# Test : quels sont les éléments de la région associée à la cellule (0,0) ?
print('Éléments de la région associée à la cellule (0,0) :',give_presents(0,0,sdk))

def dispo_sur_region(ligne,colonne,sudoku):
    region = give_region(ligne,colonne)
    if region == (0,0):
        presents = give_presents(0,0,sdk)
    elif region == (0,1):
        presents = give_presents(0,3,sdk)
    elif region == (0,2):
        presents = give_presents(0,6,sdk)
    elif region == (1,0):
        presents = give_presents(3,0,sdk)
    elif region == (1,1):
        presents = give_presents(3,3,sdk)
    elif region == (1,2):
        presents = give_presents(3,6,sdk)
    elif region == (2,0):
        presents = give_presents(6,0,sdk)
    elif region == (2,1):
        presents = give_presents(6,3,sdk)
    elif region == (2,2):
        presents = give_presents(6,6,sdk)
    possibilites = np.setdiff1d([1,2,3,4,5,6,7,8,9],presents,assume_unique = True)
    return possibilites
# Test : quels sont les possibilités pour la région associée à la cellule (0,0) ?
print('Nombres possibles dans cette région :',dispo_sur_region(0,0,sdk))

Éléments de la région associée à la cellule (0,0) : [5, 3, 0, 6, 0, 0, 0, 9, 8]
Nombres possibles dans cette région : [1 2 4 7]


In [39]:
def possible(ligne,colonne,sudoku):
    possibilites = reduce(np.intersect1d, (dispo_sur_ligne(ligne,sudoku),
                                          dispo_sur_colonne(colonne,sudoku),
                                          dispo_sur_region(ligne,colonne,sudoku)))
    return possibilites
# Test : quelles sont les valeurs possibles pour la cellule (2,4) ?
print('Chiffres possibles pour la cellule (1,1) :', possible(1,1,sdk))

Chiffres possibles pour la cellule (1,1) : [2 4 7]


### Résolution 
Place à la résolution ! Tant qu'il reste des zéros (i.e. des valeurs non trouvées), la recherche de solutions continue. S'il existe une unique solution, elle s'afiche.

In [40]:
debut = t.time()
# Tant que toutes les cellules ne sont pas remplies...
while np.count_nonzero(sdk)!= 81:
    # Pour chaque cellule...
    for i in np.arange(0,9):
        for j in np.arange(0,9):
            # Si une seule possibilité existe pour la cellule, y inscrire cette possibilité
            if (len(possible(i,j,sdk)) == 1) & (sdk.loc[i,j] == 0): 
                sdk.loc[i,j] = possible(i,j,sdk)
fin = t.time()
print('Temps d\'exécution : %.3f'%(fin-debut),'secondes.')
sdk

Temps d'exécution : 1.427 secondes.


Unnamed: 0,0,1,2,3,4,5,6,7,8
0,5,3,4,6,7,8,9,1,2
1,6,7,2,1,9,5,3,4,8
2,1,9,8,3,4,2,5,6,7
3,8,5,9,7,6,1,4,2,3
4,4,2,6,8,5,3,7,9,1
5,7,1,3,9,2,4,8,5,6
6,9,6,1,5,3,7,2,8,4
7,2,8,7,4,1,9,6,3,5
8,3,4,5,2,8,6,1,7,9


## Version parallélisée

### Programmation fonctionnelle : map()
On se propose de résoudre le même sudoku mais en programmation fonctionnelle. Rechargons-le dans un premier temps.

In [41]:
# Relecture du Sudoku initial, non-résolu
with open('sudoku1.csv', 'r', encoding='utf-8',newline = "") as f :
    sdk = pd.read_csv(f, sep=';',
    header=None,
    index_col=False
    )
sdk

Unnamed: 0,0,1,2,3,4,5,6,7,8
0,5,3,0,0,7,0,0,0,0
1,6,0,0,1,9,5,0,0,0
2,0,9,8,0,0,0,0,6,0
3,8,0,0,0,6,0,0,0,3
4,4,0,0,8,0,3,0,0,1
5,7,0,0,0,2,0,0,0,6
6,0,6,0,0,0,0,2,8,0
7,0,0,0,4,1,9,0,0,5
8,0,0,0,0,8,0,0,7,9


L'idée est d'arriver à une syntaxe de la forme **map(**`fonction`*, *`cases_à_résoudre`**)**, où `fonction` résout une unique case, et va être appliquée aux `cases_à_résoudre` en parallèle. 

Commençons par définir cette fonction.

In [42]:
# La fonction suivante résout une case individuelle (si c'est possible sans faire d'hypothèse)
def resolution(case,sudoku):
    possibilites = reduce(np.intersect1d, (dispo_sur_ligne(case[0],sudoku),
                                          dispo_sur_colonne(case[1],sudoku),
                                          dispo_sur_region(case[0],case[1],sudoku)))
    if (len(possibilites) == 1): 
                sudoku.loc[case[0],case[1]] = possibilites
# Test : résolvons la cellule centrale (4,4) :
print('Sudoku avant...\n',sdk)
resolution([4,4],sdk)
print('\n...Sudoku après.\n',sdk)

Sudoku avant...
    0  1  2  3  4  5  6  7  8
0  5  3  0  0  7  0  0  0  0
1  6  0  0  1  9  5  0  0  0
2  0  9  8  0  0  0  0  6  0
3  8  0  0  0  6  0  0  0  3
4  4  0  0  8  0  3  0  0  1
5  7  0  0  0  2  0  0  0  6
6  0  6  0  0  0  0  2  8  0
7  0  0  0  4  1  9  0  0  5
8  0  0  0  0  8  0  0  7  9

...Sudoku après.
    0  1  2  3  4  5  6  7  8
0  5  3  0  0  7  0  0  0  0
1  6  0  0  1  9  5  0  0  0
2  0  9  8  0  0  0  0  6  0
3  8  0  0  0  6  0  0  0  3
4  4  0  0  8  5  3  0  0  1
5  7  0  0  0  2  0  0  0  6
6  0  6  0  0  0  0  2  8  0
7  0  0  0  4  1  9  0  0  5
8  0  0  0  0  8  0  0  7  9


Enfin, nous "mappons" chacune des cases non résolues avec la fonction de résolution individuelle. La <a href = 'https://docs.python.org/2/library/functions.html#map'> fonction map()</a> de python permet l'exécution parallélisée des calculs, comme en atteste la documentation :

> Apply function to every item of iterable and return a list of the results. If additional iterable arguments are passed, function must take that many arguments and is applied to the items from all iterables in parallel.

In [43]:
# Tant que toutes les cellules ne sont pas remplies...
debut = t.time()
while np.count_nonzero(sdk)!= 81:
    # Listons les indices des cases à résoudre
    cases_à_remplir = []
    for i in np.arange(0,9):
            for j in np.arange(0,9):
                if sdk.loc[i,j] == 0:
                    cases_à_remplir.append([i,j])
    list(map(lambda x : resolution(x,sdk),cases_à_remplir))
fin = t.time()
print('Temps d\'exécution : %.7f'%(fin-debut),'secondes.')
sdk

Temps d'exécution : 0.5514050 secondes.


Unnamed: 0,0,1,2,3,4,5,6,7,8
0,5,3,4,6,7,8,9,1,2
1,6,7,2,1,9,5,3,4,8
2,1,9,8,3,4,2,5,6,7
3,8,5,9,7,6,1,4,2,3
4,4,2,6,8,5,3,7,9,1
5,7,1,3,9,2,4,8,5,6
6,9,6,1,5,3,7,2,8,4
7,2,8,7,4,1,9,6,3,5
8,3,4,5,2,8,6,1,7,9


L'exécution des calculs en parallèle nous fournit des résultats environ 2,5 fois plus rapidement qu'en python séquentiel.

### Parallélisation avec Joblib
Testons à présent la librairie de calcul distribué Joblib.

In [44]:
# Relecture du Sudoku initial, non-résolu
with open('sudoku1.csv', 'r', encoding='utf-8',newline = "") as f :
    sdk = pd.read_csv(f, sep=';',
    header=None,
    index_col=False
    )
sdk

Unnamed: 0,0,1,2,3,4,5,6,7,8
0,5,3,0,0,7,0,0,0,0
1,6,0,0,1,9,5,0,0,0
2,0,9,8,0,0,0,0,6,0
3,8,0,0,0,6,0,0,0,3
4,4,0,0,8,0,3,0,0,1
5,7,0,0,0,2,0,0,0,6
6,0,6,0,0,0,0,2,8,0
7,0,0,0,4,1,9,0,0,5
8,0,0,0,0,8,0,0,7,9


In [45]:
from joblib import Parallel, delayed
#https://pythonhosted.org/joblib/parallel.html
#http://www.xavierdupre.fr/app/ensae_teaching_cs/helpsphinx/notebooks/td2a_cenonce_session_2D_parallelisation_local.html

In [46]:
# Listons les indices des cases qu'il faut résoudre
debut = t.time()
while np.count_nonzero(sdk)!= 81:
    cases_à_remplir = []
    for i in np.arange(0,9):
            for j in np.arange(0,9):
                if sdk.loc[i,j] == 0:
                    cases_à_remplir.append([i,j])
    Parallel(n_jobs=2,backend = 'threading')(delayed(resolution) (x,sdk) for x in cases_à_remplir)
fin = t.time()
print('Temps d\'exécution : %.7f'%(fin-debut),'secondes.')
sdk

Temps d'exécution : 0.9897740 secondes.


Unnamed: 0,0,1,2,3,4,5,6,7,8
0,5,3,4,6,7,8,9,1,2
1,6,7,2,1,9,5,3,4,8
2,1,9,8,3,4,2,5,6,7
3,8,5,9,7,6,1,4,2,3
4,4,2,6,8,5,3,7,9,1
5,7,1,3,9,2,4,8,5,6
6,9,6,1,5,3,7,2,8,4
7,2,8,7,4,1,9,6,3,5
8,3,4,5,2,8,6,1,7,9


## Conclusion
Pour un unique sudoku, les meilleurs résultats sont fournis par la programmation fonctionnelle (`map()`), puis par Joblib, et enfin en Python 'pur'. Attention cependant à ne pas tirer de conclusion rapide : sur de gros volumes de données, les performances pourraient bien être différentes.

| **Méthode**           | Python "pur"    | Package Joblib | Programmation fonctionnelle |
|-------------------|-----------------|----------------|-----------------------------|
| **Temps d'exécution** | 1.1611 seconde | 1.1281 seconde        | 0.5435 seconde |