Skip to content

Commit

Permalink
remove set addr from publisher config
Browse files Browse the repository at this point in the history
  • Loading branch information
andyollylarkin committed Aug 25, 2023
1 parent dda2416 commit 0d0181f
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 42 deletions.
3 changes: 1 addition & 2 deletions examples/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ func main() {
pConn := connection.NewTCPConnection(net.Dialer{}, time.Second*30)

p, err := watermillnet.NewPublisher(watermillnet.PublisherConfig{
RemoteAddr: &net.TCPAddr{IP: []byte{127, 0, 0, 1}, Port: 9090},
Marshaler: pkg.MessagePackMarshaler{},
Unmarshaler: pkg.MessagePackUnmarshaler{},
}, false)
Expand All @@ -26,7 +25,7 @@ func main() {
log.Fatal(err)
}

err = p.Connect()
err = p.Connect(&net.TCPAddr{IP: []byte{127, 0, 0, 1}, Port: 9090})

if err != nil {
log.Fatal(err)
Expand Down
3 changes: 1 addition & 2 deletions examples/retryable_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ func main() {
watermill.NewStdLogger(true, true), addr, connection.DefaultErrorFilter, time.Second*4)

p, err := watermillnet.NewPublisher(watermillnet.PublisherConfig{
RemoteAddr: addr,
Marshaler: pkg.MessagePackMarshaler{},
Unmarshaler: pkg.MessagePackUnmarshaler{},
Logger: watermill.NewStdLogger(true, true),
Expand All @@ -35,7 +34,7 @@ func main() {

p.SetConnection(wpConn)

err = p.Connect()
err = p.Connect(addr)

if err != nil {
log.Fatal(err)
Expand Down
11 changes: 2 additions & 9 deletions publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
)

type PublisherConfig struct {
RemoteAddr net.Addr
Marshaler Marshaler
Unmarshaler Unmarshaler
Logger watermill.LoggerAdapter
Expand All @@ -27,7 +26,6 @@ type Publisher struct {
conn Connection
marshaler Marshaler
unmarshaler Unmarshaler
addr net.Addr
logger watermill.LoggerAdapter
closed bool
mu sync.Mutex
Expand All @@ -42,7 +40,6 @@ func NewPublisher(config PublisherConfig, waitAck bool) (*Publisher, error) {
}

p := new(Publisher)
p.addr = config.RemoteAddr
p.marshaler = config.Marshaler
p.unmarshaler = config.Unmarshaler
p.logger = config.Logger
Expand All @@ -52,10 +49,6 @@ func NewPublisher(config PublisherConfig, waitAck bool) (*Publisher, error) {
}

func validatePublisherConfig(c PublisherConfig) error {
if c.RemoteAddr == nil {
return &InvalidConfigError{InvalidField: "Addr", InvalidReason: "cant be nil"}
}

if c.Marshaler == nil {
return &InvalidConfigError{InvalidField: "Marshaler", InvalidReason: "cant be nil"}
}
Expand Down Expand Up @@ -91,7 +84,7 @@ func (p *Publisher) GetConnection() (Connection, error) {
}

// Connect to remote side.
func (p *Publisher) Connect() error {
func (p *Publisher) Connect(addr net.Addr) error {
if p.closed {
return ErrPublisherClosed
}
Expand All @@ -100,7 +93,7 @@ func (p *Publisher) Connect() error {
return ErrConnectionNotSet
}

return p.conn.Connect(p.addr)
return p.conn.Connect(addr)
}

// Publish publishes provided messages to given topic.
Expand Down
26 changes: 3 additions & 23 deletions publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ func TestPublishMessageRemoteSideReceiveOK(t *testing.T) {
pipeConn := NewPipeConnection()
uuid := watermill.NewUUID()
config := watermillnet.PublisherConfig{
RemoteAddr: pipeAddr{},
Marshaler: pkg.MessagePackMarshaler{},
Unmarshaler: pkg.MessagePackUnmarshaler{},
}
Expand All @@ -65,7 +64,6 @@ func TestPublishMessageRemoteSideReceiveNackResponse(t *testing.T) {
pipeConn := NewPipeConnection()
uuid := watermill.NewUUID()
config := watermillnet.PublisherConfig{
RemoteAddr: pipeAddr{},
Marshaler: pkg.MessagePackMarshaler{},
Unmarshaler: pkg.MessagePackUnmarshaler{},
}
Expand All @@ -91,7 +89,6 @@ func TestPublishMessageOnClosedPublisher(t *testing.T) {
pipeConn := NewPipeConnection()
uuid := watermill.NewUUID()
config := watermillnet.PublisherConfig{
RemoteAddr: pipeAddr{},
Marshaler: pkg.MessagePackMarshaler{},
Unmarshaler: pkg.MessagePackUnmarshaler{},
}
Expand Down Expand Up @@ -121,19 +118,9 @@ func TestPublisherCreateError(t *testing.T) {
config watermillnet.PublisherConfig
expected string
}{
{
name: "Err Addr nil",
config: watermillnet.PublisherConfig{
RemoteAddr: nil,
Marshaler: pkg.MessagePackMarshaler{},
Unmarshaler: pkg.MessagePackUnmarshaler{},
},
expected: "invalid field: Addr. reason: cant be nil",
},
{
name: "Err Marshaler nil",
config: watermillnet.PublisherConfig{
RemoteAddr: pipeAddr{},
Marshaler: nil,
Unmarshaler: pkg.MessagePackUnmarshaler{},
},
Expand All @@ -142,7 +129,6 @@ func TestPublisherCreateError(t *testing.T) {
{
name: "Err Unmarshaler nil",
config: watermillnet.PublisherConfig{
RemoteAddr: pipeAddr{},
Marshaler: pkg.MessagePackMarshaler{},
Unmarshaler: nil,
},
Expand All @@ -168,7 +154,6 @@ func TestPublisherCreateOK(t *testing.T) {
{
name: "Publisher no error",
config: watermillnet.PublisherConfig{
RemoteAddr: pipeAddr{},
Marshaler: pkg.MessagePackMarshaler{},
Unmarshaler: pkg.MessagePackUnmarshaler{},
},
Expand All @@ -186,7 +171,6 @@ func TestPublisherCreateOK(t *testing.T) {
func TestPublishToClosedPublisher(t *testing.T) {
conn := NewPipeConnection()
config := watermillnet.PublisherConfig{
RemoteAddr: pipeAddr{},
Marshaler: pkg.MessagePackMarshaler{},
Unmarshaler: pkg.MessagePackUnmarshaler{},
}
Expand All @@ -204,7 +188,6 @@ func TestPublishMultiMessage(t *testing.T) {
pipeConn := NewPipeConnection()
uuid := watermill.NewUUID()
config := watermillnet.PublisherConfig{
RemoteAddr: pipeAddr{},
Marshaler: pkg.MessagePackMarshaler{},
Unmarshaler: pkg.MessagePackUnmarshaler{},
}
Expand Down Expand Up @@ -232,7 +215,6 @@ func TestPublishMultiMessage(t *testing.T) {
func TestConnectOnClosedPublisher(t *testing.T) {
conn := NewPipeConnection()
c := watermillnet.PublisherConfig{
RemoteAddr: pipeAddr{},
Marshaler: pkg.MessagePackMarshaler{},
Unmarshaler: pkg.MessagePackUnmarshaler{},
}
Expand All @@ -242,29 +224,27 @@ func TestConnectOnClosedPublisher(t *testing.T) {
assert.NoError(t, err)
err = p.Close()
assert.NoError(t, err)
err = p.Connect()
err = p.Connect(pipeAddr{})
assert.ErrorIs(t, err, watermillnet.ErrPublisherClosed)
}

func TestConnectOK(t *testing.T) {
conn := NewPipeConnection()
c := watermillnet.PublisherConfig{
RemoteAddr: pipeAddr{},
Marshaler: pkg.MessagePackMarshaler{},
Unmarshaler: pkg.MessagePackUnmarshaler{},
}

p, err := watermillnet.NewPublisher(c, false)
p.SetConnection(conn)
assert.NoError(t, err)
err = p.Connect()
err = p.Connect(pipeAddr{})
assert.NoError(t, err)
}

func TestPublisherConnectionNotSetError(t *testing.T) {
retPub := func(t *testing.T) *watermillnet.Publisher {
p, err := watermillnet.NewPublisher(watermillnet.PublisherConfig{
RemoteAddr: pipeAddr{},
Marshaler: pkg.MessagePackMarshaler{},
Unmarshaler: pkg.MessagePackUnmarshaler{},
}, false)
Expand Down Expand Up @@ -298,7 +278,7 @@ func TestPublisherConnectionNotSetError(t *testing.T) {
}, expect: watermillnet.ErrConnectionNotSet},
{name: "Connect", exec: func(t *testing.T) error {
p := retPub(t)
err := p.Connect()
err := p.Connect(pipeAddr{})

return err
}, expect: watermillnet.ErrConnectionNotSet},
Expand Down
9 changes: 3 additions & 6 deletions subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,12 @@ func TestPublisherConnectError(t *testing.T) {
func TestCloseWaitAllConns(t *testing.T) {
pipeConn := NewPipeConnection()
p, err := watermillnet.NewPublisher(watermillnet.PublisherConfig{
RemoteAddr: pipeAddr{},
Marshaler: pkg.MessagePackMarshaler{},
Unmarshaler: pkg.MessagePackUnmarshaler{},
}, false)
p.SetConnection(pipeConn)
require.NoError(t, err)
err = p.Connect()
err = p.Connect(pipeAddr{})
require.NoError(t, err)

s, err := watermillnet.NewSubscriber(watermillnet.SubscriberConfig{
Expand Down Expand Up @@ -169,13 +168,12 @@ func TestCloseWaitAllConns(t *testing.T) {
func TestCloseSocketConn(t *testing.T) {
pipeConn := NewPipeConnection()
p, err := watermillnet.NewPublisher(watermillnet.PublisherConfig{
RemoteAddr: pipeAddr{},
Marshaler: pkg.MessagePackMarshaler{},
Unmarshaler: pkg.MessagePackUnmarshaler{},
}, false)
p.SetConnection(pipeConn)
require.NoError(t, err)
err = p.Connect()
err = p.Connect(pipeAddr{})
require.NoError(t, err)

s, err := watermillnet.NewSubscriber(watermillnet.SubscriberConfig{
Expand All @@ -200,13 +198,12 @@ func TestCloseSocketConn(t *testing.T) {
func TestSubscriberCancelByContext(t *testing.T) {
pipeConn := NewPipeConnection()
p, err := watermillnet.NewPublisher(watermillnet.PublisherConfig{
RemoteAddr: pipeAddr{},
Marshaler: pkg.MessagePackMarshaler{},
Unmarshaler: pkg.MessagePackUnmarshaler{},
}, false)
p.SetConnection(pipeConn)
require.NoError(t, err)
err = p.Connect()
err = p.Connect(pipeAddr{})
require.NoError(t, err)

s, err := watermillnet.NewSubscriber(watermillnet.SubscriberConfig{
Expand Down

0 comments on commit 0d0181f

Please sign in to comment.