# Plugin Interface

> Abstract base class defining the generic plugin interface

In [None]:
#| default_exp core.interface

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
from abc import ABC, abstractmethod
from dataclasses import dataclass
import logging
from typing import Optional, Dict, Any, Tuple, Generator, Type
from cjm_plugin_system.utils.validation import (
    dict_to_config, config_to_dict, extract_defaults, validate_config,
    SCHEMA_TITLE, SCHEMA_DESC, SCHEMA_MIN, SCHEMA_MAX, SCHEMA_ENUM
)

## PluginInterface

The `PluginInterface` is a generic abstract base class that defines the contract all plugins must implement. This interface is completely domain-agnostic and can be subclassed to create specific plugin systems (e.g., transcription plugins, LLM plugins, image generation plugins).

Plugins use dataclass-based configuration for type safety and IDE support.

In [None]:
#| export
class PluginInterface(ABC):
    """Generic plugin interface that all plugins must implement."""

    # Must be overridden by domain-specific interfaces
    entry_point_group: str = None
    
    # Configuration dataclass type - override in concrete plugins
    config_class: Type = None

    def __init_subclass__(cls, **kwargs):
        """Enforce that domain-specific interfaces define entry_point_group."""
        super().__init_subclass__(**kwargs)

        # Check if this is a direct subclass of PluginInterface
        # (i.e., a domain-specific interface like TranscriptionPlugin)
        if PluginInterface in cls.__bases__:
            # Domain-specific interfaces must define entry_point_group
            if not hasattr(cls, 'entry_point_group') or cls.entry_point_group is None:
                raise TypeError(
                    f"Domain-specific interface '{cls.__name__}' must define "
                    f"'entry_point_group' class attribute. Example:\n"
                    f"    class {cls.__name__}(PluginInterface):\n"
                    f"        entry_point_group = 'your.plugins'"
                )

    @property
    @abstractmethod
    def name(self) -> str: # Unique identifier for this plugin
        """Unique plugin identifier."""
        pass

    @property
    @abstractmethod
    def version(self) -> str: # Semantic version string (e.g., "1.0.0")
        """Plugin version."""
        pass

    @abstractmethod
    def initialize(
        self,
        config:Optional[Any]=None # Configuration dataclass instance or dict
    ) -> None:
        """Initialize the plugin with configuration."""
        pass

    @abstractmethod
    def execute(
        self,
        *args,
        **kwargs
    ) -> Any: # Plugin-specific output
        """Execute the plugin's main functionality."""
        pass

    @abstractmethod
    def is_available(self) -> bool: # True if all required dependencies are available
        """Check if the plugin's dependencies are available."""
        pass

    @abstractmethod
    def get_current_config(self) -> Any: # Current configuration dataclass instance
        """Return the current configuration state."""
        pass

    @staticmethod
    @abstractmethod
    def get_config_dataclass() -> Any: # The dataclass describing the configuration options
        """Return dataclass describing the plugin's configuration options."""
        pass

    def get_config_defaults(self) -> Dict[str, Any]: # Default values from config_class
        """Extract default values from the configuration dataclass."""
        if self.config_class is None:
            return {}
        return extract_defaults(self.config_class)

    def cleanup(self) -> None:
        """Optional cleanup when plugin is unloaded."""
        pass

This is a domain-agnostic base class. Domain-specific plugin systems should subclass this interface and add their specific requirements.

**Important:** Domain-specific interfaces (direct subclasses of PluginInterface) MUST define the `entry_point_group` class attribute. This requirement is enforced at class definition time.

The interface provides:
- Abstract properties for plugin identity (`name`, `version`)
- Configuration management with dataclass-based configs (`config_class`)
- Lifecycle methods (`initialize`, `cleanup`)
- Dependency checking (`is_available`)
- Main execution method (`execute`)
- Optional streaming support via `execute_stream`

## Streaming Support

The plugin system includes optional streaming support. Plugins can implement `execute_stream()` to provide real-time streaming results.

In [None]:
#| export
def PluginInterface_supports_streaming(self) -> bool: # True if execute_stream is implemented
    """Check if this plugin supports streaming execution."""
    # Default: check if execute_stream is overridden from the base class
    return type(self).execute_stream != PluginInterface.execute_stream

def PluginInterface_execute_stream(
    self,
    *args, # Arguments for plugin execution
    **kwargs # Keyword arguments for plugin execution
) -> Generator[Any, None, Any]: # Yields partial results, returns final result
    """Stream execution results chunk by chunk."""
    # Default implementation: execute normally and yield complete result at once
    result = self.execute(*args, **kwargs)
    yield result
    return result

# Add the methods to the PluginInterface class
PluginInterface.supports_streaming = PluginInterface_supports_streaming
PluginInterface.execute_stream = PluginInterface_execute_stream

The default `execute_stream` implementation falls back to `execute()` without real streaming. Plugins can override this method to provide true streaming capabilities where partial results are yielded as they become available.

The `supports_streaming` method checks if a plugin has overridden the default `execute_stream` implementation, allowing callers to detect streaming support at runtime.

### Example: Creating a Domain-Specific Interface

First, let's create a configuration dataclass and domain-specific plugin interface:

In [None]:
from typing import List
from dataclasses import dataclass, field

# Define a configuration dataclass for text processing plugins
@dataclass
class TextProcessingConfig:
    """Configuration for text processing plugins."""
    mode:str = field(
        default="balanced",
        metadata={
            SCHEMA_TITLE: "Processing Mode",
            SCHEMA_DESC: "Processing mode to use",
            SCHEMA_ENUM: ["fast", "balanced", "quality"]
        }
    )
    threshold:float = field(
        default=0.5,
        metadata={
            SCHEMA_TITLE: "Threshold",
            SCHEMA_DESC: "Processing threshold",
            SCHEMA_MIN: 0.0,
            SCHEMA_MAX: 1.0
        }
    )
    max_workers:int = field(
        default=4,
        metadata={
            SCHEMA_TITLE: "Max Workers",
            SCHEMA_DESC: "Maximum number of workers",
            SCHEMA_MIN: 1,
            SCHEMA_MAX: 16
        }
    )
    enable_cache:bool = field(
        default=True,
        metadata={SCHEMA_TITLE: "Enable Cache", SCHEMA_DESC: "Enable result caching"}
    )

# Create a domain-specific plugin interface
class TextProcessingPlugin(PluginInterface):
    """Domain-specific interface for text processing plugins."""
    
    # REQUIRED: Define entry_point_group for this plugin type
    entry_point_group = "text_processing.plugins"
    
    # Default config class for this plugin type
    config_class = TextProcessingConfig
    
    @property
    @abstractmethod
    def supported_formats(self) -> List[str]:
        """File formats this plugin can process."""
        pass

# Now create a concrete plugin that inherits from the domain-specific interface
class ExamplePlugin(TextProcessingPlugin):
    """An example text processing plugin implementation."""
    
    # entry_point_group and config_class are inherited from TextProcessingPlugin

    def __init__(self):
        self.logger = logging.getLogger(f"{__name__}.{type(self).__name__}")
        self.config: TextProcessingConfig = None
        self.resource = None

    @property
    def name(self) -> str:
        return "example_plugin"
    
    @property
    def version(self) -> str:
        return "1.0.0"
    
    @property
    def supported_formats(self) -> List[str]:
        return ["txt", "md"]
    
    def get_current_config(self) -> TextProcessingConfig:
        """Return the current configuration."""
        return self.config

    @staticmethod
    def get_config_dataclass() -> TextProcessingConfig:
        return TextProcessingConfig
    
    def initialize(self, config: Optional[Any] = None) -> None:
        """Initialize the plugin."""
        # Handle different config input types
        if config is None:
            self.config = TextProcessingConfig()
        elif isinstance(config, TextProcessingConfig):
            self.config = config
        elif isinstance(config, dict):
            self.config = dict_to_config(TextProcessingConfig, config, validate=True)
        else:
            raise TypeError(f"Expected TextProcessingConfig, dict, or None, got {type(config).__name__}")
        
        self.logger.info(f"Initializing {self.name} with config: {self.config}")
        
        # Simulate resource initialization
        self.resource = f"Resource-{self.config.mode}"
    
    def execute(self, input_data: Any, **kwargs) -> Any:
        """Execute the plugin's functionality."""
        self.logger.info(f"Executing {self.name} with resource: {self.resource}")
        self.logger.info(f"Config: {self.config}")
        return f"Processed {input_data} using {self.resource}"

    def is_available(self) -> bool:
        """Check availability."""
        return True
    
    def cleanup(self) -> None:
        """Clean up resources."""
        self.logger.info(f"Cleaning up {self.name}")
        self.resource = None

In [None]:
# Test the example plugin
logging.basicConfig(level=logging.INFO)

plugin = ExamplePlugin()
print(f"Plugin: {plugin.name} v{plugin.version}")
print(f"Available: {plugin.is_available()}")
print(f"Entry point group (inherited): {plugin.entry_point_group}")
print(f"Config class (inherited): {plugin.config_class.__name__}")

# Get defaults
defaults = plugin.get_config_defaults()
print("\nDefault Configuration:")
for k, v in defaults.items():
    print(f"  {k}: {v!r}")

Plugin: example_plugin v1.0.0
Available: True
Entry point group (inherited): text_processing.plugins
Config class (inherited): TextProcessingConfig

Default Configuration:
  mode: 'balanced'
  threshold: 0.5
  max_workers: 4
  enable_cache: True


In [None]:
# This will raise a TypeError because entry_point_group is not defined
try:
    class BadPlugin(PluginInterface):
        """This will fail - missing entry_point_group!"""
        pass
except TypeError as e:
    print("✓ Correctly caught missing entry_point_group:")
    print(f"  {e}")

print("\n" + "="*60 + "\n")

# This works - entry_point_group is defined
try:
    class GoodPlugin(PluginInterface):
        """This works - has entry_point_group!"""
        entry_point_group = "good.plugins"
    print("✓ Successfully created domain-specific interface with entry_point_group")
    print(f"  entry_point_group = '{GoodPlugin.entry_point_group}'")
except TypeError as e:
    print(f"✗ Unexpected error: {e}")

✓ Correctly caught missing entry_point_group:
  Domain-specific interface 'BadPlugin' must define 'entry_point_group' class attribute. Example:
    class BadPlugin(PluginInterface):
        entry_point_group = 'your.plugins'


✓ Successfully created domain-specific interface with entry_point_group
  entry_point_group = 'good.plugins'


### Demonstrating entry_point_group Enforcement

The system enforces that domain-specific interfaces (direct subclasses of PluginInterface) must define `entry_point_group`:

In [None]:
# Initialize with a dataclass config
config = TextProcessingConfig(mode="quality", threshold=0.8)
plugin.initialize(config)
result = plugin.execute("sample_data")
print(f"Result: {result}")

# Get current config (returns dataclass instance)
current = plugin.get_current_config()
print("\nCurrent Configuration:")
print(f"  Type: {type(current).__name__}")
print(f"  mode: {current.mode}")
print(f"  threshold: {current.threshold}")
print(f"  max_workers: {current.max_workers}")
print(f"  enable_cache: {current.enable_cache}")

# Can also initialize with a dict (validates automatically)
plugin.initialize({"mode": "fast", "threshold": 0.3})
print(f"\nAfter dict init - mode: {plugin.get_current_config().mode}")

# Test validation - invalid enum value
try:
    plugin.initialize({"mode": "invalid_mode"})
except ValueError as e:
    print(f"\n✓ Caught invalid enum: {e}")

# Test validation - value out of range
try:
    plugin.initialize({"mode": "fast", "threshold": 2.0})
except ValueError as e:
    print(f"✓ Caught out of range: {e}")

# Cleanup
plugin.cleanup()

INFO:__main__.ExamplePlugin:Initializing example_plugin with config: TextProcessingConfig(mode='quality', threshold=0.8, max_workers=4, enable_cache=True)
INFO:__main__.ExamplePlugin:Executing example_plugin with resource: Resource-quality
INFO:__main__.ExamplePlugin:Config: TextProcessingConfig(mode='quality', threshold=0.8, max_workers=4, enable_cache=True)
INFO:__main__.ExamplePlugin:Initializing example_plugin with config: TextProcessingConfig(mode='fast', threshold=0.3, max_workers=4, enable_cache=True)
INFO:__main__.ExamplePlugin:Cleaning up example_plugin


Result: Processed sample_data using Resource-quality

Current Configuration:
  Type: TextProcessingConfig
  mode: quality
  threshold: 0.8
  max_workers: 4
  enable_cache: True

After dict init - mode: fast

✓ Caught invalid enum: mode: 'invalid_mode' is not one of ['fast', 'balanced', 'quality']
✓ Caught out of range: threshold: 2.0 is greater than maximum 1.0


In [None]:
# Test converting config to dict (useful for serialization)
current_config = plugin.get_current_config()
config_dict = config_to_dict(current_config)
print("Configuration as dictionary:")
for k, v in config_dict.items():
    print(f"  {k}: {v!r}")

Configuration as dictionary:
  mode: 'fast'
  threshold: 0.3
  max_workers: 4
  enable_cache: True


In [None]:
# Test streaming support
plugin.initialize({"mode": "balanced"})  # Reinitialize after cleanup
print(f"Supports streaming: {plugin.supports_streaming()}")

# The default implementation doesn't stream, it just yields the result once
print("\nStreaming execution:")
for chunk in plugin.execute_stream("stream_data"):
    print(f"  Chunk: {chunk}")

INFO:__main__.ExamplePlugin:Initializing example_plugin with config: TextProcessingConfig(mode='balanced', threshold=0.5, max_workers=4, enable_cache=True)
INFO:__main__.ExamplePlugin:Executing example_plugin with resource: Resource-balanced
INFO:__main__.ExamplePlugin:Config: TextProcessingConfig(mode='balanced', threshold=0.5, max_workers=4, enable_cache=True)


Supports streaming: False

Streaming execution:
  Chunk: Processed stream_data using Resource-balanced


In [None]:
#| hide
import nbdev; nbdev.nbdev_export()