# NER Training Data Preparation

Combine the documents from all domains (Congress, EDGAR, Reddit) and prepare them for NER model training.

**Goals:**
- Unify document formats across domains
- Create BIO/BILOU annotations
- Export to spaCy and HuggingFace formats

In [None]:
import sys
from pathlib import Path

sys.path.insert(0, str(Path('../corpus-core/src').resolve()))
sys.path.insert(0, str(Path('../pipelines/src').resolve()))

from corpus_core.loaders import ParquetLoader
from corpus_core.models import Document
import pandas as pd
import json

## Load All Domain Documents

In [None]:
# One loader rooted at the shared datasets directory serves every domain.
datasets_root = Path('../datasets')
loader = ParquetLoader(datasets_root)

# Accumulates the rows of every domain table that is actually available.
all_documents = []

# Congress documents
has_congress = loader.exists('congress', 'congress_documents')
if has_congress:
    congress_table = loader.read('congress', 'congress_documents')
    congress_docs = congress_table.to_pylist()
    all_documents += congress_docs
    print(f"Congress documents: {len(congress_docs)}")

# EDGAR documents
has_edgar = loader.exists('edgar', 'edgar_documents')
if has_edgar:
    edgar_table = loader.read('edgar', 'edgar_documents')
    edgar_docs = edgar_table.to_pylist()
    all_documents += edgar_docs
    print(f"EDGAR documents: {len(edgar_docs)}")

# Reddit documents
has_reddit = loader.exists('reddit', 'reddit_documents')
if has_reddit:
    reddit_table = loader.read('reddit', 'reddit_documents')
    reddit_docs = reddit_table.to_pylist()
    all_documents += reddit_docs
    print(f"Reddit documents: {len(reddit_docs)}")

# Missing domains are skipped silently, so the total may cover fewer than three.
print(f"\nTotal documents: {len(all_documents)}")

In [None]:
# Tabulate the combined rows so the per-domain counts can be inspected.
# Nothing is built or printed when no documents were loaded.
if all_documents:
    docs_df = pd.DataFrame(all_documents)
    print("\nDocument distribution by domain:")
    domain_counts = docs_df['domain'].value_counts()
    print(domain_counts)

## Create Training Dataset

In [None]:
def prepare_training_example(doc: dict, max_chars: int = 50000) -> dict:
    """
    Prepare a document for NER training.

    The text is assembled from the title, the content and every non-empty
    section (each section rendered as "[name]" followed by its text on the
    next line), with the pieces separated by a blank line. Fields that are
    missing or explicitly None are treated as absent, so a null title never
    leaks the literal "None" into the text and an empty title does not leave
    leading blank lines.

    Args:
        doc: Document record; the keys read are 'id', 'title', 'content',
            'sections', 'domain', 'document_type' and 'source'.
        max_chars: Maximum number of characters kept in the returned text.

    Returns a dict with:
    - id: Document ID
    - text: Combined text, truncated to max_chars
    - domain: Source domain ('unknown' when absent)
    - document_type: Document type ('' when absent)
    - source: Source identifier ('' when absent)
    """
    def field(key: str, default=''):
        # A key that is present with a None value must behave like a missing key.
        value = doc.get(key)
        return default if value is None else value

    # Title and content, skipping whichever is empty
    parts = [piece for piece in (field('title'), field('content')) if piece]

    # Add sections if present
    sections = field('sections', {})
    if sections:
        for section_name, section_text in sections.items():
            if section_text:
                parts.append(f"[{section_name}]\n{section_text}")

    text = "\n\n".join(parts)

    return {
        'id': field('id'),
        'text': text[:max_chars],  # Cap the text length
        'domain': field('domain', 'unknown'),
        'document_type': field('document_type'),
        'source': field('source'),
    }

# Prepare all examples
# The list is built unconditionally so that `training_examples` is always
# bound: the cells below test `if training_examples:` and would otherwise
# raise NameError when no domain documents were loaded.
training_examples = [prepare_training_example(doc) for doc in all_documents]
if training_examples:
    print(f"Prepared {len(training_examples)} training examples")

In [None]:
# Preview the first prepared example: header fields, a rule, then the
# opening 500 characters of its text.
if training_examples:
    sample = training_examples[0]
    preview_lines = (
        f"ID: {sample['id']}",
        f"Domain: {sample['domain']}",
        f"Type: {sample['document_type']}",
        f"Text length: {len(sample['text'])} chars",
        "-" * 50,
        sample['text'][:500],
    )
    for preview_line in preview_lines:
        print(preview_line)

## Generate NER Annotations (Auto-labeling)

In [None]:
try:
    import spacy

    # spacy.load raises OSError (not ImportError) when the package is
    # installed but the model has not been downloaded, so both are handled.
    nlp = spacy.load('en_core_web_sm')
except (ImportError, OSError):
    # `nlp` stays undefined on purpose: later cells check for it before use.
    print("spaCy or its 'en_core_web_sm' model is not available. "
          "Run: pip install spacy && python -m spacy download en_core_web_sm")
else:
    def auto_annotate(text: str, max_length: int = 5000) -> list:
        """
        Auto-annotate text using spaCy.

        Only the first max_length characters of text are processed, so every
        returned offset refers to that truncated prefix.

        Returns list of (start, end, label) tuples, with start/end being
        character offsets.
        """
        doc = nlp(text[:max_length])
        return [(ent.start_char, ent.end_char, ent.label_) for ent in doc.ents]

    # Test on sample
    if training_examples:
        sample_text = training_examples[0]['text']
        annotations = auto_annotate(sample_text)

        print(f"Found {len(annotations)} entities in sample:")
        for start, end, label in annotations[:10]:
            print(f"  {label:10} | {sample_text[start:end][:50]}")

## Export to spaCy Training Format

In [None]:
def to_spacy_format(text: str, annotations: list) -> tuple:
    """
    Pair a text with its entity spans in spaCy's training layout.

    The annotations list is stored as-is (not copied) under the
    "entities" key.

    Returns (text, {"entities": [(start, end, label), ...]})
    """
    entity_payload = dict(entities=annotations)
    return text, entity_payload

# Create spaCy training data
# Runs only when examples exist and the spaCy pipeline `nlp` was loaded.
if training_examples and 'nlp' in dir():
    demo_subset = training_examples[:100]  # Sample for demo

    # Each entry pairs the first 2000 characters of a document with the
    # entities found in that same prefix.
    spacy_training_data = [
        to_spacy_format(
            item['text'][:2000],
            auto_annotate(item['text'], max_length=2000),
        )
        for item in demo_subset
    ]

    print(f"Created {len(spacy_training_data)} spaCy training examples")

    # Save to file
    output_path = Path('../datasets/training/spacy_train.json')
    output_path.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(spacy_training_data)
    with output_path.open('w') as out_file:
        out_file.write(serialized)
    print(f"Saved to {output_path}")

## Export to HuggingFace Format (IOB2)

In [None]:
def to_iob2_format(text: str, annotations: list) -> dict:
    """
    Convert to IOB2 token format for HuggingFace.

    The text is tokenized with the module-level spaCy pipeline `nlp`. A token
    is tagged when it lies entirely inside an annotated span: 'B-<label>' if
    it starts exactly at the span start, 'I-<label>' otherwise. All other
    tokens are tagged 'O'.

    Args:
        text: Text to tokenize.
        annotations: (start, end, label) tuples with character offsets
            into text.

    Returns a {"tokens": [...], "ner_tags": [...]} dict with two lists of
    equal length. Both lists are empty when no `nlp` pipeline is available.
    """
    # dir() called inside a function lists only that function's local names,
    # so the pipeline has to be looked up in the module globals instead.
    pipeline = globals().get('nlp')
    if pipeline is None:
        # Same shape as the normal result so callers can index it safely.
        return {"tokens": [], "ner_tags": []}

    doc = pipeline(text)
    tokens = [token.text for token in doc]
    ner_tags = ['O'] * len(tokens)

    # Map annotations to tokens
    for start, end, label in annotations:
        for i, token in enumerate(doc):
            token_start = token.idx
            token_end = token_start + len(token)
            if token_start >= start and token_end <= end:
                prefix = 'B' if token_start == start else 'I'
                ner_tags[i] = f'{prefix}-{label}'

    return {"tokens": tokens, "ner_tags": ner_tags}

# Test
# Tag the first 500 characters of the first example and list the entity
# tokens that occur among the first 20 tokens.
if training_examples and 'nlp' in dir():
    sample_text = training_examples[0]['text'][:500]
    sample_annotations = auto_annotate(sample_text, max_length=500)
    iob_example = to_iob2_format(sample_text, sample_annotations)

    print("Sample IOB2 output:")
    leading_pairs = list(zip(iob_example['tokens'], iob_example['ner_tags']))[:20]
    for token, tag in leading_pairs:
        if tag == 'O':
            continue
        print(f"  {token:20} -> {tag}")

In [None]:
# Create HuggingFace dataset
# Runs only when examples exist and the spaCy pipeline `nlp` was loaded.
if training_examples and 'nlp' in dir():
    hf_training_data = []

    for example in training_examples[:100]:
        # Work on the first 1000 characters so offsets and tokens agree.
        text = example['text'][:1000]
        annotations = auto_annotate(text, max_length=1000)
        iob_data = to_iob2_format(text, annotations)
        # Attach provenance next to the token/tag lists.
        iob_data.update(id=example['id'], domain=example['domain'])
        hf_training_data.append(iob_data)

    # Save to JSONL: one JSON object per line
    output_path = Path('../datasets/training/ner_train.jsonl')
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open('w') as out_file:
        out_file.writelines(json.dumps(item) + '\n' for item in hf_training_data)
    print(f"Saved {len(hf_training_data)} examples to {output_path}")

## Dataset Statistics

In [None]:
# Summarise the prepared examples: corpus-wide totals first, then a
# per-domain breakdown in order of first appearance.
if training_examples:
    # Overall stats
    doc_total = len(training_examples)
    total_chars = 0
    total_words = 0
    for ex in training_examples:
        total_chars += len(ex['text'])
        total_words += len(ex['text'].split())  # whitespace-delimited words

    print("Dataset Statistics:")
    print(f"  Total documents: {doc_total:,}")
    print(f"  Total characters: {total_chars:,}")
    print(f"  Total words: {total_words:,}")
    # Averages use integer (floor) division.
    print(f"  Avg chars/doc: {total_chars // doc_total:,}")
    print(f"  Avg words/doc: {total_words // doc_total:,}")

    # By domain
    print("\nBy Domain:")
    domain_stats = {}
    for ex in training_examples:
        bucket = domain_stats.setdefault(ex['domain'], {'count': 0, 'chars': 0})
        bucket['count'] += 1
        bucket['chars'] += len(ex['text'])

    for domain, stats in domain_stats.items():
        print(f"  {domain}: {stats['count']} docs, {stats['chars']:,} chars")

## Next Steps

1. **Manual Annotation**: Use Prodigy or Label Studio to review and correct the auto-generated labels
2. **Train Custom Model**: Fine-tune transformer on domain data
3. **Evaluate**: Compare against baseline spaCy model
4. **Iterate**: Add more data sources, refine entity types