diff --git a/pkg/cache/application_events.go b/pkg/cache/application_events.go index 23a89e014..d3617e050 100644 --- a/pkg/cache/application_events.go +++ b/pkg/cache/application_events.go @@ -223,4 +223,4 @@ func (re ReleaseAppAllocationEvent) GetArgs() []interface{} { func (re ReleaseAppAllocationEvent) GetEvent() events.ApplicationEventType { return re.event -} \ No newline at end of file +} diff --git a/pkg/cache/placeholder.go b/pkg/cache/placeholder.go index 0bf4a4c1a..d048630e6 100644 --- a/pkg/cache/placeholder.go +++ b/pkg/cache/placeholder.go @@ -33,16 +33,8 @@ type Placeholder struct { appID string taskGroupName string pod *v1.Pod - stage PlaceholderState } -type PlaceholderState string - -const ( - Acquiring PlaceholderState = "Acquiring" - Acquired PlaceholderState = "Acquired" -) - func newPlaceholder(placeholderName string, app *Application, taskGroup v1alpha1.TaskGroup) *Placeholder { placeholderPod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -54,7 +46,7 @@ func newPlaceholder(placeholderName string, app *Application, taskGroup v1alpha1 }, Annotations: map[string]string{ constants.AnnotationPlaceholderFlag: "true", - constants.AnnotationTaskGroupName: taskGroup.Name, + constants.AnnotationTaskGroupName: taskGroup.Name, }, }, Spec: v1.PodSpec{ @@ -69,6 +61,8 @@ func newPlaceholder(placeholderName string, app *Application, taskGroup v1alpha1 }, RestartPolicy: constants.PlaceholderPodRestartPolicy, SchedulerName: constants.SchedulerName, + NodeSelector: taskGroup.NodeSelector, + Tolerations: taskGroup.Tolerations, }, } @@ -76,11 +70,10 @@ func newPlaceholder(placeholderName string, app *Application, taskGroup v1alpha1 appID: app.GetApplicationID(), taskGroupName: taskGroup.Name, pod: placeholderPod, - stage: Acquiring, } } func (p *Placeholder) String() string { - return fmt.Sprintf("appID: %s, taskGroup: %s, podName: %s/%s, stage: %s", - p.appID, p.taskGroupName, p.pod.Namespace, p.pod.Name, p.stage) + return fmt.Sprintf("appID: %s, taskGroup: %s, podName: %s/%s", + p.appID, p.taskGroupName, p.pod.Namespace, p.pod.Name) } diff --git a/pkg/cache/placeholder_manager.go b/pkg/cache/placeholder_manager.go index f4b16a0d1..b25b97c2a 100644 --- a/pkg/cache/placeholder_manager.go +++ b/pkg/cache/placeholder_manager.go @@ -107,13 +107,6 @@ func (mgr *PlaceholderManager) CleanUp(app *Application) { zap.String("appID", app.GetApplicationID())) } -// this is only used in testing -func (mgr *PlaceholderManager) setMockedClients(mockedClients *client.Clients) { - mgr.Lock() - defer mgr.Unlock() - mgr.clients = mockedClients -} - func (mgr *PlaceholderManager) cleanOrphanPlaceholders() { mgr.Lock() defer mgr.Unlock() diff --git a/pkg/cache/placeholder_test.go b/pkg/cache/placeholder_test.go index 42bf58c80..e9d27864b 100644 --- a/pkg/cache/placeholder_test.go +++ b/pkg/cache/placeholder_test.go @@ -22,6 +22,7 @@ import ( "testing" "gotest.tools/assert" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "github.com/apache/incubator-yunikorn-k8shim/pkg/apis/yunikorn.apache.org/v1alpha1" @@ -51,7 +52,6 @@ func TestNewPlaceholder(t *testing.T) { holder := newPlaceholder("ph-name", app, app.taskGroups[0]) assert.Equal(t, holder.appID, appID) - assert.Equal(t, holder.stage, Acquiring) assert.Equal(t, holder.taskGroupName, app.taskGroups[0].Name) assert.Equal(t, holder.pod.Spec.SchedulerName, constants.SchedulerName) assert.Equal(t, holder.pod.Name, "ph-name") @@ -63,5 +63,74 @@ func TestNewPlaceholder(t *testing.T) { assert.Equal(t, holder.pod.Annotations[constants.AnnotationTaskGroupName], app.taskGroups[0].Name) assert.Equal(t, common.GetPodResource(holder.pod).Resources[constants.CPU].Value, int64(500)) assert.Equal(t, common.GetPodResource(holder.pod).Resources[constants.Memory].Value, int64(1024)) - assert.Equal(t, holder.String(), "appID: app01, taskGroup: test-group-1, podName: test/ph-name, stage: Acquiring") + assert.Equal(t, len(holder.pod.Spec.NodeSelector), 0) + assert.Equal(t, len(holder.pod.Spec.Tolerations), 0) + assert.Equal(t, holder.String(), "appID: app01, taskGroup: test-group-1, podName: test/ph-name") +} + +func TestNewPlaceholderWithNodeSelectors(t *testing.T) { + const ( + appID = "app01" + queue = "root.default" + namespace = "test" + ) + mockedSchedulerAPI := newMockSchedulerAPI() + app := NewApplication(appID, queue, + "bob", map[string]string{constants.AppTagNamespace: namespace}, mockedSchedulerAPI) + app.setTaskGroups([]v1alpha1.TaskGroup{ + { + Name: "test-group-1", + MinMember: 10, + MinResource: map[string]resource.Quantity{ + "cpu": resource.MustParse("500m"), + "memory": resource.MustParse("1024M"), + }, + NodeSelector: map[string]string{ + "nodeType": "test", + "nodeState": "healthy", + }, + }, + }) + + holder := newPlaceholder("ph-name", app, app.taskGroups[0]) + assert.Equal(t, len(holder.pod.Spec.NodeSelector), 2) + assert.Equal(t, holder.pod.Spec.NodeSelector["nodeType"], "test") + assert.Equal(t, holder.pod.Spec.NodeSelector["nodeState"], "healthy") +} + +func TestNewPlaceholderWithTolerations(t *testing.T) { + const ( + appID = "app01" + queue = "root.default" + namespace = "test" + ) + mockedSchedulerAPI := newMockSchedulerAPI() + app := NewApplication(appID, queue, + "bob", map[string]string{constants.AppTagNamespace: namespace}, mockedSchedulerAPI) + app.setTaskGroups([]v1alpha1.TaskGroup{ + { + Name: "test-group-1", + MinMember: 10, + MinResource: map[string]resource.Quantity{ + "cpu": resource.MustParse("500m"), + "memory": resource.MustParse("1024M"), + }, + Tolerations: []v1.Toleration{ + { + Key: "key1", + Operator: v1.TolerationOpEqual, + Value: "value1", + Effect: v1.TaintEffectNoSchedule, + }, + }, + }, + }) + + holder := newPlaceholder("ph-name", app, app.taskGroups[0]) + assert.Equal(t, len(holder.pod.Spec.Tolerations), 1) + tlr := holder.pod.Spec.Tolerations[0] + assert.Equal(t, tlr.Key, "key1") + assert.Equal(t, tlr.Value, "value1") + assert.Equal(t, tlr.Operator, v1.TolerationOpEqual) + assert.Equal(t, tlr.Effect, v1.TaintEffectNoSchedule) } diff --git a/pkg/cache/task.go b/pkg/cache/task.go index 979546c77..8fa50c7eb 100644 --- a/pkg/cache/task.go +++ b/pkg/cache/task.go @@ -23,9 +23,9 @@ import ( "sync" "time" - "github.com/apache/incubator-yunikorn-k8shim/pkg/appmgmt/interfaces" "go.uber.org/zap" + "github.com/apache/incubator-yunikorn-k8shim/pkg/appmgmt/interfaces" "github.com/apache/incubator-yunikorn-k8shim/pkg/common" "github.com/apache/incubator-yunikorn-k8shim/pkg/common/events" "github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils" @@ -55,7 +55,6 @@ type Task struct { lock *sync.RWMutex } -// TODO to be removed func NewTask(tid string, app *Application, ctx *Context, pod *v1.Pod) *Task { taskResource := common.GetPodResource(pod) return createTaskInternal(tid, app, taskResource, pod, false, "", ctx) diff --git a/pkg/common/si_helper.go b/pkg/common/si_helper.go index f1956ddb3..5cb37b07f 100644 --- a/pkg/common/si_helper.go +++ b/pkg/common/si_helper.go @@ -86,7 +86,7 @@ func CreateReleaseAskRequestForTask(appID, taskId, partition string) si.UpdateRe return result } -func GetTerminationTypeFromString(terminationTypeStr string) si.AllocationRelease_TerminationType{ +func GetTerminationTypeFromString(terminationTypeStr string) si.AllocationRelease_TerminationType { if v, ok := si.AllocationRelease_TerminationType_value[terminationTypeStr]; ok { return si.AllocationRelease_TerminationType(v) }