
Merge pull request #221 from tschottdorf/certainty
improved Txn restart behaviour
tbg committed Dec 20, 2014
2 parents 308a0ef + a6afc9b commit 72e33df
Showing 14 changed files with 437 additions and 65 deletions.
20 changes: 20 additions & 0 deletions kv/local_sender.go
@@ -125,6 +125,26 @@ func (ls *LocalSender) Send(call *client.Call) {
 	if err != nil {
 		call.Reply.Header().SetGoError(err)
 	} else {
+		if header := call.Args.Header(); header.Txn != nil {
+			// For calls that read data, we can avoid uncertainty-related
+			// retries in certain situations.
+			// If the node is in "CertainNodes", we need not worry about
+			// uncertain reads any more. Setting MaxTimestamp=Timestamp
+			// for the operation accomplishes that.
+			// See proto.Transaction.CertainNodes for details.
+			if header.Txn.CertainNodes.Contains(header.Replica.NodeID) {
+				// Make sure that when this retryable function returns,
+				// MaxTimestamp is restored. On retries, there is no
+				// guarantee that the request gets routed to the same node
+				// as the replica may have moved.
+				defer func(ts proto.Timestamp) {
+					header.Txn.MaxTimestamp = ts
+				}(header.Txn.MaxTimestamp)
+				// MaxTimestamp = Timestamp corresponds to no clock uncertainty.
+				header.Txn.MaxTimestamp = header.Txn.Timestamp
+			}
+		}
+
 		if err = store.ExecuteCmd(call.Method, call.Args, call.Reply); err != nil {
 			// Check for range key mismatch error (this could happen if
 			// range was split between lookup and execution). In this case,
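A note on the defer in the hunk above: Go evaluates a deferred call's arguments at the point of the defer statement, so the original MaxTimestamp is captured before the clamp and restored no matter how the attempt ends. That matters because a retry may be routed to a different node (the replica may have moved), where clamping would not be safe. Below is a minimal, self-contained sketch of the same save-and-restore pattern; the Timestamp and header types are simplified stand-ins for the real proto definitions, not the actual API.

package main

import "fmt"

// Timestamp is a simplified stand-in for proto.Timestamp.
type Timestamp struct {
	WallTime int64
	Logical  int32
}

// header is a simplified stand-in for a request's transaction header.
type header struct {
	Timestamp    Timestamp
	MaxTimestamp Timestamp
}

// attempt clamps MaxTimestamp for the duration of one send attempt and
// restores it on return, mirroring the defer in LocalSender.Send.
func attempt(h *header) {
	// The defer's argument is evaluated here, capturing the pre-clamp value.
	defer func(ts Timestamp) {
		h.MaxTimestamp = ts
	}(h.MaxTimestamp)

	// MaxTimestamp = Timestamp means no clock uncertainty for this attempt.
	h.MaxTimestamp = h.Timestamp
	fmt.Println("during attempt:", h.MaxTimestamp)
}

func main() {
	h := &header{
		Timestamp:    Timestamp{WallTime: 10},
		MaxTimestamp: Timestamp{WallTime: 15},
	}
	attempt(h)
	fmt.Println("after attempt:", h.MaxTimestamp) // restored to {15 0}
}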
10 changes: 8 additions & 2 deletions kv/split_test.go
@@ -90,7 +90,10 @@ func startTestWriter(db *client.KV, i int64, valBytes int32, wg *sync.WaitGroup,
 // 10 concurrent goroutines are each running successive transactions
 // composed of a random mix of puts.
 func TestRangeSplitsWithConcurrentTxns(t *testing.T) {
-	db, _, _, _, _ := createTestDB(t)
+	db, _, _, _, _, err := createTestDB()
+	if err != nil {
+		t.Fatal(err)
+	}
 	defer db.Close()
 
 	// This channel shuts the whole apparatus down.
@@ -136,7 +139,10 @@ func TestRangeSplitsWithConcurrentTxns(t *testing.T) {
 // TestRangeSplitsWithWritePressure sets the zone config max bytes for
 // a range to 256K and writes data until there are five ranges.
 func TestRangeSplitsWithWritePressure(t *testing.T) {
-	db, eng, _, _, _ := createTestDB(t)
+	db, eng, _, _, _, err := createTestDB()
+	if err != nil {
+		t.Fatal(err)
+	}
 	defer db.Close()
 	setTestRetryOptions()
 
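The call-site change above repeats throughout this diff: createTestDB (refactored in kv/txn_coord_sender_test.go below) now returns an error rather than taking a *testing.T and calling t.Fatal itself, leaving the caller to decide how to fail. A tiny self-contained sketch of that refactoring pattern; the fixture type and names are hypothetical, not from this repository.

package main

import (
	"errors"
	"fmt"
)

// fixture stands in for the db/engine/clock bundle createTestDB builds.
type fixture struct {
	name string
}

// newFixture returns an error instead of failing a *testing.T itself,
// so tests, benchmarks, and other helpers can each report it their own way.
func newFixture(ok bool) (*fixture, error) {
	if !ok {
		return nil, errors.New("bootstrap failed")
	}
	return &fixture{name: "test-db"}, nil
}

func main() {
	f, err := newFixture(true)
	if err != nil {
		// In a test this would be t.Fatal(err).
		fmt.Println("setup failed:", err)
		return
	}
	fmt.Println("fixture ready:", f.name)
}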
4 changes: 4 additions & 0 deletions kv/txn_coord_sender.go
@@ -388,6 +388,10 @@ func (tc *TxnCoordSender) updateResponseTxn(argsHeader *proto.RequestHeader, rep
 	// Take action on various errors.
 	switch t := replyHeader.GoError().(type) {
 	case *proto.ReadWithinUncertaintyIntervalError:
+		// Mark the host as certain. See the protobuf comment for
+		// Transaction.CertainNodes for details.
+		replyHeader.Txn.CertainNodes.Add(argsHeader.Replica.NodeID)
+
 		// If the reader encountered a newer write within the uncertainty
 		// interval, move the timestamp forward, just past that write or
 		// up to MaxTimestamp, whichever comes first.
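This is the coordinator-side half of the mechanism. The idea, per the comments in this diff: once a node has caused an uncertainty restart, the transaction's pushed timestamp already reflects that node's clock, so reads routed there can skip the uncertainty interval from then on; the node is recorded in CertainNodes, and LocalSender clamps MaxTimestamp for it (see kv/local_sender.go above). A rough sketch of how a CertainNodes-style set behaves, assuming it is essentially a small list of node IDs with Add and Contains; the real type is generated protobuf code, so this stand-alone version is illustrative only.

package main

import "fmt"

// NodeID is a simplified stand-in for proto.NodeID.
type NodeID int32

// nodeSet sketches a CertainNodes-style collection. It stays tiny (at
// most one entry per node that caused a restart), so a linear scan is fine.
type nodeSet struct {
	nodes []NodeID
}

// Contains reports whether id has already been marked certain.
func (s *nodeSet) Contains(id NodeID) bool {
	for _, n := range s.nodes {
		if n == id {
			return true
		}
	}
	return false
}

// Add marks id as certain, ignoring duplicates.
func (s *nodeSet) Add(id NodeID) {
	if !s.Contains(id) {
		s.nodes = append(s.nodes, id)
	}
}

func main() {
	var certain nodeSet
	certain.Add(12345) // after an uncertainty restart caused by node 12345
	fmt.Println(certain.Contains(12345)) // true: clamp MaxTimestamp there
	fmt.Println(certain.Contains(2))     // false: keep the full uncertainty interval
}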
115 changes: 82 additions & 33 deletions kv/txn_coord_sender_test.go
@@ -38,31 +38,32 @@ import (
 // with a store using an in-memory engine. Returns the created kv
 // client and associated clock's manual time.
 // TODO(spencer): return a struct.
-func createTestDB(t *testing.T) (*client.KV, engine.Engine, *hlc.Clock, *hlc.ManualClock, *LocalSender) {
+func createTestDB() (db *client.KV, eng engine.Engine, clock *hlc.Clock,
+	manual *hlc.ManualClock, lSender *LocalSender, err error) {
 	rpcContext := rpc.NewContext(hlc.NewClock(hlc.UnixNano), rpc.LoadInsecureTLSConfig())
 	g := gossip.New(rpcContext)
-	manual := hlc.NewManualClock(0)
-	clock := hlc.NewClock(manual.UnixNano)
-	eng := engine.NewInMem(proto.Attributes{}, 50<<20)
-	lSender := NewLocalSender()
+	manual = hlc.NewManualClock(0)
+	clock = hlc.NewClock(manual.UnixNano)
+	eng = engine.NewInMem(proto.Attributes{}, 50<<20)
+	lSender = NewLocalSender()
 	sender := NewTxnCoordSender(lSender, clock)
-	db := client.NewKV(sender, nil)
+	db = client.NewKV(sender, nil)
 	db.User = storage.UserRoot
 	store := storage.NewStore(clock, eng, db, g)
-	if err := store.Bootstrap(proto.StoreIdent{StoreID: 1}); err != nil {
-		t.Fatal(err)
+	if err = store.Bootstrap(proto.StoreIdent{StoreID: 1}); err != nil {
+		return
 	}
-	if err := store.Start(); err != nil {
-		t.Fatal(err)
+	if err = store.Start(); err != nil {
+		return
 	}
 	lSender.AddStore(store)
-	if err := store.BootstrapRange(); err != nil {
-		t.Fatal(err)
+	if err = store.BootstrapRange(); err != nil {
+		return
 	}
-	if err := store.Start(); err != nil {
-		t.Fatal(err)
+	if err = store.Start(); err != nil {
+		return
 	}
-	return db, eng, clock, manual, lSender
+	return
 }
 
 // makeTS creates a new timestamp.
@@ -100,7 +101,10 @@ func createPutRequest(key proto.Key, value []byte, txn *proto.Transaction) *prot
 // transaction metadata and adding multiple requests with same
 // transaction ID updates the last update timestamp.
 func TestTxnCoordSenderAddRequest(t *testing.T) {
-	db, _, clock, manual, ls := createTestDB(t)
+	db, _, clock, manual, ls, err := createTestDB()
+	if err != nil {
+		t.Fatal(err)
+	}
 	coord := getCoord(db)
 	defer db.Close()
 	defer ls.Close()
@@ -141,7 +145,10 @@ func TestTxnCoordSenderAddRequest(t *testing.T) {
 // TestTxnCoordSenderBeginTransaction verifies that a command sent with a
 // not-nil Txn with empty ID gets a new transaction initialized.
 func TestTxnCoordSenderBeginTransaction(t *testing.T) {
-	db, _, _, _, ls := createTestDB(t)
+	db, _, _, _, ls, err := createTestDB()
+	if err != nil {
+		t.Fatal(err)
+	}
 	defer db.Close()
 	defer ls.Close()
 
@@ -182,7 +189,10 @@ func TestTxnCoordSenderBeginTransaction(t *testing.T) {
 // TestTxnCoordSenderBeginTransactionMinPriority verifies that when starting
 // a new transaction, a non-zero priority is treated as a minimum value.
 func TestTxnCoordSenderBeginTransactionMinPriority(t *testing.T) {
-	db, _, _, _, ls := createTestDB(t)
+	db, _, _, _, ls, err := createTestDB()
+	if err != nil {
+		t.Fatal(err)
+	}
 	defer db.Close()
 	defer ls.Close()
 
@@ -226,7 +236,10 @@ func TestTxnCoordSenderKeyRanges(t *testing.T) {
 		{proto.Key("b"), proto.Key("c")},
 	}
 
-	db, _, clock, _, ls := createTestDB(t)
+	db, _, clock, _, ls, err := createTestDB()
+	if err != nil {
+		t.Fatal(err)
+	}
 	coord := getCoord(db)
 	defer db.Close()
 	defer ls.Close()
@@ -256,7 +269,10 @@ func TestTxnCoordSenderKeyRanges(t *testing.T) {
 // TestTxnCoordSenderMultipleTxns verifies correct operation with
 // multiple outstanding transactions.
 func TestTxnCoordSenderMultipleTxns(t *testing.T) {
-	db, _, clock, _, ls := createTestDB(t)
+	db, _, clock, _, ls, err := createTestDB()
+	if err != nil {
+		t.Fatal(err)
+	}
 	coord := getCoord(db)
 	defer db.Close()
 	defer ls.Close()
@@ -278,7 +294,10 @@ func TestTxnCoordSenderMultipleTxns(t *testing.T) {
 // TestTxnCoordSenderHeartbeat verifies periodic heartbeat of the
 // transaction record.
 func TestTxnCoordSenderHeartbeat(t *testing.T) {
-	db, _, clock, manual, ls := createTestDB(t)
+	db, _, clock, manual, ls, err := createTestDB()
+	if err != nil {
+		t.Fatal(err)
+	}
 	coord := getCoord(db)
 	defer db.Close()
 	defer ls.Close()
@@ -350,7 +369,10 @@ func verifyCleanup(key proto.Key, db *client.KV, eng engine.Engine, t *testing.T
 // sends resolve write intent requests and removes the transaction
 // from the txns map.
 func TestTxnCoordSenderEndTxn(t *testing.T) {
-	db, eng, clock, _, ls := createTestDB(t)
+	db, eng, clock, _, ls, err := createTestDB()
+	if err != nil {
+		t.Fatal(err)
+	}
 	defer db.Close()
 	defer ls.Close()
 
@@ -385,7 +407,10 @@ func TestTxnCoordSenderEndTxn(t *testing.T) {
 // TestTxnCoordSenderCleanupOnAborted verifies that if a txn receives a
 // TransactionAbortedError, the coordinator cleans up the transaction.
 func TestTxnCoordSenderCleanupOnAborted(t *testing.T) {
-	db, eng, clock, _, ls := createTestDB(t)
+	db, eng, clock, _, ls, err := createTestDB()
+	if err != nil {
+		t.Fatal(err)
+	}
 	defer db.Close()
 	defer ls.Close()
 
@@ -422,7 +447,7 @@ func TestTxnCoordSenderCleanupOnAborted(t *testing.T) {
 		},
 		Commit: true,
 	}
-	err := db.Call(proto.EndTransaction, etArgs, &proto.EndTransactionResponse{})
+	err = db.Call(proto.EndTransaction, etArgs, &proto.EndTransactionResponse{})
 	switch err.(type) {
 	case nil:
 		t.Fatal("expected txn aborted error")
@@ -437,7 +462,10 @@ func TestTxnCoordSenderCleanupOnAborted(t *testing.T) {
 // TestTxnCoordSenderGC verifies that the coordinator cleans up extant
 // transactions after the lastUpdateTS exceeds the timeout.
 func TestTxnCoordSenderGC(t *testing.T) {
-	db, _, clock, manual, ls := createTestDB(t)
+	db, _, clock, manual, ls, err := createTestDB()
+	if err != nil {
+		t.Fatal(err)
+	}
 	coord := getCoord(db)
 	defer db.Close()
 	defer ls.Close()
@@ -492,6 +520,9 @@ var testPutReq = &proto.PutRequest{
 		Txn: &proto.Transaction{
 			Name: "test txn",
 		},
+		Replica: proto.Replica{
+			NodeID: 12345,
+		},
 	},
 }
 
@@ -508,11 +539,21 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) {
 		expPri    int32
 		expTS     proto.Timestamp
 		expOrigTS proto.Timestamp
+		nodeSeen  bool
 	}{
-		{&proto.ReadWithinUncertaintyIntervalError{ExistingTimestamp: makeTS(10, 10)}, 1, 1, makeTS(10, 11), makeTS(10, 11)},
-		{&proto.TransactionAbortedError{Txn: proto.Transaction{Timestamp: makeTS(20, 10), Priority: 10}}, 0, 10, makeTS(20, 10), makeTS(0, 1)},
-		{&proto.TransactionPushError{PusheeTxn: proto.Transaction{Timestamp: makeTS(10, 10), Priority: int32(10)}}, 1, 9, makeTS(10, 11), makeTS(10, 11)},
-		{&proto.TransactionRetryError{Txn: proto.Transaction{Timestamp: makeTS(10, 10), Priority: int32(10)}}, 1, 10, makeTS(10, 10), makeTS(10, 10)},
+		{nil, 0, 1, makeTS(0, 1), makeTS(0, 1), false},
+		{&proto.ReadWithinUncertaintyIntervalError{
+			ExistingTimestamp: makeTS(10, 10)}, 1, 1, makeTS(10, 11),
+			makeTS(10, 11), true},
+		{&proto.TransactionAbortedError{Txn: proto.Transaction{
+			Timestamp: makeTS(20, 10), Priority: 10}}, 0, 10, makeTS(20, 10),
+			makeTS(0, 1), false},
+		{&proto.TransactionPushError{PusheeTxn: proto.Transaction{
+			Timestamp: makeTS(10, 10), Priority: int32(10)}}, 1, 9,
+			makeTS(10, 11), makeTS(10, 11), false},
+		{&proto.TransactionRetryError{Txn: proto.Transaction{
+			Timestamp: makeTS(10, 10), Priority: int32(10)}}, 1, 10,
+			makeTS(10, 10), makeTS(10, 10), false},
 	}
 
 	for i, test := range testCases {
@@ -526,16 +567,24 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) {
 			t.Fatalf("%d: expected %T; got %T", i, test.err, reply.GoError())
 		}
 		if reply.Txn.Epoch != test.expEpoch {
-			t.Errorf("%d: expected epoch = %d; got %d", i, test.expEpoch, reply.Txn.Epoch)
+			t.Errorf("%d: expected epoch = %d; got %d",
+				i, test.expEpoch, reply.Txn.Epoch)
 		}
 		if reply.Txn.Priority != test.expPri {
-			t.Errorf("%d: expected priority = %d; got %d", i, test.expPri, reply.Txn.Priority)
+			t.Errorf("%d: expected priority = %d; got %d",
+				i, test.expPri, reply.Txn.Priority)
 		}
 		if !reply.Txn.Timestamp.Equal(test.expTS) {
-			t.Errorf("%d: expected timestamp to be %s; got %s", i, test.expTS, reply.Txn.Timestamp)
+			t.Errorf("%d: expected timestamp to be %s; got %s",
+				i, test.expTS, reply.Txn.Timestamp)
 		}
 		if !reply.Txn.OrigTimestamp.Equal(test.expOrigTS) {
-			t.Errorf("%d: expected orig timestamp to be %s + 1; got %s", i, test.expOrigTS, reply.Txn.OrigTimestamp)
+			t.Errorf("%d: expected orig timestamp to be %s + 1; got %s",
+				i, test.expOrigTS, reply.Txn.OrigTimestamp)
 		}
+		if nodes := reply.Txn.CertainNodes.GetNodes(); (len(nodes) != 0) != test.nodeSeen {
+			t.Errorf("%d: expected nodeSeen=%t, but list of hosts is %v",
+				i, test.nodeSeen, nodes)
+		}
 	}
 }
5 changes: 4 additions & 1 deletion kv/txn_correctness_test.go
@@ -606,7 +606,10 @@ func checkConcurrency(name string, isolations []proto.IsolationType, txns []stri
 	verify *verifier, expSuccess bool, t *testing.T) {
 	setCorrectnessRetryOptions()
 	verifier := newHistoryVerifier(name, txns, verify, expSuccess, t)
-	db, _, _, _, _ := createTestDB(t)
+	db, _, _, _, _, err := createTestDB()
+	if err != nil {
+		t.Fatal(err)
+	}
 	verifier.run(isolations, db, t)
 }
 
