In [55]:
# Absolute path to the preprocessed markdown document; adjust it for your machine.
file_path = r'D:\ML\Thesis_chatbot\Data\out\preprocessed_md.md'

# Read the whole document into memory as a single UTF-8 string; `markdown_text`
# is the input to split_markdown_into_sections in a later cell.
with open(file_path, 'r', encoding='utf-8') as f:
    markdown_text = f.read()

In [56]:
def split_markdown_into_sections(text: str) -> list:
    """
    Break markdown text into a list of heading-delimited sections.

    Any non-blank line whose stripped form starts with "#" opens a new section.
    Blank lines are dropped. Each section is a dict with:
      - "heading": the heading text without leading "#" markers ("" for text
        that appears before the first heading);
      - "content": the heading text, a newline, and the section's body lines
        joined by newlines (just the body when there is no heading).
    """
    collected = []
    heading = None
    body_lines = []

    def emit(title, body):
        # Nothing has been seen yet: no heading and no body lines.
        if title is None and not body:
            return
        joined_body = "\n".join(body).strip()
        # The heading text is repeated as the first line of the content.
        content = f"{title}\n{joined_body}" if title else joined_body
        record = {"heading": title or "", "content": content}
        if record["heading"] or record["content"]:
            collected.append(record)

    for raw_line in text.splitlines():
        stripped = raw_line.strip()
        if not stripped:
            # Completely blank lines never reach a section body.
            continue
        if not stripped.startswith("#"):
            body_lines.append(raw_line)
            continue
        # A heading closes the section in progress and opens a new one.
        emit(heading, body_lines)
        body_lines = []
        heading = stripped.lstrip("#").strip()

    # Close whatever section was still open at end of input.
    emit(heading, body_lines)
    return collected


In [None]:
# 2. Split the markdown into sections.
# Each section is a dict with "heading" and "content" keys.
sections = split_markdown_into_sections(markdown_text)
print("Total sections found:", len(sections))
# Bare last expression: the notebook displays the full list of sections.
sections

In [58]:
import re

# Utility Functions
def unique_list(l: list) -> list:
    """Return the items of `l` with later duplicates dropped, keeping first-seen order."""
    seen = set()
    ordered = []
    for item in l:
        if item in seen:
            continue
        seen.add(item)
        ordered.append(item)
    return ordered

def estimate_tokens(text: str) -> int:
    """Rough token count: the number of whitespace-separated words in `text`."""
    words = text.split()
    return len(words)

def extract_overlap_text(chunk_text: str, overlap_token_count: int = 50) -> str:
    """
    Extract an overlap snippet for continuity between consecutive chunks.

    Lines that look like atomic content are skipped: table rows ("|"), formulas
    ("$$"), images ("!["), TABLE_TITLE:/TABLE_COLUMNS: markers, "Where:" lines
    and list items ("-"). The remaining lines are joined with single spaces.

    Args:
        chunk_text: Text of the chunk the snippet is taken from.
        overlap_token_count: Maximum number of trailing whitespace-separated
            words to keep. Values <= 0 yield an empty snippet.

    Returns:
        The last `overlap_token_count` words of the kept text, or the whole
        kept text when it is shorter than that.
    """
    # A non-positive budget means "no overlap". Without this guard,
    # words[-0:] would return every word instead of none.
    if overlap_token_count <= 0:
        return ""

    atomic_prefixes = ("|", "$$", "![", "TABLE_TITLE:", "TABLE_COLUMNS:", "Where:", "-")
    selected_lines = [
        line for line in chunk_text.splitlines()
        if not line.strip().startswith(atomic_prefixes)
    ]
    selected_text = " ".join(selected_lines).strip()
    words = selected_text.split()
    if len(words) > overlap_token_count:
        return " ".join(words[-overlap_token_count:])
    return selected_text

def extract_media_metadata(text: str) -> dict:
    """
    Collect image and table titles found in markdown text.

    Images are recognised by the ![IMG_TITLE: <caption>] marker. Tables are
    recognised by TABLE_TITLE: markers and by bottom captions of the form
    "Table <number>: <caption>".

    Returns a dict {'images': [...], 'tables': [...]} where every entry is a
    dict with a single 'title' key. Table entries list the TABLE_TITLE matches
    first, followed by the bottom-caption matches.
    """
    # Images via IMG_TITLE marker.
    images = [
        {'title': found.group(1).strip()}
        for found in re.finditer(r'!\[IMG_TITLE:\s*(.*?)\]', text)
    ]

    # Tables via TABLE_TITLE markers.
    tables = [
        {'title': found.group(1).strip()}
        for found in re.finditer(r'TABLE_TITLE:\s*(.*)', text)
    ]

    # Bottom table captions, rebuilt as "<label>: <caption>".
    for found in re.finditer(r"(Table\s+\d+(?:\.\d+)?):\s*(.+)", text):
        label = found.group(1).strip()
        caption = found.group(2).strip()
        tables.append({'title': f"{label}: {caption}"})

    return {'images': images, 'tables': tables}

def unify_metadata(metadata: dict) -> str:
    """
    Flatten a structured metadata dict into one labelled string.

    The keys "headings", "images" and "tables" (each a list of strings) are
    rendered as "Headings: ...", "Figures: ..." and "Tables: ..." with items
    separated by "; ". Missing or empty keys are skipped, and the rendered
    groups are joined with " | ".
    """
    labelled_keys = (
        ("headings", "Headings: "),
        ("images", "Figures: "),
        ("tables", "Tables: "),
    )
    segments = []
    for key, label in labelled_keys:
        values = metadata.get(key)
        if not values:
            continue
        segments.append(label + "; ".join(values))
    return " | ".join(segments)

def contains_atomic(text: str) -> bool:
    """
    Tell whether `text` holds an atomic element: a table row (a line whose
    first non-blank character is "|"), a "$$" formula marker, or an image
    marker ("![").
    """
    has_table_row = re.search(r'^\s*\|', text, flags=re.MULTILINE) is not None
    has_formula = "$$" in text
    has_image = re.search(r'!\[', text) is not None
    return has_table_row or has_formula or has_image

def split_text_by_token_limit(text: str, token_limit: int, overlap: int) -> list:
    """
    Split a (possibly long) text into subchunks of at most `token_limit`
    whitespace-separated words, with `overlap` words shared between
    consecutive subchunks.

    Args:
        text: Text to split; it is tokenised with str.split().
        token_limit: Maximum number of words per subchunk (must be >= 1).
        overlap: Number of words repeated at the start of the next subchunk
            (must satisfy 0 <= overlap < token_limit).

    Returns:
        List of subchunk strings (words re-joined with single spaces).
        Empty or whitespace-only text gives an empty list.

    Raises:
        ValueError: If `token_limit` is not positive, or `overlap` is negative
            or not smaller than `token_limit`. Such values would keep the
            window from advancing (an endless loop) or skip words.
    """
    if token_limit <= 0:
        raise ValueError("token_limit must be a positive integer")
    if overlap < 0 or overlap >= token_limit:
        raise ValueError("overlap must satisfy 0 <= overlap < token_limit")

    words = text.split()
    step = token_limit - overlap  # Guaranteed >= 1 by the checks above.
    subchunks = []
    start = 0
    while start < len(words):
        end = start + token_limit
        subchunks.append(" ".join(words[start:end]))
        # Stop once the window has reached the last word; another pass would
        # only repeat overlap words already contained in this subchunk.
        if end >= len(words):
            break
        start += step
    return subchunks


In [59]:
def combine_sections_to_chunks(sections: list,
                               token_limit_normal: int = 400,
                               token_limit_atomic: int = 2000,
                               overlap_token_count: int = 50) -> list:
    """
    Combines section dictionaries (from split_markdown_into_sections) into chunks.

    Requirements:
      1. Try to keep chunk size up to token_limit_normal (400 tokens) for any section.
      2. If an atomic section is larger than token_limit_normal, allow it to extend up to
         token_limit_atomic (2000 tokens) as a chunk of its own.
      3. If the atomic section exceeds token_limit_atomic as well, split it. First attempt to
         split on internal headings; if more than one internal section is found, create
         subchunks from those (token-splitting any that are still too large). Otherwise fall
         back to token-based splitting. Every resulting subchunk carries the section's
         heading in its metadata.
      4. If headings appear at the end of a chunk, remove all trailing heading lines and store
         them so that they are prepended to the next accumulated chunk's content.
      5. Finally, if a chunk's text is identical to its derived overlap text (i.e. it only
         contains overlap text), the chunk is removed.

    A brief overlap snippet (of overlap_token_count tokens) is attached between chunks only
    if no pending heading exists.

    Args:
        sections: List of dicts with "heading" and "content" keys.
        token_limit_normal: Token budget for regular accumulated chunks.
        token_limit_atomic: Token budget for chunks holding tables, formulas or images.
        overlap_token_count: Number of trailing tokens carried into the next chunk.

    Returns:
        List of dicts with "chunk_text" (str) and "metadata" (unified metadata string).
    """
    chunks = []
    current_chunk_sections = []  # Accumulates text for the current chunk.
    current_chunk_metadata = {"headings": [], "images": [], "tables": []}
    overlap_prefix = ""
    pending_heading = ""  # Trailing heading lines waiting to be injected into the next chunk.

    def append_standalone_chunk(text: str, heading: str) -> None:
        """Append `text` as its own chunk, with metadata built from `heading` and its media."""
        media_metadata = extract_media_metadata(text)
        unified_meta = unify_metadata({
            "headings": [heading] if heading else [],
            "images": unique_list([item['title'] for item in media_metadata.get('images', [])]),
            "tables": unique_list([item['title'] for item in media_metadata.get('tables', [])])
        })
        chunks.append({
            "chunk_text": text,
            "metadata": unified_meta
        })

    def seed_new_chunk() -> None:
        """Start the (empty) current chunk with the pending heading and/or overlap snippet."""
        nonlocal pending_heading, overlap_prefix
        if pending_heading:
            current_chunk_sections.append(pending_heading)
            pending_heading = ""
        if overlap_prefix:
            current_chunk_sections.append(overlap_prefix)
            overlap_prefix = ""

    def flush_current_chunk() -> str:
        """Emit the accumulated chunk and return the overlap snippet for the next one."""
        nonlocal current_chunk_sections, current_chunk_metadata, pending_heading
        if not current_chunk_sections:
            return ""
        chunk_text = "\n".join(current_chunk_sections).strip()

        # Remove lines from the bottom as long as they exactly match one of the
        # headings accumulated for this chunk.
        lines = chunk_text.splitlines()
        heading_lines = []
        while lines and (lines[-1].strip() in current_chunk_metadata["headings"]):
            heading_lines.append(lines.pop())
        # Reconstruct the chunk text without the trailing headings.
        chunk_text = "\n".join(lines).strip()
        # Save the removed headings (in original order) as pending_heading.
        if heading_lines:
            pending_heading = "\n".join(reversed(heading_lines))
        else:
            pending_heading = ""

        # Remove markdown heading markers.
        cleaned_text = re.sub(r'^#+\s*', '', chunk_text, flags=re.MULTILINE)
        media_metadata = extract_media_metadata(cleaned_text)
        combined_metadata = {
            "headings": unique_list(current_chunk_metadata["headings"]),
            "images": unique_list(current_chunk_metadata["images"] + [item['title'] for item in media_metadata.get('images', [])]),
            "tables": unique_list(current_chunk_metadata["tables"] + [item['title'] for item in media_metadata.get('tables', [])])
        }
        unified_meta = unify_metadata(combined_metadata)
        chunks.append({
            "chunk_text": cleaned_text,
            "metadata": unified_meta
        })
        # If no pending heading is captured, create an overlap snippet;
        # otherwise, leave it empty (because pending_heading will be injected).
        overlap_snippet = "" if pending_heading else extract_overlap_text(cleaned_text, overlap_token_count)
        current_chunk_sections = []
        current_chunk_metadata = {"headings": [], "images": [], "tables": []}
        return overlap_snippet

    for section in sections:
        section_text = section["content"]
        section_heading = section["heading"]
        section_tokens = estimate_tokens(section_text)
        is_atomic = contains_atomic(section_text)

        # Process atomic sections that exceed the normal limit.
        if is_atomic and section_tokens > token_limit_normal:
            if current_chunk_sections:
                overlap_prefix = flush_current_chunk()
            if section_tokens <= token_limit_atomic:
                append_standalone_chunk(section_text, section_heading)
                overlap_prefix = extract_overlap_text(section_text, overlap_token_count)
            else:
                # Atomic section exceeds token_limit_atomic.
                # First try to split based on internal headings.
                internal_sections = split_markdown_into_sections(section_text)
                if len(internal_sections) > 1:
                    for subsec in internal_sections:
                        sub_text = subsec["content"]
                        if estimate_tokens(sub_text) > token_limit_atomic:
                            for sub in split_text_by_token_limit(sub_text, token_limit_atomic, overlap_token_count):
                                append_standalone_chunk(sub, section_heading)
                        else:
                            append_standalone_chunk(sub_text, section_heading)
                else:
                    # Fallback: split purely by token count.
                    for sub in split_text_by_token_limit(section_text, token_limit_atomic, overlap_token_count):
                        append_standalone_chunk(sub, section_heading)
                # Split subchunks do not feed an overlap snippet into the next chunk.
                overlap_prefix = ""
            continue  # Move to next section.

        # Process non-atomic sections (or atomic sections within token_limit_normal).
        if not current_chunk_sections:
            # Always inject pending_heading (if it exists) into every new chunk,
            # regardless of any overlap_prefix.
            seed_new_chunk()
        tentative_chunk = ("\n".join(current_chunk_sections) + "\n" if current_chunk_sections else "") + section_text
        if estimate_tokens(tentative_chunk) > token_limit_normal:
            overlap_prefix = flush_current_chunk()
            # The flush may have stripped trailing headings into pending_heading.
            # Seed the new chunk here too, otherwise those headings would be
            # dropped (the next flush resets pending_heading).
            seed_new_chunk()
        current_chunk_sections.append(section_text)
        if section_heading:
            current_chunk_metadata["headings"].append(section_heading)

    if current_chunk_sections:
        flush_current_chunk()

    # Filter trivial chunks: drop any chunk whose text equals its own overlap snippet.
    filtered_chunks = []
    for chunk in chunks:
        chunk_text = chunk["chunk_text"].strip()
        overlap_snippet = extract_overlap_text(chunk_text, overlap_token_count).strip()
        if chunk_text != overlap_snippet:
            filtered_chunks.append(chunk)

    return filtered_chunks


In [None]:
# 3. Combine the section dictionaries into chunks with extended metadata.
# The limits below are passed explicitly and match the function's defaults.
metadata_chunks = combine_sections_to_chunks(
    sections,
    token_limit_normal=400,
    token_limit_atomic=2000,
    overlap_token_count=50
)
print("Total chunks generated:", len(metadata_chunks))
print("\n----- Metadata of Chunks -----\n")
# Print every chunk (1-based numbering) with its unified metadata string and text.
for idx, meta in enumerate(metadata_chunks):
    print(f"--- Chunk {idx+1} ---")
    print("Metadata:", meta['metadata'])
    print("Chunk Text:\n", meta['chunk_text'])
    print("\n---------------------------------\n")

In [61]:
import json

# Absolute output path for the chunk list; adjust it for your machine.
json_output_path = r"D:\ML\Thesis_chatbot\Data\out\metadata_chunks.json"
# ensure_ascii=False writes non-ASCII characters as-is (the file is opened as UTF-8).
with open(json_output_path, "w", encoding="utf-8") as outfile:
    json.dump(metadata_chunks, outfile, ensure_ascii=False, indent=4)

print(f"✅ Metadata chunks saved to {json_output_path}")

✅ Metadata chunks saved to D:\ML\Thesis_chatbot\Data\out\metadata_chunks.json
