proposal: sync: support for sharded values #18802

Open
aclements opened this Issue Jan 26, 2017 · 55 comments


aclements commented Jan 26, 2017

Per-CPU sharded values are a useful and common way to reduce contention on shared write-mostly values. However, this technique is currently difficult or impossible to use in Go (though there have been attempts, such as @jonhoo's https://github.com/jonhoo/drwmutex and @bcmills' https://go-review.googlesource.com/#/c/35676/).

We propose providing an API for creating and working with sharded values. Sharding would be encapsulated in a type, say sync.Sharded, that would have Get() interface{}, Put(interface{}), and Do(func(interface{})) methods. Get and Put would always have to be paired to make Do possible. (This is actually the same API that was proposed in #8281 (comment) and rejected, but perhaps we have a better understanding of the issues now.) This idea came out of off-and-on discussions between at least @rsc, @hyangah, @RLH, @bcmills, @Sajmani, and myself.

This is a counter-proposal to various proposals to expose the current thread/P ID as a way to implement sharded values (#8281, #18590). These have been turned down as exposing low-level implementation details, tying Go to an API that may be inappropriate or difficult to support in the future, being difficult to use correctly (since the ID may change at any time), being difficult to specify, and as being broadly susceptible to abuse.

There are several dimensions to the design of such an API.

Get and Put can be blocking or non-blocking:

  • With non-blocking Get and Put, sync.Sharded behaves like a collection. Get returns immediately with the current shard's value or nil if the shard is empty. Put stores a value for the current shard if the shard's slot is empty (which may be different from where Get was called, but would often be the same). If the shard's slot is not empty, Put could either put to some overflow list (in which case the state is potentially unbounded), or run some user-provided combiner (which would bound the state).

  • With blocking Get and Put, sync.Sharded behaves more like a lock. Get returns and locks the current shard's value, blocking further Gets from that shard. Put sets the shard's value and unlocks it. In this case, Put has to know which shard the value came from, so Get can either return a put function (though that would require allocating a closure) or some opaque value that must be passed to Put that internally identifies the shard.

  • It would also be possible to combine these behaviors by using an overflow list with a bounded size. Specifying 0 would yield lock-like behavior, while specifying a larger value would give some slack where Get and Put remain non-blocking without allowing the state to become completely unbounded.
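Concretely, the non-blocking variant with a user-provided combiner might look like the sketch below. All names are hypothetical, and the per-P shard selection the runtime would provide is faked here with a fixed slot behind a single mutex, purely to illustrate the Get/Put/Do semantics:

```go
package main

import (
	"fmt"
	"sync"
)

// Sharded sketches the proposed Get/Put/Do surface. A real
// implementation would pick a per-P slot inside the runtime; this
// stand-in uses one mutex and a fixed slot, so it shows the
// semantics but none of the contention benefits.
type Sharded struct {
	mu      sync.Mutex
	shards  []interface{}
	combine func(a, b interface{}) interface{} // bounds state on Put collisions
}

func NewSharded(n int, combine func(a, b interface{}) interface{}) *Sharded {
	return &Sharded{shards: make([]interface{}, n), combine: combine}
}

// shardIndex stands in for the runtime's "current P" lookup.
func (s *Sharded) shardIndex() int { return 0 }

// Get returns the current shard's value, or nil if the shard is
// empty, and clears the slot.
func (s *Sharded) Get() interface{} {
	s.mu.Lock()
	defer s.mu.Unlock()
	i := s.shardIndex()
	v := s.shards[i]
	s.shards[i] = nil
	return v
}

// Put stores v in the current shard's slot, running the combiner
// if the slot is already occupied.
func (s *Sharded) Put(v interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	i := s.shardIndex()
	if c := s.shards[i]; c != nil && s.combine != nil {
		v = s.combine(c, v)
	}
	s.shards[i] = v
}

// Do passes each non-empty shard value to f.
func (s *Sharded) Do(f func(interface{})) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, v := range s.shards {
		if v != nil {
			f(v)
		}
	}
}

func main() {
	s := NewSharded(4, func(a, b interface{}) interface{} { return a.(int) + b.(int) })
	s.Put(1)
	s.Put(2) // slot occupied: combined with 1
	total := 0
	s.Do(func(v interface{}) { total += v.(int) })
	fmt.Println(total) // prints 3
}
```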

Do could be consistent or inconsistent:

  • If it's consistent, then it passes the callback a snapshot at a single instant. I can think of two ways to do this: block until all outstanding values are Put and also block further Gets until the Do can complete; or use the "current" value of each shard even if it's checked out. The latter requires that shard values be immutable, but it makes Do non-blocking.

  • If it's inconsistent, then it can wait on each shard independently. This is faster and doesn't affect Get and Put, but the caller can only get a rough idea of the combined value. This is fine for uses like approximate statistics counters.

It may be that we can't make this decision at the API level and have to provide both forms of Do.

I think this is a good base API, but I can think of a few reasonable extensions:

  • Provide Peek and CompareAndSwap. If a user of the API can be written in terms of these, then Do would always be able to get an immediate consistent snapshot.

  • Provide a Value operation that uses the user-provided combiner (if we go down that API route) to get the combined value of the sync.Sharded.

@aclements aclements added the Proposal label Jan 26, 2017


aclements commented Jan 26, 2017

My own inclination is towards the non-blocking API with a bounded overflow list. A blocking API seems antithetical to the goal of reducing contention and may lead to performance anomalies if a goroutine or OS thread is descheduled while it has a shard checked out, and a non-blocking API with a required combiner may prevent certain use cases (e.g., large structures, or uses that never read the whole sharded value). It also devolves to the blocking API if the bound is 0.


ianlancetaylor commented Jan 26, 2017

The proposal as written is rather abstract. I think it would help to examine the specific use cases that people have for such a thing.

For example, it's clear that one use case is collecting metrics. Presumably the idea is that you have some sort of server, and it wants to log various metrics for each request. The metrics only need to be accumulated when they are reported, and reporting happens much less often than collection. Using a lock;update;unlock sequence will lead to lock contention. But (let's say) we need the metrics to be accurate. So the idea of sharding for this case is a lock;update;unlock sequence with a sharded lock, and an accumulate step that does lock;collect;zero;unlock for each sharded metric. That gives us the values we need while minimizing lock contention.

One way to implement this use case is for the sync.Sharded to require a combiner method as you describe. Conceptually, then:

func (s *Sharded) Get() interface{} {
    s.LockCurrentShard()
    r := s.CurrentShardValue()
    s.SetCurrentShardValue(nil)
    s.UnlockCurrentShard()
    return r
}

func (s *Sharded) Put(v interface{}) {
    s.LockCurrentShard()
    defer s.UnlockCurrentShard()
    c := s.CurrentShardValue()
    if c == nil {
        s.SetCurrentShardValue(v)
    } else {
        m := s.Combine(c, v) // Combine function defined by user.
        s.SetCurrentShardValue(m)
    }
}

For typical metrics the Do method does not need to be consistent. However, it's not hard to have a consistent Do as long as the function passed to Do does not use the sync.Sharded value itself.

With this outline, we see that there is no need for sync.Sharded to maintain an overflow list. Any case that wants to use an overflow list will do so in the Combine function. Obviously the Combine function must not use the sync.Sharded value, as that may lead to deadlock, but otherwise it can do whatever it likes.

What other uses are there for sync.Sharded, and what sorts of implementation do they suggest?


bcmills commented Jan 26, 2017

I had been considering a somewhat narrower API containing only Get and one of {Range, Do, ForEach}, bounding the number of distinct values to the number of threads in the program. The calling code would provide a func() interface{} at construction time to use when Get is called on a thread without an existing value.

The semantics would be similar to the non-blocking proposal: Get returns the current shard's value (but does not guarantee exclusiveness), and Range iterates over all existing values.

Because of the lack of exclusiveness, application code would still have to use atomic and/or sync to manipulate the values, but if the value is uncontended and usually owned by the same core's cache, the overhead of that application-side synchronization would be relatively small (compared to locking overhead for non-sharded values).

That approach has a few advantages over the alternatives in the current proposal.

  1. There is no "overflow list" to manage. The number of values is strictly bounded to the number of threads, and the value for a given thread cannot accidentally migrate away or be dropped.
  2. Application code using atomic values (as for the stats-counter use case in #8281) would not have to deal with lock-ordering (as it would with a blocking Get and Put).
  3. There is no possibility of deadlock (or overallocation of values) due to a missing Put in application code. This is perhaps less significant if we can make trivial defer usage less expensive (#14939) and/or add a vet check (along the lines of lostcancel), but it seems simpler to avoid the problem in the first place.

It has one disadvantage that I'm aware of:

  1. Application code must include its own synchronization code.

Are there other tradeoffs for or against the narrower Get/Range API?
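As a sketch of how this narrower Get/Range surface could back the stats-counter use case (all names are hypothetical, and the runtime's per-thread slot lookup is faked with a single mutex-protected slot):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Values sketches the narrower API: Get returns this thread's value,
// creating it with newf on first use, and Range visits all values.
// The per-thread lookup is faked with slot 0 behind a mutex.
type Values struct {
	mu   sync.Mutex
	vals []interface{}
	newf func() interface{}
}

func NewValues(newf func() interface{}) *Values { return &Values{newf: newf} }

// Get returns a value for the current thread. It confers no
// exclusiveness, so callers must synchronize access themselves.
func (v *Values) Get() interface{} {
	v.mu.Lock()
	defer v.mu.Unlock()
	if len(v.vals) == 0 { // stand-in for "no value for this P yet"
		v.vals = append(v.vals, v.newf())
	}
	return v.vals[0]
}

// Range iterates over all existing values.
func (v *Values) Range(f func(interface{})) {
	v.mu.Lock()
	defer v.mu.Unlock()
	for _, x := range v.vals {
		f(x)
	}
}

// Counter builds a sharded counter on top. Since Get does not give
// exclusive ownership, the int64 is manipulated atomically.
type Counter struct{ vs *Values }

func NewCounter() *Counter {
	return &Counter{vs: NewValues(func() interface{} { return new(int64) })}
}

func (c *Counter) Add(n int64) { atomic.AddInt64(c.vs.Get().(*int64), n) }

func (c *Counter) Value() (sum int64) {
	c.vs.Range(func(x interface{}) { sum += atomic.LoadInt64(x.(*int64)) })
	return
}

func main() {
	c := NewCounter()
	c.Add(2)
	c.Add(3)
	fmt.Println(c.Value()) // prints 5
}
```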


bcmills commented Jan 26, 2017

[Using the "current" value of each shard even if it's checked out] requires that shard values be immutable, but it makes Do non-blocking.

It doesn't even require immutability: "externally synchronized" and/or "atomic" would suffice, although "externally synchronized" carries the risk of lock-ordering issues.


bcmills commented Jan 26, 2017

One way to implement [consistent counting] is for the sync.Sharded to require a combiner method as you describe.

Anything that reduces values seems tricky to get right: you'd have to ensure that Do iterates in an order such that Combine cannot move a value that Do has already iterated over into one that it has yet to encounter (and vice-versa), otherwise you risk under- or double-counting that value.

I don't immediately see how to provide that property for Do in the general case without reintroducing a cross-thread contention point in Put, but it may be possible.


ianlancetaylor commented Jan 26, 2017

For a consistent Do, first lock all the shards, then run the function on each value, then unlock all the shards. For an inconsistent Do, it doesn't matter.


bcmills commented Jan 26, 2017

For a consistent Do, first lock all the shards, then run the function on each value, then unlock all the shards.

That essentially makes Do a stop-the-world operation: it not only blocks all of those threads until Do completes, but also invalidates the cache lines containing the per-shard locks in each of the local CPU caches.

Ideally, Do should produce much less interference in the steady state: it should only acquire/invalidate locks that are not in the fast path of Get and Put. If the values are read using atomic, that doesn't need to invalidate any cache lines at all: the core processing Do might need to wait to receive an up-to-date value, but since there is no write to the cross-core data the existing cached value doesn't need to be invalidated.

I guess that means I'm in favor of an inconsistent Do, provided that we don't discover a very compelling use-case for making it consistent.


funny-falcon commented Jan 26, 2017

For some usages there should be a strict bound on the number of allocated "values", i.e. the number of allocated values should not change. And preferably, values should be allocated at a predictable time, for example at container (Sharded) creation. For that kind of usage, an interface with Put is not useful.

Probably it should be a separate container:

// NewFixSharded preallocates all values by calling the alloc function, and returns a new FixSharded.
// A FixSharded never changes its size, i.e. it never allocates a new value after construction.
func NewFixSharded(alloc func() interface{}) *FixSharded {}
// NewFixShardedN preallocates exactly n values by calling the alloc function, and returns a new FixSharded.
func NewFixShardedN(n int, alloc func() interface{}) *FixSharded {}
func (a *FixSharded) Get() interface{} {}

If the size never changes, there is no need for Do or ForEach or locks; application code must include its own synchronization.

Rationale: GOMAXPROCS changes rarely (almost never), so dynamic allocation is excessive.

I could be mistaken about GOMAXPROCS being constant.


ianlancetaylor commented Jan 26, 2017

@bcmills Well, as I said earlier, I think we need to look at specific use cases. For the specific use case I was discussing, I assert that the cost of a consistent Do is irrelevant, because it is run very infrequently.

What specific use case do you have in mind?


bcmills commented Jan 26, 2017

@ianlancetaylor I'm specifically thinking about counting (as in #8281) and CPU-local caching (e.g. buffering unconditional stores to a shared map, a potential optimization avenue for #18177).


funny-falcon commented Jan 26, 2017

I'm thinking about stat-collectors and high-performance RPC.


ianlancetaylor commented Jan 26, 2017

@bcmills For counting, it seems to me you would use an inconsistent Do. If you need to avoid inconsistency while still using an inconsistent Do, have the combiner store the additional counts elsewhere and not modify the previously stored value. Presumably the combiner is only called in rare cases, so the speed of that rare case is not too important. You could even mitigate that cost by stacking sync.Sharded values.

I don't actually see how to write a consistent Do that does not disturb the fast path of Get and Put at all.

One approach for buffering stores to a shared map would be a Do that removes all the values, replacing them with nil. Come to think of it, that would work for counters also. But it does interfere with the fast path.

@funny-falcon Can you expand on what you mean by "high-performance RPC"? I don't see why you need a global distributed value for RPC.


balasanjay commented Jan 27, 2017

Perhaps stating the obvious, but one slightly tricky thing is when to GC stale per-thread values when GOMAXPROCS is decreased.

For some use-cases (e.g. distributed mutexes), callers will presumably hold a reference keeping the stale values alive.

For others (e.g. counters), you'd need to keep each value around until it's been accumulated.

Also, in the pony category: if I want a distributed int64 counter, each shard should have sufficient padding to avoid false sharing, but if I allocate multiple such counters, they could be instantiated within each other's padding, so to speak. I think this could maybe be built in user space on top of a more low-level API, but if it's possible for the API to provide it directly, that'd be great.


funny-falcon commented Jan 27, 2017

@ianlancetaylor I maintain a connector to an in-memory transactional database capable of serving more than 1M requests per second.

To send requests at that rate, and to scale smoothly with CPU cores, I have to shard the connector's internal data structures. (I also need to build a custom hash table and custom timers, but sharding is the basis of the improvement.) Without sharding there is too much lock contention.

If there were shard-to-CPU alignment (even a non-strict one), it would further reduce lock contention and improve CPU-cache utilization.

As I understand it, most users don't change GOMAXPROCS on the fly, so I'd prefer a fixed number of preallocated shards, because then I can easily map responses from the server back to a shard.

I still think a simple low-level "ProcHint" API (as proposed in #18590 (comment)) would be sufficient. But if we want the API to look "higher level", then I'd be satisfied with FixSharded.


funny-falcon commented Jan 27, 2017

Link to improved ProcHint api proposal: #18590 (comment)


funny-falcon commented Jan 27, 2017

Excuse me for going a bit off-topic:
How often is GOMAXPROCS changed at runtime in production workloads? What patterns of change exist?


rsc commented Jan 27, 2017

Programs might change GOMAXPROCS in response to getting more or less of a machine as co-tenancy changes.


Sajmani commented Jan 27, 2017

I'll document some concrete use case examples:

  1. Scalar counters incremented in Go server request handlers
func serveRequest(...) {
  requestCounter.Add(1)
}
func serveCounter(w responseWriter, ...) {
  w.Print(requestCounter.Count())
}

I believe with @aclements 's API we would implement this as:

func (c *counter) Add(n int) {
  if v, ok := c.shards.Get().(int); ok {
    v += n
    c.shards.Put(v)
  } else {
    c.shards.Put(n)
  }
}
func (c *counter) Count() (v int) {
  c.shards.Do(func(shard interface{}) {
    v += shard.(int)
  })
  return v
}
  2. Non-scalar (map) metrics in Go server request handlers
func serveRequest(...) {
  user := userFromRequest(req)
  userRequestCounter.Add(1, user)
}
func serveCounter(w responseWriter, ...) {
  w.Print(userRequestCounter.Map())
}

I believe with @aclements 's API we would implement this as:

func (c *mapCounter) Add(n int, key string) {
  if m, ok := c.shards.Get().(map[string]int); ok {
    m[key] += n
    c.shards.Put(m)
  } else {
    c.shards.Put(map[string]int{key: n})
  }
}
func (c *mapCounter) Map() map[string]int {
  m := make(map[string]int)
  c.shards.Do(func(shard interface{}) {
    for key, count := range shard.(map[string]int) {
      m[key] += count
    }
  })
  return m
}

@aclements does that all look right?


bradfitz commented Jan 27, 2017

we would implement this as

  if v, ok := c.shards.Get().(int); ok {
    v += n
    c.shards.Put(v)
  } else {
    c.shards.Put(n)
  }

... allocating an integer for every increment? (Storing an int in an interface causes an allocation.)


bcmills commented Jan 27, 2017

And my experience with sync.Pool is that calling Put with the type-asserted value tends to introduce an extra allocation too (for the interface{} value).

So we'd probably actually want to write it as:

p := c.shards.Get()
if p == nil {
  p = new(int)
}
*(p.(*int)) += n
c.shards.Put(p)

(Or else we'll want to fix the extra allocation for the Put call through some compiler optimization.)


jonhoo commented Jan 27, 2017

I wonder if this could also be used to build a better multi-core RWLock similar to my drwmutex. From the proposal thus far, it sounds like it might be tricky to implement something like "as a writer, take all locks, and disallow new locks to be added while you hold those locks".


bcmills commented Jan 27, 2017

@jonhoo

it sounds like it might be tricky to implement something like "as a writer, take all locks, and disallow new locks to be added while you hold those locks".

Tricky but possible, I think. You can add a sync.Mutex to be acquired in the exclusive path and when adding new locks, but you have to be careful about lock-ordering.

The harder part is that if you want to satisfy the existing sync.RWMutex API you have to handle the possibility that the RUnlock call occurs on a different thread from the RLock. One thing you could do is keep a locked bool on each of the sharded values and add a slow-path fallback for the case where your thread's read-lock isn't the one you locked.

A sketch with the blocking version of Get and Put:

type readerLock struct {
  locked bool
  mu     sync.Mutex
}

func (m *RWMutex) RLock() {
  i := m.readers.Get()
  l, _ := i.(*readerLock)
  if l != nil && !l.locked {
    l.mu.Lock()
    l.locked = true
    m.readers.Put(l) // Check the value back in so RUnlock can find it.
    return
  }
  if l != nil && l.locked {
    m.readers.Put(i) // Put this one back and allocate a new one.
  }

  l = &readerLock{locked: true}
  l.mu.Lock()
  m.add.Lock() // Writers hold m.add, so no new locks appear while they do.
  m.readers.Put(l)
  m.add.Unlock()
}

func (m *RWMutex) RUnlock() {
  i := m.readers.Get()
  l, _ := i.(*readerLock)
  if l != nil && l.locked {
    l.locked = false
    l.mu.Unlock()
    m.readers.Put(l)
    return
  }
  if l != nil {
    m.readers.Put(i) // Not the lock we hold; put it back.
  }
  // Slow path: the RLock happened on a different thread.
  unlocked := false
  m.readers.Do(func(i interface{}) {
    if unlocked {
      return
    }
    l := i.(*readerLock)
    if l.locked {
      l.locked = false
      l.mu.Unlock()
      unlocked = true
    }
  })
}

funny-falcon commented Jan 28, 2017

The technique used by @valyala to improve timers in https://go-review.googlesource.com/#/c/34784/ shows exactly why runtime.ProcHint() (or Pid()) is useful for high-performance multi-core programming.

I agree with @valyala that ProcMaxHint is most likely not needed. It is enough to have a fixed number of "shards".

So even if you decide to stick with sync.Sharded, then please also add sync.FixSharded with preallocated "values".


bcmills commented Jan 28, 2017

That approach looks like it would be just as easy to implement (and perhaps with less cross-core contention) using Sharded.

Note that there's nothing stopping a user of the proposed Sharded API from using a fixed set of values with it. FixSharded is redundant.


funny-falcon commented Jan 28, 2017

@bcmills, well, yes: if allocator function returns pointers to preallocated values, then it looks like FixSharded. You are right.


robpike commented Jan 30, 2017

Please don't call this sharding, which is either an obscure English term or a term of art for distributed computing. Neither fits. It's a per-CPU thing, so call it something like perCPU or percpu.


rhysh commented Oct 5, 2017

I ran into a similar problem recently, and also solved it by abusing sync.Pool to estimate the current P's id to reduce contention. My use-case is a bit different from the ones listed so far, but my approach to address it has been similar.

We have some high-volume RPC servers that need to emit some information about each request as UDP packets. It's not practical to create a new UDP socket for each request, and sharing a single socket for the whole process results in a lot of CPU cycles spent in internal/poll.(*fdMutex).rwlock. Writing to a single shared []byte buffer doesn't work either because there'll be a lot of time spent in sync.(*Mutex).Lock (which got much worse after upgrading to Go 1.9). Using locking rather than channels means there's no way for a would-be user to time out and give up if they've been waiting too long. For my use-case, writing directly to a UDP connection is best so we don't have to worry about losing data if the process exits quickly or crashes.

For my use case:

  1. Creating a new resource takes time (could require a DNS lookup if we're not careful), and can result in an error
  2. The resource needs to be cleaned up if the "pool" size changes
  3. Using the resource can result in an error
  4. Would-be users may want to give up if they'd need to wait too long
  5. Users would prefer access to any resource if it can happen quickly, rather than waiting for the resource "assigned" to this P. Locality certainly improves performance, but forcing locality leads to high variance in the performance when there are many concurrent users.

My workaround is to create 2*GOMAXPROCS UDP connections, and to call an API like this:

func(ctx context.Context, fn func(n int) error) error

The provided closure is called once (unless the context expires), and while it runs has exclusive access to the resource at offset n. (Or the resource could be passed directly via interface{}.)

My workaround doesn't implement my entire wishlist (it doesn't steal resources from other Ps, it doesn't deal with variable GOMAXPROCS), but it did make my package completely disappear from the execution tracer's synchronous blocking profile, and it doesn't suffer from sync.Pool's tendency to create a zillion resources in response to a burst in demand.


funny-falcon commented Oct 5, 2017

@rhysh How did you solve cleaning sync.Pool on GC?


bcmills commented Oct 5, 2017

@funny-falcon You can work around it by (ab)using finalizers, as illustrated in https://golang.org/cl/35676.


ianlancetaylor commented Oct 5, 2017

@rhysh What do you think of #17520?


rhysh commented Oct 5, 2017

@funny-falcon The sync.Pool entries are structs that include an index and a channel. The channel is used as a semaphore for handing off the exclusive right to use the resource, and the index describes which resource to use. The sync.Pool gets cleared at every GC, but we can easily reconstruct new equivalent values to put in the pool; an atomically-accessed counter will give us a new index, mod the (fixed) number of resources we have. This means there's a bit of garbage, but it doesn't require finalizers.
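A simplified sketch of that scheme (illustrative names; here the per-resource semaphores live in a fixed array, while pool entries carry only an index, so entries lost when the GC clears the pool are cheap to re-mint from the atomic counter):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// numResources is fixed for the life of the process; sems holds one
// token per resource, and holding the token confers the exclusive
// right to use that resource.
const numResources = 4

var sems [numResources]chan struct{}

func init() {
	for i := range sems {
		sems[i] = make(chan struct{}, 1)
		sems[i] <- struct{}{}
	}
}

// nextIdx assigns indexes to freshly minted pool entries, mod the
// fixed resource count, so a GC clearing the pool only costs a
// little garbage and no finalizers.
var nextIdx uint32

var entries = sync.Pool{
	New: func() interface{} {
		i := int(atomic.AddUint32(&nextIdx, 1) % numResources)
		return &i
	},
}

// withResource runs f with exclusive use of one resource.
func withResource(f func(i int)) {
	p := entries.Get().(*int)
	<-sems[*p] // acquire the right to use resource *p
	f(*p)
	sems[*p] <- struct{}{} // release
	entries.Put(p)
}

func main() {
	withResource(func(i int) { fmt.Println(i >= 0 && i < numResources) }) // prints true
}
```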

@ianlancetaylor Faster UDP writes would help my application a lot. Is it reasonable to expect a single UDP connection to support the needs of a whole server, both as far as Go and the OS are concerned?


ianlancetaylor commented Oct 5, 2017

@rhysh For Go, each access to the UDP connection would require an atomic compare-and-swap, so sharing a single UDP connection with many goroutines will tend to share that memory location to many cores. I would think that would be fairly inconsequential compared to the cost of actually writing to the socket, but I don't actually know.

For the OS, I have no idea.


cespare commented Nov 14, 2017

It would be great to reach consensus about what problem is most important to solve so that this proposal can be unstuck. Maybe adding our experience will help.

At our company we have the exact issue that @Sajmani describes in #18802 (comment): we need servers running on many-cpu systems to be able to increment counters with low overhead.

In the meantime, I'm perusing the buffet of hacky/unsafe workarounds.


bcmills commented Dec 1, 2017

Another use-case for per-thread / sharded values relates to #20387.

Libraries may need to use their own rand.Source to avoid fragile assumptions about the seeding (or lack thereof) of the default Source, but individual rand.Source instances require locking, which could introduce a lot of contention in rand-heavy code.
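A common user-space workaround today, which per-P values would subsume, is to keep *rand.Rand instances in a sync.Pool so each one is used by only one goroutine at a time; a minimal sketch:

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// rngPool hands each caller a *rand.Rand for the duration of one
// draw, so the unsynchronized Source inside is never shared. This
// approximates sharding in user space; a true per-P value would
// also avoid the Pool's own overhead.
var rngPool = sync.Pool{
	New: func() interface{} {
		return rand.New(rand.NewSource(time.Now().UnixNano()))
	},
}

// Int63 draws from a pooled generator without any locking in the
// library's own code.
func Int63() int64 {
	r := rngPool.Get().(*rand.Rand)
	n := r.Int63()
	rngPool.Put(r)
	return n
}

func main() {
	fmt.Println(Int63() >= 0) // prints true
}
```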


cespare commented Mar 8, 2018

@Sajmani Here's a demo: https://github.com/cespare/misc/blob/master/fastcount/fastcount.go.

It's not really all that interesting -- it basically demonstrates what we all know about contention on multi-core CPUs. But maybe it will give us something concrete to discuss.

As one example from our internal code, we have a program doing performance-critical CPU-bound work. It uses atomic counters for recording details about certain operations. The program currently runs on 36-core machines and we may move it to larger hardware soon.

With this program, we need to be careful what we count, because adding counters into the fast path is surprisingly slow (hundreds of ns). In the past, adding counters in the critical path has caused unacceptable throughput regressions.

My demo program shows this sort of simple counter implemented in three different ways:

  1. with a sync.Mutex
  2. as an int64 modified via atomic.AddInt64 and friends
  3. as a per-CPU sharded set of int64s where we use the RDTSCP instruction from assembly to get the current core ID

Given a flavor of counter, it simply spins up as many goroutines as it has available CPUs and tries to count as fast as possible. If we restrict it to a single CPU, they're all pretty cheap:

$ numactl -C 0 ./fastcount -type mutex
num workers: 1
57466000 incs/sec; 57466000 incs/sec/worker; avg latency 17ns
57712000 incs/sec; 57712000 incs/sec/worker; avg latency 17ns
57410000 incs/sec; 57410000 incs/sec/worker; avg latency 17ns
^C
$ numactl -C 0 ./fastcount -type atomic
num workers: 1
135590000 incs/sec; 135590000 incs/sec/worker; avg latency 7ns
136088000 incs/sec; 136088000 incs/sec/worker; avg latency 7ns
135568000 incs/sec; 135568000 incs/sec/worker; avg latency 7ns
^C
$ numactl -C 0 ./fastcount -type rdtscp
num workers: 1
45226000 incs/sec; 45226000 incs/sec/worker; avg latency 22ns
45424000 incs/sec; 45424000 incs/sec/worker; avg latency 22ns
45178000 incs/sec; 45178000 incs/sec/worker; avg latency 22ns
^C

If we run on all 72 cores on my test server, mutexes and atomics scale terribly:

$ ./fastcount -type mutex
num workers: 72
10575990 incs/sec; 146889 incs/sec/worker; avg latency 6.807µs
10628817 incs/sec; 147622 incs/sec/worker; avg latency 6.774µs
10651535 incs/sec; 147938 incs/sec/worker; avg latency 6.759µs
^C
$ ./fastcount -type atomic
num workers: 72
46358625 incs/sec; 643870 incs/sec/worker; avg latency 1.553µs
46468854 incs/sec; 645401 incs/sec/worker; avg latency 1.549µs
46242229 incs/sec; 642253 incs/sec/worker; avg latency 1.557µs
^C
$ ./fastcount -type rdtscp
num workers: 72
2936379161 incs/sec; 40783044 incs/sec/worker; avg latency 24ns
2998082182 incs/sec; 41640030 incs/sec/worker; avg latency 24ns
2971579024 incs/sec; 41271931 incs/sec/worker; avg latency 24ns
^C

The mutex increment was 398x slower and the atomic increment was 222x slower.

Here's a chart which compares the latency of these three counter types when restricted to various numbers of CPUs. Note the log10 scale; Google sheets didn't want to draw a bar for the value of 7 in that first group for some reason.

[chart: increment latency of the three counter types across CPU counts, log10 scale]

Also, note that my RDTSCP-based implementation has quite a bit of overhead which a runtime-integrated solution doesn't have. RDTSCP is an expensive instruction and the assembly function cannot be inlined. However, the runtime can quickly look up the current P's ID. I expect that a sharded counter with the runtime's help like that should have an increment latency of <10ns on my hardware.
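The per-CPU sharded variant in the demo boils down to the following pattern (a sketch with invented names; the shard index here is a caller-supplied hint, standing in for the RDTSCP-derived core ID or a runtime-provided P ID):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// shard pads each counter slot out to a 64-byte cache line so that
// neighboring shards don't false-share.
type shard struct {
	n int64
	_ [56]byte // pad struct to 64 bytes
}

type ShardedCounter struct {
	shards []shard
}

func NewShardedCounter(n int) *ShardedCounter {
	return &ShardedCounter{shards: make([]shard, n)}
}

// Add increments the shard chosen by hint (e.g. a CPU or P index).
// The atomic add keeps it correct even if the hint is stale.
func (c *ShardedCounter) Add(hint int, delta int64) {
	atomic.AddInt64(&c.shards[hint%len(c.shards)].n, delta)
}

// Value sums all shards; the result is only approximately
// consistent under concurrent Adds.
func (c *ShardedCounter) Value() (sum int64) {
	for i := range c.shards {
		sum += atomic.LoadInt64(&c.shards[i].n)
	}
	return
}

func main() {
	c := NewShardedCounter(8)
	for i := 0; i < 100; i++ {
		c.Add(i, 1) // spread increments across shards
	}
	fmt.Println(c.Value()) // prints 100
}
```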


dvyukov commented Mar 16, 2018

/sub

There was also this proposal with counters, pools and scalable mutexes:
https://codereview.appspot.com/4850045/


balasanjay commented Jun 10, 2018

I spent some time thinking about this issue this week, and I eventually concluded that @bcmills's initial comment is spot on.

That API addresses all of the use-cases mentioned here, it should be relatively straightforward to implement, and the implementation should be very fast (I counted 4 atomic loads in the fast path once it's warm).

I wrote up a more long-form proposal and will send it out on Monday (I left it on a work machine that appears to have dropped off the network).

balasanjay added a commit to balasanjay/proposal that referenced this issue Jun 12, 2018


gopherbot commented Jun 12, 2018

Change https://golang.org/cl/118135 mentions this issue: design: add 18802-percpu-sharded.md


balasanjay commented Jun 12, 2018

Alright, sent a concrete proposal doc to the proposal repo. Happy to move it elsewhere, if people prefer.

gopherbot pushed a commit to golang/proposal that referenced this issue Jun 12, 2018

design: add 18802-percpu-sharded.md
For golang/go#18802.

Change-Id: I10678de235500dc9a9a217e4cf114731c8bd1162
GitHub-Last-Rev: 4cec2db
GitHub-Pull-Request: golang/proposal#13
Reviewed-on: https://go-review.googlesource.com/118135
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
@balasanjay

Contributor

balasanjay commented Jun 13, 2018

@philhofer

Contributor

philhofer commented Jun 13, 2018

So, Paul McKenney's "Is Parallel Programming Hard, And, If So, What Can You Do About It?" has an entire chapter called "counting," and it outlines five or six different use cases for concurrent counters and a very different implementation for each use case. And that's just counters. So when @Sajmani says "I just want to count fast," it could mean one of a number of things in practice.

I don't think there is a design that covers a wide number of use cases while simultaneously satisfying the Go team's desire to hide footguns from Go users. For instance, if you had access to a goroutine "id" and the ability to turn preemption on or off, you'd probably have enough to implement RCU lists and trees, concurrent counters of various flavors, and more. But I have a lot of trouble imagining this implementation of Go ever exposing those features to users, even in spite of the fact that folks like @cespare have to resort to home-cooked solutions that are equally dangerous.

@balasanjay 's design is basically McKenney's "array-based statistical counters" (the relaxed variety). The "order independent accumulators" section really wants a faster channel implementation (which I remember @dvyukov implementing and then deciding not to merge), and the "RWLock" section really wants RCU.

I guess what I'm saying is that I think the design should just be atomic.Counter, and then we should come up with separate solutions for the "RWLock" and "order independent accumulator" cases.

@funny-falcon

Contributor

funny-falcon commented Jun 13, 2018

@philhofer, no. I need 'sharded' for an RPC client: there are N reusable byte-buffers to serialize requests into, and I need to contend on them as little as possible. Without 'sharded' I need:

  • extra fast channel,
  • extra fast allocator,
  • extra fast garbage collector.

Now I use "poor man's sharding", i.e. just an atomic counter indexing into an array of shards. But a real "sharded" will be much better, because it will lead to better byte-buffer utilization besides lower contention.
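To make that concrete, here is a hedged sketch of the sharding-by-atomic-counter approach described above; the type and method names (StripedCounter, Shard, Inc) are my own for illustration, not from any proposal:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// shard is padded so that neighboring shards do not share a cache line.
type shard struct {
	n uint64
	_ [56]byte // pad struct to 64 bytes
}

// StripedCounter is a sketch of "poor man's sharding": an atomic
// counter assigns each worker a slot round-robin, and the worker then
// increments only its own shard on the hot path.
type StripedCounter struct {
	next   uint32
	shards []shard
}

func NewStripedCounter(n int) *StripedCounter {
	return &StripedCounter{shards: make([]shard, n)}
}

// Shard hands out a shard index; each worker calls this once, up front.
func (c *StripedCounter) Shard() int {
	return int(atomic.AddUint32(&c.next, 1)) % len(c.shards)
}

// Inc touches only the caller's shard, so workers holding different
// shards do not bounce a cache line between CPUs.
func (c *StripedCounter) Inc(i int) {
	atomic.AddUint64(&c.shards[i].n, 1)
}

// Value sums all shards; like the proposed Do, the result is only
// loosely consistent with concurrent increments.
func (c *StripedCounter) Value() uint64 {
	var sum uint64
	for i := range c.shards {
		sum += atomic.LoadUint64(&c.shards[i].n)
	}
	return sum
}

func main() {
	c := NewStripedCounter(4)
	var wg sync.WaitGroup
	for w := 0; w < 4; w++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			for n := 0; n < 1000; n++ {
				c.Inc(i)
			}
		}(c.Shard())
	}
	wg.Wait()
	fmt.Println(c.Value()) // 4000
}
```

Unlike a runtime-assisted Sharded, this fixed assignment cannot follow goroutines as they migrate between Ps, which is exactly the gap the proposal would close.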

@balasanjay

Contributor

balasanjay commented Jun 13, 2018

@philhofer Thanks for the reference! I have a few thoughts, here.

  1. I read through the chapter on counters (not in a huge amount of detail, admittedly), and one thing that jumped out at me is that every efficient solution presented relied on some form of per-thread storage. In other words, all of these solutions are built on top of a primitive capable of associating a value with each thread. This seems indicative to me that the right solution is something in that space.

  2. I don't believe the design I wrote up maps to "array-based statistical counters". It might store an array internally, but that array is fundamentally an array of pointers returned by the client of the API. In other words, if the client of the API chose to return pointers into a fixed buffer, then yes, we would have "array-based statistical counters". But if instead they just returned a new instance of a struct with padding, then it would effectively be the "Per-Thread-Variable-Based Implementation" in section 5.2.4.

  3. Can you point to something specific in the counters section that you believe cannot be implemented on top of some sort of per-cpu API? (Or anywhere in the document really, I'd be most interested in seeing use-cases not covered by the proposal to see if they could be reasonably accommodated without compromising other use-cases). As far as I can tell, any of these can be built on top of a per-CPU sharded API.

  4. Per your previous note that there are multiple possible ways to optimize counters; given that, how could we standardize a single atomic.Counter? It seems like we have to standardize a primitive that can be used to build it, otherwise you'd be right that we'd be focusing on one case too much.

  5. I don't think a faster channel implementation would solve the "order independent" part of that problem. No channel implementation could drop exact ordering if a single goroutine enqueued two items back-to-back. And yet, if the goroutine migrated between CPUs between the two statements, then a sharded design might indeed buffer them separately. And to go the other way, I believe it makes it difficult (impossible?) to implement a channel which minimizes contention to the degree that a sharded primitive would. Perhaps @dvyukov can correct me here.

  6. I also don't understand the relationship between an RWLock and RCU. RCU is good for read-frequently, updated-rarely data. To solve a write-frequently case (like an RWLock), using RCU would not work very well. In the very best case, you'd get just as much contention on the RCU pointer as you would on the atomic counter in the current implementation.

@philhofer

Contributor

philhofer commented Jun 13, 2018

@balasanjay Thanks for the well-considered response. I'll address each point independently:

  1. Yes; the two most important primitives, beyond the usual battery of atomics, are per-execution-context storage and the ability to manage preemption. If my memory serves, "goroutine-local storage" has been suggested and rejected a number of times over the last few years, and I'd imagine "P-local storage" would face similar scrutiny.

  2. You're right. What I meant to point out is that the design, as it stands, has the same guarantees (with respect to counter consistency) as the statistical array-based counters.

  3. Some of the limit counter cases don't fit that well to the design you outlined. The signal-theft limit counter design in particular requires access to a lot more low-level firepower. IIRC some of the algorithms use a ragged barrier to avoid actually issuing memory barriers, although nowadays you can achieve the same effect on Linux with membarrier(2).

  4. I'm suggesting that API more as a stopgap measure to solve the most common use-case in this thread. I think we agree that the problem space is very large. In principle, if we agreed on one particular read consistency model, then the entire API could be

func (c *Counter) Add(v int64)
func (c *Counter) Value() int64

I'll be the first to acknowledge that this doesn't cover a wide variety of use-cases, but it is also a small and "obvious" API that would be easy to support, since we could implement it poorly today without any per-thread trickery at all.

Separately, there is a performance argument for this API: we can make Value() consistent without explicit read-side locking by issuing a single memory barrier after the store in Add().

And, finally: there's no unsightly boxing or unboxing. I have a strong distaste for Go APIs that abuse interface{}, although I realize that sometimes there is no way around it.

  5. Yes, the consistency guarantees of channels limit the degree to which their performance can be improved. With that being said, I could see it being very confusing to have a language feature that's been effectively deprecated for half of its use-cases by another stdlib feature. If there's some room for improvements in channel performance, why not try for that before replacing them? They're in the language to solve exactly the sorts of problem you describe.

Separately, I think if your application is actually bottlenecked by sending structures between threads, then you have suboptimal work partitioning. This particular class of problems can typically be avoided with careful architecture. One of the reasons the channel perf work wasn't committed was because there were too few examples of real production code that was sufficiently hurt by poor channel performance. If you have counterexamples, I'm sure people would be interested to see them.

  6. RWLocks are not for data that is written frequently: why would you bother with the extra overhead of allowing concurrent reads if writes dominate reads? If you need to frequently and consistently write the same data to one place, then nothing will save you from contention.
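For concreteness, the Add/Value Counter API sketched above can be implemented today as a single atomic; this is the "implement it poorly without any per-thread trickery" version, where every Add contends on one cache line:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Counter is the naive, contended implementation of the proposed
// two-method API: correct, but every Add bounces a single cache line
// between CPUs -- exactly the cost a sharded backend would avoid.
type Counter struct {
	v int64
}

// Add increments the shared word atomically.
func (c *Counter) Add(v int64) { atomic.AddInt64(&c.v, v) }

// Value reads the current total; trivially consistent because there
// is only one word to read.
func (c *Counter) Value() int64 { return atomic.LoadInt64(&c.v) }

func main() {
	var c Counter
	c.Add(5)
	c.Add(-2)
	fmt.Println(c.Value()) // 3
}
```

Because the API surface is only these two methods, the runtime could later swap in a sharded implementation without breaking callers; that is the "stopgap" appeal of the small API.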
@funny-falcon

Contributor

funny-falcon commented Jun 13, 2018

@philhofer Your API example for counters is easily (and efficiently) implemented on top of a sharded value, without boxing/unboxing on every operation:

type Counter struct {
	sh Sharded
}

func (c *Counter) Add(v uint64) {
	p := c.sh.GetOrCreate(func() interface{} { return new(uint64) }).(*uint64)
	atomic.AddUint64(p, v)
}

func (c *Counter) Value() uint64 {
	sum := uint64(0)
	c.sh.Do(func(pi interface{}) { sum += atomic.LoadUint64(pi.(*uint64)) })
	return sum
}

Atomic operations are really fast when no cache-line movement is involved. Since Go's allocator is TCMalloc-derived, it has a per-thread allocation cache, so it is unlikely that the per-shard counters will share the same cache line.
And the pointers are packed into the interface{} without extra boxing.

So, Sharded solves "multithreaded counting" very well.

@funny-falcon

Contributor

funny-falcon commented Jun 13, 2018

Separately, I think if your application is actually bottlenecked by sending structures between threads, then you have suboptimal work partitioning. This particular class of problems can typically be avoided with careful architecture.

There are use-cases where some parts of the work can be partitioned but others cannot. An example is an RPC client: you may partition serialization and some of the bookkeeping, but you may not partition the socket writing. And having many sockets is not an option, because it would hurt server performance.

@balasanjay

Contributor

balasanjay commented Jun 14, 2018

  1. So, goroutine-local storage and P-local storage are very very different things to me. With the former, code can assume that Get(); Get(); will return the same value twice. This obviously invites abuse; people would've used it for context-like use-cases, which would've crippled the language's concurrency in a viral fashion. That is to say, if an API presents a blocking interface, but is internally implemented by using a vast web of goroutines, that is completely opaque to the caller, where goroutine-local storage would've completely broken this important property.

    P-local storage does not make this guarantee (and indeed, my implementation sketch goes out of its way to break it on purpose in tests and with the race detector enabled), so it cannot cause the same abuse (or folks could try, but it would fail in tests, and be flaky in prod).

  2. (only here to fix markdown's auto numbering)

  3. Yes, I did sort of ignore the sections mentioning signals. Maybe I'm just out of the loop, but signals strike me as far too unreliable to use for these kinds of use-cases. Non-portable, questionable performance when available, often weird semantics (e.g. when composing together two systems).

  4. To be clear, are you suggesting that this Counter would be internally sharded (e.g. use something like the Sharded implementation)?

    If so, then it seems too specific. @Sajmani gave an example above of counters with keys; these are very common in servers (e.g. a counter that counts RPC requests often has a key of method name, so that each method has its own separate counter). This does not address that very very adjacent use-case. To say nothing of things that range outward even further (max-value-trackers, histograms, etc).

    If not, then I don't see the difference between using the atomic package directly.

    Also, no argument from me regarding the unpleasantness of interface{}, but as @funny-falcon suggested, the boxing is mitigated if you use a pointer. This is the same as atomic.Value or sync.Pool in that respect.

  5. I am certainly not suggesting that this (or more precisely, something built on this) would deprecate channels, any more than sync.Map deprecated maps. It's a tool for when users run into a specific problem; it'll be uglier to use, and probably take (much) more memory. I suspect the ecosystem will largely not notice (except for the users who really are running into these problems).

    Also, for the record, I think channels are not a very good analogy here; you wouldn't use a channel to implement buffering. Channels make sense if there's a goroutine on the "other side" pulling data out, but if you want to batch/buffer data, then the plan is to push data in (e.g. by calling Flush()). These are two very different data flow models; if I'm batching work to perform a RPC instead of a per-element RPC, then it doesn't follow that I would use a channel for this.

  6. Yes, RWMutex is for data that is read frequently, and written rarely. But when you acquire a read-lock by calling (*RWMutex).RLock, it increments a shared counter internally: https://golang.org/src/sync/rwmutex.go#L48. So, a read-mostly use case has been turned into a write-mostly use-case; this is the problem of contention caused by an RWMutex.

    RCU is an excellent option in many cases, but not in all of them. For instance, if the object in question is large, the "copy" would be prohibitive in memory consumption; or if you provide an API for single mutations, you don't want client code performing N mutations to be O(N^2).
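As an illustration of the counters-with-keys use case mentioned above (e.g. one counter per RPC method name), here is a hedged, non-sharded sketch built on sync.Map; the KeyedCounter name is mine, and each per-key *uint64 is exactly the slot where a sharded value would go to remove contention on hot keys:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// KeyedCounter keeps one atomic counter per key, e.g. one per RPC
// method. Hot keys still contend on a single cache line; replacing
// the *uint64 with a per-CPU sharded value would fix that.
type KeyedCounter struct {
	m sync.Map // map[string]*uint64
}

func (c *KeyedCounter) Add(key string, v uint64) {
	p, ok := c.m.Load(key)
	if !ok {
		// LoadOrStore returns the existing pointer if another
		// goroutine won the race to create this key's counter.
		p, _ = c.m.LoadOrStore(key, new(uint64))
	}
	atomic.AddUint64(p.(*uint64), v)
}

func (c *KeyedCounter) Value(key string) uint64 {
	if p, ok := c.m.Load(key); ok {
		return atomic.LoadUint64(p.(*uint64))
	}
	return 0
}

func main() {
	var c KeyedCounter
	c.Add("GetUser", 2)
	c.Add("GetUser", 3)
	c.Add("ListUsers", 1)
	fmt.Println(c.Value("GetUser"), c.Value("ListUsers")) // 5 1
}
```

sync.Map fits here because the key set is write-once, read-many (method names stabilize quickly), which is the access pattern it is documented to favor.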
