Skip to content

Commit

Permalink
feat: Support setting ParallelPodManagement concurrency (#7879)
Browse files Browse the repository at this point in the history
Signed-off-by: Liang Deng <283304489@qq.com>
  • Loading branch information
YTGhost committed Jul 26, 2024
1 parent 220ede3 commit daeb239
Show file tree
Hide file tree
Showing 24 changed files with 412 additions and 83 deletions.
8 changes: 8 additions & 0 deletions apis/apps/v1alpha1/cluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"

dpv1alpha1 "github.com/apecloud/kubeblocks/apis/dataprotection/v1alpha1"
workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1"
Expand Down Expand Up @@ -783,6 +784,13 @@ type ClusterComponentSpec struct {
// +optional
UpdateStrategy *UpdateStrategy `json:"updateStrategy,omitempty"`

// Controls the concurrency of pods during initial scale up, when replacing pods on nodes,
// or when scaling down. It only used when `PodManagementPolicy` is set to `Parallel`.
// The default Concurrency is 100%.
//
// +optional
ParallelPodManagementConcurrency *intstr.IntOrString `json:"parallelPodManagementConcurrency,omitempty"`

// PodUpdatePolicy indicates how pods should be updated
//
// - `StrictInPlace` indicates that only allows in-place upgrades.
Expand Down
8 changes: 8 additions & 0 deletions apis/apps/v1alpha1/component_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package v1alpha1
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"

workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1"
)
Expand Down Expand Up @@ -172,6 +173,13 @@ type ComponentSpec struct {
// +optional
ServiceAccountName string `json:"serviceAccountName,omitempty"`

// Controls the concurrency of pods during initial scale up, when replacing pods on nodes,
// or when scaling down. It only used when `PodManagementPolicy` is set to `Parallel`.
// The default Concurrency is 100%.
//
// +optional
ParallelPodManagementConcurrency *intstr.IntOrString `json:"parallelPodManagementConcurrency,omitempty"`

// PodUpdatePolicy indicates how pods should be updated
//
// - `StrictInPlace` indicates that only allows in-place upgrades.
Expand Down
11 changes: 11 additions & 0 deletions apis/apps/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions apis/workloads/v1alpha1/instanceset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)

// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
Expand Down Expand Up @@ -304,6 +305,13 @@ type InstanceSetSpec struct {
// +optional
PodManagementPolicy appsv1.PodManagementPolicyType `json:"podManagementPolicy,omitempty"`

// Controls the concurrency of pods during initial scale up, when replacing pods on nodes,
// or when scaling down. It only used when `PodManagementPolicy` is set to `Parallel`.
// The default Concurrency is 100%.
//
// +optional
ParallelPodManagementConcurrency *intstr.IntOrString `json:"parallelPodManagementConcurrency,omitempty"`

// PodUpdatePolicy indicates how pods should be updated
//
// - `StrictInPlace` indicates that only allows in-place upgrades.
Expand Down
6 changes: 6 additions & 0 deletions apis/workloads/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions config/crd/bases/apps.kubeblocks.io_clusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3808,6 +3808,15 @@ spec:
items:
type: string
type: array
parallelPodManagementConcurrency:
anyOf:
- type: integer
- type: string
description: |-
Controls the concurrency of pods during initial scale up, when replacing pods on nodes,
or when scaling down. It only used when `PodManagementPolicy` is set to `Parallel`.
The default Concurrency is 100%.
x-kubernetes-int-or-string: true
podUpdatePolicy:
description: |-
PodUpdatePolicy indicates how pods should be updated
Expand Down Expand Up @@ -10366,6 +10375,15 @@ spec:
items:
type: string
type: array
parallelPodManagementConcurrency:
anyOf:
- type: integer
- type: string
description: |-
Controls the concurrency of pods during initial scale up, when replacing pods on nodes,
or when scaling down. It only used when `PodManagementPolicy` is set to `Parallel`.
The default Concurrency is 100%.
x-kubernetes-int-or-string: true
podUpdatePolicy:
description: |-
PodUpdatePolicy indicates how pods should be updated
Expand Down
9 changes: 9 additions & 0 deletions config/crd/bases/apps.kubeblocks.io_components.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3615,6 +3615,15 @@ spec:
items:
type: string
type: array
parallelPodManagementConcurrency:
anyOf:
- type: integer
- type: string
description: |-
Controls the concurrency of pods during initial scale up, when replacing pods on nodes,
or when scaling down. It only used when `PodManagementPolicy` is set to `Parallel`.
The default Concurrency is 100%.
x-kubernetes-int-or-string: true
podUpdatePolicy:
description: |-
PodUpdatePolicy indicates how pods should be updated
Expand Down
9 changes: 9 additions & 0 deletions config/crd/bases/workloads.kubeblocks.io_instancesets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3899,6 +3899,15 @@ spec:
items:
type: string
type: array
parallelPodManagementConcurrency:
anyOf:
- type: integer
- type: string
description: |-
Controls the concurrency of pods during initial scale up, when replacing pods on nodes,
or when scaling down. It only used when `PodManagementPolicy` is set to `Parallel`.
The default Concurrency is 100%.
x-kubernetes-int-or-string: true
paused:
description: Indicates that the InstanceSet is paused, meaning the
reconciliation of this InstanceSet object will be paused.
Expand Down
1 change: 1 addition & 0 deletions controllers/apps/transformer_cluster_component.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ func copyAndMergeComponent(oldCompObj, newCompObj *appsv1alpha1.Component) *apps
// compObjCopy.Spec.Monitor = compProto.Spec.Monitor
compObjCopy.Spec.EnabledLogs = compProto.Spec.EnabledLogs
compObjCopy.Spec.ServiceAccountName = compProto.Spec.ServiceAccountName
compObjCopy.Spec.ParallelPodManagementConcurrency = compProto.Spec.ParallelPodManagementConcurrency
compObjCopy.Spec.PodUpdatePolicy = compProto.Spec.PodUpdatePolicy
compObjCopy.Spec.Affinity = compProto.Spec.Affinity
compObjCopy.Spec.Tolerations = compProto.Spec.Tolerations
Expand Down
1 change: 1 addition & 0 deletions controllers/apps/transformer_component_workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ func copyAndMergeITS(oldITS, newITS *workloads.InstanceSet, synthesizeComp *comp
itsObjCopy.Spec.OfflineInstances = itsProto.Spec.OfflineInstances
itsObjCopy.Spec.MinReadySeconds = itsProto.Spec.MinReadySeconds
itsObjCopy.Spec.VolumeClaimTemplates = itsProto.Spec.VolumeClaimTemplates
itsObjCopy.Spec.ParallelPodManagementConcurrency = itsProto.Spec.ParallelPodManagementConcurrency

if itsProto.Spec.UpdateStrategy.Type != "" || itsProto.Spec.UpdateStrategy.RollingUpdate != nil {
updateUpdateStrategy(itsObjCopy, itsProto)
Expand Down
18 changes: 18 additions & 0 deletions deploy/helm/crds/apps.kubeblocks.io_clusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3808,6 +3808,15 @@ spec:
items:
type: string
type: array
parallelPodManagementConcurrency:
anyOf:
- type: integer
- type: string
description: |-
Controls the concurrency of pods during initial scale up, when replacing pods on nodes,
or when scaling down. It only used when `PodManagementPolicy` is set to `Parallel`.
The default Concurrency is 100%.
x-kubernetes-int-or-string: true
podUpdatePolicy:
description: |-
PodUpdatePolicy indicates how pods should be updated
Expand Down Expand Up @@ -10366,6 +10375,15 @@ spec:
items:
type: string
type: array
parallelPodManagementConcurrency:
anyOf:
- type: integer
- type: string
description: |-
Controls the concurrency of pods during initial scale up, when replacing pods on nodes,
or when scaling down. It only used when `PodManagementPolicy` is set to `Parallel`.
The default Concurrency is 100%.
x-kubernetes-int-or-string: true
podUpdatePolicy:
description: |-
PodUpdatePolicy indicates how pods should be updated
Expand Down
9 changes: 9 additions & 0 deletions deploy/helm/crds/apps.kubeblocks.io_components.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3615,6 +3615,15 @@ spec:
items:
type: string
type: array
parallelPodManagementConcurrency:
anyOf:
- type: integer
- type: string
description: |-
Controls the concurrency of pods during initial scale up, when replacing pods on nodes,
or when scaling down. It only used when `PodManagementPolicy` is set to `Parallel`.
The default Concurrency is 100%.
x-kubernetes-int-or-string: true
podUpdatePolicy:
description: |-
PodUpdatePolicy indicates how pods should be updated
Expand Down
9 changes: 9 additions & 0 deletions deploy/helm/crds/workloads.kubeblocks.io_instancesets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3899,6 +3899,15 @@ spec:
items:
type: string
type: array
parallelPodManagementConcurrency:
anyOf:
- type: integer
- type: string
description: |-
Controls the concurrency of pods during initial scale up, when replacing pods on nodes,
or when scaling down. It only used when `PodManagementPolicy` is set to `Parallel`.
The default Concurrency is 100%.
x-kubernetes-int-or-string: true
paused:
description: Indicates that the InstanceSet is paused, meaning the
reconciliation of this InstanceSet object will be paused.
Expand Down
6 changes: 6 additions & 0 deletions pkg/controller/builder/builder_component.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package builder

import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1"
Expand Down Expand Up @@ -91,6 +92,11 @@ func (builder *ComponentBuilder) SetServiceAccountName(serviceAccountName string
return builder
}

func (builder *ComponentBuilder) SetParallelPodManagementConcurrency(parallelPodManagementConcurrency *intstr.IntOrString) *ComponentBuilder {
builder.get().Spec.ParallelPodManagementConcurrency = parallelPodManagementConcurrency
return builder
}

func (builder *ComponentBuilder) SetPodUpdatePolicy(policy *workloads.PodUpdatePolicyType) *ComponentBuilder {
builder.get().Spec.PodUpdatePolicy = policy
return builder
Expand Down
6 changes: 6 additions & 0 deletions pkg/controller/builder/builder_instance_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"

workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1"
)
Expand Down Expand Up @@ -107,6 +108,11 @@ func (builder *InstanceSetBuilder) SetPodManagementPolicy(policy apps.PodManagem
return builder
}

func (builder *InstanceSetBuilder) SetParallelPodManagementConcurrency(parallelPodManagementConcurrency *intstr.IntOrString) *InstanceSetBuilder {
builder.get().Spec.ParallelPodManagementConcurrency = parallelPodManagementConcurrency
return builder
}

func (builder *InstanceSetBuilder) SetPodUpdatePolicy(policy workloads.PodUpdatePolicyType) *InstanceSetBuilder {
builder.get().Spec.PodUpdatePolicy = policy
return builder
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/builder/builder_instance_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ var _ = Describe("instance_set builder", func() {
policy = apps.OrderedReadyPodManagement
podUpdatePolicy = workloads.PreferInPlacePodUpdatePolicyType
)
parallelPodManagementConcurrency := &intstr.IntOrString{Type: intstr.String, StrVal: "100%"}
selectors := map[string]string{selectorKey4: selectorValue4}
role := workloads.ReplicaRole{
Name: "foo",
Expand Down Expand Up @@ -166,6 +167,7 @@ var _ = Describe("instance_set builder", func() {
SetVolumeClaimTemplates(vcs...).
AddVolumeClaimTemplates(vc).
SetPodManagementPolicy(policy).
SetParallelPodManagementConcurrency(parallelPodManagementConcurrency).
SetPodUpdatePolicy(podUpdatePolicy).
SetUpdateStrategy(strategy).
SetUpdateStrategyType(strategyType).
Expand Down Expand Up @@ -198,6 +200,7 @@ var _ = Describe("instance_set builder", func() {
Expect(its.Spec.VolumeClaimTemplates[0]).Should(Equal(vcs[0]))
Expect(its.Spec.VolumeClaimTemplates[1]).Should(Equal(vc))
Expect(its.Spec.PodManagementPolicy).Should(Equal(policy))
Expect(its.Spec.ParallelPodManagementConcurrency).Should(Equal(parallelPodManagementConcurrency))
Expect(its.Spec.PodUpdatePolicy).Should(Equal(podUpdatePolicy))
Expect(its.Spec.UpdateStrategy.Type).Should(Equal(strategyType))
Expect(its.Spec.UpdateStrategy.RollingUpdate).ShouldNot(BeNil())
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func BuildComponent(cluster *appsv1alpha1.Cluster, compSpec *appsv1alpha1.Cluste
SetReplicas(compSpec.Replicas).
SetResources(compSpec.Resources).
SetServiceAccountName(compSpec.ServiceAccountName).
SetParallelPodManagementConcurrency(compSpec.ParallelPodManagementConcurrency).
SetPodUpdatePolicy(compSpec.PodUpdatePolicy).
SetVolumeClaimTemplates(compSpec.VolumeClaimTemplates).
SetVolumes(compSpec.Volumes).
Expand Down
40 changes: 28 additions & 12 deletions pkg/controller/component/rsm_convertor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1"
Expand All @@ -40,18 +41,19 @@ func BuildWorkloadFrom(synthesizeComp *SynthesizedComponent, protoITS *workloads
protoITS = &workloads.InstanceSet{}
}
convertors := map[string]convertor{
"service": &itsServiceConvertor{},
"alternativeservices": &itsAlternativeServicesConvertor{},
"roles": &itsRolesConvertor{},
"roleprobe": &itsRoleProbeConvertor{},
"credential": &itsCredentialConvertor{},
"membershipreconfiguration": &itsMembershipReconfigurationConvertor{},
"memberupdatestrategy": &itsMemberUpdateStrategyConvertor{},
"podmanagementpolicy": &itsPodManagementPolicyConvertor{},
"podupdatepolicy": &itsPodUpdatePolicyConvertor{},
"updatestrategy": &itsUpdateStrategyConvertor{},
"instances": &itsInstancesConvertor{},
"offlineinstances": &itsOfflineInstancesConvertor{},
"service": &itsServiceConvertor{},
"alternativeservices": &itsAlternativeServicesConvertor{},
"roles": &itsRolesConvertor{},
"roleprobe": &itsRoleProbeConvertor{},
"credential": &itsCredentialConvertor{},
"membershipreconfiguration": &itsMembershipReconfigurationConvertor{},
"memberupdatestrategy": &itsMemberUpdateStrategyConvertor{},
"podmanagementpolicy": &itsPodManagementPolicyConvertor{},
"parallelpodmanagementconcurrency": &itsParallelPodManagementConcurrencyConvertor{},
"podupdatepolicy": &itsPodUpdatePolicyConvertor{},
"updatestrategy": &itsUpdateStrategyConvertor{},
"instances": &itsInstancesConvertor{},
"offlineinstances": &itsOfflineInstancesConvertor{},
}
if err := covertObject(convertors, &protoITS.Spec, synthesizeComp); err != nil {
return nil, err
Expand Down Expand Up @@ -106,6 +108,20 @@ func (c *itsPodManagementPolicyConvertor) convert(args ...any) (any, error) {
return appsv1.ParallelPodManagement, nil
}

// itsParallelPodManagementConcurrencyConvertor is an implementation of the convertor interface, used to convert the given object into InstanceSet.Spec.ParallelPodManagementConcurrency.
type itsParallelPodManagementConcurrencyConvertor struct{}

func (c *itsParallelPodManagementConcurrencyConvertor) convert(args ...any) (any, error) {
synthesizedComp, err := parseITSConvertorArgs(args...)
if err != nil {
return nil, err
}
if synthesizedComp.ParallelPodManagementConcurrency != nil {
return synthesizedComp.ParallelPodManagementConcurrency, nil
}
return &intstr.IntOrString{Type: intstr.String, StrVal: "100%"}, nil
}

// itsPodUpdatePolicyConvertor is an implementation of the convertor interface, used to convert the given object into InstanceSet.Spec.PodUpdatePolicy.
type itsPodUpdatePolicyConvertor struct{}

Expand Down
Loading

0 comments on commit daeb239

Please sign in to comment.