In [1]:
import sys
import os

# 패키지 경로 추가 (예: 패키지가 있는 디렉토리)
package_path = os.path.abspath('/home/bjh/dacon/pdf/RAG')
if package_path not in sys.path:
    sys.path.append(package_path)

In [2]:
from pymupdf4llm.helpers.pymupdf_rag import to_markdown
try:
    import pymupdf as fitz
except ImportError:
    import fitz

import string
import os
import tabula
import pypdf

In [3]:
from glob import glob

project_path = '/home/bjh/dacon/pdf/dacon'

data_file_list = glob(project_path + '/test_source/*')
data_file_list.sort()

data_file_list

['/home/bjh/dacon/pdf/dacon/test_source/「FIS 이슈 & 포커스」 22-4호 《중앙-지방 간 재정조정제도》 v2.pdf',
 '/home/bjh/dacon/pdf/dacon/test_source/「FIS 이슈 & 포커스」 23-2호 《핵심재정사업 성과관리》 v2.pdf',
 '/home/bjh/dacon/pdf/dacon/test_source/「FIS 이슈 & 포커스」(신규) 통권 제1호 《우발부채》 v2.pdf',
 '/home/bjh/dacon/pdf/dacon/test_source/「FIS 이슈&포커스」 22-2호 《재정성과관리제도》 v2.pdf',
 '/home/bjh/dacon/pdf/dacon/test_source/국토교통부_행복주택출자.pdf',
 '/home/bjh/dacon/pdf/dacon/test_source/보건복지부_노인장기요양보험 사업운영.pdf',
 '/home/bjh/dacon/pdf/dacon/test_source/보건복지부_부모급여(영아수당) 지원.pdf',
 '/home/bjh/dacon/pdf/dacon/test_source/산업통상자원부_에너지바우처.pdf',
 '/home/bjh/dacon/pdf/dacon/test_source/중소벤처기업부_혁신창업사업화자금(융자).pdf']

In [12]:
text = to_markdown(data_file_list[8], page_chunks=True)
print(text[0]['text'])

## 사  업  명
 혁신창업사업화자금(융자) (5152-301)
 1. 사업 코드 정보
 2. 사업 지원 형태 및 지원율
 3. 지출계획 총괄표
(단위: 백만원, %)

## 4. 사업목적
ㅇ (창업기반지원) 기술력과 사업성이 우수하고 미래 성장가능성이 높은 중소벤처기업의
창업을 활성화하고 고용창출 도모
ㅇ (개발기술사업화) 중소기업이 보유한 우수 기술의 사장을 방지하고 개발기술의
제품화·사업화를 촉진하여 기술기반 중소기업을 육성
## 5. 사업근거 및 추진경위
ㅇ 법령상 근거 : 중소기업진흥에 관한 법률 제66조, 제67조, 제74조
중소기업창업지원법 제35조



# 1. Utils

In [7]:
import io
import matplotlib.pyplot as plt
from PIL import Image


# check mediabox == rect == crop
def check(page):
    mediaW,mediaH = page.mediabox.width, page.mediabox.height
    rectW,rectH = page.rect.width, page.rect.height
    cropW,cropH = page.cropbox.width, page.cropbox.height
    flag = (mediaW == rectW) and (rectW == cropW) and (mediaH == rectH) and (rectH == cropH)
    return flag, mediaW, mediaH


def pdf_visualize(page, rect, matrix=[4, 4], figsize = (11,15),show=True):

    # 페이지 일부를 이미지로 변환
    matrix = fitz.Matrix(*matrix) # 해상도 높이기
    pix = page.get_pixmap(clip=rect, matrix=matrix)


    # 이미지 데이터를 PIL 이미지로 변환
    img = Image.open(io.BytesIO(pix.tobytes()))

    # 이미지를 Jupyter Notebook에서 바로 표시
    if show:
        plt.figure(figsize=figsize)
        plt.imshow(img)
        plt.axis('off')  # 축 숨기기
        plt.show()

    return pix

# 2. file read

## 2.1 트리 클래스 정의(목차 구조)

In [9]:
class Node:
    def __init__(self, title, page, contents=None, parent=None, children=None, level=0, next_=None):
        self.title = title
        self.page = page
        self.contents = contents if contents is not None else list()
        self.tables = []
        self.parent = parent
        self.children = children if children is not None else dict()
        self.level = level
        self.next = next_

    def add_child(self, child:'Node'):
        child.parent = self
        self.children[len(self.children)] = child

    def __repr__(self):
        return self.title
    
    def is_in_page(self, pno):
        if pno >= self.page[0] and pno <= self.page[1]:
            return True
        
    def is_leaf(self):
        return len(self.children) <= 0 
    
    def display_tree(self, level=0):
        ret = "\t" * level + f"{self.title} ({self.page})" + "\n"
        for i, child in self.children.items():
            ret += child.display_tree(level + 1)
        return ret
    
    def print_contents(self, level=0):
        ret = f"level:{level}" + f"{self.title} ({self.page})" + "\n" + "contents: "
        for c in self.contents:
            ret += c + '\n'
        ret += '\n======\n\n'
        for i, child in self.children.items():
            ret += child.print_contents(level + 1)
        return ret
    
class Tree:
    def __init__(self, root):
        self.tree = root
    
    def display_tree(self, level=0):
        return self.tree.display_tree(level)
    
    def print_contents(self, level=0):
        return self.tree.print_contents(level)
    
    def get_nodes(self, pno, nodes=[], curr=None):
        if curr is None:
            curr = self.tree
            nodes.clear()

        if curr.is_in_page(pno):
            nodes.append(curr)
        
        if not curr.is_leaf():
            for _, child in curr.children.items():
                self.get_nodes( pno, nodes, child)
        return nodes
    
    def to_list(self, treeL=[], curr=None):
        if curr is None:
            curr = self.tree
            treeL.clear()
        
        treeL.append(curr)
        if not curr.is_leaf():
            for _, child in curr.children.items():
                self.to_list( treeL, child)

        return treeL



## 2.2 목차 형식으로 읽는 함수(** 목차의 텍스트가 본문에 그대로 존재해야함)

In [10]:
from copy import deepcopy
import re
from langchain.docstore.document import Document

def build_tree_from_file(file_path):
    root = None
    node_stack = []
    prev_node = None
    
    with open(file_path, 'r') as f:
        lines = f.readlines()
        for line in lines:
            # 공백 제거
            line = line.rstrip()
            if not line:
                continue

            # 들여쓰기의 수를 계산
            indent_level = (len(line) - len(line.lstrip())) / 4 # \t
            line = line.lstrip()

            title, page1, page2 = line.split('::')
            title, page1, page2 = title.strip(), int(page1.strip()), int(page2.strip())

            new_node = Node(title, (page1, page2))
            if prev_node is not None:
                prev_node.next = new_node

            if indent_level == 0:
                # 루트 노드 설정
                root = new_node
                node_stack = [root]
            else:
                # 현재 노드의 부모 노드 찾기
                while len(node_stack) > indent_level:
                    node_stack.pop()

                parent_node = node_stack[-1]
                parent_node.add_child(new_node)
                new_node.parent = parent_node
                new_node.level = len(node_stack)

                # 스택에 현재 노드를 추가
                node_stack.append(new_node)
                prev_node = new_node

    return root

def make_tree_contents(doc, table_contents_0):
    pno = 0
    table_contents = deepcopy(table_contents_0)
    curr = table_contents.tree
    doc_c =deepcopy(doc)
    page = doc_c[pno]

    for nxt in table_contents.to_list()[1:]:
        while not nxt.is_in_page(pno):
            curr.contents.append(page['text'])
            curr.tables += page["tables_s"]
            pno += 1
            if pno < len(doc_c): page = doc_c[pno] 
            else: break
        if nxt.is_in_page(pno):
            # pattern = r'[\*\s]*'.join(map(re.escape, nxt.title.split()))
            pattern = ''.join([re.escape(char) + r'(?:\*+|\s+|\[\d+\)\])*' for word in nxt.title.split() for char in word])
            # pattern = r'(?:\s*\[\d+\)\]\s*)?([^\[\]]+)'
            match = re.search(pattern, page['text'])
            
            t1, t2 = page['text'][:match.start()], page['text'][match.end():]

            match_t = len(re.findall('<표', t1))
            if t1 != '': curr.contents.append(t1)
            if match_t > 0:
                curr.tables += page['tables_s'][:match_t]
                page['tables_s'] = page['tables_s'][match_t:]
            page['text'] = t2
            
        curr = nxt

    curr.contents.append(page['text'])
    pno += 1
    while pno < len(doc_c):
        page = doc_c[pno]
        curr.contents.append(page['text'])
        pno += 1

    return table_contents


def split_docs(table_contents, splited_documents, text_splitter):
    def get_title(node):
        title = node.title
        while node.parent is not None:
            title = node.parent.title + " > " + title
            node = node.parent
        return title

    for node in table_contents.to_list():
        
        title = get_title(node)
        
        text = '\n'.join(node.contents).replace("#", "")
        text =  re.sub(r'[#\*]+', '', text) # 약간의 전처리

        if text != '':
            # tables. do not split tables
            pattern = r'^.*<표.*$'
            matches = re.findall(pattern, text, re.MULTILINE)
            tables = [title + '\n' + ti + '\n' + ta  for ti, ta in zip(matches, node.tables) ]
            for t in tables:
                splited_documents.append(Document(page_content=t, metadata={"title": title}))
            
            for spd in text_splitter.split_documents([Document(page_content=text, metadata={"title": title})]):
                spd.page_content = title + '\n' + spd.page_content
                splited_documents.append(spd)

## 2.3 목차가 존재하는 문서 읽기

In [16]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.docstore.document import Document
from tqdm import tqdm

text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50, separators=["\n\n", "- ", "\n",  " ", ""])

# 문서에 목차가 존재하는 경우
contenns_table_paths = [
    '「FIS 이슈 & 포커스」 22-4호 《중앙-지방 간 재정조정제도》.txt',
    '「FIS 이슈 & 포커스」 23-2호 《핵심재정사업 성과관리》.txt',
    '「FIS 이슈 & 포커스」(신규) 통권 제1호 《우발부채》.txt',
    '「FIS 이슈&포커스」 22-2호 《재정성과관리제도》.txt'
]

pdf_file_paths = [
    '「FIS 이슈 & 포커스」 22-4호 《중앙-지방 간 재정조정제도》 v2.pdf',
    '「FIS 이슈 & 포커스」 23-2호 《핵심재정사업 성과관리》 v2.pdf',
    '「FIS 이슈 & 포커스」(신규) 통권 제1호 《우발부채》 v2.pdf',
    '「FIS 이슈&포커스」 22-2호 《재정성과관리제도》 v2.pdf'
]

splited_documents = []

for pdfP, ctP in tqdm(zip(pdf_file_paths, contenns_table_paths)):
    ctP = project_path+'/test_source_contents_tables/'+ctP
    pdfP = project_path+'/test_source/'+pdfP

    table_contents_0 = Tree(build_tree_from_file(ctP))
    doc = to_markdown(pdfP, page_chunks=True, margins=(0, 70, 0, 70))

    table_contents = make_tree_contents(doc, table_contents_0)
    split_docs(table_contents, splited_documents, text_splitter)

4it [00:10,  2.56s/it]


## 2.4 목차가 존재하지 않는 문서 읽기

In [15]:
file_paths = [
    '/home/bjh/dacon/pdf/RAG/test_source/국토교통부_행복주택출자.pdf',
    '/home/bjh/dacon/pdf/RAG/test_source/보건복지부_노인장기요양보험 사업운영.pdf',
    '/home/bjh/dacon/pdf/RAG/test_source/보건복지부_부모급여(영아수당) 지원.pdf',
    '/home/bjh/dacon/pdf/RAG/test_source/산업통상자원부_에너지바우처.pdf',
    '/home/bjh/dacon/pdf/RAG/test_source/중소벤처기업부_혁신창업사업화자금(융자).pdf'
]
splited_documents2 = []
text_splitter2 = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50, separators=["\n\n\n\n", "\n\n\n", "\n\n", "#", "- ", "\n",  " ", ""])


for file_path in tqdm(file_paths):
    title = file_path.split("/")[-1].replace(".pdf", "")
    doc_ = to_markdown(pdfP, margins=(0, 70, 0, 70))
    splited_documents2 += text_splitter2.split_documents([Document(page_content=doc_, metadata={"title": title})])

  0%|          | 0/5 [00:00<?, ?it/s]

100%|██████████| 5/5 [00:15<00:00,  3.16s/it]


# 3 db 저장

In [None]:
splited_documents, splited_documents2

In [36]:
import pickle

data = splited_documents + splited_documents2
with open("./tot_documnets.pkl", "wb") as f:
    pickle.dump(data, f)

In [None]:
with open("./tot_documnets.pkl", "rb") as f:
    loaded_data = pickle.load(f)

loaded_data