Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- (Feature) Add `ArangoBackupPolicy` CRD auto-installer
- (Feature) Add `ArangoJob` CRD auto-installer
- (Feature) Add RestartPolicyAlways to ArangoDeployment in order to restart ArangoDB on failure
- (Feature) Set a leader in active fail-over mode

## [1.2.13](https://github.com/arangodb/kube-arangodb/tree/1.2.13) (2022-06-07)
- (Bugfix) Fix arangosync members state inspection
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ Feature-wise production readiness table:
| Operator Internal Metrics Exporter | 1.2.3 | >= 3.7.0 | Community, Enterprise | Production | True | --deployment.feature.metrics-exporter | It is always enabled |
| Operator Ephemeral Volumes | 1.2.2 | >= 3.7.0 | Community, Enterprise | Alpha | False | --deployment.feature.ephemeral-volumes | N/A |
| Pod RestartPolicyAlways | 1.2.13 | >= 3.7.0 | Community, Enterprise | Alpha | False | --deployment.feature.restart-policy-always | N/A |
| Active fail-over leadership | 1.2.13 | >= 3.7.0 | Community, Enterprise | Production | False | --deployment.feature.failover-leadership | N/A |

## Release notes for 0.3.16

Expand Down
4 changes: 2 additions & 2 deletions pkg/deployment/deployment_inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (d *Deployment) inspectDeployment(lastInterval util.Interval) util.Interval
nextInterval = inspectNextInterval
hasError = true

d.CreateEvent(k8sutil.NewErrorEvent("Reconcilation failed", err, d.apiObject))
d.CreateEvent(k8sutil.NewErrorEvent("Reconciliation failed", err, d.apiObject))
} else {
nextInterval = minInspectionInterval
}
Expand Down Expand Up @@ -189,7 +189,7 @@ func (d *Deployment) inspectDeploymentWithError(ctx context.Context, lastInterva
}

if err := d.resources.EnsureLeader(ctx, d.GetCachedStatus()); err != nil {
return minInspectionInterval, errors.Wrapf(err, "Creating agency pod leader failed")
return minInspectionInterval, errors.Wrapf(err, "Creating leaders failed")
}

if err := d.resources.EnsureArangoMembers(ctx, d.GetCachedStatus()); err != nil {
Expand Down
37 changes: 37 additions & 0 deletions pkg/deployment/features/failoverleadership.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//

package features

// failoverLeadership is the definition of the "failover-leadership" feature.
// It is not restricted to enterprise and is disabled unless switched on explicitly.
var failoverLeadership = &feature{
	name:               "failover-leadership",
	description:        "Support for leadership in fail-over mode",
	version:            "3.7.0",
	enterpriseRequired: false,
	enabledByDefault:   false,
}

// init hands the feature definition over to registerFeature when the package is loaded.
func init() {
	registerFeature(failoverLeadership)
}

// FailoverLeadership returns the fail-over leadership feature.
func FailoverLeadership() Feature {
	return failoverLeadership
}
21 changes: 14 additions & 7 deletions pkg/deployment/resources/pod_creator_probes.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,19 @@ import (
"os"
"path/filepath"

"github.com/arangodb/kube-arangodb/pkg/util/errors"

"github.com/arangodb/kube-arangodb/pkg/deployment/features"
core "k8s.io/api/core/v1"

"github.com/arangodb/go-driver"
"github.com/arangodb/go-driver/jwt"

api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/apis/shared"
"github.com/arangodb/kube-arangodb/pkg/deployment/features"
"github.com/arangodb/kube-arangodb/pkg/deployment/pod"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/probes"
core "k8s.io/api/core/v1"
)

type Probe interface {
Expand Down Expand Up @@ -400,9 +400,13 @@ func (r *Resources) probeBuilderReadinessCoreSelect() probeBuilder {
return r.probeBuilderReadinessCore
}

func (r *Resources) probeBuilderReadinessCoreOperator(spec api.DeploymentSpec, group api.ServerGroup, version driver.Version) (Probe, error) {
func (r *Resources) probeBuilderReadinessCoreOperator(spec api.DeploymentSpec, _ api.ServerGroup, _ driver.Version) (Probe, error) {
// /_admin/server/availability is the way to go, it is available since 3.3.9
args, err := r.probeCommand(spec, "/_admin/server/availability")
path := "/_admin/server/availability"
if features.FailoverLeadership().Enabled() && r.context.GetMode() == api.DeploymentModeActiveFailover {
path = "/_api/version"
}
args, err := r.probeCommand(spec, path)
if err != nil {
return nil, err
}
Expand All @@ -414,9 +418,12 @@ func (r *Resources) probeBuilderReadinessCoreOperator(spec api.DeploymentSpec, g
}, nil
}

func (r *Resources) probeBuilderReadinessCore(spec api.DeploymentSpec, group api.ServerGroup, version driver.Version) (Probe, error) {
func (r *Resources) probeBuilderReadinessCore(spec api.DeploymentSpec, _ api.ServerGroup, _ driver.Version) (Probe, error) {
// /_admin/server/availability is the way to go, it is available since 3.3.9
localPath := "/_admin/server/availability"
if features.FailoverLeadership().Enabled() && r.context.GetMode() == api.DeploymentModeActiveFailover {
localPath = "/_api/version"
}

authorization := ""
if spec.IsAuthenticated() {
Expand Down
215 changes: 213 additions & 2 deletions pkg/deployment/resources/pod_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,17 @@ package resources

import (
"context"
"net/http"
"sync"

meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

"github.com/arangodb/go-driver"

api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/apis/shared"
"github.com/arangodb/kube-arangodb/pkg/deployment/features"
"github.com/arangodb/kube-arangodb/pkg/deployment/patch"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/globals"
Expand Down Expand Up @@ -125,8 +131,8 @@ func (r *Resources) EnsureLeader(ctx context.Context, cachedStatus inspectorInte
if s, ok := cachedStatus.Service().V1().GetSimple(leaderAgentSvcName); ok {
if err, adjusted := r.adjustService(ctx, s, shared.ArangoPort, selector); err == nil {
if !adjusted {
// The service is not changed.
return nil
// The service is not changed, so single server leader can be set.
return r.ensureSingleServerLeader(ctx, cachedStatus)
}

return errors.Reconcile()
Expand All @@ -149,3 +155,208 @@ func (r *Resources) EnsureLeader(ctx context.Context, cachedStatus inspectorInte
// The service has been created.
return errors.Reconcile()
}

// getSingleServerLeaderID queries all single server members concurrently and returns
// the ID of a member that reported itself as available.
//
// As soon as one member reports availability the shared context is cancelled, so the
// requests still in flight can stop early. When no member reported availability an
// error is returned; if any request failed, one of those failures is attached to it.
//
// NOTE(review): when the deployment has no single server members this also ends in
// the "unable to get a leader" error - confirm callers only get here in a mode that
// has single servers.
func (r *Resources) getSingleServerLeaderID(ctx context.Context) (string, error) {
	var (
		lock     sync.Mutex // guards leaderID and lastErr
		leaderID string
		lastErr  error
	)
	status, _ := r.context.GetStatus()

	probeGroup := func(_ api.ServerGroup, members api.MemberStatusList) error {
		if len(members) == 0 {
			return nil
		}

		probeCtx, stop := context.WithCancel(ctx)
		defer stop()

		// Ask every single server for its availability in parallel.
		var pending sync.WaitGroup
		pending.Add(len(members))
		for _, member := range members {
			go func(memberID string) {
				defer pending.Done()

				probeErr := globals.GetGlobalTimeouts().ArangoD().RunWithTimeout(probeCtx, func(ctxChild context.Context) error {
					client, err := r.context.GetServerClient(ctxChild, api.ServerGroupSingle, memberID)
					if err != nil {
						return err
					}

					available, err := isServerAvailable(ctxChild, client)
					if err != nil {
						return err
					}
					if !available {
						return errors.New("not available")
					}

					// A leader is known, so the remaining requests may be interrupted.
					stop()
					lock.Lock()
					leaderID = memberID
					lock.Unlock()
					return nil
				})
				if probeErr == nil {
					return
				}

				// Remember the failure; it is reported only if no leader is found at all.
				lock.Lock()
				lastErr = probeErr
				lock.Unlock()
			}(member.ID)
		}
		pending.Wait()

		return nil
	}

	if err := status.Members.ForeachServerInGroups(probeGroup, api.ServerGroupSingle); err != nil {
		return "", err
	}

	switch {
	case len(leaderID) > 0:
		return leaderID, nil
	case lastErr != nil:
		return "", errors.WithMessagef(lastErr, "unable to get a leader")
	default:
		return "", errors.New("unable to get a leader")
	}
}

// ensureSingleServerLeader adds or removes the leadership label on single server pods.
//
// When the fail-over leadership feature is enabled, the pod of the member returned by
// getSingleServerLeaderID gets the leader label set to "true" and the label is removed
// from every other single server pod. When the feature is disabled the label is removed
// from all single server pods.
//
// It returns errors.Reconcile() when at least one pod was patched. When nothing had to
// be changed it continues with ensureSingleServerLeaderServices and returns its result.
func (r *Resources) ensureSingleServerLeader(ctx context.Context, cachedStatus inspectorInterface.Inspector) error {
	changed := false

	enabled := features.FailoverLeadership().Enabled()
	var leaderID string
	if enabled {
		var err error
		if leaderID, err = r.getSingleServerLeaderID(ctx); err != nil {
			return err
		}
	}

	singleServers := func(group api.ServerGroup, list api.MemberStatusList) error {
		for _, m := range list {
			pod, exist := cachedStatus.Pod().V1().GetSimple(m.PodName)
			if !exist {
				continue
			}

			current := pod.GetLabels()
			value, hasLabel := current[k8sutil.LabelKeyArangoLeader]
			wantLeader := enabled && m.ID == leaderID

			if wantLeader && hasLabel && value == "true" {
				// Single server is the leader, and it already has the leader label.
				continue
			}
			if !wantLeader && !hasLabel {
				// Single server is not the leader, and it does not have the leader label.
				continue
			}

			// Work on a copy: the pod comes from the inspector cache, and its label map
			// must not be modified before the patch has actually been applied.
			labels := make(map[string]string, len(current)+1)
			for k, v := range current {
				labels[k] = v
			}

			if wantLeader {
				labels = addLabel(labels, k8sutil.LabelKeyArangoLeader, "true")
			} else {
				delete(labels, k8sutil.LabelKeyArangoLeader)
			}

			err := r.context.ApplyPatchOnPod(ctx, pod, patch.ItemReplace(patch.NewPath("metadata", "labels"), labels))
			if err != nil {
				return errors.WithMessagef(err, "unable to change leader label for pod %s", m.PodName)
			}
			changed = true
		}

		return nil
	}

	status, _ := r.context.GetStatus()
	if err := status.Members.ForeachServerInGroups(singleServers, api.ServerGroupSingle); err != nil {
		return err
	}

	if changed {
		return errors.Reconcile()
	}

	return r.ensureSingleServerLeaderServices(ctx, cachedStatus)
}

// ensureSingleServerLeaderServices adds or removes the leadership label in the selector
// of the deployment client service and the external access service.
//
// When the fail-over leadership feature is enabled the selector gets the leader label
// set to "true"; when it is disabled the label is removed from the selector. Services
// that are not present in the cache are skipped.
//
// It returns errors.Reconcile() when at least one service was patched and nil when
// nothing had to be changed.
func (r *Resources) ensureSingleServerLeaderServices(ctx context.Context, cachedStatus inspectorInterface.Inspector) error {
	deploymentName := r.context.GetAPIObject().GetName()
	changed := false
	services := []string{
		k8sutil.CreateDatabaseClientServiceName(deploymentName),
		k8sutil.CreateDatabaseExternalAccessServiceName(deploymentName),
	}

	enabled := features.FailoverLeadership().Enabled()
	for _, svcName := range services {
		svc, exists := cachedStatus.Service().V1().GetSimple(svcName)
		if !exists {
			// It will be created later with a leadership label.
			continue
		}

		current := svc.Spec.Selector
		value, hasLabel := current[k8sutil.LabelKeyArangoLeader]
		if enabled {
			if hasLabel && value == "true" {
				// It is already OK.
				continue
			}
		} else if !hasLabel {
			// Service does not have a leader label, and it should not have.
			continue
		}

		// Work on a copy: the service comes from the inspector cache, and its selector
		// must not be modified before the patch has actually been applied.
		selector := make(map[string]string, len(current)+1)
		for k, v := range current {
			selector[k] = v
		}

		if enabled {
			selector = addLabel(selector, k8sutil.LabelKeyArangoLeader, "true")
		} else {
			delete(selector, k8sutil.LabelKeyArangoLeader)
		}

		parser := patch.Patch([]patch.Item{patch.ItemReplace(patch.NewPath("spec", "selector"), selector)})
		data, err := parser.Marshal()
		if err != nil {
			return errors.WithMessagef(err, "unable to marshal labels for service %s", svcName)
		}

		err = globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
			_, err := cachedStatus.ServicesModInterface().V1().Patch(ctxChild, svcName, types.JSONPatchType, data, meta.PatchOptions{})
			return err
		})
		if err != nil {
			return errors.WithMessagef(err, "unable to patch labels for service %s", svcName)
		}
		changed = true
	}

	if changed {
		return errors.Reconcile()
	}

	return nil
}

// isServerAvailable reports whether the server behind c answers the
// "_admin/server/availability" request with HTTP 200 (OK).
//
// HTTP 503 (Service Unavailable) is accepted as a regular "not available" answer and
// yields (false, nil). Any other status code, as well as a failure to build or send
// the request, is returned as an error.
func isServerAvailable(ctx context.Context, c driver.Client) (bool, error) {
	conn := c.Connection()

	request, err := conn.NewRequest("GET", "_admin/server/availability")
	if err != nil {
		return false, errors.WithStack(err)
	}

	response, err := conn.Do(ctx, request)
	if err != nil {
		return false, errors.WithStack(err)
	}

	// Only 200 and 503 are meaningful answers; everything else is a failure.
	if statusErr := response.CheckStatus(http.StatusOK, http.StatusServiceUnavailable); statusErr != nil {
		return false, errors.WithStack(statusErr)
	}

	available := response.StatusCode() == http.StatusOK
	return available, nil
}
Loading