Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
andyollylarkin committed Aug 25, 2023
1 parent 05866be commit 3e204ee
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 4 deletions.
3 changes: 3 additions & 0 deletions examples/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,19 @@ import (
"log"
"net"

"github.com/ThreeDotsLabs/watermill"
watermillnet "github.com/andyollylarkin/watermill-net"
"github.com/andyollylarkin/watermill-net/pkg"
"github.com/andyollylarkin/watermill-net/pkg/connection"
)

func main() {
logger := watermill.NewStdLogger(true, true)
l, _ := net.Listen("tcp4", "127.0.0.1:9090")
s, err := watermillnet.NewSubscriber(watermillnet.SubscriberConfig{
Marshaler: pkg.MessagePackMarshaler{},
Unmarshaler: pkg.MessagePackUnmarshaler{},
Logger: logger,
})

if err != nil {
Expand Down
15 changes: 14 additions & 1 deletion subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package watermillnet

import (
"context"
"errors"
"fmt"
"io"
"sync"
Expand Down Expand Up @@ -145,6 +146,8 @@ func (s *Subscriber) GetConnection() (Connection, error) {
}

// Connect establish connection.
// If connection was set via SetConnection, we should pass nil to l.
// If listener was set, subscriber will be waiting until the client reconnects when the connection is lost.
func (s *Subscriber) Connect(l Listener) error {
if l == nil && s.conn == nil {
return fmt.Errorf("listener cant be nil")
Expand Down Expand Up @@ -184,6 +187,10 @@ func (s *Subscriber) reconnect(l Listener) {
return
}

if s.logger != nil {
s.logger.Info("Client reconnected", watermill.LogFields{"addr": conn.RemoteAddr().String()})
}

s.mu.Lock()
s.conn = conn
s.mu.Unlock()
Expand Down Expand Up @@ -231,6 +238,12 @@ func (s *Subscriber) handle(ctx context.Context, readCh <-chan []byte, sub *sub)
continue
}

if msg.Message == nil {
s.mu.RUnlock()

continue
}

// create new watermill message bacause after marshal/unmarshal ack/nack channels is nil
watermillMsg := message.NewMessage(msg.Message.UUID, msg.Message.Payload)

Expand Down Expand Up @@ -314,7 +327,7 @@ func (s *Subscriber) readContent() {

lenRaw, err := r.ReadBytes(internal.LenDelimiter)
if err != nil {
if s.logger != nil {
if s.logger != nil && !errors.Is(err, io.EOF) {
s.logger.Error("Error read message", err, nil)
}

Expand Down
57 changes: 54 additions & 3 deletions subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,57 @@ func TestAllowNilListenerWhenConnectionSet(t *testing.T) {
require.NoError(t, err)
}

// Subscribe topic independent

// Susccribe receive multi message
// func TestCancelConsumeWhenReadEOF(t *testing.T) {
// pc := connection.NewTCPConnection(net.Dialer{}, time.Hour)
// nl, err := net.Listen("tcp4", ":0")
// require.NoError(t, err)

// l := connection.NewTCP4Listener(nl)

// var wg sync.WaitGroup

// wg.Add(1)

// go func() {
// _, err := l.Accept()
// require.NoError(t, err)
// // sconn.Close()
// wg.Done()
// }()

// err = pc.Connect(nl.Addr())
// require.NoError(t, err)
// wg.Wait()

// b := make([]byte, 10)
// _, err = pc.Read(b)
// require.ErrorIs(t, err, io.EOF)

// s, err := watermillnet.NewSubscriber(watermillnet.SubscriberConfig{
// Marshaler: pkg.MessagePackMarshaler{},
// Unmarshaler: pkg.MessagePackUnmarshaler{},
// })
// require.NoError(t, err)
// s.SetConnection(pc)
// err = s.Connect(nil)
// require.NoError(t, err)

// sub, err := s.Subscribe(context.Background(), "")

// var mu sync.Mutex
// var done bool = false

// wg.Add(1)
// go func() {
// for range sub {
// }

// mu.Lock()
// done = true
// mu.Unlock()
// wg.Done()
// }()

// wg.Wait()
// require.Equal(t, true, done)
// }

0 comments on commit 3e204ee

Please sign in to comment.