Skip to content

Commit

Permalink
sql: squash add and drop mutations in same transaction
Browse files Browse the repository at this point in the history
Schema elements in an adding state can be dropped if the drop happens in
the same transaction as the one which initiated the add because
backfilling has not started yet. This change uses the cluster version of
the mutable table descriptor to squash these new mutations.

Related to #16020.

Release note: None
  • Loading branch information
Erik Trinh committed Oct 31, 2018
1 parent 1ef85f0 commit 240b219
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 5 deletions.
9 changes: 9 additions & 0 deletions pkg/sql/alter_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,15 @@ func (n *alterTableNode) startExec(params runParams) error {
break
}
}
if !found {
for _, m := range n.tableDesc.Mutations[len(n.tableDesc.ClusterVersion.Mutations):] {
if mutCol := m.GetColumn(); mutCol != nil && mutCol.ID == col.ID && m.Direction == sqlbase.DescriptorMutation_ADD {
n.tableDesc.AddColumnMutation(*mutCol, sqlbase.DescriptorMutation_DROP)
found = true
break
}
}
}
if !found {
return fmt.Errorf("column %q in the middle of being added, try again later", t.Column)
}
Expand Down
14 changes: 11 additions & 3 deletions pkg/sql/drop_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,21 @@ func (p *planner) dropIndexByName(
break
}
}
if !found {
for _, m := range tableDesc.Mutations[len(tableDesc.ClusterVersion.Mutations):] {
if mutIdx := m.GetIndex(); mutIdx != nil && mutIdx.ID == idx.ID && m.Direction == sqlbase.DescriptorMutation_ADD {
if err := tableDesc.AddIndexMutation(*mutIdx, sqlbase.DescriptorMutation_DROP); err != nil {
return err
}
found = true
break
}
}
}
if !found {
return fmt.Errorf("index %q in the middle of being added, try again later", idxName)
}

if err := tableDesc.Validate(ctx, p.txn, p.EvalContext().Settings); err != nil {
return err
}
mutationID, err := p.createSchemaChangeJob(ctx, tableDesc, jobDesc)
if err != nil {
return err
Expand Down
2 changes: 0 additions & 2 deletions pkg/sql/logictest/testdata/logic_test/show_trace
Original file line number Diff line number Diff line change
Expand Up @@ -406,8 +406,6 @@ dist sender send querying next range at /Table/3/1/53/2/1
dist sender send r1: sending batch 1 Get to (n1,s1):1
dist sender send querying next range at /Table/5/1/0/2/1
dist sender send r1: sending batch 1 Get to (n1,s1):1
dist sender send querying next range at /Table/3/1/53/2/1
dist sender send r1: sending batch 1 Get to (n1,s1):1
sql txn Put /Table/3/1/54/2/1 -> table:<name:"kv" id:54 parent_id:53 version:5 modification_time:<wall_time:... > columns:<name:"k" id:1 type:<semantic_type:INT width:0 precision:0 visible_type:NONE > nullable:false hidden:false > columns:<name:"v" id:2 type:<semantic_type:INT width:0 precision:0 visible_type:NONE > nullable:true hidden:false > next_column_id:3 families:<name:"primary" id:0 column_names:"k" column_names:"v" column_ids:1 column_ids:2 default_column_id:2 > next_family_id:1 primary_index:<name:"primary" id:1 unique:true column_names:"k" column_directions:ASC column_ids:1 foreign_key:<table:0 index:0 name:"" validity:Validated shared_prefix_len:0 on_delete:NO_ACTION on_update:NO_ACTION > interleave:<> partitioning:<num_columns:0 > type:FORWARD > next_index_id:3 privileges:<users:<user:"admin" privileges:2 > users:<user:"root" privileges:2 > > mutations:<index:<name:"woo" id:2 unique:true column_names:"v" column_directions:ASC column_ids:2 extra_column_ids:1 foreign_key:<table:0 index:0 name:"" validity:Validated shared_prefix_len:0 on_delete:NO_ACTION on_update:NO_ACTION > interleave:<> partitioning:<num_columns:0 > type:FORWARD > state:DELETE_AND_WRITE_ONLY direction:DROP mutation_id:2 rollback:false > next_mutation_id:3 format_version:3 state:PUBLIC view_query:"" mutationJobs:<...> drop_time:0 replacement_of:<id:0 time:<> > audit_mode:DISABLED drop_job_id:0 >
dist sender send querying next range at /Table/3/1/54/2/1
dist sender send r1: sending batch 1 Put to (n1,s1):1
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/schema_changer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2405,6 +2405,10 @@ INSERT INTO t.kv VALUES ('a', 'b');
`schema change statement cannot follow a statement that has written in the same transaction`},
// schema change at the end of a read only transaction.
{`select-create`, `SELECT * FROM t.kv`, `CREATE INDEX bar ON t.kv (v)`, ``},
// ADD and DROP column squashed and works.
{`add-drop-column`, `ALTER TABLE t.kv ADD COLUMN i INT`, `ALTER TABLE t.kv DROP COLUMN i`, ``},
// CREATE and DROP index squashed and works.
{`add-drop-index`, `CREATE INDEX foobar ON t.kv (v)`, `DROP INDEX t.kv@foobar`, ``},
}

for _, testCase := range testCases {
Expand Down
89 changes: 89 additions & 0 deletions pkg/sql/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,91 @@ func (p *planner) writeTableDesc(
return p.txn.Run(ctx, b)
}

// squashNewMutations rewrites the mutations list, removing
// mutations where schema elements are added and dropped in
// the same transaction, and marking the mutation jobs as
// succeeded.
func (p *planner) squashNewMutations(
ctx context.Context, desc *sqlbase.MutableTableDescriptor,
) error {
if len(desc.ClusterVersion.Mutations) == len(desc.Mutations) {
return nil
}

origLen := len(desc.ClusterVersion.Mutations)
addDroppedColumns := make(map[sqlbase.ColumnID]bool)
addDroppedIndexes := make(map[sqlbase.IndexID]bool)
for _, m := range desc.Mutations[origLen:] {
switch t := m.Descriptor_.(type) {
case *sqlbase.DescriptorMutation_Column:
switch m.Direction {
case sqlbase.DescriptorMutation_ADD:
addDroppedColumns[t.Column.ID] = false
case sqlbase.DescriptorMutation_DROP:
if _, ok := addDroppedColumns[t.Column.ID]; ok {
addDroppedColumns[t.Column.ID] = true
}
}
case *sqlbase.DescriptorMutation_Index:
switch m.Direction {
case sqlbase.DescriptorMutation_ADD:
addDroppedIndexes[t.Index.ID] = false
case sqlbase.DescriptorMutation_DROP:
if _, ok := addDroppedIndexes[t.Index.ID]; ok {
addDroppedIndexes[t.Index.ID] = true
}
}
}
}

var newMutations []sqlbase.DescriptorMutation
var squashedIDs []sqlbase.MutationID

for _, m := range desc.Mutations[origLen:] {
var squashed bool
switch t := m.Descriptor_.(type) {
case *sqlbase.DescriptorMutation_Column:
squashed = addDroppedColumns[t.Column.ID]
case *sqlbase.DescriptorMutation_Index:
squashed = addDroppedIndexes[t.Index.ID]
}
if squashed {
squashedIDs = append(squashedIDs, m.MutationID)
// Perform any cleanup after drop is succeeded.
if m.Direction == sqlbase.DescriptorMutation_DROP {
if err := desc.MakeMutationComplete(m); err != nil {
return err
}
}
} else {
newMutations = append(newMutations, m)
}
}

var prev sqlbase.MutationID
for _, id := range squashedIDs {
if prev != id {
prev = id
jobID, err := getJobIDForMutationWithDescriptor(ctx, desc.TableDesc(), id)
if err != nil {
return err
}

job, err := p.ExecCfg().JobRegistry.LoadJobWithTxn(ctx, jobID, p.txn)
if err != nil {
return err
}

if err := job.WithTxn(p.txn).Succeeded(ctx, jobs.NoopFn); err != nil {
return errors.Wrapf(err, "failed to mark job %d as as successful", jobID)
}
}
}

desc.Mutations = append(desc.ClusterVersion.Mutations, newMutations...)
return nil
}

func (p *planner) writeTableDescToBatch(
ctx context.Context,
tableDesc *sqlbase.MutableTableDescriptor,
Expand Down Expand Up @@ -811,6 +896,10 @@ func (p *planner) writeTableDescToBatch(
p.queueSchemaChange(tableDesc.TableDesc(), mutationID)
}

if err := p.squashNewMutations(ctx, tableDesc); err != nil {
return err
}

if err := tableDesc.ValidateTable(p.extendedEvalCtx.Settings); err != nil {
return pgerror.NewAssertionErrorf("table descriptor is not valid: %s\n%v", err, tableDesc)
}
Expand Down

0 comments on commit 240b219

Please sign in to comment.