diff --git a/Makefile b/Makefile index 4daf69216..0e484924c 100644 --- a/Makefile +++ b/Makefile @@ -41,6 +41,9 @@ OUTPUT=_output RELEASE_BIN_DIR=${OUTPUT}/bin ADMISSION_CONTROLLER_BIN_DIR=${OUTPUT}/admission-controllers/ POD_ADMISSION_CONTROLLER_BINARY=scheduler-admission-controller +GANG_BIN_DIR=${OUTPUT}/gang +GANG_CLIENT_BINARY=simulation-gang-worker +GANG_SERVER_BINARY=simulation-gang-coordinator LOCAL_CONF=conf CONF_FILE=queues.yaml REPO=github.com/apache/incubator-yunikorn-k8shim/pkg @@ -168,6 +171,32 @@ adm_image: admission docker build ./deployments/image/admission -t ${REGISTRY}/yunikorn:admission-${VERSION} @rm -f ./deployments/image/admission/${POD_ADMISSION_CONTROLLER_BINARY} +# Build gang web server and client binary in a production ready version +.PHONY: simulation +simulation: + @echo "building gang web client binary" + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 \ + go build -a -o=${GANG_BIN_DIR}/${GANG_CLIENT_BINARY} -ldflags \ + '-extldflags "-static" -X main.version=${VERSION} -X main.date=${DATE}' \ + -tags netgo -installsuffix netgo \ + ./pkg/simulation/gang/gangclient + @echo "building gang web server binary" + go build -a -o=${GANG_BIN_DIR}/${GANG_SERVER_BINARY} -ldflags \ + '-extldflags "-static" -X main.version=${VERSION} -X main.date=${DATE}' \ + -tags netgo -installsuffix netgo \ + ./pkg/simulation/gang/webserver + +# Build gang test images based on the production ready version +.PHONY: simulation_image +simulation_image: simulation + @echo "building gang test docker images" + @cp ${GANG_BIN_DIR}/${GANG_CLIENT_BINARY} ./deployments/image/gang/gangclient + @cp ${GANG_BIN_DIR}/${GANG_SERVER_BINARY} ./deployments/image/gang/webserver + docker build ./deployments/image/gang/gangclient -t ${REGISTRY}/yunikorn:simulation-gang-worker-latest + docker build ./deployments/image/gang/webserver -t ${REGISTRY}/yunikorn:simulation-gang-coordinator-latest + @rm -f ./deployments/image/gang/gangclient/${GANG_CLIENT_BINARY} + @rm -f 
./deployments/image/gang/webserver/${GANG_SERVER_BINARY} + # Build all images based on the production ready version .PHONY: image image: sched_image adm_image diff --git a/deployments/examples/README.md b/deployments/examples/README.md index c1281a128..639c51d47 100644 --- a/deployments/examples/README.md +++ b/deployments/examples/README.md @@ -49,6 +49,14 @@ Deployment files for the driver and executor: A simple example that runs a [kubeflow/tensorflow](./tfjob/tf-job-mnist.yaml) job. In this example it will run a distributed mnist model for e2e test, for full more detail see the [dist-mnist](https://github.com/kubeflow/tf-operator/tree/master/examples/v1/dist-mnist) section. +## gang +A sample application which implement gang in application level. +Start via the [gangDeploy.sh](./gang/cmd/gangDeploy.sh) script, for full more detail see the [gang](./gang/README.md) section. + +Deployment file for gang-coordinator and gang-job: +* [gang-coordinator](./gang/gang-coordinator.yaml) +* [gang-job](./gang/gang-job.yaml) + ## volumes The volumes directory contains three examples: 1. [local volumes](#local-volume) diff --git a/deployments/examples/gang/README.md b/deployments/examples/gang/README.md new file mode 100644 index 000000000..a0a9579b4 --- /dev/null +++ b/deployments/examples/gang/README.md @@ -0,0 +1,28 @@ + + +# Gang in application level + +The following script runs a given number of jobs, and each one is a simulated job that requires gang scheduling support. Where the job only starts to execute its tasks when the cluster has the min member of instances running. + +```shell script +./cmd/gangDeploy.sh +``` +Note: if you prefer to manually launch such jobs, please refer to the `gang-coordinator.yaml` `gang-job.yaml`. Where: +* [gang-coordinator.yaml](./gang-coordinator.yaml): the coordinator service used to coordinate the execution of each job's tasks. 
+* [gang-job](./gang-job.yaml): the simulated job that requires gang members to be started before executing its tasks. diff --git a/deployments/examples/gang/cmd/gangDeploy.sh b/deployments/examples/gang/cmd/gangDeploy.sh new file mode 100755 index 000000000..42f4f2855 --- /dev/null +++ b/deployments/examples/gang/cmd/gangDeploy.sh @@ -0,0 +1,100 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +#limitations under the License. 
+# + +# gangDeploy.sh +set -o errexit +set -o nounset +set -o pipefail + +JOBAMOUNT=$1 +GANGMEMBER=$2 +RUNTIMESEC=$3 + +# create service +kubectl create -f <(cat << EOF +apiVersion: v1 +kind: Service +metadata: + name: gangservice + labels: + app: gang +spec: + selector: + app: gang + type: ClusterIP + ports: + - protocol: TCP + port: 8863 + targetPort: 8863 +EOF) +# create job counter web server +kubectl create -f <(cat << EOF +apiVersion: v1 +kind: Pod +metadata: + name: gangweb + labels: + app: gang + queue: root.sandbox +spec: + schedulerName: yunikorn + containers: + - name: gangweb + image: apache/yunikorn:simulation-gang-coordinator-latest + imagePullPolicy: Never + ports: + - containerPort: 8863 +EOF) +# wait for web server to be running +until grep 'Running' <(kubectl get pod gangweb -o=jsonpath='{.status.phase}'); do + sleep 1 +done +# create gang jobs +for i in $(seq "$JOBAMOUNT"); do + kubectl create -f <(cat << EOF +apiVersion: batch/v1 +kind: Job +metadata: + name: gang-job-$i + labels: + app: gang + queue: root.sandbox +spec: + completions: $GANGMEMBER + parallelism: $GANGMEMBER + template: + spec: + containers: + - name: gang + image: apache/yunikorn:simulation-gang-worker-latest + imagePullPolicy: Never + env: + - name: JOB_ID + value: gang-job-$i + - name: SERVICE_NAME + value: gangservice + - name: MEMBER_AMOUNT + value: "$GANGMEMBER" + - name: TASK_EXECUTION_SECONDS + value: "$RUNTIMESEC" + restartPolicy: Never + schedulerName: yunikorn +EOF) +done \ No newline at end of file diff --git a/deployments/examples/gang/gang-coordinator.yaml b/deployments/examples/gang/gang-coordinator.yaml new file mode 100644 index 000000000..d44776087 --- /dev/null +++ b/deployments/examples/gang/gang-coordinator.yaml @@ -0,0 +1,46 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +apiVersion: v1 +kind: Service +metadata: + name: gangservice + labels: + app: gang +spec: + selector: + app: gang + type: ClusterIP + ports: + - protocol: TCP + port: 8863 + targetPort: 8863 +--- +apiVersion: v1 +kind: Pod +metadata: + name: gangweb + labels: + app: gang + queue: root.sandbox +spec: + schedulerName: yunikorn + containers: + - name: gangweb + image: apache/yunikorn:simulation-gang-coordinator-latest + imagePullPolicy: Never + ports: + - containerPort: 8863 \ No newline at end of file diff --git a/deployments/examples/gang/gang-job.yaml b/deployments/examples/gang/gang-job.yaml new file mode 100644 index 000000000..852e90ba0 --- /dev/null +++ b/deployments/examples/gang/gang-job.yaml @@ -0,0 +1,44 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: batch/v1 +kind: Job +metadata: + name: gang-job-1 + labels: + app: gang + queue: root.sandbox +spec: + completions: 10 # The pod number will create + parallelism: 10 # The pod number will create + template: + spec: + containers: + - name: gang + image: apache/yunikorn:simulation-gang-worker-latest + imagePullPolicy: Never + env: + - name: JOB_ID + value: gang-job-1 # This job's name + - name: SERVICE_NAME + value: gangservice # The service name + - name: MEMBER_AMOUNT + value: "10" # The gang member that you hope, must small than pod number.must be string. + - name: TASK_EXECUTION_SECONDS + value: "60" # The task execution time (sec), it will start to countdown when the gang member amount be satisfied. must be string. + restartPolicy: Never + schedulerName: yunikorn \ No newline at end of file diff --git a/deployments/image/gang/gangclient/dockerfile b/deployments/image/gang/gangclient/dockerfile new file mode 100644 index 000000000..f5e03c720 --- /dev/null +++ b/deployments/image/gang/gangclient/dockerfile @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +FROM golang:1.15.2 +WORKDIR /gang/client +ADD . /gang/client +ENTRYPOINT ["./simulation-gang-worker"] \ No newline at end of file diff --git a/deployments/image/gang/webserver/dockerfile b/deployments/image/gang/webserver/dockerfile new file mode 100644 index 000000000..a519d7514 --- /dev/null +++ b/deployments/image/gang/webserver/dockerfile @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM golang:1.15.2 +WORKDIR /gang/server +ADD . 
/gang/server +EXPOSE 8863 +ENTRYPOINT ["./simulation-gang-coordinator"] \ No newline at end of file diff --git a/deployments/yunikorn-application/application-definition.yaml b/deployments/yunikorn-application/application-definition.yaml index 1b0ee81c2..7d78c37fc 100644 --- a/deployments/yunikorn-application/application-definition.yaml +++ b/deployments/yunikorn-application/application-definition.yaml @@ -50,10 +50,10 @@ spec: spec: type: object properties: - policy: + schedulingPolicy: type: object properties: - policy: + type: type: string parameters: type: object @@ -67,7 +67,7 @@ spec: items: type: object properties: - groupName: + name: type: string minMember: type: integer @@ -79,6 +79,26 @@ spec: - type: string pattern: '^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$' x-kubernetes-int-or-string: true + nodeSelector: + type: object + additionalProperties: + type: string + tolerations: + type: array + items: + type: object + properties: + effect: + type: string + key: + type: string + operator: + type: string + value: + type: string + tolerationSeconds: + format: int64 + type: integer status: type: object properties: diff --git a/deployments/yunikorn-application/sample-yunikorn-app.yaml b/deployments/yunikorn-application/sample-yunikorn-app.yaml index 5f581a682..618aca6e9 100644 --- a/deployments/yunikorn-application/sample-yunikorn-app.yaml +++ b/deployments/yunikorn-application/sample-yunikorn-app.yaml @@ -21,18 +21,25 @@ metadata: name: example spec: schedulingPolicy: - name: TryOnce + type: TryOnce parameters: timeout: "60" retrySecond: "0" queue: root.default taskGroups: - - groupName: "test-task-0001" + - name: "test-task-0001" minMember: 1 minResource: cpu: "300m" memory: "128Mi" - - groupName: "test-task-0002" + nodeSelector: + locate: west + tolerations: + - key: "key" + operator: "Equal" + value: "value" + effect: "NoSchedule" + - name: "test-task-0002" minMember: 2 minResource: cpu: 
"600m" diff --git a/go.mod b/go.mod index 6c9da55ac..786a633bb 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/apache/incubator-yunikorn-core v0.0.0-20210113182138-361d455c7f28 github.com/apache/incubator-yunikorn-scheduler-interface v0.9.1-0.20210106054514-49c4f33ed27b github.com/google/uuid v1.1.1 + github.com/gorilla/mux v1.7.3 github.com/looplab/fsm v0.1.0 github.com/onsi/ginkgo v1.11.0 github.com/onsi/gomega v1.7.0 diff --git a/go.sum b/go.sum index 4f9a9650a..fe3762d86 100644 --- a/go.sum +++ b/go.sum @@ -52,6 +52,8 @@ github.com/apache/incubator-yunikorn-core v0.0.0-20210106054125-dcf631d5c6b5 h1: github.com/apache/incubator-yunikorn-core v0.0.0-20210106054125-dcf631d5c6b5/go.mod h1:d/fn47kdGd094NHiysHAY+0d4evpYRFQ6q5b9eaBONo= github.com/apache/incubator-yunikorn-core v0.0.0-20210113182138-361d455c7f28 h1:7NO76ZYnYtrdC8CT+ZjBKsezSzviYwDjW9DzVu090/M= github.com/apache/incubator-yunikorn-core v0.0.0-20210113182138-361d455c7f28/go.mod h1:IhX7VztC5z9LfPVKTIkbkuisl+2CLum0neLVMF+hfXM= +github.com/apache/incubator-yunikorn-core v0.0.0-20210113193345-bd9ed16dd49f h1:GTWn7W/KdjvihGHBIcjd+Q5FRNJl9Ycj4a+NWfQ8kD8= +github.com/apache/incubator-yunikorn-core v0.0.0-20210113193345-bd9ed16dd49f/go.mod h1:IhX7VztC5z9LfPVKTIkbkuisl+2CLum0neLVMF+hfXM= github.com/apache/incubator-yunikorn-scheduler-interface v0.0.0-20210106054514-49c4f33ed27b/go.mod h1:ObMs03XFbnmpGD81jYvdUDEVZbHvz8W6dWH5nGDCjc0= github.com/apache/incubator-yunikorn-scheduler-interface v0.9.1-0.20200901200728-b9033558f319 h1:I12nCcXdHe6W4oysVejFHfQDy0Ix0r+ZHdfooyEoldo= github.com/apache/incubator-yunikorn-scheduler-interface v0.9.1-0.20200901200728-b9033558f319/go.mod h1:ObMs03XFbnmpGD81jYvdUDEVZbHvz8W6dWH5nGDCjc0= diff --git a/pkg/apis/yunikorn.apache.org/v1alpha1/type.go b/pkg/apis/yunikorn.apache.org/v1alpha1/type.go index bd20f8b32..f06701e28 100644 --- a/pkg/apis/yunikorn.apache.org/v1alpha1/type.go +++ b/pkg/apis/yunikorn.apache.org/v1alpha1/type.go @@ -19,6 +19,7 @@ package 
v1alpha1 import ( + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -37,29 +38,31 @@ type Application struct { // Spec part type ApplicationSpec struct { - Policy SchedulePolicy `json:"schedulingPolicy"` - Queue string `json:"queue"` - TaskGroup []Task `json:"taskGroups"` + SchedulingPolicy SchedulingPolicy `json:"schedulingPolicy"` + Queue string `json:"queue"` + TaskGroups []TaskGroup `json:"taskGroups"` } -type SchedulePolicy struct { - Policy SchedulingPolicy `json:"name"` - Parameters map[string]string `json:"parameters,omitempty"` +type SchedulingPolicy struct { + Type SchedulingPolicyType `json:"type"` + Parameters map[string]string `json:"parameters,omitempty"` } -type SchedulingPolicy string +type SchedulingPolicyType string const ( - TryOnce SchedulingPolicy = "TryOnce" - MaxRetry SchedulingPolicy = "MaxRetry" - TryReserve SchedulingPolicy = "TryReserve" - TryPreempt SchedulingPolicy = "TryPreempt" + TryOnce SchedulingPolicyType = "TryOnce" + MaxRetry SchedulingPolicyType = "MaxRetry" + TryReserve SchedulingPolicyType = "TryReserve" + TryPreempt SchedulingPolicyType = "TryPreempt" ) -type Task struct { - GroupName string `json:"groupName"` - MinMember int32 `json:"minMember"` - MinResource map[string]resource.Quantity `json:"minResource"` +type TaskGroup struct { + Name string `json:"name"` + MinMember int32 `json:"minMember"` + MinResource map[string]resource.Quantity `json:"minResource"` + NodeSelector map[string]string `json:"nodeSelector,omitempty"` + Tolerations []v1.Toleration `json:"tolerations,omitempty"` } // Status part diff --git a/pkg/apis/yunikorn.apache.org/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/yunikorn.apache.org/v1alpha1/zz_generated.deepcopy.go index 25f88eb17..3149d782c 100644 --- a/pkg/apis/yunikorn.apache.org/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/yunikorn.apache.org/v1alpha1/zz_generated.deepcopy.go @@ -22,6 +22,7 @@ package v1alpha1 import ( + v1 
"k8s.io/api/core/v1" resource "k8s.io/apimachinery/pkg/api/resource" runtime "k8s.io/apimachinery/pkg/runtime" ) @@ -90,10 +91,10 @@ func (in *ApplicationList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ApplicationSpec) DeepCopyInto(out *ApplicationSpec) { *out = *in - in.Policy.DeepCopyInto(&out.Policy) - if in.TaskGroup != nil { - in, out := &in.TaskGroup, &out.TaskGroup - *out = make([]Task, len(*in)) + in.SchedulingPolicy.DeepCopyInto(&out.SchedulingPolicy) + if in.TaskGroups != nil { + in, out := &in.TaskGroups, &out.TaskGroups + *out = make([]TaskGroup, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -129,7 +130,7 @@ func (in *ApplicationStatus) DeepCopy() *ApplicationStatus { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *SchedulePolicy) DeepCopyInto(out *SchedulePolicy) { +func (in *SchedulingPolicy) DeepCopyInto(out *SchedulingPolicy) { *out = *in if in.Parameters != nil { in, out := &in.Parameters, &out.Parameters @@ -141,18 +142,18 @@ func (in *SchedulePolicy) DeepCopyInto(out *SchedulePolicy) { return } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SchedulePolicy. -func (in *SchedulePolicy) DeepCopy() *SchedulePolicy { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SchedulingPolicy. +func (in *SchedulingPolicy) DeepCopy() *SchedulingPolicy { if in == nil { return nil } - out := new(SchedulePolicy) + out := new(SchedulingPolicy) in.DeepCopyInto(out) return out } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
-func (in *Task) DeepCopyInto(out *Task) { +func (in *TaskGroup) DeepCopyInto(out *TaskGroup) { *out = *in if in.MinResource != nil { in, out := &in.MinResource, &out.MinResource @@ -161,15 +162,29 @@ func (in *Task) DeepCopyInto(out *Task) { (*out)[key] = val.DeepCopy() } } + if in.NodeSelector != nil { + in, out := &in.NodeSelector, &out.NodeSelector + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + if in.Tolerations != nil { + in, out := &in.Tolerations, &out.Tolerations + *out = make([]v1.Toleration, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } return } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Task. -func (in *Task) DeepCopy() *Task { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TaskGroup. +func (in *TaskGroup) DeepCopy() *TaskGroup { if in == nil { return nil } - out := new(Task) + out := new(TaskGroup) in.DeepCopyInto(out) return out } diff --git a/pkg/appmgmt/general/general.go b/pkg/appmgmt/general/general.go index adf7e18cd..4061ab634 100644 --- a/pkg/appmgmt/general/general.go +++ b/pkg/appmgmt/general/general.go @@ -20,7 +20,7 @@ package general import ( "go.uber.org/zap" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" k8sCache "k8s.io/client-go/tools/cache" @@ -88,10 +88,15 @@ func (os *Manager) getTaskMetadata(pod *v1.Pod) (interfaces.TaskMetadata, bool) return interfaces.TaskMetadata{}, false } + placeholder := utils.GetPlaceholderFlagFromPodSpec(pod) + taskGroupName := utils.GetTaskGroupFromPodSpec(pod) + return interfaces.TaskMetadata{ ApplicationID: appId, TaskID: string(pod.UID), Pod: pod, + Placeholder: placeholder, + TaskGroupName: taskGroupName, }, true } @@ -114,11 +119,17 @@ func (os *Manager) getAppMetadata(pod *v1.Pod) (interfaces.ApplicationMetadata, // get the application owner (this is all that 
is available as far as we can find) user := pod.Spec.ServiceAccountName + taskGroups, err := utils.GetTaskGroupsFromAnnotation(pod) + if err != nil { + log.Logger().Error("unable to get taskGroups by given pod", zap.Error(err)) + } + return interfaces.ApplicationMetadata{ ApplicationID: appId, QueueName: utils.GetQueueNameFromPod(pod), User: user, Tags: tags, + TaskGroups: taskGroups, }, true } @@ -294,6 +305,8 @@ func (os *Manager) GetExistingAllocation(pod *v1.Pod) *si.Allocation { // when submit a task, we use pod UID as the allocationKey, // to keep consistent, during recovery, the pod UID is also used // for an Allocation. + placeholder := utils.GetPlaceholderFlagFromPodSpec(pod) + taskGroupName := utils.GetTaskGroupFromPodSpec(pod) return &si.Allocation{ AllocationKey: string(pod.UID), AllocationTags: meta.Tags, @@ -302,6 +315,8 @@ func (os *Manager) GetExistingAllocation(pod *v1.Pod) *si.Allocation { QueueName: meta.QueueName, NodeID: pod.Spec.NodeName, ApplicationID: meta.ApplicationID, + Placeholder: placeholder, + TaskGroupName: taskGroupName, PartitionName: constants.DefaultPartition, } } diff --git a/pkg/appmgmt/general/general_test.go b/pkg/appmgmt/general/general_test.go index a2fdeb03d..9c0b48d80 100644 --- a/pkg/appmgmt/general/general_test.go +++ b/pkg/appmgmt/general/general_test.go @@ -23,6 +23,7 @@ import ( "gotest.tools/assert" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" apis "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/apache/incubator-yunikorn-k8shim/pkg/cache" @@ -31,6 +32,18 @@ import ( "github.com/apache/incubator-yunikorn-k8shim/pkg/common/events" ) +const taskGroupInfo = ` +[ + { + "name": "test-group-1", + "minMember": 3, + "minResource": { + "cpu": 2, + "memory": "1Gi" + } + } +]` + func TestGetAppMetadata(t *testing.T) { am := NewManager(cache.NewMockedAMProtocol(), client.NewMockedAPIProvider()) @@ -47,6 +60,9 @@ func TestGetAppMetadata(t *testing.T) { "applicationId": "app00001", "queue": "root.a", }, + 
Annotations: map[string]string{ + constants.AnnotationTaskGroups: taskGroupInfo, + }, }, Spec: v1.PodSpec{SchedulerName: constants.SchedulerName}, Status: v1.PodStatus{ @@ -60,6 +76,10 @@ func TestGetAppMetadata(t *testing.T) { assert.Equal(t, app.QueueName, "root.a") assert.Equal(t, app.User, "") assert.DeepEqual(t, app.Tags, map[string]string{"namespace": "default"}) + assert.Equal(t, app.TaskGroups[0].Name, "test-group-1") + assert.Equal(t, app.TaskGroups[0].MinMember, int32(3)) + assert.Equal(t, app.TaskGroups[0].MinResource["cpu"], resource.MustParse("2")) + assert.Equal(t, app.TaskGroups[0].MinResource["memory"], resource.MustParse("1Gi")) pod = v1.Pod{ TypeMeta: apis.TypeMeta{ @@ -90,6 +110,7 @@ func TestGetAppMetadata(t *testing.T) { assert.Equal(t, app.QueueName, "root.b") assert.Equal(t, app.User, "bob") assert.DeepEqual(t, app.Tags, map[string]string{"namespace": "app-namespace-01"}) + assert.DeepEqual(t, len(app.TaskGroups), 0) pod = v1.Pod{ TypeMeta: apis.TypeMeta{ @@ -130,6 +151,9 @@ func TestGetTaskMetadata(t *testing.T) { "applicationId": "app00001", "queue": "root.a", }, + Annotations: map[string]string{ + constants.AnnotationTaskGroupName: "test-group-01", + }, }, Spec: v1.PodSpec{SchedulerName: constants.SchedulerName}, Status: v1.PodStatus{ @@ -141,6 +165,11 @@ func TestGetTaskMetadata(t *testing.T) { assert.Equal(t, ok, true) assert.Equal(t, task.ApplicationID, "app00001") assert.Equal(t, task.TaskID, "UID-POD-00001") + assert.Equal(t, task.TaskGroupName, "test-group-01") + pod.Annotations = map[string]string{} + task, ok = am.getTaskMetadata(&pod) + assert.Equal(t, ok, true) + assert.Equal(t, task.TaskGroupName, "") pod = v1.Pod{ TypeMeta: apis.TypeMeta{ diff --git a/pkg/appmgmt/interfaces/amprotocol.go b/pkg/appmgmt/interfaces/amprotocol.go index 67c824f9a..0476c011a 100644 --- a/pkg/appmgmt/interfaces/amprotocol.go +++ b/pkg/appmgmt/interfaces/amprotocol.go @@ -20,6 +20,8 @@ package interfaces import ( v1 "k8s.io/api/core/v1" + + 
"github.com/apache/incubator-yunikorn-k8shim/pkg/apis/yunikorn.apache.org/v1alpha1" ) // app management protocol defines all the APIs needed for app management, @@ -72,10 +74,13 @@ type ApplicationMetadata struct { QueueName string User string Tags map[string]string + TaskGroups []v1alpha1.TaskGroup } type TaskMetadata struct { ApplicationID string TaskID string Pod *v1.Pod + Placeholder bool + TaskGroupName string } diff --git a/pkg/cache/application.go b/pkg/cache/application.go index d48bb6eb2..8ff251837 100644 --- a/pkg/cache/application.go +++ b/pkg/cache/application.go @@ -28,9 +28,11 @@ import ( v1 "k8s.io/api/core/v1" "github.com/apache/incubator-yunikorn-core/pkg/api" + "github.com/apache/incubator-yunikorn-k8shim/pkg/apis/yunikorn.apache.org/v1alpha1" "github.com/apache/incubator-yunikorn-k8shim/pkg/appmgmt/interfaces" "github.com/apache/incubator-yunikorn-k8shim/pkg/common/constants" "github.com/apache/incubator-yunikorn-k8shim/pkg/common/events" + "github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils" "github.com/apache/incubator-yunikorn-k8shim/pkg/conf" "github.com/apache/incubator-yunikorn-k8shim/pkg/dispatcher" "github.com/apache/incubator-yunikorn-k8shim/pkg/log" @@ -38,15 +40,17 @@ import ( ) type Application struct { - applicationID string - queue string - partition string - user string - taskMap map[string]*Task - tags map[string]string - sm *fsm.FSM - lock *sync.RWMutex - schedulerAPI api.SchedulerAPI + applicationID string + queue string + partition string + user string + taskMap map[string]*Task + tags map[string]string + schedulingPolicy v1alpha1.SchedulingPolicy + taskGroups []v1alpha1.TaskGroup + sm *fsm.FSM + lock *sync.RWMutex + schedulerAPI api.SchedulerAPI } func (app *Application) String() string { @@ -58,14 +62,16 @@ func (app *Application) String() string { func NewApplication(appID, queueName, user string, tags map[string]string, scheduler api.SchedulerAPI) *Application { taskMap := make(map[string]*Task) app := 
&Application{ - applicationID: appID, - queue: queueName, - partition: constants.DefaultPartition, - user: user, - taskMap: taskMap, - tags: tags, - lock: &sync.RWMutex{}, - schedulerAPI: scheduler, + applicationID: appID, + queue: queueName, + partition: constants.DefaultPartition, + user: user, + taskMap: taskMap, + tags: tags, + schedulingPolicy: v1alpha1.SchedulingPolicy{}, + taskGroups: make([]v1alpha1.TaskGroup, 0), + lock: &sync.RWMutex{}, + schedulerAPI: scheduler, } var states = events.States().Application @@ -81,8 +87,14 @@ func NewApplication(appID, queueName, user string, tags map[string]string, sched {Name: string(events.AcceptApplication), Src: []string{states.Submitted, states.Recovering}, Dst: states.Accepted}, + {Name: string(events.TryReserve), + Src: []string{states.Accepted}, + Dst: states.Reserving}, + {Name: string(events.UpdateReservation), + Src: []string{states.Reserving}, + Dst: states.Reserving}, {Name: string(events.RunApplication), - Src: []string{states.Accepted, states.Running}, + Src: []string{states.Accepted, states.Reserving, states.Running}, Dst: states.Running}, {Name: string(events.ReleaseAppAllocation), Src: []string{states.Running}, @@ -104,12 +116,15 @@ func NewApplication(appID, queueName, user string, tags map[string]string, sched Dst: states.Killed}, }, fsm.Callbacks{ - string(events.SubmitApplication): app.handleSubmitApplicationEvent, - string(events.RecoverApplication): app.handleRecoverApplicationEvent, - string(events.RejectApplication): app.handleRejectApplicationEvent, - string(events.CompleteApplication): app.handleCompleteApplicationEvent, - string(events.ReleaseAppAllocation): app.handleReleaseAppAllocationEvent, - events.EnterState: app.enterState, + string(events.SubmitApplication): app.handleSubmitApplicationEvent, + string(events.RecoverApplication): app.handleRecoverApplicationEvent, + string(events.RejectApplication): app.handleRejectApplicationEvent, + string(events.CompleteApplication): 
app.handleCompleteApplicationEvent, + string(events.UpdateReservation): app.onReservationStateChange, + events.States().Application.Accepted: app.postAppAccepted, + events.States().Application.Reserving: app.onReserving, + string(events.ReleaseAppAllocation): app.handleReleaseAppAllocationEvent, + events.EnterState: app.enterState, }, ) @@ -170,6 +185,30 @@ func (app *Application) GetUser() string { return app.user } +func (app *Application) setSchedulingPolicy(policy v1alpha1.SchedulingPolicy) { + app.lock.Lock() + defer app.lock.Unlock() + app.schedulingPolicy = policy +} + +func (app *Application) getSchedulingPolicy() v1alpha1.SchedulingPolicy { + app.lock.RLock() + defer app.lock.RUnlock() + return app.schedulingPolicy +} + +func (app *Application) setTaskGroups(taskGroups []v1alpha1.TaskGroup) { + app.lock.Lock() + defer app.lock.Unlock() + app.taskGroups = taskGroups +} + +func (app *Application) getTaskGroups() []v1alpha1.TaskGroup { + app.lock.RLock() + defer app.lock.RUnlock() + return app.taskGroups +} + func (app *Application) addTask(task *Task) { app.lock.Lock() defer app.lock.Unlock() @@ -273,35 +312,14 @@ func (app *Application) Schedule() { log.Logger().Warn("failed to handle SUBMIT app event", zap.Error(err)) } - case states.Accepted: - ev := NewRunApplicationEvent(app.GetApplicationID()) - if err := app.handle(ev); err != nil { - log.Logger().Warn("failed to handle RUN app event", - zap.Error(err)) - } + case states.Reserving: + app.scheduleTasks(func(t *Task) bool { + return t.placeholder + }) case states.Running: - if len(app.GetNewTasks()) > 0 { - for _, task := range app.GetNewTasks() { - // for each new task, we do a sanity check before moving the state to Pending_Schedule - if err := task.sanityCheckBeforeScheduling(); err == nil { - // note, if we directly trigger submit task event, it may spawn too many duplicate - // events, because a task might be submitted multiple times before its state transits to PENDING. 
- if handleErr := task.handle( - NewSimpleTaskEvent(task.applicationID, task.taskID, events.InitTask)); handleErr != nil { - // something goes wrong when transit task to PENDING state, - // this should not happen because we already checked the state - // before calling the transition. Nowhere to go, just log the error. - log.Logger().Warn("init task failed", zap.Error(err)) - } - } else { - events.GetRecorder().Event(task.GetTaskPod(), v1.EventTypeWarning, "FailedScheduling", err.Error()) - log.Logger().Debug("task is not ready for scheduling", - zap.String("appID", task.applicationID), - zap.String("taskID", task.taskID), - zap.Error(err)) - } - } - } + app.scheduleTasks(func(t *Task) bool { + return !t.placeholder + }) default: log.Logger().Debug("skipping scheduling application", zap.String("appID", app.GetApplicationID()), @@ -309,6 +327,31 @@ func (app *Application) Schedule() { } } +func (app *Application) scheduleTasks(taskScheduleCondition func(t *Task) bool) { + for _, task := range app.GetNewTasks() { + if taskScheduleCondition(task) { + // for each new task, we do a sanity check before moving the state to Pending_Schedule + if err := task.sanityCheckBeforeScheduling(); err == nil { + // note, if we directly trigger submit task event, it may spawn too many duplicate + // events, because a task might be submitted multiple times before its state transits to PENDING. + if handleErr := task.handle( + NewSimpleTaskEvent(task.applicationID, task.taskID, events.InitTask)); handleErr != nil { + // something goes wrong when transit task to PENDING state, + // this should not happen because we already checked the state + // before calling the transition. Nowhere to go, just log the error. 
+ log.Logger().Warn("init task failed", zap.Error(err)) + } + } else { + events.GetRecorder().Event(task.GetTaskPod(), v1.EventTypeWarning, "FailedScheduling", err.Error()) + log.Logger().Debug("task is not ready for scheduling", + zap.String("appID", task.applicationID), + zap.String("taskID", task.taskID), + zap.Error(err)) + } + } + } +} + func (app *Application) handleSubmitApplicationEvent(event *fsm.Event) { log.Logger().Info("handle app submission", zap.String("app", app.String()), @@ -363,6 +406,52 @@ func (app *Application) handleRecoverApplicationEvent(event *fsm.Event) { } } +func (app *Application) postAppAccepted(event *fsm.Event) { + // if app has taskGroups defined, it goes to the Reserving state before getting to Running + var ev events.SchedulingEvent + if len(app.taskGroups) != 0 { + ev = NewSimpleApplicationEvent(app.applicationID, events.TryReserve) + dispatcher.Dispatch(ev) + } else { + ev = NewRunApplicationEvent(app.applicationID) + } + dispatcher.Dispatch(ev) +} + +func (app *Application) onReserving(event *fsm.Event) { + go func() { + // while doing reserving + if err := GetPlaceholderManager().createAppPlaceholders(app); err != nil { + // creating placeholder failed + // put the app into recycling queue and turn the app to running state + GetPlaceholderManager().CleanUp(app) + ev := NewRunApplicationEvent(app.applicationID) + dispatcher.Dispatch(ev) + } + }() +} + +func (app *Application) onReservationStateChange(event *fsm.Event) { + // this event is called when there is a add or release of placeholders + desireCounts := utils.NewTaskGroupInstanceCountMap() + for _, tg := range app.taskGroups { + desireCounts.Add(tg.Name, tg.MinMember) + } + + actualCounts := utils.NewTaskGroupInstanceCountMap() + for _, t := range app.getTasks(events.States().Task.Bound) { + if t.placeholder { + actualCounts.AddOne(t.taskGroupName) + } + } + + // min member all satisfied + if desireCounts.Equals(actualCounts) { + ev := 
NewRunApplicationEvent(app.applicationID) + dispatcher.Dispatch(ev) + } +} + func (app *Application) handleRejectApplicationEvent(event *fsm.Event) { log.Logger().Info("app is rejected by scheduler", zap.String("appID", app.applicationID)) // for rejected apps, we directly move them to failed state @@ -376,16 +465,19 @@ func (app *Application) handleCompleteApplicationEvent(event *fsm.Event) { func (app *Application) handleReleaseAppAllocationEvent(event *fsm.Event) { eventArgs := make([]string, 2) if err := events.GetEventArgsAsStrings(eventArgs, event.Args); err != nil { - log.Logger().Error("fail to paser event arg", zap.Error(err)) + log.Logger().Error("fail to parse event arg", zap.Error(err)) return } allocUUID := eventArgs[0] + terminationTypeStr := eventArgs[1] log.Logger().Info("try to release pod from application", zap.String("appID", app.applicationID), - zap.String("allocationUUID", allocUUID)) + zap.String("allocationUUID", allocUUID), + zap.String("terminationType", terminationTypeStr)) for _, task := range app.taskMap { if task.allocationUUID == allocUUID { + task.setTaskTerminationType(terminationTypeStr) err := task.DeleteTaskPod(task.pod) if err != nil { log.Logger().Error("failed to release allocation from application", zap.Error(err)) diff --git a/pkg/cache/application_events.go b/pkg/cache/application_events.go index 8d77f27dd..d3617e050 100644 --- a/pkg/cache/application_events.go +++ b/pkg/cache/application_events.go @@ -164,6 +164,33 @@ func (fe FailApplicationEvent) GetApplicationID() string { return fe.applicationID } +// ------------------------ +// Reservation Update Event +// ------------------------ +type UpdateApplicationReservationEvent struct { + applicationID string + event events.ApplicationEventType +} + +func NewUpdateApplicationReservationEvent(appID string) UpdateApplicationReservationEvent { + return UpdateApplicationReservationEvent{ + applicationID: appID, + event: events.UpdateReservation, + } +} + +func (ue 
UpdateApplicationReservationEvent) GetEvent() events.ApplicationEventType { + return ue.event +} + +func (ue UpdateApplicationReservationEvent) GetArgs() []interface{} { + return nil +} + +func (ue UpdateApplicationReservationEvent) GetApplicationID() string { + return ue.applicationID +} + // ------------------------ // Release application allocations // ------------------------ diff --git a/pkg/cache/application_test.go b/pkg/cache/application_test.go index 769916cc2..ecbeb541d 100644 --- a/pkg/cache/application_test.go +++ b/pkg/cache/application_test.go @@ -19,6 +19,9 @@ package cache import ( + "sort" + "strings" + "sync" "testing" "time" @@ -29,8 +32,12 @@ import ( apis "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/apache/incubator-yunikorn-core/pkg/api" + "github.com/apache/incubator-yunikorn-k8shim/pkg/apis/yunikorn.apache.org/v1alpha1" + "github.com/apache/incubator-yunikorn-k8shim/pkg/client" "github.com/apache/incubator-yunikorn-k8shim/pkg/common/constants" "github.com/apache/incubator-yunikorn-k8shim/pkg/common/events" + "github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils" + "github.com/apache/incubator-yunikorn-k8shim/pkg/dispatcher" "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si" ) @@ -227,7 +234,7 @@ func TestGetNonTerminatedTaskAlias(t *testing.T) { assert.Assert(t, is.Contains(res, "/test-00001")) assert.Assert(t, is.Contains(res, "/test-00002")) - //set two tasks to terminated states + // set two tasks to terminated states task1.sm.SetState(events.States().Task.Rejected) task2.sm.SetState(events.States().Task.Rejected) // check the tasks both in terminated states @@ -235,7 +242,7 @@ func TestGetNonTerminatedTaskAlias(t *testing.T) { res = app.getNonTerminatedTaskAlias() assert.Equal(t, len(res), 0) - //set two tasks to one is terminated, another is non-terminated + // set two tasks to one is terminated, another is non-terminated task1.sm.SetState(events.States().Task.Rejected) 
task2.sm.SetState(events.States().Task.Allocated) // check the task, should only return task2's alias @@ -243,3 +250,172 @@ func TestGetNonTerminatedTaskAlias(t *testing.T) { assert.Equal(t, len(res), 1) assert.Equal(t, res[0], "/test-00002") } + +func TestSetTaskGroupsAndSchedulingPolicy(t *testing.T) { + app := NewApplication("app01", "root.a", "test-user", map[string]string{}, newMockSchedulerAPI()) + assert.Assert(t, app.getSchedulingPolicy().Type == "") + assert.Equal(t, len(app.getTaskGroups()), 0) + + app.setSchedulingPolicy(v1alpha1.SchedulingPolicy{ + Type: v1alpha1.TryReserve, + Parameters: map[string]string{ + "option-1": "value-1", + "option-2": "value-2", + }, + }) + + assert.Equal(t, app.getSchedulingPolicy().Type, v1alpha1.TryReserve) + assert.Equal(t, len(app.getSchedulingPolicy().Parameters), 2) + assert.Equal(t, app.getSchedulingPolicy().Parameters["option-1"], "value-1", "incorrect parameter value") + assert.Equal(t, app.getSchedulingPolicy().Parameters["option-2"], "value-2", "incorrect parameter value") + + duration := int64(3000) + app.setTaskGroups([]v1alpha1.TaskGroup{ + { + Name: "test-group-1", + MinMember: 10, + MinResource: map[string]resource.Quantity{ + v1.ResourceCPU.String(): resource.MustParse("500m"), + v1.ResourceMemory.String(): resource.MustParse("500Mi"), + }, + }, + { + Name: "test-group-2", + MinMember: 20, + MinResource: map[string]resource.Quantity{ + v1.ResourceCPU.String(): resource.MustParse("1000m"), + v1.ResourceMemory.String(): resource.MustParse("1000Mi"), + }, + NodeSelector: map[string]string{ + "locate": "west", + }, + Tolerations: []v1.Toleration{ + { + Key: "nodeType", + Operator: v1.TolerationOpEqual, + Value: "infra", + Effect: v1.TaintEffectNoSchedule, + TolerationSeconds: &duration, + }, + }, + }, + }) + + assert.Assert(t, app.getTaskGroups() != nil) + assert.Equal(t, len(app.getTaskGroups()), 2) + + // sort the slice to give us a stable order + sort.Slice(app.getTaskGroups(), func(i, j int) bool { + return 
strings.Compare(app.getTaskGroups()[i].Name, app.getTaskGroups()[j].Name) < 0 + }) + + tg1 := app.getTaskGroups()[0] + assert.Equal(t, tg1.Name, "test-group-1") + assert.Equal(t, tg1.MinMember, int32(10)) + assert.Equal(t, tg1.MinResource[v1.ResourceCPU.String()], resource.MustParse("500m")) + assert.Equal(t, tg1.MinResource[v1.ResourceMemory.String()], resource.MustParse("500Mi")) + + tg2 := app.getTaskGroups()[1] + assert.Equal(t, tg2.Name, "test-group-2") + assert.Equal(t, tg2.MinMember, int32(20)) + assert.Equal(t, len(tg2.Tolerations), 1) + assert.Equal(t, tg2.Tolerations[0].Key, "nodeType") + assert.Equal(t, tg2.Tolerations[0].Value, "infra") + assert.Equal(t, tg2.Tolerations[0].Operator, v1.TolerationOpEqual) + assert.Equal(t, tg2.Tolerations[0].Effect, v1.TaintEffectNoSchedule) + assert.Equal(t, tg2.Tolerations[0].TolerationSeconds, &duration) +} + +type threadSafePodsMap struct { + pods map[string]*v1.Pod + sync.RWMutex +} + +func newThreadSafePodsMap() *threadSafePodsMap { + return &threadSafePodsMap{ + pods: make(map[string]*v1.Pod), + } +} + +func (t *threadSafePodsMap) add(pod *v1.Pod) { + t.Lock() + defer t.Unlock() + t.pods[pod.Name] = pod +} + +func (t *threadSafePodsMap) count() int { + t.RLock() + defer t.RUnlock() + return len(t.pods) +} + +func TestTryReserve(t *testing.T) { + context := initContextForTest() + dispatcher.RegisterEventHandler(dispatcher.EventTypeApp, context.ApplicationEventHandler()) + dispatcher.Start() + defer dispatcher.Stop() + + // inject the mocked clients to the placeholder manager + createdPods := newThreadSafePodsMap() + mockedAPIProvider := client.NewMockedAPIProvider() + mockedAPIProvider.MockCreateFn(func(pod *v1.Pod) (*v1.Pod, error) { + createdPods.add(pod) + return pod, nil + }) + mgr := NewPlaceholderManager(mockedAPIProvider.GetAPIs()) + mgr.Start() + defer mgr.Stop() + + // create a new app + app := NewApplication("app00001", "root.abc", "test-user", + map[string]string{}, 
mockedAPIProvider.GetAPIs().SchedulerAPI) + context.applications[app.applicationID] = app + + // set app scheduling policy + app.setSchedulingPolicy(v1alpha1.SchedulingPolicy{ + Type: v1alpha1.TryReserve, + Parameters: map[string]string{ + "option-1": "value-1", + "option-2": "value-2", + }, + }) + + // set taskGroups + app.setTaskGroups([]v1alpha1.TaskGroup{ + { + Name: "test-group-1", + MinMember: 10, + MinResource: map[string]resource.Quantity{ + v1.ResourceCPU.String(): resource.MustParse("500m"), + v1.ResourceMemory.String(): resource.MustParse("500Mi"), + }, + }, + { + Name: "test-group-2", + MinMember: 20, + MinResource: map[string]resource.Quantity{ + v1.ResourceCPU.String(): resource.MustParse("1000m"), + v1.ResourceMemory.String(): resource.MustParse("1000Mi"), + }, + }, + }) + + // submit the app + err := app.handle(NewSubmitApplicationEvent(app.applicationID)) + assert.NilError(t, err) + assertAppState(t, app, events.States().Application.Submitted, 3*time.Second) + + // accepted the app + err = app.handle(NewSimpleApplicationEvent(app.GetApplicationID(), events.AcceptApplication)) + assert.NilError(t, err) + + // since this app has taskGroups defined, + // once the app is accepted, it is expected to see this app goes to Reserving state + assertAppState(t, app, events.States().Application.Reserving, 3*time.Second) + + // under Reserving state, the app will need to acquire all the placeholders it asks for + err = utils.WaitForCondition(func() bool { + return createdPods.count() == 30 + }, 100*time.Millisecond, 3*time.Second) + assert.NilError(t, err, "placeholders are not created") +} diff --git a/pkg/cache/context.go b/pkg/cache/context.go index 80a738aa3..cb8347f83 100644 --- a/pkg/cache/context.go +++ b/pkg/cache/context.go @@ -498,6 +498,7 @@ func (ctx *Context) AddApplication(request *interfaces.AddApplicationRequest) in request.Metadata.User, request.Metadata.Tags, ctx.apiProvider.GetAPIs().SchedulerAPI) + 
app.setTaskGroups(request.Metadata.TaskGroups) // add into cache ctx.applications[app.applicationID] = app @@ -572,7 +573,7 @@ func (ctx *Context) AddTask(request *interfaces.AddTaskRequest) interfaces.Manag if app, valid := managedApp.(*Application); valid { existingTask, err := app.GetTask(request.Metadata.TaskID) if err != nil { - task := NewTask(request.Metadata.TaskID, app, ctx, request.Metadata.Pod) + task := NewFromTaskMeta(request.Metadata.TaskID, app, ctx, request.Metadata) // in recovery mode, task is considered as allocated if request.Recovery { // in scheduling, allocationUUID is assigned by scheduler-core diff --git a/pkg/cache/placeholder.go b/pkg/cache/placeholder.go new file mode 100644 index 000000000..d048630e6 --- /dev/null +++ b/pkg/cache/placeholder.go @@ -0,0 +1,79 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package cache + +import ( + "fmt" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/apache/incubator-yunikorn-k8shim/pkg/apis/yunikorn.apache.org/v1alpha1" + "github.com/apache/incubator-yunikorn-k8shim/pkg/common/constants" + "github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils" +) + +type Placeholder struct { + appID string + taskGroupName string + pod *v1.Pod +} + +func newPlaceholder(placeholderName string, app *Application, taskGroup v1alpha1.TaskGroup) *Placeholder { + placeholderPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: placeholderName, + Namespace: app.tags[constants.AppTagNamespace], + Labels: map[string]string{ + constants.LabelApplicationID: app.GetApplicationID(), + constants.LabelQueueName: app.GetQueue(), + }, + Annotations: map[string]string{ + constants.AnnotationPlaceholderFlag: "true", + constants.AnnotationTaskGroupName: taskGroup.Name, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: constants.PlaceholderContainerName, + Image: constants.PlaceholderContainerImage, + Resources: v1.ResourceRequirements{ + Requests: utils.GetPlaceholderResourceRequest(taskGroup.MinResource), + }, + }, + }, + RestartPolicy: constants.PlaceholderPodRestartPolicy, + SchedulerName: constants.SchedulerName, + NodeSelector: taskGroup.NodeSelector, + Tolerations: taskGroup.Tolerations, + }, + } + + return &Placeholder{ + appID: app.GetApplicationID(), + taskGroupName: taskGroup.Name, + pod: placeholderPod, + } +} + +func (p *Placeholder) String() string { + return fmt.Sprintf("appID: %s, taskGroup: %s, podName: %s/%s", + p.appID, p.taskGroupName, p.pod.Namespace, p.pod.Name) +} diff --git a/pkg/cache/placeholder_manager.go b/pkg/cache/placeholder_manager.go new file mode 100644 index 000000000..4ac47ce4f --- /dev/null +++ b/pkg/cache/placeholder_manager.go @@ -0,0 +1,172 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license 
agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package cache + +import ( + "sync" + "sync/atomic" + "time" + + "go.uber.org/zap" + v1 "k8s.io/api/core/v1" + + "github.com/apache/incubator-yunikorn-k8shim/pkg/client" + "github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils" + "github.com/apache/incubator-yunikorn-k8shim/pkg/log" +) + +// placeholder manager is a service to manage the lifecycle of app placeholders +type PlaceholderManager struct { + clients *client.Clients + // when the placeholder manager is unable to delete a pod, + // this pod becomes to be an "orphan" pod. We add them to a map + // and keep retrying deleting them in order to avoid wasting resources. 
+ orphanPod map[string]*v1.Pod + stopChan chan struct{} + running atomic.Value + sync.RWMutex +} + +var placeholderMgr *PlaceholderManager +var once sync.Once + +func NewPlaceholderManager(clients *client.Clients) *PlaceholderManager { + var r atomic.Value + r.Store(false) + placeholderMgr = &PlaceholderManager{ + clients: clients, + running: r, + } + return placeholderMgr +} + +func GetPlaceholderManager() *PlaceholderManager { + once.Do(func() { + if placeholderMgr == nil { + log.Logger().Fatal("PlaceholderManager is not initiated") + } + }) + return placeholderMgr +} + +func (mgr *PlaceholderManager) createAppPlaceholders(app *Application) error { + mgr.Lock() + defer mgr.Unlock() + + // iterate all task groups, create placeholders for all the min members + for _, tg := range app.getTaskGroups() { + for i := int32(0); i < tg.MinMember; i++ { + placeholderName := utils.GeneratePlaceholderName(tg.Name, app.GetApplicationID(), i) + placeholder := newPlaceholder(placeholderName, app, tg) + // create the placeholder on K8s + _, err := mgr.clients.KubeClient.Create(placeholder.pod) + if err != nil { + // if failed to create the place holder pod + // caller should handle this error + log.Logger().Error("failed to create placeholder pod", + zap.Error(err)) + return err + } + log.Logger().Info("placeholder created", + zap.String("placeholder", placeholder.String())) + } + } + + return nil +} + +// clean up all the placeholders for an application +func (mgr *PlaceholderManager) CleanUp(app *Application) { + mgr.Lock() + defer mgr.Unlock() + log.Logger().Info("start to clean up app placeholders", + zap.String("appID", app.GetApplicationID())) + for taskID, task := range app.taskMap { + if task.GetTaskPlaceholder() { + // remove pod + err := mgr.clients.KubeClient.Delete(task.pod) + if err != nil { + log.Logger().Error("failed to clean up placeholder pod", + zap.Error(err)) + mgr.orphanPod[taskID] = task.pod + } + } + } + log.Logger().Info("finish to clean up app 
placeholders", + zap.String("appID", app.GetApplicationID())) +} + +func (mgr *PlaceholderManager) cleanOrphanPlaceholders() { + mgr.Lock() + defer mgr.Unlock() + for taskID, pod := range mgr.orphanPod { + log.Logger().Debug("start to clean up orphan pod", + zap.String("taskID", taskID), + zap.String("podName", pod.Name)) + err := mgr.clients.KubeClient.Delete(pod) + if err != nil { + log.Logger().Warn("failed to clean up orphan pod", zap.Error(err)) + } else { + delete(mgr.orphanPod, taskID) + } + } +} + +func (mgr *PlaceholderManager) Start() { + if mgr.isRunning() { + log.Logger().Info("The placeholder manager has been started") + return + } + log.Logger().Info("starting the Placeholder Manager") + mgr.stopChan = make(chan struct{}) + mgr.setRunning(true) + go func() { + for { + select { + case <-mgr.stopChan: + log.Logger().Info("PlaceholderManager has been stopped") + mgr.setRunning(false) + return + default: + // clean orphan placeholders every 5 seconds + log.Logger().Info("clean up orphan pod") + mgr.cleanOrphanPlaceholders() + time.Sleep(5 * time.Second) + } + } + }() +} + +func (mgr *PlaceholderManager) Stop() { + if !mgr.isRunning() { + log.Logger().Info("The placeholder manager has been stopped") + return + } + log.Logger().Info("stopping the Placeholder Manager") + mgr.stopChan <- struct{}{} + time.Sleep(3 * time.Second) +} + +func (mgr *PlaceholderManager) isRunning() bool { + return mgr.running.Load().(bool) +} + +func (mgr *PlaceholderManager) setRunning(flag bool) { + mgr.running.Store(flag) +} diff --git a/pkg/cache/placeholder_manager_test.go b/pkg/cache/placeholder_manager_test.go new file mode 100644 index 000000000..93b9cbf0b --- /dev/null +++ b/pkg/cache/placeholder_manager_test.go @@ -0,0 +1,217 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package cache + +import ( + "fmt" + "sync" + "sync/atomic" + "testing" + + "gotest.tools/assert" + is "gotest.tools/assert/cmp" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + apis "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/apache/incubator-yunikorn-k8shim/pkg/apis/yunikorn.apache.org/v1alpha1" + "github.com/apache/incubator-yunikorn-k8shim/pkg/client" + "github.com/apache/incubator-yunikorn-k8shim/pkg/common/constants" +) + +func TestCreateAppPlaceholders(t *testing.T) { + const ( + appID = "app01" + queue = "root.default" + namespace = "test" + ) + mockedSchedulerAPI := newMockSchedulerAPI() + app := NewApplication(appID, queue, + "bob", map[string]string{constants.AppTagNamespace: namespace}, mockedSchedulerAPI) + app.setTaskGroups([]v1alpha1.TaskGroup{ + { + Name: "test-group-1", + MinMember: 10, + MinResource: map[string]resource.Quantity{ + "cpu": resource.MustParse("500m"), + "memory": resource.MustParse("1024M"), + }, + }, + { + Name: "test-group-2", + MinMember: 20, + MinResource: map[string]resource.Quantity{ + "cpu": resource.MustParse("1000m"), + "memory": resource.MustParse("2048M"), + }, + }, + }) + + createdPods := make(map[string]*v1.Pod) + mockedAPIProvider := client.NewMockedAPIProvider() + mockedAPIProvider.MockCreateFn(func(pod *v1.Pod) (*v1.Pod, error) { + createdPods[pod.Name] = pod + return pod, nil + }) + placeholderMgr := &PlaceholderManager{ + 
clients: mockedAPIProvider.GetAPIs(), + RWMutex: sync.RWMutex{}, + } + + err := placeholderMgr.createAppPlaceholders(app) + assert.NilError(t, err, "create app placeholders should be successful") + assert.Equal(t, len(createdPods), 30) + + // simulate placeholder creation failures + // failed to create one placeholder + mockedAPIProvider.MockCreateFn(func(pod *v1.Pod) (*v1.Pod, error) { + if pod.Name == "tg-test-group-2-app01-15" { + return nil, fmt.Errorf("failed to create pod %s", pod.Name) + } + return pod, nil + }) + err = placeholderMgr.createAppPlaceholders(app) + assert.Error(t, err, "failed to create pod tg-test-group-2-app01-15") +} + +func TestCleanUp(t *testing.T) { + const ( + appID = "app01" + queue = "root.default" + namespace = "test" + ) + mockedContext := initContextForTest() + mockedSchedulerAPI := newMockSchedulerAPI() + app := NewApplication(appID, queue, + "bob", map[string]string{constants.AppTagNamespace: namespace}, mockedSchedulerAPI) + mockedContext.applications[appID] = app + res := app.getNonTerminatedTaskAlias() + assert.Equal(t, len(res), 0) + + pod1 := &v1.Pod{ + TypeMeta: apis.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: apis.ObjectMeta{ + Name: "pod-01", + UID: "UID-01", + }, + } + pod2 := &v1.Pod{ + TypeMeta: apis.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: apis.ObjectMeta{ + Name: "pod-02", + UID: "UID-02", + }, + } + pod3 := &v1.Pod{ + TypeMeta: apis.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: apis.ObjectMeta{ + Name: "pod-03", + UID: "UID-03", + }, + } + taskID1 := "task01" + task1 := NewTask(taskID1, app, mockedContext, pod1) + task1.placeholder = true + app.taskMap[taskID1] = task1 + taskID2 := "task02" + task2 := NewTask(taskID2, app, mockedContext, pod2) + task2.placeholder = true + app.taskMap[taskID2] = task2 + taskID3 := "task03" + task3 := NewTask(taskID3, app, mockedContext, pod3) + task3.placeholder = false + app.taskMap[taskID3] = task3 + res = 
app.getNonTerminatedTaskAlias() + assert.Equal(t, len(res), 3) + + deletePod := make([]string, 0) + mockedAPIProvider := client.NewMockedAPIProvider() + mockedAPIProvider.MockDeleteFn(func(pod *v1.Pod) error { + deletePod = append(deletePod, pod.Name) + return nil + }) + placeholderMgr := &PlaceholderManager{ + clients: mockedAPIProvider.GetAPIs(), + orphanPod: make(map[string]*v1.Pod), + RWMutex: sync.RWMutex{}, + } + placeholderMgr.CleanUp(app) + + // check both pod-01 and pod-02 in deletePod list and pod-03 isn't contain + assert.Assert(t, is.Contains(deletePod, "pod-01")) + assert.Assert(t, is.Contains(deletePod, "pod-02")) + exist := false + for _, item := range deletePod { + if item == "pod-03" { + exist = true + } + } + assert.Equal(t, exist, false) + assert.Equal(t, len(placeholderMgr.orphanPod), 0) +} + +func TestCleanOrphanPlaceholders(t *testing.T) { + mockedAPIProvider := client.NewMockedAPIProvider() + placeholderMgr := &PlaceholderManager{ + clients: mockedAPIProvider.GetAPIs(), + orphanPod: make(map[string]*v1.Pod), + RWMutex: sync.RWMutex{}, + } + pod1 := &v1.Pod{ + TypeMeta: apis.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: apis.ObjectMeta{ + Name: "pod-01", + UID: "UID-01", + }, + } + placeholderMgr.orphanPod["task01"] = pod1 + assert.Equal(t, len(placeholderMgr.orphanPod), 1) + placeholderMgr.cleanOrphanPlaceholders() + assert.Equal(t, len(placeholderMgr.orphanPod), 0) +} + +func TestPlaceholderManagerStartStop(t *testing.T) { + mockedAPIProvider := client.NewMockedAPIProvider() + placeholderMgr := &PlaceholderManager{ + clients: mockedAPIProvider.GetAPIs(), + orphanPod: make(map[string]*v1.Pod), + running: atomic.Value{}, + RWMutex: sync.RWMutex{}, + } + placeholderMgr.setRunning(false) + // start clean up goroutine + placeholderMgr.Start() + assert.Equal(t, placeholderMgr.isRunning(), true) + + placeholderMgr.Stop() + // check orphan pod map is empty + assert.Equal(t, placeholderMgr.isRunning(), false) +} diff --git 
a/pkg/cache/placeholder_test.go b/pkg/cache/placeholder_test.go new file mode 100644 index 000000000..e9d27864b --- /dev/null +++ b/pkg/cache/placeholder_test.go @@ -0,0 +1,136 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package cache + +import ( + "testing" + + "gotest.tools/assert" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + + "github.com/apache/incubator-yunikorn-k8shim/pkg/apis/yunikorn.apache.org/v1alpha1" + "github.com/apache/incubator-yunikorn-k8shim/pkg/common" + "github.com/apache/incubator-yunikorn-k8shim/pkg/common/constants" +) + +func TestNewPlaceholder(t *testing.T) { + const ( + appID = "app01" + queue = "root.default" + namespace = "test" + ) + mockedSchedulerAPI := newMockSchedulerAPI() + app := NewApplication(appID, queue, + "bob", map[string]string{constants.AppTagNamespace: namespace}, mockedSchedulerAPI) + app.setTaskGroups([]v1alpha1.TaskGroup{ + { + Name: "test-group-1", + MinMember: 10, + MinResource: map[string]resource.Quantity{ + "cpu": resource.MustParse("500m"), + "memory": resource.MustParse("1024M"), + }, + }, + }) + + holder := newPlaceholder("ph-name", app, app.taskGroups[0]) + assert.Equal(t, holder.appID, appID) + assert.Equal(t, holder.taskGroupName, 
app.taskGroups[0].Name) + assert.Equal(t, holder.pod.Spec.SchedulerName, constants.SchedulerName) + assert.Equal(t, holder.pod.Name, "ph-name") + assert.Equal(t, holder.pod.Namespace, namespace) + assert.Equal(t, len(holder.pod.Labels), 2) + assert.Equal(t, holder.pod.Labels[constants.LabelApplicationID], appID) + assert.Equal(t, holder.pod.Labels[constants.LabelQueueName], queue) + assert.Equal(t, len(holder.pod.Annotations), 2) + assert.Equal(t, holder.pod.Annotations[constants.AnnotationTaskGroupName], app.taskGroups[0].Name) + assert.Equal(t, common.GetPodResource(holder.pod).Resources[constants.CPU].Value, int64(500)) + assert.Equal(t, common.GetPodResource(holder.pod).Resources[constants.Memory].Value, int64(1024)) + assert.Equal(t, len(holder.pod.Spec.NodeSelector), 0) + assert.Equal(t, len(holder.pod.Spec.Tolerations), 0) + assert.Equal(t, holder.String(), "appID: app01, taskGroup: test-group-1, podName: test/ph-name") +} + +func TestNewPlaceholderWithNodeSelectors(t *testing.T) { + const ( + appID = "app01" + queue = "root.default" + namespace = "test" + ) + mockedSchedulerAPI := newMockSchedulerAPI() + app := NewApplication(appID, queue, + "bob", map[string]string{constants.AppTagNamespace: namespace}, mockedSchedulerAPI) + app.setTaskGroups([]v1alpha1.TaskGroup{ + { + Name: "test-group-1", + MinMember: 10, + MinResource: map[string]resource.Quantity{ + "cpu": resource.MustParse("500m"), + "memory": resource.MustParse("1024M"), + }, + NodeSelector: map[string]string{ + "nodeType": "test", + "nodeState": "healthy", + }, + }, + }) + + holder := newPlaceholder("ph-name", app, app.taskGroups[0]) + assert.Equal(t, len(holder.pod.Spec.NodeSelector), 2) + assert.Equal(t, holder.pod.Spec.NodeSelector["nodeType"], "test") + assert.Equal(t, holder.pod.Spec.NodeSelector["nodeState"], "healthy") +} + +func TestNewPlaceholderWithTolerations(t *testing.T) { + const ( + appID = "app01" + queue = "root.default" + namespace = "test" + ) + mockedSchedulerAPI := 
newMockSchedulerAPI() + app := NewApplication(appID, queue, + "bob", map[string]string{constants.AppTagNamespace: namespace}, mockedSchedulerAPI) + app.setTaskGroups([]v1alpha1.TaskGroup{ + { + Name: "test-group-1", + MinMember: 10, + MinResource: map[string]resource.Quantity{ + "cpu": resource.MustParse("500m"), + "memory": resource.MustParse("1024M"), + }, + Tolerations: []v1.Toleration{ + { + Key: "key1", + Operator: v1.TolerationOpEqual, + Value: "value1", + Effect: v1.TaintEffectNoSchedule, + }, + }, + }, + }) + + holder := newPlaceholder("ph-name", app, app.taskGroups[0]) + assert.Equal(t, len(holder.pod.Spec.Tolerations), 1) + tlr := holder.pod.Spec.Tolerations[0] + assert.Equal(t, tlr.Key, "key1") + assert.Equal(t, tlr.Value, "value1") + assert.Equal(t, tlr.Operator, v1.TolerationOpEqual) + assert.Equal(t, tlr.Effect, v1.TaintEffectNoSchedule) +} diff --git a/pkg/cache/task.go b/pkg/cache/task.go index ddc3f8d81..8fa50c7eb 100644 --- a/pkg/cache/task.go +++ b/pkg/cache/task.go @@ -24,10 +24,11 @@ import ( "time" "go.uber.org/zap" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/apache/incubator-yunikorn-k8shim/pkg/appmgmt/interfaces" "github.com/apache/incubator-yunikorn-k8shim/pkg/common" "github.com/apache/incubator-yunikorn-k8shim/pkg/common/events" + "github.com/apache/incubator-yunikorn-k8shim/pkg/common/utils" "github.com/apache/incubator-yunikorn-k8shim/pkg/dispatcher" "github.com/apache/incubator-yunikorn-k8shim/pkg/log" "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si" @@ -37,38 +38,43 @@ import ( ) type Task struct { - taskID string - alias string - applicationID string - application *Application - allocationUUID string - resource *si.Resource - pod *v1.Pod - context *Context - nodeName string - createTime time.Time - sm *fsm.FSM - lock *sync.RWMutex + taskID string + alias string + applicationID string + application *Application + allocationUUID string + resource *si.Resource + pod *v1.Pod + context *Context + 
nodeName string + createTime time.Time + taskGroupName string + placeholder bool + terminationType string + sm *fsm.FSM + lock *sync.RWMutex } func NewTask(tid string, app *Application, ctx *Context, pod *v1.Pod) *Task { taskResource := common.GetPodResource(pod) - return createTaskInternal(tid, app, taskResource, pod, ctx) + return createTaskInternal(tid, app, taskResource, pod, false, "", ctx) } -// test only -func CreateTaskForTest(tid string, app *Application, resource *si.Resource, ctx *Context) *Task { - // for testing purpose, the pod name is same as the taskID - taskPod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: tid, - }, - } - return createTaskInternal(tid, app, resource, taskPod, ctx) +func NewFromTaskMeta(tid string, app *Application, ctx *Context, metadata interfaces.TaskMetadata) *Task { + taskPod := metadata.Pod + taskResource := common.GetPodResource(taskPod) + return createTaskInternal( + tid, + app, + taskResource, + metadata.Pod, + metadata.Placeholder, + metadata.TaskGroupName, + ctx) } func createTaskInternal(tid string, app *Application, resource *si.Resource, - pod *v1.Pod, ctx *Context) *Task { + pod *v1.Pod, placeholder bool, taskGroupName string, ctx *Context) *Task { task := &Task{ taskID: tid, alias: fmt.Sprintf("%s/%s", pod.Namespace, pod.Name), @@ -77,6 +83,8 @@ func createTaskInternal(tid string, app *Application, resource *si.Resource, pod: pod, resource: resource, createTime: pod.GetCreationTimestamp().Time, + placeholder: placeholder, + taskGroupName: taskGroupName, context: ctx, lock: &sync.RWMutex{}, } @@ -121,10 +129,15 @@ func createTaskInternal(tid string, app *Application, resource *si.Resource, states.Rejected: task.postTaskRejected, beforeHook(events.CompleteTask): task.beforeTaskCompleted, states.Failed: task.postTaskFailed, + states.Bound: task.postTaskBound, events.EnterState: task.enterState, }, ) + if tgName := utils.GetTaskGroupFromPodSpec(pod); tgName != "" { + task.taskGroupName = tgName + } + return task } 
@@ -162,11 +175,35 @@ func (task *Task) GetTaskID() string { return task.taskID } +func (task *Task) GetTaskPlaceholder() bool { + task.lock.RLock() + defer task.lock.RUnlock() + return task.placeholder +} + func (task *Task) GetTaskState() string { // fsm has its own internal lock, we don't need to hold node's lock here return task.sm.Current() } +func (task *Task) setTaskGroupName(groupName string) { + task.lock.Lock() + defer task.lock.Unlock() + task.taskGroupName = groupName +} + +func (task *Task) setTaskTerminationType(terminationTyp string) { + task.lock.Lock() + defer task.lock.Unlock() + task.terminationType = terminationTyp +} + +func (task *Task) getTaskGroupName() string { + task.lock.RLock() + defer task.lock.RUnlock() + return task.taskGroupName +} + func (task *Task) getTaskAllocationUUID() string { task.lock.RLock() defer task.lock.RUnlock() @@ -213,7 +250,13 @@ func (task *Task) handleSubmitTaskEvent(event *fsm.Event) { log.Logger().Debug("scheduling pod", zap.String("podName", task.pod.Name)) // convert the request - rr := common.CreateUpdateRequestForTask(task.applicationID, task.taskID, task.resource, task.pod) + rr := common.CreateUpdateRequestForTask( + task.applicationID, + task.taskID, + task.resource, + task.placeholder, + task.taskGroupName, + task.pod) log.Logger().Debug("send update request", zap.String("request", rr.String())) if err := task.context.apiProvider.GetAPIs().SchedulerAPI.Update(&rr); err != nil { log.Logger().Debug("failed to send scheduling request to scheduler", zap.Error(err)) @@ -300,6 +343,16 @@ func (task *Task) postTaskAllocated(event *fsm.Event) { }(event) } +func (task *Task) postTaskBound(event *fsm.Event) { + if task.placeholder { + log.Logger().Info("placeholder is bound", + zap.String("appID", task.applicationID), + zap.String("taskName", task.alias), + zap.String("taskGroupName", task.taskGroupName)) + dispatcher.Dispatch(NewUpdateApplicationReservationEvent(task.applicationID)) + } +} + func (task *Task) 
postTaskRejected(event *fsm.Event) { // currently, once task is rejected by scheduler, we directly move task to failed state. // so this function simply triggers the state transition when it is rejected. @@ -341,7 +394,8 @@ func (task *Task) releaseAllocation() { zap.String("taskID", task.taskID), zap.String("taskAlias", task.alias), zap.String("allocationUUID", task.allocationUUID), - zap.String("task", task.GetTaskState())) + zap.String("task", task.GetTaskState()), + zap.String("terminationType", task.terminationType)) // depends on current task state, generate requests accordingly. // if task is already allocated, which means the scheduler core already, @@ -369,7 +423,7 @@ func (task *Task) releaseAllocation() { return } releaseRequest = common.CreateReleaseAllocationRequestForTask( - task.applicationID, task.allocationUUID, task.application.partition) + task.applicationID, task.allocationUUID, task.application.partition, task.terminationType) } if releaseRequest.Releases != nil { diff --git a/pkg/cache/task_test.go b/pkg/cache/task_test.go index 966b79dfb..a2e138bae 100644 --- a/pkg/cache/task_test.go +++ b/pkg/cache/task_test.go @@ -413,13 +413,33 @@ func TestIsTerminated(t *testing.T) { }, } task := NewTask("task01", app, mockedContext, pod) - //set task states to non-terminated + // set task states to non-terminated task.sm.SetState(events.States().Task.Pending) res := task.isTerminated() assert.Equal(t, res, false) - //set task states to terminated + // set task states to terminated task.sm.SetState(events.States().Task.Failed) res = task.isTerminated() assert.Equal(t, res, true) } + +func TestSetTaskGroup(t *testing.T) { + mockedContext := initContextForTest() + mockedSchedulerAPI := newMockSchedulerAPI() + app := NewApplication("app01", "root.default", + "bob", map[string]string{}, mockedSchedulerAPI) + pod := &v1.Pod{ + TypeMeta: apis.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: apis.ObjectMeta{ + Name: "pod-01", + UID: "UID-01", + }, 
+ } + task := NewTask("task01", app, mockedContext, pod) + task.setTaskGroupName("test-group") + assert.Equal(t, task.getTaskGroupName(), "test-group") +} diff --git a/pkg/client/apifactory_mock.go b/pkg/client/apifactory_mock.go index 0ae0095d4..5bb6f122c 100644 --- a/pkg/client/apifactory_mock.go +++ b/pkg/client/apifactory_mock.go @@ -103,6 +103,12 @@ func (m *MockedAPIProvider) MockDeleteFn(dfn func(pod *v1.Pod) error) { } } +func (m *MockedAPIProvider) MockCreateFn(cfn func(pod *v1.Pod) (*v1.Pod, error)) { + if mock, ok := m.clients.KubeClient.(*KubeClientMock); ok { + mock.createFn = cfn + } +} + func (m *MockedAPIProvider) SetNodeLister(lister corev1.NodeLister) { informer := m.clients.NodeInformer if i, ok := informer.(*test.MockedNodeInformer); ok { diff --git a/pkg/client/interfaces.go b/pkg/client/interfaces.go index 87e6bad43..f60580fc5 100644 --- a/pkg/client/interfaces.go +++ b/pkg/client/interfaces.go @@ -28,6 +28,9 @@ type KubeClient interface { // bind a pod to a specific host Bind(pod *v1.Pod, hostID string) error + // Create a pod + Create(pod *v1.Pod) (*v1.Pod, error) + // Delete a pod from a host Delete(pod *v1.Pod) error diff --git a/pkg/client/kubeclient.go b/pkg/client/kubeclient.go index 3f253bd7f..f54c1a01d 100644 --- a/pkg/client/kubeclient.go +++ b/pkg/client/kubeclient.go @@ -100,6 +100,10 @@ func (nc SchedulerKubeClient) Bind(pod *v1.Pod, hostID string) error { return nil } +func (nc SchedulerKubeClient) Create(pod *v1.Pod) (*v1.Pod, error) { + return nc.clientSet.CoreV1().Pods(pod.Namespace).Create(pod) +} + func (nc SchedulerKubeClient) Delete(pod *v1.Pod) error { // TODO make this configurable for pods gracefulSeconds := int64(3) diff --git a/pkg/client/kubeclient_mock.go b/pkg/client/kubeclient_mock.go index af8e9ba17..c289f61b1 100644 --- a/pkg/client/kubeclient_mock.go +++ b/pkg/client/kubeclient_mock.go @@ -32,6 +32,7 @@ import ( type KubeClientMock struct { bindFn func(pod *v1.Pod, hostID string) error deleteFn func(pod 
*v1.Pod) error + createFn func(pod *v1.Pod) (*v1.Pod, error) clientSet kubernetes.Interface } @@ -47,6 +48,11 @@ func NewKubeClientMock() *KubeClientMock { zap.String("PodName", pod.Name)) return nil }, + createFn: func(pod *v1.Pod) (*v1.Pod, error) { + log.Logger().Info("pod created", + zap.String("PodName", pod.Name)) + return pod, nil + }, clientSet: fake.NewSimpleClientset(), } } @@ -59,10 +65,18 @@ func (c *KubeClientMock) MockDeleteFn(dfn func(pod *v1.Pod) error) { c.deleteFn = dfn } +func (c *KubeClientMock) MockCreateFn(cfn func(pod *v1.Pod) (*v1.Pod, error)) { + c.createFn = cfn +} + func (c *KubeClientMock) Bind(pod *v1.Pod, hostID string) error { return c.bindFn(pod, hostID) } +func (c *KubeClientMock) Create(pod *v1.Pod) (*v1.Pod, error) { + return c.createFn(pod) +} + func (c *KubeClientMock) Delete(pod *v1.Pod) error { return c.deleteFn(pod) } diff --git a/pkg/common/constants/constants.go b/pkg/common/constants/constants.go index 7559ab040..09ef14f3c 100644 --- a/pkg/common/constants/constants.go +++ b/pkg/common/constants/constants.go @@ -48,5 +48,13 @@ const SparkLabelRoleDriver = "driver" const DefaultConfigMapName = "yunikorn-configs" const SchedulerName = "yunikorn" -//Application crd +// Application crd const AppManagerHandlerName = "yunikorn-app" +const AnnotationPlaceholderFlag = "yunikorn.apache.org/placeholder" +const AnnotationTaskGroupName = "yunikorn.apache.org/task-group-name" +const AnnotationTaskGroups = "yunikorn.apache.org/task-groups" + +// Gang scheduling +const PlaceholderContainerImage = "k8s.gcr.io/pause" +const PlaceholderContainerName = "pause" +const PlaceholderPodRestartPolicy = "Never" diff --git a/pkg/common/events/events.go b/pkg/common/events/events.go index 644251c14..52188e8e0 100644 --- a/pkg/common/events/events.go +++ b/pkg/common/events/events.go @@ -36,6 +36,8 @@ const ( SubmitApplication ApplicationEventType = "SubmitApplication" RecoverApplication ApplicationEventType = "RecoverApplication" AcceptApplication 
ApplicationEventType = "AcceptApplication" + TryReserve ApplicationEventType = "TryReserve" + UpdateReservation ApplicationEventType = "UpdateReservation" RunApplication ApplicationEventType = "RunApplication" RejectApplication ApplicationEventType = "RejectApplication" CompleteApplication ApplicationEventType = "CompleteApplication" diff --git a/pkg/common/events/states.go b/pkg/common/events/states.go index 0af94d76b..991bdcad4 100644 --- a/pkg/common/events/states.go +++ b/pkg/common/events/states.go @@ -44,6 +44,7 @@ type ApplicationStates struct { Recovering string Submitted string Accepted string + Reserving string Running string Rejected string Completed string @@ -84,6 +85,7 @@ func States() *AllStates { Recovering: "Recovering", Submitted: "Submitted", Accepted: "Accepted", + Reserving: "Reserving", Running: "Running", Rejected: "Rejected", Completed: "Completed", diff --git a/pkg/common/si_helper.go b/pkg/common/si_helper.go index cf8405f70..5cb37b07f 100644 --- a/pkg/common/si_helper.go +++ b/pkg/common/si_helper.go @@ -43,13 +43,15 @@ func createTagsForTask(pod *v1.Pod) map[string]string { return tags } -func CreateUpdateRequestForTask(appID, taskID string, resource *si.Resource, pod *v1.Pod) si.UpdateRequest { +func CreateUpdateRequestForTask(appID, taskID string, resource *si.Resource, placeholder bool, taskGroupName string, pod *v1.Pod) si.UpdateRequest { ask := si.AllocationAsk{ AllocationKey: taskID, ResourceAsk: resource, ApplicationID: appID, MaxAllocations: 1, Tags: createTagsForTask(pod), + Placeholder: placeholder, + TaskGroupName: taskGroupName, } result := si.UpdateRequest{ @@ -84,13 +86,21 @@ func CreateReleaseAskRequestForTask(appID, taskId, partition string) si.UpdateRe return result } -func CreateReleaseAllocationRequestForTask(appID, allocUUID, partition string) si.UpdateRequest { +func GetTerminationTypeFromString(terminationTypeStr string) si.AllocationRelease_TerminationType { + if v, ok := 
si.AllocationRelease_TerminationType_value[terminationTypeStr]; ok { + return si.AllocationRelease_TerminationType(v) + } + return si.AllocationRelease_STOPPED_BY_RM +} + +func CreateReleaseAllocationRequestForTask(appID, allocUUID, partition, terminationType string) si.UpdateRequest { toReleases := make([]*si.AllocationRelease, 0) toReleases = append(toReleases, &si.AllocationRelease{ - ApplicationID: appID, - UUID: allocUUID, - PartitionName: partition, - Message: "task completed", + ApplicationID: appID, + UUID: allocUUID, + PartitionName: partition, + TerminationType: GetTerminationTypeFromString(terminationType), + Message: "task completed", }) releaseRequest := si.AllocationReleasesRequest{ diff --git a/pkg/common/si_helper_test.go b/pkg/common/si_helper_test.go index ff06b280e..5f01a2556 100644 --- a/pkg/common/si_helper_test.go +++ b/pkg/common/si_helper_test.go @@ -27,7 +27,7 @@ import ( ) func TestCreateReleaseAllocationRequest(t *testing.T) { - request := CreateReleaseAllocationRequestForTask("app01", "alloc01", "default") + request := CreateReleaseAllocationRequestForTask("app01", "alloc01", "default", "STOPPED_BY_RM") assert.Assert(t, request.Releases != nil) assert.Assert(t, request.Releases.AllocationsToRelease != nil) assert.Assert(t, request.Releases.AllocationAsksToRelease == nil) @@ -83,7 +83,7 @@ func TestCreateUpdateRequestForTask(t *testing.T) { }, } - updateRequest := CreateUpdateRequestForTask("appId1", "taskId1", res, pod) + updateRequest := CreateUpdateRequestForTask("appId1", "taskId1", res, false, "", pod) asks := updateRequest.Asks assert.Equal(t, len(asks), 1) allocAsk := asks[0] diff --git a/pkg/common/utils/gang_utils.go b/pkg/common/utils/gang_utils.go new file mode 100644 index 000000000..6a8174eb6 --- /dev/null +++ b/pkg/common/utils/gang_utils.go @@ -0,0 +1,188 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. 
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package utils + +import ( + "encoding/json" + "fmt" + "strconv" + "sync" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + + "github.com/apache/incubator-yunikorn-k8shim/pkg/apis/yunikorn.apache.org/v1alpha1" + "github.com/apache/incubator-yunikorn-k8shim/pkg/common/constants" +) + +func FindAppTaskGroup(appTaskGroups []*v1alpha1.TaskGroup, groupName string) (*v1alpha1.TaskGroup, error) { + if groupName == "" { + // task has no group defined + return nil, nil + } + + // app has no taskGroups associated + if len(appTaskGroups) == 0 { + return nil, nil + } + + // task group defined in app, return the corresponding taskGroup + for _, tg := range appTaskGroups { + if tg.Name == groupName { + return tg, nil + } + } + + // task group name specified, but could not find a mapping value in app taskGroups + return nil, fmt.Errorf("taskGroup %s is not defined in the application", groupName) +} + +// the placeholder name is the pod name, pod name can not be longer than 63 chars, +// taskGroup name and appID will be truncated if they go over 20/28 chars respectively, +// each taskGroup is assigned with an incremental index starting from 0. 
+func GeneratePlaceholderName(taskGroupName, appID string, index int32) string { + // taskGroup name no longer than 20 chars + // appID no longer than 28 chars + // total length no longer than 20 + 28 + 5 + 10 = 63 + shortTaskGroupName := fmt.Sprintf("%.20s", taskGroupName) + shortAppID := fmt.Sprintf("%.28s", appID) + return "tg-" + shortTaskGroupName + "-" + shortAppID + fmt.Sprintf("-%d", index) +} + +func GetPlaceholderResourceRequest(resources map[string]resource.Quantity) v1.ResourceList { + resourceReq := v1.ResourceList{} + for k, v := range resources { + resourceReq[v1.ResourceName(k)] = v + } + return resourceReq +} + +func GetPlaceholderFlagFromPodSpec(pod *v1.Pod) bool { + if value, ok := pod.Annotations[constants.AnnotationPlaceholderFlag]; ok { + if v, err := strconv.ParseBool(value); err == nil { + return v + } + } + return false +} + +func GetTaskGroupFromPodSpec(pod *v1.Pod) string { + if value, ok := pod.Annotations[constants.AnnotationTaskGroupName]; ok { + return value + } + return "" +} + +func GetTaskGroupsFromAnnotation(pod *v1.Pod) ([]v1alpha1.TaskGroup, error) { + taskGroupInfo, ok := pod.Annotations[constants.AnnotationTaskGroups] + if !ok { + return nil, nil + } + taskGroups := []v1alpha1.TaskGroup{} + err := json.Unmarshal([]byte(taskGroupInfo), &taskGroups) + if err != nil { + return nil, err + } + // json.Unmarshal won't return an error if name or MinMember is empty, but will return an error if MinResource is empty or in an invalid format.
+ for _, taskGroup := range taskGroups { + if taskGroup.Name == "" { + return nil, fmt.Errorf("can't get taskGroup Name from pod annotation, %s", + pod.Annotations[constants.AnnotationTaskGroups]) + } + if taskGroup.MinMember == int32(0) { + return nil, fmt.Errorf("can't get taskGroup MinMember from pod annotation, %s", + pod.Annotations[constants.AnnotationTaskGroups]) + } + if taskGroup.MinMember < int32(0) { + return nil, fmt.Errorf("minMember cannot be negative, %s", + pod.Annotations[constants.AnnotationTaskGroups]) + } + } + return taskGroups, nil +} + +type TaskGroupInstanceCountMap struct { + counts map[string]int32 + sync.RWMutex +} + +func NewTaskGroupInstanceCountMap() *TaskGroupInstanceCountMap { + return &TaskGroupInstanceCountMap{ + counts: make(map[string]int32), + } +} + +func (t *TaskGroupInstanceCountMap) Add(taskGroupName string, num int32) { + t.update(taskGroupName, num) +} + +func (t *TaskGroupInstanceCountMap) AddOne(taskGroupName string) { + t.update(taskGroupName, 1) +} + +func (t *TaskGroupInstanceCountMap) DeleteOne(taskGroupName string) { + t.update(taskGroupName, -1) +} + +func (t *TaskGroupInstanceCountMap) update(taskGroupName string, delta int32) { + t.Lock() + defer t.Unlock() + if v, ok := t.counts[taskGroupName]; ok { + t.counts[taskGroupName] = v + delta + } else { + t.counts[taskGroupName] = delta + } +} + +func (t *TaskGroupInstanceCountMap) Size() int { + t.RLock() + defer t.RUnlock() + return len(t.counts) +} + +func (t *TaskGroupInstanceCountMap) GetTaskGroupInstanceCount(groupName string) int32 { + t.RLock() + defer t.RUnlock() + return t.counts[groupName] +} + +func (t *TaskGroupInstanceCountMap) Equals(target *TaskGroupInstanceCountMap) bool { + if t == nil { + return t == target + } + + t.RLock() + defer t.RUnlock() + + if target == nil { + return false + } + + if t.Size() != target.Size() { + return false + } + + for k, v := range t.counts { + if target.counts[k] != v { + return false + } + } + + return true +} diff 
--git a/pkg/common/utils/gang_utils_test.go b/pkg/common/utils/gang_utils_test.go new file mode 100644 index 000000000..fec7d145e --- /dev/null +++ b/pkg/common/utils/gang_utils_test.go @@ -0,0 +1,359 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package utils + +import ( + "math" + "testing" + + "gotest.tools/assert" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/apache/incubator-yunikorn-k8shim/pkg/apis/yunikorn.apache.org/v1alpha1" + "github.com/apache/incubator-yunikorn-k8shim/pkg/common/constants" +) + +func TestFindAppTaskGroup(t *testing.T) { + taskGroups := []*v1alpha1.TaskGroup{ + { + Name: "test-group-0", + MinMember: 1, + MinResource: map[string]resource.Quantity{ + "cpu": resource.MustParse("500m"), + }, + }, + { + Name: "test-group-1", + MinMember: 2, + MinResource: map[string]resource.Quantity{ + "cpu": resource.MustParse("500m"), + }, + }, + } + + tg, err := FindAppTaskGroup(taskGroups, "") + assert.NilError(t, err) + assert.Assert(t, tg == nil) + + tg, err = FindAppTaskGroup(nil, "") + assert.NilError(t, err) + assert.Assert(t, tg == nil) + + tg, err = FindAppTaskGroup(nil, "test-group-0") + assert.NilError(t, err) + assert.Assert(t, tg 
== nil) + + tg, err = FindAppTaskGroup(taskGroups, "test-group-3") + assert.Error(t, err, "taskGroup test-group-3 is not defined in the application") + assert.Assert(t, tg == nil) + + tg, err = FindAppTaskGroup(taskGroups, "test-group-1") + assert.NilError(t, err) + assert.Equal(t, tg.Name, "test-group-1") + assert.Equal(t, tg.MinMember, int32(2)) +} + +func TestGeneratePlaceholderName(t *testing.T) { + name := GeneratePlaceholderName("my-group", "app0001", 100) + assert.Equal(t, name, "tg-my-group-app0001-100") + + name = GeneratePlaceholderName("my-group", + "app00000000000000000000000000000000000000000001", 100) + assert.Equal(t, name, "tg-my-group-app0000000000000000000000000-100") + assert.Assert(t, len(name) < 63) + + name = GeneratePlaceholderName("a-very-long-task-group-name------------------------------------------", + "a-very-long-app-ID-----------------------------------------------------------------", 100) + assert.Equal(t, name, "tg-a-very-long-task-gro-a-very-long-app-ID-----------100") + assert.Assert(t, len(name) < 63) + + name = GeneratePlaceholderName("a-very-long-task-group-name------------------------------------------", + "a-very-long-app-ID-----------------------------------------------------------------", math.MaxInt32) + assert.Equal(t, name, "tg-a-very-long-task-gro-a-very-long-app-ID-----------2147483647") + assert.Assert(t, len(name) == 63) +} + +func TestGetTaskGroupFromPodSpec(t *testing.T) { + pod := &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-01", + UID: "UID-01", + Annotations: map[string]string{ + constants.AnnotationTaskGroupName: "test-task-group", + }, + }, + } + + assert.Equal(t, GetTaskGroupFromPodSpec(pod), "test-task-group") + + pod = &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-01", + UID: "UID-01", + }, + } + + assert.Equal(t, GetTaskGroupFromPodSpec(pod), "") +} + 
+func TestTaskGroupInstanceCountMap(t *testing.T) { + counts := NewTaskGroupInstanceCountMap() + assert.Equal(t, counts.Size(), 0) + assert.Equal(t, counts.GetTaskGroupInstanceCount("g1"), int32(0)) + assert.Equal(t, counts.GetTaskGroupInstanceCount("g2"), int32(0)) + + counts = NewTaskGroupInstanceCountMap() + counts.AddOne("g1") + counts.AddOne("g1") + counts.AddOne("g1") + counts.AddOne("g2") + counts.AddOne("g2") + assert.Equal(t, counts.Size(), 2) + assert.Equal(t, counts.GetTaskGroupInstanceCount("g1"), int32(3)) + assert.Equal(t, counts.GetTaskGroupInstanceCount("g2"), int32(2)) + + counts.DeleteOne("g1") + counts.DeleteOne("g2") + assert.Equal(t, counts.Size(), 2) + assert.Equal(t, counts.GetTaskGroupInstanceCount("g1"), int32(2)) + assert.Equal(t, counts.GetTaskGroupInstanceCount("g2"), int32(1)) + + counts1 := NewTaskGroupInstanceCountMap() + counts2 := NewTaskGroupInstanceCountMap() + assert.Equal(t, counts1.Equals(counts2), true) + counts1.Add("g1", 101) + counts2.Add("g1", 101) + assert.Equal(t, counts1.Equals(counts2), true) + counts1.Add("g1", 100) + counts2.Add("g1", 101) + assert.Equal(t, counts1.Equals(counts2), false) + + counts1 = NewTaskGroupInstanceCountMap() + counts2 = NewTaskGroupInstanceCountMap() + counts1.AddOne("g1") + counts1.AddOne("g2") + counts1.AddOne("g3") + counts1.AddOne("g4") + counts1.AddOne("g5") + counts2.AddOne("g5") + counts2.AddOne("g4") + counts2.AddOne("g3") + counts2.AddOne("g2") + counts2.AddOne("g1") + assert.Equal(t, counts1.Equals(counts2), true) + + counts1 = NewTaskGroupInstanceCountMap() + counts2 = NewTaskGroupInstanceCountMap() + counts1.AddOne("g1") + counts1.AddOne("g2") + counts2.AddOne("g1") + assert.Equal(t, counts1.Equals(counts2), false) + + counts1 = NewTaskGroupInstanceCountMap() + counts2 = NewTaskGroupInstanceCountMap() + counts1.AddOne("g1") + counts2.AddOne("g2") + counts2.AddOne("g1") + assert.Equal(t, counts1.Equals(counts2), false) + + var nilOne *TaskGroupInstanceCountMap + var nilTwo 
*TaskGroupInstanceCountMap + assert.Equal(t, nilOne.Equals(nilTwo), true) + + empty := NewTaskGroupInstanceCountMap() + assert.Equal(t, nilOne.Equals(empty), false) +} + +// nolint: funlen +func TestGetTaskGroupFromAnnotation(t *testing.T) { + // correct json + testGroup := ` + [ + { + "name": "test-group-1", + "minMember": 10, + "minResource": { + "cpu": 1, + "memory": "2Gi" + }, + "nodeSelector": { + "test": "testnode", + "locate": "west" + }, + "tolerations": [ + { + "key": "key", + "operator": "Equal", + "value": "value", + "effect": "NoSchedule" + } + ] + }, + { + "name": "test-group-2", + "minMember": 5, + "minResource": { + "cpu": 2, + "memory": "4Gi" + } + } + ]` + testGroup2 := ` + [ + { + "name": "test-group-3", + "minMember": 3, + "minResource": { + "cpu": 2, + "memory": "1Gi" + } + } + ]` + // Error json + testGroupErr := ` + [ + { + "name": "test-group-err-1", + "minMember": "ERR", + "minResource": { + "cpu": "ERR", + "memory": "ERR" + }, + } + ]` + // without name + testGroupErr2 := ` + [ + { + "minMember": 3, + "minResource": { + "cpu": 2, + "memory": "1Gi" + } + } + ]` + // without minMember + testGroupErr3 := ` + [ + { + "name": "test-group-err-2", + "minResource": { + "cpu": 2, + "memory": "1Gi" + } + } + ]` + // without minResource + testGroupErr4 := ` + [ + { + "name": "test-group-err-4", + "minMember": 3, + } + ]` + // negative minMember + testGroupErr5 := ` + [ + { + "name": "test-group-err-5", + "minMember": -100, + } + ]` + // Insert task group info to pod annotation + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-err", + Namespace: "test", + UID: "test-pod-UID-err", + }, + Spec: v1.PodSpec{}, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + } + // Empty case + taskGroupEmpty, err := GetTaskGroupsFromAnnotation(pod) + assert.Assert(t, taskGroupEmpty == nil) + assert.Assert(t, err == nil) + // Error case + pod.Annotations = map[string]string{constants.AnnotationTaskGroups: testGroupErr} + taskGroupErr, err :=
GetTaskGroupsFromAnnotation(pod) + assert.Assert(t, taskGroupErr == nil) + assert.Assert(t, err != nil) + pod.Annotations = map[string]string{constants.AnnotationTaskGroups: testGroupErr2} + taskGroupErr2, err := GetTaskGroupsFromAnnotation(pod) + assert.Assert(t, taskGroupErr2 == nil) + assert.Assert(t, err != nil) + pod.Annotations = map[string]string{constants.AnnotationTaskGroups: testGroupErr3} + taskGroupErr3, err := GetTaskGroupsFromAnnotation(pod) + assert.Assert(t, taskGroupErr3 == nil) + assert.Assert(t, err != nil) + pod.Annotations = map[string]string{constants.AnnotationTaskGroups: testGroupErr4} + taskGroupErr4, err := GetTaskGroupsFromAnnotation(pod) + assert.Assert(t, taskGroupErr4 == nil) + assert.Assert(t, err != nil) + pod.Annotations = map[string]string{constants.AnnotationTaskGroups: testGroupErr5} + taskGroupErr5, err := GetTaskGroupsFromAnnotation(pod) + assert.Assert(t, taskGroupErr5 == nil) + assert.Assert(t, err != nil) + // Correct case + pod.Annotations = map[string]string{constants.AnnotationTaskGroups: testGroup} + taskGroups, err := GetTaskGroupsFromAnnotation(pod) + assert.NilError(t, err) + // Group value check + assert.Equal(t, taskGroups[0].Name, "test-group-1") + assert.Equal(t, taskGroups[0].MinMember, int32(10)) + assert.Equal(t, taskGroups[0].MinResource["cpu"], resource.MustParse("1")) + assert.Equal(t, taskGroups[0].MinResource["memory"], resource.MustParse("2Gi")) + assert.Equal(t, taskGroups[1].Name, "test-group-2") + assert.Equal(t, taskGroups[1].MinMember, int32(5)) + assert.Equal(t, taskGroups[1].MinResource["cpu"], resource.MustParse("2")) + assert.Equal(t, taskGroups[1].MinResource["memory"], resource.MustParse("4Gi")) + // NodeSelector check + assert.Equal(t, taskGroups[0].NodeSelector["test"], "testnode") + assert.Equal(t, taskGroups[0].NodeSelector["locate"], "west") + // Toleration check + var tolerations []v1.Toleration + toleration := v1.Toleration{ + Key: "key", + Operator: "Equal", + Value: "value", + Effect: 
"NoSchedule", + } + tolerations = append(tolerations, toleration) + assert.DeepEqual(t, taskGroups[0].Tolerations, tolerations) + + pod.Annotations = map[string]string{constants.AnnotationTaskGroups: testGroup2} + taskGroups2, err := GetTaskGroupsFromAnnotation(pod) + assert.NilError(t, err) + assert.Equal(t, taskGroups2[0].Name, "test-group-3") + assert.Equal(t, taskGroups2[0].MinMember, int32(3)) + assert.Equal(t, taskGroups2[0].MinResource["cpu"], resource.MustParse("2")) + assert.Equal(t, taskGroups2[0].MinResource["memory"], resource.MustParse("1Gi")) +} diff --git a/pkg/common/utils/utils.go b/pkg/common/utils/utils.go index afe8f799a..c62ba128e 100644 --- a/pkg/common/utils/utils.go +++ b/pkg/common/utils/utils.go @@ -15,6 +15,7 @@ See the License for the specific language governing permissions and limitations under the License. */ + package utils import ( diff --git a/pkg/controller/application/app_controller_test.go b/pkg/controller/application/app_controller_test.go index 5b84da579..49aa1a791 100644 --- a/pkg/controller/application/app_controller_test.go +++ b/pkg/controller/application/app_controller_test.go @@ -165,13 +165,13 @@ func createApp(name string, namespace string, queue string) appv1.Application { UID: "UID-APP-00001", }, Spec: appv1.ApplicationSpec{ - Policy: appv1.SchedulePolicy{ - Policy: "TryOnce", - }, Queue: queue, - TaskGroup: []appv1.Task{ + SchedulingPolicy: appv1.SchedulingPolicy{ + Type: appv1.TryReserve, + }, + TaskGroups: []appv1.TaskGroup{ { - GroupName: "test-task-001", + Name: "test-task-001", MinMember: 0, }, }, diff --git a/pkg/shim/scheduler.go b/pkg/shim/scheduler.go index 63f8565ba..8d0e9e026 100644 --- a/pkg/shim/scheduler.go +++ b/pkg/shim/scheduler.go @@ -267,6 +267,9 @@ func (ss *KubernetesShim) run() { } ss.apiFactory.Start() + + // run the placeholder manager + cache.NewPlaceholderManager(ss.apiFactory.GetAPIs()).Start() } func (ss *KubernetesShim) enterState(event *fsm.Event) { @@ -284,6 +287,8 @@ func (ss 
*KubernetesShim) stop() { dispatcher.Stop() // stop the app manager ss.appManager.Stop() + // stop the placeholder manager + cache.GetPlaceholderManager().Stop() default: log.Logger().Info("scheduler is already stopped") } diff --git a/pkg/simulation/gang/gangclient/gangclient.go b/pkg/simulation/gang/gangclient/gangclient.go new file mode 100644 index 000000000..31951f479 --- /dev/null +++ b/pkg/simulation/gang/gangclient/gangclient.go @@ -0,0 +1,97 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package main + +import ( + "encoding/json" + "errors" + "log" + "net/http" + "os" + "strconv" + "strings" + "time" +) + +func main() { + jobID := os.Getenv("JOB_ID") + if jobID == "" { + err := errors.New("can't get the jobID from env") + log.Fatal(err) + } + serviceName := os.Getenv("SERVICE_NAME") + if serviceName == "" { + err := errors.New("can't get the service name from env") + log.Fatal(err) + } + memberAmount, err := strconv.Atoi(os.Getenv("MEMBER_AMOUNT")) + if err != nil { + err = errors.New("can't get the member amount from env") + log.Fatal(err) + } + taskExecutionSeconds, err := strconv.Atoi(os.Getenv("TASK_EXECUTION_SECONDS")) + if err != nil { + err = errors.New("can't get the task execution seconds from env") + log.Fatal(err) + } + serviceName = strings.ToUpper(serviceName) + "_SERVICE_HOST" + serviceIP := os.Getenv(serviceName) + err = addRequest(serviceIP, jobID) + if err != nil { + log.Fatal(err) + } + // Check if we satisfy gang minMember or not every 2 second + for { + number, err := checkRequest(serviceIP, jobID) + if err != nil { + log.Fatal(err) + } + if number >= memberAmount { + log.Printf("satisfy gang minMember.") + log.Printf("start to run task.") + log.Printf("Task will run for %d min and %d sec.", taskExecutionSeconds/60, taskExecutionSeconds-((taskExecutionSeconds/60)*60)) + time.Sleep(time.Duration(taskExecutionSeconds) * time.Second) + break + } + log.Printf("The ready member and expected member: %d/%d", number, memberAmount) + time.Sleep(2 * time.Second) + } +} + +func addRequest(ip string, jobID string) error { + _, err := http.Get("http://" + ip + ":8863" + "/ws/v1/add/" + jobID) + if err != nil { + return err + } + return nil +} + +func checkRequest(ip string, jobID string) (int, error) { + resp, err := http.Get("http://" + ip + ":8863" + "/ws/v1/check/" + jobID) + var value int + if err != nil { + return -1, err + } + defer resp.Body.Close() + err = json.NewDecoder(resp.Body).Decode(&value) + if err != nil { + return 
-1, err + } + return value, nil +} diff --git a/pkg/simulation/gang/webserver/handlers.go b/pkg/simulation/gang/webserver/handlers.go new file mode 100644 index 000000000..e394735e7 --- /dev/null +++ b/pkg/simulation/gang/webserver/handlers.go @@ -0,0 +1,58 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package main + +import ( + "encoding/json" + "log" + "net/http" + + "github.com/gorilla/mux" +) + +func writeHeader(w http.ResponseWriter) { + w.Header().Set("Content-Type", "application/json; charset=UTF-8") + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Credentials", "true") + w.Header().Set("Access-Control-Allow-Methods", "GET,POST,HEAD,OPTIONS") + w.Header().Set("Access-Control-Allow-Headers", "X-Requested-With,Content-Type,Accept,Origin") +} + +func setTaskReady(w http.ResponseWriter, r *http.Request) { + writeHeader(w) + lock.Lock() + defer lock.Unlock() + vars := mux.Vars(r) + jobID := vars["jobID"] + jobMember[jobID]++ + if err := json.NewEncoder(w).Encode(jobMember); err != nil { + log.Printf("Add task %s fail.", jobID) + http.Error(w, err.Error(), http.StatusInternalServerError) + } +} + +func checkJobReady(w http.ResponseWriter, r *http.Request) { + writeHeader(w) + vars := mux.Vars(r) + jobID := vars["jobID"] + if err := json.NewEncoder(w).Encode(jobMember[jobID]); err != nil { + log.Printf("Check job %s Member fail", jobID) + http.Error(w, err.Error(), http.StatusInternalServerError) + } +} diff --git a/pkg/simulation/gang/webserver/routes.go b/pkg/simulation/gang/webserver/routes.go new file mode 100644 index 000000000..9f902a8ea --- /dev/null +++ b/pkg/simulation/gang/webserver/routes.go @@ -0,0 +1,46 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package main + +import ( + "net/http" +) + +type route struct { + Name string + Method string + Pattern string + HandlerFunc http.HandlerFunc +} +type routes []route + +var webRoutes = routes{ + route{ + "Counter", + "GET", + "/ws/v1/add/{jobID}", + setTaskReady, + }, + route{ + "Counter", + "GET", + "/ws/v1/check/{jobID}", + checkJobReady, + }, +} diff --git a/pkg/simulation/gang/webserver/webserver.go b/pkg/simulation/gang/webserver/webserver.go new file mode 100644 index 000000000..f1c6f92fa --- /dev/null +++ b/pkg/simulation/gang/webserver/webserver.go @@ -0,0 +1,26 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package main + +var jobMember = make(map[string]int) + +func main() { + webapp := NewWebApp() + webapp.StartWebApp() +} diff --git a/pkg/simulation/gang/webserver/webservice.go b/pkg/simulation/gang/webserver/webservice.go new file mode 100644 index 000000000..075683189 --- /dev/null +++ b/pkg/simulation/gang/webserver/webservice.go @@ -0,0 +1,76 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package main + +import ( + "context" + "log" + "net/http" + "sync" + "time" + + "github.com/gorilla/mux" +) + +var lock sync.RWMutex + +type WebService struct { + httpServer *http.Server +} + +func newRouter() *mux.Router { + router := mux.NewRouter().StrictSlash(true) + for _, webRoute := range webRoutes { + handler := loggingHandler(webRoute.HandlerFunc, webRoute.Name) + router.Methods(webRoute.Method).Path(webRoute.Pattern).Name(webRoute.Name).Handler(handler) + } + return router +} + +func loggingHandler(inner http.Handler, name string) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + start := time.Now() + inner.ServeHTTP(w, r) + log.Printf("%s %s %s %s", r.Method, r.RequestURI, name, time.Since(start)) + }) +} + +func (m *WebService) StartWebApp() { + router := newRouter() + m.httpServer = &http.Server{Addr: ":8863", Handler: router} + log.Println("web-app started with port 8863") + httpError := m.httpServer.ListenAndServe() + if httpError != nil && httpError != http.ErrServerClosed { + log.Println("HTTP server error.") + } +} + +func NewWebApp() *WebService { + m := &WebService{} + return m +} + +func (m *WebService) StopWebApp() error { + if m.httpServer != nil { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + return m.httpServer.Shutdown(ctx) + } + return nil +} diff --git a/test/e2e/app/app_test.go b/test/e2e/app/app_test.go index 6d4ff994d..ac426225b 100644 --- a/test/e2e/app/app_test.go +++ b/test/e2e/app/app_test.go @@ -34,7 +34,7 @@ import ( var _ = ginkgo.Describe("App", func() { var kClient k8s.KubeCtl var appClient *crdclientset.Clientset - var appCRDDef string + // var appCRDDef string var appCRD *v1alpha1.Application var dev = "apptest" @@ -51,17 +51,41 @@ var _ = ginkgo.Describe("App", func() { appClient, err = yunikorn.NewApplicationClient() gomega.Ω(err).NotTo(gomega.HaveOccurred()) // error test case - apperrDef, err := 
common.GetAbsPath("../testdata/application_error.yaml") + // error queue format + appCRDDef, err := common.GetAbsPath("../testdata/app/application_err_queue.yaml") gomega.Ω(err).NotTo(gomega.HaveOccurred()) - _, err = yunikorn.GetApplicationObj(apperrDef) + err = k8s.ApplyYamlWithKubectl(appCRDDef, dev) gomega.Ω(err).To(gomega.HaveOccurred()) - // correct test case - appCRDDef, err = common.GetAbsPath("../testdata/application.yaml") + // error gang name format + appCRDDef, err = common.GetAbsPath("../testdata/app/application_err_name.yaml") + gomega.Ω(err).NotTo(gomega.HaveOccurred()) + err = k8s.ApplyYamlWithKubectl(appCRDDef, dev) + gomega.Ω(err).To(gomega.HaveOccurred()) + // error minMember format + appCRDDef, err = common.GetAbsPath("../testdata/app/application_err_minmember.yaml") + gomega.Ω(err).NotTo(gomega.HaveOccurred()) + err = k8s.ApplyYamlWithKubectl(appCRDDef, dev) + gomega.Ω(err).To(gomega.HaveOccurred()) + // error minResource format + appCRDDef, err = common.GetAbsPath("../testdata/app/application_err_minresource.yaml") + gomega.Ω(err).NotTo(gomega.HaveOccurred()) + err = k8s.ApplyYamlWithKubectl(appCRDDef, dev) + gomega.Ω(err).To(gomega.HaveOccurred()) + // error NodeSelector format + appCRDDef, err = common.GetAbsPath("../testdata/app/application_err_nodeselector.yaml") + gomega.Ω(err).NotTo(gomega.HaveOccurred()) + err = k8s.ApplyYamlWithKubectl(appCRDDef, dev) + gomega.Ω(err).To(gomega.HaveOccurred()) + // error tolerations format + appCRDDef, err = common.GetAbsPath("../testdata/app/application_err_tolerations.yaml") gomega.Ω(err).NotTo(gomega.HaveOccurred()) - appCRDObj, err := yunikorn.GetApplicationObj(appCRDDef) + err = k8s.ApplyYamlWithKubectl(appCRDDef, dev) + gomega.Ω(err).To(gomega.HaveOccurred()) + + // correct test case + appCRDDef, err = common.GetAbsPath("../testdata/app/application.yaml") gomega.Ω(err).NotTo(gomega.HaveOccurred()) - appCRDObj.Namespace = dev - err = yunikorn.CreateApplication(appClient, appCRDObj, dev) + err = 
k8s.ApplyYamlWithKubectl(appCRDDef, dev) gomega.Ω(err).NotTo(gomega.HaveOccurred()) appCRD, err = yunikorn.GetApplication(appClient, dev, "example") gomega.Ω(err).NotTo(gomega.HaveOccurred()) @@ -74,14 +98,19 @@ var _ = ginkgo.Describe("App", func() { gomega.Ω(appCRD.Spec.Queue).To(gomega.Equal("root.default")) gomega.Ω(appCRD.ObjectMeta.Name).To(gomega.Equal("example")) gomega.Ω(appCRD.ObjectMeta.Namespace).To(gomega.Equal(dev)) - policy := appCRD.Spec.Policy.Policy - gomega.Ω(string(policy)).To(gomega.Equal("TryOnce")) - gomega.Ω(appCRD.Spec.TaskGroup[0].GroupName).To(gomega.Equal("test-task-0001")) - gomega.Ω(appCRD.Spec.TaskGroup[0].MinMember).To(gomega.Equal(int32(1))) + policy := appCRD.Spec.SchedulingPolicy + gomega.Ω(policy.Type).To(gomega.Equal(v1alpha1.TryOnce)) + gomega.Ω(appCRD.Spec.TaskGroups[0].Name).To(gomega.Equal("test-task-0001")) + gomega.Ω(appCRD.Spec.TaskGroups[0].MinMember).To(gomega.Equal(int32(1))) anscpu := resource.MustParse("300m") ansmem := resource.MustParse("128Mi") - gomega.Ω(appCRD.Spec.TaskGroup[0].MinResource["cpu"]).To(gomega.Equal(anscpu)) - gomega.Ω(appCRD.Spec.TaskGroup[0].MinResource["memory"]).To(gomega.Equal(ansmem)) + gomega.Ω(appCRD.Spec.TaskGroups[0].MinResource["cpu"]).To(gomega.Equal(anscpu)) + gomega.Ω(appCRD.Spec.TaskGroups[0].MinResource["memory"]).To(gomega.Equal(ansmem)) + gomega.Ω(appCRD.Spec.TaskGroups[0].NodeSelector["locate"]).To(gomega.Equal("west")) + gomega.Ω(appCRD.Spec.TaskGroups[0].Tolerations[0].Key).To(gomega.Equal("key")) + gomega.Ω(appCRD.Spec.TaskGroups[0].Tolerations[0].Operator).To(gomega.Equal(v1.TolerationOpEqual)) + gomega.Ω(appCRD.Spec.TaskGroups[0].Tolerations[0].Value).To(gomega.Equal("value")) + gomega.Ω(appCRD.Spec.TaskGroups[0].Tolerations[0].Effect).To(gomega.Equal(v1.TaintEffectNoSchedule)) }) ginkgo.AfterSuite(func() { diff --git a/test/e2e/framework/helpers/k8s/k8s_utils.go b/test/e2e/framework/helpers/k8s/k8s_utils.go index fb1895ec0..3034b4bce 100644 --- 
a/test/e2e/framework/helpers/k8s/k8s_utils.go +++ b/test/e2e/framework/helpers/k8s/k8s_utils.go @@ -17,9 +17,11 @@ package k8s import ( + "bytes" "errors" "fmt" "os" + "os/exec" "path/filepath" "time" @@ -468,3 +470,18 @@ func (k *KubeCtl) RemoveYunikornSchedulerPodAnnotation(annotation string) error _, err = k.DeletePodAnnotation(&schedPod, configmanager.YuniKornTestConfig.YkNamespace, annotation) return err } + +func ApplyYamlWithKubectl(path, namespace string) error { + cmd := exec.Command("kubectl", "apply", "-f", path, "-n", namespace) + var stderr bytes.Buffer + cmd.Stderr = &stderr + // err != nil does not necessarily indicate a YAML format error; + // it only indicates that cmd.Run() itself failed. + err := cmd.Run() + // if the YAML format is invalid, errStr will contain the details + errStr := stderr.String() + if err != nil && errStr != "" { + return fmt.Errorf("apply fail with %s", errStr) + } + return nil +} diff --git a/test/e2e/testdata/application.yaml b/test/e2e/testdata/app/application.yaml similarity index 82% rename from test/e2e/testdata/application.yaml rename to test/e2e/testdata/app/application.yaml index dcdc26ca1..4d473548a 100644 --- a/test/e2e/testdata/application.yaml +++ b/test/e2e/testdata/app/application.yaml @@ -21,12 +21,19 @@ metadata: name: example spec: schedulingPolicy: - name: TryOnce + type: TryOnce queue: root.default taskGroups: - - groupName: "test-task-0001" + - name: "test-task-0001" minMember: 1 minResource: cpu: "300m" memory: "128Mi" + nodeSelector: + locate: west + tolerations: + - key: "key" + operator: "Equal" + value: "value" + effect: "NoSchedule" \ No newline at end of file diff --git a/test/e2e/testdata/app/application_err_minmember.yaml b/test/e2e/testdata/app/application_err_minmember.yaml new file mode 100644 index 000000000..f6ef954cc --- /dev/null +++ b/test/e2e/testdata/app/application_err_minmember.yaml @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +#The error application example for error case +apiVersion: "yunikorn.apache.org/v1alpha1" +kind: Application +metadata: + name: error-minmember +spec: + queue: root.default + schedulingPolicy: + type: TryReserve + taskGroups: + - name: "test-task-0001" + minMember: "string" + minResource: + cpu: "300m" + memory: "128Mi" \ No newline at end of file diff --git a/test/e2e/testdata/app/application_err_minresource.yaml b/test/e2e/testdata/app/application_err_minresource.yaml new file mode 100644 index 000000000..510f4897b --- /dev/null +++ b/test/e2e/testdata/app/application_err_minresource.yaml @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# +#The error application example for error case +apiVersion: "yunikorn.apache.org/v1alpha1" +kind: Application +metadata: + name: error-minResource +spec: + queue: root.default + schedulingPolicy: + type: TryReserve + taskGroups: + - name: "test-task-0001" + minMember: 1 + minResource: + cpu: "300A" + memory: "128Mi" \ No newline at end of file diff --git a/test/e2e/testdata/app/application_err_name.yaml b/test/e2e/testdata/app/application_err_name.yaml new file mode 100644 index 000000000..eaad3af15 --- /dev/null +++ b/test/e2e/testdata/app/application_err_name.yaml @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +#The error application example for error case +apiVersion: "yunikorn.apache.org/v1alpha1" +kind: Application +metadata: + name: error-name +spec: + queue: root.default + schedulingPolicy: + type: TryReserve + taskGroups: + - name: 100 + minMember: 1 + minResource: + cpu: "300m" + memory: "128Mi" \ No newline at end of file diff --git a/test/e2e/testdata/app/application_err_nodeselector.yaml b/test/e2e/testdata/app/application_err_nodeselector.yaml new file mode 100644 index 000000000..0a025988b --- /dev/null +++ b/test/e2e/testdata/app/application_err_nodeselector.yaml @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# The error application example for error case +apiVersion: "yunikorn.apache.org/v1alpha1" +kind: Application +metadata: + name: error-nodeselector +spec: + schedulingPolicy: + type: TryOnce + queue: root.default + taskGroups: + - name: "test-task-0001" + minMember: 1 + minResource: + cpu: "300m" + memory: "128Mi" + nodeSelector: + locate: 1 \ No newline at end of file diff --git a/test/e2e/testdata/application_error.yaml b/test/e2e/testdata/app/application_err_queue.yaml similarity index 88% rename from test/e2e/testdata/application_error.yaml rename to test/e2e/testdata/app/application_err_queue.yaml index 32da826a4..5aa8afa6e 100644 --- a/test/e2e/testdata/application_error.yaml +++ b/test/e2e/testdata/app/application_err_queue.yaml @@ -19,15 +19,15 @@ apiVersion: "yunikorn.apache.org/v1alpha1" kind: Application metadata: - name: example_test + name: error-queue spec: queue: ////// schedulingPolicy: - name: errorName + type: TryReserve taskGroups: - - groupName: 123 - minMember: "string" + - name: "test-task-0001" + minMember: 1 minResource: - cpu: "ABC" - memory: "ABC" + cpu: "300m" + memory: "128Mi" \ No newline at end of file diff --git a/test/e2e/testdata/app/application_err_tolerations.yaml b/test/e2e/testdata/app/application_err_tolerations.yaml new file mode 100644 index 000000000..9d8ac1ee5 --- /dev/null +++ b/test/e2e/testdata/app/application_err_tolerations.yaml @@ -0,0 +1,36 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# The error application example for error case +apiVersion: "yunikorn.apache.org/v1alpha1" +kind: Application +metadata: + name: error-tolerations +spec: + schedulingPolicy: + type: TryOnce + queue: root.default + taskGroups: + - name: "test-task-0001" + minMember: 1 + minResource: + cpu: "300m" + memory: "128Mi" + tolerations: + - key: 1 + operator: "ERR" + value: "value" + effect: "ERR" \ No newline at end of file