compute nodes no longer reject bids based on queued capacity #4002
Conversation
LGTM. Consider addressing a few of the comments.
```diff
-log.Ctx(ctx).WithLevel(logger.ErrOrDebug(err)).
-	Err(err).
-	Str("Job", request.Job.ID).
-	Str("Strategy", strategyType).
-	Bool("Bid", resp.ShouldBid).
-	Bool("Wait", resp.ShouldWait).
-	Str("Reason", resp.Reason).
-	Send()
+if err != nil || !resp.ShouldBid {
+	log.Ctx(ctx).WithLevel(logger.ErrOrDebug(err)).
+		Err(err).
+		Str("Job", request.Job.ID).
+		Str("Strategy", strategyType).
+		Bool("Bid", resp.ShouldBid).
+		Bool("Wait", resp.ShouldWait).
+		Str("Reason", resp.Reason).
+		Send()
+}
```
Did this need to change? Previously it logged a debug log with no error on the success case, and an error log with the error on the failure case. Now we only log on failures or rejection. I valued the debug logs for the success case, so I'd prefer to leave this section of the code unmodified. Up to you.
The logs were too noisy, so I've changed this to log only when the bid was rejected or an error was raised. Happy-path cases, such as accepted bids, were too frequent and drowned out the signal when troubleshooting an issue.
```diff
-log.Ctx(ctx).WithLevel(logger.ErrOrDebug(err)).
-	Err(err).
-	Str("Job", request.Job.ID).
-	Str("Strategy", strategyType).
-	Bool("Bid", resp.ShouldBid).
-	Bool("Wait", resp.ShouldWait).
-	Str("Reason", resp.Reason).
-	Send()
+if err != nil || !resp.ShouldBid {
+	log.Ctx(ctx).WithLevel(logger.ErrOrDebug(err)).
+		Err(err).
+		Str("Job", request.Job.ID).
+		Str("Strategy", strategyType).
+		Bool("Bid", resp.ShouldBid).
+		Bool("Wait", resp.ShouldWait).
+		Str("Reason", resp.Reason).
+		Send()
+}
```
similar comment to above.
same as above
```diff
@@ -154,11 +159,14 @@ func (n *NodeManager) UpdateResources(ctx context.Context,
 		return &requests.UpdateResourcesResponse{}, nil
 	}

-	log.Ctx(ctx).Debug().Msg("updating resources availability for node")
+	log.Ctx(ctx).Debug().Msgf("updating node resources availability: %+v", request)
```
We could extract the individual fields (NodeID and Resources) of the request to make this more readable; I think both implement Stringer.
I am also interested in the queued capacity reported in the request.
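As a minimal sketch, the extracted-fields version of this log could look like the following. The types here are hypothetical stand-ins (the real request and `models.Resources` definitions live elsewhere in the repo), and the field names are assumptions based on this thread:

```go
package main

import "fmt"

// Resources is a hypothetical stand-in for models.Resources; the real
// field set and units are assumptions for illustration only.
type Resources struct {
	CPU    float64
	Memory uint64
	Disk   uint64
	GPU    uint64
}

// String implements fmt.Stringer, which the reviewer notes both types do.
func (r Resources) String() string {
	return fmt.Sprintf("cpu=%.1f mem=%d disk=%d gpu=%d", r.CPU, r.Memory, r.Disk, r.GPU)
}

// UpdateResourcesRequest is a simplified stand-in for the real request type.
type UpdateResourcesRequest struct {
	NodeID            string
	AvailableCapacity Resources
	QueuedCapacity    Resources
}

// logLine extracts the individual fields instead of dumping the whole
// request with %+v, which is what the reviewer is asking for. It also
// surfaces the queued capacity mentioned in the reply above.
func logLine(req UpdateResourcesRequest) string {
	return fmt.Sprintf("updating node resources: node=%s available=[%s] queued=[%s]",
		req.NodeID, req.AvailableCapacity, req.QueuedCapacity)
}

func main() {
	req := UpdateResourcesRequest{
		NodeID:            "node-1",
		AvailableCapacity: Resources{CPU: 4, Memory: 16000, Disk: 200000, GPU: 1},
		QueuedCapacity:    Resources{CPU: 1, Memory: 4000, Disk: 100000, GPU: 0},
	}
	fmt.Println(logLine(req))
}
```

In the real code the same idea would be a chained `.Str("NodeID", ...).Stringer(...)` call rather than a formatted string, but the shape of the output is the point.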
```go
gpuScale = 1 // No scaling needed for GPU as it's already in units
cpuScale = 1 // No scaling needed for CPU as it's already in cores
```
These don't appear to be necessary, are they?
For consistency with the other scales, and to make it clear how we are handling GPU and CPU units.
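A sketch of what the full constants table might look like; only `gpuScale` and `cpuScale` appear in this diff, and the memory and disk scale values below are illustrative assumptions, not the repository's actual constants:

```go
package main

import "fmt"

// Hypothetical normalization table. The 1-valued entries match the diff
// above; the memoryScale and diskScale values are assumed for illustration.
const (
	cpuScale    = 1.0                        // no scaling needed, already in cores
	gpuScale    = 1.0                        // no scaling needed, already in units
	memoryScale = 1.0 / (1024 * 1024 * 1024) // hypothetical: bytes -> GiB
	diskScale   = 1.0 / (1024 * 1024 * 1024) // hypothetical: bytes -> GiB
)

func main() {
	// Even the identity scales are spelled out so every resource goes
	// through the same normalization step, as the author argues above.
	fmt.Println(4*cpuScale, float64(8<<30)*memoryScale)
}
```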
```go
// Calculate weighted capacities for each node and determine the maximum values
for _, node := range nodes {
	weightedAvailableCapacity := weightedCapacity(node.ComputeNodeInfo.AvailableCapacity, cpuWeight, memoryWeight, diskWeight, gpuWeight)
	weightedQueueUsedCapacity := weightedCapacity(node.ComputeNodeInfo.QueueUsedCapacity, cpuWeight, memoryWeight, diskWeight, gpuWeight)

	weightedAvailableCapacities[node.NodeID] = weightedAvailableCapacity
	weightedQueueCapacities[node.NodeID] = weightedQueueUsedCapacity

	if weightedAvailableCapacity > maxWeightedAvailableCapacity {
		maxWeightedAvailableCapacity = weightedAvailableCapacity
	}
	if weightedQueueUsedCapacity > maxQueueUsedCapacity {
		maxQueueUsedCapacity = weightedQueueUsedCapacity
	}
}

// Rank nodes based on normalized weighted capacities
ranks := make([]orchestrator.NodeRank, len(nodes))

for i, node := range nodes {
	weightedAvailableCapacity := weightedAvailableCapacities[node.NodeID]
	weightedQueueUsedCapacity := weightedQueueCapacities[node.NodeID]

	// Calculate the ratios of available and queue capacities
	availableRatio := 0.0
	queueRatio := 0.0

	if maxWeightedAvailableCapacity > 0 {
		availableRatio = weightedAvailableCapacity / maxWeightedAvailableCapacity
	}
	if maxQueueUsedCapacity > 0 {
		queueRatio = weightedQueueUsedCapacity / maxQueueUsedCapacity
	}

	// Normalize the ratios to the rank range
	normalizedAvailableRank := availableRatio * float64(maxAvailableCapacityRank)
	normalizedQueueRank := (1 - queueRatio) * float64(maxQueueCapacityRank)

	// Calculate the final rank; higher available capacity and lower queue used capacity give a higher rank
	rank := normalizedAvailableRank + normalizedQueueRank

	// Ensure the rank is within the desired range
	rank = math.Max(rank, float64(orchestrator.RankPossible))
	rank = math.Min(rank, maxAvailableCapacityRank+maxQueueCapacityRank)

	// Assign rank and reason to the node
	ranks[i] = orchestrator.NodeRank{
		NodeInfo: node,
		Rank:     int(rank),
		Reason: fmt.Sprintf(
			"Ranked based on available capacity %s and queue capacity %s",
			node.ComputeNodeInfo.AvailableCapacity.String(), node.ComputeNodeInfo.QueueUsedCapacity.String()),
		Retryable: true,
	}
	log.Ctx(ctx).Trace().Object("Rank", ranks[i]).Msg("Ranked node")
}
```
For readability, these two loops could be separate functions, but I don't feel strongly about it. Your choice.
Done in c5fbcb9
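A rough sketch of how the two-loop split could look. The types, function names, and rank-cap constants (80/20) below are simplified assumptions for illustration, not the code actually landed in c5fbcb9:

```go
package main

import (
	"fmt"
	"math"
)

// node is a simplified stand-in carrying already-weighted capacities.
type node struct {
	ID        string
	Available float64 // weighted available capacity
	Queued    float64 // weighted queue used capacity
}

// Hypothetical rank-range constants; the real values live in the ranker package.
const (
	maxAvailableCapacityRank = 80.0
	maxQueueCapacityRank     = 20.0
	rankPossible             = 1.0
)

// maxCapacities is the first loop: find the maxima used for normalization.
func maxCapacities(nodes []node) (maxAvail, maxQueued float64) {
	for _, n := range nodes {
		maxAvail = math.Max(maxAvail, n.Available)
		maxQueued = math.Max(maxQueued, n.Queued)
	}
	return maxAvail, maxQueued
}

// rankNodes is the second loop: normalize each node against the maxima,
// rewarding high available capacity and penalizing queued work.
func rankNodes(nodes []node, maxAvail, maxQueued float64) map[string]int {
	ranks := make(map[string]int, len(nodes))
	for _, n := range nodes {
		availRatio, queueRatio := 0.0, 0.0
		if maxAvail > 0 {
			availRatio = n.Available / maxAvail
		}
		if maxQueued > 0 {
			queueRatio = n.Queued / maxQueued
		}
		rank := availRatio*maxAvailableCapacityRank + (1-queueRatio)*maxQueueCapacityRank
		rank = math.Max(rank, rankPossible)
		rank = math.Min(rank, maxAvailableCapacityRank+maxQueueCapacityRank)
		ranks[n.ID] = int(rank)
	}
	return ranks
}

func main() {
	nodes := []node{{"n1", 10, 0}, {"n2", 5, 4}}
	maxA, maxQ := maxCapacities(nodes)
	fmt.Println(rankNodes(nodes, maxA, maxQ))
}
```

Separating the max-finding pass from the ranking pass keeps each function testable on its own, which is the readability gain the reviewer was after.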
```go
			node.ComputeNodeInfo.AvailableCapacity.String(), node.ComputeNodeInfo.QueueUsedCapacity.String()),
		Retryable: true,
	}
	log.Ctx(ctx).Trace().Object("Rank", ranks[i]).Msg("Ranked node")
```
Can we make this a debug log? Or add a log outside of this loop that prints the nodes in ranked order?
Sorting node ranks is not cheap. We are already sorting in the selector after each ranker has provided its own rank, and it would be wasteful to sort again here just for debug logging. Also, making this log line debug level would get noisy as more nodes join the network.
```go
	equalCheck bool
}

func (suite *AvailableCapacityNodeRankerSuite) TestRankNodes() {
```
I'd find this test slightly more readable if the node resources were scaled from a common value, e.g.

```go
bc := models.Resources{CPU: 4, Memory: 16000, Disk: 200000, GPU: 1}
bq := models.Resources{CPU: 1, Memory: 4000, Disk: 100000, GPU: 0}
nodes: []nodeScenario{
	{
		nodeID:            "node1",
		availableCapacity: bc,
		queueUsedCapacity: bq,
	},
	{
		nodeID:            "node2",
		availableCapacity: models.Resources{CPU: bc.CPU * 2, Memory: bc.Memory * 2, Disk: bc.Disk / 2, GPU: bc.GPU * 2},
		queueUsedCapacity: models.Resources{CPU: bq.CPU / 2, Memory: bq.Memory / 2, Disk: bc.Disk / 2, GPU: bc.GPU},
	},
	{
		nodeID:            "node3",
		availableCapacity: models.Resources{CPU: bc.CPU / 2, Memory: bc.Memory, Disk: bc.Disk - 5000, GPU: bc.GPU * 4},
		queueUsedCapacity: models.Resources{CPU: bq.CPU / 2, Memory: bq.Memory / 4, Disk: bq.Disk / 10, GPU: bq.GPU * 2},
	},
},
```

But I don't feel strongly enough to push for a change.
```go
	log.Ctx(ctx).Trace().Object("Rank", ranks[i]).Msg("Ranked node")
}

return ranks, nil
```
Let's return the ranks in sorted order; it will remove one step from the tests and make it easier for humans to parse the log lines.
answered in #4002 (comment)
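For reference, the reviewer's suggestion would amount to something like the sketch below (the `NodeRank` shape here is a simplified assumption); the author's counter-argument in the linked comment is that the selector already performs this sort after all rankers run, so sorting here too would duplicate the work:

```go
package main

import (
	"fmt"
	"sort"
)

// NodeRank is a simplified stand-in for orchestrator.NodeRank.
type NodeRank struct {
	NodeID string
	Rank   int
}

// sortedRanks orders ranks descending so the best candidate comes first,
// which is what would make log lines easy to scan.
func sortedRanks(ranks []NodeRank) []NodeRank {
	sort.Slice(ranks, func(i, j int) bool { return ranks[i].Rank > ranks[j].Rank })
	return ranks
}

func main() {
	fmt.Println(sortedRanks([]NodeRank{{"a", 10}, {"b", 90}, {"c", 50}}))
}
```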
```go
}

// dynamicWeights calculates the weights for resources based on the job requirements.
func dynamicWeights(jobRequirements *models.Resources) (float64, float64, float64, float64) {
```
Probably worth using named returns here, or encapsulating the result in a struct.
done in c5fbcb9
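One way the struct option could look, as a sketch. The weighting formula below (each resource's share of the total requirement) is an assumption for illustration, not the implementation landed in c5fbcb9:

```go
package main

import "fmt"

// Resources is a simplified stand-in for models.Resources.
type Resources struct {
	CPU, Memory, Disk, GPU float64
}

// resourceWeights replaces four bare float64 return values with named
// fields, so call sites can't mix up the order.
type resourceWeights struct {
	CPU, Memory, Disk, GPU float64
}

// dynamicWeights derives per-resource weights from the job requirements.
// The share-of-total formula here is a hypothetical example.
func dynamicWeights(req Resources) resourceWeights {
	total := req.CPU + req.Memory + req.Disk + req.GPU
	if total == 0 {
		// No requirements stated: fall back to equal weights.
		return resourceWeights{CPU: 0.25, Memory: 0.25, Disk: 0.25, GPU: 0.25}
	}
	return resourceWeights{
		CPU:    req.CPU / total,
		Memory: req.Memory / total,
		Disk:   req.Disk / total,
		GPU:    req.GPU / total,
	}
}

func main() {
	fmt.Println(dynamicWeights(Resources{CPU: 1, Memory: 1, Disk: 1, GPU: 1}))
}
```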
```go
)

// Constants to normalize resource values to a comparable scale.
const (
```
Passing thought: these constants are great candidates for system configuration values.
I would like us to think more in terms of convention over configuration, and to reduce the number of available configuration values that are not expected to change, whether we call them user or system configurations. If a value is not expected to be changed by users or system operators, is not shared between packages, and is only used by the current package, then it should live as a constant in the package itself. This improves the code's readability and makes it clear to developers and operators what the values are, and that they are not configurable across environments.
Even when it comes to testing, we should minimize the need to override these values, to make sure we are testing paths similar to production. When there is a genuine need to override them, we should still keep the default values as constants in the same package.
This PR not only closes #3992, but also adds a node ranker based on available and queued capacity, which we didn't have before. I had to introduce the ranker along with this PR to keep some tests passing that relied on bid rejection due to exceeding resource limits, specifically TestParallelGPU.