Skip to content

Commit

Permalink
[YUNIKORN-459]Implement the placeholder cleanup in PlaceholderManager (
Browse files Browse the repository at this point in the history
  • Loading branch information
HuangTing-Yao committed Nov 25, 2020
1 parent f5fb65f commit 44b4069
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pkg/cache/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ func (app *Application) onReserving(event *fsm.Event) {
if err := GetPlaceholderManager().createAppPlaceholders(app); err != nil {
// creating placeholder failed
// put the app into recycling queue and turn the app to running state
GetPlaceholderManager().Recycle(app.applicationID)
GetPlaceholderManager().CleanUp(app)
ev := NewRunApplicationEvent(app.applicationID)
dispatcher.Dispatch(ev)
}
Expand Down
25 changes: 20 additions & 5 deletions pkg/cache/placeholder_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync"

"go.uber.org/zap"
v1 "k8s.io/api/core/v1"

"github.com/apache/incubator-yunikorn-k8shim/pkg/client"
"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
Expand All @@ -30,7 +31,8 @@ import (

// placeholder manager is a service to manage the lifecycle of app placeholders
type PlaceholderManager struct {
clients *client.Clients
clients *client.Clients
orphanPod map[string]*v1.Pod
sync.RWMutex
}

Expand Down Expand Up @@ -70,10 +72,23 @@ func (mgr *PlaceholderManager) createAppPlaceholders(app *Application) error {
return nil
}

// recycle all the placeholders for an application
func (mgr *PlaceholderManager) Recycle(appID string) {
log.Logger().Info("start to recycle app placeholders",
zap.String("appID", appID))
// clean up all the placeholders for an application
func (mgr *PlaceholderManager) CleanUp(app *Application) {
log.Logger().Info("start to clean up app placeholders",
zap.String("appID", app.GetApplicationID()))
for taskID, task := range app.taskMap {
if task.GetTaskPlaceholder() {
// remove pod
err := mgr.clients.KubeClient.Delete(task.pod)
if err != nil {
log.Logger().Error("failed to clean up placeholder pod",
zap.Error(err))
mgr.orphanPod[taskID] = task.pod
}
}
}
log.Logger().Info("finish to clean up app placeholders",
zap.String("appID", app.GetApplicationID()))
}

// this is only used in testing
Expand Down
87 changes: 87 additions & 0 deletions pkg/cache/placeholder_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import (
"testing"

"gotest.tools/assert"
is "gotest.tools/assert/cmp"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
apis "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/apache/incubator-yunikorn-k8shim/pkg/apis/yunikorn.apache.org/v1alpha1"
"github.com/apache/incubator-yunikorn-k8shim/pkg/client"
Expand Down Expand Up @@ -86,3 +88,88 @@ func TestCreateAppPlaceholders(t *testing.T) {
err = placeholderMgr.createAppPlaceholders(app)
assert.Error(t, err, "failed to create pod tg-test-group-2-app01-15")
}

func TestCleanUp(t *testing.T) {
const (
appID = "app01"
queue = "root.default"
namespace = "test"
)
mockedContext := initContextForTest()
mockedSchedulerAPI := newMockSchedulerAPI()
app := NewApplication(appID, queue,
"bob", map[string]string{constants.AppTagNamespace: namespace}, mockedSchedulerAPI)
mockedContext.applications[appID] = app
res := app.getNonTerminatedTaskAlias()
assert.Equal(t, len(res), 0)

pod1 := &v1.Pod{
TypeMeta: apis.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
Name: "pod-01",
UID: "UID-01",
},
}
pod2 := &v1.Pod{
TypeMeta: apis.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
Name: "pod-02",
UID: "UID-02",
},
}
pod3 := &v1.Pod{
TypeMeta: apis.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
Name: "pod-03",
UID: "UID-03",
},
}
taskID1 := "task01"
task1 := NewTask(taskID1, app, mockedContext, pod1)
task1.placeholder = true
app.taskMap[taskID1] = task1
taskID2 := "task02"
task2 := NewTask(taskID2, app, mockedContext, pod2)
task2.placeholder = true
app.taskMap[taskID2] = task2
taskID3 := "task03"
task3 := NewTask(taskID3, app, mockedContext, pod3)
task3.placeholder = false
app.taskMap[taskID3] = task3
res = app.getNonTerminatedTaskAlias()
assert.Equal(t, len(res), 3)

deletePod := make([]string, 0)
mockedAPIProvider := client.NewMockedAPIProvider()
mockedAPIProvider.MockDeleteFn(func(pod *v1.Pod) error {
deletePod = append(deletePod, pod.Name)
return nil
})
placeholderMgr := &PlaceholderManager{
clients: mockedAPIProvider.GetAPIs(),
orphanPod: make(map[string]*v1.Pod),
RWMutex: sync.RWMutex{},
}
placeholderMgr.CleanUp(app)

// check both pod-01 and pod-02 in deletePod list and pod-03 isn't contain
assert.Assert(t, is.Contains(deletePod, "pod-01"))
assert.Assert(t, is.Contains(deletePod, "pod-02"))
exist := false
for _, item := range deletePod {
if item == "pod-03" {
exist = true
}
}
assert.Equal(t, exist, false)
assert.Equal(t, len(placeholderMgr.orphanPod), 0)
}
6 changes: 6 additions & 0 deletions pkg/cache/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@ func (task *Task) GetTaskID() string {
return task.taskID
}

func (task *Task) GetTaskPlaceholder() bool {
task.lock.RLock()
defer task.lock.RUnlock()
return task.placeholder
}

func (task *Task) GetTaskState() string {
// fsm has its own internal lock, we don't need to hold node's lock here
return task.sm.Current()
Expand Down

0 comments on commit 44b4069

Please sign in to comment.