Skip to content

Commit

Permalink
Add Dispose method on datastore in prep for GC worker for postgres
Browse files Browse the repository at this point in the history
Signed-off-by: Joseph Schorr <josephschorr@users.noreply.github.com>
  • Loading branch information
josephschorr committed Oct 5, 2021
1 parent 8931d76 commit b906977
Show file tree
Hide file tree
Showing 12 changed files with 56 additions and 0 deletions.
2 changes: 2 additions & 0 deletions cmd/spicedb/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func registerServeCmd(rootCmd *cobra.Command) {
serveCmd.Flags().Duration("datastore-conn-max-idletime", 30*time.Minute, "maximum amount of time a connection can idle in a remote datastore's connection pool")
serveCmd.Flags().Duration("datastore-conn-healthcheck-interval", 30*time.Second, "time between a remote datastore's connection pool health checks")
serveCmd.Flags().Duration("datastore-gc-window", 24*time.Hour, "amount of time before revisions are garbage collected")
serveCmd.Flags().Duration("datastore-gc-interval", 3*time.Minute, "amount of time between passes of garbage collection (postgres driver only)")
serveCmd.Flags().Duration("datastore-revision-fuzzing-duration", 5*time.Second, "amount of time to advertize stale revisions")
serveCmd.Flags().String("datastore-query-split-size", common.DefaultSplitAtEstimatedQuerySize.String(), "estimated number of bytes at which a query is split when using a remote datastore")
serveCmd.Flags().StringSlice("datastore-bootstrap-files", []string{}, "bootstrap data yaml files to load")
Expand Down Expand Up @@ -169,6 +170,7 @@ func serveRun(cmd *cobra.Command, args []string) {
postgres.MaxOpenConns(cobrautil.MustGetInt(cmd, "datastore-conn-max-open")),
postgres.MinOpenConns(cobrautil.MustGetInt(cmd, "datastore-conn-min-open")),
postgres.RevisionFuzzingTimedelta(revisionFuzzingTimedelta),
postgres.GCInterval(cobrautil.MustGetDuration(cmd, "datastore-gc-interval")),
postgres.GCWindow(gcWindow),
postgres.EnablePrometheusStats(),
postgres.EnableTracing(),
Expand Down
4 changes: 4 additions & 0 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ func (cds *crdbDatastore) IsReady(ctx context.Context) (bool, error) {
return version == headMigration, nil
}

func (cds *crdbDatastore) Dispose() {
// Nothing to do.
}

func (cds *crdbDatastore) Revision(ctx context.Context) (datastore.Revision, error) {
ctx, span := tracer.Start(ctx, "Revision")
defer span.End()
Expand Down
3 changes: 3 additions & 0 deletions internal/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ type Datastore interface {
// database schema creation will return false until the migrations have been run to create
// the necessary tables.
IsReady(ctx context.Context) (bool, error)

// Dispose disposes of the data store.
Dispose()
}

// GraphDatastore is a subset of the datastore interface that is passed to
Expand Down
4 changes: 4 additions & 0 deletions internal/datastore/memdb/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,3 +355,7 @@ func (mds *memdbDatastore) IsReady(ctx context.Context) (bool, error) {
func revisionFromVersion(version uint64) datastore.Revision {
return decimal.NewFromInt(int64(version))
}

func (mdb *memdbDatastore) Dispose() {
// Nothing to do.
}
4 changes: 4 additions & 0 deletions internal/datastore/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ func (pgd *pgDatastore) IsReady(ctx context.Context) (bool, error) {
return version == headMigration, nil
}

func (pgd *pgDatastore) Dispose() {
// Close the channel to ensure the GC task is shut down.
}

func (pgd *pgDatastore) SyncRevision(ctx context.Context) (datastore.Revision, error) {
ctx, span := tracer.Start(ctx, "SyncRevision")
defer span.End()
Expand Down
4 changes: 4 additions & 0 deletions internal/datastore/proxy/mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ func NewMappingProxy(delegate datastore.Datastore, mapper namespace.Mapper, watc
return mappingProxy{delegate, mapper, watchBufferLength}
}

func (mp mappingProxy) Dispose() {
mp.delegate.Dispose()
}

func (mp mappingProxy) IsReady(ctx context.Context) (bool, error) {
return mp.delegate.IsReady(ctx)
}
Expand Down
4 changes: 4 additions & 0 deletions internal/datastore/proxy/readonly.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ func NewReadonlyDatastore(delegate datastore.Datastore) datastore.Datastore {
return roDatastore{delegate: delegate}
}

func (rd roDatastore) Dispose() {
rd.delegate.Dispose()
}

func (rd roDatastore) IsReady(ctx context.Context) (bool, error) {
return rd.delegate.IsReady(ctx)
}
Expand Down
4 changes: 4 additions & 0 deletions internal/datastore/proxy/readonly_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,3 +280,7 @@ func (dm *delegateMock) IsReady(ctx context.Context) (bool, error) {
args := dm.Called()
return args.Bool(0), args.Error(1)
}

func (dm *delegateMock) Dispose() {
dm.Called()
}
6 changes: 6 additions & 0 deletions internal/datastore/test/tuples.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func SimpleTest(t *testing.T, tester DatastoreTester) {

ds, err := tester.New(0, veryLargeGCWindow, 1)
require.NoError(err)
defer ds.Dispose()

ctx := context.Background()

Expand Down Expand Up @@ -244,6 +245,7 @@ func WritePreconditionsTest(t *testing.T, tester DatastoreTester) {

ds, err := tester.New(0, veryLargeGCWindow, 1)
require.NoError(err)
defer ds.Dispose()

setupDatastore(ds, require)

Expand Down Expand Up @@ -292,6 +294,7 @@ func DeletePreconditionsTest(t *testing.T, tester DatastoreTester) {

ds, err := tester.New(0, veryLargeGCWindow, 1)
require.NoError(err)
defer ds.Dispose()

setupDatastore(ds, require)

Expand Down Expand Up @@ -399,6 +402,7 @@ func DeleteRelationshipsTest(t *testing.T, tester DatastoreTester) {

ds, err := tester.New(0, veryLargeGCWindow, 1)
require.NoError(err)
defer ds.Dispose()

setupDatastore(ds, require)

Expand Down Expand Up @@ -439,6 +443,7 @@ func InvalidReadsTest(t *testing.T, tester DatastoreTester) {

ds, err := tester.New(0, testGCDuration, 1)
require.NoError(err)
defer ds.Dispose()

setupDatastore(ds, require)

Expand Down Expand Up @@ -507,6 +512,7 @@ func UsersetsTest(t *testing.T, tester DatastoreTester) {

ds, err := tester.New(0, veryLargeGCWindow, 1)
require.NoError(err)
defer ds.Dispose()

setupDatastore(ds, require)

Expand Down
15 changes: 15 additions & 0 deletions internal/services/v0/devcontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ func NewDevContext(ctx context.Context, requestContext *v0.RequestContext) (*Dev
return nil, false, err
}

dctx, ok, err := newDevContext(ctx, requestContext, ds)
if !ok || err != nil {
ds.Dispose()
}
return dctx, ok, err
}

func newDevContext(ctx context.Context, requestContext *v0.RequestContext, ds datastore.Datastore) (*DevContext, bool, error) {
nsm, err := namespace.NewCachingNamespaceManager(ds, 0*time.Second, nil)
if err != nil {
return nil, false, err
Expand Down Expand Up @@ -93,6 +101,13 @@ func NewDevContext(ctx context.Context, requestContext *v0.RequestContext) (*Dev
}, len(requestErrors) == 0, nil
}

func (dc *DevContext) dispose() {
datastore := dc.Datastore
if datastore != nil {
datastore.Dispose()
}
}

func compile(schema string) ([]*v0.NamespaceDefinition, *v0.DeveloperError, error) {
empty := ""
namespaces, err := compiler.Compile([]compiler.InputSchema{
Expand Down
2 changes: 2 additions & 0 deletions internal/services/v0/developer.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func (ds *devServer) EditCheck(ctx context.Context, req *v0.EditCheckRequest) (*
RequestErrors: devContext.RequestErrors,
}, nil
}
defer devContext.dispose()

// Run the checks and store their output.
var results []*v0.EditCheckResult
Expand Down Expand Up @@ -199,6 +200,7 @@ func (ds *devServer) Validate(ctx context.Context, req *v0.ValidateRequest) (*v0
RequestErrors: devContext.RequestErrors,
}, nil
}
defer devContext.dispose()

// Parse the validation YAML.
validation, err := validationfile.ParseValidationBlock([]byte(req.ValidationYaml))
Expand Down
4 changes: 4 additions & 0 deletions internal/testfixtures/validating.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ func NewValidatingDatastore(delegate datastore.Datastore) datastore.Datastore {
return validatingDatastore{delegate: delegate}
}

func (vd validatingDatastore) Dispose() {
vd.Dispose()
}

func (vd validatingDatastore) IsReady(ctx context.Context) (bool, error) {
return vd.delegate.IsReady(ctx)
}
Expand Down

0 comments on commit b906977

Please sign in to comment.