Skip to content

Commit

Permalink
feat(pubsub): support payload wrapping for push subs (#8292)
Browse files Browse the repository at this point in the history
* feat(pubsub): support payload wrapping for push subs

* fix lint issues

* fix lint issues
  • Loading branch information
hongalex committed Jul 24, 2023
1 parent d3f60b3 commit fd49db5
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 18 deletions.
1 change: 1 addition & 0 deletions .github/workflows/vet.sh
Expand Up @@ -55,6 +55,7 @@ golint ./... 2>&1 | (
grep -vE " executeStreamingSql(Min|Rnd)Time" |
grep -vE " executeSql(Min|Rnd)Time" |
grep -vE "pubsub\/pstest\/fake\.go.+should have comment or be unexported" |
grep -vE "pubsub\/subscription\.go.+ type name will be used as pubsub.PubsubWrapper by other packages" |
grep -v "ClusterId" |
grep -v "InstanceId" |
grep -v "firestore.arrayUnion" |
Expand Down
5 changes: 5 additions & 0 deletions pubsub/pstest/fake.go
Expand Up @@ -498,6 +498,11 @@ func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*p
}
if ps.PushConfig == nil {
ps.PushConfig = &pb.PushConfig{}
} else if ps.PushConfig.Wrapper == nil {
// Wrapper should default to PubsubWrapper.
ps.PushConfig.Wrapper = &pb.PushConfig_PubsubWrapper_{
PubsubWrapper: &pb.PushConfig_PubsubWrapper{},
}
}
// Consider any table set to mean the config is active.
// We don't convert nil config to empty like with PushConfig above
Expand Down
3 changes: 3 additions & 0 deletions pubsub/pstest/fake_test.go
Expand Up @@ -1543,6 +1543,9 @@ func TestSubscriptionPushPull(t *testing.T) {
// Create a push subscription.
pc := &pb.PushConfig{
PushEndpoint: "some-endpoint",
Wrapper: &pb.PushConfig_PubsubWrapper_{
PubsubWrapper: &pb.PushConfig_PubsubWrapper{},
},
}
got := mustCreateSubscription(ctx, t, sclient, &pb.Subscription{
AckDeadlineSeconds: minAckDeadlineSecs,
Expand Down
70 changes: 67 additions & 3 deletions pubsub/subscription.go
Expand Up @@ -148,6 +148,10 @@ type PushConfig struct {
// This field is optional and should be set only by users interested in
// authenticated push.
AuthenticationMethod AuthenticationMethod

// The format of the delivered message to the push endpoint is defined by
// the chosen wrapper. When unset, `PubsubWrapper` is used.
Wrapper Wrapper
}

func (pc *PushConfig) toProto() *pb.PushConfig {
Expand All @@ -165,12 +169,19 @@ func (pc *PushConfig) toProto() *pb.PushConfig {
default: // TODO: add others here when GAIC adds more definitions.
}
}
if w := pc.Wrapper; w != nil {
switch wt := w.(type) {
case *PubsubWrapper:
pbCfg.Wrapper = wt.toProto()
case *NoWrapper:
pbCfg.Wrapper = wt.toProto()
default:
}
}
return pbCfg
}

// AuthenticationMethod is used by push points to verify the source of push requests.
// This interface defines fields that are part of a closed alpha that may not be accessible
// to all users.
// AuthenticationMethod is used by push subscriptions to verify the source of push requests.
type AuthenticationMethod interface {
isAuthMethod() bool
}
Expand Down Expand Up @@ -212,6 +223,49 @@ func (oidcToken *OIDCToken) toProto() *pb.PushConfig_OidcToken_ {
}
}

// Wrapper defines the format of message delivered to push endpoints.
type Wrapper interface {
isWrapper() bool
}

// PubsubWrapper denotes sending the payload to the push endpoint in the form of the JSON
// representation of a PubsubMessage
// (https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#pubsubmessage).
type PubsubWrapper struct{}

var _ Wrapper = (*PubsubWrapper)(nil)

func (p *PubsubWrapper) isWrapper() bool { return true }

func (p *PubsubWrapper) toProto() *pb.PushConfig_PubsubWrapper_ {
if p == nil {
return nil
}
return &pb.PushConfig_PubsubWrapper_{
PubsubWrapper: &pb.PushConfig_PubsubWrapper{},
}
}

// NoWrapper denotes not wrapping the payload sent to the push endpoint.
type NoWrapper struct {
WriteMetadata bool
}

var _ Wrapper = (*NoWrapper)(nil)

func (n *NoWrapper) isWrapper() bool { return true }

func (n *NoWrapper) toProto() *pb.PushConfig_NoWrapper_ {
if n == nil {
return nil
}
return &pb.PushConfig_NoWrapper_{
NoWrapper: &pb.PushConfig_NoWrapper{
WriteMetadata: n.WriteMetadata,
},
}
}

// BigQueryConfigState denotes the possible states for a BigQuery Subscription.
type BigQueryConfigState int

Expand Down Expand Up @@ -648,6 +702,16 @@ func protoToPushConfig(pbPc *pb.PushConfig) *PushConfig {
}
}
}
if w := pbPc.Wrapper; w != nil {
switch wt := w.(type) {
case *pb.PushConfig_PubsubWrapper_:
pc.Wrapper = &PubsubWrapper{}
case *pb.PushConfig_NoWrapper_:
pc.Wrapper = &NoWrapper{
WriteMetadata: wt.NoWrapper.WriteMetadata,
}
}
}
return pc
}

Expand Down
37 changes: 22 additions & 15 deletions pubsub/subscription_test.go
Expand Up @@ -154,7 +154,7 @@ func TestListTopicSubscriptions(t *testing.T) {

const defaultRetentionDuration = 168 * time.Hour

func TestUpdateSubscription(t *testing.T) {
func TestSubscriptionConfig(t *testing.T) {
ctx := context.Background()
client, srv := newFake(t)
defer client.Close()
Expand Down Expand Up @@ -191,13 +191,14 @@ func TestUpdateSubscription(t *testing.T) {
ServiceAccountEmail: "foo@example.com",
Audience: "client-12345",
},
Wrapper: &PubsubWrapper{},
},
EnableExactlyOnceDelivery: false,
State: SubscriptionStateActive,
}
opt := cmpopts.IgnoreUnexported(SubscriptionConfig{})
if !testutil.Equal(cfg, want, opt) {
t.Fatalf("\ngot %+v\nwant %+v", cfg, want)
if diff := testutil.Diff(cfg, want, opt); diff != "" {
t.Fatalf("compare subscription config mismatch, -got, +want\n%s", diff)
}

got, err := sub.Update(ctx, SubscriptionConfigToUpdate{
Expand All @@ -206,10 +207,13 @@ func TestUpdateSubscription(t *testing.T) {
Labels: map[string]string{"label": "value"},
ExpirationPolicy: 72 * time.Hour,
PushConfig: &PushConfig{
Endpoint: "https://example.com/push",
Endpoint: "https://example2.com/push",
AuthenticationMethod: &OIDCToken{
ServiceAccountEmail: "foo@example.com",
Audience: "client-12345",
ServiceAccountEmail: "bar@example.com",
Audience: "client-98765",
},
Wrapper: &NoWrapper{
WriteMetadata: true,
},
},
EnableExactlyOnceDelivery: true,
Expand All @@ -225,17 +229,20 @@ func TestUpdateSubscription(t *testing.T) {
Labels: map[string]string{"label": "value"},
ExpirationPolicy: 72 * time.Hour,
PushConfig: PushConfig{
Endpoint: "https://example.com/push",
Endpoint: "https://example2.com/push",
AuthenticationMethod: &OIDCToken{
ServiceAccountEmail: "foo@example.com",
Audience: "client-12345",
ServiceAccountEmail: "bar@example.com",
Audience: "client-98765",
},
Wrapper: &NoWrapper{
WriteMetadata: true,
},
},
EnableExactlyOnceDelivery: true,
State: SubscriptionStateActive,
}
if !testutil.Equal(got, want, opt) {
t.Fatalf("\ngot %+v\nwant %+v", got, want)
if diff := testutil.Diff(got, want, opt); diff != "" {
t.Fatalf("compare subscription config mismatch, -got, +want\n%s", diff)
}

got, err = sub.Update(ctx, SubscriptionConfigToUpdate{
Expand All @@ -247,8 +254,8 @@ func TestUpdateSubscription(t *testing.T) {
}
want.RetentionDuration = 2 * time.Hour
want.Labels = nil
if !testutil.Equal(got, want, opt) {
t.Fatalf("\ngot %+v\nwant %+v", got, want)
if diff := testutil.Diff(got, want, opt); diff != "" {
t.Fatalf("compare subscription config mismatch, -got, +want\n%s", diff)
}

_, err = sub.Update(ctx, SubscriptionConfigToUpdate{})
Expand All @@ -264,8 +271,8 @@ func TestUpdateSubscription(t *testing.T) {
t.Fatal(err)
}
want.ExpirationPolicy = time.Duration(0)
if !testutil.Equal(got, want, opt) {
t.Fatalf("\ngot %+v\nwant %+v", got, want)
if diff := testutil.Diff(got, want, opt); diff != "" {
t.Fatalf("compare subscription config mismatch, -got, +want\n%s", diff)
}
}

Expand Down

0 comments on commit fd49db5

Please sign in to comment.