diff --git a/kv/local_sender.go b/kv/local_sender.go
index bbed9245b5de..bf00f0f2b407 100644
--- a/kv/local_sender.go
+++ b/kv/local_sender.go
@@ -125,6 +125,26 @@ func (ls *LocalSender) Send(call *client.Call) {
 	if err != nil {
 		call.Reply.Header().SetGoError(err)
 	} else {
+		if header := call.Args.Header(); header.Txn != nil {
+			// For calls that read data, we can avoid uncertainty-related
+			// retries in certain situations.
+			// If the node is in "CertainNodes", we need not worry about
+			// uncertain reads any more. Setting MaxTimestamp=Timestamp
+			// for the operation accomplishes that.
+			// See proto.Transaction.CertainNodes for details.
+			if header.Txn.CertainNodes.Contains(header.Replica.NodeID) {
+				// Make sure that when this retryable function returns,
+				// MaxTimestamp is restored. On retries, there is no
+				// guarantee that the request gets routed to the same node
+				// as the replica may have moved.
+				defer func(ts proto.Timestamp) {
+					header.Txn.MaxTimestamp = ts
+				}(header.Txn.MaxTimestamp)
+				// MaxTimestamp = Timestamp corresponds to no clock uncertainty.
+				header.Txn.MaxTimestamp = header.Txn.Timestamp
+			}
+		}
+
 		if err = store.ExecuteCmd(call.Method, call.Args, call.Reply); err != nil {
 			// Check for range key mismatch error (this could happen if
 			// range was split between lookup and execution). In this case,
diff --git a/kv/split_test.go b/kv/split_test.go
index 50956361743c..e91f6469e75e 100644
--- a/kv/split_test.go
+++ b/kv/split_test.go
@@ -90,7 +90,10 @@ func startTestWriter(db *client.KV, i int64, valBytes int32, wg *sync.WaitGroup,
 // 10 concurrent goroutines are each running successive transactions
 // composed of a random mix of puts.
 func TestRangeSplitsWithConcurrentTxns(t *testing.T) {
-	db, _, _, _, _ := createTestDB(t)
+	db, _, _, _, _, err := createTestDB()
+	if err != nil {
+		t.Fatal(err)
+	}
 	defer db.Close()

 	// This channel shuts the whole apparatus down.
@@ -136,7 +139,10 @@ func TestRangeSplitsWithConcurrentTxns(t *testing.T) {
 // TestRangeSplitsWithWritePressure sets the zone config max bytes for
 // a range to 256K and writes data until there are five ranges.
 func TestRangeSplitsWithWritePressure(t *testing.T) {
-	db, eng, _, _, _ := createTestDB(t)
+	db, eng, _, _, _, err := createTestDB()
+	if err != nil {
+		t.Fatal(err)
+	}
 	defer db.Close()
 	setTestRetryOptions()

diff --git a/kv/txn_coord_sender.go b/kv/txn_coord_sender.go
index f9b545d390a1..366849893787 100644
--- a/kv/txn_coord_sender.go
+++ b/kv/txn_coord_sender.go
@@ -388,6 +388,10 @@ func (tc *TxnCoordSender) updateResponseTxn(argsHeader *proto.RequestHeader, rep
 	// Take action on various errors.
 	switch t := replyHeader.GoError().(type) {
 	case *proto.ReadWithinUncertaintyIntervalError:
+		// Mark the host as certain. See the protobuf comment for
+		// Transaction.CertainNodes for details.
+		replyHeader.Txn.CertainNodes.Add(argsHeader.Replica.NodeID)
+
 		// If the reader encountered a newer write within the uncertainty
 		// interval, move the timestamp forward, just past that write or
 		// up to MaxTimestamp, whichever comes first.
diff --git a/kv/txn_coord_sender_test.go b/kv/txn_coord_sender_test.go
index 127f37efe4f3..d39a0fad099b 100644
--- a/kv/txn_coord_sender_test.go
+++ b/kv/txn_coord_sender_test.go
@@ -38,31 +38,32 @@ import (
 // with a store using an in-memory engine. Returns the created kv
 // client and associated clock's manual time.
 // TODO(spencer): return a struct.
-func createTestDB(t *testing.T) (*client.KV, engine.Engine, *hlc.Clock, *hlc.ManualClock, *LocalSender) {
+func createTestDB() (db *client.KV, eng engine.Engine, clock *hlc.Clock,
+	manual *hlc.ManualClock, lSender *LocalSender, err error) {
 	rpcContext := rpc.NewContext(hlc.NewClock(hlc.UnixNano), rpc.LoadInsecureTLSConfig())
 	g := gossip.New(rpcContext)
-	manual := hlc.NewManualClock(0)
-	clock := hlc.NewClock(manual.UnixNano)
-	eng := engine.NewInMem(proto.Attributes{}, 50<<20)
-	lSender := NewLocalSender()
+	manual = hlc.NewManualClock(0)
+	clock = hlc.NewClock(manual.UnixNano)
+	eng = engine.NewInMem(proto.Attributes{}, 50<<20)
+	lSender = NewLocalSender()
 	sender := NewTxnCoordSender(lSender, clock)
-	db := client.NewKV(sender, nil)
+	db = client.NewKV(sender, nil)
 	db.User = storage.UserRoot
 	store := storage.NewStore(clock, eng, db, g)
-	if err := store.Bootstrap(proto.StoreIdent{StoreID: 1}); err != nil {
-		t.Fatal(err)
+	if err = store.Bootstrap(proto.StoreIdent{StoreID: 1}); err != nil {
+		return
 	}
-	if err := store.Start(); err != nil {
-		t.Fatal(err)
+	if err = store.Start(); err != nil {
+		return
 	}
 	lSender.AddStore(store)
-	if err := store.BootstrapRange(); err != nil {
-		t.Fatal(err)
+	if err = store.BootstrapRange(); err != nil {
+		return
 	}
-	if err := store.Start(); err != nil {
-		t.Fatal(err)
+	if err = store.Start(); err != nil {
+		return
 	}
-	return db, eng, clock, manual, lSender
+	return
 }

 // makeTS creates a new timestamp.
@@ -100,7 +101,10 @@ func createPutRequest(key proto.Key, value []byte, txn *proto.Transaction) *prot
 // transaction metadata and adding multiple requests with same
 // transaction ID updates the last update timestamp.
 func TestTxnCoordSenderAddRequest(t *testing.T) {
-	db, _, clock, manual, ls := createTestDB(t)
+	db, _, clock, manual, ls, err := createTestDB()
+	if err != nil {
+		t.Fatal(err)
+	}
 	coord := getCoord(db)
 	defer db.Close()
 	defer ls.Close()
@@ -141,7 +145,10 @@ func TestTxnCoordSenderAddRequest(t *testing.T) {
 // TestTxnCoordSenderBeginTransaction verifies that a command sent with a
 // not-nil Txn with empty ID gets a new transaction initialized.
 func TestTxnCoordSenderBeginTransaction(t *testing.T) {
-	db, _, _, _, ls := createTestDB(t)
+	db, _, _, _, ls, err := createTestDB()
+	if err != nil {
+		t.Fatal(err)
+	}
 	defer db.Close()
 	defer ls.Close()

@@ -182,7 +189,10 @@ func TestTxnCoordSenderBeginTransaction(t *testing.T) {
 // TestTxnCoordSenderBeginTransactionMinPriority verifies that when starting
 // a new transaction, a non-zero priority is treated as a minimum value.
 func TestTxnCoordSenderBeginTransactionMinPriority(t *testing.T) {
-	db, _, _, _, ls := createTestDB(t)
+	db, _, _, _, ls, err := createTestDB()
+	if err != nil {
+		t.Fatal(err)
+	}
 	defer db.Close()
 	defer ls.Close()

@@ -226,7 +236,10 @@ func TestTxnCoordSenderKeyRanges(t *testing.T) {
 		{proto.Key("b"), proto.Key("c")},
 	}

-	db, _, clock, _, ls := createTestDB(t)
+	db, _, clock, _, ls, err := createTestDB()
+	if err != nil {
+		t.Fatal(err)
+	}
 	coord := getCoord(db)
 	defer db.Close()
 	defer ls.Close()
@@ -256,7 +269,10 @@ func TestTxnCoordSenderKeyRanges(t *testing.T) {
 // TestTxnCoordSenderMultipleTxns verifies correct operation with
 // multiple outstanding transactions.
 func TestTxnCoordSenderMultipleTxns(t *testing.T) {
-	db, _, clock, _, ls := createTestDB(t)
+	db, _, clock, _, ls, err := createTestDB()
+	if err != nil {
+		t.Fatal(err)
+	}
 	coord := getCoord(db)
 	defer db.Close()
 	defer ls.Close()
@@ -278,7 +294,10 @@ func TestTxnCoordSenderMultipleTxns(t *testing.T) {
 // TestTxnCoordSenderHeartbeat verifies periodic heartbeat of the
 // transaction record.
 func TestTxnCoordSenderHeartbeat(t *testing.T) {
-	db, _, clock, manual, ls := createTestDB(t)
+	db, _, clock, manual, ls, err := createTestDB()
+	if err != nil {
+		t.Fatal(err)
+	}
 	coord := getCoord(db)
 	defer db.Close()
 	defer ls.Close()
@@ -350,7 +369,10 @@ func verifyCleanup(key proto.Key, db *client.KV, eng engine.Engine, t *testing.T
 // sends resolve write intent requests and removes the transaction
 // from the txns map.
 func TestTxnCoordSenderEndTxn(t *testing.T) {
-	db, eng, clock, _, ls := createTestDB(t)
+	db, eng, clock, _, ls, err := createTestDB()
+	if err != nil {
+		t.Fatal(err)
+	}
 	defer db.Close()
 	defer ls.Close()

@@ -385,7 +407,10 @@ func TestTxnCoordSenderEndTxn(t *testing.T) {
 // TestTxnCoordSenderCleanupOnAborted verifies that if a txn receives a
 // TransactionAbortedError, the coordinator cleans up the transaction.
 func TestTxnCoordSenderCleanupOnAborted(t *testing.T) {
-	db, eng, clock, _, ls := createTestDB(t)
+	db, eng, clock, _, ls, err := createTestDB()
+	if err != nil {
+		t.Fatal(err)
+	}
 	defer db.Close()
 	defer ls.Close()

@@ -422,7 +447,7 @@ func TestTxnCoordSenderCleanupOnAborted(t *testing.T) {
 		},
 		Commit: true,
 	}
-	err := db.Call(proto.EndTransaction, etArgs, &proto.EndTransactionResponse{})
+	err = db.Call(proto.EndTransaction, etArgs, &proto.EndTransactionResponse{})
 	switch err.(type) {
 	case nil:
 		t.Fatal("expected txn aborted error")
@@ -437,7 +462,10 @@ func TestTxnCoordSenderCleanupOnAborted(t *testing.T) {
 // TestTxnCoordSenderGC verifies that the coordinator cleans up extant
 // transactions after the lastUpdateTS exceeds the timeout.
 func TestTxnCoordSenderGC(t *testing.T) {
-	db, _, clock, manual, ls := createTestDB(t)
+	db, _, clock, manual, ls, err := createTestDB()
+	if err != nil {
+		t.Fatal(err)
+	}
 	coord := getCoord(db)
 	defer db.Close()
 	defer ls.Close()
@@ -492,6 +520,9 @@ var testPutReq = &proto.PutRequest{
 		Txn: &proto.Transaction{
 			Name: "test txn",
 		},
+		Replica: proto.Replica{
+			NodeID: 12345,
+		},
 	},
 }

@@ -508,11 +539,21 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) {
 		expPri int32
 		expTS proto.Timestamp
 		expOrigTS proto.Timestamp
+		nodeSeen bool
 	}{
-		{&proto.ReadWithinUncertaintyIntervalError{ExistingTimestamp: makeTS(10, 10)}, 1, 1, makeTS(10, 11), makeTS(10, 11)},
-		{&proto.TransactionAbortedError{Txn: proto.Transaction{Timestamp: makeTS(20, 10), Priority: 10}}, 0, 10, makeTS(20, 10), makeTS(0, 1)},
-		{&proto.TransactionPushError{PusheeTxn: proto.Transaction{Timestamp: makeTS(10, 10), Priority: int32(10)}}, 1, 9, makeTS(10, 11), makeTS(10, 11)},
-		{&proto.TransactionRetryError{Txn: proto.Transaction{Timestamp: makeTS(10, 10), Priority: int32(10)}}, 1, 10, makeTS(10, 10), makeTS(10, 10)},
+		{nil, 0, 1, makeTS(0, 1), makeTS(0, 1), false},
+		{&proto.ReadWithinUncertaintyIntervalError{
+			ExistingTimestamp: makeTS(10, 10)}, 1, 1, makeTS(10, 11),
+			makeTS(10, 11), true},
+		{&proto.TransactionAbortedError{Txn: proto.Transaction{
+			Timestamp: makeTS(20, 10), Priority: 10}}, 0, 10, makeTS(20, 10),
+			makeTS(0, 1), false},
+		{&proto.TransactionPushError{PusheeTxn: proto.Transaction{
+			Timestamp: makeTS(10, 10), Priority: int32(10)}}, 1, 9,
+			makeTS(10, 11), makeTS(10, 11), false},
+		{&proto.TransactionRetryError{Txn: proto.Transaction{
+			Timestamp: makeTS(10, 10), Priority: int32(10)}}, 1, 10,
+			makeTS(10, 10), makeTS(10, 10), false},
 	}

 	for i, test := range testCases {
@@ -526,16 +567,24 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) {
 			t.Fatalf("%d: expected %T; got %T", i, test.err, reply.GoError())
 		}
 		if reply.Txn.Epoch != test.expEpoch {
-			t.Errorf("%d: expected epoch = %d; got %d", i, test.expEpoch, reply.Txn.Epoch)
+			t.Errorf("%d: expected epoch = %d; got %d",
+				i, test.expEpoch, reply.Txn.Epoch)
 		}
 		if reply.Txn.Priority != test.expPri {
-			t.Errorf("%d: expected priority = %d; got %d", i, test.expPri, reply.Txn.Priority)
+			t.Errorf("%d: expected priority = %d; got %d",
+				i, test.expPri, reply.Txn.Priority)
 		}
 		if !reply.Txn.Timestamp.Equal(test.expTS) {
-			t.Errorf("%d: expected timestamp to be %s; got %s", i, test.expTS, reply.Txn.Timestamp)
+			t.Errorf("%d: expected timestamp to be %s; got %s",
+				i, test.expTS, reply.Txn.Timestamp)
 		}
 		if !reply.Txn.OrigTimestamp.Equal(test.expOrigTS) {
-			t.Errorf("%d: expected orig timestamp to be %s + 1; got %s", i, test.expOrigTS, reply.Txn.OrigTimestamp)
+			t.Errorf("%d: expected orig timestamp to be %s + 1; got %s",
+				i, test.expOrigTS, reply.Txn.OrigTimestamp)
+		}
+		if nodes := reply.Txn.CertainNodes.GetNodes(); (len(nodes) != 0) != test.nodeSeen {
+			t.Errorf("%d: expected nodeSeen=%t, but list of hosts is %v",
+				i, test.nodeSeen, nodes)
 		}
 	}
 }
diff --git a/kv/txn_correctness_test.go b/kv/txn_correctness_test.go
index 001fb575906d..599a814cd222 100644
--- a/kv/txn_correctness_test.go
+++ b/kv/txn_correctness_test.go
@@ -606,7 +606,10 @@ func checkConcurrency(name string, isolations []proto.IsolationType, txns []stri
 	verify *verifier, expSuccess bool, t *testing.T) {
 	setCorrectnessRetryOptions()
 	verifier := newHistoryVerifier(name, txns, verify, expSuccess, t)
-	db, _, _, _, _ := createTestDB(t)
+	db, _, _, _, _, err := createTestDB()
+	if err != nil {
+		t.Fatal(err)
+	}
 	verifier.run(isolations, db, t)
 }

diff --git a/kv/txn_test.go b/kv/txn_test.go
index c7590e2e2c74..8bef3df6844f 100644
--- a/kv/txn_test.go
+++ b/kv/txn_test.go
@@ -30,6 +30,7 @@ import (
 	"github.com/cockroachdb/cockroach/client"
 	"github.com/cockroachdb/cockroach/proto"
 	"github.com/cockroachdb/cockroach/storage"
+	"github.com/cockroachdb/cockroach/storage/engine"
 	"github.com/cockroachdb/cockroach/util"
 	"github.com/cockroachdb/cockroach/util/hlc"
 )
@@ -40,7 +41,10 @@ import (
 // uncommitted writes cannot be read outside of the txn but can be
 // read from inside the txn.
 func TestTxnDBBasics(t *testing.T) {
-	db, _, _, _, _ := createTestDB(t)
+	db, _, _, _, _, err := createTestDB()
+	if err != nil {
+		t.Fatal(err)
+	}
 	value := []byte("value")

 	for _, commit := range []bool{true, false} {
@@ -101,12 +105,42 @@ func TestTxnDBBasics(t *testing.T) {
 	}
 }

+// BenchmarkTxnWrites benchmarks a number of transactions writing to the
+// same key back to back, without using Prepare/Flush.
+func BenchmarkTxnWrites(b *testing.B) {
+	db, _, _, mClock, _, err := createTestDB()
+	if err != nil {
+		b.Fatal(err)
+	}
+	key := proto.Key("key")
+	txnOpts := &client.TransactionOptions{
+		Name: "benchWrite",
+	}
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		mClock.Increment(1)
+		if tErr := db.RunTransaction(txnOpts, func(txn *client.KV) error {
+			pr := &proto.PutResponse{}
+			pa := proto.PutArgs(key, []byte(fmt.Sprintf("value-%d", i)))
+			if err := txn.Call(proto.Put, pa, pr); err != nil {
+				b.Fatal(err)
+			}
+			return nil
+		}); tErr != nil {
+			b.Fatal(tErr)
+		}
+	}
+}
+
 // verifyUncertainty writes values to a key in 5ns intervals and then launches
 // a transaction at each value's timestamp reading that value with
 // the maximumOffset given, verifying in the process that the correct values
 // are read (usually after one transaction restart).
 func verifyUncertainty(concurrency int, maxOffset time.Duration, t *testing.T) {
-	db, _, clock, _, lSender := createTestDB(t)
+	db, _, clock, _, lSender, err := createTestDB()
+	if err != nil {
+		t.Fatal(err)
+	}

 	txnOpts := &client.TransactionOptions{
 		Name: "test",
@@ -239,3 +273,155 @@ func TestTxnDBUncertainty(t *testing.T) {
 	verifyUncertainty(7, 12*time.Nanosecond, t)
 	verifyUncertainty(100, 10*time.Nanosecond, t)
 }
+
+// TestUncertaintyRestarts verifies that transactional reads within the
+// uncertainty interval cause exactly one restart. The test runs a transaction
+// which attempts to read a single key, but just before that read, a future
+// version of that key is written directly through the MVCC layer.
+//
+// Indirectly this tests that the transaction remembers the NodeID of the node
+// being read from correctly, at least in this simple case. Not remembering the
+// node would lead to thousands of transaction restarts and almost certainly a
+// test timeout.
+func TestUncertaintyRestarts(t *testing.T) {
+	{
+		db, eng, clock, mClock, _, err := createTestDB()
+		if err != nil {
+			t.Fatal(err)
+		}
+		// Set a large offset so that a busy restart-loop
+		// really shows. Also makes sure that the values
+		// we write in the future below don't actually
+		// wind up in the past.
+		offset := 4000 * time.Millisecond
+		clock.SetMaxOffset(offset)
+		key := proto.Key("key")
+		value := proto.Value{
+			Bytes: nil, // Set for each Put
+		}
+		// With the correct restart behaviour, we see only one restart
+		// and the value read is the very first one (as nothing else
+		// has been written).
+		wantedBytes := []byte("value-0")
+
+		txnOpts := &client.TransactionOptions{
+			Name: "uncertainty",
+		}
+		gr := &proto.GetResponse{}
+		i := -1
+		tErr := db.RunTransaction(txnOpts, func(txn *client.KV) error {
+			i++
+			mClock.Increment(1)
+			futureTS := clock.Now()
+			futureTS.WallTime++
+			value.Bytes = []byte(fmt.Sprintf("value-%d", i))
+			err = engine.MVCCPut(eng, nil, key, futureTS, value, nil)
+			if err != nil {
+				t.Fatal(err)
+			}
+			gr.Reset()
+			if err := txn.Call(proto.Get, proto.GetArgs(key), gr); err != nil {
+				return err
+			}
+			if gr.Value == nil || !bytes.Equal(gr.Value.Bytes, wantedBytes) {
+				t.Fatalf("%d: read wrong value: %v, wanted %q", i,
+					gr.Value, wantedBytes)
+			}
+			return nil
+		})
+		if i != 1 {
+			t.Errorf("txn restarted %d times, expected only one restart", i)
+		}
+		if tErr != nil {
+			t.Fatal(tErr)
+		}
+	}
+}
+
+// TestUncertaintyMaxTimestampForwarding checks that we correctly read from
+// hosts for which we control the uncertainty by checking that when a
+// transaction restarts after an uncertain read, it will also take into account
+// the target node's clock at the time of the failed read when forwarding the
+// read timestamp.
+// This is a prerequisite for being able to prevent further uncertainty
+// restarts for that node and transaction without sacrificing correctness.
+// See proto.Transaction.CertainNodes for details.
+func TestUncertaintyMaxTimestampForwarding(t *testing.T) {
+	db, eng, clock, mClock, _, err := createTestDB()
+	// Large offset so that any value in the future is an uncertain read.
+	// Also makes sure that the values we write in the future below don't
+	// actually wind up in the past.
+	clock.SetMaxOffset(50000 * time.Millisecond)
+
+	txnOpts := &client.TransactionOptions{
+		Name: "uncertainty",
+	}
+
+	offsetNS := int64(100)
+	keySlow := proto.Key("slow")
+	keyFast := proto.Key("fast")
+	valSlow := []byte("wols")
+	valFast := []byte("tsaf")
+
+	// Write keySlow at now+offset, keyFast at now+2*offset
+	futureTS := clock.Now()
+	futureTS.WallTime += offsetNS
+	err = engine.MVCCPut(eng, nil, keySlow, futureTS,
+		proto.Value{Bytes: valSlow}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	futureTS.WallTime += offsetNS
+	err = engine.MVCCPut(eng, nil, keyFast, futureTS,
+		proto.Value{Bytes: valFast}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	i := 0
+	if tErr := db.RunTransaction(txnOpts, func(txn *client.KV) error {
+		i++
+		// The first command serves to start a Txn, fixing the timestamps.
+		// There will be a restart, but this is idempotent.
+		sr := &proto.ScanResponse{}
+		if err = txn.Call(proto.Scan, proto.ScanArgs(proto.Key("t"), proto.Key("t"),
+			0), sr); err != nil {
+			t.Fatal(err)
+		}
+
+		// The server's clock suddenly jumps ahead of keyFast's timestamp.
+		// There will be a restart, but this is idempotent.
+		mClock.Set(2*offsetNS + 1)
+
+		// Now read keySlow first. It should read at 0, catch an uncertainty error,
+		// and get keySlow's timestamp in that error, but upgrade it to the larger
+		// node clock (which is ahead of keyFast as well). If the last part does
+		// not happen, the read of keyFast should fail (i.e. read nothing).
+		// There will be exactly one restart here.
+		gr := &proto.GetResponse{}
+		if err = txn.Call(proto.Get, proto.GetArgs(keySlow), gr); err != nil {
+			if i != 1 {
+				t.Errorf("unexpected transaction error: %v", err)
+			}
+			return err
+		}
+		if gr.Value == nil || !bytes.Equal(gr.Value.Bytes, valSlow) {
+			t.Errorf("read of %q returned %v, wanted value %q", keySlow, gr.Value,
+				valSlow)
+		}
+
+		gr.Reset()
+		// The node should already be certain, so we expect no restart here
+		// and to read the correct key.
+		if err = txn.Call(proto.Get, proto.GetArgs(keyFast), gr); err != nil {
+			t.Errorf("second Get failed with %v", err)
+		}
+		if gr.Value == nil || !bytes.Equal(gr.Value.Bytes, valFast) {
+			t.Errorf("read of %q returned %v, wanted value %q", keyFast, gr.Value,
+				valFast)
+		}
+		return nil
+	}); tErr != nil {
+		t.Fatal(tErr)
+	}
+}
diff --git a/proto/data.go b/proto/data.go
index 6729df2ae7cf..0b120eddc68c 100644
--- a/proto/data.go
+++ b/proto/data.go
@@ -23,6 +23,7 @@ import (
 	"fmt"
 	"math"
 	"math/rand"
+	"sort"
 	"strings"
 	"time"

@@ -260,6 +261,22 @@ func (t Timestamp) Add(wallTime int64, logical int32) Timestamp {
 	}
 }

+// Forward updates the timestamp from the one given, if that moves it
+// forwards in time.
+func (t *Timestamp) Forward(s Timestamp) {
+	if t.Less(s) {
+		*t = s
+	}
+}
+
+// Backward updates the timestamp from the one given, if that moves it
+// backwards in time.
+func (t *Timestamp) Backward(s Timestamp) {
+	if s.Less(*t) {
+		*t = s
+	}
+}
+
 // InitChecksum initializes a checksum based on the provided key and
 // the contents of the value. If the value contains a byte slice, the
 // checksum includes it directly; if the value contains an integer,
@@ -399,8 +416,8 @@ func (t *Transaction) Restart(userPriority, upgradePriority int32, timestamp Tim
 	t.UpgradePriority(upgradePriority)
 }

-// Update ratchets priority, timestamp and original timestamp values
-// for the transaction. If t.ID is empty, then the transaction is
+// Update ratchets priority, timestamp and original timestamp values (among
+// others) for the transaction. If t.ID is empty, then the transaction is
 // copied from o.
 func (t *Transaction) Update(o *Transaction) {
 	if o == nil {
@@ -422,6 +439,11 @@ func (t *Transaction) Update(o *Transaction) {
 	if t.OrigTimestamp.Less(o.OrigTimestamp) {
 		t.OrigTimestamp = o.OrigTimestamp
 	}
+	// Should not actually change at the time of writing.
+	t.MaxTimestamp = o.MaxTimestamp
+	// Copy the list of nodes without time uncertainty.
+	t.CertainNodes = NodeList{Nodes: append(Int32Slice(nil),
+		o.CertainNodes.Nodes...)}
 	t.UpgradePriority(o.Priority)
 }

@@ -500,3 +522,26 @@ func (gc *GCMetadata) EstimatedBytes(now time.Time, currentNonLiveBytes int64) i
 	// Otherwise, just return the expGCBytes.
 	return expGCBytes
 }
+
+// Add adds the given NodeID to the list (unless already present)
+// and restores ordering.
+func (s *NodeList) Add(nodeID int32) {
+	if !s.Contains(nodeID) {
+		(*s).Nodes = append(s.Nodes, nodeID)
+		sort.Sort(Int32Slice(s.Nodes))
+	}
+}
+
+// Contains returns true if the underlying slice contains the given NodeID.
+func (s NodeList) Contains(nodeID int32) bool {
+	ns := s.GetNodes()
+	i := sort.Search(len(ns), func(i int) bool { return ns[i] >= nodeID })
+	return i < len(ns) && ns[i] == nodeID
+}
+
+// Int32Slice implements sort.Interface.
+type Int32Slice []int32
+
+func (s Int32Slice) Len() int { return len(s) }
+func (s Int32Slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
+func (s Int32Slice) Less(i, j int) bool { return s[i] < s[j] }
diff --git a/proto/data.proto b/proto/data.proto
index 2659f2cd88d8..0ce087d4c21f 100644
--- a/proto/data.proto
+++ b/proto/data.proto
@@ -134,6 +134,12 @@ enum TransactionStatus {
   ABORTED = 2;
 }

+// NodeList keeps a growing set of NodeIDs as a sorted slice, with Add()
+// adding to the set and Contains() verifying membership.
+message NodeList {
+  repeated int32 nodes = 1 [packed=true];
+}
+
 // A Transaction is a unit of work performed on the database.
 // Cockroach transactions support two isolation levels: snapshot
 // isolation and serializable snapshot isolation. Each Cockroach
@@ -150,21 +156,41 @@ message Transaction {
   optional TransactionStatus status = 5 [(gogoproto.nullable) = false];
   // Incremented on txn retry.
   optional int32 epoch = 6 [(gogoproto.nullable) = false];
+  // The last heartbeat timestamp.
+  optional Timestamp last_heartbeat = 7;
   // The proposed timestamp for the transaction. This starts as
   // the current wall time on the txn coordinator.
-  optional Timestamp timestamp = 7 [(gogoproto.nullable) = false];
+  optional Timestamp timestamp = 8 [(gogoproto.nullable) = false];
   // The original timestamp at which the transaction started. For serializable
   // transactions, if the timestamp drifts from the original timestamp, the
   // transaction will retry.
-  optional Timestamp orig_timestamp = 8 [(gogoproto.nullable) = false];
+  optional Timestamp orig_timestamp = 9 [(gogoproto.nullable) = false];
   // Initial Timestamp + clock skew. Reads which encounter values with
   // timestamps between Timestamp and MaxTimestamp trigger a txn
-  // retry error.
+  // retry error, unless the node being read is listed in certain_nodes
+  // (in which case no more read uncertainty can occur).
   // The case MaxTimestamp < Timestamp is possible for transactions which have
   // been pushed; in this case, MaxTimestamp should be ignored.
-  optional Timestamp max_timestamp = 9 [(gogoproto.nullable) = false];
-  // The last hearbeat timestamp.
-  optional Timestamp last_heartbeat = 10;
+  optional Timestamp max_timestamp = 10 [(gogoproto.nullable) = false];
+  // A sorted list of ids of nodes for which a ReadWithinUncertaintyIntervalError
+  // occurred during a prior read. The purpose of keeping this information is
+  // that as a reaction to this error, the transaction's timestamp is forwarded
+  // appropriately to reflect that node's clock uncertainty. Future reads to
+  // the same node are therefore freed from uncertainty restarts.
+  //
+  // The exact mechanism is that upon encountering the above error, the
+  // transaction will have to retry with a higher timestamp. This higher timestamp
+  // is either the one of the encountered future write returned in the error
+  // or (if higher, which is in the majority of cases), the time of the node
+  // serving the key at the time of the failed read.
+  // By additionally storing the node, we make sure to set MaxTimestamp=Timestamp
+  // at the time of the read for nodes whose clock we've taken into account,
+  // which amounts to reading without any uncertainty.
+  //
+  // Bits of this mechanism are found in the local sender, the range and the
+  // txn_coord_sender, with brief comments referring here.
+  // See https://github.com/cockroachdb/cockroach/pull/221.
+  optional NodeList certain_nodes = 11 [(gogoproto.nullable) = false];
 }

 // MVCCMetadata holds MVCC metadata for a key. Used by storage/engine/mvcc.go.
diff --git a/proto/data_test.go b/proto/data_test.go
index 106dbfaf4539..3c3ce5f057f4 100644
--- a/proto/data_test.go
+++ b/proto/data_test.go
@@ -20,6 +20,7 @@ package proto
 import (
 	"bytes"
 	"math"
+	"math/rand"
 	"strings"
 	"testing"
 	"time"
@@ -383,3 +384,29 @@ func TestGCMetadataEstimatedBytes(t *testing.T) {
 		}
 	}
 }
+
+// TestNodeList verifies that its public methods Add() and Contains()
+// operate as expected.
+func TestNodeList(t *testing.T) {
+	sn := NodeList{}
+	items := append([]int{109, 104, 102, 108, 1000}, rand.Perm(100)...)
+	for i := range items {
+		n := int32(items[i])
+		if sn.Contains(n) {
+			t.Fatalf("%d: false positive hit for %d on slice %v",
+				i, n, sn.GetNodes())
+		}
+		// Add this item and, for good measure, all the previous ones.
+		for j := i; j >= 0; j-- {
+			sn.Add(int32(items[j]))
+		}
+		if nodes := sn.GetNodes(); len(nodes) != i+1 {
+			t.Fatalf("%d: missing values or duplicates: %v",
+				i, nodes)
+		}
+		if !sn.Contains(n) {
+			t.Fatalf("%d: false negative hit for %d on slice %v",
+				i, n, sn.GetNodes())
+		}
+	}
+}
diff --git a/server/server_test.go b/server/server_test.go
index ca6dda940adf..1a66bc4744ad 100644
--- a/server/server_test.go
+++ b/server/server_test.go
@@ -242,6 +242,7 @@ func TestGzip(t *testing.T) {
 func TestMultiRangeScanDeleteRange(t *testing.T) {
 	ts := StartTestServer(t)
 	tds := kv.NewTxnCoordSender(kv.NewDistSender(ts.Gossip()), ts.Clock())
+	defer tds.Close()

 	if err := ts.node.db.Call(proto.AdminSplit, &proto.AdminSplitRequest{
@@ -332,7 +333,6 @@ func TestMultiRangeScanDeleteRange(t *testing.T) {
 	if err := scan.Reply.Header().GoError(); err != nil {
 		t.Fatal(err)
 	}
-	// TODO: end this txn
 	if txn := scan.Reply.Header().Txn; txn == nil || txn.Name != "MyTxn" {
 		t.Errorf("wanted Txn to persist, but it changed to %v", txn)
 	}
diff --git a/storage/engine/mvcc.go b/storage/engine/mvcc.go
index 28e763c0557d..5fb4acc54e77 100644
--- a/storage/engine/mvcc.go
+++ b/storage/engine/mvcc.go
@@ -384,7 +384,8 @@ func mvccGetInternal(engine Engine, key proto.Key, kv proto.RawKeyValue, timesta
 		// Second case: Our read timestamp is behind the latest write, but the
 		// latest write could possibly have happened before our read in
 		// absolute time if the writer had a fast clock.
-		// The reader should try again at meta.Timestamp+1.
+		// The reader should try again with a later timestamp than the
+		// one given below.
 		return nil, &proto.ReadWithinUncertaintyIntervalError{
 			Timestamp: timestamp,
 			ExistingTimestamp: meta.Timestamp,
diff --git a/storage/engine/mvcc_test.go b/storage/engine/mvcc_test.go
index ab0e63a8978c..fb092fd445b6 100644
--- a/storage/engine/mvcc_test.go
+++ b/storage/engine/mvcc_test.go
@@ -333,14 +333,6 @@ func TestMVCCGetNoMoreOldVersion(t *testing.T) {
 // TestMVCCGetUncertainty verifies that the appropriate error results when
 // a transaction reads a key at a timestamp that has versions newer than that
 // timestamp, but older than the transaction's MaxTimestamp.
-// TODO(Tobias): Test this in a live transactions as well, verifying the
-// necessary transaction restarts happen correctly etc.
-// Spencer's suggestion:
-// Create 3 clocks, each with max drift (offset) set to 100ms. Set clock one to
-// time=t, clock two to time=t+50ms, clock three to time=t+100ms. Write three
-// values at current time according to each of the three clocks. Start three
-// txns, each using one of the three clocks. In each txn, read the three values
-// and ensure that all txns read the correct values.
 func TestMVCCGetUncertainty(t *testing.T) {
 	engine := createTestEngine()
 	txn := &proto.Transaction{ID: []byte("txn"), Timestamp: makeTS(5, 0), MaxTimestamp: makeTS(10, 0)}
diff --git a/storage/range.go b/storage/range.go
index 070e104f1b87..841f3ace6062 100644
--- a/storage/range.go
+++ b/storage/range.go
@@ -713,14 +713,27 @@ func (r *Range) executeCmd(method string, args proto.Request, reply proto.Respon
 	}

 	// On success, flush the MVCC stats to the batch and commit.
-	if proto.IsReadWrite(method) && reply.Header().Error == nil {
-		ms.MergeStats(batch, r.Desc.RaftID, r.rm.StoreID())
-		if err := batch.Commit(); err != nil {
-			reply.Header().SetGoError(err)
-		} else {
-			// If the commit succeeded, potentially initiate a split of this range.
-			r.maybeSplit()
+	if err := reply.Header().GoError(); err == nil {
+		if proto.IsReadWrite(method) {
+			ms.MergeStats(batch, r.Desc.RaftID, r.rm.StoreID())
+			if err := batch.Commit(); err != nil {
+				reply.Header().SetGoError(err)
+			} else {
+				// If the commit succeeded, potentially initiate a split of this range.
+				r.maybeSplit()
+			}
 		}
+	} else if err, ok := reply.Header().GoError().(*proto.ReadWithinUncertaintyIntervalError); ok {
+		// A ReadWithinUncertaintyIntervalError contains the timestamp of the value
+		// that provoked the conflict. However, we forward the timestamp to the
+		// node's time here. The reason is that the caller (which is always
+		// transactional when this error occurs) in our implementation wants to
+		// use this information to extract a timestamp after which reads from
+		// the nodes are causally consistent with the transaction. This allows
+		// the node to be classified as without further uncertain reads for the
+		// remainder of the transaction.
+		// See the comment on proto.Transaction.CertainNodes.
+		err.ExistingTimestamp.Forward(r.rm.Clock().Now())
 	}

 	// Maybe update gossip configs on a put if there was no error.
@@ -889,7 +902,7 @@ func (r *Range) EndTransaction(batch engine.Engine, args *proto.EndTransactionRe
 		reply.Txn.Status = proto.ABORTED
 	}

-	// Persist the transaction record with updated status (& possibly timestmap).
+	// Persist the transaction record with updated status (& possibly timestamp).
 	if err := engine.MVCCPutProto(batch, nil, key, proto.ZeroTimestamp, nil, reply.Txn); err != nil {
 		reply.SetGoError(err)
 		return
diff --git a/storage/range_test.go b/storage/range_test.go
index 1a316421abe2..0f19419a4062 100644
--- a/storage/range_test.go
+++ b/storage/range_test.go
@@ -1508,7 +1508,7 @@ func TestRangeStats(t *testing.T) {
 	if err := rng.AddCmd(proto.Put, pArgs, pReply, true); err != nil {
 		t.Fatal(err)
 	}
-	expMS = engine.MVCCStats{LiveBytes: 124, KeyBytes: 40, ValBytes: 84, IntentBytes: 28, LiveCount: 2, KeyCount: 2, ValCount: 2, IntentCount: 1}
+	expMS = engine.MVCCStats{LiveBytes: 124 + 2, KeyBytes: 40, ValBytes: 84 + 2, IntentBytes: 28, LiveCount: 2, KeyCount: 2, ValCount: 2, IntentCount: 1}
 	verifyRangeStats(eng, rng.Desc.RaftID, expMS, t)

 	// Resolve the 2nd value.
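
For readers following the patch end to end, here is a minimal, self-contained Go sketch of the mechanism the diff threads through kv/local_sender.go, kv/txn_coord_sender.go and storage/range.go. The types and helpers below (Timestamp, Txn, effectiveMaxTimestamp, onUncertaintyError) are simplified stand-ins invented purely for illustration and are not the repository's actual API; only the NodeList add/contains semantics mirror what proto/data.go implements in this change.

package main

import (
	"fmt"
	"sort"
)

// Timestamp is a simplified stand-in for proto.Timestamp (wall time only).
type Timestamp int64

// Forward moves t forward to s if s is later, mirroring proto.Timestamp.Forward.
func (t *Timestamp) Forward(s Timestamp) {
	if *t < s {
		*t = s
	}
}

// NodeList mirrors the sorted-slice set semantics of proto.NodeList.
type NodeList struct {
	Nodes []int32
}

// Add inserts nodeID unless it is already present and keeps the slice sorted.
func (s *NodeList) Add(nodeID int32) {
	if !s.Contains(nodeID) {
		s.Nodes = append(s.Nodes, nodeID)
		sort.Slice(s.Nodes, func(i, j int) bool { return s.Nodes[i] < s.Nodes[j] })
	}
}

// Contains reports membership via binary search over the sorted slice.
func (s *NodeList) Contains(nodeID int32) bool {
	i := sort.Search(len(s.Nodes), func(i int) bool { return s.Nodes[i] >= nodeID })
	return i < len(s.Nodes) && s.Nodes[i] == nodeID
}

// Txn is a toy transaction carrying only the fields relevant here.
type Txn struct {
	Timestamp    Timestamp
	MaxTimestamp Timestamp
	CertainNodes NodeList
}

// effectiveMaxTimestamp sketches what the local sender does before executing
// a read: for nodes already marked certain, the uncertainty interval
// collapses to zero (MaxTimestamp = Timestamp).
func effectiveMaxTimestamp(txn *Txn, nodeID int32) Timestamp {
	if txn.CertainNodes.Contains(nodeID) {
		return txn.Timestamp
	}
	return txn.MaxTimestamp
}

// onUncertaintyError sketches the coordinator's reaction to a read hitting a
// value inside the uncertainty interval: remember the node and restart with
// the timestamp forwarded past the conflicting value (which the range has
// already forwarded to its own clock).
func onUncertaintyError(txn *Txn, nodeID int32, existing Timestamp) {
	txn.CertainNodes.Add(nodeID)
	txn.Timestamp.Forward(existing + 1)
}

func main() {
	txn := &Txn{Timestamp: 10, MaxTimestamp: 15} // 5 units of clock uncertainty
	const nodeID = int32(1)

	// First read on node 1: a value at time 12 falls inside the interval.
	fmt.Println("max before restart:", effectiveMaxTimestamp(txn, nodeID)) // 15
	onUncertaintyError(txn, nodeID, 12)

	// After the restart, node 1 is certain: no uncertainty window remains.
	fmt.Println("txn timestamp now:", txn.Timestamp)                      // 13
	fmt.Println("max after restart:", effectiveMaxTimestamp(txn, nodeID)) // 13
}

The point of the sketch is the invariant the patch establishes: after the first uncertainty restart against a given node, that node's reads see MaxTimestamp collapsed to Timestamp, so the same node can never trigger another uncertainty restart for the remainder of the transaction.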