diff --git a/examples/server.go b/examples/server.go index 6557885..98bdb07 100644 --- a/examples/server.go +++ b/examples/server.go @@ -6,16 +6,19 @@ import ( "log" "net" + "github.com/ThreeDotsLabs/watermill" watermillnet "github.com/andyollylarkin/watermill-net" "github.com/andyollylarkin/watermill-net/pkg" "github.com/andyollylarkin/watermill-net/pkg/connection" ) func main() { + logger := watermill.NewStdLogger(true, true) l, _ := net.Listen("tcp4", "127.0.0.1:9090") s, err := watermillnet.NewSubscriber(watermillnet.SubscriberConfig{ Marshaler: pkg.MessagePackMarshaler{}, Unmarshaler: pkg.MessagePackUnmarshaler{}, + Logger: logger, }) if err != nil { diff --git a/subscriber.go b/subscriber.go index 1043415..0465d73 100644 --- a/subscriber.go +++ b/subscriber.go @@ -2,6 +2,7 @@ package watermillnet import ( "context" + "errors" "fmt" "io" "sync" @@ -145,6 +146,8 @@ func (s *Subscriber) GetConnection() (Connection, error) { } // Connect establish connection. +// If connection was set via SetConnection, we should pass nil to l. +// If listener was set, subscriber will be waiting until the client reconnects when the connection is lost. func (s *Subscriber) Connect(l Listener) error { if l == nil && s.conn == nil { return fmt.Errorf("listener cant be nil") @@ -184,6 +187,10 @@ func (s *Subscriber) reconnect(l Listener) { return } + if s.logger != nil { + s.logger.Info("Client reconnected", watermill.LogFields{"addr": conn.RemoteAddr().String()}) + } + s.mu.Lock() s.conn = conn s.mu.Unlock() @@ -231,6 +238,12 @@ func (s *Subscriber) handle(ctx context.Context, readCh <-chan []byte, sub *sub) continue } + if msg.Message == nil { + s.mu.RUnlock() + + continue + } + // create new watermill message bacause after marshal/unmarshal ack/nack channels is nil watermillMsg := message.NewMessage(msg.Message.UUID, msg.Message.Payload) @@ -314,7 +327,7 @@ func (s *Subscriber) readContent() { lenRaw, err := r.ReadBytes(internal.LenDelimiter) if err != nil { - if s.logger != nil { + if s.logger != nil && !errors.Is(err, io.EOF) { s.logger.Error("Error read message", err, nil) } diff --git a/subscriber_test.go b/subscriber_test.go index 1f6d0a3..2bc4add 100644 --- a/subscriber_test.go +++ b/subscriber_test.go @@ -304,6 +304,57 @@ func TestAllowNilListenerWhenConnectionSet(t *testing.T) { require.NoError(t, err) } -// Subscribe topic independent - -// Susccribe receive multi message +// func TestCancelConsumeWhenReadEOF(t *testing.T) { +// pc := connection.NewTCPConnection(net.Dialer{}, time.Hour) +// nl, err := net.Listen("tcp4", ":0") +// require.NoError(t, err) + +// l := connection.NewTCP4Listener(nl) + +// var wg sync.WaitGroup + +// wg.Add(1) + +// go func() { +// _, err := l.Accept() +// require.NoError(t, err) +// // sconn.Close() +// wg.Done() +// }() + +// err = pc.Connect(nl.Addr()) +// require.NoError(t, err) +// wg.Wait() + +// b := make([]byte, 10) +// _, err = pc.Read(b) +// require.ErrorIs(t, err, io.EOF) + +// s, err := watermillnet.NewSubscriber(watermillnet.SubscriberConfig{ +// Marshaler: pkg.MessagePackMarshaler{}, +// Unmarshaler: pkg.MessagePackUnmarshaler{}, +// }) +// require.NoError(t, err) +// s.SetConnection(pc) +// err = s.Connect(nil) +// require.NoError(t, err) + +// sub, err := s.Subscribe(context.Background(), "") + +// var mu sync.Mutex +// var done bool = false + +// wg.Add(1) +// go func() { +// for range sub { +// } + +// mu.Lock() +// done = true +// mu.Unlock() +// wg.Done() +// }() + +// wg.Wait() +// require.Equal(t, true, done) +// }