In [1]:
import os
import json
import re
import tempfile
from pdf2image import convert_from_path
import img2pdf
import fitz
from PIL import Image
import io
import pandas as pd
import time
from langchain_upstage import UpstageLayoutAnalysisLoader
from rainbow_html_transformer import HTMLToTextWithMarkdownTables

In [2]:
def get_pdf_files(folder_path):
    """
    PDF 파일 목록을 가져옴
    folder_path : PDF 파일이 있는 폴더 경로
    반환 : PDF 파일 목록
    """
    return [f for f in os.listdir(folder_path) if f.lower().endswith('.pdf')]

In [3]:
# def load_json(json_path):
#     with open(json_path, 'r', encoding='utf-8') as f:
#         return json.load(f)

# # JSON 파일 경로
# json_path = "document_class.json"

# # JSON 파일을 딕셔너리로 로드
# json_dict = load_json(json_path)

# # 로드된 딕셔너리 출력 (선택사항)
# print(json_dict)
def load_document_classes(json_path):
    """
    문서 유형별 키워드 Json 파일을 읽어옴
    json_path : 문서 유형별 키워드 Json 파일 경로
    사업방법서 → business_method_document
    상품요약서 → product_summary
    약관 → terms_and_conditions
    반환 : 문서 유형별 키워드 딕셔너리
    """
    with open(json_path, 'r', encoding='utf-8') as f:
        print(f"Loading document classes from {json_path}")
        json_dict = json.load(f)
        print("문서 유형 정의\n",json_dict)
        return json_dict

In [4]:
# # pdf_path = "./raw_docs/(무)신한 이율보증형 퇴직적립식보험_사업방법서_231018(1)_P8.pdf"
# pdf_path = "./raw_docs/상품요약서_신한대중교통보장보험(무배당)_20240401_P8.pdf"
# # pdf_path = "./raw_docs/SHL0197_신한케어받는암보험(무배당, 갱신형)(3)_P1482.pdf"
# max_pages = 3
# extracted_text = []

# try:
#     doc = fitz.open(pdf_path)
#     print("doc길이", len(doc))
#     # print("doc[0]", doc[0].get_text())
#     # print("doc[1]", doc[1].get_text())
#     # print("doc[2]", doc[2].get_text())
#     for page_num in range(max_pages):
#         page_content = doc[page_num].get_text()
#         extracted_text.append(page_content)
    
#     doc.close()
# except Exception as e:
#     print(f"Error processing {pdf_path}: {e}")

# print(' '.join(extracted_text))
def extract_text_from_pdf(pdf_path, max_pages=3):
    extracted_text = []
    try:
        doc = fitz.open(pdf_path)
        # print(f"{pdf_path} 페이지 수", len(doc))

        for page_num in range(max_pages):
            page_content = doc[page_num].get_text()
            extracted_text.append(page_content)
        
        doc.close()
    except Exception as e:
        print(f"Error processing {pdf_path}: {e}")

    # print(' '.join(extracted_text))

    return ' '.join(extracted_text)


In [5]:
def classify_document(text_list, document_classes):
    """
    문서 유형 분류
    text : 텍스트
    document_classes : 문서 유형별 키워드
    반환 : 분류된 문서 유형
    """
    for doc_type, keywords in document_classes.items():
        if any(re.search(keyword, text_list, re.IGNORECASE) for keyword in keywords):
            return keywords[0]  # 키워드 리스트의 첫 번째 값 반환
    return "Unknown"

In [6]:
def extract_text_with_ocr(pdf_path, max_pages=3):
    extracted_text = []
    try:
        # PDF를 이미지로 변환
        images = convert_from_path(pdf_path, first_page=1, last_page=max_pages)
        
        for i, img in enumerate(images):
            with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as temp_file:
                img.save(temp_file, format="PNG")
                temp_file_path = temp_file.name
            
            try:
                loader = UpstageLayoutAnalysisLoader(
                    file_path=temp_file_path,
                    use_ocr=True
                )
                documents = loader.load()
                
                html_transformer = HTMLToTextWithMarkdownTables()
                for doc in documents:
                    # HTML 태그 제거
                    transformed_doc = html_transformer.transform_documents([doc])[0]
                    extracted_text.append(transformed_doc.page_content)
            finally:
                os.unlink(temp_file_path)
    
    except Exception as e:
        print(f"Error processing {pdf_path} with OCR: {e}")
    
    return ' '.join(extracted_text)

In [7]:
# 사용 예:
# Layout Analysis API의 최대 페이지 제한인 100페이지 이상은 처리 불가
folder_path = "./raw_docs"

json_path = "document_class.json"
document_classes = load_document_classes(json_path)

print("Getting PDF files...")
pdf_files = get_pdf_files(folder_path)
    
print(f"Found {len(pdf_files)} PDF files to process.")
    
results = []

for i, pdf_file in enumerate(pdf_files, 1):
    print(f"Processing file {i} of {len(pdf_files)}: {pdf_file}")
    pdf_path = os.path.join(folder_path, pdf_file)
        
    text = extract_text_from_pdf(pdf_path)
    doc_type = classify_document(text, document_classes)
    
    if doc_type == "Unknown":
        print(f"Document type unknown. Attempting OCR processing for {pdf_file}")
        ocr_text = extract_text_with_ocr(pdf_path)
        doc_type = classify_document(ocr_text, document_classes)
    
    results.append({"file_name": pdf_file, "document_type": doc_type})
    print(f"file_name: {pdf_file} document_type: {doc_type}\n")

# 결과 출력 또는 저장
for result in results:
    print(f"File: {result['file_name']}, Type: {result['document_type']}")



Loading document classes from document_class.json
문서 유형 정의
 {'business_method_document': ['사업방법서', '사업 내용', '비즈니스 방법'], 'product_summary': ['상품요약서', '상품 설명', '요약 정보'], 'terms_and_conditions': ['약관', '계약 조건', '보험 약관']}
Getting PDF files...
Found 116 PDF files to process.
Processing file 1 of 116: SHL0197_신한케어받는암보험(무배당, 갱신형)(3)_P1482.pdf
Document type unknown. Attempting OCR processing for SHL0197_신한케어받는암보험(무배당, 갱신형)(3)_P1482.pdf
file_name: SHL0197_신한케어받는암보험(무배당, 갱신형)(3)_P1482.pdf document_type: 약관

Processing file 2 of 116: 상품요약서_신한모으고키우는변액적립보험_240401_P26.pdf
file_name: 상품요약서_신한모으고키우는변액적립보험_240401_P26.pdf document_type: 상품요약서

Processing file 3 of 116: 사업방법서_신한(간편가입)든든한실속종신보험(무배당, 보증비용부과형)[해약환급금 일부지급형]_20240401_P19.pdf
file_name: 사업방법서_신한(간편가입)든든한실속종신보험(무배당, 보증비용부과형)[해약환급금 일부지급형]_20240401_P19.pdf document_type: 사업방법서

Processing file 4 of 116: 신한아름다운연금보험(무배당)_상품요약서_240401_v2_P22.pdf
file_name: 신한아름다운연금보험(무배당)_상품요약서_240401_v2_P22.pdf document_type: 상품요약서

Processing file 5 of 116: DB자산관리