# 安装依赖

conda create -n wiki python=3.10

conda activate wiki

pip install -r requirements.txt

# 提取数据

### wikiextractor的好处
- 自动识别并清除语法结构（引用、模板、文件、HTML 标签、表格、图像引用等），只保留正文内容
- 分段合理，保留文章结构，每篇文章会被解析为：title，text，并按段落分块，便于后续使用或精调。
- 支持大规模并行
- 多语言支持


In [None]:
!wikiextractor ../zhwiki-20250601-pages-articles-multistream.xml.bz2 -o extracted --json

# 启发式过滤

为了提升语料质量，我们在预处理后的文本上应用了一系列启发式规则，主要针对维基数据中的冗余、无效或格式不规范内容进行过滤和清洗。以下是具体策略：
1. 长度过滤(太短可能信息不足)
2. 重定向过滤
3. 语言转换（处理 Wikipedia 的多语言模板 -{zh-cn:...}-，以及利用opencc库繁体换简体）
4. 结构冗余清理（清除空括号 ()、（）、（，）等格式残留或错误）
5. 无意义标题过滤（删除标题/列表类短句：若该行为标题式内容（字数≤ 15 不构成完整句子），认为是结构噪声，删除该行）
6. 英文比例过滤（对于英文字符明显多于中文字符的句子，可直接跳过，以保留中文主干内容）


In [None]:
# clean_and_convert.py
import os
import json
import jsonlines
import re
from tqdm import tqdm
from opencc import OpenCC

# Shared OpenCC converter built with the 't2s' (Traditional -> Simplified Chinese) config.
cc = OpenCC('t2s')

def is_valid_text(text, min_len=200, max_len=8000):
    """Return True when *text* should be kept in the corpus.

    A text is rejected when its whitespace-stripped length lies outside
    ``[min_len, max_len]`` or when it contains a redirect marker
    (``#REDIRECT`` in any letter case, or ``#重定向``).

    Args:
        text: Candidate article text.
        min_len: Smallest accepted stripped length (inclusive).
        max_len: Largest accepted stripped length (inclusive).

    Returns:
        bool: True if the text passes both the length and redirect filters.
    """
    stripped_len = len(text.strip())
    if not min_len <= stripped_len <= max_len:
        return False
    # MediaWiki accepts the redirect keyword in any letter case, so compare
    # case-insensitively instead of only matching the upper-case spelling.
    if '#redirect' in text.lower() or '#重定向' in text:
        return False
    return True

def clean_text(text):
    """Collapse repeated newlines, trim, and convert Traditional to Simplified Chinese.

    The markup-stripping passes listed below are disabled; only newline
    normalisation and the OpenCC conversion are applied.
    """
    # Disabled passes, kept for reference:
    # text = re.sub(r'\[\[Category:[^\]]+\]\]', '', text)  # drop category links
    # text = re.sub(r'<ref[^<]*</ref>', '', text)          # drop <ref> citations
    # text = re.sub(r'{{[^{}]+}}', '', text)               # drop templates
    # text = re.sub(r'==+[^=]+==+', '', text)              # drop section headings
    collapsed = re.sub(r'\n+', '\n', text).strip()  # squeeze runs of newlines into one
    return cc.convert(collapsed)


def remove_bad_parentheses(text):
    """Strip short parenthetical asides that look like foreign-name noise.

    Every bracket pair (ASCII or full-width) enclosing at most 30 characters
    is inspected. The whole pair is removed when its content

    * is more than 70% ASCII letters and contains no CJK ideograph, or
    * is more than 40% non-word characters / underscores, or
    * contains one of the alias / language keywords listed below.

    Anything else, including empty brackets, is left untouched.
    """
    # Keywords that mark a parenthetical as a name or language annotation.
    noise_markers = (
        '学名', '拉丁', '英文', '英语', '德语', 'Latin', '名称',
        '旧称', '译名', '港译', '又译', '日语', '日文',
    )

    def is_noise(inner):
        # Use 1 as the denominator for empty content so the ratios stay 0.
        length = len(inner) or 1
        latin_count = len(re.findall(r'[A-Za-z]', inner))
        has_chinese = re.search(r'[\u4e00-\u9fff]', inner) is not None
        symbol_count = len(re.findall(r'[\W_]', inner))

        return (
            (latin_count / length > 0.7 and not has_chinese)
            or symbol_count / length > 0.4
            or any(marker in inner for marker in noise_markers)
        )

    def replace(match):
        # Only one alternative of the pattern matches, so at most one group is set.
        inner = match.group(1) or match.group(2) or ''
        return '' if is_noise(inner) else match.group(0)

    # First alternative: full-width pair; second: any mix of ASCII/full-width brackets.
    return re.sub(r'（([^（）]{0,30})）|[(（]([^()（）]{0,30})[)）]', replace, text)

def clean_text_2(text: str) -> str:
    """Second cleaning pass: language-variant markup, noisy brackets, short lines.

    Steps, in order:
      1. Replace ``-{...}-`` variant markup with its ``zh-cn`` alternative,
         then with its ``zh-hans`` alternative.
      2. Remove noisy parentheticals via ``remove_bad_parentheses``, then
         language-labelled brackets, brackets whose content starts with
         punctuation, and empty / comma-only brackets.
      3. Drop every line whose stripped length is 15 characters or fewer
         (this includes blank lines), then trim the result.
    """
    # Step 1: -{ ... zh-cn:X; ... }- becomes X (likewise for zh-hans).
    variant_patterns = (
        r'-\{[^{}]*?zh-cn:([^;{}]+?)(;[^{}]*)?\}-',
        r'-\{[^{}]*?zh-hans:([^;{}]+?)(;[^{}]*)?\}-',
    )
    result = text
    for pattern in variant_patterns:
        result = re.sub(pattern, r'\1', result)

    # Step 2a: heuristic removal of Latin / symbol-heavy parentheticals.
    result = remove_bad_parentheses(result)

    # Step 2b: the remaining bracket clean-ups, applied in this order.
    bracket_patterns = (
        # Language-labelled content such as "(German: xxx)" written in Chinese.
        r'[（(](德语|英语|法语|俄语|日语|韩语|西班牙语|拉丁语|荷兰语|捷克语|土耳其语|意大利语|葡萄牙语|匈牙利语|芬兰语|乌克兰语|保加利亚语|希腊语|瑞典语|丹麦语|挪威语|罗马尼亚语|斯洛文尼亚语|爱尔兰语|波兰语|阿拉伯语|希伯来语|世界语)[：:][^）)]{1,50}[）)]',
        # Content that starts with a punctuation mark or symbol.
        r'[（(][，、。、：:；;\s\'"‘’“”`~^!@#$%^&*，.?!\-+=<>…·╯￣_＝×]{1}[^（）()]{0,50}[）)]',
        # Empty or comma-only ASCII brackets.
        r'\((\s*[，、,]?\s*)\)',
        # Empty or comma-only full-width brackets.
        r'（\s*[，、,]?\s*）',
    )
    for pattern in bracket_patterns:
        result = re.sub(pattern, '', result)

    # Step 3: keep only lines longer than 15 characters after stripping.
    kept_lines = [row for row in result.splitlines() if len(row.strip()) > 15]
    return "\n".join(kept_lines).strip()


In [None]:
import re

def is_mostly_english(line: str, threshold: float = 0.7) -> bool:
    """Tell whether ASCII letters dominate *line*.

    Returns False for a blank line. Otherwise returns True when the number of
    ASCII letters is more than twice the number of CJK ideographs, or when
    ASCII letters make up more than ``threshold`` of the stripped length.
    """
    visible_len = len(line.strip())
    if not visible_len:
        return False

    latin = sum(1 for _ in re.finditer(r'[A-Za-z]', line))
    cjk = sum(1 for _ in re.finditer(r'[\u4e00-\u9fff]', line))
    latin_share = latin / (visible_len + 1e-5)

    return latin > cjk * 2 or latin_share > threshold


### 过滤可视化

In [None]:
import os
import json

def inspect_cleaning(input_dir, max_docs=5, preview_chars=300):
    """Print raw-vs-cleaned previews for the first ``max_docs`` records.

    Walks ``input_dir`` recursively, parses each line of each file as a JSON
    object, runs ``clean_text`` followed by ``clean_text_2`` on its ``text``
    field and prints the leading ``preview_chars`` characters of both
    versions side by side.

    Args:
        input_dir: Directory containing the extracted JSON Lines files.
        max_docs: Number of records to display before returning.
        preview_chars: Number of leading characters shown for each text.
    """
    shown = 0

    for dirpath, dirnames, filenames in os.walk(input_dir):
        # list.sort() sorts in place and returns None, so sort first and
        # print the list afterwards. Sorting dirnames in place also fixes the
        # order in which os.walk descends into sub-directories.
        dirnames.sort()
        filenames.sort()
        print(filenames)
        for fname in filenames:
            if shown >= max_docs:
                return
            path = os.path.join(dirpath, fname)

            with open(path, 'r', encoding='utf-8') as f:
                for line in f:
                    if shown >= max_docs:
                        return
                    data = json.loads(line)
                    raw_text = data['text']
                    title = data['title']
                    doc_id = data['id']  # avoid shadowing the builtin id()
                    cleaned_text = clean_text_2(clean_text(raw_text))

                    print("=" * 80)
                    print(f"📄 文件名: {fname}")
                    print(title, doc_id)
                    # Slice so the output matches the "first N characters" label.
                    print(f"🧾 原始文本前 {preview_chars} 字:\n{raw_text[:preview_chars]}")
                    print("-" * 80)
                    print(f"✅ 清洗后文本前 {preview_chars} 字:\n{cleaned_text[:preview_chars]}")
                    print("=" * 80)

                    shown += 1


In [None]:
# Preview the cleaning result on the first five records under extracted/AA/.
inspect_cleaning(input_dir="extracted/AA/", max_docs=5)


### 保存jsonl文件

In [None]:
def processing_and_save(input_dir, output_jsonl_path, max_docs=1000):
    """Clean extracted wiki records and write the accepted ones as JSON Lines.

    Walks ``input_dir`` recursively, parses every line of every file as a
    JSON object, runs ``clean_text`` and ``clean_text_2`` on its ``text``
    field, and writes the records that pass ``is_valid_text`` to
    ``output_jsonl_path``. Only written records count towards ``max_docs``.

    NOTE: a later cell in this file defines another ``processing_and_save``
    with a different signature; once that cell runs it replaces this one.

    Args:
        input_dir: Root directory of the extracted JSON Lines files.
        output_jsonl_path: Destination path for the cleaned records.
        max_docs: Maximum number of cleaned records to write.
    """
    written = 0
    with jsonlines.open(output_jsonl_path, 'w') as writer:
        for dirpath, dirnames, filenames in os.walk(input_dir):
            # Top-down os.walk descends into sub-directories in the order left
            # in ``dirnames``; sorting it in place (together with the file
            # names) makes the sampled subset reproducible across runs.
            dirnames.sort()
            filenames.sort()
            for fname in filenames:
                if written >= max_docs:
                    return
                path = os.path.join(dirpath, fname)
                with open(path, 'r', encoding='utf-8') as f:
                    for line in f:
                        if written >= max_docs:
                            return
                        data = json.loads(line)
                        raw_text = data['text']
                        title = cc.convert(data.get('title', ''))
                        id_ = data.get('id', '')

                        cleaned = clean_text_2(clean_text(raw_text))
                        if not is_valid_text(cleaned):
                            continue

                        writer.write({
                            "text": cleaned.strip(),
                            'meta': {
                                "id": id_,
                                "source": "zhwiki",
                                'language': 'zh-cn',
                                "title": title,
                            },
                        })
                        written += 1

In [None]:
# Save both the raw and the cleaned text so the two can be compared later.
cc = OpenCC('t2s')  # Traditional -> Simplified conversion

def processing_and_save(input_dir, output_jsonl_path, output_jsonl_path2, max_docs=1000):
    """Write raw records and their cleaned counterparts to two JSON Lines files.

    Walks ``input_dir`` recursively and parses every line of every file as a
    JSON object. Each record is written unmodified (apart from stripping) to
    ``output_jsonl_path2``; its cleaned version is written to
    ``output_jsonl_path`` only when it passes ``is_valid_text``.

    ``max_docs`` counts raw records, so the cleaned file may contain fewer
    entries than the raw file.

    Args:
        input_dir: Root directory of the extracted JSON Lines files.
        output_jsonl_path: Destination path for the cleaned records.
        output_jsonl_path2: Destination path for the raw records.
        max_docs: Maximum number of raw records to process.
    """
    processed = 0
    with jsonlines.open(output_jsonl_path, 'w') as cleaned_writer, \
         jsonlines.open(output_jsonl_path2, 'w') as raw_writer:

        for dirpath, dirnames, filenames in os.walk(input_dir):
            # Top-down os.walk descends into sub-directories in the order left
            # in ``dirnames``; sorting it in place (together with the file
            # names) makes the sampled subset reproducible across runs.
            dirnames.sort()
            filenames.sort()
            for fname in filenames:
                if processed >= max_docs:
                    return
                path = os.path.join(dirpath, fname)
                with open(path, 'r', encoding='utf-8') as f:
                    for line in f:
                        if processed >= max_docs:
                            return
                        data = json.loads(line)
                        raw_text = data['text']
                        # Both outputs carry the same metadata; build it once.
                        meta = {
                            "id": data.get('id', ''),
                            "source": "zhwiki",
                            "language": "zh-cn",
                            "title": cc.convert(data.get('title', '')),
                        }

                        # 1. Always keep the raw record; it alone drives the counter.
                        raw_writer.write({"text": raw_text.strip(), "meta": meta})
                        processed += 1

                        # 2. Keep the cleaned record only if it passes the filters.
                        cleaned = clean_text_2(clean_text(raw_text))
                        if not is_valid_text(cleaned):
                            continue

                        cleaned_writer.write({"text": cleaned.strip(), "meta": meta})

In [None]:
# Number of raw records to sample; it is also embedded in both output file names.
n = 20000
processing_and_save(
    input_dir='extracted',
    output_jsonl_path=f'cleaned_samples_{n}.jsonl',
    output_jsonl_path2=f'raw_samples_{n}.jsonl',
    max_docs=n,
)

In [None]:
#绘图分析处理前后内容长度
import json
import matplotlib.pyplot as plt

# File paths of the cleaned and raw sample files (n is the sample size set earlier).
cleaned_path = f"cleaned_samples_{n}.jsonl"
raw_path = f"raw_samples_{n}.jsonl"

# Measure the character length of every record's "text" field.
def get_text_lengths(filepath):
    """Return the ``text`` length of each JSON Lines record in *filepath*.

    A record without a ``text`` key contributes a length of 0.
    """
    with open(filepath, 'r', encoding='utf-8') as handle:
        return [len(json.loads(row).get("text", "")) for row in handle]

# Length lists for the cleaned and raw samples, computed in that order.
cleaned_lengths, raw_lengths = map(get_text_lengths, (cleaned_path, raw_path))

def plot_trimmed_hist(data, label, bins=40, logy=False, q_range=(0.0, 0.99)):
    """Draw a density histogram of *data* restricted to a quantile window.

    Args:
        data: Sequence of numeric values.
        label: Legend label for the histogram.
        bins: Number of histogram bins.
        logy: When True, switch the current axes to a logarithmic y-scale.
        q_range: ``(low, high)`` quantiles; values outside the corresponding
            value range are not plotted.
    """
    # Local import: this cell uses numpy without importing it itself.
    import numpy as np

    values = np.asarray(data)
    if values.size == 0:
        return  # nothing to plot; quantiles of an empty sample are undefined

    # 1. Value bounds that correspond to the requested quantiles.
    lower, upper = np.quantile(values, q_range)

    # 2. Keep only the values inside those bounds.
    trimmed = values[(values >= lower) & (values <= upper)]

    # 3. Plot on the current pyplot axes.
    plt.hist(trimmed, bins=bins, density=True, alpha=0.6, label=label)
    if logy:
        plt.yscale('log')

# Plot the raw and cleaned length distributions on shared bin edges.
# Without an explicit range each hist() call derives its own 400 bins from its
# own data range, so the two sets of bars would have different widths and
# their counts could not be compared.
length_range = (0, 5000)
plt.figure(figsize=(8, 6))
plt.hist(raw_lengths, bins=400, range=length_range, alpha=0.6, label="Raw", color='blue', edgecolor='black')
plt.hist(cleaned_lengths, bins=400, range=length_range, alpha=0.6, label="Cleaned", color='green', edgecolor='black')
plt.xlabel("Text Length")
plt.xlim(*length_range)
plt.yscale('log')
plt.ylabel("Count")
plt.title("Distribution of Text Lengths: Raw vs Cleaned")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()


In [None]:
# 使用标准 json 读取 jsonl 文件替代 jsonlines
import json
import matplotlib.pyplot as plt
import numpy as np
import re
from collections import Counter

# Path of the cleaned sample file
cleaned_path = f"cleaned_samples_{n}.jsonl"

# Per-document statistics; each list receives one entry per record
content_lengths = []
line_numbers = []
token_lengths = []
non_alpha_fractions = []
unique_words_fractions = []
mean_word_lengths = []
sentence_numbers = []
stop_word_fractions = []
symbol_to_word_ratios = []

# Minimal Chinese stop-word set (extend as needed). Every character of the
# string is one stop word: set(str) splits it into characters, whereas
# str.split() on a string without whitespace yields a single multi-character
# entry that can never equal a one-character token.
stop_words = set("的了是在和是也就都而及与")

# Sentence boundary pattern
sentence_splitter = re.compile(r'[。！？!?；;]+')

# Compute the per-document statistics. Tokens and "words" are both
# approximated by single characters.
with open(cleaned_path, 'r', encoding='utf-8') as f:
    for line in f:
        text = json.loads(line)["text"]
        total = len(text)
        if total == 0:
            # Every ratio below divides by the character count; skip empty
            # records instead of raising ZeroDivisionError.
            continue

        content_lengths.append(total)
        line_numbers.append(len(text.splitlines()))
        token_lengths.append(total)

        num_alpha = sum(c.isalpha() for c in text)
        non_alpha_fractions.append((total - num_alpha) / total)

        unique_words_fractions.append(len(set(text)) / total)

        # Rough estimate: 1 for code points above 255, 2 for everything else.
        mean_word_lengths.append(np.mean([1 if ord(ch) > 255 else 2 for ch in text]))

        sentences = sentence_splitter.split(text)
        sentence_numbers.append(sum(1 for s in sentences if s.strip()))

        stop_word_fractions.append(sum(ch in stop_words for ch in text) / total)

        symbol_count = sum(not c.isalnum() for c in text)
        symbol_to_word_ratios.append(symbol_count / total)

# Visualization: a 2x3 grid of axes, flattened so panels can be indexed 0..5.
fig, axes = plt.subplots(2, 3, figsize=(12, 7))
axes = axes.flatten()
# (values, panel title) pairs in plotting order.
metrics = [
    (content_lengths, '(a) Content Length'),
    (line_numbers, '(b) Line Number'),
    (sentence_numbers, '(g) Sentence Number'),
    (non_alpha_fractions, '(d) Non-alpha Fraction'),
    (unique_words_fractions, '(e) Unique Words Fraction'),
    (symbol_to_word_ratios, '(i) Symbol to Word Ratio')
    
]

def plot_trimmed_hist(ax, data, label, bins=40, logy=False, q_range=(0.0, 0.95)):
    """Draw a density histogram of *data* on *ax*, limited to a quantile window.

    Values outside the range spanned by the ``q_range`` quantiles are left
    out. The axes title is set to *label*, and the y-axis becomes logarithmic
    when *logy* is true.
    """
    # Value bounds for the requested quantiles.
    lo, hi = np.quantile(data, q_range)

    # Keep only the observations inside the window.
    in_window = [value for value in data if lo <= value <= hi]

    ax.hist(in_window, bins=bins, density=True)
    ax.set_title(label)
    if not logy:
        return
    ax.set_yscale('log')

# The first three panels (content length, line number, sentence number) are
# quantile-trimmed and log-scaled; the remaining panels are plain density
# histograms. Each panel plots its own data under its own title.
for i, (data, label) in enumerate(metrics):
    if i < 3:
        plot_trimmed_hist(axes[i], data, label, logy=True)
    else:
        axes[i].hist(data, bins=40, density=True)
        axes[i].set_title(label)

plt.tight_layout()
plt.show()
