Skip to content

Commit

Permalink
Merge #118722
Browse files Browse the repository at this point in the history
118722: sql: don't parallelize *any* FK and unique checks containing locking r=yuzefovich a=michae2

**sql, execbuilder: move plan flags from planTop to planComponents**

Execbuild produces a `sql.planComponents` containing the main query
plan, plus plans for any subqueries, checks, and cascades. In the normal
SQL code path (`sql.(*optPlanningCtx).runExecBuilder`) we copy this
planComponents into a `sql.planTop`, which then is decorated with extra
information gathered from the Builder.

However, there are other users of execbuild that only work with the
planComponents, and don't have a planTop. In the next commit, one such
user, `PlanAndRunCascadesAndChecks`, needs to see some of the plan flags
set when building FK check plans, but doesn't have access to the Builder
which gathered those plans.

To solve this, this commit moves plan flags from the planTop to
planComponents, and then refactors the execbuilder and exec factory to
set some of these flags when creating the planComponents.

Informs: #118720

Epic: None

Release note: None

---

**sql: don't parallelize *any* FK and unique checks containing locking**

In #111896 we disallowed parallel execution of FK and unique checks that
contain locking, to avoid usage of LeafTxns. But #111896 only considered
checks built during planning of the main query, and overlooked checks
built during planning of FK cascades.

This commit also considers checks built during planning of FK cascades.

Fixes: #118720

Epic: None

Release note (bug fix): This commit fixes an internal error with message
like: "LeafTxn ... incompatible with locking request" that occurs when
performing an update under read committed isolation which cascades to a
table with multiple other foreign keys.

---

**sql: rename \*ContainsNonDefaultKeyLocking to \*ContainsLocking**

"Default" locking here simply means no locking. I find this convention
of saying "non-default key locking" a little verbose and confusing.

Epic: None

Release note: None

Co-authored-by: Michael Erickson <michae2@cockroachlabs.com>
  • Loading branch information
craig[bot] and michae2 committed Feb 6, 2024
2 parents a57ec22 + 5021627 commit 9a3b3c9
Show file tree
Hide file tree
Showing 18 changed files with 198 additions and 111 deletions.
4 changes: 2 additions & 2 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4889,7 +4889,7 @@ func (dsp *DistSQLPlanner) NewPlanningCtxWithOracle(
if planner == nil ||
evalCtx.SessionData().Internal ||
planner.curPlan.flags.IsSet(planFlagContainsMutation) ||
planner.curPlan.flags.IsSet(planFlagContainsNonDefaultLocking) {
planner.curPlan.flags.IsSet(planFlagContainsLocking) {
// Don't parallelize the scans if we have a local plan if
// - we don't have a planner which is the case when we are not on
// the main query path;
Expand All @@ -4900,7 +4900,7 @@ func (dsp *DistSQLPlanner) NewPlanningCtxWithOracle(
// any synchronization (see #116039);
// - the plan contains a mutation operation - we currently don't
// support any parallelism when mutations are present;
// - the plan uses non-default key locking strength (see #94290).
// - the plan uses locking (see #94290).
return planCtx
}
prohibitParallelization, hasScanNodeToParallelize := checkScanParallelizationIfLocal(ctx, &planner.curPlan.planComponents)
Expand Down
27 changes: 15 additions & 12 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,15 +740,14 @@ func (dsp *DistSQLPlanner) Run(
// during the flow setup, we need to populate leafInputState below,
// so we tell the localState that there is concurrency.

// At the moment, we disable the usage of the Streamer API for local
// plans when non-default key locking modes are requested on some of
// the processors. This is the case since the lock spans propagation
// doesn't happen for the leaf txns which can result in excessive
// contention for future reads (since the acquired locks are not
// cleaned up properly when the txn commits).
// TODO(yuzefovich): fix the propagation of the lock spans with the
// leaf txns and remove this check. See #94290.
containsNonDefaultLocking := planCtx.planner != nil && planCtx.planner.curPlan.flags.IsSet(planFlagContainsNonDefaultLocking)
// At the moment, we disable the usage of the Streamer API for local plans
// when locking is used by any of the processors. This is the case since
// the lock spans propagation doesn't happen for the leaf txns which can
// result in excessive contention for future reads (since the acquired
// locks are not cleaned up properly when the txn commits).
// TODO(yuzefovich): fix the propagation of the lock spans with the leaf
// txns and remove this check. See #94290.
containsLocking := planCtx.planner != nil && planCtx.planner.curPlan.flags.IsSet(planFlagContainsLocking)

// We also currently disable the usage of the Streamer API whenever
// we have a wrapped planNode. This is done to prevent scenarios
Expand All @@ -769,7 +768,7 @@ func (dsp *DistSQLPlanner) Run(
}
return false
}()
if !containsNonDefaultLocking && !mustUseRootTxn {
if !containsLocking && !mustUseRootTxn {
if evalCtx.SessionData().StreamerEnabled {
for _, proc := range plan.Processors {
if jr := proc.Spec.Core.JoinReader; jr != nil {
Expand Down Expand Up @@ -2072,6 +2071,8 @@ func (dsp *DistSQLPlanner) PlanAndRunCascadesAndChecks(
return getDefaultSaveFlowsFunc(ctx, planner, planComponentTypePostquery)
}

checksContainLocking := planner.curPlan.flags.IsSet(planFlagCheckContainsLocking)

// We treat plan.cascades as a queue.
for i := 0; i < len(plan.cascades); i++ {
// The original bufferNode is stored in c.Buffer; we can refer to it
Expand Down Expand Up @@ -2138,6 +2139,9 @@ func (dsp *DistSQLPlanner) PlanAndRunCascadesAndChecks(
// Collect any new checks.
if len(cp.checkPlans) > 0 {
plan.checkPlans = append(plan.checkPlans, cp.checkPlans...)
if cp.flags.IsSet(planFlagCheckContainsLocking) {
checksContainLocking = true
}
}

// In cyclical reference situations, the number of cascading operations can
Expand Down Expand Up @@ -2183,8 +2187,7 @@ func (dsp *DistSQLPlanner) PlanAndRunCascadesAndChecks(
// multiple checks to run, none of the checks have non-default locking, and
// we're likely to have quota to do so.
runParallelChecks := parallelizeChecks.Get(&dsp.st.SV) &&
len(plan.checkPlans) > 1 &&
!planner.curPlan.flags.IsSet(planFlagCheckContainsNonDefaultLocking) &&
len(plan.checkPlans) > 1 && !checksContainLocking &&
dsp.parallelChecksSem.ApproximateQuota() > 0
if runParallelChecks {
// At the moment, we rely on not using the newer DistSQL spec factory to
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/distsql_spec_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,7 @@ func (e *distSQLSpecExecFactory) ConstructPlan(
cascades []exec.Cascade,
checks []exec.Node,
rootRowCount int64,
flags exec.PlanFlags,
) (exec.Plan, error) {
if len(subqueries) != 0 {
return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: subqueries")
Expand All @@ -888,7 +889,7 @@ func (e *distSQLSpecExecFactory) ConstructPlan(
} else {
p.physPlan.onClose = e.planCtx.getCleanupFunc()
}
return constructPlan(e.planner, root, subqueries, cascades, checks, rootRowCount)
return constructPlan(e.planner, root, subqueries, cascades, checks, rootRowCount, flags)
}

func (e *distSQLSpecExecFactory) ConstructExplainOpt(
Expand Down
25 changes: 25 additions & 0 deletions pkg/sql/exec_factory_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func constructPlan(
cascades []exec.Cascade,
checks []exec.Node,
rootRowCount int64,
flags exec.PlanFlags,
) (exec.Plan, error) {
res := &planComponents{}
assignPlan := func(plan *planMaybePhysical, node exec.Node) {
Expand Down Expand Up @@ -81,6 +82,30 @@ func constructPlan(
assignPlan(&res.checkPlans[i].plan, checks[i])
}
}
if flags.IsSet(exec.PlanFlagIsDDL) {
res.flags.Set(planFlagIsDDL)
}
if flags.IsSet(exec.PlanFlagContainsFullTableScan) {
res.flags.Set(planFlagContainsFullTableScan)
}
if flags.IsSet(exec.PlanFlagContainsFullIndexScan) {
res.flags.Set(planFlagContainsFullIndexScan)
}
if flags.IsSet(exec.PlanFlagContainsLargeFullTableScan) {
res.flags.Set(planFlagContainsLargeFullTableScan)
}
if flags.IsSet(exec.PlanFlagContainsLargeFullIndexScan) {
res.flags.Set(planFlagContainsLargeFullIndexScan)
}
if flags.IsSet(exec.PlanFlagContainsMutation) {
res.flags.Set(planFlagContainsMutation)
}
if flags.IsSet(exec.PlanFlagContainsLocking) {
res.flags.Set(planFlagContainsLocking)
}
if flags.IsSet(exec.PlanFlagCheckContainsLocking) {
res.flags.Set(planFlagCheckContainsLocking)
}

return res, nil
}
Expand Down
38 changes: 36 additions & 2 deletions pkg/sql/logictest/testdata/logic_test/fk_read_committed
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ statement ok
DELETE FROM jars WHERE j = 2

# Test that we do not use parallel FK checks under RC (see #111888).
subtest parallelFK
subtest no-parallel-fk-checks

statement ok
CREATE TABLE a (a PRIMARY KEY) AS SELECT 1
Expand All @@ -48,7 +48,7 @@ CREATE TABLE e (e PRIMARY KEY) AS SELECT 1

statement ok
CREATE TABLE f (
a INT REFERENCES a (a),
a INT REFERENCES a (a) ON UPDATE CASCADE,
b INT REFERENCES b (b),
c INT REFERENCES c (c),
d INT REFERENCES d (d),
Expand All @@ -64,3 +64,37 @@ INSERT INTO f VALUES (1, 1, 1, 1, 1, 1)

statement ok
RESET enable_insert_fast_path

# Test that we do not use parallel FK checks under RC (see #111888).
subtest no-parallel-fk-checks-from-cascade

statement ok
CREATE TABLE x (
x INT,
FOREIGN KEY (x) REFERENCES a (a) ON UPDATE CASCADE,
FOREIGN KEY (x) REFERENCES b (b),
FOREIGN KEY (x) REFERENCES c (c),
FOREIGN KEY (x) REFERENCES d (d),
FOREIGN KEY (x) REFERENCES e (e)
)

statement ok
INSERT INTO x VALUES (1)

statement error pq: update on table "x" violates foreign key constraint
UPDATE a SET a = 2 WHERE a = 1

statement ok
INSERT INTO b VALUES (2)

statement ok
INSERT INTO c VALUES (2)

statement ok
INSERT INTO d VALUES (2)

statement ok
INSERT INTO e VALUES (2)

statement ok
UPDATE a SET a = 2 WHERE a = 1
39 changes: 5 additions & 34 deletions pkg/sql/opt/exec/execbuilder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,44 +112,13 @@ type Builder struct {

// -- output --

// IsDDL is set to true if the statement contains DDL.
IsDDL bool

// ContainsFullTableScan is set to true if the statement contains an
// unconstrained primary index scan. This could be a full scan of any
// cardinality.
ContainsFullTableScan bool

// ContainsFullIndexScan is set to true if the statement contains an
// unconstrained non-partial secondary index scan. This could be a full scan
// of any cardinality.
ContainsFullIndexScan bool

// ContainsLargeFullTableScan is set to true if the statement contains an
// unconstrained primary index scan estimated to read more than
// large_full_scan_rows (or without available stats).
ContainsLargeFullTableScan bool

// ContainsLargeFullIndexScan is set to true if the statement contains an
// unconstrained non-partial secondary index scan estimated to read more than
// large_full_scan_rows (or without without available stats).
ContainsLargeFullIndexScan bool
// flags tracks various properties of the plan accumulated while building.
flags exec.PlanFlags

// containsBoundedStalenessScan is true if the query uses bounded
// staleness and contains a scan.
containsBoundedStalenessScan bool

// ContainsMutation is set to true if the whole plan contains any mutations.
ContainsMutation bool

// ContainsNonDefaultKeyLocking is set to true if at least one node in the
// plan uses non-default key locking strength.
ContainsNonDefaultKeyLocking bool

// CheckContainsNonDefaultKeyLocking is set to true if at least one node in at
// least one check query plan uses non-default key locking strength.
CheckContainsNonDefaultKeyLocking bool

// MaxFullScanRows is the maximum number of rows scanned by a full scan, as
// estimated by the optimizer.
MaxFullScanRows float64
Expand Down Expand Up @@ -267,7 +236,9 @@ func (b *Builder) Build() (_ exec.Plan, err error) {
}

rootRowCount := int64(b.e.(memo.RelExpr).Relational().Statistics().RowCountIfAvailable())
return b.factory.ConstructPlan(plan.root, b.subqueries, b.cascades, b.checks, rootRowCount)
return b.factory.ConstructPlan(
plan.root, b.subqueries, b.cascades, b.checks, rootRowCount, b.flags,
)
}

func (b *Builder) wrapFunction(fnName string) (tree.ResolvableFunctionReference, error) {
Expand Down
20 changes: 12 additions & 8 deletions pkg/sql/opt/exec/execbuilder/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,11 +748,15 @@ func mutationOutputColMap(mutation memo.RelExpr) opt.ColMap {
return colMap
}

// checkContainsLocking sets CheckContainsNonDefaultKeyLocking based on whether
// we found non-default locking while building a check query plan.
// checkContainsLocking sets PlanFlagCheckContainsLocking based on whether we
// found locking while building a check query plan.
func (b *Builder) checkContainsLocking(mainContainsLocking bool) {
b.CheckContainsNonDefaultKeyLocking = b.CheckContainsNonDefaultKeyLocking || b.ContainsNonDefaultKeyLocking
b.ContainsNonDefaultKeyLocking = b.ContainsNonDefaultKeyLocking || mainContainsLocking
if b.flags.IsSet(exec.PlanFlagContainsLocking) {
b.flags.Set(exec.PlanFlagCheckContainsLocking)
}
if mainContainsLocking {
b.flags.Set(exec.PlanFlagContainsLocking)
}
}

// buildUniqueChecks builds uniqueness check queries. These check queries are
Expand All @@ -762,8 +766,8 @@ func (b *Builder) checkContainsLocking(mainContainsLocking bool) {
// violated. Those queries are each wrapped in an ErrorIfRows operator, which
// will throw an appropriate error in case the inner query returns any rows.
func (b *Builder) buildUniqueChecks(checks memo.UniqueChecksExpr) error {
defer b.checkContainsLocking(b.ContainsNonDefaultKeyLocking)
b.ContainsNonDefaultKeyLocking = false
defer b.checkContainsLocking(b.flags.IsSet(exec.PlanFlagContainsLocking))
b.flags.Unset(exec.PlanFlagContainsLocking)
md := b.mem.Metadata()
for i := range checks {
c := &checks[i]
Expand Down Expand Up @@ -794,8 +798,8 @@ func (b *Builder) buildUniqueChecks(checks memo.UniqueChecksExpr) error {
}

func (b *Builder) buildFKChecks(checks memo.FKChecksExpr) error {
defer b.checkContainsLocking(b.ContainsNonDefaultKeyLocking)
b.ContainsNonDefaultKeyLocking = false
defer b.checkContainsLocking(b.flags.IsSet(exec.PlanFlagContainsLocking))
b.flags.Unset(exec.PlanFlagContainsLocking)
md := b.mem.Metadata()
for i := range checks {
c := &checks[i]
Expand Down
30 changes: 19 additions & 11 deletions pkg/sql/opt/exec/execbuilder/relational.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,11 @@ func (b *Builder) buildRelational(e memo.RelExpr) (execPlan, error) {
if opt.IsDDLOp(e) {
// Mark the statement as containing DDL for use
// in the SQL executor.
b.IsDDL = true
b.flags.Set(exec.PlanFlagIsDDL)
}

if opt.IsMutationOp(e) {
b.ContainsMutation = true
b.flags.Set(exec.PlanFlagContainsMutation)
// Raise error if mutation op is part of a read-only transaction.
if b.evalCtx.TxnReadOnly {
return execPlan{}, pgerror.Newf(pgcode.ReadOnlySQLTransaction,
Expand Down Expand Up @@ -620,7 +620,8 @@ func (b *Builder) scanParams(
err = pgerror.Newf(pgcode.WrongObjectType,
"index \"%s\" cannot be used for this query", idx.Name())
if b.evalCtx.SessionData().DisallowFullTableScans &&
(b.ContainsLargeFullTableScan || b.ContainsLargeFullIndexScan) {
(b.flags.IsSet(exec.PlanFlagContainsLargeFullTableScan) ||
b.flags.IsSet(exec.PlanFlagContainsLargeFullIndexScan)) {
err = errors.WithHint(err,
"try overriding the `disallow_full_table_scans` or increasing the `large_full_scan_rows` cluster/session settings",
)
Expand Down Expand Up @@ -773,11 +774,15 @@ func (b *Builder) buildScan(scan *memo.ScanExpr) (execPlan, error) {
if !tab.IsVirtualTable() && isUnfiltered {
large := !stats.Available || stats.RowCount > b.evalCtx.SessionData().LargeFullScanRows
if scan.Index == cat.PrimaryIndex {
b.ContainsFullTableScan = true
b.ContainsLargeFullTableScan = b.ContainsLargeFullTableScan || large
b.flags.Set(exec.PlanFlagContainsFullTableScan)
if large {
b.flags.Set(exec.PlanFlagContainsLargeFullTableScan)
}
} else {
b.ContainsFullIndexScan = true
b.ContainsLargeFullIndexScan = b.ContainsLargeFullIndexScan || large
b.flags.Set(exec.PlanFlagContainsFullIndexScan)
if large {
b.flags.Set(exec.PlanFlagContainsLargeFullIndexScan)
}
}
if stats.Available && stats.RowCount > b.MaxFullScanRows {
b.MaxFullScanRows = stats.RowCount
Expand Down Expand Up @@ -2946,10 +2951,10 @@ func (b *Builder) buildLocking(locking opt.Locking) (opt.Locking, error) {
!b.evalCtx.SessionData().SharedLockingForSerializable) {
// Reset locking information as we've determined we're going to be
// performing a non-locking read.
return opt.Locking{}, nil // early return; do not set b.ContainsNonDefaultKeyLocking
return opt.Locking{}, nil // early return; do not set PlanFlagContainsLocking
}
}
b.ContainsNonDefaultKeyLocking = true
b.flags.Set(exec.PlanFlagContainsLocking)
}
return locking, nil
}
Expand Down Expand Up @@ -3056,7 +3061,10 @@ func (b *Builder) buildRecursiveCTE(rec *memo.RecursiveCTEExpr) (execPlan, error
return nil, err
}
rootRowCount := int64(rec.Recursive.Relational().Statistics().RowCountIfAvailable())
return innerBld.factory.ConstructPlan(plan.root, innerBld.subqueries, innerBld.cascades, innerBld.checks, rootRowCount)
return innerBld.factory.ConstructPlan(
plan.root, innerBld.subqueries, innerBld.cascades, innerBld.checks, rootRowCount,
innerBld.flags,
)
}

label := fmt.Sprintf("working buffer (%s)", rec.Name)
Expand Down Expand Up @@ -3171,7 +3179,7 @@ func (b *Builder) buildCall(c *memo.CallExpr) (execPlan, error) {

for _, s := range udf.Def.Body {
if s.Relational().CanMutate {
b.ContainsMutation = true
b.flags.Set(exec.PlanFlagContainsMutation)
break
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/opt/exec/execbuilder/scalar.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,7 @@ func (b *Builder) buildSubquery(
}
plan, err := b.factory.ConstructPlan(
ePlan.root, nil /* subqueries */, nil /* cascades */, nil /* checks */, inputRowCount,
eb.flags,
)
if err != nil {
return err
Expand Down Expand Up @@ -947,7 +948,7 @@ func (b *Builder) buildUDF(ctx *buildScalarCtx, scalar opt.ScalarExpr) (tree.Typ

for _, s := range udf.Def.Body {
if s.Relational().CanMutate {
b.ContainsMutation = true
b.flags.Set(exec.PlanFlagContainsMutation)
break
}
}
Expand Down

0 comments on commit 9a3b3c9

Please sign in to comment.