In [1]:
import time, psutil, json, numpy as np
import matplotlib.pyplot as plt
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility

# Configurações DGX
MILVUS_HOST = "172.17.0.1" 
MILVUS_PORT = "19540"
COLLECTION_NAME = "lfw_cosface_tta"

connections.connect("default", host=MILVUS_HOST, port=MILVUS_PORT)
print(f"Status da Conexão: {connections.get_connection_addr('default')}")

Status da Conexão: {'address': '172.17.0.1:19540', 'user': ''}


In [5]:
if utility.has_collection(COLLECTION_NAME):
    utility.drop_collection(COLLECTION_NAME)

fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="label", dtype=DataType.VARCHAR, max_length=500),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1024) # TTA habilitado
]
schema = CollectionSchema(fields, "Benchmark LFW CosFace TTA")
collection = Collection(COLLECTION_NAME, schema)
print("Coleção criada com sucesso.")

Coleção criada com sucesso.


In [6]:
embeddings = np.load("assets/lfw_resnet_embeddings.npy").astype('float32')
with open("assets/lfw_resnet_names.json", 'r') as f:
    labels = json.load(f)

process = psutil.Process()
num_vectors = len(embeddings)

print(f"-> Iniciando Benchmarking de Ingestão para {num_vectors} vetores...")

# --- MÉTODO A: Individual (Síncrono) ---
print("\n[MÉTODO A] Inserindo 100 amostras individualmente para medir latência...")
mem_start_a = process.memory_info().rss / (1024 * 1024)
t0 = time.time()

for i in range(100):
    collection.insert([[labels[i]], [embeddings[i].tolist()]])

tempo_a = time.time() - t0
mem_end_a = process.memory_info().rss / (1024 * 1024)
vps_a = 100 / tempo_a

print(f"  - Tempo Total (100 vetores): {tempo_a:.4f}s")
print(f"  - Consumo de RAM Adicional: {mem_end_a - mem_start_a:.2f} MB")
print(f"  - Throughput: {vps_a:.2f} vetores/seg")

# --- MÉTODO B: Bulk (Lote) ---
print(f"\n[MÉTODO B] Inserindo restante ({num_vectors-100}) em lote (Batch: 1000)...")
mem_start_b = process.memory_info().rss / (1024 * 1024)
t1 = time.time()
batch_size = 1000

for i in range(100, num_vectors, batch_size):
    end = min(i + batch_size, num_vectors)
    collection.insert([labels[i:end], embeddings[i:end].tolist()])

collection.flush() # Garante a persistência no disco
tempo_b = time.time() - t1
mem_end_b = process.memory_info().rss / (1024 * 1024)
vps_b = (num_vectors - 100) / tempo_b

print(f"  - Tempo Total: {tempo_b:.4f}s")
print(f"  - Consumo de RAM Adicional: {mem_end_b - mem_start_b:.2f} MB")
print(f"  - Throughput: {vps_b:.2f} vetores/seg")

print("\n" + "="*40)
print(f"RESULTADO: O Método Bulk foi {vps_b/vps_a:.1f}x mais eficiente.")
print("="*40)

-> Iniciando Benchmarking de Ingestão para 13233 vetores...

[MÉTODO A] Inserindo 100 amostras individualmente para medir latência...
  - Tempo Total (100 vetores): 0.2618s
  - Consumo de RAM Adicional: 0.01 MB
  - Throughput: 381.90 vetores/seg

[MÉTODO B] Inserindo restante (13133) em lote (Batch: 1000)...
  - Tempo Total: 5.9270s
  - Consumo de RAM Adicional: 6.13 MB
  - Throughput: 2215.78 vetores/seg

RESULTADO: O Método Bulk foi 5.8x mais eficiente.
