Skip to content

Commit

Permalink
executor, stats: extract topn from cm sketch (pingcap#11409)
Browse files Browse the repository at this point in the history
  • Loading branch information
alivxxx committed Nov 13, 2019
1 parent 8320728 commit 2860e52
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 14 deletions.
14 changes: 8 additions & 6 deletions executor/analyze.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ const (
maxSketchSize = 10000
defaultCMSketchDepth = 5
defaultCMSketchWidth = 2048
defaultNumTopN = uint32(20)
)

// Next implements the Executor Next interface.
Expand Down Expand Up @@ -336,7 +337,8 @@ func (e *AnalyzeIndexExec) buildStatsFromResult(result distsql.SelectResult, nee
}
}
}
return hist, cms, nil
err := hist.ExtractTopN(cms, len(e.idxInfo.Columns), defaultNumTopN)
return hist, cms, err
}

func (e *AnalyzeIndexExec) buildStats(ranges []*ranger.Range, considerNull bool) (hist *statistics.Histogram, cms *statistics.CMSketch, err error) {
Expand Down Expand Up @@ -509,6 +511,7 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range) (hists []*statis
cms = append(cms, nil)
}
for i, col := range e.colsInfo {
collectors[i].ExtractTopN(defaultNumTopN)
for j, s := range collectors[i].Samples {
collectors[i].Samples[j].Ordinal = j
collectors[i].Samples[j].Value, err = tablecodec.DecodeColumnValue(s.Value.GetBytes(), &col.FieldType, timeZone)
Expand Down Expand Up @@ -1015,14 +1018,13 @@ func (e *AnalyzeFastExec) buildIndexStats(idxInfo *model.IndexInfo, collector *s
data[i] = append(data[i], sample.Value.GetBytes()[:preLen])
}
}
numTop := uint32(20)
cmSketch, ndv, scaleRatio := statistics.NewCMSketchWithTopN(defaultCMSketchDepth, defaultCMSketchWidth, data[0], numTop, uint64(rowCount))
cmSketch, ndv, scaleRatio := statistics.NewCMSketchWithTopN(defaultCMSketchDepth, defaultCMSketchWidth, data[0], defaultNumTopN, uint64(rowCount))
// Build CM Sketch for each prefix and merge them into one.
for i := 1; i < len(idxInfo.Columns); i++ {
var curCMSketch *statistics.CMSketch
// `ndv` should be the ndv of full index, so just rewrite it here.
curCMSketch, ndv, scaleRatio = statistics.NewCMSketchWithTopN(defaultCMSketchDepth, defaultCMSketchWidth, data[i], numTop, uint64(rowCount))
err := cmSketch.MergeCMSketch(curCMSketch, numTop)
curCMSketch, ndv, scaleRatio = statistics.NewCMSketchWithTopN(defaultCMSketchDepth, defaultCMSketchWidth, data[i], defaultNumTopN, uint64(rowCount))
err := cmSketch.MergeCMSketch(curCMSketch, defaultNumTopN)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -1199,7 +1201,7 @@ func analyzeIndexIncremental(idxExec *analyzeIndexIncrementalExec) analyzeResult
return analyzeResult{Err: err, job: idxExec.job}
}
if idxExec.oldCMS != nil && cms != nil {
err = cms.MergeCMSketch4IncrementalAnalyze(idxExec.oldCMS)
err = cms.MergeCMSketch4IncrementalAnalyze(idxExec.oldCMS, defaultNumTopN)
if err != nil {
return analyzeResult{Err: err, job: idxExec.job}
}
Expand Down
27 changes: 27 additions & 0 deletions executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,3 +444,30 @@ func (s *testSuite1) TestFailedAnalyzeRequest(c *C) {
c.Assert(err.Error(), Equals, "mock buildStatsFromResult error")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/buildStatsFromResult"), IsNil)
}

func (s *testSuite1) TestExtractTopN(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int primary key, b int, index index_b(b))")
for i := 0; i < 10; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%d, %d)", i, i))
}
for i := 0; i < 10; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%d, 0)", i+10))
}
tk.MustExec("analyze table t")
is := s.dom.InfoSchema()
table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tblInfo := table.Meta()
tblStats := s.dom.StatsHandle().GetTableStats(tblInfo)
colStats := tblStats.Columns[tblInfo.Columns[1].ID]
c.Assert(len(colStats.CMSketch.TopN()), Equals, 1)
item := colStats.CMSketch.TopN()[0]
c.Assert(item.Count, Equals, uint64(11))
idxStats := tblStats.Indices[tblInfo.Indices[0].ID]
c.Assert(len(idxStats.CMSketch.TopN()), Equals, 1)
item = idxStats.CMSketch.TopN()[0]
c.Assert(item.Count, Equals, uint64(11))
}
1 change: 1 addition & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2618,6 +2618,7 @@ func (s *testSuite1) SetUpSuite(c *C) {
mockstore.WithHijackClient(hijackClient),
)
c.Assert(err, IsNil)
session.DisableStats4Test()
s.dom, err = session.BootstrapSession(s.store)
c.Assert(err, IsNil)
s.dom.SetStatsUpdating(true)
Expand Down
37 changes: 29 additions & 8 deletions statistics/cmsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ func newTopNHelper(sample [][]byte, numTop uint32) *topNHelper {
if actualNumTop >= numTop && sorted[actualNumTop].cnt*3 < sorted[numTop-1].cnt*2 {
break
}
if sorted[actualNumTop].cnt == 1 {
break
}
sumTopN += sorted[actualNumTop].cnt
}

Expand Down Expand Up @@ -247,6 +250,14 @@ func (c *CMSketch) setValue(h1, h2 uint64, count uint64) {
}
}

func (c *CMSketch) subValue(h1, h2 uint64, count uint64) {
c.count -= count
for i := range c.table {
j := (h1 + h2*uint64(i)) % uint64(c.width)
c.table[i][j] = c.table[i][j] - uint32(count)
}
}

func (c *CMSketch) queryValue(sc *stmtctx.StatementContext, val types.Datum) (uint64, error) {
bytes, err := tablecodec.EncodeValue(sc, val)
if err != nil {
Expand Down Expand Up @@ -290,7 +301,7 @@ func (c *CMSketch) queryHashValue(h1, h2 uint64) uint64 {
return uint64(res)
}

func (c *CMSketch) mergeTopN(lTopN map[uint64][]*TopNMeta, rTopN map[uint64][]*TopNMeta, numTop uint32) {
func (c *CMSketch) mergeTopN(lTopN map[uint64][]*TopNMeta, rTopN map[uint64][]*TopNMeta, numTop uint32, usingMax bool) {
counter := make(map[hack.MutableString]uint64)
for _, metas := range lTopN {
for _, meta := range metas {
Expand All @@ -299,7 +310,11 @@ func (c *CMSketch) mergeTopN(lTopN map[uint64][]*TopNMeta, rTopN map[uint64][]*T
}
for _, metas := range rTopN {
for _, meta := range metas {
counter[hack.String(meta.Data)] += meta.Count
if usingMax {
counter[hack.String(meta.Data)] = mathutil.MaxUint64(counter[hack.String(meta.Data)], meta.Count)
} else {
counter[hack.String(meta.Data)] += meta.Count
}
}
}
sorted := make([]uint64, len(counter))
Expand Down Expand Up @@ -332,7 +347,7 @@ func (c *CMSketch) MergeCMSketch(rc *CMSketch, numTopN uint32) error {
return errors.New("Dimensions of Count-Min Sketch should be the same")
}
if c.topN != nil || rc.topN != nil {
c.mergeTopN(c.topN, rc.topN, numTopN)
c.mergeTopN(c.topN, rc.topN, numTopN, false)
}
c.count += rc.count
for i := range c.table {
Expand All @@ -351,12 +366,12 @@ func (c *CMSketch) MergeCMSketch(rc *CMSketch, numTopN uint32) error {
// (3): For values that appears both in `c` and `rc`, if they do not appear partially in `c` and `rc`, for example,
// if `v` appears 5 times in the table, it can appears 5 times in `c` and 3 times in `rc`, then `max` also gives the correct answer.
// So in fact, if we can know the number of appearances of each value in the first place, it is better to use `max` to construct the CM sketch rather than `sum`.
func (c *CMSketch) MergeCMSketch4IncrementalAnalyze(rc *CMSketch) error {
func (c *CMSketch) MergeCMSketch4IncrementalAnalyze(rc *CMSketch, numTopN uint32) error {
if c.depth != rc.depth || c.width != rc.width {
return errors.New("Dimensions of Count-Min Sketch should be the same")
}
if c.topN != nil || rc.topN != nil {
return errors.New("CMSketch with Top-N does not support merge")
c.mergeTopN(c.topN, rc.topN, numTopN, true)
}
for i := range c.table {
c.count = 0
Expand Down Expand Up @@ -399,10 +414,10 @@ func CMSketchFromProto(protoSketch *tipb.CMSketch) *CMSketch {
c.count = c.count + uint64(counter)
}
}
c.defaultValue = protoSketch.DefaultValue
if len(protoSketch.TopN) == 0 {
return c
}
c.defaultValue = protoSketch.DefaultValue
c.topN = make(map[uint64][]*TopNMeta)
for _, e := range protoSketch.TopN {
h1, h2 := murmur3.Sum128(e.Data)
Expand Down Expand Up @@ -458,9 +473,15 @@ func LoadCMSketchWithTopN(exec sqlexec.RestrictedSQLExecutor, tableID, isIndex,
return decodeCMSketch(cms, topN)
}

// TotalCount returns the count, it is only used for test.
// TotalCount returns the total count in the sketch, it is only used for test.
func (c *CMSketch) TotalCount() uint64 {
return c.count
res := c.count
for _, metas := range c.topN {
for _, meta := range metas {
res += meta.Count
}
}
return res
}

// Equal tests if two CM Sketch equal, it is only used for test.
Expand Down
1 change: 1 addition & 0 deletions statistics/handle/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1257,6 +1257,7 @@ func (s *testStatsSuite) TestNeedAnalyzeTable(c *C) {
}

func (s *testStatsSuite) TestIndexQueryFeedback(c *C) {
c.Skip("support update the topn of index equal conditions")
defer cleanEnv(c, s.store, s.do)
testKit := testkit.NewTestKit(c, s.store)

Expand Down
61 changes: 61 additions & 0 deletions statistics/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"math"
"sort"
"strings"
"time"

Expand All @@ -34,6 +35,7 @@ import (
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tipb/go-tipb"
"github.com/spaolacci/murmur3"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -1070,3 +1072,62 @@ func matchPrefix(row chunk.Row, colIdx int, ad *types.Datum) bool {
}
return false
}

func getIndexPrefixLens(data []byte, numCols int) (prefixLens []int, err error) {
prefixLens = make([]int, 0, numCols)
var colData []byte
prefixLen := 0
for len(data) > 0 {
colData, data, err = codec.CutOne(data)
if err != nil {
return nil, err
}
prefixLen += len(colData)
prefixLens = append(prefixLens, prefixLen)
}
return prefixLens, nil
}

// ExtractTopN extracts topn from histogram.
func (hg *Histogram) ExtractTopN(cms *CMSketch, numCols int, numTopN uint32) error {
if hg.Len() == 0 || cms == nil || numTopN == 0 {
return nil
}
dataSet := make(map[string]struct{}, hg.Bounds.NumRows())
dataCnts := make([]dataCnt, 0, hg.Bounds.NumRows())
hg.PreCalculateScalar()
// Set a limit on the frequency of boundary values to avoid extract values with low frequency.
limit := hg.notNullCount() / float64(hg.Len())
// Since our histogram are equal depth, they must occurs on the boundaries of buckets.
for i := 0; i < hg.Bounds.NumRows(); i++ {
data := hg.Bounds.GetRow(i).GetBytes(0)
prefixLens, err := getIndexPrefixLens(data, numCols)
if err != nil {
return err
}
for _, prefixLen := range prefixLens {
prefixColData := data[:prefixLen]
_, ok := dataSet[string(prefixColData)]
if ok {
continue
}
dataSet[string(prefixColData)] = struct{}{}
res := hg.BetweenRowCount(types.NewBytesDatum(prefixColData), types.NewBytesDatum(kv.Key(prefixColData).PrefixNext()))
if res >= limit {
dataCnts = append(dataCnts, dataCnt{prefixColData, uint64(res)})
}
}
}
sort.SliceStable(dataCnts, func(i, j int) bool { return dataCnts[i].cnt >= dataCnts[j].cnt })
cms.topN = make(map[uint64][]*TopNMeta)
if len(dataCnts) > int(numTopN) {
dataCnts = dataCnts[:numTopN]
}
for _, dataCnt := range dataCnts {
h1, h2 := murmur3.Sum128(dataCnt.data)
realCnt := cms.queryHashValue(h1, h2)
cms.subValue(h1, h2, realCnt)
cms.topN[h1] = append(cms.topN[h1], &TopNMeta{h2, dataCnt.data, realCnt})
}
return nil
}
24 changes: 24 additions & 0 deletions statistics/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tipb/go-tipb"
"github.com/spaolacci/murmur3"
)

// SampleItem is an item of sampled column value.
Expand Down Expand Up @@ -257,3 +258,26 @@ func RowToDatums(row chunk.Row, fields []*ast.ResultField) []types.Datum {
}
return datums
}

// ExtractTopN extracts the topn from the CM Sketch.
func (c *SampleCollector) ExtractTopN(numTop uint32) {
if numTop == 0 {
return
}
values := make([][]byte, 0, len(c.Samples))
for _, sample := range c.Samples {
values = append(values, sample.Value.GetBytes())
}
helper := newTopNHelper(values, numTop)
cms := c.CMSketch
cms.topN = make(map[uint64][]*TopNMeta)
// Process them decreasingly so we can handle most frequent values first and reduce the probability of hash collision
// by small values.
for i := uint32(0); i < helper.actualNumTop; i++ {
data := helper.sorted[i].data
h1, h2 := murmur3.Sum128(data)
realCnt := cms.queryHashValue(h1, h2)
cms.subValue(h1, h2, realCnt)
cms.topN[h1] = append(cms.topN[h1], &TopNMeta{h2, data, realCnt})
}
}

0 comments on commit 2860e52

Please sign in to comment.