### Semantic Chunking <a id="SemanticChunking"></a>

为什么我们在处理文本时通常使用固定的分块大小，而不考虑实际内容的语义？是否可以基于文本的语义实现一种更好的分块方法，即不再使用固定的参数（**chunksize**），而是根据语义动态地确定分块边界？

我们可以通过 **embedding** 技术实现这种动态切分。

**Embedding** 是一种将文本转化为高维空间中向量的技术，这些向量能够反映文本的语义内容。通过文本嵌入，可以捕捉到文本深层次的语义信息。比较两段文本的嵌入向量时，可以根据它们在高维空间中的距离或夹角，推断这两段文本在语义上的相似度或差异。利用这种相似度，可以把语义相近的文本自动分组在一起，形成聚类，这有助于更好地理解和组织大量的文本数据。


In [None]:
# Read the whole essay into memory. The encoding is given explicitly because
# the text is later split on the Chinese full stop, and the platform default
# encoding is not guaranteed to be UTF-8.
# NOTE(review): assumes the input file is UTF-8 encoded — adjust if it is not.
with open('file_path.txt', encoding='utf-8') as file:
    essay = file.read()

#### 我们需要将文本进行拆分，拆分成多个单句，可以按照标点符号进行切分

例如，英文文本可以使用 `split_char = ['.', '?', '!']`；下面的代码针对中文文本，按句号 `。` 和换行符进行切分。

In [None]:
import re

# Split the essay into sentences on the Chinese full stop '。' and on newlines.
# Fragments that are empty or whitespace-only (e.g. after a trailing '。' or
# newline) are dropped so they are not counted or processed as sentences.
single_sentences_list = [
    fragment for fragment in re.split(r'[。\n]+', essay) if fragment.strip()
]
print (f"{len(single_sentences_list)} senteneces were found")

In [None]:
# Preview the first ten split sentences.
single_sentences_list[:10]

In [None]:
# Length, in characters, of the longest sentence (0 when there are none).
# A generator expression is used instead of a loop variable named `list`,
# which would shadow the built-in `list` for every later cell.
length = max((len(sentence) for sentence in single_sentences_list), default=0)
length


我们需要为每个句子拼接其前后相邻的句子，但直接在字符串列表（`list`）上附加这些额外信息比较困难，因此将其转换为字典列表（`List[dict]`）：

{ 'sentence' : XXX  , 'index' : 0}

In [None]:
# Wrap every sentence in a dict that also records its position in the list,
# so that further fields can be stored alongside each sentence.
sentences = [
    {'sentence': text, 'index': position}
    for position, text in enumerate(single_sentences_list)
]
sentences[:3]

In [None]:
def combine_sentences(sentences, buffer_size=2):
    """Attach to every sentence a window of its neighbouring sentences.

    For the entry at position ``i``, the texts stored under the ``'sentence'``
    key of entries ``i - buffer_size`` through ``i + buffer_size`` (clipped to
    the bounds of the list) are joined with single spaces and stored under the
    ``'combined_sentence'`` key of that entry.

    Args:
        sentences: List of dicts, each with a ``'sentence'`` key. The dicts
            are modified in place.
        buffer_size: Number of sentences to include on each side.

    Returns:
        The same ``sentences`` list that was passed in.
    """
    total = len(sentences)

    # First pass: build every window before touching the dicts.
    windows = []
    for position in range(total):
        first = max(position - buffer_size, 0)
        stop = min(position + buffer_size + 1, total)
        texts = [sentences[k]['sentence'] for k in range(first, stop)]
        windows.append(' '.join(texts))

    # Second pass: store each window on its own sentence entry.
    for entry, window in zip(sentences, windows):
        entry['combined_sentence'] = window

    return sentences


In [None]:
# Add a 'combined_sentence' field to every entry, using the default window size.
sentences = combine_sentences(sentences)

In [None]:
# Preview the first six entries, now including their combined text.
sentences[:6]

接下来使用**embedding model**对**sentences** 进行编码

In [None]:
import re
from sentence_transformers import SentenceTransformer
import numpy as np
# Load the sentence-embedding model from the given model name or path.
# NOTE(review): the original comment said "load onto GPU", but no device
# argument is passed here — confirm which device the model ends up on.
import os
model = SentenceTransformer(model_name_or_path='your model path')


def combine_sentences(sentences, buffer_size=1):
    """Store on each sentence dict the text of itself plus its neighbours.

    This definition uses a default window of one sentence on each side. For
    every position the ``'sentence'`` texts from ``buffer_size`` entries
    before to ``buffer_size`` entries after (clipped to the list bounds) are
    joined with single spaces and written to ``'combined_sentence'``.

    Args:
        sentences: List of dicts, each with a ``'sentence'`` key. The dicts
            are modified in place.
        buffer_size: Number of neighbouring sentences taken on each side.

    Returns:
        The same ``sentences`` list that was passed in.
    """
    count = len(sentences)

    def window_text(center):
        # Join the texts of the clipped window around `center`.
        low = max(center - buffer_size, 0)
        high = min(center + buffer_size + 1, count)
        return ' '.join(sentences[k]['sentence'] for k in range(low, high))

    # Compute all windows first, then write them back onto the entries.
    merged = [window_text(center) for center in range(count)]
    for center, text in enumerate(merged):
        sentences[center]['combined_sentence'] = text

    return sentences

In [None]:
# Encode the combined (windowed) text of every sentence with the embedding model.
embeddings = model.encode([x['combined_sentence'] for x in sentences])

将embedding添加到sentence中

In [None]:
# Store embeddings[i] on the i-th sentence dict.
# NOTE(review): presumably `model.encode` returns one vector per input, in
# input order, so embeddings[i] belongs to sentences[i] — confirm.
for i, sentence in enumerate(sentences):
    sentence['combined_sentence_embedding'] = embeddings[i]

In [None]:
# Inspect the second entry, which now also carries its embedding.
sentences[1]

接下来需要根据余弦相似度进行切分

In [None]:
def cosine_similarity(vec1, vec2):
    """Calculate the cosine similarity between two vectors.

    Args:
        vec1: First vector (anything ``np.dot`` / ``np.linalg.norm`` accept).
        vec2: Second vector, of the same length as ``vec1``.

    Returns:
        The dot product of the vectors divided by the product of their
        Euclidean norms. If either vector has zero norm the similarity is
        undefined; ``0.0`` is returned instead of dividing by zero.
    """
    norm_product = np.linalg.norm(vec1) * np.linalg.norm(vec2)
    if norm_product == 0:
        # A zero vector has no direction; avoid 0/0, which would produce nan
        # and a RuntimeWarning that then propagates into the distances.
        return 0.0
    return np.dot(vec1, vec2) / norm_product

In [None]:
import numpy as np

In [None]:
# Sanity check: cosine similarity between the embeddings of the first two entries.
cosine_similarity(sentences[0]['combined_sentence_embedding'], sentences[1]['combined_sentence_embedding'])

In [None]:
def calculate_cosine_distances(sentences):
    """Compute the cosine distance between each sentence and the one after it.

    The distance is ``1 - cosine_similarity`` of the two entries'
    ``'combined_sentence_embedding'`` values. Each distance is also stored on
    the earlier entry of the pair under ``'distance_to_next'``; the final
    entry has no successor and therefore receives no such key.

    Args:
        sentences: List of dicts, each with a
            ``'combined_sentence_embedding'`` key. Modified in place.

    Returns:
        A tuple ``(distances, sentences)``: the list of distances (one fewer
        than the number of sentences) and the same list that was passed in.
    """
    distances = []
    # Walk over consecutive (current, following) pairs.
    for current, following in zip(sentences, sentences[1:]):
        similarity = cosine_similarity(
            current['combined_sentence_embedding'],
            following['combined_sentence_embedding'],
        )
        gap = 1 - similarity  # similarity -> distance
        distances.append(gap)
        current['distance_to_next'] = gap
    return distances, sentences

In [None]:
# Compute the distance between each pair of consecutive sentences and store
# it on the earlier entry of the pair as 'distance_to_next'.
distances, sentences = calculate_cosine_distances(sentences)

In [None]:
# Distance stored on the second-to-last entry; the final entry has no
# 'distance_to_next' because the loop stops one short of the end.
sentences[-2]['distance_to_next']

In [None]:
# Largest distance between two consecutive sentences.
max(distances)

In [None]:
import matplotlib.pyplot as plt

# Quick look at the raw distance curve; the trailing ';' suppresses the cell's echoed return value.
plt.plot(distances);

In [None]:
import numpy as np

# Plot the sequential distances, mark the breakpoint threshold, and shade the
# span of every chunk that the threshold produces.
plt.plot(distances)

y_upper_bound = 0.25
plt.ylim(0, y_upper_bound)
plt.xlim(0, len(distances))

# Distances above this percentile of all distances are treated as breakpoints.
# If you want more chunks, lower the percentile cutoff.
breakpoint_percentile_threshold = 50
breakpoint_distance_threshold = np.percentile(distances, breakpoint_percentile_threshold)
plt.axhline(y=breakpoint_distance_threshold, color='r', linestyle='-')

# Indices of the distances above the threshold: these are the positions where
# the text is split.
indices_above_thresh = [i for i, x in enumerate(distances) if x > breakpoint_distance_threshold]
num_distances_above_theshold = len(indices_above_thresh)  # number of breakpoints
plt.text(x=(len(distances)*.01), y=y_upper_bound/50, s=f"{num_distances_above_theshold + 1} Chunks")

# Start of the shading and text
colors = ['b', 'g', 'r', 'c', 'm', 'y', 'k']

# One shaded span per breakpoint, running from the previous breakpoint (or 0)
# up to this breakpoint.
for i, breakpoint_index in enumerate(indices_above_thresh):
    start_index = 0 if i == 0 else indices_above_thresh[i - 1]
    end_index = breakpoint_index

    plt.axvspan(start_index, end_index, facecolor=colors[i % len(colors)], alpha=0.25)
    plt.text(x=np.average([start_index, end_index]),
             y=breakpoint_distance_threshold + (y_upper_bound)/ 20,
             s=f"Chunk #{i}", horizontalalignment='center',
             rotation='vertical')

# Final span: from the last breakpoint (or from 0 when there is none) to the
# end of the data. Every breakpoint index is smaller than len(distances), so
# this span always exists. Its number is the count of breakpoints, computed
# directly instead of relying on the loop variable left over from above.
last_breakpoint = indices_above_thresh[-1] if indices_above_thresh else 0
last_chunk_number = len(indices_above_thresh)
plt.axvspan(last_breakpoint, len(distances), facecolor=colors[last_chunk_number % len(colors)], alpha=0.25)
plt.text(x=np.average([last_breakpoint, len(distances)]),
         y=breakpoint_distance_threshold + (y_upper_bound)/ 20,
         s=f"Chunk #{last_chunk_number}", horizontalalignment='center',
         rotation='vertical')

plt.title("Essay Chunks Based On Embedding Breakpoints")
plt.xlabel("Index of sentences in essay (Sentence Position)")
plt.ylabel("Cosine distance between sequential sentences")
plt.show()

In [None]:
# Group the sentences into chunks: each breakpoint index closes a chunk that
# runs from the current start position up to and including that index.
chunks = []
start_index = 0

for end_index in indices_above_thresh:
    # Join the sentence texts of this group with single spaces.
    chunks.append(' '.join(d['sentence'] for d in sentences[start_index:end_index + 1]))
    # The next chunk begins right after the breakpoint.
    start_index = end_index + 1

# Whatever is left after the last breakpoint forms the final chunk.
if start_index < len(sentences):
    chunks.append(' '.join(d['sentence'] for d in sentences[start_index:]))

# `chunks` now holds the text of every chunk, in order.

In [None]:
# Length, in characters, of the longest chunk (0 if there are no chunks).
# `len(max(chunks))` would measure the lexicographically greatest chunk
# instead, because `max` on strings compares them by character order.
max((len(chunk) for chunk in chunks), default=0)

In [None]:
# Print every chunk, prefixed with '---' so the boundaries are visible.
for chunk in chunks:
    print("---" + chunk)

# Write the chunks to a text file, one chunk per line. The encoding is given
# explicitly so non-ASCII text is written the same way regardless of the
# platform's default encoding.
with open('chunks.txt', 'w', encoding='utf-8') as f:
    for chunk in chunks:
        f.write(chunk+'\n')

In [None]:
# Show the beginning and the end of the first two chunks.
for i, chunk in enumerate(chunks[:2]):
    buffer = 200  # number of characters shown from each end
    head = chunk[:buffer].strip()
    tail = chunk[-buffer:].strip()
    # One call with a newline separator prints the same lines as separate
    # print calls would, ending with the same blank lines.
    print(f"Chunk #{i}", head, "...", tail, "\n", sep="\n")