L'objectif de ce mini-projet est de simuler l'execution du K-means en mode distribue pour l'adapter aux contraintes du Big Data, en particulier la décentralisation des données.
Il est ainsi interdit de rapatrier toutes les données en memoire (chose impossible en Big Data). 

In [1]:
# import des packages pour le tp
from sklearn.neighbors import NearestNeighbors
import numpy as np
import scipy.stats
import copy

In [2]:
# !pip install glob2
# !pip install cassandra-driver==3.24.0
# !pip install tqdm

# K-means simple :

C'est ce kmeans qui sera adapté par la suite.

In [3]:
def kmeans_(feats, num_clus):
    # Initialiser le nombre maximal d'itérations et le changement maximal de centroïdes
    max_iter = 10000
    # Initialiser un objet NearestNeighbors pour trouver les plus proches voisins en utilisant l'algorithme 'ball_tree'
    nbrs = NearestNeighbors(n_neighbors=1, algorithm='ball_tree')
    # Initialiser les variables pour suivre l'itération courante et le changement de centroïdes
    iter_ = 0
    # Sélectionner aléatoirement les centroïdes initiaux à partir des points de données en entrée (feats)
    centroides = feats[np.random.choice(feats.shape[0], num_clus, replace=False), :]
    # Répéter jusqu'à ce que le nombre maximal d'itérations soit atteint
    while(iter_ < max_iter):
        # Mettre à jour l'objet NearestNeighbors avec les centroïdes courants
        nbrs.fit(centroides)        
        # Incrémenter le compteur d'itérations
        iter_ += 1        
        # Trouver les plus proches voisins de chaque centroïde et calculer les distances
        distances, indices = nbrs.kneighbors(feats)
        # Créer une copie des centroïdes actuels pour effectuer une comparaison ultérieure
        centroides_prev = copy.deepcopy(centroides)
        # Mettre à jour la position de chaque centroïde en calculant la moyenne de ses plus proches voisins
        for i in range(centroides.shape[0]):
            centroides[i, :] = feats[np.where(indices[:, 0] == i)[0], :].mean(axis=0)

    # Retourner les indices des plus proches voisins de chaque centroïde et la distance moyenne à ces voisins
    return indices[:, 0], distances.mean()



On commence par remplir 3 table sur Cassandra avec 100000 element chacune
Chaque table contient 4 colonnes

Les tables suivent la structure suivante :
ID_pref (Partition key),Id_company, age, weight, shoulder circumference

Id_company est unique pour chaque ligne
La colonne ID_pref est assigné une des trois valeurs (0,1,2). 40% des lignes ont une valeure de 0. Les valeurs de 1 et 2 representent 30% chacune. 

Pour la première table
A la colonne age, est assignée une valeur suivant une loi normale entre 18 et 99 avec une moyenne=30 et un écart type = 10  (on utilise scipy.stats).
La colonne weight et shoulder circumference sont aleatoires (on utilise numpy.random). 
50<weight<99 et 30<shoulder circumference <60

Pour la seconde table 
A la colonne age, est assignée une valeur suivant une loi normale entre 18 et 99 avec moyenne=30 et écart type = 15  (utilisez scipy.stats).
La colonne weight est assigné une valeur suivant une loi normale entre 50 et 99 avec moyenne=70 et écart type = 10  (utilisez scipy.stats).
30<shoulder circumference (aleatoire) <60

la troisième table utilise le même modèle que la tablea précedente pour genèrer la colonne age
Le poids et shoulder circumference dependent de l'age :
Poids = 50 + sqrt(age)
shoulder circumference = 10 + 2*log(age)
ID_pref = les identifiants des personnes dépendent de leur age comme suit: 
- 0 au premiers 40% (les plus jeunes)
- 1 au second 30% 
- 2 au dernier 30% (les plus âgées) 


NB
On commence par faire l'exercice sur un dictionnaire (cassandra_dict={}, avec cassandra_dict[table1],cassandra_dict[table2],cassandra_dict[table3] representant les tables). On modifie ensuite le code pour s'executer sur cassandra.

## Travail préliminaire : création d'un cluster

 Afin de travailler avec cassandra sur ma machine, j'ai crée un cluster à 3 noeuds avec docker-compose (voir fichier docker-compose.yaml). J'utilise ce uster dans la suite du projet.

In [4]:
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider


cluster_addresses = ['172.31.160.1']
auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')
cluster=Cluster(cluster_addresses, auth_provider=auth_provider,  connect_timeout=10)#127.0.0.1
session=cluster.connect()
keyspace = "ai"
timeout = 20
replication = "{'class': 'SimpleStrategy', 'replication_factor': 1}"
session.execute(f"CREATE KEYSPACE IF NOT EXISTS {keyspace} WITH replication = {replication}", timeout=timeout)
session.set_keyspace(keyspace)
#session.execute("drop table python_test")
session.execute("create table if not exists python_test(id uuid primary key, name text, lastname text)", timeout=timeout)
for i in range(10):
    session.execute("insert into python_test(id,name,lastname) values(uuid(), 'prenom_"+str(i)+"','nom') if not exists")
res=session.execute("select * from python_test")

In [5]:
# On ferme la connexion au cluster
cluster.shutdown()

### Création des dictionnaires pour l'exercice 2

In [6]:
# Initialiser un dictionnaire vide pour stocker les tables
cassandra_dict = {}

# Initialiser les tables avec des listes vides
cassandra_dict["table1"] = []
cassandra_dict["table2"] = []
cassandra_dict["table3"] = []


### Définition de fonction qui servira aussi dans la suite du projet

La fonction `genere_nombre` utilise une distribution normale tronquée pour générer des nombres aléatoires dans un intervalle donné. 


In [7]:
# Fonction pour générer un nombre aléatoire basé sur une distribution normale tronquée, dans un intervalle donné
def genere_nombre(mu, sigma, min_intervale, max_intervale, nombre_donnees=1):
    # Crée une distribution normale tronquée avec les paramètres mu (moyenne), sigma (écart-type), 
    # et les limites de l'intervalle (min_intervale et max_intervale)
    norm_dist = scipy.stats.truncnorm((min_intervale - mu) / sigma, (max_intervale - mu) / sigma, loc=mu, scale=sigma)
    
    # Génère un échantillon aléatoire à partir de la distribution normale tronquée créée précédemment
    return norm_dist.rvs(size=nombre_donnees).reshape(nombre_donnees, 1)


### Génération des données
Je génére les données en m'assurant que l'id_company est unique sur les 3 tables.

#### Données de la table1

In [8]:
# On définit une variable 'N' égale à 100 000
N = 100_000

# On génère un vecteur des 'id_pref' de taille N en choisissant aléatoirement 
# parmi 0, 1 et 2 selon les probabilités 0.4, 0.3 et 0.3
id_pref = np.random.choice([0, 1, 2], p=[0.4, 0.3, 0.3], size=N).reshape(N,1).astype('int')

# On crée un vecteur des 'id_company_1' avec des valeurs allant de 1 à N inclus
id_company_1 = np.array(range(1,N+1)).reshape(N,1).astype('int')

# On génère un vecteur des 'ages_t1' de taille N avec des valeurs entières générées par la fonction 'genere_nombre'
ages_t1 = genere_nombre(30,10,18,99,N).round(0).astype('int')

# On génère un vecteur des 'weights_t1' de taille N avec des valeurs uniformément réparties entre 50 et 99, 
# arrondies à 2 décimales
weights_t1 = np.random.uniform(50, 99, size=N).round(2).reshape(N, 1)

# On génère un vecteur des 'shoulder_circumference_t1' de taille N avec des valeurs uniformément réparties 
# entre 30 et 60 
shoulder_circumference_t1 = np.random.uniform(30, 60, size=N).round(2).reshape(N,1)


Je convertis les données générées en liste pour l'implémentation dans cassandra et les ajoute au disctionnaire `cassandra_dict`.

In [9]:
cassandra_dict['table1'] = [x.tolist() for x in (id_pref,id_company_1,ages_t1,weights_t1,shoulder_circumference_t1)]
cassandra_dict['table1'] = [[x[i][0]for x in cassandra_dict['table1']] for i in range(len(cassandra_dict['table1'][0])) ]
len(cassandra_dict['table1'])

100000

Je m'assure que les données sont correctes.

In [10]:
cassandra_dict['table1'][:10]

[[2, 1, 38, 56.66, 43.31],
 [1, 2, 53, 53.32, 30.7],
 [0, 3, 20, 68.95, 47.78],
 [2, 4, 18, 69.29, 46.69],
 [2, 5, 32, 61.82, 33.26],
 [2, 6, 29, 53.73, 56.74],
 [0, 7, 18, 71.09, 46.04],
 [1, 8, 34, 89.42, 55.69],
 [2, 9, 48, 66.28, 56.97],
 [2, 10, 28, 59.65, 51.4]]

#### Données de la table2

In [11]:
# On définit une variable 'N' égale à 100 000
N = 100_000

# On génère un vecteur des 'id_pref' de taille N en choisissant aléatoirement 
# parmi 0, 1 et 2 selon les probabilités 0.4, 0.3 et 0.3
id_pref = np.random.choice([0, 1, 2], p=[0.4, 0.3, 0.3], size=N).reshape(N,1).astype('int')

# On crée un vecteur des 'id_company_2' avec des valeurs allant de N+1 à 2*N inclus
id_company_2 = np.array(range(N+1,2*N+1)).reshape(N,1).astype('int')

# On génère un vecteur des 'ages_t2' de taille N avec des valeurs entières générées par la fonction 'genere_nombre'
ages_t2 = genere_nombre(30, 15, 18, 99,N).round(0).astype('int')

# On génère un vecteur des 'weights_t2' de taille N avec des valeurs générées par la fonction 'genere_nombre',
# arrondies à 2 décimales
weights_t2 = genere_nombre(70, 10, 50, 99,N).round(2)

# On génère un vecteur des 'shoulder_circumference_t2' de taille N avec des valeurs uniformément réparties 
# entre 30 et 60, arrondies à 2 décimales
shoulder_circumference_t2 = np.random.uniform(30, 60, size=N).round(2).reshape(N,1)


In [12]:
cassandra_dict['table2'] =  [x.tolist() for x in (id_pref,id_company_2,ages_t2,weights_t2,shoulder_circumference_t2)]
cassandra_dict['table2'] = [[x[i][0]for x in cassandra_dict['table2']] for i in range(len(cassandra_dict['table2'][0])) ]
len(cassandra_dict['table2'])


100000

Je m'assure que les données sont correctes.

In [13]:
cassandra_dict['table2'][:10]

[[1, 100001, 25, 78.29, 48.96],
 [1, 100002, 31, 73.82, 33.47],
 [2, 100003, 19, 69.19, 55.95],
 [2, 100004, 32, 65.65, 54.85],
 [1, 100005, 46, 67.75, 58.91],
 [2, 100006, 45, 61.68, 43.41],
 [2, 100007, 21, 68.69, 35.75],
 [0, 100008, 46, 67.74, 51.28],
 [2, 100009, 47, 68.65, 47.24],
 [1, 100010, 25, 78.8, 45.13]]

#### Données de la table3
  Les "id_pref" de la table3 sont créés avec une attribution de 0 au 40% les plus jeunes, 1 au 30% suivants et 2 au 30% restants.

In [14]:
# On définit une variable 'N' égale à 100 000
N = 100_000

# On calcule le nombre d'individus pour chaque groupe
groupe_jeune = int(0.4 * N)
autre_goupe = int(0.3 * N)

# On génère un vecteur des 'id_pref' en créant des tableaux de 0, 1 et 2 avec les tailles calculées précédemment, 
# puis on les concatène et on redimensionne en une matrice N x 1
id_pref = np.hstack((np.full(groupe_jeune, 0), np.full(autre_goupe, 1), np.full(autre_goupe, 2))).reshape(N,1).astype('int')

# On crée un vecteur des 'id_company_3' avec des valeurs allant de N+1 à 2*N inclus
id_company_3 = np.array(range(N+1,2*N+1)).reshape(N,1).astype('int')

# On génère un vecteur des 'ages_t3' de taille N avec des valeurs entières générées par la fonction 'genere_nombre',
# puis on les trie par ordre croissant
ages_t3 = np.array(sorted(genere_nombre(30,10,18,99,N).round(0).astype('int')))

# On génère un vecteur des 'weights_t3' en ajoutant 50 à la racine carrée des âges
weights_t3 = 50 + np.sqrt(ages_t3).round(2)

# On génère un vecteur des 'shoulder_circumference_t3' en ajoutant 10 au double du logarithme naturel des âges
shoulder_circumference_t3 = (10 + 2 * np.log(ages_t3)).round(2)


In [15]:
cassandra_dict['table3'] = [x.tolist() for x in (id_pref,id_company_3,ages_t3,weights_t3,shoulder_circumference_t3)]
cassandra_dict['table3'] = [[x[i][0]for x in cassandra_dict['table3']] for i in range(len(cassandra_dict['table3'][0])) ]
len(cassandra_dict['table3'])

100000

Je m'assure que les données sont correctes.

In [16]:
cassandra_dict['table3'][:10]

[[0, 100001, 18, 54.24, 15.78],
 [0, 100002, 18, 54.24, 15.78],
 [0, 100003, 18, 54.24, 15.78],
 [0, 100004, 18, 54.24, 15.78],
 [0, 100005, 18, 54.24, 15.78],
 [0, 100006, 18, 54.24, 15.78],
 [0, 100007, 18, 54.24, 15.78],
 [0, 100008, 18, 54.24, 15.78],
 [0, 100009, 18, 54.24, 15.78],
 [0, 100010, 18, 54.24, 15.78]]

In [17]:
# On s'assure que toutes les données ont été générées en vérifiant la taille des tables dans le dictionnaire.
print([len(l) for l in cassandra_dict.values()])

[100000, 100000, 100000]


Aprés connexion au cluster, je créé le keyspace. Une fois connecté à ce dernier, je créé les tables

In [18]:
# Code adapté pour cassandra 
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

# On se connecte au cluster Cassandra
cluster_addresses = ['172.31.160.1']
auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')
cluster=Cluster(cluster_addresses, auth_provider=auth_provider,  connect_timeout=10)
session = cluster.connect()


In [19]:
# Un temps de réponse de cassandra parfois long m'oblige à définir un timeout de 30 secondes
timeout = 30
# Création d'un keyspace avec une seule réplication 
session.execute("CREATE KEYSPACE IF NOT EXISTS tp_cassandra WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}", timeout=timeout)

# Connection au keyspace créé
session.set_keyspace('tp_cassandra')

# Création des tables
session.execute("CREATE TABLE IF NOT EXISTS table1 (id_pref int, id_company int, age int, weight float, shoulder_circumference float, PRIMARY KEY (id_company))", timeout=timeout)
session.execute("CREATE TABLE IF NOT EXISTS table2 (id_pref int, id_company int, age int, weight float, shoulder_circumference float, PRIMARY KEY (id_company))", timeout=timeout)
session.execute("CREATE TABLE IF NOT EXISTS table3 (id_pref int, id_company int, age int, weight float, shoulder_circumference float, PRIMARY KEY (id_company))", timeout=timeout)


<cassandra.cluster.ResultSet at 0x7fb62c4c9d50>

Afin de charger rapidement les données dans cassandra, j'ai créé une fonction qui le fait en batch. Je suis limité à une taille de batch de 100 sur ma machine mais cela est très rapide.

In [20]:
from cassandra.query import BatchStatement
from tqdm import tqdm

def execute_batch(rows, table_name, session, batch_size=100):
    for i in tqdm(range(0, len(rows), batch_size), desc=f"Insertion de données dans la {table_name}"):
        batch = BatchStatement()
        for row in [rows[i:(i + batch_size)][0]]:
            query = f"INSERT INTO {table_name} (id_pref, id_company, age, weight, shoulder_circumference) VALUES (%s, %s, %s, %s, %s)"
            batch.add(query, row)
        session.execute(batch)
        

In [21]:
# Ajout des lignes à chaque table avec la fonction créée
execute_batch(cassandra_dict["table1"], "table1", session)
execute_batch(cassandra_dict["table2"], "table2", session)
execute_batch(cassandra_dict["table3"], "table3", session)


Insertion de données dans la table1: 100%|██████████| 1000/1000 [00:01<00:00, 514.49it/s]
Insertion de données dans la table2: 100%|██████████| 1000/1000 [00:01<00:00, 542.19it/s]
Insertion de données dans la table3: 100%|██████████| 1000/1000 [00:01<00:00, 534.99it/s]


Pour s'assuré que les données sont bien chargées dans cassandra on contrôle la taille des tables est les première lignes.

In [22]:
print([j[0] for j in [session.execute(f"SELECT count(*) FROM tp_cassandra.table{i}") for i in range(1,4)]])
for i in range(1, 4):
    print({f"Table{i}": session.execute(f"SELECT * FROM tp_cassandra.table{i} limit 5")[:]})

  print([j[0] for j in [session.execute(f"SELECT count(*) FROM tp_cassandra.table{i}") for i in range(1,4)]])


[Row(count=1000), Row(count=1000), Row(count=1000)]
{'Table1': [Row(id_company=71801, age=25, id_pref=2, shoulder_circumference=37.58000183105469, weight=73.30000305175781), Row(id_company=5801, age=42, id_pref=1, shoulder_circumference=57.150001525878906, weight=91.6500015258789), Row(id_company=54001, age=27, id_pref=0, shoulder_circumference=39.41999816894531, weight=56.15999984741211), Row(id_company=4701, age=32, id_pref=0, shoulder_circumference=53.33000183105469, weight=68.95999908447266), Row(id_company=58201, age=19, id_pref=0, shoulder_circumference=51.150001525878906, weight=67.54000091552734)]}
{'Table2': [Row(id_company=140501, age=38, id_pref=0, shoulder_circumference=57.900001525878906, weight=61.83000183105469), Row(id_company=111501, age=37, id_pref=1, shoulder_circumference=42.9900016784668, weight=57.349998474121094), Row(id_company=196901, age=36, id_pref=0, shoulder_circumference=36.349998474121094, weight=75.20999908447266), Row(id_company=196601, age=32, id_pref=

In [23]:

# On ferme  la connexion au cluster
cluster.shutdown()

On va modifie le K-means presente plus pour qu'il partitionne les trois tables créées. 
On considère que les données avec ID_pref=0 sont sur le serveur 0, ID_pref=1 sur le serveur 1 et ID_pref=2 sur le serveur 2.
L'algorithme ne peut pas agreger les donnees de differents serveurs. 
Il faut ecrire une fonction qui recois les moyennes des centroides durant une itération
Calcul de nouvelles moyennes par rapport aux données dans un serveur

A partir de ces moyennes par serveur, on va generer une moyenne globale.

NB chaque serveur contribue a la moyenne globale en fonction des nombre d'elements dans ce dernier

## Kmeans distribué

Dans un premier temps je créé un nouvel keyspace qui va contenir les nouvelles tables.
Ces tables seront partitionnées suivant l'id_pref (clé de particitionnement).

In [24]:
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

# Définition du délai d'attente de connexion en secondes
connect_timeout_in_seconds = 10
# Utilisation de l'adresse IP du cluster et les ports spécifiés pour chaque instance Docker Cassandra
cluster_addresses = ['172.31.160.1']

# Connexion au cluster avec l'authentification 
auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')
cluster = Cluster(cluster_addresses, auth_provider=auth_provider, connect_timeout=connect_timeout_in_seconds)
session = cluster.connect()

In [25]:
# Création du keyspace avec la stratégie de réplication SimpleStrategy
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS exe_3
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}
""", timeout=20)

# On utilise le keyspace créé
session.set_keyspace('exe_3')

# On Créé les tables avec id_pref comme clé de partition
table_creation_query = """
    CREATE TABLE IF NOT EXISTS {} (
        id_pref int,
        id_company int,
        age int,
        weight float,
        shoulder_circumference float,
        PRIMARY KEY (id_pref, id_company)
    )
"""

# Création des trois nouvelles tables 
session.execute(table_creation_query.format("data1"), timeout=60)
session.execute(table_creation_query.format("data2"), timeout=60)
session.execute(table_creation_query.format("data3"), timeout=60)



<cassandra.cluster.ResultSet at 0x7fb641a098d0>

Je créé une nouvelle fonction de chargement de données 

In [26]:

def insert_data(table_cas, table_dict):    
    for row in tqdm(cassandra_dict[table_dict], desc=f"Insertion de données dans la {table_cas}"):
        query = f"INSERT INTO {table_cas} (id_pref, id_company, age, weight, shoulder_circumference) VALUES (%s, %s, %s, %s, %s)"
        session.execute(query, row, timeout=120)


Ensuite, on charge les données des dictionnaires `cassandra_dict` dans ces tables.
Les données sont réparties sur les 3 serveurs suivant leur id_pref pour tester l'algorithme de Kmeans distribué.

In [27]:
insert_data('data1','table1')
insert_data('data2','table2')
insert_data('data3','table3')

Insertion de données dans la data1: 100%|██████████| 100000/100000 [03:35<00:00, 464.80it/s]
Insertion de données dans la data2: 100%|██████████| 100000/100000 [03:40<00:00, 453.03it/s]
Insertion de données dans la data3: 100%|██████████| 100000/100000 [03:49<00:00, 435.12it/s]


Pour savoir à quoi m'attendre comme moyenne globale des centroïdes je la calcule avec le kmeans fourni plus haut. 

Dans un premier temps je convertis les colonnes des caractéristiques en numpy array pour pouvoir les utiliser avec la fonction `kmeans_`.

In [28]:
# Conversion des features en np.array
t1 = np.array([x[2:] for x in cassandra_dict['table1']])
t2 = np.array([x[2:] for x in cassandra_dict['table2']])
t3 = np.array([x[2:] for x in cassandra_dict['table3']])


Ensuite, je calcule la moyenne des centroïdes pour chaque table et je récurère les indices de tous les points.
Enfin, je calcule la moyenne globale des centroïdes.

In [39]:
# On récupère les indices, qui vont servir à connaitre le nombre d'élèment dans chaque table, 
# ainsi que la moyenne des centroïde pour chaque table
num_clusters = 7
ind1, kmns1 = kmeans_(t1, num_clusters)
ind2, kmns2 = kmeans_(t2, num_clusters)
ind3, kmns3 = kmeans_(t3, num_clusters)

In [40]:

# On calcule la moyenne globale 
kmeans_moyenne_globale = (kmns1*len(ind1) + kmns2*len(ind2) + kmns3*len(ind3))/(len(ind1) + len(ind2) + len(ind3))

In [41]:
# On affiche la moyenne des centroïdes calculée
print(f"Moyenne des centroïdes pour {num_clusters} clusters:")
print(kmeans_moyenne_globale)

Moyenne des centroïdes pour 7 clusters:
6.81622184287636


Cette moyenne globale me permettra de contrôler le que mon kmeans distribué me retourne une valeur cohérente. 

Pour l'adaption de l'algorithme de kmeans plus haut, je créé quelques fonctions utiles pour la suite.
La fonction `get_table_names_with_columns` récupère les noms des tables contenant les colonnes spécifiées. La fonction `get_server_data` récupère les données d'un serveur selon l'identifiant du serveur, le nom du keyspace et le nom de la table.

In [42]:
# !pip install matplotlib

In [31]:
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from sklearn.neighbors import NearestNeighbors
from cassandra.query import SimpleStatement

def get_table_names_with_columns(keyspace, column_names):
    # On interroge la base de données pour obtenir les noms de tables et les noms de colonnes
    query = f"SELECT table_name, column_name FROM system_schema.columns WHERE keyspace_name = '{keyspace}';"
    statement = SimpleStatement(query, fetch_size=100)
    rows = session.execute(statement)

    # On stocke les noms de tables qui ont les colonnes spécifiées
    tables_with_columns = {}
    for row in rows:
        if row.column_name in column_names:
            if row.table_name not in tables_with_columns:
                tables_with_columns[row.table_name] = set()
            tables_with_columns[row.table_name].add(row.column_name)

    # On conserve uniquement les noms de tables qui ont toutes les colonnes spécifiées
    table_names = [table_name for table_name, columns in tables_with_columns.items() if columns == set(column_names)]

    return table_names

def get_server_data(server_id, keyspace, table_name):
    # On interroge la base de données pour obtenir les données du serveur
    query = f"SELECT age, weight, shoulder_circumference FROM {keyspace}.{table_name} WHERE id_pref = {server_id}"
    statement = SimpleStatement(query, fetch_size=100)
    rows = session.execute(statement)

    # On convertit les résultats en une liste d'arrays numpy
    server_data = [np.array([row.age, row.weight, row.shoulder_circumference]) for row in rows]

    if len(server_data) > 0:
        return np.vstack(server_data)
    else:
        return np.empty((0, 3))  # On retourne un tableau vide avec 3 colonnes (age, weight, shoulder_circumference)


On s'assure que les fonctions sont correctes

In [32]:
column_names = {'id_pref', 'id_company', 'age', 'weight', 'shoulder_circumference'}
table_name = get_table_names_with_columns(keyspace='exe_3', column_names=column_names)
df = get_server_data(1, 'exe_3', table_name[0])
df[np.random.choice(df.shape[0],5,replace=False),:]

array([[62.        , 68.26999664, 35.18999863],
       [22.        , 72.83000183, 31.81999969],
       [48.        , 98.47000122, 33.75      ],
       [23.        , 82.47000122, 39.36000061],
       [31.        , 52.09999847, 36.08000183]])

Enfin, on créé la fonction de kmeans distribué en s'appuyant sur la fonction kmeans existante.
La fonction kmeans_ effectue l'algorithme K-means pour la mise en cluster des données. La fonction kmeans_distributed effectue le K-means de manière distribuée, en récupérant les données de plusieurs serveurs et en calculant les centroïdes finaux.

In [33]:
import copy

def kmeans_(feats, num_clus):
    # On vérifie si les données sont vides
    if feats.shape[0] == 0:
        return np.array([]), np.nan  # On retourne un tableau vide et une valeur NaN pour les distances moyennes
        
    max_iter = 10000
    max_delta = 0.001
    nbrs = NearestNeighbors(n_neighbors=1, algorithm='ball_tree')
    iter_ = 0
    delta_ = 0
    centroides = feats[np.random.choice(feats.shape[0], num_clus, replace=False), :]
    
    while(iter_ < max_iter):
        iter_ += 1
        if not np.any(np.isnan(centroides)):
            nbrs.fit(centroides)            
            distances, indices = nbrs.kneighbors(feats)
            centroides_prev = copy.deepcopy(centroides)
        
            for i in range(centroides.shape[0]):
                centroides[i, :] = feats[np.where(indices[:, 0] == i)[0], :].mean(axis=0)
            # On vérifie la convergence en comparant la différence entre les anciens et les nouveaux centroïdes.
            delta_ = np.linalg.norm(centroides - centroides_prev)
            if delta_ < max_delta:
                break
    return indices[:, 0], distances.mean()

def kmeans_distributed(keyspace, column_names, num_clusters, num_servers):
    np.seterr(divide='ignore', invalid='ignore')
    table_names = get_table_names_with_columns(keyspace=keyspace, column_names=column_names)
    server_id = range(num_servers)
    centroids = []
    poids = []
    for i in tqdm(server_id, desc=f"recupération données serveur"):
        for table in table_names:
            data = get_server_data(i, keyspace, table)
            
            # On calcule la moyenne de chaque colonne en excluant les valeurs NaN
            column_means = np.nanmean(data, axis=0)
            
            # On trouve les indices des valeurs NaN
            inds = np.where(np.isnan(data))
            
            # On remplit les valeurs NaN avec les moyennes correspondantes de la colonne
            data[inds] = np.take(column_means, inds[1])
            
            if data.size > 0:
                indices, distances_moyenne = kmeans_(data, num_clusters)
                if len(indices) > 0 and not np.isnan(distances_moyenne):
                    centroids.append(distances_moyenne)
                    poids.append(len(indices))

    moyenne_golobale = np.array(centroids).dot(np.array(poids)) / np.array(poids).sum()
    
    return moyenne_golobale


In [34]:
import warnings
warnings.filterwarnings("ignore", category=RuntimeWarning)

# On récupére les noms des tables qui ont les colonnes spécifiées

column_names = {'id_pref', 'id_company', 'age', 'weight', 'shoulder_circumference'}

# On exécute le K-means distribué sur les données de tous les serveurs
num_clusters = 7
kmeans_distribute_moyenne_globale = kmeans_distributed(keyspace='exe_3', column_names=column_names, num_clusters=num_clusters, num_servers=3)


recupération données serveur: 100%|██████████| 3/3 [00:14<00:00,  4.72s/it]


In [43]:

#  On affiche la moyenne des centroïdes calculée
print(f"Kmeans distribué : Moyenne des centroïdes pour {num_clusters} clusters:")
print(kmeans_distribute_moyenne_globale)

Kmeans distribué : Moyenne des centroïdes pour 7 clusters:
7.25167867405137


In [44]:
difference_entre_deux_kmeans = kmeans_distribute_moyenne_globale/kmeans_moyenne_globale - 1
print(f"L'écart entre les deux kmeans est de {round(difference_entre_deux_kmeans, 4)*100} pourcent.")

L'écart entre les deux kmeans est de 6.39 pourcent.


On constate que les resultats des deux algorithmes (kmeans et kmeans distribués) sont très proches pour un nombre de clusters identiques.