In [15]:
from pymilvus import MilvusClient, connections, FieldSchema, CollectionSchema, DataType, Collection, utility
import numpy as np

# Milvus 서버 연결
client = MilvusClient(uri="http://localhost:19530")
connections.connect(host="localhost", port="19530")

print("✅ Milvus 연결 성공!")
print(f"현재 컬렉션: {client.list_collections()}")


✅ Milvus 연결 성공!
현재 컬렉션: ['test_collection', 'demo_collection', 'delete_test_collection']


In [16]:
# 기존 컬렉션 정리
collection_name = "delete_test_collection"

if collection_name in client.list_collections():
    client.drop_collection(collection_name)
    print(f"🗑️ 기존 컬렉션 '{collection_name}' 삭제됨")

# 스키마 정의
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=128),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=200)
]

schema = CollectionSchema(fields, description="삭제 테스트용 컬렉션")
collection = Collection(name=collection_name, schema=schema)

print(f"✅ 컬렉션 '{collection_name}' 생성 완료!")


🗑️ 기존 컬렉션 'delete_test_collection' 삭제됨
✅ 컬렉션 'delete_test_collection' 생성 완료!


In [17]:
# 10개의 테스트 데이터 삽입
num_entities = 10
embeddings = np.random.random((num_entities, 128)).tolist()
texts = [f"테스트 문서 {i}" for i in range(num_entities)]

# 데이터 삽입 (id는 auto_id=True이므로 제외)
data = [embeddings, texts]
insert_result = collection.insert(data)
collection.flush()

print(f"✅ {len(insert_result.primary_keys)}개 데이터 삽입 완료")
print(f"📊 현재 entity 수: {collection.num_entities}")
print(f"🔑 삽입된 ID 범위: {min(insert_result.primary_keys)} ~ {max(insert_result.primary_keys)}")

# 인덱스 생성 및 로드
index_params = {"metric_type": "L2", "index_type": "IVF_FLAT", "params": {"nlist": 4}}
collection.create_index("embedding", index_params)
collection.load()
print("✅ 인덱스 생성 및 컬렉션 로드 완료")


✅ 10개 데이터 삽입 완료
📊 현재 entity 수: 10
🔑 삽입된 ID 범위: 458834835355992333 ~ 458834835355992342
✅ 인덱스 생성 및 컬렉션 로드 완료


In [18]:
# 삭제할 ID 선정 (처음 3개)
delete_ids = insert_result.primary_keys[:3]

print("=" * 50)
print("🎯 삭제 실험 시작!")
print(f"삭제 전 entity 수: {collection.num_entities}")
print(f"삭제할 ID들: {delete_ids}")

# 삭제 실행
collection.delete(expr=f"id in {delete_ids}")
collection.flush()

print(f"삭제 후 entity 수: {collection.num_entities}")
print("=" * 50)
print("❓ 어라? entity 수가 변하지 않았네요!")
print("🔍 이제 진짜 삭제 여부를 확인해보겠습니다...")


🎯 삭제 실험 시작!
삭제 전 entity 수: 10
삭제할 ID들: [458834835355992333, 458834835355992334, 458834835355992335]
삭제 후 entity 수: 10
❓ 어라? entity 수가 변하지 않았네요!
🔍 이제 진짜 삭제 여부를 확인해보겠습니다...


In [19]:
# 1️⃣ 삭제된 데이터 조회 시도
print("1️⃣ query()로 삭제된 데이터 조회 시도:")
try:
    query_result = collection.query(
        expr=f"id in {delete_ids}",
        output_fields=["id", "text"]
    )
    print(f"   조회된 데이터: {len(query_result)}개")
    if len(query_result) == 0:
        print("   ✅ 삭제된 데이터는 조회되지 않음! (논리적 삭제 성공)")
    else:
        print(f"   ❌ 아직 {len(query_result)}개가 조회됨")
except Exception as e:
    print(f"   조회 오류: {e}")

print("\n2️⃣ search()로 삭제된 데이터 검색 시도:")
# 삭제된 데이터 중 하나의 벡터로 검색
search_vector = [embeddings[0]]  # 첫 번째 벡터 (삭제됨)
search_results = collection.search(
    data=search_vector,
    anns_field="embedding",
    param={"metric_type": "L2", "params": {"nprobe": 4}},
    limit=10,
    output_fields=["id", "text"]
)

found_deleted_ids = [hit.id for hit in search_results[0] if hit.id in delete_ids]
print(f"   검색 결과에서 삭제된 ID 발견: {found_deleted_ids}")
if len(found_deleted_ids) == 0:
    print("   ✅ 삭제된 데이터는 검색되지 않음! (논리적 삭제 성공)")
else:
    print(f"   ❌ 아직 {len(found_deleted_ids)}개가 검색됨")


1️⃣ query()로 삭제된 데이터 조회 시도:
   조회된 데이터: 0개
   ✅ 삭제된 데이터는 조회되지 않음! (논리적 삭제 성공)

2️⃣ search()로 삭제된 데이터 검색 시도:
   검색 결과에서 삭제된 ID 발견: []
   ✅ 삭제된 데이터는 검색되지 않음! (논리적 삭제 성공)


In [20]:
print("3️⃣ Manual Compaction 실행하기:")
print(f"   Compaction 전 entity 수: {collection.num_entities}")

# 컬렉션 언로드 (compaction을 위해 필요)
collection.release()
print("   컬렉션 언로드 완료")

# Compaction 실행
try:
    compaction_id = utility.compact(collection_name)
    print(f"   ✅ Compaction 시작됨 (ID: {compaction_id})")
    
    # Compaction 상태 확인
    import time
    for i in range(5):
        state = utility.get_compaction_state(compaction_id)
        print(f"   Compaction 상태 ({i+1}/5): {state}")
        if state.state == 2:  # Completed
            break
        time.sleep(1)
        
except Exception as e:
    print(f"   ⚠️ Compaction 오류: {e}")
    print("   (로컬 환경에서는 정상적인 현상일 수 있습니다)")

# 컬렉션 재로드
collection.load()
print("   컬렉션 재로드 완료")

print(f"   Compaction 후 entity 수: {collection.num_entities}")
if collection.num_entities < 10:
    print("   🎉 물리적 삭제 완료! entity 수가 감소했습니다!")
else:
    print("   ℹ️ 실제 운영환경에서는 시간이 더 걸릴 수 있습니다.")


3️⃣ Manual Compaction 실행하기:
   Compaction 전 entity 수: 10
   컬렉션 언로드 완료
   ⚠️ Compaction 오류: module 'pymilvus.orm.utility' has no attribute 'compact'
   (로컬 환경에서는 정상적인 현상일 수 있습니다)
   컬렉션 재로드 완료
   Compaction 후 entity 수: 10
   ℹ️ 실제 운영환경에서는 시간이 더 걸릴 수 있습니다.


In [21]:
# 정리
collection.release()
connections.disconnect("default")

print("🎉 Milvus 삭제 메커니즘 학습 완료!")
print("\n💡 배운 내용:")
print("- delete() 후 즉시 num_entities가 변하지 않는 것은 정상")
print("- query()와 search()로 실제 삭제 여부 확인 가능")
print("- 물리적 삭제는 compaction을 통해 백그라운드에서 처리")
print("- 실무에서는 논리적 삭제만으로도 충분한 경우가 많음")


🎉 Milvus 삭제 메커니즘 학습 완료!

💡 배운 내용:
- delete() 후 즉시 num_entities가 변하지 않는 것은 정상
- query()와 search()로 실제 삭제 여부 확인 가능
- 물리적 삭제는 compaction을 통해 백그라운드에서 처리
- 실무에서는 논리적 삭제만으로도 충분한 경우가 많음


In [22]:
from pymilvus import MilvusClient, connections
import numpy as np

# MilvusClient를 사용한 연결 (간단한 방법)
client = MilvusClient(uri="http://localhost:19530")

# connections를 사용한 연결 (고급 기능 사용 시)
connections.connect(host="localhost", port="19530")

print("Milvus 연결 성공!")
print(f"현재 컬렉션 목록: {client.list_collections()}")


Milvus 연결 성공!
현재 컬렉션 목록: ['test_collection', 'demo_collection', 'delete_test_collection']


In [3]:
from pymilvus import FieldSchema, CollectionSchema, DataType

# 필드 정의
fields = [
    # id 필드: 기본키, 자동생성
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    
    # embedding 필드: 128차원 벡터
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=128),
    
    # 추가 메타데이터 필드 (선택사항)
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=500),
    FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=100)
]

# 스키마 생성
schema = CollectionSchema(fields, description="벡터 검색을 위한 테스트 컬렉션")

print("스키마 정의 완료!")
for field in fields:
    print(f"- {field.name}: {field.dtype} (dim: {getattr(field, 'dim', 'N/A')})")


스키마 정의 완료!
- id: 5 (dim: None)
- embedding: 101 (dim: 128)
- text: 21 (dim: None)
- category: 21 (dim: None)


In [4]:
from pymilvus import Collection

# 컬렉션 이름
collection_name = "demo_collection"

# 기존 컬렉션이 있다면 삭제 (테스트용)
if collection_name in client.list_collections():
    client.drop_collection(collection_name)
    print(f"기존 컬렉션 '{collection_name}' 삭제됨")

# 새 컬렉션 생성
collection = Collection(name=collection_name, schema=schema)

print(f"컬렉션 '{collection_name}' 생성 완료!")
print(f"현재 컬렉션 목록: {client.list_collections()}")


컬렉션 'demo_collection' 생성 완료!
현재 컬렉션 목록: ['test_collection', 'demo_collection']


In [5]:
# 샘플 데이터 생성
num_entities = 100

# 임의의 128차원 벡터 생성
embeddings = np.random.random((num_entities, 128)).tolist()

# 텍스트 메타데이터 생성
texts = [f"샘플 텍스트 {i}" for i in range(num_entities)]

# 카테고리 메타데이터 생성
categories = [f"category_{i % 5}" for i in range(num_entities)]

# 데이터 삽입 (id 필드는 auto_id=True이므로 제외)
data = [
    embeddings,  # embedding 필드
    texts,       # text 필드
    categories   # category 필드
]

# 데이터 삽입 실행
insert_result = collection.insert(data)
print(f"데이터 삽입 완료! 삽입된 entity 수: {len(insert_result.primary_keys)}")

# flush를 통해 데이터를 디스크에 저장
collection.flush()
print(f"컬렉션의 총 entity 수: {collection.num_entities}")


데이터 삽입 완료! 삽입된 entity 수: 100
컬렉션의 총 entity 수: 100


In [6]:
# 인덱스 파라미터 설정
index_params = {
    "metric_type": "L2",     # 거리 측정 방식 (L2, IP, COSINE 등)
    "index_type": "IVF_FLAT", # 인덱스 타입
    "params": {"nlist": 128}   # 클러스터 수
}

# embedding 필드에 인덱스 생성
collection.create_index("embedding", index_params)
print("인덱스 생성 완료!")

# 컬렉션을 메모리에 로드 (검색을 위해 필요)
collection.load()
print("컬렉션 로드 완료! 이제 검색 가능합니다.")


인덱스 생성 완료!
컬렉션 로드 완료! 이제 검색 가능합니다.


In [7]:
# 검색할 쿼리 벡터 생성
search_vectors = np.random.random((2, 128)).tolist()  # 2개의 쿼리 벡터

# 검색 파라미터 설정
search_params = {
    "metric_type": "L2",
    "params": {"nprobe": 10}  # 검색할 클러스터 수
}

# 벡터 검색 실행
results = collection.search(
    data=search_vectors,           # 쿼리 벡터들
    anns_field="embedding",        # 검색할 벡터 필드
    param=search_params,           # 검색 파라미터
    limit=5,                       # 반환할 결과 수
    output_fields=["text", "category"]  # 함께 반환할 필드들
)

# 결과 출력
for i, result in enumerate(results):
    print(f"\n쿼리 {i+1}의 검색 결과:")
    for j, hit in enumerate(result):
        print(f"  순위 {j+1}: ID={hit.id}, 거리={hit.distance:.4f}")
        print(f"    텍스트: {hit.entity.get('text')}")
        print(f"    카테고리: {hit.entity.get('category')}")



쿼리 1의 검색 결과:
  순위 1: ID=458834835355992231, 거리=15.5446
    텍스트: 샘플 텍스트 1
    카테고리: category_1
  순위 2: ID=458834835355992252, 거리=17.6478
    텍스트: 샘플 텍스트 22
    카테고리: category_2
  순위 3: ID=458834835355992264, 거리=17.7503
    텍스트: 샘플 텍스트 34
    카테고리: category_4
  순위 4: ID=458834835355992327, 거리=18.0765
    텍스트: 샘플 텍스트 97
    카테고리: category_2
  순위 5: ID=458834835355992326, 거리=18.2815
    텍스트: 샘플 텍스트 96
    카테고리: category_1

쿼리 2의 검색 결과:
  순위 1: ID=458834835355992231, 거리=15.9250
    텍스트: 샘플 텍스트 1
    카테고리: category_1
  순위 2: ID=458834835355992251, 거리=16.0421
    텍스트: 샘플 텍스트 21
    카테고리: category_1
  순위 3: ID=458834835355992314, 거리=16.1887
    텍스트: 샘플 텍스트 84
    카테고리: category_4
  순위 4: ID=458834835355992320, 거리=16.4747
    텍스트: 샘플 텍스트 90
    카테고리: category_0
  순위 5: ID=458834835355992258, 거리=17.3435
    텍스트: 샘플 텍스트 28
    카테고리: category_3


In [8]:
# 특정 카테고리만 검색
filtered_results = collection.search(
    data=search_vectors[:1],       # 첫 번째 쿼리 벡터만 사용
    anns_field="embedding",
    param=search_params,
    limit=3,
    expr='category == "category_0"',  # 필터 조건
    output_fields=["text", "category"]
)

print("필터링된 검색 결과 (category_0만):")
for hit in filtered_results[0]:
    print(f"  ID={hit.id}, 거리={hit.distance:.4f}")
    print(f"    텍스트: {hit.entity.get('text')}")
    print(f"    카테고리: {hit.entity.get('category')}")


필터링된 검색 결과 (category_0만):
  ID=458834835355992300, 거리=19.0270
    텍스트: 샘플 텍스트 70
    카테고리: category_0
  ID=458834835355992240, 거리=19.0566
    텍스트: 샘플 텍스트 10
    카테고리: category_0
  ID=458834835355992230, 거리=19.1706
    텍스트: 샘플 텍스트 0
    카테고리: category_0


In [9]:
# 처음 5개 entity의 ID 조회
first_ids = insert_result.primary_keys[:5]
print(f"조회할 ID들: {first_ids}")

# ID로 데이터 조회
query_results = collection.query(
    expr=f"id in {first_ids}",
    output_fields=["id", "text", "category"]
)

print(f"\n조회된 데이터 {len(query_results)}개:")
for entity in query_results:
    print(f"  ID: {entity['id']}")
    print(f"  텍스트: {entity['text']}")
    print(f"  카테고리: {entity['category']}")
    print()


조회할 ID들: [458834835355992230, 458834835355992231, 458834835355992232, 458834835355992233, 458834835355992234]

조회된 데이터 5개:
  ID: 458834835355992230
  텍스트: 샘플 텍스트 0
  카테고리: category_0

  ID: 458834835355992231
  텍스트: 샘플 텍스트 1
  카테고리: category_1

  ID: 458834835355992232
  텍스트: 샘플 텍스트 2
  카테고리: category_2

  ID: 458834835355992233
  텍스트: 샘플 텍스트 3
  카테고리: category_3

  ID: 458834835355992234
  텍스트: 샘플 텍스트 4
  카테고리: category_4



In [11]:
# 삭제 전 entity 수 확인
print(f"삭제 전 entity 수: {collection.num_entities}")

# 특정 ID들 삭제
delete_ids = first_ids[:2]  # 처음 2개만 삭제
print(f"삭제할 ID들: {delete_ids}")

# 데이터 삭제 실행
collection.delete(expr=f"id in {delete_ids}")
collection.flush()

# ⚠️ 중요: num_entities는 즉시 변경되지 않습니다!
print(f"삭제 후 num_entities: {collection.num_entities}")
print("※ num_entities는 물리적 삭제 완료 후 변경됩니다.")

# 삭제 확인: 삭제된 데이터 조회 시도
try:
    deleted_check = collection.query(
        expr=f"id in {delete_ids}",
        output_fields=["id", "text"]
    )
    print(f"삭제된 데이터 조회 결과: {len(deleted_check)}개 (논리적으로 삭제됨)")
except Exception as e:
    print(f"삭제된 데이터 조회 실패: {e}")

# 실제 삭제 확인: 검색으로 확인
search_result = collection.search(
    data=[embeddings[0]],  # 첫 번째 벡터로 검색
    anns_field="embedding",
    param={"metric_type": "L2", "params": {"nprobe": 10}},
    limit=10,
    output_fields=["id", "text"]
)

# 삭제된 ID가 검색 결과에 없는지 확인
found_deleted_ids = [hit.id for hit in search_result[0] if hit.id in delete_ids]
print(f"검색 결과에서 삭제된 ID 발견: {found_deleted_ids}")
print("→ 빈 리스트면 삭제가 정상적으로 동작한 것입니다.")


삭제 전 entity 수: 100
삭제 후 entity 수: 100
삭제된 ID들: [458834835355992230, 458834835355992231]


In [None]:
# Compaction을 수동으로 실행하여 물리적 삭제 강제 실행
print("=== Manual Compaction 실행 ===")

# 컬렉션 언로드 (compaction을 위해 필요)
collection.release()
print("컬렉션 언로드 완료")

# Compaction 실행 (물리적 삭제 수행)
try:
    # utility.compact 사용
    from pymilvus import utility
    compaction_id = utility.compact(collection_name)
    print(f"Compaction 시작: ID={compaction_id}")
    
    # Compaction 상태 확인
    state = utility.get_compaction_state(compaction_id)
    print(f"Compaction 상태: {state}")
    
    # Compaction 완료 대기 (실제 운영에서는 더 긴 시간 필요)
    import time
    time.sleep(2)
    
except Exception as e:
    print(f"Compaction 실행 중 오류: {e}")

# 컬렉션 다시 로드
collection.load()
print("컬렉션 재로드 완료")

# Compaction 후 entity 수 확인
print(f"Compaction 후 entity 수: {collection.num_entities}")
print("※ 실제 환경에서는 compaction이 완료되기까지 시간이 걸릴 수 있습니다.")


In [12]:
# 컬렉션 언로드 (메모리에서 제거)
collection.release()
print("컬렉션 언로드 완료!")

# 연결 해제
connections.disconnect("default")
print("연결 해제 완료!")

print("\n=== Milvus 사용법 튜토리얼 완료 ===")
print("주요 배운 내용:")
print("1. Milvus 클라이언트 연결")
print("2. 컬렉션 스키마 정의 및 생성")
print("3. 벡터 데이터 삽입")
print("4. 인덱스 생성 및 컬렉션 로드")
print("5. 벡터 유사도 검색")
print("6. 조건부 필터링 검색")
print("7. 데이터 조회 및 삭제")
print("8. 리소스 정리")


컬렉션 언로드 완료!
연결 해제 완료!

=== Milvus 사용법 튜토리얼 완료 ===
주요 배운 내용:
1. Milvus 클라이언트 연결
2. 컬렉션 스키마 정의 및 생성
3. 벡터 데이터 삽입
4. 인덱스 생성 및 컬렉션 로드
5. 벡터 유사도 검색
6. 조건부 필터링 검색
7. 데이터 조회 및 삭제
8. 리소스 정리
