From 7f3960b5ca585127d7ef014671f39d34707de1e9 Mon Sep 17 00:00:00 2001
From: informalict
Date: Fri, 18 Oct 2019 15:34:26 +0200
Subject: [PATCH] Fixed bug with changing storage class name

Integration tests for changing storage class
Add actions with disabling and enabling scaling servers
Use Empty function for Plan
Add unit tests for changing storage
100% coverage for storage plan
Fix race condition when enabling scaling
Fix race condition during scaling and cluster inspection
Check documents of a new storage when the storage changes
Check content of new storage
Adjust to v1 version
Adjust integration tests for v1 version
Improve integration tests
Fix: increase timeout for connection to database
---
 pkg/apis/deployment/v1/plan.go | 9 +
 pkg/apis/deployment/v1/server_group_spec.go | 3 +
 pkg/apis/deployment/v1alpha/plan.go | 9 +
 .../deployment/v1alpha/server_group_spec.go | 3 +
 pkg/deployment/cluster_scaling_integration.go | 136 ++++--
 pkg/deployment/context_impl.go | 8 +
 pkg/deployment/deployment.go | 17 +
 pkg/deployment/reconcile/action_context.go | 15 +-
 .../action_disable_scaling_cluster.go | 50 ++
 .../action_enable_scaling_cluster.go | 50 ++
 pkg/deployment/reconcile/context.go | 6 +-
 pkg/deployment/reconcile/plan_builder.go | 19 +-
 .../reconcile/plan_builder_rotate_upgrade.go | 4 +-
 .../reconcile/plan_builder_storage.go | 61 +--
 pkg/deployment/reconcile/plan_builder_test.go | 436 +++++++++++++++++-
 pkg/deployment/reconcile/plan_builder_tls.go | 2 +-
 pkg/deployment/reconcile/plan_executor.go | 4 +
 pkg/deployment/resources/pvcs.go | 25 +-
 tests/persistent_volumes_test.go | 227 +++++++++
 tests/test_util.go | 89 +++-
 20 files changed, 1072 insertions(+), 101 deletions(-)
 create mode 100644 pkg/deployment/reconcile/action_disable_scaling_cluster.go
 create mode 100644 pkg/deployment/reconcile/action_enable_scaling_cluster.go

diff --git a/pkg/apis/deployment/v1/plan.go b/pkg/apis/deployment/v1/plan.go
index d94945870..5cb01158e 100644
--- a/pkg/apis/deployment/v1/plan.go
+++ b/pkg/apis/deployment/v1/plan.go
@@ -52,6 +52,10 @@ const (
 	ActionTypeRenewTLSCACertificate ActionType = "RenewTLSCACertificate"
 	// ActionTypeSetCurrentImage causes status.CurrentImage to be updated to the image given in the action.
 	ActionTypeSetCurrentImage ActionType = "SetCurrentImage"
+	// ActionTypeDisableClusterScaling turns off scaling DBservers and coordinators
+	ActionTypeDisableClusterScaling ActionType = "ScalingDisabled"
+	// ActionTypeEnableClusterScaling turns on scaling DBservers and coordinators
+	ActionTypeEnableClusterScaling ActionType = "ScalingEnabled"
 )
 
 const (
@@ -134,3 +138,8 @@ func (p Plan) Equal(other Plan) bool {
 
 	return true
 }
+
+// IsEmpty checks if plan is empty
+func (p Plan) IsEmpty() bool {
+	return len(p) == 0
+}
diff --git a/pkg/apis/deployment/v1/server_group_spec.go b/pkg/apis/deployment/v1/server_group_spec.go
index 9f362a2ef..3f2becfbb 100644
--- a/pkg/apis/deployment/v1/server_group_spec.go
+++ b/pkg/apis/deployment/v1/server_group_spec.go
@@ -147,6 +147,9 @@ func (s ServerGroupSpec) GetArgs() []string {
 
 // GetStorageClassName returns the value of storageClassName.
func (s ServerGroupSpec) GetStorageClassName() string { + if pvc := s.GetVolumeClaimTemplate(); pvc != nil { + return util.StringOrDefault(pvc.Spec.StorageClassName) + } return util.StringOrDefault(s.StorageClassName) } diff --git a/pkg/apis/deployment/v1alpha/plan.go b/pkg/apis/deployment/v1alpha/plan.go index 201b74e4e..188b3761f 100644 --- a/pkg/apis/deployment/v1alpha/plan.go +++ b/pkg/apis/deployment/v1alpha/plan.go @@ -52,6 +52,10 @@ const ( ActionTypeRenewTLSCACertificate ActionType = "RenewTLSCACertificate" // ActionTypeSetCurrentImage causes status.CurrentImage to be updated to the image given in the action. ActionTypeSetCurrentImage ActionType = "SetCurrentImage" + // ActionTypeDisableClusterScaling turns off scaling DBservers and coordinators + ActionTypeDisableClusterScaling ActionType = "ScalingDisabled" + // ActionTypeEnableClusterScaling turns on scaling DBservers and coordinators + ActionTypeEnableClusterScaling ActionType = "ScalingEnabled" ) const ( @@ -134,3 +138,8 @@ func (p Plan) Equal(other Plan) bool { return true } + +// IsEmpty checks if plan is empty +func (p Plan) IsEmpty() bool { + return len(p) == 0 +} diff --git a/pkg/apis/deployment/v1alpha/server_group_spec.go b/pkg/apis/deployment/v1alpha/server_group_spec.go index 28bb2af61..695f0213a 100644 --- a/pkg/apis/deployment/v1alpha/server_group_spec.go +++ b/pkg/apis/deployment/v1alpha/server_group_spec.go @@ -147,6 +147,9 @@ func (s ServerGroupSpec) GetArgs() []string { // GetStorageClassName returns the value of storageClassName. func (s ServerGroupSpec) GetStorageClassName() string { + if pvc := s.GetVolumeClaimTemplate(); pvc != nil { + return util.StringOrDefault(pvc.Spec.StorageClassName) + } return util.StringOrDefault(s.StorageClassName) } diff --git a/pkg/deployment/cluster_scaling_integration.go b/pkg/deployment/cluster_scaling_integration.go index 45dcbbc45..b0541616a 100644 --- a/pkg/deployment/cluster_scaling_integration.go +++ b/pkg/deployment/cluster_scaling_integration.go @@ -49,6 +49,10 @@ type clusterScalingIntegration struct { arangod.NumberOfServers mutex sync.Mutex } + scaleEnabled struct { + mutex sync.Mutex + enabled bool + } } const ( @@ -57,10 +61,12 @@ const ( // newClusterScalingIntegration creates a new clusterScalingIntegration. func newClusterScalingIntegration(depl *Deployment) *clusterScalingIntegration { - return &clusterScalingIntegration{ + ci := &clusterScalingIntegration{ log: depl.deps.Log, depl: depl, } + ci.scaleEnabled.enabled = true + return ci } // SendUpdateToCluster records the given spec to be sended to the cluster. @@ -70,37 +76,61 @@ func (ci *clusterScalingIntegration) SendUpdateToCluster(spec api.DeploymentSpec ci.pendingUpdate.spec = &spec } +// checkScalingCluster checks if inspection +// returns true if inspection occurred +func (ci *clusterScalingIntegration) checkScalingCluster(expectSuccess bool) bool { + ci.scaleEnabled.mutex.Lock() + defer ci.scaleEnabled.mutex.Unlock() + + if !ci.scaleEnabled.enabled { + // Check if it is possible to turn on scaling without any issue + status, _ := ci.depl.GetStatus() + if status.Plan.IsEmpty() && ci.setNumberOfServers() == nil { + // Scaling should be enabled because there is no Plan. 
+ // It can happen when the enabling action fails + ci.scaleEnabled.enabled = true + } + } + + if ci.depl.GetPhase() != api.DeploymentPhaseRunning || !ci.scaleEnabled.enabled { + // Deployment must be in running state and scaling must be enabled + return false + } + + // Update cluster with our state + ctx := context.Background() + //expectSuccess := *goodInspections > 0 || time.Since(start) > maxClusterBootstrapTime + safeToAskCluster, err := ci.updateClusterServerCount(ctx, expectSuccess) + if err != nil { + if expectSuccess { + ci.log.Debug().Err(err).Msg("Cluster update failed") + } + } else if safeToAskCluster { + // Inspect once + if err := ci.inspectCluster(ctx, expectSuccess); err != nil { + if expectSuccess { + ci.log.Debug().Err(err).Msg("Cluster inspection failed") + } + } else { + return true + } + } + return false +} + // listenForClusterEvents keep listening for changes entered in the UI of the cluster. func (ci *clusterScalingIntegration) ListenForClusterEvents(stopCh <-chan struct{}) { start := time.Now() goodInspections := 0 for { - delay := time.Second * 2 - - // Is deployment in running state - if ci.depl.GetPhase() == api.DeploymentPhaseRunning { - // Update cluster with our state - ctx := context.Background() - expectSuccess := goodInspections > 0 || time.Since(start) > maxClusterBootstrapTime - safeToAskCluster, err := ci.updateClusterServerCount(ctx, expectSuccess) - if err != nil { - if expectSuccess { - ci.log.Debug().Err(err).Msg("Cluster update failed") - } - } else if safeToAskCluster { - // Inspect once - if err := ci.inspectCluster(ctx, expectSuccess); err != nil { - if expectSuccess { - ci.log.Debug().Err(err).Msg("Cluster inspection failed") - } - } else { - goodInspections++ - } - } + expectSuccess := goodInspections > 0 || time.Since(start) > maxClusterBootstrapTime + + if ci.checkScalingCluster(expectSuccess) { + goodInspections++ } select { - case <-time.After(delay): + case <-time.After(time.Second * 2): // Continue case <-stopCh: // We're done @@ -200,11 +230,6 @@ func (ci *clusterScalingIntegration) updateClusterServerCount(ctx context.Contex } log := ci.log - c, err := ci.depl.clientCache.GetDatabase(ctx) - if err != nil { - return false, maskAny(err) - } - var coordinatorCountPtr *int var dbserverCountPtr *int @@ -223,13 +248,11 @@ func (ci *clusterScalingIntegration) updateClusterServerCount(ctx context.Contex dbserverCountPtr = &dbserverCount } - ci.lastNumberOfServers.mutex.Lock() - lastNumberOfServers := ci.lastNumberOfServers.NumberOfServers - ci.lastNumberOfServers.mutex.Unlock() + lastNumberOfServers := ci.GetLastNumberOfServers() // This is to prevent unneseccary updates that may override some values written by the WebUI (in the case of a update loop) if coordinatorCount != lastNumberOfServers.GetCoordinators() || dbserverCount != lastNumberOfServers.GetDBServers() { - if err := arangod.SetNumberOfServers(ctx, c.Connection(), coordinatorCountPtr, dbserverCountPtr); err != nil { + if err := ci.depl.SetNumberOfServers(ctx, coordinatorCountPtr, dbserverCountPtr); err != nil { if expectSuccess { log.Debug().Err(err).Msg("Failed to set number of servers") } @@ -253,3 +276,50 @@ func (ci *clusterScalingIntegration) updateClusterServerCount(ctx context.Contex ci.lastNumberOfServers.DBServers = &dbserverCount return safeToAskCluster, nil } + +// GetLastNumberOfServers returns the last number of servers +func (ci *clusterScalingIntegration) GetLastNumberOfServers() arangod.NumberOfServers { + ci.lastNumberOfServers.mutex.Lock() + defer 
ci.lastNumberOfServers.mutex.Unlock() + + return ci.lastNumberOfServers.NumberOfServers +} + +// DisableScalingCluster disables scaling DBservers and coordinators +func (ci *clusterScalingIntegration) DisableScalingCluster() error { + ci.scaleEnabled.mutex.Lock() + defer ci.scaleEnabled.mutex.Unlock() + + // Turn off scaling DBservers and coordinators in arangoDB for the UI + ctx := context.Background() + if err := ci.depl.SetNumberOfServers(ctx, nil, nil); err != nil { + return maskAny(err) + } + + ci.scaleEnabled.enabled = false + return nil +} + +// EnableScalingCluster enables scaling DBservers and coordinators +func (ci *clusterScalingIntegration) EnableScalingCluster() error { + ci.scaleEnabled.mutex.Lock() + defer ci.scaleEnabled.mutex.Unlock() + + if ci.scaleEnabled.enabled { + return nil + } + + if err := ci.setNumberOfServers(); err != nil { + return maskAny(err) + } + ci.scaleEnabled.enabled = true + return nil +} + +func (ci *clusterScalingIntegration) setNumberOfServers() error { + ctx := context.Background() + spec := ci.depl.GetSpec() + numOfCoordinators := spec.Coordinators.GetCount() + numOfDBServers := spec.DBServers.GetCount() + return ci.depl.SetNumberOfServers(ctx, &numOfCoordinators, &numOfDBServers) +} diff --git a/pkg/deployment/context_impl.go b/pkg/deployment/context_impl.go index 53f14b60e..9dddf782f 100644 --- a/pkg/deployment/context_impl.go +++ b/pkg/deployment/context_impl.go @@ -418,3 +418,11 @@ func (d *Deployment) GetShardSyncStatus() bool { func (d *Deployment) InvalidateSyncStatus() { d.resources.InvalidateSyncStatus() } + +func (d *Deployment) DisableScalingCluster() error { + return d.clusterScalingIntegration.DisableScalingCluster() +} + +func (d *Deployment) EnableScalingCluster() error { + return d.clusterScalingIntegration.EnableScalingCluster() +} diff --git a/pkg/deployment/deployment.go b/pkg/deployment/deployment.go index c38bde99f..ee17ce900 100644 --- a/pkg/deployment/deployment.go +++ b/pkg/deployment/deployment.go @@ -23,11 +23,14 @@ package deployment import ( + "context" "fmt" "sync" "sync/atomic" "time" + "github.com/arangodb/kube-arangodb/pkg/util/arangod" + "github.com/arangodb/arangosync-client/client" "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -541,3 +544,17 @@ func (d *Deployment) lookForServiceMonitorCRD() { log.Warn().Err(err).Msgf("Error when looking for ServiceMonitor CRD") return } + +// SetNumberOfServers adjust number of DBservers and coordinators in arangod +func (d *Deployment) SetNumberOfServers(ctx context.Context, noCoordinators, noDBServers *int) error { + c, err := d.clientCache.GetDatabase(ctx) + if err != nil { + return maskAny(err) + } + + err = arangod.SetNumberOfServers(ctx, c.Connection(), noCoordinators, noDBServers) + if err != nil { + return maskAny(err) + } + return nil +} diff --git a/pkg/deployment/reconcile/action_context.go b/pkg/deployment/reconcile/action_context.go index 2eeda69b4..99baa1d1d 100644 --- a/pkg/deployment/reconcile/action_context.go +++ b/pkg/deployment/reconcile/action_context.go @@ -90,7 +90,10 @@ type ActionContext interface { InvalidateSyncStatus() // GetSpec returns a copy of the spec GetSpec() api.DeploymentSpec - + // DisableScalingCluster disables scaling DBservers and coordinators + DisableScalingCluster() error + // EnableScalingCluster enables scaling DBservers and coordinators + EnableScalingCluster() error } // newActionContext creates a new ActionContext implementation. 
@@ -305,3 +308,13 @@ func (ac *actionContext) SetCurrentImage(imageInfo api.ImageInfo) error { func (ac *actionContext) InvalidateSyncStatus() { ac.context.InvalidateSyncStatus() } + +// DisableScalingCluster disables scaling DBservers and coordinators +func (ac *actionContext) DisableScalingCluster() error { + return ac.context.DisableScalingCluster() +} + +// EnableScalingCluster enables scaling DBservers and coordinators +func (ac *actionContext) EnableScalingCluster() error { + return ac.context.EnableScalingCluster() +} diff --git a/pkg/deployment/reconcile/action_disable_scaling_cluster.go b/pkg/deployment/reconcile/action_disable_scaling_cluster.go new file mode 100644 index 000000000..faddcf7ae --- /dev/null +++ b/pkg/deployment/reconcile/action_disable_scaling_cluster.go @@ -0,0 +1,50 @@ +package reconcile + +import ( + "context" + "time" + + api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" + "github.com/rs/zerolog" +) + +// actionDisableScalingCluster implements disabling scaling DBservers and coordinators. +type actionDisableScalingCluster struct { + log zerolog.Logger + action api.Action + actionCtx ActionContext + newMemberID string +} + +// NewDisableScalingCluster creates the new action with disabling scaling DBservers and coordinators. +func NewDisableScalingCluster(log zerolog.Logger, action api.Action, actionCtx ActionContext) Action { + return &actionDisableScalingCluster{ + log: log, + action: action, + actionCtx: actionCtx, + } +} + +// Start disables scaling DBservers and coordinators +func (a *actionDisableScalingCluster) Start(ctx context.Context) (bool, error) { + err := a.actionCtx.DisableScalingCluster() + if err != nil { + return false, err + } + return true, nil +} + +// CheckProgress does not matter. Everything is done in 'Start' function +func (a *actionDisableScalingCluster) CheckProgress(ctx context.Context) (bool, bool, error) { + return true, false, nil +} + +// Timeout does not matter. Everything is done in 'Start' function +func (a *actionDisableScalingCluster) Timeout() time.Duration { + return 0 +} + +// MemberID is not used +func (a *actionDisableScalingCluster) MemberID() string { + return "" +} diff --git a/pkg/deployment/reconcile/action_enable_scaling_cluster.go b/pkg/deployment/reconcile/action_enable_scaling_cluster.go new file mode 100644 index 000000000..ac4f83b17 --- /dev/null +++ b/pkg/deployment/reconcile/action_enable_scaling_cluster.go @@ -0,0 +1,50 @@ +package reconcile + +import ( + "context" + "time" + + api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" + "github.com/rs/zerolog" +) + +// actionEnableScalingCluster implements enabling scaling DBservers and coordinators. +type actionEnableScalingCluster struct { + log zerolog.Logger + action api.Action + actionCtx ActionContext + newMemberID string +} + +// NewEnableScalingCluster creates the new action with enabling scaling DBservers and coordinators. +func NewEnableScalingCluster(log zerolog.Logger, action api.Action, actionCtx ActionContext) Action { + return &actionEnableScalingCluster{ + log: log, + action: action, + actionCtx: actionCtx, + } +} + +// Start enables scaling DBservers and coordinators +func (a *actionEnableScalingCluster) Start(ctx context.Context) (bool, error) { + err := a.actionCtx.EnableScalingCluster() + if err != nil { + return false, err + } + return true, nil +} + +// CheckProgress does not matter. 
Everything is done in 'Start' function +func (a *actionEnableScalingCluster) CheckProgress(ctx context.Context) (bool, bool, error) { + return true, false, nil +} + +// Timeout does not matter. Everything is done in 'Start' function +func (a *actionEnableScalingCluster) Timeout() time.Duration { + return 0 +} + +// MemberID is not used +func (a *actionEnableScalingCluster) MemberID() string { + return "" +} diff --git a/pkg/deployment/reconcile/context.go b/pkg/deployment/reconcile/context.go index a1c241055..c16a59a09 100644 --- a/pkg/deployment/reconcile/context.go +++ b/pkg/deployment/reconcile/context.go @@ -28,7 +28,7 @@ import ( "github.com/arangodb/arangosync-client/client" driver "github.com/arangodb/go-driver" "github.com/arangodb/go-driver/agency" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" @@ -101,4 +101,8 @@ type Context interface { GetShardSyncStatus() bool // InvalidateSyncStatus resets the sync state to false and triggers an inspection InvalidateSyncStatus() + // DisableScalingCluster disables scaling DBservers and coordinators + DisableScalingCluster() error + // EnableScalingCluster enables scaling DBservers and coordinators + EnableScalingCluster() error } diff --git a/pkg/deployment/reconcile/plan_builder.go b/pkg/deployment/reconcile/plan_builder.go index f30793856..5b817f528 100644 --- a/pkg/deployment/reconcile/plan_builder.go +++ b/pkg/deployment/reconcile/plan_builder.go @@ -26,7 +26,6 @@ import ( driver "github.com/arangodb/go-driver" upgraderules "github.com/arangodb/go-upgrade-rules" "github.com/rs/zerolog" - "github.com/rs/zerolog/log" api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" @@ -51,7 +50,7 @@ func (d *Reconciler) CreatePlan() error { // Get all current pods pods, err := d.context.GetOwnedPods() if err != nil { - log.Debug().Err(err).Msg("Failed to get owned pods") + d.log.Debug().Err(err).Msg("Failed to get owned pods") return maskAny(err) } @@ -86,7 +85,7 @@ func createPlan(log zerolog.Logger, apiObject k8sutil.APIObject, currentPlan api.Plan, spec api.DeploymentSpec, status api.DeploymentStatus, pods []v1.Pod, context PlanBuilderContext) (api.Plan, bool) { - if len(currentPlan) > 0 { + if !currentPlan.IsEmpty() { // Plan already exists, complete that first return currentPlan, false } @@ -97,7 +96,7 @@ func createPlan(log zerolog.Logger, apiObject k8sutil.APIObject, // Check for members in failed state status.Members.ForeachServerGroup(func(group api.ServerGroup, members api.MemberStatusList) error { for _, m := range members { - if m.Phase == api.MemberPhaseFailed && len(plan) == 0 { + if m.Phase == api.MemberPhaseFailed && plan.IsEmpty() { log.Debug(). Str("id", m.ID). Str("role", group.AsRole()). @@ -117,7 +116,7 @@ func createPlan(log zerolog.Logger, apiObject k8sutil.APIObject, // Check for cleaned out dbserver in created state for _, m := range status.Members.DBServers { - if len(plan) == 0 && m.Phase.IsCreatedOrDrain() && m.Conditions.IsTrue(api.ConditionTypeCleanedOut) { + if plan.IsEmpty() && m.Phase.IsCreatedOrDrain() && m.Conditions.IsTrue(api.ConditionTypeCleanedOut) { log.Debug(). Str("id", m.ID). Str("role", api.ServerGroupDBServers.AsRole()). 
@@ -130,27 +129,27 @@ func createPlan(log zerolog.Logger, apiObject k8sutil.APIObject, } // Check for scale up/down - if len(plan) == 0 { + if plan.IsEmpty() { plan = createScaleMemeberPlan(log, spec, status) } // Check for the need to rotate one or more members - if len(plan) == 0 { + if plan.IsEmpty() { plan = createRotateOrUpgradePlan(log, apiObject, spec, status, context, pods) } // Check for the need to rotate TLS certificate of a members - if len(plan) == 0 { + if plan.IsEmpty() { plan = createRotateTLSServerCertificatePlan(log, spec, status, context.GetTLSKeyfile) } // Check for changes storage classes or requirements - if len(plan) == 0 { + if plan.IsEmpty() { plan = createRotateServerStoragePlan(log, apiObject, spec, status, context.GetPvc, context.CreateEvent) } // Check for the need to rotate TLS CA certificate and all members - if len(plan) == 0 { + if plan.IsEmpty() { plan = createRotateTLSCAPlan(log, apiObject, spec, status, context.GetTLSCA, context.CreateEvent) } diff --git a/pkg/deployment/reconcile/plan_builder_rotate_upgrade.go b/pkg/deployment/reconcile/plan_builder_rotate_upgrade.go index 9faf677de..ebe0f608e 100644 --- a/pkg/deployment/reconcile/plan_builder_rotate_upgrade.go +++ b/pkg/deployment/reconcile/plan_builder_rotate_upgrade.go @@ -69,7 +69,7 @@ func createRotateOrUpgradePlan(log zerolog.Logger, apiObject k8sutil.APIObject, return nil } - if len(newPlan) > 0 { + if !newPlan.IsEmpty() { // Only rotate/upgrade 1 pod at a time continue } @@ -91,7 +91,7 @@ func createRotateOrUpgradePlan(log zerolog.Logger, apiObject k8sutil.APIObject, if upgradeNotAllowed { context.CreateEvent(k8sutil.NewUpgradeNotAllowedEvent(apiObject, fromVersion, toVersion, fromLicense, toLicense)) - } else if len(newPlan) > 0 { + } else if !newPlan.IsEmpty() { if clusterReadyForUpgrade(context) { // Use the new plan return newPlan diff --git a/pkg/deployment/reconcile/plan_builder_storage.go b/pkg/deployment/reconcile/plan_builder_storage.go index 67bcb7edc..7e98549dd 100644 --- a/pkg/deployment/reconcile/plan_builder_storage.go +++ b/pkg/deployment/reconcile/plan_builder_storage.go @@ -24,7 +24,7 @@ package reconcile import ( "github.com/rs/zerolog" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" "github.com/arangodb/kube-arangodb/pkg/util" @@ -43,7 +43,7 @@ func createRotateServerStoragePlan(log zerolog.Logger, apiObject k8sutil.APIObje var plan api.Plan status.Members.ForeachServerGroup(func(group api.ServerGroup, members api.MemberStatusList) error { for _, m := range members { - if len(plan) > 0 { + if !plan.IsEmpty() { // Only 1 change at a time continue } @@ -66,51 +66,36 @@ func createRotateServerStoragePlan(log zerolog.Logger, apiObject k8sutil.APIObje Msg("Failed to get PVC") continue } - replacementNeeded := false + if util.StringOrDefault(pvc.Spec.StorageClassName) != storageClassName && storageClassName != "" { // Storageclass has changed - log.Debug().Str("pod-name", m.PodName). + log.Info().Str("pod-name", m.PodName). Str("pvc-storage-class", util.StringOrDefault(pvc.Spec.StorageClassName)). Str("group-storage-class", storageClassName).Msg("Storage class has changed - pod needs replacement") - replacementNeeded = true - } - rotationNeeded := false - if k8sutil.IsPersistentVolumeClaimFileSystemResizePending(pvc) { - rotationNeeded = true - } - if replacementNeeded { - if group != api.ServerGroupAgents && group != api.ServerGroupDBServers { - // Only agents & dbservers are allowed to change their storage class. 
- createEvent(k8sutil.NewCannotChangeStorageClassEvent(apiObject, m.ID, group.AsRole(), "Not supported")) - continue - } else { - if group != api.ServerGroupAgents { - plan = append(plan, - // Scale up, so we're sure that a new member is available - api.NewAction(api.ActionTypeAddMember, group, ""), - api.NewAction(api.ActionTypeWaitForMemberUp, group, api.MemberIDPreviousAction), - ) - } - if group == api.ServerGroupDBServers { - plan = append(plan, - // Cleanout - api.NewAction(api.ActionTypeCleanOutMember, group, m.ID), - ) - } + + if group == api.ServerGroupDBServers { plan = append(plan, - // Shutdown & remove the server + api.NewAction(api.ActionTypeDisableClusterScaling, group, ""), + api.NewAction(api.ActionTypeAddMember, group, ""), + api.NewAction(api.ActionTypeWaitForMemberUp, group, api.MemberIDPreviousAction), + api.NewAction(api.ActionTypeCleanOutMember, group, m.ID), api.NewAction(api.ActionTypeShutdownMember, group, m.ID), api.NewAction(api.ActionTypeRemoveMember, group, m.ID), + api.NewAction(api.ActionTypeEnableClusterScaling, group, ""), ) - if group == api.ServerGroupAgents { - plan = append(plan, - // Scale up, so we're adding the removed agent (note: with the old ID) - api.NewAction(api.ActionTypeAddMember, group, m.ID), - api.NewAction(api.ActionTypeWaitForMemberUp, group, m.ID), - ) - } + } else if group == api.ServerGroupAgents { + plan = append(plan, + api.NewAction(api.ActionTypeShutdownMember, group, m.ID), + api.NewAction(api.ActionTypeRemoveMember, group, m.ID), + api.NewAction(api.ActionTypeAddMember, group, m.ID), + api.NewAction(api.ActionTypeWaitForMemberUp, group, m.ID), + ) + } else { + // Only agents & dbservers are allowed to change their storage class. + createEvent(k8sutil.NewCannotChangeStorageClassEvent(apiObject, m.ID, group.AsRole(), "Not supported")) } - } else if rotationNeeded { + } else if k8sutil.IsPersistentVolumeClaimFileSystemResizePending(pvc) { + // rotation needed plan = createRotateMemberPlan(log, m, group, "Filesystem resize pending") } } diff --git a/pkg/deployment/reconcile/plan_builder_test.go b/pkg/deployment/reconcile/plan_builder_test.go index 1d19345b2..02fe40865 100644 --- a/pkg/deployment/reconcile/plan_builder_test.go +++ b/pkg/deployment/reconcile/plan_builder_test.go @@ -23,9 +23,14 @@ package reconcile import ( + "context" + "errors" "fmt" + "io/ioutil" "testing" + "github.com/arangodb/arangosync-client/client" + "github.com/arangodb/go-driver/agency" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -38,7 +43,101 @@ import ( v1 "k8s.io/api/core/v1" ) -type testContext struct{} +type testContext struct { + Pods []v1.Pod + ErrPods error + ArangoDeployment *api.ArangoDeployment + PVC *v1.PersistentVolumeClaim + PVCErr error + RecordedEvent *k8sutil.Event +} + +func (c *testContext) GetAPIObject() k8sutil.APIObject { + if c.ArangoDeployment == nil { + return &api.ArangoDeployment{} + } + return c.ArangoDeployment +} + +func (c *testContext) GetSpec() api.DeploymentSpec { + return c.ArangoDeployment.Spec +} + +func (c *testContext) UpdateStatus(status api.DeploymentStatus, lastVersion int32, force ...bool) error { + c.ArangoDeployment.Status = status + return nil +} + +func (c *testContext) UpdateMember(member api.MemberStatus) error { + panic("implement me") +} + +func (c *testContext) GetDatabaseClient(ctx context.Context) (driver.Client, error) { + panic("implement me") +} + +func (c *testContext) GetServerClient(ctx context.Context, group api.ServerGroup, id string) 
(driver.Client, error) { + panic("implement me") +} + +func (c *testContext) GetAgencyClients(ctx context.Context, predicate func(id string) bool) ([]driver.Connection, error) { + panic("implement me") +} + +func (c *testContext) GetAgency(ctx context.Context) (agency.Agency, error) { + panic("implement me") +} + +func (c *testContext) GetSyncServerClient(ctx context.Context, group api.ServerGroup, id string) (client.API, error) { + panic("implement me") +} + +func (c *testContext) CreateMember(group api.ServerGroup, id string) (string, error) { + panic("implement me") +} + +func (c *testContext) DeletePod(podName string) error { + panic("implement me") +} + +func (c *testContext) DeletePvc(pvcName string) error { + panic("implement me") +} + +func (c *testContext) RemovePodFinalizers(podName string) error { + panic("implement me") +} + +func (c *testContext) GetOwnedPods() ([]v1.Pod, error) { + if c.ErrPods != nil { + return nil, c.ErrPods + } + + if c.Pods == nil { + return make([]v1.Pod, 0), c.ErrPods + } + return c.Pods, c.ErrPods +} + +func (c *testContext) DeleteTLSKeyfile(group api.ServerGroup, member api.MemberStatus) error { + panic("implement me") +} + +func (c *testContext) DeleteSecret(secretName string) error { + panic("implement me") +} + +func (c *testContext) GetDeploymentHealth() (driver.ClusterHealth, error) { + panic("implement me") +} + +func (c *testContext) DisableScalingCluster() error { + panic("implement me") +} + +func (c *testContext) EnableScalingCluster() error { + panic("implement me") +} // GetTLSKeyfile returns the keyfile encoded TLS certificate+key for // the given member. @@ -55,12 +154,12 @@ func (c *testContext) GetTLSCA(secretName string) (string, string, bool, error) // CreateEvent creates a given event. // On error, the error is logged. func (c *testContext) CreateEvent(evt *k8sutil.Event) { - // not implemented + c.RecordedEvent = evt } // GetPvc gets a PVC by the given name, in the samespace of the deployment. func (c *testContext) GetPvc(pvcName string) (*v1.PersistentVolumeClaim, error) { - return nil, maskAny(fmt.Errorf("Not implemented")) + return c.PVC, c.PVCErr } // GetExpectedPodArguments creates command line arguments for a server in the given group with given ID. @@ -79,7 +178,7 @@ func (c *testContext) InvalidateSyncStatus() {} // GetStatus returns the current status of the deployment func (c *testContext) GetStatus() (api.DeploymentStatus, int32) { - return api.DeploymentStatus{}, 0 + return c.ArangoDeployment.Status, 0 } // TestCreatePlanSingleScale creates a `single` deployment to test the creating of scaling plan. 
@@ -299,3 +398,332 @@ func TestCreatePlanClusterScale(t *testing.T) { assert.Equal(t, api.ServerGroupCoordinators, newPlan[3].Group) assert.Equal(t, api.ServerGroupCoordinators, newPlan[4].Group) } + +type LastLogRecord struct { + msg string +} + +func (l *LastLogRecord) Run(e *zerolog.Event, level zerolog.Level, msg string) { + l.msg = msg +} + +func TestCreatePlan(t *testing.T) { + // Arrange + threeCoordinators := api.MemberStatusList{ + { + ID: "1", + }, + { + ID: "2", + }, + { + ID: "3", + }, + } + twoAgents := api.MemberStatusList{ + { + ID: "1", + }, + { + ID: "2", + }, + } + threeDBServers := api.MemberStatusList{ + { + ID: "1", + }, + { + ID: "2", + }, + { + ID: "3", + }, + } + + deploymentTemplate := &api.ArangoDeployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test_depl", + Namespace: "test", + }, + Spec: api.DeploymentSpec{ + Mode: api.NewMode(api.DeploymentModeCluster), + TLS: api.TLSSpec{ + CASecretName: util.NewString(api.CASecretNameDisabled), + }, + }, + Status: api.DeploymentStatus{ + Members: api.DeploymentStatusMembers{ + DBServers: threeDBServers, + Coordinators: threeCoordinators, + Agents: twoAgents, + }, + }, + } + deploymentTemplate.Spec.SetDefaults("createPlanTest") + + testCases := []struct { + Name string + context *testContext + Helper func(*api.ArangoDeployment) + ExpectedError error + ExpectedPlan api.Plan + ExpectedLog string + ExpectedEvent *k8sutil.Event + }{ + { + Name: "Can not get pods", + context: &testContext{ + ErrPods: errors.New("fake error"), + }, + ExpectedError: errors.New("fake error"), + ExpectedLog: "Failed to get owned pods", + }, + { + Name: "Can not create plan for single deployment", + context: &testContext{ + ArangoDeployment: deploymentTemplate.DeepCopy(), + }, + Helper: func(ad *api.ArangoDeployment) { + ad.Spec.Mode = api.NewMode(api.DeploymentModeSingle) + }, + ExpectedPlan: []api.Action{}, + }, + { + Name: "Can not create plan for not created member", + context: &testContext{ + ArangoDeployment: deploymentTemplate.DeepCopy(), + }, + Helper: func(ad *api.ArangoDeployment) { + ad.Status.Members.DBServers[0].Phase = api.MemberPhaseNone + }, + ExpectedPlan: []api.Action{}, + }, + { + Name: "Can not create plan without PVC name", + context: &testContext{ + ArangoDeployment: deploymentTemplate.DeepCopy(), + }, + Helper: func(ad *api.ArangoDeployment) { + ad.Status.Members.DBServers[0].Phase = api.MemberPhaseCreated + //ad.Status.Members.DBServers[0].PersistentVolumeClaimName = "" + }, + ExpectedPlan: []api.Action{}, + }, + { + Name: "Getting PVC from kubernetes failed", + context: &testContext{ + ArangoDeployment: deploymentTemplate.DeepCopy(), + PVCErr: errors.New("fake error"), + }, + Helper: func(ad *api.ArangoDeployment) { + ad.Status.Members.DBServers[0].Phase = api.MemberPhaseCreated + ad.Status.Members.DBServers[0].PersistentVolumeClaimName = "pvc_test" + }, + ExpectedLog: "Failed to get PVC", + }, + { + Name: "Change Storage for DBServers", + context: &testContext{ + ArangoDeployment: deploymentTemplate.DeepCopy(), + PVC: &v1.PersistentVolumeClaim{ + Spec: v1.PersistentVolumeClaimSpec{ + StorageClassName: util.NewString("oldStorage"), + }, + }, + }, + Helper: func(ad *api.ArangoDeployment) { + ad.Spec.DBServers = api.ServerGroupSpec{ + Count: util.NewInt(3), + VolumeClaimTemplate: &v1.PersistentVolumeClaim{ + Spec: v1.PersistentVolumeClaimSpec{ + StorageClassName: util.NewString("newStorage"), + }, + }, + } + ad.Status.Members.DBServers[0].Phase = api.MemberPhaseCreated + ad.Status.Members.DBServers[0].PersistentVolumeClaimName 
= "pvc_test" + }, + ExpectedPlan: []api.Action{ + api.NewAction(api.ActionTypeDisableClusterScaling, api.ServerGroupDBServers, ""), + api.NewAction(api.ActionTypeAddMember, api.ServerGroupDBServers, ""), + api.NewAction(api.ActionTypeWaitForMemberUp, api.ServerGroupDBServers, ""), + api.NewAction(api.ActionTypeCleanOutMember, api.ServerGroupDBServers, ""), + api.NewAction(api.ActionTypeShutdownMember, api.ServerGroupDBServers, ""), + api.NewAction(api.ActionTypeRemoveMember, api.ServerGroupDBServers, ""), + api.NewAction(api.ActionTypeEnableClusterScaling, api.ServerGroupDBServers, ""), + }, + ExpectedLog: "Storage class has changed - pod needs replacement", + }, + { + Name: "Change Storage for Agents with deprecated storage class name", + context: &testContext{ + ArangoDeployment: deploymentTemplate.DeepCopy(), + PVC: &v1.PersistentVolumeClaim{ + Spec: v1.PersistentVolumeClaimSpec{ + StorageClassName: util.NewString("oldStorage"), + }, + }, + }, + Helper: func(ad *api.ArangoDeployment) { + ad.Spec.Agents = api.ServerGroupSpec{ + Count: util.NewInt(2), + StorageClassName: util.NewString("newStorage"), + } + ad.Status.Members.Agents[0].Phase = api.MemberPhaseCreated + ad.Status.Members.Agents[0].PersistentVolumeClaimName = "pvc_test" + }, + ExpectedPlan: []api.Action{ + api.NewAction(api.ActionTypeShutdownMember, api.ServerGroupAgents, ""), + api.NewAction(api.ActionTypeRemoveMember, api.ServerGroupAgents, ""), + api.NewAction(api.ActionTypeAddMember, api.ServerGroupAgents, ""), + api.NewAction(api.ActionTypeWaitForMemberUp, api.ServerGroupAgents, ""), + }, + ExpectedLog: "Storage class has changed - pod needs replacement", + }, + { + Name: "Storage for Coordinators is not possible", + context: &testContext{ + ArangoDeployment: deploymentTemplate.DeepCopy(), + PVC: &v1.PersistentVolumeClaim{ + Spec: v1.PersistentVolumeClaimSpec{ + StorageClassName: util.NewString("oldStorage"), + }, + }, + }, + Helper: func(ad *api.ArangoDeployment) { + ad.Spec.Coordinators = api.ServerGroupSpec{ + Count: util.NewInt(3), + VolumeClaimTemplate: &v1.PersistentVolumeClaim{ + Spec: v1.PersistentVolumeClaimSpec{ + StorageClassName: util.NewString("newStorage"), + }, + }, + } + ad.Status.Members.Coordinators[0].Phase = api.MemberPhaseCreated + ad.Status.Members.Coordinators[0].PersistentVolumeClaimName = "pvc_test" + }, + ExpectedPlan: []api.Action{}, + ExpectedLog: "Storage class has changed - pod needs replacement", + ExpectedEvent: &k8sutil.Event{ + Type: v1.EventTypeNormal, + Reason: "Coordinator Member StorageClass Cannot Change", + Message: "Member 1 with role coordinator should use a different StorageClass, but is cannot because: Not supported", + }, + }, + { + Name: "Create rotation plan", + context: &testContext{ + ArangoDeployment: deploymentTemplate.DeepCopy(), + PVC: &v1.PersistentVolumeClaim{ + Spec: v1.PersistentVolumeClaimSpec{ + StorageClassName: util.NewString("oldStorage"), + }, + Status: v1.PersistentVolumeClaimStatus{ + Conditions: []v1.PersistentVolumeClaimCondition{ + { + Type: v1.PersistentVolumeClaimFileSystemResizePending, + Status: v1.ConditionTrue, + }, + }, + }, + }, + }, + Helper: func(ad *api.ArangoDeployment) { + ad.Spec.Agents = api.ServerGroupSpec{ + Count: util.NewInt(2), + VolumeClaimTemplate: &v1.PersistentVolumeClaim{ + Spec: v1.PersistentVolumeClaimSpec{ + StorageClassName: util.NewString("oldStorage"), + }, + }, + } + ad.Status.Members.Agents[0].Phase = api.MemberPhaseCreated + ad.Status.Members.Agents[0].PersistentVolumeClaimName = "pvc_test" + }, + ExpectedPlan: 
[]api.Action{ + api.NewAction(api.ActionTypeRotateMember, api.ServerGroupAgents, ""), + api.NewAction(api.ActionTypeWaitForMemberUp, api.ServerGroupAgents, ""), + }, + ExpectedLog: "Creating rotation plan", + }, + { + Name: "Member in failed state", + context: &testContext{ + ArangoDeployment: deploymentTemplate.DeepCopy(), + }, + Helper: func(ad *api.ArangoDeployment) { + ad.Spec.Agents = api.ServerGroupSpec{ + Count: util.NewInt(2), + } + ad.Status.Members.Agents[0].Phase = api.MemberPhaseFailed + }, + ExpectedPlan: []api.Action{ + api.NewAction(api.ActionTypeRemoveMember, api.ServerGroupAgents, ""), + api.NewAction(api.ActionTypeAddMember, api.ServerGroupAgents, ""), + }, + ExpectedLog: "Creating member replacement plan because member has failed", + }, + { + Name: "Scale down DBservers", + context: &testContext{ + ArangoDeployment: deploymentTemplate.DeepCopy(), + }, + Helper: func(ad *api.ArangoDeployment) { + ad.Spec.DBServers = api.ServerGroupSpec{ + Count: util.NewInt(2), + } + ad.Status.Members.DBServers[0].Phase = api.MemberPhaseCreated + ad.Status.Members.DBServers[0].Conditions = api.ConditionList{ + { + Type: api.ConditionTypeCleanedOut, + Status: v1.ConditionTrue, + }, + } + }, + ExpectedPlan: []api.Action{ + api.NewAction(api.ActionTypeRemoveMember, api.ServerGroupDBServers, ""), + api.NewAction(api.ActionTypeAddMember, api.ServerGroupDBServers, ""), + }, + ExpectedLog: "Creating dbserver replacement plan because server is cleanout in created phase", + }, + } + + for _, testCase := range testCases { + //nolint:scopelint + t.Run(testCase.Name, func(t *testing.T) { + // Arrange + h := &LastLogRecord{} + logger := zerolog.New(ioutil.Discard).Hook(h) + r := NewReconciler(logger, testCase.context) + + // Act + if testCase.Helper != nil { + testCase.Helper(testCase.context.ArangoDeployment) + } + err := r.CreatePlan() + + // Assert + if testCase.ExpectedEvent != nil { + require.NotNil(t, testCase.context.RecordedEvent) + require.Equal(t, testCase.ExpectedEvent.Type, testCase.context.RecordedEvent.Type) + require.Equal(t, testCase.ExpectedEvent.Message, testCase.context.RecordedEvent.Message) + require.Equal(t, testCase.ExpectedEvent.Reason, testCase.context.RecordedEvent.Reason) + } + if len(testCase.ExpectedLog) > 0 { + require.Equal(t, testCase.ExpectedLog, h.msg) + } + if testCase.ExpectedError != nil { + assert.EqualError(t, err, testCase.ExpectedError.Error()) + return + } + + require.NoError(t, err) + status, _ := testCase.context.GetStatus() + require.Len(t, status.Plan, len(testCase.ExpectedPlan)) + for i, v := range testCase.ExpectedPlan { + assert.Equal(t, v.Type, status.Plan[i].Type) + assert.Equal(t, v.Group, status.Plan[i].Group) + } + }) + } +} diff --git a/pkg/deployment/reconcile/plan_builder_tls.go b/pkg/deployment/reconcile/plan_builder_tls.go index 377414465..0d7caf8bd 100644 --- a/pkg/deployment/reconcile/plan_builder_tls.go +++ b/pkg/deployment/reconcile/plan_builder_tls.go @@ -42,7 +42,7 @@ func createRotateTLSServerCertificatePlan(log zerolog.Logger, spec api.Deploymen var plan api.Plan status.Members.ForeachServerGroup(func(group api.ServerGroup, members api.MemberStatusList) error { for _, m := range members { - if len(plan) > 0 { + if !plan.IsEmpty() { // Only 1 change at a time continue } diff --git a/pkg/deployment/reconcile/plan_executor.go b/pkg/deployment/reconcile/plan_executor.go index 4c18482ac..0c7b64005 100644 --- a/pkg/deployment/reconcile/plan_executor.go +++ b/pkg/deployment/reconcile/plan_executor.go @@ -183,6 +183,10 @@ func (d *Reconciler) 
createAction(ctx context.Context, log zerolog.Logger, actio return NewRenewTLSCACertificateAction(log, action, actionCtx) case api.ActionTypeSetCurrentImage: return NewSetCurrentImageAction(log, action, actionCtx) + case api.ActionTypeDisableClusterScaling: + return NewDisableScalingCluster(log, action, actionCtx) + case api.ActionTypeEnableClusterScaling: + return NewEnableScalingCluster(log, action, actionCtx) default: panic(fmt.Sprintf("Unknown action type '%s'", action.Type)) } diff --git a/pkg/deployment/resources/pvcs.go b/pkg/deployment/resources/pvcs.go index 2cddeb5c5..5675c1ab2 100644 --- a/pkg/deployment/resources/pvcs.go +++ b/pkg/deployment/resources/pvcs.go @@ -49,17 +49,22 @@ func (r *Resources) EnsurePVCs() error { pvcs := k8sutil.NewPersistentVolumeClaimCache(kubecli.CoreV1().PersistentVolumeClaims(ns)) if err := iterator.ForeachServerGroup(func(group api.ServerGroup, spec api.ServerGroupSpec, status *api.MemberStatusList) error { for _, m := range *status { - if m.PersistentVolumeClaimName != "" { - if _, err := pvcs.Get(m.PersistentVolumeClaimName, metav1.GetOptions{}); err != nil { - storageClassName := spec.GetStorageClassName() - role := group.AsRole() - resources := spec.Resources - vct := spec.VolumeClaimTemplate - finalizers := r.createPVCFinalizers(group) - if err := k8sutil.CreatePersistentVolumeClaim(pvcs, m.PersistentVolumeClaimName, deploymentName, ns, storageClassName, role, enforceAntiAffinity, resources, vct, finalizers, owner); err != nil { - return maskAny(err) - } + if m.PersistentVolumeClaimName == "" { + continue + } + + _, err := pvcs.Get(m.PersistentVolumeClaimName, metav1.GetOptions{}) + if k8sutil.IsNotFound(err) { + storageClassName := spec.GetStorageClassName() + role := group.AsRole() + resources := spec.Resources + vct := spec.VolumeClaimTemplate + finalizers := r.createPVCFinalizers(group) + if err := k8sutil.CreatePersistentVolumeClaim(pvcs, m.PersistentVolumeClaimName, deploymentName, ns, storageClassName, role, enforceAntiAffinity, resources, vct, finalizers, owner); err != nil { + return maskAny(err) } + } else if err != nil { + return maskAny(err) } } return nil diff --git a/tests/persistent_volumes_test.go b/tests/persistent_volumes_test.go index 3644d031a..16beeaf22 100644 --- a/tests/persistent_volumes_test.go +++ b/tests/persistent_volumes_test.go @@ -27,6 +27,14 @@ import ( "testing" "time" + "k8s.io/client-go/kubernetes" + + storagev1 "k8s.io/api/storage/v1" + + "github.com/arangodb/kube-arangodb/pkg/util" + + "github.com/stretchr/testify/require" + "github.com/arangodb/arangosync-client/pkg/retry" "github.com/dchest/uniuri" @@ -258,3 +266,222 @@ func TestPVCTemplateResize(t *testing.T) { } } + +func TestPVCChangeStorage(t *testing.T) { + longOrSkip(t) + + k8sNameSpace := getNamespace(t) + arangoClient := kubeArangoClient.MustNewInCluster() + kubecli := mustNewKubeClient(t) + mode := api.DeploymentModeCluster + + defaultStorageClass := getDefaultStorageClassOrDie(t, kubecli) + randomString := strings.ToLower(uniuri.NewLen(4)) + newStorageClassName := defaultStorageClass.GetName() + randomString + + newStorage := defaultStorageClass.DeepCopy() + newStorage.ObjectMeta = metav1.ObjectMeta{ + Name: newStorageClassName, + } + newStorage, err := kubecli.StorageV1().StorageClasses().Create(newStorage) + require.NoError(t, err) + defer func() { + err := kubecli.StorageV1().StorageClasses().Delete(newStorage.Name, &metav1.DeleteOptions{}) + assert.NoError(t, err) + }() + + name := strings.Replace(fmt.Sprintf("tcs-%s-%s", mode[:2], 
randomString), ".", "", -1)
+	depl, err := newDeploymentWithValidation(name, func(deployment *api.ArangoDeployment) {
+		var agentsCount, coordinatorCount, DBServersCount = 3, 2, 3
+
+		deployment.Spec.Mode = api.NewMode(mode)
+		deployment.Spec.Environment = api.NewEnvironment(api.EnvironmentProduction)
+
+		volumeMode := corev1.PersistentVolumeFilesystem
+		deployment.Spec.DBServers.VolumeClaimTemplate = &corev1.PersistentVolumeClaim{
+			Spec: corev1.PersistentVolumeClaimSpec{
+				AccessModes: []corev1.PersistentVolumeAccessMode{
+					corev1.ReadWriteOnce,
+				},
+				VolumeMode: &volumeMode,
+				Resources: corev1.ResourceRequirements{
+					Requests: corev1.ResourceList{
+						corev1.ResourceStorage: resource.MustParse("1Gi"),
+					},
+				},
+			},
+		}
+
+		deployment.Spec.DBServers.Count = util.NewInt(DBServersCount)
+		deployment.Spec.Agents.Count = util.NewInt(agentsCount)
+		deployment.Spec.Coordinators.Count = util.NewInt(coordinatorCount)
+	})
+	require.NoError(t, err)
+
+	// Create deployment
+	_, err = arangoClient.DatabaseV1().ArangoDeployments(k8sNameSpace).Create(depl)
+	require.NoError(t, err, "failed to create deployment: %s", err)
+	defer deferedCleanupDeployment(arangoClient, depl.GetName(), k8sNameSpace)
+
+	depl, err = waitUntilDeployment(arangoClient, depl.GetName(), k8sNameSpace, deploymentIsReady())
+	require.NoError(t, err, fmt.Sprintf("Deployment not running in time: %s", err))
+	require.NotNil(t, depl.Spec.DBServers.VolumeClaimTemplate)
+
+	// Fill collection with documents
+	documentGenerator := NewDocumentGenerator(kubecli, depl, "collectionTest", 3, 200)
+	documentGenerator.generate(t, func(documentIndex int) interface{} {
+		type oneValue struct {
+			value int
+		}
+		return &oneValue{value: documentIndex}
+	})
+
+	// Update deployment
+	_, err = updateDeployment(arangoClient, depl.GetName(), k8sNameSpace, func(spec *api.DeploymentSpec) {
+		spec.DBServers.VolumeClaimTemplate.Spec.StorageClassName = util.NewString(newStorageClassName)
+	})
+	require.NoError(t, err, "failed to update deployment: %s", err)
+
+	// Check for updated deployment
+	isStorageChanged := func(deployment *api.ArangoDeployment) error {
+		pvc := deployment.Spec.DBServers.VolumeClaimTemplate
+		if pvc == nil {
+			return fmt.Errorf("persistent volume claim cannot be nil")
+		}
+		if pvc.Spec.StorageClassName == nil {
+			return fmt.Errorf("storage class name cannot be nil")
+		}
+		if *pvc.Spec.StorageClassName != newStorageClassName {
+			return fmt.Errorf("storage class name has not been changed")
+		}
+
+		for _, server := range deployment.Status.Members.DBServers {
+			pvc, err := kubecli.CoreV1().PersistentVolumeClaims(k8sNameSpace).Get(server.PersistentVolumeClaimName, metav1.GetOptions{})
+			if err != nil {
+				return err
+			}
+			if pvc.Spec.StorageClassName == nil {
+				return fmt.Errorf("storage class name cannot be nil")
+			}
+			if *pvc.Spec.StorageClassName != newStorageClassName {
+				return fmt.Errorf("storage class name has not been changed")
+			}
+		}
+		return nil
+	}
+
+	depl, err = waitUntilDeployment(arangoClient, depl.GetName(), k8sNameSpace, isStorageChanged, time.Minute*5)
+	require.NoError(t, err, "failed to change storage class for db servers: %s", err)
+
+	// Check if documents are the same in the new storage
+	documentGenerator.check(t)
+
+	// Cleanup
+	removeDeployment(arangoClient, depl.GetName(), k8sNameSpace)
+}
+
+// Test deprecated functionality for changing storage class
+func TestPVCChangeStorageDeprecated(t *testing.T) {
+	longOrSkip(t)
+
+	k8sNameSpace := getNamespace(t)
+	arangoClient :=
kubeArangoClient.MustNewInCluster()
+	kubecli := mustNewKubeClient(t)
+	mode := api.DeploymentModeCluster
+
+	defaultStorageClass := getDefaultStorageClassOrDie(t, kubecli)
+	randomString := strings.ToLower(uniuri.NewLen(4))
+	newStorageClassName := defaultStorageClass.GetName() + randomString
+
+	newStorage := defaultStorageClass.DeepCopy()
+	newStorage.ObjectMeta = metav1.ObjectMeta{
+		Name: newStorageClassName,
+	}
+	newStorage, err := kubecli.StorageV1().StorageClasses().Create(newStorage)
+	require.NoError(t, err)
+	defer func() {
+		err := kubecli.StorageV1().StorageClasses().Delete(newStorage.Name, &metav1.DeleteOptions{})
+		assert.NoError(t, err)
+	}()
+
+	name := strings.Replace(fmt.Sprintf("tcs-%s-%s", mode[:2], randomString), ".", "", -1)
+	depl, err := newDeploymentWithValidation(name, func(deployment *api.ArangoDeployment) {
+		var agentsCount, coordinatorCount, DBServersCount = 3, 2, 3
+
+		deployment.Spec.Mode = api.NewMode(mode)
+		deployment.Spec.Environment = api.NewEnvironment(api.EnvironmentProduction)
+
+		deployment.Spec.DBServers.Resources.Requests = map[corev1.ResourceName]resource.Quantity{
+			corev1.ResourceStorage: resource.MustParse("2Gi"),
+		}
+		deployment.Spec.DBServers.StorageClassName = util.NewString(defaultStorageClass.Name)
+		deployment.Spec.DBServers.Count = util.NewInt(DBServersCount)
+		deployment.Spec.Agents.Count = util.NewInt(agentsCount)
+		deployment.Spec.Coordinators.Count = util.NewInt(coordinatorCount)
+	})
+	require.NoError(t, err)
+
+	// Create deployment
+	_, err = arangoClient.DatabaseV1().ArangoDeployments(k8sNameSpace).Create(depl)
+	require.NoError(t, err, "failed to create deployment: %s", err)
+	defer deferedCleanupDeployment(arangoClient, depl.GetName(), k8sNameSpace)
+
+	depl, err = waitUntilDeployment(arangoClient, depl.GetName(), k8sNameSpace, deploymentIsReady())
+	require.NoError(t, err, fmt.Sprintf("Deployment not running in time: %s", err))
+
+	// Fill collection with documents
+	documentGenerator := NewDocumentGenerator(kubecli, depl, "collectionTest", 3, 200)
+	documentGenerator.generate(t, func(documentIndex int) interface{} {
+		type oneValue struct {
+			value int
+		}
+		return &oneValue{value: documentIndex}
+	})
+
+	// Update deployment
+	_, err = updateDeployment(arangoClient, depl.GetName(), k8sNameSpace, func(spec *api.DeploymentSpec) {
+		spec.DBServers.StorageClassName = util.NewString(newStorageClassName)
+	})
+	require.NoError(t, err, "failed to update deployment: %s", err)
+
+	// Check for updated deployment
+	isDeprecatedStorageChanged := func(deployment *api.ArangoDeployment) error {
+		for _, server := range deployment.Status.Members.DBServers {
+			pvc, err := kubecli.CoreV1().PersistentVolumeClaims(k8sNameSpace).Get(server.PersistentVolumeClaimName, metav1.GetOptions{})
+			if err != nil {
+				return err
+			}
+			if pvc.Spec.StorageClassName == nil {
+				return fmt.Errorf("storage class name cannot be nil")
+			}
+			if *pvc.Spec.StorageClassName != newStorageClassName {
+				return fmt.Errorf("storage class name has not been changed")
+			}
+		}
+		return nil
+	}
+
+	depl, err = waitUntilDeployment(arangoClient, depl.GetName(), k8sNameSpace, isDeprecatedStorageChanged, time.Minute*5)
+	require.NoError(t, err, "failed to change storage class for db servers: %s", err)
+
+	// Check if documents are the same in the new storage
+	documentGenerator.check(t)
+
+	// Cleanup
+	removeDeployment(arangoClient, depl.GetName(), k8sNameSpace)
+}
+
+func getDefaultStorageClassOrDie(t *testing.T, kubecli kubernetes.Interface) *storagev1.StorageClass {
+	var
defaultStorageClass *storagev1.StorageClass + storageClasses, err := kubecli.StorageV1().StorageClasses().List(metav1.ListOptions{}) + require.NoError(t, err) + + for _, sc := range storageClasses.Items { + if k8sutil.StorageClassIsDefault(&sc) { + defaultStorageClass = &sc + break + } + } + require.NotNilf(t, defaultStorageClass, "test needs default storage class") + return defaultStorageClass +} diff --git a/tests/test_util.go b/tests/test_util.go index 523bff4a6..30b2ae951 100644 --- a/tests/test_util.go +++ b/tests/test_util.go @@ -38,6 +38,8 @@ import ( "github.com/arangodb/kube-arangodb/pkg/apis/deployment" "github.com/arangodb/kube-arangodb/pkg/apis/replication" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" @@ -304,6 +306,18 @@ func waitUntilDeploymentMembers(cli versioned.Interface, deploymentName, ns stri }, timeout...) } +func newDeploymentWithValidation(name string, adjustDeployment func(*api.ArangoDeployment)) (*api.ArangoDeployment, error) { + deployment := newDeployment(name) + adjustDeployment(deployment) + + deployment.Spec.SetDefaults(deployment.GetName()) + if err := deployment.Spec.Validate(); err != nil { + return nil, err + } + + return deployment, nil +} + // waitUntilDeployment waits until a deployment with given name in given namespace // reached a state where the given predicate returns true. func waitUntilDeployment(cli versioned.Interface, deploymentName, ns string, predicate func(*api.ArangoDeployment) error, timeout ...time.Duration) (*api.ArangoDeployment, error) { @@ -678,7 +692,7 @@ func removeDeployment(cli versioned.Interface, deploymentName, ns string) error return nil } -// removeReplication removes a deployment +// removeReplication removes a replication func removeReplication(cli versioned.Interface, replicationName, ns string) error { if err := cli.ReplicationV1().ArangoDeploymentReplications(ns).Delete(replicationName, nil); err != nil && k8sutil.IsNotFound(err) { return maskAny(err) @@ -864,3 +878,76 @@ func checkPodCreationTimes(t *testing.T, kubecli kubernetes.Interface, depl *api t.Errorf("Number of pods found (%d) in creation time check does not match expected %d!", len(foundTimes), len(times)) } } + +type DocumentGenerator struct { + kubecli kubernetes.Interface + deployment *api.ArangoDeployment + collectionName string + numberOfShards uint32 + numberOfDocuments uint32 + documentsMeta driver.DocumentMetaSlice +} + +func NewDocumentGenerator(kubecli kubernetes.Interface, deployment *api.ArangoDeployment, + collectionName string, numberOfShards, numberOfDocuments uint32) *DocumentGenerator { + return &DocumentGenerator{ + kubecli: kubecli, + deployment: deployment, + collectionName: collectionName, + numberOfShards: numberOfShards, + numberOfDocuments: numberOfDocuments, + } +} + +func (d *DocumentGenerator) generate(t *testing.T, generator func(int) interface{}) { + + opts := &driver.CreateCollectionOptions{ + NumberOfShards: int(d.numberOfShards), + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + DBClient := mustNewArangodDatabaseClient(ctx, d.kubecli, d.deployment, t, nil) + db, err := DBClient.Database(ctx, "_system") + require.NoError(t, err, "failed to get database") + + collection, err := db.CreateCollection(context.Background(), d.collectionName, opts) + require.NoError(t, err, "failed to create collection") + + d.documentsMeta = make(driver.DocumentMetaSlice, d.numberOfDocuments) + items := make([]interface{}, int(d.numberOfDocuments)) 
+	for i := 0; i < int(d.numberOfDocuments); i++ {
+		items[i] = generator(i)
+	}
+
+	var errorSlice driver.ErrorSlice
+	errorSliceExpected := make(driver.ErrorSlice, int(d.numberOfDocuments))
+	d.documentsMeta, errorSlice, err = collection.CreateDocuments(context.Background(), items)
+	require.NoError(t, err, "failed to create documents")
+	require.Equal(t, errorSliceExpected, errorSlice)
+	return
+}
+
+func (d *DocumentGenerator) check(t *testing.T) {
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	DBClient := mustNewArangodDatabaseClient(ctx, d.kubecli, d.deployment, t, nil)
+	db, err := DBClient.Database(ctx, "_system")
+	require.NoError(t, err, "failed to get database")
+
+	collection, err := db.Collection(context.Background(), d.collectionName)
+	require.NoError(t, err, "failed to get collection")
+
+	count, err := collection.Count(context.Background())
+	require.NoError(t, err, "failed to get number of documents in the collection")
+	require.Equal(t, int64(len(d.documentsMeta)), count, "number of documents is not equal")
+
+	for _, m := range d.documentsMeta {
+		exist, err := collection.DocumentExists(context.Background(), m.Key)
+		require.NoError(t, err, "failed to check if document exists")
+		require.Equal(t, true, exist, "document does not exist")
+	}
+}