Skip to content

Commit

Permalink
executor: reuse chunk row for insert on duplicate update (#12847) (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
jackysp authored and sre-bot committed Nov 5, 2019
1 parent 418023f commit 4c7bfb8
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 13 deletions.
5 changes: 3 additions & 2 deletions ddl/db_test.go
Expand Up @@ -3030,16 +3030,17 @@ func (s *testDBSuite) TestAddNotNullColumnWhileInsertOnDupUpdate(c *C) {
return
default:
}
_, tk2Err = tk2.Exec("insert nn (a, b) values (1, 1) on duplicate key update a = 1, b = b + 1")
_, tk2Err = tk2.Exec("insert nn (a, b) values (1, 1) on duplicate key update a = 1, b = values(b) + 1")
if tk2Err != nil {
return
}
}
}()
tk1.MustExec("alter table nn add column c int not null default 0")
tk1.MustExec("alter table nn add column c int not null default 3 after a")
close(closeCh)
wg.Wait()
c.Assert(tk2Err, IsNil)
tk1.MustQuery("select * from nn").Check(testkit.Rows("1 3 2"))
}

type testMaxTableRowIDContext struct {
Expand Down
55 changes: 46 additions & 9 deletions executor/insert.go
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand All @@ -30,8 +31,12 @@ import (
// InsertExec represents an insert executor.
type InsertExec struct {
*InsertValues
OnDuplicate []*expression.Assignment
Priority mysql.PriorityEnum
OnDuplicate []*expression.Assignment
evalBuffer4Dup chunk.MutRow
curInsertVals chunk.MutRow
row4Update []types.Datum

Priority mysql.PriorityEnum
}

func (e *InsertExec) exec(rows [][]types.Datum) error {
Expand Down Expand Up @@ -151,6 +156,9 @@ func (e *InsertExec) Close() error {

// Open implements the Executor Close interface.
func (e *InsertExec) Open(ctx context.Context) error {
if e.OnDuplicate != nil {
e.initEvalBuffer4Dup()
}
if e.SelectExec != nil {
return e.SelectExec.Open(ctx)
}
Expand All @@ -177,32 +185,61 @@ func (e *InsertExec) updateDupRow(row toBeCheckedRow, handle int64, onDuplicate
return e.updateDupKeyValues(handle, newHandle, handleChanged, oldRow, updatedRow)
}

func (e *InsertExec) initEvalBuffer4Dup() {
// Use public columns for new row.
numCols := len(e.Table.Cols())
// Use writable columns for old row for update.
numWritableCols := len(e.Table.WritableCols())

evalBufferTypes := make([]*types.FieldType, 0, numCols+numWritableCols)

// Append the old row before the new row, to be consistent with "Schema4OnDuplicate" in the "Insert" PhysicalPlan.
for _, col := range e.Table.WritableCols() {
evalBufferTypes = append(evalBufferTypes, &col.FieldType)
}
for _, col := range e.Table.Cols() {
evalBufferTypes = append(evalBufferTypes, &col.FieldType)
}
if e.hasExtraHandle {
evalBufferTypes = append(evalBufferTypes, types.NewFieldType(mysql.TypeLonglong))
}
e.evalBuffer4Dup = chunk.MutRowFromTypes(evalBufferTypes)
e.curInsertVals = chunk.MutRowFromTypes(evalBufferTypes[numWritableCols:])
e.row4Update = make([]types.Datum, 0, len(evalBufferTypes))
}

// doDupRowUpdate updates the duplicate row.
// TODO: Report rows affected.
func (e *InsertExec) doDupRowUpdate(handle int64, oldRow []types.Datum, newRow []types.Datum,
cols []*expression.Assignment) ([]types.Datum, bool, int64, error) {
assignFlag := make([]bool, len(e.Table.WritableCols()))
// See http://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_values
e.ctx.GetSessionVars().CurrInsertValues = chunk.MutRowFromDatums(newRow).ToRow()
e.curInsertVals.SetDatums(newRow...)
e.ctx.GetSessionVars().CurrInsertValues = e.curInsertVals.ToRow()

// NOTE: In order to execute the expression inside the column assignment,
// we have to put the value of "oldRow" before "newRow" in "row4Update" to
// be consistent with "Schema4OnDuplicate" in the "Insert" PhysicalPlan.
row4Update := make([]types.Datum, 0, len(oldRow)+len(newRow))
row4Update = append(row4Update, oldRow...)
row4Update = append(row4Update, newRow...)
e.row4Update = e.row4Update[:0]
e.row4Update = append(e.row4Update, oldRow...)
e.row4Update = append(e.row4Update, newRow...)

// Update old row when the key is duplicated.
e.evalBuffer4Dup.SetDatums(e.row4Update...)
for _, col := range cols {
val, err1 := col.Expr.Eval(chunk.MutRowFromDatums(row4Update).ToRow())
val, err1 := col.Expr.Eval(e.evalBuffer4Dup.ToRow())
if err1 != nil {
return nil, false, 0, err1
}
e.row4Update[col.Col.Index], err1 = table.CastValue(e.ctx, val, col.Col.ToInfo())
if err1 != nil {
return nil, false, 0, errors.Trace(err1)
}
row4Update[col.Col.Index] = val
e.evalBuffer4Dup.SetDatum(col.Col.Index, e.row4Update[col.Col.Index])
assignFlag[col.Col.Index] = true
}

newData := row4Update[:len(oldRow)]
newData := e.row4Update[:len(oldRow)]
_, handleChanged, newHandle, err := updateRecord(e.ctx, handle, oldRow, newData, assignFlag, e.Table, true)
if err != nil {
return nil, false, 0, errors.Trace(err)
Expand Down
10 changes: 10 additions & 0 deletions expression/column.go
Expand Up @@ -336,6 +336,16 @@ func (col *Column) resolveIndices(schema *Schema) {
}
}

// ToInfo converts the expression.Column to model.ColumnInfo for casting values,
// beware it doesn't fill all the fields of the model.ColumnInfo.
func (col *Column) ToInfo() *model.ColumnInfo {
return &model.ColumnInfo{
ID: col.ID,
Name: col.ColName,
FieldType: *col.RetType,
}
}

// Column2Exprs will transfer column slice to expression slice.
func Column2Exprs(cols []*Column) []Expression {
result := make([]Expression, 0, len(cols))
Expand Down
4 changes: 2 additions & 2 deletions expression/explain.go
Expand Up @@ -37,8 +37,8 @@ func (expr *ScalarFunction) ExplainInfo() string {
}

// ExplainInfo implements the Expression interface.
func (expr *Column) ExplainInfo() string {
return expr.String()
func (col *Column) ExplainInfo() string {
return col.String()
}

// ExplainInfo implements the Expression interface.
Expand Down

0 comments on commit 4c7bfb8

Please sign in to comment.