Skip to content

Commit

Permalink
Merge pull request #168 from authzed/postgres-gc
Browse files Browse the repository at this point in the history
Garbage collection support for Postgres
  • Loading branch information
josephschorr committed Oct 6, 2021
2 parents 713a97c + 5ced015 commit ddeee97
Show file tree
Hide file tree
Showing 23 changed files with 754 additions and 7 deletions.
2 changes: 1 addition & 1 deletion cmd/spicedb/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func newRootCmd() *cobra.Command {
--grpc-key-path path/to/tls/key --datastore-engine postgres
--datastore-conn-uri "postgres-connection-string-here"
%s:
spicedb serve-testing"
spicedb serve-testing
`,
color.YellowString("No TLS and in-memory"),
color.GreenString("TLS and a real datastore"),
Expand Down
8 changes: 8 additions & 0 deletions cmd/spicedb/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ func registerServeCmd(rootCmd *cobra.Command) {
serveCmd.Flags().Duration("datastore-conn-max-idletime", 30*time.Minute, "maximum amount of time a connection can idle in a remote datastore's connection pool")
serveCmd.Flags().Duration("datastore-conn-healthcheck-interval", 30*time.Second, "time between a remote datastore's connection pool health checks")
serveCmd.Flags().Duration("datastore-gc-window", 24*time.Hour, "amount of time before revisions are garbage collected")
serveCmd.Flags().Duration("datastore-gc-interval", 3*time.Minute, "amount of time between passes of garbage collection (postgres driver only)")
serveCmd.Flags().Duration("datastore-gc-max-operation-time", 1*time.Minute, "maximum amount of time a garbage collection pass can operate before timing out (postgres driver only)")
serveCmd.Flags().Duration("datastore-revision-fuzzing-duration", 5*time.Second, "amount of time to advertize stale revisions")
serveCmd.Flags().String("datastore-query-split-size", common.DefaultSplitAtEstimatedQuerySize.String(), "estimated number of bytes at which a query is split when using a remote datastore")
serveCmd.Flags().StringSlice("datastore-bootstrap-files", []string{}, "bootstrap data yaml files to load")
Expand Down Expand Up @@ -169,6 +171,8 @@ func serveRun(cmd *cobra.Command, args []string) {
postgres.MaxOpenConns(cobrautil.MustGetInt(cmd, "datastore-conn-max-open")),
postgres.MinOpenConns(cobrautil.MustGetInt(cmd, "datastore-conn-min-open")),
postgres.RevisionFuzzingTimedelta(revisionFuzzingTimedelta),
postgres.GCInterval(cobrautil.MustGetDuration(cmd, "datastore-gc-interval")),
postgres.GCMaxOperationTime(cobrautil.MustGetDuration(cmd, "datastore-gc-max-operation-time")),
postgres.GCWindow(gcWindow),
postgres.EnablePrometheusStats(),
postgres.EnableTracing(),
Expand Down Expand Up @@ -394,6 +398,10 @@ func serveRun(cmd *cobra.Command, args []string) {
log.Fatal().Err(err).Msg("failed while shutting down metrics server")
}

if err := ds.Close(); err != nil {
log.Fatal().Err(err).Msg("failed while shutting down datastore")
}

if dashboardAddr != "" {
if err := dashboard.Close(); err != nil {
log.Fatal().Err(err).Msg("failed while shutting down dashboard")
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/hashicorp/go-memdb v1.3.2
github.com/jackc/pgconn v1.10.0
github.com/jackc/pgtype v1.8.1
github.com/jackc/pgx/v4 v4.13.0
github.com/jmoiron/sqlx v1.3.4
github.com/johannesboyne/gofakes3 v0.0.0-20210608054100-92d5d4af5fde
Expand All @@ -52,6 +53,7 @@ require (
go.opentelemetry.io/otel v1.0.0
go.opentelemetry.io/otel/exporters/jaeger v1.0.0 // indirect
go.opentelemetry.io/otel/trace v1.0.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20210921065528-437939a70204 // indirect
golang.org/x/tools v0.1.6 // indirect
google.golang.org/genproto v0.0.0-20210909211513-a8c4777a87af
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
6 changes: 6 additions & 0 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func (cds *crdbDatastore) IsReady(ctx context.Context) (bool, error) {
if err != nil {
return false, err
}
defer currentRevision.Dispose()

version, err := currentRevision.Version()
if err != nil {
Expand All @@ -171,6 +172,11 @@ func (cds *crdbDatastore) IsReady(ctx context.Context) (bool, error) {
return version == headMigration, nil
}

func (cds *crdbDatastore) Close() error {
cds.conn.Close()
return nil
}

func (cds *crdbDatastore) Revision(ctx context.Context) (datastore.Revision, error) {
ctx, span := tracer.Start(ctx, "Revision")
defer span.End()
Expand Down
5 changes: 5 additions & 0 deletions internal/datastore/crdb/migrations/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,8 @@ func (apd *CRDBDriver) WriteVersion(version, replaced string) error {

return nil
}

// Dispose disposes the driver.
func (apd *CRDBDriver) Dispose() {
apd.db.Close(context.Background())
}
3 changes: 3 additions & 0 deletions internal/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ type Datastore interface {
// database schema creation will return false until the migrations have been run to create
// the necessary tables.
IsReady(ctx context.Context) (bool, error)

// Close closes the data store.
Close() error
}

// GraphDatastore is a subset of the datastore interface that is passed to
Expand Down
5 changes: 5 additions & 0 deletions internal/datastore/memdb/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,3 +355,8 @@ func (mds *memdbDatastore) IsReady(ctx context.Context) (bool, error) {
func revisionFromVersion(version uint64) datastore.Revision {
return decimal.NewFromInt(int64(version))
}

func (mds *memdbDatastore) Close() error {
mds.db = nil
return nil
}
16 changes: 16 additions & 0 deletions internal/datastore/memdb/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ const (
)

func (mds *memdbDatastore) WriteNamespace(ctx context.Context, newConfig *v0.NamespaceDefinition) (datastore.Revision, error) {
if mds.db == nil {
return datastore.NoRevision, fmt.Errorf("memdb closed")
}

txn := mds.db.Txn(true)
defer txn.Abort()

Expand Down Expand Up @@ -75,6 +79,10 @@ func (mds *memdbDatastore) WriteNamespace(ctx context.Context, newConfig *v0.Nam

// ReadNamespace reads a namespace definition and version and returns it if found.
func (mds *memdbDatastore) ReadNamespace(ctx context.Context, nsName string) (*v0.NamespaceDefinition, datastore.Revision, error) {
if mds.db == nil {
return nil, datastore.NoRevision, fmt.Errorf("memdb closed")
}

txn := mds.db.Txn(false)
defer txn.Abort()

Expand All @@ -99,6 +107,10 @@ func (mds *memdbDatastore) ReadNamespace(ctx context.Context, nsName string) (*v
}

func (mds *memdbDatastore) DeleteNamespace(ctx context.Context, nsName string) (datastore.Revision, error) {
if mds.db == nil {
return datastore.NoRevision, fmt.Errorf("memdb closed")
}

txn := mds.db.Txn(true)
defer txn.Abort()

Expand Down Expand Up @@ -153,6 +165,10 @@ func (mds *memdbDatastore) DeleteNamespace(ctx context.Context, nsName string) (
}

func (mds *memdbDatastore) ListNamespaces(ctx context.Context) ([]*v0.NamespaceDefinition, error) {
if mds.db == nil {
return nil, fmt.Errorf("memdb closed")
}

var nsDefs []*v0.NamespaceDefinition

txn := mds.db.Txn(false)
Expand Down
4 changes: 4 additions & 0 deletions internal/datastore/memdb/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ func iteratorForFilter(txn *memdb.Txn, filter *v1.RelationshipFilter) (memdb.Res
}

func (mtq memdbTupleQuery) Execute(ctx context.Context) (datastore.TupleIterator, error) {
if mtq.db == nil {
return nil, fmt.Errorf("memdb closed")
}

txn := mtq.db.Txn(false)

time.Sleep(mtq.simulatedLatency)
Expand Down
16 changes: 16 additions & 0 deletions internal/datastore/memdb/tuple.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func (mds *memdbDatastore) checkPrecondition(txn *memdb.Txn, preconditions []*v1
}

func (mds *memdbDatastore) WriteTuples(ctx context.Context, preconditions []*v1.Precondition, mutations []*v1.RelationshipUpdate) (datastore.Revision, error) {
if mds.db == nil {
return datastore.NoRevision, fmt.Errorf("memdb closed")
}

txn := mds.db.Txn(true)
defer txn.Abort()

Expand Down Expand Up @@ -133,6 +137,10 @@ func (mds *memdbDatastore) write(ctx context.Context, txn *memdb.Txn, mutations
}

func (mds *memdbDatastore) DeleteRelationships(ctx context.Context, preconditions []*v1.Precondition, filter *v1.RelationshipFilter) (datastore.Revision, error) {
if mds.db == nil {
return datastore.NoRevision, fmt.Errorf("memdb closed")
}

txn := mds.db.Txn(true)
defer txn.Abort()

Expand Down Expand Up @@ -213,6 +221,10 @@ func (mds *memdbDatastore) ReverseQueryTuplesFromSubjectNamespace(subjectNamespa
}

func (mds *memdbDatastore) SyncRevision(ctx context.Context) (datastore.Revision, error) {
if mds.db == nil {
return datastore.NoRevision, fmt.Errorf("memdb closed")
}

// Compute the current revision
txn := mds.db.Txn(false)
defer txn.Abort()
Expand All @@ -228,6 +240,10 @@ func (mds *memdbDatastore) SyncRevision(ctx context.Context) (datastore.Revision
}

func (mds *memdbDatastore) Revision(ctx context.Context) (datastore.Revision, error) {
if mds.db == nil {
return datastore.NoRevision, fmt.Errorf("memdb closed")
}

txn := mds.db.Txn(false)
defer txn.Abort()

Expand Down
5 changes: 5 additions & 0 deletions internal/datastore/postgres/migrations/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,8 @@ func (apd *AlembicPostgresDriver) WriteVersion(version, replaced string) error {

return nil
}

// Dispose disposes the driver.
func (apd *AlembicPostgresDriver) Dispose() {
apd.db.Close()
}
32 changes: 29 additions & 3 deletions internal/datastore/postgres/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type postgresOptions struct {
watchBufferLength uint16
revisionFuzzingTimedelta time.Duration
gcWindow time.Duration
gcInterval time.Duration
gcMaxOperationTime time.Duration
splitAtEstimatedQuerySize units.Base2Bytes

enablePrometheusStats bool
Expand All @@ -27,9 +29,12 @@ type postgresOptions struct {
}

const (
errFuzzingTooLarge = "revision fuzzing timdelta (%s) must be less than GC window (%s)"
errFuzzingTooLarge = "revision fuzzing timedelta (%s) must be less than GC window (%s)"

defaultWatchBufferLength = 128
defaultWatchBufferLength = 128
defaultGarbageCollectionWindow = 24 * time.Hour
defaultGarbageCollectionInterval = time.Minute * 3
defaultGarbageCollectionMaxOperationTime = time.Minute
)

// Option provides the facility to configure how clients within the
Expand All @@ -38,7 +43,9 @@ type Option func(*postgresOptions)

func generateConfig(options []Option) (postgresOptions, error) {
computed := postgresOptions{
gcWindow: 24 * time.Hour,
gcWindow: defaultGarbageCollectionWindow,
gcInterval: defaultGarbageCollectionInterval,
gcMaxOperationTime: defaultGarbageCollectionMaxOperationTime,
watchBufferLength: defaultWatchBufferLength,
splitAtEstimatedQuerySize: common.DefaultSplitAtEstimatedQuerySize,
}
Expand Down Expand Up @@ -147,6 +154,25 @@ func GCWindow(window time.Duration) Option {
}
}

// GCInterval is the the interval at which garbage collection will occur.
//
// This value defaults to 3 minutes.
func GCInterval(interval time.Duration) Option {
return func(po *postgresOptions) {
po.gcInterval = interval
}
}

// GCMaxOperationTime is the maximum operation time of a garbage collection
// pass before it times out.
//
// This value defaults to 1 minute.
func GCMaxOperationTime(time time.Duration) Option {
return func(po *postgresOptions) {
po.gcMaxOperationTime = time
}
}

// EnablePrometheusStats enables Prometheus metrics provided by the Postgres
// clients being used by the datastore.
func EnablePrometheusStats() Option {
Expand Down

0 comments on commit ddeee97

Please sign in to comment.