F092: GRPCBroker and gRPC Streaming for Plugin Event System #334

@pocky

Scope

In Scope

  • GRPCBroker activation enabling plugins to emit events to the host at runtime via a reverse channel
  • HostEventService exposed through the broker for plugin→host event delivery
  • Plugin SDK wiring to discover and connect to HostEventService via the broker
  • Integration of broker-emitted events with the existing EventBus for cross-plugin routing
  • Server-side gRPC streaming RPC (StreamEvents) as an alternative to unary HandleEvent
  • StreamManager on the host side managing persistent streaming connections per plugin
  • Fallback from streaming to unary HandleEvent for plugins that do not support streaming
  • Benchmarks comparing unary vs streaming latency and throughput

Out of Scope

  • VersionedPlugins negotiation (Sprint 3 — prerequisite, tracked separately)
  • Reattach / long-lived plugin support (Sprint 5 — depends on this feature)
  • Health checking via grpc_health_v1 (orthogonal, can be added independently)
  • Multiplexing or TTY forwarding (no current use case)
  • Changes to AutoMTLS, SecureConfig, or logger forwarding (completed in F091)

Deferred

| Item | Rationale | Follow-up |
| --- | --- | --- |
| Bidirectional streaming (full duplex) | Server-side streaming covers current needs; bidirectional adds connection management complexity without demonstrated benefit | Future |
| Event persistence / replay | Requires durable queue infrastructure; current in-memory EventBus is sufficient for synchronous workflow execution | Future |
| Plugin-to-plugin direct channels | Routing through the host EventBus maintains centralized cycle detection and access control | Future |
| Rate limiting on broker emit | No abuse vector exists: plugins are locally installed binaries; add if multi-tenant hosting emerges | Future |

User Stories

US1: Plugin Emits Events to Host at Runtime (P1 - Must Have)

As a plugin developer,
I want my plugin to emit events to the AWF host at runtime through a dedicated gRPC channel,
So that my plugin can signal state changes, progress, and custom domain events that other plugins and the core workflow engine can react to.

Why this priority: The current event system (F090) is unidirectional — host pushes events to plugins via HandleEvent. Plugins declare events.emit in their manifest but have no runtime channel to actually send events. This gap makes the emit declaration dead configuration and blocks any plugin-driven event workflow.

Acceptance Scenarios:

  1. Given a plugin with events.emit: ["custom.analysis.complete"] in its manifest, When the plugin calls HostEventService.Emit with event type custom.analysis.complete and a JSON payload, Then the host EventBus receives the event and routes it to all plugins subscribed to that pattern.
  2. Given plugin A emits custom.data.ready and plugin B subscribes to custom.data.*, When plugin A emits via the broker, Then plugin B receives the event through its HandleEvent RPC within 100ms.
  3. Given a plugin attempts to emit an event type not declared in its manifest events.emit, When the Emit call is made, Then the host rejects the call with a permission error and logs a warning.
  4. Given a plugin emits an event that would create a cycle (depth exceeds maxPropagationDepth), When the EventBus processes the event, Then propagation stops and a warning is logged with the cycle trace.

Independent Test: Install two test plugins — emitter and receiver. Run a workflow that triggers the emitter. Verify the receiver's HandleEvent is called with the emitted event payload.
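The permission check in scenario 3 could be sketched as follows. This is a minimal illustration, not the planned implementation: the function name `emitAllowed` is hypothetical, and glob matching via `path.Match` is an assumption, since the issue does not define the pattern grammar used in events.emit.

```go
package main

import (
	"fmt"
	"path"
)

// emitAllowed reports whether eventType matches any pattern declared in the
// plugin's events.emit manifest entry. Glob semantics via path.Match are an
// assumption; the real pattern grammar may differ.
func emitAllowed(declared []string, eventType string) bool {
	for _, pattern := range declared {
		ok, err := path.Match(pattern, eventType)
		if err != nil {
			continue // malformed pattern: treat it as non-matching
		}
		if ok {
			return true
		}
	}
	return false
}

func main() {
	declared := []string{"custom.analysis.complete", "custom.data.*"}
	fmt.Println(emitAllowed(declared, "custom.data.ready")) // true
	fmt.Println(emitAllowed(declared, "workflow.started"))  // false
}
```

A host-side Emit handler would presumably run a check like this before handing the event to the EventBus, returning a permission error and logging a warning when it fails.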

US2: Streaming Event Delivery (P2 - Should Have)

As a workflow operator running event-heavy workflows,
I want the host to deliver events to plugins over a persistent gRPC stream instead of repeated unary calls,
So that event delivery has lower latency and reduced connection overhead when many events fire in rapid succession.

Why this priority: Unary HandleEvent works correctly but costs a full RPC round-trip per event. During parallel step execution with multiple plugins, this becomes N×M unary calls. Streaming amortizes the per-call setup over one long-lived RPC and enables push-based delivery. However, the unary path is functional, so streaming is an optimization, not a prerequisite.

Acceptance Scenarios:

  1. Given a plugin that implements the StreamEvents RPC, When the host detects streaming support during connection setup, Then the host uses the persistent stream for all subsequent event delivery to that plugin.
  2. Given a plugin that does NOT implement StreamEvents, When the host attempts to open a stream and receives Unimplemented, Then the host falls back to unary HandleEvent without error and logs the fallback at debug level.
  3. Given an active stream connection, When the plugin process crashes or the stream breaks, Then the StreamManager detects the disconnect within 5 seconds, cleans up resources, and falls back to unary delivery if the plugin reconnects.

Independent Test: Run a workflow emitting 100 events to a streaming-capable plugin. Verify all 100 events arrive in order. Compare wall-clock time against unary delivery with the same event count.
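The fallback decision in scenarios 1 and 2 can be sketched without any gRPC dependency. Everything here is hypothetical: `pluginConn`, `chooseDelivery`, and `errUnimplemented` are illustrative names, and the sentinel error stands in for a gRPC Unimplemented status that real code would inspect on the stream error.

```go
package main

import (
	"errors"
	"fmt"
)

// errUnimplemented stands in for a gRPC Unimplemented status (assumption).
var errUnimplemented = errors.New("unimplemented")

// pluginConn is a hypothetical view of a connected plugin.
type pluginConn interface {
	OpenStream() (send func(event string) error, err error)
	HandleEvent(event string) error
}

// chooseDelivery tries the stream first and falls back to unary delivery
// only when the plugin reports that streaming is not implemented.
func chooseDelivery(p pluginConn) (send func(string) error, mode string, err error) {
	send, err = p.OpenStream()
	switch {
	case err == nil:
		return send, "stream", nil
	case errors.Is(err, errUnimplemented):
		return p.HandleEvent, "unary", nil
	default:
		return nil, "", err // any other failure is a real error
	}
}

// unaryOnly simulates a plugin without StreamEvents support.
type unaryOnly struct{ got []string }

func (u *unaryOnly) OpenStream() (func(string) error, error) { return nil, errUnimplemented }
func (u *unaryOnly) HandleEvent(e string) error              { u.got = append(u.got, e); return nil }

func main() {
	p := &unaryOnly{}
	send, mode, err := chooseDelivery(p)
	if err != nil {
		panic(err)
	}
	_ = send("custom.data.ready")
	fmt.Println(mode, p.got) // unary [custom.data.ready]
}
```

Treating only the Unimplemented case as a silent fallback keeps other stream failures visible to the caller.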

US3: Broker Channel Lifecycle Management (P3 - Nice to Have)

As a workflow operator,
I want the GRPCBroker channel to start and stop cleanly with the plugin lifecycle,
So that broker resources (ports, connections) do not leak between workflow runs and stale connections do not cause delivery failures.

Why this priority: With short-lived plugins (current default), the broker channel exists only during workflow execution. Lifecycle issues surface primarily under error conditions or rapid plugin restarts — important for robustness but not blocking initial adoption.

Acceptance Scenarios:

  1. Given a plugin is stopped via awf plugin stop or workflow completion, When the plugin process exits, Then the broker connection is closed, the HostEventService listener is deregistered, and no goroutines remain blocked on the channel.
  2. Given a plugin crashes mid-execution while holding an open broker connection, When the host detects the process exit, Then the broker connection is cleaned up within the existing rpc_manager shutdown sequence without deadlock.

Independent Test: Start and stop a plugin 50 times in sequence. Verify goroutine count returns to baseline after each cycle using runtime.NumGoroutine() in a test harness.

Edge Cases

  • What happens when a plugin emits an event while the host EventBus is shutting down? The emit call returns an error; the event is dropped and logged.
  • What happens when the broker port conflicts with another service? go-plugin GRPCBroker selects dynamic ports — collisions are handled by the OS. If binding fails, the broker returns an error during GRPCServer setup and the plugin starts without broker capability.
  • What happens when a streaming connection stalls (plugin stops reading but process is alive)? The StreamManager applies a per-message send timeout (5s default). After 3 consecutive timeouts, the stream is torn down and delivery falls back to unary.
  • What happens when a plugin emits at high frequency (>1000 events/sec)? The EventBus channel buffer (256) applies backpressure. If the buffer fills, the emit call blocks until space is available or the context is canceled.
  • How does the system handle concurrent emit calls from the same plugin? HostEventService.Emit is safe for concurrent use — each call is an independent gRPC request processed by the EventBus's existing synchronization.

Requirements

Functional Requirements

  • FR-001: System MUST activate GRPCBroker in goplugin.ClientConfig and pass the broker instance to GRPCServer/GRPCClient implementations instead of discarding it.
  • FR-002: System MUST expose a HostEventService on the broker's gRPC server with an Emit(EmitRequest) returns (EmitResponse) RPC.
  • FR-003: System MUST validate that emitted event types match the plugin's declared events.emit manifest patterns before routing to the EventBus.
  • FR-004: System MUST integrate broker-received events into the existing EventBus with full cycle detection (propagation_depth enforcement).
  • FR-005: Plugin SDK MUST provide a HostClient wrapper that plugins use to emit events, abstracting the broker connection details.
  • FR-006: System MUST define a StreamEvents server-side streaming RPC in the plugin proto that delivers events over a persistent connection.
  • FR-007: System MUST implement a StreamManager that tracks active streams per plugin and routes events to the appropriate delivery mechanism (stream or unary).
  • FR-008: System MUST fall back to unary HandleEvent when a plugin does not implement or support StreamEvents, detected via gRPC Unimplemented status.
  • FR-009: System MUST close broker connections and streaming channels during plugin shutdown without goroutine leaks or deadlocks.
  • FR-010: System MUST log all broker and streaming lifecycle events (connect, disconnect, fallback) at appropriate log levels (info for lifecycle, debug for per-event, warn for errors).

Non-Functional Requirements

  • NFR-001: Broker event emit round-trip (plugin emit → EventBus receive) MUST complete in under 50ms at p95 on localhost.
  • NFR-002: Streaming delivery MUST handle sustained throughput of 500 events/second per plugin without message loss.
  • NFR-003: Log messages at info level or below MUST NOT contain secrets, plugin binary paths, or internal addresses.
  • NFR-004: Broker activation MUST NOT increase plugin startup time by more than 50ms compared to the current non-broker path.
  • NFR-005: StreamManager MUST detect broken streams within 5 seconds and complete fallback to unary within 1 second of detection.

Success Criteria

  • SC-001: A plugin can emit an event at runtime and a second subscribed plugin receives it within 100ms end-to-end, verified by integration test with two concurrent plugins.
  • SC-002: Event delivery over streaming uses at least 40% fewer gRPC round-trips than unary delivery when delivering 100+ events to a single plugin, measured by benchmark.
  • SC-003: Fallback from streaming to unary is transparent — a plugin without StreamEvents support receives events identically to the current behavior with zero configuration changes.
  • SC-004: Zero goroutine leaks after 50 plugin start/stop cycles, verified by test harness comparing runtime.NumGoroutine() before and after.
  • SC-005: All existing F090 event system tests continue to pass without modification, confirming backward compatibility.

Key Entities

| Entity | Description | Key Attributes |
| --- | --- | --- |
| HostEventService | gRPC service exposed by the host via GRPCBroker, receives events emitted by plugins | service ID (broker), allowed emit patterns (from manifest) |
| EmitRequest | Event payload sent by a plugin to the host through the broker | event_type, payload (bytes), source_plugin, propagation_depth, timestamp |
| EmitResponse | Acknowledgment from host to plugin after processing an emit | success (bool), error_message, event_id |
| StreamManager | Host-side component managing persistent streaming connections per plugin | active_streams (map plugin→stream), fallback registry, send timeout |
| EventStreamMessage | Proto message used in the StreamEvents RPC for streaming delivery | event (PluginEvent), sequence_number, ack_required |

Assumptions

  • go-plugin v1.7.0's GRPCBroker API is stable and supports exposing custom gRPC services from the host side without version upgrade.
  • The existing EventBus channel buffer size (256) is sufficient for broker-emitted events; backpressure via blocking is acceptable behavior.
  • Plugins are single-process, single-connection — no need for multiplexed broker channels to the same plugin.
  • VersionedPlugins (Sprint 3) will be implemented before or concurrently with this feature to cleanly separate v1 (no broker) and v2 (with broker) plugin capabilities.
  • Server-side streaming (host→plugin) is sufficient; bidirectional streaming is not needed because plugin→host communication uses the broker's separate Emit RPC.

Metadata

  • Status: backlog
  • Version: v0.8.0
  • Priority: high
  • Estimation: L

Dependencies

  • Blocked by: F090 (event system — done), F091 (AutoMTLS/SecureConfig — done)
  • Unblocks: Reattach/long-lived plugins (Sprint 5), advanced plugin orchestration patterns

Clarifications

Section populated during clarify step with resolved ambiguities.

Notes

  • The GRPCBroker parameter is currently accepted but discarded (_) in both GRPCServer and GRPCClient methods of grpc_plugin.go. Activation requires storing the broker reference and using it to dial/serve the HostEventService.
  • Proto changes: add the EmitRequest/EmitResponse messages and the HostEventService service definition to proto/plugin/v1/plugin.proto, and add the StreamEvents RPC to the existing PluginService.
  • The StreamManager should use a registry pattern — each plugin registers its delivery preference (stream or unary) at connection time, and the EventBus dispatches through the StreamManager rather than directly calling HandleEvent.
  • Benchmark suite should compare: (a) 100 unary HandleEvent calls, (b) 100 events over a single stream, (c) mixed scenario with 5 plugins (3 streaming, 2 unary).
  • Files affected: rpc_manager.go, grpc_plugin.go, serve.go, event_bus.go, new host_service.go, new stream_manager.go, plugin.proto.
