Skip to content

Commit

Permalink
Simplified the keepalive mechanism
Browse files Browse the repository at this point in the history
Cond vars, multiple timers, locks, handling resets: it had gotten too complex and unwieldy and, worse, didn't seem to work reliably.
There is now a simple calculation for sending a ping and determining if we haven't received one.
A side effect is that we actually wait for the longer of PingTimeout or keepalive/2 (when keepalive <= 10 seconds) before reporting that we haven't seen an expected pingresp.
Finally, hopefully the resolution to #126
  • Loading branch information
alsm committed Aug 11, 2017
1 parent aff1577 commit e020008
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 136 deletions.
14 changes: 8 additions & 6 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ type client struct {
stop chan struct{}
persist Store
options ClientOptions
pingResp *sync.Cond
packetResp *sync.Cond
keepaliveReset *sync.Cond
lastSent time.Time
lastReceived time.Time
pingOutstanding bool
status uint32
workers sync.WaitGroup
}
Expand Down Expand Up @@ -237,11 +237,10 @@ func (c *client) Connect() Token {
c.ibound = make(chan packets.ControlPacket)
c.errors = make(chan error, 1)
c.stop = make(chan struct{})
c.pingResp = sync.NewCond(&sync.Mutex{})
c.packetResp = sync.NewCond(&sync.Mutex{})
c.keepaliveReset = sync.NewCond(&sync.Mutex{})

if c.options.KeepAlive != 0 {
c.lastReceived = time.Now()
c.lastSent = time.Now()
c.workers.Add(1)
go keepalive(c)
}
Expand Down Expand Up @@ -343,6 +342,9 @@ func (c *client) reconnect() {
}

if c.options.KeepAlive != 0 {
c.pingOutstanding = false
c.lastReceived = time.Now()
c.lastSent = time.Now()
c.workers.Add(1)
go keepalive(c)
}
Expand Down
6 changes: 3 additions & 3 deletions net.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func incoming(c *client) {
case c.ibound <- cp:
// Notify keepalive logic that we recently received a packet
if c.options.KeepAlive != 0 {
c.packetResp.Broadcast()
c.lastReceived = time.Now()
}
case <-c.stop:
// This avoids a deadlock should a message arrive while shutting down.
Expand Down Expand Up @@ -205,7 +205,7 @@ func outgoing(c *client) {
}
// Reset ping timer after sending control packet.
if c.options.KeepAlive != 0 {
c.keepaliveReset.Broadcast()
c.lastSent = time.Now()
}
}
}
Expand All @@ -228,7 +228,7 @@ func alllogic(c *client) {
switch m := msg.(type) {
case *packets.PingrespPacket:
DEBUG.Println(NET, "received pingresp")
c.pingResp.Broadcast()
c.pingOutstanding = false
case *packets.SubackPacket:
DEBUG.Println(NET, "received suback, id:", m.MessageID)
token := c.getToken(m.MessageID)
Expand Down
155 changes: 28 additions & 127 deletions ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,148 +16,49 @@ package mqtt

import (
"errors"
"sync"
"time"

"github.com/eclipse/paho.mqtt.golang/packets"
)

// keepalive is started as a goroutine by the client when options.KeepAlive is
// non-zero. It sends PINGREQ packets on c.conn when the connection has been
// idle and pushes an error onto c.errors when a PINGRESP is not seen in time.
// It returns when c.stop is closed or after reporting the missing PINGRESP.
//
// NOTE(review): this listing comes from a rendered commit diff and appears to
// interleave the removed condition-variable/timer implementation with the
// ticker-based replacement (for example c.workers.Done is deferred twice).
// Confirm which lines are live before relying on the combined text.
func keepalive(c *client) {
defer c.workers.Done()
DEBUG.Println(PNG, "keepalive starting")
// checkInterval is the ticker period; pingSent is when the last PINGREQ was written.
var checkInterval time.Duration
var pingSent time.Time

// condWG tracks the three helper goroutines below; pingStop tells them to exit.
var condWG sync.WaitGroup
pingStop := make(chan struct{})

// On exit: close pingStop, wake every goroutine parked on a cond so it can
// observe the closed channel, wait for all of them, then signal the client's
// worker group.
defer func() {
close(pingStop)
c.keepaliveReset.Broadcast()
c.pingResp.Broadcast()
c.packetResp.Broadcast()
condWG.Wait()
c.workers.Done()
}()

// The receive timer runs one second longer than the keepalive period.
receiveInterval := c.options.KeepAlive + (1 * time.Second)
pingTimer := timer{Timer: time.NewTimer(c.options.KeepAlive)}
receiveTimer := timer{Timer: time.NewTimer(receiveInterval)}
pingRespTimer := timer{Timer: time.NewTimer(c.options.PingTimeout)}

// The pingresp timer starts stopped; sendPing arms it with PingTimeout.
pingRespTimer.Stop()

condWG.Add(3)
// Helper 1: each pingResp broadcast cancels the pingresp timeout and restarts
// both the ping and receive timers.
go func() {
defer condWG.Done()
for {
c.pingResp.L.Lock()
c.pingResp.Wait()
c.pingResp.L.Unlock()
// A broadcast from the deferred cleanup lands here with pingStop closed.
select {
case <-pingStop:
return
default:
}
DEBUG.Println(NET, "resetting ping timeout timer")
pingRespTimer.Stop()
pingTimer.Reset(c.options.KeepAlive)
receiveTimer.Reset(receiveInterval)
}
}()

// Helper 2: each packetResp broadcast restarts the receive timer.
go func() {
defer condWG.Done()
for {
c.packetResp.L.Lock()
c.packetResp.Wait()
c.packetResp.L.Unlock()
select {
case <-pingStop:
return
default:
}
DEBUG.Println(NET, "resetting receive timer")
receiveTimer.Reset(receiveInterval)
}
}()
// Check every 5 seconds for keepalives above 10 seconds, otherwise at half
// the keepalive period.
if c.options.KeepAlive > 10*time.Second {
checkInterval = 5 * time.Second
} else {
checkInterval = c.options.KeepAlive / 2
}

// Helper 3: each keepaliveReset broadcast restarts the ping timer.
go func() {
defer condWG.Done()
for {
c.keepaliveReset.L.Lock()
c.keepaliveReset.Wait()
c.keepaliveReset.L.Unlock()
select {
case <-pingStop:
return
default:
}
DEBUG.Println(NET, "resetting ping timer")
pingTimer.Reset(c.options.KeepAlive)
}
}()
intervalTicker := time.NewTicker(checkInterval)
defer intervalTicker.Stop()

for {
select {
case <-c.stop:
DEBUG.Println(PNG, "keepalive stopped")
return
case <-pingTimer.C:
sendPing(&pingTimer, &pingRespTimer, c)
case <-receiveTimer.C:
// The expiry was just consumed, so flag it before Reset to skip the drain.
receiveTimer.SetRead(true)
receiveTimer.Reset(receiveInterval)
sendPing(&pingTimer, &pingRespTimer, c)
case <-pingRespTimer.C:
pingRespTimer.SetRead(true)
CRITICAL.Println(PNG, "pingresp not received, disconnecting")
c.errors <- errors.New("pingresp not received, disconnecting")
pingTimer.Stop()
return
case <-intervalTicker.C:
// Ping when nothing has been sent or nothing has been received for a full
// keepalive period, unless a ping is already awaiting its response.
if time.Now().Sub(c.lastSent) >= c.options.KeepAlive || time.Now().Sub(c.lastReceived) >= c.options.KeepAlive {
if !c.pingOutstanding {
DEBUG.Println(PNG, "keepalive sending ping")
ping := packets.NewControlPacket(packets.Pingreq).(*packets.PingreqPacket)
//We don't want to wait behind large messages being sent, the Write call
//will block until it is able to send the packet.
c.pingOutstanding = true
ping.Write(c.conn)
c.lastSent = time.Now()
pingSent = time.Now()
}
}
// This is evaluated only on a tick, so a missing pingresp is noticed at the
// first tick at or after PingTimeout has elapsed since pingSent.
if c.pingOutstanding && time.Now().Sub(pingSent) >= c.options.PingTimeout {
CRITICAL.Println(PNG, "pingresp not received, disconnecting")
c.errors <- errors.New("pingresp not received, disconnecting")
return
}
}
}
}

// timer wraps a *time.Timer with a mutex and a flag that records whether the
// current expiry has already been received from C. Stop and Reset use the flag
// to decide whether a fired-but-unread value may still be sitting in C.
type timer struct {
	sync.Mutex
	*time.Timer
	// readFrom is true once the expiry has been consumed (or the timer has
	// been stopped) and false after the timer has been re-armed by Reset.
	readFrom bool
}

// SetRead records whether the timer's channel has been read since the timer
// was last armed. Callers set it to true right after receiving from C.
func (t *timer) SetRead(v bool) {
	t.Lock()
	t.readFrom = v
	t.Unlock()
}

// drain discards a pending expiry from C if one is present. It never blocks:
// the expiry may already have been received by another goroutine that has not
// yet called SetRead(true), and a blocking receive here (with the mutex held)
// would then hang forever. The caller must hold the mutex.
func (t *timer) drain() {
	select {
	case <-t.C:
	default:
	}
}

// Stop stops the timer and marks its channel as read. It returns false only
// when the timer had already fired and its expiry was not flagged as read (in
// which case any pending value is discarded); otherwise it returns true.
func (t *timer) Stop() bool {
	t.Lock()
	defer t.Unlock()

	stale := !t.Timer.Stop() && !t.readFrom
	if stale {
		t.drain()
	}
	t.readFrom = true
	return !stale
}

// Reset re-arms the timer to fire after d, discarding any unread expiry first,
// and marks the channel as unread. It returns the result of the underlying
// time.Timer.Reset call.
func (t *timer) Reset(d time.Duration) bool {
	t.Lock()
	defer t.Unlock()

	if !t.Timer.Stop() && !t.readFrom {
		t.drain()
	}
	t.readFrom = false
	return t.Timer.Reset(d)
}

// sendPing writes a PINGREQ packet to the client's connection and then arms
// rt with the configured PingTimeout. pt is flagged as read before the write.
func sendPing(pt *timer, rt *timer, c *client) {
	// Flag the ping timer's channel as read before anything else.
	pt.SetRead(true)

	DEBUG.Println(PNG, "keepalive sending ping")

	// The ping is written straight to the connection so it does not queue up
	// behind large outgoing messages; Write blocks until the packet is sent.
	pkt := packets.NewControlPacket(packets.Pingreq)
	req := pkt.(*packets.PingreqPacket)
	req.Write(c.conn)

	// Start counting down to the pingresp deadline.
	timeout := c.options.PingTimeout
	rt.Reset(timeout)
}

0 comments on commit e020008

Please sign in to comment.