Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
345 changes: 335 additions & 10 deletions iac/providerclient/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ const (
// IaCServiceRunner is the gRPC service name for the optional
// IaCProviderRunner service.
IaCServiceRunner = "workflow.plugin.external.iac.IaCProviderRunner"
// IaCServiceResourceDriver is the gRPC service name for the optional
// ResourceDriver service. When advertised, Adapter.ResourceDriver(type)
// returns a per-resource-type bridge that routes Create/Read/Update/Delete/
// Diff/HealthCheck/Scale/SensitiveKeys through the plugin's gRPC process.
IaCServiceResourceDriver = "workflow.plugin.external.iac.ResourceDriver"
)

// RegionListerProvider is a capability-discovery interface implemented by
Expand Down Expand Up @@ -187,6 +192,310 @@ func (r *runnerAdapter) JobLogs(ctx context.Context, handle interfaces.JobHandle
}
}

// ResourceDriverProvider is a capability-discovery interface implemented by
// *Adapter. Steps that need per-action CRUD type-assert the registered
// interfaces.IaCProvider to ResourceDriverProvider and call
// ResourceDriver(resourceType). The accessor is always present on *Adapter;
// absence of the underlying gRPC service is signalled by the returned
// ErrProviderMethodUnimplemented error (NOT a nil first return — when the
// service IS advertised the bridge is returned, and when it is NOT the error is
// returned with a nil driver). Callers use errors.Is to detect absence and skip
// or fall back.
type ResourceDriverProvider interface {
// ResourceDriver returns the per-resource-type CRUD driver for resourceType,
// or a nil driver wrapping ErrProviderMethodUnimplemented when the plugin did
// not advertise the ResourceDriver service.
ResourceDriver(resourceType string) (interfaces.ResourceDriver, error)
}

// resourceDriverAdapter wraps pb.ResourceDriverClient and satisfies
// interfaces.ResourceDriver for a specific resource type. It is the concrete
// type underlying all objects returned by Adapter.ResourceDriver(). The
// resource_type field is carried on every RPC so the plugin can route to the
// correct per-type implementation (the DO plugin's 14-driver pattern).
type resourceDriverAdapter struct {
client pb.ResourceDriverClient
resourceType string
}

// Create calls ResourceDriver.Create with JSON-encoded spec.Config.
func (r *resourceDriverAdapter) Create(ctx context.Context, spec interfaces.ResourceSpec) (*interfaces.ResourceOutput, error) {
pbSpec, err := specToPB(spec)
if err != nil {
return nil, fmt.Errorf("providerclient: encode Create spec: %w", err)
}
resp, err := r.client.Create(ctx, &pb.ResourceCreateRequest{
ResourceType: r.resourceType,
Spec: pbSpec,
})
if err != nil {
return nil, mapResourceDriverGRPCError(err, "Create")
}
return resourceOutputFromPB(resp.GetOutput())
}

// Read calls ResourceDriver.Read with the resource ref.
func (r *resourceDriverAdapter) Read(ctx context.Context, ref interfaces.ResourceRef) (*interfaces.ResourceOutput, error) {
resp, err := r.client.Read(ctx, &pb.ResourceReadRequest{
ResourceType: r.resourceType,
Ref: refToPB(ref),
})
if err != nil {
return nil, mapResourceDriverGRPCError(err, "Read")
}
return resourceOutputFromPB(resp.GetOutput())
}

// Update calls ResourceDriver.Update with the resource ref and desired spec.
func (r *resourceDriverAdapter) Update(ctx context.Context, ref interfaces.ResourceRef, spec interfaces.ResourceSpec) (*interfaces.ResourceOutput, error) {
pbSpec, err := specToPB(spec)
if err != nil {
return nil, fmt.Errorf("providerclient: encode Update spec: %w", err)
}
resp, err := r.client.Update(ctx, &pb.ResourceUpdateRequest{
ResourceType: r.resourceType,
Ref: refToPB(ref),
Spec: pbSpec,
})
if err != nil {
return nil, mapResourceDriverGRPCError(err, "Update")
}
return resourceOutputFromPB(resp.GetOutput())
}

// Delete calls ResourceDriver.Delete with the resource ref.
func (r *resourceDriverAdapter) Delete(ctx context.Context, ref interfaces.ResourceRef) error {
_, err := r.client.Delete(ctx, &pb.ResourceDeleteRequest{
ResourceType: r.resourceType,
Ref: refToPB(ref),
})
if err != nil {
return mapResourceDriverGRPCError(err, "Delete")
}
return nil
}

// Diff calls ResourceDriver.Diff with the desired spec and current output.
func (r *resourceDriverAdapter) Diff(ctx context.Context, desired interfaces.ResourceSpec, current *interfaces.ResourceOutput) (*interfaces.DiffResult, error) {
pbSpec, err := specToPB(desired)
if err != nil {
return nil, fmt.Errorf("providerclient: encode Diff desired spec: %w", err)
}
pbCurrent, err := resourceOutputToPB(current)
if err != nil {
return nil, fmt.Errorf("providerclient: encode Diff current output: %w", err)
}
resp, err := r.client.Diff(ctx, &pb.ResourceDiffRequest{
ResourceType: r.resourceType,
Desired: pbSpec,
Current: pbCurrent,
})
if err != nil {
return nil, mapResourceDriverGRPCError(err, "Diff")
}
return diffResultFromPB(resp.GetResult())
}

// HealthCheck calls ResourceDriver.HealthCheck with the resource ref.
func (r *resourceDriverAdapter) HealthCheck(ctx context.Context, ref interfaces.ResourceRef) (*interfaces.HealthResult, error) {
resp, err := r.client.HealthCheck(ctx, &pb.ResourceHealthCheckRequest{
ResourceType: r.resourceType,
Ref: refToPB(ref),
})
if err != nil {
return nil, mapResourceDriverGRPCError(err, "HealthCheck")
}
return healthResultFromPB(resp.GetResult()), nil
}

// Scale calls ResourceDriver.Scale with the resource ref and replica count.
func (r *resourceDriverAdapter) Scale(ctx context.Context, ref interfaces.ResourceRef, replicas int) (*interfaces.ResourceOutput, error) {
// Reject out-of-int32-range replica counts explicitly rather than silently
// saturating (mirrors cmd/wfctl typedResourceDriver.Scale) — a clamp could
// trigger an unintended scale operation.
if replicas < math.MinInt32 || replicas > math.MaxInt32 {
return nil, fmt.Errorf("providerclient %s: scale replicas %d out of int32 range", r.resourceType, replicas)
}
resp, err := r.client.Scale(ctx, &pb.ResourceScaleRequest{
ResourceType: r.resourceType,
Ref: refToPB(ref),
Replicas: int32(replicas), //nolint:gosec // G115: range-checked above
})
if err != nil {
return nil, mapResourceDriverGRPCError(err, "Scale")
}
return resourceOutputFromPB(resp.GetOutput())
}
Comment thread
intel352 marked this conversation as resolved.

// SensitiveKeys calls ResourceDriver.SensitiveKeys (no ctx — mirrors the
// interfaces.ResourceDriver contract which is a non-context accessor).
func (r *resourceDriverAdapter) SensitiveKeys() []string {
resp, err := r.client.SensitiveKeys(context.Background(), &pb.SensitiveKeysRequest{
ResourceType: r.resourceType,
})
if err != nil {
// Non-fatal — log and return empty; callers must tolerate nil/empty.
log.Printf("providerclient: ResourceDriver.SensitiveKeys(%s) RPC failed: %v", r.resourceType, err)
return nil
}
return append([]string(nil), resp.GetKeys()...)
}

// Troubleshoot satisfies the optional interfaces.Troubleshooter. wfctl calls it
// (via troubleshootAfterFailure's driver.(interfaces.Troubleshooter) assert) when
// a health-check poll times out or a deploy returns a generic error, surfacing
// provider-side diagnostics. gRPC Unimplemented (the legitimate negative signal
// when the plugin's driver does not implement Troubleshoot) is translated to
// interfaces.ErrProviderMethodUnimplemented so callers can errors.Is and fall
// back to the original failure message. Mirrors typedResourceDriver.Troubleshoot.
func (r *resourceDriverAdapter) Troubleshoot(ctx context.Context, ref interfaces.ResourceRef, failureMsg string) ([]interfaces.Diagnostic, error) {
resp, err := r.client.Troubleshoot(ctx, &pb.TroubleshootRequest{
ResourceType: r.resourceType,
Ref: refToPB(ref),
FailureMsg: failureMsg,
})
if err != nil {
return nil, mapResourceDriverGRPCError(err, "Troubleshoot")
}
return diagnosticsFromPB(resp.GetDiagnostics()), nil
}

// mapResourceDriverGRPCError translates well-known gRPC status codes to engine
// sentinel errors so callers can use errors.Is for typed identification.
//
// Each mapped case wraps with %w/%w (sentinel + original gRPC error), NOT %w/%v,
// so callers recover BOTH the interfaces sentinel via errors.Is AND the
// underlying gRPC status via status.FromError walking the unwrap chain (mirrors
// cmd/wfctl/iac_typed_adapter.go translateRPCErr's load-bearing comment). With
// %v the status code/details get demoted to a flat string and consumers that
// classify by code (rate-limit retry, transient backoff, etc.) lose the signal.
func mapResourceDriverGRPCError(err error, method string) error {
if err == nil {
return nil
}
switch status.Code(err) {
case codes.NotFound:
return fmt.Errorf("%w: ResourceDriver.%s: %w", interfaces.ErrResourceNotFound, method, err)
case codes.AlreadyExists:
return fmt.Errorf("%w: ResourceDriver.%s: %w", interfaces.ErrResourceAlreadyExists, method, err)
case codes.ResourceExhausted:
return fmt.Errorf("%w: ResourceDriver.%s: %w", interfaces.ErrRateLimited, method, err)
case codes.Unavailable, codes.DeadlineExceeded:
return fmt.Errorf("%w: ResourceDriver.%s: %w", interfaces.ErrTransient, method, err)
case codes.Unauthenticated:
return fmt.Errorf("%w: ResourceDriver.%s: %w", interfaces.ErrUnauthorized, method, err)
case codes.PermissionDenied:
return fmt.Errorf("%w: ResourceDriver.%s: %w", interfaces.ErrForbidden, method, err)
case codes.InvalidArgument, codes.FailedPrecondition:
return fmt.Errorf("%w: ResourceDriver.%s: %w", interfaces.ErrValidation, method, err)
case codes.Unimplemented:
return fmt.Errorf("%w: ResourceDriver.%s not implemented by plugin: %w", interfaces.ErrProviderMethodUnimplemented, method, err)
default:
// Preserve attribution (method name) while keeping the gRPC status
// observable through the unwrap chain.
return fmt.Errorf("ResourceDriver.%s: %w", method, err)
}
}

// resourceOutputFromPB converts a proto ResourceOutput to interfaces.ResourceOutput.
func resourceOutputFromPB(o *pb.ResourceOutput) (*interfaces.ResourceOutput, error) {
if o == nil {
return nil, nil
}
outputs, err := unmarshalJSONMap(o.GetOutputsJson())
if err != nil {
return nil, fmt.Errorf("providerclient: unmarshal ResourceOutput.outputs_json: %w", err)
}
sensitive := make(map[string]bool, len(o.GetSensitive()))
for k, v := range o.GetSensitive() {
sensitive[k] = v
}
return &interfaces.ResourceOutput{
Name: o.GetName(),
Type: o.GetType(),
ProviderID: o.GetProviderId(),
Outputs: outputs,
Sensitive: sensitive,
Status: o.GetStatus(),
}, nil
}

// resourceOutputToPB converts an interfaces.ResourceOutput to proto ResourceOutput.
func resourceOutputToPB(o *interfaces.ResourceOutput) (*pb.ResourceOutput, error) {
if o == nil {
return nil, nil
}
outputsJSON, err := marshalJSONMap(o.Outputs)
if err != nil {
return nil, err
}
sensitive := make(map[string]bool, len(o.Sensitive))
for k, v := range o.Sensitive {
sensitive[k] = v
}
return &pb.ResourceOutput{
Name: o.Name,
Type: o.Type,
ProviderId: o.ProviderID,
OutputsJson: outputsJSON,
Sensitive: sensitive,
Status: o.Status,
}, nil
}

// diffResultFromPB converts a proto DiffResult to interfaces.DiffResult.
func diffResultFromPB(d *pb.DiffResult) (*interfaces.DiffResult, error) {
if d == nil {
return nil, nil
}
changes, err := changesFromPB(d.GetChanges())
if err != nil {
return nil, err
}
return &interfaces.DiffResult{
NeedsUpdate: d.GetNeedsUpdate(),
NeedsReplace: d.GetNeedsReplace(),
Changes: changes,
}, nil
}

// healthResultFromPB converts a proto HealthResult to interfaces.HealthResult.
func healthResultFromPB(h *pb.HealthResult) *interfaces.HealthResult {
if h == nil {
return nil
}
return &interfaces.HealthResult{
Healthy: h.GetHealthy(),
Message: h.GetMessage(),
}
}

// diagnosticsFromPB converts []*pb.Diagnostic to []interfaces.Diagnostic
// (Troubleshooter results). Mirrors typedResourceDriver.Troubleshoot's mapping.
func diagnosticsFromPB(diags []*pb.Diagnostic) []interfaces.Diagnostic {
out := make([]interfaces.Diagnostic, 0, len(diags))
for _, d := range diags {
out = append(out, interfaces.Diagnostic{
ID: d.GetId(),
Phase: d.GetPhase(),
Cause: d.GetCause(),
At: timeFromPB(d.GetAt()),
Detail: d.GetDetail(),
})
}
return out
}

// Compile-time guards: resourceDriverAdapter must satisfy interfaces.ResourceDriver
// (the required per-action CRUD surface) AND interfaces.Troubleshooter (the
// optional diagnostics surface troubleshootAfterFailure type-asserts). Mirrors
// the typedResourceDriver guards in cmd/wfctl/iac_typed_adapter.go; they catch
// signature drift between the interface and this bridge at compile time.
var (
_ interfaces.ResourceDriver = (*resourceDriverAdapter)(nil)
_ interfaces.Troubleshooter = (*resourceDriverAdapter)(nil)
)

// Adapter wraps the pb.IaCProviderRequired gRPC client (and advertisement-gated
// optional clients) as interfaces.IaCProvider. Optional sub-interfaces
// (IaCProviderRegionLister, DriftConfigDetector) are exposed via typed accessors
Expand All @@ -202,11 +511,12 @@ func (r *runnerAdapter) JobLogs(ctx context.Context, handle interfaces.JobHandle
//
// Compile-time guards are in adapter_test.go.
type Adapter struct {
conn grpc.ClientConnInterface
required pb.IaCProviderRequiredClient
regionLister *regionListerImpl // nil when IaCServiceRegionLister not advertised
drift *driftDetectorAdapter // nil when IaCServiceDriftDetector not advertised
runner *runnerAdapter // nil when IaCServiceRunner not advertised
conn grpc.ClientConnInterface
required pb.IaCProviderRequiredClient
regionLister *regionListerImpl // nil when IaCServiceRegionLister not advertised
drift *driftDetectorAdapter // nil when IaCServiceDriftDetector not advertised
runner *runnerAdapter // nil when IaCServiceRunner not advertised
resourceDriver pb.ResourceDriverClient // nil when IaCServiceResourceDriver not advertised

// Capabilities cache. Populated on first call to fetchCapabilities via
// capsOnce; reused for the adapter's lifetime (capabilities don't change
Expand Down Expand Up @@ -264,6 +574,9 @@ func New(conn grpc.ClientConnInterface, advertisedServices map[string]bool) *Ada
if advertisedServices[IaCServiceRunner] {
a.runner = &runnerAdapter{client: pb.NewIaCProviderRunnerClient(conn)}
}
if advertisedServices[IaCServiceResourceDriver] {
a.resourceDriver = pb.NewResourceDriverClient(conn)
}
return a
}

Expand Down Expand Up @@ -441,12 +754,24 @@ func (a *Adapter) ResolveSizing(resourceType string, size interfaces.Size, hints
return sizingFromPB(resp.GetSizing())
}

// ResourceDriver returns an error — the full ResourceDriver optional service is
// not in PR-1 scope. Steps that need per-action CRUD use wfctlhelpers.ApplyPlanWithHooks
// which dispatches through the provider's ResourceDriver on the plugin side.
// ResourceDriver returns a per-resource-type driver bridged over the optional
// ResourceDriver gRPC service. When the plugin advertised IaCServiceResourceDriver,
// the returned driver routes Create/Read/Update/Delete/Diff/HealthCheck/Scale/
// SensitiveKeys through the plugin's gRPC process carrying resource_type on every
// RPC (the plugin dispatches internally to the correct per-type driver, mirroring
// the DO plugin's 14-driver type-routing pattern). When the service was not
// advertised, returns ErrProviderMethodUnimplemented so callers can errors.Is-check
// and skip or fall back appropriately — matching the DetectDrift/RegionLister
// precedent for other optional services.
func (a *Adapter) ResourceDriver(resourceType string) (interfaces.ResourceDriver, error) {
return nil, fmt.Errorf("%w: ResourceDriver optional service not wired in PR-1 adapter",
interfaces.ErrProviderMethodUnimplemented)
if a.resourceDriver == nil {
return nil, fmt.Errorf("%w: ResourceDriver service not advertised by plugin",
interfaces.ErrProviderMethodUnimplemented)
}
return &resourceDriverAdapter{
client: a.resourceDriver,
resourceType: resourceType,
}, nil
}

// SupportedCanonicalKeys returns the canonical keys from the cached
Expand Down
Loading
Loading