From cdf2589b0d9c53f5a5c5383f700ca96b6d910ed6 Mon Sep 17 00:00:00 2001 From: Sam Lock Date: Tue, 16 Apr 2024 15:20:04 +0100 Subject: [PATCH] enhancement: Add audit log filtering to Hub backend (#2073) Add support for specifying filters for certain Audit Log fields. The values are removed prior to exporting to the Hub backend. The following fields are included: * metadata * peer * method: `input(s)` and `output(s)` (Decision Logs only) We support a subset of JSONPath operations, as follows: - dot notation: `foo.bar.baz` - or bracket notation: `['foo']['bar']['baz']` - or combinations thereof `bar` or `baz` above can be map keys, nested messages or structs. We support list indexing with ints or wildcards: - foo.bar[0] - foo.bar[*] Wildcards can also operate on member names as a match-all. E.g. `foo[*].baz` will match both `baz` values in the pseudo-object below: ``` { 'foo': { 'pow': { 'baz', }, 'bosh': { 'baz', }, } } ``` --------- Signed-off-by: Sam Lock Co-authored-by: Charith Ellawala --- .../partials/fullconfiguration.adoc | 10 + go.mod | 1 + go.sum | 2 + internal/audit/hub/conf.go | 11 +- internal/audit/hub/filter.go | 475 +++++++++++++++++ internal/audit/hub/filter_test.go | 488 ++++++++++++++++++ internal/audit/hub/hub.go | 88 ++-- internal/audit/hub/hub_test.go | 32 +- 8 files changed, 1073 insertions(+), 34 deletions(-) create mode 100644 internal/audit/hub/filter.go create mode 100644 internal/audit/hub/filter_test.go diff --git a/docs/modules/configuration/partials/fullconfiguration.adoc b/docs/modules/configuration/partials/fullconfiguration.adoc index 3813d78dc..0d76feb01 100644 --- a/docs/modules/configuration/partials/fullconfiguration.adoc +++ b/docs/modules/configuration/partials/fullconfiguration.adoc @@ -24,6 +24,16 @@ audit: flushInterval: 1s gcInterval: 60s maxBatchSize: 32 + mask: # Mask defines a list of attributes to exclude from the audit logs, specified as lists of JSONPaths + checkResources: + - inputs[*].principal.attr.foo + - inputs[*].auxData + - outputs + metadata: ['authorization'] + peer: + - address + - forwarded_for + planResources: ['input.principal.attr.nestedMap.foo'] retentionPeriod: 168h # How long to keep records for storagePath: /path/to/dir # Path to store the data kafka: diff --git a/go.mod b/go.mod index 6342ab6b7..ccb130f60 100644 --- a/go.mod +++ b/go.mod @@ -54,6 +54,7 @@ require ( github.com/microsoft/go-mssqldb v1.7.0 github.com/minio/minio-go/v7 v7.0.69 github.com/nlepage/go-tarfs v1.2.1 + github.com/ohler55/ojg v1.21.4 github.com/oklog/ulid/v2 v2.1.0 github.com/olekukonko/tablewriter v0.0.5 github.com/ory/dockertest/v3 v3.10.0 diff --git a/go.sum b/go.sum index 117589abc..58939e283 100644 --- a/go.sum +++ b/go.sum @@ -579,6 +579,8 @@ github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJm github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nlepage/go-tarfs v1.2.1 h1:o37+JPA+ajllGKSPfy5+YpsNHDjZnAoyfvf5GsUa+Ks= github.com/nlepage/go-tarfs v1.2.1/go.mod h1:rno18mpMy9aEH1IiJVftFsqPyIpwqSUiAOpJYjlV2NA= +github.com/ohler55/ojg v1.21.4 h1:2iWyz/xExx0XySVIxR9kWFxIdsLNrpWLrKuAcs5aOZU= +github.com/ohler55/ojg v1.21.4/go.mod h1:gQhDVpQLqrmnd2eqGAvJtn+NfKoYJbe/A4Sj3/Vro4o= github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU= github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= diff --git
a/internal/audit/hub/conf.go b/internal/audit/hub/conf.go index 4d1514f73..487261c4b 100644 --- a/internal/audit/hub/conf.go +++ b/internal/audit/hub/conf.go @@ -32,7 +32,9 @@ var ( ) type Conf struct { - Ingest IngestConf `yaml:"ingest" conf:",ignore"` + Ingest IngestConf `yaml:"ingest" conf:",ignore"` + // Mask defines a list of attributes to exclude from the audit logs, specified as lists of JSONPaths + Mask MaskConf `yaml:"mask"` local.Conf `yaml:",inline"` } @@ -51,6 +53,13 @@ type IngestConf struct { NumGoRoutines uint `yaml:"numGoRoutines" conf:",example=8"` } +type MaskConf struct { + Peer []string `yaml:"peer" conf:",example=\n - address\n - forwarded_for"` + Metadata []string `yaml:"metadata" conf:",example=['authorization']"` + CheckResources []string `yaml:"checkResources" conf:",example=\n - inputs[*].principal.attr.foo\n - inputs[*].auxData\n - outputs"` + PlanResources []string `yaml:"planResources" conf:",example=['input.principal.attr.nestedMap.foo']"` +} + func (c *Conf) Key() string { return confKey } diff --git a/internal/audit/hub/filter.go b/internal/audit/hub/filter.go new file mode 100644 index 000000000..7b28ffdd0 --- /dev/null +++ b/internal/audit/hub/filter.go @@ -0,0 +1,475 @@ +// Copyright 2021-2024 Zenauth Ltd. +// SPDX-License-Identifier: Apache-2.0 + +package hub + +import ( + "errors" + "fmt" + "slices" + "strconv" + "unicode" + "unicode/utf8" + + logsv1 "github.com/cerbos/cloud-api/genpb/cerbos/cloud/logs/v1" + "go.uber.org/multierr" + "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/types/known/structpb" +) + +const ( + unionedPathPrefix = "entries[*][*]" + peerPrefix = unionedPathPrefix + ".peer" + metadataPrefix = unionedPathPrefix + ".metadata" + + checkResourcesDeprecatedPrefix = "entries[*].decisionLogEntry" + checkResourcesPrefix = "entries[*].decisionLogEntry.checkResources" + planResourcesPrefix = "entries[*].decisionLogEntry.planResources" +) + +type tokenType int8 + +const ( + tokenUnknown tokenType = iota + tokenAccessor + tokenIndex + tokenWildcard +) + +type Token struct { + val any + children map[string]*Token + typ tokenType +} + +func (t *Token) key() string { + switch t.typ { + case tokenAccessor: + return t.val.(string) //nolint:forcetypeassert + case tokenIndex: + return "[" + strconv.Itoa(t.val.(int)) + "]" //nolint:forcetypeassert + case tokenWildcard: + return "[*]" + default: + return "" + } +} + +type AuditLogFilter struct { + astRoot *Token +} + +func NewAuditLogFilter(conf MaskConf) (*AuditLogFilter, error) { + root, err := parseJSONPathExprs(conf) + if err != nil { + return nil, err + } + + return &AuditLogFilter{ + astRoot: root, + }, nil +} + +func parseJSONPathExprs(conf MaskConf) (ast *Token, outErr error) { + root := &Token{} + + parse := func(rule string) error { + if err := Tokenize(root, rule); err != nil { + return err + } + + return nil + } + + for _, r := range conf.Peer { + if err := parse(peerPrefix + "." + r); err != nil { + outErr = multierr.Append(outErr, err) + } + } + + for _, r := range conf.Metadata { + if err := parse(metadataPrefix + "." + r); err != nil { + outErr = multierr.Append(outErr, err) + } + } + + for _, r := range conf.CheckResources { + if err := parse(checkResourcesPrefix + "." + r); err != nil { + outErr = multierr.Append(outErr, err) + } + + if err := parse(checkResourcesDeprecatedPrefix + "." + r); err != nil { + outErr = multierr.Append(outErr, err) + } + } + + for _, r := range conf.PlanResources { + if err := parse(planResourcesPrefix + "." 
+ r); err != nil { + outErr = multierr.Append(outErr, err) + } + } + + return root, outErr +} + +type state int8 + +const ( + stateUnknown state = iota + statePlainAccessor + stateParenOpen + stateWildcard + stateNumberOpen + stateStringOpen + stateStringClosed + stateClosed +) + +type tokenBuilder struct { + curToken *Token + buf string + size int + t tokenType + s state +} + +func (tb *tokenBuilder) WriteRune(r rune) error { + defer func() { + tb.size += utf8.RuneLen(r) + }() + + switch tb.size { + case 0: + switch { + case unicode.IsLetter(r): + tb.s = statePlainAccessor + tb.t = tokenAccessor + case r == '[': + tb.s = stateParenOpen + return nil + default: + return fmt.Errorf("invalid first character: %c", r) + } + case 1: + switch tb.s { + case statePlainAccessor: + case stateParenOpen: + switch { + case unicode.IsDigit(r): + tb.s = stateNumberOpen + tb.t = tokenIndex + case r == '*': + tb.s = stateWildcard + tb.t = tokenWildcard + return nil + case r == '\'': + tb.s = stateStringOpen + tb.t = tokenAccessor + return nil + default: + return fmt.Errorf("invalid character following '[': %c", r) + } + default: + return fmt.Errorf("unexpected state: %v", tb.s) + } + default: + switch tb.s { + case statePlainAccessor: + if !unicode.IsLetter(r) && !unicode.IsNumber(r) && r != '_' { + return fmt.Errorf("unexpected character for accessor: %c", r) + } + case stateNumberOpen: + switch { + case unicode.IsDigit(r): + case r == ']': + tb.s = stateClosed + return nil + default: + return fmt.Errorf("unexpected character in number: %c", r) + } + case stateWildcard: + if r != ']' { + return fmt.Errorf("expected ']' after '*', found: %c", r) + } + tb.s = stateClosed + return nil + case stateStringOpen: + if r == '\'' { + tb.s = stateStringClosed + return nil + } + case stateStringClosed: + if r != ']' { + return fmt.Errorf("expected ']' after string, found: %c", r) + } + tb.s = stateClosed + return nil + default: + return fmt.Errorf("unexpected state: %v", tb.s) + } + } + + tb.buf += string(r) + + return nil +} + +func (tb *tokenBuilder) Flush() error { + switch tb.s { + case stateClosed, statePlainAccessor: + case stateStringOpen: + return errors.New("invalid string not closed") + default: + return fmt.Errorf("flush called in an invalid state: %v", tb.s) + } + + if tb.size > 0 { + var value any + switch tb.t { //nolint:exhaustive + case tokenAccessor: + value = tb.buf + case tokenIndex: + idx, err := strconv.Atoi(tb.buf) + if err != nil { + return err + } + value = idx + } + + t := &Token{ + typ: tb.t, + val: value, + } + + if tb.curToken.children == nil { + tb.curToken.children = make(map[string]*Token) + } + + if cached, ok := tb.curToken.children[t.key()]; ok { + tb.curToken = cached + } else { + tb.curToken.children[t.key()] = t + tb.curToken = t + } + } + + tb.buf = "" + tb.t = tokenUnknown + tb.size = 0 + + return nil +} + +func Tokenize(root *Token, path string) error { + curs := 0 + b := &tokenBuilder{ + curToken: root, + } + + var prev rune + for curs < len(path) { + r, size := utf8.DecodeRuneInString(path[curs:]) + curs += size + + // handle and validate token boundaries + switch r { + case '.': + switch { + case curs == size: + return errors.New("invalid first character: '.'") + case curs == len(path): + return errors.New("invalid final character: '.'") + case prev == '.': + return errors.New("invalid empty token") + } + + if err := b.Flush(); err != nil { + return err + } + case '[': + if err := b.Flush(); err != nil { + return err + } + if err := b.WriteRune(r); err != nil { + return err + 
} case ']': if err := b.WriteRune(r); err != nil { return err } if err := b.Flush(); err != nil { return err } default: if err := b.WriteRune(r); err != nil { return err } } prev = r } if err := b.Flush(); err != nil { return err } return nil } func (f *AuditLogFilter) Filter(ingestBatch *logsv1.IngestBatch) error { if f.astRoot == nil { return nil } ib := ingestBatch.ProtoReflect() for _, c := range f.astRoot.children { visit(c, ib) } return nil } // We support a subset of JSONPath operations, as follows: // // - dot notation: `foo.bar.baz` // - or bracket notation: `['foo']['bar']['baz']` // - or combinations thereof // // `bar` or `baz` above can be map keys, nested messages or structs. // // We support list indexing with ints or wildcards: // - foo.bar[0] // - foo.bar[*] // // Wildcards can also operate on member names as a match-all. E.g. `foo[*].baz` // will match both `baz` values in the pseudo-object below: // // { // 'foo': { // 'pow': { // 'baz', // }, // 'bosh': { // 'baz', // }, // } // } func visit(t *Token, m protoreflect.Message) { if m.Type().Descriptor().FullName() == "google.protobuf.Value" { visitStructpb(t, m.Interface().(*structpb.Value)) //nolint:forcetypeassert return } var fieldsToInspect []protoreflect.FieldDescriptor switch t.typ { //nolint:exhaustive case tokenAccessor: fd := m.Descriptor().Fields().ByJSONName(t.val.(string)) //nolint:forcetypeassert if fd == nil { return } else if t.children == nil { if m.Has(fd) { m.Clear(fd) } return } fieldsToInspect = []protoreflect.FieldDescriptor{fd} case tokenWildcard: fds := m.Descriptor().Fields() fieldsToInspect = make([]protoreflect.FieldDescriptor, fds.Len()) for i := 0; i < fds.Len(); i++ { fieldsToInspect[i] = fds.Get(i) } } for _, fd := range fieldsToInspect { v := m.Get(fd) for _, c := range t.children { switch { case fd.IsMap(): mapVal := v.Map() // apparently retrieving typed values each iteration causes no slow-down k, ok := c.val.(string) if !ok { continue } mapKey := protoreflect.ValueOfString(k).MapKey() mv := mapVal.Get(mapKey) if mv.IsValid() { if c.children == nil { mapVal.Clear(mapKey) continue } if fd.MapValue().Message() != nil { visit(c, mv.Message()) } } case fd.IsList(): listVal := v.List() handleArrayIndex := func(idx int) { // For array indexes, reach ahead to the next token.
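// E.g. for the configured rule `inputs[0].principal.id`, the `[0]` token's child `principal` is visited against the message at list index 0.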
+ for _, nc := range c.children { + visit(nc, listVal.Get(idx).Message()) + } + } + + switch c.typ { //nolint:exhaustive + case tokenWildcard: + for i := 0; i < listVal.Len(); i++ { + handleArrayIndex(i) + } + case tokenIndex: + if idx := c.val.(int); idx < listVal.Len() { //nolint:forcetypeassert + handleArrayIndex(idx) + } + } + case fd.Message() != nil: + visit(c, v.Message()) + } + } + } +} + +func visitStructpb(t *Token, v *structpb.Value) { + for _, c := range t.children { + switch k := v.GetKind().(type) { + case *structpb.Value_StructValue: + name, ok := c.val.(string) + if !ok { + continue + } + if c.children == nil { + delete(k.StructValue.Fields, name) + continue + } + if fv, ok := k.StructValue.Fields[name]; ok { + visitStructpb(c, fv) + } + case *structpb.Value_ListValue: + switch c.typ { //nolint:exhaustive + case tokenWildcard: + if c.children == nil { + v = nil + continue + } + for i := 0; i < len(k.ListValue.Values); i++ { + visitStructpb(c, k.ListValue.Values[i]) + } + case tokenIndex: + idx, ok := c.val.(int) + if !ok { + continue + } + if c.children == nil { + if l := len(k.ListValue.Values); idx < l { + if l == 1 { + v = nil + } else { + k.ListValue.Values = slices.Delete(k.ListValue.Values, idx, idx+1) + } + } + continue + } + visitStructpb(c, k.ListValue.Values[idx]) + } + } + } +} diff --git a/internal/audit/hub/filter_test.go b/internal/audit/hub/filter_test.go new file mode 100644 index 000000000..673e27af0 --- /dev/null +++ b/internal/audit/hub/filter_test.go @@ -0,0 +1,488 @@ +// Copyright 2021-2024 Zenauth Ltd. +// SPDX-License-Identifier: Apache-2.0 + +//go:build !race +// +build !race + +package hub_test + +import ( + "errors" + "fmt" + "strings" + "testing" + "time" + + auditv1 "github.com/cerbos/cerbos/api/genpb/cerbos/audit/v1" + effectv1 "github.com/cerbos/cerbos/api/genpb/cerbos/effect/v1" + enginev1 "github.com/cerbos/cerbos/api/genpb/cerbos/engine/v1" + "github.com/cerbos/cerbos/internal/audit/hub" + logsv1 "github.com/cerbos/cloud-api/genpb/cerbos/cloud/logs/v1" + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/testing/protocmp" + "google.golang.org/protobuf/types/known/structpb" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func TestAuditLogFilter(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + + t.Run("Tokenise", func(t *testing.T) { + tc := []struct { + path string + wantErr error + }{ + { + ".foo", + errors.New("invalid first character: '.'"), + }, + { + "foo.", + errors.New("invalid final character: '.'"), + }, + { + "foo..bar", + errors.New("invalid empty token"), + }, + { + "foo[bar].baz", + errors.New("invalid character following '[': b"), + }, + { + "foo['bar].baz", + errors.New("invalid string not closed"), + }, + { + "foo[\"bar\"]", + errors.New("invalid character following '[': \""), + }, + { + "foo'.baz", + errors.New("unexpected character for accessor: '"), + }, + { + "foo].baz", + errors.New("unexpected character for accessor: ]"), + }, + { + "'foo'.baz", + errors.New("invalid first character: '"), + }, + { + "foo[1r]", + errors.New("unexpected character in number: r"), + }, + } + + for _, c := range tc { + err := hub.Tokenize(&hub.Token{}, c.path) + if c.wantErr != nil { + require.Error(t, err, fmt.Sprintf("failed path: `%s`", c.path)) + require.Equal(t, c.wantErr.Error(), err.Error()) + } else { + require.NoError(t, err, fmt.Sprintf("failed path: `%s`", c.path)) + } + } + }) + + t.Run("Filter", func(t *testing.T) { + now 
:= time.Now() + ts0 := timestamppb.New(now) + ts1 := timestamppb.New(now.Add(1 * time.Second)) + ts2 := timestamppb.New(now.Add(2 * time.Second)) + ts3 := timestamppb.New(now.Add(3 * time.Second)) + + maskConf := hub.MaskConf{ + Metadata: []string{"metadata_key_2"}, + Peer: []string{ + "address", + "forwardedFor", + }, + CheckResources: []string{ + "inputs[0].principal.id", + "inputs[0].principal.attr.attr1", + "inputs[*]['principal']['attr']['attr2']", + "inputs[*].principal.attr.someMap.nestedAttr1", + "inputs[*].principal.attr.someMap.someSomeMap.nestedNestedAttr1", + "inputs[*].principal.attr.someList[0]", + "outputs", + }, + PlanResources: []string{ + "['input']['principal']['attr']['someMap']['nestedAttr1']", + "input.principal.attr.someList[1]", + "output.filterDebug", + }, + } + + logEntries := []*logsv1.IngestBatch_Entry{ + { + Kind: logsv1.IngestBatch_ENTRY_KIND_ACCESS_LOG, + Timestamp: ts0, + Entry: &logsv1.IngestBatch_Entry_AccessLogEntry{ + AccessLogEntry: &auditv1.AccessLogEntry{ + CallId: "1", + Timestamp: ts0, + Peer: &auditv1.Peer{ + Address: "1.1.1.1", + UserAgent: "Mozilla/5.0", + ForwardedFor: "2.2.2.2", + }, + Metadata: map[string]*auditv1.MetaValues{ + "metadata_key_1": {Values: []string{"1"}}, + "metadata_key_2": {Values: []string{"2"}}, + "metadata_key_3": {Values: []string{"3"}}, + }, + Method: "/cerbos.svc.v1.CerbosService/Check", + }, + }, + }, + { + Kind: logsv1.IngestBatch_ENTRY_KIND_DECISION_LOG, + Timestamp: ts1, + Entry: &logsv1.IngestBatch_Entry_DecisionLogEntry{ + DecisionLogEntry: &auditv1.DecisionLogEntry{ + CallId: "2", + Timestamp: ts1, + Peer: &auditv1.Peer{ + Address: "1.1.1.1", + UserAgent: "curl/7.68.0", + }, + Metadata: map[string]*auditv1.MetaValues{ + "metadata_key_2": {Values: []string{"2"}}, + }, + Method: &auditv1.DecisionLogEntry_PlanResources_{ + PlanResources: &auditv1.DecisionLogEntry_PlanResources{ + Input: &enginev1.PlanResourcesInput{ + RequestId: "1", + Action: "a1", + Principal: &enginev1.Principal{ + Id: "test", + Roles: []string{"a", "b"}, + Attr: map[string]*structpb.Value{ + "attr1": structpb.NewNumberValue(1), + "attr2": structpb.NewNumberValue(2), + "attr3": structpb.NewNumberValue(3), + "someMap": structpb.NewStructValue(&structpb.Struct{ + Fields: map[string]*structpb.Value{ + "nestedAttr1": structpb.NewNumberValue(1), + "nestedAttr2": structpb.NewNumberValue(2), + }, + }), + "someList": structpb.NewListValue(&structpb.ListValue{ + Values: []*structpb.Value{ + structpb.NewStringValue("index0"), + structpb.NewStringValue("index1"), + }, + }), + }, + }, + Resource: &enginev1.PlanResourcesInput_Resource{ + Kind: "test:kind", + Attr: map[string]*structpb.Value{}, + }, + }, + Output: &enginev1.PlanResourcesOutput{ + RequestId: "1", + Action: "a1", + Kind: "test:kind", + Filter: &enginev1.PlanResourcesFilter{ + Kind: enginev1.PlanResourcesFilter_KIND_ALWAYS_ALLOWED, + }, + FilterDebug: "debug string", + }, + Error: "BOOM", + }, + }, + }, + }, + }, + // Old DecisionLog CheckResources schema with deprecated fields + { + Kind: logsv1.IngestBatch_ENTRY_KIND_DECISION_LOG, + Timestamp: ts2, + Entry: &logsv1.IngestBatch_Entry_DecisionLogEntry{ + DecisionLogEntry: &auditv1.DecisionLogEntry{ + CallId: "3", + Timestamp: ts2, + Peer: &auditv1.Peer{ + Address: "1.1.1.1", + UserAgent: "curl/7.68.0", + }, + Metadata: map[string]*auditv1.MetaValues{ + "metadata_key_1": {Values: []string{"1"}}, + }, + // Deprecated, but require backwards compatibility + Inputs: []*enginev1.CheckInput{ + { + RequestId: "1", + Resource: &enginev1.Resource{ + Kind: 
"test:kind", + Id: "test", + }, + Principal: &enginev1.Principal{ + Id: "test", + Roles: []string{"a", "b"}, + Attr: map[string]*structpb.Value{ + "attr1": structpb.NewNumberValue(1), + "attr2": structpb.NewNumberValue(2), + "attr3": structpb.NewNumberValue(3), + "someMap": structpb.NewStructValue(&structpb.Struct{ + Fields: map[string]*structpb.Value{ + "nestedAttr1": structpb.NewNumberValue(1), + "nestedAttr2": structpb.NewNumberValue(2), + "someSomeMap": structpb.NewStructValue(&structpb.Struct{ + Fields: map[string]*structpb.Value{ + "nestedNestedAttr1": structpb.NewNumberValue(1), + }, + }), + }, + }), + "someList": structpb.NewListValue(&structpb.ListValue{ + Values: []*structpb.Value{ + structpb.NewStringValue("index0"), + structpb.NewStringValue("index1"), + }, + }), + }, + }, + Actions: []string{"a1", "a2"}, + }, + { + RequestId: "2", + Resource: &enginev1.Resource{ + Kind: "test:kind", + Id: "test", + }, + Principal: &enginev1.Principal{ + Id: "test", + Roles: []string{"a", "b"}, + Attr: map[string]*structpb.Value{ + "attr1": structpb.NewNumberValue(1), + "attr2": structpb.NewNumberValue(2), + "attr3": structpb.NewNumberValue(3), + }, + }, + Actions: []string{"a1", "a2"}, + }, + }, + Outputs: []*enginev1.CheckOutput{ + { + RequestId: "1", + ResourceId: "test", + Actions: map[string]*enginev1.CheckOutput_ActionEffect{ + "a1": {Effect: effectv1.Effect_EFFECT_ALLOW, Policy: "resource.test.v1"}, + "a2": {Effect: effectv1.Effect_EFFECT_ALLOW, Policy: "resource.test.v1"}, + }, + }, + }, + Error: "BOOM", + }, + }, + }, + // New DecisionLog CheckResources schema + { + Kind: logsv1.IngestBatch_ENTRY_KIND_DECISION_LOG, + Timestamp: ts3, + Entry: &logsv1.IngestBatch_Entry_DecisionLogEntry{ + DecisionLogEntry: &auditv1.DecisionLogEntry{ + CallId: "4", + Timestamp: ts3, + Peer: &auditv1.Peer{ + Address: "1.1.1.1", + UserAgent: "curl/7.68.0", + }, + Metadata: map[string]*auditv1.MetaValues{ + "metadata_key_1": {Values: []string{"1"}}, + }, + Method: &auditv1.DecisionLogEntry_CheckResources_{ + CheckResources: &auditv1.DecisionLogEntry_CheckResources{ + Inputs: []*enginev1.CheckInput{ + { + RequestId: "1", + Resource: &enginev1.Resource{ + Kind: "test:kind", + Id: "test", + }, + Principal: &enginev1.Principal{ + Id: "test", + Roles: []string{"a", "b"}, + Attr: map[string]*structpb.Value{ + "attr1": structpb.NewNumberValue(1), + "attr2": structpb.NewNumberValue(2), + "attr3": structpb.NewNumberValue(3), + "someMap": structpb.NewStructValue(&structpb.Struct{ + Fields: map[string]*structpb.Value{ + "nestedAttr1": structpb.NewNumberValue(1), + "nestedAttr2": structpb.NewNumberValue(2), + "someSomeMap": structpb.NewStructValue(&structpb.Struct{ + Fields: map[string]*structpb.Value{ + "nestedNestedAttr1": structpb.NewNumberValue(1), + "nestedNestedAttr2": structpb.NewNumberValue(1), + }, + }), + }, + }), + "someList": structpb.NewListValue(&structpb.ListValue{ + Values: []*structpb.Value{ + structpb.NewStringValue("index0"), + structpb.NewStringValue("index1"), + }, + }), + }, + }, + Actions: []string{"a1", "a2"}, + }, + { + RequestId: "2", + Resource: &enginev1.Resource{ + Kind: "test:kind", + Id: "test", + }, + Principal: &enginev1.Principal{ + Id: "test", + Roles: []string{"a", "b"}, + Attr: map[string]*structpb.Value{ + "attr1": structpb.NewNumberValue(1), + "attr2": structpb.NewNumberValue(2), + "attr3": structpb.NewNumberValue(3), + }, + }, + Actions: []string{"a1", "a2"}, + }, + }, + Outputs: []*enginev1.CheckOutput{ + { + RequestId: "1", + ResourceId: "test", + Actions: 
map[string]*enginev1.CheckOutput_ActionEffect{ "a1": {Effect: effectv1.Effect_EFFECT_ALLOW, Policy: "resource.test.v1"}, "a2": {Effect: effectv1.Effect_EFFECT_ALLOW, Policy: "resource.test.v1"}, }, }, }, Error: "BOOM", }, }, }, }, }, } masker, err := hub.NewAuditLogFilter(maskConf) // TODO(saml) assert that generated ast is as expected require.NoError(t, err) wantRemoved := []string{ "entries[0].access_log_entry.metadata.metadata_key_2", "entries[0].access_log_entry.peer.address", "entries[0].access_log_entry.peer.forwarded_for", "entries[1].decision_log_entry.metadata", // only one key existed and was removed "entries[1].decision_log_entry.peer.address", // we removed the second element of the slice, so cmp reports the missing item at index -1 "entries[1].decision_log_entry.plan_resources.input.principal.attr.someList[-1]", "entries[1].decision_log_entry.plan_resources.input.principal.attr.someMap.nestedAttr1", "entries[1].decision_log_entry.plan_resources.output.filter_debug", // Old CheckResources schema "entries[2].decision_log_entry.inputs[0].principal.attr.attr1", "entries[2].decision_log_entry.inputs[0].principal.attr.attr2", // we removed the first element by manipulating the slice, so the "changed" element is the removed, previously first one "entries[2].decision_log_entry.inputs[0].principal.attr.someList[-1]", "entries[2].decision_log_entry.inputs[0].principal.attr.someMap.nestedAttr1", "entries[2].decision_log_entry.inputs[0].principal.attr.someMap.someSomeMap", // sole key deletion results in entire map deletion "entries[2].decision_log_entry.inputs[0].principal.id", "entries[2].decision_log_entry.inputs[1].principal.attr.attr2", "entries[2].decision_log_entry.outputs", "entries[2].decision_log_entry.peer.address", // New CheckResources schema "entries[3].decision_log_entry.check_resources.inputs[0].principal.attr.attr1", "entries[3].decision_log_entry.check_resources.inputs[0].principal.attr.attr2", // we removed the first element by manipulating the slice, so the "changed" element is the removed, previously first one "entries[3].decision_log_entry.check_resources.inputs[0].principal.attr.someList[-1]", "entries[3].decision_log_entry.check_resources.inputs[0].principal.attr.someMap.nestedAttr1", "entries[3].decision_log_entry.check_resources.inputs[0].principal.attr.someMap.someSomeMap.nestedNestedAttr1", "entries[3].decision_log_entry.check_resources.inputs[0].principal.id", "entries[3].decision_log_entry.check_resources.inputs[1].principal.attr.attr2", "entries[3].decision_log_entry.check_resources.outputs", "entries[3].decision_log_entry.peer.address", } ingestBatch := &logsv1.IngestBatch{ Id: "1", Entries: logEntries, } ingestBatchCopy := proto.Clone(ingestBatch).(*logsv1.IngestBatch) err = masker.Filter(ingestBatch) require.NoError(t, err) require.Len(t, ingestBatch.Entries, len(logEntries)) var r diffReporter cmp.Equal(ingestBatchCopy, ingestBatch, protocmp.Transform(), cmp.Reporter(&r)) require.Equal(t, strings.Join(wantRemoved, "\n"), r.String()) // `-1` only indicates that an array item is missing;
check the correct item is missing for each + l := ingestBatch.Entries[1].GetDecisionLogEntry().GetPlanResources().Input.Principal.Attr["someList"].GetListValue().Values + require.Len(t, l, 1) + require.Equal(t, l[0].GetStringValue(), "index0") // we removed the second entry + + l = ingestBatch.Entries[2].GetDecisionLogEntry().Inputs[0].Principal.Attr["someList"].GetListValue().Values + require.Len(t, l, 1) + require.Equal(t, l[0].GetStringValue(), "index1") + + l = ingestBatch.Entries[3].GetDecisionLogEntry().GetCheckResources().Inputs[0].Principal.Attr["someList"].GetListValue().Values + require.Len(t, l, 1) + require.Equal(t, l[0].GetStringValue(), "index1") + }) +} + +type diffReporter struct { + path cmp.Path + diffs []string +} + +func (r *diffReporter) PushStep(ps cmp.PathStep) { + r.path = append(r.path, ps) +} + +func (r *diffReporter) Report(rs cmp.Result) { + if !rs.Equal() { + vx, vy := r.path.Last().Values() + if vx.IsValid() && !vy.IsValid() { + var b strings.Builder + for _, step := range r.path { + switch p := step.(type) { + case cmp.MapIndex: + k := fmt.Sprintf("%v", p.Key()) + switch k { + case "list_value", "values", "string_value", "struct_value", "fields": + continue + } + + if b.Len() > 0 { + b.WriteString(".") + } + + b.WriteString(k) + case cmp.SliceIndex: + fmt.Fprintf(&b, "[%d]", p.Key()) + } + } + r.diffs = append(r.diffs, b.String()) + } + } +} + +func (r *diffReporter) PopStep() { + r.path = r.path[:len(r.path)-1] +} + +func (r *diffReporter) String() string { + return strings.Join(r.diffs, "\n") +} diff --git a/internal/audit/hub/hub.go b/internal/audit/hub/hub.go index afb6f4548..0e867e1fc 100644 --- a/internal/audit/hub/hub.go +++ b/internal/audit/hub/hub.go @@ -50,7 +50,13 @@ func init() { } type Log struct { + syncer IngestSyncer *local.Log + logger *zap.Logger + filter *AuditLogFilter + muTimer *mutexTimer + minFlushInterval, flushTimeout time.Duration + maxBatchSize, numGo int } func NewLog(conf *Conf, decisionFilter audit.DecisionLogEntryFilter, syncer IngestSyncer, logger *zap.Logger) (*Log, error) { @@ -66,13 +72,28 @@ func NewLog(conf *Conf, decisionFilter audit.DecisionLogEntryFilter, syncer Inge flushTimeout := conf.Ingest.FlushTimeout numGo := int(conf.Ingest.NumGoRoutines) + filter, err := NewAuditLogFilter(conf.Mask) + if err != nil { + return nil, err + } + + log := &Log{ + Log: localLog, + syncer: syncer, + logger: logger, + filter: filter, + minFlushInterval: minFlushInterval, + flushTimeout: flushTimeout, + maxBatchSize: maxBatchSize, + numGo: numGo, + muTimer: newMutexTimer(), + } + localLog.RegisterCallback(func(waitCh chan<- struct{}) { - schedule(localLog.Db, newMutexTimer(), syncer, minFlushInterval, flushTimeout, maxBatchSize, numGo, logger, waitCh) + log.schedule(waitCh) }) - return &Log{ - localLog, - }, nil + return log, nil } func (l *Log) WriteAccessLogEntry(ctx context.Context, record audit.AccessLogEntryMaker) error { @@ -162,49 +183,49 @@ func (mt *mutexTimer) wait() { <-mt.expireCh } -func schedule(db *badgerv4.DB, muTimer *mutexTimer, syncer IngestSyncer, minFlushInterval, flushTimeout time.Duration, maxBatchSize, numGo int, logger *zap.Logger, waitCh chan<- struct{}) { - muTimer.wait() +func (l *Log) schedule(waitCh chan<- struct{}) { + l.muTimer.wait() - if err := streamLogs(db, syncer, maxBatchSize, numGo, flushTimeout); err != nil { + if err := l.streamLogs(); err != nil { var ingestErr ErrIngestBackoff if errors.As(err, &ingestErr) { - logger.Warn("svc-ingest issued backoff", zap.Error(err)) - 
muTimer.set(ingestErr.Backoff) - go schedule(db, muTimer, syncer, minFlushInterval, flushTimeout, maxBatchSize, numGo, logger, waitCh) + l.logger.Warn("svc-ingest issued backoff", zap.Error(err)) + l.muTimer.set(ingestErr.Backoff) + go l.schedule(waitCh) return } - logger.Warn("Failed sync", zap.Error(err)) + l.logger.Warn("Failed sync", zap.Error(err)) } // Set a min wait duration regardless of whether events are pending. // This prevents a retry completion from occurring immediately before the next sync // (and therefore burdening the backend). - muTimer.set(minFlushInterval) + l.muTimer.set(l.minFlushInterval) waitCh <- struct{}{} } -func streamLogs(db *badgerv4.DB, syncer IngestSyncer, maxBatchSize, numGo int, flushTimeout time.Duration) error { +func (l *Log) streamLogs() error { // We use two streams: one for access logs, and one for decision logs, as this allows us to // avoid the penalty of per-key string inspection when inferring the type down the line. ctx := context.Background() p := pool.New().WithContext(ctx).WithCancelOnError().WithFirstError() p.Go(func(ctx context.Context) error { - return streamPrefix(ctx, db, syncer, maxBatchSize, numGo, flushTimeout, logsv1.IngestBatch_ENTRY_KIND_ACCESS_LOG, AccessSyncPrefix) + return l.streamPrefix(ctx, logsv1.IngestBatch_ENTRY_KIND_ACCESS_LOG, AccessSyncPrefix) }) p.Go(func(ctx context.Context) error { - return streamPrefix(ctx, db, syncer, maxBatchSize, numGo, flushTimeout, logsv1.IngestBatch_ENTRY_KIND_DECISION_LOG, DecisionSyncPrefix) + return l.streamPrefix(ctx, logsv1.IngestBatch_ENTRY_KIND_DECISION_LOG, DecisionSyncPrefix) }) return p.Wait() } -func streamPrefix(ctx context.Context, db *badgerv4.DB, syncer IngestSyncer, maxBatchSize, numGo int, flushTimeout time.Duration, kind logsv1.IngestBatch_EntryKind, prefix []byte) error { +func (l *Log) streamPrefix(ctx context.Context, kind logsv1.IngestBatch_EntryKind, prefix []byte) error { // BadgerDB transactions work with snapshot isolation so we only take a view of the DB. // Subsequent writes aren't blocked.
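// Entries written after the stream starts are therefore not part of this snapshot; they are picked up by a subsequent sync pass.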
- stream := db.NewStream() - stream.NumGo = numGo + stream := l.Db.NewStream() + stream.NumGo = l.numGo stream.Prefix = prefix stream.Send = func(buf *z.Buffer) error { @@ -218,13 +239,13 @@ func streamPrefix(ctx context.Context, db *badgerv4.DB, syncer IngestSyncer, max keys[i] = kv.Key } - for i := 0; i < len(keys); i += maxBatchSize { - end := i + maxBatchSize + for i := 0; i < len(keys); i += l.maxBatchSize { + end := i + l.maxBatchSize if end > len(keys) { end = len(keys) } - if err := syncThenDelete(ctx, db, syncer, kind, keys[i:end], flushTimeout); err != nil { + if err := l.syncThenDelete(ctx, kind, keys[i:end]); err != nil { return err } } @@ -235,8 +256,8 @@ func streamPrefix(ctx context.Context, db *badgerv4.DB, syncer IngestSyncer, max return stream.Orchestrate(ctx) } -func syncThenDelete(ctx context.Context, db *badgerv4.DB, syncer IngestSyncer, kind logsv1.IngestBatch_EntryKind, syncKeys [][]byte, flushTimeout time.Duration) error { - entries, err := getIngestBatchEntries(db, syncKeys, kind) +func (l *Log) syncThenDelete(ctx context.Context, kind logsv1.IngestBatch_EntryKind, syncKeys [][]byte) error { + entries, err := l.getIngestBatchEntries(syncKeys, kind) if err != nil { return err } @@ -247,24 +268,33 @@ func syncThenDelete(ctx context.Context, db *badgerv4.DB, syncer IngestSyncer, k } { - ctx, cancelFn := context.WithTimeout(ctx, flushTimeout) + ctx, cancelFn := context.WithTimeout(ctx, l.flushTimeout) defer cancelFn() - if err := syncer.Sync(ctx, &logsv1.IngestBatch{ - Id: string(batchID), - Entries: entries, - }); err != nil { + ingestBatch := &logsv1.IngestBatch{ + Id: string(batchID), + Entries: entries, + } + + if err := l.filter.Filter(ingestBatch); err != nil { return err } + + if err := l.syncer.Sync(ctx, ingestBatch); err != nil { + return err + } } - wb := db.NewWriteBatch() + wb := l.Db.NewWriteBatch() defer wb.Cancel() for _, k := range syncKeys { if err := wb.Delete(k); err != nil { if errors.Is(err, badgerv4.ErrDiscardedTxn) { - wb = db.NewWriteBatch() + wb = l.Db.NewWriteBatch() _ = wb.Delete(k) } else { return err @@ -275,9 +305,9 @@ func syncThenDelete(ctx context.Context, db *badgerv4.DB, syncer IngestSyncer, k return wb.Flush() } -func getIngestBatchEntries(db *badgerv4.DB, syncKeys [][]byte, kind logsv1.IngestBatch_EntryKind) ([]*logsv1.IngestBatch_Entry, error) { +func (l *Log) getIngestBatchEntries(syncKeys [][]byte, kind logsv1.IngestBatch_EntryKind) ([]*logsv1.IngestBatch_Entry, error) { entries := make([]*logsv1.IngestBatch_Entry, len(syncKeys)) - if err := db.Update(func(txn *badgerv4.Txn) error { + if err := l.Db.Update(func(txn *badgerv4.Txn) error { for i, k := range syncKeys { syncItem, err := txn.Get(k) if err != nil { diff --git a/internal/audit/hub/hub_test.go b/internal/audit/hub/hub_test.go index e9c04dfec..6f8b5c790 100644 --- a/internal/audit/hub/hub_test.go +++ b/internal/audit/hub/hub_test.go @@ -39,9 +39,10 @@ const ( type mockSyncer struct { *mocks.IngestSyncer - synced map[string]struct{} - t *testing.T - mu sync.RWMutex + entries []*logsv1.IngestBatch_Entry + synced map[string]struct{} + t *testing.T + mu sync.RWMutex } func newMockSyncer(t *testing.T) *mockSyncer { @@ -49,6 +50,7 @@ func newMockSyncer(t *testing.T) *mockSyncer { return &mockSyncer{ IngestSyncer: mocks.NewIngestSyncer(t), + entries: []*logsv1.IngestBatch_Entry{}, synced: make(map[string]struct{}), t: t, } } func (m *mockSyncer) Sync(ctx context.Context, batch *logsv1.IngestBatch) error { @@ -62,6 +64,8 @@ func (m *mockSyncer) Sync(ctx context.Context, batch *logsv1.IngestBatch) error m.mu.Lock() defer m.mu.Unlock() + m.entries = append(m.entries,
batch.Entries...) + for _, e := range batch.Entries { var key []byte switch e.Kind { @@ -115,6 +119,9 @@ func TestHubLog(t *testing.T) { FlushTimeout: 1 * time.Second, NumGoRoutines: 8, }, + hub.MaskConf{ + Peer: []string{"address"}, + }, local.Conf{ StoragePath: t.TempDir(), RetentionPeriod: 24 * time.Hour, @@ -143,6 +150,9 @@ func TestHubLog(t *testing.T) { purgeKeys := func() { err := db.Db.DropAll() require.NoError(t, err, "failed to purge keys") + + syncer.entries = []*logsv1.IngestBatch_Entry{} + syncer.synced = make(map[string]struct{}) } getLocalKeys := func() [][]byte { @@ -171,7 +181,7 @@ func TestHubLog(t *testing.T) { // we treat each batch separately (hence the /2 -> *2 below). wantNumBatches := int(math.Ceil((numRecords/2)/float64(batchSize))) * 2 - t.Run("insertsAndDeletesKeys", func(t *testing.T) { + t.Run("insertsDeletesKeys", func(t *testing.T) { t.Cleanup(purgeKeys) loadedKeys := loadData(t, db, startDate) @@ -182,6 +192,17 @@ func TestHubLog(t *testing.T) { require.True(t, syncer.hasKeys(loadedKeys), "keys should have been synced") require.Empty(t, getLocalKeys(), "keys should have been deleted") + + t.Run("filter", func(t *testing.T) { + for _, e := range syncer.entries { + switch v := e.Entry.(type) { + case *logsv1.IngestBatch_Entry_AccessLogEntry: + require.Zero(t, v.AccessLogEntry.Peer.Address) + case *logsv1.IngestBatch_Entry_DecisionLogEntry: + require.Zero(t, v.DecisionLogEntry.Peer.Address) + } + } + }) }) t.Run("partiallyDeletesBeforeError", func(t *testing.T) { @@ -273,6 +294,9 @@ func mkDecisionLogEntry(t *testing.T, id audit.ID, i int, ts time.Time) audit.De return &auditv1.DecisionLogEntry{ CallId: string(id), Timestamp: timestamppb.New(ts), + Peer: &auditv1.Peer{ + Address: "1.1.1.1", + }, Inputs: []*enginev1.CheckInput{ { RequestId: strconv.Itoa(i),
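For reference, a minimal usage sketch of the new filter (hypothetical, not part of this patch): the mask values and the single-entry batch below are invented for illustration, and since `hub` lives under `internal/`, code like this can only sit in the package's own tests, as `filter_test.go` does.

```
package hub_test

import (
	"fmt"

	auditv1 "github.com/cerbos/cerbos/api/genpb/cerbos/audit/v1"
	"github.com/cerbos/cerbos/internal/audit/hub"
	logsv1 "github.com/cerbos/cloud-api/genpb/cerbos/cloud/logs/v1"
)

func ExampleNewAuditLogFilter() {
	// Each rule is parsed into the shared token tree; unparseable rules are
	// collected and returned as a combined error.
	masker, err := hub.NewAuditLogFilter(hub.MaskConf{
		Peer:           []string{"address"},           // cleared on every entry kind
		Metadata:       []string{"authorization"},     // removes a single map key
		CheckResources: []string{"inputs[*].auxData"}, // wildcard over the inputs list
	})
	if err != nil {
		panic(err)
	}

	batch := &logsv1.IngestBatch{
		Id: "example",
		Entries: []*logsv1.IngestBatch_Entry{{
			Kind: logsv1.IngestBatch_ENTRY_KIND_ACCESS_LOG,
			Entry: &logsv1.IngestBatch_Entry_AccessLogEntry{
				AccessLogEntry: &auditv1.AccessLogEntry{
					CallId: "1",
					Peer:   &auditv1.Peer{Address: "1.1.1.1", UserAgent: "curl/7.68.0"},
				},
			},
		}},
	}

	// Filter mutates the batch in place before it is handed to the syncer.
	if err := masker.Filter(batch); err != nil {
		panic(err)
	}

	// The peer address is cleared; the rest of the entry is untouched.
	fmt.Println(batch.Entries[0].GetAccessLogEntry().GetPeer().GetAddress() == "")
	// Output: true
}
```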