Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proper handling of incompatible zedtokens #1723

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions e2e/newenemy/newenemy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,8 @@ func checkDataNoNewEnemy(ctx context.Context, t testing.TB, slowNodeID int, crdb
require.NoError(t, err)
t.Log("r2 token: ", r2.WrittenAt.Token)

z1, _ := zedtoken.DecodeRevision(r1.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock})
z2, _ := zedtoken.DecodeRevision(r2.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock})
z1, _, _ := zedtoken.DecodeRevision(r1.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock})
z2, _, _ := zedtoken.DecodeRevision(r2.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock})

t.Log("z1 revision: ", z1)
t.Log("z2 revision: ", z2)
Expand Down
4 changes: 4 additions & 0 deletions internal/datastore/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ func NewSeparatingContextDatastoreProxy(d datastore.Datastore) datastore.StrictR

type ctxProxy struct{ delegate datastore.Datastore }

// UniqueID returns the unique ID of the delegate datastore, calling it under
// a context that has been separated from the caller's for tracing purposes.
func (p *ctxProxy) UniqueID(ctx context.Context) (string, error) {
	separatedCtx := SeparateContextWithTracing(ctx)
	return p.delegate.UniqueID(separatedCtx)
}

func (p *ctxProxy) ReadWriteTx(
ctx context.Context,
f datastore.TxUserFunc,
Expand Down
3 changes: 3 additions & 0 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"regexp"
"strconv"
"sync/atomic"
"time"

"github.com/IBM/pgxpoolprometheus"
Expand Down Expand Up @@ -280,6 +281,8 @@ type crdbDatastore struct {
ctx context.Context
cancel context.CancelFunc
filterMaximumIDCount uint16

uniqueID atomic.Pointer[string]
}

func (cds *crdbDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader {
Expand Down
27 changes: 19 additions & 8 deletions internal/datastore/crdb/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,33 @@ const (
colUniqueID = "unique_id"
)

var (
queryReadUniqueID = psql.Select(colUniqueID).From(tableMetadata)
uniqueID string
)
var queryReadUniqueID = psql.Select(colUniqueID).From(tableMetadata)

func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
if len(uniqueID) == 0 {
func (cds *crdbDatastore) UniqueID(ctx context.Context) (string, error) {
if cds.uniqueID.Load() == nil {
sql, args, err := queryReadUniqueID.ToSql()
if err != nil {
return datastore.Stats{}, fmt.Errorf("unable to prepare unique ID sql: %w", err)
return "", fmt.Errorf("unable to prepare unique ID sql: %w", err)
}

var uniqueID string
if err := cds.readPool.QueryRowFunc(ctx, func(ctx context.Context, row pgx.Row) error {
return row.Scan(&uniqueID)
}, sql, args...); err != nil {
return datastore.Stats{}, fmt.Errorf("unable to query unique ID: %w", err)
return "", fmt.Errorf("unable to query unique ID: %w", err)
}

cds.uniqueID.Store(&uniqueID)
vroldanbet marked this conversation as resolved.
Show resolved Hide resolved
return uniqueID, nil
}

return *cds.uniqueID.Load(), nil
}

func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
uniqueID, err := cds.UniqueID(ctx)
if err != nil {
return datastore.Stats{}, err
}

var nsDefs []datastore.RevisionedNamespace
Expand Down
4 changes: 4 additions & 0 deletions internal/datastore/memdb/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ type snapshot struct {
db *memdb.MemDB
}

// UniqueID returns the unique ID assigned to this in-memory datastore.
// The context is unused since no I/O is required.
func (mdb *memdbDatastore) UniqueID(_ context.Context) (string, error) {
	id := mdb.uniqueID
	return id, nil
}

func (mdb *memdbDatastore) SnapshotReader(dr datastore.Revision) datastore.Reader {
mdb.RLock()
defer mdb.RUnlock()
Expand Down
4 changes: 3 additions & 1 deletion internal/datastore/mysql/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,8 @@ type Datastore struct {
createTxn string
createBaseTxn string

uniqueID atomic.Pointer[string]

*QueryBuilder
*revisions.CachedOptimizedRevisions
revisions.CommonDecoder
Expand Down Expand Up @@ -576,7 +578,7 @@ func (mds *Datastore) isSeeded(ctx context.Context) (bool, error) {
return false, nil
}

_, err = mds.getUniqueID(ctx)
_, err = mds.UniqueID(ctx)
if err != nil {
return false, nil
}
Expand Down
24 changes: 14 additions & 10 deletions internal/datastore/mysql/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (mds *Datastore) Statistics(ctx context.Context) (datastore.Stats, error) {
}
}

uniqueID, err := mds.getUniqueID(ctx)
uniqueID, err := mds.UniqueID(ctx)
if err != nil {
return datastore.Stats{}, err
}
Expand Down Expand Up @@ -81,16 +81,20 @@ func (mds *Datastore) Statistics(ctx context.Context) (datastore.Stats, error) {
}, nil
}

func (mds *Datastore) getUniqueID(ctx context.Context) (string, error) {
sql, args, err := sb.Select(metadataUniqueIDColumn).From(mds.driver.Metadata()).ToSql()
if err != nil {
return "", fmt.Errorf("unable to generate query sql: %w", err)
}
func (mds *Datastore) UniqueID(ctx context.Context) (string, error) {
if mds.uniqueID.Load() == nil {
sql, args, err := sb.Select(metadataUniqueIDColumn).From(mds.driver.Metadata()).ToSql()
if err != nil {
return "", fmt.Errorf("unable to generate query sql: %w", err)
}

var uniqueID string
if err := mds.db.QueryRowContext(ctx, sql, args...).Scan(&uniqueID); err != nil {
return "", fmt.Errorf("unable to query unique ID: %w", err)
var uniqueID string
if err := mds.db.QueryRowContext(ctx, sql, args...).Scan(&uniqueID); err != nil {
return "", fmt.Errorf("unable to query unique ID: %w", err)
}
mds.uniqueID.Store(&uniqueID)
return uniqueID, nil
}

return uniqueID, nil
return *mds.uniqueID.Load(), nil
}
3 changes: 2 additions & 1 deletion internal/datastore/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ type pgDatastore struct {
inStrictReadMode bool

credentialsProvider datastore.CredentialsProvider
uniqueID atomic.Pointer[string]

gcGroup *errgroup.Group
gcCtx context.Context
Expand Down Expand Up @@ -651,7 +652,7 @@ func (pgd *pgDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, e

if version == headMigration {
// Ensure a datastore ID is present. This ensures the tables have not been truncated.
uniqueID, err := pgd.datastoreUniqueID(ctx)
uniqueID, err := pgd.UniqueID(ctx)
if err != nil {
return datastore.ReadyState{}, fmt.Errorf("database validation failed: %w; if you have previously run `TRUNCATE`, this database is no longer valid and must be remigrated. See: https://spicedb.dev/d/truncate-unsupported", err)
}
Expand Down
25 changes: 17 additions & 8 deletions internal/datastore/postgres/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,25 @@ var (
Where(sq.Eq{colRelname: tableTuple})
)

func (pgd *pgDatastore) datastoreUniqueID(ctx context.Context) (string, error) {
idSQL, idArgs, err := queryUniqueID.ToSql()
if err != nil {
return "", fmt.Errorf("unable to generate query sql: %w", err)
func (pgd *pgDatastore) UniqueID(ctx context.Context) (string, error) {
if pgd.uniqueID.Load() == nil {
idSQL, idArgs, err := queryUniqueID.ToSql()
if err != nil {
return "", fmt.Errorf("unable to generate query sql: %w", err)
}

var uniqueID string
if err := pgx.BeginTxFunc(ctx, pgd.readPool, pgd.readTxOptions, func(tx pgx.Tx) error {
return tx.QueryRow(ctx, idSQL, idArgs...).Scan(&uniqueID)
}); err != nil {
return "", fmt.Errorf("unable to query unique ID: %w", err)
}

pgd.uniqueID.Store(&uniqueID)
return uniqueID, nil
}

var uniqueID string
return uniqueID, pgx.BeginTxFunc(ctx, pgd.readPool, pgd.readTxOptions, func(tx pgx.Tx) error {
return tx.QueryRow(ctx, idSQL, idArgs...).Scan(&uniqueID)
})
return *pgd.uniqueID.Load(), nil
}

func (pgd *pgDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
Expand Down
4 changes: 4 additions & 0 deletions internal/datastore/proxy/observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func NewObservableDatastoreProxy(d datastore.Datastore) datastore.Datastore {

type observableProxy struct{ delegate datastore.Datastore }

// UniqueID passes the unique ID request straight through to the delegate
// datastore without additional observation.
func (p *observableProxy) UniqueID(ctx context.Context) (string, error) {
	id, err := p.delegate.UniqueID(ctx)
	return id, err
}

func (p *observableProxy) SnapshotReader(rev datastore.Revision) datastore.Reader {
delegateReader := p.delegate.SnapshotReader(rev)
return &observableReader{delegateReader}
Expand Down
10 changes: 10 additions & 0 deletions internal/datastore/proxy/proxy_test/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@ import (

type MockDatastore struct {
mock.Mock

CurrentUniqueID string
}

// UniqueID returns the configured unique ID for the mock, falling back to
// the default "mockds" when none has been set.
func (dm *MockDatastore) UniqueID(_ context.Context) (string, error) {
	if id := dm.CurrentUniqueID; id != "" {
		return id, nil
	}
	return "mockds", nil
}

func (dm *MockDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader {
Expand Down
4 changes: 4 additions & 0 deletions internal/datastore/proxy/replicated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ func (f fakeDatastore) Statistics(_ context.Context) (datastore.Stats, error) {
return datastore.Stats{}, nil
}

// UniqueID returns a fixed identifier for this fake datastore.
func (f fakeDatastore) UniqueID(_ context.Context) (string, error) {
	const fakeID = "fake"
	return fakeID, nil
}

func (f fakeDatastore) Close() error {
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,10 @@ type fakeDatastore struct {
lock sync.RWMutex
}

// UniqueID returns a fixed identifier for this fake watch-test datastore.
func (fds *fakeDatastore) UniqueID(_ context.Context) (string, error) {
	const fakeID = "fakedsforwatch"
	return fakeID, nil
}

func (fds *fakeDatastore) updateNamespace(name string, def *corev1.NamespaceDefinition, revision datastore.Revision) {
fds.lock.Lock()
defer fds.lock.Unlock()
Expand Down
4 changes: 4 additions & 0 deletions internal/datastore/proxy/singleflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ type singleflightProxy struct {

var _ datastore.Datastore = (*singleflightProxy)(nil)

// UniqueID forwards the unique ID request to the wrapped datastore; no
// deduplication is needed for this cheap, idempotent call.
func (p *singleflightProxy) UniqueID(ctx context.Context) (string, error) {
	id, err := p.delegate.UniqueID(ctx)
	return id, err
}

func (p *singleflightProxy) SnapshotReader(rev datastore.Revision) datastore.Reader {
return p.delegate.SnapshotReader(rev)
}
Expand Down
9 changes: 8 additions & 1 deletion internal/datastore/revisions/commonrevision.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package revisions

import (
"context"

"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/spiceerrors"
)
Expand Down Expand Up @@ -43,7 +45,12 @@ func RevisionParser(kind RevisionKind) ParsingFunc {

// CommonDecoder is a revision decoder that can decode revisions of a given kind.
type CommonDecoder struct {
	// Kind is the kind of revision this decoder understands.
	Kind RevisionKind

	// DatastoreUniqueID is the unique ID of the datastore that produced the
	// revisions this decoder decodes.
	DatastoreUniqueID string
}

// UniqueID returns the unique ID of the datastore associated with this
// decoder. The context is unused since the value is held in memory.
func (cd CommonDecoder) UniqueID(_ context.Context) (string, error) {
	return cd.DatastoreUniqueID, nil
}

func (cd CommonDecoder) RevisionFromString(s string) (datastore.Revision, error) {
Expand Down
17 changes: 15 additions & 2 deletions internal/datastore/spanner/revisions.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,27 @@ var (
nowStmt = spanner.NewStatement("SELECT CURRENT_TIMESTAMP()")
)

// headRevisionInternal computes the current head revision by asking Spanner
// for its current time and wrapping it as a timestamp-based revision.
func (sd *spannerDatastore) headRevisionInternal(ctx context.Context) (datastore.Revision, error) {
	currentTime, err := sd.now(ctx)
	if err != nil {
		return datastore.NoRevision, fmt.Errorf(errRevision, err)
	}
	return revisions.NewForTime(currentTime), nil
}

// HeadRevision returns the revision at the head of the datastore by
// delegating to the shared internal implementation.
func (sd *spannerDatastore) HeadRevision(ctx context.Context) (datastore.Revision, error) {
	rev, err := sd.headRevisionInternal(ctx)
	return rev, err
}

// now returns the current timestamp according to Spanner, obtained by
// executing a CURRENT_TIMESTAMP() query against the database.
func (sd *spannerDatastore) now(ctx context.Context) (time.Time, error) {
	var timestamp time.Time
	if err := sd.client.Single().Query(ctx, nowStmt).Do(func(r *spanner.Row) error {
		return r.Columns(&timestamp)
	}); err != nil {
		// timestamp is the zero time here; callers must check the error.
		return timestamp, fmt.Errorf(errRevision, err)
	}
	return timestamp, nil
}

func (sd *spannerDatastore) staleHeadRevision(ctx context.Context) (datastore.Revision, error) {
Expand Down
2 changes: 2 additions & 0 deletions internal/datastore/spanner/spanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"regexp"
"strconv"
"sync"
"sync/atomic"
"time"

"cloud.google.com/go/spanner"
Expand Down Expand Up @@ -92,6 +93,7 @@ type spannerDatastore struct {

tableSizesStatsTable string
filterMaximumIDCount uint16
uniqueID atomic.Pointer[string]
}

// NewSpannerDatastore returns a datastore backed by cloud spanner
Expand Down
31 changes: 22 additions & 9 deletions internal/datastore/spanner/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,29 @@ var querySomeRandomRelationships = fmt.Sprintf(`SELECT %s FROM %s LIMIT 10`,

const defaultEstimatedBytesPerRelationships = 20 // determined by looking at some sample clusters

// UniqueID returns the datastore's unique ID, reading it from the metadata
// table the first time it is requested and serving the cached value afterward.
func (sd *spannerDatastore) UniqueID(ctx context.Context) (string, error) {
	// Fast path: serve the cached value when it has already been loaded.
	if cached := sd.uniqueID.Load(); cached != nil {
		return *cached, nil
	}

	var loadedID string
	readErr := sd.client.Single().Read(
		ctx,
		tableMetadata,
		spanner.AllKeys(),
		[]string{colUniqueID},
	).Do(func(r *spanner.Row) error {
		return r.Columns(&loadedID)
	})
	if readErr != nil {
		return "", fmt.Errorf("unable to read unique ID: %w", readErr)
	}

	sd.uniqueID.Store(&loadedID)
	return loadedID, nil
}

func (sd *spannerDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
var uniqueID string
if err := sd.client.Single().Read(
context.Background(),
tableMetadata,
spanner.AllKeys(),
[]string{colUniqueID},
).Do(func(r *spanner.Row) error {
return r.Columns(&uniqueID)
}); err != nil {
uniqueID, err := sd.UniqueID(ctx)
if err != nil {
return datastore.Stats{}, fmt.Errorf("unable to read unique ID: %w", err)
}

Expand Down
Loading
Loading