forked from tikv/pd
/
scheduler.go
129 lines (111 loc) · 3.77 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package schedule
import (
"sync"
"time"
"github.com/juju/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/pd/server/core"
log "github.com/sirupsen/logrus"
)
// Cluster provides an overview of a cluster's regions distribution.
type Cluster interface {
RandFollowerRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo
RandLeaderRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo
GetStores() []*core.StoreInfo
GetStore(id uint64) *core.StoreInfo
GetRegion(id uint64) *core.RegionInfo
GetRegionStores(region *core.RegionInfo) []*core.StoreInfo
GetFollowerStores(region *core.RegionInfo) []*core.StoreInfo
GetLeaderStore(region *core.RegionInfo) *core.StoreInfo
GetAdjacentRegions(region *core.RegionInfo) (*core.RegionInfo, *core.RegionInfo)
ScanRegions(startKey []byte, limit int) []*core.RegionInfo
BlockStore(id uint64) error
UnblockStore(id uint64)
IsRegionHot(id uint64) bool
RegionWriteStats() []*core.RegionStat
RegionReadStats() []*core.RegionStat
RandHotRegionFromStore(store uint64, kind FlowKind) *core.RegionInfo
// get config methods
GetOpt() NamespaceOptions
Options
// TODO: it should be removed. Schedulers don't need to know anything
// about peers.
AllocPeer(storeID uint64) (*metapb.Peer, error)
}
// Scheduler is an interface to schedule resources.
type Scheduler interface {
GetName() string
// GetType should in accordance with the name passing to schedule.RegisterScheduler()
GetType() string
GetMinInterval() time.Duration
GetNextInterval(interval time.Duration) time.Duration
Prepare(cluster Cluster) error
Cleanup(cluster Cluster)
Schedule(cluster Cluster, opInfluence OpInfluence) []*Operator
IsScheduleAllowed(cluster Cluster) bool
}
// CreateSchedulerFunc is for creating scheudler.
type CreateSchedulerFunc func(limiter *Limiter, args []string) (Scheduler, error)
var schedulerMap = make(map[string]CreateSchedulerFunc)
// RegisterScheduler binds a scheduler creator. It should be called in init()
// func of a package.
func RegisterScheduler(name string, createFn CreateSchedulerFunc) {
if _, ok := schedulerMap[name]; ok {
log.Fatalf("duplicated scheduler name: %v", name)
}
schedulerMap[name] = createFn
}
// CreateScheduler creates a scheduler with registered creator func.
func CreateScheduler(name string, limiter *Limiter, args ...string) (Scheduler, error) {
fn, ok := schedulerMap[name]
if !ok {
return nil, errors.Errorf("create func of %v is not registered", name)
}
return fn(limiter, args)
}
// Limiter a counter that limits the number of operators
type Limiter struct {
sync.RWMutex
counts map[OperatorKind]uint64
}
// NewLimiter create a schedule limiter
func NewLimiter() *Limiter {
return &Limiter{
counts: make(map[OperatorKind]uint64),
}
}
// UpdateCounts updates resouce counts using current pending operators.
func (l *Limiter) UpdateCounts(operators map[uint64]*Operator) {
l.Lock()
defer l.Unlock()
for k := range l.counts {
delete(l.counts, k)
}
for _, op := range operators {
l.counts[op.Kind()]++
}
}
// OperatorCount gets the count of operators filtered by mask.
func (l *Limiter) OperatorCount(mask OperatorKind) uint64 {
l.RLock()
defer l.RUnlock()
var total uint64
for k, count := range l.counts {
if k&mask != 0 {
total += count
}
}
return total
}