Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: AddRingBuf() using ring_buffer__add #430

Merged
merged 1 commit into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 17 additions & 10 deletions buf-ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
type RingBuffer struct {
rb *C.struct_ring_buffer
bpfMap *BPFMap
slot uint
slots []uint
stop chan struct{}
closed bool
wg sync.WaitGroup
Expand Down Expand Up @@ -50,20 +50,25 @@ func (rb *RingBuffer) Stop() {
// may have stopped at this point. Failure to drain it will
// result in a deadlock: the channel will fill up and the poll
// goroutine will block in the callback.
eventChan := eventChannels.get(rb.slot).(chan []byte)
go func() {
// revive:disable:empty-block
for range eventChan {
}
// revive:enable:empty-block
}()
for _, slot := range rb.slots {
eventChan := eventChannels.get(slot).(chan []byte)
go func() {
// revive:disable:empty-block
for range eventChan {
}
// revive:enable:empty-block
}()
}

// Wait for the poll goroutine to exit
rb.wg.Wait()

// Close the channel -- this is useful for the consumer but
// also to terminate the drain goroutine above.
close(eventChan)
for _, slot := range rb.slots {
eventChan := eventChannels.get(slot).(chan []byte)
close(eventChan)
}

// Reset pb.stop to allow multiple safe calls to Stop()
rb.stop = nil
Expand All @@ -76,7 +81,9 @@ func (rb *RingBuffer) Close() {

rb.Stop()
C.ring_buffer__free(rb.rb)
eventChannels.remove(rb.slot)
for _, slot := range rb.slots {
eventChannels.remove(slot)
}
rb.closed = true
}

Expand Down
14 changes: 14 additions & 0 deletions libbpfgo.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,20 @@ struct ring_buffer *cgo_init_ring_buf(int map_fd, uintptr_t ctx)
return rb;
}

int cgo_add_ring_buf(struct ring_buffer *rb, int map_fd, uintptr_t ctx)
{
int ret = ring_buffer__add(rb, map_fd, ringbufferCallback, (void *) ctx);
if (ret != 0) {
int saved_errno = errno;
fprintf(stderr, "Failed to add ring buffer: %s\n", strerror(errno));
errno = saved_errno;

return ret;
}

return ret;
}

struct perf_buffer *cgo_init_perf_buf(int map_fd, int page_cnt, uintptr_t ctx)
{
struct perf_buffer_opts pb_opts = {};
Expand Down
1 change: 1 addition & 0 deletions libbpfgo.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
void cgo_libbpf_set_print_fn();

struct ring_buffer *cgo_init_ring_buf(int map_fd, uintptr_t ctx);
int cgo_add_ring_buf(struct ring_buffer *rb, int map_fd, uintptr_t ctx);
struct perf_buffer *cgo_init_perf_buf(int map_fd, int page_cnt, uintptr_t ctx);

void cgo_bpf_map__initial_value(struct bpf_map *map, void *value);
Expand Down
27 changes: 26 additions & 1 deletion module.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,18 +343,43 @@ func (m *Module) InitRingBuf(mapName string, eventsChan chan []byte) (*RingBuffe

rbC, errno := C.cgo_init_ring_buf(C.int(bpfMap.FileDescriptor()), C.uintptr_t(slot))
if rbC == nil {
eventChannels.remove(uint(slot))
return nil, fmt.Errorf("failed to initialize ring buffer: %w", errno)
}

ringBuf := &RingBuffer{
rb: rbC,
bpfMap: bpfMap,
slot: uint(slot),
slots: []uint{uint(slot)},
}
m.ringBufs = append(m.ringBufs, ringBuf)
return ringBuf, nil
}

func (m *Module) AddRingBuf(ringBuf *RingBuffer, mapName string, eventsChan chan []byte) (bool, error) {
bpfMap, err := m.GetMap(mapName)
if err != nil {
return false, err
}

if eventsChan == nil {
return false, fmt.Errorf("events channel can not be nil")
}

slot := eventChannels.put(eventsChan)
if slot == -1 {
return false, fmt.Errorf("max ring buffers reached")
}
ringBuf.slots = append(ringBuf.slots, uint(slot))

ret, errno := C.cgo_add_ring_buf(ringBuf.rb, C.int(bpfMap.FileDescriptor()), C.uintptr_t(slot))
if ret != 0 {
eventChannels.remove(uint(slot))
return false, fmt.Errorf("failed to add ring buffer: %w", errno)
}
return true, nil
}

func (m *Module) InitPerfBuf(mapName string, eventsChan chan []byte, lostChan chan uint64, pageCnt int) (*PerfBuffer, error) {
bpfMap, err := m.GetMap(mapName)
if err != nil {
Expand Down
13 changes: 11 additions & 2 deletions selftest/ringbuffers/main.bpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
struct {
__uint(type, BPF_MAP_TYPE_RINGBUF);
__uint(max_entries, 1 << 24);
} events SEC(".maps");
} events1 SEC(".maps"), events2 SEC(".maps");

long ringbuffer_flags = 0;

Expand All @@ -18,13 +18,22 @@ int kprobe__sys_mmap(struct pt_regs *ctx)
int *process;

// Reserve space on the ringbuffer for the sample
process = bpf_ringbuf_reserve(&events, sizeof(int), ringbuffer_flags);
process = bpf_ringbuf_reserve(&events1, sizeof(int), ringbuffer_flags);
if (!process) {
return 0;
}

*process = 2021;

bpf_ringbuf_submit(process, ringbuffer_flags);

process = bpf_ringbuf_reserve(&events2, sizeof(int), ringbuffer_flags);
if (!process) {
return 0;
}

*process = 2024;

bpf_ringbuf_submit(process, ringbuffer_flags);
return 0;
}
46 changes: 33 additions & 13 deletions selftest/ringbuffers/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

func resizeMap(module *bpf.Module, name string, size uint32) error {
m, err := module.GetMap("events")
m, err := module.GetMap(name)
if err != nil {
return err
}
Expand All @@ -39,7 +39,7 @@ func main() {
}
defer bpfModule.Close()

if err = resizeMap(bpfModule, "events", 8192); err != nil {
if err = resizeMap(bpfModule, "events1", 8192); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(-1)
}
Expand All @@ -58,32 +58,52 @@ func main() {
os.Exit(-1)
}

eventsChannel := make(chan []byte)
rb, err := bpfModule.InitRingBuf("events", eventsChannel)
eventsChannel1 := make(chan []byte)
rb, err := bpfModule.InitRingBuf("events1", eventsChannel1)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(-1)
}

eventsChannel2 := make(chan []byte)
ret, err := bpfModule.AddRingBuf(rb, "events2", eventsChannel2)
if !ret {
fmt.Fprintln(os.Stderr, err)
os.Exit(-1)
}

rb.Poll(300)

numberOfEventsReceived := 0
numberOfEvent1Received := 0
numberOfEvent2Received := 0
go func() {
for {
syscall.Mmap(999, 999, 999, 1, 1)
time.Sleep(time.Second / 2)
}
}()

recvLoop:
for {
b := <-eventsChannel
if binary.LittleEndian.Uint32(b) != 2021 {
fmt.Fprintf(os.Stderr, "invalid data retrieved\n")
os.Exit(-1)
}
numberOfEventsReceived++
if numberOfEventsReceived > 5 {
break recvLoop
select {
case b := <-eventsChannel1:
if binary.LittleEndian.Uint32(b) != 2021 {
fmt.Fprintf(os.Stderr, "invalid data retrieved\n")
os.Exit(-1)
}
numberOfEvent1Received++
if numberOfEvent1Received > 5 && numberOfEvent2Received > 5 {
break recvLoop
}
case b := <-eventsChannel2:
if binary.LittleEndian.Uint32(b) != 2024 {
fmt.Fprintf(os.Stderr, "invalid data retrieved\n")
os.Exit(-1)
}
numberOfEvent2Received++
if numberOfEvent1Received > 5 && numberOfEvent2Received > 5 {
break recvLoop
}
}
}

Expand Down
Loading