# NLTK Plugin

> Plugin implementation for NLTK-based text processing with character-level span tracking

In [None]:
#| default_exp plugin

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
import logging
import os
from uuid import uuid4
from dataclasses import dataclass, field
from typing import Dict, Any, Optional, List

import nltk
from nltk.tokenize import PunktSentenceTokenizer

from cjm_text_plugin_system.plugin_interface import TextProcessingPlugin
from cjm_text_plugin_system.core import TextProcessResult, TextSpan
from cjm_text_plugin_system.storage import TextProcessStorage
from cjm_plugin_system.utils.hashing import hash_bytes
from cjm_plugin_system.utils.validation import (
    dict_to_config, config_to_dict, dataclass_to_jsonschema,
    SCHEMA_TITLE, SCHEMA_DESC, SCHEMA_ENUM
)
from cjm_text_plugin_nltk.meta import get_plugin_metadata

In [None]:
#| export
@dataclass
class NLTKPluginConfig:
    """Configuration for NLTK text processing plugin.

    Each field carries schema metadata (title, description, allowed values)
    under the SCHEMA_* keys imported from `cjm_plugin_system.utils.validation`.
    """
    # Sentence tokenizer implementation; "punkt" is the only allowed value.
    tokenizer: str = field(
        default="punkt",
        metadata={
            SCHEMA_TITLE: "Tokenizer",
            SCHEMA_DESC: "NLTK tokenizer to use for sentence splitting",
            SCHEMA_ENUM: ["punkt"]
        }
    )
    # Language name used for sentence boundary detection; defaults to "english".
    language: str = field(
        default="english",
        metadata={
            SCHEMA_TITLE: "Language",
            SCHEMA_DESC: "Language for tokenization (affects sentence boundary detection)",
            SCHEMA_ENUM: ["english", "german", "french", "spanish", "italian", "portuguese", "dutch"]
        }
    )

In [None]:
#| export
class NLTKPlugin(TextProcessingPlugin):
    """NLTK-based text processing plugin with character-level span tracking.

    Sentences are located with NLTK's Punkt tokenizer via `span_tokenize`, so
    every returned span carries the `(start_char, end_char)` offsets of the
    sentence inside the original input string.
    """

    config_class = NLTKPluginConfig

    # NLTK resource packages (under `tokenizers/`) the plugin makes available.
    _REQUIRED_PACKAGES = ("punkt", "punkt_tab")

    def __init__(self):
        """Initialize the NLTK plugin."""
        self.logger = logging.getLogger(f"{__name__}.{type(self).__name__}")
        self.config: Optional[NLTKPluginConfig] = None
        self._tokenizer: Optional[PunktSentenceTokenizer] = None
        self._nltk_data_dir: Optional[str] = None
        self.storage: Optional[TextProcessStorage] = None

    @property
    def name(self) -> str:  # Plugin name identifier
        """Get the plugin name identifier."""
        return "nltk_text"

    @property
    def version(self) -> str:  # Plugin version string
        """Get the plugin version string."""
        return "1.0.0"

    def get_current_config(self) -> Dict[str, Any]:  # Current configuration as dictionary
        """Return current configuration state (empty dict before `initialize`)."""
        if not self.config:
            return {}
        return config_to_dict(self.config)

    def get_config_schema(self) -> Dict[str, Any]:  # JSON Schema for configuration
        """Return JSON Schema for UI generation."""
        return dataclass_to_jsonschema(NLTKPluginConfig)

    @staticmethod
    def get_config_dataclass() -> NLTKPluginConfig:  # Configuration dataclass
        """Return the dataclass (the class itself, not an instance) describing the plugin's configuration options."""
        return NLTKPluginConfig

    def _download_package(
        self,
        package: str,  # NLTK package identifier, e.g. 'punkt'
        download_dir: Optional[str] = None  # Target directory, or None for NLTK's default
    ) -> bool:  # True when NLTK reports the download succeeded
        """Download one NLTK package and log a warning if NLTK reports failure."""
        succeeded = nltk.download(package, quiet=True, download_dir=download_dir)
        if not succeeded:
            # nltk.download signals failure through its return value; surface it
            # here instead of failing later with an unexplained LookupError.
            self.logger.warning(f"Download of NLTK package '{package}' did not succeed")
        return bool(succeeded)

    def _ensure_nltk_data(self) -> None:
        """Ensure required NLTK data packages are downloaded to the configured directory."""
        # Get NLTK data directory from environment (set by manifest env_vars)
        nltk_data_dir = os.environ.get("NLTK_DATA")

        if nltk_data_dir:
            # Ensure the directory exists
            os.makedirs(nltk_data_dir, exist_ok=True)

            # Replace NLTK's search path to ONLY use our directory
            # This prevents NLTK from finding/using data in ~/nltk_data
            nltk.data.path = [nltk_data_dir]

            self._nltk_data_dir = nltk_data_dir
            self.logger.info(f"Using NLTK data directory: {nltk_data_dir}")

            # Check if data exists in OUR directory specifically
            for package in self._REQUIRED_PACKAGES:
                if os.path.exists(os.path.join(nltk_data_dir, "tokenizers", package)):
                    continue
                self.logger.info(f"Downloading NLTK '{package}' tokenizer to {nltk_data_dir}...")
                self._download_package(package, download_dir=nltk_data_dir)
        else:
            # No custom directory - use NLTK defaults
            for package in self._REQUIRED_PACKAGES:
                try:
                    nltk.data.find(f"tokenizers/{package}")
                except LookupError:
                    self.logger.info(f"Downloading NLTK '{package}' tokenizer...")
                    self._download_package(package)

    def initialize(
        self,
        config: Optional[Any] = None  # Configuration dataclass, dict, or None
    ) -> None:
        """Initialize or re-configure the plugin (idempotent)."""
        # Parse new config
        new_config = dict_to_config(NLTKPluginConfig, config or {})

        previous_config = self.config
        if previous_config is None:
            # A tokenizer may have been built lazily before the first initialize
            # (possibly without model data); rebuild it against the fresh setup.
            self._tokenizer = None
        elif previous_config.language != new_config.language:
            self.logger.info(f"Config change: Language {previous_config.language} -> {new_config.language}")
            self._tokenizer = None  # Reset tokenizer for new language

        # Apply new config
        self.config = new_config

        # Ensure NLTK data is available
        self._ensure_nltk_data()

        # Initialize standardized storage
        db_path = get_plugin_metadata()["db_path"]
        self.storage = TextProcessStorage(db_path)

        self.logger.info(f"Initialized NLTK plugin with language '{self.config.language}'")

    def _load_punkt_tokenizer(
        self,
        language: str  # Punkt language name, e.g. 'english'
    ) -> PunktSentenceTokenizer:  # Tokenizer carrying the language's trained parameters
        """Load the pre-trained Punkt model for `language`, falling back to an untrained tokenizer."""
        try:
            # Recent NLTK releases provide PunktTokenizer, which reads 'punkt_tab' data.
            from nltk.tokenize.punkt import PunktTokenizer
        except ImportError:
            PunktTokenizer = None

        try:
            if PunktTokenizer is not None:
                return PunktTokenizer(language)
            # Older NLTK releases ship pickled models in the 'punkt' package.
            return nltk.data.load(f"tokenizers/punkt/{language}.pickle")
        except (LookupError, OSError) as e:
            # Model data is unavailable (e.g. download failed): degrade to the
            # untrained default rather than making sentence splitting unusable.
            self.logger.warning(
                f"Could not load Punkt model for '{language}' ({e}); "
                "falling back to untrained tokenizer"
            )
            return PunktSentenceTokenizer()

    def _get_tokenizer(self) -> PunktSentenceTokenizer:
        """Get or create the sentence tokenizer for the configured language (lazy loading)."""
        if self._tokenizer is None:
            language = self.config.language if self.config else "english"
            self._tokenizer = self._load_punkt_tokenizer(language)
        return self._tokenizer

    def execute(
        self,
        action: str = "split_sentences",  # Operation: 'split_sentences'
        **kwargs
    ) -> Dict[str, Any]:  # JSON-serializable result
        """Execute a text processing operation.

        For 'split_sentences', reads `text` (default "") and an optional `job_id`
        from `kwargs`, splits the text, attempts to persist the result through
        `self.storage`, and returns a dict with 'spans' and 'metadata'.
        Raises ValueError for any other action.
        """
        if action != "split_sentences":
            raise ValueError(f"Unknown action: {action}")

        text = kwargs.pop("text", "")
        job_id = kwargs.pop("job_id", None)
        if job_id is None:
            # Only mint an identifier when the caller did not provide one
            job_id = str(uuid4())
        result = self.split_sentences(text, **kwargs)

        # Serialize for IPC
        spans_data = [s.to_dict() for s in result.spans]

        # Save to standardized storage
        input_hash = hash_bytes(text.encode())
        try:
            self.storage.save(
                job_id=job_id,
                input_text=text,
                input_hash=input_hash,
                spans=spans_data,
                metadata=result.metadata
            )
            self.logger.info(f"Saved result to DB (Job: {job_id})")
        except Exception as e:
            # Persistence is best-effort: the result is still returned to the caller
            self.logger.error(f"Failed to save to DB: {e}")

        return {
            "spans": spans_data,
            "metadata": result.metadata
        }

    def split_sentences(
        self,
        text: str,  # Input text to split into sentences
        **kwargs
    ) -> TextProcessResult:  # Result with TextSpan objects containing character indices
        """Split text into sentence spans with accurate character positions."""
        tokenizer = self._get_tokenizer()

        # span_tokenize yields (start, end) offsets into `text`, so slicing the
        # input with them reproduces each sentence exactly.
        text_spans: List[TextSpan] = [
            TextSpan(
                text=text[start:end],
                start_char=start,
                end_char=end,
                label="sentence"
            )
            for start, end in tokenizer.span_tokenize(text)
        ]

        return TextProcessResult(
            spans=text_spans,
            metadata={
                "processor": self.name,
                "tokenizer": self.config.tokenizer if self.config else "punkt",
                "language": self.config.language if self.config else "english",
                "nltk_data_dir": self._nltk_data_dir
            }
        )

    def cleanup(self) -> None:
        """Clean up resources by dropping the cached tokenizer."""
        self._tokenizer = None
        self.logger.info("NLTK plugin cleanup completed")

## Testing the Plugin

In [None]:
# Smoke test: plugin identity, advertised config options, initialization, schema
from dataclasses import fields
import json

plugin = NLTKPlugin()

print(f"Plugin name: {plugin.name}")
print(f"Plugin version: {plugin.version}")
print(f"Config class: {plugin.config_class.__name__}")

# List the language choices declared on the configuration dataclass
print("Available languages:")
config_fields = {f.name: f for f in fields(NLTKPluginConfig)}
lang_field = config_fields["language"]
for lang in lang_field.metadata.get(SCHEMA_ENUM, []):
    print(f"  - {lang}")

# Initialize with an explicit language and show the resulting configuration
plugin.initialize({"language": "english"})
current_config = plugin.get_current_config()
print(f"Current config: {current_config}")

# Dump the JSON Schema used for UI generation
schema = plugin.get_config_schema()
print("JSON Schema for NLTKPluginConfig:")
print(json.dumps(schema, indent=2))

Plugin name: nltk_text
Plugin version: 1.0.0
Config class: NLTKPluginConfig
Available languages:
  - english
  - german
  - french
  - spanish
  - italian
  - portuguese
  - dutch
Current config: {'tokenizer': 'punkt', 'language': 'english'}
JSON Schema for NLTKPluginConfig:
{
  "name": "NLTKPluginConfig",
  "title": "NLTKPluginConfig",
  "description": "Configuration for NLTK text processing plugin.",
  "type": "object",
  "properties": {
    "tokenizer": {
      "type": "string",
      "title": "Tokenizer",
      "description": "NLTK tokenizer to use for sentence splitting",
      "enum": [
        "punkt"
      ],
      "default": "punkt"
    },
    "language": {
      "type": "string",
      "title": "Language",
      "description": "Language for tokenization (affects sentence boundary detection)",
      "enum": [
        "english",
        "german",
        "french",
        "spanish",
        "italian",
        "portuguese",
        "dutch"
      ],
      "default": "english"
   

In [None]:
# Call split_sentences directly and confirm each span maps back onto the input
text = "Hello world. How are you? I am fine! This is a test."
result = plugin.split_sentences(text)

print(f"Input: '{text}'")
print(f"Spans found: {len(result.spans)}")
print(f"Metadata: {result.metadata}")

for i, span in enumerate(result.spans):
    print(f"  {i}: '{span.text}' [{span.start_char}:{span.end_char}]")
    # Round-trip check: slicing the input with the offsets must reproduce the span text
    recovered = text[span.start_char:span.end_char]
    assert recovered == span.text, f"Mismatch at span {i}"

Input: 'Hello world. How are you? I am fine! This is a test.'
Spans found: 4
Metadata: {'processor': 'nltk_text', 'tokenizer': 'punkt', 'language': 'english', 'nltk_data_dir': None}
  0: 'Hello world.' [0:12]
  1: 'How are you?' [13:25]
  2: 'I am fine!' [26:36]
  3: 'This is a test.' [37:52]


In [None]:
# Exercise the execute() dispatcher the same way a Worker would call it
json_result = plugin.execute(text=text, action="split_sentences")

print(f"JSON result from execute():")
print(f"  spans: {len(json_result['spans'])} items")
print(f"  metadata: {json_result['metadata']}")

# Each span arrives as a plain dict after serialization
serialized_spans = json_result['spans']
for span_dict in serialized_spans:
    print(f"    - {span_dict['text']!r} [{span_dict['start_char']}:{span_dict['end_char']}]")

JSON result from execute():
  spans: 4 items
  metadata: {'processor': 'nltk_text', 'tokenizer': 'punkt', 'language': 'english', 'nltk_data_dir': None}
    - 'Hello world.' [0:12]
    - 'How are you?' [13:25]
    - 'I am fine!' [26:36]
    - 'This is a test.' [37:52]


In [None]:
# Sentence splitting across paragraph breaks (blank lines between paragraphs)
multi_text = """First paragraph. It has two sentences.

Second paragraph starts here. And continues here!

Third paragraph: What about questions? They work too."""

result = plugin.split_sentences(multi_text)
print(f"Multi-paragraph text - {len(result.spans)} sentences found:")
for i, span in enumerate(result.spans):
    # Truncate anything longer than 50 characters to a preview with an ellipsis
    if len(span.text) > 50:
        preview = span.text[:50] + "..."
    else:
        preview = span.text
    print(f"  {i}: [{span.start_char:3d}:{span.end_char:3d}] {preview!r}")

Multi-paragraph text - 6 sentences found:
  0: [  0: 16] 'First paragraph.'
  1: [ 17: 38] 'It has two sentences.'
  2: [ 40: 69] 'Second paragraph starts here.'
  3: [ 70: 89] 'And continues here!'
  4: [ 91:129] 'Third paragraph: What about questions?'
  5: [130:144] 'They work too.'


In [None]:
# Cleanup: call the plugin's cleanup() now that the tests above are done
plugin.cleanup()

In [None]:
#| hide
# Run nbdev's export step for this notebook
import nbdev; nbdev.nbdev_export()