In [38]:
import polars as pl
import glob
import os

from haystack import Pipeline
from haystack.components.preprocessors import DocumentSplitter
from haystack.components.writers import DocumentWriter

from milvus_haystack import MilvusDocumentStore
from milvus_haystack.milvus_embedding_retriever import MilvusEmbeddingRetriever
from haystack.components.embedders import SentenceTransformersDocumentEmbedder

In [39]:
df = pl.read_parquet('hf://datasets/AgentPublic/piaf/plain_text/train-00000-of-00001.parquet')

In [43]:
df[0]["question"].to_list()

['Combien de personnes travaillent au ministère des sports']

## Dataset Analysis

In [29]:
print(df.columns)

['id', 'title', 'context', 'question', 'answers']


In [30]:
df.describe()

statistic,id,title,context,question,answers
str,str,str,str,str,f64
"""count""","""3835""","""3835""","""3835""","""3835""",3835.0
"""null_count""","""0""","""0""","""0""","""0""",0.0
"""mean""",,,,,
"""std""",,,,,
"""min""","""p140295201616088""","""6 Heures de Shanghai 2017""","""2012 est sorti en 2012. Son th…","""A cause de qui Emanuele se voi…",
"""25%""",,,,,
"""50%""",,,,,
"""75%""",,,,,
"""max""","""p140295460357824""","""Événement Azolla""","""Étienne Báthory, roi de Pologn…","""à quelle ronde fénix est sorti…",


In [31]:
# Indiquer le nombre de valeur nulle dans chaque colonne
print(df.null_count())

# Indiquer le nombre de lignes
print(df.height)

shape: (1, 5)
┌─────┬───────┬─────────┬──────────┬─────────┐
│ id  ┆ title ┆ context ┆ question ┆ answers │
│ --- ┆ ---   ┆ ---     ┆ ---      ┆ ---     │
│ u32 ┆ u32   ┆ u32     ┆ u32      ┆ u32     │
╞═════╪═══════╪═════════╪══════════╪═════════╡
│ 0   ┆ 0     ┆ 0       ┆ 0        ┆ 0       │
└─────┴───────┴─────────┴──────────┴─────────┘
3835


In [32]:
# keep 2500 rows
df = df.head(2500)

# Shuffle the dataframe
df_shuffled = df.sample(fraction=1.0, with_replacement=False, seed=42)

# Calculate split indices
n = df_shuffled.height
train_end = int(n * 0.7)
val_end = train_end + int(n * 0.15)

# Split the dataframe
train_df = df_shuffled[:train_end]
val_df = df_shuffled[train_end:val_end]
test_df = df_shuffled[val_end:]

In [33]:
from haystack import Document

# Create document lists for each dataset split
train_documents = [
    Document(
        content=row['context'],
        meta={'id': row['id'], 'title': row['title']}
    )
    for row in train_df.to_dicts()
]

val_documents = [
    Document(
        content=row['context'],
        meta={'id': row['id'], 'title': row['title']}
    )
    for row in val_df.to_dicts()
]

test_documents = [
    Document(
        content=row['context'],
        meta={'id': row['id'], 'title': row['title']}
    )
    for row in test_df.to_dicts()
]

# Use training documents for indexing
documents = test_documents

In [None]:
embedding_model_list = [
    "intfloat/multilingual-e5-large-instruct",
    "Lajavaness/bilingual-embedding-large",
    "HIT-TMG/KaLM-embedding-multilingual-mini-instruct-v1",
]

In [9]:
for i, model in enumerate(embedding_model_list):
    print(f"Model {i+1}: {model}")
    embedder = SentenceTransformersDocumentEmbedder(model = model, trust_remote_code=True)
    embedder.warm_up()

    document_store = MilvusDocumentStore(
        connection_args={"uri": "./milvus.db"},
        drop_old=True,
        collection_name=f"piaf_{i+1}"
    )
    indexing_pipeline = Pipeline()
    indexing_pipeline.add_component("embedder", embedder)
    indexing_pipeline.add_component("writer", DocumentWriter(document_store))
    indexing_pipeline.connect("embedder", "writer")
    indexing_pipeline.run({"documents": documents})
    print(f"Indexed {len(documents)} documents with model {model}")

Model 1: intfloat/multilingual-e5-large-instruct


Batches:   0%|          | 0/12 [00:00<?, ?it/s]

Indexed 375 documents with model intfloat/multilingual-e5-large-instruct
Model 2: Lajavaness/bilingual-embedding-large


Batches:   0%|          | 0/12 [00:00<?, ?it/s]

Indexed 375 documents with model Lajavaness/bilingual-embedding-large
Model 3: HIT-TMG/KaLM-embedding-multilingual-mini-instruct-v1


Batches:   0%|          | 0/12 [00:00<?, ?it/s]

Indexed 375 documents with model HIT-TMG/KaLM-embedding-multilingual-mini-instruct-v1
Model 4: Qwen/Qwen3-Embedding-8B


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/215 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/16.3k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/729 [00:00<?, ?B/s]

model.safetensors.index.json:   0%|          | 0.00/30.4k [00:00<?, ?B/s]

Fetching 4 files:   0%|          | 0/4 [00:00<?, ?it/s]

model-00002-of-00004.safetensors:   0%|          | 0.00/4.92G [00:00<?, ?B/s]

model-00001-of-00004.safetensors:   0%|          | 0.00/4.90G [00:00<?, ?B/s]

model-00004-of-00004.safetensors:   0%|          | 0.00/336M [00:00<?, ?B/s]

model-00003-of-00004.safetensors:   0%|          | 0.00/4.98G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

tokenizer_config.json:   0%|          | 0.00/7.26k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/2.78M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/1.67M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/11.4M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/313 [00:00<?, ?B/s]

Batches:   0%|          | 0/12 [00:00<?, ?it/s]

PipelineRuntimeError: The following component failed to run:
Component name: 'embedder'
Component type: 'SentenceTransformersDocumentEmbedder'
Error: MPS backend out of memory (MPS allocated: 35.95 GB, other allocations: 2.12 MB, max allowed: 36.27 GB). Tried to allocate 333.06 MB on private pool. Use PYTORCH_MPS_HIGH_WATERMARK_RATIO=0.0 to disable upper limit for memory allocations (may cause system failure).

In [11]:
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.evaluators import DocumentMAPEvaluator, DocumentMRREvaluator, DocumentRecallEvaluator, DocumentNDCGEvaluator
from haystack import Document, Pipeline


for i, model in enumerate(embedding_model_list):
    grounds_truth = []
    retrieval_results_list = []
    print(f"Model {i+1}: {model}")
    embedder = SentenceTransformersTextEmbedder(model=model, trust_remote_code=True, progress_bar=False)
    embedder.warm_up()

    document_store = MilvusDocumentStore(
        connection_args={"uri": "./milvus.db"},
        collection_name=f"piaf_{i+1}"
    )

    retrieval_pipeline = Pipeline()
    retrieval_pipeline.add_component("embedder", embedder)
    retrieval_pipeline.add_component("retriever", MilvusEmbeddingRetriever(document_store=document_store, top_k=3))
    retrieval_pipeline.connect("embedder", "retriever")

    for row in test_df.to_dicts():
        retrieval_results = retrieval_pipeline.run({"embedder": {"text": row["question"]}})
        grounds_truth.append([Document(
            content=row["context"],
            meta={
                "id": row["id"],
                "title": row["title"],
            }
        )])
        retrieval_result = retrieval_results["retriever"]["documents"]
        retrieval_results_list.append(retrieval_result)
    evaluator = Pipeline()
    mrr_evaluator = DocumentMRREvaluator()
    map_evaluator = DocumentMAPEvaluator()
    recall = DocumentRecallEvaluator()
    ndcg = DocumentNDCGEvaluator()
    evaluator.add_component("mrr_evaluator", mrr_evaluator)
    evaluator.add_component("map_evaluator", map_evaluator)
    evaluator.add_component("recall_evaluator", recall)
    evaluator.add_component("ndcg_evaluator", ndcg)
    score = evaluator.run({
        "mrr_evaluator": {"retrieved_documents": retrieval_results_list, "ground_truth_documents": grounds_truth},
        "map_evaluator": {"retrieved_documents": retrieval_results_list, "ground_truth_documents": grounds_truth},
        "recall_evaluator": {"retrieved_documents": retrieval_results_list, "ground_truth_documents": grounds_truth},
        "ndcg_evaluator": {"retrieved_documents": retrieval_results_list, "ground_truth_documents": grounds_truth}
    }
    )
    print(f"Score for model {model}: ")
    print(f"MRR: {score['mrr_evaluator']['score']}")
    print(f"MAP: {score['map_evaluator']['score']}")
    print(f"Recall: {score['recall_evaluator']['score']}")
    print(f"NDCG: {score['ndcg_evaluator']['score']}")

Model 1: intfloat/multilingual-e5-large-instruct
Score for model intfloat/multilingual-e5-large-instruct: 
MRR: 0.6666666666666666
MAP: 0.6666666666666666
Recall: 0.6666666666666666
NDCG: 0.0
Model 2: Lajavaness/bilingual-embedding-large
Score for model Lajavaness/bilingual-embedding-large: 
MRR: 0.6693333333333333
MAP: 0.6693333333333333
Recall: 0.6693333333333333
NDCG: 0.0
Model 3: HIT-TMG/KaLM-embedding-multilingual-mini-instruct-v1
Score for model HIT-TMG/KaLM-embedding-multilingual-mini-instruct-v1: 
MRR: 0.648
MAP: 0.648
Recall: 0.648
NDCG: 0.0


In [13]:
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack import Pipeline

# 1. DocumentStore en mémoire avec BM25
document_store = InMemoryDocumentStore()
document_store.write_documents(documents)

retriever = InMemoryBM25Retriever(document_store=document_store)
retrieval_pipeline = Pipeline()
retrieval_pipeline.add_component("retriever", retriever)


grounds_truth = []
retrieval_results_list = []

for row in test_df.to_dicts():
    retrieval_results = retrieval_pipeline.run({"retriever": {"query": row["question"]}})
    grounds_truth.append([Document(
        content=row["context"],
        meta={
            "id": row["id"],
            "title": row["title"],
        }
    )])
    retrieval_result = retrieval_results["retriever"]["documents"]
    retrieval_results_list.append(retrieval_result)

In [14]:
evaluator = Pipeline()
mrr_evaluator = DocumentMRREvaluator()
map_evaluator = DocumentMAPEvaluator()
recall = DocumentRecallEvaluator()
ndcg = DocumentNDCGEvaluator()
evaluator.add_component("mrr_evaluator", mrr_evaluator)
evaluator.add_component("map_evaluator", map_evaluator)
evaluator.add_component("recall_evaluator", recall)
evaluator.add_component("ndcg_evaluator", ndcg)
score = evaluator.run({
    "mrr_evaluator": {"retrieved_documents": retrieval_results_list, "ground_truth_documents": grounds_truth},
    "map_evaluator": {"retrieved_documents": retrieval_results_list, "ground_truth_documents": grounds_truth},
    "recall_evaluator": {"retrieved_documents": retrieval_results_list, "ground_truth_documents": grounds_truth},
    "ndcg_evaluator": {"retrieved_documents": retrieval_results_list, "ground_truth_documents": grounds_truth}
}
)
print(f"Score for model {model}: ")
print(f"MRR: {score['mrr_evaluator']['score']}")
print(f"MAP: {score['map_evaluator']['score']}")
print(f"Recall: {score['recall_evaluator']['score']}")
print(f"NDCG: {score['ndcg_evaluator']['score']}")

Score for model HIT-TMG/KaLM-embedding-multilingual-mini-instruct-v1: 
MRR: 0.6371111111111107
MAP: 0.6576162257495586
Recall: 0.7306666666666667
NDCG: 0.39933996689679263


In [8]:
output_path = "piaf_results"
if not os.path.exists(output_path):
    os.makedirs(output_path)

In [None]:
from sentence_transformers import (
    SentenceTransformer,
    SentenceTransformerTrainer,
    SentenceTransformerTrainingArguments
)
from sentence_transformers.losses import MultipleNegativesRankingLoss
from sentence_transformers.training_args import BatchSamplers
from sentence_transformers.evaluation import InformationRetrievalEvaluator
from datasets import Dataset

# 1. Modèle de base
base_model = "intfloat/multilingual-e5-large-instruct"
model = SentenceTransformer(base_model)

# 2. Préparation des données
train_examples = [
    {"anchor": row["question"], "positive": row["context"]}
    for row in train_df.to_dicts()
]
train_dataset = Dataset.from_list(train_examples)

val_queries = {f"q{i}": row["question"] for i, row in enumerate(val_df.to_dicts())}
val_corpus  = {f"d{i}": row["context"]  for i, row in enumerate(val_df.to_dicts())}
val_rel     = {qid: {did: 1} for qid, did in zip(val_queries, val_corpus)}

evaluator = InformationRetrievalEvaluator(
    val_queries, val_corpus, val_rel, name="piaf-validation"
)

# 3. Loss
train_loss = MultipleNegativesRankingLoss(model)

# 4. Hyper-paramètres adaptés à MPS
training_args = SentenceTransformerTrainingArguments(
    output_dir=output_path,                # dossier de sortie
    num_train_epochs=1,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    warmup_ratio=0.1,
    # Désactivation explicite de toute mixed precision
    batch_sampler=BatchSamplers.NO_DUPLICATES,
)

# 5. Création du Trainer et lancement
trainer = SentenceTransformerTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    evaluator=evaluator,
    loss=train_loss
)
trainer.train()

# 6. Sauvegarde
model.save_pretrained(output_path)
print(f"✓ Modèle entraîné et sauvegardé dans : {output_path}")

## Indexing data

In [34]:
final_model_name = "intfloat/multilingual-e5-large-instruct"

In [None]:
embedder = SentenceTransformersDocumentEmbedder(model = final_model_name, trust_remote_code=True)
embedder.warm_up()

document_store = MilvusDocumentStore(
    connection_args={"uri": "./milvus.db"},
    drop_old=True,
    collection_name="piaf_final_model"
)
indexing_pipeline = Pipeline()
indexing_pipeline.add_component("embedder", embedder)
indexing_pipeline.add_component("writer", DocumentWriter(document_store))
indexing_pipeline.connect("embedder", "writer")
indexing_pipeline.run({"documents": train_documents})
indexing_pipeline.run({"documents": val_documents})
indexing_pipeline.run({"documents": test_documents})
print(f"Indexed {len(documents)} documents with model {model}")

Batches:   0%|          | 0/55 [00:00<?, ?it/s]

Batches:   0%|          | 0/12 [00:00<?, ?it/s]

Batches:   0%|          | 0/12 [00:00<?, ?it/s]

Indexed 375 documents with model SentenceTransformer(
  (0): Transformer({'max_seq_length': 512, 'do_lower_case': False}) with Transformer model: XLMRobertaModel 
  (1): Pooling({'word_embedding_dimension': 1024, 'pooling_mode_cls_token': False, 'pooling_mode_mean_tokens': True, 'pooling_mode_max_tokens': False, 'pooling_mode_mean_sqrt_len_tokens': False, 'pooling_mode_weightedmean_tokens': False, 'pooling_mode_lasttoken': False, 'include_prompt': True})
  (2): Normalize()
)


In [45]:
document_store.count_documents()

2500