Skip to content

Commit

Permalink
Merge 62f7109 into acbcebe
Browse files Browse the repository at this point in the history
  • Loading branch information
belak authored Aug 18, 2017
2 parents acbcebe + 62f7109 commit 586221f
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 27 deletions.
116 changes: 94 additions & 22 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package irc

import (
"errors"
"fmt"
"io"
"sync"
"time"
)

Expand All @@ -25,6 +28,11 @@ var clientFilters = map[string]func(*Client, *Message){
reply.Command = "PONG"
c.WriteMessage(reply)
},
"PONG": func(c *Client, m *Message) {
if c.incomingPongChan != nil {
c.incomingPongChan <- m.Trailing()
}
},
"PRIVMSG": func(c *Client, m *Message) {
// Clean up CTCP stuff so everyone doesn't have to parse it
// manually.
Expand All @@ -50,6 +58,10 @@ type ClientConfig struct {
User string
Name string

// Connection settings
PingFrequency time.Duration
PingTimeout time.Duration

// SendLimit is how frequent messages can be sent. If this is zero,
// there will be no limit.
SendLimit time.Duration
Expand All @@ -68,18 +80,16 @@ type Client struct {
config ClientConfig

// Internal state
currentNick string
limitTick *time.Ticker
limiter chan struct{}
tickDone chan struct{}
currentNick string
limiter chan struct{}
incomingPongChan chan string
}

// NewClient creates a client given an io stream and a client config.
func NewClient(rwc io.ReadWriter, config ClientConfig) *Client {
func NewClient(rw io.ReadWriter, config ClientConfig) *Client {
c := &Client{
Conn: NewConn(rwc),
config: config,
tickDone: make(chan struct{}),
Conn: NewConn(rw),
config: config,
}

// Replace the writer writeCallback with one of our own
Expand All @@ -97,52 +107,105 @@ func (c *Client) writeCallback(w *Writer, line string) error {
return err
}

func (c *Client) maybeStartLimiter() {
func (c *Client) maybeStartLimiter(wg *sync.WaitGroup, errChan chan error, exiting chan struct{}) {
if c.config.SendLimit == 0 {
return
}

wg.Add(1)

// If SendBurst is 0, this will be unbuffered, so keep that in mind.
c.limiter = make(chan struct{}, c.config.SendBurst)

c.limitTick = time.NewTicker(c.config.SendLimit)
limitTick := time.NewTicker(c.config.SendLimit)

go func() {
defer wg.Done()

var done bool
for !done {
select {
case <-c.limitTick.C:
case <-limitTick.C:
select {
case c.limiter <- struct{}{}:
default:
}
case <-c.tickDone:
case <-exiting:
done = true
}
}

c.limitTick.Stop()
limitTick.Stop()
close(c.limiter)
c.limiter = nil
c.tickDone <- struct{}{}
}()
}

func (c *Client) stopLimiter() {
if c.limiter == nil {
func (c *Client) maybeStartPingLoop(wg *sync.WaitGroup, errChan chan error, exiting chan struct{}) {
if c.config.PingFrequency <= 0 {
return
}

c.tickDone <- struct{}{}
<-c.tickDone
wg.Add(1)
c.incomingPongChan = make(chan string, 5)
go c.pingLoop(wg, errChan, exiting)
}

func (c *Client) pingLoop(wg *sync.WaitGroup, errChan chan error, exiting chan struct{}) {
defer wg.Done()

var (
sentPings []time.Time
pingTimeoutChan <-chan time.Time
ticker = time.NewTicker(c.config.PingFrequency)
)

defer ticker.Stop()

for {
// Reset the pingTimeoutChan if we have any pings we're waiting for
// and it isn't currently set.
if len(sentPings) > 0 && pingTimeoutChan == nil {
pingTimeoutChan = time.After(time.Now().Sub(sentPings[0]) + c.config.PingTimeout)
}

select {
case <-ticker.C:
timestamp := time.Now()
err := c.Writef("PING :%d", timestamp.Unix())
if err != nil {
errChan <- err
return
}
sentPings = append(sentPings, timestamp)
case <-pingTimeoutChan:
errChan <- errors.New("PING timeout")
return
case data := <-c.incomingPongChan:
if len(sentPings) == 0 || data != fmt.Sprintf("%d", sentPings[0].Unix()) {
continue
}

// Drop the first ping and clear the timeout chan
sentPings = sentPings[1:]
pingTimeoutChan = nil
case <-exiting:
return
}
}
}

// Run starts the main loop for this IRC connection. Note that it may break in
// strange and unexpected ways if it is called again before the first connection
// exits.
func (c *Client) Run() error {
c.maybeStartLimiter()
defer c.stopLimiter()
// exiting is used by the main goroutine here to ensure any sub-goroutines
// get closed when exiting.
exiting := make(chan struct{})
errChan := make(chan error, 3)
var wg sync.WaitGroup

c.maybeStartLimiter(&wg, errChan, exiting)
c.maybeStartPingLoop(&wg, errChan, exiting)

c.currentNick = c.config.Nick

Expand All @@ -156,7 +219,8 @@ func (c *Client) Run() error {
for {
m, err := c.ReadMessage()
if err != nil {
return err
errChan <- err
break
}

if f, ok := clientFilters[m.Command]; ok {
Expand All @@ -167,6 +231,14 @@ func (c *Client) Run() error {
c.config.Handler.Handle(c, m)
}
}

// Wait for an error from any goroutine, then signal we're exiting and wait
// for the goroutines to exit.
err := <-errChan
close(exiting)
wg.Wait()

return err
}

// CurrentNick returns what the nick of the client is known to be at this point
Expand Down
10 changes: 5 additions & 5 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,10 @@ type Conn struct {

// NewConn creates a new Conn
func NewConn(rw io.ReadWriter) *Conn {
// Create the client
c := &Conn{
return &Conn{
NewReader(rw),
NewWriter(rw),
}

return c
}

// Writer is the outgoing side of a connection.
Expand Down Expand Up @@ -79,7 +76,10 @@ type Reader struct {
reader *bufio.Reader
}

// NewReader creates an irc.Reader from an io.Reader.
// NewReader creates an irc.Reader from an io.Reader. Note that once a reader is
// passed into this function, you should no longer use it as it is being used
// inside a bufio.Reader so you cannot rely on only the amount of data for a
// Message being read when you call ReadMessage.
func NewReader(r io.Reader) *Reader {
return &Reader{
nil,
Expand Down

0 comments on commit 586221f

Please sign in to comment.