Skip to content

Commit

Permalink
[YUNIKORN-2618]Update Streamline AsyncRMCallback UpdateAllocation (#846)
Browse files Browse the repository at this point in the history
Closes: #846

Signed-off-by: Peter Bacsko <pbacsko@cloudera.com>
  • Loading branch information
SophieTech88 authored and pbacsko committed May 22, 2024
1 parent 6aa6afe commit 08a9b93
Showing 1 changed file with 19 additions and 26 deletions.
45 changes: 19 additions & 26 deletions pkg/cache/scheduler_callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,50 +56,43 @@ func (callback *AsyncRMCallback) UpdateAllocation(response *si.AllocationRespons

// update cache
task := callback.context.getTask(alloc.ApplicationID, alloc.AllocationKey)
if task != nil {
task.setAllocationKey(alloc.AllocationKey)
} else {
if task == nil {
log.Log(log.ShimRMCallback).Warn("Unable to get task", zap.String("taskID", alloc.AllocationKey))
continue
}

task.setAllocationKey(alloc.AllocationKey)

if err := callback.context.AssumePod(alloc.AllocationKey, alloc.NodeID); err != nil {
if task != nil {
task.failWithEvent(err.Error(), "AssumePodError")
}
task.failWithEvent(err.Error(), "AssumePodError")
return err
}
if app := callback.context.GetApplication(alloc.ApplicationID); app != nil {
if task != nil {
if utils.IsAssignedPod(task.GetTaskPod()) {
// task is already bound, fixup state and continue
task.MarkPreviouslyAllocated(alloc.AllocationKey, alloc.NodeID)
} else {
ev := NewAllocateTaskEvent(app.GetApplicationID(), task.taskID, alloc.AllocationKey, alloc.NodeID)
dispatcher.Dispatch(ev)
}
}

if utils.IsAssignedPod(task.GetTaskPod()) {
// task is already bound, fixup state and continue
task.MarkPreviouslyAllocated(alloc.AllocationKey, alloc.NodeID)
} else {
ev := NewAllocateTaskEvent(alloc.ApplicationID, task.taskID, alloc.AllocationKey, alloc.NodeID)
dispatcher.Dispatch(ev)
}
}

for _, reject := range response.Rejected {
// request rejected by the scheduler, put it back and try scheduling again
log.Log(log.ShimRMCallback).Debug("callback: response to rejected ask",
zap.String("allocationKey", reject.AllocationKey))
if app := callback.context.GetApplication(reject.ApplicationID); app != nil {
dispatcher.Dispatch(NewRejectTaskEvent(app.GetApplicationID(), reject.AllocationKey,
fmt.Sprintf("task %s ask from application %s is rejected by scheduler",
reject.AllocationKey, reject.ApplicationID)))
}
dispatcher.Dispatch(NewRejectTaskEvent(reject.ApplicationID, reject.AllocationKey,
fmt.Sprintf("task %s ask from application %s is rejected by scheduler",
reject.AllocationKey, reject.ApplicationID)))
}

for _, reject := range response.RejectedAllocations {
// request rejected by the scheduler, reject it
log.Log(log.ShimRMCallback).Debug("callback: response to rejected allocation",
zap.String("allocationKey", reject.AllocationKey))
if app := callback.context.GetApplication(reject.ApplicationID); app != nil {
dispatcher.Dispatch(NewRejectTaskEvent(app.GetApplicationID(), reject.AllocationKey,
fmt.Sprintf("task %s allocation from application %s is rejected by scheduler",
reject.AllocationKey, reject.ApplicationID)))
}
dispatcher.Dispatch(NewRejectTaskEvent(reject.ApplicationID, reject.AllocationKey,
fmt.Sprintf("task %s allocation from application %s is rejected by scheduler",
reject.AllocationKey, reject.ApplicationID)))
}

for _, release := range response.Released {
Expand Down

0 comments on commit 08a9b93

Please sign in to comment.