-
Notifications
You must be signed in to change notification settings - Fork 3.7k
/
replica_batch_updates.go
249 lines (236 loc) · 9.85 KB
/
replica_batch_updates.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package kvserver
import (
"context"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)
// ----------------------------------------------------------------------------
// This file contains functions performing updates to a BatchRequest performed
// on the server-side, specifically after the request has been routed to a
// replica (and thus the request has been split based on range boundaries).
// As per the client.Sender contract, these functions need to consider the input
// batches as copy-on-write.
// ----------------------------------------------------------------------------
// maybeStripInFlightWrites attempts to remove all point writes and query
// intents that ended up in the same batch as an EndTxn request from that EndTxn
// request's "in-flight" write set. The entire batch will commit atomically, so
// there is no need to consider the writes in the same batch concurrent.
//
// The transformation can lead to bypassing the STAGING state for a transaction
// entirely. This is possible if the function removes all of the in-flight
// writes from an EndTxn request that was committing in parallel with writes
// which all happened to be on the same range as the transaction record.
func maybeStripInFlightWrites(ba *kvpb.BatchRequest) (*kvpb.BatchRequest, error) {
	args, hasET := ba.GetArg(kvpb.EndTxn)
	if !hasET {
		// No EndTxn in the batch; nothing to strip.
		return ba, nil
	}
	et := args.(*kvpb.EndTxnRequest)
	// NOTE(review): the slicing below assumes the EndTxn request is the last
	// request in the batch — confirm against the BatchRequest invariants.
	otherReqs := ba.Requests[:len(ba.Requests)-1]
	if !et.IsParallelCommit() || len(otherReqs) == 0 {
		// Not a parallel commit, or the EndTxn is alone in the batch: there
		// is nothing to strip from the in-flight write set.
		return ba, nil
	}
	// Clone the BatchRequest and the EndTxn request before modifying it. We nil
	// out the request's in-flight writes and make the lock spans immutable on
	// append. Code below can use origET to recreate the in-flight write set if
	// any elements remain in it.
	origET := et
	// The EndTxn copy and its RequestUnion wrapper are allocated together so
	// that one allocation serves both.
	etAlloc := new(struct {
		et    kvpb.EndTxnRequest
		union kvpb.RequestUnion_EndTxn
	})
	etAlloc.et = *origET // shallow copy
	etAlloc.union.EndTxn = &etAlloc.et
	et = &etAlloc.et
	et.InFlightWrites = nil
	// The three-index slice expression pins cap to len, so any later append
	// reallocates instead of writing into origET's shared backing array.
	et.LockSpans = et.LockSpans[:len(et.LockSpans):len(et.LockSpans)] // immutable
	ba = ba.ShallowCopy()
	ba.Requests = append([]kvpb.RequestUnion(nil), ba.Requests...)
	ba.Requests[len(ba.Requests)-1].Value = &etAlloc.union
	// Fast-path: If we know that this batch contains all of the transaction's
	// in-flight writes, then we can avoid searching in the in-flight writes set
	// for each request. Instead, we can blindly merge all in-flight writes into
	// the lock spans and clear out the in-flight writes set.
	if len(otherReqs) >= len(origET.InFlightWrites) {
		writes := 0
		for _, ru := range otherReqs {
			req := ru.GetInner()
			switch {
			case kvpb.CanParallelCommit(req):
				// Concurrent point write being committed in parallel.
				writes++
			case req.Method() == kvpb.QueryIntent:
				// Earlier pipelined point write that hasn't been proven yet.
				writes++
			default:
				// Ranged write or read. See below.
			}
		}
		if len(origET.InFlightWrites) < writes {
			// The batch carries more parallel-commit writes than the in-flight
			// set accounts for: the request is malformed.
			return ba, errors.New("more write in batch with EndTxn than listed in in-flight writes")
		} else if len(origET.InFlightWrites) == writes {
			// Every in-flight write is present in this batch: fold them all
			// into the lock spans and leave the in-flight set empty (nil).
			et.LockSpans = make([]roachpb.Span, len(origET.LockSpans)+len(origET.InFlightWrites))
			copy(et.LockSpans, origET.LockSpans)
			for i, w := range origET.InFlightWrites {
				et.LockSpans[len(origET.LockSpans)+i] = roachpb.Span{Key: w.Key}
			}
			// See below for why we set Header.DistinctSpans here.
			et.LockSpans, ba.Header.DistinctSpans = roachpb.MergeSpans(&et.LockSpans)
			return ba, nil
		}
	}
	// Slow-path: If not then we remove each transaction write in the batch from
	// the in-flight write set and merge it into the lock spans.
	copiedTo := 0
	for _, ru := range otherReqs {
		req := ru.GetInner()
		seq := req.Header().Sequence
		switch {
		case kvpb.CanParallelCommit(req):
			// Concurrent point write being committed in parallel.
		case req.Method() == kvpb.QueryIntent:
			// Earlier pipelined point write that hasn't been proven yet. We
			// could remove from the in-flight writes set when we see these,
			// but doing so would prevent us from using the optimization we
			// have below where we rely on increasing sequence numbers for
			// each subsequent request.
			//
			// We already don't intend on sending QueryIntent requests in the
			// same batch as EndTxn requests because doing so causes a pipeline
			// stall, so this doesn't seem worthwhile to support.
			continue
		default:
			// Ranged write or read. These can make it into the final batch with
			// a parallel committing EndTxn request if the entire batch issued
			// by DistSender lands on the same range. Skip.
			continue
		}
		// Remove the write from the in-flight writes set. We only need to
		// search from after the previously removed sequence number forward
		// because both the InFlightWrites and the Requests in the batch are
		// stored in increasing sequence order.
		//
		// Maintaining an iterator into the in-flight writes slice and scanning
		// instead of performing a binary search on each request changes the
		// complexity of this loop from O(n*log(m)) to O(m) where n is the
		// number of point writes in the batch and m is the number of in-flight
		// writes. These complexities aren't directly comparable, but copying
		// all unstripped writes back into et.InFlightWrites is already O(m),
		// so the approach here was preferred over repeat binary searches.
		match := -1
		for i, w := range origET.InFlightWrites[copiedTo:] {
			if w.Sequence == seq {
				match = i + copiedTo
				break
			}
		}
		if match == -1 {
			return ba, errors.New("write in batch with EndTxn missing from in-flight writes")
		}
		w := origET.InFlightWrites[match]
		// Writes between the previous match and this one are not in the batch;
		// carry them over into the rebuilt in-flight set.
		notInBa := origET.InFlightWrites[copiedTo:match]
		et.InFlightWrites = append(et.InFlightWrites, notInBa...)
		copiedTo = match + 1
		// Move the write to the lock spans set since it's no
		// longer being tracked in the in-flight write set.
		et.LockSpans = append(et.LockSpans, roachpb.Span{Key: w.Key})
	}
	if et != origET {
		// NOTE(review): et was unconditionally cloned above, so this condition
		// appears to always hold on this path; kept as a defensive check.
		// Finish building up the remaining in-flight writes.
		notInBa := origET.InFlightWrites[copiedTo:]
		et.InFlightWrites = append(et.InFlightWrites, notInBa...)
		// Re-sort and merge the lock spans. We can set the batch request's
		// DistinctSpans flag based on whether any of in-flight writes in this
		// batch overlap with each other. This will have (rare) false negatives
		// when the in-flight writes overlap with existing lock spans, but never
		// false positives.
		et.LockSpans, ba.Header.DistinctSpans = roachpb.MergeSpans(&et.LockSpans)
	}
	return ba, nil
}
// maybeBumpReadTimestampToWriteTimestamp bumps the batch's read timestamp to
// the write timestamp for transactional batches where these timestamps have
// diverged and where bumping is possible. When possible, this allows the
// transaction to commit without having to retry.
//
// Returns true if the timestamp was bumped.
//
// Note that this, like all the server-side bumping of the read timestamp, only
// works for batches that exclusively contain writes; reads cannot be bumped
// like this because they've already acquired timestamp-aware latches.
func maybeBumpReadTimestampToWriteTimestamp(
	ctx context.Context, ba *kvpb.BatchRequest, g *concurrency.Guard,
) bool {
	// Only transactional batches that are permitted to refresh server-side and
	// whose read and write timestamps have actually diverged are candidates.
	// Short-circuit evaluation preserves the original check order.
	if ba.Txn == nil || !ba.CanForwardReadTimestamp ||
		ba.Txn.ReadTimestamp == ba.Txn.WriteTimestamp {
		return false
	}
	// Bumping is only attempted when the batch carries an EndTxn request.
	arg, ok := ba.GetArg(kvpb.EndTxn)
	if !ok {
		return false
	}
	// Don't bother bumping if the new timestamp would already exceed the
	// transaction's commit deadline — the commit would fail anyway.
	if et := arg.(*kvpb.EndTxnRequest); batcheval.IsEndTxnExceedingDeadline(ba.Txn.WriteTimestamp, et.Deadline) {
		return false
	}
	return tryBumpBatchTimestamp(ctx, ba, g, ba.Txn.WriteTimestamp)
}
// tryBumpBatchTimestamp attempts to bump ba's read and write timestamps to ts.
//
// This function is called both below and above latching, which is indicated by
// the concurrency guard argument. The concurrency guard, if not nil, indicates
// that the caller is holding latches and cannot adjust its timestamp beyond the
// limits of what is protected by those latches. If the concurrency guard is
// nil, the caller indicates that it is not holding latches and can therefore
// more freely adjust its timestamp because it will re-acquire latches at
// whatever timestamp the batch is bumped to.
//
// Returns true if the timestamp was bumped. Returns false if the timestamp could
// not be bumped.
func tryBumpBatchTimestamp(
	ctx context.Context, ba *kvpb.BatchRequest, g *concurrency.Guard, ts hlc.Timestamp,
) bool {
	// While holding latches, the timestamp cannot move beyond what the latches
	// protect; a nil guard means no latches are held and bumping is allowed.
	if g != nil && !g.IsolatedAtLaterTimestamps() {
		return false
	}
	if ts.Less(ba.Timestamp) {
		log.Fatalf(ctx, "trying to bump to %s <= ba.Timestamp: %s", ts, ba.Timestamp)
	}
	if ba.Txn == nil {
		// Non-transactional batch: bump the batch timestamp directly.
		log.VEventf(ctx, 2, "bumping batch timestamp to %s from %s", ts, ba.Timestamp)
		ba.Timestamp = ts
		return true
	}
	if ts.Less(ba.Txn.ReadTimestamp) || ts.Less(ba.Txn.WriteTimestamp) {
		log.Fatalf(ctx, "trying to bump to %s inconsistent with ba.Txn.ReadTimestamp: %s, "+
			"ba.Txn.WriteTimestamp: %s", ts, ba.Txn.ReadTimestamp, ba.Txn.WriteTimestamp)
	}
	log.VEventf(ctx, 2, "bumping batch timestamp to: %s from read: %s, write: %s",
		ts, ba.Txn.ReadTimestamp, ba.Txn.WriteTimestamp)
	// Clone the transaction proto before bumping it so that shared references
	// to the old proto are not mutated, then install the bumped clone on the
	// batch. The batch itself must be mutated in place: this function reports
	// success only through its bool return, so assigning the updates to a
	// shallow copy of ba (as a previous revision did) would silently discard
	// them from the caller's point of view. The non-transactional path above
	// mutates ba in place for the same reason.
	txn := ba.Txn.Clone()
	txn.BumpReadTimestamp(ts)
	ba.Txn = txn
	// Read the timestamp from the bumped clone, not the pre-bump transaction:
	// BumpReadTimestamp just forwarded ReadTimestamp to ts.
	ba.Timestamp = ba.Txn.ReadTimestamp
	return true
}