Skip to content

Commit

Permalink
feat: update publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
franklinkim committed Mar 12, 2024
1 parent 67a90c8 commit 391f82e
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 52 deletions.
6 changes: 0 additions & 6 deletions integration/watermill/gtm/provider.go

This file was deleted.

66 changes: 20 additions & 46 deletions integration/watermill/gtm/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package gtm
import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httputil"
"strings"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/foomo/keel/log"
mpv2 "github.com/foomo/sesamy-go/measurementprotocol/v2"
"github.com/pkg/errors"
"go.uber.org/zap"
)

var (
Expand All @@ -19,20 +21,24 @@ var (

type (
Publisher struct {
l *zap.Logger
url string
client *http.Client
marshalMessageFunc MarshalMessageFunc
marshalMessageFunc PublisherMarshalMessageFunc
closed bool
}
PublisherOption func(*Publisher)
// PublisherMarshalMessageFunc transforms the message into a HTTP request to be sent to the specified url.
PublisherMarshalMessageFunc func(url string, msg *message.Message) (*http.Request, error)
)

// ------------------------------------------------------------------------------------------------
// ~ Constructor
// ------------------------------------------------------------------------------------------------

func NewPublisher(url string, opts ...PublisherOption) *Publisher {
func NewPublisher(l *zap.Logger, url string, opts ...PublisherOption) *Publisher {
inst := &Publisher{
l: l,
url: url,
client: http.DefaultClient,
}
Expand All @@ -52,7 +58,7 @@ func PublisherWithClient(v *http.Client) PublisherOption {
}
}

func PublisherWithMarshalMessageFunc(v MarshalMessageFunc) PublisherOption {
func PublisherWithMarshalMessageFunc(v PublisherMarshalMessageFunc) PublisherOption {
return func(o *Publisher) {
o.marshalMessageFunc = v
}
Expand Down Expand Up @@ -105,31 +111,27 @@ func (p *Publisher) Publish(topic string, messages ...*message.Message) error {
}
}

// logFields := watermill.LogFields{
// "uuid": msg.UUID,
// "provider": ProviderName,
// }

// p.l.Trace("Publishing message", logFields)

r, _ := httputil.DumpRequestOut(req, true)
fmt.Println("--> Outgoing publish")
fmt.Println(string(r))
l := log.WithHTTPRequestOut(p.l, req).With(
zap.String("message_id", msg.UUID),
)

resp, err := p.client.Do(req)
if err != nil {
return errors.Wrapf(err, "failed to publish message: %s", msg.UUID)
}
defer resp.Body.Close()

if err = p.handleResponseBody(resp); err != nil {
return err
}
l = l.With(log.FHTTPStatusCode(resp.StatusCode))

if resp.StatusCode >= http.StatusBadRequest {
if body, err := io.ReadAll(resp.Body); err == nil {
l = l.With(zap.String("http_response", string(body)))
}
l.Info("server responded with error")
return errors.Wrap(ErrErrorResponse, resp.Status)
}

// p.l.Trace("Message published", logFields)
l.Debug("message published")
}

return nil
Expand All @@ -143,31 +145,3 @@ func (p *Publisher) Close() error {
p.closed = true
return nil
}

// ------------------------------------------------------------------------------------------------
// ~ Private methods
// ------------------------------------------------------------------------------------------------

func (p *Publisher) handleResponseBody(resp *http.Response) error {
defer resp.Body.Close()

if resp.StatusCode < http.StatusBadRequest {
return nil
}

// body, err := io.ReadAll(resp.Body)
// if err != nil {
// return errors.Wrap(err, "could not read response body")
// }

// logFields = logFields.Add(watermill.LogFields{
// "http_status": resp.StatusCode,
// "http_response": string(body),
// })
// p.l.Info("Server responded with error", logFields)

return nil
}

// MarshalMessageFunc transforms the message into a HTTP request to be sent to the specified url.
type MarshalMessageFunc func(url string, msg *message.Message) (*http.Request, error)

0 comments on commit 391f82e

Please sign in to comment.