Skip to content

Commit

Permalink
Check for goroutine leaks in healtchecks and sniffer
Browse files Browse the repository at this point in the history
This is a backport of olivere#543 for elastic.v3 (see also olivere#545).
  • Loading branch information
olivere authored and Dylan Jennings committed Aug 7, 2017
1 parent ecbf429 commit 5a6c61f
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 24 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ env:
global:
- GO15VENDOREXPERIMENT=1
matrix:
- ES_VERSION=2.4.4
- ES_VERSION=2.4.6
allow_failures:
- go: tip
before_script:
Expand Down
9 changes: 9 additions & 0 deletions CONTRIBUTORS
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,21 @@
Adam Alix [@adamalix](https://github.com/adamalix)
Adam Weiner [@adamweiner](https://github.com/adamweiner)
Adrian Lungu [@AdrianLungu](https://github.com/AdrianLungu)
alehano [@alehano](https://github.com/alehano)
Alex [@akotlar](https://github.com/akotlar)
Alexandre Olivier [@aliphen](https://github.com/aliphen)
Alexey Sharov [@nizsheanez](https://github.com/nizsheanez)
AndreKR [@AndreKR](https://github.com/AndreKR)
Andrew Dunham [@andrew-d](https://github.com/andrew-d)
Andrew Gaul [@andrewgaul](https://github.com/andrewgaul)
Arquivei [@arquivei](https://github.com/arquivei)
Benjamin Fernandes [@LotharSee](https://github.com/LotharSee)
Benjamin Zarzycki [@kf6nux](https://github.com/kf6nux)
Braden Bassingthwaite [@bbassingthwaite-va](https://github.com/bbassingthwaite-va)
Brady Love [@bradylove](https://github.com/bradylove)
Bryan Conklin [@bmconklin](https://github.com/bmconklin)
Bruce Zhou [@brucez-isell](https://github.com/brucez-isell)
cforbes [@cforbes](https://github.com/cforbes)
Chris M [@tebriel](https://github.com/tebriel)
Christophe Courtaut [@kri5](https://github.com/kri5)
Conrad Pankoff [@deoxxa](https://github.com/deoxxa)
Expand All @@ -31,8 +34,11 @@ Daniel Heckrath [@DanielHeckrath](https://github.com/DanielHeckrath)
Daniel Imfeld [@dimfeld](https://github.com/dimfeld)
Dwayne Schultz [@myshkin5](https://github.com/myshkin5)
Ellison Leão [@ellisonleao](https://github.com/ellisonleao)
Erwin [@eticzon](https://github.com/eticzon)
Eugene Egorov [@EugeneEgorov](https://github.com/EugeneEgorov)
Fanfan [@wenpos](https://github.com/wenpos)
Faolan C-P [@fcheslack](https://github.com/fcheslack)
Filip Tepper [@filiptepper](https://github.com/filiptepper)
Gerhard Häring [@ghaering](https://github.com/ghaering)
Guilherme Silveira [@guilherme-santos](https://github.com/guilherme-santos)
Guillaume J. Charmes [@creack](https://github.com/creack)
Expand All @@ -54,6 +60,7 @@ John Stanford [@jxstanford](https://github.com/jxstanford)
jun [@coseyo](https://github.com/coseyo)
Junpei Tsuji [@jun06t](https://github.com/jun06t)
Kenta SUZUKI [@suzuken](https://github.com/suzuken)
Kevin Mulvey [@kmulvey](https://github.com/kmulvey)
Kyle Brandt [@kylebrandt](https://github.com/kylebrandt)
Leandro Piccilli [@lpic10](https://github.com/lpic10)
Maciej Lisiewski [@c2h5oh](https://github.com/c2h5oh)
Expand All @@ -71,6 +78,7 @@ Nicholas Wolff [@nwolff](https://github.com/nwolff)
Nick K [@utrack](https://github.com/utrack)
Nick Whyte [@nickw444](https://github.com/nickw444)
Orne Brocaar [@brocaar](https://github.com/brocaar)
Pete C [@peteclark-ft](https://github.com/peteclark-ft)
Radoslaw Wesolowski [r--w](https://github.com/r--w)
Ryan Schmukler [@rschmukler](https://github.com/rschmukler)
Sacheendra talluri [@sacheendra](https://github.com/sacheendra)
Expand All @@ -85,6 +93,7 @@ Tetsuya Morimoto [@t2y](https://github.com/t2y)
TimeEmit [@TimeEmit](https://github.com/timeemit)
TusharM [@tusharm](https://github.com/tusharm)
wangtuo [@wangtuo](https://github.com/wangtuo)
Wédney Yuri [@wedneyyuri](https://github.com/wedneyyuri)
wolfkdy [@wolfkdy](https://github.com/wolfkdy)
Wyndham Blanton [@wyndhblb](https://github.com/wyndhblb)
Yarden Bar [@ayashjorden](https://github.com/ayashjorden)
Expand Down
66 changes: 44 additions & 22 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,8 +799,12 @@ func (c *Client) sniff(timeout time.Duration) error {

// Start sniffing on all found URLs
ch := make(chan []*conn, len(urls))

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

for _, url := range urls {
go func(url string) { ch <- c.sniffNode(url) }(url)
go func(url string) { ch <- c.sniffNode(ctx, url) }(url)
}

// Wait for the results to come back, or the process times out.
Expand All @@ -811,7 +815,7 @@ func (c *Client) sniff(timeout time.Duration) error {
c.updateConns(conns)
return nil
}
case <-time.After(timeout):
case <-ctx.Done():
// We get here if no cluster responds in time
return ErrNoClient
}
Expand All @@ -822,7 +826,7 @@ func (c *Client) sniff(timeout time.Duration) error {
// in sniff. If successful, it returns the list of node URLs extracted
// from the result of calling Nodes Info API. Otherwise, an empty array
// is returned.
func (c *Client) sniffNode(url string) []*conn {
func (c *Client) sniffNode(ctx context.Context, url string) []*conn {
var nodes []*conn

// Call the Nodes Info API at /_nodes/http
Expand All @@ -840,7 +844,7 @@ func (c *Client) sniffNode(url string) []*conn {
}
c.mu.RUnlock()

res, err := c.c.Do((*http.Request)(req))
res, err := c.c.Do((*http.Request)(req).WithContext(ctx))
if err != nil {
return nodes
}
Expand Down Expand Up @@ -974,37 +978,55 @@ func (c *Client) healthcheck(timeout time.Duration, force bool) {
conns := c.conns
c.connsMu.RUnlock()

timeoutInMillis := int64(timeout / time.Millisecond)

for _, conn := range conns {
params := make(url.Values)
params.Set("timeout", fmt.Sprintf("%dms", timeoutInMillis))
req, err := NewRequest("HEAD", conn.URL()+"/?"+params.Encode())
if err == nil {
// Run the HEAD request against ES with a timeout
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

// Goroutine executes the HTTP request, returns an error and sets status
var status int
errc := make(chan error, 1)
go func(url string) {
req, err := NewRequest("HEAD", url)
if err != nil {
errc <- err
return
}
if basicAuth {
req.SetBasicAuth(basicAuthUsername, basicAuthPassword)
}
if c.prepareRequest != nil {
c.prepareRequest((*http.Request)(req))
}
res, err := c.c.Do((*http.Request)(req))
res, err := c.c.Do((*http.Request)(req).WithContext(ctx))
if err == nil {
status = res.StatusCode
if res.Body != nil {
defer res.Body.Close()
res.Body.Close()
}
if res.StatusCode >= 200 && res.StatusCode < 300 {
conn.MarkAsAlive()
} else {
conn.MarkAsDead()
c.errorf("elastic: %s is dead [status=%d]", conn.URL(), res.StatusCode)
}
} else {
c.errorf("elastic: %s is dead", conn.URL())
conn.MarkAsDead()
}
} else {
errc <- err
}(conn.URL())

// Wait for the Goroutine (or its timeout)
select {
case <-ctx.Done(): // timeout
c.errorf("elastic: %s is dead", conn.URL())
conn.MarkAsDead()
break
case err := <-errc:
if err != nil {
c.errorf("elastic: %s is dead", conn.URL())
conn.MarkAsDead()
break
}
if status >= 200 && status < 300 {
conn.MarkAsAlive()
} else {
conn.MarkAsDead()
c.errorf("elastic: %s is dead [status=%d]", conn.URL(), status)
}
break
}
}
}
Expand Down
142 changes: 141 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ import (
"errors"
"fmt"
"log"
"net"
"net/http"
"reflect"
"regexp"
"strings"
"testing"
"time"

"github.com/fortytw2/leaktest"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -264,6 +267,74 @@ func TestClientHealthcheckStartupTimeout(t *testing.T) {
}
}

func TestClientHealthcheckTimeoutLeak(t *testing.T) {
// This test test checks if healthcheck requests are canceled
// after timeout.
// It contains couple of hacks which won't be needed once we
// stop supporting Go1.7.
// On Go1.7 it uses server side effects to monitor if connection
// was closed,
// and on Go 1.8+ we're additionally honestly monitoring routine
// leaks via leaktest.
mux := http.NewServeMux()

var reqDone bool
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
cn, ok := w.(http.CloseNotifier)
if !ok {
t.Fatalf("Writer is not CloseNotifier, but %v", reflect.TypeOf(w).Name())
}
<-cn.CloseNotify()
reqDone = true
})

lis, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("Couldn't setup listener: %v", err)
}
addr := lis.Addr().String()

srv := &http.Server{
Handler: mux,
}
go srv.Serve(lis)

cli := &Client{
c: &http.Client{},
conns: []*conn{
&conn{
url: "http://" + addr + "/",
},
},
}

type closer interface {
Shutdown(context.Context) error
}

// pre-Go1.8 Server can't Shutdown
cl, isServerCloseable := (interface{}(srv)).(closer)

// Since Go1.7 can't Shutdown() - there will be leak from server
// Monitor leaks on Go 1.8+
if isServerCloseable {
defer leaktest.CheckTimeout(t, time.Second*10)()
}

cli.healthcheck(time.Millisecond*500, true)

if isServerCloseable {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
cl.Shutdown(ctx)
}

<-time.After(time.Second)
if !reqDone {
t.Fatal("Request wasn't canceled or stopped")
}
}

// -- NewSimpleClient --

func TestSimpleClientDefaults(t *testing.T) {
Expand Down Expand Up @@ -400,7 +471,7 @@ func TestClientSniffNode(t *testing.T) {
}

ch := make(chan []*conn)
go func() { ch <- client.sniffNode(DefaultURL) }()
go func() { ch <- client.sniffNode(context.Background(), DefaultURL) }()

select {
case nodes := <-ch:
Expand Down Expand Up @@ -454,6 +525,75 @@ func TestClientSniffOnDefaultURL(t *testing.T) {
}
}

func TestClientSniffTimeoutLeak(t *testing.T) {
// This test test checks if sniff requests are canceled
// after timeout.
// It contains couple of hacks which won't be needed once we
// stop supporting Go1.7.
// On Go1.7 it uses server side effects to monitor if connection
// was closed,
// and on Go 1.8+ we're additionally honestly monitoring routine
// leaks via leaktest.
mux := http.NewServeMux()

var reqDone bool
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
cn, ok := w.(http.CloseNotifier)
if !ok {
t.Fatalf("Writer is not CloseNotifier, but %v", reflect.TypeOf(w).Name())
}
<-cn.CloseNotify()
reqDone = true
})

lis, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("Couldn't setup listener: %v", err)
}
addr := lis.Addr().String()

srv := &http.Server{
Handler: mux,
}
go srv.Serve(lis)

cli := &Client{
c: &http.Client{},
conns: []*conn{
&conn{
url: "http://" + addr + "/",
},
},
snifferEnabled: true,
}

type closer interface {
Shutdown(context.Context) error
}

// pre-Go1.8 Server can't Shutdown
cl, isServerCloseable := (interface{}(srv)).(closer)

// Since Go1.7 can't Shutdown() - there will be leak from server
// Monitor leaks on Go 1.8+
if isServerCloseable {
defer leaktest.CheckTimeout(t, time.Second*10)()
}

cli.sniff(time.Millisecond * 500)

if isServerCloseable {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
cl.Shutdown(ctx)
}

<-time.After(time.Second)
if !reqDone {
t.Fatal("Request wasn't canceled or stopped")
}
}

func TestClientExtractHostname(t *testing.T) {
tests := []struct {
Scheme string
Expand Down

0 comments on commit 5a6c61f

Please sign in to comment.