From e020008df4799501216b676f3fa97d55f3cfc315 Mon Sep 17 00:00:00 2001 From: alsm Date: Fri, 11 Aug 2017 16:45:52 +0100 Subject: [PATCH] Simplifed the keepalive mechanism Cond vars, multiple timers, locks, handling resets. It'd gotten to complex and unwieldy, and worse didn't seem to work reliably. There is now a simple calculation for sending a ping and determining if we haven't received one. There is a side effect that we actually wait the longer PingTimeout or keepalive/2 (when keepalive <= 10 seconds) before triggering that we haven't seen an expected pingresp. Finally, hopefully the resolution to #126 --- client.go | 14 ++--- net.go | 6 +-- ping.go | 155 ++++++++++-------------------------------------------- 3 files changed, 39 insertions(+), 136 deletions(-) diff --git a/client.go b/client.go index 3bf0c6cf..2a9051b6 100644 --- a/client.go +++ b/client.go @@ -77,9 +77,9 @@ type client struct { stop chan struct{} persist Store options ClientOptions - pingResp *sync.Cond - packetResp *sync.Cond - keepaliveReset *sync.Cond + lastSent time.Time + lastReceived time.Time + pingOutstanding bool status uint32 workers sync.WaitGroup } @@ -237,11 +237,10 @@ func (c *client) Connect() Token { c.ibound = make(chan packets.ControlPacket) c.errors = make(chan error, 1) c.stop = make(chan struct{}) - c.pingResp = sync.NewCond(&sync.Mutex{}) - c.packetResp = sync.NewCond(&sync.Mutex{}) - c.keepaliveReset = sync.NewCond(&sync.Mutex{}) if c.options.KeepAlive != 0 { + c.lastReceived = time.Now() + c.lastSent = time.Now() c.workers.Add(1) go keepalive(c) } @@ -343,6 +342,9 @@ func (c *client) reconnect() { } if c.options.KeepAlive != 0 { + c.pingOutstanding = false + c.lastReceived = time.Now() + c.lastSent = time.Now() c.workers.Add(1) go keepalive(c) } diff --git a/net.go b/net.go index 3c9e9ba9..56c7b180 100644 --- a/net.go +++ b/net.go @@ -125,7 +125,7 @@ func incoming(c *client) { case c.ibound <- cp: // Notify keepalive logic that we recently received a packet if c.options.KeepAlive != 0 { - c.packetResp.Broadcast() + c.lastReceived = time.Now() } case <-c.stop: // This avoids a deadlock should a message arrive while shutting down. @@ -205,7 +205,7 @@ func outgoing(c *client) { } // Reset ping timer after sending control packet. if c.options.KeepAlive != 0 { - c.keepaliveReset.Broadcast() + c.lastSent = time.Now() } } } @@ -228,7 +228,7 @@ func alllogic(c *client) { switch m := msg.(type) { case *packets.PingrespPacket: DEBUG.Println(NET, "received pingresp") - c.pingResp.Broadcast() + c.pingOutstanding = false case *packets.SubackPacket: DEBUG.Println(NET, "received suback, id:", m.MessageID) token := c.getToken(m.MessageID) diff --git a/ping.go b/ping.go index fefcff08..f9f05fdf 100644 --- a/ping.go +++ b/ping.go @@ -16,148 +16,49 @@ package mqtt import ( "errors" - "sync" "time" "github.com/eclipse/paho.mqtt.golang/packets" ) func keepalive(c *client) { + defer c.workers.Done() DEBUG.Println(PNG, "keepalive starting") + var checkInterval time.Duration + var pingSent time.Time - var condWG sync.WaitGroup - pingStop := make(chan struct{}) - - defer func() { - close(pingStop) - c.keepaliveReset.Broadcast() - c.pingResp.Broadcast() - c.packetResp.Broadcast() - condWG.Wait() - c.workers.Done() - }() - - receiveInterval := c.options.KeepAlive + (1 * time.Second) - pingTimer := timer{Timer: time.NewTimer(c.options.KeepAlive)} - receiveTimer := timer{Timer: time.NewTimer(receiveInterval)} - pingRespTimer := timer{Timer: time.NewTimer(c.options.PingTimeout)} - - pingRespTimer.Stop() - - condWG.Add(3) - go func() { - defer condWG.Done() - for { - c.pingResp.L.Lock() - c.pingResp.Wait() - c.pingResp.L.Unlock() - select { - case <-pingStop: - return - default: - } - DEBUG.Println(NET, "resetting ping timeout timer") - pingRespTimer.Stop() - pingTimer.Reset(c.options.KeepAlive) - receiveTimer.Reset(receiveInterval) - } - }() - - go func() { - defer condWG.Done() - for { - c.packetResp.L.Lock() - c.packetResp.Wait() - c.packetResp.L.Unlock() - select { - case <-pingStop: - return - default: - } - DEBUG.Println(NET, "resetting receive timer") - receiveTimer.Reset(receiveInterval) - } - }() + if c.options.KeepAlive > 10*time.Second { + checkInterval = 5 * time.Second + } else { + checkInterval = c.options.KeepAlive / 2 + } - go func() { - defer condWG.Done() - for { - c.keepaliveReset.L.Lock() - c.keepaliveReset.Wait() - c.keepaliveReset.L.Unlock() - select { - case <-pingStop: - return - default: - } - DEBUG.Println(NET, "resetting ping timer") - pingTimer.Reset(c.options.KeepAlive) - } - }() + intervalTicker := time.NewTicker(checkInterval) + defer intervalTicker.Stop() for { select { case <-c.stop: DEBUG.Println(PNG, "keepalive stopped") return - case <-pingTimer.C: - sendPing(&pingTimer, &pingRespTimer, c) - case <-receiveTimer.C: - receiveTimer.SetRead(true) - receiveTimer.Reset(receiveInterval) - sendPing(&pingTimer, &pingRespTimer, c) - case <-pingRespTimer.C: - pingRespTimer.SetRead(true) - CRITICAL.Println(PNG, "pingresp not received, disconnecting") - c.errors <- errors.New("pingresp not received, disconnecting") - pingTimer.Stop() - return + case <-intervalTicker.C: + if time.Now().Sub(c.lastSent) >= c.options.KeepAlive || time.Now().Sub(c.lastReceived) >= c.options.KeepAlive { + if !c.pingOutstanding { + DEBUG.Println(PNG, "keepalive sending ping") + ping := packets.NewControlPacket(packets.Pingreq).(*packets.PingreqPacket) + //We don't want to wait behind large messages being sent, the Write call + //will block until it it able to send the packet. + c.pingOutstanding = true + ping.Write(c.conn) + c.lastSent = time.Now() + pingSent = time.Now() + } + } + if c.pingOutstanding && time.Now().Sub(pingSent) >= c.options.PingTimeout { + CRITICAL.Println(PNG, "pingresp not received, disconnecting") + c.errors <- errors.New("pingresp not received, disconnecting") + return + } } } } - -type timer struct { - sync.Mutex - *time.Timer - readFrom bool -} - -func (t *timer) SetRead(v bool) { - t.Lock() - t.readFrom = v - t.Unlock() -} - -func (t *timer) Stop() bool { - t.Lock() - defer t.SetRead(true) - defer t.Unlock() - - if !t.Timer.Stop() && !t.readFrom { - <-t.C - return false - } - return true -} - -func (t *timer) Reset(d time.Duration) bool { - t.Lock() - defer t.SetRead(false) - defer t.Unlock() - if !t.Timer.Stop() && !t.readFrom { - <-t.C - } - - return t.Timer.Reset(d) -} - -func sendPing(pt *timer, rt *timer, c *client) { - pt.SetRead(true) - DEBUG.Println(PNG, "keepalive sending ping") - ping := packets.NewControlPacket(packets.Pingreq).(*packets.PingreqPacket) - //We don't want to wait behind large messages being sent, the Write call - //will block until it it able to send the packet. - ping.Write(c.conn) - - rt.Reset(c.options.PingTimeout) -}