Skip to content

Commit

Permalink
Correctly handle --first/--last when reading flows from a file/stdin
Browse files Browse the repository at this point in the history
Signed-off-by: Chance Zibolski <chance.zibolski@gmail.com>
  • Loading branch information
chancez committed Mar 29, 2023
1 parent 2b1d642 commit 7718220
Show file tree
Hide file tree
Showing 5 changed files with 333 additions and 16 deletions.
3 changes: 2 additions & 1 deletion cmd/observe/agent_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"io"
"math"
"os"
"os/signal"

Expand Down Expand Up @@ -102,7 +103,7 @@ func getAgentEventsRequest() (*observerpb.GetAgentEventsRequest, error) {
switch {
case selectorOpts.all:
// all is an alias for last=uint64_max
selectorOpts.last = ^uint64(0)
selectorOpts.last = math.MaxUint64
case selectorOpts.last == 0:
// no specific parameters were provided, just a vanilla `hubble events agent`
selectorOpts.last = defaults.EventsPrintCount
Expand Down
116 changes: 101 additions & 15 deletions cmd/observe/io_reader_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ package observe
import (
"bufio"
"context"
"fmt"
"io"
"math"

"github.com/cilium/cilium/api/v1/observer"
observerpb "github.com/cilium/cilium/api/v1/observer"
"github.com/cilium/cilium/pkg/container"
v1 "github.com/cilium/cilium/pkg/hubble/api/v1"
"github.com/cilium/cilium/pkg/hubble/filters"
"github.com/cilium/hubble/pkg/logger"
Expand Down Expand Up @@ -59,11 +63,18 @@ func (o *IOReaderObserver) ServerStatus(_ context.Context, _ *observerpb.ServerS

// ioReaderClient implements Observer_GetFlowsClient.
type ioReaderClient struct {
grpc.ClientStream

scanner *bufio.Scanner
request *observerpb.GetFlowsRequest
allow filters.FilterFuncs
deny filters.FilterFuncs
grpc.ClientStream

// Used for --last
buffer *container.RingBuffer
resps []*observer.GetFlowsResponse
// Used for --first/--last
flowsReturned uint64
}

func newIOReaderClient(ctx context.Context, scanner *bufio.Scanner, request *observerpb.GetFlowsRequest) (*ioReaderClient, error) {
Expand All @@ -75,36 +86,111 @@ func newIOReaderClient(ctx context.Context, scanner *bufio.Scanner, request *obs
if err != nil {
return nil, err
}

var buf *container.RingBuffer
// last
if n := request.GetNumber(); !request.GetFirst() && n != 0 && n != math.MaxUint64 {
if n > 1_000_000 {
return nil, fmt.Errorf("--last must be <= 1_000_000, got %d", n)
}
buf = container.NewRingBuffer(int(n))
}
return &ioReaderClient{
scanner: scanner,
request: request,
allow: allow,
deny: deny,
buffer: buf,
}, nil
}

func (c *ioReaderClient) Recv() (*observerpb.GetFlowsResponse, error) {
if c.returnedEnoughFlows() {
return nil, io.EOF
}

for c.scanner.Scan() {
line := c.scanner.Text()
var res observerpb.GetFlowsResponse
err := protojson.Unmarshal(c.scanner.Bytes(), &res)
if err != nil {
logger.Logger.WithError(err).WithField("line", line).Warn("Failed to unmarshal json to flow")
res := c.unmarshalNext()
if res == nil {
continue
}
if c.request.Since != nil && c.request.Since.AsTime().After(res.Time.AsTime()) {
continue
}
if c.request.Until != nil && c.request.Until.AsTime().Before(res.Time.AsTime()) {
continue
}
if !filters.Apply(c.allow, c.deny, &v1.Event{Timestamp: res.Time, Event: res.GetFlow()}) {
continue

switch {
case c.isLast():
// store flows in a FIFO buffer, effectively keeping the last N flows
// until we finish reading from the stream
c.buffer.Add(res)
case c.isFirst():
// track number of flows returned, so we can exit once we've given back N flows
c.flowsReturned++
return res, nil
default: // --all
return res, nil
}
return &res, nil
}

if err := c.scanner.Err(); err != nil {
return nil, err
}

if res := c.popFromLastBuffer(); res != nil {
return res, nil
}

return nil, io.EOF
}

func (c *ioReaderClient) isFirst() bool {
return c.request.GetFirst() && c.request.GetNumber() != 0 && c.request.GetNumber() != math.MaxUint64
}

func (c *ioReaderClient) isLast() bool {
return c.buffer != nil && c.request.GetNumber() != math.MaxUint64
}

func (c *ioReaderClient) returnedEnoughFlows() bool {
return c.request.GetNumber() > 0 && c.flowsReturned >= c.request.GetNumber()
}

func (c *ioReaderClient) popFromLastBuffer() *observer.GetFlowsResponse {
// Handle --last by iterating over our FIFO and returning one item each time.
if c.isLast() {
if len(c.resps) == 0 {
// Iterate over the buffer and store them in a slice, because we cannot
// index into the ring buffer itself
// TODO: Add the ability to index into the ring buffer and we could avoid
// this copy.
c.buffer.Iterate(func(i interface{}) {
c.resps = append(c.resps, i.(*observer.GetFlowsResponse))
})
}

// return the next element from the buffered results
if len(c.resps) > int(c.flowsReturned) {
resp := c.resps[c.flowsReturned]
c.flowsReturned++
return resp
}
}
return nil
}

func (c *ioReaderClient) unmarshalNext() *observer.GetFlowsResponse {
var res observerpb.GetFlowsResponse
err := protojson.Unmarshal(c.scanner.Bytes(), &res)
if err != nil {
line := c.scanner.Text()
logger.Logger.WithError(err).WithField("line", line).Warn("Failed to unmarshal json to flow")
return nil
}
if c.request.Since != nil && c.request.Since.AsTime().After(res.Time.AsTime()) {
return nil
}
if c.request.Until != nil && c.request.Until.AsTime().Before(res.Time.AsTime()) {
return nil
}
if !filters.Apply(c.allow, c.deny, &v1.Event{Timestamp: res.Time, Event: res.GetFlow()}) {
return nil
}
return &res
}
77 changes: 77 additions & 0 deletions cmd/observe/io_reader_observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"
"testing"

"github.com/cilium/cilium/api/v1/flow"
flowpb "github.com/cilium/cilium/api/v1/flow"
observerpb "github.com/cilium/cilium/api/v1/observer"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -70,6 +71,82 @@ func Test_getFlowsTimeRange(t *testing.T) {
assert.Equal(t, io.EOF, err)
}

func Test_getFlowsLast(t *testing.T) {
flows := []*observerpb.GetFlowsResponse{
{
ResponseTypes: &observerpb.GetFlowsResponse_Flow{Flow: &flow.Flow{Verdict: flow.Verdict_FORWARDED}},
Time: &timestamppb.Timestamp{Seconds: 0},
},
{
ResponseTypes: &observerpb.GetFlowsResponse_Flow{Flow: &flow.Flow{Verdict: flow.Verdict_DROPPED}},
Time: &timestamppb.Timestamp{Seconds: 100},
},
{
ResponseTypes: &observerpb.GetFlowsResponse_Flow{Flow: &flow.Flow{Verdict: flow.Verdict_ERROR}},
Time: &timestamppb.Timestamp{Seconds: 200},
},
}
var flowStrings []string
for _, f := range flows {
b, err := f.MarshalJSON()
assert.NoError(t, err)
flowStrings = append(flowStrings, string(b))
}
server := NewIOReaderObserver(strings.NewReader(strings.Join(flowStrings, "\n") + "\n"))
req := observerpb.GetFlowsRequest{
Number: 2,
First: false,
}
client, err := server.GetFlows(context.Background(), &req)
assert.NoError(t, err)
res, err := client.Recv()
assert.NoError(t, err)
assert.Equal(t, flows[1], res)
res, err = client.Recv()
assert.NoError(t, err)
assert.Equal(t, flows[2], res)
_, err = client.Recv()
assert.Equal(t, io.EOF, err)
}

func Test_getFlowsFirst(t *testing.T) {
flows := []*observerpb.GetFlowsResponse{
{
ResponseTypes: &observerpb.GetFlowsResponse_Flow{Flow: &flow.Flow{Verdict: flow.Verdict_FORWARDED}},
Time: &timestamppb.Timestamp{Seconds: 0},
},
{
ResponseTypes: &observerpb.GetFlowsResponse_Flow{Flow: &flow.Flow{Verdict: flow.Verdict_DROPPED}},
Time: &timestamppb.Timestamp{Seconds: 100},
},
{
ResponseTypes: &observerpb.GetFlowsResponse_Flow{Flow: &flow.Flow{Verdict: flow.Verdict_ERROR}},
Time: &timestamppb.Timestamp{Seconds: 200},
},
}
var flowStrings []string
for _, f := range flows {
b, err := f.MarshalJSON()
assert.NoError(t, err)
flowStrings = append(flowStrings, string(b))
}
server := NewIOReaderObserver(strings.NewReader(strings.Join(flowStrings, "\n") + "\n"))
req := observerpb.GetFlowsRequest{
Number: 2,
First: true,
}
client, err := server.GetFlows(context.Background(), &req)
assert.NoError(t, err)
res, err := client.Recv()
assert.NoError(t, err)
assert.Equal(t, flows[0], res)
res, err = client.Recv()
assert.NoError(t, err)
assert.Equal(t, flows[1], res)
_, err = client.Recv()
assert.Equal(t, io.EOF, err)
}

func Test_getFlowsFilter(t *testing.T) {
flows := []*observerpb.GetFlowsResponse{
{
Expand Down
Loading

0 comments on commit 7718220

Please sign in to comment.