# Présentation du notebook

Ce notebook présente le code produit par Arnaud PAGNIEZ et Yann TEKAM afin de reproduire un algorithme de calcul distribué appliqué à la technique d'Affinity Propagation.

# Test du setup Spark

In [1]:
### Check the Spark-related environment variables

import os
import numpy as np

# The user name is masked in the printed values. USERNAME only exists on
# Windows, so fall back to USER; when neither is defined the value is printed
# unchanged (replacing an empty string would corrupt it).
username = os.environ.get("USERNAME") or os.environ.get("USER")

for name, value in sorted(os.environ.items()):
    if "SPARK" in name.upper():
        shown = value.replace(username, "<username>") if username else value
        print("{0:25}= {1}".format(name, shown))

PYSPARK_DRIVER_PYTHON    = jupyter-notebook
PYSPARK_SUBMIT_ARGS      = "--name" "PySparkShell" "pyspark-shell" 
SPARK_CMD                = set PYSPARK_SUBMIT_ARGS="--name" "PySparkShell" "pyspark-shell" && jupyter-notebook 
SPARK_ENV_LOADED         = 1
SPARK_HOME               = C:\Users\<username>\Spark\bin\..
SPARK_JARS_DIR           = "C:\Users\<username>\Spark\bin\..\jars"
SPARK_SCALA_VERSION      = 2.10
_SPARK_CMD_USAGE         = Usage: bin\pyspark.cmd [options]


In [7]:
### Quick Spark check: start by removing any previous output folder

from pyquickhelper.filehelper import remove_folder


def clean(folder):
    """Remove *folder* when it exists.

    Returns the result of ``remove_folder`` for an existing folder and an
    empty list when there is nothing to remove.
    """
    if not os.path.exists(folder):
        return []
    return remove_folder(folder)


clean("fichier.out.txt")

[('fichier.out.txt\\.part-00000.crc', 'file'),
 ('fichier.out.txt\\.part-00001.crc', 'file'),
 ('fichier.out.txt\\._SUCCESS.crc', 'file'),
 ('fichier.out.txt\\part-00000', 'file'),
 ('fichier.out.txt\\part-00001', 'file'),
 ('fichier.out.txt\\_SUCCESS', 'file'),
 ('fichier.out.txt', 'dir')]

In [8]:
### Spark smoke test: word count over the notebook file

text_file = sc.textFile("Spark-ELTDM-PAGNIEZ-TEKAM.ipynb")
counts = (
    text_file
    .flatMap(lambda text: text.split(" "))       # one record per word
    .map(lambda token: (token, 1))               # (word, 1) pairs
    .reduceByKey(lambda left, right: left + right)
)
counts.saveAsTextFile("fichier.out.txt")
# List the part files written by Spark
os.listdir("fichier.out.txt/")

['.part-00000.crc',
 '.part-00001.crc',
 '._SUCCESS.crc',
 'part-00000',
 'part-00001',
 '_SUCCESS']

In [9]:
### Définition de plusieurs fonctions utiles à la manipulation des RDD et à notre code en général

def read_rdd(path, **options):
    """Load every ``part*`` file found in *path* into a single DataFrame.

    Each file is read with ``pandas.read_csv(..., header=None)`` and the
    results are concatenated in sorted file-name order, so the row order is
    the same on every run.

    Parameters
    ----------
    path : str
        Folder containing the ``part*`` files.
    **options
        Extra keyword arguments forwarded to ``pandas.read_csv``.

    Returns
    -------
    pandas.DataFrame
        All parts stacked vertically with a fresh 0..n-1 index.

    Raises
    ------
    Exception
        If no ``part*`` file exists in *path*, or if one of them cannot be
        read (the underlying error is chained as the cause).
    """
    # glob gives no ordering guarantee; sort so that part-00000, part-00001,
    # ... are concatenated in sequence.
    all_files = sorted(glob.glob(os.path.join(path, "part*")))
    if not all_files:
        raise Exception("No file to read in '{0}'".format(path))

    frames = []
    for f in all_files:
        try:
            frames.append(pandas.read_csv(f, header=None, **options))
        except Exception as e:
            raise Exception("Unable to read '{0}'".format(f)) from e

    return pandas.concat(frames, ignore_index=True)

def extract_column(cols, row):
    """Split the ``;``-separated *row* and return the stripped fields at the positions listed in *cols*."""
    fields = row.split(";")
    selected = []
    for position in cols:
        selected.append(fields[position].strip())
    return selected

def filter_column(row):
    """Return True unless the last ``;``-separated field of *row*, once stripped, is ``<=50K``."""
    last_field = row.rsplit(";", 1)[-1]
    return last_field.strip() != "<=50K"

def filter_column_split(row):
    """Return True unless the last element of the already-split *row*, once stripped, is ``<=50K``."""
    target = row[-1]
    return target.strip() != "<=50K"

def extract_age_rich(row):
    """Return ``(age, target)`` for a ``;``-separated *row*.

    ``age`` is the first field converted to float and ``target`` is the last
    field with surrounding whitespace removed.
    """
    fields = row.split(";")
    return float(fields[0]), fields[-1].strip()

def custom_agg(aggset):
    """Return ``(count, total)`` computed over the first element of every item in *aggset*."""
    count = 0
    total = 0
    for item in aggset:
        count += 1
        total = total + item[0]
    return count, total

def extract_column_and_multiply_row(n, row):
    """Return a list of *n* copies of the tuple of stripped ``;``-separated fields of *row*."""
    record = tuple(field.strip() for field in row.split(";"))
    return [record for _ in range(n)]

def extract_value(x,i,j):
    """Return, as a float, field *j* of record *i* of *x*.

    *x* must expose ``map`` and ``collect`` (presumably a Spark RDD of
    ``;``-separated text lines). Every record is collected in order to read
    a single value.
    """
    def pick_field(row):
        return extract_column([j], row)

    records = x.map(pick_field).collect()
    return float(records[i][0])

def convert_to_float_array(arr):
    """Convert a sequence of numbers and decimal-comma strings to a float array.

    String items are parsed after replacing the decimal comma with a dot, so
    ``"1,5"`` gives ``1.5``, ``"-1,5"`` gives ``-1.5`` and comma-free strings
    such as ``"3"`` are accepted. Non-string items go through ``float()``.

    Parameters
    ----------
    arr : iterable
        Values to convert.

    Returns
    -------
    numpy.ndarray
        One float per input item, in the same order.

    Raises
    ------
    ValueError
        If a string cannot be parsed as a number.
    """
    values = []
    for item in arr:
        if isinstance(item, str):
            # Let float() do the parsing: rebuilding the number from its
            # integer and fractional parts mishandles the sign of negatives.
            item = item.replace(",", ".", 1)
        values.append(float(item))
    return np.array(values)

# Début de l'algorithme de Map/Reduce Affinity Propagation

In [10]:
#### Importation de quelques packages (possible grâce au setup spark)
import glob
import pandas
import os
assert os.path.exists("AP_sample.csv")
import time

#### Load the whole dataset with pandas, using its first column as the index

df = pandas.read_csv("AP_sample.csv", sep=";")
df = df.set_index(df.columns[0])
df.index.name = 'Index'

### Build a small test table (first 10 rows)

df_temp = df.head(10)
df_temp.to_csv('small_sample.csv', sep=';')

### Create an RDD from the test table

rdd = sc.textFile("small_sample.csv")

In [11]:
### First attempt at building a Spark DataFrame from the text file

lines = sc.textFile('small_sample.csv')
rdd = lines.map(lambda line: line.split(';'))

# The first record holds the column names; all other records are data rows.
header = rdd.first()
body = rdd.filter(lambda fields: fields != header)
data = body.toDF(header)

data.schema

StructType(List(StructField(Index,StringType,true),StructField(0,StringType,true),StructField(1,StringType,true),StructField(2,StringType,true),StructField(3,StringType,true),StructField(4,StringType,true),StructField(5,StringType,true),StructField(6,StringType,true),StructField(7,StringType,true),StructField(8,StringType,true),StructField(9,StringType,true),StructField(10,StringType,true),StructField(11,StringType,true),StructField(12,StringType,true),StructField(13,StringType,true),StructField(14,StringType,true),StructField(15,StringType,true),StructField(16,StringType,true),StructField(17,StringType,true),StructField(18,StringType,true),StructField(19,StringType,true),StructField(20,StringType,true),StructField(21,StringType,true),StructField(22,StringType,true),StructField(23,StringType,true),StructField(24,StringType,true),StructField(25,StringType,true),StructField(26,StringType,true),StructField(27,StringType,true),StructField(28,StringType,true),StructField(29,StringType,true)

In [12]:
### In the end, a shorter and more efficient way to build a Spark DataFrame that carries the right column schema

test = spark.createDataFrame(df_temp)
test.schema

StructType(List(StructField(0,LongType,true),StructField(1,LongType,true),StructField(2,LongType,true),StructField(3,LongType,true),StructField(4,LongType,true),StructField(5,LongType,true),StructField(6,LongType,true),StructField(7,LongType,true),StructField(8,LongType,true),StructField(9,LongType,true),StructField(10,LongType,true),StructField(11,LongType,true),StructField(12,LongType,true),StructField(13,LongType,true),StructField(14,LongType,true),StructField(15,LongType,true),StructField(16,LongType,true),StructField(17,LongType,true),StructField(18,LongType,true),StructField(19,LongType,true),StructField(20,LongType,true),StructField(21,LongType,true),StructField(22,LongType,true),StructField(23,LongType,true),StructField(24,LongType,true),StructField(25,LongType,true),StructField(26,LongType,true),StructField(27,LongType,true),StructField(28,LongType,true),StructField(29,LongType,true),StructField(30,LongType,true),StructField(31,LongType,true),StructField(32,LongType,true),Stru

In [22]:
### Rebuild the DataFrame so that any remaining str values become floats ###
t0 = time.time()

arr = df.values.copy()
for i, row in enumerate(arr):
    # Each row is converted as a whole and written back in place
    arr[i, :] = convert_to_float_array(row)

df = pandas.DataFrame(arr)

print("runtime = {} s".format(time.time() - t0))
print(df.max().max())

runtime = 1.5960910320281982 s
243.0


In [23]:
##### Build the dictionary of sample vectors X of our dataset with Spark's vector type

from pyspark.mllib.linalg import Vectors

len_samples, nb_col = df_temp.shape

# Row position -> dense vector of the row's float values
dict_vectors = {}

for i in range(len_samples):
    arr = convert_to_float_array(df_temp.iloc[i].values)
    vector = Vectors.dense(arr)
    dict_vectors[i] = vector

In [24]:
## Initialisation de la matrice d'affinités 
### Mise en place de l'algorithme AP dans une fonction dédiée

# NOTE: t0 only times the *definition* of the function below, not a run.
t0 =time.time()

def affinity_propagation(df,n_iter=100,lamb=0.5,n_convergence_iter=5,pref = "min"):
    """Cluster the rows of *df* with Affinity Propagation.

    Similarities are the negative Euclidean distances between rows. The
    responsibility and availability matrices are updated with damping until
    the number of exemplars is stable or ``n_iter`` iterations have run.

    Parameters
    ----------
    df : pandas.DataFrame
        One sample per row; each row is converted with
        ``convert_to_float_array``. At least two rows are required (the
        updates take a maximum over "all other samples").
    n_iter : int
        Maximum number of update iterations.
    lamb : float
        Damping factor: every update keeps ``lamb`` of the previous message
        and takes ``1 - lamb`` of the newly computed one.
    n_convergence_iter : int
        Number of consecutive iterations during which the (non-zero) number
        of exemplars must stay unchanged before stopping early.
    pref : str
        ``"min"`` uses the minimum similarity as the preference placed on the
        diagonal of the similarity matrix; any other value uses the median.

    Returns
    -------
    tuple
        ``(cluster_centers_indices, labels, u, preferences)`` where
        ``cluster_centers_indices`` is the sorted array of exemplar row
        positions (``None`` when no exemplar was found), ``labels`` gives for
        each row the rank of its exemplar in ``cluster_centers_indices`` (an
        ``(n, 1)`` array of NaN when no exemplar was found), ``u`` is the
        index of the last iteration run (0 when ``n_iter`` is 0) and
        ``preferences`` is the preference value used.
    """
    from pyspark.mllib.linalg import Vectors

    len_samples = df.shape[0]

    # Row position -> dense vector of the row's float values
    dict_vectors = {}
    for i in range(len_samples):
        dict_vectors[i] = Vectors.dense(convert_to_float_array(df.iloc[i].values))

    ### Matrices used by the algorithm

    distance_matrix = np.zeros((len_samples,len_samples))
    availability_matrix = np.zeros((len_samples,len_samples))
    responsability_matrix = np.zeros((len_samples,len_samples))

    ### Similarity = negative Euclidean distance between the vectors

    for i in range(len_samples):
        for j in range(len_samples):
            distance_matrix[i,j] = -np.sqrt(dict_vectors[i].squared_distance(dict_vectors[j]))

    ## Preferences: computed over the whole matrix, then put on the diagonal

    if pref == "min":
        preferences = np.min(distance_matrix.flatten())
    else:
        preferences = np.median(distance_matrix.flatten())

    for i in range(len_samples):
        distance_matrix[i,i] = preferences

    nb_clusters = []
    u = 0  # keeps the returned iteration index defined when n_iter == 0

    #### Iterate the updates until convergence

    for u in range(n_iter):

        # Responsibility update
        for i in range(len_samples):

            arr_temp = [availability_matrix[i,k] + distance_matrix[i,k] for k in range(len_samples)]
            # Self-responsibility: preference minus the best similarity to any other sample
            value_temp = distance_matrix[i,i] - np.max([distance_matrix[i,k] for k in range(len_samples) if k!=i])

            for j in range(len_samples):

                if i != j:
                    temp = distance_matrix[i,j] - np.max(np.delete(arr_temp,j))
                    responsability_matrix[i,j] = lamb * responsability_matrix[i,j] + (1-lamb) * temp
                else:
                    responsability_matrix[i,j] = lamb * responsability_matrix[i,j] + (1-lamb) * value_temp

        # Availability update (reads responsibilities only)
        for i in range(len_samples):

            # Self-availability: positive responsibilities received from the other samples
            value_temp = np.sum([np.max([0,responsability_matrix[k,i]]) for k in range(len_samples) if (k!=i)])

            for j in range(len_samples):

                if i != j:
                    temp = min(0,responsability_matrix[j,j] + \
                            np.sum([np.max([0,responsability_matrix[k,j]]) for k in range(len_samples) if (k!=j and k!=i)]))
                    availability_matrix[i,j] = lamb * availability_matrix[i,j] + (1-lamb)*temp
                else:
                    availability_matrix[i,j] = lamb * availability_matrix[i,j] + (1-lamb)*value_temp

        # Samples with a positive diagonal entry are the current exemplars
        total = availability_matrix + responsability_matrix
        K = int(np.sum(np.diag(total) > 0))

        nb_clusters.append(K)

        # Converged when the last n_convergence_iter counts all equal K.
        # The window holds exactly n_convergence_iter entries (an explicit
        # start index avoids the `[-0:]` pitfall), and K must be positive so
        # that a run without any exemplar yet is not reported as converged.
        if len(nb_clusters) > n_convergence_iter and K > 0:
            recent = nb_clusters[len(nb_clusters) - n_convergence_iter:]
            if all(count == K for count in recent):
                print('Algo Converged after {} iterations'.format(u))
                break

    #### Identify the clusters and shape the final result

    I = np.where(np.diag(availability_matrix + responsability_matrix) > 0)[0]
    K = I.size

    if K > 0:
        # Assign every sample to its most similar exemplar
        c = np.argmax(distance_matrix[:, I], axis=1)
        c[I] = np.arange(K)  # exemplars belong to their own cluster

        # Refine each exemplar: best total similarity inside its cluster
        for k in range(K):
            ii = np.where(c == k)[0]
            j = np.argmax(np.sum(distance_matrix[ii[:, np.newaxis], ii], axis=0))
            I[k] = ii[j]

        c = np.argmax(distance_matrix[:, I], axis=1)
        c[I] = np.arange(K)
        labels = I[c]
        # Per-sample labels expressed as ranks in the sorted exemplar list
        cluster_centers_indices = np.unique(labels)
        labels = np.searchsorted(cluster_centers_indices, labels)
    else:
        labels = np.empty((len_samples, 1))
        cluster_centers_indices = None
        labels.fill(np.nan)

    return cluster_centers_indices, labels, u, preferences

print("runtime = {} s".format(time.time() - t0))

runtime = 0.005000114440917969 s


In [25]:
### Try the function on the small sample

t0 = time.time()
a, b, c, d = affinity_propagation(df_temp, n_iter=100, lamb=0.9, n_convergence_iter=5)
print('runtime = {} s'.format(time.time() - t0))

Algo Converged after 46 iterations
runtime = 1.9741120338439941 s


In [26]:
### Fonction permettant de créer la hash table <cluster center, cluster members>

def local_computation (df,n_iter=100,n_convergence_iter=5,damp=0.5,pref = "min"):
    """Run Affinity Propagation on *df* and build the centre -> members table.

    Parameters
    ----------
    df : pandas.DataFrame
        Samples handled by this mapper, one per row.
    n_iter : int
        Maximum number of iterations passed to ``affinity_propagation``.
    n_convergence_iter : int
        Convergence window passed to ``affinity_propagation``.
    damp : float
        Damping factor (the ``lamb`` argument of ``affinity_propagation``).
    pref : str
        Preference mode passed to ``affinity_propagation``.

    Returns
    -------
    tuple or str
        ``(points_centers, pref)`` where ``points_centers`` maps each cluster
        centre (row position in *df*) to the array of row positions assigned
        to it, and ``pref`` is the preference value used by the algorithm.
        The string ``"No cluster centers"`` is returned when no centre was
        found.
        NOTE(review): callers that unpack two values fail on the string
        return; confirm the intended contract before relying on it.
    """
    cluster_centers_indices, labels, n_iter, pref = affinity_propagation(df,n_iter,damp,n_convergence_iter,pref)

    # Identity test on purpose: `!= None` on a NumPy array compares element by
    # element, and the truth value of that result is ambiguous as soon as
    # there are two or more centres.
    if cluster_centers_indices is None:
        return("No cluster centers")

    points_centers = {}

    for e in cluster_centers_indices:
        # Rank of the centre in the centre list, as used in `labels`
        lab = np.searchsorted(cluster_centers_indices,e)
        points_centers[e] = np.where(labels == lab)[0]

    return points_centers, pref

In [27]:
#### Try the hashing function on two chunks of 50 samples ###

t0 = time.time()

df1 = df.iloc[:50]
df2 = df.iloc[50:100]

pts_centers1, pref = local_computation(df1, n_iter=100, n_convergence_iter=5, damp=0.9, pref="min")
pts_centers2, pref = local_computation(df2, n_iter=100, n_convergence_iter=5, damp=0.9, pref="min")

print("runtime = {} s".format(time.time() - t0))

Algo Converged after 21 iterations




Algo Converged after 24 iterations
runtime = 130.87148594856262 s


In [28]:
### Hash table 1 (first chunk): cluster centre -> member rows ###

pts_centers1

{35: array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16,
        17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33,
        34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49], dtype=int32)}

In [29]:
### Hash table 2 (second chunk): cluster centre -> member rows ####

pts_centers2

{7: array([ 0,  1,  7, 10, 11, 12, 14, 15, 16, 17, 20, 21, 22, 23, 24, 30, 31,
        34, 40, 41, 43, 44, 46, 47, 48], dtype=int32),
 9: array([ 2,  3,  4,  5,  6,  8,  9, 13, 18, 19, 25, 26, 27, 28, 29, 32, 33,
        35, 36, 37, 38, 39, 42, 45, 49], dtype=int32)}

In [54]:
#### Fonction permettant de rassembler les résultats des différents mappers ####

def merge (df1,df2, points_centers1, points_centers2, pref):
    """Merge the cluster tables produced by two mappers.

    Centres of the two tables that are closer than ``0.5 * |pref|`` are
    paired greedily (each centre is used in at most one pair). Every pair is
    replaced by a new centre, the mean of the two, which owns the members of
    both clusters. Rows of *df2* are renumbered by adding ``len(df1)``.

    Parameters
    ----------
    df1, df2 : pandas.DataFrame
        Samples of the two mappers, one per row.
    points_centers1, points_centers2 : dict or str
        Centre -> member-rows tables of *df1* and *df2* (row positions inside
        their own frame); a string means "no cluster centres".
    pref : float
        Preference value used by the local clusterings; sets the threshold.

    Returns
    -------
    tuple or float
        ``(distance_centers, df_full, new_centers)``:
        ``distance_centers[i, j]`` is the distance between the i-th centre of
        table 1 and the j-th centre of table 2; ``df_full`` is the
        concatenation of *df1* and *df2*; ``new_centers`` is the merged
        table. When at least one pair is merged, ``df_full`` is re-labelled
        0..n1+n2-1 by position, the new centres are added as rows labelled
        n1+n2, n1+n2+1, ... and the rows of the merged centres are removed
        (the member arrays may still list those two row ids).
        ``0.0`` is returned when one of the tables is a string.
        NOTE(review): callers that unpack three values fail on the ``0.0``
        return; confirm the intended contract.
    """
    if(isinstance(points_centers1,str) or isinstance(points_centers2,str)):
        print("No clusters to merge")
        return 0.0

    keys1 = list(points_centers1)
    keys2 = list(points_centers2)

    n1 = df1.shape[0]
    n2 = df2.shape[0]

    # Float coordinates of each local centre, in the order of keys1 / keys2
    centers_1 = [convert_to_float_array(df1.iloc[key].values) for key in keys1]
    centers_2 = [convert_to_float_array(df2.iloc[key].values) for key in keys2]
    vectors_1 = [Vectors.dense(center) for center in centers_1]
    vectors_2 = [Vectors.dense(center) for center in centers_2]

    distance_centers = np.zeros([len(keys1),len(keys2)])

    threshold = 0.5*np.abs(pref)

    # Greedy pairing: a centre that is already paired is never paired again
    to_merge = []
    used_1 = set()
    used_2 = set()

    for i in range(len(keys1)):
        for j in range(len(keys2)):

            a = np.sqrt(vectors_1[i].squared_distance(vectors_2[j]))
            distance_centers[i,j] = a

            if a < threshold and i not in used_1 and j not in used_2:
                to_merge.append([i,j])
                used_1.add(i)
                used_2.add(j)

    # Clusters that are not merged are kept; ids of df2 are shifted by n1
    new_centers = {}

    for i, key in enumerate(keys1):
        if i not in used_1:
            new_centers[key] = points_centers1[key]
    for j, key in enumerate(keys2):
        if j not in used_2:
            new_centers[key + n1] = np.array([ e + n1 for e in points_centers2[key]])

    df_full = pandas.concat([df1,df2])

    if to_merge:

        # Label rows by position so that they match the ids of new_centers
        df_full = df_full.reset_index(drop=True)

        new_rows = []
        new_ids = []
        merged_rows = []

        for rank, (i, j) in enumerate(to_merge):

            all_points = [e for e in points_centers1[keys1[i]]] + [e + n1 for e in points_centers2[keys2[j]]]

            print(keys1[i],keys2[j])
            new_center = np.mean(np.array([centers_1[i], centers_2[j]]),axis = 0)

            # Ids are allocated past the original rows, one per merge, so
            # they stay unique however many pairs are merged.
            new_id = n1 + n2 + rank
            new_centers[new_id] = np.array(all_points)

            new_rows.append(new_center)
            new_ids.append(new_id)
            # Rows of the two merged centres (not their ranks in keys1/keys2)
            merged_rows.extend([keys1[i], n1 + keys2[j]])

        # Rows are added and removed once, after the loop, so that labels do
        # not shift between two merges.
        added = pandas.DataFrame(new_rows, columns=df_full.columns, index=new_ids)
        df_full = pandas.concat([df_full, added])
        df_full = df_full.drop(merged_rows)

    return distance_centers, df_full, new_centers


In [55]:
## Try the merge function on the two hash tables ####

t0 = time.time()

a, b, final_hash = merge(
    df1,
    df2,
    points_centers1=pts_centers1,
    points_centers2=pts_centers2,
    pref=pref,
)

print("runtime = {} s".format(time.time() - t0))
# Distances between the centres of the two tables
print(a)

35 9
runtime = 0.010000944137573242 s
[[ 60.61725497  62.98258807]]


In [56]:
# Merged hash table returned by merge()
final_hash

{57: array([50, 51, 57, 60, 61, 62, 64, 65, 66, 67, 70, 71, 72, 73, 74, 80, 81,
        84, 90, 91, 93, 94, 96, 97, 98], dtype=int32),
 100: array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16,
        17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33,
        34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 52,
        53, 54, 55, 56, 58, 59, 63, 68, 69, 75, 76, 77, 78, 79, 82, 83, 85,
        86, 87, 88, 89, 92, 95, 99], dtype=int32)}

In [58]:
### Hash table 1 (first chunk) ###
pts_centers1

{35: array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16,
        17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33,
        34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49], dtype=int32)}

In [59]:
### Hash table 2 (second chunk) ###
pts_centers2

{7: array([ 0,  1,  7, 10, 11, 12, 14, 15, 16, 17, 20, 21, 22, 23, 24, 30, 31,
        34, 40, 41, 43, 44, 46, 47, 48], dtype=int32),
 9: array([ 2,  3,  4,  5,  6,  8,  9, 13, 18, 19, 25, 26, 27, 28, 29, 32, 33,
        35, 36, 37, 38, 39, 42, 45, 49], dtype=int32)}

In [60]:
#### Resulting hash table after the merge ###

final_hash

{57: array([50, 51, 57, 60, 61, 62, 64, 65, 66, 67, 70, 71, 72, 73, 74, 80, 81,
        84, 90, 91, 93, 94, 96, 97, 98], dtype=int32),
 100: array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16,
        17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33,
        34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 52,
        53, 54, 55, 56, 58, 59, 63, 68, 69, 75, 76, 77, 78, 79, 82, 83, 85,
        86, 87, 88, 89, 92, 95, 99], dtype=int32)}

# Conclusion

Ce notebook présente donc les fonctions nécessaires au calcul distribué appliqué à l'Affinity Propagation et implémente un exemple sur lequel on voit bien que la table de hachage finale correspond au résultat attendu.

Il resterait maintenant à implémenter les fonctions présentées ci-dessus sur un cluster réel afin de paralléliser de manière effective la technique d'Affinity Propagation sur la base de données considérée.