In [1]:
def pack_content(prompt, images):
    """Build a chat content list: one image part per URL, then the text prompt.

    Each image becomes {"type": "image_url", "image_url": <url>}. A falsy
    ``images`` value (None or an empty list) yields a text-only list.
    """
    parts = []
    for img_url in images or []:
        parts.append({"type": "image_url", "image_url": img_url})
    parts.append({"type": "text", "text": prompt})
    return parts

def openai_pack_content(prompt, images):
    """Build a chat content list with nested image_url objects.

    Each image becomes {"type": "image_url", "image_url": {"url": <url>,
    "detail": "auto"}}; the text prompt is always the last part. A falsy
    ``images`` value yields a text-only list.
    """
    def _image_part(url):
        return {"type": "image_url", "image_url": {"url": url, "detail": "auto"}}

    content = list(map(_image_part, images or []))
    content.append({"type": "text", "text": prompt})
    return content

In [None]:
from openai import OpenAI
import asyncio
import time
import os
import json
import pickle
import base64
from io import BytesIO
from PIL import Image
import numpy as np
from openai import AsyncOpenAI, APIConnectionError, InternalServerError
from asyncio import as_completed
from tqdm import tqdm
import httpx

import logging
from datetime import datetime, timezone, timedelta

# --- DashScope (OpenAI-compatible endpoint) ---
# Falls back to the literal "1" when DASHSCOPE_API_KEY is unset or empty.
# NOTE(review): presumably a placeholder so the client can be constructed
# without a real key; requests would still need a valid key -- confirm.
dashscope_api_key = os.getenv("DASHSCOPE_API_KEY") or "1"
vl_model = "qwen3-vl-plus"
text_model = "qwen-plus"
dashscope_client = AsyncOpenAI(
    api_key=dashscope_api_key,
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",timeout=120.0)


# --- Local text model ---
# NOTE(review): the "xxx" values look like redacted placeholders; fill in the
# real key, model name and base URL before running.
local_text_api_key = "xxx"
local_text_model = "xxx"

local_text_client = AsyncOpenAI(
    api_key=local_text_api_key,
    base_url="xxx", 
    timeout=120.0
)

# --- Local vision-language model (same placeholder caveat as above) ---
local_vl_api_key = "xxx"
local_vl_model = "xxx"

local_vl_client = AsyncOpenAI(
    api_key=local_vl_api_key,
    base_url="xxx", 
    timeout=120.0
)

async def get_response_async(prev_messages,
                             next_content,
                             model,
                             client,
                             tools=None,
                             max_retries=3):
    """Stream one chat completion and return the assembled assistant turn.

    Args:
        prev_messages: Earlier conversation messages; this list is not mutated.
        next_content: Content of the new user message (a string or a list of
            content parts such as the output of ``openai_pack_content``).
        model: Model name passed to the API.
        client: Object exposing an awaitable ``chat.completions.create`` that
            returns an async iterator of streamed chunks (e.g. ``AsyncOpenAI``).
        tools: Optional tool definitions; when given, parallel tool calls are
            requested as well.
        max_retries: Maximum number of attempts for retryable failures.

    Returns:
        A dict with ``content``, ``reasoning_content``, ``usage`` (always
        ``None``), ``prev_messages`` (the conversation including the new user
        and assistant messages) and ``tool_info`` (the accumulated tool-call
        fragments). ``None`` is returned if ``max_retries`` is less than 1.

    Raises:
        APIConnectionError, InternalServerError: After the final attempt.
        Exception: Immediately for any non-retryable error, or after the final
            attempt for a connection cut-off.
    """
    # A string and a list of content parts are both valid "content" values,
    # so the user message is built the same way for either.
    messages = prev_messages + [{"role": "user", "content": next_content}]

    MAX_TOKENS_LIMIT = 4096

    # Error-text fragments treated as "the stream was cut off; try again".
    cutoff_markers = ("incomplete chunked read", "peer closed connection",
                      "connection closed")

    request_kwargs = {
        "model": model,
        "messages": messages,
        "stream": True,
        "max_tokens": MAX_TOKENS_LIMIT,
    }
    if tools is not None:
        request_kwargs["tools"] = tools
        request_kwargs["parallel_tool_calls"] = True

    for attempt in range(max_retries):
        try:
            # Reset per attempt so a failed partial stream leaves no residue.
            reasoning_content = ""
            answer_content = ""
            tool_info = []

            response = await client.chat.completions.create(**request_kwargs)

            async for chunk in response:
                if not chunk.choices:
                    continue
                delta = chunk.choices[0].delta

                # Each field is handled independently: a chunk that carries a
                # (possibly empty) reasoning_content must not cause content or
                # tool calls in the same chunk to be dropped.
                reasoning_piece = getattr(delta, 'reasoning_content', None)
                if reasoning_piece is not None:
                    reasoning_content += reasoning_piece

                content_piece = getattr(delta, 'content', None)
                if content_piece is not None:
                    answer_content += content_piece

                for tool_call in getattr(delta, 'tool_calls', None) or []:
                    index = tool_call.index
                    # Grow the list so tool_info[index] always exists.
                    while len(tool_info) <= index:
                        tool_info.append({})
                    entry = tool_info[index]
                    # Fragments of one call arrive across several chunks and
                    # are concatenated per field.
                    if tool_call.id:
                        entry['id'] = entry.get('id', '') + tool_call.id
                    function = tool_call.function
                    if function and function.name:
                        entry['name'] = entry.get('name', '') + function.name
                    if function and function.arguments:
                        entry['arguments'] = (entry.get('arguments', '') +
                                              function.arguments)
                    if tool_call.type:
                        entry['type'] = tool_call.type

            # When no separate reasoning field arrived, treat a leading
            # <think>...</think> block in the answer as the reasoning.
            if not reasoning_content and answer_content.startswith("<think>"):
                end_think_idx = answer_content.find("</think>")
                if end_think_idx != -1:
                    reasoning_content = answer_content[len("<think>"):end_think_idx]
                    answer_content = answer_content[end_think_idx +
                                                    len("</think>"):]

            new_message = {
                "role": "assistant",
                "content": answer_content,
            }
            if tool_info:
                # .get with defaults: a field the stream never supplied (or a
                # skipped index) must not abort the whole call with KeyError.
                new_message["tool_calls"] = [{
                    "id": tool_call.get("id", ""),
                    "function": {
                        "name": tool_call.get("name", ""),
                        "arguments": tool_call.get("arguments", ""),
                    },
                    "type": tool_call.get("type", "function"),
                    "index": i,
                } for i, tool_call in enumerate(tool_info)]
            messages.append(new_message)

            return {
                "content": answer_content,
                "reasoning_content": reasoning_content,
                "usage": None,
                "prev_messages": messages,
                "tool_info": tool_info
            }

        except (APIConnectionError, InternalServerError) as e:
            print(
                f"--- [Retryable Error] (Attempt {attempt + 1}/{max_retries}): {e}"
            )
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(5)

        except Exception as e:
            error_str = str(e).lower()
            if not any(marker in error_str for marker in cutoff_markers):
                print(f"--- [Fatal Error]: {e}")
                raise
            print(
                f"--- [Network/Server Cutoff] (Attempt {attempt + 1}/{max_retries}): {e}"
            )
            if attempt == max_retries - 1:
                print("--- Max retries reached for cutoff error.")
                raise
            print(
                "--- Server likely overloaded. Sleeping for 10 seconds...")
            await asyncio.sleep(10)

In [24]:
def process_qa_output(output_str):
    """Parse a model reply that should contain a JSON list of QA items.

    A surrounding Markdown code fence is removed first, with or without a
    ``json`` language tag (case-insensitive). Returns the parsed list, or an
    empty list (after printing a message) when the text is not valid JSON or
    the decoded value is not a list.
    """
    text = output_str.strip()
    fence = "```"
    if (len(text) >= 2 * len(fence) and text.startswith(fence)
            and text.endswith(fence)):
        text = text[len(fence):-len(fence)]
        # Drop the optional language tag left over from the opening fence.
        if text[:4].lower() == "json":
            text = text[4:]
        text = text.strip()
    try:
        qa_list = json.loads(text)
    except json.JSONDecodeError as e:
        print(f"JSON decoding error: {e}")
        return []
    if isinstance(qa_list, list):
        return qa_list
    print("Output is not a list.")
    return []

In [None]:
def extract_quality_score(quality_check_output_str):
    """Pull the <scores> JSON and the <explanation> text out of a QC reply.

    Returns a ``(score, explanation)`` tuple: ``score`` is the decoded JSON
    found inside ``<scores>...</scores>`` and ``explanation`` is the stripped
    text inside ``<explanation>...</explanation>``. A missing tag raises
    ``ValueError`` (from ``str.index``); invalid JSON raises
    ``json.JSONDecodeError``.
    """
    reply = quality_check_output_str

    def between(open_tag, close_tag):
        # Both tags are located by searching from the start of the reply.
        begin = reply.index(open_tag) + len(open_tag)
        return reply[begin:reply.index(close_tag)].strip()

    score = json.loads(between("<scores>", "</scores>"))
    explanation = between("<explanation>", "</explanation>")
    return score, explanation

In [26]:
def _find_closing_brace(text: str, start: int) -> int:
    """Return the index of the '}' closing a '{' that was opened just before start.

    Nested braces are balanced. If the braces never balance, fall back to the
    first '}' at or after ``start``; returns -1 when there is none.
    """
    depth = 1
    for pos in range(start, len(text)):
        ch = text[pos]
        if ch == "{":
            depth += 1
        elif ch == "}":
            depth -= 1
            if depth == 0:
                return pos
    return text.find("}", start)


def check_answer_format_rule(answer_string: str):
    """Check that an answer has description, reason and \\boxed{} sections.

    Returns ``(True, contents)`` where ``contents`` maps "description",
    "reason" and "boxed" to the stripped text of each section, or
    ``(False, None)`` as soon as one section is missing.

    Each closing marker is searched for *after* its opening marker, so a '}'
    that appears earlier in the answer (for example LaTeX in the description)
    no longer causes a false rejection, and nested braces inside \\boxed{...}
    are kept intact.
    """
    required_tags = {
        "description": ["<description>", "</description>"],
        "reason": ["<reason>", "</reason>"],
        "boxed": ["\\boxed{", "}"]
    }
    extracted_contents = {}
    for tag, (start_tag, end_tag) in required_tags.items():
        start_index = answer_string.find(start_tag)
        if start_index == -1:
            return False, None
        content_start = start_index + len(start_tag)
        if tag == "boxed":
            # The boxed payload may itself contain braces, e.g. \frac{1}{2}.
            end_index = _find_closing_brace(answer_string, content_start)
        else:
            end_index = answer_string.find(end_tag, content_start)
        if end_index == -1:
            return False, None
        extracted_contents[tag] = answer_string[content_start:end_index].strip()
    return True, extracted_contents

In [None]:
def alter_tags(answer_string: str):
    """Backslash-escape the description/reason tags in an answer string.

    Each opening and closing ``<description>`` / ``<reason>`` tag gets a
    backslash in front of its ``<`` and ``>``; all other text is unchanged.
    """
    escapes = (
        ("<description>", "\\<description\\>"),
        ("</description>", "\\</description\\>"),
        ("<reason>", "\\<reason\\>"),
        ("</reason>", "\\</reason\\>"),
    )
    escaped = answer_string
    for tag, replacement in escapes:
        escaped = escaped.replace(tag, replacement)
    return escaped

def format_data_md(results):
    """Render QA results as a Markdown report, one section per result.

    Results whose ``qa_pair`` is None are skipped; they still consume a sample
    number because numbering follows the position in ``results``. Tags in the
    answer and in the quality explanation are escaped with ``alter_tags``.
    """
    md_lines = []
    for sample_no, res in enumerate(results, start=1):
        qa_pair = res["qa_pair"]
        if qa_pair is None:
            continue

        md_lines += [
            f"## Sample {sample_no}\n",
            f"**Original Sample Index:** {res['index']}\n",
            "**Context:**\n",
            f"{res['context']}\n",
            "**Image Captions:**\n",
        ]
        md_lines += [
            f"- Image {caption_no}: {caption}\n"
            for caption_no, caption in enumerate(res["image_captions"], 1)
        ]

        keyword_result = res.get('keyword_category_result', 'N/A')
        md_lines += [
            "**I. Thematic Classification and Keywords:**\n",
            f"> {keyword_result}\n",
            "---\n",
            "**Question-Answer Pair:**\n",
            f"**Image Indices Used (1-indexed):** {qa_pair.get('image_indices', [])}\n",
            f"**Question:**\n{qa_pair['question']}\n",
            f"**Answer:**\n{alter_tags(qa_pair['answer'])}\n",
            "**Format Check Result:**\n",
            f"{res['format_check']}\n",
            "**Quality Scores:**\n",
            f"```json\n{json.dumps(res['quality_score'], indent=2)}\n```\n",
            "**Quality Explanation:**\n",
            f"{alter_tags(res['quality_explanation'])}\n",
            "---\n",
        ]
    return "\n".join(md_lines)

In [None]:
import re 

def build_caption_with_id(img_info):
    """Return the image caption, prefixed with its figure id when one exists.

    With a non-empty ``fig_id`` the result is
    "This is <fig_id><subfig_label>. <caption>"; otherwise the bare caption.
    Missing keys default to the empty string.
    """
    caption = img_info.get("caption", "")
    fig_id = img_info.get("fig_id", "")
    if not fig_id:
        return caption
    sub_label = img_info.get("subfig_label", "")
    return f"This is {fig_id}{sub_label}. " + caption

def extract_specific_context(item, target_indices):
    """Collect observation and interpretation text for the selected images.

    Entries of ``context_enhanced_captions`` whose ``image_index`` is in
    ``target_indices`` contribute "[Image N]: ..." lines; non-empty summaries
    from ``context_enhanced_summary`` are appended last. Returns a tuple
    ``(observations, interpretations)`` of newline-joined strings.
    """
    observations = []
    interpretations = []

    for entry in item.get("context_enhanced_captions", []):
        image_index = entry.get("image_index")
        if image_index not in target_indices:
            continue
        for key, bucket in (("observation", observations),
                            ("interpretation", interpretations)):
            text = entry.get(key, "")
            if text:
                bucket.append(f"[Image {image_index}]: {text}")

    summary = item.get("context_enhanced_summary", {})
    for key, bucket in (("observation_summary", observations),
                        ("interpretation_summary", interpretations)):
        if summary.get(key):
            bucket.append(f"[{key}]: {summary[key]}")

    return "\n".join(observations), "\n".join(interpretations)


def extract_interpretation_text(item, target_indices=None):
    """Join per-image interpretations and the overall summary into one string.

    When ``target_indices`` is truthy, only entries whose ``image_index`` is in
    it are used; otherwise every entry is used. Empty interpretations and the
    literal "Not found" are skipped. The interpretation summary, if any, is
    appended as the last line.
    """
    overall = item.get("context_enhanced_summary", {}).get("interpretation_summary", "")

    pieces = []
    for entry in item.get("context_enhanced_captions", []):
        image_index = entry.get("image_index")
        if target_indices and image_index not in target_indices:
            continue
        text = entry.get("interpretation", "")
        if not text or text == "Not found":
            continue
        pieces.append(f"[Image {image_index} Interpretation]: {text}")

    if overall:
        pieces.append(f"[Overall Summary]: {overall}")
    return "\n".join(pieces)


def format_background_intro(theme_data, target_indices):
    """Serialize the experimental background and selected image settings as JSON.

    Args:
        theme_data: Dict that may hold "Experimental background" and an
            "Image Settings" mapping keyed by "Image N". A falsy value yields
            an empty string.
        target_indices: Image numbers to keep. When falsy, every key of
            "Image Settings" that contains "Image" is kept.

    Returns:
        A pretty-printed JSON string (indent 2, non-ASCII preserved) with the
        keys "Experimental background" and "Image Settings".
    """
    if not theme_data:
        return ""

    exp_bg = theme_data.get("Experimental background", "N/A")
    # Tolerate an explicit null for the settings mapping.
    all_themes = theme_data.get("Image Settings") or {}

    if target_indices:
        wanted_keys = [f"Image {idx}" for idx in target_indices]
        selected_themes = {
            key: all_themes[key] for key in wanted_keys if key in all_themes
        }
    else:
        # No filter requested: keep the image entries under their own keys.
        # Parsing the number out of each key and rebuilding it would crash on
        # a non-numeric suffix and drop keys such as "Image 01".
        selected_themes = {
            key: value for key, value in all_themes.items() if "Image" in key
        }

    background_dict = {
        "Experimental background": exp_bg,
        "Image Settings": selected_themes
    }

    return json.dumps(background_dict, ensure_ascii=False, indent=2)


In [None]:
import json
import os

def _as_int_ref(ref):
    """Return an experiment reference as an int, or None if it is not numeric."""
    try:
        return int(ref)
    except (ValueError, TypeError):
        return None


def _clause(text):
    """Return text as a string without surrounding spaces or trailing periods.

    The sentence templates add their own final period, so trailing periods in
    the source text are removed to avoid "..".
    """
    return str(text).strip().rstrip(". \t\r\n")


def _experiment_field(exp, key, default):
    """Return exp[key] as a clause, or default when it is missing, null or empty."""
    value = exp.get(key)
    if value is None or value == "":
        return default
    return _clause(value)


def _experiment_refs(inference):
    """Return the inference's based_on_experiments value as a list."""
    raw = inference.get("based_on_experiments")
    if raw is None:
        return []
    if isinstance(raw, (list, tuple)):
        return list(raw)
    # A scalar such as 1 or "1" is treated as a single reference.
    return [raw]


def generate_flattened_chain(item):
    """
    Convert nested logic_chain into a flattened list of strings and handle index alignment.

    The payload is read from item["input_logic_chain"] (either the payload dict
    itself or a list whose first element is the payload). The result contains,
    in order: the research context, one sentence per experiment, one sentence
    per intermediate inference, and a final conclusion sentence. An empty list
    is returned when no payload dict is found.

    Experiment references in the inferences are shifted by one when any of
    them is 0, i.e. when the source numbering is 0-indexed, so that they match
    the 1-indexed "Experiment N" sentences.
    """
    logic_chain = item.get("input_logic_chain", [])

    # Get the main data body
    data = logic_chain[0] if isinstance(logic_chain, list) and len(logic_chain) > 0 else logic_chain

    if not isinstance(data, dict):
        return []

    lines = []

    # 1. Research Context
    context = data.get("research_context", "")
    if context:
        lines.append(f"Research Context: {context}")

    # 2. Experiments (Facts)
    for i, exp in enumerate(data.get("experiments") or []):
        if not isinstance(exp, dict):
            # Skip malformed entries; numbering stays positional so that
            # references from the inferences still line up.
            continue
        setting = _experiment_field(exp, "experimental_setting", "standard setting")
        phenomenon = _experiment_field(exp, "visual_phenomenon", "observed phenomenon")
        result = _experiment_field(exp, "sub_conclusion", "observed result")

        lines.append(f"Experiment {i+1}: In the setting of {setting}, "
                     f"the visual phenomenon observed was {phenomenon}, "
                     f"which indicates {result}.")

    # 3. Intermediate Inferences (Logic)
    reasoning = data.get("reasoning")
    if not isinstance(reasoning, dict):
        reasoning = {}
    inferences = [
        inf for inf in (reasoning.get("intermediate_inferences") or [])
        if isinstance(inf, dict)
    ]

    # Step A: Determine offset (if any reference is 0 the source is 0-indexed).
    # The same int conversion is used here and in Step B so both passes agree
    # on what counts as a numeric reference.
    zero_indexed = any(
        _as_int_ref(ref) == 0
        for inf in inferences
        for ref in _experiment_refs(inf)
    )
    offset = 1 if zero_indexed else 0

    # Step B: Emit one sentence per inference with corrected references.
    for i, inf in enumerate(inferences):
        inference_text = _clause(inf.get("sub_conclusion") or "")

        corrected_refs = []
        for ref in _experiment_refs(inf):
            number = _as_int_ref(ref)
            # Keep the original value if it is not numeric.
            corrected_refs.append(str(ref) if number is None else str(number + offset))

        if corrected_refs:
            ref_str = ", ".join(corrected_refs)
            line = (f"Intermediate Inference {i+1}: Derived from Experiment {ref_str}, "
                    f"it is inferred that {inference_text}.")
        else:
            # No references: omit the dangling "Derived from Experiment ," clause.
            line = f"Intermediate Inference {i+1}: It is inferred that {inference_text}."
        lines.append(line)

    # 4. Final Conclusion (only the parts that are actually present)
    content = _clause(reasoning.get("content") or "")
    conclusion = _clause(reasoning.get("conclusion") or "")

    final_parts = []
    if content:
        final_parts.append(f"Synthesizing the above, {content}.")
    if conclusion:
        final_parts.append(f"Therefore, {conclusion}.")
    if final_parts:
        lines.append("Final Conclusion: " + " ".join(final_parts))

    return lines

def process_file(file_path):
    """Add a ``flattened_logic_chain`` field to every item of a JSON file.

    The file at ``file_path`` must contain a JSON list of items. The result is
    written next to the input as "<name>_processed<ext>" (".json" is used when
    the input has no extension); the input file itself is left untouched.
    """
    print(f"Processing {file_path}...")

    with open(file_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    for item in data:
        item['flattened_logic_chain'] = generate_flattened_chain(item)

    # Derive the output name from the extension only. str.replace(".json", ...)
    # would rewrite every ".json" occurring anywhere in the path and, for a
    # path without ".json", would return the input path and overwrite it.
    root, ext = os.path.splitext(file_path)
    output_path = f"{root}_processed{ext or '.json'}"
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)

    print(f"Done! Saved to {output_path}")

# --- Execution ---
# Entry file of the pipeline. The prefix (file name without extension) is
# reused by the later QC cells to derive their input/output file names.
input_qa_filename = "data_final.json" 
input_qa_file_prefix = os.path.splitext(input_qa_filename)[0]

# Flatten the logic chains only if the input exists; otherwise just report it.
if os.path.exists(input_qa_filename):
    process_file(input_qa_filename)
else:
    print(f"File '{input_qa_filename}' not found. Please check the path.")

Processing data_final_v2.json...
Done! Saved to data_final_v2_processed.json


In [29]:
# Prompt for QC step 1 (logic-chain coherence). It is filled with str.format:
# {flattened_logic_chain} is the only placeholder, and the doubled braces
# ({{ and }}) around the JSON example are format escapes for literal braces.
# The reply is expected to contain <scores>...</scores> and
# <explanation>...</explanation> sections.
LOGIC_CHAIN_QC_1_TEMPLATE = \
"""
You are an expert in biomedical reasoning and logic evaluation.
Your task is to evaluate the integrity and coherence of a logic chain.
The input is a structured list of strings representing the progression from experimental facts to intermediate inferences, and finally to a conclusion.

# Input Data

[Logic Chain] (The reasoning path to evaluate):
{flattened_logic_chain}

# Evaluation Criteria (1-5 Scale)

1. Evidence Support Strength
   Assess if the intermediate inferences provide sufficient and accurate support for the final reasoning content.
   - Score 1 (Critical Fail): Contradictory or Unsupported. The final content makes claims that contradict the intermediate inferences or relies on evidence not present in the chain.
   - Score 3 (Borderline): Weak or Partial Support. The final content is somewhat related but contains major leaps in logic or includes details not fully backed by the intermediate steps.
   - Score 5 (Pass): Strong Support. The final content is a robust and accurate synthesis strictly derived from the provided intermediate inferences.

2. Logical Flow and Coherence
   Assess if the transition from Intermediate Inferences to the Final Conclusion is logically sound and seamless.
   - Score 1 (Critical Fail): Fragmented or Disjointed. The logic jumps randomly; the connection between the inference layer and the conclusion layer is broken or nonsensical.
   - Score 3 (Borderline): Rough or Repetitive. The flow is understandable but clunky, redundant, or requires the reader to guess the connection between steps.
   - Score 5 (Pass): Seamless and Coherent. The reasoning flows naturally like a scientific argument; the conclusion feels like the inevitable result of the preceding steps.

# Output Format (Strict JSON)

You must return the result strictly in the following format:

<scores>
{{
  "Evidence Support Strength": A,
  "Logical Flow and Coherence": B
}}
</scores>

<explanation>
[Provide a brief explanation for your scoring. explicitly stating if there are logical gaps, contradictions, or if the chain is solid.]
</explanation>

(Where A, B are integer scores from 1 to 5)
"""

In [None]:
import os
import json
import asyncio
from tqdm.asyncio import tqdm_asyncio

# QC step 1 reads the flattened file and splits it into passed/failed files.
# NOTE: INPUT_FILE, OUTPUT_PASS_FILE, OUTPUT_FAIL_FILE and CONCURRENT_LIMIT are
# module-level names that the later QC cells rebind; the pipeline function
# reads them when it is called, so re-run this cell before re-running QC1.
INPUT_FILE = f"{input_qa_file_prefix}_processed.json"  

file_prefix = os.path.splitext(INPUT_FILE)[0]
OUTPUT_PASS_FILE = f"{file_prefix}_qc1_passed.json" 
OUTPUT_FAIL_FILE = f"{file_prefix}_qc1_failed.json"

# Maximum number of items processed concurrently (semaphore size).
CONCURRENT_LIMIT = 5

async def process_single_logic_chain(item, sem):
    """
    Run the logic-chain QC prompt for one item while holding ``sem``.

    Returns ``(passed, item, reason)``. ``reason`` is None on a pass and a
    short failure description otherwise. When scores are parsed, they and the
    explanation are stored on the item as ``logic_qc_scores`` and
    ``logic_qc_explanation``. An item passes only if its lowest numeric score
    is greater than 3. Errors raised while calling the model or parsing its
    reply are reported through ``reason`` instead of being raised.
    """
    async with sem:
        # 1. Normalise the chain to a single string.
        chain = item.get("flattened_logic_chain", [])
        if isinstance(chain, str):
            chain_text = chain
        elif isinstance(chain, list):
            chain_text = "\n".join(chain)
        else:
            return False, item, "Missing or invalid flattened_logic_chain"

        if not chain_text.strip():
            return False, item, "Empty logic chain"

        # 2. Build the prompt.
        prompt = LOGIC_CHAIN_QC_1_TEMPLATE.format(flattened_logic_chain=chain_text)

        # 3. Query the model and grade the reply.
        try:
            packed = openai_pack_content(prompt, None)
            reply = await get_response_async([], packed, local_text_model, local_text_client)

            scores, explanation = extract_quality_score(reply["content"])
            if not scores:
                return False, item, "Score parsing failed"

            numeric_scores = [
                value for value in scores.values()
                if isinstance(value, (int, float))
            ]
            if not numeric_scores:
                return False, item, "No valid scores"

            lowest = min(numeric_scores)

            item["logic_qc_scores"] = scores
            item["logic_qc_explanation"] = explanation

            # Threshold: every score must be strictly greater than 3.
            if lowest > 3:
                return True, item, None
            return False, item, f"Low Score (Min: {lowest})"

        except Exception as e:
            return False, item, f"Error: {str(e)}"

async def run_qc_pipeline_single_file():
    """Run logic-chain QC over INPUT_FILE and write passed/failed item files.

    Reads the module-level INPUT_FILE, OUTPUT_PASS_FILE, OUTPUT_FAIL_FILE and
    CONCURRENT_LIMIT at call time. Items without a ``flattened_logic_chain``
    field go straight to the failed list; every other item is checked with
    ``process_single_logic_chain`` and copied, with ``qc_fail_reason`` added
    when a reason was reported. Prints an error and returns if the input file
    does not exist.
    """
    if not os.path.exists(INPUT_FILE):
        print(f"[Error] Input file not found: {INPUT_FILE}")
        return

    print(f"[INFO] Processing Logic Chain QC for: {INPUT_FILE}")

    with open(INPUT_FILE, "r", encoding="utf-8") as src:
        data_list = json.load(src)

    semaphore = asyncio.Semaphore(CONCURRENT_LIMIT)

    async def process_item_wrapper(item):
        # Items lacking the required field fail without calling the model.
        if "flattened_logic_chain" not in item:
            return [], [item]

        is_passed, processed_item, reason = await process_single_logic_chain(item, semaphore)

        record = processed_item.copy()
        if reason:
            record["qc_fail_reason"] = reason

        return ([record], []) if is_passed else ([], [record])

    print(f"[INFO] Starting QC for {len(data_list)} items...")
    results = await tqdm_asyncio.gather(
        *[process_item_wrapper(item) for item in data_list])

    all_passed_items = []
    all_failed_items = []
    for passed_batch, failed_batch in results:
        all_passed_items += passed_batch
        all_failed_items += failed_batch

    print("[INFO] Writing results...")

    for path, items in ((OUTPUT_PASS_FILE, all_passed_items),
                        (OUTPUT_FAIL_FILE, all_failed_items)):
        with open(path, "w", encoding="utf-8") as dst:
            json.dump(items, dst, indent=4, ensure_ascii=False)

    print("-" * 30)
    print("[DONE] Finished Logic Chain QC.")
    print(f"Passed Items: {len(all_passed_items)}")
    print(f"Failed Items: {len(all_failed_items)}")

# Execution
# Top-level await needs an already running event loop (e.g. a Jupyter/IPython
# cell); in a plain script use asyncio.run(run_qc_pipeline_single_file()).
await run_qc_pipeline_single_file()

[INFO] Processing Logic Chain QC for: data_final_v2_processed.json
[INFO] Starting QC for 10 items...




100%|██████████| 10/10 [00:43<00:00,  4.40s/it]

[INFO] Writing results...
------------------------------
[DONE] Finished Logic Chain QC.
Passed Items: 10
Failed Items: 0





In [None]:
import json
import os
import re

def extract_visual_phenomena(file_path):
    """Add an ``extracted_visual_phenomena`` list to every item of a JSON file.

    Pre-processing step before QC2. For each item, the ``visual_phenomenon``
    text of every experiment in the first ``input_logic_chain`` node is
    cleaned (bracketed spans removed, " ." collapsed to ".", trailing period
    ensured) and stored as ``{"visual_phenomenon": text}``. Empty or missing
    text is stored unchanged. The file is rewritten in place. Prints an error
    and returns if the file does not exist.
    """
    print(f"Processing file: {file_path}")

    if not os.path.exists(file_path):
        print(f"Error: File {file_path} not found")
        return

    with open(file_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    def _clean(text):
        if not text:
            return text
        # Remove "[...]" spans together with the whitespace before them,
        # trim, then repair any " ." left behind by the removal.
        text = re.sub(r'\s*\[.*?\]', '', text).strip().replace(' .', '.')
        # Make sure the sentence ends with a period.
        if len(text) > 0 and not text.endswith('.'):
            text += '.'
        return text

    for item in data:
        logic_chain = item.get("input_logic_chain", [])

        experiments = []
        if logic_chain and isinstance(logic_chain, list):
            experiments = logic_chain[0].get("experiments", [])

        # Write the new field to the item.
        item["extracted_visual_phenomena"] = [
            {"visual_phenomenon": _clean(exp.get("visual_phenomenon", ""))}
            for exp in experiments
        ]

    count = len(data)

    # Save back to the same file.
    with open(file_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=4, ensure_ascii=False)

    print(f"Done! Updated {count} items.")
    print(f"Results saved to: {file_path}")

# --- Execution ---
# Rebinds input_qa_filename to the QC1 "passed" file, which is then updated
# in place with the extracted visual phenomena.
input_qa_filename = f"{input_qa_file_prefix}_processed_qc1_passed.json"

if __name__ == "__main__":
    extract_visual_phenomena(input_qa_filename)

Processing file: data_final_v2_processed_qc1_passed.json
Done! Updated 10 items.
Results saved to: data_final_v2_processed_qc1_passed.json


In [32]:
# Prompt for QC step 2 (source grounding of the visual phenomena). It is filled
# with str.format using the placeholders {Observation}, {Context} and
# {VisualPhenomena}; the doubled braces ({{ and }}) around the JSON example
# are format escapes for literal braces. The reply is expected to contain
# <scores>...</scores> and <explanation>...</explanation> sections.
LOGIC_CHAIN_QC_2_TEMPLATE = \
"""
You are an expert in biomedical text verification and fact-checking.
Your task is to verify if the [Visual Phenomena] described in the logic chain are supported by the provided Source Data ([Observation] and [Context]).

# Input Data

[Observation] (Objective visual descriptions of the images):
{Observation}

[Context] (Background containing [Image] tags):
{Context}

[Visual Phenomena] (The descriptions extracted from the logic chain to be verified):
{VisualPhenomena}

# Evaluation Criteria (1-5 Scale)

1. Source Grounding & Verification
   Assess if every visual phenomenon listed in the Target is explicitly mentioned or clearly visible in the [Context] or [Observation].
   - Score 1 (Critical Fail): Hallucination. The target describes features that are completely absent from both the Observation and Context, or contradicts them.
   - Score 3 (Borderline): Partial Match. Some descriptions are supported, but others are missing source evidence, or the target adds significant details not found in the source.
   - Score 5 (Pass): Fully Grounded. Every statement in the [Visual Phenomena] is directly supported by evidence found in the Source Observation or Source Context (textual descriptions of visual outcomes).

# Output Format (Strict JSON)

You must return the result strictly in the following format:

<scores>
{{
  "Source Grounding & Verification": A
}}
</scores>

<explanation>
[Provide a brief explanation. If there is a hallucination or missing reference, explicitly quote the unsupported part.]
</explanation>

(Where A is an integer score from 1 to 5)
"""

In [None]:
import os
import json
import asyncio
from tqdm.asyncio import tqdm_asyncio

# ================= Configuration =================
# QC step 2 consumes the QC1 "passed" file. These module-level names replace
# the QC1 values of the same names; the pipeline function reads them when it
# is called.
INPUT_FILE = f"{input_qa_file_prefix}_processed_qc1_passed.json"

file_prefix = f"{input_qa_file_prefix}_processed"
OUTPUT_PASS_FILE = f"{file_prefix}_qc2_passed.json"
OUTPUT_FAIL_FILE = f"{file_prefix}_qc2_failed.json"

CONCURRENT_LIMIT = 10  # Concurrency limit

# ================= Core Processing Functions =================

async def process_single_grounding_check(item, sem):
    """Run the grounding QC prompt for one item while holding ``sem``.

    Returns ``(passed, item, reason)``. ``reason`` is None on a pass and a
    short failure description otherwise. When scores are parsed, the grounding
    score and explanation are stored on the item as ``qc2_grounding_score``
    and ``qc2_grounding_explanation``. An item passes only if that score is
    greater than 3. Errors raised while calling the model or parsing its reply
    are reported through ``reason`` instead of being raised.
    """
    async with sem:
        # 1. Source data, with fixed fallback text when a field is absent.
        obs = item.get("input_observation", "No observation provided.")
        ctx = item.get("input_context", "No context provided.")

        # 2. Visual phenomena to verify, numbered by their original position.
        extracted_data = item.get("extracted_visual_phenomena", [])
        if not extracted_data:
            return False, item, "No extracted visual phenomena found"

        numbered = [
            f"{position}. {entry.get('visual_phenomenon', '')}"
            for position, entry in enumerate(extracted_data, start=1)
            if entry.get("visual_phenomenon", "")
        ]
        vp_str = "\n".join(numbered)
        if not vp_str.strip():
            return False, item, "Empty visual phenomena text"

        # 3. Fill the prompt.
        prompt = LOGIC_CHAIN_QC_2_TEMPLATE.format(
            Observation=obs,
            Context=ctx,
            VisualPhenomena=vp_str
        )

        try:
            # 4. Call the model.
            packed = openai_pack_content(prompt, None)
            reply = await get_response_async([], packed, local_text_model, local_text_client)

            # 5. Parse the scores.
            scores, explanation = extract_quality_score(reply["content"])
            if not scores:
                return False, item, "Score parsing failed"

            score_val = scores.get("Source Grounding & Verification", 0)

            # Write the results back to the item.
            item["qc2_grounding_score"] = score_val
            item["qc2_grounding_explanation"] = explanation

            # 6. Threshold: the score must be strictly greater than 3.
            if score_val > 3:
                return True, item, None
            return False, item, f"Low Grounding Score ({score_val})"

        except Exception as e:
            return False, item, f"Error: {str(e)}"

async def run_qc2_pipeline():
    """Run the grounding QC over INPUT_FILE and write passed/failed item files.

    Reads the module-level INPUT_FILE, OUTPUT_PASS_FILE, OUTPUT_FAIL_FILE and
    CONCURRENT_LIMIT at call time. Items without an
    ``extracted_visual_phenomena`` field go straight to the failed list; every
    other item is checked with ``process_single_grounding_check`` and copied,
    with ``qc2_fail_reason`` added when a reason was reported. Prints an error
    and returns if the input file does not exist.
    """
    if not os.path.exists(INPUT_FILE):
        print(f"[Error] Input file not found: {INPUT_FILE}")
        return

    print(f"[INFO] Processing QC2 (Grounding Check) for: {INPUT_FILE}")

    with open(INPUT_FILE, "r", encoding="utf-8") as src:
        data_list = json.load(src)

    semaphore = asyncio.Semaphore(CONCURRENT_LIMIT)

    async def process_item_wrapper(item):
        # Items lacking the required field fail without calling the model.
        if "extracted_visual_phenomena" not in item:
            return [], [item]

        is_passed, processed_item, reason = await process_single_grounding_check(item, semaphore)

        record = processed_item.copy()
        if reason:
            record["qc2_fail_reason"] = reason

        return ([record], []) if is_passed else ([], [record])

    print(f"[INFO] Starting QC2 for {len(data_list)} items...")
    results = await tqdm_asyncio.gather(
        *[process_item_wrapper(item) for item in data_list])

    all_passed_items = []
    all_failed_items = []
    for passed_batch, failed_batch in results:
        all_passed_items += passed_batch
        all_failed_items += failed_batch

    print("[INFO] Writing QC2 results...")

    for path, items in ((OUTPUT_PASS_FILE, all_passed_items),
                        (OUTPUT_FAIL_FILE, all_failed_items)):
        with open(path, "w", encoding="utf-8") as dst:
            json.dump(items, dst, indent=4, ensure_ascii=False)

    print("-" * 30)
    print("[DONE] Finished QC2.")
    print(f"Passed Items: {len(all_passed_items)}")
    print(f"Failed Items: {len(all_failed_items)}")

# Execute
# Top-level await needs an already running event loop (e.g. a Jupyter/IPython
# cell); in a plain script use asyncio.run(run_qc2_pipeline()).
await run_qc2_pipeline()

[INFO] Processing QC2 (Grounding Check) for: data_final_v2_processed_qc1_passed.json
[INFO] Starting QC2 for 10 items...




100%|██████████| 10/10 [00:30<00:00,  3.07s/it]

[INFO] Writing QC2 results...
------------------------------
[DONE] Finished QC2.
Passed Items: 10
Failed Items: 0





In [34]:
# Prompt for QC step 3 (question/conclusion alignment, scale consistency and
# reasoning validity). Placeholders: {Question}, {Observation}, {LogicChain}
# and {Conclusion}. The doubled braces ({{ and }}) around the JSON example
# indicate it is meant to be filled with str.format.
# NOTE(review): the call site is not visible here -- confirm.
LOGIC_CHAIN_QC_3_TEMPLATE = \
"""
You are an expert in evaluating question-answering logic.
Your task is to verify if the provided Conclusion effectively answers or corresponds to the specific Question asked.

# Input Data

Question:
{Question}

Observation (Visual Evidence containing scale info):
{Observation}

Logic Chain:
{LogicChain}

Conclusion (Derived from Logic Chain):
{Conclusion}

# Evaluation Criteria (1-5 Scale)

1. Question-Conclusion Alignment
   Assess if the Conclusion directly addresses the core inquiry of the Question.
   - Score 1 (Fail): The conclusion is irrelevant, unrelated, or contradicts the premise of the question. It does not provide an answer.
   - Score 3 (Passable): The conclusion is related and provides a partial answer, but may be slightly tangential or misses the specific format requested.
   - Score 5 (Pass): The conclusion provides a clear, logical, and direct answer to the question. It functions effectively as the final output.

2. Scale/Legend Consistency Check
Check if the problem statement lacks a scale/legend, but the observation results, reasoning content, and conclusion clearly include scale numbers or scale information.
- Score 1 point (Serious Failure): The problem statement lacks a scale/legend, but the observation results contain explicit scale numbers (e.g., "50 nm," "scale"), and the reasoning content utilizes this scale information from the observation.
- Score 5 points (Pass): The problem statement and observation results are consistent; either both include a scale/legend, or neither includes scale-related information. If the problem statement includes scale-related information, but the conclusion and reasoning content do not use it, it is not considered an error.

3. Reasoning Validity 
   Assess if the Logic Chain steps contains excessive speculation or hallucinations not supported by the Observation.
   - Score 1 (Critical Fail): Given ONLY Research Context, Experimental Settings, and Visual Phenomenon, the "inference", "sub_conclusion", "content", "conclusion" parts contains details impossible to know.
   - Score 5 (Pass): Given ONLY Research Context, Experimental Settings, and Visual Phenomenon, the "inference", "sub_conclusion", "content", "conclusion" parts are all supported without any hallucination.

# Output Format (Strict JSON)

You must return the result strictly in the following format:

<scores>
{{
  "Question-Conclusion Alignment": A,
  "Scale/Legend Consistency Check" : B,
  "Reasoning Validity" : C
}}
</scores>

<explanation>
[Briefly explain why the conclusion satisfies or fails to answer the question.]
</explanation>


(Where A, B, C is an integer score from 1 to 5)
"""

In [None]:
import os
import json
import asyncio
from tqdm.asyncio import tqdm_asyncio

# ================= Configuration =================
# Maximum number of alignment checks allowed to run at the same time.
CONCURRENT_LIMIT = 20

# Output files of this stage share the "<prefix>_processed" stem.
file_prefix = f"{input_qa_file_prefix}_processed"
OUTPUT_FAIL_FILE = f"{file_prefix}_qc3_failed.json"
OUTPUT_PASS_FILE = f"{file_prefix}_qc3_passed.json"

# Input of this stage: the items that passed the previous (QC2) stage.
INPUT_FILE = f"{input_qa_file_prefix}_processed_qc2_passed.json"


# ================= Core Processing Functions =================

async def process_single_alignment_check(item, sem):
    """Run the QC3 alignment check for a single QA item.

    The question (``basic_qa -> question``), the observation
    (``input_observation``), the whole logic chain (``input_logic_chain``)
    and the conclusion of its first node are rendered into
    ``LOGIC_CHAIN_QC_3_TEMPLATE`` and sent to the local text model. The item
    passes only when the lowest of the three returned scores is above 3.

    Args:
        item: QA record (dict). After the model's scores have been parsed
            the record is mutated in place: ``qc3_scores`` and
            ``qc3_explanation`` are added.
        sem: Semaphore that bounds how many checks run concurrently.

    Returns:
        A tuple ``(is_passed, item, reason)``. ``reason`` is ``None`` when
        the item passes and a failure description otherwise. Errors raised
        while calling the model or parsing its answer are reported through
        ``reason`` instead of being raised.
    """
    async with sem:
        # 1. Extract Question
        # Path: basic_qa -> question
        # A null or non-dict "basic_qa" is treated as "no question" rather
        # than raising, so one malformed record cannot abort the batch.
        basic_qa = item.get("basic_qa") or {}
        question_text = basic_qa.get("question", "") if isinstance(basic_qa, dict) else ""
        if not question_text:
            return False, item, "No question found in basic_qa"

        observation_text = item.get("input_observation", "No observation provided.")

        # 2. Extract Conclusion
        # Path: input_logic_chain -> first item -> conclusion
        logic_chain = item.get("input_logic_chain", [])
        first_node = logic_chain[0] if isinstance(logic_chain, list) and logic_chain else None

        conclusion_text = ""
        if isinstance(first_node, dict):
            # Primary attempt: direct conclusion field
            conclusion_text = first_node.get("conclusion", "")
            if not conclusion_text:
                # Fallback: check inside reasoning node
                reasoning_node = first_node.get("reasoning")
                if isinstance(reasoning_node, dict):
                    conclusion_text = reasoning_node.get("conclusion", "")

        if not conclusion_text:
            return False, item, "No conclusion found in logic chain"

        # 3. Prepare Prompt
        prompt = LOGIC_CHAIN_QC_3_TEMPLATE.format(
            Question=question_text,
            Observation=observation_text,
            LogicChain=logic_chain,
            Conclusion=conclusion_text
        )

        try:
            # 4. Call Model (text-only request, hence no images)
            content = openai_pack_content(prompt, None)
            response = await get_response_async([], content, local_text_model, local_text_client)

            # 5. Parse Scores
            scores, explanation = extract_quality_score(response["content"])

            if not scores:
                return False, item, "Score parsing failed"

            # QC3 evaluates multiple alignment dimensions; a dimension the
            # model omitted counts as 0 and therefore fails the item.
            score_alignment = scores.get("Question-Conclusion Alignment", 0)
            score_scale = scores.get("Scale/Legend Consistency Check", 0)
            score_reasoning = scores.get("Reasoning Validity", 0)

            item["qc3_scores"] = scores
            item["qc3_explanation"] = explanation

            # The weakest dimension decides the verdict.
            min_score = min(score_alignment, score_scale, score_reasoning)

            if min_score > 3:
                return True, item, None
            return False, item, f"Low Score (Min: {min_score}, Details: {scores})"

        except Exception as e:
            # Boundary handler: any failure in the model call or in score
            # handling is recorded as this item's failure reason.
            return False, item, f"Error: {str(e)}"

async def run_qc3_pipeline():
    """Run the QC3 alignment check over every item stored in ``INPUT_FILE``.

    Items are checked concurrently (at most ``CONCURRENT_LIMIT`` at a time)
    with ``process_single_alignment_check``. Passing items are written to
    ``OUTPUT_PASS_FILE`` and the rest to ``OUTPUT_FAIL_FILE``; every failed
    item carries a ``qc3_fail_reason`` field. If ``INPUT_FILE`` does not
    exist an error is printed and nothing is written.
    """
    if not os.path.exists(INPUT_FILE):
        print(f"[Error] Input file not found: {INPUT_FILE}")
        return

    print(f"[INFO] Processing QC3 (Alignment Check) for: {INPUT_FILE}")

    with open(INPUT_FILE, "r", encoding="utf-8") as f:
        data_list = json.load(f)

    semaphore = asyncio.Semaphore(CONCURRENT_LIMIT)

    async def process_item_wrapper(item):
        """Check one item and return ``(is_passed, annotated_copy)``."""
        if "basic_qa" not in item or "input_logic_chain" not in item:
            # Record why the item was rejected, like every other failure.
            skipped = item.copy()
            skipped["qc3_fail_reason"] = "Missing basic_qa or input_logic_chain"
            return False, skipped

        try:
            is_passed, processed_item, reason = await process_single_alignment_check(item, semaphore)
        except Exception as exc:
            # Boundary handler: one unexpected failure must not abort the
            # whole batch and discard the results of all other items.
            is_passed, processed_item, reason = False, item, f"Error: {exc}"

        new_item = processed_item.copy()
        if reason:
            new_item["qc3_fail_reason"] = reason
        return is_passed, new_item

    print(f"[INFO] Starting QC3 for {len(data_list)} items...")
    tasks = [process_item_wrapper(item) for item in data_list]
    results = await tqdm_asyncio.gather(*tasks)

    # Results come back in task order, so both output lists keep the input order.
    all_passed_items = [entry for passed, entry in results if passed]
    all_failed_items = [entry for passed, entry in results if not passed]

    print("[INFO] Writing QC3 results...")

    with open(OUTPUT_PASS_FILE, "w", encoding="utf-8") as f:
        json.dump(all_passed_items, f, indent=4, ensure_ascii=False)

    with open(OUTPUT_FAIL_FILE, "w", encoding="utf-8") as f:
        json.dump(all_failed_items, f, indent=4, ensure_ascii=False)

    print("-" * 30)
    print("[DONE] Finished QC3.")
    print(f"Passed Items: {len(all_passed_items)}")
    print(f"Failed Items: {len(all_failed_items)}")

# Execution: run the QC3 stage now. The bare top-level `await` presumably
# relies on the notebook's running event loop (IPython autoawait); a plain
# script would need asyncio.run(run_qc3_pipeline()) instead.
await run_qc3_pipeline()

[INFO] Processing QC3 (Alignment Check) for: data_final_v2_processed_qc2_passed.json
[INFO] Starting QC3 for 10 items...


100%|██████████| 10/10 [00:13<00:00,  1.36s/it]

[INFO] Writing QC3 results...
------------------------------
[DONE] Finished QC3.
Passed Items: 10
Failed Items: 0



