Skip to content

Commit

Permalink
sql: extend resolverQueue to resolve waiter txn id
Browse files Browse the repository at this point in the history
Previously, the resovlerQueue used in the contention event store only
resolved the txnID of the blocking transaction.
This commit, the resolverQueue would also resolve the txnID of the
waiting transaction.

Release note: None
  • Loading branch information
Azhng committed Feb 24, 2022
1 parent d8a2fec commit c82d6a7
Show file tree
Hide file tree
Showing 8 changed files with 483 additions and 120 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Expand Up @@ -254,6 +254,7 @@ ALL_TESTS = [
"//pkg/sql/contention/contentionutils:contentionutils_test",
"//pkg/sql/contention/txnidcache:txnidcache_test",
"//pkg/sql/contention:contention_test",
"//pkg/sql/contentionpb:contentionpb_test",
"//pkg/sql/covering:covering_test",
"//pkg/sql/distsql:distsql_test",
"//pkg/sql/doctor:doctor_test",
Expand Down
21 changes: 10 additions & 11 deletions pkg/sql/contention/event_store.go
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

const (
Expand Down Expand Up @@ -248,14 +247,14 @@ func (s *eventStore) forEachEvent(
// is important since the op() callback can take arbitrary long to execute,
// we should not be holding the lock while op() is executing.
s.mu.RLock()
keys := make([]uuid.UUID, 0, s.mu.store.Len())
keys := make([]uint64, 0, s.mu.store.Len())
s.mu.store.Do(func(entry *cache.Entry) {
keys = append(keys, entry.Key.(uuid.UUID))
keys = append(keys, entry.Key.(uint64))
})
s.mu.RUnlock()

for i := range keys {
event, ok := s.getEventByBlockingTxnID(keys[i])
event, ok := s.getEventByEventHash(keys[i])
if !ok {
// The event might have been evicted between reading the keys and
// getting the event. In this case we simply ignore it.
Expand All @@ -269,13 +268,13 @@ func (s *eventStore) forEachEvent(
return nil
}

func (s *eventStore) getEventByBlockingTxnID(
txnID uuid.UUID,
func (s *eventStore) getEventByEventHash(
hash uint64,
) (_ contentionpb.ExtendedContentionEvent, ok bool) {
s.mu.RLock()
defer s.mu.RUnlock()

event, ok := s.mu.store.Get(txnID)
event, ok := s.mu.store.Get(hash)
return event.(contentionpb.ExtendedContentionEvent), ok
}

Expand Down Expand Up @@ -319,7 +318,7 @@ func (s *eventStore) upsertBatch(events []contentionpb.ExtendedContentionEvent)
if !ok {
atomic.AddInt64(&s.atomic.storageSize, int64(entryBytes(&events[i])))
}
s.mu.store.Add(blockingTxnID, events[i])
s.mu.store.Add(events[i].Hash(), events[i])
}
}

Expand All @@ -333,7 +332,7 @@ func (s *eventStore) resolutionIntervalWithJitter() time.Duration {
}

func entryBytes(event *contentionpb.ExtendedContentionEvent) int {
// Since we store the blocking txn's txnID as the key to the unordered cache,
// this is means we are storing another copy of uuid.
return event.Size() + uuid.UUID{}.Size()
// Since we store the event's hash as the key to the unordered cache,
// this is means we are storing another copy of uint64 (8 bytes).
return event.Size() + 8
}
6 changes: 5 additions & 1 deletion pkg/sql/contention/event_store_test.go
Expand Up @@ -201,10 +201,14 @@ func randomlyGenerateTestData(testSize int, numOfCoordinator int) []testData {
tcs := make([]testData, 0, testSize)
for i := 0; i < testSize; i++ {
tcs = append(tcs, testData{
ResolvedTxnID: contentionpb.ResolvedTxnID{
blockingTxn: contentionpb.ResolvedTxnID{
TxnID: uuid.FastMakeV4(),
TxnFingerprintID: roachpb.TransactionFingerprintID(math.MaxUint64 - uint64(i)),
},
waitingTxn: contentionpb.ResolvedTxnID{
TxnID: uuid.FastMakeV4(),
TxnFingerprintID: roachpb.TransactionFingerprintID(math.MaxUint64/2 - uint64(i)),
},
coordinatorNodeID: strconv.Itoa(rand.Intn(numOfCoordinator)),
})
}
Expand Down
184 changes: 130 additions & 54 deletions pkg/sql/contention/resolver.go
Expand Up @@ -12,6 +12,7 @@ package contention

import (
"context"
"math"
"sort"
"strconv"

Expand Down Expand Up @@ -83,6 +84,11 @@ const (
// retry resolving until giving up. This needs to be a finite number to handle
// the case where the node is permanently removed from the cluster.
retryBudgetForRPCFailure = uint32(3)

// retryBudgetForTxnInProgress is a special value indicating that the resolver should
// indefinitely retry the resolution. This is because the retry is due to the
// transaction is still in progress.
retryBudgetForTxnInProgress = uint32(math.MaxUint32)
)

// ResolverEndpoint is an alias for the TxnIDResolution RPC endpoint in the
Expand All @@ -96,7 +102,10 @@ type resolverQueueImpl struct {
unresolvedEvents []contentionpb.ExtendedContentionEvent
resolvedEvents []contentionpb.ExtendedContentionEvent

remainingRetries map[uuid.UUID]uint32
// remainingRetries stores a mapping of each contention event to its
// remaining number of retries attempts. The key in the map is the hash of
// the contention event.
remainingRetries map[uint64]uint32
}

resolverEndpoint ResolverEndpoint
Expand All @@ -111,7 +120,7 @@ func newResolver(endpoint ResolverEndpoint, sizeHint int) *resolverQueueImpl {

s.mu.unresolvedEvents = make([]contentionpb.ExtendedContentionEvent, 0, sizeHint)
s.mu.resolvedEvents = make([]contentionpb.ExtendedContentionEvent, 0, sizeHint)
s.mu.remainingRetries = make(map[uuid.UUID]uint32, sizeHint)
s.mu.remainingRetries = make(map[uint64]uint32, sizeHint)

return s
}
Expand Down Expand Up @@ -160,43 +169,54 @@ func (q *resolverQueueImpl) resolveLocked(ctx context.Context) error {
// by observing some node'q load metrics (e.g. QPS value) and start
// self-throttling once that QPS value exceed certain value.

req := makeRPCRequestFromBatch(currentBatch)
resp, err := q.resolverEndpoint(ctx, req)
blockTxnIDsRPCReq, waitingTxnIDsRPCReq := makeRPCRequestsFromBatch(currentBatch)

blockingTxnIDsRPCResp, err := q.resolverEndpoint(ctx, blockTxnIDsRPCReq)
if err != nil {
allErrors = errors.CombineErrors(allErrors, err)
}

waitingTxnIDRPCResp, err := q.resolverEndpoint(ctx, waitingTxnIDsRPCReq)
if err != nil {
q.maybeRequeueBatchLocked(currentBatch, retryBudgetForRPCFailure)
// Read next batch of unresolved contention events.
currentBatch, remaining = readUntilNextCoordinatorID(remaining)
allErrors = errors.CombineErrors(allErrors, err)
continue
}
resolvedTxnIDs, inProgressTxnIDs := extractResolvedAndInProgressTxnIDs(resp)

resolvedBlockingTxnIDs, inProgressBlockingTxnIDs := extractResolvedAndInProgressTxnIDs(blockingTxnIDsRPCResp)
resolvedWaitingTxnIDs, inProgressWaitingTxnIDs := extractResolvedAndInProgressTxnIDs(waitingTxnIDRPCResp)

for _, event := range currentBatch {
// If the coordinator node indicates that it is aware of the requested
// txnID but does not yet have the corresponding txnFingerprintID,
// (e.g. when the transaction is still executing), we re-queue
// the contention event, so we will check in with the coordinator node
// again later. In this case, we don't want to update the retry
// record since we are confident that the txnID entry on the coordinator
// node has not yet being evicted.
if _, ok := inProgressTxnIDs[event.BlockingEvent.TxnMeta.ID]; ok {
q.mu.unresolvedEvents = append(q.mu.unresolvedEvents, event)
// Clear any retry count if there is any.
delete(q.mu.remainingRetries, event.BlockingEvent.TxnMeta.ID)
continue
needToRetryDueToBlockingTxnID, initialRetryBudgetDueToBlockingTxnID :=
maybeUpdateTxnFingerprintID(
event.BlockingEvent.TxnMeta.ID,
&event.BlockingTxnFingerprintID,
resolvedBlockingTxnIDs,
inProgressBlockingTxnIDs,
)

needToRetryDueToWaitingTxnID, initialRetryBudgetDueToWaitingTxnID :=
maybeUpdateTxnFingerprintID(
event.WaitingTxnID,
&event.WaitingTxnFingerprintID,
resolvedWaitingTxnIDs,
inProgressWaitingTxnIDs,
)

// The initial retry budget is
// max(
// initialRetryBudgetDueToBlockingTxnID,
// initialRetryBudgetDueToWaitingTxnID,
// ).
initialRetryBudget := initialRetryBudgetDueToBlockingTxnID
if initialRetryBudget < initialRetryBudgetDueToWaitingTxnID {
initialRetryBudget = initialRetryBudgetDueToWaitingTxnID
}

// If we successfully resolveLocked the transaction ID, we append it to the
// resolvedEvent slice and clear remaining retry count if there is any.
if txnFingerprintID, ok := resolvedTxnIDs[event.BlockingEvent.TxnMeta.ID]; ok {
event.BlockingTxnFingerprintID = txnFingerprintID
if needToRetryDueToBlockingTxnID || needToRetryDueToWaitingTxnID {
q.maybeRequeueEventForRetryLocked(event, initialRetryBudget)
} else {
q.mu.resolvedEvents = append(q.mu.resolvedEvents, event)

delete(q.mu.remainingRetries, event.BlockingEvent.TxnMeta.ID)
continue
delete(q.mu.remainingRetries, event.Hash())
}

q.maybeRequeueEventForRetryLocked(event, retryBudgetForMissingResult)
}

currentBatch, remaining = readUntilNextCoordinatorID(remaining)
Expand All @@ -205,32 +225,71 @@ func (q *resolverQueueImpl) resolveLocked(ctx context.Context) error {
return allErrors
}

func (q *resolverQueueImpl) maybeRequeueBatchLocked(
batch []contentionpb.ExtendedContentionEvent, initialBudget uint32,
) {
for _, event := range batch {
q.maybeRequeueEventForRetryLocked(event, initialBudget)
func maybeUpdateTxnFingerprintID(
txnID uuid.UUID,
existingTxnFingerprintID *roachpb.TransactionFingerprintID,
resolvedTxnIDs, inProgressTxnIDs map[uuid.UUID]roachpb.TransactionFingerprintID,
) (needToRetry bool, initialRetryBudget uint32) {
// This means the txnID has already been resolved into transaction fingerprint
// ID.
if *existingTxnFingerprintID != roachpb.InvalidTransactionFingerprintID {
return false /* needToRetry */, 0 /* initialRetryBudget */
}

// Sometimes DistSQL engine is used by in weird ways. It is possible for a
// DistSQL flow to exist without being associated with any transactions and
// can still experience contentions. When that happens, we don't attempt to
// resolve it.
if uuid.Nil.Equal(txnID) {
return false /* needToRetry */, 0 /* initialRetryBudget */
}

if resolvedTxnIDs == nil {
return true /* needToRetry */, retryBudgetForRPCFailure
}

if _, ok := inProgressTxnIDs[txnID]; ok {
return true /* needToRetry */, retryBudgetForTxnInProgress
}

if inProgressTxnIDs == nil {
return true /* needToRetry */, retryBudgetForRPCFailure
}

if txnFingerprintID, ok := resolvedTxnIDs[txnID]; ok {
*existingTxnFingerprintID = txnFingerprintID
return false /* needToRetry */, 0 /* initialRetryBudget */
}

return true /* needToRetry */, retryBudgetForMissingResult
}

func (q *resolverQueueImpl) maybeRequeueEventForRetryLocked(
event contentionpb.ExtendedContentionEvent, initialBudget uint32,
) (requeued bool) {
// If we fail to resolve the result, we look up this event's remaining retry
// count. If its retry budget is exhausted, we discard it. Else, we
// re-queue the event for retry and decrement its retry budget for the
// event.
remainingRetryBudget, ok := q.mu.remainingRetries[event.BlockingEvent.TxnMeta.ID]
if !ok {
remainingRetryBudget = initialBudget
var remainingRetryBudget uint32
var ok bool

if initialBudget == retryBudgetForTxnInProgress {
delete(q.mu.remainingRetries, event.Hash())
} else {
remainingRetryBudget--
}
q.mu.remainingRetries[event.BlockingEvent.TxnMeta.ID] = remainingRetryBudget
// If we fail to resolve the result, we look up this event's remaining retry
// count. If its retry budget is exhausted, we discard it. Else, we
// re-queue the event for retry and decrement its retry budget for the
// event.
remainingRetryBudget, ok = q.mu.remainingRetries[event.Hash()]
if !ok {
remainingRetryBudget = initialBudget
} else {
remainingRetryBudget--
}

if remainingRetryBudget == 0 {
delete(q.mu.remainingRetries, event.BlockingEvent.TxnMeta.ID)
return false /* requeued */
q.mu.remainingRetries[event.Hash()] = remainingRetryBudget

if remainingRetryBudget == 0 {
delete(q.mu.remainingRetries, event.Hash())
return false /* requeued */
}
}

q.mu.unresolvedEvents = append(q.mu.unresolvedEvents, event)
Expand All @@ -257,6 +316,10 @@ func readUntilNextCoordinatorID(
func extractResolvedAndInProgressTxnIDs(
resp *serverpb.TxnIDResolutionResponse,
) (resolvedTxnIDs, inProgressTxnIDs map[uuid.UUID]roachpb.TransactionFingerprintID) {
if resp == nil {
return nil /* resolvedTxnID */, nil /* inProgressTxnIDs */
}

resolvedTxnIDs = make(map[uuid.UUID]roachpb.TransactionFingerprintID, len(resp.ResolvedTxnIDs))
inProgressTxnIDs = make(map[uuid.UUID]roachpb.TransactionFingerprintID, len(resp.ResolvedTxnIDs))

Expand All @@ -271,17 +334,30 @@ func extractResolvedAndInProgressTxnIDs(
return resolvedTxnIDs, inProgressTxnIDs
}

func makeRPCRequestFromBatch(
// makeRPCRequestsFromBatch creates two TxnIDResolution RPC requests from the
// batch of contentionpb.ExtendedContentionEvent. If the event already contains
// a resolved transaction fingerprint ID, then the corresponding transaction ID
// is omitted from the RPC request payload.
func makeRPCRequestsFromBatch(
batch []contentionpb.ExtendedContentionEvent,
) *serverpb.TxnIDResolutionRequest {
req := &serverpb.TxnIDResolutionRequest{
) (blockingTxnIDRPCReq, waitingTxnIDRPCReq *serverpb.TxnIDResolutionRequest) {
blockingTxnIDRPCReq = &serverpb.TxnIDResolutionRequest{
CoordinatorID: strconv.Itoa(int(batch[0].BlockingEvent.TxnMeta.CoordinatorNodeID)),
TxnIDs: make([]uuid.UUID, 0, len(batch)),
}
waitingTxnIDRPCReq = &serverpb.TxnIDResolutionRequest{
CoordinatorID: "local",
TxnIDs: make([]uuid.UUID, 0, len(batch)),
}

for _, event := range batch {
req.TxnIDs = append(req.TxnIDs, event.BlockingEvent.TxnMeta.ID)
for i := range batch {
if batch[i].BlockingTxnFingerprintID == roachpb.InvalidTransactionFingerprintID {
blockingTxnIDRPCReq.TxnIDs = append(blockingTxnIDRPCReq.TxnIDs, batch[i].BlockingEvent.TxnMeta.ID)
}
if batch[i].WaitingTxnFingerprintID == roachpb.InvalidTransactionFingerprintID {
waitingTxnIDRPCReq.TxnIDs = append(waitingTxnIDRPCReq.TxnIDs, batch[i].WaitingTxnID)
}
}

return req
return blockingTxnIDRPCReq, waitingTxnIDRPCReq
}

0 comments on commit c82d6a7

Please sign in to comment.