diff --git a/pubsub/awssnssqs/awssnssqs.go b/pubsub/awssnssqs/awssnssqs.go index 7ce10bdbbf..feb709aa5e 100644 --- a/pubsub/awssnssqs/awssnssqs.go +++ b/pubsub/awssnssqs/awssnssqs.go @@ -150,7 +150,10 @@ type lazySessionOpener struct { err error } -func (o *lazySessionOpener) defaultOpener() (*URLOpener, error) { +func (o *lazySessionOpener) defaultOpener(u *url.URL) (*URLOpener, error) { + if gcaws.UseV2(u.Query()) { + return &URLOpener{UseV2: true}, nil + } o.init.Do(func() { sess, err := gcaws.NewDefaultSession() if err != nil { @@ -165,7 +168,7 @@ func (o *lazySessionOpener) defaultOpener() (*URLOpener, error) { } func (o *lazySessionOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error) { - opener, err := o.defaultOpener() + opener, err := o.defaultOpener(u) if err != nil { return nil, fmt.Errorf("open topic %v: failed to open default session: %v", u, err) } @@ -173,7 +176,7 @@ func (o *lazySessionOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubs } func (o *lazySessionOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error) { - opener, err := o.defaultOpener() + opener, err := o.defaultOpener(u) if err != nil { return nil, fmt.Errorf("open subscription %v: failed to open default session: %v", u, err) } @@ -200,7 +203,14 @@ const SQSScheme = "awssqs" // For SQS topics and subscriptions, the URL's host+path is prefixed with // "https://" to create the queue URL. // -// The following query parameters are supported: +// Use "awssdk=v1" to force using AWS SDK v1, "awssdk=v2" to force using AWS SDK v2, +// or anything else to accept the default. +// +// For V1, see gocloud.dev/aws/ConfigFromURLParams for supported query parameters +// for overriding the aws.Session from the URL. +// For V2, see gocloud.dev/aws/V2ConfigFromURLParams. +// +// In addition, the following query parameters are supported: // // - raw (for "awssqs" Subscriptions only): sets SubscriberOptions.Raw. The // value must be parseable by `strconv.ParseBool`. @@ -209,7 +219,11 @@ const SQSScheme = "awssqs" // See gocloud.dev/aws/ConfigFromURLParams for other query parameters // that affect the default AWS session. type URLOpener struct { + // UseV2 indicates whether the AWS SDK V2 should be used. + UseV2 bool + // ConfigProvider configures the connection to AWS. + // It must be set to a non-nil value if UseV2 is false. ConfigProvider client.ConfigProvider // TopicOptions specifies the options to pass to OpenTopic. @@ -220,6 +234,25 @@ type URLOpener struct { // OpenTopicURL opens a pubsub.Topic based on u. func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error) { + // Trim leading "/" if host is empty, so that + // awssns:///arn:aws:service:region:accountid:resourceType/resourcePath + // gives "arn:..." instead of "/arn:...". + topicARN := strings.TrimPrefix(path.Join(u.Host, u.Path), "/") + qURL := "https://" + path.Join(u.Host, u.Path) + if o.UseV2 { + cfg, err := gcaws.V2ConfigFromURLParams(ctx, u.Query()) + if err != nil { + return nil, fmt.Errorf("open topic %v: %v", u, err) + } + switch u.Scheme { + case SNSScheme: + return OpenSNSTopicV2(ctx, snsv2.NewFromConfig(cfg), topicARN, &o.TopicOptions), nil + case SQSScheme: + return OpenSQSTopicV2(ctx, sqsv2.NewFromConfig(cfg), qURL, &o.TopicOptions), nil + default: + return nil, fmt.Errorf("open topic %v: unsupported scheme", u) + } + } configProvider := &gcaws.ConfigOverrider{ Base: o.ConfigProvider, } @@ -230,13 +263,8 @@ func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic configProvider.Configs = append(configProvider.Configs, overrideCfg) switch u.Scheme { case SNSScheme: - // Trim leading "/" if host is empty, so that - // awssns:///arn:aws:service:region:accountid:resourceType/resourcePath - // gives "arn:..." instead of "/arn:...". - topicARN := strings.TrimPrefix(path.Join(u.Host, u.Path), "/") return OpenSNSTopic(ctx, configProvider, topicARN, &o.TopicOptions), nil case SQSScheme: - qURL := "https://" + path.Join(u.Host, u.Path) return OpenSQSTopic(ctx, configProvider, qURL, &o.TopicOptions), nil default: return nil, fmt.Errorf("open topic %v: unsupported scheme", u) @@ -245,9 +273,6 @@ func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic // OpenSubscriptionURL opens a pubsub.Subscription based on u. func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error) { - configProvider := &gcaws.ConfigOverrider{ - Base: o.ConfigProvider, - } // Clone the options since we might override Raw. opts := o.SubscriptionOptions q := u.Query() @@ -267,12 +292,22 @@ func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsu } q.Del("waittime") } + qURL := "https://" + path.Join(u.Host, u.Path) + if o.UseV2 { + cfg, err := gcaws.V2ConfigFromURLParams(ctx, q) + if err != nil { + return nil, fmt.Errorf("open subscription %v: %v", u, err) + } + return OpenSubscriptionV2(ctx, sqsv2.NewFromConfig(cfg), qURL, &opts), nil + } overrideCfg, err := gcaws.ConfigFromURLParams(q) if err != nil { return nil, fmt.Errorf("open subscription %v: %v", u, err) } + configProvider := &gcaws.ConfigOverrider{ + Base: o.ConfigProvider, + } configProvider.Configs = append(configProvider.Configs, overrideCfg) - qURL := "https://" + path.Join(u.Host, u.Path) return OpenSubscription(ctx, configProvider, qURL, &opts), nil } diff --git a/pubsub/awssnssqs/awssnssqs_test.go b/pubsub/awssnssqs/awssnssqs_test.go index ca5966dde7..9873b1f956 100644 --- a/pubsub/awssnssqs/awssnssqs_test.go +++ b/pubsub/awssnssqs/awssnssqs_test.go @@ -657,6 +657,8 @@ func TestOpenTopicFromURL(t *testing.T) { {"awssns:///arn:aws:service:region:accountid:resourceType/resourcePath", false}, // OK, setting region. {"awssns:///arn:aws:service:region:accountid:resourceType/resourcePath?region=us-east-2", false}, + // OK, setting usev2. + {"awssns:///arn:aws:service:region:accountid:resourceType/resourcePath?awssdk=v2", false}, // Invalid parameter. {"awssns:///arn:aws:service:region:accountid:resourceType/resourcePath?param=value", true}, @@ -665,6 +667,8 @@ func TestOpenTopicFromURL(t *testing.T) { {"awssqs://sqs.us-east-2.amazonaws.com/99999/my-queue", false}, // OK, setting region. {"awssqs://sqs.us-east-2.amazonaws.com/99999/my-queue?region=us-east-2", false}, + // OK, setting usev2. + {"awssqs://sqs.us-east-2.amazonaws.com/99999/my-queue?awssdk=v2", false}, // Invalid parameter. {"awssqs://sqs.us-east-2.amazonaws.com/99999/my-queue?param=value", true}, } @@ -698,6 +702,8 @@ func TestOpenSubscriptionFromURL(t *testing.T) { {"awssqs://sqs.us-east-2.amazonaws.com/99999/my-queue?raw=foo", true}, // OK, setting waittime. {"awssqs://sqs.us-east-2.amazonaws.com/99999/my-queue?waittime=5s", false}, + // OK, setting usev2. + {"awssqs://sqs.us-east-2.amazonaws.com/99999/my-queue?awssdk=v2", false}, // Invalid waittime. {"awssqs://sqs.us-east-2.amazonaws.com/99999/my-queue?waittime=foo", true}, // Invalid parameter.