
Commit

add unittest;
chriskery committed Oct 7, 2023
1 parent 7f02baa commit ae76eae
Showing 52 changed files with 1,817 additions and 279 deletions.
6 changes: 0 additions & 6 deletions apis/kubecluster.org/v1alpha1/cluster_defaults.go
@@ -16,7 +16,6 @@ package v1alpha1

import (
	"k8s.io/apimachinery/pkg/runtime"
-	"k8s.io/utils/pointer"
)

func addKubeClusterDefaultingFuncs(scheme *runtime.Scheme) error {
@@ -39,11 +38,6 @@ func SetDefaults_KubeCluster(kcluster *KubeCluster) {
	}
}

-func setDefaultReplicas(replicaSpec *ReplicaSpec, replicas int32) {
-	if replicaSpec != nil && replicaSpec.Replicas == nil {
-		replicaSpec.Replicas = pointer.Int32(replicas)
-	}
-}
func SetDefaults_KubeClusterList(in *KubeClusterList) {
	for i := range in.Items {
		a := &in.Items[i]
70 changes: 69 additions & 1 deletion apis/kubecluster.org/v1alpha1/cluster_defaults_test.go
@@ -1 +1,69 @@
-package v1alpha1_test
+package v1alpha1

import (
	"github.com/onsi/ginkgo/v2"
	"github.com/onsi/gomega"

	"testing"
)

func TestSetDefaults_KubeCluster(t *testing.T) {
	gomega.RegisterFailHandler(ginkgo.Fail)

	t.Run("test empty kube cluster", func(t *testing.T) {
		var replica2 int32 = 2
		kcluster := &KubeCluster{Spec: ClusterSpec{
			ClusterType: "",
			ClusterReplicaSpec: map[ReplicaType]*ReplicaSpec{
				"replica1": {}, "replica2": {Replicas: &replica2}},
			MainContainer: "",
			RunPolicy:     RunPolicy{},
		}}
		SetDefaults_KubeCluster(kcluster)

		gomega.Expect(kcluster.Spec.MainContainer).To(gomega.Equal(ClusterDefaultContainerName))
		gomega.Expect(*kcluster.Spec.RunPolicy.CleanKubeNodePolicy).To(gomega.Equal(CleanKubeNodePolicyAll))

		gomega.Expect(*kcluster.Spec.ClusterReplicaSpec["replica1"].Replicas).To(gomega.Equal(int32(1)))
		gomega.Expect(*kcluster.Spec.ClusterReplicaSpec["replica2"].Replicas).To(gomega.Equal(int32(2)))
	})
}

func TestSetDefaults_KubeClusterList(t *testing.T) {
	gomega.RegisterFailHandler(ginkgo.Fail)

	var replica2 int32 = 2
	testCount := 10
	kclusterList := &KubeClusterList{Items: make([]KubeCluster, 0)}
	for i := 0; i < testCount; i++ {
		kcluster := KubeCluster{Spec: ClusterSpec{
			ClusterType: "",
			ClusterReplicaSpec: map[ReplicaType]*ReplicaSpec{
				"replica1": {}, "replica2": {Replicas: &replica2}},
			MainContainer: "",
			RunPolicy:     RunPolicy{},
		}}
		kclusterList.Items = append(kclusterList.Items, kcluster)
	}
	SetDefaults_KubeClusterList(kclusterList)
	for i := 0; i < testCount; i++ {
		t.Run("test empty kube cluster", func(t *testing.T) {
			kcluster := kclusterList.Items[i]
			gomega.Expect(kcluster.Spec.MainContainer).To(gomega.Equal(ClusterDefaultContainerName))
			gomega.Expect(*kcluster.Spec.RunPolicy.CleanKubeNodePolicy).To(gomega.Equal(CleanKubeNodePolicyAll))

			gomega.Expect(*kcluster.Spec.ClusterReplicaSpec["replica1"].Replicas).To(gomega.Equal(int32(1)))
			gomega.Expect(*kcluster.Spec.ClusterReplicaSpec["replica2"].Replicas).To(gomega.Equal(int32(2)))
		})
	}
}

func Test_setDefaultReplicas(t *testing.T) {
	gomega.RegisterFailHandler(ginkgo.Fail)

	t.Run("test replicaSpec not set replica", func(t *testing.T) {
		replicaSpec := &ReplicaSpec{}
		setDefaultReplicas(replicaSpec, 1)
		gomega.Expect(*replicaSpec.Replicas).To(gomega.Equal(int32(1)))
	})
}
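Side note: since these are plain `testing` subtests rather than a ginkgo suite, an alternative to `RegisterFailHandler(ginkgo.Fail)` is binding gomega to the subtest's `*testing.T` via `gomega.NewWithT`. A minimal sketch reusing the types from this diff (the test name is hypothetical, not part of the commit):

```go
func TestSetDefaults_KubeCluster_Sketch(t *testing.T) {
	g := gomega.NewWithT(t) // assertion failures are reported through t directly

	kcluster := &KubeCluster{Spec: ClusterSpec{
		ClusterReplicaSpec: map[ReplicaType]*ReplicaSpec{"replica1": {}},
	}}
	SetDefaults_KubeCluster(kcluster)

	// With no explicit replica count, defaulting should set 1.
	g.Expect(*kcluster.Spec.ClusterReplicaSpec["replica1"].Replicas).To(gomega.Equal(int32(1)))
}
```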
4 changes: 2 additions & 2 deletions apis/kubecluster.org/v1alpha1/cluster_types.go
@@ -58,7 +58,7 @@ type ClusterSpec struct {
	MainContainer string `json:"mainContainer,omitempty"`

	// `RunPolicy` encapsulates various runtime policies of the distributed training
-	// job, for example how to clean up resources and how long the job can stay
+	// cluster, for example how to clean up resources and how long the cluster can stay
	// active.
	RunPolicy RunPolicy `json:"runPolicy,omitempty"`
}
@@ -82,7 +82,7 @@ type ClusterStatus struct {
	// It is represented in RFC3339 form and is in UTC.
	LastReconcileTime *metav1.Time `json:"lastReconcileTime,omitempty"`

-	// Represents time when the job was completed. It is not guaranteed to
+	// Represents time when the cluster was completed. It is not guaranteed to
	// be set in happens-before order across separate operations.
	// It is represented in RFC3339 form and is in UTC.
	CompletionTime *metav1.Time `json:"completionTime,omitempty"`
6 changes: 3 additions & 3 deletions apis/kubecluster.org/v1alpha1/cluster_validation.go
@@ -9,8 +9,8 @@ import (
)

func ValidateV1alphaCluster(cluster *KubeCluster) error {
-	if errors := apimachineryvalidation.NameIsDNS1035Label(cluster.ObjectMeta.Name, false); errors != nil {
-		return fmt.Errorf("TFCluster name is invalid: %v", errors)
+	if errs := apimachineryvalidation.NameIsDNS1035Label(cluster.ObjectMeta.Name, false); errs != nil {
+		return fmt.Errorf("TFCluster name is invalid: %v", errs)
	}
	if err := validateV1alphaClusterSpecs(cluster.Spec); err != nil {
		return err
@@ -22,7 +22,7 @@ func validateV1alphaClusterSpecs(spec ClusterSpec) error {
	if len(spec.ClusterType) == 0 {
		return fmt.Errorf("KubeCluster is not valid: cluster type expected")
	}
-	if spec.ClusterReplicaSpec == nil {
+	if len(spec.ClusterReplicaSpec) == 0 {
		return fmt.Errorf("KubeCluster is not valid")
	}
	defaultContainerName := ClusterDefaultContainerName
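The `== nil` → `len(...) == 0` change is worth a note: in Go, `len` of a nil map is 0, so the new check rejects both a missing map and one that was deserialized as present-but-empty. A self-contained sketch of the distinction:

```go
package main

import "fmt"

func main() {
	var unset map[string]int  // nil: the field was never set
	empty := map[string]int{} // non-nil, but has no entries

	fmt.Println(unset == nil, len(unset)) // true 0
	fmt.Println(empty == nil, len(empty)) // false 0

	// The old `== nil` check would have accepted `empty`, letting a
	// cluster with zero replica specs slip through validation.
}
```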
175 changes: 175 additions & 0 deletions apis/kubecluster.org/v1alpha1/cluster_validation_test.go
@@ -0,0 +1,175 @@
package v1alpha1

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/pointer"
	"testing"
)

func TestValidateV1alphaCluster(t *testing.T) {
	validClusterSpec := ClusterSpec{
		ClusterType: "test",
		ClusterReplicaSpec: map[ReplicaType]*ReplicaSpec{"test": {
			Replicas: pointer.Int32(1),
			Template: ReplicaTemplate{
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{{
						Name:  ClusterDefaultContainerName,
						Image: "centos",
					}},
				},
			},
		}},
	}
	type args struct {
		cluster *KubeCluster
	}
	tests := []struct {
		name    string
		args    args
		wantErr bool
	}{
		{
			"valid kubeCluster",
			args{
				&KubeCluster{
					ObjectMeta: metav1.ObjectMeta{Name: "test"},
					Spec:       validClusterSpec,
				},
			},
			false,
		},
		{
			"kubeCluster name does not meet DNS1035",
			args{
				&KubeCluster{
					ObjectMeta: metav1.ObjectMeta{Name: "0-test"},
					Spec:       validClusterSpec,
				},
			},
			true,
		},
		{
			"cluster type is empty",
			args{
				&KubeCluster{
					ObjectMeta: metav1.ObjectMeta{Name: "test"},
					Spec:       ClusterSpec{ClusterType: ""},
				},
			},
			true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if err := ValidateV1alphaCluster(tt.args.cluster); (err != nil) != tt.wantErr {
				t.Errorf("ValidateV1alphaCluster() error = %v, wantErr %v", err, tt.wantErr)
			}
		})
	}
}

func Test_validateV1alphaClusterSpecs(t *testing.T) {
	type args struct {
		spec ClusterSpec
	}
	tests := []struct {
		name    string
		args    args
		wantErr bool
	}{
		{
			"replica specs are empty",
			args{
				ClusterSpec{
					ClusterType:        "test",
					ClusterReplicaSpec: make(map[ReplicaType]*ReplicaSpec),
				},
			},
			true,
		},
		{
			"no containers",
			args{
				ClusterSpec{
					ClusterType: "test",
					ClusterReplicaSpec: map[ReplicaType]*ReplicaSpec{"test": {
						Replicas: pointer.Int32(1),
						Template: ReplicaTemplate{
							Spec: corev1.PodSpec{},
						},
					}},
				},
			},
			true,
		},
		{
			"main container name not present",
			args{
				ClusterSpec{
					ClusterType: "test",
					ClusterReplicaSpec: map[ReplicaType]*ReplicaSpec{"test": {
						Replicas: pointer.Int32(1),
						Template: ReplicaTemplate{
							Spec: corev1.PodSpec{
								Containers: []corev1.Container{{
									Image: "centos",
								}},
							},
						},
					}},
				},
			},
			true,
		},
		{
			"main container name not consistent",
			args{
				ClusterSpec{
					ClusterType:   "test",
					MainContainer: "testmain",
					ClusterReplicaSpec: map[ReplicaType]*ReplicaSpec{"test": {
						Replicas: pointer.Int32(1),
						Template: ReplicaTemplate{
							Spec: corev1.PodSpec{
								Containers: []corev1.Container{{
									Name:  ClusterDefaultContainerName,
									Image: "centos",
								}},
							},
						},
					}},
				},
			},
			true,
		},
		{
			"image is empty",
			args{
				ClusterSpec{
					ClusterType: "test",
					ClusterReplicaSpec: map[ReplicaType]*ReplicaSpec{"test": {
						Replicas: pointer.Int32(1),
						Template: ReplicaTemplate{
							Spec: corev1.PodSpec{
								Containers: []corev1.Container{{
									Name: ClusterDefaultContainerName,
								}},
							},
						},
					}},
				},
			},
			true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if err := validateV1alphaClusterSpecs(tt.args.spec); (err != nil) != tt.wantErr {
				t.Errorf("validateV1alphaClusterSpecs() error = %v, wantErr %v", err, tt.wantErr)
			}
		})
	}
}
2 changes: 1 addition & 1 deletion apis/kubecluster.org/v1alpha1/common_types.go
@@ -72,7 +72,7 @@ type ReplicaSpec struct {
	// will be overridden by RestartPolicy in ReplicaSpec
	Template ReplicaTemplate `json:"template"`

-	// Restart policy for all replicas within the job.
+	// Restart policy for all replicas within the cluster.
	// One of Always, OnFailure, Never and ExitCode.
	// Default to Never.
	RestartPolicy RestartPolicy `json:"restartPolicy,omitempty"`
2 changes: 1 addition & 1 deletion apis/kubecluster.org/v1alpha1/defauting_utils.go
@@ -42,7 +42,7 @@ func SetDefaultRestartPolicy(replicaSpec *ReplicaSpec, defaultRestartPolicy RestartPolicy) {
	}
}

-func SetDefaultReplicas(replicaSpec *ReplicaSpec, replicas int32) {
+func setDefaultReplicas(replicaSpec *ReplicaSpec, replicas int32) {
	if replicaSpec != nil && replicaSpec.Replicas == nil {
		replicaSpec.Replicas = pointer.Int32(replicas)
	}
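For context, the `*int32` replica field follows the usual Kubernetes convention: a pointer distinguishes "unset" from an explicit 0, so defaulting fires only on nil and a user-supplied zero survives. A standalone sketch (the struct is a stand-in for `ReplicaSpec`, not the package's actual type):

```go
package main

import (
	"fmt"

	"k8s.io/utils/pointer"
)

type replicaSpec struct {
	Replicas *int32
}

func setDefaultReplicas(rs *replicaSpec, replicas int32) {
	if rs != nil && rs.Replicas == nil {
		rs.Replicas = pointer.Int32(replicas)
	}
}

func main() {
	unset := &replicaSpec{}                          // user omitted replicas
	zero := &replicaSpec{Replicas: pointer.Int32(0)} // user explicitly chose 0

	setDefaultReplicas(unset, 1)
	setDefaultReplicas(zero, 1)

	fmt.Println(*unset.Replicas, *zero.Replicas) // 1 0: the explicit 0 is preserved
}
```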
4 changes: 2 additions & 2 deletions cmd/main.go
@@ -67,7 +67,7 @@ func main() {
	flag.BoolVar(&enableLeaderElection, "leader-elect", false,
		"Enable leader election for controller manager. "+
			"Enabling this will ensure there is only one active controller manager.")
-	flag.Var(&enabledSchemes, "enable-scheme", "Enable scheme(s) as --enable-scheme=tfjob --enable-scheme=pytorchjob, case insensitive."+
+	flag.Var(&enabledSchemes, "enable-scheme", "Enable scheme(s) as --enable-scheme=tfcluster --enable-scheme=pytorchcluster, case insensitive."+
		" Now supporting TFCluster, PyTorchCluster, MXNetCluster, XGBoostCluster, PaddleCluster. By default, all supported schemes will be enabled.")
	flag.IntVar(&controllerThreads, "controller-threads", 1, "Number of worker threads used by the controller.")
	flag.StringVar(&gangSchedulerName, "gang-scheduler-name", "", "Now Supporting volcano and scheduler-plugins."+
@@ -137,7 +137,7 @@ func setupController(mgr ctrl.Manager, enabledSchemes cluster_schema.EnabledSchemes

	reconciler := controller.NewReconciler(mgr, gangSchedulingSetupFunc)

-	// TODO: We need a general manager. all rest ctrlutil addsToManager
+	// TODO: We need a general manager. all rest util addsToManager
	// Based on the user configuration, we start different controllers
	if enabledSchemes.Empty() {
		enabledSchemes.FillAll()
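`flag.Var` works here because `enabledSchemes` implements the standard `flag.Value` interface, which is what lets `--enable-scheme` be passed repeatedly. A minimal sketch of that pattern (the `schemeList` type is hypothetical, not the project's actual implementation):

```go
package main

import (
	"flag"
	"fmt"
	"strings"
)

// schemeList accumulates every occurrence of the flag.
type schemeList []string

func (s *schemeList) String() string { return strings.Join(*s, ",") }

func (s *schemeList) Set(v string) error {
	*s = append(*s, strings.ToLower(v)) // case insensitive, per the help text
	return nil
}

func main() {
	var schemes schemeList
	flag.Var(&schemes, "enable-scheme", "may be repeated")
	flag.Parse() // e.g. --enable-scheme=tfcluster --enable-scheme=pytorchcluster
	fmt.Println(schemes)
}
```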
6 changes: 3 additions & 3 deletions docs/api/kubecluster.org_v1alpha1_generated.asciidoc
@@ -82,7 +82,7 @@ ClusterSpec defines the desired state of KubeCluster
| *`clusterType`* __ClusterType__ | ClusterType define the type of the cluster to be created
| *`clusterReplicaSpec`* __object (keys:xref:{anchor_prefix}-github-com-chriskery-kubecluster-apis-kubecluster-org-v1alpha1-replicatype[$$ReplicaType$$], values:xref:{anchor_prefix}-github-com-chriskery-kubecluster-apis-kubecluster-org-v1alpha1-replicaspec[$$ReplicaSpec$$])__ | ClusterType define the template of the cluster to be created
| *`mainContainer`* __string__ | MainContainer specifies name of the main container which run as kubenode.
-| *`runPolicy`* __xref:{anchor_prefix}-github-com-chriskery-kubecluster-apis-kubecluster-org-v1alpha1-runpolicy[$$RunPolicy$$]__ | `RunPolicy` encapsulates various runtime policies of the distributed training job, for example how to clean up resources and how long the job can stay active.
+| *`runPolicy`* __xref:{anchor_prefix}-github-com-chriskery-kubecluster-apis-kubecluster-org-v1alpha1-runpolicy[$$RunPolicy$$]__ | `RunPolicy` encapsulates various runtime policies of the distributed training cluster, for example how to clean up resources and how long the cluster can stay active.
|===


@@ -103,7 +103,7 @@ ClusterStatus defines the observed state of KubeCluster
| *`replicaStatuses`* __object (keys:xref:{anchor_prefix}-github-com-chriskery-kubecluster-apis-kubecluster-org-v1alpha1-replicatype[$$ReplicaType$$], values:xref:{anchor_prefix}-github-com-chriskery-kubecluster-apis-kubecluster-org-v1alpha1-replicastatus[$$ReplicaStatus$$])__ | ReplicaStatuses is map of ReplicaType and ReplicaStatus, specifies the status of each replica.
| *`startTime`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#time-v1-meta[$$Time$$]__ | Represents time when the KubeCluster was acknowledged by the KubeCluster controller. It is not guaranteed to be set in happens-before order across separate operations. It is represented in RFC3339 form and is in UTC.
| *`lastReconcileTime`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#time-v1-meta[$$Time$$]__ | Represents last time when the KubeCluster was reconciled. It is not guaranteed to be set in happens-before order across separate operations. It is represented in RFC3339 form and is in UTC.
-| *`completionTime`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#time-v1-meta[$$Time$$]__ | Represents time when the job was completed. It is not guaranteed to be set in happens-before order across separate operations. It is represented in RFC3339 form and is in UTC.
+| *`completionTime`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#time-v1-meta[$$Time$$]__ | Represents time when the cluster was completed. It is not guaranteed to be set in happens-before order across separate operations. It is represented in RFC3339 form and is in UTC.
|===


@@ -164,7 +164,7 @@ ReplicaSpec is a description of the replica
| Field | Description
| *`replicas`* __integer__ | Replicas is the desired number of replicas of the given template. If unspecified, defaults to 1.
| *`template`* __xref:{anchor_prefix}-github-com-chriskery-kubecluster-apis-kubecluster-org-v1alpha1-replicatemplate[$$ReplicaTemplate$$]__ | Template is the object that describes the pod that will be created for this replica. RestartPolicy in PodTemplateSpec will be overridden by RestartPolicy in ReplicaSpec
-| *`restartPolicy`* __xref:{anchor_prefix}-github-com-chriskery-kubecluster-apis-kubecluster-org-v1alpha1-restartpolicy[$$RestartPolicy$$]__ | Restart policy for all replicas within the job. One of Always, OnFailure, Never and ExitCode. Default to Never.
+| *`restartPolicy`* __xref:{anchor_prefix}-github-com-chriskery-kubecluster-apis-kubecluster-org-v1alpha1-restartpolicy[$$RestartPolicy$$]__ | Restart policy for all replicas within the cluster. One of Always, OnFailure, Never and ExitCode. Default to Never.
|===


6 changes: 3 additions & 3 deletions docs/quick-start.md
@@ -86,21 +86,21 @@ pbspro-centos-sample-cpu-0
sharing = default_shared
last_state_change_time = Thu Sep 28 07:05:43 2023
```
-Switch to the normal user and submit the job using [qsub](https://www.jlab.org/hpc/PBS/qsub.html)
+Switch to the normal user and submit the cluster using [qsub](https://www.jlab.org/hpc/PBS/qsub.html)
```shell
[root@pbspro-centos-sample-server-0 /]# useradd pbsexample
[root@pbspro-centos-sample-server-0 /]# su pbsexample
[pbsexample@pbspro-centos-sample-server-0 /]$ qsub -- hostname
2.pbspro-centos-sample-server-0
[pbsexample@pbspro-centos-sample-server-0 /]$
```
-Use [qstat](https://docs.adaptivecomputing.com/torque/4-0-2/Content/topics/commands/qstat.htm) to view the job we just submitted
+Use [qstat](https://docs.adaptivecomputing.com/torque/4-0-2/Content/topics/commands/qstat.htm) to view the cluster we just submitted
```
[pbsexample@pbspro-centos-sample-server-0 /]$ qstat -a
pbspro-centos-sample-server-0:
                                                            Req'd  Req'd   Elap
-Job ID          Username Queue    Jobname    SessID NDS TSK Memory Time  S Time
+cluster ID      Username Queue    clustername SessID NDS TSK Memory Time  S Time
--------------- -------- -------- ---------- ------ --- --- ------ ----- - -----
2.pbspro-centos pbsexamp workq    STDIN        1377   1   1    --    --  E 00:00
```
