In [None]:
import pandas as pd
import numpy as np
import sys
import os
import json
import datetime
import time

In [None]:
# General Utilities
def read_excel_file(file_path):
    """
    Opens and reads an input Excel document into a DataFrame.

    :param file_path: Path to the input Excel file (str or os.PathLike)
    :return: DataFrame containing the contents of the Excel file
    :raises FileNotFoundError: if file_path does not exist
    :raises ValueError: if the extension is not .xls/.xlsx (checked
        case-insensitively) or if pandas fails to read the file
    """
    # Normalise to a plain string so pathlib.Path inputs work as well
    path_str = os.fspath(file_path)

    # check if file exists
    if not os.path.exists(path_str):
        raise FileNotFoundError(f"Input file {file_path} does not exist")

    # check if file is an Excel file; compare case-insensitively so that
    # names such as "SCORES.XLSX" are not rejected
    if not path_str.lower().endswith(('.xls', '.xlsx')):
        raise ValueError(f"Input file {file_path} is not an Excel file")

    # Re-raise any read failure as ValueError with a clearer message, keeping
    # the original exception chained for debugging
    try:
        return pd.read_excel(path_str)
    except Exception as e:
        raise ValueError(f"Error reading Excel file {file_path}: {e}") from e

def gen_outputfilename(name_type):
    """
    Builds an output filename of the form "<name_type>_<YYYYmmdd_HHMMSS>.json"
    using the current local time.

    :param name_type: Prefix describing the output (e.g., 'ground_truth_scores')
    :return: Timestamped JSON filename as a string
    """
    now = datetime.datetime.now()
    return "{}_{:%Y%m%d_%H%M%S}.json".format(name_type, now)

def write_json_file(json_data, output_filename):
    """
    Serialises json_data to output_filename as UTF-8, 2-space-indented JSON.

    :param json_data: JSON-serialisable data (typically a dictionary); None means "nothing to write"
    :param output_filename: Path to the output JSON file
    :return: output_filename on success; None if json_data is None or an IOError occurred
    """
    # Guard clause: nothing to do without data
    if json_data is None:
        print("No data to write to JSON file.")
        return None

    try:
        with open(output_filename, 'w', encoding='utf-8') as handle:
            # ensure_ascii=False keeps non-ASCII characters readable in the file
            json.dump(json_data, handle, indent=2, ensure_ascii=False)
        print(f"JSON data written to {output_filename}")
        return output_filename
    except IOError as err:
        # Failure is reported, not raised; the caller sees None
        print(f"Error writing JSON file: {err}")
        return None

In [None]:
# Disease classification columns expected in the canine thorax ground-truth sheet.
# Order is preserved because it is written into the JSON "disease_columns" field.
disease_cols_canine_thorax = [
    'perihilar_infiltrate',
    'pneumonia',
    'bronchitis',
    'interstitial',
    'diseased_lungs',
    'hypo_plastic_trachea',
    'cardiomegaly',
    'pulmonary_nodules',
    'pleural_effusion',
    'rtm',
    'focal_caudodorsal_lung',
    'focal_perihilar',
    'pulmonary_hypoinflation',
    'right_sided_cardiomegaly',
    'pericardial_effusion',
    'bronchiectasis',
    'pulmonary_vessel_enlargement',
    'left_sided_cardiomegaly',
    'thoracic_lymphadenopathy',
    'esophagitis',
    'vhs_v2',
]

# Disease classification columns expected in the feline thorax ground-truth sheet.
# Same conditions as the canine list except that 'Fe_Alveolar' replaces 'vhs_v2'.
disease_cols_feline_thorax = [
    'pulmonary_nodules',
    'esophagitis',
    'pneumonia',
    'bronchitis',
    'interstitial',
    'diseased_lungs',
    'hypo_plastic_trachea',
    'cardiomegaly',
    'pleural_effusion',
    'perihilar_infiltrate',
    'rtm',
    'focal_caudodorsal_lung',
    'right_sided_cardiomegaly',
    'focal_perihilar',
    'left_sided_cardiomegaly',
    'bronchiectasis',
    'pulmonary_vessel_enlargement',
    'thoracic_lymphadenopathy',
    'pulmonary_hypoinflation',
    'pericardial_effusion',
    'Fe_Alveolar',
]

## Make JSON file of ground truth scores

In [None]:
def extract_cols_excel_file(excel_file_path, required_cols_excel, disease_columns):
    """
    Verifies the structure of the Excel file and extracts the relevant columns.

    :param excel_file_path: Path to the Excel file
    :param required_cols_excel: Non-disease columns that must be present (e.g. ['CaseID', 'Findings'])
    :param disease_columns: Disease classification columns that must be present
    :return: (excel_file_path, extracted DataFrame, disease_columns) on success;
             (None, None, None) if the file is missing or required columns are absent
    """
    try:
        # Read the Excel file
        df = read_excel_file(excel_file_path)
        print(f"Loaded Excel file with {df.shape[0]} rows and {df.shape[1]} columns")

        # Check that all required columns exist
        required_columns = list(required_cols_excel) + list(disease_columns)
        missing_columns = [col for col in required_columns if col not in df.columns]

        if missing_columns:
            print(f"Error: Missing columns: {missing_columns}")
            # Always a 3-tuple so callers can unpack every return path uniformly
            return None, None, None

        # Extract relevant columns and remove rows with no findings
        extracted_df = df[required_columns].copy()
        if 'Findings' in extracted_df.columns:
            # Only filter when 'Findings' was requested; otherwise dropna would raise KeyError
            extracted_df = extracted_df.dropna(subset=['Findings'])

        return excel_file_path, extracted_df, disease_columns

    except FileNotFoundError as e:
        print(f"Error: {e}")
        return None, None, None

def json_setup(excel_file_path, len_extracted_df, disease_columns, model_name=None):
    """
    Builds the top-level JSON skeleton (metadata plus an empty "cases" list).

    :param excel_file_path: Path to the reference Excel or JSON file, stored as "source_file"
    :param len_extracted_df: Number of cases, stored as "total_cases"
    :param disease_columns: List of disease columns
    :param model_name: Name recorded as "model_name"; falls back to "ground truth" when falsy
    :return: JSON structure as a dictionary
    """
    return dict(
        source_file=excel_file_path,
        extraction_date=datetime.datetime.now().isoformat(),
        total_cases=len_extracted_df,
        disease_columns=disease_columns,
        model_name=model_name or "ground truth",
        cases=[],
    )

def json_structure(excel_file_path, extracted_df, disease_columns):
    """
    Converts the extracted DataFrame into the JSON structure, one entry per row.

    :param excel_file_path: Path to the Excel file
    :param extracted_df: DataFrame with 'CaseID', 'Findings' and the disease columns
    :param disease_columns: List of disease columns
    :return: JSON structure as a dictionary
    """
    payload = json_setup(excel_file_path, len(extracted_df), disease_columns)
    if payload is None:
        return None

    def _label(value):
        # Classification cell -> stripped string; missing values become "Unknown"
        return str(value).strip() if pd.notna(value) else "Unknown"

    # One entry per row: identifiers first, then every disease classification
    for _, record in extracted_df.iterrows():
        entry = {
            "case_id": str(record['CaseID']).strip(),
            "findings": str(record['Findings']).strip(),
        }
        entry.update((disease, _label(record[disease])) for disease in disease_columns)
        payload["cases"].append(entry)

    return payload

In [None]:
# used to extract from ground truth scores excel file
def extract_scores_from_excel(disease_columns, excel_file_path, output_filename=None):
    """
    Extracts CaseID, Findings, and the given disease classifications from an
    Excel file and writes them to a JSON file.

    :param disease_columns: List of disease classification columns to extract
    :param excel_file_path: Path to the Excel file
    :param output_filename: Optional custom filename for output JSON
    :return: (extracted DataFrame, output filename). Returns (None, None) if
             extraction fails, and (extracted DataFrame, None) if the data was
             extracted but the JSON file could not be written.
    """

    try:
        excel_file_path, extracted_df, disease_columns = extract_cols_excel_file(
            excel_file_path, ['CaseID', 'Findings'], disease_columns
        )

        # The helper has already printed the reason (missing file or columns)
        if extracted_df is None:
            print("Error processing Excel file: required data could not be extracted")
            return None, None

        json_data = json_structure(excel_file_path, extracted_df, disease_columns)

        # Generate output filename if not provided
        if output_filename is None:
            output_filename = gen_outputfilename("ground_truth_scores")

        # write_json_file returns None when nothing was written; do not report a
        # filename that does not exist on disk
        written_path = write_json_file(json_data, output_filename)
        if written_path is None:
            return extracted_df, None

        return extracted_df, written_path

    except Exception as e:
        print(f"Error processing Excel file: {e}")
        return None, None

## AI Scoring

In [None]:
# prompt for Google API
def read_prompt_template(prompt_filename):
    """
    Reads a prompt template from a .txt file.

    :param prompt_filename: The filename of the prompt template
    :return: The content of the prompt template as a string
    :raises SystemExit: with exit code 1 if the file does not exist
    """
    try:
        # Decode explicitly as UTF-8, matching the other file I/O in this
        # notebook, so the prompt does not depend on the platform default
        with open(prompt_filename, 'r', encoding='utf-8') as file:
            return file.read()
    except FileNotFoundError:
        print(f"Prompt template file '{prompt_filename}' not found.")
        sys.exit(1)

def create_prompt(prompt_template, findings):
    """
    Assembles the prompt text: the template, then a "Radiology Findings:" line.

    :param prompt_template: The prompt template as a string
    :param findings: The radiology findings to append to the template
    :return: The formatted prompt string, ending with a newline
    """
    return "{}\nRadiology Findings: {}\n".format(prompt_template, findings)

In [None]:
import os
from getpass import getpass
from dotenv import load_dotenv

# Load environment variables
load_dotenv()
my_api_key = os.getenv('GOOGLE_API_KEY')

if not my_api_key:
    print("Please set GOOGLE_API_KEY in your .env file")
    # getpass (not input) so the key is not echoed into the saved notebook output
    my_api_key = getpass("Enter your Google API key: ")

In [None]:
import google.generativeai as genai

def _clean_model_response(response_text):
    """Strips Markdown fences/emphasis and any text after the final closing brace."""
    response_text = response_text.strip()

    # Clean up markdown code fences if present
    if response_text.startswith('```json'):
        response_text = response_text.replace('```json', '').replace('```', '').strip()
    elif response_text.startswith('```'):
        response_text = response_text.replace('```', '').strip()

    if 'summary' in response_text or '**' in response_text:
        # Remove '***' before '**'; the other order leaves a stray '*' behind
        response_text = (
            response_text.replace('***', '').replace('**', '').replace('in summary', '').strip()
        )

    # Drop any commentary after the last closing brace of the JSON object
    last_brace = response_text.rfind('}')
    if last_brace != -1:
        response_text = response_text[:last_brace + 1].strip()

    return response_text


def call_google_api(prompt, model='gemini-2.0-flash-lite', api_key=None):
    """
    Calls the Google Generative AI API with the given prompt.

    :param prompt: The prompt to send to the Google Generative AI API
    :param model: The name of the model to use for the API call
    :param api_key: The API key for authentication
    :return: The cleaned response text, or the string "Error: <message>" if
             the API call (or reading its text) raised an exception
    """
    genai.configure(api_key=api_key)
    # Separate local name so the `model` parameter (a model name) is not shadowed
    generative_model = genai.GenerativeModel(model)

    generation_config = genai.types.GenerationConfig(
        temperature=0.2  # adjust creativity (0.0-2.0)
    )

    try:
        response = generative_model.generate_content(prompt, generation_config=generation_config)
        return _clean_model_response(response.text)

    except Exception as e:
        # Errors come back as text, not raised; callers see them as an unparseable response
        print(f"API Error: {e}")
        return f"Error: {e}"

In [None]:
# Extract CaseID and Findings from ground truth json file
def read_from_json(json_file_path, max_cases=None):
    """
    Loads a ground-truth JSON file and returns it together with its case list.

    :param json_file_path: Path to the JSON file; must contain a top-level 'cases' list
    :param max_cases: Optional cap on the number of cases returned (falsy means all)
    :return: (full JSON data, list of cases), or (None, None) if the file is not found
    """
    try:
        with open(json_file_path, 'r', encoding='utf-8') as handle:
            payload = json.load(handle)
    except FileNotFoundError:
        print(f"Error: JSON file {json_file_path} not found.")
        return None, None

    all_cases = payload['cases']
    selected = all_cases[:max_cases] if max_cases else all_cases
    return payload, selected

def call_with_prompt(prompt_template, findings, case_id, api_key):
    """
    Builds the prompt for one case and sends it to the Google Generative AI API.

    :param prompt_template: The prompt template as a string
    :param findings: The radiology findings to include in the prompt
    :param case_id: The CaseID, used only in progress/error messages
    :param api_key: The API key passed through to call_google_api
    :return: The response from the API, or None if an exception was raised
    """
    try:
        full_prompt = create_prompt(prompt_template, findings)

        print(f"📡 Making API call for case {case_id}")
        api_response = call_google_api(full_prompt, api_key=api_key)
        print(f"✅ API response received for case {case_id}")

        # Pause between calls to avoid hitting API rate limits
        time.sleep(2)
        return api_response

    except Exception as err:
        print(f"Error calling API for case {case_id}: {err}")
        return None
    
def parse_response_as_json(response, case_id, disease_cols):
    """
    Parses the API response as JSON and maps it onto the ground-truth case format.

    :param response: The response text from the API (may be None if the call failed)
    :param case_id: The CaseID for the current case
    :param disease_cols: List of disease columns to include in the output
    :return: Dict with "case_id" plus one entry per disease ("Unknown" where
             the response omitted it), or None if the response is not a JSON object
    """
    try:
        ai_classifications = json.loads(response)
    except (json.JSONDecodeError, TypeError):
        # TypeError covers a None / non-string response from a failed API call
        print(f"Error parsing API response for case {case_id}")
        return None

    # Valid JSON that is not an object (list, number, ...) cannot hold per-disease labels
    if not isinstance(ai_classifications, dict):
        print(f"Error parsing API response for case {case_id}")
        return None

    # Create case data in ground truth format
    ai_case_data = {"case_id": case_id}

    # Add AI predictions for each disease, falling back to "Unknown"
    for disease in disease_cols:
        ai_case_data[disease] = ai_classifications.get(disease, "Unknown")

    return ai_case_data

def handle_parse_error(e, response, case_id, disease_cols):
    """
    Reports a failed parse and builds a placeholder case entry.

    :param e: The exception raised during parsing
    :param response: The raw API response, printed for debugging
    :param case_id: The CaseID for the current case
    :param disease_cols: List of disease columns to include in the output
    :return: Dict with the case ID and every disease set to "Unknown"
    """
    print(f"  Warning: Failed to parse AI response for case {case_id}")
    print(f"  Error: {e}")
    print(response)  # raw response, for debugging

    # Placeholder entry: the case ID plus "Unknown" for every disease
    placeholder = {"case_id": case_id}
    placeholder.update(dict.fromkeys(disease_cols, "Unknown"))
    return placeholder

    
def summary(successful_predictions, failed_predictions, output_filename):
    """
    Prints a four-line recap of an AI scoring run.

    :param successful_predictions: Number of successful predictions
    :param failed_predictions: Number of failed predictions
    :param output_filename: Name of the output file where predictions are saved
    """
    report_lines = (
        "\nAI processing complete!",
        f"Successful predictions: {successful_predictions}",
        f"Failed predictions: {failed_predictions}",
        f"AI predictions saved to: {output_filename}",
    )
    for line in report_lines:
        print(line)

In [None]:
def ai_scorings(ground_truth_json_path, api_key, model_type="google", max_cases=None, output_filename=None, prompt_filename="/Users/Emily/radiology-project/Prompts/canine_thorax_prompt.txt"):
    """
    Process cases and create AI predictions in the same format as ground truth scores.

    :param ground_truth_json_path: Path to your ground truth JSON file with cases
    :param api_key: API key for AI service
    :param model_type: "google" or "openai"; used in the output filename and recorded as model_name
    :param max_cases: Optional limit on number of cases to process
    :param output_filename: Optional custom filename for AI predictions
    :param prompt_filename: Path to the prompt template text file
    :return: (AI predictions in ground truth format, output filename), or
             (None, None) if processing fails
    """

    prompt_template = read_prompt_template(prompt_filename)

    print(f"ai_scorings function called at {datetime.datetime.now()}")

    try:
        # Read the ground truth JSON file
        with open(ground_truth_json_path, 'r', encoding='utf-8') as f:
            ground_truth_data = json.load(f)

        cases = ground_truth_data['cases']
        if max_cases:
            cases = cases[:max_cases]
        disease_columns = ground_truth_data['disease_columns']

        # Create AI predictions structure matching ground truth format; record
        # the model type so the file is not labelled "ground truth"
        ai_predictions = json_setup(
            ground_truth_data['source_file'], len(cases), disease_columns, model_name=model_type
        )

        successful_predictions = 0
        failed_predictions = 0

        for i, case in enumerate(cases, 1):
            case_id = case['case_id']
            findings_text = case['findings']

            print(f"[AI_SCORINGS] Processing case {i}/{len(cases)} - CaseID: {case_id}")

            response = call_with_prompt(prompt_template, findings_text, case_id, api_key)

            # The parser may signal failure by returning None or by raising
            # (TypeError for a None response); handle both the same way
            parse_error = None
            try:
                ai_case_data = parse_response_as_json(response, case_id, disease_columns)
            except (json.JSONDecodeError, TypeError) as e:
                ai_case_data = None
                parse_error = e

            if ai_case_data:
                successful_predictions += 1
                print(f" Successfully processed case {case_id}")
            else:
                if parse_error is None:
                    parse_error = ValueError("AI response could not be parsed as a JSON object")
                # Record an all-"Unknown" entry so every case stays in the output
                ai_case_data = handle_parse_error(
                    parse_error, response, case_id, disease_cols=disease_columns
                )
                failed_predictions += 1

            ai_predictions["cases"].append(ai_case_data)

        # Generate output filename if not provided
        if output_filename is None:
            output_filename = gen_outputfilename(f"ai_predictions_{model_type}")

        # Write AI predictions to JSON file
        write_json_file(ai_predictions, output_filename)

        summary(successful_predictions, failed_predictions, output_filename)

        return ai_predictions, output_filename

    except Exception as e:
        print(f"Error processing cases: {e}")
        return None, None

## Confusion Matrix

In [None]:
# Counts number of positive and negative cases for each disease in a JSON file
def count_pos_neg(json_file, disease):
    """
    Counts the number of positive and negative cases for a given disease.

    :param json_file: Path to a JSON file containing case data (ground truth or
        AI scores), or the already-loaded dictionary
    :param disease: Disease column to count positives and negatives for
    :return: Dictionary with counts of positive and negative cases
    :raises ValueError: if disease is not listed in the data's 'disease_columns'
    """
    # Accept already-loaded data as well as a file path
    if isinstance(json_file, dict):
        data = json_file
    else:
        # Files are written as UTF-8 by this notebook, so read them the same way
        with open(json_file, 'r', encoding='utf-8') as f:
            data = json.load(f)

    # make sure the disease is one of the recorded disease columns
    if disease not in data['disease_columns']:
        raise ValueError(f"JSON file does not contain '{disease}' key")

    pos_count = 0
    neg_count = 0

    # Count positives and negatives; any other label (e.g. "Unknown"), a missing
    # key, or a null case entry counts as neither
    for case in data['cases']:
        label = case.get(disease) if isinstance(case, dict) else None
        if label == 'Positive':
            pos_count += 1
        elif label == 'Negative':
            neg_count += 1

    return {'Positive': pos_count, 'Negative': neg_count}
# makes sure that all ai file cases are included in ground truth file
def compare_check_files(ground_truth, ai_scores):
    """
    Checks that the AI scores are consistent with the ground truth: there are
    no more AI cases than ground truth cases, and every AI CaseID also appears
    in the ground truth.

    :param ground_truth: Loaded ground truth data (dict with a 'cases' list)
    :param ai_scores: Loaded AI scores data (dict with a 'cases' list)
    :return: None
    :raises ValueError: if the AI scores have more cases than the ground truth
        or contain a CaseID that the ground truth does not
    """
    gt_cases = ground_truth['cases']
    ai_cases = ai_scores['cases']

    # AI scores may cover a subset of the ground truth, never more
    if len(ai_cases) > len(gt_cases):
        raise ValueError("AI scores should have less than or equal to ground truth cases")

    # every AI CaseID must have a matching ground truth case
    ground_truth_ids = {case['case_id'] for case in gt_cases}
    ai_scores_ids = {case['case_id'] for case in ai_cases}
    unmatched_ids = ai_scores_ids - ground_truth_ids
    if unmatched_ids:
        raise ValueError(
            f"AI scores contain CaseIDs missing from ground truth: {sorted(unmatched_ids, key=str)}"
        )

In [None]:
# return Excel file, input file paths
def _load_json_if_path(source):
    """Returns source unchanged if already loaded, otherwise reads it as a UTF-8 JSON file."""
    if isinstance(source, str):
        with open(source, 'r', encoding='utf-8') as f:
            return json.load(f)
    return source


def create_confusion_matrix(ground_truth, ai_scores, output_path='confusion_matrix.xlsx'):
    """
    Builds a per-disease confusion matrix comparing AI scores with ground truth
    and (optionally) exports it to Excel.

    :param ground_truth: Ground truth data, as a JSON file path or a loaded dict
    :param ai_scores: AI scores data, as a JSON file path or a loaded dict
    :param output_path: Excel file to write; pass None to skip the export
    :return: DataFrame with one row per disease (counts, sensitivity,
             specificity, and ground truth totals)
    """
    # columns from Example Confusion Matrix
    cols = [
        'condition','tp_Positive','fn_Positive','tn_Positive','fp_Positive','Sensitivity','Specificity','Check',
        'Positive Ground Truth','Negative Ground Truth','Ground Truth Check'
    ]

    ground_truth_data = _load_json_if_path(ground_truth)
    ai_scores_data = _load_json_if_path(ai_scores)

    # Get disease columns from the ground truth data
    disease_columns = ground_truth_data['disease_columns']

    # Index ground truth cases by id once; setdefault keeps the first
    # occurrence if an id is duplicated
    gt_cases = [case for case in ground_truth_data['cases'] if isinstance(case, dict)]
    gt_by_id = {}
    for gt_case in gt_cases:
        gt_by_id.setdefault(gt_case['case_id'], gt_case)

    # Pair each AI case with its ground truth case; null entries and AI cases
    # without a ground truth match are skipped
    matched_pairs = []
    for ai_case in ai_scores_data['cases']:
        if not isinstance(ai_case, dict):
            continue
        gt_case = gt_by_id.get(ai_case['case_id'])
        if gt_case is None:
            continue
        matched_pairs.append((ai_case, gt_case))

    rows = []
    for disease in disease_columns:
        # Ground truth totals are taken over ALL ground truth cases
        gt_labels = [case.get(disease) for case in gt_cases]
        gt_positive = gt_labels.count('Positive')
        gt_negative = gt_labels.count('Negative')

        tp = fp = tn = fn = 0
        for ai_case, gt_case in matched_pairs:
            ai_pred = ai_case.get(disease)
            gt_true = gt_case.get(disease)

            # Pairs where either label is not Positive/Negative are not counted
            if ai_pred == 'Positive' and gt_true == 'Positive':
                tp += 1
            elif ai_pred == 'Positive' and gt_true == 'Negative':
                fp += 1
            elif ai_pred == 'Negative' and gt_true == 'Positive':
                fn += 1
            elif ai_pred == 'Negative' and gt_true == 'Negative':
                tn += 1

        # Metrics default to 0 when their denominator is empty
        sensitivity = tp / (tp + fn) if (tp + fn) > 0 else 0
        specificity = tn / (tn + fp) if (tn + fp) > 0 else 0

        rows.append({
            'condition': disease,
            'tp_Positive': tp,
            'fn_Positive': fn,
            'tn_Positive': tn,
            'fp_Positive': fp,
            'Sensitivity': sensitivity,
            'Specificity': specificity,
            'Check': tp + fn + tn + fp,
            'Positive Ground Truth': gt_positive,
            'Negative Ground Truth': gt_negative,
            'Ground Truth Check': gt_positive + gt_negative
        })

    # Build the frame once from all rows instead of concatenating inside the loop
    df = pd.DataFrame(rows, columns=cols)

    # Export to Excel
    if output_path is not None:
        df.to_excel(output_path, index=False)
        print(f"Confusion matrix created and saved as '{output_path}'")

    return df

# MAIN

In [None]:
# Process your Excel file with disease classifications
# TODO: point this at the ground-truth Excel workbook before running the notebook.
# The value below is a placeholder; the original cell left the assignment
# empty, which is a syntax error.
excel_file_path = "ground_truth_scores.xlsx"
complete_data, complete_json_filename = extract_scores_from_excel(disease_cols_feline_thorax, excel_file_path)

In [None]:
# Score every ground-truth case; max_cases and output_filename stay at their
# defaults (all cases, auto-generated filename).
# NOTE(review): prompt_filename is left at ai_scorings' default (a canine thorax
# prompt) while the ground truth above uses the feline column list - confirm.
ai_scores, ai_json_filename = ai_scorings(
    complete_json_filename,
    my_api_key,
    model_type="google",  # or "openai"
)

In [None]:
# Compare AI predictions with ground truth; the returned DataFrame is the cell output
create_confusion_matrix(complete_json_filename, ai_json_filename)