From 4229ac857e39a91519ed6edc6f52c5cb8b89b1ca Mon Sep 17 00:00:00 2001 From: Sebastian Wicki Date: Mon, 4 May 2020 19:42:23 +0200 Subject: [PATCH] hubble/container: Properly deal with nil values in RingReader When the ring-buffer is not full, we must treat nil values the same as reads outside of the readable indices. This commit ensures that `read` treats nil values the same way `readFrom` does. This has an impact on the RingReader, as we should not try to either increase or decrease the index if the current value is unreadable. If the current index is unreadable, then we have either read past the last written value, or we have tried to access an index which has already been overwritten. This commit therefore adds an error return type to `RingReader.{Next,Previous}` in order to determine which of the two cases occurred. Most notably, `Next` may now return ErrInvalidRead. This means that the value it tried to access just got overwritten by the writer from behind. In such cases we want to return an error, as otherwise `RingReader.Next` would silently stop reading and subsequently terminate the request as if it had reached the most recent value in the ring buffer (which it has not). The error is also added to `Previous`, however in that case we only ever expect ErrInvalidRead, as it traverses the ring buffer in the opposite direction of the writer. The check for concurrent writes is currently missing from `readFrom`, which can suffer from the same problem. This will be added in a follow-up PR. 
Fixes: cilium/hubble#131 Signed-off-by: Sebastian Wicki --- pkg/hubble/container/ring.go | 59 ++++++++++++++++---- pkg/hubble/container/ring_reader.go | 41 ++++++++------ pkg/hubble/container/ring_reader_test.go | 57 +++++++++++++------ pkg/hubble/container/ring_test.go | 70 ++++++++++++------------ pkg/hubble/observer/local_observer.go | 22 ++++++-- 5 files changed, 164 insertions(+), 85 deletions(-) diff --git a/pkg/hubble/container/ring.go b/pkg/hubble/container/ring.go index ef0e978d72bf..1d15c06e443b 100644 --- a/pkg/hubble/container/ring.go +++ b/pkg/hubble/container/ring.go @@ -16,6 +16,8 @@ package container import ( "context" + "errors" + "io" "sync/atomic" "unsafe" @@ -24,6 +26,11 @@ import ( "github.com/cilium/cilium/pkg/lock" ) +var ( + // ErrInvalidRead indicates that the requested position can no longer be read. + ErrInvalidRead = errors.New("read position is no longer valid") +) + // Ring is a ring buffer that stores *v1.Event type Ring struct { // mask to calculate the index to write into 'data'. @@ -38,6 +45,8 @@ type Ring struct { cycleExp uint8 // cycleMask is the mask used to calculate the correct cycle of a position. cycleMask uint64 + // halfCycle is half the total number of cycles + halfCycle uint64 // dataLen is the length of the internal buffer. dataLen uint64 // data is the internal buffer of this ring buffer. @@ -66,10 +75,14 @@ func NewRing(n int) *Ring { l := math.GetMask(msb) dataLen := uint64(l + 1) cycleExp := uint8(math.MSB(l+1)) - 1 + // half cycle is (^uint64(0)/dataLen)/2 == (^uint64(0)>>cycleExp)>>1 + halfCycle := (^uint64(0) >> cycleExp) >> 1 + return &Ring{ mask: l, cycleExp: cycleExp, cycleMask: ^uint64(0) >> cycleExp, + halfCycle: halfCycle, dataLen: dataLen, data: make([]*v1.Event, dataLen, dataLen), notifyMu: lock.Mutex{}, @@ -153,9 +166,12 @@ func (r *Ring) LastWrite() uint64 { return atomic.LoadUint64(&r.write) - 1 } -// read reads the *v1.Event from the given read position. 
Returns false if -// that position is no longer available to be read, returns true otherwise. -func (r *Ring) read(read uint64) (*v1.Event, bool) { +// read the *v1.Event from the given read position. Returns an error if +// the position is not valid. A position is invalid either because it has +// already been overwritten by the writer (in which case ErrInvalidRead is +// returned) or because the position is ahead of the writer (in which case +// io.EOF is returned). +func (r *Ring) read(read uint64) (*v1.Event, error) { readIdx := read & r.mask event := r.dataLoadAtomic(readIdx) @@ -180,11 +196,34 @@ func (r *Ring) read(read uint64) (*v1.Event, bool) { // cycle: 1f 1f 1f 1f 1f 1f 1f 1f 0 0 0 0 0 0 0 0 1 1 1 1 1 1 1 1 readCycle := read >> r.cycleExp writeCycle := lastWrite >> r.cycleExp - if (readCycle == writeCycle && readIdx < lastWriteIdx) || - (readCycle == (writeCycle-1)&r.cycleMask && readIdx > lastWriteIdx) { - return event, true + + prevWriteCycle := (writeCycle - 1) & r.cycleMask + maxWriteCycle := (writeCycle + r.halfCycle) & r.cycleMask + + switch { + // Case: Reader in current cycle and accessing valid indices + case readCycle == writeCycle && readIdx < lastWriteIdx: + if event == nil { + // This case should never happen, as the writer must never write + // a nil value. In case it happens anyway, we just stop reading. 
+ return nil, io.EOF + } + return event, nil + // Case: Reader in previous cycle and accessing valid indices + case readCycle == prevWriteCycle && readIdx > lastWriteIdx: + if event == nil { + // If the ring buffer is not yet fully populated, we treat nil + // as a value which is about to be overwritten + return nil, ErrInvalidRead + } + return event, nil + // Case: Reader ahead of writer + case readCycle >= writeCycle && readCycle < maxWriteCycle: + return nil, io.EOF + // Case: Reader behind writer + default: + return nil, ErrInvalidRead } - return nil, false } // readFrom continues to read from the given position until the context is @@ -194,10 +233,6 @@ func (r *Ring) readFrom(ctx context.Context, read uint64) <-chan *v1.Event { const returnedBufferChLen = 1000 ch := make(chan *v1.Event, returnedBufferChLen) go func() { - // halfCycle is the middle of a cycle. - // a half cycle is (^uint64(0)/r.dataLen)/2 - // which translates into (^uint64(0)>>r.dataLen)>>1 - halfCycle := (^uint64(0) >> r.cycleExp) >> 1 defer func() { close(ch) }() @@ -260,7 +295,7 @@ func (r *Ring) readFrom(ctx context.Context, read uint64) <-chan *v1.Event { // write: f0 f1 f2 f3 f4 f5 f6 f7 f8 f9 fa fb fc fd fe ff 0 1 2 3 4 5 6 7 8 9 a b c d e f // index: 0 1 2 3 4 5 6 7 8 9 a b c d e f 0 1 2 3 4 5 6 7 8 9 a b c d e f // cycle: 1f 1f 1f 1f 1f 1f 1f 1f 1f 1f 1f 1f 1f 1f 1f 1f 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 - case event == nil || readCycle >= (writeCycle+1)&r.cycleMask && readCycle < (halfCycle+writeCycle)&r.cycleMask: + case event == nil || readCycle >= (writeCycle+1)&r.cycleMask && readCycle < (r.halfCycle+writeCycle)&r.cycleMask: // The writer has already written a new event so there's no // need to stop the reader. 
diff --git a/pkg/hubble/container/ring_reader.go b/pkg/hubble/container/ring_reader.go index fe8f207e9b0d..b56d53a9f3fd 100644 --- a/pkg/hubble/container/ring_reader.go +++ b/pkg/hubble/container/ring_reader.go @@ -39,27 +39,36 @@ func NewRingReader(ring *Ring, start uint64) *RingReader { } // Previous reads the event at the current position and decrement the read -// position. When no more event can be read, Previous returns nil. -func (r *RingReader) Previous() *v1.Event { - var e *v1.Event - // when the ring is not full, ring.read() may return , true - // in such a case, one should continue reading - for ok := true; e == nil && ok; r.idx-- { - e, ok = r.ring.read(r.idx) +// position. Returns ErrInvalidRead if there are no older entries. +func (r *RingReader) Previous() (*v1.Event, error) { + // We only expect ErrInvalidRead to be returned when reading backwards, + // therefore we don't try to handle any errors here. + e, err := r.ring.read(r.idx) + if err != nil { + return nil, err } - return e + r.idx-- + return e, nil } // Next reads the event at the current position and increment the read position. -// When no more event can be read, Next returns nil. -func (r *RingReader) Next() *v1.Event { - var e *v1.Event - // when the ring is not full, ring.read() may return , true - // in such a case, one should continue reading - for ok := true; e == nil && ok; r.idx++ { - e, ok = r.ring.read(r.idx) +// Returns io.EOF if there are no more entries. May return ErrInvalidRead +// if the writer overtook this RingReader. +func (r *RingReader) Next() (*v1.Event, error) { + // There are two possible errors returned by read(): + // + // Reader ahead of writer (io.EOF): We have read past the writer. + // In this case, we want to return nil and don't bump the index, as we have + // read all existing values that exist now. + // Writer ahead of reader (ErrInvalidRead): The writer has already + // overwritten the values we wanted to read. 
In this case, we want to + // propagate the error, as trying to catch up would be very racy. + e, err := r.ring.read(r.idx) + if err != nil { + return nil, err } - return e + r.idx++ + return e, nil } // NextFollow reads the event at the current position and increment the read diff --git a/pkg/hubble/container/ring_reader_test.go b/pkg/hubble/container/ring_reader_test.go index 48cfeb9c0acd..5e3304f170c2 100644 --- a/pkg/hubble/container/ring_reader_test.go +++ b/pkg/hubble/container/ring_reader_test.go @@ -19,6 +19,7 @@ package container import ( "context" "fmt" + "io" "testing" "time" @@ -35,9 +36,10 @@ func TestRingReader_Previous(t *testing.T) { ring.Write(&v1.Event{Timestamp: ×tamp.Timestamp{Seconds: int64(i)}}) } tests := []struct { - start uint64 - count int - want []*v1.Event + start uint64 + count int + want []*v1.Event + wantErr error }{ { start: 13, @@ -70,15 +72,19 @@ func TestRingReader_Previous(t *testing.T) { }, }, { start: 0, - count: 2, + count: 1, want: []*v1.Event{ {Timestamp: ×tamp.Timestamp{Seconds: 0}}, - nil, }, }, { - start: 14, - count: 1, - want: []*v1.Event{nil}, + start: 14, + count: 1, + wantErr: io.EOF, + }, + { + start: ^uint64(0), + count: 1, + wantErr: ErrInvalidRead, }, } for _, tt := range tests { @@ -87,7 +93,14 @@ func TestRingReader_Previous(t *testing.T) { reader := NewRingReader(ring, tt.start) var got []*v1.Event for i := 0; i < tt.count; i++ { - got = append(got, reader.Previous()) + event, err := reader.Previous() + if err != tt.wantErr { + t.Errorf(`"%s" error = %v, wantErr %v`, name, err, tt.wantErr) + } + if err != nil { + return + } + got = append(got, event) } assert.Equal(t, tt.want, got) }) @@ -101,9 +114,10 @@ func TestRingReader_Next(t *testing.T) { } tests := []struct { - start uint64 - count int - want []*v1.Event + start uint64 + count int + want []*v1.Event + wantErr error }{ { start: 0, @@ -135,9 +149,13 @@ func TestRingReader_Next(t *testing.T) { {Timestamp: ×tamp.Timestamp{Seconds: 13}}, }, }, { - start: 
14, - count: 1, - want: []*v1.Event{nil}, + start: ^uint64(0), + count: 1, + wantErr: ErrInvalidRead, + }, { + start: 14, + count: 1, + wantErr: io.EOF, }, } for _, tt := range tests { @@ -146,7 +164,14 @@ func TestRingReader_Next(t *testing.T) { reader := NewRingReader(ring, tt.start) var got []*v1.Event for i := 0; i < tt.count; i++ { - got = append(got, reader.Next()) + event, err := reader.Next() + if err != tt.wantErr { + t.Errorf(`"%s" error = %v, wantErr %v`, name, err, tt.wantErr) + } + if err != nil { + return + } + got = append(got, event) } assert.Equal(t, tt.want, got) }) diff --git a/pkg/hubble/container/ring_test.go b/pkg/hubble/container/ring_test.go index 2813c166472d..d9d9161c4c24 100644 --- a/pkg/hubble/container/ring_test.go +++ b/pkg/hubble/container/ring_test.go @@ -20,6 +20,7 @@ import ( "container/list" "container/ring" "context" + "io" "reflect" "testing" @@ -98,11 +99,11 @@ func TestRing_Read(t *testing.T) { read uint64 } tests := []struct { - name string - fields fields - args args - want *v1.Event - want1 bool + name string + fields fields + args args + want *v1.Event + wantErr error }{ { name: "normal read for the index 7", @@ -125,8 +126,8 @@ func TestRing_Read(t *testing.T) { args: args{ read: 0x7, }, - want: &v1.Event{Timestamp: ×tamp.Timestamp{Seconds: 7}}, - want1: true, + want: &v1.Event{Timestamp: ×tamp.Timestamp{Seconds: 7}}, + wantErr: nil, }, { name: "we can't read index 0 since we just wrote into it", @@ -149,8 +150,8 @@ func TestRing_Read(t *testing.T) { args: args{ read: 0x0, }, - want: nil, - want1: false, + want: nil, + wantErr: ErrInvalidRead, }, { name: "we can't read index 0x7 since we are one writing cycle ahead", @@ -174,8 +175,8 @@ func TestRing_Read(t *testing.T) { // The next possible entry that we can read is 0x10-0x7-0x1 = 0x8 (idx: 0) read: 0x7, }, - want: nil, - want1: false, + want: nil, + wantErr: ErrInvalidRead, }, { name: "we can read index 0x8 since it's the last entry that we can read in this cycle", @@ 
-199,8 +200,8 @@ func TestRing_Read(t *testing.T) { // The next possible entry that we can read is 0x10-0x7-0x1 = 0x8 (idx: 0) read: 0x8, }, - want: &v1.Event{Timestamp: ×tamp.Timestamp{Seconds: 0}}, - want1: true, + want: &v1.Event{Timestamp: ×tamp.Timestamp{Seconds: 0}}, + wantErr: nil, }, { name: "we overflow write and we are trying to read the previous writes, that we can't", @@ -225,8 +226,8 @@ func TestRing_Read(t *testing.T) { // next to be read: ^uint64(0) (idx: 7), last read: 0xfffffffffffffffe (idx: 6) read: ^uint64(0), }, - want: nil, - want1: false, + want: nil, + wantErr: ErrInvalidRead, }, { name: "we overflow write and we are trying to read the previous writes, that we can", @@ -250,8 +251,8 @@ func TestRing_Read(t *testing.T) { // next to be read: ^uint64(0) (idx: 7), last read: 0xfffffffffffffffe (idx: 6) read: ^uint64(0), }, - want: &v1.Event{Timestamp: ×tamp.Timestamp{Seconds: 7}}, - want1: true, + want: &v1.Event{Timestamp: ×tamp.Timestamp{Seconds: 7}}, + wantErr: nil, }, { name: "we overflow write and we are trying to read the 2 previously cycles", @@ -277,8 +278,8 @@ func TestRing_Read(t *testing.T) { // with a cycle that was already overwritten read: ^uint64(0) - 0x7, }, - want: nil, - want1: false, + want: nil, + wantErr: ErrInvalidRead, }, } for _, tt := range tests { @@ -295,8 +296,8 @@ func TestRing_Read(t *testing.T) { if !reflect.DeepEqual(got, tt.want) { t.Errorf("Ring.read() got = %v, want %v", got, tt.want) } - if got1 != tt.want1 { - t.Errorf("Ring.read() got1 = %v, want %v", got1, tt.want1) + if got1 != tt.wantErr { + t.Errorf("Ring.read() got1 = %v, want %v", got1, tt.wantErr) } }) } @@ -503,20 +504,17 @@ func TestRingFunctionalityInParallel(t *testing.T) { t.Errorf("lastWrite should be 0x0. 
Got %x", lastWrite) } - entry, ok := r.read(lastWrite) - if !ok { - t.Errorf("Should be able to read position %x", lastWrite) + entry, err := r.read(lastWrite) + if err != nil { + t.Errorf("Should be able to read position %x, got %v", lastWrite, err) } if entry.Timestamp.Seconds != int64(0) { t.Errorf("Read Event should be %+v, got %+v instead", ×tamp.Timestamp{Seconds: 0}, entry.Timestamp) } lastWrite-- - entry, ok = r.read(lastWrite) - if !ok { - t.Errorf("Should be able to read position %x", lastWrite) - } - if entry != nil { - t.Errorf("Read Event should be %+v, got %+v instead", nil, entry) + entry, err = r.read(lastWrite) + if err != ErrInvalidRead { + t.Errorf("Should not be able to read position %x, got %v", lastWrite, err) } } @@ -545,14 +543,14 @@ func TestRingFunctionalitySerialized(t *testing.T) { t.Errorf("lastWrite should be 0x1. Got %x", lastWrite) } - entry, ok := r.read(lastWrite) - if ok { - t.Errorf("Should not be able to read position %x", lastWrite) + entry, err := r.read(lastWrite) + if err != io.EOF { + t.Errorf("Should not be able to read position %x, got %v", lastWrite, err) } lastWrite-- - entry, ok = r.read(lastWrite) - if !ok { - t.Errorf("Should be able to read position %x", lastWrite) + entry, err = r.read(lastWrite) + if err != nil { + t.Errorf("Should be able to read position %x, got %v", lastWrite, err) } if entry.Timestamp.Seconds != int64(0) { t.Errorf("Read Event should be %+v, got %+v instead", ×tamp.Timestamp{Seconds: 0}, entry.Timestamp) diff --git a/pkg/hubble/observer/local_observer.go b/pkg/hubble/observer/local_observer.go index d25a1edf6ea6..6ce03e2a31a4 100644 --- a/pkg/hubble/observer/local_observer.go +++ b/pkg/hubble/observer/local_observer.go @@ -16,6 +16,7 @@ package observer import ( "context" + "errors" "fmt" "io" "strings" @@ -29,7 +30,7 @@ import ( "github.com/cilium/cilium/pkg/hubble/metrics" "github.com/cilium/cilium/pkg/hubble/observer/observeroption" "github.com/cilium/cilium/pkg/hubble/parser" - 
"github.com/cilium/cilium/pkg/hubble/parser/errors" + parserErrors "github.com/cilium/cilium/pkg/hubble/parser/errors" "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/timestamp" @@ -152,7 +153,7 @@ nextEvent: flow, err := decodeFlow(s.payloadParser, pl) if err != nil { - if !errors.IsErrInvalidType(err) { + if !parserErrors.IsErrInvalidType(err) { s.log.WithError(err).WithField("data", pl.Data).Debug("failed to decode payload") } continue @@ -392,13 +393,21 @@ func (r *flowsReader) Next(ctx context.Context) (*pb.Flow, error) { default: } var e *v1.Event + var err error if r.follow { e = r.ringReader.NextFollow(ctx) } else { if r.maxFlows > 0 && (r.flowsCount >= r.maxFlows) { return nil, io.EOF } - e = r.ringReader.Next() + e, err = r.ringReader.Next() + if err != nil { + if err == container.ErrInvalidRead { + // this error is sent over the wire and presented to the user + return nil, errors.New("requested data has been overwritten and is no longer available") + } + return nil, err + } } if e == nil { return nil, io.EOF @@ -450,9 +459,12 @@ func newRingReader(ring *container.Ring, req *observerpb.GetFlowsRequest, whitel // In order to avoid buffering events, we have to rewind first to find the // correct index, then create a new reader that starts from there for i := ring.Len(); i > 0; i, idx = i-1, idx-1 { - e := reader.Previous() - if e == nil { + e, err := reader.Previous() + if err == container.ErrInvalidRead { + idx++ // we went backward 1 too far break + } else if err != nil { + return nil, err } _, ok := e.Event.(*pb.Flow) if !ok || !filters.Apply(whitelist, blacklist, e) {