Skip to content

Commit

Permalink
feat: support start and stop through cluster API
Browse files Browse the repository at this point in the history
  • Loading branch information
leon-inf committed Jul 4, 2024
1 parent a813daf commit 5450ff2
Show file tree
Hide file tree
Showing 25 changed files with 514 additions and 283 deletions.
5 changes: 5 additions & 0 deletions apis/apps/v1alpha1/cluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,11 @@ type ClusterComponentSpec struct {
// +optional
// +kubebuilder:deprecatedversion:warning="This field has been deprecated since 0.10.0"
Monitor *bool `json:"monitor,omitempty"`

// Specifies the expected status of the Component.
//
// +optional
Status *Status `json:"status,omitempty"`
}

type ComponentMessageMap map[string]string
Expand Down
5 changes: 5 additions & 0 deletions apis/apps/v1alpha1/component_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,11 @@ type ComponentSpec struct {
//
// +optional
DisableExporter *bool `json:"disableExporter,omitempty"`

// Specifies the expected status of the Component.
//
// +optional
Status *Status `json:"status,omitempty"`
}

// ComponentStatus represents the observed state of a Component within the Cluster.
Expand Down
3 changes: 3 additions & 0 deletions apis/apps/v1alpha1/opsrequest_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1199,6 +1199,9 @@ type LastComponentConfiguration struct {
// Records the name of the ComponentDefinition prior to any changes.
// +optional
ComponentDefinitionName string `json:"componentDefinitionName,omitempty"`

// +optional
Status *Status `json:"status,omitempty"`
}

type LastConfiguration struct {
Expand Down
27 changes: 27 additions & 0 deletions apis/apps/v1alpha1/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,33 @@ const (
UnavailablePhase Phase = "Unavailable"
)

// State represents the state of a CR object.
//
// +enum
// +kubebuilder:validation:Enum={Running,Stopped}
type State string

const (
// StateRunning indicates that the object is in a running state.
StateRunning State = "Running"

// StateStopped indicates that the object is in a stopped state.
StateStopped State = "Stopped"

// TODO: add Paused and Halted states. Any state added here must also be
// listed in the Enum validation marker on State, which currently allows
// only Running and Stopped.
)

// Status represents the status of a CR object.
type Status struct {
// Represents the state of the object.
//
// +kubebuilder:validation:Required
State State `json:"state"`

// +optional
Generation *int64 `json:"generation,omitempty"`
}

// OpsPhase defines opsRequest phase.
// +enum
// +kubebuilder:validation:Enum={Pending,Creating,Running,Cancelling,Cancelled,Aborted,Failed,Succeed}
Expand Down
35 changes: 35 additions & 0 deletions apis/apps/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 30 additions & 0 deletions config/crd/bases/apps.kubeblocks.io_clusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5202,6 +5202,21 @@ spec:
- name
type: object
type: array
status:
description: Specifies the expected status of the Component.
properties:
generation:
format: int64
type: integer
state:
description: Represents the state of the object.
enum:
- Running
- Stopped
type: string
required:
- state
type: object
switchPolicy:
description: |-
Defines the strategy for switchover and failover when workloadType is Replication.
Expand Down Expand Up @@ -13876,6 +13891,21 @@ spec:
- name
type: object
type: array
status:
description: Specifies the expected status of the Component.
properties:
generation:
format: int64
type: integer
state:
description: Represents the state of the object.
enum:
- Running
- Stopped
type: string
required:
- state
type: object
switchPolicy:
description: |-
Defines the strategy for switchover and failover when workloadType is Replication.
Expand Down
15 changes: 15 additions & 0 deletions config/crd/bases/apps.kubeblocks.io_components.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5217,6 +5217,21 @@ spec:
- name
type: object
type: array
status:
description: Specifies the expected status of the Component.
properties:
generation:
format: int64
type: integer
state:
description: Represents the state of the object.
enum:
- Running
- Stopped
type: string
required:
- state
type: object
systemAccounts:
description: Overrides system accounts defined in referenced ComponentDefinition.
items:
Expand Down
15 changes: 15 additions & 0 deletions config/crd/bases/apps.kubeblocks.io_opsrequests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8371,6 +8371,21 @@ spec:
- name
type: object
type: array
status:
description: Status represents the status of a CR object.
properties:
generation:
format: int64
type: integer
state:
description: Represents the state of the object.
enum:
- Running
- Stopped
type: string
required:
- state
type: object
targetResources:
additionalProperties:
items:
Expand Down
76 changes: 8 additions & 68 deletions controllers/apps/operations/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
package operations

import (
"encoding/json"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
"github.com/apecloud/kubeblocks/pkg/constant"
intctrlcomp "github.com/apecloud/kubeblocks/pkg/controller/component"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
)
Expand Down Expand Up @@ -55,40 +53,20 @@ func (start StartOpsHandler) ActionStartedCondition(reqCtx intctrlutil.RequestCt

// Action sets the expected status of every component and sharding template in the Cluster to Running.
func (start StartOpsHandler) Action(reqCtx intctrlutil.RequestCtx, cli client.Client, opsRes *OpsResource) error {
cluster := opsRes.Cluster
componentReplicasMap, err := getComponentReplicasSnapshot(cluster.Annotations)
if err != nil {
return intctrlutil.NewFatalError(err.Error())
}
applyReplicas := func(compSpec *appsv1alpha1.ClusterComponentSpec, componentName string) {
componentKey := getComponentKeyForStartSnapshot(componentName, "")
replicasOfSnapshot := componentReplicasMap[componentKey]
if replicasOfSnapshot == 0 {
return
}
// only reset the component whose replicas number is 0
if compSpec.Replicas == 0 {
compSpec.Replicas = replicasOfSnapshot
for i := range compSpec.Instances {
componentKey = getComponentKeyForStartSnapshot(componentName, compSpec.Instances[i].Name)
replicasOfSnapshot = componentReplicasMap[componentKey]
if replicasOfSnapshot == 0 {
continue
}
compSpec.Instances[i].Replicas = &replicasOfSnapshot
var (
cluster = opsRes.Cluster
startComp = func(compSpec *appsv1alpha1.ClusterComponentSpec) {
compSpec.Status = &appsv1alpha1.Status{
State: appsv1alpha1.StateRunning,
}
}
}
)
for i := range cluster.Spec.ComponentSpecs {
compSpec := &cluster.Spec.ComponentSpecs[i]
applyReplicas(compSpec, compSpec.Name)
startComp(&cluster.Spec.ComponentSpecs[i])
}
for i := range cluster.Spec.ShardingSpecs {
shardingSpec := &cluster.Spec.ShardingSpecs[i]
applyReplicas(&shardingSpec.Template, shardingSpec.Name)
startComp(&cluster.Spec.ShardingSpecs[i].Template)
}
// delete the replicas snapshot of components from the cluster.
delete(cluster.Annotations, constant.SnapShotForStartAnnotationKey)
return cli.Update(reqCtx.Ctx, cluster)
}

Expand All @@ -110,44 +88,6 @@ func (start StartOpsHandler) ReconcileAction(reqCtx intctrlutil.RequestCtx, cli

// SaveLastConfiguration records last configuration to the OpsRequest.status.lastConfiguration
func (start StartOpsHandler) SaveLastConfiguration(reqCtx intctrlutil.RequestCtx, cli client.Client, opsRes *OpsResource) error {
componentReplicasMap, err := getComponentReplicasSnapshot(opsRes.Cluster.Annotations)
if err != nil {
return intctrlutil.NewFatalError(err.Error())
}
if err = start.setOpsAnnotation(reqCtx, cli, opsRes, componentReplicasMap); err != nil {
return err
}
saveLastConfigurationForStopAndStart(opsRes)
return nil
}

// setOpsAnnotation sets the replicas snapshot of components before stopping the cluster to the annotations of this opsRequest.
func (start StartOpsHandler) setOpsAnnotation(reqCtx intctrlutil.RequestCtx, cli client.Client, opsRes *OpsResource, componentReplicasMap map[string]int32) error {
annotations := opsRes.OpsRequest.Annotations
if annotations == nil {
annotations = map[string]string{}
}
componentReplicasSnapshot, err := json.Marshal(componentReplicasMap)
if err != nil {
return err
}
if _, ok := opsRes.OpsRequest.Annotations[constant.SnapShotForStartAnnotationKey]; !ok {
patch := client.MergeFrom(opsRes.OpsRequest.DeepCopy())
annotations[constant.SnapShotForStartAnnotationKey] = string(componentReplicasSnapshot)
opsRes.OpsRequest.Annotations = annotations
return cli.Patch(reqCtx.Ctx, opsRes.OpsRequest, patch)
}
return nil
}

// getComponentReplicasSnapshot gets the replicas snapshot of components from annotations.
func getComponentReplicasSnapshot(annotations map[string]string) (map[string]int32, error) {
componentReplicasMap := map[string]int32{}
snapshotForStart := annotations[constant.SnapShotForStartAnnotationKey]
if len(snapshotForStart) != 0 {
if err := json.Unmarshal([]byte(snapshotForStart), &componentReplicasMap); err != nil {
return componentReplicasMap, err
}
}
return componentReplicasMap, nil
}
18 changes: 2 additions & 16 deletions controllers/apps/operations/start_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,13 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
package operations

import (
"encoding/json"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"sigs.k8s.io/controller-runtime/pkg/client"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
opsutil "github.com/apecloud/kubeblocks/controllers/apps/operations/util"
"github.com/apecloud/kubeblocks/pkg/constant"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
"github.com/apecloud/kubeblocks/pkg/generics"
testapps "github.com/apecloud/kubeblocks/pkg/testutil/apps"
Expand Down Expand Up @@ -74,23 +71,12 @@ var _ = Describe("Start OpsRequest", func() {
testapps.MockInstanceSetComponent(&testCtx, clusterName, consensusComp)
testapps.MockInstanceSetComponent(&testCtx, clusterName, statelessComp)
testapps.MockInstanceSetComponent(&testCtx, clusterName, statefulComp)
By("mock cluster annotations for start opsRequest")
// mock snapshot annotation for cluster
componentReplicasMap := map[string]int32{}
for _, v := range opsRes.Cluster.Spec.ComponentSpecs {
componentReplicasMap[v.Name] = v.Replicas
}
componentReplicasSnapshot, _ := json.Marshal(componentReplicasMap)
opsRes.Cluster.Annotations = map[string]string{
constant.SnapShotForStartAnnotationKey: string(componentReplicasSnapshot),
}
By("create Start opsRequest")
ops := testapps.NewOpsRequestObj("start-ops-"+randomStr, testCtx.DefaultNamespace,
clusterName, appsv1alpha1.StartType)
opsRes.OpsRequest = testapps.CreateOpsRequest(ctx, testCtx, ops)

By("test start action and reconcile function")
oldComponentReplicasMap, _ := getComponentReplicasSnapshot(opsRes.Cluster.Annotations)
Expect(opsutil.UpdateClusterOpsAnnotations(ctx, k8sClient, opsRes.Cluster, nil)).Should(Succeed())
// mock cluster phase to stopped
Expect(testapps.ChangeObjStatus(&testCtx, opsRes.Cluster, func() {
Expand All @@ -106,8 +92,8 @@ var _ = Describe("Start OpsRequest", func() {
_, err = GetOpsManager().Do(reqCtx, k8sClient, opsRes)
Expect(err).ShouldNot(HaveOccurred())
for _, v := range opsRes.Cluster.Spec.ComponentSpecs {
oldReplicas := oldComponentReplicasMap[v.Name]
Expect(oldReplicas == v.Replicas).Should(BeTrue())
Expect(v.Status).ShouldNot(BeNil())
// The start action sets every component's expected state to Running, so
// the state must equal StateRunning once the action has been executed.
Expect(v.Status.State).Should(Equal(appsv1alpha1.StateRunning))
}
_, err = GetOpsManager().Reconcile(reqCtx, k8sClient, opsRes)
Expect(err == nil).Should(BeTrue())
Expand Down
Loading

0 comments on commit 5450ff2

Please sign in to comment.