In [9]:

from langchain.text_splitter import MarkdownTextSplitter
import pymupdf4llm
import os
import pathlib
import re
import fitz

# Root folders, relative to the notebook's working directory.
MD_DIRECTORY = "../../md_data"
PDF_DIRECTORY = "../../temp_pdf"

# Output layout underneath MD_DIRECTORY.
out_base = MD_DIRECTORY
img_base = f"{MD_DIRECTORY}/images"
md_base = f"{MD_DIRECTORY}/md"
txt_base = f"{MD_DIRECTORY}/txt"

# Create the whole output tree up front; existing folders are left alone.
for output_dir in (out_base, img_base, md_base, txt_base):
    os.makedirs(output_dir, exist_ok=True)

# Chunk length bounds in characters (compared against len() of the text).
MAX_CHUNK = 640
MIN_CHUNK = 512

# C0 control characters, DEL, and assorted Unicode space / invisible
# formatting code points that are deleted from the final chunk text.
CONTROL_SPACE_REGEX = re.compile(
    r'[\x00-\x1F\x7F\u00A0\u1680\u180E\u2000-\u200F\u2028\u2029\u202F\u205F\u2060\u2061\u2062\u2063\u2064\uFEFF]'
)

In [10]:

# for filename in os.listdir(PDF_DIRECTORY):
#     if filename.endswith(".pdf"):
        
#         filename_s = filename[:-4]  # Remove '.pdf'
#         pdf_path = os.path.join(PDF_DIRECTORY, filename)
#         with fitz.open(pdf_path) as doc:
#             num_pages = len(doc)
#         for i in range(num_pages):
            
#             file_path_md = os.path.join(md_base, f"{filename_s}_page_{i+1}.md")

#             md_text = pymupdf4llm.to_markdown(
#                 f"./{PDF_DIRECTORY}/{filename}",
#                 write_images=False,
#                 filename=f"{filename_s}",
#                 pages=[i]
#             )
#             pathlib.Path(file_path_md).write_bytes(md_text.encode())

In [11]:

def clean_md_text(text):
    """Flatten one chunk of text into a single clean line.

    Steps, in order:
      1. Re-join words hyphenated across a line break
         ("infor-\\nmation" -> "information"), tolerating soft hyphens and
         zero-width characters around the break.
      2. Delete remaining soft hyphens / zero-width characters together with
         any whitespace that directly follows them.
      3. Drop blank lines and lines that contain only digits (page numbers).
      4. Collapse every run of whitespace (including newlines) to one space.
      5. Delete the characters matched by CONTROL_SPACE_REGEX.

    Args:
        text: The chunk text to clean.

    Returns:
        The cleaned text with no leading or trailing whitespace.
    """
    # Fix hyphenated line breaks before the lines are examined individually.
    text = re.sub(r"-[\u00AD\u200B\u200C\u200D\u200E\u200F]*\s*\n[\u00AD\u200B\u200C\u200D\u200E\u200F]*\s*", "", text)
    text = re.sub(r"[\u00AD\u200B\u200C\u200D\u200E\u200F]\s*", "", text)

    # Keep only lines that carry content and are not a bare number.
    cleaned_lines = [
        line
        for line in text.split('\n')
        if line.strip() and not re.fullmatch(r'\s*\d+\s*', line)
    ]

    # Normalize whitespace: newlines and repeated blanks become single spaces.
    merged_text = re.sub(r"\s+", " ", "\n".join(cleaned_lines))
    merged_text = CONTROL_SPACE_REGEX.sub('', merged_text)

    # Deleting an invisible character that sat between two spaces leaves a
    # double space behind, so collapse once more before returning.
    return re.sub(r" {2,}", " ", merged_text).strip()

def get_sentence_end(paragraph, max_chunk=None):
    """Return the index at which ``paragraph`` should be cut.

    Only the first ``max_chunk`` characters are inspected. Candidates are
    tried in this order:
      1. just after the last whitespace that follows '.', '!' or '?';
      2. the last newline;
      3. the last space;
      4. ``max_chunk`` itself (hard cut).
    Candidates 2 and 3 are rejected when they sit at index 0, so the result
    is always at least 1.

    Args:
        paragraph: Text to find a cut position in.
        max_chunk: Size of the inspected window; defaults to the module-level
            MAX_CHUNK when omitted.

    Returns:
        The cut index as an int.

    Raises:
        ValueError: If the window size is not positive (a cut at index 0
            would never shorten the text).
    """
    if max_chunk is None:
        max_chunk = MAX_CHUNK
    if max_chunk <= 0:
        raise ValueError("max_chunk must be a positive integer")

    # Slice once; every candidate search looks at the same window.
    window = paragraph[:max_chunk]

    sentence_end = max(
        (m.end() for m in re.finditer(r'(?<=[.!?])\s', window)),
        default=None,
    )
    if sentence_end:
        return sentence_end

    # No sentence boundary: fall back to the last newline, then the last space.
    for separator in ('\n', ' '):
        position = window.rfind(separator)
        if position > 0:
            return position

    return max_chunk

def split_large_paragraph(paragraph):
    """Cut ``paragraph`` into pieces no longer than MAX_CHUNK characters.

    Each cut position comes from get_sentence_end(); every piece is stripped
    of surrounding whitespace. Whatever is left once the text fits within
    MAX_CHUNK becomes the last piece (omitted when empty).

    Returns:
        A list of strings in their original order.
    """
    pieces = []
    remainder = paragraph
    while True:
        if len(remainder) <= MAX_CHUNK:
            break
        cut = get_sentence_end(remainder)
        head, tail = remainder[:cut], remainder[cut:]
        pieces.append(head.strip())
        remainder = tail.strip()
    if remainder:
        pieces.append(remainder)
    return pieces

def is_title_like(paragraph):
    """Tell whether ``paragraph`` is short enough to be treated as a title.

    True when the raw text is under 50 characters and holds at most six
    whitespace-separated words.
    """
    if len(paragraph) >= 50:
        return False
    word_count = len(paragraph.strip().split())
    return word_count <= 6

def is_page_number_like(paragraph):
    """Tell whether ``paragraph`` is tiny enough to be a stray page number.

    True when the raw text is under 5 characters and holds at most two
    whitespace-separated words.
    """
    if len(paragraph) >= 5:
        return False
    word_count = len(paragraph.strip().split())
    return word_count <= 2

def split_into_paragraphs(text):
    """Split ``text`` into paragraphs, keeping Markdown tables as own blocks.

    A blank line ends the current paragraph. A line whose stripped form
    starts with "|" opens a table block; the block then absorbs every
    following non-blank line (pipe-prefixed or not) until a blank line or the
    end of the text.

    Text lines that sit directly above a table, with no blank line in
    between, are emitted as their own paragraph first. This guarantees that a
    table paragraph always starts with "|", which is how para_split
    recognises tables.

    Args:
        text: The text to split.

    Returns:
        A list of non-empty paragraph strings, each stripped of surrounding
        whitespace, with inner lines joined by "\\n".
    """
    paragraphs = []
    buffer = []
    in_table = False

    def flush():
        # Emit the buffered lines (if any) as one paragraph.
        if buffer:
            paragraphs.append("\n".join(buffer).strip())
            buffer.clear()

    for line in text.splitlines():
        stripped = line.strip()

        if stripped.startswith("|"):
            if not in_table:
                # Do not let preceding text swallow the first table row.
                flush()
                in_table = True
            buffer.append(line)
        elif not stripped:
            # Blank line ends whatever is being collected, table or text.
            flush()
            in_table = False
        else:
            # Normal paragraph line, or a non-pipe line inside a table block.
            buffer.append(line)

    # Add any trailing content.
    flush()

    return [p for p in paragraphs if p]

def para_split(text):
    """Group the paragraphs of ``text`` into chunks for output.

    Paragraphs come from split_into_paragraphs() and are handled as follows:
      * A table (paragraph starting with "|") is always emitted as its own
        chunk; pending titles and accumulated text are emitted before it so
        document order is preserved.
      * A title-like paragraph (see is_title_like) is held back and prepended
        to the next regular paragraph. Several titles in a row are all kept.
      * A regular paragraph shorter than 50 characters is always added to the
        accumulating chunk; one shorter than MIN_CHUNK is added when the
        accumulated chunk would stay under MAX_CHUNK.
      * Anything else flushes the accumulated chunk and is emitted on its
        own, split with split_large_paragraph() when longer than MAX_CHUNK.

    Lengths are measured after a held-back title has been prepended, so the
    title counts towards the size limits.

    Args:
        text: The text to chunk.

    Returns:
        A list of chunk strings in document order.
    """
    full_text = []
    growing_chunk = ""
    title_buffer = ""

    for paragraph in split_into_paragraphs(text):
        if paragraph.startswith("|"):
            # A title waiting for its paragraph belongs before the table.
            if title_buffer:
                growing_chunk += title_buffer + "\n\n"
                title_buffer = ""
            # Always flush before and after a table block.
            if growing_chunk.strip():
                full_text.append(growing_chunk.strip())
                growing_chunk = ""
            full_text.append(paragraph)
            continue

        if is_title_like(paragraph):
            # Accumulate instead of overwriting so back-to-back titles
            # (e.g. chapter + section heading) are not dropped.
            if title_buffer:
                title_buffer += "\n\n" + paragraph
            else:
                title_buffer = paragraph
            continue

        if title_buffer:
            paragraph = title_buffer + "\n\n" + paragraph
            title_buffer = ""

        # Measure only now, so a prepended title is included in the length.
        para_len = len(paragraph)

        if para_len < 50:
            growing_chunk += paragraph + "\n\n"
        elif para_len < MIN_CHUNK and len(growing_chunk) + para_len < MAX_CHUNK:
            growing_chunk += paragraph + "\n\n"
        else:
            if growing_chunk.strip():
                full_text.append(growing_chunk.strip())
                growing_chunk = ""

            if para_len > MAX_CHUNK:
                full_text.extend(split_large_paragraph(paragraph))
            else:
                full_text.append(paragraph)

    # A trailing title with no paragraph after it still has to be emitted.
    if title_buffer:
        growing_chunk += title_buffer + "\n\n"

    if growing_chunk.strip():
        full_text.append(growing_chunk.strip())

    return full_text

def remove_md_stuff(text):
    """Strip Markdown markup from ``text`` while keeping its line structure.

    Removed:
      * ATX heading markers (1-6 '#' at the start of a line, followed by
        blanks or the end of the line). The line break in front of the
        heading is kept, so the heading stays on its own line; a '#' in the
        middle of a line ("C#", "#12") is left alone.
      * Fenced code blocks (``` ... ```), including their content.
      * Runs of three or more '-' or '_' (horizontal rules, table rulers),
        together with one directly preceding newline.
      * Bold / italic markers written with '*' or '_'. Underscore markers
        are only honoured when they are not glued to a word character, so
        identifiers such as "my_file_name" keep their underscores.

    Runs of spaces are collapsed to a single space.

    Args:
        text: Markdown source text.

    Returns:
        The text with the markup listed above removed.
    """
    content = re.sub(
        r'^[ \t]*#{1,6}(?:[ \t]+|$)|(?:\n)?```[\s\S]*?```|(?:\n)?---+|(?:\n)?___+',
        '',
        text,
        flags=re.MULTILINE,
    )

    # Replace spaces with a single space
    content = re.sub(r' +', ' ', content)

    # Remove bold (handles **bold** and __bold__)
    content = re.sub(r'\*\*(.*?)\*\*', r'\1', content)
    content = re.sub(r'(?<!\w)__(.*?)__(?!\w)', r'\1', content)

    # Remove italic (handles *italic* and _italic_)
    content = re.sub(r'\*(.*?)\*', r'\1', content)
    content = re.sub(r'(?<!\w)_(.*?)_(?!\w)', r'\1', content)

    return content

In [12]:

# Every Markdown file below md_base, searched recursively.
md_files = list(pathlib.Path(md_base).rglob("*.md"))
# Only referenced by the commented-out alternative inside the loop.
splitter = MarkdownTextSplitter(chunk_size=512, chunk_overlap=0)

for md_file in md_files:
    # Load the file and strip Markdown markup before chunking.
    markdown_text = remove_md_stuff(md_file.read_text(encoding="utf-8"))
    docs = para_split(markdown_text)
    # docs = splitter.split_text(remove_md_stuff(markdown_text))

    # Mirror the file's position under txt_base, swapping the suffix to .txt.
    relative_path = md_file.relative_to(md_base)
    txt_file_path = txt_base / relative_path.with_suffix(".txt")
    txt_file_path.parent.mkdir(parents=True, exist_ok=True)

    # One "Chunk: N" record per chunk; chunks that look like a page number
    # are skipped, but N still reflects the chunk's position in docs.
    with txt_file_path.open("w", encoding="utf-8") as f:
        for i, chunk in enumerate(docs):
            chunk = clean_md_text(chunk)
            if is_page_number_like(chunk):
                continue
            f.write(f"Chunk: {i+1}\n{chunk}\n\n")