Skip to content

Commit

Permalink
fix: require connected and approved nodes for scheduling
Browse files Browse the repository at this point in the history
- fixes #3784
  • Loading branch information
frrist committed Apr 25, 2024
1 parent 3a4c578 commit ce7e6c5
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 91 deletions.
53 changes: 31 additions & 22 deletions pkg/node/requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,6 @@ func NewRequesterNode(
}),
)

// node selector
nodeSelector := selector.NewNodeSelector(selector.NodeSelectorParams{
NodeDiscoverer: nodeInfoStore,
NodeRanker: nodeRankerChain,
})

// evaluation broker
evalBroker, err := evaluation.NewInMemoryBroker(evaluation.InMemoryBrokerParams{
VisibilityTimeout: requesterConfig.EvalBrokerVisibilityTimeout,
Expand Down Expand Up @@ -151,26 +145,41 @@ func NewRequesterNode(
retryStrategy = retryStrategyChain
}

// scheduler provider
batchServiceJobScheduler := scheduler.NewBatchServiceJobScheduler(scheduler.BatchServiceJobSchedulerParams{
JobStore: jobStore,
Planner: planners,
NodeSelector: nodeSelector,
RetryStrategy: retryStrategy,
// TODO(forrest): [refactor] the selector constraints ought to be a parameter to the node selector.
// node selector
nodeSelector := selector.NewNodeSelector(selector.NodeSelectorParams{
NodeDiscoverer: nodeInfoStore,
NodeRanker: nodeRankerChain,
})
// selector constraints: require nodes be online and approved to schedule
selectorConstraints := orchestrator.NodeSelectionConstraints{
RequireConnected: true,
RequireApproval: true,
}

// scheduler provider
batchServiceJobScheduler := scheduler.NewBatchServiceJobScheduler(
jobStore,
planners,
nodeSelector,
retryStrategy,
selectorConstraints,
)
schedulerProvider := orchestrator.NewMappedSchedulerProvider(map[string]orchestrator.Scheduler{
models.JobTypeBatch: batchServiceJobScheduler,
models.JobTypeService: batchServiceJobScheduler,
models.JobTypeOps: scheduler.NewOpsJobScheduler(scheduler.OpsJobSchedulerParams{
JobStore: jobStore,
Planner: planners,
NodeSelector: nodeSelector,
}),
models.JobTypeDaemon: scheduler.NewDaemonJobScheduler(scheduler.DaemonJobSchedulerParams{
JobStore: jobStore,
Planner: planners,
NodeSelector: nodeSelector,
}),
models.JobTypeOps: scheduler.NewOpsJobScheduler(
jobStore,
planners,
nodeSelector,
selectorConstraints,
),
models.JobTypeDaemon: scheduler.NewDaemonJobScheduler(
jobStore,
planners,
nodeSelector,
selectorConstraints,
),
})

workers := make([]*orchestrator.Worker, 0, requesterConfig.WorkerCount)
Expand Down
16 changes: 10 additions & 6 deletions pkg/orchestrator/scheduler/batch_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,16 @@ func (s *BatchJobSchedulerTestSuite) SetupTest() {
s.nodeSelector = orchestrator.NewMockNodeSelector(ctrl)
s.retryStrategy = retry.NewFixedStrategy(retry.FixedStrategyParams{ShouldRetry: true})

s.scheduler = NewBatchServiceJobScheduler(BatchServiceJobSchedulerParams{
JobStore: s.jobStore,
Planner: s.planner,
NodeSelector: s.nodeSelector,
RetryStrategy: s.retryStrategy,
})
s.scheduler = NewBatchServiceJobScheduler(
s.jobStore,
s.planner,
s.nodeSelector,
s.retryStrategy,
orchestrator.NodeSelectionConstraints{
RequireConnected: false,
RequireApproval: false,
},
)
}

func TestBatchSchedulerTestSuite(t *testing.T) {
Expand Down
39 changes: 18 additions & 21 deletions pkg/orchestrator/scheduler/batch_service_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,26 @@ import (
// - batch jobs that run until completion on N number of nodes
// - service jobs that run until stopped on N number of nodes
type BatchServiceJobScheduler struct {
jobStore jobstore.Store
planner orchestrator.Planner
nodeSelector orchestrator.NodeSelector
retryStrategy orchestrator.RetryStrategy
jobStore jobstore.Store
planner orchestrator.Planner
nodeSelector orchestrator.NodeSelector
retryStrategy orchestrator.RetryStrategy
selectorConstraints orchestrator.NodeSelectionConstraints
}

type BatchServiceJobSchedulerParams struct {
JobStore jobstore.Store
Planner orchestrator.Planner
NodeSelector orchestrator.NodeSelector
RetryStrategy orchestrator.RetryStrategy
}

func NewBatchServiceJobScheduler(params BatchServiceJobSchedulerParams) *BatchServiceJobScheduler {
func NewBatchServiceJobScheduler(
store jobstore.Store,
planner orchestrator.Planner,
selector orchestrator.NodeSelector,
strategy orchestrator.RetryStrategy,
constraints orchestrator.NodeSelectionConstraints,
) *BatchServiceJobScheduler {
return &BatchServiceJobScheduler{
jobStore: params.JobStore,
planner: params.Planner,
nodeSelector: params.NodeSelector,
retryStrategy: params.RetryStrategy,
jobStore: store,
planner: planner,
nodeSelector: selector,
retryStrategy: strategy,
selectorConstraints: constraints,
}
}

Expand Down Expand Up @@ -155,15 +156,11 @@ func (b *BatchServiceJobScheduler) createMissingExecs(
// placeExecs places the executions
func (b *BatchServiceJobScheduler) placeExecs(ctx context.Context, execs execSet, job *models.Job) error {
if len(execs) > 0 {
// TODO: Remove the options once we are ready to enforce that only connected/approved nodes can be used
selectedNodes, err := b.nodeSelector.TopMatchingNodes(
ctx,
job,
len(execs),
&orchestrator.NodeSelectionConstraints{
RequireApproval: false,
RequireConnected: false,
},
&b.selectorConstraints,
)
if err != nil {
return err
Expand Down
25 changes: 13 additions & 12 deletions pkg/orchestrator/scheduler/daemon_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@ type DaemonJobScheduler struct {
jobStore jobstore.Store
planner orchestrator.Planner
nodeSelector orchestrator.NodeSelector
constraints orchestrator.NodeSelectionConstraints
}

type DaemonJobSchedulerParams struct {
JobStore jobstore.Store
Planner orchestrator.Planner
NodeSelector orchestrator.NodeSelector
}

func NewDaemonJobScheduler(params DaemonJobSchedulerParams) *DaemonJobScheduler {
func NewDaemonJobScheduler(
store jobstore.Store,
planner orchestrator.Planner,
selector orchestrator.NodeSelector,
constraints orchestrator.NodeSelectionConstraints,
) *DaemonJobScheduler {
return &DaemonJobScheduler{
jobStore: params.JobStore,
planner: params.Planner,
nodeSelector: params.NodeSelector,
jobStore: store,
planner: planner,
nodeSelector: selector,
constraints: constraints,
}
}

Expand Down Expand Up @@ -86,11 +87,11 @@ func (b *DaemonJobScheduler) createMissingExecs(
ctx context.Context, job *models.Job, plan *models.Plan, existingExecs execSet) (execSet, error) {
newExecs := execSet{}

// Require approval when selecting nodes, but do not require them to be connected.
// Require nodes to be approved and connected to schedule work.
nodes, err := b.nodeSelector.AllMatchingNodes(
ctx,
job,
&orchestrator.NodeSelectionConstraints{RequireApproval: true, RequireConnected: false},
&orchestrator.NodeSelectionConstraints{RequireApproval: true, RequireConnected: true},
)
if err != nil {
return newExecs, err
Expand Down
14 changes: 9 additions & 5 deletions pkg/orchestrator/scheduler/daemon_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,15 @@ func (s *DaemonJobSchedulerTestSuite) SetupTest() {
s.planner = orchestrator.NewMockPlanner(ctrl)
s.nodeSelector = orchestrator.NewMockNodeSelector(ctrl)

s.scheduler = NewDaemonJobScheduler(DaemonJobSchedulerParams{
JobStore: s.jobStore,
Planner: s.planner,
NodeSelector: s.nodeSelector,
})
s.scheduler = NewDaemonJobScheduler(
s.jobStore,
s.planner,
s.nodeSelector,
orchestrator.NodeSelectionConstraints{
RequireConnected: true,
RequireApproval: true,
},
)
}

func TestDaemonJobSchedulerTestSuite(t *testing.T) {
Expand Down
29 changes: 15 additions & 14 deletions pkg/orchestrator/scheduler/ops_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,23 @@ import (

// OpsJobScheduler is a scheduler for batch jobs that run until completion
type OpsJobScheduler struct {
jobStore jobstore.Store
planner orchestrator.Planner
nodeSelector orchestrator.NodeSelector
jobStore jobstore.Store
planner orchestrator.Planner
nodeSelector orchestrator.NodeSelector
selectorConstraints orchestrator.NodeSelectionConstraints
}

type OpsJobSchedulerParams struct {
JobStore jobstore.Store
Planner orchestrator.Planner
NodeSelector orchestrator.NodeSelector
}

func NewOpsJobScheduler(params OpsJobSchedulerParams) *OpsJobScheduler {
func NewOpsJobScheduler(
store jobstore.Store,
planner orchestrator.Planner,
selector orchestrator.NodeSelector,
constraints orchestrator.NodeSelectionConstraints,
) *OpsJobScheduler {
return &OpsJobScheduler{
jobStore: params.JobStore,
planner: params.Planner,
nodeSelector: params.NodeSelector,
jobStore: store,
planner: planner,
nodeSelector: selector,
selectorConstraints: constraints,
}
}

Expand Down Expand Up @@ -107,7 +108,7 @@ func (b *OpsJobScheduler) createMissingExecs(
ctx,
job, &orchestrator.NodeSelectionConstraints{
RequireApproval: true,
RequireConnected: false,
RequireConnected: true,
})
if err != nil {
return newExecs, err
Expand Down
14 changes: 9 additions & 5 deletions pkg/orchestrator/scheduler/ops_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,15 @@ func (s *OpsJobSchedulerTestSuite) SetupTest() {
s.planner = orchestrator.NewMockPlanner(ctrl)
s.nodeSelector = orchestrator.NewMockNodeSelector(ctrl)

s.scheduler = NewOpsJobScheduler(OpsJobSchedulerParams{
JobStore: s.jobStore,
Planner: s.planner,
NodeSelector: s.nodeSelector,
})
s.scheduler = NewOpsJobScheduler(
s.jobStore,
s.planner,
s.nodeSelector,
orchestrator.NodeSelectionConstraints{
RequireConnected: false,
RequireApproval: false,
},
)
}

func TestOpsJobSchedulerTestSuite(t *testing.T) {
Expand Down
16 changes: 10 additions & 6 deletions pkg/orchestrator/scheduler/service_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,16 @@ func (s *ServiceJobSchedulerTestSuite) SetupTest() {
s.nodeSelector = orchestrator.NewMockNodeSelector(ctrl)
s.retryStrategy = retry.NewFixedStrategy(retry.FixedStrategyParams{ShouldRetry: true})

s.scheduler = NewBatchServiceJobScheduler(BatchServiceJobSchedulerParams{
JobStore: s.jobStore,
Planner: s.planner,
NodeSelector: s.nodeSelector,
RetryStrategy: s.retryStrategy,
})
s.scheduler = NewBatchServiceJobScheduler(
s.jobStore,
s.planner,
s.nodeSelector,
s.retryStrategy,
orchestrator.NodeSelectionConstraints{
RequireConnected: false,
RequireApproval: false,
},
)
}

func TestServiceSchedulerTestSuite(t *testing.T) {
Expand Down

0 comments on commit ce7e6c5

Please sign in to comment.