Skip to content
55 changes: 55 additions & 0 deletions cmd/wfctl/deploy_providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,61 @@ func (r *remoteIaCProvider) DetectDrift(_ context.Context, refs []interfaces.Res
return drifts, nil
}

// DetectDriftWithApplied dispatches to the v2 RPC if the remote plugin
// supports it, falling back to legacy DetectDrift when the remote returns
// "method not found". This is the wire-format counterpart of the type-
// assertion fallback in callers; both layers need the fallback because:
// - Caller-side: provider may not implement the interface (type-
// assertion fails → legacy path).
// - Wire-side: provider implements the interface in newer code but the
// LOADED plugin binary on disk is older (manifest accepted but RPC
// dispatch returns method-not-found).
func (r *remoteIaCProvider) DetectDriftWithApplied(ctx context.Context, refs []interfaces.ResourceRef, applied map[string]map[string]any) ([]interfaces.DriftResult, error) {
refsAny, err := jsonToAny(refs)
if err != nil {
return nil, fmt.Errorf("IaCProvider.DetectDriftWithApplied: marshal refs: %w", err)
}
appliedAny, err := jsonToAny(applied)
if err != nil {
return nil, fmt.Errorf("IaCProvider.DetectDriftWithApplied: marshal applied: %w", err)
}
res, err := r.invoker.InvokeService("IaCProvider.DetectDriftWithApplied", map[string]any{
"refs": refsAny, "applied": appliedAny,
})
if err != nil {
// Method-not-found from the plugin: fall back to legacy DetectDrift.
// Match on substring to tolerate framework-specific error wrappers
// (HashiCorp go-plugin emits "method not found"; future schemes may
// wrap the literal differently). DO NOT fall back on other errors —
// a transient cloud failure must propagate so callers don't silently
// lose drift signal.
if isMethodNotFound(err) {
return r.DetectDrift(ctx, refs)
}
return nil, err
}
raw, ok := res["drifts"]
if !ok {
return nil, nil
}
var drifts []interfaces.DriftResult
if err := anyToStruct(raw, &drifts); err != nil {
return nil, fmt.Errorf("IaCProvider.DetectDriftWithApplied: decode result: %w", err)
}
return drifts, nil
}

// isMethodNotFound reports whether err is the canonical signal a remote
// plugin emits when it has no case for the requested RPC method. Tolerant
// substring match across known wrapper formats.
func isMethodNotFound(err error) bool {
if err == nil {
return false
}
msg := err.Error()
return strings.Contains(msg, "method not found") || strings.Contains(msg, "Method not found")
}

func (r *remoteIaCProvider) Import(_ context.Context, cloudID string, resourceType string) (*interfaces.ResourceState, error) {
res, err := r.invoker.InvokeService("IaCProvider.Import", map[string]any{
"provider_id": cloudID,
Expand Down
77 changes: 77 additions & 0 deletions cmd/wfctl/deploy_providers_remote_iac_compat_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package main

import (
"context"
"errors"
"testing"

"github.com/GoCodeAlone/workflow/interfaces"
)

// TestRemoteIaC_OptionalDriftConfigDetector_FallsBackOnLegacyPlugin pins the
// pipeline's compat story: when a remote plugin does NOT support
// IaCProvider.DetectDriftWithApplied (legacy plugin: returns "method not
// found"), the wfctl caller falls through to legacy IaCProvider.DetectDrift.
//
// This is the wire-format counterpart of the type-assertion fallback in
// runInfraApplyRefreshPhase / runInfraStatusDrift; without this gate, a v0.22
// wfctl talking to a v0.10.3 DO plugin (or an out-of-tree plugin) could trip
// a hard error instead of falling back.
func TestRemoteIaC_OptionalDriftConfigDetector_FallsBackOnLegacyPlugin(t *testing.T) {
si := &multiMethodStubInvoker{
// Method not found = canonical signal a legacy plugin emits when
// it doesn't have a case for this RPC name.
errByMethod: map[string]error{
"IaCProvider.DetectDriftWithApplied": errors.New("method not found: IaCProvider.DetectDriftWithApplied"),
},
// Legacy DetectDrift returns success.
respByMethod: map[string]map[string]any{
"IaCProvider.DetectDrift": {
"drifts": []any{map[string]any{"name": "x", "type": "infra.test", "drifted": false, "class": "in-sync"}},
},
},
}
p := &remoteIaCProvider{invoker: si}

// Caller-side type-assertion: remoteIaCProvider implements
// DetectDriftWithApplied (it always does, since the wfctl-side wrapper
// ALWAYS exposes the method). The fallback happens INSIDE the wrapper:
// if the remote returns method-not-found, the wrapper retries with
// legacy DetectDrift.
refs := []interfaces.ResourceRef{{Name: "x", Type: "infra.test"}}
drifts, err := p.DetectDriftWithApplied(context.Background(), refs, nil)
if err != nil {
t.Fatalf("DetectDriftWithApplied should NOT propagate method-not-found; should fall back. err=%v", err)
}
if len(drifts) != 1 || drifts[0].Class != interfaces.DriftClassInSync {
t.Errorf("expected fallback InSync drift; got %+v", drifts)
}

// Verify the second call to invoker WAS the legacy method.
if !si.calledMethods["IaCProvider.DetectDrift"] {
t.Errorf("expected fallback to call IaCProvider.DetectDrift; called methods: %v", si.calledMethods)
}
}

// multiMethodStubInvoker is a test double for remoteServiceInvoker that supports
// per-method response and error configuration (unlike the basic stubInvoker
// which records only a single method/resp/err).
type multiMethodStubInvoker struct {
calledMethods map[string]bool
respByMethod map[string]map[string]any
errByMethod map[string]error
}

func (s *multiMethodStubInvoker) InvokeService(method string, args map[string]any) (map[string]any, error) {
if s.calledMethods == nil {
s.calledMethods = map[string]bool{}
}
s.calledMethods[method] = true
if err, ok := s.errByMethod[method]; ok && err != nil {
return nil, err
}
if resp, ok := s.respByMethod[method]; ok {
return resp, nil
}
return nil, nil
}
27 changes: 27 additions & 0 deletions cmd/wfctl/deploy_providers_remote_iac_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,33 @@ func TestRemoteIaC_DetectDrift_Empty(t *testing.T) {
}
}

// ── DetectDriftWithApplied ─────────────────────────────────────────────────────

func TestRemoteIaC_DetectDriftWithApplied_HappyPath(t *testing.T) {
si := &stubInvoker{resp: map[string]any{
"drifts": []any{map[string]any{
"name": "x",
"type": "infra.test",
"drifted": true,
"class": "config",
"fields": []any{"region"},
}},
}}
p := newIaCProvider(si)
refs := []interfaces.ResourceRef{{Name: "x", Type: "infra.test"}}
applied := map[string]map[string]any{"x": {"region": "nyc1"}}
drifts, err := p.DetectDriftWithApplied(context.Background(), refs, applied)
if err != nil {
t.Fatalf("DetectDriftWithApplied: %v", err)
}
if si.method != "IaCProvider.DetectDriftWithApplied" {
t.Errorf("method: got %q, want IaCProvider.DetectDriftWithApplied", si.method)
}
if len(drifts) != 1 || drifts[0].Class != interfaces.DriftClassConfig {
t.Errorf("drifts: %+v", drifts)
}
}

// ── Import ────────────────────────────────────────────────────────────────────

func TestRemoteIaC_Import(t *testing.T) {
Expand Down
75 changes: 39 additions & 36 deletions cmd/wfctl/infra_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,18 +490,19 @@ func applyWithProviderAndStore(ctx context.Context, provider interfaces.IaCProvi

now := time.Now().UTC()
rs := interfaces.ResourceState{
ID: r.Name,
Name: r.Name,
Type: r.Type,
Provider: providerType,
ProviderRef: providerRef,
ProviderID: r.ProviderID,
ConfigHash: configHashMap(appliedCfg),
AppliedConfig: appliedCfg,
Outputs: r.Outputs,
Dependencies: dependencies,
CreatedAt: now,
UpdatedAt: now,
ID: r.Name,
Name: r.Name,
Type: r.Type,
Provider: providerType,
ProviderRef: providerRef,
ProviderID: r.ProviderID,
ConfigHash: configHashMap(appliedCfg),
AppliedConfig: appliedCfg,
AppliedConfigSource: "apply",
Outputs: r.Outputs,
Dependencies: dependencies,
CreatedAt: now,
UpdatedAt: now,
}
if saveErr := store.SaveResource(ctx, rs); saveErr != nil {
return fmt.Errorf("%s/%s: persist state after apply: %w", r.Type, r.Name, saveErr)
Expand Down Expand Up @@ -639,18 +640,19 @@ func resourceStateFromLiveOutput(spec interfaces.ResourceSpec, providerType stri
appliedConfig := liveConfigFromOutputs(live.Outputs)
now := time.Now().UTC()
return interfaces.ResourceState{
ID: spec.Name,
Name: spec.Name,
Type: spec.Type,
Provider: providerType,
ProviderRef: resourceSpecProviderRef(spec),
ProviderID: live.ProviderID,
ConfigHash: configHashMap(appliedConfig),
AppliedConfig: appliedConfig,
Outputs: cloneMap(live.Outputs),
Dependencies: append([]string(nil), spec.DependsOn...),
CreatedAt: now,
UpdatedAt: now,
ID: spec.Name,
Name: spec.Name,
Type: spec.Type,
Provider: providerType,
ProviderRef: resourceSpecProviderRef(spec),
ProviderID: live.ProviderID,
ConfigHash: configHashMap(appliedConfig),
AppliedConfig: appliedConfig,
AppliedConfigSource: "adoption",
Outputs: cloneMap(live.Outputs),
Dependencies: append([]string(nil), spec.DependsOn...),
CreatedAt: now,
UpdatedAt: now,
}, nil
}

Expand Down Expand Up @@ -959,18 +961,19 @@ func applyPrecomputedPlanWithStore(ctx context.Context, plan interfaces.IaCPlan,

now := time.Now().UTC()
rs := interfaces.ResourceState{
ID: r.Name,
Name: r.Name,
Type: r.Type,
Provider: providerType,
ProviderRef: providerRef,
ProviderID: r.ProviderID,
ConfigHash: configHashMap(appliedCfg),
AppliedConfig: appliedCfg,
Outputs: r.Outputs,
Dependencies: dependencies,
CreatedAt: now,
UpdatedAt: now,
ID: r.Name,
Name: r.Name,
Type: r.Type,
Provider: providerType,
ProviderRef: providerRef,
ProviderID: r.ProviderID,
ConfigHash: configHashMap(appliedCfg),
AppliedConfig: appliedCfg,
AppliedConfigSource: "apply",
Outputs: r.Outputs,
Dependencies: dependencies,
CreatedAt: now,
UpdatedAt: now,
}
if saveErr := store.SaveResource(ctx, rs); saveErr != nil {
return fmt.Errorf("%s/%s: persist state after apply: %w", r.Type, r.Name, saveErr)
Expand Down
16 changes: 15 additions & 1 deletion cmd/wfctl/infra_apply_refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,21 @@ func runInfraApplyRefreshPhase(
return nil
}

results, err := provider.DetectDrift(ctx, refs)
// Use DriftConfigDetector when the provider supports it (optional interface).
// Short-circuits to legacy DetectDrift when appliedMap is nil (no "apply"-
// provenance entries available) to avoid unnecessary RPC round-trips.
var results []interfaces.DriftResult
var err error
if d, ok := provider.(interfaces.DriftConfigDetector); ok {
appliedMap := buildAppliedSpecMap(states, refs)
if appliedMap != nil {
results, err = d.DetectDriftWithApplied(ctx, refs, appliedMap)
} else {
results, err = provider.DetectDrift(ctx, refs)
}
} else {
results, err = provider.DetectDrift(ctx, refs)
}
Comment on lines +49 to +63
if err != nil {
// Transient or auth error — propagate; do NOT prune anything.
return fmt.Errorf("detect drift: %w", err)
Expand Down
76 changes: 76 additions & 0 deletions cmd/wfctl/infra_apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1592,3 +1592,79 @@ modules:
t.Errorf("stderr = %q, want it to contain 'warning'", stderrOutput)
}
}

// ── TestApply_StateRecordsAppliedConfigSource* ─────────────────────────────────

// TestApply_StateRecordsAppliedConfigSourceApply asserts that applyWithProviderAndStore
// writes AppliedConfigSource="apply" on successful resource creation.
func TestApply_StateRecordsAppliedConfigSourceApply(t *testing.T) {
spec := interfaces.ResourceSpec{
Name: "res-A",
Type: "infra.test",
Config: map[string]any{"k": "v"},
}
fake := &stateReturningProvider{
applyResult: &interfaces.ApplyResult{
Resources: []interfaces.ResourceOutput{
{Name: "res-A", Type: "infra.test", ProviderID: "id-A", Outputs: map[string]any{"k": "v"}},
},
},
}
store := &fakeStateStore{}

if err := applyWithProviderAndStore(t.Context(), fake, "test", []interfaces.ResourceSpec{spec}, nil, store, io.Discard, ""); err != nil {
t.Fatalf("apply: %v", err)
}

store.mu.Lock()
defer store.mu.Unlock()
if len(store.saved) != 1 {
t.Fatalf("expected 1 saved state; got %d", len(store.saved))
}
saved := store.saved[0]
if saved.AppliedConfigSource != "apply" {
t.Errorf("AppliedConfigSource: got %q, want %q", saved.AppliedConfigSource, "apply")
}
}

// TestAdoption_StateRecordsAppliedConfigSourceAdoption asserts that state saved
// via the adoption path (adoptExistingResources) records AppliedConfigSource="adoption".
func TestAdoption_StateRecordsAppliedConfigSourceAdoption(t *testing.T) {
spec := interfaces.ResourceSpec{
Name: "site-dns",
Type: "infra.dns",
Config: map[string]any{"provider": "do-provider", "domain": "example.com"},
}
driver := &readDriver{
expectedProviderID: "example.com",
readOut: &interfaces.ResourceOutput{
Name: "site-dns",
Type: "infra.dns",
ProviderID: "do-domain-123",
Outputs: map[string]any{"domain": "example.com"},
},
}
provider := &readBackedProvider{driver: driver}
store := &fakeStateStore{}

if err := applyWithProviderAndStore(t.Context(), provider, "digitalocean", []interfaces.ResourceSpec{spec}, nil, store, io.Discard, ""); err != nil {
t.Fatalf("adopt: %v", err)
}

store.mu.Lock()
defer store.mu.Unlock()
// The first saved state is from adoption; subsequent saves are from apply.
var adoptedState *interfaces.ResourceState
for i := range store.saved {
if store.saved[i].Name == "site-dns" && store.saved[i].AppliedConfigSource == "adoption" {
adoptedState = &store.saved[i]
break
}
}
if adoptedState == nil {
t.Fatalf("expected adopted state with AppliedConfigSource=adoption; saved=%+v", store.saved)
}
if adoptedState.AppliedConfigSource != "adoption" {
t.Errorf("AppliedConfigSource: got %q, want %q", adoptedState.AppliedConfigSource, "adoption")
}
}
Loading
Loading