Skip to content

Commit

Permalink
sync: scalable Pool
Browse files Browse the repository at this point in the history
Introduce fixed-size P-local caches.
When local caches overflow/underflow a batch of items
is transferred to/from global mutex-protected cache.

benchmark                    old ns/op    new ns/op    delta
BenchmarkPool                    50554        22423  -55.65%
BenchmarkPool-4                 400359         5904  -98.53%
BenchmarkPool-16                403311         1598  -99.60%
BenchmarkPool-32                367310         1526  -99.58%

BenchmarkPoolOverlflow            5214         3633  -30.32%
BenchmarkPoolOverlflow-4         42663         9539  -77.64%
BenchmarkPoolOverlflow-8         46919        11385  -75.73%
BenchmarkPoolOverlflow-16        39454        13048  -66.93%

BenchmarkSprintfEmpty                    84           63  -25.68%
BenchmarkSprintfEmpty-2                 371           32  -91.13%
BenchmarkSprintfEmpty-4                 465           22  -95.25%
BenchmarkSprintfEmpty-8                 565           12  -97.77%
BenchmarkSprintfEmpty-16                498            5  -98.87%
BenchmarkSprintfEmpty-32                492            4  -99.04%

BenchmarkSprintfString                  259          229  -11.58%
BenchmarkSprintfString-2                574          144  -74.91%
BenchmarkSprintfString-4                651           77  -88.05%
BenchmarkSprintfString-8                868           47  -94.48%
BenchmarkSprintfString-16               825           33  -95.96%
BenchmarkSprintfString-32               825           30  -96.28%

BenchmarkSprintfInt                     213          188  -11.74%
BenchmarkSprintfInt-2                   448          138  -69.20%
BenchmarkSprintfInt-4                   624           52  -91.63%
BenchmarkSprintfInt-8                   691           31  -95.43%
BenchmarkSprintfInt-16                  724           18  -97.46%
BenchmarkSprintfInt-32                  718           16  -97.70%

BenchmarkSprintfIntInt                  311          282   -9.32%
BenchmarkSprintfIntInt-2                333          145  -56.46%
BenchmarkSprintfIntInt-4                642          110  -82.87%
BenchmarkSprintfIntInt-8                832           42  -94.90%
BenchmarkSprintfIntInt-16               817           24  -97.00%
BenchmarkSprintfIntInt-32               805           22  -97.17%

BenchmarkSprintfPrefixedInt             309          269  -12.94%
BenchmarkSprintfPrefixedInt-2           245          168  -31.43%
BenchmarkSprintfPrefixedInt-4           598           99  -83.36%
BenchmarkSprintfPrefixedInt-8           770           67  -91.23%
BenchmarkSprintfPrefixedInt-16          829           54  -93.49%
BenchmarkSprintfPrefixedInt-32          824           50  -93.83%

BenchmarkSprintfFloat                   418          398   -4.78%
BenchmarkSprintfFloat-2                 295          203  -31.19%
BenchmarkSprintfFloat-4                 585          128  -78.12%
BenchmarkSprintfFloat-8                 873           60  -93.13%
BenchmarkSprintfFloat-16                884           33  -96.24%
BenchmarkSprintfFloat-32                881           29  -96.62%

BenchmarkManyArgs                      1097         1069   -2.55%
BenchmarkManyArgs-2                     705          567  -19.57%
BenchmarkManyArgs-4                     792          319  -59.72%
BenchmarkManyArgs-8                     963          172  -82.14%
BenchmarkManyArgs-16                   1115          103  -90.76%
BenchmarkManyArgs-32                   1133           90  -92.03%

LGTM=rsc
R=golang-codereviews, bradfitz, minux.ma, gobot, rsc
CC=golang-codereviews
https://golang.org/cl/46010043
  • Loading branch information
dvyukov committed Jan 24, 2014
1 parent 9fa9613 commit f8e0057
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 36 deletions.
2 changes: 1 addition & 1 deletion src/pkg/go/build/deps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var pkgDeps = map[string][]string{
"errors": {},
"io": {"errors", "sync"},
"runtime": {"unsafe"},
"sync": {"sync/atomic", "unsafe"},
"sync": {"runtime", "sync/atomic", "unsafe"},
"sync/atomic": {"unsafe"},
"unsafe": {},

Expand Down
10 changes: 7 additions & 3 deletions src/pkg/runtime/mgc0.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,19 @@ clearpools(void)
{
void **pool, **next;
P *p, **pp;
uintptr off;
int32 i;

// clear sync.Pool's
for(pool = pools.head; pool != nil; pool = next) {
next = pool[0];
pool[0] = nil; // next
pool[1] = nil; // slice
pool[2] = nil;
pool[3] = nil;
pool[1] = nil; // local
pool[2] = nil; // localSize
off = (uintptr)pool[3] / sizeof(void*);
pool[off+0] = nil; // global slice
pool[off+1] = nil;
pool[off+2] = nil;
}
pools.head = nil;

Expand Down
20 changes: 20 additions & 0 deletions src/pkg/runtime/proc.c
Original file line number Diff line number Diff line change
Expand Up @@ -3046,3 +3046,23 @@ haveexperiment(int8 *name)
}
return 0;
}

// func runtime_procPin() int
void
sync·runtime_procPin(intgo p)
{
M *mp;

mp = m;
// Disable preemption.
mp->locks++;
p = mp->p->id;
FLUSH(&p);
}

// func runtime_procUnpin()
void
sync·runtime_procUnpin(void)
{
m->locks--;
}
165 changes: 150 additions & 15 deletions src/pkg/sync/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,18 @@

package sync

import (
"runtime"
"sync/atomic"
"unsafe"
)

const (
cacheLineSize = 128
poolLocalSize = 2 * cacheLineSize
poolLocalCap = poolLocalSize/unsafe.Sizeof(*(*interface{})(nil)) - 1
)

// A Pool is a set of temporary objects that may be individually saved
// and retrieved.
//
Expand All @@ -26,29 +38,52 @@ package sync
//
// This is an experimental type and might not be released.
type Pool struct {
next *Pool // for use by runtime. must be first.
list []interface{} // offset known to runtime
mu Mutex // guards list
// The following fields are known to runtime.
next *Pool // for use by runtime
local *poolLocal // local fixed-size per-P pool, actually an array
localSize uintptr // size of the local array
globalOffset uintptr // offset of global
// The rest is not known to runtime.

// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
New func() interface{}

pad [cacheLineSize]byte
// Read-mostly date above this point, mutable data follows.
mu Mutex
global []interface{} // global fallback pool
}

func runtime_registerPool(*Pool)
// Local per-P Pool appendix.
type poolLocal struct {
tail int
unused int
buf [poolLocalCap]interface{}
}

func init() {
var v poolLocal
if unsafe.Sizeof(v) != poolLocalSize {
panic("sync: incorrect pool size")
}
}

// Put adds x to the pool.
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
p.mu.Lock()
if p.list == nil {
runtime_registerPool(p)
l := p.pin()
t := l.tail
if t < int(poolLocalCap) {
l.buf[t] = x
l.tail = t + 1
runtime_procUnpin()
return
}
p.list = append(p.list, x)
p.mu.Unlock()
p.putSlow(l, x)
}

// Get selects an arbitrary item from the Pool, removes it from the
Expand All @@ -60,16 +95,116 @@ func (p *Pool) Put(x interface{}) {
// If Get would otherwise return nil and p.New is non-nil, Get returns
// the result of calling p.New.
func (p *Pool) Get() interface{} {
l := p.pin()
t := l.tail
if t > 0 {
t -= 1
x := l.buf[t]
l.tail = t
runtime_procUnpin()
return x
}
return p.getSlow()
}

func (p *Pool) putSlow(l *poolLocal, x interface{}) {
// Grab half of items from local pool and put to global pool.
// Can not lock the mutex while pinned.
const N = int(poolLocalCap/2 + 1)
var buf [N]interface{}
buf[0] = x
for i := 1; i < N; i++ {
l.tail--
buf[i] = l.buf[l.tail]
}
runtime_procUnpin()

p.mu.Lock()
var x interface{}
if n := len(p.list); n > 0 {
x = p.list[n-1]
p.list[n-1] = nil // Just to be safe
p.list = p.list[:n-1]
p.global = append(p.global, buf[:]...)
p.mu.Unlock()
}

func (p *Pool) getSlow() (x interface{}) {
// Grab a batch of items from global pool and put to local pool.
// Can not lock the mutex while pinned.
runtime_procUnpin()
p.mu.Lock()
pid := runtime_procPin()
s := p.localSize
l := p.local
if uintptr(pid) < s {
l = indexLocal(l, pid)
// Get the item to return.
last := len(p.global) - 1
if last >= 0 {
x = p.global[last]
p.global = p.global[:last]
}
// Try to refill local pool, we may have been rescheduled to another P.
if last > 0 && l.tail == 0 {
n := int(poolLocalCap / 2)
gl := len(p.global)
if n > gl {
n = gl
}
copy(l.buf[:], p.global[gl-n:])
p.global = p.global[:gl-n]
l.tail = n
}
}
runtime_procUnpin()
p.mu.Unlock()

if x == nil && p.New != nil {
x = p.New()
}
return x
return
}

// pin pins current goroutine to P, disables preemption and returns poolLocal pool for the P.
// Caller must call runtime_procUnpin() when done with the pool.
func (p *Pool) pin() *poolLocal {
pid := runtime_procPin()
// In pinSlow we store to localSize and then to local, here we load in opposite order.
// Since we've disabled preemption, GC can not happen in between.
// Thus here we must observe local at least as large localSize.
// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
s := atomic.LoadUintptr(&p.localSize) // load-acquire
l := p.local // load-consume
if uintptr(pid) < s {
return indexLocal(l, pid)
}
return p.pinSlow()
}

func (p *Pool) pinSlow() *poolLocal {
// Retry under the mutex.
runtime_procUnpin()
p.mu.Lock()
defer p.mu.Unlock()
pid := runtime_procPin()
s := p.localSize
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid)
}
if p.local == nil {
p.globalOffset = unsafe.Offsetof(p.global)
runtime_registerPool(p)
}
// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&p.local)), unsafe.Pointer(&local[0])) // store-release
atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release
return &local[pid]
}

func indexLocal(l *poolLocal, i int) *poolLocal {
return (*poolLocal)(unsafe.Pointer(uintptr(unsafe.Pointer(l)) + unsafe.Sizeof(*l)*uintptr(i))) // uh...
}

// Implemented in runtime.
func runtime_registerPool(*Pool)
func runtime_procPin() int
func runtime_procUnpin()
46 changes: 29 additions & 17 deletions src/pkg/sync/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"sync/atomic"
"testing"
"time"
"unsafe"
)

func TestPool(t *testing.T) {
Expand Down Expand Up @@ -125,28 +124,41 @@ func TestPoolStress(t *testing.T) {
}

func BenchmarkPool(b *testing.B) {
procs := runtime.GOMAXPROCS(-1)
var dec func() bool
if unsafe.Sizeof(b.N) == 8 {
n := int64(b.N)
dec = func() bool {
return atomic.AddInt64(&n, -1) >= 0
}
} else {
n := int32(b.N)
dec = func() bool {
return atomic.AddInt32(&n, -1) >= 0
}
var p Pool
var wg WaitGroup
n0 := uintptr(b.N)
n := n0
for i := 0; i < runtime.GOMAXPROCS(0); i++ {
wg.Add(1)
go func() {
defer wg.Done()
for atomic.AddUintptr(&n, ^uintptr(0)) < n0 {
for b := 0; b < 100; b++ {
p.Put(1)
p.Get()
}
}
}()
}
wg.Wait()
}

func BenchmarkPoolOverlflow(b *testing.B) {
var p Pool
var wg WaitGroup
for i := 0; i < procs; i++ {
n0 := uintptr(b.N)
n := n0
for i := 0; i < runtime.GOMAXPROCS(0); i++ {
wg.Add(1)
go func() {
defer wg.Done()
for dec() {
p.Put(1)
p.Get()
for atomic.AddUintptr(&n, ^uintptr(0)) < n0 {
for b := 0; b < 100; b++ {
p.Put(1)
}
for b := 0; b < 100; b++ {
p.Get()
}
}
}()
}
Expand Down

0 comments on commit f8e0057

Please sign in to comment.