--
description: DOM Model with Pydantic and Pandoc Integration
output-file: core.dom.html
title: core.dom

This notebook demonstrates a Document Object Model (DOM) using Pydantic for static typing and validation, and integrates Pandoc (via pypandoc) for Markdown processing.
---

In [2]:
# | default_exp dom

In [3]:
# | hide
from nbdev.showdoc import *
from IPython.core.interactiveshell import InteractiveShell
from IPython.display import Markdown, display

InteractiveShell.ast_node_interactivity = "all"

In [4]:
# | export
from typing import List, Optional
from pydantic import BaseModel, Field, field_validator
from functools import cached_property
import base64
import pypandoc
import pathlib
import asyncio
import tqdm
import json
import re
import os
from pathlib import Path
from pprint import pprint
import markdown
import asyncio 
import tqdm
import hashlib
from functools import cached_property

In [5]:
#| export
from ollama import chat, ChatResponse, Client, AsyncClient

## Base Element Class

In [6]:
# | export
class Element(BaseModel):
    """Base model for DOM elements; carries an optional text summary."""

    # Free-form summary of the element; None until one has been set.
    summary: str | None = None

## Figure Class with Base64 Validation

In [7]:
# | export
class Figure(Element):
    """Element holding an image as a base64-encoded string."""

    rawdata: str = Field(..., description="Base64-encoded image data")

    @field_validator("rawdata")
    @classmethod
    def validate_base64(cls, v):
        """Check that ``rawdata`` decodes as standard base64.

        Whitespace (e.g. line wrapping) is ignored for the check. The value
        itself is returned unchanged.

        Raises:
            ValueError: if the value contains characters outside the base64
                alphabet or is incorrectly padded.
        """
        try:
            # Without validate=True, b64decode silently discards characters
            # outside the base64 alphabet, so arbitrary junk could pass.
            base64.b64decode("".join(v.split()), validate=True)
        except (ValueError, TypeError) as err:  # binascii.Error is a ValueError
            raise ValueError("rawdata must be valid base64") from err
        return v

## Table Structure: Cell, Column, Row, Table

In [8]:
# | export
class Cell(BaseModel):
    """A single table cell."""

    # Cell content as a string.
    c: str


class Column(BaseModel):
    """A column entry made of a list of cells."""

    # Cells belonging to this column entry.
    cells: list[Cell]


class Row(BaseModel):
    """A table row made of a list of column entries."""

    # Column entries belonging to this row.
    cols: list[Column]


class Table(Element):
    """A table element: a list of rows plus the inherited optional summary."""

    # Rows of the table, in order.
    rows: list[Row]

In [9]:
# | export
def get_text_summary_response(content: str, model:str="gemma3-27b", role:str="user", lang: str="zh") -> ChatResponse:
    """
    Returns a ChatResponse from the chat model with a summary prompt for the given content.
    """
    match lang:
        case "en":
            prompt = (
                f"Please provide a summary of the following string. "
                f"The summary should be concise and informative: {content}. "
            )
        case "zh":
            content = re.sub(r"\s+", " ", content.strip())
            prompt = (
                f"请提供以下字符串的摘要。"
                f"摘要应简明扼要且信息丰富: {content}. "
            )
        case _:
            raise ValueError(f"Unsupported language: {lang}")
    return chat(
        model=model,
        messages=[
            {
                "role": role,
                "content": prompt,
            }
        ],
    )

In [10]:
# | export
image_link_pattern = r'[^\s]+\.(?:jpg|jpeg|png|gif|bmp|webp)'

def get_image_summary_response(image_link: str | Path, model:str="gemma3:27b", role:str="user", lang: str='zh') -> ChatResponse:
    """
    Returns a ChatResponse from the chat model with a summary prompt for the given image.
    """
    if isinstance(image_link, Path):
        image_link = str(image_link)
    if not re.match(image_link_pattern, image_link):
        # If the image link is not a URL, throw an error
        raise ValueError(f"Invalid image link: {image_link}")
    
    match lang:
        case 'en':
            prompt = "Please provide a summary of the following image. The summary should be concise and informative about the robot."
        case 'zh':
            prompt = "请提供以下图像的摘要。关于机器人机械尺寸,运动范围,自由度的说明应简明扼要。"
        case _:
            raise ValueError(f"Unsupported language: {lang}")

    response = chat(
        model=model,
        messages=[
            {
                "role": role,
                "content": prompt,
                'images': [f"{image_link}"],
            }
        ],
    )
    return response


In [13]:
# | export
image_link_pattern = r'[^\s]+\.(?:jpg|jpeg|png|gif|bmp|webp)'

async def get_image_summary_response_async(client: AsyncClient, image_link: str | Path, model:str="gemma3:27b", role:str="user", lang: str='zh') -> ChatResponse:
    """
    Returns a ChatResponse from the chat model with a summary prompt for the given image.
    """
    if isinstance(image_link, Path):
        image_link = str(image_link)
    if not re.match(image_link_pattern, image_link):
        # If the image link is not a URL, throw an error
        raise ValueError(f"Invalid image link: {image_link}")
    
    match lang:
        case 'en':
            prompt = "Please provide a summary of the following image. The summary should be concise and informative about the robot."
        case 'zh':
            prompt = "请提供以下图像的摘要。关于机器人机械尺寸,运动范围,自由度的说明应简明扼要。"
        case _:
            raise ValueError(f"Unsupported language: {lang}")

    response = await client.chat(
        model=model,
        messages=[
            {
                "role": role,
                "content": prompt,
                'images': [f"{image_link}"],
            }
        ],
    )
    return response


In [None]:
# | export
async def get_text_summary_response_async(client: AsyncClient, content: str, model:str="gemma3-27b", role:str="user", lang: str="zh") -> ChatResponse:
    """
    Ask the chat model, through an async client, for a summary of ``content``.

    Whitespace runs in ``content`` are collapsed to single spaces before the
    prompt is built. ``lang`` selects the prompt language (``"en"`` or
    ``"zh"``); any other value raises ValueError. The awaited result of
    ``client.chat`` is returned.
    """
    collapsed = re.sub(r"\s+", " ", content.strip())

    if lang == "en":
        prompt = (
            "Please provide a summary of the following string. "
            + f"The summary should be concise and informative: {collapsed}. "
        )
    elif lang == "zh":
        prompt = (
            "请提供以下字符串的摘要。"
            + f"摘要应简明扼要且信息丰富: {collapsed}. "
        )
    else:
        raise ValueError(f"Unsupported language: {lang}")

    # A single message carrying the prompt under the requested role.
    outgoing = [{"role": role, "content": prompt}]
    return await client.chat(model=model, messages=outgoing)

In [13]:
# | export
async def get_image_summary_async_consume(client: AsyncClient, lock: asyncio.Lock, queue: asyncio.Queue, responses: ChatResponse, model:str="gemma3:27b") -> None:
    """
    Returns ChatResponse from the chat model with a summary prompt for the given image.
    """
    messages = []
    while True:
        # Wait for a message from the queue
        try:
            # Use a timeout to avoid blocking indefinitely
            chat_msg = await asyncio.wait_for(queue.get(), timeout=60)
            if chat_msg.role == 'SENTINEL':
                # If the sentinel value is received, break the loop
                break
            # Process the message and get the response
            message = chat_msg.dict()  # Convert ChatMessage to dict
            message.pop('id', None)  # Remove the id field if it exists, as it's not needed for the chat model
            message.pop('text', None)  # Remove the text field if it exists, as it's not needed for the chat model
            messages.append(message)
            if len(messages) < 16:
                # If we have less than 16 messages, continue to accumulate
                continue
        except asyncio.TimeoutError:
            # If the queue is empty for too long and empty, continue the loop
            # not queue is not empty, process the accumulated messages so far in the final case even it's less than 16
            if queue.empty():
                print("Queue is empty, continuing the loop.")
                continue
        finally:  # clean up the queue
            # If we have accumulated messages (16 or less if timeout), send them to the chat model
            if messages:
                resp = await client.chat(
                    model=model,
                    messages=messages
                )

                # Put the response in the output queue
                async with lock:
                    responses = resp

                messages = []  # Reset message list for the next batch
    
    return 

In [14]:
async def get_image_summary_response(queue: asyncio.Queue, lock:asyncio.Lock, image_link: str | Path, model:str="gemma3:27b", role:str="user", lang: str='zh') -> ChatResponse:
    """
    Unimplemented placeholder for a queue-based image summary request.

    The body contains only this docstring and commented-out notes, so the
    coroutine currently returns None regardless of the annotated return type.

    NOTE(review): an earlier cell defines a synchronous function with this
    same name; running this cell rebinds the name in the notebook session.
    This cell carries no export directive.
    """
    
    # Sketch of the intended flow (not implemented):
    # message = await get_image_summary_async_produce(queue, image_link, role, lang)
    # Wait for the response from the queue
    
    # # with lock:
        
    # # response = await

    # return response

In [15]:
# os.getcwd()
image_link = "../res/siasun_md_sammple_hrsl/SN024002/img/__2.png"
# image_link
res = re.match(image_link_pattern, image_link)
res.group(0)
# get_image_summary_response("../res/")

'../res/siasun_md_sammple_hrsl/SN024002/img/__2.png'

In [23]:
imagepath = Path(os.getcwd()).parent / 'res/siasun_md_sample_hrsl/SN024002/img/img_13.png'
# imagepath = Path(os.getcwd()).parent / 'res/siasun_md_sample/SN024002/img/img_13.png'
# imagepath = Path(os.getcwd()).parent / 'siasun_md_sample_hrsl/SN024002/img/img_11.png'
# imagepath = Path(os.getcwd()).parent / 'res/siasun_md_sample_hrsl/SX322002/img/img_13.png'
image_link = str(imagepath)
image_link
display(Markdown(f"![image]({image_link})"))

'/Users/x/devel/cell/ribosome/res/siasun_md_sample_hrsl/SN024002/img/img_13.png'

![image](/Users/x/devel/cell/ribosome/res/siasun_md_sample_hrsl/SN024002/img/img_13.png)

In [20]:
# res = re.match(image_link_pattern, 'http://baidu.com/?home/img/small.png')
res = re.match(image_link_pattern, 'img/small.png')
# res = re.match(image_link_pattern, image_link)
# in case of match, print the matched string
if res:
    print(f"Matched image link: {res.group(0)}")
else:
    print(res)

Matched image link: img/small.png


In [21]:
# | export
image_link_pattern = r'[^\s]+\.(?:jpg|jpeg|png|gif|bmp|webp)'
from ollama import chat, ChatResponse
def get_image_summary_response_sync(image_links: list[str], model:str="gemma3:27b", role:str="user", lang: str='zh') -> ChatResponse:
    """
    Using Ollama batch request API to summarize multiple images in one request.
    In synchronous mode, this function will block until the response is received.
    Set configuration 'OLLAMA_NUM_PARALLEL' in 
      - /opt/homebrew/opt/ollama/homebrew.ollama.service in macOS
      - /etc/systemd/system/ollama.service in Linux
    Returns a ChatResponse from the chat model with a summary prompt for the given images.
    !!! messages content of ChatResponse is in a single string, not a list of strings.  
    """
    for link in image_links:
        if not re.match(image_link_pattern, link):
            # If the image link is not a URL, throw an error
            raise ValueError(f"Invalid image link: {link}")
    
    match lang:
        case 'en':
            prompt = "Please provide a summary of the following image. The summary should be concise and informative about the robot."
        case 'zh':
            prompt = "请提供以下图像的摘要。关于机器人机械尺寸,运动范围,自由度的说明应简明扼要。"
        case _:
            raise ValueError(f"Unsupported language: {lang}")

    messages = [ {
        "role": role,
        "content": prompt,
        'images': [link],
    } for link in image_links ]
    
    response = chat(
        model=model,
        messages= messages,
    )
    return response

In [24]:
if True: 
    # response = await get_image_summary_response_async(image_link, model="gemma3:27b", role="user", lang='zh')
    response = get_image_summary_response_sync([image_link], model="gemma3:27b", role="user", lang='zh')
    assert isinstance(response.message.content, str), "Response content should be a string"
    md_text = markdown.markdown(response.message.content)
    Markdown(md_text)
    # print(md_text)
    # print(response.content)
    # response_txt = await get_text_summary_response_async(md_text,model="gemma3:27b", role="user", lang='zh')
    # md_text = markdown.markdown(response_txt.message.content)
    # Markdown(md_text)


<p>以下是对图像的摘要：</p>
<p><strong>机器人机械尺寸:</strong></p>
<ul>
<li><strong>底座尺寸:</strong> 180mm x 180mm</li>
<li><strong>底座高度:</strong> 98mm</li>
<li><strong>整体高度:</strong> 223mm</li>
<li><strong>安装孔:</strong> 4个M13.5螺孔 (用于机器人安装), 2个M6H8螺孔 (用于定位销)</li>
<li><strong>轴线直径:</strong> 机器人旋转轴直径约180mm</li>
</ul>
<p><strong>运动范围:</strong></p>
<ul>
<li>该图仅显示了底座的结构，没有显示机器人的运动范围。需要根据机器人型号才能确定运动范围。</li>
</ul>
<p><strong>自由度:</strong></p>
<ul>
<li>该图仅显示了底座的结构，没有显示机器人的自由度。通常该类型的机器人具有旋转和倾斜自由度。</li>
</ul>
<p><strong>备注:</strong></p>
<ul>
<li>该图是机器人底座的机械图纸，用于显示安装和固定机器人的尺寸和位置。</li>
<li>图像上标有中文注释，说明了各个部分的用途和尺寸。</li>
<li>B-B表示一个剖面图，显示了底座内部的结构。</li>
</ul>

In [25]:
if True: 
    imagepath = [ 
                 Path(os.getcwd()).parent / 'res/siasun_md_sample_hrsl/SN024002/img/img_13.png',
                 Path(os.getcwd()).parent / 'res/siasun_md_sample_hrsl/SN024002/img/img_14.png',
                 Path(os.getcwd()).parent / 'res/siasun_md_sample_hrsl/SN024002/img/img_15.png',
                 Path(os.getcwd()).parent / 'res/siasun_md_sample_hrsl/SN024002/img/img_17.png'
    ]
    image_links = [str(p) for p in imagepath]
    response = get_image_summary_response_sync(image_links, model="gemma3:27b", role="user", lang='zh')
    assert isinstance(response.message.content, str), "Response content should be a string"
    md_text = markdown.markdown(response.message.content)
    Markdown(md_text)
    # print(md_text)
    # print(response.content)
    # response_txt = await get_text_summary_response_async(md_text,model="gemma3:27b", role="user", lang='zh')
    # md_text = markdown.markdown(response_txt.message.content)
    # Markdown(md_text)


<p>以下是图像的摘要，着重于机器人尺寸、范围和自由度：</p>
<p><strong>图像 1：</strong></p>
<ul>
<li><strong>尺寸：</strong> 宽度和高度约为 180mm x 223mm。中心孔直径为 31.5mm。安装孔距为75mm。</li>
<li><strong>范围:</strong> 图像显示的是一个用于连接机器人底座的旋转平台，可以提供360度旋转范围。</li>
<li><strong>自由度:</strong> 旋转自由度（围绕垂直轴旋转）。</li>
</ul>
<p><strong>图像 2：</strong></p>
<ul>
<li><strong>尺寸：</strong> 中心孔直径为 31.5mm，安装孔距为 90°。直径约为40mm，高度约为20mm。</li>
<li><strong>范围:</strong> 图像显示的是一个用于连接机器人底座的旋转平台，可以提供360度旋转范围。</li>
<li><strong>自由度:</strong> 旋转自由度（围绕垂直轴旋转）。</li>
</ul>
<p><strong>图像 3：</strong></p>
<ul>
<li><strong>尺寸：</strong> 机器人整体高度约为 509.5mm，宽度和深度约为 240.5mm。</li>
<li><strong>范围:</strong> 机器人具有多种运动能力。</li>
<li><strong>自由度:</strong> 机器人的运动自由度由多个关节决定，从图中大致可以看出，可能至少具有6个自由度（包含旋转和伸缩）。</li>
</ul>
<p><strong>图像 4：</strong></p>
<ul>
<li><strong>尺寸：</strong> 机器人整体长度约为 355mm，宽度和高度约为 50mm。</li>
<li><strong>范围:</strong> 机器人具有多种运动能力。</li>
<li><strong>自由度:</strong> 机器人的运动自由度由多个关节决定，从图中大致可以看出，可能至少具有6个自由度（包含旋转和伸缩）。</li>
</ul>
<p><strong>注意:</strong>  这些图纸仅显示机器人的机械尺寸和安装界面。要了解完整的运动范围和自由度，需要查阅更详细的技术规格。</p>

In [None]:
#| export
# Module-level async Ollama client.
chat_client: AsyncClient = AsyncClient()  
# Queue of outgoing messages, bounded to 8 entries.
msg_queue: asyncio.Queue = asyncio.Queue(maxsize=8)
# Queue for responses, bounded to a single entry.
resp_queue: asyncio.Queue = asyncio.Queue(maxsize=1)
# asyncio.Lock serialises coroutines on one event loop; it is not a
# thread-safety primitive.
lock = asyncio.Lock()

## DOM Class with pypandoc Integration

In [None]:
# | export
from platform import node
from typing import Iterator, Callable, Optional
from queue import LifoQueue
import asyncio
import re
import copy

class DOM(BaseModel):
    # The content of the Markdown document. This can be a string containing Markdown syntax.
    raw_markdown: Optional[str] = Field(None, description="Raw Markdown content")
    # raw json
    raw_json: Optional[str] = Field(None, description="Raw JSON content")
    # The json representation of the Markdown AST.
    ast_json: Optional[str] = Field(None, description="JSON representation of the Markdown AST")
    file_path: Optional[Path] = Field(None, description="Path to the Markdown file")
    root_path: Optional[str] = Field(None, description="root path of the markdown document, required to get access to the images")
    table_count: int = Field(0, description="Number of tables in the Markdown document")
    section_count: int = Field(0, description="Number of headers in the Markdown document")
    section_level: list = Field(default_factory=list, description="List of section levels in the Markdown document")
    title: Optional[str] = Field(None, description="Title of the Markdown document, if available")


    # Node type names ('t' values) grouped by kind.

    # Block-level node types treated as carrying text.
    TextBlock_Types: set[str] = {
        "Plain",
        "Para",
        "Figure",
        "LineBlock","CodeBlock","RawBlock","OrderedList","BulletList","DefinitionList",
        "Header","BlockQuote",
        "Table","TableRow", "TableCell"}

    # Block-level node types without text of their own.
    NonTextBlock_Types: set[str] = {"HorizontalRule", "Div", "Null"}

    # All block-level types. The original united TextBlock_Types with itself,
    # which left the non-text block types out of this set.
    Block_Types: set[str] = TextBlock_Types | NonTextBlock_Types

    # Inline node types.
    Inline_Types: set[str] = {
        "Str", "Emph", "Strong", "Strikeout", "Superscript", "Subscript",
        "Decimal", "Period",
        "Link", 
        "Image", "Code", "Math", "RawInline", "SoftBreak", "HardBreak", "Span"   
    }
    
    # Every known node type: blocks plus inlines.
    Element_Types: set[str] = Block_Types | Inline_Types

    def __init__(self, md_file_path: Path, **data):
        """Create a DOM bound to a Markdown file.

        No file content is read here; ``setup()`` does that.

        Args:
            md_file_path: Location of the Markdown file. A ``str`` is accepted
                as well and is converted to ``Path``.
            **data: Field values forwarded to the pydantic model constructor.

        Sets ``file_path``, ``root_path`` (the file's parent directory) and
        ``title`` (the file name without extension), and creates the
        per-instance ``queue`` and ``client`` helpers.
        """
        super().__init__(**data)
        # Normalise once so .parent/.stem also work when a str was passed.
        md_path = Path(md_file_path)
        self.file_path = md_path  # type: ignore
        self.root_path = md_path.parent  # type: ignore
        self.title = md_path.stem  # Use the file name without extension as the title
        # queue/client are not declared model fields; pydantic's __setattr__
        # rejects undeclared names unless the model allows extras, so they are
        # stored directly on the instance.
        object.__setattr__(self, "queue", asyncio.Queue(maxsize=8))  # at most 8 pending messages
        object.__setattr__(self, "client", AsyncClient())  # async client for chat model calls

    def setup(self):
        content = self.file_path.read_text(encoding="utf-8")  # type: ignore
        if content:

            self.raw_markdown = content
            self.raw_json = pypandoc.convert_text(self.raw_markdown, "json", "md")
            ast_json_file = self.file_path.parent / (str(self.file_path.stem) + "_ast.json")  # type: ignore

            if ast_json_file.exists():
                self.ast_json = ast_json_file.read_text(encoding="utf-8")  # type: ignore
            else:
                slide_splitter = r"(^<!--\s*Slide number:\s*\d+\s*-->$)"  # Regex to match slide splitters in the Markdown content
                # If the raw_markdown contains slide splitters, we need to reorganize the slides
                if re.search(slide_splitter, self.raw_markdown, flags=(re.MULTILINE|re.IGNORECASE)):  # type: ignore
                    # If there are slide splitters, we need to reorganize the slides
                    self.ast_json = self.reorg_slides(slide_splitter=slide_splitter)
                else:
                    # If there are no slide splitters, we can use the raw_json as is
                    self.ast_json = self.reorg()

        else:
            self.raw_markdown = None
            self.raw_json = None
            self.ast_json = None

    def reorg_slides(self, slide_splitter: str = r"(^<!--\s*Slide number:\s*\d+\s*-->$)") -> str:
        """Rebuild the document as one Section per slide.

        ``raw_markdown`` is split on ``slide_splitter``; each marker starts a
        new Section whose Header (level 1) carries the marker text as both id
        and content, and whose Content holds the pandoc blocks of the text
        that follows the marker.

        * Text before the first marker becomes an implicit
          ``<!-- Slide number: 0 -->`` slide; a blank preamble is skipped.
        * A marker followed by no content yields a Section with empty Content.

        Args:
            slide_splitter: Regex matching a marker line. It must contain one
                capturing group around the marker so that ``re.split`` keeps
                the marker text in its result.

        Returns:
            The reorganised AST as a JSON string (metadata taken from
            ``raw_json``, ``blocks`` replaced by the slide Sections).

        Raises:
            ValueError: if ``raw_json`` or ``raw_markdown`` is empty.
        """
        if not self.raw_json or not self.raw_markdown:
            raise ValueError("raw_json/raw_markdown content is empty. Cannot reorganize slides.")

        # Same flags as the marker detection in setup(), so every marker that
        # switched the document into slide mode is also split on.
        flags = re.MULTILINE | re.IGNORECASE

        def new_slide(label: str) -> dict:
            """Return a fresh, empty Section node labelled with ``label``."""
            return {
                't': 'Section',
                'c': [
                    {
                        't': 'Header',
                        'c': [
                            1,  # Header level
                            [label, [], []],  # Header attributes: [id, ...]
                            [{'t': 'Str', 'c': label}],  # Header content
                        ],
                    },
                    {'t': 'Content', 'c': []},
                ],
            }

        items = re.split(slide_splitter, self.raw_markdown, flags=flags)
        presentation = json.loads(self.raw_json)
        presentation['blocks'] = []

        current = new_slide('<!-- Slide number: 0 -->')
        before_first_marker = True
        for item in items:
            text = item.strip()
            if re.match(slide_splitter, text, flags=flags):
                current = new_slide(text)
                before_first_marker = False
                continue
            if before_first_marker and not text:
                # Nothing precedes the first marker: no implicit slide 0.
                continue
            if text:
                blocks = json.loads(pypandoc.convert_text(text, "json", "md")).get('blocks', [])
            else:
                blocks = []  # empty slide: keep the Section, with no content
            # bIgnoreLevel=True: every block of the slide goes into this
            # Section, whatever headers the slide text itself contains.
            slide, _ = self.reorg_section(current, iter(blocks), bIgnoreLevel=True)
            presentation['blocks'].append(slide)

        return json.dumps(presentation, ensure_ascii=False)

    def to_markdown(self) -> str | None:
        """Return the stored raw Markdown text, or None when none is loaded."""
        return self.raw_markdown

    def to_html(self) -> str | None:
        if not self.raw_markdown:
            return None
        # Convert raw Markdown to HTML using pypandoc
        return pypandoc.convert_text(self.raw_markdown, "html", "md")

    def to_latex(self) -> str | None:
        if not self.raw_markdown:
            return None
        return pypandoc.convert_text(self.raw_markdown, "latex", "md")

    def to_json(self) -> str | None:
        """Return the stored pandoc JSON string (``raw_json``), or None.

        No conversion happens here: ``raw_json`` is produced from the Markdown
        by pypandoc in ``setup()``; this method only returns it.
        """
        return self.raw_json

    def walk(self, action: Optional[Callable] = None) -> None:
        """Apply ``action`` to the nodes of the AST in ``raw_json`` and store the result.

        The AST is traversed depth-first. ``action`` is called on every dict
        and on every list that is itself an element of a list, before its
        children are visited, and its return value replaces the node. Lists
        that are direct dict values are traversed but not passed to ``action``.
        When ``action`` is None the class attribute ``identity`` is used.

        The transformed AST is serialised into ``self.ast_json``.

        Raises:
            ValueError: if ``raw_json`` is empty.
        """
        if not self.raw_json:
            raise ValueError("raw_json content is empty. Cannot walk the AST.")
        visit_action = self.__class__.identity if action is None else action

        def descend(children):
            # Only containers are visited; scalars are kept as they are.
            return [
                visit(child) if isinstance(child, (dict, list)) else child
                for child in children
            ]

        def visit(node):
            node = visit_action(node)
            if isinstance(node, dict):
                for key, value in node.items():
                    if isinstance(value, list):
                        node[key] = descend(value)
                    elif isinstance(value, dict):
                        node[key] = visit(value)
            elif isinstance(node, list):
                node = descend(node)
            return node

        tree = visit(json.loads(self.raw_json))
        self.ast_json = json.dumps(tree, ensure_ascii=False).encode("utf-8").decode("utf-8")
        
    def reorg_section(self, section: dict, it: Iterator, bIgnoreLevel:bool=False) -> tuple[dict, Iterator]:
        """
        Move the nodes that follow a Section into that Section's Content.

        Nodes are pulled from ``it`` and appended to ``section`` (in place).
        Unless ``bIgnoreLevel`` is true, a following Section is handled by its
        header level:

        * deeper level: it is filled recursively and then nested in this one;
        * same or shallower level: it ends this Section; the node is pushed
          back in front of the remaining nodes and control returns.

        Returns the (mutated) section and the iterator to continue from.
        """
        assert section.get('t') == "Section", "Expected a Section node"
        own_level = section['c'][0]['c'][0]
        exhausted = object()  # unique marker for "iterator has no more items"

        while True:
            node = next(it, exhausted)
            if node is exhausted:
                break

            if not bIgnoreLevel and isinstance(node, dict) and node.get('t') == "Section":
                node_level = node['c'][0]['c'][0]
                if node_level >= own_level + 1:
                    # Subsection: let it collect its own content first.
                    node, it = self.reorg_section(node, it, bIgnoreLevel=False)
                elif node_level <= own_level:
                    # Sibling or ancestor-level section: hand it back to the caller.
                    return section, iter([node, *it])
                else:
                    raise ValueError(f"Unexpected Section level encountered: node: {node}")

            assert section['c'][1].get('t') == "Content", "Expected a Content node"
            section['c'][1]['c'].append(node)

        return section, it
        

    def reorg(self, action: Optional[Callable] = None) -> str:
        """
        Rebuild the AST in ``raw_json`` so that every Header becomes a Section.

        Each Header node is rewritten in place as
        ``{'t': 'Section', 'c': [Header, Content]}`` and the nodes that follow
        it in the same list are moved into its Content by ``reorg_section``.
        ``action`` is applied to each visited node first; when it is None the
        class attribute ``identity`` is used. The level of every converted
        Header is appended to ``self.section_level``.

        Returns the reorganised AST as a JSON string.

        Raises:
            ValueError: if ``raw_json`` is empty.
        """
        if not self.raw_json:
            raise ValueError("raw_json content is empty. Cannot walk the AST.")
        if action is None:
            # NOTE(review): `identity` is not defined in the visible part of the
            # class; presumably it returns its argument unchanged.
            action = self.__class__.identity

        ast = json.loads(self.raw_json)


        
        def reorg_node(node):
            """Recursively convert Headers to Sections below ``node`` and return it."""
            node = action(node)     

            if isinstance(node, dict):
                for key, value in node.items():
                    if isinstance(value, list):
                        # Pass 1: recurse into every child, then turn Header
                        # dicts into Section wrappers.
                        new_list = []
                        for child in value:
                            if isinstance(child, (dict, list)):
                                new_list.append(reorg_node(child))
                            else:
                                new_list.append(child)
                            # NOTE(review): the conversion below mutates `child`;
                            # it reaches the entry in new_list only when
                            # reorg_node returned that same object (i.e. the
                            # action did not replace the dict).
                            if isinstance(child, dict):
                                # If the child is a dict, check if it has a 't' key
                                if 't' in child:
                                    t = child['t']
                                    if t == "Header":
                                        # Wrap the Header in a new Section: the
                                        # original Header fields move into the
                                        # first element, and an empty Content is
                                        # added for the nodes that follow.
                                        # The right-hand side is evaluated before
                                        # child['c'] is overwritten.
                                        child['t'] = "Section"
                                        child['c'] = [
                                            {
                                                't': 'Header',
                                                'c': [
                                                    child['c'][0],  # Header level
                                                    child['c'][1],  # Header attributes ([id, ...], as laid out in reorg_slides)
                                                    child['c'][2],  # Header content
                                                ]
                                            },
                                            {
                                                't': 'Content',
                                                'c': []
                                            }
                                        ]
                                        self.section_level.append(child['c'][0]['c'][0])  # Record the header level in section_level
                        # Pass 2: let each Section absorb the nodes that follow
                        # it. reorg_section returns the iterator to continue
                        # from, so `it` is rebound on every call.
                        it = iter(new_list)
                        new_value = []
                        while True:
                            try:
                                item = next(it)
                                if isinstance(item, dict) and item.get('t') == "Section":  # the first recursion level *
                                    item, it = self.reorg_section(item,it, bIgnoreLevel=False)
                                new_value.append(item)
                            except StopIteration:
                                break
                        node[key] = new_value
                    elif isinstance(value, dict):
                        node[key] = reorg_node(value)
            elif isinstance(node, list):
                # Plain lists are only traversed; no Header conversion or
                # section grouping happens at this level.
                node = [
                    reorg_node(child) if isinstance(child, (dict, list)) else child
                    for child in node
                ]
            return node

        ast = reorg_node(ast)
        return json.dumps(ast, ensure_ascii=False).encode("utf-8").decode("utf-8")
        
    async def textualize(self, action: Optional[Callable] = None) -> None:
        """ Walks through the Markdown AST and applies an action to each node.
        If no action is provided, it defaults to the identity function.
        """

        async def get_leaf_summary_async(node:str) -> str:

            # If the string length is less than 200, return the node as is
            if len(node) < 200:
                return node
            # If the node is not an image link, return a summary of the text
            response = await get_text_summary_response_async(node, model="gemma3:27b", role="user", lang='zh')
            if response.message.content:
                return response.message.content
            else:
                # If the response is empty, return the original node
                return node

        async def get_list_summary_async(root: list) -> str:
            """
            Given a list of strings, return a summary of the list.
            If the list is empty, return an empty string.
            If the list has only one element, return the summary of that element.
            If the list has more than one element, return a summary of the concatenated elements.
            """
            list_summary = []
            for n in root:
                if isinstance(n, str):
                    # If the element is a string, get its summary
                    summary = await get_leaf_summary_async(n)
                elif isinstance(n, int) or isinstance(n, float):
                    # If the element is a number, get its summary
                    summary = await get_leaf_summary_async(str(n))
                elif isinstance(n, dict):
                    # If the dict has a 's' key, use it as the summary
                    summary = n.get('s', '')
                elif isinstance(n, list):
                    # If the element is a list, get its summary
                    summary = await get_list_summary_async(n)
                elif n is None:
                    # If the element is None, skip it
                    summary = ""
                else:
                    raise ValueError(f"Unsupported element type: {type(n)} in {n}")
                list_summary.append(summary)
                
            # Concatenate all elements and summarize
            # concatenated = " ".join(list_summary)
            # response = get_text_summary_response(concatenated, model="gemma3:27b", role="user", lang='zh')
            # if response.message.content:
            #     return response.message.content
            # else:
            #     # If the response is empty, raise an exception
            #     raise ValueError("Summary response is empty. Please check the input data.")
            return await get_leaf_summary_async(" ".join(list_summary))

        async def summary_node_main(action: Optional[Callable] = None) -> None:
            """
            Drive the summarization of the whole document AST.

            Parses ``self.ast_json``, lets ``summary_node_async`` annotate the
            ``blocks`` list, stamps ``title`` and ``file_path`` onto the AST,
            derives a document-level ``summary`` and finally serializes the
            result back into ``self.ast_json``.

            Args:
                action: Replaced by the class's ``identity`` when ``None``.
                    It is not applied anywhere inside this function.

            Raises:
                ValueError: if ``self.ast_json`` is empty.
            """
            if not self.ast_json:
                raise ValueError("raw_json content is empty. Cannot walk the AST and summarize.")
            action = self.__class__.identity if action is None else action

            ast = json.loads(self.ast_json)
            blocks = ast.get("blocks", [])
            ast['blocks'] = await summary_node_async(blocks)

            # Fall back to the root path only when neither the AST nor the
            # instance carries a title; the instance title is what gets stored.
            if ast.get('title') is None and self.title is None:
                self.title = self.root_path
            ast['title'] = self.title
            ast['file_path'] = self.file_path

            assert isinstance(blocks, list), f"Expected a list of blocks, got {type(blocks)}"
            # The title leads the text that feeds the document-level summary,
            # followed by the 's' entry of every top-level block that has one.
            summary_parts = [ast['title']]
            summary_parts += [b['s'] for b in blocks if isinstance(b, dict) and 's' in b]

            if ast['blocks'] and summary_parts:
                ast['summary'] = await get_leaf_summary_async(" ".join(summary_parts))
            else:
                # Nothing to summarize: store an empty document summary.
                ast['summary'] = ""

            # Persist the annotated AST as JSON text.
            self.ast_json = json.dumps(ast, ensure_ascii=False).encode("utf-8").decode("utf-8")
            
        async def summary_node_async(node: dict | list) -> dict | list:
            """
            Recursively attach a summary under the key ``'s'`` to every dict node.

            Args:
                node: A dict node (it must carry a ``'t'`` key) or a list of
                    child nodes / scalar values.

            Returns:
                For a dict: the same dict object, mutated in place so that
                ``node['s']`` holds its summary. For a list: a new list whose
                dict/list children were processed recursively and whose other
                children are passed through untouched. Any other value is
                returned unchanged.

            Raises:
                ValueError: if a dict node has no ``'t'`` key, if an Image node
                    does not have the expected structure, or if its link does
                    not match ``image_link_pattern``.
            """
            if isinstance(node, dict):
                try:
                    t = node["t"]
                except KeyError:
                    raise ValueError(f"Node does not have a 't' key: {node}")
                if t == "Image":  # Image summary, the Image node is as defined in the pandoc AST
                    # Text fragments that will be summarized together for this image.
                    summary = []
                    # Use the caption only when the caption list is non-empty and
                    # its first entry carries a 'c' value.
                    if (not node['c'][1] == []) and (node['c'][1][0] is not None) and (node['c'][1][0].get('c') is not None):
                        summary.append(node['c'][1][0]['c'])  # The content of the second element is the image caption
                    try:
                        # summary.append(node['c'][0])  # The first element are defined to be attributes of the image rendering, i.e. content-irrelevant.
                        # Resolve the image target relative to the document's root path.
                        image_link = self.root_path / node['c'][2][0]  # Assuming the image link is in the third element of the list
                        image_link = str(image_link)
                        if image_link and re.search(image_link_pattern, image_link):  # re.match leads to empty match if there's spaces in the path!
                            # The link matches the image pattern: request a summary of the image itself
                            response = await get_image_summary_async_produce(msg_queue, image_link, role="user", lang='zh')
                            summary.append(response.message.content)
                        else:
                            # The link does not match the image pattern: reject the node.
                            # (This ValueError is not caught by the handler below.)
                            raise ValueError(f"Invalid image link: {image_link}")

                    except (IndexError, KeyError):
                        # Handle cases where the image link is not in the expected format
                        raise ValueError(f"Invalid image node structure: {node}")
                    # Second entry of the target pair: the image title.
                    # NOTE(review): this lookup sits outside the try block, so a
                    # missing title would surface as a raw IndexError — confirm intended.
                    summary.append(node['c'][2][1])  # The second element is the image title

                    # Condense caption, image description and title into one summary.
                    response_txt = await get_text_summary_response_async(
                        " ".join(summary), model="gemma3:27b", role="user"
                    )
                    node["s"] = response_txt.message.content

                    print(f"Summarize image: {image_link}")

                else: # TextBlock summary
                    # One summary fragment per non-empty value of the node
                    # (every key is visited, including 't').
                    dict_summary = []
                    for key, value in node.items(): # get summary of the values (content)
                        if isinstance(value, list):  # get the summary of the string list
                            if value == []:
                                # If the list is empty, skip it
                                continue
                            # Annotate dict/list children first; scalars are kept as they are.
                            node[key] = [
                                await summary_node_async(child)
                                if isinstance(child, (dict,list))
                                else child
                                for child in value
                            ]
                            # Then summarize the list; dict children were annotated in
                            # place above, so their 's' entries are available here.
                            dict_summary.append(await get_list_summary_async(value))
                        elif isinstance(value, dict):
                            child = await summary_node_async(value)  # insert the value['s']
                            assert isinstance(child, dict) and 's' in child, f"Expected dict with 's' key, got {child}"
                            dict_summary.append(child['s'])

                        elif value is None or value == "":
                            # Nothing to summarize for None or an empty string
                            continue
                        else:
                            if not isinstance(value, str):
                                # If the value is not a string, convert it to a string
                                value = str(value)
                            dict_summary.append(await get_leaf_summary_async(value))
                    # Combine the fragments into the summary of this node
                    if dict_summary:  # type: ignore
                        # If there are summaries, concatenate them
                        node["s"] = await get_leaf_summary_async(" ".join(dict_summary))
                        # node["s"] = get_text_summary_response(
                        #     " ".join(dict_summary), model="gemma3:27b", role="user", lang='zh'
                        # ).message.content
                    else:
                        # If no summaries, set to empty string
                        node["s"] = ""

                    # Progress logging; the counters live on the instance.
                    if t == "Table":
                        print(f"Summarize table: {self.table_count}")
                        self.table_count += 1
                    elif t == "Section":
                        # node['c'][0]['c'][0] is read as the section depth
                        # (presumably the header level of the section's Header node).
                        print(f"Summarize section: {self.section_count} section depth {node['c'][0]['c'][0]}")
                        self.section_count += 1

            elif isinstance(node, list):
                # Build a new list, recursing only into dict/list children.
                node = [
                    await summary_node_async(child) if isinstance(child, (dict,list)) else child
                    for child in node
                ]

            return node

        # Run the summary_node_main function asynchronously
        # asyncio.run(summary_node_main(action))
        await summary_node_main(action)
        return 
        
    @classmethod
    def identity(cls, obj):
        """Pass-through callback: hand ``obj`` back untouched (default walk action)."""
        return obj

In [18]:
if True:
    # Manual end-to-end run on one sample document. Bare expressions in this
    # cell are echoed as cell output (ast_node_interactivity is set to "all").
    md_file = Path('/d/devel/rag/ribosome/res/md.hrsl/01 设计标准/SX047001新松机器人产品识别设计标准A-1/SX047001新松机器人产品识别设计标准A-1.md') 
    # Alternative sample documents:
    # md_file = Path(os.getcwd()).parent / 'res/siasun_md_sample_hrsl/SR02400401/SR02400401《 SIASUN SR210A-210-2.65 Specifications-CE》A-0.md'
    # md_file = Path(os.getcwd()).parent / 'res/siasun_md_sample_hrsl/SX322002/SX322002.md'
    # md_file = Path(os.getcwd()).parent / 'res/siasun_md_sample_hrsl/SR024011/SR024011.md'
    # md_file = Path(os.getcwd()).parent / 'res/siasun_md_sample_hrsl/SN024002/SN024002.md'
    # md_file = Path(os.getcwd()).parent / 'res/siasun_md_sample/SN024002/SN024002.md'
    md_file.parent
    os.getcwd()
    
    # Build the DOM and load the document; then show both JSON representations.
    dom = DOM(md_file)
    dom.setup()
    dom.raw_json
    dom.ast_json
    md_file.name
    # Output file for the AST as it is before summarization.
    js_sections_file = md_file.parent / (str(md_file.stem) + "_presentation.json")
    js_sections_file
    # dom.ast_json = json.dumps(dom.ast_json, ensure_ascii=False, indent=2).encode("utf-8").decode("utf-8")
    # dom.ast_json = ast_json
    # Write the JSON representation of the AST to the file
    js_sections_file.write_text(dom.ast_json, encoding="utf-8")
    # js_sections_file.write_text(dom.ast_json, encoding="utf-8")
    # Summarize the document; dom.ast_json is replaced by the annotated AST.
    # NOTE(review): the summarizing method appears to end with a bare `return`,
    # so `file` is presumably None — confirm whether this binding is needed.
    file = await dom.textualize()
    dom.ast_json
    
    md_file.name
    # Output file for the AST after summaries have been added.
    js_semantics_file = md_file.parent / (str(md_file.stem) + "_semantics.json")
    js_semantics_file
    
    js_semantics_file.write_text(dom.ast_json, encoding="utf-8")

Path('/d/devel/rag/ribosome/res/md.hrsl/01 设计标准/SX047001新松机器人产品识别设计标准A-1')

'/d/devel/rag/ribosome/nbs'

'{"pandoc-api-version":[1,23,1],"meta":{},"blocks":[{"t":"Para","c":[{"t":"Strong","c":[{"t":"Str","c":"机器人让世界更美好"},{"t":"Space"},{"t":"Str","c":"ROBOTS"},{"t":"Space"},{"t":"Str","c":"MAKE"},{"t":"Space"},{"t":"Str","c":"A"},{"t":"Space"},{"t":"Str","c":"BETTER"},{"t":"Space"},{"t":"Str","c":"WORLD"}]}]},{"t":"Header","c":[1,["新松机器人-产品识别设计标准推荐",[],[]],[{"t":"Strong","c":[{"t":"Str","c":"新松机器人"},{"t":"Space"},{"t":"Str","c":"产品识别设计标准(推荐)"}]}]]},{"t":"Header","c":[4,["product-indentity-system",[],[]],[{"t":"Strong","c":[{"t":"Str","c":"PRODUCT"},{"t":"Space"},{"t":"Str","c":"INDENTITY"},{"t":"Space"},{"t":"Str","c":"SYSTEM"}]}]]},{"t":"Para","c":[{"t":"Image","c":[["",[],[]],[],["_page_1_Picture_0.jpeg",""]]}]},{"t":"Para","c":[{"t":"Strong","c":[{"t":"Str","c":"产品识别("}]},{"t":"Str","c":"Product"},{"t":"Space"},{"t":"Str","c":"Identity"},{"t":"Strong","c":[{"t":"Str","c":",简称"}]},{"t":"Str","c":"PI"},{"t":"Strong","c":[{"t":"Str","c":")是指通过视觉化、规范化和系统化的手段,将产品形象设"},{"t":"Space"},{"t":"Str

'{"pandoc-api-version": [1, 23, 1], "meta": {}, "blocks": [{"t": "Para", "c": [{"t": "Strong", "c": [{"t": "Str", "c": "机器人让世界更美好"}, {"t": "Space"}, {"t": "Str", "c": "ROBOTS"}, {"t": "Space"}, {"t": "Str", "c": "MAKE"}, {"t": "Space"}, {"t": "Str", "c": "A"}, {"t": "Space"}, {"t": "Str", "c": "BETTER"}, {"t": "Space"}, {"t": "Str", "c": "WORLD"}]}]}, {"t": "Section", "c": [{"t": "Header", "c": [1, ["新松机器人-产品识别设计标准推荐", [], []], [{"t": "Strong", "c": [{"t": "Str", "c": "新松机器人"}, {"t": "Space"}, {"t": "Str", "c": "产品识别设计标准(推荐)"}]}]]}, {"t": "Content", "c": [{"t": "Section", "c": [{"t": "Header", "c": [4, ["product-indentity-system", [], []], [{"t": "Strong", "c": [{"t": "Str", "c": "PRODUCT"}, {"t": "Space"}, {"t": "Str", "c": "INDENTITY"}, {"t": "Space"}, {"t": "Str", "c": "SYSTEM"}]}]]}, {"t": "Content", "c": [{"t": "Para", "c": [{"t": "Image", "c": [["", [], []], [], ["_page_1_Picture_0.jpeg", ""]]}]}, {"t": "Para", "c": [{"t": "Strong", "c": [{"t": "Str", "c": "产品识别("}]}, {"t": "Str"

'SX047001新松机器人产品识别设计标准A-1.md'

Path('/d/devel/rag/ribosome/res/md.hrsl/01 设计标准/SX047001新松机器人产品识别设计标准A-1/SX047001新松机器人产品识别设计标准A-1_presentation.json')

50244

Summarize image: /d/devel/rag/ribosome/res/md.hrsl/01 设计标准/SX047001新松机器人产品识别设计标准A-1/_page_1_Picture_0.jpeg
Summarize image: /d/devel/rag/ribosome/res/md.hrsl/01 设计标准/SX047001新松机器人产品识别设计标准A-1/_page_2_Picture_0.jpeg
Summarize image: /d/devel/rag/ribosome/res/md.hrsl/01 设计标准/SX047001新松机器人产品识别设计标准A-1/_page_2_Picture_2.jpeg
Summarize image: /d/devel/rag/ribosome/res/md.hrsl/01 设计标准/SX047001新松机器人产品识别设计标准A-1/_page_3_Picture_0.jpeg
Summarize image: /d/devel/rag/ribosome/res/md.hrsl/01 设计标准/SX047001新松机器人产品识别设计标准A-1/_page_3_Figure_1.jpeg
Summarize section: 0 section depth 4
Summarize section: 1 section depth 1
Summarize image: /d/devel/rag/ribosome/res/md.hrsl/01 设计标准/SX047001新松机器人产品识别设计标准A-1/_page_4_Picture_2.jpeg
Summarize section: 2 section depth 1
Summarize image: /d/devel/rag/ribosome/res/md.hrsl/01 设计标准/SX047001新松机器人产品识别设计标准A-1/_page_5_Picture_2.jpeg
Summarize image: /d/devel/rag/ribosome/res/md.hrsl/01 设计标准/SX047001新松机器人产品识别设计标准A-1/_page_6_Picture_0.jpeg
Summarize image: /d/devel/rag/ribo

'{"pandoc-api-version": [1, 23, 1], "meta": {}, "blocks": [{"t": "Para", "c": [{"t": "Strong", "c": [{"t": "Str", "c": "机器人让世界更美好", "s": "Str 机器人让世界更美好"}, {"t": "Space", "s": "Space"}, {"t": "Str", "c": "ROBOTS", "s": "Str ROBOTS"}, {"t": "Space", "s": "Space"}, {"t": "Str", "c": "MAKE", "s": "Str MAKE"}, {"t": "Space", "s": "Space"}, {"t": "Str", "c": "A", "s": "Str A"}, {"t": "Space", "s": "Space"}, {"t": "Str", "c": "BETTER", "s": "Str BETTER"}, {"t": "Space", "s": "Space"}, {"t": "Str", "c": "WORLD", "s": "Str WORLD"}], "s": "Strong Str 机器人让世界更美好 Space Str ROBOTS Space Str MAKE Space Str A Space Str BETTER Space Str WORLD"}], "s": "Para Strong Str 机器人让世界更美好 Space Str ROBOTS Space Str MAKE Space Str A Space Str BETTER Space Str WORLD"}, {"t": "Section", "c": [{"t": "Header", "c": [1, ["新松机器人-产品识别设计标准推荐", [], []], [{"t": "Strong", "c": [{"t": "Str", "c": "新松机器人", "s": "Str 新松机器人"}, {"t": "Space", "s": "Space"}, {"t": "Str", "c": "产品识别设计标准(推荐)", "s": "Str 产品识别设计标准(推荐)"}], "s": "Strong

'SX047001新松机器人产品识别设计标准A-1.md'

Path('/d/devel/rag/ribosome/res/md.hrsl/01 设计标准/SX047001新松机器人产品识别设计标准A-1/SX047001新松机器人产品识别设计标准A-1_semantics.json')

138888

In [18]:
def document_reorg(root_folder: Path | str) -> None:
    """
    iterates through a root folder recursively and analyzes the semantics of each Markdown document.
    generate a json file containing the semantical summary of each document 
    output in the same folder as the original markdown file.capitalize
    """
    root = Path(root_folder) if isinstance(root_folder, str) else root_folder
    for file in root.rglob("*.md"):
        print(f"Processing file started: {file}")
        dom = DOM(file)
        dom.setup()  # Load the Markdown content and convert it to JSON AST
        ast_json_file = file.parent / (str(file.stem) + "_ast.json")
        if not ast_json_file.exists() and dom.ast_json:
            ast_json_file.write_text(dom.ast_json, encoding="utf-8")
        print(f"Processing file finished: {file}")

# Export the AST of every Markdown document under the chosen root.
# Smaller test root, kept for reference:
# document_reorg(Path("../res/test_batch_async"))
document_reorg(Path("/v/data/documents-semantics/.md.hrsl"))

Processing file started: /v/data/documents-semantics/.md.hrsl/08 检测报告与认证证书/检测报告/新松SR25A/新松SR25A.md
Processing file finished: /v/data/documents-semantics/.md.hrsl/08 检测报告与认证证书/检测报告/新松SR25A/新松SR25A.md
Processing file started: /v/data/documents-semantics/.md.hrsl/08 检测报告与认证证书/检测报告/T35A-25检测报告(1)/T35A-25检测报告(1).md
Processing file finished: /v/data/documents-semantics/.md.hrsl/08 检测报告与认证证书/检测报告/T35A-25检测报告(1)/T35A-25检测报告(1).md
Processing file started: /v/data/documents-semantics/.md.hrsl/08 检测报告与认证证书/检测报告/江苏华途数控科技有限公司 测试报告/江苏华途数控科技有限公司 测试报告.md
Processing file finished: /v/data/documents-semantics/.md.hrsl/08 检测报告与认证证书/检测报告/江苏华途数控科技有限公司 测试报告/江苏华途数控科技有限公司 测试报告.md
Processing file started: /v/data/documents-semantics/.md.hrsl/08 检测报告与认证证书/检测报告/火弧机器人小臂IP等级检测报告/火弧机器人小臂IP等级检测报告.md
Processing file finished: /v/data/documents-semantics/.md.hrsl/08 检测报告与认证证书/检测报告/火弧机器人小臂IP等级检测报告/火弧机器人小臂IP等级检测报告.md
Processing file started: /v/data/documents-semantics/.md.hrsl/08 检测报告与认证证书/检测报告/新松SR210A/新松SR210A.md
Pro




Processing file finished: /v/data/documents-semantics/.md.hrsl/06 产品手册/软件类/手册/V4.0版本/SX322006《新松机器人打磨应用操作手册》（A-1)/SX322006《新松机器人打磨应用操作手册》（A-1).md
Processing file started: /v/data/documents-semantics/.md.hrsl/06 产品手册/软件类/手册/V4.0版本/SX322015《新松机器人快换应用操作手册》-A-0/SX322015《新松机器人快换应用操作手册》-A-0.md
Processing file finished: /v/data/documents-semantics/.md.hrsl/06 产品手册/软件类/手册/V4.0版本/SX322015《新松机器人快换应用操作手册》-A-0/SX322015《新松机器人快换应用操作手册》-A-0.md
Processing file started: /v/data/documents-semantics/.md.hrsl/06 产品手册/软件类/手册/V4.0版本/SX322004《新松机器人激光焊应用操作手册》-A-3/SX322004《新松机器人激光焊应用操作手册》-A-3.md
Processing file finished: /v/data/documents-semantics/.md.hrsl/06 产品手册/软件类/手册/V4.0版本/SX322004《新松机器人激光焊应用操作手册》-A-3/SX322004《新松机器人激光焊应用操作手册》-A-3.md
Processing file started: /v/data/documents-semantics/.md.hrsl/06 产品手册/软件类/手册/V4.0版本/SX322016《新松机器人FDS应用使用手册》-A-0/SX322016《新松机器人FDS应用使用手册》-A-0.md
Processing file finished: /v/data/documents-semantics/.md.hrsl/06 产品手册/软件类/手册/V4.0版本/SX322016《新松机器人FDS应用使用手册》-A-0/SX322016《新松机器人FDS

In [None]:
async def analyze_document_async(md_file: Path, semaphore: asyncio.Semaphore) -> DOM:
    """
    Summarize one Markdown document while holding a slot of ``semaphore``.

    Builds a ``DOM`` for ``md_file``, runs ``setup()`` and ``textualize()``
    and, when ``dom.file_path`` is set, writes ``dom.ast_json`` to
    ``<stem>_semantics.json`` beside the Markdown file.

    Returns:
        The ``DOM`` instance that was processed.

    Raises:
        asyncio.CancelledError: re-raised after a cancellation notice has
            been printed.
    """
    try:
        async with semaphore:  # Cap the number of documents processed at once
            print(f"Analyzing document: {md_file}")
            dom = DOM(md_file)
            dom.setup()  # Load the Markdown content and convert it to JSON AST
            await dom.textualize()  # Summarize the document
            if not dom.file_path:
                return dom
            semantics_path = md_file.parent / f"{md_file.stem}_semantics.json"
            semantics_path.write_text(dom.ast_json, encoding="utf-8")  # type: ignore
            print(f"Finished analyzing document: {md_file.stem}")
            return dom
    except asyncio.CancelledError:
        print(f"Analysis cancelled for document: {md_file}")
        raise

In [None]:
import tqdm

async def document_semantics_analysis(root_folder: Path | str) -> None:
    """
    Analyze every Markdown document below ``root_folder`` concurrently.

    One ``analyze_document_async`` coroutine is created per ``*.md`` file found
    recursively under ``root_folder``; a semaphore lets at most 16 of them run
    at the same time. Progress is shown with a tqdm bar as documents finish.
    A document that fails with an ``Exception`` is reported by name and does
    not stop the remaining documents.

    Args:
        root_folder: Directory to scan, given as ``Path`` or ``str``.
    """
    root = Path(root_folder)
    semaphore = asyncio.Semaphore(16)  # Limit the number of concurrent tasks

    async def _analyze(md_file: Path) -> tuple[Path, Exception | None]:
        # asyncio.as_completed yields in completion order, so each outcome is
        # paired with its own path here; otherwise a failure could not be
        # attributed to the document that caused it.
        try:
            await analyze_document_async(md_file, semaphore)
        except Exception as exc:
            return md_file, exc
        return md_file, None

    pending = [_analyze(md_file) for md_file in root.rglob("*.md")]
    progress = tqdm.tqdm(
        asyncio.as_completed(pending),
        total=len(pending),
        desc="Processing files",
        unit="file",
    )
    for finished in progress:
        md_file, error = await finished
        if error is not None:
            print(f"Error processing file {md_file}: {error}")

# Run the document semantics analysis.
# This uses top-level await, which works in a notebook/IPython cell but not in a plain script.
await document_semantics_analysis(Path("../res/md.hrsl"))
# Alternative roots, kept for reference:
# await document_semantics_analysis(Path("/v/data/documents-semantics/.md.hrsl"))
# await document_semantics_analysis(Path("/v/data/documents-semantics/.md.hrsl"))
# await document_semantics_analysis(Path("../res/test_batch_async"))
# await document_semantics_analysis(Path("/v/data/documents-semantics/.md.hrsl"))
# await document_semantics_analysis(Path("/v/data/documents-semantics/.md.hrsl/05 技术规格"))
# await document_semantics_analysis(Path("/v/data/documents-semantics/.md.hrsl/06 产品手册"))
# await document_semantics_analysis(Path("/v/data/documents-semantics/.md.hrsl/06 产品样册"))
# await document_semantics_analysis(Path("/v/data/documents-semantics/.md.hrsl/08 检测报告与认证证书"))

## Asyncio interface for processing many Markdown files


In [None]:
async def process_one(file: Path) -> None:
    """
    Process a single Markdown file and write its AST and semantics JSON.

    Two files are written next to ``file``: ``<stem>_ast.json`` with the AST
    as produced by ``setup()``, and ``<stem>_semantics.json`` with the AST
    after ``textualize()`` has added the summaries.

    Args:
        file: Path of the Markdown document to process.
    """
    print(f"Processing file started: {file}")
    dom = DOM(file)
    # setup() is called synchronously at every other call site in this
    # notebook; awaiting its return value here would fail.
    dom.setup()
    ast_json_file = file.parent / f"{file.stem}_ast.json"
    semantics_json_file = file.parent / f"{file.stem}_semantics.json"
    ast_json_file.write_text(dom.ast_json, encoding="utf-8")
    await dom.textualize()
    semantics_json_file.write_text(dom.ast_json, encoding="utf-8")
    print(f"Semantics analysis completed for {file}. Results saved to {semantics_json_file}")

In [None]:
async def supervisor(root_folder: Path) -> int:
    """
    Run ``process_one`` for every Markdown file below ``root_folder``.

    All coroutines are created up front (a message is printed for each file
    as its coroutine is created) and then awaited together.

    Returns:
        The number of results gathered, i.e. one per Markdown file found.
    """

    def _announce(md_path: Path):
        print(f"Processing file started: {md_path}")
        return process_one(md_path)

    outcomes = await asyncio.gather(*map(_announce, root_folder.rglob("*.md")))
    return len(outcomes)

In [None]:
def process_many(root_folder: Path) -> int:
    """
    Synchronous entry point: process every Markdown file below ``root_folder``.

    Starts a fresh event loop with ``asyncio.run`` and runs ``supervisor`` in it.

    Args:
        root_folder: Directory that is scanned recursively for ``*.md`` files.

    Returns:
        The value returned by ``supervisor`` (the number of gathered results).
    """
    return asyncio.run(supervisor(root_folder))

# NOTE(review): asyncio.run() refuses to start inside an already running event
# loop; other cells here use top-level await, so this call presumably fails
# when executed in the notebook — confirm, or await supervisor(...) directly.
process_many(Path("../res/test_batch"))

## Section Class: Recursive Document Structure

In [None]:
# | export
# Recursive document section: free-text paragraphs plus figures, tables and
# nested sub-sections, each level carrying an optional summary.
class Section(BaseModel):
    # Optional summary of this section.
    summary: Optional[str] = None
    # Paragraph texts that belong directly to this section.
    paragraphs: List[str] = Field(default_factory=list)
    # Figures and tables that belong directly to this section.
    figures: List[Figure] = Field(default_factory=list)
    tables: List[Table] = Field(default_factory=list)
    # Nested sections; the forward reference is resolved by model_rebuild() below.
    subsections: List["Section"] = Field(default_factory=list)

    def __init__(
        self,
        summary: Optional[str] = None,
        paragraphs: Optional[List[str]] = None,
        figures: Optional[List[Figure]] = None,
        tables: Optional[List[Table]] = None,
        subsections: Optional[List[dict]] = None,
    ):
        """
        Build a section, accepting positional arguments and dict subsections.

        ``None`` (or any falsy value) for ``paragraphs``, ``figures`` or
        ``tables`` becomes an empty list. Entries of ``subsections`` that are
        dicts are converted to ``Section`` objects recursively; other entries
        are passed on as they are.
        """
        if subsections is None:
            nested = []
        else:
            nested = [
                Section(**sub) if isinstance(sub, dict) else sub
                for sub in subsections
            ]
        super().__init__(
            summary=summary,
            paragraphs=paragraphs or [],
            figures=figures or [],
            tables=tables or [],
            subsections=nested,
        )

    @classmethod
    def init(cls, md: "Markdown"):
        """Placeholder for building a section from Markdown; returns an empty section."""
        # The annotation is a string on purpose: it is then not evaluated when
        # the class body runs, so this module does not need the name
        # ``Markdown`` to be importable at definition time.
        return cls()

    @classmethod
    def update_forward_refs(cls, **localns):
        """Deliberate no-op; forward references are resolved via ``model_rebuild()``."""
        ...
        #BaseModel.model_rebuild()


# Support for recursive Section references
Section.model_rebuild()