In [None]:
import pandas as pd
from elasticsearch import Elasticsearch
from elasticsearch.client import MlClient

In [None]:
# Define the Elasticsearch client
import os
from dotenv import load_dotenv
from elasticsearch import Elasticsearch

# Pull settings from a .env file into the environment before they are read below.
load_dotenv()

# Connection settings; os.getenv returns None for any variable that is not set.
url = os.getenv("ELASTIC_CLOUD_URL")
api_id = os.getenv("ELASTIC_API_ID")
api_key = os.getenv("ELASTIC_API_KEY")
# Model id read here for later use; this cell itself does not use it.
es_model_id = os.getenv("ELASTIC_MODEL_ID")


# Authenticate with the API key, passed as an (id, key) tuple.
client = Elasticsearch(
    url,
    api_key=(api_id, api_key)
)

# Print the info() response; the call raises if the cluster cannot be reached,
# so this doubles as a connectivity check.
print(client.info())

In [None]:
import os
import glob
from typing import List, Dict

import pandas as pd
from PyPDF2 import PdfReader

# 1. PDF > CSV

In [None]:
def extract_text_from_pdf(pdf_path: str) -> str:
    """
    Extract the text of every page of a single PDF file.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        The page texts joined with newlines, in page order. Pages for which
        ``extract_text()`` returns an empty (falsy) value are skipped, so the
        result is an empty string when no page yields any text.
    """
    pages = PdfReader(pdf_path).pages
    page_texts = (page.extract_text() for page in pages)
    return "\n".join(text for text in page_texts if text)


In [None]:

def chunk_text(
    text: str,
    max_chars: int = 1000,
    overlap: int = 200
) -> List[str]:
    """
    Split a long text into overlapping chunks using a sliding window.

    All runs of whitespace (including newlines) are first collapsed into a
    single space. The window is ``max_chars`` characters wide and advances by
    ``max_chars - overlap`` characters, so consecutive chunks share
    ``overlap`` characters. Chunking stops as soon as a chunk reaches the end
    of the text, so no trailing chunk consisting only of already-covered
    overlap is produced.

    Args:
        text: Text to split.
        max_chars: Maximum number of characters per chunk; must be positive.
        overlap: Number of characters shared between consecutive chunks;
            must satisfy ``0 <= overlap < max_chars``.

    Returns:
        The chunks in order, or an empty list when ``text`` is empty or
        contains only whitespace.

    Raises:
        ValueError: If ``max_chars`` is not positive, or ``overlap`` is
            negative or not smaller than ``max_chars`` (the window could
            never advance).
    """
    if max_chars <= 0:
        raise ValueError(f"max_chars must be positive, got {max_chars}")
    if not 0 <= overlap < max_chars:
        raise ValueError(
            f"overlap must satisfy 0 <= overlap < max_chars, "
            f"got overlap={overlap}, max_chars={max_chars}"
        )

    text = " ".join(text.split())
    if not text:
        return []

    text_length = len(text)
    step = max_chars - overlap  # > 0, guaranteed by the validation above

    chunks = []
    for start in range(0, text_length, step):
        end = start + max_chars
        chunks.append(text[start:end])
        # This chunk already covers the tail of the text; a further window
        # would only repeat characters that are already in this chunk.
        if end >= text_length:
            break

    return chunks


In [None]:
def build_pdf_chunk_dataframe(
    folder_path: str,
    max_chars: int = 1000,
    overlap: int = 200
) -> pd.DataFrame:
    """
    Read every PDF in a folder and build a DataFrame of its text chunks.

    Each file matching ``*.pdf`` directly inside ``folder_path`` is run
    through text extraction and chunking; every chunk becomes one row.
    Files are processed in sorted path order so that the row order is
    reproducible across runs.

    Args:
        folder_path: Folder to scan (not recursive).
        max_chars: Maximum characters per chunk, forwarded to ``chunk_text``.
        overlap: Characters shared between consecutive chunks, forwarded to
            ``chunk_text``.

    Returns:
        A DataFrame with columns ``filename`` (base name of the PDF),
        ``chunk_seq`` (1-based position of the chunk within its file) and
        ``chunk_text``. The DataFrame is empty, with the same columns, when
        the folder contains no PDF files.
    """
    # glob returns matches in arbitrary, filesystem-dependent order; sort them
    # so the resulting row order does not change from run to run.
    pdf_files = sorted(glob.glob(os.path.join(folder_path, "*.pdf")))

    records: List[Dict] = []

    for pdf_path in pdf_files:
        filename = os.path.basename(pdf_path)
        print(f"Processing: {filename}")

        full_text = extract_text_from_pdf(pdf_path)
        chunks = chunk_text(full_text, max_chars=max_chars, overlap=overlap)

        records.extend(
            {
                "filename": filename,
                "chunk_seq": seq,
                "chunk_text": chunk,
            }
            for seq, chunk in enumerate(chunks, start=1)
        )

    # Passing the columns explicitly keeps the schema stable even with no rows.
    return pd.DataFrame(records, columns=["filename", "chunk_seq", "chunk_text"])


In [None]:
# 1.1 Read the PDF files in the folder and store them as a DataFrame

# NOTE(review): absolute path on the author's machine; change it before running elsewhere.
folder_path = "D:/workspace/대학원/25년도2학기/정보검색프로젝트/색인데이터"

# Chunking parameters passed to build_pdf_chunk_dataframe below.
max_chars = 1000
overlap = 200

df = build_pdf_chunk_dataframe(
    folder_path=folder_path,
    max_chars=max_chars,
    overlap=overlap
    )

# Show the first rows as a quick sanity check.
print(df.head())

In [None]:
# 1.2 Save the DataFrame as CSV

# index=False leaves the row index out of the file. "utf-8-sig" writes a UTF-8
# byte-order mark, presumably so spreadsheet tools detect the encoding of the
# Korean text (confirm if this matters).
df.to_csv("pdf_chunks.csv", index=False, encoding="utf-8-sig")

In [None]:

# Reload the chunk CSV written by the export step, using the same encoding.
class_info = pd.read_csv('./pdf_chunks.csv', encoding='utf-8-sig')
# Preview the rows labelled 1 and 2 (.loc slicing includes both endpoints).
class_info.loc[1:2]

# 2. ES 색인 시작

In [None]:
# Name of the Elasticsearch index that the index-creation step below creates.
index_name = "class-info"

In [None]:
# 2.1 Add the ingest pipeline used for embedding

# Inference processor definition:
#   model_id     - the model used to compute the embedding
#   field_map    - the document's chunk_text field is the text to embed,
#                  mapped to the "text_field" input name
#   target_field - the embedding result is stored under chunk_embedding
_embedding_processor = {
    "inference": {
        "model_id": es_model_id,
        "field_map": {"chunk_text": "text_field"},
        "target_field": "chunk_embedding",
    }
}

client.ingest.put_pipeline(id="pipeline", processors=[_embedding_processor])

In [None]:
# 2.2 Configure the analyzer and mappings for indexing, and attach the pipeline

# Analysis chain for Korean text: a nori tokenizer in "mixed" decompound mode
# followed by a lowercase filter.
_nori_analysis = {
    "tokenizer": {
        "nori_tokenizer_custom": {
            "type": "nori_tokenizer",
            "decompound_mode": "mixed",
        },
    },
    "analyzer": {
        "korean_nori": {
            "type": "custom",
            "tokenizer": "nori_tokenizer_custom",
            "filter": ["lowercase"],
        },
    },
}

# Field definitions. chunk_text is analyzed with the Korean analyzer above;
# the embedding is mapped as an indexed dense vector with cosine similarity.
# NOTE(review): dims=768 presumably matches the embedding model's output size - confirm.
_field_mappings = {
    "filename": {"type": "keyword"},
    "chunk_seq": {"type": "integer"},
    "chunk_text": {"type": "text", "analyzer": "korean_nori"},
    "chunk_embedding.predicted_value": {
        "type": "dense_vector",
        "dims": 768,
        "index": True,
        "similarity": "cosine",
    },
}

index_body = {
    "settings": {
        "index.mapping.exclude_source_vectors": False,
        "analysis": _nori_analysis,
        "index": {
            "number_of_replicas": "1",
            "number_of_shards": "1",
            # Documents indexed without an explicit pipeline go through "pipeline".
            "default_pipeline": "pipeline",
        },
    },
    "mappings": {"properties": _field_mappings},
}

In [None]:
# if client.indices.exists(index=index_name):
#     client.indices.delete(index=index_name)
#     print(f"Deleted existing index: {index_name}")

In [None]:
# 2.3 Create the index
# Sends the settings and mappings in index_body as the request body.
# NOTE(review): there is no existence check here; presumably this fails when the
# index already exists, which is what the commented-out delete cell is for.
response = client.indices.create(
    index=index_name,
    body=index_body
)

In [None]:
# 2.4 Index the data
#
# Each row of class_info is sent as one document, using the row's DataFrame
# index label as the document id. The outcome of every request is collected in
# indexed_results so failures can be inspected afterwards; a failed document
# does not stop the loop.

indexed_results = []

for index, row in class_info.iterrows():
    # Missing (NaN) cells are sent as None instead of NaN.
    doc_source = {
        field: (row[field] if pd.notna(row[field]) else None)
        for field in ("filename", "chunk_seq", "chunk_text")
    }

    try:
        # Use index_name rather than a repeated literal so documents always go
        # to the index that was created from it.
        response = client.index(index=index_name, document=doc_source, id=str(index))
    except Exception as e:
        # Deliberately broad: record the failure and continue with the next row.
        indexed_results.append({'index': index, 'status': 'failed', 'error': str(e)})
        print(f"Failed to index document with index {index}: {e}")
    else:
        indexed_results.append({'index': index, 'status': 'success', 'response': response})
        # Report progress only every 30th row to keep the output short.
        if index % 30 == 0:
            print(f"Successfully indexed document with index {index}")
