Skip to content

Commit

Permalink
Merge d0a014b into acbcebe
Browse files Browse the repository at this point in the history
  • Loading branch information
belak committed Aug 22, 2017
2 parents acbcebe + d0a014b commit e26c95b
Show file tree
Hide file tree
Showing 5 changed files with 459 additions and 120 deletions.
138 changes: 116 additions & 22 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package irc

import (
"errors"
"fmt"
"io"
"sync"
"time"
)

Expand All @@ -25,6 +28,14 @@ var clientFilters = map[string]func(*Client, *Message){
reply.Command = "PONG"
c.WriteMessage(reply)
},
"PONG": func(c *Client, m *Message) {
if c.incomingPongChan != nil {
select {
case c.incomingPongChan <- m.Trailing():
default:
}
}
},
"PRIVMSG": func(c *Client, m *Message) {
// Clean up CTCP stuff so everyone doesn't have to parse it
// manually.
Expand All @@ -50,6 +61,10 @@ type ClientConfig struct {
User string
Name string

// Connection settings
PingFrequency time.Duration
PingTimeout time.Duration

// SendLimit is how frequent messages can be sent. If this is zero,
// there will be no limit.
SendLimit time.Duration
Expand All @@ -68,18 +83,18 @@ type Client struct {
config ClientConfig

// Internal state
currentNick string
limitTick *time.Ticker
limiter chan struct{}
tickDone chan struct{}
currentNick string
limiter chan struct{}
incomingPongChan chan string
errChan chan error
}

// NewClient creates a client given an io stream and a client config.
func NewClient(rwc io.ReadWriter, config ClientConfig) *Client {
func NewClient(rw io.ReadWriter, config ClientConfig) *Client {
c := &Client{
Conn: NewConn(rwc),
config: config,
tickDone: make(chan struct{}),
Conn: NewConn(rw),
config: config,
errChan: make(chan error, 1),
}

// Replace the writer writeCallback with one of our own
Expand All @@ -97,52 +112,122 @@ func (c *Client) writeCallback(w *Writer, line string) error {
return err
}

func (c *Client) maybeStartLimiter() {
func (c *Client) maybeStartLimiter(wg *sync.WaitGroup, exiting chan struct{}) {
if c.config.SendLimit == 0 {
return
}

wg.Add(1)

// If SendBurst is 0, this will be unbuffered, so keep that in mind.
c.limiter = make(chan struct{}, c.config.SendBurst)

c.limitTick = time.NewTicker(c.config.SendLimit)
limitTick := time.NewTicker(c.config.SendLimit)

go func() {
defer wg.Done()

var done bool
for !done {
select {
case <-c.limitTick.C:
case <-limitTick.C:
select {
case c.limiter <- struct{}{}:
default:
}
case <-c.tickDone:
case <-exiting:
done = true
}
}

c.limitTick.Stop()
limitTick.Stop()
close(c.limiter)
c.limiter = nil
c.tickDone <- struct{}{}
}()
}

func (c *Client) stopLimiter() {
if c.limiter == nil {
func (c *Client) maybeStartPingLoop(wg *sync.WaitGroup, exiting chan struct{}) {
if c.config.PingFrequency <= 0 {
return
}

c.tickDone <- struct{}{}
<-c.tickDone
wg.Add(1)

c.incomingPongChan = make(chan string, 5)

go func() {
defer wg.Done()

pingHandlers := make(map[string]chan struct{})
ticker := time.NewTicker(c.config.PingFrequency)

defer ticker.Stop()

for {
select {
case <-ticker.C:
// Each time we get a tick, we send off a ping and start a
// goroutine to handle the pong.
timestamp := time.Now().Unix()
pongChan := make(chan struct{})
pingHandlers[fmt.Sprintf("%d", timestamp)] = pongChan
wg.Add(1)
go c.handlePing(timestamp, pongChan, wg, exiting)
case data := <-c.incomingPongChan:
// Make sure the pong gets routed to the correct
// goroutine.
c := pingHandlers[data]
delete(pingHandlers, data)

if c != nil {
c <- struct{}{}
}
case <-exiting:
return
}
}
}()
}

func (c *Client) handlePing(timestamp int64, pongChan chan struct{}, wg *sync.WaitGroup, exiting chan struct{}) {
defer wg.Done()

err := c.Writef("PING :%d", timestamp)
if err != nil {
c.sendError(err)
return
}

timer := time.NewTimer(c.config.PingTimeout)
defer timer.Stop()

select {
case <-timer.C:
c.sendError(errors.New("Ping Timeout"))
case <-pongChan:
return
case <-exiting:
return
}
}

func (c *Client) sendError(err error) {
select {
case c.errChan <- err:
default:
}
}

// Run starts the main loop for this IRC connection. Note that it may break in
// strange and unexpected ways if it is called again before the first connection
// exits.
func (c *Client) Run() error {
c.maybeStartLimiter()
defer c.stopLimiter()
// exiting is used by the main goroutine here to ensure any sub-goroutines
// get closed when exiting.
exiting := make(chan struct{})
var wg sync.WaitGroup

c.maybeStartLimiter(&wg, exiting)
c.maybeStartPingLoop(&wg, exiting)

c.currentNick = c.config.Nick

Expand All @@ -156,7 +241,8 @@ func (c *Client) Run() error {
for {
m, err := c.ReadMessage()
if err != nil {
return err
c.sendError(err)
break
}

if f, ok := clientFilters[m.Command]; ok {
Expand All @@ -167,6 +253,14 @@ func (c *Client) Run() error {
c.config.Handler.Handle(c, m)
}
}

// Wait for an error from any goroutine, then signal we're exiting and wait
// for the goroutines to exit.
err := <-c.errChan
close(exiting)
wg.Wait()

return err
}

// CurrentNick returns what the nick of the client is known to be at this point
Expand Down

0 comments on commit e26c95b

Please sign in to comment.