diff --git a/internal/pkg/deploy/cloudformation/stack/transformers.go b/internal/pkg/deploy/cloudformation/stack/transformers.go index 8e852f2236f..2a7b32bd533 100644 --- a/internal/pkg/deploy/cloudformation/stack/transformers.go +++ b/internal/pkg/deploy/cloudformation/stack/transformers.go @@ -45,6 +45,16 @@ const ( capacityProviderFargate = "FARGATE" ) +// Time interval options. +const ( + retentionMinValueSeconds = 0 + retentionMaxValueSeconds = 1209600 + delayMinValueSeconds = 0 + delayMaxValueSeconds = 900 + timeoutMinValueSeconds = 0 + timeoutMaxValueSeconds = 43200 +) + var ( errEphemeralBadSize = errors.New("ephemeral storage must be between 20 GiB and 200 GiB") errInvalidSpotConfig = errors.New(`"count.spot" and "count.range" cannot be specified together`) @@ -634,26 +644,40 @@ func convertTopic(t manifest.Topic, accountID, partition, region, app, env, svc }, nil } -func convertSubscribe(s *manifest.SubscribeConfig, validTopicARNs []string) (*template.SubscribeOpts, error) { +func convertSubscribe(s *manifest.SubscribeConfig, validTopicARNs []string, accountID, region, app, env, svc string) (*template.SubscribeOpts, error) { if s == nil || s.Topics == nil { return nil, nil } + sqsEndpoint, err := endpoints.DefaultResolver().EndpointFor(endpoints.SqsServiceID, region) + if err != nil { + return nil, err + } + var subscriptions template.SubscribeOpts for _, sb := range *s.Topics { - ts, err := convertTopicSubscription(sb, validTopicARNs) + ts, err := convertTopicSubscription(sb, validTopicARNs, sqsEndpoint.URL, accountID, app, env, svc) if err != nil { return nil, err } subscriptions.Topics = append(subscriptions.Topics, ts) } + queue, err := convertQueue(s.Queue, sqsEndpoint.URL, accountID, app, env, svc) + if err != nil { + return nil, err + } + subscriptions.Queue = queue return &subscriptions, nil } -func convertTopicSubscription(t manifest.TopicSubscription, validTopicARNs []string) (*template.TopicSubscription, error) { - err := validateTopicSubscription(t, validTopicARNs) +func convertTopicSubscription(t manifest.TopicSubscription, validTopicARNs []string, url, accountID, app, env, svc string) (*template.TopicSubscription, error) { + err := validateTopicSubscription(t, validTopicARNs, app, env) + if err != nil { + return nil, fmt.Errorf(`invalid topic subscription "%s": %w`, t.Name, err) + } + queue, err := convertQueue(t.Queue, url, accountID, app, env, svc) if err != nil { return nil, fmt.Errorf(`invalid topic subscription "%s": %w`, t.Name, err) } @@ -661,6 +685,84 @@ func convertTopicSubscription(t manifest.TopicSubscription, validTopicARNs []str return &template.TopicSubscription{ Name: aws.String(t.Name), Service: aws.String(t.Service), + Queue: queue, + }, nil +} + +func convertQueue(q *manifest.SQSQueue, url, accountID, app, env, svc string) (*template.SQSQueue, error) { + if q == nil { + return nil, nil + } + retention, err := convertRetention(q.Retention) + if err != nil { + return nil, fmt.Errorf(" `retention` %w", err) + } + delay, err := convertDelay(q.Delay) + if err != nil { + return nil, fmt.Errorf("`delay` %w", err) + } + timeout, err := convertTimeout(q.Timeout) + if err != nil { + return nil, fmt.Errorf("`timeout` %w", err) + } + deadletter, err := convertDeadLetter(q.DeadLetter) + if err != nil { + return nil, err + } + + return &template.SQSQueue{ + Retention: retention, + Delay: delay, + Timeout: timeout, + DeadLetter: deadletter, + FIFO: convertFIFO(q.FIFO), + }, nil +} + +func convertTime(t *time.Duration, floor, ceiling time.Duration) (*int64, error) { + if t == nil { + return nil, nil + } + + if err := validateTime(*t, floor, ceiling); err != nil { + return nil, err + } + + return aws.Int64(int64(t.Seconds())), nil +} + +func convertRetention(t *time.Duration) (*int64, error) { + return convertTime(t, retentionMinValueSeconds*time.Second, retentionMaxValueSeconds*time.Second) +} + +func convertDelay(t *time.Duration) (*int64, error) { + return convertTime(t, delayMinValueSeconds*time.Second, delayMaxValueSeconds*time.Second) +} + +func convertTimeout(t *time.Duration) (*int64, error) { + return convertTime(t, timeoutMinValueSeconds*time.Second, timeoutMaxValueSeconds*time.Second) +} + +func convertFIFO(f *manifest.FIFOOrBool) *template.FIFOQueue { + if f == nil || !aws.BoolValue(f.Enabled) { + return nil + } + + return &template.FIFOQueue{ + HighThroughput: aws.BoolValue(f.FIFO.HighThroughput), + } +} + +func convertDeadLetter(d *manifest.DeadLetterQueue) (*template.DeadLetterQueue, error) { + if d == nil { + return nil, nil + } + if err := validateDeadLetter(d); err != nil { + return nil, err + } + + return &template.DeadLetterQueue{ + Tries: d.Tries, }, nil } diff --git a/internal/pkg/deploy/cloudformation/stack/transformers_test.go b/internal/pkg/deploy/cloudformation/stack/transformers_test.go index 6e005855954..15b057971e5 100644 --- a/internal/pkg/deploy/cloudformation/stack/transformers_test.go +++ b/internal/pkg/deploy/cloudformation/stack/transformers_test.go @@ -547,8 +547,8 @@ func Test_convertAutoscaling(t *testing.T) { func Test_convertHTTPHealthCheck(t *testing.T) { // These are used by reference to represent the output of the manifest.durationp function. - duration15Seconds := time.Duration(15 * time.Second) - duration60Seconds := time.Duration(60 * time.Second) + duration15Seconds := 15 * time.Second + duration60Seconds := 60 * time.Second testCases := map[string]struct { inputPath *string inputSuccessCodes *string @@ -1565,7 +1565,14 @@ func Test_convertPublish(t *testing.T) { } func Test_convertSubscribe(t *testing.T) { - validTopics := []string{"arn:aws:us-east-1:123456789012:app-env-svc-name", "arn:aws:us-east-1:123456789012:app-env-svc-name2"} + validTopics := []string{"arn:aws:sns:us-west-2:123456789123:app-env-svc-name", "arn:aws:sns:us-west-2:123456789123:app-env-svc-name2"} + accountId := "123456789123" + region := "us-west-2" + app := "app" + env := "env" + svc := "svc" + duration111Seconds := 111 * time.Second + duration5Days := 120 * time.Hour testCases := map[string]struct { inSubscribe *manifest.SubscribeConfig @@ -1584,7 +1591,7 @@ func Test_convertSubscribe(t *testing.T) { }, wantedError: fmt.Errorf(`invalid topic subscription "": %w`, errMissingPublishTopicField), }, - "valid publish": { + "valid subscribe": { inSubscribe: &manifest.SubscribeConfig{ Topics: &[]manifest.TopicSubscription{ { @@ -1592,6 +1599,20 @@ func Test_convertSubscribe(t *testing.T) { Service: "svc", }, }, + Queue: &manifest.SQSQueue{ + Retention: &duration111Seconds, + Delay: &duration111Seconds, + Timeout: &duration111Seconds, + DeadLetter: &manifest.DeadLetterQueue{ + Tries: aws.Uint16(35), + }, + FIFO: &manifest.FIFOOrBool{ + Enabled: aws.Bool(true), + FIFO: manifest.FIFOQueue{ + HighThroughput: aws.Bool(false), + }, + }, + }, }, wanted: &template.SubscribeOpts{ Topics: []*template.TopicSubscription{ @@ -1600,6 +1621,37 @@ func Test_convertSubscribe(t *testing.T) { Service: aws.String("svc"), }, }, + Queue: &template.SQSQueue{ + Retention: aws.Int64(111), + Delay: aws.Int64(111), + Timeout: aws.Int64(111), + DeadLetter: &template.DeadLetterQueue{ + Tries: aws.Uint16(35), + }, + FIFO: &template.FIFOQueue{ + HighThroughput: false, + }, + }, + }, + }, + "valid subscribe with minimal queue": { + inSubscribe: &manifest.SubscribeConfig{ + Topics: &[]manifest.TopicSubscription{ + { + Name: "name", + Service: "svc", + }, + }, + Queue: &manifest.SQSQueue{}, + }, + wanted: &template.SubscribeOpts{ + Topics: []*template.TopicSubscription{ + { + Name: aws.String("name"), + Service: aws.String("svc"), + }, + }, + Queue: &template.SQSQueue{}, }, }, "invalid topic name": { @@ -1635,15 +1687,96 @@ func Test_convertSubscribe(t *testing.T) { }, wantedError: fmt.Errorf(`invalid topic subscription "topic1": %w`, errTopicSubscriptionNotAllowed), }, + "sneaky topic not allowed": { + inSubscribe: &manifest.SubscribeConfig{ + Topics: &[]manifest.TopicSubscription{ + { + Name: "sneakytopic", + Service: "svc-name", + }, + }, + }, + wantedError: fmt.Errorf(`invalid topic subscription "sneakytopic": %w`, errTopicSubscriptionNotAllowed), + }, + "subscribe queue delay invalid": { + inSubscribe: &manifest.SubscribeConfig{ + Topics: &[]manifest.TopicSubscription{ + { + Name: "name", + Service: "svc", + }, + }, + Queue: &manifest.SQSQueue{ + Delay: &duration5Days, + }, + }, + wantedError: fmt.Errorf("`delay` must be between 0s and 15m0s"), + }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { - got, err := convertSubscribe(tc.inSubscribe, validTopics) + got, err := convertSubscribe(tc.inSubscribe, validTopics, accountId, region, app, env, svc) if tc.wantedError != nil { require.EqualError(t, err, tc.wantedError.Error()) } else { - require.Equal(t, got, tc.wanted) + require.Equal(t, tc.wanted, got) } }) } } + +func Test_convertFIFO(t *testing.T) { + testCases := map[string]struct { + inFIFO *manifest.FIFOOrBool + + wanted *template.FIFOQueue + wantedError error + }{ + "empty FIFO": { + inFIFO: &manifest.FIFOOrBool{}, + wanted: nil, + }, + "FIFO with enabled false": { + inFIFO: &manifest.FIFOOrBool{ + Enabled: aws.Bool(false), + }, + wanted: nil, + }, + "FIFO with enabled true and no high throughput": { + inFIFO: &manifest.FIFOOrBool{ + Enabled: aws.Bool(true), + }, + wanted: &template.FIFOQueue{ + HighThroughput: false, + }, + }, + "FIFO with enabled true and high throughput false": { + inFIFO: &manifest.FIFOOrBool{ + Enabled: aws.Bool(true), + FIFO: manifest.FIFOQueue{ + HighThroughput: aws.Bool(false), + }, + }, + wanted: &template.FIFOQueue{ + HighThroughput: false, + }, + }, + "FIFO with enabled true and high throughput true": { + inFIFO: &manifest.FIFOOrBool{ + Enabled: aws.Bool(true), + FIFO: manifest.FIFOQueue{ + HighThroughput: aws.Bool(true), + }, + }, + wanted: &template.FIFOQueue{ + HighThroughput: true, + }, + }, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + got := convertFIFO(tc.inFIFO) + require.Equal(t, tc.wanted, got) + }) + } +} diff --git a/internal/pkg/deploy/cloudformation/stack/validate.go b/internal/pkg/deploy/cloudformation/stack/validate.go index 2f9c8012a56..d68449a3eb0 100644 --- a/internal/pkg/deploy/cloudformation/stack/validate.go +++ b/internal/pkg/deploy/cloudformation/stack/validate.go @@ -7,9 +7,10 @@ import ( "errors" "fmt" "regexp" - "strings" + "time" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/arn" "github.com/aws/copilot-cli/internal/pkg/manifest" ) @@ -28,6 +29,7 @@ var ( errNoSourceVolume = errors.New("`source_volume` cannot be empty") errEmptyEFSConfig = errors.New("bad EFS configuration: `efs` cannot be empty") errMissingPublishTopicField = errors.New("field `publish.topics[].name` cannot be empty") + errDeadLetterQueueTries = fmt.Errorf("DeadLetter `tries` field cannot exceed %d", deadLetterTriesMaxValue) ) // Conditional errors. @@ -50,14 +52,14 @@ var ( errTopicSubscriptionNotAllowed = errors.New("topic not in list of topics available to subscribe to") ) -// Container dependency status options +// Container dependency status options. var ( essentialContainerValidStatuses = []string{dependsOnStart, dependsOnHealthy} dependsOnValidStatuses = []string{dependsOnStart, dependsOnComplete, dependsOnSuccess, dependsOnHealthy} sidecarDependsOnValidStatuses = []string{dependsOnStart, dependsOnComplete, dependsOnSuccess} ) -// Regex options +// Regex options. var ( awsSNSTopicRegexp = regexp.MustCompile(`^[a-zA-Z0-9_-]*$`) // Validates that an expression contains only letters, numbers, underscores, and hyphens. awsNameRegexp = regexp.MustCompile(`^[a-z][a-z0-9\-]+$`) // Validates that an expression starts with a letter and only contains letters, numbers, and hyphens. @@ -65,6 +67,12 @@ var ( trailingPunctRegExp = regexp.MustCompile(`[\-\.]$`) // Check for trailing dash or dot. ) +// Options for SQS Queues. +var ( + resourceNameFormat = "%s-%s-%s-%s" // Format for copilot resource names of form app-env-svc-name + deadLetterTriesMaxValue = 1000 +) + // Validate that paths contain only an approved set of characters to guard against command injection. // We can accept 0-9A-Za-z-_. func validatePath(input string, maxLength int) error { @@ -463,7 +471,7 @@ func isCorrectSvcNameFormat(s string) bool { return len(trailingMatch) == 0 } -func validateTopicSubscription(ts manifest.TopicSubscription, validTopicARNs []string) error { +func validateTopicSubscription(ts manifest.TopicSubscription, validTopicARNs []string, app, env string) error { if err := validatePubSubName(ts.Name); err != nil { return err } @@ -473,17 +481,33 @@ func validateTopicSubscription(ts manifest.TopicSubscription, validTopicARNs []s } // Check that the topic is included in the list of available topics + topicName := fmt.Sprintf(resourceNameFormat, app, env, ts.Service, ts.Name) for _, topicARN := range validTopicARNs { - splitArn := strings.Split(topicARN, ":") - topicName := strings.Split(splitArn[len(splitArn)-1], "-") - if len(topicName) < 4 { + arn, err := arn.Parse(topicARN) + if err != nil { continue } + validTopicName := arn.Resource - if topicName[2] == ts.Service && topicName[3] == ts.Name { + if validTopicName == topicName { return nil } } return errTopicSubscriptionNotAllowed } + +func validateTime(t, floor, ceiling time.Duration) error { + if t < floor || t > ceiling { + return fmt.Errorf("must be between %v and %v", floor, ceiling) + } + + return nil +} + +func validateDeadLetter(dl *manifest.DeadLetterQueue) error { + if aws.Uint16Value(dl.Tries) > uint16(deadLetterTriesMaxValue) { + return errDeadLetterQueueTries + } + return nil +} diff --git a/internal/pkg/deploy/cloudformation/stack/validate_test.go b/internal/pkg/deploy/cloudformation/stack/validate_test.go index 92f4a12c264..1ff7d845079 100644 --- a/internal/pkg/deploy/cloudformation/stack/validate_test.go +++ b/internal/pkg/deploy/cloudformation/stack/validate_test.go @@ -4,8 +4,10 @@ package stack import ( + "errors" "fmt" "testing" + "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/copilot-cli/internal/pkg/manifest" @@ -567,9 +569,11 @@ func TestValidateWorkerName(t *testing.T) { } func TestValidateTopicSubscription(t *testing.T) { - validTopics := []string{"arn:aws:us-east-1:123456789012:app-env-svc-name", "arn:aws:us-east-1:123456789012:app-env-svc-name2"} + app := "app" + env := "env" testCases := map[string]struct { - inTS manifest.TopicSubscription + inTS manifest.TopicSubscription + inValidTopics []string wantErr error }{ @@ -578,32 +582,112 @@ func TestValidateTopicSubscription(t *testing.T) { Name: "name2", Service: "svc", }, - wantErr: nil, + inValidTopics: []string{"arn:aws:sns:us-east-1:123456789012:app-env-svc-name", "arn:aws:sns:us-east-1:123456789012:app-env-svc-name2"}, + wantErr: nil, }, "empty name": { inTS: manifest.TopicSubscription{ Service: "svc", }, - wantErr: errMissingPublishTopicField, + inValidTopics: []string{"arn:aws:sns:us-east-1:123456789012:app-env-svc-name", "arn:aws:sns:us-east-1:123456789012:app-env-svc-name2"}, + wantErr: errMissingPublishTopicField, }, "empty svc name": { inTS: manifest.TopicSubscription{ Name: "theName", }, - wantErr: errInvalidSvcName, + inValidTopics: []string{"arn:aws:sns:us-east-1:123456789012:app-env-svc-name", "arn:aws:sns:us-east-1:123456789012:app-env-svc-name2"}, + wantErr: errInvalidSvcName, }, "topic not in list of valid topics": { inTS: manifest.TopicSubscription{ Name: "badName", Service: "svc", }, - wantErr: errTopicSubscriptionNotAllowed, + inValidTopics: []string{"arn:aws:sns:us-east-1:123456789012:app-env-svc-name", "arn:aws:sns:us-east-1:123456789012:app-env-svc-name2"}, + wantErr: errTopicSubscriptionNotAllowed, + }, + "topic in list of valid topics but one cannot be parsed": { + inTS: manifest.TopicSubscription{ + Name: "name2", + Service: "svc", + }, + inValidTopics: []string{"arn:aws:sns:us-east-1:123456789012:app-env-svc-name", "", "arn:aws:sns:us-east-1:123456789012:app-env-svc-name2"}, + wantErr: nil, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + err := validateTopicSubscription(tc.inTS, tc.inValidTopics, app, env) + + if tc.wantErr == nil { + require.NoError(t, err) + } else { + require.EqualError(t, err, tc.wantErr.Error()) + } + }) + } +} + +func TestValidateTime(t *testing.T) { + testCases := map[string]struct { + inTime time.Duration + inFloor time.Duration + inCeiling time.Duration + + wantErr error + }{ + "good case": { + inTime: 500 * time.Second, + inFloor: 0 * time.Second, + inCeiling: 600 * time.Second, + wantErr: nil, + }, + "bad time": { + inTime: 500 * time.Hour, + inFloor: 0 * time.Second, + inCeiling: 600 * time.Second, + wantErr: errors.New("must be between 0s and 10m0s"), + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + err := validateTime(tc.inTime, tc.inFloor, tc.inCeiling) + + if tc.wantErr == nil { + require.NoError(t, err) + } else { + require.EqualError(t, err, tc.wantErr.Error()) + } + }) + } +} + +func TestValidateDeadLetter(t *testing.T) { + testCases := map[string]struct { + inDL *manifest.DeadLetterQueue + + wantErr error + }{ + "good case": { + inDL: &manifest.DeadLetterQueue{ + Tries: aws.Uint16(35), + }, + wantErr: nil, + }, + "wrong number of tries": { + inDL: &manifest.DeadLetterQueue{ + Tries: aws.Uint16(9999), + }, + wantErr: errDeadLetterQueueTries, }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { - err := validateTopicSubscription(tc.inTS, validTopics) + err := validateDeadLetter(tc.inDL) if tc.wantErr == nil { require.NoError(t, err) diff --git a/internal/pkg/manifest/worker_svc.go b/internal/pkg/manifest/worker_svc.go index e8b3369af52..9402a004466 100644 --- a/internal/pkg/manifest/worker_svc.go +++ b/internal/pkg/manifest/worker_svc.go @@ -4,15 +4,23 @@ package manifest import ( + "errors" + "time" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/copilot-cli/internal/pkg/template" "github.com/imdario/mergo" + "gopkg.in/yaml.v3" ) const ( workerSvcManifestPath = "workloads/services/worker/manifest.yml" ) +var ( + errUnmarshalFIFO = errors.New("cannot unmarshal field `fifo` under `subscribe`") +) + // WorkerService holds the configuration to create a worker service. type WorkerService struct { Workload `yaml:",inline"` @@ -44,12 +52,67 @@ type WorkerServiceProps struct { // SubscribeConfig represents the configurable options for setting up subscriptions. type SubscribeConfig struct { Topics *[]TopicSubscription `yaml:"topics"` + Queue *SQSQueue `yaml:"queue"` } // TopicSubscription represents the configurable options for setting up a SNS Topic Subscription. type TopicSubscription struct { - Name string `yaml:"name"` - Service string `yaml:"service"` + Name string `yaml:"name"` + Service string `yaml:"service"` + Queue *SQSQueue `yaml:"queue"` +} + +// SQSQueue represents the configurable options for setting up a SQS Queue. +type SQSQueue struct { + Retention *time.Duration `yaml:"retention"` + Delay *time.Duration `yaml:"delay"` + Timeout *time.Duration `yaml:"timeout"` + DeadLetter *DeadLetterQueue `yaml:"dead_letter"` + FIFO *FIFOOrBool `yaml:"fifo"` +} + +// DeadLetterQueue represents the configurable options for setting up a Dead-Letter Queue. +type DeadLetterQueue struct { + Tries *uint16 `yaml:"tries"` +} + +// FIFOOrBool contains custom unmarshaling logic for the `fifo` field in the manifest. +type FIFOOrBool struct { + FIFO FIFOQueue + Enabled *bool +} + +// FIFOQueue represents the configurable options for setting up a FIFO queue. +type FIFOQueue struct { + HighThroughput *bool `yaml:"high_throughput"` +} + +// IsEmpty returns empty if the struct has all zero members. +func (q *FIFOQueue) IsEmpty() bool { + return q.HighThroughput == nil +} + +// UnmarshalYAML implements the yaml(v2) interface. It allows FIFOQueue to be specified as a +// string or a struct alternately. +func (q *FIFOOrBool) UnmarshalYAML(unmarshal func(interface{}) error) error { + if err := unmarshal(&q.FIFO); err != nil { + switch err.(type) { + case *yaml.TypeError: + break + default: + return err + } + } + + if !q.FIFO.IsEmpty() { + q.Enabled = nil + return nil + } + + if err := unmarshal(&q.Enabled); err != nil { + return errUnmarshalFIFO + } + return nil } // NewWorkerService applies the props to a default Worker service configuration with diff --git a/internal/pkg/manifest/worker_svc_test.go b/internal/pkg/manifest/worker_svc_test.go index fb83228fcc0..8959207f23f 100644 --- a/internal/pkg/manifest/worker_svc_test.go +++ b/internal/pkg/manifest/worker_svc_test.go @@ -301,8 +301,8 @@ func TestWorkerSvc_ApplyEnv(t *testing.T) { Subscribe: &SubscribeConfig{ Topics: &[]TopicSubscription{ { - Name: "topicName", - Service: "bestService", + Name: "topicName2", + Service: "bestService2", }, }, }, @@ -410,6 +410,253 @@ func TestWorkerSvc_ApplyEnv(t *testing.T) { }, }, } + duration111Seconds := 111 * time.Second + mockWorkerServiceWithSubscribeNilOverride := WorkerService{ + Workload: Workload{ + Name: aws.String("phonetool"), + Type: aws.String(WorkerServiceType), + }, + WorkerServiceConfig: WorkerServiceConfig{ + Subscribe: &SubscribeConfig{ + Topics: &[]TopicSubscription{ + { + Name: "name", + Service: "svc", + Queue: &SQSQueue{ + Retention: &duration111Seconds, + Delay: &duration111Seconds, + Timeout: &duration111Seconds, + DeadLetter: &DeadLetterQueue{Tries: aws.Uint16(10)}, + FIFO: &FIFOOrBool{Enabled: aws.Bool(true)}, + }, + }, + }, + }, + }, + Environments: map[string]*WorkerServiceConfig{ + "test-sub": { + Subscribe: nil, + }, + }, + } + mockWorkerServiceWithNilSubscribeOverride := WorkerService{ + Workload: Workload{ + Name: aws.String("phonetool"), + Type: aws.String(WorkerServiceType), + }, + WorkerServiceConfig: WorkerServiceConfig{ + Subscribe: nil, + }, + Environments: map[string]*WorkerServiceConfig{ + "test-sub": { + Subscribe: &SubscribeConfig{ + Topics: &[]TopicSubscription{ + { + Name: "name", + Service: "svc", + Queue: &SQSQueue{ + Retention: &duration111Seconds, + Delay: &duration111Seconds, + Timeout: &duration111Seconds, + DeadLetter: &DeadLetterQueue{Tries: aws.Uint16(10)}, + FIFO: &FIFOOrBool{Enabled: aws.Bool(true)}, + }, + }, + }, + }, + }, + }, + } + mockWorkerServiceWithEmptySubscribeOverride := WorkerService{ + Workload: Workload{ + Name: aws.String("phonetool"), + Type: aws.String(WorkerServiceType), + }, + WorkerServiceConfig: WorkerServiceConfig{ + Subscribe: &SubscribeConfig{ + Topics: &[]TopicSubscription{ + { + Name: "name", + Service: "svc", + Queue: &SQSQueue{ + Retention: &duration111Seconds, + Delay: &duration111Seconds, + Timeout: &duration111Seconds, + DeadLetter: &DeadLetterQueue{Tries: aws.Uint16(10)}, + FIFO: &FIFOOrBool{Enabled: aws.Bool(true)}, + }, + }, + }, + }, + }, + Environments: map[string]*WorkerServiceConfig{ + "test-sub": { + Subscribe: &SubscribeConfig{}, + }, + }, + } + mockWorkerServiceWithSubscribeTopicNilOverride := WorkerService{ + Workload: Workload{ + Name: aws.String("phonetool"), + Type: aws.String(WorkerServiceType), + }, + WorkerServiceConfig: WorkerServiceConfig{ + Subscribe: &SubscribeConfig{ + Topics: nil, + }, + }, + Environments: map[string]*WorkerServiceConfig{ + "test-sub": { + Subscribe: &SubscribeConfig{ + Topics: &[]TopicSubscription{ + { + Name: "name", + Service: "svc", + Queue: &SQSQueue{ + Retention: &duration111Seconds, + Delay: &duration111Seconds, + Timeout: &duration111Seconds, + DeadLetter: &DeadLetterQueue{Tries: aws.Uint16(10)}, + FIFO: &FIFOOrBool{Enabled: aws.Bool(true)}, + }, + }, + }, + }, + }, + }, + } + mockWorkerServiceWithNilSubscribeTopicOverride := WorkerService{ + Workload: Workload{ + Name: aws.String("phonetool"), + Type: aws.String(WorkerServiceType), + }, + WorkerServiceConfig: WorkerServiceConfig{ + Subscribe: &SubscribeConfig{ + Topics: &[]TopicSubscription{ + { + Name: "name", + Service: "svc", + Queue: &SQSQueue{ + Retention: &duration111Seconds, + Delay: &duration111Seconds, + Timeout: &duration111Seconds, + DeadLetter: &DeadLetterQueue{Tries: aws.Uint16(10)}, + FIFO: &FIFOOrBool{Enabled: aws.Bool(true)}, + }, + }, + }, + }, + }, + Environments: map[string]*WorkerServiceConfig{ + "test-sub": { + Subscribe: &SubscribeConfig{ + Topics: nil, + }, + }, + }, + } + mockWorkerServiceWithSubscribeTopicEmptyOverride := WorkerService{ + Workload: Workload{ + Name: aws.String("phonetool"), + Type: aws.String(WorkerServiceType), + }, + WorkerServiceConfig: WorkerServiceConfig{ + Subscribe: &SubscribeConfig{ + Topics: &[]TopicSubscription{}, + }, + }, + Environments: map[string]*WorkerServiceConfig{ + "test-sub": { + Subscribe: &SubscribeConfig{ + Topics: &[]TopicSubscription{ + { + Name: "name", + Service: "svc", + Queue: &SQSQueue{ + Retention: &duration111Seconds, + Delay: &duration111Seconds, + Timeout: &duration111Seconds, + DeadLetter: &DeadLetterQueue{Tries: aws.Uint16(10)}, + FIFO: &FIFOOrBool{Enabled: aws.Bool(true)}, + }, + }, + }, + }, + }, + }, + } + mockWorkerServiceWithSubscribeQueueNilOverride := WorkerService{ + Workload: Workload{ + Name: aws.String("phonetool"), + Type: aws.String(WorkerServiceType), + }, + WorkerServiceConfig: WorkerServiceConfig{ + Subscribe: &SubscribeConfig{ + Queue: nil, + }, + }, + Environments: map[string]*WorkerServiceConfig{ + "test-sub": { + Subscribe: &SubscribeConfig{ + Queue: &SQSQueue{ + Retention: &duration111Seconds, + Delay: &duration111Seconds, + Timeout: &duration111Seconds, + DeadLetter: &DeadLetterQueue{Tries: aws.Uint16(10)}, + FIFO: &FIFOOrBool{Enabled: aws.Bool(true)}, + }, + }, + }, + }, + } + mockWorkerServiceWithNilSubscribeQueueOverride := WorkerService{ + Workload: Workload{ + Name: aws.String("phonetool"), + Type: aws.String(WorkerServiceType), + }, + WorkerServiceConfig: WorkerServiceConfig{ + Subscribe: &SubscribeConfig{ + Queue: &SQSQueue{ + Retention: &duration111Seconds, + Delay: &duration111Seconds, + Timeout: &duration111Seconds, + DeadLetter: &DeadLetterQueue{Tries: aws.Uint16(10)}, + FIFO: &FIFOOrBool{Enabled: aws.Bool(true)}, + }, + }, + }, + Environments: map[string]*WorkerServiceConfig{ + "test-sub": { + Subscribe: &SubscribeConfig{ + Queue: nil, + }, + }, + }, + } + mockWorkerServiceWithSubscribeQueueEmptyOverride := WorkerService{ + Workload: Workload{ + Name: aws.String("phonetool"), + Type: aws.String(WorkerServiceType), + }, + WorkerServiceConfig: WorkerServiceConfig{ + Subscribe: &SubscribeConfig{ + Queue: &SQSQueue{}, + }, + }, + Environments: map[string]*WorkerServiceConfig{ + "test-sub": { + Subscribe: &SubscribeConfig{ + Queue: &SQSQueue{ + Retention: &duration111Seconds, + Delay: &duration111Seconds, + Timeout: &duration111Seconds, + DeadLetter: &DeadLetterQueue{Tries: aws.Uint16(10)}, + FIFO: &FIFOOrBool{Enabled: aws.Bool(true)}, + }, + }, + }, + }, + } testCases := map[string]struct { svc *WorkerService inEnvName string @@ -486,8 +733,8 @@ func TestWorkerSvc_ApplyEnv(t *testing.T) { Subscribe: &SubscribeConfig{ Topics: &[]TopicSubscription{ { - Name: "topicName", - Service: "bestService", + Name: "topicName2", + Service: "bestService2", }, }, }, @@ -573,6 +820,258 @@ func TestWorkerSvc_ApplyEnv(t *testing.T) { }, original: &mockWorkerServiceWithImageOverrideLocationByBuild, }, + "with nil subscribe overriden by full subscribe": { + svc: &mockWorkerServiceWithNilSubscribeOverride, + inEnvName: "test-sub", + wanted: &WorkerService{ + Workload: Workload{ + Name: aws.String("phonetool"), + Type: aws.String(WorkerServiceType), + }, + WorkerServiceConfig: WorkerServiceConfig{ + Subscribe: &SubscribeConfig{ + Topics: &[]TopicSubscription{ + { + Name: "name", + Service: "svc", + Queue: &SQSQueue{ + Retention: &duration111Seconds, + Delay: &duration111Seconds, + Timeout: &duration111Seconds, + DeadLetter: &DeadLetterQueue{Tries: aws.Uint16(10)}, + FIFO: &FIFOOrBool{Enabled: aws.Bool(true)}, + }, + }, + }, + }, + }, + }, + + original: &mockWorkerServiceWithNilSubscribeOverride, + }, + "with full subscribe and nil subscribe env": { + svc: &mockWorkerServiceWithSubscribeNilOverride, + inEnvName: "test-sub", + wanted: &WorkerService{ + Workload: Workload{ + Name: aws.String("phonetool"), + Type: aws.String(WorkerServiceType), + }, + WorkerServiceConfig: WorkerServiceConfig{ + Subscribe: &SubscribeConfig{ + Topics: &[]TopicSubscription{ + { + Name: "name", + Service: "svc", + Queue: &SQSQueue{ + Retention: &duration111Seconds, + Delay: &duration111Seconds, + Timeout: &duration111Seconds, + DeadLetter: &DeadLetterQueue{Tries: aws.Uint16(10)}, + FIFO: &FIFOOrBool{Enabled: aws.Bool(true)}, + }, + }, + }, + }, + }, + }, + + original: &mockWorkerServiceWithSubscribeNilOverride, + }, + "with full subscribe and empty subscribe env": { + svc: &mockWorkerServiceWithEmptySubscribeOverride, + inEnvName: "test-sub", + wanted: &WorkerService{ + Workload: Workload{ + Name: aws.String("phonetool"), + Type: aws.String(WorkerServiceType), + }, + WorkerServiceConfig: WorkerServiceConfig{ + ImageConfig: ImageWithHealthcheck{ + Image: Image{}, + }, + Subscribe: &SubscribeConfig{ + Topics: &[]TopicSubscription{ + { + Name: "name", + Service: "svc", + Queue: &SQSQueue{ + Retention: &duration111Seconds, + Delay: &duration111Seconds, + Timeout: &duration111Seconds, + DeadLetter: &DeadLetterQueue{Tries: aws.Uint16(10)}, + FIFO: &FIFOOrBool{Enabled: aws.Bool(true)}, + }, + }, + }, + }, + }, + }, + + original: &mockWorkerServiceWithEmptySubscribeOverride, + }, + "with nil subscribe topic overriden by full subscribe topic": { + svc: &mockWorkerServiceWithNilSubscribeTopicOverride, + inEnvName: "test-sub", + wanted: &WorkerService{ + Workload: Workload{ + Name: aws.String("phonetool"), + Type: aws.String(WorkerServiceType), + }, + WorkerServiceConfig: WorkerServiceConfig{ + Subscribe: &SubscribeConfig{ + Topics: &[]TopicSubscription{ + { + Name: "name", + Service: "svc", + Queue: &SQSQueue{ + Retention: &duration111Seconds, + Delay: &duration111Seconds, + Timeout: &duration111Seconds, + DeadLetter: &DeadLetterQueue{Tries: aws.Uint16(10)}, + FIFO: &FIFOOrBool{Enabled: aws.Bool(true)}, + }, + }, + }, + }, + }, + }, + + original: &mockWorkerServiceWithNilSubscribeTopicOverride, + }, + "with full subscribe topic and nil subscribe topic env": { + svc: &mockWorkerServiceWithSubscribeTopicNilOverride, + inEnvName: "test-sub", + wanted: &WorkerService{ + Workload: Workload{ + Name: aws.String("phonetool"), + Type: aws.String(WorkerServiceType), + }, + WorkerServiceConfig: WorkerServiceConfig{ + Subscribe: &SubscribeConfig{ + Topics: &[]TopicSubscription{ + { + Name: "name", + Service: "svc", + Queue: &SQSQueue{ + Retention: &duration111Seconds, + Delay: &duration111Seconds, + Timeout: &duration111Seconds, + DeadLetter: &DeadLetterQueue{Tries: aws.Uint16(10)}, + FIFO: &FIFOOrBool{Enabled: aws.Bool(true)}, + }, + }, + }, + }, + }, + }, + + original: &mockWorkerServiceWithSubscribeTopicNilOverride, + }, + "with empty subscribe topic overriden by full subscribe topic": { + svc: &mockWorkerServiceWithSubscribeTopicEmptyOverride, + inEnvName: "test-sub", + wanted: &WorkerService{ + Workload: Workload{ + Name: aws.String("phonetool"), + Type: aws.String(WorkerServiceType), + }, + WorkerServiceConfig: WorkerServiceConfig{ + ImageConfig: ImageWithHealthcheck{ + Image: Image{}, + }, + Subscribe: &SubscribeConfig{ + Topics: &[]TopicSubscription{ + { + Name: "name", + Service: "svc", + Queue: &SQSQueue{ + Retention: &duration111Seconds, + Delay: &duration111Seconds, + Timeout: &duration111Seconds, + DeadLetter: &DeadLetterQueue{Tries: aws.Uint16(10)}, + FIFO: &FIFOOrBool{Enabled: aws.Bool(true)}, + }, + }, + }, + }, + }, + }, + + original: &mockWorkerServiceWithSubscribeTopicEmptyOverride, + }, + "with nil subscribe queue overriden by full subscribe queue": { + svc: &mockWorkerServiceWithNilSubscribeQueueOverride, + inEnvName: "test-sub", + wanted: &WorkerService{ + Workload: Workload{ + Name: aws.String("phonetool"), + Type: aws.String(WorkerServiceType), + }, + WorkerServiceConfig: WorkerServiceConfig{ + Subscribe: &SubscribeConfig{ + Queue: &SQSQueue{ + Retention: &duration111Seconds, + Delay: &duration111Seconds, + Timeout: &duration111Seconds, + DeadLetter: &DeadLetterQueue{Tries: aws.Uint16(10)}, + FIFO: &FIFOOrBool{Enabled: aws.Bool(true)}, + }, + }, + }, + }, + + original: &mockWorkerServiceWithNilSubscribeQueueOverride, + }, + "with full subscribe queue and nil subscribe queue env": { + svc: &mockWorkerServiceWithSubscribeQueueNilOverride, + inEnvName: "test-sub", + wanted: &WorkerService{ + Workload: Workload{ + Name: aws.String("phonetool"), + Type: aws.String(WorkerServiceType), + }, + WorkerServiceConfig: WorkerServiceConfig{ + Subscribe: &SubscribeConfig{ + Queue: &SQSQueue{ + Retention: &duration111Seconds, + Delay: &duration111Seconds, + Timeout: &duration111Seconds, + DeadLetter: &DeadLetterQueue{Tries: aws.Uint16(10)}, + FIFO: &FIFOOrBool{Enabled: aws.Bool(true)}, + }, + }, + }, + }, + + original: &mockWorkerServiceWithSubscribeQueueNilOverride, + }, + "with empty subscribe queue overriden by full subscribe queue": { + svc: &mockWorkerServiceWithSubscribeQueueEmptyOverride, + inEnvName: "test-sub", + wanted: &WorkerService{ + Workload: Workload{ + Name: aws.String("phonetool"), + Type: aws.String(WorkerServiceType), + }, + WorkerServiceConfig: WorkerServiceConfig{ + ImageConfig: ImageWithHealthcheck{ + Image: Image{}, + }, + Subscribe: &SubscribeConfig{ + Queue: &SQSQueue{ + Retention: &duration111Seconds, + Delay: &duration111Seconds, + Timeout: &duration111Seconds, + DeadLetter: &DeadLetterQueue{Tries: aws.Uint16(10)}, + FIFO: &FIFOOrBool{Enabled: aws.Bool(true)}, + }, + }, + }, + }, + + original: &mockWorkerServiceWithSubscribeQueueEmptyOverride, + }, } for name, tc := range testCases { @@ -746,3 +1245,61 @@ func TestWorkerSvc_ApplyEnv_CountOverrides(t *testing.T) { }) } } + +type testFIFO struct { + FIFO *FIFOOrBool `yaml:"fifo"` +} + +func Test_UnmarshalFifo(t *testing.T) { + testCases := map[string]struct { + manifest []byte + want testFIFO + wantErr error + }{ + "fifo specified": { + manifest: []byte(` +fifo: + high_throughput: true`), + want: testFIFO{ + FIFO: &FIFOOrBool{ + FIFO: FIFOQueue{ + HighThroughput: aws.Bool(true), + }, + }, + }, + }, + "enabled": { + manifest: []byte(` +fifo: true`), + want: testFIFO{ + FIFO: &FIFOOrBool{ + Enabled: aws.Bool(true), + }, + }, + }, + "invalid input": { + manifest: []byte(` +fifo: xyz`), + wantErr: errUnmarshalFIFO, + }, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + // GIVEN + v := testFIFO{ + FIFO: &FIFOOrBool{}, + } + + // WHEN + err := yaml.Unmarshal(tc.manifest, &v) + // THEN + if tc.wantErr == nil { + require.NoError(t, err) + require.Equal(t, tc.want.FIFO.Enabled, v.FIFO.Enabled) + require.Equal(t, tc.want.FIFO.FIFO.HighThroughput, v.FIFO.FIFO.HighThroughput) + } else { + require.EqualError(t, err, tc.wantErr.Error()) + } + }) + } +} diff --git a/internal/pkg/template/workload.go b/internal/pkg/template/workload.go index d1a7b04911c..5e9bd5a3d54 100644 --- a/internal/pkg/template/workload.go +++ b/internal/pkg/template/workload.go @@ -43,9 +43,7 @@ const ( // Constants for ARN options. const ( - snsArnPattern = "arn:%s:sns:%s:%s:%s-%s-%s-%s" - AWSPartition = "aws" - AWSChinaPartition = "aws-cn" + snsARNPattern = "arn:%s:sns:%s:%s:%s-%s-%s-%s" ) var ( @@ -237,12 +235,33 @@ type Topic struct { // SubscribeOpts holds configuration needed if the service has subscriptions. type SubscribeOpts struct { Topics []*TopicSubscription + Queue *SQSQueue } // TopicSubscription holds information needed to render a SNS Topic Subscription in a container definition. type TopicSubscription struct { Name *string Service *string + Queue *SQSQueue +} + +// SQSQueue holds information needed to render a SQS Queue in a container definition. +type SQSQueue struct { + Retention *int64 + Delay *int64 + Timeout *int64 + DeadLetter *DeadLetterQueue + FIFO *FIFOQueue +} + +// DeadLetterQueue holds information needed to render a dead-letter SQS Queue in a container definition. +type DeadLetterQueue struct { + Tries *uint16 +} + +// FIFOQueue holds information needed to specify a SQS Queue as FIFO in a container definition. +type FIFOQueue struct { + HighThroughput bool } // NetworkOpts holds AWS networking configuration for the workloads. @@ -440,7 +459,7 @@ func envControllerParameters(o WorkloadOpts) []string { return parameters } -// ARN determines the arn for a topic using the SNSTopic arn start and the name of the topic +// ARN determines the arn for a topic using the SNSTopic name and account information func (t Topic) ARN() string { - return fmt.Sprintf(snsArnPattern, t.Partition, t.Region, t.AccountID, t.App, t.Env, t.Svc, aws.StringValue(t.Name)) + return fmt.Sprintf(snsARNPattern, t.Partition, t.Region, t.AccountID, t.App, t.Env, t.Svc, aws.StringValue(t.Name)) }