# MateConv mini数据预处理

- Step 1.导入库

In [1]:
import itertools
import re
import json
import jsonlines
import psutil
import ujson
import numpy as np
import pandas as pd
from transformers import AutoTokenizer
from datasets import load_dataset
import os
from tqdm import tqdm

  from .autonotebook import tqdm as notebook_tqdm


- Step 2.定义BOS和EOS标记，并加载分词器

In [2]:
# 定义BOS和EOS标记
bos_token = "<s>"
eos_token = "</s>"

In [3]:
# 加载训练好的分词器路径
tokenizer = AutoTokenizer.from_pretrained('/root/autodl-tmp/MateGenConv/model/mateconv_tokenizer', use_fast=False)
print(f'加载的tokenizer词表大小: {len(tokenizer)}')

加载的tokenizer词表大小: 6400


- Step 3.读取部分数据

In [4]:
def preview_dataset(file_path, num_lines=5):
    """
    读取并展示数据集的前 num_lines 行
    """
    # 检查文件是否存在
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"{file_path} 文件不存在，请检查路径！")

    # 逐行读取并展示前 num_lines 行
    with jsonlines.open(file_path) as reader:
        for idx, obj in enumerate(reader):
            print(f"第 {idx + 1} 行数据: {obj}")
            if idx + 1 >= num_lines:
                break

# 指定文件路径和需要展示的行数
file_path = './dataset/Data/seq-monkey/mobvoi_seq_monkey_general_open_corpus.jsonl'
preview_dataset(file_path, num_lines=5)

第 1 行数据: {'text': '在查处虚开增值税专用发票案件中，常常涉及进项留抵税额和税款损失的认定和处理。在计算税款损失时，要不要将进项留抵税额包括在内？\n对此，实务中存在意见分歧。\n有人主张归并，即计算税款损失时包括进项留抵税额；\n有人主张剥离，即计算税款损失时剔除进项留抵税额。分析这个问题，需要确定进项留抵税额与税款损失之间是什么关系。\n理清这二者之间的关系，首先需要了解增值税的概念和其抵扣机制。增值税是以商品（货物、服务等）在流转过程中产生的增值额作为计税依据而征收的一种流转税。为避免重复征税，在增值税中存在抵扣链条机制。\n一般而言，交易上游企业缴纳的税额，交易下游企业可以对相应的税额进行抵扣。\n对增值税一般纳税人来说，其购进货物、服务等取得增值税专用发票，发票上的税额是进项税额。\n其出售货物、服务等，向购买方开具增值税专用发票，发票的税额是销项税额。\n一般情况下，销项税额减去进项税额的金额是应纳税额，企业根据应纳税额按期申报纳税。\n其次需要了解进项留抵税额的概念及产生原因。\n在计算销项税额和进项税额的差额时，有时会出现负数，即当期进项税额大于当期销项税额。这个差额在当期未实现抵扣，为进项留抵税额，在以后纳税人有销项税额时再进行抵扣。\n企业产生进项留抵税额的主要原因是其进项税额和销项税额时间上的不一致。\n例如，企业前期集中采购货物和服务，投资大，销项税率低于进项税率等。\n从税款抵扣的角度看，进项留抵税额只是购进的这部分进项税额参与到增值税应纳税额的计算过程中，但是其对应的进项税额抵扣还未真正实现，一般要等到其未来有相应的销项税额时，才能真正实现进项税额抵扣。\n可见，进项留抵税额处于不确定状态，能否抵扣受到很多因素影响，例如企业经营中断，没有销项税额，这时进项留抵税额就无法实现抵扣。但如果企业按照税收政策规定申请进项留抵退税，进项税额抵扣就随之实现。\n最后需要了解税款损失的概念。\n税款损失，通常是指因虚开增值税专用发票，导致国家税款被骗或者流失的金额。关于税款损失，实务中有多种表述。\n例如，北京大学法学院教授陈兴良曾谈到虚开行为本身不会造成国家税款损失，只有利用发票抵扣时才会造成国家税款损失。刘兵等编著的《虚开增值税专用发票案例司法观点和案例解析》一书中提到：“给国家税款造成损失的数额，实际上就是被骗取的国家税款在侦查终

- Step 4.统计与清理数据

In [5]:
def get_total_lines(file_path):
    """
    获取 JSONL 文件的总行数，不忽略错误，保证能够全面统计。
    """
    with open(file_path, 'rb') as f:  # 使用二进制模式避免编码问题
        return sum(1 for _ in f)

In [6]:
def check_jsonl_format(file_path):
    """
    检查 JSONL 文件中的每一行是否是有效的 JSON 格式，带进度显示，并统计所有有问题的行。
    """
    total_lines = get_total_lines(file_path)  # 获取文件总行数
    valid_lines = 0
    invalid_lines = 0

    # 使用逐行读取，捕获 JSON 和编码错误
    with open(file_path, 'rb') as f:  # 使用二进制读取避免编码问题
        # 使用 tqdm 进度条显示检查进度
        for idx, line in tqdm(enumerate(f), total=total_lines, desc="Checking JSONL format"):
            try:
                # 先尝试将每行数据解码为 UTF-8
                decoded_line = line.decode('utf-8')
                # 然后检查是否是有效的 JSON 格式
                obj = jsonlines.Reader([decoded_line]).read()
                valid_lines += 1
            except UnicodeDecodeError as e:
                print(f"Encoding error at line {idx + 1}: {e}")
                invalid_lines += 1
            except jsonlines.InvalidLineError as e:
                print(f"Invalid JSON at line {idx + 1}: {e}")
                invalid_lines += 1

    print(f"检查完成，文件中共有 {valid_lines} 行有效的 JSON 数据，{invalid_lines} 行无效的 JSON 数据。")
    return valid_lines, invalid_lines

In [7]:
valid_lines, invalid_lines = check_jsonl_format(file_path)

Checking JSONL format: 100%|██████████| 13000000/13000000 [02:29<00:00, 86999.20it/s]

检查完成，文件中共有 13000000 行有效的 JSON 数据，0 行无效的 JSON 数据。





In [9]:
def remove_invalid_line(file_path, output_path, invalid_line_num):
    """
    读取文件，跳过指定的无效行，并将结果写入新文件
    """
    with open(file_path, 'rb') as infile, open(output_path, 'wb') as outfile:
        for idx, line in enumerate(infile):
            if idx + 1 != invalid_line_num:  # 跳过无效行
                outfile.write(line)

# 使用该函数删除第 9598787 行并保存为新文件, 第二次开始就不会有无效行，不需要调用
# remove_invalid_line('./dataset/Data/seq-monkey/mobvoi_seq_monkey_general_open_corpus.jsonl',
#                     './dataset/Data/seq-monkey/mobvoi_seq_monkey_general_open_corpus.jsonl', 
#                     invalid_line_num=9598787)

- Step 5.定义处理函数（逐块处理数据）

In [16]:
def process_seq_monkey(chunk_size=50000):
    """
    逐块读取 mobvoi_seq_monkey_general_open_corpus.jsonl 文件，
    对文本进行分词，并将分词结果保存为二进制文件，支持跳过无效行，并显示处理进度。
    """
    doc_ids = []
    chunk_idx = 0
    total_lines = 0

    # 先计算总行数以便显示进度
    # 注意调整成你自己的目录
    with open('./dataset/Data/seq-monkey/mobvoi_seq_monkey_general_open_corpus.jsonl', 'r', encoding='utf-8') as f:
        total_lines = sum(1 for _ in f)

    # 打开jsonlines文件逐行读取
    with jsonlines.open('./dataset/Data/seq-monkey/mobvoi_seq_monkey_general_open_corpus.jsonl') as reader:
        # 使用 tqdm 进度条显示进度
        with tqdm(total=total_lines, desc="Processing lines") as pbar:
            while True:
                try:
                    # 使用 itertools.islice 按块读取文件，每次读取 chunk_size 行数据
                    chunk = list(itertools.islice(reader, chunk_size))
                except jsonlines.InvalidLineError as e:
                    print(f"Skipping invalid chunk at chunk {chunk_idx}: {e}")
                    continue

                if not chunk:  # 如果读取到文件末尾，则停止
                    break

                # 遍历块中的每一行数据
                # 逐行对数据进行编码（按token进行编码）
                for idx, obj in enumerate(chunk):
                    try:
                        # 从每一行数据中提取'text'字段（即文本内容）
                        content = obj.get('text', '')
                        
                        # 跳过长度超过512的文本
                        if len(content) > 512:
                            continue

                        # 对文本进行分词，将其转为 token ids 序列，并加上BOS和EOS标记
                        text_id = tokenizer(f'{bos_token}{content}{eos_token}').data['input_ids']
                        
                        # 将分词结果添加到 doc_ids 列表中
                        doc_ids += text_id

                    except UnicodeDecodeError as e:
                        # 如果遇到编码错误，跳过该行，并打印错误信息
                        print(f"Skipping invalid line {chunk_idx * chunk_size + idx + 1}: {e}")
                        continue

                # 每处理完一块数据，更新 chunk_idx 并打印进度信息
                chunk_idx += 1
                pbar.update(len(chunk))  # 更新进度条

                # 如果累积的 token ids 超过 1,000,000 个，保存到文件中
                if len(doc_ids) > 1000000:
                    arr = np.array(doc_ids, dtype=np.uint16)
                    with open(f'./dataset/Data/seq-monkey/clean_seq_monkey.bin', 'ab') as f:
                        f.write(arr.tobytes())
                    doc_ids = []

    # 如果处理完所有数据后 doc_ids 中还有未保存的内容，最后再保存一次
    if doc_ids:
        arr = np.array(doc_ids, dtype=np.uint16)
        with open(f'./dataset/Data/seq-monkey/clean_seq_monkey.bin', 'ab') as f:
            f.write(arr.tobytes())

In [17]:
def pretrain_process():
    """
    函数的作用是调用 process_seq_monkey() 函数生成数据，
    然后整合所有生成的二进制文件，并将其合并保存为一个总的预训练数据文件。
    """
    # 调用 process_seq_monkey 函数处理数据
    process_seq_monkey()

    # 数据文件路径列表，目前只处理 clean_seq_monkey.bin 文件
    data_path_list = [
        './dataset/Data/seq-monkey/clean_seq_monkey.bin'
    ]
    
    data_lst = []
    
    # 读取生成的二进制文件
    for data_path in data_path_list:
        with open(data_path, 'rb') as f:
            # 将二进制文件中的内容加载到 numpy 数组中
            data = np.fromfile(f, dtype=np.uint16)
            data_lst.append(data)

    # 将所有读取到的数据合并为一个大数组
    arr = np.concatenate(data_lst)
    print(f"合并后的数据大小: {arr.shape}")

    # 将合并后的数据保存为最终的预训练数据文件
    with open('./dataset/Data/seq-monkey/pretrain_data.bin', 'wb') as f:
        f.write(arr.tobytes())

- 运行数据处理

【TIME WARNING：45分钟左右】

In [18]:
pretrain_process()

Processing lines: 100%|██████████| 13000000/13000000 [39:51<00:00, 5436.78it/s]


合并后的数据大小: (1510396873,)


运行结束后会创建一个名为pretrain_data.bin的二进制数据文件，该文件也就是接下来进行模型预训练的文件：

检查是否存在tokenizer长度、unk覆盖率高于1%/5%，以及长度截断问题（trunc_rate>30%)

In [1]:
import numpy as np
arr = np.memmap("./dataset/Data/seq-monkey/pretrain_data.bin", dtype=np.uint16, mode="r")
print("max_token_id_in_bin =", int(arr.max()))  # 必须 < 6400

max_token_id_in_bin = 6399


In [2]:
from transformers import AutoTokenizer

# 修改为你的 tokenizer 路径或名称
tokenizer = AutoTokenizer.from_pretrained("./model/mateconv_tokenizer")

print("vocab_size =", tokenizer.vocab_size)
print("special_tokens =", tokenizer.special_tokens_map)
print("unk_token_id =", tokenizer.unk_token_id)

  from .autonotebook import tqdm as notebook_tqdm


vocab_size = 6400
special_tokens = {'bos_token': '<s>', 'eos_token': '</s>', 'unk_token': '<unk>'}
unk_token_id = 0


In [6]:
import os, json, random, glob

def read_jsonl(path, max_lines=200000):
    """安全读取 jsonl（容错），最多读 max_lines 行避免大文件过慢。"""
    data = []
    with open(path, "r", encoding="utf-8", errors="ignore") as f:
        for i, line in enumerate(f):
            if i >= max_lines: break
            line = line.strip()
            if not line: continue
            try:
                data.append(json.loads(line))
            except Exception:
                continue
    return data

def record_to_text(rec):
    """
    尝试从一条样本中生成用于分词覆盖度检查的原始文本。
    兼容几种常见 schema：
      1) {"text": "..."}
      2) {"instruction": "...", "input": "...", "output": "..."}
      3) {"prompt": "...", "response": "..."}
      4) {"messages":[{"role":"user","content":"..."},{"role":"assistant","content":"..."}]}
    """
    # 1) 纯 text
    if isinstance(rec, dict) and "text" in rec and isinstance(rec["text"], str):
        return rec["text"]

    # 2) 指令数据（instruction / input / output）
    ins = rec.get("instruction") if isinstance(rec, dict) else None
    inp = rec.get("input") if isinstance(rec, dict) else None
    out = rec.get("output") if isinstance(rec, dict) else None
    if any([ins, inp, out]):
        parts = []
        if ins: parts.append(f"指令：{ins}")
        if inp: parts.append(f"输入：{inp}")
        if out: parts.append(f"回答：{out}")
        return "\n".join(parts)

    # 3) prompt/response
    prm = rec.get("prompt") if isinstance(rec, dict) else None
    rsp = rec.get("response") if isinstance(rec, dict) else None
    if prm or rsp:
        return f"用户：{prm or ''}\n助手：{rsp or ''}"

    # 4) chat messages
    msgs = rec.get("messages") if isinstance(rec, dict) else None
    if isinstance(msgs, list):
        lines = []
        for m in msgs:
            role = m.get("role", "")
            content = m.get("content", "")
            if isinstance(content, list):
                # 有些数据把 content 做成多段
                content = " ".join([c.get("text","") if isinstance(c, dict) else str(c) for c in content])
            lines.append(f"{role}：{content}")
        return "\n".join(lines)

    # 兜底：把所有字符串字段拼一拼
    if isinstance(rec, dict):
        texts = []
        for k, v in rec.items():
            if isinstance(v, str) and v.strip():
                texts.append(v.strip())
        if texts:
            return "\n".join(texts)

    return None

def gather_texts_from_dir(dir_or_file, sample_size=2000, file_glob="*.jsonl"):
    """从单个 jsonl 或目录里的多份 jsonl 抽样生成 your_text_list。"""
    paths = []
    if os.path.isdir(dir_or_file):
        paths = sorted(glob.glob(os.path.join(dir_or_file, file_glob)))
    else:
        paths = [dir_or_file]

    records = []
    for p in paths:
        recs = read_jsonl(p, max_lines=200000)  # 每个文件最多读取 20w 行做候选
        records.extend(recs)

    random.shuffle(records)
    texts = []
    for rec in records:
        t = record_to_text(rec)
        if t:
            # 简单清理一下超长文本，避免统计过慢
            if len(t) > 8000:
                t = t[:8000]
            texts.append(t)
            if len(texts) >= sample_size:
                break
    return texts

# === 修改这里为你的路径 ===
# 例1：目录里是一堆 *.jsonl
DATA_PATH = "./dataset/Data/seq-monkey/mobvoi_seq_monkey_general_open_corpus.jsonl"

your_text_list = gather_texts_from_dir(DATA_PATH, sample_size=2000, file_glob="*.jsonl")
print(f"Collected texts: {len(your_text_list)}")
print("Sample preview:\n", your_text_list[0][:200] if your_text_list else "EMPTY")


Collected texts: 2000
Sample preview:
 该项目的投运是郑州航空港区全面实现集中供暖的里程碑式跨越，将打破区域内无大型集中热源的困境，解决区内2500万平米用热问题。
航空港区集中供热项目一期总投资20.89亿元，包含供热长输管线、隔压换热站、区域内供热市政管网三大部分，项目建设范围跨越新密市、新郑市、航空港区三地六村镇，涉及各领域、各层级协调部门达50多个。建设期内克服疫情反复、扬尘管控、特大暴雨灾害等诸多不利因素，按期完成长输管线、隔


In [7]:
from datasets import load_dataset
unk_id = tokenizer.unk_token_id
def unk_rate(texts, n=2000):
    import numpy as np
    texts = texts[:n]
    ids = tokenizer(texts, truncation=True, max_length=2048)["input_ids"]
    tot = sum(len(x) for x in ids)
    unk = sum((np.array(x)==unk_id).sum() for x in ids)
    return unk/tot
print("UNK rate ≈", unk_rate(your_text_list))

UNK rate ≈ 0.0


In [8]:
def stats(texts, n=2000):
    enc = tokenizer(texts[:n], truncation=True, max_length=2048)
    lens = [len(x) for x in enc["input_ids"]]
    trunc = sum(l==2048 for l in lens)/len(lens)
    return sum(lens)/len(lens), trunc
avg_len, trunc_rate = stats(your_text_list)
print("avg toks/sample =", avg_len, "trunc_rate =", trunc_rate)

avg toks/sample = 744.8865 trunc_rate = 0.071


In [1]:
import os
import numpy as np
from math import floor

path = './dataset/Data/seq-monkey/pretrain_data.bin'  # 改为你的路径
size = os.path.getsize(path)
tokens = size // 2

# LMConfig-like params（改这里以匹配你的设置）
max_seq_len = 512
batch_size = 32
epochs = 1
n_routed_experts = 4
num_experts_per_tok = 2

sequences = tokens // max_seq_len
steps_per_epoch = sequences // batch_size
total_tokens = tokens * epochs
tokens_per_expert = total_tokens * (num_experts_per_tok / n_routed_experts)

print(f'bytes: {size}')
print(f'tokens  : {tokens:,}')
print(f'sequences(~{max_seq_len}): {sequences:,}')
print(f'steps/epoch (batch {batch_size}): {steps_per_epoch:,}')
print(f'total tokens (epochs={epochs}): {total_tokens:,}')
print(f'tokens per expert (n_experts={n_routed_experts}, num_per_tok={num_experts_per_tok}): {tokens_per_expert:,}')

bytes: 3020793746
tokens  : 1,510,396,873
sequences(~512): 2,949,993
steps/epoch (batch 32): 92,187
total tokens (epochs=1): 1,510,396,873
tokens per expert (n_experts=4, num_per_tok=2): 755,198,436.5
