From 788edc48d69606bd0b71baa9d7d384c70b27baa2 Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Mon, 25 May 2026 05:18:48 -0400 Subject: [PATCH] feat: map observability requirements for aws --- .github/workflows/ci.yml | 2 +- .github/workflows/release.yml | 2 +- .goreleaser.yaml | 1 + cmd/workflow-plugin-aws/main.go | 15 +- cmd/workflow-plugin-aws/plugin.json | 82 ++++++++ drivers/ecs.go | 108 ++++++++++- drivers/ecs_test.go | 81 +++++++- go.mod | 2 +- go.sum | 2 + internal/iacserver.go | 6 +- internal/iacserver_mapper.go | 291 ++++++++++++++++++++++++++++ internal/iacserver_mapper_test.go | 224 +++++++++++++++++++++ plugin.json | 4 +- 13 files changed, 794 insertions(+), 26 deletions(-) create mode 100644 cmd/workflow-plugin-aws/plugin.json create mode 100644 internal/iacserver_mapper.go create mode 100644 internal/iacserver_mapper_test.go diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8ff08e2..0bcc6e8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -44,4 +44,4 @@ jobs: go-version-file: go.mod - name: Validate strict plugin contracts - run: go run github.com/GoCodeAlone/workflow/cmd/wfctl@v0.61.0 plugin validate --file plugin.json --strict-contracts + run: go run github.com/GoCodeAlone/workflow/cmd/wfctl@v0.64.3 plugin validate --file plugin.json --strict-contracts diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 5a39692..6326f99 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -25,7 +25,7 @@ jobs: - uses: GoCodeAlone/setup-wfctl@v1 with: - version: v0.64.0 + version: v0.64.3 - name: Validate plugin contract for publish (pre-build) run: wfctl plugin validate-contract --for-publish --tag "${{ github.ref_name }}" . diff --git a/.goreleaser.yaml b/.goreleaser.yaml index 55a4a8d..8747260 100644 --- a/.goreleaser.yaml +++ b/.goreleaser.yaml @@ -4,6 +4,7 @@ project_name: workflow-plugin-aws before: hooks: + - "sh -c \"cp plugin.json cmd/workflow-plugin-aws/plugin.json\"" - "sh -c \"rm -rf .release && mkdir -p .release && cp plugin.json .release/plugin.json && cp plugin.contracts.json .release/plugin.contracts.json && sed -i.bak 's/\\\"version\\\": \\\".*\\\"/\\\"version\\\": \\\"{{ .Version }}\\\"/' .release/plugin.json && rm -f .release/plugin.json.bak\"" - "sh -c \"sed -i.bak 's|/releases/download/v[^/]*/|/releases/download/{{ .Tag }}/|g' .release/plugin.json && rm -f .release/plugin.json.bak\"" - "sh -c \"export GOPRIVATE=github.com/GoCodeAlone/*; WFCTL_VERSION=$(GOWORK=off go list -m github.com/GoCodeAlone/workflow | awk '{print $2}') && GOWORK=off go run github.com/GoCodeAlone/workflow/cmd/wfctl@${WFCTL_VERSION} plugin validate --file .release/plugin.json --strict-contracts\"" diff --git a/cmd/workflow-plugin-aws/main.go b/cmd/workflow-plugin-aws/main.go index 9c4f9d4..aa53421 100644 --- a/cmd/workflow-plugin-aws/main.go +++ b/cmd/workflow-plugin-aws/main.go @@ -11,14 +11,23 @@ package main import ( + _ "embed" + "github.com/GoCodeAlone/workflow-plugin-aws/internal" sdk "github.com/GoCodeAlone/workflow/plugin/external/sdk" ) +// pluginJSON is copied from the repository root by GoReleaser before builds +// and is committed for local builds/tests. +// +//go:embed plugin.json +var pluginJSON []byte + func main() { sdk.ServeIaCPlugin(internal.NewIaCServer(), sdk.IaCServeOptions{ - Modules: internal.ModuleProviders(), - Steps: internal.StepProviders(), - BuildVersion: sdk.ResolveBuildVersion(internal.Version), + ManifestProvider: sdk.MustEmbedManifest(pluginJSON), + Modules: internal.ModuleProviders(), + Steps: internal.StepProviders(), + BuildVersion: sdk.ResolveBuildVersion(internal.Version), }) } diff --git a/cmd/workflow-plugin-aws/plugin.json b/cmd/workflow-plugin-aws/plugin.json new file mode 100644 index 0000000..b31ea0b --- /dev/null +++ b/cmd/workflow-plugin-aws/plugin.json @@ -0,0 +1,82 @@ +{ + "name": "workflow-plugin-aws", + "version": "0.0.0", + "author": "GoCodeAlone", + "description": "AWS provider plugin for workflow IaC — manages ECS, EKS, RDS, ElastiCache, VPC, ALB, Route53, ECR, API Gateway, Security Groups, IAM, S3, ACM, and AutoScaling Group resources", + "license": "MIT", + "type": "external", + "tier": "community", + "minEngineVersion": "0.64.3", + "iacServices": [ + "workflow.plugin.external.iac.IaCProviderRequired", + "workflow.plugin.external.iac.IaCProviderEnumerator", + "workflow.plugin.external.iac.IaCProviderDriftDetector", + "workflow.plugin.external.iac.IaCProviderCredentialRevoker", + "workflow.plugin.external.iac.IaCProviderMigrationRepairer", + "workflow.plugin.external.iac.IaCProviderValidator", + "workflow.plugin.external.iac.IaCProviderDriftConfigDetector", + "workflow.plugin.external.iac.IaCProviderRequirementMapper", + "workflow.plugin.external.iac.ResourceDriver", + "workflow.plugin.external.iac.IaCStateBackend" + ], + "keywords": [ + "aws", + "iac", + "infrastructure", + "ecs", + "eks", + "rds", + "vpc", + "s3", + "autoscaling" + ], + "homepage": "https://github.com/GoCodeAlone/workflow-plugin-aws", + "repository": "https://github.com/GoCodeAlone/workflow-plugin-aws", + "capabilities": { + "configProvider": false, + "moduleTypes": [ + "iac.provider", + "aws.credentials", + "storage.s3" + ], + "stepTypes": [ + "step.s3_upload" + ], + "triggerTypes": [], + "iacStateBackends": [ + "s3" + ] + }, + "downloads": [ + { + "os": "linux", + "arch": "amd64", + "url": "https://github.com/GoCodeAlone/workflow-plugin-aws/releases/download/v0.0.0/workflow-plugin-aws-linux-amd64.tar.gz" + }, + { + "os": "linux", + "arch": "arm64", + "url": "https://github.com/GoCodeAlone/workflow-plugin-aws/releases/download/v0.0.0/workflow-plugin-aws-linux-arm64.tar.gz" + }, + { + "os": "darwin", + "arch": "amd64", + "url": "https://github.com/GoCodeAlone/workflow-plugin-aws/releases/download/v0.0.0/workflow-plugin-aws-darwin-amd64.tar.gz" + }, + { + "os": "darwin", + "arch": "arm64", + "url": "https://github.com/GoCodeAlone/workflow-plugin-aws/releases/download/v0.0.0/workflow-plugin-aws-darwin-arm64.tar.gz" + }, + { + "os": "windows", + "arch": "amd64", + "url": "https://github.com/GoCodeAlone/workflow-plugin-aws/releases/download/v0.0.0/workflow-plugin-aws-windows-amd64.tar.gz" + }, + { + "os": "windows", + "arch": "arm64", + "url": "https://github.com/GoCodeAlone/workflow-plugin-aws/releases/download/v0.0.0/workflow-plugin-aws-windows-arm64.tar.gz" + } + ] +} diff --git a/drivers/ecs.go b/drivers/ecs.go index c39ab24..a5a67b5 100644 --- a/drivers/ecs.go +++ b/drivers/ecs.go @@ -3,6 +3,7 @@ package drivers import ( "context" "fmt" + "sort" awssdk "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/ecs" @@ -61,6 +62,10 @@ func (d *ECSDriver) Create(ctx context.Context, spec interfaces.ResourceSpec) (* } replicas := int32(intProp(spec.Config, "replicas", 1)) + containerDefinitions := []ecstypes.ContainerDefinition{ + containerDefinitionFromConfig(spec.Name, spec.Config), + } + // Register task definition tdIn := &ecs.RegisterTaskDefinitionInput{ Family: awssdk.String(spec.Name), @@ -68,13 +73,7 @@ func (d *ECSDriver) Create(ctx context.Context, spec interfaces.ResourceSpec) (* NetworkMode: ecstypes.NetworkModeAwsvpc, Cpu: awssdk.String(cpu), Memory: awssdk.String(memory), - ContainerDefinitions: []ecstypes.ContainerDefinition{ - { - Name: awssdk.String(spec.Name), - Image: awssdk.String(image), - Essential: awssdk.Bool(true), - }, - }, + ContainerDefinitions: containerDefinitions, } tdOut, err := d.client.RegisterTaskDefinition(ctx, tdIn) if err != nil { @@ -148,9 +147,7 @@ func (d *ECSDriver) Update(ctx context.Context, ref interfaces.ResourceRef, spec NetworkMode: ecstypes.NetworkModeAwsvpc, Cpu: awssdk.String(cpu), Memory: awssdk.String(mem), - ContainerDefinitions: []ecstypes.ContainerDefinition{ - {Name: awssdk.String(ref.Name), Image: awssdk.String(image), Essential: awssdk.Bool(true)}, - }, + ContainerDefinitions: []ecstypes.ContainerDefinition{containerDefinitionFromConfig(ref.Name, spec.Config)}, }) if err != nil { return nil, fmt.Errorf("ecs: re-register task def %q: %w", ref.Name, err) @@ -165,6 +162,97 @@ func (d *ECSDriver) Update(ctx context.Context, ref interfaces.ResourceRef, spec return ecsServiceToOutput(ref.Name, out.Service), nil } +func containerDefinitionFromConfig(name string, cfg map[string]any) ecstypes.ContainerDefinition { + image, _ := cfg["image"].(string) + def := ecstypes.ContainerDefinition{ + Name: awssdk.String(name), + Image: awssdk.String(image), + Essential: awssdk.Bool(true), + } + if command := stringSliceProp(cfg, "command"); len(command) > 0 { + def.Command = command + } + def.Environment = ecsEnvironment(cfg["env_vars"]) + def.Secrets = ecsSecrets(cfg["env_vars_secret"]) + def.PortMappings = ecsPortMappings(cfg["ports"]) + return def +} + +func ecsEnvironment(raw any) []ecstypes.KeyValuePair { + values := stringMap(raw) + keys := sortedMapKeys(values) + out := make([]ecstypes.KeyValuePair, 0, len(keys)) + for _, key := range keys { + value := values[key] + out = append(out, ecstypes.KeyValuePair{Name: awssdk.String(key), Value: awssdk.String(value)}) + } + return out +} + +func ecsSecrets(raw any) []ecstypes.Secret { + values := stringMap(raw) + keys := sortedMapKeys(values) + out := make([]ecstypes.Secret, 0, len(keys)) + for _, key := range keys { + value := values[key] + out = append(out, ecstypes.Secret{Name: awssdk.String(key), ValueFrom: awssdk.String(value)}) + } + return out +} + +func ecsPortMappings(raw any) []ecstypes.PortMapping { + items, ok := raw.([]any) + if !ok { + return nil + } + out := make([]ecstypes.PortMapping, 0, len(items)) + for _, item := range items { + m, ok := item.(map[string]any) + if !ok { + continue + } + port := intProp(m, "port", 0) + if port <= 0 { + continue + } + protocol := ecstypes.TransportProtocolTcp + if p, _ := m["protocol"].(string); p == "udp" { + protocol = ecstypes.TransportProtocolUdp + } + out = append(out, ecstypes.PortMapping{ + ContainerPort: awssdk.Int32(int32(port)), + Protocol: protocol, + }) + } + return out +} + +func stringMap(raw any) map[string]string { + switch values := raw.(type) { + case map[string]string: + return values + case map[string]any: + out := make(map[string]string, len(values)) + for key, value := range values { + if str, ok := value.(string); ok { + out[key] = str + } + } + return out + default: + return nil + } +} + +func sortedMapKeys(values map[string]string) []string { + keys := make([]string, 0, len(values)) + for key := range values { + keys = append(keys, key) + } + sort.Strings(keys) + return keys +} + func (d *ECSDriver) Delete(ctx context.Context, ref interfaces.ResourceRef) error { // Scale to 0 first _, _ = d.client.UpdateService(ctx, &ecs.UpdateServiceInput{ diff --git a/drivers/ecs_test.go b/drivers/ecs_test.go index 4c9f00f..7b59e2b 100644 --- a/drivers/ecs_test.go +++ b/drivers/ecs_test.go @@ -15,6 +15,7 @@ import ( type mockECSClient struct { registerOut *ecs.RegisterTaskDefinitionOutput + registerIn *ecs.RegisterTaskDefinitionInput registerErr error createSvcOut *ecs.CreateServiceOutput createSvcErr error @@ -27,7 +28,8 @@ type mockECSClient struct { deregErr error } -func (m *mockECSClient) RegisterTaskDefinition(_ context.Context, _ *ecs.RegisterTaskDefinitionInput, _ ...func(*ecs.Options)) (*ecs.RegisterTaskDefinitionOutput, error) { +func (m *mockECSClient) RegisterTaskDefinition(_ context.Context, in *ecs.RegisterTaskDefinitionInput, _ ...func(*ecs.Options)) (*ecs.RegisterTaskDefinitionOutput, error) { + m.registerIn = in return m.registerOut, m.registerErr } func (m *mockECSClient) CreateService(_ context.Context, _ *ecs.CreateServiceInput, _ ...func(*ecs.Options)) (*ecs.CreateServiceOutput, error) { @@ -56,12 +58,12 @@ func TestECSDriver_Create(t *testing.T) { }, createSvcOut: &ecs.CreateServiceOutput{ Service: &ecstypes.Service{ - ServiceArn: awssdk.String(svcARN), - ServiceName: awssdk.String("my-svc"), - Status: awssdk.String("ACTIVE"), - DesiredCount: 1, - RunningCount: 0, - TaskDefinition: awssdk.String("arn:aws:ecs:us-east-1:123:task-definition/my-svc:1"), + ServiceArn: awssdk.String(svcARN), + ServiceName: awssdk.String("my-svc"), + Status: awssdk.String("ACTIVE"), + DesiredCount: 1, + RunningCount: 0, + TaskDefinition: awssdk.String("arn:aws:ecs:us-east-1:123:task-definition/my-svc:1"), }, }, } @@ -88,6 +90,71 @@ func TestECSDriver_Create(t *testing.T) { } } +func TestECSDriver_Create_CollectorConfig(t *testing.T) { + mock := &mockECSClient{ + registerOut: &ecs.RegisterTaskDefinitionOutput{ + TaskDefinition: &ecstypes.TaskDefinition{ + TaskDefinitionArn: awssdk.String("arn:aws:ecs:us-east-1:123:task-definition/observability-collector:1"), + }, + }, + createSvcOut: &ecs.CreateServiceOutput{ + Service: &ecstypes.Service{ + ServiceArn: awssdk.String("arn:aws:ecs:us-east-1:123:service/default/observability-collector"), + ServiceName: awssdk.String("observability-collector"), + Status: awssdk.String("ACTIVE"), + DesiredCount: 1, + RunningCount: 0, + TaskDefinition: awssdk.String("arn:aws:ecs:us-east-1:123:task-definition/observability-collector:1"), + }, + }, + } + d := drivers.NewECSDriverWithClient(mock, "default") + _, err := d.Create(context.Background(), interfaces.ResourceSpec{ + Name: "observability-collector", + Type: "infra.container_service", + Config: map[string]any{ + "image": "public.ecr.aws/aws-observability/aws-otel-collector:latest", + "command": []any{"--config=env:AOT_CONFIG_CONTENT"}, + "env_vars": map[string]any{ + "AOT_CONFIG_CONTENT": "receivers: {}", + }, + "env_vars_secret": map[string]any{ + "DD_API_KEY": "arn:aws:secretsmanager:us-east-1:123:secret:datadog", + }, + "ports": []any{ + map[string]any{"port": 4317, "protocol": "tcp"}, + map[string]any{"port": 4318, "protocol": "tcp"}, + }, + }, + }) + if err != nil { + t.Fatalf("Create failed: %v", err) + } + if mock.registerIn == nil { + t.Fatal("RegisterTaskDefinition input was not captured") + } + containers := mock.registerIn.ContainerDefinitions + if len(containers) != 1 { + t.Fatalf("container definitions len = %d, want 1", len(containers)) + } + c := containers[0] + if len(c.Command) != 1 { + t.Fatalf("command = %v, want one entry", c.Command) + } + if got := c.Command[0]; got != "--config=env:AOT_CONFIG_CONTENT" { + t.Fatalf("command[0] = %q", got) + } + if len(c.Environment) != 1 || awssdk.ToString(c.Environment[0].Name) != "AOT_CONFIG_CONTENT" { + t.Fatalf("environment = %+v", c.Environment) + } + if len(c.Secrets) != 1 || awssdk.ToString(c.Secrets[0].Name) != "DD_API_KEY" { + t.Fatalf("secrets = %+v", c.Secrets) + } + if len(c.PortMappings) != 2 || awssdk.ToInt32(c.PortMappings[0].ContainerPort) != 4317 { + t.Fatalf("port mappings = %+v", c.PortMappings) + } +} + func TestECSDriver_Create_MissingImage(t *testing.T) { d := drivers.NewECSDriverWithClient(&mockECSClient{}, "default") _, err := d.Create(context.Background(), interfaces.ResourceSpec{ diff --git a/go.mod b/go.mod index 71ca335..3f54a40 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/GoCodeAlone/workflow-plugin-aws go 1.26.0 require ( - github.com/GoCodeAlone/workflow v0.64.0 + github.com/GoCodeAlone/workflow v0.64.3 github.com/aws/aws-sdk-go-v2 v1.41.7 github.com/aws/aws-sdk-go-v2/config v1.32.16 github.com/aws/aws-sdk-go-v2/credentials v1.19.15 diff --git a/go.sum b/go.sum index 8a67730..431f7be 100644 --- a/go.sum +++ b/go.sum @@ -24,6 +24,8 @@ github.com/GoCodeAlone/modular/modules/reverseproxy/v2 v2.8.0 h1:cvdLHbM/vzvygQT github.com/GoCodeAlone/modular/modules/reverseproxy/v2 v2.8.0/go.mod h1:/9ipMG4qM2CHQ14BfXKdVlYRJelef6M8MFI5TbZv67M= github.com/GoCodeAlone/workflow v0.64.0 h1:2CpbYPwIqdGDb3xi3YJpwcteIum4ehBSrnRql/1YvB4= github.com/GoCodeAlone/workflow v0.64.0/go.mod h1:659GGDrw3QJ7b625y9rf8QhKIpt1VCoEG0MxKu5tGQs= +github.com/GoCodeAlone/workflow v0.64.3 h1:r0jMoRJXJI8lz44c70mFjGcpy24IWpOTtkX7BC0/fas= +github.com/GoCodeAlone/workflow v0.64.3/go.mod h1:659GGDrw3QJ7b625y9rf8QhKIpt1VCoEG0MxKu5tGQs= github.com/GoCodeAlone/yaegi v0.17.2 h1:WK6Y6e0t1a6U7r+S2dN3CGWW1PizYD3zO0zneToZPxM= github.com/GoCodeAlone/yaegi v0.17.2/go.mod h1:z5Pr6Wse6QJcQvpgxTxzMAevFarH0N37TG88Y9dprx0= github.com/IBM/sarama v1.47.0 h1:GcQFEd12+KzfPYeLgN69Fh7vLCtYRhVIx0rO4TZO318= diff --git a/internal/iacserver.go b/internal/iacserver.go index 814a01f..be2059f 100644 --- a/internal/iacserver.go +++ b/internal/iacserver.go @@ -47,6 +47,7 @@ type awsIaCServer struct { pb.UnimplementedIaCProviderMigrationRepairerServer pb.UnimplementedIaCProviderValidatorServer pb.UnimplementedIaCProviderDriftConfigDetectorServer + pb.UnimplementedIaCProviderRequirementMapperServer pb.UnimplementedResourceDriverServer pb.UnimplementedIaCStateBackendServer @@ -83,8 +84,9 @@ var ( // IaCProviderDriftDetectorServer requires BOTH DetectDrift AND DetectDriftWithSpecs. // Both are implemented below: DetectDrift is the real check; DetectDriftWithSpecs // delegates to DetectDrift (existence-only behavior; ignores the specs map). - _ pb.IaCProviderDriftDetectorServer = (*awsIaCServer)(nil) - _ pb.ResourceDriverServer = (*awsIaCServer)(nil) + _ pb.IaCProviderDriftDetectorServer = (*awsIaCServer)(nil) + _ pb.ResourceDriverServer = (*awsIaCServer)(nil) + _ pb.IaCProviderRequirementMapperServer = (*awsIaCServer)(nil) // awsIaCServer also SERVES the typed IaC state-backend contract (s3 // backend). The SDK serve hook auto-registers this via type-assertion at // plugin startup — see cmd/workflow-plugin-aws/main.go. diff --git a/internal/iacserver_mapper.go b/internal/iacserver_mapper.go new file mode 100644 index 0000000..81afd7c --- /dev/null +++ b/internal/iacserver_mapper.go @@ -0,0 +1,291 @@ +package internal + +import ( + "context" + "encoding/json" + "fmt" + "sort" + "strings" + + pb "github.com/GoCodeAlone/workflow/plugin/external/proto" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +const ( + awsCollectorModuleName = "observability-collector" + awsCollectorImage = "public.ecr.aws/aws-observability/aws-otel-collector:latest" + awsCollectorType = "infra.container_service" +) + +// MapRequirements maps canonical derived-IaC requirements into AWS-owned +// resource shapes. The v1 mapper emits an ECS service running the AWS OTel +// Collector; app sidecar placement can be supplied explicitly by applications +// with the same satisfies keys when a customer needs per-task sidecars. +func (s *awsIaCServer) MapRequirements(_ context.Context, req *pb.MapRequirementsRequest) (*pb.MapRequirementsResponse, error) { + if req.GetProvider() != "" && req.GetProvider() != "aws" { + return nil, status.Errorf(codes.InvalidArgument, "aws mapper cannot satisfy provider %q", req.GetProvider()) + } + + resp := &pb.MapRequirementsResponse{} + var accepted []*pb.IaCRequirement + for _, requirement := range req.GetRequirements() { + switch diag := awsRejectUnsupportedRequirement(req.GetRuntime(), requirement); { + case diag != nil: + resp.Rejected = append(resp.Rejected, diag) + default: + accepted = append(accepted, requirement) + resp.AcceptedKeys = append(resp.AcceptedKeys, requirement.GetKey()) + } + } + if len(accepted) == 0 { + return resp, nil + } + + cfg := awsCollectorModuleConfig(accepted) + configJSON, err := json.Marshal(cfg) + if err != nil { + return nil, fmt.Errorf("aws requirement mapper: encode collector config: %w", err) + } + resp.Modules = append(resp.Modules, &pb.DerivedModuleSpec{ + Name: awsCollectorModuleName, + Type: awsCollectorType, + Satisfies: append([]string(nil), resp.GetAcceptedKeys()...), + ConfigJson: configJSON, + }) + resp.Notes = append(resp.Notes, &pb.RequirementNote{ + Key: strings.Join(resp.GetAcceptedKeys(), ","), + Message: "AWS ECS derivation emits a generic AWS OTel Collector service. Use an explicit infra.container_service module with the same satisfies keys when an application requires collector sidecars in its own task definition.", + Interactive: false, + }) + return resp, nil +} + +func awsRejectUnsupportedRequirement(runtime pb.RequirementRuntime, req *pb.IaCRequirement) *pb.RequirementDiagnostic { + key := req.GetKey() + if req.GetKind() != pb.RequirementKind_REQUIREMENT_KIND_OBSERVABILITY { + return awsRequirementDiagnostic(key, "unsupported_kind", "aws can only derive observability requirements today") + } + if hint := req.GetResourceTypeHint(); hint != "" && hint != awsCollectorType { + return awsRequirementDiagnostic(key, "unsupported_resource_type_hint", + fmt.Sprintf("aws observability derivation emits %s, not %s", awsCollectorType, hint)) + } + if runtime != pb.RequirementRuntime_REQUIREMENT_RUNTIME_ECS { + return awsRequirementDiagnostic(key, "unsupported_runtime", "aws observability derivation currently targets ECS") + } + if !awsRequirementAllowsRuntime(req, runtime) { + return awsRequirementDiagnostic(key, "unsupported_runtime", "requirement does not allow ECS") + } + if !awsRequirementAllowsDeploymentMode(req) { + return awsRequirementDiagnostic(key, "unsupported_deployment_mode", + "aws ECS maps sidecar intent to an explicit or sibling collector service; daemonset mode is not valid for ECS") + } + return nil +} + +func awsRequirementAllowsRuntime(req *pb.IaCRequirement, runtime pb.RequirementRuntime) bool { + if len(req.GetRuntimes()) == 0 { + return true + } + for _, candidate := range req.GetRuntimes() { + if candidate == runtime { + return true + } + } + return false +} + +func awsRequirementAllowsDeploymentMode(req *pb.IaCRequirement) bool { + modes := req.GetDeploymentModes() + if len(modes) == 0 { + return true + } + for _, mode := range modes { + switch mode { + case pb.DeploymentMode_DEPLOYMENT_MODE_SIDECAR, + pb.DeploymentMode_DEPLOYMENT_MODE_SIBLING_SERVICE, + pb.DeploymentMode_DEPLOYMENT_MODE_MANAGED: + return true + } + } + return false +} + +func awsRequirementDiagnostic(key, code, message string) *pb.RequirementDiagnostic { + return &pb.RequirementDiagnostic{Key: key, Code: code, Message: message} +} + +func awsCollectorModuleConfig(reqs []*pb.IaCRequirement) map[string]any { + signals := awsRequestedSignals(reqs) + backends := awsRequestedBackends(reqs) + collectorConfig := awsBuildCollectorConfig(signals, backends) + + envVars := map[string]any{ + "AOT_CONFIG_CONTENT": collectorConfig, + } + secretVars := make(map[string]any) + if awsHasBackend(backends, pb.ObservabilityBackend_OBSERVABILITY_BACKEND_OTEL) { + envVars["OTEL_EXPORTER_OTLP_ENDPOINT"] = "${vars.otel_exporter_otlp_endpoint}" + } + if awsHasBackend(backends, pb.ObservabilityBackend_OBSERVABILITY_BACKEND_DATADOG) { + envVars["DD_SITE"] = "${vars.datadog_site}" + secretVars["DD_API_KEY"] = "${secrets.datadog_api_key_arn}" + } + if awsHasBackend(backends, pb.ObservabilityBackend_OBSERVABILITY_BACKEND_LOKI) { + envVars["LOKI_ENDPOINT"] = "${vars.loki_endpoint}" + } + if awsHasBackend(backends, pb.ObservabilityBackend_OBSERVABILITY_BACKEND_GRAFANA) { + envVars["GRAFANA_OTLP_ENDPOINT"] = "${vars.grafana_otlp_endpoint}" + secretVars["GRAFANA_OTLP_HEADERS"] = "${secrets.grafana_otlp_headers_arn}" + } + + cfg := map[string]any{ + "image": awsCollectorImage, + "command": []any{"--config=env:AOT_CONFIG_CONTENT"}, + "cpu": "256", + "memory": "512", + "replicas": 1, + "ports": awsCollectorPorts(backends), + "env_vars": envVars, + "env_vars_secret": secretVars, + } + return cfg +} + +func awsRequestedSignals(reqs []*pb.IaCRequirement) map[pb.TelemetrySignal]bool { + out := make(map[pb.TelemetrySignal]bool) + for _, req := range reqs { + for _, signal := range req.GetTelemetrySignals() { + if signal != pb.TelemetrySignal_TELEMETRY_SIGNAL_UNSPECIFIED { + out[signal] = true + } + } + } + if len(out) == 0 { + out[pb.TelemetrySignal_TELEMETRY_SIGNAL_TRACES] = true + out[pb.TelemetrySignal_TELEMETRY_SIGNAL_METRICS] = true + out[pb.TelemetrySignal_TELEMETRY_SIGNAL_LOGS] = true + } + return out +} + +func awsRequestedBackends(reqs []*pb.IaCRequirement) map[pb.ObservabilityBackend]bool { + out := make(map[pb.ObservabilityBackend]bool) + for _, req := range reqs { + for _, backend := range req.GetObservabilityBackends() { + if backend != pb.ObservabilityBackend_OBSERVABILITY_BACKEND_UNSPECIFIED { + out[backend] = true + } + } + } + if len(out) == 0 { + out[pb.ObservabilityBackend_OBSERVABILITY_BACKEND_OTEL] = true + } + return out +} + +func awsCollectorPorts(backends map[pb.ObservabilityBackend]bool) []any { + ports := []any{ + map[string]any{"port": 4317, "protocol": "tcp"}, + map[string]any{"port": 4318, "protocol": "tcp"}, + } + if awsHasBackend(backends, pb.ObservabilityBackend_OBSERVABILITY_BACKEND_PROMETHEUS) { + ports = append(ports, map[string]any{"port": 9464, "protocol": "tcp"}) + } + return ports +} + +func awsBuildCollectorConfig(signals map[pb.TelemetrySignal]bool, backends map[pb.ObservabilityBackend]bool) string { + var b strings.Builder + b.WriteString("receivers:\n") + b.WriteString(" otlp:\n") + b.WriteString(" protocols:\n") + b.WriteString(" grpc:\n") + b.WriteString(" endpoint: 0.0.0.0:4317\n") + b.WriteString(" http:\n") + b.WriteString(" endpoint: 0.0.0.0:4318\n") + b.WriteString("processors:\n") + b.WriteString(" batch: {}\n") + b.WriteString("exporters:\n") + awsWriteExporters(&b, backends) + b.WriteString("service:\n") + b.WriteString(" pipelines:\n") + if signals[pb.TelemetrySignal_TELEMETRY_SIGNAL_TRACES] { + awsWritePipeline(&b, "traces", awsExportersForSignal(pb.TelemetrySignal_TELEMETRY_SIGNAL_TRACES, backends)) + } + if signals[pb.TelemetrySignal_TELEMETRY_SIGNAL_METRICS] { + awsWritePipeline(&b, "metrics", awsExportersForSignal(pb.TelemetrySignal_TELEMETRY_SIGNAL_METRICS, backends)) + } + if signals[pb.TelemetrySignal_TELEMETRY_SIGNAL_LOGS] { + awsWritePipeline(&b, "logs", awsExportersForSignal(pb.TelemetrySignal_TELEMETRY_SIGNAL_LOGS, backends)) + } + return b.String() +} + +func awsWriteExporters(b *strings.Builder, backends map[pb.ObservabilityBackend]bool) { + if awsHasBackend(backends, pb.ObservabilityBackend_OBSERVABILITY_BACKEND_OTEL) { + b.WriteString(" otlp:\n") + b.WriteString(" endpoint: ${env:OTEL_EXPORTER_OTLP_ENDPOINT}\n") + } + if awsHasBackend(backends, pb.ObservabilityBackend_OBSERVABILITY_BACKEND_DATADOG) { + b.WriteString(" datadog:\n") + b.WriteString(" api:\n") + b.WriteString(" key: ${env:DD_API_KEY}\n") + b.WriteString(" site: ${env:DD_SITE}\n") + } + if awsHasBackend(backends, pb.ObservabilityBackend_OBSERVABILITY_BACKEND_PROMETHEUS) { + b.WriteString(" prometheus:\n") + b.WriteString(" endpoint: 0.0.0.0:9464\n") + } + if awsHasBackend(backends, pb.ObservabilityBackend_OBSERVABILITY_BACKEND_LOKI) { + b.WriteString(" loki:\n") + b.WriteString(" endpoint: ${env:LOKI_ENDPOINT}\n") + } + if awsHasBackend(backends, pb.ObservabilityBackend_OBSERVABILITY_BACKEND_GRAFANA) { + b.WriteString(" otlp/grafana_otlp:\n") + b.WriteString(" endpoint: ${env:GRAFANA_OTLP_ENDPOINT}\n") + b.WriteString(" headers:\n") + b.WriteString(" authorization: ${env:GRAFANA_OTLP_HEADERS}\n") + } +} + +func awsWritePipeline(b *strings.Builder, name string, exporters []string) { + if len(exporters) == 0 { + return + } + b.WriteString(" ") + b.WriteString(name) + b.WriteString(":\n") + b.WriteString(" receivers: [otlp]\n") + b.WriteString(" processors: [batch]\n") + b.WriteString(" exporters: [") + b.WriteString(strings.Join(exporters, ", ")) + b.WriteString("]\n") +} + +func awsExportersForSignal(signal pb.TelemetrySignal, backends map[pb.ObservabilityBackend]bool) []string { + var exporters []string + if awsHasBackend(backends, pb.ObservabilityBackend_OBSERVABILITY_BACKEND_OTEL) { + exporters = append(exporters, "otlp") + } + if awsHasBackend(backends, pb.ObservabilityBackend_OBSERVABILITY_BACKEND_DATADOG) { + exporters = append(exporters, "datadog") + } + if signal == pb.TelemetrySignal_TELEMETRY_SIGNAL_METRICS && + awsHasBackend(backends, pb.ObservabilityBackend_OBSERVABILITY_BACKEND_PROMETHEUS) { + exporters = append(exporters, "prometheus") + } + if signal == pb.TelemetrySignal_TELEMETRY_SIGNAL_LOGS && + awsHasBackend(backends, pb.ObservabilityBackend_OBSERVABILITY_BACKEND_LOKI) { + exporters = append(exporters, "loki") + } + if awsHasBackend(backends, pb.ObservabilityBackend_OBSERVABILITY_BACKEND_GRAFANA) { + exporters = append(exporters, "otlp/grafana_otlp") + } + sort.Strings(exporters) + return exporters +} + +func awsHasBackend(backends map[pb.ObservabilityBackend]bool, backend pb.ObservabilityBackend) bool { + return backends[backend] +} diff --git a/internal/iacserver_mapper_test.go b/internal/iacserver_mapper_test.go new file mode 100644 index 0000000..33f6617 --- /dev/null +++ b/internal/iacserver_mapper_test.go @@ -0,0 +1,224 @@ +package internal + +import ( + "context" + "encoding/json" + "net" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/GoCodeAlone/workflow-plugin-aws/provider" + pb "github.com/GoCodeAlone/workflow/plugin/external/proto" + sdk "github.com/GoCodeAlone/workflow/plugin/external/sdk" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" + "google.golang.org/grpc/test/bufconn" +) + +const mapperTestBufSize = 1024 * 1024 + +func TestAWSRequirementMapper_MapsObservabilityToECSCollector(t *testing.T) { + conn := newMapperTestConn(t) + client := pb.NewIaCProviderRequirementMapperClient(conn) + + resp, err := client.MapRequirements(context.Background(), &pb.MapRequirementsRequest{ + Provider: "aws", + Runtime: pb.RequirementRuntime_REQUIREMENT_RUNTIME_ECS, + Environment: "prod", + Requirements: []*pb.IaCRequirement{{ + Key: "observability.telemetry.default", + Kind: pb.RequirementKind_REQUIREMENT_KIND_OBSERVABILITY, + Runtimes: []pb.RequirementRuntime{ + pb.RequirementRuntime_REQUIREMENT_RUNTIME_ECS, + }, + TelemetrySignals: []pb.TelemetrySignal{ + pb.TelemetrySignal_TELEMETRY_SIGNAL_TRACES, + pb.TelemetrySignal_TELEMETRY_SIGNAL_METRICS, + pb.TelemetrySignal_TELEMETRY_SIGNAL_LOGS, + }, + ObservabilityBackends: []pb.ObservabilityBackend{ + pb.ObservabilityBackend_OBSERVABILITY_BACKEND_OTEL, + pb.ObservabilityBackend_OBSERVABILITY_BACKEND_DATADOG, + pb.ObservabilityBackend_OBSERVABILITY_BACKEND_PROMETHEUS, + pb.ObservabilityBackend_OBSERVABILITY_BACKEND_LOKI, + pb.ObservabilityBackend_OBSERVABILITY_BACKEND_GRAFANA, + }, + DeploymentModes: []pb.DeploymentMode{ + pb.DeploymentMode_DEPLOYMENT_MODE_SIDECAR, + pb.DeploymentMode_DEPLOYMENT_MODE_SIBLING_SERVICE, + }, + }}, + }) + if err != nil { + t.Fatalf("MapRequirements: %v", err) + } + if got := resp.GetAcceptedKeys(); len(got) != 1 || got[0] != "observability.telemetry.default" { + t.Fatalf("accepted_keys = %v, want [observability.telemetry.default]", got) + } + if len(resp.GetRejected()) != 0 { + t.Fatalf("rejected = %+v, want none", resp.GetRejected()) + } + if len(resp.GetModules()) != 1 { + t.Fatalf("modules len = %d, want 1", len(resp.GetModules())) + } + mod := resp.GetModules()[0] + if mod.GetName() != "observability-collector" { + t.Errorf("module name = %q, want observability-collector", mod.GetName()) + } + if mod.GetType() != "infra.container_service" { + t.Errorf("module type = %q, want infra.container_service", mod.GetType()) + } + if got := mod.GetSatisfies(); len(got) != 1 || got[0] != "observability.telemetry.default" { + t.Errorf("module satisfies = %v, want [observability.telemetry.default]", got) + } + + var cfg map[string]any + if err := json.Unmarshal(mod.GetConfigJson(), &cfg); err != nil { + t.Fatalf("config_json: %v", err) + } + if cfg["image"] != "public.ecr.aws/aws-observability/aws-otel-collector:latest" { + t.Errorf("image = %v", cfg["image"]) + } + envVars, ok := cfg["env_vars"].(map[string]any) + if !ok { + t.Fatalf("env_vars missing or wrong type: %#v", cfg["env_vars"]) + } + collectorConfig, _ := envVars["AOT_CONFIG_CONTENT"].(string) + for _, want := range []string{"otlp:", "datadog:", "prometheus:", "loki:", "grafana_otlp:", "traces:", "metrics:", "logs:"} { + if !strings.Contains(collectorConfig, want) { + t.Fatalf("collector config missing %q:\n%s", want, collectorConfig) + } + } + secretVars, ok := cfg["env_vars_secret"].(map[string]any) + if !ok { + t.Fatalf("env_vars_secret missing or wrong type: %#v", cfg["env_vars_secret"]) + } + if secretVars["DD_API_KEY"] != "${secrets.datadog_api_key_arn}" { + t.Errorf("DD_API_KEY = %v", secretVars["DD_API_KEY"]) + } + if strings.Contains(string(mod.GetConfigJson()), "plain-api-key") { + t.Fatal("config_json contains plaintext secret") + } +} + +func TestAWSRequirementMapper_RejectsUnsupportedRuntime(t *testing.T) { + conn := newMapperTestConn(t) + client := pb.NewIaCProviderRequirementMapperClient(conn) + + resp, err := client.MapRequirements(context.Background(), &pb.MapRequirementsRequest{ + Provider: "aws", + Runtime: pb.RequirementRuntime_REQUIREMENT_RUNTIME_CLOUD_RUN, + Requirements: []*pb.IaCRequirement{{ + Key: "observability.telemetry.default", + Kind: pb.RequirementKind_REQUIREMENT_KIND_OBSERVABILITY, + }}, + }) + if err != nil { + t.Fatalf("MapRequirements: %v", err) + } + if len(resp.GetModules()) != 0 { + t.Fatalf("modules = %+v, want none", resp.GetModules()) + } + if got := resp.GetRejected(); len(got) != 1 || got[0].GetCode() != "unsupported_runtime" { + t.Fatalf("rejected = %+v, want unsupported_runtime", got) + } +} + +func TestAWSRequirementMapper_RejectsUnsupportedDeploymentMode(t *testing.T) { + conn := newMapperTestConn(t) + client := pb.NewIaCProviderRequirementMapperClient(conn) + + resp, err := client.MapRequirements(context.Background(), &pb.MapRequirementsRequest{ + Provider: "aws", + Runtime: pb.RequirementRuntime_REQUIREMENT_RUNTIME_ECS, + Requirements: []*pb.IaCRequirement{{ + Key: "observability.telemetry.default", + Kind: pb.RequirementKind_REQUIREMENT_KIND_OBSERVABILITY, + DeploymentModes: []pb.DeploymentMode{ + pb.DeploymentMode_DEPLOYMENT_MODE_DAEMONSET, + }, + }}, + }) + if err != nil { + t.Fatalf("MapRequirements: %v", err) + } + if len(resp.GetModules()) != 0 { + t.Fatalf("modules = %+v, want none", resp.GetModules()) + } + if got := resp.GetRejected(); len(got) != 1 || got[0].GetCode() != "unsupported_deployment_mode" { + t.Fatalf("rejected = %+v, want unsupported_deployment_mode", got) + } +} + +func TestAWSRequirementMapper_UnregisteredProviderName(t *testing.T) { + conn := newMapperTestConn(t) + client := pb.NewIaCProviderRequirementMapperClient(conn) + + _, err := client.MapRequirements(context.Background(), &pb.MapRequirementsRequest{ + Provider: "gcp", + Requirements: []*pb.IaCRequirement{{ + Key: "observability.telemetry.default", + Kind: pb.RequirementKind_REQUIREMENT_KIND_OBSERVABILITY, + }}, + }) + if err == nil { + t.Fatal("MapRequirements: expected provider mismatch error") + } + if status.Code(err) != codes.InvalidArgument { + t.Fatalf("MapRequirements code = %v, want InvalidArgument; err=%v", status.Code(err), err) + } +} + +func TestPluginManifestAdvertisesRequirementMapper(t *testing.T) { + data, err := os.ReadFile(filepath.Join(hostConformanceRepoRoot(t), "plugin.json")) + if err != nil { + t.Fatalf("read plugin.json: %v", err) + } + var manifest struct { + MinEngineVersion string `json:"minEngineVersion"` + IaCServices []string `json:"iacServices"` + } + if err := json.Unmarshal(data, &manifest); err != nil { + t.Fatalf("parse plugin.json: %v", err) + } + if manifest.MinEngineVersion != "0.64.3" { + t.Fatalf("minEngineVersion = %q, want 0.64.3", manifest.MinEngineVersion) + } + const mapperService = "workflow.plugin.external.iac.IaCProviderRequirementMapper" + for _, svc := range manifest.IaCServices { + if svc == mapperService { + return + } + } + t.Fatalf("iacServices missing %s: %v", mapperService, manifest.IaCServices) +} + +func newMapperTestConn(t *testing.T) *grpc.ClientConn { + t.Helper() + + listener := bufconn.Listen(mapperTestBufSize) + t.Cleanup(func() { _ = listener.Close() }) + + server := grpc.NewServer() + if err := sdk.RegisterAllIaCProviderServices(server, newAWSIaCServer(provider.NewAWSProviderConcrete())); err != nil { + t.Fatalf("RegisterAllIaCProviderServices: %v", err) + } + go func() { _ = server.Serve(listener) }() + t.Cleanup(server.Stop) + + conn, err := grpc.NewClient("passthrough:///bufnet", + grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) { + return listener.DialContext(ctx) + }), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + t.Fatalf("grpc.NewClient: %v", err) + } + t.Cleanup(func() { _ = conn.Close() }) + return conn +} diff --git a/plugin.json b/plugin.json index a7e0c92..b31ea0b 100644 --- a/plugin.json +++ b/plugin.json @@ -6,7 +6,7 @@ "license": "MIT", "type": "external", "tier": "community", - "minEngineVersion": "0.64.0", + "minEngineVersion": "0.64.3", "iacServices": [ "workflow.plugin.external.iac.IaCProviderRequired", "workflow.plugin.external.iac.IaCProviderEnumerator", @@ -15,6 +15,8 @@ "workflow.plugin.external.iac.IaCProviderMigrationRepairer", "workflow.plugin.external.iac.IaCProviderValidator", "workflow.plugin.external.iac.IaCProviderDriftConfigDetector", + "workflow.plugin.external.iac.IaCProviderRequirementMapper", + "workflow.plugin.external.iac.ResourceDriver", "workflow.plugin.external.iac.IaCStateBackend" ], "keywords": [