diff --git a/.travis.yml b/.travis.yml index 494bd20..4d0114d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,7 +13,7 @@ before_install: - rm ./testcases/*.go script: - - gometalinter --fast ./... -D gas + - gometalinter --fast ./... -D gas --cyclo-over=12 - go test -race -v ./... - go test -covermode=count -coverprofile=profile.cov diff --git a/client.go b/client.go index 0e45cb3..9323ff7 100644 --- a/client.go +++ b/client.go @@ -1,7 +1,10 @@ package irc import ( + "errors" + "fmt" "io" + "sync" "time" ) @@ -25,6 +28,14 @@ var clientFilters = map[string]func(*Client, *Message){ reply.Command = "PONG" c.WriteMessage(reply) }, + "PONG": func(c *Client, m *Message) { + if c.incomingPongChan != nil { + select { + case c.incomingPongChan <- m.Trailing(): + default: + } + } + }, "PRIVMSG": func(c *Client, m *Message) { // Clean up CTCP stuff so everyone doesn't have to parse it // manually. @@ -50,6 +61,10 @@ type ClientConfig struct { User string Name string + // Connection settings + PingFrequency time.Duration + PingTimeout time.Duration + // SendLimit is how frequent messages can be sent. If this is zero, // there will be no limit. SendLimit time.Duration @@ -68,18 +83,16 @@ type Client struct { config ClientConfig // Internal state - currentNick string - limitTick *time.Ticker - limiter chan struct{} - tickDone chan struct{} + currentNick string + limiter chan struct{} + incomingPongChan chan string } // NewClient creates a client given an io stream and a client config. -func NewClient(rwc io.ReadWriter, config ClientConfig) *Client { +func NewClient(rw io.ReadWriter, config ClientConfig) *Client { c := &Client{ - Conn: NewConn(rwc), - config: config, - tickDone: make(chan struct{}), + Conn: NewConn(rw), + config: config, } // Replace the writer writeCallback with one of our own @@ -97,52 +110,105 @@ func (c *Client) writeCallback(w *Writer, line string) error { return err } -func (c *Client) maybeStartLimiter() { +func (c *Client) maybeStartLimiter(wg *sync.WaitGroup, errChan chan error, exiting chan struct{}) { if c.config.SendLimit == 0 { return } + wg.Add(1) + // If SendBurst is 0, this will be unbuffered, so keep that in mind. c.limiter = make(chan struct{}, c.config.SendBurst) - - c.limitTick = time.NewTicker(c.config.SendLimit) + limitTick := time.NewTicker(c.config.SendLimit) go func() { + defer wg.Done() + var done bool for !done { select { - case <-c.limitTick.C: + case <-limitTick.C: select { case c.limiter <- struct{}{}: default: } - case <-c.tickDone: + case <-exiting: done = true } } - c.limitTick.Stop() + limitTick.Stop() close(c.limiter) c.limiter = nil - c.tickDone <- struct{}{} }() } -func (c *Client) stopLimiter() { - if c.limiter == nil { +func (c *Client) maybeStartPingLoop(wg *sync.WaitGroup, errChan chan error, exiting chan struct{}) { + if c.config.PingFrequency <= 0 { return } - c.tickDone <- struct{}{} - <-c.tickDone + wg.Add(1) + c.incomingPongChan = make(chan string, 5) + go c.pingLoop(wg, errChan, exiting) +} + +func (c *Client) pingLoop(wg *sync.WaitGroup, errChan chan error, exiting chan struct{}) { + defer wg.Done() + + var ( + sentPings []time.Time + pingTimeoutChan <-chan time.Time + ticker = time.NewTicker(c.config.PingFrequency) + ) + + defer ticker.Stop() + + for { + // Reset the pingTimeoutChan if we have any pings we're waiting for + // and it isn't currently set. + if len(sentPings) > 0 && pingTimeoutChan == nil { + pingTimeoutChan = time.After(time.Now().Sub(sentPings[0]) + c.config.PingTimeout) + } + + select { + case <-ticker.C: + timestamp := time.Now() + err := c.Writef("PING :%d", timestamp.Unix()) + if err != nil { + errChan <- err + return + } + sentPings = append(sentPings, timestamp) + case <-pingTimeoutChan: + errChan <- errors.New("PING timeout") + return + case data := <-c.incomingPongChan: + if len(sentPings) == 0 || data != fmt.Sprintf("%d", sentPings[0].Unix()) { + continue + } + + // Drop the first ping and clear the timeout chan + sentPings = sentPings[1:] + pingTimeoutChan = nil + case <-exiting: + return + } + } } // Run starts the main loop for this IRC connection. Note that it may break in // strange and unexpected ways if it is called again before the first connection // exits. func (c *Client) Run() error { - c.maybeStartLimiter() - defer c.stopLimiter() + // exiting is used by the main goroutine here to ensure any sub-goroutines + // get closed when exiting. + exiting := make(chan struct{}) + errChan := make(chan error, 3) + var wg sync.WaitGroup + + c.maybeStartLimiter(&wg, errChan, exiting) + c.maybeStartPingLoop(&wg, errChan, exiting) c.currentNick = c.config.Nick @@ -156,7 +222,8 @@ func (c *Client) Run() error { for { m, err := c.ReadMessage() if err != nil { - return err + errChan <- err + break } if f, ok := clientFilters[m.Command]; ok { @@ -167,6 +234,14 @@ func (c *Client) Run() error { c.config.Handler.Handle(c, m) } } + + // Wait for an error from any goroutine, then signal we're exiting and wait + // for the goroutines to exit. + err := <-errChan + close(exiting) + wg.Wait() + + return err } // CurrentNick returns what the nick of the client is known to be at this point diff --git a/conn.go b/conn.go index 0c9c815..7f9b68e 100644 --- a/conn.go +++ b/conn.go @@ -15,13 +15,10 @@ type Conn struct { // NewConn creates a new Conn func NewConn(rw io.ReadWriter) *Conn { - // Create the client - c := &Conn{ + return &Conn{ NewReader(rw), NewWriter(rw), } - - return c } // Writer is the outgoing side of a connection. @@ -79,7 +76,10 @@ type Reader struct { reader *bufio.Reader } -// NewReader creates an irc.Reader from an io.Reader. +// NewReader creates an irc.Reader from an io.Reader. Note that once a reader is +// passed into this function, you should no longer use it as it is being used +// inside a bufio.Reader so you cannot rely on only the amount of data for a +// Message being read when you call ReadMessage. func NewReader(r io.Reader) *Reader { return &Reader{ nil,