In [1]:
import os
from typing import List, Dict, Any, Union
from dotenv import load_dotenv, find_dotenv

load_dotenv(find_dotenv())

True

In [2]:


from pathlib import Path
from langchain_core.documents import Document
from tqdm.auto import tqdm
import orjson
from uuid_extensions import uuid7
import json
import datasets
import numpy as np
from omegaconf import OmegaConf
from argparse import ArgumentParser

from opensearch_toolkits.core import SyncOpenSearchManagerIndexer, OpenSearchConfig


def read_jsonlines(path: Union[str, Path]) -> List[Dict]:
    """Load every JSON record from a JSON Lines file.

    Args:
        path: path to the ``.jsonl`` file, as ``str`` or ``Path``.

    Returns:
        List[Dict]: one parsed object per non-blank line, in file order.
    """
    # Binary mode: orjson.loads accepts bytes and expects UTF-8, which avoids
    # decoding with the platform-default text encoding (not UTF-8 everywhere)
    # and mangling non-ASCII text such as Korean titles/questions.
    with Path(path).open('rb') as f:
        # Blank lines (e.g. a trailing empty line) are skipped instead of
        # making the JSON parser fail.
        return [orjson.loads(line) for line in f if line.strip()]

def convert_korquadv1_to_langchain_docs(dict_records: List[Dict[str, Any]]) -> List[Document]:
    """
    Convert KorQuAD v1 dataset records (plain dicts) into LangChain Document objects.

    The record's ``question`` becomes ``page_content``; only the ``title`` key is
    copied into ``metadata``. Every document gets a fresh UUIDv7 string id.

    Records whose ``question`` is missing, not a string, empty, or the literal
    string ``"nan"`` are skipped. Previously such a record raised
    ``UnboundLocalError`` when it came first, or silently reused the previous
    record's question otherwise.

    NOTE(review): skipping means the output list can be shorter than
    ``dict_records``; if embeddings are cached by dataset position, confirm the
    alignment still holds.

    Args:
        dict_records: list of dataset records as dicts.

    Returns:
        List[Document]: one Document per record with a usable question, in input order.
    """
    documents: List[Document] = []

    for record in tqdm(dict_records, desc="Converting records to documents"):
        # The question text is what gets indexed as page_content.
        question = record.get('question', '')

        # "nan" is presumably a stringified missing value; treat it like an empty question.
        if not isinstance(question, str) or not question or question == "nan":
            continue

        # Only the title is kept as metadata.
        metadata = {k: record[k] for k in ('title',) if k in record}

        documents.append(
            Document(page_content=question, metadata=metadata, id=str(uuid7()))
        )

    return documents

def get_cached_embeddings_path(config):
    """Build the path of the JSONL file that holds precomputed embeddings.

    The file name combines the dataset name, the dataset config name
    (``config.default``), the split, and the tokenizer/model name. Any ``/`` in
    the dataset or model name becomes ``_`` so each fits in one path component.

    Returns:
        str: ``"<cache_dir>/<dataset>+<default>+<split>+<model>_embeddings.jsonl"``.
    """
    safe_dataset = config.dataset_name.replace("/", "_")
    safe_model = config.model_name.tokenizer.replace("/", "_")
    file_name = "{}+{}+{}+{}_embeddings.jsonl".format(
        safe_dataset, config.default, config.split, safe_model
    )
    return "{}/{}".format(config.cache_dir, file_name)

In [3]:
# load config
config = OmegaConf.load("../../scripts/sample.yaml")
index_manager = SyncOpenSearchManagerIndexer(
    OpenSearchConfig(
        host=os.environ["OPENSEARCH_HOST"],  # OpenSearch 서버 주소
        port=int(os.environ["OPENSEARCH_PORT"]),  # OpenSearch 서버 포트
        user=os.environ["OPENSEARCH_USER"],  # OpenSearch 사용자 이름
        password=os.environ["OPENSEARCH_PASSWORD"],  # OpenSearch 비밀번호
        use_ssl=os.environ["OPENSEARCH_USE_SSL"].lower() == "true",  # SSL 사용 여부
    )
)
preprocessor_fn = convert_korquadv1_to_langchain_docs

In [4]:
index_manager.check_connection()

True

In [7]:
index_manager.delete_index(config.index_name)

True

In [5]:
# Create the OpenSearch index from the JSON mapping file named in the config,
# resolved relative to ../../scripts/ (the method name suggests creation is
# skipped when the index already exists).
with open(f"../../scripts/{config.mapping_file}") as f:
    index_config = json.load(f)
index_manager.create_index_if_not_exists(index_name=f"{config.index_name}", index_config=index_config)


True

In [6]:
# Load the dataset split named in the config (config.default is the dataset config name).
ds = datasets.load_dataset(
    config.dataset_name, config.default, split=config.split
        )
# Precomputed embeddings for this dataset/config/split/model, read from the JSONL cache.
cached_embeddings = read_jsonlines(get_cached_embeddings_path(config))

# Convert dataset rows into LangChain Documents.
docs = preprocessor_fn(ds.to_list())

Converting records to documents:   0%|          | 0/60407 [00:00<?, ?it/s]

In [7]:
# Merge each Document with its cached embedding into the dict shape sent to
# OpenSearch: content under "text", embedding under "vector_field".
# NOTE(review): _filter_and_merge is a private helper of the indexer and may change.
# NOTE(review): the displayed documents[12] shows vector_field holding the whole
# cache record ({'_index', 'embedding'}) instead of a bare list of floats, which a
# knn_vector field expects — confirm the cache record format matches what this helper expects.
documents = index_manager._filter_and_merge(
    docs,
        cached_embeddings,
        text_field="text",
        embedding_field="vector_field",
)

In [8]:
documents[12]

{'id': '06805d68-fd88-7ecd-8000-a7708ce8fac1',
 'metadata': {'title': '파우스트_서곡'},
 'type': 'Document',
 'text': '파우스트 교향곡을 피아노 독주용으로 편곡한 사람은?',
 'vector_field': {'_index': 12,
  'embedding': [0.0310500618070364,
   0.014241465367376804,
   -0.045230407267808914,
   0.016014007851481438,
   0.009229447692632675,
   -0.0062344614416360855,
   0.054520975798368454,
   -0.016869718208909035,
   0.026649266481399536,
   0.052565064281225204,
   0.008373737335205078,
   0.03153903782367706,
   -0.0006990848341956735,
   -0.018947873264551163,
   0.023593157529830933,
   0.026038045063614845,
   0.016014007851481438,
   0.009046081453561783,
   -0.010757502168416977,
   -0.041807565838098526,
   -0.01448595430701971,
   -0.012652289122343063,
   0.006142777856439352,
   -0.023348668590188026,
   -0.01638074219226837,
   0.00953505840152502,
   -0.006051094736903906,
   -0.005164823494851589,
   -0.01051301322877407,
   -0.044496938586235046,
   0.02836068719625473,
   -0.013935854658484459,
 

In [13]:
# NOTE(review): this call fails (see the error output): the index mapping is
# "strict" and does not declare the top-level "id" key present in the merged
# document dict, so OpenSearch rejects it with strict_dynamic_mapping_exception.
# Pass the id via index_document's doc_id argument and drop it from the body instead.
index_manager.index_document(config.index_name, documents[12], refresh=True)

ERROR:root:Error indexing document to 'korquad_v1': RequestError(400, 'strict_dynamic_mapping_exception', 'mapping set to strict, dynamic introduction of [id] within [_doc] is not allowed')


False

In [None]:
 def index_document(
        self,
        index_name: str,
        document: Dict[str, Any],
        doc_id: Optional[str] = None,
        refresh: bool = False
    ) 

In [None]:
result = index_manager.index_documents_for_langchain(
        config.index_name,
        docs,
        cached_embeddings,
        text_field="text",
        embedding_field="vector_field",
        id_field="id",
        refresh=True,
    )

In [None]:

    



    

    # 문서 인덱싱
    

    print(f"인덱싱 결과: {result}")

if __name__ == "__main__":
    main()

---

In [1]:
import os
import logging
from typing import List, Dict, Any, Union
from dotenv import load_dotenv, find_dotenv

load_dotenv(find_dotenv())

from pathlib import Path
from langchain_core.documents import Document
from tqdm.auto import tqdm
import orjson
from uuid_extensions import uuid7
import json
import datasets
import numpy as np
from omegaconf import OmegaConf
from argparse import ArgumentParser

import openai
from langchain_openai import OpenAIEmbeddings

from opensearch_toolkits.core import SyncOpenSearchManagerIndexer, OpenSearchConfig

logging.basicConfig(
    level=logging.ERROR,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Base URL of the OpenAI-compatible embedding server. It can be overridden via
# the EMBEDDING_API_URL environment variable (e.g. from .env) instead of editing
# this hard-coded LAN address; the previous value remains the default.
EMBEDDING_API_URL = os.environ.get("EMBEDDING_API_URL", "http://192.168.219.104:7997/")


def read_jsonlines(path: Union[str, Path]) -> List[Dict]:
    """Load every JSON record from a JSON Lines file.

    Args:
        path: path to the ``.jsonl`` file, as ``str`` or ``Path``.

    Returns:
        List[Dict]: one parsed object per non-blank line, in file order.
    """
    # Binary mode: orjson.loads accepts bytes and expects UTF-8, which avoids
    # decoding with the platform-default text encoding (not UTF-8 everywhere)
    # and mangling non-ASCII text such as Korean titles/questions.
    with Path(path).open('rb') as f:
        # Blank lines (e.g. a trailing empty line) are skipped instead of
        # making the JSON parser fail.
        return [orjson.loads(line) for line in f if line.strip()]

def convert_korquadv1_to_langchain_docs(dict_records: List[Dict[str, Any]]) -> List[Document]:
    """
    Convert KorQuAD v1 dataset records (plain dicts) into LangChain Document objects.

    The record's ``question`` becomes ``page_content``; only the ``title`` key is
    copied into ``metadata``. Every document gets a fresh UUIDv7 string id.

    Records whose ``question`` is missing, not a string, empty, or the literal
    string ``"nan"`` are skipped. Previously such a record raised
    ``UnboundLocalError`` when it came first, or silently reused the previous
    record's question otherwise.

    NOTE(review): skipping means the output list can be shorter than
    ``dict_records``; if embeddings are cached by dataset position, confirm the
    alignment still holds.

    Args:
        dict_records: list of dataset records as dicts.

    Returns:
        List[Document]: one Document per record with a usable question, in input order.
    """
    documents: List[Document] = []

    for record in tqdm(dict_records, desc="Converting records to documents"):
        # The question text is what gets indexed as page_content.
        question = record.get('question', '')

        # "nan" is presumably a stringified missing value; treat it like an empty question.
        if not isinstance(question, str) or not question or question == "nan":
            continue

        # Only the title is kept as metadata.
        metadata = {k: record[k] for k in ('title',) if k in record}

        documents.append(
            Document(page_content=question, metadata=metadata, id=str(uuid7()))
        )

    return documents

def get_cached_embeddings_path(config):
    """Build the path of the JSONL file that holds precomputed embeddings.

    The file name combines the dataset name, the dataset config name
    (``config.default``), the split, and the tokenizer/model name. Any ``/`` in
    the dataset or model name becomes ``_`` so each fits in one path component.

    Returns:
        str: ``"<cache_dir>/<dataset>+<default>+<split>+<model>_embeddings.jsonl"``.
    """
    safe_dataset = config.dataset_name.replace("/", "_")
    safe_model = config.model_name.tokenizer.replace("/", "_")
    file_name = "{}+{}+{}+{}_embeddings.jsonl".format(
        safe_dataset, config.default, config.split, safe_model
    )
    return "{}/{}".format(config.cache_dir, file_name)

In [2]:
# Load the experiment config (dataset_name/default/split, index_name, ...).
config = OmegaConf.load("../../scripts/sample.yaml")
# Connection settings come from environment variables (loaded from .env by load_dotenv in the first cell).
index_manager = SyncOpenSearchManagerIndexer(
    OpenSearchConfig(
        host=os.environ["OPENSEARCH_HOST"],  # OpenSearch server host
        port=int(os.environ["OPENSEARCH_PORT"]),  # OpenSearch server port
        user=os.environ["OPENSEARCH_USER"],  # OpenSearch user name
        password=os.environ["OPENSEARCH_PASSWORD"],  # OpenSearch password
        use_ssl=os.environ["OPENSEARCH_USE_SSL"].lower() == "true",  # SSL only when the value is "true" (case-insensitive)
    )
)
# Function that turns raw dataset rows into LangChain Documents.
preprocessor_fn = convert_korquadv1_to_langchain_docs

In [3]:
# Create the OpenSearch index from the sample mapping (the method name suggests
# creation is skipped when the index already exists).
# NOTE(review): the mapping path is hard-coded here, whereas main() reads it
# from config.mapping_file — keep the two in sync.
with open("../../scripts/sample_mapping.json") as f:
    index_config = json.load(f)
index_manager.create_index_if_not_exists(index_name=config.index_name, index_config=index_config)

# Load the dataset split named in the config (config.default is the dataset config name).
ds = datasets.load_dataset(
    config.dataset_name, config.default, split=config.split
        )
# Cached embeddings are disabled in this notebook; embeddings are instead
# produced at indexing time by the embedding_fn passed to the indexer.
# cached_embeddings = read_jsonlines(get_cached_embeddings_path(config))

# Convert dataset rows into LangChain Documents.
docs = preprocessor_fn(ds.to_list())

Converting records to documents:   0%|          | 0/60407 [00:00<?, ?it/s]

In [4]:
docs[0]

Document(id='06805f41-2597-71f3-8000-bd5d9ee937e6', metadata={'title': '파우스트_서곡'}, page_content='바그너는 괴테의 파우스트를 읽고 무엇을 쓰고자 했는가?')

In [5]:
# embedding.
# OpenAI-compatible client pointed at the self-hosted embedding server.
# NOTE(review): no api_key is passed explicitly; presumably the openai client
# picks one up from the environment (e.g. OPENAI_API_KEY via .env) — confirm.
client = openai.OpenAI(base_url=EMBEDDING_API_URL)

def process_batch(batch_texts: List[str], model: str = 'kure-v1') -> List[List[float]]:
    """Embed one batch of texts through the module-level OpenAI-compatible ``client``.

    Args:
        batch_texts: texts to embed in a single request.
        model: model name served by the embedding server (default ``'kure-v1'``).

    Returns:
        List[List[float]]: one embedding per input text, in input order. If the
        request fails, the error is logged with its traceback and one empty
        list per input text is returned instead, so indexing can continue.
    """
    try:
        response = client.embeddings.create(
            model=model,
            input=batch_texts,
        )
        # Order by each item's ``index`` so embeddings line up with the inputs
        # even if the server returns them out of order.
        ordered = sorted(response.data, key=lambda item: item.index)
        return [item.embedding for item in ordered]
    except Exception:
        # Best-effort by design: log through the configured logger (with the
        # traceback) rather than print, then return placeholders.
        # NOTE(review): empty vectors cannot fill a fixed-dimension knn_vector
        # field (1024 in the mapping shown for this index), so these documents
        # will presumably fail to index — consider retrying here.
        logging.exception("Error processing batch of %d texts", len(batch_texts))
        return [[] for _ in batch_texts]

In [6]:
documents = index_manager._filter_and_merge(docs, [], "text", "vector_field")

In [9]:
config.index_name

'korquad_v1'

Generating embeddings:   2%|▏         | 1000/60407 [00:43<31:20, 31.60it/s]

In [11]:
index_mapping = index_manager.get_mappings(config.index_name)[config.index_name]['mappings'].get('properties', {})

In [14]:
index_mapping

{'metadata': {'properties': {'title': {'type': 'text',
    'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256},
     'korean': {'type': 'text', 'analyzer': 'korean_analyzer'}}}}},
 'text': {'type': 'text',
  'fields': {'english': {'type': 'text', 'analyzer': 'english'},
   'keyword': {'type': 'keyword', 'ignore_above': 2048},
   'korean': {'type': 'text', 'analyzer': 'korean_analyzer'},
   'ngram': {'type': 'text',
    'analyzer': 'ngram_analyzer',
    'search_analyzer': 'standard'}},
  'index_options': 'offsets',
  'analyzer': 'multilingual_analyzer'},
 'type': {'type': 'keyword'},
 'vector_field': {'type': 'knn_vector',
  'dimension': 1024,
  'method': {'engine': 'faiss',
   'space_type': 'cosinesimil',
   'name': 'hnsw',
   'parameters': {'ef_construction': 512, 'm': 32}},
  'space_type': 'cosinesimil'}}

In [16]:
documents[0]

{'id': '06805f41-2597-71f3-8000-bd5d9ee937e6',
 'metadata': {'title': '파우스트_서곡'},
 'type': 'Document',
 'text': '바그너는 괴테의 파우스트를 읽고 무엇을 쓰고자 했는가?'}

In [17]:
def print_nested_keys(dictionary, indent=0):
    """Print the keys of a (possibly nested) dict as an indented outline.

    Keys are printed depth-first in iteration order. Each nesting level adds two
    spaces, and only dict values are descended into; their values are never printed.
    """
    prefix = "  " * indent
    for name, child in dictionary.items():
        print(prefix + str(name))
        if not isinstance(child, dict):
            continue
        print_nested_keys(child, indent + 1)

# 사용 예시
nested_dict = {
    "level1_1": {
        "level2_1": {
            "level3_1": "value3_1",
            "level3_2": "value3_2"
        },
        "level2_2": "value2_2"
    },
    "level1_2": "value1_2"
}

print_nested_keys(nested_dict)

level1_1
  level2_1
    level3_1
    level3_2
  level2_2
level1_2


In [18]:
print_nested_keys(index_mapping)

metadata
  properties
    title
      type
      fields
        keyword
          type
          ignore_above
        korean
          type
          analyzer
text
  type
  fields
    english
      type
      analyzer
    keyword
      type
      ignore_above
    korean
      type
      analyzer
    ngram
      type
      analyzer
      search_analyzer
  index_options
  analyzer
type
  type
vector_field
  type
  dimension
  method
    engine
    space_type
    name
    parameters
      ef_construction
      m
  space_type


In [20]:
print_nested_keys(documents[0])

id
metadata
  title
type
text


In [22]:
index_manager.client.indices.get_mapping(config.index_name,pretty=True)

{'korquad_v1': {'mappings': {'dynamic': 'strict',
   'properties': {'metadata': {'properties': {'title': {'type': 'text',
       'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256},
        'korean': {'type': 'text', 'analyzer': 'korean_analyzer'}}}}},
    'text': {'type': 'text',
     'fields': {'english': {'type': 'text', 'analyzer': 'english'},
      'keyword': {'type': 'keyword', 'ignore_above': 2048},
      'korean': {'type': 'text', 'analyzer': 'korean_analyzer'},
      'ngram': {'type': 'text',
       'analyzer': 'ngram_analyzer',
       'search_analyzer': 'standard'}},
     'index_options': 'offsets',
     'analyzer': 'multilingual_analyzer'},
    'type': {'type': 'keyword'},
    'vector_field': {'type': 'knn_vector',
     'dimension': 1024,
     'method': {'engine': 'faiss',
      'space_type': 'cosinesimil',
      'name': 'hnsw',
      'parameters': {'ef_construction': 512, 'm': 32}},
     'space_type': 'cosinesimil'}}}}}

In [8]:
result = index_manager.index_documents_for_langchain(
        config.index_name,
        docs,
        embedding_fn=process_batch,
        text_field="text",
        embedding_field="vector_field",
        id_field="id",
        refresh=False,
    )

Generating embeddings:   2%|▏         | 1000/60407 [00:31<31:20, 31.60it/s]

KeyboardInterrupt: 

In [None]:


def main(argv: Union[List[str], None] = None):
    """Create the OpenSearch index and index KorQuAD v1 questions with embeddings.

    Steps:
    1. Load the OmegaConf config from ``--config_path``.
    2. Connect to OpenSearch using the ``OPENSEARCH_*`` environment variables.
    3. Create the index from ``config.mapping_file``.
    4. Load the dataset split and convert rows to LangChain Documents.
    5. Index them, embedding each batch through the OpenAI-compatible server at
       ``EMBEDDING_API_URL``.

    Args:
        argv: arguments to parse; ``None`` (default) parses ``sys.argv[1:]``.
    """
    # CLI arguments
    parser = ArgumentParser(description="OpenSearch 인덱스 생성 및 문서 인덱싱")
    parser.add_argument(
        "--config_path",
        type=str,
        default="config/config.yaml",
    )
    args = parser.parse_args(argv)

    # Load config and connect to OpenSearch with credentials from the environment.
    config = OmegaConf.load(args.config_path)
    index_manager = SyncOpenSearchManagerIndexer(
        OpenSearchConfig(
            host=os.environ["OPENSEARCH_HOST"],  # OpenSearch server host
            port=int(os.environ["OPENSEARCH_PORT"]),  # OpenSearch server port
            user=os.environ["OPENSEARCH_USER"],  # OpenSearch user name
            password=os.environ["OPENSEARCH_PASSWORD"],  # OpenSearch password
            use_ssl=os.environ["OPENSEARCH_USE_SSL"].lower() == "true",  # SSL only when "true" (case-insensitive)
        )
    )
    preprocessor_fn = convert_korquadv1_to_langchain_docs

    # Create the OpenSearch index from the mapping file named in the config.
    with open(config.mapping_file, encoding="utf-8") as f:
        index_config = json.load(f)
    index_manager.create_index_if_not_exists(index_name=config.index_name, index_config=index_config)

    # Load the dataset split and convert rows into LangChain Documents.
    ds = datasets.load_dataset(
        config.dataset_name, config.default, split=config.split
    )
    docs = preprocessor_fn(ds.to_list())

    # Embeddings are generated on the fly by the OpenAI-compatible server.
    client = openai.OpenAI(base_url=EMBEDDING_API_URL)

    def process_batch(batch_texts: List[str]) -> List[List[float]]:
        """Embed one batch; on failure, log it and return one empty list per text."""
        try:
            embedding_outputs = client.embeddings.create(
                model='kure-v1',
                input=batch_texts,
            )
            # Order by each item's ``index`` so embeddings line up with the inputs.
            ordered = sorted(embedding_outputs.data, key=lambda item: item.index)
            return [item.embedding for item in ordered]
        except Exception:
            # Best-effort: keep indexing other batches, but log through the
            # configured logger (with traceback) rather than print.
            # NOTE(review): empty vectors cannot fill a fixed-dimension
            # knn_vector field, so these documents will presumably fail to
            # index — consider retrying here.
            logging.exception("Error processing batch of %d texts", len(batch_texts))
            return [[] for _ in batch_texts]

    # Index the documents.
    result = index_manager.index_documents_for_langchain(
        config.index_name,
        docs,
        embedding_fn=process_batch,
        text_field="text",
        embedding_field="vector_field",
        id_field="id",
        refresh=False,
    )

    print(f"인덱싱 결과: {result}")


if __name__ == "__main__":
    import sys
    # Inside a Jupyter kernel __name__ is also "__main__", but sys.argv holds the
    # kernel's own flags (e.g. "-f <connection file>") that argparse rejects, so
    # fall back to the defaults there; from the command line, parse sys.argv.
    main([] if "ipykernel" in sys.modules else None)