-
Notifications
You must be signed in to change notification settings - Fork 1
/
random_merge.go
136 lines (119 loc) · 4.16 KB
/
random_merge.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package schedulers
import (
"errors"
"math/rand"
"github.com/deepfabric/prophet/core"
"github.com/deepfabric/prophet/schedule"
"github.com/deepfabric/prophet/schedule/checker"
"github.com/deepfabric/prophet/schedule/filter"
"github.com/deepfabric/prophet/schedule/operator"
"github.com/deepfabric/prophet/schedule/opt"
"github.com/deepfabric/prophet/storage"
"github.com/deepfabric/prophet/util"
)
const (
// RandomMergeName is the random merge scheduler name. The slice config
// decoder stores it in the scheduler configuration's Name field.
RandomMergeName = "random-merge-scheduler"
// RandomMergeType is the random merge scheduler type. It is the key under
// which the decoder builder and the scheduler constructor are registered,
// and the value returned by GetType.
RandomMergeType = "random-merge"
)
// init registers the random merge scheduler with the schedule package under
// RandomMergeType: first its slice-based config decoder builder, then its
// scheduler constructor.
func init() {
	// buildDecoder turns the string arguments into a decoder that fills a
	// *randomMergeSchedulerConfig with the parsed key ranges and the default
	// scheduler name. The decoder rejects any other value type.
	buildDecoder := func(args []string) schedule.ConfigDecoder {
		return func(v interface{}) error {
			cfg, isConf := v.(*randomMergeSchedulerConfig)
			if !isConf {
				return errors.New("scheduler error configuration")
			}
			keyRanges, err := getKeyRanges(args)
			if err != nil {
				return err
			}
			cfg.Name = RandomMergeName
			cfg.Ranges = keyRanges
			return nil
		}
	}

	// buildScheduler decodes into a fresh configuration and wraps it in a
	// scheduler. The storage argument is not used by this scheduler.
	buildScheduler := func(opController *schedule.OperatorController, _ storage.Storage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
		cfg := new(randomMergeSchedulerConfig)
		if err := decoder(cfg); err != nil {
			return nil, err
		}
		return newRandomMergeScheduler(opController, cfg), nil
	}

	schedule.RegisterSliceDecoderBuilder(RandomMergeType, buildDecoder)
	schedule.RegisterScheduler(RandomMergeType, buildScheduler)
}
// randomMergeSchedulerConfig holds the settings of the random merge scheduler.
// It is the value filled in by the registered config decoder and the value
// handed to schedule.EncodeConfig by EncodeConfig.
type randomMergeSchedulerConfig struct {
// Name is the scheduler name returned by GetName; the slice decoder sets it
// to RandomMergeName.
Name string `json:"name"`
// Ranges are the key ranges passed to RandLeaderResource when Schedule
// picks the resource to merge.
Ranges []core.KeyRange `json:"ranges"`
}
// randomMergeScheduler is the scheduler built by newRandomMergeScheduler and
// registered under RandomMergeType.
type randomMergeScheduler struct {
// BaseScheduler is embedded; it is created from the operator controller and
// supplies the OpController consulted by IsScheduleAllowed.
*BaseScheduler
// conf is the scheduler's configuration (name and key ranges).
conf *randomMergeSchedulerConfig
}
// newRandomMergeScheduler builds the admin scheduler that merges a randomly
// chosen resource with one of its adjacent resources. The scheduler embeds a
// BaseScheduler created from opController and keeps conf as its configuration.
func newRandomMergeScheduler(opController *schedule.OperatorController, conf *randomMergeSchedulerConfig) schedule.Scheduler {
	return &randomMergeScheduler{
		BaseScheduler: NewBaseScheduler(opController),
		conf:          conf,
	}
}
// GetName returns the scheduler name stored in its configuration.
func (s *randomMergeScheduler) GetName() string {
return s.conf.Name
}
// GetType returns RandomMergeType, the type identifier of this scheduler.
func (s *randomMergeScheduler) GetType() string {
return RandomMergeType
}
// EncodeConfig serializes the scheduler configuration by delegating to
// schedule.EncodeConfig and returns its result unchanged.
func (s *randomMergeScheduler) EncodeConfig() ([]byte, error) {
return schedule.EncodeConfig(s.conf)
}
// IsScheduleAllowed reports whether another merge may be scheduled: the
// operator controller's count for operator.OpMerge must be strictly below the
// merge schedule limit from the cluster options.
func (s *randomMergeScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {
	mergeOps := s.OpController.OperatorCount(operator.OpMerge)
	limit := cluster.GetOpts().GetMergeScheduleLimit()
	return mergeOps < limit
}
// Schedule tries to create merge operators for one randomly chosen resource.
//
// It picks a random container that passes the ContainerStateFilter, then a
// random leader resource on that container within the configured key ranges,
// and finally one of the resource's adjacent resources as the merge target.
// It returns nil when no container, resource or target is found, when
// allowMerge rejects the pair, or when the operator cannot be created.
// Every early return except the creation failure increments a dedicated
// schedulerCounter label; the creation failure is only logged at debug level.
func (s *randomMergeScheduler) Schedule(cluster opt.Cluster) []*operator.Operator {
	name := s.GetName()
	// count bumps the scheduler counter for the given event label.
	count := func(event string) {
		schedulerCounter.WithLabelValues(name, event).Inc()
	}
	count("schedule")

	stateFilter := &filter.ContainerStateFilter{ActionScope: s.conf.Name, MoveResource: true}
	source := filter.NewCandidates(cluster.GetContainers()).
		FilterSource(cluster.GetOpts(), stateFilter).
		RandomPick()
	if source == nil {
		count("no-source-container")
		return nil
	}

	res := cluster.RandLeaderResource(source.Meta.ID(), s.conf.Ranges, opt.HealthResource(cluster))
	if res == nil {
		count("no-resource")
		return nil
	}

	// The second adjacent resource is the default target. Unless one-way
	// merge is enabled, fall back to the first one when the second is
	// missing, or switch to it on a coin flip when it exists.
	alt, target := cluster.GetAdjacentResources(res)
	if !cluster.GetOpts().IsOneWayMergeEnabled() {
		coinPicksAlt := rand.Int()%2 == 0 && alt != nil
		if coinPicksAlt || target == nil {
			target = alt
		}
	}
	if target == nil {
		count("no-target-container")
		return nil
	}

	if !s.allowMerge(cluster, res, target) {
		count("not-allowed")
		return nil
	}

	ops, err := operator.CreateMergeResourceOperator(RandomMergeType, cluster, res, target, operator.OpAdmin)
	if err != nil {
		util.GetLogger().Debugf("create merge resource operator failed with %+v",
			err)
		return nil
	}

	// Attach (without incrementing here) the "new-operator" counter to the
	// first operator.
	newOpCounter := schedulerCounter.WithLabelValues(name, "new-operator")
	ops[0].Counters = append(ops[0].Counters, newOpCounter)
	return ops
}
// allowMerge reports whether res and target may be merged. Both resources
// must be healthy and replicated according to the opt helpers, neither may be
// hot according to the cluster, and checker.AllowMerge must accept the pair.
// Checks run in that order and stop at the first failure.
func (s *randomMergeScheduler) allowMerge(cluster opt.Cluster, res, target *core.CachedResource) bool {
	switch {
	case !opt.IsResourceHealthy(cluster, res), !opt.IsResourceHealthy(cluster, target):
		return false
	case !opt.IsResourceReplicated(cluster, res), !opt.IsResourceReplicated(cluster, target):
		return false
	case cluster.IsResourceHot(res), cluster.IsResourceHot(target):
		return false
	}
	return checker.AllowMerge(cluster, res, target)
}