Description
I've gone over this many times, thinking I must have missed something, but I believe I have found a case where the race detector returns a false positive (i.e. it reports a data race where there isn't one). It seems to happen when writing to a channel directly in a select case.
The unit tests trigger the race detector even though I use a callback and a waitgroup to ensure that all calls accessing the channel have been made.
I keep the channels in a map that is guarded by a mutex. The data race vanishes the moment I explicitly remove the value that holds the channel from this map. I can only do that once the mutex has been released, so once again: I'm certain everything behaves correctly. Code below.
What version of Go are you using (go version)?
$ go version
go version go1.14.2 linux/amd64
Does this issue reproduce with the latest release?
Yes
What operating system and processor architecture are you using (go env)?
go env Output
$ go env
GO111MODULE=""
GOARCH="amd64"
GOBIN=""
GOCACHE="/home/XXXX/.cache/go-build"
GOENV="/home/XXXX/.config/go/env"
GOEXE=""
GOFLAGS=""
GOHOSTARCH="amd64"
GOHOSTOS="linux"
GOINSECURE=""
GONOPROXY=""
GONOSUMDB=""
GOOS="linux"
GOPATH="/home/XXXX/go"
GOPRIVATE=""
GOPROXY="direct"
GOROOT="/usr/lib/golang"
GOSUMDB="off"
GOTMPDIR=""
GOTOOLDIR="/usr/lib/golang/pkg/tool/linux_amd64"
GCCGO="gccgo"
AR="ar"
CC="gcc"
CXX="g++"
CGO_ENABLED="1"
GOMOD="/home/XXXX/projects/datarace/go.mod"
CGO_CFLAGS="-g -O2"
CGO_CPPFLAGS=""
CGO_CXXFLAGS="-g -O2"
CGO_FFLAGS="-g -O2"
CGO_LDFLAGS="-g -O2"
PKG_CONFIG="pkg-config"
GOGCCFLAGS="-fPIC -m64 -pthread -fmessage-length=0 -fdebug-prefix-map=/tmp/go-build729449265=/tmp/go-build -gno-record-gcc-switches"
What did you do?
I'm writing a simple message event bus where the broker pushes data onto the channels of its subscribers/consumers. If a channel's buffer is full, I don't want the broker to block, so I'm using goroutines and a select statement to skip writes to a channel with a full buffer. To make testing easier, I'm mocking a subscriber interface, and I'm exposing the channels through functions (similar to context.Context.Done() and the like).
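The skip-on-full-buffer behaviour described above can be sketched in isolation. This is a minimal illustration, not the broker's code: trySend is a hypothetical helper, and the buffer size of 1 is taken from the tests further down.

```go
package main

import "fmt"

// trySend is a hypothetical helper: it attempts a non-blocking send on ch
// and reports whether the value was accepted.
func trySend(ch chan<- interface{}, v interface{}) bool {
	select {
	case ch <- v:
		return true
	default:
		// The buffer is full (or nobody is receiving): skip instead of blocking.
		return false
	}
}

func main() {
	ch := make(chan interface{}, 1)
	for _, v := range []interface{}{1, 2, 3} {
		fmt.Printf("value %v sent: %t\n", v, trySend(ch, v))
	}
}
```

With a buffer of 1 and no consumer, only the first value is accepted and the other two are skipped.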
My tests all pass, and everything behaves as expected. However, running the same tests with the race detector, I get what I believe to be a false positive. I have a test where I send data to a subscriber that isn't consuming the messages. The channel buffer is full, and I want to ensure that the broker doesn't block. To make sure I've tried to send all data, I'm using a waitgroup to check that the subscriber has indeed been accessed N times (where N is the number of events I'm sending). Once the waitgroup is done, I validate what data is on the channel, make sure it's empty, and then close it. The statement where I close the channel is flagged as a data race.
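For comparison, here is a sketch of the same wait-then-close scheme without the mock. It is not the reproduction: sendAll is a hypothetical function, and it differs from the test in one respect that may matter. Here the sending goroutine calls wg.Done itself after its select has completed, whereas the test calls wg.Done from the callback attached to the mocked C().

```go
package main

import (
	"fmt"
	"sync"
)

// sendAll (hypothetical) tries a non-blocking send of each value from its own
// goroutine and returns once every attempt has finished.
func sendAll(ch chan interface{}, vals []interface{}) {
	var wg sync.WaitGroup
	wg.Add(len(vals))
	for _, v := range vals {
		go func(v interface{}) {
			// Done is signalled only after the select below has completed.
			defer wg.Done()
			select {
			case ch <- v:
			default:
				// Buffer full: skip this value.
			}
		}(v)
	}
	wg.Wait()
}

func main() {
	ch := make(chan interface{}, 1)
	sendAll(ch, []interface{}{1, 2, 3})
	fmt.Println("buffered values:", len(ch))
	<-ch
	close(ch) // every send attempt has finished by now
}
```

Which of the three values ends up in the buffer depends on goroutine scheduling, but exactly one of them does.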
If I do the exact same thing but also remove the subscriber from the broker, the data race disappears. Here's the code to reproduce the issue:
broker.go
package broker

import (
	"context"
	"log"
	"sync"
)

//go:generate go run github.com/golang/mock/mockgen -destination mocks/sub_mock.go -package mocks my.pkg/race/broker Sub
type Sub interface {
	C() chan<- interface{}
	Done() <-chan struct{}
}

type Broker struct {
	mu   sync.Mutex
	ctx  context.Context
	subs map[int]Sub
	keys []int
}

func New(ctx context.Context) *Broker {
	return &Broker{
		ctx:  ctx,
		subs: map[int]Sub{},
		keys: []int{},
	}
}

func (b *Broker) Send(v interface{}) {
	b.mu.Lock()
	go func() {
		rm := make([]int, 0, len(b.subs))
		defer func() {
			if len(rm) > 0 {
				b.unsub(rm...)
			}
			b.mu.Unlock()
		}()
		for k, s := range b.subs {
			select {
			case <-b.ctx.Done():
				return
			case <-s.Done():
				rm = append(rm, k)
			case s.C() <- v:
				continue
			default:
				log.Printf("Skipped sub %d", k)
			}
		}
	}()
}

func (b *Broker) Subscribe(s Sub) int {
	b.mu.Lock()
	k := b.key()
	b.subs[k] = s
	b.mu.Unlock()
	return k
}

func (b *Broker) Unsubscribe(k int) {
	b.mu.Lock()
	b.unsub(k)
	b.mu.Unlock()
}

func (b *Broker) key() int {
	if len(b.keys) > 0 {
		k := b.keys[0]
		b.keys = b.keys[1:]
		return k
	}
	return len(b.subs) + 1
}

func (b *Broker) unsub(keys ...int) {
	for _, k := range keys {
		if _, ok := b.subs[k]; !ok {
			return
		}
		delete(b.subs, k)
		b.keys = append(b.keys, k)
	}
}
broker_test.go
package broker_test

import (
	"context"
	"sync"
	"testing"

	"my.pkg/race/broker"
	"my.pkg/race/broker/mocks"

	"github.com/golang/mock/gomock"
	"github.com/tj/assert"
)

type tstBroker struct {
	*broker.Broker
	cfunc context.CancelFunc
	ctx   context.Context
	ctrl  *gomock.Controller
}

func getBroker(t *testing.T) *tstBroker {
	ctx, cfunc := context.WithCancel(context.Background())
	ctrl := gomock.NewController(t)
	return &tstBroker{
		Broker: broker.New(ctx),
		cfunc:  cfunc,
		ctx:    ctx,
		ctrl:   ctrl,
	}
}

func TestRace(t *testing.T) {
	broker := getBroker(t)
	defer broker.Finish()
	sub := mocks.NewMockSub(broker.ctrl)
	cCh, dCh := make(chan interface{}, 1), make(chan struct{})
	vals := []interface{}{1, 2, 3}
	wg := sync.WaitGroup{}
	wg.Add(len(vals))
	sub.EXPECT().Done().Times(len(vals)).Return(dCh)
	sub.EXPECT().C().Times(len(vals)).Return(cCh).Do(func() {
		wg.Done()
	})
	k := broker.Subscribe(sub)
	assert.NotZero(t, k)
	for _, v := range vals {
		broker.Send(v)
	}
	wg.Wait()
	// I've tried to send all 3 values, channels should be safe to close now
	close(dCh)
	// channel had buffer of 1, so first value should be present
	assert.Equal(t, vals[0], <-cCh)
	// other values should be skipped due to default select
	assert.Equal(t, 0, len(cCh))
	close(cCh)
}

func TestNoRace(t *testing.T) {
	broker := getBroker(t)
	defer broker.Finish()
	sub := mocks.NewMockSub(broker.ctrl)
	cCh, dCh := make(chan interface{}, 1), make(chan struct{})
	vals := []interface{}{1, 2, 3}
	wg := sync.WaitGroup{}
	wg.Add(len(vals))
	sub.EXPECT().Done().Times(len(vals)).Return(dCh)
	sub.EXPECT().C().Times(len(vals)).Return(cCh).Do(func() {
		wg.Done()
	})
	k := broker.Subscribe(sub)
	assert.NotZero(t, k)
	for _, v := range vals {
		broker.Send(v)
	}
	wg.Wait()
	// I've tried to send all 3 values, channels should be safe to close now
	close(dCh)
	// channel had buffer of 1, so first value should be present
	assert.Equal(t, vals[0], <-cCh)
	// other values should be skipped due to default select
	assert.Equal(t, 0, len(cCh))
	// add this line, and data race magically vanishes
	broker.Unsubscribe(k)
	close(cCh)
}

func (b *tstBroker) Finish() {
	b.cfunc()
	b.ctrl.Finish()
}
See the data race by running: go test -v -race ./broker/... -run TestRace
What did you expect to see?
I expect to see log output showing that the subscriber was skipped twice (which I do indeed see), and no data race.
What did you see instead?
The code still behaved as expected, but a data race was reported:
go test -v -race ./broker/... -run TestRace
=== RUN TestRace
2020/05/13 16:24:06 Skipped sub 1
2020/05/13 16:24:06 Skipped sub 1
==================
WARNING: DATA RACE
Write at 0x00c00011a4f0 by goroutine 7:
runtime.closechan()
/usr/lib/golang/src/runtime/chan.go:335 +0x0
my.pkg/race/broker_test.TestRace()
/home/XXXX/projects/race/broker/stuff_test.go:56 +0x7c8
testing.tRunner()
/usr/lib/golang/src/testing/testing.go:991 +0x1eb
Previous read at 0x00c00011a4f0 by goroutine 10:
runtime.chansend()
/usr/lib/golang/src/runtime/chan.go:142 +0x0
my.pkg/race/broker.(*Broker).Send.func1()
/home/XXXX/projects/race/broker/stuff.go:46 +0x369
Goroutine 7 (running) created at:
testing.(*T).Run()
/usr/lib/golang/src/testing/testing.go:1042 +0x660
testing.runTests.func1()
/usr/lib/golang/src/testing/testing.go:1284 +0xa6
testing.tRunner()
/usr/lib/golang/src/testing/testing.go:991 +0x1eb
testing.runTests()
/usr/lib/golang/src/testing/testing.go:1282 +0x527
testing.(*M).Run()
/usr/lib/golang/src/testing/testing.go:1199 +0x2ff
main.main()
_testmain.go:48 +0x223
Goroutine 10 (finished) created at:
my.pkg/race/broker.(*Broker).Send()
/home/XXXX/projects/race/broker/stuff.go:32 +0x70
my.pkg/race/broker_test.TestRace()
/home/XXXX/projects/race/broker/stuff_test.go:47 +0x664
testing.tRunner()
/usr/lib/golang/src/testing/testing.go:991 +0x1eb
==================
TestRace: testing.go:906: race detected during execution of test
--- FAIL: TestRace (0.00s)
: testing.go:906: race detected during execution of test
FAIL
FAIL my.pkg/race/broker 0.009s
? my.pkg/race/broker/mocks [no test files]
Though I'm not certain, my guess is that the expression s.C() <- v, because it's a case expression, is what trips up the race detector here. The channel buffer is full, so the write would block if I had put it in the default case. As it stands, the write cannot proceed, so instead my code logs that the subscriber is being skipped, the goroutine ends (the deferred func unlocks the mutex), and the mock callback decrements the waitgroup. Once the waitgroup reaches zero, all calls to my mock subscriber have been made, and the channel can be safely closed.
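One detail that may be worth ruling out here is the order in which a select statement evaluates its operands. Per the Go specification, the channel expression of a send case is evaluated once, on entering the select, before any case is chosen. A callback attached to s.C() can therefore run before the send on the returned channel is even attempted. The sketch below is separate from the reproduction; recorder and evalOrder are hypothetical names used only for illustration.

```go
package main

import "fmt"

// recorder (hypothetical) logs events in the order they happen.
// It is used from a single goroutine only.
type recorder struct{ events []string }

// ch stands in for a call like s.C(): it records that it was called and
// returns the channel unchanged.
func (r *recorder) ch(c chan int) chan int {
	r.events = append(r.events, "operand evaluated")
	return c
}

// evalOrder runs one select against a full channel and returns the event log.
func evalOrder() []string {
	r := &recorder{}
	full := make(chan int, 1)
	full <- 0 // fill the buffer so the send case cannot proceed

	select {
	case r.ch(full) <- 1:
		r.events = append(r.events, "sent")
	default:
		r.events = append(r.events, "default taken")
	}
	return r.events
}

func main() {
	fmt.Println(evalOrder())
}
```

This prints `[operand evaluated default taken]`: the call that yields the channel has already returned by the time the runtime tries the send and falls through to default.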
It seems, however, that I need to add the additional call, removing the mock from the broker, to "reset" the race detector state. I'll try to have a look at the source; maybe something jumps out.
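One possible reading of why the extra call helps, offered as a sketch rather than a diagnosis: Unsubscribe acquires b.mu, and the goroutine started by Send releases that mutex only when it exits. Taking the lock therefore waits for the last send attempt to finish and orders it before the close. The function below (sendThenClose, a hypothetical name) mimics that locking pattern without the broker or the mock.

```go
package main

import (
	"fmt"
	"sync"
)

// sendThenClose (hypothetical) starts n goroutines that each try a
// non-blocking send while holding mu, then takes mu once more before closing
// the channel. It returns the number of values left in the buffer.
func sendThenClose(n int) int {
	var mu sync.Mutex
	ch := make(chan int, 1)

	for i := 0; i < n; i++ {
		mu.Lock() // like Broker.Send: locked by the caller ...
		go func(v int) {
			defer mu.Unlock() // ... and released by the goroutine on exit
			select {
			case ch <- v:
			default:
				// Buffer full: skip.
			}
		}(i)
	}

	// Like Broker.Unsubscribe: this Lock cannot succeed until the last
	// goroutine has unlocked, so every send attempt is finished here.
	mu.Lock()
	mu.Unlock()

	close(ch)
	return len(ch)
}

func main() {
	fmt.Println("buffered values after close:", sendThenClose(3))
}
```

Because each Lock in the loop waits for the previous goroutine to unlock, the attempts run one after another, and the final Lock/Unlock pair orders the close after all of them.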