In [None]:
import pandas as pd

def read_icd_codes_from_file(file_path):
    """
    Read ICD codes from a text file whose lines look like ``<code> <description>``.

    Only the first whitespace-delimited token of each line is kept. Lines that
    are empty or contain only whitespace are skipped, so a blank separator line
    or a trailing newline does not raise ``IndexError``.

    Parameters:
    file_path (str): The path to the text file containing the ICD codes and descriptions.

    Returns:
    list: The ICD codes in file order, one per non-blank line.
    """
    icd_codes = []
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            # split at most once: only the leading token (the code) is needed
            tokens = line.split(None, 1)
            if not tokens:
                continue  # blank line, nothing to extract
            icd_codes.append(tokens[0])

    return icd_codes

# Example usage: load the reference ICD code list once for the whole notebook.
# NOTE(review): absolute, machine-specific path; adjust before running elsewhere.
file_path = '/Users/houzhen/research/LLMCoder/ALLcode/ICD-10/icd10cm-codes-2025.txt'
# `icd_codes` is a kernel-level global; later cells pass it to analyze_files().
icd_codes = read_icd_codes_from_file(file_path)
# Prints every loaded code in one go, which can be a very long output.
print(icd_codes)


# GPT

In [None]:
import pandas as pd
import os
import json
import re

def load_jsonl(file_path):
    """Load a JSON Lines file into a DataFrame, one row per JSON record.

    Blank or whitespace-only lines (for example a trailing empty line) are
    skipped instead of being handed to ``json.loads``, which raises
    ``json.JSONDecodeError`` on an empty string.

    Parameters:
    file_path (str): Path to a UTF-8 encoded JSONL file.

    Returns:
    pandas.DataFrame: One row per parsed line; empty if the file has no records.
    """
    records = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if line:
                records.append(json.loads(line))
    return pd.DataFrame(records)

def extract_icd_codes(text):
    """Extract ICD-like codes from text and return them with dots removed.

    A code is matched as one uppercase letter, two digits, an optional dot and
    any further digits (``E11.9`` -> ``E119``). The input is converted with
    ``str()`` first, so lists of codes are searched through their repr.

    Parameters:
    text: A string, a missing value (None/NaN), or any object whose ``str()``
        contains the codes.

    Returns:
    list: Matched codes in order of appearance, without dots; ``[]`` for
    missing values.
    """
    # pd.isna() on a list/array returns an element-wise array whose truth value
    # is ambiguous, so only scalar inputs are checked for missingness.
    if pd.api.types.is_scalar(text) and pd.isna(text):
        return []
    # NOTE(review): only digits are captured after the leading letter, so codes
    # with letters in later positions are cut at the first such letter - confirm
    # this is acceptable for the code set being evaluated.
    matches = re.findall(r'[A-Z]\d{2}\.?\d*', str(text))
    return [match.replace('.', '') for match in matches]

def convert_to_array(text):
    """Convert a string-formatted array (or a real list) into a list of strings.

    Handled inputs:
    - a list/tuple: each item is converted with ``str()`` and stripped;
    - a string starting with ``[`` (after trimming): brackets and quotes are
      removed and the remainder is split on commas;
    - any other non-blank string: returned as a single-element list;
    - anything else (None, NaN, numbers, blank strings): ``[]``.

    Parameters:
    text: The cell value to convert.

    Returns:
    list: Non-empty, stripped string items.
    """
    # Real sequences (e.g. parsed from JSONL) previously fell through to [] or
    # made pd.isna() raise on its ambiguous array result.
    if isinstance(text, (list, tuple)):
        items = [str(item).strip() for item in text]
        return [item for item in items if item]
    if not isinstance(text, str):
        return []
    # trim first so leading whitespace does not hide the opening bracket
    text = text.strip()
    if not text:
        return []
    if text.startswith('['):
        text = text.strip('[]').replace("'", "").replace('"', '')
        return [item.strip() for item in text.split(',') if item.strip()]
    return [text]

def calculate_existence_errors(df, icd_codes, is_multi=False):
    """Count predicted codes that do not appear in the reference ICD code list.

    Predictions are read from the 'predict' column (or 'generated_result' when
    'predict' is absent) and parsed with extract_icd_codes().

    Parameters:
    df (pandas.DataFrame): One row per sample.
    icd_codes: Collection of valid codes; converted to a set once for lookups.
    is_multi (bool): When True every distinct predicted code of a row is
        checked. When False only the first predicted code is checked, and only
        for rows that have both a label and a prediction.

    Returns:
    dict: 'total_samples', 'error_samples', 'error_rate' (error rows / rows),
    'total_predictions', 'invalid_predictions', 'invalid_rate'.
    """
    # These counters were never initialised, so the first `+=` failed.
    total_rows = len(df)
    error_rows = 0
    total_predictions = 0
    invalid_predictions = 0

    predict_col = 'predict' if 'predict' in df.columns else 'generated_result'
    label_col = 'label' if 'label' in df.columns else 'ground_truth'

    # membership tests against a list are linear; build the set once
    valid_codes = icd_codes if isinstance(icd_codes, (set, frozenset)) else set(icd_codes)

    if is_multi:
        for _, row in df.iterrows():
            pred_codes = set(extract_icd_codes(row[predict_col]))
            total_predictions += len(pred_codes)

            invalid_codes = pred_codes - valid_codes
            if invalid_codes:
                error_rows += 1
                invalid_predictions += len(invalid_codes)
    else:
        # Compare only the first prediction and the first label
        for _, row in df.iterrows():
            true_codes = convert_to_array(row[label_col])
            pred_codes = extract_icd_codes(row[predict_col])

            if true_codes and pred_codes:
                # count the checked prediction so invalid_rate has a denominator
                total_predictions += 1
                if pred_codes[0] not in valid_codes:
                    error_rows += 1
                    invalid_predictions += 1

    error_rate = error_rows / total_rows if total_rows > 0 else 0
    invalid_rate = invalid_predictions / total_predictions if total_predictions > 0 else 0

    return {
        'total_samples': total_rows,
        'error_samples': error_rows,
        'error_rate': error_rate,
        'total_predictions': total_predictions,
        'invalid_predictions': invalid_predictions,
        'invalid_rate': invalid_rate
    }

def analyze_files(root_folder, icd_codes):
    """Run the existence-error analysis on every CSV in each sub-folder.

    Each immediate sub-directory of ``root_folder`` is scanned for ``.csv``
    files. A folder whose name contains 'multi' (case-insensitive) is scored in
    multi-label mode. Failures on a single file are printed and do not stop the
    run. Only CSV files are processed by this version.

    Parameters:
    root_folder (str): Directory whose sub-directories hold result files.
    icd_codes: Collection of valid ICD codes passed to calculate_existence_errors().

    Returns:
    dict: ``"<folder>/<file>"`` -> statistics dict from calculate_existence_errors().
    """
    results = {}

    # sorted() makes the processing order, and so the result row order, deterministic
    for folder_name in sorted(os.listdir(root_folder)):
        folder_path = os.path.join(root_folder, folder_name)
        if not os.path.isdir(folder_path):
            continue

        print(f"\n{'='*50}")
        print(f"Analyzing folder: {folder_name}")
        print(f"{'='*50}")

        is_multi = 'multi' in folder_name.lower()

        # CSV file
        csv_files = sorted(f for f in os.listdir(folder_path) if f.endswith('.csv'))
        for filename in csv_files:
            file_path = os.path.join(folder_path, filename)
            file_key = f"{folder_name}/{filename}"
            print(f"\nProcessing file: {file_key}")

            # reset per file so the error path never reports a previous file's frame
            df = None
            try:
                df = pd.read_csv(file_path)
                print(f"Number of rows: {len(df)}")

                error_stats = calculate_existence_errors(df, icd_codes, is_multi)
                results[file_key] = error_stats

                print(f"Total samples: {error_stats['total_samples']}")
                print(f"Error samples: {error_stats['error_samples']}")
                print(f"Total predictions: {error_stats['total_predictions']}")
                print(f"Invalid predictions: {error_stats['invalid_predictions']}")
                print(f"Error rate: {error_stats['error_rate']:.4f}")
                print(f"Invalid prediction rate: {error_stats['invalid_rate']:.4f}")

            except Exception as e:
                print(f"Processing fileerror: {str(e)}")
                if df is not None:
                    print(f"File column names: {df.columns.tolist()}")
                    if len(df) > 0:
                        print("Example of the first row:")
                        print(df.iloc[0])

    return results

# Driver: run the existence-error analysis for each configured result folder.
# NOTE(review): absolute, machine-specific path; adjust before running elsewhere.

folders = [
    "/Users/houzhen/research/LLMCoder/code/challange/auto/GPT"
]

print("Collecting valid ICD codes...")
# NOTE(review): collect_valid_codes is not defined in the visible cells; it must
# already exist in the kernel for this line to run - confirm where it comes from.
valid_icd_codes = collect_valid_codes(folders)
print(f"Collected {len(valid_icd_codes)} valid ICD codes")

for folder in folders:
    print(f"\nProcessing file夹: {folder}")
    results = analyze_files(folder, valid_icd_codes)
    
    # One row per analysed file, saved as a CSV named after the folder.
    results_df = pd.DataFrame.from_dict(results, orient='index')
    output_file = f'existence_errors_{os.path.basename(folder)}.csv'
    results_df.round(4).to_csv(output_file)
    
    print("\n=== Summary of results ===")
    print(results_df.round(4).to_string())

In [None]:
import pandas as pd
import os
import json
import re

def load_jsonl(file_path):
    """Load a JSON Lines file into a DataFrame, one row per JSON record.

    Blank or whitespace-only lines (for example a trailing empty line) are
    skipped instead of being handed to ``json.loads``, which raises
    ``json.JSONDecodeError`` on an empty string.

    Parameters:
    file_path (str): Path to a UTF-8 encoded JSONL file.

    Returns:
    pandas.DataFrame: One row per parsed line; empty if the file has no records.
    """
    records = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if line:
                records.append(json.loads(line))
    return pd.DataFrame(records)

def extract_icd_codes(text):
    """Extract ICD-like codes from text and return them with dots removed.

    A code is matched as one uppercase letter, two digits, an optional dot and
    any further digits (``E11.9`` -> ``E119``). The input is converted with
    ``str()`` first, so lists of codes are searched through their repr.

    Parameters:
    text: A string, a missing value (None/NaN), or any object whose ``str()``
        contains the codes.

    Returns:
    list: Matched codes in order of appearance, without dots; ``[]`` for
    missing values.
    """
    # pd.isna() on a list/array returns an element-wise array whose truth value
    # is ambiguous, so only scalar inputs are checked for missingness.
    if pd.api.types.is_scalar(text) and pd.isna(text):
        return []
    # NOTE(review): only digits are captured after the leading letter, so codes
    # with letters in later positions are cut at the first such letter - confirm
    # this is acceptable for the code set being evaluated.
    matches = re.findall(r'[A-Z]\d{2}\.?\d*', str(text))
    return [match.replace('.', '') for match in matches]

def convert_to_array(text):
    """Convert a string-formatted array (or a real list) into a list of strings.

    Handled inputs:
    - a list/tuple: each item is converted with ``str()`` and stripped;
    - a string starting with ``[`` (after trimming): brackets and quotes are
      removed and the remainder is split on commas;
    - any other non-blank string: returned as a single-element list;
    - anything else (None, NaN, numbers, blank strings): ``[]``.

    Parameters:
    text: The cell value to convert.

    Returns:
    list: Non-empty, stripped string items.
    """
    # Real sequences (e.g. parsed from JSONL) previously fell through to [] or
    # made pd.isna() raise on its ambiguous array result.
    if isinstance(text, (list, tuple)):
        items = [str(item).strip() for item in text]
        return [item for item in items if item]
    if not isinstance(text, str):
        return []
    # trim first so leading whitespace does not hide the opening bracket
    text = text.strip()
    if not text:
        return []
    if text.startswith('['):
        text = text.strip('[]').replace("'", "").replace('"', '')
        return [item.strip() for item in text.split(',') if item.strip()]
    return [text]

def is_level_error(true_code, pred_code):
    """Determine if there is a hierarchical error between two codes.

    A hierarchical error means the two codes differ but one is a prefix of the
    other, i.e. the prediction is a coarser or finer version of the label.

    Parameters:
    true_code (str): The reference code.
    pred_code (str): The predicted code.

    Returns:
    bool: True when exactly one code is a proper prefix of the other; False for
    empty/None inputs, identical codes, or unrelated codes.
    """
    if not true_code or not pred_code:
        return False
    # An exact match is a correct prediction, not a hierarchy mismatch. The
    # original had an unreachable second `return False` where this guard belongs.
    if true_code == pred_code:
        return False
    # parent/child codes share a prefix; a match in the middle of a code is not one
    return true_code.startswith(pred_code) or pred_code.startswith(true_code)

def calculate_level_errors(df, is_multi=False):
    """Calculate hierarchical errors.

    Labels come from the 'label' column (or 'ground_truth') and are parsed with
    convert_to_array(); predictions come from 'predict' (or 'generated_result')
    and are parsed with extract_icd_codes().

    Parameters:
    df (pandas.DataFrame): One row per sample.
    is_multi (bool): When True every distinct predicted code is compared with
        every label of the row. When False only the first prediction is
        compared with the first label.

    Returns:
    dict: 'total_predictions', 'level_errors' and 'error_rate'
    (level_errors / total_predictions, 0 when nothing was compared).
    """
    total_predictions = 0
    level_errors = 0

    predict_col = 'predict' if 'predict' in df.columns else 'generated_result'
    label_col = 'label' if 'label' in df.columns else 'ground_truth'

    if is_multi:
        for _, row in df.iterrows():
            true_codes = set(convert_to_array(row[label_col]))
            pred_codes = set(extract_icd_codes(row[predict_col]))

            for pred_code in pred_codes:
                total_predictions += 1
                if any(is_level_error(true_code, pred_code) for true_code in true_codes):
                    level_errors += 1
    else:
        # Compare only the first prediction and the first label
        for _, row in df.iterrows():
            true_codes = convert_to_array(row[label_col])
            pred_codes = extract_icd_codes(row[predict_col])

            # The guard was missing, leaving the body mis-indented and the
            # [0] lookups unprotected against empty lists.
            if true_codes and pred_codes:
                total_predictions += 1
                if is_level_error(true_codes[0], pred_codes[0]):
                    level_errors += 1

    error_rate = level_errors / total_predictions if total_predictions > 0 else 0

    return {
        'total_predictions': total_predictions,
        'level_errors': level_errors,
        'error_rate': error_rate
    }

def analyze_files(root_folder):
    """Run the hierarchical-error analysis on every result file in each sub-folder.

    Each immediate sub-directory of ``root_folder`` is scanned. All ``.csv``
    files are processed first, then all ``.jsonl`` files whose name does not
    contain 'trainer_log'. CSV folders are multi-label when their name contains
    'multi'; JSONL folders also when it contains 'mimic' or 'mlti'
    (case-insensitive). Failures on a single file are printed and skipped.

    Parameters:
    root_folder (str): Directory whose sub-directories hold result files.

    Returns:
    dict: ``"<folder>/<file>"`` -> statistics dict from calculate_level_errors().
    """
    results = {}

    def _process(folder_name, folder_path, filename, loader, is_multi):
        # Load one file, score it, store and print its statistics.
        file_path = os.path.join(folder_path, filename)
        file_key = f"{folder_name}/{filename}"
        print(f"\nProcessing file: {file_key}")

        # reset per file so the error path never reports a previous file's frame
        df = None
        try:
            df = loader(file_path)
            print(f"Number of rows: {len(df)}")

            error_stats = calculate_level_errors(df, is_multi)
            results[file_key] = error_stats

            print(f"Total predictions: {error_stats['total_predictions']}")
            print(f"Hierarchical errors: {error_stats['level_errors']}")
            print(f"Error rate: {error_stats['error_rate']:.4f}")

        except Exception as e:
            print(f"Processing fileerror: {str(e)}")
            if df is not None:
                print(f"File column names: {df.columns.tolist()}")
                if len(df) > 0:
                    print("Example of the first row:")
                    print(df.iloc[0])

    # sorted() makes the processing order, and so the result row order, deterministic
    for folder_name in sorted(os.listdir(root_folder)):
        folder_path = os.path.join(root_folder, folder_name)
        if not os.path.isdir(folder_path):
            continue

        print(f"\n{'='*50}")
        print(f"Analyzing folder: {folder_name}")
        print(f"{'='*50}")

        lowered = folder_name.lower()
        entries = sorted(os.listdir(folder_path))

        # CSV file
        csv_multi = 'multi' in lowered
        for filename in entries:
            if filename.endswith('.csv'):
                _process(folder_name, folder_path, filename, pd.read_csv, csv_multi)

        # JSONL file
        jsonl_multi = any(x in lowered for x in ['multi', 'mimic', 'mlti'])
        for filename in entries:
            if filename.endswith('.jsonl') and 'trainer_log' not in filename:
                _process(folder_name, folder_path, filename, load_jsonl, jsonl_multi)

    return results

# Driver: run the hierarchical-error analysis for each configured result folder.
# NOTE(review): absolute, machine-specific path; adjust before running elsewhere.
folders = [
    "/Users/houzhen/research/LLMCoder/code/challange/auto/GPT"
]

for folder in folders:
    print(f"\nProcessing file夹: {folder}")
    results = analyze_files(folder)
    
    # One row per analysed file, saved as a CSV named after the folder.
    results_df = pd.DataFrame.from_dict(results, orient='index')
    output_file = f'level_errors_{os.path.basename(folder)}.csv'
    results_df.round(4).to_csv(output_file)
    
    print("\n=== Summary of results ===")
    print(results_df.round(4).to_string())

In [None]:
import pandas as pd
import os
import json
import re

def load_jsonl(file_path):
    """Load a JSON Lines file into a DataFrame, one row per JSON record.

    Blank or whitespace-only lines (for example a trailing empty line) are
    skipped instead of being handed to ``json.loads``, which raises
    ``json.JSONDecodeError`` on an empty string.

    Parameters:
    file_path (str): Path to a UTF-8 encoded JSONL file.

    Returns:
    pandas.DataFrame: One row per parsed line; empty if the file has no records.
    """
    records = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if line:
                records.append(json.loads(line))
    return pd.DataFrame(records)

def extract_icd_codes(text):
    """Extract ICD-like codes from text and return them with dots removed.

    A code is matched as one uppercase letter, two digits, an optional dot and
    any further digits (``E11.9`` -> ``E119``). The input is converted with
    ``str()`` first, so lists of codes are searched through their repr.

    Parameters:
    text: A string, a missing value (None/NaN), or any object whose ``str()``
        contains the codes.

    Returns:
    list: Matched codes in order of appearance, without dots; ``[]`` for
    missing values.
    """
    # pd.isna() on a list/array returns an element-wise array whose truth value
    # is ambiguous, so only scalar inputs are checked for missingness.
    if pd.api.types.is_scalar(text) and pd.isna(text):
        return []
    # NOTE(review): only digits are captured after the leading letter, so codes
    # with letters in later positions are cut at the first such letter - confirm
    # this is acceptable for the code set being evaluated.
    matches = re.findall(r'[A-Z]\d{2}\.?\d*', str(text))
    return [match.replace('.', '') for match in matches]

def convert_to_array(text):
    """Convert a string-formatted array (or a real list) into a list of strings.

    Handled inputs:
    - a list/tuple: each item is converted with ``str()`` and stripped;
    - a string starting with ``[`` (after trimming): brackets and quotes are
      removed and the remainder is split on commas;
    - any other non-blank string: returned as a single-element list;
    - anything else (None, NaN, numbers, blank strings): ``[]``.

    Parameters:
    text: The cell value to convert.

    Returns:
    list: Non-empty, stripped string items.
    """
    # Real sequences (e.g. parsed from JSONL) previously fell through to [] or
    # made pd.isna() raise on its ambiguous array result.
    if isinstance(text, (list, tuple)):
        items = [str(item).strip() for item in text]
        return [item for item in items if item]
    if not isinstance(text, str):
        return []
    # trim first so leading whitespace does not hide the opening bracket
    text = text.strip()
    if not text:
        return []
    if text.startswith('['):
        text = text.strip('[]').replace("'", "").replace('"', '')
        return [item.strip() for item in text.split(',') if item.strip()]
    return [text]

def is_level_error(true_code, pred_code):
    """Determine if there is a hierarchical error between two codes.

    A hierarchical error means the two codes differ but one is a prefix of the
    other, i.e. the prediction is a coarser or finer version of the label.

    Parameters:
    true_code (str): The reference code.
    pred_code (str): The predicted code.

    Returns:
    bool: True when exactly one code is a proper prefix of the other; False for
    empty/None inputs, identical codes, or unrelated codes.
    """
    if not true_code or not pred_code:
        return False
    # An exact match is a correct prediction, not a hierarchy mismatch. The
    # original had an unreachable second `return False` where this guard belongs.
    if true_code == pred_code:
        return False
    # parent/child codes share a prefix; a match in the middle of a code is not one
    return true_code.startswith(pred_code) or pred_code.startswith(true_code)

def is_character_error(true_code, pred_code, icd_codes):
    """Tell whether a valid prediction differs from the label only in characters.

    The result is True when the predicted code is in ``icd_codes``, is not a
    hierarchical error according to is_level_error(), both codes parse as an
    uppercase letter followed by at least one digit, the letters are equal, the
    codes are not equal, and the codes have the same length.

    Parameters:
    true_code (str): The reference code.
    pred_code (str): The predicted code.
    icd_codes: Collection of valid codes, used for a membership test.

    Returns:
    bool: True for a character-level error, otherwise False.
    """
    # predictions outside the code list, or related by hierarchy, never count here
    if pred_code not in icd_codes or is_level_error(true_code, pred_code):
        return False

    code_shape = r'([A-Z])(\d+.*)'
    parsed_true = re.match(code_shape, true_code)
    parsed_pred = re.match(code_shape, pred_code)
    if not (parsed_true and parsed_pred):
        return False

    shares_letter = parsed_true.group(1) == parsed_pred.group(1)
    return shares_letter and true_code != pred_code and len(true_code) == len(pred_code)

def calculate_character_errors(df, icd_codes, is_multi=False):
    """Count predictions that are character-level errors per is_character_error().

    Labels come from the 'label' column (or 'ground_truth') and are parsed with
    convert_to_array(); predictions come from 'predict' (or 'generated_result')
    and are parsed with extract_icd_codes().

    Parameters:
    df (pandas.DataFrame): One row per sample.
    icd_codes: Collection of valid codes; converted to a set once for lookups.
    is_multi (bool): When True every distinct predicted code is compared with
        every non-empty label of the row. When False only the first prediction
        is compared with the first label.

    Returns:
    dict: 'total_predictions', 'character_errors' and 'error_rate'
    (character_errors / total_predictions, 0 when nothing was compared).
    """
    total_predictions = 0
    char_errors = 0

    predict_col = 'predict' if 'predict' in df.columns else 'generated_result'
    label_col = 'label' if 'label' in df.columns else 'ground_truth'

    # membership tests against a list are linear; build the set once
    valid_codes = icd_codes if isinstance(icd_codes, (set, frozenset)) else set(icd_codes)

    if is_multi:
        for _, row in df.iterrows():
            true_codes = set(convert_to_array(row[label_col]))
            pred_codes = set(extract_icd_codes(row[predict_col]))

            for pred_code in pred_codes:
                total_predictions += 1
                if any(true_code and is_character_error(true_code, pred_code, valid_codes)
                       for true_code in true_codes):
                    char_errors += 1
    else:
        # Compare only the first prediction and the first label
        for _, row in df.iterrows():
            true_codes = convert_to_array(row[label_col])
            pred_codes = extract_icd_codes(row[predict_col])

            # These guards were missing, leaving the body mis-indented and the
            # [0] lookups unprotected against empty lists.
            if not (true_codes and pred_codes):
                continue

            true_code = true_codes[0]
            pred_code = pred_codes[0]

            if true_code and pred_code:
                total_predictions += 1
                if is_character_error(true_code, pred_code, valid_codes):
                    char_errors += 1

    error_rate = char_errors / total_predictions if total_predictions > 0 else 0

    return {
        'total_predictions': total_predictions,
        'character_errors': char_errors,
        'error_rate': error_rate
    }

def analyze_files(root_folder, icd_codes):
    """Run the character-error analysis on every result file in each sub-folder.

    Each immediate sub-directory of ``root_folder`` is scanned. All ``.csv``
    files are processed first, then all ``.jsonl`` files whose name does not
    contain 'trainer_log'. CSV folders are multi-label when their name contains
    'multi'; JSONL folders also when it contains 'mimic' or 'mlti'
    (case-insensitive). Failures on a single file are printed and skipped.

    Parameters:
    root_folder (str): Directory whose sub-directories hold result files.
    icd_codes: Collection of valid ICD codes passed to calculate_character_errors().

    Returns:
    dict: ``"<folder>/<file>"`` -> statistics dict from calculate_character_errors().
    """
    results = {}

    def _process(folder_name, folder_path, filename, loader, is_multi):
        # Load one file, score it, store and print its statistics.
        file_path = os.path.join(folder_path, filename)
        file_key = f"{folder_name}/{filename}"
        print(f"\nProcessing file: {file_key}")

        # reset per file so the error path never reports a previous file's frame
        df = None
        try:
            df = loader(file_path)
            print(f"Number of rows: {len(df)}")

            error_stats = calculate_character_errors(df, icd_codes, is_multi)
            results[file_key] = error_stats

            print(f"Total predictions: {error_stats['total_predictions']}")
            print(f"Character errors: {error_stats['character_errors']}")
            print(f"Error rate: {error_stats['error_rate']:.4f}")

        except Exception as e:
            print(f"Processing fileerror: {str(e)}")
            if df is not None:
                print(f"File column names: {df.columns.tolist()}")
                if len(df) > 0:
                    print("Example of the first row:")
                    print(df.iloc[0])

    # sorted() makes the processing order, and so the result row order, deterministic
    for folder_name in sorted(os.listdir(root_folder)):
        folder_path = os.path.join(root_folder, folder_name)
        if not os.path.isdir(folder_path):
            continue

        print(f"\n{'='*50}")
        print(f"Analyzing folder: {folder_name}")
        print(f"{'='*50}")

        lowered = folder_name.lower()
        entries = sorted(os.listdir(folder_path))

        # CSV file
        csv_multi = 'multi' in lowered
        for filename in entries:
            if filename.endswith('.csv'):
                _process(folder_name, folder_path, filename, pd.read_csv, csv_multi)

        # JSONL file
        jsonl_multi = any(x in lowered for x in ['multi', 'mimic', 'mlti'])
        for filename in entries:
            if filename.endswith('.jsonl') and 'trainer_log' not in filename:
                _process(folder_name, folder_path, filename, load_jsonl, jsonl_multi)

    return results

# Driver: run the character-error analysis for each configured result folder.
# NOTE(review): absolute, machine-specific path; adjust before running elsewhere.
folders = [
    "/Users/houzhen/research/LLMCoder/code/challange/auto/GPT"
]

for folder in folders:
    print(f"\nProcessing file夹: {folder}")
    # `icd_codes` is not defined in this cell; it must already exist in the
    # kernel (the first cell of the notebook assigns it).
    results = analyze_files(folder, icd_codes)
    
    # One row per analysed file, saved as a CSV named after the folder.
    results_df = pd.DataFrame.from_dict(results, orient='index')
    output_file = f'character_errors_{os.path.basename(folder)}.csv'
    results_df.round(4).to_csv(output_file)
    
    print("\n=== Summary of results ===")
    print(results_df.round(4).to_string())

In [None]:
import pandas as pd
import os
import re

def extract_icd_codes(text):
    """Extract ICD-like codes from text and return them with dots removed.

    A code is matched as one uppercase letter, two digits, an optional dot and
    any further digits (``E11.9`` -> ``E119``). The input is converted with
    ``str()`` first, so lists of codes are searched through their repr.

    Parameters:
    text: A string, a missing value (None/NaN), or any object whose ``str()``
        contains the codes.

    Returns:
    list: Matched codes in order of appearance, without dots; ``[]`` for
    missing values.
    """
    # pd.isna() on a list/array returns an element-wise array whose truth value
    # is ambiguous, so only scalar inputs are checked for missingness.
    if pd.api.types.is_scalar(text) and pd.isna(text):
        return []
    # NOTE(review): only digits are captured after the leading letter, so codes
    # with letters in later positions are cut at the first such letter - confirm
    # this is acceptable for the code set being evaluated.
    matches = re.findall(r'[A-Z]\d{2}\.?\d*', str(text))
    return [match.replace('.', '') for match in matches]

def convert_to_array(text):
    """Convert a string-formatted array (or a real list) into a list of strings.

    Handled inputs:
    - a list/tuple: each item is converted with ``str()`` and stripped;
    - a string starting with ``[`` (after trimming): brackets and quotes are
      removed and the remainder is split on commas;
    - any other non-blank string: returned as a single-element list;
    - anything else (None, NaN, numbers, blank strings): ``[]``.

    Parameters:
    text: The cell value to convert.

    Returns:
    list: Non-empty, stripped string items.
    """
    # Real sequences (e.g. parsed from JSONL) previously fell through to [] or
    # made pd.isna() raise on its ambiguous array result.
    if isinstance(text, (list, tuple)):
        items = [str(item).strip() for item in text]
        return [item for item in items if item]
    if not isinstance(text, str):
        return []
    # trim first so leading whitespace does not hide the opening bracket
    text = text.strip()
    if not text:
        return []
    if text.startswith('['):
        text = text.strip('[]').replace("'", "").replace('"', '')
        return [item.strip() for item in text.split(',') if item.strip()]
    return [text]

def check_counts(path):
    """Report, per CSV file, how many rows predict the wrong number of ICD codes.

    Each immediate sub-directory of ``path`` is scanned for ``.csv`` files. In a
    folder whose name contains 'multi' (case-insensitive) a row is wrong when
    the number of extracted predicted codes differs from the number of labels;
    otherwise a row is wrong when it does not contain exactly one predicted
    code. Results are printed; failures on a single file are printed and skipped.

    Parameters:
    path (str): Directory whose sub-directories hold result CSV files.

    Returns:
    None
    """
    # sorted() makes the report order deterministic
    for folder_name in sorted(os.listdir(path)):
        folder_path = os.path.join(path, folder_name)
        if not os.path.isdir(folder_path):
            continue

        print(f"\n{'='*50}")
        print(f"Analyzing folder: {folder_name}")
        print(f"{'='*50}")

        is_multi = 'multi' in folder_name.lower()

        for file in sorted(os.listdir(folder_path)):
            if not file.endswith('.csv'):
                continue

            # reset per file so the error path never reports a previous file's frame
            df = None
            try:
                df = pd.read_csv(os.path.join(folder_path, file))
                total_rows = len(df)
                wrong_count = 0

                # column choice depends only on the frame, so resolve it once
                label_col = 'label' if 'label' in df.columns else 'ground_truth'
                predict_col = 'predict' if 'predict' in df.columns else 'generated_result'

                for _, row in df.iterrows():
                    true_count = len(convert_to_array(row[label_col]))
                    pred_count = len(extract_icd_codes(row[predict_col]))

                    expected_count = true_count if is_multi else 1
                    if pred_count != expected_count:
                        wrong_count += 1

                error_rate = wrong_count / total_rows if total_rows > 0 else 0
                print(f"\nProcessing file: {folder_name}/{file}")
                print(f"Total rows: {total_rows}")
                print(f"Rows with quantity error: {wrong_count}")
                print(f"Error rate: {error_rate:.4f}")

            except Exception as e:
                print(f"\nProcessing fileerror {folder_name}/{file}: {str(e)}")
                if df is not None:
                    print(f"File column names: {df.columns.tolist()}")
                    if len(df) > 0:
                        print("Example of the first row:")
                        print(df.iloc[0])

# Driver: print the code-count check for every CSV under the GPT results folder.
# NOTE(review): absolute, machine-specific path; adjust before running elsewhere.
path = "/Users/houzhen/research/LLMCoder/code/challange/auto/GPT"
check_counts(path)

# fine tune GPT

In [None]:
import pandas as pd
import os
import json
import re

def load_icd_codes(code_list):
    """Return the given ICD codes as a set.

    Parameters:
    code_list: Iterable of hashable codes.

    Returns:
    set: The distinct codes from ``code_list``.
    """
    unique_codes = set()
    unique_codes.update(code_list)
    return unique_codes

def load_jsonl(file_path):
    """Load a JSON Lines file into a DataFrame, one row per JSON record.

    Blank or whitespace-only lines (for example a trailing empty line) are
    skipped instead of being handed to ``json.loads``, which raises
    ``json.JSONDecodeError`` on an empty string.

    Parameters:
    file_path (str): Path to a UTF-8 encoded JSONL file.

    Returns:
    pandas.DataFrame: One row per parsed line; empty if the file has no records.
    """
    records = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if line:
                records.append(json.loads(line))
    return pd.DataFrame(records)

def convert_to_array(text):
    """Convert a string-formatted array (or a real list) into a list of strings.

    Handled inputs:
    - a list/tuple: each item is converted with ``str()`` and stripped;
    - a string starting with ``[`` (after trimming): brackets and quotes are
      removed and the remainder is split on commas;
    - any other non-blank string: returned as a single-element list;
    - anything else (None, NaN, numbers, blank strings): ``[]``.

    Parameters:
    text: The cell value to convert.

    Returns:
    list: Non-empty, stripped string items.
    """
    # Real sequences (e.g. parsed from JSONL) previously fell through to [] or
    # made pd.isna() raise on its ambiguous array result.
    if isinstance(text, (list, tuple)):
        items = [str(item).strip() for item in text]
        return [item for item in items if item]
    if not isinstance(text, str):
        return []
    # trim first so leading whitespace does not hide the opening bracket
    text = text.strip()
    if not text:
        return []
    if text.startswith('['):
        text = text.strip('[]').replace("'", "").replace('"', '')
        return [item.strip() for item in text.split(',') if item.strip()]
    return [text]

def calculate_existence_errors(df, icd_codes, is_multi=False):
    """Count rows whose prediction contains a code missing from the ICD list.

    Predictions are read from the 'predict' column when present, otherwise from
    'generated_result', and parsed with convert_to_array(). A row is an error
    when any parsed code is not in ``icd_codes``.

    Parameters:
    df (pandas.DataFrame): One row per sample.
    icd_codes: Collection of valid codes; converted to a set once for lookups.
    is_multi (bool): Kept for interface compatibility. Both modes apply the
        same per-row check, so the flag does not change the result.

    Returns:
    dict: 'total_samples', 'error_samples' and 'error_rate'
    (error_samples / total_samples, 0 for an empty frame).
    """
    # These counters were never initialised, so the first use failed.
    total_rows = len(df)
    error_rows = 0

    predict_col = 'generated_result'  # CSV file
    if 'predict' in df.columns:       # JSONL file
        predict_col = 'predict'

    # membership tests against a list are linear; build the set once
    valid_codes = icd_codes if isinstance(icd_codes, (set, frozenset)) else set(icd_codes)

    for raw_prediction in df[predict_col]:
        predicted_codes = convert_to_array(raw_prediction)
        if any(code not in valid_codes for code in predicted_codes):
            error_rows += 1

    error_rate = error_rows / total_rows if total_rows > 0 else 0

    return {
        'total_samples': total_rows,
        'error_samples': error_rows,
        'error_rate': error_rate
    }

def analyze_files(root_folder, icd_codes):
    """Run the existence-error analysis on every result file in each sub-folder.

    Each immediate sub-directory of ``root_folder`` is scanned. All ``.csv``
    files are processed first, then all ``.jsonl`` files whose name does not
    contain 'trainer_log'. CSV folders are multi-label when their name contains
    'multi'; JSONL folders also when it contains 'mimic' or 'mlti'
    (case-insensitive). Failures on a single file are printed and skipped.

    Parameters:
    root_folder (str): Directory whose sub-directories hold result files.
    icd_codes: Collection of valid ICD codes passed to calculate_existence_errors().

    Returns:
    dict: ``"<folder>/<file>"`` -> statistics dict from calculate_existence_errors().
    """
    results = {}

    def _process(folder_name, folder_path, filename, loader, is_multi):
        # Load one file, score it, store and print its statistics.
        file_path = os.path.join(folder_path, filename)
        file_key = f"{folder_name}/{filename}"
        print(f"\nProcessing file: {file_key}")

        try:
            df = loader(file_path)
            print(f"Number of rows: {len(df)}")

            error_stats = calculate_existence_errors(df, icd_codes, is_multi)
            results[file_key] = error_stats

            # The statistics dict in this cell holds 'total_samples' and
            # 'error_samples'. The previous 'total_predictions' and
            # 'invalid_predictions' lookups raised KeyError for every file.
            print(f"Total samples: {error_stats['total_samples']}")
            print(f"Error samples: {error_stats['error_samples']}")
            print(f"Error rate: {error_stats['error_rate']:.4f}")

        except Exception as e:
            print(f"Processing fileerror: {str(e)}")

    # sorted() makes the processing order, and so the result row order, deterministic
    for folder_name in sorted(os.listdir(root_folder)):
        folder_path = os.path.join(root_folder, folder_name)
        if not os.path.isdir(folder_path):
            continue

        print(f"\n{'='*50}")
        print(f"Analyzing folder: {folder_name}")
        print(f"{'='*50}")

        lowered = folder_name.lower()
        entries = sorted(os.listdir(folder_path))

        # CSV file
        csv_multi = 'multi' in lowered
        for filename in entries:
            if filename.endswith('.csv'):
                _process(folder_name, folder_path, filename, pd.read_csv, csv_multi)

        # JSONL file
        jsonl_multi = any(x in lowered for x in ['multi', 'mimic', 'mlti'])
        for filename in entries:
            if filename.endswith('.jsonl') and 'trainer_log' not in filename:
                _process(folder_name, folder_path, filename, load_jsonl, jsonl_multi)

    return results



# Driver: run the existence-error analysis for each configured result folder.
# NOTE(review): absolute, machine-specific path; adjust before running elsewhere.
folders = [
    "/Users/houzhen/research/LLMCoder/code/challange/auto/pure_auto",

]

for folder in folders:
    print(f"\nProcessing file夹: {folder}")
    # `icd_codes` is not defined in this cell; it must already exist in the
    # kernel (the first cell of the notebook assigns it).
    results = analyze_files(folder, icd_codes)
    
    # One row per analysed file, saved as a CSV named after the folder.
    results_df = pd.DataFrame.from_dict(results, orient='index')
    output_file = f'existence_errors_{os.path.basename(folder)}.csv'
    results_df.round(4).to_csv(output_file)
    
    print("\n=== Summary of results ===")
    print(results_df.round(4).to_string())

In [None]:
import pandas as pd
import os
import json
import re

def load_jsonl(file_path):
    """Load a JSON Lines file into a DataFrame, one row per JSON record.

    Blank or whitespace-only lines (for example a trailing empty line) are
    skipped instead of being handed to ``json.loads``, which raises
    ``json.JSONDecodeError`` on an empty string.

    Parameters:
    file_path (str): Path to a UTF-8 encoded JSONL file.

    Returns:
    pandas.DataFrame: One row per parsed line; empty if the file has no records.
    """
    records = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if line:
                records.append(json.loads(line))
    return pd.DataFrame(records)

def convert_to_array(text):
    """Convert a string-formatted array (or a real list) into a list of strings.

    Handled inputs:
    - a list/tuple: each item is converted with ``str()`` and stripped;
    - a string starting with ``[`` (after trimming): brackets and quotes are
      removed and the remainder is split on commas;
    - any other non-blank string: returned as a single-element list;
    - anything else (None, NaN, numbers, blank strings): ``[]``.

    Parameters:
    text: The cell value to convert.

    Returns:
    list: Non-empty, stripped string items.
    """
    # Real sequences (e.g. parsed from JSONL) previously fell through to [] or
    # made pd.isna() raise on its ambiguous array result.
    if isinstance(text, (list, tuple)):
        items = [str(item).strip() for item in text]
        return [item for item in items if item]
    if not isinstance(text, str):
        return []
    # trim first so leading whitespace does not hide the opening bracket
    text = text.strip()
    if not text:
        return []
    if text.startswith('['):
        text = text.strip('[]').replace("'", "").replace('"', '')
        return [item.strip() for item in text.split(',') if item.strip()]
    return [text]

def is_level_error(true_code, pred_code):
    """Determine if there is a hierarchical error between two codes.

    A hierarchical error means the two codes differ but one is a prefix of the
    other, i.e. the prediction is a coarser or finer version of the label.

    Parameters:
    true_code (str): The reference code.
    pred_code (str): The predicted code.

    Returns:
    bool: True when exactly one code is a proper prefix of the other; False for
    empty/None inputs, identical codes, or unrelated codes.
    """
    if not true_code or not pred_code:
        return False
    # An exact match is a correct prediction, not a hierarchy mismatch. The
    # original had an unreachable second `return False` where this guard belongs.
    if true_code == pred_code:
        return False
    # parent/child codes share a prefix; a match in the middle of a code is not one
    return true_code.startswith(pred_code) or pred_code.startswith(true_code)

def calculate_level_errors(df, is_multi=False):
    """Calculate hierarchical errors.

    Labels come from the 'label' column (or 'ground_truth') and predictions
    from 'predict' (or 'generated_result'); both are parsed with
    convert_to_array().

    Parameters:
    df (pandas.DataFrame): One row per sample.
    is_multi (bool): When True every distinct predicted code is compared with
        every label of the row. When False only the first prediction is
        compared with the first label.

    Returns:
    dict: 'total_predictions', 'level_errors' and 'error_rate'
    (level_errors / total_predictions, 0 when nothing was compared).
    """
    total_predictions = 0
    level_errors = 0

    # column choice depends only on the frame, so resolve it once for both modes
    true_col = 'label' if 'label' in df.columns else 'ground_truth'
    pred_col = 'predict' if 'predict' in df.columns else 'generated_result'

    if is_multi:
        for _, row in df.iterrows():
            true_codes = set(convert_to_array(row[true_col]))
            pred_codes = set(convert_to_array(row[pred_col]))

            for pred_code in pred_codes:
                total_predictions += 1
                if any(is_level_error(true_code, pred_code) for true_code in true_codes):
                    level_errors += 1
    else:
        for _, row in df.iterrows():
            true_codes = convert_to_array(row[true_col])
            pred_codes = convert_to_array(row[pred_col])

            # The guard was missing, leaving the body mis-indented and the
            # [0] lookups unprotected against empty lists.
            if true_codes and pred_codes:
                true_code = true_codes[0]
                pred_code = pred_codes[0]

                total_predictions += 1
                if is_level_error(true_code, pred_code):
                    level_errors += 1

    error_rate = level_errors / total_predictions if total_predictions > 0 else 0

    return {
        'total_predictions': total_predictions,
        'level_errors': level_errors,
        'error_rate': error_rate
    }

def analyze_files(root_folder):
    """Run the hierarchical-error analysis on every result file in each sub-folder.

    For each immediate sub-directory of ``root_folder`` the ``.csv`` files are
    handled first, then the ``.jsonl`` files whose name does not contain
    'trainer_log'. Statistics from calculate_level_errors() are printed and
    collected; an exception on one file is printed and the run continues.

    Parameters:
    root_folder (str): Directory whose sub-directories hold result files.

    Returns:
    dict: ``"<folder>/<file>"`` -> statistics dict from calculate_level_errors().
    """
    results = {}

    def _handle(folder_name, folder_path, filename, kind):
        # Score one file; `kind` selects the reader and the multi-label rule.
        file_path = os.path.join(folder_path, filename)
        file_key = f"{folder_name}/{filename}"
        print(f"\nProcessing file: {file_key}")

        try:
            if kind == 'csv':
                df = pd.read_csv(file_path)
            else:
                df = load_jsonl(file_path)
            print(f"Number of rows: {len(df)}")

            lowered = folder_name.lower()
            if kind == 'csv':
                is_multi = 'multi' in lowered
            else:
                is_multi = any(x in lowered for x in ['multi', 'mimic', 'mlti'])

            error_stats = calculate_level_errors(df, is_multi)
            results[file_key] = error_stats

            print(f"Total predictions: {error_stats['total_predictions']}")
            print(f"Hierarchical errors: {error_stats['level_errors']}")
            print(f"Error rate: {error_stats['error_rate']:.4f}")

        except Exception as e:
            print(f"Processing fileerror: {str(e)}")

    for folder_name in os.listdir(root_folder):
        folder_path = os.path.join(root_folder, folder_name)
        if not os.path.isdir(folder_path):
            continue

        print(f"\n{'='*50}")
        print(f"Analyzing folder: {folder_name}")
        print(f"{'='*50}")

        # CSV file
        for filename in [f for f in os.listdir(folder_path) if f.endswith('.csv')]:
            _handle(folder_name, folder_path, filename, 'csv')

        # JSONL file (listed after the CSV pass, as before)
        for filename in [f for f in os.listdir(folder_path)
                         if f.endswith('.jsonl') and 'trainer_log' not in f]:
            _handle(folder_name, folder_path, filename, 'jsonl')

    return results

# Driver: run the hierarchical-error analysis for each configured result folder.
# NOTE(review): absolute, machine-specific path; adjust before running elsewhere.
folders = [
    "/Users/houzhen/research/LLMCoder/code/challange/auto/pure_auto"
]

for folder in folders:
    print(f"\nProcessing file夹: {folder}")
    results = analyze_files(folder)
    
    # One row per analysed file, saved as a CSV named after the folder.
    results_df = pd.DataFrame.from_dict(results, orient='index')
    output_file = f'level_errors_{os.path.basename(folder)}.csv'
    results_df.round(4).to_csv(output_file)
    
    print("\n=== Summary of results ===")
    print(results_df.round(4).to_string())

In [None]:
import pandas as pd
import os
import json
import re

def load_jsonl(file_path):
    """Load a JSON Lines file into a DataFrame.

    Each non-blank line is parsed as one JSON document and becomes one row.

    Parameters:
    file_path (str): Path to the UTF-8 encoded JSONL file.

    Returns:
    pandas.DataFrame: One row per parsed line (empty if the file has no records).

    Raises:
    json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    records = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            # Blank lines (typically a trailing newline at the end of the
            # file) carry no record; json.loads('') would raise on them.
            if line:
                records.append(json.loads(line))
    return pd.DataFrame(records)

def convert_to_array(text):
    """Convert a cell value into a list of codes.

    Accepts real sequences (e.g. JSON arrays loaded from JSONL), string-formatted
    arrays such as "['A01', 'B02']", and plain single-code strings.

    Parameters:
    text: A list/tuple, a string, or a missing value (None/NaN).

    Returns:
    list: The codes; empty for missing values and unsupported types.
    """
    # Sequences must be handled explicitly: pd.isna(text) returns an
    # element-wise array for list input, and using that array as an `if`
    # condition raises ValueError for multi-element lists.
    if isinstance(text, (list, tuple)):
        return list(text)
    # Missing values (None, NaN, pd.NA) are never str instances, so this
    # single check also covers them.
    if not isinstance(text, str):
        return []
    if text.startswith('['):
        # Lightweight parse: drop brackets and quotes, then split on commas.
        inner = text.strip('[]').replace("'", "").replace('"', '')
        return [item.strip() for item in inner.split(',') if item.strip()]
    return [text.strip()]

def is_level_error(true_code, pred_code):
    """Return True when the two codes are hierarchically related.

    Two codes count as related when one string is contained in the other
    (e.g. 'E11' vs 'E119'). Identical codes also satisfy this test, so callers
    that want to exclude exact matches must check for equality themselves.

    Parameters:
    true_code (str): Ground-truth code.
    pred_code (str): Predicted code.

    Returns:
    bool: False if either code is empty/None, otherwise the containment result.
    """
    if not true_code or not pred_code:
        return False
    # Substring containment in either direction (not restricted to prefixes).
    return true_code in pred_code or pred_code in true_code

def is_character_error(true_code, pred_code, icd_codes):
    """Return True when the prediction is a valid but wrong code of the same shape.

    A character error is a predicted code that exists in icd_codes, is not
    hierarchically related to the true code (see is_level_error), and has the
    same leading letter and the same length as the true code.

    Parameters:
    true_code (str): Ground-truth code.
    pred_code (str): Predicted code.
    icd_codes (collection): Known valid ICD codes.

    Returns:
    bool: True for a character error, False otherwise (including when either
    code is empty or None).
    """
    # Missing codes cannot be compared; without this guard re.match would
    # raise TypeError on None.
    if not true_code or not pred_code:
        return False

    # Only predictions that are real ICD codes qualify.
    if pred_code not in icd_codes:
        return False

    # Parent/child relationships (and identical codes) are level errors.
    if is_level_error(true_code, pred_code):
        return False

    # Both codes must look like "<letter><digit>...".
    code_shape = r'[A-Z]\d'
    if not (re.match(code_shape, true_code) and re.match(code_shape, pred_code)):
        return False

    return true_code[0] == pred_code[0] and len(true_code) == len(pred_code)

def calculate_character_errors(df, icd_codes, is_multi=False):
    """Count character errors among the predictions in df.

    The truth column is 'label' if present, otherwise 'ground_truth'; the
    prediction column is 'predict' if present, otherwise 'generated_result'.

    Parameters:
    df (pandas.DataFrame): One row per sample.
    icd_codes (collection): Known valid ICD codes.
    is_multi (bool): If True, every distinct predicted code of a row is scored
        against all true codes of that row; otherwise only the first true and
        first predicted code are compared.

    Returns:
    dict: 'total_predictions', 'character_errors' and 'error_rate'
    (0 when there are no predictions).
    """
    true_col = 'label' if 'label' in df.columns else 'ground_truth'
    pred_col = 'predict' if 'predict' in df.columns else 'generated_result'

    total_predictions = 0
    char_errors = 0

    for _, row in df.iterrows():
        true_codes = convert_to_array(row[true_col])
        pred_codes = convert_to_array(row[pred_col])

        if is_multi:
            true_set = set(true_codes)
            for pred_code in set(pred_codes):
                # Skip empty predictions; they are not counted.
                if not pred_code:
                    continue
                total_predictions += 1
                # A prediction is counted once, however many true codes it
                # clashes with.
                if any(true_code and is_character_error(true_code, pred_code, icd_codes)
                       for true_code in true_set):
                    char_errors += 1
        elif true_codes and pred_codes:
            # Single-label mode: only the first code on each side matters.
            true_code = true_codes[0]
            pred_code = pred_codes[0]
            if true_code and pred_code:
                total_predictions += 1
                if is_character_error(true_code, pred_code, icd_codes):
                    char_errors += 1

    error_rate = char_errors / total_predictions if total_predictions > 0 else 0

    return {
        'total_predictions': total_predictions,
        'character_errors': char_errors,
        'error_rate': error_rate
    }

def analyze_files(root_folder, icd_codes):
    """Run the character-error analysis on every result file one level below root_folder.

    For each sub-folder, all '.csv' files are processed first, then all
    '.jsonl' files whose name does not contain 'trainer_log'. A file that
    fails to load or score is reported on stdout and skipped.

    Parameters:
    root_folder (str): Folder whose sub-folders contain the result files.
    icd_codes (collection): Known valid ICD codes, passed to the scorer.

    Returns:
    dict: Maps '<sub-folder>/<file name>' to the stats dict returned by
    calculate_character_errors.
    """
    results = {}

    def _evaluate(file_key, read_frame, is_multi):
        # Load one file, score it, record and print the stats.
        print(f"\nProcessing file: {file_key}")
        try:
            df = read_frame()
            print(f"Number of rows: {len(df)}")

            error_stats = calculate_character_errors(df, icd_codes, is_multi)
            results[file_key] = error_stats

            print(f"Total predictions: {error_stats['total_predictions']}")
            print(f"Character errors: {error_stats['character_errors']}")
            print(f"Error rate: {error_stats['error_rate']:.4f}")
        except Exception as e:
            print(f"Processing fileerror: {str(e)}")

    for folder_name in os.listdir(root_folder):
        folder_path = os.path.join(root_folder, folder_name)
        if not os.path.isdir(folder_path):
            continue

        print(f"\n{'='*50}")
        print(f"Analyzing folder: {folder_name}")
        print(f"{'='*50}")

        lowered = folder_name.lower()

        # CSV results: multi-label only when the folder name contains 'multi'.
        for filename in os.listdir(folder_path):
            if filename.endswith('.csv'):
                file_path = os.path.join(folder_path, filename)
                _evaluate(f"{folder_name}/{filename}",
                          lambda: pd.read_csv(file_path),
                          'multi' in lowered)

        # JSONL results: a wider set of folder-name markers enables multi-label mode.
        for filename in os.listdir(folder_path):
            if filename.endswith('.jsonl') and 'trainer_log' not in filename:
                file_path = os.path.join(folder_path, filename)
                _evaluate(f"{folder_name}/{filename}",
                          lambda: load_jsonl(file_path),
                          any(tag in lowered for tag in ['multi', 'mimic', 'mlti']))

    return results



# Root folders to analyse; analyze_files is called once per entry.
folders = [
    "/Users/houzhen/research/LLMCoder/code/challange/auto/pure_auto"
]

# Run the character-error analysis for each root folder and write one summary
# CSV per root. `icd_codes` is not defined in this cell; it must already exist
# in the notebook namespace.
for folder in folders:
    print(f"\nProcessing file夹: {folder}")
    results = analyze_files(folder, icd_codes)
    
    # Build a table with one row per analysed file and save it under a
    # relative name (i.e. in the current working directory), rounded to
    # 4 decimals.
    results_df = pd.DataFrame.from_dict(results, orient='index')
    output_file = f'character_errors_{os.path.basename(folder)}.csv'
    results_df.round(4).to_csv(output_file)
    
    print("\n=== Summary of results ===")
    print(results_df.round(4).to_string())

In [None]:
import pandas as pd
import os

def check_counts(path):
    """Print, per CSV file, how many rows predict the wrong number of codes.

    Every '.csv' file in each sub-folder of path is read and one line
    '<folder>/<file>: <wrong>/<total> = <rate>' is printed for it.

    - File name contains 'multi': a row is wrong when the number of commas in
      'ground_truth' differs from the number in 'generated_result'.
    - Otherwise: a row is wrong when 'generated_result' contains a comma or is
      missing.

    Parameters:
    path (str): Folder whose sub-folders contain the CSV files.

    Returns:
    None
    """
    for folder_name in os.listdir(path):
        folder_path = os.path.join(path, folder_name)
        if not os.path.isdir(folder_path):
            continue
        for file in os.listdir(folder_path):
            if not file.endswith('.csv'):
                continue
            df = pd.read_csv(os.path.join(folder_path, file))
            total = len(df)
            # NOTE(review): multi-label mode is keyed on the file name here,
            # whereas analyze_files keys it on the folder name - confirm
            # which is intended.
            if 'multi' in file.lower():
                wrong_count = sum(df['ground_truth'].str.count(',') != df['generated_result'].str.count(','))
            else:
                wrong_count = sum((df['generated_result'].str.count(',') > 0) | df['generated_result'].isna())

            # A header-only CSV has zero rows; report a rate of 0 instead of
            # raising ZeroDivisionError.
            rate = wrong_count / total if total else 0.0
            print(f"{folder_name}/{file}: {wrong_count}/{total} = {rate:.4f}")

# Root folder whose sub-folders hold the CSV result files to check.
path = "/Users/houzhen/research/LLMCoder/code/challange/auto/pure_auto"
check_counts(path)

# enhance GPT

In [None]:
import pandas as pd
import os
import json
import re

def load_icd_codes(code_list):
    """Collect ICD codes into a set.

    Parameters:
    code_list (iterable): ICD codes, possibly with duplicates.

    Returns:
    set: The distinct codes.
    """
    unique_codes = set()
    unique_codes.update(code_list)
    return unique_codes

def load_jsonl(file_path):
    """Load a JSON Lines file into a DataFrame.

    Each non-blank line is parsed as one JSON document and becomes one row.

    Parameters:
    file_path (str): Path to the UTF-8 encoded JSONL file.

    Returns:
    pandas.DataFrame: One row per parsed line (empty if the file has no records).

    Raises:
    json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    records = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            # Blank lines (typically a trailing newline at the end of the
            # file) carry no record; json.loads('') would raise on them.
            if line:
                records.append(json.loads(line))
    return pd.DataFrame(records)

def convert_to_array(text):
    """Convert a cell value into a list of codes.

    Accepts real sequences (e.g. JSON arrays loaded from JSONL), string-formatted
    arrays such as "['A01', 'B02']", and plain single-code strings.

    Parameters:
    text: A list/tuple, a string, or a missing value (None/NaN).

    Returns:
    list: The codes; empty for missing values and unsupported types.
    """
    # Sequences must be handled explicitly: pd.isna(text) returns an
    # element-wise array for list input, and using that array as an `if`
    # condition raises ValueError for multi-element lists.
    if isinstance(text, (list, tuple)):
        return list(text)
    # Missing values (None, NaN, pd.NA) are never str instances, so this
    # single check also covers them.
    if not isinstance(text, str):
        return []
    if text.startswith('['):
        # Lightweight parse: drop brackets and quotes, then split on commas.
        inner = text.strip('[]').replace("'", "").replace('"', '')
        return [item.strip() for item in inner.split(',') if item.strip()]
    return [text.strip()]

def calculate_existence_errors(df, icd_codes, is_multi=False):
    """Count rows whose prediction contains a code that is not in icd_codes.

    The prediction column is 'predict' when present (JSONL results),
    otherwise 'generated_result' (CSV results).

    Parameters:
    df (pandas.DataFrame): One row per sample.
    icd_codes (collection): Known valid ICD codes.
    is_multi (bool): Accepted for interface compatibility; single- and
        multi-label rows are scored the same way (a row is an error when any
        of its predicted codes is unknown).

    Returns:
    dict: 'total_samples', 'error_samples' and 'error_rate'
    (0 when df is empty).
    """
    predict_col = 'predict' if 'predict' in df.columns else 'generated_result'

    # These counters were previously never initialised, so the function
    # failed on its first use of them.
    total_rows = len(df)
    error_rows = 0

    for raw_prediction in df[predict_col]:
        predicted_codes = convert_to_array(raw_prediction)
        if any(code not in icd_codes for code in predicted_codes):
            error_rows += 1

    error_rate = error_rows / total_rows if total_rows > 0 else 0

    return {
        'total_samples': total_rows,
        'error_samples': error_rows,
        'error_rate': error_rate
    }

def analyze_files(root_folder, icd_codes):
    """Run the existence-error analysis on every result file one level below root_folder.

    For each sub-folder, all '.csv' files are processed first, then all
    '.jsonl' files whose name does not contain 'trainer_log'. A file that
    fails to load or score is reported on stdout and skipped.

    Parameters:
    root_folder (str): Folder whose sub-folders contain the result files.
    icd_codes (collection): Known valid ICD codes, passed to the scorer.

    Returns:
    dict: Maps '<sub-folder>/<file name>' to the stats dict returned by
    calculate_existence_errors.
    """
    results = {}

    def _evaluate(file_key, read_frame, is_multi):
        # Load one file, score it, record and print the stats.
        print(f"\nProcessing file: {file_key}")
        try:
            df = read_frame()
            print(f"Number of rows: {len(df)}")

            error_stats = calculate_existence_errors(df, icd_codes, is_multi)
            results[file_key] = error_stats

            # Use the keys calculate_existence_errors actually returns; the
            # previous 'total_predictions' / 'invalid_predictions' lookups
            # raised KeyError and made every file look like a failure.
            print(f"Total samples: {error_stats['total_samples']}")
            print(f"Error samples: {error_stats['error_samples']}")
            print(f"Error rate: {error_stats['error_rate']:.4f}")
        except Exception as e:
            print(f"Processing fileerror: {str(e)}")

    for folder_name in os.listdir(root_folder):
        folder_path = os.path.join(root_folder, folder_name)
        if not os.path.isdir(folder_path):
            continue

        print(f"\n{'='*50}")
        print(f"Analyzing folder: {folder_name}")
        print(f"{'='*50}")

        lowered = folder_name.lower()

        # CSV results: multi-label only when the folder name contains 'multi'.
        for filename in os.listdir(folder_path):
            if filename.endswith('.csv'):
                file_path = os.path.join(folder_path, filename)
                _evaluate(f"{folder_name}/{filename}",
                          lambda: pd.read_csv(file_path),
                          'multi' in lowered)

        # JSONL results: a wider set of folder-name markers enables multi-label mode.
        for filename in os.listdir(folder_path):
            if filename.endswith('.jsonl') and 'trainer_log' not in filename:
                file_path = os.path.join(folder_path, filename)
                _evaluate(f"{folder_name}/{filename}",
                          lambda: load_jsonl(file_path),
                          any(tag in lowered for tag in ['multi', 'mimic', 'mlti']))

    return results



# Root folders to analyse; analyze_files is called once per entry.
folders = [
    "/Users/houzhen/research/LLMCoder/code/challange/auto/pure_auto_enhance_only_you",

]

# Run the existence-error analysis for each root folder and write one summary
# CSV per root. `icd_codes` is not defined in this cell; it must already exist
# in the notebook namespace.
for folder in folders:
    print(f"\nProcessing file夹: {folder}")
    results = analyze_files(folder, icd_codes)
    
    # Build a table with one row per analysed file and save it under a
    # relative name (i.e. in the current working directory), rounded to
    # 4 decimals.
    results_df = pd.DataFrame.from_dict(results, orient='index')
    output_file = f'existence_errors_{os.path.basename(folder)}.csv'
    results_df.round(4).to_csv(output_file)
    
    print("\n=== Summary of results ===")
    print(results_df.round(4).to_string())

In [None]:
import pandas as pd
import os
import json
import re

def load_jsonl(file_path):
    """Load a JSON Lines file into a DataFrame.

    Each non-blank line is parsed as one JSON document and becomes one row.

    Parameters:
    file_path (str): Path to the UTF-8 encoded JSONL file.

    Returns:
    pandas.DataFrame: One row per parsed line (empty if the file has no records).

    Raises:
    json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    records = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            # Blank lines (typically a trailing newline at the end of the
            # file) carry no record; json.loads('') would raise on them.
            if line:
                records.append(json.loads(line))
    return pd.DataFrame(records)

def convert_to_array(text):
    """Convert a cell value into a list of codes.

    Accepts real sequences (e.g. JSON arrays loaded from JSONL), string-formatted
    arrays such as "['A01', 'B02']", and plain single-code strings.

    Parameters:
    text: A list/tuple, a string, or a missing value (None/NaN).

    Returns:
    list: The codes; empty for missing values and unsupported types.
    """
    # Sequences must be handled explicitly: pd.isna(text) returns an
    # element-wise array for list input, and using that array as an `if`
    # condition raises ValueError for multi-element lists.
    if isinstance(text, (list, tuple)):
        return list(text)
    # Missing values (None, NaN, pd.NA) are never str instances, so this
    # single check also covers them.
    if not isinstance(text, str):
        return []
    if text.startswith('['):
        # Lightweight parse: drop brackets and quotes, then split on commas.
        inner = text.strip('[]').replace("'", "").replace('"', '')
        return [item.strip() for item in inner.split(',') if item.strip()]
    return [text.strip()]

def is_level_error(true_code, pred_code):
    """Return True when the two codes are hierarchically related.

    Two codes count as related when one string is contained in the other
    (e.g. 'E11' vs 'E119'). Identical codes also satisfy this test, so callers
    that want to exclude exact matches must check for equality themselves.

    Parameters:
    true_code (str): Ground-truth code.
    pred_code (str): Predicted code.

    Returns:
    bool: False if either code is empty/None, otherwise the containment result.
    """
    if not true_code or not pred_code:
        return False
    # Substring containment in either direction (not restricted to prefixes).
    return true_code in pred_code or pred_code in true_code

def calculate_level_errors(df, is_multi=False):
    """Count hierarchical (level) errors among the predictions in df.

    The truth column is 'label' if present, otherwise 'ground_truth'; the
    prediction column is 'predict' if present, otherwise 'generated_result'.

    Parameters:
    df (pandas.DataFrame): One row per sample.
    is_multi (bool): If True, every distinct predicted code of a row is checked
        against all true codes of that row; otherwise only the first true and
        first predicted code are compared.

    Returns:
    dict: 'total_predictions', 'level_errors' and 'error_rate'
    (0 when there are no predictions).
    """
    true_col = 'label' if 'label' in df.columns else 'ground_truth'
    pred_col = 'predict' if 'predict' in df.columns else 'generated_result'

    total_predictions = 0
    level_errors = 0

    for _, row in df.iterrows():
        true_codes = convert_to_array(row[true_col])
        pred_codes = convert_to_array(row[pred_col])

        if is_multi:
            true_set = set(true_codes)
            for pred_code in set(pred_codes):
                total_predictions += 1
                # NOTE: is_level_error is also True for identical codes, so
                # exactly matching predictions are counted here as well.
                if any(is_level_error(true_code, pred_code) for true_code in true_set):
                    level_errors += 1
        elif true_codes and pred_codes:
            # Single-label mode: rows lacking either side are skipped, and
            # only the first code on each side is compared.
            total_predictions += 1
            if is_level_error(true_codes[0], pred_codes[0]):
                level_errors += 1

    error_rate = level_errors / total_predictions if total_predictions > 0 else 0

    return {
        'total_predictions': total_predictions,
        'level_errors': level_errors,
        'error_rate': error_rate
    }

def analyze_files(root_folder):
    """Run the level-error analysis on every result file one level below root_folder.

    For each sub-folder, all '.csv' files are processed first, then all
    '.jsonl' files whose name does not contain 'trainer_log'. A file that
    fails to load or score is reported on stdout and skipped.

    Parameters:
    root_folder (str): Folder whose sub-folders contain the result files.

    Returns:
    dict: Maps '<sub-folder>/<file name>' to the stats dict returned by
    calculate_level_errors.
    """
    results = {}

    def _evaluate(file_key, read_frame, is_multi):
        # Load one file, score it, record and print the stats.
        print(f"\nProcessing file: {file_key}")
        try:
            df = read_frame()
            print(f"Number of rows: {len(df)}")

            error_stats = calculate_level_errors(df, is_multi)
            results[file_key] = error_stats

            print(f"Total predictions: {error_stats['total_predictions']}")
            print(f"Hierarchical errors: {error_stats['level_errors']}")
            print(f"Error rate: {error_stats['error_rate']:.4f}")
        except Exception as e:
            print(f"Processing fileerror: {str(e)}")

    for folder_name in os.listdir(root_folder):
        folder_path = os.path.join(root_folder, folder_name)
        if not os.path.isdir(folder_path):
            continue

        print(f"\n{'='*50}")
        print(f"Analyzing folder: {folder_name}")
        print(f"{'='*50}")

        lowered = folder_name.lower()

        # CSV results: multi-label only when the folder name contains 'multi'.
        for filename in os.listdir(folder_path):
            if filename.endswith('.csv'):
                file_path = os.path.join(folder_path, filename)
                _evaluate(f"{folder_name}/{filename}",
                          lambda: pd.read_csv(file_path),
                          'multi' in lowered)

        # JSONL results: a wider set of folder-name markers enables multi-label mode.
        for filename in os.listdir(folder_path):
            if filename.endswith('.jsonl') and 'trainer_log' not in filename:
                file_path = os.path.join(folder_path, filename)
                _evaluate(f"{folder_name}/{filename}",
                          lambda: load_jsonl(file_path),
                          any(tag in lowered for tag in ['multi', 'mimic', 'mlti']))

    return results

# Root folders to analyse; analyze_files is called once per entry.
folders = [
    "/Users/houzhen/research/LLMCoder/code/challange/auto/pure_auto_enhance_only_you"
]

# Run the level-error analysis for each root folder and write one summary CSV
# per root.
for folder in folders:
    print(f"\nProcessing file夹: {folder}")
    results = analyze_files(folder)
    
    # Build a table with one row per analysed file and save it under a
    # relative name (i.e. in the current working directory), rounded to
    # 4 decimals.
    results_df = pd.DataFrame.from_dict(results, orient='index')
    output_file = f'level_errors_{os.path.basename(folder)}.csv'
    results_df.round(4).to_csv(output_file)
    
    print("\n=== Summary of results ===")
    print(results_df.round(4).to_string())

In [None]:
import pandas as pd
import os
import json
import re

def load_jsonl(file_path):
    """Load a JSON Lines file into a DataFrame.

    Each non-blank line is parsed as one JSON document and becomes one row.

    Parameters:
    file_path (str): Path to the UTF-8 encoded JSONL file.

    Returns:
    pandas.DataFrame: One row per parsed line (empty if the file has no records).

    Raises:
    json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    records = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            # Blank lines (typically a trailing newline at the end of the
            # file) carry no record; json.loads('') would raise on them.
            if line:
                records.append(json.loads(line))
    return pd.DataFrame(records)

def convert_to_array(text):
    """Convert a cell value into a list of codes.

    Accepts real sequences (e.g. JSON arrays loaded from JSONL), JSON or
    Python-style array strings such as '["A01"]' / "['A01']", and plain
    single-code strings.

    Parameters:
    text: A list/tuple, a string, or a missing value (None/NaN).

    Returns:
    list: The codes; empty for missing values and unsupported types.
    """
    # Handle sequences before any missing-value test: pd.isna(list) returns an
    # element-wise array whose truth value is ambiguous, which previously made
    # the list branch unreachable for multi-element lists.
    if isinstance(text, (list, tuple)):
        return list(text)
    # Missing values (None, NaN, pd.NA) are never str instances, so this
    # single check also covers them.
    if not isinstance(text, str):
        return []
    if text.startswith('[') and text.endswith(']'):
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            # Not valid JSON (e.g. single-quoted items): fall back to a
            # lightweight parse that drops brackets and quotes.
            inner = text.strip('[]').replace("'", "").replace('"', '')
            return [item.strip() for item in inner.split(',') if item.strip()]
    return [text.strip()]

def is_level_error(true_code, pred_code):
    """Return True when the two codes are hierarchically related.

    Two codes count as related when one string is contained in the other
    (e.g. 'E11' vs 'E119'). Identical codes also satisfy this test, so callers
    that want to exclude exact matches must check for equality themselves.

    Parameters:
    true_code (str): Ground-truth code.
    pred_code (str): Predicted code.

    Returns:
    bool: False if either code is empty/None, otherwise the containment result.
    """
    if not true_code or not pred_code:
        return False
    # Substring containment in either direction (not restricted to prefixes).
    return true_code in pred_code or pred_code in true_code

def is_character_error(true_code, pred_code, icd_codes):
    """Return True when the prediction is a valid but wrong code of the same shape.

    A character error is a predicted code that exists in icd_codes, is not
    hierarchically related to the true code (see is_level_error), and has the
    same leading letter and the same length as the true code.

    Parameters:
    true_code: Ground-truth code (compared as a string).
    pred_code: Predicted code (compared as a string).
    icd_codes (collection): Known valid ICD codes.

    Returns:
    bool: True for a character error, False otherwise (including when either
    code is empty or None).
    """
    if not true_code or not pred_code:
        return False

    # Only predictions that are real ICD codes qualify.
    if pred_code not in icd_codes:
        return False

    # Normalise once so every later comparison sees strings; previously only
    # the regex used str(), so a non-string code (e.g. a number decoded from
    # JSON) raised TypeError in is_level_error / len().
    true_text, pred_text = str(true_code), str(pred_code)

    # Parent/child relationships (and identical codes) are level errors.
    if is_level_error(true_text, pred_text):
        return False

    # Both codes must look like "<letter><digit>...".
    code_shape = r'[A-Z]\d'
    if not (re.match(code_shape, true_text) and re.match(code_shape, pred_text)):
        return False

    return true_text[0] == pred_text[0] and len(true_text) == len(pred_text)

def calculate_character_errors(df, icd_codes, is_multi=False):
    """Count character errors among the predictions in df.

    The truth column is 'label' if present, otherwise 'ground_truth'; the
    prediction column is 'predict' if present, otherwise 'generated_result'.
    Rows with no true codes or no predicted codes are skipped.

    Multi-label rows are scored in three steps: exact matches are removed,
    then each remaining prediction is paired with at most one remaining true
    code it is hierarchically related to (level error), and finally every
    prediction still unpaired is counted once if it is a character error with
    respect to any remaining true code.

    Parameters:
    df (pandas.DataFrame): One row per sample.
    icd_codes (collection): Known valid ICD codes.
    is_multi (bool): Score all codes of a row (True) or only the first true
        and first predicted code (False).

    Returns:
    dict: 'total_predictions', 'character_errors' and 'error_rate'
    (0 when there are no predictions or the columns are missing).
    """
    total_predictions = 0
    char_errors = 0

    true_col = 'label' if 'label' in df.columns else 'ground_truth'
    pred_col = 'predict' if 'predict' in df.columns else 'generated_result'

    if true_col not in df.columns or pred_col not in df.columns:
        print(f"Warning: Required columns not found. Available columns: {df.columns.tolist()}")
        return {'total_predictions': 0, 'character_errors': 0, 'error_rate': 0}

    for _, row in df.iterrows():
        true_codes = convert_to_array(row[true_col])
        pred_codes = convert_to_array(row[pred_col])

        if not true_codes or not pred_codes:
            continue

        if is_multi:
            true_set = set(true_codes)
            pred_set = set(pred_codes)
            total_predictions += len(pred_set)

            exact = true_set & pred_set
            remaining_true = true_set - exact
            remaining_pred = pred_set - exact

            # Pair off level errors one-to-one so they are not also reported
            # as character errors.
            for pred_code in list(remaining_pred):
                partner = next(
                    (true_code for true_code in remaining_true
                     if is_level_error(true_code, pred_code)),
                    None,
                )
                if partner is not None:
                    remaining_true.discard(partner)
                    remaining_pred.discard(pred_code)

            # Count each remaining prediction at most once, so the error
            # count can never exceed the number of predictions.
            char_errors += sum(
                1 for pred_code in remaining_pred
                if any(is_character_error(true_code, pred_code, icd_codes)
                       for true_code in remaining_true)
            )
        else:
            # Single-label mode: only the first code on each side matters.
            true_code = true_codes[0]
            pred_code = pred_codes[0]
            if true_code and pred_code:
                total_predictions += 1
                if is_character_error(true_code, pred_code, icd_codes):
                    char_errors += 1

    error_rate = char_errors / total_predictions if total_predictions > 0 else 0

    return {
        'total_predictions': total_predictions,
        'character_errors': char_errors,
        'error_rate': error_rate
    }

def analyze_files(root_folder, icd_codes):
    """Run the character-error analysis on every result file one level below root_folder.

    For each sub-folder, all '.csv' files are processed first, then all
    '.jsonl' files whose name does not contain 'trainer_log'. A file that
    fails to load or score is reported (message on stdout, full traceback on
    stderr) and skipped.

    Parameters:
    root_folder (str): Folder whose sub-folders contain the result files.
    icd_codes (collection): Known valid ICD codes, passed to the scorer.

    Returns:
    dict: Maps '<sub-folder>/<file name>' to the stats dict returned by
    calculate_character_errors.
    """
    import traceback

    results = {}

    def _evaluate(file_key, read_frame, is_multi):
        # Load one file, score it, record and print the stats.
        print(f"\nProcessing file: {file_key}")
        try:
            df = read_frame()
            print(f"Number of rows: {len(df)}")

            error_stats = calculate_character_errors(df, icd_codes, is_multi)
            results[file_key] = error_stats

            print(f"Total predictions: {error_stats['total_predictions']}")
            print(f"Character errors: {error_stats['character_errors']}")
            print(f"Error rate: {error_stats['error_rate']:.4f}")
        except Exception as e:
            print(f"Processing fileerror: {str(e)}")
            # traceback was imported here but never used; emit the traceback
            # so the failing line can be located.
            traceback.print_exc()

    for folder_name in os.listdir(root_folder):
        folder_path = os.path.join(root_folder, folder_name)
        if not os.path.isdir(folder_path):
            continue

        print(f"\n{'='*50}")
        print(f"Analyzing folder: {folder_name}")
        print(f"{'='*50}")

        lowered = folder_name.lower()

        # CSV results: multi-label only when the folder name contains 'multi'.
        for filename in os.listdir(folder_path):
            if filename.endswith('.csv'):
                file_path = os.path.join(folder_path, filename)
                _evaluate(f"{folder_name}/{filename}",
                          lambda: pd.read_csv(file_path),
                          'multi' in lowered)

        # JSONL results: a wider set of folder-name markers enables multi-label mode.
        for filename in os.listdir(folder_path):
            if filename.endswith('.jsonl') and 'trainer_log' not in filename:
                file_path = os.path.join(folder_path, filename)
                _evaluate(f"{folder_name}/{filename}",
                          lambda: load_jsonl(file_path),
                          any(tag in lowered for tag in ['multi', 'mimic', 'mlti']))

    return results



# Root folders to analyse; analyze_files is called once per entry.
folders = [
    "/Users/houzhen/research/LLMCoder/code/challange/auto/pure_auto_enhance_only_you"
]

# Run the character-error analysis for each root folder and write one summary
# CSV per root. `icd_codes` is not defined in this cell; it must already exist
# in the notebook namespace.
for folder in folders:
    print(f"\nProcessing file夹: {folder}")
    results = analyze_files(folder, icd_codes)
    
    # Build a table with one row per analysed file and save it under a
    # relative name (i.e. in the current working directory), rounded to
    # 4 decimals.
    results_df = pd.DataFrame.from_dict(results, orient='index')
    output_file = f'character_errors_{os.path.basename(folder)}.csv'
    results_df.round(4).to_csv(output_file)
    
    print("\n=== Summary of results ===")
    print(results_df.round(4).to_string())

In [None]:
import pandas as pd
import os

def check_counts(path):
    """Print, per CSV file, how many rows predict the wrong number of codes.

    Every '.csv' file in each sub-folder of path is read and one line
    '<folder>/<file>: <wrong>/<total> = <rate>' is printed for it.

    - File name contains 'multi': a row is wrong when the number of commas in
      'ground_truth' differs from the number in 'generated_result'.
    - Otherwise: a row is wrong when 'generated_result' contains a comma or is
      missing.

    Parameters:
    path (str): Folder whose sub-folders contain the CSV files.

    Returns:
    None
    """
    for folder_name in os.listdir(path):
        folder_path = os.path.join(path, folder_name)
        if not os.path.isdir(folder_path):
            continue
        for file in os.listdir(folder_path):
            if not file.endswith('.csv'):
                continue
            df = pd.read_csv(os.path.join(folder_path, file))
            total = len(df)
            # NOTE(review): multi-label mode is keyed on the file name here,
            # whereas analyze_files keys it on the folder name - confirm
            # which is intended.
            if 'multi' in file.lower():
                wrong_count = sum(df['ground_truth'].str.count(',') != df['generated_result'].str.count(','))
            else:
                wrong_count = sum((df['generated_result'].str.count(',') > 0) | df['generated_result'].isna())

            # A header-only CSV has zero rows; report a rate of 0 instead of
            # raising ZeroDivisionError.
            rate = wrong_count / total if total else 0.0
            print(f"{folder_name}/{file}: {wrong_count}/{total} = {rate:.4f}")

# Root folder whose sub-folders hold the CSV result files to check.
path = "/Users/houzhen/research/LLMCoder/code/challange/auto/pure_auto_enhance_only_you"
check_counts(path)

# LLAMA 1b

In [None]:
import pandas as pd
import os
import json
import re

def load_jsonl(file_path):
    """Load a JSON Lines file into a DataFrame.

    Each non-blank line is parsed as one JSON document and becomes one row.

    Parameters:
    file_path (str): Path to the UTF-8 encoded JSONL file.

    Returns:
    pandas.DataFrame: One row per parsed line (empty if the file has no records).

    Raises:
    json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    records = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            # Blank lines (typically a trailing newline at the end of the
            # file) carry no record; json.loads('') would raise on them.
            if line:
                records.append(json.loads(line))
    return pd.DataFrame(records)

def extract_icd_codes(text):
    """Extract all ICD-10-CM style codes from free text.

    A code is a capital letter, a digit, a third character that is a digit or
    'A'/'B', and optionally up to four further alphanumeric characters with an
    optional dot before them (e.g. 'E11.9', 'S72.001A', 'O9A.111'). Dots are
    removed from the returned codes.

    Parameters:
    text: Text to scan; non-string values are converted with str(). Missing
        scalars (None/NaN) yield no codes.

    Returns:
    list: Codes in order of appearance, without dots (duplicates kept).
    """
    # Only test scalars for missingness: pd.isna on a list returns an array,
    # whose truth value is ambiguous.
    if pd.api.types.is_scalar(text) and pd.isna(text):
        return []
    # The tail accepts letters as well as digits; a digits-only tail truncated
    # codes with alphabetic characters ('S72.001A' -> 'S72001') and missed
    # categories whose third character is a letter ('O9A.111').
    matches = re.findall(r'[A-Z]\d[0-9AB](?:\.?[0-9A-Z]{1,4})?', str(text))
    return [match.replace('.', '') for match in matches]

def convert_to_array(text):
    """Convert a cell value into a list of codes.

    Accepts real sequences (e.g. JSON arrays loaded from JSONL), string-formatted
    arrays such as "['A01', 'B02']", and plain single-code strings.

    Parameters:
    text: A list/tuple, a string, or a missing value (None/NaN).

    Returns:
    list: The codes; empty for missing values and unsupported types.
    """
    # Sequences must be handled explicitly: pd.isna(text) returns an
    # element-wise array for list input, and using that array as an `if`
    # condition raises ValueError for multi-element lists.
    if isinstance(text, (list, tuple)):
        return list(text)
    # Missing values (None, NaN, pd.NA) are never str instances, so this
    # single check also covers them.
    if not isinstance(text, str):
        return []
    if text.startswith('['):
        # Lightweight parse: drop brackets and quotes, then split on commas.
        inner = text.strip('[]').replace("'", "").replace('"', '')
        return [item.strip() for item in inner.split(',') if item.strip()]
    return [text.strip()]

def calculate_existence_errors(df, icd_codes, is_multi=False, is_mimic=False):
    """Count exact matches and predicted codes that do not exist in icd_codes.

    Reads the 'label' and 'predict' columns of df.

    - Multi-label / MIMIC mode: label and prediction are compared as sets;
      every predicted code is counted and checked against icd_codes.
    - Single-label mode: only the first label and first predicted code are
      used; rows without a prediction are skipped, exact matches are counted
      and excluded, and the remaining predictions are checked against
      icd_codes.

    Parameters:
    df (pandas.DataFrame): One row per sample with 'label' and 'predict'.
    icd_codes (collection): Known valid ICD codes.
    is_multi (bool): Use multi-label scoring.
    is_mimic (bool): Also selects multi-label scoring.

    Returns:
    dict: 'total_samples', 'exact_matches', 'exact_match_rate',
    'remaining_predictions', 'invalid_predictions', 'error_rate_on_non_exact'.
    Rates are 0 when their denominator is 0.
    """
    total_predictions = 0
    invalid_predictions = 0
    exact_matches = 0

    if is_multi or is_mimic:
        for _, row in df.iterrows():
            ground_truth = set(convert_to_array(row['label']))
            predicted_codes = set(extract_icd_codes(row['predict']))

            if ground_truth == predicted_codes:
                exact_matches += 1

            # NOTE(review): exact-match rows still contribute their codes to
            # the totals here, unlike the single-label branch - confirm this
            # is intended given the 'remaining_predictions' key name.
            total_predictions += len(predicted_codes)
            invalid_predictions += sum(1 for code in predicted_codes if code not in icd_codes)
    else:
        for _, row in df.iterrows():
            # These two assignments were missing, leaving both names
            # undefined in this branch.
            ground_truth = convert_to_array(row['label'])
            predicted_codes = extract_icd_codes(row['predict'])

            if not predicted_codes:
                continue

            ground_truth_first = ground_truth[0] if ground_truth else None
            predicted_first = predicted_codes[0]

            if ground_truth_first == predicted_first:
                exact_matches += 1
                continue

            # Count the non-exact prediction; without this the error rate
            # was always 0 in single-label mode.
            total_predictions += 1
            if predicted_first not in icd_codes:
                invalid_predictions += 1

    error_rate = invalid_predictions / total_predictions if total_predictions > 0 else 0
    sample_count = len(df)

    return {
        'total_samples': sample_count,
        'exact_matches': exact_matches,
        # Guard against an empty frame (ZeroDivisionError).
        'exact_match_rate': exact_matches / sample_count if sample_count > 0 else 0,
        'remaining_predictions': total_predictions,
        'invalid_predictions': invalid_predictions,
        'error_rate_on_non_exact': error_rate
    }

def analyze_files(root_folder, icd_codes):
    """Run the existence-error analysis on the prediction file of every sub-folder.

    In each sub-folder of root_folder the file 'generated_predictions.jsonl'
    is used, falling back to 'predict.jsonl'; folders with neither are
    skipped. A file that fails to load or score is reported on stdout and
    skipped.

    Parameters:
    root_folder (str): Folder whose sub-folders contain the prediction files.
    icd_codes (collection): Known valid ICD codes, passed to the scorer.

    Returns:
    dict: Maps sub-folder name to the stats dict returned by
    calculate_existence_errors.
    """
    results = {}

    for folder_name in os.listdir(root_folder):
        folder_path = os.path.join(root_folder, folder_name)
        if not os.path.isdir(folder_path):
            continue

        print(f"\n{'='*50}")
        print(f"Analyzing folder: {folder_name}")
        print(f"{'='*50}")

        prediction_file = os.path.join(folder_path, "generated_predictions.jsonl")
        if not os.path.exists(prediction_file):
            prediction_file = os.path.join(folder_path, "predict.jsonl")
        if not os.path.exists(prediction_file):
            continue

        print(f"\nProcessing file: {prediction_file}")

        # Reset per file: the former "'df' in locals()" test could pick up the
        # frame of a previous folder and report the wrong column names.
        df = None
        try:
            df = load_jsonl(prediction_file)
            print(f"Number of rows: {len(df)}")

            lowered = folder_name.lower()
            is_multi = any(tag in lowered for tag in ['multi', 'mlti'])
            is_mimic = 'mimic' in lowered

            error_stats = calculate_existence_errors(df, icd_codes, is_multi, is_mimic)
            results[folder_name] = error_stats

            print(f"Total samples: {error_stats['total_samples']}")
            print(f"Exact matches: {error_stats['exact_matches']}")
            print(f"Exact match rate: {error_stats['exact_match_rate']:.4f}")
            print(f"Remaining predictions: {error_stats['remaining_predictions']}")
            print(f"Invalid predictions: {error_stats['invalid_predictions']}")
            print(f"Error rate on non-exact matches: {error_stats['error_rate_on_non_exact']:.4f}")

        except Exception as e:
            print(f"Processing fileerror: {str(e)}")
            # Column names help diagnose schema problems, but only when this
            # file was actually loaded.
            if df is not None:
                print(f"File column names: {df.columns.tolist()}")

    return results

# One root folder per Llama model size; each is analysed separately.
llama_folders = [
    "/Users/houzhen/research/LLMCoder/code/llama_auto/1b/notrain",
    "/Users/houzhen/research/LLMCoder/code/llama_auto/3b/notrain",
    "/Users/houzhen/research/LLMCoder/code/llama_auto/8b/notrain"
]

# `icd_codes` is not defined in this cell; it must already exist in the
# notebook namespace.
for folder in llama_folders:
    # The paths follow .../llama_auto/<model_size>/notrain, so the model size
    # is the name of the parent directory. It was previously never assigned,
    # which raised NameError on the first print.
    model_size = os.path.basename(os.path.dirname(os.path.normpath(folder)))

    # NOTE(review): the messages and the output file name say "trained" while
    # the folders are named "notrain" - confirm the intended label.
    print(f"\nProcessing Llama {model_size} trainedmodel")
    results = analyze_files(folder, icd_codes)

    # Build a table with one row per analysed sub-folder and save it under a
    # relative name (i.e. in the current working directory), rounded to
    # 4 decimals.
    results_df = pd.DataFrame.from_dict(results, orient='index')
    output_file = f'existence_errors_llama_{model_size}_trained.csv'
    results_df.round(4).to_csv(output_file)

    print(f"\n=== Llama {model_size} trained 结果 ===")
    print(results_df.round(4).to_string())

In [None]:
import pandas as pd
import os
import json
import re

def load_jsonl(file_path):
    """Load a JSON Lines file into a DataFrame.

    Each non-blank line is parsed as one JSON document and becomes one row.

    Parameters:
    file_path (str): Path to the UTF-8 encoded JSONL file.

    Returns:
    pandas.DataFrame: One row per parsed line (empty if the file has no records).

    Raises:
    json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    records = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            # Blank lines (typically a trailing newline at the end of the
            # file) carry no record; json.loads('') would raise on them.
            if line:
                records.append(json.loads(line))
    return pd.DataFrame(records)

def extract_icd_codes(text):
    """Extract all ICD-10-CM style codes from free text.

    A code is a capital letter, a digit, a third character that is a digit or
    'A'/'B', and optionally up to four further alphanumeric characters with an
    optional dot before them (e.g. 'E11.9', 'S72.001A', 'O9A.111'). Dots are
    removed from the returned codes.

    Parameters:
    text: Text to scan; non-string values are converted with str(). Missing
        scalars (None/NaN) yield no codes.

    Returns:
    list: Codes in order of appearance, without dots (duplicates kept).
    """
    # Only test scalars for missingness: pd.isna on a list returns an array,
    # whose truth value is ambiguous.
    if pd.api.types.is_scalar(text) and pd.isna(text):
        return []
    # The tail accepts letters as well as digits; a digits-only tail truncated
    # codes with alphabetic characters ('S72.001A' -> 'S72001') and missed
    # categories whose third character is a letter ('O9A.111').
    matches = re.findall(r'[A-Z]\d[0-9AB](?:\.?[0-9A-Z]{1,4})?', str(text))
    return [match.replace('.', '') for match in matches]

def convert_to_array(text):
    """
    Convert a list-like value or its string form into a list of strings.

    Parameters:
    text: A list/tuple of codes, a string such as "['A01', 'B02']", a single
        code string, or a missing value (None/NaN).

    Returns:
    list: Whitespace-stripped string items. Missing values and unsupported
        types yield []. A bracketed string is split on commas with quotes
        removed and empty items dropped; any other string is returned as a
        one-element list.
    """
    # Real sequences must be handled before pd.isna: for list input pd.isna
    # returns an array, and using that array in `if` raises ValueError.
    if isinstance(text, (list, tuple)):
        items = [str(item).strip() for item in text if item is not None]
        return [item for item in items if item]
    if pd.isna(text):
        return []
    if isinstance(text, str):
        # Strip first so leading whitespace or newlines do not hide the
        # opening bracket of a string-formatted list.
        text = text.strip()
        if text.startswith('['):
            text = text.strip('[]').replace("'", "").replace('"', '')
            return [item.strip() for item in text.split(',') if item.strip()]
        else:
            return [text]
    return []

def is_level_error(true_code, pred_code):
    """
    Tell whether two different codes are in a parent/child relation.

    Parameters:
    true_code (str): Reference code.
    pred_code (str): Predicted code.

    Returns:
    bool: True when one code is a proper prefix of the other (the prediction
        is more or less specific than the label). False when either code is
        empty or when the codes are identical.
    """
    if not true_code or not pred_code:
        return False
    # Identical codes are a correct prediction, not a hierarchical error.
    # (The original had a second, unreachable `return False` here.)
    if true_code == pred_code:
        return False
    # A hierarchy relation means one code extends the other from the start,
    # so test prefixes rather than arbitrary substrings.
    return pred_code.startswith(true_code) or true_code.startswith(pred_code)

def calculate_level_errors(df, is_multi=False, is_mimic=False):
    """
    Calculate hierarchical (level) errors among non-exact predictions.

    Parameters:
    df (pandas.DataFrame): Rows with a 'label' column (parsed with
        convert_to_array) and a 'predict' column (parsed with
        extract_icd_codes).
    is_multi (bool): Treat every row as multi-label.
    is_mimic (bool): Also selects the multi-label path.

    Returns:
    dict: 'total_samples', 'exact_matches', 'exact_match_rate',
        'total_predictions' (predictions evaluated after exact matches are
        removed), 'level_errors' and 'error_rate'
        (level_errors / total_predictions, 0 when nothing was evaluated).
    """
    total_predictions = 0
    level_errors = 0
    exact_matches = 0

    multi_label = is_multi or is_mimic

    for _, row in df.iterrows():
        true_codes = convert_to_array(row['label'])
        pred_codes = extract_icd_codes(row['predict'])

        if multi_label:
            true_set, pred_set = set(true_codes), set(pred_codes)

            if true_set == pred_set:
                exact_matches += 1
                continue

            for pred_code in pred_set:
                total_predictions += 1
                # A predicted code that is itself one of the labels is
                # correct; it must not be counted as a hierarchical error
                # even though the row as a whole is not an exact match.
                if pred_code in true_set:
                    continue
                if any(is_level_error(true_code, pred_code) for true_code in true_set):
                    level_errors += 1
        elif true_codes and pred_codes:
            # Single-label rows compare only the first code on each side;
            # rows with no label or no extracted prediction are not counted.
            true_code, pred_code = true_codes[0], pred_codes[0]

            if true_code == pred_code:
                exact_matches += 1
                continue

            total_predictions += 1
            if is_level_error(true_code, pred_code):
                level_errors += 1

    error_rate = level_errors / total_predictions if total_predictions > 0 else 0

    return {
        'total_samples': len(df),
        'exact_matches': exact_matches,
        'exact_match_rate': exact_matches / len(df) if len(df) > 0 else 0,
        'total_predictions': total_predictions,
        'level_errors': level_errors,
        'error_rate': error_rate
    }

def analyze_llama_folder(root_folder):
    """
    Compute hierarchical-error statistics for every run folder under root_folder.

    Each sub-directory that contains "generated_predictions.jsonl" is loaded
    and scored with calculate_level_errors. The folder name decides the mode:
    names containing 'multi'/'mlti' are multi-label, names containing 'mimic'
    are MIMIC runs.

    Parameters:
    root_folder (str): Directory whose sub-directories hold prediction files.

    Returns:
    dict: Maps sub-directory name to the statistics dict of that run. Folders
        without a prediction file, or whose processing failed, are omitted.
    """
    results = {}

    # os.listdir returns entries in arbitrary order; sort so the logs and the
    # result table are reproducible.
    for folder_name in sorted(os.listdir(root_folder)):
        folder_path = os.path.join(root_folder, folder_name)
        if not os.path.isdir(folder_path):
            continue

        print(f"\n{'='*50}")
        print(f"Analyzing folder: {folder_name}")
        print(f"{'='*50}")

        prediction_file = os.path.join(folder_path, "generated_predictions.jsonl")
        if not os.path.exists(prediction_file):
            continue

        print(f"\nProcessing file: {prediction_file}")

        # Reset per file: the old `'df' in locals()` check also saw the frame
        # left over from a previous folder and reported its columns.
        df = None
        try:
            df = load_jsonl(prediction_file)
            print(f"Number of rows: {len(df)}")

            is_multi = any(x in folder_name.lower() for x in ['multi', 'mlti'])
            is_mimic = 'mimic' in folder_name.lower()

            error_stats = calculate_level_errors(df, is_multi, is_mimic)
            results[folder_name] = error_stats

            print(f"Total samples: {error_stats['total_samples']}")
            print(f"Exact matches: {error_stats['exact_matches']}")
            print(f"Exact match rate: {error_stats['exact_match_rate']:.4f}")
            print(f"Predictions for hierarchical error evaluation: {error_stats['total_predictions']}")
            print(f"Hierarchical errors: {error_stats['level_errors']}")
            print(f"Hierarchical error rate: {error_stats['error_rate']:.4f}")

        except Exception as e:
            # Best effort: report the failure and continue with the next folder.
            print(f"Processing fileerror: {str(e)}")
            if df is not None:
                print(f"File column names: {df.columns.tolist()}")

    return results

# Untrained ("notrain") Llama runs, one directory per model size.
llama_folders = [
    "/Users/houzhen/research/LLMCoder/code/llama_auto/1b/notrain",
    "/Users/houzhen/research/LLMCoder/code/llama_auto/3b/notrain",
    "/Users/houzhen/research/LLMCoder/code/llama_auto/8b/notrain"
]

for folder in llama_folders:
    # Each path ends in ".../<size>/<variant>". The last component alone
    # ("notrain") is the same for every entry, which made all three runs
    # write the same CSV; label each run with both components instead.
    path_parts = folder.rstrip('/').split('/')
    model_size = f"{path_parts[-2]}_{path_parts[-1]}"
    print(f"\nProcessing Llama {model_size} model")
    results = analyze_llama_folder(folder)

    # One row per dataset folder; round for readability and save
    results_df = pd.DataFrame.from_dict(results, orient='index')
    output_file = f'level_errors_llama_{model_size}.csv'
    results_df.round(4).to_csv(output_file)

    print(f"\n=== Llama {model_size} 结果 ===")
    print(results_df.round(4).to_string())

In [None]:
import pandas as pd
import os
import json
import re

def load_jsonl(file_path):
    """
    Load a JSON Lines file into a DataFrame.

    Parameters:
    file_path (str): Path to a UTF-8 text file with one JSON value per line.

    Returns:
    pandas.DataFrame: One row per non-blank line, in file order.

    Raises:
    json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    records = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for raw_line in f:
            stripped = raw_line.strip()
            # Blank lines (for example a trailing newline at the end of the
            # file) are not records; json.loads('') would raise on them.
            if not stripped:
                continue
            records.append(json.loads(stripped))
    return pd.DataFrame(records)

def extract_icd_codes(text):
    """
    Find all ICD-style codes in a value and return them without dots.

    The pattern is a capital letter, two digits, an optional dot and any
    trailing digits.

    Parameters:
    text: Value to search; str() is applied before matching.

    Returns:
    list: Matched codes in order of appearance with '.' removed; [] for a
        missing value (None/NaN).
    """
    if pd.isna(text):
        return []
    raw_hits = re.findall(r'[A-Z]\d{2}\.?\d*', str(text))
    cleaned = []
    for raw in raw_hits:
        cleaned.append(raw.replace('.', ''))
    return cleaned

def convert_to_array(text):
    """
    Convert a list-like value or its string form into a list of strings.

    Parameters:
    text: A list/tuple of codes, a string such as "['A01', 'B02']", a single
        code string, or a missing value (None/NaN).

    Returns:
    list: Whitespace-stripped string items. Missing values and unsupported
        types yield []. A bracketed string is split on commas with quotes
        removed and empty items dropped; any other string is returned as a
        one-element list.
    """
    # Real sequences must be handled before pd.isna: for list input pd.isna
    # returns an array, and using that array in `if` raises ValueError.
    if isinstance(text, (list, tuple)):
        items = [str(item).strip() for item in text if item is not None]
        return [item for item in items if item]
    if pd.isna(text):
        return []
    if isinstance(text, str):
        # Strip first so leading whitespace or newlines do not hide the
        # opening bracket of a string-formatted list.
        text = text.strip()
        if text.startswith('['):
            text = text.strip('[]').replace("'", "").replace('"', '')
            return [item.strip() for item in text.split(',') if item.strip()]
        else:
            return [text]
    return []

def is_character_error(true_code, pred_code, icd_codes):
    """
    Decide whether a predicted code counts as a character-level error.

    Parameters:
    true_code (str): Reference code.
    pred_code (str): Predicted code.
    icd_codes: Container of valid codes, used for a membership test.

    Returns:
    bool: True when pred_code is not in icd_codes, when either code does not
        start with a capital letter followed by digits, or when the codes
        differ in length or in any position. False when the codes are equal
        (and pred_code is valid).
    """
    if pred_code not in icd_codes:
        return True
    if true_code == pred_code:
        return False

    # Both codes must start with a letter followed by at least one digit.
    base_pattern = r'([A-Z]\d+)'
    base_matches = (re.match(base_pattern, true_code), re.match(base_pattern, pred_code))
    if any(found is None for found in base_matches):
        return True

    if len(list(true_code)) != len(list(pred_code)):
        return True

    # NOTE(review): the codes are already known to be unequal here, so for
    # equal-length strings this comparison always finds a differing position.
    return any(t_char != p_char for t_char, p_char in zip(true_code, pred_code))

def is_level_error(true_code, pred_code):
    """
    Tell whether two different codes are in a parent/child relation.

    Parameters:
    true_code (str): Reference code.
    pred_code (str): Predicted code.

    Returns:
    bool: True when one code is a proper prefix of the other (the prediction
        is more or less specific than the label). False when either code is
        empty or when the codes are identical.
    """
    if not true_code or not pred_code:
        return False
    # Identical codes are a correct prediction, not a hierarchical error.
    # (The original had a second, unreachable `return False` here.)
    if true_code == pred_code:
        return False
    # A hierarchy relation means one code extends the other from the start,
    # so test prefixes rather than arbitrary substrings.
    return pred_code.startswith(true_code) or true_code.startswith(pred_code)

def calculate_character_errors(df, icd_codes, is_multi=False, is_mimic=False):
    """
    Classify every wrong prediction as a level, invalid-code or other error.

    Parameters:
    df (pandas.DataFrame): Rows with a 'label' column (parsed with
        convert_to_array) and a 'predict' column (parsed with
        extract_icd_codes).
    icd_codes: Container of valid codes, used for membership tests.
    is_multi (bool): Treat every row as multi-label.
    is_mimic (bool): Also selects the multi-label path.

    Returns:
    dict: Sample and exact-match counts, 'total_predictions' (wrong
        predictions that were classified), and for each of 'level_errors',
        'invalid_codes' and 'other_char_errors' the count and its rate over
        total_predictions, plus 'total_error_rate'. Rates are 0 when nothing
        was classified.
    """
    total_predictions = 0
    exact_matches = 0
    # These three counters were incremented but never initialised.
    level_errors = 0
    invalid_codes = 0
    other_char_errors = 0

    if is_multi or is_mimic:
        for _, row in df.iterrows():
            true_codes = set(convert_to_array(row['label']))
            pred_codes = set(extract_icd_codes(row['predict']))

            if true_codes == pred_codes:
                exact_matches += 1
                continue

            for pred_code in pred_codes:
                # Restores the guard whose `continue` was left orphaned: a
                # predicted code that is one of the labels is correct and is
                # not classified as any kind of error.
                if pred_code in true_codes:
                    continue

                total_predictions += 1

                if pred_code not in icd_codes:
                    invalid_codes += 1
                    continue

                if any(is_level_error(true_code, pred_code) for true_code in true_codes):
                    level_errors += 1
                else:
                    other_char_errors += 1
    else:
        for _, row in df.iterrows():
            true_codes = convert_to_array(row['label'])
            pred_codes = extract_icd_codes(row['predict'])

            if not true_codes or not pred_codes:
                continue

            # Single-label rows compare only the first code on each side.
            true_code = true_codes[0]
            pred_code = pred_codes[0]

            if true_code == pred_code:
                exact_matches += 1
                continue

            total_predictions += 1

            if pred_code not in icd_codes:
                invalid_codes += 1
            elif is_level_error(true_code, pred_code):
                level_errors += 1
            else:
                other_char_errors += 1

    total_errors = level_errors + invalid_codes + other_char_errors

    return {
        'total_samples': len(df),
        'exact_matches': exact_matches,
        'exact_match_rate': exact_matches / len(df) if len(df) > 0 else 0,
        'total_predictions': total_predictions,
        'level_errors': level_errors,
        'level_error_rate': level_errors / total_predictions if total_predictions > 0 else 0,
        'invalid_codes': invalid_codes,
        'invalid_code_rate': invalid_codes / total_predictions if total_predictions > 0 else 0,
        'other_char_errors': other_char_errors,
        'other_error_rate': other_char_errors / total_predictions if total_predictions > 0 else 0,
        'total_error_rate': total_errors / total_predictions if total_predictions > 0 else 0
    }

def analyze_llama_folder(root_folder, icd_codes):
    """
    Compute character-error statistics for every run folder under root_folder.

    Each sub-directory that contains "generated_predictions.jsonl" is loaded
    and scored with calculate_character_errors. The folder name decides the
    mode: names containing 'multi'/'mlti' are multi-label, names containing
    'mimic' are MIMIC runs.

    Parameters:
    root_folder (str): Directory whose sub-directories hold prediction files.
    icd_codes: Container of valid codes, passed through to the scorer.

    Returns:
    dict: Maps sub-directory name to a dict with 'total_predictions',
        'other_char_errors' and 'char_error_rate'. Folders without a
        prediction file, or whose processing failed, are omitted.
    """
    results = {}

    # os.listdir returns entries in arbitrary order; sort so the logs and the
    # result table are reproducible.
    for folder_name in sorted(os.listdir(root_folder)):
        folder_path = os.path.join(root_folder, folder_name)
        if not os.path.isdir(folder_path):
            continue

        print(f"\n{'='*50}")
        print(f"Analyzing folder: {folder_name}")
        print(f"{'='*50}")

        prediction_file = os.path.join(folder_path, "generated_predictions.jsonl")
        if not os.path.exists(prediction_file):
            continue

        print(f"\nProcessing file: {prediction_file}")

        # Reset per file: the old `'df' in locals()` check also saw the frame
        # left over from a previous folder and reported its columns.
        df = None
        try:
            df = load_jsonl(prediction_file)
            print(f"Number of rows: {len(df)}")

            is_multi = any(x in folder_name.lower() for x in ['multi', 'mlti'])
            is_mimic = 'mimic' in folder_name.lower()

            error_stats = calculate_character_errors(df, icd_codes, is_multi, is_mimic)

            # Only the character-error figures are kept for the report.
            results[folder_name] = {
                'total_predictions': error_stats['total_predictions'],
                'other_char_errors': error_stats['other_char_errors'],
                'char_error_rate': error_stats['other_error_rate']
            }

            print(f"Total predictions: {error_stats['total_predictions']}")
            print(f"Character errors: {error_stats['other_char_errors']}")
            print(f"Character error rate: {error_stats['other_error_rate']:.4f}")

        except Exception as e:
            # Best effort: report the failure and continue with the next folder.
            print(f"Processing fileerror: {str(e)}")
            if df is not None:
                print(f"File column names: {df.columns.tolist()}")

    return results

def collect_valid_codes(folders):
    """
    Gather every code that appears in a 'label' field of the prediction files.

    Parameters:
    folders (iterable of str): Root directories; each sub-directory is
        checked for "generated_predictions.jsonl".

    Returns:
    set: All label codes found (parsed with convert_to_array). Files that
        fail to load or parse are reported with a message and skipped.
    """
    valid_codes = set()
    for folder in folders:
        for folder_name in os.listdir(folder):
            folder_path = os.path.join(folder, folder_name)
            if not os.path.isdir(folder_path):
                continue

            prediction_file = os.path.join(folder_path, "generated_predictions.jsonl")
            if not os.path.exists(prediction_file):
                continue

            try:
                df = load_jsonl(prediction_file)
                for _, row in df.iterrows():
                    valid_codes.update(convert_to_array(row['label']))
            except Exception as e:
                print(f"Processing fileerror: {str(e)}")
    return valid_codes

# Untrained ("notrain") Llama runs, one directory per model size.
llama_folders = [
    "/Users/houzhen/research/LLMCoder/code/llama_auto/1b/notrain",
    "/Users/houzhen/research/LLMCoder/code/llama_auto/3b/notrain",
    "/Users/houzhen/research/LLMCoder/code/llama_auto/8b/notrain"
]

# The set of valid ICD codes is every code that appears in a label of these runs
print("Collecting valid ICD codes...")
valid_icd_codes = collect_valid_codes(llama_folders)
print(f"Collected {len(valid_icd_codes)} valid ICD codes")

for folder in llama_folders:
    # Each path ends in ".../<size>/<variant>". The last component alone
    # ("notrain") is the same for every entry, which made all three runs
    # write the same CSV; label each run with both components instead.
    path_parts = folder.rstrip('/').split('/')
    model_size = f"{path_parts[-2]}_{path_parts[-1]}"
    print(f"\nProcessing Llama {model_size} model")
    results = analyze_llama_folder(folder, valid_icd_codes)

    # One row per dataset folder; round for readability and save
    results_df = pd.DataFrame.from_dict(results, orient='index')
    output_file = f'character_errors_llama_{model_size}.csv'
    results_df.round(4).to_csv(output_file)

    print(f"\n=== Llama {model_size} 字符错误结果 ===")
    print(results_df.round(4).to_string())

In [None]:
import pandas as pd
import os
import json
import re

def load_jsonl(file_path):
    """
    Load a JSON Lines file into a DataFrame.

    Parameters:
    file_path (str): Path to a UTF-8 text file with one JSON value per line.

    Returns:
    pandas.DataFrame: One row per non-blank line, in file order.

    Raises:
    json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    records = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for raw_line in f:
            stripped = raw_line.strip()
            # Blank lines (for example a trailing newline at the end of the
            # file) are not records; json.loads('') would raise on them.
            if not stripped:
                continue
            records.append(json.loads(stripped))
    return pd.DataFrame(records)

def extract_icd_codes(text):
    """
    Pull every ICD-style code out of a piece of text.

    A code is matched as one capital letter, two digits, an optional dot, any
    further digits and an optional trailing capital letter. The dot is
    removed from each returned code.

    Parameters:
    text: Value to scan; it is converted with str() before matching.

    Returns:
    list: Dot-free codes in order of appearance, or [] when text is missing
        (None/NaN).
    """
    if pd.isna(text):
        return []
    codes = []
    for hit in re.finditer(r'[A-Z]\d{2}\.?\d*[A-Z]?', str(text)):
        codes.append(hit.group(0).replace('.', ''))
    return codes

def convert_to_array(text):
    """
    Convert a list-like value or its string form into a list of strings.

    Parameters:
    text: A list/tuple of codes, a string such as "['A01', 'B02']", a single
        code string, or a missing value (None/NaN).

    Returns:
    list: Whitespace-stripped string items. Missing values and unsupported
        types yield []. A bracketed string is split on commas with quotes
        removed and empty items dropped; any other string is returned as a
        one-element list.
    """
    # Real sequences must be handled before pd.isna: for list input pd.isna
    # returns an array, and using that array in `if` raises ValueError.
    if isinstance(text, (list, tuple)):
        items = [str(item).strip() for item in text if item is not None]
        return [item for item in items if item]
    if pd.isna(text):
        return []
    if isinstance(text, str):
        # Strip first so leading whitespace or newlines do not hide the
        # opening bracket of a string-formatted list.
        text = text.strip()
        if text.startswith('['):
            text = text.strip('[]').replace("'", "").replace('"', '')
            return [item.strip() for item in text.split(',') if item.strip()]
        else:
            return [text]
    return []

def check_counts_llama(path, max_examples=3):
    """
    Count rows whose number of predicted codes does not match expectations.

    For multi-label or MIMIC folders (name contains 'multi'/'mlti' or
    'mimic') the number of extracted codes must equal the number of label
    codes; for all other folders exactly one code is expected.

    Parameters:
    path (str): Directory whose sub-directories hold
        "generated_predictions.jsonl" files.
    max_examples (int): How many mismatching rows to print per folder. The
        condition that originally guarded the example output is missing from
        the source, so this default is a chosen value.

    Returns:
    dict: Maps sub-directory name to 'total_samples', 'wrong_count' and
        'error_rate'. Folders without a prediction file, or whose processing
        failed, are omitted.
    """
    results = {}

    # os.listdir returns entries in arbitrary order; sort for reproducible output.
    for folder_name in sorted(os.listdir(path)):
        folder_path = os.path.join(path, folder_name)
        if not os.path.isdir(folder_path):
            continue

        prediction_file = os.path.join(folder_path, "generated_predictions.jsonl")
        if not os.path.exists(prediction_file):
            continue

        # Reset per file so the error handler never reports a frame left
        # over from a previous folder.
        df = None
        try:
            df = load_jsonl(prediction_file)
            total = len(df)

            is_multi = any(x in folder_name.lower() for x in ['multi', 'mlti'])
            is_mimic = 'mimic' in folder_name.lower()
            multi_label = is_multi or is_mimic

            wrong_count = 0
            for _, row in df.iterrows():
                true_count = len(convert_to_array(row['label']))
                pred_codes = extract_icd_codes(row['predict'])

                expected_count = true_count if multi_label else 1
                if len(pred_codes) == expected_count:
                    continue

                wrong_count += 1
                # Show only the first few mismatches to keep the output readable.
                if wrong_count <= max_examples:
                    print(f"\nExample - {folder_name}:")
                    print(f"Label ({true_count}): {row['label']}")
                    print(f"Predict ({len(pred_codes)}): {row['predict']}")
                    print(f"Extracted codes: {pred_codes}")

            error_rate = wrong_count/total if total > 0 else 0
            print(f"{folder_name}: {wrong_count}/{total} = {error_rate:.4f}")

            results[folder_name] = {
                'total_samples': total,
                'wrong_count': wrong_count,
                'error_rate': error_rate
            }

        except Exception as e:
            # Best effort: report the failure and continue with the next folder.
            print(f"Processing fileerror: {str(e)}")
            if df is not None:
                print(f"File column names: {df.columns.tolist()}")

    return results

# Untrained ("notrain") Llama runs, one directory per model size.
llama_folders = [
    "/Users/houzhen/research/LLMCoder/code/llama_auto/1b/notrain",
    "/Users/houzhen/research/LLMCoder/code/llama_auto/3b/notrain",
    "/Users/houzhen/research/LLMCoder/code/llama_auto/8b/notrain"
]

all_results = {}
for folder in llama_folders:
    # Each path ends in ".../<size>/<variant>". The last component alone
    # ("notrain") is the same for every entry, which made all three runs
    # share one CSV and one `all_results` key; use both components instead.
    path_parts = folder.rstrip('/').split('/')
    model_size = f"{path_parts[-2]}_{path_parts[-1]}"
    print(f"\n=== Llama {model_size} model ===")
    results = check_counts_llama(folder)

    # One row per dataset folder
    results_df = pd.DataFrame.from_dict(results, orient='index')

    output_file = f'count_errors_llama_{model_size}.csv'
    results_df.round(4).to_csv(output_file)

    print(f"\n=== Llama {model_size} Summary of results ===")
    print(results_df.round(4).to_string())

    all_results[model_size] = results

# Combined table over all runs, tagged with the run it came from
all_results_df = pd.concat([pd.DataFrame.from_dict(res, orient='index').assign(model=size) 
                          for size, res in all_results.items()])
all_results_df.to_csv('count_errors_llama_all.csv')

print("\n=== 所有model汇总 ===")
for size, res in all_results.items():
    # pd.DataFrame(res) has one column per dataset folder and one row per
    # metric; a run with no results has no 'error_rate' row, so report NaN
    # instead of raising KeyError.
    mean_error_rate = pd.DataFrame(res).loc['error_rate'].mean() if res else float('nan')
    print(f"\n{size} model平均Error rate: {mean_error_rate:.4f}")

# Fine-tuned Llama (1b, 3b, 8b)

In [None]:
import pandas as pd
import os
import json
import re

def load_jsonl(file_path):
    """
    Load a JSON Lines file into a DataFrame.

    Parameters:
    file_path (str): Path to a UTF-8 text file with one JSON value per line.

    Returns:
    pandas.DataFrame: One row per non-blank line, in file order.

    Raises:
    json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    records = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for raw_line in f:
            stripped = raw_line.strip()
            # Blank lines (for example a trailing newline at the end of the
            # file) are not records; json.loads('') would raise on them.
            if not stripped:
                continue
            records.append(json.loads(stripped))
    return pd.DataFrame(records)

def extract_icd_codes(text):
    """
    Find all ICD-style codes in a value and return them without dots.

    The pattern is a capital letter, two digits, an optional dot and any
    trailing digits.

    Parameters:
    text: Value to search; str() is applied before matching.

    Returns:
    list: Matched codes in order of appearance with '.' removed; [] for a
        missing value (None/NaN).
    """
    if pd.isna(text):
        return []
    raw_hits = re.findall(r'[A-Z]\d{2}\.?\d*', str(text))
    cleaned = []
    for raw in raw_hits:
        cleaned.append(raw.replace('.', ''))
    return cleaned

def convert_to_array(text):
    """
    Convert a list-like value or its string form into a list of strings.

    Parameters:
    text: A list/tuple of codes, a string such as "['A01', 'B02']", a single
        code string, or a missing value (None/NaN).

    Returns:
    list: Whitespace-stripped string items. Missing values and unsupported
        types yield []. A bracketed string is split on commas with quotes
        removed and empty items dropped; any other string is returned as a
        one-element list.
    """
    # Real sequences must be handled before pd.isna: for list input pd.isna
    # returns an array, and using that array in `if` raises ValueError.
    if isinstance(text, (list, tuple)):
        items = [str(item).strip() for item in text if item is not None]
        return [item for item in items if item]
    if pd.isna(text):
        return []
    if isinstance(text, str):
        # Strip first so leading whitespace or newlines do not hide the
        # opening bracket of a string-formatted list.
        text = text.strip()
        if text.startswith('['):
            text = text.strip('[]').replace("'", "").replace('"', '')
            return [item.strip() for item in text.split(',') if item.strip()]
        else:
            return [text]
    return []

def calculate_existence_errors(df, icd_codes, is_multi=False):
    """
    Measure how many predicted codes do not exist in the reference code list.

    Rows whose predicted code set equals the label set are counted as exact
    matches and excluded; the remaining predictions are checked against
    icd_codes.

    Parameters:
    df (pandas.DataFrame): Rows with 'label' and 'predict' columns; an
        optional 'dataset' column marks MIMIC rows.
    icd_codes: Iterable of valid codes.
    is_multi (bool): Multi-label mode. In this mode rows whose 'dataset'
        value contains 'mimic' have their predictions parsed with
        extract_icd_codes; all other predictions are parsed with
        convert_to_array.

    Returns:
    dict: 'total_samples', 'exact_matches', 'exact_match_rate',
        'remaining_predictions' (predicted codes in non-exact rows),
        'invalid_predictions' (those not in icd_codes) and
        'error_rate_on_non_exact'. Rates are 0 when their denominator is 0.
    """
    total_predictions = 0
    invalid_predictions = 0
    exact_matches = 0

    # Membership is tested once per predicted code; a set keeps that O(1)
    # when the reference codes arrive as a list.
    known_codes = icd_codes if isinstance(icd_codes, (set, frozenset)) else set(icd_codes)

    for _, row in df.iterrows():
        # The single-label branch used `ground_truth` without ever assigning
        # it; both modes now read the label the same way.
        ground_truth = set(convert_to_array(row['label']))

        if is_multi and 'mimic' in str(row.get('dataset', '')).lower():
            predicted_codes = set(extract_icd_codes(row['predict']))
        else:
            predicted_codes = set(convert_to_array(row['predict']))

        if ground_truth == predicted_codes:
            exact_matches += 1
            # Exact matches are excluded in both modes, as the
            # 'remaining_predictions' / 'error_rate_on_non_exact' metrics
            # imply (the multi-label branch previously kept counting them).
            continue

        total_predictions += len(predicted_codes)
        invalid_predictions += sum(1 for code in predicted_codes if code not in known_codes)

    error_rate = invalid_predictions / total_predictions if total_predictions > 0 else 0

    return {
        'total_samples': len(df),
        'exact_matches': exact_matches,
        # Guarded so an empty prediction file does not raise ZeroDivisionError.
        'exact_match_rate': exact_matches / len(df) if len(df) > 0 else 0,
        'remaining_predictions': total_predictions,
        'invalid_predictions': invalid_predictions,
        'error_rate_on_non_exact': error_rate
    }

def analyze_files(root_folder, icd_codes):
    """
    Compute existence-error statistics for every run folder under root_folder.

    Each sub-directory is searched for "generated_predictions.jsonl", falling
    back to "predict.jsonl", and scored with calculate_existence_errors.
    Folder names containing 'multi', 'mimic' or 'mlti' are treated as
    multi-label.

    Parameters:
    root_folder (str): Directory whose sub-directories hold prediction files.
    icd_codes: Valid codes, passed through to the scorer.

    Returns:
    dict: Maps sub-directory name to the statistics dict of that run. Folders
        without a prediction file, or whose processing failed, are omitted.
    """
    results = {}

    # os.listdir returns entries in arbitrary order; sort so the logs and the
    # result table are reproducible.
    for folder_name in sorted(os.listdir(root_folder)):
        folder_path = os.path.join(root_folder, folder_name)
        if not os.path.isdir(folder_path):
            continue

        print(f"\n{'='*50}")
        print(f"Analyzing folder: {folder_name}")
        print(f"{'='*50}")

        prediction_file = os.path.join(folder_path, "generated_predictions.jsonl")
        if not os.path.exists(prediction_file):
            prediction_file = os.path.join(folder_path, "predict.jsonl")

        if not os.path.exists(prediction_file):
            continue

        print(f"\nProcessing file: {prediction_file}")

        # Reset per file: the old `'df' in locals()` check also saw the frame
        # left over from a previous folder and reported its columns.
        df = None
        try:
            df = load_jsonl(prediction_file)
            print(f"Number of rows: {len(df)}")

            is_multi = any(x in folder_name.lower() for x in ['multi', 'mimic', 'mlti'])

            error_stats = calculate_existence_errors(df, icd_codes, is_multi)
            results[folder_name] = error_stats

            print(f"Total samples: {error_stats['total_samples']}")
            print(f"Exact matches: {error_stats['exact_matches']}")
            print(f"Exact match rate: {error_stats['exact_match_rate']:.4f}")
            print(f"Remaining predictions: {error_stats['remaining_predictions']}")
            print(f"Invalid predictions: {error_stats['invalid_predictions']}")
            print(f"Error rate on non-exact matches: {error_stats['error_rate_on_non_exact']:.4f}")

        except Exception as e:
            # Best effort: report the failure and continue with the next folder.
            print(f"Processing fileerror: {str(e)}")
            if df is not None:
                print(f"File column names: {df.columns.tolist()}")

    return results

# NOTE: `icd_codes` (the reference ICD-10 code list) is not loaded in this
# cell; it must already exist from the code-loading cell at the top of the
# notebook.

# Fine-tuned ("train") Llama runs, one directory per model size.
llama_folders = [
    "/Users/houzhen/research/LLMCoder/code/llama_auto/1b/train",
    "/Users/houzhen/research/LLMCoder/code/llama_auto/3b/train",
    "/Users/houzhen/research/LLMCoder/code/llama_auto/8b/train"
]

for folder in llama_folders:
    # Each path ends in ".../<size>/train"; the size is the second-to-last
    # component. `model_size` was not assigned inside this loop, so it either
    # raised NameError or reused a stale value from another cell.
    model_size = folder.rstrip('/').split('/')[-2]
    print(f"\nProcessing Llama {model_size} trainedmodel")
    results = analyze_files(folder, icd_codes)

    # One row per dataset folder; round for readability and save
    results_df = pd.DataFrame.from_dict(results, orient='index')
    output_file = f'existence_errors_llama_{model_size}_trained.csv'
    results_df.round(4).to_csv(output_file)

    print(f"\n=== Llama {model_size} trained 结果 ===")
    print(results_df.round(4).to_string())

In [None]:
import pandas as pd
import os
import json
import re

def load_jsonl(file_path):
    """
    Load a JSON Lines file into a DataFrame.

    Parameters:
    file_path (str): Path to a UTF-8 text file with one JSON value per line.

    Returns:
    pandas.DataFrame: One row per non-blank line, in file order.

    Raises:
    json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    records = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for raw_line in f:
            stripped = raw_line.strip()
            # Blank lines (for example a trailing newline at the end of the
            # file) are not records; json.loads('') would raise on them.
            if not stripped:
                continue
            records.append(json.loads(stripped))
    return pd.DataFrame(records)

def extract_icd_codes(text):
    """
    Return the ICD-style codes found in a value, with dots stripped.

    Matches a capital letter, two digits, an optional dot and any digits
    that follow.

    Parameters:
    text: Value to scan; converted with str() first.

    Returns:
    list: Codes in order of appearance without '.'; [] when text is missing
        (None/NaN).
    """
    if pd.isna(text):
        return []
    code_pattern = r'[A-Z]\d{2}\.?\d*'
    return list(map(lambda raw: raw.replace('.', ''), re.findall(code_pattern, str(text))))

def convert_to_array(text):
    """
    Convert a list-like value or its string form into a list of strings.

    Parameters:
    text: A list/tuple of codes, a string such as "['A01', 'B02']", a single
        code string, or a missing value (None/NaN).

    Returns:
    list: Whitespace-stripped string items. Missing values and unsupported
        types yield []. A bracketed string is split on commas with quotes
        removed and empty items dropped; any other string is returned as a
        one-element list.
    """
    # Real sequences must be handled before pd.isna: for list input pd.isna
    # returns an array, and using that array in `if` raises ValueError.
    if isinstance(text, (list, tuple)):
        items = [str(item).strip() for item in text if item is not None]
        return [item for item in items if item]
    if pd.isna(text):
        return []
    if isinstance(text, str):
        # Strip first so leading whitespace or newlines do not hide the
        # opening bracket of a string-formatted list.
        text = text.strip()
        if text.startswith('['):
            text = text.strip('[]').replace("'", "").replace('"', '')
            return [item.strip() for item in text.split(',') if item.strip()]
        else:
            return [text]
    return []

def is_level_error(true_code, pred_code):
    """
    Tell whether two different codes are in a parent/child relation.

    Parameters:
    true_code (str): Reference code.
    pred_code (str): Predicted code.

    Returns:
    bool: True when one code is a proper prefix of the other (the prediction
        is more or less specific than the label). False when either code is
        empty or when the codes are identical.
    """
    if not true_code or not pred_code:
        return False
    # Identical codes are a correct prediction, not a hierarchical error.
    # (The original had a second, unreachable `return False` here.)
    if true_code == pred_code:
        return False
    # A hierarchy relation means one code extends the other from the start,
    # so test prefixes rather than arbitrary substrings.
    return pred_code.startswith(true_code) or true_code.startswith(pred_code)

def calculate_level_errors(df, is_multi=False, is_mimic=False):
    """
    Calculate hierarchical (level) errors among non-exact predictions.

    Parameters:
    df (pandas.DataFrame): Rows with 'label' and 'predict' columns.
    is_multi (bool): Compare the full label and prediction sets per row
        instead of only the first code on each side.
    is_mimic (bool): In multi-label mode, parse predictions with
        extract_icd_codes instead of convert_to_array.

    Returns:
    dict: 'total_samples', 'exact_matches', 'exact_match_rate',
        'total_predictions' (predictions evaluated after exact matches are
        removed), 'level_errors' and 'error_rate'
        (level_errors / total_predictions, 0 when nothing was evaluated).
    """
    total_predictions = 0
    level_errors = 0
    exact_matches = 0

    if is_multi:
        for _, row in df.iterrows():
            true_codes = set(convert_to_array(row['label']))

            if is_mimic:
                pred_codes = set(extract_icd_codes(row['predict']))
            else:
                pred_codes = set(convert_to_array(row['predict']))

            if true_codes == pred_codes:
                exact_matches += 1
                continue

            for pred_code in pred_codes:
                total_predictions += 1
                # A predicted code that is itself one of the labels is
                # correct; it must not be counted as a hierarchical error
                # even though the row as a whole is not an exact match.
                if pred_code in true_codes:
                    continue
                if any(is_level_error(true_code, pred_code) for true_code in true_codes):
                    level_errors += 1
    else:
        for _, row in df.iterrows():
            true_codes = convert_to_array(row['label'])
            pred_codes = convert_to_array(row['predict'])

            if set(true_codes) == set(pred_codes):
                exact_matches += 1
                continue

            if true_codes and pred_codes:
                # Only the first code on each side is compared.
                true_code = true_codes[0]
                pred_code = pred_codes[0]

                total_predictions += 1
                # The first codes can be equal even when the sets differ
                # (extra predicted codes); that is not a hierarchical error.
                if pred_code != true_code and is_level_error(true_code, pred_code):
                    level_errors += 1

    error_rate = level_errors / total_predictions if total_predictions > 0 else 0

    return {
        'total_samples': len(df),
        'exact_matches': exact_matches,
        'exact_match_rate': exact_matches / len(df) if len(df) > 0 else 0,
        'total_predictions': total_predictions,
        'level_errors': level_errors,
        'error_rate': error_rate
    }

def analyze_llama_folder(root_folder):
    """
    Compute hierarchical-error statistics for every run folder under root_folder.

    Each sub-directory that contains "generated_predictions.jsonl" is loaded
    and scored with calculate_level_errors. Folder names containing 'multi',
    'mimic' or 'mlti' are treated as multi-label; names containing 'mimic'
    additionally set the MIMIC flag.

    Parameters:
    root_folder (str): Directory whose sub-directories hold prediction files.

    Returns:
    dict: Maps sub-directory name to the statistics dict of that run. Folders
        without a prediction file, or whose processing failed, are omitted.
    """
    results = {}

    # os.listdir returns entries in arbitrary order; sort so the logs and the
    # result table are reproducible.
    for folder_name in sorted(os.listdir(root_folder)):
        folder_path = os.path.join(root_folder, folder_name)
        if not os.path.isdir(folder_path):
            continue

        print(f"\n{'='*50}")
        print(f"Analyzing folder: {folder_name}")
        print(f"{'='*50}")

        prediction_file = os.path.join(folder_path, "generated_predictions.jsonl")
        if not os.path.exists(prediction_file):
            continue

        print(f"\nProcessing file: {prediction_file}")

        # Reset per file: the old `'df' in locals()` check also saw the frame
        # left over from a previous folder and reported its columns.
        df = None
        try:
            df = load_jsonl(prediction_file)
            print(f"Number of rows: {len(df)}")

            is_multi = any(x in folder_name.lower() for x in ['multi', 'mimic', 'mlti'])
            is_mimic = any(x in folder_name.lower() for x in ['mimic'])

            error_stats = calculate_level_errors(df, is_multi, is_mimic)
            results[folder_name] = error_stats

            print(f"Total samples: {error_stats['total_samples']}")
            print(f"Exact matches: {error_stats['exact_matches']}")
            print(f"Exact match rate: {error_stats['exact_match_rate']:.4f}")
            print(f"Predictions for hierarchical error evaluation: {error_stats['total_predictions']}")
            print(f"Hierarchical errors: {error_stats['level_errors']}")
            print(f"Hierarchical error rate: {error_stats['error_rate']:.4f}")

        except Exception as e:
            # Best effort: report the failure and continue with the next folder.
            print(f"Processing fileerror: {str(e)}")
            if df is not None:
                print(f"File column names: {df.columns.tolist()}")

    return results

# Fine-tuned ("train") Llama runs, one directory per model size.
llama_folders = [
    "/Users/houzhen/research/LLMCoder/code/llama_auto/1b/train",
    "/Users/houzhen/research/LLMCoder/code/llama_auto/3b/train",
    "/Users/houzhen/research/LLMCoder/code/llama_auto/8b/train"
]

for folder in llama_folders:
    # Each path ends in ".../<size>/<variant>". The last component alone
    # ("train") is the same for every entry, which made all three runs write
    # the same CSV; label each run with both components instead.
    path_parts = folder.rstrip('/').split('/')
    model_size = f"{path_parts[-2]}_{path_parts[-1]}"
    print(f"\nProcessing Llama {model_size} model")
    results = analyze_llama_folder(folder)

    # One row per dataset folder; round for readability and save
    results_df = pd.DataFrame.from_dict(results, orient='index')
    output_file = f'level_errors_llama_{model_size}.csv'
    results_df.round(4).to_csv(output_file)

    print(f"\n=== Llama {model_size} 结果 ===")
    print(results_df.round(4).to_string())

In [None]:
import pandas as pd
import os
import json
import re

def load_jsonl(file_path):
    """
    Load a JSON Lines file into a DataFrame.

    Parameters:
    file_path (str): Path to a UTF-8 text file with one JSON value per line.

    Returns:
    pandas.DataFrame: One row per non-blank line, in file order.

    Raises:
    json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    records = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for raw_line in f:
            stripped = raw_line.strip()
            # Blank lines (for example a trailing newline at the end of the
            # file) are not records; json.loads('') would raise on them.
            if not stripped:
                continue
            records.append(json.loads(stripped))
    return pd.DataFrame(records)

def extract_icd_codes(text):
    """
    Pull every ICD-style code out of a piece of text.

    A code is matched as one capital letter, two digits, an optional dot and
    any further digits. The dot is removed from each returned code.

    Parameters:
    text: Value to scan; it is converted with str() before matching.

    Returns:
    list: Dot-free codes in order of appearance, or [] when text is missing
        (None/NaN).
    """
    if pd.isna(text):
        return []
    codes = []
    for hit in re.finditer(r'[A-Z]\d{2}\.?\d*', str(text)):
        codes.append(hit.group(0).replace('.', ''))
    return codes

def convert_to_array(text):
    """
    Convert a list-like value or its string form into a list of strings.

    Parameters:
    text: A list/tuple of codes, a string such as "['A01', 'B02']", a single
        code string, or a missing value (None/NaN).

    Returns:
    list: Whitespace-stripped string items. Missing values and unsupported
        types yield []. A bracketed string is split on commas with quotes
        removed and empty items dropped; any other string is returned as a
        one-element list.
    """
    # Real sequences must be handled before pd.isna: for list input pd.isna
    # returns an array, and using that array in `if` raises ValueError.
    if isinstance(text, (list, tuple)):
        items = [str(item).strip() for item in text if item is not None]
        return [item for item in items if item]
    if pd.isna(text):
        return []
    if isinstance(text, str):
        # Strip first so leading whitespace or newlines do not hide the
        # opening bracket of a string-formatted list.
        text = text.strip()
        if text.startswith('['):
            text = text.strip('[]').replace("'", "").replace('"', '')
            return [item.strip() for item in text.split(',') if item.strip()]
        else:
            return [text]
    return []

def is_character_error(true_code, pred_code, icd_codes):
    """
    Decide whether a predicted code counts as a character-level error.

    Parameters:
    true_code (str): Reference code.
    pred_code (str): Predicted code.
    icd_codes: Container of valid codes, used for a membership test.

    Returns:
    bool: False only when pred_code is in icd_codes and equals true_code.
        True when pred_code is not in icd_codes, when either code does not
        start with a capital letter followed by digits, or when the codes
        differ in length or in any position.
    """
    if pred_code not in icd_codes:
        return True
    if true_code == pred_code:
        return False

    # Each code has to begin with a letter and at least one digit.
    true_base = re.match(r'([A-Z]\d+)', true_code)
    pred_base = re.match(r'([A-Z]\d+)', pred_code)
    if true_base is None or pred_base is None:
        return True

    true_chars, pred_chars = list(true_code), list(pred_code)
    if len(true_chars) != len(pred_chars):
        return True

    # NOTE(review): the codes are already known to be unequal here, so for
    # equal-length strings a mismatching position is always found.
    mismatches = [1 for t_char, p_char in zip(true_chars, pred_chars) if t_char != p_char]
    return len(mismatches) > 0

def is_level_error(true_code, pred_code):
    """
    Tell whether two different codes are in a parent/child relation.

    Parameters:
    true_code (str): Reference code.
    pred_code (str): Predicted code.

    Returns:
    bool: True when one code is a proper prefix of the other (the prediction
        is more or less specific than the label). False when either code is
        empty or when the codes are identical.
    """
    if not true_code or not pred_code:
        return False
    # Identical codes are a correct prediction, not a hierarchical error.
    # (The original had a second, unreachable `return False` here.)
    if true_code == pred_code:
        return False
    # A hierarchy relation means one code extends the other from the start,
    # so test prefixes rather than arbitrary substrings.
    return pred_code.startswith(true_code) or true_code.startswith(pred_code)

def calculate_character_errors(df, icd_codes, is_multi=False):
    """
    Classify every wrong prediction as a level, invalid-code or other error.

    Predictions are parsed with extract_icd_codes when the row's 'dataset'
    value, or the name of the current working directory, contains 'mimic';
    otherwise with convert_to_array.

    Parameters:
    df (pandas.DataFrame): Rows with 'label' and 'predict' columns and an
        optional 'dataset' column.
    icd_codes: Container of valid codes, used for membership tests.
    is_multi (bool): Compare the full label and prediction sets per row
        instead of only the first code on each side.

    Returns:
    dict: Sample and exact-match counts, 'total_predictions' (wrong
        predictions that were classified), and for each of 'level_errors',
        'invalid_codes' and 'other_char_errors' the count and its rate over
        total_predictions, plus 'total_error_rate'. Rates are 0 when nothing
        was classified.
    """
    total_predictions = 0
    exact_matches = 0
    # These three counters were incremented but never initialised.
    level_errors = 0
    invalid_codes = 0
    other_char_errors = 0

    # Loop-invariant part of the MIMIC check.
    # NOTE(review): keying on the working directory name looks fragile; the
    # calling code knows the run folder name but does not pass it in.
    cwd_is_mimic = 'mimic' in os.path.basename(os.getcwd()).lower()

    def _parse_predictions(row):
        # Pick the prediction parser for this row (see the docstring).
        if 'mimic' in str(row.get('dataset', '')).lower() or cwd_is_mimic:
            return extract_icd_codes(row['predict'])
        return convert_to_array(row['predict'])

    if is_multi:
        for _, row in df.iterrows():
            true_codes = set(convert_to_array(row['label']))
            pred_codes = set(_parse_predictions(row))

            if true_codes == pred_codes:
                exact_matches += 1
                continue

            for pred_code in pred_codes:
                # Restores the guard whose `continue` was left orphaned: a
                # predicted code that is one of the labels is correct and is
                # not classified as any kind of error.
                if pred_code in true_codes:
                    continue

                total_predictions += 1

                if pred_code not in icd_codes:
                    invalid_codes += 1
                    continue

                if any(is_level_error(true_code, pred_code) for true_code in true_codes):
                    level_errors += 1
                else:
                    other_char_errors += 1
    else:
        for _, row in df.iterrows():
            true_codes = convert_to_array(row['label'])
            pred_codes = _parse_predictions(row)

            if set(true_codes) == set(pred_codes):
                exact_matches += 1
                continue

            if true_codes and pred_codes:
                # Only the first code on each side is compared.
                true_code = true_codes[0]
                pred_code = pred_codes[0]

                # Restores the second orphaned `continue`: equal first codes
                # are not an error even when the full sets differ.
                if true_code == pred_code:
                    continue

                total_predictions += 1

                if pred_code not in icd_codes:
                    invalid_codes += 1
                elif is_level_error(true_code, pred_code):
                    level_errors += 1
                else:
                    other_char_errors += 1

    total_errors = level_errors + invalid_codes + other_char_errors

    return {
        'total_samples': len(df),
        'exact_matches': exact_matches,
        'exact_match_rate': exact_matches / len(df) if len(df) > 0 else 0,
        'total_predictions': total_predictions,
        'level_errors': level_errors,
        'level_error_rate': level_errors / total_predictions if total_predictions > 0 else 0,
        'invalid_codes': invalid_codes,
        'invalid_code_rate': invalid_codes / total_predictions if total_predictions > 0 else 0,
        'other_char_errors': other_char_errors,
        'other_error_rate': other_char_errors / total_predictions if total_predictions > 0 else 0,
        'total_error_rate': total_errors / total_predictions if total_predictions > 0 else 0
    }

def analyze_llama_folder(root_folder, icd_codes):
    """
    Compute character-error statistics for every run directory under a root folder.

    Each immediate sub-directory of ``root_folder`` that contains a
    ``generated_predictions.jsonl`` file is loaded and scored with
    ``calculate_character_errors``. Failures while processing one file are
    printed and do not stop the scan.

    Parameters:
    root_folder (str): Directory whose sub-directories are scanned.
    icd_codes: Collection of codes treated as valid, passed through to the scorer.

    Returns:
    dict: Maps sub-directory name to a dict with 'total_predictions',
          'other_char_errors' and 'char_error_rate'.
    """
    results = {}

    for folder_name in os.listdir(root_folder):
        folder_path = os.path.join(root_folder, folder_name)
        # Plain files directly under the root are not runs.
        if not os.path.isdir(folder_path):
            continue

        print(f"\n{'='*50}")
        print(f"Analyzing folder: {folder_name}")
        print(f"{'='*50}")

        prediction_file = os.path.join(folder_path, "generated_predictions.jsonl")
        if not os.path.exists(prediction_file):
            continue

        print(f"\nProcessing file: {prediction_file}")

        try:
            df = load_jsonl(prediction_file)
            print(f"Number of rows: {len(df)}")

            # The folder name decides whether rows are scored as multi-label.
            lowered_name = folder_name.lower()
            is_multi = any(tag in lowered_name for tag in ['multi', 'mimic', 'mlti'])

            error_stats = calculate_character_errors(df, icd_codes, is_multi)

            summary = {}
            summary['total_predictions'] = error_stats['total_predictions']
            summary['other_char_errors'] = error_stats['other_char_errors']
            summary['char_error_rate'] = error_stats['other_error_rate']
            results[folder_name] = summary

            print(f"Total predictions: {error_stats['total_predictions']}")
            print(f"Character errors: {error_stats['other_char_errors']}")
            print(f"Character error rate: {error_stats['other_error_rate']:.4f}")

        except Exception as e:
            print(f"Processing fileerror: {str(e)}")
            # df exists only if a load succeeded in this call (possibly for an earlier folder).
            if 'df' in locals():
                print(f"File column names: {df.columns.tolist()}")

    return results

# NOTE(review): this loop reads `llama_folders` and `valid_icd_codes`, but this
# cell only assigns them further down, so it runs on whatever the kernel
# already holds. It is textually identical to the loop at the end of the cell
# -- confirm it is a leftover and remove it.
for folder in llama_folders:
    # Run label: last component of the folder path.
    model_size = folder.split('/')[-1]
    print(f"\nProcessing Llama {model_size} model")
    results = analyze_llama_folder(folder, valid_icd_codes)
    
    # Tabulate the per-folder statistics and save them as CSV.
    results_df = pd.DataFrame.from_dict(results, orient='index')
    output_file = f'char_errors_llama_{model_size}.csv'
    results_df.round(4).to_csv(output_file)
    
    print(f"\n=== Llama {model_size} 字符错误结果 ===")
    print(results_df.round(4).to_string())

# Collect the ICD codes that appear as labels in the prediction files; they serve as the set of valid codes.
def collect_valid_codes(folders):
    """
    Gather every label code found in the prediction files under the given folders.

    For each immediate sub-directory of each folder, the file
    ``generated_predictions.jsonl`` (if present) is loaded and the codes parsed
    from its 'label' column are added to the result. Errors while reading one
    file are printed and the scan continues.

    Parameters:
    folders (iterable of str): Root directories to scan.

    Returns:
    set: All codes seen in the 'label' columns.
    """
    seen_codes = set()
    for root in folders:
        for entry in os.listdir(root):
            run_dir = os.path.join(root, entry)
            if not os.path.isdir(run_dir):
                continue
            jsonl_path = os.path.join(run_dir, "generated_predictions.jsonl")
            if not os.path.exists(jsonl_path):
                continue
            try:
                for _, row in load_jsonl(jsonl_path).iterrows():
                    seen_codes.update(convert_to_array(row['label']))
            except Exception as e:
                print(f"Processing fileerror: {str(e)}")
    return seen_codes

llama_folders = [
    "/Users/houzhen/research/LLMCoder/code/llama_auto/1b/train",
    "/Users/houzhen/research/LLMCoder/code/llama_auto/3b/train",
    "/Users/houzhen/research/LLMCoder/code/llama_auto/8b/train"
]

# Build the set of valid ICD codes from the label columns of all runs.
print("Collecting valid ICD codes...")
valid_icd_codes = collect_valid_codes(llama_folders)
print(f"Collected {len(valid_icd_codes)} valid ICD codes")

for folder in llama_folders:
    # Every path here ends in ".../<size>/train", so the last component alone
    # ("train") is identical for all three models and each iteration would
    # overwrite the previous CSV. Use the last two components ("1b_train", ...)
    # to keep the outputs distinct.
    model_size = '_'.join(folder.split('/')[-2:])
    print(f"\nProcessing Llama {model_size} model")
    results = analyze_llama_folder(folder, valid_icd_codes)

    # Tabulate the per-folder statistics and save them as CSV.
    results_df = pd.DataFrame.from_dict(results, orient='index')
    output_file = f'char_errors_llama_{model_size}.csv'
    results_df.round(4).to_csv(output_file)

    print(f"\n=== Llama {model_size} 字符错误结果 ===")
    print(results_df.round(4).to_string())

In [None]:
import pandas as pd
import os
import json
import re

def load_jsonl(file_path):
    """
    Load a JSON Lines file into a pandas DataFrame.

    Parameters:
    file_path (str): Path to a UTF-8 text file holding one JSON object per line.

    Returns:
    pandas.DataFrame: One row per parsed line, in file order.

    Raises:
    json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    records = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            # Blank lines (e.g. an extra newline at end of file) hold no
            # record; json.loads('') would raise, so skip them.
            if not line:
                continue
            records.append(json.loads(line))
    return pd.DataFrame(records)

def extract_icd_codes(text):
    """
    Pull every ICD-like code out of free text.

    A code is an uppercase letter, two digits, an optional dot, any further
    digits and an optional trailing uppercase letter. Dots are removed from
    the returned codes.

    Parameters:
    text: Value to scan; it is converted with str() before matching.

    Returns:
    list: Matched codes in order of appearance, or [] when text is missing.
    """
    if pd.isna(text):
        return []
    icd_pattern = re.compile(r'[A-Z]\d{2}\.?\d*[A-Z]?')
    return [found.group(0).replace('.', '') for found in icd_pattern.finditer(str(text))]

def get_code_count(text):
    """
    Count how many codes a label/prediction value contains.

    Parameters:
    text: A string that is either a bracketed list ("['A01', 'B02']"),
          a single code, or free text; anything else counts as 0.

    Returns:
    int: Number of non-empty comma-separated items for a bracketed list,
         1 for a lone code, otherwise the number of codes found by
         extract_icd_codes.
    """
    if pd.isna(text) or not isinstance(text, str):
        return 0

    if text.startswith('['):
        # Bracketed list: drop brackets and quotes, count non-empty items.
        inner = text.strip('[]').replace("'", "").replace('"', '')
        return sum(1 for piece in inner.split(',') if piece.strip())

    if re.match(r'^[A-Z]\d{2}\.?\d*[A-Z]?$', text.strip()):
        return 1

    # Free text: fall back to pattern extraction.
    return len(extract_icd_codes(text))

def check_counts_llama(path, max_examples=3):
    """
    Measure how often the number of predicted codes differs from the number of label codes.

    Each immediate sub-directory of ``path`` that contains
    ``generated_predictions.jsonl`` is scored. For folders whose name contains
    'mimic' the predicted codes are extracted from free text; otherwise the
    prediction is counted with get_code_count.

    Parameters:
    path (str): Directory whose sub-directories are scanned.
    max_examples (int): How many mismatching rows to print per folder. The
        example-printing lines in the original were indented under a condition
        that was missing; the default of 3 is a chosen value.

    Returns:
    dict: Maps sub-directory name to a dict with 'total_samples',
          'wrong_count' and 'error_rate'.
    """
    results = {}

    for folder_name in os.listdir(path):
        folder_path = os.path.join(path, folder_name)
        if not os.path.isdir(folder_path):
            continue
        prediction_file = os.path.join(folder_path, "generated_predictions.jsonl")
        if not os.path.exists(prediction_file):
            continue

        df = load_jsonl(prediction_file)
        total = len(df)

        is_mimic = 'mimic' in folder_name.lower()

        wrong_count = 0
        for _, row in df.iterrows():
            label_count = get_code_count(row['label'])

            if is_mimic:
                pred_count = len(extract_icd_codes(row['predict']))
            else:
                pred_count = get_code_count(row['predict'])

            if label_count != pred_count:
                wrong_count += 1
                # Show only the first few mismatches so the output stays readable.
                if wrong_count <= max_examples:
                    print(f"\nExample - {folder_name}:")
                    print(f"Label ({label_count}): {row['label']}")
                    print(f"Predict ({pred_count}): {row['predict']}")
                    # str() guards against a missing / non-string prediction.
                    if is_mimic or not str(row['predict']).startswith('['):
                        extracted_codes = extract_icd_codes(row['predict'])
                        print(f"Extracted codes: {extracted_codes}")

        error_rate = wrong_count/total if total > 0 else 0
        print(f"{folder_name}: {wrong_count}/{total} = {error_rate:.4f}")

        results[folder_name] = {
            'total_samples': total,
            'wrong_count': wrong_count,
            'error_rate': error_rate
        }

    return results

llama_folders = [
    "/Users/houzhen/research/LLMCoder/code/llama_auto/1b/train",
    "/Users/houzhen/research/LLMCoder/code/llama_auto/3b/train",
    "/Users/houzhen/research/LLMCoder/code/llama_auto/8b/train"
]

all_results = {}
for folder in llama_folders:
    # Every path ends in ".../<size>/train"; the last component alone would be
    # "train" for all three models, so the CSVs and the all_results entries
    # would overwrite each other. Key on the last two components instead.
    model_size = '_'.join(folder.split('/')[-2:])
    print(f"\n=== Llama {model_size} model ===")
    results = check_counts_llama(folder)

    results_df = pd.DataFrame.from_dict(results, orient='index')

    output_file = f'count_errors_llama_{model_size}.csv'
    results_df.round(4).to_csv(output_file)

    print(f"\n=== Llama {model_size} Summary of results ===")
    print(results_df.round(4).to_string())

    all_results[model_size] = results

# Stack the per-model tables, tagging each row with its model key.
all_results_df = pd.concat([pd.DataFrame.from_dict(res, orient='index').assign(model=size)
                          for size, res in all_results.items()])
all_results_df.to_csv('count_errors_llama_all.csv')

print("\n=== 所有model汇总 ===")
for size, res in all_results.items():
    # Mean of the per-folder error rates for this model.
    print(f"\n{size} model平均Error rate: {pd.DataFrame(res).loc['error_rate'].mean():.4f}")

# enhance llama 

In [None]:
import json

input_file_path = '/Users/houzhen/research/LLMCoder/code/llama_auto/enhance/mimc_test/generated_predictions.jsonl'
output_file_path = '/Users/houzhen/research/LLMCoder/code/llama_auto/enhance/mimc_test/modified_predictions.jsonl'

# Rewrite each record so that 'label' and 'predict' are JSON-encoded lists:
# comma-separated strings are split on ', ', real lists are serialised as-is.
modified_data = []

with open(input_file_path, 'r', encoding='utf-8') as file:
    for line in file:
        # Skip blank lines; json.loads would raise on them.
        if not line.strip():
            continue
        data = json.loads(line)

        if 'label' in data:
            if isinstance(data['label'], str):
                labels = data['label'].split(', ')
                # Store the split result, mirroring the 'predict' handling below.
                data['label'] = json.dumps(labels)
            elif isinstance(data['label'], list):
                data['label'] = json.dumps(data['label'])

        if 'predict' in data:
            if isinstance(data['predict'], str):
                predicts = data['predict'].split(', ')
                data['predict'] = json.dumps(predicts)
            elif isinstance(data['predict'], list):
                data['predict'] = json.dumps(data['predict'])

        # Keep the converted record so it is written out below.
        modified_data.append(data)

with open(output_file_path, 'w', encoding='utf-8') as file:
    for item in modified_data:
        file.write(json.dumps(item) + '\n')

print("File successfully saved to:", output_file_path)


In [None]:
import pandas as pd
import os
import json
import re

def load_jsonl(file_path):
    """
    Load a JSON Lines file into a pandas DataFrame.

    Parameters:
    file_path (str): Path to a UTF-8 text file holding one JSON object per line.

    Returns:
    pandas.DataFrame: One row per parsed line, in file order.

    Raises:
    json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    records = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            # Blank lines (e.g. an extra newline at end of file) hold no
            # record; json.loads('') would raise, so skip them.
            if not line:
                continue
            records.append(json.loads(line))
    return pd.DataFrame(records)

def extract_icd_codes(text):
    """
    Pull every ICD-like code out of free text.

    A code is an uppercase letter, two digits, an optional dot and any
    further digits. Dots are removed from the returned codes.

    Parameters:
    text: Value to scan; it is converted with str() before matching.

    Returns:
    list: Matched codes in order of appearance, or [] when text is missing.
    """
    if pd.isna(text):
        return []
    icd_pattern = re.compile(r'[A-Z]\d{2}\.?\d*')
    return [found.group(0).replace('.', '') for found in icd_pattern.finditer(str(text))]

def convert_to_array(text):
    """
    Turn a label/prediction string into a list of code strings.

    Parameters:
    text: Either a bracketed list written as text ("['A01', 'B02']") or a
          single value; missing and non-string values yield [].

    Returns:
    list: Stripped, non-empty items of the bracketed list, or a one-element
          list holding the stripped string.
    """
    if pd.isna(text) or not isinstance(text, str):
        return []

    if not text.startswith('['):
        return [text.strip()]

    # Drop the surrounding brackets and both kinds of quotes before splitting.
    inner = text.strip('[]')
    for quote in ("'", '"'):
        inner = inner.replace(quote, '')
    pieces = (piece.strip() for piece in inner.split(','))
    return [piece for piece in pieces if piece]

def calculate_existence_errors(df, icd_codes, is_multi=False):
    """
    Count predicted codes that do not exist in the reference code collection.

    Parameters:
    df (pandas.DataFrame): Rows with 'label' and 'predict' columns (and an
        optional 'dataset' column used only in multi-label mode).
    icd_codes: Collection of codes treated as valid.
    is_multi (bool): Multi-label mode. Rows whose 'dataset' value contains
        'mimic' have their prediction parsed with extract_icd_codes; all other
        rows are parsed with convert_to_array.

    Returns:
    dict: 'total_samples', 'exact_matches', 'exact_match_rate',
          'remaining_predictions', 'invalid_predictions' and
          'error_rate_on_non_exact'. Rates are 0 when their denominator is 0.
    """
    total_predictions = 0
    invalid_predictions = 0
    exact_matches = 0

    # Membership is tested once per predicted code; a set keeps that cheap
    # when the caller passes a long list of codes.
    valid_codes = icd_codes if isinstance(icd_codes, (set, frozenset)) else set(icd_codes)

    for _, row in df.iterrows():
        # The label set is needed in both modes (the single-label branch
        # previously compared against a name it never assigned).
        ground_truth = set(convert_to_array(row['label']))

        if is_multi and 'mimic' in str(row.get('dataset', '')).lower():
            predicted_codes = set(extract_icd_codes(row['predict']))
        else:
            predicted_codes = set(convert_to_array(row['predict']))

        if ground_truth == predicted_codes:
            exact_matches += 1
            # NOTE(review): only single-label mode skips exact matches; the
            # multi-label mode still counts their codes below, as the original
            # did. The result key names suggest both should skip -- confirm.
            if not is_multi:
                continue

        total_predictions += len(predicted_codes)
        invalid_predictions += sum(1 for code in predicted_codes if code not in valid_codes)

    error_rate = invalid_predictions / total_predictions if total_predictions > 0 else 0

    return {
        'total_samples': len(df),
        'exact_matches': exact_matches,
        'exact_match_rate': exact_matches / len(df) if len(df) > 0 else 0,
        'remaining_predictions': total_predictions,
        'invalid_predictions': invalid_predictions,
        'error_rate_on_non_exact': error_rate
    }

def analyze_files(root_folder, icd_codes):
    """
    Compute existence-error statistics for every run directory under a root folder.

    For each immediate sub-directory, ``generated_predictions.jsonl`` is used
    when present, otherwise ``predict.jsonl``. Failures while processing one
    file are printed and do not stop the scan.

    Parameters:
    root_folder (str): Directory whose sub-directories are scanned.
    icd_codes: Collection of codes treated as valid, passed through to the scorer.

    Returns:
    dict: Maps sub-directory name to the dict returned by
          calculate_existence_errors.
    """
    results = {}

    for folder_name in os.listdir(root_folder):
        folder_path = os.path.join(root_folder, folder_name)
        if not os.path.isdir(folder_path):
            continue

        print(f"\n{'='*50}")
        print(f"Analyzing folder: {folder_name}")
        print(f"{'='*50}")

        # Prefer the generated predictions; fall back to predict.jsonl.
        primary = os.path.join(folder_path, "generated_predictions.jsonl")
        if os.path.exists(primary):
            prediction_file = primary
        else:
            prediction_file = os.path.join(folder_path, "predict.jsonl")

        if not os.path.exists(prediction_file):
            continue

        print(f"\nProcessing file: {prediction_file}")

        try:
            df = load_jsonl(prediction_file)
            print(f"Number of rows: {len(df)}")

            lowered_name = folder_name.lower()
            is_multi = any(tag in lowered_name for tag in ['multi', 'mimic', 'mlti'])

            error_stats = calculate_existence_errors(df, icd_codes, is_multi)
            results[folder_name] = error_stats

            print(f"Total samples: {error_stats['total_samples']}")
            print(f"Exact matches: {error_stats['exact_matches']}")
            print(f"Exact match rate: {error_stats['exact_match_rate']:.4f}")
            print(f"Remaining predictions: {error_stats['remaining_predictions']}")
            print(f"Invalid predictions: {error_stats['invalid_predictions']}")
            print(f"Error rate on non-exact matches: {error_stats['error_rate_on_non_exact']:.4f}")

        except Exception as e:
            print(f"Processing fileerror: {str(e)}")
            # df exists only if a load succeeded in this call (possibly for an earlier folder).
            if 'df' in locals():
                print(f"File column names: {df.columns.tolist()}")

    return results

# ICD code

llama_folders = [
    "/Users/houzhen/research/LLMCoder/code/llama_auto/enhance",
]

for folder in llama_folders:
    # Derive the run label from the folder itself; without this assignment the
    # loop silently reused whatever `model_size` an earlier cell left behind.
    model_size = folder.split('/')[-1]
    print(f"\nProcessing Llama {model_size} trainedmodel")
    # NOTE(review): `icd_codes` is not assigned in this cell; presumably it is
    # the list read from the ICD-10-CM file in the first cell -- confirm.
    results = analyze_files(folder, icd_codes)

    # Tabulate the per-folder statistics and save them as CSV.
    results_df = pd.DataFrame.from_dict(results, orient='index')
    output_file = f'existence_errors_llama_{model_size}_trained.csv'
    results_df.round(4).to_csv(output_file)

    print(f"\n=== Llama {model_size} trained 结果 ===")
    print(results_df.round(4).to_string())

In [None]:
import pandas as pd
import os
import json
import re

def load_jsonl(file_path):
    """
    Load a JSON Lines file into a pandas DataFrame.

    Parameters:
    file_path (str): Path to a UTF-8 text file holding one JSON object per line.

    Returns:
    pandas.DataFrame: One row per parsed line, in file order.

    Raises:
    json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    records = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            # Blank lines (e.g. an extra newline at end of file) hold no
            # record; json.loads('') would raise, so skip them.
            if not line:
                continue
            records.append(json.loads(line))
    return pd.DataFrame(records)

def extract_icd_codes(text):
    """
    Pull every ICD-like code out of free text.

    A code is an uppercase letter, two digits, an optional dot and any
    further digits. Dots are removed from the returned codes.

    Parameters:
    text: Value to scan; it is converted with str() before matching.

    Returns:
    list: Matched codes in order of appearance, or [] when text is missing.
    """
    if pd.isna(text):
        return []
    icd_pattern = re.compile(r'[A-Z]\d{2}\.?\d*')
    return [found.group(0).replace('.', '') for found in icd_pattern.finditer(str(text))]

def convert_to_array(text):
    """
    Turn a label/prediction string into a list of code strings.

    Parameters:
    text: Either a bracketed list written as text ("['A01', 'B02']") or a
          single value; missing and non-string values yield [].

    Returns:
    list: Stripped, non-empty items of the bracketed list, or a one-element
          list holding the stripped string.
    """
    if pd.isna(text) or not isinstance(text, str):
        return []

    if not text.startswith('['):
        return [text.strip()]

    # Drop the surrounding brackets and both kinds of quotes before splitting.
    inner = text.strip('[]')
    for quote in ("'", '"'):
        inner = inner.replace(quote, '')
    pieces = (piece.strip() for piece in inner.split(','))
    return [piece for piece in pieces if piece]

def is_level_error(true_code, pred_code):
    """
    Tell whether a prediction is the right code at the wrong level of detail.

    Parameters:
    true_code (str): Reference code.
    pred_code (str): Predicted code.

    Returns:
    bool: True when the two codes differ and one is contained in the other
          (e.g. 'A01' vs 'A011'); False for empty/missing input, identical
          codes, or unrelated codes.
    """
    if not true_code or not pred_code:
        return False
    # An identical code is a correct prediction, not a hierarchy mistake. The
    # original had a second, unreachable `return False` here whose condition
    # was missing, so identical codes fell through to the containment test.
    if true_code == pred_code:
        return False
    return true_code in pred_code or pred_code in true_code

def calculate_level_errors(df, is_multi=False, is_mimic=False):
    """
    Count hierarchy-level errors among rows that are not exact matches.

    Parameters:
    df (pandas.DataFrame): Rows with 'label' and 'predict' columns.
    is_multi (bool): When True every predicted code of a row is checked
        against all label codes; otherwise only the first label code and the
        first predicted code are compared.
    is_mimic (bool): In multi-label mode, parse predictions with
        extract_icd_codes instead of convert_to_array.

    Returns:
    dict: 'total_samples', 'exact_matches', 'exact_match_rate',
          'total_predictions', 'level_errors' and 'error_rate'.
    """
    sample_count = len(df)
    compared = 0
    hierarchical = 0
    matched = 0

    for _, record in df.iterrows():
        if is_multi:
            gold = set(convert_to_array(record['label']))
            parsed = extract_icd_codes(record['predict']) if is_mimic else convert_to_array(record['predict'])
            guessed = set(parsed)

            if gold == guessed:
                matched += 1
                continue

            # Every predicted code of a non-matching row is evaluated.
            for candidate in guessed:
                compared += 1
                if any(is_level_error(reference, candidate) for reference in gold):
                    hierarchical += 1
        else:
            gold_list = convert_to_array(record['label'])
            guessed_list = convert_to_array(record['predict'])

            if set(gold_list) == set(guessed_list):
                matched += 1
                continue

            # Single-label rows: compare only the first code on each side.
            if gold_list and guessed_list:
                compared += 1
                if is_level_error(gold_list[0], guessed_list[0]):
                    hierarchical += 1

    return {
        'total_samples': sample_count,
        'exact_matches': matched,
        'exact_match_rate': matched / sample_count if sample_count > 0 else 0,
        'total_predictions': compared,
        'level_errors': hierarchical,
        'error_rate': hierarchical / compared if compared > 0 else 0
    }

def analyze_llama_folder(root_folder):
    """
    Compute hierarchy-level error statistics for every run directory under a root folder.

    Each immediate sub-directory of ``root_folder`` that contains a
    ``generated_predictions.jsonl`` file is loaded and scored with
    ``calculate_level_errors``. Failures while processing one file are printed
    and do not stop the scan.

    Parameters:
    root_folder (str): Directory whose sub-directories are scanned.

    Returns:
    dict: Maps sub-directory name to the dict returned by calculate_level_errors.
    """
    results = {}

    for folder_name in os.listdir(root_folder):
        folder_path = os.path.join(root_folder, folder_name)
        if not os.path.isdir(folder_path):
            continue

        print(f"\n{'='*50}")
        print(f"Analyzing folder: {folder_name}")
        print(f"{'='*50}")

        prediction_file = os.path.join(folder_path, "generated_predictions.jsonl")
        if not os.path.exists(prediction_file):
            continue

        print(f"\nProcessing file: {prediction_file}")

        try:
            df = load_jsonl(prediction_file)
            print(f"Number of rows: {len(df)}")

            # Scoring mode is inferred from the folder name.
            lowered_name = folder_name.lower()
            is_multi = any(tag in lowered_name for tag in ['multi', 'mimic', 'mlti'])
            is_mimic = any(tag in lowered_name for tag in ['mimic'])

            error_stats = calculate_level_errors(df, is_multi, is_mimic)
            results[folder_name] = error_stats

            print(f"Total samples: {error_stats['total_samples']}")
            print(f"Exact matches: {error_stats['exact_matches']}")
            print(f"Exact match rate: {error_stats['exact_match_rate']:.4f}")
            print(f"Predictions for hierarchical error evaluation: {error_stats['total_predictions']}")
            print(f"Hierarchical errors: {error_stats['level_errors']}")
            print(f"Hierarchical error rate: {error_stats['error_rate']:.4f}")

        except Exception as e:
            print(f"Processing fileerror: {str(e)}")
            # df exists only if a load succeeded in this call (possibly for an earlier folder).
            if 'df' in locals():
                print(f"File column names: {df.columns.tolist()}")

    return results

# Root folder(s) whose run sub-directories are scored for hierarchy-level errors.
llama_folders = [
    "/Users/houzhen/research/LLMCoder/code/llama_auto/enhance",
]

for folder in llama_folders:
    # Run label: last component of the folder path ("enhance" here).
    model_size = folder.split('/')[-1]
    print(f"\nProcessing Llama {model_size} model")
    results = analyze_llama_folder(folder)
    
    # Tabulate the per-folder statistics and save them as CSV.
    results_df = pd.DataFrame.from_dict(results, orient='index')
    output_file = f'level_errors_llama_{model_size}.csv'
    results_df.round(4).to_csv(output_file)
    
    print(f"\n=== Llama {model_size} 结果 ===")
    print(results_df.round(4).to_string())

In [None]:
import pandas as pd
import os
import json
import re

def load_jsonl(file_path):
    """
    Load a JSON Lines file into a pandas DataFrame.

    Parameters:
    file_path (str): Path to a UTF-8 text file holding one JSON object per line.

    Returns:
    pandas.DataFrame: One row per parsed line, in file order.

    Raises:
    json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    records = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            # Blank lines (e.g. an extra newline at end of file) hold no
            # record; json.loads('') would raise, so skip them.
            if not line:
                continue
            records.append(json.loads(line))
    return pd.DataFrame(records)

def extract_icd_codes(text):
    """
    Pull every ICD-like code out of free text.

    A code is an uppercase letter, two digits, an optional dot and any
    further digits. Dots are removed from the returned codes.

    Parameters:
    text: Value to scan; it is converted with str() before matching.

    Returns:
    list: Matched codes in order of appearance, or [] when text is missing.
    """
    if pd.isna(text):
        return []
    icd_pattern = re.compile(r'[A-Z]\d{2}\.?\d*')
    return [found.group(0).replace('.', '') for found in icd_pattern.finditer(str(text))]

def convert_to_array(text):
    """
    Turn a label/prediction string into a list of code strings.

    Parameters:
    text: Either a bracketed list written as text ("['A01', 'B02']") or a
          single value; missing and non-string values yield [].

    Returns:
    list: Stripped, non-empty items of the bracketed list, or a one-element
          list holding the stripped string.
    """
    if pd.isna(text) or not isinstance(text, str):
        return []

    if not text.startswith('['):
        return [text.strip()]

    # Drop the surrounding brackets and both kinds of quotes before splitting.
    inner = text.strip('[]')
    for quote in ("'", '"'):
        inner = inner.replace(quote, '')
    pieces = (piece.strip() for piece in inner.split(','))
    return [piece for piece in pieces if piece]

def is_character_error(true_code, pred_code, icd_codes):
    """
    Decide whether a predicted code counts as a character-level error.

    Parameters:
    true_code (str): Reference code.
    pred_code (str): Predicted code.
    icd_codes: Collection of codes treated as valid.

    Returns:
    bool: True when the prediction is not a valid code, when either code does
          not start with a letter followed by digits, when the lengths differ,
          or when any aligned character differs; False when the codes are equal.
    """
    if pred_code not in icd_codes:
        return True
    if true_code == pred_code:
        return False

    # Both codes must begin with "<letter><digits>"; evaluate both matches
    # before testing either, as the original did.
    head_pattern = r'([A-Z]\d+)'
    true_head = re.match(head_pattern, true_code)
    pred_head = re.match(head_pattern, pred_code)
    if true_head is None or pred_head is None:
        return True

    if len(list(true_code)) != len(list(pred_code)):
        return True

    # Same length: any differing position is a character error.
    return any(expected != actual for expected, actual in zip(true_code, pred_code))

def is_level_error(true_code, pred_code):
    """
    Tell whether a prediction is the right code at the wrong level of detail.

    Parameters:
    true_code (str): Reference code.
    pred_code (str): Predicted code.

    Returns:
    bool: True when the two codes differ and one is contained in the other
          (e.g. 'A01' vs 'A011'); False for empty/missing input, identical
          codes, or unrelated codes.
    """
    if not true_code or not pred_code:
        return False
    # An identical code is a correct prediction, not a hierarchy mistake. The
    # original had a second, unreachable `return False` here whose condition
    # was missing, so identical codes fell through to the containment test.
    if true_code == pred_code:
        return False
    return true_code in pred_code or pred_code in true_code

def calculate_character_errors(df, icd_codes, is_multi=False):
    """
    Classify wrong predictions as invalid codes, level errors or other character errors.

    Rows whose predicted code set equals the label set are exact matches and
    are not examined further. Each remaining wrong prediction is put in exactly
    one bucket, tested in this order: not in ``icd_codes`` (invalid), related
    to a label code by containment (level error), anything else (other
    character error).

    Parameters:
    df (pandas.DataFrame): Rows with 'label' and 'predict' columns and an
        optional 'dataset' column.
    icd_codes: Collection of codes treated as valid.
    is_multi (bool): When True every predicted code of a row is classified;
        otherwise only the first predicted code is compared with the first
        label code.

    Returns:
    dict: 'total_samples', 'exact_matches', 'exact_match_rate',
          'total_predictions', the three error counts and their rates, and
          'total_error_rate'. Rates are 0 when their denominator is 0.
    """
    total_predictions = 0
    exact_matches = 0
    # These counters were incremented below without ever being initialised.
    level_errors = 0
    invalid_codes = 0
    other_char_errors = 0

    # The working-directory test does not depend on the row; evaluate it once.
    cwd_is_mimic = 'mimic' in os.path.basename(os.getcwd()).lower()

    if is_multi:
        for _, row in df.iterrows():
            true_codes = set(convert_to_array(row['label']))

            if 'mimic' in str(row.get('dataset', '')).lower() or cwd_is_mimic:
                pred_codes = set(extract_icd_codes(row['predict']))
            else:
                pred_codes = set(convert_to_array(row['predict']))

            if true_codes == pred_codes:
                exact_matches += 1
                continue

            for pred_code in pred_codes:
                # Restored guard for the orphaned `continue`: a predicted code
                # that is one of the labels is correct and is not counted.
                if pred_code in true_codes:
                    continue

                total_predictions += 1

                if pred_code not in icd_codes:
                    invalid_codes += 1
                    continue

                if any(is_level_error(true_code, pred_code) for true_code in true_codes):
                    level_errors += 1
                else:
                    other_char_errors += 1
    else:
        for _, row in df.iterrows():
            true_codes = convert_to_array(row['label'])

            if 'mimic' in str(row.get('dataset', '')).lower() or cwd_is_mimic:
                pred_codes = extract_icd_codes(row['predict'])
            else:
                pred_codes = convert_to_array(row['predict'])

            if set(true_codes) == set(pred_codes):
                exact_matches += 1
                continue

            if true_codes and pred_codes:
                true_code = true_codes[0]
                pred_code = pred_codes[0]

                # Restored guard for the orphaned `continue`: identical first
                # codes are not an error.
                if pred_code == true_code:
                    continue

                total_predictions += 1

                if pred_code not in icd_codes:
                    invalid_codes += 1
                elif is_level_error(true_code, pred_code):
                    level_errors += 1
                else:
                    other_char_errors += 1

    total_errors = level_errors + invalid_codes + other_char_errors

    return {
        'total_samples': len(df),
        'exact_matches': exact_matches,
        'exact_match_rate': exact_matches / len(df) if len(df) > 0 else 0,
        'total_predictions': total_predictions,
        'level_errors': level_errors,
        'level_error_rate': level_errors / total_predictions if total_predictions > 0 else 0,
        'invalid_codes': invalid_codes,
        'invalid_code_rate': invalid_codes / total_predictions if total_predictions > 0 else 0,
        'other_char_errors': other_char_errors,
        'other_error_rate': other_char_errors / total_predictions if total_predictions > 0 else 0,
        'total_error_rate': total_errors / total_predictions if total_predictions > 0 else 0
    }

def analyze_llama_folder(root_folder, icd_codes):
    """
    Compute character-error statistics for every run directory under a root folder.

    Each immediate sub-directory of ``root_folder`` that contains a
    ``generated_predictions.jsonl`` file is loaded and scored with
    ``calculate_character_errors``. Failures while processing one file are
    printed and do not stop the scan.

    Parameters:
    root_folder (str): Directory whose sub-directories are scanned.
    icd_codes: Collection of codes treated as valid, passed through to the scorer.

    Returns:
    dict: Maps sub-directory name to a dict with 'total_predictions',
          'other_char_errors' and 'char_error_rate'.
    """
    results = {}

    for folder_name in os.listdir(root_folder):
        folder_path = os.path.join(root_folder, folder_name)
        # Plain files directly under the root are not runs.
        if not os.path.isdir(folder_path):
            continue

        print(f"\n{'='*50}")
        print(f"Analyzing folder: {folder_name}")
        print(f"{'='*50}")

        prediction_file = os.path.join(folder_path, "generated_predictions.jsonl")
        if not os.path.exists(prediction_file):
            continue

        print(f"\nProcessing file: {prediction_file}")

        try:
            df = load_jsonl(prediction_file)
            print(f"Number of rows: {len(df)}")

            # The folder name decides whether rows are scored as multi-label.
            lowered_name = folder_name.lower()
            is_multi = any(tag in lowered_name for tag in ['multi', 'mimic', 'mlti'])

            error_stats = calculate_character_errors(df, icd_codes, is_multi)

            summary = {}
            summary['total_predictions'] = error_stats['total_predictions']
            summary['other_char_errors'] = error_stats['other_char_errors']
            summary['char_error_rate'] = error_stats['other_error_rate']
            results[folder_name] = summary

            print(f"Total predictions: {error_stats['total_predictions']}")
            print(f"Character errors: {error_stats['other_char_errors']}")
            print(f"Character error rate: {error_stats['other_error_rate']:.4f}")

        except Exception as e:
            print(f"Processing fileerror: {str(e)}")
            # df exists only if a load succeeded in this call (possibly for an earlier folder).
            if 'df' in locals():
                print(f"File column names: {df.columns.tolist()}")

    return results

# NOTE(review): this loop reads `llama_folders` and `valid_icd_codes`, but this
# cell only assigns them further down, so it runs on whatever the kernel
# already holds. It is textually identical to the loop at the end of the cell
# -- confirm it is a leftover and remove it.
for folder in llama_folders:
    # Run label: last component of the folder path.
    model_size = folder.split('/')[-1]
    print(f"\nProcessing Llama {model_size} model")
    results = analyze_llama_folder(folder, valid_icd_codes)
    
    # Tabulate the per-folder statistics and save them as CSV.
    results_df = pd.DataFrame.from_dict(results, orient='index')
    output_file = f'char_errors_llama_{model_size}.csv'
    results_df.round(4).to_csv(output_file)
    
    print(f"\n=== Llama {model_size} 字符错误结果 ===")
    print(results_df.round(4).to_string())

# Collect the ICD codes that appear as labels in the prediction files; they serve as the set of valid codes.
def collect_valid_codes(folders):
    """
    Gather every label code found in the prediction files under the given folders.

    For each immediate sub-directory of each folder, the file
    ``generated_predictions.jsonl`` (if present) is loaded and the codes parsed
    from its 'label' column are added to the result. Errors while reading one
    file are printed and the scan continues.

    Parameters:
    folders (iterable of str): Root directories to scan.

    Returns:
    set: All codes seen in the 'label' columns.
    """
    seen_codes = set()
    for root in folders:
        for entry in os.listdir(root):
            run_dir = os.path.join(root, entry)
            if not os.path.isdir(run_dir):
                continue
            jsonl_path = os.path.join(run_dir, "generated_predictions.jsonl")
            if not os.path.exists(jsonl_path):
                continue
            try:
                for _, row in load_jsonl(jsonl_path).iterrows():
                    seen_codes.update(convert_to_array(row['label']))
            except Exception as e:
                print(f"Processing fileerror: {str(e)}")
    return seen_codes

# Root folder(s) whose run sub-directories are scored for character errors.
llama_folders = [
    "/Users/houzhen/research/LLMCoder/code/llama_auto/enhance",
]

# Build the set of valid ICD codes from the label columns of all runs.
print("Collecting valid ICD codes...")
valid_icd_codes = collect_valid_codes(llama_folders)
print(f"Collected {len(valid_icd_codes)} valid ICD codes")

for folder in llama_folders:
    # Run label: last component of the folder path ("enhance" here).
    model_size = folder.split('/')[-1]
    print(f"\nProcessing Llama {model_size} model")
    results = analyze_llama_folder(folder, valid_icd_codes)
    
    # Tabulate the per-folder statistics and save them as CSV.
    results_df = pd.DataFrame.from_dict(results, orient='index')
    output_file = f'char_errors_llama_{model_size}.csv'
    results_df.round(4).to_csv(output_file)
    
    print(f"\n=== Llama {model_size} 字符错误结果 ===")
    print(results_df.round(4).to_string())

In [None]:
import pandas as pd
import os
import json
import re

def load_jsonl(file_path):
    """
    Load a JSON Lines file into a pandas DataFrame.

    Parameters:
    file_path (str): Path to a UTF-8 text file holding one JSON object per line.

    Returns:
    pandas.DataFrame: One row per parsed line, in file order.

    Raises:
    json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    records = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            # Blank lines (e.g. an extra newline at end of file) hold no
            # record; json.loads('') would raise, so skip them.
            if not line:
                continue
            records.append(json.loads(line))
    return pd.DataFrame(records)

def extract_icd_codes(text):
    """
    Pull every ICD-like code out of free text.

    A code is an uppercase letter, two digits, an optional dot, any further
    digits and an optional trailing uppercase letter. Dots are removed from
    the returned codes.

    Parameters:
    text: Value to scan; it is converted with str() before matching.

    Returns:
    list: Matched codes in order of appearance, or [] when text is missing.
    """
    if pd.isna(text):
        return []
    icd_pattern = re.compile(r'[A-Z]\d{2}\.?\d*[A-Z]?')
    return [found.group(0).replace('.', '') for found in icd_pattern.finditer(str(text))]

def get_code_count(text):
    """
    Count how many codes a label/prediction value contains.

    Parameters:
    text: A string that is either a bracketed list ("['A01', 'B02']"),
          a single code, or free text; anything else counts as 0.

    Returns:
    int: Number of non-empty comma-separated items for a bracketed list,
         1 for a lone code, otherwise the number of codes found by
         extract_icd_codes.
    """
    if pd.isna(text) or not isinstance(text, str):
        return 0

    if text.startswith('['):
        # Bracketed list: drop brackets and quotes, count non-empty items.
        inner = text.strip('[]').replace("'", "").replace('"', '')
        return sum(1 for piece in inner.split(',') if piece.strip())

    if re.match(r'^[A-Z]\d{2}\.?\d*[A-Z]?$', text.strip()):
        return 1

    # Free text: fall back to pattern extraction.
    return len(extract_icd_codes(text))

def check_counts_llama(path, max_examples=3):
    """
    Measure how often the number of predicted codes differs from the number of label codes.

    Each immediate sub-directory of ``path`` that contains
    ``generated_predictions.jsonl`` is scored. For folders whose name contains
    'mimic' the predicted codes are extracted from free text; otherwise the
    prediction is counted with get_code_count.

    Parameters:
    path (str): Directory whose sub-directories are scanned.
    max_examples (int): How many mismatching rows to print per folder. The
        example-printing lines in the original were indented under a condition
        that was missing; the default of 3 is a chosen value.

    Returns:
    dict: Maps sub-directory name to a dict with 'total_samples',
          'wrong_count' and 'error_rate'.
    """
    results = {}

    for folder_name in os.listdir(path):
        folder_path = os.path.join(path, folder_name)
        if not os.path.isdir(folder_path):
            continue
        prediction_file = os.path.join(folder_path, "generated_predictions.jsonl")
        if not os.path.exists(prediction_file):
            continue

        df = load_jsonl(prediction_file)
        total = len(df)

        is_mimic = 'mimic' in folder_name.lower()

        wrong_count = 0
        for _, row in df.iterrows():
            label_count = get_code_count(row['label'])

            if is_mimic:
                pred_count = len(extract_icd_codes(row['predict']))
            else:
                pred_count = get_code_count(row['predict'])

            if label_count != pred_count:
                wrong_count += 1
                # Show only the first few mismatches so the output stays readable.
                if wrong_count <= max_examples:
                    print(f"\nExample - {folder_name}:")
                    print(f"Label ({label_count}): {row['label']}")
                    print(f"Predict ({pred_count}): {row['predict']}")
                    # str() guards against a missing / non-string prediction.
                    if is_mimic or not str(row['predict']).startswith('['):
                        extracted_codes = extract_icd_codes(row['predict'])
                        print(f"Extracted codes: {extracted_codes}")

        error_rate = wrong_count/total if total > 0 else 0
        print(f"{folder_name}: {wrong_count}/{total} = {error_rate:.4f}")

        results[folder_name] = {
            'total_samples': total,
            'wrong_count': wrong_count,
            'error_rate': error_rate
        }

    return results

# Root folder(s) whose run sub-directories are checked for code-count mismatches.
llama_folders = [
    "/Users/houzhen/research/LLMCoder/code/llama_auto/enhance",
]

# Per-run results keyed by the run label.
all_results = {}
for folder in llama_folders:
    # Run label: last component of the folder path ("enhance" here).
    model_size = folder.split('/')[-1]
    print(f"\n=== Llama {model_size} model ===")
    results = check_counts_llama(folder)
    
    results_df = pd.DataFrame.from_dict(results, orient='index')
    
    output_file = f'count_errors_llama_{model_size}.csv'
    results_df.round(4).to_csv(output_file)
    
    print(f"\n=== Llama {model_size} Summary of results ===")
    print(results_df.round(4).to_string())
    
    all_results[model_size] = results

# Stack the per-run tables, tagging each row with its run label.
# NOTE(review): 'count_errors_llama_all.csv' is also written by the earlier
# count-error cell, so this overwrites that file -- confirm this is intended.
all_results_df = pd.concat([pd.DataFrame.from_dict(res, orient='index').assign(model=size) 
                          for size, res in all_results.items()])
all_results_df.to_csv('count_errors_llama_all.csv')

print("\n=== 所有model汇总 ===")
for size, res in all_results.items():
    # Mean of the per-folder error rates for this run.
    print(f"\n{size} model平均Error rate: {pd.DataFrame(res).loc['error_rate'].mean():.4f}")

In [None]:
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

# Category names used as the x-axis groups of the chart.
error_types = ['Nonexistent Code Generation', 'Level Errors', 'Character Errors', 'Quantity Errors']

# NOTE(review): the annotation loop below reads `x`, `bar_width`, `target`,
# `pretrain` and `train`, none of which are assigned in this cell -- the data
# definitions appear to be missing and must be restored before this runs.

plt.figure(figsize=(12, 6))

# NOTE(review): no bars are drawn in this cell, so plt.legend() below has no
# labelled artists -- presumably the plt.bar(...) calls are missing too.

plt.xlabel('Error Types', fontsize=14)
plt.ylabel('Total Error Counts', fontsize=14)

plt.title('Total Error Counts by Error Type and Training Stage', fontsize=16)

# Write each series' value at its bar position: target left of the group
# position, pretrain on it, train right of it.
for i in range(len(error_types)):
    plt.text(x[i] - bar_width, target[i], f'{target[i]:,}', ha='center', va='bottom', fontsize=10)
    plt.text(x[i], pretrain[i], f'{pretrain[i]:,}', ha='center', va='bottom', fontsize=10)
    plt.text(x[i] + bar_width, train[i], f'{train[i]:,}', ha='center', va='bottom', fontsize=10)

plt.legend(fontsize=12)

plt.tight_layout()

plt.show()


In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Error counts per error type, in the order of the tick labels below.
data_1b = [33642, 9963, 8415, 20009]
data_3b = [23178, 4715, 7484, 19697]
data_8b = [22450, 4045, 8512, 19949]
data_target = [1000, 1000, 1000, 1000]

# Layout and colours. These names were used below without being assigned in
# this cell; the values here are chosen so that the seven bars of a group fit
# inside one unit of the x axis (7 * 0.12 < 1).
index = np.arange(len(data_target))
bar_width = 0.12
target_color = 'tab:gray'
pre_train_colors = ['#9ecae1', '#4292c6', '#08519c']
train_colors = ['#fdae6b', '#f16913', '#a63603']

fig, ax = plt.subplots(figsize=(12, 6))

ax.bar(index, data_target, bar_width, label='Target Fine-tuning', color=target_color)
ax.bar(index + 1 * bar_width, data_1b, bar_width, label='1B Pre-train', color=pre_train_colors[0])
ax.bar(index + 2 * bar_width, data_3b, bar_width, label='3B Pre-train', color=pre_train_colors[1])
ax.bar(index + 3 * bar_width, data_8b, bar_width, label='8B Pre-train', color=pre_train_colors[2])
# NOTE(review): the "Train" series are plotted as 3x the pre-train counts
# rather than from measured values -- confirm this is not placeholder data.
ax.bar(index + 4 * bar_width, [x * 3 for x in data_1b], bar_width, label='1B Train', color=train_colors[0])
ax.bar(index + 5 * bar_width, [x * 3 for x in data_3b], bar_width, label='3B Train', color=train_colors[1])
ax.bar(index + 6 * bar_width, [x * 3 for x in data_8b], bar_width, label='8B Train', color=train_colors[2])

ax.set_xlabel('Error Type')
# Tick positions must be fixed before labelling them; centre each tick under
# the middle (4th of 7) bar of its group.
ax.set_xticks(index + 3 * bar_width)
ax.set_xticklabels(['Non-existence Errors', 'Hierarchy-Level Errors', 'Quantity Mismatches', 'Character Errors'])

ax.set_ylabel('Total Errors')

ax.yaxis.grid(True, linestyle='--', which='major', color='gray', alpha=0.5)

ax.legend()

plt.show()
