Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use monotonic time for keep alive #303

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Use monotonic time for keep alive
  • Loading branch information
johnhydroware committed Apr 15, 2019
commit 52b2e623cac86a657edc0603bd9c6c1cac6c59c9
12 changes: 6 additions & 6 deletions client.go
Expand Up @@ -95,8 +95,8 @@ type Client interface {

// client implements the Client interface
type client struct {
lastSent int64
lastReceived int64
lastSent atomic.Value
lastReceived atomic.Value
pingOutstanding int32
status uint32
sync.RWMutex
Expand Down Expand Up @@ -300,8 +300,8 @@ func (c *client) Connect() Token {

if c.options.KeepAlive != 0 {
atomic.StoreInt32(&c.pingOutstanding, 0)
atomic.StoreInt64(&c.lastReceived, time.Now().Unix())
atomic.StoreInt64(&c.lastSent, time.Now().Unix())
c.lastReceived.Store(time.Now())
c.lastSent.Store(time.Now())
c.workers.Add(1)
go keepalive(c)
}
Expand Down Expand Up @@ -412,8 +412,8 @@ func (c *client) reconnect() {

if c.options.KeepAlive != 0 {
atomic.StoreInt32(&c.pingOutstanding, 0)
atomic.StoreInt64(&c.lastReceived, time.Now().Unix())
atomic.StoreInt64(&c.lastSent, time.Now().Unix())
c.lastReceived.Store(time.Now())
c.lastSent.Store(time.Now())
c.workers.Add(1)
go keepalive(c)
}
Expand Down
4 changes: 2 additions & 2 deletions net.go
Expand Up @@ -137,7 +137,7 @@ func incoming(c *client) {
case c.ibound <- cp:
// Notify keepalive logic that we recently received a packet
if c.options.KeepAlive != 0 {
atomic.StoreInt64(&c.lastReceived, time.Now().Unix())
c.lastReceived.Store(time.Now())
}
case <-c.stop:
// This avoids a deadlock should a message arrive while shutting down.
Expand Down Expand Up @@ -221,7 +221,7 @@ func outgoing(c *client) {
}
// Reset ping timer after sending control packet.
if c.options.KeepAlive != 0 {
atomic.StoreInt64(&c.lastSent, time.Now().Unix())
c.lastSent.Store(time.Now())
}
}
}
Expand Down
9 changes: 6 additions & 3 deletions ping.go
Expand Up @@ -43,16 +43,19 @@ func keepalive(c *client) {
DEBUG.Println(PNG, "keepalive stopped")
return
case <-intervalTicker.C:
DEBUG.Println(PNG, "ping check", time.Now().Unix()-atomic.LoadInt64(&c.lastSent))
if time.Now().Unix()-atomic.LoadInt64(&c.lastSent) >= c.options.KeepAlive || time.Now().Unix()-atomic.LoadInt64(&c.lastReceived) >= c.options.KeepAlive {
lastSent := c.lastSent.Load().(time.Time)
lastReceived := c.lastReceived.Load().(time.Time)

DEBUG.Println(PNG, "ping check", time.Since(lastSent).Seconds())
if time.Since(lastSent) >= time.Duration(c.options.KeepAlive*int64(time.Second)) || time.Since(lastReceived) >= time.Duration(c.options.KeepAlive*int64(time.Second)) {
if atomic.LoadInt32(&c.pingOutstanding) == 0 {
DEBUG.Println(PNG, "keepalive sending ping")
ping := packets.NewControlPacket(packets.Pingreq).(*packets.PingreqPacket)
//We don't want to wait behind large messages being sent, the Write call
//will block until it it able to send the packet.
atomic.StoreInt32(&c.pingOutstanding, 1)
ping.Write(c.conn)
atomic.StoreInt64(&c.lastSent, time.Now().Unix())
c.lastSent.Store(time.Now())
pingSent = time.Now()
}
}
Expand Down