
# Pour le bon fonctionnement de ce notebook, il faut :
 ##### 1- Il faut que le notebook soit en mode trusted pour pouvoir exécuter les cellules qui comprennent des graphiques
 ##### 2- Installer les librairies suivantes : pandas, numpy, matplotlib, psutil, pyarrow, scikit-learn, ipympl, apache-beam
 ##### 3- Exécuter les cellules si les graphiques ne sont pas affichés dès l'ouverture du notebook.
 ##### 4- Exécuter les cellules pour lancer l'animation de la partie B qui montre les changements  des centres et des effectifs suite à l'arrivée de chaque batch (le notebook doit être en mode trusted).


In [1]:
!pip install ipympl
import numpy as np
import pandas as pd
import psutil
import pyarrow.parquet as pq
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt
import os
import apache_beam as beam
import logging
import random
import typing
from sklearn.cluster import KMeans
from functools import reduce
logging.root.setLevel(logging.ERROR)
# pour afficher les graphiques interactifs
%matplotlib notebook 




[notice] A new release of pip is available: 23.3.1 -> 23.3.2
[notice] To update, run: C:\Users\driou\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip


# Génération du dataset

Les données sont générées aléatoirement à partir de deux centres de classes.
Chaque classe contient 100 points de données.
Chaque point de données est un vecteur de deux dimensions (X et Y).

In [2]:
# Générer les données pour la première classe
np.random.seed(42) # pour avoir les memes données à chaque exécution
class1_centroid = np.array([5, 5])
class1_points = class1_centroid + np.random.randn(100, 2)

# Générer les données pour la deuxième classe
class2_centroid = np.array([10, 10])
class2_points = class2_centroid + np.random.randn(100, 2)

# Concaténer les deux classes
data = np.concatenate([class1_points, class2_points])

# Créer un DataFrame et enregistrer les données dans un fichier CSV
df = pd.DataFrame(data, columns=['X', 'Y'])
df.to_csv('donnees.csv',index_label=False)


# Solution pour lire les données avec peu de mémoire consomée

##### La lecture de données par morceaux (chunks) permet de lire les données sans les charger entièrement dans la mémoire. Cela assure la lecture des fichiers volumineux sans surcharger la mémoire.
##### En effet, l'objet retourné par la fonction read_csv est un itérateur qui permet d'itérer sur chaque chunk (groupe de données) et faire le traitement nécessaire.


#### 1_ Implémentation de la lecture par morceaux et affichage de l'usage mémoire

In [3]:
iterator = pd.read_csv("donnees.csv" , chunksize=20) # Lire le fichier par morceaux de 20 lignes
donnes = next(iterator) # Prendre le premier chunk sans charger le fichier entièrement dans la mémoire
print(donnes.info(verbose=False, memory_usage='deep')) # Afficher l'information sur le chunk
# mémory usage : indique la taille de mémoire utilisée pour stocker le chunk

<class 'pandas.core.frame.DataFrame'>
Index: 20 entries, 0 to 19
Columns: 2 entries, X to Y
dtypes: float64(2)
memory usage: 480.0 bytes
None


#### 2_ Comparaison entre la lecture par morceaux et la lecture entière
###### L'affichage ci-dessous montre que La taille mémoire occupée par le chunk pointé par l'itérateur est inférieure à la taille mémoire occupée par le fichier entier, cette différence est beaucoup plus importante lorsque la taille du fichier augmente.

In [4]:
donnes_entiere= pd.read_csv("donnees.csv") # Lire le fichier entièrement dans la mémoire
print(donnes_entiere.info(verbose=False, memory_usage='deep')) # Afficher l'information sur les données entières

<class 'pandas.core.frame.DataFrame'>
Index: 200 entries, 0 to 199
Columns: 2 entries, X to Y
dtypes: float64(2)
memory usage: 4.7 KB
None


#### 3_ Comparaison de l'utilisation mémoire des deux méthodes avec différentes tailles de données
###### Le graphique ci-dessous montre l'occupation de la mémoire par les deux méthodes de lecture avec différentes tailles de données. On remarque que la taille mémoire occupée par la solution avec morceaux (chunks) est constante et inférieure à la taille mémoire occupée par la lecture entière, cette dernière augmente proportionnellement avec la taille de fichier de données.


In [5]:

# Paramètres
taille_chunk = 500
nb_lignes_max = 10000
pas = 100  

# Initialisation des listes pour stocker les résultats
utilisation_memoire_chunk = []
utilisation_memoire_entiere = []
liste_nb_lignes = []

# Générer des jeux de données de tailles croissantes et mesurer l'occupation mémoire
for nb_lignes in range(taille_chunk, nb_lignes_max + 1, pas): 
    # Générer un jeu de données de taille nb_lignes
    donnees = {'colonne1': range(nb_lignes)}
    df = pd.DataFrame(donnees)
    
    # Enregistrer le jeu de données dans un fichier CSV
    nom_fichier = f"jeu_donnees_{nb_lignes}.csv"
    df.to_csv(nom_fichier, index=False)
    
    # Fermer le gestionnaire de fichier pour permettre la suppression
    del df
    
    # Mesurer l'occupation mémoire du premier chunk
    iterateur = pd.read_csv(nom_fichier, chunksize=taille_chunk)
    premier_chunk = next(iterateur)
    utilisation_memoire_chunk.append(premier_chunk.memory_usage(deep=True).sum())
    
    # Mesurer l'occupation mémoire du fichier complet
    donnees_entieres = pd.read_csv(nom_fichier)
    utilisation_memoire_entiere.append(donnees_entieres.memory_usage(deep=True).sum())
    
    # Ajouter le nombre de lignes à la liste
    liste_nb_lignes.append(nb_lignes)
    
    # Supprimer le fichier CSV pour éviter l'accumulation
    del iterateur
    del donnees_entieres
    os.remove(nom_fichier)

# Afficher un graphique des changements de taille mémoire
plt.figure(figsize=(10, 8))
plt.plot(liste_nb_lignes, utilisation_memoire_chunk, label='Utilisation de la mémoire avec les chunks')
plt.plot(liste_nb_lignes, utilisation_memoire_entiere, label='Utilisation de la mémoire avec la lecture entière')
plt.xlabel('Nombre de lignes')
plt.ylabel('Utilisation de la mémoire (octets)')
plt.title('Comparaison entre les deux méthodes de lecture')
plt.legend()
plt.show()


<IPython.core.display.Javascript object>

#### 4_ Code pour lire les données meme si la taille de données augmente 
###### Le code ci-dessous est conçu pour permettre la lecture des données meme si leur taille augmente. Il permet de récupérer la taille de fichier de données en question et la taille de mémoire disponible. Si la taille de fichier est inférieure à 30% de la taille de mémoire disponible, on peut lire le fichier entièrement sans surcharger la mémoire. Sinon, on utilise la lecture par morceaux (chunks) avec une taille de chunk égale à 1/8 de la taille de mémoire disponible.

In [6]:

# Obtient la capacité totale de la mémoire en Go
capacite_memoire = psutil.virtual_memory().total / (1024 ** 3)

# Spécifiez le chemin du fichier CSV que vous souhaitez lire
csv_file_path = 'donnees.csv'

# Obtient la mémoire disponible en Go
memoire_disponible = psutil.virtual_memory().available / (1024**3) # Memoire disponible < capacité de la mémoire 

# Affiche la mémoire disponible et la capacité mémoire en Go
print(f"Capacité totale de la mémoire : {capacite_memoire:.2f} Go \n")
print(f"Mémoire disponible: {memoire_disponible:.2f} Go \n")


# Obtient la taille totale du fichier CSV sans le lire en entier
taille_fichier = os.path.getsize(csv_file_path) / (1024 ** 2)  # Convertit en mégaoctets

# Affiche la taille du fichier CSV
print(f"Taille totale du fichier CSV: {taille_fichier:.2f} Mo \n")

if taille_fichier <= (memoire_disponible * 30) /100 : # La taille du fichier est inférieure à 30% de la taille de mémoire disponible
    # La lecture en entier est possible sans surcharger la mémoire 
    print("La taille du fichier est inférieure à 30% de la taille du mémoire disponible, on effectue une lecture complète...  \n")
    donnes = pd.read_csv(csv_file_path)
else:
    # La lecture en entier surchargerait la mémoire, utiliser des chunks
    chunk_size = int(memoire_disponible / 8)  # Utiliser des chunks de taille 1/8 de la taille de la mémoire disponible. 
    print(f"Lecture par morceaux avec une taille de chunk de {chunk_size} Go...")
    reader = pd.read_csv(csv_file_path, chunksize=chunk_size)
    # A ce stade, la variable reader est un itérator. Ce dernier va permettre d'itérer sur chaque chunk (groupe de données) et faire le traitement nécessaire. 
    # Il ne faut pas concaténer les chunks sinon, tout le fichier sera chargé dans la mémoire et cela revient à la meme chose que lire le fichier en entier.

Capacité totale de la mémoire : 31.85 Go 

Mémoire disponible: 20.65 Go 

Taille totale du fichier CSV: 0.01 Mo 

La taille du fichier est inférieure à 30% de la taille du mémoire disponible, on effectue une lecture complète...  


# A_ Implémentation de KMeans sequentiel
#### 1_ Implémentation de la fonction KMeans sequentiel
###### L'algorithme KMeans est un algorithme d'apprentissage non supervisé qui permet de regrouper les données en k clusters. Il est composé de deux étapes :
###### 1- Initialisation des centres : choisir k points de données aléatoirement comme centres initiaux
###### 2- Assigner chaque point de données au cluster le plus proche
###### 3- Mettre à jour les centres : calculer les nouveaux centres en utilisant la moyenne des points de données de chaque cluster
###### 4- Répéter les étapes 2 et 3 jusqu'à ce que les centres ne changent plus
###### L'algorithme décrit ci-dessus est implémenté dans la fonction KMeans_sequentiel ci-dessous.

In [7]:
def KMeans_sequentiel(data, k, max_iterations=100): # data : les données, k : le nombre de clusters, max_iterations : le nombre d'itérations maximales
    # Choix aléatoire  de k centres µ1, ..., µk
    centers = data[np.random.choice(range(len(data)), k, replace=False)]
    counts = np.zeros(k)
    new_centers = np.zeros_like(centers)
    labels=[]
    # Pour chaque nouveau point Xi :
    for iteration in range(max_iterations):
        # On associe chaque point au cluster le plus proche 
        labels = np.argmin(np.linalg.norm(data[:, np.newaxis] - centers, axis=2), axis=1)

        # Mise à jour des centres 
        np.add.at(new_centers, labels, data)
        np.add.at(counts, labels, 1)
        for j in range(k):
            if counts[j] > 0:
                centers[j] = new_centers[j] / counts[j]

        # Réinitialisation des centres 
        new_centers.fill(0)
        counts.fill(0)

    return centers, labels

#### 2_ Application du KMeans sequentiel sur le dataset
###### La fonction KMeans_sequentiel est appliquée sur le dataset généré précédemment avec K=2. Le graphique ci-dessous montre les clusters obtenus. 
###### Les centres de chaque cluster sont représentés par des croix rouges.

In [25]:
X = donnes_entiere.values
centres, labels = KMeans_sequentiel(X,2,100)
# Créer le scatter plot
plt.figure(figsize=(10, 8))
scatter = plt.scatter(X[:, 0], X[:, 1], c=labels, cmap='viridis', alpha=0.6)
plt.scatter(centres[:, 0], centres[:, 1], c='red', marker='X', s=100, label='Centres de clusters')

# Ajouter des légendes et un titre
plt.legend()
plt.title('Résultat de KMeans séquentiel')

# Ajouter une légende pour les clusters
legend_labels = [f'Cluster {i}' for i in range(len(centres))]
plt.legend(handles=scatter.legend_elements()[0], labels=legend_labels)
# Afficher le plot
plt.show()

<IPython.core.display.Javascript object>

#### 3_ Enregistrer les résultats de l’algorithme dans un fichier de manière à consommer peu de mémoire
###### Dans cette section, nous allons enregistrer les labels de chaque point de données issus de l'algorithme KMeans séquentiel dans un fichier.
##### 1_ Quantisation
###### La quantisation est une technique qui permet de réduire la taille de données en changeant la représentation des données. Elle consiste dans notre cas à changer le type des données de float64 à float32. Cela permet de réduire la taille mémoire allouée pour stocker les données.

In [9]:
print(type(labels[0])) # les labels sont de type numpy.int64


<class 'numpy.int64'>


In [10]:
labels_32 = labels.astype(np.int32) # on convertit les labels en int32
print(type(labels_32[0])) # les labels sont de type numpy.int32


<class 'numpy.int32'>


##### 2_ Utilisation des fichiers parquets
###### Après la quantisaton de données, on peut utiliser les fichiers parquets pour enregistrer les données. Les fichiers parquets sont des fichiers binaires qui permettent de stocker les données de manière compressée. Ils sont utilisés pour stocker les données dans les data lakes.

In [11]:
# Se fait en deux étapes :
labels_32 = pd.DataFrame(labels_32 ) # convertir les labels en dataframe
labels_32.to_parquet("labels_32.parquet",index=False) # enregistrer les labels dans un fichier parquet

  if _pandas_api.is_sparse(col):


##### 3_ Comparaison entre la taille du fichier CSV et celle du fichier parquet
###### On compare dans cette section la taille du fichier CSV et celle du fichier parquet de notre solution. Pour ce faire : 
###### 1- On sauvegarde nos données en CSV, et on calcule la taille du fichier en utilisant la fonction os.path.getsize
###### 2- On calcule la taille du fichier parquet de notre solution en utilisant la fonction os.path.getsize
###### 3- On compare les deux tailles
###### On remarque que la taille du fichier parquet est beaucoup plus petite que celle du fichier CSV.
###### Le pourcentage de réduction de la taille est de 85.66%.

In [12]:
# Enregistrer les labels dans un fichier CSV ordinaire :
np.savetxt("labels_32.csv", labels, delimiter=',') # enregistrer les labels dans un fichier CSV

In [13]:
# Calcule de la taille des fichiers et comparison des résultats :
taille_fichier_csv = os.path.getsize("donnees.csv") / 1024   # Convertir en Ko
taille_fichier_parquet = os.path.getsize("labels_32.parquet") / 1024  # Convertir en Ko

print(f"Taille totale du fichier CSV : {taille_fichier_csv:.2f} KO \n")
print(f"Taille totale du fichier parquet : {taille_fichier_parquet:.2f} KO \n")
print(f"La taille a été réduite par : {100 - (taille_fichier_parquet / taille_fichier_csv) * 100:.2f} % \n")

Taille totale du fichier CSV : 8.00 KO 

Taille totale du fichier parquet : 1.15 KO 

La taille a été réduite par : 85.66 % 


#### 4_ Validation  de la cohérence des résultats
###### Notre solution (Quantisation + fichier parquet) est basée sur la compression de données, il est donc important de vérifier la cohérence des résultats obtenus après la compression. Pour ce faire, on compare les résultats obtenus avec la solution de référence (KMeans séquentiel sans compression de données).
###### Pour valider la cohérence des résultats, une métrique simple est utilisée : la précision (accuracy). Cette métrique permet de comparer les résultats obtenus avec la solution de référence. Elle est calculée en utilisant la fonction accuracy_score de la librairie scikit-learn. Sa valeur varie entre 0 et 1. Plus la valeur est proche de 1, plus les résultats sont cohérents.
###### Suite à l'exécution de la cellule ci-dessous, on remarque que la valeur de la précision est égale à 1, ce qui confirme la cohérence des résultats obtenus avec notre solution de compression des données.

In [14]:
# Lecture de données compressées
labels_data = pq.read_table("labels_32.parquet").to_pandas()
print( f"La précision entre les deux résultats est : {accuracy_score(labels_data, labels_32)}")
print("Les résultats sont cohérents à {0:.2f}%".format(accuracy_score(labels_data, labels_32) * 100))

La précision entre les deux résultats est : 1.0
Les résultats sont cohérents à 100.00%


# B_ Streaming KMeans 
#### 1_ Implémentation de la classe StreamingKMeans
###### La classe StreamingKMeans représente l'algorithme décrit dans la partie B.
###### Elle prend en paramètre :
###### ----> 1- k : le nombre de clusters
###### ----> 2- T : la taille de l'historique
###### ----> 3- r : le paramètre de pondération
###### ----> 4- P : la partition P qui représente les labels.
###### Elle est composée de deux méthodes :
###### 1- La méthode update : cette méthode permet de mettre à jour les centres et les labels à chaque arrivée d'un nouveau batch. Elle prend en paramètre le nouveau batch et renvoie les nouveaux centres ainsi que les labels du nouveau batch, elle effectue les étapes suivantes :
###### ----> 1-1- Vérifier si la taille maximale de l'historique est atteinte, si c'est le cas, enlever le batch le plus ancien
###### ----> 1-2- Ajouter le nouveau batch à l'historique
###### ----> 1-3- Initialiser les centres avec la partition P (cette partition est aléatoire si c'est le premier batch, sinon elle est calculée à partir des batches précédents)
###### ----> 1-4- Calculer le poids de chaque batch, ensuite attribuer ce poids à chaque point de données selon son appartenance au batch (le poids est élevé pour les points de données des batches récents)
###### ----> 1-5- Utiliser l'algorithme KMeans pondéré pour calculer les nouveaux centres et les nouveaux labels
###### ----> 1-6- Mettre à jour la partition P
###### 2- La méthode initialize_centroids : cette méthode permet d'initialiser les centres avec la partition P. Elle prend en paramètre le nouveau batch et l'historique des batches. Elle effectue les étapes suivantes :
###### ----> 2-1- Si c'est le premier batch, on choisit deux centres aléatoirement
###### ----> 2-2- Sinon, on calcule les centres à partir de la partition P


In [15]:

class StreamingKMeans:
    def __init__(self, k, T, r,P=None):
        self.k = k # Nombre de clusters
        self.T = T # Taille de l'historique
        self.r = r # Paramètre de pondération
        self.X = [] # Historique des batches
        self.P = P # Partition P qui représente les labels des batches précédents (None s'il n'y a pas de batch précédent)
        

    def update(self, batch): 
        if len(self.X) > 0:
            X_all = np.concatenate(self.X,axis=0)
        else :
            X_all = self.X
        # Si la taille de l'historique est T, enlever le plus vieux batch
        if len(self.X) == self.T:
            self.X.pop(0)
            print(" Le batch le plus ancien a été supprimé")
            

        # Ajouter le nouveau batch à l'historique
        self.X.append(batch)
        # Initialiser les centroids C avec la partition P    
        initial_centroids = self.initialize_centroids(batch,X_all)
        # Utiliser l'algorithme k-means pondéré
        X_all = np.concatenate(self.X,axis=0) # On inclus le nouveau batch dans le calcule des poids 
        sample_weights = [self.r**t for t in range(len(self.X))] # Les poids associés à chaque batch 
        all_points_weight = np.repeat(sample_weights, [tableau.shape[0] for tableau in self.X]) # Les poids associés à chaque point de données selon son appartenance à quel batch 
        kmeans = KMeans(n_clusters=self.k, random_state=0,init=initial_centroids,n_init=1)
        labels = kmeans.fit_predict(X_all,sample_weight=all_points_weight) # On calcule les nouveaux labels avec les poids associés à chaque point de données
        centroids = kmeans.cluster_centers_ 
        self.P = labels 
 
        
        return centroids, labels[- batch.shape[0] :] # On renvoie les nouveaux centres ainsi que les labels de nouveau batch 

    def initialize_centroids(self,batch,X_all): # Initialisation des centroids avec la partition P
        
        centres_initiaux = []
        # Vous pouvez personnaliser la méthode d'initialisation des centroids selon vos besoins
        # Ici, nous utilisons simplement les centroids du dernier batch
        if len(self.X) == 1 : # Pas de batchs précédents, on a que le batch qu'on vient de rajouter
            centres_initiaux =  batch[np.random.choice(range(len(batch)), self.k, replace=False)] # On prend deux centres aléatoirement 
            
        else : # il y a des batches précédents donc on calcule leurs centres avec la partition P afin de les utiliser comme centres initiaux 
            for classe in range(self.k) :
                centre = np.mean(X_all[self.P == classe],axis=0)
                centres_initiaux.append(centre)

        return centres_initiaux
        



#### 2_ Animation pour montrer le changement des centres suite à l'arrivée du chaque batch avec notre algorithme (exécuter les cellules pour lancer l'animation)
###### L'animation ci-dessous montre le changement des centres suite à l'arrivée de chaque batch. Les centres sont représentés par des croix rouges.
###### La cohérence des résultats obtenus est confirmée par le graphique, en effet les clusters sont bien séparés.


In [16]:

from matplotlib.animation import FuncAnimation


# Définir la variable d'environnement pour éviter la fuite de mémoire
os.environ['OMP_NUM_THREADS'] = '1'

# Charger les données à partir d'un fichier CSV
data = pd.read_csv("donnees.csv").values
np.random.shuffle(data)

# Diviser les données en 10 batches
batches = np.array_split(data, 4)

# Initialiser le graphique
fig, ax = plt.subplots(figsize=(10, 6))
all_labels = []

centroid_scatter = ax.scatter([], [], c='red', marker='X', s=100,label = "Centres")
scatter = ax.scatter([], [], alpha=0.6, c=[],label = "Les poins de données")
ax.legend(loc='lower right')
def init():
    global streamingkmeans
    global all_labels
    global current_data
    global centroid_scatter
    global scatter

    # instancier l'algorithme de streaming KMeans
    k = 2
    T = 3
    r = 20
    streamingkmeans = StreamingKMeans(k, T, r)

    # Initialiser le graphique
    
    all_labels = []
    current_data = batches[0]

def update(frame): # frame représente le numéro du batch
    global streamingkmeans
    global all_labels
    global current_data
    if frame > 0 :
        current_data = np.concatenate((current_data, batches[frame]), axis=0)
    centroids, labels = streamingkmeans.update(batches[frame])
    all_labels.extend(labels.tolist())
    
    ax.clear()
    ax.set_title(f"Résultat de streaming KMeans suite à l'arrivé du batche numéro {frame + 1}")
    centroid_scatter = ax.scatter(centroids[:, 0], centroids[:, 1], c='red', marker='X', s=100,label = "Centres")
    for i, txt in enumerate(centroids):
        ax.text(txt[0], txt[1], f'({txt[0]:.2f}, {txt[1]:.2f})', fontsize=12, ha='right', va='bottom',fontweight=500)
    
    scatter = ax.scatter(current_data[:, 0], current_data[:, 1], alpha=0.3, c=all_labels,label = "Les poins de données")
    ax.legend(loc='lower right')
    # Fixer les limites du graphique pour maintenir la même échelle
    ax.set_xlim(0, 14)
    ax.set_ylim(0, 14)

    

# Créer l'animation
anim = FuncAnimation(fig, update, frames=len(batches) , init_func=init, repeat=True, blit=True,interval=1500)

plt.show()


<IPython.core.display.Javascript object>

# C_ Implémentation de k-means distribué (Apache Beam)
###### Dans cette section, nous allons implémenter l'algorithme KMeans distribué en utilisant le framework Apache Beam.
###### Grâce à la classe MeanCombiner personnalisée, notre solution prend en considération les données de n'importe quelle dimension et pour n'importe quelle valeur de k. 
#### 1_ Implémentation de la classe MeanCombiner pour le calcule des centres
###### La classe MeanCombiner permet de calculer la moyenne des points de données qui ont la meme clé (le meme centre). Elle hérite de la classe CombineFn de la librairie Apache Beam. Elle est composée de quatre méthodes :
###### 1- La méthode create_accumulator : cette méthode permet d'initialiser l'accumulateur. Elle renvoie un tuple qui contient la somme des points de données et le nombre de points de données.
###### 2- La méthode add_input : cette méthode permet de mettre à jour l'accumulateur à chaque arrivée d'un nouveau point de données. Elle prend en paramètre l'accumulateur et le nouveau point de données. Elle renvoie un tuple qui contient la somme des points de données et le nombre de points de données.
###### 3- La méthode merge_accumulators : cette méthode permet de fusionner les accumulateurs. Elle prend en paramètre une liste d'accumulateurs et renvoie un accumulateur qui contient la somme des points de données et le nombre de points de données.
###### 4- La méthode extract_output : cette méthode permet de calculer la moyenne des points de données. Elle prend en paramètre l'accumulateur et renvoie la moyenne des points de données (centre).

In [17]:
class MeanCombiner(beam.CombineFn) :
    
    def __init__(self, dim=2, *unused_args, **unused_kwargs):
        super().__init__(*unused_args, **unused_kwargs)
        self.dim = dim
        
    def create_accumulator(self): # initialisation de notre accumulateur 
        sum = np.zeros(self.dim)
        count = 0
        return sum,count

    def add_input(self, accumulator, input): # s'exécute à l'arrivée de chaque élément
        current_sum, current_count = accumulator
        current_sum += input
        current_count += 1
        return current_sum,current_count

    def merge_accumulators(self, accumulators): 
        sums, counts = zip(*accumulators)
        return sum(sums), sum(counts)

    def extract_output(self, accumulator): # s'exécute à la fin de chaque fenêtre
        sum, count = accumulator
        if count == 0 :
            return float('NaN')
        return sum / count


#### 2_ Implémentation de la fonction assign_cluster
###### La fonction assign_cluster permet d'assigner le cluster le plus proche à chaque point de données. Elle prend en paramètre un point de données et les centroïdes sous forme d'un dictionnaire. Elle renvoie un tuple qui contient le numéro du cluster assigné (le plus proche) ainsi que les coordonnées du point.
###### Le foncionnement de la fonction est celui décrit dans la partie C de l'énoncé.

In [18]:
def assign_cluster_dictionnaire(point, centroids):
    """Trouver le cluster le plus proche pour un point de données donné en utilisant un dictionnaire pour passer les centroïdes"""
    # centroids est un dictionnaire avec les coordonnées des centroïdes
    # Initialisation avec une distance infinie et aucun cluster assigné
    min_distance = float('inf')
    assigned_cluster = None

    # Parcours de tous les centroïdes
    for cluster, centroid in centroids.items():

        # Calcul de la distance entre le point et le centroïde actuel
        dist =  np.sqrt(np.sum((point[1] - centroid)**2))

        # Mise à jour du cluster assigné si la distance est plus petite
        if dist < min_distance:
            min_distance = dist
            assigned_cluster = cluster

    # Retourne un tuple avec le numéro du cluster assigné et les coordonnées du point
    return assigned_cluster, point[1]

#### 3_ Création et execution du pipeline pour le KMeans distribué
###### Le pipeline est composé de trois étapes :
###### 1- Lecture des données : cette étape permet de lire les données à partir du fichier, ensuite, un centre choisi aléatoirement est associé à chaque point de données.
###### 2- Calcul des centres : cette étape permet de calculer les centres grâce à la classe MeanCombiner. Les points de données sont d'abord regroupés par clé (le centre), ensuite, la classe MeanCombiner est appliquée sur chaque groupe pour calculer la moyenne des points de données qui ont la meme clé (le meme centre).
###### 3- Assignation des clusters : cette étape permet d'assigner un cluster à chaque point de données. Elle utilise la fonction assign_cluster pour trouver le cluster le plus proche pour chaque point de données. Les centres sont passés à la fonction assign_cluster sous forme de dictionnaire grâce à la fonction beam.pvalue.AsDict(centroids).
###### Les résultats sont convertis en tuple pour pouvoir les enregistrer dans un fichier CSV avec le schéma Point prédéfini par la classe Point. (Cette étape n'est pas demandée dans l'énoncé, mais elle permet une meilleure visualisation des résultats avec un scatter plot).


In [26]:

# Une classe pour représenter un point de données sous forme de schémas ( cela nous aide uniquement pour l'affichage des résultats)
class Point(typing.NamedTuple):
    id : int
    X: float
    Y: float
    
pipeline = beam.Pipeline()
k = 2

# Lecture des données et initialisation aléatoire des centres
point_donnes = (
        pipeline
        |"Lecture de données" >> beam.io.ReadFromCsv("donnees.csv",)
        |"Ajout de centre choisi aléatoirement" >> beam.Map(lambda x : (random.randint(0,1),np.array([float(x.X),float(x.Y)]))) # ajouter un centre choisi aléatoirement à chaque point de données
        
)

# Calculer les centres
centroids = (
        point_donnes
        |"Regrouper par clé et calculer la moyenne " >> beam.CombinePerKey(MeanCombiner(dim=2)) # calculer la moyenne des points de données qui ont la meme clé (le meme centre)
)

# Assigner un cluster à chaque point de données
resultats = (
        point_donnes
        |"Appliquer la fonction assign cluster " >> beam.Map(assign_cluster_dictionnaire, centroids=beam.pvalue.AsDict(centroids), )  # assigner un cluster à chaque point de données
        # convertir les résultats en tuple pour pouvoir les enregistrer dans un fichier CSV avec le schéma Point
        | "Convertir les résutats en tuple" >> beam.Map(lambda x : list([int(x[0]),float(x[1][0]),float(x[1][1])])).with_output_types(Point)
        |"Ecrire les réultats dans un fichier CSV" >> beam.io.WriteToCsv("résultats_kmeans_partie_C",header=["ID_Cluster","X","Y"]) 
        
)
pipeline.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x18d361ffd10>

#### 4_ Affichage des résultats sous la forme d'un graphique
###### le graphique ci-dessous montre les résultats obtenus avec notre solution. 
###### Chaque cluster est représenté par une couleur différente. La structure des clusters est bien respectée ce qui confirme la cohérence des résultats obtenus..


In [20]:

# Charger les données depuis le fichier CSV 
resultats_kmeans = pd.read_csv('résultats_kmeans_partie_C-00000-of-00001')

plt.figure(figsize=(10,8))
plt.scatter(resultats_kmeans['X'], resultats_kmeans['Y'], c=resultats_kmeans['ID_Cluster'], cmap='viridis')
plt.xlabel('X')
plt.ylabel('Y')
plt.title("Résultats de KMeans distribué")

plt.show()

<IPython.core.display.Javascript object>

#### Quelle supposition fait-on pour passer les centroïdes sous cette forme ? Peut-on optimiser cette étape ?
###### 1_ On suppose que la PCollection qu'on a créé (qui contient les données) est suffisamment petite pour tenir dans la mémoire. 
###### 2_ Oui, on peut optimiser cette étape en utilisant beam.pvalue.AsIter(pcollection). Cette fonction permet de lire les données en utilisant un itérator. Cela permet de ne pas charger toutes les données dans la mémoire.

#### 5_ Implémentation de k-means distribué (Apache Beam) avec la solution optimisée
###### Pour implémenter la solution précédente, on utilise la fonction beam.pvalue.AsIter(pcollection). Cette fonction assure la récupération des données en utilisant un itérator qui permet de ne pas les charger toutes dans la mémoire.

In [21]:
def convert_to_list(element):
    return [element]


def cluster_plus_proche_iter(point, centroids): # Trouver le cluster le plus proche pour un point de données.
    """Trouver le cluster le plus proche pour un point de données donné en utilisant un itérator pour passer les centroïdes"""
    # centroids est un itérator
    # point est un tableau de coordonnées du point
    # Initialisation avec une distance infinie et aucun cluster assigné
    min_distance = float('inf')
    assigned_cluster = None

    # Parcours de tous les centroïdes avec l'itérateur
    for cluster, centroid in centroids:

        # Calcul de la distance entre le point et le centroïde actuel
        dist =  np.sqrt(np.sum((point[1] - centroid)**2))

        # Mise à jour du cluster assigné si la distance est plus petite
        if dist < min_distance:
            min_distance = dist
            assigned_cluster = cluster

    # Retourne un tuple avec le numéro du cluster assigné et les coordonnées du point
    return assigned_cluster, point[1]
pipeline = beam.Pipeline()
k = 2
point_donnes = (
        pipeline
        |"Lecture de données" >> beam.io.ReadFromCsv("donnees.csv",)
        |"Ajout de centre choisi aléatoirement" >> beam.Map(lambda x : (random.randint(0,1),np.array([float(x.X),float(x.Y)])))
)

centroids = (
        point_donnes
        |"Regrouper par clé et calculer la moyenne " >> beam.CombinePerKey(MeanCombiner())
)


liste_centroids = (
    centroids
    | "Convertir en liste" >> beam.Map(convert_to_list)
    | "Flattening" >> beam.FlatMap(lambda x: x)  # Aplatir la liste de listes en une liste unique
)


resultats = (
        point_donnes
        |"Appliquer la fonction assign cluster " >> beam.Map(cluster_plus_proche_iter, centroids=beam.pvalue.AsIter(liste_centroids), )
        |"Affichage de résultats" >> beam.Map(print)
)

pipeline.run()

(0, array([5.49671415, 4.8617357 ]))
(0, array([5.64768854, 6.52302986]))
(0, array([4.76584663, 4.76586304]))
(0, array([6.57921282, 5.76743473]))
(0, array([4.53052561, 5.54256004]))
(0, array([4.53658231, 4.53427025]))
(0, array([5.24196227, 3.08671976]))
(0, array([3.27508217, 4.43771247]))
(0, array([3.98716888, 5.31424733]))
(0, array([4.09197592, 3.5876963 ]))
(0, array([6.46564877, 4.7742237 ]))
(0, array([5.0675282 , 3.57525181]))
(0, array([4.45561728, 5.11092259]))
(0, array([3.84900642, 5.37569802]))
(0, array([4.39936131, 4.70830625]))
(0, array([4.39829339, 6.85227818]))
(0, array([4.98650278, 3.94228907]))
(0, array([5.82254491, 3.77915635]))
(0, array([5.2088636 , 3.04032988]))
(0, array([3.67181395, 5.19686124]))
(0, array([5.73846658, 5.17136828]))
(0, array([4.88435172, 4.6988963 ]))
(0, array([3.52147801, 4.28015579]))
(0, array([4.53936123, 6.05712223]))
(0, array([5.34361829, 3.23695984]))
(0, array([5.32408397, 4.61491772]))
(0, array([4.323078  , 5.61167629]))
(

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x18d35a8be90>

# D. Implémentation de k-means séquentiel distribuée (Apache Beam)
###### Dans cette section, nous allons implémenter l'algorithme KMeans séquentiel distribué en utilisant le framework Apache Beam.
### 1_ Implémentation de la classe KMeansSequentielSdistribué
###### Pour ce faire, nous créons une classe KMeansSequentielSdistribué qui hérite de la classe DoFn de la librairie Apache Beam. 
###### Dans cette classe, nous déclarons deux états :
###### -----> 1- L'état des centres : cet état contient les centres actuels. Il est initialisé avec les k premiers points de données arrivés.
###### -----> 2- L'état des effectifs : cet état contient les effectifs de chaque cluster sous la forme (id_cluster, coordonnées du point). 
###### La classe KMeansSequentielSdistribué est composée de deux méthodes :
###### 1- La méthode process : cette méthode permet de mettre à jour les centres et les effectifs à chaque arrivée d'un nouveau point de données. Elle prend en paramètre le nouveau point de données ainsi que les états des centres et des effectifs. Elle renvoie le  point de données avec son nouveau cluster assigné (le cluster le plus proche). Elle effectue les étapes suivantes :
###### -----> 1-1- Récupérer l'état actuel des centres et des effectifs
###### -----> 1-2- Trouver le cluster le plus proche pour le nouveau point de données
###### -----> 1-3- Mettre à jour les effectifs
###### -----> 1-4- Mettre à jour les centres
###### 2- La méthode cluster_plus_proche : cette méthode permet de trouver le cluster le plus proche pour un point de données donné. Elle prend en paramètre un point de données, les centres actuels et les effectifs actuels. Elle renvoie le cluster le plus proche. Initialement, les centres sont les k premiers points de données arrivés.
###### 3- La méthode calculer_centres : cette méthode permet de calculer les nouveaux centres. Elle prend en paramètre les effectifs actuels. Elle renvoie les nouveaux centres en faisant la moyenne sur les effectifs de chaque cluster.
###### Le pipeline est composé de trois étapes :
###### 1- Lecture des données : cette étape permet de lire les données à partir du fichier.
###### 2- Création d'une clé unique pour l'ensemble de données avec deux dimensions : cette étape permet de générer des données avec une seule clé (0 dans ce cas) et deux dimensions (X et Y).
###### 3- Application de kmeans : cette étape permet d'appliquer l'algorithme KMeans séquentiel distribué sur les données. Elle utilise la classe KMeansSequentielSdistribué pour mettre à jour les centres et les effectifs à chaque arrivée d'un nouveau point de données. 
###### 4- Les résultats sont convertis en tuple pour pouvoir les enregistrer dans un fichier CSV avec le schéma Point prédéfini par la classe Point.


In [22]:
from apache_beam.transforms.userstate import BagStateSpec

class KMeansSequentielSdistribué(beam.DoFn):
    
    centres = BagStateSpec(name='centres', coder=beam.coders.PickleCoder()) # l'état des centres
    effectifs = BagStateSpec(name='effectifs', coder=beam.coders.PickleCoder()) # l'état des effectifs
    
    def __init__(self, k, *unused_args, **unused_kwargs):
        super().__init__(*unused_args, **unused_kwargs)
        self.k =k
        
     
    

    def process(self, element, centres = beam.DoFn.StateParam(centres), effectifs = beam.DoFn.StateParam(effectifs) ):
        """Process un point de données en calculant le cluster le plus proche et en mettant à jour les centres et les effectifs"""

        
        centres_liste = list(centres.read()) # récupérer l'état actuel des centres (les centres actuels) 
        effectifs_liste = list(effectifs.read()) # récupérer l'état actuel des effectifs (les effectifs de cluster)
        
        key,coord = element # récupérer la clé et les coordonnées du point de données (comme on est dans le cas de clé unique, la clé est toujours 0)
        
        cluster = self.cluster_plus_proche(element, centres_liste,effectifs_liste) # trouver le cluster le plus proche 

      

        
        ### Mise à jour des effectifs
        n_element = (cluster[0] , coord) # mise à jour de l'id du cluster pour le point de données
        effectifs_liste.append(n_element) # mise à jour de la liste des effectifs
        effectifs.add(n_element) # mise à jour de l'état des effectifs
     
        ### Mise à jour des centres 
        nouveaux_centres = self.calculer_centres(effectifs_liste) # calculer les nouveaux centres
        centres.clear() # vider l'état des centres
        for centre in nouveaux_centres : # mettre à jour l'état des centres
            centres.add(centre)

        yield n_element # retourner le point de données avec son nouveau cluster assigné
   
        

    def cluster_plus_proche(self,point, centroids,effectif_liste):
        """Trouver le cluster le plus proche pour un point de données donné"""

        min_distance = float('inf')
        assigned_cluster = None
        
        if len(effectif_liste) <= self.k - 1  : # les centres sont toujours les k premiers effectifs arrivés
            return len(centroids), point[1] # len(centroids) permet de donner l'id de cluster courant (ex : si on a 3 centres, le cluster courant est 3)
        
        # Sinon, on calcule le cluster le plus proche
        for centroid in centroids:
            dist =  np.sqrt(np.sum((point[1] - centroid[1])**2))
            # Mise à jour du cluster assigné si la distance est plus petite
            if dist < min_distance:
                min_distance = dist
                assigned_cluster = centroid
        # Retourner le cluster assigné 
        return assigned_cluster
    
    def calculer_centres(self,effectifs_liste):
        """Calculer les nouveaux centres"""
       
        if len(effectifs_liste) <= self.k : # les centres sont toujours les k premiers effectifs arrivés 
            return effectifs_liste
        # Sinon, on calcule les nouveaux centres
        n_centres = []
        for i in range(self.k) : # pour chaque cluster i 
            elements = list(filter(lambda point : point[0] == i,effectifs_liste)) # récupérer les points de données qui appartiennent au cluster i
            somme = reduce(lambda x,y : x + y[1], elements,np.zeros_like(elements[0][1])) # calculer la somme des coordonnées des points de données
            count = len(elements) # calculer le nombre de points de données
            moyenne = somme / count # calculer la moyenne des coordonnées des points de données
            n_centres.append((i,moyenne)) # ajouter le nouveau centre i à la liste des nouveaux centres
        return n_centres # retourner la liste des nouveaux centres
            

pipeline = beam.Pipeline()
labels = (
        pipeline
        |"Lecture de données" >> beam.io.ReadFromCsv("donnees.csv")
        |"Création d'une clé unique pour l'ensemble de donnéés avec deux dimension" >> beam.Map(lambda x : (0,np.array([float(x.X),float(x.Y)]))) # générer des données avec une seule clé (0 dans ce cas) et deux dimensions (X et Y)
        | "Application de kmeans " >> beam.ParDo(KMeansSequentielSdistribué(k=2))
        | "Convertir les résutats en tuple" >> beam.Map(lambda x : list([int(x[0]),float(x[1][0]),float(x[1][1])])).with_output_types(Point)
        |"Ecrire les réultats dans un fichier CSV" >> beam.io.WriteToCsv("résultats_kmeans_partie_D",header=["ID_Cluster","X","Y"]) # enregistrer les résultats dans un fichier CSV pour les visualiser par la suite
)

#chaque point affiché est un tuple de la forme (id_cluster, coordonnées du point)
pipeline.run() # exécuter le pipeline

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x18d351381d0>

### 2_ Visualisation du résultat de l'algorithme Partie D
###### Le graphique ci-dessous montre les résultats obtenus avec notre solution.
###### Chaque cluster est représenté par une couleur différente. On peut remarquer que certaines points sont mal classées.


In [23]:

# Charger les données depuis le fichier CSV 
resultats_kmeans_partie_D = pd.read_csv('résultats_kmeans_partie_D-00000-of-00001')

plt.figure(figsize=(10,8))

# Scatter plot avec coloration par ID
plt.scatter(resultats_kmeans_partie_D['X'], resultats_kmeans_partie_D['Y'], c=resultats_kmeans_partie_D['ID_Cluster'], cmap='viridis')
plt.xlabel('X')
plt.ylabel('Y')
plt.title("Résultat KMeans séquentiel distribué")

plt.show()

<IPython.core.display.Javascript object>

# E. Implémentation d’une version streaming et distribuée de k-means (Apache Beam)
###### Dans cette section, nous allons implémenter l'algorithme KMeans streaming distribué en utilisant le framework Apache Beam.
### 1_ Implémentation de la classe KmeansStreamingDistributed
###### Pour ce faire, nous créons une classe KmeansStreamingDistributed qui hérite de la classe DoFn de la librairie Apache Beam.
###### Dans cette classe, nous déclarons deux états :
###### -----> 1- L'état des centres : cet état contient les centres actuels (initialisé aléatoirement dès l'arrivée du premier batch). 
###### -----> 2- L'état des batches : cet état contient l'historique des batches (initialement vide).
###### La classe KmeansStreamingDistributed est composée de deux méthodes :
###### 1- La méthode process : cette méthode permet de mettre à jour les centres et les batches à chaque arrivée d'un nouveau batch. Elle prend en paramètre le nouveau batch ainsi que les états des centres et celui des batches. Elle renvoie les nouveaux centres ainsi que les labels associés aux points du nouveau batch. Elle effectue les étapes suivantes :
###### -----> 1-1- Récupérer l'état actuel des centres et des batches
###### -----> 1-2- Si la taille maximale de l'historique est atteinte, enlever le batch le plus ancien
###### -----> 1-3- Ajouter le nouveau batch à l'historique
###### -----> 1-4- Initialiser les centres avec la partition P (cette partition est aléatoire s'il s'agit du premier batch, sinon on récupère les centres actuels directement à partir de l'état)
###### -----> 1-5- Calculer le poids de chaque batch, ensuite attribuer ce poids à chaque point de données selon son appartenance au batch (le poids est élevé pour les points de données des batches récents)
###### -----> 1-6- Utiliser l'algorithme KMeans pondéré pour calculer les nouveaux centres et les nouveaux labels
###### -----> 1-7- Mettre à jour les centres.
###### -----> 1-8- retourner les nouveaux centres ainsi que les labels associés aux points du batch.
###### 2- La méthode initialize_centroids : cette méthode permet d'initialiser les centres aléatoirement en choisissant k points de données.
###### La méthode PrintResults permet d'afficher les résultats.
###### Le pipeline est composé de trois étapes :
###### 1- Creation des PCollections contenant les batches : cette étape permet de créer des PCollections contenant les batches à l'aide de beam.Create().
###### 2- Application de kmeans : cette étape permet d'appliquer l'algorithme KMeans streaming distribué sur les données. Elle utilise la classe KmeansStreamingDistributed.
###### 3- Affichage des résultats : cette étape permet d'afficher les résultats à l'aide de la fonction PrintResults.
###### ---> Les résultats sont affichés à chaque arrivée d'un nouveau batch.
###### ---> à chaque affichage, on montre les nouveaux centres ainsi que les labels associés aux points du nouveau batch.



In [24]:
from functools import reduce
from apache_beam.transforms.userstate import BagStateSpec

class KmeansStreamingDistributed(beam.DoFn):

    centres = BagStateSpec(name='centres', coder=beam.coders.PickleCoder()) # l'état des centres
    
    historique_batches = BagStateSpec(name='batches', coder=beam.coders.PickleCoder()) # l'état des batches

    def __init__(self, k,T,r, *unused_args, **unused_kwargs):
        super().__init__(*unused_args, **unused_kwargs)
        self.k = k # Nombre de clusters
        self.T = T # Taille de l'historique
        self.r = r # Paramètre de pondération



    def process(self, element, centres = beam.DoFn.StateParam(centres), historique_batches = beam.DoFn.StateParam(historique_batches)):

        """Process d'un batch de données et mise à jour des centres et les batches"""

        centres_liste = list(centres.read()) # récupérer l'état actuel des centres (les centres actuels) 
        batches_liste = list(historique_batches.read()) # récupérer l'état actuel des batches (les batches actuels)
        key,batch = element # récupérer le batch actuel
        
        if len(batches_liste) > 0:
            X_all = np.concatenate(batches_liste,axis=0)
        else :
            X_all = batches_liste
        # Si la taille de l'historique est T, enlever le plus vieux batch
        if len(batches_liste) == self.T:
            print("\nLa taille maximale de l'historique est atteinte, le batch le plus ancien sera supprimé")
            batches_liste.pop(0)
            historique_batches.clear() # vider l'état des batches
            for batch in batches_liste : # mettre à jour l'état des batches
                historique_batches.add(batch)
            

        # Initialiser les centroids C avec la partition P  
        if len(centres_liste) == 0 : # Si c'est le premier batch, on initialise les centres aléatoirement avec le premier batch
            initial_centroids = self.initialize_centroids(batch)
        else :
            initial_centroids = centres_liste # Sinon, on initialise les centres avec les centres actuels

        # Ajouter le nouveau batch à l'historique
        batches_liste.append(batch)
        historique_batches.add(batch)


        # Calcule des coefficients de pondération pour chaque point de données selon son appartenace à un batch
        X_all = np.concatenate(batches_liste,axis=0) # On inclus le nouveau batch dans le calcule des nouveaux centres
        sample_weights = [self.r**t for t in range(len(batches_liste))] # Les poids associés à chaque batch 
        all_points_weight = np.repeat(sample_weights, [tableau.shape[0] for tableau in batches_liste]) # Les poids associés à chaque point de données sela son appartenance à quel batch 
        # Appliquer l'algorithme kmeans avec les centres initiaux et les poids associés à chaque point de données
        kmeans = KMeans(n_clusters=self.k, random_state=0,init=initial_centroids,n_init=1)
        labels = kmeans.fit_predict(X_all,sample_weight=all_points_weight)
        
        centroids = kmeans.cluster_centers_ # récupérer les nouveaux centres 
        centres.clear() # vider l'état des centres
        for centre in centroids : # mettre à jour l'état des centres
            centres.add(centre)
        yield  centroids, labels[- batch.shape[0] :] # retourner le point de données avec son nouveau cluster assigné

    def initialize_centroids(self,batch):
        """Initialiser les centres"""
        centres_initiaux =  batch[np.random.choice(range(len(batch)), self.k, replace=False)] # On prend deux centres aléatoirement 
        return centres_initiaux

def PrintResults(element):
    print("Arrivée d'un nouveau batch -------------------------------------- ") 
    print("Les nouveaux centres sont : ")
    for i,centre in enumerate(element[0]) :
        print(f"Le centre du cluster {i} : {centre[0]:.2f} , {centre[1]:.2f}")
    print("Les labels associés aux points du nouveau batch sont : \n",element[1])
    


# Charger les données à partir d'un fichier CSV
data = pd.read_csv("donnees.csv").values
np.random.shuffle(data)



# Diviser les données en 4 batches
batches = np.array_split(data, 4)

pipeline = beam.Pipeline()

labels = (
        pipeline
        | beam.Create(batches)
        | beam.Map(lambda x : (0,x)) 
        | "Application de kmeans " >> beam.ParDo(KmeansStreamingDistributed(k=2,T=2,r=0.9))
        | "Affichage de résultats" >> beam.Map(PrintResults)
       
    
)

#chaque point affiché est un tuple de la forme (id_cluster, coordonnées du point)
pipeline.run() # exécuter le pipeline

Arrivée d'un nouveau batch -------------------------------------- 
Les nouveaux centres sont : 
Le centre du cluster 0 : 4.95 , 5.04
Le centre du cluster 1 : 10.21 , 10.10
Les labels associés aux points du nouveau batch sont : 
 [1 1 1 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 1 0 1 1 1 1 0 0 0 0 0 1 0 1 1 1 1 0 0
 0 0 0 1 1 0 1 0 0 1 1 0 0]
Arrivée d'un nouveau batch -------------------------------------- 
Les nouveaux centres sont : 
Le centre du cluster 0 : 4.93 , 5.01
Le centre du cluster 1 : 10.25 , 10.03
Les labels associés aux points du nouveau batch sont : 
 [1 0 0 1 1 0 1 1 0 1 0 0 1 0 0 0 1 0 0 0 0 1 1 1 1 1 1 1 1 0 1 1 1 0 1 0 0
 0 1 0 1 0 0 1 1 1 0 0 0 0]

La taille maximale de l'historique est atteinte, le batch le plus ancien sera supprimé
Arrivée d'un nouveau batch -------------------------------------- 
Les nouveaux centres sont : 
Le centre du cluster 0 : 4.92 , 4.98
Le centre du cluster 1 : 10.28 , 9.96
Les labels associés aux points du nouveau batch sont : 
 [1 0 0 1 1 0 1 1 0 1

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x18d360b2e90>