diff --git a/apis/kubecluster.org/v1alpha1/cluster_defaults.go b/apis/kubecluster.org/v1alpha1/cluster_defaults.go
index 671b17f..be4f615 100644
--- a/apis/kubecluster.org/v1alpha1/cluster_defaults.go
+++ b/apis/kubecluster.org/v1alpha1/cluster_defaults.go
@@ -16,7 +16,6 @@ package v1alpha1
 
 import (
 	"k8s.io/apimachinery/pkg/runtime"
-	"k8s.io/utils/pointer"
 )
 
 func addKubeClusterDefaultingFuncs(scheme *runtime.Scheme) error {
@@ -39,11 +38,6 @@ func SetDefaults_KubeCluster(kcluster *KubeCluster) {
 	}
 }
 
-func setDefaultReplicas(replicaSpec *ReplicaSpec, replicas int32) {
-	if replicaSpec != nil && replicaSpec.Replicas == nil {
-		replicaSpec.Replicas = pointer.Int32(replicas)
-	}
-}
 func SetDefaults_KubeClusterList(in *KubeClusterList) {
 	for i := range in.Items {
 		a := &in.Items[i]
diff --git a/apis/kubecluster.org/v1alpha1/cluster_defaults_test.go b/apis/kubecluster.org/v1alpha1/cluster_defaults_test.go
index 6554557..414ef3c 100644
--- a/apis/kubecluster.org/v1alpha1/cluster_defaults_test.go
+++ b/apis/kubecluster.org/v1alpha1/cluster_defaults_test.go
@@ -1 +1,69 @@
-package v1alpha1_test
+package v1alpha1
+
+import (
+	"github.com/onsi/ginkgo/v2"
+	"github.com/onsi/gomega"
+
+	"testing"
+)
+
+func TestSetDefaults_KubeCluster(t *testing.T) {
+	gomega.RegisterFailHandler(ginkgo.Fail)
+
+	t.Run("test empty kube cluster", func(t *testing.T) {
+		var replica2 int32 = 2
+		kcluster := &KubeCluster{Spec: ClusterSpec{
+			ClusterType: "",
+			ClusterReplicaSpec: map[ReplicaType]*ReplicaSpec{
+				"replica1": {}, "replica2": {Replicas: &replica2}},
+			MainContainer: "",
+			RunPolicy:     RunPolicy{},
+		}}
+		SetDefaults_KubeCluster(kcluster)
+
+		gomega.Expect(kcluster.Spec.MainContainer).To(gomega.Equal(ClusterDefaultContainerName))
+		gomega.Expect(*kcluster.Spec.RunPolicy.CleanKubeNodePolicy).To(gomega.Equal(CleanKubeNodePolicyAll))
+
+		gomega.Expect(*kcluster.Spec.ClusterReplicaSpec["replica1"].Replicas).To(gomega.Equal(int32(1)))
+		gomega.Expect(*kcluster.Spec.ClusterReplicaSpec["replica2"].Replicas).To(gomega.Equal(int32(2)))
+	})
+}
+
+func TestSetDefaults_KubeClusterList(t *testing.T) {
+	gomega.RegisterFailHandler(ginkgo.Fail)
+
+	var replica2 int32 = 2
+	testCount := 10
+	kclusterList := &KubeClusterList{Items: make([]KubeCluster, 0)}
+	for i := 0; i < testCount; i++ {
+		kcluster := KubeCluster{Spec: ClusterSpec{
+			ClusterType: "",
+			ClusterReplicaSpec: map[ReplicaType]*ReplicaSpec{
+				"replica1": {}, "replica2": {Replicas: &replica2}},
+			MainContainer: "",
+			RunPolicy:     RunPolicy{},
+		}}
+		kclusterList.Items = append(kclusterList.Items, kcluster)
+	}
+	SetDefaults_KubeClusterList(kclusterList)
+	for i := 0; i < testCount; i++ {
+		t.Run("test empty kube cluster", func(t *testing.T) {
+			kcluster := kclusterList.Items[i]
+			gomega.Expect(kcluster.Spec.MainContainer).To(gomega.Equal(ClusterDefaultContainerName))
+			gomega.Expect(*kcluster.Spec.RunPolicy.CleanKubeNodePolicy).To(gomega.Equal(CleanKubeNodePolicyAll))
+
+			gomega.Expect(*kcluster.Spec.ClusterReplicaSpec["replica1"].Replicas).To(gomega.Equal(int32(1)))
+			gomega.Expect(*kcluster.Spec.ClusterReplicaSpec["replica2"].Replicas).To(gomega.Equal(int32(2)))
+		})
+	}
+}
+
+func Test_setDefaultReplicas(t *testing.T) {
+	gomega.RegisterFailHandler(ginkgo.Fail)
+
+	t.Run("test replicaSpec not set replica", func(t *testing.T) {
+		replicaSpec := &ReplicaSpec{}
+		setDefaultReplicas(replicaSpec, 1)
+		gomega.Expect(*replicaSpec.Replicas).To(gomega.Equal(int32(1)))
+	})
+}
diff --git a/apis/kubecluster.org/v1alpha1/cluster_types.go b/apis/kubecluster.org/v1alpha1/cluster_types.go
index f6f0a18..8f140da 100644
--- a/apis/kubecluster.org/v1alpha1/cluster_types.go
+++ b/apis/kubecluster.org/v1alpha1/cluster_types.go
@@ -58,7 +58,7 @@ type ClusterSpec struct {
 	MainContainer string `json:"mainContainer,omitempty"`
 
 	// `RunPolicy` encapsulates various runtime policies of the distributed training
-	// job, for example how to clean up resources and how long the job can stay
+	// cluster, for example how to clean up resources and how long the cluster can stay
 	// active.
 	RunPolicy RunPolicy `json:"runPolicy,omitempty"`
 }
@@ -82,7 +82,7 @@ type ClusterStatus struct {
 	// It is represented in RFC3339 form and is in UTC.
 	LastReconcileTime *metav1.Time `json:"lastReconcileTime,omitempty"`
 
-	// Represents time when the job was completed. It is not guaranteed to
+	// Represents time when the cluster was completed. It is not guaranteed to
 	// be set in happens-before order across separate operations.
 	// It is represented in RFC3339 form and is in UTC.
 	CompletionTime *metav1.Time `json:"completionTime,omitempty"`
diff --git a/apis/kubecluster.org/v1alpha1/cluster_validation.go b/apis/kubecluster.org/v1alpha1/cluster_validation.go
index 59918ea..0a2ba87 100644
--- a/apis/kubecluster.org/v1alpha1/cluster_validation.go
+++ b/apis/kubecluster.org/v1alpha1/cluster_validation.go
@@ -9,8 +9,8 @@ import (
 )
 
 func ValidateV1alphaCluster(cluster *KubeCluster) error {
-	if errors := apimachineryvalidation.NameIsDNS1035Label(cluster.ObjectMeta.Name, false); errors != nil {
-		return fmt.Errorf("TFCluster name is invalid: %v", errors)
+	if errs := apimachineryvalidation.NameIsDNS1035Label(cluster.ObjectMeta.Name, false); errs != nil {
+		return fmt.Errorf("KubeCluster name is invalid: %v", errs)
 	}
 	if err := validateV1alphaClusterSpecs(cluster.Spec); err != nil {
 		return err
@@ -22,7 +22,7 @@ func validateV1alphaClusterSpecs(spec ClusterSpec) error {
 	if len(spec.ClusterType) == 0 {
 		return fmt.Errorf("KubeCluster is not valid: cluster type expected")
 	}
-	if spec.ClusterReplicaSpec == nil {
+	if len(spec.ClusterReplicaSpec) == 0 {
 		return fmt.Errorf("KubeCluster is not valid")
 	}
 	defaultContainerName := ClusterDefaultContainerName
diff --git a/apis/kubecluster.org/v1alpha1/cluster_validation_test.go b/apis/kubecluster.org/v1alpha1/cluster_validation_test.go
new file mode 100644
index 0000000..b86c50c
--- /dev/null
+++ b/apis/kubecluster.org/v1alpha1/cluster_validation_test.go
@@ -0,0 +1,175 @@
+package v1alpha1
+
+import (
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/utils/pointer"
+	"testing"
+)
+
+func TestValidateV1alphaCluster(t *testing.T) {
+	validClusterSpec := ClusterSpec{
+		ClusterType: "test",
+		ClusterReplicaSpec: map[ReplicaType]*ReplicaSpec{"test": {
+			Replicas: pointer.Int32(1),
+			Template: ReplicaTemplate{
+				Spec: corev1.PodSpec{
+					Containers: []corev1.Container{{
+						Name:  ClusterDefaultContainerName,
+						Image: "centos",
+					}},
+				},
+			},
+		}},
+	}
+	type args struct {
+		cluster *KubeCluster
+	}
+	tests := []struct {
+		name    string
+		args    args
+		wantErr bool
+	}{
+		{
+			"valid kubeCluster",
+			args{
+				&KubeCluster{
+					ObjectMeta: metav1.ObjectMeta{Name: "test"},
+					Spec:       validClusterSpec,
+				},
+			},
+			false,
+		},
+		{
+			"kubeCluster name does not meet DNS1035",
+			args{
+				&KubeCluster{
+					ObjectMeta: metav1.ObjectMeta{Name: "0-test"},
+					Spec:       validClusterSpec,
+				},
+			},
+			true,
+		},
+		{
+			"cluster type is empty",
+			args{
+				&KubeCluster{
+					ObjectMeta: metav1.ObjectMeta{Name: "test"},
+					Spec:       ClusterSpec{ClusterType:
""}, + }, + }, + true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if err := ValidateV1alphaCluster(tt.args.cluster); (err != nil) != tt.wantErr { + t.Errorf("ValidateV1alphaCluster() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } + Test_validateV1alphaClusterSpecs(t) +} + +func Test_validateV1alphaClusterSpecs(t *testing.T) { + type args struct { + spec ClusterSpec + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + "replica specs are empty", + args{ + ClusterSpec{ + ClusterType: "test", + ClusterReplicaSpec: make(map[ReplicaType]*ReplicaSpec), + }, + }, + true, + }, + { + "no containers", + args{ + ClusterSpec{ + ClusterType: "test", + ClusterReplicaSpec: map[ReplicaType]*ReplicaSpec{"test": { + Replicas: pointer.Int32(1), + Template: ReplicaTemplate{ + Spec: corev1.PodSpec{}, + }, + }}, + }, + }, + true, + }, + { + "main container name doesn't present", + args{ + ClusterSpec{ + ClusterType: "test", + ClusterReplicaSpec: map[ReplicaType]*ReplicaSpec{"test": { + Replicas: pointer.Int32(1), + Template: ReplicaTemplate{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Image: "centos", + }}, + }, + }, + }}, + }, + }, + true, + }, + { + "main container name not consistent", + args{ + ClusterSpec{ + ClusterType: "test", + MainContainer: "testmain", + ClusterReplicaSpec: map[ReplicaType]*ReplicaSpec{"test": { + Replicas: pointer.Int32(1), + Template: ReplicaTemplate{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: ClusterDefaultContainerName, + Image: "centos", + }}, + }, + }, + }}, + }, + }, + true, + }, + { + "image is empty", + args{ + ClusterSpec{ + ClusterType: "test", + ClusterReplicaSpec: map[ReplicaType]*ReplicaSpec{"test": { + Replicas: pointer.Int32(1), + Template: ReplicaTemplate{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: ClusterDefaultContainerName, + }}, + }, + }, + }}, + }, + }, + true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if err := validateV1alphaClusterSpecs(tt.args.spec); (err != nil) != tt.wantErr { + t.Errorf("validateV1alphaClusterSpecs() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/apis/kubecluster.org/v1alpha1/common_types.go b/apis/kubecluster.org/v1alpha1/common_types.go index ac36ccb..6e85709 100644 --- a/apis/kubecluster.org/v1alpha1/common_types.go +++ b/apis/kubecluster.org/v1alpha1/common_types.go @@ -72,7 +72,7 @@ type ReplicaSpec struct { // will be overide by RestartPolicy in ReplicaSpec Template ReplicaTemplate `json:"template"` - // Restart policy for all replicas within the job. + // Restart policy for all replicas within the cluster. // One of Always, OnFailure, Never and ExitCode. // Default to Never. 
RestartPolicy RestartPolicy `json:"restartPolicy,omitempty"` diff --git a/apis/kubecluster.org/v1alpha1/defauting_utils.go b/apis/kubecluster.org/v1alpha1/defauting_utils.go index 8771ed7..367410a 100644 --- a/apis/kubecluster.org/v1alpha1/defauting_utils.go +++ b/apis/kubecluster.org/v1alpha1/defauting_utils.go @@ -42,7 +42,7 @@ func SetDefaultRestartPolicy(replicaSpec *ReplicaSpec, defaultRestartPolicy Rest } } -func SetDefaultReplicas(replicaSpec *ReplicaSpec, replicas int32) { +func setDefaultReplicas(replicaSpec *ReplicaSpec, replicas int32) { if replicaSpec != nil && replicaSpec.Replicas == nil { replicaSpec.Replicas = pointer.Int32(replicas) } diff --git a/cmd/main.go b/cmd/main.go index 27b9367..3db8bde 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -67,7 +67,7 @@ func main() { flag.BoolVar(&enableLeaderElection, "leader-elect", false, "Enable leader election for controller manager. "+ "Enabling this will ensure there is only one active controller manager.") - flag.Var(&enabledSchemes, "enable-scheme", "Enable scheme(s) as --enable-scheme=tfjob --enable-scheme=pytorchjob, case insensitive."+ + flag.Var(&enabledSchemes, "enable-scheme", "Enable scheme(s) as --enable-scheme=tfcluster --enable-scheme=pytorchcluster, case insensitive."+ " Now supporting TFCluster, PyTorchCluster, MXNetCluster, XGBoostCluster, PaddleCluster. By default, all supported schemes will be enabled.") flag.IntVar(&controllerThreads, "controller-threads", 1, "Number of worker threads used by the controller.") flag.StringVar(&gangSchedulerName, "gang-scheduler-name", "", "Now Supporting volcano and scheduler-plugins."+ @@ -137,7 +137,7 @@ func setupController(mgr ctrl.Manager, enabledSchemes cluster_schema.EnabledSche reconciler := controller.NewReconciler(mgr, gangSchedulingSetupFunc) - // TODO: We need a general manager. all rest ctrlutil addsToManager + // TODO: We need a general manager. all rest util addsToManager // Based on the user configuration, we start different controllers if enabledSchemes.Empty() { enabledSchemes.FillAll() diff --git a/docs/api/kubecluster.org_v1alpha1_generated.asciidoc b/docs/api/kubecluster.org_v1alpha1_generated.asciidoc index 9d34e29..60dee54 100644 --- a/docs/api/kubecluster.org_v1alpha1_generated.asciidoc +++ b/docs/api/kubecluster.org_v1alpha1_generated.asciidoc @@ -82,7 +82,7 @@ ClusterSpec defines the desired state of KubeCluster | *`clusterType`* __ClusterType__ | ClusterType define the type of the cluster to be created | *`clusterReplicaSpec`* __object (keys:xref:{anchor_prefix}-github-com-chriskery-kubecluster-apis-kubecluster-org-v1alpha1-replicatype[$$ReplicaType$$], values:xref:{anchor_prefix}-github-com-chriskery-kubecluster-apis-kubecluster-org-v1alpha1-replicaspec[$$ReplicaSpec$$])__ | ClusterType define the template of the cluster to be created | *`mainContainer`* __string__ | MainContainer specifies name of the main container which run as kubenode. -| *`runPolicy`* __xref:{anchor_prefix}-github-com-chriskery-kubecluster-apis-kubecluster-org-v1alpha1-runpolicy[$$RunPolicy$$]__ | `RunPolicy` encapsulates various runtime policies of the distributed training job, for example how to clean up resources and how long the job can stay active. +| *`runPolicy`* __xref:{anchor_prefix}-github-com-chriskery-kubecluster-apis-kubecluster-org-v1alpha1-runpolicy[$$RunPolicy$$]__ | `RunPolicy` encapsulates various runtime policies of the distributed training cluster, for example how to clean up resources and how long the cluster can stay active. 
|=== @@ -103,7 +103,7 @@ ClusterStatus defines the observed state of KubeCluster | *`replicaStatuses`* __object (keys:xref:{anchor_prefix}-github-com-chriskery-kubecluster-apis-kubecluster-org-v1alpha1-replicatype[$$ReplicaType$$], values:xref:{anchor_prefix}-github-com-chriskery-kubecluster-apis-kubecluster-org-v1alpha1-replicastatus[$$ReplicaStatus$$])__ | ReplicaStatuses is map of ReplicaType and ReplicaStatus, specifies the status of each replica. | *`startTime`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#time-v1-meta[$$Time$$]__ | Represents time when the KubeCluster was acknowledged by the KubeCluster controller. It is not guaranteed to be set in happens-before order across separate operations. It is represented in RFC3339 form and is in UTC. | *`lastReconcileTime`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#time-v1-meta[$$Time$$]__ | Represents last time when the KubeCluster was reconciled. It is not guaranteed to be set in happens-before order across separate operations. It is represented in RFC3339 form and is in UTC. -| *`completionTime`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#time-v1-meta[$$Time$$]__ | Represents time when the job was completed. It is not guaranteed to be set in happens-before order across separate operations. It is represented in RFC3339 form and is in UTC. +| *`completionTime`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#time-v1-meta[$$Time$$]__ | Represents time when the cluster was completed. It is not guaranteed to be set in happens-before order across separate operations. It is represented in RFC3339 form and is in UTC. |=== @@ -164,7 +164,7 @@ ReplicaSpec is a description of the replica | Field | Description | *`replicas`* __integer__ | Replicas is the desired number of replicas of the given template. If unspecified, defaults to 1. | *`template`* __xref:{anchor_prefix}-github-com-chriskery-kubecluster-apis-kubecluster-org-v1alpha1-replicatemplate[$$ReplicaTemplate$$]__ | Template is the object that describes the pod that will be created for this replica. RestartPolicy in PodTemplateSpec will be overide by RestartPolicy in ReplicaSpec -| *`restartPolicy`* __xref:{anchor_prefix}-github-com-chriskery-kubecluster-apis-kubecluster-org-v1alpha1-restartpolicy[$$RestartPolicy$$]__ | Restart policy for all replicas within the job. One of Always, OnFailure, Never and ExitCode. Default to Never. +| *`restartPolicy`* __xref:{anchor_prefix}-github-com-chriskery-kubecluster-apis-kubecluster-org-v1alpha1-restartpolicy[$$RestartPolicy$$]__ | Restart policy for all replicas within the cluster. One of Always, OnFailure, Never and ExitCode. Default to Never. 
 |===
diff --git a/docs/quick-start.md b/docs/quick-start.md
index e3cf142..7e107bd 100644
--- a/docs/quick-start.md
+++ b/docs/quick-start.md
@@ -86,7 +86,7 @@ pbspro-centos-sample-cpu-0
      sharing = default_shared
      last_state_change_time = Thu Sep 28 07:05:43 2023
 ```
-Switch to the normal user and submit the job using [qsub](https://www.jlab.org/hpc/PBS/qsub.html)
+Switch to the normal user and submit a test job to the cluster using [qsub](https://www.jlab.org/hpc/PBS/qsub.html)
 ```shell
 [root@pbspro-centos-sample-server-0 /]# useradd pbsexample
 [root@pbspro-centos-sample-server-0 /]# su pbsexample
@@ -94,13 +94,13 @@ Switch to the normal user and submit the job using [qsub](https://www.jlab.org/h
 2.pbspro-centos-sample-server-0
 [pbsexample@pbspro-centos-sample-server-0 /]$
 ```
-Use [qstat](https://docs.adaptivecomputing.com/torque/4-0-2/Content/topics/commands/qstat.htm) to view the job we just submitted
+Use [qstat](https://docs.adaptivecomputing.com/torque/4-0-2/Content/topics/commands/qstat.htm) to view the job we just submitted to the cluster
 ```
 [pbsexample@pbspro-centos-sample-server-0 /]$ qstat -a
 
 pbspro-centos-sample-server-0:
                                                             Req'd  Req'd   Elap
 Job ID          Username Queue    Jobname    SessID NDS TSK Memory Time  S Time
 --------------- -------- -------- ---------- ------ --- --- ------ ----- - -----
 2.pbspro-centos pbsexamp workq    STDIN        1377   1   1    --    --  E 00:00
 ```
diff --git a/manifests/crd/bases/kubecluster.org_kubeclusters.yaml b/manifests/crd/bases/kubecluster.org_kubeclusters.yaml
index 89b6620..a11b736 100644
--- a/manifests/crd/bases/kubecluster.org_kubeclusters.yaml
+++ b/manifests/crd/bases/kubecluster.org_kubeclusters.yaml
@@ -54,7 +54,7 @@ spec:
                   format: int32
                   type: integer
                 restartPolicy:
-                  description: Restart policy for all replicas within the job.
+                  description: Restart policy for all replicas within the cluster.
                     One of Always, OnFailure, Never and ExitCode. Default to Never.
                   type: string
                 template:
@@ -8173,8 +8173,8 @@ spec:
                 type: string
               runPolicy:
                 description: '`RunPolicy` encapsulates various runtime policies of
-                  the distributed training job, for example how to clean up resources
-                  and how long the job can stay active.'
+                  the distributed training cluster, for example how to clean up resources
+                  and how long the cluster can stay active.'
                 properties:
                   CleanKubeNodePolicy:
                     description: CleanKubeNodePolicy defines the policy to kill pods
@@ -8235,9 +8235,9 @@ spec:
             description: ClusterStatus defines the observed state of KubeCluster
             properties:
               completionTime:
-                description: Represents time when the job was completed. It is not
-                  guaranteed to be set in happens-before order across separate operations.
-                  It is represented in RFC3339 form and is in UTC.
+                description: Represents time when the cluster was completed. It is
+                  not guaranteed to be set in happens-before order across separate
+                  operations. It is represented in RFC3339 form and is in UTC.
                 format: date-time
                 type: string
               conditions:
diff --git a/pkg/common/interface.go b/pkg/common/interface.go
index 9d1678a..00797de 100644
--- a/pkg/common/interface.go
+++ b/pkg/common/interface.go
@@ -23,15 +23,15 @@ type ControllerInterface interface {
 	GetClusterFromInformerCache(namespace, name string) (metav1.Object, error)
 
 	// GetPodsForCluster returns the pods managed by the cluster. This can be achieved by selecting pods using label key "kubecluster-name"
-	// i.e. all pods created by the job will come with label "kubecluster-name" =
+	// i.e.
all pods created by the cluster will come with label "kubecluster-name" = GetPodsForCluster(kcluster *kubeclusterorgv1alpha1.KubeCluster) ([]*v1.Pod, error) // GetConfigMapForCluster returns the pods managed by the cluster. This can be achieved by selecting pods using label key "kubecluster-name" - // i.e. all pods created by the job will come with label "kubecluster-name" = + // i.e. all pods created by the cluster will come with label "kubecluster-name" = GetConfigMapForCluster(kcluster *kubeclusterorgv1alpha1.KubeCluster) (*v1.ConfigMap, error) // GetServicesForCluster returns the services managed by the cluster. This can be achieved by selecting services using label key "kubecluster-name" - // i.e. all services created by the job will come with label "kubecluster-name" = + // i.e. all services created by the cluster will come with label "kubecluster-name" = GetServicesForCluster(kcluster *kubeclusterorgv1alpha1.KubeCluster) ([]*v1.Service, error) // DeleteCluster deletes the cluster @@ -40,18 +40,16 @@ type ControllerInterface interface { // UpdateClusterStatusInApiServer updates the cluster status in API server UpdateClusterStatusInApiServer(kcluster metav1.Object, clusterStatus *kubeclusterorgv1alpha1.ClusterStatus) error - // UpdateClusterStatus updates the job status and job conditions + // UpdateClusterStatus updates the cluster status and cluster conditions UpdateClusterStatus( kcluster *kubeclusterorgv1alpha1.KubeCluster, replicas map[kubeclusterorgv1alpha1.ReplicaType]*kubeclusterorgv1alpha1.ReplicaSpec, clusterStatus *kubeclusterorgv1alpha1.ClusterStatus, ) error - FilterServicesForReplicaType(services []*v1.Service, rt string) ([]*v1.Service, error) - GetServiceSlices(services []*v1.Service, replicas int, replica *log.Entry) [][]*v1.Service - // GetGroupNameLabelValue Returns the Group Name(value) in the labels of the job + // GetGroupNameLabelValue Returns the Group Name(value) in the labels of the cluster GetGroupNameLabelValue() string UpdateConfigMapInApiServer(metav1.Object, *v1.ConfigMap) error diff --git a/pkg/common/schema.go b/pkg/common/schema.go index c5af2a9..c08e116 100644 --- a/pkg/common/schema.go +++ b/pkg/common/schema.go @@ -35,7 +35,7 @@ type ClusterSchemaReconciler interface { index int, ) bool - // UpdateClusterStatus updates the job status and job conditions + // UpdateClusterStatus updates the cluster status and cluster conditions UpdateClusterStatus( kcluster *kubeclusterorgv1alpha1.KubeCluster, clusterStatus *kubeclusterorgv1alpha1.ClusterStatus, diff --git a/pkg/common/util.go b/pkg/common/util.go index ab4dd25..b9307fe 100644 --- a/pkg/common/util.go +++ b/pkg/common/util.go @@ -34,14 +34,14 @@ func ConvertServiceList(list []corev1.Service) []*corev1.Service { return ret } -// KubeClusterControlledPodList filter pod list owned by the job. -func KubeClusterControlledPodList(list []corev1.Pod, job metav1.Object) []*corev1.Pod { +// KubeClusterControlledPodList filter pod list owned by the cluster. 
+func KubeClusterControlledPodList(list []corev1.Pod, cluster metav1.Object) []*corev1.Pod { if list == nil { return nil } ret := make([]*corev1.Pod, 0, len(list)) for i := range list { - if !metav1.IsControlledBy(&list[i], job) { + if !metav1.IsControlledBy(&list[i], cluster) { continue } ret = append(ret, &list[i]) diff --git a/pkg/controller/ctrlutil/fake_workqueue.go b/pkg/common/util/fake_workqueue.go similarity index 99% rename from pkg/controller/ctrlutil/fake_workqueue.go rename to pkg/common/util/fake_workqueue.go index c61f41b..4428134 100644 --- a/pkg/controller/ctrlutil/fake_workqueue.go +++ b/pkg/common/util/fake_workqueue.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License -package ctrlutil +package util import "time" diff --git a/pkg/controller/ctrlutil/reconcile_generic.go b/pkg/common/util/reconcile_generic.go similarity index 87% rename from pkg/controller/ctrlutil/reconcile_generic.go rename to pkg/common/util/reconcile_generic.go index 7a8efa4..e9b8a17 100644 --- a/pkg/controller/ctrlutil/reconcile_generic.go +++ b/pkg/common/util/reconcile_generic.go @@ -12,12 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License -package ctrlutil +package util import ( "fmt" kubeclusterorgv1alpha1 "github.com/chriskery/kubecluster/apis/kubecluster.org/v1alpha1" - common2 "github.com/chriskery/kubecluster/pkg/common" + "github.com/chriskery/kubecluster/pkg/common" "github.com/chriskery/kubecluster/pkg/controller/cluster_schema" "github.com/chriskery/kubecluster/pkg/controller/ctrlcommon" "reflect" @@ -29,30 +29,30 @@ import ( "sigs.k8s.io/controller-runtime/pkg/event" ) -// GenExpectationGenericKey generates an expectation key for {Kind} of a job +// GenExpectationGenericKey generates an expectation key for {Kind} of a cluster func GenExpectationGenericKey(clusterKey string, replicaType string, pl string) string { return clusterKey + "/" + strings.ToLower(replicaType) + "/" + pl } // LoggerForGenericKind generates log entry for generic Kubernetes resource Kind func LoggerForGenericKind(obj metav1.Object, kind string) *log.Entry { - job := "" + cluster := "" if controllerRef := metav1.GetControllerOf(obj); controllerRef != nil { if controllerRef.Kind == kind { - job = obj.GetNamespace() + "." + controllerRef.Name + cluster = obj.GetNamespace() + "." + controllerRef.Name } } return log.WithFields(log.Fields{ - // We use job to match the key used in controller.go + // We use cluster to match the key used in controller.go // In controller.go we log the key used with the workqueue. - "job": job, - kind: obj.GetNamespace() + "." + obj.GetName(), - "uid": obj.GetUID(), + "cluster": cluster, + kind: obj.GetNamespace() + "." + obj.GetName(), + "uid": obj.GetUID(), }) } // OnDependentCreateFuncGeneric modify expectations when dependent (pod/service) creation observed. 
-func OnDependentCreateFuncGeneric(schemaReconcilers map[cluster_schema.ClusterSchema]common2.ClusterSchemaReconciler) func(event.CreateEvent) bool { +func OnDependentCreateFuncGeneric(schemaReconcilers map[cluster_schema.ClusterSchema]common.ClusterSchemaReconciler) func(event.CreateEvent) bool { return func(e event.CreateEvent) bool { clusterType := e.Object.GetLabels()[kubeclusterorgv1alpha1.ClusterTypeLabel] if len(clusterType) == 0 { @@ -101,7 +101,7 @@ func OnDependentUpdateFuncGeneric(jc *ctrlcommon.ClusterController) func(updateE if controllerRefChanged && oldControllerRef != nil { // The ControllerRef was changed. Sync the old controller, if any. - if job := resolveControllerRef(jc, oldObj.GetNamespace(), oldControllerRef); job != nil { + if cluster := resolveControllerRef(jc, oldObj.GetNamespace(), oldControllerRef); cluster != nil { logger.Infof("%s controller ref updated: %v, %v", kind, newObj, oldObj) return true } @@ -109,8 +109,8 @@ func OnDependentUpdateFuncGeneric(jc *ctrlcommon.ClusterController) func(updateE // If it has a controller ref, that's all that matters. if newControllerRef != nil { - job := resolveControllerRef(jc, newObj.GetNamespace(), newControllerRef) - if job == nil { + cluster := resolveControllerRef(jc, newObj.GetNamespace(), newControllerRef) + if cluster == nil { return false } logger.Debugf("%s has a controller ref: %v, %v", kind, newObj, oldObj) @@ -121,7 +121,7 @@ func OnDependentUpdateFuncGeneric(jc *ctrlcommon.ClusterController) func(updateE } // OnDependentDeleteFuncGeneric modify expectations when dependent (pod/service) deletion observed. -func OnDependentDeleteFuncGeneric(schemaReconcilers map[cluster_schema.ClusterSchema]common2.ClusterSchemaReconciler) func(event.DeleteEvent) bool { +func OnDependentDeleteFuncGeneric(schemaReconcilers map[cluster_schema.ClusterSchema]common.ClusterSchemaReconciler) func(event.DeleteEvent) bool { return func(e event.DeleteEvent) bool { clusterType := e.Object.GetLabels()[kubeclusterorgv1alpha1.ClusterTypeLabel] if len(clusterType) == 0 { diff --git a/pkg/controller/ctrlutil/reconciler.go b/pkg/common/util/reconciler.go similarity index 87% rename from pkg/controller/ctrlutil/reconciler.go rename to pkg/common/util/reconciler.go index 24811f3..be919f6 100644 --- a/pkg/controller/ctrlutil/reconciler.go +++ b/pkg/common/util/reconciler.go @@ -12,12 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License -package ctrlutil +package util import ( "fmt" kubeclusterorgv1alpha1 "github.com/chriskery/kubecluster/apis/kubecluster.org/v1alpha1" - common2 "github.com/chriskery/kubecluster/pkg/common" + "github.com/chriskery/kubecluster/pkg/common" "github.com/chriskery/kubecluster/pkg/controller/cluster_schema" "github.com/chriskery/kubecluster/pkg/controller/ctrlcommon" "github.com/chriskery/kubecluster/pkg/controller/expectation" @@ -29,7 +29,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/event" ) -// SatisfiedExpectations returns true if the required adds/dels for the given mxjob have been observed. +// SatisfiedExpectations returns true if the required adds/dels for the given mxcluster have been observed. // Add/del counts are established by the controller at sync time, and updated as controllees are observed by the controller // manager. 
func SatisfiedExpectations(exp expectation.ControllerExpectationsInterface, @@ -57,7 +57,7 @@ func SatisfiedExpectations(exp expectation.ControllerExpectationsInterface, } // OnDependentCreateFunc modify expectations when dependent (pod/service) creation observed. -func OnDependentCreateFunc(schemaReconcilers map[cluster_schema.ClusterSchema]common2.ClusterSchemaReconciler) func(event.CreateEvent) bool { +func OnDependentCreateFunc(schemaReconcilers map[cluster_schema.ClusterSchema]common.ClusterSchemaReconciler) func(event.CreateEvent) bool { return func(e event.CreateEvent) bool { clusterType := e.Object.GetLabels()[kubeclusterorgv1alpha1.ClusterTypeLabel] if len(clusterType) == 0 { @@ -67,7 +67,12 @@ func OnDependentCreateFunc(schemaReconcilers map[cluster_schema.ClusterSchema]co if !ok { return false } + return onDependentCreateFunc(exp)(e) + } +} +func onDependentCreateFunc(exp expectation.ControllerExpectationsInterface) func(event.CreateEvent) bool { + return func(e event.CreateEvent) bool { rtype := e.Object.GetLabels()[kubeclusterorgv1alpha1.ReplicaTypeLabel] if len(rtype) == 0 { return false @@ -122,7 +127,7 @@ func OnDependentUpdateFunc(cc *ctrlcommon.ClusterController) func(updateEvent ev if controllerRefChanged && oldControllerRef != nil { // The ControllerRef was changed. Sync the old controller, if any. - if job := resolveControllerRef(cc, oldObj.GetNamespace(), oldControllerRef); job != nil { + if cluster := resolveControllerRef(cc, oldObj.GetNamespace(), oldControllerRef); cluster != nil { logger.Infof("pod/service controller ref updated: %v, %v", newObj, oldObj) return true } @@ -130,8 +135,8 @@ func OnDependentUpdateFunc(cc *ctrlcommon.ClusterController) func(updateEvent ev // If it has a controller ref, that's all that matters. if newControllerRef != nil { - job := resolveControllerRef(cc, newObj.GetNamespace(), newControllerRef) - if job == nil { + cluster := resolveControllerRef(cc, newObj.GetNamespace(), newControllerRef) + if cluster == nil { return false } logger.Debugf("pod/service has a controller ref: %v, %v", newObj, oldObj) @@ -141,8 +146,8 @@ func OnDependentUpdateFunc(cc *ctrlcommon.ClusterController) func(updateEvent ev } } -// resolveControllerRef returns the job referenced by a ControllerRef, -// or nil if the ControllerRef could not be resolved to a matching job +// resolveControllerRef returns the cluster referenced by a ControllerRef, +// or nil if the ControllerRef could not be resolved to a matching cluster // of the correct Kind. func resolveControllerRef(cc *ctrlcommon.ClusterController, namespace string, controllerRef *metav1.OwnerReference) metav1.Object { // We can't look up by UID, so look up by Name and then verify UID. @@ -162,18 +167,8 @@ func resolveControllerRef(cc *ctrlcommon.ClusterController, namespace string, co return cluster } -// OnDependentDeleteFunc modify expectations when dependent (pod/service) deletion observed. 
-func OnDependentDeleteFunc(schemaReconcilers map[cluster_schema.ClusterSchema]common2.ClusterSchemaReconciler) func(event.DeleteEvent) bool { +func onDependentDeleteFunc(exp expectation.ControllerExpectationsInterface) func(event.DeleteEvent) bool { return func(e event.DeleteEvent) bool { - clusterType := e.Object.GetLabels()[kubeclusterorgv1alpha1.ClusterTypeLabel] - if len(clusterType) == 0 { - return false - } - exp, ok := schemaReconcilers[cluster_schema.ClusterSchema(clusterType)] - if !ok { - return false - } - rtype := e.Object.GetLabels()[kubeclusterorgv1alpha1.ReplicaTypeLabel] if len(rtype) == 0 { return false @@ -198,3 +193,19 @@ func OnDependentDeleteFunc(schemaReconcilers map[cluster_schema.ClusterSchema]co return true } } + +// OnDependentDeleteFunc modify expectations when dependent (pod/service) deletion observed. +func OnDependentDeleteFunc(schemaReconcilers map[cluster_schema.ClusterSchema]common.ClusterSchemaReconciler) func(event.DeleteEvent) bool { + return func(e event.DeleteEvent) bool { + clusterType := e.Object.GetLabels()[kubeclusterorgv1alpha1.ClusterTypeLabel] + if len(clusterType) == 0 { + return false + } + exp, ok := schemaReconcilers[cluster_schema.ClusterSchema(clusterType)] + if !ok { + return false + } + + return onDependentDeleteFunc(exp)(e) + } +} diff --git a/pkg/common/util/reconciler_test.go b/pkg/common/util/reconciler_test.go new file mode 100644 index 0000000..924b64f --- /dev/null +++ b/pkg/common/util/reconciler_test.go @@ -0,0 +1,59 @@ +package util + +import ( + "github.com/chriskery/kubecluster/apis/kubecluster.org/v1alpha1" + "github.com/chriskery/kubecluster/pkg/controller/expectation" + "testing" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" +) + +func TestOnDependentXXXFunc(t *testing.T) { + + createfunc := onDependentCreateFunc(expectation.NewControllerExpectations()) + deletefunc := onDependentDeleteFunc(expectation.NewControllerExpectations()) + + for _, testCase := range []struct { + object client.Object + expect bool + }{ + { + // pod object with label is allowed + object: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1alpha1.ReplicaTypeLabel: "Worker", + }, + }, + }, + expect: true, + }, + { + // service object without label is not allowed + object: &corev1.Service{}, + expect: false, + }, + { + // objects other than pod/service are not allowed + object: &corev1.ConfigMap{}, + expect: false, + }, + } { + ret := createfunc(event.CreateEvent{ + Object: testCase.object, + }) + if ret != testCase.expect { + t.Errorf("expect %t, but get %t", testCase.expect, ret) + } + ret = deletefunc(event.DeleteEvent{ + Object: testCase.object, + }) + if ret != testCase.expect { + t.Errorf("expect %t, but get %t", testCase.expect, ret) + } + + } +} diff --git a/pkg/controller/cluster_controller.go b/pkg/controller/cluster_controller.go index cea66e6..db81c6d 100644 --- a/pkg/controller/cluster_controller.go +++ b/pkg/controller/cluster_controller.go @@ -21,10 +21,10 @@ import ( "fmt" "github.com/chriskery/kubecluster/apis/kubecluster.org/v1alpha1" "github.com/chriskery/kubecluster/pkg/common" + util3 "github.com/chriskery/kubecluster/pkg/common/util" "github.com/chriskery/kubecluster/pkg/controller/cluster_schema" "github.com/chriskery/kubecluster/pkg/controller/control" "github.com/chriskery/kubecluster/pkg/controller/ctrlcommon" - "github.com/chriskery/kubecluster/pkg/controller/ctrlutil" 
"github.com/chriskery/kubecluster/pkg/core" "github.com/chriskery/kubecluster/pkg/util" utillabels "github.com/chriskery/kubecluster/pkg/util/labels" @@ -98,7 +98,7 @@ func NewReconciler(mgr manager.Manager, gangSchedulingSetupFunc ctrlcommon.GangS ServiceControl: control.RealServiceControl{KubeClient: kubeClientSet, Recorder: r.recorder}, ConfigMapControl: control.RealConfigMapControl{KubeClient: kubeClientSet, Recorder: r.recorder}, SchemaReconcilerManager: make(map[cluster_schema.ClusterSchema]common.ClusterSchemaReconciler), - WorkQueue: &ctrlutil.FakeWorkQueue{}, + WorkQueue: &util3.FakeWorkQueue{}, } gangSchedulingSetupFunc(&r.ClusterController) @@ -139,7 +139,7 @@ func (r *KubeClusterReconciler) UpdateClusterStatusInApiServer( } common.ClearGeneratedFields(&kcluster.ObjectMeta) - // Cluster status passed in differs with status in job, update in basis of the passed in one. + // Cluster status passed in differs with status in cluster, update in basis of the passed in one. if !equality.Semantic.DeepEqual(&kcluster.Status, clusterStatus) { kcluster = kcluster.DeepCopy() kcluster.Status = *clusterStatus.DeepCopy() @@ -175,9 +175,9 @@ func (r *KubeClusterReconciler) UpdateClusterStatus( if clusterStatus.StartTime == nil { now := metav1.Now() clusterStatus.StartTime = &now - // clusterStatus a sync to check if job past ActiveDeadlineSeconds + // clusterStatus a sync to check if cluster past ActiveDeadlineSeconds if kcluster.Spec.RunPolicy.ActiveDeadlineSeconds != nil { - logger.Infof("Job with ActiveDeadlineSeconds will sync after %d seconds", *kcluster.Spec.RunPolicy.ActiveDeadlineSeconds) + logger.Infof("cluster with ActiveDeadlineSeconds will sync after %d seconds", *kcluster.Spec.RunPolicy.ActiveDeadlineSeconds) r.WorkQueue.AddAfter(clusterKey, time.Duration(*kcluster.Spec.RunPolicy.ActiveDeadlineSeconds)*time.Second) } } @@ -274,7 +274,7 @@ func (r *KubeClusterReconciler) needReconcile(kcluster *v1alpha1.KubeCluster, sc utilruntime.HandleError(fmt.Errorf("couldn't get clusterKey for kubecluster object %#v: %v", kcluster, err)) } replicaTypes := common.GetReplicaTypes(kcluster.Spec.ClusterReplicaSpec) - needReconcile := !ctrlutil.SatisfiedExpectations(schemaReconciler, clusterKey, replicaTypes) + needReconcile := !util3.SatisfiedExpectations(schemaReconciler, clusterKey, replicaTypes) return needReconcile } @@ -298,28 +298,28 @@ func (r *KubeClusterReconciler) SetupWithManager(mgr ctrl.Manager, controllerThr // eventHandler for owned objects eventHandler := handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &v1alpha1.KubeCluster{}, handler.OnlyControllerOwner()) predicates := predicate.Funcs{ - CreateFunc: ctrlutil.OnDependentCreateFunc(r.SchemaReconcilerManager), - UpdateFunc: ctrlutil.OnDependentUpdateFunc(&r.ClusterController), - DeleteFunc: ctrlutil.OnDependentDeleteFunc(r.SchemaReconcilerManager), + CreateFunc: util3.OnDependentCreateFunc(r.SchemaReconcilerManager), + UpdateFunc: util3.OnDependentUpdateFunc(&r.ClusterController), + DeleteFunc: util3.OnDependentDeleteFunc(r.SchemaReconcilerManager), } // Create generic predicates genericPredicates := predicate.Funcs{ - CreateFunc: ctrlutil.OnDependentCreateFuncGeneric(r.SchemaReconcilerManager), - UpdateFunc: ctrlutil.OnDependentUpdateFuncGeneric(&r.ClusterController), - DeleteFunc: ctrlutil.OnDependentDeleteFuncGeneric(r.SchemaReconcilerManager), + CreateFunc: util3.OnDependentCreateFuncGeneric(r.SchemaReconcilerManager), + UpdateFunc: util3.OnDependentUpdateFuncGeneric(&r.ClusterController), + DeleteFunc: 
util3.OnDependentDeleteFuncGeneric(r.SchemaReconcilerManager), } - // inject watching for job related pod + // inject watching for cluster related pod if err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}), eventHandler, predicates); err != nil { return err } - // inject watching for job related service + // inject watching for cluster related service if err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Service{}), eventHandler, predicates); err != nil { return err } // skip watching volcano PodGroup if volcano PodGroup is not installed if _, err = mgr.GetRESTMapper().RESTMapping(schema.GroupKind{Group: v1beta1.GroupName, Kind: "PodGroup"}, v1beta1.SchemeGroupVersion.Version); err == nil { - // inject watching for job related volcano PodGroup + // inject watching for cluster related volcano PodGroup if err = c.Watch(source.Kind(mgr.GetCache(), &v1beta1.PodGroup{}), eventHandler, genericPredicates); err != nil { return err } @@ -327,7 +327,7 @@ func (r *KubeClusterReconciler) SetupWithManager(mgr ctrl.Manager, controllerThr // skip watching scheduler-plugins PodGroup if scheduler-plugins PodGroup is not installed if _, err = mgr.GetRESTMapper().RESTMapping(schema.GroupKind{Group: schedulerpluginsv1alpha1.SchemeGroupVersion.Group, Kind: "PodGroup"}, schedulerpluginsv1alpha1.SchemeGroupVersion.Version); err == nil { - // inject watching for job related scheduler-plugins PodGroup + // inject watching for cluster related scheduler-plugins PodGroup if err = c.Watch(source.Kind(mgr.GetCache(), &schedulerpluginsv1alpha1.PodGroup{}), eventHandler, genericPredicates); err != nil { return err } @@ -361,7 +361,7 @@ func (r *KubeClusterReconciler) GetAPIGroupVersion() schema.GroupVersion { return v1alpha1.GroupVersion } -// GetPodsForCluster returns the set of pods that this job should manage. +// GetPodsForCluster returns the set of pods that this cluster should manage. // It also reconciles ControllerRef by adopting/orphaning. // Note that the returned Pods are pointers into the cache. 
func (r *KubeClusterReconciler) GetPodsForCluster(kcluster *v1alpha1.KubeCluster) ([]*corev1.Pod, error) { @@ -414,11 +414,11 @@ func (r *KubeClusterReconciler) DeleteCluster(metaObject metav1.Object) error { } if err := r.Delete(context.Background(), kubecluster); err != nil { r.recorder.Eventf(kubecluster, corev1.EventTypeWarning, control.FailedDeletePodReason, "Error deleting: %v", err) - logrus.Error(err, "failed to delete job", "namespace", kubecluster.Namespace, "name", kubecluster.Name) + logrus.Error(err, "failed to delete cluster", "namespace", kubecluster.Namespace, "name", kubecluster.Name) return err } - r.recorder.Eventf(kubecluster, corev1.EventTypeNormal, control.SuccessfulDeletePodReason, "Deleted job: %v", kubecluster.Name) - logrus.Info("job deleted", "namespace", kubecluster.Namespace, "name", kubecluster.Name) + r.recorder.Eventf(kubecluster, corev1.EventTypeNormal, control.SuccessfulDeletePodReason, "Deleted cluster: %v", kubecluster.Name) + logrus.Info("cluster deleted", "namespace", kubecluster.Namespace, "name", kubecluster.Name) commonmetrics.DeletedclustersCounterInc(kubecluster.Namespace, kubecluster.Spec.ClusterType) return nil } diff --git a/pkg/controller/cluster_controller_test.go b/pkg/controller/cluster_controller_test.go new file mode 100644 index 0000000..6f14292 --- /dev/null +++ b/pkg/controller/cluster_controller_test.go @@ -0,0 +1,418 @@ +// Copyright 2021 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package controller + +//var _ = Describe("KubeCluster controller", func() { +// +// Context("When creating the Slurm Schema Cluster", func() { +// const name = "test-cluster" +// var ( +// ns *corev1.Namespace +// cluster *v1alpha1.KubeCluster +// clusterKey types.NamespacedName +// masterKey types.NamespacedName +// worker0Key types.NamespacedName +// ctx = context.Background() +// ) +// BeforeEach(func() { +// ns = &corev1.Namespace{ +// ObjectMeta: metav1.ObjectMeta{ +// GenerateName: "cluster-test-", +// }, +// } +// Expect(k8sClient.Create(ctx, ns)).Should(Succeed()) +// +// cluster = newClusterForTest(name, ns.Name) +// clusterKey = client.ObjectKeyFromObject(cluster) +// masterKey = types.NamespacedName{ +// Name: fmt.Sprintf("%s-master-0", name), +// Namespace: ns.Name, +// } +// worker0Key = types.NamespacedName{ +// Name: fmt.Sprintf("%s-worker-0", name), +// Namespace: ns.Name, +// } +// cluster.Spec.ClusterReplicaSpec = map[v1alpha1.ReplicaType]*v1alpha1.ReplicaSpec{ +// slurm_schema.SchemaReplicaTypeController: { +// Replicas: pointer.Int32(1), +// Template: v1alpha1.ReplicaTemplate{ +// Spec: corev1.PodSpec{ +// Containers: []corev1.Container{ +// { +// Image: "test-image", +// Name: v1alpha1.ClusterDefaultContainerName, +// }, +// }, +// }, +// }, +// }, +// "worker": { +// Replicas: pointer.Int32(2), +// Template: v1alpha1.ReplicaTemplate{ +// Spec: corev1.PodSpec{ +// Containers: []corev1.Container{ +// { +// Image: "test-image", +// Name: v1alpha1.ClusterDefaultContainerName, +// }, +// }, +// }, +// }, +// }, +// } +// }) +// AfterEach(func() { +// Expect(k8sClient.Delete(ctx, cluster)).Should(Succeed()) +// Expect(k8sClient.Delete(ctx, ns)).Should(Succeed()) +// }) +// It("Should get the corresponding resources successfully", func() { +// By("By creating a new Cluster") +// Expect(k8sClient.Create(ctx, cluster)).Should(Succeed()) +// +// created := &v1alpha1.KubeCluster{} +// +// // We'll need to retry getting this newly created Cluster, given that creation may not immediately happen. +// Eventually(func() bool { +// err := k8sClient.Get(ctx, clusterKey, created) +// return err == nil +// }, testutil.Timeout, testutil.Interval).Should(BeTrue()) +// +// masterPod := &corev1.Pod{} +// Eventually(func() bool { +// err := k8sClient.Get(ctx, masterKey, masterPod) +// return err == nil +// }, testutil.Timeout, testutil.Interval).Should(BeTrue()) +// +// masterSvc := &corev1.Service{} +// Eventually(func() bool { +// err := k8sClient.Get(ctx, masterKey, masterSvc) +// return err == nil +// }, testutil.Timeout, testutil.Interval).Should(BeTrue()) +// +// // Check the pod port. +// Expect(masterPod.Spec.Containers[0].Ports).To(ContainElement(corev1.ContainerPort{ +// Name: slurm_schema.SlurmdPortName, +// Protocol: corev1.ProtocolTCP})) +// Expect(masterPod.Spec.Containers[0].Ports).To(ContainElement(corev1.ContainerPort{ +// Name: slurm_schema.SlurmctlPortName, +// Protocol: corev1.ProtocolTCP})) +// +// // Check service port. +// Expect(func() bool { return len(masterSvc.Spec.Ports) >= 2 }).To(Succeed()) +// // Check owner reference. 
+// trueVal := true +// Expect(masterPod.OwnerReferences).To(ContainElement(metav1.OwnerReference{ +// APIVersion: v1alpha1.SchemeGroupVersion.String(), +// Kind: v1alpha1.KubeClusterKind, +// Name: name, +// UID: created.UID, +// Controller: &trueVal, +// BlockOwnerDeletion: &trueVal, +// })) +// Expect(masterSvc.OwnerReferences).To(ContainElement(metav1.OwnerReference{ +// APIVersion: v1alpha1.SchemeGroupVersion.String(), +// Kind: v1alpha1.KubeClusterKind, +// Name: name, +// UID: created.UID, +// Controller: &trueVal, +// BlockOwnerDeletion: &trueVal, +// })) +// +// // Test cluster status. +// Eventually(func() error { +// Expect(k8sClient.Get(ctx, masterKey, masterPod)).Should(Succeed()) +// masterPod.Status.Phase = corev1.PodRunning +// return k8sClient.Status().Update(ctx, masterPod) +// }, testutil.Timeout, testutil.Interval).Should(Succeed()) +// Eventually(func() bool { +// err := k8sClient.Get(ctx, clusterKey, created) +// if err != nil { +// return false +// } +// return created.Status.ReplicaStatuses != nil && created.Status. +// ReplicaStatuses[slurm_schema.SchemaReplicaTypeController].Active == 1 +// }, testutil.Timeout, testutil.Interval).Should(BeTrue()) +// // Check if the cluster is succeeded. +// cond := getCondition(created.Status, v1alpha1.ClusterRunning) +// Expect(cond.Status).To(Equal(corev1.ConditionTrue)) +// }) +// +// It("Shouldn't create resources if Cluster is suspended", func() { +// By("By creating a new Cluster with suspend=true") +// cluster.Spec.RunPolicy.Suspend = pointer.Bool(true) +// cluster.Spec.ClusterReplicaSpec["worker"].Replicas = pointer.Int32(1) +// Expect(k8sClient.Create(ctx, cluster)).Should(Succeed()) +// +// created := &v1alpha1.KubeCluster{} +// masterPod := &corev1.Pod{} +// workerPod := &corev1.Pod{} +// masterSvc := &corev1.Service{} +// workerSvc := &corev1.Service{} +// +// By("Checking created Cluster") +// Eventually(func() bool { +// err := k8sClient.Get(ctx, clusterKey, created) +// return err == nil +// }, testutil.Timeout, testutil.Interval).Should(BeTrue()) +// By("Checking created Cluster has a nil startTime") +// Consistently(func() *metav1.Time { +// Expect(k8sClient.Get(ctx, clusterKey, created)).Should(Succeed()) +// return created.Status.StartTime +// }, testutil.ConsistentDuration, testutil.Interval).Should(BeNil()) +// +// By("Checking if the pods and services aren't created") +// Consistently(func() bool { +// errMasterPod := k8sClient.Get(ctx, masterKey, masterPod) +// errWorkerPod := k8sClient.Get(ctx, worker0Key, workerPod) +// errMasterSvc := k8sClient.Get(ctx, masterKey, masterSvc) +// errWorkerSvc := k8sClient.Get(ctx, worker0Key, workerSvc) +// return errors.IsNotFound(errMasterPod) && errors.IsNotFound(errWorkerPod) && +// errors.IsNotFound(errMasterSvc) && errors.IsNotFound(errWorkerSvc) +// }, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue()) +// +// By("Checking if the Cluster has suspended condition") +// Eventually(func() []v1alpha1.ClusterCondition { +// Expect(k8sClient.Get(ctx, clusterKey, created)).Should(Succeed()) +// return created.Status.Conditions +// }, testutil.ConsistentDuration, testutil.Interval).Should(BeComparableTo([]v1alpha1.ClusterCondition{ +// { +// Type: v1alpha1.ClusterCreated, +// Status: corev1.ConditionTrue, +// Reason: util.NewReason(v1alpha1.KubeClusterKind, util.ClusterCreatedReason), +// Message: fmt.Sprintf("Cluster %s is created.", name), +// }, +// { +// Type: v1alpha1.ClusterSuspended, +// Status: corev1.ConditionTrue, +// Reason: 
util.NewReason(v1alpha1.KubeClusterKind, util.ClusterSuspendedReason), +// Message: fmt.Sprintf("Cluster %s is suspended.", name), +// }, +// }, testutil.IgnoreClusterConditionsTimes)) +// }) +// +// It("Should delete resources after Cluster is suspended; Should resume Cluster after Cluster is unsuspended", func() { +// By("By creating a new Cluster") +// cluster.Spec.ClusterReplicaSpec["worker"].Replicas = pointer.Int32(1) +// Expect(k8sClient.Create(ctx, cluster)).Should(Succeed()) +// +// created := &v1alpha1.KubeCluster{} +// masterPod := &corev1.Pod{} +// workerPod := &corev1.Pod{} +// masterSvc := &corev1.Service{} +// workerSvc := &corev1.Service{} +// +// // We'll need to retry getting this newly created Cluster, given that creation may not immediately happen. +// By("Checking created Cluster") +// Eventually(func() bool { +// err := k8sClient.Get(ctx, clusterKey, created) +// return err == nil +// }, testutil.Timeout, testutil.Interval).Should(BeTrue()) +// +// var startTimeBeforeSuspended *metav1.Time +// Eventually(func() *metav1.Time { +// Expect(k8sClient.Get(ctx, clusterKey, created)).Should(Succeed()) +// startTimeBeforeSuspended = created.Status.StartTime +// return startTimeBeforeSuspended +// }, testutil.Timeout, testutil.Interval).ShouldNot(BeNil()) +// +// By("Checking the created pods and services") +// Eventually(func() bool { +// errMaster := k8sClient.Get(ctx, masterKey, masterPod) +// errWorker := k8sClient.Get(ctx, worker0Key, workerPod) +// return errMaster == nil && errWorker == nil +// }, testutil.Timeout, testutil.Interval).Should(BeTrue()) +// Eventually(func() bool { +// errMaster := k8sClient.Get(ctx, masterKey, masterSvc) +// errWorker := k8sClient.Get(ctx, worker0Key, workerSvc) +// return errMaster == nil && errWorker == nil +// }, testutil.Timeout, testutil.Interval).Should(BeTrue()) +// +// By("Updating the pod's phase with Running") +// Eventually(func() error { +// Expect(k8sClient.Get(ctx, masterKey, masterPod)).Should(Succeed()) +// masterPod.Status.Phase = corev1.PodRunning +// return k8sClient.Status().Update(ctx, masterPod) +// }, testutil.Timeout, testutil.Interval).Should(Succeed()) +// Eventually(func() error { +// Expect(k8sClient.Get(ctx, worker0Key, workerPod)).Should(Succeed()) +// workerPod.Status.Phase = corev1.PodRunning +// return k8sClient.Status().Update(ctx, workerPod) +// }, testutil.Timeout, testutil.Interval).Should(Succeed()) +// +// By("Checking the Cluster's condition") +// Eventually(func() []v1alpha1.ClusterCondition { +// Expect(k8sClient.Get(ctx, clusterKey, created)).Should(Succeed()) +// return created.Status.Conditions +// }, testutil.Timeout, testutil.Interval).Should(BeComparableTo([]v1alpha1.ClusterCondition{ +// { +// Type: v1alpha1.ClusterCreated, +// Status: corev1.ConditionTrue, +// Reason: util.NewReason(v1alpha1.KubeClusterKind, util.ClusterCreatedReason), +// Message: fmt.Sprintf("Cluster %s is created.", name), +// }, +// { +// Type: v1alpha1.ClusterRunning, +// Status: corev1.ConditionTrue, +// Reason: util.NewReason(v1alpha1.KubeClusterKind, util.ClusterRunningReason), +// Message: fmt.Sprintf("Cluster %s is running.", name), +// }, +// }, testutil.IgnoreClusterConditionsTimes)) +// +// By("Updating the Cluster with suspend=true") +// Eventually(func() error { +// Expect(k8sClient.Get(ctx, clusterKey, created)).Should(Succeed()) +// created.Spec.RunPolicy.Suspend = pointer.Bool(true) +// return k8sClient.Update(ctx, created) +// }, testutil.Timeout, testutil.Interval).Should(Succeed()) +// +// By("Checking 
if the pods and services are removed") +// Eventually(func() bool { +// errMaster := k8sClient.Get(ctx, masterKey, masterPod) +// errWorker := k8sClient.Get(ctx, worker0Key, workerPod) +// return errors.IsNotFound(errMaster) && errors.IsNotFound(errWorker) +// }, testutil.Timeout, testutil.Interval).Should(BeTrue()) +// Eventually(func() bool { +// errMaster := k8sClient.Get(ctx, masterKey, masterSvc) +// errWorker := k8sClient.Get(ctx, worker0Key, workerSvc) +// return errors.IsNotFound(errMaster) && errors.IsNotFound(errWorker) +// }, testutil.Timeout, testutil.Interval).Should(BeTrue()) +// Consistently(func() bool { +// errMasterPod := k8sClient.Get(ctx, masterKey, masterPod) +// errWorkerPod := k8sClient.Get(ctx, worker0Key, workerPod) +// errMasterSvc := k8sClient.Get(ctx, masterKey, masterSvc) +// errWorkerSvc := k8sClient.Get(ctx, worker0Key, workerSvc) +// return errors.IsNotFound(errMasterPod) && errors.IsNotFound(errWorkerPod) && +// errors.IsNotFound(errMasterSvc) && errors.IsNotFound(errWorkerSvc) +// }, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue()) +// +// By("Checking if the Cluster has a suspended condition") +// Eventually(func() bool { +// Expect(k8sClient.Get(ctx, clusterKey, created)).Should(Succeed()) +// return created.Status.ReplicaStatuses["worker"].Active == 0 && +// created.Status.StartTime.Equal(startTimeBeforeSuspended) +// }, testutil.Timeout, testutil.Interval).Should(BeTrue()) +// Consistently(func() bool { +// Expect(k8sClient.Get(ctx, clusterKey, created)).Should(Succeed()) +// return created.Status.ReplicaStatuses[slurm_schema.SchemaReplicaTypeController].Active == 0 && +// created.Status.StartTime.Equal(startTimeBeforeSuspended) +// }, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue()) +// Expect(created.Status.Conditions).Should(BeComparableTo([]v1alpha1.ClusterCondition{ +// { +// Type: v1alpha1.ClusterCreated, +// Status: corev1.ConditionTrue, +// Reason: util.NewReason(v1alpha1.KubeClusterKind, util.ClusterCreatedReason), +// Message: fmt.Sprintf("Cluster %s is created.", name), +// }, +// { +// Type: v1alpha1.ClusterRunning, +// Status: corev1.ConditionFalse, +// Reason: util.NewReason(v1alpha1.KubeClusterKind, util.ClusterSuspendedReason), +// Message: fmt.Sprintf("Cluster %s is suspended.", name), +// }, +// { +// Type: v1alpha1.ClusterSuspended, +// Reason: util.NewReason(v1alpha1.KubeClusterKind, util.ClusterSuspendedReason), +// Message: fmt.Sprintf("Cluster %s is suspended.", name), +// Status: corev1.ConditionTrue, +// }, +// }, testutil.IgnoreClusterConditionsTimes)) +// +// By("Unsuspending the Cluster") +// Eventually(func() error { +// Expect(k8sClient.Get(ctx, clusterKey, created)).Should(Succeed()) +// created.Spec.RunPolicy.Suspend = pointer.Bool(false) +// return k8sClient.Update(ctx, created) +// }, testutil.Timeout, testutil.Interval).Should(Succeed()) +// Eventually(func() *metav1.Time { +// Expect(k8sClient.Get(ctx, clusterKey, created)).Should(Succeed()) +// return created.Status.StartTime +// }, testutil.Timeout, testutil.Interval).ShouldNot(BeNil()) +// +// By("Check if the pods and services are created") +// Eventually(func() error { +// return k8sClient.Get(ctx, masterKey, masterPod) +// }, testutil.Timeout, testutil.Interval).Should(BeNil()) +// Eventually(func() error { +// return k8sClient.Get(ctx, worker0Key, workerPod) +// }, testutil.Timeout, testutil.Interval).Should(BeNil()) +// Eventually(func() error { +// return k8sClient.Get(ctx, masterKey, masterSvc) +// }, testutil.Timeout, 
testutil.Interval).Should(BeNil()) +// Eventually(func() error { +// return k8sClient.Get(ctx, worker0Key, workerSvc) +// }, testutil.Timeout, testutil.Interval).Should(BeNil()) +// +// By("Updating Pod's condition with running") +// Eventually(func() error { +// Expect(k8sClient.Get(ctx, masterKey, masterPod)).Should(Succeed()) +// masterPod.Status.Phase = corev1.PodRunning +// return k8sClient.Status().Update(ctx, masterPod) +// }, testutil.Timeout, testutil.Interval).Should(Succeed()) +// Eventually(func() error { +// Expect(k8sClient.Get(ctx, worker0Key, workerPod)).Should(Succeed()) +// workerPod.Status.Phase = corev1.PodRunning +// return k8sClient.Status().Update(ctx, workerPod) +// }, testutil.Timeout, testutil.Interval).Should(Succeed()) +// +// By("Checking if the Cluster has resumed conditions") +// Eventually(func() []v1alpha1.ClusterCondition { +// Expect(k8sClient.Get(ctx, clusterKey, created)).Should(Succeed()) +// return created.Status.Conditions +// }, testutil.Timeout, testutil.Interval).Should(BeComparableTo([]v1alpha1.ClusterCondition{ +// { +// Type: v1alpha1.ClusterCreated, +// Status: corev1.ConditionTrue, +// Reason: util.NewReason(v1alpha1.KubeClusterKind, util.ClusterCreatedReason), +// Message: fmt.Sprintf("Cluster %s is created.", name), +// }, +// { +// Type: v1alpha1.ClusterSuspended, +// Reason: util.NewReason(v1alpha1.KubeClusterKind, util.ClusterResumedReason), +// Message: fmt.Sprintf("Cluster %s is resumed.", name), +// Status: corev1.ConditionFalse, +// }, +// { +// Type: v1alpha1.ClusterRunning, +// Status: corev1.ConditionTrue, +// Reason: util.NewReason(v1alpha1.KubeClusterKind, util.ClusterRunningReason), +// Message: fmt.Sprintf("Cluster %s is running.", name), +// }, +// }, testutil.IgnoreClusterConditionsTimes)) +// +// By("Checking if the startTime is updated") +// Expect(created.Status.StartTime).ShouldNot(Equal(startTimeBeforeSuspended)) +// }) +// }) +// +//}) +// +//func newClusterForTest(name, namespace string) *v1alpha1.KubeCluster { +// return &v1alpha1.KubeCluster{ +// ObjectMeta: metav1.ObjectMeta{ +// Name: name, +// Namespace: namespace, +// }, +// } +//} +// +//// getCondition returns the condition with the provided type. +//func getCondition(status v1alpha1.ClusterStatus, condType v1alpha1.ClusterConditionType) *v1alpha1.ClusterCondition { +// for _, condition := range status.Conditions { +// if condition.Type == condType { +// return &condition +// } +// } +// return nil +//} diff --git a/pkg/controller/cluster_schema/pbspro_schema/initcontainer_test.go b/pkg/controller/cluster_schema/pbspro_schema/initcontainer_test.go new file mode 100644 index 0000000..7530f7d --- /dev/null +++ b/pkg/controller/cluster_schema/pbspro_schema/initcontainer_test.go @@ -0,0 +1,145 @@ +// Copyright 2021 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License + +package pbspro_schema + +import ( + "github.com/chriskery/kubecluster/apis/kubecluster.org/v1alpha1" + "github.com/chriskery/kubecluster/pkg/common" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "strconv" + "testing" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + "k8s.io/utils/pointer" +) + +func TestInitContainer(t *testing.T) { + gomega.RegisterFailHandler(ginkgo.Fail) + config.pbsproSchemaInitContainerImage = pbsproSchemaInitContainerImageDefault + config.pbsproSchemaInitContainerTemplateFile = pbsproSchemaInitContainerTemplateFileDefault + config.pbsproSchemaInitContainerMaxTries = pbsproSchemaInitContainerMaxTriesDefault + + var replicaTypeWorker v1alpha1.ReplicaType = "Worker" + + testCases := []struct { + kubecluster *v1alpha1.KubeCluster + rtype v1alpha1.ReplicaType + index string + expected int + exepctedErr error + }{ + { + kubecluster: &v1alpha1.KubeCluster{ + Spec: v1alpha1.ClusterSpec{ + ClusterReplicaSpec: map[v1alpha1.ReplicaType]*v1alpha1.ReplicaSpec{ + SchemaReplicaTypeServer: { + Replicas: pointer.Int32(1), + }, + }, + }, + }, + rtype: SchemaReplicaTypeServer, + index: "0", + expected: 1, + exepctedErr: nil, + }, + { + kubecluster: &v1alpha1.KubeCluster{ + Spec: v1alpha1.ClusterSpec{ + ClusterReplicaSpec: map[v1alpha1.ReplicaType]*v1alpha1.ReplicaSpec{ + replicaTypeWorker: { + Replicas: pointer.Int32(1), + }, + SchemaReplicaTypeServer: { + Replicas: pointer.Int32(1), + }, + }, + }, + }, + rtype: replicaTypeWorker, + index: "0", + expected: 1, + exepctedErr: nil, + }, + { + kubecluster: &v1alpha1.KubeCluster{ + Spec: v1alpha1.ClusterSpec{ + ClusterReplicaSpec: map[v1alpha1.ReplicaType]*v1alpha1.ReplicaSpec{ + replicaTypeWorker: { + Replicas: pointer.Int32(1), + }, + SchemaReplicaTypeServer: { + Replicas: pointer.Int32(1), + }, + }, + }, + }, + rtype: SchemaReplicaTypeServer, + index: "0", + expected: 1, + exepctedErr: nil, + }, + } + + for _, t := range testCases { + podTemplateSpec := t.kubecluster.Spec.ClusterReplicaSpec[t.rtype].Template + err := setInitContainer(t.kubecluster, (*corev1.PodTemplateSpec)(&podTemplateSpec), t.rtype) + if t.exepctedErr == nil { + gomega.Expect(err).To(gomega.BeNil()) + } else { + gomega.Expect(err).To(gomega.Equal(t.exepctedErr)) + } + gomega.Expect(len(podTemplateSpec.Spec.InitContainers) >= t.expected).To(gomega.BeTrue()) + } +} + +func TestGetInitContainer(t *testing.T) { + gomega.RegisterFailHandler(ginkgo.Fail) + + g := getInitContainerGenerator() + testCases := []struct { + kubecluster *v1alpha1.KubeCluster + expected int + exepctedErr error + }{ + { + kubecluster: &v1alpha1.KubeCluster{ + ObjectMeta: metav1.ObjectMeta{Name: "test"}, + Spec: v1alpha1.ClusterSpec{ + ClusterReplicaSpec: map[v1alpha1.ReplicaType]*v1alpha1.ReplicaSpec{ + SchemaReplicaTypeServer: { + Replicas: pointer.Int32(1), + }, + }, + }, + }, + expected: 1, + exepctedErr: nil, + }, + } + + for _, t := range testCases { + controllerAddress := common.GenGeneralName(t.kubecluster.Name, SchemaReplicaTypeServer, strconv.Itoa(0)) + initContainers, err := g.GetInitContainer(controllerAddress) + if t.exepctedErr == nil { + gomega.Expect(err).To(gomega.BeNil()) + } else { + gomega.Expect(err).To(gomega.Equal(t.exepctedErr)) + } + gomega.Expect(len(initContainers) >= t.expected).To(gomega.BeTrue()) + } +} diff --git a/pkg/controller/cluster_schema/pbspro_schema/pbspro_schema.go 
b/pkg/controller/cluster_schema/pbspro_schema/pbspro_schema.go index 9206810..f3a1907 100644 --- a/pkg/controller/cluster_schema/pbspro_schema/pbspro_schema.go +++ b/pkg/controller/cluster_schema/pbspro_schema/pbspro_schema.go @@ -80,7 +80,7 @@ func (p *pbsproClusterSchemaReconciler) UpdateClusterStatus( status := clusterStatus.ReplicaStatuses[rtype] // Generate the label selector. - //status.Selector = metav1.FormatLabelSelector(r.GenLabelSelector(pytorchjob.Name, rtype)) + //status.Selector = metav1.FormatLabelSelector(r.GenLabelSelector(pytorchcluster.Name, rtype)) running := status.Active failed := status.Failed @@ -99,8 +99,8 @@ func (p *pbsproClusterSchemaReconciler) UpdateClusterStatus( } if failed > 0 { - // For the situation that jobStatus has a restarting condition, and append a running condition, - // the restarting condition will be removed from jobStatus by kubeflowv1.filterOutCondition(), + // For the situation that clusterStatus has a restarting condition, and append a running condition, + // the restarting condition will be removed from clusterStatus by filterOutCondition(), // so we need to record the existing restarting condition for later use. var existingRestartingCondition *kubeclusterorgv1alpha1.ClusterCondition for _, condition := range clusterStatus.Conditions { @@ -112,12 +112,12 @@ func (p *pbsproClusterSchemaReconciler) UpdateClusterStatus( } } - // For the situation that jobStatus has a restarting condition, and appends a new running condition, - // the restarting condition will be removed from jobStatus by kubeflowv1.filterOutCondition(), - // so we need to append the restarting condition back to jobStatus. + // For the situation that clusterStatus has a restarting condition, and appends a new running condition, + // the restarting condition will be removed from clusterStatus by filterOutCondition(), + // so we need to append the restarting condition back to clusterStatus. 
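+		// Illustrative note (not in the original change): with Conditions = [Created=True, Restarting=True],
+		// appending a Running condition drops the Restarting entry during filtering, so the copy saved in
+		// existingRestartingCondition is the only record left to re-append below.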
if existingRestartingCondition != nil { util.UpdateClusterConditions(clusterStatus, kubeclusterorgv1alpha1.ClusterRestarting, corev1.ConditionTrue, existingRestartingCondition.Reason, existingRestartingCondition.Message) - // job is restarting, no need to set it failed + // cluster is restarting, no need to set it failed // we know it because we update the status condition when reconciling the replicas common.RestartedClustersCounterInc(kcluster.GetNamespace(), kcluster.Spec.ClusterType) } else { diff --git a/pkg/controller/cluster_schema/register_cluster_schema_test.go b/pkg/controller/cluster_schema/register_cluster_schema_test.go new file mode 100644 index 0000000..8ef276f --- /dev/null +++ b/pkg/controller/cluster_schema/register_cluster_schema_test.go @@ -0,0 +1,61 @@ +package cluster_schema + +import ( + "github.com/chriskery/kubecluster/pkg/controller/cluster_schema/pbspro_schema" + "github.com/chriskery/kubecluster/pkg/controller/cluster_schema/slurm_schema" + "testing" +) + +func TestEnabledSchemes(t *testing.T) { + testES := EnabledSchemes{} + + if testES.String() != "" { + t.Errorf("empty EnabledSchemes converted no-empty string %s", testES.String()) + } + + if !testES.Empty() { + t.Error("Empty method returned false for empty EnabledSchemes") + } + + if testES.Set(slurm_schema.ClusterSchemaKind) != nil { + t.Error("failed to restore Slurm schema") + } else { + stored := false + for _, kind := range testES { + if kind == slurm_schema.ClusterSchemaKind { + stored = true + } + } + if !stored { + t.Errorf("%s not successfully registered", slurm_schema.ClusterSchemaKind) + } + } + + if testES.Set(pbspro_schema.ClusterSchemaKind) != nil { + t.Error("failed to restore Slurm schema") + } else { + stored := false + for _, kind := range testES { + if kind == pbspro_schema.ClusterSchemaKind { + stored = true + } + } + if !stored { + t.Errorf("%s not successfully registered", pbspro_schema.ClusterSchemaKind) + } + } + dummycluster := "dummycluster" + if testES.Set(dummycluster) == nil { + t.Errorf("successfully registered non-supported cluster %s", dummycluster) + } + + if testES.Empty() { + t.Error("Empty method returned true for non-empty EnabledSchemes") + } + + es2 := EnabledSchemes{} + es2.FillAll() + if es2.Empty() { + t.Error("Empty method returned true for fully registered EnabledSchemes") + } +} diff --git a/pkg/controller/cluster_schema/slurm_schema/config.go b/pkg/controller/cluster_schema/slurm_schema/config.go index 027a38c..67f86d4 100644 --- a/pkg/controller/cluster_schema/slurm_schema/config.go +++ b/pkg/controller/cluster_schema/slurm_schema/config.go @@ -4,28 +4,28 @@ import "flag" // Config is the global configuration for the training operator. var config struct { - SlurmSchemaInitContainerTemplateFile string - SlurmSchemaInitContainerImage string - SlurmSchemaInitContainerMaxTries int + slurmSchemaInitContainerTemplateFile string + slurmSchemaInitContainerImage string + slurmSchemaInitContainerMaxTries int } const ( - // SlurmSchemaInitContainerImageDefault is the default image for the SlurmSchema + // slurmSchemaInitContainerImageDefault is the default image for the SlurmSchema // init container. 
- SlurmSchemaInitContainerImageDefault = "registry.cn-shanghai.aliyuncs.com/eflops-bcp/slurm-minimal:v1" - // SlurmSchemaInitContainerTemplateFileDefault is the default template file for + slurmSchemaInitContainerImageDefault = "registry.cn-shanghai.aliyuncs.com/eflops-bcp/slurm-minimal:v1" + // slurmSchemaInitContainerTemplateFileDefault is the default template file for // the SlurmSchema init container. - SlurmSchemaInitContainerTemplateFileDefault = "/etc/config/initContainer.yaml" - // SlurmSchemaInitContainerMaxTriesDefault is the default number of tries for the SlurmSchema init container. - SlurmSchemaInitContainerMaxTriesDefault = 100 + slurmSchemaInitContainerTemplateFileDefault = "/etc/config/initContainer.yaml" + // slurmSchemaInitContainerMaxTriesDefault is the default number of tries for the SlurmSchema init container. + slurmSchemaInitContainerMaxTriesDefault = 100 ) func init() { // SlurmSchema related flags - flag.StringVar(&config.SlurmSchemaInitContainerImage, "SlurmSchema-init-container-image", - SlurmSchemaInitContainerImageDefault, "The image for SlurmSchema init container") - flag.StringVar(&config.SlurmSchemaInitContainerTemplateFile, "SlurmSchema-init-container-template-file", - SlurmSchemaInitContainerTemplateFileDefault, "The template file for SlurmSchema init container") - flag.IntVar(&config.SlurmSchemaInitContainerMaxTries, "SlurmSchema-init-container-max-tries", - SlurmSchemaInitContainerMaxTriesDefault, "The number of tries for the SlurmSchema init container") + flag.StringVar(&config.slurmSchemaInitContainerImage, "SlurmSchema-init-container-image", + slurmSchemaInitContainerImageDefault, "The image for SlurmSchema init container") + flag.StringVar(&config.slurmSchemaInitContainerTemplateFile, "SlurmSchema-init-container-template-file", + slurmSchemaInitContainerTemplateFileDefault, "The template file for SlurmSchema init container") + flag.IntVar(&config.slurmSchemaInitContainerMaxTries, "SlurmSchema-init-container-max-tries", + slurmSchemaInitContainerMaxTriesDefault, "The number of tries for the SlurmSchema init container") } diff --git a/pkg/controller/cluster_schema/slurm_schema/configmap.go b/pkg/controller/cluster_schema/slurm_schema/configmap.go index 0dc57cf..9c550e7 100644 --- a/pkg/controller/cluster_schema/slurm_schema/configmap.go +++ b/pkg/controller/cluster_schema/slurm_schema/configmap.go @@ -54,7 +54,7 @@ var ( "TaskPlugin=task/none \n" + "################################################\n# ACCOUNTING #\n################################################\n" + "AccountingStorageType=accounting_storage/none \n" + - "JobAcctGatherType=jobacct_gather/none \n" + + "clusterAcctGatherType=clusteracct_gather/none \n" + "GresTypes=gpu \n" resourceTypeGpu = "gpu" ) diff --git a/pkg/controller/cluster_schema/slurm_schema/initcontainer.go b/pkg/controller/cluster_schema/slurm_schema/initcontainer.go index 8b87b7f..22f0f07 100644 --- a/pkg/controller/cluster_schema/slurm_schema/initcontainer.go +++ b/pkg/controller/cluster_schema/slurm_schema/initcontainer.go @@ -55,9 +55,9 @@ type initContainerGenerator struct { func getInitContainerGenerator() *initContainerGenerator { onceInitContainer.Do(func() { icGenerator = &initContainerGenerator{ - template: getInitContainerTemplateOrDefault(config.SlurmSchemaInitContainerTemplateFile), - image: config.SlurmSchemaInitContainerImage, - maxTries: config.SlurmSchemaInitContainerMaxTries, + template: getInitContainerTemplateOrDefault(config.slurmSchemaInitContainerTemplateFile), + image: 
config.slurmSchemaInitContainerImage, + maxTries: config.slurmSchemaInitContainerMaxTries, } }) return icGenerator @@ -108,9 +108,6 @@ func setInitContainer( kcluster *kubeclusterorgv1alpha1.KubeCluster, podTemplate *corev1.PodTemplateSpec, rtype kubeclusterorgv1alpha1.ReplicaType, - index string, - port int, - slurmdPort int, ) error { g := getInitContainerGenerator() controllerAddress := common.GenGeneralName(kcluster.Name, SchemaReplicaTypeController, strconv.Itoa(0)) @@ -129,7 +126,6 @@ func setInitContainer( } //we only need to change tha last - podTemplate.Spec.InitContainers = append(podTemplate.Spec.InitContainers, - initContainers...) + podTemplate.Spec.InitContainers = append(podTemplate.Spec.InitContainers, initContainers...) return nil } diff --git a/pkg/controller/cluster_schema/slurm_schema/initcontainer_test.go b/pkg/controller/cluster_schema/slurm_schema/initcontainer_test.go new file mode 100644 index 0000000..1aea407 --- /dev/null +++ b/pkg/controller/cluster_schema/slurm_schema/initcontainer_test.go @@ -0,0 +1,145 @@ +// Copyright 2021 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License + +package slurm_schema + +import ( + "github.com/chriskery/kubecluster/apis/kubecluster.org/v1alpha1" + "github.com/chriskery/kubecluster/pkg/common" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "strconv" + "testing" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + "k8s.io/utils/pointer" +) + +func TestInitContainer(t *testing.T) { + gomega.RegisterFailHandler(ginkgo.Fail) + config.slurmSchemaInitContainerImage = slurmSchemaInitContainerImageDefault + config.slurmSchemaInitContainerTemplateFile = slurmSchemaInitContainerTemplateFileDefault + config.slurmSchemaInitContainerMaxTries = slurmSchemaInitContainerMaxTriesDefault + + var replicaTypeWorker v1alpha1.ReplicaType = "Worker" + + testCases := []struct { + kubecluster *v1alpha1.KubeCluster + rtype v1alpha1.ReplicaType + index string + expected int + exepctedErr error + }{ + { + kubecluster: &v1alpha1.KubeCluster{ + Spec: v1alpha1.ClusterSpec{ + ClusterReplicaSpec: map[v1alpha1.ReplicaType]*v1alpha1.ReplicaSpec{ + SchemaReplicaTypeController: { + Replicas: pointer.Int32(1), + }, + }, + }, + }, + rtype: SchemaReplicaTypeController, + index: "0", + expected: 1, + exepctedErr: nil, + }, + { + kubecluster: &v1alpha1.KubeCluster{ + Spec: v1alpha1.ClusterSpec{ + ClusterReplicaSpec: map[v1alpha1.ReplicaType]*v1alpha1.ReplicaSpec{ + replicaTypeWorker: { + Replicas: pointer.Int32(1), + }, + SchemaReplicaTypeController: { + Replicas: pointer.Int32(1), + }, + }, + }, + }, + rtype: replicaTypeWorker, + index: "0", + expected: 1, + exepctedErr: nil, + }, + { + kubecluster: &v1alpha1.KubeCluster{ + Spec: v1alpha1.ClusterSpec{ + ClusterReplicaSpec: map[v1alpha1.ReplicaType]*v1alpha1.ReplicaSpec{ + replicaTypeWorker: { + Replicas: pointer.Int32(1), + }, + SchemaReplicaTypeController: { + Replicas: pointer.Int32(1), + }, + }, + }, + }, + rtype: SchemaReplicaTypeController, + index: 
"0", + expected: 1, + exepctedErr: nil, + }, + } + + for _, t := range testCases { + podTemplateSpec := t.kubecluster.Spec.ClusterReplicaSpec[t.rtype].Template + err := setInitContainer(t.kubecluster, (*corev1.PodTemplateSpec)(&podTemplateSpec), t.rtype) + if t.exepctedErr == nil { + gomega.Expect(err).To(gomega.BeNil()) + } else { + gomega.Expect(err).To(gomega.Equal(t.exepctedErr)) + } + gomega.Expect(len(podTemplateSpec.Spec.InitContainers) >= t.expected).To(gomega.BeTrue()) + } +} + +func TestGetInitContainer(t *testing.T) { + gomega.RegisterFailHandler(ginkgo.Fail) + + g := getInitContainerGenerator() + testCases := []struct { + kubecluster *v1alpha1.KubeCluster + expected int + exepctedErr error + }{ + { + kubecluster: &v1alpha1.KubeCluster{ + ObjectMeta: metav1.ObjectMeta{Name: "test"}, + Spec: v1alpha1.ClusterSpec{ + ClusterReplicaSpec: map[v1alpha1.ReplicaType]*v1alpha1.ReplicaSpec{ + SchemaReplicaTypeController: { + Replicas: pointer.Int32(1), + }, + }, + }, + }, + expected: 1, + exepctedErr: nil, + }, + } + + for _, t := range testCases { + controllerAddress := common.GenGeneralName(t.kubecluster.Name, SchemaReplicaTypeController, strconv.Itoa(0)) + initContainers, err := g.GetInitContainer(controllerAddress) + if t.exepctedErr == nil { + gomega.Expect(err).To(gomega.BeNil()) + } else { + gomega.Expect(err).To(gomega.Equal(t.exepctedErr)) + } + gomega.Expect(len(initContainers) >= t.expected).To(gomega.BeTrue()) + } +} diff --git a/pkg/controller/cluster_schema/slurm_schema/slurm_schema.go b/pkg/controller/cluster_schema/slurm_schema/slurm_schema.go index 37a6094..99ee87d 100644 --- a/pkg/controller/cluster_schema/slurm_schema/slurm_schema.go +++ b/pkg/controller/cluster_schema/slurm_schema/slurm_schema.go @@ -74,7 +74,7 @@ func (s *slurmClusterSchemaReconciler) UpdateClusterStatus( status := clusterStatus.ReplicaStatuses[rtype] // Generate the label selector. - //status.Selector = metav1.FormatLabelSelector(r.GenLabelSelector(pytorchjob.Name, rtype)) + //status.Selector = metav1.FormatLabelSelector(r.GenLabelSelector(pytorchcluster.Name, rtype)) running := status.Active failed := status.Failed @@ -91,8 +91,8 @@ func (s *slurmClusterSchemaReconciler) UpdateClusterStatus( } if failed > 0 { - // For the situation that jobStatus has a restarting condition, and append a running condition, - // the restarting condition will be removed from jobStatus by kubeflowv1.filterOutCondition(), + // For the situation that clusterStatus has a restarting condition, and append a running condition, + // the restarting condition will be removed from clusterStatus by filterOutCondition(), // so we need to record the existing restarting condition for later use. var existingRestartingCondition *kubeclusterorgv1alpha1.ClusterCondition for _, condition := range clusterStatus.Conditions { @@ -104,12 +104,12 @@ func (s *slurmClusterSchemaReconciler) UpdateClusterStatus( } } - // For the situation that jobStatus has a restarting condition, and appends a new running condition, - // the restarting condition will be removed from jobStatus by kubeflowv1.filterOutCondition(), - // so we need to append the restarting condition back to jobStatus. + // For the situation that clusterStatus has a restarting condition, and appends a new running condition, + // the restarting condition will be removed from clusterStatus by filterOutCondition(), + // so we need to append the restarting condition back to clusterStatus. 
if existingRestartingCondition != nil { util.UpdateClusterConditions(clusterStatus, kubeclusterorgv1alpha1.ClusterRestarting, corev1.ConditionTrue, existingRestartingCondition.Reason, existingRestartingCondition.Message) - // job is restarting, no need to set it failed + // cluster is restarting, no need to set it failed // we know it because we update the status condition when reconciling the replicas common.RestartedClustersCounterInc(kcluster.GetNamespace(), kcluster.Spec.ClusterType) } else { @@ -154,7 +154,7 @@ func (s *slurmClusterSchemaReconciler) SetClusterSpec( if err = setPodEnv(kcluster, podTemplate, s.GetDefaultContainerName(), rtype, index, slurmctlPort, slurmdPort); err != nil { return err } - if err = setInitContainer(kcluster, podTemplate, rtype, index, slurmctlPort, slurmdPort); err != nil { + if err = setInitContainer(kcluster, podTemplate, rtype); err != nil { return err } setVolumes(podTemplate, s.GetDefaultContainerName(), rtype, configMap.Name) diff --git a/pkg/controller/control/pod_control.go b/pkg/controller/control/pod_control.go index fc48771..8bc7a46 100644 --- a/pkg/controller/control/pod_control.go +++ b/pkg/controller/control/pod_control.go @@ -33,16 +33,16 @@ import ( // Reasons for pod events const ( - // FailedCreatePodReason is added in an event and in a job condition + // FailedCreatePodReason is added in an event and in a cluster condition // when a pod for a replica set is failed to be created. FailedCreatePodReason = "FailedCreatePod" - // SuccessfulCreatePodReason is added in an event when a pod for a job + // SuccessfulCreatePodReason is added in an event when a pod for a cluster // is successfully created. SuccessfulCreatePodReason = "SuccessfulCreatePod" - // FailedDeletePodReason is added in an event and in a job condition + // FailedDeletePodReason is added in an event and in a cluster condition // when a pod for a replica set is failed to be deleted. FailedDeletePodReason = "FailedDeletePod" - // SuccessfulDeletePodReason is added in an event when a pod for a job + // SuccessfulDeletePodReason is added in an event when a pod for a cluster // is successfully deleted. SuccessfulDeletePodReason = "SuccessfulDeletePod" ) diff --git a/pkg/controller/control/podgroup_control.go b/pkg/controller/control/podgroup_control.go index 7efca29..fd27451 100644 --- a/pkg/controller/control/podgroup_control.go +++ b/pkg/controller/control/podgroup_control.go @@ -48,7 +48,7 @@ type PodGroupControlInterface interface { DelayPodCreationDueToPodGroup(pg metav1.Object) bool // DecoratePodTemplateSpec decorates PodTemplateSpec. // If the PodTemplateSpec has SchedulerName set, this method will Not override. - DecoratePodTemplateSpec(pts *corev1.PodTemplateSpec, job metav1.Object, rtype string) + DecoratePodTemplateSpec(pts *corev1.PodTemplateSpec, cluster metav1.Object, rtype string) // GetSchedulerName returns the name of the gang scheduler. 
GetSchedulerName() string } @@ -62,14 +62,14 @@ func (v *VolcanoControl) GetSchedulerName() string { return "volcano" } -func (v *VolcanoControl) DecoratePodTemplateSpec(pts *corev1.PodTemplateSpec, job metav1.Object, rtype string) { +func (v *VolcanoControl) DecoratePodTemplateSpec(pts *corev1.PodTemplateSpec, cluster metav1.Object, rtype string) { if len(pts.Spec.SchedulerName) == 0 { pts.Spec.SchedulerName = v.GetSchedulerName() } if pts.Annotations == nil { pts.Annotations = make(map[string]string) } - pts.Annotations[volcanov1beta1.KubeGroupNameAnnotationKey] = job.GetName() + pts.Annotations[volcanov1beta1.KubeGroupNameAnnotationKey] = cluster.GetName() pts.Annotations[volcanobatchv1alpha1.TaskSpecKey] = rtype } @@ -128,7 +128,7 @@ type SchedulerPluginsControl struct { SchedulerName string } -func (s *SchedulerPluginsControl) DecoratePodTemplateSpec(pts *corev1.PodTemplateSpec, job metav1.Object, _ string) { +func (s *SchedulerPluginsControl) DecoratePodTemplateSpec(pts *corev1.PodTemplateSpec, cluster metav1.Object, _ string) { if len(pts.Spec.SchedulerName) == 0 { pts.Spec.SchedulerName = s.GetSchedulerName() } @@ -136,7 +136,7 @@ func (s *SchedulerPluginsControl) DecoratePodTemplateSpec(pts *corev1.PodTemplat if pts.Labels == nil { pts.Labels = make(map[string]string) } - pts.Labels[schedulerpluginsv1alpha1.PodGroupLabel] = job.GetName() + pts.Labels[schedulerpluginsv1alpha1.PodGroupLabel] = cluster.GetName() } func (s *SchedulerPluginsControl) GetSchedulerName() string { diff --git a/pkg/controller/control/service_control.go b/pkg/controller/control/service_control.go index e306ba7..56696c1 100644 --- a/pkg/controller/control/service_control.go +++ b/pkg/controller/control/service_control.go @@ -32,16 +32,16 @@ import ( ) const ( - // FailedCreateServiceReason is added in an event and in a job controller condition - // when a service for a job is failed to be created. + // FailedCreateServiceReason is added in an event and in a cluster controller condition + // when a service for a cluster is failed to be created. FailedCreateServiceReason = "FailedCreateService" - // SuccessfulCreateServiceReason is added in an event when a service for a job + // SuccessfulCreateServiceReason is added in an event when a service for a cluster // is successfully created. SuccessfulCreateServiceReason = "SuccessfulCreateService" - // FailedDeleteServiceReason is added in an event and in a job condition - // when a service for a job is failed to be deleted. + // FailedDeleteServiceReason is added in an event and in a cluster condition + // when a service for a cluster is failed to be deleted. FailedDeleteServiceReason = "FailedDeleteService" - // SuccessfulDeleteServiceReason is added in an event when a service for a job + // SuccessfulDeleteServiceReason is added in an event when a service for a cluster // is successfully deleted. SuccessfulDeleteServiceReason = "SuccessfulDeleteService" ) diff --git a/pkg/controller/ctrlcommon/configmap.go b/pkg/controller/ctrlcommon/configmap.go index e7ca0fb..6aad903 100644 --- a/pkg/controller/ctrlcommon/configmap.go +++ b/pkg/controller/ctrlcommon/configmap.go @@ -41,7 +41,7 @@ func (cc *ClusterController) ReconcileConfigMap(kcluster *v1alpha1.KubeCluster) if err = schemaReconciler.ReconcileConfigMap(kcluster, deepCopy); err != nil { return nil, err } - // No need to update the job status if the status hasn't changed since last time. + // No need to update the cluster status if the status hasn't changed since last time. 
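+	// Comparing the reconciled copy against the live ConfigMap data keeps no-op
+	// updates from being sent to the API server on every reconcile.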
if !reflect.DeepEqual(deepCopy.Data, configMap.Data) { err = cc.Controller.UpdateConfigMapInApiServer(kcluster, deepCopy) } @@ -51,7 +51,7 @@ func (cc *ClusterController) ReconcileConfigMap(kcluster *v1alpha1.KubeCluster) func (cc *ClusterController) CreateNewConfigMap(kcluster *v1alpha1.KubeCluster, expectations expectation.ControllerExpectationsInterface) (*v1.ConfigMap, error) { clusetrKey, err := KeyFunc(kcluster) if err != nil { - utilruntime.HandleError(fmt.Errorf("couldn't get key for job object %#v: %v", kcluster, err)) + utilruntime.HandleError(fmt.Errorf("couldn't get key for cluster object %#v: %v", kcluster, err)) return nil, err } diff --git a/pkg/controller/ctrlcommon/controller.go b/pkg/controller/ctrlcommon/controller.go index 3dfc3c2..376300c 100644 --- a/pkg/controller/ctrlcommon/controller.go +++ b/pkg/controller/ctrlcommon/controller.go @@ -26,6 +26,7 @@ import ( "github.com/chriskery/kubecluster/pkg/core" "github.com/chriskery/kubecluster/pkg/util" "github.com/chriskery/kubecluster/pkg/util/k8sutil" + "github.com/chriskery/kubecluster/pkg/util/misc" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" log "github.com/sirupsen/logrus" @@ -250,7 +251,7 @@ func (cc *ClusterController) ReconcileKubeCluster(kcluster *v1alpha1.KubeCluster clusterName := metaObject.GetName() clusterKind := cc.Controller.GetAPIGroupVersionKind().Kind oldStatus := kcluster.Status.DeepCopy() - if util.IsClusterSuspended(runPolicy) { + if misc.IsClusterSuspended(runPolicy) { if err = cc.CleanUpResources(runPolicy, runtimeObject, metaObject, kcluster.Status, pods); err != nil { return err } @@ -262,7 +263,7 @@ func (cc *ClusterController) ReconcileKubeCluster(kcluster *v1alpha1.KubeCluster if util.IsRunning(kcluster.Status) { util.UpdateClusterConditions(&kcluster.Status, v1alpha1.ClusterRunning, corev1.ConditionFalse, util.NewReason(clusterKind, util.ClusterSuspendedReason), msg) } - // We add the suspended condition to the job only when the job doesn't have a suspended condition. + // We add the suspended condition to the cluster only when the cluster doesn't have a suspended condition. 
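+	// Guarding on IsSuspended keeps the Suspended condition from being appended
+	// again on every reconcile while the cluster stays suspended.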
if !util.IsSuspended(kcluster.Status) { util.UpdateClusterConditions(&kcluster.Status, v1alpha1.ClusterSuspended, corev1.ConditionTrue, util.NewReason(clusterKind, util.ClusterSuspendedReason), msg) } @@ -298,11 +299,11 @@ func (cc *ClusterController) ReconcileKubeCluster(kcluster *v1alpha1.KubeCluster prevReplicasFailedNum := k8sutil.GetTotalFailedReplicas(kcluster.Status.ReplicaStatuses) if runPolicy.BackoffLimit != nil { - jobHasNewFailure := failed > prevReplicasFailedNum + clusterHasNewFailure := failed > prevReplicasFailedNum // new failures happen when status does not reflect the failures and active // is different than parallelism, otherwise the previous controller loop // failed updating status so even if we pick up failure it is not a new one - exceedsBackoffLimit = jobHasNewFailure && (active != totalReplicas) && + exceedsBackoffLimit = clusterHasNewFailure && (active != totalReplicas) && (int32(previousRetry)+1 > *runPolicy.BackoffLimit) pastBackoffLimit, err = cc.PastBackoffLimit(clusterName, runPolicy, kcluster.Spec.ClusterReplicaSpec, pods) @@ -313,7 +314,7 @@ func (cc *ClusterController) ReconcileKubeCluster(kcluster *v1alpha1.KubeCluster if exceedsBackoffLimit || pastBackoffLimit { // check if the number of pod restart exceeds backoff (for restart OnFailure only) - // OR if the number of failed jobs increased since the last syncCluster + // OR if the number of failed clusters increased since the last syncCluster clusterExceedsLimit = true failureMessage = fmt.Sprintf("KubeCLuster %s has failed because it has reached the specified backoff limit", clusterName) } else if cc.PastActiveDeadline(runPolicy, kcluster.Status) { @@ -427,7 +428,7 @@ func (cc *ClusterController) ReconcileKubeCluster(kcluster *v1alpha1.KubeCluster now := metav1.Now() kcluster.Status.LastReconcileTime = &now - // Update job status here to trigger a new reconciliation + // Update cluster status here to trigger a new reconciliation return cc.Controller.UpdateClusterStatusInApiServer(metaObject, &kcluster.Status) } } @@ -458,7 +459,7 @@ func (cc *ClusterController) ReconcileKubeCluster(kcluster *v1alpha1.KubeCluster log.Warnf("UpdateClusterStatus error %v", err) return err } - // No need to update the job status if the status hasn't changed since last time. + // No need to update the cluster status if the status hasn't changed since last time. if !reflect.DeepEqual(*oldStatus, &kcluster.Status) { return cc.Controller.UpdateClusterStatusInApiServer(metaObject, &kcluster.Status) } @@ -469,7 +470,7 @@ func (cc *ClusterController) ReconcileKubeCluster(kcluster *v1alpha1.KubeCluster log.Warnf("UpdateClusterStatus error %v", err) return err } - // No need to update the job status if the status hasn't changed since last time. + // No need to update the cluster status if the status hasn't changed since last time. if !reflect.DeepEqual(configMapDeepCopy.Data, configMap.Data) { return cc.Controller.UpdateConfigMapInApiServer(metaObject, configMapDeepCopy) } @@ -480,16 +481,16 @@ func (cc *ClusterController) calcPGMinResources(minMember int32, replicas map[v1 return CalcPGMinResources(minMember, replicas, cc.PriorityClassLister.Get) } -// PastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if it is exceeded. +// PastActiveDeadline checks if cluster has ActiveDeadlineSeconds field set and if it is exceeded. 
func (cc *ClusterController) PastActiveDeadline(runPolicy *v1alpha1.RunPolicy, clusterStatus v1alpha1.ClusterStatus) bool { return core.PastActiveDeadline(runPolicy, clusterStatus) } // PastBackoffLimit checks if container restartCounts sum exceeds BackoffLimit // this method applies only to pods when restartPolicy is one of OnFailure, Always or ExitCode -func (cc *ClusterController) PastBackoffLimit(jobName string, runPolicy *v1alpha1.RunPolicy, +func (cc *ClusterController) PastBackoffLimit(clusterName string, runPolicy *v1alpha1.RunPolicy, replicas map[v1alpha1.ReplicaType]*v1alpha1.ReplicaSpec, pods []*corev1.Pod) (bool, error) { - return core.PastBackoffLimit(jobName, runPolicy, replicas, pods, cc.FilterPodsForReplicaType) + return core.PastBackoffLimit(clusterName, runPolicy, replicas, pods, cc.FilterPodsForReplicaType) } // FilterPodsForReplicaType returns pods belong to a replicaType. @@ -497,6 +498,11 @@ func (cc *ClusterController) FilterPodsForReplicaType(pods []*corev1.Pod, replic return core.FilterPodsForReplicaType(pods, replicaType) } +// FilterServicesForReplicaType returns service belong to a replicaType. +func (cc *ClusterController) FilterServicesForReplicaType(services []*corev1.Service, replicaType string) ([]*corev1.Service, error) { + return core.FilterServicesForReplicaType(services, replicaType) +} + // recordAbnormalPods records the active pod whose latest condition is not in True status. func (cc *ClusterController) recordAbnormalPods(activePods []*corev1.Pod, object runtime.Object) { core.RecordAbnormalPods(activePods, object, cc.Recorder) @@ -565,7 +571,7 @@ func (cc *ClusterController) DeletePodAndServices(runtimeObject runtime.Object, return nil } - // Delete nothing when the cleanPodPolicy is None and the job has Succeeded or Failed condition. + // Delete nothing when the cleanPodPolicy is None and the cluster has Succeeded or Failed condition. if util.IsFinished(clusterStatus) && *runPolicy.CleanKubeNodePolicy == v1alpha1.CleanKubeNodePolicyNone { return nil } diff --git a/pkg/controller/ctrlcommon/pod_test.go b/pkg/controller/ctrlcommon/pod_test.go new file mode 100644 index 0000000..354cdcd --- /dev/null +++ b/pkg/controller/ctrlcommon/pod_test.go @@ -0,0 +1,227 @@ +// Copyright 2018 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package ctrlcommon + +import ( + "github.com/chriskery/kubecluster/apis/kubecluster.org/v1alpha1" + "github.com/chriskery/kubecluster/pkg/core" + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestSetRestartPolicy(t *testing.T) { + testCases := map[string]struct { + replicaSpec *v1alpha1.ReplicaSpec + expectedRestartPolicy v1.RestartPolicy + }{ + "restartPolicy is ExitCode": { + replicaSpec: &v1alpha1.ReplicaSpec{ + RestartPolicy: v1alpha1.RestartPolicyExitCode, + }, + expectedRestartPolicy: v1.RestartPolicyNever, + }, + "restartPolicy is Never": { + replicaSpec: &v1alpha1.ReplicaSpec{ + RestartPolicy: v1alpha1.RestartPolicyNever, + }, + expectedRestartPolicy: v1.RestartPolicyNever, + }, + "restartPolicy is Always": { + replicaSpec: &v1alpha1.ReplicaSpec{ + RestartPolicy: v1alpha1.RestartPolicyAlways, + }, + expectedRestartPolicy: v1.RestartPolicyAlways, + }, + "restartPolicy is OnFailure": { + replicaSpec: &v1alpha1.ReplicaSpec{ + RestartPolicy: v1alpha1.RestartPolicyOnFailure, + }, + expectedRestartPolicy: v1.RestartPolicyOnFailure, + }, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + podTemplate := &tc.replicaSpec.Template + core.SetRestartPolicy((*v1.PodTemplateSpec)(podTemplate), tc.replicaSpec) + if podTemplate.Spec.RestartPolicy != tc.expectedRestartPolicy { + t.Errorf("Unexpected restartPolicy from SetRetartPolicy:\nwant:%v\ngot:%v\n", tc.expectedRestartPolicy, podTemplate.Spec.RestartPolicy) + } + }) + } +} + +func TestIsCustomSchedulerSet(t *testing.T) { + testCases := map[string]struct { + replicaSpecs map[v1alpha1.ReplicaType]*v1alpha1.ReplicaSpec + gangSchedulerName string + want bool + }{ + "replicaSpecs aren't set custom schedulerName": { + replicaSpecs: map[v1alpha1.ReplicaType]*v1alpha1.ReplicaSpec{ + v1alpha1.ReplicaType("A"): {}, + v1alpha1.ReplicaType("B"): {}, + }, + gangSchedulerName: "alpha", + want: false, + }, + "all replicaSpecs are set custom schedulerName": { + replicaSpecs: map[v1alpha1.ReplicaType]*v1alpha1.ReplicaSpec{ + v1alpha1.ReplicaType("A"): { + Template: v1alpha1.ReplicaTemplate(v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + SchedulerName: "custom-a", + }, + }), + }, + v1alpha1.ReplicaType("B"): { + Template: v1alpha1.ReplicaTemplate(v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + SchedulerName: "custom-b", + }, + }), + }, + }, + gangSchedulerName: "beta", + want: true, + }, + "one of replicaSpecs is set custom schedulerName": { + replicaSpecs: map[v1alpha1.ReplicaType]*v1alpha1.ReplicaSpec{ + v1alpha1.ReplicaType("A"): {}, + v1alpha1.ReplicaType("B"): { + Template: v1alpha1.ReplicaTemplate(v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + SchedulerName: "custom-b", + }, + }), + }, + }, + gangSchedulerName: "gamma", + want: true, + }, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + got := isCustomSchedulerSet(tc.replicaSpecs, tc.gangSchedulerName) + if tc.want != got { + t.Errorf("Unexpected value from isCustomSchedulerSet:\nwant:%v\ngot:%v\n", tc.want, got) + } + }) + } +} + +func TestCalculatePodSliceSize(t *testing.T) { + type testCase struct { + pods []*v1.Pod + replicas int + expectedSize int + } + + pods := []*v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{v1alpha1.ReplicaIndexLabel: "0"}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{v1alpha1.ReplicaIndexLabel: "1"}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Labels: 
map[string]string{v1alpha1.ReplicaIndexLabel: "2"}, + }, + }, + } + + var testCases = []testCase{ + { + pods: pods, + replicas: 3, + expectedSize: 3, + }, + { + pods: pods, + replicas: 4, + expectedSize: 4, + }, + { + pods: pods, + replicas: 2, + expectedSize: 3, + }, + { + pods: append(pods, &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{v1alpha1.ReplicaIndexLabel: "4"}, + }, + }), + replicas: 3, + expectedSize: 5, + }, + } + + for _, tc := range testCases { + result := core.CalculatePodSliceSize(tc.pods, tc.replicas) + assert.Equal(t, tc.expectedSize, result) + } +} + +func TestFilterPodsForReplicaType(t *testing.T) { + pods := []*v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "a", + Labels: map[string]string{v1alpha1.ReplicaTypeLabel: "foo"}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "b", + Labels: map[string]string{v1alpha1.ReplicaTypeLabel: "bar"}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "c", + Labels: map[string]string{v1alpha1.ReplicaTypeLabel: "foo"}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "d", + Labels: map[string]string{v1alpha1.ReplicaTypeLabel: "bar"}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "e", + Labels: map[string]string{ + v1alpha1.ReplicaTypeLabel: "foo", + }, + }, + }, + } + c := &ClusterController{} + got, err := c.FilterPodsForReplicaType(pods, "foo") + if err != nil { + t.Fatalf("FilterPodsForReplicaType returned error: %v", err) + } + want := []*v1.Pod{pods[0], pods[2], pods[4]} + assert.Equal(t, want, got) +} diff --git a/pkg/controller/ctrlcommon/scheduling.go b/pkg/controller/ctrlcommon/scheduling.go index daec9c8..31e963b 100644 --- a/pkg/controller/ctrlcommon/scheduling.go +++ b/pkg/controller/ctrlcommon/scheduling.go @@ -34,11 +34,11 @@ import ( type FillPodGroupSpecFunc func(object metav1.Object) error -func (cc *ClusterController) SyncPodGroup(job metav1.Object, specFunc FillPodGroupSpecFunc) (metav1.Object, error) { +func (cc *ClusterController) SyncPodGroup(cluster metav1.Object, specFunc FillPodGroupSpecFunc) (metav1.Object, error) { pgctl := cc.PodGroupControl // Check whether podGroup exists or not - podGroup, err := pgctl.GetPodGroup(job.GetNamespace(), job.GetName()) + podGroup, err := pgctl.GetPodGroup(cluster.GetNamespace(), cluster.GetName()) if err == nil { // update podGroup for gang scheduling oldPodGroup := &podGroup @@ -54,10 +54,10 @@ func (cc *ClusterController) SyncPodGroup(job metav1.Object, specFunc FillPodGro } else { // create podGroup for gang scheduling newPodGroup := pgctl.NewEmptyPodGroup() - newPodGroup.SetName(job.GetName()) - newPodGroup.SetNamespace(job.GetNamespace()) - newPodGroup.SetAnnotations(job.GetAnnotations()) - newPodGroup.SetOwnerReferences([]metav1.OwnerReference{*cc.GenOwnerReference(job)}) + newPodGroup.SetName(cluster.GetName()) + newPodGroup.SetNamespace(cluster.GetNamespace()) + newPodGroup.SetAnnotations(cluster.GetAnnotations()) + newPodGroup.SetOwnerReferences([]metav1.OwnerReference{*cc.GenOwnerReference(cluster)}) if err = specFunc(newPodGroup); err != nil { return nil, fmt.Errorf("unable to fill the spec of PodGroup, '%v': %v", klog.KObj(newPodGroup), err) } @@ -69,7 +69,7 @@ func (cc *ClusterController) SyncPodGroup(job metav1.Object, specFunc FillPodGro createdPodGroupsCount.Inc() } - createdPodGroup, err := pgctl.GetPodGroup(job.GetNamespace(), job.GetName()) + createdPodGroup, err := pgctl.GetPodGroup(cluster.GetNamespace(), cluster.GetName()) if err != nil { return nil, fmt.Errorf("unable to get PodGroup after 
success creation: %v", err) } @@ -114,19 +114,19 @@ func (cc *ClusterController) SyncPdb(kcluster metav1.Object, minAvailableReplica return createdPdb, nil } -func (cc *ClusterController) DeletePodGroup(job metav1.Object) error { +func (cc *ClusterController) DeletePodGroup(cluster metav1.Object) error { pgctl := cc.PodGroupControl // Check whether podGroup exists or not - _, err := pgctl.GetPodGroup(job.GetNamespace(), job.GetName()) + _, err := pgctl.GetPodGroup(cluster.GetNamespace(), cluster.GetName()) if err != nil && k8serrors.IsNotFound(err) { return nil } - log.Infof("Deleting PodGroup %s", job.GetName()) + log.Infof("Deleting PodGroup %s", cluster.GetName()) // Delete podGroup - err = pgctl.DeletePodGroup(job.GetNamespace(), job.GetName()) + err = pgctl.DeletePodGroup(cluster.GetNamespace(), cluster.GetName()) if err != nil { return fmt.Errorf("unable to delete PodGroup: %v", err) } @@ -134,17 +134,17 @@ func (cc *ClusterController) DeletePodGroup(job metav1.Object) error { return nil } -func (cc *ClusterController) DeletePdb(job metav1.Object) error { +func (cc *ClusterController) DeletePdb(cluster metav1.Object) error { // Check whether pdb exists or not - _, err := cc.KubeClientSet.PolicyV1beta1().PodDisruptionBudgets(job.GetNamespace()).Get(context.TODO(), job.GetName(), metav1.GetOptions{}) + _, err := cc.KubeClientSet.PolicyV1beta1().PodDisruptionBudgets(cluster.GetNamespace()).Get(context.TODO(), cluster.GetName(), metav1.GetOptions{}) if err != nil && k8serrors.IsNotFound(err) { return nil } - msg := fmt.Sprintf("Deleting pdb %s", job.GetName()) + msg := fmt.Sprintf("Deleting pdb %s", cluster.GetName()) log.Info(msg) - if err = cc.KubeClientSet.PolicyV1beta1().PodDisruptionBudgets(job.GetNamespace()).Delete(context.TODO(), job.GetName(), metav1.DeleteOptions{}); err != nil { + if err = cc.KubeClientSet.PolicyV1beta1().PodDisruptionBudgets(cluster.GetNamespace()).Delete(context.TODO(), cluster.GetName(), metav1.DeleteOptions{}); err != nil { return fmt.Errorf("unable to delete pdb: %v", err) } deletedPDBCount.Inc() diff --git a/pkg/controller/ctrlcommon/service.go b/pkg/controller/ctrlcommon/service.go index 684e595..49d2006 100644 --- a/pkg/controller/ctrlcommon/service.go +++ b/pkg/controller/ctrlcommon/service.go @@ -37,7 +37,7 @@ func (cc *ClusterController) CreateNewService( expectations expectation.ControllerExpectationsInterface) error { clusetrKey, err := KeyFunc(kcluster) if err != nil { - utilruntime.HandleError(fmt.Errorf("couldn't get key for job object %#v: %v", kcluster, err)) + utilruntime.HandleError(fmt.Errorf("couldn't get key for cluster object %#v: %v", kcluster, err)) return err } @@ -181,7 +181,7 @@ func (cc *ClusterController) ReconcilePods( // Deletion is expected schemaReconciler.RaiseExpectations(expectationPodsKey, 0, 1) - msg := fmt.Sprintf("job %s is restarting because %s replica(s) failed.", + msg := fmt.Sprintf("cluster %s is restarting because %s replica(s) failed.", kcluster.GetName(), rType) cc.Recorder.Event(kcluster, corev1.EventTypeWarning, util.NewReason(clusterKind, util.ClusterRestartingReason), msg) util.UpdateClusterConditions(&kcluster.Status, kubeclusterorgv1alpha1.ClusterRestarting, @@ -205,7 +205,7 @@ func (cc *ClusterController) ReconcileServices( // Convert ReplicaType to lower string. replicas := int(*spec.Replicas) // Get all services for the type rt. 
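+	// GenReplicaTypeLabel lower-cases the replica type, so services are matched on
+	// the normalized replica-type label rather than the raw ReplicaType value.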
- services, err := cc.Controller.FilterServicesForReplicaType(services, utillabels.GenReplicaTypeLabel(rtype)) + services, err := cc.FilterServicesForReplicaType(services, utillabels.GenReplicaTypeLabel(rtype)) if err != nil { return err } diff --git a/pkg/controller/ctrlcommon/service_test.go b/pkg/controller/ctrlcommon/service_test.go new file mode 100644 index 0000000..85e3c09 --- /dev/null +++ b/pkg/controller/ctrlcommon/service_test.go @@ -0,0 +1,114 @@ +package ctrlcommon + +import ( + "github.com/chriskery/kubecluster/apis/kubecluster.org/v1alpha1" + "github.com/chriskery/kubecluster/pkg/core" + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestCalculateServiceSliceSize(t *testing.T) { + type testCase struct { + services []*corev1.Service + replicas int + expectedSize int + } + + services := []*corev1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{v1alpha1.ReplicaIndexLabel: "0"}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{v1alpha1.ReplicaIndexLabel: "1"}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{v1alpha1.ReplicaIndexLabel: "2"}, + }, + }, + } + + var testCases = []testCase{ + { + services: services, + replicas: 3, + expectedSize: 3, + }, + { + services: services, + replicas: 4, + expectedSize: 4, + }, + { + services: services, + replicas: 2, + expectedSize: 3, + }, + { + services: append(services, &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{v1alpha1.ReplicaIndexLabel: "4"}, + }, + }), + replicas: 3, + expectedSize: 5, + }, + } + + for _, tc := range testCases { + result := core.CalculateServiceSliceSize(tc.services, tc.replicas) + assert.Equal(t, tc.expectedSize, result) + } +} + +func TestFilterServicesForReplicaType(t *testing.T) { + services := []*v1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "a", + Labels: map[string]string{v1alpha1.ReplicaTypeLabel: "foo"}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "b", + Labels: map[string]string{v1alpha1.ReplicaTypeLabel: "bar"}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "c", + Labels: map[string]string{v1alpha1.ReplicaTypeLabel: "foo"}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "d", + Labels: map[string]string{v1alpha1.ReplicaTypeLabel: "bar"}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "e", + Labels: map[string]string{ + v1alpha1.ReplicaTypeLabel: "foo", + }, + }, + }, + } + c := &ClusterController{} + got, err := c.FilterServicesForReplicaType(services, "foo") + if err != nil { + t.Fatalf("FilterPodsForReplicaType returned error: %v", err) + } + want := []*v1.Service{services[0], services[2], services[4]} + assert.Equal(t, want, got) +} diff --git a/pkg/controller/ctrlcommon/util.go b/pkg/controller/ctrlcommon/util.go index efdc8bd..b35ec77 100644 --- a/pkg/controller/ctrlcommon/util.go +++ b/pkg/controller/ctrlcommon/util.go @@ -15,12 +15,10 @@ package ctrlcommon import ( - "fmt" "github.com/chriskery/kubecluster/apis/kubecluster.org/v1alpha1" log "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" schedulingv1 "k8s.io/api/scheduling/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sort" ) @@ -45,30 +43,6 @@ func (p ReplicasPriority) Swap(i, j int) { p[i], p[j] = p[j], p[i] } -// RecheckDeletionTimestamp returns a CanAdopt() function to recheck deletion. 
-// -// The CanAdopt() function calls getObject() to fetch the latest value, -// and denies adoption attempts if that object has a non-nil DeletionTimestamp. -func RecheckDeletionTimestamp(getObject func() (metav1.Object, error)) func() error { - return func() error { - obj, err := getObject() - if err != nil { - return fmt.Errorf("can't recheck DeletionTimestamp: %v", err) - } - if obj.GetDeletionTimestamp() != nil { - return fmt.Errorf("%v/%v has just been deleted at %v", obj.GetNamespace(), obj.GetName(), obj.GetDeletionTimestamp()) - } - return nil - } -} - -func MaxInt(x, y int) int { - if x < y { - return y - } - return x -} - func AddResourceList(list, req, limit v1.ResourceList) { for name, quantity := range req { diff --git a/pkg/controller/expectation/util.go b/pkg/controller/expectation/util.go index 98e7e32..a38e91d 100644 --- a/pkg/controller/expectation/util.go +++ b/pkg/controller/expectation/util.go @@ -4,22 +4,22 @@ import ( "strings" ) -// GenExpectationPodsKey generates an expectation key for pods of a job +// GenExpectationPodsKey generates an expectation key for pods of a cluster func GenExpectationPodsKey(clusterKey string, replicaType string) string { return clusterKey + "/" + strings.ToLower(replicaType) + "/pods" } -// GenExpectationServicesKey generates an expectation key for services of a job +// GenExpectationServicesKey generates an expectation key for services of a cluster func GenExpectationServicesKey(clusterKey string, replicaType string) string { return clusterKey + "/" + strings.ToLower(replicaType) + "/services" } -// GenExpectationConfigMapKey generates an expectation key for services of a job +// GenExpectationConfigMapKey generates an expectation key for services of a cluster func GenExpectationConfigMapKey(clusterKey string) string { return clusterKey + "/configmap" } -// GenPreSatisfiedKey generates an expectation key for services of a job +// GenPreSatisfiedKey generates an expectation key for services of a cluster func GenPreSatisfiedKey(clusterKey string) string { return clusterKey + "/presatisfied" } diff --git a/pkg/controller/suite_test.go b/pkg/controller/suite_test.go index f8f57cd..5ea9c39 100644 --- a/pkg/controller/suite_test.go +++ b/pkg/controller/suite_test.go @@ -53,7 +53,7 @@ var _ = BeforeSuite(func() { By("bootstrapping test environment") testEnv = &envtest.Environment{ - CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases")}, + CRDDirectoryPaths: []string{filepath.Join("..", "..", "manifests", "crd", "bases")}, ErrorIfCRDPathMissing: true, // The BinaryAssetsDirectory is only required if you want to run the tests directly diff --git a/pkg/core/cluster.go b/pkg/core/cluster.go index 9439357..a6842b7 100644 --- a/pkg/core/cluster.go +++ b/pkg/core/cluster.go @@ -78,7 +78,7 @@ func RecordAbnormalPods(activePods []*v1.Pod, object runtime.Object, recorder re } } -// PastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if it is exceeded. +// PastActiveDeadline checks if cluster has ActiveDeadlineSeconds field set and if it is exceeded. 
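+// For example, with ActiveDeadlineSeconds set to 600, a cluster whose StartTime is
+// more than roughly ten minutes in the past is treated as past its deadline.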
func PastActiveDeadline(runPolicy *v1alpha1.RunPolicy, clusterStatus v1alpha1.ClusterStatus) bool { if runPolicy.ActiveDeadlineSeconds == nil || clusterStatus.StartTime == nil { return false @@ -92,7 +92,7 @@ func PastActiveDeadline(runPolicy *v1alpha1.RunPolicy, clusterStatus v1alpha1.Cl // PastBackoffLimit checks if container restartCounts sum exceeds BackoffLimit // this method applies only to pods when restartPolicy is one of OnFailure, Always or ExitCode -func PastBackoffLimit(jobName string, runPolicy *v1alpha1.RunPolicy, +func PastBackoffLimit(clusterName string, runPolicy *v1alpha1.RunPolicy, replicas map[v1alpha1.ReplicaType]*v1alpha1.ReplicaSpec, pods []*v1.Pod, podFilterFunc func(pods []*v1.Pod, replicaType string) ([]*v1.Pod, error)) (bool, error) { if runPolicy.BackoffLimit == nil { @@ -103,7 +103,7 @@ func PastBackoffLimit(jobName string, runPolicy *v1alpha1.RunPolicy, if spec.RestartPolicy != v1alpha1.RestartPolicyOnFailure && spec.RestartPolicy != v1alpha1.RestartPolicyAlways && spec.RestartPolicy != v1alpha1.RestartPolicyExitCode { - log.Warnf("The restart policy of replica %v of the job %v is not OnFailure, Always or ExitCode. Not counted in backoff limit.", rtype, jobName) + log.Warnf("The restart policy of replica %v of the cluster %v is not OnFailure, Always or ExitCode. Not counted in backoff limit.", rtype, clusterName) continue } // Convert ReplicaType to lower string. diff --git a/pkg/core/pod.go b/pkg/core/pod.go index d9f500e..711d9f6 100644 --- a/pkg/core/pod.go +++ b/pkg/core/pod.go @@ -76,7 +76,7 @@ func CalculatePodSliceSize(pods []*v1.Pod, replicas int) int { return MaxInt(size+1, replicas) } -// SetRestartPolicy check the RestartPolicy defined in job spec and overwrite RestartPolicy in podTemplate if necessary +// SetRestartPolicy check the RestartPolicy defined in cluster spec and overwrite RestartPolicy in podTemplate if necessary func SetRestartPolicy(podTemplateSpec *v1.PodTemplateSpec, spec *kubeclusterorgv1alpha1.ReplicaSpec) { // This is necessary since restartPolicyExitCode is not supported in v1.PodTemplateSpec if spec.RestartPolicy == kubeclusterorgv1alpha1.RestartPolicyExitCode { diff --git a/pkg/core/service.go b/pkg/core/service.go index 8a67811..317c6b5 100644 --- a/pkg/core/service.go +++ b/pkg/core/service.go @@ -79,7 +79,7 @@ func CalculateServiceSliceSize(services []*v1.Service, replicas int) int { return MaxInt(size+1, replicas) } -// GetPortsFromCluster gets the ports of job container. Port could be nil, if distributed communication strategy doesn't need and no other ports that need to be exposed. +// GetPortsFromCluster gets the ports of cluster container. Port could be nil, if distributed communication strategy doesn't need and no other ports that need to be exposed. func GetPortsFromCluster(spec *kubeclusterorgv1alpha1.ReplicaSpec, defaultContainerName string) (map[string]int32, error) { ports := make(map[string]int32) diff --git a/pkg/core/status.go b/pkg/core/status.go index 117ade1..e1e3b9f 100644 --- a/pkg/core/status.go +++ b/pkg/core/status.go @@ -22,27 +22,27 @@ import ( ) // InitializeReplicaStatuses initializes the ReplicaStatuses for replica. 
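+// Calling it again for the same rtype simply resets that entry to an empty ReplicaStatus.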
-func InitializeReplicaStatuses(jobStatus *kubeclusterorgv1alpha1.ClusterStatus, rtype kubeclusterorgv1alpha1.ReplicaType) { - if jobStatus.ReplicaStatuses == nil { - jobStatus.ReplicaStatuses = make(map[kubeclusterorgv1alpha1.ReplicaType]*kubeclusterorgv1alpha1.ReplicaStatus) +func InitializeReplicaStatuses(clusterStatus *kubeclusterorgv1alpha1.ClusterStatus, rtype kubeclusterorgv1alpha1.ReplicaType) { + if clusterStatus.ReplicaStatuses == nil { + clusterStatus.ReplicaStatuses = make(map[kubeclusterorgv1alpha1.ReplicaType]*kubeclusterorgv1alpha1.ReplicaStatus) } - jobStatus.ReplicaStatuses[rtype] = &kubeclusterorgv1alpha1.ReplicaStatus{} + clusterStatus.ReplicaStatuses[rtype] = &kubeclusterorgv1alpha1.ReplicaStatus{} } // UpdateClusterReplicaStatuses updates the ClusterReplicaStatuses according to the pod. -func UpdateClusterReplicaStatuses(jobStatus *kubeclusterorgv1alpha1.ClusterStatus, rtype kubeclusterorgv1alpha1.ReplicaType, pod *corev1.Pod) { +func UpdateClusterReplicaStatuses(clusterStatus *kubeclusterorgv1alpha1.ClusterStatus, rtype kubeclusterorgv1alpha1.ReplicaType, pod *corev1.Pod) { switch pod.Status.Phase { case corev1.PodRunning: if pod.DeletionTimestamp != nil { // when node is not ready, the pod will be in terminating state. // Count deleted Pods as failures to account for orphan Pods that // never have a chance to reach the Failed phase. - jobStatus.ReplicaStatuses[rtype].Failed++ + clusterStatus.ReplicaStatuses[rtype].Failed++ } else { - jobStatus.ReplicaStatuses[rtype].Active++ + clusterStatus.ReplicaStatuses[rtype].Active++ } case corev1.PodFailed: - jobStatus.ReplicaStatuses[rtype].Failed++ + clusterStatus.ReplicaStatuses[rtype].Failed++ } } diff --git a/pkg/core/utils.go b/pkg/core/utils.go index e969c1d..af34f40 100644 --- a/pkg/core/utils.go +++ b/pkg/core/utils.go @@ -27,7 +27,7 @@ func MaxInt(x, y int) int { return x } -func GenGeneralName(jobName string, rtype string, index string) string { - n := jobName + "-" + strings.ToLower(rtype) + "-" + index +func GenGeneralName(clusterName string, rtype string, index string) string { + n := clusterName + "-" + strings.ToLower(rtype) + "-" + index return strings.Replace(n, "/", "-", -1) } diff --git a/pkg/util/cluster.go b/pkg/util/cluster.go deleted file mode 100644 index c2c448b..0000000 --- a/pkg/util/cluster.go +++ /dev/null @@ -1,10 +0,0 @@ -package util - -import ( - kubeclusterorgv1alpha1 "github.com/chriskery/kubecluster/apis/kubecluster.org/v1alpha1" - "k8s.io/utils/pointer" -) - -func IsClusterSuspended(runPolicy *kubeclusterorgv1alpha1.RunPolicy) bool { - return runPolicy != nil && pointer.BoolDeref(runPolicy.Suspend, false) -} diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index 17ccede..afb52f1 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -124,16 +124,16 @@ func FilterPodCount(pods []*v1.Pod, phase v1.PodPhase) int32 { } func GetTotalReplicas(replicas map[v1alpha1.ReplicaType]*v1alpha1.ReplicaSpec) int32 { - jobReplicas := int32(0) + clusterReplicas := int32(0) for _, r := range replicas { if r.Replicas != nil { - jobReplicas += *r.Replicas + clusterReplicas += *r.Replicas } else { // If unspecified, defaults to 1. 
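+			// e.g. replica specs of {Replicas: 2} and {Replicas: nil} contribute a total of 3.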
- jobReplicas += 1 + clusterReplicas += 1 } } - return jobReplicas + return clusterReplicas } func GetTotalFailedReplicas(replicas map[v1alpha1.ReplicaType]*v1alpha1.ReplicaStatus) int32 { diff --git a/pkg/util/labels/labels.go b/pkg/util/labels/labels.go index de519bd..e9dc739 100644 --- a/pkg/util/labels/labels.go +++ b/pkg/util/labels/labels.go @@ -41,23 +41,10 @@ func GenReplicaTypeLabel(rtype v1alpha1.ReplicaType) string { return strings.ToLower(string(rtype)) } -func ReplicaType(labels map[string]string) (v1alpha1.ReplicaType, error) { - v, ok := labels[v1alpha1.ReplicaTypeLabel] - if !ok { - return "", errors.New("replica type label not found") - } - return v1alpha1.ReplicaType(v), nil -} - func SetReplicaType(labels map[string]string, rt string) { labels[v1alpha1.ReplicaTypeLabel] = rt } -func HasKnownLabels(labels map[string]string, groupName string) bool { - _, has := labels[v1alpha1.ControllerNameLabel] - return has -} - func SetClusterRole(labels map[string]string, role string) { labels[v1alpha1.ClusterRoleLabel] = role } diff --git a/pkg/util/logger.go b/pkg/util/logger.go index f691915..49e0b33 100644 --- a/pkg/util/logger.go +++ b/pkg/util/logger.go @@ -24,52 +24,52 @@ import ( metav1unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" ) -func LoggerForReplica(job metav1.Object, rtype kubeclusterorgv1alpha1.ReplicaType) *log.Entry { +func LoggerForReplica(cluster metav1.Object, rtype kubeclusterorgv1alpha1.ReplicaType) *log.Entry { return log.WithFields(log.Fields{ - // We use job to match the key used in controller.go + // We use cluster to match the key used in controller.go // Its more common in K8s to use a period to indicate namespace.name. So that's what we use. - "job": job.GetNamespace() + "." + job.GetName(), - "uid": job.GetUID(), + "cluster": cluster.GetNamespace() + "." + cluster.GetName(), + "uid": cluster.GetUID(), "replica-type": rtype, }) } -func LoggerForCluster(job metav1.Object) *log.Entry { +func LoggerForCluster(cluster metav1.Object) *log.Entry { return log.WithFields(log.Fields{ - // We use job to match the key used in controller.go + // We use cluster to match the key used in controller.go // Its more common in K8s to use a period to indicate namespace.name. So that's what we use. - "job": job.GetNamespace() + "." + job.GetName(), - "uid": job.GetUID(), + "cluster": cluster.GetNamespace() + "." + cluster.GetName(), + "uid": cluster.GetUID(), }) } func LoggerForPod(pod *v1.Pod, kind string) *log.Entry { - job := "" + cluster := "" if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil { if controllerRef.Kind == kind { - job = pod.Namespace + "." + controllerRef.Name + cluster = pod.Namespace + "." + controllerRef.Name } } return log.WithFields(log.Fields{ - // We use job to match the key used in controller.go + // We use cluster to match the key used in controller.go // In controller.go we log the key used with the workqueue. - "job": job, - "pod": pod.Namespace + "." + pod.Name, - "uid": pod.ObjectMeta.UID, + "cluster": cluster, + "pod": pod.Namespace + "." + pod.Name, + "uid": pod.ObjectMeta.UID, }) } func LoggerForService(svc *v1.Service, kind string) *log.Entry { - job := "" + cluster := "" if controllerRef := metav1.GetControllerOf(svc); controllerRef != nil { if controllerRef.Kind == kind { - job = svc.Namespace + "." + controllerRef.Name + cluster = svc.Namespace + "." 
+ controllerRef.Name } } return log.WithFields(log.Fields{ - // We use job to match the key used in controller.go + // We use cluster to match the key used in controller.go // In controller.go we log the key used with the workqueue. - "job": job, + "cluster": cluster, "service": svc.Namespace + "." + svc.Name, "uid": svc.ObjectMeta.UID, }) @@ -83,7 +83,7 @@ func LoggerForConfigMap(cm *v1.ConfigMap, kind string) *log.Entry { } } return log.WithFields(log.Fields{ - // We use job to match the key used in controller.go + // We use cluster to match the key used in controller.go // In controller.go we log the key used with the workqueue. "kcluster": kcluster, "configMap": cm.Namespace + "." + cm.Name, @@ -95,19 +95,19 @@ func LoggerForKey(key string) *log.Entry { return log.WithFields(log.Fields{ // The key used by the workQueue should be namespace + "/" + name. // Its more common in K8s to use a period to indicate namespace.name. So that's what we use. - "job": strings.Replace(key, "/", ".", -1), + "cluster": strings.Replace(key, "/", ".", -1), }) } func LoggerForUnstructured(obj *metav1unstructured.Unstructured, kind string) *log.Entry { - job := "" + cluster := "" if obj.GetKind() == kind { - job = obj.GetNamespace() + "." + obj.GetName() + cluster = obj.GetNamespace() + "." + obj.GetName() } return log.WithFields(log.Fields{ - // We use job to match the key used in controller.go + // We use cluster to match the key used in controller.go // In controller.go we log the key used with the workqueue. - "job": job, - "uid": obj.GetUID(), + "cluster": cluster, + "uid": obj.GetUID(), }) } diff --git a/pkg/util/status.go b/pkg/util/status.go index 9ea1ed4..c1fc5d8 100644 --- a/pkg/util/status.go +++ b/pkg/util/status.go @@ -28,7 +28,7 @@ func NewReason(kind, reason string) string { return fmt.Sprintf("%s%s", kind, reason) } -// IsFinished checks if the job is succeeded or failed +// IsFinished checks if the cluster is succeeded or failed func IsFinished(status v1alpha1.ClusterStatus) bool { return IsFailed(status) } diff --git a/pkg/util/status_test.go b/pkg/util/status_test.go new file mode 100644 index 0000000..889d590 --- /dev/null +++ b/pkg/util/status_test.go @@ -0,0 +1,152 @@ +package util + +import ( + "github.com/chriskery/kubecluster/apis/kubecluster.org/v1alpha1" + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" +) + +func TestIsFinished(t *testing.T) { + cases := map[string]struct { + ClusterStatus v1alpha1.ClusterStatus + want bool + }{ + "Running Cluster": { + ClusterStatus: v1alpha1.ClusterStatus{ + Conditions: []v1alpha1.ClusterCondition{ + { + Type: v1alpha1.ClusterRunning, + Status: corev1.ConditionTrue, + }, + }, + }, + want: false, + }, + "Failed Cluster": { + ClusterStatus: v1alpha1.ClusterStatus{ + Conditions: []v1alpha1.ClusterCondition{ + { + Type: v1alpha1.ClusterFailed, + Status: corev1.ConditionTrue, + }, + }, + }, + want: true, + }, + "Suspended Cluster": { + ClusterStatus: v1alpha1.ClusterStatus{ + Conditions: []v1alpha1.ClusterCondition{ + { + Type: v1alpha1.ClusterSuspended, + Status: corev1.ConditionTrue, + }, + }, + }, + want: false, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + got := IsFinished(tc.ClusterStatus) + if tc.want != got { + t.Errorf("Unexpected result from IsFinished() \nwant: %v, got: %v\n", tc.want, got) + } + }) + } +} + +func TestIsFailed(t *testing.T) { + ClusterStatus := v1alpha1.ClusterStatus{ + Conditions: []v1alpha1.ClusterCondition{ + { + Type: v1alpha1.ClusterFailed, + Status: 
corev1.ConditionTrue, + }, + }, + } + assert.True(t, IsFailed(ClusterStatus)) +} + +func TestIsRunning(t *testing.T) { + ClusterStatus := v1alpha1.ClusterStatus{ + Conditions: []v1alpha1.ClusterCondition{ + { + Type: v1alpha1.ClusterRunning, + Status: corev1.ConditionTrue, + }, + }, + } + assert.True(t, IsRunning(ClusterStatus)) +} + +func TestIsSuspended(t *testing.T) { + ClusterStatus := v1alpha1.ClusterStatus{ + Conditions: []v1alpha1.ClusterCondition{ + { + Type: v1alpha1.ClusterSuspended, + Status: corev1.ConditionTrue, + }, + }, + } + assert.True(t, IsSuspended(ClusterStatus)) +} + +func TestUpdateClusterConditions(t *testing.T) { + ClusterStatus := v1alpha1.ClusterStatus{} + conditionType := v1alpha1.ClusterCreated + reason := "Cluster Created" + message := "Cluster Created" + + UpdateClusterConditions(&ClusterStatus, conditionType, corev1.ConditionTrue, reason, message) + // Check ClusterCreated condition is appended + conditionInStatus := ClusterStatus.Conditions[0] + assert.Equal(t, conditionInStatus.Type, conditionType) + assert.Equal(t, conditionInStatus.Reason, reason) + assert.Equal(t, conditionInStatus.Message, message) + + conditionType = v1alpha1.ClusterRunning + reason = "Cluster Running" + message = "Cluster Running" + UpdateClusterConditions(&ClusterStatus, conditionType, corev1.ConditionTrue, reason, message) + // Check ClusterRunning condition is appended + conditionInStatus = ClusterStatus.Conditions[1] + assert.Equal(t, conditionInStatus.Type, conditionType) + assert.Equal(t, conditionInStatus.Reason, reason) + assert.Equal(t, conditionInStatus.Message, message) + + conditionType = v1alpha1.ClusterRestarting + reason = "Cluster Restarting" + message = "Cluster Restarting" + UpdateClusterConditions(&ClusterStatus, conditionType, corev1.ConditionTrue, reason, message) + // Check ClusterRunning condition is filtered out and ClusterRestarting state is appended + conditionInStatus = ClusterStatus.Conditions[1] + assert.Equal(t, conditionInStatus.Type, conditionType) + assert.Equal(t, conditionInStatus.Reason, reason) + assert.Equal(t, conditionInStatus.Message, message) + + conditionType = v1alpha1.ClusterRunning + reason = "Cluster Running" + message = "Cluster Running" + UpdateClusterConditions(&ClusterStatus, conditionType, corev1.ConditionTrue, reason, message) + // Again, Check ClusterRestarting condition is filtered and ClusterRestarting is appended + conditionInStatus = ClusterStatus.Conditions[1] + assert.Equal(t, conditionInStatus.Type, conditionType) + assert.Equal(t, conditionInStatus.Reason, reason) + assert.Equal(t, conditionInStatus.Message, message) + + conditionType = v1alpha1.ClusterFailed + reason = "Cluster Failed" + message = "Cluster Failed" + UpdateClusterConditions(&ClusterStatus, conditionType, corev1.ConditionTrue, reason, message) + // Check ClusterRunning condition is set to false + ClusterRunningCondition := ClusterStatus.Conditions[1] + assert.Equal(t, ClusterRunningCondition.Type, v1alpha1.ClusterRunning) + assert.Equal(t, ClusterRunningCondition.Status, corev1.ConditionFalse) + // Check ClusterFailed state is appended + conditionInStatus = ClusterStatus.Conditions[2] + assert.Equal(t, conditionInStatus.Type, conditionType) + assert.Equal(t, conditionInStatus.Reason, reason) + assert.Equal(t, conditionInStatus.Message, message) +} diff --git a/pkg/util/testutil/constants.go b/pkg/util/testutil/constants.go new file mode 100644 index 0000000..39616b5 --- /dev/null +++ b/pkg/util/testutil/constants.go @@ -0,0 +1,18 @@ +package testutil + 
+import ( + "github.com/chriskery/kubecluster/apis/kubecluster.org/v1alpha1" + "time" + + "github.com/google/go-cmp/cmp/cmpopts" +) + +const ( + Timeout = 30 * time.Second + Interval = 250 * time.Millisecond + ConsistentDuration = 3 * time.Second +) + +var ( + IgnoreClusterConditionsTimes = cmpopts.IgnoreFields(v1alpha1.ClusterCondition{}, "LastUpdateTime", "LastTransitionTime") +)
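TestUpdateClusterConditions above pins down the condition bookkeeping: newly set conditions are appended, and marking the cluster Failed flips an existing Running condition to `ConditionFalse`. A minimal illustration of that invariant, not the project's `UpdateClusterConditions` implementation; `markFailed` is a hypothetical helper used only to show the expected state transitions.

```go
package main

import (
	"fmt"

	"github.com/chriskery/kubecluster/apis/kubecluster.org/v1alpha1"
	corev1 "k8s.io/api/core/v1"
)

// markFailed mirrors the invariant asserted above: appending a Failed
// condition flips any existing Running condition to ConditionFalse.
func markFailed(status *v1alpha1.ClusterStatus, reason, message string) {
	for i := range status.Conditions {
		if status.Conditions[i].Type == v1alpha1.ClusterRunning {
			status.Conditions[i].Status = corev1.ConditionFalse
		}
	}
	status.Conditions = append(status.Conditions, v1alpha1.ClusterCondition{
		Type:    v1alpha1.ClusterFailed,
		Status:  corev1.ConditionTrue,
		Reason:  reason,
		Message: message,
	})
}

func main() {
	status := &v1alpha1.ClusterStatus{
		Conditions: []v1alpha1.ClusterCondition{
			{Type: v1alpha1.ClusterRunning, Status: corev1.ConditionTrue},
		},
	}
	markFailed(status, "Cluster Failed", "Cluster Failed")
	fmt.Println(status.Conditions[0].Status) // the Running condition is now False
	fmt.Println(status.Conditions[1].Type)   // the Failed condition was appended
}
```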
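The `testutil` constants and `IgnoreClusterConditionsTimes` are the kind of helpers typically fed to Gomega's `Eventually`/`Consistently` and to `cmp.Diff`. A sketch of a possible consumer; the `controller_test` package name and the `fetch` closure are illustrative only, not part of this change.

```go
package controller_test

import (
	"testing"

	"github.com/chriskery/kubecluster/apis/kubecluster.org/v1alpha1"
	"github.com/chriskery/kubecluster/pkg/util/testutil"
	"github.com/google/go-cmp/cmp"
	"github.com/onsi/gomega"
)

func TestConditionsEventuallyMatch(t *testing.T) {
	g := gomega.NewWithT(t)

	want := []v1alpha1.ClusterCondition{{Type: v1alpha1.ClusterRunning}}
	fetch := func() []v1alpha1.ClusterCondition {
		// A real test would read the cluster's status from the API server here.
		return []v1alpha1.ClusterCondition{{Type: v1alpha1.ClusterRunning}}
	}

	// Poll until the observed conditions match the expected ones, ignoring the
	// volatile timestamp fields via IgnoreClusterConditionsTimes.
	g.Eventually(func() string {
		return cmp.Diff(want, fetch(), testutil.IgnoreClusterConditionsTimes)
	}, testutil.Timeout, testutil.Interval).Should(gomega.BeEmpty())
}
```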