In [1]:
import json
from collections import defaultdict, Counter
from seqeval.scheme import auto_detect
from seqeval.metrics.sequence_labeling import get_entities
from seqeval.scheme import Entities


class StrictConfusionMatrix:
    """Entity-level confusion statistics using seqeval's scheme-aware extraction.

    Entities are handled as ``(sentence_id, entity_type, start, end)`` tuples,
    the shape produced by ``Entity.to_tuple()``. seqeval builds these spans
    with an *exclusive* ``end`` index, so every overlap test in this class
    treats a span as the half-open interval ``[start, end)``.
    """

    def __init__(self, y_true, y_pred):
        """Store the gold and predicted tag sequences.

        Args:
            y_true: Gold tag sequences, passed unchanged to seqeval.
            y_pred: Predicted tag sequences, passed unchanged to seqeval.
        """
        self.y_true = y_true
        self.y_pred = y_pred

    def compute(self):
        """
        Compute confusion matrix, false positives, and false negatives for all entities.

        Returns:
            dict: A dictionary containing:
                - 'confusion_matrix': ``{entity_type: {'TP', 'FP', 'FN'}}``.
                - 'false_negatives': ``{entity_type: Counter}`` of error categories
                  for gold entities that were not predicted exactly.
                - 'false_positives': ``{entity_type: Counter}`` of error categories
                  for predicted entities that are not in the gold data.
        """
        # Extraction must happen first: every other method reads
        # self.true_entities / self.pred_entities.
        self.prepare_entities()

        confusion_matrix = self.compute_confusion_matrix()

        false_negatives = defaultdict(Counter)
        false_positives = defaultdict(Counter)

        for entity_type in self._entity_types():
            per_type_results = (
                (false_negatives, self.compute_false_negatives(entity_type)),
                (false_positives, self.compute_false_positives(entity_type)),
            )
            for merged, partial in per_type_results:
                for t_type, counts in partial.items():
                    # Counter.update adds counts instead of replacing them.
                    merged[t_type].update(counts)

        return {
            'confusion_matrix': confusion_matrix,
            'false_negatives': dict(false_negatives),  # plain dict for output clarity
            'false_positives': dict(false_positives),  # plain dict for output clarity
        }

    def prepare_entities(self):
        """Detect the tagging scheme and extract flattened gold/predicted entities."""
        self.scheme = auto_detect(self.y_true, False)
        entities_true = self.extract_entities(self.y_true)
        entities_pred = self.extract_entities(self.y_pred)
        self.true_entities = self.flatten_strict_entities(entities_true)
        self.pred_entities = self.flatten_strict_entities(entities_pred)

    def extract_entities(self, y_data):
        """Run seqeval's scheme-aware extraction on ``y_data`` with the detected scheme."""
        return Entities(y_data, self.scheme, False)

    @staticmethod
    def flatten_strict_entities(entities):
        """Flatten entities extracted in strict mode into tuples."""
        return [e.to_tuple() for sen in entities.entities for e in sen]

    def _entity_types(self):
        """Return every entity type seen in gold or predictions, sorted for stable output."""
        seen = {ent[1] for ent in self.true_entities}
        seen.update(ent[1] for ent in self.pred_entities)
        return sorted(seen)

    def _split_by_type(self, entity_type):
        """Return ``(gold_set, predicted_set)`` restricted to ``entity_type``."""
        gold = {e for e in self.true_entities if e[1] == entity_type}
        predicted = {e for e in self.pred_entities if e[1] == entity_type}
        return gold, predicted

    def compute_confusion_matrix(self):
        """Compute confusion matrix across all entity types."""
        confusion_matrix = {}
        for entity_type in self._entity_types():
            TP, FP, FN = self.extract_strict_entity_confusion(entity_type)
            confusion_matrix[entity_type] = {'TP': TP, 'FP': FP, 'FN': FN}
        return confusion_matrix

    def extract_strict_entity_confusion(self, entity):
        """Extract TP, FP, and FN counts for a given entity type.

        An entity only counts as a true positive when sentence, type, start
        and end all match exactly.
        """
        gold, predicted = self._split_by_type(entity)
        return len(gold & predicted), len(predicted - gold), len(gold - predicted)

    def compute_false_positives(self, entity_type):
        """Analyze false positives for a specific entity type."""
        gold, predicted = self._split_by_type(entity_type)
        return self.analyze_errors(predicted - gold, self.true_entities, "FP")

    def compute_false_negatives(self, entity_type):
        """Analyze false negatives for a specific entity type."""
        gold, predicted = self._split_by_type(entity_type)
        return self.analyze_errors(gold - predicted, self.pred_entities, "FN")

    def analyze_errors(self, target_entities, comparison_entities, error_type):
        """Categorise each erroneous entity by how it relates to the other side.

        Every target entity is assigned exactly one category, checked in this
        priority order:

        1. ``<other type>``: a comparison entity has the same span but a
           different type.
        2. ``'Boundary'``: a comparison entity of the same type overlaps it.
        3. ``'Entity and Boundary'``: a comparison entity of a different type
           overlaps it.
        4. ``'O'``: nothing in the same sentence overlaps it.

        Args:
            target_entities: Erroneous entities to categorise (FPs or FNs).
            comparison_entities: Entities from the opposite side (gold for FPs,
                predictions for FNs).
            error_type: ``"FP"`` or ``"FN"``; accepted for call-site readability
                and not used in the computation.

        Returns:
            dict: ``{target_type: Counter({category: count})}``.
        """
        counts = defaultdict(Counter)

        # Only entities of the same sentence can overlap, so index by sentence id.
        by_sentence = defaultdict(list)
        for entity in comparison_entities:
            by_sentence[entity[0]].append(entity)

        # Sorting makes the insertion order of the result keys deterministic.
        for target_entity in sorted(set(target_entities)):
            t_sen, t_type, t_start, t_end = target_entity
            candidates = by_sentence.get(t_sen, ())

            # 1. Same span, different type -> pure type confusion.
            confused_with = next(
                (
                    c_type
                    for _, c_type, c_start, c_end in candidates
                    if c_start == t_start and c_end == t_end and c_type != t_type
                ),
                None,
            )
            if confused_with is not None:
                counts[t_type][confused_with] += 1
                continue

            # Half-open interval overlap. Unlike testing whether an endpoint of
            # one span lies inside the other, this also catches a comparison
            # span that fully contains the target, and it does not treat
            # merely adjacent spans (c_start == t_end) as overlapping.
            overlapping_types = {
                c_type
                for _, c_type, c_start, c_end in candidates
                if c_start < t_end and t_start < c_end
            }

            if t_type in overlapping_types:
                counts[t_type]['Boundary'] += 1
            elif overlapping_types:
                counts[t_type]['Entity and Boundary'] += 1
            else:
                counts[t_type]['O'] += 1

        return dict(counts)



from collections import defaultdict, Counter

class EntityConfusion:
    """Entity-level confusion statistics using seqeval's non-strict ``get_entities``.

    Entities are handled as ``(entity_type, start, end)`` tuples as returned by
    ``get_entities``, whose ``end`` index is *inclusive*; overlap tests here
    therefore use closed intervals ``[start, end]``.
    """

    def __init__(self, y_true, y_pred):
        """
        Initialize the EntityConfusion class.

        Args:
            y_true (list): The ground truth tag sequences.
            y_pred (list): The predicted tag sequences.
        """
        self.y_true = y_true
        self.y_pred = y_pred

    def prepare_entities(self):
        """
        Prepare entities for confusion matrix calculation.
        - In non-strict mode, entities are extracted using `get_entities`.
        """
        self.true_entities = get_entities(self.y_true)
        self.pred_entities = get_entities(self.y_pred)

    @staticmethod
    def extract_entity_confusion(entity, true_entities, pred_entities):
        """Return ``(TP, FP, FN)`` counts for one entity type.

        An entity is a true positive only when type, start and end all match.
        """
        gold = {e for e in true_entities if e[0] == entity}
        predicted = {e for e in pred_entities if e[0] == entity}
        return len(gold & predicted), len(predicted - gold), len(gold - predicted)

    def compute(self):
        """
        Compute the confusion matrix, false negatives, and false positives.

        Returns:
            dict: A dictionary containing:
                  - 'confusion_matrix': The confusion matrix for entity recognition.
                  - 'false_negatives': Detailed false negatives.
                  - 'false_positives': Detailed false positives.
        """
        self.prepare_entities()
        return {
            'confusion_matrix': self.compute_confusion_matrix(),
            'false_negatives': self.compute_false_negatives_with_boundary(),
            'false_positives': self.compute_false_positives_with_boundary()
        }

    def compute_confusion_matrix(self):
        """
        Compute a confusion matrix for Named Entity Recognition (NER) predictions.

        Returns:
            dict: A confusion matrix structured as:
                  {entity_type: {'TP': count, 'FP': count, 'FN': count}}
        """
        types = {ent[0] for ent in self.true_entities}
        types.update(ent[0] for ent in self.pred_entities)

        confusion_matrix = {}
        # Sorted so the key order of the result is the same on every run.
        for entity_type in sorted(types):
            TP, FP, FN = self.extract_entity_confusion(
                entity_type, self.true_entities, self.pred_entities
            )
            confusion_matrix[entity_type] = {'TP': TP, 'FP': FP, 'FN': FN}

        return confusion_matrix

    @staticmethod
    def _categorize_errors(target_entities, reference_entities):
        """Assign each non-matching target entity exactly one error category.

        Categories, in priority order:

        1. ``<other type>``: a reference entity has the same span but a
           different type.
        2. ``'Boundary'``: a reference entity of the same type overlaps it.
        3. ``'Entity and Boundary'``: a reference entity of a different type
           overlaps it.
        4. ``'O'``: no reference entity overlaps it.

        Targets whose span and type both match a reference entity are correct
        and are not counted.

        Returns:
            dict: ``{target_type: Counter({category: count})}``.
        """
        counts = defaultdict(Counter)
        # Index both sides by span; dict order follows the order of the input lists.
        reference_by_span = {(r[1], r[2]): r[0] for r in reference_entities}
        target_by_span = {(t[1], t[2]): t[0] for t in target_entities}

        for (t_start, t_end), t_type in target_by_span.items():
            if (t_start, t_end) in reference_by_span:
                matched_type = reference_by_span[(t_start, t_end)]
                if matched_type != t_type:
                    counts[t_type][matched_type] += 1
                # Exact span handled (either correct or a type confusion).
                continue

            same_type_overlap = False
            other_type_overlap = False
            for (r_start, r_end), r_type in reference_by_span.items():
                # Closed-interval overlap; also true when one span fully
                # contains the other, which an endpoint-inside test misses.
                if r_start <= t_end and t_start <= r_end:
                    if r_type == t_type:
                        same_type_overlap = True
                        break  # highest-priority category found
                    other_type_overlap = True

            if same_type_overlap:
                counts[t_type]['Boundary'] += 1
            elif other_type_overlap:
                counts[t_type]['Entity and Boundary'] += 1
            else:
                counts[t_type]['O'] += 1

        return dict(counts)

    def compute_false_negatives_with_boundary(self):
        """
        Compute false negatives with detailed categorization:
        - '<type>': Predicted with the same span but a different type.
        - 'Boundary': Overlapping prediction of the same entity type.
        - 'Entity and Boundary': Overlapping prediction of a different type.
        - 'O': No overlapping prediction at all.

        Returns:
            dict: False negatives categorized by entity type.
        """
        return self._categorize_errors(self.true_entities, self.pred_entities)

    def compute_false_positives_with_boundary(self):
        """
        Compute false positives with detailed categorization:
        - '<type>': Gold entity with the same span but a different type.
        - 'Boundary': Overlapping gold entity of the same entity type.
        - 'Entity and Boundary': Overlapping gold entity of a different type.
        - 'O': No overlapping gold entity at all.

        Returns:
            dict: False positives categorized by entity type.
        """
        return self._categorize_errors(self.pred_entities, self.true_entities)
    

def save_json(file_path, data):
    """Serialize ``data`` as JSON to ``file_path``, replacing any existing file.

    The file is written as UTF-8 with a 4-space indent, and non-ASCII
    characters are stored as-is rather than as ``\\uXXXX`` escapes.
    """
    dump_options = {'ensure_ascii': False, 'indent': 4}
    with open(file_path, mode='w', encoding='utf-8') as out_handle:
        json.dump(data, out_handle, **dump_options)


In [8]:
from pathlib import Path
import pandas as pd
def run_confusion(dataset_name, model_name, experiments_root=None):
    """Recompute and save entity confusion data for one dataset/model experiment.

    Reads ``y_true`` / ``y_pred`` from
    ``<root>/<dataset>_<model>/fine_tuning/evaluation_metrics.json`` and writes,
    under ``<root>/<dataset>_<model>/extractions/results``:

    - ``entity_confusion_data.json``: non-strict confusion (overwritten).
    - ``strict_entity_confusion_data.json``: strict confusion.
    - ``entity_confusion_data-backup.json``: copy of the pre-existing
      ``entity_confusion_data.json``, created only once.

    Args:
        dataset_name: Dataset part of the experiment folder name.
        model_name: Model part of the experiment folder name.
        experiments_root: Folder containing the experiment folders. Defaults to
            the original baseline-experiment location.
    """
    if experiments_root is None:
        experiments_root = '/Users/ay227/Library/CloudStorage/GoogleDrive-ahmed.younes.sam@gmail.com/My Drive/Final Year Experiments/Thesis-Experiments/Experiments/BaseLineExperiment'

    base_folder = Path(experiments_root) / f'{dataset_name}_{model_name}'
    evaluation_metrics_file = base_folder / 'fine_tuning/evaluation_metrics.json'
    # Explicit UTF-8: JSON files are UTF-8 and the platform default may differ.
    with open(evaluation_metrics_file, 'r', encoding='utf-8') as file:
        entity_outputs = json.load(file)
    y_true = entity_outputs['entity_outputs']['y_true']
    y_pred = entity_outputs['entity_outputs']['y_pred']

    results_folder = base_folder / 'extractions/results'
    original_confusion_path = results_folder / 'entity_confusion_data.json'
    scrapped_confusion_path = results_folder / 'entity_confusion_data-backup.json'
    strict_confusion_path = results_folder / 'strict_entity_confusion_data.json'
    print(original_confusion_path)

    # Back up the pre-existing file exactly once, as a byte-for-byte copy.
    # After the first run the original path holds the indented JSON written
    # below, so re-parsing it as JSON Lines on later runs would fail, and
    # re-copying it would replace the real backup with regenerated data.
    if original_confusion_path.exists() and not scrapped_confusion_path.exists():
        scrapped_confusion_path.write_bytes(original_confusion_path.read_bytes())

    strict_confusion = StrictConfusionMatrix(y_true, y_pred)
    confusion = EntityConfusion(y_true, y_pred)
    strict_confusion_matrix = strict_confusion.compute()
    confusion_matrix = confusion.compute()
    print(f'Saving Confusion for {dataset_name}_{model_name}')
    save_json(original_confusion_path, confusion_matrix)
    save_json(strict_confusion_path, strict_confusion_matrix)


In [None]:
# Regenerate the confusion files for the AraBERT v02 / ANERCorp experiment.
dataset_name = 'ANERCorp_CamelLab'
model_name = 'arabertv02'

run_confusion(dataset_name=dataset_name, model_name=model_name)

In [None]:
# Regenerate the confusion files for the BERT / CoNLL-2003 experiment.
dataset_name = 'conll2003'
model_name = 'bert'

run_confusion(dataset_name=dataset_name, model_name=model_name)


/Users/ay227/Library/CloudStorage/GoogleDrive-ahmed.younes.sam@gmail.com/My Drive/Final Year Experiments/Thesis-Experiments/Experiments/BaseLineExperiment/conll2003_bert/extractions/results/entity_confusion_data.json
Saving Confusion for conll2003_bert


In [12]:
import json

# Reload the regenerated non-strict confusion file to inspect what was written.
# NOTE: despite its name, `test_folder` is the path of a JSON file, not a folder.
test_folder = Path('/Users/ay227/Library/CloudStorage/GoogleDrive-ahmed.younes.sam@gmail.com/My Drive/Final Year Experiments/Thesis-Experiments/Experiments/BaseLineExperiment/conll2003_bert/extractions/results/entity_confusion_data.json')
# save_json writes UTF-8 with ensure_ascii=False, so read it back as UTF-8
# instead of relying on the platform default encoding.
with open(test_folder, 'r', encoding='utf-8') as f:
    data = json.load(f)
    


In [None]:
# Display the confusion data loaded from the saved JSON file.
data

# loose

In [22]:
from pathlib import Path
import pandas as pd
# Manual, step-by-step run for the ANERCorp / AraBERT v02 experiment:
# load the gold and predicted tag sequences saved by the fine-tuning step.
file_name = '/Users/ay227/Library/CloudStorage/GoogleDrive-ahmed.younes.sam@gmail.com/My Drive/Final Year Experiments/Thesis-Experiments/Experiments/BaseLineExperiment/ANERCorp_CamelLab_arabertv02/fine_tuning/evaluation_metrics.json'
# Explicit UTF-8 so reading does not depend on the platform default encoding.
with open(file_name, 'r', encoding='utf-8') as file:
    entity_outputs = json.load(file)  # json.load() reads from a file object
y_true = entity_outputs['entity_outputs']['y_true']
y_pred = entity_outputs['entity_outputs']['y_pred']

# Output locations inside the experiment's results folder.
base_folder = Path('/Users/ay227/Library/CloudStorage/GoogleDrive-ahmed.younes.sam@gmail.com/My Drive/Final Year Experiments/Thesis-Experiments/Experiments/BaseLineExperiment/ANERCorp_CamelLab_arabertv02/extractions/results')
original_confusion_path = base_folder / 'entity_confusion_data.json'  # overwritten by the save cell
scrapped_confusion_path = base_folder / 'entity_confusion_data-backup.json'  # copy of the previous file
strict_confusion_path = base_folder / 'strict_entity_confusion_data.json'

In [None]:
# Load the existing confusion file, parsed as JSON Lines (one record per line).
# NOTE(review): the save cell at the end rewrites this file as indented JSON,
# so re-running this cell afterwards will presumably fail to parse — confirm.
df = pd.read_json(original_confusion_path, lines=True)

In [None]:
# Keep a JSON Lines copy of the previous confusion data before it is overwritten.
df.to_json(scrapped_confusion_path, orient='records', lines=True)

In [None]:
# Build both analysers over the same gold / predicted tag sequences.
strict_confusion = StrictConfusionMatrix(y_true=y_true, y_pred=y_pred)
confusion = EntityConfusion(y_true=y_true, y_pred=y_pred)

In [None]:
# Each compute() returns a dict with 'confusion_matrix', 'false_negatives'
# and 'false_positives' entries.
strict_confusion_matrix = strict_confusion.compute()
confusion_matrix = confusion.compute()


In [None]:
# Persist the results; the non-strict output replaces the original confusion file.
save_json(file_path=original_confusion_path, data=confusion_matrix)
save_json(file_path=strict_confusion_path, data=strict_confusion_matrix)
