Skip to content

Commit

Permalink
sql: rename *ContainsNonDefaultKeyLocking to *ContainsLocking
Browse files Browse the repository at this point in the history
"Default" locking here simply means no locking. I find this convention
of saying "non-default key locking" a little verbose and confusing.

Epic: None

Release note: None
  • Loading branch information
michae2 committed Feb 3, 2024
1 parent b541940 commit 26e6bad
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 42 deletions.
4 changes: 2 additions & 2 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4889,7 +4889,7 @@ func (dsp *DistSQLPlanner) NewPlanningCtxWithOracle(
if planner == nil ||
evalCtx.SessionData().Internal ||
planner.curPlan.flags.IsSet(planFlagContainsMutation) ||
planner.curPlan.flags.IsSet(planFlagContainsNonDefaultLocking) {
planner.curPlan.flags.IsSet(planFlagContainsLocking) {
// Don't parallelize the scans if we have a local plan if
// - we don't have a planner which is the case when we are not on
// the main query path;
Expand All @@ -4900,7 +4900,7 @@ func (dsp *DistSQLPlanner) NewPlanningCtxWithOracle(
// any synchronization (see #116039);
// - the plan contains a mutation operation - we currently don't
// support any parallelism when mutations are present;
// - the plan uses non-default key locking strength (see #94290).
// - the plan uses locking (see #94290).
return planCtx
}
prohibitParallelization, hasScanNodeToParallelize := checkScanParallelizationIfLocal(ctx, &planner.curPlan.planComponents)
Expand Down
23 changes: 11 additions & 12 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,15 +740,14 @@ func (dsp *DistSQLPlanner) Run(
// during the flow setup, we need to populate leafInputState below,
// so we tell the localState that there is concurrency.

// At the moment, we disable the usage of the Streamer API for local
// plans when non-default key locking modes are requested on some of
// the processors. This is the case since the lock spans propagation
// doesn't happen for the leaf txns which can result in excessive
// contention for future reads (since the acquired locks are not
// cleaned up properly when the txn commits).
// TODO(yuzefovich): fix the propagation of the lock spans with the
// leaf txns and remove this check. See #94290.
containsNonDefaultLocking := planCtx.planner != nil && planCtx.planner.curPlan.flags.IsSet(planFlagContainsNonDefaultLocking)
// At the moment, we disable the usage of the Streamer API for local plans
// when locking is used by any of the processors. This is the case since
// the lock spans propagation doesn't happen for the leaf txns which can
// result in excessive contention for future reads (since the acquired
// locks are not cleaned up properly when the txn commits).
// TODO(yuzefovich): fix the propagation of the lock spans with the leaf
// txns and remove this check. See #94290.
containsLocking := planCtx.planner != nil && planCtx.planner.curPlan.flags.IsSet(planFlagContainsLocking)

// We also currently disable the usage of the Streamer API whenever
// we have a wrapped planNode. This is done to prevent scenarios
Expand All @@ -769,7 +768,7 @@ func (dsp *DistSQLPlanner) Run(
}
return false
}()
if !containsNonDefaultLocking && !mustUseRootTxn {
if !containsLocking && !mustUseRootTxn {
if evalCtx.SessionData().StreamerEnabled {
for _, proc := range plan.Processors {
if jr := proc.Spec.Core.JoinReader; jr != nil {
Expand Down Expand Up @@ -2072,7 +2071,7 @@ func (dsp *DistSQLPlanner) PlanAndRunCascadesAndChecks(
return getDefaultSaveFlowsFunc(ctx, planner, planComponentTypePostquery)
}

checksContainLocking := planner.curPlan.flags.IsSet(planFlagCheckContainsNonDefaultLocking)
checksContainLocking := planner.curPlan.flags.IsSet(planFlagCheckContainsLocking)

// We treat plan.cascades as a queue.
for i := 0; i < len(plan.cascades); i++ {
Expand Down Expand Up @@ -2140,7 +2139,7 @@ func (dsp *DistSQLPlanner) PlanAndRunCascadesAndChecks(
// Collect any new checks.
if len(cp.checkPlans) > 0 {
plan.checkPlans = append(plan.checkPlans, cp.checkPlans...)
if cp.flags.IsSet(planFlagCheckContainsNonDefaultLocking) {
if cp.flags.IsSet(planFlagCheckContainsLocking) {
checksContainLocking = true
}
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/exec_factory_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ func constructPlan(
if flags.IsSet(exec.PlanFlagContainsMutation) {
res.flags.Set(planFlagContainsMutation)
}
if flags.IsSet(exec.PlanFlagContainsNonDefaultKeyLocking) {
res.flags.Set(planFlagContainsNonDefaultLocking)
if flags.IsSet(exec.PlanFlagContainsLocking) {
res.flags.Set(planFlagContainsLocking)
}
if flags.IsSet(exec.PlanFlagCheckContainsNonDefaultKeyLocking) {
res.flags.Set(planFlagCheckContainsNonDefaultLocking)
if flags.IsSet(exec.PlanFlagCheckContainsLocking) {
res.flags.Set(planFlagCheckContainsLocking)
}

return res, nil
Expand Down
18 changes: 9 additions & 9 deletions pkg/sql/opt/exec/execbuilder/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,14 +748,14 @@ func mutationOutputColMap(mutation memo.RelExpr) opt.ColMap {
return colMap
}

// checkContainsLocking sets CheckContainsNonDefaultKeyLocking based on whether
// we found non-default locking while building a check query plan.
// checkContainsLocking sets PlanFlagCheckContainsLocking based on whether we
// found locking while building a check query plan.
func (b *Builder) checkContainsLocking(mainContainsLocking bool) {
if b.flags.IsSet(exec.PlanFlagContainsNonDefaultKeyLocking) {
b.flags.Set(exec.PlanFlagCheckContainsNonDefaultKeyLocking)
if b.flags.IsSet(exec.PlanFlagContainsLocking) {
b.flags.Set(exec.PlanFlagCheckContainsLocking)
}
if mainContainsLocking {
b.flags.Set(exec.PlanFlagContainsNonDefaultKeyLocking)
b.flags.Set(exec.PlanFlagContainsLocking)
}
}

Expand All @@ -766,8 +766,8 @@ func (b *Builder) checkContainsLocking(mainContainsLocking bool) {
// violated. Those queries are each wrapped in an ErrorIfRows operator, which
// will throw an appropriate error in case the inner query returns any rows.
func (b *Builder) buildUniqueChecks(checks memo.UniqueChecksExpr) error {
defer b.checkContainsLocking(b.flags.IsSet(exec.PlanFlagContainsNonDefaultKeyLocking))
b.flags.Unset(exec.PlanFlagContainsNonDefaultKeyLocking)
defer b.checkContainsLocking(b.flags.IsSet(exec.PlanFlagContainsLocking))
b.flags.Unset(exec.PlanFlagContainsLocking)
md := b.mem.Metadata()
for i := range checks {
c := &checks[i]
Expand Down Expand Up @@ -798,8 +798,8 @@ func (b *Builder) buildUniqueChecks(checks memo.UniqueChecksExpr) error {
}

func (b *Builder) buildFKChecks(checks memo.FKChecksExpr) error {
defer b.checkContainsLocking(b.flags.IsSet(exec.PlanFlagContainsNonDefaultKeyLocking))
b.flags.Unset(exec.PlanFlagContainsNonDefaultKeyLocking)
defer b.checkContainsLocking(b.flags.IsSet(exec.PlanFlagContainsLocking))
b.flags.Unset(exec.PlanFlagContainsLocking)
md := b.mem.Metadata()
for i := range checks {
c := &checks[i]
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/opt/exec/execbuilder/relational.go
Original file line number Diff line number Diff line change
Expand Up @@ -2951,10 +2951,10 @@ func (b *Builder) buildLocking(locking opt.Locking) (opt.Locking, error) {
!b.evalCtx.SessionData().SharedLockingForSerializable) {
// Reset locking information as we've determined we're going to be
// performing a non-locking read.
return opt.Locking{}, nil // early return; do not set PlanFlagContainsNonDefaultKeyLocking
return opt.Locking{}, nil // early return; do not set PlanFlagContainsLocking
}
}
b.flags.Set(exec.PlanFlagContainsNonDefaultKeyLocking)
b.flags.Set(exec.PlanFlagContainsLocking)
}
return locking, nil
}
Expand Down
17 changes: 10 additions & 7 deletions pkg/sql/opt/exec/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,16 @@ const (
// PlanFlagContainsMutation is set if the whole plan contains any mutations.
PlanFlagContainsMutation

// PlanFlagContainsNonDefaultKeyLocking is set if at least one node in the
// plan uses non-default key locking strength.
PlanFlagContainsNonDefaultKeyLocking

// PlanFlagCheckContainsNonDefaultKeyLocking is set if at least one node in at
// least one check query plan uses non-default key locking strength.
PlanFlagCheckContainsNonDefaultKeyLocking
// PlanFlagContainsLocking is set if at least one node in the plan uses
// locking. (Examples of plans using locking include SELECT FOR UPDATE and
// SELECT FOR SHARE, UPDATE, UPSERT, and FK checks under read committed
// isolation.)
PlanFlagContainsLocking

// PlanFlagCheckContainsLocking is set if at least one node in at least one
// check plan uses locking. Typically this is set for plans with FK checks
// under read committed isolation.
PlanFlagCheckContainsLocking
)

func (pf PlanFlags) IsSet(flag PlanFlags) bool {
Expand Down
11 changes: 5 additions & 6 deletions pkg/sql/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,13 +619,12 @@ const (
// planFlagContainsMutation is set if the plan has any mutations.
planFlagContainsMutation

// planFlagContainsNonDefaultLocking is set if the plan has a node with
// non-default key locking strength.
planFlagContainsNonDefaultLocking
// planFlagContainsLocking is set if the plan has a node with locking.
planFlagContainsLocking

// planFlagCheckContainsNonDefaultLocking is set if at least one check plan
// has a node with non-default key locking strength.
planFlagCheckContainsNonDefaultLocking
// planFlagCheckContainsLocking is set if at least one check plan has a node
// with locking.
planFlagCheckContainsLocking

// planFlagSessionMigration is set if the plan is being created during
// a session migration.
Expand Down

0 comments on commit 26e6bad

Please sign in to comment.