Skip to content

Commit

Permalink
Update handling of datastore Close to disconnect connections and chan…
Browse files Browse the repository at this point in the history
…ge to use an errgroup to clean up Postgres GC worker

Signed-off-by: Joseph Schorr <josephschorr@users.noreply.github.com>
  • Loading branch information
josephschorr committed Oct 6, 2021
1 parent 21e1b85 commit 5ced015
Show file tree
Hide file tree
Showing 18 changed files with 106 additions and 42 deletions.
4 changes: 4 additions & 0 deletions cmd/spicedb/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,10 @@ func serveRun(cmd *cobra.Command, args []string) {
log.Fatal().Err(err).Msg("failed while shutting down metrics server")
}

if err := ds.Close(); err != nil {
log.Fatal().Err(err).Msg("failed while shutting down datastore")
}

if dashboardAddr != "" {
if err := dashboard.Close(); err != nil {
log.Fatal().Err(err).Msg("failed while shutting down dashboard")
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ require (
go.opentelemetry.io/otel v1.0.0
go.opentelemetry.io/otel/exporters/jaeger v1.0.0 // indirect
go.opentelemetry.io/otel/trace v1.0.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20210921065528-437939a70204 // indirect
golang.org/x/tools v0.1.6 // indirect
google.golang.org/genproto v0.0.0-20210909211513-a8c4777a87af
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
5 changes: 3 additions & 2 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,9 @@ func (cds *crdbDatastore) IsReady(ctx context.Context) (bool, error) {
return version == headMigration, nil
}

func (cds *crdbDatastore) Dispose() {
// Nothing to do.
func (cds *crdbDatastore) Close() error {
cds.conn.Close()
return nil
}

func (cds *crdbDatastore) Revision(ctx context.Context) (datastore.Revision, error) {
Expand Down
4 changes: 2 additions & 2 deletions internal/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ type Datastore interface {
// the necessary tables.
IsReady(ctx context.Context) (bool, error)

// Dispose disposes of the data store.
Dispose()
// Close closes the data store.
Close() error
}

// GraphDatastore is a subset of the datastore interface that is passed to
Expand Down
5 changes: 3 additions & 2 deletions internal/datastore/memdb/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ func revisionFromVersion(version uint64) datastore.Revision {
return decimal.NewFromInt(int64(version))
}

func (mds *memdbDatastore) Dispose() {
// Nothing to do.
func (mds *memdbDatastore) Close() error {
mds.db = nil
return nil
}
16 changes: 16 additions & 0 deletions internal/datastore/memdb/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ const (
)

func (mds *memdbDatastore) WriteNamespace(ctx context.Context, newConfig *v0.NamespaceDefinition) (datastore.Revision, error) {
if mds.db == nil {
return datastore.NoRevision, fmt.Errorf("memdb closed")
}

txn := mds.db.Txn(true)
defer txn.Abort()

Expand Down Expand Up @@ -75,6 +79,10 @@ func (mds *memdbDatastore) WriteNamespace(ctx context.Context, newConfig *v0.Nam

// ReadNamespace reads a namespace definition and version and returns it if found.
func (mds *memdbDatastore) ReadNamespace(ctx context.Context, nsName string) (*v0.NamespaceDefinition, datastore.Revision, error) {
if mds.db == nil {
return nil, datastore.NoRevision, fmt.Errorf("memdb closed")
}

txn := mds.db.Txn(false)
defer txn.Abort()

Expand All @@ -99,6 +107,10 @@ func (mds *memdbDatastore) ReadNamespace(ctx context.Context, nsName string) (*v
}

func (mds *memdbDatastore) DeleteNamespace(ctx context.Context, nsName string) (datastore.Revision, error) {
if mds.db == nil {
return datastore.NoRevision, fmt.Errorf("memdb closed")
}

txn := mds.db.Txn(true)
defer txn.Abort()

Expand Down Expand Up @@ -153,6 +165,10 @@ func (mds *memdbDatastore) DeleteNamespace(ctx context.Context, nsName string) (
}

func (mds *memdbDatastore) ListNamespaces(ctx context.Context) ([]*v0.NamespaceDefinition, error) {
if mds.db == nil {
return nil, fmt.Errorf("memdb closed")
}

var nsDefs []*v0.NamespaceDefinition

txn := mds.db.Txn(false)
Expand Down
4 changes: 4 additions & 0 deletions internal/datastore/memdb/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ func iteratorForFilter(txn *memdb.Txn, filter *v1.RelationshipFilter) (memdb.Res
}

func (mtq memdbTupleQuery) Execute(ctx context.Context) (datastore.TupleIterator, error) {
if mtq.db == nil {
return nil, fmt.Errorf("memdb closed")
}

txn := mtq.db.Txn(false)

time.Sleep(mtq.simulatedLatency)
Expand Down
16 changes: 16 additions & 0 deletions internal/datastore/memdb/tuple.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func (mds *memdbDatastore) checkPrecondition(txn *memdb.Txn, preconditions []*v1
}

func (mds *memdbDatastore) WriteTuples(ctx context.Context, preconditions []*v1.Precondition, mutations []*v1.RelationshipUpdate) (datastore.Revision, error) {
if mds.db == nil {
return datastore.NoRevision, fmt.Errorf("memdb closed")
}

txn := mds.db.Txn(true)
defer txn.Abort()

Expand Down Expand Up @@ -133,6 +137,10 @@ func (mds *memdbDatastore) write(ctx context.Context, txn *memdb.Txn, mutations
}

func (mds *memdbDatastore) DeleteRelationships(ctx context.Context, preconditions []*v1.Precondition, filter *v1.RelationshipFilter) (datastore.Revision, error) {
if mds.db == nil {
return datastore.NoRevision, fmt.Errorf("memdb closed")
}

txn := mds.db.Txn(true)
defer txn.Abort()

Expand Down Expand Up @@ -213,6 +221,10 @@ func (mds *memdbDatastore) ReverseQueryTuplesFromSubjectNamespace(subjectNamespa
}

func (mds *memdbDatastore) SyncRevision(ctx context.Context) (datastore.Revision, error) {
if mds.db == nil {
return datastore.NoRevision, fmt.Errorf("memdb closed")
}

// Compute the current revision
txn := mds.db.Txn(false)
defer txn.Abort()
Expand All @@ -228,6 +240,10 @@ func (mds *memdbDatastore) SyncRevision(ctx context.Context) (datastore.Revision
}

func (mds *memdbDatastore) Revision(ctx context.Context) (datastore.Revision, error) {
if mds.db == nil {
return datastore.NoRevision, fmt.Errorf("memdb closed")
}

txn := mds.db.Txn(false)
defer txn.Abort()

Expand Down
45 changes: 28 additions & 17 deletions internal/datastore/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
dbsql "database/sql"
"fmt"
"math/rand"
"sync"
"time"

"golang.org/x/sync/errgroup"

sq "github.com/Masterminds/squirrel"
"github.com/alecthomas/units"
"github.com/jackc/pgtype"
Expand Down Expand Up @@ -145,6 +146,8 @@ func NewPostgresDatastore(
}
}

gcCtx, cancelGc := context.WithCancel(context.Background())

datastore := &pgDatastore{
dburl: url,
dbpool: dbpool,
Expand All @@ -154,13 +157,14 @@ func NewPostgresDatastore(
gcInterval: config.gcInterval,
gcMaxOperationTime: config.gcMaxOperationTime,
splitAtEstimatedQuerySize: config.splitAtEstimatedQuerySize,
disposed: make(chan struct{}),
gcCtx: gcCtx,
cancelGc: cancelGc,
}

// Start a goroutine for garbage collection.
if datastore.gcInterval > 0*time.Minute {
datastore.wg.Add(1)
go datastore.runGarbageCollector()
datastore.gcGroup, datastore.gcCtx = errgroup.WithContext(datastore.gcCtx)
datastore.gcGroup.Go(datastore.runGarbageCollector)
} else {
log.Warn().Msg("garbage collection disabled in postgres driver")
}
Expand All @@ -178,19 +182,32 @@ type pgDatastore struct {
gcMaxOperationTime time.Duration
splitAtEstimatedQuerySize units.Base2Bytes

disposed chan struct{}
wg sync.WaitGroup
gcGroup *errgroup.Group
gcCtx context.Context
cancelGc context.CancelFunc
}

func (pgd *pgDatastore) runGarbageCollector() {
func (pgd *pgDatastore) Close() error {
pgd.cancelGc()

if pgd.gcGroup != nil {
err := pgd.gcGroup.Wait()
log.Warn().Err(err).Msg("completed shutdown of postgres datastore")
}

pgd.dbpool.Close()
return nil
}

func (pgd *pgDatastore) runGarbageCollector() error {
log.Info().Dur("interval", pgd.gcInterval).Msg("garbage collection worker started for postgres driver")

defer pgd.wg.Done()
for {
select {
case <-pgd.disposed:
log.Info().Msg("shutting down postgres GC")
return
case <-pgd.gcCtx.Done():
log.Info().Msg("shutting down garbage collection worker for postgres driver")
return pgd.gcCtx.Err()

case <-time.After(pgd.gcInterval):
err := pgd.collectGarbage()
if err != nil {
Expand Down Expand Up @@ -354,12 +371,6 @@ func (pgd *pgDatastore) IsReady(ctx context.Context) (bool, error) {
return version == headMigration, nil
}

func (pgd *pgDatastore) Dispose() {
// Close the channel to ensure the GC task is shut down.
close(pgd.disposed)
pgd.wg.Wait()
}

func (pgd *pgDatastore) SyncRevision(ctx context.Context) (datastore.Revision, error) {
ctx, span := tracer.Start(ctx, "SyncRevision")
defer span.End()
Expand Down
6 changes: 3 additions & 3 deletions internal/datastore/postgres/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestPostgresGarbageCollection(t *testing.T) {

ds, err := tester.New(0, time.Millisecond*1, 1)
require.NoError(err)
defer ds.Dispose()
defer ds.Close()

ctx := context.Background()
ok, err := ds.IsReady(ctx)
Expand Down Expand Up @@ -273,7 +273,7 @@ func TestPostgresGarbageCollectionByTime(t *testing.T) {

ds, err := tester.New(0, time.Millisecond*1, 1)
require.NoError(err)
defer ds.Dispose()
defer ds.Close()

ctx := context.Background()
ok, err := ds.IsReady(ctx)
Expand Down Expand Up @@ -370,7 +370,7 @@ func TestPostgresChunkedGarbageCollection(t *testing.T) {

ds, err := tester.New(0, time.Millisecond*1, 1)
require.NoError(err)
defer ds.Dispose()
defer ds.Close()

ctx := context.Background()
ok, err := ds.IsReady(ctx)
Expand Down
4 changes: 2 additions & 2 deletions internal/datastore/proxy/mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func NewMappingProxy(delegate datastore.Datastore, mapper namespace.Mapper, watc
return mappingProxy{delegate, mapper, watchBufferLength}
}

func (mp mappingProxy) Dispose() {
mp.delegate.Dispose()
func (mp mappingProxy) Close() error {
return mp.delegate.Close()
}

func (mp mappingProxy) IsReady(ctx context.Context) (bool, error) {
Expand Down
4 changes: 2 additions & 2 deletions internal/datastore/proxy/readonly.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ func NewReadonlyDatastore(delegate datastore.Datastore) datastore.Datastore {
return roDatastore{delegate: delegate}
}

func (rd roDatastore) Dispose() {
rd.delegate.Dispose()
func (rd roDatastore) Close() error {
return rd.delegate.Close()
}

func (rd roDatastore) IsReady(ctx context.Context) (bool, error) {
Expand Down
5 changes: 3 additions & 2 deletions internal/datastore/proxy/readonly_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ func (dm *delegateMock) IsReady(ctx context.Context) (bool, error) {
return args.Bool(0), args.Error(1)
}

func (dm *delegateMock) Dispose() {
dm.Called()
func (dm *delegateMock) Close() error {
args := dm.Called()
return args.Error(0)
}
12 changes: 6 additions & 6 deletions internal/datastore/test/tuples.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func SimpleTest(t *testing.T, tester DatastoreTester) {

ds, err := tester.New(0, veryLargeGCWindow, 1)
require.NoError(err)
defer ds.Dispose()
defer ds.Close()

ctx := context.Background()

Expand Down Expand Up @@ -248,7 +248,7 @@ func WritePreconditionsTest(t *testing.T, tester DatastoreTester) {

ds, err := tester.New(0, veryLargeGCWindow, 1)
require.NoError(err)
defer ds.Dispose()
defer ds.Close()

setupDatastore(ds, require)

Expand Down Expand Up @@ -297,7 +297,7 @@ func DeletePreconditionsTest(t *testing.T, tester DatastoreTester) {

ds, err := tester.New(0, veryLargeGCWindow, 1)
require.NoError(err)
defer ds.Dispose()
defer ds.Close()

setupDatastore(ds, require)

Expand Down Expand Up @@ -405,7 +405,7 @@ func DeleteRelationshipsTest(t *testing.T, tester DatastoreTester) {

ds, err := tester.New(0, veryLargeGCWindow, 1)
require.NoError(err)
defer ds.Dispose()
defer ds.Close()

setupDatastore(ds, require)

Expand Down Expand Up @@ -446,7 +446,7 @@ func InvalidReadsTest(t *testing.T, tester DatastoreTester) {

ds, err := tester.New(0, testGCDuration, 1)
require.NoError(err)
defer ds.Dispose()
defer ds.Close()

setupDatastore(ds, require)

Expand Down Expand Up @@ -515,7 +515,7 @@ func UsersetsTest(t *testing.T, tester DatastoreTester) {

ds, err := tester.New(0, veryLargeGCWindow, 1)
require.NoError(err)
defer ds.Dispose()
defer ds.Close()

setupDatastore(ds, require)

Expand Down
1 change: 1 addition & 0 deletions internal/services/consistency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func TestConsistency(t *testing.T) {
lrequire.NoError(err)

ds := testfixtures.NewValidatingDatastore(unvalidated)
// defer ds.Close()

fullyResolved, revision, err := validationfile.PopulateFromFiles(ds, []string{filePath})
lrequire.NoError(err)
Expand Down

0 comments on commit 5ced015

Please sign in to comment.