In [1]:
%load_ext autoreload
%autoreload 

In [2]:
from pykt.utils.wandb_utils import WandbUtils
import pandas as pd
import os
from tqdm import tqdm_notebook
import yaml
import pandas as pd

In [None]:
# Paste a W&B API key here to use it for this session; leave it empty to keep
# whatever WANDB_API_KEY is already exported in the environment.
key = ""
if key:
    os.environ['WANDB_API_KEY'] = key
else:
    # Writing "" unconditionally would clobber a valid key that is already
    # set; setdefault only fills the variable in when it is missing.
    os.environ.setdefault('WANDB_API_KEY', key)
# Project helper used by the following cell, built for user 'pykt-team' and
# project 'dkt' (plain string: the original f-string had no placeholders).
wandb_api = WandbUtils(user='pykt-team', project_name='dkt')

In [None]:
# Model / dataset pair passed to the sweep query below.
model_name="dkt"
dataset_name="assist2015"
# Query the project helper for this (dataset, model) pair with emb_type "qid".
# NOTE(review): the structure of the returned value is defined in
# pykt.utils.wandb_utils and is not visible here — confirm before relying on it.
check_result_list = wandb_api.check_sweep_by_model_dataset_name(dataset_name, model_name, emb_type="qid")

In [None]:
import re
import copy
import os
from tqdm.auto import tqdm
import json
import string

# For reference, see the data-juicer filter operators: https://github.com/modelscope/data-juicer/blob/main/docs/Operators_ZH.md#filter

from call_gpt import send_chat_request
import concurrent.futures

# Prompt template for the LLM-based check; check_by_qwen substitutes its
# "{{text}}" placeholder.  The file is read inside a context manager so the
# handle is closed immediately, and decoded explicitly as UTF-8 so the result
# does not depend on the platform's default encoding.
with open("prompt/train_data_clean/gpt4_prompt.md", encoding="utf-8") as _prompt_file:
    prompt_check_by_qwen = _prompt_file.read()


def check_by_qwen(input_data):
    """Send one record to the chat model and return the helper's response.

    The module-level template ``prompt_check_by_qwen`` has its ``{{text}}``
    placeholder replaced with ``input_data["text"]``; the filled prompt is
    passed to ``send_chat_request`` and whatever that helper returns is
    handed back unchanged.

    Args:
        input_data: Mapping that provides a ``"text"`` string.
    """
    filled_prompt = prompt_check_by_qwen.replace("{{text}}", input_data["text"])
    request_options = {
        "system": "You are a helpful assistant.",
        "examples": [],
        "question": filled_prompt,
        "engine": "AT:qwen15-110b-chat",
        "max_tokens": 4096,
        "top_p": 0.8,
        "temperature": 0.01,
        "stream": False,
        "at_url": "http://mathbrain-test.tal.com/sse-invoke",
    }
    return send_chat_request(**request_options)


# Parallel processing
def check_by_qwen_parallel(input_data_list, n_jobs=5):
    """Run ``check_by_qwen`` over many records using a thread pool.

    Args:
        input_data_list: Sized sequence of records, each passed as-is to
            ``check_by_qwen``.
        n_jobs: Upper bound on the number of worker threads; it is capped at
            the number of records.

    Returns:
        A list with one ``check_by_qwen`` result per record, in input order
        (``executor.map`` preserves order).  An empty input yields ``[]``.
    """
    if len(input_data_list) == 0:
        # min(n_jobs, 0) would ask for zero workers, which
        # ThreadPoolExecutor rejects with ValueError.
        return []

    n_jobs = min(n_jobs, len(input_data_list))
    with concurrent.futures.ThreadPoolExecutor(max_workers=n_jobs) as executor:
        result_list = list(
            tqdm(
                executor.map(check_by_qwen, input_data_list),
                total=len(input_data_list),
                miniters=10,
            )
        )
    return result_list

# Rule 2.1: a module's text contains content that belongs to another module
def content_out_of_module(input_data):
    """Flag text that mentions a lesson-plan module other than its own.

    Module names are scanned in a fixed order.  For each name the text is
    rejected if the name differs from ``input_data["module"]`` and occurs in
    the text; otherwise, within the same iteration, it is rejected if the
    record is not the "教学重难点" module yet contains a ``<h1>重点</h1>`` or
    ``<h1>难点</h1>`` heading.  The first rule that fires decides the reason.

    Args:
        input_data: Mapping with ``"module"`` and ``"text"`` strings.

    Returns:
        ``{"is_good": bool, "info": {"reason": str, "text": str}}``.
    """
    module_names = (
        "教学内容分析",
        "学情分析",
        "教学目标",
        "教学重难点",
        "教学准备",
        "教学过程",
        "课后作业",
        "教学评价",
        "板书设计",
    )
    own_module = input_data["module"]
    body = input_data["text"]

    def rejected(reason):
        return {"is_good": False, "info": {"reason": reason, "text": body}}

    # Does not depend on the loop variable, so it is evaluated once up front;
    # it is still consulted inside the loop to keep the reason priority.
    key_point_heading_leak = own_module != "教学重难点" and (
        "<h1>重点</h1>" in body or "<h1>难点</h1>" in body
    )

    for name in module_names:
        if name != own_module and name in body:
            return rejected(f"{own_module}出现了{name}的内容")
        if key_point_heading_leak:
            return rejected(f"{own_module}出现了教学重难点的内容")

    return {"is_good": True, "info": {"reason": "ok", "text": body}}


def check_h1_content(input_data):
    """Reject text in which some ``<h1>`` heading consists only of digits.

    Args:
        input_data: Mapping with a ``"text"`` string.

    Returns:
        ``{"is_good": bool, "info": {"reason": str, "text": str}}``.
    """
    body = input_data["text"]
    # Collect the inner text of every <h1>...</h1> pair on a single line.
    headings = re.findall(r"<h1>(.*?)</h1>", body)

    if any(heading.isdigit() for heading in headings):
        return {
            "is_good": False,
            "info": {"reason": "标题只有数字", "text": body},
        }
    return {"is_good": True, "info": {"reason": "ok", "text": body}}


def has_punctuation_after_h1(input_data):
    """Reject text where a closing ``</h1>`` is followed by punctuation.

    Optional whitespace between ``</h1>`` and the punctuation mark is
    allowed; the character class lists the ASCII and full-width marks that
    count as punctuation.

    Args:
        input_data: Mapping with a ``"text"`` string.

    Returns:
        ``{"is_good": bool, "info": {"reason": str, "text": str}}``.
    """
    body = input_data["text"]
    punctuation_after_heading = (
        r"</h1>\s*([\．\：\、\，\,\:\;\.\!\?\,\:\;\u3002\uff1b\uff0c\uff1a\uff01\uff1f])"
    )

    if re.search(punctuation_after_heading, body) is None:
        return {"is_good": True, "info": {"reason": "ok", "text": body}}

    return {
        "is_good": False,
        "info": {"reason": "</h1>跟着标点符号", "text": body},
    }

# Rule 1.15: only second-level headings, no first-level heading
def check_duplication(input_data):
    """Reject text that has an ``<h2>`` tag but no ``<h1>`` tag.

    Note: despite its name, this function does not look for duplicates; it
    only checks the heading hierarchy described above.

    Args:
        input_data: Mapping with a ``"text"`` string.

    Returns:
        ``{"is_good": bool, "info": {"reason": str, "text": str}}``.
    """
    body = input_data["text"]
    orphan_h2 = "<h2>" in body and "<h1>" not in body

    if not orphan_h2:
        return {"is_good": True, "info": {"reason": "ok", "text": body}}
    return {
        "is_good": False,
        "info": {"reason": "<h2> exists but not <h1>", "text": body},
    }

# Rule 1.16: duplicated headings
def check_title_level(input_data):
    """Reject text in which "重点" or "难点" occurs two or more times.

    Note: despite its name, this function does not inspect heading levels;
    it only counts occurrences of the two keywords anywhere in the text.

    Args:
        input_data: Mapping with a ``"text"`` string.

    Returns:
        ``{"is_good": bool, "info": {"reason": str, "text": str}}``.
    """
    body = input_data["text"]
    repeated = any(body.count(keyword) >= 2 for keyword in ("重点", "难点"))

    if not repeated:
        return {"is_good": True, "info": {"reason": "ok", "text": body}}
    return {
        "is_good": False,
        "info": {"reason": "重复重难点", "text": body},
    }


# Rule 1.2: tag holds only an ordinal; rule 2.2: ellipsis; rule 1.4: module
# name used as a heading
def check_by_keywords(input_data):
    """Reject text that contains any blacklisted substring.

    Rules are tried in the order listed; the reason of the first rule with a
    matching keyword is reported.

    Args:
        input_data: Mapping with a ``"text"`` string.

    Returns:
        On a hit: ``{"is_good": False, "info": {"reason": <rule name>,
        "input_data": input_data}}``.  Otherwise ``{"is_good": True, "info":
        {"reason": "ok", "text": input_data}}`` — note that the whole input
        mapping, not just its text, is stored under ``"text"``.
    """
    keyword_rules = (
        ("br", ("<br>", "<br/>", "< br >", "< br/>")),
        ("学段", ("<h1>学段</h1>", "<h2>学段</h2>")),
        ("图片", ("如图所示", "docx", "image", "Image")),
        ("内容不全", ("：\n<h2>",)),
        ("连续逗号", (",,", "，，", ",,,")),
        (
            "标题符号",
            (
                "</h1><h1>",
                "</h1>\n<h1>",
                "</h2><h2>",
                "</h2>\n<h2>",
                "</h1>\n<h2>",
            ),
        ),
        ("省略号", ("：……", "\n……", "……\n")),
        ("引号", ("```",)),
        ("标题后面有序号", ("</h1>：", "</h2>：")),
        (
            "模块名称以标题的形式出现在内容中",
            (
                "<h1>教学内容分析</h1>",
                "<h1>学情分析</h1>",
                "<h1>教学目标</h1>",
                "<h1>教学重难点</h1>",
                "<h1>教学准备</h1>",
                "<h1>教学过程</h1>",
                "<h1>课后作业</h1>",
                "<h1>教学评价</h1>",
                "<h1>板书设计</h1>",
            ),
        ),
    )

    body = input_data["text"]
    first_hit = next(
        (
            rule_name
            for rule_name, keywords in keyword_rules
            if any(keyword in body for keyword in keywords)
        ),
        None,
    )

    if first_hit is None:
        return {"is_good": True, "info": {"reason": "ok", "text": input_data}}
    return {
        "is_good": False,
        "info": {"reason": first_hit, "input_data": input_data},
    }


def check_title(input_data):
    """Validate headings, dispatching on the record's module.

    Records of the "教学过程" module are delegated to ``subtitle_num``.  For
    every other module the text is rejected when any ``<h1>`` heading is at
    most two characters long (rule 1.2: the heading holds only an ordinal).

    Args:
        input_data: Mapping with ``"module"`` and ``"text"`` strings.

    Returns:
        ``{"is_good": bool, "info": {"reason": str, "text": str}}`` for the
        non-"教学过程" branch; otherwise whatever ``subtitle_num`` returns.
    """
    if input_data['module'] == '教学过程':
        return subtitle_num(input_data)

    body = input_data['text']
    headings = re.findall(r'<h1>(.*?)</h1>', body)
    if any(len(heading) <= 2 for heading in headings):
        return {
            "is_good": False,
            "info": {"reason": "标题只有序号", "text": input_data["text"]},
        }
    return {"is_good": True, "info": {"reason": "ok", "text": input_data["text"]}}
                
    

# Rule 1.9: a section has exactly one second-level heading;
# rule 1.8: inconsistent heading numbering
def subtitle_num(input_data):
    """Check the <h1>/<h2> structure and numbering style of the text.

    The text is split into sections, each running from an ``<h1>`` heading
    to the next ``<h1>`` (or the end of the text).  Checks, in order:

    1. No section may contain exactly one ``<h2>`` heading.
    2. No ``<h1>`` title may start with a digit.
    3. Every ``<h2>`` title must start with a digit.

    Sections are kept as an ordered list of pairs rather than a dict keyed
    by title, so two sections sharing a title are both validated.  Empty
    titles are handled without raising: an empty ``<h1>`` does not start
    with a digit, and an empty ``<h2>`` fails check 3.

    Args:
        input_data: Mapping with a ``"text"`` string.

    Returns:
        ``{"is_good": bool, "info": {"reason": str, "text": str}}``.
    """
    text = input_data['text']

    def rejected(reason):
        return {"is_good": False, "info": {"reason": reason, "text": text}}

    # Group 1: <h1> inner text; group 2: everything up to the next <h1>/end.
    section_pattern = re.compile(r'<h1>(.*?)</h1>(.*?)(?=<h1>|$)', re.DOTALL)

    sections = []
    for raw_h1, section_body in section_pattern.findall(text):
        # Strip nested tags and surrounding whitespace from the titles.
        h1_title = re.sub(r'<.*?>', '', raw_h1).strip()
        h2_titles = [
            re.sub(r'<.*?>', '', raw_h2).strip()
            for raw_h2 in re.findall(r'<h2>(.*?)</h2>', section_body)
        ]
        if len(h2_titles) == 1:
            return rejected("一级标题下只有一个二级标题")
        sections.append((h1_title, h2_titles))

    for h1_title, h2_titles in sections:
        # Slicing ([:1]) instead of indexing ([0]) tolerates empty titles.
        if h1_title[:1].isdigit():
            return rejected("一级标题序号不规范，不是汉字")
        for h2_title in h2_titles:
            if not h2_title[:1].isdigit():
                return rejected("二级标题序号不规范，不是数字")

    return {"is_good": True, "info": {"reason": "ok", "text": text}}
    

# Rule 2.4: content is too short
def check_by_length(input_data):
    """Reject text that is too short (<= 5 chars) or too long (>= 2500).

    Args:
        input_data: Mapping with a ``"text"`` string.

    Returns:
        On failure: ``{"is_good": False, "info": {"reason": "短句" or "长句",
        "input_data": input_data}}``.  Otherwise ``{"is_good": True, "info":
        {"reason": "ok", "text": input_data}}`` — the whole input mapping is
        stored under ``"text"``.
    """
    size = len(input_data["text"])

    if size <= 5:
        failure = "短句"
    elif size >= 2500:
        failure = "长句"
    else:
        failure = None

    if failure is None:
        return {"is_good": True, "info": {"reason": "ok", "text": input_data}}
    return {
        "is_good": False,
        "info": {"reason": failure, "input_data": input_data},
    }

# Rule 1.13: remove time allocations
def delete_time(input_data):
    """Return the text with full-width-parenthesised durations removed.

    Both ``（<digits>分钟）`` and ``（<digits>min）`` are stripped, in that
    order.  The input mapping itself is not modified; the cleaned text is
    only available in the returned dict, and the result is always "good".

    Args:
        input_data: Mapping with a ``"text"`` string.

    Returns:
        ``{"is_good": True, "info": {"reason": "ok", "text": <cleaned>}}``.
    """
    cleaned = input_data["text"]
    for duration_pattern in (r'\（\d+分钟\）', r'\（\d+min\）'):
        cleaned = re.sub(duration_pattern, '', cleaned)
    return {"is_good": True, "info": {"reason": "ok", "text": cleaned}}


# Rule 1.16: duplicated headings; rule 1.12: second-level heading too long;
# rule 1.11: duplicated sub-headings
def title_duplication(input_data):
    """Reject over-long ``<h2>`` headings and repeated heading titles.

    Chinese ordinals followed by "、" are removed from the text first.  The
    text is then rejected if any ``<h2>`` heading has 20 or more characters,
    or if two headings (``<h1>`` and ``<h2>`` pooled together) are equal
    once punctuation and digits have been stripped.

    Args:
        input_data: Mapping with a ``"text"`` string.

    Returns:
        ``{"is_good": bool, "info": {"reason": str, "text": str}}`` where
        ``text`` is the unmodified input text.
    """
    original_text = input_data["text"]

    def verdict(is_good, reason):
        return {"is_good": is_good, "info": {"reason": reason, "text": original_text}}

    without_ordinals = re.sub(r'[一二三四五六七八九十]+、', '', original_text)
    first_level = re.findall(r'<h1>(.*?)</h1>', without_ordinals)
    second_level = re.findall(r'<h2>(.*?)</h2>', without_ordinals)

    # Length limit applies to second-level headings only.
    if any(len(title) >= 20 for title in second_level):
        return verdict(False, "二级标题过长")

    seen_titles = set()
    for title in first_level + second_level:
        # Drop punctuation first, then digits, before comparing titles.
        normalized = re.sub(r'\d+', '', re.sub(r'[^\w\s]', '', title))
        if normalized in seen_titles:
            return verdict(False, "标题重复")
        seen_titles.add(normalized)

    return verdict(True, "ok")

# Rule 2.3: repeated content inside the teaching-process module
def process_duplication(input_data):
    """Reject "教学过程" text whose section bodies occur more than once.

    Only records whose module is "教学过程" are inspected.  Every span
    between a closing ``</h2>`` and the next ``<h1>`` is normalised (ASCII
    digits, whitespace and ASCII punctuation removed) and counted in the
    equally normalised full text; two or more occurrences mean the content
    is repeated.

    Spans that are empty after normalisation are skipped: ``str.count("")``
    returns ``len(s) + 1``, so an ``<h2>`` directly followed by an ``<h1>``
    used to be reported as duplicated content.

    Args:
        input_data: Mapping with ``"module"`` and ``"text"`` strings.

    Returns:
        ``{"is_good": bool, "info": {"reason": str, "text": str}}``.
    """
    passed = {"is_good": True, "info": {"reason": "ok", "text": input_data["text"]}}
    if input_data['module'] != '教学过程':
        return passed

    text = input_data["text"]
    noise = re.compile(r'[0-9\s' + re.escape(string.punctuation) + ']+')
    # The normalised full text is loop-invariant, so it is built once.
    normalized_text = noise.sub('', text)

    for segment in re.findall(r'</h2>(.*?)<h1>', text, re.DOTALL):
        normalized_segment = noise.sub('', segment.strip())
        if not normalized_segment:
            continue
        if normalized_text.count(normalized_segment) >= 2:
            return {
                "is_good": False,
                "info": {"reason": "内容重复", "text": text},
            }
    return passed


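# Rule 3: the teaching-process section has headings only and no content (this check is disabled; its implementation is commented out below)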
# def has_null_after_title(input_data):
#     if "<h1>" in input_data["text"]:
#         h1_tags = re.findall(r"</h1>(.{1,10})", input_data["text"])
#         # print(f"h1_tags: {h1_tags}")
#         if not h1_tag:
#             return{
#                 "is_good": False,
#                 "info": {
#                     "reason": "一级标题只包含标题，不包含内容",
#                     "text": input_data["text"]
#                 }
#             }
#         if "<h2>" in input_data["text"]:
#             h2_tags_1 = re.findall(r"</h2>(.{1,10})<h1>", input_data["text"])
#             h2_tags_2 = re.findall(r"</h2>(.{1,10})<h2>", input_data["text"])
#             h2_tags_3 = re.findall(r"</h2>(.{1,10})", input_data["text"])
#             for h2_tags1, h2_tags2, h2_tags3 in zip(h2_tags_1, h2_tags_2, h2_tags_3):
#                 if not h2_tags1 or not h2_tags2 or not h2_tags3:
#                     return{
#                         "is_good": False,
#                         "info": {
#                             "reason": "二级标题只包含标题，不包含内容",
#                             "text": input_data["text"]
#                         }
#                     }
#     return {"is_good": True, "info": {"reason": "ok", "text": input_data["text"]}}


# Check whether heading text is repeated elsewhere in the text
def check_title_duplication(input_data):
    """Reject text in which a heading's title occurs more than once.

    Every ``<h1>`` and ``<h2>`` title is counted as a plain substring of the
    whole text (the heading itself accounts for one occurrence).  Titles are
    visited pairwise — first ``<h1>`` then first ``<h2>``, second ``<h1>``
    then second ``<h2>``, and so on — and the first repeated one decides the
    reason.  ``zip_longest`` is used so that no heading is skipped when the
    two lists differ in length; a plain ``zip`` stops at the shorter list
    and checks nothing at all when the text has no ``<h2>`` headings.

    Args:
        input_data: Mapping with a ``"text"`` string.

    Returns:
        ``{"is_good": bool, "info": {"reason": str, "text": str}}``.
    """
    from itertools import zip_longest

    text = input_data["text"]
    h1_tags = re.findall(r"<h1>(.*?)</h1>", text)
    h2_tags = re.findall(r"<h2>(.*?)</h2>", text)

    for h1_tag, h2_tag in zip_longest(h1_tags, h2_tags):
        # None marks the padding added for the shorter list.
        if h1_tag is not None and text.count(h1_tag) > 1:
            return {
                "is_good": False,
                "info": {"reason": "一级标题重复", "text": text},
            }
        if h2_tag is not None and text.count(h2_tag) > 1:
            return {
                "is_good": False,
                "info": {"reason": "二级标题重复", "text": text},
            }
    return {"is_good": True, "info": {"reason": "ok", "text": text}}


# Heading content is not specific
def check_title_content(input_data):
    """Reject text whose headings merely restate a module name.

    A heading (``<h1>`` or ``<h2>``) is considered unspecific when it
    contains one of the listed module names, optionally followed by a digit.

    Args:
        input_data: Mapping with a ``"text"`` string.

    Returns:
        ``{"is_good": bool, "info": {"reason": str, "text": str}}``.
    """
    body = input_data["text"]
    headings = re.findall(r"<h1>(.*?)</h1>", body) + re.findall(r"<h2>(.*?)</h2>", body)
    generic_title_patterns = [r"教学目标\d?", r"学情分析\d?", r"教学内容分析\d?", r"教学准备\d?", r"课后作业\d?", r"教学评价\d?", r"板书设计\d?"]

    is_generic = any(
        re.search(generic, heading)
        for generic in generic_title_patterns
        for heading in headings
    )
    if is_generic:
        return {
            "is_good": False,
            "info": {"reason": "标题内容不明确", "text": body},
        }
    return {"is_good": True, "info": {"reason": "ok", "text": body}}



# Detect line breaks
def check_line_break(input_data):
    """Reject dotted text that has no dotted run starting right after a newline.

    The text is rejected when ``[^1]+\\.`` (one or more characters other
    than "1", then a period) matches somewhere, but the same expression
    anchored immediately after a newline matches nowhere.

    Args:
        input_data: Mapping with a ``"text"`` string.

    Returns:
        ``{"is_good": bool, "info": {"reason": str, "text": str}}``.
    """
    body = input_data["text"]
    after_newline = r"(?<=\n)[^1]+\."

    has_dotted_run = re.search(r"[^1]+\.", body) is not None
    if has_dotted_run and re.search(after_newline, body) is None:
        return {
            "is_good": False,
            "info": {"reason": "不存在换行符", "text": body},
        }

    return {"is_good": True, "info": {"reason": "ok", "text": body}}



def is_good_sentence(input_data):
    """Run the rule-based checks in order and return the first failure.

    Each check receives ``input_data`` and returns a dict with an
    ``"is_good"`` flag; the first result whose flag is false is returned
    as-is.  If every check passes, ``{"is_good": True, "info": {"reason":
    "ok", "text": input_data}}`` is returned (the whole input mapping is
    stored under ``"text"``).

    Args:
        input_data: Record handed unchanged to every check; the checks read
            its ``"text"`` and, for some, ``"module"`` entries.
    """
    # NOTE: has_null_after_title is deliberately not listed.  Its only
    # implementation in this file is commented out, so referencing it here
    # raised NameError on every call.  Re-add it once it is restored.
    checks = (
        check_by_keywords,
        check_by_length,
        check_duplication,
        check_title_level,
        check_h1_content,
        has_punctuation_after_h1,
        content_out_of_module,
        process_duplication,
        title_duplication,
        delete_time,
        subtitle_num,
        check_title_duplication,
        check_title_content,
        check_line_break,
    )

    for check in checks:
        result = check(input_data)
        if not result["is_good"]:
            return result
    return {"is_good": True, "info": {"reason": "ok", "text": input_data}}



if __name__ == "__main__":

    # Load the fixtures inside a context manager so the file handle is closed
    # right away; JSON text is decoded explicitly as UTF-8.
    with open("tests/test_data/data_filter_test_cases.json", encoding="utf-8") as test_file:
        test_cases = json.load(test_file)

    # The fixture list is repeated 100 times before being dispatched to the
    # thread pool.
    test_data = test_cases * 100
    result_list = check_by_qwen_parallel(test_data, n_jobs=60)
    print(result_list)
    # for input_data in tqdm(test_data):

    #     result = check_by_qwen(input_data)
    #     print(result)
