In [1]:
# huggingface-cli login

In [None]:
# 문서 처리
! pip install -U docling

! pip install qwen-vl-utils
! pip install accelerate
! pip install packaging ninja
! pip install flash-attn==2.6.3 --no-build-isolation
! pip install "transformers==4.51.3"

# 나머지는 동일
! pip install nvidia-ml-py3
! pip install -q FlagEmbedding
! pip install selenium webdriver-manager requests tqdm beautifulsoup4 lxml
! pip install kiwipiepy

In [10]:
# 필수 import들
import torch
import transformers
import gc
import psutil
import time
import json
import os
import re
import xml.etree.ElementTree as ET
from xml.dom import minidom
from contextlib import contextmanager
from transformers import Qwen2_5_VLForConditionalGeneration, AutoTokenizer, AutoProcessor,AutoModelForCausalLM
from qwen_vl_utils import process_vision_info
import huggingface_hub
from PIL import Image
from datetime import datetime
from sentence_transformers import SentenceTransformer
from pathlib import Path
import logging
import pynvml

# 버전 확인
print("=== 현재 설치된 버전 확인 ===")
print(f"PyTorch 버전: {torch.__version__}")
print(f"Transformers 버전: {transformers.__version__}")
print(f"CUDA 사용 가능: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA 버전: {torch.version.cuda}")
    print(f"사용 가능한 GPU 개수: {torch.cuda.device_count()}")

try:
    import accelerate
    print(f"Accelerate 버전: {accelerate.__version__}")
except ImportError:
    print("Accelerate 패키지가 설치되지 않음")

try:
    import flash_attn
    print(f"Flash Attention 버전: {flash_attn.__version__}")
except ImportError:
    print("Flash Attention 패키지가 설치되지 않음")

=== 현재 설치된 버전 확인 ===
PyTorch 버전: 2.4.1+cu124
Transformers 버전: 4.51.3
CUDA 사용 가능: True
CUDA 버전: 12.4
사용 가능한 GPU 개수: 1
Accelerate 버전: 1.8.1
Flash Attention 버전: 2.6.3


# 1. Crwaling Data
## URL, PDF_URL, 출판일
## PDF_URL 다운로드 받아 ex_pdf_file에 저장.
## pdf 파일명: 년도_제목.pdf

In [None]:
# 시스템 업데이트
apt update

# Chrome 브라우저 설치
wget -q -O - https://dl.google.com/linux/linux_signing_key.pub | apt-key add -
echo "deb [arch=amd64] http://dl.google.com/linux/chrome/deb/ stable main" >> /etc/apt/sources.list.d/google-chrome.list
apt update
apt install -y google-chrome-stable
apt install -y wget curl unzip xvfb

In [None]:
# Chrome 설치 확인
google-chrome --version

# Python 패키지 확인
python -c "import selenium; print('Selenium 설치됨')"
python -c "import webdriver_manager; print('WebDriver Manager 설치됨')"

In [1]:
import os
import platform
import time
import requests
import logging
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from webdriver_manager.chrome import ChromeDriverManager
from urllib.parse import urljoin
from tqdm import tqdm
import re

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

In [2]:


class BOKNewsScraper:
    def __init__(self, save_dir='./data/ex_pdf_file', headless=True):
        self.BASE_URL = 'https://www.bok.or.kr/portal/singl/newsData/list.do?pageIndex={page}&targetDepth=3&menuNo=200081&syncMenuChekKey=1&depthSubMain=&subMainAt=&searchCnd=1&searchKwd=&depth2=200080&depth3=200081&date=&sdate=&edate=&sort=1&pageUnit=10'
        self.DETAIL_BASE = 'https://www.bok.or.kr'
        self.save_dir = save_dir
        self.driver = None
        self.headless = headless
        
        # 저장 디렉토리 생성
        os.makedirs(self.save_dir, exist_ok=True)
        logger.info(f"저장 디렉토리 생성: {self.save_dir}")
    
    def setup_driver(self):
        """RunPod 환경에 최적화된 Chrome 드라이버 설정"""
        try:
            options = Options()
            
            # RunPod 환경을 위한 Chrome 옵션들
            if self.headless:
                options.add_argument("--headless")
            options.add_argument("--no-sandbox")
            options.add_argument("--disable-dev-shm-usage")
            options.add_argument("--disable-gpu")
            options.add_argument("--disable-web-security")
            options.add_argument("--disable-features=VizDisplayCompositor")
            options.add_argument("--window-size=1920,1080")
            options.add_argument("--user-agent=Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36")
            
            # ChromeDriverManager를 사용해 자동으로 드라이버 관리
            service = Service(ChromeDriverManager().install())
            self.driver = webdriver.Chrome(service=service, options=options)
            
            # 페이지 로드 타임아웃 설정
            self.driver.set_page_load_timeout(30)
            self.driver.implicitly_wait(10)
            
            logger.info("Chrome 드라이버 설정 완료")
            return True
            
        except Exception as e:
            logger.error(f"드라이버 설정 실패: {e}")
            return False
    
    def get_news_list(self, page=1, max_retries=3):
        """뉴스 리스트 가져오기"""
        url = self.BASE_URL.format(page=page)
        logger.info(f"페이지 {page} 뉴스 리스트 가져오는 중: {url}")
        
        for attempt in range(max_retries):
            try:
                self.driver.get(url)
                
                # 페이지 로딩 대기
                WebDriverWait(self.driver, 15).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, "div.bd-line"))
                )
                
                # 추가 대기 시간
                time.sleep(2)
                
                bd_line_div = self.driver.find_element(By.CSS_SELECTOR, "div.bd-line")
                news_items = bd_line_div.find_elements(By.CSS_SELECTOR, "li.bbsRowCls")
                
                if not news_items:
                    logger.warning(f"페이지 {page}에서 뉴스 항목을 찾을 수 없음")
                    return []
                
                results = []
                logger.info(f"총 {len(news_items)}개의 뉴스 항목 발견")
                
                for idx, item in enumerate(news_items):
                    try:
                        # 링크 추출
                        set_div = item.find_element(By.CSS_SELECTOR, "div.set")
                        a_tag = set_div.find_element(By.TAG_NAME, "a")
                        href = a_tag.get_attribute('href')
                        detail_url = urljoin(self.DETAIL_BASE, href)
                        
                        # 제목 추출
                        title_text = a_tag.text.strip()
                        
                        # 날짜 추출
                        try:
                            date_span = item.find_element(By.CSS_SELECTOR, "span.date")
                            date_text = date_span.text.strip()
                        except NoSuchElementException:
                            date_text = "날짜없음"
                        
                        results.append({
                            'title': title_text,
                            'detail_url': detail_url,
                            'date': date_text,
                        })
                        
                        logger.info(f"  {idx + 1}. {title_text[:50]}... ({date_text})")
                        
                    except Exception as e:
                        logger.error(f"뉴스 항목 {idx + 1} 처리 실패: {e}")
                        continue
                
                return results
                
            except TimeoutException:
                logger.warning(f"페이지 로딩 타임아웃 (시도 {attempt + 1}/{max_retries})")
                if attempt < max_retries - 1:
                    time.sleep(5)
                    continue
                else:
                    logger.error("페이지 로딩 최대 재시도 횟수 초과")
                    return []
            except Exception as e:
                logger.error(f"뉴스 리스트 가져오기 실패 (시도 {attempt + 1}/{max_retries}): {e}")
                if attempt < max_retries - 1:
                    time.sleep(5)
                    continue
                else:
                    return []
    
    def get_pdf_links(self, detail_url, max_retries=3):
        """상세 페이지에서 PDF 링크만 추출"""
        for attempt in range(max_retries):
            try:
                logger.info(f"PDF 링크 추출 중: {detail_url}")
                self.driver.get(detail_url)
                time.sleep(3)  # 페이지 로딩 대기
                
                # 모든 파일 링크 찾기
                file_elements = self.driver.find_elements(By.XPATH, "//a[starts-with(@href, '/fileSrc/')]")
                
                if not file_elements:
                    logger.info("첨부 파일을 찾을 수 없음")
                    return []
                
                pdf_links = []
                for file_element in file_elements:
                    try:
                        file_href = file_element.get_attribute("href")
                        file_title = file_element.get_attribute("title") or file_element.text.strip()
                        
                        if file_title:
                            # 파일 확장자 확인
                            file_ext = self.get_file_extension_from_url(file_href) or self.get_file_extension_from_title(file_title)
                            
                            # PDF 파일만 필터링
                            if file_ext and file_ext.lower() == 'pdf':
                                pdf_links.append({
                                    'title': file_title,
                                    'file_url': urljoin(self.DETAIL_BASE, file_href),
                                    'file_type': 'pdf'
                                })
                                logger.info(f"  PDF 파일 발견: {file_title}")
                            else:
                                logger.info(f"  PDF가 아닌 파일은 건너뜀: {file_title} ({file_ext})")
                    except Exception as e:
                        logger.error(f"파일 링크 처리 실패: {e}")
                        continue
                
                if pdf_links:
                    logger.info(f"총 {len(pdf_links)}개의 PDF 파일 발견")
                else:
                    logger.info("PDF 파일을 찾을 수 없음")
                
                return pdf_links
                
            except Exception as e:
                logger.error(f"PDF 링크 추출 실패 (시도 {attempt + 1}/{max_retries}): {e}")
                if attempt < max_retries - 1:
                    time.sleep(5)
                    continue
                else:
                    return []
    
    def get_file_extension_from_url(self, url):
        """URL에서 파일 확장자 추출"""
        try:
            # URL에서 파일명 부분 추출
            filename = url.split('/')[-1]
            if '.' in filename:
                return filename.split('.')[-1]
        except:
            pass
        return None
    
    def get_file_extension_from_title(self, title):
        """제목에서 파일 확장자 추출"""
        try:
            if '.' in title:
                return title.split('.')[-1]
        except:
            pass
        return None
    
    def download_pdf(self, file_url, save_path, max_retries=3):
        """PDF 파일 다운로드"""
        if os.path.exists(save_path):
            logger.info(f"이미 존재함, 건너뜀: {os.path.basename(save_path)}")
            return True
        
        for attempt in range(max_retries):
            try:
                logger.info(f"PDF 다운로드 중: {os.path.basename(save_path)}")
                
                headers = {
                    'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
                }
                
                response = requests.get(file_url, headers=headers, timeout=30)
                response.raise_for_status()
                
                # PDF 파일인지 확인 (Content-Type 체크)
                content_type = response.headers.get('content-type', '').lower()
                if 'pdf' not in content_type and not save_path.lower().endswith('.pdf'):
                    logger.warning(f"PDF가 아닌 파일로 보임: {content_type}")
                
                with open(save_path, 'wb') as f:
                    f.write(response.content)
                
                file_size = os.path.getsize(save_path)
                logger.info(f"PDF 다운로드 완료: {os.path.basename(save_path)} ({file_size:,} bytes)")
                return True
                
            except Exception as e:
                logger.error(f"PDF 다운로드 실패 (시도 {attempt + 1}/{max_retries}): {e}")
                if attempt < max_retries - 1:
                    time.sleep(5)
                    continue
                else:
                    return False
    
    def sanitize_filename(self, filename, max_length=100):
        """파일명 정리"""
        # 불법 문자 제거
        filename = re.sub(r'[\\/*?:"<>|]', "_", filename.strip())
        # 연속 공백을 하나로
        filename = re.sub(r'\s+', '_', filename)
        # 콜론을 대시로
        filename = filename.replace(":", "-")
        # 길이 제한
        if len(filename) > max_length:
            filename = filename[:max_length]
        return filename
    
    def run_scraper(self, start_page=1, end_page=1):
        """메인 스크래핑 실행 (PDF 전용)"""
        if not self.setup_driver():
            logger.error("드라이버 설정 실패로 종료")
            return
        
        try:
            all_results = []
            
            for page in range(start_page, end_page + 1):
                logger.info(f"\n{'='*50}")
                logger.info(f"페이지 {page} 처리 시작")
                logger.info(f"{'='*50}")
                
                news_list = self.get_news_list(page)
                if not news_list:
                    logger.warning(f"페이지 {page}에서 뉴스를 찾을 수 없음")
                    continue
                
                # 각 뉴스 항목 처리
                for i, news in enumerate(tqdm(news_list, desc=f"페이지 {page} 처리")):
                    logger.info(f"\n[{i+1}/{len(news_list)}] 처리 중: {news['title'][:50]}...")
                    
                    # PDF 링크만 추출
                    pdf_links = self.get_pdf_links(news['detail_url'])
                    news['pdf_files'] = pdf_links
                    
                    if not pdf_links:
                        logger.info("이 뉴스에는 PDF 파일이 없음")
                        all_results.append(news)
                        continue
                    
                    # PDF 파일 다운로드
                    for j, pdf_info in enumerate(pdf_links):
                        file_title = pdf_info['title']
                        file_url = pdf_info['file_url']
                        
                        # 파일명 생성
                        safe_date = self.sanitize_filename(news['date']) if news['date'] else 'no_date'
                        safe_title = self.sanitize_filename(file_title, 50)
                        
                        # PDF 확장자 확인 및 추가
                        if not safe_title.lower().endswith('.pdf'):
                            safe_title += '.pdf'
                        
                        filename = f"{safe_date}_{safe_title}"
                        save_path = os.path.join(self.save_dir, filename)
                        
                        # PDF 다운로드
                        success = self.download_pdf(file_url, save_path)
                        pdf_info['download_success'] = success
                        pdf_info['local_path'] = save_path if success else None
                    
                    all_results.append(news)
                    time.sleep(1)  # 서버 부하 방지
            
            # 결과 요약 출력
            self.print_summary(all_results)
            return all_results
            
        except KeyboardInterrupt:
            logger.info("\n사용자에 의해 중단됨")
        except Exception as e:
            logger.error(f"스크래핑 중 오류 발생: {e}")
        finally:
            self.cleanup()
    
    def print_summary(self, results):
        """결과 요약 출력 (PDF 전용)"""
        logger.info(f"\n{'='*60}")
        logger.info("PDF 다운로드 결과 요약")
        logger.info(f"{'='*60}")
        
        total_news = len(results)
        total_pdfs = sum(len(news.get('pdf_files', [])) for news in results)
        successful_downloads = sum(
            sum(1 for pdf_info in news.get('pdf_files', []) if pdf_info.get('download_success', False))
            for news in results
        )
        
        logger.info(f"총 뉴스 항목: {total_news}")
        logger.info(f"총 PDF 파일: {total_pdfs}")
        logger.info(f"성공한 다운로드: {successful_downloads}")
        logger.info(f"실패한 다운로드: {total_pdfs - successful_downloads}")
        
        # 저장된 PDF 파일 목록
        saved_pdfs = [f for f in os.listdir(self.save_dir) if f.endswith('.pdf')]
        logger.info(f"저장된 PDF 파일 수: {len(saved_pdfs)}")
        
        if saved_pdfs:
            logger.info("\n저장된 PDF 파일 목록:")
            for pdf_file in saved_pdfs:
                file_path = os.path.join(self.save_dir, pdf_file)
                file_size = os.path.getsize(file_path)
                logger.info(f"  - {pdf_file} ({file_size:,} bytes)")
        
        # PDF가 없는 뉴스 항목 통계
        news_without_pdf = sum(1 for news in results if not news.get('pdf_files'))
        if news_without_pdf > 0:
            logger.info(f"\nPDF가 없는 뉴스 항목: {news_without_pdf}개")
    
    def cleanup(self):
        """리소스 정리"""
        if self.driver:
            self.driver.quit()
            logger.info("드라이버 종료 완료")

# 사용 예시
def main():
    """메인 실행 함수"""
    # PDF 전용 스크래퍼 인스턴스 생성
    scraper = BOKNewsScraper(
        save_dir='./data/ex_pdf',  # 저장 경로
        headless=True  # GUI 없이 실행 (RunPod에서는 True 권장)
    )
    
    try:
        # 1페이지만 스크래핑 (테스트용)
        results = scraper.run_scraper(start_page=1, end_page=1)
        
        # 여러 페이지를 원할 경우:
        # results = scraper.run_scraper(start_page=1, end_page=3)
        
        return results
        
    except Exception as e:
        logger.error(f"메인 실행 중 오류: {e}")
        return None

if __name__ == "__main__":
    main()

2025-06-23 06:22:32,271 - INFO - 저장 디렉토리 생성: ./data/ex_pdf
2025-06-23 06:22:32,329 - INFO - Get LATEST chromedriver version for google-chrome
2025-06-23 06:22:32,389 - INFO - Get LATEST chromedriver version for google-chrome
2025-06-23 06:22:32,452 - INFO - There is no [linux64] chromedriver "137.0.7151.119" for browser google-chrome "137.0.7151" in cache
2025-06-23 06:22:32,453 - INFO - Get LATEST chromedriver version for google-chrome
2025-06-23 06:22:32,613 - INFO - WebDriver version 137.0.7151.119 selected
2025-06-23 06:22:32,615 - INFO - Modern chrome version https://storage.googleapis.com/chrome-for-testing-public/137.0.7151.119/linux64/chromedriver-linux64.zip
2025-06-23 06:22:32,615 - INFO - About to download new driver from https://storage.googleapis.com/chrome-for-testing-public/137.0.7151.119/linux64/chromedriver-linux64.zip
2025-06-23 06:22:32,700 - INFO - Driver downloading response is 200
2025-06-23 06:22:32,887 - INFO - Get LATEST chromedriver version for google-chrome
2

# 2. Extracted Data
#### pdf(raw data) -> Docling -> Table, Text, Image(graph, chart etc..)


#### 1) ex_images(이미지만 모음.)
#### 2) ex_text (텍스트 마크다운 형태)
#### 3_ ex_table (테이블 마크다운 형태태)

## 텍스트

In [3]:
# JSON 모듈 가져오기 - 데이터 직렬화 및 역직렬화에 사용
import json
# 로깅 모듈 가져오기 - 디버깅 및 정보 기록에 사용
import logging
# 시간 측정 모듈 가져오기 - 실행 시간 측정에 사용
import time
# 파일 경로 처리 모듈 가져오기 - 파일 및 디렉토리 경로 관리에 사용
from pathlib import Path
# 로거 인스턴스 생성 - 현재 모듈의 로깅을 위해 사용
import os
_log = logging.getLogger(__name__)

In [4]:
# 기본 데이터 모델 가져오기 - 입력 형식 정의에 사용
from docling.datamodel.base_models import InputFormat
# PDF 파이프라인 옵션 가져오기 - PDF 처리 설정에 사용
from docling.datamodel.pipeline_options import (
    PdfPipelineOptions,
)
# 문서 변환기 및 PDF 형식 옵션 가져오기 - 문서 변환에 사용
from docling.document_converter import DocumentConverter, PdfFormatOption

# PDF 파이프라인 옵션 설정
pipeline_options = PdfPipelineOptions()

pipeline_options.do_ocr = False  # OCR 기능 비활성화 (이미 텍스트가 있는 PDF 사용)
pipeline_options.ocr_options.lang = ["ko"]  # OCR 언어를 한국어로 설정 (OCR 사용 시)

pipeline_options.do_table_structure = True  # 표 구조 인식 활성화 --> docling의 tableformer 활용해 표 상세 사항 파악
pipeline_options.table_structure_options.do_cell_matching = True  # 표 셀 매칭 활성화

# 문서 변환기 생성 및 PDF 형식 옵션 설정
doc_converter = DocumentConverter(
    format_options={
        InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)
    }
)

In [5]:
import os
import time
import json
from pathlib import Path

# 입력 및 출력 디렉토리 설정
input_dir = Path("./data/ex_pdf")
output_base_dir = Path("./data/ex_text")
output_base_dir.mkdir(parents=True, exist_ok=True)

# PDF 파일 리스트 가져오기
pdf_files = [f for f in os.listdir(input_dir) if f.endswith(".pdf")]

if not pdf_files:
    raise FileNotFoundError("입력 디렉토리에 PDF 파일이 없습니다.")

# 각 PDF 파일 처리
for pdf_file in pdf_files:
    input_doc_path = input_dir / pdf_file

    try:
        print(f"🔄 변환 시작: {pdf_file}")
        start_time = time.time()

        # 문서 변환 실행
        conv_result = doc_converter.convert(str(input_doc_path))

        end_time = time.time() - start_time
        print(f"✅ 변환 완료: {pdf_file} ({end_time:.2f}초)")

        # 파일 이름(확장자 제외)
        doc_filename = conv_result.input.file.stem

        # PDF별 출력 디렉토리 생성
        pdf_output_dir = output_base_dir / doc_filename
        pdf_output_dir.mkdir(parents=True, exist_ok=True)


        # 마크다운 저장
        with (pdf_output_dir / f"{doc_filename}.md").open("w", encoding="utf-8") as fp:
            fp.write(conv_result.document.export_to_markdown())

    except Exception as e:
        print(f"❌ 변환 실패: {pdf_file} - {e}")


2025-06-23 06:24:40,983 - INFO - Going to convert document batch...
2025-06-23 06:24:40,983 - INFO - Initializing pipeline for StandardPdfPipeline with options hash ab7aa2351bda7a3639289f49ddf570b8
2025-06-23 06:24:40,987 - INFO - Loading plugin 'docling_defaults'
2025-06-23 06:24:40,987 - INFO - Registered ocr engines: ['easyocr', 'ocrmac', 'rapidocr', 'tesserocr', 'tesseract']
2025-06-23 06:24:40,988 - INFO - Accelerator device: 'cuda:0'


🔄 변환 시작: 등록일_2025.06.19_(현지정보_250618)_2025_6월_FOMC_시장반응_f.pdf


2025-06-23 06:24:47,791 - INFO - Accelerator device: 'cuda:0'
2025-06-23 06:24:48,050 - INFO - Loading plugin 'docling_defaults'
2025-06-23 06:24:48,051 - INFO - Registered picture descriptions: ['vlm', 'api']
2025-06-23 06:24:48,051 - INFO - Processing document 등록일_2025.06.19_(현지정보_250618)_2025_6월_FOMC_시장반응_f.pdf
2025-06-23 06:24:50,178 - INFO - Finished converting document 등록일_2025.06.19_(현지정보_250618)_2025_6월_FOMC_시장반응_f.pdf in 9.20 sec.
2025-06-23 06:24:50,191 - INFO - Going to convert document batch...
2025-06-23 06:24:50,192 - INFO - Processing document 등록일_2025.06.12_[현지정보]_美_2025.5월_소비자물가_동향_및_금융시장_반응_F.pdf


✅ 변환 완료: 등록일_2025.06.19_(현지정보_250618)_2025_6월_FOMC_시장반응_f.pdf (9.20초)
🔄 변환 시작: 등록일_2025.06.12_[현지정보]_美_2025.5월_소비자물가_동향_및_금융시장_반응_F.pdf


2025-06-23 06:24:51,390 - INFO - Finished converting document 등록일_2025.06.12_[현지정보]_美_2025.5월_소비자물가_동향_및_금융시장_반응_F.pdf in 1.20 sec.
2025-06-23 06:24:51,400 - INFO - Going to convert document batch...
2025-06-23 06:24:51,400 - INFO - Processing document 등록일_2025.06.09_(현지정보)_美_2025.5월_고용지표_내용_및_뉴욕_금융시장_반응_f.pdf


✅ 변환 완료: 등록일_2025.06.12_[현지정보]_美_2025.5월_소비자물가_동향_및_금융시장_반응_F.pdf (1.20초)
🔄 변환 시작: 등록일_2025.06.09_(현지정보)_美_2025.5월_고용지표_내용_및_뉴욕_금융시장_반응_f.pdf


2025-06-23 06:24:52,285 - INFO - Finished converting document 등록일_2025.06.09_(현지정보)_美_2025.5월_고용지표_내용_및_뉴욕_금융시장_반응_f.pdf in 0.89 sec.
2025-06-23 06:24:52,294 - INFO - Going to convert document batch...
2025-06-23 06:24:52,295 - INFO - Processing document 등록일_2025.06.04_[현지정보]_25년_6월_캐나다_중앙은행_정책회의_결과_및_시장_반응.pdf


✅ 변환 완료: 등록일_2025.06.09_(현지정보)_美_2025.5월_고용지표_내용_및_뉴욕_금융시장_반응_f.pdf (0.89초)
🔄 변환 시작: 등록일_2025.06.04_[현지정보]_25년_6월_캐나다_중앙은행_정책회의_결과_및_시장_반응.pdf


2025-06-23 06:24:53,555 - INFO - Finished converting document 등록일_2025.06.04_[현지정보]_25년_6월_캐나다_중앙은행_정책회의_결과_및_시장_반응.pdf in 1.26 sec.
2025-06-23 06:24:53,562 - INFO - Going to convert document batch...
2025-06-23 06:24:53,563 - INFO - Processing document 등록일_2025.05.20_[현지정보]_미국_신용등급_하향_조정에_대한_시장참가자_평가.pdf


✅ 변환 완료: 등록일_2025.06.04_[현지정보]_25년_6월_캐나다_중앙은행_정책회의_결과_및_시장_반응.pdf (1.27초)
🔄 변환 시작: 등록일_2025.05.20_[현지정보]_미국_신용등급_하향_조정에_대한_시장참가자_평가.pdf


2025-06-23 06:24:54,633 - INFO - Finished converting document 등록일_2025.05.20_[현지정보]_미국_신용등급_하향_조정에_대한_시장참가자_평가.pdf in 1.07 sec.
2025-06-23 06:24:54,644 - INFO - Going to convert document batch...
2025-06-23 06:24:54,644 - INFO - Processing document 등록일_2025.05.19_(현지정보_20250516)_Moody’s社,_미국_신용등급_하향조정_f.pdf


✅ 변환 완료: 등록일_2025.05.20_[현지정보]_미국_신용등급_하향_조정에_대한_시장참가자_평가.pdf (1.08초)
🔄 변환 시작: 등록일_2025.05.19_(현지정보_20250516)_Moody’s社,_미국_신용등급_하향조정_f.pdf


2025-06-23 06:24:54,957 - INFO - Finished converting document 등록일_2025.05.19_(현지정보_20250516)_Moody’s社,_미국_신용등급_하향조정_f.pdf in 0.31 sec.
2025-06-23 06:24:54,966 - INFO - Going to convert document batch...
2025-06-23 06:24:54,966 - INFO - Processing document 등록일_2025.05.14_[현지정보]_美_2025.4월_소비자물가_동향_및_금융시장_반응.pdf


✅ 변환 완료: 등록일_2025.05.19_(현지정보_20250516)_Moody’s社,_미국_신용등급_하향조정_f.pdf (0.32초)
🔄 변환 시작: 등록일_2025.05.14_[현지정보]_美_2025.4월_소비자물가_동향_및_금융시장_반응.pdf


2025-06-23 06:24:55,981 - INFO - Finished converting document 등록일_2025.05.14_[현지정보]_美_2025.4월_소비자물가_동향_및_금융시장_반응.pdf in 1.02 sec.
2025-06-23 06:24:55,987 - INFO - Going to convert document batch...
2025-06-23 06:24:55,987 - INFO - Processing document 등록일_2025.05.08_(현지정보_250507)_2025_5월_FOMC_시장반응_f.pdf


✅ 변환 완료: 등록일_2025.05.14_[현지정보]_美_2025.4월_소비자물가_동향_및_금융시장_반응.pdf (1.02초)
🔄 변환 시작: 등록일_2025.05.08_(현지정보_250507)_2025_5월_FOMC_시장반응_f.pdf


2025-06-23 06:24:56,815 - INFO - Finished converting document 등록일_2025.05.08_(현지정보_250507)_2025_5월_FOMC_시장반응_f.pdf in 0.83 sec.
2025-06-23 06:24:56,823 - INFO - Going to convert document batch...
2025-06-23 06:24:56,824 - INFO - Processing document 등록일_2025.05.07_최근(2025.4월)의_미국경제_상황과_평가.pdf


✅ 변환 완료: 등록일_2025.05.08_(현지정보_250507)_2025_5월_FOMC_시장반응_f.pdf (0.83초)
🔄 변환 시작: 등록일_2025.05.07_최근(2025.4월)의_미국경제_상황과_평가.pdf


2025-06-23 06:25:01,390 - INFO - Finished converting document 등록일_2025.05.07_최근(2025.4월)의_미국경제_상황과_평가.pdf in 4.57 sec.


✅ 변환 완료: 등록일_2025.05.07_최근(2025.4월)의_미국경제_상황과_평가.pdf (4.57초)


## 테이블 추출

In [6]:
# JSON 모듈 가져오기 - 데이터 직렬화 및 역직렬화에 사용
import json
# 로깅 모듈 가져오기 - 디버깅 및 정보 기록에 사용
import logging
# 시간 측정 모듈 가져오기 - 실행 시간 측정에 사용
import time
# 파일 경로 처리 모듈 가져오기 - 파일 및 디렉토리 경로 관리에 사용
from pathlib import Path
# 로거 인스턴스 생성 - 현재 모듈의 로깅을 위해 사용
import os
import pandas as pd

_log = logging.getLogger(__name__)


In [None]:
def main():
    logging.basicConfig(level=logging.INFO)

    input_doc_path = Path("./data/ex_pdf")
    output_dir = Path("./data/ex_table")

    # PDF 파일 리스트 가져오기
    pdf_files = [f for f in os.listdir(input_doc_path) if f.endswith(".pdf")]

    doc_converter = DocumentConverter()

    start_time = time.time()

    # 각 PDF 파일을 개별적으로 변환
    for pdf_file in pdf_files:
        full_pdf_path = input_doc_path / pdf_file  # 파일 경로 조합
        doc_filename = Path(pdf_file).stem # 파일 이름 가져오기 (확장자 제외)

        # 파일 이름으로 디렉토리 생성
        file_output_dir = output_dir / doc_filename
        file_output_dir.mkdir(parents=True, exist_ok=True)

        conv_res = doc_converter.convert(full_pdf_path)  # 개별 파일 전달


        # Export tables
        for table_ix, table in enumerate(conv_res.document.tables):
            table_df: pd.DataFrame = table.export_to_dataframe()
            # 파일 이름으로 구분하여 출력
            print(f"## {doc_filename} - Table {table_ix}")
            print(table_df.to_markdown())

            # Save the table as md in the dedicated directory
            element_md_filename = file_output_dir / f"{doc_filename}-table-{table_ix + 1}.md"
            _log.info(f"Saving md table to {element_md_filename}")
            with element_md_filename.open("w", encoding="utf-8") as fp:
                fp.write(table.export_to_html(doc=conv_res.document))

    end_time = time.time() - start_time

    _log.info(f"Documents converted and tables exported in {end_time:.2f} seconds.")

main()

## 이미지 추출

In [8]:
import logging
import time
from pathlib import Path

In [9]:
from docling_core.types.doc import ImageRefMode, PictureItem, TableItem

In [10]:
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions
from docling.document_converter import DocumentConverter, PdfFormatOption

In [11]:
_log = logging.getLogger(__name__)

In [12]:
IMAGE_RESOLUTION_SCALE = 2.0

In [13]:
# 이미지 해상도 스케일 설정 (예: 2 = 144 DPI)
IMAGE_RESOLUTION_SCALE = 2

def main():
    logging.basicConfig(level=logging.INFO)
    _log = logging.getLogger(__name__)

    input_dir = Path("./data/ex_pdf")
    output_base_dir = Path("./data/images")
    output_base_dir.mkdir(parents=True, exist_ok=True)

    pdf_files = [f for f in input_dir.iterdir() if f.suffix.lower() == ".pdf"]

    if not pdf_files:
        _log.warning("No PDF files found.")
        return

    # 변환 파이프라인 설정
    pipeline_options = PdfPipelineOptions()
    pipeline_options.images_scale = IMAGE_RESOLUTION_SCALE
    pipeline_options.generate_page_images = True
    pipeline_options.generate_picture_images = True

    doc_converter = DocumentConverter(
        format_options={
            InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)
        }
    )

    for pdf_file in pdf_files:
        try:
            _log.info(f"Processing: {pdf_file.name}")
            start_time = time.time()

            conv_res = doc_converter.convert(str(pdf_file))
            doc_filename = pdf_file.stem

            # PDF별 디렉토리 생성
            pdf_output_dir = output_base_dir / doc_filename
            pdf_output_dir.mkdir(parents=True, exist_ok=True)

            # 표 및 이미지 저장
            table_counter = 0
            picture_counter = 0
            for element, _ in conv_res.document.iterate_items():
                if isinstance(element, TableItem):
                    table_counter += 1
                    img_path = pdf_output_dir / f"{doc_filename}-table-{table_counter}.png"
                    with img_path.open("wb") as fp:
                        element.get_image(conv_res.document).save(fp, "PNG")

                if isinstance(element, PictureItem):
                    picture_counter += 1
                    img_path = pdf_output_dir / f"{doc_filename}-picture-{picture_counter}.png"
                    with img_path.open("wb") as fp:
                        element.get_image(conv_res.document).save(fp, "PNG")
            elapsed = time.time() - start_time
            _log.info(f"{pdf_file.name} processed in {elapsed:.2f} seconds.")

        except Exception as e:
            _log.error(f"❌ Failed to process {pdf_file.name}: {e}")


In [None]:
if __name__ == "__main__":
    main()

# 3. VLM을 이용해서 각 Image에 대한 요약 생성
## qwen 2.5 VL 7B-awq사용

# VLM 모델 로딩 디버깅을 위한 코드

In [15]:
# VLM 모델 로딩 디버깅을 위한 개선된 코드

import traceback
import sys
import torch
from pathlib import Path

def debug_load_model():
    """VLM 모델 로딩 디버깅 함수"""
    print("🔍 VLM 모델 로딩 디버깅 시작")
    print("="*60)
    
    try:
        # 1. 기본 환경 확인
        print("1️⃣ 기본 환경 확인:")
        print(f"   🐍 Python 버전: {sys.version}")
        print(f"   🔥 PyTorch 버전: {torch.__version__}")
        print(f"   🖥️ CUDA 사용 가능: {torch.cuda.is_available()}")
        if torch.cuda.is_available():
            print(f"   🎯 CUDA 버전: {torch.version.cuda}")
            print(f"   🔢 GPU 개수: {torch.cuda.device_count()}")
            print(f"   📛 GPU 이름: {torch.cuda.get_device_name(0)}")
        print()
        
        # 2. 필요한 라이브러리 임포트 테스트
        print("2️⃣ 필요한 라이브러리 임포트 테스트:")
        
        try:
            from transformers import AutoProcessor, AutoModelForVision2Seq
            print("   ✅ transformers 임포트 성공")
        except ImportError as e:
            print(f"   ❌ transformers 임포트 실패: {e}")
            return None, None
        
        try:
            from PIL import Image
            print("   ✅ PIL 임포트 성공")
        except ImportError as e:
            print(f"   ❌ PIL 임포트 실패: {e}")
            return None, None
            
        print()
        
        # 3. 모델 이름 확인
        model_name = "Qwen/Qwen2.5-VL-7B-Instruct"  # 또는 사용하려는 모델명
        print(f"3️⃣ 로딩할 모델: {model_name}")
        print()
        
        # 4. 프로세서 로딩 테스트
        print("4️⃣ 프로세서 로딩 중...")
        try:
            processor = AutoProcessor.from_pretrained(
                model_name,
                trust_remote_code=True,
                torch_dtype=torch.float16
            )
            print("   ✅ 프로세서 로딩 성공")
        except Exception as e:
            print(f"   ❌ 프로세서 로딩 실패: {e}")
            print(f"   📋 상세 오류:")
            traceback.print_exc()
            return None, None
        
        print()
        
        # 5. 모델 로딩 테스트
        print("5️⃣ 모델 로딩 중...")
        try:
            model = AutoModelForVision2Seq.from_pretrained(
                model_name,
                trust_remote_code=True,
                torch_dtype=torch.bfloat16, attn_implementation="flash_attention_2",
                device_map="auto" if torch.cuda.is_available() else "cpu",
            )
            print("   ✅ 모델 로딩 성공")
        except Exception as e:
            print(f"   ❌ 모델 로딩 실패: {e}")
            print(f"   📋 상세 오류:")
            traceback.print_exc()
            return None, None
        
        print()
        
        # 6. GPU 메모리 상태 확인
        if torch.cuda.is_available():
            print("6️⃣ GPU 메모리 상태:")
            memory_allocated = torch.cuda.memory_allocated(0) / 1024**3
            memory_reserved = torch.cuda.memory_reserved(0) / 1024**3
            total_memory = torch.cuda.get_device_properties(0).total_memory / 1024**3
            print(f"   📊 할당된 메모리: {memory_allocated:.2f}GB")
            print(f"   📦 예약된 메모리: {memory_reserved:.2f}GB")
            print(f"   💾 총 메모리: {total_memory:.2f}GB")
            print(f"   🆓 사용 가능: {total_memory - memory_reserved:.2f}GB")
        
        print()
        print("✅ 모든 테스트 통과! 모델 로딩 성공")
        return model, processor
        
    except Exception as e:
        print(f"❌ 예상치 못한 오류 발생: {e}")
        print(f"📋 전체 오류 스택:")
        traceback.print_exc()
        return None, None

In [16]:
debug_load_model()

🔍 VLM 모델 로딩 디버깅 시작
1️⃣ 기본 환경 확인:
   🐍 Python 버전: 3.11.10 (main, Sep  7 2024, 18:35:41) [GCC 11.4.0]
   🔥 PyTorch 버전: 2.4.1+cu124
   🖥️ CUDA 사용 가능: True
   🎯 CUDA 버전: 12.4
   🔢 GPU 개수: 1
   📛 GPU 이름: NVIDIA GeForce RTX 3090

2️⃣ 필요한 라이브러리 임포트 테스트:
   ✅ transformers 임포트 성공
   ✅ PIL 임포트 성공

3️⃣ 로딩할 모델: Qwen/Qwen2.5-VL-7B-Instruct

4️⃣ 프로세서 로딩 중...


Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.


   ✅ 프로세서 로딩 성공

5️⃣ 모델 로딩 중...


2025-06-23 06:28:01,250 - INFO - We will use 90% of the memory on device 0 for storing the model, and 10% for the buffer to avoid OOM. You can set `max_memory` in to a higher value to use more memory (at your own risk).


Loading checkpoint shards:   0%|          | 0/5 [00:00<?, ?it/s]

   ✅ 모델 로딩 성공

6️⃣ GPU 메모리 상태:
   📊 할당된 메모리: 15.98GB
   📦 예약된 메모리: 17.59GB
   💾 총 메모리: 23.68GB
   🆓 사용 가능: 6.09GB

✅ 모든 테스트 통과! 모델 로딩 성공


(Qwen2_5_VLForConditionalGeneration(
   (visual): Qwen2_5_VisionTransformerPretrainedModel(
     (patch_embed): Qwen2_5_VisionPatchEmbed(
       (proj): Conv3d(3, 1280, kernel_size=(2, 14, 14), stride=(2, 14, 14), bias=False)
     )
     (rotary_pos_emb): Qwen2_5_VisionRotaryEmbedding()
     (blocks): ModuleList(
       (0-31): 32 x Qwen2_5_VLVisionBlock(
         (norm1): Qwen2RMSNorm((1280,), eps=1e-06)
         (norm2): Qwen2RMSNorm((1280,), eps=1e-06)
         (attn): Qwen2_5_VLVisionFlashAttention2(
           (qkv): Linear(in_features=1280, out_features=3840, bias=True)
           (proj): Linear(in_features=1280, out_features=1280, bias=True)
         )
         (mlp): Qwen2_5_VLMLP(
           (gate_proj): Linear(in_features=1280, out_features=3420, bias=True)
           (up_proj): Linear(in_features=1280, out_features=3420, bias=True)
           (down_proj): Linear(in_features=3420, out_features=1280, bias=True)
           (act_fn): SiLU()
         )
       )
     )
     (merge

In [17]:
# LLM 모델 단독테트트

In [18]:
import traceback
import sys
import torch
from pathlib import Path

def debug_load_llm_model():
    """LLM 모델 로딩 디버깅 함수"""
    print("🧪 LLM 모델 로딩 디버깅 시작")
    print("="*60)

    try:
        # 1. 기본 환경 확인
        print("1️⃣ 기본 환경 확인:")
        print(f"   🐍 Python 버전: {sys.version}")
        print(f"   🔥 PyTorch 버전: {torch.__version__}")
        print(f"   🖥️ CUDA 사용 가능: {torch.cuda.is_available()}")
        if torch.cuda.is_available():
            print(f"   🎯 CUDA 버전: {torch.version.cuda}")
            print(f"   🔢 GPU 개수: {torch.cuda.device_count()}")
            print(f"   📛 GPU 이름: {torch.cuda.get_device_name(0)}")
        print()

        # 2. 라이브러리 임포트 테스트
        print("2️⃣ 필요한 라이브러리 임포트 테스트:")
        try:
            from transformers import AutoTokenizer, AutoModelForCausalLM
            print("   ✅ transformers 임포트 성공")
        except ImportError as e:
            print(f"   ❌ transformers 임포트 실패: {e}")
            return None, None
        
        print()

        # 3. 모델 이름 지정
        model_name = "google/gemma-3-4b-it"  # 원하는 LLM 이름으로 변경 가능
        print(f"3️⃣ 로딩할 LLM 모델: {model_name}")
        print()

        # 4. 토크나이저 로딩 테스트
        print("4️⃣ 토크나이저 로딩 중...")
        try:
            tokenizer = AutoTokenizer.from_pretrained(
                model_name,
                trust_remote_code=True
            )
            print("   ✅ 토크나이저 로딩 성공")
        except Exception as e:
            print(f"   ❌ 토크나이저 로딩 실패: {e}")
            print(f"   📋 상세 오류:")
            traceback.print_exc()
            return None, None
        
        print()

        # 5. 모델 로딩 테스트
        print("5️⃣ 모델 로딩 중...")
        try:
            model = AutoModelForCausalLM.from_pretrained(
                model_name,
                torch_dtype=torch.bfloat16 if torch.cuda.is_available() else torch.float32,
                device_map="auto" if torch.cuda.is_available() else "cpu",
                trust_remote_code=True
            )
            print("   ✅ 모델 로딩 성공")
        except Exception as e:
            print(f"   ❌ 모델 로딩 실패: {e}")
            print(f"   📋 상세 오류:")
            traceback.print_exc()
            return None, None

        print()

        # 6. 간단한 추론 테스트
        print("6️⃣ 간단한 추론 테스트:")
        try:
            input_text = "AI가 무엇인가요?"
            inputs = tokenizer(input_text, return_tensors="pt").to(model.device)
            with torch.no_grad():
                outputs = model.generate(**inputs, max_new_tokens=50)
            decoded = tokenizer.decode(outputs[0], skip_special_tokens=True)
            print("   ✅ 추론 성공")
            print(f"   💬 출력 예시: {decoded[:100]}...")
        except Exception as e:
            print(f"   ❌ 추론 실패: {e}")
            traceback.print_exc()

        # 7. GPU 메모리 상태 확인
        if torch.cuda.is_available():
            print("\n7️⃣ GPU 메모리 상태:")
            memory_allocated = torch.cuda.memory_allocated(0) / 1024**3
            memory_reserved = torch.cuda.memory_reserved(0) / 1024**3
            total_memory = torch.cuda.get_device_properties(0).total_memory / 1024**3
            print(f"   📊 할당된 메모리: {memory_allocated:.2f}GB")
            print(f"   📦 예약된 메모리: {memory_reserved:.2f}GB")
            print(f"   💾 총 메모리: {total_memory:.2f}GB")
            print(f"   🆓 사용 가능: {total_memory - memory_reserved:.2f}GB")

        print()
        print("✅ 모든 테스트 통과! LLM 모델 로딩 성공")
        return model, tokenizer

    except Exception as e:
        print(f"❌ 예상치 못한 오류 발생: {e}")
        print(f"📋 전체 오류 스택:")
        traceback.print_exc()
        return None, None

# 실행
print("🧪 LLM 모델 로딩 디버깅을 실행합니다...")
llm_model, llm_tokenizer = debug_load_llm_model()
print(f"\n📋 최종 결과: {'성공' if llm_model and llm_tokenizer else '실패'}")


🧪 LLM 모델 로딩 디버깅을 실행합니다...
🧪 LLM 모델 로딩 디버깅 시작
1️⃣ 기본 환경 확인:
   🐍 Python 버전: 3.11.10 (main, Sep  7 2024, 18:35:41) [GCC 11.4.0]
   🔥 PyTorch 버전: 2.4.1+cu124
   🖥️ CUDA 사용 가능: True
   🎯 CUDA 버전: 12.4
   🔢 GPU 개수: 1
   📛 GPU 이름: NVIDIA GeForce RTX 3090

2️⃣ 필요한 라이브러리 임포트 테스트:
   ✅ transformers 임포트 성공

3️⃣ 로딩할 LLM 모델: google/gemma-3-1b-it

4️⃣ 토크나이저 로딩 중...
   ✅ 토크나이저 로딩 성공

5️⃣ 모델 로딩 중...


2025-06-23 06:30:06,478 - INFO - We will use 90% of the memory on device 0 for storing the model, and 10% for the buffer to avoid OOM. You can set `max_memory` in to a higher value to use more memory (at your own risk).


   ✅ 모델 로딩 성공

6️⃣ 간단한 추론 테스트:
   ✅ 추론 성공
   💬 출력 예시: AI가 무엇인가요?

AI는 **인공지능(Artificial Intelligence)**의 약자로, 인간의 지능을 모방하여 컴퓨터가 스스로 학습하고 문제를 해결할 수 있도록 하는 ...

7️⃣ GPU 메모리 상태:
   📊 할당된 메모리: 17.85GB
   📦 예약된 메모리: 20.02GB
   💾 총 메모리: 23.68GB
   🆓 사용 가능: 3.66GB

✅ 모든 테스트 통과! LLM 모델 로딩 성공

📋 최종 결과: 성공


# 1. gpu 메모리 관리 및 모니터링 함수들

In [None]:
이 섹션에서는 GPU 메모리 상태를 실시간으로 모니터링하고 관리하는 함수들을 정의합니다. 
주요 기능으로는:

1) GPU 메모리 사용량 조회 (get_gpu_memory_info)

2) 메모리 정리 (cleanup_gpu_memory)

3) 메모리 정리 동작 방식:
 1. gc.collect(): Python 가비지 컬렉션 실행
 2. torch.cuda.empty_cache(): PyTorch GPU 캐시 비우기
 3. torch.cuda.synchronize(): GPU 연산 완료 대기

4)  메모리 임계값 확인 및 OOM(Out of Memory) 방지

5)  안전한 모델 추론을 위한 재시도 메커니즘

In [2]:

# GPU 관리를 위한 전역 설정 (클래스 정의 전에 추가)
try:
    import pynvml
    pynvml.nvmlInit()  # 전역 초기화
    PYNVML_AVAILABLE = True
    print("✅ NVML 초기화 성공")
except Exception as e:
    PYNVML_AVAILABLE = False
    print(f"⚠️ NVML 초기화 실패: {e}")



class AdvancedGPUManager:
    """고급 GPU 활용률 최적화 및 OOM 방지 관리자"""
    
    def __init__(self, target_utilization=85.0, safety_margin=0.9):
        self.target_utilization = target_utilization  # 목표 GPU 활용률 (%)
        self.safety_margin = safety_margin  # 메모리 안전 마진 (90%)
        self.memory_history = []
        self.optimal_batch_size = 1
        self.max_batch_size = 8
        
        # pynvml 초기화
        if PYNVML_AVAILABLE:
            try:
                pynvml.nvmlInit()
                self.nvml_enabled = True
            except:
                self.nvml_enabled = False
        else:
            self.nvml_enabled = False
    
    def get_gpu_metrics(self):
        """GPU 활용률과 메모리 정보를 종합적으로 수집"""
        if not torch.cuda.is_available():
            return None
            
        if self.nvml_enabled:
            return self._get_nvml_metrics()
        else:
            return self._get_torch_memory_info()
    
    def _get_nvml_metrics(self):
        """NVML을 사용한 상세한 GPU 메트릭 수집"""
        try:
            handle = pynvml.nvmlDeviceGetHandleByIndex(0)
            
            # GPU 활용률 정보
            utilization = pynvml.nvmlDeviceGetUtilizationRates(handle)
            
            # 메모리 정보
            memory_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
            total_memory = memory_info.total
            used_memory = memory_info.used
            free_memory = memory_info.free
            
            # PyTorch 메모리 정보
            torch_allocated = torch.cuda.memory_allocated()
            torch_reserved = torch.cuda.memory_reserved()
            
            return {
                'gpu_utilization': utilization.gpu,  # GPU 코어 활용률
                'memory_utilization': utilization.memory,  # 메모리 대역폭 활용률
                'total_memory_gb': total_memory / (1024**3),
                'used_memory_gb': used_memory / (1024**3),
                'free_memory_gb': free_memory / (1024**3),
                'memory_usage_percent': (used_memory / total_memory) * 100,
                'torch_allocated_gb': torch_allocated / (1024**3),
                'torch_reserved_gb': torch_reserved / (1024**3),
                'available_memory_gb': (total_memory - torch_reserved) / (1024**3)
            }
        except Exception as e:
            print(f"⚠️ NVML 메트릭 수집 실패: {e}")
            return self._get_torch_memory_info()
    
    def _get_torch_memory_info(self):
        """PyTorch만을 사용한 기본 메모리 정보"""
        allocated = torch.cuda.memory_allocated()
        reserved = torch.cuda.memory_reserved()
        total = torch.cuda.get_device_properties(0).total_memory
        
        return {
            'gpu_utilization': 0,  # 추정 불가
            'memory_utilization': 0,
            'total_memory_gb': total / (1024**3),
            'used_memory_gb': reserved / (1024**3),
            'free_memory_gb': (total - reserved) / (1024**3),
            'memory_usage_percent': (reserved / total) * 100,
            'torch_allocated_gb': allocated / (1024**3),
            'torch_reserved_gb': reserved / (1024**3),
            'available_memory_gb': (total - reserved) / (1024**3)
        }
    
    def calculate_optimal_batch_size(self, current_metrics, base_memory_per_item=0.8):
        """현재 메모리 상황을 기반으로 최적 배치 크기 계산"""
        if not current_metrics:
            return 1
            
        # 사용 가능한 메모리 (안전 마진 적용)
        available_memory = current_metrics['available_memory_gb'] * self.safety_margin
        
        # 예상 배치 크기 계산 (VLM은 메모리를 많이 사용)
        estimated_batch_size = max(1, int(available_memory / base_memory_per_item))
        
        # 최대값 제한
        optimal_batch_size = min(estimated_batch_size, self.max_batch_size)
        
        # GPU 활용률이 낮다면 배치 크기 증가 시도
        if current_metrics['gpu_utilization'] < self.target_utilization and optimal_batch_size < self.max_batch_size:
            optimal_batch_size = min(optimal_batch_size + 1, self.max_batch_size)
        
        return optimal_batch_size
    
    def should_process_batch(self, batch_size=1):
        """배치 처리 가능 여부 판단"""
        metrics = self.get_gpu_metrics()
        if not metrics:
            return True, 1
            
        # 메모리 사용률이 안전 임계값을 초과하는 경우
        if metrics['memory_usage_percent'] > (self.safety_margin * 100):
            print(f"⚠️ 메모리 사용률 높음 ({metrics['memory_usage_percent']:.1f}%) - 대기")
            return False, max(1, batch_size // 2)
        
        # 최적 배치 크기 계산
        optimal_size = self.calculate_optimal_batch_size(metrics)
        
        return True, optimal_size
    
    def cleanup_memory(self, intensive=False):
        """메모리 정리"""
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
            if intensive:
                torch.cuda.synchronize()
    
    def print_detailed_status(self):
        """상세한 GPU 상태 출력"""
        metrics = self.get_gpu_metrics()
        if not metrics:
            print("❌ GPU 메트릭을 가져올 수 없습니다")
            return
            
        print("\n" + "="*60)
        print("🖥️ GPU 상태 모니터링")
        print("="*60)
        print(f"🎯 GPU 코어 활용률: {metrics['gpu_utilization']}%")
        print(f"📊 메모리 대역폭 활용률: {metrics['memory_utilization']}%")
        print(f"💾 메모리 사용량: {metrics['used_memory_gb']:.1f}GB / {metrics['total_memory_gb']:.1f}GB ({metrics['memory_usage_percent']:.1f}%)")
        print(f"🆓 사용 가능 메모리: {metrics['available_memory_gb']:.1f}GB")
        print(f"🔥 PyTorch 할당됨: {metrics['torch_allocated_gb']:.1f}GB")
        print(f"📦 PyTorch 예약됨: {metrics['torch_reserved_gb']:.1f}GB")
        
        # 상태 평가
        if metrics['gpu_utilization'] < 50:
            print("⚡ GPU 활용률이 낮습니다 - 배치 크기 증가 권장")
        elif metrics['gpu_utilization'] > 90:
            print("🔥 GPU 활용률이 매우 높습니다!")
            
        if metrics['memory_usage_percent'] > 85:
            print("⚠️ 메모리 사용률 주의 - OOM 위험")
        elif metrics['memory_usage_percent'] < 50:
            print("✅ 메모리 여유 충분")
        
        print("="*60)

print("✅ 고급 GPU 관리자 로딩 완료")

✅ NVML 초기화 성공
✅ 고급 GPU 관리자 로딩 완료


# 2. 배치 적응형 및 적응형 메모리 관리

메모리 상황에 따라 동적으로 처리량을 조정하는 시스템입니다:

1. 메모리 사용량이 90% 초과시 배치 크기를 절반으로 축소
2. 메모리 여유가 60% 미만시 배치 크기를 증가시켜 효율성 향상
3. 임계값 초과시 자동으로 처리를 일시정지하고 메모리 정리

In [3]:
class MemoryEfficientBatchProcessor:
    """메모리 효율적인 배치 처리 시스템"""
    
    def __init__(self, initial_batch_size=1, max_batch_size=4):
        self.gpu_manager = AdvancedGPUManager()
        self.current_batch_size = initial_batch_size
        self.max_batch_size = max_batch_size
        self.processing_stats = {
            'total_processed': 0,
            'successful_batches': 0,
            'oom_events': 0,
            'memory_cleanups': 0,
            'batch_size_adjustments': 0
        }
    
    def process_items_efficiently(self, items, process_function, **kwargs):
        """메모리 효율적인 아이템 처리"""
        results = []
        total_items = len(items)
        processed_count = 0
        
        print(f"🚀 효율적 배치 처리 시작: {total_items}개 아이템")
        start_time = time.time()
        
        while processed_count < total_items:
            # 현재 GPU 상태 확인
            can_process, optimal_batch_size = self.gpu_manager.should_process_batch(self.current_batch_size)
            
            if not can_process:
                print("⏸️ 메모리 부족으로 일시 정지 - 정리 중...")
                self._emergency_cleanup()
                time.sleep(3)
                continue
            
            # 배치 크기 조정
            if optimal_batch_size != self.current_batch_size:
                print(f"🔧 배치 크기 조정: {self.current_batch_size} → {optimal_batch_size}")
                self.current_batch_size = optimal_batch_size
                self.processing_stats['batch_size_adjustments'] += 1
            
            # 배치 생성
            batch_start = processed_count
            batch_end = min(processed_count + self.current_batch_size, total_items)
            current_batch = items[batch_start:batch_end]
            
            print(f"📦 배치 처리 중: {batch_start+1}-{batch_end}/{total_items} (크기: {len(current_batch)})")
            
            try:
                # 배치 처리 실행
                batch_results = self._process_batch_safe(current_batch, process_function, **kwargs)
                results.extend(batch_results)
                
                processed_count = batch_end
                self.processing_stats['successful_batches'] += 1
                
                # 진행률 및 GPU 상태 표시 (5배치마다)
                if self.processing_stats['successful_batches'] % 5 == 0:
                    progress = (processed_count / total_items) * 100
                    elapsed = time.time() - start_time
                    rate = processed_count / (elapsed / 60)  # 분당 처리율
                    
                    print(f"📊 진행률: {progress:.1f}% ({processed_count}/{total_items}) | 처리율: {rate:.1f}개/분")
                    self.gpu_manager.print_detailed_status()
                
            except torch.cuda.OutOfMemoryError:
                print(f"💥 OOM 발생! 배치 크기 {self.current_batch_size} → {max(1, self.current_batch_size // 2)}")
                self.processing_stats['oom_events'] += 1
                self.current_batch_size = max(1, self.current_batch_size // 2)
                self._emergency_cleanup()
                time.sleep(5)
                continue
                
            except Exception as e:
                print(f"❌ 배치 처리 오류: {e}")
                # 개별 처리로 폴백
                individual_results = self._fallback_individual_processing(current_batch, process_function, **kwargs)
                results.extend(individual_results)
                processed_count = batch_end
        
        self._print_processing_summary(total_items, time.time() - start_time)
        return results
    
    def _process_batch_safe(self, batch, process_function, **kwargs):
        """안전한 배치 처리 (메모리 모니터링 포함)"""
        batch_results = []
        
        for i, item in enumerate(batch):
            # 메모리 임계값 체크 (배치 중간에도)
            if i > 0 and i % 2 == 0:  # 2개마다 체크
                current_metrics = self.gpu_manager.get_gpu_metrics()
                if current_metrics and current_metrics['memory_usage_percent'] > 88:
                    print(f"⚠️ 배치 중간 메모리 임계값 초과 ({current_metrics['memory_usage_percent']:.1f}%) - 정리")
                    self._incremental_cleanup()
            
            # 아이템 처리
            try:
                result = process_function(item, **kwargs)
                batch_results.append(result)
                self.processing_stats['total_processed'] += 1
                
            except torch.cuda.OutOfMemoryError:
                # 배치 중간 OOM 발생시 즉시 정리 후 재시도
                print(f"💥 배치 중간 OOM - 긴급 정리 후 재시도")
                self._emergency_cleanup()
                result = process_function(item, **kwargs)
                batch_results.append(result)
                self.processing_stats['total_processed'] += 1
        
        # 배치 완료 후 메모리 정리
        self._incremental_cleanup()
        return batch_results
    
    def _fallback_individual_processing(self, batch, process_function, **kwargs):
        """개별 처리 폴백"""
        print(f"🔄 개별 처리 모드로 전환 ({len(batch)}개 아이템)")
        individual_results = []
        
        for item in batch:
            try:
                self._incremental_cleanup()  # 각 아이템 처리 전 정리
                result = process_function(item, **kwargs)
                individual_results.append(result)
                self.processing_stats['total_processed'] += 1
            except Exception as item_error:
                print(f"❌ 개별 아이템 처리 실패: {item_error}")
                individual_results.append(None)
        
        return individual_results
    
    def _incremental_cleanup(self):
        """점진적 메모리 정리"""
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        self.processing_stats['memory_cleanups'] += 1
    
    def _emergency_cleanup(self):
        """긴급 메모리 정리"""
        print("🧹 긴급 메모리 정리 실행...")
        self.gpu_manager.cleanup_memory(intensive=True)
        time.sleep(2)
        self.processing_stats['memory_cleanups'] += 1
    
    def _print_processing_summary(self, total_items, total_time):
        """처리 결과 요약"""
        print("\n" + "="*50)
        print("📊 배치 처리 완료 요약")
        print("="*50)
        print(f"📷 총 처리 아이템: {self.processing_stats['total_processed']}/{total_items}")
        print(f"📦 성공한 배치: {self.processing_stats['successful_batches']}")
        print(f"💥 OOM 발생 횟수: {self.processing_stats['oom_events']}")
        print(f"🔧 배치 크기 조정: {self.processing_stats['batch_size_adjustments']}회")
        print(f"🧹 메모리 정리 횟수: {self.processing_stats['memory_cleanups']}")
        print(f"⚡ 최종 배치 크기: {self.current_batch_size}")
        print(f"⏱️ 총 처리 시간: {total_time/60:.1f}분")
        print(f"📈 평균 처리율: {self.processing_stats['total_processed']/(total_time/60):.1f}개/분")
        print("="*50)

print("✅ 메모리 효율적 배치 처리기 로딩 완료")

✅ 메모리 효율적 배치 처리기 로딩 완료


# 3. xml파싱및 마크다운 포맷팅 함수들
1. VLM(Vision Language Model) 출력을 구조화된 형태로 변환하는 함수들입니다:
2. XML 형태의 모델 출력을 파싱하여 제목, 요약, 엔티티, 질문으로 분리
3. 마크다운 형태의 응답도 처리 가능
4. 파싱 실패시 기본값 설정으로 안정성 확보

In [4]:
class OptimizedOutputParser:
    """VLM 출력 파싱을 위한 최적화된 클래스"""
    
    # 정규표현식을 클래스 레벨에서 컴파일 (성능 최적화)
    XML_PATTERNS = {
        'title': re.compile(r'<title>(.*?)</title>', re.DOTALL | re.IGNORECASE),
        'summary': re.compile(r'<summary>(.*?)</summary>', re.DOTALL | re.IGNORECASE),
        'entities': re.compile(r'<entities>(.*?)</entities>', re.DOTALL | re.IGNORECASE),
        'questions': re.compile(r'<hypothetical_questions>(.*?)</hypothetical_questions>', re.DOTALL | re.IGNORECASE)
    }
    
    MARKDOWN_PATTERNS = {
        'title': re.compile(r'###\s*🖼️\s*[이미지\s]*제목[\s\S]*?\n(.+?)\n', re.IGNORECASE),
        'summary': re.compile(r'###\s*📋\s*[이미지\s]*요약[\s\S]*?\n([\s\S]*?)(?=###|$)', re.IGNORECASE),
        'entities': re.compile(r'###\s*🏷️\s*[핵심\s]*엔티티[\s\S]*?\n([\s\S]*?)(?=###|$)', re.IGNORECASE),
        'questions': re.compile(r'###\s*❓\s*[가상질문|관련질문][\s\S]*?\n([\s\S]*?)(?=###|$)', re.IGNORECASE)
    }
    
    @classmethod
    def parse_image_summary(cls, raw_output):
        """통합 파싱 함수 (XML 우선, 마크다운 백업)"""
        # 기본 구조
        parsed_data = {
            'title': '',
            'summary': '',
            'entities': '',
            'hypothetical_questions': ''
        }
        
        try:
            # XML 파싱 시도 (컴파일된 패턴 사용)
            for key, pattern in cls.XML_PATTERNS.items():
                match = pattern.search(raw_output)
                if match:
                    parsed_data[key] = match.group(1).strip()
            
            # XML이 실패하면 마크다운 파싱
            if not any(parsed_data.values()):
                markdown_keys = {'title': 'title', 'summary': 'summary', 'entities': 'entities', 'questions': 'hypothetical_questions'}
                for md_key, data_key in markdown_keys.items():
                    if md_key in cls.MARKDOWN_PATTERNS:
                        match = cls.MARKDOWN_PATTERNS[md_key].search(raw_output)
                        if match:
                            parsed_data[data_key] = match.group(1).strip()
        
        except Exception as e:
            print(f"⚠️ 파싱 오류: {e}")
            # 오류 시 원본 텍스트를 summary에 저장
            parsed_data['summary'] = raw_output[:500] + "..." if len(raw_output) > 500 else raw_output
        
        return parsed_data
    
    @staticmethod
    def create_formatted_markdown(parsed_data):
        """파싱된 데이터를 마크다운으로 변환 (최적화됨)"""
        return f"""# 📊 이미지 분석 결과

## 🖼️ {parsed_data.get('title', '제목 없음')}

### 📋 요약
{parsed_data.get('summary', '분석 내용 없음')}

### 🏷️ 핵심 엔티티
{parsed_data.get('entities', '엔티티 없음')}

### ❓ 관련 질문들
{parsed_data.get('hypothetical_questions', '질문 없음')}

---
*분석 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*
"""

# 기존 함수를 새 클래스로 대체
def parse_image_summary_xml(raw_output):
    """기존 함수 호환성 유지"""
    return OptimizedOutputParser.parse_image_summary(raw_output)

def parse_markdown_response(raw_output):
    """기존 함수 호환성 유지"""
    return OptimizedOutputParser.parse_image_summary(raw_output)

def create_formatted_markdown_from_parsed(parsed_data):
    """기존 함수 호환성 유지"""
    return OptimizedOutputParser.create_formatted_markdown(parsed_data)

print("✅ 최적화된 출력 파서 로딩 완료")

✅ 최적화된 출력 파서 로딩 완료


# 4. 파일 저장 시스템

In [5]:
class OptimizedFileManager:
    """파일 저장을 위한 최적화된 클래스"""
    
    @staticmethod
    def create_output_directories(base_output_dir="./analysis_output"):
        """출력용 디렉토리들을 생성 (pathlib 사용)"""
        base_path = Path(base_output_dir)
        directories = {
            'base': base_path,
            'xml': base_path / "xml_results",
            'markdown': base_path / "markdown_results", 
            'json': base_path / "json_results"
        }
        
        # 디렉토리 생성 (한 번에 처리)
        for dir_name, dir_path in directories.items():
            dir_path.mkdir(parents=True, exist_ok=True)
        
        print(f"📁 출력 디렉토리 생성 완료: {base_path}")
        return directories
    
    @staticmethod
    def save_results_as_markdown(results, output_dirs, filename_prefix="image_analysis"):
        """마크다운 형식으로 저장 (최적화됨)"""
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{filename_prefix}_{timestamp}.md"
            output_file = output_dirs['markdown'] / filename
            
            # 템플릿 문자열로 한 번에 생성
            content_parts = [
                "# 📊 이미지 분석 결과\n",
                f"**생성 시간:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n",
                f"**총 분석 이미지:** {len(results)}개\n",
                "---\n"
            ]
            
            # 각 이미지 결과 추가
            for i, (image_path, analysis) in enumerate(results, 1):
                content_parts.extend([
                    f"\n## 🖼️ 이미지 {i}: {Path(image_path).name}\n",
                    f"**📁 파일 경로:** `{image_path}`\n",
                    f"**🔍 분석 결과:**\n{analysis}\n",
                    "---\n"
                ])
            
            # 한 번에 파일 쓰기
            output_file.write_text(''.join(content_parts), encoding='utf-8')
            
            print(f"✅ 마크다운 저장: {output_file.name}")
            return str(output_file)
        except Exception as e:
            print(f"❌ 마크다운 저장 실패: {e}")
            return None
    
    @staticmethod
    def save_results_as_json(results, output_dirs, filename_prefix="image_analysis"):
        """JSON 형식으로 저장 (최적화됨)"""
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{filename_prefix}_{timestamp}.json"
            output_file = output_dirs['json'] / filename
            
            # JSON 데이터 구조 생성 (list comprehension 사용)
            json_data = {
                "metadata": {
                    "generated_at": datetime.now().isoformat(),
                    "total_images": len(results),
                    "format_version": "1.0"
                },
                "analysis_results": [
                    {
                        "id": i,
                        "file_path": image_path,
                        "filename": Path(image_path).name,
                        "analysis": analysis,
                        "word_count": len(analysis.split()),
                        "processed_at": datetime.now().isoformat()
                    }
                    for i, (image_path, analysis) in enumerate(results, 1)
                ]
            }
            
            # JSON 파일 저장
            with output_file.open('w', encoding='utf-8') as f:
                json.dump(json_data, f, ensure_ascii=False, indent=2)
            
            print(f"✅ JSON 저장: {output_file.name}")
            return str(output_file)
        except Exception as e:
            print(f"❌ JSON 저장 실패: {e}")
            return None

# 기존 함수들을 새 클래스로 대체
def create_output_directories(base_output_dir="./output"):
    """기존 함수 호환성 유지"""
    return OptimizedFileManager.create_output_directories(base_output_dir)

def save_results_as_markdown(results, output_dirs, filename_prefix="image_analysis"):
    """기존 함수 호환성 유지"""
    return OptimizedFileManager.save_results_as_markdown(results, output_dirs, filename_prefix)

def save_results_as_json(results, output_dirs, filename_prefix="image_analysis"):
    """기존 함수 호환성 유지"""
    return OptimizedFileManager.save_results_as_json(results, output_dirs, filename_prefix)

print("✅ 최적화된 파일 관리자 로딩 완료")

✅ 최적화된 파일 관리자 로딩 완료


# 5. vlm 모델 로딩 및 처리

In [6]:
class OptimizedVLMManager:
    """VLM 관리자"""
    
    def __init__(self):
        self.model = None
        self.processor = None
        self.model_name = "Qwen/Qwen2.5-VL-7B-Instruct"
        self.gpu_manager = AdvancedGPUManager()
    
    def load_model(self):
        """🚀  GPU 최적화 Qwen2.5-VL 모델 로드"""
        try:
            print(f"🔄 {self.model_name} 모델 로딩 중...")
            
            # 로딩 전 메모리 정리
            self.gpu_manager.cleanup_memory(intensive=True)
            
            # 모델 로딩 (AWQ 최적화)
            self.model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
                self.model_name,
                torch_dtype=torch.bfloat16,
                attn_implementation="flash_attention_2",  # 성능 최적화
                device_map="auto",  # 자동 GPU 배치
                trust_remote_code=True,  # 원격 코드 신뢰
                low_cpu_mem_usage=True,  # CPU 메모리 사용량 최소화
            )
            
            # 프로세서 로딩
            self.processor = AutoProcessor.from_pretrained(
                self.model_name,
                trust_remote_code=True
            )
            
            # AWQ 모델 최적화 설정 (half() 호출 제거)
            if torch.cuda.is_available():
                self.model.eval()  # 평가 모드로 설정
                print(f"✅ 모델 최적화 완료 (dtype: {self.model.dtype})")
            
            device_info = f"CPU" if not torch.cuda.is_available() else f"GPU ({torch.cuda.get_device_name(0)})"
            print(f"✅ 모델 로딩 완료! 디바이스: {device_info}")
            
            # 메모리 상태 확인
            self.gpu_manager.print_detailed_status()
            
            return self.model, self.processor
            
        except Exception as e:
            print(f"❌ 모델 로딩 오류: {e}")
            print("💡 해결 방법:")
            print("   1. Hugging Face 로그인 확인: huggingface_hub.login()")
            print("   2. 네트워크 연결 상태 확인")
            print("   3. GPU 메모리 부족 시 다른 프로세스 종료")
            return None, None
    
    def get_optimized_prompt(self):
        """최적화된 프롬프트 템플릿 반환"""
        return """You are an expert in extracting useful information from IMAGE.
Extract key entities, summarize them, and write useful information for retrieval.
Provide five hypothetical questions based on the image.

Please analyze this image and provide a structured summary in the following markdown format:

### 🖼️ Image Title
[Write a clear, descriptive title for the image in Korean]

### 📋 Image Summary
[Provide a comprehensive summary of the image content in Korean - describe what you see, key information, trends, patterns, etc.]

### 🏷️ Key Entities
[List the main entities, keywords, and important elements found in the image in Korean]

### ❓ Hypothetical Questions
1. [Question 1]
2. [Question 2] 
3. [Question 3]
4. [Question 4]
5. [Question 5]

Important: 
- All output must be in Korean
- Focus on financial/business content if applicable
- Include specific numbers, percentages, or data points if visible
- Make the summary detailed and informative for retrieval purposes"""
    
    def process_single_image_safe(self, image_path, max_retries=3):
        """🛡️ OOM 재시도 메커니즘을 포함한 안전한 단일 이미지 처리"""
        if not self.model or not self.processor:
            print("❌ 모델이 로드되지 않았습니다. load_model()을 먼저 실행하세요.")
            return None
        
        prompt_text = self.get_optimized_prompt()
        
        for attempt in range(max_retries):
            try:
                # 🔍 처리 전 메모리 확인
                memory_info = self.gpu_manager.get_gpu_metrics()
                if memory_info and memory_info['available_memory_gb'] < 3.0:  # 3GB 임계값
                    print(f"⚠️ 메모리 부족 ({memory_info['available_memory_gb']:.1f}GB) - 정리 중...")
                    self.gpu_manager.cleanup_memory(intensive=True)
                    time.sleep(2)
                
                # 📸 이미지 로드 (에러 처리 강화)
                try:
                    image = Image.open(image_path).convert('RGB')
                except Exception as img_error:
                    print(f"❌ 이미지 로드 실패 {image_path}: {img_error}")
                    return f"이미지 로드 실패: {str(img_error)}"
                
                # 메시지 구성
                messages = [{
                    "role": "user",
                    "content": [
                        {"type": "image", "image": image},
                        {"type": "text", "text": prompt_text}
                    ]
                }]
                
                # 🔧 입력 준비 (메모리 효율적)
                text = self.processor.apply_chat_template(
                    messages, tokenize=False, add_generation_prompt=True
                )
                
                image_inputs, video_inputs = process_vision_info(messages)
                inputs = self.processor(
                    text=[text],
                    images=image_inputs,
                    videos=video_inputs,
                    padding=True,
                    return_tensors="pt",
                )
                
                # GPU로 이동
                if torch.cuda.is_available():
                    inputs = inputs.to("cuda")
                
                # 🎯 응답 생성 (최적화된 설정)
                with torch.no_grad():
                    generated_ids = self.model.generate(
                        **inputs, 
                        max_new_tokens=768,  # 더 긴 응답 허용
                        do_sample=True,  # 샘플링 활성화
                        temperature=0.7,  # 창의성 조절
                        top_p=0.8,  # 토큰 선택 다양성
                        pad_token_id=self.processor.tokenizer.eos_token_id
                    )
                    
                    # 응답 디코딩
                    generated_ids_trimmed = [
                        out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
                    ]
                    output_text = self.processor.batch_decode(
                        generated_ids_trimmed, 
                        skip_special_tokens=True, 
                        clean_up_tokenization_spaces=False
                    )
                
                # 🧹 즉시 메모리 정리
                del inputs, generated_ids, generated_ids_trimmed, image
                self.gpu_manager.cleanup_memory()
                
                result = output_text[0] if output_text else "분석이 생성되지 않았습니다"
                
                # 결과 검증
                if len(result.strip()) < 100:
                    print(f"⚠️ 짧은 응답 감지 ({len(result)} 문자) - 재시도...")
                    continue
                
                return result
                
            except torch.cuda.OutOfMemoryError as e:
                print(f"💥 OOM 오류 발생 (시도 {attempt + 1}/{max_retries})")
                self.gpu_manager.cleanup_memory(intensive=True)
                time.sleep(5)  # OOM 후 더 오래 대기
                
                if attempt == max_retries - 1:
                    return f"메모리 부족으로 인해 {Path(image_path).name} 처리 실패"
            
            except Exception as e:
                print(f"❌ 처리 오류 (시도 {attempt + 1}/{max_retries}): {e}")
                if attempt == max_retries - 1:
                    return f"처리 오류: {str(e)}"
                time.sleep(2)
        
        return f"{max_retries}번 시도 후 {Path(image_path).name} 처리 실패"

# 기존 함수들과 호환성 유지
def load_model():
    """기존 함수 호환성 유지"""
    vlm_manager = OptimizedVLMManager()
    return vlm_manager.load_model()

def process_single_image_safe(model, processor, image_path, max_retries=3):
    """기존 함수 호환성 유지"""
    vlm_manager = OptimizedVLMManager()
    vlm_manager.model = model
    vlm_manager.processor = processor
    return vlm_manager.process_single_image_safe(image_path, max_retries)

print("✅ 최적화된 VLM 관리자 로딩 완료")

✅ 최적화된 VLM 관리자 로딩 완료


# 6. GPU 최적화 메인 파이프라인 로딩 완료


In [7]:
def gpu_optimized_main_pipeline(image_dir="./data/images", output_dir="./analysis_output"):
    """GPU 활용률 최적화된 메인 파이프라인"""
    print("="*60)
    print("🚀 GPU 최적화 MultiModal RAG 시스템")
    print("="*60)
    
    # 1. 고급 GPU 관리자 및 배치 처리기 초기화
    gpu_manager = AdvancedGPUManager(target_utilization=85.0)
    batch_processor = MemoryEfficientBatchProcessor(initial_batch_size=2, max_batch_size=6)
    
    # 초기 GPU 상태 확인
    print("🔍 초기 GPU 상태:")
    gpu_manager.print_detailed_status()
    
    # 2. 출력 디렉토리 초기화 
    output_dirs = OptimizedFileManager.create_output_directories(output_dir)
    
    # 3. 이미지 파일 수집
    image_extensions = ('.jpg', '.jpeg', '.png')
    image_files = [
        str(path) for path in Path(image_dir).rglob('*')
        if path.suffix.lower() in image_extensions and path.is_file()
    ]
    
    if not image_files:
        print(f"❌ 이미지 파일 없음: {image_dir}")
        return None
    
    print(f"📷 발견된 이미지: {len(image_files)}개")
    
    # 4. VLM 모델 로딩
    try:
        model, processor = load_model()
        if not model:
            print("❌ VLM 모델 로딩 실패")
            return None
    except Exception as e:
        print(f"❌ 모델 로딩 오류: {e}")
        return None
    
    # 5. 메모리 효율적 이미지 처리 함수 정의
    def process_single_image_optimized(image_path):
        """단일 이미지 처리 (최적화됨)"""
        return process_single_image_safe(model, processor, image_path, max_retries=2)
    
    # 6. GPU 최적화 배치 처리 실행
    print(f"🔄 GPU 최적화 이미지 분석 시작...")
    start_time = time.time()
    
    # 메모리 효율적 배치 처리 사용
    analysis_results = batch_processor.process_items_efficiently(
        image_files, 
        process_single_image_optimized
    )
    
    # 7. 결과 필터링 및 정리
    valid_results = []
    for i, (image_path, analysis) in enumerate(zip(image_files, analysis_results)):
        if analysis and len(str(analysis).strip()) > 50:
            valid_results.append((image_path, analysis))
    
    if not valid_results:
        print("❌ 유효한 분석 결과 없음")
        return None
    
    print(f"\n✅ 분석 완료! 성공: {len(valid_results)}/{len(image_files)}개")
    
    # 8. 결과 저장 
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename_prefix = f"gpu_optimized_analysis_{timestamp}"
    
    print(f"💾 결과 저장 중...")
    
    md_file = OptimizedFileManager.save_results_as_markdown(valid_results, output_dirs, filename_prefix)
    json_file = OptimizedFileManager.save_results_as_json(valid_results, output_dirs, filename_prefix)
    
    # 9. 최종 정리 및 결과 요약
    total_time = time.time() - start_time
    
    print(f"\n📊 GPU 최적화 처리 요약:")
    print(f"   📷 처리된 이미지: {len(valid_results)}개")
    print(f"   ⏱️ 총 소요 시간: {total_time/60:.1f}분")
    print(f"   ⚡ 평균 처리 속도: {len(valid_results)/(total_time/60):.1f}개/분")
    print(f"   📁 출력 경로: {output_dirs['base']}")
    
    # 최종 GPU 상태 확인
    print("\n🔍 최종 GPU 상태:")
    gpu_manager.print_detailed_status()
    
    return {
        'results': valid_results,
        'output_dirs': output_dirs,
        'files': {'markdown': md_file, 'json': json_file},
        'processing_time': total_time,
        'gpu_stats': batch_processor.processing_stats
    }

print("✅ GPU 최적화 메인 파이프라인 로딩 완료")

✅ GPU 최적화 메인 파이프라인 로딩 완료


# 🤖 2. LLM (Large Language Model) 파이프라인

# 🔧 1단계: 필수 라이브러리 임포트 및 설정

In [12]:
#!/usr/bin/env python3
"""
=======================================================================
🤖 LLM 텍스트/테이블 분석 파이프라인 (VLM 파이프라인 스타일)
=======================================================================
VLM 이미지 분석 파이프라인의 구조를 따라 구현된 
Gemma-3-1B-IT를 활용한 텍스트/테이블 분석 시스템
"""

import os
import gc
import re
import json
import time
import torch
import warnings
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any
from contextlib import contextmanager



# 경고 메시지 억제
warnings.filterwarnings('ignore')

print("✅ LLM 파이프라인 라이브러리 로딩 완료!")

✅ LLM 파이프라인 라이브러리 로딩 완료!


# 🚀 2단계: GPU 메모리 관리 함수들 (VLM 파이프라인에서 재사용)

In [13]:
class LLMGPUMemoryManager:
    """LLM 파이프라인용 고급 GPU 메모리 관리자 (VLM 스타일)"""
    
    @staticmethod
    def cleanup():
        """GPU 메모리 정리"""
        if torch.cuda.is_available():
            try:
                gc.collect()
                torch.cuda.empty_cache()
                torch.cuda.synchronize()
                print("🧹 GPU 메모리 정리 완료")
            except Exception as e:
                print(f"❌ GPU 메모리 정리 실패: {e}")
    
    @staticmethod
    def print_status():
        """GPU 메모리 상태 출력"""
        if not torch.cuda.is_available():
            print("❌ GPU를 사용할 수 없습니다")
            return
        
        try:
            allocated = torch.cuda.memory_allocated()
            reserved = torch.cuda.memory_reserved()
            total = torch.cuda.get_device_properties(0).total_memory
            
            allocated_gb = allocated / (1024**3)
            reserved_gb = reserved / (1024**3)
            total_gb = total / (1024**3)
            free_gb = (total - reserved) / (1024**3)
            usage_percent = (allocated / total) * 100
            
            print("\n" + "="*50)
            print("🖥️ LLM GPU 메모리 상태")
            print("="*50)
            print(f"📊 총 메모리: {total_gb:.2f} GB")
            print(f"💾 사용 중: {allocated_gb:.2f} GB ({usage_percent:.1f}%)")
            print(f"📦 예약됨: {reserved_gb:.2f} GB")
            print(f"🆓 사용 가능: {free_gb:.2f} GB")
            
            if usage_percent > 85:
                print("⚠️ 메모리 사용률 높음!")
            else:
                print("✅ GPU 메모리 상태 양호")
            
            print("="*50)
            
        except Exception as e:
            print(f"❌ GPU 메모리 상태 확인 실패: {e}")
    
    @staticmethod
    def get_available_memory():
        """사용 가능한 GPU 메모리 반환 (GB)"""
        if not torch.cuda.is_available():
            return 0
        
        try:
            reserved = torch.cuda.memory_reserved()
            total = torch.cuda.get_device_properties(0).total_memory
            return (total - reserved) / (1024**3)
        except:
            return 0

print("✅ LLM GPU 메모리 관리자 로딩 완료")

✅ LLM GPU 메모리 관리자 로딩 완료


# 📋 3단계: 마크다운 텍스트/테이블 분리 전처리 클래스

In [7]:
📖 extract_clean_text()
├── 테이블 라인 제거 (| 패턴 필터링)
├── 공백 정규화
└── 순수 텍스트 추출

🧠 Kiwi 한국어 형태소 분석
├── 문장 경계 탐지
├── 문장별 분리 (start, end 위치 포함)
└── 의미 단위 보존

📝 텍스트 청킹
├── 최대 청크 크기: 1200자
├── 문장 경계 보존
└── 예상 결과: 50-80개 청크

SyntaxError: invalid character '📖' (U+1F4D6) (2547790868.py, line 1)

In [None]:
🍲 BeautifulSoup HTML 파싱
├── <table> 태그 감지
├── <tr>, <th>, <td> 셀 추출
└── 텍스트 내용 정리

📊 마크다운 변환
├── HTML → | 구분자 테이블
├── 헤더 구분선 (---) 추가
└── 예상 결과: 10-20개 테이블

# 변환 예시:
HTML: <table><tr><th>항목</th><th>값</th></tr><tr><td>GDP</td><td>2.1%</td></tr></table>
↓
마크다운: | 항목 | 값 |
         |------|-----|
         | GDP  | 2.1% |

In [None]:
🤖 Gemma-3-1B-IT 모델 로딩
├── Eager 모드 (Dynamo 오류 방지)
├── BFloat16 정밀도
└── CUDA GPU 활용

📦 배치 처리
├── 총 아이템: 60-100개 (텍스트 + 테이블)
├── 동적 배치 크기: 4-8개
└── 메모리 효율적 처리

📄 분석 결과
├── 요약 생성
├── 가설적 질문 생성
└── 콘텐츠 타입 분류



💾 파일 저장
├── 마크다운 형식 (분석 결과)
├── JSON 형식 (구조화된 데이터)
└── 타입별 분리 저장



import re
import json
import time
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Optional
from kiwipiepy import Kiwi

print("✅ 라이브러리 임포트 완료")

In [14]:
# OptimizedTextTableProcessor 전체 클래스 코드 (IndentationError 수정 포함)

import re
import time
import json
from pathlib import Path
from datetime import datetime
from bs4 import BeautifulSoup
from kiwipiepy import Kiwi

class OptimizedTextTableProcessor:
    """개선된 텍스트와 테이블 분리 처리기 - Kiwi 형태소 분석기 기반 정확한 문장 분리 청킹"""

    def __init__(self):
        self.table_pattern = re.compile(r'^\s*\|.*\|\s*$', re.MULTILINE)
        self.table_separator_pattern = re.compile(r'^\s*\|[\s\-:]*\|\s*$', re.MULTILINE)

        print("🔄 Kiwi 형태소 분석기 초기화 중...")
        self.kiwi = Kiwi()
        print("✅ Kiwi 초기화 완료")

        self.processing_stats = {
            'total_files': 0,
            'tables_extracted': 0,
            'text_chunks_created': 0,
            'processing_time': 0,
            'kiwi_sentences_analyzed': 0,
            'average_chunk_length': 0
        }

    def process_markdown_file(self, text_dir, table_dir, output_dir="./text_extraction_output"):
        start_time = time.time()

        text_path = Path(text_dir)
        table_path = Path(table_dir)
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        print("📖 텍스트 파일 처리 시작...")
        text_files = list(text_path.rglob("*.md"))
        all_text_chunks = []

        if not text_files:
            print(f"❌ {text_dir}에서 마크다운 파일을 찾을 수 없습니다")
        else:
            print(f"📁 {len(text_files)}개의 텍스트 파일 발견")

            for i, text_file in enumerate(text_files, 1):
                print(f"🔄 텍스트 처리 중: {text_file.name} ({i}/{len(text_files)})")

                try:
                    content = text_file.read_text(encoding='utf-8')
                    clean_text = self.extract_clean_text(content)

                    if clean_text.strip():
                        text_chunks = self._intelligent_text_chunking(clean_text, text_file.name)
                        all_text_chunks.extend(text_chunks)

                        self.processing_stats['total_files'] += 1
                        self.processing_stats['text_chunks_created'] += len(text_chunks)

                except Exception as e:
                    print(f"❌ {text_file.name} 처리 오류: {e}")

        print("📊 테이블 파일 처리 시작...")
        all_tables = self.load_table_files(table_path)
        self.processing_stats['tables_extracted'] = len(all_tables)

        saved_files = {}
        if all_text_chunks or all_tables:
            saved_files = self._save_extraction_results(all_text_chunks, all_tables, output_path)

        if all_text_chunks:
            avg_length = sum(len(chunk['content']) for chunk in all_text_chunks) / len(all_text_chunks)
            self.processing_stats['average_chunk_length'] = round(avg_length, 2)

        self.processing_stats['processing_time'] = time.time() - start_time
        self._print_processing_summary()

        return {
            'text_chunks': all_text_chunks,
            'tables': all_tables,
            'output_dir': output_path,
            'saved_files': saved_files,
            'stats': self.processing_stats
        }

    def extract_clean_text(self, content):
        if not content or not isinstance(content, str):
            return ""

        lines = content.split('\n')
        clean_lines = []

        for line in lines:
            line_stripped = line.strip()
            if not line_stripped.startswith('|') and '|' not in line:
                if line_stripped or len(clean_lines) == 0 or clean_lines[-1].strip():
                    clean_lines.append(line)

        return '\n'.join(clean_lines)

    def load_table_files(self, table_dir):
        tables = []
        table_path = Path(table_dir)

        print(f"📊 HTML 테이블 처리 시작: {table_dir}")

        for file_path in table_path.rglob("*.md"):
            try:
                content = file_path.read_text(encoding='utf-8')

                if '<table>' in content:
                    print(f"🍲 HTML 테이블 감지: {file_path.name}")
                    soup = BeautifulSoup(content, 'html.parser')
                    html_tables = soup.find_all('table')

                    for i, table in enumerate(html_tables):
                        markdown_table = self._html_table_to_markdown(table)

                        tables.append({
                            'source_file': str(file_path),
                            'filename': f"{file_path.stem}-table-{i+1}.md",
                            'content': markdown_table,
                            'table_type': 'html_converted',
                            'original_html': str(table)
                        })

                        print(f"   ✅ HTML 테이블 {i+1} 변환 완료")

                elif '|' in content and ('---' in content or '|-' in content):
                    tables.append({
                        'source_file': str(file_path),
                        'filename': file_path.name,
                        'content': content,
                        'table_type': 'markdown_native'
                    })
                    print(f"📋 마크다운 테이블 처리: {file_path.name}")

            except Exception as e:
                print(f"❌ 파일 처리 오류 {file_path}: {e}")

        print(f"📊 총 {len(tables)}개 테이블 로드 완료")
        return tables

    def _html_table_to_markdown(self, table):
        rows = table.find_all('tr')
        markdown_lines = []

        for i, row in enumerate(rows):
            cells = row.find_all(['th', 'td'])
            cell_texts = [re.sub(r'\s+', ' ', cell.get_text(strip=True)) for cell in cells]

            if cell_texts:
                markdown_line = '| ' + ' | '.join(cell_texts) + ' |'
                markdown_lines.append(markdown_line)

                if i == 0:
                    separator = '|' + '|'.join(['---'] * len(cell_texts)) + '|'
                    markdown_lines.append(separator)

        return '\n'.join(markdown_lines)

    def _intelligent_text_chunking(self, text, source_file, max_chunk_size=1200, overlap=150):
        if not text or len(text.strip()) < 50:
            return []

        print(f"🧠 Kiwi 문장 분리 시작: {source_file}")
        sentences = self._extract_sentences_with_kiwi(text)
        self.processing_stats['kiwi_sentences_analyzed'] += len(sentences)
        print(f"📝 {len(sentences)}개 문장 분리 완료")

        chunks = self._create_sentence_based_chunks(sentences, source_file, max_chunk_size, overlap)
        print(f"✅ {len(chunks)}개 청크 생성 완료")
        return chunks

    def _extract_sentences_with_kiwi(self, text):
        text = self._preprocess_text(text)
        sentences = []
        paragraphs = text.split('\n\n')

        for paragraph in paragraphs:
            if paragraph.strip():
                sent_results = self.kiwi.split_into_sents(paragraph.strip())
                for sent in sent_results:
                    if sent.text.strip() and len(sent.text.strip()) > 5:
                        sentences.append({
                            'text': sent.text.strip(),
                            'start': sent.start,
                            'end': sent.end,
                            'word_count': len(sent.text.split())
                        })

        return sentences

    def _preprocess_text(self, text):
        text = re.sub(r'\s+', ' ', text)
        text = re.sub(r'[""\"]', '"', text)
        text = re.sub(r'[\'\']', "'", text)
        text = re.sub(r'\n{3,}', '\n\n', text)
        return text.strip()

    def _create_sentence_based_chunks(self, sentences, source_file, max_chunk_size, overlap):
        if not sentences:
            return []

        chunks = []
        current_chunk = {
            'sentences': [],
            'content': '',
            'char_count': 0,
            'word_count': 0
        }

        for sentence in sentences:
            sentence_text = sentence['text']
            sentence_length = len(sentence_text)

            if current_chunk['char_count'] + sentence_length + 1 <= max_chunk_size:
                current_chunk['sentences'].append(sentence)
                current_chunk['content'] += sentence_text + ' '
                current_chunk['char_count'] += sentence_length + 1
                current_chunk['word_count'] += sentence['word_count']
            else:
                if current_chunk['sentences']:
                    chunk_data = self._finalize_chunk(current_chunk, source_file, len(chunks))
                    chunks.append(chunk_data)

                current_chunk = {
                    'sentences': [sentence],
                    'content': sentence_text + ' ',
                    'char_count': sentence_length + 1,
                    'word_count': sentence['word_count']
                }

        if current_chunk['sentences']:
            chunk_data = self._finalize_chunk(current_chunk, source_file, len(chunks))
            chunks.append(chunk_data)

        return [chunk for chunk in chunks if len(chunk['content'].strip()) > 30]

    def _finalize_chunk(self, chunk_data, source_file, chunk_index):
        content = chunk_data['content'].strip()
        return {
            'content': content,
            'source_file': source_file,
            'type': 'text',
            'chunk_type': 'sentence_based',
            'chunk_index': chunk_index,
            'word_count': chunk_data['word_count'],
            'char_count': len(content),
            'sentence_count': len(chunk_data['sentences']),
            'analysis_method': 'kiwi_sentence_splitting'
        }
    def _save_extraction_results(self, text_chunks, tables, output_path):
        """추출된 텍스트 청크와 테이블을 파일로 저장"""
        saved_files = {}
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        
        try:
            # 텍스트 청크 저장
            if text_chunks:
                text_output_file = output_path / f"extracted_text_chunks_{timestamp}.json"
                # ... JSON 저장 로직
            
            # 테이블 저장  
            if tables:
                table_output_file = output_path / f"extracted_tables_{timestamp}.json"
                # ... JSON 저장 로직
                
            # 통합 결과 저장
            combined_output_file = output_path / f"extraction_results_{timestamp}.json"
            # ... 통합 JSON 저장 로직
            
        except Exception as e:
            print(f"❌ 파일 저장 오류: {e}")
            
        return saved_files

    def _print_processing_summary(self):
        print("\n" + "=" * 70)
        print("📊 Kiwi 기반 문장 분리 텍스트/테이블 추출 완료")
        print("=" * 70)
        print(f"📁 처리된 파일: {self.processing_stats['total_files']}개")
        print(f"📊 로드된 테이블: {self.processing_stats['tables_extracted']}개")
        print(f"📝 생성된 텍스트 청크: {self.processing_stats['text_chunks_created']}개")
        print(f"🧠 Kiwi 분석 문장: {self.processing_stats['kiwi_sentences_analyzed']}개")
        print(f"📏 평균 청크 길이: {self.processing_stats['average_chunk_length']}자")
        print(f"⏱️ 처리 시간: {self.processing_stats['processing_time']:.2f}초")
        print("=" * 70)
        print("🎯 Kiwi 형태소 분석기 전용 문장 분리 청킹")
        print("✅ 높은 정확도의 문장 경계 탐지")
        print("=" * 70)


# 🤖 4단계: Gemma-3-1B-IT 모델 로딩 및 LLM 분석

In [15]:
# 🛠️ 수정된 OptimizedLLMManager (문제 해결 버전)

import torch._dynamo
torch._dynamo.config.suppress_errors = True

class OptimizedLLMManager:
    """수정된 LLM 매니저 - 프롬프트 및 파싱 로직 수정"""
    
    def __init__(self, model_name="google/gemma-3-4b-it", max_length=4156):
        self.model_name = model_name
        self.max_length = max_length
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model = None
        self.tokenizer = None
        self.generation_stats = {
            'total_processed': 0,
            'total_time': 0,
            'avg_time_per_item': 0
        }
    
    def load_model(self):
        """모델 로딩 (기존 로직 유지)"""
        try:
            torch._dynamo.config.suppress_errors = True
            
            print(f"🔄 {self.model_name} 모델 로딩 중...")
            
            LLMGPUMemoryManager.cleanup()
            
            print("📝 토크나이저 로딩...")
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.model_name,
                trust_remote_code=True
            )
            
            if self.tokenizer.pad_token is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token
            
            print("🤖 모델 로딩...")
            self.model = AutoModelForCausalLM.from_pretrained(
                self.model_name,
                torch_dtype=torch.bfloat16,
                device_map="auto",
                trust_remote_code=True,
                attn_implementation="eager"
            ).to(self.device)
            
            print(f"✅ {self.model_name} 로딩 완료!")
            print(f"🎯 사용 디바이스: {self.device}")
            
            return True
            
        except Exception as e:
            print(f"❌ 모델 로딩 실패: {e}")
            return False
    
    def analyze_content(self, text, source_file=""):
        """🔧 수정된 콘텐츠 분석 - 프롬프트와 파싱 로직 일치"""
        try:
            if not self.model or not self.tokenizer:
                raise ValueError("모델 또는 토크나이저가 로딩되지 않았습니다.")
            
            # 텍스트 데이터 안전한 추출
            if isinstance(text, dict):
                actual_text = text.get('content', str(text))
                if isinstance(source_file, dict):
                    source_file = text.get('source_file', str(source_file))
            else:
                actual_text = str(text)
            
            # 🔧 텍스트 길이 제한 완화 (1000 → 2000)
            text_preview = actual_text[:4000] if len(actual_text) > 4000 else actual_text
            
            # 🔧 수정된 프롬프트 (한국어 형식으로 일치)
            prompt = f"""다음 텍스트/테이블을 분석하여 핵심 내용을 요약하고 관련 가설 질문을 생성하세요.

중요한 해석 가이드라인:

날짜 형식 해석:
- "25.2월" = 2025년 2월
- "21.12월" = 2021년 12월  
- "24.10월" = 2024년 10월
- 년도.월 형식은 해당 년도의 해당 월로 해석

테이블 타입 구분:
- 목차 테이블: 페이지 번호나 구조적 섹션/챕터 포함
- 데이터 테이블: 실제 숫자, 통계, 백분율 포함

목차 테이블의 경우 "이 문서는 X개 챕터로 구성됨..." 으로 요약 시작

텍스트/테이블:
{text_preview}

다음 형식으로 응답하세요:

## 콘텐츠 타입:
[데이터 테이블 / 목차 테이블 / 일반 텍스트 중 선택]

## 요약:
[위 가이드라인에 따라 정확한 해석으로 핵심 내용 요약]

## Hypothetical Questions:
1. [질문 1]
2. [질문 2] 
3. [질문 3]
4. [질문 4]
5. [질문 5]

중요사항:
- 모든 출력은 한국어로 작성
- 금융/비즈니스 내용에 중점
- 검색 목적을 위한 상세하고 유익한 요약 작성
"""
            
            # 토큰화
            inputs = self.tokenizer(
                prompt,
                return_tensors="pt",
                max_length=self.max_length,
                truncation=True,
                padding=True
            ).to(self.device)
            
            # 추론
            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=1024,
                    temperature=0.5,
                    do_sample=True,
                    pad_token_id=self.tokenizer.pad_token_id,
                    eos_token_id=self.tokenizer.eos_token_id,
                    use_cache=True
                )
            
            # 디코딩
            response = self.tokenizer.decode(
                outputs[0][inputs['input_ids'].shape[1]:],
                skip_special_tokens=True
            ).strip()
            
            # 🔧 수정된 응답 파싱 (한국어 형식에 맞춤)
            content_type = "일반 텍스트"
            summary_text = ""
            questions = []
            
            try:
                # 콘텐츠 타입 추출
                if "## 콘텐츠 타입:" in response:
                    sections = response.split("## 요약:")
                    type_section = sections[0].replace("## 콘텐츠 타입:", "").strip()
                    content_type = type_section if type_section else "일반 텍스트"
                    
                    if len(sections) > 1:
                        remaining = "## 요약:" + sections[1]
                    else:
                        remaining = response
                else:
                    remaining = response
                
                # 요약과 질문 추출
                if "## 요약:" in remaining and "## Hypothetical Questions:" in remaining:
                    parts = remaining.split("## Hypothetical Questions:")
                    summary_text = parts[0].replace("## 요약:", "").strip()
                    
                    # 질문 추출 (더 견고한 정규식)
                    questions_section = parts[1] if len(parts) > 1 else ""
                    import re
                    question_matches = re.findall(r'\d+\.\s*(.+?)(?=\n\d+\.|$)', questions_section, re.DOTALL)
                    questions = [q.strip() for q in question_matches if q.strip()]
                    
                    # 질문이 없으면 다른 방식으로 추출
                    if not questions:
                        lines = questions_section.strip().split('\n')
                        for line in lines:
                            line = line.strip()
                            if re.match(r'\d+\.', line):
                                question = re.sub(r'^\d+\.\s*', '', line).strip()
                                if question:
                                    questions.append(question)
                else:
                    # 파싱 실패 시 전체를 요약으로 처리
                    summary_text = remaining.replace("## 요약:", "").strip()
                    
                # 🔧 질문이 여전히 없으면 기본 메시지 설정
                if not questions:
                    questions = ["분석 결과에서 질문을 추출할 수 없었습니다."]
                    
            except Exception as parse_error:
                print(f"⚠️ 응답 파싱 오류: {parse_error}")
                summary_text = response[:500] + "..." if len(response) > 500 else response
                questions = [f"파싱 오류로 인한 질문 생성 실패: {str(parse_error)}"]
            
            return {
                'source_file': source_file,
                'content_type': content_type,
                'summary': summary_text,
                'hypothetical_questions': questions,
                'status': 'success'
            }
            
        except Exception as e:
            print(f"❌ 분석 오류: {e}")
            return {
                'source_file': source_file,
                'content_type': "오류",
                'summary': f"분석 실패: {str(e)}",
                'hypothetical_questions': [f"오류로 인한 질문 생성 실패: {str(e)}"],
                'status': 'error'
            }
    
    def process_batch(self, text_chunks, tables, batch_size=8):
        """🔧 수정된 배치 처리 - 타입 키 일치"""
        
        print(f"\n🚀 수정된 배치 처리 시작!")
        print(f"📦 텍스트 청크: {len(text_chunks)}개")
        print(f"📊 테이블: {len(tables)}개") 
        print(f"🔧 배치 크기: {batch_size}")
        print("="*50)
        
        start_time = time.time()
        all_results = []
        
        # 1. 모든 아이템을 하나의 리스트로 통합
        all_items = []
        
        # 텍스트 청크 추가
        for i, chunk in enumerate(text_chunks):
            all_items.append({
                'type': 'text',
                'data': chunk,
                'index': i,
                'source_type': 'text_chunk'
            })
        
        # 테이블 추가
        for i, table in enumerate(tables):
            all_items.append({
                'type': 'table', 
                'data': table,
                'index': i,
                'source_type': 'table'
            })
        
        total_items = len(all_items)
        print(f"📊 총 처리할 항목: {total_items}개")
        
        # 2. 배치 단위로 처리
        processed_count = 0
        
        for batch_start in range(0, total_items, batch_size):
            batch_end = min(batch_start + batch_size, total_items)
            current_batch = all_items[batch_start:batch_end]
            
            batch_num = batch_start//batch_size + 1
            total_batches = (total_items-1)//batch_size + 1
            
            print(f"🔄 배치 {batch_num}/{total_batches}: {len(current_batch)}개 항목 처리 중...")
            
            # 배치 내 각 아이템 처리
            for item in current_batch:
                try:
                    if processed_count % 10 == 0:
                        LLMGPUMemoryManager.cleanup()
                    
                    # 데이터 추출
                    data = item['data']
                    content = data.get('content', '')
                    source_file = data.get('source_file', '')
                    
                    # LLM 분석 실행
                    analysis_result = self.analyze_content(content, source_file)
                    
                    # 🔧 수정된 결과 구조화 - type 키 일치
                    result = {
                        'analysis_id': f"{item['source_type']}_{item['index']}_{int(time.time())}",
                        'source_file': source_file,
                        'type': item['type'],  # ← content_type 대신 type 사용
                        'text_content': content[:1000],
                        'analysis_result': analysis_result.get('summary', ''),
                        'hypothetical_questions': analysis_result.get('hypothetical_questions', []),
                        'word_count': len(content.split()) if content else 0,
                        'char_count': len(content) if content else 0,
                        'processed_at': datetime.now().isoformat(),
                        'processing_batch': batch_num
                    }
                    
                    all_results.append(result)
                    processed_count += 1
                    
                    # 진행률 표시
                    if processed_count % 5 == 0:
                        progress = (processed_count / total_items) * 100
                        print(f"📈 진행률: {progress:.1f}% ({processed_count}/{total_items})")
                    
                except Exception as e:
                    print(f"❌ 아이템 처리 오류: {e}")
                    # 오류 발생시에도 결과에 포함
                    all_results.append({
                        'analysis_id': f"error_{item['index']}_{int(time.time())}",
                        'source_file': item['data'].get('source_file', ''),
                        'type': item['type'],  # ← content_type 대신 type 사용
                        'text_content': '',
                        'analysis_result': f'처리 실패: {str(e)}',
                        'hypothetical_questions': [f'오류로 인한 질문 생성 실패: {str(e)}'],
                        'word_count': 0,
                        'char_count': 0,
                        'processed_at': datetime.now().isoformat(),
                        'processing_batch': batch_num
                    })
                    processed_count += 1
            
            # 배치 완료 후 메모리 정리
            LLMGPUMemoryManager.cleanup()
            print(f"✅ 배치 {batch_num} 완료")
        
        # 3. 통계 업데이트
        total_time = time.time() - start_time
        self.generation_stats['total_processed'] = len(all_results)
        self.generation_stats['total_time'] = total_time
        self.generation_stats['avg_time_per_item'] = total_time / len(all_results) if all_results else 0
        
        print(f"\n🎉 수정된 배치 처리 완료!")
        print(f"📊 처리 통계:")
        print(f"   - 총 처리 항목: {len(all_results)}개")
        print(f"   - 총 소요 시간: {total_time:.2f}초")
        print(f"   - 평균 처리 시간: {self.generation_stats['avg_time_per_item']:.2f}초/항목")
        
        return all_results
    
    def cleanup(self):
        """리소스 정리"""
        try:
            if self.model:
                del self.model
                self.model = None
            if self.tokenizer:
                del self.tokenizer  
                self.tokenizer = None
            
            LLMGPUMemoryManager.cleanup()
            print("🧹 수정된 LLM 매니저 리소스 정리 완료")
            
        except Exception as e:
            print(f"❌ 리소스 정리 오류: {e}")

print("✅ 수정된 OptimizedLLMManagerFixed 클래스 로딩 완료!")


✅ 수정된 OptimizedLLMManagerFixed 클래스 로딩 완료!


# 🗄️ 5단계: 파일 저장관리자


In [16]:
class OptimizedLLMFileManager:
    """LLM 파이프라인용 파일 저장 관리자 (타입별 분리 저장)"""
    
    @staticmethod
    def create_output_directories(base_output_dir="./llm_analysis_output"):
        """출력 디렉토리 생성 (타입별 분리)"""
        base_path = Path(base_output_dir)
        directories = {
            'base': base_path,
            'markdown_text': base_path / "markdown_text_result",
            'markdown_table': base_path / "markdown_table_result", 
            'json_text': base_path / "json_text_result",
            'json_table': base_path / "json_table_result",
            'analysis': base_path / "analysis_results",
            'extraction': base_path / "text_extraction_output"
        }
        
        # 디렉토리 생성
        for dir_name, dir_path in directories.items():
            dir_path.mkdir(parents=True, exist_ok=True)
        
        print(f"📁 LLM 출력 디렉토리 생성 완료: {base_path}")
        print(f"   📝 텍스트 마크다운: {directories['markdown_text'].name}")
        print(f"   📊 테이블 마크다운: {directories['markdown_table'].name}")
        print(f"   📝 텍스트 JSON: {directories['json_text'].name}")
        print(f"   📊 테이블 JSON: {directories['json_table'].name}")
        return directories
    
    @staticmethod
    def save_analysis_results_by_type(results, output_dirs, filename_prefix="llm_analysis"):
        """타입별로 분리하여 저장 (메인 함수)"""
        # 타입별로 결과 분리
        text_results = [r for r in results if r['type'] == 'text']
        table_results = [r for r in results if r['type'] == 'table']
        
        saved_files = {}
        
        # 텍스트 결과 저장
        if text_results:
            saved_files['text_markdown'] = OptimizedLLMFileManager.save_text_results_as_markdown(
                text_results, output_dirs, filename_prefix
            )
            saved_files['text_json'] = OptimizedLLMFileManager.save_text_results_as_json(
                text_results, output_dirs, filename_prefix
            )
        
        # 테이블 결과 저장
        if table_results:
            saved_files['table_markdown'] = OptimizedLLMFileManager.save_table_results_as_markdown(
                table_results, output_dirs, filename_prefix
            )
            saved_files['table_json'] = OptimizedLLMFileManager.save_table_results_as_json(
                table_results, output_dirs, filename_prefix
            )
        
        return saved_files
    
    @staticmethod
    def save_text_results_as_markdown(text_results, output_dirs, filename_prefix="llm_text_analysis"):
        """텍스트 분석 결과를 마크다운으로 저장"""
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{filename_prefix}_text_{timestamp}.md"
            output_file = output_dirs['markdown_text'] / filename
            
            # 마크다운 콘텐츠 구성
            content_parts = [
                "# 📝 LLM 텍스트 분석 결과\n\n",
                f"**생성 시간:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n",
                f"**총 텍스트 분석:** {len(text_results)}개\n\n",
                "---\n\n"
            ]
            
            # 각 텍스트 분석 결과 추가
            for i, result in enumerate(text_results, 1):
                content_type = result.get('content_type', '일반 텍스트')
                content_parts.extend([
                    f"## 📝 텍스트 분석 {i}: {content_type}\n\n",
                    f"**📁 원본 파일:** {result.get('source_file', 'unknown')}\n",
                    f"**📋 콘텐츠 타입:** {content_type}\n",
                    f"**🕒 처리 시간:** {result.get('processed_at', 'unknown')}\n\n",
                    f"### 📄 원본 내용\n",
                    f"```\n{result['original_content'][:1000]}{'...' if len(result['original_content']) > 500 else ''}\n```\n\n",
                    f"### 🔍 분석 결과\n",
                    f"{result['analysis']}\n\n"
                ])
                
                # Hypothetical Questions 추가
                questions = result.get('hypothetical_questions', [])
                if questions:
                    content_parts.append("### ❓ Hypothetical Questions\n")
                    for j, question in enumerate(questions, 1):
                        content_parts.append(f"{j}. {question}\n")
                    content_parts.append("\n")
                
                content_parts.append("---\n\n")
            
            # 파일 저장
            output_file.write_text(''.join(content_parts), encoding='utf-8')
            
            print(f"✅ 텍스트 마크다운 저장: {output_file.name}")
            return str(output_file)
            
        except Exception as e:
            print(f"❌ 텍스트 마크다운 저장 실패: {e}")
            return None
    
    @staticmethod
    def save_table_results_as_markdown(table_results, output_dirs, filename_prefix="llm_table_analysis"):
        """테이블 분석 결과를 마크다운으로 저장"""
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{filename_prefix}_table_{timestamp}.md"
            output_file = output_dirs['markdown_table'] / filename
            
            # 마크다운 콘텐츠 구성
            content_parts = [
                "# 📊 LLM 테이블 분석 결과\n\n",
                f"**생성 시간:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n",
                f"**총 테이블 분석:** {len(table_results)}개\n\n",
                "---\n\n"
            ]
            
            # 각 테이블 분석 결과 추가
            for i, result in enumerate(table_results, 1):
                content_type = result.get('content_type', '데이터 테이블')
                content_parts.extend([
                    f"## 📊 테이블 분석 {i}: {content_type}\n\n",
                    f"**📁 원본 파일:** {result.get('source_file', 'unknown')}\n",
                    f"**📋 콘텐츠 타입:** {content_type}\n",
                    f"**🕒 처리 시간:** {result.get('processed_at', 'unknown')}\n\n",
                    f"### 📄 원본 테이블\n",
                    f"```\n{result['original_content'][:1000]}{'...' if len(result['original_content']) > 500 else ''}\n```\n\n",
                    f"### 🔍 분석 결과\n",
                    f"{result['analysis']}\n\n"
                ])
                
                # Hypothetical Questions 추가
                questions = result.get('hypothetical_questions', [])
                if questions:
                    content_parts.append("### ❓ Hypothetical Questions\n")
                    for j, question in enumerate(questions, 1):
                        content_parts.append(f"{j}. {question}\n")
                    content_parts.append("\n")
                
                content_parts.append("---\n\n")
            
            # 파일 저장
            output_file.write_text(''.join(content_parts), encoding='utf-8')
            
            print(f"✅ 테이블 마크다운 저장: {output_file.name}")
            return str(output_file)
            
        except Exception as e:
            print(f"❌ 테이블 마크다운 저장 실패: {e}")
            return None
    
    @staticmethod
    def save_text_results_as_json(text_results, output_dirs, filename_prefix="llm_text_analysis"):
        """텍스트 분석 결과를 JSON으로 저장"""
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{filename_prefix}_text_{timestamp}.json"
            output_file = output_dirs['json_text'] / filename
            
            # JSON 데이터 구성
            json_data = {
                "metadata": {
                    "generated_at": datetime.now().isoformat(),
                    "content_type": "text_analysis",
                    "total_text_analyses": len(text_results),
                    "format_version": "1.0",
                    "pipeline": "LLM_text_analysis"
                },
                "text_analysis_results": text_results
            }
            
            # JSON 파일 저장
            with output_file.open('w', encoding='utf-8') as f:
                json.dump(json_data, f, ensure_ascii=False, indent=2)
            
            print(f"✅ 텍스트 JSON 저장: {output_file.name}")
            return str(output_file)
            
        except Exception as e:
            print(f"❌ 텍스트 JSON 저장 실패: {e}")
            return None
    
    @staticmethod
    def save_table_results_as_json(table_results, output_dirs, filename_prefix="llm_table_analysis"):
        """테이블 분석 결과를 JSON으로 저장"""
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{filename_prefix}_table_{timestamp}.json"
            output_file = output_dirs['json_table'] / filename
            
            # JSON 데이터 구성
            json_data = {
                "metadata": {
                    "generated_at": datetime.now().isoformat(),
                    "content_type": "table_analysis", 
                    "total_table_analyses": len(table_results),
                    "format_version": "1.0",
                    "pipeline": "LLM_table_analysis"
                },
                "table_analysis_results": table_results
            }
            
            # JSON 파일 저장
            with output_file.open('w', encoding='utf-8') as f:
                json.dump(json_data, f, ensure_ascii=False, indent=2)
            
            print(f"✅ 테이블 JSON 저장: {output_file.name}")
            return str(output_file)
            
        except Exception as e:
            print(f"❌ 테이블 JSON 저장 실패: {e}")
            return None

print("✅ LLM 타입별 분리 저장 시스템 로딩 완료")

✅ LLM 타입별 분리 저장 시스템 로딩 완료


# 재귀적 파일 탐색색

In [17]:
def llm_main_pipeline_complete_recursive_fixed(
    markdown_files,  # 파일 리스트를 직접 받음
    output_dir="./llm_analysis_output",
    model_name="google/gemma-3-4b-it"
):
    """
    🚀 완전한 LLM 텍스트/테이블 분석 파이프라인 (모든 오류 수정 버전)
    """
    
    print("="*70)
    print("🚀 LLM 텍스트/테이블 분석 파이프라인 시작 (수정 버전)")
    print("="*70)
    
    start_time = time.time()
    
    try:
        # 1. GPU 메모리 최적화
        print("\n1️⃣ GPU 메모리 최적화...")
        LLMGPUMemoryManager.cleanup()  
        LLMGPUMemoryManager.print_status()
        
        # 2. 출력 디렉토리 설정
        print("\n2️⃣ 출력 디렉토리 설정...")
        output_dirs = OptimizedLLMFileManager.create_output_directories(output_dir)
        
        # 3. ✅ 기존 메서드를 이용한 텍스트 및 테이블 추출
        print("\n3️⃣ 마크다운 파일 처리...")
        text_processor = OptimizedTextTableProcessor()
        all_text_chunks = []
        all_tables = []
        
        for md_file in markdown_files:
            print(f"📄 처리 중: {md_file}")
            
            # 파일 읽기
            try:
                with open(md_file, 'r', encoding='utf-8') as f:
                    content = f.read()
            except Exception as e:
                print(f"❌ 파일 읽기 오류 ({md_file}): {e}")
                continue
            
            # ✅ 기존 메서드 사용: 파일 타입에 따라 처리 분기
            file_path = Path(md_file)
            
            # 파일 경로로 텍스트/테이블 구분 (ex_text vs ex_table)
            if "ex_text" in str(file_path) or "text" in file_path.name.lower():
                # 📝 텍스트 파일 처리
                print(f"📝 텍스트 파일로 처리: {file_path.name}")
                
                # 기존 메서드: 테이블 라인 제거
                clean_text = text_processor.extract_clean_text(content)
                
                if clean_text.strip():
                    # 기존 메서드: Kiwi 기반 청킹
                    text_chunks = text_processor._intelligent_text_chunking(
                        clean_text, str(md_file)
                    )
                    all_text_chunks.extend(text_chunks)
                    
            elif "ex_table" in str(file_path) or "table" in file_path.name.lower():
                # 📊 테이블 파일 처리
                print(f"📊 테이블 파일로 처리: {file_path.name}")
                
                # 기존 메서드와 동일한 구조로 테이블 데이터 생성
                table_data = {
                    'content': content,
                    'source_file': str(md_file),
                    'type': 'table',
                    'filename': file_path.name
                }
                all_tables.append(table_data)
                
            else:
                # 🔍 파일명으로 구분이 안 되는 경우, 내용으로 판단
                print(f"🔍 내용 분석하여 처리: {file_path.name}")
                
                # 테이블 마커가 많으면 테이블로, 아니면 텍스트로 처리
                table_line_count = len([line for line in content.split('\n') if '|' in line])
                total_lines = len(content.split('\n'))
                
                if table_line_count > total_lines * 0.3:  # 30% 이상이 테이블 라인
                    # 테이블로 처리
                    table_data = {
                        'content': content,
                        'source_file': str(md_file),
                        'type': 'table',
                        'filename': file_path.name
                    }
                    all_tables.append(table_data)
                else:
                    # 텍스트로 처리
                    clean_text = text_processor.extract_clean_text(content)
                    if clean_text.strip():
                        text_chunks = text_processor._intelligent_text_chunking(
                            clean_text, str(md_file)
                        )
                        all_text_chunks.extend(text_chunks)
        
        print(f"✅ 텍스트 청크: {len(all_text_chunks)}개, 테이블: {len(all_tables)}개")
        
        if not all_text_chunks and not all_tables:
            print("❌ 추출된 콘텐츠가 없습니다")
            return None
        
        # 4. LLM 모델 로딩 (안전한 로딩)
        print("\n4️⃣ LLM 모델 로딩...")
        llm_generator = OptimizedLLMManager(model_name)
        
        # 모델 로딩 확인
        print("🔧 모델 로딩 상태 확인...")
        load_success = llm_generator.load_model()
        if not load_success:
            print("❌ LLM 모델 로딩 실패")
            return None
        
        print("✅ LLM 모델 로딩 성공!")
        
        # 5. 배치 처리
        print("\n5️⃣ LLM 분석 시작...")
        analysis_results = llm_generator.process_batch(all_text_chunks, all_tables)
        
        if not analysis_results:
            print("❌ 분석 결과가 없습니다")
            return None
        
        print(f"✅ 분석 완료: {len(analysis_results)}개 결과")
        
        # 6. 결과 저장 (수정된 메서드명 사용)
        print("\n6️⃣ 결과 저장...")
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        
        # 마크다운 저장 (올바른 메서드명과 호출 방식)
        try:
            md_saved = OptimizedLLMFileManager.save_analysis_results_as_markdown(
                analysis_results, output_dirs, f"llm_recursive_analysis_{timestamp}"
            )
            print(f"✅ 마크다운 저장: {md_saved}")
        except Exception as e:
            print(f"❌ 마크다운 저장 실패: {e}")
            md_saved = None
        
        # JSON 저장
        try:
            json_saved = OptimizedLLMFileManager.save_analysis_results_as_json(
                analysis_results, output_dirs, f"llm_recursive_analysis_{timestamp}"
            )
            print(f"✅ JSON 저장: {json_saved}")
        except Exception as e:
            print(f"❌ JSON 저장 실패: {e}")
            json_saved = None
        
        # 8. 성능 통계
        end_time = time.time()
        elapsed_time = end_time - start_time
        
        print(f"\n🎯 파이프라인 완료!")
        print(f"⏱️ 총 소요 시간: {elapsed_time:.2f}초")
        print(f"📊 처리된 항목: {len(analysis_results)}개")
        print(f"💾 저장 위치: {output_dir}")
        
        return {
            'text_chunks': all_text_chunks,
            'tables': all_tables,
            'analysis_results': analysis_results,
            'saved_files': {'markdown': md_saved, 'json': json_saved}
        }

        
    except Exception as e:
        print(f"❌ 파이프라인 실행 오류: {e}")
        import traceback
        traceback.print_exc()
        return None

# vlm 파이프라인 단독 실행행

In [None]:
📋 execute_vlm_pipeline_only()
│
├── 📁 Path("./data/images")                            ✅ pathlib 표준라이브러리
│   ├── exists()                                        ✅ pathlib 메서드
│   └── rglob('*')                                      ✅ pathlib 메서드
│
├── 🖼️ gpu_optimized_main_pipeline()                    ✅ Line 3291에서 정의됨
│   ├── AdvancedGPUManager()                            ✅ Line 2434에서 정의됨
│   │   ├── cleanup_memory(intensive=True)              ✅ 클래스 내 메서드
│   │   └── print_detailed_status()                     ✅ 클래스 내 메서드
│   │
│   ├── MemoryEfficientBatchProcessor()                 ✅ Line 2619에서 정의됨  
│   │   ├── __init__(initial_batch_size=2)              ✅ 클래스 내 메서드
│   │   └── process_with_adaptive_batching()            ✅ 클래스 내 메서드
│   │       ├── torch.cuda.get_device_properties()      ✅ PyTorch CUDA
│   │       ├── torch.cuda.memory_allocated()           ✅ PyTorch CUDA
│   │       └── psutil.virtual_memory()                 ✅ psutil 라이브러리
│   │
│   ├── OptimizedFileManager.create_output_directories() ✅ Line 2914에서 정의됨
│   │   └── Path.mkdir(parents=True, exist_ok=True)     ✅ pathlib 표준라이브러리
│   │
│   ├── load_model()                                    ✅ Line 3230에서 정의됨 (VLM용)
│   │   ├── Qwen2_5_VLForConditionalGeneration.from_pretrained() ✅ transformers
│   │   ├── AutoProcessor.from_pretrained()             ✅ transformers  
│   │   ├── torch.bfloat16                              ✅ PyTorch dtype
│   │   ├── device_map="auto"                           ✅ transformers 설정
│   │   └── torch.cuda.empty_cache()                    ✅ PyTorch
│   │
│   └── process_images_with_adaptive_batching()         ✅ 추정 - 배치 처리 함수
│       ├── analyze_image()                             ✅ 추정 - 이미지 분석 함수
│       │   ├── processor()                             ✅ AutoProcessor 메서드
│       │   ├── model.generate()                        ✅ Transformers 모델 메서드
│       │   │   ├── max_new_tokens=512                  ✅ 생성 파라미터
│       │   │   ├── temperature=0.7                     ✅ 생성 파라미터
│       │   │   └── do_sample=True                      ✅ 생성 파라미터
│       │   └── processor.decode()                      ✅ AutoProcessor 메서드
│       └── chunk_analysis_results()                    ✅ 추정 - 결과 청킹 함수
│           ├── json.dumps()                            ✅ json 표준라이브러리
│           └── Path.write_text()                       ✅ pathlib 메서드

In [19]:
# 🖼️ VLM 파이프라인 단독 실행
def execute_vlm_pipeline_only():
    """VLM 파이프라인만 별도로 실행 (이미지 분석)"""
    
    print("🎬 VLM 파이프라인 단독 실행 시작")
    print("="*50)
    
    try:
        # 📁 입력 디렉토리 존재 여부 사전 체크
        image_dir = Path("./data/images")
        if not image_dir.exists():
            return {
                'success': False,
                'vlm_result': None,
                'message': f'입력 디렉토리가 존재하지 않음: {image_dir}'
            }
        
        # 이미지 파일 존재 여부 체크
        image_extensions = ('.jpg', '.jpeg', '.png')
        image_files = [
            path for path in image_dir.rglob('*')
            if path.suffix.lower() in image_extensions and path.is_file()
        ]
        
        if not image_files:
            return {
                'success': False,
                'vlm_result': None,
                'message': f'이미지 파일이 없음: {image_dir} (*.jpg, *.jpeg, *.png)'
            }
        
        # VLM 파이프라인 실행
        print("\n🖼️ VLM 파이프라인: 이미지 분석")
        print("📁 입력: ./data/images")
        print("📁 출력: ./analysis_output")
        print(f"📷 발견된 이미지: {len(image_files)}개")
        
        vlm_result = gpu_optimized_main_pipeline('./data/images', './analysis_output')
        
       
        if vlm_result:
            print(f"\n✅ VLM 파이프라인 완료!")
            print(f"📊 처리된 이미지: {len(image_files)}개")
            
            return {
                'success': True,
                'vlm_result': vlm_result,
                'total_images_processed': len(image_files),
                'message': 'VLM 파이프라인 성공적으로 완료'
            }
        else:
            print("❌ VLM 파이프라인 결과가 없습니다")
            return {
                'success': False,
                'vlm_result': None,
                'message': 'VLM 파이프라인 실행 실패'
            }
            
    except Exception as e:
        print(f"❌ VLM 파이프라인 실행 오류: {e}")
        import traceback
        traceback.print_exc()
        return {
            'success': False,
            'vlm_result': None,
            'message': f'오류 발생: {str(e)}'
        }

# VLM 파이프라인 실행
print("🖼️ VLM 파이프라인을 실행합니다...")
vlm_pipeline_result = execute_vlm_pipeline_only()
print(f"\n📋 VLM 파이프라인 결과: {vlm_pipeline_result.get('message', 'N/A')}")

🖼️ VLM 파이프라인을 실행합니다...
🎬 VLM 파이프라인 단독 실행 시작

🖼️ VLM 파이프라인: 이미지 분석
📁 입력: ./data/images
📁 출력: ./analysis_output
📷 발견된 이미지: 46개
🚀 GPU 최적화 MultiModal RAG 시스템
🔍 초기 GPU 상태:

🖥️ GPU 상태 모니터링
🎯 GPU 코어 활용률: 0%
📊 메모리 대역폭 활용률: 1%
💾 메모리 사용량: 0.3GB / 24.0GB (1.4%)
🆓 사용 가능 메모리: 24.0GB
🔥 PyTorch 할당됨: 0.0GB
📦 PyTorch 예약됨: 0.0GB
⚡ GPU 활용률이 낮습니다 - 배치 크기 증가 권장
✅ 메모리 여유 충분
📁 출력 디렉토리 생성 완료: analysis_output
📷 발견된 이미지: 46개
🔄 Qwen/Qwen2.5-VL-7B-Instruct 모델 로딩 중...


Loading checkpoint shards:   0%|          | 0/5 [00:00<?, ?it/s]

Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.


✅ 모델 최적화 완료 (dtype: torch.bfloat16)
✅ 모델 로딩 완료! 디바이스: GPU (NVIDIA GeForce RTX 3090)

🖥️ GPU 상태 모니터링
🎯 GPU 코어 활용률: 0%
📊 메모리 대역폭 활용률: 0%
💾 메모리 사용량: 16.0GB / 24.0GB (66.8%)
🆓 사용 가능 메모리: 8.6GB
🔥 PyTorch 할당됨: 15.4GB
📦 PyTorch 예약됨: 15.4GB
⚡ GPU 활용률이 낮습니다 - 배치 크기 증가 권장
🔄 GPU 최적화 이미지 분석 시작...
🚀 효율적 배치 처리 시작: 46개 아이템
🔧 배치 크기 조정: 2 → 8
📦 배치 처리 중: 1-8/46 (크기: 8)
📦 배치 처리 중: 9-16/46 (크기: 8)
📦 배치 처리 중: 17-24/46 (크기: 8)
📦 배치 처리 중: 25-32/46 (크기: 8)
📦 배치 처리 중: 33-40/46 (크기: 8)
📊 진행률: 87.0% (40/46) | 처리율: 5.1개/분

🖥️ GPU 상태 모니터링
🎯 GPU 코어 활용률: 65%
📊 메모리 대역폭 활용률: 55%
💾 메모리 사용량: 16.1GB / 24.0GB (67.1%)
🆓 사용 가능 메모리: 8.5GB
🔥 PyTorch 할당됨: 15.5GB
📦 PyTorch 예약됨: 15.5GB
📦 배치 처리 중: 41-46/46 (크기: 6)

📊 배치 처리 완료 요약
📷 총 처리 아이템: 46/46
📦 성공한 배치: 6
💥 OOM 발생 횟수: 0
🔧 배치 크기 조정: 1회
🧹 메모리 정리 횟수: 6
⚡ 최종 배치 크기: 8
⏱️ 총 처리 시간: 9.0분
📈 평균 처리율: 5.1개/분

✅ 분석 완료! 성공: 46/46개
💾 결과 저장 중...
✅ 마크다운 저장: gpu_optimized_analysis_20250623_064538_20250623_064538.md
✅ JSON 저장: gpu_optimized_analysis_20250623_064538_20250623_064538.json

📊 GPU 최적

# llm 파이프라인 단독 실행

In [1]:
워크플로우 단계:
📄 파일 탐색: ./data/ex_text 디렉토리에서 .md 파일 스캔
📁 디렉토리 생성: 타입별 분리 저장을 위한 출력 디렉토리 생성
🤖 모델 로딩: Gemma-3-1B-IT LLM 모델 초기화
📖 텍스트 추출: 마크다운 파일에서 텍스트와 테이블 분리
🔄 LLM 분석: 배치 처리로 텍스트/테이블 분석
💾 분리 저장: 텍스트/테이블 결과를 타입별로 분리 저장

SyntaxError: invalid character '📄' (U+1F4C4) (3113715157.py, line 2)

In [19]:

def execute_llm_pipeline():
    """최종 수정된 성능 최적화 LLM 파이프라인 - 모든 문제 해결 버전"""
    
    print("🛠️ 최종 수정된 LLM 파이프라인 실행 시작! (모든 문제 해결)")
    print("="*60)
    
    try:
        # 1. 파일 디렉토리 존재 여부 사전 체크
        markdown_dir = Path("./data/ex_text")
        
        if not markdown_dir.exists():
            return {
                'success': False, 
                'result': None, 
                'message': f'디렉토리가 존재하지 않음: {markdown_dir}'
            }
        
        # 마크다운 파일 존재 여부 체크
        markdown_files = list(markdown_dir.rglob("*.md"))
        if not markdown_files:
            return {
                'success': False,
                'result': None,
                'message': f'마크다운 파일이 없음: {markdown_dir}'
            }
        
        print(f"📄 처리할 디렉토리: {markdown_dir}")
        print(f"📝 발견된 마크다운 파일: {len(markdown_files)}개")
        
        # 2. 출력 디렉토리 생성
        output_dirs = OptimizedLLMFileManager.create_output_directories("./llm_analysis_output")
        
        print("🔍 생성된 출력 디렉토리 구조:")
        for key, value in output_dirs.items():
            print(f"   {key}: {value}")
        
        # 3. ✅ 수정된 LLM 매니저 사용
        print("🤖 수정된 LLM 모델 로딩 중...")
        llm_manager = OptimizedLLMManager()  # ← 수정된 클래스 사용
        if not llm_manager.load_model():
            return {
                'success': False, 
                'result': None, 
                'message': '모델 로딩 실패'
            }
        
        # 4. ✅ 수정된 텍스트 추출기 초기화
        text_extractor = OptimizedTextTableProcessor()
        
        # 5. ✅ 개선된 디렉토리 분리 처리
        print("📖 개선된 텍스트/테이블 분리 처리 시작...")
        
        text_dir = Path("./data/ex_text")      # 텍스트 디렉토리
        table_dir = Path("./data/ex_table")    # 테이블 디렉토리
        
        # 출력 디렉토리 설정
        if 'base' in output_dirs:
            extraction_output_dir = output_dirs['base']
        elif 'markdown_text' in output_dirs:
            extraction_output_dir = output_dirs['markdown_text']
        else:
            extraction_output_dir = list(output_dirs.values())[0]
            
        extraction_result = text_extractor.process_markdown_file(
            text_dir, table_dir, extraction_output_dir
        )
        
        # 결과 확인
        if not extraction_result:
            return {
                'success': False, 
                'result': None, 
                'message': '파일 처리 실패'
            }
        
        all_text_chunks = extraction_result.get('text_chunks', [])
        all_tables = extraction_result.get('tables', [])
        
        print(f"📝 총 텍스트 청크: {len(all_text_chunks)}개")
        print(f"📊 총 테이블: {len(all_tables)}개")
        
        # 6. ✅ 배치 크기 동적 조정 및 수정된 process_batch 호출
        print("🤖 최적화된 LLM 분석 시작...")
        
        total_items = len(all_text_chunks) + len(all_tables)
        if total_items == 0:
            return {
                'success': False,
                'result': None,
                'message': '처리할 텍스트/테이블이 없습니다'
            }
        
        if total_items > 100:
            batch_size = 8
        elif total_items > 50:
            batch_size = 4
        else:
            batch_size = 2
            
        print(f"📦 동적 배치 크기: {batch_size} (총 {total_items}개 항목)")
        
        # ✅ 수정된 process_batch 메서드 호출
        analysis_results = llm_manager.process_batch(all_text_chunks, all_tables, batch_size)
        
        # 7. ✅ 수정된 저장 로직 (타입 키 일치 처리)
        print("💾 분석 결과 저장 시작...")
        
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        
        # 🔧 분석 결과 데이터 구조 수정 (original_content, analysis 키 추가)
        print("🔧 분석 결과 데이터 구조 수정 중...")
        for result in analysis_results:
            # original_content 키 추가 (마크다운 저장에 필요)
            if 'original_content' not in result:
                result['original_content'] = result.get('text_content', '')
            
            # ✅ type 키는 이미 올바르게 설정됨 (수정된 process_batch에서)
            # 기존의 강제 'text' 설정 제거
            
            # analysis 키 추가 (마크다운 저장에 필요)
            if 'analysis' not in result:
                result['analysis'] = result.get('analysis_result', '')
        
        print(f"🔧 {len(analysis_results)}개 결과 데이터 구조 수정 완료")
        
        # 타입별 분포 확인
        text_count = len([r for r in analysis_results if r.get('type') == 'text'])
        table_count = len([r for r in analysis_results if r.get('type') == 'table'])
        print(f"📊 타입별 분포: 텍스트 {text_count}개, 테이블 {table_count}개")
        
        # ✅ 타입별 분리 저장 사용
        saved_files = OptimizedLLMFileManager.save_analysis_results_by_type(
            analysis_results, output_dirs, f"llm_analysis_final_{timestamp}"
        )
        
        # 결과 통합
        md_saved = saved_files.get('text_markdown') or saved_files.get('table_markdown')
        json_saved = saved_files.get('text_json') or saved_files.get('table_json')
        
        print(f"✅ 저장된 파일:")
        print(f"   📝 마크다운: {md_saved}")
        print(f"   📊 JSON: {json_saved}")
        
        # 모든 저장된 파일 표시
        if saved_files:
            print(f"   📄 전체 저장 파일:")
            for file_type, file_path in saved_files.items():
                if file_path:
                    print(f"      {file_type}: {file_path}")
        
        # ✅ 최종 반환값
        result = {
            'success': True,
            'total_files_processed': len(markdown_files),
            'total_text_chunks': len(all_text_chunks),
            'total_tables': len(all_tables),
            'total_analysis_results': len(analysis_results),
            'text_results_count': text_count,
            'table_results_count': table_count,
            'saved_files': {
                'markdown': md_saved,
                'json': json_saved,
                'all_saved_files': saved_files
            },
            'message': '모든 문제 해결 완료!'
        }
        
        return result
        
    except Exception as e:
        print(f"❌ 파이프라인 실행 오류: {e}")
        import traceback
        traceback.print_exc()
        return {
            'success': False, 
            'result': None, 
            'message': f'오류 발생: {str(e)}'
        }
    
    finally:
        # 리소스 정리
        if 'llm_manager' in locals():
            if hasattr(llm_manager, 'cleanup'):
                llm_manager.cleanup()
            else:
                print("🧹 LLM 매니저 리소스 정리 (cleanup 메서드 없음)")

print("✅ 최종 수정된 execute_llm_pipelined_final_fixed 함수 로딩 완료!")

✅ 최종 수정된 execute_llm_pipelined_final_fixed 함수 로딩 완료!


In [20]:

# 🧪 수정된 LLM 파이프라인 테스트 실행

print("🧪 수정된 LLM 파이프라인을 실행합니다... (모든 문제 해결)")
print("="*60)

# 최종 수정 버전 실행
llm_pipeline_result_final = execute_llm_pipeline()

print(f"\n📋 최종 LLM 파이프라인 결과: {llm_pipeline_result_final.get('message', 'N/A')}")

if llm_pipeline_result_final.get('success'):
    print(f"\n🎉 최종 성공!")
    print(f"   📁 처리된 파일: {llm_pipeline_result_final.get('total_files_processed')}개")
    print(f"   📝 텍스트 청크: {llm_pipeline_result_final.get('total_text_chunks')}개") 
    print(f"   📊 테이블: {llm_pipeline_result_final.get('total_tables')}개")
    print(f"   🔍 분석 결과: {llm_pipeline_result_final.get('total_analysis_results')}개")
    print(f"   📝 텍스트 결과: {llm_pipeline_result_final.get('text_results_count')}개")
    print(f"   📊 테이블 결과: {llm_pipeline_result_final.get('table_results_count')}개")
    
    saved_files = llm_pipeline_result_final.get('saved_files', {})
    print(f"\n💾 저장된 파일들:")
    for file_type, file_path in saved_files.items():
        if file_path and file_type != 'all_saved_files':
            print(f"   📄 {file_type}: {file_path}")
            
    # 모든 저장 파일 상세 표시
    all_saved = saved_files.get('all_saved_files', {})
    if all_saved:
        print(f"\n📂 전체 저장 파일 목록:")
        for key, path in all_saved.items():
            if path:
                print(f"   {key}: {path}")
else:
    print(f"\n❌ 실행 실패: {llm_pipeline_result_final.get('message')}")
    
print(f"\n{'='*60}")
print("🏁 수정된 LLM 파이프라인 테스트 완료")

🧪 수정된 LLM 파이프라인을 실행합니다... (모든 문제 해결)
🛠️ 최종 수정된 LLM 파이프라인 실행 시작! (모든 문제 해결)
📄 처리할 디렉토리: data/ex_text
📝 발견된 마크다운 파일: 9개
📁 LLM 출력 디렉토리 생성 완료: llm_analysis_output
   📝 텍스트 마크다운: markdown_text_result
   📊 테이블 마크다운: markdown_table_result
   📝 텍스트 JSON: json_text_result
   📊 테이블 JSON: json_table_result
🔍 생성된 출력 디렉토리 구조:
   base: llm_analysis_output
   markdown_text: llm_analysis_output/markdown_text_result
   markdown_table: llm_analysis_output/markdown_table_result
   json_text: llm_analysis_output/json_text_result
   json_table: llm_analysis_output/json_table_result
   analysis: llm_analysis_output/analysis_results
   extraction: llm_analysis_output/text_extraction_output
🤖 수정된 LLM 모델 로딩 중...
🔄 google/gemma-3-4b-it 모델 로딩 중...
🧹 GPU 메모리 정리 완료
📝 토크나이저 로딩...


tokenizer_config.json:   0%|          | 0.00/1.16M [00:00<?, ?B/s]

tokenizer.model:   0%|          | 0.00/4.69M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/33.4M [00:00<?, ?B/s]

added_tokens.json:   0%|          | 0.00/35.0 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/662 [00:00<?, ?B/s]

🤖 모델 로딩...


config.json:   0%|          | 0.00/855 [00:00<?, ?B/s]

model.safetensors.index.json:   0%|          | 0.00/90.6k [00:00<?, ?B/s]

Fetching 2 files:   0%|          | 0/2 [00:00<?, ?it/s]

model-00002-of-00002.safetensors:   0%|          | 0.00/3.64G [00:00<?, ?B/s]

model-00001-of-00002.safetensors:   0%|          | 0.00/4.96G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/215 [00:00<?, ?B/s]

✅ google/gemma-3-4b-it 로딩 완료!
🎯 사용 디바이스: cuda
🔄 Kiwi 형태소 분석기 초기화 중...
✅ Kiwi 초기화 완료
📖 개선된 텍스트/테이블 분리 처리 시작...
📖 텍스트 파일 처리 시작...
📁 9개의 텍스트 파일 발견
🔄 텍스트 처리 중: 등록일_2025.06.19_(현지정보_250618)_2025_6월_FOMC_시장반응_f.md (1/9)
🧠 Kiwi 문장 분리 시작: 등록일_2025.06.19_(현지정보_250618)_2025_6월_FOMC_시장반응_f.md
📝 29개 문장 분리 완료
✅ 6개 청크 생성 완료
🔄 텍스트 처리 중: 등록일_2025.06.12_[현지정보]_美_2025.5월_소비자물가_동향_및_금융시장_반응_F.md (2/9)
🧠 Kiwi 문장 분리 시작: 등록일_2025.06.12_[현지정보]_美_2025.5월_소비자물가_동향_및_금융시장_반응_F.md
📝 25개 문장 분리 완료
✅ 5개 청크 생성 완료
🔄 텍스트 처리 중: 등록일_2025.06.09_(현지정보)_美_2025.5월_고용지표_내용_및_뉴욕_금융시장_반응_f.md (3/9)
🧠 Kiwi 문장 분리 시작: 등록일_2025.06.09_(현지정보)_美_2025.5월_고용지표_내용_및_뉴욕_금융시장_반응_f.md
📝 24개 문장 분리 완료
✅ 4개 청크 생성 완료
🔄 텍스트 처리 중: 등록일_2025.06.04_[현지정보]_25년_6월_캐나다_중앙은행_정책회의_결과_및_시장_반응.md (4/9)
🧠 Kiwi 문장 분리 시작: 등록일_2025.06.04_[현지정보]_25년_6월_캐나다_중앙은행_정책회의_결과_및_시장_반응.md
📝 22개 문장 분리 완료
✅ 7개 청크 생성 완료
🔄 텍스트 처리 중: 등록일_2025.05.20_[현지정보]_미국_신용등급_하향_조정에_대한_시장참가자_평가.md (5/9)
🧠 Kiwi 문장 분리 시작: 등록일_2025.05.20_[현지정보]_미국_신용등급_하향_조정에_대한_시장참가자_평가.md
📝 46개 문장 분리 완료


# Qdrant 벡터 DB & Reranker

🗄️ 벡터 DB: 분석 결과를 임베딩하여 Qdrant에 저장
🔍 검색 테스트: 여러 검색어로 벡터 검색 테스트

In [None]:
# Transformers 및 기타 라이브러리
from sentence_transformers import SentenceTransformer
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct