# Translation Error Annotation Agreement Analysis

This notebook analyzes inter-annotator agreement on translation error annotations following the [MQM (Multidimensional Quality Metrics)](https://themqm.org/error-types-2/typology/) typology.

This notebook produces:
- Visualizations of annotator agreement on the text
- Exact matching for span boundaries
- F1 for partial credit on spans
- Kappa for category agreement, subcategory agreement, and severity levels
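
As a rough illustration of the two span-level metrics listed above, the sketch below compares two annotators' spans. It assumes spans are `(start, end)` character offsets with an exclusive `end`, treats exact matching as the share of distinct spans marked identically by both annotators, and computes F1 at the character level. The function names and sample offsets are hypothetical, and other definitions of both metrics are possible.

```python
def exact_match_rate(spans_a, spans_b):
    """Share of distinct (start, end) spans that both annotators marked identically."""
    set_a, set_b = set(spans_a), set(spans_b)
    union = set_a | set_b
    if not union:
        return 1.0  # neither annotator marked anything
    return len(set_a & set_b) / len(union)


def char_f1(spans_a, spans_b):
    """Character-level F1, giving partial credit when spans overlap without matching."""
    chars_a = {i for start, end in spans_a for i in range(start, end)}
    chars_b = {i for start, end in spans_b for i in range(start, end)}
    if not chars_a and not chars_b:
        return 1.0
    overlap = len(chars_a & chars_b)
    if overlap == 0:
        return 0.0
    precision = overlap / len(chars_b)  # annotator A treated as the reference
    recall = overlap / len(chars_a)
    return 2 * precision * recall / (precision + recall)


# Hypothetical character offsets from two annotators
annotator_a = [(0, 10), (25, 40)]
annotator_b = [(0, 10), (30, 45)]

print(f"Exact match rate: {exact_match_rate(annotator_a, annotator_b):.3f}")
print(f"Character-level F1: {char_f1(annotator_a, annotator_b):.3f}")
```

Because F1 is symmetric in precision and recall, swapping which annotator is treated as the reference does not change the score.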

## 1. SETUP AND IMPORTS

In [None]:
import json
import pandas as pd
import numpy as np
import matplotlib  # base package, needed for the version check below
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import cohen_kappa_score, f1_score
from IPython.display import display, HTML, Markdown
import re
from itertools import combinations
import warnings
warnings.filterwarnings('ignore')

# Print version information with error handling
print(f"pandas version: {pd.__version__}")
print(f"numpy version: {np.__version__}")
try:
    print(f"matplotlib version: {matplotlib.__version__}")
except (ImportError, NameError, AttributeError):
    print("matplotlib not properly imported")
try:
    print(f"seaborn version: {sns.__version__}")
except (ImportError, NameError, AttributeError):
    print("seaborn not properly imported")
try:
    import sklearn
    print(f"scikit-learn version: {sklearn.__version__}")
except (ImportError, NameError, AttributeError):
    print("scikit-learn not properly imported")

# Install required packages if in Colab
try:
    import google.colab
    print("Running in Google Colab. Installing/upgrading required packages...")
    !pip install -q pandas numpy matplotlib seaborn scikit-learn
except ImportError:
    print("Not running in Google Colab")

# Set up plotting style
plt.style.use('ggplot')
sns.set_palette("colorblind")

## 2. DATA LOADING

Below we load the data from Google Drive. The cell uses a try/except block to detect whether the notebook is running in Google Colab and records the result in the `using_colab` variable, which later cells use to choose the data paths.

If the code is running in Colab:
- It tries to import the Google Colab drive module
- Mounts your Google Drive to /content/drive
- Sets using_colab = True
- Prints a success message

If the code is not running in Colab:
- The import will fail with an ImportError
- The except block catches this error
- Sets using_colab = False
- Prints a message indicating you’re using the local file system

This pattern makes the notebook work in both environments.

In [None]:
# Mount Google Drive for Colab
try:
    from google.colab import drive
    drive.mount('/content/drive')
    using_colab = True
    print("Google Drive mounted successfully")
except ImportError:
    using_colab = False
    print("Not running in Google Colab, using local file system")

Below, the code sets up file paths for the translation annotation project and checks whether these paths exist in the file system. Here’s a breakdown of what it does:
1. It imports the os module to work with file paths and directories.
2. It defines a check_path function that:
   - Checks if a given path exists
   - Prints whether the path exists or not
   - Returns a boolean indicating existence
3. It sets up paths to data files based on the environment:
   - If running in Google Colab (using_colab is True), it uses paths in Google Drive
   - Otherwise, it uses local paths
4. It prints the paths to the annotations and content directories.
5. It checks if these directories exist using the check_path function. For each directory that exists, it:
   - Lists all files in the directory
   - Builds a list of paths to all JSON files in the directory
   - Prints how many JSON files were found

In [None]:
import os

# Define the check_path function
def check_path(path):
    exists = os.path.exists(path)
    print(f" - {path}: {'EXISTS' if exists else 'DOES NOT EXIST'}")
    return exists

# Set paths to your data files
if using_colab:
    # Replace 'Translation_Annotations' with the actual folder name in your Google Drive.
    # After mounting, the contents of "My Drive" appear under /content/drive/MyDrive.
    base_path = '/content/drive/MyDrive/Translation_Annotations'
    annotation_dir = f'{base_path}/annotations'
    content_dir = f'{base_path}/content'
else:
    # Local paths
    annotation_dir = './data/annotations'
    content_dir = './data/content'

print(f"Annotation directory: {annotation_dir}")
print(f"Content directory: {content_dir}")

# Check if the directories exist
print("Checking paths:")
annotation_dir_exists = check_path(annotation_dir)
content_dir_exists = check_path(content_dir)

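# Default to empty lists so later cells still run if a directory is missing
annotation_files = []
content_files = []

# List files in the directories if they exist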
if annotation_dir_exists:
    print(f"\nFiles in annotation directory:")
    for file in os.listdir(annotation_dir):
        print(f" - {file}")
    # Collect the paths of all JSON files in the annotation directory
    annotation_files = [os.path.join(annotation_dir, f) for f in os.listdir(annotation_dir) if f.endswith('.json')]
    print(f"\nFound {len(annotation_files)} JSON files in the annotations directory")

if content_dir_exists:
    print(f"\nFiles in content directory:")
    for file in os.listdir(content_dir):
        print(f" - {file}")
    # Collect the paths of all JSON files in the content directory
    content_files = [os.path.join(content_dir, f) for f in os.listdir(content_dir) if f.endswith('.json')]
    print(f"\nFound {len(content_files)} JSON files in the content directory")

## 3. DATA PRE-PROCESSING

In this notebook, we’re processing JSON files exported from Label Studio after texts have been annotated following the MQM translation error typology.

### Step 1 - Document Identification
Each annotation object in the JSON files carries its own unique ID for the text that was annotated. As a result, identical texts annotated by different annotators receive different IDs, so this ID cannot be used to group the annotations that multiple annotators made on the same text.

The code below therefore:
- Creates a way to identify unique texts across JSON files using section headers
- Groups annotation objects by assigning consistent doc_ids to each unique text
- Prints verification output showing which texts were identified and which doc_id each annotation was assigned
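
The grouping idea can be sketched on toy data as follows. This is a simplified illustration and not the notebook's own implementation: the title `SAMPLE REPORT TITLE`, the sample texts, and the helper `section_key` are hypothetical, and the pattern only captures a single upper-case header line.

```python
import re

# Hypothetical main title; the real exports use their own title
TITLE = "SAMPLE REPORT TITLE"
HEADER_RE = re.compile(re.escape(TITLE) + r"\s*\n+\s*([A-Z][A-Z :]+)")


def section_key(text):
    """Return the upper-case section header after the title, or a text prefix as fallback."""
    match = HEADER_RE.search(text)
    return match.group(1).strip() if match else text[:50].strip()


# Two annotators worked on the same section (ids 76 and 91), a third item is a different section
items = [
    {"id": 76, "text": "SAMPLE REPORT TITLE\n\nINTRODUCTION\n\nMangroves store carbon."},
    {"id": 91, "text": "SAMPLE REPORT TITLE\n\nINTRODUCTION\n\nMangroves store carbon."},
    {"id": 77, "text": "SAMPLE REPORT TITLE\n\nPOLICY OPTIONS\n\nSeveral options exist."},
]

section_to_doc_id = {}
for item in items:
    key = section_key(item["text"])
    # setdefault assigns the next integer only the first time a header is seen
    item["doc_id"] = section_to_doc_id.setdefault(key, len(section_to_doc_id) + 1)

print(section_to_doc_id)
print([(item["id"], item["doc_id"]) for item in items])
```

Items 76 and 91 end up with the same `doc_id` even though their original IDs differ, which is the property the agreement metrics rely on.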

In [None]:
from pprint import pprint

def identify_documents(annotation_files):
    """
    Process annotation files to identify unique documents based on section headers.

    Args:
        annotation_files: List of paths to JSON annotation files

    Returns:
        A tuple containing:
        - section_to_doc_id: Dictionary mapping section headers to doc_ids
        - doc_id_to_annotations: Dictionary mapping doc_ids to lists of annotations
    """
    # Dictionary to map section headers to unique doc_ids
    section_to_doc_id = {}

    # Dictionary to store annotations by doc_id
    doc_id_to_annotations = {}

    # Base regex pattern to extract section header (after the main title)
    header_pattern = r"BLUE CARBON, MANGROVES AND PUBLIC POLICY\s*\n+\s*([A-Z][A-Z\s:]+(?:\s+AND\s+[A-Z\s]+)?)"

    # Process each annotation file
    for file_path in annotation_files:
        try:
            print(f"\nReading file: {os.path.basename(file_path)}")
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            print(f"File contains {len(data)} annotation entries")

            # Process each annotation in the file
            for item in data:
                # Extract the document text
                text = item.get('text', '')

                # Extract section header using regex
                match = re.search(header_pattern, text)
                if match:
                    section_header = match.group(1).strip()

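                    # The character class in header_pattern also matches whitespace, so the
                    # capture can run on into the first capital letter of the body text.
                    # Remove a trailing single capital letter that is separated by whitespace.
                    section_header = re.sub(r'\s+[A-Z]\s*$', '', section_header)
                    # Also remove one trailing capital letter that has no preceding whitespace.
                    # Caution: this applies to every header longer than one character, so a
                    # header without a stray letter loses its real final letter. Grouping stays
                    # consistent because the same rule is applied to every file.
                    section_header = re.sub(r'[A-Z]$', '', section_header) if len(section_header) > 1 else section_header
                    # Trim any resulting whitespace
                    section_header = section_header.strip()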

                    # For "A GREAT CARBON STORAGE SYSTEM", add ": MANGROVES" if it exists in the text
                    if section_header == "A GREAT CARBON STORAGE SYSTEM" and ": MANGROVES" in text:
                        section_header = "A GREAT CARBON STORAGE SYSTEM: MANGROVES"
                else:
                    # Fallback to using the first 50 chars if no section header found
                    section_header = text[:50].strip()

                print(f"Found section header: '{section_header}'")

                # Check if we've seen this section before
                if section_header not in section_to_doc_id:
                    # Assign new doc_id
                    doc_id = len(section_to_doc_id) + 1
                    section_to_doc_id[section_header] = doc_id
                    doc_id_to_annotations[doc_id] = []
                    print(f"  → Assigned NEW doc_id: {doc_id}")
                else:
                    # Use existing doc_id
                    doc_id = section_to_doc_id[section_header]
                    print(f"  → Using EXISTING doc_id: {doc_id}")

                # Store annotation with doc_id
                item['doc_id'] = doc_id
                doc_id_to_annotations[doc_id].append(item)

        except Exception as e:
            print(f"Error processing file {file_path}: {e}")

    return section_to_doc_id, doc_id_to_annotations

In [None]:
# Run the document identification using the existing annotation_files variable
section_to_doc_id, doc_id_to_annotations = identify_documents(annotation_files)

# To verify the results more thoroughly, examine a sample from each document:
print("\nFirst few words of each unique document:")
for doc_id, annotations in doc_id_to_annotations.items():
    if annotations:
        text_sample = annotations[0].get('text', '')[:100].replace('\n', ' ')
        print(f"  Doc ID {doc_id}: \"{text_sample}...\"")

### Step 2 - Restructuring the Annotations

Each annotation object within the JSON exports has the following structure:
- `text`: The full text content being annotated (appears to be sections from a scientific article)
- `id`: A unique identifier for each entry (e.g., 76, 77, 78)
- `label`: An array of annotated spans, each with:
  - `start` and `end` positions (character indices)
  - The highlighted `text` segment
  - `labels` categorizing the issue (like "Accuracy", "Terminology", "Style")
- `subcategory`, `severity`, and `comments`: Separate arrays that correspond positionally to the items in the `label` array:
  - `subcategory`: The error subcategory (e.g., "TERM: Wrong term", "STYLE: Unnatural style")
  - `severity`: "Minor" or "Major"
  - `comments`: Explanations and suggestions for improvement for the error identified in the corresponding label object
- `document_issues`: Area containing comments on issues spanning the entire document
- `overall_correspondence` and `overall_readability`: Numerical ratings (1-5 scale) of the quality of the content as a whole
- `correspondence_comments` and `readability_comments`: Feedback on the quality of the text as a whole
- Metadata including `annotator`, `annotation_id`, timestamps (`created_at`, `updated_at`), and `lead_time`
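
For orientation, a single annotation object with this layout might look like the sketch below. Every value is hypothetical. The nesting of the ratings as a list of dictionaries with a `rating` key follows what the document-level preprocessing code in this notebook expects, and the `subcategory`, `severity`, and `comments` entries are assumed to be plain strings.

```python
# Hypothetical values throughout; only the key layout follows the list above
example_annotation = {
    "id": 76,
    "text": "Full text of the annotated section.",
    "label": [
        {"start": 0, "end": 9, "text": "Full text", "labels": ["Terminology"]},
        {"start": 17, "end": 26, "text": "annotated", "labels": ["Style"]},
    ],
    "subcategory": ["TERM: Wrong term", "STYLE: Unnatural style"],
    "severity": ["Minor", "Major"],
    "comments": ["hypothetical comment 1", "hypothetical comment 2"],
    "document_issues": "hypothetical document-level note",
    "overall_correspondence": [{"rating": 4}],
    "overall_readability": [{"rating": 3}],
    "annotator": 1,
    "annotation_id": 101,
    "created_at": "2024-01-01T10:00:00Z",
    "updated_at": "2024-01-01T10:12:30Z",
    "lead_time": 300.0,
}

# The parallel arrays line up by position with the entries in `label`
for span, subcategory, severity in zip(
    example_annotation["label"],
    example_annotation["subcategory"],
    example_annotation["severity"],
):
    marked = example_annotation["text"][span["start"]:span["end"]]
    print(f"{marked!r} -> {span['labels'][0]} / {subcategory} / {severity}")
```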

#### **Pre-processing Error-Specific Data**
The code below combines the contents from the `subcategory`, `severity`, and `comments` arrays with the corresponding `label` array. Combining corresponding content for each span (`label`, `subcategory`, `severity`, `comments`) into a single label object will make it much easier to:
- Calculate exact match metrics - With all data in a single object per span, you can directly compare span boundaries (`start`/`end`) between annotators.
- Calculate F1 for partial credit - The restructured format makes it straightforward to determine overlapping spans between annotators and compute precision/recall for partially matching annotations.
- Calculate Kappa statistics - Having `labels`, `subcategory`, and `severity` in the same object makes it simple to extract these fields and compute inter-annotator agreement using Cohen's Kappa or Fleiss' Kappa for multiple annotators.
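
To make the Kappa use case concrete, the sketch below pairs the `severity` values of two annotators on spans with identical boundaries and computes unweighted Cohen's Kappa by hand. The helper names and sample labels are hypothetical, restricting the comparison to exactly matching spans is only one possible design, and returning 1.0 when chance agreement is already total is a convention chosen for this sketch. `sklearn.metrics.cohen_kappa_score`, imported in the setup cell, computes the same unweighted statistic from two equal-length label lists.

```python
from collections import Counter


def cohen_kappa(ratings_a, ratings_b):
    """Unweighted Cohen's Kappa for two equal-length lists of category labels."""
    if len(ratings_a) != len(ratings_b) or not ratings_a:
        raise ValueError("Both annotators need the same non-zero number of ratings")
    n = len(ratings_a)
    observed = sum(a == b for a, b in zip(ratings_a, ratings_b)) / n
    counts_a, counts_b = Counter(ratings_a), Counter(ratings_b)
    # Chance agreement: sum over categories of the product of marginal proportions
    expected = sum(counts_a[c] * counts_b[c] for c in counts_a) / (n * n)
    if expected == 1.0:
        return 1.0  # convention for this sketch: both used one identical category
    return (observed - expected) / (1 - expected)


def paired_field(labels_a, labels_b, field):
    """Pair field values for spans that both annotators marked with identical boundaries."""
    by_span_b = {(item["start"], item["end"]): item[field] for item in labels_b}
    pairs = [
        (item[field], by_span_b[(item["start"], item["end"])])
        for item in labels_a
        if (item["start"], item["end"]) in by_span_b
    ]
    return [a for a, _ in pairs], [b for _, b in pairs]


# Hypothetical enriched labels; the last span of annotator A has no counterpart
labels_a = [
    {"start": 0, "end": 5, "severity": "Minor"},
    {"start": 10, "end": 15, "severity": "Major"},
    {"start": 20, "end": 25, "severity": "Minor"},
    {"start": 30, "end": 35, "severity": "Major"},
    {"start": 40, "end": 45, "severity": "Minor"},
]
labels_b = [
    {"start": 0, "end": 5, "severity": "Minor"},
    {"start": 10, "end": 15, "severity": "Major"},
    {"start": 20, "end": 25, "severity": "Major"},
    {"start": 30, "end": 35, "severity": "Major"},
]

sev_a, sev_b = paired_field(labels_a, labels_b, "severity")
print(f"Matched spans: {len(sev_a)}")
print(f"Cohen's Kappa on severity: {cohen_kappa(sev_a, sev_b):.2f}")
```

The same two helpers work for the `subcategory` field or for a category taken from `labels` by changing which value is paired.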

In [None]:
def enrich_label_annotations(annotation):
    """
    Combine data from subcategory, severity, and comments arrays into the label array.

    Args:
        annotation: A single annotation object from the JSON data

    Returns:
        The annotation object with enriched label items
    """
    # Get the arrays
    labels = annotation.get('label', [])
    subcategories = annotation.get('subcategory', [])
    severities = annotation.get('severity', [])
    comments = annotation.get('comments', [])

    # Create a new enriched label array
    enriched_labels = []

    # Process each label item
    for i, label_item in enumerate(labels):
        # Create a copy of the label item to enrich
        enriched_item = label_item.copy()

        # Add subcategory if available, otherwise "None"
        enriched_item['subcategory'] = subcategories[i] if i < len(subcategories) else "None"

        # Add severity if available, otherwise "None"
        enriched_item['severity'] = severities[i] if i < len(severities) else "None"

        # Add comments if available, otherwise "None"
        enriched_item['comments'] = comments[i] if i < len(comments) else "None"

        # Add to the enriched labels array
        enriched_labels.append(enriched_item)

    # Store the enriched list under a new key; the original 'label' array is left unchanged
    annotation['enriched_labels'] = enriched_labels

    return annotation

def process_all_annotations(doc_id_to_annotations):
    """
    Process all annotations to enrich the label data.

    Args:
        doc_id_to_annotations: Dictionary mapping doc_ids to lists of annotations

    Returns:
        The updated doc_id_to_annotations dictionary
    """
    # Keep track of statistics for verification
    total_annotations = 0
    total_original_labels = 0
    total_enriched_labels = 0
    empty_annotations = []

    # Process each document's annotations
    for doc_id, annotations in doc_id_to_annotations.items():
        for i, annotation in enumerate(annotations):
            # Count original labels
            original_labels = annotation.get('label', [])
            total_original_labels += len(original_labels)

            # Track annotations with no labels
            if not original_labels:
                annotation_id = annotation.get('id', f"Unknown-{doc_id}-{i}")
                empty_annotations.append({
                    'doc_id': doc_id,
                    'annotation_id': annotation_id,
                    'metadata': {
                        'annotator': annotation.get('annotator', 'Unknown'),
                        'annotation_id': annotation.get('annotation_id', 'Unknown')
                    }
                })

            # Enrich the annotation
            enriched_annotation = enrich_label_annotations(annotation)

            # Count enriched labels
            enriched_labels = enriched_annotation.get('enriched_labels', [])
            total_enriched_labels += len(enriched_labels)

            # Update the annotation in the list
            annotations[i] = enriched_annotation
            total_annotations += 1

    # Print verification statistics
    print(f"\nProcessed {total_annotations} total annotations")
    print(f"Total original label items: {total_original_labels}")
    print(f"Total enriched label items: {total_enriched_labels}")

    # Print information about annotations with no labels
    if empty_annotations:
        print(f"\nFound {len(empty_annotations)} annotations with no label items:")
        for i, empty_ann in enumerate(empty_annotations, 1):
            print(f"  {i}. Doc ID: {empty_ann['doc_id']}, Annotation ID: {empty_ann['annotation_id']}")
    else:
        print("\nAll annotations contained label items - processing complete!")

    return doc_id_to_annotations

# Run the processing on your annotations
doc_id_to_annotations = process_all_annotations(doc_id_to_annotations)

# Function to print enriched label items in a specific order
def print_ordered_label(label_item):
    """Print a label item with keys in a specific order"""
    # Define the desired order
    order = ['start', 'end', 'text', 'labels', 'subcategory', 'severity', 'comments']

    # Print each field in order (if it exists)
    print("{")
    for key in order:
        if key in label_item:
            value = label_item[key]
            # Format string values with quotes
            if isinstance(value, str):
                formatted_value = f"'{value}'"
            else:
                formatted_value = value
            print(f"    '{key}': {formatted_value},")
    print("}")

# Show a sample of the enriched data
print("\nSample of enriched label data:")
for doc_id, annotations in doc_id_to_annotations.items():
    if annotations:
        sample_annotation = annotations[0]
        enriched_labels = sample_annotation.get('enriched_labels', [])

        if enriched_labels:
            print(f"\nDoc ID {doc_id} - First enriched label item:")
            print_ordered_label(enriched_labels[0])
            print(f"Total enriched label items for this annotation: {len(enriched_labels)}")
            break

#### **Pre-processing Document-Level Data**

The code below standardizes and enriches document-level annotation data, including overall ratings, comments, and timing metrics. This preprocessing addresses several challenges:

- **Standardizing Inconsistent Structures**: Document-level ratings (`overall_correspondence`, `overall_readability`) appear in varied formats across annotations. This preprocessing extracts numeric values into a consistent structure.

- **Handling Missing Data**: Fields like `document_issues`, `correspondence_comments`, and `readability_comments` are irregularly present. The preprocessing sets absent fields to `None` and records their names in a `missing_fields` list, which can be fed back to evaluators during training.

- **Calculating Time Metrics**: Two time measurements are extracted:
  - `lead_time`: The active work time tracked by the annotation system
  - `review_time`: The elapsed time between creation and completion (computed from timestamps)
  
- **Organizing by Document**: Annotations are grouped by document ID, enabling document-level analysis and comparisons.

This preprocessing enables several important analyses:
1. **Evaluator Performance Assessment**: Identifying annotators who consistently omit required fields or take substantially less/more time than peers
2. **Inter-Rater Reliability**: Calculating agreement metrics for overall correspondence and readability ratings
3. **Document Difficulty Analysis**: Correlating document characteristics with annotation time and rating consistency
4. **Efficiency Metrics**: Comparing active work time versus total elapsed time to identify workflow bottlenecks

The resulting standardized structure makes it straightforward to create visualizations showing rating distributions, completion rates, and time statistics across documents and annotators.
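
As a small worked example of the two time metrics, the sketch below derives both from one record. The timestamps and `lead_time` value are hypothetical, `lead_time` is assumed to be in seconds, and `review_minutes` is an illustrative helper rather than part of the notebook.

```python
from datetime import datetime


def review_minutes(created_at, updated_at):
    """Elapsed minutes between two ISO 8601 timestamps that end in 'Z'."""
    created = datetime.fromisoformat(created_at.replace("Z", "+00:00"))
    updated = datetime.fromisoformat(updated_at.replace("Z", "+00:00"))
    return round((updated - created).total_seconds() / 60, 2)


# Hypothetical record: 300 seconds of tracked work inside a 12.5-minute window
record = {
    "lead_time": 300.0,
    "created_at": "2024-01-01T10:00:00Z",
    "updated_at": "2024-01-01T10:12:30Z",
}

lead_minutes = round(record["lead_time"] / 60, 2)
elapsed_minutes = review_minutes(record["created_at"], record["updated_at"])
active_share = round(lead_minutes / elapsed_minutes, 2)

print(f"Lead time: {lead_minutes} min")
print(f"Review time: {elapsed_minutes} min")
print(f"Share of elapsed time spent actively working: {active_share}")
```

A low active share points to long pauses between opening and submitting an annotation, which is the kind of workflow bottleneck mentioned above.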

In [None]:
import datetime

def preprocess_document_level_data(doc_id_to_annotations):
    """
    Standardize document-level data across all annotations.

    Args:
        doc_id_to_annotations: Dictionary mapping doc_ids to lists of annotations

    Returns:
        List of standardized document-level data dictionaries
    """
    processed_data = []

    # Process all annotations across all documents
    for doc_id, annotations in doc_id_to_annotations.items():
        for annotation in annotations:
            # Create standardized structure
            lead_time = annotation.get("lead_time")
            doc_data = {
                "annotation_id": annotation.get("annotation_id"),
                "doc_id": doc_id,  # Use the doc_id from the dictionary key
                "annotator": annotation.get("annotator"),
                "lead_time": lead_time,
                # Keep minutes as None when lead_time is absent instead of reporting 0
                "lead_time_minutes": round(lead_time / 60, 2) if lead_time is not None else None,
                "created_at": annotation.get("created_at"),
                "updated_at": annotation.get("updated_at")
            }

            # Calculate review time (difference between created_at and updated_at)
            if annotation.get("created_at") and annotation.get("updated_at"):
                try:
                    created = datetime.datetime.fromisoformat(annotation["created_at"].replace('Z', '+00:00'))
                    updated = datetime.datetime.fromisoformat(annotation["updated_at"].replace('Z', '+00:00'))
                    review_time_seconds = (updated - created).total_seconds()
                    doc_data["review_time"] = review_time_seconds
                    doc_data["review_time_minutes"] = round(review_time_seconds / 60, 2)
                except (ValueError, TypeError):
                    doc_data["review_time"] = None
                    doc_data["review_time_minutes"] = None
            else:
                doc_data["review_time"] = None
                doc_data["review_time_minutes"] = None

            # Extract ratings from nested structures if present
            if "overall_correspondence" in annotation:
                if isinstance(annotation["overall_correspondence"], list) and len(annotation["overall_correspondence"]) > 0:
                    doc_data["correspondence_rating"] = annotation["overall_correspondence"][0].get("rating")
                else:
                    doc_data["correspondence_rating"] = None
            else:
                doc_data["correspondence_rating"] = None

            if "overall_readability" in annotation:
                if isinstance(annotation["overall_readability"], list) and len(annotation["overall_readability"]) > 0:
                    doc_data["readability_rating"] = annotation["overall_readability"][0].get("rating")
                else:
                    doc_data["readability_rating"] = None
            else:
                doc_data["readability_rating"] = None

            # Handle text fields
            doc_data["document_issues"] = annotation.get("document_issues", None)
            doc_data["correspondence_comments"] = annotation.get("correspondence_comments", None)
            doc_data["readability_comments"] = annotation.get("readability_comments", None)

            # Track missing fields for training feedback
            doc_data["missing_fields"] = []
            if doc_data["correspondence_rating"] is None:
                doc_data["missing_fields"].append("correspondence_rating")
            if doc_data["readability_rating"] is None:
                doc_data["missing_fields"].append("readability_rating")
            if doc_data["document_issues"] is None:
                doc_data["missing_fields"].append("document_issues")
            if doc_data["correspondence_comments"] is None:
                doc_data["missing_fields"].append("correspondence_comments")
            if doc_data["readability_comments"] is None:
                doc_data["missing_fields"].append("readability_comments")

            processed_data.append(doc_data)

    return processed_data

def summarize_document_level_data(processed_data):
    """
    Generate summary statistics for document-level data.

    Args:
        processed_data: Output from preprocess_document_level_data

    Returns:
        Dictionary of summary statistics
    """
    total_annotations = len(processed_data)
    total_with_missing_fields = sum(1 for item in processed_data if item["missing_fields"])

    # Get list of unique document IDs
    doc_ids = set(item.get('doc_id') for item in processed_data if item.get('doc_id') is not None)
    total_documents = len(doc_ids)

    # Per-document statistics
    per_document_stats = {}
    for doc_id in doc_ids:
        doc_annotations = [item for item in processed_data if item.get('doc_id') == doc_id]
        corr_ratings = [item.get('correspondence_rating') for item in doc_annotations
                        if item.get('correspondence_rating') is not None]
        read_ratings = [item.get('readability_rating') for item in doc_annotations
                       if item.get('readability_rating') is not None]
        lead_times = [item.get('lead_time') for item in doc_annotations
                     if item.get('lead_time') is not None]

        per_document_stats[doc_id] = {
            'count': len(doc_annotations),
            'correspondence_ratings': corr_ratings,
            'readability_ratings': read_ratings,
            'lead_times': lead_times
        }

    missing_field_counts = {
        "correspondence_rating": sum(1 for item in processed_data if "correspondence_rating" in item["missing_fields"]),
        "readability_rating": sum(1 for item in processed_data if "readability_rating" in item["missing_fields"]),
        "document_issues": sum(1 for item in processed_data if "document_issues" in item["missing_fields"]),
        "correspondence_comments": sum(1 for item in processed_data if "correspondence_comments" in item["missing_fields"]),
        "readability_comments": sum(1 for item in processed_data if "readability_comments" in item["missing_fields"])
    }

    # Lead time statistics (non-None values only)
    lead_times = [item["lead_time"] for item in processed_data if item["lead_time"] is not None]
    lead_time_stats = {
        "count": len(lead_times),
        "min": min(lead_times) if lead_times else None,
        "max": max(lead_times) if lead_times else None,
        "mean": sum(lead_times) / len(lead_times) if lead_times else None
    }

    # Review time statistics (non-None values only)
    review_times = [item["review_time"] for item in processed_data if item["review_time"] is not None]
    review_time_stats = {
        "count": len(review_times),
        "min": min(review_times) if review_times else None,
        "max": max(review_times) if review_times else None,
        "mean": sum(review_times) / len(review_times) if review_times else None
    }

    return {
        "total_annotations": total_annotations,
        "total_documents": total_documents,
        "total_with_missing_fields": total_with_missing_fields,
        "percentage_incomplete": round(100 * total_with_missing_fields / total_annotations, 2) if total_annotations > 0 else 0,
        "missing_field_counts": missing_field_counts,
        "per_document_stats": per_document_stats,
        "lead_time_stats": lead_time_stats,
        "review_time_stats": review_time_stats
    }

def generate_readable_report(processed_data, summary):
    """
    Generate a human-readable report of the preprocessing results.

    Args:
        processed_data: Output from preprocess_document_level_data
        summary: Output from summarize_document_level_data

    Returns:
        String containing a formatted report
    """
    # Start building the report
    report_lines = []

    # Overall summary
    report_lines.append("=" * 80)
    report_lines.append("DOCUMENT-LEVEL DATA PREPROCESSING REPORT")
    report_lines.append("=" * 80)
    report_lines.append("")

    report_lines.append(f"Processed {summary['total_annotations']} total annotations across {summary['total_documents']} documents")

    # Completeness information
    complete_count = summary['total_annotations'] - summary['total_with_missing_fields']
    report_lines.append(f"\nCOMPLETENESS SUMMARY:")
    # Round the difference so floating-point noise does not leak into the report
    report_lines.append(f"- Complete annotations: {complete_count} ({round(100 - summary['percentage_incomplete'], 2)}%)")
    report_lines.append(f"- Incomplete annotations: {summary['total_with_missing_fields']} ({summary['percentage_incomplete']}%)")

    # Details of missing fields
    report_lines.append("\nMISSING FIELDS BREAKDOWN:")
    for field, count in summary['missing_field_counts'].items():
        if count > 0:
            report_lines.append(f"- {field}: missing in {count} annotations ({round(100*count/summary['total_annotations'], 1)}%)")

    # Document breakdown
    report_lines.append("\nDOCUMENT BREAKDOWN:")
    for doc_id, doc_stats in summary['per_document_stats'].items():
        report_lines.append(f"\nDocument ID {doc_id}:")
        report_lines.append(f"- Total annotations: {doc_stats['count']}")

        if doc_stats['correspondence_ratings']:
            avg_corr = sum(doc_stats['correspondence_ratings']) / len(doc_stats['correspondence_ratings'])
            report_lines.append(f"- Average correspondence rating: {avg_corr:.1f}")
        else:
            report_lines.append(f"- Average correspondence rating: No data")

        if doc_stats['readability_ratings']:
            avg_read = sum(doc_stats['readability_ratings']) / len(doc_stats['readability_ratings'])
            report_lines.append(f"- Average readability rating: {avg_read:.1f}")
        else:
            report_lines.append(f"- Average readability rating: No data")

        if doc_stats['lead_times']:
            avg_lead = sum(doc_stats['lead_times']) / len(doc_stats['lead_times']) / 60
            report_lines.append(f"- Average lead time: {avg_lead:.1f} minutes")

    # Time statistics
    report_lines.append("\nOVERALL TIME STATISTICS:")

    # Lead time
    if summary['lead_time_stats']['count'] > 0:
        avg_lead_min = round(summary['lead_time_stats']['mean'] / 60, 2)
        min_lead_min = round(summary['lead_time_stats']['min'] / 60, 2)
        max_lead_min = round(summary['lead_time_stats']['max'] / 60, 2)
        report_lines.append(f"- Lead time (active work time):")
        report_lines.append(f"  * Average: {avg_lead_min} minutes")
        report_lines.append(f"  * Range: {min_lead_min} to {max_lead_min} minutes")

    # Review time
    if summary['review_time_stats']['count'] > 0:
        avg_review_min = round(summary['review_time_stats']['mean'] / 60, 2)
        min_review_min = round(summary['review_time_stats']['min'] / 60, 2)
        max_review_min = round(summary['review_time_stats']['max'] / 60, 2)
        report_lines.append(f"- Review time (total elapsed time):")
        report_lines.append(f"  * Average: {avg_review_min} minutes")
        report_lines.append(f"  * Range: {min_review_min} to {max_review_min} minutes")

    # Sample of processed data
    report_lines.append("\nSAMPLE OF PROCESSED DATA:")

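    # Select a complete sample if possible; stop early if there is nothing to sample
    if not processed_data:
        report_lines.append("No annotations were processed.")
        return "\n".join(report_lines)
    complete_samples = [item for item in processed_data if not item["missing_fields"]]
    sample = complete_samples[0] if complete_samples else processed_data[0]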

    report_lines.append(f"Annotation ID: {sample['annotation_id']}")
    report_lines.append(f"Document ID: {sample.get('doc_id', 'N/A')}")
    report_lines.append(f"Annotator: {sample.get('annotator', 'N/A')}")

    if sample.get('correspondence_rating') is not None:
        report_lines.append(f"Correspondence Rating: {sample['correspondence_rating']}")

    if sample.get('readability_rating') is not None:
        report_lines.append(f"Readability Rating: {sample['readability_rating']}")

    report_lines.append(f"Lead Time: {sample.get('lead_time_minutes', 'N/A')} minutes")
    report_lines.append(f"Review Time: {sample.get('review_time_minutes', 'N/A')} minutes")

    # Finish the report
    report_lines.append("\n" + "=" * 80)
    report_lines.append("PREPROCESSING COMPLETE")
    report_lines.append("=" * 80)

    return "\n".join(report_lines)

def process_and_report(doc_id_to_annotations):
    """
    Process the annotations and generate a human-readable report.

    Args:
        doc_id_to_annotations: Dictionary mapping doc_ids to lists of annotations

    Returns:
        Tuple containing (processed_data, summary, report)
    """
    processed_data = preprocess_document_level_data(doc_id_to_annotations)
    summary = summarize_document_level_data(processed_data)
    report = generate_readable_report(processed_data, summary)

    # Print the report
    print(report)

    return processed_data, summary, report

# Example usage:
# processed_data, summary, report = process_and_report(doc_id_to_annotations)

In [None]:
# Run document-level preprocessing on the same data used for label enrichment
processed_data, summary, report = process_and_report(doc_id_to_annotations)

## 4. Exploratory Data Analysis

### Visualizing Overlaps in Translation Error Annotations - Code

`create_span_overlap_visualization` renders the annotated text as HTML with the error spans highlighted. Darker highlights mark text that more annotators flagged as an error. Hovering over a highlighted span shows a tooltip with the number of annotators who marked it, along with each annotator's categorization (label, subcategory, severity) and comments.
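
As a minimal sketch of the underlying idea, the toy example below counts how many distinct annotators cover each character position and then groups consecutive positions with the same count into runs. The function name `overlap_runs`, the `(annotator, start, end)` tuple format, and the sample spans are hypothetical and chosen only for illustration; the notebook's own implementation works on the `enriched_labels` dictionaries instead.

```python
from itertools import groupby

def overlap_runs(text, spans):
    """Split text into (substring, n_annotators) runs.

    spans is a list of hypothetical (annotator, start, end) tuples,
    with end exclusive, as in Python slicing.
    """
    # One set of annotators per character position
    covering = [set() for _ in text]
    for annotator, start, end in spans:
        for pos in range(max(start, 0), min(end, len(text))):
            covering[pos].add(annotator)
    counts = [len(annotators) for annotators in covering]

    # Group consecutive positions that share the same count
    runs, pos = [], 0
    for count, group in groupby(counts):
        length = len(list(group))
        runs.append((text[pos:pos + length], count))
        pos += length
    return runs

toy_text = "The cat sat on the mat."
toy_spans = [("A", 4, 7), ("B", 4, 11), ("C", 8, 11)]
for chunk, n in overlap_runs(toy_text, toy_spans):
    print(f"{n} annotator(s): {chunk!r}")
```

Each run then maps to one highlighted (or plain) piece of text, with the count driving the color intensity.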

In [None]:
def create_span_overlap_visualization(doc_id, doc_id_to_annotations):
    """
    Create a visualization showing text with highlights based on annotation span overlap.

    Args:
        doc_id: The document ID to visualize
        doc_id_to_annotations: Dictionary mapping doc_ids to lists of annotations

    Returns:
        HTML string with the visualization
    """
    # Get all annotations for this document
    annotations = doc_id_to_annotations.get(doc_id, [])

    if not annotations:
        return f"<p>No annotations found for document ID {doc_id}</p>"

    # Get the text content from the first annotation (all should have the same text)
    text = annotations[0].get('text', '')

    if not text:
        return f"<p>No text content found for document ID {doc_id}</p>"

    # Collect all spans from all annotators
    all_spans = []
    annotator_ids = set()

    for annotation in annotations:
        annotator = annotation.get('annotator')
        annotator_ids.add(annotator)

        enriched_labels = annotation.get('enriched_labels', [])
        for label in enriched_labels:
            start = label.get('start', 0)
            end = label.get('end', 0)

            # Skip invalid spans
            if start >= end or start < 0 or end > len(text):
                continue

            all_spans.append({
                'annotator': annotator,
                'start': start,
                'end': end,
                'text': label.get('text', ''),
                'labels': label.get('labels', []),
                'subcategory': label.get('subcategory', ''),
                'severity': label.get('severity', ''),
                'comments': label.get('comments', '')
            })

    # Create an array to track spans at each character position
    # Instead of just counts, we'll store the full span information at each position
    char_spans = [[] for _ in range(len(text))]

    # Mark each span in the character spans array
    for span in all_spans:
        for i in range(span['start'], span['end']):
            if i < len(char_spans):
                char_spans[i].append(span)

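    # Count distinct annotators at each position for the gradient calculations
    # (an annotator who marks overlapping spans should only be counted once)
    char_count = [len({span['annotator'] for span in spans}) for spans in char_spans]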

    # Find maximum overlap for normalization
    max_overlap = max(char_count) if char_count else 1

    # Generate HTML with spans for highlighting
    html_parts = []
    i = 0
    while i < len(text):
        if char_count[i] > 0:
            # Start of a highlighted region
            overlap = char_count[i]
            start_i = i

            # Find where this level of overlap ends
            while i < len(text) and char_count[i] == overlap:
                i += 1

            # Calculate highlight intensity (0 to 1)
            intensity = overlap / max_overlap

            # Generate color (lighter blue palette for better readability)
            r = int(220 - (100 * intensity))
            g = int(220 - (100 * intensity))
            b = int(255 - (35 * intensity))

            bg_color = f"rgb({r}, {g}, {b})"

            # Create detailed tooltip that shows all annotations for this span
            tooltip_content = []

            # Collect each distinct span that covers any position in this region,
            # in order of first appearance (keeps the tooltip order deterministic)
            unique_spans = []
            for pos in range(start_i, i):
                for span in char_spans[pos]:
                    if span not in unique_spans:
                        unique_spans.append(span)

            # Build tooltip content for each unique span
            for span in unique_spans:
                span_info = f"Annotator: {span['annotator']}"

                # Add labels if available
                if span['labels']:
                    labels_str = ', '.join(span['labels'])
                    span_info += f" | Labels: {labels_str}"

                # Add subcategory if available
                if span['subcategory']:
                    span_info += f" | Subcategory: {span['subcategory']}"

                # Add severity if available
                if span['severity']:
                    span_info += f" | Severity: {span['severity']}"

                # Add comments if available
                if span.get('comments'):
                    # Escape HTML-special characters so the comment cannot break the title attribute
                    comments = (span['comments']
                                .replace('&', '&amp;')
                                .replace('<', '&lt;').replace('>', '&gt;')
                                .replace('"', '&quot;').replace("'", "&#39;"))
                    span_info += f" | Comments: {comments}"

                tooltip_content.append(span_info)

            # Join all tooltip content with line breaks
            tooltip = f"{overlap} annotator(s)&#10;&#10;" + "&#10;".join(tooltip_content)

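            # Create a span with appropriate styling and tooltip
            # (escape HTML-special characters in the source text before adding <br> tags)
            span_text = (text[start_i:i]
                         .replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
                         .replace('\n', '<br>'))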
            html_parts.append(
                f'<span style="background-color: {bg_color};" '
                f'title="{tooltip}">{span_text}</span>'
            )
        else:
            # Non-highlighted text
            start_i = i

            # Find where non-highlighted region ends
            while i < len(text) and char_count[i] == 0:
                i += 1

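            # Add the non-highlighted text, escaping HTML-special characters first
            span_text = (text[start_i:i]
                         .replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
                         .replace('\n', '<br>'))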
            html_parts.append(span_text)

    # Balanced HTML structure with "just right" spacing
    html = f"""
    <div style="font-family: Arial, sans-serif; line-height: 1.3; margin: 0; padding: 0;">
        <h3 style="margin: 0 0 4px 0;">Document ID: {doc_id}</h3>
        <div style="margin: 0 0 6px 0; white-space: nowrap;">
            <span><strong>Number of unique annotators:</strong> {len(annotator_ids)}</span>
            <span style="margin-left: 10px;"><strong>Total annotations:</strong> {len(all_spans)}</span>
            <span style="margin-left: 10px;"><strong>Maximum overlap:</strong> {max_overlap} annotator{'' if max_overlap == 1 else 's'}</span>
        </div>
        <div style="border: 1px solid #ccc; padding: 10px; margin: 4px 0;">
            {''.join(html_parts)}
        </div>
        <div style="margin: 6px 0 0 0;">
            <p style="margin: 0 0 5px 0;"><strong>Legend:</strong></p>
            <div style="display: flex; flex-wrap: wrap; gap: 5px;">
    """

    # Generate a color legend with centered text
    for i in range(1, max_overlap + 1):
        intensity = i / max_overlap
        r = int(220 - (100 * intensity))
        g = int(220 - (100 * intensity))
        b = int(255 - (35 * intensity))
        bg_color = f"rgb({r}, {g}, {b})"

        html += f"""
            <div style="background-color: {bg_color}; display: flex; align-items: center; justify-content: center; min-width: 100px; height: 40px; text-align: center; margin-right: 5px;">
                {i} annotator{'' if i == 1 else 's'}
            </div>
        """

    html += """
            </div>
        </div>
    </div>
    """

    return html

def visualize_all_documents(doc_id_to_annotations, selected_doc_id=None):
    """
    Generate visualizations for all documents or a specific document in the dataset.

    Args:
        doc_id_to_annotations: Dictionary mapping doc_ids to lists of annotations
        selected_doc_id: Optional specific document ID to visualize

    Returns:
        HTML string with visualizations
    """
    all_html = "<h2 style='margin-bottom: 5px;'>Annotation Overlap Visualization</h2>"

    if selected_doc_id is not None:
        # Visualize only the selected document
        if selected_doc_id in doc_id_to_annotations:
            all_html += create_span_overlap_visualization(selected_doc_id, doc_id_to_annotations)
        else:
            all_html += f"<p>Document ID {selected_doc_id} not found in the dataset.</p>"
    else:
        # Visualize all documents
        # Sort document IDs to ensure consistent order
        doc_ids = sorted(doc_id_to_annotations.keys())

        for i, doc_id in enumerate(doc_ids):
            all_html += create_span_overlap_visualization(doc_id, doc_id_to_annotations)
            if i < len(doc_ids) - 1:  # Don't add hr after the last document
                all_html += "<hr style='margin: 12px 0;'>"

    return all_html

# Add the code to display the visualizations
from IPython.display import display, HTML

# Function to display all documents
def display_annotation_overlaps(doc_id=None):
    """
    Display annotation overlap visualizations.

    Args:
        doc_id: Optional specific document ID to visualize
    """
    html_output = visualize_all_documents(doc_id_to_annotations, doc_id)
    display(HTML(html_output))

### Visualizing Overlaps in Translation Error Annotations - Visualizations

In [None]:
# Run the visualization on all documents
display_annotation_overlaps()

In [None]:
# OR display a specific document (e.g., document ID 1)
display_annotation_overlaps(doc_id=1)

### Error Types Identified - Code

`visualize_error_types` summarizes and plots the types of errors the annotators identified, broken down by label, subcategory, and severity.
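
The counting step can be sketched on a few made-up label records. The helper name `split_subcategory` and the sample values (`"ACC: Mistranslation"` and so on) are hypothetical; only the `"CATEGORY: Description"` subcategory format and the `labels`, `subcategory`, and `severity` keys come from the notebook's code.

```python
from collections import Counter

def split_subcategory(subcategory):
    """Split 'CATEGORY: Description' into (code, description).

    If there is no colon, the whole string is used for both parts.
    """
    code, sep, description = subcategory.partition(":")
    if not sep:
        return subcategory.strip(), subcategory.strip()
    return code.strip(), description.strip()

# Made-up records in the shape of the notebook's enriched labels
toy_labels = [
    {"labels": ["Accuracy"], "subcategory": "ACC: Mistranslation", "severity": "Major"},
    {"labels": ["Accuracy"], "subcategory": "ACC: Omission", "severity": "Minor"},
    {"labels": ["Fluency"], "subcategory": "FLU: Grammar", "severity": "Minor"},
]

# Frequency of each main error type and of each subcategory code
type_counts = Counter(label for item in toy_labels for label in item["labels"])
code_counts = Counter(split_subcategory(item["subcategory"])[0] for item in toy_labels)

print(type_counts.most_common())
print(code_counts.most_common())
```

The notebook builds pandas DataFrames and uses `value_counts()` for the same purpose; `Counter` is used here only to keep the sketch dependency-free.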

In [None]:
def visualize_error_types(doc_id_to_annotations, doc_id=None, include_subcategories=False):
    """
    Visualize the distribution of error types across annotations.

    Args:
        doc_id_to_annotations: Dictionary mapping doc_ids to lists of annotations
        doc_id: Optional specific document ID to analyze (None for all documents)
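        include_subcategories: Whether to show subcategories (default: False)

    Returns:
        Tuple (labels_df, subcategory_df) when error labels are found;
        subcategory_df is None unless include_subcategories is True.
    """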
    from collections import Counter

    # Filter annotations by document ID if specified
    if doc_id is not None:
        annotations = doc_id_to_annotations.get(doc_id, [])
        title_suffix = f" (Document ID: {doc_id})"
    else:
        # Combine all annotations
        annotations = []
        for doc_annotations in doc_id_to_annotations.values():
            annotations.extend(doc_annotations)
        title_suffix = " (All Documents)"

    # Extract error labels from all annotations
    labels_data = []
    subcategory_data = []

    for annotation in annotations:
        enriched_labels = annotation.get('enriched_labels', [])

        for label_item in enriched_labels:
            # Extract main error categories
            main_labels = label_item.get('labels', [])

            for main_label in main_labels:
                labels_data.append({
                    'doc_id': annotation.get('doc_id'),
                    'annotator': annotation.get('annotator'),
                    'error_type': main_label,
                    'severity': label_item.get('severity', 'Unknown')
                })

            # Extract subcategory if available
            subcategory = label_item.get('subcategory', 'None')
            if subcategory and subcategory != 'None':
                # Split subcategory (format is typically "CATEGORY: Description")
                subcategory_parts = subcategory.split(':', 1)
                if len(subcategory_parts) > 1:
                    category_code = subcategory_parts[0].strip()
                    description = subcategory_parts[1].strip()
                else:
                    category_code = subcategory
                    description = subcategory

                subcategory_data.append({
                    'doc_id': annotation.get('doc_id'),
                    'annotator': annotation.get('annotator'),
                    'category': category_code,
                    'description': description,
                    'full_subcategory': subcategory,
                    'severity': label_item.get('severity', 'Unknown')
                })

    # Create DataFrames
    labels_df = pd.DataFrame(labels_data)
    subcategory_df = pd.DataFrame(subcategory_data)

    # Print summary statistics
    print(f"Error Type Analysis{title_suffix}")
    print(f"Total annotations analyzed: {len(annotations)}")
    print(f"Total error labels found: {len(labels_df)}")
    print(f"Total error subcategories found: {len(subcategory_df)}")

    # If no error labels were found, return early
    # (same tuple shape as the normal return, so callers can always unpack it)
    if len(labels_df) == 0:
        print("No error labels found in the selected document(s).")
        return None, None

    # Count the frequency of each main error type
    main_counts = labels_df['error_type'].value_counts().reset_index()
    main_counts.columns = ['Error Type', 'Count']

    # Sort by count (descending)
    main_counts = main_counts.sort_values('Count', ascending=False)

    # Print main error type distribution
    print("\nMain Error Types Distribution:")
    for i, row in main_counts.iterrows():
        print(f"  {row['Error Type']}: {row['Count']} occurrences")

    # Create bar chart for main error types
    plt.figure(figsize=(12, 6))

    # Use a colorblind-friendly palette
    colors = sns.color_palette("colorblind", len(main_counts))

    # Create the bar chart
    bars = plt.bar(main_counts['Error Type'], main_counts['Count'], color=colors)

    # Add count labels on top of each bar
    for bar in bars:
        height = bar.get_height()
        plt.text(bar.get_x() + bar.get_width()/2., height + 0.5,
                f'{int(height)}', ha='center', va='bottom')

    # Add labels and title
    plt.xlabel('Error Type', fontsize=12)
    plt.ylabel('Count', fontsize=12)
    plt.title(f'Distribution of Error Types{title_suffix}', fontsize=14)

    # Rotate x-axis labels if there are many categories
    if len(main_counts) > 5:
        plt.xticks(rotation=45, ha='right')

    # Add grid for better readability
    plt.grid(axis='y', linestyle=':', alpha=0.7)
    plt.tight_layout()
    plt.show()

    # Error type distribution by document if no specific doc_id was provided
    if doc_id is None and 'doc_id' in labels_df.columns:
        plt.figure(figsize=(14, 8))

        # Count error types by document
        doc_error_counts = labels_df.groupby(['doc_id', 'error_type']).size().reset_index(name='count')

        # Create grouped bar chart
        sns.barplot(x='doc_id', y='count', hue='error_type', data=doc_error_counts)

        plt.title('Error Types by Document', fontsize=14)
        plt.xlabel('Document ID', fontsize=12)
        plt.ylabel('Count', fontsize=12)
        plt.legend(title='Error Type')
        plt.grid(axis='y', linestyle=':', alpha=0.7)
        plt.tight_layout()
        plt.show()

    # If subcategories are requested, visualize them too
    if include_subcategories and len(subcategory_df) > 0:
        # Analyze subcategories
        # Group by category code and get counts
        subcategory_counts = subcategory_df['category'].value_counts().reset_index()
        subcategory_counts.columns = ['Category', 'Count']

        # Get the top 10 subcategories (if there are more than 10)
        if len(subcategory_counts) > 10:
            print("\nTop 10 Subcategories:")
            top_subcategories = subcategory_counts.head(10)
        else:
            print("\nAll Subcategories:")
            top_subcategories = subcategory_counts

        # Print subcategory distribution
        for i, row in top_subcategories.iterrows():
            print(f"  {row['Category']}: {row['Count']} occurrences")

        # Create detailed subcategory breakdown
        full_subcategory_counts = subcategory_df['full_subcategory'].value_counts().reset_index()
        full_subcategory_counts.columns = ['Subcategory', 'Count']

        # Get the top 15 full subcategories
        if len(full_subcategory_counts) > 15:
            top_full_subcategories = full_subcategory_counts.head(15)
        else:
            top_full_subcategories = full_subcategory_counts

        # Create horizontal bar chart for better readability with long labels
        plt.figure(figsize=(12, max(6, len(top_full_subcategories) * 0.4)))

        # Create horizontal bar chart
        bars = plt.barh(top_full_subcategories['Subcategory'],
                      top_full_subcategories['Count'],
                      color=sns.color_palette("colorblind", len(top_full_subcategories)))

        # Add count labels inside each bar
        for i, bar in enumerate(bars):
            width = bar.get_width()
            plt.text(width - 0.5, bar.get_y() + bar.get_height()/2.,
                    f'{int(width)}', ha='right', va='center',
                    color='white', fontweight='bold')

        plt.xlabel('Count', fontsize=12)
        plt.ylabel('Subcategory', fontsize=12)
        plt.title(f'Top Error Subcategories{title_suffix}', fontsize=14)
        plt.grid(axis='x', linestyle=':', alpha=0.7)
        plt.tight_layout()
        plt.show()

        # Add breakdown by severity if there's enough data
        if 'severity' in subcategory_df.columns and subcategory_df['severity'].nunique() > 1:
            plt.figure(figsize=(12, 6))

            # Count by severity and category
            severity_counts = subcategory_df.groupby(['category', 'severity']).size().reset_index(name='count')

            # Get top categories
            top_categories = subcategory_df['category'].value_counts().head(8).index
            severity_counts = severity_counts[severity_counts['category'].isin(top_categories)]

            # Create grouped bar chart
            sns.barplot(x='category', y='count', hue='severity', data=severity_counts)

            plt.title(f'Error Types by Severity{title_suffix}', fontsize=14)
            plt.xlabel('Error Category', fontsize=12)
            plt.ylabel('Count', fontsize=12)
            plt.legend(title='Severity')
            plt.grid(axis='y', linestyle=':', alpha=0.7)

            # Rotate x-axis labels if needed
            plt.xticks(rotation=45, ha='right')

            plt.tight_layout()
            plt.show()

    return labels_df, (subcategory_df if include_subcategories else None)

### Error Types Identified - Visualizations

In [None]:
# Basic error type analysis for all documents
visualize_error_types(doc_id_to_annotations);

In [None]:
# Analysis for a specific document
visualize_error_types(doc_id_to_annotations, doc_id=1);

In [None]:
# Include detailed subcategory analysis
visualize_error_types(doc_id_to_annotations, include_subcategories=True);

In [None]:
# Specific document with subcategories
visualize_error_types(doc_id_to_annotations, doc_id=2, include_subcategories=True);

### Annotator Data - Code

`summarize_annotation_timing` summarizes annotator timing data (lead time and review time), which adds context to the agreement analysis.
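
The timing summary flags unusually long times with the 1.5 × IQR rule, applied on the high side only. A minimal sketch of that rule follows, using the standard library rather than pandas. The function name `upper_fence_outliers` and the sample timings are hypothetical. `method="inclusive"` interpolates linearly between data points, which should give the same quartiles as the default behavior of pandas `quantile`.

```python
from statistics import quantiles

def upper_fence_outliers(values, k=1.5):
    """Return (upper_fence, outliers) using the k * IQR rule on the high side only."""
    q1, _, q3 = quantiles(values, n=4, method="inclusive")
    upper_fence = q3 + k * (q3 - q1)
    return upper_fence, [v for v in values if v > upper_fence]

# Made-up lead times in minutes, with one unusually long session
toy_minutes = [4.0, 5.0, 5.5, 6.0, 6.5, 7.0, 30.0]
fence, outliers = upper_fence_outliers(toy_minutes)
print(f"Upper fence: {fence:.2f} min, outliers: {outliers}")
```

Only high values are flagged because very short times cannot fall far below the lower quartile, whereas a session left open can inflate the measured time considerably.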

In [None]:
def summarize_annotation_timing(processed_data, doc_id=None):
    """
    Create a clear tabular summary of annotation timing statistics.

    Args:
        processed_data: Output from preprocess_document_level_data
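        doc_id: Optional specific document ID to analyze (None for all documents)

    Returns:
        Dictionary with 'lead_time' and 'review_time' arrays of the non-missing
        timing values, in minutes
    """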
    import pandas as pd
    import numpy as np

    # Create a DataFrame from the processed data
    df = pd.DataFrame(processed_data)

    # Filter by document ID if specified
    if doc_id is not None:
        df = df[df['doc_id'] == doc_id]
        title_suffix = f" (Document ID: {doc_id})"
    else:
        title_suffix = " (All Documents)"

    print(f"\n==== ANNOTATION TIMING ANALYSIS{title_suffix} ====\n")

    # Overall statistics
    print("OVERALL TIMING STATISTICS:")
    print("--------------------------")

    for time_type, column in [('Lead Time', 'lead_time_minutes'),
                             ('Review Time', 'review_time_minutes')]:
        data = df[column].dropna()

        if len(data) > 0:
            print(f"\n{time_type} (minutes):")
            print(f"  Number of measurements: {len(data)}")
            print(f"  Median: {data.median():.1f}")
            print(f"  Mean: {data.mean():.1f}")
            print(f"  Q1 (25%): {data.quantile(0.25):.1f}")
            print(f"  Q3 (75%): {data.quantile(0.75):.1f}")
            print(f"  Min: {data.min():.1f}")
            print(f"  Max: {data.max():.1f}")

            # Flag high outliers with the 1.5 * IQR rule (upper fence only)
            q1, q3 = data.quantile(0.25), data.quantile(0.75)
            iqr = q3 - q1
            upper_bound = q3 + 1.5 * iqr

            outliers = data[data > upper_bound]
            if len(outliers) > 0:
                print(f"\n  Outliers (> {upper_bound:.1f} min): {len(outliers)} values")
                print(f"  Outlier values: {', '.join([f'{x:.1f}' for x in sorted(outliers)])}")

    # Per-document analysis if not filtered
    if doc_id is None and 'doc_id' in df.columns:
        print("\n\nTIMING STATISTICS BY DOCUMENT:")
        print("-----------------------------")

        doc_ids = sorted(df['doc_id'].unique())

        # Create summary table data
        table_data = []
        for current_doc_id in doc_ids:
            doc_df = df[df['doc_id'] == current_doc_id]

            # Lead time stats
            lead_times = doc_df['lead_time_minutes'].dropna()
            lead_count = len(lead_times)
            lead_median = lead_times.median() if lead_count > 0 else float('nan')
            lead_q1 = lead_times.quantile(0.25) if lead_count > 0 else float('nan')
            lead_q3 = lead_times.quantile(0.75) if lead_count > 0 else float('nan')
            lead_min = lead_times.min() if lead_count > 0 else float('nan')
            lead_max = lead_times.max() if lead_count > 0 else float('nan')

            # Review time stats
            review_times = doc_df['review_time_minutes'].dropna()
            review_count = len(review_times)
            review_median = review_times.median() if review_count > 0 else float('nan')
            review_q1 = review_times.quantile(0.25) if review_count > 0 else float('nan')
            review_q3 = review_times.quantile(0.75) if review_count > 0 else float('nan')
            review_min = review_times.min() if review_count > 0 else float('nan')
            review_max = review_times.max() if review_count > 0 else float('nan')

            table_data.append({
                'Doc ID': current_doc_id,
                'Lead Count': lead_count,
                'Lead Median': lead_median,
                'Lead Q1-Q3': f"{lead_q1:.1f}-{lead_q3:.1f}" if lead_count > 0 else "N/A",
                'Lead Min-Max': f"{lead_min:.1f}-{lead_max:.1f}" if lead_count > 0 else "N/A",
                'Review Count': review_count,
                'Review Median': review_median,
                'Review Q1-Q3': f"{review_q1:.1f}-{review_q3:.1f}" if review_count > 0 else "N/A",
                'Review Min-Max': f"{review_min:.1f}-{review_max:.1f}" if review_count > 0 else "N/A"
            })

        # Create and format DataFrame
        summary_df = pd.DataFrame(table_data)

        # Format numeric columns
        for col in ['Lead Median', 'Review Median']:
            summary_df[col] = summary_df[col].apply(lambda x: f"{x:.1f}" if not pd.isna(x) else "N/A")

        # Print the table
        print("\nSummary by Document:")
        print(summary_df.to_string(index=False))

        # Print outliers by document
        print("\nOutliers by Document:")
        for current_doc_id in doc_ids:
            doc_df = df[df['doc_id'] == current_doc_id]

            print(f"\nDocument {current_doc_id}:")

            # Check for lead time outliers
            lead_times = doc_df['lead_time_minutes'].dropna()
            if len(lead_times) >= 4:  # Need reasonable amount of data for outlier detection
                q1, q3 = lead_times.quantile(0.25), lead_times.quantile(0.75)
                iqr = q3 - q1
                upper_bound = q3 + 1.5 * iqr
                outliers = lead_times[lead_times > upper_bound]

                if len(outliers) > 0:
                    print(f"  Lead Time outliers (> {upper_bound:.1f} min): {len(outliers)} values")
                    print(f"  Values: {', '.join([f'{x:.1f}' for x in sorted(outliers)])}")
                else:
                    print("  Lead Time: No outliers detected")
            else:
                print("  Lead Time: Insufficient data for outlier detection")

            # Check for review time outliers
            review_times = doc_df['review_time_minutes'].dropna()
            if len(review_times) >= 4:
                q1, q3 = review_times.quantile(0.25), review_times.quantile(0.75)
                iqr = q3 - q1
                upper_bound = q3 + 1.5 * iqr
                outliers = review_times[review_times > upper_bound]

                if len(outliers) > 0:
                    print(f"  Review Time outliers (> {upper_bound:.1f} min): {len(outliers)} values")
                    print(f"  Values: {', '.join([f'{x:.1f}' for x in sorted(outliers)])}")
                else:
                    print("  Review Time: No outliers detected")
            else:
                print("  Review Time: Insufficient data for outlier detection")

    # Return timing data if needed for further analysis
    timing_data = {
        'lead_time': df['lead_time_minutes'].dropna().values,
        'review_time': df['review_time_minutes'].dropna().values
    }

    return timing_data

### Annotator Data - Summaries

- Lead time: the time the annotator spent actively working on the task
- Review time: the total elapsed time from the start of the task to its end

In [None]:
# Show annotator distribution for a specific document
analyze_annotator_distribution(processed_data, doc_id=1)

In [None]:
# Show annotator distribution for a specific document
analyze_annotator_distribution(processed_data, doc_id=2)

In [None]:
# Show annotator distribution for a specific document
analyze_annotator_distribution(processed_data, doc_id=3)

In [None]:
# Overall summary for all documents
summarize_annotation_timing(processed_data);

In [None]:
# Summary for a specific document
summarize_annotation_timing(processed_data, doc_id=2);

### Overall correspondence and readability scores - Code

`visualize_rating_distributions` plots the distribution of the overall correspondence and readability ratings that annotators gave to each translation.
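
Because the two rating types may have different numbers of responses, the grouped bar chart compares percentages rather than raw counts. The conversion can be sketched as below. The function name `rating_percentages` and the sample ratings are hypothetical; the 1-4 scale is the one used in the notebook's code.

```python
from collections import Counter

def rating_percentages(ratings, scale=range(1, 5)):
    """Return {rating value: percentage of ratings} for every value on the scale."""
    counts = Counter(ratings)
    total = sum(counts[value] for value in scale)
    # Values that never occur get 0.0, so every category has the same keys
    return {value: (100 * counts[value] / total if total else 0.0) for value in scale}

# Made-up ratings: six correspondence ratings, three readability ratings
toy_correspondence = [4, 3, 3, 2, 4, 4]
toy_readability = [3, 3, 4]

print(rating_percentages(toy_correspondence))
print(rating_percentages(toy_readability))
```

Filling in zero for unused rating values keeps the bars aligned across categories, which is also why the notebook builds `ratings_df` over the full 1-4 range.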

In [None]:
def visualize_rating_distributions(processed_data, doc_id=None, plot_type='all'):
    """
    Visualize the distribution of correspondence and readability ratings.

    Args:
        processed_data: Output from preprocess_document_level_data
        doc_id: Optional specific document ID to visualize (None for all documents)
        plot_type: Type of plot to generate ('count', 'box', 'grouped', 'all')

    Returns:
        Tuple (ratings_df, long_df): counts per rating value for each category,
        and the individual ratings in long format
    """
    from matplotlib.gridspec import GridSpec

    # Create a copy of the data to avoid modifying the original
    data = processed_data.copy()

    # Filter by document ID if specified
    if doc_id is not None:
        data = [item for item in data if item.get('doc_id') == doc_id]
        title_suffix = f" (Document ID: {doc_id})"
    else:
        title_suffix = " (All Documents)"

    # Extract correspondence and readability ratings
    correspondence_ratings = [item.get('correspondence_rating') for item in data
                             if item.get('correspondence_rating') is not None]
    readability_ratings = [item.get('readability_rating') for item in data
                          if item.get('readability_rating') is not None]

    # Count the occurrences of each rating
    corr_counts = pd.Series(correspondence_ratings).value_counts().sort_index()
    read_counts = pd.Series(readability_ratings).value_counts().sort_index()

    # Prepare a DataFrame for seaborn
    ratings_df = pd.DataFrame({
        'Rating Value': list(range(1, 5)) * 2,
        'Count': [corr_counts.get(i, 0) for i in range(1, 5)] + [read_counts.get(i, 0) for i in range(1, 5)],
        'Category': ['Correspondence'] * 4 + ['Readability'] * 4
    })

    # Long-format for box plots
    long_df = pd.DataFrame({
        'Rating Value': correspondence_ratings + readability_ratings,
        'Category': ['Correspondence'] * len(correspondence_ratings) + ['Readability'] * len(readability_ratings)
    })

    # Print summary statistics
    print(f"Correspondence Ratings{title_suffix}:")
    print(f"  Number of ratings: {len(correspondence_ratings)}")
    print(f"  Mean: {np.mean(correspondence_ratings):.2f}")
    print(f"  Median: {np.median(correspondence_ratings):.1f}")
    print(f"  Mode: {pd.Series(correspondence_ratings).mode().values}")
    print(f"  Distribution: {corr_counts.to_dict()}")

    print(f"\nReadability Ratings{title_suffix}:")
    print(f"  Number of ratings: {len(readability_ratings)}")
    print(f"  Mean: {np.mean(readability_ratings):.2f}")
    print(f"  Median: {np.median(readability_ratings):.1f}")
    print(f"  Mode: {pd.Series(readability_ratings).mode().values}")
    print(f"  Distribution: {read_counts.to_dict()}")

    # Create visualizations based on the requested plot type
    if plot_type == 'count' or plot_type == 'all':
        # Count Plot
        plt.figure(figsize=(12, 6))
        ax = sns.countplot(x='Rating Value', hue='Category', data=long_df)
        plt.title(f'Distribution of Rating Scores{title_suffix}')
        plt.xlabel('Rating (1-4 scale)')
        plt.ylabel('Count')

        # Add count labels on top of bars
        for p in ax.patches:
            height = p.get_height()
            if height > 0:
                ax.text(p.get_x() + p.get_width()/2.,
                        height + 0.1,
                        f'{height:.0f}',
                        ha="center")

        plt.tight_layout()
        plt.show()

    if plot_type == 'box' or plot_type == 'all':
        # Box Plot
        plt.figure(figsize=(10, 6))
        ax = sns.boxplot(x='Category', y='Rating Value', data=long_df)
        plt.title(f'Box Plot of Rating Distributions{title_suffix}')
        plt.ylabel('Rating (1-4 scale)')

        # Add jittered data points for better visibility
        sns.stripplot(x='Category', y='Rating Value', data=long_df,
                      color='black', alpha=0.5, jitter=True)

        # Set y-axis to show only the possible rating values
        plt.yticks([1, 2, 3, 4])
        plt.ylim(0.5, 4.5)

        plt.tight_layout()
        plt.show()

    if plot_type == 'grouped' or plot_type == 'all':
        # Grouped Bar Chart
        plt.figure(figsize=(12, 6))

        # Convert to percentages for easier comparison
        ratings_pct = ratings_df.copy()
        corr_total = sum(corr_counts)
        read_total = sum(read_counts)

        for i, row in ratings_pct.iterrows():
            if row['Category'] == 'Correspondence':
                ratings_pct.at[i, 'Percentage'] = (row['Count'] / corr_total * 100) if corr_total > 0 else 0
            else:
                ratings_pct.at[i, 'Percentage'] = (row['Count'] / read_total * 100) if read_total > 0 else 0

        # Create the grouped bar chart
        ax = sns.barplot(x='Rating Value', y='Percentage', hue='Category', data=ratings_pct)
        plt.title(f'Percentage of Ratings by Category{title_suffix}')
        plt.xlabel('Rating (1-4 scale)')
        plt.ylabel('Percentage (%)')

        # Add percentage labels on top of bars
        for p in ax.patches:
            height = p.get_height()
            if height > 0:
                ax.text(p.get_x() + p.get_width()/2.,
                        height + 1,
                        f'{height:.1f}%',
                        ha="center")

        plt.tight_layout()
        plt.show()

    if plot_type == 'all':
        # Combined visualization with all plots
        fig = plt.figure(figsize=(18, 10))
        gs = GridSpec(2, 2, figure=fig)

        # Count plot
        ax1 = fig.add_subplot(gs[0, 0])
        sns.countplot(x='Rating Value', hue='Category', data=long_df, ax=ax1)
        ax1.set_title('Distribution of Rating Scores')
        ax1.set_xlabel('Rating (1-4 scale)')
        ax1.set_ylabel('Count')

        # Box plot
        ax2 = fig.add_subplot(gs[0, 1])
        sns.boxplot(x='Category', y='Rating Value', data=long_df, ax=ax2)
        sns.stripplot(x='Category', y='Rating Value', data=long_df,
                      color='black', alpha=0.5, jitter=True, ax=ax2)
        ax2.set_title('Box Plot of Rating Distributions')
        ax2.set_ylabel('Rating (1-4 scale)')
        ax2.set_yticks([1, 2, 3, 4])
        ax2.set_ylim(0.5, 4.5)

        # Grouped bar chart
        ax3 = fig.add_subplot(gs[1, :])
        sns.barplot(x='Rating Value', y='Percentage', hue='Category', data=ratings_pct, ax=ax3)
        ax3.set_title('Percentage of Ratings by Category')
        ax3.set_xlabel('Rating (1-4 scale)')
        ax3.set_ylabel('Percentage (%)')

        fig.suptitle(f'Rating Analysis{title_suffix}', fontsize=16)
        plt.tight_layout()
        plt.subplots_adjust(top=0.9)
        plt.show()

    return ratings_df, long_df

### Overall correspondence and readability scores - Visualizations

#### All visualizations

In [None]:
# Show all plot types for a specific document
visualize_rating_distributions(processed_data, doc_id=1, plot_type='all');

#### Count plot on rating distributions

**How to Interpret Rating Distribution Charts**

This chart visualizes the distribution of translation quality ratings across two assessment categories: correspondence and readability.

*Reading the Count Plot*
- X-axis: Shows the rating scale (1-4), where 1 is lowest quality and 4 is highest quality
- Y-axis: Shows how many annotators assigned each rating
- Color: Distinguishes between Readability (blue) and Correspondence (orange) ratings

*Key Insights*
- Rating Distribution: Taller bars indicate more common ratings
- Rating Patterns: Compare blue vs. orange bars to see differences between how annotators rated readability vs. correspondence
- Agreement Level: Bars concentrated at one rating value suggest stronger annotator agreement
- Quality Assessment: The concentration of ratings at higher numbers (3-4) indicates better perceived translation quality
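
As a rough numeric counterpart to the "Agreement Level" reading above, the sketch below computes the share of ratings that fall on the most common value. The helper `modal_share` and the sample ratings are hypothetical and are not part of the notebook's pipeline.

```python
from collections import Counter

def modal_share(ratings):
    """Fraction of ratings that fall on the most common value."""
    if not ratings:
        return 0.0
    counts = Counter(ratings)
    return max(counts.values()) / len(ratings)

# Hypothetical ratings from five annotators on the 1-4 scale
readability = [3, 3, 3, 4, 3]
correspondence = [1, 2, 3, 4, 2]

print(f"Readability: {modal_share(readability):.0%}")        # 80%
print(f"Correspondence: {modal_share(correspondence):.0%}")  # 40%
```

A higher share corresponds to bars concentrated at a single rating value; a lower share corresponds to ratings spread across the scale.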

*Document selection*

This visualization aggregates ratings across all documents. To view ratings for a specific document, use the document ID parameter when calling the visualization function.

To show only the count plot for a single document: `visualize_rating_distributions(processed_data, doc_id=1, plot_type='count');`

In [None]:
# Show only count plot for all documents
visualize_rating_distributions(processed_data, doc_id=None, plot_type='count');

#### Box plot on rating distribution

In [None]:
# Show only box plot for document ID 2
visualize_rating_distributions(processed_data, doc_id=2, plot_type='box');

#### Grouped bar chart of rating distributions

In [None]:
# Show only grouped bar chart for document ID 3
visualize_rating_distributions(processed_data, doc_id=3, plot_type='grouped');

## 5. Translation Error Annotations - Agreement Calculations

- Exact matching for span boundaries
- F1 for partial credit on spans
- Kappa for category agreement, subcategory agreement, and severity levels

### 5.1 Exact Matching for Span Boundaries


**How it works**
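- Measures when two or more annotators identify exactly the same text span (identical start and end positions)
- Typically expressed as a percentage: "Annotators agreed on exact span boundaries for 65% of errors"
- Here, the rate for each annotator pair is the number of spans both annotators marked divided by the number of distinct spans either annotator marked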

**What you'll learn**
- How precisely annotators agree on where errors begin and end
- Identifies potential ambiguity in annotation guidelines
- Shows if annotators are consistently identifying the same textual issues

In MQM translation error annotation context, exact matching shows how often annotators precisely agree on which specific words or phrases contain errors.
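
A minimal sketch of the pairwise calculation, mirroring what `calculate_exact_matches` does for one annotator pair. The helper name `exact_match_rate` and the sample spans are hypothetical, chosen only for illustration.

```python
def exact_match_rate(spans_a, spans_b):
    """Share of distinct (start, end) spans that both annotators marked."""
    set_a = {(s["start"], s["end"]) for s in spans_a}
    set_b = {(s["start"], s["end"]) for s in spans_b}
    union = set_a | set_b
    return len(set_a & set_b) / len(union) if union else 0.0

# Hypothetical spans (character offsets) from two annotators
annotator_a = [{"start": 0, "end": 5}, {"start": 12, "end": 20}, {"start": 30, "end": 41}]
annotator_b = [{"start": 0, "end": 5}, {"start": 12, "end": 25}]

rate = exact_match_rate(annotator_a, annotator_b)
print(f"{rate:.2%}")  # 25.00%: 1 shared span out of 4 distinct spans
```

Note that the span starting at 12 counts as a mismatch because the end positions differ, even though both annotators flagged roughly the same text.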

#### **Exact Matching for Span Boundaries - Code**

In [None]:
def calculate_exact_matches(doc_id_to_annotations):
    """
    Calculate exact matches between annotators for each document.

    Args:
        doc_id_to_annotations: Dictionary mapping doc_ids to lists of annotations

    Returns:
        Dictionary with exact match statistics
    """
    # Results structure
    results = {
        'per_document': {},
        'overall': {
            'total_comparisons': 0,
            'total_exact_matches': 0,
            'exact_match_rate': 0
        }
    }

    # Process each document
    for doc_id, annotations in doc_id_to_annotations.items():
        # Group annotations by annotator
        annotator_to_spans = {}

        for annotation in annotations:
            annotator = annotation.get('annotator')
            if not annotator:
                continue

            # Get all span boundaries for this annotator
            spans = []
            for label in annotation.get('enriched_labels', []):
                spans.append({
                    'start': label.get('start'),
                    'end': label.get('end'),
                    'text': label.get('text'),
                    'category': label.get('labels', [''])[0] if label.get('labels') else '',
                    'subcategory': label.get('subcategory', ''),
                    'severity': label.get('severity', '')
                })

            annotator_to_spans[annotator] = spans

        # Skip documents with fewer than 2 annotators
        if len(annotator_to_spans) < 2:
            continue

        # Initialize document results
        doc_results = {
            'annotator_pairs': [],
            'total_comparisons': 0,
            'total_exact_matches': 0,
            'exact_match_rate': 0
        }

        # Compare each pair of annotators
        for annotator1, annotator2 in combinations(annotator_to_spans.keys(), 2):
            spans1 = annotator_to_spans[annotator1]
            spans2 = annotator_to_spans[annotator2]

            # Create sets of (start, end) tuples for each annotator
            spans1_set = {(span['start'], span['end']) for span in spans1}
            spans2_set = {(span['start'], span['end']) for span in spans2}

            # Exact matches are the boundaries marked by both annotators
            exact_match_set = spans1_set.intersection(spans2_set)
            exact_matches = len(exact_match_set)

            # The denominator is the number of distinct spans marked by either annotator
            unique_spans = len(spans1_set.union(spans2_set))

            # Avoid division by zero
            exact_match_rate = exact_matches / unique_spans if unique_spans > 0 else 0

            # Store pair results
            pair_result = {
                'annotator1': annotator1,
                'annotator2': annotator2,
                'spans1': len(spans1),
                'spans2': len(spans2),
                'unique_spans': unique_spans,
                'exact_matches': exact_matches,
                'exact_match_rate': exact_match_rate
            }

            doc_results['annotator_pairs'].append(pair_result)
            doc_results['total_comparisons'] += unique_spans
            doc_results['total_exact_matches'] += exact_matches

        # Calculate overall document exact match rate
        if doc_results['total_comparisons'] > 0:
            doc_results['exact_match_rate'] = doc_results['total_exact_matches'] / doc_results['total_comparisons']

        # Store document results
        results['per_document'][doc_id] = doc_results

        # Update overall statistics
        results['overall']['total_comparisons'] += doc_results['total_comparisons']
        results['overall']['total_exact_matches'] += doc_results['total_exact_matches']

    # Calculate overall exact match rate
    if results['overall']['total_comparisons'] > 0:
        results['overall']['exact_match_rate'] = results['overall']['total_exact_matches'] / results['overall']['total_comparisons']

    return results

In [None]:
def display_exact_match_results(exact_match_results):
    """
    Display exact match results in a readable format with visualizations.

    Args:
        exact_match_results: Output from calculate_exact_matches function
    """
    print("=" * 60)
    print("EXACT MATCH ANALYSIS FOR SPAN BOUNDARIES")
    print("=" * 60)

    # Overall results
    overall = exact_match_results['overall']
    print(f"\nOVERALL RESULTS:")
    print(f"Total comparisons: {overall['total_comparisons']}")
    print(f"Total exact matches: {overall['total_exact_matches']}")
    print(f"Exact match rate: {overall['exact_match_rate']:.2%}")

    # Per-document results
    print("\nRESULTS BY DOCUMENT:")
    for doc_id, doc_results in exact_match_results['per_document'].items():
        print(f"\nDocument {doc_id}:")
        print(f"  Exact match rate: {doc_results['exact_match_rate']:.2%}")
        print(f"  Exact matches: {doc_results['total_exact_matches']} out of {doc_results['total_comparisons']}")

        # Results by annotator pair
        print("\n  Results by annotator pair:")
        for pair in doc_results['annotator_pairs']:
            print(f"    Annotators {pair['annotator1']} and {pair['annotator2']}:")
            print(f"      Spans: {pair['spans1']} and {pair['spans2']}")
            print(f"      Exact matches: {pair['exact_matches']} out of {pair['unique_spans']} unique spans")
            print(f"      Exact match rate: {pair['exact_match_rate']:.2%}")

    # Create visualization
    plt.figure(figsize=(10, 6))

    # Prepare data for bar chart
    doc_ids = []
    doc_rates = []

    for doc_id, doc_results in exact_match_results['per_document'].items():
        doc_ids.append(f"Doc {doc_id}")
        doc_rates.append(doc_results['exact_match_rate'] * 100)

    # Add overall rate
    doc_ids.append("Overall")
    doc_rates.append(overall['exact_match_rate'] * 100)

    # Plot
    bars = plt.bar(doc_ids, doc_rates, color='skyblue')
    plt.axhline(y=overall['exact_match_rate'] * 100, color='r', linestyle='--', alpha=0.7, label='Overall Average')

    # Add value labels on top of bars
    for bar in bars:
        height = bar.get_height()
        plt.text(bar.get_x() + bar.get_width()/2., height + 1,
                f'{height:.1f}%', ha='center', va='bottom')

    plt.title('Exact Match Rate by Document')
    plt.xlabel('Document')
    plt.ylabel('Exact Match Rate (%)')
    # Leave space for labels; keep a non-zero axis height when every rate is 0
    plt.ylim(0, max(max(doc_rates) * 1.2, 10))
    plt.legend()
    plt.tight_layout()

    plt.show()

In [None]:
# Calculate exact matches
exact_match_results = calculate_exact_matches(doc_id_to_annotations)

#### **Exact Matching for Span Boundaries - Visualizations**

In [None]:
# Display results
display_exact_match_results(exact_match_results)

### 5.2 F1 for Partial Credit on Spans

**How F1 Calculation Works**

The core concept: Instead of requiring exact span boundaries to match, we:
- Give credit when spans overlap
- Measure how much of each annotator's marked text is also marked by the other annotator (precision and recall)
- Combine these into an F1 score
- Values range from 0 (no overlap) to 1 (perfect overlap)

For each annotator pair, computed over character positions:
- Precision: What percentage of the characters marked by annotator 1 are also marked by annotator 2
- Recall: What percentage of the characters marked by annotator 2 are also marked by annotator 1
- F1: The harmonic mean of precision and recall (balances both)

Matching process:
- All of an annotator's spans in a document are merged into one set of marked character positions, so no one-to-one span matching is needed
- Overlapping span pairs are still counted, but only for reporting; they do not affect the scores
- Pairs in which either annotator marked no spans are skipped

Aggregation:
- Calculate precision, recall, and F1 for each annotator pair
- Calculate averages per document and overall across all documents

**What You'll Learn From F1 Scores**

F1 scores give a more nuanced view of agreement than exact matching. For example, if one annotator marks "aerial carbon" as an error while another marks "aerial carbon in trees", F1 would give partial credit for this overlap.
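
The "aerial carbon" example can be worked through with a short sketch that follows the character-level calculation used in `calculate_span_f1`. The helper `char_level_f1` and the surrounding sentence fragment are assumptions made for illustration.

```python
def char_level_f1(spans_a, spans_b):
    """Precision, recall, and F1 over the character positions each annotator marked."""
    chars_a = {i for start, end in spans_a for i in range(start, end)}
    chars_b = {i for start, end in spans_b for i in range(start, end)}
    shared = len(chars_a & chars_b)
    precision = shared / len(chars_a) if chars_a else 0.0
    recall = shared / len(chars_b) if chars_b else 0.0
    total = precision + recall
    f1 = 2 * precision * recall / total if total else 0.0
    return precision, recall, f1

# Hypothetical sentence fragment containing both annotated phrases
text = "the aerial carbon in trees"
start = text.index("aerial carbon")
spans_a = [(start, start + len("aerial carbon"))]           # 13 characters
spans_b = [(start, start + len("aerial carbon in trees"))]  # 22 characters

precision, recall, f1 = char_level_f1(spans_a, spans_b)
print(f"precision={precision:.2f} recall={recall:.2f} f1={f1:.2f}")
# precision=1.00 recall=0.59 f1=0.74
```

Exact matching would score this pair as 0, whereas the character-level F1 is about 0.74.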

F1 scores are likely to be much higher than exact match percentages, showing that:
- Annotators are often identifying the same errors but choosing slightly different span boundaries
- Some annotator pairs have more similar annotation styles than others
- Some documents may have more clearly defined errors, leading to higher agreement

The F1 visualization will help you identify which annotators tend to agree more with others, and which documents have the most consistent annotations.

#### **F1 for Partial Credit on Spans - Code**

In [None]:
def calculate_span_f1(doc_id_to_annotations, overlap_threshold=0.0):
    """
    Calculate F1 scores for partial span matches between annotators.

    Args:
        doc_id_to_annotations: Dictionary mapping doc_ids to lists of annotations
        overlap_threshold: Minimum overlap ratio for span-level matching. Currently unused:
            scores are computed at character level, so any overlap contributes.

    Returns:
        Dictionary with F1 statistics
    """
    # Results structure
    results = {
        'per_document': {},
        'overall': {
            'total_comparisons': 0,
            'avg_precision': 0,
            'avg_recall': 0,
            'avg_f1': 0
        }
    }

    overall_precisions = []
    overall_recalls = []
    overall_f1s = []

    # Process each document
    for doc_id, annotations in doc_id_to_annotations.items():
        # Group annotations by annotator
        annotator_to_spans = {}

        for annotation in annotations:
            annotator = annotation.get('annotator')
            if not annotator:
                continue

            # Get all span boundaries for this annotator
            spans = []
            for label in annotation.get('enriched_labels', []):
                spans.append({
                    'start': label.get('start'),
                    'end': label.get('end'),
                    'text': label.get('text'),
                    'length': label.get('end') - label.get('start'),
                    'category': label.get('labels', [''])[0] if label.get('labels') else '',
                    'subcategory': label.get('subcategory', ''),
                    'severity': label.get('severity', '')
                })

            annotator_to_spans[annotator] = spans

        # Skip documents with fewer than 2 annotators
        if len(annotator_to_spans) < 2:
            continue

        # Initialize document results
        doc_results = {
            'annotator_pairs': [],
            'avg_precision': 0,
            'avg_recall': 0,
            'avg_f1': 0
        }

        doc_precisions = []
        doc_recalls = []
        doc_f1s = []

        # Compare each pair of annotators
        for annotator1, annotator2 in combinations(annotator_to_spans.keys(), 2):
            spans1 = annotator_to_spans[annotator1]
            spans2 = annotator_to_spans[annotator2]

            # Skip if either annotator has no spans
            if not spans1 or not spans2:
                continue

            # Create binary arrays representing each character position
            # This is a more rigorous way to calculate true precision/recall
            max_pos = max([span['end'] for spans in [spans1, spans2] for span in spans])
            array1 = np.zeros(max_pos + 1, dtype=bool)
            array2 = np.zeros(max_pos + 1, dtype=bool)

            # Mark positions covered by each annotator's spans
            for span in spans1:
                array1[span['start']:span['end']] = True
            for span in spans2:
                array2[span['start']:span['end']] = True

            # Calculate true precision and recall using the character arrays
            # Precision: proportion of characters marked by annotator1 that are also marked by annotator2
            # Recall: proportion of characters marked by annotator2 that are also marked by annotator1
            true_positives = np.sum(array1 & array2)
            total_predicted = np.sum(array1)
            total_actual = np.sum(array2)

            precision = true_positives / total_predicted if total_predicted > 0 else 0
            recall = true_positives / total_actual if total_actual > 0 else 0
            f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

            # Track match statistics for reporting purposes
            # Find overlapping spans for detailed reporting
            matches = []
            for span1 in spans1:
                for span2 in spans2:
                    overlap_start = max(span1['start'], span2['start'])
                    overlap_end = min(span1['end'], span2['end'])

                    if overlap_start < overlap_end:  # Spans overlap
                        matches.append({
                            'span1': span1,
                            'span2': span2,
                            'overlap_length': overlap_end - overlap_start
                        })

            # Store pair results
            pair_result = {
                'annotator1': annotator1,
                'annotator2': annotator2,
                'spans1': len(spans1),
                'spans2': len(spans2),
                'overlap_matches': len(matches),
                'precision': precision,
                'recall': recall,
                'f1': f1
            }

            doc_precisions.append(precision)
            doc_recalls.append(recall)
            doc_f1s.append(f1)

            overall_precisions.append(precision)
            overall_recalls.append(recall)
            overall_f1s.append(f1)

            doc_results['annotator_pairs'].append(pair_result)

        # Calculate document averages
        if doc_precisions:
            doc_results['avg_precision'] = sum(doc_precisions) / len(doc_precisions)
            doc_results['avg_recall'] = sum(doc_recalls) / len(doc_recalls)
            doc_results['avg_f1'] = sum(doc_f1s) / len(doc_f1s)

        # Store document results
        results['per_document'][doc_id] = doc_results

    # Calculate overall averages
    if overall_precisions:
        results['overall']['avg_precision'] = sum(overall_precisions) / len(overall_precisions)
        results['overall']['avg_recall'] = sum(overall_recalls) / len(overall_recalls)
        results['overall']['avg_f1'] = sum(overall_f1s) / len(overall_f1s)
        results['overall']['total_comparisons'] = len(overall_f1s)

    return results

In [None]:
def display_span_f1_results(f1_results):
    """
    Display F1 results in a readable format with visualizations.

    Args:
        f1_results: Output from calculate_span_f1 function
    """
    print("=" * 60)
    print("F1 ANALYSIS FOR PARTIAL SPAN MATCHES")
    print("=" * 60)

    # Overall results
    overall = f1_results['overall']
    print(f"\nOVERALL RESULTS:")
    print(f"Total annotator pair comparisons: {overall['total_comparisons']}")
    print(f"Average Precision: {overall['avg_precision']:.2%}")
    print(f"Average Recall: {overall['avg_recall']:.2%}")
    print(f"Average F1 Score: {overall['avg_f1']:.2%}")

    # Per-document results
    print("\nRESULTS BY DOCUMENT:")
    for doc_id, doc_results in f1_results['per_document'].items():
        print(f"\nDocument {doc_id}:")
        print(f"  Average F1 Score: {doc_results['avg_f1']:.2%}")
        print(f"  Average Precision: {doc_results['avg_precision']:.2%}")
        print(f"  Average Recall: {doc_results['avg_recall']:.2%}")

        # Results by annotator pair
        print("\n  Results by annotator pair:")
        for pair in sorted(doc_results['annotator_pairs'], key=lambda x: x['f1'], reverse=True):
            print(f"    Annotators {pair['annotator1']} and {pair['annotator2']}:")
            print(f"      Spans: {pair['spans1']} and {pair['spans2']}")
            print(f"      Overlapping span pairs: {pair['overlap_matches']}")
            print(f"      Precision: {pair['precision']:.2%}")
            print(f"      Recall: {pair['recall']:.2%}")
            print(f"      F1 Score: {pair['f1']:.2%}")

    # Create visualization
    plt.figure(figsize=(12, 8))

    # Prepare data for bar chart
    doc_ids = []
    doc_f1s = []
    doc_precisions = []
    doc_recalls = []

    for doc_id, doc_results in f1_results['per_document'].items():
        doc_ids.append(f"Doc {doc_id}")
        doc_f1s.append(doc_results['avg_f1'] * 100)
        doc_precisions.append(doc_results['avg_precision'] * 100)
        doc_recalls.append(doc_results['avg_recall'] * 100)

    # Add overall scores
    doc_ids.append("Overall")
    doc_f1s.append(overall['avg_f1'] * 100)
    doc_precisions.append(overall['avg_precision'] * 100)
    doc_recalls.append(overall['avg_recall'] * 100)

    # Set bar width
    barWidth = 0.25

    # Set position of bars on X axis
    r1 = np.arange(len(doc_ids))
    r2 = [x + barWidth for x in r1]
    r3 = [x + barWidth for x in r2]

    # Create grouped bars
    plt.bar(r1, doc_f1s, width=barWidth, color='blue', label='F1 Score')
    plt.bar(r2, doc_precisions, width=barWidth, color='green', label='Precision')
    plt.bar(r3, doc_recalls, width=barWidth, color='orange', label='Recall')

    # Add labels and legend
    plt.xlabel('Document')
    plt.ylabel('Score (%)')
    plt.title('F1, Precision, and Recall by Document')
    plt.xticks([r + barWidth for r in range(len(doc_ids))], doc_ids)
    plt.legend()

    plt.tight_layout()
    plt.show()

In [None]:
# Calculate F1 scores for partial matches
# Scores are computed at character level, so any overlap earns partial credit
# (overlap_threshold is accepted but not currently applied)
f1_results = calculate_span_f1(doc_id_to_annotations, overlap_threshold=0.0)

#### **F1 for Partial Credit on Spans - Visualizations**

In [None]:
# Display results
display_span_f1_results(f1_results)

### 5.3 Kappa for Category Agreement

**How it works**
- Cohen's Kappa (two annotators) or Fleiss' Kappa (multiple annotators) measures agreement on categories while accounting for chance agreement; this notebook computes Cohen's Kappa for each pair of annotators
- Kappa ranges from -1 to 1 and is interpreted here as follows:
  - ≤ 0.20: Poor agreement
  - 0.21-0.40: Fair agreement
  - 0.41-0.60: Moderate agreement
  - 0.61-0.80: Substantial agreement
  - 0.81-1.00: Almost perfect agreement
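
To make the chance correction concrete, here is a minimal pure-Python sketch of Cohen's Kappa on hypothetical labels. The notebook itself relies on scikit-learn's `cohen_kappa_score`; the function `cohen_kappa` below is an illustrative stand-in, not the notebook's implementation.

```python
from collections import Counter

def cohen_kappa(labels_a, labels_b):
    """Cohen's Kappa: (observed - expected agreement) / (1 - expected agreement)."""
    n = len(labels_a)
    observed = sum(a == b for a, b in zip(labels_a, labels_b)) / n
    freq_a, freq_b = Counter(labels_a), Counter(labels_b)
    # Chance agreement: probability that both annotators pick the same label independently
    expected = sum(freq_a[c] * freq_b[c] for c in freq_a) / (n * n)
    if expected == 1:
        return float("nan")  # Undefined when both annotators use one identical label
    return (observed - expected) / (1 - expected)

# Hypothetical category labels for six overlapping spans
labels_a = ["Accuracy", "Accuracy", "Terminology", "Style", "Accuracy", "Terminology"]
labels_b = ["Accuracy", "Terminology", "Terminology", "Style", "Accuracy", "Accuracy"]

kappa = cohen_kappa(labels_a, labels_b)
print(f"{kappa:.4f}")  # 0.4545
```

The annotators agree on 4 of 6 spans (67%), but 39% agreement would be expected by chance, so Kappa is about 0.45, which falls in the "Moderate agreement" band.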

**What you could potentially learn**
- How consistently annotators classify errors into categories (Accuracy, Terminology, Style)
- How consistently annotators use subcategories (e.g., "TERM: Wrong term")
- How consistently annotators assign severity levels (Minor vs. Major)
- Whether certain categories have better agreement than others

This is particularly valuable for the MQM framework since it has a complex hierarchy of error types.

**Challenges inherent in this approach**
- Low exact match rates and F1 scores present serious challenges for calculating meaningful Kappa agreement
- Agreement is likely to drop further at each successive level of annotation:
  - Span agreement
  - Label agreement
  - Subcategory agreement
  - Severity agreement

**Parameters implemented to respond to challenges**
- Finding span pairs with substantial overlap (at least 50% of the shorter span)
- Only calculating Kappa for these overlapping spans
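
The overlap filter can be sketched as follows, assuming spans are given as `(start, end)` character offsets and the ratio is taken relative to the shorter span, as in `calculate_category_agreement`. The helper name `overlap_ratio` is hypothetical.

```python
def overlap_ratio(span_a, span_b):
    """Overlap length divided by the length of the shorter span."""
    overlap = min(span_a[1], span_b[1]) - max(span_a[0], span_b[0])
    shorter = min(span_a[1] - span_a[0], span_b[1] - span_b[0])
    return max(overlap, 0) / shorter if shorter > 0 else 0.0

print(overlap_ratio((10, 20), (15, 40)))  # 0.5 -> kept (meets the 50% threshold)
print(overlap_ratio((10, 20), (18, 40)))  # 0.2 -> dropped
print(overlap_ratio((0, 4), (0, 100)))    # 1.0 -> kept
```

The last case shows a consequence of this design: a short span fully contained in a much longer one counts as a complete overlap, so the two spans are compared even though their lengths differ greatly.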

**Recommendations**

Where exact match rates, F1 scores, and category-level Kappa scores are all low, proceeding to Kappa scoring for subcategory agreement and severity agreement is not recommended.

#### **Kappa for Category Agreement - Code**

In [None]:
def calculate_category_agreement(doc_id_to_annotations, attribute='labels'):
    """
    Calculate Cohen's Kappa for agreement on categories, subcategories, or severity levels
    across annotator pairs.

    Args:
        doc_id_to_annotations: Dictionary mapping doc_ids to lists of annotations
        attribute: The attribute to measure agreement on ('labels', 'subcategory', or 'severity')

    Returns:
        Dictionary with agreement statistics
    """
    # Results structure
    results = {
        'per_document': {},
        'overall': {
            'avg_kappa': 0,
            'category_counts': {},
            'pair_count': 0
        }
    }

    # Overall statistics
    all_kappas = []
    all_weights = []
    all_category_counts = {}

    # Process each document
    for doc_id, annotations in doc_id_to_annotations.items():
        # Group annotations by annotator
        annotator_to_spans = {}

        for annotation in annotations:
            annotator = annotation.get('annotator')
            if not annotator:
                continue

            # Extract spans for this annotator
            spans = []
            for label in annotation.get('enriched_labels', []):
                span = {
                    'start': label.get('start'),
                    'end': label.get('end'),
                    'text': label.get('text')
                }

                # Extract the relevant attribute value
                if attribute == 'labels':
                    # For labels, take the first item from the list
                    span['category'] = label.get('labels', [''])[0] if label.get('labels') else ''
                elif attribute == 'subcategory':
                    span['category'] = label.get('subcategory', '')
                elif attribute == 'severity':
                    span['category'] = label.get('severity', '')
                else:
                    continue

                spans.append(span)

            # Store spans for this annotator
            annotator_to_spans[annotator] = spans

        # Skip documents with fewer than 2 annotators
        if len(annotator_to_spans) < 2:
            continue

        # Initialize document results
        doc_results = {
            'annotator_pairs': [],
            'avg_kappa': 0,
            'category_counts': {},
            'pair_count': 0
        }

        # Calculate agreement for each pair of annotators
        doc_kappas = []
        doc_weights = []
        doc_category_counts = {}

        for annotator1, annotator2 in combinations(annotator_to_spans.keys(), 2):
            spans1 = annotator_to_spans[annotator1]
            spans2 = annotator_to_spans[annotator2]

            # Find overlapping spans
            overlapping_spans = []

            for span1 in spans1:
                for span2 in spans2:
                    # Check for overlap
                    if span1['start'] < span2['end'] and span2['start'] < span1['end']:
                        # Calculate overlap
                        overlap_start = max(span1['start'], span2['start'])
                        overlap_end = min(span1['end'], span2['end'])
                        overlap_len = overlap_end - overlap_start

                        # Calculate overlap ratio relative to the smaller span
                        span1_len = span1['end'] - span1['start']
                        span2_len = span2['end'] - span2['start']
                        min_len = min(span1_len, span2_len)
                        overlap_ratio = overlap_len / min_len if min_len > 0 else 0

                        # Only consider substantial overlaps (at least 50%)
                        if overlap_ratio >= 0.5:
                            overlapping_spans.append({
                                'span1': span1,
                                'span2': span2,
                                'overlap_ratio': overlap_ratio
                            })

            # Skip pairs with too few overlapping spans for Kappa
            if len(overlapping_spans) < 2:
                continue

            # Extract categories for overlapping spans
            categories1 = [overlap['span1']['category'] for overlap in overlapping_spans]
            categories2 = [overlap['span2']['category'] for overlap in overlapping_spans]

            # Count category frequencies
            for category in categories1 + categories2:
                doc_category_counts[category] = doc_category_counts.get(category, 0) + 1
                all_category_counts[category] = all_category_counts.get(category, 0) + 1

            # Calculate Cohen's Kappa
            try:
                kappa = cohen_kappa_score(categories1, categories2)
            except Exception as e:
                # Skip pairs with invalid Kappa calculation
                print(f"Warning: Could not calculate Kappa for annotators {annotator1} and {annotator2} - {e}")
                continue

            # Kappa can be NaN when both annotators use the same single label;
            # skip such pairs so NaN does not propagate into the weighted averages
            if np.isnan(kappa):
                print(f"Warning: Kappa is undefined for annotators {annotator1} and {annotator2}; skipping pair")
                continue

            weight = len(overlapping_spans)

            doc_kappas.append(kappa)
            doc_weights.append(weight)

            all_kappas.append(kappa)
            all_weights.append(weight)

            # Record pair results
            pair_result = {
                'annotator1': annotator1,
                'annotator2': annotator2,
                'overlapping_spans': len(overlapping_spans),
                'kappa': kappa
            }

            doc_results['annotator_pairs'].append(pair_result)
            doc_results['pair_count'] += 1

        # Calculate weighted average Kappa for this document
        if doc_kappas:
            total_weight = sum(doc_weights)
            doc_results['avg_kappa'] = sum(k * w for k, w in zip(doc_kappas, doc_weights)) / total_weight
            doc_results['category_counts'] = doc_category_counts

        # Store document results
        results['per_document'][doc_id] = doc_results

    # Calculate overall weighted average Kappa
    if all_kappas:
        total_weight = sum(all_weights)
        results['overall']['avg_kappa'] = sum(k * w for k, w in zip(all_kappas, all_weights)) / total_weight
        results['overall']['category_counts'] = all_category_counts
        results['overall']['pair_count'] = len(all_kappas)

    return results

In [None]:
def display_category_agreement(agreement_results, attribute_name):
    """
    Display category agreement results in a readable format with visualizations.

    Args:
        agreement_results: Output from calculate_category_agreement function
        attribute_name: Name of the attribute (e.g., 'Category', 'Subcategory', 'Severity')
    """
    print("=" * 60)
    print(f"COHEN'S KAPPA ANALYSIS FOR {attribute_name.upper()} AGREEMENT")
    print("=" * 60)

    # Overall results
    overall = agreement_results['overall']
    print(f"\nOVERALL RESULTS:")

    # Interpret Kappa value
    kappa = overall['avg_kappa']
    interpretation = "Poor agreement"
    if kappa > 0.80:
        interpretation = "Almost perfect agreement"
    elif kappa > 0.60:
        interpretation = "Substantial agreement"
    elif kappa > 0.40:
        interpretation = "Moderate agreement"
    elif kappa > 0.20:
        interpretation = "Fair agreement"

    print(f"Average Cohen's Kappa: {kappa:.4f} - {interpretation}")
    print(f"Total annotator pairs analyzed: {overall['pair_count']}")

    # Category-specific results
    print(f"\n{attribute_name.upper()} FREQUENCY:")
    total_count = sum(overall['category_counts'].values())

    # Sort categories by frequency
    sorted_categories = sorted(overall['category_counts'].items(), key=lambda x: x[1], reverse=True)

    for category, count in sorted_categories:
        frequency = count / total_count if total_count > 0 else 0
        print(f"  {category}: {frequency:.2%}")

    # Per-document results
    print("\nRESULTS BY DOCUMENT:")
    for doc_id, doc_results in agreement_results['per_document'].items():
        doc_kappa = doc_results['avg_kappa']
        doc_interpretation = "Poor agreement"
        if doc_kappa > 0.80:
            doc_interpretation = "Almost perfect agreement"
        elif doc_kappa > 0.60:
            doc_interpretation = "Substantial agreement"
        elif doc_kappa > 0.40:
            doc_interpretation = "Moderate agreement"
        elif doc_kappa > 0.20:
            doc_interpretation = "Fair agreement"

        print(f"\nDocument {doc_id}:")
        print(f"  Kappa: {doc_kappa:.4f} - {doc_interpretation}")
        print(f"  Annotator pairs: {doc_results['pair_count']}")

        # Display results by annotator pair
        if doc_results['annotator_pairs']:
            print("\n  Results by annotator pair:")
            for pair in sorted(doc_results['annotator_pairs'], key=lambda x: x['kappa'], reverse=True):
                print(f"    Annotators {pair['annotator1']} and {pair['annotator2']}:")
                print(f"      Overlapping spans: {pair['overlapping_spans']}")
                print(f"      Kappa: {pair['kappa']:.4f}")

    # Create visualization
    plt.figure(figsize=(12, 8))

    # Prepare data for bar chart
    doc_ids = [f"Doc {doc_id}" for doc_id in agreement_results['per_document'].keys()]
    doc_kappas = [doc_result['avg_kappa'] for doc_result in agreement_results['per_document'].values()]

    # Add overall kappa
    doc_ids.append("Overall")
    doc_kappas.append(overall['avg_kappa'])

    # Color bars based on Kappa interpretation
    colors = []
    for value in doc_kappas:  # separate name so the outer `kappa` is not shadowed
        if value > 0.80:
            colors.append('darkgreen')
        elif value > 0.60:
            colors.append('green')
        elif value > 0.40:
            colors.append('yellow')
        elif value > 0.20:
            colors.append('orange')
        else:
            colors.append('red')

    # Create bar chart
    bars = plt.bar(doc_ids, doc_kappas, color=colors)

    # Add horizontal lines for Kappa interpretation thresholds
    plt.axhline(y=0.20, color='r', linestyle='--', alpha=0.5, label='Poor/Fair')
    plt.axhline(y=0.40, color='orange', linestyle='--', alpha=0.5, label='Fair/Moderate')
    plt.axhline(y=0.60, color='y', linestyle='--', alpha=0.5, label='Moderate/Substantial')
    plt.axhline(y=0.80, color='g', linestyle='--', alpha=0.5, label='Substantial/Almost Perfect')

    # Add value labels on top of bars
    for bar in bars:
        height = bar.get_height()
        plt.text(bar.get_x() + bar.get_width()/2., height + 0.02,
                f'{height:.2f}', ha='center', va='bottom')

    # Add labels and title
    plt.xlabel('Document')
    plt.ylabel("Cohen's Kappa")
    plt.title(f"Average Cohen's Kappa for {attribute_name} Agreement by Document")
    # Kappa can be negative, so extend the axis below zero when needed
    plt.ylim(min(0.0, min(doc_kappas) - 0.05), 1.05)
    plt.legend()
    plt.tight_layout()

    # Create a second chart for category frequencies
    if sorted_categories:
        plt.figure(figsize=(14, 6))

        # Prepare data (show top 10 categories)
        top_n = min(10, len(sorted_categories))
        top_categories = [cat for cat, _ in sorted_categories[:top_n]]
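        # Guard against a zero total, mirroring the check used when printing frequencies
        frequencies = [count / total_count if total_count > 0 else 0 for _, count in sorted_categories[:top_n]]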

        # Create horizontal bar chart
        bars = plt.barh(top_categories, frequencies, color='skyblue')

        # Add value labels
        for bar in bars:
            width = bar.get_width()
            plt.text(width + 0.01, bar.get_y() + bar.get_height()/2,
                    f'{width:.1%}', va='center')

        # Add labels and title
        plt.xlabel('Frequency')
        plt.ylabel(attribute_name)
        plt.title(f"{attribute_name} Frequency Distribution")
        plt.xlim(0, max(frequencies) * 1.1 + 0.05)  # extra room for the value labels
        plt.tight_layout()

    plt.show()
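
The threshold ladder that maps a Kappa value to a label appears twice in `display_category_agreement`. As a sketch only, the same mapping could live in one small helper. `interpret_kappa` is a hypothetical name and is not defined elsewhere in this notebook. It reproduces the labels and the strict `>` comparisons used above.

```python
def interpret_kappa(kappa):
    """Map a Kappa value to the agreement labels used in this notebook (hypothetical helper)."""
    # Thresholds are checked from highest to lowest; the first match wins.
    bands = [
        (0.80, "Almost perfect agreement"),
        (0.60, "Substantial agreement"),
        (0.40, "Moderate agreement"),
        (0.20, "Fair agreement"),
    ]
    for threshold, label in bands:
        if kappa > threshold:
            return label
    # Anything at or below 0.20, including negative values
    return "Poor agreement"


for value in (0.85, 0.60, 0.41, -0.10):
    print(f"{value:+.2f} -> {interpret_kappa(value)}")
```

Because the comparisons are strict, a value of exactly 0.60 falls into the "Moderate" band rather than "Substantial", which matches the behavior of the display function.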

In [None]:
def analyze_category_agreement(doc_id_to_annotations):
    """
    Calculate and display agreement metrics for main error categories.

    Args:
        doc_id_to_annotations: Dictionary mapping doc_ids to lists of annotations

    Returns:
        The calculated agreement metrics
    """
    print("\nCalculating agreement for main categories...")
    category_agreement = calculate_category_agreement(doc_id_to_annotations, attribute='labels')
    display_category_agreement(category_agreement, attribute_name='Category')

    return category_agreement
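
For orientation on what the reported numbers mean, here is a minimal by-hand computation of Cohen's Kappa for two annotators. It is an illustration, not the notebook's code path. The notebook imports `cohen_kappa_score` from scikit-learn, and how `calculate_category_agreement` uses it is not shown in this section. The function name `cohen_kappa_by_hand` and the toy labels are hypothetical.

```python
from collections import Counter


def cohen_kappa_by_hand(labels_a, labels_b):
    """Cohen's Kappa for two equal-length label lists (illustrative sketch)."""
    n = len(labels_a)
    # Observed agreement: share of items where both annotators chose the same label.
    p_o = sum(a == b for a, b in zip(labels_a, labels_b)) / n
    # Expected chance agreement from each annotator's label distribution.
    counts_a, counts_b = Counter(labels_a), Counter(labels_b)
    p_e = sum(counts_a[label] * counts_b[label] for label in counts_a) / (n * n)
    if p_e == 1:
        return float("nan")  # undefined when chance agreement is already total
    return (p_o - p_e) / (1 - p_e)


# Toy category labels for six overlapping spans (made-up data).
annotator_1 = ["Accuracy", "Accuracy", "Terminology", "Style", "Terminology", "Accuracy"]
annotator_2 = ["Accuracy", "Terminology", "Terminology", "Style", "Terminology", "Accuracy"]

toy_kappa = cohen_kappa_by_hand(annotator_1, annotator_2)
print(f"Toy Kappa: {toy_kappa:.3f}")
```

In this toy case the annotators agree on 5 of 6 spans (observed agreement 5/6) while chance agreement is 13/36, giving a Kappa of 17/23, roughly 0.739.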

#### **Kappa for Category Agreement - Visualization**

In [None]:
# Calculate and display Kappa for error categories
category_agreement = analyze_category_agreement(doc_id_to_annotations)