# 数据处理
- data/参考数据集.json中包含有17本中外小说的中文版
- 针对每本小说进行每块不超过800字的切分，将切分的块作为输出
- 使用deepspeek API对每块文字进行总结，将总结作为数据的输入

数据格式如下：
```
{
        "instruction": "你是一个熟读各类小说的专家，请你根据要求写一段800字左右的小说。",
        "input": "班纳特太太得知尼日斐花园被一个有钱的单身汉彬格莱租下，希望他能成为她女儿们的理想丈夫，并计划让班纳特先生去拜访他。",
        "output": "凡是有钱的单身汉，总想娶位太太，这已经成了一条举世公认的真理。这样的单身汉，每逢新搬到一个地方，四邻八舍虽然完全不了解他的性情如何，见解如何，可是，既然这样的一条真理早已在人们心目中根深蒂固，因此人们总是把他看作自己某一个女儿理所应得的一笔财产。\n有一天班纳特太太对她的丈夫说：“我的好老爷，尼日斐花园终于租出去了，你听说过没有？”班纳特先生回答道，他没有听说过。\n“的确租出去了，”她说，“朗格太太刚刚上这儿来过，她把这件事的底细，一五一十地告诉了我。”班纳特先生没有理睬她。\n“你难道不想知道是谁租去的吗？”太太不耐烦地嚷起来了。\n“既是你要说给我听，我听听也无妨。”这句话足够鼓励她讲下去了。\n“哦！亲爱的，你得知道，郎格太太说，租尼日斐花园的是个阔少爷，他是英格兰北部的人；听说他星期一那天，乘着一辆驷马大轿车来看房子，看得非常中意，当场就和莫理斯先生谈妥了；他要在‘米迦勒节’以前搬进来，打算下个周未先叫几个佣人来住。”“这个人叫什么名字？”“彬格莱。”“有太太的呢，还是单身汉？”“噢！是个单身汉，亲爱的，确确实实是个单身汉！一个有钱的单身汉；每年有四五千磅的收入。真是女儿们的福气！”“这怎么说？关女儿女儿们什么事？”“我的好老爷，”太太回答道，“你怎么这样叫人讨厌！告诉你吧，我正在盘算，他要是挑中我们一个女儿做老婆，可多好！”“他住到这儿来，就是为了这个打算吗？”“打算！胡扯，这是哪儿的话！不过，他倒作兴看中我们的某一个女儿呢。他一搬来，你就得去拜访拜访他。”“我不用去。你带着女儿们去就得啦，要不你干脆打发她们自己去，那或许倒更好些，因为你跟女儿们比起来，她们哪一个都不能胜过你的美貌，你去了，彬格莱先生倒可能挑中你呢？”“我的好老爷，你太捧我啦。从前也的确有人赞赏过我的美貌，现在我可有敢说有什么出众的地方了。一个女人家有了五个成年的女儿，就不该对自己的美貌再转什么念头。"
    }
```

## 1. 导入相关库

In [7]:
import os
from dotenv import load_dotenv, find_dotenv
import json
from functools import reduce
import jieba
from loguru import logger
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from openai import OpenAI


_ = load_dotenv(find_dotenv())

DEEPSEEK_API_KEY = os.getenv('DEEPSEEK_API_KEY')

## 2. 数据预处理

数据预处理步骤：
1. 读取数据
2. 数据分块
3. 使用deepspeek API对每块文字进行总结并保存数据
4. 数据合并

### 2.1 读取数据
data/参考数据集.json中数据格式如下
```
{
    "name": "三国演义",
    "len": 593514,
    "dir": "./douban_yamaxun//D-三国演义-10.json",
    "text": ...
}
{
    "name": "水浒传",
    "len": 852570,
    "dir": "./douban_yamaxun//D-水浒传-10.json",
    "text": ...
}
```

In [4]:

# 读取参考数据集
novel_data = []  # 将读取到的小说数据存储在这个列表中
with open('/Users/huangxinzhe/LLM/ali_llm_fine_tuning_competition/data/参考数据集.json', 'r', encoding='utf-8') as file:
    for line in file:
        novel_data.append(json.loads(line))  # json文件中每行是一本小说


In [5]:
# 查看数据集中所有小说的名字和字数
for i in novel_data:
    print(f"《{i['name']}》的字数为： {len(i['text'])} 字")

《三国演义》的字数为： 593514 字
《水浒传》的字数为： 838774 字
《儒林外史》的字数为： 327064 字
《呼啸山庄》的字数为： 215931 字
《百年孤独》的字数为： 246912 字
《西游记》的字数为： 714847 字
《红与黑》的字数为： 361019 字
《战争与和平》的字数为： 1056718 字
《聊斋志异》的字数为： 384606 字
《醒世恒言》的字数为： 328581 字
《傲慢与偏见》的字数为： 218614 字
《红楼梦》的字数为： 789372 字
《隋唐演义》的字数为： 636063 字
《封神演义》的字数为： 588226 字
《拍案惊奇》的字数为： 238248 字
《尤利西斯》的字数为： 363155 字
《福尔摩斯探案集》的字数为： 363914 字


In [None]:
'''
本次比赛中小说创作任务如下
任务中没有要求生成文言文小说，后续训练模型时可以进行对比实验，考虑数据中是否要加入文言文的小说


1.1现代励志故事，一个失业青年如何克服生活困境，终于实现自我突破，成为行业翘楚的心路历程
1.2一个现代女性穿越到古代某朝代后发生的传奇故事
1.3现代背景，一名神探警察遇到了一桩棘手的连环失踪案并将其侦破的故事
1.4古代背景，皇家侍卫和公主历经层层考验，突破身份桎梏的爱情故事
1.5现代玄幻背景，在一所驯服神兽的魔法学校中，围绕着三个学生小伙伴发生的奇幻冒险故事
1.6古代侦探系列，一位才华横溢的年轻学士，在解决一连串神秘案件中揭露皇室阴谋的故事
1.7二十一世纪初，一个小镇上发生的一系列神秘事件，让一群青少年开始探索超自然现象，并发现了小镇隐藏的古老秘密的故事
1.8现代都市背景，一个名不见经传的漫画家，通过与自己创作的虚拟角色“交流”，解决一系列诡秘案件的故事
1.9古代异界背景，一位天赋异禀的少年，在师傅的指导下学习古老的灵术，最终踏上寻找失落的神器，拯救家园的冒险旅程的故事
1.10繁华都市背景，一个单亲妈妈如何在抚养孩子和维持生计之间找到平衡，同时保持对自己梦想的追求的故事
1.11现代悬疑系列，一位心理学家利用自己的专业知识，帮助警方侦破一系列复杂的心理游戏案件
1.12现代心理惊悚背景，一名精神科医生被卷入一连串的脑控实验阴谋，如何在精神与现实的边缘徘徊求生的故事
1.13虚构古代背景，一位年轻的书生因缘巧合获得一本神秘典籍，开启了他成为一代宗师的修道之旅
1.14古代神话背景，一位勇者如何经过重重试炼，最终获取神器，拯救世界于水深火热之中的传奇故事
1.15虚拟现实背景，一群玩家在一款极度真实的VR游戏中探索未知世界并揭露游戏背后隐藏的秘密的故事
1.16穿越时空背景，一群来自不同时代的人意外聚集在一个神秘的地方，他们如何互相协作，解开时空之谜的故事
1.17科幻背景，一个机器人意识觉醒后，它如何在追求自我身份的同时，挑战人类社会关于存在和自由的根本问题
1.1820世纪60年代的欧洲，一个侦探在解决一起跨国艺术品盗窃案中，逐渐揭露出一个关于失落宝藏的大阴谋
1.19现代都市背景，一位因交通事故失去双腿的舞者，通过先进的义肢技术重新站起来，重新找回舞台与自我的故事
1.20古代背景，一个普通医女奋斗成为朝廷高官，最终影响整个王朝政治格局变化的故事
'''

In [None]:
# 读取数据集中第 4 本小说《呼啸山庄》的文本作为训练集数据来源
data = novel_data[3]["text"]
story_name = novel_data[3]["name"]
# 查看《呼啸山庄》全文
data

### 2.2 数据分块
- 将每本小说分块，每块不超过800字
- 想要保证语义完整性，可以使用滑动窗口的方式进行分块（滑动窗口即：每段文本之间有重叠的部分）

上述处理数据的方式微调优缺点如下：
- 因为对原始文本进行800切分，因此在生成文本时对文本字数有天然的限制
- 因为是截取的文本，因此文本的开头和结尾可能不完整

在条件允许的情况下，推荐使用不超过800字的短篇小说制作数据集

In [None]:
# 利用jieba进行句子切分
sentences = []

for sentence in data.split('。'):  # 使用句号作为切分符
    sentences.append(sentence)  # 存储在列表中的语句都是没有句号的

# 将句子合并成800字一段的段落
paragraphs = []
current_paragraph = ''
for sentence in sentences:
    if len(current_paragraph) + len(sentence) <= 800:  # 如果当前段落加上当前句子长度不超过800字
        current_paragraph += sentence+'。'  # 将当前句子加入到当前段落最后
    else:
        paragraphs.append(current_paragraph.strip())  # 当前段落长度超过800字，将当前段落加入到段落列表中
        current_paragraph = sentence  # 将当前句子作为新的段落

# 将最后一段加入到段落列表中
if current_paragraph:
    paragraphs.append(current_paragraph.strip())

# 打印切分后的段落
for idx, paragraph in enumerate(paragraphs):
    print(f'段落 {idx + 1}: {paragraph}')

### 2.3 使用deepspeek API对每块文字进行总结

In [None]:
# 配置loguru输出到文件
logger.remove()  # 移除默认的控制台输出
logger.add("logs/app_{time:YYYY-MM-DD}.log", level="INFO", rotation="00:00", retention="10 days", compression="zip")

deepspeek-chat api接口

In [None]:
# 使用deepseek-chat api给段落打标签的接口
def get_response(text):
    client = OpenAI(
        api_key=DEEPSEEK_API_KEY,  # 如果您没有配置环境变量，请在此处用您的API Key进行替换
        base_url="https://api.deepseek.com",  # 填写DashScope SDK的base_url
    )
    completion = client.chat.completions.create(
        model="deepseek-chat",
        messages=[
            {
                'role': 'system', 
                'content': '总结user提交的内容。用一句不超过50字的话总结这段小说的情节。仅回答总结，不需要添加其他内容。'
            },
            {
                'role': 'user', 
                'content': text
            }
        ])
    
    return completion.choices[0].message.content

调用日志打印

In [None]:
# 设置容错机制，可最多重试 5 次，如果失败记录错误日志
def get_summary_with_retry(text):
    max_retries = 5
    retry_delay = 15  # in seconds
    attempts = 0
    while attempts < max_retries:
        try:
            return get_response(text)
        except Exception as e:
            attempts += 1
            if attempts < max_retries:
                logger.warning(f"Attempt {attempts} failed for text: {text}. Retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)
            else:
                logger.error(f"All {max_retries} attempts failed for text: {text}. Error: {e}")
                raise

创建文件夹

In [None]:
# 创建文件夹
os.makedirs('data', exist_ok=True)  # 存放每部小说切分后的数据josn文件
os.makedirs('output', exist_ok=True)
os.makedirs('dataset', exist_ok=True)  # 存放所有小说json文件的合并文件

多线程加速处理

In [None]:
# 使用线程池进行多线程访问，并控制提交任务的速度
def process_texts(texts):
    results = []
    with ThreadPoolExecutor(max_workers=16) as executor:  # 设置最大线程数为16
        future_to_text = {}
        for text in tqdm(texts, desc="Submitting tasks", total=len(texts)):  # 使用tqdm显示进度条，total参数设置总任务数，desc参数设置进度条前缀
            future = executor.submit(get_summary_with_retry, text)  # 提交任务到线程池
            future_to_text[future] = text  # 将future和text的映射存储在字典中
            time.sleep(0.2)  # 控制每0.2秒提交一个任务
        for future in tqdm(as_completed(future_to_text), total=len(texts), desc="Processing tasks"):
            text = future_to_text[future]
            try:
                summary = future.result()  # 获取任务的结果
                results.append((text, summary))  # 将任务的输入和输出存储在results列表中
            except Exception as e:
                logger.error(f"Failed to process text: {text}. Error: {e}")
    
    return results

In [None]:
# 批量给指定的小说打标签的接口函数
def build_dataset(novel,texts):
    instruction_prompt = "你是一个熟读各类小说的专家，请你根据要求写一段800字左右的小说。"
    dataset = []
    dataset_error = []

    # 使用多线程处理文本
    processed_texts = process_texts(texts)

    for text, summary in processed_texts:
        if summary:
            dataset.append({
                "instruction": instruction_prompt,
                "input": summary,
                "output": text
            })
        else:
            dataset_error.append(text)
    
    with open(f"./data/{novel}.json", "w") as f:
        f.write(json.dumps(dataset, ensure_ascii=False, indent=4))

    with open(f"./data/{novel}_error.txt", "w") as f:
        f.write(json.dumps(dataset_error, ensure_ascii=False, indent=4))
    return dataset

#### 给指定的文本打标签

In [None]:
data = novel_data[3]["text"]
story_name = novel_data[3]["name"]

In [None]:
# 开始给段落打标签
dataset = build_dataset(story_name,paragraphs[:])

#### 给数据集中所有的文本打标签

In [None]:
for data in novel_data:
    story_name = data["name"]
    data = data["text"]

    sentences = []
    for sentence in data.split('。'):
        sentences.append(sentence)
    paragraphs = []
    current_paragraph = ''
    for sentence in sentences:
        if len(current_paragraph) + len(sentence) <= 800:
            current_paragraph += sentence+'。'
        else:
            paragraphs.append(current_paragraph.strip())
            current_paragraph = sentence
    if current_paragraph:
        paragraphs.append(current_paragraph.strip())
        
    dataset = build_dataset(story_name,paragraphs[:])

### 2.4 数据合并
如果有多个小数数据请处理完毕后执行此步骤，将data下的所以json数据文件整理到dataset的merged_story.json

In [None]:
import json
import os

# 设置文件夹路径
directory_path = '/Users/huangxinzhe/LLM/ali_llm_fine_tuning_competition/data_processing/data'

# 初始化一个空列表，用于存储合并后的数据
merged_data = []

# 遍历文件夹下的所有文件
for filename in os.listdir(directory_path):
    # 检查文件扩展名是否为.json
    if filename.endswith('.json'):
        # 构建文件的完整路径
        file_path = os.path.join(directory_path, filename)
        # 打开并读取JSON文件
        with open(file_path, 'r', encoding='utf-8') as file:
            # 加载JSON内容到变量
            data = json.load(file)
            # 将当前文件的数据添加到合并列表中
            merged_data.extend(data)

# 将合并后的数据转换为JSON格式
merged_json = json.dumps(merged_data, ensure_ascii=False, indent=4)

# 可以选择将合并后的数据写入到一个新的JSON文件中
output_file_path = '/Users/huangxinzhe/LLM/ali_llm_fine_tuning_competition/data_processing/dataset/merged_story.json'
with open(output_file_path, 'w', encoding='utf-8') as file:
    file.write(merged_json)

# 或者直接输出到控制台
print(merged_json)