# CLOVA Studio를 이용해 RAG 구현하기

In [1]:
# Vector DB인 Milvus와 관련된 모듈들은 모듈의 용도를 명확히 구분하기 위해 Vector DB 구축 단계에서 불러왔으며 해당 부분에서 코드를 확인하실 수 있습니다.
import json
import os
import subprocess
from langchain_community.document_loaders import UnstructuredHTMLLoader
from pathlib import Path
import base64
import http.client
from tqdm import tqdm
import requests

### 1. Raw Data → Connecting

txt → html 변환 및 원본 사이트 주소 mapping

In [2]:
# Download every page listed in clovastudiourl.txt as an HTML file and record
# which URL each saved file came from.
url_to_filename_map = {}

# One URL per line; blank lines are dropped so they are never handed to wget.
with open("clovastudiourl.txt", "r", encoding="utf-8") as file:
    urls = [line.strip() for line in file if line.strip()]

folder_path = "clovastudioguide"

# exist_ok avoids the check-then-create race of a separate os.path.exists() test.
os.makedirs(folder_path, exist_ok=True)

for url in urls:
    # Strip a trailing "/" first: otherwise ".../docs/page/" would be saved as ".html".
    filename = url.rstrip("/").split("/")[-1] + ".html"
    file_path = os.path.join(folder_path, filename)
    # check=True aborts the cell as soon as one download fails.
    subprocess.run(["wget", "--user-agent=Mozilla/5.0", "-O", file_path, url], check=True)
    url_to_filename_map[url] = filename

# Persist the URL -> filename mapping so a later cell can restore the real URLs.
with open("url_to_filename_map.json", "w", encoding="utf-8") as map_file:
    json.dump(url_to_filename_map, map_file)

--2024-09-27 19:54:00--  https://guide.ncloud-docs.com/docs/clovastudio-playground
Resolving guide.ncloud-docs.com (guide.ncloud-docs.com)... 104.18.6.159, 104.18.7.159, 2606:4700::6812:69f, ...
Connecting to guide.ncloud-docs.com (guide.ncloud-docs.com)|104.18.6.159|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/html]
Saving to: ‘clovastudioguide/clovastudio-playground.html’

     0K .......... .......... .......... .......... .......... 15.6M
    50K .......... .......... .........                        4.65M=0.009s

2024-09-27 19:54:01 (8.32 MB/s) - ‘clovastudioguide/clovastudio-playground.html’ saved [81335]

--2024-09-27 19:54:01--  https://guide.ncloud-docs.com/docs/clovastudio-spec
Resolving guide.ncloud-docs.com (guide.ncloud-docs.com)... 104.18.6.159, 104.18.7.159, 2606:4700::6812:69f, ...
Connecting to guide.ncloud-docs.com (guide.ncloud-docs.com)|104.18.6.159|:443... connected.
HTTP request sent, awaiting response... 200 OK
Lengt

LangChain 활용 HTML 로딩

In [4]:
# Adjust this to the name of the folder that holds the downloaded HTML files.
html_files_dir = Path('clovastudioguide')
html_files = list(html_files_dir.glob("*.html"))

# One entry per HTML file; each entry is the list returned by the loader.
clovastudiodatas = []

for html_file in html_files:
    clovastudiodatas.append(UnstructuredHTMLLoader(str(html_file)).load())
    print(f"Processed {html_file}")

Processed clovastudioguide/clovastudio-playground.html
Processed clovastudioguide/clovastudio-spec.html


Mapping 정보를 활용해 'source'를 실제 URL로 대체

In [5]:
# Load the URL -> filename mapping written by the download step.
with open("url_to_filename_map.json", "r", encoding="utf-8") as map_file:
    url_to_filename_map = json.load(map_file)

# Invert it so a saved file name can be looked up to find its original URL.
filename_to_url_map = {v: k for k, v in url_to_filename_map.items()}

# Replace the local file path stored in each Document's 'source' with the real URL.
for doc_list in clovastudiodatas:
    for doc in doc_list:
        # Path(...).name takes the final path component with the platform's own
        # separator rules, so this also works where paths are not "/"-separated.
        extracted_filename = Path(doc.metadata["source"]).name
        if extracted_filename in filename_to_url_map:
            doc.metadata["source"] = filename_to_url_map[extracted_filename]
        else:
            print(f"Warning: {extracted_filename}에 해당하는 URL을 찾을 수 없습니다.")

In [6]:
# Flatten the list of per-file document lists into a single list of documents.
clovastudiodatas_flattened = []
for documents in clovastudiodatas:
    clovastudiodatas_flattened.extend(documents)

### 2. Chunking

In [7]:
class SegmentationExecutor:
    """Small HTTPS client for the CLOVA Studio segmentation API tool.

    Args:
        host: API gateway host name (no scheme), passed to ``HTTPSConnection``.
        api_key: Value sent as ``X-NCP-CLOVASTUDIO-API-KEY``.
        api_key_primary_val: Value sent as ``X-NCP-APIGW-API-KEY``.
        request_id: Value sent as ``X-NCP-CLOVASTUDIO-REQUEST-ID``.
    """

    def __init__(self, host, api_key, api_key_primary_val, request_id):
        self._host = host
        self._api_key = api_key
        self._api_key_primary_val = api_key_primary_val
        self._request_id = request_id

    def _send_request(self, completion_request):
        """POST ``completion_request`` as JSON and return the decoded JSON reply."""
        headers = {
            "Content-Type": "application/json; charset=utf-8",
            "X-NCP-CLOVASTUDIO-API-KEY": self._api_key,
            "X-NCP-APIGW-API-KEY": self._api_key_primary_val,
            "X-NCP-CLOVASTUDIO-REQUEST-ID": self._request_id
        }

        conn = http.client.HTTPSConnection(self._host)
        try:
            conn.request(
                "POST",
                "/testapp/v1/api-tools/segmentation/b8841471522d45c39b5efc8a3704ef84", # If using Service App, change 'testapp' to 'serviceapp', and corresponding app id.
                json.dumps(completion_request),
                headers
            )
            response = conn.getresponse()
            return json.loads(response.read().decode(encoding="utf-8"))
        finally:
            # Close the socket even when the request, the read or the JSON
            # decoding raises; previously it leaked on any of those failures.
            conn.close()

    def execute(self, completion_request):
        """Run one segmentation request.

        Returns:
            The ``result.topicSeg`` value of the reply when the status code is "20000".

        Raises:
            ValueError: For any other status code; the message is the whole reply.
        """
        res = self._send_request(completion_request)
        if res["status"]["code"] == "20000":
            return res["result"]["topicSeg"]
        raise ValueError(f"{res}")
 
 
if __name__ == "__main__":
    # Split every loaded document into topic segments with the segmentation API
    # and collect the resulting paragraphs together with their source URL.
    segmentation_executor = SegmentationExecutor(
        host="clovastudio.apigw.ntruss.com",
        api_key='<api_key>',
        api_key_primary_val='<api_key_primary_val>',
        request_id='<request_id>'
    )

    chunked_html = []

    for htmldata in tqdm(clovastudiodatas_flattened):
        # The dict is passed straight to the executor; a dumps/loads round-trip
        # would return an equal dict, so it is not needed.
        request_data = {
            "postProcessMaxSize": 100,
            "alpha": -100,
            "segCnt": -1,
            "postProcessMinSize": -1,
            "text": htmldata.page_content,
            "postProcess": True
        }

        try:
            response_data = segmentation_executor.execute(request_data)
            # Each segment is a list of strings; join it into one paragraph.
            result_data = [' '.join(segment) for segment in response_data]
        except json.JSONDecodeError as e:
            print(f"JSON decoding failed: {e}")
            # Skip this document: otherwise result_data would be undefined or
            # still hold the previous document's paragraphs.
            continue
        except Exception as e:
            print(f"An error occurred: {e}")
            continue

        for paragraph in result_data:
            chunked_document = {
                "source": htmldata.metadata["source"],
                "text": paragraph
            }
            chunked_html.append(chunked_document)

print(len(chunked_html))

100%|█████████████████████████████████████████████| 2/2 [00:02<00:00,  1.11s/it]

25





### 3. Embedding

In [8]:
class EmbeddingExecutor:
    """Small HTTPS client for the CLOVA Studio embedding API tool.

    Args:
        host: API gateway host name (no scheme), passed to ``HTTPSConnection``.
        api_key: Value sent as ``X-NCP-CLOVASTUDIO-API-KEY``.
        api_key_primary_val: Value sent as ``X-NCP-APIGW-API-KEY``.
        request_id: Value sent as ``X-NCP-CLOVASTUDIO-REQUEST-ID``.
    """

    def __init__(self, host, api_key, api_key_primary_val, request_id):
        self._host = host
        self._api_key = api_key
        self._api_key_primary_val = api_key_primary_val
        self._request_id = request_id

    def _send_request(self, completion_request):
        """POST ``completion_request`` as JSON and return the decoded JSON reply."""
        headers = {
            "Content-Type": "application/json; charset=utf-8",
            "X-NCP-CLOVASTUDIO-API-KEY": self._api_key,
            "X-NCP-APIGW-API-KEY": self._api_key_primary_val,
            "X-NCP-CLOVASTUDIO-REQUEST-ID": self._request_id
        }

        conn = http.client.HTTPSConnection(self._host)
        try:
            conn.request(
                "POST",
                "/testapp/v1/api-tools/embedding/clir-emb-dolphin/ab25c547628241cdae51173406f28711", # If using Service App, change 'testapp' to 'serviceapp', and corresponding app id.
                json.dumps(completion_request),
                headers
            )
            response = conn.getresponse()
            return json.loads(response.read().decode(encoding="utf-8"))
        finally:
            # Close the socket even when the request, the read or the JSON
            # decoding raises; previously it leaked on any of those failures.
            conn.close()

    def execute(self, completion_request):
        """Run one embedding request.

        Returns:
            The ``result.embedding`` value of the reply when the status code is "20000".

        Raises:
            ValueError: For any other status code; the message contains the
                code and the reply's status message ("Unknown error" if absent).
        """
        res = self._send_request(completion_request)
        if res["status"]["code"] == "20000":
            return res["result"]["embedding"]

        error_code = res["status"]["code"]
        error_message = res.get("status", {}).get("message", "Unknown error")
        raise ValueError(f"오류 발생: {error_code}: {error_message}")
 
 
if __name__ == "__main__":
    # Embed every chunk's text and store the vector on the chunk itself.
    embedding_executor = EmbeddingExecutor(
        host="clovastudio.apigw.ntruss.com",
        api_key='<api_key>',
        api_key_primary_val='<api_key_primary_val>',
        request_id='<request_id>'
    )

    for chunked_document in tqdm(chunked_html):
        # The dict is passed straight to the executor; a dumps/loads round-trip
        # would return an equal dict, so it is not needed.
        request_data = {
            "text": chunked_document['text']
        }
        try:
            response_data = embedding_executor.execute(request_data)
        except ValueError as e:
            print(f"Embedding API Error. {e}")
        except Exception as e:
            print(f"Unexpected error: {e}")
        else:
            # Assign only on success: after a failure response_data would be
            # undefined or still hold the previous chunk's vector.
            chunked_document["embedding"] = response_data

100%|███████████████████████████████████████████| 25/25 [00:07<00:00,  3.51it/s]


In [None]:
# Collect the distinct vector lengths of all chunks that carry an embedding.
dimension_set = {
    len(chunk["embedding"])
    for chunk in chunked_html
    if "embedding" in chunk
}

print("임베딩된 벡터들의 차원:", dimension_set)

In [9]:
# Display one chunk (index 10) to inspect its 'source', 'text' and 'embedding' entries.
chunked_html[10]

{'source': 'https://guide.ncloud-docs.com/docs/clovastudio-playground',
 'text': '사업자등록번호: 129-86-31394 통신판매업신고번호:제2009-경기성남-0510호 대표이사: 김유원 주소: 경기도 성남시 분당구 분당내곡로 131 판교테크원 타워1 13~15층, 13529 고객지원 대표전화: 1544-5876 © NAVER Cloud Corp. All Rights Reserved. Change password! Change profile Success!',
 'embedding': [-0.4977,
  -0.37463403,
  -0.8082111,
  1.5684292,
  -1.0647352,
  -0.8259442,
  -0.470726,
  1.4804882,
  0.24201761,
  1.2678417,
  -0.87588906,
  0.97624964,
  1.3570925,
  1.0701687,
  0.0037479987,
  0.02852275,
  -0.7737519,
  -0.27900526,
  0.62572086,
  1.0088664,
  -0.017886374,
  -0.37477672,
  1.188733,
  0.84866834,
  1.2898862,
  -1.2503594,
  -1.2410463,
  -0.28544605,
  0.68900746,
  1.0667744,
  0.2868338,
  1.2272992,
  -0.62896794,
  -0.41503736,
  1.3241513,
  0.33913246,
  -1.0360606,
  -1.1065421,
  -0.60298157,
  -0.10519498,
  -0.9272375,
  -1.5781276,
  -0.03345066,
  -0.32603604,
  0.00807339,
  1.0682707,
  -1.5219321,
  -1.3957804,
  0.13822852,
  -1.095

In [10]:
# Make sure every chunk's 'text' is a single string before it is stored.
for item in chunked_html:
    # The chunking step already stores 'text' as a str. Calling ", ".join() on a
    # str would put ", " between every character and corrupt the text, so only
    # join values that are not already strings (e.g. a list of sentences).
    if not isinstance(item['text'], str):
        item['text'] = ", ".join(item['text'])

### 4. Vector DB

Milvus Set Up

In [11]:
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility

Collections & ID 생성

In [12]:
# Open the default Milvus connection.
connections.connect()

# Schema: auto-generated integer primary key, the chunk's source URL, the chunk
# text, and a 1024-dimensional float vector for the embedding.
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=3000),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=9000),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1024)
]

schema = CollectionSchema(fields, description="Enter a collection description")

# Must be the same collection that the indexing, loading and search cells open
# ("htmlrag_forncp"); otherwise the data is written to a collection that is
# never searched.
collection_name = "htmlrag_forncp"
collection = Collection(name=collection_name, schema=schema, using='default', shards_num=2)

for item in chunked_html:
    # A chunk without a vector cannot be inserted into the FLOAT_VECTOR field.
    if "embedding" not in item:
        print("Skipping chunk without embedding:", item['source'])
        continue

    source_list = [item['source']]
    text_list = [item['text']]
    embedding_list = [item['embedding']]

    # Column-ordered data matching the schema; 'id' is omitted because auto_id=True.
    entities = [
        source_list,
        text_list,
        embedding_list
    ]

    insert_result = collection.insert(entities)
    print("ID for which data insertion was completed:", insert_result.primary_keys)

# Flush once after the last insert so the inserted rows are persisted.
collection.flush()

print("All data insertion has been completed")

ID for which data insertion was completed: [452835994821198528]
ID for which data insertion was completed: [452835994821198530]
ID for which data insertion was completed: [452835994821198532]
ID for which data insertion was completed: [452835994821198534]
ID for which data insertion was completed: [452835994821198536]
ID for which data insertion was completed: [452835994821198538]
ID for which data insertion was completed: [452835994821198540]
ID for which data insertion was completed: [452835994821198542]
ID for which data insertion was completed: [452835994821198544]
ID for which data insertion was completed: [452835994821198546]
ID for which data insertion was completed: [452835994821198548]
ID for which data insertion was completed: [452835994821198550]
ID for which data insertion was completed: [452835994821198552]
ID for which data insertion was completed: [452835994821198554]
ID for which data insertion was completed: [452835994821198556]
ID for which data insertion was complete

Indexing

In [13]:
import pymilvus

# Code for Milvus 2.4.x.
# The Collection / utility calls below use the connection registered through
# `connections`, so no separate client object is created here.

# HNSW index scored with inner product (IP).
index_params = {
    "metric_type": "IP",
    "index_type": "HNSW",
    "params": {
        "M": 8,
        "efConstruction": 200
    }
}

# Open the existing collection and build the index on its vector field.
collection = Collection("htmlrag_forncp")
collection.create_index(field_name="embedding", index_params=index_params)
utility.index_building_progress("htmlrag_forncp")

# Show the parameters of every index now defined on the collection.
print([index.params for index in collection.indexes])



[{'metric_type': 'IP', 'index_type': 'HNSW', 'params': {'M': 8, 'efConstruction': 200}}]


In [None]:
import pymilvus

# Code for Milvus 2.4.x
# NOTE(review): this cell has no execution count, i.e. it was not run in this
# notebook. The keyword arguments and method names used on `milvus` below could
# not be checked against the pymilvus API from this file — confirm before use.
milvus = pymilvus.Milvus()

# Index parameter settings
index_params = {
    "metric_type": "IP",
    "index_type": "HNSW",
    "params": {
        "M": 8,
        "efConstruction": 200
    }
}

# Create the collection
collection_name = "htmlrag_forncp"
milvus.create_collection(
    name=collection_name,
    dimension=1024,  # temporarily set to 1024
    primary_key="pk",  # temporarily set to pk
)

# Create the index
milvus.create_index(
    collection_name=collection_name,
    field_name="embedding",
    index_params=index_params
)

# Check the index building progress
print(milvus.get_index_building_progress(collection_name))

# Print the index information
indexes = milvus.get_collection_info(collection_name).index_info
print([index.params for index in indexes])

재실행 시 Milvus 로딩 방법

1. Docker를 실행한 후, collections가 저장되어 있는 컨테이너(milvus-standalone)를 실행시켜줍니다.
2. 아래의 코드를 통해 connection을 실행하고, 만든 collection의 이름을 넣어 연결합니다. Milvus 및 Milvus-Standalone의 기본 환경은 host = "localhost", port="19530"입니다. 필요한 모듈과 라이브러리는 Milvus에 데이터를 저장할 때와 동일합니다.

In [14]:
# Reconnect to the local Milvus instance under the "default" alias.
connections.connect(alias="default", host="localhost", port="19530")

# Put the name of the collection to open here.
collection = Collection(name="htmlrag_forncp")
# Last expression of the cell: shows the collection's load state.
utility.load_state(collection_name="htmlrag_forncp")

<LoadState: Loaded>

In [None]:
# from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
 
# connections.connect()
 
# fields = [
#     FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
#     FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=3000),
#     FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=3000),
#     FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1024)
# ]
 
# schema = CollectionSchema(fields, description="html chunk collection for rag inner product")
# collection_name = "htmlrag_forncp"
# collection = Collection(name=collection_name, schema=schema)
 
# entities = []
 
# for item in chunked_html:
#     entities.append(
#         {
#             "source": item["source"],
#             "text": item["text"],
#             "embedding": item["embedding"]
#         }
#     )
# print(fields)
# insert_result = collection.insert(entities)
 
# print("Data insertion complete. IDs:", insert_result.primary_keys)

### 5. Retrieval → HyperCLOVA X

Chat Completion API
1. 가장 긴 길이의 답변을 선택하는 경우

In [15]:
class CompletionExecutor:
    """Chat-completion client that keeps the longest streamed message.

    Args:
        host: Base URL including the scheme; the endpoint path is appended to it.
        api_key: Value sent as ``X-NCP-CLOVASTUDIO-API-KEY``.
        api_key_primary_val: Value sent as ``X-NCP-APIGW-API-KEY``.
        request_id: Value sent as ``X-NCP-CLOVASTUDIO-REQUEST-ID``.
    """

    def __init__(self, host, api_key, api_key_primary_val, request_id):
        self._host = host
        self._api_key = api_key
        self._api_key_primary_val = api_key_primary_val
        self._request_id = request_id

    def execute(self, completion_request, response_type="stream"):
        """Send ``completion_request`` and return the model's answer.

        Args:
            completion_request: JSON-serialisable request body.
            response_type: "stream" reads the event stream and keeps the longest
                ``message.content`` seen; "single" returns the decoded JSON body.

        Returns:
            The longest message content (str) for "stream", the parsed JSON for
            "single", or "" for any other ``response_type``.
        """
        headers = {
            "X-NCP-CLOVASTUDIO-API-KEY": self._api_key,
            "X-NCP-APIGW-API-KEY": self._api_key_primary_val,
            "X-NCP-CLOVASTUDIO-REQUEST-ID": self._request_id,
            "Content-Type": "application/json; charset=utf-8",
            "Accept": "text/event-stream"
        }

        final_answer = ""

        with requests.post(
            self._host + "/testapp/v1/chat-completions/HCX-003",
            headers=headers,
            json=completion_request,
            stream=True
        ) as r:
            if response_type == "stream":
                longest_line = ""
                for line in r.iter_lines():
                    if line:
                        decoded_line = line.decode("utf-8")
                        if decoded_line.startswith("data:"):
                            event_data = json.loads(decoded_line[len("data:"):])
                            # Events without a message contribute an empty string.
                            message_content = event_data.get("message", {}).get("content", "")
                            if len(message_content) > len(longest_line):
                                longest_line = message_content
                final_answer = longest_line
            elif response_type == "single":
                # Assumption: the single response comes back as one JSON body.
                # NOTE(review): the request still sends "Accept: text/event-stream"
                # in this mode — confirm the endpoint then returns plain JSON.
                final_answer = r.json()

        # Without this return the method always yielded None to its caller.
        return final_answer

2. "[DONE]" 바로 직전의 출력 결과를 선택하는 경우

In [16]:
class CompletionExecutor:
    """Chat-completion client that returns the last message before "[DONE]".

    Args:
        host: Base URL including the scheme; the endpoint path is appended to it.
        api_key: Value sent as ``X-NCP-CLOVASTUDIO-API-KEY``.
        api_key_primary_val: Value sent as ``X-NCP-APIGW-API-KEY``.
        request_id: Value sent as ``X-NCP-CLOVASTUDIO-REQUEST-ID``.
    """

    def __init__(self, host, api_key, api_key_primary_val, request_id):
        self._host = host
        self._api_key = api_key
        self._api_key_primary_val = api_key_primary_val
        self._request_id = request_id

    def execute(self, completion_request):
        """Stream the completion and return the content of the last data event.

        Reading stops at the first line containing ``"data":"[DONE]"``.

        Returns:
            ``message.content`` of the last ``data:`` line seen before that
            marker, or "" if there was none.
        """
        headers = {
            "X-NCP-CLOVASTUDIO-API-KEY": self._api_key,
            "X-NCP-APIGW-API-KEY": self._api_key_primary_val,
            "X-NCP-CLOVASTUDIO-REQUEST-ID": self._request_id,
            "Content-Type": "application/json; charset=utf-8",
            "Accept": "text/event-stream"
        }

        # Keeps the content of the most recent 'data:' line in the stream.
        last_data_content = ""

        # The context manager closes the streamed response even when the loop
        # exits early via break or a line fails to parse.
        with requests.post(
            self._host + "/testapp/v1/chat-completions/HCX-003",
            headers=headers,
            json=completion_request,
            stream=True
        ) as response:
            for line in response.iter_lines():
                if not line:
                    continue
                decoded_line = line.decode("utf-8")
                if '"data":"[DONE]"' in decoded_line:
                    break
                if decoded_line.startswith("data:"):
                    last_data_content = json.loads(decoded_line[5:])["message"]["content"]

        return last_data_content

답변 생성 함수 정의

In [17]:
# Define the helper that embeds the user's query first.
def query_embed(text: str):
    """Return the embedding of ``text`` produced by the module-level ``embedding_executor``."""
    return embedding_executor.execute({"text": text})

In [18]:
def html_chat(realquery: str, top_k: int = 10) -> str:
    """Answer ``realquery`` from chunks retrieved out of the Milvus collection.

    The query is embedded with ``query_embed``, the ``top_k`` best-scoring
    chunks (inner-product metric) are fetched from the module-level
    ``collection``, and each one is sent to the chat-completion model as an
    extra system message after the instruction prompt.

    Args:
        realquery: The user's question.
        top_k: Number of chunks to retrieve and pass as references. Defaults
            to 10, the value that used to be hard-coded.

    Returns:
        Whatever ``CompletionExecutor.execute`` returns for the request.
    """
    # Vectorize the user query.
    query_vector = query_embed(realquery)

    collection.load()

    # HNSW's search-time ``ef`` must not be smaller than the number of results
    # requested, so raise it together with top_k (64 for the default).
    search_params = {"metric_type": "IP", "params": {"ef": max(64, top_k)}}
    results = collection.search(
        data=[query_vector],  # vector(s) to search with
        anns_field="embedding",  # vector field to search on
        param=search_params,
        limit=top_k,
        output_fields=["source", "text"]
    )

    # results[0] holds the hits for the single query vector sent above.
    reference = [
        {
            "distance": hit.distance,
            "source": hit.entity.get("source"),
            "text": hit.entity.get("text"),
        }
        for hit in results[0]
    ]

    completion_executor = CompletionExecutor(
        host="https://clovastudio.stream.ntruss.com",
        api_key='<api_key>',
        api_key_primary_val='<api_key_primary_val>',
        request_id='<request_id>'
    )

    # Instruction prompt first, then one system message per retrieved chunk,
    # and finally the user's question.
    preset_texts = [
        {
            "role": "system",
            "content": "- 너의 역할은 사용자의 질문에 reference를 바탕으로 답변하는거야. \n- 너가 가지고있는 지식은 모두 배제하고, 주어진 reference의 내용만을 바탕으로 답변해야해. \n- 답변의 출처가 되는 html의 내용인 'source'도 답변과 함께 {url:}의 형태로 제공해야해. \n- 만약 사용자의 질문이 reference와 관련이 없다면, {제가 가지고 있는 정보로는 답변할 수 없습니다.}라고만 반드시 말해야해."
        }
    ]

    for ref in reference:
        preset_texts.append(
            {
                "role": "system",
                "content": f"reference: {ref['text']}, url: {ref['source']}"
            }
        )

    preset_texts.append({"role": "user", "content": realquery})

    request_data = {
        "messages": preset_texts,
        "topP": 0.6,
        "topK": 0,
        "maxTokens": 1024,
        "temperature": 0.5,
        "repeatPenalty": 1.2,
        "stopBefore": [],
        "includeAiFilters": False
    }

    # Return the answer generated by the LLM.
    response_data = completion_executor.execute(request_data)

    return response_data

실행 결과 모음

In [19]:
# Sample query (Korean): "What are the main features of CLOVA Studio?"
response = html_chat("클로바스튜디오의 주요 기능이 뭐야?")
print(response)

제가 가지고 있는 정보로는 답변할 수 없습니다.


In [20]:
# Sample query (Korean): "How do I set usage permissions for CLOVA Studio?"
response2 = html_chat("클로바스튜디오의 사용 권한을 어떻게 설정해?")
print(response2)

제가 가지고 있는 정보로는 답변할 수 없습니다.


In [21]:
# Sample query (Korean): "What is the Playground?"
response3 = html_chat("플레이그라운드가 뭐야?")
print(response3)

플레이그라운드는 프롬프트 입력과, 파라미터 설정을 이용해 HyperCLOVA X 모델을 활용할 수 있는 공간입니다. 

- url : https://guide.***********.***/docs/clovastudio-playground


In [None]:
# Sample query (Korean): "Can I create skills in CLOVA X?"
response4 = html_chat("CLOVA X에서 스킬을 만들 수 있어?")
print(response4)

In [None]:
# Sample query (Korean): "Plan a two-day, one-night trip to Busan" — not about the guide pages.
response5 = html_chat("부산 1박 2일 여행 계획을 짜줘?")
print(response5)

In [None]:
# Sample query (Korean): "Are you really that good at fighting?" — not about the guide pages.
response6 = html_chat("니가 그렇게 싸움을 잘해?")
print(response6)

In [24]:
# Sample query (Korean): "Cloud environment specs supported by CLOVA Studio?"
response7 = html_chat("CLOVA Studio가 지원하는 클라우드 환경 사양?")
print(response7)

제가 가지고 있는 정보로는 답변할 수 없습니다.
