Skip to content

Commit

Permalink
fix: clean up device allocation if the resources are released (#1930)
Browse files Browse the repository at this point in the history
Previously, the agent resource pool relies on the container state change message to deallocate resources. This PR changes that to always deallocate resources when a task actor fails.
  • Loading branch information
shiyuann authored Feb 8, 2021
1 parent 0a36789 commit fea363e
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 53 deletions.
5 changes: 0 additions & 5 deletions master/internal/agent/slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,6 @@ func (s *slot) Receive(ctx *actor.Context) error {
s.container = &msg.Container
if msg.Container.State == container.Terminated {
s.container = nil
if s.enabled.Enabled() {
ctx.Tell(s.resourcePool, sproto.FreeDevice{
DeviceID: s.deviceID(ctx), ContainerID: &msg.Container.ID,
})
}
}
case *proto.GetSlotRequest:
ctx.Respond(&proto.GetSlotResponse{Slot: toProtoSlot(s.summarize(ctx))})
Expand Down
16 changes: 2 additions & 14 deletions master/internal/agent/slots.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@ import (
"github.com/pkg/errors"

"github.com/determined-ai/determined/master/internal/api"
"github.com/determined-ai/determined/master/internal/sproto"
"github.com/determined-ai/determined/master/pkg/actor"
aproto "github.com/determined-ai/determined/master/pkg/agent"
"github.com/determined-ai/determined/master/pkg/check"
"github.com/determined-ai/determined/master/pkg/container"
"github.com/determined-ai/determined/master/pkg/device"
)

type slots struct {
Expand Down Expand Up @@ -81,17 +79,7 @@ func (s *slots) summarize(ctx *actor.Context) SlotsSummary {
}

func (s *slots) sendToSlots(ctx *actor.Context, c container.Container, msg actor.Message) {
if len(c.Devices) == 0 && c.State == container.Terminated {
// This is to handle the case where the task is not using GPU devices and is running
// on agent where only GPUs are modeled as devices.
ctx.Tell(s.resourcePool, sproto.FreeDevice{
DeviceID: sproto.DeviceID{
Agent: ctx.Self().Parent(), Device: device.Device{Type: device.ZeroSlot}},
ContainerID: &c.ID,
})
} else {
for _, d := range c.Devices {
ctx.Tell(ctx.Child(d.ID), msg)
}
for _, d := range c.Devices {
ctx.Tell(ctx.Child(d.ID), msg)
}
}
14 changes: 6 additions & 8 deletions master/internal/resourcemanagers/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,13 @@ func (a *agentState) allocateFreeDevices(slots int, id cproto.ID) []device.Devic
return devices
}

func (a *agentState) deallocateDevice(t device.Type, id cproto.ID, d device.Device) {
if t == device.ZeroSlot {
delete(a.zeroSlotContainers, id)
return
func (a *agentState) deallocateContainer(id cproto.ID) {
delete(a.zeroSlotContainers, id)
for d, cid := range a.devices {
if cid != nil && *cid == id {
a.devices[d] = nil
}
}
cid, ok := a.devices[d]
check.Panic(check.True(ok, "error freeing device, device not found: %s", d))
check.Panic(check.True(cid != nil, "error freeing device, device not assigned: %s", d))
a.devices[d] = nil
}

func (a *agentState) deepCopy() *agentState {
Expand Down
14 changes: 1 addition & 13 deletions master/internal/resourcemanagers/priority_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (

"github.com/determined-ai/determined/master/internal/sproto"
"github.com/determined-ai/determined/master/pkg/actor"
cproto "github.com/determined-ai/determined/master/pkg/container"
"github.com/determined-ai/determined/master/pkg/device"
)

func TestSortTasksByPriorityAndTimestamps(t *testing.T) {
Expand Down Expand Up @@ -886,23 +884,13 @@ func AddUnallocatedTasks(
}
}

func deallocateDevices(a *agentState, slots int, id cproto.ID, devices []device.Device) {
if slots == 0 {
a.deallocateDevice(device.ZeroSlot, id, device.Device{})
}

for _, d := range devices {
a.deallocateDevice(device.CPU, id, d)
}
}

func RemoveTask(slots int, toRelease *actor.Ref, taskList *taskList, delete bool) bool {
for _, alloc := range taskList.GetAllocations(toRelease).Allocations {
alloc, ok := alloc.(*containerAllocation)
if !ok {
return false
}
deallocateDevices(alloc.agent, slots, alloc.container.id, alloc.devices)
alloc.agent.deallocateContainer(alloc.container.id)
}
if delete {
taskList.RemoveTaskByHandler(toRelease)
Expand Down
14 changes: 6 additions & 8 deletions master/internal/resourcemanagers/resource_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ func (rp *ResourcePool) releaseResource(ctx *actor.Context, handler *actor.Ref)

func (rp *ResourcePool) resourcesReleased(ctx *actor.Context, handler *actor.Ref) {
ctx.Log().Infof("resources are released for %s", handler.Address())
if allocated := rp.taskList.GetAllocations(handler); allocated != nil {
for _, allocation := range allocated.Allocations {
typed := allocation.(*containerAllocation)
typed.agent.deallocateContainer(typed.container.id)
}
}
rp.taskList.RemoveTaskByHandler(handler)
}

Expand Down Expand Up @@ -219,7 +225,6 @@ func (rp *ResourcePool) Receive(ctx *actor.Context) error {
case
sproto.AddAgent,
sproto.AddDevice,
sproto.FreeDevice,
sproto.RemoveDevice,
sproto.RemoveAgent:
return rp.receiveAgentMsg(ctx)
Expand Down Expand Up @@ -283,13 +288,6 @@ func (rp *ResourcePool) receiveAgentMsg(ctx *actor.Context) error {
check.Panic(check.True(ok, "error adding device, agent not found: %s", msg.Agent.Address()))
state.devices[msg.Device] = msg.ContainerID

case sproto.FreeDevice:
ctx.Log().Infof("freeing device from container %s: %s on %s",
msg.ContainerID, msg.Device.String(), msg.Agent.Address().Local())
state, ok := rp.agents[msg.Agent]
check.Panic(check.True(ok, "error freeing device, agent not found: %s", msg.Agent.Address()))
state.deallocateDevice(msg.Device.Type, *msg.ContainerID, msg.Device)

case sproto.RemoveDevice:
ctx.Log().Infof("removing device: %s (%s)", msg.Device.String(), msg.Agent.Address().Local())
state, ok := rp.agents[msg.Agent]
Expand Down
5 changes: 0 additions & 5 deletions master/internal/sproto/agent_resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,6 @@ type (
DeviceID
ContainerID *cproto.ID
}
// FreeDevice notifies the cluster that the device's container is no longer running.
FreeDevice struct {
DeviceID
ContainerID *cproto.ID
}
// RemoveDevice removes the device from scheduling.
RemoveDevice struct {
DeviceID
Expand Down

0 comments on commit fea363e

Please sign in to comment.