Skip to content

Commit

Permalink
Merge pull request #649 from eyakubovich/fix-chan-map-race
Browse files Browse the repository at this point in the history
Fix eventsChannels race
  • Loading branch information
grantseltzer committed Apr 6, 2021
2 parents 5052cb8 + 23597a0 commit ff03f7b
Show file tree
Hide file tree
Showing 4 changed files with 272 additions and 37 deletions.
74 changes: 74 additions & 0 deletions libbpfgo/helpers/rwArray.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package helpers

import (
"sync"
)

type slot struct {
value interface{}
used bool
}

// RWArray allows for multiple concurrent readers but
// only a single writer. The writers lock a mutex while the readers
// are lock free.
// It is implemented as an array of slots where each slot holds a
// value (of type interface{}) and a boolean marker to indicate if it's
// in use or not. The insertion (Put) performs a linear probe
// looking for an available slot as indicated by the in-use marker.
// While probing, it is not touching the value itself, as it's
// being read without a lock by the readers.
type RWArray struct {
slots []slot
mux sync.Mutex
}

func NewRWArray(capacity uint) RWArray {
return RWArray{
slots: make([]slot, capacity),
}
}

func (a *RWArray) Put(v interface{}) int {
a.mux.Lock()
defer a.mux.Unlock()

limit := len(a.slots)

for i := 0; i < limit; i++ {
if !a.slots[i].used {
a.slots[i].value = v
a.slots[i].used = true
return i
}
}

return -1
}

func (a *RWArray) Remove(index uint) {
a.mux.Lock()
defer a.mux.Unlock()

if int(index) >= len(a.slots) {
return
}

a.slots[index].value = nil
a.slots[index].used = false
}

func (a *RWArray) Get(index uint) interface{} {
if int(index) >= len(a.slots) {
return nil
}

// N.B. If slot[index].used == false, this is technically
// a race since Put() might be putting the value in there
// at the same time.
return a.slots[index].value
}

func (a *RWArray) Capacity() uint {
return uint(len(a.slots))
}
138 changes: 138 additions & 0 deletions libbpfgo/helpers/rwArray_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package helpers

import (
"sync"
"testing"
)

func TestRWArrayWrite(t *testing.T) {
a := NewRWArray(1024)

last := 0

for i := 0; i < 1000; i++ {
slot1 := a.Put(&i)
if slot1 < 0 {
t.Errorf("failed to put")
}

if last != slot1 {
t.Fatalf("Put didn't occupy first available; expected=%v, got=%v", last, slot1)
}

slot2 := a.Put(&i)
if slot2 < 0 {
t.Fatalf("failed to put")
}

if slot1 >= slot2 {
t.Fatalf("slot1 (%v) < slot2 (%v)", slot1, slot2)
}

a.Remove(uint(slot2))

last = slot2
}
}

func TestRWArrayExhaust(t *testing.T) {
a := NewRWArray(1024)

last := -1

for {
v := 123
slot := a.Put(&v)

if slot < 0 {
if uint(last) != a.Capacity()-1 {
t.Fatalf("failed to put, last=%v", last)
}
return
}

if slot != last+1 {
t.Fatalf("Put returned non-sequential slot; expected=%v, got=%v", last+1, slot)
}

last = slot
}
}

func TestRWArrayRead(t *testing.T) {
a := NewRWArray(1024)

for i := 0; i < 1000; i++ {
v := i
slot := a.Put(&v)
if slot != i {
t.Errorf("Put returned non-sequential slot; expected=%v, got=%v", i, slot)
}
}

for i := 0; i < 1000; i++ {
v := a.Get(uint(i)).(*int)
if *v != i {
t.Errorf("Get returned wrong valuue; expected=%v, got=%v", i, *v)
}
}
}

// Designed to be run under race detector
func TestRWArrayConcurrent(t *testing.T) {
a := NewRWArray(16 * 1024)
capacity := a.Capacity()

stop := make(chan struct{})
wg := sync.WaitGroup{}

// Populate every other slot
v := 123
for i := uint(0); i < capacity; i++ {
a.Put(&v)
}
for i := uint(1); i < capacity; i += 2 {
a.Remove(i)
}

writer := func() {
for {
// fill the holes
for i := uint(0); i < capacity/2; i++ {
a.Put(&v)
}

// make some holes
for i := uint(1); i < capacity; i += 2 {
a.Remove(i)
}

// time to exit?
select {
case _, _ = <-stop:
return
default:
}
}
}

reader := func() {
for rounds := 0; rounds < 10; rounds++ {
for i := uint(0); i < capacity; i += 2 {
a.Get(i)
}
}

wg.Done()
}

go writer()

wg.Add(3)
go reader()
go reader()
go reader()

wg.Wait()
close(stop)
}
12 changes: 7 additions & 5 deletions libbpfgo/libbpf_cb.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,21 @@ import (

//export perfCallback
func perfCallback(ctx unsafe.Pointer, cpu C.int, data unsafe.Pointer, size C.int) {
eventChannels[uintptr(ctx)] <- C.GoBytes(data, size)
pb := eventChannels.Get(uint(uintptr(ctx))).(*PerfBuffer)
pb.eventsChan <- C.GoBytes(data, size)
}

//export perfLostCallback
func perfLostCallback(ctx unsafe.Pointer, cpu C.int, cnt C.ulonglong) {
lostChan := lostChannels[uintptr(ctx)]
if lostChan != nil {
lostChan <- uint64(cnt)
pb := eventChannels.Get(uint(uintptr(ctx))).(*PerfBuffer)
if pb.lostChan != nil {
pb.lostChan <- uint64(cnt)
}
}

//export ringbufferCallback
func ringbufferCallback(ctx unsafe.Pointer, data unsafe.Pointer, size C.int) C.int {
eventChannels[uintptr(ctx)] <- C.GoBytes(data, size)
ch := eventChannels.Get(uint(uintptr(ctx))).(chan []byte)
ch <- C.GoBytes(data, size)
return C.int(0)
}

0 comments on commit ff03f7b

Please sign in to comment.