1、处理典故范围重叠

“㸌如羿射九日落，矫如群帝骖龙翔。” [4, 5, 6,射日弓（后羿）]; [2, 3, 4, 5,射日弓（后羿）]  -> [2, 3, 4, 5, 6,射日弓（后羿）]

In [6]:
import pandas as pd
import ast
import numpy as np

In [7]:


def merge_connected_spans(spans_str):
    try:
        spans = ast.literal_eval(spans_str)
    except:
        return spans_str
    
    if not spans:
        return spans
    
    # 将spans按起始位置排序
    spans.sort(key=lambda x: x[0])
    
    merged_spans = []
    current_positions = None
    
    for span in spans:
        if not isinstance(span, list) or len(span) < 2:
            continue
            
        # 获取当前span的所有位置
        current_span = list(range(span[0], span[-1] + 1))
        
        if current_positions is None:
            # 第一个span
            current_positions = current_span
            continue
            
        # 检查是否与当前positions相连
        if span[0] <= current_positions[-1] + 1:
            # 相连，合并
            # 使用集合去重，然后转回有序列表
            current_positions = sorted(list(set(current_positions + current_span)))
        else:
            # 不相连，保存当前positions并开始新的span
            merged_spans.append(current_positions)
            current_positions = current_span
    
    # 添加最后一组positions
    if current_positions:
        merged_spans.append(current_positions)
    
    return merged_spans

# 读取CSV文件
df = pd.read_csv('爬取的典故数据.csv', sep='\t')

# 处理allusion_index列
df['allusion_index'] = df['allusion_index'].apply(merge_connected_spans)

# 保存处理后的数据
df.to_csv('process_connected_spans_data.csv', sep='\t', index=False)

# 打印一些示例进行验证
print("处理后的一些示例：")
for i, row in df.iterrows():
    if isinstance(row['allusion_index'], list) and len(row['allusion_index']) > 1:
        print(f"原句：{row['sentence']}")
        print(f"典故：{row['allusion']}")
        print(f"处理后的spans：{row['allusion_index']}")
        print("-" * 50)

# 测试代码
test_cases = [
    "[[1, 2], [4, 5]]",  # 不相连的spans
    "[[1, 2], [4, 5, 6], [5, 6, 7]]",  # 部分相连的spans
    "[[1, 2], [2, 3], [3, 4]]",  # 全部相连的spans
]

print("测试结果：")
for test in test_cases:
    result = merge_connected_spans(test)
    print(f"输入: {test}")
    print(f"输出: {result}")
    print("-" * 50)


处理后的一些示例：
原句：僻居谋道不谋身，避病桃源不避秦。
典故：桃源（陶潜）
处理后的spans：[[10, 11], [13, 14]]
--------------------------------------------------
原句：何用深求避秦客，吾家便是武陵源。
典故：桃源（陶潜）
处理后的spans：[[4, 5, 6], [12, 13, 14]]
--------------------------------------------------
原句：悔学秦人南避地，武陵原上又徵师。
典故：桃源（陶潜）
处理后的spans：[[2, 3, 4, 5], [8, 9]]
--------------------------------------------------
原句：行尽绿潭潭转幽，疑是武陵春碧流。秦人鸡犬桃花里，将比通塘渠见羞。
典故：桃源（陶潜）
处理后的spans：[[10, 11, 12], [16, 17, 18, 19]]
--------------------------------------------------
原句：薜带何辞楚，桃源堪避秦。
典故：桃源（陶潜）
处理后的spans：[[6, 7], [9, 10]]
--------------------------------------------------
原句：见说桃源洞，如今犹避秦。
典故：桃源（陶潜）
处理后的spans：[[2, 3], [9, 10]]
--------------------------------------------------
原句：同作危时避秦客，此行何似武陵滩。
典故：桃源（陶潜）
处理后的spans：[[4, 5, 6], [12, 13]]
--------------------------------------------------
原句：青野雾销凝晋洞，碧山烟散避秦溪。已是大仙怜后进，不应来向武陵迷。
典故：桃源（陶潜）
处理后的spans：[[12, 13, 14], [28, 29]]
--------------------------------------------------
原句：闻说花源堪避秦，幽寻数月不逢人。
典故：桃源（陶潜）
处理后的spans：[[2, 3], [5,

2、处理未在提供的异形词范围内的情况

In [8]:
import difflib

In [9]:

# 加载数据
final_data = pd.read_csv('process_connected_spans_data.csv', sep='\t')  # 诗句和典故的原数据
variation_data = pd.read_csv('典故的异形数据.csv', sep='\t')  # 典故的异形数据

# 使用difflib计算最长公共子串
def longest_common_substring(str1, str2):
    sequence_matcher = difflib.SequenceMatcher(None, str1, str2)
    match = sequence_matcher.find_longest_match(0, len(str1), 0, len(str2))
    return str1[match.a: match.a + match.size]

# 创建一个新DataFrame用于存放修改过的行
modified_rows = []
no_match_rows = []  # 新增：存储没有匹配的行

# 遍历final_data中的每一行
for idx, row in final_data.iterrows():
    if row['variation_number'] == 0:
        sentence = row['sentence']
        allusions = row['allusion'].split(';')  # 拆分多个典故
        
        allusion_index = []  # 存储多个典故的匹配位置
        matched_allusions = 0  # 记录匹配成功的典故数

        for allusion in allusions:
            variation_list = variation_data[variation_data['allusion'] == allusion]['variation_list'].values
            
            if len(variation_list) > 0:
                variation_list = eval(variation_list[0])  # 将字符串类型的variation_list转换成列表
                merged_variations = ''.join(variation_list)  # 合并所有变体为一个长字符串
                
                # 查找最长公共子串
                longest_match = longest_common_substring(merged_variations, sentence)
                
                if longest_match:
                    # 更新变体列表，添加新的异形典故
                    if longest_match not in variation_list:
                        variation_list.append(longest_match)

                    # 更新variation_data
                    variation_data.at[variation_data[variation_data['allusion'] == allusion].index[0], 'variation_list'] = str(variation_list)
                    
                    # 记录匹配位置
                    match_start = sentence.find(longest_match)
                    match_end = match_start + len(longest_match)
                    allusion_index.append([match_start, match_end])  # 记录该典故的匹配位置

                    matched_allusions += 1  # 匹配成功的典故数加1

        if matched_allusions > 0:
            final_data.at[idx, 'variation_number'] = matched_allusions  # 更新variation_number为匹配的典故数
            final_data.at[idx, 'allusion_index'] = str(allusion_index)  # 保存多个典故的匹配位置

            # 保存修改后的行
            modified_rows.append(final_data.loc[idx])
        else:
            no_match_rows.append(final_data.loc[idx])  # 如果没有找到匹配的典故，记录此行

# 将修改过的行保存到新的DataFrame
modified_data = pd.DataFrame(modified_rows)

# 将没有匹配的行保存到新的DataFrame
no_match_data = pd.DataFrame(no_match_rows)

# 保存到新的CSV文件
final_data.to_csv('processed_data.csv', index=False, sep='\t')
variation_data.to_csv('updated_典故的异性数据.csv', index=False, sep='\t')
modified_data.to_csv('modified_rows.csv', index=False, sep='\t')  # 保存修改过的行
no_match_data.to_csv('no_match_rows.csv', index=False, sep='\t')  # 保存没有匹配的行

3、同诗句不同典故合并

In [10]:
data = pd.read_csv("processed_data.csv",sep = "	")

result = data.groupby('sentence').agg({
    'author': 'first',  # 保留第一条作者
    'title': 'first',   # 保留第一条标题
    'allusion': lambda x: ';'.join(x),  # 合并典故，以分号分隔
    'variation_number': 'sum',  # 求和典故变体数
    'allusion_index': lambda x: ';'.join(x)  # 合并索引，逗号分隔后再用分号分隔不同典故
}).reset_index()

def transform_allusion_index(row):
    # 拆分 allusion 和 allusion_index
    allusions = row['allusion'].split(';')
    allusion_indices = row['allusion_index'].split(';')
    
    # 组合为目标格式
    combined = []
    for allusion, indices in zip(allusions, allusion_indices):
        index_list = indices.strip('[]')  # 去掉索引的外部括号
        combined.append(f"[{index_list},{allusion}]")  # 按目标格式拼接

    return ';'.join(combined)

# 对 DataFrame 的每一行应用函数
result['transformed_allusion'] = result.apply(transform_allusion_index, axis=1)

def adjust_transformed_allusion(row):
    # 获取 transformed_allusion 的值
    transformed_allusion = row['transformed_allusion']
    allusions = row['allusion'].split(';')  # 获取所有典故名列表
    
    # 初始化变量
    adjusted = transformed_allusion
    current_allusion_index = 0  # 当前典故的索引
    
    # 遍历 transformed_allusion 并找到 ] 后的 ,
    i = 0
    while i < len(adjusted):
        if adjusted[i] == ']' and i + 1 < len(adjusted) and adjusted[i + 1] == ',':
            # 在 ] 前插入当前典故名
            current_allusion = allusions[current_allusion_index]
            adjusted = adjusted[:i] + f',{current_allusion}' + adjusted[i:]
            i += len(current_allusion) + 1  # 跳过插入的典故名长度
            
            # 如果当前典故名已经完全应用，切换到下一个典故
            current_allusion_index = min(current_allusion_index + 1, len(allusions) - 1)
        
        i += 1
    
    return adjusted

# 对 DataFrame 的每一行应用函数
result['transformed_allusion'] = result.apply(adjust_transformed_allusion, axis=1)


def replace_comma_after_bracket(row):
    return row['transformed_allusion'].replace('],', '];')

# 对 `transformed_allusion` 列应用替换函数
result['transformed_allusion'] = result.apply(replace_comma_after_bracket, axis=1)

result.to_csv('merged_data.csv', sep='\t', index=False, encoding='utf-8')

4、训练集测试集划分

In [12]:
from sklearn.model_selection import train_test_split
import os

In [15]:
def split_dataset(input_file, train_ratio=0.8, random_state=42):
    """
    将数据集分割为训练集和测试集
    
    Args:
        input_file: 输入文件路径 (final_data.csv)
        train_ratio: 训练集比例，默认0.8
        random_state: 随机种子，默认42
    """
    print(f"开始读取数据文件: {input_file}")
    
    # 读取CSV文件
    df = pd.read_csv(input_file, sep='\t')
    print(f"总数据量: {len(df)}")
    
    # 获取包含典故的样本
    df_with_allusions = df[df['variation_number'] != '0']
    
    print(f"包含典故的样本数: {len(df_with_allusions)}")
    
    # 分别对包含典故和不包含典故的样本进行分割
    train_df, test_df = train_test_split(
        df_with_allusions,
        train_size=train_ratio,
        random_state=random_state,
        shuffle=True
    )
    
    # 打乱数据顺序
    train_df = train_df.sample(frac=1, random_state=random_state).reset_index(drop=True)
    test_df = test_df.sample(frac=1, random_state=random_state).reset_index(drop=True)
    
    # 保存文件
    train_file = 'train.csv'
    test_file = 'test.csv'
    
    train_df.to_csv(train_file, sep='\t', index=False)
    test_df.to_csv(test_file, sep='\t', index=False)
    
    print("\n数据集分割完成:")
    print(f"训练集大小: {len(train_df)}")
    print(f"测试集大小: {len(test_df)}")
    print(f"训练集保存至: {train_file}")
    print(f"测试集保存至: {test_file}")
    
    # 打印包含典故的样本统计
    print("\n包含典故的样本统计:")
    print(f"训练集中包含典故的样本数: {len(train_df[train_df['variation_number'] != '0'])}")
    print(f"测试集中包含典故的样本数: {len(test_df[test_df['variation_number'] != '0'])}")

if __name__ == "__main__":
        input_file = "merged_data.csv"
        split_dataset(input_file) 

开始读取数据文件: merged_data.csv
总数据量: 13393
包含典故的样本数: 13393

数据集分割完成:
训练集大小: 10714
测试集大小: 2679
训练集保存至: train.csv
测试集保存至: test.csv

包含典故的样本统计:
训练集中包含典故的样本数: 10714
测试集中包含典故的样本数: 2679
