store: remove stores that have no region before balance #52966

Merged · 3 commits · May 27, 2024
Changes from all commits
3 changes: 2 additions & 1 deletion pkg/store/copr/BUILD.bazel
@@ -82,7 +82,7 @@ go_test(
     embed = [":copr"],
     flaky = True,
     race = "on",
-    shard_count = 29,
+    shard_count = 30,
     deps = [
         "//pkg/kv",
         "//pkg/store/driver/backoff",
@@ -92,6 +92,7 @@
         "//pkg/util/trxevents",
         "@com_github_pingcap_errors//:errors",
         "@com_github_pingcap_kvproto//pkg/coprocessor",
+        "@com_github_pingcap_kvproto//pkg/metapb",
         "@com_github_pingcap_kvproto//pkg/mpp",
         "@com_github_stathat_consistent//:consistent",
         "@com_github_stretchr_testify//require",
88 changes: 60 additions & 28 deletions pkg/store/copr/batch_coprocessor.go
@@ -820,12 +820,24 @@ func filterAllStoresAccordingToTiFlashReplicaRead(allStores []uint64, aliveStore
 	return
 }
 
+func getAllUsedTiFlashStores(allTiFlashStores []*tikv.Store, allUsedTiFlashStoresMap map[uint64]struct{}) []*tikv.Store {
+	allUsedTiFlashStores := make([]*tikv.Store, 0, len(allUsedTiFlashStoresMap))
+	for _, store := range allTiFlashStores {
+		_, ok := allUsedTiFlashStoresMap[store.StoreID()]
+		if ok {
+			allUsedTiFlashStores = append(allUsedTiFlashStores, store)
+		}
+	}
+	return allUsedTiFlashStores
+}
+
 // getAliveStoresAndStoreIDs gets alive TiFlash stores and their IDs.
 // If tiflashReplicaReadPolicy is not all_replicas, it will also return the IDs of the alive TiFlash stores in TiDB zone.
-func getAliveStoresAndStoreIDs(ctx context.Context, cache *RegionCache, ttl time.Duration, store *kvStore, tiflashReplicaReadPolicy tiflash.ReplicaRead, tidbZone string) (aliveStores *aliveStoresBundle) {
+func getAliveStoresAndStoreIDs(ctx context.Context, cache *RegionCache, allUsedTiFlashStoresMap map[uint64]struct{}, ttl time.Duration, store *kvStore, tiflashReplicaReadPolicy tiflash.ReplicaRead, tidbZone string) (aliveStores *aliveStoresBundle) {
 	aliveStores = new(aliveStoresBundle)
 	allTiFlashStores := cache.RegionCache.GetTiFlashStores(tikv.LabelFilterNoTiFlashWriteNode)
-	aliveStores.storesInAllZones = filterAliveStores(ctx, allTiFlashStores, ttl, store)
+	allUsedTiFlashStores := getAllUsedTiFlashStores(allTiFlashStores, allUsedTiFlashStoresMap)
+	aliveStores.storesInAllZones = filterAliveStores(ctx, allUsedTiFlashStores, ttl, store)
 
 	if !tiflashReplicaReadPolicy.IsAllReplicas() {
 		aliveStores.storeIDsInTiDBZone = make(map[uint64]struct{}, len(aliveStores.storesInAllZones))
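In effect, getAliveStoresAndStoreIDs now intersects the full TiFlash store list with the set of store IDs that actually serve a region for the current request before probing liveness, so a store that hosts no relevant region never reaches the balancing step. A minimal, self-contained sketch of that intersect-then-probe shape (simplified stand-in types; not the real tikv.Store or filterAliveStores API):

package main

import "fmt"

// mockStore is a simplified stand-in for tikv.Store: just an ID.
type mockStore struct{ id uint64 }

// keepUsedStores mirrors the shape of getAllUsedTiFlashStores: keep only the
// stores whose IDs appear in the "used" set collected from the query's regions.
func keepUsedStores(all []mockStore, used map[uint64]struct{}) []mockStore {
	kept := make([]mockStore, 0, len(used))
	for _, s := range all {
		if _, ok := used[s.id]; ok {
			kept = append(kept, s)
		}
	}
	return kept
}

// probeAlive is a placeholder for the liveness check (filterAliveStores in the
// real code); here it only reports which stores would be probed.
func probeAlive(stores []mockStore) []mockStore {
	for _, s := range stores {
		fmt.Printf("probing store %d\n", s.id)
	}
	return stores
}

func main() {
	all := []mockStore{{1}, {2}, {3}}
	used := map[uint64]struct{}{2: {}, 3: {}} // store 1 hosts no region for this query
	alive := probeAlive(keepUsedStores(all, used))
	fmt.Println("stores considered for balancing:", alive) // store 1 is excluded
}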
@@ -850,9 +862,8 @@ func getAliveStoresAndStoreIDs(ctx context.Context, cache *RegionCache, ttl time
 // 1. tiflash_replica_read policy
 // 2. whether the store is alive
 // After filtering, it will build the RegionInfo.
-func filterAccessibleStoresAndBuildRegionInfo(cache *RegionCache, bo *Backoffer, task *copTask, rpcCtx *tikv.RPCContext, aliveStores *aliveStoresBundle, isTiDBLabelZoneSet bool, tiflashReplicaReadPolicy tiflash.ReplicaRead, regionInfoNeedsReloadOnSendFail []RegionInfo, regionsInOtherZones []uint64, maxRemoteReadCountAllowed int, tidbZone string) (regionInfo RegionInfo, _ []RegionInfo, _ []uint64, err error) {
+func filterAccessibleStoresAndBuildRegionInfo(cache *RegionCache, allStores []uint64, bo *Backoffer, task *copTask, rpcCtx *tikv.RPCContext, aliveStores *aliveStoresBundle, isTiDBLabelZoneSet bool, tiflashReplicaReadPolicy tiflash.ReplicaRead, regionInfoNeedsReloadOnSendFail []RegionInfo, regionsInOtherZones []uint64, maxRemoteReadCountAllowed int, tidbZone string) (regionInfo RegionInfo, _ []RegionInfo, _ []uint64, err error) {
 	needCrossZoneAccess := false
-	allStores, _ := cache.GetAllValidTiFlashStores(task.region, rpcCtx.Store, tikv.LabelFilterNoTiFlashWriteNode)
 	allStores, needCrossZoneAccess = filterAllStoresAccordingToTiFlashReplicaRead(allStores, aliveStores, tiflashReplicaReadPolicy)
 	regionInfo = RegionInfo{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores, PartitionIndex: task.partitionIndex}
 	if needCrossZoneAccess {
@@ -896,10 +907,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
 	if !isTiDBLabelZoneSet {
 		tiflashReplicaReadPolicy = tiflash.AllReplicas
 	}
-	aliveStores = getAliveStoresAndStoreIDs(bo.GetCtx(), cache, ttl, store, tiflashReplicaReadPolicy, tidbZone)
-	if tiflashReplicaReadPolicy.IsClosestReplicas() {
-		maxRemoteReadCountAllowed = len(aliveStores.storeIDsInTiDBZone) * tiflash.MaxRemoteReadCountPerNodeForClosestReplicas
-	}
+
 	for {
 		var tasks []*copTask
 		rangesLen = 0
@@ -920,12 +928,10 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
 			}
 		}
 
-		var batchTasks []*batchCopTask
-		var regionIDsInOtherZones []uint64
-		var regionInfosNeedReloadOnSendFail []RegionInfo
-		storeTaskMap := make(map[string]*batchCopTask)
+		rpcCtxs := make([]*tikv.RPCContext, 0, len(tasks))
+		usedTiFlashStores := make([][]uint64, 0, len(tasks))
+		usedTiFlashStoresMap := make(map[uint64]struct{}, 0)
 		needRetry := false
-		storeIDsUnionSetForAllTasks := make(map[uint64]struct{})
 		for _, task := range tasks {
 			rpcCtx, err := cache.GetTiFlashRPCContext(bo.TiKVBackoffer(), task.region, isMPP, tikv.LabelFilterNoTiFlashWriteNode)
 			if err != nil {
@@ -942,36 +948,62 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
 				// Then `splitRegion` will reloads these regions.
 				continue
 			}
+
+			allStores, _ := cache.GetAllValidTiFlashStores(task.region, rpcCtx.Store, tikv.LabelFilterNoTiFlashWriteNode)
+			for _, storeID := range allStores {
+				usedTiFlashStoresMap[storeID] = struct{}{}
+			}
+			rpcCtxs = append(rpcCtxs, rpcCtx)
+			usedTiFlashStores = append(usedTiFlashStores, allStores)
+		}
+
+		if needRetry {
+			// As mentioned above, nil rpcCtx is always attributed to failed stores.
+			// It's equal to long poll the store but get no response. Here we'd better use
+			// TiFlash error to trigger the TiKV fallback mechanism.
+			err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer"))
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+			continue
+		}
+
+		aliveStores = getAliveStoresAndStoreIDs(bo.GetCtx(), cache, usedTiFlashStoresMap, ttl, store, tiflashReplicaReadPolicy, tidbZone)
+		if tiflashReplicaReadPolicy.IsClosestReplicas() {
+			if len(aliveStores.storeIDsInTiDBZone) == 0 {
+				return nil, errors.Errorf("There is no region in tidb zone(%s)", tidbZone)
+			}
+			maxRemoteReadCountAllowed = len(aliveStores.storeIDsInTiDBZone) * tiflash.MaxRemoteReadCountPerNodeForClosestReplicas
+		}
+
+		var batchTasks []*batchCopTask
+		var regionIDsInOtherZones []uint64
+		var regionInfosNeedReloadOnSendFail []RegionInfo
+		storeTaskMap := make(map[string]*batchCopTask)
+		storeIDsUnionSetForAllTasks := make(map[uint64]struct{})
+		for idx, task := range tasks {
+			var err error
+			var regionInfo RegionInfo
-			regionInfo, regionInfosNeedReloadOnSendFail, regionIDsInOtherZones, err = filterAccessibleStoresAndBuildRegionInfo(cache, bo, task, rpcCtx, aliveStores, isTiDBLabelZoneSet, tiflashReplicaReadPolicy, regionInfosNeedReloadOnSendFail, regionIDsInOtherZones, maxRemoteReadCountAllowed, tidbZone)
+			regionInfo, regionInfosNeedReloadOnSendFail, regionIDsInOtherZones, err = filterAccessibleStoresAndBuildRegionInfo(cache, usedTiFlashStores[idx], bo, task, rpcCtxs[idx], aliveStores, isTiDBLabelZoneSet, tiflashReplicaReadPolicy, regionInfosNeedReloadOnSendFail, regionIDsInOtherZones, maxRemoteReadCountAllowed, tidbZone)
 			if err != nil {
 				return nil, err
 			}
-			if batchCop, ok := storeTaskMap[rpcCtx.Addr]; ok {
+			if batchCop, ok := storeTaskMap[rpcCtxs[idx].Addr]; ok {
 				batchCop.regionInfos = append(batchCop.regionInfos, regionInfo)
 			} else {
 				batchTask := &batchCopTask{
-					storeAddr:   rpcCtx.Addr,
+					storeAddr:   rpcCtxs[idx].Addr,
 					cmdType:     cmdType,
-					ctx:         rpcCtx,
+					ctx:         rpcCtxs[idx],
 					regionInfos: []RegionInfo{regionInfo},
 				}
-				storeTaskMap[rpcCtx.Addr] = batchTask
+				storeTaskMap[rpcCtxs[idx].Addr] = batchTask
 			}
 			for _, storeID := range regionInfo.AllStores {
 				storeIDsUnionSetForAllTasks[storeID] = struct{}{}
 			}
 		}
-		if needRetry {
-			// As mentioned above, nil rpcCtx is always attributed to failed stores.
-			// It's equal to long poll the store but get no response. Here we'd better use
-			// TiFlash error to trigger the TiKV fallback mechanism.
-			err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer"))
-			if err != nil {
-				return nil, errors.Trace(err)
-			}
-			continue
-		}
 
 		if len(regionIDsInOtherZones) != 0 {
 			warningMsg := fmt.Sprintf("total %d region(s) can not be accessed by TiFlash in the zone [%s]:", len(regionIDsInOtherZones), tidbZone)
 			regionIDErrMsg := ""
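Taken together, the batch_coprocessor.go changes reorder buildBatchCopTasksCore into two passes: the first pass resolves an RPC context per task and records every store that actually serves one of the requested regions; only then are alive stores computed (restricted to that used-store set), and a second pass groups regions into one batch task per store address. Below is a condensed sketch of that control flow using simplified placeholder types, with the backoff-and-retry handling reduced to an error return; it is an outline under those assumptions, not the real copr API.

package main

import (
	"errors"
	"fmt"
)

// Simplified stand-ins for the real copr types.
type task struct{ region string }
type rpcCtx struct {
	addr      string
	allStores []uint64
}
type batchTask struct {
	storeAddr string
	regions   []string
}

// lookupRPCContext stands in for cache.GetTiFlashRPCContext; nil means the
// region's stores are currently unreachable and the caller should retry.
func lookupRPCContext(t task) *rpcCtx {
	table := map[string]*rpcCtx{
		"r1": {addr: "tiflash-2", allStores: []uint64{2, 3}},
		"r2": {addr: "tiflash-3", allStores: []uint64{3}},
	}
	return table[t.region]
}

func buildBatchTasks(tasks []task) (map[string]*batchTask, error) {
	// Pass 1: resolve an RPC context per task and record which stores are
	// used by at least one of the requested regions.
	ctxs := make([]*rpcCtx, 0, len(tasks))
	usedStores := make(map[uint64]struct{})
	for _, t := range tasks {
		c := lookupRPCContext(t)
		if c == nil {
			return nil, errors.New("cannot find region with TiFlash peer, retry")
		}
		for _, id := range c.allStores {
			usedStores[id] = struct{}{}
		}
		ctxs = append(ctxs, c)
	}

	// Only the stores in usedStores would now be probed for liveness and fed
	// into balancing (getAliveStoresAndStoreIDs in the real code).
	fmt.Println("stores eligible for balancing:", usedStores)

	// Pass 2: group regions into one batch task per store address.
	batch := make(map[string]*batchTask)
	for i, t := range tasks {
		c := ctxs[i]
		if bt, ok := batch[c.addr]; ok {
			bt.regions = append(bt.regions, t.region)
		} else {
			batch[c.addr] = &batchTask{storeAddr: c.addr, regions: []string{t.region}}
		}
	}
	return batch, nil
}

func main() {
	batch, err := buildBatchTasks([]task{{"r1"}, {"r2"}})
	if err != nil {
		fmt.Println("retry:", err)
		return
	}
	for addr, bt := range batch {
		fmt.Printf("%s -> %v\n", addr, bt.regions)
	}
}

The practical consequence, as the PR title says, is that a TiFlash node holding no region for the query is dropped before balancing, instead of diluting the per-store task distribution or triggering needless liveness probes.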
38 changes: 38 additions & 0 deletions pkg/store/copr/batch_coprocessor_test.go
@@ -23,12 +23,15 @@ import (
	"time"

	"github.com/pingcap/errors"
	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/store/driver/backoff"
	"github.com/pingcap/tidb/pkg/util/logutil"
	"github.com/stathat/consistent"
	"github.com/stretchr/testify/require"
	"github.com/tikv/client-go/v2/testutils"
	"github.com/tikv/client-go/v2/tikv"
	"github.com/tikv/client-go/v2/tikvrpc"
	"go.uber.org/zap"
)

@@ -282,3 +285,38 @@ func TestTopoFetcherBackoff(t *testing.T) {
	require.GreaterOrEqual(t, dura, 30*time.Second)
	require.LessOrEqual(t, dura, 50*time.Second)
}

func TestGetAllUsedTiFlashStores(t *testing.T) {
	mockClient, _, pdClient, err := testutils.NewMockTiKV("", nil)
	require.NoError(t, err)
	defer func() {
		pdClient.Close()
		err = mockClient.Close()
		require.NoError(t, err)
	}()

	pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient)
	defer pdCli.Close()

	cache := NewRegionCache(tikv.NewRegionCache(pdCli))
	defer cache.Close()

	label1 := metapb.StoreLabel{Key: tikvrpc.EngineLabelKey, Value: tikvrpc.EngineLabelTiFlash}
	label2 := metapb.StoreLabel{Key: tikvrpc.EngineRoleLabelKey, Value: tikvrpc.EngineLabelTiFlashCompute}

	cache.SetRegionCacheStore(1, "192.168.1.1", "", tikvrpc.TiFlash, 0, []*metapb.StoreLabel{&label1, &label2})
	cache.SetRegionCacheStore(2, "192.168.1.2", "192.168.1.3", tikvrpc.TiFlash, 0, []*metapb.StoreLabel{&label1, &label2})
	cache.SetRegionCacheStore(3, "192.168.1.3", "192.168.1.2", tikvrpc.TiFlash, 0, []*metapb.StoreLabel{&label1, &label2})

	allUsedTiFlashStoresMap := make(map[uint64]struct{})
	allUsedTiFlashStoresMap[2] = struct{}{}
	allUsedTiFlashStoresMap[3] = struct{}{}
	allTiFlashStores := cache.RegionCache.GetTiFlashStores(tikv.LabelFilterNoTiFlashWriteNode)
	require.Equal(t, 3, len(allTiFlashStores))
	allUsedTiFlashStores := getAllUsedTiFlashStores(allTiFlashStores, allUsedTiFlashStoresMap)
	require.Equal(t, len(allUsedTiFlashStoresMap), len(allUsedTiFlashStores))
	for _, store := range allUsedTiFlashStores {
		_, ok := allUsedTiFlashStoresMap[store.StoreID()]
		require.True(t, ok)
	}
}