diff --git a/CHANGELOG.md b/CHANGELOG.md index 76b5d2921..e5a1994ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - Add Agency Cache internally - Add Recovery during PlanBuild operation - Fix Exporter in Deployments without authentication +- Allow to disable ClusterScalingIntegration and add proper Scheduled label to pods ## [1.2.5](https://github.com/arangodb/kube-arangodb/tree/1.2.5) (2021-10-25) - Split & Unify Lifecycle management functionality diff --git a/main.go b/main.go index 2854d4f15..b8a763fdf 100644 --- a/main.go +++ b/main.go @@ -116,6 +116,8 @@ var ( enableBackup bool // Run backup operator versionOnly bool // Run only version endpoint, explicitly disabled with other + scalingIntegrationEnabled bool + alpineImage, metricsExporterImage, arangoImage string singleMode bool @@ -158,6 +160,7 @@ func init() { f.StringVar(&operatorOptions.scope, "scope", scope.DefaultScope.String(), "Define scope on which Operator works. Legacy - pre 1.1.0 scope with limited cluster access") f.DurationVar(&timeouts.k8s, "timeout.k8s", time.Second*3, "The request timeout to the kubernetes") f.DurationVar(&timeouts.arangoD, "timeout.arangod", time.Second*10, "The request timeout to the ArangoDB") + f.BoolVar(&operatorOptions.scalingIntegrationEnabled, "scaling-integration", false, "Enable Scaling Integration") features.Init(&cmdMain) } @@ -368,6 +371,7 @@ func newOperatorConfigAndDeps(id, namespace, name string) (operator.Config, oper EnableStorage: operatorOptions.enableStorage, EnableBackup: operatorOptions.enableBackup, AllowChaos: chaosOptions.allowed, + ScalingIntegrationEnabled: operatorOptions.scalingIntegrationEnabled, ArangoImage: operatorOptions.arangoImage, SingleMode: operatorOptions.singleMode, Scope: scope, diff --git a/pkg/apis/deployment/v1/topology_member_status.go b/pkg/apis/deployment/v1/topology_member_status.go index 3988f6576..6a1847b67 100644 --- a/pkg/apis/deployment/v1/topology_member_status.go +++ b/pkg/apis/deployment/v1/topology_member_status.go @@ -22,8 +22,17 @@ package v1 import "k8s.io/apimachinery/pkg/types" +type TopologyMemberStatusInitPhase string + +const ( + TopologyMemberStatusInitPhaseNone TopologyMemberStatusInitPhase = "" + TopologyMemberStatusInitPhasePending TopologyMemberStatusInitPhase = "pending" + TopologyMemberStatusInitPhaseOK TopologyMemberStatusInitPhase = "ok" +) + type TopologyMemberStatus struct { - ID types.UID `json:"id"` - Zone int `json:"rack"` - Label string `json:"label,omitempty"` + ID types.UID `json:"id"` + Zone int `json:"rack"` + Label string `json:"label,omitempty"` + InitPhase TopologyMemberStatusInitPhase `json:"init_phase,omitempty"` } diff --git a/pkg/apis/deployment/v1/topology_status.go b/pkg/apis/deployment/v1/topology_status.go index 4716e27a3..733d0fa53 100644 --- a/pkg/apis/deployment/v1/topology_status.go +++ b/pkg/apis/deployment/v1/topology_status.go @@ -103,6 +103,28 @@ func (t *TopologyStatus) IsTopologyOwned(m *TopologyMemberStatus) bool { return t.ID == m.ID } +func (t *TopologyStatus) IsTopologyEvenlyDistributed(group ServerGroup) bool { + if t == nil { + return true + } + + max, min := 0, math.MaxInt64 + + for _, z := range t.Zones { + l := len(z.Members[group.AsRoleAbbreviated()]) + + if min > l { + min = l + } + + if max < l { + max = l + } + } + + return min+1 >= max +} + func (t *TopologyStatus) Enabled() bool { return t != nil } diff --git a/pkg/apis/deployment/v2alpha1/topology_member_status.go b/pkg/apis/deployment/v2alpha1/topology_member_status.go index 724106cdb..b19c0f074 100644 --- a/pkg/apis/deployment/v2alpha1/topology_member_status.go +++ b/pkg/apis/deployment/v2alpha1/topology_member_status.go @@ -22,8 +22,17 @@ package v2alpha1 import "k8s.io/apimachinery/pkg/types" +type TopologyMemberStatusInitPhase string + +const ( + TopologyMemberStatusInitPhaseNone TopologyMemberStatusInitPhase = "" + TopologyMemberStatusInitPhasePending TopologyMemberStatusInitPhase = "pending" + TopologyMemberStatusInitPhaseOK TopologyMemberStatusInitPhase = "ok" +) + type TopologyMemberStatus struct { - ID types.UID `json:"id"` - Zone int `json:"rack"` - Label string `json:"label,omitempty"` + ID types.UID `json:"id"` + Zone int `json:"rack"` + Label string `json:"label,omitempty"` + InitPhase TopologyMemberStatusInitPhase `json:"init_phase,omitempty"` } diff --git a/pkg/apis/deployment/v2alpha1/topology_status.go b/pkg/apis/deployment/v2alpha1/topology_status.go index e737b9e9d..10d619df0 100644 --- a/pkg/apis/deployment/v2alpha1/topology_status.go +++ b/pkg/apis/deployment/v2alpha1/topology_status.go @@ -103,6 +103,28 @@ func (t *TopologyStatus) IsTopologyOwned(m *TopologyMemberStatus) bool { return t.ID == m.ID } +func (t *TopologyStatus) IsTopologyEvenlyDistributed(group ServerGroup) bool { + if t == nil { + return true + } + + max, min := 0, math.MaxInt64 + + for _, z := range t.Zones { + l := len(z.Members[group.AsRoleAbbreviated()]) + + if min > l { + min = l + } + + if max < l { + max = l + } + } + + return min+1 >= max +} + func (t *TopologyStatus) Enabled() bool { return t != nil } diff --git a/pkg/deployment/cluster_scaling_integration.go b/pkg/deployment/cluster_scaling_integration.go index 3cb9ffe9a..748591f5f 100644 --- a/pkg/deployment/cluster_scaling_integration.go +++ b/pkg/deployment/cluster_scaling_integration.go @@ -85,9 +85,14 @@ func (ci *clusterScalingIntegration) checkScalingCluster(ctx context.Context, ex ci.scaleEnabled.mutex.Lock() defer ci.scaleEnabled.mutex.Unlock() + if !ci.depl.config.ScalingIntegrationEnabled { + return false + } + + status, _ := ci.depl.GetStatus() + if !ci.scaleEnabled.enabled { // Check if it is possible to turn on scaling without any issue - status, _ := ci.depl.GetStatus() if status.Plan.IsEmpty() && ci.setNumberOfServers(ctx) == nil { // Scaling should be enabled because there is no Plan. // It can happen when the enabling action fails diff --git a/pkg/deployment/context_impl.go b/pkg/deployment/context_impl.go index df107dc88..ea52ff6f5 100644 --- a/pkg/deployment/context_impl.go +++ b/pkg/deployment/context_impl.go @@ -31,6 +31,9 @@ import ( "strconv" "time" + "github.com/arangodb/kube-arangodb/pkg/deployment/patch" + "k8s.io/apimachinery/pkg/types" + "github.com/arangodb/kube-arangodb/pkg/deployment/resources/inspector" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/arangomember" @@ -710,3 +713,23 @@ func (d *Deployment) WithArangoMemberStatusUpdate(ctx context.Context, namespace return nil } + +func (d *Deployment) ApplyPatchOnPod(ctx context.Context, pod *core.Pod, p ...patch.Item) error { + parser := patch.Patch(p) + + data, err := parser.Marshal() + if err != nil { + return err + } + + c := d.deps.KubeCli.CoreV1().Pods(pod.GetNamespace()) + + ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout()) + defer cancel() + _, err = c.Patch(ctxChild, pod.GetName(), types.JSONPatchType, data, meta.PatchOptions{}) + if err != nil { + return err + } + + return nil +} diff --git a/pkg/deployment/deployment.go b/pkg/deployment/deployment.go index 1cead1ba2..be8d32756 100644 --- a/pkg/deployment/deployment.go +++ b/pkg/deployment/deployment.go @@ -70,11 +70,12 @@ import ( // Config holds configuration settings for a Deployment type Config struct { - ServiceAccount string - AllowChaos bool - OperatorImage string - ArangoImage string - Scope scope.Scope + ServiceAccount string + AllowChaos bool + ScalingIntegrationEnabled bool + OperatorImage string + ArangoImage string + Scope scope.Scope } // Dependencies holds dependent services for a Deployment diff --git a/pkg/deployment/resources/context.go b/pkg/deployment/resources/context.go index d71cc587a..200a11007 100644 --- a/pkg/deployment/resources/context.go +++ b/pkg/deployment/resources/context.go @@ -26,6 +26,8 @@ package resources import ( "context" + "github.com/arangodb/kube-arangodb/pkg/deployment/patch" + agencyCache "github.com/arangodb/kube-arangodb/pkg/deployment/agency" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/arangomember" @@ -138,6 +140,10 @@ type ArangoAgency interface { RefreshAgencyCache(ctx context.Context) (uint64, error) } +type ArangoApplier interface { + ApplyPatchOnPod(ctx context.Context, pod *core.Pod, p ...patch.Item) error +} + // Context provides all functions needed by the Resources service // to perform its service. type Context interface { @@ -148,6 +154,7 @@ type Context interface { DeploymentModInterfaces DeploymentCachedStatus ArangoAgency + ArangoApplier // GetAPIObject returns the deployment as k8s object. GetAPIObject() k8sutil.APIObject diff --git a/pkg/deployment/resources/pod_creator.go b/pkg/deployment/resources/pod_creator.go index 695958c96..414628ca5 100644 --- a/pkg/deployment/resources/pod_creator.go +++ b/pkg/deployment/resources/pod_creator.go @@ -552,9 +552,16 @@ func (r *Resources) createPodForMember(ctx context.Context, cachedStatus inspect member.GetPhaseExecutor().Execute(&m, api.Action{}, newPhase) - if status.Topology.Enabled() { - if m.Topology != nil && m.Topology.ID == status.Topology.ID { - m.Conditions.Update(api.ConditionTypeTopologyAware, true, "Topology Aware", "Topology Aware") + if top := status.Topology; top.Enabled() { + if m.Topology != nil && m.Topology.ID == top.ID { + if top.IsTopologyEvenlyDistributed(group) { + m.Conditions.Update(api.ConditionTypeTopologyAware, true, "Topology Aware", "Topology Aware") + } else { + m.Conditions.Update(api.ConditionTypeTopologyAware, false, "Topology Aware", "Topology invalid") + } + if m.Topology.InitPhase == api.TopologyMemberStatusInitPhaseNone { + m.Topology.InitPhase = api.TopologyMemberStatusInitPhasePending + } } else { m.Conditions.Update(api.ConditionTypeTopologyAware, false, "Topology spec missing", "Topology spec missing") } diff --git a/pkg/deployment/resources/pod_inspector.go b/pkg/deployment/resources/pod_inspector.go index c2555c2a0..d37ddc16d 100644 --- a/pkg/deployment/resources/pod_inspector.go +++ b/pkg/deployment/resources/pod_inspector.go @@ -28,6 +28,8 @@ import ( "fmt" "time" + "github.com/arangodb/kube-arangodb/pkg/deployment/patch" + "github.com/arangodb/kube-arangodb/pkg/util/errors" inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector" @@ -184,6 +186,20 @@ func (r *Resources) InspectPods(ctx context.Context, cachedStatus inspectorInter } } + if k8sutil.IsPodScheduled(pod) { + if _, ok := pod.Labels[k8sutil.LabelKeyArangoScheduled]; k8sutil.IsPodScheduled(pod) && !ok { + // Adding scheduled label to the pod + l := pod.Labels + if l == nil { + l = map[string]string{} + } + l[k8sutil.LabelKeyArangoScheduled] = "1" + if err := r.context.ApplyPatchOnPod(ctx, pod, patch.ItemReplace(patch.NewPath("metadata", "labels"), l)); err != nil { + log.Error().Err(err).Msgf("Unable to update scheduled labels") + } + } + } + if k8sutil.IsContainerReady(pod, k8sutil.ServerContainerName) { // Pod is now ready if memberStatus.Conditions.Update(api.ConditionTypeReady, true, "Pod Ready", "") { @@ -199,6 +215,10 @@ func (r *Resources) InspectPods(ctx context.Context, cachedStatus inspectorInter memberStatus.Topology.Label = label } } + + if memberStatus.Topology.InitPhase == api.TopologyMemberStatusInitPhasePending { + memberStatus.Topology.InitPhase = api.TopologyMemberStatusInitPhaseOK + } } } diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go index fd900ee9d..b5ade5d3b 100644 --- a/pkg/operator/operator.go +++ b/pkg/operator/operator.go @@ -94,6 +94,7 @@ type Config struct { EnableStorage bool EnableBackup bool AllowChaos bool + ScalingIntegrationEnabled bool SingleMode bool Scope scope.Scope } diff --git a/pkg/operator/operator_deployment.go b/pkg/operator/operator_deployment.go index 2e4ca2b0f..f4ae5bac3 100644 --- a/pkg/operator/operator_deployment.go +++ b/pkg/operator/operator_deployment.go @@ -203,11 +203,12 @@ func (o *Operator) handleDeploymentEvent(event *Event) error { // makeDeploymentConfigAndDeps creates a Config & Dependencies object for a new Deployment. func (o *Operator) makeDeploymentConfigAndDeps(apiObject *api.ArangoDeployment) (deployment.Config, deployment.Dependencies) { cfg := deployment.Config{ - ServiceAccount: o.Config.ServiceAccount, - OperatorImage: o.Config.OperatorImage, - ArangoImage: o.ArangoImage, - AllowChaos: o.Config.AllowChaos, - Scope: o.Scope, + ServiceAccount: o.Config.ServiceAccount, + OperatorImage: o.Config.OperatorImage, + ArangoImage: o.ArangoImage, + AllowChaos: o.Config.AllowChaos, + ScalingIntegrationEnabled: o.Config.ScalingIntegrationEnabled, + Scope: o.Scope, } deps := deployment.Dependencies{ Log: o.Dependencies.LogService.MustGetLogger(logging.LoggerNameDeployment).With(). diff --git a/pkg/util/k8sutil/util.go b/pkg/util/k8sutil/util.go index 635199971..e3ae55c2d 100644 --- a/pkg/util/k8sutil/util.go +++ b/pkg/util/k8sutil/util.go @@ -48,6 +48,8 @@ const ( LabelKeyArangoMember = "deployment.arangodb.com/member" // LabelKeyArangoZone is the key of the label used to store the ArangoDeployment zone ID in LabelKeyArangoZone = "deployment.arangodb.com/zone" + // LabelKeyArangoScheduled is the key of the label used to define that member is already scheduled + LabelKeyArangoScheduled = "deployment.arangodb.com/scheduled" // LabelKeyArangoTopology is the key of the label used to store the ArangoDeployment topology ID in LabelKeyArangoTopology = "deployment.arangodb.com/topology"