Make fake executor support cancellation

We now track which pods are allocated to each node, so we can deallocate them correctly on cancellation.

Previously, cancelled pods would remain allocated forever, blocking all capacity on the cluster.
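
The core of the change: each node now carries a nodeAllocation record holding its remaining capacity plus the set of pod names counted against it, and deallocation only releases capacity for pods that were actually allocated there. Below is a minimal, self-contained sketch of that bookkeeping, using simplified stand-in types (the resources map here replaces armadaresource.ComputeResources, and the method names are illustrative, not the executor's real API):

package main

import "fmt"

// resources is a simplified stand-in for armadaresource.ComputeResources.
type resources map[string]int64

// nodeAllocation mirrors the idea in the commit: per-node free capacity
// plus the set of pods currently counted against that capacity.
type nodeAllocation struct {
    availableResource resources
    allocatedPods     map[string]bool
}

// allocate subtracts the pod's request from the node and records the pod.
func (n *nodeAllocation) allocate(pod string, req resources) {
    for k, v := range req {
        n.availableResource[k] -= v
    }
    n.allocatedPods[pod] = true
}

// deallocate only releases capacity for pods it actually allocated,
// so cancelling (or double-deleting) a pod cannot corrupt the accounting.
func (n *nodeAllocation) deallocate(pod string, req resources) {
    if !n.allocatedPods[pod] {
        return
    }
    for k, v := range req {
        n.availableResource[k] += v
    }
    delete(n.allocatedPods, pod)
}

func main() {
    node := &nodeAllocation{
        availableResource: resources{"cpu": 4000, "memory": 8 << 30},
        allocatedPods:     map[string]bool{},
    }
    req := resources{"cpu": 1000, "memory": 1 << 30}

    node.allocate("job-1", req)
    node.deallocate("job-1", req) // capacity returned on cancellation
    node.deallocate("job-1", req) // second call is a no-op

    fmt.Println(node.availableResource["cpu"]) // 4000 again
}
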
JamesMurkin committed May 4, 2023
1 parent 04c6673 commit 6e52201
Showing 1 changed file with 46 additions and 26 deletions.
internal/executor/fake/context/context.go (72 changes: 46 additions & 26 deletions)
@@ -49,32 +49,37 @@ var DefaultNodeSpec = []*NodeSpec{
     },
 }

+type nodeAllocation struct {
+    availableResource armadaresource.ComputeResources
+    allocatedPods map[string]bool
+}
+
 type FakeClusterContext struct {
-    clusterId string
-    nodeIdLabel string
-    pool string
-    podEventHandlers []*cache.ResourceEventHandlerFuncs
-    clusterEventHandlers []*cache.ResourceEventHandlerFuncs
-    rwLock sync.RWMutex
-    pods map[string]*v1.Pod
-    events map[string]*v1.Event
-    nodes []*v1.Node
-    nodesByNodeId map[string]*v1.Node
-    nodeAvailableResource map[string]armadaresource.ComputeResources
+    clusterId string
+    nodeIdLabel string
+    pool string
+    podEventHandlers []*cache.ResourceEventHandlerFuncs
+    clusterEventHandlers []*cache.ResourceEventHandlerFuncs
+    rwLock sync.RWMutex
+    pods map[string]*v1.Pod
+    events map[string]*v1.Event
+    nodes []*v1.Node
+    nodesByNodeId map[string]*v1.Node
+    nodeAllocation map[string]nodeAllocation
 }

 func NewFakeClusterContext(appConfig configuration.ApplicationConfiguration, nodeIdLabel string, nodeSpecs []*NodeSpec) cluster_context.ClusterContext {
     if nodeIdLabel == "" {
         panic("nodeIdLabel must be set")
     }
     c := &FakeClusterContext{
-        clusterId: appConfig.ClusterId,
-        nodeIdLabel: nodeIdLabel,
-        pool: appConfig.Pool,
-        pods: map[string]*v1.Pod{},
-        nodes: []*v1.Node{},
-        nodesByNodeId: map[string]*v1.Node{},
-        nodeAvailableResource: map[string]armadaresource.ComputeResources{},
+        clusterId: appConfig.ClusterId,
+        nodeIdLabel: nodeIdLabel,
+        pool: appConfig.Pool,
+        pods: map[string]*v1.Pod{},
+        nodes: []*v1.Node{},
+        nodesByNodeId: map[string]*v1.Node{},
+        nodeAllocation: map[string]nodeAllocation{},
     }
     if nodeSpecs == nil {
         nodeSpecs = DefaultNodeSpec
@@ -296,6 +301,7 @@ func (c *FakeClusterContext) DeletePods(pods []*v1.Pod) {

         for _, p := range pods {
             delete(c.pods, p.Name)
+            c.deallocateNoLock(p)
         }
     }()
 }
@@ -336,7 +342,10 @@ func (c *FakeClusterContext) addNodes(specs []*NodeSpec) {
             }
             c.nodes = append(c.nodes, node)
             c.nodesByNodeId[name] = node
-            c.nodeAvailableResource[node.Name] = armadaresource.FromResourceList(s.Allocatable)
+            c.nodeAllocation[name] = nodeAllocation{
+                allocatedPods: map[string]bool{},
+                availableResource: armadaresource.FromResourceList(s.Allocatable),
+            }
         }
     }
 }
@@ -365,8 +374,8 @@ func (c *FakeClusterContext) trySchedule(pod *v1.Pod) (scheduled bool, removed b
     sort.Slice(nodes, func(i, j int) bool {
         node1 := c.nodes[i]
         node2 := c.nodes[j]
-        node1Resource := c.nodeAvailableResource[node1.Name]
-        node2Resource := c.nodeAvailableResource[node2.Name]
+        node1Resource := c.nodeAllocation[node1.Name].availableResource
+        node2Resource := c.nodeAllocation[node2.Name].availableResource

         // returns true if node1 should be considered before node2
         return node2Resource.Dominates(node1Resource)
@@ -375,25 +384,36 @@
     for _, n := range nodes {
         if c.isSchedulableOn(pod, n) {
             resources := armadaresource.TotalPodResourceRequest(&pod.Spec)
-            c.nodeAvailableResource[n.Name].Sub(resources)
+            c.nodeAllocation[n.Name].availableResource.Sub(resources)
+            c.nodeAllocation[n.Name].allocatedPods[pod.Name] = true
             pod.Spec.NodeName = n.Name
             return true, false
         }
     }
     return false, false
 }

 func (c *FakeClusterContext) deallocate(pod *v1.Pod) {
     c.rwLock.Lock()
     defer c.rwLock.Unlock()

-    resources := armadaresource.TotalPodResourceRequest(&pod.Spec)
-    c.nodeAvailableResource[pod.Spec.NodeName].Add(resources)
+    c.deallocateNoLock(pod)
 }

+func (c *FakeClusterContext) deallocateNoLock(pod *v1.Pod) {
+    if pod.Spec.NodeName == "" {
+        return
+    }
+
+    if c.nodeAllocation[pod.Spec.NodeName].allocatedPods[pod.Name] {
+        resources := armadaresource.TotalPodResourceRequest(&pod.Spec)
+        c.nodeAllocation[pod.Spec.NodeName].availableResource.Add(resources)
+        delete(c.nodeAllocation[pod.Spec.NodeName].allocatedPods, pod.Name)
+    }
+}
+
 func (c *FakeClusterContext) isSchedulableOn(pod *v1.Pod, n *v1.Node) bool {
     requiredResource := armadaresource.TotalPodResourceRequest(&pod.Spec)
-    availableResource := c.nodeAvailableResource[n.Name].DeepCopy()
+    availableResource := c.nodeAllocation[n.Name].availableResource.DeepCopy()
     availableResource.Sub(requiredResource)

     // resources
