In [1]:
!pip install opensearch-py
!pip install python-dotenv



In [2]:
import json

with open('fsi-regulation.json', 'r', encoding='utf-8') as f:
    documents = json.load(f)

print(documents[0])

{'article': '1', 'title': '목적', 'content': '이 규정은 「전자금융거래법」(이하 "법"이라 한다) 및 동법 시행령(이하 "시행령"이라 한다)에서 금융위\n\n원회에 위임한 사항과 그 시행에 필요한 사항 및 다른 법령에 따라 금융감독원의 검사를 받는 기관의 정보기술부\n\n문 안전성 확보 등을 위하여 필요한 사항을 규정함을 목적으로 한다.', 'text': '제1조(목적) 이 규정은 「전자금융거래법」(이하 "법"이라 한다) 및 동법 시행령(이하 "시행령"이라 한다)에서 금융위\n\n원회에 위임한 사항과 그 시행에 필요한 사항 및 다른 법령에 따라 금융감독원의 검사를 받는 기관의 정보기술부\n\n문 안전성 확보 등을 위하여 필요한 사항을 규정함을 목적으로 한다.', 'code': '제1조'}


In [3]:
import boto3
from botocore.config import Config

region = boto3.Session().region_name # 'us-east-1'
embedding_model_id = "amazon.titan-embed-text-v2:0"

def init_bedrock_client(region: str):
    retry_config = Config(
        region_name=region,
        retries={"max_attempts": 10, "mode": "standard"}
    )
    return boto3.client("bedrock-runtime", region_name=region, config=retry_config)

def embed_document(document):
    text_response = boto3_client.invoke_model(
        modelId=embedding_model_id,
        body=json.dumps({"inputText": document['text']})
    )
    text_embedding = json.loads(text_response['body'].read())['embedding']
    document['text_embedding'] = text_embedding
    
    if document.get('title'):
        title_response = boto3_client.invoke_model(
            modelId=embedding_model_id,
            body=json.dumps({"inputText": document['title']})
        )
        title_embedding = json.loads(title_response['body'].read())['embedding']
        document['title_embedding'] = title_embedding

    return document

boto3_client = init_bedrock_client(region)

document_with_emb = []
for document in documents:
    response = embed_document(document)
    document_with_emb.append(response)

print(document_with_emb[1])

{'chapter': '1', 'article': '2', 'clause': '1', 'content': '"전산실"이라 함은 전산장비, 통신 및 보안장비, 전산자료 보관 및 출력장비가 설치된 장소를 말한다.', 'text': '1. "전산실"이라 함은 전산장비, 통신 및 보안장비, 전산자료 보관 및 출력장비가 설치된 장소를 말한다.', 'code': '제1장제2조제1호', 'text_embedding': [0.005457766354084015, -0.017944473773241043, 0.07111635059118271, 0.020590662956237793, 0.006987594533711672, 0.04630832001566887, 0.009137623943388462, -0.009385704062879086, -0.0004315563419368118, 0.018688714131712914, -0.0011111929779872298, -0.01579444482922554, -0.014388656243681908, -0.04564677178859711, 0.010667452588677406, -0.0519314743578434, 0.023815708234906197, 0.04465445131063461, 0.006822207942605019, 0.03621971979737282, 0.05160069838166237, 0.05424688756465912, 0.0014678083825856447, 0.003493797266855836, 0.023981094360351562, -0.04928528144955635, -0.00020931774633936584, -0.06615474075078964, 0.042669810354709625, -0.02712344378232956, -0.012321320362389088, -0.0041966913267970085, 0.007235675118863583, 0.03307737037539482, 0.07938569039106

In [4]:
output_file = 'kr-fsi-regulation-with-emb.json'

with open(output_file, 'w', encoding='utf-8') as f:
    json.dump(document_with_emb, f, ensure_ascii=False, indent=4)

In [5]:
from opensearchpy import OpenSearch, RequestsHttpConnection
from dotenv import load_dotenv
import os

host = os.getenv('OPENSEARCH_HOST')
user = os.getenv('OPENSEARCH_USER')
password = os.getenv('OPENSEARCH_PASSWORD')
region = os.getenv('OPENSEARCH_REGION')
index_name = os.getenv('OPENSEARCH_INDEX')

os_client = OpenSearch(
    hosts = [{'host': host.replace("https://", ""), 'port': 443}],
    http_auth = (user, password),
    use_ssl = True,
    verify_certs = True,
    connection_class = RequestsHttpConnection
)

In [6]:
mapping = {
    "settings": {
        "index": {
            "knn": True,
            "knn.algo_param.ef_search": 512
        }
    },
    "mappings": {
        "properties": {
            "code": {"type": "keyword"},
            "title": {"type": "text"},
            "text": {"type": "text"},
            "chapter": {"type": "keyword"},
            "section": {"type": "keyword"},
            "paragraph": {"type": "keyword"},
            "article": {"type": "keyword"},
            "sub-article": {"type": "keyword"},
            "clause": {"type": "keyword"},
            "item": {"type": "keyword"},
            "title_embedding": {
                "type": "knn_vector",
                "dimension": 1024,
                "method": {
                    "name": "hnsw",
                    "space_type": "l2",
                    "engine": "faiss",
                    "parameters": {
                        "ef_construction": 512,
                        "m": 16
                    }
                }
            },
            "text_embedding": {
                "type": "knn_vector",
                "dimension": 1024,
                "method": {
                    "name": "hnsw",
                    "space_type": "l2",
                    "engine": "faiss",
                    "parameters": {
                        "ef_construction": 512,
                        "m": 16
                    }
                }
            }
        }
    }
}


def init_opensearch_index(os_client, index_name, mapping):
    if os_client.indices.exists(index=index_name):
        os_client.indices.delete(index=index_name)
    os_client.indices.create(index=index_name, body=mapping)

init_opensearch_index(os_client, index_name, mapping)

In [7]:
bulk_data = []
for doc in document_with_emb:
    
    _id = "" + doc.get('chapter') if doc.get('chapter') else "-"
    _id += "_" + doc.get('section') if doc.get('section') else "-"
    _id += "_" + doc.get('paragraph') if doc.get('paragraph') else "-"
    _id += "_" + doc.get('article') if doc.get('article') else "-"
    _id += "_" + doc.get('sub-article') if doc.get('sub-article') else "-"
    _id += "_" + doc.get('clause') if doc.get('clause') else "-"
    _id += "_" + doc.get('item') if doc.get('item') else "-"
    
    bulk_data.append({"index": {"_index": index_name, "_id": _id}})
    bulk_data.append(doc)

if bulk_data:
    response = os_client.bulk(body=bulk_data)
    successful = sum(1 for item in response['items'] if item['index']['status'] in (200, 201))
    failed = len(response['items']) - successful

    print(f"Indexed {successful} documents successfully.")
    print(f"Failed to index {failed} documents.")
else:
    print("No data to index.")

Indexed 568 documents successfully.
Failed to index 0 documents.


In [9]:
search_response = os_client.search(
    index=index_name,
    body={
        "size": 1,
        "query": {
            "function_score": {
                "random_score": {}
            }
        }
    }
)

#print(search_response['hits']['hits'][0]['_source']['title'])
print(search_response['hits']['hits'][0]['_source']['text'])

제37조의5(정보보호최고책임자의 업무) 정보보호최고책임자는 정보보안점검의 날을 지정하고, 임직원이 금융감독

원장이 정하는 정보보안 점검항목을 준수했는지 여부를 매월 점검하고, 그 점검 결과 및 보완 계획을 최고경영자

에게 보고하여야 한다.[[본조신설 2015. 2. 3.]]]


-----
