Skip to content

Commit

Permalink
Improve pingloop and error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
belak committed Aug 21, 2017
1 parent 1f0cac9 commit 96372b4
Showing 1 changed file with 64 additions and 59 deletions.
123 changes: 64 additions & 59 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package irc

import (
"errors"
"fmt"
"io"
"sync"
Expand Down Expand Up @@ -86,13 +85,15 @@ type Client struct {
currentNick string
limiter chan struct{}
incomingPongChan chan string
errChan chan error
}

// NewClient creates a client given an io stream and a client config.
func NewClient(rw io.ReadWriter, config ClientConfig) *Client {
c := &Client{
Conn: NewConn(rw),
config: config,
Conn: NewConn(rw),
config: config,
errChan: make(chan error, 1),
}

// Replace the writer writeCallback with one of our own
Expand All @@ -110,7 +111,7 @@ func (c *Client) writeCallback(w *Writer, line string) error {
return err
}

func (c *Client) maybeStartLimiter(wg *sync.WaitGroup, errChan chan error, exiting chan struct{}) {
func (c *Client) maybeStartLimiter(wg *sync.WaitGroup, exiting chan struct{}) {
if c.config.SendLimit == 0 {
return
}
Expand Down Expand Up @@ -143,70 +144,75 @@ func (c *Client) maybeStartLimiter(wg *sync.WaitGroup, errChan chan error, exiti
}()
}

func (c *Client) maybeStartPingLoop(wg *sync.WaitGroup, errChan chan error, exiting chan struct{}) {
func (c *Client) maybeStartPingLoop(wg *sync.WaitGroup, exiting chan struct{}) {
if c.config.PingFrequency <= 0 {
return
}

wg.Add(1)

c.incomingPongChan = make(chan string, 5)
go c.pingLoop(wg, errChan, exiting)
}

type pingDeadline struct {
Deadline <-chan time.Time
Data string
go func() {
defer wg.Done()

pingHandlers := make(map[string]chan struct{})
ticker := time.NewTicker(c.config.PingFrequency)

defer ticker.Stop()

for {
select {
case <-ticker.C:
// Each time we get a tick, we send off a ping and start a
// goroutine to handle the pong.
timestamp := time.Now().Unix()
pongChan := make(chan struct{})
pingHandlers[fmt.Sprintf("%d", timestamp)] = pongChan
wg.Add(1)
go c.handlePing(timestamp, pongChan, wg, exiting)
case data := <-c.incomingPongChan:
// Make sure the pong gets routed to the correct
// goroutine.
c := pingHandlers[data]
delete(pingHandlers, data)

if c != nil {
c <- struct{}{}
}
case <-exiting:
return
}
}
}()
}

func (c *Client) pingLoop(wg *sync.WaitGroup, errChan chan error, exiting chan struct{}) {
func (c *Client) handlePing(timestamp int64, pongChan chan struct{}, wg *sync.WaitGroup, exiting chan struct{}) {
defer wg.Done()

var (
needsPong = map[string]bool{}
pingDeadlines []*pingDeadline
currentDeadline *pingDeadline
ticker = time.NewTicker(c.config.PingFrequency)
)
err := c.Writef("PING :%d", timestamp)
if err != nil {
c.sendError(err)
return
}

defer ticker.Stop()
timer := time.NewTimer(c.config.PingTimeout)
defer timer.Stop()

for {
// Find the next ping we haven't received yet.
for len(pingDeadlines) > 0 && !needsPong[currentDeadline.Data] {
currentDeadline = pingDeadlines[0]
pingDeadlines = pingDeadlines[1:]
}

select {
case <-ticker.C:
// Every time the ticker fires off we need to send a ping
// and store that we sent it.
timestamp := time.Now()
err := c.Writef("PING :%d", timestamp.Unix())
if err != nil {
errChan <- err
return
}
deadline := &pingDeadline{
Deadline: time.After(c.config.PingTimeout),
Data: fmt.Sprintf("%d", timestamp.Unix()),
}
needsPong[deadline.Data] = true
pingDeadlines = append(pingDeadlines, deadline)
case <-currentDeadline.Deadline:
// When the deadline comes back, if we still haven't
// gotten a pong, we kill the connection.
if !needsPong[currentDeadline.Data] {
continue
}
select {
case <-timer.C:
c.sendError(err)
case <-pongChan:
return
case <-exiting:
return
}
}

errChan <- errors.New("PING timeout")
return
case data := <-c.incomingPongChan:
delete(needsPong, data)
case <-exiting:
return
}
func (c *Client) sendError(err error) {
select {
case c.errChan <- err:
default:
}
}

Expand All @@ -217,11 +223,10 @@ func (c *Client) Run() error {
// exiting is used by the main goroutine here to ensure any sub-goroutines
// get closed when exiting.
exiting := make(chan struct{})
errChan := make(chan error, 3)
var wg sync.WaitGroup

c.maybeStartLimiter(&wg, errChan, exiting)
c.maybeStartPingLoop(&wg, errChan, exiting)
c.maybeStartLimiter(&wg, exiting)
c.maybeStartPingLoop(&wg, exiting)

c.currentNick = c.config.Nick

Expand All @@ -235,7 +240,7 @@ func (c *Client) Run() error {
for {
m, err := c.ReadMessage()
if err != nil {
errChan <- err
c.sendError(err)
break
}

Expand All @@ -250,7 +255,7 @@ func (c *Client) Run() error {

// Wait for an error from any goroutine, then signal we're exiting and wait
// for the goroutines to exit.
err := <-errChan
err := <-c.errChan
close(exiting)
wg.Wait()

Expand Down

0 comments on commit 96372b4

Please sign in to comment.