Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correctly handle --first/--last when reading flows from a stdin #958

Merged
merged 1 commit into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/observe/agent_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"io"
"math"
"os"
"os/signal"

Expand Down Expand Up @@ -102,7 +103,7 @@ func getAgentEventsRequest() (*observerpb.GetAgentEventsRequest, error) {
switch {
case selectorOpts.all:
// all is an alias for last=uint64_max
selectorOpts.last = ^uint64(0)
selectorOpts.last = math.MaxUint64
case selectorOpts.last == 0:
// no specific parameters were provided, just a vanilla `hubble events agent`
selectorOpts.last = defaults.EventsPrintCount
Expand Down
115 changes: 100 additions & 15 deletions cmd/observe/io_reader_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ package observe
import (
"bufio"
"context"
"fmt"
"io"
"math"

observerpb "github.com/cilium/cilium/api/v1/observer"
"github.com/cilium/cilium/pkg/container"
v1 "github.com/cilium/cilium/pkg/hubble/api/v1"
"github.com/cilium/cilium/pkg/hubble/filters"
"github.com/cilium/hubble/pkg/logger"
Expand Down Expand Up @@ -59,11 +62,18 @@ func (o *IOReaderObserver) ServerStatus(_ context.Context, _ *observerpb.ServerS

// ioReaderClient implements Observer_GetFlowsClient.
type ioReaderClient struct {
grpc.ClientStream

scanner *bufio.Scanner
request *observerpb.GetFlowsRequest
allow filters.FilterFuncs
deny filters.FilterFuncs
grpc.ClientStream

// Used for --last
buffer *container.RingBuffer
resps []*observerpb.GetFlowsResponse
// Used for --first/--last
flowsReturned uint64
}

func newIOReaderClient(ctx context.Context, scanner *bufio.Scanner, request *observerpb.GetFlowsRequest) (*ioReaderClient, error) {
Expand All @@ -75,36 +85,111 @@ func newIOReaderClient(ctx context.Context, scanner *bufio.Scanner, request *obs
if err != nil {
return nil, err
}

var buf *container.RingBuffer
// last
if n := request.GetNumber(); !request.GetFirst() && n != 0 && n != math.MaxUint64 {
if n > 1_000_000 {
return nil, fmt.Errorf("--last must be <= 1_000_000, got %d", n)
}
buf = container.NewRingBuffer(int(n))
}
return &ioReaderClient{
scanner: scanner,
request: request,
allow: allow,
deny: deny,
buffer: buf,
}, nil
}

func (c *ioReaderClient) Recv() (*observerpb.GetFlowsResponse, error) {
if c.returnedEnoughFlows() {
return nil, io.EOF
}

for c.scanner.Scan() {
line := c.scanner.Text()
var res observerpb.GetFlowsResponse
err := protojson.Unmarshal(c.scanner.Bytes(), &res)
if err != nil {
logger.Logger.WithError(err).WithField("line", line).Warn("Failed to unmarshal json to flow")
res := c.unmarshalNext()
if res == nil {
continue
}
if c.request.Since != nil && c.request.Since.AsTime().After(res.Time.AsTime()) {
continue
}
if c.request.Until != nil && c.request.Until.AsTime().Before(res.Time.AsTime()) {
continue
}
if !filters.Apply(c.allow, c.deny, &v1.Event{Timestamp: res.Time, Event: res.GetFlow()}) {
continue

switch {
case c.isLast():
// store flows in a FIFO buffer, effectively keeping the last N flows
// until we finish reading from the stream
c.buffer.Add(res)
case c.isFirst():
// track number of flows returned, so we can exit once we've given back N flows
c.flowsReturned++
return res, nil
default: // --all
return res, nil
}
return &res, nil
}

if err := c.scanner.Err(); err != nil {
return nil, err
}

if res := c.popFromLastBuffer(); res != nil {
return res, nil
}

return nil, io.EOF
}

func (c *ioReaderClient) isFirst() bool {
return c.request.GetFirst() && c.request.GetNumber() != 0 && c.request.GetNumber() != math.MaxUint64
}

func (c *ioReaderClient) isLast() bool {
return c.buffer != nil && c.request.GetNumber() != math.MaxUint64
}

func (c *ioReaderClient) returnedEnoughFlows() bool {
return c.request.GetNumber() > 0 && c.flowsReturned >= c.request.GetNumber()
}

func (c *ioReaderClient) popFromLastBuffer() *observerpb.GetFlowsResponse {
// Handle --last by iterating over our FIFO and returning one item each time.
if c.isLast() {
if len(c.resps) == 0 {
// Iterate over the buffer and store them in a slice, because we cannot
// index into the ring buffer itself
// TODO: Add the ability to index into the ring buffer and we could avoid
// this copy.
c.buffer.Iterate(func(i interface{}) {
c.resps = append(c.resps, i.(*observerpb.GetFlowsResponse))
})
}

// return the next element from the buffered results
if len(c.resps) > int(c.flowsReturned) {
resp := c.resps[c.flowsReturned]
c.flowsReturned++
return resp
}
}
return nil
}

func (c *ioReaderClient) unmarshalNext() *observerpb.GetFlowsResponse {
var res observerpb.GetFlowsResponse
err := protojson.Unmarshal(c.scanner.Bytes(), &res)
if err != nil {
line := c.scanner.Text()
logger.Logger.WithError(err).WithField("line", line).Warn("Failed to unmarshal json to flow")
return nil
}
if c.request.Since != nil && c.request.Since.AsTime().After(res.Time.AsTime()) {
return nil
}
if c.request.Until != nil && c.request.Until.AsTime().Before(res.Time.AsTime()) {
return nil
}
if !filters.Apply(c.allow, c.deny, &v1.Event{Timestamp: res.Time, Event: res.GetFlow()}) {
return nil
}
return &res
}
76 changes: 76 additions & 0 deletions cmd/observe/io_reader_observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,82 @@ func Test_getFlowsTimeRange(t *testing.T) {
assert.Equal(t, io.EOF, err)
}

func Test_getFlowsLast(t *testing.T) {
flows := []*observerpb.GetFlowsResponse{
{
ResponseTypes: &observerpb.GetFlowsResponse_Flow{Flow: &flowpb.Flow{Verdict: flowpb.Verdict_FORWARDED}},
Time: &timestamppb.Timestamp{Seconds: 0},
},
{
ResponseTypes: &observerpb.GetFlowsResponse_Flow{Flow: &flowpb.Flow{Verdict: flowpb.Verdict_DROPPED}},
Time: &timestamppb.Timestamp{Seconds: 100},
},
{
ResponseTypes: &observerpb.GetFlowsResponse_Flow{Flow: &flowpb.Flow{Verdict: flowpb.Verdict_ERROR}},
Time: &timestamppb.Timestamp{Seconds: 200},
},
}
var flowStrings []string
for _, f := range flows {
b, err := f.MarshalJSON()
assert.NoError(t, err)
flowStrings = append(flowStrings, string(b))
}
server := NewIOReaderObserver(strings.NewReader(strings.Join(flowStrings, "\n") + "\n"))
req := observerpb.GetFlowsRequest{
Number: 2,
First: false,
}
client, err := server.GetFlows(context.Background(), &req)
assert.NoError(t, err)
res, err := client.Recv()
assert.NoError(t, err)
assert.Equal(t, flows[1], res)
res, err = client.Recv()
assert.NoError(t, err)
assert.Equal(t, flows[2], res)
_, err = client.Recv()
assert.Equal(t, io.EOF, err)
}

func Test_getFlowsFirst(t *testing.T) {
flows := []*observerpb.GetFlowsResponse{
{
ResponseTypes: &observerpb.GetFlowsResponse_Flow{Flow: &flowpb.Flow{Verdict: flowpb.Verdict_FORWARDED}},
Time: &timestamppb.Timestamp{Seconds: 0},
},
{
ResponseTypes: &observerpb.GetFlowsResponse_Flow{Flow: &flowpb.Flow{Verdict: flowpb.Verdict_DROPPED}},
Time: &timestamppb.Timestamp{Seconds: 100},
},
{
ResponseTypes: &observerpb.GetFlowsResponse_Flow{Flow: &flowpb.Flow{Verdict: flowpb.Verdict_ERROR}},
Time: &timestamppb.Timestamp{Seconds: 200},
},
}
var flowStrings []string
for _, f := range flows {
b, err := f.MarshalJSON()
assert.NoError(t, err)
flowStrings = append(flowStrings, string(b))
}
server := NewIOReaderObserver(strings.NewReader(strings.Join(flowStrings, "\n") + "\n"))
req := observerpb.GetFlowsRequest{
Number: 2,
First: true,
}
client, err := server.GetFlows(context.Background(), &req)
assert.NoError(t, err)
res, err := client.Recv()
assert.NoError(t, err)
assert.Equal(t, flows[0], res)
res, err = client.Recv()
assert.NoError(t, err)
assert.Equal(t, flows[1], res)
_, err = client.Recv()
assert.Equal(t, io.EOF, err)
}

func Test_getFlowsFilter(t *testing.T) {
flows := []*observerpb.GetFlowsResponse{
{
Expand Down
Loading