Skip to content

Commit

Permalink
*: fix panic when to enable lite-init-stats and concurrently-init-sta…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored and hawkingrei committed Apr 1, 2024
1 parent 2cd930e commit ab26c0c
Show file tree
Hide file tree
Showing 3 changed files with 271 additions and 26 deletions.
217 changes: 192 additions & 25 deletions pkg/statistics/handle/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package handle

import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
Expand All @@ -25,6 +27,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/cache"
Expand All @@ -36,9 +39,20 @@ import (
"go.uber.org/zap"
)

const initStatsStep = int64(500)

var maxTidRecord MaxTidRecord

// MaxTidRecord is to record the max tid.
type MaxTidRecord struct {
mu sync.Mutex
tid atomic.Int64
}

func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache util.StatsCache, iter *chunk.Iterator4Chunk) {
var physicalID int64
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
physicalID := row.GetInt64(1)
physicalID = row.GetInt64(1)
// The table is read-only. Please do not modify it.
table, ok := h.TableInfoByID(is, physicalID)
if !ok {
Expand All @@ -61,6 +75,11 @@ func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache util.StatsC
}
cache.Put(physicalID, tbl) // put this table again since it is updated
}
maxTidRecord.mu.Lock()
defer maxTidRecord.mu.Unlock()
if maxTidRecord.tid.Load() < physicalID {
maxTidRecord.tid.Store(physicalID)
}
}

func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (util.StatsCache, error) {
Expand Down Expand Up @@ -244,12 +263,6 @@ func (h *Handle) initStatsHistogramsLite(is infoschema.InfoSchema, cache util.St
return errors.Trace(err)
}
defer terror.Call(rc.Close)
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
ls := initstats.NewWorker(rc.Next, h.initStatsHistograms4ChunkLite)
ls.LoadStats(is, cache, rc)
ls.Wait()
return nil
}
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
Expand All @@ -273,12 +286,39 @@ func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache util.StatsC
return errors.Trace(err)
}
defer terror.Call(rc.Close)
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
ls := initstats.NewWorker(rc.Next, h.initStatsHistograms4Chunk)
ls.LoadStats(is, cache, rc)
ls.Wait()
return nil
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
for {
err := rc.Next(ctx, req)
if err != nil {
return errors.Trace(err)
}
if req.NumRows() == 0 {
break
}
h.initStatsHistograms4Chunk(is, cache, iter)
}
return nil
}

func (h *Handle) initStatsHistogramsByPaging(is infoschema.InfoSchema, cache util.StatsCache, task initstats.Task) error {
se, err := h.SPool().Get()
if err != nil {
return err
}
defer func() {
if err == nil { // only recycle when no error
h.SPool().Put(se)
}
}()
sctx := se.(sessionctx.Context)
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms where table_id >= %? and table_id < %?"
rc, err := util.Exec(sctx, sql, task.StartTid, task.EndTid)
if err != nil {
return errors.Trace(err)
}
defer terror.Call(rc.Close)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
Expand All @@ -295,6 +335,24 @@ func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache util.StatsC
return nil
}

func (h *Handle) initStatsHistogramsConcurrency(is infoschema.InfoSchema, cache util.StatsCache) error {
var maxTid = maxTidRecord.tid.Load()
tid := int64(0)
ls := initstats.NewRangeWorker(func(task initstats.Task) error {
return h.initStatsHistogramsByPaging(is, cache, task)
})
ls.LoadStats()
for tid <= maxTid {
ls.SendTask(initstats.Task{
StartTid: tid,
EndTid: tid + initStatsStep,
})
tid += initStatsStep
}
ls.Wait()
return nil
}

func (*Handle) initStatsTopN4Chunk(cache util.StatsCache, iter *chunk.Iterator4Chunk) {
affectedIndexes := make(map[*statistics.Index]struct{})
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
Expand Down Expand Up @@ -352,6 +410,57 @@ func (h *Handle) initStatsTopN(cache util.StatsCache) error {
return nil
}

func (h *Handle) initStatsTopNByPaging(cache util.StatsCache, task initstats.Task) error {
se, err := h.SPool().Get()
if err != nil {
return err
}
defer func() {
if err == nil { // only recycle when no error
h.SPool().Put(se)
}
}()
sctx := se.(sessionctx.Context)
sql := "select HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1 and table_id >= %? and table_id < %?"
rc, err := util.Exec(sctx, sql, task.StartTid, task.EndTid)
if err != nil {
return errors.Trace(err)
}
defer terror.Call(rc.Close)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
for {
err := rc.Next(ctx, req)
if err != nil {
return errors.Trace(err)
}
if req.NumRows() == 0 {
break
}
h.initStatsTopN4Chunk(cache, iter)
}
return nil
}

func (h *Handle) initStatsTopNConcurrency(cache util.StatsCache) error {
var maxTid = maxTidRecord.tid.Load()
tid := int64(0)
ls := initstats.NewRangeWorker(func(task initstats.Task) error {
return h.initStatsTopNByPaging(cache, task)
})
ls.LoadStats()
for tid <= maxTid {
ls.SendTask(initstats.Task{
StartTid: tid,
EndTid: tid + initStatsStep,
})
tid += initStatsStep
}
ls.Wait()
return nil
}

func (*Handle) initStatsFMSketch4Chunk(cache util.StatsCache, iter *chunk.Iterator4Chunk) {
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
table, ok := cache.Get(row.GetInt64(0))
Expand Down Expand Up @@ -455,19 +564,18 @@ func (*Handle) initStatsBuckets4Chunk(cache util.StatsCache, iter *chunk.Iterato
}

func (h *Handle) initStatsBuckets(cache util.StatsCache) error {
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets order by table_id, is_index, hist_id, bucket_id"
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return errors.Trace(err)
}
defer terror.Call(rc.Close)
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
ls := initstats.NewWorker(rc.Next, func(_ infoschema.InfoSchema, cache util.StatsCache, iter *chunk.Iterator4Chunk) {
h.initStatsBuckets4Chunk(cache, iter)
})
ls.LoadStats(nil, cache, rc)
ls.Wait()
err := h.initStatsBucketsConcurrency(cache)
if err != nil {
return errors.Trace(err)
}
} else {
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets order by table_id, is_index, hist_id, bucket_id"
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return errors.Trace(err)
}
defer terror.Call(rc.Close)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
Expand Down Expand Up @@ -501,6 +609,57 @@ func (h *Handle) initStatsBuckets(cache util.StatsCache) error {
return nil
}

func (h *Handle) initStatsBucketsByPaging(cache util.StatsCache, task initstats.Task) error {
se, err := h.SPool().Get()
if err != nil {
return err
}
defer func() {
if err == nil { // only recycle when no error
h.SPool().Put(se)
}
}()
sctx := se.(sessionctx.Context)
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets where table_id >= %? and table_id < %?"
rc, err := util.Exec(sctx, sql, task.StartTid, task.EndTid)
if err != nil {
return errors.Trace(err)
}
defer terror.Call(rc.Close)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
for {
err := rc.Next(ctx, req)
if err != nil {
return errors.Trace(err)
}
if req.NumRows() == 0 {
break
}
h.initStatsBuckets4Chunk(cache, iter)
}
return nil
}

func (h *Handle) initStatsBucketsConcurrency(cache util.StatsCache) error {
var maxTid = maxTidRecord.tid.Load()
tid := int64(0)
ls := initstats.NewRangeWorker(func(task initstats.Task) error {
return h.initStatsBucketsByPaging(cache, task)
})
ls.LoadStats()
for tid <= maxTid {
ls.SendTask(initstats.Task{
StartTid: tid,
EndTid: tid + initStatsStep,
})
tid += initStatsStep
}
ls.Wait()
return nil
}

// InitStatsLite initiates the stats cache. The function is liter and faster than InitStats.
// Column/index stats are not loaded, i.e., we only load scalars such as NDV, NullCount, Correlation and don't load CMSketch/Histogram/TopN.
func (h *Handle) InitStatsLite(is infoschema.InfoSchema) (err error) {
Expand Down Expand Up @@ -545,11 +704,19 @@ func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) {
if err != nil {
return errors.Trace(err)
}
err = h.initStatsHistograms(is, cache)
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
err = h.initStatsHistogramsConcurrency(is, cache)
} else {
err = h.initStatsHistograms(is, cache)
}
if err != nil {
return errors.Trace(err)
}
err = h.initStatsTopN(cache)
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
err = h.initStatsTopNConcurrency(cache)
} else {
err = h.initStatsTopN(cache)
}
if err != nil {
return err
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/statistics/handle/initstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "initstats",
srcs = ["load_stats.go"],
srcs = [
"load_stats.go",
"load_stats_page.go",
],
importpath = "github.com/pingcap/tidb/pkg/statistics/handle/initstats",
visibility = ["//visibility:public"],
deps = [
Expand All @@ -12,6 +15,7 @@ go_library(
"//pkg/statistics/handle/util",
"//pkg/util",
"//pkg/util/chunk",
"//pkg/util/logutil",
"//pkg/util/sqlexec",
"@org_uber_go_zap//:zap",
],
Expand Down
74 changes: 74 additions & 0 deletions pkg/statistics/handle/initstats/load_stats_page.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package initstats

import (
"runtime"

"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
)

// Task represents the range of the table for loading stats.
type Task struct {
StartTid int64
EndTid int64
}

// RangeWorker is used to load stats concurrently by the range of table id.
type RangeWorker struct {
dealFunc func(task Task) error
taskChan chan Task

wg util.WaitGroupWrapper
}

// NewRangeWorker creates a new RangeWorker.
func NewRangeWorker(dealFunc func(task Task) error) *RangeWorker {
return &RangeWorker{
dealFunc: dealFunc,
taskChan: make(chan Task, 1),
}
}

// LoadStats loads stats concurrently when to init stats
func (ls *RangeWorker) LoadStats() {
concurrency := runtime.GOMAXPROCS(0)
for n := 0; n < concurrency; n++ {
ls.wg.Run(func() {
ls.loadStats()
})
}
}

func (ls *RangeWorker) loadStats() {
for task := range ls.taskChan {
if err := ls.dealFunc(task); err != nil {
logutil.BgLogger().Error("load stats failed", zap.Error(err))
}
}
}

// SendTask sends a task to the load stats worker.
func (ls *RangeWorker) SendTask(task Task) {
ls.taskChan <- task
}

// Wait closes the load stats worker.
func (ls *RangeWorker) Wait() {
close(ls.taskChan)
ls.wg.Wait()
}

0 comments on commit ab26c0c

Please sign in to comment.