Skip to content

Commit

Permalink
feat: add message handler
Browse files Browse the repository at this point in the history
  • Loading branch information
franklinkim committed May 22, 2024
1 parent 8419d07 commit 5cb6719
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 45 deletions.
34 changes: 34 additions & 0 deletions integration/watermill/gtag/messagehandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package gtag

import (
"encoding/json"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/foomo/sesamy-go/pkg/encoding/gtag"
"github.com/pkg/errors"
)

func MessageHandler(handler func(payload *gtag.Payload, msg *message.Message) error) func(msg *message.Message) ([]*message.Message, error) {
return func(msg *message.Message) ([]*message.Message, error) {
var payload *gtag.Payload

// unmarshal payload
if err := json.Unmarshal(msg.Payload, &payload); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal payload")
}

// handle payload
if err := handler(payload, msg); err != nil {
return nil, err
}

// marshal payload
b, err := json.Marshal(payload)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal payload")
}
msg.Payload = b

return []*message.Message{msg}, nil
}
}
30 changes: 0 additions & 30 deletions integration/watermill/measurementprotocol/v2/eventhandler.go

This file was deleted.

34 changes: 34 additions & 0 deletions integration/watermill/mpv2/messagehandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package mpv2

import (
"encoding/json"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/foomo/sesamy-go/pkg/encoding/mpv2"
"github.com/pkg/errors"
)

func MessageHandler(handler func(payload *mpv2.Payload[any], msg *message.Message) error) func(msg *message.Message) ([]*message.Message, error) {
return func(msg *message.Message) ([]*message.Message, error) {
var payload *mpv2.Payload[any]

// unmarshal payload
if err := json.Unmarshal(msg.Payload, &payload); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal payload")
}

// handle payload
if err := handler(payload, msg); err != nil {
return nil, err
}

// marshal payload
b, err := json.Marshal(payload)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal payload")
}
msg.Payload = b

return []*message.Message{msg}, nil
}
}
5 changes: 0 additions & 5 deletions integration/watermill/mpv2/provider.go

This file was deleted.

14 changes: 4 additions & 10 deletions integration/watermill/mpv2/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,27 +97,21 @@ func (s *Subscriber) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}

func (s *Subscriber) handle(l *zap.Logger, r *http.Request, event *mpv2.Payload[any]) error {
func (s *Subscriber) handle(l *zap.Logger, r *http.Request, payload *mpv2.Payload[any]) error {
// marshal message payload
payload, err := json.Marshal(event)
jsonPayload, err := json.Marshal(payload)
if err != nil {
return errors.Wrap(err, "failed to marshal payload")
}

msg := message.NewMessage(s.uuidFunc(), payload)
msg := message.NewMessage(s.uuidFunc(), jsonPayload)
l = l.With(zap.String("message_id", msg.UUID))
// if labeler, ok := keellog.LabelerFromRequest(r); ok {
// labeler.Add(zap.String("message_id", msg.UUID))
// }
// if event.EventName != nil {
// msg.Metadata.Set(MetadataEventName, gtag.Get(event.EventName).String())
// }

// TODO filter headers?
for name, headers := range r.Header {
msg.Metadata.Set(name, strings.Join(headers, ","))
}
//

// if cookies := r.Cookies(); len(cookies) > 0 {
// values := make([]string, len(cookies))
// for i, cookie := range r.Cookies() {
Expand Down

0 comments on commit 5cb6719

Please sign in to comment.