From 2166a745d8826ba87800a0265f0726923384894a Mon Sep 17 00:00:00 2001 From: Simon Frei Date: Fri, 3 May 2019 15:59:22 +0200 Subject: [PATCH] lib/protocol: Don't call receiver after calling Closed (fixes #4170) --- lib/protocol/common_test.go | 4 ++ lib/protocol/protocol.go | 75 ++++++++++++++++++++++------------- lib/protocol/protocol_test.go | 47 ++++++++++++++++++++++ 3 files changed, 99 insertions(+), 27 deletions(-) diff --git a/lib/protocol/common_test.go b/lib/protocol/common_test.go index 8faf9a3f0a8..4d51a3d0ec6 100644 --- a/lib/protocol/common_test.go +++ b/lib/protocol/common_test.go @@ -13,6 +13,7 @@ type TestModel struct { hash []byte weakHash uint32 fromTemporary bool + indexFn func(DeviceID, string, []FileInfo) closedCh chan struct{} closedErr error } @@ -24,6 +25,9 @@ func newTestModel() *TestModel { } func (t *TestModel) Index(deviceID DeviceID, folder string, files []FileInfo) { + if t.indexFn != nil { + t.indexFn(deviceID, folder, files) + } } func (t *TestModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo) { diff --git a/lib/protocol/protocol.go b/lib/protocol/protocol.go index d5ec3115f96..e5b079e369c 100644 --- a/lib/protocol/protocol.go +++ b/lib/protocol/protocol.go @@ -182,11 +182,14 @@ type rawConnection struct { nextID int32 nextIDMut sync.Mutex - outbox chan asyncMessage - closed chan struct{} - closeOnce sync.Once - sendCloseOnce sync.Once - compression Compression + inbox chan message + outbox chan asyncMessage + clusterConfigBox chan *ClusterConfig + receiverLoopStopped chan struct{} + closed chan struct{} + closeOnce sync.Once + sendCloseOnce sync.Once + compression Compression } type asyncResult struct { @@ -220,15 +223,18 @@ func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiv cw := &countingWriter{Writer: writer} c := rawConnection{ - id: deviceID, - name: name, - receiver: nativeModel{receiver}, - cr: cr, - cw: cw, - awaiting: make(map[int32]chan asyncResult), - outbox: make(chan asyncMessage), - closed: make(chan struct{}), - compression: compress, + id: deviceID, + name: name, + receiver: nativeModel{receiver}, + cr: cr, + cw: cw, + awaiting: make(map[int32]chan asyncResult), + inbox: make(chan message), + outbox: make(chan asyncMessage), + clusterConfigBox: make(chan *ClusterConfig), + receiverLoopStopped: make(chan struct{}), + closed: make(chan struct{}), + compression: compress, } return wireFormatConnection{&c} @@ -237,8 +243,9 @@ func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiv // Start creates the goroutines for sending and receiving of messages. It must // be called exactly once after creating a connection. func (c *rawConnection) Start() { + go c.readerLoop() go func() { - err := c.readerLoop() + err := c.receiverLoop() c.internalClose(err) }() go c.writerLoop() @@ -348,25 +355,37 @@ func (c *rawConnection) ping() bool { return c.send(&Ping{}, nil) } -func (c *rawConnection) readerLoop() (err error) { +func (c *rawConnection) readerLoop() { fourByteBuf := make([]byte, 4) - state := stateInitial for { + msg, err := c.readMessage(fourByteBuf) + if err != nil { + if err == errUnknownMessage { + // Unknown message types are skipped, for future extensibility. + continue + } + c.internalClose(err) + return + } select { + case c.inbox <- msg: case <-c.closed: - return ErrClosed - default: + return } - msg, err := c.readMessage(fourByteBuf) - if err == errUnknownMessage { - // Unknown message types are skipped, for future extensibility. - continue - } - if err != nil { - return err - } + } +} +func (c *rawConnection) receiverLoop() (err error) { + defer close(c.receiverLoopStopped) + var msg message + state := stateInitial + for { + select { + case msg = <-c.inbox: + case <-c.closed: + return ErrClosed + } switch msg := msg.(type) { case *ClusterConfig: l.Debugln("read ClusterConfig message") @@ -847,6 +866,8 @@ func (c *rawConnection) internalClose(err error) { } c.awaitingMut.Unlock() + <-c.receiverLoopStopped + c.receiver.Closed(c, err) }) } diff --git a/lib/protocol/protocol_test.go b/lib/protocol/protocol_test.go index aafdebf1438..6a4e2156afa 100644 --- a/lib/protocol/protocol_test.go +++ b/lib/protocol/protocol_test.go @@ -125,6 +125,53 @@ func TestCloseOnBlockingSend(t *testing.T) { } } +func TestCloseRace(t *testing.T) { + indexReceived := make(chan struct{}) + unblockIndex := make(chan struct{}) + m0 := newTestModel() + m0.indexFn = func(_ DeviceID, _ string, _ []FileInfo) { + close(indexReceived) + <-unblockIndex + } + m1 := newTestModel() + + ar, aw := io.Pipe() + br, bw := io.Pipe() + + c0 := NewConnection(c0ID, ar, bw, m0, "c0", CompressNever).(wireFormatConnection).Connection.(*rawConnection) + c0.Start() + c1 := NewConnection(c1ID, br, aw, m1, "c1", CompressNever) + c1.Start() + c0.ClusterConfig(ClusterConfig{}) + c1.ClusterConfig(ClusterConfig{}) + + c1.Index("default", nil) + select { + case <-indexReceived: + case <-time.After(time.Second): + t.Fatal("timed out before receiving index") + } + + go c0.internalClose(errManual) + select { + case <-c0.closed: + case <-time.After(time.Second): + t.Fatal("timed out before c0.closed was closed") + } + + select { + case <-m0.closedCh: + t.Errorf("receiver.Closed called before receiver.Index") + default: + } + + close(unblockIndex) + + if err := m0.closedError(); err != errManual { + t.Fatal("Connection should be closed") + } +} + func TestMarshalIndexMessage(t *testing.T) { if testing.Short() { quickCfg.MaxCount = 10