Skip to content

Commit

Permalink
Add WithCustomAttributes for PubSub
Browse files Browse the repository at this point in the history
Enables to pre-fill PubSub Message Attributes independent of CloudEvent
spec.
Thus this allows for setting arbitrary Message Attributes (given that they
are not later overwritten by CloudEvent Attributes).

Signed-off-by: Andreas Bergmeier <andreas.bergmeier@otto.de>
  • Loading branch information
AndreasBergmeier6176 committed Oct 31, 2023
1 parent 0836a52 commit ed7be6b
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 2 deletions.
25 changes: 25 additions & 0 deletions protocol/pubsub/v2/attributes.go
@@ -0,0 +1,25 @@
/*
Copyright 2021 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/

package pubsub

import (
"context"

"github.com/cloudevents/sdk-go/v2/binding"
)

type withCustomAttributes struct{}

func AttributesFrom(ctx context.Context) map[string]string {
return binding.GetOrDefaultFromCtx(ctx, withCustomAttributes{}, make(map[string]string)).(map[string]string)
}

// WithCustomAttributes sets Message Attributes without any CloudEvent logic.
// Note that this function is not intended for CloudEvent Extensions or any `ce-`-prefixed Attributes.
// For these please see `Event` and `Event.SetExtension`.
func WithCustomAttributes(ctx context.Context, attrs map[string]string) context.Context {
return context.WithValue(ctx, withCustomAttributes{}, attrs)
}
3 changes: 3 additions & 0 deletions protocol/pubsub/v2/doc.go
Expand Up @@ -5,5 +5,8 @@

/*
Package pubsub implements a Pub/Sub binding using google.cloud.com/go/pubsub module
PubSub Messages can be modified beyond what CloudEvents cover by using `WithOrderingKey`
or `WithCustomAttributes`. See function docs for more details.
*/
package pubsub
4 changes: 3 additions & 1 deletion protocol/pubsub/v2/protocol.go
Expand Up @@ -110,7 +110,9 @@ func (t *Protocol) Send(ctx context.Context, in binding.Message, transformers ..

conn := t.getOrCreateConnection(ctx, topic, "", "")

msg := &pubsub.Message{}
msg := &pubsub.Message{
Attributes: AttributesFrom(ctx),
}

if key, ok := ctx.Value(withOrderingKey{}).(string); ok {
if !t.MessageOrdering {
Expand Down
69 changes: 69 additions & 0 deletions protocol/pubsub/v2/protocol_test.go
Expand Up @@ -3,14 +3,17 @@ package pubsub
import (
"context"
"fmt"
"reflect"
"testing"

"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsub/apiv1/pubsubpb"
"cloud.google.com/go/pubsub/pstest"
"github.com/stretchr/testify/require"
"google.golang.org/api/option"
pb "google.golang.org/genproto/googleapis/pubsub/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/cloudevents/sdk-go/v2/test"
)
Expand All @@ -20,6 +23,32 @@ type testPubsubClient struct {
conn *grpc.ClientConn
}

func (pc *testPubsubClient) NewWithAttributesInterceptor(ctx context.Context, projectID, orderingKey string) (*pubsub.Client, error) {
pc.srv = pstest.NewServer()
conn, err := grpc.Dial(pc.srv.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithUnaryInterceptor(customAttributesInterceptor(map[string]string{
"Content-Type": "text/json",
"ce-dataschema": "http://example.com/schema",
"ce-exbinary": "AAECAw==",
"ce-exbool": "true",
"ce-exint": "42",
"ce-exstring": "exstring",
"ce-extime": "2020-03-21T12:34:56.78Z",
"ce-exurl": "http://example.com/source",
"ce-id": "full-event",
"ce-source": "http://example.com/source",
"ce-specversion": "1.0",
"ce-subject": "topic",
"ce-time": "2020-03-21T12:34:56.78Z",
"ce-type": "com.example.FullEvent",
"Proxy-Authorization": "YWxhZGRpbjpvcGVuc2VzYW1l",
})))
if err != nil {
return nil, err
}
pc.conn = conn
return pubsub.NewClient(ctx, projectID, option.WithGRPCConn(conn))
}

func (pc *testPubsubClient) NewWithOrderInterceptor(ctx context.Context, projectID, orderingKey string) (*pubsub.Client, error) {
pc.srv = pstest.NewServer()
conn, err := grpc.Dial(pc.srv.Addr, grpc.WithInsecure(), grpc.WithUnaryInterceptor(orderingKeyInterceptor(orderingKey)))
Expand All @@ -35,6 +64,20 @@ func (pc *testPubsubClient) Close() {
pc.conn.Close()
}

func customAttributesInterceptor(attrs map[string]string) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
if method == "/google.pubsub.v1.Publisher/Publish" {
pr, _ := req.(*pubsubpb.PublishRequest)
for _, m := range pr.Messages {
if !reflect.DeepEqual(m.Attributes, attrs) {
return fmt.Errorf("expecting Attributes %q, got %q", attrs, m.Attributes)
}
}
}
return invoker(ctx, method, req, reply, cc, opts...)
}
}

func orderingKeyInterceptor(orderingKey string) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
if method == "/google.pubsub.v1.Publisher/Publish" {
Expand All @@ -49,6 +92,32 @@ func orderingKeyInterceptor(orderingKey string) grpc.UnaryClientInterceptor {
}
}

func TestPublishMessageHasCustomAttributes(t *testing.T) {
require := require.New(t)
ctx := context.Background()
pc := &testPubsubClient{}
defer pc.Close()

projectID, topicID, orderingKey := "test-project", "test-topic", "foobar"

client, err := pc.NewWithAttributesInterceptor(ctx, projectID, orderingKey)
require.NoError(err, "create pubsub client")
defer client.Close()

prot, err := New(ctx,
WithClient(client),
WithProjectID(projectID),
WithTopicID(topicID),
AllowCreateTopic(true),
)
require.NoError(err, "create protocol")

err = prot.Send(WithCustomAttributes(ctx, map[string]string{
"Proxy-Authorization": "YWxhZGRpbjpvcGVuc2VzYW1l",
}), test.FullMessage())
require.NoError(err)
}

func TestPublishMessageHasOrderingKey(t *testing.T) {
require := require.New(t)
ctx := context.Background()
Expand Down
1 change: 0 additions & 1 deletion protocol/pubsub/v2/write_pubsub_message.go
Expand Up @@ -46,7 +46,6 @@ func (b *pubsubMessagePublisher) SetStructuredEvent(ctx context.Context, f forma
}

func (b *pubsubMessagePublisher) Start(ctx context.Context) error {
b.Attributes = make(map[string]string)
return nil
}

Expand Down

0 comments on commit ed7be6b

Please sign in to comment.