Skip to content

Commit

Permalink
sync: do not clear sync.Pools completely every GC
Browse files Browse the repository at this point in the history
Specifically, we clear only half of the poolLocals. To do this we
also have to switch from a global array of Pools to a linked list,
so that we can retain Pools in use and drop Pools that are empty
without allocating.

This means that, for a Pool that suddenly stops being used and gets
dropped:
- during the first GC: half of the poolLocals are cleared
- during the second GC: the second half of the poolLocals are cleared
- during the third GC: the Pool itself is dropped from allPools

This simplified approach is chosen as this allows to not have to worry
about resizing the shared arrays during clearPools and it does not add
any synchronization (or atomic operations) during Put/Get.

Fixes golang#22950
  • Loading branch information
CAFxX committed Feb 14, 2019
1 parent 7cf31d8 commit 05f48ad
Show file tree
Hide file tree
Showing 2 changed files with 203 additions and 8 deletions.
99 changes: 91 additions & 8 deletions src/sync/pool.go
Expand Up @@ -205,7 +205,7 @@ func (p *Pool) pinSlow() *poolLocal {
return indexLocal(l, pid)
}
if p.local == nil {
allPools = append(allPools, p)
allPools.pushBack(p)
}
// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
size := runtime.GOMAXPROCS(0)
Expand All @@ -215,32 +215,56 @@ func (p *Pool) pinSlow() *poolLocal {
return &local[pid]
}

var cleanupCount uint

func poolCleanup() {
// This function is called with the world stopped, at the beginning of a garbage collection.
// It must not allocate and probably should not call any runtime functions.
// Defensively zero out everything, 2 reasons:
// When pools or poolLocals need to be emptied we defensively zero out everything for 2 reasons:
// 1. To prevent false retention of whole Pools.
// 2. If GC happens while a goroutine works with l.shared in Put/Get,
// it will retain whole Pool. So next cycle memory consumption would be doubled.
for i, p := range allPools {
allPools[i] = nil
// Under normal circumstances we don't delete all pools, instead we drop half of the poolLocals
// every cycle, and whole pools if all poolLocals are empty when starting the cleanup (this means
// that a non-empty Pool will take 3 GC cycles to be completely deleted: the first will delete
// half of the poolLocals, the second the remaining half and the third the now empty Pool itself).

// deleteAll is a placeholder for dynamically controlling whether pools are aggressively
// or partially cleaned up. If true, all pools are emptied every GC; if false only half of
// the poolLocals are dropped. For now it is a constant so that it can be optimized away
// at compile-time; ideally the runtime should decide whether to set deleteAll to true
// based on memory pressure.
const deleteAll = false

for e := allPools.front(); e != nil; e = e.nextElement() {
p := e.value
empty := true
for i := 0; i < int(p.localSize); i++ {
l := indexLocal(p.local, i)
if l.private != nil || len(l.shared) > 0 {
empty = false
}
if i%2 == int(cleanupCount%2) && !deleteAll {
continue
}
l.private = nil
for j := range l.shared {
l.shared[j] = nil
}
l.shared = nil
}
p.local = nil
p.localSize = 0
if empty || deleteAll {
p.local = nil
p.localSize = 0
allPools.remove(e)
}
}
allPools = []*Pool{}
cleanupCount++
}

var (
allPoolsMu Mutex
allPools []*Pool
allPools list
)

func init() {
Expand All @@ -256,3 +280,62 @@ func indexLocal(l unsafe.Pointer, i int) *poolLocal {
func runtime_registerPoolCleanup(cleanup func())
func runtime_procPin() int
func runtime_procUnpin()

// Stripped-down and specialized version of container/list (to avoid using interface{}
// casts, since they can allocate and allocation is forbidden in poolCleanup). Note that
// these functions are so small and simple that they all end up completely inlined.

type element struct {
next, prev *element
list *list
value *Pool
}

func (e *element) nextElement() *element {
if p := e.next; e.list != nil && p != &e.list.root {
return p
}
return nil
}

type list struct {
root element
}

func (l *list) front() *element {
if l.root.next == &l.root {
return nil
}
return l.root.next
}

func (l *list) lazyInit() {
if l.root.next == nil {
l.root.next = &l.root
l.root.prev = &l.root
}
}

func (l *list) insert(e, at *element) {
n := at.next
at.next = e
e.prev = at
e.next = n
n.prev = e
e.list = l
}

func (l *list) remove(e *element) {
if e.list == l {
e.prev.next = e.next
e.next.prev = e.prev
e.next = nil
e.prev = nil
e.list = nil
}
}

func (l *list) pushBack(v *Pool) {
l.lazyInit()
l.insert(&element{value: v}, l.root.prev)
}
112 changes: 112 additions & 0 deletions src/sync/pool_test.go
Expand Up @@ -8,8 +8,11 @@
package sync_test

import (
"bytes"
"math/rand"
"runtime"
"runtime/debug"
"strconv"
. "sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -43,6 +46,7 @@ func TestPool(t *testing.T) {
p.Put("c")
debug.SetGCPercent(100) // to allow following GC to actually run
runtime.GC()
runtime.GC() // we now keep some objects until two consecutive GCs
if g := p.Get(); g != nil {
t.Fatalf("got %#v; want nil after GC", g)
}
Expand Down Expand Up @@ -120,6 +124,62 @@ loop:
}
}

func TestPoolPartialRelease(t *testing.T) {
if runtime.GOMAXPROCS(-1) <= 1 {
t.Skip("pool partial release test is only stable when GOMAXPROCS > 1")
}

// disable GC so we can control when it happens.
defer debug.SetGCPercent(debug.SetGCPercent(-1))

Ps := runtime.GOMAXPROCS(-1)
Gs := Ps * 10
Gobjs := 10000

var p Pool
var wg WaitGroup
start := int32(0)
for i := 0; i < Gs; i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomic.AddInt32(&start, 1)
for atomic.LoadInt32(&start) < int32(Ps) {
// spin until enough Gs are ready to go
}
for j := 0; j < Gobjs; j++ {
p.Put(new (string))
}
}()
}
wg.Wait()

waitGC()

inpool := 0
for p.Get() != nil {
inpool++
}
objs := Gs * Gobjs
min, max := objs/2 - objs/Ps, objs/2 + objs/Ps
if inpool < min || inpool > max {
// GC should have dropped half of the per-P shards; because we don't know the
// exact distribution of the objects in the shards when we started, and we don't
// know which shards have been cleared, we consider the test successful as long
// as after GC the number of remaining objects is half ± objs/Ps.
t.Fatalf("objects in pool %d, expected between %d and %d", inpool, min, max)
}
}

func waitGC() {
ch := make(chan struct{})
runtime.SetFinalizer(&[16]byte{}, func(_ interface{}) {
close(ch)
})
runtime.GC()
<-ch
}

func TestPoolStress(t *testing.T) {
const P = 10
N := int(1e6)
Expand Down Expand Up @@ -173,3 +233,55 @@ func BenchmarkPoolOverflow(b *testing.B) {
}
})
}

var bufSizes = []int{1 << 8, 1 << 12, 1 << 16, 1 << 20, 1 << 24}

func BenchmarkPoolBuffer(b *testing.B) {
for _, sz := range bufSizes {
sz := sz
b.Run(strconv.Itoa(sz), func(b *testing.B) {
var p Pool
var i int64
b.RunParallel(func(pb *testing.PB) {
rnd := rand.New(rand.NewSource(atomic.AddInt64(&i, 1)))
var j int
for pb.Next() {
buf, _ := p.Get().(*bytes.Buffer)
if buf == nil {
buf = &bytes.Buffer{}
}
buf.Grow(rnd.Intn(sz * 2))

go p.Put(buf)
j++
if j%256 == 0 {
runtime.Gosched()
}
}
})
})
}
}

func BenchmarkNoPoolBuffer(b *testing.B) {
for _, sz := range bufSizes {
sz := sz
b.Run(strconv.Itoa(sz), func(b *testing.B) {
var i int64
b.RunParallel(func(pb *testing.PB) {
rnd := rand.New(rand.NewSource(atomic.AddInt64(&i, 1)))
var j int
for pb.Next() {
buf := &bytes.Buffer{}
buf.Grow(rnd.Intn(sz * 2))

go runtime.KeepAlive(buf)
j++
if j%256 == 0 {
runtime.Gosched()
}
}
})
})
}
}

0 comments on commit 05f48ad

Please sign in to comment.