Skip to content

Commit

Permalink
[YUNIKORN-501] Include node-selector and tolerations in the placehold…
Browse files Browse the repository at this point in the history
…er's pod spec. (#217)
  • Loading branch information
yangwwei committed Jan 14, 2021
1 parent 132fdae commit 396d0b1
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 25 deletions.
2 changes: 1 addition & 1 deletion pkg/cache/application_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,4 +223,4 @@ func (re ReleaseAppAllocationEvent) GetArgs() []interface{} {

func (re ReleaseAppAllocationEvent) GetEvent() events.ApplicationEventType {
return re.event
}
}
17 changes: 5 additions & 12 deletions pkg/cache/placeholder.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,8 @@ type Placeholder struct {
appID string
taskGroupName string
pod *v1.Pod
stage PlaceholderState
}

type PlaceholderState string

const (
Acquiring PlaceholderState = "Acquiring"
Acquired PlaceholderState = "Acquired"
)

func newPlaceholder(placeholderName string, app *Application, taskGroup v1alpha1.TaskGroup) *Placeholder {
placeholderPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -54,7 +46,7 @@ func newPlaceholder(placeholderName string, app *Application, taskGroup v1alpha1
},
Annotations: map[string]string{
constants.AnnotationPlaceholderFlag: "true",
constants.AnnotationTaskGroupName: taskGroup.Name,
constants.AnnotationTaskGroupName: taskGroup.Name,
},
},
Spec: v1.PodSpec{
Expand All @@ -69,18 +61,19 @@ func newPlaceholder(placeholderName string, app *Application, taskGroup v1alpha1
},
RestartPolicy: constants.PlaceholderPodRestartPolicy,
SchedulerName: constants.SchedulerName,
NodeSelector: taskGroup.NodeSelector,
Tolerations: taskGroup.Tolerations,
},
}

return &Placeholder{
appID: app.GetApplicationID(),
taskGroupName: taskGroup.Name,
pod: placeholderPod,
stage: Acquiring,
}
}

func (p *Placeholder) String() string {
return fmt.Sprintf("appID: %s, taskGroup: %s, podName: %s/%s, stage: %s",
p.appID, p.taskGroupName, p.pod.Namespace, p.pod.Name, p.stage)
return fmt.Sprintf("appID: %s, taskGroup: %s, podName: %s/%s",
p.appID, p.taskGroupName, p.pod.Namespace, p.pod.Name)
}
7 changes: 0 additions & 7 deletions pkg/cache/placeholder_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,6 @@ func (mgr *PlaceholderManager) CleanUp(app *Application) {
zap.String("appID", app.GetApplicationID()))
}

// this is only used in testing
func (mgr *PlaceholderManager) setMockedClients(mockedClients *client.Clients) {
mgr.Lock()
defer mgr.Unlock()
mgr.clients = mockedClients
}

func (mgr *PlaceholderManager) cleanOrphanPlaceholders() {
mgr.Lock()
defer mgr.Unlock()
Expand Down
73 changes: 71 additions & 2 deletions pkg/cache/placeholder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"

"gotest.tools/assert"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

"github.com/apache/incubator-yunikorn-k8shim/pkg/apis/yunikorn.apache.org/v1alpha1"
Expand Down Expand Up @@ -51,7 +52,6 @@ func TestNewPlaceholder(t *testing.T) {

holder := newPlaceholder("ph-name", app, app.taskGroups[0])
assert.Equal(t, holder.appID, appID)
assert.Equal(t, holder.stage, Acquiring)
assert.Equal(t, holder.taskGroupName, app.taskGroups[0].Name)
assert.Equal(t, holder.pod.Spec.SchedulerName, constants.SchedulerName)
assert.Equal(t, holder.pod.Name, "ph-name")
Expand All @@ -63,5 +63,74 @@ func TestNewPlaceholder(t *testing.T) {
assert.Equal(t, holder.pod.Annotations[constants.AnnotationTaskGroupName], app.taskGroups[0].Name)
assert.Equal(t, common.GetPodResource(holder.pod).Resources[constants.CPU].Value, int64(500))
assert.Equal(t, common.GetPodResource(holder.pod).Resources[constants.Memory].Value, int64(1024))
assert.Equal(t, holder.String(), "appID: app01, taskGroup: test-group-1, podName: test/ph-name, stage: Acquiring")
assert.Equal(t, len(holder.pod.Spec.NodeSelector), 0)
assert.Equal(t, len(holder.pod.Spec.Tolerations), 0)
assert.Equal(t, holder.String(), "appID: app01, taskGroup: test-group-1, podName: test/ph-name")
}

func TestNewPlaceholderWithNodeSelectors(t *testing.T) {
const (
appID = "app01"
queue = "root.default"
namespace = "test"
)
mockedSchedulerAPI := newMockSchedulerAPI()
app := NewApplication(appID, queue,
"bob", map[string]string{constants.AppTagNamespace: namespace}, mockedSchedulerAPI)
app.setTaskGroups([]v1alpha1.TaskGroup{
{
Name: "test-group-1",
MinMember: 10,
MinResource: map[string]resource.Quantity{
"cpu": resource.MustParse("500m"),
"memory": resource.MustParse("1024M"),
},
NodeSelector: map[string]string{
"nodeType": "test",
"nodeState": "healthy",
},
},
})

holder := newPlaceholder("ph-name", app, app.taskGroups[0])
assert.Equal(t, len(holder.pod.Spec.NodeSelector), 2)
assert.Equal(t, holder.pod.Spec.NodeSelector["nodeType"], "test")
assert.Equal(t, holder.pod.Spec.NodeSelector["nodeState"], "healthy")
}

func TestNewPlaceholderWithTolerations(t *testing.T) {
const (
appID = "app01"
queue = "root.default"
namespace = "test"
)
mockedSchedulerAPI := newMockSchedulerAPI()
app := NewApplication(appID, queue,
"bob", map[string]string{constants.AppTagNamespace: namespace}, mockedSchedulerAPI)
app.setTaskGroups([]v1alpha1.TaskGroup{
{
Name: "test-group-1",
MinMember: 10,
MinResource: map[string]resource.Quantity{
"cpu": resource.MustParse("500m"),
"memory": resource.MustParse("1024M"),
},
Tolerations: []v1.Toleration{
{
Key: "key1",
Operator: v1.TolerationOpEqual,
Value: "value1",
Effect: v1.TaintEffectNoSchedule,
},
},
},
})

holder := newPlaceholder("ph-name", app, app.taskGroups[0])
assert.Equal(t, len(holder.pod.Spec.Tolerations), 1)
tlr := holder.pod.Spec.Tolerations[0]
assert.Equal(t, tlr.Key, "key1")
assert.Equal(t, tlr.Value, "value1")
assert.Equal(t, tlr.Operator, v1.TolerationOpEqual)
assert.Equal(t, tlr.Effect, v1.TaintEffectNoSchedule)
}
3 changes: 1 addition & 2 deletions pkg/cache/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (
"sync"
"time"

"github.com/apache/incubator-yunikorn-k8shim/pkg/appmgmt/interfaces"
"go.uber.org/zap"

"github.com/apache/incubator-yunikorn-k8shim/pkg/appmgmt/interfaces"
"github.com/apache/incubator-yunikorn-k8shim/pkg/common"
"github.com/apache/incubator-yunikorn-k8shim/pkg/common/events"
"github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils"
Expand Down Expand Up @@ -55,7 +55,6 @@ type Task struct {
lock *sync.RWMutex
}

// TODO to be removed
func NewTask(tid string, app *Application, ctx *Context, pod *v1.Pod) *Task {
taskResource := common.GetPodResource(pod)
return createTaskInternal(tid, app, taskResource, pod, false, "", ctx)
Expand Down
2 changes: 1 addition & 1 deletion pkg/common/si_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func CreateReleaseAskRequestForTask(appID, taskId, partition string) si.UpdateRe
return result
}

func GetTerminationTypeFromString(terminationTypeStr string) si.AllocationRelease_TerminationType{
func GetTerminationTypeFromString(terminationTypeStr string) si.AllocationRelease_TerminationType {
if v, ok := si.AllocationRelease_TerminationType_value[terminationTypeStr]; ok {
return si.AllocationRelease_TerminationType(v)
}
Expand Down

0 comments on commit 396d0b1

Please sign in to comment.