# Data Ingest in Python (Azure AI Search)
노트북에 있는 예제를 실행하기 위해 필요한 Azure AI Search의 검색 인덱스를 생성한다. 이 노트북에서는 다음과 같은 작업을 수행한다.

- Azure AI Search 인덱스 생성
- Azure Blob Storage에 PDF 업로드
- Azure AI Document Intelligence를 통한 PDF의 내용물 추출 및 구조화
- 추출된 텍스트를 청크로 분할
- 분할된 청크를 Embeddings로 변환
- Azure AI Search의 인덱스로 등록



# 사전 준비
이 파이썬 예제를 실행하려면 다음과 같은 환경이 필요하다:
- [Azure AI Search 리소스](https://learn.microsoft.com/azure/search/search-create-service-portal)의 엔드포인트 및 관리자 API 키. 이 노트북은 `azure-search-documents==11.4.0`에 기반해서 작성된 것이다.
- [Azure AI Document Intelligence 리소스](https://learn.microsoft.com/azure/ai-services/document-intelligence/create-document-intelligence-resource)의 엔드포인트 및 키
- [Azure Blob Storage 리소스](https://learn.microsoft.com/azure/storage/common/storage-account-create?tabs=azure-portal)의 [connection string](https://learn.microsoft.com/azure/storage/common/storage-account-get-info?tabs=portal#get-a-connection-string-for-the-storage-account)
- [Azure OpenAI Service](https://learn.microsoft.com/azure/ai-services/openai/how-to/create-resource?pivots=web-portal)에 접근할 수 있는 승인된 Azure 구독
- Azure OpenAI Service에 배포된 `text-embedding-ada-002` [Embeddings 모델](https://learn.microsoft.com/azure/ai-services/openai/tutorials/embeddings?tabs=python%2Ccommand-line&pivots=programming-language-python). 이 노트북의 코드는 Azure OpenAI API 버전 `2024-02-01`을 사용한다.
- Azure OpenAI Service 연동 및 모델 정보
  - Azure OpenAI 엔드포인트 (예: `https://<리소스 이름>.openai.azure.com/`)
  - OpenAI API 키
  - OpenAI Embeddings 모델의 배포 이름
  - OpenAI API 버전
- [Python](https://www.python.org/downloads/release/python-31011/)(이 예제는 버전 3.10.x로 테스트했다.)

이 예제에서는 Visual Studio Code와 [Jupyter extension](https://marketplace.visualstudio.com/items?itemName=ms-toolsai.jupyter)을 사용한다. 이 노트북은 https://github.com/Azure-Samples/azure-search-openai-demo 를 기반으로 작성된 것이다.

## 패키지 설치

In [None]:
!pip install azure-search-documents==11.4.0
!pip install azure-identity==1.15.0
!pip install azure-ai-formrecognizer==3.3.2
!pip install azure-storage-blob==12.14.1
!pip install openai[datalib]==1.3.9
!pip install pypdf==3.17.0
!pip install jsonpickle
!pip install tenacity==8.2.3

In [None]:
# Print the installed versions of the main SDKs so they can be compared with
# the versions pinned by the `pip install` cell.
import azure.search.documents
print("azure.search.documents", azure.search.documents.__version__)
import azure.ai.formrecognizer
# NOTE(review): formrecognizer is queried through the upper-case `__VERSION__`
# attribute, unlike the other packages in this cell — confirm it exists in the
# installed version.
print("azure.ai.formrecognizer", azure.ai.formrecognizer.__VERSION__)
import azure.storage.blob
print("azure.storage.blob", azure.storage.blob.__version__)
import openai
print("openai", openai.__version__)

## 라이브러리 및 환경변수 불러오기

In [None]:
import os
import io
import time
from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient
from azure.search.documents.indexes import SearchIndexClient  
from azure.search.documents.indexes.models import (  
    ExhaustiveKnnAlgorithmConfiguration,
    ExhaustiveKnnParameters,
    SearchIndex,
    SearchField,
    SearchFieldDataType,
    SimpleField,
    SearchableField,
    SemanticConfiguration,
    SemanticPrioritizedFields,
    SemanticField,
    SemanticSearch,
    VectorSearch,
    HnswAlgorithmConfiguration,
    HnswParameters,
    VectorSearchAlgorithmConfiguration,
    VectorSearchAlgorithmKind,
    VectorSearchAlgorithmMetric,
    VectorSearchProfile,
)  

## 연동 설정

In [None]:
# Connection settings.
# Secrets and endpoints are read from the environment variable named in each
# `os.environ.get` call when it is set; otherwise the placeholder is used and
# must be edited by hand. Keeping keys in the environment avoids saving
# secrets inside the notebook file.

# Azure Blob Storage
azure_storage_container: str = "content"
azure_blob_connection_string: str = os.environ.get(
    "AZURE_STORAGE_CONNECTION_STRING", "<Your blob connection string>"
)
# Azure AI Search
search_service_endpoint: str = os.environ.get(
    "AZURE_SEARCH_SERVICE_ENDPOINT", "<Your search service endpoint>"
)
search_service_admin_key: str = os.environ.get(
    "AZURE_SEARCH_ADMIN_KEY", "<Your search service admin key>"
)
index_name: str = "gptkbindex"
search_analyzer_name: str = "ko.lucene"
credential = AzureKeyCredential(search_service_admin_key)
# Azure AI Document Intelligence
document_intelligence_key: str = os.environ.get(
    "AZURE_DOCUMENT_INTELLIGENCE_KEY", "<Your document intelligence key>"
)
document_intelligence_endpoint: str = os.environ.get(
    "AZURE_DOCUMENT_INTELLIGENCE_ENDPOINT", "<Your document intelligence endpoint>"
)
# This is a credential object, not a string.
document_intelligence_creds: AzureKeyCredential = AzureKeyCredential(document_intelligence_key)
# Azure OpenAI Service
AZURE_OPENAI_API_KEY = os.environ.get("AZURE_OPENAI_API_KEY", "Your OpenAI API Key")
AZURE_OPENAI_ENDPOINT = os.environ.get(
    "AZURE_OPENAI_ENDPOINT", "https://<Your OpenAI Service>.openai.azure.com/"
)
# Name of the embeddings deployment; "embedding" is the default name set up
# by the automated deployment.
model: str = os.environ.get("AZURE_OPENAI_EMB_DEPLOYMENT", "embedding")


## 검색 인덱스 정의
검색 인덱스 스키마와 벡터 검색 설정을 생성한다.
아래 코드는 `azure-search-documents==11.4.0`를 기반으로 작성된 것이므로 다른 버전에서는 동작하지 않을 수 있다.

In [None]:
def create_search_index():
    """Create the Azure AI Search index ``index_name`` if it does not exist yet.

    Schema:
      * ``id`` — document key.
      * ``content`` — full-text searchable with ``search_analyzer_name``.
      * ``embedding`` — 1536-dimensional vector, searched through the
        ``embedding_config`` profile (HNSW, cosine metric).
      * ``category``, ``sourcepage``, ``sourcefile``, ``metadata`` —
        filterable and facetable strings.

    A semantic configuration named ``default`` ranks on ``content``.
    Nothing is changed when an index with the same name already exists.
    """
    # Plain string fields that are only filtered/faceted, never full-text searched.
    metadata_fields = [
        SimpleField(name=field_name, type="Edm.String", filterable=True, facetable=True)
        for field_name in ("category", "sourcepage", "sourcefile", "metadata")
    ]
    embedding_field = SearchField(
        name="embedding",
        type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
        hidden=False,
        searchable=True,
        filterable=False,
        sortable=False,
        facetable=False,
        vector_search_dimensions=1536,
        vector_search_profile_name="embedding_config",
    )
    fields = [
        SimpleField(name="id", type="Edm.String", key=True),
        SearchableField(name="content", type="Edm.String", analyzer_name=search_analyzer_name),
        embedding_field,
        *metadata_fields,
    ]

    content_only = SemanticPrioritizedFields(
        title_field=None,
        keywords_fields=None,
        content_fields=[SemanticField(field_name="content")],
    )
    semantic_search = SemanticSearch(
        configurations=[SemanticConfiguration(name="default", prioritized_fields=content_only)]
    )

    hnsw_parameters = HnswParameters(
        m=4,
        ef_construction=400,
        ef_search=500,
        metric=VectorSearchAlgorithmMetric.COSINE,
    )
    hnsw_algorithm = HnswAlgorithmConfiguration(
        name="hnsw_config",
        kind=VectorSearchAlgorithmKind.HNSW,
        parameters=hnsw_parameters,
    )
    # The profile name must match `vector_search_profile_name` on the embedding field.
    embedding_profile = VectorSearchProfile(
        name="embedding_config",
        algorithm_configuration_name="hnsw_config",
    )
    vector_search = VectorSearch(algorithms=[hnsw_algorithm], profiles=[embedding_profile])

    index_client = SearchIndexClient(endpoint=search_service_endpoint, credential=credential)
    if index_name in index_client.list_index_names():
        print(f"Search index {index_name} already exists")
        return

    new_index = SearchIndex(
        name=index_name,
        fields=fields,
        vector_search=vector_search,
        semantic_search=semantic_search,
    )
    print(f"Creating {index_name} search index")
    created = index_client.create_or_update_index(new_index)
    print(f' {created.name} created')

def remove_from_index(filename):
    """Delete indexed sections whose ``sourcefile`` matches *filename*.

    Args:
        filename: Path of the source file; only its basename is compared with
            the ``sourcefile`` field. ``None`` removes every document in the
            index.

    Documents are fetched and deleted in pages of up to 1000 until the search
    reports a total count of zero.
    """
    print(f"Removing sections from '{filename or '<all>'}' from search index '{index_name}'")
    search_client = SearchClient(endpoint=search_service_endpoint,
                                 index_name=index_name,
                                 credential=credential)

    # The filter does not change between rounds, so build it once.
    if filename is None:
        odata_filter = None
    else:
        # OData string literals escape an embedded single quote by doubling it.
        # Without this, a name such as "O'Reilly.pdf" yields an invalid filter.
        escaped_name = os.path.basename(filename).replace("'", "''")
        odata_filter = f"sourcefile eq '{escaped_name}'"

    while True:
        results = search_client.search("", filter=odata_filter, top=1000, include_total_count=True)
        if results.get_count() == 0:
            break
        deleted = search_client.delete_documents(documents=[{"id": doc["id"]} for doc in results])
        print(f"\tRemoved {len(deleted)} sections from index")
        # It can take a few seconds for search results to reflect changes, so wait a bit
        time.sleep(2)


# Azure Blob Storage에 PDF 파일 업로드하기
PDF 파일을 페이지 단위로 분할한 뒤 Azure Blob Storage에 업로드한다. 업로드된 파일은 채팅 UI에서 미리보기용으로도 사용된다.

In [None]:
from azure.storage.blob import BlobServiceClient
from pypdf import PdfReader, PdfWriter

def blob_name_from_file_page(filename, page = 0):
    """Return the blob name used for *filename* (and, for PDFs, one of its pages).

    A PDF ``dir/Report.pdf`` with ``page=3`` becomes ``Report-3.pdf``; the
    extension check is case-insensitive and the output always ends in a
    lower-case ``.pdf``. Any other file keeps its basename unchanged.
    """
    base = os.path.basename(filename)
    stem, extension = os.path.splitext(base)
    if extension.lower() != ".pdf":
        return base
    return f"{stem}-{page}.pdf"

def upload_blobs(filename):
    """Upload *filename* to the ``azure_storage_container`` blob container.

    The container is created if it is missing, and existing blobs are
    overwritten. A PDF is split so that each page is uploaded as its own
    single-page PDF blob, named by ``blob_name_from_file_page`` with the
    0-based page index. Any other file is uploaded as-is under its basename.
    """
    service = BlobServiceClient.from_connection_string(azure_blob_connection_string)
    container = service.get_container_client(azure_storage_container)
    if not container.exists():
        container.create_container()

    is_pdf = os.path.splitext(filename)[1].lower() == ".pdf"
    if not is_pdf:
        target_name = blob_name_from_file_page(filename)
        with open(filename, "rb") as source:
            container.upload_blob(target_name, source, overwrite=True)
        return

    # One blob per page, so previews can show the exact page a chunk came from.
    for page_index, page in enumerate(PdfReader(filename).pages):
        target_name = blob_name_from_file_page(filename, page_index)

        buffer = io.BytesIO()
        single_page = PdfWriter()
        single_page.add_page(page)
        single_page.write(buffer)
        buffer.seek(0)
        container.upload_blob(target_name, buffer, overwrite=True)


# Azure AI Document Intelligence의 OCR 사용하기
PDF에 OCR을 사용하는 [레이아웃 모델](https://learn.microsoft.com/azure/ai-services/document-intelligence/concept-layout) `prebuilt-layout`은 고도화된 머신러닝 기반 문서 분석 API다. 이 API를 사용하면 다양한 형식의 문서를 구조화된 데이터로 반환받을 수 있다. 이 API는 마이크로소프트가 가진 강력한 광학문자인식(OCR) 기능과 딥러닝 모델을 결합하여 텍스트, 테이블, (체크 박스)선택 표시, 문서 구조를 추출한다.

In [None]:
from azure.ai.formrecognizer import DocumentAnalysisClient
import html
import jsonpickle

def table_to_html(table):
    """Render a Document Intelligence table as a compact HTML ``<table>`` string.

    Each row ``0 .. table.row_count - 1`` becomes a ``<tr>``, with its cells
    ordered by ``column_index``. Header cells (``kind`` of ``columnHeader`` or
    ``rowHeader``) use ``<th>``; all others use ``<td>``. Spans greater than 1
    are emitted as ``colSpan``/``rowSpan`` attributes, and cell text is
    HTML-escaped.

    Args:
        table: Object exposing ``row_count`` and ``cells``; each cell exposes
            ``row_index``, ``column_index``, ``kind``, ``column_span``,
            ``row_span`` and ``content``. Spans and content may be ``None``.

    Returns:
        The HTML markup as a single string.
    """
    # Group cells by row in one pass instead of rescanning all cells per row.
    cells_by_row = {}
    for cell in table.cells:
        cells_by_row.setdefault(cell.row_index, []).append(cell)

    parts = ["<table>"]
    for row_index in range(table.row_count):
        parts.append("<tr>")
        for cell in sorted(cells_by_row.get(row_index, []), key=lambda c: c.column_index):
            tag = "th" if cell.kind in ("columnHeader", "rowHeader") else "td"
            cell_spans = ""
            # Spans may be omitted (None) by the service/SDK; treat that as 1.
            if cell.column_span is not None and cell.column_span > 1:
                cell_spans += f" colSpan={cell.column_span}"
            if cell.row_span is not None and cell.row_span > 1:
                cell_spans += f" rowSpan={cell.row_span}"
            parts.append(f"<{tag}{cell_spans}>{html.escape(cell.content or '')}</{tag}>")
        parts.append("</tr>")
    parts.append("</table>")
    return "".join(parts)

def get_document_text(filename):
    """Analyze *filename* with the ``prebuilt-layout`` model and return its text per page.

    Every table attributed to a page is inserted, as HTML from
    ``table_to_html``, at the position of the table's first character. All
    other characters are copied from the analysis result's ``content``.

    Args:
        filename: Path of the local document to analyze.

    Returns:
        A list with one ``(page_num, offset, page_text)`` tuple per page:
          * ``page_num`` is the 0-based page index.
          * ``offset`` is where the page starts in the concatenation of all
            ``page_text`` values.
          * ``page_text`` always ends with one space.
    """
    offset = 0
    page_map = []

    print(f"Extracting text from '{filename}' using Azure AI Document Intelligence")
    # Use the client as a context manager so its HTTP resources are released.
    # The poller is resolved before leaving the block, while the client is
    # still open.
    with DocumentAnalysisClient(
        endpoint=document_intelligence_endpoint,
        credential=document_intelligence_creds,
        headers={"x-ms-useragent": "azure-search-chat-demo/1.0.0"},
    ) as form_recognizer_client:
        with open(filename, "rb") as f:
            poller = form_recognizer_client.begin_analyze_document("prebuilt-layout", document=f)
        form_recognizer_results = poller.result()

    # For debugging: convert the AnalyzeResult to JSON while keeping its object structure.
    # json_data = jsonpickle.encode(form_recognizer_results)
    # with open('data.json', "w", encoding='utf-8') as f:
    #     f.write(json_data)
    #
    # f = open("data.json")
    # json_str = f.read()
    # form_recognizer_results = jsonpickle.decode(json_str)

    content = form_recognizer_results.content
    all_tables = form_recognizer_results.tables or []
    for page_num, page in enumerate(form_recognizer_results.pages):
        # A table belongs to the page of its first bounding region. Tables
        # without bounding regions cannot be placed and are skipped.
        tables_on_page = [
            table for table in all_tables
            if table.bounding_regions and table.bounding_regions[0].page_number == page_num + 1
        ]

        # table_chars[i] is the index (in tables_on_page) of the table covering
        # page character i, or -1 for ordinary text.
        page_offset = page.spans[0].offset
        page_length = page.spans[0].length
        table_chars = [-1] * page_length
        for table_id, table in enumerate(tables_on_page):
            for span in table.spans:
                # Clip the span to this page's character range before marking it.
                lo = max(span.offset - page_offset, 0)
                hi = min(span.offset - page_offset + span.length, page_length)
                if lo < hi:
                    table_chars[lo:hi] = [table_id] * (hi - lo)

        # Build the page text, replacing each table's characters with its HTML once.
        parts = []
        added_tables = set()
        for idx, table_id in enumerate(table_chars):
            if table_id == -1:
                parts.append(content[page_offset + idx])
            elif table_id not in added_tables:
                parts.append(table_to_html(tables_on_page[table_id]))
                added_tables.add(table_id)
        parts.append(" ")  # separator so consecutive pages do not run together
        page_text = "".join(parts)

        page_map.append((page_num, offset, page_text))
        offset += len(page_text)

    return page_map


# Embeddings 생성 함수 정의
tenacity 라이브러리를 사용하여 Embeddings API 호출에 retry를 설정하면 Rate limit(HTTP 429)에 대처할 수 있다.

In [None]:
from tenacity import retry, stop_after_attempt, wait_random_exponential
from openai import AzureOpenAI

# Shared Azure OpenAI client; compute_embedding sends its requests through it.
client = AzureOpenAI(
    azure_endpoint=AZURE_OPENAI_ENDPOINT,
    api_key=AZURE_OPENAI_API_KEY,
    api_version="2024-02-01",
)

def before_retry_sleep(retry_state):
    """tenacity ``before_sleep`` hook: announce that a retry is about to wait.

    ``retry_state`` is supplied by tenacity and is not inspected here.
    """
    notice = "Rate limited on the OpenAI embeddings API, sleeping before retrying..."
    print(notice)

import openai
from tenacity import retry_if_exception_type


@retry(
    # Retry only rate-limit errors. Retrying everything would make e.g. a bad
    # key or a missing deployment wait through up to 15 attempts of 15-60 s,
    # while logging a misleading "Rate limited" message.
    retry=retry_if_exception_type(openai.RateLimitError),
    wait=wait_random_exponential(min=15, max=60),
    stop=stop_after_attempt(15),
    before_sleep=before_retry_sleep,
    # After the last attempt, re-raise the original error instead of tenacity.RetryError.
    reraise=True,
)
def compute_embedding(text):
    """Return the embedding vector of *text* from the ``model`` deployment.

    ``openai.RateLimitError`` is retried with randomized exponential back-off.
    Any other error propagates immediately.
    NOTE(review): transient connection/5xx errors are no longer retried here;
    the openai SDK documents its own small built-in retry for those — confirm
    that is sufficient for your workload.
    """
    return client.embeddings.create(input = [text], model=model).data[0].embedding


# 청크 분할

In [None]:
# Target section length in characters, before the sentence-boundary adjustment.
MAX_SECTION_LENGTH = 1000
# How far (in characters) the section end may be pushed past MAX_SECTION_LENGTH
# while looking for a sentence ending.
SENTENCE_SEARCH_LIMIT = 100
# Number of characters shared by consecutive sections.
SECTION_OVERLAP = 100

def split_text(page_map, filename):
    """Split the concatenated page texts into overlapping, sentence-aligned sections.

    Section boundaries:
      * Each section is about MAX_SECTION_LENGTH characters long.
      * Its end is pushed forward (by fewer than SENTENCE_SEARCH_LIMIT extra
        characters) to the next sentence ending, falling back to the last
        word break seen.
      * Its start is moved back within a bounded window to a sentence ending
        or word break.
      * Consecutive sections overlap by SECTION_OVERLAP characters.
      * A section that ends inside an unclosed ``<table`` makes the next
        section start at that table.

    Args:
        page_map: List of ``(page_num, offset, page_text)`` tuples with
            increasing offsets (the format produced by ``get_document_text``).
        filename: Only used in progress messages.

    Yields:
        ``(section_text, page_index)`` tuples. ``page_index`` is the position
        in *page_map* of the page containing the section's first character.
        Text consisting only of whitespace yields nothing.
    """
    SENTENCE_ENDINGS = [".", "!", "?"]
    WORDS_BREAKS = [",", ";", ":", " ", "(", ")", "[", "]", "{", "}", "\t", "\n"]
    print(f"Splitting '{filename}' into sections")

    def find_page(offset):
        # Index of the page_map entry whose range contains `offset`; offsets
        # at or beyond the last page's start map to the last page.
        num_pages = len(page_map)
        for i in range(num_pages - 1):
            if offset >= page_map[i][1] and offset < page_map[i + 1][1]:
                return i
        return num_pages - 1

    all_text = "".join(p[2] for p in page_map)
    length = len(all_text)
    start = 0
    end = length
    while start + SECTION_OVERLAP < length:
        last_word = -1
        end = start + MAX_SECTION_LENGTH

        if end > length:
            end = length
        else:
            # Try to find the end of the sentence
            while end < length and (end - start - MAX_SECTION_LENGTH) < SENTENCE_SEARCH_LIMIT and all_text[end] not in SENTENCE_ENDINGS:
                if all_text[end] in WORDS_BREAKS:
                    last_word = end
                end += 1
            if end < length and all_text[end] not in SENTENCE_ENDINGS and last_word > 0:
                end = last_word # Fall back to at least keeping a whole word
        if end < length:
            end += 1

        # Try to find the start of the sentence or at least a whole word boundary
        last_word = -1
        while start > 0 and start > end - MAX_SECTION_LENGTH - 2 * SENTENCE_SEARCH_LIMIT and all_text[start] not in SENTENCE_ENDINGS:
            if all_text[start] in WORDS_BREAKS:
                last_word = start
            start -= 1
        if all_text[start] not in SENTENCE_ENDINGS and last_word > 0:
            start = last_word
        if start > 0:
            start += 1

        section_text = all_text[start:end]
        yield (section_text, find_page(start))

        last_table_start = section_text.rfind("<table")
        if (last_table_start > 2 * SENTENCE_SEARCH_LIMIT and last_table_start > section_text.rfind("</table")):
            # If the section ends with an unclosed table, we need to start the next section with the table.
            # If table starts inside SENTENCE_SEARCH_LIMIT, we ignore it, as that will cause an infinite loop for tables longer than MAX_SECTION_LENGTH
            # If last table starts inside SECTION_OVERLAP, keep overlapping
            print(f"Section ends with unclosed table, starting next section with the table at page {find_page(start)} offset {start} table start {last_table_start}")
            start = min(end - SECTION_OVERLAP, start + last_table_start)
        else:
            start = end - SECTION_OVERLAP

    # Text no longer than SECTION_OVERLAP never enters the loop above. The old
    # `start + SECTION_OVERLAP < end` check here could never be true, so such
    # documents were silently dropped. Emit them as one section, unless the
    # text is only whitespace (e.g. pages that produced no text). Longer text
    # is fully covered by the loop, whose last section ends at `length`.
    if length <= SECTION_OVERLAP and all_text.strip():
        yield (all_text, find_page(0))


# 인덱스에 등록할 문서 생성

In [None]:
import re
import base64
import json
def filename_to_id(filename):
    """Build a document-id prefix for *filename* that is safe as a search index key.

    Characters outside ``[0-9a-zA-Z_-]`` are replaced with ``_`` for
    readability. Because that substitution is lossy, the upper-case hex of the
    UTF-8 bytes is appended to keep different filenames distinct.
    """
    readable_part = re.sub("[^0-9a-zA-Z_-]", "_", filename)
    unique_part = filename.encode("utf-8").hex().upper()
    return "file-" + readable_part + "-" + unique_part

def create_sections(filename, page_map, use_vectors, category):
    """Lazily yield one search-index document per text section of *filename*.

    Args:
        filename: Source file name, stored as ``sourcefile`` and used to build
            ids and per-page blob names.
        page_map: Per-page text passed to ``split_text``.
        use_vectors: Currently ignored — every document gets an
            ``embedding`` regardless of this flag.
        category: Value stored in the ``category`` field.

    Yields:
        Dicts with the keys ``id``, ``content``, ``category``, ``sourcepage``,
        ``sourcefile``, ``metadata`` (a JSON string) and ``embedding``.
        ``id`` ends in the running section number, not the page number.
    """
    id_prefix = filename_to_id(filename)
    chunks = split_text(page_map, filename)
    for section_no, (chunk_text, page_index) in enumerate(chunks):
        source_page = blob_name_from_file_page(filename, page_index)
        yield {
            "id": f"{id_prefix}-page-{section_no}",
            "content": chunk_text,
            "category": category,
            "sourcepage": source_page,
            "sourcefile": filename,
            "metadata": json.dumps({"page": page_index, "sourcepage": source_page}),
            "embedding": compute_embedding(chunk_text),
        }


# 청크 인덱싱

In [None]:
def index_sections(filename, sections, batch_size=1000):
    """Upload *sections* to the search index in batches.

    Args:
        filename: Name of the source file. Not used; kept so existing callers
            continue to work.
        sections: Iterable of index documents (dicts matching the index
            schema). It is consumed lazily, one batch at a time.
        batch_size: Maximum number of documents per ``upload_documents``
            call. Defaults to 1000.
    """
    search_client = SearchClient(
        endpoint=search_service_endpoint, index_name=index_name, credential=credential
    )

    def _upload(batch):
        # Upload one batch and report both the totals and any per-document failures.
        results = search_client.upload_documents(documents=batch)
        succeeded = sum(1 for r in results if r.succeeded)
        print(f"\tIndexed {len(results)} sections, {succeeded} succeeded")
        for r in results:
            if not r.succeeded:
                print(f"\t\tFailed to index '{r.key}': {r.error_message}")

    batch = []
    for section in sections:
        batch.append(section)
        if len(batch) >= batch_size:
            _upload(batch)
            batch = []

    if batch:
        _upload(batch)

# 지금까지 정의한 작업 실행하기

In [None]:
import glob

# Driver: make sure the index exists, then ingest every PDF under ../data.
print("Create Search Index...")
create_search_index()
print("Processing files...")

path_pattern = "../data/*.pdf"
for filename in glob.glob(path_pattern):
    print(f"Processing '{filename}'")
    try:
        upload_blobs(filename)
        # Drop sections from a previous run of the same file before re-indexing it.
        remove_from_index(filename)
        page_map = get_document_text(filename)
        # The category is the name of the directory that contains the file.
        category = os.path.basename(os.path.dirname(filename))
        base_name = os.path.basename(filename)
        sections = create_sections(base_name, page_map, use_vectors=False, category=category)
        index_sections(base_name, sections)
    except Exception as e:
        # Best effort per file: report the failure and continue with the next file.
        print(f"\tGot an error while reading {filename} -> {e} --> skipping file")