diff --git a/components/cqrs/command_processor.go b/components/cqrs/command_processor.go index 67c4a6865..479e0c488 100644 --- a/components/cqrs/command_processor.go +++ b/components/cqrs/command_processor.go @@ -130,7 +130,7 @@ func (p CommandProcessor) Handlers() []CommandHandler { return p.handlers } -func (p CommandProcessor) routerHandlerFunc(handler CommandHandler, logger watermill.LoggerAdapter) (message.HandlerFunc, error) { +func (p CommandProcessor) routerHandlerFunc(handler CommandHandler, logger watermill.LoggerAdapter) (message.NoPublishHandlerFunc, error) { cmd := handler.NewCommand() cmdName := p.marshaler.Name(cmd) @@ -138,7 +138,7 @@ func (p CommandProcessor) routerHandlerFunc(handler CommandHandler, logger water return nil, err } - return func(msg *message.Message) ([]*message.Message, error) { + return func(msg *message.Message) error { cmd := handler.NewCommand() messageCmdName := p.marshaler.NameFromMessage(msg) @@ -148,7 +148,7 @@ func (p CommandProcessor) routerHandlerFunc(handler CommandHandler, logger water "expected_command_type": cmdName, "received_command_type": messageCmdName, }) - return nil, nil + return nil } logger.Debug("Handling command", watermill.LogFields{ @@ -157,15 +157,15 @@ func (p CommandProcessor) routerHandlerFunc(handler CommandHandler, logger water }) if err := p.marshaler.Unmarshal(msg, cmd); err != nil { - return nil, err + return err } if err := handler.Handle(msg.Context(), cmd); err != nil { logger.Debug("Error when handling command", watermill.LogFields{"err": err}) - return nil, err + return err } - return nil, nil + return nil }, nil } diff --git a/components/cqrs/event_processor.go b/components/cqrs/event_processor.go index d63d23b18..19e57aece 100644 --- a/components/cqrs/event_processor.go +++ b/components/cqrs/event_processor.go @@ -115,7 +115,7 @@ func (p EventProcessor) Handlers() []EventHandler { return p.handlers } -func (p EventProcessor) routerHandlerFunc(handler EventHandler, logger watermill.LoggerAdapter) (message.HandlerFunc, error) { +func (p EventProcessor) routerHandlerFunc(handler EventHandler, logger watermill.LoggerAdapter) (message.NoPublishHandlerFunc, error) { initEvent := handler.NewEvent() expectedEventName := p.marshaler.Name(initEvent) @@ -123,7 +123,7 @@ func (p EventProcessor) routerHandlerFunc(handler EventHandler, logger watermill return nil, err } - return func(msg *message.Message) ([]*message.Message, error) { + return func(msg *message.Message) error { event := handler.NewEvent() messageEventName := p.marshaler.NameFromMessage(msg) @@ -133,7 +133,7 @@ func (p EventProcessor) routerHandlerFunc(handler EventHandler, logger watermill "expected_event_type": expectedEventName, "received_event_type": messageEventName, }) - return nil, nil + return nil } logger.Debug("Handling event", watermill.LogFields{ @@ -142,15 +142,15 @@ func (p EventProcessor) routerHandlerFunc(handler EventHandler, logger watermill }) if err := p.marshaler.Unmarshal(msg, event); err != nil { - return nil, err + return err } if err := handler.Handle(msg.Context(), event); err != nil { logger.Debug("Error when handling event", watermill.LogFields{"err": err}) - return nil, err + return err } - return nil, nil + return nil }, nil } diff --git a/message/router.go b/message/router.go index 5a9fdb0e9..dc73e879a 100644 --- a/message/router.go +++ b/message/router.go @@ -30,6 +30,9 @@ var ( // (because msg.Ack() was sent in HandlerFunc or Subscriber supports multiple consumers). type HandlerFunc func(msg *Message) ([]*Message, error) +// NoPublishHandlerFunc is HandlerFunc alternative, which doesn't produce any messages. +type NoPublishHandlerFunc func(msg *Message) error + // HandlerMiddleware allows us to write something like decorators to HandlerFunc. // It can execute something before handler (for example: modify consumed message) // or after (modify produced messages, ack/nack on consumed message, handle errors, logging, etc.). @@ -228,9 +231,13 @@ func (r *Router) AddNoPublisherHandler( handlerName string, subscribeTopic string, subscriber Subscriber, - handlerFunc HandlerFunc, + handlerFunc NoPublishHandlerFunc, ) { - r.AddHandler(handlerName, subscribeTopic, subscriber, "", disabledPublisher{}, handlerFunc) + handlerFuncAdapter := func(msg *Message) ([]*Message, error) { + return nil, handlerFunc(msg) + } + + r.AddHandler(handlerName, subscribeTopic, subscriber, "", disabledPublisher{}, handlerFuncAdapter) } // Run runs all plugins and handlers and starts subscribing to provided topics. diff --git a/message/router_test.go b/message/router_test.go index 1701d309a..10cbfe8be 100644 --- a/message/router_test.go +++ b/message/router_test.go @@ -82,9 +82,9 @@ func TestRouter_functional(t *testing.T) { "test_subscriber_2", subscribeTopic, pubSub, - func(msg *message.Message) (producedMessages []*message.Message, err error) { + func(msg *message.Message) (err error) { receivedMessagesCh2 <- msg - return nil, nil + return nil }, ) @@ -133,7 +133,7 @@ func TestRouter_functional_nack(t *testing.T) { "test_subscriber_1", "subscribe_topic", pubSub, - func(msg *message.Message) (producedMessages []*message.Message, err error) { + func(msg *message.Message) (err error) { messageReceived <- msg if !internal.IsChannelClosed(nackSend) { @@ -141,7 +141,7 @@ func TestRouter_functional_nack(t *testing.T) { close(nackSend) } - return nil, nil + return nil }, ) @@ -182,8 +182,8 @@ func TestRouter_stop_when_all_handlers_stopped(t *testing.T) { "handler_1", "foo", pubSub1, - func(msg *message.Message) (messages []*message.Message, e error) { - return nil, nil + func(msg *message.Message) (e error) { + return nil }, ) @@ -191,8 +191,8 @@ func TestRouter_stop_when_all_handlers_stopped(t *testing.T) { "handler_2", "foo", pubSub2, - func(msg *message.Message) (messages []*message.Message, e error) { - return nil, nil + func(msg *message.Message) (e error) { + return nil }, ) @@ -298,21 +298,15 @@ func TestRouterNoPublisherHandler(t *testing.T) { ) require.NoError(t, err) - msgReceived := false wait := make(chan struct{}) r.AddNoPublisherHandler( "test_no_publisher_handler", "subscribe_topic", pubSub, - func(msg *message.Message) (producedMessages []*message.Message, err error) { - if msgReceived { - require.True(t, msg.Ack()) - close(wait) - return nil, nil - } - msgReceived = true - return message.Messages{msg}, nil + func(msg *message.Message) (err error) { + close(wait) + return nil }, ) @@ -325,12 +319,13 @@ func TestRouterNoPublisherHandler(t *testing.T) { err = pubSub.Publish("subscribe_topic", publishedMsg) require.NoError(t, err) - <-wait + select { + case <-wait: + // ok + case <-time.After(time.Second): + t.Fatal("no message received") + } - // handler has no publisher, so the router should complain about it - // however, it returns no error for now (because of how messages are processed in the router), - // so let's just look for the error in the logger. - assert.True(t, logger.HasError(message.ErrOutputInNoPublisherHandler)) require.NoError(t, r.Close()) } @@ -351,9 +346,9 @@ func BenchmarkRouterNoPublisherHandler(b *testing.B) { "handler", "benchmark_topic", sub, - func(msg *message.Message) (messages []*message.Message, e error) { + func(msg *message.Message) (e error) { allProcessedWg.Done() - return nil, nil + return nil }, )