# Text Analysis

This notebook analyzes multilingual medical text to inform text cleaning rules:
- Document structure analysis
- Duplicate document detection
- Annotation and label distribution
- Sentence-level language detection
- Text cleaning pattern analysis

## Setup

In [None]:
import os
import sys
import json
import re
import pickle
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter, defaultdict
from typing import Dict, List, Tuple, Set, Any, Optional
import spacy
from langdetect import detect, LangDetectException

PATH_ROOT = os.path.dirname(os.getcwd())  # Project root: parent of the current working directory

# Create directories for output
os.makedirs(os.path.join(PATH_ROOT, "data", "processed"), exist_ok=True)

try:  # Load spaCy models
    nlp_es = spacy.load("es_core_news_sm")
    nlp_ca = spacy.load("ca_core_news_sm")
    print("spaCy models loaded successfully\n")
except OSError:
    # NOTE: nlp_es / nlp_ca stay undefined on this path, so the cells that use
    # them will fail until the models below are installed
    print("ERROR: You need to download spaCy models")
    print("python -m spacy download es_core_news_sm")
    print("python -m spacy download ca_core_news_sm")

# Add project root to system path. The absolute PATH_ROOT is used instead of
# the relative ".." so the entry stays valid if the working directory changes,
# and the membership check keeps re-runs of this cell from appending it again
if PATH_ROOT not in sys.path:
    sys.path.append(PATH_ROOT)

try:  # Import utility functions
    # NOTE(review): the star imports hide which helper comes from which module;
    # switch to explicit names once the contents of utils are confirmed
    from utils.json import *
    from utils.language import *
except ImportError:
    print("Warning: Unable to import utility modules. Some functions may not work")

# File paths
FILE_TRAIN = os.path.join(PATH_ROOT, "data", "raw", "train.json")
FILE_TEST = os.path.join(PATH_ROOT, "data", "raw", "test.json")

In [None]:
# Load the first dataset that yields documents, trying train before test
data = []
for file_path in [FILE_TRAIN, FILE_TEST]:
    # load_json_data is not defined in this notebook (presumably star-imported
    # from utils - TODO confirm). A falsy result such as None is normalised to
    # an empty list so that later cells can always iterate over `data`
    data = load_json_data(file_path) or []
    if data:
        print(f"Loaded {len(data)} documents")
        break

if not data:
    print("Warning: no documents could be loaded from the train or test files")

## Document Structure Analysis

In [None]:
# Print the section banner, then inspect the loaded documents
print("DOCUMENT STRUCTURE ANALYSIS\n")
# explore_json_structure is not defined in this notebook; it presumably comes
# from the star-imported utils modules (TODO confirm). Its return value is kept
# in `structure`
structure = explore_json_structure(data)  # Explore document structure

### Duplicate Document Analysis

In [None]:
print("DUPLICATE DOCUMENT ANALYSIS\n")

# Count each unique document ID; documents without data.id are ignored.
# `data or []` keeps this cell working when no dataset was loaded
true_counts = Counter(
    doc["data"]["id"]
    for doc in (data or [])
    if "data" in doc and "id" in doc["data"]
)

# Find actually duplicated IDs (seen more than once)
actual_duplicates = {doc_id: count for doc_id, count in true_counts.items() if count > 1}

if not data:
    # Nothing to analyse: report it and skip the statistics below
    print("No data available for duplicate analysis")
else:
    print(f"Number of documents: {len(data)}")
    print(f"Number of unique document IDs: {len(true_counts)}")
    print(f"Number of truly duplicated IDs: {len(actual_duplicates)}")
    print()

    if actual_duplicates:
        print("True duplicate document IDs:")
        # Most frequent first; the sort is stable, so ties keep first-seen order
        for doc_id, count in sorted(actual_duplicates.items(), key=lambda item: item[1], reverse=True):
            print(f"ID {doc_id} appears {count} times")
    else:
        print("No duplicate document IDs found")

### Annotation Analysis

In [None]:
def extract_annotations(documents: Optional[List[Dict[str, Any]]] = None) -> pd.DataFrame:
    """
    Extract annotations from documents

    Each document is expected to look like
    {"data": {"id": ..., "text": ...}, "predictions": [{"result": [{"id": ..., "value": {"start": ..., "end": ..., "labels": [...]}}]}]}.
    One row is produced per (span, label) pair.

    Args:
        documents: Documents to scan. When omitted, the notebook-level `data`
            variable is used, which keeps existing no-argument calls working.

    Returns:
        DataFrame with columns doc_id, annotation_id, start, end, label, text
        and length (empty, but with those columns, when nothing is found).
    """

    if documents is None:
        documents = data  # Dataset loaded earlier in the notebook

    columns = ["doc_id", "annotation_id", "start", "end", "label", "text", "length"]
    rows = []

    for doc in documents:
        if "data" not in doc or "id" not in doc["data"] or "text" not in doc["data"]:
            continue

        doc_id = doc["data"]["id"]
        text = doc["data"]["text"]

        # Missing or empty "predictions" / "result" entries simply yield no rows
        for pred in doc.get("predictions") or []:
            for result in pred.get("result") or []:
                value = result.get("value") or {}
                labels = value.get("labels") or []
                start = value.get("start")
                end = value.get("end")

                if not labels or start is None or end is None:
                    continue

                # Reject spans outside the text. Negative or inverted offsets
                # are rejected too: slicing would otherwise count from the end
                # of the text or silently return an empty string
                if not (0 <= start < len(text) and start <= end <= len(text)):
                    continue

                annotation_text = text[start:end]
                for label in labels:
                    rows.append(
                        {
                            "doc_id": doc_id,
                            "annotation_id": result.get("id"),
                            "start": start,
                            "end": end,
                            "label": label,
                            "text": annotation_text,
                            "length": len(annotation_text),
                        }
                    )

    # Passing `columns` gives the empty case the same schema as the populated one
    return pd.DataFrame(rows, columns=columns)


annotations_df = extract_annotations()  # Extract annotations


print("LABEL DISTRIBUTION ANALYSIS\n")

# Get label counts (an empty Series when there are no annotations)
label_counts = annotations_df["label"].value_counts()

if annotations_df.empty:
    # Skip the statistics, plot and samples: there is nothing to show
    print("No annotations found for analysis")
else:
    total_annotations = len(annotations_df)

    print(f"Found {total_annotations} annotations across {annotations_df.doc_id.nunique()} documents")
    print("\nLabel distribution:")
    for label, count in label_counts.items():
        print(f"  - {label}: {count} annotations ({count/total_annotations*100:.1f}%)")

    plt.figure(figsize=(10, 6))
    ax = sns.barplot(x=label_counts.index, y=label_counts.values, hue=label_counts.index, palette="viridis", legend=False)
    plt.title("Distribution of Annotation Labels")
    plt.xlabel("Label")
    plt.ylabel("Count")
    plt.xticks(rotation=45)

    # Add count values on top of bars. The gap scales with the tallest bar so
    # the labels sit just above the bars whatever the size of the dataset
    label_offset = 0.01 * label_counts.max()
    for i, count in enumerate(label_counts.values):
        ax.text(i, count + label_offset, str(count), ha="center", va="bottom")

    plt.tight_layout()
    plt.show()

    print("\nSample annotations:")
    sample_size = min(5, total_annotations)
    for i, (_, row) in enumerate(annotations_df.sample(sample_size).iterrows(), start=1):
        print(f"  {i}. Label: {row['label']}")
        print(f"\tText: '{row['text']}'\n")

## Sentence-Level Language Detection

In [None]:
def _split_sentences(text: str, min_chars: int) -> List[str]:
    """
    Split text into stripped sentences of at least min_chars characters

    The Spanish spaCy pipeline (nlp_es) is tried first; when it yields no usable
    sentence, a simple split on sentence-ending punctuation is used instead
    """

    candidates = [sent.text.strip() for sent in nlp_es(text).sents]
    usable = [s for s in candidates if s and len(s) >= min_chars]

    if not usable:
        candidates = [part.strip() for part in re.split(r"[.!?]\s+", text)]
        usable = [s for s in candidates if s and len(s) >= min_chars]

    return usable


def _report_sentence_stats(sentences_by_lang, doc_languages, processed) -> None:
    """
    Print sentence and document language statistics and draw the pie chart
    """

    es_count = len(sentences_by_lang["es"])
    ca_count = len(sentences_by_lang["ca"])
    total = es_count + ca_count

    print(f"Extracted {total} sentences from {processed} documents")
    if total == 0:
        return  # No percentages or chart without sentences

    print(f"Spanish: {es_count} sentences ({es_count/total*100:.1f}%)")
    print(f"Catalan: {ca_count} sentences ({ca_count/total*100:.1f}%)")

    # Document language composition
    print("\nDocument language composition:")
    doc_total = doc_languages["total"]
    if doc_total > 0:
        print(f"Spanish only: {doc_languages['es_only']} ({doc_languages['es_only']/doc_total*100:.1f}%)")
        print(f"Catalan only: {doc_languages['ca_only']} ({doc_languages['ca_only']/doc_total*100:.1f}%)")
        print(f"Mixed languages: {doc_languages['mixed']} ({doc_languages['mixed']/doc_total*100:.1f}%)")

    plt.figure(figsize=(10, 6))  # Visual distribution of sentences by language
    plt.pie([es_count, ca_count], labels=["Spanish", "Catalan"], autopct="%1.1f%%", colors=["#3274A1", "#E1812C"], startangle=90)
    plt.axis("equal")
    plt.title("Sentence Language Distribution")
    plt.tight_layout()
    plt.show()


def extract_sentences(data, max_docs=100, min_chars=10) -> Dict[str, List[Dict[str, str]]]:
    """
    Extract and process sentences from documents

    Args:
        data: Iterable of documents shaped like {"data": {"id": ..., "text": ...}}.
        max_docs: Maximum number of successfully processed documents.
        min_chars: Minimum sentence length (after stripping) for a sentence to
            be kept. The same threshold is applied to the spaCy path and to the
            regex fallback.

    Returns:
        Dict with keys "es" and "ca", each a list of
        {"text": sentence, "doc_id": id} entries.
    """

    print(f"Extracting sentences from up to {max_docs} documents...")
    sentences_by_lang = {"es": [], "ca": []}
    doc_languages = {"mixed": 0, "es_only": 0, "ca_only": 0, "total": 0}
    processed = 0

    for i, doc in enumerate(data):
        if processed >= max_docs:
            break

        if "data" not in doc or "text" not in doc["data"] or not doc["data"]["text"]:
            continue

        text = doc["data"]["text"]
        doc_id = doc["data"].get("id", f"doc_{i}")

        try:
            # Collect into a local list first, so that a failure half-way
            # through a document does not leave some of its sentences recorded
            doc_sentences = []
            for sent_text in _split_sentences(text, min_chars):
                sent_lang = detect_sentence_language(sent_text)
                # detect_sentence_language is defined outside this notebook;
                # should it return a code other than "es"/"ca", the sentence is
                # skipped instead of aborting the whole document with KeyError
                if sent_lang in sentences_by_lang:
                    doc_sentences.append((sent_lang, sent_text))
        except Exception as e:
            # Best-effort per-document boundary: report and move on
            print(f"Error processing document {i}: {e}")
            continue

        for sent_lang, sent_text in doc_sentences:
            sentences_by_lang[sent_lang].append({"text": sent_text, "doc_id": doc_id})

        # Update document language statistics. Documents without any kept
        # sentence count towards "total" only
        doc_langs = {sent_lang for sent_lang, _ in doc_sentences}
        if len(doc_langs) > 1:
            doc_languages["mixed"] += 1
        elif "es" in doc_langs:
            doc_languages["es_only"] += 1
        elif "ca" in doc_langs:
            doc_languages["ca_only"] += 1

        doc_languages["total"] += 1
        processed += 1

    _report_sentence_stats(sentences_by_lang, doc_languages, processed)

    return sentences_by_lang


print("SENTENCE EXTRACTION AND LANGUAGE DETECTION\n")
sentences = extract_sentences(data)  # Sentences grouped by language code

# Preview the first three sentences of each language
print("\nSample sentences:")
language_names = {"es": "Spanish", "ca": "Catalan"}
for code, display_name in language_names.items():
    print(f"\n{display_name} examples:")
    preview = sentences[code][:3]
    for position, entry in enumerate(preview, start=1):
        print(f"  {position}. {entry['text']}")

## Text Cleaning Pattern Analysis

### Abbreviation Analysis

In [None]:
print("ABBREVIATION ANALYSIS ")
print("Finding abbreviations that should be expanded during text cleaning\n")

# One or more dot-terminated groups of 1-5 letters followed by whitespace.
# Allowing repeated groups captures multi-part abbreviations such as "t.a."
# whole; with a single group only their last part ("a.") was ever matched
abbrev_pattern = r"\b((?:[a-zA-ZáéíóúüñçÁÉÍÓÚÜÑÇàèìòù]{1,5}\.)+)\s"

# Known expansions keyed by lower-cased abbreviation, then by language code
known_expansions = {
    "dr.": {"es": "doctor", "ca": "doctor"},
    "dra.": {"es": "doctora", "ca": "doctora"},
    "d.": {"es": "de", "ca": "de"},
    "sr.": {"es": "señor", "ca": "senyor"},
    "sra.": {"es": "señora", "ca": "senyora"},
    "t.a.": {"es": "tensión arterial", "ca": "tensió arterial"},
    "hosp.": {"es": "hospital", "ca": "hospital"},
    "tto.": {"es": "tratamiento", "ca": "tractament"},
}

for lang, name in [("es", "SPANISH"), ("ca", "CATALAN")]:
    abbrevs_count = Counter()
    abbrevs_context = {}

    for sent_data in sentences[lang]:
        text = sent_data["text"]
        for match in re.finditer(abbrev_pattern, text):
            abbrev = match.group(1)
            abbrevs_count[abbrev] += 1
            # Store context if we don't have it yet
            if abbrev not in abbrevs_context:
                start = max(0, match.start() - 15)
                end = min(len(text), match.end() + 15)
                abbrevs_context[abbrev] = text[start:end]

    print(f"{name} ABBREVIATIONS:")
    if abbrevs_count:
        for abbrev, count in abbrevs_count.most_common(10):
            context = abbrevs_context.get(abbrev, "")
            print(f"  {abbrev} ({count} occurrences)")
            print(f"    Context: '{context}'")
            # Suggest expansion: use the table when the abbreviation is known,
            # otherwise fall back to the abbreviation with its dots removed
            key = abbrev.lower()
            if key in known_expansions:
                expansion = known_expansions[key][lang]
            else:
                expansion = key.replace(".", "")
            if expansion:
                print(f"    Suggested expansion: '{abbrev}' → '{expansion}'\n")
    else:
        print("  No abbreviations found.")
    print()

### Language-Specific Pattern Analysis

In [None]:
print("LANGUAGE-SPECIFIC PATTERN ANALYSIS ")
print("Finding language-specific patterns like apostrophes and contractions\n")


def _context_window(text, match, margin=15):
    """
    Return the text around a regex match, padded by margin characters on each side
    """

    left = max(0, match.start() - margin)
    right = min(len(text), match.end() + margin)
    return text[left:right]


# --- Catalan apostrophes ---
print("CATALAN APOSTROPHES:")
apostrophe_pattern = r"\b([ldmnst]\'[a-zàèìòùáéíóúçñ]+)"
apostrophe_types = Counter()
apostrophe_examples = defaultdict(list)

for entry in sentences["ca"]:
    sentence = entry["text"]
    for hit in re.finditer(apostrophe_pattern, sentence, re.IGNORECASE):
        elided = hit.group(1).lower()
        apos_type = f"{elided[0]}'"  # Leading letter plus the apostrophe
        apostrophe_types[apos_type] += 1
        stored = apostrophe_examples[apos_type]
        if len(stored) < 3:  # Keep at most three examples per type
            stored.append((elided, _context_window(sentence, hit)))

if not apostrophe_types:
    print("  No Catalan apostrophes found.")
    print()
else:
    # Forms the current cleaning function already rewrites
    handled_forms = {"d'": "de example", "l'": "el example"}
    # Suggested full forms for the remaining leading letters
    suggested_forms = {"n": "en", "s": "se", "m": "me", "t": "te"}

    for apos_type, occurrences in apostrophe_types.most_common():
        print(f"  {apos_type} ({occurrences} occurrences)")
        for example, snippet in apostrophe_examples[apos_type]:
            print(f"    Example: '{example}'")
            print(f"    Context: '{snippet}'\n")

        if apos_type in handled_forms:
            print(f"    Current function transforms: '{apos_type}example' → '{handled_forms[apos_type]}'")
        else:
            first_letter = apos_type[0]
            full_form = suggested_forms.get(first_letter, first_letter + "e")
            print(f"    Suggested transformation: '{apos_type}example' → '{full_form} example'")
        print()

# --- Spanish contractions ---
print("SPANISH CONTRACTIONS:")
contraction_pattern = r"\b(del|al|d el|a el)\b"
contraction_count = Counter()
contraction_examples = defaultdict(list)

for entry in sentences["es"]:
    sentence = entry["text"]
    for hit in re.finditer(contraction_pattern, sentence, re.IGNORECASE):
        form = hit.group(1).lower()
        contraction_count[form] += 1
        stored = contraction_examples[form]
        if len(stored) < 3:  # Keep at most three contexts per form
            stored.append(_context_window(sentence, hit))

if not contraction_count:
    print("  No Spanish contractions found.")
    print()
else:
    # Normalisation rule shown for each split or fused form
    normalisation_rules = {
        "d el": "    Suggested rule: Normalize 'd el' → 'del'",
        "del": "    Suggested rule: Normalize 'd el' → 'del'",
        "a el": "    Suggested rule: Normalize 'a el' → 'al'",
        "al": "    Suggested rule: Normalize 'a el' → 'al'",
    }

    for form, occurrences in contraction_count.most_common():
        print(f"  {form} ({occurrences} occurrences)")
        for snippet in contraction_examples[form]:
            print(f"    Context: '{snippet}'")
        rule = normalisation_rules.get(form)
        if rule is not None:
            print(rule)
        print()

### Patterns to Preserve Analysis

In [None]:
print("PATTERNS TO PRESERVE ANALYSIS ")
print("Identifying patterns that should be preserved during cleaning\n")

preservation_patterns = {  # Define patterns to check
    "dates": r"\d{1,2}[\/-]\d{1,2}[\/-]\d{2,4}",
    "times": r"\d{1,2}:\d{1,2}",
    # "mmHg" is listed before "mm" so it is matched whole, and the trailing \b
    # stops a unit from matching the first letters of a word ("3 los" is not "3 l")
    "measurements": r"\d+[\.\,]?\d*\s*(?:mmHg|mg|kg|g|ml|l|cm|mm)\b",
    # The decimal part is optional so whole-degree values ("37 °C") match too
    "temperatures": r"\d+(?:[\.\,]\d+)?\s*°[CF]",
    "decimal_numbers": r"\d+[\.\,]\d+",
    "hyphenated_terms": r"[a-zA-ZáéíóúüñçÁÉÍÓÚÜÑÇàèìòù]+-[a-zA-ZáéíóúüñçÁÉÍÓÚÜÑÇàèìòù]+",
}

# Stand-in for the current cleaning function: drops every character that is
# not a word character, whitespace or a hyphen
cleaning_pattern = r"[^\w\s-]"

# Analyze all sentences
all_sentences = sentences["es"] + sentences["ca"]

for pattern_name, pattern in preservation_patterns.items():
    examples = []
    count = 0

    for sent_data in all_sentences:
        text = sent_data["text"]
        for match in re.finditer(pattern, text):
            count += 1
            if len(examples) < 3:  # Keep the first three matches as examples
                matched_text = match.group(0)
                start = max(0, match.start() - 20)
                end = min(len(text), match.end() + 20)
                context = text[start:end]
                # This simulates applying the current cleaning function without actually calling it
                cleaned_text = re.sub(cleaning_pattern, "", matched_text)
                examples.append((matched_text, cleaned_text, context))

    # Print results
    print(f"{pattern_name.upper()} ({count} occurrences):")
    if examples:
        print(f"  Pattern: {pattern}")
        for i, (original, cleaned, context) in enumerate(examples, start=1):
            print(f"  Example {i}: '{original}' | Context: '{context}'")
            if original != cleaned:
                print(f"    Current cleaning would change it to: '{cleaned}'\n")
            else:
                print("    Current cleaning would preserve it as is\n")
    else:
        print("  No examples found")
    print()

### Punctuation Removal Impact

In [None]:
print("PUNCTUATION REMOVAL IMPACT ")
print("Analyzing the impact of punctuation removal in the current function\n")

# Current function removes all punctuation except hyphens
punct_pattern = r"[^\w\s-]"

# Occurrences of each removed character, plus one context snippet per character
punct_counts = Counter()
punct_contexts = {}

# Categorize punctuation. A character may belong to several categories.
# "-" is never counted, because punct_pattern leaves hyphens in place
punct_categories = {
    "decimal_separators": [".", ","],
    "date_separators": ["/", "-"],
    "time_separators": [":"],
    "unit_markers": ["°", "%"],
    "apostrophes": ["'"],
    "standard_punctuation": [".", ",", ";", ":", "!", "?"],
    "brackets": ["(", ")", "[", "]"],
}

all_sentences = sentences["es"] + sentences["ca"]

for sent_data in all_sentences:
    text = sent_data["text"]
    # Count all punctuation
    for match in re.finditer(punct_pattern, text):
        char = match.group(0)
        punct_counts[char] += 1
        if char not in punct_contexts:
            # First time this character is seen: keep the text around it
            pos = match.start()
            start = max(0, pos - 15)
            end = min(len(text), pos + 15)
            punct_contexts[char] = text[start:end]

print("PUNCTUATION USAGE BY CATEGORY:")
for category, chars in punct_categories.items():
    category_chars = [c for c in chars if c in punct_counts]
    if category_chars:
        print(f"\n{category}:")
        for char in category_chars:
            context = punct_contexts.get(char, "")
            print(f"  '{char}' ({punct_counts[char]} occurrences)")
            print(f"    Context: '{context}'")

print("\nCRITICAL INFORMATION LOSS FROM PUNCTUATION REMOVAL:")
critical_patterns = [(r"\d[\.\,]\d", "Decimal numbers"), (r"\d[/\-]\d", "Dates"), (r"\d:\d", "Times"), (r"\d\s*°[CF]", "Temperatures")]

for pattern, description in critical_patterns:
    print(f"\n{description}:")

    # Just one example per pattern: take the first match in the whole corpus.
    # A `break` inside the match loop would only leave that inner loop and
    # print one example for every sentence that contains the pattern
    first_hit = next(
        (
            (sent_data["text"], match)
            for sent_data in all_sentences
            for match in re.finditer(pattern, sent_data["text"])
        ),
        None,
    )

    if first_hit is None:
        print("  No examples found")
        continue

    text, match = first_hit
    original = match.group(0)
    cleaned = re.sub(punct_pattern, "", original)
    start = max(0, match.start() - 10)
    end = min(len(text), match.end() + 10)
    context = text[start:end]

    print(f"  Original: '{original}'")
    print(f"  Current cleaning result: '{cleaned}'")
    print(f"  Context: '{context}'\n")