Skip to content

Commit

Permalink
Merge pull request #1972 from authzed/align-revision-timestamps-crdb-pg
Browse files Browse the repository at this point in the history
adjust pg revision timestamps
  • Loading branch information
vroldanbet committed Jul 4, 2024
2 parents 1f04451 + 36d8b12 commit 4891cfb
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 17 deletions.
76 changes: 76 additions & 0 deletions internal/datastore/postgres/postgres_shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package postgres

import (
"context"
"errors"
"fmt"
"math/rand"
"strings"
Expand Down Expand Up @@ -177,6 +178,15 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
MigrationPhase(config.migrationPhase),
))

t.Run("TestRevisionTimestampAndTransactionID", createDatastoreTest(
b,
RevisionTimestampAndTransactionIDTest,
RevisionQuantization(0),
GCWindow(1*time.Millisecond),
WatchBufferLength(50),
MigrationPhase(config.migrationPhase),
))

t.Run("TestStrictReadMode", createReplicaDatastoreTest(
b,
StrictReadModeTest,
Expand Down Expand Up @@ -1536,4 +1546,70 @@ func NullCaveatWatchTest(t *testing.T, ds datastore.Datastore) {
)
}

func RevisionTimestampAndTransactionIDTest(t *testing.T, ds datastore.Datastore) {
require := require.New(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

lowestRevision, err := ds.HeadRevision(ctx)
require.NoError(err)

// Run the watch API.
changes, errchan := ds.Watch(ctx, lowestRevision, datastore.WatchOptions{
Content: datastore.WatchRelationships | datastore.WatchSchema | datastore.WatchCheckpoints,
})
require.Zero(len(errchan))

pds := ds.(*pgDatastore)
_, err = pds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error {
return rwt.WriteRelationships(ctx, []*core.RelationTupleUpdate{
tuple.Touch(tuple.MustParse("something:001#viewer@user:123")),
})
})
require.NoError(err)

anHourAgo := time.Now().UTC().Add(-1 * time.Hour)
var checkedUpdate, checkedCheckpoint bool
for {
if checkedCheckpoint && checkedUpdate {
break
}

changeWait := time.NewTimer(waitForChangesTimeout)
select {
case change, ok := <-changes:
if !ok {
errWait := time.NewTimer(waitForChangesTimeout)
select {
case err := <-errchan:
require.True(errors.As(err, &datastore.ErrWatchDisconnected{}))
return
case <-errWait.C:
require.Fail("Timed out waiting for ErrWatchDisconnected")
}
return
}

rev := change.Revision.(postgresRevision)
timestamp, timestampPresent := rev.OptionalNanosTimestamp()
require.True(timestampPresent, "expected timestamp to be present in revision")
isCorrectAndUsesNanos := time.Unix(0, int64(timestamp)).After(anHourAgo)
require.True(isCorrectAndUsesNanos, "timestamp is not correct")

_, transactionIDPresent := rev.OptionalTransactionID()
require.True(transactionIDPresent, "expected transactionID to be present in revision")

if change.IsCheckpoint {
checkedCheckpoint = true
} else {
checkedUpdate = true
}
time.Sleep(1 * time.Millisecond)
case <-changeWait.C:
require.Fail("Timed out")
}
}
}

const waitForChangesTimeout = 5 * time.Second
20 changes: 10 additions & 10 deletions internal/datastore/postgres/revisions.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ func parseRevisionProto(revisionStr string) (datastore.Revision, error) {
xmax: uint64(xminInt + decoded.RelativeXmax),
xipList: xips,
},
optionalTxID: xid8{Uint64: decoded.OptionalTxid, Valid: decoded.OptionalTxid != 0},
optionalTimestamp: decoded.OptionalTimestamp,
optionalTxID: xid8{Uint64: decoded.OptionalTxid, Valid: decoded.OptionalTxid != 0},
optionalNanosTimestamp: decoded.OptionalTimestamp,
}, nil
}

Expand Down Expand Up @@ -248,9 +248,9 @@ func createNewTransaction(ctx context.Context, tx pgx.Tx) (newXID xid8, newSnaps
}

type postgresRevision struct {
snapshot pgSnapshot
optionalTxID xid8
optionalTimestamp uint64
snapshot pgSnapshot
optionalTxID xid8
optionalNanosTimestamp uint64
}

func (pr postgresRevision) Equal(rhsRaw datastore.Revision) bool {
Expand Down Expand Up @@ -298,14 +298,14 @@ func (pr postgresRevision) OptionalTransactionID() (xid8, bool) {
return pr.optionalTxID, true
}

// OptionalTimestamp returns a unix epoch timestamp representing the time at which the transaction committed as
// defined by the Postgres primary. This is not guaranteed to be monotonically increasing.
func (pr postgresRevision) OptionalTimestamp() (uint64, bool) {
if pr.optionalTimestamp == 0 {
// OptionalNanosTimestamp returns a unix epoch timestamp in nanos representing the time at which the transaction committed
// as defined by the Postgres primary. This is not guaranteed to be monotonically increasing
func (pr postgresRevision) OptionalNanosTimestamp() (uint64, bool) {
if pr.optionalNanosTimestamp == 0 {
return 0, false
}

return pr.optionalTimestamp, true
return pr.optionalNanosTimestamp, true
}

// MarshalBinary creates a version of the snapshot that uses relative encoding
Expand Down
6 changes: 3 additions & 3 deletions internal/datastore/postgres/revisions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,16 @@ func TestRevisionSerDe(t *testing.T) {
func TestTxIDTimestampAvailable(t *testing.T) {
testTimestamp := uint64(time.Now().Unix())
snapshot := snap(0, 5, 1)
pgr := postgresRevision{snapshot: snapshot, optionalTxID: newXid8(1), optionalTimestamp: testTimestamp}
receivedTimestamp, ok := pgr.OptionalTimestamp()
pgr := postgresRevision{snapshot: snapshot, optionalTxID: newXid8(1), optionalNanosTimestamp: testTimestamp}
receivedTimestamp, ok := pgr.OptionalNanosTimestamp()
require.True(t, ok)
require.Equal(t, receivedTimestamp, testTimestamp)
txid, ok := pgr.OptionalTransactionID()
require.True(t, ok)
require.Equal(t, newXid8(1), txid)

anotherRev := postgresRevision{snapshot: snapshot}
_, ok = anotherRev.OptionalTimestamp()
_, ok = anotherRev.OptionalNanosTimestamp()
require.False(t, ok)
_, ok = anotherRev.OptionalTransactionID()
require.False(t, ok)
Expand Down
12 changes: 8 additions & 4 deletions internal/datastore/postgres/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,11 @@ func (pgd *pgDatastore) Watch(
// transactions.
currentTxn = newTxns[len(newTxns)-1]
for _, newTx := range newTxns {
currentTxn = postgresRevision{snapshot: currentTxn.snapshot.markComplete(newTx.optionalTxID.Uint64)}
currentTxn = postgresRevision{
snapshot: currentTxn.snapshot.markComplete(newTx.optionalTxID.Uint64),
optionalTxID: currentTxn.optionalTxID,
optionalNanosTimestamp: currentTxn.optionalNanosTimestamp,
}
}

// If checkpoints were requested, output a checkpoint. While the Postgres datastore does not
Expand Down Expand Up @@ -201,9 +205,9 @@ func (pgd *pgDatastore) getNewRevisions(ctx context.Context, afterTX postgresRev
}

ids = append(ids, postgresRevision{
snapshot: nextSnapshot.markComplete(nextXID.Uint64),
optionalTxID: nextXID,
optionalTimestamp: uint64(timestamp.Unix()),
snapshot: nextSnapshot.markComplete(nextXID.Uint64),
optionalTxID: nextXID,
optionalNanosTimestamp: uint64(timestamp.UnixNano()),
})
}
if rows.Err() != nil {
Expand Down

0 comments on commit 4891cfb

Please sign in to comment.