Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

session: improvment refactor the logic of load data batch insert #11132

Merged
merged 2 commits into from Jul 12, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions executor/executor_test.go
Expand Up @@ -443,6 +443,8 @@ func checkCases(tests []testCase, ld *executor.LoadDataInfo,
data, reachLimit, err1 := ld.InsertData(context.Background(), tt.data1, tt.data2)
c.Assert(err1, IsNil)
c.Assert(reachLimit, IsFalse)
err1 = ld.CheckAndInsertOneBatch()
c.Assert(err1, IsNil)
if tt.restData == nil {
c.Assert(data, HasLen, 0,
Commentf("data1:%v, data2:%v, data:%v", string(tt.data1), string(tt.data2), string(data)))
Expand Down
23 changes: 17 additions & 6 deletions executor/load_data.go
Expand Up @@ -106,11 +106,13 @@ type LoadDataInfo struct {
LinesInfo *ast.LinesClause
IgnoreLines uint64
Ctx sessionctx.Context
rows [][]types.Datum
}

// SetMaxRowsInBatch sets the max number of rows to insert in a batch.
func (e *LoadDataInfo) SetMaxRowsInBatch(limit uint64) {
e.maxRowsInBatch = limit
e.rows = make([][]types.Datum, 0, limit)
}

// getValidData returns prevData and curData that starts from starting symbol.
Expand Down Expand Up @@ -223,7 +225,6 @@ func (e *LoadDataInfo) InsertData(ctx context.Context, prevData, curData []byte)
isEOF = true
prevData, curData = curData, prevData
}
rows := make([][]types.Datum, 0, e.maxRowsInBatch)
for len(curData) > 0 {
line, curData, hasStarting = e.getLine(prevData, curData)
prevData = nil
Expand Down Expand Up @@ -252,7 +253,7 @@ func (e *LoadDataInfo) InsertData(ctx context.Context, prevData, curData []byte)
if err != nil {
return nil, false, err
}
rows = append(rows, e.colsToRow(ctx, cols))
e.rows = append(e.rows, e.colsToRow(ctx, cols))
e.rowCount++
if e.maxRowsInBatch != 0 && e.rowCount%e.maxRowsInBatch == 0 {
reachLimit = true
Expand All @@ -261,12 +262,22 @@ func (e *LoadDataInfo) InsertData(ctx context.Context, prevData, curData []byte)
break
}
}
e.ctx.GetSessionVars().StmtCtx.AddRecordRows(uint64(len(rows)))
err := e.batchCheckAndInsert(rows, e.addRecordLD)
return curData, reachLimit, nil
}

// CheckAndInsertOneBatch is used to commit one transaction batch full filled data
func (e *LoadDataInfo) CheckAndInsertOneBatch() error {
var err error
lysu marked this conversation as resolved.
Show resolved Hide resolved
if len(e.rows) == 0 {
return err
}
e.ctx.GetSessionVars().StmtCtx.AddRecordRows(uint64(len(e.rows)))
err = e.batchCheckAndInsert(e.rows, e.addRecordLD)
if err != nil {
return nil, reachLimit, err
return err
}
return curData, reachLimit, nil
e.rows = e.rows[:0]
return err
}

// SetMessage sets info message(ERR_LOAD_INFO) generated by LOAD statement, it is public because of the special way that
Expand Down
4 changes: 4 additions & 0 deletions executor/write_test.go
Expand Up @@ -1807,6 +1807,8 @@ func (s *testSuite4) TestLoadData(c *C) {
_, reachLimit, err := ld.InsertData(context.Background(), nil, nil)
c.Assert(err, IsNil)
c.Assert(reachLimit, IsFalse)
err = ld.CheckAndInsertOneBatch()
c.Assert(err, IsNil)
r := tk.MustQuery(selectSQL)
r.Check(nil)

Expand Down Expand Up @@ -2056,6 +2058,8 @@ func (s *testSuite4) TestLoadDataIntoPartitionedTable(c *C) {

_, _, err := ld.InsertData(context.Background(), nil, []byte("1,2\n3,4\n5,6\n7,8\n9,10\n"))
c.Assert(err, IsNil)
err = ld.CheckAndInsertOneBatch()
c.Assert(err, IsNil)
ld.SetMessage()
err = ctx.StmtCommit()
c.Assert(err, IsNil)
Expand Down
9 changes: 8 additions & 1 deletion server/conn.go
Expand Up @@ -1049,6 +1049,10 @@ func insertDataWithCommit(ctx context.Context, prevData, curData []byte, loadDat
if !reachLimit {
break
}
err := loadDataInfo.CheckAndInsertOneBatch()
if err != nil {
return nil, err
}
if err = loadDataInfo.Ctx.StmtCommit(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1113,7 +1117,10 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor
if err != nil {
loadDataInfo.Ctx.StmtRollback()
} else {
err = loadDataInfo.Ctx.StmtCommit()
err = loadDataInfo.CheckAndInsertOneBatch()
if err == nil {
err = loadDataInfo.Ctx.StmtCommit()
}
}

var txn kv.Transaction
Expand Down