Skip to content

Commit

Permalink
Introduce a BeginTransaction request.
Browse files Browse the repository at this point in the history
BeginTransactionRequest is automatically inserted by the KV client
immediately before the first transactional write. On execution,
BeginTransaction creates the transaction record.

If a heartbeat arrives for a txn that has no transaction record, it's
ignored. If a push txn arrives for a txn that has no transaction record,
the txn is considered aborted.

This solves the problem of errant heartbeats or pushes recreating a GC'd
transaction record, addressing #2062.
  • Loading branch information
spencerkimball committed Oct 8, 2015
1 parent 4ed6a45 commit 50d25e1
Show file tree
Hide file tree
Showing 22 changed files with 3,356 additions and 1,635 deletions.
1 change: 1 addition & 0 deletions client/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func (b *Batch) fillResults(br *roachpb.BatchResponse, pErr *roachpb.Error) erro
row.Key = []byte(args.(*roachpb.DeleteRequest).Key)

case *roachpb.DeleteRangeRequest:
case *roachpb.BeginTransactionRequest:
case *roachpb.EndTransactionRequest:
case *roachpb.AdminMergeRequest:
case *roachpb.AdminSplitRequest:
Expand Down
49 changes: 35 additions & 14 deletions client/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,27 +408,43 @@ func (txn *Txn) send(reqs ...roachpb.Request) (*roachpb.BatchResponse, *roachpb.
return &roachpb.BatchResponse{}, nil
}

lastReq := reqs[lastIndex]
// haveTxnWrite tracks intention to write. This is in contrast to
// txn.Proto.Writing, which is set by the coordinator when the first
// intent has been created, and which lives for the life of the
// transaction.
haveTxnWrite := roachpb.IsTransactionWrite(lastReq)

for _, args := range reqs[:lastIndex] {
if _, ok := args.(*roachpb.EndTransactionRequest); ok {
return nil, roachpb.NewError(util.Errorf("%s sent as non-terminal call", args.Method()))
// firstWriteIndex is set to the index of the first command which is
// a transactional write. If != -1, this indicates an intention to
// write. This is in contrast to txn.Proto.Writing, which is set by
// the coordinator when the first intent has been created, and which
// lives for the life of the transaction.
firstWriteIndex := -1
var firstWriteKey roachpb.Key

for i, args := range reqs {
if i < lastIndex {
if _, ok := args.(*roachpb.EndTransactionRequest); ok {
return nil, roachpb.NewError(util.Errorf("%s sent as non-terminal call", args.Method()))
}
}

if !haveTxnWrite {
haveTxnWrite = roachpb.IsTransactionWrite(args)
if roachpb.IsTransactionWrite(args) && firstWriteIndex == -1 {
firstWriteKey = args.Header().Key
firstWriteIndex = i
}
}

endTxnRequest, haveEndTxn := lastReq.(*roachpb.EndTransactionRequest)
haveTxnWrite := firstWriteIndex != -1
endTxnRequest, haveEndTxn := reqs[lastIndex].(*roachpb.EndTransactionRequest)
needBeginTxn := !txn.Proto.Writing && haveTxnWrite
needEndTxn := txn.Proto.Writing || haveTxnWrite
elideEndTxn := haveEndTxn && !needEndTxn

// If we're not yet writing in this txn, but intend to, insert a
// begin transaction request before the first write command.
if needBeginTxn {
bt := &roachpb.BeginTransactionRequest{
RequestHeader: roachpb.RequestHeader{
Key: firstWriteKey,
},
}
reqs = append(append(append([]roachpb.Request(nil), reqs[:firstWriteIndex]...), []roachpb.Request{bt}...), reqs[firstWriteIndex:]...)
}

if elideEndTxn {
reqs = reqs[:lastIndex]
}
Expand All @@ -445,5 +461,10 @@ func (txn *Txn) send(reqs ...roachpb.Request) (*roachpb.BatchResponse, *roachpb.
txn.Proto.Status = roachpb.ABORTED
}
}

// If we inserted a begin transaction request, remove it here.
if needBeginTxn && pErr == nil {
br.Responses = append(br.Responses[:firstWriteIndex], br.Responses[firstWriteIndex+1:]...)
}
return br, pErr
}
64 changes: 58 additions & 6 deletions client/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ func newDB(sender Sender) *DB {
}
}

// TestSender mocks out some of the txn coordinator sender's
// functionality. It responds to PutRequests using testPutResp.
func newTestSender(pre, post func(roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)) SenderFunc {
txnKey := roachpb.Key("test-txn")
txnID := []byte(uuid.NewUUID4())
Expand All @@ -68,9 +70,16 @@ func newTestSender(pre, post func(roachpb.BatchRequest) (*roachpb.BatchResponse,
}
var writing bool
status := roachpb.PENDING
if _, ok := ba.GetArg(roachpb.Put); ok {
br.Add(proto.Clone(testPutResp).(roachpb.Response))
writing = true
for i, req := range ba.Requests {
args := req.GetInner()
if _, ok := args.(*roachpb.PutRequest); ok {
if !br.Responses[i].SetValue(proto.Clone(testPutResp).(roachpb.Response)) {
panic("failed to set put response")
}
}
if roachpb.IsTransactionWrite(args) {
writing = true
}
}
if args, ok := ba.GetArg(roachpb.EndTransaction); ok {
et := args.(*roachpb.EndTransactionRequest)
Expand Down Expand Up @@ -240,20 +249,63 @@ func TestCommitReadOnlyTransactionExplicit(t *testing.T) {
// upon successful invocation of the retryable func.
func TestCommitMutatingTransaction(t *testing.T) {
defer leaktest.AfterTest(t)

var calls []roachpb.Method
db := newDB(newTestSender(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
calls = append(calls, ba.Methods()...)
if bt, ok := ba.GetArg(roachpb.BeginTransaction); ok && !bt.Header().Key.Equal(roachpb.Key("a")) {
t.Errorf("expected begin transaction key to be \"a\"; got %s", bt.Header().Key)
}
if et, ok := ba.GetArg(roachpb.EndTransaction); ok && !et.(*roachpb.EndTransactionRequest).Commit {
t.Errorf("expected commit to be true")
}
return ba.CreateReply(), nil
}, nil))

// Test all transactional write methods.
testArgs := []struct {
f func(txn *Txn) error
expMethod roachpb.Method
}{
{func(txn *Txn) error { return txn.Put("a", "b") }, roachpb.Put},
{func(txn *Txn) error { return txn.CPut("a", "b", nil) }, roachpb.ConditionalPut},
{func(txn *Txn) error {
_, err := txn.Inc("a", 1)
return err
}, roachpb.Increment},
{func(txn *Txn) error { return txn.Del("a") }, roachpb.Delete},
{func(txn *Txn) error { return txn.DelRange("a", "b") }, roachpb.DeleteRange},
}
for i, test := range testArgs {
calls = []roachpb.Method{}
if err := db.Txn(func(txn *Txn) error {
return test.f(txn)
}); err != nil {
t.Errorf("%d: unexpected error on commit: %s", i, err)
}
expectedCalls := []roachpb.Method{roachpb.BeginTransaction, test.expMethod, roachpb.EndTransaction}
if !reflect.DeepEqual(expectedCalls, calls) {
t.Errorf("%d: expected %s, got %s", i, expectedCalls, calls)
}
}
}

// TestTxnInsertBeginTransaction verifies that a begin transaction
// request is inserted just before the first mutating command.
func TestTxnInsertBeginTransaction(t *testing.T) {
defer leaktest.AfterTest(t)
var calls []roachpb.Method
db := newDB(newTestSender(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
calls = append(calls, ba.Methods()...)
return ba.CreateReply(), nil
}, nil))
if err := db.Txn(func(txn *Txn) error {
txn.Get("foo")
return txn.Put("a", "b")
}); err != nil {
t.Errorf("unexpected error on commit: %s", err)
}
expectedCalls := []roachpb.Method{roachpb.Put, roachpb.EndTransaction}
expectedCalls := []roachpb.Method{roachpb.Get, roachpb.BeginTransaction, roachpb.Put, roachpb.EndTransaction}
if !reflect.DeepEqual(expectedCalls, calls) {
t.Errorf("expected %s, got %s", expectedCalls, calls)
}
Expand Down Expand Up @@ -306,7 +358,7 @@ func TestAbortReadOnlyTransaction(t *testing.T) {
func TestEndWriteRestartReadOnlyTransaction(t *testing.T) {
defer leaktest.AfterTest(t)
for _, success := range []bool{true, false} {
expCalls := []roachpb.Method{roachpb.Put, roachpb.EndTransaction}
expCalls := []roachpb.Method{roachpb.BeginTransaction, roachpb.Put, roachpb.EndTransaction}
var calls []roachpb.Method
db := newDB(newTestSender(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
calls = append(calls, ba.Methods()...)
Expand Down Expand Up @@ -355,7 +407,7 @@ func TestAbortMutatingTransaction(t *testing.T) {
}); err == nil {
t.Error("expected error on abort")
}
expectedCalls := []roachpb.Method{roachpb.Put, roachpb.EndTransaction}
expectedCalls := []roachpb.Method{roachpb.BeginTransaction, roachpb.Put, roachpb.EndTransaction}
if !reflect.DeepEqual(expectedCalls, calls) {
t.Errorf("expected %s, got %s", expectedCalls, calls)
}
Expand Down
23 changes: 12 additions & 11 deletions kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,18 @@ import (
)

var allExternalMethods = [...]roachpb.Request{
roachpb.Get: &roachpb.GetRequest{},
roachpb.Put: &roachpb.PutRequest{},
roachpb.ConditionalPut: &roachpb.ConditionalPutRequest{},
roachpb.Increment: &roachpb.IncrementRequest{},
roachpb.Delete: &roachpb.DeleteRequest{},
roachpb.DeleteRange: &roachpb.DeleteRangeRequest{},
roachpb.Scan: &roachpb.ScanRequest{},
roachpb.ReverseScan: &roachpb.ReverseScanRequest{},
roachpb.EndTransaction: &roachpb.EndTransactionRequest{},
roachpb.AdminSplit: &roachpb.AdminSplitRequest{},
roachpb.AdminMerge: &roachpb.AdminMergeRequest{},
roachpb.Get: &roachpb.GetRequest{},
roachpb.Put: &roachpb.PutRequest{},
roachpb.ConditionalPut: &roachpb.ConditionalPutRequest{},
roachpb.Increment: &roachpb.IncrementRequest{},
roachpb.Delete: &roachpb.DeleteRequest{},
roachpb.DeleteRange: &roachpb.DeleteRangeRequest{},
roachpb.Scan: &roachpb.ScanRequest{},
roachpb.ReverseScan: &roachpb.ReverseScanRequest{},
roachpb.BeginTransaction: &roachpb.BeginTransactionRequest{},
roachpb.EndTransaction: &roachpb.EndTransactionRequest{},
roachpb.AdminSplit: &roachpb.AdminSplitRequest{},
roachpb.AdminMerge: &roachpb.AdminMergeRequest{},
}

// A DBServer provides an HTTP server endpoint serving the key-value API.
Expand Down
49 changes: 36 additions & 13 deletions kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/client"
"github.com/cockroachdb/cockroach/keys"
"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/storage"
"github.com/cockroachdb/cockroach/util"
Expand Down Expand Up @@ -296,7 +295,9 @@ func (tc *TxnCoordSender) startStats() {
// write intents; they're tagged to an outgoing EndTransaction request, with
// the receiving replica in charge of resolving them.
func (tc *TxnCoordSender) Send(ctx context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
tc.maybeBeginTxn(&ba)
if err := tc.maybeBeginTxn(&ba); err != nil {
return nil, roachpb.NewError(err)
}
ba.CmdID = ba.GetOrCreateCmdID(tc.clock.PhysicalNow())
var startNS int64

Expand Down Expand Up @@ -334,6 +335,10 @@ func (tc *TxnCoordSender) Send(ctx context.Context, ba roachpb.BatchRequest) (*r

if rArgs, ok := ba.GetArg(roachpb.EndTransaction); ok {
et := rArgs.(*roachpb.EndTransactionRequest)
if len(et.Key) != 0 {
return nil, roachpb.NewError(util.Errorf("EndTransaction must not have a Key set"))
}
et.Key = ba.Txn.Key
// Remember when EndTransaction started in case we want to
// be linearizable.
startNS = tc.clock.PhysicalNow()
Expand All @@ -343,11 +348,6 @@ func (tc *TxnCoordSender) Send(ctx context.Context, ba roachpb.BatchRequest) (*r
// write on multiple coordinators.
return nil, roachpb.NewError(util.Errorf("client must not pass intents to EndTransaction"))
}
if len(et.Key) != 0 {
return nil, roachpb.NewError(util.Errorf("EndTransaction must not have a Key set"))
}
et.Key = ba.Txn.Key

tc.Lock()
txnMeta, metaOK := tc.txns[id]
if id != "" && metaOK {
Expand Down Expand Up @@ -442,17 +442,21 @@ func (tc *TxnCoordSender) Send(ctx context.Context, ba roachpb.BatchRequest) (*r
// in the request but has a nil ID. The new transaction is initialized
// using the name and isolation in the otherwise uninitialized txn.
// The Priority, if non-zero is used as a minimum.
func (tc *TxnCoordSender) maybeBeginTxn(ba *roachpb.BatchRequest) {
//
// No transactional writes are allowed unless preceded by a begin
// transaction request within the same batch. The exception is if the
// transaction is already in state txn.Writing=true.
func (tc *TxnCoordSender) maybeBeginTxn(ba *roachpb.BatchRequest) error {
if ba.Txn == nil {
return
return nil
}
if len(ba.Requests) == 0 {
panic("empty batch with txn")
return util.Errorf("empty batch with txn")
}
if len(ba.Txn.ID) == 0 {
// TODO(tschottdorf): should really choose the first txn write here.
firstKey := ba.Requests[0].GetInner().Header().Key
newTxn := roachpb.NewTransaction(ba.Txn.Name, keys.KeyAddress(firstKey), ba.GetUserPriority(),
// Create transaction without a key. The key is set when a begin
// transaction request is received.
newTxn := roachpb.NewTransaction(ba.Txn.Name, nil, ba.GetUserPriority(),
ba.Txn.Isolation, tc.clock.Now(), tc.clock.MaxOffset().Nanoseconds())
// Use existing priority as a minimum. This is used on transaction
// aborts to ratchet priority when creating successor transaction.
Expand All @@ -461,6 +465,25 @@ func (tc *TxnCoordSender) maybeBeginTxn(ba *roachpb.BatchRequest) {
}
ba.Txn = newTxn
}

// Check for a begin transaction to set txn key based on the key of
// the first transactional write. Also enforce that no transactional
// writes occur before a begin transaction.
var haveBeginTxn bool
for _, req := range ba.Requests {
args := req.GetInner()
if bt, ok := args.(*roachpb.BeginTransactionRequest); ok {
if haveBeginTxn || ba.Txn.Writing {
return util.Errorf("begin transaction requested twice in the same transaction")
}
haveBeginTxn = true
ba.Txn.Key = bt.Key
}
if roachpb.IsTransactionWrite(args) && !haveBeginTxn && !ba.Txn.Writing {
return util.Errorf("transactional write before begin transaction")
}
}
return nil
}

// cleanupTxn is called when a transaction ends. The transaction record is
Expand Down
Loading

0 comments on commit 50d25e1

Please sign in to comment.