-
Notifications
You must be signed in to change notification settings - Fork 12
/
ringbuffer.go
148 lines (123 loc) · 3.16 KB
/
ringbuffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package manager
import (
"errors"
"fmt"
"sync"
"github.com/cilium/ebpf"
"github.com/cilium/ebpf/ringbuf"
)
// RingBufferOptions - Options used to configure a RingBuffer: its size, the channel on which
// reader errors are reported and the callback invoked for each sample.
type RingBufferOptions struct {
	// RingBufferSize - Size in bytes of the ring buffer. Defaults to the manager value if not set.
	RingBufferSize int
	// ErrChan - Reader error channel. When set, every read error other than the "reader closed"
	// error is sent on it by the reader goroutine. The send is blocking, so the channel has to be
	// drained by the caller.
	ErrChan chan error
	// DataHandler - Callback function called when a new sample was retrieved from the
	// ring buffer. It is mandatory: init fails when it is nil. The CPU argument is always 0.
	// NOTE(review): the reader reuses a single ringbuf.Record across reads, so data presumably
	// must not be retained after the callback returns - confirm against ringbuf.Reader.ReadInto.
	DataHandler func(CPU int, data []byte, ringBuffer *RingBuffer, manager *Manager)
}
// RingBuffer - Wraps a ring buffer eBPF map together with the reader that consumes its samples.
type RingBuffer struct {
	// manager - Manager provided to init; handed back to DataHandler with every sample
	manager *Manager
	// ringReader - Reader created by Start and closed by Stop
	ringReader *ringbuf.Reader
	// wgReader - Tracks the reader goroutine so that Stop can wait for it to exit
	wgReader sync.WaitGroup
	// Map - A RingBuffer has the same features as a normal Map
	Map
	// RingBufferOptions - Size, error channel and data handler of this ring buffer
	RingBufferOptions
}
// loadNewRingBuffer - Creates a new ring buffer map instance from the provided spec and pins it
// when a pin path is configured. The ring buffer reader is not created here: it is set up by Start.
//
// On error, nil is returned and no kernel resource is left behind: if pinning fails, the map that
// was just created is closed before returning.
func loadNewRingBuffer(spec *ebpf.MapSpec, options MapOptions, ringBufferOptions RingBufferOptions) (*RingBuffer, error) {
	// Build the wrapper in place; it is only ever handed out by pointer (it embeds a WaitGroup).
	ringBuffer := RingBuffer{
		Map: Map{
			arraySpec:  spec,
			Name:       spec.Name,
			MapOptions: options,
		},
		RingBufferOptions: ringBufferOptions,
	}

	// Load the map in the kernel
	var err error
	if ringBuffer.array, err = ebpf.NewMap(spec); err != nil {
		return nil, err
	}

	// Pin the map if requested
	if ringBuffer.PinPath != "" {
		if err = ringBuffer.array.Pin(ringBuffer.PinPath); err != nil {
			// The caller only gets nil back and can never close the map itself, so release it
			// here. The close error is deliberately dropped: the pin failure is the one to report.
			_ = ringBuffer.array.Close()
			return nil, fmt.Errorf("couldn't pin map %s at %s: %w", ringBuffer.Name, ringBuffer.PinPath, err)
		}
	}
	return &ringBuffer, nil
}
// init - Attaches the ring buffer to its manager, validates its options and initializes the
// underlying map. An error is returned when no DataHandler was provided or when the map
// initialization fails.
func (rb *RingBuffer) init(manager *Manager) error {
	// The manager is recorded first, even when the validation below fails
	rb.manager = manager

	switch {
	case rb.DataHandler == nil:
		// A ring buffer without a handler has nowhere to deliver its samples
		return fmt.Errorf("no DataHandler set for %s", rb.Name)
	case rb.RingBufferSize == 0:
		// Fall back to the manager-wide default size
		rb.RingBufferSize = manager.options.DefaultRingBufferSize
	}

	// Hand over to the generic map initialization
	initErr := rb.Map.init()
	if initErr != nil {
		return initErr
	}
	return nil
}
// Start - Starts fetching events from the ring buffer. A reader is created on the underlying map
// and a goroutine is spawned that forwards every sample to DataHandler until the reader is closed.
// Calling Start on a ring buffer that is already running is a no-op; ErrMapNotInitialized is
// returned when the ring buffer has not been initialized yet.
func (rb *RingBuffer) Start() error {
	rb.stateLock.Lock()
	defer rb.stateLock.Unlock()

	switch {
	case rb.state == running:
		return nil
	case rb.state < initialized:
		return ErrMapNotInitialized
	}

	// Create the ring buffer reader; the field is updated even when the creation fails
	reader, err := ringbuf.NewReader(rb.array)
	rb.ringReader = reader
	if err != nil {
		return err
	}

	// Consume samples in the background until the reader gets closed
	rb.wgReader.Add(1)
	go func() {
		var (
			sample  ringbuf.Record
			readErr error
		)
		for {
			readErr = rb.ringReader.ReadInto(&sample)
			switch {
			case readErr == nil:
				rb.DataHandler(0, sample.RawSample, rb, rb.manager)
			case isRingBufferClosed(readErr):
				// The reader was closed: signal whoever waits on wgReader and exit
				rb.wgReader.Done()
				return
			case rb.ErrChan != nil:
				// Blocking send: the channel is expected to be drained by its owner
				rb.ErrChan <- readErr
			}
		}
	}()

	rb.state = running
	return nil
}
// Stop - Stops the ring buffer: closes the reader, waits for the reader goroutine to exit and
// then closes the underlying map with the requested cleanup type. Nothing is done when the ring
// buffer is not past the stopped state. When both the reader and the map fail to close, the
// returned error wraps the map error and is prefixed with the reader error message.
func (rb *RingBuffer) Stop(cleanup MapCleanupType) error {
	rb.stateLock.Lock()
	defer rb.stateLock.Unlock()

	if rb.state <= stopped {
		return nil
	}
	rb.state = stopped

	// Closing the reader is what makes the reader goroutine exit, hence Close before Wait
	readerErr := rb.ringReader.Close()
	rb.wgReader.Wait()

	// Close the underlying map and merge both outcomes
	mapErr := rb.Map.close(cleanup)
	switch {
	case mapErr == nil:
		return readerErr
	case readerErr == nil:
		return mapErr
	default:
		return fmt.Errorf("%s: %w", readerErr.Error(), mapErr)
	}
}
// isRingBufferClosed - Reports whether err is, or wraps, ringbuf.ErrClosed.
func isRingBufferClosed(err error) bool {
	closed := errors.Is(err, ringbuf.ErrClosed)
	return closed
}