diff --git a/e2e/newenemy/newenemy_test.go b/e2e/newenemy/newenemy_test.go index 7c37737442..e2352367d0 100644 --- a/e2e/newenemy/newenemy_test.go +++ b/e2e/newenemy/newenemy_test.go @@ -376,8 +376,8 @@ func checkDataNoNewEnemy(ctx context.Context, t testing.TB, slowNodeID int, crdb require.NoError(t, err) t.Log("r2 token: ", r2.WrittenAt.Token) - z1, _ := zedtoken.DecodeRevision(r1.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock}) - z2, _ := zedtoken.DecodeRevision(r2.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock}) + z1, _, _ := zedtoken.DecodeRevision(r1.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock}) + z2, _, _ := zedtoken.DecodeRevision(r2.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock}) t.Log("z1 revision: ", z1) t.Log("z2 revision: ", z2) diff --git a/internal/datastore/crdb/stats.go b/internal/datastore/crdb/stats.go index 3dffe03574..a8a7e25261 100644 --- a/internal/datastore/crdb/stats.go +++ b/internal/datastore/crdb/stats.go @@ -18,9 +18,7 @@ const ( colUniqueID = "unique_id" ) -var ( - queryReadUniqueID = psql.Select(colUniqueID).From(tableMetadata) -) +var queryReadUniqueID = psql.Select(colUniqueID).From(tableMetadata) func (cds *crdbDatastore) UniqueID(ctx context.Context) (string, error) { if cds.uniqueID.Load() == nil { diff --git a/internal/datastore/proxy/proxy_test/mock.go b/internal/datastore/proxy/proxy_test/mock.go index 02ac011b3f..422f6011fb 100644 --- a/internal/datastore/proxy/proxy_test/mock.go +++ b/internal/datastore/proxy/proxy_test/mock.go @@ -13,10 +13,16 @@ import ( type MockDatastore struct { mock.Mock + + CurrentUniqueID string } func (dm *MockDatastore) UniqueID(_ context.Context) (string, error) { - return "mockds", nil + if dm.CurrentUniqueID == "" { + return "mockds", nil + } + + return dm.CurrentUniqueID, nil } func (dm *MockDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader { diff --git a/internal/datastore/revisions/commonrevision.go b/internal/datastore/revisions/commonrevision.go index de85496d9b..c1adc51fca 100644 --- a/internal/datastore/revisions/commonrevision.go +++ b/internal/datastore/revisions/commonrevision.go @@ -1,6 +1,8 @@ package revisions import ( + "context" + "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/spiceerrors" ) @@ -43,7 +45,12 @@ func RevisionParser(kind RevisionKind) ParsingFunc { // CommonDecoder is a revision decoder that can decode revisions of a given kind. type CommonDecoder struct { - Kind RevisionKind + Kind RevisionKind + DatastoreUniqueID string +} + +func (cd CommonDecoder) UniqueID(_ context.Context) (string, error) { + return cd.DatastoreUniqueID, nil } func (cd CommonDecoder) RevisionFromString(s string) (datastore.Revision, error) { diff --git a/internal/middleware/consistency/consistency.go b/internal/middleware/consistency/consistency.go index c928e6b88a..ac6e222870 100644 --- a/internal/middleware/consistency/consistency.go +++ b/internal/middleware/consistency/consistency.go @@ -18,6 +18,7 @@ import ( "github.com/authzed/spicedb/internal/services/shared" "github.com/authzed/spicedb/pkg/cursor" "github.com/authzed/spicedb/pkg/datastore" + "github.com/authzed/spicedb/pkg/spiceerrors" "github.com/authzed/spicedb/pkg/zedtoken" ) @@ -55,19 +56,39 @@ func RevisionFromContext(ctx context.Context) (datastore.Revision, *v1.ZedToken, handle := c.(*revisionHandle) rev := handle.revision if rev != nil { - return rev, zedtoken.MustNewFromRevision(rev), nil + ds := datastoremw.FromContext(ctx) + if ds == nil { + return nil, nil, spiceerrors.MustBugf("consistency middleware did not inject datastore") + } + + zedToken, err := zedtoken.NewFromRevision(ctx, rev, ds) + if err != nil { + return nil, nil, err + } + + return rev, zedToken, nil } } return nil, nil, fmt.Errorf("consistency middleware did not inject revision") } +type MismatchingTokenOption int + +const ( + TreatMismatchingTokensAsFullConsistency MismatchingTokenOption = iota + + TreatMismatchingTokensAsMinLatency + + TreatMismatchingTokensAsError +) + // AddRevisionToContext adds a revision to the given context, based on the consistency block found // in the given request (if applicable). -func AddRevisionToContext(ctx context.Context, req interface{}, ds datastore.Datastore) error { +func AddRevisionToContext(ctx context.Context, req interface{}, ds datastore.Datastore, option MismatchingTokenOption) error { switch req := req.(type) { case hasConsistency: - return addRevisionToContextFromConsistency(ctx, req, ds) + return addRevisionToContextFromConsistency(ctx, req, ds, option) default: return nil } @@ -75,7 +96,7 @@ func AddRevisionToContext(ctx context.Context, req interface{}, ds datastore.Dat // addRevisionToContextFromConsistency adds a revision to the given context, based on the consistency block found // in the given request (if applicable). -func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency, ds datastore.Datastore) error { +func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency, ds datastore.Datastore, option MismatchingTokenOption) error { handle := ctx.Value(revisionKey) if handle == nil { return nil @@ -91,7 +112,7 @@ func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency // Always use the revision encoded in the cursor. ConsistentyCounter.WithLabelValues("snapshot", "cursor").Inc() - requestedRev, err := cursor.DecodeToDispatchRevision(withOptionalCursor.GetOptionalCursor(), ds) + requestedRev, _, err := cursor.DecodeToDispatchRevision(ctx, withOptionalCursor.GetOptionalCursor(), ds) if err != nil { return rewriteDatastoreError(ctx, err) } @@ -130,7 +151,7 @@ func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency case consistency.GetAtLeastAsFresh() != nil: // At least as fresh as: Pick one of the datastore's revision and that specified, which // ever is later. - picked, pickedRequest, err := pickBestRevision(ctx, consistency.GetAtLeastAsFresh(), ds) + picked, pickedRequest, err := pickBestRevision(ctx, consistency.GetAtLeastAsFresh(), ds, option) if err != nil { return rewriteDatastoreError(ctx, err) } @@ -147,11 +168,16 @@ func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency // Exact snapshot: Use the revision as encoded in the zed token. ConsistentyCounter.WithLabelValues("snapshot", "request").Inc() - requestedRev, err := zedtoken.DecodeRevision(consistency.GetAtExactSnapshot(), ds) + requestedRev, status, err := zedtoken.DecodeRevision(consistency.GetAtExactSnapshot(), ds) if err != nil { return errInvalidZedToken } + if status == zedtoken.StatusMismatchedDatastoreID { + log.Error().Str("zedtoken", consistency.GetAtExactSnapshot().Token).Msg("ZedToken specified references an older datastore but at-exact-snapshot was requested") + return fmt.Errorf("ZedToken specified references an older datastore but at-exact-snapshot was requested") + } + err = ds.CheckRevision(ctx, requestedRev) if err != nil { return rewriteDatastoreError(ctx, err) @@ -175,7 +201,7 @@ var bypassServiceWhitelist = map[string]struct{}{ // UnaryServerInterceptor returns a new unary server interceptor that performs per-request exchange of // the specified consistency configuration for the revision at which to perform the request. -func UnaryServerInterceptor() grpc.UnaryServerInterceptor { +func UnaryServerInterceptor(option MismatchingTokenOption) grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { for bypass := range bypassServiceWhitelist { if strings.HasPrefix(info.FullMethod, bypass) { @@ -184,7 +210,7 @@ func UnaryServerInterceptor() grpc.UnaryServerInterceptor { } ds := datastoremw.MustFromContext(ctx) newCtx := ContextWithHandle(ctx) - if err := AddRevisionToContext(newCtx, req, ds); err != nil { + if err := AddRevisionToContext(newCtx, req, ds, option); err != nil { return nil, err } @@ -194,21 +220,22 @@ func UnaryServerInterceptor() grpc.UnaryServerInterceptor { // StreamServerInterceptor returns a new stream server interceptor that performs per-request exchange of // the specified consistency configuration for the revision at which to perform the request. -func StreamServerInterceptor() grpc.StreamServerInterceptor { +func StreamServerInterceptor(option MismatchingTokenOption) grpc.StreamServerInterceptor { return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { for bypass := range bypassServiceWhitelist { if strings.HasPrefix(info.FullMethod, bypass) { return handler(srv, stream) } } - wrapper := &recvWrapper{stream, ContextWithHandle(stream.Context())} + wrapper := &recvWrapper{stream, ContextWithHandle(stream.Context()), option} return handler(srv, wrapper) } } type recvWrapper struct { grpc.ServerStream - ctx context.Context + ctx context.Context + option MismatchingTokenOption } func (s *recvWrapper) Context() context.Context { return s.ctx } @@ -219,12 +246,12 @@ func (s *recvWrapper) RecvMsg(m interface{}) error { } ds := datastoremw.MustFromContext(s.ctx) - return AddRevisionToContext(s.ctx, m, ds) + return AddRevisionToContext(s.ctx, m, ds, s.option) } // pickBestRevision compares the provided ZedToken with the optimized revision of the datastore, and returns the most // recent one. The boolean return value will be true if the provided ZedToken is the most recent, false otherwise. -func pickBestRevision(ctx context.Context, requested *v1.ZedToken, ds datastore.Datastore) (datastore.Revision, bool, error) { +func pickBestRevision(ctx context.Context, requested *v1.ZedToken, ds datastore.Datastore, option MismatchingTokenOption) (datastore.Revision, bool, error) { // Calculate a revision as we see fit databaseRev, err := ds.OptimizedRevision(ctx) if err != nil { @@ -232,11 +259,35 @@ func pickBestRevision(ctx context.Context, requested *v1.ZedToken, ds datastore. } if requested != nil { - requestedRev, err := zedtoken.DecodeRevision(requested, ds) + requestedRev, status, err := zedtoken.DecodeRevision(requested, ds) if err != nil { return datastore.NoRevision, false, errInvalidZedToken } + if status == zedtoken.StatusMismatchedDatastoreID { + switch option { + case TreatMismatchingTokensAsFullConsistency: + log.Warn().Str("zedtoken", requested.Token).Msg("ZedToken specified references an older datastore and SpiceDB is configured to treat this as a full consistency request") + headRev, err := ds.HeadRevision(ctx) + if err != nil { + return datastore.NoRevision, false, err + } + + return headRev, false, nil + + case TreatMismatchingTokensAsMinLatency: + log.Warn().Str("zedtoken", requested.Token).Msg("ZedToken specified references an older datastore and SpiceDB is configured to treat this as a min latency request") + return databaseRev, false, nil + + case TreatMismatchingTokensAsError: + log.Error().Str("zedtoken", requested.Token).Msg("ZedToken specified references an older datastore and SpiceDB is configured to raise an error in this scenario") + return datastore.NoRevision, false, fmt.Errorf("ZedToken specified references an older datastore and SpiceDB is configured to raise an error in this scenario") + + default: + return datastore.NoRevision, false, spiceerrors.MustBugf("unknown mismatching token option: %v", option) + } + } + if databaseRev.GreaterThan(requestedRev) { return databaseRev, false, nil } diff --git a/internal/middleware/consistency/consistency_test.go b/internal/middleware/consistency/consistency_test.go index 58a2555246..e28a816805 100644 --- a/internal/middleware/consistency/consistency_test.go +++ b/internal/middleware/consistency/consistency_test.go @@ -10,6 +10,7 @@ import ( "github.com/authzed/spicedb/internal/datastore/proxy/proxy_test" "github.com/authzed/spicedb/internal/datastore/revisions" + datastoremw "github.com/authzed/spicedb/internal/middleware/datastore" "github.com/authzed/spicedb/pkg/cursor" dispatch "github.com/authzed/spicedb/pkg/proto/dispatch/v1" "github.com/authzed/spicedb/pkg/zedtoken" @@ -29,7 +30,9 @@ func TestAddRevisionToContextNoneSupplied(t *testing.T) { ds.On("OptimizedRevision").Return(optimized, nil).Once() updated := ContextWithHandle(context.Background()) - err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{}, ds) + updated = datastoremw.ContextWithDatastore(updated, ds) + + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{}, ds, TreatMismatchingTokensAsError) require.NoError(err) rev, _, err := RevisionFromContext(updated) @@ -46,13 +49,15 @@ func TestAddRevisionToContextMinimizeLatency(t *testing.T) { ds.On("OptimizedRevision").Return(optimized, nil).Once() updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_MinimizeLatency{ MinimizeLatency: true, }, }, - }, ds) + }, ds, TreatMismatchingTokensAsError) require.NoError(err) rev, _, err := RevisionFromContext(updated) @@ -69,13 +74,15 @@ func TestAddRevisionToContextFullyConsistent(t *testing.T) { ds.On("HeadRevision").Return(head, nil).Once() updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_FullyConsistent{ FullyConsistent: true, }, }, - }, ds) + }, ds, TreatMismatchingTokensAsError) require.NoError(err) rev, _, err := RevisionFromContext(updated) @@ -93,13 +100,15 @@ func TestAddRevisionToContextAtLeastAsFresh(t *testing.T) { ds.On("RevisionFromString", exact.String()).Return(exact, nil).Once() updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(exact), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(exact), }, }, - }, ds) + }, ds, TreatMismatchingTokensAsError) require.NoError(err) rev, _, err := RevisionFromContext(updated) @@ -117,13 +126,15 @@ func TestAddRevisionToContextAtValidExactSnapshot(t *testing.T) { ds.On("RevisionFromString", exact.String()).Return(exact, nil).Once() updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtExactSnapshot{ - AtExactSnapshot: zedtoken.MustNewFromRevision(exact), + AtExactSnapshot: zedtoken.MustNewFromRevisionForTesting(exact), }, }, - }, ds) + }, ds, TreatMismatchingTokensAsError) require.NoError(err) rev, _, err := RevisionFromContext(updated) @@ -141,13 +152,15 @@ func TestAddRevisionToContextAtInvalidExactSnapshot(t *testing.T) { ds.On("RevisionFromString", zero.String()).Return(zero, nil).Once() updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtExactSnapshot{ - AtExactSnapshot: zedtoken.MustNewFromRevision(zero), + AtExactSnapshot: zedtoken.MustNewFromRevisionForTesting(zero), }, }, - }, ds) + }, ds, TreatMismatchingTokensAsError) require.Error(err) ds.AssertExpectations(t) } @@ -155,7 +168,10 @@ func TestAddRevisionToContextAtInvalidExactSnapshot(t *testing.T) { func TestAddRevisionToContextNoConsistencyAPI(t *testing.T) { require := require.New(t) + ds := &proxy_test.MockDatastore{} + updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) _, _, err := RevisionFromContext(updated) require.Error(err) @@ -174,14 +190,16 @@ func TestAddRevisionToContextWithCursor(t *testing.T) { // revision in context is at `exact` updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + err = AddRevisionToContext(updated, &v1.LookupResourcesRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtExactSnapshot{ - AtExactSnapshot: zedtoken.MustNewFromRevision(exact), + AtExactSnapshot: zedtoken.MustNewFromRevisionForTesting(exact), }, }, OptionalCursor: cursor, - }, ds) + }, ds, TreatMismatchingTokensAsError) require.NoError(err) // ensure we get back `optimized` from the cursor @@ -191,3 +209,153 @@ func TestAddRevisionToContextWithCursor(t *testing.T) { require.True(optimized.Equal(rev)) ds.AssertExpectations(t) } + +func TestAtExactSnapshotWithMismatchedToken(t *testing.T) { + require := require.New(t) + + ds := &proxy_test.MockDatastore{} + ds.On("RevisionFromString", optimized.String()).Return(optimized, nil).Once() + + // revision in context is at `exact` + updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + + // mint a token with a different datastore ID. + ds.CurrentUniqueID = "foo" + zedToken, err := zedtoken.NewFromRevision(context.Background(), optimized, ds) + require.NoError(err) + + ds.CurrentUniqueID = "bar" + err = AddRevisionToContext(updated, &v1.LookupResourcesRequest{ + Consistency: &v1.Consistency{ + Requirement: &v1.Consistency_AtExactSnapshot{ + AtExactSnapshot: zedToken, + }, + }, + }, ds, TreatMismatchingTokensAsError) + require.Error(err) + require.ErrorContains(err, "ZedToken specified references an older datastore but at-exact-snapshot") +} + +func TestAtLeastAsFreshWithMismatchedTokenExpectError(t *testing.T) { + require := require.New(t) + + ds := &proxy_test.MockDatastore{} + ds.On("OptimizedRevision").Return(optimized, nil).Once() + ds.On("RevisionFromString", optimized.String()).Return(optimized, nil).Once() + + // revision in context is at `exact` + updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + + // mint a token with a different datastore ID. + ds.CurrentUniqueID = "foo" + zedToken, err := zedtoken.NewFromRevision(context.Background(), optimized, ds) + require.NoError(err) + + ds.CurrentUniqueID = "bar" + err = AddRevisionToContext(updated, &v1.LookupResourcesRequest{ + Consistency: &v1.Consistency{ + Requirement: &v1.Consistency_AtLeastAsFresh{ + AtLeastAsFresh: zedToken, + }, + }, + }, ds, TreatMismatchingTokensAsError) + require.Error(err) + require.ErrorContains(err, "ZedToken specified references an older datastore and SpiceDB is configured to raise an error in this scenario") +} + +func TestAtLeastAsFreshWithMismatchedTokenExpectMinLatency(t *testing.T) { + require := require.New(t) + + ds := &proxy_test.MockDatastore{} + ds.On("OptimizedRevision").Return(optimized, nil).Once() + ds.On("RevisionFromString", optimized.String()).Return(optimized, nil).Once() + + // revision in context is at `exact` + updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + + // mint a token with a different datastore ID. + ds.CurrentUniqueID = "foo" + zedToken, err := zedtoken.NewFromRevision(context.Background(), optimized, ds) + require.NoError(err) + + ds.CurrentUniqueID = "bar" + err = AddRevisionToContext(updated, &v1.LookupResourcesRequest{ + Consistency: &v1.Consistency{ + Requirement: &v1.Consistency_AtLeastAsFresh{ + AtLeastAsFresh: zedToken, + }, + }, + }, ds, TreatMismatchingTokensAsMinLatency) + require.NoError(err) + + rev, _, err := RevisionFromContext(updated) + require.NoError(err) + + require.True(optimized.Equal(rev)) + ds.AssertExpectations(t) +} + +func TestAtLeastAsFreshWithMismatchedTokenExpectFullConsistency(t *testing.T) { + require := require.New(t) + + ds := &proxy_test.MockDatastore{} + ds.On("HeadRevision").Return(head, nil).Once() + ds.On("OptimizedRevision").Return(optimized, nil).Once() + ds.On("RevisionFromString", optimized.String()).Return(optimized, nil).Once() + + // revision in context is at `exact` + updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + + // mint a token with a different datastore ID. + ds.CurrentUniqueID = "foo" + zedToken, err := zedtoken.NewFromRevision(context.Background(), optimized, ds) + require.NoError(err) + + ds.CurrentUniqueID = "bar" + err = AddRevisionToContext(updated, &v1.LookupResourcesRequest{ + Consistency: &v1.Consistency{ + Requirement: &v1.Consistency_AtLeastAsFresh{ + AtLeastAsFresh: zedToken, + }, + }, + }, ds, TreatMismatchingTokensAsFullConsistency) + require.NoError(err) + + rev, _, err := RevisionFromContext(updated) + require.NoError(err) + + require.True(head.Equal(rev)) + ds.AssertExpectations(t) +} + +func TestAddRevisionToContextAtLeastAsFreshMatchingIDs(t *testing.T) { + require := require.New(t) + + ds := &proxy_test.MockDatastore{} + ds.On("OptimizedRevision").Return(optimized, nil).Once() + ds.On("RevisionFromString", exact.String()).Return(exact, nil).Once() + + ds.CurrentUniqueID = "foo" + + updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{ + Consistency: &v1.Consistency{ + Requirement: &v1.Consistency_AtLeastAsFresh{ + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(exact), + }, + }, + }, ds, TreatMismatchingTokensAsError) + require.NoError(err) + + rev, _, err := RevisionFromContext(updated) + require.NoError(err) + + require.True(exact.Equal(rev)) + ds.AssertExpectations(t) +} diff --git a/internal/services/integrationtesting/cert_test.go b/internal/services/integrationtesting/cert_test.go index 516aa6de19..4ccdf27271 100644 --- a/internal/services/integrationtesting/cert_test.go +++ b/internal/services/integrationtesting/cert_test.go @@ -146,7 +146,7 @@ func TestCertRotation(t *testing.T) { }, { Name: "consistency", - Middleware: consistency.UnaryServerInterceptor(), + Middleware: consistency.UnaryServerInterceptor(consistency.TreatMismatchingTokensAsError), }, { Name: "servicespecific", @@ -165,7 +165,7 @@ func TestCertRotation(t *testing.T) { }, { Name: "consistency", - Middleware: consistency.StreamServerInterceptor(), + Middleware: consistency.StreamServerInterceptor(consistency.TreatMismatchingTokensAsError), }, { Name: "servicespecific", @@ -209,7 +209,7 @@ func TestCertRotation(t *testing.T) { _, err = client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: rel.Resource, @@ -262,7 +262,7 @@ func TestCertRotation(t *testing.T) { _, err = client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: rel.Resource, diff --git a/internal/services/integrationtesting/consistencytestutil/servicetester.go b/internal/services/integrationtesting/consistencytestutil/servicetester.go index 0dbc26a2c6..7c90a1ee98 100644 --- a/internal/services/integrationtesting/consistencytestutil/servicetester.go +++ b/internal/services/integrationtesting/consistencytestutil/servicetester.go @@ -78,7 +78,7 @@ func (v1st v1ServiceTester) Check(ctx context.Context, resource *core.ObjectAndR }, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, Context: context, @@ -98,7 +98,7 @@ func (v1st v1ServiceTester) Expand(ctx context.Context, resource *core.ObjectAnd Permission: resource.Relation, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, }) @@ -128,7 +128,7 @@ func (v1st v1ServiceTester) Read(_ context.Context, namespaceName string, atRevi }, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, }) @@ -166,7 +166,7 @@ func (v1st v1ServiceTester) LookupResources(_ context.Context, resourceRelation }, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, OptionalLimit: limit, @@ -214,7 +214,7 @@ func (v1st v1ServiceTester) LookupSubjects(_ context.Context, resource *core.Obj OptionalSubjectRelation: optionalizeRelation(subjectRelation.Relation), Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, Context: builtContext, @@ -244,7 +244,7 @@ func (v1st v1ServiceTester) BulkCheck(ctx context.Context, items []*v1.BulkCheck Items: items, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, }) @@ -260,7 +260,7 @@ func (v1st v1ServiceTester) CheckBulk(ctx context.Context, items []*v1.CheckBulk Items: items, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, }) diff --git a/internal/services/integrationtesting/perf_test.go b/internal/services/integrationtesting/perf_test.go index f411bce80e..d2ab1786b9 100644 --- a/internal/services/integrationtesting/perf_test.go +++ b/internal/services/integrationtesting/perf_test.go @@ -58,7 +58,7 @@ func TestBurst(t *testing.T) { _, err := client.CheckPermission(context.Background(), &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: rel.Resource, diff --git a/internal/services/v1/debug_test.go b/internal/services/v1/debug_test.go index dea12bb669..e829a1d141 100644 --- a/internal/services/v1/debug_test.go +++ b/internal/services/v1/debug_test.go @@ -467,7 +467,7 @@ func TestCheckPermissionWithDebug(t *testing.T) { checkResp, err := client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: stc.checkRequest.resource, diff --git a/internal/services/v1/metadata_test.go b/internal/services/v1/metadata_test.go index 90ffc962b6..e5ee8183f8 100644 --- a/internal/services/v1/metadata_test.go +++ b/internal/services/v1/metadata_test.go @@ -38,7 +38,7 @@ func TestAllMethodsReturnMetadata(t *testing.T) { _, err := client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "masterplan"), @@ -53,7 +53,7 @@ func TestAllMethodsReturnMetadata(t *testing.T) { _, err := client.CheckBulkPermissions(ctx, &v1.CheckBulkPermissionsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Items: []*v1.CheckBulkPermissionsRequestItem{ @@ -96,7 +96,7 @@ func TestAllMethodsReturnMetadata(t *testing.T) { _, err := client.ExpandPermissionTree(ctx, &v1.ExpandPermissionTreeRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "masterplan"), @@ -130,7 +130,7 @@ func TestAllMethodsReturnMetadata(t *testing.T) { stream, err := client.LookupResources(ctx, &v1.LookupResourcesRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, ResourceObjectType: "document", @@ -155,7 +155,7 @@ func TestAllMethodsReturnMetadata(t *testing.T) { stream, err := client.LookupSubjects(ctx, &v1.LookupSubjectsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "masterplan"), diff --git a/internal/services/v1/permissions_test.go b/internal/services/v1/permissions_test.go index bd30cffbd0..1cc15c6d96 100644 --- a/internal/services/v1/permissions_test.go +++ b/internal/services/v1/permissions_test.go @@ -273,7 +273,7 @@ func TestCheckPermissions(t *testing.T) { checkResp, err := client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: tc.resource, @@ -328,7 +328,7 @@ func TestCheckPermissionWithDebugInfo(t *testing.T) { checkResp, err := client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "masterplan"), @@ -543,7 +543,7 @@ func TestLookupResources(t *testing.T) { Subject: tc.subject, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, }, grpc.Trailer(&trailer)) @@ -618,7 +618,7 @@ func TestExpand(t *testing.T) { Permission: tc.startPermission, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, }, grpc.Trailer(&trailer)) @@ -862,7 +862,7 @@ func TestLookupSubjects(t *testing.T) { OptionalSubjectRelation: tc.subjectRelation, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, }, grpc.Trailer(&trailer)) @@ -910,7 +910,7 @@ func TestCheckWithCaveats(t *testing.T) { request := &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "companyplan"), @@ -1022,7 +1022,7 @@ func TestCheckWithCaveatErrors(t *testing.T) { request := &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "firstdoc"), @@ -1075,7 +1075,7 @@ func TestLookupResourcesWithCaveats(t *testing.T) { request := &v1.LookupResourcesRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, ResourceObjectType: "document", @@ -1121,7 +1121,7 @@ func TestLookupResourcesWithCaveats(t *testing.T) { request = &v1.LookupResourcesRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, ResourceObjectType: "document", @@ -1194,7 +1194,7 @@ func TestLookupSubjectsWithCaveats(t *testing.T) { request := &v1.LookupSubjectsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "first"), @@ -1239,7 +1239,7 @@ func TestLookupSubjectsWithCaveats(t *testing.T) { request = &v1.LookupSubjectsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "first"), @@ -1284,7 +1284,7 @@ func TestLookupSubjectsWithCaveats(t *testing.T) { request = &v1.LookupSubjectsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "first"), @@ -1358,7 +1358,7 @@ func TestLookupSubjectsWithCaveatedWildcards(t *testing.T) { request := &v1.LookupSubjectsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "first"), @@ -1397,7 +1397,7 @@ func TestLookupSubjectsWithCaveatedWildcards(t *testing.T) { request = &v1.LookupSubjectsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "first"), @@ -1541,7 +1541,7 @@ func TestLookupResourcesWithCursors(t *testing.T) { Subject: tc.subject, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, OptionalLimit: uint32(limit), @@ -1610,7 +1610,7 @@ func TestLookupResourcesDeduplication(t *testing.T) { Subject: sub("user", "tom", ""), Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, }) diff --git a/internal/services/v1/relationships.go b/internal/services/v1/relationships.go index 7d3f8b6ce3..b23b9aebf8 100644 --- a/internal/services/v1/relationships.go +++ b/internal/services/v1/relationships.go @@ -329,8 +329,13 @@ func (ps *permissionServer) WriteRelationships(ctx context.Context, req *v1.Writ writeUpdateCounter.WithLabelValues(v1.RelationshipUpdate_Operation_name[int32(kind)]).Observe(float64(count)) } + zedToken, err := zedtoken.NewFromRevision(ctx, revision, ds) + if err != nil { + return nil, ps.rewriteError(ctx, err) + } + return &v1.WriteRelationshipsResponse{ - WrittenAt: zedtoken.MustNewFromRevision(revision), + WrittenAt: zedToken, }, nil } @@ -424,8 +429,13 @@ func (ps *permissionServer) DeleteRelationships(ctx context.Context, req *v1.Del return nil, ps.rewriteError(ctx, err) } + zedToken, err := zedtoken.NewFromRevision(ctx, revision, ds) + if err != nil { + return nil, ps.rewriteError(ctx, err) + } + return &v1.DeleteRelationshipsResponse{ - DeletedAt: zedtoken.MustNewFromRevision(revision), + DeletedAt: zedToken, DeletionProgress: deletionProgress, }, nil } diff --git a/internal/services/v1/relationships_test.go b/internal/services/v1/relationships_test.go index 3c1bedcc04..662a610ef4 100644 --- a/internal/services/v1/relationships_test.go +++ b/internal/services/v1/relationships_test.go @@ -296,7 +296,7 @@ func TestReadRelationships(t *testing.T) { stream, err := client.ReadRelationships(context.Background(), &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, RelationshipFilter: tc.filter, @@ -1151,7 +1151,7 @@ func TestDeleteRelationships(t *testing.T) { } require.NoError(err) require.NotNil(resp.DeletedAt) - rev, err := zedtoken.DecodeRevision(resp.DeletedAt, ds) + rev, _, err := zedtoken.DecodeRevision(resp.DeletedAt, ds) require.NoError(err) require.True(rev.GreaterThan(revision)) require.EqualValues(standardTuplesWithout(tc.deleted), readAll(require, client, resp.DeletedAt)) @@ -1227,7 +1227,7 @@ func TestDeleteRelationshipsBeyondLimitPartial(t *testing.T) { headRev, err := ds.HeadRevision(context.Background()) require.NoError(err) - beforeDelete := readOfType(require, "document", client, zedtoken.MustNewFromRevision(headRev)) + beforeDelete := readOfType(require, "document", client, zedtoken.MustNewFromRevisionForTesting(headRev)) resp, err := client.DeleteRelationships(context.Background(), &v1.DeleteRelationshipsRequest{ RelationshipFilter: &v1.RelationshipFilter{ @@ -1238,7 +1238,10 @@ func TestDeleteRelationshipsBeyondLimitPartial(t *testing.T) { }) require.NoError(err) - afterDelete := readOfType(require, "document", client, resp.DeletedAt) + headRev, err = ds.HeadRevision(context.Background()) + require.NoError(err) + + afterDelete := readOfType(require, "document", client, zedtoken.MustNewFromRevisionForTesting(headRev)) require.LessOrEqual(len(beforeDelete)-len(afterDelete), batchSize) if i == 0 { @@ -1249,7 +1252,7 @@ func TestDeleteRelationshipsBeyondLimitPartial(t *testing.T) { require.NoError(err) require.NotNil(resp.DeletedAt) - rev, err := zedtoken.DecodeRevision(resp.DeletedAt, ds) + rev, _, err := zedtoken.DecodeRevision(resp.DeletedAt, ds) require.NoError(err) require.True(rev.GreaterThan(revision)) require.EqualValues(standardTuplesWithout(expected), readAll(require, client, resp.DeletedAt)) @@ -1506,7 +1509,7 @@ func TestReadRelationshipsInvalidCursor(t *testing.T) { stream, err := client.ReadRelationships(context.Background(), &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, RelationshipFilter: &v1.RelationshipFilter{ diff --git a/internal/services/v1/schema.go b/internal/services/v1/schema.go index 25cfc72ba9..8638949dd6 100644 --- a/internal/services/v1/schema.go +++ b/internal/services/v1/schema.go @@ -97,9 +97,14 @@ func (ss *schemaServer) ReadSchema(ctx context.Context, _ *v1.ReadSchemaRequest) DispatchCount: dispatchCount, }) + zedToken, err := zedtoken.NewFromRevision(ctx, headRevision, ds) + if err != nil { + return nil, ss.rewriteError(ctx, err) + } + return &v1.ReadSchemaResponse{ SchemaText: schemaText, - ReadAt: zedtoken.MustNewFromRevision(headRevision), + ReadAt: zedToken, }, nil } @@ -145,7 +150,12 @@ func (ss *schemaServer) WriteSchema(ctx context.Context, in *v1.WriteSchemaReque return nil, ss.rewriteError(ctx, err) } + zedToken, err := zedtoken.NewFromRevision(ctx, revision, ds) + if err != nil { + return nil, ss.rewriteError(ctx, err) + } + return &v1.WriteSchemaResponse{ - WrittenAt: zedtoken.MustNewFromRevision(revision), + WrittenAt: zedToken, }, nil } diff --git a/internal/services/v1/watch.go b/internal/services/v1/watch.go index 0bb42fc733..8c8bbc7d74 100644 --- a/internal/services/v1/watch.go +++ b/internal/services/v1/watch.go @@ -52,11 +52,15 @@ func (ws *watchServer) Watch(req *v1.WatchRequest, stream v1.WatchService_WatchS var afterRevision datastore.Revision if req.OptionalStartCursor != nil && req.OptionalStartCursor.Token != "" { - decodedRevision, err := zedtoken.DecodeRevision(req.OptionalStartCursor, ds) + decodedRevision, tokenStatus, err := zedtoken.DecodeRevision(req.OptionalStartCursor, ds) if err != nil { return status.Errorf(codes.InvalidArgument, "failed to decode start revision: %s", err) } + if tokenStatus == zedtoken.StatusMismatchedDatastoreID { + return status.Errorf(codes.InvalidArgument, "start revision was generated by a different datastore") + } + afterRevision = decodedRevision } else { var err error @@ -95,9 +99,14 @@ func (ws *watchServer) Watch(req *v1.WatchRequest, stream v1.WatchService_WatchS if ok { filtered := filterUpdates(objectTypes, filters, update.RelationshipChanges) if len(filtered) > 0 { + zedToken, err := zedtoken.NewFromRevision(ctx, update.Revision, ds) + if err != nil { + return err + } + if err := stream.Send(&v1.WatchResponse{ Updates: filtered, - ChangesThrough: zedtoken.MustNewFromRevision(update.Revision), + ChangesThrough: zedToken, }); err != nil { return status.Errorf(codes.Canceled, "watch canceled by user: %s", err) } diff --git a/internal/services/v1/watch_test.go b/internal/services/v1/watch_test.go index 09de4fcecd..97f499dfae 100644 --- a/internal/services/v1/watch_test.go +++ b/internal/services/v1/watch_test.go @@ -210,7 +210,7 @@ func TestWatch(t *testing.T) { t.Cleanup(cleanup) client := v1.NewWatchServiceClient(conn) - cursor := zedtoken.MustNewFromRevision(revision) + cursor := zedtoken.MustNewFromRevisionForTesting(revision) if tc.startCursor != nil { cursor = tc.startCursor } diff --git a/internal/testserver/server.go b/internal/testserver/server.go index 108d3156c3..e4dbc25919 100644 --- a/internal/testserver/server.go +++ b/internal/testserver/server.go @@ -90,7 +90,7 @@ func NewTestServerWithConfig(require *require.Assertions, }, { Name: "consistency", - Middleware: consistency.UnaryServerInterceptor(), + Middleware: consistency.UnaryServerInterceptor(consistency.TreatMismatchingTokensAsError), }, { Name: "servicespecific", @@ -113,7 +113,7 @@ func NewTestServerWithConfig(require *require.Assertions, }, { Name: "consistency", - Middleware: consistency.StreamServerInterceptor(), + Middleware: consistency.StreamServerInterceptor(consistency.TreatMismatchingTokensAsError), }, { Name: "servicespecific", diff --git a/pkg/cmd/serve.go b/pkg/cmd/serve.go index ffdcc7dbf5..54ee2da3eb 100644 --- a/pkg/cmd/serve.go +++ b/pkg/cmd/serve.go @@ -132,6 +132,7 @@ func RegisterServeFlags(cmd *cobra.Command, config *server.Config) error { cmd.Flags().IntVar(&config.MaxRelationshipContextSize, "max-relationship-context-size", 25000, "maximum allowed size of the context to be stored in a relationship") cmd.Flags().DurationVar(&config.StreamingAPITimeout, "streaming-api-response-delay-timeout", 30*time.Second, "max duration time elapsed between messages sent by the server-side to the client (responses) before the stream times out") cmd.Flags().DurationVar(&config.WatchHeartbeat, "watch-api-heartbeat", 1*time.Second, "heartbeat time on the watch in the API. 0 means to default to the datastore's minimum.") + cmd.Flags().StringVar(&config.MismatchZedTokenBehavior, "mismatch-zed-token-behavior", "full-consistency", "behavior when a mismatched zedtoken is encountered. One of: full-consistency (treat as a full-consistency call), min-latency (treat as a min-latency call), error (return an error). defaults to full-consistency for safety.") cmd.Flags().BoolVar(&config.V1SchemaAdditiveOnly, "testing-only-schema-additive-writes", false, "append new definitions to the existing schema, rather than overwriting it") if err := cmd.Flags().MarkHidden("testing-only-schema-additive-writes"); err != nil { diff --git a/pkg/cmd/server/defaults.go b/pkg/cmd/server/defaults.go index 1fd91091cf..46cfbaa592 100644 --- a/pkg/cmd/server/defaults.go +++ b/pkg/cmd/server/defaults.go @@ -169,14 +169,15 @@ const ( ) type MiddlewareOption struct { - logger zerolog.Logger - authFunc grpcauth.AuthFunc - enableVersionResponse bool - dispatcher dispatch.Dispatcher - ds datastore.Datastore - enableRequestLog bool - enableResponseLog bool - disableGRPCHistogram bool + logger zerolog.Logger + authFunc grpcauth.AuthFunc + enableVersionResponse bool + dispatcher dispatch.Dispatcher + ds datastore.Datastore + enableRequestLog bool + enableResponseLog bool + disableGRPCHistogram bool + mismatchingZedTokenOption consistencymw.MismatchingTokenOption } // gRPCMetricsUnaryInterceptor creates the default prometheus metrics interceptor for unary gRPCs @@ -276,7 +277,7 @@ func DefaultUnaryMiddleware(opts MiddlewareOption) (*MiddlewareChain[grpc.UnaryS NewUnaryMiddleware(). WithName(DefaultInternalMiddlewareConsistency). WithInternal(true). - WithInterceptor(consistencymw.UnaryServerInterceptor()). + WithInterceptor(consistencymw.UnaryServerInterceptor(opts.mismatchingZedTokenOption)). Done(), NewUnaryMiddleware(). @@ -354,7 +355,7 @@ func DefaultStreamingMiddleware(opts MiddlewareOption) (*MiddlewareChain[grpc.St NewStreamMiddleware(). WithName(DefaultInternalMiddlewareConsistency). WithInternal(true). - WithInterceptor(consistencymw.StreamServerInterceptor()). + WithInterceptor(consistencymw.StreamServerInterceptor(opts.mismatchingZedTokenOption)). Done(), NewStreamMiddleware(). diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index cb3968d821..27f6aa16f3 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -33,6 +33,7 @@ import ( "github.com/authzed/spicedb/internal/dispatch/graph" "github.com/authzed/spicedb/internal/gateway" log "github.com/authzed/spicedb/internal/logging" + "github.com/authzed/spicedb/internal/middleware/consistency" "github.com/authzed/spicedb/internal/services" dispatchSvc "github.com/authzed/spicedb/internal/services/dispatch" "github.com/authzed/spicedb/internal/services/health" @@ -111,6 +112,7 @@ type Config struct { MaxDatastoreReadPageSize uint64 `debugmap:"visible"` StreamingAPITimeout time.Duration `debugmap:"visible"` WatchHeartbeat time.Duration `debugmap:"visible"` + MismatchZedTokenBehavior string `debugmap:"visible"` // Additional Services MetricsAPI util.HTTPServerConfig `debugmap:"visible"` @@ -363,6 +365,24 @@ func (c *Config) Complete(ctx context.Context) (RunnableServer, error) { watchServiceOption = services.WatchServiceDisabled } + var mismatchZedTokenOption consistency.MismatchingTokenOption + switch c.MismatchZedTokenBehavior { + case "": + fallthrough + + case "full-consistency": + mismatchZedTokenOption = consistency.TreatMismatchingTokensAsFullConsistency + + case "min-latency": + mismatchZedTokenOption = consistency.TreatMismatchingTokensAsMinLatency + + case "error": + mismatchZedTokenOption = consistency.TreatMismatchingTokensAsError + + default: + return nil, fmt.Errorf("unknown mismatched zedtoken behavior: %s", c.MismatchZedTokenBehavior) + } + opts := MiddlewareOption{ log.Logger, c.GRPCAuthFunc, @@ -372,6 +392,7 @@ func (c *Config) Complete(ctx context.Context) (RunnableServer, error) { c.EnableRequestLogs, c.EnableResponseLogs, c.DisableGRPCLatencyHistogram, + mismatchZedTokenOption, } defaultUnaryMiddlewareChain, err := DefaultUnaryMiddleware(opts) if err != nil { diff --git a/pkg/cmd/server/server_test.go b/pkg/cmd/server/server_test.go index 9ae630b6ce..363f7d2794 100644 --- a/pkg/cmd/server/server_test.go +++ b/pkg/cmd/server/server_test.go @@ -9,6 +9,7 @@ import ( "github.com/authzed/spicedb/internal/datastore/memdb" "github.com/authzed/spicedb/internal/logging" + "github.com/authzed/spicedb/internal/middleware/consistency" "github.com/authzed/spicedb/pkg/cmd/datastore" "github.com/authzed/spicedb/pkg/cmd/util" @@ -230,7 +231,7 @@ func TestModifyUnaryMiddleware(t *testing.T) { }, }} - opt := MiddlewareOption{logging.Logger, nil, false, nil, nil, false, false, false} + opt := MiddlewareOption{logging.Logger, nil, false, nil, nil, false, false, false, consistency.TreatMismatchingTokensAsFullConsistency} defaultMw, err := DefaultUnaryMiddleware(opt) require.NoError(t, err) @@ -256,7 +257,7 @@ func TestModifyStreamingMiddleware(t *testing.T) { }, }} - opt := MiddlewareOption{logging.Logger, nil, false, nil, nil, false, false, false} + opt := MiddlewareOption{logging.Logger, nil, false, nil, nil, false, false, false, consistency.TreatMismatchingTokensAsFullConsistency} defaultMw, err := DefaultStreamingMiddleware(opt) require.NoError(t, err) diff --git a/pkg/cmd/server/zz_generated.options.go b/pkg/cmd/server/zz_generated.options.go index 33572cb0b8..0023de59ce 100644 --- a/pkg/cmd/server/zz_generated.options.go +++ b/pkg/cmd/server/zz_generated.options.go @@ -81,6 +81,7 @@ func (c *Config) ToOption() ConfigOption { to.MaxDatastoreReadPageSize = c.MaxDatastoreReadPageSize to.StreamingAPITimeout = c.StreamingAPITimeout to.WatchHeartbeat = c.WatchHeartbeat + to.MismatchZedTokenBehavior = c.MismatchZedTokenBehavior to.MetricsAPI = c.MetricsAPI to.UnaryMiddlewareModification = c.UnaryMiddlewareModification to.StreamingMiddlewareModification = c.StreamingMiddlewareModification @@ -142,6 +143,7 @@ func (c Config) DebugMap() map[string]any { debugMap["MaxDatastoreReadPageSize"] = helpers.DebugValue(c.MaxDatastoreReadPageSize, false) debugMap["StreamingAPITimeout"] = helpers.DebugValue(c.StreamingAPITimeout, false) debugMap["WatchHeartbeat"] = helpers.DebugValue(c.WatchHeartbeat, false) + debugMap["MismatchZedTokenBehavior"] = helpers.DebugValue(c.MismatchZedTokenBehavior, false) debugMap["MetricsAPI"] = helpers.DebugValue(c.MetricsAPI, false) debugMap["SilentlyDisableTelemetry"] = helpers.DebugValue(c.SilentlyDisableTelemetry, false) debugMap["TelemetryCAOverridePath"] = helpers.DebugValue(c.TelemetryCAOverridePath, false) @@ -498,6 +500,13 @@ func WithWatchHeartbeat(watchHeartbeat time.Duration) ConfigOption { } } +// WithMismatchZedTokenBehavior returns an option that can set MismatchZedTokenBehavior on a Config +func WithMismatchZedTokenBehavior(mismatchZedTokenBehavior string) ConfigOption { + return func(c *Config) { + c.MismatchZedTokenBehavior = mismatchZedTokenBehavior + } +} + // WithMetricsAPI returns an option that can set MetricsAPI on a Config func WithMetricsAPI(metricsAPI util.HTTPServerConfig) ConfigOption { return func(c *Config) { diff --git a/pkg/cmd/testserver/testserver.go b/pkg/cmd/testserver/testserver.go index 47b3f22877..c90a87b263 100644 --- a/pkg/cmd/testserver/testserver.go +++ b/pkg/cmd/testserver/testserver.go @@ -78,13 +78,13 @@ func (c *Config) Complete() (RunnableTestServer, error) { grpc.ChainUnaryInterceptor( datastoreMiddleware.UnaryServerInterceptor(), dispatchmw.UnaryServerInterceptor(dispatcher), - consistencymw.UnaryServerInterceptor(), + consistencymw.UnaryServerInterceptor(consistencymw.TreatMismatchingTokensAsError), servicespecific.UnaryServerInterceptor, ), grpc.ChainStreamInterceptor( datastoreMiddleware.StreamServerInterceptor(), dispatchmw.StreamServerInterceptor(dispatcher), - consistencymw.StreamServerInterceptor(), + consistencymw.StreamServerInterceptor(consistencymw.TreatMismatchingTokensAsError), servicespecific.StreamServerInterceptor, ), ) @@ -97,14 +97,14 @@ func (c *Config) Complete() (RunnableTestServer, error) { datastoreMiddleware.UnaryServerInterceptor(), readonly.UnaryServerInterceptor(), dispatchmw.UnaryServerInterceptor(dispatcher), - consistencymw.UnaryServerInterceptor(), + consistencymw.UnaryServerInterceptor(consistencymw.TreatMismatchingTokensAsError), servicespecific.UnaryServerInterceptor, ), grpc.ChainStreamInterceptor( datastoreMiddleware.StreamServerInterceptor(), readonly.StreamServerInterceptor(), dispatchmw.StreamServerInterceptor(dispatcher), - consistencymw.StreamServerInterceptor(), + consistencymw.StreamServerInterceptor(consistencymw.TreatMismatchingTokensAsError), servicespecific.StreamServerInterceptor, ), ) diff --git a/pkg/cursor/cursor.go b/pkg/cursor/cursor.go index 38df2353ee..cc81b31be9 100644 --- a/pkg/cursor/cursor.go +++ b/pkg/cursor/cursor.go @@ -1,6 +1,7 @@ package cursor import ( + "context" "encoding/base64" "errors" "fmt" @@ -11,6 +12,7 @@ import ( dispatch "github.com/authzed/spicedb/pkg/proto/dispatch/v1" impl "github.com/authzed/spicedb/pkg/proto/impl/v1" "github.com/authzed/spicedb/pkg/spiceerrors" + "github.com/authzed/spicedb/pkg/zedtoken" ) // Encode converts a decoded cursor to its opaque version. @@ -92,25 +94,39 @@ func DecodeToDispatchCursor(encoded *v1.Cursor, callAndParameterHash string) (*d // DecodeToDispatchRevision decodes an encoded API cursor into an internal dispatch revision. // NOTE: this method does *not* verify the caller's method signature. -func DecodeToDispatchRevision(encoded *v1.Cursor, ds revisionDecoder) (datastore.Revision, error) { +func DecodeToDispatchRevision(ctx context.Context, encoded *v1.Cursor, ds revisionDecoder) (datastore.Revision, zedtoken.TokenStatus, error) { decoded, err := Decode(encoded) if err != nil { - return nil, err + return nil, zedtoken.StatusUnknown, err } v1decoded := decoded.GetV1() if v1decoded == nil { - return nil, ErrNilCursor + return nil, zedtoken.StatusUnknown, ErrNilCursor + } + + datastoreUniqueID, err := ds.UniqueID(ctx) + if err != nil { + return nil, zedtoken.StatusUnknown, fmt.Errorf(errEncodeError, err) } parsed, err := ds.RevisionFromString(v1decoded.Revision) if err != nil { - return datastore.NoRevision, fmt.Errorf(errDecodeError, err) + return datastore.NoRevision, zedtoken.StatusUnknown, fmt.Errorf(errDecodeError, err) + } + + if v1decoded.DatastoreUniqueId == "" { + return parsed, zedtoken.StatusLegacyEmptyDatastoreID, nil + } + + if v1decoded.DatastoreUniqueId != datastoreUniqueID { + return parsed, zedtoken.StatusMismatchedDatastoreID, nil } - return parsed, nil + return parsed, zedtoken.StatusValid, nil } type revisionDecoder interface { + UniqueID(_ context.Context) (string, error) RevisionFromString(string) (datastore.Revision, error) } diff --git a/pkg/cursor/cursor_test.go b/pkg/cursor/cursor_test.go index 9013fb51df..1c66863024 100644 --- a/pkg/cursor/cursor_test.go +++ b/pkg/cursor/cursor_test.go @@ -1,6 +1,7 @@ package cursor import ( + "context" "fmt" "testing" @@ -58,7 +59,7 @@ func TestEncodeDecode(t *testing.T) { require.Equal(tc.sections, decoded.Sections) - decodedRev, err := DecodeToDispatchRevision(encoded, revisions.CommonDecoder{ + decodedRev, _, err := DecodeToDispatchRevision(context.Background(), encoded, revisions.CommonDecoder{ Kind: revisions.TransactionID, }) require.NoError(err) @@ -136,7 +137,7 @@ func TestDecode(t *testing.T) { require.NotNil(decoded) require.Equal(testCase.expectedSections, decoded.Sections) - decodedRev, err := DecodeToDispatchRevision(&v1.Cursor{ + decodedRev, _, err := DecodeToDispatchRevision(context.Background(), &v1.Cursor{ Token: testCase.token, }, revisions.CommonDecoder{ Kind: revisions.TransactionID, diff --git a/pkg/development/devcontext.go b/pkg/development/devcontext.go index 4a93fa1445..c7ff66ed5c 100644 --- a/pkg/development/devcontext.go +++ b/pkg/development/devcontext.go @@ -124,11 +124,11 @@ func (dc *DevContext) RunV1InMemoryService() (*grpc.ClientConn, func(), error) { s := grpc.NewServer( grpc.ChainUnaryInterceptor( datastoremw.UnaryServerInterceptor(dc.Datastore), - consistency.UnaryServerInterceptor(), + consistency.UnaryServerInterceptor(consistency.TreatMismatchingTokensAsError), ), grpc.ChainStreamInterceptor( datastoremw.StreamServerInterceptor(dc.Datastore), - consistency.StreamServerInterceptor(), + consistency.StreamServerInterceptor(consistency.TreatMismatchingTokensAsError), ), ) ps := v1svc.NewPermissionsServer(dc.Dispatcher, v1svc.PermissionsServerConfig{ diff --git a/pkg/proto/impl/v1/impl.pb.go b/pkg/proto/impl/v1/impl.pb.go index 5b26440200..7336605cca 100644 --- a/pkg/proto/impl/v1/impl.pb.go +++ b/pkg/proto/impl/v1/impl.pb.go @@ -400,6 +400,9 @@ type V1Cursor struct { CallAndParametersHash string `protobuf:"bytes,3,opt,name=call_and_parameters_hash,json=callAndParametersHash,proto3" json:"call_and_parameters_hash,omitempty"` // dispatch_version is the version of the dispatcher which created the cursor. DispatchVersion uint32 `protobuf:"varint,4,opt,name=dispatch_version,json=dispatchVersion,proto3" json:"dispatch_version,omitempty"` + // datastore_unique_id is the unique ID for the datastore. Will be empty for legacy + // cursors. + DatastoreUniqueId string `protobuf:"bytes,5,opt,name=datastore_unique_id,json=datastoreUniqueId,proto3" json:"datastore_unique_id,omitempty"` } func (x *V1Cursor) Reset() { @@ -462,6 +465,13 @@ func (x *V1Cursor) GetDispatchVersion() uint32 { return 0 } +func (x *V1Cursor) GetDatastoreUniqueId() string { + if x != nil { + return x.DatastoreUniqueId + } + return "" +} + type DocComment struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -805,6 +815,9 @@ type DecodedZedToken_V1ZedToken struct { unknownFields protoimpl.UnknownFields Revision string `protobuf:"bytes,1,opt,name=revision,proto3" json:"revision,omitempty"` + // datastore_unique_id is the unique ID for the datastore. Will be empty for legacy + // tokens. + DatastoreUniqueId string `protobuf:"bytes,2,opt,name=datastore_unique_id,json=datastoreUniqueId,proto3" json:"datastore_unique_id,omitempty"` } func (x *DecodedZedToken_V1ZedToken) Reset() { @@ -846,6 +859,13 @@ func (x *DecodedZedToken_V1ZedToken) GetRevision() string { return "" } +func (x *DecodedZedToken_V1ZedToken) GetDatastoreUniqueId() string { + if x != nil { + return x.DatastoreUniqueId + } + return "" +} + var File_impl_v1_impl_proto protoreflect.FileDescriptor var file_impl_v1_impl_proto_rawDesc = []byte{ @@ -875,7 +895,7 @@ var file_impl_v1_impl_proto_rawDesc = []byte{ 0x08, 0x56, 0x32, 0x5a, 0x6f, 0x6f, 0x6b, 0x69, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x42, 0x0f, 0x0a, 0x0d, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, - 0x5f, 0x6f, 0x6e, 0x65, 0x6f, 0x66, 0x22, 0x82, 0x02, 0x0a, 0x0f, 0x44, 0x65, 0x63, 0x6f, 0x64, + 0x5f, 0x6f, 0x6e, 0x65, 0x6f, 0x66, 0x22, 0xb2, 0x02, 0x0a, 0x0f, 0x44, 0x65, 0x63, 0x6f, 0x64, 0x65, 0x64, 0x5a, 0x65, 0x64, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x55, 0x0a, 0x14, 0x64, 0x65, 0x70, 0x72, 0x65, 0x63, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x76, 0x31, 0x5f, 0x7a, 0x6f, 0x6f, 0x6b, 0x69, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x69, 0x6d, 0x70, 0x6c, 0x2e, @@ -888,15 +908,18 @@ var file_impl_v1_impl_proto_rawDesc = []byte{ 0x65, 0x6e, 0x48, 0x00, 0x52, 0x02, 0x76, 0x31, 0x1a, 0x26, 0x0a, 0x08, 0x56, 0x31, 0x5a, 0x6f, 0x6f, 0x6b, 0x69, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, - 0x1a, 0x28, 0x0a, 0x0a, 0x56, 0x31, 0x5a, 0x65, 0x64, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x1a, + 0x1a, 0x58, 0x0a, 0x0a, 0x56, 0x31, 0x5a, 0x65, 0x64, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x42, 0x0f, 0x0a, 0x0d, 0x76, 0x65, + 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x2e, 0x0a, 0x13, 0x64, 0x61, + 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x5f, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x5f, 0x69, + 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x11, 0x64, 0x61, 0x74, 0x61, 0x73, 0x74, 0x6f, + 0x72, 0x65, 0x55, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x49, 0x64, 0x42, 0x0f, 0x0a, 0x0d, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x5f, 0x6f, 0x6e, 0x65, 0x6f, 0x66, 0x22, 0x45, 0x0a, 0x0d, 0x44, 0x65, 0x63, 0x6f, 0x64, 0x65, 0x64, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x12, 0x23, 0x0a, 0x02, 0x76, 0x31, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x69, 0x6d, 0x70, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x56, 0x31, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x48, 0x00, 0x52, 0x02, 0x76, 0x31, 0x42, 0x0f, 0x0a, 0x0d, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x5f, 0x6f, 0x6e, 0x65, - 0x6f, 0x66, 0x22, 0xa6, 0x01, 0x0a, 0x08, 0x56, 0x31, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x12, + 0x6f, 0x66, 0x22, 0xd6, 0x01, 0x0a, 0x08, 0x56, 0x31, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x08, 0x73, @@ -906,7 +929,10 @@ var file_impl_v1_impl_proto_rawDesc = []byte{ 0x6e, 0x64, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x65, 0x74, 0x65, 0x72, 0x73, 0x48, 0x61, 0x73, 0x68, 0x12, 0x29, 0x0a, 0x10, 0x64, 0x69, 0x73, 0x70, 0x61, 0x74, 0x63, 0x68, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0f, 0x64, 0x69, 0x73, 0x70, - 0x61, 0x74, 0x63, 0x68, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x26, 0x0a, 0x0a, 0x44, + 0x61, 0x74, 0x63, 0x68, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x2e, 0x0a, 0x13, 0x64, + 0x61, 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x5f, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x5f, + 0x69, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x11, 0x64, 0x61, 0x74, 0x61, 0x73, 0x74, + 0x6f, 0x72, 0x65, 0x55, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x49, 0x64, 0x22, 0x26, 0x0a, 0x0a, 0x44, 0x6f, 0x63, 0x43, 0x6f, 0x6d, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0x8e, 0x01, 0x0a, 0x10, 0x52, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x6f, 0x6e, diff --git a/pkg/proto/impl/v1/impl.pb.validate.go b/pkg/proto/impl/v1/impl.pb.validate.go index 43a753a435..d46c680943 100644 --- a/pkg/proto/impl/v1/impl.pb.validate.go +++ b/pkg/proto/impl/v1/impl.pb.validate.go @@ -733,6 +733,8 @@ func (m *V1Cursor) validate(all bool) error { // no validation rules for DispatchVersion + // no validation rules for DatastoreUniqueId + if len(errors) > 0 { return V1CursorMultiError(errors) } @@ -1589,6 +1591,8 @@ func (m *DecodedZedToken_V1ZedToken) validate(all bool) error { // no validation rules for Revision + // no validation rules for DatastoreUniqueId + if len(errors) > 0 { return DecodedZedToken_V1ZedTokenMultiError(errors) } diff --git a/pkg/proto/impl/v1/impl_vtproto.pb.go b/pkg/proto/impl/v1/impl_vtproto.pb.go index b33b124c04..e39a6cddaa 100644 --- a/pkg/proto/impl/v1/impl_vtproto.pb.go +++ b/pkg/proto/impl/v1/impl_vtproto.pb.go @@ -154,6 +154,7 @@ func (m *DecodedZedToken_V1ZedToken) CloneVT() *DecodedZedToken_V1ZedToken { } r := new(DecodedZedToken_V1ZedToken) r.Revision = m.Revision + r.DatastoreUniqueId = m.DatastoreUniqueId if len(m.unknownFields) > 0 { r.unknownFields = make([]byte, len(m.unknownFields)) copy(r.unknownFields, m.unknownFields) @@ -242,6 +243,7 @@ func (m *V1Cursor) CloneVT() *V1Cursor { r.Revision = m.Revision r.CallAndParametersHash = m.CallAndParametersHash r.DispatchVersion = m.DispatchVersion + r.DatastoreUniqueId = m.DatastoreUniqueId if rhs := m.Sections; rhs != nil { tmpContainer := make([]string, len(rhs)) copy(tmpContainer, rhs) @@ -542,6 +544,9 @@ func (this *DecodedZedToken_V1ZedToken) EqualVT(that *DecodedZedToken_V1ZedToken if this.Revision != that.Revision { return false } + if this.DatastoreUniqueId != that.DatastoreUniqueId { + return false + } return string(this.unknownFields) == string(that.unknownFields) } @@ -707,6 +712,9 @@ func (this *V1Cursor) EqualVT(that *V1Cursor) bool { if this.DispatchVersion != that.DispatchVersion { return false } + if this.DatastoreUniqueId != that.DatastoreUniqueId { + return false + } return string(this.unknownFields) == string(that.unknownFields) } @@ -1121,6 +1129,13 @@ func (m *DecodedZedToken_V1ZedToken) MarshalToSizedBufferVT(dAtA []byte) (int, e i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if len(m.DatastoreUniqueId) > 0 { + i -= len(m.DatastoreUniqueId) + copy(dAtA[i:], m.DatastoreUniqueId) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.DatastoreUniqueId))) + i-- + dAtA[i] = 0x12 + } if len(m.Revision) > 0 { i -= len(m.Revision) copy(dAtA[i:], m.Revision) @@ -1302,6 +1317,13 @@ func (m *V1Cursor) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if len(m.DatastoreUniqueId) > 0 { + i -= len(m.DatastoreUniqueId) + copy(dAtA[i:], m.DatastoreUniqueId) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.DatastoreUniqueId))) + i-- + dAtA[i] = 0x2a + } if m.DispatchVersion != 0 { i = protohelpers.EncodeVarint(dAtA, i, uint64(m.DispatchVersion)) i-- @@ -1628,6 +1650,10 @@ func (m *DecodedZedToken_V1ZedToken) SizeVT() (n int) { if l > 0 { n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) } + l = len(m.DatastoreUniqueId) + if l > 0 { + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } n += len(m.unknownFields) return n } @@ -1717,6 +1743,10 @@ func (m *V1Cursor) SizeVT() (n int) { if m.DispatchVersion != 0 { n += 1 + protohelpers.SizeOfVarint(uint64(m.DispatchVersion)) } + l = len(m.DatastoreUniqueId) + if l > 0 { + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } n += len(m.unknownFields) return n } @@ -2358,6 +2388,38 @@ func (m *DecodedZedToken_V1ZedToken) UnmarshalVT(dAtA []byte) error { } m.Revision = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field DatastoreUniqueId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.DatastoreUniqueId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) @@ -2749,6 +2811,38 @@ func (m *V1Cursor) UnmarshalVT(dAtA []byte) error { break } } + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field DatastoreUniqueId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.DatastoreUniqueId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) diff --git a/pkg/zedtoken/zedtoken.go b/pkg/zedtoken/zedtoken.go index 797b3e184b..f655b74565 100644 --- a/pkg/zedtoken/zedtoken.go +++ b/pkg/zedtoken/zedtoken.go @@ -2,6 +2,7 @@ package zedtoken import ( + "context" "encoding/base64" "errors" "fmt" @@ -22,9 +23,32 @@ const ( // zedtoken argument to Decode var ErrNilZedToken = errors.New("zedtoken pointer was nil") -// MustNewFromRevision generates an encoded zedtoken from an integral revision. -func MustNewFromRevision(revision datastore.Revision) *v1.ZedToken { - encoded, err := NewFromRevision(revision) +// legacyEmptyDatastoreID is the empty datastore ID for legacy tokens and cursors. +const legacyEmptyDatastoreID = "" + +// TokenStatus is the status of a zedtoken. +type TokenStatus int + +const ( + // StatusUnknown indicates that the status of the zedtoken is unknown. + StatusUnknown TokenStatus = iota + + // StatusLegacyEmptyDatastoreID indicates that the zedtoken is a legacy token + // with an empty datastore ID. + StatusLegacyEmptyDatastoreID + + // StatusValid indicates that the zedtoken is valid. + StatusValid + + // StatusMismatchedDatastoreID indicates that the zedtoken is valid, but the + // datastore ID does not match the current datastore, indicating that the + // token was generated by a different datastore. + StatusMismatchedDatastoreID +) + +// MustNewFromRevisionForTesting generates an encoded zedtoken from an integral revision. +func MustNewFromRevisionForTesting(revision datastore.Revision) *v1.ZedToken { + encoded, err := newFromRevision(revision, legacyEmptyDatastoreID) if err != nil { panic(err) } @@ -32,11 +56,21 @@ func MustNewFromRevision(revision datastore.Revision) *v1.ZedToken { } // NewFromRevision generates an encoded zedtoken from an integral revision. -func NewFromRevision(revision datastore.Revision) (*v1.ZedToken, error) { +func NewFromRevision(ctx context.Context, revision datastore.Revision, ds datastore.Datastore) (*v1.ZedToken, error) { + datastoreUniqueID, err := ds.UniqueID(ctx) + if err != nil { + return nil, fmt.Errorf(errEncodeError, err) + } + + return newFromRevision(revision, datastoreUniqueID) +} + +func newFromRevision(revision datastore.Revision, datastoreUniqueID string) (*v1.ZedToken, error) { toEncode := &zedtoken.DecodedZedToken{ VersionOneof: &zedtoken.DecodedZedToken_V1{ V1: &zedtoken.DecodedZedToken_V1ZedToken{ - Revision: revision.String(), + Revision: revision.String(), + DatastoreUniqueId: datastoreUniqueID, }, }, } @@ -77,10 +111,10 @@ func Decode(encoded *v1.ZedToken) (*zedtoken.DecodedZedToken, error) { } // DecodeRevision converts and extracts the revision from a zedtoken or legacy zookie. -func DecodeRevision(encoded *v1.ZedToken, ds revisionDecoder) (datastore.Revision, error) { +func DecodeRevision(encoded *v1.ZedToken, ds revisionDecoder) (datastore.Revision, TokenStatus, error) { decoded, err := Decode(encoded) if err != nil { - return datastore.NoRevision, err + return datastore.NoRevision, StatusUnknown, err } switch ver := decoded.VersionOneof.(type) { @@ -88,21 +122,36 @@ func DecodeRevision(encoded *v1.ZedToken, ds revisionDecoder) (datastore.Revisio revString := fmt.Sprintf("%d", ver.DeprecatedV1Zookie.Revision) parsed, err := ds.RevisionFromString(revString) if err != nil { - return datastore.NoRevision, fmt.Errorf(errDecodeError, err) + return datastore.NoRevision, StatusUnknown, fmt.Errorf(errDecodeError, err) } - return parsed, nil + return parsed, StatusLegacyEmptyDatastoreID, nil case *zedtoken.DecodedZedToken_V1: parsed, err := ds.RevisionFromString(ver.V1.Revision) if err != nil { - return datastore.NoRevision, fmt.Errorf(errDecodeError, err) + return datastore.NoRevision, StatusUnknown, fmt.Errorf(errDecodeError, err) } - return parsed, nil + + if ver.V1.DatastoreUniqueId == legacyEmptyDatastoreID { + return parsed, StatusLegacyEmptyDatastoreID, nil + } + + datastoreUniqueID, err := ds.UniqueID(context.Background()) + if err != nil { + return datastore.NoRevision, StatusUnknown, fmt.Errorf(errDecodeError, err) + } + + if ver.V1.DatastoreUniqueId != datastoreUniqueID { + return parsed, StatusMismatchedDatastoreID, nil + } + + return parsed, StatusValid, nil default: - return datastore.NoRevision, fmt.Errorf(errDecodeError, fmt.Errorf("unknown zookie version: %T", decoded.VersionOneof)) + return datastore.NoRevision, StatusUnknown, fmt.Errorf(errDecodeError, fmt.Errorf("unknown zookie version: %T", decoded.VersionOneof)) } } type revisionDecoder interface { + UniqueID(context.Context) (string, error) RevisionFromString(string) (datastore.Revision, error) } diff --git a/pkg/zedtoken/zedtoken_test.go b/pkg/zedtoken/zedtoken_test.go index 09bbc4d48e..85df77aa4d 100644 --- a/pkg/zedtoken/zedtoken_test.go +++ b/pkg/zedtoken/zedtoken_test.go @@ -41,10 +41,9 @@ func TestZedTokenEncode(t *testing.T) { rev := rev t.Run(rev.String(), func(t *testing.T) { require := require.New(t) - encoded, err := NewFromRevision(rev) - require.NoError(err) + encoded := MustNewFromRevisionForTesting(rev) - decoded, err := DecodeRevision(encoded, revisions.CommonDecoder{ + decoded, _, err := DecodeRevision(encoded, revisions.CommonDecoder{ Kind: revisions.TransactionID, }) require.NoError(err) @@ -58,10 +57,9 @@ func TestZedTokenEncodeHLC(t *testing.T) { rev := rev t.Run(rev.String(), func(t *testing.T) { require := require.New(t) - encoded, err := NewFromRevision(rev) - require.NoError(err) + encoded := MustNewFromRevisionForTesting(rev) - decoded, err := DecodeRevision(encoded, revisions.CommonDecoder{ + decoded, _, err := DecodeRevision(encoded, revisions.CommonDecoder{ Kind: revisions.HybridLogicalClock, }) require.NoError(err) @@ -71,65 +69,92 @@ func TestZedTokenEncodeHLC(t *testing.T) { } var decodeTests = []struct { - format string - token string - expectedRevision datastore.Revision - expectError bool + format string + token string + datastoreUniqueID string + expectedRevision datastore.Revision + expectedStatus TokenStatus + expectError bool }{ { format: "invalid", token: "abc", expectedRevision: datastore.NoRevision, + expectedStatus: StatusUnknown, expectError: true, }, { format: "V1 Zookie", token: "CAESAA==", expectedRevision: revisions.NewForTransactionID(0), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 Zookie", token: "CAESAggB", expectedRevision: revisions.NewForTransactionID(1), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 Zookie", token: "CAESAggC", expectedRevision: revisions.NewForTransactionID(2), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 Zookie", token: "CAESAwiAAg==", expectedRevision: revisions.NewForTransactionID(256), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 Zookie", token: "CAIaAwoBMA==", expectedRevision: revisions.NewForTransactionID(0), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 ZedToken", token: "CAIaAwoBMQ==", expectedRevision: revisions.NewForTransactionID(1), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 ZedToken", token: "CAIaAwoBMg==", expectedRevision: revisions.NewForTransactionID(2), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 ZedToken", token: "CAIaAwoBNA==", expectedRevision: revisions.NewForTransactionID(4), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, + { + format: "V1 ZedToken with matching datastore unique ID", + token: "GhIKAjQyEgxzb21ldW5pcXVlaWQ=", + datastoreUniqueID: "someuniqueid", + expectedRevision: revisions.NewForTransactionID(42), + expectedStatus: StatusValid, + expectError: false, + }, + { + format: "V1 ZedToken with mismatched datastore unique ID", + token: "GhIKAjQyEgxzb21ldW5pcXVlaWQ=", + datastoreUniqueID: "anotheruniqueid", + expectedRevision: revisions.NewForTransactionID(42), + expectedStatus: StatusMismatchedDatastoreID, + expectError: false, + }, } func TestDecode(t *testing.T) { @@ -139,15 +164,17 @@ func TestDecode(t *testing.T) { t.Run(testName, func(t *testing.T) { require := require.New(t) - decoded, err := DecodeRevision(&v1.ZedToken{ + decoded, status, err := DecodeRevision(&v1.ZedToken{ Token: testCase.token, }, revisions.CommonDecoder{ - Kind: revisions.TransactionID, + DatastoreUniqueID: testCase.datastoreUniqueID, + Kind: revisions.TransactionID, }) if testCase.expectError { require.Error(err) } else { require.NoError(err) + require.Equal(testCase.expectedStatus, status) require.True( testCase.expectedRevision.Equal(decoded), "%s != %s", @@ -160,14 +187,17 @@ func TestDecode(t *testing.T) { } var hlcDecodeTests = []struct { - format string - token string - expectedRevision datastore.Revision - expectError bool + format string + token string + datastoreUniqueID string + expectedRevision datastore.Revision + expectedStatus TokenStatus + expectError bool }{ { - format: "V1 ZedToken", - token: "CAIaFQoTMTYyMTUzODE4OTAyODkyODAwMA==", + format: "V1 ZedToken", + token: "CAIaFQoTMTYyMTUzODE4OTAyODkyODAwMA==", + expectedStatus: StatusLegacyEmptyDatastoreID, expectedRevision: func() datastore.Revision { r, err := revisions.NewForHLC(decimal.NewFromInt(1621538189028928000)) if err != nil { @@ -175,11 +205,47 @@ var hlcDecodeTests = []struct { } return r }(), + }, + { + format: "V1 ZedToken", + token: "GiAKHjE2OTM1NDA5NDAzNzMwNDU3MjcuMDAwMDAwMDAwMQ==", + expectedStatus: StatusLegacyEmptyDatastoreID, + expectedRevision: (func() datastore.Revision { + v, err := decimal.NewFromString("1693540940373045727.0000000001") + if err != nil { + panic(err) + } + r, err := revisions.NewForHLC(v) + if err != nil { + panic(err) + } + return r + })(), expectError: false, }, { - format: "V1 ZedToken", - token: "GiAKHjE2OTM1NDA5NDAzNzMwNDU3MjcuMDAwMDAwMDAwMQ==", + format: "V1 ZedToken with matching datastore unique ID", + token: "GkYKHjE2OTM1NDA5NDAzNzMwNDU3MjcuMDAwMDAwMDAwMRIkNjM0OWFhZjItMzdjZC00N2I5LTg0ZTgtZmU1ZmE2ZTJkZWFk", + datastoreUniqueID: "6349aaf2-37cd-47b9-84e8-fe5fa6e2dead", + expectedStatus: StatusValid, + expectedRevision: (func() datastore.Revision { + v, err := decimal.NewFromString("1693540940373045727.0000000001") + if err != nil { + panic(err) + } + r, err := revisions.NewForHLC(v) + if err != nil { + panic(err) + } + return r + })(), + expectError: false, + }, + { + format: "V1 ZedToken with mismatched datastore unique ID", + token: "GkYKHjE2OTM1NDA5NDAzNzMwNDU3MjcuMDAwMDAwMDAwMRIkNjM0OWFhZjItMzdjZC00N2I5LTg0ZTgtZmU1ZmE2ZTJkZWFk", + datastoreUniqueID: "arrrg-6349aaf2-37cd-47b9-84e8-fe5fa6e2dead", + expectedStatus: StatusMismatchedDatastoreID, expectedRevision: (func() datastore.Revision { v, err := decimal.NewFromString("1693540940373045727.0000000001") if err != nil { @@ -204,15 +270,17 @@ func TestHLCDecode(t *testing.T) { t.Run(testName, func(t *testing.T) { require := require.New(t) - decoded, err := DecodeRevision(&v1.ZedToken{ + decoded, status, err := DecodeRevision(&v1.ZedToken{ Token: testCase.token, }, revisions.CommonDecoder{ - Kind: revisions.HybridLogicalClock, + DatastoreUniqueID: testCase.datastoreUniqueID, + Kind: revisions.HybridLogicalClock, }) if testCase.expectError { require.Error(err) } else { require.NoError(err) + require.Equal(testCase.expectedStatus, status) require.True( testCase.expectedRevision.Equal(decoded), "%s != %s", diff --git a/proto/internal/impl/v1/impl.proto b/proto/internal/impl/v1/impl.proto index c6e871a1ed..0e3f1356e6 100644 --- a/proto/internal/impl/v1/impl.proto +++ b/proto/internal/impl/v1/impl.proto @@ -33,6 +33,10 @@ message DecodedZedToken { } message V1ZedToken { string revision = 1; + + // datastore_unique_id is the unique ID for the datastore. Will be empty for legacy + // tokens. + string datastore_unique_id = 2; } oneof version_oneof { V1Zookie deprecated_v1_zookie = 2; @@ -60,6 +64,10 @@ message V1Cursor { // dispatch_version is the version of the dispatcher which created the cursor. uint32 dispatch_version = 4; + + // datastore_unique_id is the unique ID for the datastore. Will be empty for legacy + // cursors. + string datastore_unique_id = 5; } message DocComment {