fix: commands (TensorBoards, notebooks, etc.) should not be preempted [D…
mackrorysd committed Sep 28, 2020
1 parent 5ace732 commit b957dab
Showing 11 changed files with 175 additions and 55 deletions.
1 change: 1 addition & 0 deletions master/go.sum
@@ -193,6 +193,7 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/denis-tingajkin/go-header v0.3.1 h1:ymEpSiFjeItCy1FOP+x0M2KdCELdEAHUsNa8F+hHc6w=
github.com/denis-tingajkin/go-header v0.3.1/go.mod h1:sq/2IxMhaZX+RRcgHfCRx/m0M5na0fBt4/CRe7Lrji0=
github.com/determined-ai/determined v0.12.4 h1:lLASmkwInB+Q58ZmBfx2aDYt71cP6Liya6BYljxROQI=
github.com/devigned/tab v0.1.1/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
3 changes: 2 additions & 1 deletion master/internal/checkpoint_gc.go
@@ -33,7 +33,8 @@ func (t *checkpointGCTask) Receive(ctx *actor.Context) error {
FittingRequirements: scheduler.FittingRequirements{
SingleAgent: true,
},
TaskActor: ctx.Self(),
TaskActor: ctx.Self(),
NonPreemptible: true,
})

case scheduler.ResourcesAllocated:
10 changes: 5 additions & 5 deletions master/internal/command/command.go
@@ -104,11 +104,11 @@ func (c *command) Receive(ctx *actor.Context) error {
c.proxy = ctx.Self().System().Get(actor.Addr("proxy"))

c.task = &scheduler.AllocateRequest{
ID: c.taskID,
Name: c.config.Description,
SlotsNeeded: c.config.Resources.Slots,
Label: c.config.Resources.AgentLabel,
CanTerminate: true,
ID: c.taskID,
Name: c.config.Description,
SlotsNeeded: c.config.Resources.Slots,
Label: c.config.Resources.AgentLabel,
NonPreemptible: true,
FittingRequirements: scheduler.FittingRequirements{
SingleAgent: true,
},
36 changes: 16 additions & 20 deletions master/internal/scheduler/default_resource_provider_test.go
@@ -90,11 +90,10 @@ func TestCleanUpTaskWhenTaskActorStopsWithError(t *testing.T) {

system.Ask(mockActor, AskSchedulerToAddTask{
task: AllocateRequest{
ID: TaskID(uuid.New().String()),
Name: "mock_task",
Group: mockActor,
SlotsNeeded: 1,
CanTerminate: true,
ID: TaskID(uuid.New().String()),
Name: "mock_task",
Group: mockActor,
SlotsNeeded: 1,
},
}).Get()
assert.Equal(t, c.taskList.len(), 1)
@@ -128,11 +127,10 @@ func TestCleanUpTaskWhenTaskActorPanics(t *testing.T) {

system.Ask(mockActor, AskSchedulerToAddTask{
task: AllocateRequest{
ID: TaskID(uuid.New().String()),
Name: "mock_task",
Group: mockActor,
SlotsNeeded: 1,
CanTerminate: true,
ID: TaskID(uuid.New().String()),
Name: "mock_task",
Group: mockActor,
SlotsNeeded: 1,
},
}).Get()

@@ -167,11 +165,10 @@ func TestCleanUpTaskWhenTaskActorStopsNormally(t *testing.T) {

system.Ask(mockActor, AskSchedulerToAddTask{
task: AllocateRequest{
ID: TaskID(uuid.New().String()),
Name: "mock_task",
Group: mockActor,
SlotsNeeded: 1,
CanTerminate: true,
ID: TaskID(uuid.New().String()),
Name: "mock_task",
Group: mockActor,
SlotsNeeded: 1,
},
}).Get()

@@ -204,11 +201,10 @@ func testWhenActorsStopOrTaskIsKilled(t *testing.T, r *rand.Rand) {

system.Ask(mockActor, AskSchedulerToAddTask{
task: AllocateRequest{
ID: TaskID(uuid.New().String()),
Name: "mock_task",
Group: mockActor,
SlotsNeeded: 1,
CanTerminate: true,
ID: TaskID(uuid.New().String()),
Name: "mock_task",
Group: mockActor,
SlotsNeeded: 1,
},
}).Get()

48 changes: 44 additions & 4 deletions master/internal/scheduler/fair_share.go
@@ -27,6 +27,8 @@ type groupState struct {
slotDemand int
// activeSlots is the number of slots in use by running tasks that can potentially be freed.
activeSlots int
// presubscribedSlots are slots that are already allocated and cannot be terminated.
presubscribedSlots int
// offered is the number of slots that were offered to the group for scheduling.
offered int

@@ -136,6 +138,9 @@ func calculateGroupStates(
case allocated == nil || len(allocated.Allocations) == 0:
state.pendingReqs = append(state.pendingReqs, req)
case len(allocated.Allocations) > 0:
if req.NonPreemptible {
state.presubscribedSlots += req.SlotsNeeded
}
state.allocatedReqs = append(state.allocatedReqs, req)
state.activeSlots += req.SlotsNeeded
}
@@ -159,7 +164,39 @@ func getTotalWeight(states []*groupState) float64 {
return total
}

func accountForPreoffers(preoffers int, offer int) (int, int) {
if preoffers > 0 {
if preoffers == offer {
preoffers = 0
offer = 0
}
if preoffers > offer {
preoffers -= offer
offer = 0
}
if preoffers < offer {
// Subtract the pre-offered slots before clearing them so the offer
// actually shrinks.
offer -= preoffers
preoffers = 0
}
}
return preoffers, offer
}

func allocateSlotOffers(states []*groupState, capacity int) {
// To prevent becoming oversubscribed, we first need to account for slots that were already
// allocated to tasks that cannot be preempted.
preoffers := make(map[*groupState]int)
for _, state := range states {
if state.presubscribedSlots == 0 {
continue
}
// If state.presubscribedSlots > capacity, we are oversubscribed.
// This shouldn't happen outside of unit tests.
state.offered = state.presubscribedSlots
preoffers[state] = state.presubscribedSlots
capacity -= state.presubscribedSlots
}

// Slots are offered to each group based on the progressive filling algorithm, an
// implementation of max-min fairness. All groups start with no slots offered. All
groups' offers increase equally until groups have reached their slot demand. The
@@ -202,6 +239,7 @@ func allocateSlotOffers(states []*groupState, capacity int) {

progressMade = true
offer := min(calculatedFairShare, capacity, state.slotDemand-state.offered)
preoffers[state], offer = accountForPreoffers(preoffers[state], offer)
state.offered += offer
capacity -= offer
if state.offered == state.slotDemand {
@@ -263,10 +301,12 @@ func assignTasks(
// the count of offered slots.
// TODO: We should terminate running tasks more intelligently.
for _, req := range state.allocatedReqs {
toRelease = append(toRelease, req.TaskActor)
state.activeSlots -= req.SlotsNeeded
if state.activeSlots <= state.offered {
break
if !req.NonPreemptible {
toRelease = append(toRelease, req.TaskActor)
state.activeSlots -= req.SlotsNeeded
if state.activeSlots <= state.offered {
break
}
}
}
} else if state.activeSlots < state.offered {
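To make the accounting above concrete, here is a minimal, self-contained Go sketch (the helper name consumePreoffer and the numbers are hypothetical; in the scheduler this logic lives in accountForPreoffers and the preoffers map inside allocateSlotOffers): non-preemptible slots are charged against capacity before progressive filling starts, and a later offer to the same group is drawn out of that pre-offered amount before it consumes any remaining capacity.

package main

import "fmt"

// consumePreoffer mirrors the intent of accountForPreoffers: slots that were
// pre-offered to non-preemptible tasks are used up before an offer counts as
// new capacity for the group.
func consumePreoffer(preoffers, offer int) (int, int) {
	if preoffers >= offer {
		return preoffers - offer, 0
	}
	return 0, offer - preoffers
}

func main() {
	// One agent with 4 slots; the group already runs a non-preemptible task
	// holding 2 slots, so those 2 are pre-offered and capacity drops to 2.
	capacity := 4
	presubscribed := 2
	capacity -= presubscribed

	// Progressive filling later proposes 3 slots for that group: 2 of them
	// were already pre-offered, so only 1 slot of real capacity is consumed.
	preoffers, offer := consumePreoffer(presubscribed, 3)
	capacity -= offer
	fmt.Println(preoffers, offer, capacity) // prints: 0 1 1
}

In the scheduler the remaining pre-offered count is kept per group in the preoffers map, so repeated passes of the filling loop keep drawing it down before touching shared capacity.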
82 changes: 82 additions & 0 deletions master/internal/scheduler/fair_share_test.go
@@ -262,3 +262,85 @@ func TestFairShareLabels(t *testing.T) {
assertEqualToAllocate(t, toAllocate, expectedToAllocate)
assertEqualToRelease(t, taskList, toRelease, expectedToRelease)
}

func TestFairSharePreemptible(t *testing.T) {
agents := []*mockAgent{
{id: "agent", slots: 1, label: ""},
}
tasks := []*mockTask{
{id: "task1", slotsNeeded: 1, allocatedAgent: agents[0]},
{id: "task2", slotsNeeded: 1, allocatedAgent: agents[0]},
}

expectedToAllocate := []*mockTask{}
expectedToRelease := []*mockTask{tasks[1]}

system := actor.NewSystem(t.Name())
taskList, groupMap, agentMap := setupClusterStates(t, system, tasks, nil, agents)
toAllocate, toRelease := fairshareSchedule(taskList, groupMap, agentMap, BestFit)
assertEqualToAllocate(t, toAllocate, expectedToAllocate)
assertEqualToRelease(t, taskList, toRelease, expectedToRelease)
}

func TestFairShareHonorsNonPreemptibleInAGroup(t *testing.T) {
agents := []*mockAgent{
{id: "agent", slots: 1, label: ""},
}
groups := []*mockGroup{
{id: "group1", maxSlots: newMaxSlot(2), weight: 1},
}
expectedToAllocate := []*mockTask{}

tasks := []*mockTask{
{id: "task1", slotsNeeded: 1, group: groups[0], allocatedAgent: agents[0]},
{id: "task2", slotsNeeded: 1, group: groups[0], allocatedAgent: agents[0], nonPreemptible: true},
}
expectedToRelease := []*mockTask{tasks[0]}
system := actor.NewSystem(t.Name())
taskList, groupMap, agentMap := setupClusterStates(t, system, tasks, groups, agents)
toAllocate, toRelease := fairshareSchedule(taskList, groupMap, agentMap, BestFit)
assertEqualToAllocate(t, toAllocate, expectedToAllocate)
assertEqualToRelease(t, taskList, toRelease, expectedToRelease)

// Repeat test in reverse order, because subtle bugs can be order-dependent
tasks = []*mockTask{
{id: "task1", slotsNeeded: 1, group: groups[0], allocatedAgent: agents[0], nonPreemptible: true},
{id: "task2", slotsNeeded: 1, group: groups[0], allocatedAgent: agents[0]},
}
expectedToRelease = []*mockTask{tasks[1]}
system = actor.NewSystem(t.Name())
taskList, groupMap, agentMap = setupClusterStates(t, system, tasks, groups, agents)
toAllocate, toRelease = fairshareSchedule(taskList, groupMap, agentMap, BestFit)
assertEqualToAllocate(t, toAllocate, expectedToAllocate)
assertEqualToRelease(t, taskList, toRelease, expectedToRelease)
}

func TestFairShareHonorsNonPreemptibleNilGroup(t *testing.T) {
agents := []*mockAgent{
{id: "agent", slots: 1, label: ""},
}
expectedToAllocate := []*mockTask{}

tasks := []*mockTask{
{id: "task1", slotsNeeded: 1, allocatedAgent: agents[0]},
{id: "task2", slotsNeeded: 1, allocatedAgent: agents[0], nonPreemptible: true},
}
expectedToRelease := []*mockTask{tasks[0]}
system := actor.NewSystem(t.Name())
taskList, groupMap, agentMap := setupClusterStates(t, system, tasks, nil, agents)
toAllocate, toRelease := fairshareSchedule(taskList, groupMap, agentMap, BestFit)
assertEqualToAllocate(t, toAllocate, expectedToAllocate)
assertEqualToRelease(t, taskList, toRelease, expectedToRelease)

// Repeat test in reverse order, because subtle bugs can be order-dependent
tasks = []*mockTask{
{id: "task1", slotsNeeded: 1, allocatedAgent: agents[0], nonPreemptible: true},
{id: "task2", slotsNeeded: 1, allocatedAgent: agents[0]},
}
expectedToRelease = []*mockTask{tasks[1]}
system = actor.NewSystem(t.Name())
taskList, groupMap, agentMap = setupClusterStates(t, system, tasks, nil, agents)
toAllocate, toRelease = fairshareSchedule(taskList, groupMap, agentMap, BestFit)
assertEqualToAllocate(t, toAllocate, expectedToAllocate)
assertEqualToRelease(t, taskList, toRelease, expectedToRelease)
}
9 changes: 4 additions & 5 deletions master/internal/scheduler/filterable_view_test.go
@@ -166,11 +166,10 @@ func addTask(
slotsNeeded int,
) *AllocateRequest {
req := &AllocateRequest{
ID: TaskID(taskID),
Group: newGroup(t, system, taskID+"-group"),
TaskActor: newGroup(t, system, taskID+"-handler"),
SlotsNeeded: slotsNeeded,
CanTerminate: true,
ID: TaskID(taskID),
Group: newGroup(t, system, taskID+"-group"),
TaskActor: newGroup(t, system, taskID+"-handler"),
SlotsNeeded: slotsNeeded,
}
allocated := &ResourcesAllocated{ID: req.ID, Allocations: []Allocation{}}
for i := 0; i < numAllocated; i++ {
4 changes: 2 additions & 2 deletions master/internal/scheduler/fitting_methods_test.go
@@ -10,8 +10,8 @@ import (

func consumeSlots(agent *agentState, consume int) *agentState {
req := &AllocateRequest{
SlotsNeeded: consume,
CanTerminate: true,
SlotsNeeded: consume,
NonPreemptible: false,
}
container := newContainer(req, agent, req.SlotsNeeded)
agent.allocateFreeDevices(req.SlotsNeeded, container.id)
23 changes: 12 additions & 11 deletions master/internal/scheduler/scheduler_test.go
@@ -44,6 +44,7 @@ type mockTask struct {
label string
group *mockGroup
allocatedAgent *mockAgent
nonPreemptible bool
}

type (
@@ -130,13 +131,12 @@ func setupCluster(
label := system.Ask(handler, getLabel{}).Get().(string)

d.addAllocatedTask(&AllocateRequest{
ID: TaskID(handler.Address().String()),
Name: handler.Address().Local(),
Group: g,
TaskActor: handler,
SlotsNeeded: slots,
CanTerminate: true,
Label: label,
ID: TaskID(handler.Address().String()),
Name: handler.Address().Local(),
Group: g,
TaskActor: handler,
SlotsNeeded: slots,
Label: label,
}, nil)
_ = d.getOrCreateGroup(nil, g)
if resp := system.Ask(g, getMaxSlots{}); resp.Get() != nil {
@@ -200,10 +200,11 @@ func setupClusterStates(
groups[ref] = &group{handler: ref}

req := &AllocateRequest{
ID: mockTask.id,
SlotsNeeded: mockTask.slotsNeeded,
Label: mockTask.label,
TaskActor: ref,
ID: mockTask.id,
SlotsNeeded: mockTask.slotsNeeded,
Label: mockTask.label,
TaskActor: ref,
NonPreemptible: mockTask.nonPreemptible,
}
if mockTask.group == nil {
req.Group = ref
2 changes: 1 addition & 1 deletion master/internal/scheduler/task.go
@@ -14,7 +14,7 @@ type (
Name string
Group *actor.Ref
SlotsNeeded int
CanTerminate bool
NonPreemptible bool
Label string
FittingRequirements FittingRequirements
TaskActor *actor.Ref
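The rename also flips the field's sense: with the old CanTerminate flag every caller had to opt in to termination, whereas the zero value of NonPreemptible now means a request is preemptible by default and only commands and checkpoint GC opt out. A rough sketch of the two request shapes, assuming the fields shown above (the package name and import path are inferred from the file layout; illustration only, not part of the commit):

package example // hypothetical package inside the master module

import "github.com/determined-ai/determined/master/internal/scheduler"

// newTrialRequest leaves NonPreemptible at its zero value, so the fair-share
// scheduler may release the trial when its group is over its offered share.
func newTrialRequest(name string, slots int) *scheduler.AllocateRequest {
	return &scheduler.AllocateRequest{
		ID:          scheduler.NewTaskID(),
		Name:        name,
		SlotsNeeded: slots,
	}
}

// newCommandRequest opts out of preemption, matching what command.go and
// checkpoint_gc.go now do for TensorBoards, notebooks, and checkpoint GC.
func newCommandRequest(name string, slots int) *scheduler.AllocateRequest {
	return &scheduler.AllocateRequest{
		ID:             scheduler.NewTaskID(),
		Name:           name,
		SlotsNeeded:    slots,
		NonPreemptible: true,
	}
}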
12 changes: 6 additions & 6 deletions master/internal/trial.go
@@ -341,12 +341,12 @@ func (t *trial) Receive(ctx *actor.Context) error {
}

t.task = &scheduler.AllocateRequest{
ID: scheduler.NewTaskID(),
Name: name,
Group: ctx.Self().Parent(),
SlotsNeeded: slotsNeeded,
CanTerminate: true,
Label: label,
ID: scheduler.NewTaskID(),
Name: name,
Group: ctx.Self().Parent(),
SlotsNeeded: slotsNeeded,
NonPreemptible: false,
Label: label,
FittingRequirements: scheduler.FittingRequirements{
SingleAgent: false,
},