Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add experimental-field-mask flags #1101

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions cmd/observe/flows.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/spf13/viper"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/fieldmaskpb"
"google.golang.org/protobuf/types/known/timestamppb"
"gopkg.in/yaml.v3"
)
Expand Down Expand Up @@ -549,6 +550,7 @@ func handleFlowArgs(writer io.Writer, ofilter *flowFilter, debug bool) (err erro
hubprinter.WithColor(formattingOpts.color),
}

jsonOut := false
switch formattingOpts.output {
case "compact":
opts = append(opts, hubprinter.Compact())
Expand All @@ -562,6 +564,7 @@ func handleFlowArgs(writer io.Writer, ofilter *flowFilter, debug bool) (err erro
fallthrough
case "jsonpb":
opts = append(opts, hubprinter.JSONPB())
jsonOut = true
case "tab", "table":
if selectorOpts.follow {
return fmt.Errorf("table output format is not compatible with follow mode")
Expand All @@ -570,6 +573,15 @@ func handleFlowArgs(writer io.Writer, ofilter *flowFilter, debug bool) (err erro
default:
return fmt.Errorf("invalid output format: %s", formattingOpts.output)
}
if !jsonOut {
if len(experimentalOpts.fieldMask) > 0 {
return fmt.Errorf("%s output format is not compatible with custom field mask", formattingOpts.output)
}
if experimentalOpts.useDefaultMasks {
experimentalOpts.fieldMask = defaults.FieldMask
}
}

if otherOpts.ignoreStderr {
opts = append(opts, hubprinter.IgnoreStderr())
}
Expand Down Expand Up @@ -701,6 +713,16 @@ func getFlowsRequest(ofilter *flowFilter, allowlist []string, denylist []string)
First: first,
}

if len(experimentalOpts.fieldMask) > 0 {
fm, err := fieldmaskpb.New(&flowpb.Flow{}, experimentalOpts.fieldMask...)
if err != nil {
return nil, fmt.Errorf("failed to construct field mask: %w", err)
}
req.Experimental = &observerpb.GetFlowsRequest_Experimental{
FieldMask: fm,
}
}

return req, nil
}

Expand Down
49 changes: 49 additions & 0 deletions cmd/observe/flows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package observe
import (
"encoding/json"
"fmt"
"os"
"strings"
"testing"
"time"
Expand All @@ -16,6 +17,7 @@ import (
"github.com/cilium/hubble/pkg/defaults"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/fieldmaskpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand Down Expand Up @@ -125,3 +127,50 @@ denylist:
assert.NoError(t, err)
assert.Equal(t, expected, out)
}

func Test_getFlowsRequest_ExperimentalFieldMask_valid(t *testing.T) {
selectorOpts.until = ""
experimentalOpts.fieldMask = []string{"time", "verdict"}
filter := newFlowFilter()
req, err := getFlowsRequest(filter, nil, nil)
assert.NoError(t, err)
assert.Equal(t, &observerpb.GetFlowsRequest{
Number: 20,
Experimental: &observerpb.GetFlowsRequest_Experimental{
FieldMask: &fieldmaskpb.FieldMask{Paths: []string{"time", "verdict"}},
},
}, req)
}

func Test_getFlowsRequest_ExperimentalFieldMask_invalid(t *testing.T) {
experimentalOpts.fieldMask = []string{"time", "verdict", "invalid-field"}
filter := newFlowFilter()
_, err := getFlowsRequest(filter, nil, nil)
assert.ErrorContains(t, err, "invalid-field")
}

func Test_getFlowsRequest_ExperimentalUseDefaultFieldMask(t *testing.T) {
selectorOpts.until = ""
formattingOpts.output = "dict"
experimentalOpts.fieldMask = nil
experimentalOpts.useDefaultMasks = true
filter := newFlowFilter()
require.NoError(t, handleFlowArgs(os.Stdout, filter, false))
req, err := getFlowsRequest(filter, nil, nil)
assert.NoError(t, err)
assert.Equal(t, &observerpb.GetFlowsRequest{
Number: 20,
Experimental: &observerpb.GetFlowsRequest_Experimental{
FieldMask: &fieldmaskpb.FieldMask{Paths: defaults.FieldMask},
},
}, req)
}

func Test_getFlowsRequest_ExperimentalFieldMask_non_json_output(t *testing.T) {
selectorOpts.until = ""
formattingOpts.output = "compact"
experimentalOpts.fieldMask = []string{"time", "verdict"}
filter := newFlowFilter()
err := handleFlowArgs(os.Stdout, filter, false)
assert.ErrorContains(t, err, "not compatible")
}
11 changes: 11 additions & 0 deletions cmd/observe/observe.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ var (
inputFile string
}

experimentalOpts struct {
fieldMask []string
useDefaultMasks bool
}

printer *hubprinter.Printer

// selector flags
Expand Down Expand Up @@ -150,6 +155,12 @@ func init() {

otherFlags.StringVar(&otherOpts.inputFile, "input-file", "",
"Query flows from this file instead of the server. Use '-' to read from stdin.")

otherFlags.StringSliceVar(&experimentalOpts.fieldMask, "experimental-field-mask", nil,
"Experimental: Comma-separated list of fields for mask. Fields not in the mask will be removed from server response.")

otherFlags.BoolVar(&experimentalOpts.useDefaultMasks, "experimental-use-default-field-masks", false,
"Experimental: request only visible fields when the output format is compact, tab, or dict.")
}

// New observer command.
Expand Down
8 changes: 5 additions & 3 deletions cmd/observe_help.txt
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,11 @@ Server Flags:
--tls-server-name string Specify a server name to verify the hostname on the returned certificate (eg: 'instance.hubble-relay.cilium.io').

Other Flags:
--input-file string Query flows from this file instead of the server. Use '-' to read from stdin.
--print-raw-filters Print allowlist/denylist filters and exit without sending the request to Hubble server
-s, --silent-errors Silently ignores errors and warnings
--experimental-field-mask strings Experimental: Comma-separated list of fields for mask. Fields not in the mask will be removed from server response.
--experimental-use-default-field-masks Experimental: request only visible fields when the output format is compact, tab, or dict.
--input-file string Query flows from this file instead of the server. Use '-' to read from stdin.
--print-raw-filters Print allowlist/denylist filters and exit without sending the request to Hubble server
-s, --silent-errors Silently ignores errors and warnings

Global Flags:
--config string Optional config file (default "%s")
Expand Down
4 changes: 4 additions & 0 deletions pkg/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ var (
// ConfigFile is the path to an optional configuration file.
// It may be unset.
ConfigFile string

// FieldMask is a list of requested fields when using "dict", "tab", or "compact"
// output format and no custom mask is specified.
FieldMask = []string{"time", "source.identity", "source.namespace", "source.pod_name", "destination.identity", "destination.namespace", "destination.pod_name", "source_service", "destination_service", "l4", "IP", "ethernet", "l7", "Type", "node_name", "is_reply", "event_type", "verdict", "Summary"}
)

func init() {
Expand Down
92 changes: 60 additions & 32 deletions pkg/printer/printer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,48 +12,76 @@ import (
flowpb "github.com/cilium/cilium/api/v1/flow"
observerpb "github.com/cilium/cilium/api/v1/observer"
monitorAPI "github.com/cilium/cilium/pkg/monitor/api"
"github.com/cilium/hubble/pkg/defaults"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/wrapperspb"
)

func TestPrinter_WriteProtoFlow(t *testing.T) {
buf := bytes.Buffer{}
f := flowpb.Flow{
Time: &timestamppb.Timestamp{
Seconds: 1234,
Nanos: 567800000,
},
Type: flowpb.FlowType_L3_L4,
NodeName: "k8s1",
Verdict: flowpb.Verdict_DROPPED,
IP: &flowpb.IP{
Source: "1.1.1.1",
Destination: "2.2.2.2",
},
Source: &flowpb.Endpoint{
Identity: 4,
},
Destination: &flowpb.Endpoint{
Identity: 12345,
},
L4: &flowpb.Layer4{
Protocol: &flowpb.Layer4_TCP{
TCP: &flowpb.TCP{
SourcePort: 31793,
DestinationPort: 8080,
},
var f = flowpb.Flow{
Time: &timestamppb.Timestamp{
Seconds: 1234,
Nanos: 567800000,
},
Type: flowpb.FlowType_L3_L4,
NodeName: "k8s1",
Verdict: flowpb.Verdict_DROPPED,
IP: &flowpb.IP{
Source: "1.1.1.1",
Destination: "2.2.2.2",
},
Source: &flowpb.Endpoint{
Identity: 4,
},
Destination: &flowpb.Endpoint{
Identity: 12345,
},
L4: &flowpb.Layer4{
Protocol: &flowpb.Layer4_TCP{
TCP: &flowpb.TCP{
SourcePort: 31793,
DestinationPort: 8080,
},
},
EventType: &flowpb.CiliumEventType{
Type: monitorAPI.MessageTypeDrop,
SubType: 133,
},
Summary: "TCP Flags: SYN",
IsReply: &wrapperspb.BoolValue{Value: false},
},
EventType: &flowpb.CiliumEventType{
Type: monitorAPI.MessageTypeDrop,
SubType: 133,
},
Summary: "TCP Flags: SYN",
IsReply: &wrapperspb.BoolValue{Value: false},
}

func TestPrinter_AllFieldsInMask(t *testing.T) {
fm := make(map[string]bool)
for _, field := range defaults.FieldMask {
fm[field] = true
}
check := func(msg protoreflect.Message, prefix string) {
fds := msg.Descriptor().Fields()
for i := 0; i < fds.Len(); i++ {
fd := fds.Get(i)
if !msg.Has(fd) {
continue
}
name := prefix + string(fd.Name())
if name == "source" || name == "destination" {
// Skip compound fields.
continue
}
assert.True(t, fm[name], name)
}
}
check(f.ProtoReflect(), "")
check(f.Source.ProtoReflect(), "source.")
check(f.Destination.ProtoReflect(), "destination.")
}

func TestPrinter_WriteProtoFlow(t *testing.T) {
buf := bytes.Buffer{}
reply := proto.Clone(&f).(*flowpb.Flow)
reply.IsReply = &wrapperspb.BoolValue{Value: true}
unknown := proto.Clone(&f).(*flowpb.Flow)
Expand Down