Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support ACK when receiving malformed events #906

Merged
merged 2 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion v2/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ type ceClient struct {
eventDefaulterFns []EventDefaulter
pollGoroutines int
blockingCallback bool
ackMalformedEvent bool
}

func (c *ceClient) applyOptions(opts ...Option) error {
Expand Down Expand Up @@ -202,7 +203,13 @@ func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error {
return fmt.Errorf("client already has a receiver")
}

invoker, err := newReceiveInvoker(fn, c.observabilityService, c.inboundContextDecorators, c.eventDefaulterFns...)
invoker, err := newReceiveInvoker(
fn,
c.observabilityService,
c.inboundContextDecorators,
c.eventDefaulterFns,
c.ackMalformedEvent,
)
if err != nil {
return err
}
Expand Down
87 changes: 87 additions & 0 deletions v2/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (

"github.com/google/go-cmp/cmp"

"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/binding/test"
"github.com/cloudevents/sdk-go/v2/client"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/cloudevents/sdk-go/v2/protocol"
Expand Down Expand Up @@ -399,6 +401,68 @@ func TestClientContext(t *testing.T) {
wg.Wait()
}

func TestClientStartReceiverWithAckMalformedEvent(t *testing.T) {
testCases := []struct {
name string
opts []client.Option
expectedAck bool
}{
{
name: "without ack",
},
{
name: "with ack",
opts: []client.Option{client.WithAckMalformedEvent()},
expectedAck: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// make sure the receiver goroutine is closed
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

receiver := &mockReceiver{
finished: make(chan struct{}),
}

// only need 1 goroutine to exercise this
tc.opts = append(tc.opts, client.WithPollGoroutines(1))

c, err := client.New(receiver, tc.opts...)
if err != nil {
t.Fatalf("failed to construct client: %v", err)
}

go c.StartReceiver(ctx, func(ctx context.Context, e event.Event) protocol.Result {
t.Error("receiver callback called unexpectedly")
return nil
})

ctx, cancelTimeout := context.WithTimeout(ctx, time.Second)
defer cancelTimeout()

select {
case <-receiver.finished:
// continue to rest of the test
case <-ctx.Done():
t.Fatalf("timed out waiting for receiver to complete")
}

if tc.expectedAck {
if protocol.IsNACK(receiver.result) {
t.Errorf("receiver did not receive ACK: %v", receiver.result)
}
} else {
if protocol.IsACK(receiver.result) {
t.Errorf("receiver did not receive NACK: %v", receiver.result)
}
}
})
}
}

type requestValidation struct {
Host string
Headers http.Header
Expand Down Expand Up @@ -488,3 +552,26 @@ func isImportantHeader(h string) bool {
}
return true
}

type mockReceiver struct {
mu sync.Mutex
count int
result error
finished chan struct{}
}

func (m *mockReceiver) Receive(ctx context.Context) (binding.Message, error) {
m.mu.Lock()
defer m.mu.Unlock()

if m.count > 0 {
return nil, io.EOF
}

m.count++

return binding.WithFinish(test.UnknownMessage, func(err error) {
m.result = err
close(m.finished)
}), nil
}
2 changes: 1 addition & 1 deletion v2/client/http_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

func NewHTTPReceiveHandler(ctx context.Context, p *thttp.Protocol, fn interface{}) (*EventReceiver, error) {
invoker, err := newReceiveInvoker(fn, noopObservabilityService{}, nil) //TODO(slinkydeveloper) maybe not nil?
invoker, err := newReceiveInvoker(fn, noopObservabilityService{}, nil, nil, false) //TODO(slinkydeveloper) maybe not nil?
if err != nil {
return nil, err
}
Expand Down
14 changes: 11 additions & 3 deletions v2/client/invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,18 @@ type Invoker interface {

var _ Invoker = (*receiveInvoker)(nil)

func newReceiveInvoker(fn interface{}, observabilityService ObservabilityService, inboundContextDecorators []func(context.Context, binding.Message) context.Context, fns ...EventDefaulter) (Invoker, error) {
func newReceiveInvoker(
fn interface{},
observabilityService ObservabilityService,
inboundContextDecorators []func(context.Context, binding.Message) context.Context,
fns []EventDefaulter,
ackMalformedEvent bool,
) (Invoker, error) {
r := &receiveInvoker{
eventDefaulterFns: fns,
observabilityService: observabilityService,
inboundContextDecorators: inboundContextDecorators,
ackMalformedEvent: ackMalformedEvent,
}

if fn, err := receiver(fn); err != nil {
Expand All @@ -44,6 +51,7 @@ type receiveInvoker struct {
observabilityService ObservabilityService
eventDefaulterFns []EventDefaulter
inboundContextDecorators []func(context.Context, binding.Message) context.Context
ackMalformedEvent bool
}

func (r *receiveInvoker) Invoke(ctx context.Context, m binding.Message, respFn protocol.ResponseFn) (err error) {
Expand All @@ -58,13 +66,13 @@ func (r *receiveInvoker) Invoke(ctx context.Context, m binding.Message, respFn p
switch {
case eventErr != nil && r.fn.hasEventIn:
r.observabilityService.RecordReceivedMalformedEvent(ctx, eventErr)
return respFn(ctx, nil, protocol.NewReceipt(false, "failed to convert Message to Event: %w", eventErr))
return respFn(ctx, nil, protocol.NewReceipt(r.ackMalformedEvent, "failed to convert Message to Event: %w", eventErr))
case r.fn != nil:
// Check if event is valid before invoking the receiver function
if e != nil {
if validationErr := e.Validate(); validationErr != nil {
r.observabilityService.RecordReceivedMalformedEvent(ctx, validationErr)
return respFn(ctx, nil, protocol.NewReceipt(false, "validation error in incoming event: %w", validationErr))
return respFn(ctx, nil, protocol.NewReceipt(r.ackMalformedEvent, "validation error in incoming event: %w", validationErr))
}
}

Expand Down
13 changes: 13 additions & 0 deletions v2/client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,16 @@ func WithBlockingCallback() Option {
return nil
}
}

// WithAckMalformedevents causes malformed events received within StartReceiver to be acknowledged
// rather than being permanently not-acknowledged. This can be useful when a protocol does not
// provide a responder implementation and would otherwise cause the receiver to be partially or
// fully stuck.
func WithAckMalformedEvent() Option {
return func(i interface{}) error {
mattdowdell marked this conversation as resolved.
Show resolved Hide resolved
if c, ok := i.(*ceClient); ok {
c.ackMalformedEvent = true
}
return nil
}
}
28 changes: 28 additions & 0 deletions v2/client/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,31 @@ func TestWith_Defaulters(t *testing.T) {
})
}
}

func TestWithAckMalformedEvent(t *testing.T) {
testCases := []struct {
name string
opts []Option
expected bool
}{
{
name: "unset",
},
{
name: "set",
opts: []Option{WithAckMalformedEvent()},
expected: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
client := &ceClient{}
client.applyOptions(tc.opts...)

if client.ackMalformedEvent != tc.expected {
t.Errorf("unexpected ackMalformedEvent; want: %t; got: %t", tc.expected, client.ackMalformedEvent)
}
})
}
}