Skip to content

Commit

Permalink
add: optionally wait ack/nack message
Browse files Browse the repository at this point in the history
  • Loading branch information
andyollylarkin committed Aug 24, 2023
1 parent 5b4a7c8 commit f91ec35
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 19 deletions.
2 changes: 1 addition & 1 deletion examples/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func main() {
RemoteAddr: &net.TCPAddr{IP: []byte{127, 0, 0, 1}, Port: 9090},
Marshaler: pkg.MessagePackMarshaler{},
Unmarshaler: pkg.MessagePackUnmarshaler{},
})
}, false)
if err != nil {
log.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion examples/retryable_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func main() {
Marshaler: pkg.MessagePackMarshaler{},
Unmarshaler: pkg.MessagePackUnmarshaler{},
Logger: watermill.NewStdLogger(true, true),
})
}, false)
if err != nil {
log.Fatal(err)
}
Expand Down
20 changes: 15 additions & 5 deletions publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ type Publisher struct {
logger watermill.LoggerAdapter
closed bool
mu sync.Mutex
waitAck bool
}

func NewPublisher(config PublisherConfig) (*Publisher, error) {
func NewPublisher(config PublisherConfig, waitAck bool) (*Publisher, error) {
if err := validatePublisherConfig(config); err != nil {
return nil, err
}
Expand All @@ -45,6 +46,7 @@ func NewPublisher(config PublisherConfig) (*Publisher, error) {
p.marshaler = config.Marshaler
p.unmarshaler = config.Unmarshaler
p.logger = config.Logger
p.waitAck = waitAck

return p, nil
}
Expand Down Expand Up @@ -106,22 +108,30 @@ func (p *Publisher) Publish(topic string, messages ...*message.Message) error {

b = internal.PrepareMessageForSend(b)

retry.Do(context.Background(), retry.NewConstant(time.Second*3), func(ctx context.Context) error { //nolint: gomnd
err = retry.Do(context.Background(), retry.NewConstant(time.Second*3), func(ctx context.Context) error { //nolint: gomnd
_, err = p.conn.Write(b)

if err != nil {
return err
}

if err = p.handleResponse(); err != nil { // wait ack or nack
if errors.Is(err, ErrIOTimeout) {
return retry.RetryableError(err)
if p.waitAck {
if err = p.handleResponse(); err != nil { // wait ack or nack
if errors.Is(err, ErrIOTimeout) {
return retry.RetryableError(err)
}

return err
}
}

return nil
})

if err != nil {
return err
}

if p.logger != nil {
fields := watermill.LogFields{
"uuid": msg.UUID,
Expand Down
18 changes: 9 additions & 9 deletions publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestPublishMessageRemoteSideReceiveOK(t *testing.T) {
rs.Write(ackMsg)
}()

p, err := watermillnet.NewPublisher(config)
p, err := watermillnet.NewPublisher(config, true)
require.NoError(t, err)
err = p.Publish("test_topic", message.NewMessage("", []byte("Hello world")))
require.NoError(t, err)
Expand All @@ -79,7 +79,7 @@ func TestPublishMessageRemoteSideReceiveNackResponse(t *testing.T) {
rs.Write(ackMsg)
}()

p, err := watermillnet.NewPublisher(config)
p, err := watermillnet.NewPublisher(config, true)
require.NoError(t, err)
err = p.Publish("test_topic", message.NewMessage("", []byte("Hello world")))
require.Error(t, err)
Expand All @@ -105,7 +105,7 @@ func TestPublishMessageOnClosedPublisher(t *testing.T) {
rs.Write(ackMsg)
}()

p, err := watermillnet.NewPublisher(config)
p, err := watermillnet.NewPublisher(config, false)
require.NoError(t, err)
err = p.Close()
require.NoError(t, err)
Expand Down Expand Up @@ -165,7 +165,7 @@ func TestPublisherCreateError(t *testing.T) {

for _, c := range tc {
t.Run(c.name, func(t *testing.T) {
_, err := watermillnet.NewPublisher(c.config)
_, err := watermillnet.NewPublisher(c.config, false)
assert.Error(t, err)
assert.ErrorContains(t, err, c.expected)
})
Expand All @@ -191,7 +191,7 @@ func TestPublisherCreateOK(t *testing.T) {

for _, c := range tc {
t.Run(c.name, func(t *testing.T) {
_, err := watermillnet.NewPublisher(c.config)
_, err := watermillnet.NewPublisher(c.config, false)
assert.NoError(t, err)
})
}
Expand All @@ -206,7 +206,7 @@ func TestPublishToClosedPublisher(t *testing.T) {
Unmarshaler: pkg.MessagePackUnmarshaler{},
}

p, err := watermillnet.NewPublisher(config)
p, err := watermillnet.NewPublisher(config, false)
require.NoError(t, err)
err = p.Close()
require.NoError(t, err)
Expand Down Expand Up @@ -236,7 +236,7 @@ func TestPublishMultiMessage(t *testing.T) {
}
}()

p, err := watermillnet.NewPublisher(config)
p, err := watermillnet.NewPublisher(config, true)
require.NoError(t, err)
err = p.Publish("test_topic", message.NewMessage("", []byte("Hello world")), //send 2 messages
message.NewMessage("", []byte("Hello world2")))
Expand All @@ -251,7 +251,7 @@ func TestConnectOnClosedPublisher(t *testing.T) {
Unmarshaler: pkg.MessagePackUnmarshaler{},
}

p, err := watermillnet.NewPublisher(c)
p, err := watermillnet.NewPublisher(c, false)
assert.NoError(t, err)
err = p.Close()
assert.NoError(t, err)
Expand All @@ -267,7 +267,7 @@ func TestConnectOK(t *testing.T) {
Unmarshaler: pkg.MessagePackUnmarshaler{},
}

p, err := watermillnet.NewPublisher(c)
p, err := watermillnet.NewPublisher(c, false)
assert.NoError(t, err)
err = p.Connect()
assert.NoError(t, err)
Expand Down
6 changes: 3 additions & 3 deletions subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func TestCloseWaitAllConns(t *testing.T) {
RemoteAddr: pipeAddr{},
Marshaler: pkg.MessagePackMarshaler{},
Unmarshaler: pkg.MessagePackUnmarshaler{},
})
}, false)
require.NoError(t, err)
err = p.Connect()
require.NoError(t, err)
Expand Down Expand Up @@ -171,7 +171,7 @@ func TestCloseSocketConn(t *testing.T) {
RemoteAddr: pipeAddr{},
Marshaler: pkg.MessagePackMarshaler{},
Unmarshaler: pkg.MessagePackUnmarshaler{},
})
}, false)
require.NoError(t, err)
err = p.Connect()
require.NoError(t, err)
Expand Down Expand Up @@ -202,7 +202,7 @@ func TestSubscriberCancelByContext(t *testing.T) {
RemoteAddr: pipeAddr{},
Marshaler: pkg.MessagePackMarshaler{},
Unmarshaler: pkg.MessagePackUnmarshaler{},
})
}, false)
require.NoError(t, err)
err = p.Connect()
require.NoError(t, err)
Expand Down

0 comments on commit f91ec35

Please sign in to comment.