# 一、基于文本长度的分块
## 1.1 固定大小分块

In [1]:
from langchain.text_splitter import CharacterTextSplitter

text = "这是一段很长的文本，用于测试分块功能。希望能够正确地将它分成多个块。"
text_splitter = CharacterTextSplitter(
    separator="",
    chunk_size=10,
    chunk_overlap=0
)
chunks = text_splitter.split_text(text)
print(chunks)

['这是一段很长的文本，', '用于测试分块功能。希', '望能够正确地将它分成', '多个块。']


## 加入chunk_overlap

In [2]:
from langchain.text_splitter import CharacterTextSplitter

text = "这是一段很长的文本，用于测试分块功能。希望能够正确地将它分成多个块。"
text_splitter = CharacterTextSplitter(
    separator="",
    chunk_size=10,
    chunk_overlap=5
)
chunks = text_splitter.split_text(text)
print(chunks)

['这是一段很长的文本，', '长的文本，用于测试分', '用于测试分块功能。希', '块功能。希望能够正确', '望能够正确地将它分成', '地将它分成多个块。']


## 加入separator

In [3]:
from langchain.text_splitter import CharacterTextSplitter

text = "这是一段很长的文本，用于测试分块功能。希望能够正确地将它分成多个块。"
text_splitter = CharacterTextSplitter(
    separator="。",
    chunk_size=10,
    chunk_overlap=5
)
chunks = text_splitter.split_text(text)
print(chunks)

Created a chunk of size 18, which is longer than the specified 10


['这是一段很长的文本，用于测试分块功能', '希望能够正确地将它分成多个块']


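划分的块有可能会大于 chunk_size：CharacterTextSplitter 只在 separator 处切分，如果两个分隔符之间的片段本身就超过 chunk_size（如上例中长度为 18 的片段），它不会被继续细分，只会给出警告。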

In [4]:
from langchain.text_splitter import RecursiveCharacterTextSplitter

text = "这是一段很长的文本，用于测试分块功能。希望能够正确地将它分成多个块。"
def word_count(text):
    """Return the number of whitespace-separated tokens in ``text``.

    Text that contains no whitespace (for example an unsegmented Chinese
    sentence) counts as a single token; an empty or all-whitespace string
    counts as zero.
    """
    tokens = text.split()
    return len(tokens)
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=10,
    chunk_overlap=0
)
chunks = text_splitter.split_text(text)
print(chunks)

['这是一段很长的文本，', '用于测试分块功能。希', '望能够正确地将它分成', '多个块。']


使用 RecursiveCharacterTextSplitter（默认分隔符）可以保证输出块大小小于等于 chunk_size：它会按分隔符列表逐级递归切分，而默认列表的最后一级是空字符串，即最终可以退化为按字符切分。如果自定义的 separators 中不包含空字符串，仍然可能出现超长的块。

In [5]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
import nltk

nltk.download('punkt')
text = "这是一个句子。这是另一个句子。这还是一个句子。"
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=20,
    chunk_overlap=0,
    separators=["\n", "。"]
)
chunks = text_splitter.split_text(text)
print(chunks)

['这是一个句子。这是另一个句子', '。这还是一个句子。']


[nltk_data] Downloading package punkt to /Users/hezhidong/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [6]:
from langchain_text_splitters import CharacterTextSplitter


test_text = """这是一段很长的文本，用于测试分块功能。希望能够正确地将它分成多个块。"""

text_splitter = CharacterTextSplitter(
    separator="。",
    chunk_size = 10,
    chunk_overlap = 5,
    length_function = len,
    is_separator_regex = False,
)
texts = text_splitter.create_documents([test_text])
for text in texts:
    print(text)

Created a chunk of size 18, which is longer than the specified 10


page_content='这是一段很长的文本，用于测试分块功能'
page_content='希望能够正确地将它分成多个块'


In [7]:
from langchain_text_splitters import CharacterTextSplitter


test_text = """这是一段很长的文本，用于测试分块功能。希望能够正确地将它分成多个块。"""

text_splitter = CharacterTextSplitter(
    separator="。",
    chunk_size = 50,
    chunk_overlap = 5,
    length_function = len,
    is_separator_regex = False,
)
texts = text_splitter.create_documents([test_text])
for text in texts:
    print(text)

page_content='这是一段很长的文本，用于测试分块功能。希望能够正确地将它分成多个块'


## 1.2 可变长度分块

In [8]:
def variable_length_chunking(text, core_sections=None, core_chunk_size=300, other_chunk_size=800):
    """Slice ``text`` into fixed-size pieces, using a separate size for core sections.

    The text is first divided into sections on blank lines (``"\\n\\n"``).
    Sections whose zero-based index is contained in ``core_sections`` are cut
    into pieces of at most ``core_chunk_size`` characters; every other section
    is cut into pieces of at most ``other_chunk_size`` characters. Empty
    sections produce no chunks.

    Args:
        text: The text to split.
        core_sections: Container of zero-based section indices treated as
            core sections. ``None`` (the default) means no core sections.
        core_chunk_size: Maximum chunk length for core sections.
        other_chunk_size: Maximum chunk length for all other sections.

    Returns:
        A list of string chunks in document order.

    Raises:
        ValueError: If the chunk size that applies to a section is not
            positive.
    """
    # A None default avoids sharing one mutable list object across calls.
    if core_sections is None:
        core_sections = ()

    chunks = []
    for index, section in enumerate(text.split("\n\n")):
        size = core_chunk_size if index in core_sections else other_chunk_size
        # A negative step would make range() empty and silently drop the
        # section's text, so reject non-positive sizes explicitly.
        if size <= 0:
            raise ValueError(f"chunk size must be positive, got {size}")
        chunks.extend(
            section[start:start + size] for start in range(0, len(section), size)
        )
    return chunks

text = "摘要：这是一篇关于人工智能的论文摘要。\n\n实验过程：详细的实验过程描述，包括多个步骤和数据。\n\n结论：得出了一些重要结论。"
chunks = variable_length_chunking(text, core_sections=[0, 2], core_chunk_size=300, other_chunk_size=800)
print(chunks)

['摘要：这是一篇关于人工智能的论文摘要。', '实验过程：详细的实验过程描述，包括多个步骤和数据。', '结论：得出了一些重要结论。']


# 二、基于语义的分块
## 2.1 基于句子的分块

In [9]:
import nltk
nltk.download('punkt')
from nltk.tokenize import sent_tokenize

def sentence_based_chunking(text, max_chunk_size=500):
    """Group the sentences of ``text`` into chunks below ``max_chunk_size`` characters.

    Sentences are obtained from ``sent_tokenize`` and appended to the current
    chunk, separated by single spaces, for as long as the accumulated length
    stays below ``max_chunk_size``. A sentence that would reach the limit
    starts a new chunk. A single sentence that is itself longer than the
    limit is kept whole as its own chunk.

    Args:
        text: The text to split.
        max_chunk_size: Length limit for a chunk, in characters. Defaults to
            500, the value that was previously hard-coded.

    Returns:
        A list of chunks with surrounding whitespace stripped. No empty
        chunk is emitted, even when the very first sentence exceeds the limit.
    """
    chunks = []
    pending = []     # sentences collected for the chunk being built
    pending_len = 0  # their total length, counting one joining space after each
    for sentence in sent_tokenize(text):
        # Flush only when something has been collected; otherwise an oversized
        # first sentence would add an empty string to the result.
        if pending and pending_len + len(sentence) >= max_chunk_size:
            chunks.append(" ".join(pending).strip())
            pending, pending_len = [], 0
        pending.append(sentence)
        pending_len += len(sentence) + 1
    if pending:
        chunks.append(" ".join(pending).strip())
    return chunks

text = "这是一个句子。这是另一个句子。它们共同构成了一个段落。"
chunks = sentence_based_chunking(text)
print(chunks)

['这是一个句子。这是另一个句子。它们共同构成了一个段落。']


[nltk_data] Downloading package punkt to /Users/hezhidong/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


## 2.2 基于段落的分块

In [10]:
def paragraph_based_chunking(text):
    """Split ``text`` into paragraphs on blank lines.

    A paragraph boundary is the two-character sequence ``"\\n\\n"``. The
    pieces are returned unmodified, so an empty input yields ``[""]``.
    """
    paragraph_separator = "\n\n"
    return text.split(paragraph_separator)

text = "这是第一段。\n\n这是第二段。\n\n这是第三段。"
chunks = paragraph_based_chunking(text)
print(chunks)

['这是第一段。', '这是第二段。', '这是第三段。']


## 2.3 基于语义单元的分块（使用语言模型）

In [11]:
from transformers import AutoTokenizer, AutoModel
import torch

def semantic_chunking(text, model_name="bert-base-uncased", max_chunk_length=512):
    """Split ``text`` into consecutive token windows and decode each back to text.

    The tokenizer and model named by ``model_name`` are loaded with
    ``from_pretrained``. The text is encoded without special tokens, cut into
    windows of at most ``max_chunk_length`` token ids, and every window is
    decoded into one chunk.

    Each window is also passed through the model, but the outputs are not
    used yet: that call is a placeholder for a future semantic-similarity
    criterion.

    Args:
        text: The text to split.
        model_name: Name passed to ``AutoTokenizer`` / ``AutoModel``.
        max_chunk_length: Maximum number of token ids per chunk.

    Returns:
        A list of decoded chunk strings, in order.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModel.from_pretrained(model_name)

    token_ids = tokenizer.encode(text, add_special_tokens=False)
    total = len(token_ids)

    pieces = []
    cursor = 0
    while cursor < total:
        stop = min(cursor + max_chunk_length, total)
        window = torch.tensor([token_ids[cursor:stop]])
        with torch.no_grad():
            # Placeholder: more elaborate semantic-similarity logic could use
            # these outputs; for now they are computed and ignored.
            _model_outputs = model(window)
        pieces.append(tokenizer.decode(window[0]))
        cursor = stop
    return pieces

text = "这是一段语义较为复杂的文本，需要使用语言模型进行分块处理。"
chunks = semantic_chunking(text)
print(chunks)

  from .autonotebook import tqdm as notebook_tqdm


['[UNK] [UNK] 一 [UNK] [UNK] [UNK] [UNK] [UNK] [UNK] [UNK] 的 文 本 ， [UNK] [UNK] [UNK] [UNK] [UNK] [UNK] [UNK] [UNK] [UNK] 行 分 [UNK] [UNK] [UNK] 。']


# 三、基于逻辑结构的分块
## 3.1 基于标题的分块

In [12]:
def title_based_chunking(text, title_delimiter="##"):
    """Split ``text`` into chunks that each start at a title marker.

    The text is split on every occurrence of ``title_delimiter``. Each
    non-blank piece that follows a delimiter is returned with the delimiter
    restored at its front. Non-blank text that precedes the first delimiter
    (a preamble, or the whole text when it has no title at all) is returned
    as-is, without a delimiter, so no heading is invented for it.

    Note that the delimiter is matched anywhere in the text, not only at the
    start of a line.

    Args:
        text: The text to split.
        title_delimiter: Marker that introduces a title. Must be non-empty.

    Returns:
        A list of chunks in document order; blank pieces are skipped.
    """
    preamble, *sections = text.split(title_delimiter)

    chunks = []
    if preamble.strip():
        # This text did not follow a title marker, so do not prepend one.
        chunks.append(preamble)
    chunks.extend(
        title_delimiter + section for section in sections if section.strip()
    )
    return chunks

text = "## 章节一\n内容一。\n## 章节二\n内容二。"
chunks = title_based_chunking(text)
print(chunks)

['## 章节一\n内容一。\n', '## 章节二\n内容二。']


## 3.2 基于标记的分块

In [13]:
def tag_based_chunking(xml_text, tag="<item>"):
    """Return every ``<tag>...</tag>`` element of ``xml_text`` whose content has no nested tags.

    The closing tag is derived from the element name in ``tag``, so the
    function works for tags other than ``<item>``. The element content is
    matched with ``[^<]*``, which means elements containing child tags are
    not returned.

    Args:
        xml_text: The XML/HTML-like text to scan.
        tag: The opening tag to look for, e.g. ``"<item>"``. It is inserted
            into the regular expression unescaped, so it may itself be a
            pattern such as ``"<item[^>]*>"``.

    Returns:
        A list of matched elements, including their opening and closing tags.
    """
    import re

    # Extract the element name ("item" from "<item>" or '<item id="1">') to
    # build the matching closing tag.
    name_match = re.match(r"<\s*([A-Za-z_][\w:.-]*)", tag)
    if name_match:
        closing_tag = f"</{re.escape(name_match.group(1))}>"
    else:
        # No recognizable element name: keep the previous fixed closing tag
        # so existing callers see unchanged behavior.
        closing_tag = "</item>"

    return re.findall(f"{tag}[^<]*{closing_tag}", xml_text)

xml_text = "<items><item>内容一</item><item>内容二</item></items>"
chunks = tag_based_chunking(xml_text)
print(chunks)

['<item>内容一</item>', '<item>内容二</item>']


## 3.3 基于结构的分块

In [14]:
from bs4 import BeautifulSoup

# 示例 HTML 文本
html_content = """
<html>
<head><title>Page Title</title></head>
<body>
<h1>This is a heading</h1>
<p>This is a paragraph.</p>
<ul>
<li>List item 1</li>
<li>List item 2</li>
</ul>
</body>
</html>
"""

# 使用 BeautifulSoup 解析 HTML
soup = BeautifulSoup(html_content, 'html.parser')

# 定义基于结构的分块逻辑
def split_by_structure(soup):
    """Collect the text of selected structural tags, in document order.

    Every tag returned by ``soup.find_all(True)`` is inspected; the text of
    paragraph (``p``), heading (``h1``-``h3``) and list-item (``li``) tags is
    kept and all other tags are ignored.

    Args:
        soup: A parsed document exposing ``find_all``; its tags must provide
            ``name`` and ``get_text()``.

    Returns:
        A list with one text chunk per selected tag.
    """
    wanted_tags = ['p', 'h1', 'h2', 'h3', 'li']
    return [
        node.get_text()
        for node in soup.find_all(True)
        if node.name in wanted_tags
    ]

# 执行基于结构的分块
structure_chunks = split_by_structure(soup)
print(structure_chunks)

['This is a heading', 'This is a paragraph.', 'List item 1', 'List item 2']


## 3.4 JSON数据分块

In [15]:
from langchain.text_splitter import RecursiveCharacterTextSplitter


def json_to_text(json_data):
    """Serialize ``json_data`` into a pretty-printed JSON string.

    The output is indented by four spaces and non-ASCII characters (such as
    Chinese text) are written as-is rather than as ``\\uXXXX`` escapes.
    """
    from json import dumps

    serialized = dumps(json_data, indent=4, ensure_ascii=False)
    return serialized


# 示例 JSON 数据
json_data = {
    "name": "张三",
    "age": 30,
    "address": {
        "street": "南京路",
        "city": "上海",
        "zipcode": "200000"
    },
    "hobbies": ["读书", "运动", "音乐"],
    "work_experience": [
        {
            "company": "公司 A",
            "position": "软件工程师",
            "start_date": "2010-01-01",
            "end_date": "2015-12-31"
        },
        {
            "company": "公司 B",
            "position": "高级软件工程师",
            "start_date": "2016-01-01",
            "end_date": "2020-12-31"
        }
    ]
}


# 将 JSON 数据转换为文本
text = json_to_text(json_data)


# 使用 RecursiveCharacterTextSplitter 进行分块
text_splitter = RecursiveCharacterTextSplitter(
    separators=["\n", " ", "{"],
    chunk_size=200,
    chunk_overlap=0
)


# 对文本进行分块
chunks = text_splitter.split_text(text)


print(chunks)

['{\n    "name": "张三",\n    "age": 30,\n    "address": {\n        "street": "南京路",\n        "city": "上海",\n        "zipcode": "200000"\n    },\n    "hobbies": [\n        "读书",\n        "运动",\n        "音乐"\n    ],', '"work_experience": [\n        {\n            "company": "公司 A",\n            "position": "软件工程师",\n            "start_date": "2010-01-01",\n            "end_date": "2015-12-31"\n        },\n        {', '"company": "公司 B",\n            "position": "高级软件工程师",\n            "start_date": "2016-01-01",\n            "end_date": "2020-12-31"\n        }\n    ]\n}']


In [16]:
from langchain_text_splitters import RecursiveJsonSplitter

# Split with RecursiveJsonSplitter (works on the JSON object itself, not on text)
json_splitter = RecursiveJsonSplitter(max_chunk_size=300)
# min_chunk_size can also be set


# Split the JSON data (the json_data dict) into chunks
chunks = json_splitter.split_json(json_data)
print(chunks)

[{'name': '张三', 'age': 30, 'address': {'street': '南京路', 'city': '上海', 'zipcode': '200000'}, 'hobbies': ['读书', '运动', '音乐']}, {'work_experience': [{'company': '公司 A', 'position': '软件工程师', 'start_date': '2010-01-01', 'end_date': '2015-12-31'}, {'company': '公司 B', 'position': '高级软件工程师', 'start_date': '2016-01-01', 'end_date': '2020-12-31'}]}]


In [17]:
# 当json里面有一个大列表时，按上述方法会完整保留，这样就会导致片段的长度超长。我们可以指定convert_lists=True来预处理json
chunks = json_splitter.split_json(json_data, convert_lists=True)
print(chunks)

[{'name': '张三', 'age': 30, 'address': {'street': '南京路', 'city': '上海', 'zipcode': '200000'}, 'hobbies': {'0': '读书', '1': '运动', '2': '音乐'}}, {'work_experience': {'0': {'company': '公司 A', 'position': '软件工程师', 'start_date': '2010-01-01', 'end_date': '2015-12-31'}}}, {'work_experience': {'1': {'company': '公司 B', 'position': '高级软件工程师', 'start_date': '2016-01-01', 'end_date': '2020-12-31'}}}]


# 四、混合分块方法

In [18]:
def hybrid_chunking(text):
    """Chunk ``text`` by paragraph first, then by sentence within each paragraph.

    Each paragraph produced by ``paragraph_based_chunking`` is passed to
    ``sentence_based_chunking``; the resulting chunks are concatenated in
    document order, so no chunk ever spans two paragraphs.
    """
    return [
        piece
        for paragraph in paragraph_based_chunking(text)
        for piece in sentence_based_chunking(paragraph)
    ]

text = "这是第一段。这是第一句。这是第二句。\n\n这是第二段。这是第三句。这是第四句。"
chunks = hybrid_chunking(text)
print(chunks)

['这是第一段。这是第一句。这是第二句。', '这是第二段。这是第三句。这是第四句。']


In [19]:
# from sklearn.decomposition import LatentDirichletAllocation as LDA
# from sklearn.feature_extraction.text import CountVectorizer
# import numpy as np

# # 假设我们已经有了一个包含多个文档的列表
# documents = ["文档1的内容", "文档2的内容", ...]

# # 使用 CountVectorizer 和 LDA 进行主题建模
# vectorizer = CountVectorizer()
# X = vectorizer.fit_transform(documents)

# lda = LDA(n_components=5, random_state=0)
# lda.fit(X)

# # 获取每个文档的主题分布
# topic_distributions = lda.transform(X)

# # 简单地按最大概率的主题分配文档
# for i, doc in enumerate(documents):
#     topic_idx = np.argmax(topic_distributions[i])
#     print(f"Document {i+1} is most likely about topic {topic_idx}")

请注意，上述代码仅展示了如何确定每个文档的主题，并未实际执行分块操作。要真正实现基于主题的分块，还需要进一步的逻辑，将同一主题下的内容合并。

# 基于实体的分块

In [20]:
import spacy
from langchain.text_splitter import CharacterTextSplitter

# 加载 SpaCy 模型
nlp = spacy.load("en_core_web_sm")

# 示例文本
text = "Bill Gates founded Microsoft Corporation in 1975."

# 使用 SpaCy 进行命名实体识别
doc = nlp(text)

# 定义基于实体的分块函数
def split_by_entities(text, doc):
    """Split ``text`` into alternating non-entity and entity pieces.

    The entities in ``doc.ents`` are walked in order. Text lying between two
    entities (or before the first / after the last one) becomes its own
    chunk, and each entity span becomes a chunk as well, so the chunks
    concatenate back to ``text`` when the entities are ordered and
    non-overlapping.

    Args:
        text: The original text.
        doc: Object whose ``ents`` yield spans with ``start_char`` and
            ``end_char`` character offsets into ``text``.

    Returns:
        A list of chunks in document order, or ``[text]`` when there are no
        entities.
    """
    entities = list(doc.ents)
    if len(entities) == 0:
        # Nothing to split on: the whole text is a single chunk.
        return [text]

    pieces = []
    cursor = 0  # offset just past the previously emitted entity
    for entity in entities:
        ent_start, ent_end = entity.start_char, entity.end_char

        # Emit the plain text between the previous entity and this one.
        if ent_start > cursor:
            pieces.append(text[cursor:ent_start])

        pieces.append(text[ent_start:ent_end])
        cursor = ent_end

    # Emit whatever follows the last entity.
    if cursor < len(text):
        pieces.append(text[cursor:])

    return pieces

# 执行基于实体的分块
entity_chunks = split_by_entities(text, doc)
print(entity_chunks)

['Bill Gates', ' founded ', 'Microsoft Corporation', ' in ', '1975', '.']


# 基于对话的分块

In [21]:
import re

# Sample transcript: one "Speaker: utterance" turn per line.
dialogue = """
Alice: Hello!
Bob: Hi Alice, how are you?
Alice: I'm good, thanks! What about you?
Bob: Doing well, thank you.
"""

# A turn is a speaker name, a colon, and then everything up to the next
# "\nName:" marker or the end of the text.
pattern = r'([A-Za-z]+):\s*(.*?)(?=\n[A-Za-z]+:|$)'

# Chunk the dialogue into (speaker, utterance) pairs. DOTALL lets "." cross
# newlines, so an utterance may span several lines.
dialogue_chunks = [
    turn.groups() for turn in re.finditer(pattern, dialogue, re.DOTALL)
]
for speaker, utterance in dialogue_chunks:
    print(f"{speaker}: {utterance.strip()}")

Alice: Hello!
Bob: Hi Alice, how are you?
Alice: I'm good, thanks! What about you?
Bob: Doing well, thank you.
