Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
120419: sqlstats: simplify transaction latency test r=abarganier,xinhaoz a=dhartunian

Remove need for test case counter which causes a data race.

Fixes: #119580
Epic: None
Release note: None

120633: physicalplan: bias towards streaks in bulk planning r=dt a=dt

The bulk oracle is used when planning large, bulk jobs, that are expected to involve many or all ranges in a cluster, where all nodes are likely to be assigned a large number of spans, and overall plan and the specs that represent it will include a very large number of distinct spans.

These large numbers of distinct spans in the specs can increase the cost of executing such a plan. In particular, processes that maintain a frontier of spans processes or not processed or time at which they are processed, such as CDC and PCR, have to track far more distinct spans in large clusters.

We can, however, in some cases reduce this number of distinct spans, by biasing the assignment of key ranges to nodes during replica selection to pick the same node for sequential ranges. By assigning, say, 10 spans to node one, then ten to two, then ten to three, potentially each node is now only tracking one logical span, that is 10x wider, instead of ten distinct spans.

We can bias towards such streaks only when the candidate replicas for a span include one on the node that would extend the streak, so this is an opportunistic optimization that depends on replica placement making it an option.

Additionally we need to be careful when applying such a bias that we still *distribute* work roughly evenly to achieve our desired overall utilization of the cluster. Thus we only bias towards streaks when the streak length is short or when the node on which we are extending a streak remains within some multiple of the least assigned node, reverting to the normal random selection if this is not the case.

Release note: none.
Epic: none.

120766: sql: increase raft command size limit for some tests r=DrewKimball a=DrewKimball

The tests `TestLargeDynamicRows` and `TestLogic_upsert_non_metamorphic` occasionally flake because they set the raft command size limit to the minimum `4MiB`, and their batch size limiting is inexact. This commit prevents the flake by increasing the limit to `5MiB`. Making the batch size limit exact will still be tracked by #117070.

Informs #117070

Release note: None

120769: sqlstats: skip TestSQLStatsCompactor r=abarganier a=dhartunian

Release note: None

120781: streamingest: skip `TestStreamingReplanOnLag` r=rail a=rickystewart

This test is very flaky.

See #120688

Epic: none
Release note: None

Co-authored-by: David Hartunian <davidh@cockroachlabs.com>
Co-authored-by: David Taylor <tinystatemachine@gmail.com>
Co-authored-by: Drew Kimball <drewk@cockroachlabs.com>
Co-authored-by: Ricky Stewart <ricky@cockroachlabs.com>
  • Loading branch information
5 people committed Mar 20, 2024
6 parents 206fb84 + b71b3ce + a3ddfdd + 0e727ca + a8d44bd + 2d8d51b commit 28a4dae
Show file tree
Hide file tree
Showing 12 changed files with 211 additions and 37 deletions.
4 changes: 3 additions & 1 deletion pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,9 @@ func backup(

oracle := physicalplan.DefaultReplicaChooser
if useBulkOracle.Get(&evalCtx.Settings.SV) {
oracle = kvfollowerreadsccl.NewBulkOracle(dsp.ReplicaOracleConfig(evalCtx.Locality), execLocality)
oracle = kvfollowerreadsccl.NewBulkOracle(
dsp.ReplicaOracleConfig(evalCtx.Locality), execLocality, kvfollowerreadsccl.StreakConfig{},
)
}

// We don't return the compatible nodes here since PartitionSpans will
Expand Down
72 changes: 69 additions & 3 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,54 @@ var followerReadOraclePolicy = replicaoracle.RegisterPolicy(newFollowerReadOracl
type bulkOracle struct {
cfg replicaoracle.Config
locFilter roachpb.Locality
streaks StreakConfig
}

// StreakConfig controls the streak-preferring behavior of oracles that support
// it, such as the bulk oracle, for minimizing distinct spans in large plans.
// See the fields and ShouldExtend for details.
type StreakConfig struct {
// Min is the streak lengths below which streaks are always extended, if able,
// unless overridden in a "small" plan by SmallPlanMin below.
Min int
// SmallPlanMin and SmallPlanThreshold are used to override the cap on streak
// lengths that are always extended to be lower when plans are still "small".
// Being "small" is defined as the node with the fewest assigned spans having
// fewer than SmallPlanThreshold assigned spans. If SmallPlanThreshold is >0,
// then when this condition is met SmallPlanMin is used instead of Min.
SmallPlanMin, SmallPlanThreshold int
// MaxSkew is the fraction (e.g. 0.95) of the number of spans assigned to a
// node that must be assigned to the node with the fewest assigned spans to
// extend a streak on that node beyond the Min streak length.
MaxSkew float64
}

// shouldExtend returns whether the current streak should be extended if able,
// according to its length, the number of spans assigned to the node on which it
// would be extended, and the number assigned to the candidate node with the
// fewest span assigned. This would be the case if the streak that would be
// extended is below the minimum streak length (which can be different
// initially/in smaller plans) or the plan is balanced enough to tolerate
// extending the streak. See the the fields of StreakConfig for details.
func (s StreakConfig) shouldExtend(streak, fewestSpansAssigned, assigned int) bool {
if streak < s.SmallPlanMin {
return true
}
if streak < s.Min && s.SmallPlanThreshold < fewestSpansAssigned {
return true
}
return fewestSpansAssigned >= int(float64(assigned)*s.MaxSkew)
}

var _ replicaoracle.Oracle = bulkOracle{}

// NewBulkOracle returns an oracle for planning bulk operations, which will plan
// balancing randomly across all replicas (if follower reads are enabled).
func NewBulkOracle(cfg replicaoracle.Config, locFilter roachpb.Locality) replicaoracle.Oracle {
return bulkOracle{cfg: cfg, locFilter: locFilter}
// TODO(dt): respect streak preferences when using locality filtering. #120755.
func NewBulkOracle(
cfg replicaoracle.Config, locFilter roachpb.Locality, streaks StreakConfig,
) replicaoracle.Oracle {
return bulkOracle{cfg: cfg, locFilter: locFilter, streaks: streaks}
}

// ChoosePreferredReplica implements the replicaoracle.Oracle interface.
Expand All @@ -218,7 +258,7 @@ func (r bulkOracle) ChoosePreferredReplica(
desc *roachpb.RangeDescriptor,
leaseholder *roachpb.ReplicaDescriptor,
_ roachpb.RangeClosedTimestampPolicy,
_ replicaoracle.QueryState,
qs replicaoracle.QueryState,
) (_ roachpb.ReplicaDescriptor, ignoreMisplannedRanges bool, _ error) {
if leaseholder != nil && !checkFollowerReadsEnabled(r.cfg.Settings) {
return *leaseholder, false, nil
Expand All @@ -235,10 +275,36 @@ func (r bulkOracle) ChoosePreferredReplica(
matches = append(matches, i)
}
}
// TODO(dt): ideally we'd just filter `replicas` here, then continue on to
// the code below to pick one as normal, just from the filtered slice.
if len(matches) > 0 {
return replicas[matches[randutil.FastUint32()%uint32(len(matches))]].ReplicaDescriptor, true, nil
}
}

if r.streaks.Min > 0 {
// Find the index of replica in replicas that is on the node that was last
// assigned a span in this plan if it exists. While doing so, find the
// number of spans assigned to that node and to the node with the fewest
// spans assigned to it.
prevIdx, prevAssigned, fewestAssigned := -1, -1, -1
for i := range replicas {
assigned := qs.RangesPerNode.GetDefault(int(replicas[i].NodeID))
if replicas[i].NodeID == qs.LastAssignment {
prevIdx = i
prevAssigned = assigned
}
if assigned < fewestAssigned || fewestAssigned == -1 {
fewestAssigned = assigned
}
}
// If the previously chosen node is a candidate in replicas, check if we want
// to pick it again to extend the node's streak instead of picking randomly.
if prevIdx != -1 && r.streaks.shouldExtend(qs.NodeStreak, fewestAssigned, prevAssigned) {
return replicas[prevIdx].ReplicaDescriptor, true, nil
}
}

return replicas[randutil.FastUint32()%uint32(len(replicas))].ReplicaDescriptor, true, nil
}

Expand Down
83 changes: 79 additions & 4 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,32 +703,107 @@ func TestOracle(t *testing.T) {
var noLeaseholder *roachpb.ReplicaDescriptor
var noCTPolicy roachpb.RangeClosedTimestampPolicy
var noQueryState replicaoracle.QueryState
sk := StreakConfig{Min: 10, SmallPlanMin: 3, SmallPlanThreshold: 3, MaxSkew: 0.95}
// intMap(k1, v1, k2, v2, ...) is a FastIntMaps constructor shorthand.
intMap := func(pairs ...int) util.FastIntMap {
f := util.FastIntMap{}
for i := 0; i < len(pairs); i += 2 {
f.Set(pairs[i], pairs[i+1])
}
return f
}

t.Run("no-followers", func(t *testing.T) {
br := NewBulkOracle(cfg(stNoFollowers), roachpb.Locality{})
br := NewBulkOracle(cfg(stNoFollowers), roachpb.Locality{}, sk)
leaseholder := &roachpb.ReplicaDescriptor{NodeID: 99}
picked, _, err := br.ChoosePreferredReplica(ctx, noTxn, desc, leaseholder, noCTPolicy, noQueryState)
require.NoError(t, err)
require.Equal(t, leaseholder.NodeID, picked.NodeID, "no follower reads means we pick the leaseholder")
})
t.Run("no-filter", func(t *testing.T) {
br := NewBulkOracle(cfg(st), roachpb.Locality{})
br := NewBulkOracle(cfg(st), roachpb.Locality{}, sk)
picked, _, err := br.ChoosePreferredReplica(ctx, noTxn, desc, noLeaseholder, noCTPolicy, noQueryState)
require.NoError(t, err)
require.NotNil(t, picked, "no filter picks some node but could be any node")
})
t.Run("filter", func(t *testing.T) {
br := NewBulkOracle(cfg(st), region("b"))
br := NewBulkOracle(cfg(st), region("b"), sk)
picked, _, err := br.ChoosePreferredReplica(ctx, noTxn, desc, noLeaseholder, noCTPolicy, noQueryState)
require.NoError(t, err)
require.Equal(t, roachpb.NodeID(2), picked.NodeID, "filter means we pick the node that matches the filter")
})
t.Run("filter-no-match", func(t *testing.T) {
br := NewBulkOracle(cfg(st), region("z"))
br := NewBulkOracle(cfg(st), region("z"), sk)
picked, _, err := br.ChoosePreferredReplica(ctx, noTxn, desc, noLeaseholder, noCTPolicy, noQueryState)
require.NoError(t, err)
require.NotNil(t, picked, "no match still picks some non-zero node")
})
t.Run("streak-short", func(t *testing.T) {
br := NewBulkOracle(cfg(st), roachpb.Locality{}, sk)
for _, r := range replicas { // Check for each to show it isn't random.
picked, _, err := br.ChoosePreferredReplica(ctx, noTxn, desc, noLeaseholder, noCTPolicy, replicaoracle.QueryState{
NodeStreak: 1,
LastAssignment: r.NodeID,
})
require.NoError(t, err)
require.Equal(t, r.NodeID, picked.NodeID)
}
})
t.Run("streak-medium", func(t *testing.T) {
br := NewBulkOracle(cfg(st), roachpb.Locality{}, sk)
for _, r := range replicas { // Check for each to show it isn't random.
picked, _, err := br.ChoosePreferredReplica(ctx, noTxn, desc, noLeaseholder, noCTPolicy, replicaoracle.QueryState{
NodeStreak: 9,
RangesPerNode: intMap(1, 3, 2, 3, 3, 3),
LastAssignment: r.NodeID,
})
require.NoError(t, err)
require.Equal(t, r.NodeID, picked.NodeID)
}
})
t.Run("streak-long-even", func(t *testing.T) {
br := NewBulkOracle(cfg(st), roachpb.Locality{}, sk)
for _, r := range replicas { // Check for each to show it isn't random.
picked, _, err := br.ChoosePreferredReplica(ctx, noTxn, desc, noLeaseholder, noCTPolicy, replicaoracle.QueryState{
NodeStreak: 50,
RangesPerNode: intMap(1, 1000, 2, 1002, 3, 1005),
LastAssignment: r.NodeID,
})
require.NoError(t, err)
require.Equal(t, r.NodeID, picked.NodeID)
}
})
t.Run("streak-long-skewed-to-other", func(t *testing.T) {
br := NewBulkOracle(cfg(st), roachpb.Locality{}, sk)
for i := 0; i < 10; i++ { // Prove it isn't just randomly picking n2.
qs := replicaoracle.QueryState{
NodeStreak: 50,
RangesPerNode: intMap(1, 10, 2, 10, 3, 1005),
LastAssignment: 2,
}
picked, _, err := br.ChoosePreferredReplica(ctx, noTxn, desc, noLeaseholder, noCTPolicy, qs)
require.NoError(t, err)
require.Equal(t, roachpb.NodeID(2), picked.NodeID)
}
})
t.Run("streak-long-skewed-randomizes", func(t *testing.T) {
br := NewBulkOracle(cfg(st), roachpb.Locality{}, sk)
qs := replicaoracle.QueryState{
NodeStreak: 50,
RangesPerNode: intMap(1, 10, 2, 10, 3, 1005),
LastAssignment: 3,
}
randomized := false
for i := 0; i < 100; i++ { // .33^100 is close enough to zero that this shouldn't flake.
picked, _, err := br.ChoosePreferredReplica(ctx, noTxn, desc, noLeaseholder, noCTPolicy, qs)
require.NoError(t, err)
if picked.NodeID != qs.LastAssignment {
randomized = true
break
}
}
require.True(t, randomized)
})
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,8 @@ func TestStreamingReplanOnLag(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.WithIssue(t, 120688)

skip.UnderDuressWithIssue(t, 115850, "time to scatter ranges takes too long under duress")

ctx := context.Background()
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamproducer/stream_lifetime.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func buildReplicationStreamSpec(
// Partition the spans with SQLPlanner
dsp := jobExecCtx.DistSQLPlanner()
noLoc := roachpb.Locality{}
oracle := kvfollowerreadsccl.NewBulkOracle(dsp.ReplicaOracleConfig(evalCtx.Locality), noLoc)
oracle := kvfollowerreadsccl.NewBulkOracle(dsp.ReplicaOracleConfig(evalCtx.Locality), noLoc, kvfollowerreadsccl.StreakConfig{})

planCtx := dsp.NewPlanningCtxWithOracle(
ctx, jobExecCtx.ExtendedEvalContext(), nil /* planner */, nil /* txn */, sql.FullDistribution, oracle, noLoc,
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/copy/copy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,9 +592,11 @@ func TestLargeDynamicRows(t *testing.T) {
err := conn.Exec(ctx, `SET COPY_FAST_PATH_ENABLED = 'true'`)
require.NoError(t, err)

// 4.0 MiB is minimum, copy sets max row size to this value / 3
// 4.0 MiB is minimum, but due to #117070 use 5MiB instead to avoid flakes.
// Copy sets max row size to this value / 3.
const memLimit = kvserverbase.MaxCommandSizeFloor + 1<<20
for _, l := range []serverutils.ApplicationLayerInterface{s, s.SystemLayer()} {
kvserverbase.MaxCommandSize.Override(ctx, &l.ClusterSettings().SV, 4<<20)
kvserverbase.MaxCommandSize.Override(ctx, &l.ClusterSettings().SV, memLimit)
}

err = conn.Exec(ctx, "CREATE TABLE t (s STRING)")
Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/logictest/testdata/logic_test/upsert_non_metamorphic
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ INSERT INTO src SELECT repeat('a', 100000) FROM generate_series(1, 60)

user host-cluster-root

# Set the memory limit a little higher than the minimum because the batch sizing
# is inexact, tracked in #117070.
statement ok
SET CLUSTER SETTING kv.raft.command.max_size='4MiB';
SET CLUSTER SETTING kv.raft.command.max_size='5MiB';

user root

Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/physicalplan/replicaoracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ var oracleFactories = map[Policy]OracleFactory{}
type QueryState struct {
RangesPerNode util.FastIntMap
AssignedRanges map[roachpb.RangeID]ReplicaDescriptorEx
LastAssignment roachpb.NodeID
NodeStreak int
}

// ReplicaDescriptorEx is a small extension of the roachpb.ReplicaDescriptor
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/physicalplan/span_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,5 +301,11 @@ func (it *spanResolverIterator) ReplicaInfo(
ReplDesc: repl,
IgnoreMisplannedRanges: ignoreMisplannedRanges,
}
if it.queryState.LastAssignment == repl.NodeID {
it.queryState.NodeStreak++
} else {
it.queryState.NodeStreak = 0
}
it.queryState.LastAssignment = repl.NodeID
return repl, ignoreMisplannedRanges, nil
}
3 changes: 3 additions & 0 deletions pkg/sql/sqlstats/persistedsqlstats/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -74,6 +75,8 @@ func TestSQLStatsCompactor(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.WithIssue(t, 120761)

ctx := context.Background()

testCases := []struct {
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/sqlstats/sslocal/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ go_test(
"//pkg/sql/sqlstats/ssmemstorage",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util",
Expand Down
Loading

0 comments on commit 28a4dae

Please sign in to comment.