Skip to content

Commit

Permalink
sql: squash add and drop mutations in same transaction
Browse files Browse the repository at this point in the history
Schema elements in an adding state can be dropped if the drop happens in
the same transaction as the one which initiated the add because
backfilling has not started yet. This change uses the cluster version of
the mutable table descriptor to squash these new mutations.

Related to #16020.

Release note: None
  • Loading branch information
Erik Trinh committed Nov 1, 2018
1 parent 1ef85f0 commit d30b4a8
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 6 deletions.
15 changes: 14 additions & 1 deletion pkg/sql/alter_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func (n *alterTableNode) startExec(params runParams) error {
// alterations that don't require a backfill) or add a mutation to
// the list.
descriptorChanged := false
addedMutations := false
origNumMutations := len(n.tableDesc.Mutations)
var droppedViews []string
tn := &n.n.Table
Expand Down Expand Up @@ -407,6 +408,18 @@ func (n *alterTableNode) startExec(params runParams) error {
break
}
}
if !found {
for _, m := range n.tableDesc.Mutations[len(n.tableDesc.ClusterVersion.Mutations):] {
if mutCol := m.GetColumn(); mutCol != nil && mutCol.ID == col.ID && m.Direction == sqlbase.DescriptorMutation_ADD {
// Referencing index drops could have been squashed, set this to true
// because len(table.Mutations) can be lower than origNumMutations.
addedMutations = true
n.tableDesc.AddColumnMutation(*mutCol, sqlbase.DescriptorMutation_DROP)
found = true
break
}
}
}
if !found {
return fmt.Errorf("column %q in the middle of being added, try again later", t.Column)
}
Expand Down Expand Up @@ -577,7 +590,7 @@ func (n *alterTableNode) startExec(params runParams) error {
// dummy mutations. Most tests trigger errors above
// this line, but tests that run redundant operations like dropping
// a column when it's already dropped will hit this condition and exit.
addedMutations := len(n.tableDesc.Mutations) > origNumMutations
addedMutations = addedMutations || len(n.tableDesc.Mutations) > origNumMutations
if !addedMutations && !descriptorChanged {
return nil
}
Expand Down
14 changes: 11 additions & 3 deletions pkg/sql/drop_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,21 @@ func (p *planner) dropIndexByName(
break
}
}
if !found {
for _, m := range tableDesc.Mutations[len(tableDesc.ClusterVersion.Mutations):] {
if mutIdx := m.GetIndex(); mutIdx != nil && mutIdx.ID == idx.ID && m.Direction == sqlbase.DescriptorMutation_ADD {
if err := tableDesc.AddIndexMutation(*mutIdx, sqlbase.DescriptorMutation_DROP); err != nil {
return err
}
found = true
break
}
}
}
if !found {
return fmt.Errorf("index %q in the middle of being added, try again later", idxName)
}

if err := tableDesc.Validate(ctx, p.txn, p.EvalContext().Settings); err != nil {
return err
}
mutationID, err := p.createSchemaChangeJob(ctx, tableDesc, jobDesc)
if err != nil {
return err
Expand Down
38 changes: 38 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/schema_change_in_txn
Original file line number Diff line number Diff line change
Expand Up @@ -499,3 +499,41 @@ INSERT INTO customers (k) VALUES ('b')

statement ok
COMMIT;

subtest squash_add_drop

statement ok
BEGIN

statement ok
ALTER TABLE forlater ADD COLUMN i INT

statement ok
CREATE INDEX i_idx ON forlater (i)

statement ok
ALTER TABLE forlater DROP COLUMN i

statement ok
ALTER TABLE forlater ADD COLUMN i CHAR

statement ok
CREATE INDEX i_idx ON forlater (i)

statement ok
CREATE INDEX v_idx ON forlater (v)

statement ok
DROP INDEX forlater@i_idx

statement ok
ALTER INDEX forlater@v_idx RENAME TO i_idx

statement ok
ALTER TABLE forlater DROP COLUMN i

statement ok
ALTER TABLE forlater RENAME COLUMN v TO i

statement ok
COMMIT
2 changes: 0 additions & 2 deletions pkg/sql/logictest/testdata/logic_test/show_trace
Original file line number Diff line number Diff line change
Expand Up @@ -406,8 +406,6 @@ dist sender send querying next range at /Table/3/1/53/2/1
dist sender send r1: sending batch 1 Get to (n1,s1):1
dist sender send querying next range at /Table/5/1/0/2/1
dist sender send r1: sending batch 1 Get to (n1,s1):1
dist sender send querying next range at /Table/3/1/53/2/1
dist sender send r1: sending batch 1 Get to (n1,s1):1
sql txn Put /Table/3/1/54/2/1 -> table:<name:"kv" id:54 parent_id:53 version:5 modification_time:<wall_time:... > columns:<name:"k" id:1 type:<semantic_type:INT width:0 precision:0 visible_type:NONE > nullable:false hidden:false > columns:<name:"v" id:2 type:<semantic_type:INT width:0 precision:0 visible_type:NONE > nullable:true hidden:false > next_column_id:3 families:<name:"primary" id:0 column_names:"k" column_names:"v" column_ids:1 column_ids:2 default_column_id:2 > next_family_id:1 primary_index:<name:"primary" id:1 unique:true column_names:"k" column_directions:ASC column_ids:1 foreign_key:<table:0 index:0 name:"" validity:Validated shared_prefix_len:0 on_delete:NO_ACTION on_update:NO_ACTION > interleave:<> partitioning:<num_columns:0 > type:FORWARD > next_index_id:3 privileges:<users:<user:"admin" privileges:2 > users:<user:"root" privileges:2 > > mutations:<index:<name:"woo" id:2 unique:true column_names:"v" column_directions:ASC column_ids:2 extra_column_ids:1 foreign_key:<table:0 index:0 name:"" validity:Validated shared_prefix_len:0 on_delete:NO_ACTION on_update:NO_ACTION > interleave:<> partitioning:<num_columns:0 > type:FORWARD > state:DELETE_AND_WRITE_ONLY direction:DROP mutation_id:2 rollback:false > next_mutation_id:3 format_version:3 state:PUBLIC view_query:"" mutationJobs:<...> drop_time:0 replacement_of:<id:0 time:<> > audit_mode:DISABLED drop_job_id:0 >
dist sender send querying next range at /Table/3/1/54/2/1
dist sender send r1: sending batch 1 Put to (n1,s1):1
Expand Down
89 changes: 89 additions & 0 deletions pkg/sql/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,91 @@ func (p *planner) writeTableDesc(
return p.txn.Run(ctx, b)
}

// squashNewMutations rewrites the mutations list, removing
// mutations where schema elements are added and dropped in
// the same transaction, and marking the mutation jobs as
// succeeded.
func (p *planner) squashNewMutations(
ctx context.Context, desc *sqlbase.MutableTableDescriptor,
) error {
if len(desc.ClusterVersion.Mutations) == len(desc.Mutations) {
return nil
}

origLen := len(desc.ClusterVersion.Mutations)
addDroppedColumns := make(map[sqlbase.ColumnID]bool)
addDroppedIndexes := make(map[sqlbase.IndexID]bool)
for _, m := range desc.Mutations[origLen:] {
switch t := m.Descriptor_.(type) {
case *sqlbase.DescriptorMutation_Column:
switch m.Direction {
case sqlbase.DescriptorMutation_ADD:
addDroppedColumns[t.Column.ID] = false
case sqlbase.DescriptorMutation_DROP:
if _, ok := addDroppedColumns[t.Column.ID]; ok {
addDroppedColumns[t.Column.ID] = true
}
}
case *sqlbase.DescriptorMutation_Index:
switch m.Direction {
case sqlbase.DescriptorMutation_ADD:
addDroppedIndexes[t.Index.ID] = false
case sqlbase.DescriptorMutation_DROP:
if _, ok := addDroppedIndexes[t.Index.ID]; ok {
addDroppedIndexes[t.Index.ID] = true
}
}
}
}

var newMutations []sqlbase.DescriptorMutation
var squashedIDs []sqlbase.MutationID

for _, m := range desc.Mutations[origLen:] {
var squashed bool
switch t := m.Descriptor_.(type) {
case *sqlbase.DescriptorMutation_Column:
squashed = addDroppedColumns[t.Column.ID]
case *sqlbase.DescriptorMutation_Index:
squashed = addDroppedIndexes[t.Index.ID]
}
if squashed {
squashedIDs = append(squashedIDs, m.MutationID)
// Perform any cleanup after drop is succeeded.
if m.Direction == sqlbase.DescriptorMutation_DROP {
if err := desc.MakeMutationComplete(m); err != nil {
return err
}
}
} else {
newMutations = append(newMutations, m)
}
}

var prev sqlbase.MutationID
for _, id := range squashedIDs {
if prev != id {
prev = id
jobID, err := getJobIDForMutationWithDescriptor(ctx, desc.TableDesc(), id)
if err != nil {
return err
}

job, err := p.ExecCfg().JobRegistry.LoadJobWithTxn(ctx, jobID, p.txn)
if err != nil {
return err
}

if err := job.WithTxn(p.txn).Succeeded(ctx, jobs.NoopFn); err != nil {
return errors.Wrapf(err, "failed to mark job %d as as successful", jobID)
}
}
}

desc.Mutations = append(desc.ClusterVersion.Mutations, newMutations...)
return nil
}

func (p *planner) writeTableDescToBatch(
ctx context.Context,
tableDesc *sqlbase.MutableTableDescriptor,
Expand Down Expand Up @@ -811,6 +896,10 @@ func (p *planner) writeTableDescToBatch(
p.queueSchemaChange(tableDesc.TableDesc(), mutationID)
}

if err := p.squashNewMutations(ctx, tableDesc); err != nil {
return err
}

if err := tableDesc.ValidateTable(p.extendedEvalCtx.Settings); err != nil {
return pgerror.NewAssertionErrorf("table descriptor is not valid: %s\n%v", err, tableDesc)
}
Expand Down

0 comments on commit d30b4a8

Please sign in to comment.