In [None]:
# !pip install FlagEmbedding pymilvus "pymilvus[model]"

# V1 - 사전 임베딩(한글)

In [10]:
from FlagEmbedding import BGEM3FlagModel
import numpy as np
import json, pickle

# 모델 초기화 (CPU/GPU 설정)
model = BGEM3FlagModel('BAAI/bge-m3', use_fp16=True)

# with open("../ServiceExtraction/integration/0.1.5_embedding_all.json", "r") as f:
#     services = json.load(f)

with open("../ServiceExtraction/integration/service_list_ver1.1.6.json", "r") as f:
    services = {}
    data = json.load(f)
    for device_id, device_info in data.items():
        services[device_id] = f"{device_info["info"]}; {";".join(device_info["examples"])}"
    # print(services)

keys = list(services.keys())
texts = list(services.values())

# 임베딩 생성 (배치 처리)
batch_size = 32
dense_embeddings = []
sparse_embeddings = []
colbert_embeddings = []

for i in range(0, len(texts), batch_size):
    batch = texts[i:i+batch_size]
    outputs = model.encode(
        batch, 
        return_dense=True,
        return_sparse=True,
        return_colbert_vecs=True  # ColBERT 활성화
    )
    dense_embeddings.extend(outputs['dense_vecs'])
    sparse_embeddings.extend(outputs['lexical_weights'])
    colbert_embeddings.extend(outputs['colbert_vecs'])

# ColBERT 벡터 저장 전처리
def process_colbert(embeddings):
    """3D 배열을 저장 가능한 형태로 변환"""
    return [emb.astype(np.float16) for emb in embeddings]  # 절반의 저장 공간 절약

processed_colbert = process_colbert(colbert_embeddings)

# 변환 함수 확장
def convert_to_serializable(obj):
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    elif isinstance(obj, np.float16):
        return float(obj)
    elif isinstance(obj, dict):
        return {k: convert_to_serializable(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [convert_to_serializable(i) for i in obj]
    else:
        return obj

# 저장
np.save('./embedding_result_v1/dense_embeddings.npy', np.array(dense_embeddings))
# ColBERT 벡터 압축 저장
with open('./embedding_result_v1/colbert_embeddings.pkl', 'wb') as f:
    pickle.dump(processed_colbert, f)

# float32 → float 로 강제 변환
def convert_to_serializable(obj):
    if isinstance(obj, dict):
        return {k: convert_to_serializable(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [convert_to_serializable(i) for i in obj]
    elif isinstance(obj, np.float32):
        return float(obj)
    elif isinstance(obj, np.ndarray):
        return obj.tolist()
    else:
        return obj

serializable_sparse = convert_to_serializable(sparse_embeddings)

with open('./embedding_result_v1/sparse_embeddings.json', 'w') as f:
    json.dump(serializable_sparse, f, indent=2)
    
# 메타데이터 저장 (ColBERT 정보 추가)
metadata = {
    'keys': keys,
    'texts': texts,
    'colbert_shapes': [emb.shape for emb in processed_colbert]  # 원본 형태 정보
}
with open('./embedding_result_v1/metadata.json', 'w') as f:
    json.dump(metadata, f, indent=2)


Fetching 30 files: 100%|██████████| 30/30 [00:00<00:00, 144134.16it/s]
You're using a XLMRobertaTokenizerFast tokenizer. Please note that with a fast tokenizer, using the `__call__` method is faster than using a method to encode the text followed by a call to the `pad` method to get a padded encoding.


# V4 - 사전 임베딩(영어)

In [3]:
from FlagEmbedding import BGEM3FlagModel
import numpy as np
import json, pickle

# 모델 초기화 (CPU/GPU 설정)
model = BGEM3FlagModel('BAAI/bge-m3', use_fp16=True)

# with open("../ServiceExtraction/integration/0.1.5_embedding_all.json", "r") as f:
#     services = json.load(f)

def optimized_weighted_string(device_info, name_weight=3):
    # 시작: names 강화
    start_names = [name for name in device_info["names"] for _ in range(name_weight)]
    
    # 중간: expressions
    middle_exprs = list(set(device_info["expressions"]))
    
    # 끝: names 재강화  
    end_names = [name for name in device_info["names"] for _ in range(2)]
    
    return ". ".join(start_names + middle_exprs + end_names)

# def optimized_weighted_string(device_info, name_weight=3, explanation_weight=2):
#     # 1. 이름 강조 (앞, 중간, 끝)
#     names = list(set(device_info.get("names", [])))
#     name_tokens = [name for name in names for _ in range(name_weight)]

#     # 2. 설명 추가 (weight 적용)
#     explanation = device_info.get("explanation", "")
#     explanation_tokens = [explanation] * explanation_weight if explanation else []

#     # 3. 표현 다양화
#     exprs = list(set(device_info.get("expressions", [])))

#     # 4. 혼합
#     parts = name_tokens + explanation_tokens + exprs + names
#     return ". ".join(parts)


with open("../ServiceExtraction/integration/service_list_ver1.1.6.5.json", "r") as f:
    services = {}
    data = json.load(f)
    for device_id, device_info in data.items():
        services[device_id] = optimized_weighted_string(device_info)
    # print(services)

keys = list(services.keys())
texts = list(services.values())

# 임베딩 생성 (배치 처리)
batch_size = 32
dense_embeddings = []
sparse_embeddings = []
colbert_embeddings = []

for i in range(0, len(texts), batch_size):
    batch = texts[i:i+batch_size]
    outputs = model.encode(
        batch, 
        return_dense=True,
        return_sparse=True,
        return_colbert_vecs=True  # ColBERT 활성화
    )
    dense_embeddings.extend(outputs['dense_vecs'])
    sparse_embeddings.extend(outputs['lexical_weights'])
    colbert_embeddings.extend(outputs['colbert_vecs'])

# ColBERT 벡터 저장 전처리
def process_colbert(embeddings):
    """3D 배열을 저장 가능한 형태로 변환"""
    return [emb.astype(np.float16) for emb in embeddings]  # 절반의 저장 공간 절약

processed_colbert = process_colbert(colbert_embeddings)

# 변환 함수 확장
def convert_to_serializable(obj):
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    elif isinstance(obj, np.float16):
        return float(obj)
    elif isinstance(obj, dict):
        return {k: convert_to_serializable(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [convert_to_serializable(i) for i in obj]
    else:
        return obj

# 저장
np.save('./embedding_result_v4/dense_embeddings.npy', np.array(dense_embeddings))
# ColBERT 벡터 압축 저장
with open('./embedding_result_v4/colbert_embeddings.pkl', 'wb') as f:
    pickle.dump(processed_colbert, f)

# float32 → float 로 강제 변환
def convert_to_serializable(obj):
    if isinstance(obj, dict):
        return {k: convert_to_serializable(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [convert_to_serializable(i) for i in obj]
    elif isinstance(obj, np.float32):
        return float(obj)
    elif isinstance(obj, np.ndarray):
        return obj.tolist()
    else:
        return obj

serializable_sparse = convert_to_serializable(sparse_embeddings)

with open('./embedding_result_v4/sparse_embeddings.json', 'w') as f:
    json.dump(serializable_sparse, f, indent=2)
    
# 메타데이터 저장 (ColBERT 정보 추가)
metadata = {
    'keys': keys,
    'texts': texts,
    'colbert_shapes': [emb.shape for emb in processed_colbert]  # 원본 형태 정보
}
with open('./embedding_result_v4/metadata.json', 'w') as f:
    json.dump(metadata, f, indent=2)


Fetching 30 files: 100%|██████████| 30/30 [00:00<00:00, 144465.12it/s]
You're using a XLMRobertaTokenizerFast tokenizer. Please note that with a fast tokenizer, using the `__call__` method is faster than using a method to encode the text followed by a call to the `pad` method to get a padded encoding.


# V2 - 사전 임베딩(영어) & 각 표현별 벡터 생성

In [6]:
from FlagEmbedding import BGEM3FlagModel
import numpy as np
import json, pickle
from tqdm import tqdm
import math

# 모델 초기화 (CPU/GPU 설정)
model = BGEM3FlagModel('BAAI/bge-m3', use_fp16=True)

# 데이터 로드
with open("../ServiceExtraction/integration/service_list_ver1.1.6.2.json", "r") as f:
    services = {}
    data = json.load(f)
    for device_id, device_info in data.items():
        services[device_id] = ";".join([expr for expr in device_info])

keys = list(services.keys())
texts = list(services.values())

print(f"총 {len(texts)}개의 텍스트를 임베딩합니다.")

# 임베딩 생성 (배치 처리 + tqdm)
batch_size = 32
total_batches = math.ceil(len(texts) / batch_size)

dense_embeddings = []
sparse_embeddings = []
colbert_embeddings = []

# tqdm으로 진행률 표시
with tqdm(total=total_batches, desc="임베딩 생성", unit="batch") as pbar:
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i+batch_size]
        
        # 현재 배치 정보 업데이트
        current_batch = i // batch_size + 1
        pbar.set_postfix({
            'batch': f"{current_batch}/{total_batches}",
            'items': f"{len(batch)}"
        })
        
        try:
            outputs = model.encode(
                batch, 
                return_dense=True,
                return_sparse=True,
                return_colbert_vecs=True
            )
            
            dense_embeddings.extend(outputs['dense_vecs'])
            sparse_embeddings.extend(outputs['lexical_weights'])
            colbert_embeddings.extend(outputs['colbert_vecs'])
            
        except Exception as e:
            print(f"\n배치 {current_batch} 처리 중 오류 발생: {e}")
            continue
        
        pbar.update(1)

print(f"\n임베딩 완료: {len(dense_embeddings)}개 벡터 생성")

# ColBERT 벡터 저장 전처리
def process_colbert(embeddings):
    """3D 배열을 저장 가능한 형태로 변환"""
    print("ColBERT 벡터 처리 중...")
    processed = []
    for emb in tqdm(embeddings, desc="ColBERT 처리", unit="vector"):
        processed.append(emb.astype(np.float16))
    return processed

processed_colbert = process_colbert(colbert_embeddings)

# 변환 함수
def convert_to_serializable(obj):
    if isinstance(obj, dict):
        return {k: convert_to_serializable(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [convert_to_serializable(i) for i in obj]
    elif isinstance(obj, np.float32):
        return float(obj)
    elif isinstance(obj, np.float16):
        return float(obj)
    elif isinstance(obj, np.ndarray):
        return obj.tolist()
    else:
        return obj

# 저장 (진행률 표시 포함)
print("\n임베딩 결과 저장 중...")

# Dense 임베딩 저장
print("Dense 임베딩 저장...")
np.save('./embedding_result_v4/dense_embeddings.npy', np.array(dense_embeddings))

# ColBERT 벡터 압축 저장
print("ColBERT 임베딩 저장...")
with open('./embedding_result_v4/colbert_embeddings.pkl', 'wb') as f:
    pickle.dump(processed_colbert, f)

# Sparse 임베딩 저장
print("Sparse 임베딩 변환 및 저장...")
serializable_sparse = convert_to_serializable(sparse_embeddings)

with open('./embedding_result_v4/sparse_embeddings.json', 'w') as f:
    json.dump(serializable_sparse, f, indent=2)

# 메타데이터 저장
print("메타데이터 저장...")
metadata = {
    'keys': keys,
    'texts': texts,
    'total_count': len(texts),
    'dense_shape': np.array(dense_embeddings).shape,
    'colbert_shapes': [emb.shape for emb in processed_colbert],
    'batch_size': batch_size
}

with open('./embedding_result_v4/metadata.json', 'w') as f:
    json.dump(metadata, f, indent=2)

print("\n=== 임베딩 완료 ===")
print(f"- 총 텍스트 수: {len(texts)}")
print(f"- Dense 임베딩 크기: {np.array(dense_embeddings).shape}")
print(f"- Sparse 임베딩 수: {len(sparse_embeddings)}")
print(f"- ColBERT 임베딩 수: {len(processed_colbert)}")
print(f"- 배치 크기: {batch_size}")
print(f"- 저장 경로: ./embedding_result_v4/")


Fetching 30 files: 100%|██████████| 30/30 [00:00<00:00, 227951.30it/s]


총 51개의 텍스트를 임베딩합니다.


임베딩 생성:   0%|          | 0/2 [00:00<?, ?batch/s, batch=1/2, items=32]You're using a XLMRobertaTokenizerFast tokenizer. Please note that with a fast tokenizer, using the `__call__` method is faster than using a method to encode the text followed by a call to the `pad` method to get a padded encoding.
임베딩 생성: 100%|██████████| 2/2 [03:25<00:00, 102.56s/batch, batch=2/2, items=19]



임베딩 완료: 51개 벡터 생성
ColBERT 벡터 처리 중...


ColBERT 처리: 100%|██████████| 51/51 [00:00<00:00, 527.41vector/s]


임베딩 결과 저장 중...
Dense 임베딩 저장...
ColBERT 임베딩 저장...
Sparse 임베딩 변환 및 저장...
메타데이터 저장...

=== 임베딩 완료 ===
- 총 텍스트 수: 51
- Dense 임베딩 크기: (51, 1024)
- Sparse 임베딩 수: 51
- ColBERT 임베딩 수: 51
- 배치 크기: 32
- 저장 경로: ./embedding_result_v4/





# V3 - 사전 임베딩(영어) & 디바이스 이름 가중치

In [8]:
from FlagEmbedding import BGEM3FlagModel
import numpy as np
import json, pickle

# 모델 초기화 (CPU/GPU 설정)
model = BGEM3FlagModel('BAAI/bge-m3', use_fp16=True)

# 새로운 구조의 디바이스 정의 파일 로드
with open("../ServiceExtraction/integration/service_list_ver1.1.6.3.json", "r", encoding="utf-8") as f:
    device_definitions = json.load(f)

# 가중치 설정
def create_weighted_expressions(device_data, name_weight=5, expr_weight=1):
    """names는 높은 가중치, expressions는 기본 가중치"""
    weighted_list = []
    
    # names: 높은 가중치 (중복 삽입)
    if "names" in device_data:
        for name in device_data["names"]:
            weighted_list.extend([name] * name_weight)
    
    # expressions: 기본 가중치
    if "expressions" in device_data:
        for expr in device_data["expressions"]:
            weighted_list.extend([expr] * expr_weight)
    
    return weighted_list

# 가중치가 적용된 표현 리스트 생성
device_expressions_weighted = {}
print("가중치 적용된 표현 생성 중...")

for device_type, device_data in device_definitions.items():
    weighted_expressions = create_weighted_expressions(device_data)
    device_expressions_weighted[device_type] = weighted_expressions
    print(f"{device_type}: {len(weighted_expressions)}개 표현 (가중치 적용)")

# 각 표현별로 메타데이터 생성 (수정된 부분)
all_texts = []
all_metadata = []

for device_type, expressions in device_expressions_weighted.items():  # 수정: device_expressions → device_expressions_weighted
    for expression in expressions:
        all_texts.append(expression)
        all_metadata.append({
            "device_type": device_type,
            "expression": expression,
            "device_id": f"{device_type}_{len(all_metadata)}",
            "is_weighted": True
        })

print(f"총 {len(all_texts)}개의 표현을 임베딩합니다. (가중치 적용 후)")

# 임베딩 생성 (배치 처리)
batch_size = 32
dense_embeddings = []
sparse_embeddings = []
colbert_embeddings = []

for i in range(0, len(all_texts), batch_size):
    batch = all_texts[i:i+batch_size]
    print(f"배치 {i//batch_size + 1}/{(len(all_texts)-1)//batch_size + 1} 처리 중...")
    
    outputs = model.encode(
        batch, 
        return_dense=True,
        return_sparse=True,
        return_colbert_vecs=True
    )
    
    dense_embeddings.extend(outputs['dense_vecs'])
    sparse_embeddings.extend(outputs['lexical_weights'])
    colbert_embeddings.extend(outputs['colbert_vecs'])

# ColBERT 벡터 저장 전처리
def process_colbert(embeddings):
    """3D 배열을 저장 가능한 형태로 변환"""
    return [emb.astype(np.float16) for emb in embeddings]

processed_colbert = process_colbert(colbert_embeddings)

# 변환 함수
def convert_to_serializable(obj):
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    elif isinstance(obj, np.float16):
        return float(obj)
    elif isinstance(obj, np.float32):
        return float(obj)
    elif isinstance(obj, dict):
        return {k: convert_to_serializable(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [convert_to_serializable(i) for i in obj]
    else:
        return obj

# 저장
print("임베딩 결과 저장 중...")

# Dense 임베딩 저장
np.save('./embedding_result_v3/dense_embeddings.npy', np.array(dense_embeddings))

# ColBERT 벡터 압축 저장
with open('./embedding_result_v3/colbert_embeddings.pkl', 'wb') as f:
    pickle.dump(processed_colbert, f)

# Sparse 임베딩 저장 (경로 수정)
serializable_sparse = convert_to_serializable(sparse_embeddings)
with open('./embedding_result_v3/sparse_embeddings.json', 'w', encoding='utf-8') as f:  # 수정: 경로 통일
    json.dump(serializable_sparse, f, indent=2, ensure_ascii=False)

# 메타데이터 저장 (경로 수정)
metadata = {
    'total_expressions': len(all_texts),
    'expressions': all_texts,
    'metadata': all_metadata,
    'device_types': list(device_expressions_weighted.keys()),
    'colbert_shapes': [emb.shape for emb in processed_colbert],
    'weighted_counts': {device: len(exprs) for device, exprs in device_expressions_weighted.items()}
}

with open('./embedding_result_v3/metadata.json', 'w', encoding='utf-8') as f:  # 수정: 경로 통일
    json.dump(metadata, f, indent=2, ensure_ascii=False)

# 기기별 인덱스 매핑 저장 (경로 수정)
device_index_mapping = {}
for idx, meta in enumerate(all_metadata):
    device_type = meta['device_type']
    if device_type not in device_index_mapping:
        device_index_mapping[device_type] = []
    device_index_mapping[device_type].append(idx)

with open('./embedding_result_v3/device_index_mapping.json', 'w', encoding='utf-8') as f:  # 수정: 경로 통일
    json.dump(device_index_mapping, f, indent=2, ensure_ascii=False)

# 가중치 적용 통계 출력
print("\n=== 임베딩 완료 ===")
print(f"- 총 표현 수: {len(all_texts)} (가중치 적용 후)")
print(f"- 기기 종류 수: {len(device_expressions_weighted)}")
print(f"- Dense 임베딩 크기: {np.array(dense_embeddings).shape}")

print("\n=== 기기별 가중치 적용 결과 ===")
for device_type, count in metadata['weighted_counts'].items():
    original_count = len(device_definitions.get(device_type, {}).get('names', [])) + len(device_definitions.get(device_type, {}).get('expressions', []))
    weight_ratio = count / original_count if original_count > 0 else 0
    print(f"{device_type}: {original_count} → {count} (x{weight_ratio:.1f})")


Fetching 30 files: 100%|██████████| 30/30 [00:00<00:00, 181833.99it/s]
You're using a XLMRobertaTokenizerFast tokenizer. Please note that with a fast tokenizer, using the `__call__` method is faster than using a method to encode the text followed by a call to the `pad` method to get a padded encoding.


가중치 적용된 표현 생성 중...
AirConditioner: 114개 표현 (가중치 적용)
AirPurifier: 72개 표현 (가중치 적용)
AirQualityDetector: 86개 표현 (가중치 적용)
Alarm: 88개 표현 (가중치 적용)
Blind: 90개 표현 (가중치 적용)
Button: 99개 표현 (가중치 적용)
Buttonx4: 131개 표현 (가중치 적용)
Calculator: 80개 표현 (가중치 적용)
Camera: 91개 표현 (가중치 적용)
Charger: 101개 표현 (가중치 적용)
Clock: 121개 표현 (가중치 적용)
ContactSensor: 81개 표현 (가중치 적용)
Curtain: 72개 표현 (가중치 적용)
Dehumidifier: 109개 표현 (가중치 적용)
Dishwasher: 101개 표현 (가중치 적용)
DoorLock: 88개 표현 (가중치 적용)
EmailProvider: 87개 표현 (가중치 적용)
Fan: 96개 표현 (가중치 적용)
Feeder: 116개 표현 (가중치 적용)
GasMeter: 74개 표현 (가중치 적용)
GasValve: 69개 표현 (가중치 적용)
Humidifier: 88개 표현 (가중치 적용)
HumiditySensor: 80개 표현 (가중치 적용)
Irrigator: 116개 표현 (가중치 적용)
LeakSensor: 76개 표현 (가중치 적용)
Light: 122개 표현 (가중치 적용)
LightSensor: 75개 표현 (가중치 적용)
MenuProvider: 76개 표현 (가중치 적용)
MotionSensor: 79개 표현 (가중치 적용)
PresenceSensor: 85개 표현 (가중치 적용)
Pump: 104개 표현 (가중치 적용)
Refrigerator: 101개 표현 (가중치 적용)
RobotCleaner: 102개 표현 (가중치 적용)
Shade: 102개 표현 (가중치 적용)
Siren: 97개 표현 (가중치 적용)
SmartPlug: 103개 표현 (

# 호출

##  dense_vecs (문장 전체 평균)만 활용

In [None]:
import numpy as np
import json, pickle
from sklearn.metrics.pairwise import cosine_similarity

# 임베딩 데이터 로드
dense_embeddings = np.load('./embedding_result_v1/dense_embeddings.npy')
with open('./embedding_result_v1/sparse_embeddings.json') as f:
    sparse_embeddings = json.load(f)
with open('./embedding_result_v1/metadata.json') as f:
    metadata = json.load(f)

# 모델 초기화 (CPU 전용)
model = BGEM3FlagModel('BAAI/bge-m3', use_fp16=False)


Fetching 30 files: 100%|██████████| 30/30 [00:00<00:00, 149796.57it/s]


In [10]:

def recommend_services(query, top_k=10):
    # 쿼리 임베딩 생성
    query_dense = model.encode([query], return_dense=True)['dense_vecs'][0]
    
    # 유사도 계산
    dense_scores = cosine_similarity([query_dense], dense_embeddings)[0]
    
    # 상위 K개 결과 추출
    top_indices = np.argsort(dense_scores)[-top_k:][::-1]
    
    return [
        {
            'key': metadata['keys'][i],
            'text': metadata['texts'][i],
            'score': float(dense_scores[i])
        }
        for i in top_indices
    ]

# 사용자 입력 처리
user_query = "기온이 30도 이상이면 커튼을 닫고 에어컨을 틀어줘"
results = recommend_services(user_query)

# 결과 출력
print(f"추천 서비스 (쿼리: '{user_query}'):")
for idx, result in enumerate(results, 1):
    print(f"{idx}. {result['key']} (유사도: {result['score']:.4f})")
    print(f"   내용: {result['text'][:50]}...")

추천 서비스 (쿼리: '기온이 30도 이상이면 커튼을 닫고 에어컨을 틀어줘'):
1. AirConditioner (유사도: 0.6530)
   내용: 에어컨은 냉방, 난방, 제습, 송풍 등 다양한 모드로 실내 온도와 습도를 조절하는 기기입니...
2. Curtain (유사도: 0.6228)
   내용: 커튼은 열고 닫고 멈추는 기능을 가진 장치로, 햇빛 조절이나 사생활 보호를 위해 사용됩니다...
3. Blind (유사도: 0.5736)
   내용: 블라인드(커튼)는 창문을 덮거나 열어 햇빛의 양을 조절하고 사생활을 보호하는 장치입니다. ...
4. TemperatureSensor (유사도: 0.5592)
   내용: 온도 센서는 현재 환경의 온도를 측정하여 수치로 제공합니다. 주로 특정 온도 조건에 따라 ...
5. Humidifier (유사도: 0.5570)
   내용: 가습기는 실내 습도를 조절해주는 장치로, 전원을 켜고 끌 수 있으며 자동, 약풍, 중간, ...
6. Fan (유사도: 0.5387)
   내용: 선풍기나 환풍기 등의 팬 장치는 전원을 켜고 끌 수 있으며, 풍속을 RPM이나 퍼센트로 조...
7. WeatherProvider (유사도: 0.5333)
   내용: 날씨 제공 장치는 현재 기온, 습도, 기압, 미세먼지 수치, 날씨 상태 등을 바탕으로 자동...
8. DoorLock (유사도: 0.5332)
   내용: 도어락은 문을 원격으로 열고 닫을 수 있는 기기로, 현재 문이 열려 있는지 닫혀 있는지도 ...
9. AirPurifier (유사도: 0.5262)
   내용: 공기청정기는 실내 공기 중의 먼지, 미세먼지, 냄새 등을 줄여 쾌적한 환경을 만들어 주는 ...
10. Window (유사도: 0.5209)
   내용: 창문은 열림, 닫힘, 또는 상태를 알 수 없는 상태로 존재하며, 환기나 보안, 환경 조건에...


## colbert_vecs를 활용한 dense token-level 다중 검색

In [None]:
import numpy as np
import json, pickle
from sklearn.metrics.pairwise import cosine_similarity
from FlagEmbedding import BGEM3FlagModel

# 모델 및 데이터 초기화
model = BGEM3FlagModel('BAAI/bge-m3', use_fp16=False)  # CPU 환경

# 임베딩 데이터 로드 (ColBERT 추가)
dense_embeddings = np.load('./embedding_result_v1/dense_embeddings.npy')
# colbert_data = np.load('./embedding_result_v1/colbert_embeddings.npz', allow_pickle=True)
# colbert_embeddings = [emb.astype(np.float32) for emb in colbert_data['colbert']]  # float32로 변환
with open('./embedding_result_v1/colbert_embeddings.pkl', 'rb') as f:
    colbert_embeddings = pickle.load(f)

with open('./embedding_result_v1/sparse_embeddings.json') as f:
    sparse_embeddings = json.load(f)
    
with open('./embedding_result_v1/metadata.json') as f:
    metadata = json.load(f)



Fetching 30 files: 100%|██████████| 30/30 [00:00<00:00, 181309.97it/s]


In [57]:
# ColBERT 유사도 계산 함수
# 1) 평균 맥스심 점수
def colbert_maxsim(query_vec, doc_vecs):
    """
    query_vec: [query_tokens, dim]
    doc_vecs: [doc_tokens, dim]
    """
    sim_matrix = cosine_similarity(query_vec, doc_vecs)
    return np.max(sim_matrix, axis=1).mean()  

# 2) Softmax MaxSim
def colbert_softmax_maxsim(query_vec, doc_vecs, temperature=0.05):
    sim_matrix = cosine_similarity(query_vec, doc_vecs)
    max_sim = np.max(sim_matrix, axis=1)
    weights = np.exp(max_sim / temperature)
    weights /= np.sum(weights)
    return np.sum(weights * max_sim)

# 하이브리드 추천 함수
def hybrid_recommend(query, top_k=10, max_k=15, weights=(0.6, 0.3, 0.1)):
    # 쿼리 임베딩 생성
    query_emb = model.encode(
        [query], 
        return_dense=True,
        return_sparse=True,
        return_colbert_vecs=True
    )
    
    # 각 유사도 계산
    dense_scores = cosine_similarity([query_emb['dense_vecs'][0]], dense_embeddings)[0]
    
    sparse_scores = []
    query_weights = query_emb['lexical_weights'][0]  # dict

    for doc_weights in sparse_embeddings:  # 문서별 sparse dict
        score = sum(query_weights.get(token, 0) * doc_weights.get(token, 0) for token in query_weights)
        sparse_scores.append(score)
    
    colbert_scores = [
        colbert_softmax_maxsim(query_emb['colbert_vecs'][0], doc_emb)
        for doc_emb in colbert_embeddings
    ]
    
    # 점수 정규화 및 결합
    max_score = max(dense_scores.max(), 1e-6)
    combined_scores = (
        weights[0] * dense_scores/max_score +
        weights[1] * np.array(sparse_scores) +
        weights[2] * np.array(colbert_scores)
    )
    
    # # 상위 K개 추출
    # top_indices = np.argsort(combined_scores)[-top_k:][::-1]

    # 유사 결과 많을 경우 확장
    sorted_indices = np.argsort(combined_scores)[::-1]

    gap_threshold = 0.01
    initial_top = 5
    top_indices = [sorted_indices[0]]

    for i in range(1, len(sorted_indices)):
        if len(top_indices) >= max_k:
            break
        prev = combined_scores[top_indices[-1]]
        curr = combined_scores[sorted_indices[i]]
        if curr >= 0.5 or abs(prev - curr) <= gap_threshold or len(top_indices) < initial_top:
            top_indices.append(sorted_indices[i])
        else:
            break

    return [{
        'key': metadata['keys'][i],
        'text': metadata['texts'][i],
        'dense_score': float(dense_scores[i]),
        'sparse_score': float(sparse_scores[i]),
        'colbert_score': float(colbert_scores[i]),
        'combined_score': float(combined_scores[i])
    } for i in top_indices]



In [58]:
# 추천 실행
results = hybrid_recommend(
    "날씨가 맑고, 기온이 30도 이상이면 커튼을 닫고 에어컨을 틀고, 미세먼지가 나쁘면 창문을 닫고 공기청정기를 켜줘.", 
    top_k=10)

# 결과 출력
print("추천 서비스:")
for idx, item in enumerate(results, 1):
    print(f"{idx}. {item['key']}")
    print(f"   종합 점수: {item['combined_score']:.4f}")
    print(f"   Dense: {item['dense_score']:.3f}, Sparse: {item['sparse_score']:.3f}, ColBERT: {item['colbert_score']:.3f}")


추천 서비스:
1. AirPurifier
   종합 점수: 0.7494
   Dense: 0.703, Sparse: 0.272, ColBERT: 0.793
2. AirConditioner
   종합 점수: 0.7357
   Dense: 0.717, Sparse: 0.185, ColBERT: 0.802
3. Curtain
   종합 점수: 0.7140
   Dense: 0.693, Sparse: 0.178, ColBERT: 0.809
4. WeatherProvider
   종합 점수: 0.6962
   Dense: 0.659, Sparse: 0.218, ColBERT: 0.791
5. AirQualityDetector
   종합 점수: 0.6761
   Dense: 0.629, Sparse: 0.231, ColBERT: 0.801
6. Blind
   종합 점수: 0.6617
   Dense: 0.628, Sparse: 0.186, ColBERT: 0.798
7. TemperatureSensor
   종합 점수: 0.6356
   Dense: 0.620, Sparse: 0.114, ColBERT: 0.824
8. Humidifier
   종합 점수: 0.6317
   Dense: 0.639, Sparse: 0.056, ColBERT: 0.804
9. Window
   종합 점수: 0.6238
   Dense: 0.619, Sparse: 0.082, ColBERT: 0.809
10. Dehumidifier
   종합 점수: 0.6005
   Dense: 0.589, Sparse: 0.093, ColBERT: 0.796
11. DoorLock
   종합 점수: 0.5989
   Dense: 0.563, Sparse: 0.154, ColBERT: 0.816
12. Pump
   종합 점수: 0.5944
   Dense: 0.575, Sparse: 0.116, ColBERT: 0.782
13. Fan
   종합 점수: 0.5797
   Dense: 0.574, Spar