Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Adding send mode to Receive Adapter for PullSubscription. #66

Merged
merged 4 commits into from
Jun 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 0 additions & 43 deletions cmd/pubsub/push_transformer/main.go

This file was deleted.

18 changes: 5 additions & 13 deletions cmd/pubsub/receive_adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,28 +56,20 @@ func main() {

ctx := logging.WithLogger(signals.NewContext(), logger)

var env envConfig
if err := envconfig.Process("", &env); err != nil {
startable := adapter.Adapter{}
if err := envconfig.Process("", &startable); err != nil {
logger.Fatal("Failed to process env var", zap.Error(err))
}

if env.Project == "" {
if startable.Project == "" {
project, err := metadata.ProjectID()
if err != nil {
logger.Fatal("failed to find project id. ", zap.Error(err))
}
env.Project = project
startable.Project = project
}

logger.Info("using project.", zap.String("project", env.Project))

startable := &adapter.Adapter{
ProjectID: env.Project,
TopicID: env.Topic,
SinkURI: env.Sink,
SubscriptionID: env.Subscription,
TransformerURI: env.Transformer,
}
logger.Info("using project.", zap.String("project", startable.Project))

logger.Info("Starting Pub/Sub Receive Adapter.", zap.Any("adapter", startable))
if err := startable.Start(ctx); err != nil {
Expand Down
8 changes: 8 additions & 0 deletions pkg/apis/pubsub/v1alpha1/pull_subscription_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ import (
)

func (s *PullSubscription) SetDefaults(ctx context.Context) {
if s.ObjectMeta.Annotations == nil {
s.ObjectMeta.Annotations = map[string]string{
PubSubModeAnnotation: PubSubModeCloudEventsBinary,
}
} else if _, ok := s.ObjectMeta.Annotations[PubSubModeAnnotation]; !ok {
s.ObjectMeta.Annotations[PubSubModeAnnotation] = PubSubModeCloudEventsBinary
}

s.Spec.SetDefaults(ctx)
}

Expand Down
49 changes: 44 additions & 5 deletions pkg/apis/pubsub/v1alpha1/pull_subscription_defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,54 @@ import (
"testing"

"github.com/google/go-cmp/cmp"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestPullSubscriptionDefaults(t *testing.T) {
want := &PullSubscription{Spec: PullSubscriptionSpec{}}
tests := []struct {
name string
start *PullSubscription
want *PullSubscription
}{{
name: "non-nil",
start: &PullSubscription{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{},
},
Spec: PullSubscriptionSpec{},
},
want: &PullSubscription{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
PubSubModeAnnotation: PubSubModeCloudEventsBinary,
},
},
Spec: PullSubscriptionSpec{},
},
}, {
name: "nil annotations",
start: &PullSubscription{
ObjectMeta: metav1.ObjectMeta{},
Spec: PullSubscriptionSpec{},
},
want: &PullSubscription{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
PubSubModeAnnotation: PubSubModeCloudEventsBinary,
},
},
Spec: PullSubscriptionSpec{},
},
}}

got := want.DeepCopy()
got.SetDefaults(context.Background())
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got := test.start
got.SetDefaults(context.Background())

if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("failed to get expected (-want, +got) = %v", diff)
if diff := cmp.Diff(test.want, got); diff != "" {
t.Errorf("failed to get expected (-want, +got) = %v", diff)
}
})
}
}
12 changes: 12 additions & 0 deletions pkg/apis/pubsub/v1alpha1/pull_subscription_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ type PullSubscription struct {
Status PullSubscriptionStatus `json:"status,omitempty"`
}

func (p *PullSubscription) PubSubMode() string {
if mode, ok := p.ObjectMeta.Annotations[PubSubModeAnnotation]; ok {
return mode
}
return ""
}

// Check that PullSubscription can be validated and can be defaulted.
var _ runtime.Object = (*PullSubscription)(nil)

Expand Down Expand Up @@ -85,6 +92,11 @@ const (
// PubSubEventType is the GcpPubSub CloudEvent type, in case PullSubscription
// doesn't send a CloudEvent itself.
PubSubEventType = "google.pubsub.topic.publish"

PubSubModeAnnotation = "pubsub.cloud.run/mode"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this would be clearer as format than mode? mode is a little vague.

PubSubModeCloudEventsBinary = "CloudEventsBinary"
PubSubModeCloudEventsStructured = "CloudEventsStructured"
PubSubModePushCompatible = "PushCompatible"
)

// PubSubEventSource returns the Cloud Pub/Sub CloudEvent source value.
Expand Down
30 changes: 28 additions & 2 deletions pkg/apis/pubsub/v1alpha1/pull_subscription_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ package v1alpha1
import (
"testing"

"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/google/go-cmp/cmp"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)

func TestPubSubEventSource(t *testing.T) {
Expand All @@ -48,3 +48,29 @@ func TestPullSubscriptionGetGroupVersionKind(t *testing.T) {
t.Errorf("failed to get expected (-want, +got) = %v", diff)
}
}

func TestPullSubscriptionPubSubMode_nil(t *testing.T) {
want := ""

c := &PullSubscription{}
got := c.PubSubMode()

if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("failed to get expected (-want, +got) = %v", diff)
}
}

func TestPullSubscriptionPubSubMode_set(t *testing.T) {
want := "test"

c := &PullSubscription{ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
PubSubModeAnnotation: "test",
},
}}
got := c.PubSubMode()

if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("failed to get expected (-want, +got) = %v", diff)
}
}
10 changes: 10 additions & 0 deletions pkg/apis/pubsub/v1alpha1/pull_subscription_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ var (
}
)

func TestPubSubCheckValidationFields(t *testing.T) {
obj := pullSubscriptionSpec.DeepCopy()

err := obj.Validate(context.TODO())

if err != nil {
t.Fatalf("Unexpected validation field error. Expected %v. Actual %v", nil, err)
}
}

func TestPubSubCheckImmutableFields(t *testing.T) {
testCases := map[string]struct {
orig interface{}
Expand Down
70 changes: 40 additions & 30 deletions pkg/pubsub/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,27 @@ import (
// Adapter implements the Pub/Sub adapter to deliver Pub/Sub messages from a
// pre-existing topic/subscription to a Sink.
type Adapter struct {
// ProjectID is the pre-existing eventing project id to use.
ProjectID string
// TopicID is the pre-existing eventing pub/sub topic id to use.
TopicID string
// SubscriptionID is the pre-existing eventing pub/sub subscription id to use.
SubscriptionID string
// SinkURI is the URI messages will be forwarded on to.
SinkURI string
// TransformerURI is the URI messages will be forwarded on to for any transformation
// before they are sent to SinkURI.
TransformerURI string

// SendMode describes how the adapter sends events. Default: Binary
SendMode ModeType
// Environment variable containing project id.
Project string `envconfig:"PROJECT_ID"`

// Environment variable containing the sink URI.
Sink string `envconfig:"SINK_URI" required:"true"`

// Environment variable containing the transformer URI.
Transformer string `envconfig:"TRANSFORMER_URI"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need Transformer here? What is its purpose if we have SendMode?


// Topic is the environment variable containing the PubSub Topic being
// subscribed to's name. In the form that is unique within the project.
// E.g. 'laconia', not 'projects/my-gcp-project/topics/laconia'.
Topic string `envconfig:"PUBSUB_TOPIC_ID" required:"true"`

// Subscription is the environment variable containing the name of the
// subscription to use.
Subscription string `envconfig:"PUBSUB_SUBSCRIPTION_ID" required:"true"`

// SendMode describes how the adapter sends events.
// One of [binary, structured, push]. Default: binary
SendMode ModeType `envconfig:"SEND_MODE" default:"binary" required:"true"`

// inbound is the cloudevents client to use to receive events.
inbound cloudevents.Client
Expand All @@ -67,6 +74,8 @@ const (
Binary ModeType = "binary"
// Structured mode is structured encoding.
Structured ModeType = "structured"
// Push mode emulates Pub/Sub push encoding.
Push ModeType = "push"
// DefaultSendMode is the default choice.
DefaultSendMode = Binary
)
Expand All @@ -88,15 +97,15 @@ func (a *Adapter) Start(ctx context.Context) error {

// Send events on HTTP.
if a.outbound == nil {
if a.outbound, err = a.newHTTPClient(a.SinkURI); err != nil {
if a.outbound, err = a.newHTTPClient(a.Sink); err != nil {
return fmt.Errorf("failed to create outbound cloudevent client: %s", err.Error())
}
}

// Make the transformer client in case the TransformerURI has been set.
if a.TransformerURI != "" {
if a.Transformer != "" {
if a.transformer == nil {
if a.transformer, err = kncloudevents.NewDefaultClient(a.TransformerURI); err != nil {
if a.transformer, err = kncloudevents.NewDefaultClient(a.Transformer); err != nil {
return fmt.Errorf("failed to create transformer cloudevent client: %s", err.Error())
}
}
Expand All @@ -106,7 +115,7 @@ func (a *Adapter) Start(ctx context.Context) error {
}

func (a *Adapter) receive(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error {
logger := logging.FromContext(ctx).With(zap.Any("event.id", event.ID()), zap.Any("sink", a.SinkURI))
logger := logging.FromContext(ctx).With(zap.Any("event.id", event.ID()), zap.Any("sink", a.Sink))

// If a transformer has been configured, then transform the message.
if a.transformer != nil {
Expand All @@ -125,6 +134,11 @@ func (a *Adapter) receive(ctx context.Context, event cloudevents.Event, resp *cl
event = *transformedEvent
}

// If send mode is Push, convert to Pub/Sub Push payload style.
if a.SendMode == Push {
event = ConvertToPush(ctx, event)
}

if r, err := a.outbound.Send(ctx, event); err != nil {
return err
} else if r != nil {
Expand All @@ -148,16 +162,12 @@ func (a *Adapter) convert(ctx context.Context, m transport.Message, err error) (
event.SetType(v1alpha1.PubSubEventType)
event.Data = msg.Data
event.DataEncoded = true

// The following is experimental additions to support converting pubsub
// messages received by the PullSubscription into a form that matches
// how Pub/Sub produces http push requests. These might change as
// more testing is done with this concept.
// Attributes are extensions.
if msg.Attributes != nil && len(msg.Attributes) > 0 {
event.SetExtension("attributes", msg.Attributes)
for k, v := range msg.Attributes {
event.SetExtension(k, v)
}
}
event.SetExtension("topic", tx.Topic)
event.SetExtension("subscription", tx.Subscription)

return &event, nil
}
Expand All @@ -166,9 +176,9 @@ func (a *Adapter) convert(ctx context.Context, m transport.Message, err error) (

func (a *Adapter) newPubSubClient(ctx context.Context) (cloudevents.Client, error) {
tOpts := []cepubsub.Option{
cepubsub.WithProjectID(a.ProjectID),
cepubsub.WithTopicID(a.TopicID),
cepubsub.WithSubscriptionID(a.SubscriptionID),
cepubsub.WithProjectID(a.Project),
cepubsub.WithTopicID(a.Topic),
cepubsub.WithSubscriptionID(a.Subscription),
}

// Make a pubsub transport for the CloudEvents client.
Expand All @@ -189,7 +199,7 @@ func (a *Adapter) newHTTPClient(target string) (cloudevents.Client, error) {
}

switch a.SendMode {
case Binary:
case Binary, Push:
tOpts = append(tOpts, cloudevents.WithBinaryEncoding())
case Structured:
tOpts = append(tOpts, cloudevents.WithStructuredEncoding())
Expand Down
Loading