Skip to content

Commit

Permalink
pubsub/awssnssqs: add support for AWS SDK v2 for URL openers (google#…
Browse files Browse the repository at this point in the history
  • Loading branch information
vangent authored Oct 19, 2021
1 parent cfbf972 commit bb89706
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 13 deletions.
61 changes: 48 additions & 13 deletions pubsub/awssnssqs/awssnssqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,10 @@ type lazySessionOpener struct {
err error
}

func (o *lazySessionOpener) defaultOpener() (*URLOpener, error) {
func (o *lazySessionOpener) defaultOpener(u *url.URL) (*URLOpener, error) {
if gcaws.UseV2(u.Query()) {
return &URLOpener{UseV2: true}, nil
}
o.init.Do(func() {
sess, err := gcaws.NewDefaultSession()
if err != nil {
Expand All @@ -165,15 +168,15 @@ func (o *lazySessionOpener) defaultOpener() (*URLOpener, error) {
}

func (o *lazySessionOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error) {
opener, err := o.defaultOpener()
opener, err := o.defaultOpener(u)
if err != nil {
return nil, fmt.Errorf("open topic %v: failed to open default session: %v", u, err)
}
return opener.OpenTopicURL(ctx, u)
}

func (o *lazySessionOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error) {
opener, err := o.defaultOpener()
opener, err := o.defaultOpener(u)
if err != nil {
return nil, fmt.Errorf("open subscription %v: failed to open default session: %v", u, err)
}
Expand All @@ -200,7 +203,14 @@ const SQSScheme = "awssqs"
// For SQS topics and subscriptions, the URL's host+path is prefixed with
// "https://" to create the queue URL.
//
// The following query parameters are supported:
// Use "awssdk=v1" to force using AWS SDK v1, "awssdk=v2" to force using AWS SDK v2,
// or anything else to accept the default.
//
// For V1, see gocloud.dev/aws/ConfigFromURLParams for supported query parameters
// for overriding the aws.Session from the URL.
// For V2, see gocloud.dev/aws/V2ConfigFromURLParams.
//
// In addition, the following query parameters are supported:
//
// - raw (for "awssqs" Subscriptions only): sets SubscriberOptions.Raw. The
// value must be parseable by `strconv.ParseBool`.
Expand All @@ -209,7 +219,11 @@ const SQSScheme = "awssqs"
// See gocloud.dev/aws/ConfigFromURLParams for other query parameters
// that affect the default AWS session.
type URLOpener struct {
// UseV2 indicates whether the AWS SDK V2 should be used.
UseV2 bool

// ConfigProvider configures the connection to AWS.
// It must be set to a non-nil value if UseV2 is false.
ConfigProvider client.ConfigProvider

// TopicOptions specifies the options to pass to OpenTopic.
Expand All @@ -220,6 +234,25 @@ type URLOpener struct {

// OpenTopicURL opens a pubsub.Topic based on u.
func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error) {
// Trim leading "/" if host is empty, so that
// awssns:///arn:aws:service:region:accountid:resourceType/resourcePath
// gives "arn:..." instead of "/arn:...".
topicARN := strings.TrimPrefix(path.Join(u.Host, u.Path), "/")
qURL := "https://" + path.Join(u.Host, u.Path)
if o.UseV2 {
cfg, err := gcaws.V2ConfigFromURLParams(ctx, u.Query())
if err != nil {
return nil, fmt.Errorf("open topic %v: %v", u, err)
}
switch u.Scheme {
case SNSScheme:
return OpenSNSTopicV2(ctx, snsv2.NewFromConfig(cfg), topicARN, &o.TopicOptions), nil
case SQSScheme:
return OpenSQSTopicV2(ctx, sqsv2.NewFromConfig(cfg), qURL, &o.TopicOptions), nil
default:
return nil, fmt.Errorf("open topic %v: unsupported scheme", u)
}
}
configProvider := &gcaws.ConfigOverrider{
Base: o.ConfigProvider,
}
Expand All @@ -230,13 +263,8 @@ func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic
configProvider.Configs = append(configProvider.Configs, overrideCfg)
switch u.Scheme {
case SNSScheme:
// Trim leading "/" if host is empty, so that
// awssns:///arn:aws:service:region:accountid:resourceType/resourcePath
// gives "arn:..." instead of "/arn:...".
topicARN := strings.TrimPrefix(path.Join(u.Host, u.Path), "/")
return OpenSNSTopic(ctx, configProvider, topicARN, &o.TopicOptions), nil
case SQSScheme:
qURL := "https://" + path.Join(u.Host, u.Path)
return OpenSQSTopic(ctx, configProvider, qURL, &o.TopicOptions), nil
default:
return nil, fmt.Errorf("open topic %v: unsupported scheme", u)
Expand All @@ -245,9 +273,6 @@ func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic

// OpenSubscriptionURL opens a pubsub.Subscription based on u.
func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error) {
configProvider := &gcaws.ConfigOverrider{
Base: o.ConfigProvider,
}
// Clone the options since we might override Raw.
opts := o.SubscriptionOptions
q := u.Query()
Expand All @@ -267,12 +292,22 @@ func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsu
}
q.Del("waittime")
}
qURL := "https://" + path.Join(u.Host, u.Path)
if o.UseV2 {
cfg, err := gcaws.V2ConfigFromURLParams(ctx, q)
if err != nil {
return nil, fmt.Errorf("open subscription %v: %v", u, err)
}
return OpenSubscriptionV2(ctx, sqsv2.NewFromConfig(cfg), qURL, &opts), nil
}
overrideCfg, err := gcaws.ConfigFromURLParams(q)
if err != nil {
return nil, fmt.Errorf("open subscription %v: %v", u, err)
}
configProvider := &gcaws.ConfigOverrider{
Base: o.ConfigProvider,
}
configProvider.Configs = append(configProvider.Configs, overrideCfg)
qURL := "https://" + path.Join(u.Host, u.Path)
return OpenSubscription(ctx, configProvider, qURL, &opts), nil
}

Expand Down
6 changes: 6 additions & 0 deletions pubsub/awssnssqs/awssnssqs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,8 @@ func TestOpenTopicFromURL(t *testing.T) {
{"awssns:///arn:aws:service:region:accountid:resourceType/resourcePath", false},
// OK, setting region.
{"awssns:///arn:aws:service:region:accountid:resourceType/resourcePath?region=us-east-2", false},
// OK, setting usev2.
{"awssns:///arn:aws:service:region:accountid:resourceType/resourcePath?awssdk=v2", false},
// Invalid parameter.
{"awssns:///arn:aws:service:region:accountid:resourceType/resourcePath?param=value", true},

Expand All @@ -665,6 +667,8 @@ func TestOpenTopicFromURL(t *testing.T) {
{"awssqs://sqs.us-east-2.amazonaws.com/99999/my-queue", false},
// OK, setting region.
{"awssqs://sqs.us-east-2.amazonaws.com/99999/my-queue?region=us-east-2", false},
// OK, setting usev2.
{"awssqs://sqs.us-east-2.amazonaws.com/99999/my-queue?awssdk=v2", false},
// Invalid parameter.
{"awssqs://sqs.us-east-2.amazonaws.com/99999/my-queue?param=value", true},
}
Expand Down Expand Up @@ -698,6 +702,8 @@ func TestOpenSubscriptionFromURL(t *testing.T) {
{"awssqs://sqs.us-east-2.amazonaws.com/99999/my-queue?raw=foo", true},
// OK, setting waittime.
{"awssqs://sqs.us-east-2.amazonaws.com/99999/my-queue?waittime=5s", false},
// OK, setting usev2.
{"awssqs://sqs.us-east-2.amazonaws.com/99999/my-queue?awssdk=v2", false},
// Invalid waittime.
{"awssqs://sqs.us-east-2.amazonaws.com/99999/my-queue?waittime=foo", true},
// Invalid parameter.
Expand Down

0 comments on commit bb89706

Please sign in to comment.