# protocol

> Protocol definitions for worker communication and plugin manager integration.

In [None]:
#| default_exp core.protocol

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
from typing import Dict, Any, Optional, Iterator, Protocol
from enum import Enum
from dataclasses import dataclass

## Request and Response Types

These enums define the message types used for communication between the parent process and worker processes. The parent sends requests and the worker responds with various response types.

In [None]:
#| export
class WorkerRequestType(Enum):
    """Types of requests sent to worker process.

    Each member's value is the string stored under the ``'type'`` key of a
    serialized request: ``WorkerRequest.to_dict`` writes ``member.value`` and
    ``WorkerRequest.from_dict`` looks the member up again from that string.
    """
    INIT = "init"
    EXECUTE = "execute"
    RELOAD = "reload"
    UNLOAD = "unload"
    GET_STATE = "get_state"
    STOP = "stop"

In [None]:
#| export
class WorkerResponseType(Enum):
    """Types of responses from worker process.

    Each member's value is the string stored under the ``'type'`` key of a
    serialized response (written by the ``to_dict`` methods of the response
    dataclasses in this module).
    """
    READY = "ready"
    RESULT = "result"  # Tag written by WorkerResult.to_dict()
    STREAM_CHUNK = "stream_chunk"  # Tag written by WorkerStreamChunk.to_dict()
    RESPONSE = "response"
    STATE = "state"
    ERROR = "error"

## Data Structures

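These dataclasses define the structure of messages passed through multiprocessing queues. Each provides a `to_dict()` method that flattens the message into a plain dictionary, since objects passed through queues need to be picklable; `WorkerRequest` and `WorkerResponse` additionally provide a `from_dict()` classmethod to rebuild the message on the receiving side.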

In [None]:
#| export
@dataclass
class WorkerRequest:
    """Base structure for worker requests.

    A request is flattened into a single plain dictionary for the queue: the
    request type is stored under the reserved ``'type'`` key, next to the
    entries of ``data``.
    """
    type: WorkerRequestType  # Kind of request being sent
    data: Dict[str, Any]  # Request-specific fields (must not rely on a 'type' key)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for queue serialization.

        Returns:
            A new dictionary with ``'type'`` set to the request type's string
            value, followed by the entries of ``data``.

        The ``'type'`` key is reserved for the message type. A ``'type'`` entry
        inside ``data`` is left out so it cannot overwrite the type tag;
        ``from_dict`` strips that key on the way back in anyway.
        """
        payload: Dict[str, Any] = {'type': self.type.value}
        payload.update(
            (key, value) for key, value in self.data.items() if key != 'type'
        )
        return payload

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'WorkerRequest':
        """Create from dictionary received from queue.

        Args:
            data: Flattened request as produced by ``to_dict``. It is not
                modified.

        Returns:
            A request whose ``data`` holds every entry except ``'type'``.

        Raises:
            KeyError: If ``data`` has no ``'type'`` entry.
            ValueError: If the ``'type'`` value is not a known request type.
        """
        req_type = WorkerRequestType(data['type'])
        request_data = {k: v for k, v in data.items() if k != 'type'}
        return cls(type=req_type, data=request_data)

In [None]:
# Test WorkerRequest serialization
# Build an EXECUTE request whose payload carries a job id, a plugin name and
# one extra parameter; the trailing bare expression displays its repr.
request = WorkerRequest(
    type=WorkerRequestType.EXECUTE,
    data={'job_id': 'test-123', 'plugin_name': 'test_plugin', 'param1': 'value1'}
)
request

WorkerRequest(type=<WorkerRequestType.EXECUTE: 'execute'>, data={'job_id': 'test-123', 'plugin_name': 'test_plugin', 'param1': 'value1'})

In [None]:
# Test to_dict conversion
# The enum's string value is stored under 'type', alongside the entries of
# the request's `data` in one flat dictionary.
request_dict = request.to_dict()
request_dict

{'type': 'execute',
 'job_id': 'test-123',
 'plugin_name': 'test_plugin',
 'param1': 'value1'}

In [None]:
# Test from_dict deserialization
# Rebuild a WorkerRequest from the flat dictionary; the displayed repr should
# match the repr of the original `request`.
restored_request = WorkerRequest.from_dict(request_dict)
restored_request

WorkerRequest(type=<WorkerRequestType.EXECUTE: 'execute'>, data={'job_id': 'test-123', 'plugin_name': 'test_plugin', 'param1': 'value1'})

In [None]:
#| export
@dataclass
class WorkerResponse:
    """Base structure for worker responses.

    A response is flattened into a single plain dictionary for the queue: the
    response type is stored under the reserved ``'type'`` key, next to the
    entries of ``data``.
    """
    type: WorkerResponseType  # Kind of response being sent
    data: Dict[str, Any]  # Response-specific fields (must not rely on a 'type' key)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for queue serialization.

        Returns:
            A new dictionary with ``'type'`` set to the response type's string
            value, followed by the entries of ``data``.

        The ``'type'`` key is reserved for the message type. A ``'type'`` entry
        inside ``data`` is left out so it cannot overwrite the type tag;
        ``from_dict`` strips that key on the way back in anyway.
        """
        payload: Dict[str, Any] = {'type': self.type.value}
        payload.update(
            (key, value) for key, value in self.data.items() if key != 'type'
        )
        return payload

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'WorkerResponse':
        """Create from dictionary received from queue.

        Args:
            data: Flattened response as produced by ``to_dict``. It is not
                modified.

        Returns:
            A response whose ``data`` holds every entry except ``'type'``.

        Raises:
            KeyError: If ``data`` has no ``'type'`` entry.
            ValueError: If the ``'type'`` value is not a known response type.
        """
        resp_type = WorkerResponseType(data['type'])
        response_data = {k: v for k, v in data.items() if k != 'type'}
        return cls(type=resp_type, data=response_data)

In [None]:
# Test WorkerResponse
# Build a RESULT response and flatten it. The payload here contains its own
# 'data' key, which ends up as an ordinary entry of the flat dictionary.
response = WorkerResponse(
    type=WorkerResponseType.RESULT,
    data={'job_id': 'test-123', 'status': 'success', 'data': {'output': 'result'}}
)
response_dict = response.to_dict()
response_dict

{'type': 'result',
 'job_id': 'test-123',
 'status': 'success',
 'data': {'output': 'result'}}

In [None]:
#| export
@dataclass
class WorkerStreamChunk:
    """One piece of streamed job output, flattenable for the response queue."""
    job_id: str  # Identifier of the job the chunk belongs to
    chunk: str  # Text produced by the streaming output
    is_final: bool = False  # True when no further chunks follow
    metadata: Optional[Dict[str, Any]] = None  # Extra information, if any

    def to_dict(self) -> Dict[str, Any]:
        """Flatten the chunk into a plain dictionary tagged as a stream chunk.

        Returns:
            Dictionary with the keys ``'type'``, ``'job_id'``, ``'chunk'``,
            ``'is_final'`` and ``'metadata'``. Missing or empty metadata is
            emitted as an empty dictionary.
        """
        payload: Dict[str, Any] = dict(
            type=WorkerResponseType.STREAM_CHUNK.value,
            job_id=self.job_id,
            chunk=self.chunk,
            is_final=self.is_final,
        )
        # Falsy metadata (None or empty) is normalised to a fresh empty dict.
        payload['metadata'] = self.metadata if self.metadata else {}
        return payload

In [None]:
# Test WorkerStreamChunk
# Flatten a non-final chunk that carries metadata; to_dict() adds the
# 'stream_chunk' type tag to the chunk's own fields.
chunk = WorkerStreamChunk(
    job_id='test-456',
    chunk='This is a streaming chunk',
    is_final=False,
    metadata={'index': 1}
)
chunk.to_dict()

{'type': 'stream_chunk',
 'job_id': 'test-456',
 'chunk': 'This is a streaming chunk',
 'is_final': False,
 'metadata': {'index': 1}}

In [None]:
#| export
@dataclass
class WorkerResult:
    """Outcome of a job execution, flattenable for the response queue."""
    job_id: str  # Identifier of the job this result belongs to
    status: str  # 'success' or 'error'
    data: Optional[Dict[str, Any]] = None  # Result payload on success
    error: Optional[str] = None  # Failure message on error

    def to_dict(self) -> Dict[str, Any]:
        """Flatten the result into a plain dictionary tagged as a result.

        Returns:
            Dictionary with ``'type'``, ``'job_id'`` and ``'status'``; the
            ``'data'`` and ``'error'`` keys are included only when the
            corresponding field is not ``None``.
        """
        payload = dict(
            type=WorkerResponseType.RESULT.value,
            job_id=self.job_id,
            status=self.status,
        )
        # Optional fields keep their declared order: 'data' first, then 'error'.
        optional_fields = (('data', self.data), ('error', self.error))
        payload.update(
            (key, value) for key, value in optional_fields if value is not None
        )
        return payload

In [None]:
# Test WorkerResult - success case
# `error` is left at its default of None, so to_dict() omits the 'error' key.
result = WorkerResult(
    job_id='test-789',
    status='success',
    data={'text': 'Job completed successfully', 'metadata': {'time': 1.5}}
)
result.to_dict()

{'type': 'result',
 'job_id': 'test-789',
 'status': 'success',
 'data': {'text': 'Job completed successfully', 'metadata': {'time': 1.5}}}

In [None]:
# Test WorkerResult - error case
# `data` is left at its default of None, so to_dict() omits the 'data' key.
error_result = WorkerResult(
    job_id='test-999',
    status='error',
    error='Plugin execution failed: Invalid input'
)
error_result.to_dict()

{'type': 'result',
 'job_id': 'test-999',
 'status': 'error',
 'error': 'Plugin execution failed: Invalid input'}

## Plugin Manager Protocol

The `PluginManagerAdapter` is a Protocol class that uses structural subtyping (duck typing). Any class that implements these methods will satisfy the protocol, even without explicit inheritance.

This protocol defines the interface that plugin managers must implement to work with the worker system. It handles:
- Plugin discovery and loading
- Plugin execution (both standard and streaming)
- Plugin lifecycle management (reload/unload)
- Streaming capability detection

In [None]:
#| export
class PluginManagerAdapter(Protocol):
    """
    Interface a plugin manager has to expose to be driven by a worker.

    This is a structural (duck-typed) protocol: a plugin manager does not
    have to subclass it, it only has to provide methods with these
    signatures.
    """

    def discover_plugins(self) -> list:
        """
        Find the plugins that are available.

        Returns:
            A list of plugin metadata/data objects.
        """
        ...

    def load_plugin(self, plugin_data: Any, config: Dict[str, Any]) -> None:
        """
        Load one plugin using the supplied configuration.

        Args:
            plugin_data: Metadata/data object for the plugin, as obtained
                from discovery.
            config: Configuration dictionary for the plugin.
        """
        ...

    def execute_plugin(self, plugin_name: str, **params) -> Any:
        """
        Run a plugin and return its result.

        Args:
            plugin_name: Name of the plugin to run.
            **params: Parameters specific to that plugin.

        Returns:
            Whatever the plugin execution produces.
        """
        ...

    def execute_plugin_stream(self, plugin_name: str, **params) -> Iterator[str]:
        """
        Run a plugin and stream its output.

        Args:
            plugin_name: Name of the plugin to run.
            **params: Parameters specific to that plugin.

        Yields:
            String chunks produced by the plugin execution.
        """
        ...

    def reload_plugin(self, plugin_name: str, config: Optional[Dict[str, Any]] = None) -> None:
        """
        Reload a plugin, applying a new configuration.

        Args:
            plugin_name: Name of the plugin to reload.
            config: The new configuration (None to unload).
        """
        ...

    def unload_plugin(self, plugin_name: str) -> None:
        """
        Unload a plugin so its resources are freed.

        Args:
            plugin_name: Name of the plugin to unload.
        """
        ...

    def check_streaming_support(self, plugin_name: str) -> bool:
        """
        Report whether a plugin can be executed in streaming mode.

        Args:
            plugin_name: Name of the plugin to check.

        Returns:
            True if the plugin supports streaming.
        """
        ...

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()