Skip to content

Commit

Permalink
feat: add no publish handler
Browse files Browse the repository at this point in the history
  • Loading branch information
franklinkim committed May 30, 2024
1 parent 4921daa commit 9c4ec4a
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 2 deletions.
2 changes: 1 addition & 1 deletion integration/watermill/gtag/messagehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/pkg/errors"
)

func MessageHandler(handler func(payload *gtag.Payload, msg *message.Message) error) func(msg *message.Message) ([]*message.Message, error) {
func MessageHandler(handler func(payload *gtag.Payload, msg *message.Message) error) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
var payload *gtag.Payload

Expand Down
23 changes: 23 additions & 0 deletions integration/watermill/gtag/nopublishmessagehandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package gtag

import (
"encoding/json"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/foomo/sesamy-go/pkg/encoding/gtag"
"github.com/pkg/errors"
)

func NoPublishMessageHandler(handler func(payload *gtag.Payload, msg *message.Message) error) func(msg *message.Message) error {
return func(msg *message.Message) error {
var payload *gtag.Payload

// unmarshal payload
if err := json.Unmarshal(msg.Payload, &payload); err != nil {
return errors.Wrap(err, "failed to unmarshal payload")
}

// handle payload
return handler(payload, msg)
}
}
2 changes: 1 addition & 1 deletion integration/watermill/mpv2/messagehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/pkg/errors"
)

func MessageHandler(handler func(payload *mpv2.Payload[any], msg *message.Message) error) func(msg *message.Message) ([]*message.Message, error) {
func MessageHandler(handler func(payload *mpv2.Payload[any], msg *message.Message) error) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
var payload *mpv2.Payload[any]

Expand Down
23 changes: 23 additions & 0 deletions integration/watermill/mpv2/nopublishmessagehandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package mpv2

import (
"encoding/json"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/foomo/sesamy-go/pkg/encoding/mpv2"
"github.com/pkg/errors"
)

func NoPublishMessageHandler(handler func(payload *mpv2.Payload[any], msg *message.Message) error) message.NoPublishHandlerFunc {
return func(msg *message.Message) error {
var payload *mpv2.Payload[any]

// unmarshal payload
if err := json.Unmarshal(msg.Payload, &payload); err != nil {
return errors.Wrap(err, "failed to unmarshal payload")
}

// handle payload
return handler(payload, msg)
}
}

0 comments on commit 9c4ec4a

Please sign in to comment.