Skip to content

Commit

Permalink
add tests for case when connection not set
Browse files Browse the repository at this point in the history
  • Loading branch information
andyollylarkin committed Aug 25, 2023
1 parent 8452b7c commit dda2416
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 7 deletions.
51 changes: 51 additions & 0 deletions publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,3 +260,54 @@ func TestConnectOK(t *testing.T) {
err = p.Connect()
assert.NoError(t, err)
}

func TestPublisherConnectionNotSetError(t *testing.T) {
retPub := func(t *testing.T) *watermillnet.Publisher {
p, err := watermillnet.NewPublisher(watermillnet.PublisherConfig{
RemoteAddr: pipeAddr{},
Marshaler: pkg.MessagePackMarshaler{},
Unmarshaler: pkg.MessagePackUnmarshaler{},
}, false)
require.NoError(t, err)

return p
}

tc := []struct {
name string
exec func(t *testing.T) error
expect error
}{
{name: "Get connection", exec: func(t *testing.T) error {
p := retPub(t)
_, err := p.GetConnection()

return err
}, expect: watermillnet.ErrConnectionNotSet},
{name: "Publish", exec: func(t *testing.T) error {
p := retPub(t)
err := p.Publish("", message.NewMessage("", nil))

return err
}, expect: watermillnet.ErrConnectionNotSet},
{name: "Close", exec: func(t *testing.T) error {
p := retPub(t)
err := p.Close()

return err
}, expect: watermillnet.ErrConnectionNotSet},
{name: "Connect", exec: func(t *testing.T) error {
p := retPub(t)
err := p.Connect()

return err
}, expect: watermillnet.ErrConnectionNotSet},
}

for _, c := range tc {
t.Run(c.name, func(t *testing.T) {
res := c.exec(t)
assert.ErrorIs(t, res, c.expect)
})
}
}
15 changes: 8 additions & 7 deletions subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,12 @@ func (s *Subscriber) readContent() {
func (s *Subscriber) Close() error {
s.mu.Lock()

if s.conn == nil {
s.mu.Unlock()

return ErrConnectionNotSet
}

for _, v := range s.subscribers {
if !v.Closed {
v.Closed = true
Expand All @@ -365,13 +371,8 @@ func (s *Subscriber) Close() error {
s.processWg.Wait()

s.mu.Lock()
if s.conn != nil {
err := s.conn.Close()
s.mu.Unlock()

return err
}
err := s.conn.Close()
s.mu.Unlock()

return nil
return err
}
45 changes: 45 additions & 0 deletions subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func TestPublisherConnectErrorClosed(t *testing.T) {
Marshaler: pkg.MessagePackMarshaler{},
Unmarshaler: pkg.MessagePackUnmarshaler{},
})
s.SetConnection(pipeConn)
require.NoError(t, err)
err = s.Close()
require.NoError(t, err)
Expand Down Expand Up @@ -250,6 +251,50 @@ func TestSubscriberCancelByContext(t *testing.T) {
assert.Equal(t, true, complete)
}

func TestSubscriberConnectionNotSetError(t *testing.T) {
retPub := func(t *testing.T) *watermillnet.Subscriber {
s, err := watermillnet.NewSubscriber(watermillnet.SubscriberConfig{
Marshaler: pkg.MessagePackMarshaler{},
Unmarshaler: pkg.MessagePackUnmarshaler{},
})
require.NoError(t, err)

return s
}

tc := []struct {
name string
exec func(t *testing.T) error
expect error
}{
{name: "Get connection", exec: func(t *testing.T) error {
p := retPub(t)
_, err := p.GetConnection()

return err
}, expect: watermillnet.ErrConnectionNotSet},
{name: "Subscribe", exec: func(t *testing.T) error {
p := retPub(t)
_, err := p.Subscribe(context.Background(), "")

return err
}, expect: watermillnet.ErrConnectionNotSet},
{name: "Close", exec: func(t *testing.T) error {
p := retPub(t)
err := p.Close()

return err
}, expect: watermillnet.ErrConnectionNotSet},
}

for _, c := range tc {
t.Run(c.name, func(t *testing.T) {
res := c.exec(t)
assert.ErrorIs(t, res, c.expect)
})
}
}

// Subscribe topic independent

// Susccribe receive multi message

0 comments on commit dda2416

Please sign in to comment.