In [1]:
# from __future__ import annotations
import copy
# from llama_index.core import a

In [19]:
from typing import TypedDict, Dict, List, Tuple,Optional

from llama_index.core import Document
from llama_index.core import SimpleDirectoryReader
from llama_index.core.readers.base import BaseReader


class LineType(TypedDict):
    """Line type as typed dict."""
    metadata: Dict[str, str]
    content: str


class HeaderType(TypedDict):
    """Header type as typed dict."""

    level: int
    name: str
    data: str


class MyFileReader(BaseReader):
    def __init__(self, headers_to_split_on: List[Tuple[str, str]], strip_headers: bool = True, ):
        self.headers_to_split_on = sorted(
            headers_to_split_on, key=lambda split: len(split[0]), reverse=True
        )
        self.strip_headers = strip_headers

    def load_data(self, file, extra_info=None, ):

        with open(file, "r", encoding="utf-8") as f:
            text = f.read()
        # load_data returns a list of Document objects
        # law_source = str(file).split("\\")[-1].split(".")[0]
        documents = [Document(text = text)]
        texts, metadatas = [], []
        for doc in documents:
            md_docs = self.split_text(doc.text)
            for md_doc in md_docs:
                texts.append(md_doc.page_content)

                metadatas.append(
                    md_doc.metadata | doc.metadata | {"book": md_doc.metadata.get("header1")})

        return [Document(text=text, metadata={"law_source": metadatas}) for text in texts]
    # noinspection PyTypeChecker
    def split_text(self, text: str):
        lines = text.split("\n")

        lines_with_metadata: List[LineType] = []
        current_content: List[str] = []
        current_metadata: Dict[str, str] = {}
        header_stack: List[HeaderType] = []
        initial_metadata: Dict[str, str] = {}

        in_code_block = False
        opening_fence = ""

        for line in lines:
            stripped_line = line.strip()

            stripped_line = "".join(filter(str.isprintable, stripped_line))

            if not in_code_block:
                # Exclude inline code spans
                if stripped_line.startswith("```") and stripped_line.count("```") == 1:
                    in_code_block = True
                    opening_fence = "```"
                elif stripped_line.startswith("~~~"):
                    in_code_block = True
                    opening_fence = "~~~"
            else:
                if stripped_line.startswith(opening_fence):
                    in_code_block = False
                    opening_fence = ""

            if in_code_block:
                current_content.append(stripped_line)
                continue
            for sep, name in self.headers_to_split_on:
                if stripped_line.startswith(sep) and (
                        len(stripped_line) == len(sep) or stripped_line[len(sep)] == " "
                ):
                    if name is None:
                        current_header_level = sep.count("#")
                        while (
                                header_stack and header_stack[-1]['level'] >= current_header_level
                        ):
                            popped_header = header_stack.pop()
                            if popped_header['name'] in initial_metadata:
                                initial_metadata.pop(popped_header['name'])
                            header: HeaderType = {
                                "level": current_header_level,
                                "name": name,
                                "data": stripped_line[len(sep):].strip(),
                            }
                            header_stack.append(header)
                            # Update initial_metadata with the current header
                            initial_metadata[name] = header["data"]
                        if current_content:
                            lines_with_metadata.append(
                                {
                                    "content": "\n".join(current_content),
                                    "metadata": current_metadata.copy(),
                                }
                            )
                            current_content.clear()

                    if not self.strip_headers:
                        current_content.append(stripped_line)

                    break
            else:
                if stripped_line:
                    current_content.append(stripped_line)
                elif current_content:
                    lines_with_metadata.append(
                        {
                            "content": "\n".join(current_content),
                            "metadata": current_metadata.copy(),
                        }
                    )
                    current_content.clear()

            current_metadata = initial_metadata.copy()

            if current_content:
                lines_with_metadata.append(
                    {"content": "\n".join(current_content), "metadata": current_metadata}
                )

            # lines_with_metadata has each line with associated header metadata
            # aggregate these into chunks based on common metadata
            return [
                Document(text=chunk["content"], metadata=chunk["metadata"])
                for chunk in lines_with_metadata
            ]
    def create_documents(
        self, texts: List[str], metadatas: Optional[List[dict]] = None
    ) -> List[Document]:
        """Create documents from a list of texts."""
        _metadatas = metadatas or [{}] * len(texts)
        documents = []
        for i, text in enumerate(texts):
            index = 0
            previous_chunk_len = 0
            for chunk in self.split_text(text):
                metadata = copy.deepcopy(_metadatas[i])
                if True:
                    offset = index + previous_chunk_len - 50
                    index = text.find(chunk, max(0, offset))
                    metadata["start_index"] = index
                    previous_chunk_len = len(chunk)
                new_doc = Document(text=chunk, metadata=metadata)
                documents.append(new_doc)
        return documents


In [20]:


reader = SimpleDirectoryReader(
    input_dir="./Law-Book", recursive=True, file_extractor={".md": MyFileReader([
            ("#", "header1"),
            ("##", "header2"),
            ("###", "header3"),
            ("####", "header4"),
        ])}
)

documents = reader.load_data()
print(documents)

[]


In [21]:
documents

[]

In [14]:
from llama_index.core.node_parser import SentenceSplitter,MarkdownNodeParser,MarkdownElementNodeParser

In [4]:
docs = SimpleDirectoryReader(input_dir=".\Law-Book",exclude=["md"],recursive=True).load_data()

In [15]:
markdown_splitter = MarkdownNodeParser()
markdown_ele_splitter = MarkdownElementNodeParser()

In [17]:
markdown_splitter_node = markdown_splitter.get_nodes_from_documents(docs)

In [37]:
markdown_splitter.get_nodes_from_node(docs[1])

[TextNode(id_='d49b4a1c-f344-4349-8bc6-fb7aad556724', embedding=None, metadata={}, excluded_embed_metadata_keys=['file_name', 'file_type', 'file_size', 'creation_date', 'last_modified_date', 'last_accessed_date'], excluded_llm_metadata_keys=['file_name', 'file_type', 'file_size', 'creation_date', 'last_modified_date', 'last_accessed_date'], relationships={<NodeRelationship.SOURCE: '1'>: RelatedNodeInfo(node_id='502353e5-e5d6-4cd1-a00e-f37ff9b395db', node_type=<ObjectType.DOCUMENT: '4'>, metadata={'file_path': 'D:\\code\\law_llama_system\\Law-Book\\1-宪法\\宪法.md', 'file_name': '宪法.md', 'file_size': 52540, 'creation_date': '2024-09-13', 'last_modified_date': '2024-01-22'}, hash='323706dfd9f77b663a726354766ddee525155af80b7f172754a4ff148be77449')}, text='序言\n\n中国是世界上历史最悠久的国家之一。中国各族人民共同创造了光辉灿烂的文化，具有光荣的革命传统。\n\n一八四○年以后，封建的中国逐渐变成半殖民地、半封建的国家。中国人民为国家独立、民族解放和民主自由进行了前仆后继的英勇奋斗。\n\n二十世纪，中国发生了翻天覆地的伟大历史变革。\n\n一九一一年孙中山先生领导的辛亥革命，废除了封建帝制，创立了中华民国。但是，中国人民反对帝国主义和封建主义的历史任务还没有完成。\n\n一九四九年，以毛泽东主席为领袖的中国共产党领导中国各族

In [42]:
documents[0].get_content(metadata_mode="none").split('\n\n')

['# 中华人民共和国宪法',
 '1982年12月4日 第五届全国人民代表大会第五次会议通过',
 '1982年12月4日 全国人民代表大会公告公布施行',
 '1988年4月12日 第七届全国人民代表大会第一次会议通过的《中华人民共和国宪法修正案》',
 '1993年3月29日 第八届全国人民代表大会第一次会议通过的《中华人民共和国宪法修正案》',
 '1999年3月15日 第九届全国人民代表大会第二次会议通过的《中华人民共和国宪法修正案》',
 '2004年3月14日 第十届全国人民代表大会第二次会议通过的《中华人民共和国宪法修正案》',
 '2018年3月11日 第十三届全国人民代表大会第一次会议通过的《中华人民共和国宪法修正案》',
 '<!-- INFO END -->',
 '## 序言',
 '中国是世界上历史最悠久的国家之一。中国各族人民共同创造了光辉灿烂的文化，具有光荣的革命传统。',
 '一八四○年以后，封建的中国逐渐变成半殖民地、半封建的国家。中国人民为国家独立、民族解放和民主自由进行了前仆后继的英勇奋斗。',
 '二十世纪，中国发生了翻天覆地的伟大历史变革。',
 '一九一一年孙中山先生领导的辛亥革命，废除了封建帝制，创立了中华民国。但是，中国人民反对帝国主义和封建主义的历史任务还没有完成。',
 '一九四九年，以毛泽东主席为领袖的中国共产党领导中国各族人民，在经历了长期的艰难曲折的武装斗争和其他形式的斗争以后，终于推翻了帝国主义、封建主义和官僚资本主义的统治，取得了新民主主义革命的伟大胜利，建立了中华人民共和国。从此，中国人民掌握了国家的权力，成为国家的主人。',
 '<!-- FORCE BREAK -->',
 '中华人民共和国成立以后，我国社会逐步实现了由新民主主义到社会主义的过渡。生产资料私有制的社会主义改造已经完成，人剥削人的制度已经消灭，社会主义制度已经确立。工人阶级领导的、以工农联盟为基础的人民民主专政，实质上即无产阶级专政，得到巩固和发展。中国人民和中国人民解放军战胜了帝国主义、霸权主义的侵略、破坏和武装挑衅，维护了国家的独立和安全，增强了国防。经济建设取得了重大的成就，独立的、比较完整的社会主义工业体系已经基本形成，农业生产显著提高。教育、科学、文化等事业有了很大的发展，社会主义思想教育取得了明显的成效。广大人民的生活有

In [22]:
len(docs)

2850

In [27]:
markdown_splitter_node[1]

TextNode(id_='4def94df-5280-4f07-b0ae-92ab8ca00994', embedding=None, metadata={'file_path': 'D:\\code\\law_llama_system\\Law-Book\\1-宪法\\宪法.md', 'file_name': '宪法.md', 'file_size': 52540, 'creation_date': '2024-09-13', 'last_modified_date': '2024-01-22'}, excluded_embed_metadata_keys=['file_name', 'file_type', 'file_size', 'creation_date', 'last_modified_date', 'last_accessed_date'], excluded_llm_metadata_keys=['file_name', 'file_type', 'file_size', 'creation_date', 'last_modified_date', 'last_accessed_date'], relationships={<NodeRelationship.SOURCE: '1'>: RelatedNodeInfo(node_id='502353e5-e5d6-4cd1-a00e-f37ff9b395db', node_type=<ObjectType.DOCUMENT: '4'>, metadata={'file_path': 'D:\\code\\law_llama_system\\Law-Book\\1-宪法\\宪法.md', 'file_name': '宪法.md', 'file_size': 52540, 'creation_date': '2024-09-13', 'last_modified_date': '2024-01-22'}, hash='323706dfd9f77b663a726354766ddee525155af80b7f172754a4ff148be77449')}, text='序言\n\n中国是世界上历史最悠久的国家之一。中国各族人民共同创造了光辉灿烂的文化，具有光荣的革命传统。\n\n一八四○年以后，封建的中国

In [20]:
len(markdown_splitter_node)

2850

In [18]:
markdown_ele_splitter_node = markdown_ele_splitter.get_nodes_from_documents(docs)

ValueError: 
******
Could not load OpenAI model. If you intended to use OpenAI, please check your OPENAI_API_KEY.
Original error:
No API key found for OpenAI.
Please set either the OPENAI_API_KEY environment variable or openai.api_key prior to initialization.
API keys can be found or created at https://platform.openai.com/account/api-keys

To disable the LLM entirely, set llm=None.
******

In [7]:
node_parser = SentenceSplitter()

In [11]:
docs[0].get_content(metadata_mode="none")

'\n\n中华人民共和国宪法\n\n1982年12月4日 第五届全国人民代表大会第五次会议通过\n\n1982年12月4日 全国人民代表大会公告公布施行\n\n1988年4月12日 第七届全国人民代表大会第一次会议通过的《中华人民共和国宪法修正案》\n\n1993年3月29日 第八届全国人民代表大会第一次会议通过的《中华人民共和国宪法修正案》\n\n1999年3月15日 第九届全国人民代表大会第二次会议通过的《中华人民共和国宪法修正案》\n\n2004年3月14日 第十届全国人民代表大会第二次会议通过的《中华人民共和国宪法修正案》\n\n2018年3月11日 第十三届全国人民代表大会第一次会议通过的《中华人民共和国宪法修正案》\n\n\n'

In [9]:
node = node_parser.get_nodes_from_documents(docs)