feat: scaffold workflow-plugin-eventbus with proto + provider interface — PR 5#1
Merged
- go.mod: module github.com/GoCodeAlone/workflow-plugin-eventbus, go 1.26
- proto/eventbus.proto: full proto contract (ClusterConfig, JetStream/Kafka/KinesisConfig, ResourceLimits, StreamConfig, ConsumerConfig, Publish/Consume/Ack Request+Response, Message)
- gen/eventbus.pb.go: generated Go bindings (protoc-gen-go v1.36.11)
- plugin.json: module/step/trigger type capabilities manifest
- plugin.contracts.json: strict_proto contracts for all 3 modules, 3 steps, 1 trigger
- Makefile: proto-gen, build, test, vet targets (GOWORK=off)
- .github/workflows/ci.yml: mirrors workflow-plugin-audit-chain (test + wfctl-strict-contracts)
- .github/workflows/release.yml: GoReleaser v2 on v* tag
- .goreleaser.yaml: cross-platform builds (linux/darwin/windows, amd64/arm64)
- cmd/workflow-plugin-eventbus/main.go: scaffold entrypoint (TODO: wire sdk.Serve in Task 17)
wfctl strict-contracts: OK workflow-plugin-eventbus v0.1.0
go build ./...: exit 0
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- iac/iac.go: typed Resource + State structs (zero map[string]any); State.Output() helper
- providers/target.go: DeployTarget enum (8 constants) + ValidateProviderTarget matrix:
nats → {DO App, AWS ECS/EKS, k8s, self-hosted}
kafka → {DO Managed Kafka, AWS MSK, k8s, self-hosted}
kinesis → {AWS Kinesis only}
unknown provider → error
- providers/provider.go: Provider interface (Name/Resources/ConnectionString/StreamResources/HealthCheck)
- providers/provider_test.go: table-driven TDD covering all 25 provider×target combos
+ unknown provider; regression invariant proven (revert→FAIL, restore→PASS)
go build ./...: exit 0 | go test ./...: PASS (25 subtests)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
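The provider × target matrix listed in this commit could be encoded roughly as follows. This is a minimal sketch, not the contents of providers/target.go: the constant names, their string values, and the error wording are assumptions, and only the allowed combinations come from the commit message.

```go
package main

import "fmt"

// DeployTarget identifies where a broker is deployed.
// The PR states there are 8 constants; these names and values are hypothetical.
type DeployTarget string

const (
	TargetDOApp          DeployTarget = "do_app"
	TargetAWSECS         DeployTarget = "aws_ecs"
	TargetAWSEKS         DeployTarget = "aws_eks"
	TargetKubernetes     DeployTarget = "kubernetes"
	TargetSelfHosted     DeployTarget = "self_hosted"
	TargetDOManagedKafka DeployTarget = "do_managed_kafka"
	TargetAWSMSK         DeployTarget = "aws_msk"
	TargetAWSKinesis     DeployTarget = "aws_kinesis"
)

// allowed mirrors the matrix from the commit message:
// nats → {DO App, AWS ECS/EKS, k8s, self-hosted}, and so on.
var allowed = map[string]map[DeployTarget]bool{
	"nats": {
		TargetDOApp: true, TargetAWSECS: true, TargetAWSEKS: true,
		TargetKubernetes: true, TargetSelfHosted: true,
	},
	"kafka": {
		TargetDOManagedKafka: true, TargetAWSMSK: true,
		TargetKubernetes: true, TargetSelfHosted: true,
	},
	"kinesis": {TargetAWSKinesis: true},
}

// ValidateProviderTarget returns an error for an unknown provider or for a
// provider/target pair that is not in the matrix.
func ValidateProviderTarget(provider string, target DeployTarget) error {
	targets, ok := allowed[provider]
	if !ok {
		return fmt.Errorf("unknown provider %q", provider)
	}
	if !targets[target] {
		return fmt.Errorf("provider %q does not support deploy target %q", provider, target)
	}
	return nil
}

func main() {
	fmt.Println(ValidateProviderTarget("nats", TargetKubernetes))
	fmt.Println(ValidateProviderTarget("kinesis", TargetKubernetes))
	fmt.Println(ValidateProviderTarget("redis", TargetSelfHosted))
}
```

A nested map keeps the table-driven test trivial to write: every provider × target pair is either present in the map or expected to fail.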
- proto/eventbus.proto: replace string fields with typed enums:
RetentionPolicy (LIMITS/INTEREST/WORKQUEUE) → StreamConfig.retention_policy
DeliverPolicy (ALL/LAST/NEW/BY_START_SEQUENCE/BY_START_TIME) → ConsumerConfig.deliver_policy
AckPolicy (EXPLICIT/NONE/ALL) → ConsumerConfig.ack_policy
KafkaSecurityProtocol (PLAINTEXT/SSL/SASL_PLAINTEXT/SASL_SSL) → KafkaConfig.security_protocol
KafkaSaslMechanism (PLAIN/SCRAM_SHA_256/SCRAM_SHA_512) → KafkaConfig.sasl_mechanism
- gen/eventbus.pb.go: regenerated (protoc-gen-go v1.36.11)
- .github/workflows/test.yml: renamed from ci.yml per task spec
wfctl strict-contracts: OK | go build ./...: exit 0 | go test ./...: PASS
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Task 16:
- cmd/main.go: add var version = "dev" so -X main.version ldflags stamps correctly; improve panic message with repo URL
- gen/proto_smoke_test.go: 3 round-trip tests (ClusterConfig, ConsumerConfig enums, StreamConfig RetentionPolicy), so the CI test job is no longer vacuously green
- Makefile: pin proto-gen comment to protoc v7.34.1 + protoc-gen-go v1.36.11 with upgrade warning
Task 17:
- providers/provider.go: HealthStatus typed string (healthy/degraded/unreachable); HealthCheck.Error → Err; Provider.HealthCheck → Probe (avoids method/type collision)
- iac/iac.go: Output struct with Sensitive bool enforces sensitivity per design §Security; State.Outputs map[string]Output; Resource gains Properties map[string]string for downstream IaC plugin configuration (e.g. image, shard_count, storage_size)
- providers/provider_test.go: edge-case tests for empty provider + wrong-case providers
go build ./...: exit 0 | go test ./...: 6 packages PASS (29 subtests) | wfctl: OK
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
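The sensitivity-marked Output described under Task 17 might look like the sketch below. Only the names Output, Sensitive, State, and Outputs come from the commit message; the Value field and the Redacted helper are hypothetical and are shown only to illustrate why a per-output flag is useful.

```go
package main

import "fmt"

// Output is a single IaC output value. Sensitive is named in the commit
// message; the Value field is an assumption for this sketch.
type Output struct {
	Value     string
	Sensitive bool
}

// State carries the outputs of a provisioned resource set.
type State struct {
	Outputs map[string]Output
}

// Redacted is a hypothetical helper: it returns a printable view of the
// outputs in which every sensitive value is replaced by a placeholder.
func (s State) Redacted() map[string]string {
	out := make(map[string]string, len(s.Outputs))
	for key, o := range s.Outputs {
		if o.Sensitive {
			out[key] = "<sensitive>"
			continue
		}
		out[key] = o.Value
	}
	return out
}

func main() {
	s := State{Outputs: map[string]Output{
		"connection_string": {Value: "nats://user:secret@bus:4222", Sensitive: true},
		"stream_count":      {Value: "3"},
	}}
	fmt.Println(s.Redacted())
}
```

Carrying the flag on each output means log and display code can redact without knowing which keys a given provider treats as secrets.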
retention_policy: limits → RETENTION_POLICY_LIMITS (line 41)
ack_policy: explicit → ACK_POLICY_EXPLICIT (lines 50, 75)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…eferred per manifest out-of-scope)
- providers/kafka/kafka.go: stub Provider; all methods return errNotImplemented; Probe returns HealthStatusUnreachable. Fails fast on config errors rather than panicking.
- providers/kinesis/kinesis.go: same shape, kinesis-specific error message.
- providers/kafka/kafka_test.go: 5 tests (Name, Resources/ConnectionString/StreamResources error, Probe unreachable)
- providers/kinesis/kinesis_test.go: same 5 tests
TDD regression invariant: revert→FAIL (4 failures), restore→PASS
go build ./...: exit 0 | go test ./...: 4 packages PASS (39 tests total)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
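A stub provider of the shape described above could be sketched like this. The method signatures, the HealthCheck.Status field, and the error text are assumptions (the PR names the methods but not their parameters), and StreamResources is left out to keep the sketch short.

```go
package main

import (
	"errors"
	"fmt"
)

// HealthStatus is a typed string, as described in the Task 17 commit.
type HealthStatus string

const (
	HealthStatusHealthy     HealthStatus = "healthy"
	HealthStatusDegraded    HealthStatus = "degraded"
	HealthStatusUnreachable HealthStatus = "unreachable"
)

// HealthCheck is the probe result. Err is named in the PR; Status is assumed.
type HealthCheck struct {
	Status HealthStatus
	Err    error
}

// ClusterConfig stands in for the generated proto message.
type ClusterConfig struct{ Name string }

// Provider is a reduced, hypothetical version of the real interface.
type Provider interface {
	Name() string
	Resources(cfg *ClusterConfig) ([]string, error)
	ConnectionString(cfg *ClusterConfig) (string, error)
	Probe() HealthCheck
}

var errNotImplemented = errors.New("kafka provider: not implemented in this pilot")

// kafkaStub is registered but does nothing: every call fails fast.
type kafkaStub struct{}

// Compile-time check that the stub satisfies the interface.
var _ Provider = (*kafkaStub)(nil)

func (*kafkaStub) Name() string { return "kafka" }

func (*kafkaStub) Resources(*ClusterConfig) ([]string, error) {
	return nil, errNotImplemented
}

func (*kafkaStub) ConnectionString(*ClusterConfig) (string, error) {
	return "", errNotImplemented
}

func (*kafkaStub) Probe() HealthCheck {
	return HealthCheck{Status: HealthStatusUnreachable, Err: errNotImplemented}
}

func main() {
	var p Provider = &kafkaStub{}
	_, err := p.Resources(&ClusterConfig{Name: "orders"})
	fmt.Println(p.Name(), err, p.Probe().Status)
}
```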
proto messages embed protoimpl.MessageState which contains sync.Mutex;
passing by value is rejected by go vet as unsafe mutex copy.
- providers/provider.go: Resources(cfg *ClusterConfig, ...) and
StreamResources(streams []*StreamConfig, ...)
- providers/kafka/kafka.go: matching method signatures
- providers/kinesis/kinesis.go: matching method signatures
- providers/kafka/kafka_test.go: Resources(&ClusterConfig{}, ...)
- providers/kinesis/kinesis_test.go: Resources(&ClusterConfig{}, ...)
go vet ./...: exit 0 | go test ./...: PASS (42 tests, -race)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
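The lock-by-value problem fixed in this commit can be reproduced without protobuf. In the sketch below, messageState is a stand-in for protoimpl.MessageState (described in the commit message as containing a sync.Mutex), and ClusterConfig and StreamConfig are simplified placeholders for the generated types; the function names are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// messageState stands in for protoimpl.MessageState in this sketch.
type messageState struct {
	mu sync.Mutex
}

// ClusterConfig and StreamConfig are simplified placeholders for the
// generated proto messages.
type ClusterConfig struct {
	state messageState
	Name  string
}

type StreamConfig struct {
	state messageState
	Name  string
}

// describeCluster takes a pointer, so the embedded mutex is never copied.
// Declaring the parameter as a plain ClusterConfig value instead is what
// go vet's copylocks check reports as passing a lock by value.
func describeCluster(cfg *ClusterConfig) string {
	if cfg == nil {
		return "<nil>"
	}
	return "cluster:" + cfg.Name
}

// streamNames takes a slice of pointers for the same reason.
func streamNames(streams []*StreamConfig) []string {
	names := make([]string, 0, len(streams))
	for _, s := range streams {
		if s != nil {
			names = append(names, s.Name)
		}
	}
	return names
}

func main() {
	fmt.Println(describeCluster(&ClusterConfig{Name: "bmw-pilot"}))
	fmt.Println(streamNames([]*StreamConfig{{Name: "orders"}, {Name: "shipments"}}))
}
```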
intel352 added a commit that referenced this pull request May 4, 2026
Addresses 5 code-reviewer findings on Task 23:
Important #1: missing sdk.TypedModuleProvider compile-time assertions. Added ClusterModuleFactory, StreamModuleFactory, ConsumerModuleFactory, each a named struct with 'var _ sdk.TypedModuleProvider = (*T)(nil)'. CreateTypedModule unpacks anypb.Any and delegates to the NewXModule constructor. Returns sdk.ErrTypedContractNotHandled for wrong types so the plugin's routing loop can skip.
Important #2: deploy_target not explicitly validated before ValidateProviderTarget. Added an explicit empty-string check for config.deploy_target in NewClusterModule, producing a clear error message distinct from the provider × target compatibility error.
Important #3: NATS connection lifecycle. The module did not own the connection it declared. Added natsConnCache (sync.Mutex-guarded map[string]*nats.Conn), GetOrDialNATSConn (lazy dial via injectable natsDialFn), RegisterNATSConn / GetNATSConn / closeNATSConn helpers. Stop() now calls closeNATSConn, which closes and evicts the cached connection (nil-guarded, idempotent). natsDialFn is a package-level var so integration tests and unit tests can inject without a real server.
Minor #4: consumer goroutine leak concern. Explicit comment in consumerModule.Start documenting that no goroutines are launched; consumption is pull-based, driven by step.eventbus.consume.
Minor #5: factory tests missing. Added TypedModuleProvider tests for all three factory types (TypedModuleTypes, WrongType, NilConfig paths).
All 28 tests pass.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
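Findings #1 and #2 can be illustrated with a stripped-down factory. This is a sketch under stated assumptions: the interfaces below are local stand-ins rather than the real sdk package, the config arrives as a plain Go value instead of an anypb.Any, and createModule is a hypothetical routing loop. Only the names ClusterModuleFactory, TypedModuleTypes, CreateTypedModule, and ErrTypedContractNotHandled come from the commit message.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrTypedContractNotHandled mirrors the sentinel named in the commit message.
var ErrTypedContractNotHandled = errors.New("typed contract not handled")

// ModuleInstance and TypedModuleProvider are local stand-ins, not the SDK types.
type ModuleInstance interface{ Start() error }

type TypedModuleProvider interface {
	TypedModuleTypes() []string
	CreateTypedModule(typeName, name string, config any) (ModuleInstance, error)
}

type ClusterConfig struct{ Provider, DeployTarget string }

type clusterModule struct {
	name string
	cfg  *ClusterConfig
}

func (m *clusterModule) Start() error { return nil }

type ClusterModuleFactory struct{}

// Compile-time assertion: the build breaks if the factory stops satisfying
// the interface.
var _ TypedModuleProvider = (*ClusterModuleFactory)(nil)

func (*ClusterModuleFactory) TypedModuleTypes() []string {
	return []string{"infra.eventbus"}
}

func (*ClusterModuleFactory) CreateTypedModule(typeName, name string, config any) (ModuleInstance, error) {
	if typeName != "infra.eventbus" {
		return nil, ErrTypedContractNotHandled // let the router try the next factory
	}
	cfg, ok := config.(*ClusterConfig)
	if !ok || cfg == nil {
		return nil, fmt.Errorf("infra.eventbus %q: config is required", name)
	}
	if cfg.DeployTarget == "" {
		// Checked separately so the message differs from a compatibility error.
		return nil, fmt.Errorf("infra.eventbus %q: deploy_target is required", name)
	}
	return &clusterModule{name: name, cfg: cfg}, nil
}

// createModule is a hypothetical routing loop that skips factories which
// report the sentinel error.
func createModule(factories []TypedModuleProvider, typeName, name string, config any) (ModuleInstance, error) {
	for _, f := range factories {
		m, err := f.CreateTypedModule(typeName, name, config)
		if errors.Is(err, ErrTypedContractNotHandled) {
			continue
		}
		return m, err
	}
	return nil, fmt.Errorf("no factory handles module type %q", typeName)
}

func main() {
	factories := []TypedModuleProvider{&ClusterModuleFactory{}}
	_, err := createModule(factories, "infra.eventbus", "bus", &ClusterConfig{Provider: "nats"})
	fmt.Println(err)
}
```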
intel352 added a commit that referenced this pull request May 4, 2026
…egration tests (#3)

* feat: add infra.eventbus + stream + consumer module factories

Implements Task 23 (PR 7): three typed module factories for the workflow-plugin-eventbus gRPC plugin:
- module.go / module_test.go: infra.eventbus. Validates provider × deploy_target using the existing conformance matrix, registers ClusterConfig in a global registry, resolves broker URI from EVENTBUS_<NAME>_URI or NATS_URL env vars for runtime step use.
- stream.go / stream_test.go: infra.eventbus.stream. Validates name + subjects, registers StreamConfig for consume and trigger lookups.
- consumer.go / consumer_test.go: infra.eventbus.consumer. Validates name + stream_name, registers ConsumerConfig for step.eventbus.consume and trigger.eventbus.subscribe.
All three types implement sdk.ModuleInstance with compile-time assertions (var _ sdk.ModuleInstance = (*T)(nil)). Zero map[string]any: all config boundaries use typed proto pointers (proto messages embed sync.Mutex via protoimpl.MessageState). 19 unit tests, all passing.
Adds github.com/GoCodeAlone/workflow v0.20.1 as a direct dependency for the sdk.ModuleInstance interface.
wfctl audit plugins -strict-contracts: module 3/3 strict, step 3/3 strict, trigger 1/1 strict; no findings on workflow-plugin-eventbus.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(modules): add TypedModuleProvider factories + NATS conn lifecycle

Addresses 5 code-reviewer findings on Task 23:
Important #1: missing sdk.TypedModuleProvider compile-time assertions. Added ClusterModuleFactory, StreamModuleFactory, ConsumerModuleFactory, each a named struct with 'var _ sdk.TypedModuleProvider = (*T)(nil)'. CreateTypedModule unpacks anypb.Any and delegates to the NewXModule constructor. Returns sdk.ErrTypedContractNotHandled for wrong types so the plugin's routing loop can skip.
Important #2: deploy_target not explicitly validated before ValidateProviderTarget. Added an explicit empty-string check for config.deploy_target in NewClusterModule, producing a clear error message distinct from the provider × target compatibility error.
Important #3: NATS connection lifecycle. The module did not own the connection it declared. Added natsConnCache (sync.Mutex-guarded map[string]*nats.Conn), GetOrDialNATSConn (lazy dial via injectable natsDialFn), RegisterNATSConn / GetNATSConn / closeNATSConn helpers. Stop() now calls closeNATSConn, which closes and evicts the cached connection (nil-guarded, idempotent). natsDialFn is a package-level var so integration tests and unit tests can inject without a real server.
Minor #4: consumer goroutine leak concern. Explicit comment in consumerModule.Start documenting that no goroutines are launched; consumption is pull-based, driven by step.eventbus.consume.
Minor #5: factory tests missing. Added TypedModuleProvider tests for all three factory types (TypedModuleTypes, WrongType, NilConfig paths).
All 28 tests pass.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(modules): address 2 blockers + 1 minor from second review

BLOCKER-A (module_test.go): Replace bare os.Unsetenv calls in TestClusterModule_InitNoURIWhenEnvNotSet with LookupEnv + t.Cleanup restore pattern. Bare Unsetenv permanently cleared NATS_URL for all subsequent tests in the run; the cleanup now restores any pre-existing value on exit.
BLOCKER-B (module.go): Eliminate connCacheMu→urlMu nested lock ordering in GetOrDialNATSConn by extracting the URI lookup between the fast-path unlock and the slow-path re-lock. Pattern:
1. Lock, check cache, unlock (fast path, no urlMu touch).
2. GetBusURI with no lock held (urlMu only, no nesting).
3. Dial with no lock held.
4. Lock, double-check (another goroutine may have won), insert or discard the redundant connection, unlock.
Added lock-ordering doc comment to the NATS cache section.
MINOR-4 (all three *_test.go): Add if err != nil { t.Fatalf } after NewXModule constructors in StopUnregisters and StopEvictsNATSConn tests so a nil module doesn't panic on the next line.
All 28 tests pass, -race clean.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(steps): add step.eventbus.publish/consume/ack with typed proto

- steps/publish.go: PublishHandler uses JetStream PublishMsg, returns broker sequence + RFC3339 acked_at timestamp
- steps/consume.go: ConsumeHandler binds to existing durable JetStream consumer via BindStream; maps msg.Reply→ack_token, Metadata→sequence and published_at; ErrTimeout is treated as empty batch (not an error)
- steps/ack.go: AckHandler publishes empty payload to ack_token (the JetStream reply subject), acknowledging the message
- steps/factories.go: TypedStepFactory vars + All() slice for plugin server registration; compile-time TypedStepProvider assertions
- module.go: add DefaultBusConn(), a single-bus helper for step handlers
- consumer.go: add GetConsumerByName() to resolve ConsumerConfig by durable name (cfg.name) for step.eventbus.consume lookup
All 9 new step unit tests pass alongside existing 24 module tests.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(steps): address 1 critical + 2 important + 2 minor from quality review

CRITICAL: pass nats.Context(ctx) to nc.JetStream() in publish.go and consume.go so that context cancellation/deadline is respected by both js.PublishMsg and sub.Fetch; previously ctx was dead code in all JetStream calls.
IMPORTANT-1: sort registered cluster names before selecting default in DefaultBusConn(); eliminates non-deterministic map iteration that would route to a random broker in multi-bus deployments.
IMPORTANT-2: cap batch_size at maxBatchSize=1000 in ConsumeHandler; prevents a caller from passing 2^31-1 and blocking fetch indefinitely.
MINOR-3: clarify sub.Drain() nolint comment: "ephemeral per-fetch" explains why best-effort is the right posture for a PullSubscribe.
MINOR-4: add explicit invariant comment to TestPublishHandler_NoBusRegistered naming the t.Cleanup guarantee that makes the test's assumption safe.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(trigger): add trigger.eventbus.subscribe + gRPC plugin entrypoint + integration test

- trigger.go: SubscribeTriggerModuleFactory (TypedModuleProvider) + subscribeTrigger (ModuleInstance + TriggerInstance) with bounded goroutine, clean shutdown via context cancel + done channel, fetchPollInterval backpressure on idle streams. cb=nil path is a no-op Start (external plugin gRPC transport).
- trigger_test.go: 7 unit tests covering factory, validation, and nil-callback lifecycle.
- cmd/workflow-plugin-eventbus/main.go: eventbusPlugin wires all 4 module factories (cluster, stream, consumer, trigger), 3 step factories (publish, consume, ack), TriggerProvider, and ContractProvider returning all 7 strict-proto descriptors.
- integration_test.go: real gRPC subprocess transport. Compiles binary, starts via go-plugin, verifies manifest, contract registry (7 contracts, all STRICT_PROTO), infra.eventbus module lifecycle, publish/consume/ack step error paths over wire, trigger module lifecycle, and GetModuleTypes/GetStepTypes/GetTriggerTypes RPCs. No live NATS server required; exercises descriptive error paths.
All tests pass (GOWORK=off go test ./... -count=1).
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(trigger): address 2 spec-reviewer failures

Failure 1: plugin.go missing:
- Split cmd/workflow-plugin-eventbus/main.go into main.go (entrypoint only: sdk.Serve(&eventbusPlugin{})) and plugin.go (package main: eventbusPlugin type, compile-time assertions, moduleFactories, stepFactories, and all interface method impls: Manifest, TypedModuleTypes, CreateTypedModule, TypedStepTypes, CreateTypedStep, TriggerTypes, CreateTrigger, ContractRegistry).
Failure 2: integration_test.go wrong gate + missing NATS scenario:
- TestE2E_EventbusPluginScenario: removed testing.Short() gate, so it always runs. Extracted declareModule helper (Create→Init→Start+Cleanup) to reduce repetition.
- TestE2E_EventbusPluginScenario_WithNATS: new test gated on INTEGRATION_NATS=1. Reads NATS_URL from env (inherited by subprocess). Pre-creates JetStream stream + durable consumer directly via nats.go in test process. Declares infra.eventbus, infra.eventbus.stream, infra.eventbus.consumer modules via gRPC. Publishes 10 messages via step.eventbus.publish (each ExecuteStep call over gRPC wire). Consumes all 10 via step.eventbus.consume batch_size=10. Acks each via step.eventbus.ack using ack_token from ConsumeResponse. Asserts len(messages)==10.
All tests pass (GOWORK=off go test ./... -count=1). WithNATS test skips correctly.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(trigger): address 1 critical + 2 important + 2 minor from quality review

CRITICAL: callback map didn't match Message proto contract (trigger.go). fetchAndFire now builds *eventbusv1.Message the same way ConsumeHandler does: Payload=m.Data ([]byte, not string), AckToken=m.Reply (not "reply"), plus Headers (map[string]string from NATS headers), Sequence (stream seq as string), PublishedAt (RFC3339 UTC). Callback data map keys are the six proto field names: subject, payload, headers, sequence, published_at, ack_token.
IMPORTANT-1: zero test coverage for fetchAndFire (trigger_test.go). Added four new tests using an embedded NATS server (nats-server/v2, already an indirect dep, now explicit in go.mod):
- TestSubscribeTrigger_FetchAndFire_CallbackData: publishes 1 msg with a custom header, asserts all six data map fields are present with correct Go types ([]byte for payload, string for ack_token; catches the CRITICAL regression).
- TestSubscribeTrigger_FetchLoop_ExitsOnCancel: Stop returns within 5s after Start; goroutine exits cleanly on ctx cancel.
- TestSubscribeTrigger_FetchLoop_RetryOnError: publishes after Start; loop keeps polling until the message arrives (retry path).
- TestSubscribeTrigger_DoubleStart: second Start returns error; first goroutine exits cleanly on Stop (double-start guard test).
IMPORTANT-2: double-Start goroutine leak (trigger.go). Start() now returns an error if t.cancel != nil (already started).
MINOR-3: time.After allocation in retry backoff (trigger.go). Replaced time.After with time.NewTimer + backoff.Reset so a single timer is reused across all retry iterations in fetchLoop.
MINOR-4: CreateTrigger silent type assertion discard (plugin.go). Added configString() helper that returns an explicit error when a config key is present but not a string (e.g. config["name"]=42 → "config[name] must be a string, got int"), rather than silently returning "" and confusing callers.
All tests pass (GOWORK=off go test ./... -count=1).
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(trigger): add UnregisterNATSConn + correct test cleanups (parting nit)

- module.go: export UnregisterNATSConn(instanceName), which evicts the entry from natsConnCache without closing the connection (for tests that own the conn lifetime via nc.Close() + embedded-server shutdown).
- trigger_test.go: replace the three duplicate UnregisterBusURI cleanups that followed RegisterNATSConn with UnregisterNATSConn, ensuring natsConnCache is cleaned between tests.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
---------
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
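The four-step BLOCKER-B pattern from the second modules review (check the cache, look up the URI, dial, then double-check before inserting) can be sketched without a NATS dependency. Everything below is illustrative: conn is a stand-in for *nats.Conn, dialFn plays the role of the injectable natsDialFn, and the function and variable names are hypothetical rather than the ones in module.go.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a stand-in for *nats.Conn so the sketch needs no broker.
type conn struct {
	url    string
	closed bool
}

func (c *conn) Close() { c.closed = true }

// dialFn is a package-level var so tests can swap in a fake dialer.
var dialFn = func(url string) (*conn, error) { return &conn{url: url}, nil }

var (
	connCacheMu sync.Mutex
	connCache   = map[string]*conn{}

	urlMu   sync.Mutex
	busURLs = map[string]string{}
)

// getBusURI takes urlMu only; it is never called with connCacheMu held.
func getBusURI(name string) (string, bool) {
	urlMu.Lock()
	defer urlMu.Unlock()
	u, ok := busURLs[name]
	return u, ok
}

func getOrDial(name string) (*conn, error) {
	// 1. Fast path: lock, check cache, unlock.
	connCacheMu.Lock()
	if c, ok := connCache[name]; ok {
		connCacheMu.Unlock()
		return c, nil
	}
	connCacheMu.Unlock()

	// 2. URI lookup with no cache lock held, so the two mutexes never nest.
	url, ok := getBusURI(name)
	if !ok {
		return nil, fmt.Errorf("no URI registered for bus %q", name)
	}

	// 3. Dial with no lock held; dialing may be slow.
	c, err := dialFn(url)
	if err != nil {
		return nil, err
	}

	// 4. Re-lock and double-check: another goroutine may have won the race.
	connCacheMu.Lock()
	defer connCacheMu.Unlock()
	if existing, ok := connCache[name]; ok {
		c.Close() // discard the redundant connection
		return existing, nil
	}
	connCache[name] = c
	return c, nil
}

func main() {
	busURLs["orders"] = "nats://127.0.0.1:4222"
	a, _ := getOrDial("orders")
	b, _ := getOrDial("orders")
	fmt.Println(a == b)
}
```

The cost of this ordering is that two goroutines may both dial on a cold cache, with one connection thrown away; in exchange, no goroutine ever holds both mutexes or holds a lock during network I/O.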
Summary
First PR on GoCodeAlone/workflow-plugin-eventbus (BMW E2E fulfillment pilot, PR 5 of cycle-3). Scaffolds the repo from empty (LICENSE only) to a fully testable, CI-validated plugin skeleton ready for NATS provider implementation.
What's in this PR
- go.mod, Makefile, README.md, cmd/, CI workflows, GoReleaser config
- proto/eventbus.proto + gen/eventbus.pb.go: typed proto3 messages for all 3 module types, 3 step types, and 1 trigger type; 5 finite-vocabulary fields use proper proto enums (RetentionPolicy, DeliverPolicy, AckPolicy, KafkaSecurityProtocol, KafkaSaslMechanism) rather than stringly-typed strings
- plugin.json + plugin.contracts.json: 7 capability entries, 7 contracts; passes wfctl plugin validate --strict-contracts
- providers/provider.go: typed Provider interface with HealthStatus enum; proto messages passed by pointer (avoids go vet lock-by-value on protoimpl.MessageState)
- providers/target.go: 8 DeployTarget constants; ValidateProviderTarget encodes the full provider×target matrix (nats/kafka/kinesis × 8 targets); rejects invalid combos at config-load time
- iac/iac.go: Resource with Properties map; State with sensitivity-marked Output struct (per design §Security)
- providers/kafka/, providers/kinesis/: registered but not activated per pilot manifest (NATS only for BMW pilot); fail fast with clear error
- .github/workflows/test.yml: mirrors workflow-plugin-audit-chain exactly: test job + wfctl-strict-contracts job
- -race
Lessons carried from audit-chain PR 4
- plugin.contracts.json validated locally with wfctl strict-contracts before every DM to reviewers, zero CI surprise
- go vet mutex-copy errors discovered during review)
Notes
Test plan
- go build ./... exits 0
- go vet ./... exits 0
- go test ./... -race -count=1: all 4 packages pass (42 tests)
- wfctl plugin validate --file plugin.json --strict-contracts → OK
- test + wfctl-strict-contracts jobs both green on push
🤖 Generated with Claude Code