From 9995b72fc7f1e1a7812dfc8d602003c3b02a7487 Mon Sep 17 00:00:00 2001
From: Matt Brittan
Date: Fri, 4 Jun 2021 10:27:46 +1200
Subject: [PATCH] Resolve rare deadlock that could occur when a network error
 occurs while multiple other operations are in progress. Ref #509

---
 client.go          |  17 +++++++-
 fvt_client_test.go | 101 +++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 117 insertions(+), 1 deletion(-)

diff --git a/client.go b/client.go
index 60d8e7f..847daee 100644
--- a/client.go
+++ b/client.go
@@ -607,7 +607,22 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet
 				commsIncomingPub = nil
 				continue
 			}
-			incomingPubChan <- pub
+			// Care is needed here because an error elsewhere could trigger a deadlock
+		sendPubLoop:
+			for {
+				select {
+				case incomingPubChan <- pub:
+					break sendPubLoop
+				case err, ok := <-commsErrors:
+					if !ok { // commsErrors has been closed so we can ignore it
+						commsErrors = nil
+						continue
+					}
+					ERROR.Println(CLI, "Connect comms goroutine - error triggered during send Pub", err)
+					c.internalConnLost(err) // no harm in calling this if the connection is already down (or shutdown is in progress)
+					continue
+				}
+			}
 		case err, ok := <-commsErrors:
 			if !ok {
 				commsErrors = nil
diff --git a/fvt_client_test.go b/fvt_client_test.go
index ac8dcba..bfb3760 100644
--- a/fvt_client_test.go
+++ b/fvt_client_test.go
@@ -16,7 +16,10 @@ package mqtt
 
 import (
 	"bytes"
+	"context"
 	"fmt"
+	"runtime"
+	"sync"
 	"testing"
 	"time"
 
@@ -1444,3 +1447,101 @@ func Test_ResumeSubsWithReconnect(t *testing.T) {
 
 	c.Disconnect(250)
 }
+
+// Issue 509 - occasional deadlock when connections are lost unexpectedly
+// This was quite a nasty deadlock which occurred in very rare circumstances; I could not come up with a reliable way of
+// replicating this, but the below would cause it to happen fairly consistently (when the test was run a decent number
+// of times). Following the fix it ran 10,000 times without issue.
+// go test -count 10000 -run DisconnectWhileProcessingIncomingPublish
+func Test_DisconnectWhileProcessingIncomingPublish(t *testing.T) {
+	topic := "/test/DisconnectWhileProcessingIncomingPublish"
+
+	pops := NewClientOptions()
+	pops.AddBroker(FVTTCP)
+	// pops.SetOrderMatters(false) // Not really needed but consistent...
+	pops.SetClientID("dwpip-pub")
+	p := NewClient(pops)
+
+	sops := NewClientOptions()
+	sops.AddBroker(FVTTCP)
+	sops.SetAutoReconnect(false) // We don't want the connection to be re-established
+	sops.SetWriteTimeout(500 * time.Millisecond) // We will be sending a lot of publish messages and want goroutines to clear...
+	// sops.SetOrderMatters(false)
+	sops.SetClientID("dwpip-sub")
+	// We need to know when the subscriber has lost its connection (this indicates that the deadlock has not occurred)
+	sDisconnected := make(chan struct{})
+	sops.SetConnectionLostHandler(func(Client, error) { close(sDisconnected) })
+
+	msgReceived := make(chan struct{})
+	var oneMsgReceived sync.Once
+	var f MessageHandler = func(client Client, msg Message) {
+		// No need to do anything when message received (just want ACK sent ASAP)
+		oneMsgReceived.Do(func() { close(msgReceived) })
+	}
+
+	s := NewClient(sops).(*client) // s = subscriber
+	if sToken := s.Connect(); sToken.Wait() && sToken.Error() != nil {
+		t.Fatalf("Error on subscriber Client.Connect(): %v", sToken.Error())
+	}
+
+	if sToken := s.Subscribe(topic, 1, f); sToken.Wait() && sToken.Error() != nil {
+		t.Fatalf("Error on subscriber Client.Subscribe(): %v", sToken.Error())
+	}
+
+	// Use a goroutine to swamp the broker with messages
+	if pToken := p.Connect(); pToken.Wait() && pToken.Error() != nil { // p = publisher
+		t.Fatalf("Error on publisher Client.Connect(): %v", pToken.Error())
+	}
+	// We will hammer both the publisher and subscriber with messages
+	ctx, cancel := context.WithCancel(context.Background())
+	pubDone := make(chan struct{})
+	go func() {
+		defer close(pubDone)
+		i := 0
+		for {
+			p.Publish(topic, 1, false, fmt.Sprintf("test message: %d", i))
+			// After the connection goes down s.Publish will start blocking (this is not ideal, but fixing it is a problem for another time)
+			go func() { s.Publish(topic+"IGNORE", 1, false, fmt.Sprintf("test message: %d", i)) }()
+			i++
+
+			if ctx.Err() != nil {
+				return
+			}
+		}
+	}()
+
+	// Wait until we have received a message (ensuring that the stream of messages has started)
+	select {
+	case <-msgReceived: // All good
+	case <-time.After(time.Second):
+		t.Errorf("no messages received")
+	}
+
+	// We need the connection to drop; unfortunately using any internal method (`s.conn.Close()` etc.) will hide the
+	// behaviour because any calls to Read/Write will return immediately. So we just ask the broker to disconnect.
+	dm := packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket)
+	err := dm.Write(s.conn)
+	if err != nil {
+		t.Fatalf("error sending disconnect packet: %s", err)
+	}
+
+	// Let's give the library up to a second to shut down (indicated by the status changing)
+	select {
+	case <-sDisconnected: // All good
+	case <-time.After(time.Second):
+		cancel()                // no point leaving publisher running
+		time.Sleep(time.Second) // Allow publish calls to time out (otherwise there will be tons of goroutines running!)
+		buf := make([]byte, 1<<20)
+		stacklen := runtime.Stack(buf, true)
+		t.Fatalf("connection was not lost as expected - probable deadlock. Stacktrace follows: %s", buf[:stacklen])
+	}
+
+	cancel() // no point leaving publisher running
+
+	select {
+	case <-pubDone:
+	case <-time.After(time.Second):
+		t.Errorf("pubDone not closed within a second")
+	}
+	p.Disconnect(250) // Close publisher
+}
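
Editor's note (not part of the patch): the hunk in client.go replaces a bare channel send with a labelled select loop so that a network error reported on commsErrors can interrupt the send instead of leaving the comms goroutine blocked forever. The stand-alone sketch below shows the same pattern in isolation; all identifiers (incoming, commsErrors, msg, done) are invented stand-ins, not the library's real names.

package main

import (
	"errors"
	"fmt"
	"time"
)

func main() {
	incoming := make(chan string)      // stands in for incomingPubChan; its consumer may stop reading
	commsErrors := make(chan error, 1) // stands in for the channel that reports network errors

	done := make(chan struct{})
	go func() {
		defer close(done)
		msg := "publish payload"
		// Without the select, "incoming <- msg" would block forever once the consumer
		// stops reading; selecting on commsErrors lets an error interrupt the send.
	sendLoop:
		for {
			select {
			case incoming <- msg:
				break sendLoop // delivered
			case err, ok := <-commsErrors:
				if !ok { // error channel closed; stop watching it
					commsErrors = nil
					continue
				}
				fmt.Println("send abandoned, connection error:", err)
				return
			}
		}
	}()

	// Simulate a consumer that never reads, then a network error arriving.
	time.Sleep(100 * time.Millisecond)
	commsErrors <- errors.New("connection reset by peer")

	select {
	case <-done:
		fmt.Println("worker exited instead of deadlocking")
	case <-time.After(time.Second):
		fmt.Println("worker is stuck (the deadlock the patch prevents)")
	}
}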