Fix PodCache handling of multi node jobs (#573)
The PodCache used the job id to identify a pod uniquely. The issue is that a job id is no longer unique to a single pod: the pods of a multi-node job all share the same job id.

So when you cancelled a multi-node job, the executor would delete one of its pods and then leave the others until the cache entry expired:
 - We prevent repeated deletion calls by holding an empty value for that pod in the cache; with a job-id key, that single entry covered every pod of the job (see the toy sketch below)
As a result we waited out the PodExpiry between each pod of a multi-node job being deleted.
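
For illustration only, a toy sketch (not the executor's actual deletion code, with made-up identifiers) of how a job-id-keyed tombstone swallows the second pod of a multi-node job:

```go
package main

import "fmt"

// Toy model of the podsToDelete cache: once a key holds a tombstone entry,
// further delete requests for that key are skipped until the entry expires.
func main() {
	tombstones := map[string]bool{}

	// Two pods belonging to the same multi-node job (identifiers are made up).
	pods := []struct{ jobId, podNumber string }{
		{"job-1", "0"},
		{"job-1", "1"},
	}

	for _, p := range pods {
		key := p.jobId // pre-fix: the key is the job id alone
		if tombstones[key] {
			// Pod 1 lands here: the tombstone written for pod 0 shadows it,
			// so it is only retried once the cache entry expires.
			fmt.Printf("skipping pod %s-%s: entry already present\n", p.jobId, p.podNumber)
			continue
		}
		fmt.Printf("deleting pod %s-%s\n", p.jobId, p.podNumber)
		tombstones[key] = true
	}
}
```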

Similarly, we keep a cache of submitted pods (pods that may not have been reported back via the API yet). Because it was also keyed by job id, we would incorrectly report only one pod as submitted when submitting many pods as part of a multi-node job.
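
The fix is to key both caches by a per-pod identifier instead of the job id. The real helper, util.ExtractPodKey, is not shown in this diff; a plausible sketch, assuming it simply joins the job id and pod number labels that the updated test fixture below sets, might look like this (the label values and helper name here are assumptions):

```go
package util

import v1 "k8s.io/api/core/v1"

// Label keys assumed for this sketch; the executor's domain package defines
// the real constants (domain.JobId, domain.PodNumber).
const (
	jobIdLabel     = "armada_job_id"
	podNumberLabel = "armada_pod_number"
)

// extractPodKeySketch is a hypothetical stand-in for util.ExtractPodKey: it
// combines the job id with the pod number so that each pod of a multi-node
// job gets its own cache entry.
func extractPodKeySketch(pod *v1.Pod) string {
	return pod.Labels[jobIdLabel] + "_" + pod.Labels[podNumberLabel]
}
```

With keys like job-1_0 and job-1_1, deleting or tracking one pod of a job no longer collides with its siblings.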
JamesMurkin committed May 19, 2021
1 parent a47ac57 commit 4a41caf
Showing 5 changed files with 39 additions and 34 deletions.
10 changes: 5 additions & 5 deletions internal/executor/context/cluster_context.go
@@ -101,7 +101,7 @@ func NewClusterContext(
log.Errorf("Failed to process pod event due to it being an unexpected type. Failed to process %+v", obj)
return
}
- context.submittedPods.Delete(util.ExtractJobId(pod))
+ context.submittedPods.Delete(util.ExtractPodKey(pod))
},
})

@@ -216,7 +216,7 @@ func (c *KubernetesClusterContext) SubmitPod(pod *v1.Pod, owner string) (*v1.Pod
returnedPod, err := ownerClient.CoreV1().Pods(pod.Namespace).Create(ctx.Background(), pod, metav1.CreateOptions{})

if err != nil {
- c.submittedPods.Delete(util.ExtractJobId(pod))
+ c.submittedPods.Delete(util.ExtractPodKey(pod))
}
return returnedPod, err
}
@@ -266,12 +266,12 @@ func (c *KubernetesClusterContext) ProcessPodsToDelete() {
continue
}
err := c.kubernetesClient.CoreV1().Pods(podToDelete.Namespace).Delete(ctx.Background(), podToDelete.Name, deleteOptions)
- jobId := util.ExtractJobId(podToDelete)
+ podId := util.ExtractPodKey(podToDelete)
if err == nil || errors.IsNotFound(err) {
- c.podsToDelete.Update(jobId, nil)
+ c.podsToDelete.Update(podId, nil)
} else {
log.Errorf("Failed to delete pod %s/%s because %s", podToDelete.Namespace, podToDelete.Name, err)
- c.podsToDelete.Delete(jobId)
+ c.podsToDelete.Delete(podId)
}
}
}
2 changes: 1 addition & 1 deletion internal/executor/context/cluster_context_test.go
@@ -441,7 +441,7 @@ func waitForContextSync(t *testing.T, context *KubernetesClusterContext, pods []
allSync = false
break
}
- if cachedPod := context.submittedPods.Get(pod.Labels[domain.JobId]); cachedPod != nil {
+ if cachedPod := context.submittedPods.Get(util.ExtractPodKey(pod)); cachedPod != nil {
allSync = false
break
}
4 changes: 3 additions & 1 deletion internal/executor/service/job_lease.go
@@ -216,7 +216,9 @@ func (jobLeaseService *JobLeaseService) reportTerminated(pods []*v1.Pod) {
for _, pod := range pods {
event := reporter.CreateJobTerminatedEvent(pod, "Pod terminated because lease could not be renewed.", jobLeaseService.clusterContext.GetClusterId())
jobLeaseService.eventReporter.QueueEvent(event, func(err error) {
log.Errorf("Failed to report terminated pod %s: %s", pod.Name, err)
if err != nil {
log.Errorf("Failed to report terminated pod %s: %s", pod.Name, err)
}
})
}
}
32 changes: 16 additions & 16 deletions internal/executor/util/cache.go
@@ -16,8 +16,8 @@ type PodCache interface {
Add(pod *v1.Pod)
AddIfNotExists(pod *v1.Pod) bool
Update(key string, pod *v1.Pod) bool
- Delete(jobId string)
- Get(jobId string) *v1.Pod
+ Delete(podId string)
+ Get(podId string) *v1.Pod
GetAll() []*v1.Pod
}

@@ -41,7 +41,7 @@ func NewTimeExpiringPodCache(expiry time.Duration, cleanUpInterval time.Duration
sizeGauge: promauto.NewGauge(
prometheus.GaugeOpts{
Name: metrics.ArmadaExecutorMetricsPrefix + metricName + "_cache_size",
Help: "Number of jobs in the submitted job cache",
Help: "Number of pods in the pod cache",
},
),
}
@@ -50,58 +50,58 @@ func NewTimeExpiringPodCache(expiry time.Duration, cleanUpInterval time.Duration
}

func (podCache *mapPodCache) Add(pod *v1.Pod) {
- jobId := ExtractJobId(pod)
+ podId := ExtractPodKey(pod)

podCache.rwLock.Lock()
defer podCache.rwLock.Unlock()

- podCache.records[jobId] = cacheRecord{pod: pod.DeepCopy(), expiry: time.Now().Add(podCache.defaultExpiry)}
+ podCache.records[podId] = cacheRecord{pod: pod.DeepCopy(), expiry: time.Now().Add(podCache.defaultExpiry)}
podCache.sizeGauge.Inc()
}

func (podCache *mapPodCache) AddIfNotExists(pod *v1.Pod) bool {
- jobId := ExtractJobId(pod)
+ podId := ExtractPodKey(pod)

podCache.rwLock.Lock()
defer podCache.rwLock.Unlock()

- existing, ok := podCache.records[jobId]
+ existing, ok := podCache.records[podId]
exists := ok && existing.expiry.After(time.Now())
if !exists {
- podCache.records[jobId] = cacheRecord{pod: pod.DeepCopy(), expiry: time.Now().Add(podCache.defaultExpiry)}
+ podCache.records[podId] = cacheRecord{pod: pod.DeepCopy(), expiry: time.Now().Add(podCache.defaultExpiry)}
podCache.sizeGauge.Inc()
}
return !exists
}

- func (podCache *mapPodCache) Update(jobId string, pod *v1.Pod) bool {
+ func (podCache *mapPodCache) Update(podId string, pod *v1.Pod) bool {
podCache.rwLock.Lock()
defer podCache.rwLock.Unlock()

- existing, ok := podCache.records[jobId]
+ existing, ok := podCache.records[podId]
exists := ok && existing.expiry.After(time.Now())
if exists {
- podCache.records[jobId] = cacheRecord{pod: pod.DeepCopy(), expiry: time.Now().Add(podCache.defaultExpiry)}
+ podCache.records[podId] = cacheRecord{pod: pod.DeepCopy(), expiry: time.Now().Add(podCache.defaultExpiry)}
}
return ok
}

- func (podCache *mapPodCache) Delete(jobId string) {
+ func (podCache *mapPodCache) Delete(podId string) {
podCache.rwLock.Lock()
defer podCache.rwLock.Unlock()

- _, ok := podCache.records[jobId]
+ _, ok := podCache.records[podId]
if ok {
- delete(podCache.records, jobId)
+ delete(podCache.records, podId)
podCache.sizeGauge.Dec()
}
}

- func (podCache *mapPodCache) Get(jobId string) *v1.Pod {
+ func (podCache *mapPodCache) Get(podId string) *v1.Pod {
podCache.rwLock.Lock()
defer podCache.rwLock.Unlock()

- record := podCache.records[jobId]
+ record := podCache.records[podId]
if record.expiry.After(time.Now()) {
return record.pod.DeepCopy()
}
25 changes: 14 additions & 11 deletions internal/executor/util/cache_test.go
@@ -23,7 +23,7 @@ func TestMapPodCache_Add(t *testing.T) {
cache := NewTimeExpiringPodCache(time.Minute, time.Second, "metric1")
cache.Add(pod)

assert.Equal(t, pod, cache.Get("job1"))
assert.Equal(t, pod, cache.Get(ExtractPodKey(pod)))
}

func TestMapPodCache_Add_Expires(t *testing.T) {
@@ -33,7 +33,7 @@ func TestMapPodCache_Add_Expires(t *testing.T) {
cache := NewTimeExpiringPodCache(time.Second/10, time.Second/100, "metric1")
cache.Add(pod)

assert.Equal(t, pod, cache.Get("job1"))
assert.Equal(t, pod, cache.Get(ExtractPodKey(pod)))

time.Sleep(time.Second / 5)

@@ -52,7 +52,7 @@ func TestMapPodCache_AddIfNotExists(t *testing.T) {
cache := NewTimeExpiringPodCache(time.Minute, time.Second, "metric1")
assert.True(t, cache.AddIfNotExists(pod1))
assert.False(t, cache.AddIfNotExists(pod2))
assert.Equal(t, "1", cache.Get("job1").Name)
assert.Equal(t, "1", cache.Get(ExtractPodKey(pod1)).Name)
}

func TestMapPodCache_Update(t *testing.T) {
@@ -64,11 +64,11 @@ func TestMapPodCache_Update(t *testing.T) {
pod2.Name = "2"

cache := NewTimeExpiringPodCache(time.Minute, time.Second, "metric1")
assert.False(t, cache.Update("job1", pod1))
assert.False(t, cache.Update(ExtractPodKey(pod1), pod1))
assert.Equal(t, 0, len(cache.GetAll()))
cache.Add(pod1)
assert.True(t, cache.Update("job1", pod2))
assert.Equal(t, "2", cache.Get("job1").Name)
assert.True(t, cache.Update(ExtractPodKey(pod2), pod2))
assert.Equal(t, "2", cache.Get(ExtractPodKey(pod1)).Name)
}

func TestMapPodCache_Delete(t *testing.T) {
Expand All @@ -78,10 +78,10 @@ func TestMapPodCache_Delete(t *testing.T) {
cache := NewTimeExpiringPodCache(time.Minute, time.Second, "metric1")

cache.Add(pod)
assert.NotNil(t, cache.Get("job1"))
assert.NotNil(t, cache.Get(ExtractPodKey(pod)))

cache.Delete("job1")
assert.Nil(t, cache.Get("job1"))
cache.Delete(ExtractPodKey(pod))
assert.Nil(t, cache.Get(ExtractPodKey(pod)))
}

func TestMapPodCache_Delete_DoNotFailOnUnrecognisedKey(t *testing.T) {
Expand All @@ -101,7 +101,7 @@ func TestNewMapPodCache_Get_ReturnsCopy(t *testing.T) {

cache.Add(pod)

result := cache.Get("job1")
result := cache.Get(ExtractPodKey(pod))
assert.Equal(t, result, pod)

pod.Namespace = "new value"
@@ -149,7 +149,10 @@ func TestMapPodCache_GetReturnsACopy(t *testing.T) {
func makeManagedPod(jobId string) *v1.Pod {
pod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
- Labels: map[string]string{domain.JobId: jobId},
+ Labels: map[string]string{
+ 	domain.JobId:     jobId,
+ 	domain.PodNumber: "0",
+ },
},
}
return &pod
