# Cohere Embed v4 + OpenSearch Serverless 멀티모달 임베딩 솔루션

이 노트북은 Cohere Embed v4와 OpenSearch Serverless를 사용하여 이미지와 텍스트를 결합한 멀티모달 임베딩 솔루션을 구현합니다.

In [6]:
# 라이브러리 설치 및 임포트
!pip install boto3>=1.34.0 opensearch-py>=2.4.0 requests-aws4auth>=1.2.3 pandas>=2.0.0 Pillow>=10.0.0 python-dotenv>=1.0.0

import boto3
import json
import base64
import pandas as pd
from PIL import Image
import io
import os
from datetime import datetime
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
from dotenv import load_dotenv

# .env 파일 로드
load_dotenv()

# 설정
REGION = 'us-east-1'
INDEX_NAME = 'fashion-items-1'

# AWS 클라이언트 생성
bedrock_client = boto3.client('bedrock-runtime', region_name=REGION)
opensearch_client = boto3.client('opensearchserverless', region_name=REGION)
session = boto3.Session()
credentials = session.get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, REGION, 'aoss', session_token=credentials.token)

print("✅ 환경 설정 완료")

✅ 환경 설정 완료


In [7]:
# .env에서 엔드포인트 읽기
EXISTING_ENDPOINT = os.getenv('OPENSEARCH_ENDPOINT')

def get_or_create_collection():
    if EXISTING_ENDPOINT:
        print(f"✅ .env에서 엔드포인트 로드: {EXISTING_ENDPOINT[:30]}...")
        return EXISTING_ENDPOINT
    
    try:
        print("🔍 활성 OpenSearch 컬렉션 검색 중...")
        collections = opensearch_client.list_collections()
        for collection in collections["collectionSummaries"]:
            if collection["status"] == "ACTIVE":
                endpoint = f"https://{collection["id"]}.{REGION}.aoss.amazonaws.com"
                print(f"✅ 활성 컬렉션 발견: {collection["name"]} -> {endpoint[:30]}...")
                return endpoint
        
        print("❌ 활성 컬렉션을 찾을 수 없습니다.")
        print("💡 .env 파일에 OPENSEARCH_ENDPOINT를 설정하거나 OpenSearch 컬렉션을 생성해주세요.")
        return None
    except Exception as e:
        print(f"❌ 컬렉션 확인 실패: {e}")
        print("💡 .env 파일에 OPENSEARCH_ENDPOINT를 설정해주세요.")
        return None

ENDPOINT = get_or_create_collection()
if not ENDPOINT:
    raise Exception("OpenSearch 엔드포인트를 찾을 수 없습니다. .env 파일을 확인하거나 컬렉션을 생성해주세요.")

print(f"🎯 사용할 엔드포인트: {ENDPOINT[:30]}...")

🔍 활성 OpenSearch 컬렉션 검색 중...
✅ 활성 컬렉션 발견: bedrock-knowledge-base-9wf97u -> https://3ojsgyty8n9k800j7iu4.u...
🎯 사용할 엔드포인트: https://3ojsgyty8n9k800j7iu4.u...


In [8]:
# 데이터 액세스 권한 설정
def setup_data_access_policy():
    try:
        # 현재 사용자 ARN 가져오기
        sts = boto3.client('sts')
        identity = sts.get_caller_identity()
        user_arn = identity['Arn']
        
        # 데이터 액세스 정책
        policy = [
            {
                "Rules": [
                    {
                        "ResourceType": "index",
                        "Resource": ["index/*/*"],
                        "Permission": [
                            "aoss:CreateIndex", "aoss:DeleteIndex", "aoss:UpdateIndex",
                            "aoss:DescribeIndex", "aoss:ReadDocument", "aoss:WriteDocument"
                        ]
                    },
                    {
                        "ResourceType": "collection",
                        "Resource": ["collection/*"],
                        "Permission": ["aoss:CreateCollectionItems"]
                    }
                ],
                "Principal": [user_arn]
            }
        ]
        
        opensearch_client.create_access_policy(
            name='multimodal-data-access-policy',
            type='data',
            policy=json.dumps(policy)
        )
        print(f"✅ 데이터 액세스 정책 생성 완료: {user_arn}")
    except Exception as e:
        if 'ConflictException' in str(e):
            print("ℹ️ 데이터 액세스 정책이 이미 존재합니다")
        else:
            print(f"⚠️ 정책 설정 실패: {e}")

setup_data_access_policy()

ℹ️ 데이터 액세스 정책이 이미 존재합니다


In [9]:
# OpenSearch 클라이언트 및 인덱스 생성
client = OpenSearch(
    hosts=[{'host': ENDPOINT.replace('https://', ''), 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection
)

# 1536 차원 인덱스 매핑
mapping = {
    "settings": {"index": {"knn": True}},
    "mappings": {
        "properties": {
            "item_id": {"type": "keyword"},
            "title": {"type": "text"},
            "description": {"type": "text"},
            "category": {"type": "keyword"},
            "brand": {"type": "keyword"},
            "price": {"type": "float"},
            "color": {"type": "keyword"},
            "multimodal_embedding": {
                "type": "knn_vector",
                "dimension": 1536,
                "method": {"name": "hnsw", "space_type": "cosinesimil", "engine": "nmslib"}
            }
        }
    }
}

if client.indices.exists(index=INDEX_NAME):
    client.indices.delete(index=INDEX_NAME)
client.indices.create(index=INDEX_NAME, body=mapping)
print("1536 차원 인덱스 생성 완료")

1536 차원 인덱스 생성 완료


In [10]:
# 샘플 데이터 생성
os.makedirs('data/images', exist_ok=True)

sample_items = [
    {'item_id': 'item_001', 'title': '블루 데님 재킷', 'description': '캐주얼한 스타일의 클래식 블루 데님 재킷', 'category': '아우터', 'brand': '스타일코', 'price': 89.99, 'color': '블루'},
    {'item_id': 'item_002', 'title': '블랙 이브닝 드레스', 'description': '우아한 블랙 드레스로 특별한 행사에 적합', 'category': '드레스', 'brand': '엘레간스', 'price': 159.99, 'color': '블랙'},
    {'item_id': 'item_003', 'title': '화이트 스니커즈', 'description': '편안하고 스타일리시한 화이트 운동화', 'category': '신발', 'brand': '컴포트스텝', 'price': 79.99, 'color': '화이트'},
    {'item_id': 'item_004', 'title': '레드 울 스웨터', 'description': '따뜻한 겨울용 빨간 울 스웨터', 'category': '상의', 'brand': '웜웨어', 'price': 69.99, 'color': '레드'},
    {'item_id': 'item_005', 'title': '브라운 가죽 핸드백', 'description': '프리미엄 브라운 가죽 핸드백', 'category': '액세서리', 'brand': '레더크래프트', 'price': 129.99, 'color': '브라운'}
]

# 샘플 이미지 생성
colors = {'블루': (70, 130, 180), '블랙': (0, 0, 0), '화이트': (255, 255, 255), '레드': (220, 20, 60), '브라운': (139, 69, 19)}
for item in sample_items:
    color = colors[item['color']]
    img = Image.new('RGB', (400, 400), color)
    img.save(f"data/images/{item['item_id']}.jpg")

print(f"샘플 데이터 생성 완료: {len(sample_items)}개 아이템")

샘플 데이터 생성 완료: 5개 아이템


In [11]:
# 임베딩 생성 함수
def encode_image(image_path):
    with Image.open(image_path) as img:
        if img.mode != 'RGB':
            img = img.convert('RGB')
        buffer = io.BytesIO()
        img.save(buffer, format='JPEG')
        return base64.b64encode(buffer.getvalue()).decode('utf-8')

def generate_embedding(text, image_base64=None):
    if image_base64:
        payload = {
            "input_type": "search_document",
            "embedding_types": ["float"],
            "inputs": [{
                "content": [
                    {"type": "text", "text": text},
                    {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_base64}"}}
                ]
            }]
        }
    else:
        payload = {
            "input_type": "search_document",
            "embedding_types": ["float"],
            "texts": [text]
        }
    
    response = bedrock_client.invoke_model(
        modelId='cohere.embed-v4:0',
        body=json.dumps(payload),
        contentType='application/json'
    )
    
    result = json.loads(response['body'].read())
    if isinstance(result['embeddings'], list):
        return result['embeddings'][0]["float"]
    else:
        return result['embeddings']['float'][0]
        

print("임베딩 함수 정의 완료")

임베딩 함수 정의 완료


In [12]:
import time
# 임베딩 생성 및 업로드
processed_items = []

for item in sample_items:
    print(f"처리 중: {item['title']}")
    
    # 텍스트 생성
    text = f"Title: {item['title']}\nDescription: {item['description']}\nCategory: {item['category']}\nBrand: {item['brand']}\nColor: {item['color']}\nPrice: ${item['price']}"
    
    # 이미지 인코딩
    image_path = f"data/images/{item['item_id']}.jpg"
    image_base64 = encode_image(image_path)
    
    # 멀티모달 임베딩 생성
    embedding = generate_embedding(text, image_base64)
    time.sleep(2)  # 2초 대기
    
    # 문서 생성
    doc = {
        **item,
        'multimodal_embedding': embedding,
        'created_at': datetime.now().isoformat()
    }
    
    # OpenSearch에 업로드
    client.index(index=INDEX_NAME, body=doc)
    #client.index(index=INDEX_NAME, id=item['item_id'], body=doc)
    response = client.index(index=INDEX_NAME, body=doc)
    processed_items.append(doc)
    #print(f"완료: {item['item_id']}")
    print(f"완료: {item['item_id']} - {response['_id']}")

print(f"\n모든 아이템 처리 완료: {len(processed_items)}개")

처리 중: 블루 데님 재킷
완료: item_001 - 1%3A0%3AVUjPG5oBmcOlwPDXl4uY
처리 중: 블랙 이브닝 드레스
완료: item_002 - 1%3A0%3AVkjPG5oBmcOlwPDXoYug
처리 중: 화이트 스니커즈
완료: item_003 - 1%3A0%3AV0jPG5oBmcOlwPDXq4tl
처리 중: 레드 울 스웨터
완료: item_004 - 1%3A0%3AWEjPG5oBmcOlwPDXtIvV
처리 중: 브라운 가죽 핸드백
완료: item_005 - 1%3A0%3AWUjPG5oBmcOlwPDXvot2

모든 아이템 처리 완료: 5개


In [13]:
def display_results(results, query_desc):
    print(f"\n=== 검색 결과: {query_desc} ===")
    
    if not results:
        print("❌ 검색 결과가 없습니다.")
        return
    
    print(f"📊 총 {len(results)}개 결과 발견\n")
    
    for i, item in enumerate(results, 1):
        print(f"{i}. {item['title']} (점수: {item['score']:.4f})")
        print(f"   {item['category']} | {item['brand']} | {item['color']} | ${item['price']}")
        print(f"   {item['description']}")
        print()

print("🔍 검색 함수 정의 완료")

검색 함수 정의 완료


In [14]:
# 1. 텍스트 검색
results = search(query_text="블루 캐주얼 재킷 일상복")
display_results(results, "텍스트 검색: 블루 캐주얼 재킷")

# 2. 이미지 검색
results = search(query_image_path="data/images/item_001.jpg")
display_results(results, "이미지 검색: item_001.jpg")

# 3. 멀티모달 검색
results = search(
    query_text="우아한 정장 특별한 행사용",
    query_image_path="data/images/item_002.jpg"
)
display_results(results, "멀티모달 검색: 우아한 정장 + 이미지")



=== 검색 결과: 텍스트 검색: 블루 캐주얼 재킷 ===

=== 검색 결과: 이미지 검색: item_001.jpg ===

=== 검색 결과: 멀티모달 검색: 우아한 정장 + 이미지 ===


In [38]:
# 인덱스 존재 및 문서 수 확인
print(f"인덱스 존재: {client.indices.exists(index=INDEX_NAME)}")

# 문서 수 확인 (다른 방법)
response = client.search(index=INDEX_NAME, body={"query": {"match_all": {}}, "size": 0})
doc_count = response['hits']['total']['value']
print(f"문서 수: {doc_count}")

if doc_count == 0:
    print("❌ 문서가 없습니다. 임베딩 업로드가 실패했습니다.")
    print("API 제한이 해제된 후 임베딩 생성 셀을 다시 실행하세요.")

인덱스 존재: True
문서 수: 10


In [32]:
print("=== 멀티모달 임베딩 솔루션 정보 ===")
print(f"OpenSearch 엔드포인트: {ENDPOINT}")
print(f"인덱스: {INDEX_NAME}")
print(f"임베딩 모델: Cohere embed-v4")
print(f"임베딩 차원: 1024")
print(f"처리된 아이템: {len(processed_items)}개")
print("\n=== 솔루션 구축 완료 ===")

=== 멀티모달 임베딩 솔루션 정보 ===
OpenSearch 엔드포인트: https://xuys55qp27tqob78dyr0.us-east-1.aoss.amazonaws.com
인덱스: fashion-items
임베딩 모델: Cohere embed-v4
임베딩 차원: 1024
처리된 아이템: 5개

=== 솔루션 구축 완료 ===
