# Format Results Processing

This notebook processes raw results from the `raw/` directory and formats them into comparable JSON files for analysis.

## Task Description

The main task is to transform raw results into standardized comparison-ready formats:

1. **Input**: Raw JSON files from the `raw/` directory containing model predictions
2. **Output**: Formatted JSON files where:
   - **Key**: Patient ID
   - **Value**: List of complications
   - **For strict results**: Value is a list in which each complication is joined to its grading with an underscore (`_`), i.e. `name_grading`

## Processing Steps

1. Parse raw JSON files containing model outputs
2. Extract patient IDs and corresponding complications
3. Format complications according to the target schema
4. Generate comparison-ready JSON files for downstream analysis


## Comprehensive results

In [None]:
import json
from pathlib import Path
import re

def process_comprehensive_results(file, center="center1"):
    """
    Process comprehensive results from a raw JSON file and format them for comparison.

    Each record's "content" may be a dict, a JSON string, or a string wrapping
    the JSON in a ```json fenced block. The "name" of every entry under
    "complications" is kept, with parenthesised remarks (half-width and
    full-width) removed and surrounding whitespace stripped.

    Args:
        file (str): Path to the raw JSON file to process
        center (str): Output directory name (default: "center1"); the result is
            written to "<center>/<file name>"

    Raises:
        KeyError: If a record has no "content" or the content has no "complications"
        json.JSONDecodeError: If string content is not valid JSON
    """
    # Read explicitly as UTF-8: the data contains Chinese text and the default
    # encoding of open() depends on the platform locale.
    with open(file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    formatted = {}
    for patient_id, record in data.items():
        content = record["content"]

        if isinstance(content, dict):
            # Already-parsed content
            complications = content["complications"]
        elif isinstance(content, str) and "```json" in content:
            # Markdown-wrapped JSON: parse what sits between ```json and the next ```
            fenced = content.split("```json")[1].split("```")[0]
            complications = json.loads(fenced)["complications"]
        else:
            # Plain JSON string
            complications = json.loads(content)["complications"]

        names = []
        for complication in complications:
            # Drop half-width (...) and then full-width （...） remarks
            name = re.sub(r'\([^)]*\)', '', complication["name"])
            name = re.sub(r'（[^）]*）', '', name)
            names.append(name.strip())
        formatted[patient_id] = names

    # Write as UTF-8 as well, since ensure_ascii=False emits raw non-ASCII characters
    with open(f"{center}/{Path(file).name}", 'w', encoding='utf-8') as f:
        json.dump(formatted, f, indent=4, ensure_ascii=False)

    print(f"Processed {file}")



In [5]:
# Example: format one raw comprehensive results file; the output is written under center1/
process_comprehensive_results("raw/claude3_7_comprehensive.json", "center1")

Processed raw/claude3_7_comprehensive.json


## Targeted results

In [6]:
import re

def clean_json_text(text):
    """
    Extract the JSON payload from a model response.

    If the text contains a ```json fenced block, the content between that
    marker and the next ``` is returned. Otherwise the span from the first
    '{' to the last '}' is returned, and if there is no such span the whole
    text is returned. The result is always stripped of surrounding whitespace.

    Args:
        text (str): Raw response text

    Returns:
        str: The extracted text, not yet parsed

    Raises:
        ValueError: If more than one ```json marker is present, or the
            ```json block has no closing ```
    """
    fence_open = "```json"
    fence_close = "```"

    # More than one fenced JSON block is ambiguous
    if text.count(fence_open) > 1:
        raise ValueError("Multiple ```json markers found, cannot determine which one to use")

    start = text.find(fence_open)
    if start != -1:
        body_start = start + len(fence_open)
        # Search forward from the end of the opening marker, so the closing
        # fence is found regardless of which characters precede it.
        end = text.find(fence_close, body_start)
        if end == -1:
            raise ValueError("Cannot find matching end marker ```")
        return text[body_start:end].strip()

    # No fenced block: fall back to the outermost {...} span, if any
    first_brace = text.find('{')
    last_brace = text.rfind('}')
    if first_brace != -1 and first_brace < last_brace:
        return text[first_brace:last_brace + 1].strip()
    return text.strip()


In [7]:
# Define rules for processing complications list
# Rule 1: If all evaluations yield negative results, output "no postoperative complications"
# Rule 2: Identification of specific infectious complications automatically excludes diagnosis of "infection of unknown source" to prevent redundancy
def rules(list):
    """
    Apply the post-processing rules to a list of complication names.

    Rule 1: an empty list becomes ["无术后并发症"] (no postoperative complications).
    Rule 2: entries containing "来源不明" (unknown source) are dropped when the
    list also holds a specific infection ("感染" without "来源不明") or
    pneumonia ("肺炎"); otherwise the list is returned unchanged.
    """
    unknown_tag = "来源不明"

    # Rule 1
    if not list:
        return ["无术后并发症"]

    # Rule 2 only matters when an "unknown source" entry is present
    if not any(unknown_tag in entry for entry in list):
        return list

    specific_infection_present = any(
        ("感染" in entry and unknown_tag not in entry) or "肺炎" in entry
        for entry in list
    )
    if not specific_infection_present:
        return list

    # A specific infection makes the "unknown source" diagnosis redundant
    return [entry for entry in list if unknown_tag not in entry]

In [8]:
# Function: convert a raw targeted-results JSON file (one model response per patient and complication) into a patient -> complications mapping
import json
def process_targeted_results(path, center="center1"):
    """
    Process targeted results from a raw JSON file and format them for comparison.

    The raw file maps each patient ID to a dict of {complication name: response}.
    A complication is kept when the JSON in the response's "content" has "bool"
    equal to True or the string "True". The kept names are passed through
    `rules` before being stored.

    Args:
        path (str): Path to the raw JSON file to process
        center (str): Output directory name (default: "center1"); the result is
            written to "<center>/<file name>"
    """
    # Read explicitly as UTF-8: the default encoding of open() depends on the
    # platform locale and the data contains Chinese text.
    with open(path, "r", encoding="utf-8") as f:
        raw_results = json.load(f)

    final_dict = {}
    for patient_id, responses in raw_results.items():
        positives = []
        for complication, response in responses.items():
            try:
                verdict = json.loads(clean_json_text(response["content"]))
                if verdict["bool"] in ("True", True):
                    positives.append(complication)
            except Exception as e:
                # Best-effort: report the unusable response and carry on with the rest
                print(f"Error processing {path}: {response} - {e}")
        final_dict[patient_id] = rules(positives)

    # Write as UTF-8 as well, since ensure_ascii=False emits raw non-ASCII characters
    with open(f"{center}/{Path(path).name}", 'w', encoding="utf-8") as f:
        json.dump(final_dict, f, indent=4, ensure_ascii=False)
    print(f"Processed {path}")

In [9]:
# Example: format one raw targeted results file; the output is written under center1/
process_targeted_results("raw/deepseek_r1_targeted.json", "center1")

Processing raw/deepseek_r1_targeted.json
Processed raw/deepseek_r1_targeted.json


## Strict results


In [11]:
import json
import re
import demjson3
def extract_complications_with_grading_from_comprehensive(data):
    """
    Extract complications with their grading from comprehensive-format data.

    Each case may be a dict with a 'content' key or the content itself. The
    content may be a dict, a JSON string, or a string wrapping the JSON in a
    ```json fenced block. Parenthesised remarks are removed from each name,
    and complications whose name contains "其他" (other) are discarded.

    Args:
        data: Dictionary mapping case ID to case data

    Returns:
        dict: Dictionary containing case ID and corresponding complications, each
        complication is in "name_grading" format ("Null" when no grading is given).
        A case whose content cannot be turned into a dict maps to a
        one-element ["Error: ..."] list. A case whose parsed content has no
        'complications' key is left out of the result.
    """
    # Matches a half-width or full-width parenthesised remark
    paren_pattern = re.compile(r'[\(（][^)）]*[\)）]')

    result = {}
    for case_id, case_data in data.items():
        try:
            # Only a dict can carry a 'content' key; on a str, `in` would be a
            # substring test and the lookup below would fail.
            if isinstance(case_data, dict) and 'content' in case_data:
                content = case_data['content']
            else:
                content = case_data

            # Only text needs decoding; already-parsed content is used as is
            if isinstance(content, str):
                if "```json" in content:
                    # Extract content between ```json and ```
                    json_content = content.split("```json")[1].split("```")[0].strip()
                    content = demjson3.decode(json_content)
                else:
                    try:
                        content = demjson3.decode(content)
                    except Exception:
                        # Leave the text undecoded; it is reported just below
                        pass

            if not isinstance(content, dict):
                result[case_id] = ["Error: Cannot parse JSON content"]
                continue

            # Extract complications and convert to name_grading format
            if 'complications' in content:
                complications_list = []
                for comp in content['complications']:
                    if isinstance(comp, dict) and 'name' in comp:
                        name = paren_pattern.sub('', comp['name']).strip()
                        # If name contains "其他" (other), discard this complication
                        if "其他" not in name:
                            grading = comp.get('grading', 'Null')
                            complications_list.append(f"{name}_{grading}")
                result[case_id] = complications_list
        except json.JSONDecodeError:
            # Handle JSON parsing errors
            result[case_id] = ["Error: Cannot parse JSON content"]
        except Exception as e:
            # Handle other errors
            result[case_id] = [f"Error: {str(e)}"]

    return result


In [12]:
def extract_complications_with_grading_from_targeted(data):
    """
    Extract complications and their grading from targeted (per-complication) data.

    For every case, each complication's response is decoded and kept when its
    'bool' field is True or the string "True" (the same acceptance rule as
    `process_targeted_results`). Parenthesised remarks are removed from the
    complication name, and names containing "其他" (other) are discarded.

    Args:
        data: Dictionary mapping case ID to {complication name: response}; a
            response is either a dict with a 'content' key or the content itself

    Returns:
        dict: Dictionary containing case ID and corresponding complications, each
        complication is in "name_grading" format ("Null" when no grading is given).
        A case with no positive complication maps to ["无术后并发症_Null"].
    """
    # Matches a half-width or full-width parenthesised remark
    paren_pattern = re.compile(r'[\(（][^)）]*[\)）]')

    # Define rules
    def rules(complications_list):
        if not complications_list:
            return ["无术后并发症_Null"]

        # "来源不明" (unknown source) is redundant once a specific infection
        # ("感染" without "来源不明") or pneumonia ("肺炎") is present
        has_unknown_source = any("来源不明" in item["name"] for item in complications_list)
        if has_unknown_source:
            has_specific_infection = any(
                ("感染" in item["name"] and "来源不明" not in item["name"]) or "肺炎" in item["name"]
                for item in complications_list
            )
            if has_specific_infection:
                complications_list = [item for item in complications_list if "来源不明" not in item["name"]]

        # Convert to name_grading format
        return [f"{item['name']}_{item['grading']}" for item in complications_list]

    # Extract complications and grading
    result = {}
    for case_id, case_data in data.items():
        try:
            complications_list = []
            # Iterate through each complication
            for complication_name, complication_data in case_data.items():
                try:
                    # Only a dict can carry a 'content' key; on a str, `in`
                    # would be a substring test and the lookup would fail.
                    if isinstance(complication_data, dict) and 'content' in complication_data:
                        content = complication_data['content']
                    else:
                        content = complication_data

                    # Only text needs decoding; already-parsed content is used as is
                    if isinstance(content, str):
                        if "```json" in content:
                            # Extract content between ```json and ```
                            json_content = content.split("```json")[1].split("```")[0].strip()
                            content_json = demjson3.decode(json_content)
                        else:
                            try:
                                content_json = demjson3.decode(content)
                            except Exception:
                                # Undecodable text is skipped by the dict check below
                                content_json = content
                    else:
                        content_json = content

                    if not isinstance(content_json, dict):
                        continue
                    if content_json.get('bool') not in (True, "True"):
                        continue

                    grading = content_json.get('grading', 'Null')
                    name = paren_pattern.sub('', complication_name).strip()
                    # If name contains "其他" (other), discard this complication
                    if "其他" not in name:
                        complications_list.append({
                            "name": name,
                            "grading": grading
                        })
                except json.JSONDecodeError:
                    print(f"JSON parsing error: {case_id} - {complication_name}")
                except Exception as e:
                    print(f"Processing error {case_id} - {complication_name}: {str(e)}")

            # Apply rules to process complications list and convert to name_grading format
            result[case_id] = rules(complications_list)
        except Exception as e:
            # Handle other errors
            result[case_id] = [f"Error: {str(e)}_Null"]

    return result


In [17]:
def process_human_gold_strict_data(center, file_type):
    """
    Convert one annotation file of a center into the strict "name_grading" format.

    Reads '../data_source/<center>/<file_type>.json', runs it through
    extract_complications_with_grading_from_comprehensive, and writes the
    result to 'strict/<center>/<file_type>.json'.

    Args:
        center: str, center name (e.g., 'center1', 'center2')
        file_type: str, base name of the JSON file without extension
            (e.g., 'human_expert', 'gold_standard')
    """
    source_path = f'../data_source/{center}/{file_type}.json'
    target_path = f'strict/{center}/{file_type}.json'

    # Load the annotations
    with open(source_path, 'r', encoding='utf-8') as src:
        annotations = json.load(src)

    # Reformat every case into a list of "name_grading" strings
    strict_records = extract_complications_with_grading_from_comprehensive(annotations)

    # Store the result, keeping non-ASCII characters readable
    with open(target_path, 'w', encoding='utf-8') as dst:
        json.dump(strict_records, dst, ensure_ascii=False, indent=4)


In [None]:
# Convert the human-expert annotations of each center (../data_source/<center>/human_expert.json) to strict format
process_human_gold_strict_data('center1', 'human_expert')  # Only example data with one case for privacy reasons
process_human_gold_strict_data('center2', 'human_expert')  # Not publicly available for privacy reasons

# Convert the gold-standard annotations of each center (../data_source/<center>/gold_standard.json) to strict format
process_human_gold_strict_data('center1', 'gold_standard')  # Only example data with one case for privacy reasons
process_human_gold_strict_data('center2', 'gold_standard')  # Not publicly available for privacy reasons


In [None]:
def process_raw_to_strict_data(center, file):
    """
    Convert one raw model-output file of a center into the strict "name_grading" format.

    Reads 'raw/<center>/<file>.json', picks the extractor from the file name
    ("comprehensive" or "targeted"), and writes the result to
    'strict/<center>/<file>.json'.

    Args:
        center: str, center name (e.g., 'center1', 'center2')
        file: str, name of a file in the raw folder, with or without the
            '.json' extension (e.g., 'o1_comprehensive')

    Raises:
        ValueError: If the file name contains neither "comprehensive" nor "targeted"
    """
    # Accept names given with the extension, so that the '.json' appended
    # below does not produce '<name>.json.json'.
    if file.endswith('.json'):
        file = file[:-len('.json')]

    # Read data from source
    with open(f'raw/{center}/{file}.json', 'r', encoding='utf-8') as f:
        data = json.load(f)

    # Extract complications data
    if "comprehensive" in file:
        complications_data = extract_complications_with_grading_from_comprehensive(data)
    elif "targeted" in file:
        complications_data = extract_complications_with_grading_from_targeted(data)
    else:
        raise ValueError(f"Invalid file: {file}")

    # Write to output file
    with open(f'strict/{center}/{file}.json', 'w', encoding='utf-8') as f:
        json.dump(complications_data, f, ensure_ascii=False, indent=4)


In [None]:
# Examples: pass the file name without the ".json" extension; the function builds "raw/<center>/<file>.json" itself
process_raw_to_strict_data("center1", "deepseek_r1_targeted")
process_raw_to_strict_data("center1", "o1_comprehensive")