Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optionally constrain to connected, approved compute nodes when selecting/ranking them #3768

Merged
merged 6 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion cmd/cli/serve/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/compute/store/boltdb"
"github.com/bacalhau-project/bacalhau/pkg/jobstore"
boltjobstore "github.com/bacalhau-project/bacalhau/pkg/jobstore/boltdb"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/util/idgen"
pkgerrors "github.com/pkg/errors"
"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -91,7 +92,8 @@ func GetRequesterConfig(ctx context.Context, createJobStore bool) (node.Requeste
return node.RequesterConfig{}, pkgerrors.Wrapf(err, "failed to create job store")
}
}
return node.NewRequesterConfigWith(node.RequesterConfigParams{

requesterConfig, err := node.NewRequesterConfigWith(node.RequesterConfigParams{
JobDefaults: transformer.JobDefaults{
ExecutionTimeout: time.Duration(cfg.JobDefaults.ExecutionTimeout),
},
Expand Down Expand Up @@ -120,6 +122,17 @@ func GetRequesterConfig(ctx context.Context, createJobStore bool) (node.Requeste
JobStore: jobStore,
DefaultPublisher: cfg.DefaultPublisher,
})
if err != nil {
return node.RequesterConfig{}, err
}

if cfg.ManualNodeApproval {
requesterConfig.DefaultApprovalState = models.NodeApprovals.PENDING
} else {
requesterConfig.DefaultApprovalState = models.NodeApprovals.APPROVED
}

return requesterConfig, nil
}

func getNodeType() (requester, compute bool, err error) {
Expand Down
1 change: 1 addition & 0 deletions pkg/config/types/generated_constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ const NodeRequesterControlPlaneSettings = "Node.Requester.ControlPlaneSettings"
const NodeRequesterControlPlaneSettingsHeartbeatCheckFrequency = "Node.Requester.ControlPlaneSettings.HeartbeatCheckFrequency"
const NodeRequesterControlPlaneSettingsHeartbeatTopic = "Node.Requester.ControlPlaneSettings.HeartbeatTopic"
const NodeRequesterControlPlaneSettingsNodeDisconnectedAfter = "Node.Requester.ControlPlaneSettings.NodeDisconnectedAfter"
const NodeRequesterManualNodeApproval = "Node.Requester.ManualNodeApproval"
const NodeBootstrapAddresses = "Node.BootstrapAddresses"
const NodeDownloadURLRequestRetries = "Node.DownloadURLRequestRetries"
const NodeDownloadURLRequestTimeout = "Node.DownloadURLRequestTimeout"
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/types/generated_viper_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func SetDefaults(cfg BacalhauConfig, opts ...SetOption) {
p.Viper.SetDefault(NodeRequesterControlPlaneSettingsHeartbeatCheckFrequency, cfg.Node.Requester.ControlPlaneSettings.HeartbeatCheckFrequency.AsTimeDuration())
p.Viper.SetDefault(NodeRequesterControlPlaneSettingsHeartbeatTopic, cfg.Node.Requester.ControlPlaneSettings.HeartbeatTopic)
p.Viper.SetDefault(NodeRequesterControlPlaneSettingsNodeDisconnectedAfter, cfg.Node.Requester.ControlPlaneSettings.NodeDisconnectedAfter.AsTimeDuration())
p.Viper.SetDefault(NodeRequesterManualNodeApproval, cfg.Node.Requester.ManualNodeApproval)
p.Viper.SetDefault(NodeBootstrapAddresses, cfg.Node.BootstrapAddresses)
p.Viper.SetDefault(NodeDownloadURLRequestRetries, cfg.Node.DownloadURLRequestRetries)
p.Viper.SetDefault(NodeDownloadURLRequestTimeout, cfg.Node.DownloadURLRequestTimeout.AsTimeDuration())
Expand Down Expand Up @@ -361,6 +362,7 @@ func Set(cfg BacalhauConfig, opts ...SetOption) {
p.Viper.Set(NodeRequesterControlPlaneSettingsHeartbeatCheckFrequency, cfg.Node.Requester.ControlPlaneSettings.HeartbeatCheckFrequency.AsTimeDuration())
p.Viper.Set(NodeRequesterControlPlaneSettingsHeartbeatTopic, cfg.Node.Requester.ControlPlaneSettings.HeartbeatTopic)
p.Viper.Set(NodeRequesterControlPlaneSettingsNodeDisconnectedAfter, cfg.Node.Requester.ControlPlaneSettings.NodeDisconnectedAfter.AsTimeDuration())
p.Viper.Set(NodeRequesterManualNodeApproval, cfg.Node.Requester.ManualNodeApproval)
p.Viper.Set(NodeBootstrapAddresses, cfg.Node.BootstrapAddresses)
p.Viper.Set(NodeDownloadURLRequestRetries, cfg.Node.DownloadURLRequestRetries)
p.Viper.Set(NodeDownloadURLRequestTimeout, cfg.Node.DownloadURLRequestTimeout.AsTimeDuration())
Expand Down
5 changes: 5 additions & 0 deletions pkg/config/types/requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ type RequesterConfig struct {
DefaultPublisher string `yaml:"DefaultPublisher"`

ControlPlaneSettings RequesterControlPlaneConfig `yaml:"ControlPlaneSettings"`

// ManualNodeApproval is a flag that determines whether nodes must be manually approved.
// By default, nodes are auto-approved to simplify upgrades. When this property is set to
// true, nodes must be manually approved before they are included in node selection.
ManualNodeApproval bool `yaml:"ManualNodeApproval"`
}

type EvaluationBrokerConfig struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/types/storagetype_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/devstack/devstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,10 @@ func Setup(
}
}

// Set the default approval state from the config provided, either PENDING if the user has
// chosen manual approval, or the default otherwise.
nodeConfig.RequesterNodeConfig.DefaultApprovalState = stackConfig.RequesterConfig.DefaultApprovalState

// Create dedicated store paths for each node
err = setStorePaths(ctx, fsRepo, &nodeConfig)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions pkg/devstack/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/rs/zerolog"

"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/node"
"github.com/bacalhau-project/bacalhau/pkg/routing"
)
Expand Down Expand Up @@ -121,6 +122,12 @@ func (o *DevStackConfig) Validate() error {
return errs
}

// WithAutoNodeApproval returns a ConfigOption that sets the requester config's
// DefaultApprovalState to models.NodeApprovals.APPROVED.
func WithAutoNodeApproval() ConfigOption {
	autoApprove := func(stack *DevStackConfig) {
		stack.RequesterConfig.DefaultApprovalState = models.NodeApprovals.APPROVED
	}
	return autoApprove
}

func WithNodeOverrides(overrides ...node.NodeConfig) ConfigOption {
return func(cfg *DevStackConfig) {
cfg.NodeOverrides = overrides
Expand Down
4 changes: 4 additions & 0 deletions pkg/node/config_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ var DefaultRequesterConfig = RequesterConfigParams{
HeartbeatTopic: "heartbeat",
NodeDisconnectedAfter: types.Duration(30 * time.Second), //nolint:gomnd
},

DefaultApprovalState: models.NodeApprovals.APPROVED,
}

var TestRequesterConfig = RequesterConfigParams{
Expand Down Expand Up @@ -108,6 +110,8 @@ var TestRequesterConfig = RequesterConfigParams{
HeartbeatTopic: "heartbeat",
NodeDisconnectedAfter: types.Duration(30 * time.Second), //nolint:gomnd
},

DefaultApprovalState: models.NodeApprovals.APPROVED,
}

func getRequesterConfigParams() RequesterConfigParams {
Expand Down
5 changes: 5 additions & 0 deletions pkg/node/config_requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ type RequesterConfigParams struct {

DefaultPublisher string

// DefaultApprovalState is the approval state assigned to new nodes when they join the
// cluster. It is APPROVED both by default and for tests. We will provide an option to set
// this to PENDING for production or for when operators are ready to control node approval.
DefaultApprovalState models.NodeApproval

ControlPlaneSettings types.RequesterControlPlaneConfig
}

Expand Down
23 changes: 12 additions & 11 deletions pkg/node/manager/node_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,26 @@ const (
// also provides operations for querying and managing compute
// node information.
type NodeManager struct {
nodeInfo routing.NodeInfoStore
resourceMap *concurrency.StripedMap[models.Resources]
heartbeats *heartbeat.HeartbeatServer
nodeInfo routing.NodeInfoStore
resourceMap *concurrency.StripedMap[models.Resources]
heartbeats *heartbeat.HeartbeatServer
defaultApprovalState models.NodeApproval
}

type NodeManagerParams struct {
NodeInfo routing.NodeInfoStore
Heartbeats *heartbeat.HeartbeatServer
NodeInfo routing.NodeInfoStore
Heartbeats *heartbeat.HeartbeatServer
DefaultApprovalState models.NodeApproval
}

// NewNodeManager constructs a new node manager and returns a pointer
// to the structure.
func NewNodeManager(params NodeManagerParams) *NodeManager {
return &NodeManager{
resourceMap: concurrency.NewStripedMap[models.Resources](resourceMapLockCount),
nodeInfo: params.NodeInfo,
heartbeats: params.Heartbeats,
resourceMap: concurrency.NewStripedMap[models.Resources](resourceMapLockCount),
nodeInfo: params.NodeInfo,
heartbeats: params.Heartbeats,
defaultApprovalState: params.DefaultApprovalState,
}
}

Expand Down Expand Up @@ -83,9 +86,7 @@ func (n *NodeManager) Register(ctx context.Context, request requests.RegisterReq
}, nil
}

// TODO: We will default to PENDING, but once we start filtering on NodeApprovals.APPROVED we will need to
// make a decision on how this is determined.
request.Info.Approval = models.NodeApprovals.PENDING
request.Info.Approval = n.defaultApprovalState

if err := n.nodeInfo.Add(ctx, request.Info); err != nil {
return nil, errors.Wrap(err, "failed to save nodeinfo during node registration")
Expand Down
5 changes: 3 additions & 2 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,9 @@ func NewNode(
// to the network. Provide it with a mechanism to lookup (and enhance)
// node info, and a reference to the heartbeat server if running NATS.
nodeManager := manager.NewNodeManager(manager.NodeManagerParams{
NodeInfo: tracingInfoStore,
Heartbeats: heartbeatSvr,
NodeInfo: tracingInfoStore,
Heartbeats: heartbeatSvr,
DefaultApprovalState: config.RequesterNodeConfig.DefaultApprovalState,
})

// Start the nodemanager, ensuring it doesn't block the main thread and
Expand Down
4 changes: 4 additions & 0 deletions pkg/node/requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ func NewRequesterNode(
}),
)

log.Ctx(ctx).
Info().
Msgf("Nodes joining the cluster will be assigned approval state: %s", requesterConfig.DefaultApprovalState.String())

// compute node ranker
nodeRankerChain := ranking.NewChain()
nodeRankerChain.Add(
Expand Down
6 changes: 4 additions & 2 deletions pkg/orchestrator/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,13 @@ type NodeRanker interface {
type NodeSelector interface {
// AllNodes returns all nodes in the network.
AllNodes(ctx context.Context) ([]models.NodeInfo, error)

// AllMatchingNodes returns all nodes that match the job constraints and selection criteria.
AllMatchingNodes(ctx context.Context, job *models.Job) ([]models.NodeInfo, error)
AllMatchingNodes(ctx context.Context, job *models.Job, constraints *NodeSelectionConstraints) ([]models.NodeInfo, error)

// TopMatchingNodes returns the top-ranked desiredCount nodes that match the job constraints,
// ordered in descending order based on their rank, or an error if not enough nodes match.
TopMatchingNodes(ctx context.Context, job *models.Job, desiredCount int) ([]models.NodeInfo, error)
TopMatchingNodes(ctx context.Context, job *models.Job, desiredCount int, constraints *NodeSelectionConstraints) ([]models.NodeInfo, error)
}

type RetryStrategy interface {
Expand Down
16 changes: 8 additions & 8 deletions pkg/orchestrator/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 12 additions & 2 deletions pkg/orchestrator/scheduler/batch_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,20 @@ func (s *BatchJobSchedulerTestSuite) TestProcess_ShouldMarkJobAsFailed_NoRetry()
}

func (s *BatchJobSchedulerTestSuite) mockNodeSelection(job *models.Job, nodeInfos []models.NodeInfo, desiredCount int) {
constraints := &orchestrator.NodeSelectionConstraints{
RequireApproval: false,
RequireConnected: false,
}

if len(nodeInfos) < desiredCount {
s.nodeSelector.EXPECT().TopMatchingNodes(gomock.Any(), job, desiredCount).Return(nil, orchestrator.ErrNotEnoughNodes{})
s.nodeSelector.EXPECT().TopMatchingNodes(gomock.Any(), job, desiredCount, constraints).Return(nil, orchestrator.ErrNotEnoughNodes{})
} else {
s.nodeSelector.EXPECT().TopMatchingNodes(gomock.Any(), job, desiredCount).Return(nodeInfos, nil)
s.nodeSelector.EXPECT().TopMatchingNodes(
gomock.Any(),
job,
desiredCount,
constraints,
).Return(nodeInfos, nil)
}
}

Expand Down
11 changes: 10 additions & 1 deletion pkg/orchestrator/scheduler/batch_service_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,16 @@ func (b *BatchServiceJobScheduler) createMissingExecs(
// placeExecs places the executions
func (b *BatchServiceJobScheduler) placeExecs(ctx context.Context, execs execSet, job *models.Job) error {
if len(execs) > 0 {
selectedNodes, err := b.nodeSelector.TopMatchingNodes(ctx, job, len(execs))
// TODO: Remove the options once we are ready to enforce that only connected/approved nodes can be used
selectedNodes, err := b.nodeSelector.TopMatchingNodes(
ctx,
job,
len(execs),
&orchestrator.NodeSelectionConstraints{
RequireApproval: false,
RequireConnected: false,
},
)
if err != nil {
return err
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/orchestrator/scheduler/daemon_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,13 @@ func (b *DaemonJobScheduler) Process(ctx context.Context, evaluation *models.Eva
func (b *DaemonJobScheduler) createMissingExecs(
ctx context.Context, job *models.Job, plan *models.Plan, existingExecs execSet) (execSet, error) {
newExecs := execSet{}
nodes, err := b.nodeSelector.AllMatchingNodes(ctx, job)

// Require approval when selecting nodes, but do not require them to be connected.
nodes, err := b.nodeSelector.AllMatchingNodes(
ctx,
job,
&orchestrator.NodeSelectionConstraints{RequireApproval: true, RequireConnected: false},
)
if err != nil {
return newExecs, err
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/orchestrator/scheduler/daemon_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (s *DaemonJobSchedulerTestSuite) TestProcess_ShouldCreateNewExecutions() {
*mockNodeInfo(s.T(), nodeIDs[1]),
*mockNodeInfo(s.T(), nodeIDs[2]),
}
s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), gomock.Any()).Return(nodeInfos, nil)
s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), gomock.Any(), gomock.Any()).Return(nodeInfos, nil)

matcher := NewPlanMatcher(s.T(), PlanMatcherParams{
Evaluation: evaluation,
Expand All @@ -77,7 +77,7 @@ func (s *DaemonJobSchedulerTestSuite) TestProcess_ShouldNOTMarkJobAsCompleted()
executions[1].ComputeState = models.NewExecutionState(models.ExecutionStateCompleted) // Simulate a completed execution
s.jobStore.EXPECT().GetJob(gomock.Any(), job.ID).Return(*job, nil)
s.jobStore.EXPECT().GetExecutions(gomock.Any(), jobstore.GetExecutionsOptions{JobID: job.ID}).Return(executions, nil)
s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), gomock.Any()).Return([]models.NodeInfo{}, nil)
s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), gomock.Any(), gomock.Any()).Return([]models.NodeInfo{}, nil)

// Noop plan
matcher := NewPlanMatcher(s.T(), PlanMatcherParams{
Expand All @@ -100,7 +100,7 @@ func (s *DaemonJobSchedulerTestSuite) TestProcess_ShouldMarkLostExecutionsOnUnhe
*mockNodeInfo(s.T(), executions[1].NodeID),
}
s.nodeSelector.EXPECT().AllNodes(gomock.Any()).Return(nodeInfos, nil)
s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), job).Return(nodeInfos, nil)
s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), job, gomock.Any()).Return(nodeInfos, nil)

matcher := NewPlanMatcher(s.T(), PlanMatcherParams{
Evaluation: evaluation,
Expand Down Expand Up @@ -129,7 +129,7 @@ func (s *DaemonJobSchedulerTestSuite) TestProcess_ShouldNOTMarkJobAsFailed() {
*mockNodeInfo(s.T(), executions[1].NodeID),
}
s.nodeSelector.EXPECT().AllNodes(gomock.Any()).Return(nodeInfos, nil)
s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), job).Return(nodeInfos, nil)
s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), job, gomock.Any()).Return(nodeInfos, nil)

matcher := NewPlanMatcher(s.T(), PlanMatcherParams{
Evaluation: evaluation,
Expand Down Expand Up @@ -165,7 +165,7 @@ func (s *DaemonJobSchedulerTestSuite) TestProcessFail_NoMatchingNodes() {
executions := []models.Execution{} // no executions yet
s.jobStore.EXPECT().GetJob(gomock.Any(), job.ID).Return(*job, nil)
s.jobStore.EXPECT().GetExecutions(gomock.Any(), jobstore.GetExecutionsOptions{JobID: job.ID}).Return(executions, nil)
s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), job).Return([]models.NodeInfo{}, nil)
s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), job, gomock.Any()).Return([]models.NodeInfo{}, nil)

// Noop plan
matcher := NewPlanMatcher(s.T(), PlanMatcherParams{
Expand Down
7 changes: 6 additions & 1 deletion pkg/orchestrator/scheduler/ops_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,12 @@ func (b *OpsJobScheduler) Process(ctx context.Context, evaluation *models.Evalua
func (b *OpsJobScheduler) createMissingExecs(
ctx context.Context, job *models.Job, plan *models.Plan) (execSet, error) {
newExecs := execSet{}
nodes, err := b.nodeSelector.AllMatchingNodes(ctx, job)
nodes, err := b.nodeSelector.AllMatchingNodes(
ctx,
job, &orchestrator.NodeSelectionConstraints{
RequireApproval: true,
RequireConnected: false,
})
if err != nil {
return newExecs, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/orchestrator/scheduler/ops_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (s *OpsJobSchedulerTestSuite) TestProcessFail_NoMatchingNodes() {
}

func (s *OpsJobSchedulerTestSuite) mockNodeSelection(job *models.Job, nodeInfos []models.NodeInfo) {
s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), job).Return(nodeInfos, nil)
s.nodeSelector.EXPECT().AllMatchingNodes(gomock.Any(), job, gomock.Any()).Return(nodeInfos, nil)
}

func mockOpsJob() (*models.Job, []models.Execution, *models.Evaluation) {
Expand Down
Loading
Loading