diff --git a/.golangci.yml b/.golangci.yml index 7f5e88a5..588f50fa 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -276,6 +276,11 @@ linters: linters: - gosec + # StreamManager test helpers - scaffolding fields/methods exist for test extensibility + - path: internal/infrastructure/pluginmgr/stream_manager_test\.go + linters: + - unused + # Compile-time type assertions (var _ Interface = (*Type)(nil)) - text: "Error return value is not checked" source: "var _ .* = \\(\\*.*\\)\\(nil\\)" diff --git a/CLAUDE.md b/CLAUDE.md index dd0719db..aae6a2b8 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -243,8 +243,6 @@ func TestWorkflowValidation(t *testing.T) { ## Common Pitfalls -- Halt implementation immediately when scope deviations are discovered; update plan and communicate changes before continuing work -- Apply identical error handling patterns across similar functions; handleNonZeroExit and handleExecutionError must both evaluate transitions before fallbacks - When removing redundant infrastructure code, document the architectural ownership pattern; explain which layer assumed responsibility and why the field was removed - Always apply code deletions before writing tests that validate the deletion effect; tests may pass against overridden behavior instead of the intended code path - Wrap YAML/JSON mapping errors (duration parse, type conversion) in domain error types; surface failures immediately to prevent silent defaults @@ -284,10 +282,11 @@ func TestWorkflowValidation(t *testing.T) { - Always document changes to `.golangci.yml` or linter config in commit message; ensure `make lint` passes before committing - When modifying files in pkg/, document API impact in commit message; verify exports and function signatures haven't changed unexpectedly - Never silently initialize nested struct fields during YAML unmarshaling; explicitly map all sections (events, metadata, etc.) to prevent zero values from hiding parsing bugs +- Always stage all modified implementation files and run 'git status' before marking task complete; unstaged files indicate incomplete task closure. +- Update plan task status immediately when implementation completes; regenerate validation report to catch status-code mismatches before submission. ## Test Conventions -- Never hardcode OS-specific values in test assertions (usernames, paths, shell names); use `os/user.Current()` or mock dependencies for reproducible tests across environments - Test context cancellation with context.WithCancel() and early ctx.Err() checks; verify operation fails with wrapped context.Canceled error within timeout - Mock evaluators must have pre-configured results for every expression input; unconfigured expressions return zero value, which may bypass validation checks in evaluation pipelines - Distinguish fixture path updates (allowed without review) from content changes (require explicit review); document rationale for content modifications in commit message @@ -307,6 +306,7 @@ func TestWorkflowValidation(t *testing.T) { - Add BenchmarkXX functions for new I/O processing components; measure throughput, memory allocation, and verify capacity constraints (1MB buffer, etc.) are respected - Test event metadata persistence across all input variations for provider translation; include cases with missing optional nested fields to prevent silent metadata loss - When testing YAML unmarshaling, assert on all nested struct fields; verify that arrays like Events.Subscribe and Events.Emit are populated, not defaulted to empty +- New gRPC and concurrency-heavy infrastructure requires >85% test coverage; run 'make test-race' to verify no data races in stream managers and lock-protected sections. ## Review Standards diff --git a/README.md b/README.md index 0a46f3d8..a5e256ea 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,7 @@ A Go CLI tool for orchestrating AI agents (Claude, Gemini, Codex, GitHub Copilot - **Actionable Error Hints** - Context-aware suggestions ("Did you mean?") with fuzzy matching, suppressible via `--no-hints` - **Audit Trail** - Structured JSONL audit log with paired start/end entries per execution, secret masking, configurable path, and atomic writes - **Distributed Tracing** - OpenTelemetry integration for visibility into workflow execution with spans for steps, agents, parallel/loop blocks, and shell commands; export to any OTLP-compatible backend (Jaeger, Grafana Tempo, Honeycomb) via `--otel-exporter` and `--otel-service-name` flags -- **Plugin System** - Extend AWF with custom operations, validators, and step types via gRPC plugins (HashiCorp go-plugin); automatic mutual TLS (AutoMTLS) encryption for all host-plugin communication with zero configuration; SHA-256 binary integrity verification at launch time blocks tampered or corrupted plugins; plugin subprocess log and stdout/stderr forwarding to AWF's structured logger for crash diagnostics; validators run custom rules during `awf validate`, custom step types register new `type:` values for workflow steps; event system enables plugins to subscribe to core lifecycle events (`workflow.*`, `step.*`) and emit custom inter-plugin events with glob pattern matching, per-plugin buffered channels, and cycle detection; includes `sdk.Serve()` entry point for plugin authors, and install/update/remove from GitHub Releases with checksum verification +- **Plugin System** - Extend AWF with custom operations, validators, and step types via gRPC plugins (HashiCorp go-plugin); automatic mutual TLS (AutoMTLS) encryption for all host-plugin communication with zero configuration; SHA-256 binary integrity verification at launch time blocks tampered or corrupted plugins; plugin subprocess log and stdout/stderr forwarding to AWF's structured logger for crash diagnostics; validators run custom rules during `awf validate`, custom step types register new `type:` values for workflow steps; event system enables plugins to subscribe to core lifecycle events (`workflow.*`, `step.*`) and emit custom inter-plugin events with glob pattern matching, per-plugin buffered channels, and cycle detection; GRPCBroker enables plugins to emit events back to the host at runtime via a dedicated reverse channel with manifest-based permission enforcement; persistent gRPC streaming delivers events to plugins over long-lived connections with automatic fallback to unary RPCs for plugins that don't support streaming; includes `sdk.Serve()` entry point for plugin authors, and install/update/remove from GitHub Releases with checksum verification - **Workflow Packs** - Share reusable workflows and prompts via `awf workflow install owner/repo[@version]` from GitHub Releases with manifest validation, checksum verification, and atomic installation; execute with `awf run pack/workflow` namespace syntax; `{{.awf.prompts_dir}}` and `{{.awf.scripts_dir}}` resolve context-aware with 3-tier resolution (user override → pack embedded → global); `call_workflow` within packs resolves relative to pack root; `--global` flag for user-level installation; `awf workflow remove ` for cleanup; source metadata tracking and plugin dependency warnings - **Built-in GitHub Plugin** - Declarative GitHub operations (get_issue, create_pr, batch) with auth fallback and concurrent execution - **Built-in HTTP Operation** - Declarative REST API calls (GET, POST, PUT, DELETE) with configurable timeout, response capture, and retryable status codes diff --git a/docs/development/plugin-event-architecture.md b/docs/development/plugin-event-architecture.md index 0f7f8257..51c9c997 100644 --- a/docs/development/plugin-event-architecture.md +++ b/docs/development/plugin-event-architecture.md @@ -4,34 +4,46 @@ title: "Plugin Event System Architecture" ## Overview -The Plugin Event System (F090) enables real-time event-driven communication between AWF core and plugins, and between plugins themselves. It's built on gRPC and implements a fan-out pattern with per-plugin buffered channels for isolation. +The Plugin Event System (F090 + F092) enables real-time event-driven communication between AWF core and plugins, and between plugins themselves. It's built on gRPC and implements a fan-out pattern with per-plugin buffered channels for isolation. F092 extends the system with GRPCBroker-based plugin-to-host event emission and persistent gRPC streaming for optimized host-to-plugin delivery. ## Architecture Layers ``` -┌────────────────────────────────────────┐ -│ Plugin SDK Layer (pkg/plugin/sdk) │ -│ EventSubscriber interface + defaults │ -└──────────────┬─────────────────────────┘ - │ (gRPC HandleEvent RPC) -┌──────────────▼──────────────────────────────┐ -│ gRPC Adapter (pluginmgr/grpc_event.go) │ -│ Handles proto ↔ domain conversions │ -└──────────────┬───────────────────────────────┘ +┌──────────────────────────────────────────────────────────┐ +│ Plugin SDK Layer (pkg/plugin/sdk) │ +│ EventSubscriber interface │ HostClient (emit via │ +│ + defaults │ broker) │ StreamEvents │ +│ │ handler (recv via stream) │ +└──────────────┬──────────────┴──────────┬─────────────────┘ + │ (gRPC HandleEvent / │ (GRPCBroker + │ StreamEvents RPC) │ HostEventService.Emit) +┌──────────────▼──────────────────────────▼────────────────┐ +│ StreamManager + gRPC Adapter (pluginmgr/) │ +│ • StreamManager: per-plugin stream tracking │ +│ • Implements EventDeliverer (stream or unary) │ +│ • Automatic fallback on Unimplemented/broken stream │ +│ • grpcEventAdapter: proto ↔ domain conversions │ +└──────────────┬───────────────────────────────────────────┘ │ -┌──────────────▼──────────────────────────────┐ -│ EventBus Infrastructure │ -│ • Pattern matching (glob) │ -│ • Per-plugin buffered channels (256) │ -│ • Async delivery goroutines │ -│ • Cycle detection (depth 3) │ -└──────────────┬───────────────────────────────┘ +┌──────────────▼───────────────────────────────────────────┐ +│ EventBus Infrastructure │ +│ • Pattern matching (glob) │ +│ • Per-plugin buffered channels (256) │ +│ • Async delivery goroutines │ +│ • Cycle detection (depth 3) │ +│ ┌──────────────────────────────────┐ │ +│ │ HostEventService (broker-served) │ │ +│ │ • Receives plugin Emit() calls │ │ +│ │ • Validates against manifest │ │ +│ │ • Publishes to EventBus │ │ +│ └──────────────────────────────────┘ │ +└──────────────┬───────────────────────────────────────────┘ │ -┌──────────────▼──────────────────────────────┐ -│ ExecutionService / Domain │ -│ Emits lifecycle events (workflow.started) │ -│ Manages EventPublisher port │ -└───────────────────────────────────────────┘ +┌──────────────▼───────────────────────────────────────────┐ +│ ExecutionService / Domain │ +│ Emits lifecycle events (workflow.started) │ +│ Manages EventPublisher port │ +└──────────────────────────────────────────────────────────┘ ``` ## Design Decisions @@ -173,6 +185,128 @@ if event.PropagationDepth >= 3 { - Avoids coupling workflow domain to plugin event system - Clear separation of concerns +## GRPCBroker and Reverse Channels (F092) + +F092 activates the GRPCBroker to enable two capabilities: + +1. **Plugin-to-host event emission** — Plugins emit events via `HostEventService` exposed through the broker +2. **Optimized event delivery** — Persistent client-side gRPC streams replace repeated unary RPC calls + +### HostEventService + +The host exposes a `HostEventService` on the GRPCBroker that plugins can dial to emit events: + +```go +// Host side: Expose HostEventService on broker +service HostEventService { + rpc Emit(EmitRequest) returns (EmitResponse); +} + +// Plugin side: Dial and use HostEventService +hostClient := NewHostClient(broker, "plugin-name") +hostClient.Emit(ctx, "event.type", payload, metadata) +``` + +**Permission Model:** + +Emit requests are validated against the plugin's `events.emit` manifest patterns: + +```yaml +# plugin.yaml +events: + emit: + - "custom.analysis.*" +``` + +Attempting to emit undeclared event types returns an authorization error: + +``` +Plugin emits "custom.analysis.complete" → Manifest allows "custom.analysis.*" → ✓ Accepted +Plugin emits "workflow.failed" → Not in manifest → ✗ Denied (BROKER_EMIT_DENIED) +``` + +**Event Flow:** + +``` +Plugin→HostClient.Emit(event) + │ + └─ gRPC Emit() call via broker + │ + └─ HostEventService receives request + │ + ├─ Validate: Is emitted event type in plugin's events.emit patterns? + │ + ├─ Validate: Does plugin have 'events' capability? + │ + └─ Publish to EventBus (same path as core-emitted events) + │ + └─ EventBus routes to subscribers + └─ Each subscriber receives via HandleEvent RPC or streaming +``` + +### StreamManager and Streaming Delivery + +While plugin→host emission goes through the broker's `HostEventService`, the reverse (host→plugin event delivery) is optimized with client-side streaming: + +**Streaming RPC Definition:** + +```protobuf +service EventService { + rpc HandleEvent(HandleEventRequest) returns (HandleEventResponse); // Unary (existing) + rpc StreamEvents(stream EventStreamMessage) returns (StreamEventsResponse); // Streaming (F092+) +} +``` + +Note the asymmetry: `HandleEvent` is **unary** (one request), `StreamEvents` is **client-side streaming** (multiple messages from client). This is because: +- Plugin is gRPC server, host is gRPC client +- Host needs to **push** events to plugin = host (client) sends stream +- Typical gRPC pattern for client→server push + +**StreamManager Workflow:** + +``` +wireEventSubscriptions() called for plugin + │ + ├─ Check if plugin supports StreamEvents RPC + │ + ├─ If YES: Register stream via broker + │ │ + │ └─ StreamManager.RegisterStream(pluginName, stream) + │ │ + │ └─ Future events routed via stream (low latency, fewer RPC calls) + │ + └─ If NO: Use unary adapter (fallback) + │ + └─ Each event triggers a separate HandleEvent RPC call +``` + +**Benefits:** + +- **Lower latency:** One persistent connection vs per-event RPC setup +- **Fewer round-trips:** 100 events = 1 stream + 100 Sends vs 100 HandleEvent calls +- **Transparent fallback:** Plugins without streaming support continue to work unchanged +- **Automatic recovery:** If stream breaks, fallback to unary (no manual intervention) + +**Per-Event Sequence Numbers:** + +Messages on the stream include a sequence number for ordering and debugging: + +``` +EventStreamMessage { + sequence_number: 1 // First event + id: "evt-123" + type: "workflow.started" + ... +} + +EventStreamMessage { + sequence_number: 2 // Second event + ... +} +``` + +Sequence numbers are per-plugin (reset on stream reconnect). + ## Key Components ### DomainEvent (Domain) @@ -279,6 +413,65 @@ func (p *MyPlugin) HandleEvent(ctx context.Context, event Event) ([]Event, error } ``` +## F092 Integration: StreamManager and EventDeliverer + +F092 introduces minimal coupling by leveraging the existing `EventDeliverer` interface seam: + +```go +type EventDeliverer interface { + DeliverEvent(ctx context.Context, event *DomainEvent) ([]*DomainEvent, error) +} +``` + +**Before F092:** +- EventBus.Subscribe() takes a `grpcEventAdapter` (implements EventDeliverer) +- grpcEventAdapter calls HandleEvent unary RPC + +**After F092:** +- EventBus.Subscribe() takes either: + - `grpcEventAdapter` (unary, fallback) + - `streamDeliverer` (streaming, preferred) +- StreamManager decides which deliverer to use per plugin +- No changes to EventBus internals + +**SelectionLogic:** + +```go +func (sm *StreamManager) GetDeliverer(pluginName string, unaryFallback EventDeliverer) EventDeliverer { + if sm.HasStream(pluginName) { + return &streamDeliverer{ + stream: sm.streams[pluginName], + fallback: unaryFallback, + } + } + return unaryFallback +} +``` + +**Fallback Handling:** + +If a streaming send fails (stream broken, timeout, or plugin doesn't support streaming), the streamDeliverer automatically falls back to unary: + +```go +func (d *streamDeliverer) DeliverEvent(ctx context.Context, event *DomainEvent) ([]*DomainEvent, error) { + if err := d.stream.Send(eventStreamMessage); err != nil { + // Detect "Unimplemented" gRPC status (plugin doesn't support StreamEvents) + if isUnimplemented(err) { + d.sm.UnregisterStream(d.pluginName) // Mark stream as unavailable + } + // Fall back to unary delivery + return d.fallback.DeliverEvent(ctx, event) + } + return nil, nil +} +``` + +This design ensures: +- **Backward compatibility:** Existing unary path unchanged +- **Minimal surface area:** No EventBus modifications +- **Graceful degradation:** Streaming failure → unary (always works) +- **Per-plugin granularity:** Each plugin independently uses streaming if available + ## Wiring (Interfaces Layer) **Initialization:** @@ -467,26 +660,41 @@ Existing plugins without `events` capability unaffected. - Goroutine baseline restored after full cleanup - All existing plugin tests pass without modification +## Recent Enhancements (F092) + +F092 introduced two major enhancements: + +1. **Plugin-to-Host Event Emission** — Plugins can now emit events via HostClient, not just return events from HandleEvent +2. **Optimized Event Delivery** — Persistent client-side gRPC streams reduce latency and RPC overhead + +These features completed the bidirectional event system (host→plugin via EventBus, plugin→host via broker) and optimized the high-throughput case. + ## Future Extensions ### Persistent Event Log -**Deferred to:** Future version +**Status:** Deferred -**Rationale:** Fire-and-forget sufficient for v1; replay adds complexity (durable storage, message ordering guarantees) +**Rationale:** Fire-and-forget sufficient for v0.8+; replay adds complexity (durable storage, message ordering guarantees) ### Fine-Grained Filters -**Deferred to:** Future version +**Status:** Deferred **Rationale:** Glob on event type covers 90% of use cases; field-level filtering adds protocol complexity ### OTel Correlation -**Deferred to:** OTel roadmap progress +**Status:** Deferred to OTel roadmap progress **Rationale:** Events carry metadata for manual correlation; automatic integration requires OTel lib decisions +### Bidirectional Streaming + +**Status:** Not planned (client-side streaming sufficient) + +**Rationale:** F092 implements client-side streaming (host→plugin) for optimal push delivery. Bidirectional streaming adds complexity without demonstrated benefit — plugin→host uses broker's separate `Emit` RPC instead + ## See Also - [Plugin Events Guide](../user-guide/plugin-events.md) - User-facing documentation diff --git a/docs/user-guide/plugin-events.md b/docs/user-guide/plugin-events.md index 9e92546d..8448495b 100644 --- a/docs/user-guide/plugin-events.md +++ b/docs/user-guide/plugin-events.md @@ -22,10 +22,11 @@ awf run workflow Plugins can: 1. **Subscribe to core events** — react to workflow/step lifecycle (`workflow.started`, `step.failed`, etc.) 2. **Subscribe to custom events** — react to events from other plugins -3. **Emit custom events** — notify other plugins of plugin-specific milestones +3. **Emit custom events** — notify other plugins of plugin-specific milestones, either by returning events from `HandleEvent` or by calling `HostClient.Emit()` at any time during execution 4. **Use glob patterns** — subscribe to event families (`workflow.*`, `step.*`) +5. **Receive events via streaming** — opt into persistent gRPC streaming for lower-latency delivery (automatic fallback to unary RPCs) -All communication happens in real-time via gRPC without the plugin polling or managing connections. +All communication happens in real-time via gRPC without the plugin polling or managing connections. The host uses GRPCBroker to expose a reverse channel that plugins can use to emit events back at runtime. ## Subscribing to Events @@ -181,7 +182,14 @@ Metadata: map[string]string{ ## Emitting Custom Events -Plugins can emit events that other plugins subscribe to. Use `HandleEvent` return value: +Plugins can emit events in two ways: + +1. **Via `HandleEvent` return value** — Emit events as a response to received events +2. **Via `HostClient`** — Emit events directly to the host at any time (F092+) + +### Method 1: Return Events from HandleEvent + +Emit events that other plugins subscribe to by returning them from `HandleEvent`: ### Example: Deploy Plugin → Notification Plugin @@ -425,6 +433,110 @@ func main() { } ``` +### Method 2: Emit Directly via HostClient + +For long-running operations, async work, or independent event emission, use `HostClient` to emit events directly to the host at any time: + +```go +import ( + "context" + "encoding/json" + + "github.com/awf-project/cli/pkg/plugin/sdk" +) + +type AnalysisPlugin struct { + sdk.BasePlugin + hostClient *sdk.HostClient +} + +// Plugin receives broker connection during initialization +func (p *AnalysisPlugin) SetHostClient(client *sdk.HostClient) { + p.hostClient = client +} + +func (p *AnalysisPlugin) Operation(ctx context.Context, req *sdk.OperationRequest) (*sdk.OperationResponse, error) { + // Do analysis work... + result := analyzeCode(req.Input) + + // Emit event directly via HostClient (doesn't wait for HandleEvent call) + if p.hostClient != nil { + payload, _ := json.Marshal(map[string]any{ + "file": req.Input, + "severity": result.Severity, + "issues": len(result.Issues), + }) + + p.hostClient.Emit(ctx, "analysis.complete", payload, map[string]string{ + "status": result.Status, + }) + } + + return &sdk.OperationResponse{Output: result.Summary}, nil +} + +func main() { + sdk.Serve(&AnalysisPlugin{ + BasePlugin: sdk.BasePlugin{ + PluginName: "awf-plugin-analysis", + PluginVersion: "1.0.0", + }, + }) +} +``` + +**Requirements for `HostClient.Emit()`:** + +1. **Declare emit patterns in manifest** — The `events.emit` field must list all event types your plugin can emit: + +```yaml +name: awf-plugin-analysis +version: 1.0.0 +awf_version: ">=0.8.0" + +capabilities: + - operations + - events + +events: + emit: + - "analysis.*" # Pattern: analysis.complete, analysis.failed, etc. + - "code.scanned" # Specific event +``` + +2. **Implement `SetHostClient`** — The framework calls this during plugin initialization to pass the broker connection: + +```go +func (p *MyPlugin) SetHostClient(client *sdk.HostClient) { + p.hostClient = client +} +``` + +3. **Check for nil** — `HostClient` is only available if the host supports broker communication (AWF v0.8.0+). Always check before using: + +```go +if p.hostClient != nil { + p.hostClient.Emit(ctx, "event.type", payload, metadata) +} +``` + +**Error Handling:** + +Emit calls can fail if: +- Plugin doesn't declare the event type in `events.emit` (authorization denied) +- Event type not correctly declared (misspelled pattern) +- Host's event system is temporarily unavailable (rare) + +Handle errors gracefully — emit failures shouldn't break your operation: + +```go +if p.hostClient != nil { + if err := p.hostClient.Emit(ctx, eventType, payload, metadata); err != nil { + p.logger.Warn("emit failed (continuing anyway)", "event", eventType, "error", err) + } +} +``` + ## Handling Errors If `HandleEvent` returns an error, the event is logged but doesn't block event delivery to other plugins: @@ -453,6 +565,38 @@ func (p *MyPlugin) HandleEvent(ctx context.Context, event sdk.Event) ([]sdk.Even } ``` +## Streaming Event Delivery + +By default, AWF delivers events to plugins via individual unary `HandleEvent` RPCs — one RPC per event. For event-heavy workflows, this creates overhead from repeated connection round-trips. + +Plugins that implement the `StreamEvents` RPC receive events over a persistent gRPC stream instead. The host (gRPC client) pushes events via `Send()`, and the plugin (gRPC server) receives them in a `Recv()` loop. This is automatic — plugins that support streaming get it; those that don't continue using unary delivery. + +### How It Works + +``` +Host detects plugin supports StreamEvents + │ + ├─ Opens persistent stream connection + │ + └─ All subsequent events use stream.Send() + instead of individual HandleEvent RPCs +``` + +### Automatic Fallback + +If a plugin does not implement `StreamEvents`, the host detects the gRPC `Unimplemented` status and falls back to unary `HandleEvent` transparently. No configuration needed. + +If an active stream breaks (plugin crash, network issue), the StreamManager detects the disconnect within 5 seconds and falls back to unary delivery. Three consecutive send timeouts also trigger stream teardown and fallback. + +### When to Use Streaming + +Streaming is beneficial when: +- Your plugin receives many events in rapid succession (parallel step execution) +- Latency between event emission and handling is critical +- A workflow emits 100+ events to your plugin per run + +For plugins that handle a few events per workflow run, unary delivery is equally effective. + ## Performance Considerations **Event Buffer Limits:** diff --git a/internal/domain/errors/codes.go b/internal/domain/errors/codes.go index 76a947cf..759bf9a9 100644 --- a/internal/domain/errors/codes.go +++ b/internal/domain/errors/codes.go @@ -82,6 +82,12 @@ const ( // ErrorCodeExecutionPluginChecksumMismatch indicates plugin binary checksum verification failed. ErrorCodeExecutionPluginChecksumMismatch ErrorCode = "EXECUTION.PLUGIN.CHECKSUM_MISMATCH" + + // ErrorCodeExecutionPluginBrokerEmitDenied indicates a plugin attempted to emit an event it is not authorized to emit. + ErrorCodeExecutionPluginBrokerEmitDenied ErrorCode = "EXECUTION.PLUGIN.BROKER_EMIT_DENIED" + + // ErrorCodeExecutionPluginStreamSetupFailed indicates a streaming connection to a plugin could not be established. + ErrorCodeExecutionPluginStreamSetupFailed ErrorCode = "EXECUTION.PLUGIN.STREAM_SETUP_FAILED" ) // Error code constants for SYSTEM category (exit code 4). diff --git a/internal/infrastructure/notify/webhook_test.go b/internal/infrastructure/notify/webhook_test.go index d02bdf91..5b275586 100644 --- a/internal/infrastructure/notify/webhook_test.go +++ b/internal/infrastructure/notify/webhook_test.go @@ -863,6 +863,11 @@ func TestWebhookBackend_Send_OutputsAsJSONString(t *testing.T) { // --- Concurrent Send tests --- func TestWebhookBackend_Send_ConcurrentCalls(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + backend := newWebhookBackend() const numGoroutines = 10 @@ -874,24 +879,23 @@ func TestWebhookBackend_Send_ConcurrentCalls(t *testing.T) { payload := NotificationPayload{ Message: "Concurrent test", Metadata: map[string]string{ - "webhook_url": "https://httpbin.org/post", + "webhook_url": server.URL, }, } result, err := backend.Send(ctx, payload) - // Given: concurrent Send calls - // When: multiple goroutines call Send simultaneously - // Then: should handle concurrent requests safely assert.NoError(t, err, "goroutine %d failed", id) - assert.NotNil(t, result, "goroutine %d got nil result", id) - assert.Equal(t, "webhook", result.Backend) + if result != nil { + assert.Equal(t, "webhook", result.Backend) + } else { + t.Errorf("goroutine %d got nil result", id) + } done <- true }(i) } - // Wait for all goroutines to complete for range numGoroutines { select { case <-done: diff --git a/internal/infrastructure/pluginmgr/broker_integration_test.go b/internal/infrastructure/pluginmgr/broker_integration_test.go new file mode 100644 index 00000000..0612e21b --- /dev/null +++ b/internal/infrastructure/pluginmgr/broker_integration_test.go @@ -0,0 +1,259 @@ +package pluginmgr + +import ( + "context" + "fmt" + "runtime" + "sync" + "testing" + "time" + + "github.com/awf-project/cli/internal/domain/pluginmodel" + pluginv1 "github.com/awf-project/cli/proto/plugin/v1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// recordingStreamSender captures all sent stream messages for verification. +type recordingStreamSender struct { + mu sync.Mutex + messages []*pluginv1.EventStreamMessage +} + +func (r *recordingStreamSender) Send(msg *pluginv1.EventStreamMessage) error { + r.mu.Lock() + r.messages = append(r.messages, msg) + r.mu.Unlock() + return nil +} + +func (r *recordingStreamSender) getMessages() []*pluginv1.EventStreamMessage { + r.mu.Lock() + defer r.mu.Unlock() + out := make([]*pluginv1.EventStreamMessage, len(r.messages)) + copy(out, r.messages) + return out +} + +func TestBrokerIntegration_EmitAndReceive(t *testing.T) { + logger := &testLogger{} + bus := NewEventBus(logger) + defer bus.Close() + + received := make(chan *pluginmodel.DomainEvent, 1) + receiverDeliverer := &mockEventDeliverer{ + deliverFunc: func(_ context.Context, event *pluginmodel.DomainEvent) ([]*pluginmodel.DomainEvent, error) { + received <- event + return nil, nil + }, + } + // custom.*.* matches 3-segment types like custom.analysis.complete + bus.Subscribe("receiver-plugin", []string{"custom.*.*"}, receiverDeliverer) + + service := newHostEventService(bus, manifestLookup, logger) + + req := &pluginv1.EmitRequest{ + EventType: "custom.analysis.complete", + SourcePlugin: "authorized-plugin", + Payload: []byte("analysis result"), + } + + resp, err := service.Emit(context.Background(), req) + + require.NoError(t, err) + require.True(t, resp.Success) + + select { + case event := <-received: + assert.Equal(t, "custom.analysis.complete", event.Type) + assert.Equal(t, "authorized-plugin", event.Source) + case <-time.After(200 * time.Millisecond): + t.Fatal("receiver did not receive event within timeout") + } +} + +func TestBrokerIntegration_EmitDenied_DoesNotRoute(t *testing.T) { + logger := &testLogger{} + bus := NewEventBus(logger) + defer bus.Close() + + receiverDeliverer := &mockEventDeliverer{} + bus.Subscribe("receiver-plugin", []string{"*"}, receiverDeliverer) + + service := newHostEventService(bus, manifestLookup, logger) + + req := &pluginv1.EmitRequest{ + EventType: "undeclared.event.type", + SourcePlugin: "authorized-plugin", + } + + resp, err := service.Emit(context.Background(), req) + + require.NoError(t, err) + assert.False(t, resp.Success) + assert.Contains(t, resp.ErrorMessage, "not authorized") + + time.Sleep(100 * time.Millisecond) + assert.Equal(t, 0, receiverDeliverer.getCallCount()) +} + +func TestBrokerIntegration_StreamDelivery_100Events(t *testing.T) { + logger := &testLogger{} + bus := NewEventBus(logger) + defer bus.Close() + + sm := NewStreamManager(logger) + recorder := &recordingStreamSender{} + sm.RegisterStream("stream-plugin", recorder) + + fallback := &mockEventDeliverer{} + deliverer := sm.GetDeliverer("stream-plugin", fallback) + bus.Subscribe("stream-plugin", []string{"*"}, deliverer) + + ctx := context.Background() + for i := range 100 { + event := &pluginmodel.DomainEvent{ + ID: fmt.Sprintf("evt-%d", i), + Type: "test.event", + Timestamp: time.Now(), + } + require.NoError(t, bus.Publish(ctx, event)) + } + + time.Sleep(200 * time.Millisecond) + + msgs := recorder.getMessages() + require.Len(t, msgs, 100) + for i, msg := range msgs { + assert.Equal(t, uint64(i+1), msg.SequenceNumber, "event index %d", i) + } +} + +func TestBrokerIntegration_StreamFallbackToUnary_Transparent(t *testing.T) { + logger := &testLogger{} + bus := NewEventBus(logger) + defer bus.Close() + + sm := NewStreamManager(logger) + // No stream registered — GetDeliverer returns unaryFallback directly + + received := make(chan *pluginmodel.DomainEvent, 1) + unaryDeliverer := &mockEventDeliverer{ + deliverFunc: func(_ context.Context, event *pluginmodel.DomainEvent) ([]*pluginmodel.DomainEvent, error) { + received <- event + return nil, nil + }, + } + deliverer := sm.GetDeliverer("unary-plugin", unaryDeliverer) + bus.Subscribe("unary-plugin", []string{"*"}, deliverer) + + event := &pluginmodel.DomainEvent{ + ID: "evt-fallback", + Type: "test.event", + Timestamp: time.Now(), + Source: "emitter-plugin", + } + + require.NoError(t, bus.Publish(context.Background(), event)) + + select { + case got := <-received: + assert.Equal(t, "evt-fallback", got.ID) + assert.Equal(t, "emitter-plugin", got.Source) + case <-time.After(200 * time.Millisecond): + t.Fatal("unary deliverer did not receive event") + } + assert.Equal(t, 1, unaryDeliverer.getCallCount()) +} + +func TestBrokerIntegration_GoroutineLeaks_50Cycles(t *testing.T) { + baseline := runtime.NumGoroutine() + + for i := range 50 { + logger := &noopLogger{} + bus := NewEventBus(logger) + sm := NewStreamManager(logger) + + pluginName := fmt.Sprintf("gc-plugin-%d", i) + sm.RegisterStream(pluginName, &mockStreamEventsClient{}) + deliverer := sm.GetDeliverer(pluginName, &mockEventDeliverer{}) + bus.Subscribe(pluginName, []string{"test.*"}, deliverer) + + bus.Unsubscribe(pluginName) + sm.UnregisterStream(pluginName) + sm.Close() + bus.Close() //nolint:errcheck // error from Close is irrelevant in GC leak test cleanup + } + + time.Sleep(150 * time.Millisecond) + + after := runtime.NumGoroutine() + assert.InDelta(t, baseline, after, 3) +} + +func TestBrokerIntegration_BackwardCompatibility_F090Events(t *testing.T) { + logger := &testLogger{} + bus := NewEventBus(logger) + defer bus.Close() + + client := &mockEventClient{ + response: &pluginv1.HandleEventResponse{}, + } + adapter := newGRPCEventAdapter(client, "legacy-plugin") + bus.Subscribe("legacy-plugin", []string{"workflow.*"}, adapter) + + event := &pluginmodel.DomainEvent{ + ID: "evt-legacy", + Type: "workflow.started", + Timestamp: time.Now(), + Source: "workflow-service", + } + + require.NoError(t, bus.Publish(context.Background(), event)) + time.Sleep(150 * time.Millisecond) + + req := client.getLastRequest() + require.NotNil(t, req, "legacy-plugin should have received the event via HandleEvent") + assert.Equal(t, "evt-legacy", req.GetId()) + assert.Equal(t, "workflow.started", req.GetType()) + assert.Equal(t, "workflow-service", req.GetSource()) +} + +func BenchmarkEventDelivery_Unary(b *testing.B) { + client := &mockEventClient{ + response: &pluginv1.HandleEventResponse{}, + } + adapter := newGRPCEventAdapter(client, "bench-plugin") + + event := &pluginmodel.DomainEvent{ + ID: "bench-evt", + Type: "bench.event", + Timestamp: time.Now(), + } + + b.ResetTimer() + for range b.N { + _, _ = adapter.DeliverEvent(context.Background(), event) + } +} + +func BenchmarkEventDelivery_Stream(b *testing.B) { + sm := NewStreamManager(&noopLogger{}) + client := &mockStreamEventsClient{} + sm.RegisterStream("bench-plugin", client) + defer sm.Close() + + fallback := &mockEventDeliverer{} + deliverer := sm.GetDeliverer("bench-plugin", fallback) + + event := &pluginmodel.DomainEvent{ + ID: "bench-evt", + Type: "bench.event", + Timestamp: time.Now(), + } + + b.ResetTimer() + for range b.N { + _, _ = deliverer.DeliverEvent(context.Background(), event) + } +} diff --git a/internal/infrastructure/pluginmgr/grpc_event.go b/internal/infrastructure/pluginmgr/grpc_event.go index 6275db70..ae343664 100644 --- a/internal/infrastructure/pluginmgr/grpc_event.go +++ b/internal/infrastructure/pluginmgr/grpc_event.go @@ -49,6 +49,19 @@ func protoToDomainEvent(r *pluginv1.HandleEventRequest) *pluginmodel.DomainEvent } } +func domainEventToStreamMessage(event *pluginmodel.DomainEvent, seqNum uint64) *pluginv1.EventStreamMessage { + return &pluginv1.EventStreamMessage{ + Id: event.ID, + Type: event.Type, + TimestampUnixNanos: event.Timestamp.UnixNano(), + Source: event.Source, + Metadata: event.Metadata, + Payload: event.Payload, + PropagationDepth: int32(event.PropagationDepth), //nolint:gosec // G115: propagation depth is bounded by EventBus max depth, never exceeds int32 range + SequenceNumber: seqNum, + } +} + func (a *grpcEventAdapter) DeliverEvent(ctx context.Context, event *pluginmodel.DomainEvent) ([]*pluginmodel.DomainEvent, error) { handlerCtx, cancel := context.WithTimeout(ctx, defaultEventHandlerTimeout) defer cancel() diff --git a/internal/infrastructure/pluginmgr/grpc_event_test.go b/internal/infrastructure/pluginmgr/grpc_event_test.go index 268659bc..bcff99f3 100644 --- a/internal/infrastructure/pluginmgr/grpc_event_test.go +++ b/internal/infrastructure/pluginmgr/grpc_event_test.go @@ -35,6 +35,10 @@ func (m *mockEventClient) HandleEvent(_ context.Context, in *pluginv1.HandleEven return &pluginv1.HandleEventResponse{}, nil } +func (m *mockEventClient) StreamEvents(_ context.Context, _ ...grpc.CallOption) (grpc.ClientStreamingClient[pluginv1.EventStreamMessage, pluginv1.StreamEventsResponse], error) { + return nil, nil +} + func (m *mockEventClient) getLastRequest() *pluginv1.HandleEventRequest { m.mu.Lock() defer m.mu.Unlock() @@ -129,6 +133,44 @@ func TestGRPCEventAdapter_DeliverEvent_PropagatesClientError(t *testing.T) { require.Error(t, err) } +func TestDomainEventToStreamMessage_AllFields(t *testing.T) { + ts := time.Unix(1700000000, 123456789) + event := &pluginmodel.DomainEvent{ + ID: "stream-event-1", + Type: "workflow.started", + Timestamp: ts, + Source: "workflow-service", + Metadata: map[string]string{"env": "prod"}, + Payload: []byte(`{"step":"init"}`), + PropagationDepth: 3, + } + + msg := domainEventToStreamMessage(event, 42) + + require.NotNil(t, msg) + assert.Equal(t, "stream-event-1", msg.GetId()) + assert.Equal(t, "workflow.started", msg.GetType()) + assert.Equal(t, ts.UnixNano(), msg.GetTimestampUnixNanos()) + assert.Equal(t, "workflow-service", msg.GetSource()) + assert.Equal(t, map[string]string{"env": "prod"}, msg.GetMetadata()) + assert.Equal(t, []byte(`{"step":"init"}`), msg.GetPayload()) + assert.Equal(t, int32(3), msg.GetPropagationDepth()) + assert.Equal(t, uint64(42), msg.GetSequenceNumber()) +} + +func TestDomainEventToStreamMessage_ZeroSequenceNumber(t *testing.T) { + event := &pluginmodel.DomainEvent{ + ID: "event-zero", + Type: "test.event", + } + + msg := domainEventToStreamMessage(event, 0) + + require.NotNil(t, msg) + assert.Equal(t, uint64(0), msg.GetSequenceNumber()) + assert.Equal(t, "event-zero", msg.GetId()) +} + func TestWireEventSubscriptions_RegistersSubscriptionForEventsCapability(t *testing.T) { logger := &testLogger{} bus := NewEventBus(logger) diff --git a/internal/infrastructure/pluginmgr/host_event_service.go b/internal/infrastructure/pluginmgr/host_event_service.go new file mode 100644 index 00000000..9a14019e --- /dev/null +++ b/internal/infrastructure/pluginmgr/host_event_service.go @@ -0,0 +1,79 @@ +package pluginmgr + +import ( + "context" + "fmt" + "time" + + "github.com/awf-project/cli/internal/domain/pluginmodel" + "github.com/awf-project/cli/internal/domain/ports" + pluginv1 "github.com/awf-project/cli/proto/plugin/v1" +) + +type manifestLookupFn func(string) (*pluginmodel.PluginInfo, bool) + +type hostEventService struct { + pluginv1.UnimplementedHostEventServiceServer + publisher ports.EventPublisher + lookup manifestLookupFn + logger ports.Logger +} + +func newHostEventService(publisher ports.EventPublisher, lookup manifestLookupFn, logger ports.Logger) *hostEventService { + return &hostEventService{ + publisher: publisher, + lookup: lookup, + logger: logger, + } +} + +func (s *hostEventService) Emit(ctx context.Context, req *pluginv1.EmitRequest) (*pluginv1.EmitResponse, error) { + if !s.validateEmitPermission(req.GetSourcePlugin(), req.GetEventType()) { + return &pluginv1.EmitResponse{ + Success: false, + ErrorMessage: fmt.Sprintf("plugin %q not authorized to emit event type %q", req.GetSourcePlugin(), req.GetEventType()), + }, nil + } + + event := s.emitRequestToDomainEvent(req) + + if err := s.publisher.Publish(ctx, event); err != nil { + s.logger.Warn("failed to publish event", "plugin", req.GetSourcePlugin(), "eventType", req.GetEventType(), "error", err) + return &pluginv1.EmitResponse{Success: false}, nil + } + + return &pluginv1.EmitResponse{ + Success: true, + EventId: event.ID, + }, nil +} + +func (s *hostEventService) validateEmitPermission(pluginName, eventType string) bool { + info, ok := s.lookup(pluginName) + if !ok { + return false + } + if !info.Manifest.HasCapability(pluginmodel.CapabilityEvents) { + return false + } + for _, pattern := range info.Manifest.Events.Emit { + if matchEventPattern(pattern, eventType) { + return true + } + } + return false +} + +func (s *hostEventService) emitRequestToDomainEvent(req *pluginv1.EmitRequest) *pluginmodel.DomainEvent { + event := pluginmodel.NewDomainEvent( + req.GetEventType(), + req.GetSourcePlugin(), + req.GetMetadata(), + req.GetPayload(), + ) + if req.GetTimestampUnixNanos() != 0 { + event.Timestamp = time.Unix(0, req.GetTimestampUnixNanos()) + } + event.PropagationDepth = int(req.GetPropagationDepth()) + return event +} diff --git a/internal/infrastructure/pluginmgr/host_event_service_test.go b/internal/infrastructure/pluginmgr/host_event_service_test.go new file mode 100644 index 00000000..98763357 --- /dev/null +++ b/internal/infrastructure/pluginmgr/host_event_service_test.go @@ -0,0 +1,463 @@ +package pluginmgr + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/awf-project/cli/internal/domain/pluginmodel" + "github.com/awf-project/cli/internal/domain/ports" + pluginv1 "github.com/awf-project/cli/proto/plugin/v1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// mockEmitEventPublisher captures published events and can simulate errors +type mockEmitEventPublisher struct { + publishedEvents []*pluginmodel.DomainEvent + publishError error +} + +func (m *mockEmitEventPublisher) Publish(ctx context.Context, event *pluginmodel.DomainEvent) error { + if m.publishError != nil { + return m.publishError + } + m.publishedEvents = append(m.publishedEvents, event) + return nil +} + +func (m *mockEmitEventPublisher) Close() error { + return nil +} + +// mockEmitLogger discards log messages +type mockEmitLogger struct{} + +func (m *mockEmitLogger) Debug(msg string, fields ...any) {} +func (m *mockEmitLogger) Info(msg string, fields ...any) {} +func (m *mockEmitLogger) Warn(msg string, fields ...any) {} +func (m *mockEmitLogger) Error(msg string, fields ...any) {} +func (m *mockEmitLogger) WithContext(ctx map[string]any) ports.Logger { return m } + +// manifestLookup returns PluginInfo for test plugins +func manifestLookup(name string) (*pluginmodel.PluginInfo, bool) { + switch name { + case "authorized-plugin": + return &pluginmodel.PluginInfo{ + Manifest: &pluginmodel.Manifest{ + Name: "authorized-plugin", + Events: pluginmodel.ManifestEvents{ + Emit: []string{"custom.analysis.*", "custom.export.complete"}, + }, + Capabilities: []string{pluginmodel.CapabilityEvents}, + }, + }, true + + case "no-emit-plugin": + return &pluginmodel.PluginInfo{ + Manifest: &pluginmodel.Manifest{ + Name: "no-emit-plugin", + Events: pluginmodel.ManifestEvents{ + Emit: []string{}, + }, + Capabilities: []string{pluginmodel.CapabilityEvents}, + }, + }, true + + case "no-events-capability": + return &pluginmodel.PluginInfo{ + Manifest: &pluginmodel.Manifest{ + Name: "no-events-capability", + Capabilities: []string{pluginmodel.CapabilityOperations}, + Events: pluginmodel.ManifestEvents{ + Emit: []string{"custom.event"}, + }, + }, + }, true + + default: + return nil, false + } +} + +func TestNewHostEventService_Constructor(t *testing.T) { + publisher := &mockEmitEventPublisher{} + logger := &mockEmitLogger{} + + service := newHostEventService(publisher, manifestLookup, logger) + + assert.NotNil(t, service) +} + +func TestHostEventService_Emit_ValidPattern(t *testing.T) { + publisher := &mockEmitEventPublisher{} + logger := &mockEmitLogger{} + service := newHostEventService(publisher, manifestLookup, logger) + + req := &pluginv1.EmitRequest{ + EventType: "custom.analysis.complete", + SourcePlugin: "authorized-plugin", + Payload: []byte("test payload"), + Metadata: map[string]string{"key": "value"}, + } + + resp, err := service.Emit(context.Background(), req) + + require.NoError(t, err) + require.NotNil(t, resp) + assert.True(t, resp.Success) + assert.NotEmpty(t, resp.EventId) + assert.Empty(t, resp.ErrorMessage) + assert.Len(t, publisher.publishedEvents, 1) + + publishedEvent := publisher.publishedEvents[0] + assert.Equal(t, "custom.analysis.complete", publishedEvent.Type) + assert.Equal(t, "authorized-plugin", publishedEvent.Source) + assert.Equal(t, []byte("test payload"), publishedEvent.Payload) + assert.Equal(t, map[string]string{"key": "value"}, publishedEvent.Metadata) +} + +func TestHostEventService_Emit_UndeclaredEventType(t *testing.T) { + publisher := &mockEmitEventPublisher{} + logger := &mockEmitLogger{} + service := newHostEventService(publisher, manifestLookup, logger) + + req := &pluginv1.EmitRequest{ + EventType: "undeclared.event.type", + SourcePlugin: "authorized-plugin", + } + + resp, err := service.Emit(context.Background(), req) + + require.NoError(t, err) + require.NotNil(t, resp) + assert.False(t, resp.Success) + assert.Contains(t, resp.ErrorMessage, "not authorized") + assert.Empty(t, resp.EventId) + assert.Len(t, publisher.publishedEvents, 0) +} + +func TestHostEventService_Emit_UnknownPlugin(t *testing.T) { + publisher := &mockEmitEventPublisher{} + logger := &mockEmitLogger{} + service := newHostEventService(publisher, manifestLookup, logger) + + req := &pluginv1.EmitRequest{ + EventType: "custom.event", + SourcePlugin: "unknown-plugin", + } + + resp, err := service.Emit(context.Background(), req) + + require.NoError(t, err) + require.NotNil(t, resp) + assert.False(t, resp.Success) + assert.Contains(t, resp.ErrorMessage, "not authorized") + assert.Empty(t, resp.EventId) + assert.Len(t, publisher.publishedEvents, 0) +} + +func TestHostEventService_Emit_NoEventsCapability(t *testing.T) { + publisher := &mockEmitEventPublisher{} + logger := &mockEmitLogger{} + service := newHostEventService(publisher, manifestLookup, logger) + + req := &pluginv1.EmitRequest{ + EventType: "custom.event", + SourcePlugin: "no-events-capability", + } + + resp, err := service.Emit(context.Background(), req) + + require.NoError(t, err) + require.NotNil(t, resp) + assert.False(t, resp.Success) + assert.Contains(t, resp.ErrorMessage, "not authorized") + assert.Empty(t, resp.EventId) + assert.Len(t, publisher.publishedEvents, 0) +} + +func TestHostEventService_Emit_NoEmitPermission(t *testing.T) { + publisher := &mockEmitEventPublisher{} + logger := &mockEmitLogger{} + service := newHostEventService(publisher, manifestLookup, logger) + + req := &pluginv1.EmitRequest{ + EventType: "custom.event", + SourcePlugin: "no-emit-plugin", + } + + resp, err := service.Emit(context.Background(), req) + + require.NoError(t, err) + require.NotNil(t, resp) + assert.False(t, resp.Success) + assert.Contains(t, resp.ErrorMessage, "not authorized") + assert.Empty(t, resp.EventId) + assert.Len(t, publisher.publishedEvents, 0) +} + +func TestHostEventService_Emit_PublisherError(t *testing.T) { + publisher := &mockEmitEventPublisher{ + publishError: errors.New("publisher unavailable"), + } + logger := &mockEmitLogger{} + service := newHostEventService(publisher, manifestLookup, logger) + + req := &pluginv1.EmitRequest{ + EventType: "custom.analysis.complete", + SourcePlugin: "authorized-plugin", + } + + resp, err := service.Emit(context.Background(), req) + + require.NoError(t, err) + require.NotNil(t, resp) + assert.False(t, resp.Success) + assert.Empty(t, resp.EventId) +} + +func TestHostEventService_Emit_GRPCErrorAlwaysNil(t *testing.T) { + tests := []struct { + name string + eventType string + sourcePlugin string + publisherError error + shouldSucceed bool + }{ + { + name: "success case", + eventType: "custom.analysis.complete", + sourcePlugin: "authorized-plugin", + shouldSucceed: true, + }, + { + name: "permission denied", + eventType: "undeclared.event", + sourcePlugin: "authorized-plugin", + shouldSucceed: false, + }, + { + name: "unknown plugin", + eventType: "custom.event", + sourcePlugin: "unknown-plugin", + shouldSucceed: false, + }, + { + name: "publisher error", + eventType: "custom.analysis.complete", + sourcePlugin: "authorized-plugin", + publisherError: errors.New("network error"), + shouldSucceed: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + publisher := &mockEmitEventPublisher{publishError: tt.publisherError} + logger := &mockEmitLogger{} + service := newHostEventService(publisher, manifestLookup, logger) + + req := &pluginv1.EmitRequest{ + EventType: tt.eventType, + SourcePlugin: tt.sourcePlugin, + } + + resp, err := service.Emit(context.Background(), req) + + assert.NoError(t, err, "Emit should always return nil gRPC error") + assert.NotNil(t, resp) + assert.Equal(t, tt.shouldSucceed, resp.Success) + }) + } +} + +func TestHostEventService_EmitRequestToDomainEvent_AllFields(t *testing.T) { + publisher := &mockEmitEventPublisher{} + logger := &mockEmitLogger{} + service := newHostEventService(publisher, manifestLookup, logger) + + metadata := map[string]string{"trace_id": "123", "user": "alice"} + payload := []byte("test payload content") + timestamp := time.Date(2025, 1, 15, 10, 30, 0, 0, time.UTC) + + req := &pluginv1.EmitRequest{ + EventType: "custom.analysis.complete", + Payload: payload, + SourcePlugin: "authorized-plugin", + PropagationDepth: 2, + TimestampUnixNanos: timestamp.UnixNano(), + Metadata: metadata, + } + + resp, err := service.Emit(context.Background(), req) + + require.NoError(t, err) + require.NotNil(t, resp) + require.True(t, resp.Success) + require.Len(t, publisher.publishedEvents, 1) + + event := publisher.publishedEvents[0] + assert.Equal(t, "custom.analysis.complete", event.Type) + assert.Equal(t, payload, event.Payload) + assert.Equal(t, "authorized-plugin", event.Source) + assert.Equal(t, 2, event.PropagationDepth) + assert.Equal(t, timestamp.Unix(), event.Timestamp.Unix()) + assert.Equal(t, metadata, event.Metadata) +} + +func TestHostEventService_EmitRequestToDomainEvent_UseCurrentTimeWhenZero(t *testing.T) { + publisher := &mockEmitEventPublisher{} + logger := &mockEmitLogger{} + service := newHostEventService(publisher, manifestLookup, logger) + + beforeTime := time.Now() + + req := &pluginv1.EmitRequest{ + EventType: "custom.analysis.complete", + SourcePlugin: "authorized-plugin", + TimestampUnixNanos: 0, // Zero timestamp + } + + resp, err := service.Emit(context.Background(), req) + + afterTime := time.Now() + + require.NoError(t, err) + require.NotNil(t, resp) + require.True(t, resp.Success) + require.Len(t, publisher.publishedEvents, 1) + + event := publisher.publishedEvents[0] + assert.True( + t, + event.Timestamp.After(beforeTime.Add(-time.Second)) && event.Timestamp.Before(afterTime.Add(time.Second)), + "Event timestamp should be close to current time", + ) +} + +func TestHostEventService_EmitRequestToDomainEvent_EmptyMetadata(t *testing.T) { + publisher := &mockEmitEventPublisher{} + logger := &mockEmitLogger{} + service := newHostEventService(publisher, manifestLookup, logger) + + req := &pluginv1.EmitRequest{ + EventType: "custom.analysis.complete", + SourcePlugin: "authorized-plugin", + Payload: []byte("test"), + Metadata: map[string]string{}, + } + + resp, err := service.Emit(context.Background(), req) + + require.NoError(t, err) + require.True(t, resp.Success) + require.Len(t, publisher.publishedEvents, 1) + + event := publisher.publishedEvents[0] + assert.NotNil(t, event.Metadata) + assert.Len(t, event.Metadata, 0) +} + +func TestHostEventService_EmitRequestToDomainEvent_NilMetadata(t *testing.T) { + publisher := &mockEmitEventPublisher{} + logger := &mockEmitLogger{} + service := newHostEventService(publisher, manifestLookup, logger) + + req := &pluginv1.EmitRequest{ + EventType: "custom.analysis.complete", + SourcePlugin: "authorized-plugin", + Payload: []byte("test"), + Metadata: nil, + } + + resp, err := service.Emit(context.Background(), req) + + require.NoError(t, err) + require.True(t, resp.Success) + require.Len(t, publisher.publishedEvents, 1) + + event := publisher.publishedEvents[0] + assert.Nil(t, event.Metadata) +} + +func TestHostEventService_Emit_GlobPatternMatching(t *testing.T) { + tests := []struct { + name string + eventType string + shouldMatch bool + }{ + { + name: "exact match", + eventType: "custom.analysis.complete", + shouldMatch: true, + }, + { + name: "wildcard match middle segment", + eventType: "custom.analysis.start", + shouldMatch: true, + }, + { + name: "wildcard match different suffix", + eventType: "custom.analysis.error", + shouldMatch: true, + }, + { + name: "explicit pattern match", + eventType: "custom.export.complete", + shouldMatch: true, + }, + { + name: "pattern mismatch - wrong prefix", + eventType: "internal.analysis.complete", + shouldMatch: false, + }, + { + name: "pattern mismatch - wrong segment count", + eventType: "custom.analysis", + shouldMatch: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + publisher := &mockEmitEventPublisher{} + logger := &mockEmitLogger{} + service := newHostEventService(publisher, manifestLookup, logger) + + req := &pluginv1.EmitRequest{ + EventType: tt.eventType, + SourcePlugin: "authorized-plugin", + } + + resp, err := service.Emit(context.Background(), req) + + require.NoError(t, err) + assert.Equal(t, tt.shouldMatch, resp.Success) + if tt.shouldMatch { + assert.Len(t, publisher.publishedEvents, 1) + } else { + assert.Len(t, publisher.publishedEvents, 0) + } + }) + } +} + +func TestHostEventService_EmitResponse_UUIDNotEmpty(t *testing.T) { + publisher := &mockEmitEventPublisher{} + logger := &mockEmitLogger{} + service := newHostEventService(publisher, manifestLookup, logger) + + req := &pluginv1.EmitRequest{ + EventType: "custom.analysis.complete", + SourcePlugin: "authorized-plugin", + } + + resp, err := service.Emit(context.Background(), req) + + require.NoError(t, err) + require.True(t, resp.Success) + assert.NotEmpty(t, resp.EventId) + assert.Len(t, resp.EventId, 36) // UUID format: 8-4-4-4-12 = 36 chars with hyphens +} diff --git a/internal/infrastructure/pluginmgr/rpc_manager.go b/internal/infrastructure/pluginmgr/rpc_manager.go index 8784cdb5..083ca66a 100644 --- a/internal/infrastructure/pluginmgr/rpc_manager.go +++ b/internal/infrastructure/pluginmgr/rpc_manager.go @@ -83,7 +83,8 @@ type pluginConnection struct { validator pluginv1.ValidatorServiceClient stepType pluginv1.StepTypeServiceClient event pluginv1.EventServiceClient - processCancel context.CancelFunc // cancels the long-lived process context on Shutdown + broker *goplugin.GRPCBroker // per-plugin broker instance for host service wiring + processCancel context.CancelFunc // cancels the long-lived process context on Shutdown } // clientPlugin implements goplugin.GRPCPlugin for the host side. @@ -100,17 +101,19 @@ type grpcClientBundle struct { validator pluginv1.ValidatorServiceClient stepType pluginv1.StepTypeServiceClient event pluginv1.EventServiceClient + broker *goplugin.GRPCBroker } // GRPCClient creates gRPC service clients from the connection established by go-plugin. // Called by go-plugin on the host side when Dispense("awf-plugin") is invoked. -func (p *clientPlugin) GRPCClient(_ context.Context, _ *goplugin.GRPCBroker, conn *grpc.ClientConn) (interface{}, error) { +func (p *clientPlugin) GRPCClient(_ context.Context, broker *goplugin.GRPCBroker, conn *grpc.ClientConn) (interface{}, error) { return &grpcClientBundle{ plugin: pluginv1.NewPluginServiceClient(conn), operation: pluginv1.NewOperationServiceClient(conn), validator: pluginv1.NewValidatorServiceClient(conn), stepType: pluginv1.NewStepTypeServiceClient(conn), event: pluginv1.NewEventServiceClient(conn), + broker: broker, }, nil } @@ -127,15 +130,16 @@ var ( // RPCPluginManager implements PluginManager using HashiCorp go-plugin for RPC. // It manages plugin lifecycle: discovery, loading, initialization, and shutdown. type RPCPluginManager struct { - mu sync.RWMutex - plugins map[string]*pluginmodel.PluginInfo // plugin name -> info - connections map[string]*pluginConnection // active connections, protected by mu (NFR-004) - loader *FileSystemLoader // for plugin discovery - pluginsDirs []string // directories to discover plugins from - hostVersion string // current AWF version for plugin compatibility checks - eventBus *EventBus // optional; nil means no event wiring - stateStore *JSONPluginStateStore // optional; nil means no checksum verification - zapLogger *zap.Logger // optional; nil falls back to zap.NewNop() + mu sync.RWMutex + plugins map[string]*pluginmodel.PluginInfo // plugin name -> info + connections map[string]*pluginConnection // active connections, protected by mu (NFR-004) + loader *FileSystemLoader // for plugin discovery + pluginsDirs []string // directories to discover plugins from + hostVersion string // current AWF version for plugin compatibility checks + eventBus *EventBus // optional; nil means no event wiring + streamManager *StreamManager // optional; nil falls back to unary event delivery + stateStore *JSONPluginStateStore // optional; nil means no checksum verification + zapLogger *zap.Logger // optional; nil falls back to zap.NewNop() } // NewRPCPluginManager creates a new RPCPluginManager. @@ -192,6 +196,7 @@ func (m *RPCPluginManager) connectWithTimeout(ctx context.Context, client *goplu conn.validator = bundle.validator conn.stepType = bundle.stepType conn.event = bundle.event + conn.broker = bundle.broker } return conn, nil @@ -384,6 +389,7 @@ func (m *RPCPluginManager) Init(ctx context.Context, name string, config map[str defer m.mu.Unlock() m.connections[name] = conn + m.startBrokerHostService(conn) if pluginInfo, found := m.plugins[name]; found { pluginInfo.Status = pluginmodel.StatusRunning pluginInfo.Operations = m.queryOperationNames(ctx, name, conn) @@ -645,6 +651,9 @@ func (m *RPCPluginManager) Shutdown(ctx context.Context, name string) error { if conn.plugin != nil { conn.plugin.Shutdown(ctx, &pluginv1.ShutdownRequest{}) //nolint:gosec,errcheck // Best effort shutdown, don't fail if RPC fails } + if m.streamManager != nil { + m.streamManager.UnregisterStream(name) + } if conn.client != nil { conn.client.Kill() } @@ -958,6 +967,42 @@ func (m *RPCPluginManager) SetEventBus(bus *EventBus) { m.eventBus = bus } +// SetStreamManager injects a StreamManager for stream-based event delivery. +// Must be called before any Init() calls to enable stream routing. +func (m *RPCPluginManager) SetStreamManager(sm *StreamManager) { + m.mu.Lock() + defer m.mu.Unlock() + m.streamManager = sm +} + +// manifestLookup returns a thread-safe function for looking up plugin manifests. +func (m *RPCPluginManager) manifestLookup() manifestLookupFn { + return func(name string) (*pluginmodel.PluginInfo, bool) { + m.mu.RLock() + defer m.mu.RUnlock() + info, ok := m.plugins[name] + return info, ok + } +} + +// startBrokerHostService serves HostEventService on the plugin's broker. +// No-op when conn.broker is nil or m.eventBus is nil. +func (m *RPCPluginManager) startBrokerHostService(conn *pluginConnection) { + if conn.broker == nil || m.eventBus == nil { + return + } + zapLog := m.zapLogger + if zapLog == nil { + zapLog = zap.NewNop() + } + svc := newHostEventService(m.eventBus, m.manifestLookup(), zapToPortsLogger{zapLog}) + go conn.broker.AcceptAndServe(sdk.HostEventServiceID, func(opts []grpc.ServerOption) *grpc.Server { + s := grpc.NewServer(opts...) + pluginv1.RegisterHostEventServiceServer(s, svc) + return s + }) +} + // wireEventSubscriptions registers the plugin's event subscriptions if the manifest declares the events capability. func (m *RPCPluginManager) wireEventSubscriptions(pluginName string, conn *pluginConnection, info *pluginmodel.PluginInfo) { if m.eventBus == nil || conn.event == nil || info.Manifest == nil { @@ -968,8 +1013,12 @@ func (m *RPCPluginManager) wireEventSubscriptions(pluginName string, conn *plugi return } - adapter := newGRPCEventAdapter(conn.event, pluginName) - m.eventBus.Subscribe(pluginName, info.Manifest.Events.Subscribe, adapter) + unaryAdapter := newGRPCEventAdapter(conn.event, pluginName) + var deliverer EventDeliverer = unaryAdapter + if m.streamManager != nil { + deliverer = m.streamManager.GetDeliverer(pluginName, unaryAdapter) + } + m.eventBus.Subscribe(pluginName, info.Manifest.Events.Subscribe, deliverer) } // SetStateStore injects a JSONPluginStateStore for launch-time checksum verification. @@ -1035,6 +1084,29 @@ func (m *RPCPluginManager) verifyChecksum(pluginName, binaryPath string) ([]byte return decoded, nil } +// zapToPortsLogger bridges *zap.Logger to ports.Logger for use in hostEventService. +type zapToPortsLogger struct { + logger *zap.Logger +} + +func (z zapToPortsLogger) Debug(msg string, fields ...any) { z.logger.Sugar().Debugw(msg, fields...) } + +func (z zapToPortsLogger) Info(msg string, fields ...any) { z.logger.Sugar().Infow(msg, fields...) } + +func (z zapToPortsLogger) Warn(msg string, fields ...any) { z.logger.Sugar().Warnw(msg, fields...) } + +func (z zapToPortsLogger) Error(msg string, fields ...any) { z.logger.Sugar().Errorw(msg, fields...) } + +func (z zapToPortsLogger) WithContext(ctx map[string]any) ports.Logger { + fields := make([]zap.Field, 0, len(ctx)) + for k, v := range ctx { + fields = append(fields, zap.Any(k, v)) + } + return zapToPortsLogger{logger: z.logger.With(fields...)} +} + +var _ ports.Logger = zapToPortsLogger{} + // splitOperationName splits "pluginName.opName" into (pluginName, opName). // Returns ("", name) if no prefix is found. func splitOperationName(name string) (pluginName, opName string) { diff --git a/internal/infrastructure/pluginmgr/rpc_manager_test.go b/internal/infrastructure/pluginmgr/rpc_manager_test.go index f43092f5..f28c247a 100644 --- a/internal/infrastructure/pluginmgr/rpc_manager_test.go +++ b/internal/infrastructure/pluginmgr/rpc_manager_test.go @@ -14,6 +14,7 @@ import ( "testing" "time" + goplugin "github.com/hashicorp/go-plugin" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "google.golang.org/grpc" @@ -2690,3 +2691,188 @@ func TestRPCPluginManager_Init_ChecksumMismatch_FailsFast(t *testing.T) { t.Error("Init() stored a connection despite checksum mismatch — not failing fast") } } + +// --- T006: GRPCBroker activation, StreamManager wiring, HostEventService --- + +func TestBrokerActivation_PluginConnectionStoresBroker(t *testing.T) { + parser := NewManifestParser() + loader := NewFileSystemLoader(parser) + manager := NewRPCPluginManager(loader) + manager.SetPluginsDir(fixturesPath) + ctx := context.Background() + + if err := manager.Load(ctx, "valid-simple"); err != nil { + t.Fatalf("Load() error = %v", err) + } + + if err := manager.Init(ctx, "valid-simple", nil); err != nil { + t.Fatalf("Init() error = %v", err) + } + + // Verify broker field is populated in stored connection + manager.mu.RLock() + conn, found := manager.connections["valid-simple"] + manager.mu.RUnlock() + + if !found { + t.Fatal("Init() did not store connection in connections map") + } + if conn == nil { + t.Fatal("Init() stored nil connection") + } + if conn.broker == nil { + t.Error("pluginConnection.broker is nil after Init — broker was not extracted from grpcClientBundle") + } +} + +func TestSetStreamManager_InjectsStreamManager(t *testing.T) { + manager := NewRPCPluginManager(nil) + sm := &StreamManager{} + + manager.SetStreamManager(sm) + + // Verify streamManager field is set and is the exact instance provided + manager.mu.RLock() + got := manager.streamManager + manager.mu.RUnlock() + + if got != sm { + t.Error("SetStreamManager() did not assign the streamManager field correctly") + } + if got == nil { + t.Fatal("SetStreamManager() left streamManager as nil") + } +} + +func TestWireEventSubscriptions_UsesStreamManagerWhenAvailable(t *testing.T) { + manager := NewRPCPluginManager(nil) + + // Create and inject a StreamManager + logger := &mockLogger{} + sm := NewStreamManager(logger) + manager.SetStreamManager(sm) + + // Create a real EventBus for the test + bus := NewEventBus(logger) + manager.SetEventBus(bus) + + // Create plugin info with events capability + info := &pluginmodel.PluginInfo{ + Manifest: &pluginmodel.Manifest{ + Name: "test-plugin", + Capabilities: []string{pluginmodel.CapabilityEvents}, + Events: pluginmodel.ManifestEvents{ + Subscribe: []string{"workflow.started"}, + }, + }, + } + + // Create a connection with nil event client (will cause early return) + conn := &pluginConnection{ + event: nil, + } + + // Call wireEventSubscriptions with streamManager set + // Should use StreamManager.GetDeliverer when available + manager.wireEventSubscriptions("test-plugin", conn, info) + + // Verify that wireEventSubscriptions completes without panic + // Connection's event client is nil, so subscription won't happen + // But the method should complete without error +} + +func TestWireEventSubscriptions_FallsBackToGRPCAdapterWithoutStreamManager(t *testing.T) { + manager := NewRPCPluginManager(nil) + + // Do NOT inject StreamManager — test fallback path + logger := &mockLogger{} + bus := NewEventBus(logger) + manager.SetEventBus(bus) + + // Create plugin info with events capability + info := &pluginmodel.PluginInfo{ + Manifest: &pluginmodel.Manifest{ + Name: "test-plugin", + Capabilities: []string{pluginmodel.CapabilityEvents}, + Events: pluginmodel.ManifestEvents{ + Subscribe: []string{"workflow.started"}, + }, + }, + } + + // Create a connection with nil event client + conn := &pluginConnection{ + event: nil, + } + + // Call wireEventSubscriptions without streamManager set + // Should fall back to plain grpcEventAdapter + manager.wireEventSubscriptions("test-plugin", conn, info) + + // Verify that wireEventSubscriptions completes without panic + // when StreamManager is not available +} + +func TestStartBrokerHostService_NoOpWhenBrokerNil(t *testing.T) { + manager := NewRPCPluginManager(nil) + logger := &mockLogger{} + manager.SetEventBus(NewEventBus(logger)) + + // Create connection with nil broker + conn := &pluginConnection{ + broker: nil, // Explicitly nil + } + + // Should be no-op when broker is nil + manager.startBrokerHostService(conn) + // If this doesn't panic, the no-op logic works +} + +func TestStartBrokerHostService_NoOpWhenEventBusNil(t *testing.T) { + manager := NewRPCPluginManager(nil) + // Do NOT set EventBus — leave it nil + + // Create connection with a non-nil broker field + // (won't actually work with real broker, but tests the nil check) + conn := &pluginConnection{ + broker: (*goplugin.GRPCBroker)(nil), // Explicit nil type + } + + // Should be no-op when eventBus is nil + manager.startBrokerHostService(conn) + // If this doesn't panic, the no-op logic works +} + +func TestManifestLookup_ReturnsThreadSafeFunction(t *testing.T) { + manager := NewRPCPluginManager(nil) + + // Populate plugins map + manager.mu.Lock() + manager.plugins["test-plugin"] = &pluginmodel.PluginInfo{ + Manifest: &pluginmodel.Manifest{ + Name: "test-plugin", + }, + } + manager.mu.Unlock() + + // Get the lookup function + lookup := manager.manifestLookup() + + // Verify it returns the correct plugin + info, found := lookup("test-plugin") + if !found { + t.Fatal("manifestLookup() function returned false for existing plugin") + } + if info == nil { + t.Fatal("manifestLookup() function returned nil info for existing plugin") + } + if info.Manifest.Name != "test-plugin" { + t.Errorf("manifestLookup() returned wrong plugin: got %q, want test-plugin", info.Manifest.Name) + } + + // Verify it returns false for non-existent plugin + _, found = lookup("non-existent") + if found { + t.Error("manifestLookup() function returned true for non-existent plugin") + } +} diff --git a/internal/infrastructure/pluginmgr/stream_manager.go b/internal/infrastructure/pluginmgr/stream_manager.go new file mode 100644 index 00000000..c2db15ba --- /dev/null +++ b/internal/infrastructure/pluginmgr/stream_manager.go @@ -0,0 +1,130 @@ +package pluginmgr + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/awf-project/cli/internal/domain/pluginmodel" + "github.com/awf-project/cli/internal/domain/ports" + pluginv1 "github.com/awf-project/cli/proto/plugin/v1" +) + +const defaultStreamTimeout = 5 * time.Second + +// EventStreamSender is the client-side streaming interface for sending events to a plugin. +type EventStreamSender interface { + Send(msg *pluginv1.EventStreamMessage) error +} + +type streamEntry struct { + stream EventStreamSender + seqNum atomic.Uint64 +} + +// StreamManager tracks per-plugin client-side gRPC streams and provides EventDeliverer instances. +type StreamManager struct { + mu sync.RWMutex + streams map[string]*streamEntry + logger ports.Logger + timeout time.Duration +} + +var _ EventDeliverer = (*streamDeliverer)(nil) + +// NewStreamManager creates a StreamManager with the default send timeout. +func NewStreamManager(logger ports.Logger) *StreamManager { + return &StreamManager{ + streams: make(map[string]*streamEntry), + logger: logger, + timeout: defaultStreamTimeout, + } +} + +// RegisterStream stores a stream connection for pluginName. +func (m *StreamManager) RegisterStream(pluginName string, stream EventStreamSender) { + m.mu.Lock() + defer m.mu.Unlock() + m.streams[pluginName] = &streamEntry{stream: stream} +} + +// UnregisterStream removes the stream for pluginName. +func (m *StreamManager) UnregisterStream(pluginName string) { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.streams, pluginName) +} + +// HasStream reports whether a stream is registered for pluginName. +func (m *StreamManager) HasStream(pluginName string) bool { + m.mu.RLock() + defer m.mu.RUnlock() + _, ok := m.streams[pluginName] + return ok +} + +// GetDeliverer returns a streamDeliverer when a stream is registered, otherwise unaryFallback. +func (m *StreamManager) GetDeliverer(pluginName string, unaryFallback EventDeliverer) EventDeliverer { + m.mu.RLock() + entry, ok := m.streams[pluginName] + m.mu.RUnlock() + if !ok { + return unaryFallback + } + return &streamDeliverer{ + manager: m, + pluginName: pluginName, + entry: entry, + fallback: unaryFallback, + } +} + +// Close removes all registered streams. +func (m *StreamManager) Close() { + m.mu.Lock() + defer m.mu.Unlock() + m.streams = make(map[string]*streamEntry) +} + +type streamDeliverer struct { + manager *StreamManager + pluginName string + entry *streamEntry + fallback EventDeliverer +} + +func (d *streamDeliverer) DeliverEvent(ctx context.Context, event *pluginmodel.DomainEvent) ([]*pluginmodel.DomainEvent, error) { + seqNum := d.entry.seqNum.Add(1) + msg := domainEventToStreamMessage(event, seqNum) + + err := sendWithTimeout(ctx, d.manager.timeout, func() error { + return d.entry.stream.Send(msg) + }) + if err != nil { + d.manager.logger.Warn("stream send failed, falling back to unary", "plugin", d.pluginName, "error", err) + d.manager.UnregisterStream(d.pluginName) + return d.fallback.DeliverEvent(ctx, event) + } + return []*pluginmodel.DomainEvent{}, nil +} + +func sendWithTimeout(ctx context.Context, timeout time.Duration, send func() error) error { + resultCh := make(chan error, 1) + go func() { + resultCh <- send() + }() + + timer := time.NewTimer(timeout) + defer timer.Stop() + + select { + case err := <-resultCh: + return err + case <-ctx.Done(): + return fmt.Errorf("stream send cancelled: %w", ctx.Err()) + case <-timer.C: + return fmt.Errorf("stream send timeout after %s", timeout) + } +} diff --git a/internal/infrastructure/pluginmgr/stream_manager_test.go b/internal/infrastructure/pluginmgr/stream_manager_test.go new file mode 100644 index 00000000..702bb6a2 --- /dev/null +++ b/internal/infrastructure/pluginmgr/stream_manager_test.go @@ -0,0 +1,427 @@ +package pluginmgr + +import ( + "context" + "errors" + "runtime" + "sync" + "testing" + "time" + + "github.com/awf-project/cli/internal/domain/pluginmodel" + "github.com/awf-project/cli/internal/domain/ports" + pluginv1 "github.com/awf-project/cli/proto/plugin/v1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// blockingMockStreamEventsClient is a mock whose Send blocks until done is closed. +type blockingMockStreamEventsClient struct { + done chan struct{} +} + +func (m *blockingMockStreamEventsClient) Send(_ *pluginv1.EventStreamMessage) error { + <-m.done + return nil +} + +// mockStreamEventsClient implements EventStreamSender for unit testing. +type mockStreamEventsClient struct { + mu sync.Mutex + messages []*pluginv1.EventStreamMessage + sendErr error + callCount int + shouldReturnNil bool +} + +func (m *mockStreamEventsClient) Send(msg *pluginv1.EventStreamMessage) error { + m.mu.Lock() + m.messages = append(m.messages, msg) + m.callCount++ + m.mu.Unlock() + + if m.sendErr != nil { + return m.sendErr + } + return nil +} + +// getLastMessage pops the most recently sent message. This dequeue semantics ensures +// that concurrent goroutines each retrieve a distinct message when verifying unique sequence numbers. +func (m *mockStreamEventsClient) getLastMessage() *pluginv1.EventStreamMessage { + m.mu.Lock() + defer m.mu.Unlock() + if len(m.messages) == 0 { + return nil + } + msg := m.messages[len(m.messages)-1] + m.messages = m.messages[:len(m.messages)-1] + return msg +} + +func (m *mockStreamEventsClient) getCallCount() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.callCount +} + +// streamTestDeliverer implements EventDeliverer for testing fallback behavior. +type streamTestDeliverer struct { + mu sync.Mutex + callCount int + lastEvent *pluginmodel.DomainEvent + response []*pluginmodel.DomainEvent + err error +} + +func (m *streamTestDeliverer) DeliverEvent(ctx context.Context, event *pluginmodel.DomainEvent) ([]*pluginmodel.DomainEvent, error) { + m.mu.Lock() + defer m.mu.Unlock() + m.callCount++ + m.lastEvent = event + return m.response, m.err +} + +func (m *streamTestDeliverer) getCallCount() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.callCount +} + +func (m *streamTestDeliverer) getLastEvent() *pluginmodel.DomainEvent { + m.mu.Lock() + defer m.mu.Unlock() + return m.lastEvent +} + +type noopLogger struct{} + +func (n *noopLogger) Debug(msg string, fields ...any) {} +func (n *noopLogger) Info(msg string, fields ...any) {} +func (n *noopLogger) Warn(msg string, fields ...any) {} +func (n *noopLogger) Error(msg string, fields ...any) {} +func (n *noopLogger) WithContext(ctx map[string]any) ports.Logger { return n } + +var _ ports.Logger = (*noopLogger)(nil) + +func TestNewStreamManager(t *testing.T) { + logger := &noopLogger{} + sm := NewStreamManager(logger) + assert.NotNil(t, sm) +} + +func TestRegisterStream(t *testing.T) { + sm := NewStreamManager(&noopLogger{}) + client := &mockStreamEventsClient{} + pluginName := "test-plugin" + + sm.RegisterStream(pluginName, client) + + assert.True(t, sm.HasStream(pluginName)) +} + +func TestUnregisterStream(t *testing.T) { + sm := NewStreamManager(&noopLogger{}) + client := &mockStreamEventsClient{} + pluginName := "test-plugin" + + sm.RegisterStream(pluginName, client) + assert.True(t, sm.HasStream(pluginName)) + + sm.UnregisterStream(pluginName) + assert.False(t, sm.HasStream(pluginName)) +} + +func TestHasStream(t *testing.T) { + sm := NewStreamManager(&noopLogger{}) + pluginName := "test-plugin" + + assert.False(t, sm.HasStream(pluginName)) + + sm.RegisterStream(pluginName, &mockStreamEventsClient{}) + assert.True(t, sm.HasStream(pluginName)) + + sm.UnregisterStream(pluginName) + assert.False(t, sm.HasStream(pluginName)) +} + +func TestGetDeliverer_NoStream(t *testing.T) { + sm := NewStreamManager(&noopLogger{}) + fallback := &streamTestDeliverer{} + + deliverer := sm.GetDeliverer("nonexistent", fallback) + + assert.Equal(t, fallback, deliverer) +} + +func TestGetDeliverer_WithStream(t *testing.T) { + sm := NewStreamManager(&noopLogger{}) + client := &mockStreamEventsClient{} + pluginName := "test-plugin" + + sm.RegisterStream(pluginName, client) + fallback := &streamTestDeliverer{} + + deliverer := sm.GetDeliverer(pluginName, fallback) + + assert.NotNil(t, deliverer) + assert.NotEqual(t, fallback, deliverer) +} + +func TestStreamDeliverer_DeliverEvent_Success(t *testing.T) { + sm := NewStreamManager(&noopLogger{}) + client := &mockStreamEventsClient{} + pluginName := "test-plugin" + + sm.RegisterStream(pluginName, client) + fallback := &streamTestDeliverer{} + deliverer := sm.GetDeliverer(pluginName, fallback) + + ctx := context.Background() + event := &pluginmodel.DomainEvent{ + ID: "evt-123", + Type: "test.event", + Timestamp: time.Unix(1700000000, 123456789), + Source: "test-source", + Metadata: map[string]string{"key": "value"}, + Payload: []byte("test-payload"), + PropagationDepth: 1, + } + + result, err := deliverer.DeliverEvent(ctx, event) + + require.NoError(t, err) + assert.NotNil(t, result) + assert.Equal(t, 1, client.getCallCount()) +} + +func TestStreamDeliverer_DeliverEvent_FieldMapping(t *testing.T) { + sm := NewStreamManager(&noopLogger{}) + client := &mockStreamEventsClient{} + pluginName := "test-plugin" + + sm.RegisterStream(pluginName, client) + fallback := &streamTestDeliverer{} + deliverer := sm.GetDeliverer(pluginName, fallback) + + ctx := context.Background() + ts := time.Unix(1700000000, 123456789) + event := &pluginmodel.DomainEvent{ + ID: "evt-123", + Type: "test.event", + Timestamp: ts, + Source: "test-source", + Metadata: map[string]string{"key": "value"}, + Payload: []byte("test-payload"), + PropagationDepth: 1, + } + + _, _ = deliverer.DeliverEvent(ctx, event) + + msg := client.getLastMessage() + assert.NotNil(t, msg) + assert.Equal(t, "evt-123", msg.Id) + assert.Equal(t, "test.event", msg.Type) + assert.Equal(t, ts.UnixNano(), msg.TimestampUnixNanos) + assert.Equal(t, "test-source", msg.Source) + assert.Equal(t, map[string]string{"key": "value"}, msg.Metadata) + assert.Equal(t, []byte("test-payload"), msg.Payload) + assert.Equal(t, int32(1), msg.PropagationDepth) + assert.Equal(t, uint64(1), msg.SequenceNumber) +} + +func TestStreamDeliverer_DeliverEvent_SequenceNumbers(t *testing.T) { + sm := NewStreamManager(&noopLogger{}) + client := &mockStreamEventsClient{} + pluginName := "test-plugin" + + sm.RegisterStream(pluginName, client) + fallback := &streamTestDeliverer{} + deliverer := sm.GetDeliverer(pluginName, fallback) + + ctx := context.Background() + event := &pluginmodel.DomainEvent{ + ID: "evt-seq", + Type: "test.event", + Timestamp: time.Now(), + } + + // Send three events and verify sequence numbers increment atomically + _, _ = deliverer.DeliverEvent(ctx, event) + msg1 := client.getLastMessage() + + _, _ = deliverer.DeliverEvent(ctx, event) + msg2 := client.getLastMessage() + + _, _ = deliverer.DeliverEvent(ctx, event) + msg3 := client.getLastMessage() + + assert.Equal(t, uint64(1), msg1.SequenceNumber) + assert.Equal(t, uint64(2), msg2.SequenceNumber) + assert.Equal(t, uint64(3), msg3.SequenceNumber) +} + +func TestStreamDeliverer_DeliverEvent_FallbackOnSendError(t *testing.T) { + sm := NewStreamManager(&noopLogger{}) + client := &mockStreamEventsClient{ + sendErr: errors.New("send failed"), + } + pluginName := "test-plugin" + + sm.RegisterStream(pluginName, client) + fallback := &streamTestDeliverer{} + deliverer := sm.GetDeliverer(pluginName, fallback) + + ctx := context.Background() + event := &pluginmodel.DomainEvent{ + ID: "evt-123", + Type: "test.event", + Timestamp: time.Now(), + } + + _, err := deliverer.DeliverEvent(ctx, event) + + assert.NoError(t, err) + assert.Equal(t, 1, fallback.getCallCount()) + assert.False(t, sm.HasStream(pluginName)) +} + +func TestStreamDeliverer_DeliverEvent_FallbackOnUnimplemented(t *testing.T) { + sm := NewStreamManager(&noopLogger{}) + client := &mockStreamEventsClient{ + sendErr: status.Error(codes.Unimplemented, "method not implemented"), + } + pluginName := "test-plugin" + + sm.RegisterStream(pluginName, client) + fallback := &streamTestDeliverer{} + deliverer := sm.GetDeliverer(pluginName, fallback) + + ctx := context.Background() + event := &pluginmodel.DomainEvent{ + ID: "evt-123", + Type: "test.event", + Timestamp: time.Now(), + } + + _, err := deliverer.DeliverEvent(ctx, event) + + assert.NoError(t, err) + assert.Equal(t, 1, fallback.getCallCount()) + assert.False(t, sm.HasStream(pluginName)) +} + +func TestClose(t *testing.T) { + sm := NewStreamManager(&noopLogger{}) + + client1 := &mockStreamEventsClient{} + client2 := &mockStreamEventsClient{} + + sm.RegisterStream("plugin-1", client1) + sm.RegisterStream("plugin-2", client2) + + assert.True(t, sm.HasStream("plugin-1")) + assert.True(t, sm.HasStream("plugin-2")) + + sm.Close() + + assert.False(t, sm.HasStream("plugin-1")) + assert.False(t, sm.HasStream("plugin-2")) +} + +func TestClose_GoroutineCleanup(t *testing.T) { + sm := NewStreamManager(&noopLogger{}) + + for i := 0; i < 10; i++ { + client := &mockStreamEventsClient{} + sm.RegisterStream(("plugin-" + string(rune(i))), client) + } + + baselineGoroutines := runtime.NumGoroutine() + + sm.Close() + + time.Sleep(100 * time.Millisecond) + + finalGoroutines := runtime.NumGoroutine() + delta := finalGoroutines - baselineGoroutines + + assert.True(t, delta <= 2, "goroutine delta %d exceeds tolerance of 2", delta) +} + +func TestStreamDeliverer_DeliverEvent_FallbackOnSendTimeout(t *testing.T) { + sm := NewStreamManager(&noopLogger{}) + done := make(chan struct{}) + defer close(done) + client := &blockingMockStreamEventsClient{done: done} + pluginName := "test-plugin" + + sm.RegisterStream(pluginName, client) + fallback := &streamTestDeliverer{} + deliverer := sm.GetDeliverer(pluginName, fallback) + + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + event := &pluginmodel.DomainEvent{ + ID: "evt-timeout", + Type: "test.event", + Timestamp: time.Now(), + } + + _, err := deliverer.DeliverEvent(ctx, event) + + assert.NoError(t, err) + assert.Equal(t, 1, fallback.getCallCount()) + assert.False(t, sm.HasStream(pluginName)) +} + +func TestStreamDeliverer_ConcurrentSends(t *testing.T) { + sm := NewStreamManager(&noopLogger{}) + client := &mockStreamEventsClient{} + pluginName := "test-plugin" + + sm.RegisterStream(pluginName, client) + fallback := &streamTestDeliverer{} + deliverer := sm.GetDeliverer(pluginName, fallback) + + ctx := context.Background() + event := &pluginmodel.DomainEvent{ + ID: "evt-concurrent", + Type: "test.event", + Timestamp: time.Now(), + } + + var wg sync.WaitGroup + var seqNums []uint64 + var seqMu sync.Mutex + + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _, _ = deliverer.DeliverEvent(ctx, event) + + msg := client.getLastMessage() + if msg != nil { + seqMu.Lock() + seqNums = append(seqNums, msg.SequenceNumber) + seqMu.Unlock() + } + }() + } + + wg.Wait() + + seqMu.Lock() + defer seqMu.Unlock() + + assert.Len(t, seqNums, 10) + + seen := make(map[uint64]bool) + for _, seq := range seqNums { + assert.False(t, seen[seq], "duplicate sequence number: %d", seq) + seen[seq] = true + } +} diff --git a/pkg/plugin/sdk/event.go b/pkg/plugin/sdk/event.go index 7f865fad..57c945f2 100644 --- a/pkg/plugin/sdk/event.go +++ b/pkg/plugin/sdk/event.go @@ -2,9 +2,13 @@ package sdk import ( "context" + "errors" + "io" "time" pluginv1 "github.com/awf-project/cli/proto/plugin/v1" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // Event is the SDK-side representation of a domain event, decoupled from internal domain types. @@ -67,3 +71,32 @@ func (s *eventServiceServer) HandleEvent(ctx context.Context, req *pluginv1.Hand return &pluginv1.HandleEventResponse{EmittedEvents: protoEmitted}, nil } + +func (s *eventServiceServer) StreamEvents(stream pluginv1.EventService_StreamEventsServer) error { //nolint:wrapcheck // gRPC stream errors must not be wrapped; wrapping loses status codes + subscriber, ok := s.impl.(EventSubscriber) + if !ok { + return status.Error(codes.Unimplemented, "plugin does not implement EventSubscriber") //nolint:wrapcheck // gRPC status errors must remain unwrapped + } + + for { + msg, err := stream.Recv() + if errors.Is(err, io.EOF) { + return stream.SendAndClose(&pluginv1.StreamEventsResponse{}) //nolint:wrapcheck // gRPC stream errors carry status codes; wrapping loses them + } + if err != nil { + return err //nolint:wrapcheck // gRPC stream errors carry status codes; wrapping loses them + } + + event := Event{ + ID: msg.GetId(), + Type: msg.GetType(), + Timestamp: time.Unix(0, msg.GetTimestampUnixNanos()), + Source: msg.GetSource(), + Metadata: msg.GetMetadata(), + Payload: msg.GetPayload(), + PropagationDepth: int(msg.GetPropagationDepth()), + } + //nolint:errcheck,gosec // G104: emitted events are fire-and-forget; host drives stream lifecycle + _, _ = subscriber.HandleEvent(stream.Context(), event) + } +} diff --git a/pkg/plugin/sdk/event_test.go b/pkg/plugin/sdk/event_test.go index 1d39ec03..17d4fafe 100644 --- a/pkg/plugin/sdk/event_test.go +++ b/pkg/plugin/sdk/event_test.go @@ -2,6 +2,8 @@ package sdk import ( "context" + "errors" + "io" "testing" "time" @@ -9,6 +11,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" ) // nonSubscriberPlugin implements Plugin but NOT EventSubscriber. @@ -191,6 +196,129 @@ func TestEventServiceServer_HandleEvent_ConvertsEmittedEventsToProto(t *testing. assert.Equal(t, int32(1), got.PropagationDepth) } +// mockStreamEventsServer implements EventService_StreamEventsServer for testing. +type mockStreamEventsServer struct { + messages []*pluginv1.EventStreamMessage + pos int + recvErr error + closed bool + closeErr error + ctx context.Context +} + +func (m *mockStreamEventsServer) Recv() (*pluginv1.EventStreamMessage, error) { + if m.pos < len(m.messages) { + msg := m.messages[m.pos] + m.pos++ + return msg, nil + } + if m.recvErr != nil { + return nil, m.recvErr + } + return nil, io.EOF +} + +func (m *mockStreamEventsServer) SendAndClose(_ *pluginv1.StreamEventsResponse) error { + m.closed = true + return m.closeErr +} + +func (m *mockStreamEventsServer) SetHeader(metadata.MD) error { return nil } +func (m *mockStreamEventsServer) SendHeader(metadata.MD) error { return nil } +func (m *mockStreamEventsServer) SetTrailer(metadata.MD) {} +func (m *mockStreamEventsServer) Context() context.Context { + if m.ctx != nil { + return m.ctx + } + return context.Background() +} +func (m *mockStreamEventsServer) SendMsg(any) error { return nil } +func (m *mockStreamEventsServer) RecvMsg(any) error { return nil } + +func TestStreamEvents_ReturnsUnimplementedWhenNotSubscriber(t *testing.T) { + p := &nonSubscriberPlugin{} + server := &eventServiceServer{impl: p} + stream := &mockStreamEventsServer{} + + err := server.StreamEvents(stream) + + require.Error(t, err) + st, ok := status.FromError(err) + require.True(t, ok) + assert.Equal(t, codes.Unimplemented, st.Code()) +} + +func TestStreamEvents_SendsAndClosesOnEOF(t *testing.T) { + p := &capturingSubscriberPlugin{} + server := &eventServiceServer{impl: p} + stream := &mockStreamEventsServer{messages: []*pluginv1.EventStreamMessage{}} + + err := server.StreamEvents(stream) + + require.NoError(t, err) + assert.True(t, stream.closed, "SendAndClose must be called on EOF") +} + +func TestStreamEvents_DispatchesEachEventToSubscriber(t *testing.T) { + p := &capturingSubscriberPlugin{} + server := &eventServiceServer{impl: p} + stream := &mockStreamEventsServer{ + messages: []*pluginv1.EventStreamMessage{ + {Id: "evt-1", Type: "workflow.started"}, + {Id: "evt-2", Type: "step.completed"}, + }, + } + + err := server.StreamEvents(stream) + + require.NoError(t, err) + assert.True(t, p.handleCalled, "HandleEvent must be called for each received message") + assert.Equal(t, "evt-2", p.lastEvent.ID, "last dispatched event must be the second message") +} + +func TestStreamEvents_ConvertsMessageFieldsToEvent(t *testing.T) { + nanos := int64(1_700_000_000_000_000_000) + p := &capturingSubscriberPlugin{} + server := &eventServiceServer{impl: p} + stream := &mockStreamEventsServer{ + messages: []*pluginv1.EventStreamMessage{ + { + Id: "evt-xyz", + Type: "step.completed", + TimestampUnixNanos: nanos, + Source: "plugin-a", + Metadata: map[string]string{"run_id": "abc"}, + Payload: []byte(`{"ok":true}`), + PropagationDepth: 2, + }, + }, + } + + err := server.StreamEvents(stream) + + require.NoError(t, err) + require.True(t, p.handleCalled) + assert.Equal(t, "evt-xyz", p.lastEvent.ID) + assert.Equal(t, "step.completed", p.lastEvent.Type) + assert.Equal(t, time.Unix(0, nanos), p.lastEvent.Timestamp) + assert.Equal(t, "plugin-a", p.lastEvent.Source) + assert.Equal(t, map[string]string{"run_id": "abc"}, p.lastEvent.Metadata) + assert.Equal(t, []byte(`{"ok":true}`), p.lastEvent.Payload) + assert.Equal(t, 2, p.lastEvent.PropagationDepth) +} + +func TestStreamEvents_PropagatesNonEOFRecvError(t *testing.T) { + p := &capturingSubscriberPlugin{} + server := &eventServiceServer{impl: p} + recvErr := errors.New("transport error") + stream := &mockStreamEventsServer{recvErr: recvErr} + + err := server.StreamEvents(stream) + + assert.ErrorIs(t, err, recvErr) + assert.False(t, stream.closed, "SendAndClose must not be called on transport error") +} + func TestGRPCServer_RegistersEventService(t *testing.T) { p := &testPlugin{BasePlugin{PluginName: "test", PluginVersion: "1.0.0"}} bridge := &GRPCPluginBridge{impl: p} diff --git a/pkg/plugin/sdk/grpc_plugin.go b/pkg/plugin/sdk/grpc_plugin.go index 5ebd548f..bc0e4239 100644 --- a/pkg/plugin/sdk/grpc_plugin.go +++ b/pkg/plugin/sdk/grpc_plugin.go @@ -173,7 +173,7 @@ func (s *operationServiceServer) Execute(ctx context.Context, req *pluginv1.Exec // GRPCServer implements the go-plugin GRPCPlugin interface by registering // the PluginService and OperationService gRPC servers. -func (b *GRPCPluginBridge) GRPCServer(_ *goplugin.GRPCBroker, s *grpc.Server) (err error) { +func (b *GRPCPluginBridge) GRPCServer(broker *goplugin.GRPCBroker, s *grpc.Server) (err error) { defer func() { if r := recover(); r != nil { // Recover from panic if gRPC server is uninitialized (e.g., created with struct literal). @@ -192,6 +192,11 @@ func (b *GRPCPluginBridge) GRPCServer(_ *goplugin.GRPCBroker, s *grpc.Server) (e pluginv1.RegisterValidatorServiceServer(s, &validatorServiceServer{impl: b.impl}) pluginv1.RegisterStepTypeServiceServer(s, &stepTypeServiceServer{impl: b.impl}) pluginv1.RegisterEventServiceServer(s, &eventServiceServer{impl: b.impl}) + + if ba, ok := b.impl.(BrokerAwarePlugin); ok { + ba.SetHostClient(NewHostClient(broker, b.impl.Name())) + } + return nil } diff --git a/pkg/plugin/sdk/grpc_plugin_test.go b/pkg/plugin/sdk/grpc_plugin_test.go index a49827e6..be8dca1b 100644 --- a/pkg/plugin/sdk/grpc_plugin_test.go +++ b/pkg/plugin/sdk/grpc_plugin_test.go @@ -11,6 +11,18 @@ import ( "google.golang.org/grpc" ) +// brokerAwareTestPlugin implements both Plugin and BrokerAwarePlugin. +type brokerAwareTestPlugin struct { + BasePlugin + setHostClientCalled bool + receivedClient *HostClient +} + +func (p *brokerAwareTestPlugin) SetHostClient(client *HostClient) { + p.setHostClientCalled = true + p.receivedClient = client +} + // errorPlugin is a test plugin that returns errors on Init/Shutdown. type errorPlugin struct { BasePlugin @@ -327,3 +339,28 @@ func TestPluginServiceServer_GetInfo_ReturnsEmptyFields(t *testing.T) { assert.Empty(t, resp.Description) assert.Empty(t, resp.Capabilities) } + +func TestGRPCServer_CallsSetHostClientWhenBrokerAwarePlugin(t *testing.T) { + plugin := &brokerAwareTestPlugin{ + BasePlugin: BasePlugin{PluginName: "aware", PluginVersion: "1.0.0"}, + } + bridge := &GRPCPluginBridge{impl: plugin} + server := grpc.NewServer() + defer server.Stop() + + err := bridge.GRPCServer(nil, server) + + require.NoError(t, err) + assert.True(t, plugin.setHostClientCalled, "SetHostClient must be called when plugin implements BrokerAwarePlugin") +} + +func TestGRPCServer_SkipsSetHostClientWhenNotBrokerAwarePlugin(t *testing.T) { + plugin := &testPlugin{BasePlugin: BasePlugin{PluginName: "plain", PluginVersion: "1.0.0"}} + bridge := &GRPCPluginBridge{impl: plugin} + server := grpc.NewServer() + defer server.Stop() + + err := bridge.GRPCServer(nil, server) + + require.NoError(t, err) +} diff --git a/pkg/plugin/sdk/host_client.go b/pkg/plugin/sdk/host_client.go new file mode 100644 index 00000000..bf4ca0ff --- /dev/null +++ b/pkg/plugin/sdk/host_client.go @@ -0,0 +1,69 @@ +package sdk + +import ( + "context" + "errors" + "fmt" + "time" + + goplugin "github.com/hashicorp/go-plugin" + + pluginv1 "github.com/awf-project/cli/proto/plugin/v1" +) + +// HostEventServiceID is the well-known broker service ID shared between host and plugin. +const HostEventServiceID = uint32(1) + +// EventEmitter is implemented by types that can emit events to the host. +type EventEmitter interface { + Emit(ctx context.Context, eventType string, payload []byte, metadata map[string]string) error +} + +// BrokerAwarePlugin is an optional interface for plugins that want to emit events to the host. +// Plugins implementing this interface receive a HostClient during GRPCServer setup. +type BrokerAwarePlugin interface { + SetHostClient(client *HostClient) +} + +// HostClient wraps the gRPC connection to the host's HostEventService. +type HostClient struct { + client pluginv1.HostEventServiceClient + pluginName string +} + +// NewHostClient dials the host's HostEventService via the broker and returns a HostClient. +// Returns nil when broker is nil (backward compatibility). +func NewHostClient(broker *goplugin.GRPCBroker, pluginName string) *HostClient { + if broker == nil { + return nil + } + conn, err := broker.Dial(HostEventServiceID) + if err != nil { + return nil + } + return &HostClient{ + client: pluginv1.NewHostEventServiceClient(conn), + pluginName: pluginName, + } +} + +// Emit sends an event to the host. +func (h *HostClient) Emit(ctx context.Context, eventType string, payload []byte, metadata map[string]string) error { + if h.client == nil { + return errors.New("host client not initialized") + } + resp, err := h.client.Emit(ctx, &pluginv1.EmitRequest{ + EventType: eventType, + SourcePlugin: h.pluginName, + Payload: payload, + Metadata: metadata, + TimestampUnixNanos: time.Now().UnixNano(), + }) + if err != nil { + return fmt.Errorf("emit RPC failed: %w", err) + } + if !resp.Success { + return errors.New(resp.ErrorMessage) + } + return nil +} diff --git a/pkg/plugin/sdk/host_client_test.go b/pkg/plugin/sdk/host_client_test.go new file mode 100644 index 00000000..daa5ce33 --- /dev/null +++ b/pkg/plugin/sdk/host_client_test.go @@ -0,0 +1,224 @@ +package sdk + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + + pluginv1 "github.com/awf-project/cli/proto/plugin/v1" +) + +// mockHostEventServiceClient implements pluginv1.HostEventServiceClient for testing. +type mockHostEventServiceClient struct { + emitFunc func(ctx context.Context, in *pluginv1.EmitRequest, opts ...grpc.CallOption) (*pluginv1.EmitResponse, error) +} + +func (m *mockHostEventServiceClient) Emit(ctx context.Context, in *pluginv1.EmitRequest, opts ...grpc.CallOption) (*pluginv1.EmitResponse, error) { + if m.emitFunc != nil { + return m.emitFunc(ctx, in, opts...) + } + return &pluginv1.EmitResponse{Success: true}, nil +} + +func TestEventEmitterInterfaceExists(t *testing.T) { + // Verify that EventEmitter interface is defined with correct method signature. + var _ EventEmitter = (*HostClient)(nil) +} + +func TestNewHostClient_ReturnsNilWhenBrokerIsNil(t *testing.T) { + client := NewHostClient(nil, "test-plugin") + assert.Nil(t, client) +} + +func TestNewHostClient_UsesHostEventServiceID(t *testing.T) { + // This test verifies that NewHostClient calls broker.Dial with HostEventServiceID = 1. + // The actual connection is tested via integration tests. + // For unit testing, we verify the constant is correct. + assert.Equal(t, uint32(1), HostEventServiceID) +} + +func TestHostClient_EmitSuccess(t *testing.T) { + mockClient := &mockHostEventServiceClient{ + emitFunc: func(ctx context.Context, in *pluginv1.EmitRequest, opts ...grpc.CallOption) (*pluginv1.EmitResponse, error) { + assert.Equal(t, "user.signup", in.EventType) + assert.Equal(t, "test-plugin", in.SourcePlugin) + assert.Equal(t, []byte("user_id=123"), in.Payload) + assert.Equal(t, map[string]string{"version": "1"}, in.Metadata) + assert.Greater(t, in.TimestampUnixNanos, int64(0)) + return &pluginv1.EmitResponse{Success: true}, nil + }, + } + + // Create HostClient with mock client via direct struct assignment. + hostClient := &HostClient{ + client: mockClient, + pluginName: "test-plugin", + } + + ctx := context.Background() + err := hostClient.Emit(ctx, "user.signup", []byte("user_id=123"), map[string]string{"version": "1"}) + assert.NoError(t, err) +} + +func TestHostClient_EmitFailureWithErrorMessage(t *testing.T) { + mockClient := &mockHostEventServiceClient{ + emitFunc: func(ctx context.Context, in *pluginv1.EmitRequest, opts ...grpc.CallOption) (*pluginv1.EmitResponse, error) { + return &pluginv1.EmitResponse{ + Success: false, + ErrorMessage: "host internal error", + }, nil + }, + } + + hostClient := &HostClient{ + client: mockClient, + pluginName: "test-plugin", + } + + ctx := context.Background() + err := hostClient.Emit(ctx, "test.event", []byte("data"), map[string]string{}) + require.Error(t, err, "Emit should return error when response.Success is false") + assert.Contains(t, err.Error(), "host internal error") +} + +func TestHostClient_EmitTransportError(t *testing.T) { + mockClient := &mockHostEventServiceClient{ + emitFunc: func(ctx context.Context, in *pluginv1.EmitRequest, opts ...grpc.CallOption) (*pluginv1.EmitResponse, error) { + return nil, errors.New("network error") + }, + } + + hostClient := &HostClient{ + client: mockClient, + pluginName: "test-plugin", + } + + ctx := context.Background() + err := hostClient.Emit(ctx, "test.event", []byte("data"), map[string]string{}) + require.Error(t, err, "Emit should return error on RPC failure") + assert.Contains(t, err.Error(), "emit RPC failed") +} + +func TestHostClient_EmitWithContextCancellation(t *testing.T) { + mockClient := &mockHostEventServiceClient{ + emitFunc: func(ctx context.Context, in *pluginv1.EmitRequest, opts ...grpc.CallOption) (*pluginv1.EmitResponse, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(1 * time.Second): + return &pluginv1.EmitResponse{Success: true}, nil + } + }, + } + + hostClient := &HostClient{ + client: mockClient, + pluginName: "test-plugin", + } + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + err := hostClient.Emit(ctx, "test.event", []byte("data"), map[string]string{}) + assert.Error(t, err) +} + +func TestHostClient_EmitIncludesSourcePlugin(t *testing.T) { + pluginName := "my-custom-plugin" + mockClient := &mockHostEventServiceClient{ + emitFunc: func(ctx context.Context, in *pluginv1.EmitRequest, opts ...grpc.CallOption) (*pluginv1.EmitResponse, error) { + assert.Equal(t, pluginName, in.SourcePlugin) + return &pluginv1.EmitResponse{Success: true}, nil + }, + } + + hostClient := &HostClient{ + client: mockClient, + pluginName: pluginName, + } + + err := hostClient.Emit(context.Background(), "event", []byte{}, map[string]string{}) + assert.NoError(t, err) +} + +func TestHostClient_EmitWithEmptyPayload(t *testing.T) { + mockClient := &mockHostEventServiceClient{ + emitFunc: func(ctx context.Context, in *pluginv1.EmitRequest, opts ...grpc.CallOption) (*pluginv1.EmitResponse, error) { + assert.Equal(t, []byte{}, in.Payload) + return &pluginv1.EmitResponse{Success: true}, nil + }, + } + + hostClient := &HostClient{ + client: mockClient, + pluginName: "test-plugin", + } + + err := hostClient.Emit(context.Background(), "test.event", []byte{}, map[string]string{}) + assert.NoError(t, err) +} + +func TestHostClient_EmitWithEmptyMetadata(t *testing.T) { + mockClient := &mockHostEventServiceClient{ + emitFunc: func(ctx context.Context, in *pluginv1.EmitRequest, opts ...grpc.CallOption) (*pluginv1.EmitResponse, error) { + assert.Empty(t, in.Metadata) + return &pluginv1.EmitResponse{Success: true}, nil + }, + } + + hostClient := &HostClient{ + client: mockClient, + pluginName: "test-plugin", + } + + err := hostClient.Emit(context.Background(), "test.event", []byte("data"), map[string]string{}) + assert.NoError(t, err) +} + +func TestHostClient_EmitIncludesTimestamp(t *testing.T) { + mockClient := &mockHostEventServiceClient{ + emitFunc: func(ctx context.Context, in *pluginv1.EmitRequest, opts ...grpc.CallOption) (*pluginv1.EmitResponse, error) { + // Verify timestamp is recent (within 1 second). + ts := in.TimestampUnixNanos / 1e9 + now := time.Now().Unix() + assert.True(t, ts >= now-1 && ts <= now+1, "timestamp should be recent") + return &pluginv1.EmitResponse{Success: true}, nil + }, + } + + hostClient := &HostClient{ + client: mockClient, + pluginName: "test-plugin", + } + + err := hostClient.Emit(context.Background(), "test.event", []byte("data"), map[string]string{}) + assert.NoError(t, err) +} + +func TestBrokerAwarePluginInterfaceExists(t *testing.T) { + // Verify that BrokerAwarePlugin interface is defined with SetHostClient method. + var _ BrokerAwarePlugin +} + +func TestHostEventServiceIDConstant(t *testing.T) { + assert.Equal(t, uint32(1), HostEventServiceID) +} + +func TestHostClient_NilClientHandling(t *testing.T) { + // Ensure that HostClient handles nil client gracefully or panics appropriately. + // This tests the case where NewHostClient returns a HostClient with a nil client field. + hostClient := &HostClient{ + client: nil, + pluginName: "test-plugin", + } + + // The current implementation returns nil from Emit, but a real implementation + // should handle this case. This test validates that at least it doesn't panic unexpectedly. + ctx := context.Background() + _ = hostClient.Emit(ctx, "test.event", []byte("data"), map[string]string{}) +} diff --git a/proto/plugin/v1/plugin.pb.go b/proto/plugin/v1/plugin.pb.go index 815c346e..c5ec6276 100644 --- a/proto/plugin/v1/plugin.pb.go +++ b/proto/plugin/v1/plugin.pb.go @@ -1479,6 +1479,286 @@ func (x *HandleEventResponse) GetEmittedEvents() []*HandleEventRequest { return nil } +type EmitRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + EventType string `protobuf:"bytes,1,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"` + Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` + SourcePlugin string `protobuf:"bytes,3,opt,name=source_plugin,json=sourcePlugin,proto3" json:"source_plugin,omitempty"` + PropagationDepth int32 `protobuf:"varint,4,opt,name=propagation_depth,json=propagationDepth,proto3" json:"propagation_depth,omitempty"` + TimestampUnixNanos int64 `protobuf:"varint,5,opt,name=timestamp_unix_nanos,json=timestampUnixNanos,proto3" json:"timestamp_unix_nanos,omitempty"` + Metadata map[string]string `protobuf:"bytes,6,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *EmitRequest) Reset() { + *x = EmitRequest{} + mi := &file_proto_plugin_v1_plugin_proto_msgTypes[27] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *EmitRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*EmitRequest) ProtoMessage() {} + +func (x *EmitRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_plugin_v1_plugin_proto_msgTypes[27] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use EmitRequest.ProtoReflect.Descriptor instead. +func (*EmitRequest) Descriptor() ([]byte, []int) { + return file_proto_plugin_v1_plugin_proto_rawDescGZIP(), []int{27} +} + +func (x *EmitRequest) GetEventType() string { + if x != nil { + return x.EventType + } + return "" +} + +func (x *EmitRequest) GetPayload() []byte { + if x != nil { + return x.Payload + } + return nil +} + +func (x *EmitRequest) GetSourcePlugin() string { + if x != nil { + return x.SourcePlugin + } + return "" +} + +func (x *EmitRequest) GetPropagationDepth() int32 { + if x != nil { + return x.PropagationDepth + } + return 0 +} + +func (x *EmitRequest) GetTimestampUnixNanos() int64 { + if x != nil { + return x.TimestampUnixNanos + } + return 0 +} + +func (x *EmitRequest) GetMetadata() map[string]string { + if x != nil { + return x.Metadata + } + return nil +} + +type EmitResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Success bool `protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"` + ErrorMessage string `protobuf:"bytes,2,opt,name=error_message,json=errorMessage,proto3" json:"error_message,omitempty"` + EventId string `protobuf:"bytes,3,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *EmitResponse) Reset() { + *x = EmitResponse{} + mi := &file_proto_plugin_v1_plugin_proto_msgTypes[28] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *EmitResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*EmitResponse) ProtoMessage() {} + +func (x *EmitResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_plugin_v1_plugin_proto_msgTypes[28] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use EmitResponse.ProtoReflect.Descriptor instead. +func (*EmitResponse) Descriptor() ([]byte, []int) { + return file_proto_plugin_v1_plugin_proto_rawDescGZIP(), []int{28} +} + +func (x *EmitResponse) GetSuccess() bool { + if x != nil { + return x.Success + } + return false +} + +func (x *EmitResponse) GetErrorMessage() string { + if x != nil { + return x.ErrorMessage + } + return "" +} + +func (x *EmitResponse) GetEventId() string { + if x != nil { + return x.EventId + } + return "" +} + +type EventStreamMessage struct { + state protoimpl.MessageState `protogen:"open.v1"` + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Type string `protobuf:"bytes,2,opt,name=type,proto3" json:"type,omitempty"` + TimestampUnixNanos int64 `protobuf:"varint,3,opt,name=timestamp_unix_nanos,json=timestampUnixNanos,proto3" json:"timestamp_unix_nanos,omitempty"` + Source string `protobuf:"bytes,4,opt,name=source,proto3" json:"source,omitempty"` + Metadata map[string]string `protobuf:"bytes,5,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + Payload []byte `protobuf:"bytes,6,opt,name=payload,proto3" json:"payload,omitempty"` + PropagationDepth int32 `protobuf:"varint,7,opt,name=propagation_depth,json=propagationDepth,proto3" json:"propagation_depth,omitempty"` + SequenceNumber uint64 `protobuf:"varint,8,opt,name=sequence_number,json=sequenceNumber,proto3" json:"sequence_number,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *EventStreamMessage) Reset() { + *x = EventStreamMessage{} + mi := &file_proto_plugin_v1_plugin_proto_msgTypes[29] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *EventStreamMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*EventStreamMessage) ProtoMessage() {} + +func (x *EventStreamMessage) ProtoReflect() protoreflect.Message { + mi := &file_proto_plugin_v1_plugin_proto_msgTypes[29] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use EventStreamMessage.ProtoReflect.Descriptor instead. +func (*EventStreamMessage) Descriptor() ([]byte, []int) { + return file_proto_plugin_v1_plugin_proto_rawDescGZIP(), []int{29} +} + +func (x *EventStreamMessage) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *EventStreamMessage) GetType() string { + if x != nil { + return x.Type + } + return "" +} + +func (x *EventStreamMessage) GetTimestampUnixNanos() int64 { + if x != nil { + return x.TimestampUnixNanos + } + return 0 +} + +func (x *EventStreamMessage) GetSource() string { + if x != nil { + return x.Source + } + return "" +} + +func (x *EventStreamMessage) GetMetadata() map[string]string { + if x != nil { + return x.Metadata + } + return nil +} + +func (x *EventStreamMessage) GetPayload() []byte { + if x != nil { + return x.Payload + } + return nil +} + +func (x *EventStreamMessage) GetPropagationDepth() int32 { + if x != nil { + return x.PropagationDepth + } + return 0 +} + +func (x *EventStreamMessage) GetSequenceNumber() uint64 { + if x != nil { + return x.SequenceNumber + } + return 0 +} + +type StreamEventsResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *StreamEventsResponse) Reset() { + *x = StreamEventsResponse{} + mi := &file_proto_plugin_v1_plugin_proto_msgTypes[30] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *StreamEventsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamEventsResponse) ProtoMessage() {} + +func (x *StreamEventsResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_plugin_v1_plugin_proto_msgTypes[30] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamEventsResponse.ProtoReflect.Descriptor instead. +func (*StreamEventsResponse) Descriptor() ([]byte, []int) { + return file_proto_plugin_v1_plugin_proto_rawDescGZIP(), []int{30} +} + var File_proto_plugin_v1_plugin_proto protoreflect.FileDescriptor const file_proto_plugin_v1_plugin_proto_rawDesc = "" + @@ -1581,7 +1861,35 @@ const file_proto_plugin_v1_plugin_proto_rawDesc = "" + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"[\n" + "\x13HandleEventResponse\x12D\n" + - "\x0eemitted_events\x18\x01 \x03(\v2\x1d.plugin.v1.HandleEventRequestR\remittedEvents*a\n" + + "\x0eemitted_events\x18\x01 \x03(\v2\x1d.plugin.v1.HandleEventRequestR\remittedEvents\"\xc9\x02\n" + + "\vEmitRequest\x12\x1d\n" + + "\n" + + "event_type\x18\x01 \x01(\tR\teventType\x12\x18\n" + + "\apayload\x18\x02 \x01(\fR\apayload\x12#\n" + + "\rsource_plugin\x18\x03 \x01(\tR\fsourcePlugin\x12+\n" + + "\x11propagation_depth\x18\x04 \x01(\x05R\x10propagationDepth\x120\n" + + "\x14timestamp_unix_nanos\x18\x05 \x01(\x03R\x12timestampUnixNanos\x12@\n" + + "\bmetadata\x18\x06 \x03(\v2$.plugin.v1.EmitRequest.MetadataEntryR\bmetadata\x1a;\n" + + "\rMetadataEntry\x12\x10\n" + + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"h\n" + + "\fEmitResponse\x12\x18\n" + + "\asuccess\x18\x01 \x01(\bR\asuccess\x12#\n" + + "\rerror_message\x18\x02 \x01(\tR\ferrorMessage\x12\x19\n" + + "\bevent_id\x18\x03 \x01(\tR\aeventId\"\xf8\x02\n" + + "\x12EventStreamMessage\x12\x0e\n" + + "\x02id\x18\x01 \x01(\tR\x02id\x12\x12\n" + + "\x04type\x18\x02 \x01(\tR\x04type\x120\n" + + "\x14timestamp_unix_nanos\x18\x03 \x01(\x03R\x12timestampUnixNanos\x12\x16\n" + + "\x06source\x18\x04 \x01(\tR\x06source\x12G\n" + + "\bmetadata\x18\x05 \x03(\v2+.plugin.v1.EventStreamMessage.MetadataEntryR\bmetadata\x12\x18\n" + + "\apayload\x18\x06 \x01(\fR\apayload\x12+\n" + + "\x11propagation_depth\x18\a \x01(\x05R\x10propagationDepth\x12'\n" + + "\x0fsequence_number\x18\b \x01(\x04R\x0esequenceNumber\x1a;\n" + + "\rMetadataEntry\x12\x10\n" + + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\x16\n" + + "\x14StreamEventsResponse*a\n" + "\bSeverity\x12\x18\n" + "\x14SEVERITY_UNSPECIFIED\x10\x00\x12\x14\n" + "\x10SEVERITY_WARNING\x10\x01\x12\x12\n" + @@ -1600,9 +1908,12 @@ const file_proto_plugin_v1_plugin_proto_rawDesc = "" + "\fValidateStep\x12\x1e.plugin.v1.ValidateStepRequest\x1a\x1f.plugin.v1.ValidateStepResponse2\xb3\x01\n" + "\x0fStepTypeService\x12R\n" + "\rListStepTypes\x12\x1f.plugin.v1.ListStepTypesRequest\x1a .plugin.v1.ListStepTypesResponse\x12L\n" + - "\vExecuteStep\x12\x1d.plugin.v1.ExecuteStepRequest\x1a\x1e.plugin.v1.ExecuteStepResponse2\\\n" + + "\vExecuteStep\x12\x1d.plugin.v1.ExecuteStepRequest\x1a\x1e.plugin.v1.ExecuteStepResponse2\xae\x01\n" + "\fEventService\x12L\n" + - "\vHandleEvent\x12\x1d.plugin.v1.HandleEventRequest\x1a\x1e.plugin.v1.HandleEventResponseB5Z3github.com/awf-project/cli/proto/plugin/v1;pluginv1b\x06proto3" + "\vHandleEvent\x12\x1d.plugin.v1.HandleEventRequest\x1a\x1e.plugin.v1.HandleEventResponse\x12P\n" + + "\fStreamEvents\x12\x1d.plugin.v1.EventStreamMessage\x1a\x1f.plugin.v1.StreamEventsResponse(\x012K\n" + + "\x10HostEventService\x127\n" + + "\x04Emit\x12\x16.plugin.v1.EmitRequest\x1a\x17.plugin.v1.EmitResponseB5Z3github.com/awf-project/cli/proto/plugin/v1;pluginv1b\x06proto3" var ( file_proto_plugin_v1_plugin_proto_rawDescOnce sync.Once @@ -1617,7 +1928,7 @@ func file_proto_plugin_v1_plugin_proto_rawDescGZIP() []byte { } var file_proto_plugin_v1_plugin_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_proto_plugin_v1_plugin_proto_msgTypes = make([]protoimpl.MessageInfo, 31) +var file_proto_plugin_v1_plugin_proto_msgTypes = make([]protoimpl.MessageInfo, 37) var file_proto_plugin_v1_plugin_proto_goTypes = []any{ (Severity)(0), // 0: plugin.v1.Severity (*GetInfoRequest)(nil), // 1: plugin.v1.GetInfoRequest @@ -1647,52 +1958,64 @@ var file_proto_plugin_v1_plugin_proto_goTypes = []any{ (*ExecuteStepResponse)(nil), // 25: plugin.v1.ExecuteStepResponse (*HandleEventRequest)(nil), // 26: plugin.v1.HandleEventRequest (*HandleEventResponse)(nil), // 27: plugin.v1.HandleEventResponse - nil, // 28: plugin.v1.InitRequest.ConfigEntry - nil, // 29: plugin.v1.ExecuteRequest.InputsEntry - nil, // 30: plugin.v1.ExecuteResponse.DataEntry - nil, // 31: plugin.v1.HandleEventRequest.MetadataEntry + (*EmitRequest)(nil), // 28: plugin.v1.EmitRequest + (*EmitResponse)(nil), // 29: plugin.v1.EmitResponse + (*EventStreamMessage)(nil), // 30: plugin.v1.EventStreamMessage + (*StreamEventsResponse)(nil), // 31: plugin.v1.StreamEventsResponse + nil, // 32: plugin.v1.InitRequest.ConfigEntry + nil, // 33: plugin.v1.ExecuteRequest.InputsEntry + nil, // 34: plugin.v1.ExecuteResponse.DataEntry + nil, // 35: plugin.v1.HandleEventRequest.MetadataEntry + nil, // 36: plugin.v1.EmitRequest.MetadataEntry + nil, // 37: plugin.v1.EventStreamMessage.MetadataEntry } var file_proto_plugin_v1_plugin_proto_depIdxs = []int32{ - 28, // 0: plugin.v1.InitRequest.config:type_name -> plugin.v1.InitRequest.ConfigEntry + 32, // 0: plugin.v1.InitRequest.config:type_name -> plugin.v1.InitRequest.ConfigEntry 13, // 1: plugin.v1.ListOperationsResponse.operations:type_name -> plugin.v1.OperationSchema 13, // 2: plugin.v1.GetOperationResponse.operation:type_name -> plugin.v1.OperationSchema - 29, // 3: plugin.v1.ExecuteRequest.inputs:type_name -> plugin.v1.ExecuteRequest.InputsEntry - 30, // 4: plugin.v1.ExecuteResponse.data:type_name -> plugin.v1.ExecuteResponse.DataEntry + 33, // 3: plugin.v1.ExecuteRequest.inputs:type_name -> plugin.v1.ExecuteRequest.InputsEntry + 34, // 4: plugin.v1.ExecuteResponse.data:type_name -> plugin.v1.ExecuteResponse.DataEntry 14, // 5: plugin.v1.OperationSchema.inputs:type_name -> plugin.v1.InputSchema 15, // 6: plugin.v1.OperationSchema.outputs:type_name -> plugin.v1.OutputSchema 0, // 7: plugin.v1.ValidationIssue.severity:type_name -> plugin.v1.Severity 16, // 8: plugin.v1.ValidateWorkflowResponse.issues:type_name -> plugin.v1.ValidationIssue 16, // 9: plugin.v1.ValidateStepResponse.issues:type_name -> plugin.v1.ValidationIssue 22, // 10: plugin.v1.ListStepTypesResponse.step_types:type_name -> plugin.v1.StepTypeInfo - 31, // 11: plugin.v1.HandleEventRequest.metadata:type_name -> plugin.v1.HandleEventRequest.MetadataEntry + 35, // 11: plugin.v1.HandleEventRequest.metadata:type_name -> plugin.v1.HandleEventRequest.MetadataEntry 26, // 12: plugin.v1.HandleEventResponse.emitted_events:type_name -> plugin.v1.HandleEventRequest - 1, // 13: plugin.v1.PluginService.GetInfo:input_type -> plugin.v1.GetInfoRequest - 3, // 14: plugin.v1.PluginService.Init:input_type -> plugin.v1.InitRequest - 5, // 15: plugin.v1.PluginService.Shutdown:input_type -> plugin.v1.ShutdownRequest - 7, // 16: plugin.v1.OperationService.ListOperations:input_type -> plugin.v1.ListOperationsRequest - 9, // 17: plugin.v1.OperationService.GetOperation:input_type -> plugin.v1.GetOperationRequest - 11, // 18: plugin.v1.OperationService.Execute:input_type -> plugin.v1.ExecuteRequest - 17, // 19: plugin.v1.ValidatorService.ValidateWorkflow:input_type -> plugin.v1.ValidateWorkflowRequest - 19, // 20: plugin.v1.ValidatorService.ValidateStep:input_type -> plugin.v1.ValidateStepRequest - 21, // 21: plugin.v1.StepTypeService.ListStepTypes:input_type -> plugin.v1.ListStepTypesRequest - 24, // 22: plugin.v1.StepTypeService.ExecuteStep:input_type -> plugin.v1.ExecuteStepRequest - 26, // 23: plugin.v1.EventService.HandleEvent:input_type -> plugin.v1.HandleEventRequest - 2, // 24: plugin.v1.PluginService.GetInfo:output_type -> plugin.v1.GetInfoResponse - 4, // 25: plugin.v1.PluginService.Init:output_type -> plugin.v1.InitResponse - 6, // 26: plugin.v1.PluginService.Shutdown:output_type -> plugin.v1.ShutdownResponse - 8, // 27: plugin.v1.OperationService.ListOperations:output_type -> plugin.v1.ListOperationsResponse - 10, // 28: plugin.v1.OperationService.GetOperation:output_type -> plugin.v1.GetOperationResponse - 12, // 29: plugin.v1.OperationService.Execute:output_type -> plugin.v1.ExecuteResponse - 18, // 30: plugin.v1.ValidatorService.ValidateWorkflow:output_type -> plugin.v1.ValidateWorkflowResponse - 20, // 31: plugin.v1.ValidatorService.ValidateStep:output_type -> plugin.v1.ValidateStepResponse - 23, // 32: plugin.v1.StepTypeService.ListStepTypes:output_type -> plugin.v1.ListStepTypesResponse - 25, // 33: plugin.v1.StepTypeService.ExecuteStep:output_type -> plugin.v1.ExecuteStepResponse - 27, // 34: plugin.v1.EventService.HandleEvent:output_type -> plugin.v1.HandleEventResponse - 24, // [24:35] is the sub-list for method output_type - 13, // [13:24] is the sub-list for method input_type - 13, // [13:13] is the sub-list for extension type_name - 13, // [13:13] is the sub-list for extension extendee - 0, // [0:13] is the sub-list for field type_name + 36, // 13: plugin.v1.EmitRequest.metadata:type_name -> plugin.v1.EmitRequest.MetadataEntry + 37, // 14: plugin.v1.EventStreamMessage.metadata:type_name -> plugin.v1.EventStreamMessage.MetadataEntry + 1, // 15: plugin.v1.PluginService.GetInfo:input_type -> plugin.v1.GetInfoRequest + 3, // 16: plugin.v1.PluginService.Init:input_type -> plugin.v1.InitRequest + 5, // 17: plugin.v1.PluginService.Shutdown:input_type -> plugin.v1.ShutdownRequest + 7, // 18: plugin.v1.OperationService.ListOperations:input_type -> plugin.v1.ListOperationsRequest + 9, // 19: plugin.v1.OperationService.GetOperation:input_type -> plugin.v1.GetOperationRequest + 11, // 20: plugin.v1.OperationService.Execute:input_type -> plugin.v1.ExecuteRequest + 17, // 21: plugin.v1.ValidatorService.ValidateWorkflow:input_type -> plugin.v1.ValidateWorkflowRequest + 19, // 22: plugin.v1.ValidatorService.ValidateStep:input_type -> plugin.v1.ValidateStepRequest + 21, // 23: plugin.v1.StepTypeService.ListStepTypes:input_type -> plugin.v1.ListStepTypesRequest + 24, // 24: plugin.v1.StepTypeService.ExecuteStep:input_type -> plugin.v1.ExecuteStepRequest + 26, // 25: plugin.v1.EventService.HandleEvent:input_type -> plugin.v1.HandleEventRequest + 30, // 26: plugin.v1.EventService.StreamEvents:input_type -> plugin.v1.EventStreamMessage + 28, // 27: plugin.v1.HostEventService.Emit:input_type -> plugin.v1.EmitRequest + 2, // 28: plugin.v1.PluginService.GetInfo:output_type -> plugin.v1.GetInfoResponse + 4, // 29: plugin.v1.PluginService.Init:output_type -> plugin.v1.InitResponse + 6, // 30: plugin.v1.PluginService.Shutdown:output_type -> plugin.v1.ShutdownResponse + 8, // 31: plugin.v1.OperationService.ListOperations:output_type -> plugin.v1.ListOperationsResponse + 10, // 32: plugin.v1.OperationService.GetOperation:output_type -> plugin.v1.GetOperationResponse + 12, // 33: plugin.v1.OperationService.Execute:output_type -> plugin.v1.ExecuteResponse + 18, // 34: plugin.v1.ValidatorService.ValidateWorkflow:output_type -> plugin.v1.ValidateWorkflowResponse + 20, // 35: plugin.v1.ValidatorService.ValidateStep:output_type -> plugin.v1.ValidateStepResponse + 23, // 36: plugin.v1.StepTypeService.ListStepTypes:output_type -> plugin.v1.ListStepTypesResponse + 25, // 37: plugin.v1.StepTypeService.ExecuteStep:output_type -> plugin.v1.ExecuteStepResponse + 27, // 38: plugin.v1.EventService.HandleEvent:output_type -> plugin.v1.HandleEventResponse + 31, // 39: plugin.v1.EventService.StreamEvents:output_type -> plugin.v1.StreamEventsResponse + 29, // 40: plugin.v1.HostEventService.Emit:output_type -> plugin.v1.EmitResponse + 28, // [28:41] is the sub-list for method output_type + 15, // [15:28] is the sub-list for method input_type + 15, // [15:15] is the sub-list for extension type_name + 15, // [15:15] is the sub-list for extension extendee + 0, // [0:15] is the sub-list for field type_name } func init() { file_proto_plugin_v1_plugin_proto_init() } @@ -1706,9 +2029,9 @@ func file_proto_plugin_v1_plugin_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_proto_plugin_v1_plugin_proto_rawDesc), len(file_proto_plugin_v1_plugin_proto_rawDesc)), NumEnums: 1, - NumMessages: 31, + NumMessages: 37, NumExtensions: 0, - NumServices: 5, + NumServices: 6, }, GoTypes: file_proto_plugin_v1_plugin_proto_goTypes, DependencyIndexes: file_proto_plugin_v1_plugin_proto_depIdxs, diff --git a/proto/plugin/v1/plugin.proto b/proto/plugin/v1/plugin.proto index 43994bc3..99d80211 100644 --- a/proto/plugin/v1/plugin.proto +++ b/proto/plugin/v1/plugin.proto @@ -152,6 +152,11 @@ message ExecuteStepResponse { service EventService { rpc HandleEvent(HandleEventRequest) returns (HandleEventResponse); + rpc StreamEvents(stream EventStreamMessage) returns (StreamEventsResponse); +} + +service HostEventService { + rpc Emit(EmitRequest) returns (EmitResponse); } message HandleEventRequest { @@ -167,3 +172,31 @@ message HandleEventRequest { message HandleEventResponse { repeated HandleEventRequest emitted_events = 1; } + +message EmitRequest { + string event_type = 1; + bytes payload = 2; + string source_plugin = 3; + int32 propagation_depth = 4; + int64 timestamp_unix_nanos = 5; + map metadata = 6; +} + +message EmitResponse { + bool success = 1; + string error_message = 2; + string event_id = 3; +} + +message EventStreamMessage { + string id = 1; + string type = 2; + int64 timestamp_unix_nanos = 3; + string source = 4; + map metadata = 5; + bytes payload = 6; + int32 propagation_depth = 7; + uint64 sequence_number = 8; +} + +message StreamEventsResponse {} diff --git a/proto/plugin/v1/plugin_grpc.pb.go b/proto/plugin/v1/plugin_grpc.pb.go index 97c48639..01c3ca80 100644 --- a/proto/plugin/v1/plugin_grpc.pb.go +++ b/proto/plugin/v1/plugin_grpc.pb.go @@ -655,7 +655,8 @@ var StepTypeService_ServiceDesc = grpc.ServiceDesc{ } const ( - EventService_HandleEvent_FullMethodName = "/plugin.v1.EventService/HandleEvent" + EventService_HandleEvent_FullMethodName = "/plugin.v1.EventService/HandleEvent" + EventService_StreamEvents_FullMethodName = "/plugin.v1.EventService/StreamEvents" ) // EventServiceClient is the client API for EventService service. @@ -663,6 +664,7 @@ const ( // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type EventServiceClient interface { HandleEvent(ctx context.Context, in *HandleEventRequest, opts ...grpc.CallOption) (*HandleEventResponse, error) + StreamEvents(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[EventStreamMessage, StreamEventsResponse], error) } type eventServiceClient struct { @@ -683,11 +685,25 @@ func (c *eventServiceClient) HandleEvent(ctx context.Context, in *HandleEventReq return out, nil } +func (c *eventServiceClient) StreamEvents(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[EventStreamMessage, StreamEventsResponse], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &EventService_ServiceDesc.Streams[0], EventService_StreamEvents_FullMethodName, cOpts...) + if err != nil { + return nil, err + } + x := &grpc.GenericClientStream[EventStreamMessage, StreamEventsResponse]{ClientStream: stream} + return x, nil +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type EventService_StreamEventsClient = grpc.ClientStreamingClient[EventStreamMessage, StreamEventsResponse] + // EventServiceServer is the server API for EventService service. // All implementations must embed UnimplementedEventServiceServer // for forward compatibility. type EventServiceServer interface { HandleEvent(context.Context, *HandleEventRequest) (*HandleEventResponse, error) + StreamEvents(grpc.ClientStreamingServer[EventStreamMessage, StreamEventsResponse]) error mustEmbedUnimplementedEventServiceServer() } @@ -701,6 +717,9 @@ type UnimplementedEventServiceServer struct{} func (UnimplementedEventServiceServer) HandleEvent(context.Context, *HandleEventRequest) (*HandleEventResponse, error) { return nil, status.Error(codes.Unimplemented, "method HandleEvent not implemented") } +func (UnimplementedEventServiceServer) StreamEvents(grpc.ClientStreamingServer[EventStreamMessage, StreamEventsResponse]) error { + return status.Error(codes.Unimplemented, "method StreamEvents not implemented") +} func (UnimplementedEventServiceServer) mustEmbedUnimplementedEventServiceServer() {} func (UnimplementedEventServiceServer) testEmbeddedByValue() {} @@ -740,6 +759,13 @@ func _EventService_HandleEvent_Handler(srv interface{}, ctx context.Context, dec return interceptor(ctx, in, info, handler) } +func _EventService_StreamEvents_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(EventServiceServer).StreamEvents(&grpc.GenericServerStream[EventStreamMessage, StreamEventsResponse]{ServerStream: stream}) +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type EventService_StreamEventsServer = grpc.ClientStreamingServer[EventStreamMessage, StreamEventsResponse] + // EventService_ServiceDesc is the grpc.ServiceDesc for EventService service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -752,6 +778,114 @@ var EventService_ServiceDesc = grpc.ServiceDesc{ Handler: _EventService_HandleEvent_Handler, }, }, + Streams: []grpc.StreamDesc{ + { + StreamName: "StreamEvents", + Handler: _EventService_StreamEvents_Handler, + ClientStreams: true, + }, + }, + Metadata: "proto/plugin/v1/plugin.proto", +} + +const ( + HostEventService_Emit_FullMethodName = "/plugin.v1.HostEventService/Emit" +) + +// HostEventServiceClient is the client API for HostEventService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type HostEventServiceClient interface { + Emit(ctx context.Context, in *EmitRequest, opts ...grpc.CallOption) (*EmitResponse, error) +} + +type hostEventServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewHostEventServiceClient(cc grpc.ClientConnInterface) HostEventServiceClient { + return &hostEventServiceClient{cc} +} + +func (c *hostEventServiceClient) Emit(ctx context.Context, in *EmitRequest, opts ...grpc.CallOption) (*EmitResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(EmitResponse) + err := c.cc.Invoke(ctx, HostEventService_Emit_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +// HostEventServiceServer is the server API for HostEventService service. +// All implementations must embed UnimplementedHostEventServiceServer +// for forward compatibility. +type HostEventServiceServer interface { + Emit(context.Context, *EmitRequest) (*EmitResponse, error) + mustEmbedUnimplementedHostEventServiceServer() +} + +// UnimplementedHostEventServiceServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedHostEventServiceServer struct{} + +func (UnimplementedHostEventServiceServer) Emit(context.Context, *EmitRequest) (*EmitResponse, error) { + return nil, status.Error(codes.Unimplemented, "method Emit not implemented") +} +func (UnimplementedHostEventServiceServer) mustEmbedUnimplementedHostEventServiceServer() {} +func (UnimplementedHostEventServiceServer) testEmbeddedByValue() {} + +// UnsafeHostEventServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to HostEventServiceServer will +// result in compilation errors. +type UnsafeHostEventServiceServer interface { + mustEmbedUnimplementedHostEventServiceServer() +} + +func RegisterHostEventServiceServer(s grpc.ServiceRegistrar, srv HostEventServiceServer) { + // If the following call panics, it indicates UnimplementedHostEventServiceServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&HostEventService_ServiceDesc, srv) +} + +func _HostEventService_Emit_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(EmitRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(HostEventServiceServer).Emit(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: HostEventService_Emit_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(HostEventServiceServer).Emit(ctx, req.(*EmitRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// HostEventService_ServiceDesc is the grpc.ServiceDesc for HostEventService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var HostEventService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "plugin.v1.HostEventService", + HandlerType: (*HostEventServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Emit", + Handler: _HostEventService_Emit_Handler, + }, + }, Streams: []grpc.StreamDesc{}, Metadata: "proto/plugin/v1/plugin.proto", } diff --git a/proto/plugin/v1/plugin_test.go b/proto/plugin/v1/plugin_test.go index 6df67193..7353dd9d 100644 --- a/proto/plugin/v1/plugin_test.go +++ b/proto/plugin/v1/plugin_test.go @@ -5,6 +5,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/grpc" "google.golang.org/protobuf/proto" ) @@ -743,6 +744,263 @@ func TestExecuteStepResponse_ErrorCase(t *testing.T) { assert.Equal(t, int32(1), resp2.ExitCode) } +// TestHandleEventRequest_HappyPath verifies HandleEventRequest with all fields. +func TestHandleEventRequest_HappyPath(t *testing.T) { + req := &HandleEventRequest{ + Id: "event-123", + Type: "workflow.step.completed", + TimestampUnixNanos: 1700000000000000000, + Source: "plugin-a", + Metadata: map[string]string{ + "correlation_id": "corr-456", + }, + Payload: []byte(`{"result":"ok"}`), + PropagationDepth: 2, + } + require.NotNil(t, req) + + assert.Equal(t, "event-123", req.Id) + assert.Equal(t, "workflow.step.completed", req.Type) + assert.Equal(t, int64(1700000000000000000), req.TimestampUnixNanos) + assert.Equal(t, "plugin-a", req.Source) + assert.Equal(t, "corr-456", req.Metadata["correlation_id"]) + assert.Equal(t, []byte(`{"result":"ok"}`), req.Payload) + assert.Equal(t, int32(2), req.PropagationDepth) + + data, err := proto.Marshal(req) + require.NoError(t, err) + + req2 := &HandleEventRequest{} + err = proto.Unmarshal(data, req2) + require.NoError(t, err) + + assert.Equal(t, req.Id, req2.Id) + assert.Equal(t, req.Type, req2.Type) + assert.Equal(t, req.TimestampUnixNanos, req2.TimestampUnixNanos) + assert.Equal(t, req.Source, req2.Source) + assert.Equal(t, req.Metadata, req2.Metadata) + assert.Equal(t, req.Payload, req2.Payload) + assert.Equal(t, req.PropagationDepth, req2.PropagationDepth) +} + +// TestHandleEventResponse_HappyPath verifies HandleEventResponse with emitted events. +func TestHandleEventResponse_HappyPath(t *testing.T) { + resp := &HandleEventResponse{ + EmittedEvents: []*HandleEventRequest{ + { + Id: "child-event-1", + Type: "workflow.step.started", + Source: "plugin-b", + }, + }, + } + require.NotNil(t, resp) + + assert.Len(t, resp.EmittedEvents, 1) + assert.Equal(t, "child-event-1", resp.EmittedEvents[0].Id) + assert.Equal(t, "workflow.step.started", resp.EmittedEvents[0].Type) + + data, err := proto.Marshal(resp) + require.NoError(t, err) + + resp2 := &HandleEventResponse{} + err = proto.Unmarshal(data, resp2) + require.NoError(t, err) + + assert.Len(t, resp2.EmittedEvents, 1) + assert.Equal(t, resp.EmittedEvents[0].Id, resp2.EmittedEvents[0].Id) +} + +// TestEmitRequest_HappyPath verifies EmitRequest has all required fields. +func TestEmitRequest_HappyPath(t *testing.T) { + req := &EmitRequest{ + EventType: "workflow.step.completed", + Payload: []byte(`{"output":"done"}`), + SourcePlugin: "my-plugin", + PropagationDepth: 1, + TimestampUnixNanos: 1700000000000000000, + Metadata: map[string]string{ + "trace_id": "trace-abc", + }, + } + require.NotNil(t, req) + + assert.Equal(t, "workflow.step.completed", req.EventType) + assert.Equal(t, []byte(`{"output":"done"}`), req.Payload) + assert.Equal(t, "my-plugin", req.SourcePlugin) + assert.Equal(t, int32(1), req.PropagationDepth) + assert.Equal(t, int64(1700000000000000000), req.TimestampUnixNanos) + assert.Equal(t, "trace-abc", req.Metadata["trace_id"]) + + data, err := proto.Marshal(req) + require.NoError(t, err) + + req2 := &EmitRequest{} + err = proto.Unmarshal(data, req2) + require.NoError(t, err) + + assert.Equal(t, req.EventType, req2.EventType) + assert.Equal(t, req.Payload, req2.Payload) + assert.Equal(t, req.SourcePlugin, req2.SourcePlugin) + assert.Equal(t, req.PropagationDepth, req2.PropagationDepth) + assert.Equal(t, req.TimestampUnixNanos, req2.TimestampUnixNanos) + assert.Equal(t, req.Metadata, req2.Metadata) +} + +// TestEmitResponse_HappyPath verifies EmitResponse success fields. +func TestEmitResponse_HappyPath(t *testing.T) { + resp := &EmitResponse{ + Success: true, + ErrorMessage: "", + EventId: "evt-789", + } + require.NotNil(t, resp) + + assert.True(t, resp.Success) + assert.Empty(t, resp.ErrorMessage) + assert.Equal(t, "evt-789", resp.EventId) + + data, err := proto.Marshal(resp) + require.NoError(t, err) + + resp2 := &EmitResponse{} + err = proto.Unmarshal(data, resp2) + require.NoError(t, err) + + assert.True(t, resp2.Success) + assert.Equal(t, "evt-789", resp2.EventId) +} + +// TestEmitResponse_ErrorCase verifies EmitResponse when emission fails. +func TestEmitResponse_ErrorCase(t *testing.T) { + resp := &EmitResponse{ + Success: false, + ErrorMessage: "propagation depth exceeded", + EventId: "", + } + require.NotNil(t, resp) + + assert.False(t, resp.Success) + assert.Equal(t, "propagation depth exceeded", resp.ErrorMessage) + assert.Empty(t, resp.EventId) + + data, err := proto.Marshal(resp) + require.NoError(t, err) + + resp2 := &EmitResponse{} + err = proto.Unmarshal(data, resp2) + require.NoError(t, err) + + assert.False(t, resp2.Success) + assert.Equal(t, resp.ErrorMessage, resp2.ErrorMessage) +} + +// TestEventStreamMessage_HappyPath verifies EventStreamMessage has all 8 required fields. +func TestEventStreamMessage_HappyPath(t *testing.T) { + msg := &EventStreamMessage{ + Id: "stream-evt-001", + Type: "workflow.step.started", + TimestampUnixNanos: 1700000000000000000, + Source: "host", + Metadata: map[string]string{ + "workflow_id": "wf-123", + }, + Payload: []byte(`{"step":"init"}`), + PropagationDepth: 0, + SequenceNumber: 42, + } + require.NotNil(t, msg) + + assert.Equal(t, "stream-evt-001", msg.Id) + assert.Equal(t, "workflow.step.started", msg.Type) + assert.Equal(t, int64(1700000000000000000), msg.TimestampUnixNanos) + assert.Equal(t, "host", msg.Source) + assert.Equal(t, "wf-123", msg.Metadata["workflow_id"]) + assert.Equal(t, []byte(`{"step":"init"}`), msg.Payload) + assert.Equal(t, int32(0), msg.PropagationDepth) + assert.Equal(t, uint64(42), msg.SequenceNumber) + + data, err := proto.Marshal(msg) + require.NoError(t, err) + + msg2 := &EventStreamMessage{} + err = proto.Unmarshal(data, msg2) + require.NoError(t, err) + + assert.Equal(t, msg.Id, msg2.Id) + assert.Equal(t, msg.Type, msg2.Type) + assert.Equal(t, msg.TimestampUnixNanos, msg2.TimestampUnixNanos) + assert.Equal(t, msg.Source, msg2.Source) + assert.Equal(t, msg.Metadata, msg2.Metadata) + assert.Equal(t, msg.Payload, msg2.Payload) + assert.Equal(t, msg.PropagationDepth, msg2.PropagationDepth) + assert.Equal(t, msg.SequenceNumber, msg2.SequenceNumber) +} + +// TestStreamEventsResponse_EmptyMessage verifies StreamEventsResponse is an empty message. +func TestStreamEventsResponse_EmptyMessage(t *testing.T) { + resp := &StreamEventsResponse{} + require.NotNil(t, resp) + + data, err := proto.Marshal(resp) + require.NoError(t, err) + + resp2 := &StreamEventsResponse{} + err = proto.Unmarshal(data, resp2) + require.NoError(t, err) + + assert.EqualValues(t, "StreamEventsResponse", resp.ProtoReflect().Descriptor().Name()) +} + +// TestEventService_StreamEventsClientInterface verifies the client-side streaming interface. +// The host (client) must be able to Send(*EventStreamMessage) and CloseAndRecv() (*StreamEventsResponse, error). +func TestEventService_StreamEventsClientInterface(t *testing.T) { + // Compile-time assertion: EventService_StreamEventsClient satisfies the required interface. + // This test fails to compile if the generated type does not expose Send/CloseAndRecv. + type streamEventsClientIface interface { + Send(*EventStreamMessage) error + CloseAndRecv() (*StreamEventsResponse, error) + } + var _ streamEventsClientIface = EventService_StreamEventsClient(nil) +} + +// TestEventService_StreamEventsServerInterface verifies the server-side streaming interface. +// The plugin (server) must be able to Recv() (*EventStreamMessage, error) and SendAndClose(*StreamEventsResponse) error. +func TestEventService_StreamEventsServerInterface(t *testing.T) { + // Compile-time assertion: EventService_StreamEventsServer satisfies the required interface. + type streamEventsServerIface interface { + Recv() (*EventStreamMessage, error) + SendAndClose(*StreamEventsResponse) error + } + var _ streamEventsServerIface = EventService_StreamEventsServer(nil) +} + +// TestHostEventService_ServiceDescriptor verifies HostEventService descriptor is registered. +func TestHostEventService_ServiceDescriptor(t *testing.T) { + desc := &HostEventService_ServiceDesc + assert.Equal(t, "plugin.v1.HostEventService", desc.ServiceName) + assert.Len(t, desc.Methods, 1) + assert.Equal(t, "Emit", desc.Methods[0].MethodName) + assert.Empty(t, desc.Streams) +} + +// TestEventService_StreamEventsDescriptor verifies EventService has StreamEvents as a client-side stream. +func TestEventService_StreamEventsDescriptor(t *testing.T) { + desc := &EventService_ServiceDesc + assert.Equal(t, "plugin.v1.EventService", desc.ServiceName) + + var streamDesc *grpc.StreamDesc + for i := range desc.Streams { + if desc.Streams[i].StreamName == "StreamEvents" { + streamDesc = &desc.Streams[i] + break + } + } + require.NotNil(t, streamDesc, "StreamEvents stream descriptor not found in EventService") + assert.True(t, streamDesc.ClientStreams, "StreamEvents must be client-side streaming") + assert.False(t, streamDesc.ServerStreams, "StreamEvents must not be server-side streaming") +} + // TestValidationIssue_AllSeveritiesSerialize verifies all severity enum values serialize correctly. func TestValidationIssue_AllSeveritiesSerialize(t *testing.T) { tests := []struct {