Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- (Feature) (License) Activation API Integration
- (Feature) (Platform) Chart & Service Kubernetes Events
- (Feature) (Platform) Registry Secret
- (Bugfix) (Platform) Ensure Inventory picks active leader

## [1.3.1](https://github.com/arangodb/kube-arangodb/tree/1.3.1) (2025-10-07)
- (Documentation) Add ArangoPlatformStorage Docs & Examples
Expand Down
50 changes: 28 additions & 22 deletions pkg/deployment/reconcile/plan_builder_license.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package reconcile

import (
"context"
"math/rand"
"time"

api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
Expand Down Expand Up @@ -117,6 +118,29 @@ func (r *Reconciler) updateClusterLicenseDiscover(spec api.DeploymentSpec, conte
return "", errors.Errorf("Unable to discover License mode")
}

// updateClusterLicenseMember selects the member that should be used for a
// license operation.
//
// Candidates are taken from the groups returned by arangod.GroupsWithLicenseV2
// and are kept only when all of the following hold:
//   - the member has image information,
//   - the image is an Enterprise image with ArangoDB version 3.9.0 or newer,
//   - the member has the Ready condition set to true.
//
// The second return value is false when no candidate is left. When more than
// one candidate is left, one of them is picked at random.
func (r *Reconciler) updateClusterLicenseMember(status api.DeploymentStatus) (api.DeploymentStatusMemberElement, bool) {
	// supportsLicense accepts Enterprise members running 3.9.0 or newer.
	supportsLicense := func(e api.DeploymentStatusMemberElement) bool {
		if img := e.Member.Image; img != nil {
			return img.ArangoDBVersion.CompareTo("3.9.0") >= 0 && img.Enterprise
		}

		return false
	}

	// isReady accepts members whose Ready condition is true.
	isReady := func(e api.DeploymentStatusMemberElement) bool {
		return e.Member.Conditions.IsTrue(api.ConditionTypeReady)
	}

	candidates := status.Members.
		AsListInGroups(arangod.GroupsWithLicenseV2()...).
		Filter(supportsLicense).
		Filter(isReady)

	switch count := len(candidates); count {
	case 0:
		return api.DeploymentStatusMemberElement{}, false
	case 1:
		return candidates[0], true
	default:
		return candidates[rand.Intn(count)], true
	}
}

func (r *Reconciler) updateClusterLicenseKey(ctx context.Context, spec api.DeploymentSpec, status api.DeploymentStatus, context PlanBuilderContext) api.Plan {
l, err := k8sutil.GetLicenseFromSecret(context.ACS().CurrentClusterCache(), spec.License.GetSecretName())
if err != nil {
Expand All @@ -129,23 +153,14 @@ func (r *Reconciler) updateClusterLicenseKey(ctx context.Context, spec api.Deplo
return nil
}

members := status.Members.AsListInGroups(arangod.GroupsWithLicenseV2()...).Filter(func(a api.DeploymentStatusMemberElement) bool {
i := a.Member.Image
if i == nil {
return false
}
member, ok := r.updateClusterLicenseMember(status)

return i.ArangoDBVersion.CompareTo("3.9.0") >= 0 && i.Enterprise
})

if len(members) == 0 {
if !ok {
// No member found to take this action
r.log.Trace("No enterprise member in version 3.9.0 or above")
return nil
}

member := members[0]

ctxChild, cancel := globals.GetGlobals().Timeouts().ArangoD().WithTimeout(ctx)
defer cancel()

Expand Down Expand Up @@ -187,23 +202,14 @@ func (r *Reconciler) updateClusterLicenseAPI(ctx context.Context, spec api.Deplo
return nil
}

members := status.Members.AsListInGroups(api.ServerGroupCoordinators, api.ServerGroupSingle).Filter(func(a api.DeploymentStatusMemberElement) bool {
i := a.Member.Image
if i == nil {
return false
}

return i.ArangoDBVersion.CompareTo("3.9.0") >= 0 && i.Enterprise
})
member, ok := r.updateClusterLicenseMember(status)

if len(members) == 0 {
if !ok {
// No member found to take this action
r.log.Trace("No enterprise member in version 3.9.0 or above")
return nil
}

member := members[0]

ctxChild, cancel := globals.GetGlobals().Timeouts().ArangoD().WithTimeout(ctx)
defer cancel()

Expand Down
19 changes: 19 additions & 0 deletions pkg/deployment/resources/pod_inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,25 @@ func (r *Resources) InspectPods(ctx context.Context, cachedStatus inspectorInter
}
}

if spec.Mode.Get() == api.DeploymentModeActiveFailover && features.FailoverLeadership().Enabled() && group == api.ServerGroupSingle {
s, ok := r.context.GetMembersState().MemberState(memberStatus.ID)
if ok && s.IsServing() {
if v, ok := pod.Labels[k8sutil.LabelKeyArangoLeader]; !ok || v != k8sutil.LabelValueArangoActive {
pod.Labels[k8sutil.LabelKeyArangoLeader] = k8sutil.LabelValueArangoActive
if err := r.context.ApplyPatchOnPod(ctx, pod, patch.ItemReplace(patch.NewPath("metadata", "labels"), pod.Labels)); err != nil {
log.Str("pod-name", pod.GetName()).Err(err).Error("Unable to update labels")
}
}
} else {
if _, ok := pod.Labels[k8sutil.LabelKeyArangoLeader]; ok {
delete(pod.Labels, k8sutil.LabelKeyArangoLeader)
if err := r.context.ApplyPatchOnPod(ctx, pod, patch.ItemReplace(patch.NewPath("metadata", "labels"), pod.Labels)); err != nil {
log.Str("pod-name", pod.GetName()).Err(err).Error("Unable to update labels")
}
}
}
}

if memberStatus.Conditions.IsTrue(api.ConditionTypeActive) {
if v, ok := pod.Labels[k8sutil.LabelKeyArangoActive]; !ok || v != k8sutil.LabelValueArangoActive {
pod.Labels[k8sutil.LabelKeyArangoActive] = k8sutil.LabelValueArangoActive
Expand Down
176 changes: 2 additions & 174 deletions pkg/deployment/resources/pod_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,12 @@ package resources

import (
"context"
goStrings "strings"
"sync"

core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/deployment/features"
"github.com/arangodb/kube-arangodb/pkg/deployment/patch"
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/globals"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
Expand All @@ -46,7 +41,7 @@ import (
// consequently the service will not point to any pod.
// It works only in active fail-over mode.
func (r *Resources) EnsureLeader(ctx context.Context, cachedStatus inspectorInterface.Inspector) error {
if r.context.GetSpec().GetMode() != api.DeploymentModeActiveFailover {
if !r.context.GetSpec().GetMode().HasAgents() {
return nil
}

Expand Down Expand Up @@ -129,7 +124,7 @@ func (r *Resources) EnsureLeader(ctx context.Context, cachedStatus inspectorInte
return err
} else {
if !c {
return r.ensureSingleServerLeader(ctx, cachedStatus)
return nil
}

return errors.Reconcile()
Expand All @@ -150,170 +145,3 @@ func (r *Resources) EnsureLeader(ctx context.Context, cachedStatus inspectorInte
// The service has been created.
return errors.Reconcile()
}

// getSingleServerLeaderID returns the IDs of the single-server members for
// which arangod.IsServerAvailable reports true.
//
// Every member of the Single group is probed in its own goroutine, each probe
// being bounded by the global ArangoD timeout. If at least one member is
// available, the collected IDs are returned (in completion order) and any
// probe errors are ignored. Otherwise an error is returned, wrapping one of
// the probe errors when there was any.
func (r *Resources) getSingleServerLeaderID(ctx context.Context) ([]string, error) {
	status := r.context.GetStatus()

	var (
		lock      sync.Mutex // guards available and lastErr
		available []string
		lastErr   error
		pending   sync.WaitGroup
	)

	probeCtx, stop := context.WithCancel(ctx)
	defer stop()

	// probe checks a single member and records either its ID or its error.
	probe := func(id string) {
		defer pending.Done()

		err := globals.GetGlobalTimeouts().ArangoD().RunWithTimeout(probeCtx, func(ctxChild context.Context) error {
			client, err := r.context.GetMembersState().GetMemberClient(id)
			if err != nil {
				return err
			}

			ok, err := arangod.IsServerAvailable(ctxChild, client)
			if err != nil {
				return err
			}
			if !ok {
				return errors.New("not available")
			}

			lock.Lock()
			defer lock.Unlock()
			available = append(available, id)
			return nil
		})
		if err == nil {
			return
		}

		lock.Lock()
		defer lock.Unlock()
		lastErr = err
	}

	for _, member := range status.Members.Single {
		pending.Add(1)
		go probe(member.ID)
	}
	pending.Wait()

	switch {
	case len(available) > 0:
		return available, nil
	case lastErr != nil:
		return nil, errors.WithMessagef(lastErr, "unable to get a leader")
	default:
		return nil, errors.New("unable to get a leader")
	}
}

// ensureSingleServerLeader adds or removes the leadership label on the pods of
// single-server members.
//
// When the FailoverLeadership feature is enabled, the leader is the one member
// returned by getSingleServerLeaderID; if several members are returned, an
// error is logged and no member is treated as leader. When the feature is
// disabled, no member is treated as leader.
//
// The leader pod gets the leader label set to "true"; every other pod has the
// label removed. If any pod was patched, errors.Reconcile() is returned;
// otherwise the call continues with ensureSingleServerLeaderServices.
func (r *Resources) ensureSingleServerLeader(ctx context.Context, cachedStatus inspectorInterface.Inspector) error {
	enabled := features.FailoverLeadership().Enabled()

	var leaderID string
	if enabled {
		leaderIDs, err := r.getSingleServerLeaderID(ctx)
		if err != nil {
			return err
		}

		switch count := len(leaderIDs); {
		case count == 1:
			leaderID = leaderIDs[0]
		case count > 1:
			r.log.Error("multiple leaders found: %s. Blocking traffic to the deployment services", goStrings.Join(leaderIDs, ", "))
		}
	}

	patched := false
	for _, m := range r.context.GetStatus().Members.Single {
		podName := m.Pod.GetName()

		pod, found := cachedStatus.Pod().V1().GetSimple(podName)
		if !found {
			continue
		}

		labels := pod.GetLabels()
		current, present := labels[k8sutil.LabelKeyArangoLeader]

		if enabled && m.ID == leaderID {
			if present && current == "true" {
				// The leader pod already carries the leader label.
				continue
			}

			labels = addLabel(labels, k8sutil.LabelKeyArangoLeader, "true")
		} else {
			if !present {
				// A non-leader pod without the leader label needs no change.
				continue
			}

			delete(labels, k8sutil.LabelKeyArangoLeader)
		}

		if err := r.context.ApplyPatchOnPod(ctx, pod, patch.ItemReplace(patch.NewPath("metadata", "labels"), labels)); err != nil {
			return errors.WithMessagef(err, "unable to change leader label for pod %s", podName)
		}
		patched = true
	}

	if !patched {
		return r.ensureSingleServerLeaderServices(ctx, cachedStatus)
	}

	return errors.Reconcile()
}

// ensureSingleServerLeaderServices keeps the leadership selector of the
// database client service and the external access service in sync with the
// FailoverLeadership feature.
//
// With the feature enabled, each existing service gets the leader label
// (value "true") added to its selector; with the feature disabled, the label
// is removed from the selector. Services that are not present in the cache are
// skipped. If any service was patched, errors.Reconcile() is returned.
func (r *Resources) ensureSingleServerLeaderServices(ctx context.Context, cachedStatus inspectorInterface.Inspector) error {
	deploymentName := r.context.GetAPIObject().GetName()
	enabled := features.FailoverLeadership().Enabled()

	patched := false
	for _, svcName := range []string{
		k8sutil.CreateDatabaseClientServiceName(deploymentName),
		k8sutil.CreateDatabaseExternalAccessServiceName(deploymentName),
	} {
		svc, found := cachedStatus.Service().V1().GetSimple(svcName)
		if !found {
			// It will be created later with a leadership label.
			continue
		}

		selector := svc.Spec.Selector
		current, present := selector[k8sutil.LabelKeyArangoLeader]

		if enabled {
			if present && current == "true" {
				// The selector already targets the leader.
				continue
			}

			selector = addLabel(selector, k8sutil.LabelKeyArangoLeader, "true")
		} else {
			if !present {
				// The selector has no leader label and should not have one.
				continue
			}

			delete(selector, k8sutil.LabelKeyArangoLeader)
		}

		replaceSelector := patch.Patch([]patch.Item{patch.ItemReplace(patch.NewPath("spec", "selector"), selector)})
		payload, err := replaceSelector.Marshal()
		if err != nil {
			return errors.WithMessagef(err, "unable to marshal labels for service %s", svcName)
		}

		applyPatch := func(ctxChild context.Context) error {
			_, err := cachedStatus.ServicesModInterface().V1().Patch(ctxChild, svcName, types.JSONPatchType, payload, meta.PatchOptions{})
			return err
		}
		if err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, applyPatch); err != nil {
			return errors.WithMessagef(err, "unable to patch labels for service %s", svcName)
		}
		patched = true
	}

	if !patched {
		return nil
	}

	return errors.Reconcile()
}
2 changes: 1 addition & 1 deletion pkg/util/k8sutil/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func CreateExporterClientServiceName(deploymentName string) string {

// CreateAgentLeaderServiceName returns the name of the service used to access a leader agent.
//
// The name is the deployment name followed by the "-agent-leader" suffix.
// The block previously held two consecutive return statements, so the second
// one was unreachable; only the "-agent-leader" variant is kept.
func CreateAgentLeaderServiceName(deploymentName string) string {
	return deploymentName + "-agent-leader"
}

// CreateExporterService
Expand Down