Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,19 @@ subscribe:
topics:
- name: givesdogs
service: dogsvc
filter_policy:
store:
- example_corp
event:
- anything-but: order_cancelled
cutomer_interests:
- rugby
- football
- baseball
price_usd:
- numeric:
- ">="
- 100
- name: giveshuskies
service: dogsvc
queue:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,7 @@ Resources:
Properties:
TopicArn: !Join ['', [!Sub 'arn:${AWS::Partition}:sns:${AWS::Region}:${AWS::AccountId}:', !Ref AppName, '-', !Ref EnvName, '-dogsvc-givesdogs']]
Protocol: 'sqs'
FilterPolicy: {"cutomer_interests": ["rugby", "football", "baseball"], "event": [{"anything-but": "order_cancelled"}], "price_usd": [{"numeric": [">=", 100]}], "store": ["example_corp"]}
Endpoint: !GetAtt EventsQueue.Arn
dogsvcgiveshuskiesSNSTopicSubscription:
Metadata:
Expand Down
47 changes: 32 additions & 15 deletions internal/pkg/deploy/cloudformation/stack/transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package stack

import (
"encoding/json"
"fmt"
"hash/crc32"
"strconv"
Expand All @@ -17,7 +18,6 @@ import (
"github.com/aws/copilot-cli/internal/pkg/aws/s3"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/endpoints"

"github.com/aws/copilot-cli/internal/pkg/manifest"
"github.com/aws/copilot-cli/internal/pkg/template"
Expand Down Expand Up @@ -648,36 +648,53 @@ func convertPublish(topics []manifest.Topic, accountID, region, app, env, svc st
return &publishers, nil
}

func convertSubscribe(s manifest.SubscribeConfig, accountID, region, app, env, svc string) (*template.SubscribeOpts, error) {
func convertSubscribe(s manifest.SubscribeConfig) (*template.SubscribeOpts, error) {
if s.Topics == nil {
return nil, nil
}
sqsEndpoint, err := endpoints.DefaultResolver().EndpointFor(endpoints.SqsServiceID, region)
if err != nil {
return nil, err
}
var subscriptions template.SubscribeOpts
for _, sb := range s.Topics {
ts := convertTopicSubscription(sb, sqsEndpoint.URL, accountID, app, env, svc)
ts, err := convertTopicSubscription(sb)
if err != nil {
return nil, err
}
subscriptions.Topics = append(subscriptions.Topics, ts)
}
subscriptions.Queue = convertQueue(s.Queue)
return &subscriptions, nil
}

func convertTopicSubscription(t manifest.TopicSubscription, url, accountID, app, env, svc string) *template.TopicSubscription {
func convertTopicSubscription(t manifest.TopicSubscription) (
*template.TopicSubscription, error) {
filterPolicy, err := convertFilterPolicy(t.FilterPolicy)
if err != nil {
return nil, err
}
if aws.BoolValue(t.Queue.Enabled) {
return &template.TopicSubscription{
Name: t.Name,
Service: t.Service,
Queue: &template.SQSQueue{},
}
Name: t.Name,
Service: t.Service,
Queue: &template.SQSQueue{},
FilterPolicy: filterPolicy,
}, nil
}
return &template.TopicSubscription{
Name: t.Name,
Service: t.Service,
Queue: convertQueue(t.Queue.Advanced),
Name: t.Name,
Service: t.Service,
Queue: convertQueue(t.Queue.Advanced),
FilterPolicy: filterPolicy,
}, nil
}

func convertFilterPolicy(filterPolicy map[string]interface{}) (*string, error) {
if len(filterPolicy) == 0 {
return nil, nil
}
bytes, err := json.Marshal(filterPolicy)
if err != nil {
return nil, fmt.Errorf(`convert "filter_policy" to a JSON string: %w`, err)
}
return aws.String(string(bytes)), nil
}

func convertQueue(q manifest.SQSQueue) *template.SQSQueue {
Expand Down
18 changes: 9 additions & 9 deletions internal/pkg/deploy/cloudformation/stack/transformers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1308,12 +1308,10 @@ func Test_convertPublish(t *testing.T) {
}

func Test_convertSubscribe(t *testing.T) {
accountId := "123456789123"
region := "us-west-2"
app := "app"
env := "env"
svc := "svc"
duration111Seconds := 111 * time.Second
mockStruct := map[string]interface{}{
"store": []string{"example_corp"},
}
testCases := map[string]struct {
inSubscribe manifest.SubscribeConfig

Expand Down Expand Up @@ -1366,16 +1364,18 @@ func Test_convertSubscribe(t *testing.T) {
Queue: manifest.SQSQueueOrBool{
Enabled: aws.Bool(true),
},
FilterPolicy: mockStruct,
},
},
Queue: manifest.SQSQueue{},
},
wanted: &template.SubscribeOpts{
Topics: []*template.TopicSubscription{
{
Name: aws.String("name"),
Service: aws.String("svc"),
Queue: &template.SQSQueue{},
Name: aws.String("name"),
Service: aws.String("svc"),
Queue: &template.SQSQueue{},
FilterPolicy: aws.String(`{"store":["example_corp"]}`),
},
},
Queue: nil,
Expand All @@ -1384,7 +1384,7 @@ func Test_convertSubscribe(t *testing.T) {
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
got, err := convertSubscribe(tc.inSubscribe, accountId, region, app, env, svc)
got, err := convertSubscribe(tc.inSubscribe)
require.Equal(t, tc.wanted, got)
require.NoError(t, err)
})
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/deploy/cloudformation/stack/worker_svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (s *WorkerService) Template() (string, error) {
if err != nil {
return "", err
}
subscribe, err := convertSubscribe(s.manifest.Subscribe, s.rc.AccountID, s.rc.Region, s.app, s.env, s.name)
subscribe, err := convertSubscribe(s.manifest.Subscribe)
if err != nil {
return "", err
}
Expand Down
3 changes: 3 additions & 0 deletions internal/pkg/manifest/validate_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ func isValid(typ reflect.Type) error {
return nil
}
}
if typ.Kind() == reflect.Interface {
return nil
}
// For slice and map, validate its member type.
if typ.Kind() == reflect.Array || typ.Kind() == reflect.Slice || typ.Kind() == reflect.Map {
if err := isValid(typ.Elem()); err != nil {
Expand Down
12 changes: 7 additions & 5 deletions internal/pkg/manifest/worker_svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,14 @@ func (s *SubscribeConfig) IsEmpty() bool {

// TopicSubscription represents the configurable options for setting up a SNS Topic Subscription.
type TopicSubscription struct {
Name *string `yaml:"name"`
Service *string `yaml:"service"`
Queue SQSQueueOrBool `yaml:"queue"`
Name *string `yaml:"name"`
Service *string `yaml:"service"`
FilterPolicy map[string]interface{} `yaml:"filter_policy"`
Queue SQSQueueOrBool `yaml:"queue"`
}

// SQSQueueOrBool contains custom unmarshaling logic for the `queue` field in the manifest.
// SQSQueueOrBool is a custom type which supports unmarshaling yaml which
// can either be of type bool or type SQSQueue.
type SQSQueueOrBool struct {
Advanced SQSQueue
Enabled *bool
Expand All @@ -78,7 +80,7 @@ func (q *SQSQueueOrBool) IsEmpty() bool {
return q.Advanced.IsEmpty() && q.Enabled == nil
}

// UnmarshalYAML implements the yaml(v3) interface. It allows SQSQueue to be specified as a
// UnmarshalYAML implements the yaml(v3) interface. It allows SQSQueueOrBool to be specified as a
// string or a struct alternately.
func (q *SQSQueueOrBool) UnmarshalYAML(value *yaml.Node) error {
if err := value.Decode(&q.Advanced); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion internal/pkg/template/template_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ package template
import (
"encoding/json"
"fmt"
"github.com/aws/aws-sdk-go/aws/arn"
"regexp"
"strconv"
"strings"

"github.com/aws/aws-sdk-go/aws/arn"

"github.com/aws/aws-sdk-go/aws"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ QueuePolicy:
Properties:
TopicArn: !Join ['', [!Sub 'arn:${AWS::Partition}:sns:${AWS::Region}:${AWS::AccountId}:', !Ref AppName, '-', !Ref EnvName, '-{{$topic.Service}}-{{$topic.Name}}']]
Protocol: 'sqs'
{{- if $topic.FilterPolicy}}
FilterPolicy: {{$topic.FilterPolicy}}
{{- end}}
{{- if $topic.Queue}}
Endpoint: !GetAtt {{logicalIDSafe $topic.Service}}{{logicalIDSafe $topic.Name}}EventsQueue.Arn
{{- else}}
Expand Down
7 changes: 4 additions & 3 deletions internal/pkg/template/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,9 +385,10 @@ func (s *SubscribeOpts) HasTopicQueues() bool {

// TopicSubscription holds information needed to render a SNS Topic Subscription in a container definition.
type TopicSubscription struct {
Name *string
Service *string
Queue *SQSQueue
Name *string
Service *string
FilterPolicy *string
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to call it out here: I think we are probably fine with string instead of *string for a lot of these variables because "" being a string of length of 0 will result in false when evaluated in a if statement

Queue *SQSQueue
}

// SQSQueue holds information needed to render a SQS Queue in a container definition.
Expand Down