-
Notifications
You must be signed in to change notification settings - Fork 212
/
scheduler.go
230 lines (205 loc) · 7.37 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"reflect"
"time"
"go.uber.org/zap"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/handler"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/plugins"
"github.com/apache/yunikorn-core/pkg/rmproxy/rmevent"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
// Scheduler is the main scheduler service; it owns the cluster context and
// starts/stops the background sub-services (event loop, health checker,
// node resource monitor, scheduling loops).
type Scheduler struct {
	clusterContext  *ClusterContext            // main context holding all cluster/partition state
	pendingEvents   chan interface{}           // queue for RM events awaiting processing (see handleRMEvent)
	activityPending chan bool                  // capacity-1 signal channel: scheduling-relevant activity occurred
	stop            chan struct{}              // closed in Stop() to terminate all background goroutines
	healthChecker   *HealthChecker             // periodic health checks, started in StartService
	nodesMonitor    *nodesResourceUsageMonitor // node resource usage monitor (mainly for testing)
}
// NewScheduler creates a Scheduler with a fresh cluster context and the
// channels used for event delivery, activity signalling and shutdown.
func NewScheduler() *Scheduler {
	return &Scheduler{
		clusterContext:  newClusterContext(),
		pendingEvents:   make(chan interface{}, 1024*1024),
		activityPending: make(chan bool, 1),
		stop:            make(chan struct{}),
	}
}
// StartService wires the RM proxy handler into the cluster context and starts
// the background services: the RM event loop, the node resource usage monitor
// and the periodic health checker. Unless manualSchedule is set (used by
// tests driving MultiStepSchedule), it also starts the scheduling loop and
// the outstanding-request inspection loop.
func (s *Scheduler) StartService(handlers handler.EventHandlers, manualSchedule bool) {
	// hand the RM proxy event handler to the cluster context
	s.clusterContext.setEventHandler(handlers.RMProxyEventHandler)
	// event processing loop for RM originated events
	go s.handleRMEvent()
	// node resource usage monitor (mainly relevant for testing)
	s.nodesMonitor = newNodesResourceUsageMonitor(s.clusterContext)
	s.nodesMonitor.start()
	// periodic health check of the scheduler state
	s.healthChecker = NewHealthChecker(s.clusterContext)
	s.healthChecker.Start()
	if manualSchedule {
		// tests step the scheduler explicitly, no background loops needed
		return
	}
	go s.internalSchedule()
	go s.internalInspectOutstandingRequests()
}
// internalSchedule drives scheduling cycles until a stop is requested.
// A cycle is triggered either by a registered activity notification or,
// at the latest, after a 100ms timeout so the scheduler keeps making
// progress even without explicit activity.
func (s *Scheduler) internalSchedule() {
	for {
		select {
		case <-time.After(100 * time.Millisecond):
			// timeout fallback: run a cycle even without registered activity
		case <-s.activityPending:
			// something changed that may affect scheduling results
		case <-s.stop:
			return
		}
		// a cycle that allocated something may enable further allocations:
		// re-arm the activity signal so the next cycle runs promptly
		if s.clusterContext.schedule() {
			s.registerActivity()
		}
	}
}
// internalInspectOutstandingRequests periodically (once per second) inspects
// the queues for outstanding requests that cannot be scheduled due to a lack
// of cluster resources and logs a summary; the inspection itself notifies the
// shim so autoscaling can be triggered. Runs until a stop is requested.
func (s *Scheduler) internalInspectOutstandingRequests() {
	// use a ticker instead of time.After: time.After allocates a fresh timer
	// on every loop iteration, a ticker is the idiomatic periodic construct
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-s.stop:
			return
		case <-ticker.C:
			if noRequests, totalResources := s.inspectOutstandingRequests(); noRequests > 0 {
				log.Log(log.Scheduler).Info("Found outstanding requests that will trigger autoscaling",
					zap.Int("number of requests", noRequests),
					zap.Stringer("total resources", totalResources))
			}
		}
	}
}
// HandleEvent implements the scheduler event handler interface: it queues an
// RM event for asynchronous processing by the handleRMEvent loop. The enqueue
// is non-blocking; if the queue is full the event is dropped (see
// enqueueAndCheckFull).
func (s *Scheduler) HandleEvent(ev interface{}) {
	enqueueAndCheckFull(s.pendingEvents, ev)
}
// enqueueAndCheckFull performs a non-blocking enqueue of ev onto queue.
// If the queue is full the event is dropped and a DPanic is logged
// (panics in development configurations, logs an error in production).
func enqueueAndCheckFull(queue chan interface{}, ev interface{}) {
	evType := reflect.TypeOf(ev)
	select {
	case queue <- ev:
		log.Log(log.Scheduler).Debug("enqueued event",
			zap.Stringer("eventType", evType),
			zap.Any("event", ev),
			zap.Int("currentQueueSize", len(queue)))
	default:
		// queue full: drop the event rather than block the caller
		log.Log(log.Scheduler).DPanic("failed to enqueue event",
			zap.Stringer("event", evType))
	}
}
// handleRMEvent is the event loop for RM originated events: each pending
// event is dispatched to the matching cluster context handler. Since any RM
// event may change scheduling results, activity is registered after every
// processed event. Runs until a stop is requested.
func (s *Scheduler) handleRMEvent() {
	for {
		select {
		case <-s.stop:
			return
		case ev := <-s.pendingEvents:
			switch event := ev.(type) {
			case *rmevent.RMUpdateAllocationEvent:
				s.clusterContext.handleRMUpdateAllocationEvent(event)
			case *rmevent.RMUpdateApplicationEvent:
				s.clusterContext.handleRMUpdateApplicationEvent(event)
			case *rmevent.RMUpdateNodeEvent:
				s.clusterContext.handleRMUpdateNodeEvent(event)
			case *rmevent.RMPartitionsRemoveEvent:
				s.clusterContext.removePartitionsByRMID(event)
			case *rmevent.RMRegistrationEvent:
				s.clusterContext.processRMRegistrationEvent(event)
			case *rmevent.RMConfigUpdateEvent:
				s.clusterContext.processRMConfigUpdateEvent(event)
			default:
				log.Log(log.Scheduler).Error("Received type is not an acceptable type for RM event.",
					zap.Stringer("received type", reflect.TypeOf(event)))
			}
			s.registerActivity()
		}
	}
}
// registerActivity is used to notify the scheduler that some activity that may impact scheduling results has occurred.
// The send is non-blocking: activityPending has a buffer of one, so a single
// pending signal is enough to trigger the next scheduling cycle.
func (s *Scheduler) registerActivity() {
	select {
	case s.activityPending <- true:
		// activity registered
	default:
		// buffer is full, activity will be processed at the next available opportunity
	}
}
// inspectOutstandingRequests inspects the outstanding requests for each queue
// in every partition: requests that fit within the queue max head room but
// are pending because the partition lacks resources. Each such request is
// reported to the shim through the resource manager callback plugin (to
// trigger autoscaling) and marked as having triggered scale up.
// It returns the total number of outstanding requests across all partitions
// and the sum of their asked resources.
func (s *Scheduler) inspectOutstandingRequests() (int, *resources.Resource) {
	log.Log(log.Scheduler).Debug("inspect outstanding requests")
	total := resources.NewResource()
	noRequests := 0
	for _, psc := range s.clusterContext.GetPartitionMapClone() {
		requests := psc.calculateOutstandingRequests()
		// BUG FIX: accumulate the count across partitions; the previous
		// assignment (noRequests = len(requests)) overwrote the count with
		// that of the last partition inspected, while total kept summing all
		noRequests += len(requests)
		for _, ask := range requests {
			log.Log(log.Scheduler).Debug("outstanding request",
				zap.String("appID", ask.GetApplicationID()),
				zap.String("allocationKey", ask.GetAllocationKey()))
			// these asks are queue outstanding requests,
			// they can fit into the max head room, but they are pending because lack of partition resources
			if updater := plugins.GetResourceManagerCallbackPlugin(); updater != nil {
				updater.UpdateContainerSchedulingState(&si.UpdateContainerSchedulingStateRequest{
					ApplicationID: ask.GetApplicationID(),
					AllocationKey: ask.GetAllocationKey(),
					State:         si.UpdateContainerSchedulingStateRequest_FAILED,
					Reason:        "request is waiting for cluster resources become available",
				})
			}
			total.AddTo(ask.GetAllocatedResource())
			ask.SetScaleUpTriggered(true)
		}
	}
	return noRequests, total
}
// GetClusterContext returns the scheduler's cluster context.
// Visible by tests
func (s *Scheduler) GetClusterContext() *ClusterContext {
	return s.clusterContext
}
// MultiStepSchedule runs the normal scheduling routine nAlloc times in a row.
// Visible by tests only.
func (s *Scheduler) MultiStepSchedule(nAlloc int) {
	for step := 0; step < nAlloc; step++ {
		log.Log(log.Scheduler).Debug("Scheduler manual stepping",
			zap.Int("count", step))
		s.clusterContext.schedule()
		// Yield some CPU between cycles: the smoke tests compete for CPU
		// resources and without this 100ms pause the event handling routines
		// can be starved by this tight loop even on busy machines.
		// Note, this sleep only works in tests.
		time.Sleep(100 * time.Millisecond)
	}
}
// Stop shuts down the background services (health checker, node resource
// monitor, cluster context) and then signals every internal goroutine
// (event loop and scheduling loops) to exit.
func (s *Scheduler) Stop() {
	log.Log(log.Scheduler).Info("Stopping scheduler & background services")
	s.healthChecker.Stop()
	s.nodesMonitor.stop()
	s.clusterContext.Stop()
	// closing the stop channel releases all select loops waiting on it
	close(s.stop)
}