Skip to content

Commit

Permalink
[YUNIKORN-287] reserved ask release double release (#185)
Browse files Browse the repository at this point in the history
When an ask that is reserved and in the process of being allocated gets
released, the counters tracking reservations can be updated twice for a
single ask release. These counters track the number of reservations on a
queue or partition.

This can lead to other reservations being ignored: nodes that have been
reserved by other asks for the same app will be skipped during
scheduling. In a small cluster this could lead to resource starvation.

Fixes: #185
  • Loading branch information
wilfred-s committed Jul 17, 2020
1 parent 699243c commit f3e1c76
Show file tree
Hide file tree
Showing 10 changed files with 189 additions and 129 deletions.
50 changes: 32 additions & 18 deletions pkg/scheduler/scheduling_application.go
Expand Up @@ -149,16 +149,17 @@ func (sa *SchedulingApplication) removeAllocationAsk(allocKey string) int {
if allocKey == "" {
// cleanup all reservations
for key, reserve := range sa.reservations {
_, err := reserve.unReserve()
releases, err := sa.unReserveInternal(reserve.node, reserve.ask)
if err != nil {
log.Logger().Warn("Removal of reservation failed while removing all allocation asks",
zap.String("appID", sa.ApplicationInfo.ApplicationID),
zap.String("reservationKey", key),
zap.Error(err))
continue
}
// clean up the queue reservation (one at a time)
sa.queue.unReserve(sa.ApplicationInfo.ApplicationID)
toRelease++
sa.queue.unReserve(sa.ApplicationInfo.ApplicationID, releases)
toRelease += releases
}
// Cleanup total pending resource
deltaPendingResource = sa.pending
Expand All @@ -167,16 +168,18 @@ func (sa *SchedulingApplication) removeAllocationAsk(allocKey string) int {
} else {
// cleanup the reservation for this allocation
for _, key := range sa.isAskReserved(allocKey) {
_, err := sa.reservations[key].unReserve()
reserve := sa.reservations[key]
releases, err := sa.unReserveInternal(reserve.node, reserve.ask)
if err != nil {
log.Logger().Warn("Removal of reservation failed while removing allocation ask",
zap.String("appID", sa.ApplicationInfo.ApplicationID),
zap.String("reservationKey", key),
zap.Error(err))
continue
}
// clean up the queue reservation
sa.queue.unReserve(sa.ApplicationInfo.ApplicationID)
toRelease++
sa.queue.unReserve(sa.ApplicationInfo.ApplicationID, releases)
toRelease += releases
}
if ask := sa.requests[allocKey]; ask != nil {
deltaPendingResource = resources.MultiplyBy(ask.AllocatedResource, float64(ask.getPendingAskRepeat()))
Expand Down Expand Up @@ -325,39 +328,50 @@ func (sa *SchedulingApplication) reserve(node *SchedulingNode, ask *schedulingAl

// unReserve the application for this node and ask combination.
// This first removes the reservation from the node.
// The error is set if the reservation key cannot be generated on the app or node.
// If the reservation does not exist it returns false, if the reservation is removed it returns true.
func (sa *SchedulingApplication) unReserve(node *SchedulingNode, ask *schedulingAllocationAsk) error {
// If the reservation does not exist it returns 0 for reservations removed, if the reservation is removed it returns 1.
// The error is set if the reservation key cannot be removed from the app or node.
func (sa *SchedulingApplication) unReserve(node *SchedulingNode, ask *schedulingAllocationAsk) (int, error) {
sa.Lock()
defer sa.Unlock()
return sa.unReserveInternal(node, ask)
}

// Unlocked version for unReserve that really does the work.
// Must only be called while holding the application lock.
func (sa *SchedulingApplication) unReserveInternal(node *SchedulingNode, ask *schedulingAllocationAsk) error {
func (sa *SchedulingApplication) unReserveInternal(node *SchedulingNode, ask *schedulingAllocationAsk) (int, error) {
resKey := reservationKey(node, nil, ask)
if resKey == "" {
log.Logger().Debug("unreserve reservation key create failed unexpectedly",
zap.String("appID", sa.ApplicationInfo.ApplicationID),
zap.Any("node", node),
zap.Any("ask", ask))
return fmt.Errorf("reservation key failed node or ask are nil for appID %s", sa.ApplicationInfo.ApplicationID)
return 0, fmt.Errorf("reservation key failed node or ask are nil for appID %s", sa.ApplicationInfo.ApplicationID)
}
// find the reservation and then unReserve the node before removing from the app
// unReserve the node before removing from the app
var num int
var err error
if num, err = node.unReserve(sa, ask); err != nil {
return 0, err
}
// if the unreserve worked on the node check the app
if _, found := sa.reservations[resKey]; found {
if err := node.unReserve(sa, ask); err != nil {
return err
// worked on the node means either found or not but no error, log difference here
if num == 0 {
log.Logger().Info("reservation not found while removing from node, app has reservation",
zap.String("appID", sa.ApplicationInfo.ApplicationID),
zap.String("nodeID", node.NodeID),
zap.String("ask", ask.AskProto.AllocationKey))
}
delete(sa.reservations, resKey)
return nil
return 1, nil
}
// reservation was not found
log.Logger().Debug("reservation not found while removing from app",
log.Logger().Info("reservation not found while removing from app",
zap.String("appID", sa.ApplicationInfo.ApplicationID),
zap.String("nodeID", node.NodeID),
zap.String("ask", ask.AskProto.AllocationKey))
return nil
zap.String("ask", ask.AskProto.AllocationKey),
zap.Int("nodeReservationsRemoved", num))
return 0, nil
}

// Return the allocation reservations on any node.
Expand Down
30 changes: 16 additions & 14 deletions pkg/scheduler/scheduling_application_test.go
Expand Up @@ -142,7 +142,8 @@ func TestAppReservation(t *testing.T) {
}

// unreserve unknown node/ask
err = app.unReserve(nil, nil)
var num int
_, err = app.unReserve(nil, nil)
if err == nil {
t.Errorf("illegal reservation release but did not fail: error %v", err)
}
Expand All @@ -162,23 +163,23 @@ func TestAppReservation(t *testing.T) {
if err != nil {
t.Errorf("reservation of 2nd node should not have failed: error %v", err)
}
err = app.unReserve(node2, ask2)
_, err = app.unReserve(node2, ask2)
if err != nil {
t.Errorf("remove of reservation of 2nd node should not have failed: error %v", err)
}
// unreserve the same should fail
err = app.unReserve(node2, ask2)
_, err = app.unReserve(node2, ask2)
if err != nil {
t.Errorf("remove twice of reservation of 2nd node should have failed: error %v", err)
}

// failure case: remove reservation from node
err = node.unReserve(app, ask)
assert.NilError(t, err, "un-reserve on node should not have failed: error")
err = app.unReserve(node, ask)
if err != nil {
t.Errorf("node does not have reservation removal of app reservation should have failed: error %v", err)
}
// failure case: remove reservation from node, app still needs cleanup
num, err = node.unReserve(app, ask)
assert.NilError(t, err, "un-reserve on node should not have failed with error")
assert.Equal(t, num, 1, "un-reserve on node should have removed reservation")
num, err = app.unReserve(node, ask)
assert.NilError(t, err, "app has reservation should not have failed")
assert.Equal(t, num, 1, "un-reserve on app should have removed reservation from app")
}

// test multiple reservations from one allocation
Expand Down Expand Up @@ -437,10 +438,11 @@ func TestRemoveReservedAllocAsk(t *testing.T) {
if len(app.isAskReserved(allocKey)) != 1 || !node.isReserved() {
t.Fatalf("app should have reservation for %v on node", allocKey)
}
err = node.unReserve(app, ask2)
if err != nil {
t.Errorf("unreserve on node should not have failed: error %v", err)
}
var num int
num, err = node.unReserve(app, ask2)
assert.NilError(t, err, "un-reserve on node should not have failed")
assert.Equal(t, num, 1, "un-reserve on node should have removed reservation")

before = app.GetPendingResource().Clone()
reservedAsks = app.removeAllocationAsk(allocKey)
delta = resources.Sub(before, app.GetPendingResource())
Expand Down
26 changes: 14 additions & 12 deletions pkg/scheduler/scheduling_node.go
Expand Up @@ -338,9 +338,9 @@ func (sn *SchedulingNode) reserve(app *SchedulingApplication, ask *schedulingAll
}

// unReserve the node for this application and ask combination
// If the reservation does not exist it returns false, if the reservation is removed it returns true.
// If the reservation does not exist it returns 0 for reservations removed, if the reservation is removed it returns 1.
// The error is set if the reservation key cannot be generated.
func (sn *SchedulingNode) unReserve(app *SchedulingApplication, ask *schedulingAllocationAsk) error {
func (sn *SchedulingNode) unReserve(app *SchedulingApplication, ask *schedulingAllocationAsk) (int, error) {
sn.Lock()
defer sn.Unlock()
resKey := reservationKey(nil, app, ask)
Expand All @@ -349,40 +349,42 @@ func (sn *SchedulingNode) unReserve(app *SchedulingApplication, ask *schedulingA
zap.String("nodeID", sn.NodeID),
zap.Any("app", app),
zap.Any("ask", ask))
return fmt.Errorf("reservation key failed app or ask are nil on nodeID %s", sn.NodeID)
return 0, fmt.Errorf("reservation key failed app or ask are nil on nodeID %s", sn.NodeID)
}
if _, ok := sn.reservations[resKey]; ok {
delete(sn.reservations, resKey)
return nil
return 1, nil
}
// reservation was not found
log.Logger().Debug("reservation not found while removing from node",
zap.String("nodeID", sn.NodeID),
zap.String("appID", app.ApplicationInfo.ApplicationID),
zap.String("ask", ask.AskProto.AllocationKey))
return nil
return 0, nil
}

// Remove all reservation made on this node from the app.
// This is an unlocked function, it does not use a copy of the map when calling unReserve. That call will via the app call
// unReserve on the node which is locked and modifies the original map. However deleting an entry from a map while iterating
// over the map is perfectly safe based on the Go Specs.
// It must only be called when removing the node under a partition lock.
// It returns a list of all apps that have been unreserved on the node regardless of the result of the app unReserve call.
// If all unReserve calls work true will be returned, false in all other cases.
func (sn *SchedulingNode) unReserveApps() ([]string, bool) {
var allOK = true
// It returns a list of all apps that have been checked on the node regardless of the result of the app unReserve call.
// The corresponding integers show the number of reservations removed for each app entry
func (sn *SchedulingNode) unReserveApps() ([]string, []int) {
var appReserve []string
var askRelease []int
for key, res := range sn.reservations {
appID, err := res.unReserve()
appID := res.appID
num, err := res.app.unReserveInternal(res.node, res.ask)
if err != nil {
log.Logger().Warn("Removal of reservation failed while removing node",
zap.String("nodeID", sn.NodeID),
zap.String("reservationKey", key),
zap.Error(err))
allOK = false
}
// pass back the removed asks for each app
appReserve = append(appReserve, appID)
askRelease = append(askRelease, num)
}
return appReserve, allOK
return appReserve, askRelease
}
31 changes: 17 additions & 14 deletions pkg/scheduler/scheduling_node_test.go
Expand Up @@ -334,18 +334,21 @@ func TestNodeReservation(t *testing.T) {
}

// unreserve different app
err = node.unReserve(nil, nil)
_, err = node.unReserve(nil, nil)
if err == nil {
t.Errorf("illegal reservation release but did not fail: error %v", err)
}
appID = "app-2"
ask2 := newAllocationAsk("alloc-2", appID, res)
appInfo = cache.NewApplicationInfo(appID, "default", "root.unknown", security.UserGroup{}, nil)
app2 := newSchedulingApplication(appInfo)
err = node.unReserve(app2, ask2)
var num int
num, err = node.unReserve(app2, ask2)
assert.NilError(t, err, "un-reserve different app should have failed without error")
err = node.unReserve(app, ask)
assert.Equal(t, num, 0, "un-reserve different app should have failed without releases")
num, err = node.unReserve(app, ask)
assert.NilError(t, err, "un-reserve should not have failed")
assert.Equal(t, num, 1, "un-reserve app should have released ")
}

func TestUnReserveApps(t *testing.T) {
Expand All @@ -356,9 +359,9 @@ func TestUnReserveApps(t *testing.T) {
if node.isReserved() {
t.Fatal("new node should not have reservations")
}
reservedKeys, ok := node.unReserveApps()
if !ok || len(reservedKeys) != 0 {
t.Fatal("new node should not fail remove all reservations")
reservedKeys, releasedAsks := node.unReserveApps()
if len(reservedKeys) != 0 || len(releasedAsks) != 0 {
t.Fatalf("new node should not fail remove all reservations: asks released = %v, reservation keys = %v", releasedAsks, reservedKeys)
}

// create some reservations and see it clean up via the app
Expand All @@ -379,18 +382,18 @@ func TestUnReserveApps(t *testing.T) {
err = app.reserve(node, ask)
assert.NilError(t, err, "reservation should not have failed")
assert.Equal(t, 1, len(node.reservations), "node should have reservation")
reservedKeys, ok = node.unReserveApps()
if !ok || len(reservedKeys) != 1 {
reservedKeys, releasedAsks = node.unReserveApps()
if len(reservedKeys) != 1 || len(releasedAsks) != 1 {
t.Fatal("node should have removed reservation")
}

// reserve just the node
err = node.reserve(app, ask)
assert.NilError(t, err, "reservation should not have failed")
assert.Equal(t, 1, len(node.reservations), "node should have reservation")
reservedKeys, ok = node.unReserveApps()
if !ok || len(reservedKeys) != 1 {
t.Errorf("node should have removed reservation: status = %t, reservation keys = %v", ok, reservedKeys)
reservedKeys, releasedAsks = node.unReserveApps()
if len(reservedKeys) != 1 || len(releasedAsks) != 1 {
t.Fatalf("node should have removed reservation: asks released = %v, reservation keys = %v", releasedAsks, reservedKeys)
}
}

Expand All @@ -402,9 +405,9 @@ func TestIsReservedForApp(t *testing.T) {
if node.isReserved() {
t.Fatal("new node should not have reservations")
}
reservedKeys, ok := node.unReserveApps()
if !ok || len(reservedKeys) != 0 {
t.Fatalf("new node should not fail remove all reservations: status = %t, reservation keys = %v", ok, reservedKeys)
reservedKeys, releasedAsks := node.unReserveApps()
if len(reservedKeys) != 0 || len(releasedAsks) != 0 {
t.Fatalf("new node should not fail remove all reservations: asks released = %v, reservation keys = %v", releasedAsks, reservedKeys)
}

// check if we can allocate on a reserved node
Expand Down
28 changes: 10 additions & 18 deletions pkg/scheduler/scheduling_partition.go
Expand Up @@ -344,15 +344,10 @@ func (psc *partitionSchedulingContext) removeSchedulingNode(nodeID string) {
// remove the node, this will also get the sync back between the two lists
delete(psc.nodes, nodeID)
// unreserve all the apps that were reserved on the node
var reservedKeys []string
reservedKeys, ok = node.unReserveApps()
if !ok {
log.Logger().Warn("Node removal did not remove all application reservations this can affect scheduling",
zap.String("nodeID", nodeID))
}
reservedKeys, releasedAsks := node.unReserveApps()
// update the partition reservations based on the node clean up
for _, appID := range reservedKeys {
psc.unReserveCount(appID, 1)
for i, appID := range reservedKeys {
psc.unReserveCount(appID, releasedAsks[i])
}
}

Expand All @@ -379,12 +374,6 @@ func (psc *partitionSchedulingContext) tryAllocate() *schedulingAllocation {
// Try process reservations for the partition
// Lock free call this all locks are taken when needed in called functions
func (psc *partitionSchedulingContext) tryReservedAllocate() *schedulingAllocation {
psc.Lock()
if len(psc.reservedApps) == 0 {
psc.Unlock()
return nil
}
psc.Unlock()
// try allocating from the root down
return psc.root.tryReservedAllocate(psc)
}
Expand Down Expand Up @@ -553,21 +542,24 @@ func (psc *partitionSchedulingContext) unReserve(app *SchedulingApplication, nod
return
}
// all ok, remove the reservation of the app, this will also unReserve the node
if err := app.unReserve(node, ask); err != nil {
var err error
var num int
if num, err = app.unReserve(node, ask); err != nil {
log.Logger().Info("Failed to unreserve, error during allocate on the app",
zap.Error(err))
return
}
// remove the reservation of the queue
app.queue.unReserve(appID)
app.queue.unReserve(appID, num)
// make sure we cannot go below 0
psc.unReserveCount(appID, 1)
psc.unReserveCount(appID, num)

log.Logger().Info("allocation ask is unreserved",
zap.String("appID", ask.ApplicationID),
zap.String("queue", ask.QueueName),
zap.String("allocationKey", ask.AskProto.AllocationKey),
zap.String("node", node.NodeID))
zap.String("node", node.NodeID),
zap.Int("reservationsRemoved", num))
}

// Get the iterator for the sorted nodes list from the partition.
Expand Down

0 comments on commit f3e1c76

Please sign in to comment.