fix: require connected and approved nodes for scheduling #3957

Merged · 4 commits · May 8, 2024
36 changes: 14 additions & 22 deletions pkg/node/requester.go
@@ -99,12 +99,6 @@ func NewRequesterNode(
         }),
     )

-    // node selector
-    nodeSelector := selector.NewNodeSelector(selector.NodeSelectorParams{
-        NodeDiscoverer: nodeInfoStore,
-        NodeRanker:     nodeRankerChain,
-    })
-
     // evaluation broker
     evalBroker, err := evaluation.NewInMemoryBroker(evaluation.InMemoryBrokerParams{
         VisibilityTimeout: requesterConfig.EvalBrokerVisibilityTimeout,
@@ -151,26 +145,24 @@ func NewRequesterNode(
         retryStrategy = retryStrategyChain
     }

+    // node selector
+    nodeSelector := selector.NewNodeSelector(
+        nodeInfoStore,
+        nodeRankerChain,
+        // selector constraints: require nodes be online and approved to schedule
+        orchestrator.NodeSelectionConstraints{
+            RequireConnected: true,
+            RequireApproval:  true,
+        },
+    )
+
     // scheduler provider
-    batchServiceJobScheduler := scheduler.NewBatchServiceJobScheduler(scheduler.BatchServiceJobSchedulerParams{
-        JobStore:      jobStore,
-        Planner:       planners,
-        NodeSelector:  nodeSelector,
-        RetryStrategy: retryStrategy,
-    })
+    batchServiceJobScheduler := scheduler.NewBatchServiceJobScheduler(jobStore, planners, nodeSelector, retryStrategy)
Member:

Keeping params in the constructor helps maintain code stability and makes the overall code more maintainable. This ensures that constructor signatures and tests do not require changes when new dependencies are introduced. This also makes the code more readable even if we fail to provide clearly named variables, since the field names label each argument.

Member Author:

I generally get where you are coming from - maintainable code is great! But please hear me out here:

> Keeping params in the constructor helps maintain code stability and makes the overall code more maintainable.

I'd argue it does the opposite: the params pattern doesn't provide compile-time safety. Additionally, whenever a new dependency is introduced, we need to modify the constructor, the call sites, and the params type.

Comparing this with declaring dependencies in the function signature: it provides compile-time safety and reduces the number of modifications we need to make when introducing new dependencies. We only need to update the constructor and the call sites. (And the compiler will complain very loudly in the places we miss, instead of panicking later when the application runs - this is what spurred the additional refactor here.)

> This ensures that constructor signatures and tests do not require changes when new dependencies are introduced

I don't follow. Constructors and tests still need to change when new dependencies are introduced - all fields are required and therefore must be provided.

> This also makes the code more readable even if we fail to provide clearly named variables

I'll acknowledge we have a difference of opinion here. Personally I want to know the type of something rather than its name in a structure.


Given the points regarding compile-time safety and reduced maintenance overhead, would you be alright moving forward with this code as is?
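
To make the tradeoff concrete, a minimal sketch of the two styles under discussion - the types here are stand-ins for illustration, not the actual Bacalhau interfaces:

```go
package example

// Stand-in dependency types, for illustration only.
type JobStore interface{ GetJob(id string) (any, error) }
type Planner interface{ Plan(job any) error }

type Scheduler struct {
	store   JobStore
	planner Planner
}

// Params-object style: adding a new field to SchedulerParams still
// compiles even when existing call sites never set it, so a missing
// dependency surfaces only at runtime (e.g. as a nil dereference).
type SchedulerParams struct {
	Store   JobStore
	Planner Planner
}

func NewSchedulerFromParams(params SchedulerParams) *Scheduler {
	return &Scheduler{store: params.Store, planner: params.Planner}
}

// Positional style: adding a parameter breaks every call site at
// compile time, forcing each one to be updated before the code builds.
func NewScheduler(store JobStore, planner Planner) *Scheduler {
	return &Scheduler{store: store, planner: planner}
}
```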

Member:

Having methods or constructors with long parameter lists is widely considered bad code design in the industry. First time I hear a discussion favouring long parameter lists :)

I hear your point about wanting to fail early at compile-time. It is all about tradeoffs. For example, you picked fx as our dependency injection framework, and that moves a lot of compile-time failures to runtime. You prioritized code readability, maintainability and testability over compile-time checks, and that is okay as the gains are worth the cost.

Cons of long parameter lists include:

  1. Increased error potential as more params increase the likelihood of passing arguments in the wrong order
  2. Poor readability, as methods with many params are harder to understand at a glance; readers may have to open the method implementation to work out what each param does. IDEs might help, but not always
  3. Optional parameters still need to be passed explicitly, even if only as their zero values
  4. Harder to maintain, as adding or modifying parameters requires changes in all your method calls. This is where we differ: you prefer the compiler to fail and force the developer to update all calls. Yes, initialization oversight is a real problem with using a single param object. Still, in practice it makes the code more maintainable for the following reasons:
    1. Compile-time checks are not sufficient to validate the correct initialization of types. They are great for failing fast, but a quiet compiler doesn't mean we can skip runtime validation or that we won't get nil-pointer panics. This is a place where a param object can help with runtime validation. Failing at compile-time is still great, but it is not the only criterion to evaluate tradeoffs against. Runtime validation during node startup can be just as good.
    2. Constructors/methods are usually called once or very few times in production code, but many times in tests to cover different branches and combinations. In practice, with good tests, the parameter object is initialized once with default testing values, and individual tests override just the values they care about. This makes tests more maintainable and readable than having to modify tens of call sites, and that is before even adding any new tests
    3. A lot of fields and configurations are exposed in the constructor mainly for testing purposes, with fallback defaults if not configured, e.g. allowing a clock to be injected for more predictable tests. A param object enables this without complicating the type's initialization

A bonus: fx seems to work well with param objects out of the box (https://uber-go.github.io/fx/parameter-objects.html#using-parameter-objects) - see the sketch after this comment.

Overall I feel silly blocking the PR because of this code style, especially since the number of parameters (4) is still manageable. My concern is that you are planning to change this code design in more places; I expect the scheduler to gain more params soon as I work on the queueing, and our code is still in its early stages, so more parameters might be added in places other than the scheduler.
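
For reference, a minimal sketch of the parameter-object pattern described in the linked fx docs - again with stand-in types rather than the real constructors:

```go
package example

import "go.uber.org/fx"

// Stand-in dependency types, for illustration only.
type JobStore interface{}
type Planner interface{}

// Embedding fx.In turns this into an fx parameter object: fx fills
// each exported field from the dependency graph, so the constructor
// keeps a single-struct signature as new dependencies are added.
type SchedulerParams struct {
	fx.In

	Store   JobStore
	Planner Planner
}

type Scheduler struct {
	store   JobStore
	planner Planner
}

func NewScheduler(params SchedulerParams) *Scheduler {
	return &Scheduler{store: params.Store, planner: params.Planner}
}
```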

Member Author:

There is certainly a limit to consider when declaring function parameters, and when encapsulating them starts to make sense. I completely agree that this isn't a black and white decision - nuance is required depending on the specific context.

> A lot of fields and configurations are exposed in the constructor mainly for testing purposes, with fallback defaults if not configured

In this case I'd prefer to use functional options (we do a good job with this in some places already) as it makes it clearer when parameters are optional vs required. It's also easy to extend later with more options - a minimal sketch follows below.

> My concern is that you are planning to change this code design in more places

I'll probably make a change like this again in the future if it makes sense while refactoring code. But I don't intend to remove param objects everywhere! In this specific case, 4 felt manageable enough to make the change.
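
For completeness, a minimal sketch of the functional-options shape referred to above, assuming a hypothetical optional clock dependency like the testing example mentioned earlier:

```go
package example

import "time"

// Stand-in required dependency, for illustration only.
type JobStore interface{}

type Scheduler struct {
	store JobStore
	clock func() time.Time // optional, defaults to time.Now
}

// Option configures optional Scheduler behaviour.
type Option func(*Scheduler)

// WithClock overrides the clock, e.g. with a fixed clock in tests.
func WithClock(clock func() time.Time) Option {
	return func(s *Scheduler) { s.clock = clock }
}

// Required dependencies stay positional; optional ones arrive as
// variadic options, so the required/optional split is explicit at
// every call site.
func NewScheduler(store JobStore, opts ...Option) *Scheduler {
	s := &Scheduler{store: store, clock: time.Now}
	for _, opt := range opts {
		opt(s)
	}
	return s
}
```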

Member Author:

Since it seems we are in agreement about 4 arguments being manageable here, can this get a ✅? 🙏

     schedulerProvider := orchestrator.NewMappedSchedulerProvider(map[string]orchestrator.Scheduler{
         models.JobTypeBatch:   batchServiceJobScheduler,
         models.JobTypeService: batchServiceJobScheduler,
-        models.JobTypeOps: scheduler.NewOpsJobScheduler(scheduler.OpsJobSchedulerParams{
-            JobStore:     jobStore,
-            Planner:      planners,
-            NodeSelector: nodeSelector,
-        }),
-        models.JobTypeDaemon: scheduler.NewDaemonJobScheduler(scheduler.DaemonJobSchedulerParams{
-            JobStore:     jobStore,
-            Planner:      planners,
-            NodeSelector: nodeSelector,
-        }),
+        models.JobTypeOps:    scheduler.NewOpsJobScheduler(jobStore, planners, nodeSelector),
+        models.JobTypeDaemon: scheduler.NewDaemonJobScheduler(jobStore, planners, nodeSelector),
     })

     workers := make([]*orchestrator.Worker, 0, requesterConfig.WorkerCount)
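
To make the new selector-level constraints concrete, a rough sketch of the kind of filtering a selector could apply - the filterNodes helper and the stand-in types are illustrative assumptions, not the actual implementation:

```go
package example

// Stand-in types mirroring the shapes used in the diff; the real
// models.NodeInfo and orchestrator.NodeSelectionConstraints differ.
type NodeInfo struct {
	Connected bool
	Approved  bool
}

type NodeSelectionConstraints struct {
	RequireConnected bool
	RequireApproval  bool
}

// filterNodes is a hypothetical helper showing how a selector could
// enforce the constraints it was constructed with.
func filterNodes(nodes []NodeInfo, c NodeSelectionConstraints) []NodeInfo {
	matched := make([]NodeInfo, 0, len(nodes))
	for _, n := range nodes {
		if c.RequireConnected && !n.Connected {
			continue // node is not currently connected to the orchestrator
		}
		if c.RequireApproval && !n.Approved {
			continue // node has not been approved by an operator
		}
		matched = append(matched, n)
	}
	return matched
}
```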
4 changes: 2 additions & 2 deletions pkg/orchestrator/interfaces.go
@@ -106,11 +106,11 @@ type NodeSelector interface {
     AllNodes(ctx context.Context) ([]models.NodeInfo, error)

     // AllMatchingNodes returns all nodes that match the job constrains and selection criteria.
-    AllMatchingNodes(ctx context.Context, job *models.Job, constraints *NodeSelectionConstraints) ([]models.NodeInfo, error)
+    AllMatchingNodes(ctx context.Context, job *models.Job) ([]models.NodeInfo, error)

     // TopMatchingNodes return the top ranked desiredCount number of nodes that match job constraints
     // ordered in descending order based on their rank, or error if not enough nodes match.
-    TopMatchingNodes(ctx context.Context, job *models.Job, desiredCount int, constraints *NodeSelectionConstraints) ([]models.NodeInfo, error)
+    TopMatchingNodes(ctx context.Context, job *models.Job, desiredCount int) ([]models.NodeInfo, error)
 }

 type RetryStrategy interface {
36 changes: 21 additions & 15 deletions pkg/orchestrator/mocks.go

Some generated files are not rendered by default.

20 changes: 7 additions & 13 deletions pkg/orchestrator/scheduler/batch_job_test.go
@@ -49,12 +49,12 @@ func (s *BatchJobSchedulerTestSuite) SetupTest() {
     s.nodeSelector = orchestrator.NewMockNodeSelector(ctrl)
     s.retryStrategy = retry.NewFixedStrategy(retry.FixedStrategyParams{ShouldRetry: true})

-    s.scheduler = NewBatchServiceJobScheduler(BatchServiceJobSchedulerParams{
-        JobStore:      s.jobStore,
-        Planner:       s.planner,
-        NodeSelector:  s.nodeSelector,
-        RetryStrategy: s.retryStrategy,
-    })
+    s.scheduler = NewBatchServiceJobScheduler(
+        s.jobStore,
+        s.planner,
+        s.nodeSelector,
+        s.retryStrategy,
+    )
 }

 func TestBatchSchedulerTestSuite(t *testing.T) {

@@ -304,19 +304,13 @@ func (s *BatchJobSchedulerTestSuite) TestProcess_ShouldMarkJobAsFailed_NoRetry()
 }

 func (s *BatchJobSchedulerTestSuite) mockNodeSelection(job *models.Job, nodeInfos []models.NodeInfo, desiredCount int) {
-    constraints := &orchestrator.NodeSelectionConstraints{
-        RequireApproval:  false,
-        RequireConnected: false,
-    }
-
     if len(nodeInfos) < desiredCount {
-        s.nodeSelector.EXPECT().TopMatchingNodes(gomock.Any(), job, desiredCount, constraints).Return(nil, orchestrator.ErrNotEnoughNodes{})
+        s.nodeSelector.EXPECT().TopMatchingNodes(gomock.Any(), job, desiredCount).Return(nil, orchestrator.ErrNotEnoughNodes{})
     } else {
         s.nodeSelector.EXPECT().TopMatchingNodes(
             gomock.Any(),
             job,
             desiredCount,
-            constraints,
         ).Return(nodeInfos, nil)
     }
 }
39 changes: 14 additions & 25 deletions pkg/orchestrator/scheduler/batch_service_job.go
@@ -20,23 +20,21 @@ import (
 type BatchServiceJobScheduler struct {
     jobStore      jobstore.Store
     planner       orchestrator.Planner
-    nodeSelector  orchestrator.NodeSelector
+    selector      orchestrator.NodeSelector
     retryStrategy orchestrator.RetryStrategy
 }

-type BatchServiceJobSchedulerParams struct {
-    JobStore      jobstore.Store
-    Planner       orchestrator.Planner
-    NodeSelector  orchestrator.NodeSelector
-    RetryStrategy orchestrator.RetryStrategy
-}
-
-func NewBatchServiceJobScheduler(params BatchServiceJobSchedulerParams) *BatchServiceJobScheduler {
+func NewBatchServiceJobScheduler(
+    store jobstore.Store,
+    planner orchestrator.Planner,
+    selector orchestrator.NodeSelector,
+    strategy orchestrator.RetryStrategy,
+) *BatchServiceJobScheduler {
     return &BatchServiceJobScheduler{
-        jobStore:      params.JobStore,
-        planner:       params.Planner,
-        nodeSelector:  params.NodeSelector,
-        retryStrategy: params.RetryStrategy,
+        jobStore:      store,
+        planner:       planner,
+        selector:      selector,
+        retryStrategy: strategy,
     }
 }

@@ -69,7 +67,7 @@ func (b *BatchServiceJobScheduler) Process(ctx context.Context, evaluation *mode
     }

     // Retrieve the info for all the nodes that have executions for this job
-    nodeInfos, err := existingNodeInfos(ctx, b.nodeSelector, nonTerminalExecs)
+    nodeInfos, err := existingNodeInfos(ctx, b.selector, nonTerminalExecs)
     if err != nil {
         return err
     }

@@ -155,16 +153,7 @@ func (b *BatchServiceJobScheduler) createMissingExecs(
 // placeExecs places the executions
 func (b *BatchServiceJobScheduler) placeExecs(ctx context.Context, execs execSet, job *models.Job) error {
     if len(execs) > 0 {
-        // TODO: Remove the options once we are ready to enforce that only connected/approved nodes can be used
-        selectedNodes, err := b.nodeSelector.TopMatchingNodes(
-            ctx,
-            job,
-            len(execs),
-            &orchestrator.NodeSelectionConstraints{
-                RequireApproval:  false,
-                RequireConnected: false,
-            },
-        )
+        selectedNodes, err := b.selector.TopMatchingNodes(ctx, job, len(execs))
         if err != nil {
             return err
         }

@@ -178,7 +167,7 @@ func (b *BatchServiceJobScheduler) placeExecs(ctx context.Context, execs execSet
 }

 func (b *BatchServiceJobScheduler) handleFailure(nonTerminalExecs execSet, failed execSet, plan *models.Plan, err error) {
-    // TODO: allow scheduling retries in a later time if don't find nodes instead of failing the job
+    // TODO(walid): allow scheduling retries in a later time if don't find nodes instead of failing the job
     // mark all non-terminal executions as failed
     nonTerminalExecs.markStopped(plan.Event, plan)

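This hunk carries the PR's headline change for batch and service jobs: the old call explicitly opted out of both constraints (with a TODO to remove that escape hatch), while the new call inherits the connected-and-approved requirement from the selector built in requester.go.
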
25 changes: 9 additions & 16 deletions pkg/orchestrator/scheduler/daemon_job.go
@@ -20,17 +20,15 @@ type DaemonJobScheduler struct {
     nodeSelector orchestrator.NodeSelector
 }

-type DaemonJobSchedulerParams struct {
-    JobStore     jobstore.Store
-    Planner      orchestrator.Planner
-    NodeSelector orchestrator.NodeSelector
-}
-
-func NewDaemonJobScheduler(params DaemonJobSchedulerParams) *DaemonJobScheduler {
+func NewDaemonJobScheduler(
+    store jobstore.Store,
+    planner orchestrator.Planner,
+    selector orchestrator.NodeSelector,
+) *DaemonJobScheduler {
     return &DaemonJobScheduler{
-        jobStore:     params.JobStore,
-        planner:      params.Planner,
-        nodeSelector: params.NodeSelector,
+        jobStore:     store,
+        planner:      planner,
+        nodeSelector: selector,
     }
 }

@@ -86,12 +84,7 @@ func (b *DaemonJobScheduler) createMissingExecs(
     ctx context.Context, job *models.Job, plan *models.Plan, existingExecs execSet) (execSet, error) {
     newExecs := execSet{}

-    // Require approval when selecting nodes, but do not require them to be connected.
-    nodes, err := b.nodeSelector.AllMatchingNodes(
-        ctx,
-        job,
-        &orchestrator.NodeSelectionConstraints{RequireApproval: true, RequireConnected: false},
-    )
+    nodes, err := b.nodeSelector.AllMatchingNodes(ctx, job)
     if err != nil {
         return newExecs, err
     }
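
Note the behavioral shift here beyond the constructor cleanup: the daemon scheduler previously asked for approved but not necessarily connected nodes on each call, whereas the selector is now constructed once with RequireConnected: true and RequireApproval: true, so daemon jobs are also restricted to connected nodes.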
20 changes: 10 additions & 10 deletions pkg/orchestrator/scheduler/daemon_job_test.go
@@ -30,11 +30,11 @@ func (s *DaemonJobSchedulerTestSuite) SetupTest() {
     s.planner = orchestrator.NewMockPlanner(ctrl)
     s.nodeSelector = orchestrator.NewMockNodeSelector(ctrl)

-    s.scheduler = NewDaemonJobScheduler(DaemonJobSchedulerParams{
-        JobStore:     s.jobStore,
-        Planner:      s.planner,
-        NodeSelector: s.nodeSelector,
-    })
+    s.scheduler = NewDaemonJobScheduler(
+        s.jobStore,
+        s.planner,
+        s.nodeSelector,
+    )
 }

 func TestDaemonJobSchedulerTestSuite(t *testing.T) {

@@ -53,7 +53,7 @@ func (s *DaemonJobSchedulerTestSuite) TestProcess_ShouldCreateNewExecutions() {
         *fakeNodeInfo(s.T(), nodeIDs[1]),
         *fakeNodeInfo(s.T(), nodeIDs[2]),
     }
-    s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), gomock.Any(), gomock.Any()).Return(nodeInfos, nil)
+    s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), gomock.Any()).Return(nodeInfos, nil)

     matcher := NewPlanMatcher(s.T(), PlanMatcherParams{
         Evaluation: evaluation,

@@ -78,7 +78,7 @@ func (s *DaemonJobSchedulerTestSuite) TestProcess_ShouldNOTMarkJobAsCompleted()
     executions[1].ComputeState = models.NewExecutionState(models.ExecutionStateCompleted) // Simulate a completed execution
     s.jobStore.EXPECT().GetJob(gomock.Any(), job.ID).Return(*job, nil)
     s.jobStore.EXPECT().GetExecutions(gomock.Any(), jobstore.GetExecutionsOptions{JobID: job.ID}).Return(executions, nil)
-    s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), gomock.Any(), gomock.Any()).Return([]models.NodeInfo{}, nil)
+    s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), gomock.Any()).Return([]models.NodeInfo{}, nil)

     // Noop plan
     matcher := NewPlanMatcher(s.T(), PlanMatcherParams{

@@ -101,7 +101,7 @@ func (s *DaemonJobSchedulerTestSuite) TestProcess_ShouldMarkLostExecutionsOnUnhe
         *fakeNodeInfo(s.T(), executions[1].NodeID),
     }
     s.nodeSelector.EXPECT().AllNodes(gomock.Any()).Return(nodeInfos, nil)
-    s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), job, gomock.Any()).Return(nodeInfos, nil)
+    s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), job).Return(nodeInfos, nil)

     matcher := NewPlanMatcher(s.T(), PlanMatcherParams{
         Evaluation: evaluation,

@@ -130,7 +130,7 @@ func (s *DaemonJobSchedulerTestSuite) TestProcess_ShouldNOTMarkJobAsFailed() {
         *fakeNodeInfo(s.T(), executions[1].NodeID),
     }
     s.nodeSelector.EXPECT().AllNodes(gomock.Any()).Return(nodeInfos, nil)
-    s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), job, gomock.Any()).Return(nodeInfos, nil)
+    s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), job).Return(nodeInfos, nil)

     matcher := NewPlanMatcher(s.T(), PlanMatcherParams{
         Evaluation: evaluation,

@@ -166,7 +166,7 @@ func (s *DaemonJobSchedulerTestSuite) TestProcessFail_NoMatchingNodes() {
     executions := []models.Execution{} // no executions yet
     s.jobStore.EXPECT().GetJob(gomock.Any(), job.ID).Return(*job, nil)
     s.jobStore.EXPECT().GetExecutions(gomock.Any(), jobstore.GetExecutionsOptions{JobID: job.ID}).Return(executions, nil)
-    s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), job, gomock.Any()).Return([]models.NodeInfo{}, nil)
+    s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), job).Return([]models.NodeInfo{}, nil)

     // Noop plan
     matcher := NewPlanMatcher(s.T(), PlanMatcherParams{