runtime: Add loosely ordered channels? #16364

Open
YuriyNasretdinov opened this Issue Jul 13, 2016 · 9 comments

YuriyNasretdinov commented Jul 13, 2016

What version of Go are you using (go version)?
go version go1.6 darwin/amd64

What operating system and processor architecture are you using (go env)?

GOARCH="amd64"
GOBIN=""
GOEXE=""
GOHOSTARCH="amd64"
GOHOSTOS="darwin"
GOOS="darwin"
GOPATH="/Users/yuriy/gopath"
GORACE=""
GOROOT="/usr/local/go"
GOTOOLDIR="/usr/local/go/pkg/tool/darwin_amd64"
GO15VENDOREXPERIMENT="1"
CC="clang"
GOGCCFLAGS="-fPIC -m64 -pthread -fno-caret-diagnostics -Qunused-arguments -fmessage-length=0 -fno-common"
CXX="clang++"
CGO_ENABLED="1"

What did you do?

I ran a benchmark to see how much time is needed to process N elements using multiple cores. The benchmark below starts 8 producer and 8 consumer goroutines, runs the equivalent of "myvalue += 1" N times in each, and then checks the result.

In some cases it would be great to use a single channel to distribute load (in this case, adding "1") among workers and actually get the job done faster on more cores, even when the operations themselves do not take much time.

This is not achievable with current channels because they impose ordering constraints on events, and sometimes you don't need those. So I suggest considering (maybe?) adding loosely-ordered channels, which would both reduce the cost of channel send and receive and allow channels to scale.

In this "+1" example the only solution that actually benefits from adding more cores is the sharded channel one.

package main

import (
    "sync"
    "sync/atomic"
    "testing"
)

const CORES = 8

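// BigStruct pads each per-shard counter so that neighbouring counters
// do not end up on the same cache line.
type BigStruct struct {
    value               int64
    preventFalseSharing [1024]byte
}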

var (
    mych         = make(chan int64, 10)
    mychs        = make([]chan int64, CORES)
    myvalues     = make([]BigStruct, CORES)
    myvalue      = int64(0)
    myvalueMutex sync.Mutex
)

func rcvChan(shard int) {
    myvalues[shard].value += <-mych
}

func rcvChanSharded(shard int) {
    myvalues[shard].value += <-mychs[shard]
}

func sendChan(shard int) {
    mych <- 1
}

func sendChanSharded(shard int) {
    mychs[shard] <- 1
}

func sendMutex(shard int) {
    myvalueMutex.Lock()
    myvalue++
    myvalueMutex.Unlock()
}

func sendAtomic(shard int) {
    atomic.AddInt64(&myvalue, 1)
}

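// megaBench starts numproc goroutines calling sendfunc and numproc goroutines
// calling rcvfunc, b.N times each, and then checks that exactly b.N*numproc
// increments were recorded.
func megaBench(b *testing.B, numproc int, sendfunc, rcvfunc func(int)) {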
    myvalue = 0
    myvalues = make([]BigStruct, CORES)
    waitCh := make(chan bool)
    mych = make(chan int64, 10)

    for j := 0; j < numproc; j++ {
        mychs[j] = make(chan int64, 10)
    }

    for j := 0; j < numproc; j++ {
        go func(j int) {
            for i := 0; i < b.N; i++ {
                sendfunc(j)
            }
            waitCh <- true
        }(j)
        go func(j int) {
            for i := 0; i < b.N; i++ {
                rcvfunc(j)
            }
            waitCh <- true
        }(j)
    }

    for j := 0; j < numproc; j++ {
        <-waitCh
        <-waitCh
    }

    for j := 0; j < numproc; j++ {
        myvalue += myvalues[j].value
    }

    if myvalue != int64(b.N*numproc) {
        b.Errorf("Wrong number of iterations: got %d, expected %d", myvalue, b.N*numproc)
    }
}

func BenchmarkChan(b *testing.B)        { megaBench(b, CORES, sendChan, rcvChan) }
func BenchmarkChanSharded(b *testing.B) { megaBench(b, CORES, sendChanSharded, rcvChanSharded) }
func BenchmarkAtomic(b *testing.B)      { megaBench(b, CORES, sendAtomic, func(shard int) {}) }
func BenchmarkMutex(b *testing.B)       { megaBench(b, CORES, sendMutex, func(shard int) {}) }

What did you expect to see?

I would really like channels to scale when more cores are used instead of slowing down. I do not believe that is possible under the current channel constraints, so an option to create loosely-ordered channels would be nice.

What did you see instead?

BenchmarkChan            2000000           840 ns/op
BenchmarkChan-2          1000000          1208 ns/op
BenchmarkChan-4          1000000          1684 ns/op
BenchmarkChan-8           500000          2592 ns/op
BenchmarkChanSharded     2000000           833 ns/op
BenchmarkChanSharded-2   3000000           445 ns/op
BenchmarkChanSharded-4   5000000           265 ns/op
BenchmarkChanSharded-8   5000000           237 ns/op
BenchmarkAtomic         20000000            80.8 ns/op
BenchmarkAtomic-2       10000000           184 ns/op
BenchmarkAtomic-4       10000000           215 ns/op
BenchmarkAtomic-8       10000000           182 ns/op
BenchmarkMutex           5000000           241 ns/op
BenchmarkMutex-2         2000000           814 ns/op
BenchmarkMutex-4         2000000          1021 ns/op
BenchmarkMutex-8         1000000          1059 ns/op

You can see that the only solution that scales (ns/op decreases as you add cores) is the sharded-channel one. I have 4 physical cores and 8 logical ones, so do not pay too much attention to the results for 8 threads.
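
To put numbers on the scaling claim, the ratios can be computed directly from the ns/op figures above. This is only a small arithmetic sketch; the `speedup` helper is a name made up for illustration. It divides the 1-core time by the 4-core time, so a value above 1 means the benchmark got faster as cores were added.

```go
package main

import "fmt"

// speedup returns how many times faster the n-core run is than the 1-core run.
// Values below 1 mean the benchmark slowed down as cores were added.
func speedup(oneCore, nCores float64) float64 {
	return oneCore / nCores
}

func main() {
	// ns/op figures copied from the benchmark output above (1 core vs 4 cores).
	fmt.Printf("Chan:        %.2fx\n", speedup(840, 1684))
	fmt.Printf("ChanSharded: %.2fx\n", speedup(833, 265))
	fmt.Printf("Atomic:      %.2fx\n", speedup(80.8, 215))
	fmt.Printf("Mutex:       %.2fx\n", speedup(241, 1021))
}
```

With these inputs the sharded channel comes out at roughly 3.14x on 4 cores, while the single channel, the atomic and the mutex all land below 1x (about 0.50x, 0.38x and 0.24x).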

bradfitz added the Proposal label Jul 13, 2016

Member

bradfitz commented Jul 13, 2016

Contributor

ianlancetaylor commented Jul 13, 2016

As far as I can see, the only difference between a loosely-ordered channel and an unbuffered channel would be that with a loosely-ordered channel it would be unpredictable whether the goroutine reading from the channel would be able to see memory writes done by the goroutine sending on the channel before the actual send. My apologies if I misunderstand.

First I would say that I think that would be very difficult to implement. Any use of channels implies locking. So (I think) you are suggesting that the channel implementation should be rewritten to use only relaxed memory reads and writes.

Second, my first reaction is that these would be very hard to use correctly. C++ has many different kinds of atomic operations, and they are extremely hard for non-experts to use correctly. We explicitly do not want to emulate that in Go.
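
For readers unfamiliar with the guarantee being discussed, here is a minimal sketch of what ordinary channels provide today: a plain write made before a send is visible to the goroutine that completes the matching receive. The `message` type and the `publish` and `consume` functions are hypothetical names used only for this illustration.

```go
package main

import "fmt"

// message is a hypothetical payload used only for this illustration.
type message struct{ payload string }

// publish writes to shared memory and then signals on the channel.
func publish(shared *message, ready chan<- struct{}) {
	shared.payload = "written before send" // plain, non-atomic write
	ready <- struct{}{}                     // the send publishes the write above
}

// consume waits for the signal and only then reads the shared memory.
func consume(shared *message, ready <-chan struct{}) string {
	<-ready // the receive completes only after the matching send
	return shared.payload
}

func main() {
	shared := &message{}
	ready := make(chan struct{})
	go publish(shared, ready)
	fmt.Println(consume(shared, ready)) // prints "written before send"
}
```

A channel built only on relaxed memory operations, which is the reading of the proposal considered above, would presumably no longer make the final read safe.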

YuriyNasretdinov commented Jul 13, 2016

Sorry, Ian, I am not really sure why memory writes would not be seen by the reader side, but maybe you are right that this is possible if there are no memory barriers when working with a loosely-ordered channel.

The simplest implementation of a loosely-ordered channel is basically just a sharded one. You still need to take a mutex when reading from or writing to each shard, so all the memory guarantees are the same as for a mutex or a channel.

The only downside of a sharded channel is that if the distribution is not even enough, some shards will have no entries while others have too many. If that happens, you might just try to "steal" entries from other shards (e.g. pick a random shard and try to get entries from there). If you do not manage to find any entries within a reasonable number of tries (e.g. 3), you could force a rebalancing of the shards (take the mutex of each shard and then shuffle elements around).

All of that will work well if you have a huge stream of events, which, in my opinion, is not an uncommon thing to process in Go.
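
The sharding-with-stealing idea can be sketched in user code on top of ordinary buffered channels. This is an assumption-laden illustration, not a runtime design: `ShardedChan`, `NewShardedChan`, `Send` and `TryRecv` are hypothetical names, the receive is non-blocking, shard indexes are assumed to be non-negative, and the forced rebalancing step is left out.

```go
package main

import (
	"fmt"
	"math/rand"
)

// ShardedChan is a hypothetical loosely ordered queue made of ordinary
// buffered channels, one per shard. It is not a runtime feature.
type ShardedChan struct {
	shards []chan int64
}

// NewShardedChan creates the given number of shards, each with buffer size buf.
func NewShardedChan(shards, buf int) *ShardedChan {
	s := &ShardedChan{shards: make([]chan int64, shards)}
	for i := range s.shards {
		s.shards[i] = make(chan int64, buf)
	}
	return s
}

// Send puts v on the caller's home shard, blocking while that shard is full.
func (s *ShardedChan) Send(shard int, v int64) {
	s.shards[shard%len(s.shards)] <- v
}

// TryRecv tries the home shard first, then steals from up to `tries`
// randomly chosen shards. It reports false if nothing was found.
func (s *ShardedChan) TryRecv(shard, tries int) (int64, bool) {
	select {
	case v := <-s.shards[shard%len(s.shards)]:
		return v, true
	default:
	}
	for i := 0; i < tries; i++ {
		select {
		case v := <-s.shards[rand.Intn(len(s.shards))]:
			return v, true
		default:
		}
	}
	return 0, false
}

func main() {
	s := NewShardedChan(4, 8)
	for i := 0; i < 3; i++ {
		s.Send(0, 1) // all work lands on shard 0
	}
	// A consumer whose home shard is 1 has to steal to make progress.
	var sum int64
	for got := 0; got < 3; {
		if v, ok := s.TryRecv(1, 3); ok {
			sum += v
			got++
		}
	}
	fmt.Println(sum) // prints 3
}
```

Because stealing picks shards at random, a single `TryRecv` call can miss entries that exist elsewhere, which is exactly the case the rebalancing step described above is meant to handle.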

YuriyNasretdinov commented Jul 13, 2016

So, I forgot to mention why this is even a proposal for the Go runtime. Go does not have generics and does not have a good way to block when there are no events to get, so it would be really ugly if implemented in Go. I saw one (strange) attempt to do this here: http://zhen.org/blog/ring-buffer-variable-length-low-latency-disruptor-style/
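
As an illustration of the blocking problem, one way to wait on a variable number of shards from ordinary Go code is `reflect.Select`. The sketch below assumes the shards are plain `chan int64` values that are never closed; `recvBlocking` is a hypothetical helper. It goes through reflection on every receive, so it is likely too slow for the high-throughput case discussed here and mainly shows why library-level solutions are awkward.

```go
package main

import (
	"fmt"
	"reflect"
)

// recvBlocking blocks until any shard has a value and returns that value.
// It assumes no shard is ever closed.
func recvBlocking(shards []chan int64) int64 {
	cases := make([]reflect.SelectCase, len(shards))
	for i, ch := range shards {
		cases[i] = reflect.SelectCase{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(ch),
		}
	}
	// reflect.Select behaves like a select statement over all cases.
	_, v, _ := reflect.Select(cases)
	return v.Int()
}

func main() {
	shards := []chan int64{
		make(chan int64, 1),
		make(chan int64, 1),
		make(chan int64, 1),
	}
	go func() { shards[2] <- 42 }()
	fmt.Println(recvBlocking(shards)) // prints 42
}
```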

Contributor

ianlancetaylor commented Jul 14, 2016

I'm sorry, I don't understand what you mean by a loosely-ordered channel. Can you explain more precisely?

In Go it is already possible for many goroutines to read from a single channel (and for many goroutines to write to a single channel) so I don't understand what a sharded channel would look like.

Are you suggesting that when a buffered channel has many readers, we implement several different buffers for the channel, and let each goroutine read from one buffer? Thus there would theoretically be less lock contention? How would we decide when to use multiple buffers?
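
For reference, the existing pattern mentioned above, many goroutines receiving from one channel, looks roughly like this. The function name `sumWithWorkers` and its parameters are made up for the example; every receive goes through the same channel, which is where the contention measured in the benchmark would come from.

```go
package main

import (
	"fmt"
	"sync"
)

// sumWithWorkers sends n ones through a single shared channel and lets
// `workers` goroutines receive from it concurrently.
func sumWithWorkers(n, workers int) int64 {
	jobs := make(chan int64, 10)
	partial := make([]int64, workers) // one slot per worker, so no data race
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			for v := range jobs { // all workers compete for the same channel
				partial[w] += v
			}
		}(w)
	}
	for i := 0; i < n; i++ {
		jobs <- 1
	}
	close(jobs)
	wg.Wait()

	var total int64
	for _, p := range partial {
		total += p
	}
	return total
}

func main() {
	fmt.Println(sumWithWorkers(1000, 4)) // prints 1000
}
```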

YuriyNasretdinov commented Jul 14, 2016

By a loosely-ordered channel I mean a channel that does not guarantee FIFO. More specifically, two writers could write "a" and "b" to a channel (in that order) and readers could receive them as "b" and "a". It is also possible that "a" and "b" are written at the same moment (e.g. less than one cycle apart, from different cores), so there is no sensible way to even define what the order of events means here. It is a weaker guarantee than FIFO, and it allows sending and receiving events with much higher throughput.
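
A tiny deterministic sketch of that reordering, assuming a loosely-ordered channel is modelled as two ordinary buffered channels acting as shards. The `recvAny` helper and its `start` parameter (the reader's preferred shard) are hypothetical.

```go
package main

import "fmt"

// recvAny returns the first value available, scanning the shards in order
// starting at index `start`. It reports false if every shard is empty.
func recvAny(shards []chan string, start int) (string, bool) {
	for i := range shards {
		select {
		case v := <-shards[(start+i)%len(shards)]:
			return v, true
		default:
		}
	}
	return "", false
}

func main() {
	shards := []chan string{make(chan string, 1), make(chan string, 1)}
	shards[0] <- "a" // sent first
	shards[1] <- "b" // sent second

	// A reader that prefers shard 1 sees "b" before "a".
	first, _ := recvAny(shards, 1)
	second, _ := recvAny(shards, 1)
	fmt.Println(first, second) // prints "b a"
}
```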

> Are you suggesting that when a buffered channel has many readers, we implement several different buffers for the channel, and let each goroutine read from one buffer?

Yes.

> Thus there would theoretically be less lock contention?

Yes, if implemented and used properly :). Basically, "proper usage" means having several (e.g. 4+) goroutines trying to receive from or send to the channel at the same time.

> How would we decide when to use multiple buffers?

Deciding to use multiple buffers at runtime would break the FIFO guarantee of a channel, so it must be specified in the make(...) call. A very limited syntax suggestion would be make(chan something, buffer_size, shards_count). I do not know whether you can achieve higher throughput on a channel using lock-free data structures, so I can't suggest using them.

YuriyNasretdinov commented Jul 14, 2016

I would suggest looking at this problem from the following standpoint: there used to be an issue with garbage collector latency sometimes being too high. You, as a program developer, could split your executable into several instances and shard data manually between them, if that was possible. Sometimes it is not that easy to shard data, though, so programs that really need huge heaps, a lot of connections, or both had to find other ways to achieve reasonable GC latency.

My suggestion, adding a way to declare that you do not care about the order of events, lets you avoid custom sharding for channels when channel communication is very convenient but becomes a bottleneck. The distribution of events is sometimes uneven, so you need some kind of rebalancing. And the further you go with it, the more obvious it becomes that it might be better to solve this problem once and share the solution with everyone :)

Contributor

dgryski commented Jul 14, 2016

Some of the discussion in #11506 might be relevant.

quentinmit added this to the Proposal milestone Jul 29, 2016

rw commented Aug 11, 2016

I like this idea and would take it further.

I suggest having a set of channel types (or annotations) that allow users to make tradeoffs when they know what they are doing.

For example, let's have different optimized channel implementations for SPSC, SPMC, and MPSC situations[*].

In general this would let us 'respect the developer' a bit more, instead of assuming a worst-case MPMC situation.

[*] For future searchers: SPSC means Single-producer/Single-consumer, SPMC means Single-producer/Multi-consumer, MPSC means Multi-producer/Single-consumer.
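
To give a feel for what a specialised SPSC implementation could look like, here is a minimal sketch of a bounded single-producer/single-consumer ring buffer built on `sync/atomic`. It is not how the Go runtime implements channels, it never blocks (callers have to retry), and the names `spscRing`, `newSPSCRing`, `TryPush` and `TryPop` are made up for this example. It assumes a Go version that provides `atomic.Uint64` (1.19 or later).

```go
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

// spscRing is a hypothetical fixed-size single-producer/single-consumer queue.
// Exactly one goroutine may call TryPush and exactly one may call TryPop.
type spscRing struct {
	buf  []int64
	head atomic.Uint64 // next slot to read; written only by the consumer
	tail atomic.Uint64 // next slot to write; written only by the producer
}

func newSPSCRing(size int) *spscRing {
	return &spscRing{buf: make([]int64, size)}
}

// TryPush appends v and reports false if the ring is full.
func (r *spscRing) TryPush(v int64) bool {
	tail := r.tail.Load()
	if tail-r.head.Load() == uint64(len(r.buf)) {
		return false
	}
	r.buf[tail%uint64(len(r.buf))] = v
	r.tail.Store(tail + 1) // publishes the slot write to the consumer
	return true
}

// TryPop removes the oldest value and reports false if the ring is empty.
func (r *spscRing) TryPop() (int64, bool) {
	head := r.head.Load()
	if head == r.tail.Load() {
		return 0, false
	}
	v := r.buf[head%uint64(len(r.buf))]
	r.head.Store(head + 1) // frees the slot for the producer
	return v, true
}

func main() {
	r := newSPSCRing(4)
	done := make(chan int64)
	go func() { // the single consumer
		var sum int64
		for n := 0; n < 1000; {
			if v, ok := r.TryPop(); ok {
				sum += v
				n++
			} else {
				runtime.Gosched() // ring empty: yield instead of spinning hard
			}
		}
		done <- sum
	}()
	for i := 0; i < 1000; { // the single producer
		if r.TryPush(1) {
			i++
		} else {
			runtime.Gosched() // ring full: yield and retry
		}
	}
	fmt.Println(<-done) // prints 1000
}
```

The design relies on each index having a single writer, which is why it only applies to the SPSC case; SPMC and MPSC variants would need additional coordination between the goroutines sharing an end.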
