executor: load data/batch insert improvement reducing memory a… (#11284)
cfzjywxk authored and zz-jason committed Jul 23, 2019
1 parent 3b6d2f4 commit 119d532
Showing 3 changed files with 36 additions and 11 deletions.
3 changes: 3 additions & 0 deletions executor/builder.go
@@ -681,6 +681,9 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor {
 		},
 	}
+	var defaultLoadDataBatchCnt uint64 = 20000 // TODO this will be changed to variable in another pr
+	loadDataExec.loadDataInfo.SetMaxRowsInBatch(defaultLoadDataBatchCnt)
+
 	return loadDataExec
 }
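The hunk above wires a hard-coded default of 20,000 rows per batch into the LOAD DATA executor (the TODO notes it will become a session variable later). For readers outside the codebase, a minimal, self-contained sketch of the contract this sets up — fill a fixed pool of preallocated row buffers, commit when the cap is reached, and rewind a cursor instead of reallocating. The loader type, insert, and flush are hypothetical stand-ins, not TiDB APIs:

package main

import "fmt"

// loader mimics the shape of LoadDataInfo's batching state:
// a fixed pool of row buffers plus a cursor into it.
type loader struct {
	maxRowsInBatch uint64
	curBatchCnt    uint64
	rows           [][]int64 // preallocated once, reused across batches
}

func newLoader(limit uint64, width int) *loader {
	l := &loader{maxRowsInBatch: limit, rows: make([][]int64, limit)}
	for i := range l.rows {
		l.rows[i] = make([]int64, width)
	}
	return l
}

// insert writes one row into the next free buffer and flushes at the cap.
func (l *loader) insert(row []int64) {
	copy(l.rows[l.curBatchCnt], row)
	l.curBatchCnt++
	if l.curBatchCnt == l.maxRowsInBatch {
		l.flush()
	}
}

// flush commits the filled prefix and rewinds the cursor; no buffer is freed.
func (l *loader) flush() {
	fmt.Printf("committing %d rows\n", l.curBatchCnt)
	l.curBatchCnt = 0
}

func main() {
	l := newLoader(3, 2)
	for i := int64(0); i < 7; i++ {
		l.insert([]int64{i, i * i})
	}
	l.flush() // commit the final partial batch
}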
21 changes: 18 additions & 3 deletions executor/insert_common.go
@@ -36,6 +36,7 @@ type InsertValues struct {
 	batchChecker
 
 	rowCount       uint64
+	curBatchCnt    uint64
 	maxRowsInBatch uint64
 	lastInsertID   uint64
 	hasRefCols     bool
@@ -379,6 +380,20 @@ func (e *InsertValues) getRow(ctx context.Context, vals []types.Datum) ([]types.
 	return e.fillRow(ctx, row, hasValue)
 }
 
+func (e *InsertValues) getRowInPlace(ctx context.Context, vals []types.Datum, rowBuf []types.Datum) ([]types.Datum, error) {
+	hasValue := make([]bool, len(e.Table.Cols()))
+	for i, v := range vals {
+		casted, err := table.CastValue(e.ctx, v, e.insertColumns[i].ToInfo())
+		if e.filterErr(err) != nil {
+			return nil, err
+		}
+		offset := e.insertColumns[i].Offset
+		rowBuf[offset] = casted
+		hasValue[offset] = true
+	}
+	return e.fillRow(ctx, rowBuf, hasValue)
+}
+
 func (e *InsertValues) filterErr(err error) error {
 	if err == nil {
 		return nil
@@ -565,25 +580,25 @@ func (e *InsertValues) batchCheckAndInsert(rows [][]types.Datum, addRecord func(
 	}
 	// append warnings and get no duplicated error rows
 	for i, r := range e.toBeCheckedRows {
+		skip := false
 		if r.handleKey != nil {
 			if _, found := e.dupKVs[string(r.handleKey.newKV.key)]; found {
-				rows[i] = nil
 				e.ctx.GetSessionVars().StmtCtx.AppendWarning(r.handleKey.dupErr)
 				continue
 			}
 		}
 		for _, uk := range r.uniqueKeys {
 			if _, found := e.dupKVs[string(uk.newKV.key)]; found {
 				// If duplicate keys were found in BatchGet, mark row = nil.
-				rows[i] = nil
 				e.ctx.GetSessionVars().StmtCtx.AppendWarning(uk.dupErr)
+				skip = true
 				break
 			}
 		}
 		// If row was checked with no duplicate keys,
 		// it should be add to values map for the further row check.
 		// There may be duplicate keys inside the insert statement.
-		if rows[i] != nil {
+		if !skip {
 			e.ctx.GetSessionVars().StmtCtx.AddCopiedRows(1)
 			_, err = addRecord(rows[i])
 			if err != nil {
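The new getRowInPlace is the allocation-saving counterpart of getRow just above it: instead of building a fresh row slice per input row, it casts each value into a caller-supplied buffer (one of the preallocated e.rows entries). That is also the likely reason batchCheckAndInsert switches from nil-ing out duplicate rows to a local skip flag: writing rows[i] = nil would throw away a buffer that the next batch reuses. A toy sketch of the same in-place pattern, with strconv.ParseInt standing in for table.CastValue and int64 columns standing in for types.Datum (names illustrative):

package main

import (
	"fmt"
	"strconv"
)

// castInto parses raw column values into rowBuf instead of allocating a new
// row slice per call; rowBuf is owned by the caller and reused across rows.
func castInto(vals []string, offsets []int, rowBuf []int64) ([]int64, error) {
	for i, v := range vals {
		n, err := strconv.ParseInt(v, 10, 64) // stand-in for table.CastValue
		if err != nil {
			return nil, err
		}
		rowBuf[offsets[i]] = n
	}
	return rowBuf, nil
}

func main() {
	buf := make([]int64, 3) // reused for every row
	for _, rec := range [][]string{{"1", "2"}, {"3", "4"}} {
		row, err := castInto(rec, []int{0, 2}, buf)
		if err != nil {
			panic(err)
		}
		fmt.Println(row) // [1 0 2], then [3 0 4]
	}
}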
23 changes: 15 additions & 8 deletions executor/load_data.go
@@ -25,6 +25,7 @@ import (
 	"github.com/pingcap/tidb/table"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/chunk"
+	"github.com/pingcap/tidb/util/hack"
 	"github.com/pingcap/tidb/util/logutil"
 	"github.com/pingcap/tidb/util/stringutil"
 	"go.uber.org/zap"
@@ -112,7 +113,12 @@
 // SetMaxRowsInBatch sets the max number of rows to insert in a batch.
 func (e *LoadDataInfo) SetMaxRowsInBatch(limit uint64) {
 	e.maxRowsInBatch = limit
-	e.rows = make([][]types.Datum, 0, limit)
+	if uint64(cap(e.rows)) < limit {
+		e.rows = make([][]types.Datum, 0, limit)
+		for i := 0; uint64(i) < limit; i++ {
+			e.rows = append(e.rows, make([]types.Datum, len(e.Table.Cols())))
+		}
+	}
 }
 
 // getValidData returns prevData and curData that starts from starting symbol.
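SetMaxRowsInBatch now allocates the whole pool once, guarded by a capacity check, so repeated calls do not reallocate; the pool stays at full length and CheckAndInsertOneBatch below slices it with e.rows[0:e.curBatchCnt] rather than truncating it. The guard pattern in isolation, as a sketch with int64 rows standing in for []types.Datum (ensurePool is a hypothetical name):

package main

import "fmt"

// ensurePool grows the row pool only when the requested limit exceeds the
// current capacity; otherwise the already-allocated buffers are kept for reuse.
func ensurePool(rows [][]int64, limit uint64, width int) [][]int64 {
	if uint64(cap(rows)) >= limit {
		return rows // buffers survive across batches
	}
	rows = make([][]int64, 0, limit)
	for i := uint64(0); i < limit; i++ {
		rows = append(rows, make([]int64, width))
	}
	return rows
}

func main() {
	pool := ensurePool(nil, 4, 2)
	same := ensurePool(pool, 4, 2) // second call is a no-op
	fmt.Println(len(pool), cap(same) == cap(pool)) // 4 true
}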
@@ -174,7 +180,7 @@ func (e *LoadDataInfo) getLine(prevData, curData []byte) ([]byte, []byte, bool)
 	}
 	endIdx := -1
 	if len(curData) >= curStartIdx {
-		endIdx = strings.Index(string(curData[curStartIdx:]), e.LinesInfo.Terminated)
+		endIdx = strings.Index(string(hack.String(curData[curStartIdx:])), e.LinesInfo.Terminated)
 	}
 	if endIdx == -1 {
 		// no terminated symbol
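The getLine change replaces string(curData[curStartIdx:]) — which copies the remaining buffer on every call — with hack.String, TiDB's zero-copy []byte-to-string view. A minimal version of that trick (not hack's exact implementation; safe only while the caller does not mutate the bytes behind the returned string):

package main

import (
	"fmt"
	"strings"
	"unsafe"
)

// zeroCopyString reinterprets b's data as a string without copying.
// The caller must not mutate b while the returned string is alive.
func zeroCopyString(b []byte) string {
	return *(*string)(unsafe.Pointer(&b))
}

func main() {
	data := []byte("a,b,c\nd,e,f\n")
	// Find the line terminator without allocating a copy of the tail.
	fmt.Println(strings.Index(zeroCopyString(data), "\n")) // 5
}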
@@ -253,8 +259,9 @@ func (e *LoadDataInfo) InsertData(ctx context.Context, prevData, curData []byte)
 		if err != nil {
 			return nil, false, err
 		}
-		e.rows = append(e.rows, e.colsToRow(ctx, cols))
+		e.colsToRow(ctx, cols)
 		e.rowCount++
+		e.curBatchCnt++
 		if e.maxRowsInBatch != 0 && e.rowCount%e.maxRowsInBatch == 0 {
 			reachLimit = true
 			logutil.BgLogger().Info("batch limit hit when inserting rows", zap.Int("maxBatchRows", e.maxChunkSize),
@@ -268,15 +275,15 @@ func (e *LoadDataInfo) InsertData(ctx context.Context, prevData, curData []byte)
 // CheckAndInsertOneBatch is used to commit one transaction batch full filled data
 func (e *LoadDataInfo) CheckAndInsertOneBatch() error {
 	var err error
-	if len(e.rows) == 0 {
+	if e.curBatchCnt == 0 {
 		return err
 	}
-	e.ctx.GetSessionVars().StmtCtx.AddRecordRows(uint64(len(e.rows)))
-	err = e.batchCheckAndInsert(e.rows, e.addRecordLD)
+	e.ctx.GetSessionVars().StmtCtx.AddRecordRows(e.curBatchCnt)
+	err = e.batchCheckAndInsert(e.rows[0:e.curBatchCnt], e.addRecordLD)
 	if err != nil {
 		return err
 	}
-	e.rows = e.rows[:0]
+	e.curBatchCnt = 0
 	return err
 }
 
@@ -312,7 +319,7 @@ func (e *LoadDataInfo) colsToRow(ctx context.Context, cols []field) []types.Datu
 			e.row[i].SetString(string(cols[i].str))
 		}
 	}
-	row, err := e.getRow(ctx, e.row)
+	row, err := e.getRowInPlace(ctx, e.row, e.rows[e.curBatchCnt])
 	if err != nil {
 		e.handleWarning(err)
 		return nil
