Skip to content

Commit

Permalink
Simplify labels for compute node (#3637)
Browse files Browse the repository at this point in the history
Provides compute node with the configured labels and lets it manage its
own labels. Simplifies the label management in the Node{} and ensure
that both node types get Runtime labels and configured labels, with
Compute nodes having the usual extra labels (e.g. GPU)

At the same time (and as it affects the node list output) we default
libp2p nodes to APPROVED to avoid confusion.

Fixes #3630
  • Loading branch information
rossjones committed Mar 14, 2024
1 parent d426325 commit 50d6afa
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 22 deletions.
2 changes: 2 additions & 0 deletions pkg/node/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func NewComputeNode(
publishers publisher.PublisherProvider,
computeCallback compute.Callback,
managementProxy compute.ManagementEndpoint,
configuredLabels map[string]string,
) (*Compute, error) {
executionStore := config.ExecutionStore

Expand Down Expand Up @@ -227,6 +228,7 @@ func NewComputeNode(

// Node labels
labelsProvider := models.MergeLabelsInOrder(
&ConfigLabelsProvider{staticLabels: configuredLabels},
&RuntimeLabelsProvider{},
capacity.NewGPULabelsProvider(config.TotalResourceLimits),
repo_storage.NewLabelsProvider(),
Expand Down
19 changes: 11 additions & 8 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,10 @@ func NewNode(

var requesterNode *Requester
var computeNode *Compute
var labelsProvider models.LabelsProvider = &ConfigLabelsProvider{staticLabels: config.Labels}
labelsProvider := models.MergeLabelsInOrder(
&ConfigLabelsProvider{staticLabels: config.Labels},
&RuntimeLabelsProvider{},
)

// setup requester node
if config.IsRequesterNode {
Expand Down Expand Up @@ -353,6 +356,7 @@ func NewNode(
publishers,
transportLayer.CallbackProxy(),
transportLayer.ManagementProxy(),
config.Labels,
)
if err != nil {
return nil, err
Expand All @@ -363,17 +367,16 @@ func NewNode(
return nil, err
}

labelsProvider = models.MergeLabelsInOrder(
computeNode.autoLabelsProvider,
labelsProvider,
)
debugInfoProviders = append(debugInfoProviders, computeNode.debugInfoProviders...)
}

// Create a node info provider for LibP2P, and specify the default node approval state
// of Approved to avoid confusion as approval state is not used for this transport type.
nodeInfoProvider := routing.NewNodeInfoProvider(routing.NodeInfoProviderParams{
NodeID: config.NodeID,
LabelsProvider: labelsProvider,
BacalhauVersion: *version.Get(),
NodeID: config.NodeID,
LabelsProvider: labelsProvider,
BacalhauVersion: *version.Get(),
DefaultNodeApproval: models.NodeApprovals.APPROVED,
})
nodeInfoProvider.RegisterNodeInfoDecorator(transportLayer.NodeInfoDecorator())
if computeNode != nil {
Expand Down
41 changes: 28 additions & 13 deletions pkg/routing/node_info_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,35 @@ import (
)

type NodeInfoProviderParams struct {
NodeID string
LabelsProvider models.LabelsProvider
BacalhauVersion models.BuildVersionInfo
NodeID string
LabelsProvider models.LabelsProvider
BacalhauVersion models.BuildVersionInfo
DefaultNodeApproval models.NodeApproval
}

type NodeInfoProvider struct {
nodeID string
labelsProvider models.LabelsProvider
bacalhauVersion models.BuildVersionInfo
nodeInfoDecorators []models.NodeInfoDecorator
nodeID string
labelsProvider models.LabelsProvider
bacalhauVersion models.BuildVersionInfo
nodeInfoDecorators []models.NodeInfoDecorator
defaultNodeApproval models.NodeApproval
}

func NewNodeInfoProvider(params NodeInfoProviderParams) *NodeInfoProvider {
return &NodeInfoProvider{
nodeID: params.NodeID,
labelsProvider: params.LabelsProvider,
bacalhauVersion: params.BacalhauVersion,
nodeInfoDecorators: make([]models.NodeInfoDecorator, 0),
provider := &NodeInfoProvider{
nodeID: params.NodeID,
labelsProvider: params.LabelsProvider,
bacalhauVersion: params.BacalhauVersion,
nodeInfoDecorators: make([]models.NodeInfoDecorator, 0),
defaultNodeApproval: params.DefaultNodeApproval,
}

// If we were not given a default approval, we default to PENDING
if !provider.defaultNodeApproval.IsValid() {
provider.defaultNodeApproval = models.NodeApprovals.PENDING
}

return provider
}

// RegisterNodeInfoDecorator registers a node info decorator with the node info provider.
Expand All @@ -39,11 +49,16 @@ func (n *NodeInfoProvider) GetNodeInfo(ctx context.Context) models.NodeInfo {
BacalhauVersion: n.bacalhauVersion,
Labels: n.labelsProvider.GetLabels(ctx),
NodeType: models.NodeTypeRequester,
Approval: models.NodeApprovals.PENDING,
Approval: n.defaultNodeApproval,
}
for _, decorator := range n.nodeInfoDecorators {
res = decorator.DecorateNodeInfo(ctx, res)
}

if !res.Approval.IsValid() {
res.Approval = models.NodeApprovals.PENDING
}

return res
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/test/compute/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ func (s *ComputeSuite) setupNode() {
provider.NewNoopProvider[executor.Executor](s.executor),
provider.NewNoopProvider[publisher.Publisher](s.publisher),
callback,
nil, // until we switch to testing with NATS
nil, // until we switch to testing with NATS
map[string]string{}, // empty configured labels
)
s.NoError(err)
s.stateResolver = *resolver.NewStateResolver(resolver.StateResolverParams{
Expand Down

0 comments on commit 50d6afa

Please sign in to comment.