fix: require connected and approved nodes for scheduling (#3957)
- This change modifies the Requester node's scheduling constraints such that
jobs will only be scheduled on nodes that are online and approved.
Disconnected nodes and nodes that are rejected or pending are not
eligible to run jobs (a sketch of this rule follows below).
- Additionally, this change cleans up some code by making constraints a
parameter to the node selector, which simplifies various parts of
dependency construction.
- Lastly, this change removes some *Param types to avoid the possibility
of nil pointer dereferences (NPD).
- fixes #3784

Co-authored-by: frrist <forrest@expanso.io>
frrist and frrist committed May 8, 2024
1 parent 6e12e1f commit 693c6d3
Showing 13 changed files with 137 additions and 178 deletions.
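
For context, here is a minimal, hypothetical sketch of the eligibility rule these constraints encode. The NodeSelectionConstraints fields come from this diff; the node stand-in type and the eligible helper are illustrative assumptions, not the repository's actual types.

package main

import "fmt"

// NodeSelectionConstraints mirrors the struct used in this change.
type NodeSelectionConstraints struct {
	RequireConnected bool
	RequireApproval  bool
}

// node is an illustrative stand-in for the real node info/state types.
type node struct {
	ID        string
	Connected bool
	Approved  bool
}

// eligible reports whether a node may receive work under the given constraints.
func eligible(n node, c NodeSelectionConstraints) bool {
	if c.RequireConnected && !n.Connected {
		return false // disconnected nodes are skipped
	}
	if c.RequireApproval && !n.Approved {
		return false // pending or rejected nodes are skipped
	}
	return true
}

func main() {
	c := NodeSelectionConstraints{RequireConnected: true, RequireApproval: true}
	fmt.Println(eligible(node{ID: "n1", Connected: true, Approved: true}, c))  // true
	fmt.Println(eligible(node{ID: "n2", Connected: false, Approved: true}, c)) // false: disconnected
	fmt.Println(eligible(node{ID: "n3", Connected: true, Approved: false}, c)) // false: not approved
}

In the real code the selector presumably applies this rule internally, since the constraints are now supplied to its constructor (see pkg/node/requester.go below).
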
36 changes: 14 additions & 22 deletions pkg/node/requester.go
@@ -99,12 +99,6 @@ func NewRequesterNode(
}),
)

// node selector
nodeSelector := selector.NewNodeSelector(selector.NodeSelectorParams{
NodeDiscoverer: nodeInfoStore,
NodeRanker: nodeRankerChain,
})

// evaluation broker
evalBroker, err := evaluation.NewInMemoryBroker(evaluation.InMemoryBrokerParams{
VisibilityTimeout: requesterConfig.EvalBrokerVisibilityTimeout,
@@ -151,26 +145,24 @@ func NewRequesterNode(
retryStrategy = retryStrategyChain
}

// node selector
nodeSelector := selector.NewNodeSelector(
nodeInfoStore,
nodeRankerChain,
// selector constraints: require nodes be online and approved to schedule
orchestrator.NodeSelectionConstraints{
RequireConnected: true,
RequireApproval: true,
},
)

// scheduler provider
batchServiceJobScheduler := scheduler.NewBatchServiceJobScheduler(scheduler.BatchServiceJobSchedulerParams{
JobStore: jobStore,
Planner: planners,
NodeSelector: nodeSelector,
RetryStrategy: retryStrategy,
})
batchServiceJobScheduler := scheduler.NewBatchServiceJobScheduler(jobStore, planners, nodeSelector, retryStrategy)
schedulerProvider := orchestrator.NewMappedSchedulerProvider(map[string]orchestrator.Scheduler{
models.JobTypeBatch: batchServiceJobScheduler,
models.JobTypeService: batchServiceJobScheduler,
models.JobTypeOps: scheduler.NewOpsJobScheduler(scheduler.OpsJobSchedulerParams{
JobStore: jobStore,
Planner: planners,
NodeSelector: nodeSelector,
}),
models.JobTypeDaemon: scheduler.NewDaemonJobScheduler(scheduler.DaemonJobSchedulerParams{
JobStore: jobStore,
Planner: planners,
NodeSelector: nodeSelector,
}),
models.JobTypeOps: scheduler.NewOpsJobScheduler(jobStore, planners, nodeSelector),
models.JobTypeDaemon: scheduler.NewDaemonJobScheduler(jobStore, planners, nodeSelector),
})

workers := make([]*orchestrator.Worker, 0, requesterConfig.WorkerCount)
4 changes: 2 additions & 2 deletions pkg/orchestrator/interfaces.go
@@ -106,11 +106,11 @@ type NodeSelector interface {
AllNodes(ctx context.Context) ([]models.NodeInfo, error)

// AllMatchingNodes returns all nodes that match the job constraints and selection criteria.
AllMatchingNodes(ctx context.Context, job *models.Job, constraints *NodeSelectionConstraints) ([]models.NodeInfo, error)
AllMatchingNodes(ctx context.Context, job *models.Job) ([]models.NodeInfo, error)

// TopMatchingNodes returns the top ranked desiredCount number of nodes that match job constraints
// ordered in descending order based on their rank, or error if not enough nodes match.
TopMatchingNodes(ctx context.Context, job *models.Job, desiredCount int, constraints *NodeSelectionConstraints) ([]models.NodeInfo, error)
TopMatchingNodes(ctx context.Context, job *models.Job, desiredCount int) ([]models.NodeInfo, error)
}

type RetryStrategy interface {
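
To illustrate why the constraints parameter could be dropped from these signatures, below is a hypothetical selector that captures its constraints once, at construction time, and applies them inside TopMatchingNodes. Apart from NodeSelectionConstraints, all names and fields are assumptions for illustration, and ranking is omitted.

package main

import (
	"context"
	"fmt"
)

type NodeSelectionConstraints struct {
	RequireConnected bool
	RequireApproval  bool
}

// nodeInfo and job stand in for models.NodeInfo and models.Job; only the
// fields needed for this sketch are included.
type nodeInfo struct {
	ID        string
	Connected bool
	Approved  bool
}

type job struct{ ID string }

// fixedSelector is a hypothetical NodeSelector-style implementation whose
// constraints are injected by the constructor rather than passed per call.
type fixedSelector struct {
	known       []nodeInfo
	constraints NodeSelectionConstraints
}

func newFixedSelector(known []nodeInfo, c NodeSelectionConstraints) *fixedSelector {
	return &fixedSelector{known: known, constraints: c}
}

// TopMatchingNodes no longer takes constraints; callers pass only the job and
// the number of nodes they need. Ranking is omitted in this sketch.
func (s *fixedSelector) TopMatchingNodes(_ context.Context, j *job, desiredCount int) ([]nodeInfo, error) {
	var eligible []nodeInfo
	for _, n := range s.known {
		if s.constraints.RequireConnected && !n.Connected {
			continue
		}
		if s.constraints.RequireApproval && !n.Approved {
			continue
		}
		eligible = append(eligible, n)
	}
	if len(eligible) < desiredCount {
		return nil, fmt.Errorf("job %s: need %d nodes, only %d eligible", j.ID, desiredCount, len(eligible))
	}
	return eligible[:desiredCount], nil
}

func main() {
	sel := newFixedSelector(
		[]nodeInfo{
			{ID: "n1", Connected: true, Approved: true},
			{ID: "n2", Connected: false, Approved: true},
		},
		NodeSelectionConstraints{RequireConnected: true, RequireApproval: true},
	)
	nodes, err := sel.TopMatchingNodes(context.Background(), &job{ID: "j-1"}, 1)
	fmt.Println(nodes, err) // [{n1 true true}] <nil>
}

The test diffs below show the same simplification on the calling side: the TopMatchingNodes and AllMatchingNodes expectations no longer mention constraints.
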
36 changes: 21 additions & 15 deletions pkg/orchestrator/mocks.go

Some generated files are not rendered by default.

20 changes: 7 additions & 13 deletions pkg/orchestrator/scheduler/batch_job_test.go
@@ -49,12 +49,12 @@ func (s *BatchJobSchedulerTestSuite) SetupTest() {
s.nodeSelector = orchestrator.NewMockNodeSelector(ctrl)
s.retryStrategy = retry.NewFixedStrategy(retry.FixedStrategyParams{ShouldRetry: true})

s.scheduler = NewBatchServiceJobScheduler(BatchServiceJobSchedulerParams{
JobStore: s.jobStore,
Planner: s.planner,
NodeSelector: s.nodeSelector,
RetryStrategy: s.retryStrategy,
})
s.scheduler = NewBatchServiceJobScheduler(
s.jobStore,
s.planner,
s.nodeSelector,
s.retryStrategy,
)
}

func TestBatchSchedulerTestSuite(t *testing.T) {
@@ -304,19 +304,13 @@ func (s *BatchJobSchedulerTestSuite) TestProcess_ShouldMarkJobAsFailed_NoRetry()
}

func (s *BatchJobSchedulerTestSuite) mockNodeSelection(job *models.Job, nodeInfos []models.NodeInfo, desiredCount int) {
constraints := &orchestrator.NodeSelectionConstraints{
RequireApproval: false,
RequireConnected: false,
}

if len(nodeInfos) < desiredCount {
s.nodeSelector.EXPECT().TopMatchingNodes(gomock.Any(), job, desiredCount, constraints).Return(nil, orchestrator.ErrNotEnoughNodes{})
s.nodeSelector.EXPECT().TopMatchingNodes(gomock.Any(), job, desiredCount).Return(nil, orchestrator.ErrNotEnoughNodes{})
} else {
s.nodeSelector.EXPECT().TopMatchingNodes(
gomock.Any(),
job,
desiredCount,
constraints,
).Return(nodeInfos, nil)
}
}
39 changes: 14 additions & 25 deletions pkg/orchestrator/scheduler/batch_service_job.go
@@ -20,23 +20,21 @@ import (
type BatchServiceJobScheduler struct {
jobStore jobstore.Store
planner orchestrator.Planner
nodeSelector orchestrator.NodeSelector
selector orchestrator.NodeSelector
retryStrategy orchestrator.RetryStrategy
}

type BatchServiceJobSchedulerParams struct {
JobStore jobstore.Store
Planner orchestrator.Planner
NodeSelector orchestrator.NodeSelector
RetryStrategy orchestrator.RetryStrategy
}

func NewBatchServiceJobScheduler(params BatchServiceJobSchedulerParams) *BatchServiceJobScheduler {
func NewBatchServiceJobScheduler(
store jobstore.Store,
planner orchestrator.Planner,
selector orchestrator.NodeSelector,
strategy orchestrator.RetryStrategy,
) *BatchServiceJobScheduler {
return &BatchServiceJobScheduler{
jobStore: params.JobStore,
planner: params.Planner,
nodeSelector: params.NodeSelector,
retryStrategy: params.RetryStrategy,
jobStore: store,
planner: planner,
selector: selector,
retryStrategy: strategy,
}
}

@@ -69,7 +67,7 @@ func (b *BatchServiceJobScheduler) Process(ctx context.Context, evaluation *mode
}

// Retrieve the info for all the nodes that have executions for this job
nodeInfos, err := existingNodeInfos(ctx, b.nodeSelector, nonTerminalExecs)
nodeInfos, err := existingNodeInfos(ctx, b.selector, nonTerminalExecs)
if err != nil {
return err
}
@@ -155,16 +153,7 @@ func (b *BatchServiceJobScheduler) createMissingExecs(
// placeExecs places the executions
func (b *BatchServiceJobScheduler) placeExecs(ctx context.Context, execs execSet, job *models.Job) error {
if len(execs) > 0 {
// TODO: Remove the options once we are ready to enforce that only connected/approved nodes can be used
selectedNodes, err := b.nodeSelector.TopMatchingNodes(
ctx,
job,
len(execs),
&orchestrator.NodeSelectionConstraints{
RequireApproval: false,
RequireConnected: false,
},
)
selectedNodes, err := b.selector.TopMatchingNodes(ctx, job, len(execs))
if err != nil {
return err
}
@@ -178,7 +167,7 @@ func (b *BatchServiceJobScheduler) placeExecs(ctx context.Context, execs execSet
}

func (b *BatchServiceJobScheduler) handleFailure(nonTerminalExecs execSet, failed execSet, plan *models.Plan, err error) {
// TODO: allow scheduling retries in a later time if don't find nodes instead of failing the job
// TODO(walid): allow scheduling retries in a later time if don't find nodes instead of failing the job
// mark all non-terminal executions as failed
nonTerminalExecs.markStopped(plan.Event, plan)

25 changes: 9 additions & 16 deletions pkg/orchestrator/scheduler/daemon_job.go
@@ -20,17 +20,15 @@ type DaemonJobScheduler struct {
nodeSelector orchestrator.NodeSelector
}

type DaemonJobSchedulerParams struct {
JobStore jobstore.Store
Planner orchestrator.Planner
NodeSelector orchestrator.NodeSelector
}

func NewDaemonJobScheduler(params DaemonJobSchedulerParams) *DaemonJobScheduler {
func NewDaemonJobScheduler(
store jobstore.Store,
planner orchestrator.Planner,
selector orchestrator.NodeSelector,
) *DaemonJobScheduler {
return &DaemonJobScheduler{
jobStore: params.JobStore,
planner: params.Planner,
nodeSelector: params.NodeSelector,
jobStore: store,
planner: planner,
nodeSelector: selector,
}
}

@@ -86,12 +84,7 @@ func (b *DaemonJobScheduler) createMissingExecs(
ctx context.Context, job *models.Job, plan *models.Plan, existingExecs execSet) (execSet, error) {
newExecs := execSet{}

// Require approval when selecting nodes, but do not require them to be connected.
nodes, err := b.nodeSelector.AllMatchingNodes(
ctx,
job,
&orchestrator.NodeSelectionConstraints{RequireApproval: true, RequireConnected: false},
)
nodes, err := b.nodeSelector.AllMatchingNodes(ctx, job)
if err != nil {
return newExecs, err
}
20 changes: 10 additions & 10 deletions pkg/orchestrator/scheduler/daemon_job_test.go
@@ -30,11 +30,11 @@ func (s *DaemonJobSchedulerTestSuite) SetupTest() {
s.planner = orchestrator.NewMockPlanner(ctrl)
s.nodeSelector = orchestrator.NewMockNodeSelector(ctrl)

s.scheduler = NewDaemonJobScheduler(DaemonJobSchedulerParams{
JobStore: s.jobStore,
Planner: s.planner,
NodeSelector: s.nodeSelector,
})
s.scheduler = NewDaemonJobScheduler(
s.jobStore,
s.planner,
s.nodeSelector,
)
}

func TestDaemonJobSchedulerTestSuite(t *testing.T) {
@@ -53,7 +53,7 @@ func (s *DaemonJobSchedulerTestSuite) TestProcess_ShouldCreateNewExecutions() {
*fakeNodeInfo(s.T(), nodeIDs[1]),
*fakeNodeInfo(s.T(), nodeIDs[2]),
}
s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), gomock.Any(), gomock.Any()).Return(nodeInfos, nil)
s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), gomock.Any()).Return(nodeInfos, nil)

matcher := NewPlanMatcher(s.T(), PlanMatcherParams{
Evaluation: evaluation,
@@ -78,7 +78,7 @@ func (s *DaemonJobSchedulerTestSuite) TestProcess_ShouldNOTMarkJobAsCompleted()
executions[1].ComputeState = models.NewExecutionState(models.ExecutionStateCompleted) // Simulate a completed execution
s.jobStore.EXPECT().GetJob(gomock.Any(), job.ID).Return(*job, nil)
s.jobStore.EXPECT().GetExecutions(gomock.Any(), jobstore.GetExecutionsOptions{JobID: job.ID}).Return(executions, nil)
s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), gomock.Any(), gomock.Any()).Return([]models.NodeInfo{}, nil)
s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), gomock.Any()).Return([]models.NodeInfo{}, nil)

// Noop plan
matcher := NewPlanMatcher(s.T(), PlanMatcherParams{
@@ -101,7 +101,7 @@ func (s *DaemonJobSchedulerTestSuite) TestProcess_ShouldMarkLostExecutionsOnUnhe
*fakeNodeInfo(s.T(), executions[1].NodeID),
}
s.nodeSelector.EXPECT().AllNodes(gomock.Any()).Return(nodeInfos, nil)
s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), job, gomock.Any()).Return(nodeInfos, nil)
s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), job).Return(nodeInfos, nil)

matcher := NewPlanMatcher(s.T(), PlanMatcherParams{
Evaluation: evaluation,
@@ -130,7 +130,7 @@ func (s *DaemonJobSchedulerTestSuite) TestProcess_ShouldNOTMarkJobAsFailed() {
*fakeNodeInfo(s.T(), executions[1].NodeID),
}
s.nodeSelector.EXPECT().AllNodes(gomock.Any()).Return(nodeInfos, nil)
s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), job, gomock.Any()).Return(nodeInfos, nil)
s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), job).Return(nodeInfos, nil)

matcher := NewPlanMatcher(s.T(), PlanMatcherParams{
Evaluation: evaluation,
@@ -166,7 +166,7 @@ func (s *DaemonJobSchedulerTestSuite) TestProcessFail_NoMatchingNodes() {
executions := []models.Execution{} // no executions yet
s.jobStore.EXPECT().GetJob(gomock.Any(), job.ID).Return(*job, nil)
s.jobStore.EXPECT().GetExecutions(gomock.Any(), jobstore.GetExecutionsOptions{JobID: job.ID}).Return(executions, nil)
s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), job, gomock.Any()).Return([]models.NodeInfo{}, nil)
s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), job).Return([]models.NodeInfo{}, nil)

// Noop plan
matcher := NewPlanMatcher(s.T(), PlanMatcherParams{