Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- Add Agency Cache internally
- Add Recovery during PlanBuild operation
- Fix Exporter in Deployments without authentication
- Allow to disable ClusterScalingIntegration and add proper Scheduled label to pods

## [1.2.5](https://github.com/arangodb/kube-arangodb/tree/1.2.5) (2021-10-25)
- Split & Unify Lifecycle management functionality
Expand Down
4 changes: 4 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ var (
enableBackup bool // Run backup operator
versionOnly bool // Run only version endpoint, explicitly disabled with other

scalingIntegrationEnabled bool

alpineImage, metricsExporterImage, arangoImage string

singleMode bool
Expand Down Expand Up @@ -158,6 +160,7 @@ func init() {
f.StringVar(&operatorOptions.scope, "scope", scope.DefaultScope.String(), "Define scope on which Operator works. Legacy - pre 1.1.0 scope with limited cluster access")
f.DurationVar(&timeouts.k8s, "timeout.k8s", time.Second*3, "The request timeout to the kubernetes")
f.DurationVar(&timeouts.arangoD, "timeout.arangod", time.Second*10, "The request timeout to the ArangoDB")
f.BoolVar(&operatorOptions.scalingIntegrationEnabled, "scaling-integration", false, "Enable Scaling Integration")
features.Init(&cmdMain)
}

Expand Down Expand Up @@ -368,6 +371,7 @@ func newOperatorConfigAndDeps(id, namespace, name string) (operator.Config, oper
EnableStorage: operatorOptions.enableStorage,
EnableBackup: operatorOptions.enableBackup,
AllowChaos: chaosOptions.allowed,
ScalingIntegrationEnabled: operatorOptions.scalingIntegrationEnabled,
ArangoImage: operatorOptions.arangoImage,
SingleMode: operatorOptions.singleMode,
Scope: scope,
Expand Down
15 changes: 12 additions & 3 deletions pkg/apis/deployment/v1/topology_member_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,17 @@ package v1

import "k8s.io/apimachinery/pkg/types"

// TopologyMemberStatusInitPhase is a string marker kept in
// TopologyMemberStatus.InitPhase and serialized under the "init_phase" key.
type TopologyMemberStatusInitPhase string

const (
// TopologyMemberStatusInitPhaseNone is the zero value (empty string). Because
// the InitPhase field is tagged omitempty, this value is left out of the JSON.
TopologyMemberStatusInitPhaseNone TopologyMemberStatusInitPhase = ""
// TopologyMemberStatusInitPhasePending is serialized as "pending".
// NOTE(review): the pod creator in this change moves None to Pending - confirm the intended lifecycle.
TopologyMemberStatusInitPhasePending TopologyMemberStatusInitPhase = "pending"
// TopologyMemberStatusInitPhaseOK is serialized as "ok".
// NOTE(review): the pod inspector in this change moves Pending to OK - confirm the intended lifecycle.
TopologyMemberStatusInitPhaseOK TopologyMemberStatusInitPhase = "ok"
)

type TopologyMemberStatus struct {
ID types.UID `json:"id"`
Zone int `json:"rack"`
Label string `json:"label,omitempty"`
ID types.UID `json:"id"`
Zone int `json:"rack"`
Label string `json:"label,omitempty"`
InitPhase TopologyMemberStatusInitPhase `json:"init_phase,omitempty"`
}
22 changes: 22 additions & 0 deletions pkg/apis/deployment/v1/topology_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,28 @@ func (t *TopologyStatus) IsTopologyOwned(m *TopologyMemberStatus) bool {
return t.ID == m.ID
}

func (t *TopologyStatus) IsTopologyEvenlyDistributed(group ServerGroup) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to have a description here. What should the function return when t.Zones is empty? Currently,
the function IsTopologyEvenlyDistributed returns false (not distributed evenly) when t.Zones is empty. Is it intentional? The description of the function should explain it, because I don't know the intentions.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And t.Zones can't have size 0 - it is checked in code. If it does, then false is expected.

if t == nil {
return true
}

max, min := 0, math.MaxInt64

for _, z := range t.Zones {
l := len(z.Members[group.AsRoleAbbreviated()])

if min > l {
min = l
}

if max < l {
max = l
}
}

return min+1 >= max
}

// Enabled reports whether a topology status is present, i.e. whether the
// receiver is non-nil. It only compares the receiver with nil, so it can be
// called on a nil *TopologyStatus.
func (t *TopologyStatus) Enabled() bool {
return t != nil
}
Expand Down
15 changes: 12 additions & 3 deletions pkg/apis/deployment/v2alpha1/topology_member_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,17 @@ package v2alpha1

import "k8s.io/apimachinery/pkg/types"

// TopologyMemberStatusInitPhase is a string marker kept in
// TopologyMemberStatus.InitPhase and serialized under the "init_phase" key.
type TopologyMemberStatusInitPhase string

const (
// TopologyMemberStatusInitPhaseNone is the zero value (empty string). Because
// the InitPhase field is tagged omitempty, this value is left out of the JSON.
TopologyMemberStatusInitPhaseNone TopologyMemberStatusInitPhase = ""
// TopologyMemberStatusInitPhasePending is serialized as "pending".
// NOTE(review): presumably set once a pod is created for a topology-owned member - confirm against the pod creator.
TopologyMemberStatusInitPhasePending TopologyMemberStatusInitPhase = "pending"
// TopologyMemberStatusInitPhaseOK is serialized as "ok".
// NOTE(review): presumably set once the member's pod has been inspected successfully - confirm against the pod inspector.
TopologyMemberStatusInitPhaseOK TopologyMemberStatusInitPhase = "ok"
)

type TopologyMemberStatus struct {
ID types.UID `json:"id"`
Zone int `json:"rack"`
Label string `json:"label,omitempty"`
ID types.UID `json:"id"`
Zone int `json:"rack"`
Label string `json:"label,omitempty"`
InitPhase TopologyMemberStatusInitPhase `json:"init_phase,omitempty"`
}
22 changes: 22 additions & 0 deletions pkg/apis/deployment/v2alpha1/topology_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,28 @@ func (t *TopologyStatus) IsTopologyOwned(m *TopologyMemberStatus) bool {
return t.ID == m.ID
}

// IsTopologyEvenlyDistributed reports whether the members of the given group
// are spread evenly across the zones of the topology, meaning that the largest
// and the smallest per-zone member counts differ by at most one.
//
// A nil receiver (see Enabled) is treated as evenly distributed and yields
// true. A topology without any zones yields false.
func (t *TopologyStatus) IsTopologyEvenlyDistributed(group ServerGroup) bool {
	if t == nil {
		return true
	}

	// Without zones there is nothing to distribute members over. This is
	// spelled out explicitly instead of relying on integer wrap-around of the
	// "smallest" sentinel below.
	if len(t.Zones) == 0 {
		return false
	}

	role := group.AsRoleAbbreviated()

	// Counters are int64 so that the math.MaxInt64 sentinel is representable
	// on every platform, including those where int is 32 bits wide.
	var largest, smallest int64 = 0, math.MaxInt64

	for _, z := range t.Zones {
		count := int64(len(z.Members[role]))

		if count < smallest {
			smallest = count
		}

		if count > largest {
			largest = count
		}
	}

	// At least one zone was visited, so smallest <= largest and the
	// subtraction cannot overflow.
	return largest-smallest <= 1
}

// Enabled reports whether a topology status is present, i.e. whether the
// receiver is non-nil. It only compares the receiver with nil, so it can be
// called on a nil *TopologyStatus.
func (t *TopologyStatus) Enabled() bool {
return t != nil
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/deployment/cluster_scaling_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,14 @@ func (ci *clusterScalingIntegration) checkScalingCluster(ctx context.Context, ex
ci.scaleEnabled.mutex.Lock()
defer ci.scaleEnabled.mutex.Unlock()

if !ci.depl.config.ScalingIntegrationEnabled {
return false
}

status, _ := ci.depl.GetStatus()

if !ci.scaleEnabled.enabled {
// Check if it is possible to turn on scaling without any issue
status, _ := ci.depl.GetStatus()
if status.Plan.IsEmpty() && ci.setNumberOfServers(ctx) == nil {
// Scaling should be enabled because there is no Plan.
// It can happen when the enabling action fails
Expand Down
23 changes: 23 additions & 0 deletions pkg/deployment/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ import (
"strconv"
"time"

"github.com/arangodb/kube-arangodb/pkg/deployment/patch"
"k8s.io/apimachinery/pkg/types"

"github.com/arangodb/kube-arangodb/pkg/deployment/resources/inspector"

"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/arangomember"
Expand Down Expand Up @@ -710,3 +713,23 @@ func (d *Deployment) WithArangoMemberStatusUpdate(ctx context.Context, namespace

return nil
}

// ApplyPatchOnPod serializes the given patch items and sends them as a JSON
// patch to the pod identified by the namespace and name of the passed object.
// The request runs under a child context bounded by the Kubernetes request
// timeout. Any marshalling or API error is returned unchanged.
func (d *Deployment) ApplyPatchOnPod(ctx context.Context, pod *core.Pod, p ...patch.Item) error {
	items := patch.Patch(p)

	payload, err := items.Marshal()
	if err != nil {
		return err
	}

	pods := d.deps.KubeCli.CoreV1().Pods(pod.GetNamespace())

	// Bound the API call so that a stalled API server cannot block the caller.
	reqCtx, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
	defer cancel()

	if _, err := pods.Patch(reqCtx, pod.GetName(), types.JSONPatchType, payload, meta.PatchOptions{}); err != nil {
		return err
	}

	return nil
}
11 changes: 6 additions & 5 deletions pkg/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,12 @@ import (

// Config holds configuration settings for a Deployment
type Config struct {
ServiceAccount string
AllowChaos bool
OperatorImage string
ArangoImage string
Scope scope.Scope
ServiceAccount string
AllowChaos bool
ScalingIntegrationEnabled bool
OperatorImage string
ArangoImage string
Scope scope.Scope
}

// Dependencies holds dependent services for a Deployment
Expand Down
7 changes: 7 additions & 0 deletions pkg/deployment/resources/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ package resources
import (
"context"

"github.com/arangodb/kube-arangodb/pkg/deployment/patch"

agencyCache "github.com/arangodb/kube-arangodb/pkg/deployment/agency"

"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/arangomember"
Expand Down Expand Up @@ -138,6 +140,10 @@ type ArangoAgency interface {
RefreshAgencyCache(ctx context.Context) (uint64, error)
}

// ArangoApplier exposes operations that modify existing Kubernetes objects
// of the deployment.
type ArangoApplier interface {
// ApplyPatchOnPod applies the given patch items to the given pod and
// returns an error when the patch could not be applied.
ApplyPatchOnPod(ctx context.Context, pod *core.Pod, p ...patch.Item) error
}

// Context provides all functions needed by the Resources service
// to perform its service.
type Context interface {
Expand All @@ -148,6 +154,7 @@ type Context interface {
DeploymentModInterfaces
DeploymentCachedStatus
ArangoAgency
ArangoApplier

// GetAPIObject returns the deployment as k8s object.
GetAPIObject() k8sutil.APIObject
Expand Down
13 changes: 10 additions & 3 deletions pkg/deployment/resources/pod_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,9 +552,16 @@ func (r *Resources) createPodForMember(ctx context.Context, cachedStatus inspect

member.GetPhaseExecutor().Execute(&m, api.Action{}, newPhase)

if status.Topology.Enabled() {
if m.Topology != nil && m.Topology.ID == status.Topology.ID {
m.Conditions.Update(api.ConditionTypeTopologyAware, true, "Topology Aware", "Topology Aware")
if top := status.Topology; top.Enabled() {
if m.Topology != nil && m.Topology.ID == top.ID {
if top.IsTopologyEvenlyDistributed(group) {
m.Conditions.Update(api.ConditionTypeTopologyAware, true, "Topology Aware", "Topology Aware")
} else {
m.Conditions.Update(api.ConditionTypeTopologyAware, false, "Topology Aware", "Topology invalid")
}
if m.Topology.InitPhase == api.TopologyMemberStatusInitPhaseNone {
m.Topology.InitPhase = api.TopologyMemberStatusInitPhasePending
}
} else {
m.Conditions.Update(api.ConditionTypeTopologyAware, false, "Topology spec missing", "Topology spec missing")
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/deployment/resources/pod_inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"fmt"
"time"

"github.com/arangodb/kube-arangodb/pkg/deployment/patch"

"github.com/arangodb/kube-arangodb/pkg/util/errors"
inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector"

Expand Down Expand Up @@ -184,6 +186,20 @@ func (r *Resources) InspectPods(ctx context.Context, cachedStatus inspectorInter
}
}

if k8sutil.IsPodScheduled(pod) {
if _, ok := pod.Labels[k8sutil.LabelKeyArangoScheduled]; k8sutil.IsPodScheduled(pod) && !ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we check k8sutil.IsPodScheduled(pod) twice? It was already checked in line 189.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will fix it - a small issue which just uses cycles.

// Adding scheduled label to the pod
l := pod.Labels
if l == nil {
l = map[string]string{}
}
l[k8sutil.LabelKeyArangoScheduled] = "1"
if err := r.context.ApplyPatchOnPod(ctx, pod, patch.ItemReplace(patch.NewPath("metadata", "labels"), l)); err != nil {
log.Error().Err(err).Msgf("Unable to update scheduled labels")
}
}
}

if k8sutil.IsContainerReady(pod, k8sutil.ServerContainerName) {
// Pod is now ready
if memberStatus.Conditions.Update(api.ConditionTypeReady, true, "Pod Ready", "") {
Expand All @@ -199,6 +215,10 @@ func (r *Resources) InspectPods(ctx context.Context, cachedStatus inspectorInter
memberStatus.Topology.Label = label
}
}

if memberStatus.Topology.InitPhase == api.TopologyMemberStatusInitPhasePending {
memberStatus.Topology.InitPhase = api.TopologyMemberStatusInitPhaseOK
}
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ type Config struct {
EnableStorage bool
EnableBackup bool
AllowChaos bool
ScalingIntegrationEnabled bool
SingleMode bool
Scope scope.Scope
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/operator/operator_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,11 +203,12 @@ func (o *Operator) handleDeploymentEvent(event *Event) error {
// makeDeploymentConfigAndDeps creates a Config & Dependencies object for a new Deployment.
func (o *Operator) makeDeploymentConfigAndDeps(apiObject *api.ArangoDeployment) (deployment.Config, deployment.Dependencies) {
cfg := deployment.Config{
ServiceAccount: o.Config.ServiceAccount,
OperatorImage: o.Config.OperatorImage,
ArangoImage: o.ArangoImage,
AllowChaos: o.Config.AllowChaos,
Scope: o.Scope,
ServiceAccount: o.Config.ServiceAccount,
OperatorImage: o.Config.OperatorImage,
ArangoImage: o.ArangoImage,
AllowChaos: o.Config.AllowChaos,
ScalingIntegrationEnabled: o.Config.ScalingIntegrationEnabled,
Scope: o.Scope,
}
deps := deployment.Dependencies{
Log: o.Dependencies.LogService.MustGetLogger(logging.LoggerNameDeployment).With().
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/k8sutil/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const (
LabelKeyArangoMember = "deployment.arangodb.com/member"
// LabelKeyArangoZone is the key of the label used to store the ArangoDeployment zone ID in
LabelKeyArangoZone = "deployment.arangodb.com/zone"
// LabelKeyArangoScheduled is the key of the label used to define that member is already scheduled
LabelKeyArangoScheduled = "deployment.arangodb.com/scheduled"
// LabelKeyArangoTopology is the key of the label used to store the ArangoDeployment topology ID in
LabelKeyArangoTopology = "deployment.arangodb.com/topology"

Expand Down