In [1]:
def read_json_file(file_path):
    """
    Read a JSON file from the specified path and return the parsed object.
    
    :param file_path: Path to the JSON file
    :return: Parsed JSON data (usually a dict or list)
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)
    return data


In [None]:
import json
import os
import time
from typing import Literal, Tuple, Optional, Dict
from datetime import datetime

In [None]:
YesNo = Literal["yes", "no"]


JUDGE_SYSTEM_PROMPT = """You are a validation model designed for dataset quality control.

Your task is to rigorously evaluate the reliability of a large language model (LLM) response by comparing the provided original information (raw) with the model-generated answer (answer).

Follow these principles during evaluation:
1. You will be given both the original information (raw) and the model output (answer).
2. If the original information contains explicit facts, standard answers, or verifiable content, assess whether the answer is consistent with them.
3. If the original information is extracted from literature or long-form text without a unique standard answer, focus on evaluating the logical soundness of the reasoning process in the answer.
4. Identify any clear logical errors, hallucinations, unsupported claims, contradictions, or misinterpretations of the original information.
5. If the answer is logically coherent and reasonably supported by the original information, it may be considered reliable even if the wording is not identical.

You must output only a single JSON object and no additional text. The output format must be exactly:
{
  "verdict": "yes" or "no",
  "reason": "a brief explanation of the key reason for your judgment"
}
"""

JUDGE_USER_TEMPLATE = """Please evaluate the following content.

【Original Information (raw)】
{raw}

【Model Output (answer)】
{answer}

Determine whether the model output is reliable based on the original information.

Return strictly a JSON object in the following format and do not include any additional text:
{
  "verdict": "yes" or "no",
  "reason": "one concise sentence explaining the reason for your decision"
}
"""


def _extract_json_from_text(text: str) -> dict:
    """
    Compatible with cases where the judge model occasionally adds explanations before or after the JSON:
    Attempt to extract the content between the first `{` and the last `}` and parse it as JSON.
    """
    text = (text or "").strip()
    if not text:
        raise ValueError("Empty judge response")

    # 直接尝试
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass

    # 截取大括号
    l = text.find("{")
    r = text.rfind("}")
    if l != -1 and r != -1 and r > l:
        candidate = text[l:r+1]
        return json.loads(candidate)

    raise ValueError(f"Judge response is not valid JSON: {text[:200]}...")


def judge_answer_yesno(
    client_check,
    raw_info: str,
    answer_content: str,
    *,
    judge_model: str,
    temperature: float = 0.9,
    max_retries: int = 3,
    retry_sleep_sec: float = 1.5,
) -> Tuple[YesNo, str, dict]:
    """
    Invoke the new large model to make a judgment and return:
    (verdict: 'yes'/'no', reason: str, full_json: dict)

    - Built-in simple retry
    """
    user_prompt = JUDGE_USER_TEMPLATE.format(raw=raw_info, answer=answer_content)

    last_err: Optional[Exception] = None
    for attempt in range(1, max_retries + 1):
        try:
            resp = client_check.chat.completions.create(
                model=judge_model,
                messages=[
                    {"role": "system", "content": JUDGE_SYSTEM_PROMPT},
                    {"role": "user", "content": user_prompt},
                ],
                temperature=temperature,
            )

            text = resp.choices[0].message.content
            obj = _extract_json_from_text(text)

            verdict = str(obj.get("verdict", "")).strip().lower()
            reason = str(obj.get("reason", "")).strip()

            if verdict not in ("yes", "no"):
                raise ValueError(f"Invalid verdict: {verdict}")

            return verdict, reason, obj

        except Exception as e:
            last_err = e
            if attempt < max_retries:
                time.sleep(retry_sleep_sec * attempt)
            else:
                break

    # On failure: conservatively return 'no' and include the error message to avoid interrupting the pipeline
    return "no", f"judge_failed: {last_err}", {"verdict": "no", "reason": f"judge_failed: {last_err}"}

In [3]:
def write_json_file(file_path, data):
    """
    Write the given data to a JSON file at the specified path.
    
    :param file_path: Path to the JSON file
    :param data: Data to be written (usually a dict or list)
    """
    with open(file_path, 'w', encoding='utf-8') as file:
        json.dump(data, file, ensure_ascii=False, indent=4)


In [None]:
def update_check_json(checkdir: str, filename: str, verdict: YesNo) -> Dict[str, YesNo]:
    os.makedirs(os.path.dirname(checkdir), exist_ok=True)

    if os.path.exists(checkdir):
        try:
            with open(checkdir, "r", encoding="utf-8") as f:
                data = json.load(f)
        except Exception:
            data = {}
    else:
        data = {}

    data[filename] = verdict

    with open(checkdir, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

    return data

In [None]:
def update_check_detail_json(
    checkdir_detail: str,
    filename: str,
    verdict: YesNo,
    reason: str,
    judge_obj: dict,
    *,
    source_path: str = "",
    txt_path: str = "",
    raw_info: str = "",
    answer_content: str = "",
    preview_len: int = 30000,
):
    if os.path.exists(checkdir_detail):
        try:
            with open(checkdir_detail, "r", encoding="utf-8") as f:
                data = json.load(f)
        except Exception:
            data = {}
    else:
        data = {}

    data[filename] = {
        "verdict": verdict,
        "reason": reason,
        "judge": judge_obj,
        "source_path": source_path,
        "txt_path": txt_path,
        "ts": datetime.now().isoformat(timespec="seconds"),
        "raw_preview": (raw_info[:preview_len] if raw_info else ""),
        "answer_preview": (answer_content[:preview_len] if answer_content else ""),
    }

    os.makedirs(os.path.dirname(checkdir_detail), exist_ok=True)
    with open(checkdir_detail, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

In [None]:
def deepseek_qa(question, answer):
    reasoning_content = ""  # define the complete reasoning process
    answer_content = ""     # define the complete response
    is_answering = False    # determine whether the reasoning has ended and the response has started

    # create chat completion request
    stream = client.chat.completions.create(
        model="deepseek-r1",  # using deepseek-r1 here as an example; replace with another model if needed
        messages=[
            {"role": "user", "content": question}
        ],
        stream=True
        # Uncomment the following to include token usage in the final chunk
        # stream_options={
        #     "include_usage": True
        # }
    )

    # print("\n" + "=" * 20 + "Reasoning Process" + "=" * 20 + "\n")

    for chunk in stream:
        # handle usage info
        if not getattr(chunk, 'choices', None):
            # print("\n" + "=" * 20 + "Token Usage" + "=" * 20 + "\n")
            print(chunk.usage)
            continue

        delta = chunk.choices[0].delta

        # check if reasoning_content attribute exists
        if not hasattr(delta, 'reasoning_content'):
            continue

        # handle empty content case
        if not getattr(delta, 'reasoning_content', None) and not getattr(delta, 'content', None):
            continue

        # handle the start of the answer
        if not getattr(delta, 'reasoning_content', None) and not is_answering:
            print("\n" + "=" * 20 + "Complete Response" + "=" * 20 + "\n")
            is_answering = True

        # handle reasoning process
        if getattr(delta, 'reasoning_content', None):
            print(delta.reasoning_content, end='', flush=True)
            reasoning_content += delta.reasoning_content
        # handle response content
        elif getattr(delta, 'content', None):
            print(delta.content, end='', flush=True)
            answer_content += delta.content

    # If you need to print the complete content, leave the following uncommented

    print("=" * 20 + "Complete Reasoning Process" + "=" * 20 + "\n")
    print(reasoning_content)
    print("=" * 20 + "Complete Response" + "=" * 20 + "\n")
    print(answer_content)

    verdict, reason, judge_obj = judge_answer_yesno(
        client_check,
        raw_info=answer,
        answer_content=answer_content,
        judge_model="ModelName",   # Replace with your reviewer model name

    )

    # Summary: filename -> yes/no
    update_check_json(checkdir, os.path.basename(file_path), verdict)

    # Details: filename -> verdict / reason / original judge JSON
    update_check_detail_json(
        checkdir_detail,
        os.path.basename(file_path),
        verdict,
        reason,
        judge_obj,
        source_path=file_path,              
        txt_path=file_path,                 
        raw_info=answer,
        answer_content=answer_content,
    )

    return reasoning_content, answer_content


In [None]:


def process_json_file(jsonfile, jsondir):
    filepath = f"{jsondir}\\{jsonfile}"
    json_data = read_json_file(filepath)
    print(f"{jsonfile} is running")
    reasoning_content, answer = deepseek_qa(json_data['message_1'], json_data['message_1'])
    
    json_data["reasoning_content"] = reasoning_content
    json_data["content"] = answer
    
    write_json_file(filepath, json_data)
    print(f"{jsonfile} is done")
    
def main(jsondir):
    with ThreadPoolExecutor(max_workers=1) as executor:
        for jsonfile in os.listdir(jsondir):
            
            executor.submit(process_json_file, jsonfile, jsondir)
            

In [None]:
import json
import os
from openai import OpenAI
from concurrent.futures import ThreadPoolExecutor

client = OpenAI(
    # If the environment variable is not configured, replace the following line with your Bailian API Key, e.g., api_key="sk-xxx",
    api_key="",  # How to get an API Key: https://help.aliyun.com/zh/model-studio/developer-reference/get-api-key
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)

jsondir = r""
txtdir = r""
checkdir = r""
checkdir_detail = r""
main(jsondir)


024_008_014.json is running
