Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix memleak in safe.Pool #6140

Merged
merged 7 commits into from Jan 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
57 changes: 7 additions & 50 deletions pkg/safe/routine.go
Expand Up @@ -19,14 +19,13 @@ type routineCtx func(ctx context.Context)

// Pool is a pool of go routines
type Pool struct {
routines []routine
routinesCtx []routineCtx
waitGroup sync.WaitGroup
lock sync.Mutex
baseCtx context.Context
baseCancel context.CancelFunc
ctx context.Context
cancel context.CancelFunc
routines []routine
waitGroup sync.WaitGroup
lock sync.Mutex
baseCtx context.Context
baseCancel context.CancelFunc
ctx context.Context
cancel context.CancelFunc
}

// NewPool creates a Pool
Expand All @@ -46,17 +45,9 @@ func (p *Pool) Ctx() context.Context {
return p.baseCtx
}

// AddGoCtx adds a recoverable goroutine with a context without starting it
func (p *Pool) AddGoCtx(goroutine routineCtx) {
p.lock.Lock()
p.routinesCtx = append(p.routinesCtx, goroutine)
p.lock.Unlock()
}

// GoCtx starts a recoverable goroutine with a context
func (p *Pool) GoCtx(goroutine routineCtx) {
p.lock.Lock()
p.routinesCtx = append(p.routinesCtx, goroutine)
p.waitGroup.Add(1)
Go(func() {
defer p.waitGroup.Done()
Expand All @@ -65,17 +56,6 @@ func (p *Pool) GoCtx(goroutine routineCtx) {
p.lock.Unlock()
}

// addGo adds a recoverable goroutine, and can be stopped with stop chan
func (p *Pool) addGo(goroutine func(stop chan bool)) {
p.lock.Lock()
newRoutine := routine{
goroutine: goroutine,
stop: make(chan bool, 1),
}
p.routines = append(p.routines, newRoutine)
p.lock.Unlock()
}

// Go starts a recoverable goroutine, and can be stopped with stop chan
func (p *Pool) Go(goroutine func(stop chan bool)) {
p.lock.Lock()
Expand Down Expand Up @@ -114,29 +94,6 @@ func (p *Pool) Cleanup() {
p.baseCancel()
}

// Start starts all stopped routines
func (p *Pool) Start() {
p.lock.Lock()
defer p.lock.Unlock()
p.ctx, p.cancel = context.WithCancel(p.baseCtx)
for i := range p.routines {
p.waitGroup.Add(1)
p.routines[i].stop = make(chan bool, 1)
Go(func() {
defer p.waitGroup.Done()
p.routines[i].goroutine(p.routines[i].stop)
})
}

for _, routine := range p.routinesCtx {
p.waitGroup.Add(1)
Go(func() {
defer p.waitGroup.Done()
routine(p.ctx)
})
}
}

// Go starts a recoverable goroutine
func Go(goroutine func()) {
GoWithRecover(goroutine, defaultRecoverGoroutine)
Expand Down
58 changes: 0 additions & 58 deletions pkg/safe/routine_test.go
Expand Up @@ -67,13 +67,6 @@ func TestPoolWithCtx(t *testing.T) {
p.GoCtx(testRoutine.routineCtx)
},
},
{
desc: "AddGoCtx()",
fn: func(p *Pool) {
p.AddGoCtx(testRoutine.routineCtx)
p.Start()
},
},
}

for _, test := range testCases {
Expand All @@ -87,9 +80,6 @@ func TestPoolWithCtx(t *testing.T) {

test.fn(p)
defer p.Cleanup()
if len(p.routinesCtx) != 1 {
t.Fatalf("After %s, Pool did have %d goroutineCtxs, expected 1", test.desc, len(p.routinesCtx))
}

testDone := make(chan bool, 1)
go func() {
Expand Down Expand Up @@ -140,40 +130,6 @@ func TestPoolWithStopChan(t *testing.T) {
}
}

func TestPoolStartWithStopChan(t *testing.T) {
testRoutine := newFakeRoutine()

p := NewPool(context.Background())

timer := time.NewTimer(500 * time.Millisecond)
defer timer.Stop()

// Insert the stopped test goroutine via private fields into the Pool.
// There currently is no way to insert a routine via exported funcs that is not started immediately.
p.lock.Lock()
newRoutine := routine{
goroutine: testRoutine.routine,
}
p.routines = append(p.routines, newRoutine)
p.lock.Unlock()
p.Start()

testDone := make(chan bool, 1)
go func() {
<-testRoutine.startSig
p.Cleanup()
testDone <- true
}()
select {
case <-timer.C:
testRoutine.Lock()
defer testRoutine.Unlock()
t.Fatalf("Pool.Start() did not complete in time, goroutine started equals '%t'", testRoutine.started)
case <-testDone:
return
}
}

func TestPoolCleanupWithGoPanicking(t *testing.T) {
testRoutine := func(stop chan bool) {
panic("BOOM")
Expand All @@ -193,26 +149,12 @@ func TestPoolCleanupWithGoPanicking(t *testing.T) {
p.Go(testRoutine)
},
},
{
desc: "addGo() and Start()",
fn: func(p *Pool) {
p.addGo(testRoutine)
p.Start()
},
},
{
desc: "GoCtx()",
fn: func(p *Pool) {
p.GoCtx(testCtxRoutine)
},
},
{
desc: "AddGoCtx() and Start()",
fn: func(p *Pool) {
p.AddGoCtx(testCtxRoutine)
p.Start()
},
},
}

for _, test := range testCases {
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/service/loadbalancer/mirror/mirror.go
Expand Up @@ -100,7 +100,7 @@ type blackholeResponseWriter struct{}
func (b blackholeResponseWriter) Flush() {}

func (b blackholeResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
return nil, nil, errors.New("you can hijack connection on blackholeResponseWriter")
return nil, nil, errors.New("connection on blackholeResponseWriter cannot be hijacked")
}

func (b blackholeResponseWriter) Header() http.Header {
Expand Down