
feat(providers/nats): NATS provider + DO App Platform deploy target + conformance suite - PR 6 #2

Merged: intel352 merged 10 commits into main from feat/nats-do-provider on May 4, 2026

@intel352 intel352 commented May 4, 2026

Summary

  • Task 19 — NATS provider + DigitalOcean App Platform deploy target: implements the providers.Provider interface for NATS (Name, Resources, ConnectionString, StreamResources, Probe). Resources(cfg, TargetDigitalOceanApp) emits infra.container_service (NATS image, all three ports 4222/8222/6222, JetStream flags, cluster flag unconditionally) and infra.storage (Spaces bucket for JetStream persistence when enabled). Rejects "latest" version. storage_ref links container to volume. deploy_target label hardcoded to "digitalocean.app_platform".

  • Task 20 — AWS ECS/EKS + Kubernetes deploy-target stubs: adds deploy_aws_ecs.go, deploy_aws_eks.go, deploy_kubernetes.go — each returns a nil resource slice + actionable "not implemented" error with the target name and suggested stub filename. Default arm of the switch also improved. All four stub targets (ECS, EKS, Kubernetes, SelfHosted) covered in deploy_stubs_test.go as a single table test.

  • Task 21 — ConnectionString + StreamResources IaC emission: ConnectionString resolves uri.<env> first (env-specific override) then falls back to base uri key, errors when absent. StreamResources emits one nats.stream_create resource per StreamConfig with NATS-native retention policy strings ("limits"/"workqueue"/"interest" — not proto enum names), defaults num_replicas to 1, validates non-empty name and at least one subject, populates server_uri from state, emits max_age/ack_wait durations when set.

  • Task 22 — Conformance test suite: providers/conformance_test.go (package providers_test) with two tests:

    • TestNATSConformance_DOApp (gated INTEGRATION_NATS_DO=1) covers 9 lifecycle phases as subtests: provision, stream, connect (with connect/env-override and connect/missing-state), probe (with probe/empty-uri), publish (JetStream stream + js.Publish), consume (sub.NextMsg), ack (msg.Ack), drain (sub.Drain), and teardown (js.DeleteStream, then StreamInfo confirms the stream is gone). Uses the NATS Go client as a test-only dependency; NATS_URL overrides the default localhost:4222; nc is closed via the parent test's t.Cleanup. Resources are looked up by kind, not by positional index.
    • TestNATSConformance_StubTargets (no gate, always runs): 4 subtests confirming ECS/EKS/kubernetes/self_hosted each error with "not implemented" and the target name.
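
The ConnectionString lookup order from Task 21 (env-specific key first, base key second, error when neither exists) can be sketched roughly as follows. This is a minimal illustration, not the provider's code: the function name, the plain map standing in for provisioned state, and the error text are all assumptions.

```go
package main

import (
	"errors"
	"fmt"
)

// connectionString is a hypothetical helper mirroring the described order:
// "uri.<env>" wins when present, otherwise the base "uri" key is used.
func connectionString(state map[string]string, env string) (string, error) {
	if env != "" {
		if v := state["uri."+env]; v != "" {
			return v, nil
		}
	}
	if v := state["uri"]; v != "" {
		return v, nil
	}
	// Error text is illustrative only.
	return "", errors.New("nats: no uri found in provisioned state")
}

func main() {
	state := map[string]string{
		"uri":         "nats://base:4222",
		"uri.staging": "nats://staging:4222",
	}
	for _, env := range []string{"staging", "prod"} {
		uri, err := connectionString(state, env)
		fmt.Println(env, uri, err)
	}
}
```

Keeping the override in the same state map means callers need no separate configuration path per environment.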

Type safety

All resource properties use map[string]string. Zero map[string]any. Proto fields accessed via generated getters. No plugin.json changes — contracts gate unaffected.

Test counts

  • providers/nats package: 42 tests across nats_test.go, deploy_digitalocean_app_test.go, deploy_stubs_test.go
  • providers package: 17 conformance tests (13 DOApp lifecycle + 4 StubTargets)
  • All packages green: go test ./providers/... ./gen/... ./iac/...

Reviewer note

Attempted gh pr edit --add-reviewer Copilot per U-10 — expected to fail (permissions gate).

🤖 Generated with Claude Code

intel352 and others added 10 commits May 3, 2026 18:44
…t — Task 19

Implements providers.Provider for NATS with a fully-activated
digitalocean.app_platform deploy target. Resources() emits an
infra.container_service with the official NATS Docker image, JetStream
flags (-js -sd /data), monitoring port exposure, and replica count from
ClusterConfig. ConnectionString reads the 'uri' output from provisioned
state. StreamResources and Probe are clean stubs (Task 21 wires them up).

13 TDD tests cover: Name, unsupported target rejection, container_service
emission, image+version, default version, replica mapping, JetStream
enabled/disabled flags, client port, labels, ConnectionString error path,
nil StreamResources, and empty-URI Probe.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…eam volume, test split

Three issues from spec-review resolved:

1. Port 6222 now unconditionally included in internal_ports (was missing despite
   natsClusterPort constant being defined). buildInternalPorts() now emits
   4222,8222,6222 for all replica counts so the container can join a cluster
   without redeploy.

2. JetStream volume: emit infra.storage (DigitalOcean Spaces bucket) alongside
   infra.container_service when JetStream is enabled. The DO plugin's SpacesDriver
   realizes infra.storage as an S3-compatible bucket; workflow-plugin-digitalocean
   infra.storage is the applicable storage primitive for DO App Platform.
   storage_size_bytes property propagated from JetStreamConfig.MaxStorageBytes.

3. Test split: deploy_digitalocean_app_test.go now holds all DO App Platform-specific
   tests (12 tests); nats_test.go holds provider-level tests (5 tests).
   New tests: TestDOApp_ClusterPort (table, single+multi-replica), TestDOApp_MonitorPort,
   TestDOApp_JetStreamVolume, TestDOApp_JetStreamVolumeAbsent,
   TestDOApp_JetStreamVolumeStorageSizeProperty.

18 tests total, all passing.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
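
As a rough sketch of the emission rules in the commit above: all three ports are always listed, and the storage resource appears only when JetStream is enabled. The Resource type, the function name, and the "-jetstream" name suffix are hypothetical; only the kinds and property keys come from the commit message.

```go
package main

import (
	"fmt"
	"strconv"
)

// Resource is a hypothetical stand-in for the provider's resource type.
// Properties stay map[string]string, in line with the PR's type-safety note.
type Resource struct {
	Kind       string
	Name       string
	Properties map[string]string
}

// internalPorts lists the client (4222), monitoring (8222), and cluster (6222)
// ports; all three are emitted regardless of replica count.
const internalPorts = "4222,8222,6222"

// buildResources emits the container service and, only when JetStream is
// enabled, a storage resource sized from maxStorageBytes.
func buildResources(name string, jetStream bool, maxStorageBytes int64) []Resource {
	out := []Resource{{
		Kind:       "infra.container_service",
		Name:       name,
		Properties: map[string]string{"internal_ports": internalPorts},
	}}
	if jetStream {
		out = append(out, Resource{
			Kind: "infra.storage",
			Name: name + "-jetstream", // hypothetical naming scheme
			Properties: map[string]string{
				"storage_size_bytes": strconv.FormatInt(maxStorageBytes, 10),
			},
		})
	}
	return out
}

func main() {
	for _, r := range buildResources("events", true, 1<<30) {
		fmt.Println(r.Kind, r.Name, r.Properties)
	}
}
```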
…nk, cluster flag, label, version guard

Five quality-review issues resolved:

1. (Critical) Add storage_ref property linking infra.container_service to the
   infra.storage resource when JetStream is enabled. The workflow engine uses
   this explicit edge to inject Spaces credentials into the container at
   provisioning time. New tests: TestDOApp_StorageRefLinksContainerToVolume,
   TestDOApp_StorageRefAbsentWithoutJetStream.

2. (Important) Make --cluster flag unconditional in buildRunCommand — matches
   the always-exposed port 6222 and enables zero-config scale-up. Remove the
   replicas > 1 guard. New test: TestDOApp_ClusterFlagAlwaysPresent (table,
   single+multi-replica).

3. (Important) Hardcode deploy_target label as providers.TargetDigitalOceanApp
   instead of reading cfg.GetDeployTarget() (which was silently empty in most
   callers). TestDOApp_Labels now asserts the label value.

4. (Important) Reject "latest" (case-insensitive) version with an explicit error.
   Pinned versions required for reproducible deployments. New test:
   TestDOApp_LatestVersionRejected (table: latest/LATEST/Latest).

5. (Minor) Remove spurious nats_version property from infra.storage resource —
   SpacesDriver has no use for the NATS server version.

25 tests total, all passing.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
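
Two of the fixes above, the case-insensitive "latest" guard and the unconditional cluster flag, might look something like the sketch below. The function names echo the commit message, but the bodies, the error text, and the listen address passed to --cluster are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// validateVersion rejects "latest" in any letter case so deployments stay pinned.
func validateVersion(version string) error {
	if strings.EqualFold(version, "latest") {
		return errors.New(`nats: version "latest" is not allowed; pin an explicit version`)
	}
	return nil
}

// buildRunCommand always includes the cluster flag, independent of replica
// count; JetStream flags (-js -sd /data) are added only when enabled.
// The cluster listen address is an assumed value, not taken from the PR.
func buildRunCommand(jetStream bool) []string {
	args := []string{"nats-server", "--cluster", "nats://0.0.0.0:6222"}
	if jetStream {
		args = append(args, "-js", "-sd", "/data")
	}
	return args
}

func main() {
	for _, v := range []string{"2.10.14", "latest", "LATEST"} {
		fmt.Printf("%q -> %v\n", v, validateVersion(v))
	}
	fmt.Println(strings.Join(buildRunCommand(true), " "))
}
```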
…Task 20

Register three not-yet-activated deploy targets for the NATS provider so
config referencing them fails fast with a clear, actionable error rather than
a silent no-op or generic fallthrough message.

New files:
- providers/nats/deploy_aws_ecs.go  — resourcesForAWSECS stub
- providers/nats/deploy_aws_eks.go  — resourcesForAWSEKS stub
- providers/nats/deploy_kubernetes.go — resourcesForKubernetes stub

Each stub returns nil resources + an error that names the target and describes
what file to implement when activating it.

Updated nats.go dispatch switch: explicit cases for TargetAWSECS, TargetAWSEKS,
TargetKubernetes route to the stubs. TargetSelfHosted falls to the default arm.

New test file: providers/nats/deploy_stubs_test.go — 9 tests covering:
- Each stub returns a non-nil error containing "not implemented"
- Each error message names the target string
- All stubs return nil (not empty) resource slices

35 tests total, all passing.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…fault msg, table test

Three quality-review issues resolved:

1. (Important) Add TargetSelfHosted to stub test coverage. TestNATSStub_ErrorBehavior
   now covers aws.ecs, aws.eks, kubernetes, and self_hosted — the default arm
   can no longer return nil, nil silently without a test catching it.

2. (Minor) Improve default-branch error message to match named-stub quality:
   "nats: deploy target %q is not implemented for the pilot; only %q is active
   — add a deploy_%s.go stub to activate this target"
   Requires "strings" import added to nats.go.

3. (Minor) Collapse the 7 individual test functions in deploy_stubs_test.go
   into one table-driven TestNATSStub_ErrorBehavior with 4 subtests. Each
   subtest asserts: error non-nil, resource slice nil, error contains
   "not implemented", error mentions the target name string.

30 tests total, all passing.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
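
The dispatch contract described in the two Task 20 commits (named stubs and the default arm both return a nil slice plus an error naming the target) could be sketched as below. The target strings come from the commit text; the constant names, the stubFile derivation, and the exact error wording are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

const (
	targetDOApp      = "digitalocean.app_platform"
	targetAWSECS     = "aws.ecs"
	targetAWSEKS     = "aws.eks"
	targetKubernetes = "kubernetes"
	targetSelfHosted = "self_hosted"
)

// stubFile derives a suggested file name from a target, for example
// "aws.ecs" becomes "deploy_aws_ecs.go". The derivation rule is an assumption.
func stubFile(target string) string {
	return "deploy_" + strings.ReplaceAll(target, ".", "_") + ".go"
}

// resourcesFor returns resources for the active target and a nil slice plus
// an actionable error for every other target, including unknown ones.
func resourcesFor(target string) ([]string, error) {
	switch target {
	case targetDOApp:
		return []string{"infra.container_service"}, nil
	case targetAWSECS, targetAWSEKS, targetKubernetes:
		return nil, fmt.Errorf("nats: deploy target %q is not implemented; implement %s to activate it",
			target, stubFile(target))
	default:
		return nil, fmt.Errorf("nats: deploy target %q is not implemented for the pilot; only %q is active",
			target, targetDOApp)
	}
}

func main() {
	for _, target := range []string{targetAWSECS, targetAWSEKS, targetKubernetes, targetSelfHosted} {
		res, err := resourcesFor(target)
		fmt.Println(res == nil, err)
	}
}
```

Returning nil rather than an empty slice lets a table test distinguish "target not active" from "target active but emitted nothing".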
…ssion

Task 21 — implement ConnectionString (env-scoped uri.* lookup with base
fallback) and StreamResources (nats.stream_create per JetStream stream,
with server_uri from state, max_age/ack_wait durations, nil-entry skipping).
11 new tests; all 37 nats package tests pass.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…alidation

Fix 4 issues from quality review:
- Map RetentionPolicy enum to NATS-native strings (workqueue/interest/limits)
  via retentionPolicyString() instead of proto .String()
- Return error on empty stream name
- Return error on empty subjects list
- Default num_replicas to 1 when unset (proto zero value)
Tighten retention_policy test to assert exact NATS value; add 3 new tests.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
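
A hedged sketch of the four fixes above: mapping the retention enum to NATS-native strings, validating name and subjects, and defaulting num_replicas to 1. The enum type and its constants stand in for the generated proto enum, and the "name" and "subjects" property keys, the unspecified-to-"limits" default, and the error texts are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// RetentionPolicy is a hypothetical stand-in for the generated proto enum.
type RetentionPolicy int32

const (
	RetentionUnspecified RetentionPolicy = iota
	RetentionLimits
	RetentionWorkQueue
	RetentionInterest
)

// retentionPolicyString returns the NATS-native name instead of the proto
// enum name; unspecified values fall back to "limits" (assumed default).
func retentionPolicyString(p RetentionPolicy) string {
	switch p {
	case RetentionWorkQueue:
		return "workqueue"
	case RetentionInterest:
		return "interest"
	default:
		return "limits"
	}
}

// streamProperties validates input and builds string-only properties.
func streamProperties(name string, subjects []string, p RetentionPolicy, replicas int32) (map[string]string, error) {
	if name == "" {
		return nil, errors.New("nats: stream name must not be empty")
	}
	if len(subjects) == 0 {
		return nil, errors.New("nats: stream needs at least one subject")
	}
	if replicas == 0 { // proto zero value means "unset"
		replicas = 1
	}
	return map[string]string{
		"name":             name,
		"subjects":         strings.Join(subjects, ","),
		"retention_policy": retentionPolicyString(p),
		"num_replicas":     strconv.Itoa(int(replicas)),
	}, nil
}

func main() {
	props, err := streamProperties("ORDERS", []string{"orders.>"}, RetentionWorkQueue, 0)
	fmt.Println(props, err)
}
```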
Task 22 — add providers/conformance_test.go with 7 lifecycle subtests:
provision, stream, connect, connect/env-override, connect/missing-state,
probe, probe/empty-uri. Gated behind INTEGRATION_NATS_DO=1; skips cleanly
when unset. Uses providers.Provider interface (not concrete type).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…phases

Extends conformance_test.go with the five runtime lifecycle phases (5–9)
using the NATS Go client as a test-only dependency. Phases connect to a
live NATS JetStream server (NATS_URL or localhost:4222), create a
CONFORMANCE_TEST stream, publish, subscribe+fetch, ack, drain, and delete.
nc cleanup registered on parent test to outlive all subtests.

Also adds TestNATSConformance_StubTargets (no gate) covering ECS/EKS/k8s/
self_hosted error contract. All 13 DOApp subtests pass against a running
NATS server; StubTargets passes unconditionally.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ce test

Fix 2 quality issues:
- go.mod: move nats.go to direct requires block (was incorrectly indirect)
- conformance provision subtest: replace positional resources[0]/[1] access
  with kind-based lookup so test is resilient to ordering changes

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
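
The kind-based lookup that replaces positional indexing can be as small as the helper below. Both the Resource type and the findByKind name are hypothetical; the point is only that the test no longer depends on emission order.

```go
package main

import "fmt"

// Resource is a hypothetical, trimmed-down resource record.
type Resource struct {
	Kind string
	Name string
}

// findByKind returns the first resource with the given kind, so callers do
// not depend on the order in which resources were emitted.
func findByKind(resources []Resource, kind string) (Resource, bool) {
	for _, r := range resources {
		if r.Kind == kind {
			return r, true
		}
	}
	return Resource{}, false
}

func main() {
	resources := []Resource{
		{Kind: "infra.storage", Name: "events-jetstream"},
		{Kind: "infra.container_service", Name: "events"},
	}
	svc, ok := findByKind(resources, "infra.container_service")
	fmt.Println(svc.Name, ok)
}
```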
@intel352 intel352 merged commit b31a3ea into main May 4, 2026
4 checks passed
@intel352 intel352 deleted the feat/nats-do-provider branch May 4, 2026 00:35
intel352 added a commit that referenced this pull request May 4, 2026
Addresses 5 code-reviewer findings on Task 23:

Important #1 — Missing sdk.TypedModuleProvider compile-time assertions:
Added ClusterModuleFactory, StreamModuleFactory, ConsumerModuleFactory —
each a named struct with 'var _ sdk.TypedModuleProvider = (*T)(nil)'.
CreateTypedModule unpacks anypb.Any and delegates to the NewXModule
constructor. Returns sdk.ErrTypedContractNotHandled for wrong types so
the plugin's routing loop can skip.

Important #2 — deploy_target not explicitly validated before
ValidateProviderTarget: added an explicit empty-string check for
config.deploy_target in NewClusterModule, producing a clear error
message distinct from the provider × target compatibility error.

Important #3 — NATS connection lifecycle: module did not own the
connection it declared. Added natsConnCache (sync.Mutex-guarded
map[string]*nats.Conn), GetOrDialNATSConn (lazy dial via injectable
natsDialFn), RegisterNATSConn / GetNATSConn / closeNATSConn helpers.
Stop() now calls closeNATSConn, which closes and evicts the cached
connection (nil-guarded, idempotent). natsDialFn is a package-level
var so integration tests and unit tests can inject without a real server.

Minor #4 — consumer goroutine leak concern: explicit comment in
consumerModule.Start documenting that no goroutines are launched —
consumption is pull-based, driven by step.eventbus.consume.

Minor #5 — factory tests missing: added TypedModuleProvider tests for
all three factory types (TypedModuleTypes, WrongType, NilConfig paths).

All 28 tests pass.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
intel352 added a commit that referenced this pull request May 4, 2026
…egration tests (#3)

* feat: add infra.eventbus + stream + consumer module factories

Implements Task 23 (PR 7) — three typed module factories for the
workflow-plugin-eventbus gRPC plugin:

- module.go / module_test.go: infra.eventbus — validates provider ×
  deploy_target using the existing conformance matrix, registers
  ClusterConfig in a global registry, resolves broker URI from
  EVENTBUS_<NAME>_URI or NATS_URL env vars for runtime step use.
- stream.go / stream_test.go: infra.eventbus.stream — validates name +
  subjects, registers StreamConfig for consume and trigger lookups.
- consumer.go / consumer_test.go: infra.eventbus.consumer — validates
  name + stream_name, registers ConsumerConfig for step.eventbus.consume
  and trigger.eventbus.subscribe.

All three types implement sdk.ModuleInstance with compile-time assertions
(var _ sdk.ModuleInstance = (*T)(nil)). Zero map[string]any: all config
boundaries use typed proto pointers, because generated proto messages must not
be copied by value (protoimpl.MessageState carries a zero-size [0]sync.Mutex
no-copy marker). 19 unit tests, all passing.

Adds github.com/GoCodeAlone/workflow v0.20.1 as a direct dependency for
the sdk.ModuleInstance interface.

wfctl audit plugins -strict-contracts: module 3/3 strict, step 3/3
strict, trigger 1/1 strict — no findings on workflow-plugin-eventbus.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(modules): add TypedModuleProvider factories + NATS conn lifecycle

Addresses 5 code-reviewer findings on Task 23:

Important #1 — Missing sdk.TypedModuleProvider compile-time assertions:
Added ClusterModuleFactory, StreamModuleFactory, ConsumerModuleFactory —
each a named struct with 'var _ sdk.TypedModuleProvider = (*T)(nil)'.
CreateTypedModule unpacks anypb.Any and delegates to the NewXModule
constructor. Returns sdk.ErrTypedContractNotHandled for wrong types so
the plugin's routing loop can skip.

Important #2 — deploy_target not explicitly validated before
ValidateProviderTarget: added an explicit empty-string check for
config.deploy_target in NewClusterModule, producing a clear error
message distinct from the provider × target compatibility error.

Important #3 — NATS connection lifecycle: module did not own the
connection it declared. Added natsConnCache (sync.Mutex-guarded
map[string]*nats.Conn), GetOrDialNATSConn (lazy dial via injectable
natsDialFn), RegisterNATSConn / GetNATSConn / closeNATSConn helpers.
Stop() now calls closeNATSConn, which closes and evicts the cached
connection (nil-guarded, idempotent). natsDialFn is a package-level
var so integration tests and unit tests can inject without a real server.

Minor #4 — consumer goroutine leak concern: explicit comment in
consumerModule.Start documenting that no goroutines are launched —
consumption is pull-based, driven by step.eventbus.consume.

Minor #5 — factory tests missing: added TypedModuleProvider tests for
all three factory types (TypedModuleTypes, WrongType, NilConfig paths).

All 28 tests pass.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(modules): address 2 blockers + 1 minor from second review

BLOCKER-A (module_test.go): Replace bare os.Unsetenv calls in
TestClusterModule_InitNoURIWhenEnvNotSet with LookupEnv + t.Cleanup
restore pattern. Bare Unsetenv permanently cleared NATS_URL for all
subsequent tests in the run; the cleanup now restores any pre-existing
value on exit.

BLOCKER-B (module.go): Eliminate connCacheMu→urlMu nested lock ordering
in GetOrDialNATSConn by extracting the URI lookup between the fast-path
unlock and the slow-path re-lock. Pattern:
  1. Lock, check cache, unlock (fast path — no urlMu touch).
  2. GetBusURI with no lock held (urlMu only, no nesting).
  3. Dial with no lock held.
  4. Lock, double-check (another goroutine may have won), insert or
     discard the redundant connection, unlock.
Added lock-ordering doc comment to the NATS cache section.

MINOR-4 (all three *_test.go): Add if err != nil { t.Fatalf } after
NewXModule constructors in StopUnregisters and StopEvictsNATSConn tests
so a nil module doesn't panic on the next line.

All 28 tests pass, -race clean.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
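
The four-step pattern in BLOCKER-B (check under lock, look up and dial with no lock held, then re-lock and double-check) can be sketched with the standard library alone. The conn type, the map and function names, and the error text are stand-ins; the real code caches *nats.Conn values.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// conn is a stand-in for *nats.Conn so the sketch needs no server.
type conn struct {
	uri    string
	closed bool
}

func (c *conn) Close() { c.closed = true }

var (
	urlMu   sync.Mutex
	busURIs = map[string]string{}

	connCacheMu sync.Mutex
	connCache   = map[string]*conn{}

	// dialFn is injectable so tests can avoid a real dial.
	dialFn = func(uri string) (*conn, error) { return &conn{uri: uri}, nil }
)

func getBusURI(name string) (string, bool) {
	urlMu.Lock()
	defer urlMu.Unlock()
	uri, ok := busURIs[name]
	return uri, ok
}

// getOrDial never holds connCacheMu while taking urlMu or dialing.
func getOrDial(name string) (*conn, error) {
	connCacheMu.Lock() // step 1: fast path
	c, ok := connCache[name]
	connCacheMu.Unlock()
	if ok {
		return c, nil
	}
	uri, ok := getBusURI(name) // step 2: only urlMu is held inside
	if !ok {
		return nil, errors.New("no URI registered for bus " + name)
	}
	fresh, err := dialFn(uri) // step 3: dial with no lock held
	if err != nil {
		return nil, err
	}
	connCacheMu.Lock() // step 4: double-check before inserting
	defer connCacheMu.Unlock()
	if existing, ok := connCache[name]; ok {
		fresh.Close() // another goroutine won; discard the redundant conn
		return existing, nil
	}
	connCache[name] = fresh
	return fresh, nil
}

func main() {
	busURIs["main"] = "nats://127.0.0.1:4222"
	a, _ := getOrDial("main")
	b, _ := getOrDial("main")
	fmt.Println(a == b)
}
```

The cost of this ordering is an occasional redundant dial under contention, which is closed immediately; in exchange the two mutexes are never nested.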

* feat(steps): add step.eventbus.publish/consume/ack with typed proto

- steps/publish.go: PublishHandler uses JetStream PublishMsg, returns
  broker sequence + RFC3339 acked_at timestamp
- steps/consume.go: ConsumeHandler binds to existing durable JetStream
  consumer via BindStream; maps msg.Reply→ack_token, Metadata→sequence
  and published_at; ErrTimeout is treated as empty batch (not an error)
- steps/ack.go: AckHandler publishes empty payload to ack_token (the
  JetStream reply subject), acknowledging the message
- steps/factories.go: TypedStepFactory vars + All() slice for plugin
  server registration; compile-time TypedStepProvider assertions
- module.go: add DefaultBusConn() — single-bus helper for step handlers
- consumer.go: add GetConsumerByName() — resolve ConsumerConfig by
  durable name (cfg.name) for step.eventbus.consume lookup

All 9 new step unit tests pass alongside existing 24 module tests.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(steps): address 1 critical + 2 important + 2 minor from quality review

CRITICAL: pass nats.Context(ctx) to nc.JetStream() in publish.go and
consume.go so that context cancellation/deadline is respected by both
js.PublishMsg and sub.Fetch — previously ctx was dead code in all
JetStream calls.

IMPORTANT-1: sort registered cluster names before selecting default in
DefaultBusConn(); eliminates non-deterministic map iteration that would
route to a random broker in multi-bus deployments.

IMPORTANT-2: cap batch_size at maxBatchSize=1000 in ConsumeHandler;
prevents a caller from passing 2^31-1 and blocking fetch indefinitely.

MINOR-3: clarify sub.Drain() nolint comment — "ephemeral per-fetch"
explains why best-effort is the right posture for a PullSubscribe.

MINOR-4: add explicit invariant comment to TestPublishHandler_NoBusRegistered
naming the t.Cleanup guarantee that makes the test's assumption safe.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
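
IMPORTANT-1 and IMPORTANT-2 above reduce to two small helpers, sketched here under assumptions: the function names are hypothetical, and raising values below 1 to 1 is an added guess rather than something the commit states.

```go
package main

import (
	"fmt"
	"sort"
)

const maxBatchSize = 1000

// defaultBusName picks the lexicographically first registered name so the
// choice does not depend on Go's randomized map iteration order.
func defaultBusName(registered map[string]string) (string, bool) {
	if len(registered) == 0 {
		return "", false
	}
	names := make([]string, 0, len(registered))
	for name := range registered {
		names = append(names, name)
	}
	sort.Strings(names)
	return names[0], true
}

// clampBatchSize caps the requested batch size at maxBatchSize.
// Treating values below 1 as 1 is an assumption for this sketch.
func clampBatchSize(n int) int {
	if n < 1 {
		return 1
	}
	if n > maxBatchSize {
		return maxBatchSize
	}
	return n
}

func main() {
	name, _ := defaultBusName(map[string]string{"zeta": "nats://z", "alpha": "nats://a"})
	fmt.Println(name, clampBatchSize(2147483647))
}
```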

* feat(trigger): add trigger.eventbus.subscribe + gRPC plugin entrypoint + integration test

- trigger.go: SubscribeTriggerModuleFactory (TypedModuleProvider) + subscribeTrigger
  (ModuleInstance + TriggerInstance) with bounded goroutine, clean shutdown via
  context cancel + done channel, fetchPollInterval backpressure on idle streams.
  cb=nil path is a no-op Start (external plugin gRPC transport).
- trigger_test.go: 7 unit tests covering factory, validation, and nil-callback lifecycle.
- cmd/workflow-plugin-eventbus/main.go: eventbusPlugin wires all 4 module factories
  (cluster, stream, consumer, trigger), 3 step factories (publish, consume, ack),
  TriggerProvider, and ContractProvider returning all 7 strict-proto descriptors.
- integration_test.go: real gRPC subprocess transport — compiles binary, starts via
  go-plugin, verifies manifest, contract registry (7 contracts, all STRICT_PROTO),
  infra.eventbus module lifecycle, publish/consume/ack step error paths over wire,
  trigger module lifecycle, and GetModuleTypes/GetStepTypes/GetTriggerTypes RPCs.
  No live NATS server required; exercises descriptive error paths.

All tests pass (GOWORK=off go test ./... -count=1).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
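
The shutdown shape described for the trigger (one goroutine, stopped by context cancel and confirmed by a done channel, idling on a poll interval) could look roughly like this. The trigger type, the 5 ms interval, and the counter standing in for a fetch are all illustrative assumptions.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// pollInterval is an illustrative idle delay between fetch attempts.
const pollInterval = 5 * time.Millisecond

type trigger struct {
	cancel context.CancelFunc
	done   chan struct{}
	polls  atomic.Int64 // counts simulated fetches
}

// Start launches exactly one goroutine that polls until the context is cancelled.
func (t *trigger) Start() {
	ctx, cancel := context.WithCancel(context.Background())
	t.cancel = cancel
	t.done = make(chan struct{})
	go func() {
		defer close(t.done) // signals Stop that the loop has exited
		timer := time.NewTimer(pollInterval)
		defer timer.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-timer.C:
				t.polls.Add(1) // stand-in for one fetch-and-fire pass
				timer.Reset(pollInterval)
			}
		}
	}()
}

// Stop cancels the loop and blocks until the goroutine has exited.
func (t *trigger) Stop() {
	if t.cancel == nil {
		return // never started
	}
	t.cancel()
	<-t.done
}

func main() {
	t := &trigger{}
	t.Start()
	time.Sleep(10 * pollInterval)
	t.Stop()
	fmt.Println("polls:", t.polls.Load())
}
```

Waiting on the done channel in Stop is what makes the shutdown "clean": the caller knows no callback can fire after Stop returns.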

* fix(trigger): address 2 spec-reviewer failures

Failure 1 — plugin.go missing:
- Split cmd/workflow-plugin-eventbus/main.go into main.go (entrypoint only:
  sdk.Serve(&eventbusPlugin{})) and plugin.go (package main: eventbusPlugin
  type, compile-time assertions, moduleFactories, stepFactories, and all
  interface method impls: Manifest, TypedModuleTypes, CreateTypedModule,
  TypedStepTypes, CreateTypedStep, TriggerTypes, CreateTrigger, ContractRegistry).

Failure 2 — integration_test.go wrong gate + missing NATS scenario:
- TestE2E_EventbusPluginScenario: removed testing.Short() gate — runs always.
  Extracted declareModule helper (Create→Init→Start+Cleanup) to reduce repetition.
- TestE2E_EventbusPluginScenario_WithNATS: new test gated on INTEGRATION_NATS=1.
  Reads NATS_URL from env (inherited by subprocess). Pre-creates JetStream stream
  + durable consumer directly via nats.go in test process. Declares infra.eventbus,
  infra.eventbus.stream, infra.eventbus.consumer modules via gRPC. Publishes 10
  messages via step.eventbus.publish (each ExecuteStep call over gRPC wire).
  Consumes all 10 via step.eventbus.consume batch_size=10. Acks each via
  step.eventbus.ack using ack_token from ConsumeResponse. Asserts len(messages)==10.

All tests pass (GOWORK=off go test ./... -count=1). WithNATS test skips correctly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(trigger): address 1 critical + 2 important + 2 minor from quality review

CRITICAL — callback map didn't match Message proto contract (trigger.go):
  fetchAndFire now builds *eventbusv1.Message the same way ConsumeHandler does:
  Payload=m.Data ([]byte, not string), AckToken=m.Reply (not "reply"), plus
  Headers (map[string]string from NATS headers), Sequence (stream seq as string),
  PublishedAt (RFC3339 UTC). Callback data map keys are the six proto field names:
  subject, payload, headers, sequence, published_at, ack_token.

IMPORTANT-1 — zero test coverage for fetchAndFire (trigger_test.go):
  Added four new tests using an embedded NATS server (nats-server/v2, already an
  indirect dep, now explicit in go.mod):
  - TestSubscribeTrigger_FetchAndFire_CallbackData: publishes 1 msg with a custom
    header, asserts all six data map fields are present with correct Go types
    ([]byte for payload, string for ack_token — catches the CRITICAL regression).
  - TestSubscribeTrigger_FetchLoop_ExitsOnCancel: Stop returns within 5s after
    Start; goroutine exits cleanly on ctx cancel.
  - TestSubscribeTrigger_FetchLoop_RetryOnError: publishes after Start; loop keeps
    polling until the message arrives (retry path).
  - TestSubscribeTrigger_DoubleStart: second Start returns error; first goroutine
    exits cleanly on Stop (double-start guard test).

IMPORTANT-2 — double-Start goroutine leak (trigger.go):
  Start() now returns an error if t.cancel != nil (already started).

MINOR-3 — time.After allocation in retry backoff (trigger.go):
  Replaced time.After with time.NewTimer + backoff.Reset so a single timer is
  reused across all retry iterations in fetchLoop.

MINOR-4 — CreateTrigger silent type assertion discard (plugin.go):
  Added configString() helper that returns an explicit error when a config key
  is present but not a string (e.g. config["name"]=42 → "config[name] must be
  a string, got int"), rather than silently returning "" and confusing callers.

All tests pass (GOWORK=off go test ./... -count=1).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
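
The configString behaviour from MINOR-4 can be sketched as follows. The error format matches the example in the commit message; returning an empty string with no error for a missing key is an assumption.

```go
package main

import "fmt"

// configString returns config[key] as a string. A key that is present but
// holds a non-string value yields an explicit error instead of "".
// A missing key returns "" with no error (assumed behaviour).
func configString(config map[string]any, key string) (string, error) {
	v, ok := config[key]
	if !ok {
		return "", nil
	}
	s, ok := v.(string)
	if !ok {
		return "", fmt.Errorf("config[%s] must be a string, got %T", key, v)
	}
	return s, nil
}

func main() {
	_, err := configString(map[string]any{"name": 42}, "name")
	fmt.Println(err)
}
```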

* fix(trigger): add UnregisterNATSConn + correct test cleanups (parting nit)

- module.go: export UnregisterNATSConn(instanceName) — evicts the entry from
  natsConnCache without closing the connection (for tests that own the conn
  lifetime via nc.Close() + embedded-server shutdown).
- trigger_test.go: replace the three duplicate UnregisterBusURI cleanups that
  followed RegisterNATSConn with UnregisterNATSConn, ensuring natsConnCache is
  cleaned between tests.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>