Skip to content

Commit

Permalink
hubble/container: Properly deal with nil values in RingReader
Browse files Browse the repository at this point in the history
When the ring-buffer is not full, we must treat nil values the same as
reads outside of the readable indices. This commit ensures that `read`
treats nil values the same way `readFrom` does.

This has an impact on the RingReader, as we should not try to either
increase or decrease the index if the current value is unreadable.
If the current index is unreadable, then we have either read past the
last written value, or we have tried to access an index which has
already been overwritten.

This commit therefore adds an error return type to
`RingReader.{Next,Prev}` in order to determine which of the two cases
occured.

Most notably, `Next` may now return ErrInvalidData. This means this
means that the value it tried to access just got overwritten by the
writer from behind. In such cases we want to return an error, as
otherwise `RingReader.Next` would silently stop reading and
subsequentially terminate the request as if it had reached most
recent value in the ring buffer (which it has not).

The error is also added to `Previous`, however in that case
we only ever expect ErrInvalidData, as it traverses the ring buffer
in the opposite direction of the writer.

The check for concurrent writes is currently missing from `readFrom`,
which can suffer from the same problem. This will be added in a
follow-up PR.

Fixes: cilium/hubble#131

Signed-off-by: Sebastian Wicki <sebastian@isovalent.com>
  • Loading branch information
gandro committed May 6, 2020
1 parent c7cdb6d commit 4229ac8
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 85 deletions.
59 changes: 47 additions & 12 deletions pkg/hubble/container/ring.go
Expand Up @@ -16,6 +16,8 @@ package container

import (
"context"
"errors"
"io"
"sync/atomic"
"unsafe"

Expand All @@ -24,6 +26,11 @@ import (
"github.com/cilium/cilium/pkg/lock"
)

var (
// ErrInvalidRead indicates that the requested position can no longer be read.
ErrInvalidRead = errors.New("read position is no longer valid")
)

// Ring is a ring buffer that stores *v1.Event
type Ring struct {
// mask to calculate the index to write into 'data'.
Expand All @@ -38,6 +45,8 @@ type Ring struct {
cycleExp uint8
// cycleMask is the mask used to calculate the correct cycle of a position.
cycleMask uint64
// halfCycle is half the total number of cycles
halfCycle uint64
// dataLen is the length of the internal buffer.
dataLen uint64
// data is the internal buffer of this ring buffer.
Expand Down Expand Up @@ -66,10 +75,14 @@ func NewRing(n int) *Ring {
l := math.GetMask(msb)
dataLen := uint64(l + 1)
cycleExp := uint8(math.MSB(l+1)) - 1
// half cycle is (^uint64(0)/dataLen)/2 == (^uint64(0)>>cycleExp)>>1
halfCycle := (^uint64(0) >> cycleExp) >> 1

return &Ring{
mask: l,
cycleExp: cycleExp,
cycleMask: ^uint64(0) >> cycleExp,
halfCycle: halfCycle,
dataLen: dataLen,
data: make([]*v1.Event, dataLen, dataLen),
notifyMu: lock.Mutex{},
Expand Down Expand Up @@ -153,9 +166,12 @@ func (r *Ring) LastWrite() uint64 {
return atomic.LoadUint64(&r.write) - 1
}

// read reads the *v1.Event from the given read position. Returns false if
// that position is no longer available to be read, returns true otherwise.
func (r *Ring) read(read uint64) (*v1.Event, bool) {
// read the *v1.Event from the given read position. Returns an error if
// the position is not valid. A position is invalid either because it has
// already been overwritten by the writer (in which case ErrInvalidRead is
// returned) or because the position is ahead of the writer (in which case
// io.EOF is returned).
func (r *Ring) read(read uint64) (*v1.Event, error) {
readIdx := read & r.mask
event := r.dataLoadAtomic(readIdx)

Expand All @@ -180,11 +196,34 @@ func (r *Ring) read(read uint64) (*v1.Event, bool) {
// cycle: 1f 1f 1f 1f 1f 1f 1f 1f 0 0 0 0 0 0 0 0 1 1 1 1 1 1 1 1
readCycle := read >> r.cycleExp
writeCycle := lastWrite >> r.cycleExp
if (readCycle == writeCycle && readIdx < lastWriteIdx) ||
(readCycle == (writeCycle-1)&r.cycleMask && readIdx > lastWriteIdx) {
return event, true

prevWriteCycle := (writeCycle - 1) & r.cycleMask
maxWriteCycle := (writeCycle + r.halfCycle) & r.cycleMask

switch {
// Case: Reader in current cycle and accessing valid indices
case readCycle == writeCycle && readIdx < lastWriteIdx:
if event == nil {
// This case should never happen, as the writer must never write
// a nil value. In case it happens anyway, we just stop reading.
return nil, io.EOF
}
return event, nil
// Case: Reader in previous cycle and accessing valid indices
case readCycle == prevWriteCycle && readIdx > lastWriteIdx:
if event == nil {
// If the ring buffer is not yet fully populated, we treat nil
// as a value which is about to be overwritten
return nil, ErrInvalidRead
}
return event, nil
// Case: Reader ahead of writer
case readCycle >= writeCycle && readCycle < maxWriteCycle:
return nil, io.EOF
// Case: Reader behind writer
default:
return nil, ErrInvalidRead
}
return nil, false
}

// readFrom continues to read from the given position until the context is
Expand All @@ -194,10 +233,6 @@ func (r *Ring) readFrom(ctx context.Context, read uint64) <-chan *v1.Event {
const returnedBufferChLen = 1000
ch := make(chan *v1.Event, returnedBufferChLen)
go func() {
// halfCycle is the middle of a cycle.
// a half cycle is (^uint64(0)/r.dataLen)/2
// which translates into (^uint64(0)>>r.dataLen)>>1
halfCycle := (^uint64(0) >> r.cycleExp) >> 1
defer func() {
close(ch)
}()
Expand Down Expand Up @@ -260,7 +295,7 @@ func (r *Ring) readFrom(ctx context.Context, read uint64) <-chan *v1.Event {
// write: f0 f1 f2 f3 f4 f5 f6 f7 f8 f9 fa fb fc fd fe ff 0 1 2 3 4 5 6 7 8 9 a b c d e f
// index: 0 1 2 3 4 5 6 7 8 9 a b c d e f 0 1 2 3 4 5 6 7 8 9 a b c d e f
// cycle: 1f 1f 1f 1f 1f 1f 1f 1f 1f 1f 1f 1f 1f 1f 1f 1f 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
case event == nil || readCycle >= (writeCycle+1)&r.cycleMask && readCycle < (halfCycle+writeCycle)&r.cycleMask:
case event == nil || readCycle >= (writeCycle+1)&r.cycleMask && readCycle < (r.halfCycle+writeCycle)&r.cycleMask:
// The writer has already written a new event so there's no
// need to stop the reader.

Expand Down
41 changes: 25 additions & 16 deletions pkg/hubble/container/ring_reader.go
Expand Up @@ -39,27 +39,36 @@ func NewRingReader(ring *Ring, start uint64) *RingReader {
}

// Previous reads the event at the current position and decrement the read
// position. When no more event can be read, Previous returns nil.
func (r *RingReader) Previous() *v1.Event {
var e *v1.Event
// when the ring is not full, ring.read() may return <nil>, true
// in such a case, one should continue reading
for ok := true; e == nil && ok; r.idx-- {
e, ok = r.ring.read(r.idx)
// position. Returns ErrInvalidRead if there are no older entries.
func (r *RingReader) Previous() (*v1.Event, error) {
// We only expect ErrInvalidRead to be returned when reading backwards,
// therefore we don't try to handle any errors here.
e, err := r.ring.read(r.idx)
if err != nil {
return nil, err
}
return e
r.idx--
return e, nil
}

// Next reads the event at the current position and increment the read position.
// When no more event can be read, Next returns nil.
func (r *RingReader) Next() *v1.Event {
var e *v1.Event
// when the ring is not full, ring.read() may return <nil>, true
// in such a case, one should continue reading
for ok := true; e == nil && ok; r.idx++ {
e, ok = r.ring.read(r.idx)
// Returns io.EOF if there are no more entries. May return ErrInvalidRead
// if the writer overtook this RingReader.
func (r *RingReader) Next() (*v1.Event, error) {
// There are two possible errors returned by read():
//
// Reader ahead of writer (io.EOF): We have read past the writer.
// In this case, we want to return nil and don't bump the index, as we have
// read all existing values that exist now.
// Writer ahead of reader (ErrInvalidRead): The writer has already
// overwritten the values we wanted to read. In this case, we want to
// propagate the error, as trying to catch up would be very racy.
e, err := r.ring.read(r.idx)
if err != nil {
return nil, err
}
return e
r.idx++
return e, nil
}

// NextFollow reads the event at the current position and increment the read
Expand Down
57 changes: 41 additions & 16 deletions pkg/hubble/container/ring_reader_test.go
Expand Up @@ -19,6 +19,7 @@ package container
import (
"context"
"fmt"
"io"
"testing"
"time"

Expand All @@ -35,9 +36,10 @@ func TestRingReader_Previous(t *testing.T) {
ring.Write(&v1.Event{Timestamp: &timestamp.Timestamp{Seconds: int64(i)}})
}
tests := []struct {
start uint64
count int
want []*v1.Event
start uint64
count int
want []*v1.Event
wantErr error
}{
{
start: 13,
Expand Down Expand Up @@ -70,15 +72,19 @@ func TestRingReader_Previous(t *testing.T) {
},
}, {
start: 0,
count: 2,
count: 1,
want: []*v1.Event{
{Timestamp: &timestamp.Timestamp{Seconds: 0}},
nil,
},
}, {
start: 14,
count: 1,
want: []*v1.Event{nil},
start: 14,
count: 1,
wantErr: io.EOF,
},
{
start: ^uint64(0),
count: 1,
wantErr: ErrInvalidRead,
},
}
for _, tt := range tests {
Expand All @@ -87,7 +93,14 @@ func TestRingReader_Previous(t *testing.T) {
reader := NewRingReader(ring, tt.start)
var got []*v1.Event
for i := 0; i < tt.count; i++ {
got = append(got, reader.Previous())
event, err := reader.Previous()
if err != tt.wantErr {
t.Errorf(`"%s" error = %v, wantErr %v`, name, err, tt.wantErr)
}
if err != nil {
return
}
got = append(got, event)
}
assert.Equal(t, tt.want, got)
})
Expand All @@ -101,9 +114,10 @@ func TestRingReader_Next(t *testing.T) {
}

tests := []struct {
start uint64
count int
want []*v1.Event
start uint64
count int
want []*v1.Event
wantErr error
}{
{
start: 0,
Expand Down Expand Up @@ -135,9 +149,13 @@ func TestRingReader_Next(t *testing.T) {
{Timestamp: &timestamp.Timestamp{Seconds: 13}},
},
}, {
start: 14,
count: 1,
want: []*v1.Event{nil},
start: ^uint64(0),
count: 1,
wantErr: ErrInvalidRead,
}, {
start: 14,
count: 1,
wantErr: io.EOF,
},
}
for _, tt := range tests {
Expand All @@ -146,7 +164,14 @@ func TestRingReader_Next(t *testing.T) {
reader := NewRingReader(ring, tt.start)
var got []*v1.Event
for i := 0; i < tt.count; i++ {
got = append(got, reader.Next())
event, err := reader.Next()
if err != tt.wantErr {
t.Errorf(`"%s" error = %v, wantErr %v`, name, err, tt.wantErr)
}
if err != nil {
return
}
got = append(got, event)
}
assert.Equal(t, tt.want, got)
})
Expand Down

0 comments on commit 4229ac8

Please sign in to comment.