Skip to content

Commit

Permalink
Merge pull request #620 from eyakubovich/fix-ringbuf-stop
Browse files Browse the repository at this point in the history
Fix RingBuffer shutdown
  • Loading branch information
grantseltzer committed Mar 15, 2021
2 parents 1fd89c3 + 09b2b47 commit b995873
Showing 1 changed file with 46 additions and 12 deletions.
58 changes: 46 additions & 12 deletions libbpfgo/libbpfgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ import (
"fmt"
"os"
"strings"
"sync"
"syscall"
"unsafe"
)
Expand Down Expand Up @@ -231,8 +232,9 @@ type PerfBuffer struct {
type RingBuffer struct {
rb *C.struct_ring_buffer
bpfMap *BPFMap
stopped bool
closed bool
stop chan struct{}
closed bool
wg sync.WaitGroup
}

// BPF is using locked memory for BPF maps and various other things.
Expand Down Expand Up @@ -648,39 +650,71 @@ func (m *Module) InitRingBuf(mapName string, eventsChan chan []byte) (*RingBuffe
}

func (rb *RingBuffer) Start() {
rb.stop = make(chan struct{})
rb.wg.Add(1)
go rb.poll()
rb.stopped = false
}

func (rb *RingBuffer) Stop() {
rb.stopped = true
if rb.stop != nil {
// Tell the poll goroutine that it's time to exit
close(rb.stop)

// The event channel should be drained here since the consumer
// may have stopped at this point. Failure to drain it will
// result in a deadlock: the channel will fill up and the poll
// goroutine will block in the callback.
eventChan := eventChannels[uintptr(rb.bpfMap.fd)]
go func() {
for _ = range eventChan {
}
}()

// Wait for the poll goroutine to exit
rb.wg.Wait()

// Close the channel -- this is useful for the consumer but
// also to terminate the drain goroutine above.
close(eventChan)

// This allows Stop() to be called multiple times safely
rb.stop = nil
}
}

func (rb *RingBuffer) Close() {
if rb.closed {
return
}
if !rb.stopped {
rb.Stop()
}
rb.Stop()
C.ring_buffer__free(rb.rb)
rb.closed = true
}

func (rb *RingBuffer) isStopped() bool {
select {
case _, _ = <-rb.stop:
return true
default:
return false
}
}

func (rb *RingBuffer) poll() error {
defer rb.wg.Done()

for {
err := C.ring_buffer__poll(rb.rb, 0)
err := C.ring_buffer__poll(rb.rb, 300)
if rb.isStopped() {
break
}

if err < 0 {
if syscall.Errno(-err) == syscall.EINTR {
continue
}
return fmt.Errorf("error polling ring buffer: %d", err)
}

if rb.stopped {
break
}
}
return nil
}
Expand Down

0 comments on commit b995873

Please sign in to comment.