In [8]:
import pandas as pd
import numpy as np
import re
import json
from tqdm import tqdm


from pymilvus import CollectionSchema, FieldSchema, DataType, MilvusClient

In [36]:
# Initialize Milvus client
client = MilvusClient(uri="http://localhost:19530")

In [71]:
def benchmark_index_types(df_articles, partitioned_entities, index_types, embeddings_dim, metric_type="COSINE"):
    
    # Dictionary to store the created collections
    collections = {}

    # Iterate over different index types
    for index_type in index_types:
        collection_name = f"articles_collection_{index_type}"

        # Check if the collection already exists and drop it if necessary
        if client.has_collection(collection_name):
            print(f"Collection {collection_name} already exists. Dropping the collection... \n\n")
            client.drop_collection(collection_name)
        
        # Define the fields of the collection
        id_field = FieldSchema(name="id", dtype=DataType.INT64, is_primary=True)
        text_field = FieldSchema(name="article", dtype=DataType.VARCHAR, max_length=65535)
        reference_field = FieldSchema(name="reference", dtype=DataType.VARCHAR, max_length=1000)
        embedding_field = FieldSchema(name="embedding_articles", dtype=DataType.FLOAT16_VECTOR, dim=embeddings_dim)

        # Define the schema
        schema = CollectionSchema(fields=[id_field, text_field, reference_field, embedding_field], description=f"Collection for {index_type} benchmark")
        
        # Create the collection
        client.create_collection(collection_name=collection_name, schema=schema)
        
        # Create partitions based on unique codes in df_articles
        codes = df_articles['normalized_code'].unique()
        for code in codes:
            client.create_partition(collection_name=collection_name, partition_name=code)
        
        # Insert entities into the partitions before creating the index
        for partition, entities in partitioned_entities.items():
            print(f"Inserting entities into partition: {partition}")
            try:
                client.insert(data=entities, collection_name=collection_name, partition_name=partition)
            except Exception as e:
                print(f"Error during insertion for partition {partition}: {e}")
        
        
        # Set the indexing parameters after insertion
        index_params = MilvusClient.prepare_index_params()

        # Configure index-specific parameters
        if index_type == "IVF_FLAT":
            # IVF-based indexes require `nlist` to define the number of clusters
            index_params.add_index(
                field_name="embedding_articles",
                metric_type=metric_type,
                index_type=index_type,
                index_name="vector_index",
                params={"nlist": 128, "nprobe": 100}  # You can adjust nlist based on your use case
            )
        elif index_type == "HNSW":
            # HNSW-specific parameters
            index_params.add_index(
                field_name="embedding_articles",
                metric_type=metric_type,
                index_type=index_type,
                index_name="vector_index",
                params={"M": 16, "efConstruction": 200}  # Example HNSW params, adjust if needed
            )
        elif index_type == "IVF_PQ":
            # IVF_PQ-specific parameters
            index_params.add_index(
                field_name="embedding_articles",
                metric_type=metric_type,
                index_type=index_type,
                index_name="vector_index",
                params={"nlist": 128, "m": 8}  #
            )
        else:
            # Default index parameters for FLAT and others
            index_params.add_index(
                field_name="embedding_articles",
                metric_type=metric_type,
                index_type=index_type,
                index_name="vector_index",
                params={}  # No additional params for FLAT
            )
        
        # Create the index on the collection after inserting the entities
        try:
            client.create_index(
                collection_name=collection_name,
                index_params=index_params,
                sync=True  # Wait for index creation to complete
            )
        except Exception as e:
            print(f"Failed to create an index on collection: {collection_name}")
            print(e)
            continue  # Skip this index and continue with the next one

        # Store the created collection in the dictionary
        collections[index_type] = collection_name

    print("\n\nBenchmark completed for all index types.")

    # Return the dictionary containing the collections
    return collections


#### import DF

In [4]:
df_articles = pd.read_csv("articles.csv")


def normalize_partition_name(name):
    # Remplacer les espaces et les tirets par des underscores
    name = re.sub(r'\s+|-', '_', name)
    # Supprimer les accents et caractères spéciaux
    name = re.sub(r'[^a-zA-Z0-9_]', '', name)
    return name

# Appliquer cette fonction aux noms des codes
df_articles['normalized_code'] = df_articles['code'].apply(normalize_partition_name)

#### Import Embeddings Partition

In [6]:
# Load the embeddings from the JSON file
with open('embeddings.json', 'r', encoding='utf-8') as f:
    loaded_embeddings = json.load(f)

In [9]:
# Create a dictionary to store entities by partition
partitioned_entities = {}

# Iterate over each row of the dataframe to create entities
for _, row in tqdm(df_articles.iterrows(), desc="Creating entities with partitions"):
    partition = row['normalized_code']
    
    # Find the corresponding embedding
    embedding = next(e['embedding'] for e in loaded_embeddings if e['id'] == row['id'])
    
    # Create an entity with the embedding
    entity = {
        "id": row['id'],
        "article": row['article'],
        "reference": row['reference'],
        "embedding_articles": np.array(embedding, dtype=np.float16)  #en np.array de float16 pour respecter les exigences de Milvus
    }
    
    # Add entity to the list corresponding to the partition
    if partition not in partitioned_entities:
        partitioned_entities[partition] = []
    partitioned_entities[partition].append(entity)

Creating entities with partitions: 22633it [10:58, 34.37it/s]


In [72]:
# Usage example
index_types = ["FLAT", "IVF_FLAT", "IVF_PQ", "HNSW"]
embeddings_dim = 1024  
created_collections = benchmark_index_types(df_articles, partitioned_entities, index_types, embeddings_dim)

Collection articles_collection_FLAT already exists. Dropping the collection... 


Inserting entities into partition: Code_Bruxellois_de_lAir_du_Climat_et_de_la_Matrise_de_lEnergie
Inserting entities into partition: Code_Bruxellois_de_lAmnagement_du_Territoire
Inserting entities into partition: Code_Bruxellois_du_Logement
Inserting entities into partition: Code_Civil
Inserting entities into partition: Code_Consulaire
Inserting entities into partition: Code_Electoral
Inserting entities into partition: Code_Electoral_Communal_Bruxellois
Inserting entities into partition: Code_Ferroviaire
Inserting entities into partition: Code_Forestier
Inserting entities into partition: Code_Judiciaire
Inserting entities into partition: Code_Pnal
Inserting entities into partition: Code_Pnal_Militaire
Inserting entities into partition: Code_Pnal_Social
Inserting entities into partition: Code_Rural
Inserting entities into partition: Code_Rglementaire_Wallon_de_lAction_sociale_et_de_la_Sant
Inserting entiti

In [55]:
created_collections

{'FLAT': 'articles_collection_FLAT',
 'IVF_FLAT': 'articles_collection_IVF_FLAT',
 'IVF_PQ': 'articles_collection_IVF_PQ',
 'HNSW': 'articles_collection_HNSW'}

#### Test Search

In [30]:
df_questions = pd.read_csv("questions_train.csv")

In [28]:
from FlagEmbedding import BGEM3FlagModel

bge_m3 = BGEM3FlagModel('BAAI/bge-m3',  
                       use_fp16=True, 
                       device='cuda')

Fetching 30 files:   0%|          | 0/30 [00:00<?, ?it/s]

  colbert_state_dict = torch.load(os.path.join(model_dir, 'colbert_linear.pt'), map_location='cpu')
  sparse_state_dict = torch.load(os.path.join(model_dir, 'sparse_linear.pt'), map_location='cpu')


In [31]:
def generate_embedding(article):
    embedding = bge_m3.encode([article], batch_size=12, max_length=8*1024)["dense_vecs"]
    return embedding[0]

In [35]:
created_collections['FLAT']

'articles_collection_FLAT'

In [37]:
all_collections = client.list_collections()
print(all_collections)

['articles_collection_FLAT', 'articles_collection_IVF_FLAT', 'articles_collection', 'articles_collection_IVF_PQ', 'articles_collection_HNSW']


In [97]:
res = client.list_partitions(collection_name="articles_collection")
print(res)

['_default', 'Code_Bruxellois_de_lAir_du_Climat_et_de_la_Matrise_de_lEnergie', 'Code_Bruxellois_de_lAmnagement_du_Territoire', 'Code_Bruxellois_du_Logement', 'Code_Civil', 'Code_Consulaire', 'Code_Electoral', 'Code_Electoral_Communal_Bruxellois', 'Code_Ferroviaire', 'Code_Forestier', 'Code_Judiciaire', 'Code_Pnal', 'Code_Pnal_Militaire', 'Code_Pnal_Social', 'Code_Rural', 'Code_Rglementaire_Wallon_de_lAction_sociale_et_de_la_Sant', 'Code_Wallon_de_lAction_sociale_et_de_la_Sant', 'Code_Wallon_de_lAgriculture', 'Code_Wallon_de_lEnseignement_Fondamental_et_de_lEnseignement_Secondaire', 'Code_Wallon_de_lEnvironnement', 'Code_Wallon_de_lHabitation_Durable', 'Code_Wallon_du_Bien_tre_des_animaux', 'Code_Wallon_du_Dveloppement_Territorial', 'Code_dInstruction_Criminelle', 'Code_de_Droit_Economique', 'Code_de_Droit_International_Priv', 'Code_de_lEau_intgr_au_Code_Wallon_de_lEnvironnement', 'Code_de_la_Dmocratie_Locale_et_de_la_Dcentralisation', 'Code_de_la_Fonction_Publique_Wallonne', 'Code_de_l

In [98]:
# Partition sur laquelle effectuer la recherche
partitions = res[1:]
#["Code_du_Bien_tre_au_Travail", "Code_des_Socits_et_des_Associations"]

# Vecteur de la question à tester
query_vector = generate_embedding(df_questions['question'].iloc[1])

# Paramètres de recherche
search_params = {
    'FLAT': {"metric_type": "COSINE", "params": {}},  # Pas de paramètres spécifiques pour FLAT
    'IVF_FLAT': {"metric_type": "COSINE", "params": {"nlist": 128, "nprobe": 8}},
    'IVF_PQ': {"metric_type": "COSINE", "params": {"nlist": 128, "m": 8}},
    'HNSW': {"metric_type": "COSINE", "params": {"ef": 200}}  # Paramètre spécifique pour HNSW
}

# Résultats de performance
performance_results = {}

# Boucle pour tester chaque collection
for index_type, collection_name in created_collections.items():
    print(f"Testing collection: {collection_name} with index type: {index_type}")

    # Charger les partitions pour la collection
    client.load_partitions(collection_name=collection_name, partition_names=partitions)

    index_info = client.describe_index(collection_name="articles_collection_IVF_FLAT", index_name="vector_index")
    print(f"Index info for IVF_FLAT: {index_info}")

    # Effectuer la recherche
    search_results = client.search(
        collection_name=collection_name,
        data=[query_vector],
        partition_names=partitions,
        limit=3,
        search_params=search_params[index_type],
        output_fields=['id', 'reference']
    )


    formatted_result = json.dumps(search_results[0], indent=3, ensure_ascii=False)
    print(f"Results for {index_type}:\n{formatted_result}")
    
    # Enregistrer les résultats dans le dictionnaire
    performance_results[index_type] = search_results


Testing collection: articles_collection_FLAT with index type: FLAT
Index info for IVF_FLAT: {'nlist': '128', 'nprobe': '100', 'index_type': 'IVF_FLAT', 'metric_type': 'COSINE', 'field_name': 'embedding_articles', 'index_name': 'vector_index', 'total_rows': 0, 'indexed_rows': 0, 'pending_index_rows': 0, 'state': 'Finished'}
Results for FLAT:
[
   {
      "id": 8104,
      "distance": 0.5995019674301147,
      "entity": {
         "id": 8104,
         "reference": "Art. 815, Code Réglementaire Wallon de l'Action sociale et de la Santé (Livre V, Titre VII, Chapitre V, Section 4, Sous-section 4)"
      }
   },
   {
      "id": 7250,
      "distance": 0.5921611189842224,
      "entity": {
         "id": 7250,
         "reference": "Art. 10/21, Code Réglementaire Wallon de l'Action sociale et de la Santé (Livre III, PartiePREMIERE/1, Titre III, Chapitre III, Section 1)"
      }
   },
   {
      "id": 1073,
      "distance": 0.5771446824073792,
      "entity": {
         "id": 1073,
         

In [95]:
client.load_partitions(collection_name = "articles_collection_FLAT",
                                    partition_names=["Code_du_Bien_tre_au_Travail", "Code_des_Socits_et_des_Associations"])

search_results = client.search(
    collection_name="articles_collection",
    data = [query_vector],  # Vecteurs de requête
    partition_names=["Code_du_Bien_tre_au_Travail", "Code_des_Socits_et_des_Associations"],  # Rechercher uniquement dans la partition "Code Civil"
    limit=3,  # Nombre de résultats à retourner

    output_fields= ['id']
)

formatted_result = json.dumps(search_results[0], indent=3, ensure_ascii=False)
print(f"Results:\n{formatted_result}")

Results:
[
   {
      "id": 22036,
      "distance": 0.5346904993057251,
      "entity": {
         "id": 22036
      }
   },
   {
      "id": 20103,
      "distance": 0.5224465727806091,
      "entity": {
         "id": 20103
      }
   },
   {
      "id": 20394,
      "distance": 0.519309401512146,
      "entity": {
         "id": 20394
      }
   }
]


In [74]:
labels = df_questions['article_ids'].iloc[0]
print(f"Labels: {labels} \n")


# Affichage des résultats pour chaque index
for index_type, result in performance_results.items():
    list_ids = [result["entity"]["id"] for result in result[0]]
    list_prods = [result["distance"] for result in result[0]]
    print(f"Index Type: {index_type} - results: {list_ids} - distances: {list_prods}")

    print('\n')


Labels: 22225,22226,22227,22228,22229,22230,22231,22232,22233,22234 

Index Type: FLAT - results: [22036, 20103, 20394] - distances: [0.5346904993057251, 0.5224465727806091, 0.519309401512146]


Index Type: IVF_FLAT - results: [22036, 20103, 20394] - distances: [0.5346904993057251, 0.5224465727806091, 0.519309401512146]


Index Type: IVF_PQ - results: [22036, 20103, 20394] - distances: [0.5346904993057251, 0.5224465727806091, 0.519309401512146]


Index Type: HNSW - results: [22036, 20103, 20394] - distances: [0.5346904993057251, 0.5224465727806091, 0.519309401512146]




In [99]:
# Vérifier si l'index est appliqué à la collection
index_info = client.describe_index(collection_name="articles_collection_IVF_FLAT", index_name="vector_index")
print(f"Index info for IVF_FLAT: {index_info}")


RPC error: [describe_index], <MilvusException: (code=<bound method _MultiThreadedRendezvous.code of <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "failed to connect to all addresses; last error: UNAVAILABLE: ipv4:127.0.0.1:19530: ConnectEx: Connection refused (No connection could be made because the target machine actively refused it.
 -- 10061)"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2024-10-22T13:06:19.4586478+00:00", grpc_status:14, grpc_message:"failed to connect to all addresses; last error: UNAVAILABLE: ipv4:127.0.0.1:19530: ConnectEx: Connection refused (No connection could be made because the target machine actively refused it.\r\n -- 10061)"}"
>>, message=Retry run out of 75 retry times, message=failed to connect to all addresses; last error: UNAVAILABLE: ipv4:127.0.0.1:19530: ConnectEx: Connection refused (No connection could be made because the target machine actively refused it.
 -- 10061)

MilvusException: <MilvusException: (code=<bound method _MultiThreadedRendezvous.code of <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "failed to connect to all addresses; last error: UNAVAILABLE: ipv4:127.0.0.1:19530: ConnectEx: Connection refused (No connection could be made because the target machine actively refused it.
 -- 10061)"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2024-10-22T13:06:19.4586478+00:00", grpc_status:14, grpc_message:"failed to connect to all addresses; last error: UNAVAILABLE: ipv4:127.0.0.1:19530: ConnectEx: Connection refused (No connection could be made because the target machine actively refused it.\r\n -- 10061)"}"
>>, message=Retry run out of 75 retry times, message=failed to connect to all addresses; last error: UNAVAILABLE: ipv4:127.0.0.1:19530: ConnectEx: Connection refused (No connection could be made because the target machine actively refused it.
 -- 10061))>

In [69]:
res = client.describe_collection(collection_name="articles_collection_IVF_FLAT")
format_res = json.dumps(res, indent=3, ensure_ascii=False)
print(format_res)

{
   "collection_name": "articles_collection_IVF_FLAT",
   "auto_id": false,
   "num_shards": 1,
   "description": "Collection for IVF_FLAT benchmark",
   "fields": [
      {
         "field_id": 100,
         "name": "id",
         "description": "",
         "type": 5,
         "params": {},
         "is_primary": true
      },
      {
         "field_id": 101,
         "name": "article",
         "description": "",
         "type": 21,
         "params": {
            "max_length": 65535
         }
      },
      {
         "field_id": 102,
         "name": "reference",
         "description": "",
         "type": 21,
         "params": {
            "max_length": 1000
         }
      },
      {
         "field_id": 103,
         "name": "embedding_articles",
         "description": "",
         "type": 102,
         "params": {
            "dim": 1024
         }
      }
   ],
   "aliases": [],
   "collection_id": 453359513790895237,
   "consistency_level": 2,
   "properties": {},
