# 加载已经展平的数据

In [1]:
from datasets import load_dataset, load_from_disk

ds = load_from_disk("../datasets/Musique_flattened")
ds

Dataset({
    features: ['id', 'title', 'text', 'hop'],
    num_rows: 398724
})

# 获取embedding字段的embedding
需要提前下载Qwen3-Embedding-0.6B

In [None]:
import torch
from sentence_transformers import SentenceTransformer
import os

os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3"  # 指定使用的GPU设备
model = SentenceTransformer(
    "Qwen/Qwen3-Embedding-0.6B",
    cache_folder="../models/Qwen3-Embedding-0.6B",
    model_kwargs={
        "attn_implementation": "flash_attention_2",
        "torch_dtype": torch.bfloat16,
    },
    tokenizer_kwargs={"padding_side": "left"},
    device="cuda",  # 指定一个主设备
)

def make_encoder_fn(model: SentenceTransformer, pool):
    def encode_text(batch):
        embeddings = model.encode_document(
            batch["text"],
            normalize_embeddings=True,
            pool=pool,
            show_progress_bar=True,
        )
        batch["vector"] = embeddings  # 保持张量格式（Parquet 会自动转换）
        return batch
    return encode_text

pool = model.start_multi_process_pool()
encode_text_fn = make_encoder_fn(model, pool)

ds = ds.map(
    encode_text_fn,
    batched=True,
    batch_size=4 * 10240,  # 根据 GPU 内存大小调整批处理大小
)

model.stop_multi_process_pool(pool)
ds.to_parquet("../datasets/Musique_encode/Musique_encoded.parquet")

In [6]:
from dotenv import load_dotenv
import os
from pymilvus import MilvusClient, DataType, IndexType

def init_database(collection_name: str, endpoint: str = None, token: str = None):
    """初始化或重置 Milvus 集合。"""
    client = MilvusClient(uri=endpoint, token=token)
    # [WARNING] 如果集合已存在，将会被删除, 请谨慎使用此功能！
    if client.has_collection(collection_name=collection_name):
        print(f"集合 '{collection_name}' 已存在，正在删除...")
        client.drop_collection(collection_name=collection_name)

    schema = MilvusClient.create_schema(auto_id=False, enable_dynamic_field=False)
    schema.add_field(
        field_name="id", datatype=DataType.VARCHAR, max_length=64, is_primary=True
    )
    schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=1024)
    schema.add_field(field_name="title", datatype=DataType.VARCHAR, max_length=256)
    # 将 text 字段的最大长度增加，以防段落过长导致错误
    schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=4096)
    schema.add_field(field_name="hop", datatype=DataType.INT8)

    index_params = MilvusClient.prepare_index_params()
    index_params.add_index(
        field_name="vector",
        index_name="vector_index",
        index_type=IndexType.FLAT,
        metric_type="IP",
        params={},
    )

    print(f"正在创建新集合 '{collection_name}'...")
    client.create_collection(
        collection_name=collection_name,
        schema=schema,
        index_params=index_params,
    )

    return client

load_dotenv()
endpoint = os.getenv("MILVUS_ENDPOINT")
token = os.getenv("MILVUS_TOKEN")

collection_name = "Musique"
client = init_database(collection_name, endpoint, token)

集合 'Musique' 已存在，正在删除...
正在创建新集合 'Musique'...


In [7]:
from tqdm.auto import tqdm

batch_size = 10000  # 每批次插入的文档数量
num_docs = len(ds)  # 数据集中的总文档数量
for i in tqdm(range(0, num_docs, batch_size), desc="插入数据批次"):
    # 计算当前批次的结束索引
    end_index = min(i + batch_size, num_docs)

    # 从 Hugging Face Dataset 中选择一个切片
    batch_slice = ds.select(range(i, end_index))

    # 将切片转换为 MilvusClient.insert 所需的 list of dicts 格式
    data_to_insert = batch_slice.to_list()

    # 插入当前批次的数据
    client.insert(collection_name=collection_name, data=data_to_insert)

插入数据批次:   0%|          | 0/40 [00:00<?, ?it/s]