Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changed AddNoPublisherHandler signature #82

Merged
merged 3 commits into from Jun 1, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 6 additions & 6 deletions components/cqrs/command_processor.go
Expand Up @@ -130,15 +130,15 @@ func (p CommandProcessor) Handlers() []CommandHandler {
return p.handlers
}

func (p CommandProcessor) routerHandlerFunc(handler CommandHandler, logger watermill.LoggerAdapter) (message.HandlerFunc, error) {
func (p CommandProcessor) routerHandlerFunc(handler CommandHandler, logger watermill.LoggerAdapter) (message.NoPublishHandlerFunc, error) {
cmd := handler.NewCommand()
cmdName := p.marshaler.Name(cmd)

if err := p.validateCommand(cmd); err != nil {
return nil, err
}

return func(msg *message.Message) ([]*message.Message, error) {
return func(msg *message.Message) error {
cmd := handler.NewCommand()
messageCmdName := p.marshaler.NameFromMessage(msg)

Expand All @@ -148,7 +148,7 @@ func (p CommandProcessor) routerHandlerFunc(handler CommandHandler, logger water
"expected_command_type": cmdName,
"received_command_type": messageCmdName,
})
return nil, nil
return nil
}

logger.Debug("Handling command", watermill.LogFields{
Expand All @@ -157,15 +157,15 @@ func (p CommandProcessor) routerHandlerFunc(handler CommandHandler, logger water
})

if err := p.marshaler.Unmarshal(msg, cmd); err != nil {
return nil, err
return err
}

if err := handler.Handle(msg.Context(), cmd); err != nil {
logger.Debug("Error when handling command", watermill.LogFields{"err": err})
return nil, err
return err
}

return nil, nil
return nil
}, nil
}

Expand Down
12 changes: 6 additions & 6 deletions components/cqrs/event_processor.go
Expand Up @@ -115,15 +115,15 @@ func (p EventProcessor) Handlers() []EventHandler {
return p.handlers
}

func (p EventProcessor) routerHandlerFunc(handler EventHandler, logger watermill.LoggerAdapter) (message.HandlerFunc, error) {
func (p EventProcessor) routerHandlerFunc(handler EventHandler, logger watermill.LoggerAdapter) (message.NoPublishHandlerFunc, error) {
initEvent := handler.NewEvent()
expectedEventName := p.marshaler.Name(initEvent)

if err := p.validateEvent(initEvent); err != nil {
return nil, err
}

return func(msg *message.Message) ([]*message.Message, error) {
return func(msg *message.Message) error {
event := handler.NewEvent()
messageEventName := p.marshaler.NameFromMessage(msg)

Expand All @@ -133,7 +133,7 @@ func (p EventProcessor) routerHandlerFunc(handler EventHandler, logger watermill
"expected_event_type": expectedEventName,
"received_event_type": messageEventName,
})
return nil, nil
return nil
}

logger.Debug("Handling event", watermill.LogFields{
Expand All @@ -142,15 +142,15 @@ func (p EventProcessor) routerHandlerFunc(handler EventHandler, logger watermill
})

if err := p.marshaler.Unmarshal(msg, event); err != nil {
return nil, err
return err
}

if err := handler.Handle(msg.Context(), event); err != nil {
logger.Debug("Error when handling event", watermill.LogFields{"err": err})
return nil, err
return err
}

return nil, nil
return nil
}, nil
}

Expand Down
11 changes: 9 additions & 2 deletions message/router.go
Expand Up @@ -30,6 +30,9 @@ var (
// (because msg.Ack() was sent in HandlerFunc or Subscriber supports multiple consumers).
type HandlerFunc func(msg *Message) ([]*Message, error)

// NoPublishHandlerFunc is HandlerFunc alternative, which doesn't produce any message.
roblaszczak marked this conversation as resolved.
Show resolved Hide resolved
type NoPublishHandlerFunc func(msg *Message) error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about ReadOnlyHandlerFunc or ConsumeOnlyHandlerFunc?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds better @maclav3 ? and does we want to also change AddNoPublisherHandler?


// HandlerMiddleware allows us to write something like decorators to HandlerFunc.
// It can execute something before handler (for example: modify consumed message)
// or after (modify produced messages, ack/nack on consumed message, handle errors, logging, etc.).
Expand Down Expand Up @@ -228,9 +231,13 @@ func (r *Router) AddNoPublisherHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
handlerFunc HandlerFunc,
handlerFunc NoPublishHandlerFunc,
) {
r.AddHandler(handlerName, subscribeTopic, subscriber, "", disabledPublisher{}, handlerFunc)
handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
return nil, handlerFunc(msg)
}

r.AddHandler(handlerName, subscribeTopic, subscriber, "", disabledPublisher{}, handlerFuncAdapter)
}

// Run runs all plugins and handlers and starts subscribing to provided topics.
Expand Down
43 changes: 19 additions & 24 deletions message/router_test.go
Expand Up @@ -82,9 +82,9 @@ func TestRouter_functional(t *testing.T) {
"test_subscriber_2",
subscribeTopic,
pubSub,
func(msg *message.Message) (producedMessages []*message.Message, err error) {
func(msg *message.Message) (err error) {
receivedMessagesCh2 <- msg
return nil, nil
return nil
},
)

Expand Down Expand Up @@ -133,15 +133,15 @@ func TestRouter_functional_nack(t *testing.T) {
"test_subscriber_1",
"subscribe_topic",
pubSub,
func(msg *message.Message) (producedMessages []*message.Message, err error) {
func(msg *message.Message) (err error) {
messageReceived <- msg

if !internal.IsChannelClosed(nackSend) {
msg.Nack()
close(nackSend)
}

return nil, nil
return nil
},
)

Expand Down Expand Up @@ -182,17 +182,17 @@ func TestRouter_stop_when_all_handlers_stopped(t *testing.T) {
"handler_1",
"foo",
pubSub1,
func(msg *message.Message) (messages []*message.Message, e error) {
return nil, nil
func(msg *message.Message) (e error) {
return nil
},
)

r.AddNoPublisherHandler(
"handler_2",
"foo",
pubSub2,
func(msg *message.Message) (messages []*message.Message, e error) {
return nil, nil
func(msg *message.Message) (e error) {
return nil
},
)

Expand Down Expand Up @@ -298,21 +298,15 @@ func TestRouterNoPublisherHandler(t *testing.T) {
)
require.NoError(t, err)

msgReceived := false
wait := make(chan struct{})

r.AddNoPublisherHandler(
"test_no_publisher_handler",
"subscribe_topic",
pubSub,
func(msg *message.Message) (producedMessages []*message.Message, err error) {
if msgReceived {
require.True(t, msg.Ack())
close(wait)
return nil, nil
}
msgReceived = true
return message.Messages{msg}, nil
func(msg *message.Message) (err error) {
close(wait)
return nil
},
)

Expand All @@ -325,12 +319,13 @@ func TestRouterNoPublisherHandler(t *testing.T) {
err = pubSub.Publish("subscribe_topic", publishedMsg)
require.NoError(t, err)

<-wait
select {
case <-wait:
// ok
case <-time.After(time.Second):
t.Fatal("no message received")
}

// handler has no publisher, so the router should complain about it
// however, it returns no error for now (because of how messages are processed in the router),
// so let's just look for the error in the logger.
assert.True(t, logger.HasError(message.ErrOutputInNoPublisherHandler))
require.NoError(t, r.Close())
}

Expand All @@ -351,9 +346,9 @@ func BenchmarkRouterNoPublisherHandler(b *testing.B) {
"handler",
"benchmark_topic",
sub,
func(msg *message.Message) (messages []*message.Message, e error) {
func(msg *message.Message) (e error) {
allProcessedWg.Done()
return nil, nil
return nil
},
)

Expand Down