Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing memory leak caused by blocking query #939

Merged
merged 7 commits into from
May 15, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions consul/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,41 @@ import (
// notify list.
type NotifyGroup struct {
l sync.Mutex
notify []chan struct{}
notify map[chan struct{}]struct{}
}

// Notify will do a non-blocking send to all waiting channels, and
// clear the notify list
func (n *NotifyGroup) Notify() {
n.l.Lock()
defer n.l.Unlock()
for _, ch := range n.notify {
for ch, _ := range n.notify {
select {
case ch <- struct{}{}:
default:
}
}
n.notify = n.notify[:0]
n.notify = nil
}

// Wait adds a channel to the notify group
func (n *NotifyGroup) Wait(ch chan struct{}) {
n.l.Lock()
defer n.l.Unlock()
n.notify = append(n.notify, ch)
if n.notify == nil {
n.notify = make(map[chan struct{}]struct{})
}
n.notify[ch] = struct{}{}
}

// Clear removes ch from the set of channels registered with the notify
// group. Calling it for a channel that was never registered, or while the
// group holds no channels at all, does nothing.
func (n *NotifyGroup) Clear(ch chan struct{}) {
	n.l.Lock()
	defer n.l.Unlock()

	// delete is a no-op on a nil map and on a missing key, so no
	// explicit nil guard is required here.
	delete(n.notify, ch)
}

// WaitCh allocates a channel that is subscribed to notifications
Expand Down
16 changes: 16 additions & 0 deletions consul/notify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,19 @@ func TestNotifyGroup(t *testing.T) {
t.Fatalf("should not block")
}
}

// TestNotifyGroup_Clear checks that a channel removed with Clear does not
// receive anything from a subsequent Notify.
func TestNotifyGroup_Clear(t *testing.T) {
	group := new(NotifyGroup)

	// Subscribe, then immediately unsubscribe, before any notification.
	waiter := group.WaitCh()
	group.Clear(waiter)

	group.Notify()

	// The cleared channel must still be empty.
	select {
	case <-waiter:
		t.Fatalf("should not get message")
	default:
	}
}
54 changes: 36 additions & 18 deletions consul/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ const (
// maxQueryTime is used to bound the limit of a blocking query
maxQueryTime = 600 * time.Second

// defaultQueryTime is the amount of time we block waiting for a change
// if no time is specified. Previously we would wait the maxQueryTime.
defaultQueryTime = 300 * time.Second

// jitterFraction is the limit to the amount of jitter we apply
// to a user specified MaxQueryTime. We divide the specified time by
// the fraction. So 16 == 6.25% limit of jitter
jitterFraction = 16

// Warn if the Raft command is larger than this.
// If it's over 1MB something is probably being abusive.
raftWarnSize = 1024 * 1024
Expand Down Expand Up @@ -314,8 +323,9 @@ type blockingRPCOptions struct {
// blockingRPCOpt is the replacement for blockingRPC as it allows
// for more parameterization easily. It should be preferred over blockingRPC.
func (s *Server) blockingRPCOpt(opts *blockingRPCOptions) error {
var timeout <-chan time.Time
var timeout *time.Timer
var notifyCh chan struct{}
var state *StateStore

// Fast path non-blocking
if opts.queryOpts.MinQueryIndex == 0 {
Expand All @@ -327,30 +337,38 @@ func (s *Server) blockingRPCOpt(opts *blockingRPCOptions) error {
panic("no tables to block on")
}

// Restrict the max query time
// Restrict the max query time, and ensure there is always one
if opts.queryOpts.MaxQueryTime > maxQueryTime {
opts.queryOpts.MaxQueryTime = maxQueryTime
} else if opts.queryOpts.MaxQueryTime <= 0 {
opts.queryOpts.MaxQueryTime = defaultQueryTime
}

// Ensure a time limit is set if we have an index
if opts.queryOpts.MinQueryIndex > 0 && opts.queryOpts.MaxQueryTime == 0 {
opts.queryOpts.MaxQueryTime = maxQueryTime
}
// Apply a small amount of jitter to the request
opts.queryOpts.MaxQueryTime += randomStagger(opts.queryOpts.MaxQueryTime / jitterFraction)

// Setup a query timeout
if opts.queryOpts.MaxQueryTime > 0 {
timeout = time.After(opts.queryOpts.MaxQueryTime)
}
timeout = time.NewTimer(opts.queryOpts.MaxQueryTime)

// Setup the notify channel
notifyCh = make(chan struct{}, 1)

// Setup a notification channel for changes
SETUP_NOTIFY:
if opts.queryOpts.MinQueryIndex > 0 {
notifyCh = make(chan struct{}, 1)
state := s.fsm.State()
state.Watch(opts.tables, notifyCh)
// Ensure we tear down any watchers on return
state = s.fsm.State()
defer func() {
timeout.Stop()
state.StopWatch(opts.tables, notifyCh)
if opts.kvWatch {
state.WatchKV(opts.kvPrefix, notifyCh)
state.StopWatchKV(opts.kvPrefix, notifyCh)
}
}()

REGISTER_NOTIFY:
// Register the notification channel. This may be done
// multiple times if we have not reached the target wait index.
state.Watch(opts.tables, notifyCh)
if opts.kvWatch {
state.WatchKV(opts.kvPrefix, notifyCh)
}

RUN_QUERY:
Expand All @@ -372,8 +390,8 @@ RUN_QUERY:
if err == nil && opts.queryMeta.Index > 0 && opts.queryMeta.Index <= opts.queryOpts.MinQueryIndex {
select {
case <-notifyCh:
goto SETUP_NOTIFY
case <-timeout:
goto REGISTER_NOTIFY
case <-timeout.C:
}
}
return err
Expand Down
19 changes: 19 additions & 0 deletions consul/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,13 @@ func (s *StateStore) Watch(tables MDBTables, notify chan struct{}) {
}
}

// StopWatch is used to unsubscribe a channel from a set of MDBTables
func (s *StateStore) StopWatch(tables MDBTables, notify chan struct{}) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we are missing a test for this function

for _, t := range tables {
s.watch[t].Clear(notify)
}
}

// WatchKV is used to subscribe a channel to changes in KV data
func (s *StateStore) WatchKV(prefix string, notify chan struct{}) {
s.kvWatchLock.Lock()
Expand All @@ -439,6 +446,18 @@ func (s *StateStore) WatchKV(prefix string, notify chan struct{}) {
s.kvWatch.Insert(prefix, grp)
}

// StopWatchKV unsubscribes a channel from changes in KV data under the
// given prefix. If no notify group is stored for that prefix the call
// does nothing.
func (s *StateStore) StopWatchKV(prefix string, notify chan struct{}) {
	s.kvWatchLock.Lock()
	defer s.kvWatchLock.Unlock()

	// Nothing to do unless a notify group exists for this prefix.
	raw, found := s.kvWatch.Get(prefix)
	if !found {
		return
	}
	raw.(*NotifyGroup).Clear(notify)
}

// notifyKV is used to notify any KV listeners of a change
// on a prefix
func (s *StateStore) notifyKV(path string, prefix bool) {
Expand Down
57 changes: 57 additions & 0 deletions consul/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,37 @@ func TestGetNodes(t *testing.T) {
}
}

// TestGetNodes_Watch_StopWatch registers two watchers on the "Nodes" query
// tables, removes one of them with StopWatch, and verifies that a node
// write signals only the watcher that is still registered.
func TestGetNodes_Watch_StopWatch(t *testing.T) {
	store, err := testStateStore()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer store.Close()

	active := make(chan struct{}, 1)
	stopped := make(chan struct{}, 1)

	// Both channels subscribe; the second is then unsubscribed again.
	store.Watch(store.QueryTables("Nodes"), active)
	store.Watch(store.QueryTables("Nodes"), stopped)
	store.StopWatch(store.QueryTables("Nodes"), stopped)

	err = store.EnsureNode(40, structs.Node{"foo", "127.0.0.1"})
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// The remaining watcher must have been signalled.
	select {
	case <-active:
	default:
		t.Fatalf("should be notified")
	}

	// The removed watcher must have stayed silent.
	select {
	case <-stopped:
		t.Fatalf("should not be notified")
	default:
	}
}

func BenchmarkGetNodes(b *testing.B) {
store, err := testStateStore()
if err != nil {
Expand Down Expand Up @@ -1429,6 +1460,32 @@ func TestKVSSet_Watch(t *testing.T) {
}
}

// TestKVSSet_Watch_Stop subscribes a channel to the empty KV prefix,
// unsubscribes it with StopWatchKV, and verifies that a later KVSSet does
// not signal the channel.
func TestKVSSet_Watch_Stop(t *testing.T) {
	store, err := testStateStore()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer store.Close()

	watcher := make(chan struct{}, 1)

	// Register and immediately unregister the watcher.
	store.WatchKV("", watcher)
	store.StopWatchKV("", watcher)

	// Write an entry after the watcher has been removed.
	entry := &structs.DirEntry{Key: "foo/baz", Flags: 42, Value: []byte("test")}
	err = store.KVSSet(1000, entry)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// The removed watcher must not have fired.
	select {
	case <-watcher:
		t.Fatalf("should not notify ")
	default:
	}
}

func TestKVSSet_Get(t *testing.T) {
store, err := testStateStore()
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions consul/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
crand "crypto/rand"
"encoding/binary"
"fmt"
"math/rand"
"net"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"time"

"github.com/hashicorp/serf/serf"
)
Expand Down Expand Up @@ -222,3 +224,8 @@ func generateUUID() string {
buf[8:10],
buf[10:16])
}

// randomStagger returns a random stagger interval in the half-open range
// [0, intv).
//
// A zero or negative intv yields 0. A zero interval leaves no room for
// jitter and would otherwise trigger an integer divide-by-zero panic in the
// modulo below; a negative interval would wrap to a huge unsigned modulus
// and produce a meaningless result.
func randomStagger(intv time.Duration) time.Duration {
	if intv <= 0 {
		return 0
	}
	return time.Duration(uint64(rand.Int63()) % uint64(intv))
}
11 changes: 11 additions & 0 deletions consul/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"net"
"regexp"
"testing"
"time"

"github.com/hashicorp/serf/serf"
)
Expand Down Expand Up @@ -124,3 +125,13 @@ func TestGenerateUUID(t *testing.T) {
}
}
}

// TestRandomStagger samples randomStagger several times with a one-minute
// interval and checks every result lies in [0, interval).
func TestRandomStagger(t *testing.T) {
	const attempts = 10
	limit := time.Minute

	for attempt := 0; attempt < attempts; attempt++ {
		got := randomStagger(limit)
		if got >= 0 && got < limit {
			continue
		}
		t.Fatalf("Bad: %v", got)
	}
}
6 changes: 3 additions & 3 deletions website/source/docs/agent/http.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ query string parameter to the value of `X-Consul-Index`, indicating that the cli
to wait for any changes subsequent to that index.

In addition to `index`, endpoints that support blocking will also honor a `wait`
parameter specifying a maximum duration for the blocking request. If not set, it will
default to 10 minutes. This value can be specified in the form of "10s" or "5m" (i.e.,
10 seconds or 5 minutes, respectively).
parameter specifying a maximum duration for the blocking request. This is limited to
10 minutes. If not set, the wait time defaults to 5 minutes. This value can be specified
in the form of "10s" or "5m" (i.e., 10 seconds or 5 minutes, respectively).

A critical note is that the return of a blocking request is **no guarantee** of a change. It
is possible that the timeout was reached or that there was an idempotent write that does
Expand Down