Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pkg/apis/deployment/v1/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ const (
ActionTypeRenewTLSCACertificate ActionType = "RenewTLSCACertificate"
// ActionTypeSetCurrentImage causes status.CurrentImage to be updated to the image given in the action.
ActionTypeSetCurrentImage ActionType = "SetCurrentImage"
// ActionTypeDisableClusterScaling turns off scaling DBservers and coordinators
ActionTypeDisableClusterScaling ActionType = "ScalingDisabled"
// ActionTypeEnableClusterScaling turns on scaling DBservers and coordinators
ActionTypeEnableClusterScaling ActionType = "ScalingEnabled"
)

const (
Expand Down Expand Up @@ -134,3 +138,8 @@ func (p Plan) Equal(other Plan) bool {

return true
}

// IsEmpty checks if plan is empty
func (p Plan) IsEmpty() bool {
return len(p) == 0
}
3 changes: 3 additions & 0 deletions pkg/apis/deployment/v1/server_group_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ func (s ServerGroupSpec) GetArgs() []string {

// GetStorageClassName returns the value of storageClassName.
func (s ServerGroupSpec) GetStorageClassName() string {
if pvc := s.GetVolumeClaimTemplate(); pvc != nil {
return util.StringOrDefault(pvc.Spec.StorageClassName)
}
return util.StringOrDefault(s.StorageClassName)
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/apis/deployment/v1alpha/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ const (
ActionTypeRenewTLSCACertificate ActionType = "RenewTLSCACertificate"
// ActionTypeSetCurrentImage causes status.CurrentImage to be updated to the image given in the action.
ActionTypeSetCurrentImage ActionType = "SetCurrentImage"
// ActionTypeDisableClusterScaling turns off scaling DBservers and coordinators
ActionTypeDisableClusterScaling ActionType = "ScalingDisabled"
// ActionTypeEnableClusterScaling turns on scaling DBservers and coordinators
ActionTypeEnableClusterScaling ActionType = "ScalingEnabled"
)

const (
Expand Down Expand Up @@ -134,3 +138,8 @@ func (p Plan) Equal(other Plan) bool {

return true
}

// IsEmpty checks if plan is empty
func (p Plan) IsEmpty() bool {
return len(p) == 0
}
3 changes: 3 additions & 0 deletions pkg/apis/deployment/v1alpha/server_group_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ func (s ServerGroupSpec) GetArgs() []string {

// GetStorageClassName returns the value of storageClassName.
func (s ServerGroupSpec) GetStorageClassName() string {
if pvc := s.GetVolumeClaimTemplate(); pvc != nil {
return util.StringOrDefault(pvc.Spec.StorageClassName)
}
return util.StringOrDefault(s.StorageClassName)
}

Expand Down
136 changes: 103 additions & 33 deletions pkg/deployment/cluster_scaling_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ type clusterScalingIntegration struct {
arangod.NumberOfServers
mutex sync.Mutex
}
scaleEnabled struct {
mutex sync.Mutex
enabled bool
}
}

const (
Expand All @@ -57,10 +61,12 @@ const (

// newClusterScalingIntegration creates a new clusterScalingIntegration.
func newClusterScalingIntegration(depl *Deployment) *clusterScalingIntegration {
return &clusterScalingIntegration{
ci := &clusterScalingIntegration{
log: depl.deps.Log,
depl: depl,
}
ci.scaleEnabled.enabled = true
return ci
}

// SendUpdateToCluster records the given spec to be sended to the cluster.
Expand All @@ -70,37 +76,61 @@ func (ci *clusterScalingIntegration) SendUpdateToCluster(spec api.DeploymentSpec
ci.pendingUpdate.spec = &spec
}

// checkScalingCluster checks if inspection
// returns true if inspection occurred
func (ci *clusterScalingIntegration) checkScalingCluster(expectSuccess bool) bool {
ci.scaleEnabled.mutex.Lock()
defer ci.scaleEnabled.mutex.Unlock()

if !ci.scaleEnabled.enabled {
// Check if it is possible to turn on scaling without any issue
status, _ := ci.depl.GetStatus()
if status.Plan.IsEmpty() && ci.setNumberOfServers() == nil {
// Scaling should be enabled because there is no Plan.
// It can happen when the enabling action fails
ci.scaleEnabled.enabled = true
}
}

if ci.depl.GetPhase() != api.DeploymentPhaseRunning || !ci.scaleEnabled.enabled {
// Deployment must be in running state and scaling must be enabled
return false
}

// Update cluster with our state
ctx := context.Background()
//expectSuccess := *goodInspections > 0 || time.Since(start) > maxClusterBootstrapTime
safeToAskCluster, err := ci.updateClusterServerCount(ctx, expectSuccess)
if err != nil {
if expectSuccess {
ci.log.Debug().Err(err).Msg("Cluster update failed")
}
} else if safeToAskCluster {
// Inspect once
if err := ci.inspectCluster(ctx, expectSuccess); err != nil {
if expectSuccess {
ci.log.Debug().Err(err).Msg("Cluster inspection failed")
}
} else {
return true
}
}
return false
}

// listenForClusterEvents keep listening for changes entered in the UI of the cluster.
func (ci *clusterScalingIntegration) ListenForClusterEvents(stopCh <-chan struct{}) {
start := time.Now()
goodInspections := 0
for {
delay := time.Second * 2

// Is deployment in running state
if ci.depl.GetPhase() == api.DeploymentPhaseRunning {
// Update cluster with our state
ctx := context.Background()
expectSuccess := goodInspections > 0 || time.Since(start) > maxClusterBootstrapTime
safeToAskCluster, err := ci.updateClusterServerCount(ctx, expectSuccess)
if err != nil {
if expectSuccess {
ci.log.Debug().Err(err).Msg("Cluster update failed")
}
} else if safeToAskCluster {
// Inspect once
if err := ci.inspectCluster(ctx, expectSuccess); err != nil {
if expectSuccess {
ci.log.Debug().Err(err).Msg("Cluster inspection failed")
}
} else {
goodInspections++
}
}
expectSuccess := goodInspections > 0 || time.Since(start) > maxClusterBootstrapTime

if ci.checkScalingCluster(expectSuccess) {
goodInspections++
}

select {
case <-time.After(delay):
case <-time.After(time.Second * 2):
// Continue
case <-stopCh:
// We're done
Expand Down Expand Up @@ -200,11 +230,6 @@ func (ci *clusterScalingIntegration) updateClusterServerCount(ctx context.Contex
}

log := ci.log
c, err := ci.depl.clientCache.GetDatabase(ctx)
if err != nil {
return false, maskAny(err)
}

var coordinatorCountPtr *int
var dbserverCountPtr *int

Expand All @@ -223,13 +248,11 @@ func (ci *clusterScalingIntegration) updateClusterServerCount(ctx context.Contex
dbserverCountPtr = &dbserverCount
}

ci.lastNumberOfServers.mutex.Lock()
lastNumberOfServers := ci.lastNumberOfServers.NumberOfServers
ci.lastNumberOfServers.mutex.Unlock()
lastNumberOfServers := ci.GetLastNumberOfServers()

// This is to prevent unneseccary updates that may override some values written by the WebUI (in the case of a update loop)
if coordinatorCount != lastNumberOfServers.GetCoordinators() || dbserverCount != lastNumberOfServers.GetDBServers() {
if err := arangod.SetNumberOfServers(ctx, c.Connection(), coordinatorCountPtr, dbserverCountPtr); err != nil {
if err := ci.depl.SetNumberOfServers(ctx, coordinatorCountPtr, dbserverCountPtr); err != nil {
if expectSuccess {
log.Debug().Err(err).Msg("Failed to set number of servers")
}
Expand All @@ -253,3 +276,50 @@ func (ci *clusterScalingIntegration) updateClusterServerCount(ctx context.Contex
ci.lastNumberOfServers.DBServers = &dbserverCount
return safeToAskCluster, nil
}

// GetLastNumberOfServers returns the last number of servers
func (ci *clusterScalingIntegration) GetLastNumberOfServers() arangod.NumberOfServers {
ci.lastNumberOfServers.mutex.Lock()
defer ci.lastNumberOfServers.mutex.Unlock()

return ci.lastNumberOfServers.NumberOfServers
}

// DisableScalingCluster disables scaling DBservers and coordinators
func (ci *clusterScalingIntegration) DisableScalingCluster() error {
ci.scaleEnabled.mutex.Lock()
defer ci.scaleEnabled.mutex.Unlock()

// Turn off scaling DBservers and coordinators in arangoDB for the UI
ctx := context.Background()
if err := ci.depl.SetNumberOfServers(ctx, nil, nil); err != nil {
return maskAny(err)
}

ci.scaleEnabled.enabled = false
return nil
}

// EnableScalingCluster enables scaling DBservers and coordinators
func (ci *clusterScalingIntegration) EnableScalingCluster() error {
ci.scaleEnabled.mutex.Lock()
defer ci.scaleEnabled.mutex.Unlock()

if ci.scaleEnabled.enabled {
return nil
}

if err := ci.setNumberOfServers(); err != nil {
return maskAny(err)
}
ci.scaleEnabled.enabled = true
return nil
}

func (ci *clusterScalingIntegration) setNumberOfServers() error {
ctx := context.Background()
spec := ci.depl.GetSpec()
numOfCoordinators := spec.Coordinators.GetCount()
numOfDBServers := spec.DBServers.GetCount()
return ci.depl.SetNumberOfServers(ctx, &numOfCoordinators, &numOfDBServers)
}
8 changes: 8 additions & 0 deletions pkg/deployment/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,3 +418,11 @@ func (d *Deployment) GetShardSyncStatus() bool {
func (d *Deployment) InvalidateSyncStatus() {
d.resources.InvalidateSyncStatus()
}

func (d *Deployment) DisableScalingCluster() error {
return d.clusterScalingIntegration.DisableScalingCluster()
}

func (d *Deployment) EnableScalingCluster() error {
return d.clusterScalingIntegration.EnableScalingCluster()
}
17 changes: 17 additions & 0 deletions pkg/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@
package deployment

import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/arangodb/kube-arangodb/pkg/util/arangod"

"github.com/arangodb/arangosync-client/client"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -541,3 +544,17 @@ func (d *Deployment) lookForServiceMonitorCRD() {
log.Warn().Err(err).Msgf("Error when looking for ServiceMonitor CRD")
return
}

// SetNumberOfServers adjust number of DBservers and coordinators in arangod
func (d *Deployment) SetNumberOfServers(ctx context.Context, noCoordinators, noDBServers *int) error {
c, err := d.clientCache.GetDatabase(ctx)
if err != nil {
return maskAny(err)
}

err = arangod.SetNumberOfServers(ctx, c.Connection(), noCoordinators, noDBServers)
if err != nil {
return maskAny(err)
}
return nil
}
15 changes: 14 additions & 1 deletion pkg/deployment/reconcile/action_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ type ActionContext interface {
InvalidateSyncStatus()
// GetSpec returns a copy of the spec
GetSpec() api.DeploymentSpec

// DisableScalingCluster disables scaling DBservers and coordinators
DisableScalingCluster() error
// EnableScalingCluster enables scaling DBservers and coordinators
EnableScalingCluster() error
}

// newActionContext creates a new ActionContext implementation.
Expand Down Expand Up @@ -305,3 +308,13 @@ func (ac *actionContext) SetCurrentImage(imageInfo api.ImageInfo) error {
func (ac *actionContext) InvalidateSyncStatus() {
ac.context.InvalidateSyncStatus()
}

// DisableScalingCluster disables scaling DBservers and coordinators
func (ac *actionContext) DisableScalingCluster() error {
return ac.context.DisableScalingCluster()
}

// EnableScalingCluster enables scaling DBservers and coordinators
func (ac *actionContext) EnableScalingCluster() error {
return ac.context.EnableScalingCluster()
}
50 changes: 50 additions & 0 deletions pkg/deployment/reconcile/action_disable_scaling_cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package reconcile

import (
"context"
"time"

api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/rs/zerolog"
)

// actionDisableScalingCluster implements disabling scaling DBservers and coordinators.
type actionDisableScalingCluster struct {
log zerolog.Logger
action api.Action
actionCtx ActionContext
newMemberID string
}

// NewDisableScalingCluster creates the new action with disabling scaling DBservers and coordinators.
func NewDisableScalingCluster(log zerolog.Logger, action api.Action, actionCtx ActionContext) Action {
return &actionDisableScalingCluster{
log: log,
action: action,
actionCtx: actionCtx,
}
}

// Start disables scaling DBservers and coordinators
func (a *actionDisableScalingCluster) Start(ctx context.Context) (bool, error) {
err := a.actionCtx.DisableScalingCluster()
if err != nil {
return false, err
}
return true, nil
}

// CheckProgress does not matter. Everything is done in 'Start' function
func (a *actionDisableScalingCluster) CheckProgress(ctx context.Context) (bool, bool, error) {
return true, false, nil
}

// Timeout does not matter. Everything is done in 'Start' function
func (a *actionDisableScalingCluster) Timeout() time.Duration {
return 0
}

// MemberID is not used
func (a *actionDisableScalingCluster) MemberID() string {
return ""
}
Loading