In [1]:
import getpass
import os

# Ask interactively for the Google AI API key only when the environment does
# not already define one, so the secret never has to be written into the notebook.
if os.environ.get("GOOGLE_API_KEY") is None:
    os.environ["GOOGLE_API_KEY"] = getpass.getpass("Enter your Google AI API key: ")

Enter your Google AI API key:  ········


In [2]:
import pandas as pd
import re
from difflib import SequenceMatcher
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.feature_extraction.text import CountVectorizer
import numpy as np
from openpyxl import load_workbook

# Read the spreadsheets: the ground-truth annotations and the model extraction
# that is evaluated against them.
# NOTE(review): these are absolute, machine-specific paths; adjust them when
# running the notebook elsewhere.
truth_df = pd.read_excel('/home/jupyter/CPE_EVAL/ground_truth.xlsx')
extract_df = pd.read_excel('/home/jupyter/CPE_EVAL/extraction_gemini.xlsx')

# Aliases used by the rest of the notebook (same objects, no copy).
table1 = truth_df
table2 = extract_df

table1_name = "truth"
table2_name = "extraction"

# using col names of table1 for table 2; Attention: make sure order is the same!
# (pandas raises if the two tables do not have the same number of columns.)
table2.columns = table1.columns

# Strip leading/trailing whitespace from the column names on working copies
# BEFORE sorting: a header such as " filename" would otherwise make the
# sort below fail with a KeyError.
table1_sorted = table1.copy()
table1_sorted.columns = table1_sorted.columns.str.strip()
table2_sorted = table2.copy()
table2_sorted.columns = table2_sorted.columns.str.strip()

# Sort both tables by the 'filename' column so that row i of the truth table
# and row i of the extraction table are compared with each other.
# NOTE(review): this assumes both files contain the same set of filenames.
table1_sorted = table1_sorted.sort_values(by="filename").reset_index(drop=True)
table2_sorted = table2_sorted.sort_values(by="filename").reset_index(drop=True)

# Drop 'filename': it is only the alignment key, not an evaluated column.
table1_sorted = table1_sorted.drop('filename', axis=1)
table2_sorted = table2_sorted.drop('filename', axis=1)

# Create an empty DataFrame to store the comparison results
columns = table1_sorted.columns
comparison_results = pd.DataFrame(columns=columns)

# Initialize confusion matrices: one counter per outcome label and column
confusion_matrices = {col: {"Correct": 0, "Incorrect": 0, "False Positive": 0, "False Negative": 0} for col in columns}

# Initialize a DataFrame to store the highlighted differences for table2
table2_highlighted = table2_sorted.copy()

class ColumnProcessor:
    """Base class for the per-column evaluation strategies.

    A concrete processor overrides ``process`` (normalise one raw cell value)
    and ``compare`` (label a truth/extraction pair). Both methods raise
    ``NotImplementedError`` here.
    """

    def process(self, value):
        """Normalise a single cell value; must be overridden by subclasses."""
        message = "Subclasses should implement this!"
        raise NotImplementedError(message)

    def compare(self, val1, val2):
        """Label a (truth, extraction) pair; must be overridden by subclasses."""
        message = "Subclasses should implement this!"
        raise NotImplementedError(message)

    # Highlighting the differences between the truth and the extracted value
    # was an optional feature and is currently disabled:
    # def highlight_differences(self, val1, val2):
    #     raise NotImplementedError("Subclasses should implement this!")

class StringColumnProcessor(ColumnProcessor):
    """Exact-match comparison of text cells after normalisation."""

    def process(self, value):
        """Normalise a cell to a comparable string.

        Missing values and "N/A" (any case, surrounding whitespace ignored, as
        in the other processors of this notebook) become the empty string.
        Everything else is converted to ``str``, stripped of spaces, dots and
        commas, and lower-cased.
        """
        if pd.isna(value) or str(value).strip().upper() == "N/A":
            return ""
        return str(value).replace(" ", "").replace(".", "").replace(",", "").lower()

    def compare(self, val1, val2):
        """Label a processed (truth, extraction) pair.

        Returns "False Positive" when only the truth is empty, "False Negative"
        when only the extraction is empty, "Correct" when both strings are
        identical (including both empty) and "Incorrect" otherwise.
        """
        if val1 == "" and val2 != "":
            return "False Positive"
        if val1 != "" and val2 == "":
            return "False Negative"
        # A similarity ratio of exactly 1 is the same as string equality, so a
        # direct comparison replaces the quadratic SequenceMatcher call.
        return "Correct" if val1 == val2 else "Incorrect"

class StringColumnNoisyProcessor(ColumnProcessor):
    """Fuzzy comparison of text cells that tolerates small wording differences."""

    # Two normalised strings count as a match when their SequenceMatcher ratio
    # is strictly greater than this value.
    SIMILARITY_THRESHOLD = 0.75

    def process(self, value):
        """Normalise a cell to a comparable string.

        Missing values and "N/A" (any case, surrounding whitespace ignored, as
        in the other processors of this notebook) become the empty string.
        Everything else is converted to ``str``, stripped of spaces, dots and
        commas, and lower-cased.
        """
        if pd.isna(value) or str(value).strip().upper() == "N/A":
            return ""
        return str(value).replace(" ", "").replace(".", "").replace(",", "").lower()

    def compare(self, val1, val2):
        """Label a processed (truth, extraction) pair.

        Returns "False Positive" when only the truth is empty, "False Negative"
        when only the extraction is empty, otherwise "Correct" if the similarity
        ratio exceeds ``SIMILARITY_THRESHOLD`` and "Incorrect" if it does not.
        Two empty strings have ratio 1.0 and are therefore "Correct".
        """
        if val1 == "" and val2 != "":
            return "False Positive"
        if val1 != "" and val2 == "":
            return "False Negative"
        # autojunk=False: the default heuristic ignores "popular" characters once
        # the second string has 200+ characters, which can drive the ratio of two
        # nearly identical long texts down to 0.
        similarity = SequenceMatcher(None, val1, val2, autojunk=False).ratio()
        return "Correct" if similarity > self.SIMILARITY_THRESHOLD else "Incorrect"

    def highlight_differences(self, val1, val2):
        """Return ``val2`` with every span that differs from ``val1`` in brackets.

        Equal spans are copied unchanged. Replaced or inserted spans of ``val2``
        are wrapped in ``[...]``. A deletion has no text in ``val2`` and shows up
        as an empty ``[]`` marker.
        """
        matcher = SequenceMatcher(None, val1, val2, autojunk=False)
        pieces = []
        for tag, _i1, _i2, j1, j2 in matcher.get_opcodes():
            segment = val2[j1:j2]
            # Opcode tags are 'equal', 'replace', 'delete' or 'insert'.
            pieces.append(segment if tag == 'equal' else f"[{segment}]")
        return ''.join(pieces)

class ListColumnProcessor(ColumnProcessor):
    """Order-insensitive comparison of comma-separated lists."""

    def process(self, value):
        """Turn a cell into a list of cleaned, lower-cased items.

        Missing values and "N/A" yield an empty list. Otherwise the text is
        split on commas, each piece is trimmed of whitespace and surrounding
        punctuation, and empty pieces are dropped.
        """
        if pd.isna(value) or str(value).strip().upper() == "N/A":
            return []
        trimmed = (
            piece.strip().lower().strip(" .;:()[]{}\"'")
            for piece in str(value).split(',')
        )
        return [piece for piece in trimmed if piece]

    def compare(self, truth_list, comp_list):
        """Label a processed (truth, extraction) pair using set logic.

        "Correct": same items (or both empty). "False Positive": only extra
        items in the extraction. "False Negative": only missing items.
        "Incorrect": items both missing and extra.
        """
        # Empty handling first: nothing expected / nothing extracted.
        if not truth_list:
            return "False Positive" if comp_list else "Correct"
        if not comp_list:
            return "False Negative"

        expected = set(truth_list)
        found = set(comp_list)
        has_missing = bool(expected - found)
        has_extra = bool(found - expected)

        if has_missing and has_extra:
            return "Incorrect"
        if has_extra:
            return "False Positive"
        if has_missing:
            return "False Negative"
        return "Correct"

class BooleanListColumnProcessor(ColumnProcessor):
    """Order-insensitive list comparison that supports "OR" alternatives.

    A cell such as ``"a, b OR c"`` describes two acceptable item lists,
    ``["a", "b"]`` and ``["c"]``.
    """

    def process(self, value):
        """Parse a cell into one item list or a list of alternative item lists.

        Returns ``[]`` for missing values, "N/A" and cells without any item,
        a flat list of lower-cased items when there is a single alternative,
        and a list of such lists when the cell contains several alternatives
        separated by a standalone "OR" (case-insensitive).
        """
        if pd.isna(value) or str(value).strip().upper() == "N/A":
            return []
        alternatives = []
        for part in re.split(r'\bOR\b', str(value), flags=re.IGNORECASE):
            items = [item.strip().lower()
                     for item in part.split(',')
                     if item.strip()]
            # Skip empty alternatives (e.g. a trailing "OR"); keeping them would
            # make an effectively empty cell look non-empty.
            if items:
                alternatives.append(items)
        if not alternatives:
            return []
        # If only one alternative, just return that list
        if len(alternatives) == 1:
            return alternatives[0]
        # Otherwise, return list-of-alternative-lists
        return alternatives

    @staticmethod
    def _as_alternatives(value):
        """Return ``value`` as a list of alternatives (wrap a flat item list)."""
        if value and isinstance(value[0], list):
            return value
        return [value]

    def compare(self, truth_list, comp_list):
        """Label a processed (truth, extraction) pair.

        Either side may be a flat item list or a list of alternatives. The
        result is "Correct" as soon as any truth alternative equals any
        extraction alternative. Otherwise the last "False Positive" /
        "False Negative" seen is returned, or "Incorrect" if there was none.
        """
        # Handle empty-vs-nonempty
        if not truth_list and not comp_list:
            return "Correct"
        if not truth_list:
            return "False Positive"
        if not comp_list:
            return "False Negative"

        last_fp_fn = None
        # Normalise both sides: an extraction containing "OR" is a list of
        # lists as well and cannot be passed to set() directly.
        for truth_alt in self._as_alternatives(truth_list):
            for comp_alt in self._as_alternatives(comp_list):
                result = self._compare_single(truth_alt, comp_alt)
                if result == "Correct":
                    return "Correct"
                if result in ("False Positive", "False Negative"):
                    last_fp_fn = result
        return last_fp_fn if last_fp_fn is not None else "Incorrect"

    def _compare_single(self, truth_items, comp_items):
        """Compare two flat item lists as sets and return the outcome label."""
        truth_set = set(truth_items)
        comp_set = set(comp_items)
        missing = truth_set - comp_set
        extra = comp_set - truth_set

        if not missing and not extra:
            return "Correct"
        if extra and not missing:
            return "False Positive"
        if missing and not extra:
            return "False Negative"
        return "Incorrect"

class KeywordListColumnProcessor(ColumnProcessor):
    """Keyword comparison: every truth keyword must occur in the extraction."""

    def process(self, value):
        """Turn a cell into a list of keywords.

        Missing values and "N/A" yield an empty list. Otherwise the text is
        split on commas and each piece is reduced to its ASCII letters and
        digits in lower case. Pieces that end up empty are dropped.
        """
        if pd.isna(value) or str(value).strip().upper() == "N/A":
            return []
        normalised = (
            re.sub(r'[^A-Za-z0-9]', '', chunk).lower()
            for chunk in str(value).split(",")
        )
        return [keyword for keyword in normalised if keyword]

    def compare(self, truth_keywords, comp_keywords):
        """Label a processed (truth, extraction) pair.

        Empty truth gives "False Positive" for a non-empty extraction and
        "Correct" for an empty one. An empty extraction for a non-empty truth
        gives "False Negative". Otherwise the pair is "Correct" when every
        truth keyword is a substring of at least one extracted keyword, else
        "False Negative". Additional extracted keywords are not penalised.
        """
        if not truth_keywords:
            return "False Positive" if comp_keywords else "Correct"
        if not comp_keywords:
            return "False Negative"

        every_keyword_found = all(
            any(keyword in candidate for candidate in comp_keywords)
            for keyword in truth_keywords
        )
        return "Correct" if every_keyword_found else "False Negative"

class OrderedListColumnProcessor(ListColumnProcessor):
    """List comparison that additionally requires the items to be in order."""

    def compare(self, truth_list, comparison_list):
        """Label a processed (truth, extraction) pair.

        "Correct": identical lists, or both empty. "False Positive": truth is
        empty, or the extraction only has extra items. "False Negative": the
        extraction is empty, or only misses items. "Incorrect": items are both
        missing and extra, or the same items appear in a different order.
        """
        # The inherited process() returns lists, so emptiness is tested by
        # truthiness (this also covers ""); comparing against "" never matched
        # and labelled every empty cell "Incorrect".
        if not truth_list and not comparison_list:
            return "Correct"
        if not truth_list:
            return "False Positive"
        if not comparison_list:
            return "False Negative"

        # Convert the lists to sets to compare their contents
        truth_set = set(truth_list)
        comparison_set = set(comparison_list)

        # Items expected but not extracted / extracted but not expected
        missing_elements = truth_set - comparison_set
        extra_elements = comparison_set - truth_set

        # If both missing and extra elements exist, treat it as Incorrect
        if missing_elements and extra_elements:
            return "Incorrect"
        if extra_elements:
            return "False Positive"
        if missing_elements:
            return "False Negative"

        # Same elements but different order (or different multiplicity)
        if truth_list != comparison_list:
            return "Incorrect"

        # Both lists are exactly the same in content and order
        return "Correct"

class NumericColumnProcessor(ColumnProcessor):
    """Comparison of the first number found in a cell, rounded to one decimal."""

    def process(self, value):
        """Extract the first unsigned number from a cell.

        Returns the number rounded to one decimal place as a string (for
        example "37.0"), or the empty string for missing values, "N/A" and
        cells without any digit.
        """
        if pd.isna(value) or value == "N/A":
            return ""

        # First run of digits with an optional ".digits" fraction
        found = re.search(r"\d+(\.\d+)?", str(value))
        if found is None:
            return ""
        return str(round(float(found.group()), 1))

    def compare(self, val1, val2):
        """Label a processed (truth, extraction) pair.

        Both empty gives "Correct", only truth empty "False Positive", only
        extraction empty "False Negative". Two numbers are "Correct" when
        numerically equal and "Incorrect" otherwise.
        """
        truth_missing = val1 == ""
        extraction_missing = val2 == ""

        if truth_missing and extraction_missing:
            return "Correct"
        if truth_missing:
            return "False Positive"
        if extraction_missing:
            return "False Negative"

        return "Correct" if float(val1) == float(val2) else "Incorrect"

In [3]:
import pandas as pd
from langchain.prompts import ChatPromptTemplate
from langchain.output_parsers import StructuredOutputParser, ResponseSchema
from langchain_core.runnables import RunnableSequence
from langchain_openai import ChatOpenAI
from langchain_google_genai import ChatGoogleGenerativeAI


class LLMColumnProcessor(ColumnProcessor):
    """Semantic comparison of free-text cells judged by a chat model."""

    def __init__(self, model_name="gemini-2.5-flash-preview-04-17"):
        """Build the prompt, the output parser and the prompt-to-model chain.

        Args:
            model_name: model identifier passed to ``ChatGoogleGenerativeAI``.
        """
        # Define the structured output: just a yes/no field
        response_schemas = [
            ResponseSchema(
                name="match",
                description='“yes” if the two values are semantically equivalent, otherwise “no”'
            )
        ]
        self.parser = StructuredOutputParser.from_response_schemas(response_schemas)

        fi = self.parser.get_format_instructions()
        # Turn every single brace into a double brace so that the f-string style
        # prompt template treats them as *literal* braces instead of variables.
        fi_escaped = fi.replace('{', '{{').replace('}', '}}')

        prompt_template = (
            "You are evaluating data extraction.  Given the ground truth value:\n\n"
            "  {truth}\n\n"
            "and the extracted value:\n\n"
            "  {extracted}\n\n"
            "Decide whether they convey the same meaning (even if phrased differently).\n"
            f"{fi_escaped}\n"
        )

        prompt = ChatPromptTemplate.from_template(prompt_template)

        # Google Generative AI chat model; temperature 0 to keep the verdicts as
        # repeatable as possible.
        llm = ChatGoogleGenerativeAI(model=model_name, temperature=0)

        # Chain prompt and chat model
        self.chain = prompt | llm

    def process(self, value):
        """Return the cell as a string, or "" for missing values and "N/A"."""
        if pd.isna(value) or str(value).strip().upper() == "N/A":
            return ""
        return str(value)

    def compare(self, truth_val, extracted_val):
        """Label a processed (truth, extraction) pair.

        Values that are equal ignoring case and surrounding whitespace
        (including two empty values) are "Correct" without calling the model.
        Only truth empty gives "False Positive", only extraction empty
        "False Negative". Every other pair is sent to the model: "Correct" if
        it answers "yes", otherwise "Incorrect".
        """
        if truth_val.strip().lower() == extracted_val.strip().lower():
            return "Correct"
        if not truth_val and extracted_val:
            return "False Positive"
        if truth_val and not extracted_val:
            return "False Negative"

        raw = self.chain.invoke({"truth": truth_val, "extracted": extracted_val})
        # The parser works on the text content of the returned chat message.
        # NOTE(review): a reply that does not follow the format instructions
        # presumably makes parse() raise; confirm whether that should be caught.
        parsed = self.parser.parse(raw.content)
        return "Correct" if parsed["match"].strip().lower() == "yes" else "Incorrect"

In [4]:
# Maps each evaluated column name to the processor instance that normalises and
# compares the values of that column. The evaluation loop looks processors up by
# column name, so every column of the sorted tables needs an entry here.
# NOTE(review): keys such as "first_enrichtment" look misspelled but presumably
# mirror the spreadsheet headers exactly; confirm before renaming.
processors = {
    "country": StringColumnProcessor(),
    "type_of_study": BooleanListColumnProcessor(),
    "sample_type": BooleanListColumnProcessor(),
    "matrix_description": LLMColumnProcessor(),
    "bacterial_species": ListColumnProcessor(),
    "genes": KeywordListColumnProcessor(),
    "protocol_reference": KeywordListColumnProcessor(),
    "isolation_procedure": BooleanListColumnProcessor(),
    "method_validation": LLMColumnProcessor(),
    "isolate_characterization": KeywordListColumnProcessor(),
    "MLST_isolates": ListColumnProcessor(),
    "gene_localization": ListColumnProcessor(),
    "plasmid_type": StringColumnProcessor(),
    "plasmid_size": StringColumnProcessor(),
    "plasmid_transferable": StringColumnProcessor(),
    "LOD_culture_based": StringColumnProcessor(),
    "confirmation_carbapenemase": BooleanListColumnProcessor(), 
    "sample_dilution": StringColumnProcessor(),
    "sample_weight": StringColumnProcessor(),
    "first_enrichtment": KeywordListColumnProcessor(),
    "first_enrichtment_condition": NumericColumnProcessor(),
    "second_enrichment": BooleanListColumnProcessor(),
    "second_enrichtment_selective": KeywordListColumnProcessor(),
    "second_enrichtment_condition": NumericColumnProcessor(),
    "selective_plate": LLMColumnProcessor(),
    "ncb_method": KeywordListColumnProcessor(),
    "which_step": StringColumnNoisyProcessor(),
}

In [5]:
# Process and compare the data.
# Start from fresh accumulators: the counters are incremented below, so reusing
# the objects created in an earlier cell would double count every time this cell
# is executed again.
comparison_results = pd.DataFrame(columns=columns)
confusion_matrices = {
    col: {"Correct": 0, "Incorrect": 0, "False Positive": 0, "False Negative": 0}
    for col in columns
}

# Fail before any comparison runs (some processors call a remote model) when a
# column has no processor configured.
missing_processors = [column for column in columns if column not in processors]
if missing_processors:
    raise KeyError(f"No processor configured for column(s): {missing_processors}")

for column in columns:
    processor = processors[column]
    processed_table1 = table1_sorted[column].apply(processor.process)
    processed_table2 = table2_sorted[column].apply(processor.process)
    # Rows are paired by position; both tables were sorted the same way before.
    comparison_results[column] = [
        processor.compare(val1, val2) for val1, val2 in zip(processed_table1, processed_table2)
    ]
    # Update confusion matrices
    for result in comparison_results[column]:
        confusion_matrices[column][result] += 1

# Combine the analyzed tables with the comparison results in the specified order:
# truth value, extracted value and outcome label side by side for every column.
combined_output = pd.DataFrame()

for column in columns:
    combined_output[f'{column}_{table1_name}'] = table1_sorted[column]
    combined_output[f'{column}_{table2_name}'] = table2_sorted[column]
    combined_output[f'{column}_comparison'] = comparison_results[column]

# Per-column accuracy: share of "Correct" labels among all labelled rows.
for column, matrix in confusion_matrices.items():
    TP = matrix["Correct"]
    FP = matrix["False Positive"]
    FN = matrix["False Negative"]
    N = matrix["Incorrect"]

    accuracy = TP / (TP + FP + FN + N) if (TP + FP + FN + N) > 0 else 0

    # Store only the accuracy in the matrix
    matrix.update({
        "Accuracy": accuracy
    })

# Overall metrics: sum the counts of all columns, then compute the accuracy.
overall_metrics = {"Correct": 0, "Incorrect": 0, "False Positive": 0, "False Negative": 0}

for column, matrix in confusion_matrices.items():
    overall_metrics["Correct"] += matrix["Correct"]
    overall_metrics["Incorrect"] += matrix["Incorrect"]
    overall_metrics["False Positive"] += matrix["False Positive"]
    overall_metrics["False Negative"] += matrix["False Negative"]

TP = overall_metrics["Correct"]
FP = overall_metrics["False Positive"]
FN = overall_metrics["False Negative"]
N = overall_metrics["Incorrect"]

overall_accuracy = TP / (TP + FP + FN + N) if (TP + FP + FN + N) > 0 else 0

# Store the overall metrics as an additional "Overall" row
overall_metrics.update({"Accuracy": overall_accuracy})
confusion_matrices["Overall"] = overall_metrics

# Convert confusion matrices to DataFrame (one row per column plus "Overall")
confusion_matrices_df = pd.DataFrame(confusion_matrices).T

# Save the combined output table and confusion matrices to an Excel file
with pd.ExcelWriter(f'Comparison_Results_CARC_{table1_name}_{table2_name}.xlsx') as writer:
    combined_output.to_excel(writer, sheet_name='Combined Results', index=False)
    confusion_matrices_df.to_excel(writer, sheet_name='Confusion Matrices')

print("Combined Results:")
print(combined_output)

print("\nConfusion Matrices and Accuracy:")
print(confusion_matrices_df)

Combined Results:
  country_truth                                 country_extraction  \
0        France  the Netherlands, Sweden, France, Norway, Denma...   
1       Germany                                            Germany   
2       Germany                                            Germany   
3       Germany                                            Germany   
4       Germany                                            Germany   
5       Germany                                            Germany   
6       Germany                                            Germany   
7       Germany                                            Germany   

  country_comparison                                type_of_study_truth  \
0          Incorrect  method comparison, method validation OR method...   
1            Correct                               sample investigation   
2            Correct         sample investigation, protocol development   
3            Correct                               