perf: Add support for overwritable buffer. #953
Conversation
"""Backported""" opened discussions from previous PR.
@@ -315,6 +326,10 @@ func (pr *Reader) ReadInto(rec *Record) error {

    pr.mu.Lock()
    defer pr.mu.Unlock()

    if pr.overwritable && !pr.paused {
lmd, 5 days ago:
The access to pr.paused is racy, since Pause can modify it concurrently; you need to acquire pr.pauseMu to access it safely. Unfortunately, this creates another problem:

    go func() { reader.Read() }()
    reader.Pause() // This must not block, even if Read is blocked in poller.Wait!

Using an atomic variable for pr.paused doesn't help: we need to ensure that the ring stays paused while we read from it, otherwise we race with the kernel.
I'm not yet sure how best to fix this :(
eiffel-fl, 5 days ago:
You are right, I just wanted to minimize the critical section by not taking the lock to read this variable.
But it indeed seems better to have the whole function run under the lock.
lmd, 5 days ago:
> But it seems indeed better to have the whole function running under lock.

I think that's not possible, since then calling .Pause would also block. Not sure how to fix that yet.
I've pushed a fixup that takes pauseMu at the start of the function, and drops it before entering the poller. Not super clean but the easiest thing I could come up with.
perf/ring.go (outdated)

    // +---+------+----------+-------+---+--+
    // This code snippet was highly inspired by gobpf:
    // https://github.com/iovisor/gobpf/blob/16120a1bf4d4abc1f9cf37fecfb86009a1631b9f/elf/perf.go#L148
    if (read & rr.mask) < previousReadMasked {
lmd, 5 days ago:
This is not necessary, since it's OK for Read to return n < len(p) (see https://pkg.go.dev/io#Reader). It's the caller's responsibility to call Read until the buffer is filled, which we do in readRawSample, etc. via io.ReadFull:

Lines 113 to 115 in 8c2039b:

    if _, err := io.ReadFull(rd, buf); err != nil {
        return nil, fmt.Errorf("read sample size: %v", err)
    }

Instead, copy what the current implementation does and truncate the read:

Lines 142 to 150 in 8c2039b:

    // Truncate if the read wraps in the ring buffer
    if remainder := cap(rr.ring) - start; n > remainder {
        n = remainder
    }

    // Truncate if there isn't enough data
    if remainder := int(rr.head - rr.tail); n > remainder {
        n = remainder
    }
eiffel-fl, 5 days ago:
I understand Read() can return less than the buffer size. But if we do this here, does it mean we will have incomplete events in the buffer? And where should the next read start after this?
lmd, yesterday:
When entering Read() we have three important pieces of data:

* rr.read: the last position we've read at
* rr.head: the last position the kernel has written at
* len(p): the length of data we want

Let's ignore wrapping of read and head for a second. We can define unread := read - head.

* if unread == 0: return io.EOF, since there is no more data.
* if len(p) > unread: only copy unread bytes; the caller will call again and hit the case above.
* else: copy len(p) bytes.

Keep in mind that Read is invoked via io.ReadFull, which will turn an io.EOF at the wrong position into an ErrUnexpectedEOF.

You're right that we will read incomplete events into the buffer, but that's OK. It's not the reader's responsibility to detect an incomplete event. Instead we do that in readRecordFromRing, as suggested in #863 (comment).
eiffel-fl, yesterday:
> Let's ignore wrapping of read and head for a second. We can define unread := read - head.
> * if unread == 0: return io.EOF, since there is no more data.

After some thinking, I think this does not work. Indeed, we want to start reading, from left to right, beginning from head. So, on the first call, this condition would be true and we would never actually read anything.
I am clearly confused by the fact that you read the header and the data in separate functions, and that the loop is driven by ReadFull(); I need to adapt my implementation to these changes.
eiffel-fl, yesterday:
I think I found a workaround. I will take another look before opening a new PR.
I added previousHead to avoid testing on len(p).
But I do not understand your comment, particularly the part about "ignor[ing] wrapping".
> Indeed, we want to start to read, from left to right, beginning from head. So, on the first call, this would be true and we will actually never read anything.

I think that is because you're initializing head == read in readHead(). See my larger comment.
perf/ring.go (outdated)

    // +---+------+----------+-------+---+--+
    // |..E|D....D|C........C|B.....B|A..|E.|
    // +---+------+----------+-------+---+--+
    if read-rr.head+uint64(len(p)) > rr.mask {
lmd, 5 days ago:
I'm guessing this code is meant to handle a partially overwritten record? I think a better solution would be to handle this in Reader.readRecordFromRing, which will allow you to drop this clause altogether:

    func (pr *Reader) readRecordFromRing(rec *Record, ring *perfEventRing) error {
        defer ring.writeTail()

        rec.CPU = ring.cpu
        err := readRecord(ring, rec, pr.eventHeader)
        if pr.overwritable && (errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF)) {
            return errEOR
        }
        return err
    }

We need to handle both io.EOF and io.ErrUnexpectedEOF, since the record can be truncated at any point. It'd be even nicer to handle this in readRecord, but that seems a lot more involved.
eiffel-fl, 5 days ago:
I do not understand what you are advising here. How can I catch io.EOF in readRecordFromRing() if I do not return it from there?
Actually, this code checks that we do not read more than rr.mask, i.e. that we do not loop around, so I am not sure I understand your comment here?
perf/reader_test.go (outdated)

@@ -263,6 +265,287 @@ func TestPerfReaderLostSample(t *testing.T) {

    }
    }

    func outputSamplesProgOverwritable(sampleSizes ...int) (*ebpf.Program, *ebpf.Map, error) {
lmd, 5 days ago:
Why can't you use mustOutputSamplesProg?

eiffel-fl, 5 days ago:
outputSamplesProg() writes the same value, 0x0102030404030201, to the buffer each time, while in my test I need to read a different value each time to ensure the overwriting worked correctly.

lmd, yesterday:
Looks like you're just writing out all 0, am I missing something? https://github.com/cilium/ebpf/pull/863/files#diff-feb05b5561903cb2ac8643ec174ee12321237a86a4f75f767e62ce8fd9d40367R287

eiffel-fl, yesterday:
I think you are missing this one, where I store i in R0 before writing R0 at the corresponding address: https://github.com/cilium/ebpf/pull/863/files#diff-feb05b5561903cb2ac8643ec174ee12321237a86a4f75f767e62ce8fd9d40367R303
Let me try to summarise where I want to get to.

1. ringReader.Read() is only concerned with reading available data from the ring.
   It doesn't know or care that the ring contains samples.
2. readRecord() is only concerned with assembling a record. If that doesn't
   work for some reason, it passes on the error from ringReader.Read() (and from
   ReadFull, for io.ErrUnexpectedEOF).
3. Reader.readRecordFromRing() is a convenient place to discard errors that
   we know are caused by a partially overwritten record.

The overwritable ring makes this separation a bit annoying, since we are basically
guaranteed that the ring contains "broken", half-overwritten records. Those will
trigger io.EOF and io.ErrUnexpectedEOF.

Seems like we agree on 2 and 3, but 1 is where we differ. The trickiest part is
figuring out how much data is in the ring. From what we get from the kernel
we can deduce the following (head == meta.Data_head, cap == cap(ring)):

* head == 0: the kernel has never written to this ring. (In theory it could
  also have written 2^64 bytes, but that is large enough to ignore.)
* head > 0 - cap: the ring has been partially written. It contains
  cap - (head & mask) bytes.
* head <= 0 - cap: the ring has been fully written. It contains cap bytes.

Using this we can rewrite reverseReader.loadHead:

    func (rr *reverseReader) loadHead() {
        rr.head = atomic.LoadUint64(&rr.meta.Data_head)
        switch {
        case rr.head == 0:
            // Renamed read to tail, since read and head only differ in one char.
            rr.tail = rr.head
        case rr.head > 0-uint64(cap(rr.ring)):
            // Mind-boggling due to under / overflow.
            rr.tail = 0
        default:
            // The ring has been fully written.
            rr.tail = rr.head + uint64(cap(rr.ring))
        }
    }

Read becomes:

    func (rr *reverseReader) Read(p []byte) (int, error) {
        start := int(rr.head & rr.mask)

        n := len(p)
        // Truncate if the read wraps in the ring buffer.
        if remainder := cap(rr.ring) - start; n > remainder {
            n = remainder
        }

        // Truncate if there isn't enough data.
        if remainder := int(rr.tail - rr.head); n > remainder {
            n = remainder
        }

        copy(p, rr.ring[start:start+n])
        rr.tail -= uint64(n)

        if rr.tail == rr.head {
            return n, io.EOF
        }

        return n, nil
    }

This code is untested, so ymmv, but hopefully you get the idea.
perf/ring.go (outdated)

    // +---+------+----------+-------+---+--+
    // This code snippet was highly inspired by gobpf:
    // https://github.com/iovisor/gobpf/blob/16120a1bf4d4abc1f9cf37fecfb86009a1631b9f/elf/perf.go#L148
    if (read & rr.mask) < previousReadMasked {
> Indeed, we want to start to read, from left to right, beginning from head. So, on the first call, this would be true and we will actually never read anything.

> I think that is because you're initializing head == read in readHead(). See my larger comment.

I think we are misunderstanding each other. I tested your code and tried to adapt it, but it does not work, and I do not actually see how to make it work.
If you take a look at the Inspektor Gadget code, I indeed keep the previous head position in a dedicated variable. Please feel free to share if there is something you think I misunderstand.
Force-pushed from 2662ca1 to 54bb03c (compare)
Here is an implementation that passes the tests you wrote: lmb@e9f7189 You're right that what I had in my comment didn't work out of the box, see below.
The diagram was very helpful, thanks! I now understand that we need to adjust
Yes, there are multiple ways of implementing this logic. I want to do it in a way that minimises the difference between the existing forward reader and the new reverse reader. This makes it easier to review for me and other contributors.
Force-pushed from 54bb03c to 361f0cd (compare)
I tested it and it works fine, thank you!
With the current code, we will read:
Sadly, in 3, the kernel writes only some events to the buffer, so with the actual code the intersection of [H, H + B] and [H', H' + B] is not empty, which will lead us to read events twice.
I should be able to modify your code to deal with this. What do you think?
I would like to have this diagram written down somewhere, either in a code comment or in the commit message, as it really helps understanding.
Definitely! My goal with Inspektor Gadget was only to have "just works" code.
Force-pushed from 6cf5910 to 759f916 (compare)
Ah, good catch! I think this might fix the problem: 0e71c04
Why doesn't the following work? 1) emit a sample via mustOutputSamplesProg(), 2) read until DeadlineExceeded, 3) emit another sample, 4) read again and assert that you don't read the first sample.
It is possible we can do without
Normally, I modified the tests to add a case where I read the buffer a second time.
0e71c04
to
55bf903
Compare
Urgh, yes, sorry. Can you add tests that only exercise
Force-pushed from 93224ff to df22084 (compare)
Done!
Force-pushed from df22084 to 6888a72 (compare)
Force-pushed from 6888a72 to 6e2857f (compare)
@eiffel-fl I've pushed a bunch of fixups that address my remaining concerns. Please take a look at them and tell me if they make sense and whether you agree with the changes.
@@ -315,6 +326,10 @@ func (pr *Reader) ReadInto(rec *Record) error {

    pr.mu.Lock()
    defer pr.mu.Unlock()

    if pr.overwritable && !pr.paused {
I've pushed a fixup that takes pauseMu at the start of the function, and drops it before entering the poller. Not super clean but the easiest thing I could come up with.
Force-pushed from 6e2857f to 237e4ad (compare)
Hi!
Thank you a lot for these fixups; I tested them and merged them into the base commit!
I've fixed it like this:

diff --git a/go.mod b/go.mod
index 317621c..a0e9b84 100644
--- a/go.mod
+++ b/go.mod
@@ -6,7 +6,7 @@ require (
github.com/frankban/quicktest v1.14.4
github.com/google/go-cmp v0.5.9
golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2
- golang.org/x/sys v0.2.0
+ golang.org/x/sys v0.6.0
)
require (
diff --git a/go.sum b/go.sum
index 0456fd4..0cdba88 100644
--- a/go.sum
+++ b/go.sum
@@ -12,5 +12,5 @@ github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZV
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2 h1:Jvc7gsqn21cJHCmAWx0LiimpP18LZmUxkT5Mp7EZ1mI=
golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
-golang.org/x/sys v0.2.0 h1:ljd4t30dBnAvMZaQCevtY0xLLD0A+bRZXbgLMLU1F/A=
-golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
+golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
diff --git a/internal/unix/types_linux.go b/internal/unix/types_linux.go
index a37804f..ac3eb83 100644
--- a/internal/unix/types_linux.go
+++ b/internal/unix/types_linux.go
@@ -63,11 +63,7 @@ const (
PERF_EVENT_IOC_ENABLE = linux.PERF_EVENT_IOC_ENABLE
PERF_EVENT_IOC_SET_BPF = linux.PERF_EVENT_IOC_SET_BPF
PerfBitWatermark = linux.PerfBitWatermark
- // In C, struct perf_event_attr has a write_backward field which is a bit in a
- // 64-length bitfield.
- // In Golang, there is a Bits field which 64 bits long.
- // From C, we can deduce write_backward is the is the 27th bit.
- PerfBitWriteBackward = linux.CBitFieldMaskBit27
+ PerfBitWriteBackward = linux.PerfBitWriteBackward
PERF_SAMPLE_RAW = linux.PERF_SAMPLE_RAW
PERF_FLAG_FD_CLOEXEC = linux.PERF_FLAG_FD_CLOEXEC
RLIM_INFINITY = linux.RLIM_INFINITY
diff --git a/internal/unix/types_other.go b/internal/unix/types_other.go
index a51d33c..28e3a8f 100644
--- a/internal/unix/types_other.go
+++ b/internal/unix/types_other.go
@@ -67,6 +67,7 @@ const (
PERF_EVENT_IOC_ENABLE
PERF_EVENT_IOC_SET_BPF
PerfBitWatermark
+ PerfBitWriteBackward
PERF_SAMPLE_RAW
PERF_FLAG_FD_CLOEXEC
 	RLIM_INFINITY
Force-pushed from 5388c91 to 9dd165c (compare)
The follow-up commit introduces reverseReader, and splitting out the rename makes the diff easier to understand. Signed-off-by: Lorenz Bauer <lmb@isovalent.com>
Compared to a conventional perf buffer, an overwritable perf buffer never stops writing. Instead, the oldest data is overwritten by the newest. As a consequence, ReaderOptions was modified to add a new boolean indicating whether the perf buffer should be overwritable. Overwritable perf buffers can only be read while they are paused, to avoid a race with the kernel. Co-developed-by: Lorenz Bauer <lmb@isovalent.com> Signed-off-by: Francis Laniel <flaniel@linux.microsoft.com>
Force-pushed from 9dd165c to 1f81fa5 (compare)
Okay, I had to add a separate commit which renames forwardReader to make GH display the diff more sensibly.
Thanks for sticking with this PR and for answering my questions, I know it took a long time!
I then understand why you asked for this to be pushed upstream :D!
You are welcome! I thank you for the good review and for your patience too! This code is far from easy, but we came up with a good solution and were able to merge it! Thank you :D :D :D!
Hi.
This PR supersedes #814 and #863.
Compared to a conventional perf buffer, an overwritable perf buffer never stops writing.
Instead, the oldest data is overwritten by the newest.
As a consequence, ReaderOptions was modified to add a new boolean indicating whether the perf buffer is overwritable.

To read these buffers, a new function ReadCallback() was introduced. This function reads the buffer from the current head until it wraps around the buffer (or there is no more data to read). For each record read, the callback is called.

This PR also adds tests which validate that the new function works.
If you see any way to improve this contribution, feel free to share.
Best regards and thank you in advance.