Skip to content

Commit

Permalink
chore: Fix schema cache and blob store timeout (#502)
Browse files Browse the repository at this point in the history
- Blob store clone was always failing due to a potential issue with the
  upstream library
- Schema cache was using the wrong key and not updating itself correctly
  while processing storage events

Signed-off-by: Charith Ellawala <charith@cerbos.dev>
  • Loading branch information
charithe committed Dec 14, 2021
1 parent 34a7a85 commit 60bd517
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 45 deletions.
17 changes: 0 additions & 17 deletions internal/policy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ package policy
import (
"fmt"

jsonschema "github.com/santhosh-tekuri/jsonschema/v5"

policyv1 "github.com/cerbos/cerbos/api/genpb/cerbos/policy/v1"
"github.com/cerbos/cerbos/internal/namer"
)
Expand Down Expand Up @@ -180,7 +178,6 @@ func Wrap(p *policyv1.Policy) Wrapper {
type CompilationUnit struct {
ModID namer.ModuleID
Definitions map[namer.ModuleID]*policyv1.Policy
Schemas map[string]*jsonschema.Schema
}

func (cu *CompilationUnit) AddDefinition(id namer.ModuleID, p *policyv1.Policy) {
Expand All @@ -191,20 +188,6 @@ func (cu *CompilationUnit) AddDefinition(id namer.ModuleID, p *policyv1.Policy)
cu.Definitions[id] = p
}

func (cu *CompilationUnit) AddSchemas(schemas map[string]*jsonschema.Schema) {
if len(schemas) == 0 {
return
}

if cu.Schemas == nil {
cu.Schemas = make(map[string]*jsonschema.Schema, len(schemas))
}

for id, def := range schemas {
cu.Schemas[id] = def
}
}

func (cu *CompilationUnit) MainSourceFile() string {
return GetSourceFile(cu.Definitions[cu.ModID])
}
Expand Down
4 changes: 2 additions & 2 deletions internal/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,11 +223,11 @@ func (m *manager) OnStorageEvent(events ...storage.Event) {
//nolint:exhaustive
switch event.Kind {
case storage.EventAddOrUpdateSchema:
cacheKey := fmt.Sprintf("%s/%s", URLScheme, event.SchemaFile)
cacheKey := fmt.Sprintf("%s:///%s", URLScheme, event.SchemaFile)
_ = m.cache.Remove(cacheKey)
m.log.Debug("Handled schema add/update event", zap.String("schema", cacheKey))
case storage.EventDeleteSchema:
cacheKey := fmt.Sprintf("%s/%s", URLScheme, event.SchemaFile)
cacheKey := fmt.Sprintf("%s:///%s", URLScheme, event.SchemaFile)
_ = m.cache.Remove(cacheKey)
m.log.Warn("Handled schema delete event", zap.String("schema", cacheKey))
}
Expand Down
65 changes: 41 additions & 24 deletions internal/schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@ import (
"fmt"
"os"
"path/filepath"
"strings"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/spf13/afero"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/testing/protocmp"

policyv1 "github.com/cerbos/cerbos/api/genpb/cerbos/policy/v1"
privatev1 "github.com/cerbos/cerbos/api/genpb/cerbos/private/v1"
schemav1 "github.com/cerbos/cerbos/api/genpb/cerbos/schema/v1"
"github.com/cerbos/cerbos/internal/schema"
Expand Down Expand Up @@ -144,40 +143,58 @@ func TestValidate(t *testing.T) {
}

func TestCache(t *testing.T) {
store := mkStore(t)
fsDir := test.PathToDir(t, filepath.Join("schema", "fs"))
fsys := afero.NewCopyOnWriteFs(afero.FromIOFS{FS: os.DirFS(fsDir)}, afero.NewMemMapFs())

index, err := index.Build(context.Background(), afero.NewIOFS(fsys))
require.NoError(t, err)

store := disk.NewFromIndexWithConf(index, &disk.Conf{})
conf := &schema.Conf{Enforcement: schema.EnforcementReject}
mgr := schema.NewWithConf(context.Background(), store, conf)

s, ok := mgr.(storage.Subscriber)
require.True(t, ok)

tc := &privatev1.SchemaTestCase{}
test.ReadSingleTestCase(t, filepath.Join("schema", "test_cases", "case_00.yaml"), tc)
// stash the schema contents for later use
schemaBytes, err := afero.ReadFile(fsys, filepath.Join(schema.Directory, "complex_object.json"))
require.NoError(t, err)

checkValid := func(t *testing.T) {
t.Helper()
t.Run("change_contents", func(t *testing.T) {
schemaFile := filepath.Join(schema.Directory, "complex_object.json")
schemaURL := fmt.Sprintf("%s:///complex_object.json", schema.URLScheme)

have, err := mgr.Validate(context.Background(), tc.SchemaRefs, tc.Input)
require.NoError(t, err)
require.Empty(t, have.Errors)
}
// control test (everything is as it should be)
require.NoError(t, mgr.CheckSchema(context.Background(), schemaURL))

t.Run("delete_schema", func(t *testing.T) {
s.OnStorageEvent(genEvents(storage.EventDeleteSchema, tc.SchemaRefs)...)
checkValid(t)
})
// write rubbish to file
require.NoError(t, afero.WriteFile(fsys, schemaFile, []byte("blah"), 0o644))
s.OnStorageEvent(storage.Event{Kind: storage.EventAddOrUpdateSchema, SchemaFile: "complex_object.json"})
require.Error(t, mgr.CheckSchema(context.Background(), schemaURL))

t.Run("add_schema", func(t *testing.T) {
s.OnStorageEvent(genEvents(storage.EventAddOrUpdateSchema, tc.SchemaRefs)...)
checkValid(t)
// reset
require.NoError(t, afero.WriteFile(fsys, schemaFile, schemaBytes, 0o644))
s.OnStorageEvent(storage.Event{Kind: storage.EventAddOrUpdateSchema, SchemaFile: "complex_object.json"})
require.NoError(t, mgr.CheckSchema(context.Background(), schemaURL))
})
}

func genEvents(kind storage.EventKind, schemas *policyv1.Schemas) []storage.Event {
return []storage.Event{
{Kind: kind, SchemaFile: strings.TrimPrefix(schemas.PrincipalSchema.Ref, schema.URLScheme+"/")},
{Kind: kind, SchemaFile: strings.TrimPrefix(schemas.ResourceSchema.Ref, schema.URLScheme+"/")},
}
t.Run("add_and_delete", func(t *testing.T) {
schemaFile := filepath.Join(schema.Directory, "wibble.json")
schemaURL := fmt.Sprintf("%s:///wibble.json", schema.URLScheme)

// control test
require.Error(t, mgr.CheckSchema(context.Background(), schemaURL))

// add file
require.NoError(t, afero.WriteFile(fsys, schemaFile, schemaBytes, 0o644))
s.OnStorageEvent(storage.Event{Kind: storage.EventAddOrUpdateSchema, SchemaFile: "wibble.json"})
require.NoError(t, mgr.CheckSchema(context.Background(), schemaURL))

// delete file
require.NoError(t, fsys.Remove(schemaFile))
s.OnStorageEvent(storage.Event{Kind: storage.EventDeleteSchema, SchemaFile: "wibble.json"})
require.Error(t, mgr.CheckSchema(context.Background(), schemaURL))
})
}

func readTestCase(t *testing.T, data []byte) *privatev1.SchemaTestCase {
Expand Down
3 changes: 2 additions & 1 deletion internal/storage/blob/cloner.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ func (c *Cloner) downloadToFile(ctx context.Context, key, file string) (err erro
if err != nil {
return fmt.Errorf("failed to create a reader for the object %q: %w", key, err)
}
defer multierr.AppendInvoke(&err, multierr.Close(r))
// defer multierr.AppendInvoke(&err, multierr.Close(r))
defer r.Close()

if _, err = io.Copy(fd, r); err != nil {
return fmt.Errorf("failed to read the object %q: %w", key, err)
Expand Down
5 changes: 4 additions & 1 deletion internal/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,22 +132,25 @@ type Event struct {

func (evt Event) String() string {
kind := ""
id := evt.PolicyID.String()
switch evt.Kind {
case EventAddOrUpdatePolicy:
kind = "ADD/UPDATE"
case EventDeletePolicy:
kind = "DELETE"
case EventAddOrUpdateSchema:
kind = "ADD/UPDATE SCHEMA"
id = evt.SchemaFile
case EventDeleteSchema:
kind = "DELETE SCHEMA"
id = evt.SchemaFile
case EventNop:
kind = "NOP"
default:
kind = "UNKNOWN"
}

return fmt.Sprintf("%s [%s]", kind, evt.PolicyID.String())
return fmt.Sprintf("%s [%s]", kind, id)
}

// NewPolicyEvent creates a new storage event for a policy.
Expand Down

0 comments on commit 60bd517

Please sign in to comment.