Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: refresh range cache on rangefeed barrier failure #119512

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ go_test(
"//pkg/kv/kvserver/raftentry",
"//pkg/kv/kvserver/raftlog",
"//pkg/kv/kvserver/raftutil",
"//pkg/kv/kvserver/rangefeed",
"//pkg/kv/kvserver/rditer",
"//pkg/kv/kvserver/readsummary/rspb",
"//pkg/kv/kvserver/replicastats",
Expand Down
12 changes: 12 additions & 0 deletions pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -663,3 +665,13 @@ func getMapsDiff(beforeMap map[string]int64, afterMap map[string]int64) map[stri
}
return diffMap
}

func NewRangefeedTxnPusher(
ir *intentresolver.IntentResolver, r *Replica, span roachpb.RSpan,
) rangefeed.TxnPusher {
return &rangefeedTxnPusher{
ir: ir,
r: r,
span: span,
}
}
15 changes: 12 additions & 3 deletions pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,19 @@ func (tp *rangefeedTxnPusher) Barrier(ctx context.Context) error {
// Execute a Barrier on the leaseholder, and obtain its LAI. Error out on any
// range changes (e.g. splits/merges) that we haven't applied yet.
lai, desc, err := tp.r.store.db.BarrierWithLAI(ctx, tp.span.Key, tp.span.EndKey)
if err != nil {
if errors.HasType(err, &kvpb.RangeKeyMismatchError{}) {
return errors.Wrap(err, "range barrier failed, range split")
if err != nil && errors.HasType(err, &kvpb.RangeKeyMismatchError{}) {
// The DistSender may have a stale range descriptor, e.g. following a merge.
// Failed unsplittable requests don't trigger a refresh, so we have to
// attempt to refresh it by sending a Get request to the start key.
//
// TODO(erikgrinaker): the DistSender should refresh its cache instead.
if _, err := tp.r.store.db.Get(ctx, tp.span.Key); err != nil {
return errors.Wrap(err, "range barrier failed: range descriptor refresh failed")
}
// Retry the Barrier.
lai, desc, err = tp.r.store.db.BarrierWithLAI(ctx, tp.span.Key, tp.span.EndKey)
}
if err != nil {
return errors.Wrap(err, "range barrier failed")
}
if lai == 0 {
Expand Down
100 changes: 100 additions & 0 deletions pkg/kv/kvserver/replica_rangefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/future"
Expand Down Expand Up @@ -1652,3 +1653,102 @@ func TestNewRangefeedForceLeaseRetry(t *testing.T) {
rangeFeedCancel()

}

// TestRangefeedTxnPusherBarrierRangeKeyMismatch is a regression test for
// https://github.com/cockroachdb/cockroach/issues/119333
//
// Specifically, it tests that a Barrier call that encounters a
// RangeKeyMismatchError will eagerly attempt to refresh the DistSender range
// cache. The DistSender does not do this itself for unsplittable requests, so
// it might otherwise continually fail.
func TestRangefeedTxnPusherBarrierRangeKeyMismatch(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderRace(t) // too slow, times out
skip.UnderDeadlock(t)

// Use a timeout, to prevent a hung test.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// Start a cluster with 3 nodes.
tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{},
},
},
})
defer tc.Stopper().Stop(ctx)
defer cancel()

n1 := tc.Server(0)
n3 := tc.Server(2)
db1 := n1.ApplicationLayer().DB()
db3 := n3.ApplicationLayer().DB()

// Split off a range and upreplicate it, with leaseholder on n1. This is the
// range we'll run the barrier across.
prefix := append(n1.ApplicationLayer().Codec().TenantPrefix(), keys.ScratchRangeMin...)
_, _, err := n1.StorageLayer().SplitRange(prefix)
require.NoError(t, err)
desc := tc.AddVotersOrFatal(t, prefix, tc.Targets(1, 2)...)
t.Logf("split off range %s", desc)

rspan := desc.RSpan()
span := rspan.AsRawSpanWithNoLocals()

// Split off three other ranges.
splitKeys := []roachpb.Key{
append(prefix.Clone(), roachpb.Key("/a")...),
append(prefix.Clone(), roachpb.Key("/b")...),
append(prefix.Clone(), roachpb.Key("/c")...),
}
for _, key := range splitKeys {
_, desc, err = n1.StorageLayer().SplitRange(key)
require.NoError(t, err)
t.Logf("split off range %s", desc)
}

// Scan the ranges on n3 to update the range caches, then run a barrier
// request which should fail with RangeKeyMismatchError.
_, err = db3.Scan(ctx, span.Key, span.EndKey, 0)
require.NoError(t, err)

_, _, err = db3.BarrierWithLAI(ctx, span.Key, span.EndKey)
require.Error(t, err)
require.IsType(t, &kvpb.RangeKeyMismatchError{}, err)
t.Logf("n3 barrier returned %s", err)

// Merge the ranges on n1.
for range splitKeys {
desc, err = n1.StorageLayer().MergeRanges(span.Key)
require.NoError(t, err)
t.Logf("merged range %s", desc)
}

// Barriers should now succeed on n1, which have an updated range cache, but
// fail on n3 which doesn't.
lai, _, err := db1.BarrierWithLAI(ctx, span.Key, span.EndKey)
require.NoError(t, err)
t.Logf("n1 barrier returned LAI %d", lai)

// NB: this could potentially flake if n3 somehow updates its range cache. If
// it does, we can remove this assertion, but it's nice to make sure we're
// actually testing what we think we're testing.
_, _, err = db3.BarrierWithLAI(ctx, span.Key, span.EndKey)
require.Error(t, err)
require.IsType(t, &kvpb.RangeKeyMismatchError{}, err)
t.Logf("n3 barrier returned %s", err)

// However, rangefeedTxnPusher.Barrier() will refresh the cache and
// successfully apply the barrier.
s3 := tc.GetFirstStoreFromServer(t, 2)
repl3 := s3.LookupReplica(roachpb.RKey(span.Key))
t.Logf("repl3 desc: %s", repl3.Desc())
txnPusher := kvserver.NewRangefeedTxnPusher(nil, repl3, rspan)
require.NoError(t, txnPusher.Barrier(ctx))
t.Logf("n3 txnPusher barrier succeeded")
}