Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql/schemachanger: plumb context, check for cancelation sometimes #88471

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/cli/declarative_corpus.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ a given corpus file.
return jobID
},
}
_, err := scplan.MakePlan(*state, params)
_, err := scplan.MakePlan(cmd.Context(), *state, params)
if err != nil {
fmt.Printf("failed to validate %s with error %v\n", name, err)
} else {
Expand Down
8 changes: 5 additions & 3 deletions pkg/sql/explain_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,17 @@ func (n *explainDDLNode) startExec(params runParams) error {
return explainNotPossibleError
}
}
return n.setExplainValues(scNode.plannedState)
return n.setExplainValues(params.ctx, scNode.plannedState)
}

func (n *explainDDLNode) setExplainValues(scState scpb.CurrentState) (err error) {
func (n *explainDDLNode) setExplainValues(
ctx context.Context, scState scpb.CurrentState,
) (err error) {
defer func() {
err = errors.WithAssertionFailure(err)
}()
var p scplan.Plan
p, err = scplan.MakePlan(scState, scplan.Params{
p, err = scplan.MakePlan(ctx, scState, scplan.Params{
ExecutionPhase: scop.StatementPhase,
SchemaChangerJobIDSupplier: func() jobspb.JobID { return 1 },
})
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/schemachanger/corpus/corpus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package corpus_test

import (
"context"
"flag"
"testing"

Expand Down Expand Up @@ -40,7 +41,7 @@ func TestValidateCorpuses(t *testing.T) {
jobID := jobspb.InvalidJobID
name, state := reader.GetCorpus(corpusIdx)
t.Run(name, func(t *testing.T) {
_, err := scplan.MakePlan(*state, scplan.Params{
_, err := scplan.MakePlan(context.Background(), *state, scplan.Params{
ExecutionPhase: scop.LatestPhase,
InRollback: state.InRollback,
SchemaChangerJobIDSupplier: func() jobspb.JobID {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/schemachanger/scdeps/sctestutils/sctestutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func ProtoDiff(a, b protoutil.Message, args DiffArgs, rewrites func(interface{})

// MakePlan is a convenient alternative to calling scplan.MakePlan in tests.
func MakePlan(t *testing.T, state scpb.CurrentState, phase scop.Phase) scplan.Plan {
plan, err := scplan.MakePlan(state, scplan.Params{
plan, err := scplan.MakePlan(context.Background(), state, scplan.Params{
ExecutionPhase: phase,
SchemaChangerJobIDSupplier: func() jobspb.JobID { return 1 },
})
Expand Down
12 changes: 7 additions & 5 deletions pkg/sql/schemachanger/scplan/internal/opgen/op_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,19 @@ func IterateTransitions(

// BuildGraph constructs a graph with operation edges populated from an initial
// state.
func BuildGraph(cs scpb.CurrentState) (*scgraph.Graph, error) {
return opRegistry.buildGraph(cs)
func BuildGraph(ctx context.Context, cs scpb.CurrentState) (*scgraph.Graph, error) {
return opRegistry.buildGraph(ctx, cs)
}

func (r *registry) buildGraph(cs scpb.CurrentState) (_ *scgraph.Graph, err error) {
func (r *registry) buildGraph(
ctx context.Context, cs scpb.CurrentState,
) (_ *scgraph.Graph, err error) {
start := timeutil.Now()
defer func() {
if err != nil || !log.V(2) {
if err != nil || !log.ExpensiveLogEnabled(ctx, 2) {
return
}
log.Infof(context.TODO(), "operation graph generation took %v", timeutil.Since(start))
log.Infof(ctx, "operation graph generation took %v", timeutil.Since(start))
}()
g, err := scgraph.New(cs)
if err != nil {
Expand Down
18 changes: 12 additions & 6 deletions pkg/sql/schemachanger/scplan/internal/rules/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

// ApplyDepRules adds dependency edges to the graph according to the
// registered dependency rules.
func ApplyDepRules(g *scgraph.Graph) error {
func ApplyDepRules(ctx context.Context, g *scgraph.Graph) error {
for _, dr := range registry.depRules {
start := timeutil.Now()
var added int
Expand All @@ -41,9 +41,15 @@ func ApplyDepRules(g *scgraph.Graph) error {
}); err != nil {
return errors.Wrapf(err, "applying dep rule %s", dr.name)
}
if log.V(2) {
// Applying the dep rules can be slow in some cases. Check for
// cancellation when applying the rules to ensure we don't spin for
// too long while the user is waiting for the task to exit cleanly.
if ctx.Err() != nil {
return ctx.Err()
}
if log.ExpensiveLogEnabled(ctx, 2) {
log.Infof(
context.TODO(), "applying dep rule %s %d took %v",
ctx, "applying dep rule %s %d took %v",
dr.name, added, timeutil.Since(start),
)
}
Expand All @@ -53,7 +59,7 @@ func ApplyDepRules(g *scgraph.Graph) error {

// ApplyOpRules marks op edges as no-op in a shallow copy of the graph according
// to the registered rules.
func ApplyOpRules(g *scgraph.Graph) (*scgraph.Graph, error) {
func ApplyOpRules(ctx context.Context, g *scgraph.Graph) (*scgraph.Graph, error) {
db := g.Database()
m := make(map[*screl.Node][]scgraph.RuleName)
for _, rule := range registry.opRules {
Expand All @@ -68,9 +74,9 @@ func ApplyOpRules(g *scgraph.Graph) (*scgraph.Graph, error) {
if err != nil {
return nil, errors.Wrapf(err, "applying op rule %s", rule.name)
}
if log.V(2) {
if log.ExpensiveLogEnabled(ctx, 2) {
log.Infof(
context.TODO(), "applying op rule %s %d took %v",
ctx, "applying op rule %s %d took %v",
rule.name, added, timeutil.Since(start),
)
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/sql/schemachanger/scplan/internal/scstage/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package scstage

import (
"context"
"fmt"
"sort"
"strings"
Expand All @@ -31,7 +32,11 @@ import (
// Note that the scJobIDSupplier function is idempotent, and must return the
// same value for all calls.
func BuildStages(
init scpb.CurrentState, phase scop.Phase, g *scgraph.Graph, scJobIDSupplier func() jobspb.JobID,
ctx context.Context,
init scpb.CurrentState,
phase scop.Phase,
g *scgraph.Graph,
scJobIDSupplier func() jobspb.JobID,
) []Stage {
c := buildContext{
rollback: init.InRollback,
Expand Down
28 changes: 14 additions & 14 deletions pkg/sql/schemachanger/scplan/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,19 @@ func (p Plan) StagesForCurrentPhase() []scstage.Stage {

// MakePlan generates a Plan for a particular phase of a schema change, given
// the initial state for a set of targets. Returns an error when planning fails.
func MakePlan(initial scpb.CurrentState, params Params) (p Plan, err error) {
func MakePlan(ctx context.Context, initial scpb.CurrentState, params Params) (p Plan, err error) {
p = Plan{
CurrentState: initial,
Params: params,
}
err = makePlan(&p)
if err != nil {
err = makePlan(ctx, &p)
if err != nil && ctx.Err() == nil {
err = p.DecorateErrorWithPlanDetails(err)
}
return p, err
}

func makePlan(p *Plan) (err error) {
func makePlan(ctx context.Context, p *Plan) (err error) {
defer func() {
if r := recover(); r != nil {
rAsErr, ok := r.(error)
Expand All @@ -99,18 +99,18 @@ func makePlan(p *Plan) (err error) {
}()
{
start := timeutil.Now()
p.Graph = buildGraph(p.CurrentState)
if log.V(2) {
log.Infof(context.TODO(), "graph generation took %v", timeutil.Since(start))
p.Graph = buildGraph(ctx, p.CurrentState)
if log.ExpensiveLogEnabled(ctx, 2) {
log.Infof(ctx, "graph generation took %v", timeutil.Since(start))
}
}
{
start := timeutil.Now()
p.Stages = scstage.BuildStages(
p.CurrentState, p.Params.ExecutionPhase, p.Graph, p.Params.SchemaChangerJobIDSupplier,
ctx, p.CurrentState, p.Params.ExecutionPhase, p.Graph, p.Params.SchemaChangerJobIDSupplier,
)
if log.V(2) {
log.Infof(context.TODO(), "stage generation took %v", timeutil.Since(start))
if log.ExpensiveLogEnabled(ctx, 2) {
log.Infof(ctx, "stage generation took %v", timeutil.Since(start))
}
}
if n := len(p.Stages); n > 0 && p.Stages[n-1].Phase > scop.PreCommitPhase {
Expand All @@ -123,20 +123,20 @@ func makePlan(p *Plan) (err error) {
return nil
}

func buildGraph(cs scpb.CurrentState) *scgraph.Graph {
g, err := opgen.BuildGraph(cs)
func buildGraph(ctx context.Context, cs scpb.CurrentState) *scgraph.Graph {
g, err := opgen.BuildGraph(ctx, cs)
if err != nil {
panic(errors.Wrapf(err, "build graph op edges"))
}
err = rules.ApplyDepRules(g)
err = rules.ApplyDepRules(ctx, g)
if err != nil {
panic(errors.Wrapf(err, "build graph dep edges"))
}
err = g.Validate()
if err != nil {
panic(errors.Wrapf(err, "validate graph"))
}
g, err = rules.ApplyOpRules(g)
g, err = rules.ApplyOpRules(ctx, g)
if err != nil {
panic(errors.Wrapf(err, "mark op edges as no-op"))
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/schemachanger/scrun/scrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func runTransactionPhase(
if len(state.Current) == 0 {
return scpb.CurrentState{}, jobspb.InvalidJobID, nil
}
sc, err := scplan.MakePlan(state, scplan.Params{
sc, err := scplan.MakePlan(ctx, state, scplan.Params{
ExecutionPhase: phase,
SchemaChangerJobIDSupplier: deps.TransactionalJobRegistry().SchemaChangerJobID,
})
Expand Down Expand Up @@ -112,7 +112,7 @@ func RunSchemaChangesInJob(
}
return errors.Wrapf(err, "failed to construct state for job %d", jobID)
}
sc, err := scplan.MakePlan(state, scplan.Params{
sc, err := scplan.MakePlan(ctx, state, scplan.Params{
ExecutionPhase: scop.PostCommitPhase,
SchemaChangerJobIDSupplier: func() jobspb.JobID { return jobID },
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/schemachanger/sctest/end_to_end.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func checkExplainDiagrams(
params.InRollback = true
params.ExecutionPhase = scop.PostCommitNonRevertiblePhase
}
pl, err := scplan.MakePlan(state, params)
pl, err := scplan.MakePlan(context.Background(), state, params)
require.NoErrorf(t, err, "%s: %s", fileNameSuffix, explainedStmt)
action(explainDir, "ddl", pl.ExplainCompact)
action(explainVerboseDir, "ddl, verbose", pl.ExplainVerbose)
Expand Down