From 230e1190f31aeb88ded2450832ba2050bbfe5076 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Mon, 29 Jan 2024 15:12:40 -0500 Subject: [PATCH] Export the unique, stable, ID from datastore --- internal/datastore/context.go | 4 +++ internal/datastore/crdb/crdb.go | 3 ++ internal/datastore/crdb/stats.go | 23 ++++++++++--- internal/datastore/memdb/memdb.go | 4 +++ internal/datastore/mysql/datastore.go | 4 ++- internal/datastore/mysql/stats.go | 24 ++++++++------ internal/datastore/postgres/postgres.go | 4 ++- internal/datastore/postgres/stats.go | 25 +++++++++----- internal/datastore/proxy/observable.go | 4 +++ internal/datastore/proxy/proxy_test/mock.go | 4 +++ .../proxy/schemacaching/watchingcache_test.go | 4 +++ internal/datastore/proxy/singleflight.go | 4 +++ internal/datastore/spanner/revisions.go | 6 ++-- internal/datastore/spanner/spanner.go | 15 +++++---- internal/datastore/spanner/spanner_test.go | 2 +- internal/datastore/spanner/stats.go | 33 +++++++++++++------ internal/datastore/spanner/watch.go | 4 +-- pkg/datastore/datastore.go | 5 +++ pkg/datastore/datastore_test.go | 4 +++ pkg/datastore/test/basic.go | 18 ++++++++++ pkg/datastore/test/datastore.go | 1 + 21 files changed, 148 insertions(+), 47 deletions(-) diff --git a/internal/datastore/context.go b/internal/datastore/context.go index 9392cf6ded..ea30a4e6e4 100644 --- a/internal/datastore/context.go +++ b/internal/datastore/context.go @@ -39,6 +39,10 @@ func NewSeparatingContextDatastoreProxy(d datastore.Datastore) datastore.Datasto type ctxProxy struct{ delegate datastore.Datastore } +func (p *ctxProxy) UniqueID(ctx context.Context) (string, error) { + return p.delegate.UniqueID(SeparateContextWithTracing(ctx)) +} + func (p *ctxProxy) ReadWriteTx( ctx context.Context, f datastore.TxUserFunc, diff --git a/internal/datastore/crdb/crdb.go b/internal/datastore/crdb/crdb.go index d6c686177b..b13d3fba38 100644 --- a/internal/datastore/crdb/crdb.go +++ b/internal/datastore/crdb/crdb.go @@ -6,6 +6,7 @@ import ( "fmt" "regexp" "strconv" + "sync/atomic" "time" "github.com/IBM/pgxpoolprometheus" @@ -273,6 +274,8 @@ type crdbDatastore struct { pruneGroup *errgroup.Group ctx context.Context cancel context.CancelFunc + + uniqueID atomic.Pointer[string] } func (cds *crdbDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader { diff --git a/internal/datastore/crdb/stats.go b/internal/datastore/crdb/stats.go index 5da5ed355e..3dffe03574 100644 --- a/internal/datastore/crdb/stats.go +++ b/internal/datastore/crdb/stats.go @@ -20,20 +20,33 @@ const ( var ( queryReadUniqueID = psql.Select(colUniqueID).From(tableMetadata) - uniqueID string ) -func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, error) { - if len(uniqueID) == 0 { +func (cds *crdbDatastore) UniqueID(ctx context.Context) (string, error) { + if cds.uniqueID.Load() == nil { sql, args, err := queryReadUniqueID.ToSql() if err != nil { - return datastore.Stats{}, fmt.Errorf("unable to prepare unique ID sql: %w", err) + return "", fmt.Errorf("unable to prepare unique ID sql: %w", err) } + + var uniqueID string if err := cds.readPool.QueryRowFunc(ctx, func(ctx context.Context, row pgx.Row) error { return row.Scan(&uniqueID) }, sql, args...); err != nil { - return datastore.Stats{}, fmt.Errorf("unable to query unique ID: %w", err) + return "", fmt.Errorf("unable to query unique ID: %w", err) } + + cds.uniqueID.Store(&uniqueID) + return uniqueID, nil + } + + return *cds.uniqueID.Load(), nil +} + +func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, error) { + uniqueID, err := cds.UniqueID(ctx) + if err != nil { + return datastore.Stats{}, err } var nsDefs []datastore.RevisionedNamespace diff --git a/internal/datastore/memdb/memdb.go b/internal/datastore/memdb/memdb.go index e23a7f3560..03e458ac33 100644 --- a/internal/datastore/memdb/memdb.go +++ b/internal/datastore/memdb/memdb.go @@ -99,6 +99,10 @@ type snapshot struct { db *memdb.MemDB } +func (mdb *memdbDatastore) UniqueID(_ context.Context) (string, error) { + return mdb.uniqueID, nil +} + func (mdb *memdbDatastore) SnapshotReader(dr datastore.Revision) datastore.Reader { mdb.RLock() defer mdb.RUnlock() diff --git a/internal/datastore/mysql/datastore.go b/internal/datastore/mysql/datastore.go index 0bb9b88b18..ed112bed39 100644 --- a/internal/datastore/mysql/datastore.go +++ b/internal/datastore/mysql/datastore.go @@ -448,6 +448,8 @@ type Datastore struct { createTxn string createBaseTxn string + uniqueID atomic.Pointer[string] + *QueryBuilder *revisions.CachedOptimizedRevisions revisions.CommonDecoder @@ -524,7 +526,7 @@ func (mds *Datastore) isSeeded(ctx context.Context) (bool, error) { return false, nil } - _, err = mds.getUniqueID(ctx) + _, err = mds.UniqueID(ctx) if err != nil { return false, nil } diff --git a/internal/datastore/mysql/stats.go b/internal/datastore/mysql/stats.go index b4bf0d2954..516ee3fa11 100644 --- a/internal/datastore/mysql/stats.go +++ b/internal/datastore/mysql/stats.go @@ -28,7 +28,7 @@ func (mds *Datastore) Statistics(ctx context.Context) (datastore.Stats, error) { } } - uniqueID, err := mds.getUniqueID(ctx) + uniqueID, err := mds.UniqueID(ctx) if err != nil { return datastore.Stats{}, err } @@ -81,16 +81,20 @@ func (mds *Datastore) Statistics(ctx context.Context) (datastore.Stats, error) { }, nil } -func (mds *Datastore) getUniqueID(ctx context.Context) (string, error) { - sql, args, err := sb.Select(metadataUniqueIDColumn).From(mds.driver.Metadata()).ToSql() - if err != nil { - return "", fmt.Errorf("unable to generate query sql: %w", err) - } +func (mds *Datastore) UniqueID(ctx context.Context) (string, error) { + if mds.uniqueID.Load() == nil { + sql, args, err := sb.Select(metadataUniqueIDColumn).From(mds.driver.Metadata()).ToSql() + if err != nil { + return "", fmt.Errorf("unable to generate query sql: %w", err) + } - var uniqueID string - if err := mds.db.QueryRowContext(ctx, sql, args...).Scan(&uniqueID); err != nil { - return "", fmt.Errorf("unable to query unique ID: %w", err) + var uniqueID string + if err := mds.db.QueryRowContext(ctx, sql, args...).Scan(&uniqueID); err != nil { + return "", fmt.Errorf("unable to query unique ID: %w", err) + } + mds.uniqueID.Store(&uniqueID) + return uniqueID, nil } - return uniqueID, nil + return *mds.uniqueID.Load(), nil } diff --git a/internal/datastore/postgres/postgres.go b/internal/datastore/postgres/postgres.go index c0144b741c..a7d9fc8963 100644 --- a/internal/datastore/postgres/postgres.go +++ b/internal/datastore/postgres/postgres.go @@ -300,6 +300,8 @@ type pgDatastore struct { maxRetries uint8 watchEnabled bool + uniqueID atomic.Pointer[string] + gcGroup *errgroup.Group gcCtx context.Context cancelGc context.CancelFunc @@ -547,7 +549,7 @@ func (pgd *pgDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, e if version == headMigration { // Ensure a datastore ID is present. This ensures the tables have not been truncated. - uniqueID, err := pgd.datastoreUniqueID(ctx) + uniqueID, err := pgd.UniqueID(ctx) if err != nil { return datastore.ReadyState{}, fmt.Errorf("database validation failed: %w; if you have previously run `TRUNCATE`, this database is no longer valid and must be remigrated. See: https://spicedb.dev/d/truncate-unsupported", err) } diff --git a/internal/datastore/postgres/stats.go b/internal/datastore/postgres/stats.go index 61a0be593d..73c4b7f94c 100644 --- a/internal/datastore/postgres/stats.go +++ b/internal/datastore/postgres/stats.go @@ -28,16 +28,25 @@ var ( Where(sq.Eq{colRelname: tableTuple}) ) -func (pgd *pgDatastore) datastoreUniqueID(ctx context.Context) (string, error) { - idSQL, idArgs, err := queryUniqueID.ToSql() - if err != nil { - return "", fmt.Errorf("unable to generate query sql: %w", err) +func (pgd *pgDatastore) UniqueID(ctx context.Context) (string, error) { + if pgd.uniqueID.Load() == nil { + idSQL, idArgs, err := queryUniqueID.ToSql() + if err != nil { + return "", fmt.Errorf("unable to generate query sql: %w", err) + } + + var uniqueID string + if err := pgx.BeginTxFunc(ctx, pgd.readPool, pgd.readTxOptions, func(tx pgx.Tx) error { + return tx.QueryRow(ctx, idSQL, idArgs...).Scan(&uniqueID) + }); err != nil { + return "", fmt.Errorf("unable to query unique ID: %w", err) + } + + pgd.uniqueID.Store(&uniqueID) + return uniqueID, nil } - var uniqueID string - return uniqueID, pgx.BeginTxFunc(ctx, pgd.readPool, pgd.readTxOptions, func(tx pgx.Tx) error { - return tx.QueryRow(ctx, idSQL, idArgs...).Scan(&uniqueID) - }) + return *pgd.uniqueID.Load(), nil } func (pgd *pgDatastore) Statistics(ctx context.Context) (datastore.Stats, error) { diff --git a/internal/datastore/proxy/observable.go b/internal/datastore/proxy/observable.go index 35ba839501..ffc62fdd02 100644 --- a/internal/datastore/proxy/observable.go +++ b/internal/datastore/proxy/observable.go @@ -67,6 +67,10 @@ func NewObservableDatastoreProxy(d datastore.Datastore) datastore.Datastore { type observableProxy struct{ delegate datastore.Datastore } +func (p *observableProxy) UniqueID(ctx context.Context) (string, error) { + return p.delegate.UniqueID(ctx) +} + func (p *observableProxy) SnapshotReader(rev datastore.Revision) datastore.Reader { delegateReader := p.delegate.SnapshotReader(rev) return &observableReader{delegateReader} diff --git a/internal/datastore/proxy/proxy_test/mock.go b/internal/datastore/proxy/proxy_test/mock.go index 2f40526b2e..02ac011b3f 100644 --- a/internal/datastore/proxy/proxy_test/mock.go +++ b/internal/datastore/proxy/proxy_test/mock.go @@ -15,6 +15,10 @@ type MockDatastore struct { mock.Mock } +func (dm *MockDatastore) UniqueID(_ context.Context) (string, error) { + return "mockds", nil +} + func (dm *MockDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader { args := dm.Called(rev) return args.Get(0).(datastore.Reader) diff --git a/internal/datastore/proxy/schemacaching/watchingcache_test.go b/internal/datastore/proxy/schemacaching/watchingcache_test.go index 01e89020af..ed5cc94fd4 100644 --- a/internal/datastore/proxy/schemacaching/watchingcache_test.go +++ b/internal/datastore/proxy/schemacaching/watchingcache_test.go @@ -337,6 +337,10 @@ type fakeDatastore struct { lock sync.RWMutex } +func (fds *fakeDatastore) UniqueID(_ context.Context) (string, error) { + return "fakedsforwatch", nil +} + func (fds *fakeDatastore) updateNamespace(name string, def *corev1.NamespaceDefinition, revision datastore.Revision) { fds.lock.Lock() defer fds.lock.Unlock() diff --git a/internal/datastore/proxy/singleflight.go b/internal/datastore/proxy/singleflight.go index bf254a70d0..b9404bd76f 100644 --- a/internal/datastore/proxy/singleflight.go +++ b/internal/datastore/proxy/singleflight.go @@ -25,6 +25,10 @@ type singleflightProxy struct { var _ datastore.Datastore = (*singleflightProxy)(nil) +func (p *singleflightProxy) UniqueID(ctx context.Context) (string, error) { + return p.delegate.UniqueID(ctx) +} + func (p *singleflightProxy) SnapshotReader(rev datastore.Revision) datastore.Reader { return p.delegate.SnapshotReader(rev) } diff --git a/internal/datastore/spanner/revisions.go b/internal/datastore/spanner/revisions.go index 965bb553af..eadd7c809a 100644 --- a/internal/datastore/spanner/revisions.go +++ b/internal/datastore/spanner/revisions.go @@ -13,7 +13,7 @@ import ( var ParseRevisionString = revisions.RevisionParser(revisions.Timestamp) -func (sd spannerDatastore) headRevisionInternal(ctx context.Context) (datastore.Revision, error) { +func (sd *spannerDatastore) headRevisionInternal(ctx context.Context) (datastore.Revision, error) { now, err := sd.now(ctx) if err != nil { return datastore.NoRevision, fmt.Errorf(errRevision, err) @@ -22,11 +22,11 @@ func (sd spannerDatastore) headRevisionInternal(ctx context.Context) (datastore. return revisions.NewForTime(now), nil } -func (sd spannerDatastore) HeadRevision(ctx context.Context) (datastore.Revision, error) { +func (sd *spannerDatastore) HeadRevision(ctx context.Context) (datastore.Revision, error) { return sd.headRevisionInternal(ctx) } -func (sd spannerDatastore) now(ctx context.Context) (time.Time, error) { +func (sd *spannerDatastore) now(ctx context.Context) (time.Time, error) { var timestamp time.Time if err := sd.client.Single().Query(ctx, spanner.NewStatement("SELECT CURRENT_TIMESTAMP()")).Do(func(r *spanner.Row) error { return r.Columns(×tamp) diff --git a/internal/datastore/spanner/spanner.go b/internal/datastore/spanner/spanner.go index 9efafde43a..f59f824ee5 100644 --- a/internal/datastore/spanner/spanner.go +++ b/internal/datastore/spanner/spanner.go @@ -6,6 +6,7 @@ import ( "os" "regexp" "strconv" + "sync/atomic" "time" "cloud.google.com/go/spanner" @@ -78,6 +79,8 @@ type spannerDatastore struct { client *spanner.Client config spannerOptions database string + + uniqueID atomic.Pointer[string] } // NewSpannerDatastore returns a datastore backed by cloud spanner @@ -143,7 +146,7 @@ func NewSpannerDatastore(ctx context.Context, database string, opts ...Option) ( maxRevisionStaleness := time.Duration(float64(config.revisionQuantization.Nanoseconds())* config.maxRevisionStalenessPercent) * time.Nanosecond - ds := spannerDatastore{ + ds := &spannerDatastore{ RemoteClockRevisions: revisions.NewRemoteClockRevisions( defaultChangeStreamRetention, maxRevisionStaleness, @@ -195,7 +198,7 @@ func (t *traceableRTX) Query(ctx context.Context, statement spanner.Statement) * return t.delegate.Query(ctx, statement) } -func (sd spannerDatastore) SnapshotReader(revisionRaw datastore.Revision) datastore.Reader { +func (sd *spannerDatastore) SnapshotReader(revisionRaw datastore.Revision) datastore.Reader { r := revisionRaw.(revisions.TimestampRevision) txSource := func() readTX { @@ -205,7 +208,7 @@ func (sd spannerDatastore) SnapshotReader(revisionRaw datastore.Revision) datast return spannerReader{executor, txSource} } -func (sd spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserFunc, opts ...options.RWTOptionsOption) (datastore.Revision, error) { +func (sd *spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserFunc, opts ...options.RWTOptionsOption) (datastore.Revision, error) { config := options.NewRWTOptionsWithOptions(opts...) ctx, span := tracer.Start(ctx, "ReadWriteTx") @@ -248,7 +251,7 @@ func (sd spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserF return revisions.NewForTime(ts), nil } -func (sd spannerDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, error) { +func (sd *spannerDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, error) { headMigration, err := migrations.SpannerMigrations.HeadRevision() if err != nil { return datastore.ReadyState{}, fmt.Errorf("invalid head migration found for spanner: %w", err) @@ -275,11 +278,11 @@ func (sd spannerDatastore) ReadyState(ctx context.Context) (datastore.ReadyState }, nil } -func (sd spannerDatastore) Features(_ context.Context) (*datastore.Features, error) { +func (sd *spannerDatastore) Features(_ context.Context) (*datastore.Features, error) { return &datastore.Features{Watch: datastore.Feature{Enabled: true}}, nil } -func (sd spannerDatastore) Close() error { +func (sd *spannerDatastore) Close() error { sd.client.Close() return nil } diff --git a/internal/datastore/spanner/spanner_test.go b/internal/datastore/spanner/spanner_test.go index e6b6008d87..a6f6787b19 100644 --- a/internal/datastore/spanner/spanner_test.go +++ b/internal/datastore/spanner/spanner_test.go @@ -18,7 +18,7 @@ import ( ) // Implement TestableDatastore interface -func (sd spannerDatastore) ExampleRetryableError() error { +func (sd *spannerDatastore) ExampleRetryableError() error { return status.New(codes.Aborted, "retryable").Err() } diff --git a/internal/datastore/spanner/stats.go b/internal/datastore/spanner/stats.go index 0df2d25b79..cd851e547a 100644 --- a/internal/datastore/spanner/stats.go +++ b/internal/datastore/spanner/stats.go @@ -20,16 +20,29 @@ var ( rng = rand.NewSource(time.Now().UnixNano()) ) -func (sd spannerDatastore) Statistics(ctx context.Context) (datastore.Stats, error) { - var uniqueID string - if err := sd.client.Single().Read( - context.Background(), - tableMetadata, - spanner.AllKeys(), - []string{colUniqueID}, - ).Do(func(r *spanner.Row) error { - return r.Columns(&uniqueID) - }); err != nil { +func (sd *spannerDatastore) UniqueID(ctx context.Context) (string, error) { + if sd.uniqueID.Load() == nil { + var uniqueID string + if err := sd.client.Single().Read( + ctx, + tableMetadata, + spanner.AllKeys(), + []string{colUniqueID}, + ).Do(func(r *spanner.Row) error { + return r.Columns(&uniqueID) + }); err != nil { + return "", fmt.Errorf("unable to read unique ID: %w", err) + } + sd.uniqueID.Store(&uniqueID) + return uniqueID, nil + } + + return *sd.uniqueID.Load(), nil +} + +func (sd *spannerDatastore) Statistics(ctx context.Context) (datastore.Stats, error) { + uniqueID, err := sd.UniqueID(ctx) + if err != nil { return datastore.Stats{}, fmt.Errorf("unable to read unique ID: %w", err) } diff --git a/internal/datastore/spanner/watch.go b/internal/datastore/spanner/watch.go index 8a2c437da9..0cc707eb3d 100644 --- a/internal/datastore/spanner/watch.go +++ b/internal/datastore/spanner/watch.go @@ -51,7 +51,7 @@ func parseDatabaseName(db string) (project, instance, database string, err error return matches[1], matches[2], matches[3], nil } -func (sd spannerDatastore) Watch(ctx context.Context, afterRevision datastore.Revision, opts datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) { +func (sd *spannerDatastore) Watch(ctx context.Context, afterRevision datastore.Revision, opts datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) { watchBufferLength := opts.WatchBufferLength if watchBufferLength <= 0 { watchBufferLength = sd.watchBufferLength @@ -65,7 +65,7 @@ func (sd spannerDatastore) Watch(ctx context.Context, afterRevision datastore.Re return updates, errs } -func (sd spannerDatastore) watch( +func (sd *spannerDatastore) watch( ctx context.Context, afterRevisionRaw datastore.Revision, opts datastore.WatchOptions, diff --git a/pkg/datastore/datastore.go b/pkg/datastore/datastore.go index 4f22101af7..9e6f147b8f 100644 --- a/pkg/datastore/datastore.go +++ b/pkg/datastore/datastore.go @@ -459,6 +459,11 @@ func (wo WatchOptions) WithCheckpointInterval(interval time.Duration) WatchOptio // Datastore represents tuple access for a single namespace. type Datastore interface { + // UniqueID returns a unique identifier for the datastore. This identifier + // must be stable across restarts of the datastore if the datastore is + // persistent. + UniqueID(context.Context) (string, error) + // SnapshotReader creates a read-only handle that reads the datastore at the specified revision. // Any errors establishing the reader will be returned by subsequent calls. SnapshotReader(Revision) Reader diff --git a/pkg/datastore/datastore_test.go b/pkg/datastore/datastore_test.go index 1898d9aa77..7b9d06fd18 100644 --- a/pkg/datastore/datastore_test.go +++ b/pkg/datastore/datastore_test.go @@ -582,6 +582,10 @@ type fakeDatastore struct { delegate Datastore } +func (f fakeDatastore) UniqueID(_ context.Context) (string, error) { + return "fake", nil +} + func (f fakeDatastore) Unwrap() Datastore { return f.delegate } diff --git a/pkg/datastore/test/basic.go b/pkg/datastore/test/basic.go index 8e8aafbf73..39fb1f9dfe 100644 --- a/pkg/datastore/test/basic.go +++ b/pkg/datastore/test/basic.go @@ -22,3 +22,21 @@ func UseAfterCloseTest(t *testing.T, tester DatastoreTester) { _, err = ds.HeadRevision(context.Background()) require.Error(err) } + +func UniqueIDTest(t *testing.T, tester DatastoreTester) { + require := require.New(t) + + // Create the datastore. + ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) + require.NoError(err) + + // Ensure the unique ID is not empty. + uniqueID, err := ds.UniqueID(context.Background()) + require.NoError(err) + require.NotEmpty(uniqueID) + + // Ensure the unique ID is stable. + uniqueID2, err := ds.UniqueID(context.Background()) + require.NoError(err) + require.Equal(uniqueID, uniqueID2) +} diff --git a/pkg/datastore/test/datastore.go b/pkg/datastore/test/datastore.go index 9e8ec85b8e..0240fb2cac 100644 --- a/pkg/datastore/test/datastore.go +++ b/pkg/datastore/test/datastore.go @@ -81,6 +81,7 @@ func WithCategories(cats ...string) Categories { // AllWithExceptions runs all generic datastore tests on a DatastoreTester, except // those specified test categories func AllWithExceptions(t *testing.T, tester DatastoreTester, except Categories) { + t.Run("TestUniqueID", func(t *testing.T) { UniqueIDTest(t, tester) }) t.Run("TestUseAfterClose", func(t *testing.T) { UseAfterCloseTest(t, tester) }) t.Run("TestNamespaceNotFound", func(t *testing.T) { NamespaceNotFoundTest(t, tester) })