sql: disable the streamer for queries which might use internal executor
This commit fixes a possible violation of `kv.Txn` API that was
introduced when we enabled the usage of the streamer by default in
22.2.0. Namely, the problem is as follows: the streamer requires the
LeafTxn to be used since it can perform reads concurrently with other
parts of the execution flow; however, if the flow contains a wrapped
`planNode` which is using the internal executor, the query issued by the
IE might use the RootTxn. As a result, we might have concurrency between
the LeafTxn of the "outer" query and the RootTxn of the "inner" query
which is not allowed.

The fix in this commit is "quick" and disallows the usage of the
streamer in more cases than strictly necessary. In particular:
1) it makes it so that the streamer is not used by the flow that has any
`planNode`s (even if they don't use the IE at all and don't interact with
the `kv.Txn` otherwise either). Auditing each `planNode` implementation
is error-prone, and this "quick" fix should be more reliable.
2) it makes it so that the streamer is disabled for all queries issued
by the IE.
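
The two rules above can be sketched as a single predicate. This is only a
toy model under stated assumptions: the `processor` and `query` types and
the `streamerAllowed` function are hypothetical names made up for
illustration, not identifiers from the CockroachDB code base.

```go
package main

import "fmt"

// processor is a hypothetical, stripped-down stand-in for a processor in the
// physical plan; only the property relevant to this fix is modeled.
type processor struct {
	wrapsPlanNode bool
}

// query is a hypothetical description of a query about to be executed.
type query struct {
	processors               []processor
	issuedByInternalExecutor bool
}

// streamerAllowed applies the two rules listed above: no streamer if any
// processor wraps a planNode, and no streamer for internal executor queries.
func streamerAllowed(q query) bool {
	if q.issuedByInternalExecutor {
		return false
	}
	for _, p := range q.processors {
		if p.wrapsPlanNode {
			return false
		}
	}
	return true
}

func main() {
	plain := query{processors: []processor{{}, {}}}
	wrapped := query{processors: []processor{{}, {wrapsPlanNode: true}}}
	internal := query{issuedByInternalExecutor: true}
	fmt.Println(streamerAllowed(plain), streamerAllowed(wrapped), streamerAllowed(internal))
}
```

Note that the predicate is deliberately conservative: a single wrapped
`planNode` rules out the streamer even if that node never touches the txn.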

The thinking behind the second change is as follows: if the query issued
by the IE uses the streamer, then it'll use the LeafTxn. The IE exposes
the iterator API, so it might be possible for the user of the IE to keep
the iterator "open" while returning the control flow back to the "outer"
flow. If that "outer" flow is using the RootTxn, we can have the same
concurrency violation with the "paused" IE iterator performing some
reads in the streamer.
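
To make the "paused" iterator scenario concrete, here is a minimal sketch
of the invariant being protected. `toyTxn`, `openLeafIterator` and
`useRoot` are hypothetical names; this is not the `kv.Txn` API, only a
counter that flags the root being used while a leaf reader is still open.

```go
package main

import (
	"errors"
	"fmt"
)

// errConcurrentUse is returned when the root is used while leaf reads may
// still be in flight.
var errConcurrentUse = errors.New("root txn used while a leaf txn reader is open")

// toyTxn is a hypothetical model (not kv.Txn) that only counts open leaf
// readers.
type toyTxn struct {
	openLeafReaders int
}

// openLeafIterator models an internal executor iterator backed by the
// streamer: until the returned function is called, leaf reads may still be
// happening. Calling the returned function more than once is a no-op.
func (t *toyTxn) openLeafIterator() (closeIter func()) {
	t.openLeafReaders++
	closed := false
	return func() {
		if !closed {
			closed = true
			t.openLeafReaders--
		}
	}
}

// useRoot models the "outer" flow touching the RootTxn.
func (t *toyTxn) useRoot() error {
	if t.openLeafReaders > 0 {
		return errConcurrentUse
	}
	return nil
}

func main() {
	txn := &toyTxn{}
	closeIter := txn.openLeafIterator()
	// The iterator is still open when control returns to the outer flow.
	fmt.Println("while iterator open:", txn.useRoot())
	closeIter()
	fmt.Println("after iterator closed:", txn.useRoot())
}
```

Disabling the streamer for all IE queries sidesteps this situation
entirely, since the "inner" query then never uses the LeafTxn in the first
place.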

Overall, this is "not terrible, not great" - we effectively fall back to
the pre-22.2 behavior for some types of queries. For the queries that do
process a lot of data, the streamer is likely to still be enabled.

Release note (bug fix): Since 22.2.0 CockroachDB could crash with
"attempting to append refresh spans after the tracked timestamp has
moved forward" error in some rare cases (most likely when querying
`pg_catalog` and `crdb_internal` virtual tables), and this has now been
fixed. The workaround before upgrading would be to run `SET CLUSTER
SETTING sql.distsql.use_streamer.enabled = false;`.
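
As the diff shows, this commit also introduces a `streamer_enabled`
session variable whose default value comes from the cluster setting, and
forces it off for internal executor sessions. A rough sketch of that
layering, assuming hypothetical `clusterSettings` and `session` types
(none of these names exist in the code base):

```go
package main

import "fmt"

// clusterSettings is a hypothetical stand-in for cluster-wide settings.
type clusterSettings struct {
	useStreamerEnabled bool // models sql.distsql.use_streamer.enabled
}

// session is a hypothetical stand-in for per-session state.
type session struct {
	streamerEnabled bool // models the streamer_enabled session variable
}

// newSession seeds the session variable from the cluster setting, which
// acts only as the default.
func newSession(cs clusterSettings) *session {
	return &session{streamerEnabled: cs.useStreamerEnabled}
}

// set models `SET streamer_enabled = ...`; it affects only this session.
func (s *session) set(v bool) { s.streamerEnabled = v }

// newInternalExecutorSession models the exception applied to internal
// executor sessions: the streamer is off regardless of the default.
func newInternalExecutorSession(cs clusterSettings) *session {
	s := newSession(cs)
	s.streamerEnabled = false
	return s
}

func main() {
	cs := clusterSettings{useStreamerEnabled: true}
	user := newSession(cs)
	internal := newInternalExecutorSession(cs)
	fmt.Println(user.streamerEnabled, internal.streamerEnabled)
	user.set(false)
	fmt.Println(user.streamerEnabled)
}
```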
yuzefovich committed Mar 23, 2023
1 parent 5a4309c commit ed3f640
Showing 12 changed files with 124 additions and 14 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/kvstreamer/BUILD.bazel
@@ -45,6 +45,7 @@ go_test(
         "main_test.go",
         "requests_provider_test.go",
         "results_buffer_test.go",
+        "streamer_disabled_test.go",
         "streamer_test.go",
     ],
     args = ["-test.timeout=295s"],
@@ -64,6 +65,7 @@ go_test(
         "//pkg/sql/rowcontainer",
         "//pkg/storage",
         "//pkg/testutils/serverutils",
+        "//pkg/testutils/sqlutils",
         "//pkg/util/hlc",
         "//pkg/util/leaktest",
         "//pkg/util/log",
60 changes: 60 additions & 0 deletions pkg/kv/kvclient/kvstreamer/streamer_disabled_test.go
@@ -0,0 +1,60 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package kvstreamer_test
+
+import (
+	"context"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/stretchr/testify/require"
+)
+
+// TestStreamerDisabledWithInternalExecutorQuery verifies that the streamer is
+// not used when the plan has a planNode that will use the internal executor. It
+// also confirms that the streamer is not used for queries issued by that
+// planNode.
+func TestStreamerDisabledWithInternalExecutorQuery(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
+	ctx := context.Background()
+	defer s.Stopper().Stop(ctx)
+
+	// Trace a query which has a lookup join on top of a scan of a virtual
+	// table, with that virtual table being populated by a query issued via the
+	// internal executor.
+	runner := sqlutils.MakeSQLRunner(db)
+	runner.Exec(t, "COMMENT ON DATABASE defaultdb IS 'foo'")
+	runner.Exec(t, "SET tracing = on")
+	runner.Exec(t, `
+SELECT
+c.*
+FROM
+crdb_internal.jobs AS j
+INNER LOOKUP JOIN system.comments AS c ON c.type = (j.num_runs - 1)::INT8
+WHERE
+j.num_runs = 1;
+`)
+	runner.Exec(t, "SET tracing = off")
+
+	// Ensure that no streamer spans were created (meaning that the streamer
+	// wasn't used, neither for the "outer" query nor for any "internal" ones).
+	r := runner.QueryRow(t, "SELECT count(*) FROM [SHOW TRACE FOR SESSION] WHERE operation ILIKE '%streamer%'")
+	var numStreamerSpans int
+	r.Scan(&numStreamerSpans)
+	require.Zero(t, numStreamerSpans)
+}
24 changes: 22 additions & 2 deletions pkg/sql/distsql_running.go
@@ -750,8 +750,28 @@ func (dsp *DistSQLPlanner) Run(
 // TODO(yuzefovich): fix the propagation of the lock spans with the
 // leaf txns and remove this check. See #94290.
 containsNonDefaultLocking := planCtx.planner != nil && planCtx.planner.curPlan.flags.IsSet(planFlagContainsNonDefaultLocking)
-if !containsNonDefaultLocking {
-	if execinfra.CanUseStreamer(dsp.st) {
+
+// We also currently disable the usage of the Streamer API whenever
+// we have a wrapped planNode. This is done to prevent scenarios
+// where some of planNodes will use the RootTxn (via the internal
+// executor) which prohibits the usage of the LeafTxn for this flow.
+//
+// Note that we're disallowing the Streamer API in more cases than
+// strictly necessary (i.e. there are planNodes that don't use the
+// txn at all), but auditing each planNode implementation to see
+// which are using the internal executor is error-prone, so we just
+// disable the Streamer API for the "super-set" of problematic
+// cases.
+mustUseRootTxn := func() bool {
+	for _, p := range plan.Processors {
+		if p.Spec.Core.LocalPlanNode != nil {
+			return true
+		}
+	}
+	return false
+}()
+if !containsNonDefaultLocking && !mustUseRootTxn {
+	if evalCtx.SessionData().StreamerEnabled {
 		for _, proc := range plan.Processors {
 			if jr := proc.Spec.Core.JoinReader; jr != nil {
 				// Both index and lookup joins, with and without
4 changes: 4 additions & 0 deletions pkg/sql/exec_util.go
@@ -3498,6 +3498,10 @@ func (m *sessionDataMutator) SetPreparedStatementsCacheSize(val int64) {
 	m.data.PreparedStatementsCacheSize = val
 }

+func (m *sessionDataMutator) SetStreamerEnabled(val bool) {
+	m.data.StreamerEnabled = val
+}
+
 // Utility functions related to scrubbing sensitive information on SQL Stats.

 // quantizeCounts ensures that the Count field in the
16 changes: 5 additions & 11 deletions pkg/sql/execinfra/readerbase.go
@@ -20,7 +20,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings"
-	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
 	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -204,16 +203,10 @@ func (h *LimitHintHelper) ReadSomeRows(rowsRead int64) error {
 	return nil
 }

-// CanUseStreamer returns whether the kvstreamer.Streamer API should be used if
-// possible.
-func CanUseStreamer(settings *cluster.Settings) bool {
-	return useStreamerEnabled.Get(&settings.SV)
-}
-
 // UseStreamer returns whether the kvstreamer.Streamer API should be used as
 // well as the txn that should be used (regardless of the boolean return value).
 func (flowCtx *FlowCtx) UseStreamer() (bool, *kv.Txn, error) {
-	useStreamer := CanUseStreamer(flowCtx.EvalCtx.Settings) && flowCtx.Txn != nil &&
+	useStreamer := flowCtx.EvalCtx.SessionData().StreamerEnabled && flowCtx.Txn != nil &&
 		flowCtx.Txn.Type() == kv.LeafTxn && flowCtx.MakeLeafTxn != nil
 	if !useStreamer {
 		return false, flowCtx.Txn, nil
@@ -227,9 +220,10 @@ func (flowCtx *FlowCtx) UseStreamer() (bool, *kv.Txn, error) {
 	return true, leafTxn, nil
 }

-// useStreamerEnabled determines whether the Streamer API should be used.
-// TODO(yuzefovich): remove this in 23.1.
-var useStreamerEnabled = settings.RegisterBoolSetting(
+// UseStreamerEnabled determines the default value for the 'streamer_enabled'
+// session variable.
+// TODO(yuzefovich): consider removing this at some point.
+var UseStreamerEnabled = settings.RegisterBoolSetting(
 	settings.TenantWritable,
 	"sql.distsql.use_streamer.enabled",
 	"determines whether the usage of the Streamer API is allowed. "+
4 changes: 4 additions & 0 deletions pkg/sql/internal.go
@@ -716,6 +716,10 @@ func applyInternalExecutorSessionExceptions(sd *sessiondata.SessionData) {
 	// DisableBuffering is not supported by the InternalExecutor
 	// which uses streamingCommandResults.
 	sd.LocalOnlySessionData.AvoidBuffering = false
+	// At the moment, we disable the usage of the Streamer API in the internal
+	// executor to avoid possible concurrency with the "outer" query (which
+	// might be using the RootTxn).
+	sd.LocalOnlySessionData.StreamerEnabled = false
 }

 // applyOverrides overrides the respective fields from sd for all the fields set on o.
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/logic_test/information_schema
@@ -5302,6 +5302,7 @@ ssl on
 ssl_renegotiation_limit 0
 standard_conforming_strings on
 statement_timeout 0
+streamer_enabled on
 stub_catalog_tables on
 synchronize_seqscans on
 synchronous_commit on
3 changes: 3 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/pg_catalog
@@ -2790,6 +2790,7 @@ show_primary_key_constraint_on_not_visible_columns on NULL
 sql_safe_updates off NULL NULL NULL string
 standard_conforming_strings on NULL NULL NULL string
 statement_timeout 0 NULL NULL NULL string
+streamer_enabled on NULL NULL NULL string
 stub_catalog_tables on NULL NULL NULL string
 synchronize_seqscans on NULL NULL NULL string
 synchronous_commit on NULL NULL NULL string
@@ -2942,6 +2943,7 @@ show_primary_key_constraint_on_not_visible_columns on NULL
 sql_safe_updates off NULL user NULL off off
 standard_conforming_strings on NULL user NULL on on
 statement_timeout 0 NULL user NULL 0s 0s
+streamer_enabled on NULL user NULL on on
 stub_catalog_tables on NULL user NULL on on
 synchronize_seqscans on NULL user NULL on on
 synchronous_commit on NULL user NULL on on
@@ -3094,6 +3096,7 @@ show_primary_key_constraint_on_not_visible_columns NULL NULL NULL
 sql_safe_updates NULL NULL NULL NULL NULL
 standard_conforming_strings NULL NULL NULL NULL NULL
 statement_timeout NULL NULL NULL NULL NULL
+streamer_enabled NULL NULL NULL NULL NULL
 stub_catalog_tables NULL NULL NULL NULL NULL
 synchronize_seqscans NULL NULL NULL NULL NULL
 synchronous_commit NULL NULL NULL NULL NULL
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/logic_test/show_source
@@ -142,6 +142,7 @@ show_primary_key_constraint_on_not_visible_columns on
 sql_safe_updates off
 standard_conforming_strings on
 statement_timeout 0
+streamer_enabled on
 stub_catalog_tables on
 synchronize_seqscans on
 synchronous_commit on
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/joinreader_blackbox_test.go
@@ -61,7 +61,7 @@ func TestJoinReaderUsesBatchLimit(t *testing.T) {
 	// non-streamer code path.
 	// TODO(yuzefovich): remove the test altogether when the corresponding
 	// cluster setting is removed (i.e. only the streamer code path remains).
-	_, err := sqlDB.Exec("SET CLUSTER SETTING sql.distsql.use_streamer.enabled = false;")
+	_, err := sqlDB.Exec("SET streamer_enabled = false;")
 	require.NoError(t, err)

 	// We're going to create a table with enough rows to exceed a batch's memory
2 changes: 2 additions & 0 deletions pkg/sql/sessiondatapb/local_only_session_data.proto
@@ -365,6 +365,8 @@ message LocalOnlySessionData {
   // Execution of these deallocated prepared statements will fail until they are
   // prepared again.
   int64 prepared_statements_cache_size = 97;
+  // StreamerEnabled controls whether the Streamer API can be used.
+  bool streamer_enabled = 98;

   ///////////////////////////////////////////////////////////////////////////
   // WARNING: consider whether a session parameter you're adding needs to //
19 changes: 19 additions & 0 deletions pkg/sql/vars.go
@@ -2602,6 +2602,25 @@ var varGen = map[string]sessionVar{
 			return string(humanizeutil.IBytes(0))
 		},
 	},
+
+	// CockroachDB extension.
+	`streamer_enabled`: {
+		GetStringVal: makePostgresBoolGetStringValFn(`streamer_enabled`),
+		Set: func(_ context.Context, m sessionDataMutator, s string) error {
+			b, err := paramparse.ParseBoolVar("streamer_enabled", s)
+			if err != nil {
+				return err
+			}
+			m.SetStreamerEnabled(b)
+			return nil
+		},
+		Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) {
+			return formatBoolAsPostgresSetting(evalCtx.SessionData().StreamerEnabled), nil
+		},
+		GlobalDefault: func(sv *settings.Values) string {
+			return formatBoolAsPostgresSetting(execinfra.UseStreamerEnabled.Get(sv))
+		},
+	},
 }

 // We want test coverage for this on and off so make it metamorphic.
