Skip to content

Commit

Permalink
chore: split logic into multiple files
Browse files Browse the repository at this point in the history
This only moves code around, no functional changes.
  • Loading branch information
geyslan committed Oct 26, 2023
1 parent 6f549ef commit c192943
Show file tree
Hide file tree
Showing 12 changed files with 1,707 additions and 1,497 deletions.
16 changes: 16 additions & 0 deletions buf-common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package libbpfgo

/*
#cgo LDFLAGS: -lelf -lz
#include "libbpfgo.h"
*/
import "C"

const (
// Maximum number of channels (RingBuffers + PerfBuffers) supported
maxEventChannels = 512
)

var (
eventChannels = newRWArray(maxEventChannels)
)
107 changes: 107 additions & 0 deletions buf-perf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package libbpfgo

/*
#cgo LDFLAGS: -lelf -lz
#include "libbpfgo.h"
*/
import "C"

import (
"fmt"
"sync"
"syscall"
)

//
// PerfBuffer
//

type PerfBuffer struct {
pb *C.struct_perf_buffer
bpfMap *BPFMap
slot uint
eventsChan chan []byte
lostChan chan uint64
stop chan struct{}
closed bool
wg sync.WaitGroup
}

// Poll will wait until timeout in milliseconds to gather
// data from the perf buffer.
func (pb *PerfBuffer) Poll(timeout int) {
pb.stop = make(chan struct{})
pb.wg.Add(1)
go pb.poll(timeout)
}

// Deprecated: use PerfBuffer.Poll() instead.
func (pb *PerfBuffer) Start() {
pb.Poll(300)
}

func (pb *PerfBuffer) Stop() {
if pb.stop != nil {
// Tell the poll goroutine that it's time to exit
close(pb.stop)

// The event and lost channels should be drained here since the consumer
// may have stopped at this point. Failure to drain it will
// result in a deadlock: the channel will fill up and the poll
// goroutine will block in the callback.
go func() {
// revive:disable:empty-block
for range pb.eventsChan {
}

if pb.lostChan != nil {
for range pb.lostChan {
}
}
// revive:enable:empty-block
}()

// Wait for the poll goroutine to exit
pb.wg.Wait()

// Close the channel -- this is useful for the consumer but
// also to terminate the drain goroutine above.
close(pb.eventsChan)
if pb.lostChan != nil {
close(pb.lostChan)
}

// This allows Stop() to be called multiple times safely
pb.stop = nil
}
}

func (pb *PerfBuffer) Close() {
if pb.closed {
return
}
pb.Stop()
C.perf_buffer__free(pb.pb)
eventChannels.remove(pb.slot)
pb.closed = true
}

// todo: consider writing the perf polling in go as c to go calls (callback) are expensive
func (pb *PerfBuffer) poll(timeout int) error {
defer pb.wg.Done()

for {
select {
case <-pb.stop:
return nil
default:
err := C.perf_buffer__poll(pb.pb, C.int(timeout))
if err < 0 {
if syscall.Errno(-err) == syscall.EINTR {
continue
}
return fmt.Errorf("error polling perf buffer: %d", err)
}
}
}
}
106 changes: 106 additions & 0 deletions buf-ring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package libbpfgo

/*
#cgo LDFLAGS: -lelf -lz
#include "libbpfgo.h"
*/
import "C"

import (
"fmt"
"sync"
"syscall"
)

//
// RingBuffer
//

type RingBuffer struct {
rb *C.struct_ring_buffer
bpfMap *BPFMap
slot uint
stop chan struct{}
closed bool
wg sync.WaitGroup
}

// Poll will wait until timeout in milliseconds to gather
// data from the ring buffer.
func (rb *RingBuffer) Poll(timeout int) {
rb.stop = make(chan struct{})
rb.wg.Add(1)
go rb.poll(timeout)
}

// Deprecated: use RingBuffer.Poll() instead.
func (rb *RingBuffer) Start() {
rb.Poll(300)
}

func (rb *RingBuffer) Stop() {
if rb.stop != nil {
// Tell the poll goroutine that it's time to exit
close(rb.stop)

// The event channel should be drained here since the consumer
// may have stopped at this point. Failure to drain it will
// result in a deadlock: the channel will fill up and the poll
// goroutine will block in the callback.
eventChan := eventChannels.get(rb.slot).(chan []byte)
go func() {
// revive:disable:empty-block
for range eventChan {
}
// revive:enable:empty-block
}()

// Wait for the poll goroutine to exit
rb.wg.Wait()

// Close the channel -- this is useful for the consumer but
// also to terminate the drain goroutine above.
close(eventChan)

// This allows Stop() to be called multiple times safely
rb.stop = nil
}
}

func (rb *RingBuffer) Close() {
if rb.closed {
return
}
rb.Stop()
C.ring_buffer__free(rb.rb)
eventChannels.remove(rb.slot)
rb.closed = true
}

func (rb *RingBuffer) isStopped() bool {
select {
case <-rb.stop:
return true
default:
return false
}
}

func (rb *RingBuffer) poll(timeout int) error {
defer rb.wg.Done()

for {
err := C.ring_buffer__poll(rb.rb, C.int(timeout))
if rb.isStopped() {
break
}

if err < 0 {
if syscall.Errno(-err) == syscall.EINTR {
continue
}
return fmt.Errorf("error polling ring buffer: %d", err)
}
}
return nil
}

0 comments on commit c192943

Please sign in to comment.