Skip to content

Commit

Permalink
sql: fix the collection of logical plan samples
Browse files Browse the repository at this point in the history
The logic was in the wrong place and did not properly account for the
optimizer mode and whether an execution error occured. This patch
fixes it.

To achieve this, the patch requires the ability to inspect the
planNode tree after the execution machinery finishes running the
query.

Unfortunately, there is not a single point in the execution code where
the following two properties hold at once:

- the execution has completed (so it is known whether an error has
  occurred or not).
- the planNode tree is still available for sampling.

This is because when the local exec code for a planNode tree is
started (`startExec`), it allocates memory in memory accounts set up
by the distSQL exec machinery. At the end of execution, it is thus
important to close the planNode tree before the exec machinery
accounts are closed. So the execution code currently has
co-"ownership" of closing the planNode tree.

Unfortunately it does not have full ownership of the plan close
because the executor still has to close the plan in case distSQL
PlanAndRun() is not called.

The proper way forward would be to remove local execution so that there are no
planNode memory accounts any more, and so that the responsibility to
close the plan is fully transferred back to the executor code.

Unfortunately this is a major undertaking and we still want to get a
fix in CockroachDB 2.1.

So instead this patch makes `planTop.close()` responsible for sampling
the logical plan. This "works" because that is the method guaranteed
to be called in all execution paths.

To achieve this planTop.close() calls into a `maybeSavePlan()`
callsback set up by the executor code. This in turn needs the plan
flags (optimizer used, distsql used) and the last error, which are now
also stored in `planTop`.

The rest of the commit is plumbing, including avoiding passing
`sql.Statement` by-value throughout calls (it's a big struct!)

Release note: None
  • Loading branch information
knz committed Jan 28, 2019
1 parent 67af366 commit 6ae4881
Show file tree
Hide file tree
Showing 17 changed files with 195 additions and 117 deletions.
6 changes: 3 additions & 3 deletions pkg/sql/app_stats.go
Expand Up @@ -111,7 +111,7 @@ const saveFingerprintPlanOnceEvery = 1000
//
// samplePlanDescription can be nil, as these are only sampled periodically per unique fingerprint.
func (a *appStats) recordStatement(
stmt Statement,
stmt *Statement,
samplePlanDescription *roachpb.ExplainTreePlanNode,
distSQLUsed bool,
optUsed bool,
Expand Down Expand Up @@ -157,7 +157,7 @@ func (a *appStats) recordStatement(

// getStatsForStmt retrieves the per-stmt stat object.
func (a *appStats) getStatsForStmt(
stmt Statement, distSQLUsed bool, optimizerUsed bool, err error, createIfNonexistent bool,
stmt *Statement, distSQLUsed bool, optimizerUsed bool, err error, createIfNonexistent bool,
) *stmtStats {
// Extend the statement key with various characteristics, so
// that we use separate buckets for the different situations.
Expand Down Expand Up @@ -185,7 +185,7 @@ func (a *appStats) getStatsForStmtWithKey(key stmtKey, createIfNonexistent bool)
return s
}

func anonymizeStmt(stmt Statement) string {
func anonymizeStmt(stmt *Statement) string {
return tree.AsStringWithFlags(stmt.AST, tree.FmtHideConstants)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor.go
Expand Up @@ -1994,7 +1994,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
//
// If an error is returned, it is to be considered a query execution error.
func (ex *connExecutor) initStatementResult(
ctx context.Context, res RestrictedCommandResult, stmt Statement, cols sqlbase.ResultColumns,
ctx context.Context, res RestrictedCommandResult, stmt *Statement, cols sqlbase.ResultColumns,
) error {
for _, c := range cols {
if err := checkResultType(c.Typ); err != nil {
Expand Down
144 changes: 88 additions & 56 deletions pkg/sql/conn_executor_exec.go
Expand Up @@ -420,7 +420,7 @@ func (ex *connExecutor) execStmtInOpenState(
defer constantMemAcc.Close(ctx)

if runInParallel {
cols, err := ex.execStmtInParallel(ctx, stmt, p, queryDone)
cols, err := ex.execStmtInParallel(ctx, p, queryDone)
queryDone = nil
if err != nil {
return makeErrEvent(err)
Expand All @@ -429,12 +429,12 @@ func (ex *connExecutor) execStmtInOpenState(
// statement's result type:
// - tree.Rows -> an empty set of rows
// - tree.RowsAffected -> zero rows affected
if err := ex.initStatementResult(ctx, res, stmt, cols); err != nil {
if err := ex.initStatementResult(ctx, res, p.stmt, cols); err != nil {
return makeErrEvent(err)
}
} else {
p.autoCommit = os.ImplicitTxn.Get() && !ex.server.cfg.TestingKnobs.DisableAutoCommit
if err := ex.dispatchToExecutionEngine(ctx, stmt, p, res); err != nil {
if err := ex.dispatchToExecutionEngine(ctx, p, res); err != nil {
return nil, nil, err
}
if err := res.Err(); err != nil {
Expand Down Expand Up @@ -651,20 +651,33 @@ func (ex *connExecutor) rollbackSQLTransaction(ctx context.Context) (fsm.Event,
// Args:
// queryDone: A cleanup function to be called when the execution is done.
func (ex *connExecutor) execStmtInParallel(
ctx context.Context,
stmt Statement,
planner *planner,
queryDone func(context.Context, RestrictedCommandResult),
ctx context.Context, planner *planner, queryDone func(context.Context, RestrictedCommandResult),
) (sqlbase.ResultColumns, error) {
params := runParams{
ctx: ctx,
extendedEvalCtx: planner.ExtendedEvalContext(),
p: planner,
}

stmt := planner.stmt
ex.sessionTracing.TracePlanStart(ctx, stmt.AST.StatementTag())
err := planner.makePlan(ctx, stmt)
planner.statsCollector.PhaseTimes()[plannerStartLogicalPlan] = timeutil.Now()

err := planner.makePlan(ctx)
// Ensure that the plan is collected just before closing.
if sampleLogicalPlans.Get(&ex.appStats.st.SV) {
// Note: if sampleLogicalPlans is false,
// planner.curPlan.maybeSavePlan remains nil (because makePlan has
// cleared curPlan at this point) and plan collection will not
// happen.
planner.curPlan.maybeSavePlan = func(ctx context.Context) *roachpb.ExplainTreePlanNode {
return ex.maybeSavePlan(ctx, planner)
}
}

planner.statsCollector.PhaseTimes()[plannerEndLogicalPlan] = timeutil.Now()
ex.sessionTracing.TracePlanEnd(ctx, err)

if err != nil {
planner.maybeLogStatement(ctx, "par-prepare" /* lbl */, 0 /* rows */, err)
return nil, err
Expand Down Expand Up @@ -714,23 +727,33 @@ func (ex *connExecutor) execStmtInParallel(

planner.statsCollector.PhaseTimes()[plannerStartExecStmt] = timeutil.Now()

samplePlanDescription := ex.sampleLogicalPlanDescription(
stmt, false /* optimizerUsed */, planner)
// We need to set the "exec done" flag early because
// curPlan.close(), which will need to observe it, may be closed
// during execution (distsqlrun.PlanAndRun).
//
// TODO(knz): This is a mis-design. Andrei says "it's OK if
// execution closes the plan" but it transfers responsibility to
// run any "finalizers" on the plan (including plan sampling for
// stats) to the execution engine. That's a lot of responsibility
// to transfer! It would be better if this responsibility remained
// around here.
planner.curPlan.flags.Set(planFlagExecDone)

var flags planFlags
if distributePlan {
flags.Set(planFlagDistributed)
planner.curPlan.flags.Set(planFlagDistributed)
} else {
flags.Set(planFlagDistSQLLocal)
planner.curPlan.flags.Set(planFlagDistSQLLocal)
}
ex.sessionTracing.TraceExecStart(ctx, "parallel")
err = ex.execWithDistSQLEngine(ctx, planner, stmt.AST.StatementType(), res, distributePlan)
ex.sessionTracing.TraceExecEnd(ctx, res.Err(), res.RowsAffected())
planner.statsCollector.PhaseTimes()[plannerEndExecStmt] = timeutil.Now()

// Record the statement summary. This also closes the plan if the
// plan has not been closed earlier.
ex.recordStatementSummary(
planner, stmt, samplePlanDescription, flags, ex.extraTxnState.autoRetryCounter,
res.RowsAffected(), err,
ctx, planner, ex.extraTxnState.autoRetryCounter,
res.RowsAffected(), res.Err(),
)
if ex.server.cfg.TestingKnobs.AfterExecute != nil {
ex.server.cfg.TestingKnobs.AfterExecute(ctx, stmt.String(), res.Err())
Expand Down Expand Up @@ -792,14 +815,24 @@ func enhanceErrWithCorrelation(err error, isCorrelated bool) {
// expected that the caller will inspect res and react to query errors by
// producing an appropriate state machine event.
func (ex *connExecutor) dispatchToExecutionEngine(
ctx context.Context, stmt Statement, planner *planner, res RestrictedCommandResult,
ctx context.Context, planner *planner, res RestrictedCommandResult,
) error {
stmt := planner.stmt
ex.sessionTracing.TracePlanStart(ctx, stmt.AST.StatementTag())
planner.statsCollector.PhaseTimes()[plannerStartLogicalPlan] = timeutil.Now()

flags, err := ex.makeExecPlan(ctx, stmt, planner)
err := ex.makeExecPlan(ctx, planner)
// We'll be closing the plan manually below after execution; this
// defer is a catch-all in case some other return path is taken.
defer planner.curPlan.close(ctx)

// Ensure that the plan is collected just before closing.
if sampleLogicalPlans.Get(&ex.appStats.st.SV) {
planner.curPlan.maybeSavePlan = func(ctx context.Context) *roachpb.ExplainTreePlanNode {
return ex.maybeSavePlan(ctx, planner)
}
}

defer func() { planner.maybeLogStatement(ctx, "exec", res.RowsAffected(), res.Err()) }()

planner.statsCollector.PhaseTimes()[plannerEndLogicalPlan] = timeutil.Now()
Expand Down Expand Up @@ -845,99 +878,98 @@ func (ex *connExecutor) dispatchToExecutionEngine(
queryMeta.isDistributed = distributePlan
ex.mu.Unlock()

samplePlanDescription := ex.sampleLogicalPlanDescription(stmt, flags.IsSet(planFlagOptUsed), planner)
// We need to set the "exec done" flag early because
// curPlan.close(), which will need to observe it, may be closed
// during execution (distsqlrun.PlanAndRun).
//
// TODO(knz): This is a mis-design. Andrei says "it's OK if
// execution closes the plan" but it transfers responsibility to
// run any "finalizers" on the plan (including plan sampling for
// stats) to the execution engine. That's a lot of responsibility
// to transfer! It would be better if this responsibility remained
// around here.
planner.curPlan.flags.Set(planFlagExecDone)

if distributePlan {
flags.Set(planFlagDistributed)
planner.curPlan.flags.Set(planFlagDistributed)
} else {
flags.Set(planFlagDistSQLLocal)
planner.curPlan.flags.Set(planFlagDistSQLLocal)
}
ex.sessionTracing.TraceExecStart(ctx, "distributed")
err = ex.execWithDistSQLEngine(ctx, planner, stmt.AST.StatementType(), res, distributePlan)
ex.sessionTracing.TraceExecEnd(ctx, res.Err(), res.RowsAffected())
planner.statsCollector.PhaseTimes()[plannerEndExecStmt] = timeutil.Now()
if err != nil {
return err
}

// Record the statement summary. This also closes the plan if the
// plan has not been closed earlier.
ex.recordStatementSummary(
planner, stmt, samplePlanDescription, flags,
ctx, planner,
ex.extraTxnState.autoRetryCounter, res.RowsAffected(), res.Err(),
)
if ex.server.cfg.TestingKnobs.AfterExecute != nil {
ex.server.cfg.TestingKnobs.AfterExecute(ctx, stmt.String(), res.Err())
}

return nil
return err
}

// makeExecPlan creates an execution plan and populates planner.curPlan, using
// either the optimizer or the heuristic planner.
func (ex *connExecutor) makeExecPlan(
ctx context.Context, stmt Statement, planner *planner,
) (planFlags, error) {
func (ex *connExecutor) makeExecPlan(ctx context.Context, planner *planner) error {
stmt := planner.stmt
// Initialize planner.curPlan.AST early; it might be used by maybeLogStatement
// in error cases.
planner.curPlan = planTop{AST: stmt.AST}

var flags planFlags
var isCorrelated bool
if optMode := ex.sessionData.OptimizerMode; optMode != sessiondata.OptimizerOff {
log.VEvent(ctx, 2, "generating optimizer plan")
var result *planTop
var err error
result, flags, isCorrelated, err = planner.makeOptimizerPlan(ctx, stmt)
result, isCorrelated, err = planner.makeOptimizerPlan(ctx)
if err == nil {
planner.curPlan = *result
return flags, nil
return nil
}
log.VEventf(ctx, 1, "optimizer plan failed (isCorrelated=%t): %v", isCorrelated, err)
if !canFallbackFromOpt(err, optMode, stmt) {
return 0, err
return err
}
flags = planFlagOptFallback
planner.curPlan.flags.Set(planFlagOptFallback)
log.VEvent(ctx, 1, "optimizer falls back on heuristic planner")
} else {
log.VEvent(ctx, 2, "optimizer disabled")
}
// Use the heuristic planner.
err := planner.makePlan(ctx, stmt)
optFlags := planner.curPlan.flags
err := planner.makePlan(ctx)
planner.curPlan.flags |= optFlags
enhanceErrWithCorrelation(err, isCorrelated)
return flags, err
}

// sampleLogicalPlanDescription returns a serialized representation of a statement's logical plan.
// The returned ExplainTreePlanNode will be nil if plan should not be sampled.
func (ex *connExecutor) sampleLogicalPlanDescription(
stmt Statement, optimizerUsed bool, planner *planner,
) *roachpb.ExplainTreePlanNode {
if !sampleLogicalPlans.Get(&ex.appStats.st.SV) {
return nil
}

if ex.saveLogicalPlanDescription(stmt, optimizerUsed) {
return planToTree(context.Background(), planner.curPlan)
}
return nil
return err
}

// saveLogicalPlanDescription returns if we should save this as a sample logical plan
// saveLogicalPlanDescription returns whether we should save this as a sample logical plan
// for its corresponding fingerprint. We use `saveFingerprintPlanOnceEvery`
// to assess how frequently to sample logical plans.
func (ex *connExecutor) saveLogicalPlanDescription(stmt Statement, optimizerUsed bool) bool {
stats := ex.appStats.getStatsForStmt(stmt, true /* distSQLUsed */, optimizerUsed, nil, false /* createIfNonexistent */)
func (ex *connExecutor) saveLogicalPlanDescription(
stmt *Statement, useDistSQL bool, optimizerUsed bool, err error,
) bool {
stats := ex.appStats.getStatsForStmt(
stmt, useDistSQL, optimizerUsed, err, false /* createIfNonexistent */)
if stats == nil {
// Save logical plan the first time we see new statement fingerprint.
return true
}
stats.Lock()
defer stats.Unlock()
count := stats.data.Count
stats.Unlock()

return stats.data.Count%saveFingerprintPlanOnceEvery == 0
return count%saveFingerprintPlanOnceEvery == 0
}

// canFallbackFromOpt returns whether we can fallback on the heuristic planner
// when the optimizer hits an error.
func canFallbackFromOpt(err error, optMode sessiondata.OptimizerMode, stmt Statement) bool {
func canFallbackFromOpt(err error, optMode sessiondata.OptimizerMode, stmt *Statement) bool {
pgerr, ok := err.(*pgerror.Error)
if !ok || pgerr.Code != pgerror.CodeFeatureNotSupportedError {
// We only fallback on "feature not supported" errors.
Expand Down
26 changes: 15 additions & 11 deletions pkg/sql/conn_executor_prepare.go
Expand Up @@ -142,7 +142,7 @@ func (ex *connExecutor) prepare(
return prepared, nil
}
prepared.Statement = stmt.Statement
prepared.AnonymizedStr = anonymizeStmt(stmt)
prepared.AnonymizedStr = anonymizeStmt(&stmt)

// Point to the prepared state, which can be further populated during query
// preparation.
Expand All @@ -160,7 +160,8 @@ func (ex *connExecutor) prepare(

p := &ex.planner
ex.resetPlanner(ctx, p, txn, ex.server.cfg.Clock.PhysicalTime() /* stmtTimestamp */)
flags, err := ex.populatePrepared(ctx, txn, stmt, placeholderHints, p)
p.stmt = &stmt
flags, err := ex.populatePrepared(ctx, txn, placeholderHints, p)
if err != nil {
txn.CleanupOnError(ctx, err)
return nil, err
Expand All @@ -180,12 +181,9 @@ func (ex *connExecutor) prepare(
// populatePrepared analyzes and type-checks the query and populates
// stmt.Prepared.
func (ex *connExecutor) populatePrepared(
ctx context.Context,
txn *client.Txn,
stmt Statement,
placeholderHints tree.PlaceholderTypes,
p *planner,
ctx context.Context, txn *client.Txn, placeholderHints tree.PlaceholderTypes, p *planner,
) (planFlags, error) {
stmt := p.stmt
if err := p.semaCtx.Placeholders.Init(stmt.NumPlaceholders, placeholderHints); err != nil {
return 0, err
}
Expand Down Expand Up @@ -221,7 +219,7 @@ func (ex *connExecutor) populatePrepared(
if optMode := ex.sessionData.OptimizerMode; optMode != sessiondata.OptimizerOff {
log.VEvent(ctx, 2, "preparing using optimizer")
var err error
flags, isCorrelated, err = p.prepareUsingOptimizer(ctx, stmt)
flags, isCorrelated, err = p.prepareUsingOptimizer(ctx)
if err == nil {
log.VEvent(ctx, 2, "optimizer prepare succeeded")
// stmt.Prepared fields have been populated.
Expand All @@ -231,7 +229,7 @@ func (ex *connExecutor) populatePrepared(
if !canFallbackFromOpt(err, optMode, stmt) {
return 0, err
}
flags = planFlagOptFallback
flags.Set(planFlagOptFallback)
log.VEvent(ctx, 1, "prepare falls back on heuristic planner")
} else {
log.VEvent(ctx, 2, "optimizer disabled (prepare)")
Expand All @@ -247,7 +245,12 @@ func (ex *connExecutor) populatePrepared(

if p.curPlan.plan == nil {
// Statement with no result columns and no support for placeholders.
return flags, nil
//
// Note: we're combining `flags` which comes from
// `prepareUsingOptimizer`, with `p.curPlan.flags` which ensures
// the new flags combine with the existing flags (this is used
// e.g. to maintain the count of times the optimizer was used).
return flags | p.curPlan.flags, nil
}
defer p.curPlan.close(ctx)

Expand All @@ -262,7 +265,8 @@ func (ex *connExecutor) populatePrepared(
return 0, err
}
prepared.Types = p.semaCtx.Placeholders.Types
return flags, nil
// The flags are combined, see the comment above for why.
return flags | p.curPlan.flags, nil
}

func (ex *connExecutor) execBind(
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/distsql_running.go
Expand Up @@ -34,7 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/opentracing/opentracing-go"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -255,6 +255,7 @@ func (dsp *DistSQLPlanner) Run(
// We need to close the planNode tree we translated into a DistSQL plan before
// flow.Cleanup, which closes memory accounts that expect to be emptied.
if planCtx.planner != nil && !planCtx.ignoreClose {
planCtx.planner.curPlan.execErr = recv.resultWriter.Err()
planCtx.planner.curPlan.close(ctx)
}
flow.Cleanup(ctx)
Expand Down

0 comments on commit 6ae4881

Please sign in to comment.