Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(datastore): Adding BeginLater and transaction state #8984

Merged
merged 5 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions datastore/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,8 +763,8 @@ func TestReadOptions(t *testing.T) {
}
// Test errors.
for _, q := range []*Query{
NewQuery("").Transaction(&Transaction{id: nil}),
NewQuery("").Transaction(&Transaction{id: tid}).EventualConsistency(),
NewQuery("").Transaction(&Transaction{id: nil, state: transactionStateExpired}),
NewQuery("").Transaction(&Transaction{id: tid, state: transactionStateInProgress}).EventualConsistency(),
} {
req := &pb.RunQueryRequest{}
if err := q.toRunQueryRequest(req); err == nil {
Expand Down
51 changes: 41 additions & 10 deletions datastore/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ type transactionSettings struct {
readOnly bool
prevID []byte // ID of the transaction to retry
readTime *timestamppb.Timestamp

// When set, skips the initial BeginTransaction RPC call to obtain txn id and
// uses the piggybacked txn id from first read rpc call.
// If there are no read operations on transaction, BeginTransaction RPC call is made
// before rollback or commit
// Currently, this setting is set but unused
// TODO: b/291258189 - Use this setting
beginLater bool
}

// newTransactionSettings creates a transactionSettings with a given TransactionOption slice.
Expand Down Expand Up @@ -91,6 +99,7 @@ var ReadOnly TransactionOption

func init() {
ReadOnly = readOnly{}
BeginLater = beginLater{}
}

type readOnly struct{}
Expand All @@ -99,6 +108,25 @@ func (readOnly) apply(s *transactionSettings) {
s.readOnly = true
}

// BeginLater is a TransactionOption that can be used to improve transaction performance
// Currently, it is a no-op
// TODO: b/291258189 - Add implementation
var BeginLater TransactionOption

type beginLater struct{}

func (beginLater) apply(s *transactionSettings) {
s.beginLater = true
}

type transactionState int

const (
transactionStateNotStarted transactionState = iota // Currently unused
transactionStateInProgress
transactionStateExpired
)

// Transaction represents a set of datastore operations to be committed atomically.
//
// Operations are enqueued by calling the Put and Delete methods on Transaction
Expand All @@ -114,6 +142,8 @@ type Transaction struct {
ctx context.Context
mutations []*pb.Mutation // The mutations to apply.
pending map[int]*PendingKey // Map from mutation index to incomplete keys pending transaction completion.
settings *transactionSettings
state transactionState
}

// NewTransaction starts a new transaction.
Expand Down Expand Up @@ -167,6 +197,8 @@ func (c *Client) newTransaction(ctx context.Context, s *transactionSettings) (_
client: c,
mutations: nil,
pending: make(map[int]*PendingKey),
state: transactionStateInProgress,
settings: s,
}, nil
}

Expand Down Expand Up @@ -223,7 +255,7 @@ func (t *Transaction) Commit() (c *Commit, err error) {
t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.Commit")
defer func() { trace.EndSpan(t.ctx, err) }()

if t.id == nil {
if t.state == transactionStateExpired {
return nil, errExpiredTransaction
}
req := &pb.CommitRequest{
Expand All @@ -237,7 +269,7 @@ func (t *Transaction) Commit() (c *Commit, err error) {
if status.Code(err) == codes.Aborted {
return nil, ErrConcurrentTransaction
}
t.id = nil // mark the transaction as expired
t.state = transactionStateExpired // mark the transaction as expired
if err != nil {
return nil, err
}
Expand All @@ -264,15 +296,14 @@ func (t *Transaction) Rollback() (err error) {
t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.Rollback")
defer func() { trace.EndSpan(t.ctx, err) }()

if t.id == nil {
if t.state == transactionStateExpired {
return errExpiredTransaction
}
id := t.id
t.id = nil
t.state = transactionStateExpired
_, err = t.client.client.Rollback(t.ctx, &pb.RollbackRequest{
ProjectId: t.client.dataset,
DatabaseId: t.client.databaseID,
Transaction: id,
Transaction: t.id,
})
return err
}
Expand Down Expand Up @@ -303,7 +334,7 @@ func (t *Transaction) GetMulti(keys []*Key, dst interface{}) (err error) {
t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.GetMulti")
defer func() { trace.EndSpan(t.ctx, err) }()

if t.id == nil {
if t.state == transactionStateExpired {
return errExpiredTransaction
}
opts := &pb.ReadOptions{
Expand Down Expand Up @@ -336,7 +367,7 @@ func (t *Transaction) Put(key *Key, src interface{}) (*PendingKey, error) {
// element of src in the same order.
// TODO(jba): rewrite in terms of Mutate.
func (t *Transaction) PutMulti(keys []*Key, src interface{}) (ret []*PendingKey, err error) {
if t.id == nil {
if t.state == transactionStateExpired {
return nil, errExpiredTransaction
}
mutations, err := putMutations(keys, src)
Expand Down Expand Up @@ -376,7 +407,7 @@ func (t *Transaction) Delete(key *Key) error {
// DeleteMulti is a batch version of Delete.
// TODO(jba): rewrite in terms of Mutate.
func (t *Transaction) DeleteMulti(keys []*Key) (err error) {
if t.id == nil {
if t.state == transactionStateExpired {
return errExpiredTransaction
}
mutations, err := deleteMutations(keys)
Expand All @@ -396,7 +427,7 @@ func (t *Transaction) DeleteMulti(keys []*Key) (err error) {
//
// For an example, see Client.Mutate.
func (t *Transaction) Mutate(muts ...*Mutation) ([]*PendingKey, error) {
if t.id == nil {
if t.state == transactionStateExpired {
return nil, errExpiredTransaction
}
pmuts, err := mutationProtos(muts)
Expand Down
Loading