From 2e8e89cfcadcbf55996c509c9f4ab09979922277 Mon Sep 17 00:00:00 2001
From: Shenghui Wu <793703860@qq.com>
Date: Wed, 18 Dec 2019 14:52:17 +0800
Subject: [PATCH] executor: implement disk-based sort (Part 1) (#13718)

---
 executor/explain_test.go |  18 +++-
 executor/sort.go         | 215 ++++++++++++++++++++++++++++++++++++---
 2 files changed, 219 insertions(+), 14 deletions(-)

diff --git a/executor/explain_test.go b/executor/explain_test.go
index 8a274ecd7604a..8af54865216c0 100644
--- a/executor/explain_test.go
+++ b/executor/explain_test.go
@@ -163,7 +163,8 @@ func (s *testSuite1) TestMemoryUsageAfterClose(c *C) {
 	for i := 0; i < tk.Se.GetSessionVars().MaxChunkSize*5; i++ {
 		tk.MustExec(fmt.Sprintf("insert into t values (%v, %v)", i, i))
 	}
-	SQLs := []string{"select v+abs(k) from t"}
+	SQLs := []string{"select v+abs(k) from t",
+		"select v from t order by v"}
 	for _, sql := range SQLs {
 		tk.MustQuery(sql)
 		c.Assert(tk.Se.GetSessionVars().StmtCtx.MemTracker.BytesConsumed(), Equals, int64(0))
@@ -171,6 +172,21 @@ func (s *testSuite1) TestMemoryUsageAfterClose(c *C) {
 	}
 }
 
+func (s *testSuite1) TestDiskUsageAfterClose(c *C) {
+	tk := testkit.NewTestKitWithInit(c, s.store)
+	tk.MustExec("drop table if exists t")
+	tk.MustExec("create table t (v int, k int, key(k))")
+	for i := 0; i < tk.Se.GetSessionVars().MaxChunkSize*5; i++ {
+		tk.MustExec(fmt.Sprintf("insert into t values (%v, %v)", i, i))
+	}
+	SQLs := []string{
+		"select v from t order by v"}
+	for _, sql := range SQLs {
+		tk.MustQuery(sql)
+		c.Assert(tk.Se.GetSessionVars().StmtCtx.DiskTracker.BytesConsumed(), Equals, int64(0))
+	}
+}
+
 func (s *testSuite2) TestExplainAnalyzeExecutionInfo(c *C) {
 	tk := testkit.NewTestKitWithInit(c, s.store)
 	tk.MustExec("drop table if exists t")
diff --git a/executor/sort.go b/executor/sort.go
index aa895015b1eec..65129c8c7f8c9 100644
--- a/executor/sort.go
+++ b/executor/sort.go
@@ -18,11 +18,13 @@ import (
 	"context"
 	"fmt"
 	"sort"
+	"sync/atomic"
 
 	"github.com/pingcap/tidb/expression"
 	plannercore "github.com/pingcap/tidb/planner/core"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/chunk"
+	"github.com/pingcap/tidb/util/disk"
 	"github.com/pingcap/tidb/util/memory"
 	"github.com/pingcap/tidb/util/stringutil"
 )
@@ -49,12 +51,57 @@ type SortExec struct {
 	// rowPointer store the chunk index and row index for each row.
 	rowPtrs []chunk.RowPtr
 
-	memTracker *memory.Tracker
+	memTracker  *memory.Tracker
+	diskTracker *disk.Tracker
+
+	// rowChunksInDisk is the chunk list that stores row values on disk.
+	rowChunksInDisk *chunk.ListInDisk
+	// rowPtrsInDisk stores the disk-chunk index and row index of each row on disk.
+	rowPtrsInDisk []chunk.RowPtr
+	// partitionList holds the disk-backed chunk lists, one per sorted partition.
+	partitionList []*chunk.ListInDisk
+	// partitionRowPtrs stores the disk-chunk index and row index of each row,
+	// per partition.
+	partitionRowPtrs [][]chunk.RowPtr
+	// exceeded indicates that the memory quota was exceeded while adding a
+	// chunk, so rows should be spilled to disk now.
+	exceeded uint32
+	// spilled indicates that rows have been spilled to disk.
+	spilled uint32
 }
 
 // Close implements the Executor Close interface.
 func (e *SortExec) Close() error {
+	if e.alreadySpilled() {
+		if e.rowChunksInDisk != nil {
+			if err := e.rowChunksInDisk.Close(); err != nil {
+				return err
+			}
+		}
+		for _, chunkInDisk := range e.partitionList {
+			if chunkInDisk != nil {
+				if err := chunkInDisk.Close(); err != nil {
+					return err
+				}
+			}
+		}
+		e.rowChunksInDisk = nil
+		e.partitionList = e.partitionList[:0]
+
+		e.memTracker.Consume(int64(-8 * cap(e.rowPtrsInDisk)))
+		e.rowPtrsInDisk = nil
+		for _, partitionPtrs := range e.partitionRowPtrs {
+			e.memTracker.Consume(int64(-8 * cap(partitionPtrs)))
+		}
+		e.partitionRowPtrs = nil
+	}
+	if e.rowChunks != nil {
+		e.memTracker.Consume(-e.rowChunks.GetMemTracker().BytesConsumed())
+		e.rowChunks = nil
+	}
+	e.memTracker.Consume(int64(-8 * cap(e.rowPtrs)))
+	e.rowPtrs = nil
 	e.memTracker = nil
+	e.diskTracker = nil
 	return e.children[0].Close()
 }
 
@@ -67,7 +114,15 @@ func (e *SortExec) Open(ctx context.Context) error {
 	if e.memTracker == nil {
 		e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaSort)
 		e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
+		e.diskTracker = memory.NewTracker(e.id, -1)
+		e.diskTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.DiskTracker)
 	}
+	e.exceeded = 0
+	e.spilled = 0
+	e.rowChunksInDisk = nil
+	e.rowPtrsInDisk = e.rowPtrsInDisk[:0]
+	e.partitionList = e.partitionList[:0]
+	e.partitionRowPtrs = e.partitionRowPtrs[:0]
 	return e.children[0].Open(ctx)
 }
 
@@ -79,20 +134,65 @@ func (e *SortExec) Next(ctx context.Context, req *chunk.Chunk) error {
 		if err != nil {
 			return err
 		}
-		e.initPointers()
-		e.initCompareFuncs()
-		e.buildKeyColumns()
-		sort.Slice(e.rowPtrs, e.keyColumnsLess)
-		e.fetched = true
+		if e.alreadySpilled() {
+			err = e.prepareExternalSorting()
+			if err != nil {
+				return err
+			}
+			e.fetched = true
+		} else {
+			e.initPointers()
+			e.initCompareFuncs()
+			e.buildKeyColumns()
+			sort.Slice(e.rowPtrs, e.keyColumnsLess)
+			e.fetched = true
+		}
 	}
-	for !req.IsFull() && e.Idx < len(e.rowPtrs) {
-		rowPtr := e.rowPtrs[e.Idx]
-		req.AppendRow(e.rowChunks.GetRow(rowPtr))
-		e.Idx++
+
+	if e.alreadySpilled() {
+		for !req.IsFull() && e.Idx < len(e.partitionRowPtrs[0]) {
+			rowPtr := e.partitionRowPtrs[0][e.Idx]
+			row, err := e.partitionList[0].GetRow(rowPtr)
+			if err != nil {
+				return err
+			}
+			req.AppendRow(row)
+			e.Idx++
+		}
+	} else {
+		for !req.IsFull() && e.Idx < len(e.rowPtrs) {
+			rowPtr := e.rowPtrs[e.Idx]
+			req.AppendRow(e.rowChunks.GetRow(rowPtr))
+			e.Idx++
+		}
 	}
 	return nil
 }
 
+func (e *SortExec) prepareExternalSorting() (err error) {
+	e.initCompareFuncs()
+	e.buildKeyColumns()
+	e.rowPtrsInDisk = e.initPointersForListInDisk(e.rowChunksInDisk)
+	// Partition sort:
+	// there is currently only one partition; proper partitioning
+	// will be added in the next PR.
+	err = e.readPartition(e.rowChunksInDisk, e.rowPtrsInDisk)
+	if err != nil {
+		return err
+	}
+	e.initPointers()
+	sort.Slice(e.rowPtrs, e.keyColumnsLess)
+	listInDisk, err := e.spillToDiskByRowPtr()
+	if err != nil {
+		return err
+	}
+	e.memTracker.Consume(-e.rowChunks.GetMemTracker().BytesConsumed())
+	e.rowChunks = nil
+	e.partitionList = append(e.partitionList, listInDisk)
+	e.partitionRowPtrs = append(e.partitionRowPtrs, e.initPointersForListInDisk(listInDisk))
+	return err
+}
+
 func (e *SortExec) fetchRowChunks(ctx context.Context) error {
 	fields := retTypes(e)
 	e.rowChunks = chunk.NewList(fields, e.initCap, e.maxChunkSize)
@@ -108,20 +208,53 @@ func (e *SortExec) fetchRowChunks(ctx context.Context) error {
 		if rowCount == 0 {
 			break
 		}
-		e.rowChunks.Add(chk)
+		if e.alreadySpilled() {
+			// Rows have already been spilled: append chk directly to disk.
+			err := e.rowChunksInDisk.Add(chk)
+			if err != nil {
+				return err
+			}
+		} else {
+			e.rowChunks.Add(chk)
+			if atomic.LoadUint32(&e.exceeded) == 1 {
+				e.rowChunksInDisk, err = e.spillToDisk()
+				if err != nil {
+					return err
+				}
+				e.memTracker.Consume(-e.rowChunks.GetMemTracker().BytesConsumed())
+				e.rowChunks = nil // GC its internal chunks.
+				atomic.StoreUint32(&e.spilled, 1)
+			}
+		}
 	}
 	return nil
 }
 
 func (e *SortExec) initPointers() {
-	e.rowPtrs = make([]chunk.RowPtr, 0, e.rowChunks.Len())
-	e.memTracker.Consume(int64(8 * e.rowChunks.Len()))
+	if e.rowPtrs != nil {
+		e.memTracker.Consume(int64(-8 * cap(e.rowPtrs)))
+		e.rowPtrs = e.rowPtrs[:0]
+	} else {
+		e.rowPtrs = make([]chunk.RowPtr, 0, e.rowChunks.Len())
+	}
 	for chkIdx := 0; chkIdx < e.rowChunks.NumChunks(); chkIdx++ {
 		rowChk := e.rowChunks.GetChunk(chkIdx)
 		for rowIdx := 0; rowIdx < rowChk.NumRows(); rowIdx++ {
 			e.rowPtrs = append(e.rowPtrs, chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)})
 		}
 	}
+	e.memTracker.Consume(int64(8 * cap(e.rowPtrs)))
+}
+
+func (e *SortExec) initPointersForListInDisk(disk *chunk.ListInDisk) []chunk.RowPtr {
+	rowPtrsInDisk := make([]chunk.RowPtr, 0)
+	for chkIdx := 0; chkIdx < disk.NumChunks(); chkIdx++ {
+		for rowIdx := 0; rowIdx < disk.NumRowsOfChunk(chkIdx); rowIdx++ {
+			rowPtrsInDisk = append(rowPtrsInDisk, chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)})
+		}
+	}
+	// Account capacity, not length, so the release in Close (which uses cap)
+	// matches what is consumed here.
+	e.memTracker.Consume(int64(8 * cap(rowPtrsInDisk)))
+	return rowPtrsInDisk
 }
 
 func (e *SortExec) initCompareFuncs() {
@@ -163,6 +296,62 @@ func (e *SortExec) keyColumnsLess(i, j int) bool {
 	return e.lessRow(rowI, rowJ)
 }
 
+func (e *SortExec) readPartition(disk *chunk.ListInDisk, rowPtrs []chunk.RowPtr) error {
+	e.rowChunks = chunk.NewList(retTypes(e), e.initCap, e.maxChunkSize)
+	e.rowChunks.GetMemTracker().AttachTo(e.memTracker)
+	e.rowChunks.GetMemTracker().SetLabel(rowChunksLabel)
+	for _, rowPtr := range rowPtrs {
+		row, err := disk.GetRow(rowPtr)
+		if err != nil {
+			return err
+		}
+		e.rowChunks.AppendRow(row)
+	}
+	return nil
+}
+
+// alreadySpilled reports whether records have been spilled to disk.
+func (e *SortExec) alreadySpilled() bool { return e.rowChunksInDisk != nil }
+
+// alreadySpilledSafe reports whether records have been spilled to disk. It is thread-safe.
+func (e *SortExec) alreadySpilledSafe() bool { return atomic.LoadUint32(&e.spilled) == 1 }
+
+func (e *SortExec) spillToDisk() (*chunk.ListInDisk, error) {
+	N := e.rowChunks.NumChunks()
+	rowChunksInDisk := chunk.NewListInDisk(e.retFieldTypes)
+	rowChunksInDisk.GetDiskTracker().AttachTo(e.diskTracker)
+	for i := 0; i < N; i++ {
+		chk := e.rowChunks.GetChunk(i)
+		if err := rowChunksInDisk.Add(chk); err != nil {
+			return nil, err
+		}
+	}
+	return rowChunksInDisk, nil
+}
+
+func (e *SortExec) spillToDiskByRowPtr() (*chunk.ListInDisk, error) {
+	rowChunksInDisk := chunk.NewListInDisk(e.retFieldTypes)
+	rowChunksInDisk.GetDiskTracker().AttachTo(e.diskTracker)
+	chk := newFirstChunk(e)
+	for _, rowPtr := range e.rowPtrs {
+		chk.AppendRow(e.rowChunks.GetRow(rowPtr))
+		if chk.IsFull() {
+			if err := rowChunksInDisk.Add(chk); err != nil {
+				return nil, err
+			}
+			chk = newFirstChunk(e)
+		}
+	}
+	if chk.NumRows() != 0 {
+		if err := rowChunksInDisk.Add(chk); err != nil {
+			return nil, err
+		}
+	}
+	return rowChunksInDisk, nil
+}
+
 // TopNExec implements a Top-N algorithm and it is built from a SELECT statement with ORDER BY and LIMIT.
 // Instead of sorting all the rows fetched from the table, it keeps the Top-N elements only in a heap to reduce memory usage.
 type TopNExec struct {
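
Notes on the mechanism, with standalone sketches (none of this is part of the
patch; every name below is invented for illustration):

1. The exceeded/spilled handshake. fetchRowChunks checks the exceeded flag
after each in-memory Add and spills everything buffered so far; spilled
records, thread-safely, that the spill happened, and every later chunk goes
straight to disk. Nothing in this part of the patch ever sets exceeded; the
assumption is that a later part will flip it from the memory tracker's
out-of-quota action. A minimal sketch with the tracker reduced to a plain
byte counter and the flag flipped inline:

package main

import (
	"fmt"
	"sync/atomic"
)

type sorter struct {
	quota    int64
	consumed int64
	exceeded uint32   // request to spill; set when consumed > quota
	spilled  uint32   // records that a spill actually happened
	inMem    [][]byte // stands in for chunk.List
	onDisk   [][]byte // stands in for chunk.ListInDisk
}

func (s *sorter) add(row []byte) {
	if atomic.LoadUint32(&s.spilled) == 1 {
		// Already spilled: append directly to disk, as fetchRowChunks does.
		s.onDisk = append(s.onDisk, row)
		return
	}
	s.inMem = append(s.inMem, row)
	s.consumed += int64(len(row))
	if s.consumed > s.quota {
		// In the real executor this store is expected to happen in the
		// memory tracker's out-of-quota callback, not inline.
		atomic.StoreUint32(&s.exceeded, 1)
	}
	if atomic.LoadUint32(&s.exceeded) == 1 {
		s.onDisk = append(s.onDisk, s.inMem...) // spill the buffered rows
		s.inMem = nil                           // let the in-memory rows be GCed
		s.consumed = 0
		atomic.StoreUint32(&s.spilled, 1)
	}
}

func main() {
	s := &sorter{quota: 8}
	for _, r := range [][]byte{[]byte("abcd"), []byte("efgh"), []byte("ijkl")} {
		s.add(r)
	}
	fmt.Println(len(s.inMem), len(s.onDisk)) // 0 3
}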
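
2. Why the bookkeeping charges exactly 8 bytes per row pointer: chunk.RowPtr
is a pair of uint32 indices. The sketch below mirrors that layout (rowPtr here
is a stand-in, not the real type) and shows the consume-on-build /
release-on-drop pairing that initPointers and Close follow. Both sides must
use cap, since append may leave the backing array larger than the slice
length, and the backing array is what actually occupies memory.

package main

import (
	"fmt"
	"unsafe"
)

// rowPtr mirrors chunk.RowPtr: two uint32 indices, 8 bytes per pointer.
type rowPtr struct {
	chkIdx uint32
	rowIdx uint32
}

func main() {
	fmt.Println(unsafe.Sizeof(rowPtr{})) // 8

	ptrs := make([]rowPtr, 0, 4)
	tracked := int64(8 * cap(ptrs)) // charged when the slice is built
	tracked -= int64(8 * cap(ptrs)) // released when it is dropped in Close
	fmt.Println(tracked)            // 0: the two sides cancel exactly
}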
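
3. What prepareExternalSorting does for the single partition: read the
spilled rows back into memory, sort pointers to the rows rather than the rows
themselves, then rewrite the rows to disk in pointer order so that Next can
stream partition 0 front to back with sequential reads. A sketch with disk
replaced by plain slices (everything here stands in for the chunk.ListInDisk
machinery):

package main

import (
	"fmt"
	"sort"
)

func main() {
	spilled := []int{3, 1, 2} // rows in the order they were spilled

	// readPartition: bring the partition back into memory.
	rows := append([]int(nil), spilled...)

	// initPointers + sort.Slice(e.rowPtrs, e.keyColumnsLess):
	// order the pointers, not the rows.
	ptrs := []int{0, 1, 2}
	sort.Slice(ptrs, func(i, j int) bool { return rows[ptrs[i]] < rows[ptrs[j]] })

	// spillToDiskByRowPtr: write the rows back in sorted pointer order.
	sorted := make([]int, 0, len(rows))
	for _, p := range ptrs {
		sorted = append(sorted, rows[p])
	}
	fmt.Println(sorted) // [1 2 3]
}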