refactor: selection constraints are an attribute of the NodeSelector
- cleans up some of the setup
frrist committed Apr 25, 2024
1 parent ce7e6c5 commit 3de4a9f
Showing 11 changed files with 91 additions and 150 deletions.
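
In short: NodeSelectionConstraints moves out of each scheduler's constructor and call sites and into the NodeSelector itself, which now receives the constraints once at construction time (see pkg/node/requester.go below) and applies them internally. The selector implementation is not part of the rendered diff, so the following is only a minimal sketch of the resulting shape, with simplified stand-in types (NodeInfo here collapses the real discovery/ranking chain, and per-job matching is elided):

package main

import (
	"context"
	"fmt"
)

// Stand-ins for the real models.NodeInfo and orchestrator.NodeSelectionConstraints.
type NodeInfo struct {
	ID        string
	Connected bool
	Approved  bool
}

type NodeSelectionConstraints struct {
	RequireConnected bool
	RequireApproval  bool
}

// NodeSelector now owns its constraints; schedulers no longer thread them
// through every AllMatchingNodes/TopMatchingNodes call.
type NodeSelector struct {
	discovered  []NodeInfo // stand-in for the NodeDiscoverer + NodeRanker chain
	constraints NodeSelectionConstraints
}

func NewNodeSelector(discovered []NodeInfo, constraints NodeSelectionConstraints) *NodeSelector {
	return &NodeSelector{discovered: discovered, constraints: constraints}
}

// AllMatchingNodes filters with the stored constraints; note the slimmer
// signature compared to the pre-commit interface.
func (s *NodeSelector) AllMatchingNodes(ctx context.Context) ([]NodeInfo, error) {
	var matched []NodeInfo
	for _, n := range s.discovered {
		if s.constraints.RequireConnected && !n.Connected {
			continue
		}
		if s.constraints.RequireApproval && !n.Approved {
			continue
		}
		matched = append(matched, n)
	}
	return matched, nil
}

func main() {
	sel := NewNodeSelector(
		[]NodeInfo{
			{ID: "node-0", Connected: true, Approved: true},
			{ID: "node-1", Connected: false, Approved: true},
			{ID: "node-2", Connected: true, Approved: false},
		},
		NodeSelectionConstraints{RequireConnected: true, RequireApproval: true},
	)
	nodes, _ := sel.AllMatchingNodes(context.Background()) // error ignored for brevity
	fmt.Println(nodes)                                     // only node-0 satisfies both constraints
}

The trade-off is the usual one: the old API let each caller pick constraints per call (the batch scheduler tests passed all-false constraints), while the new one fixes a single policy at wiring time in NewRequesterNode, which is why the test setups below simply drop the argument.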
41 changes: 12 additions & 29 deletions pkg/node/requester.go
@@ -145,41 +145,24 @@ func NewRequesterNode(
 		retryStrategy = retryStrategyChain
 	}
 
-	// TODO(forrest): [refactor] the selector constraints ought to be a parameter to the node selector.
-	// node selector
-	nodeSelector := selector.NewNodeSelector(selector.NodeSelectorParams{
-		NodeDiscoverer: nodeInfoStore,
-		NodeRanker:     nodeRankerChain,
-	})
-	// selector constraints: require nodes be online and approved to schedule
-	selectorConstraints := orchestrator.NodeSelectionConstraints{
-		RequireConnected: true,
-		RequireApproval:  true,
-	}
+	nodeSelector := selector.NewNodeSelector(
+		nodeInfoStore,
+		nodeRankerChain,
+		// selector constraints: require nodes be online and approved to schedule
+		orchestrator.NodeSelectionConstraints{
+			RequireConnected: true,
+			RequireApproval:  true,
+		},
+	)
 
 	// scheduler provider
-	batchServiceJobScheduler := scheduler.NewBatchServiceJobScheduler(
-		jobStore,
-		planners,
-		nodeSelector,
-		retryStrategy,
-		selectorConstraints,
-	)
+	batchServiceJobScheduler := scheduler.NewBatchServiceJobScheduler(jobStore, planners, nodeSelector, retryStrategy)
 	schedulerProvider := orchestrator.NewMappedSchedulerProvider(map[string]orchestrator.Scheduler{
 		models.JobTypeBatch:   batchServiceJobScheduler,
 		models.JobTypeService: batchServiceJobScheduler,
-		models.JobTypeOps: scheduler.NewOpsJobScheduler(
-			jobStore,
-			planners,
-			nodeSelector,
-			selectorConstraints,
-		),
-		models.JobTypeDaemon: scheduler.NewDaemonJobScheduler(
-			jobStore,
-			planners,
-			nodeSelector,
-			selectorConstraints,
-		),
+		models.JobTypeOps:    scheduler.NewOpsJobScheduler(jobStore, planners, nodeSelector),
+		models.JobTypeDaemon: scheduler.NewDaemonJobScheduler(jobStore, planners, nodeSelector),
 	})
 
 	workers := make([]*orchestrator.Worker, 0, requesterConfig.WorkerCount)
4 changes: 2 additions & 2 deletions pkg/orchestrator/interfaces.go
@@ -106,11 +106,11 @@ type NodeSelector interface {
 	AllNodes(ctx context.Context) ([]models.NodeInfo, error)
 
 	// AllMatchingNodes returns all nodes that match the job constraints and selection criteria.
-	AllMatchingNodes(ctx context.Context, job *models.Job, constraints *NodeSelectionConstraints) ([]models.NodeInfo, error)
+	AllMatchingNodes(ctx context.Context, job *models.Job) ([]models.NodeInfo, error)
 
 	// TopMatchingNodes returns the top ranked desiredCount number of nodes that match job constraints,
 	// ordered in descending order based on their rank, or an error if not enough nodes match.
-	TopMatchingNodes(ctx context.Context, job *models.Job, desiredCount int, constraints *NodeSelectionConstraints) ([]models.NodeInfo, error)
+	TopMatchingNodes(ctx context.Context, job *models.Job, desiredCount int) ([]models.NodeInfo, error)
 }
 
 type RetryStrategy interface {
36 changes: 21 additions & 15 deletions pkg/orchestrator/mocks.go

Generated file; diff not rendered by default.
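
The mocks.go changes track the NodeSelector interface change above; since the file is generated, they are presumably the output of regenerating the mocks rather than hand edits. A typical directive for this (an assumption for illustration — the exact go:generate line is not part of this diff):

//go:generate mockgen -source=interfaces.go -destination=mocks.go -package=orchestrator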

12 changes: 1 addition & 11 deletions pkg/orchestrator/scheduler/batch_job_test.go
@@ -54,10 +54,6 @@ func (s *BatchJobSchedulerTestSuite) SetupTest() {
 		s.planner,
 		s.nodeSelector,
 		s.retryStrategy,
-		orchestrator.NodeSelectionConstraints{
-			RequireConnected: false,
-			RequireApproval:  false,
-		},
 	)
 }
@@ -308,19 +304,13 @@ func (s *BatchJobSchedulerTestSuite) TestProcess_ShouldMarkJobAsFailed_NoRetry()
 }
 
 func (s *BatchJobSchedulerTestSuite) mockNodeSelection(job *models.Job, nodeInfos []models.NodeInfo, desiredCount int) {
-	constraints := &orchestrator.NodeSelectionConstraints{
-		RequireApproval:  false,
-		RequireConnected: false,
-	}
-
 	if len(nodeInfos) < desiredCount {
-		s.nodeSelector.EXPECT().TopMatchingNodes(gomock.Any(), job, desiredCount, constraints).Return(nil, orchestrator.ErrNotEnoughNodes{})
+		s.nodeSelector.EXPECT().TopMatchingNodes(gomock.Any(), job, desiredCount).Return(nil, orchestrator.ErrNotEnoughNodes{})
 	} else {
 		s.nodeSelector.EXPECT().TopMatchingNodes(
 			gomock.Any(),
 			job,
 			desiredCount,
-			constraints,
 		).Return(nodeInfos, nil)
 	}
 }
30 changes: 11 additions & 19 deletions pkg/orchestrator/scheduler/batch_service_job.go
@@ -18,26 +18,23 @@ import (
 // - batch jobs that run until completion on N number of nodes
 // - service jobs that run until stopped on N number of nodes
 type BatchServiceJobScheduler struct {
-	jobStore            jobstore.Store
-	planner             orchestrator.Planner
-	nodeSelector        orchestrator.NodeSelector
-	retryStrategy       orchestrator.RetryStrategy
-	selectorConstraints orchestrator.NodeSelectionConstraints
+	jobStore      jobstore.Store
+	planner       orchestrator.Planner
+	selector      orchestrator.NodeSelector
+	retryStrategy orchestrator.RetryStrategy
 }
 
 func NewBatchServiceJobScheduler(
 	store jobstore.Store,
 	planner orchestrator.Planner,
 	selector orchestrator.NodeSelector,
 	strategy orchestrator.RetryStrategy,
-	constraints orchestrator.NodeSelectionConstraints,
 ) *BatchServiceJobScheduler {
 	return &BatchServiceJobScheduler{
-		jobStore:            store,
-		planner:             planner,
-		nodeSelector:        selector,
-		retryStrategy:       strategy,
-		selectorConstraints: constraints,
+		jobStore:      store,
+		planner:       planner,
+		selector:      selector,
+		retryStrategy: strategy,
 	}
 }
@@ -70,7 +67,7 @@ func (b *BatchServiceJobScheduler) Process(ctx context.Context, evaluation *mode
 	}
 
 	// Retrieve the info for all the nodes that have executions for this job
-	nodeInfos, err := existingNodeInfos(ctx, b.nodeSelector, nonTerminalExecs)
+	nodeInfos, err := existingNodeInfos(ctx, b.selector, nonTerminalExecs)
 	if err != nil {
 		return err
 	}
@@ -156,12 +153,7 @@ func (b *BatchServiceJobScheduler) createMissingExecs(
 // placeExecs places the executions
 func (b *BatchServiceJobScheduler) placeExecs(ctx context.Context, execs execSet, job *models.Job) error {
 	if len(execs) > 0 {
-		selectedNodes, err := b.nodeSelector.TopMatchingNodes(
-			ctx,
-			job,
-			len(execs),
-			&b.selectorConstraints,
-		)
+		selectedNodes, err := b.selector.TopMatchingNodes(ctx, job, len(execs))
 		if err != nil {
 			return err
 		}
@@ -175,7 +167,7 @@ func (b *BatchServiceJobScheduler) placeExecs(ctx context.Context, execs execSet
 }
 
 func (b *BatchServiceJobScheduler) handleFailure(nonTerminalExecs execSet, failed execSet, plan *models.Plan, err error) {
-	// TODO: allow scheduling retries in a later time if don't find nodes instead of failing the job
+	// TODO(walid): allow scheduling retries in a later time if don't find nodes instead of failing the job
 	// mark all non-terminal executions as failed
 	nonTerminalExecs.markStopped(plan.Event, plan)
10 changes: 1 addition & 9 deletions pkg/orchestrator/scheduler/daemon_job.go
@@ -18,20 +18,17 @@ type DaemonJobScheduler struct {
 	jobStore     jobstore.Store
 	planner      orchestrator.Planner
 	nodeSelector orchestrator.NodeSelector
-	constraints  orchestrator.NodeSelectionConstraints
 }
 
 func NewDaemonJobScheduler(
 	store jobstore.Store,
 	planner orchestrator.Planner,
 	selector orchestrator.NodeSelector,
-	constraints orchestrator.NodeSelectionConstraints,
 ) *DaemonJobScheduler {
 	return &DaemonJobScheduler{
 		jobStore:     store,
 		planner:      planner,
 		nodeSelector: selector,
-		constraints:  constraints,
 	}
 }
@@ -87,12 +84,7 @@ func (b *DaemonJobScheduler) createMissingExecs(
 	ctx context.Context, job *models.Job, plan *models.Plan, existingExecs execSet) (execSet, error) {
 	newExecs := execSet{}
 
-	// Require nodes to be approved and connected to schedule work.
-	nodes, err := b.nodeSelector.AllMatchingNodes(
-		ctx,
-		job,
-		&orchestrator.NodeSelectionConstraints{RequireApproval: true, RequireConnected: true},
-	)
+	nodes, err := b.nodeSelector.AllMatchingNodes(ctx, job)
 	if err != nil {
 		return newExecs, err
 	}
14 changes: 5 additions & 9 deletions pkg/orchestrator/scheduler/daemon_job_test.go
@@ -34,10 +34,6 @@ func (s *DaemonJobSchedulerTestSuite) SetupTest() {
 		s.jobStore,
 		s.planner,
 		s.nodeSelector,
-		orchestrator.NodeSelectionConstraints{
-			RequireConnected: true,
-			RequireApproval:  true,
-		},
 	)
 }
@@ -57,7 +53,7 @@ func (s *DaemonJobSchedulerTestSuite) TestProcess_ShouldCreateNewExecutions() {
 		*fakeNodeInfo(s.T(), nodeIDs[1]),
 		*fakeNodeInfo(s.T(), nodeIDs[2]),
 	}
-	s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), gomock.Any(), gomock.Any()).Return(nodeInfos, nil)
+	s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), gomock.Any()).Return(nodeInfos, nil)
 
 	matcher := NewPlanMatcher(s.T(), PlanMatcherParams{
 		Evaluation: evaluation,
@@ -82,7 +78,7 @@ func (s *DaemonJobSchedulerTestSuite) TestProcess_ShouldNOTMarkJobAsCompleted()
 	executions[1].ComputeState = models.NewExecutionState(models.ExecutionStateCompleted) // Simulate a completed execution
 	s.jobStore.EXPECT().GetJob(gomock.Any(), job.ID).Return(*job, nil)
 	s.jobStore.EXPECT().GetExecutions(gomock.Any(), jobstore.GetExecutionsOptions{JobID: job.ID}).Return(executions, nil)
-	s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), gomock.Any(), gomock.Any()).Return([]models.NodeInfo{}, nil)
+	s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), gomock.Any()).Return([]models.NodeInfo{}, nil)
 
 	// Noop plan
 	matcher := NewPlanMatcher(s.T(), PlanMatcherParams{
@@ -105,7 +101,7 @@ func (s *DaemonJobSchedulerTestSuite) TestProcess_ShouldMarkLostExecutionsOnUnhe
 		*fakeNodeInfo(s.T(), executions[1].NodeID),
 	}
 	s.nodeSelector.EXPECT().AllNodes(gomock.Any()).Return(nodeInfos, nil)
-	s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), job, gomock.Any()).Return(nodeInfos, nil)
+	s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), job).Return(nodeInfos, nil)
 
 	matcher := NewPlanMatcher(s.T(), PlanMatcherParams{
 		Evaluation: evaluation,
@@ -134,7 +130,7 @@ func (s *DaemonJobSchedulerTestSuite) TestProcess_ShouldNOTMarkJobAsFailed() {
 		*fakeNodeInfo(s.T(), executions[1].NodeID),
 	}
 	s.nodeSelector.EXPECT().AllNodes(gomock.Any()).Return(nodeInfos, nil)
-	s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), job, gomock.Any()).Return(nodeInfos, nil)
+	s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), job).Return(nodeInfos, nil)
 
 	matcher := NewPlanMatcher(s.T(), PlanMatcherParams{
 		Evaluation: evaluation,
@@ -170,7 +166,7 @@ func (s *DaemonJobSchedulerTestSuite) TestProcessFail_NoMatchingNodes() {
 	executions := []models.Execution{} // no executions yet
 	s.jobStore.EXPECT().GetJob(gomock.Any(), job.ID).Return(*job, nil)
 	s.jobStore.EXPECT().GetExecutions(gomock.Any(), jobstore.GetExecutionsOptions{JobID: job.ID}).Return(executions, nil)
-	s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), job, gomock.Any()).Return([]models.NodeInfo{}, nil)
+	s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), job).Return([]models.NodeInfo{}, nil)
 
 	// Noop plan
 	matcher := NewPlanMatcher(s.T(), PlanMatcherParams{
24 changes: 8 additions & 16 deletions pkg/orchestrator/scheduler/ops_job.go
@@ -16,23 +16,20 @@ import (
 
 // OpsJobScheduler is a scheduler for batch jobs that run until completion
 type OpsJobScheduler struct {
-	jobStore            jobstore.Store
-	planner             orchestrator.Planner
-	nodeSelector        orchestrator.NodeSelector
-	selectorConstraints orchestrator.NodeSelectionConstraints
+	jobStore jobstore.Store
+	planner  orchestrator.Planner
+	selector orchestrator.NodeSelector
 }
 
 func NewOpsJobScheduler(
 	store jobstore.Store,
 	planner orchestrator.Planner,
 	selector orchestrator.NodeSelector,
-	constraints orchestrator.NodeSelectionConstraints,
 ) *OpsJobScheduler {
 	return &OpsJobScheduler{
-		jobStore:            store,
-		planner:             planner,
-		nodeSelector:        selector,
-		selectorConstraints: constraints,
+		jobStore: store,
+		planner:  planner,
+		selector: selector,
 	}
 }
@@ -65,7 +62,7 @@ func (b *OpsJobScheduler) Process(ctx context.Context, evaluation *models.Evalua
 	}
 
 	// Retrieve the info for all the nodes that have executions for this job
-	nodeInfos, err := existingNodeInfos(ctx, b.nodeSelector, nonTerminalExecs)
+	nodeInfos, err := existingNodeInfos(ctx, b.selector, nonTerminalExecs)
 	if err != nil {
 		return err
 	}
@@ -104,12 +101,7 @@ func (b *OpsJobScheduler) Process(ctx context.Context, evalua
 func (b *OpsJobScheduler) createMissingExecs(
 	ctx context.Context, job *models.Job, plan *models.Plan) (execSet, error) {
 	newExecs := execSet{}
-	nodes, err := b.nodeSelector.AllMatchingNodes(
-		ctx,
-		job, &orchestrator.NodeSelectionConstraints{
-			RequireApproval:  true,
-			RequireConnected: true,
-		})
+	nodes, err := b.selector.AllMatchingNodes(ctx, job)
 	if err != nil {
 		return newExecs, err
 	}
(3 of the 11 changed files are not shown above.)
