Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
- Add ArangoClusterSynchronization Operator
- Update licenses
- Fix restart procedure in case of failing members
- Fix status propagation race condition

## [1.2.6](https://github.com/arangodb/kube-arangodb/tree/1.2.6) (2021-12-15)
- Add ArangoBackup backoff functionality
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/deployment/v1/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ func (c ConditionType) String() string {
const (
// ConditionTypeReady indicates that the member or entire deployment is ready and running normally.
ConditionTypeReady ConditionType = "Ready"
// ConditionTypeStarted indicates that the member was ready at least once.
ConditionTypeStarted ConditionType = "Started"
// ConditionTypeServing indicates that the member core services are running.
ConditionTypeServing ConditionType = "Serving"
// ConditionTypeTerminated indicates that the member has terminated and will not restart.
ConditionTypeTerminated ConditionType = "Terminated"
// ConditionTypeAutoUpgrade indicates that the member has to be started with `--database.auto-upgrade` once.
Expand Down
16 changes: 16 additions & 0 deletions pkg/apis/deployment/v1/member_status_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,22 @@ func (l MemberStatusList) MembersReady() int {
return readyCount
}

// MembersServing returns the number of members that are in the Serving state.
func (l MemberStatusList) MembersServing() int {
servingCount := 0
for _, x := range l {
if x.Conditions.IsTrue(ConditionTypeServing) {
servingCount++
}
}
return servingCount
}

// AllMembersServing returns the true if all members are in the Serving state.
func (l MemberStatusList) AllMembersServing() bool {
return len(l) == l.MembersServing()
}

// AllMembersReady returns the true if all members are in the Ready state.
func (l MemberStatusList) AllMembersReady() bool {
return len(l) == l.MembersReady()
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/deployment/v2alpha1/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ func (c ConditionType) String() string {
const (
// ConditionTypeReady indicates that the member or entire deployment is ready and running normally.
ConditionTypeReady ConditionType = "Ready"
// ConditionTypeStarted indicates that the member was ready at least once.
ConditionTypeStarted ConditionType = "Started"
// ConditionTypeServing indicates that the member core services are running.
ConditionTypeServing ConditionType = "Serving"
// ConditionTypeTerminated indicates that the member has terminated and will not restart.
ConditionTypeTerminated ConditionType = "Terminated"
// ConditionTypeAutoUpgrade indicates that the member has to be started with `--database.auto-upgrade` once.
Expand Down
16 changes: 16 additions & 0 deletions pkg/apis/deployment/v2alpha1/member_status_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,22 @@ func (l MemberStatusList) MembersReady() int {
return readyCount
}

// MembersServing returns the number of members that are in the Serving state.
func (l MemberStatusList) MembersServing() int {
servingCount := 0
for _, x := range l {
if x.Conditions.IsTrue(ConditionTypeServing) {
servingCount++
}
}
return servingCount
}

// AllMembersServing returns the true if all members are in the Serving state.
func (l MemberStatusList) AllMembersServing() bool {
return len(l) == l.MembersServing()
}

// AllMembersReady returns the true if all members are in the Ready state.
func (l MemberStatusList) AllMembersReady() bool {
return len(l) == l.MembersReady()
Expand Down
50 changes: 19 additions & 31 deletions pkg/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ import (
"github.com/arangodb/arangosync-client/client"
"github.com/rs/zerolog"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"

Expand Down Expand Up @@ -381,7 +381,7 @@ func (d *Deployment) handleArangoDeploymentUpdatedEvent(ctx context.Context) err
// Get the most recent version of the deployment from the API server
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
defer cancel()
current, err := d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(d.apiObject.GetNamespace()).Get(ctxChild, d.apiObject.GetName(), metav1.GetOptions{})
current, err := d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(d.apiObject.GetNamespace()).Get(ctxChild, d.apiObject.GetName(), meta.GetOptions{})
if err != nil {
log.Debug().Err(err).Msg("Failed to get current version of deployment from API server")
if k8sutil.IsNotFound(err) {
Expand Down Expand Up @@ -465,21 +465,22 @@ func (d *Deployment) updateCRStatus(ctx context.Context, force ...bool) error {
}

// Send update to API server
ns := d.apiObject.GetNamespace()
depls := d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(ns)
update := d.apiObject.DeepCopy()
depls := d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(d.GetNamespace())
attempt := 0
for {
attempt++
update.Status = d.status.last
if update.GetDeletionTimestamp() == nil {
ensureFinalizers(update)
if d.apiObject.GetDeletionTimestamp() == nil {
ensureFinalizers(d.apiObject)
}

var newAPIObject *api.ArangoDeployment
err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
var err error
newAPIObject, err = depls.Update(ctxChild, update, metav1.UpdateOptions{})
p, err := patch.NewPatch(patch.ItemReplace(patch.NewPath("status"), d.status.last)).Marshal()
if err != nil {
return err
}

newAPIObject, err = depls.Patch(ctxChild, d.GetName(), types.JSONPatchType, p, meta.PatchOptions{})

return err
})
Expand All @@ -488,21 +489,8 @@ func (d *Deployment) updateCRStatus(ctx context.Context, force ...bool) error {
d.apiObject = newAPIObject
return nil
}
if attempt < 10 && k8sutil.IsConflict(err) {
// API object may have been changed already,
// Reload api object and try again
var current *api.ArangoDeployment

err = globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
var err error
current, err = depls.Get(ctxChild, update.GetName(), metav1.GetOptions{})

return err
})
if err == nil {
update = current.DeepCopy()
continue
}
if attempt < 10 {
continue
}
if err != nil {
d.deps.Log.Debug().Err(err).Msg("failed to patch ArangoDeployment status")
Expand Down Expand Up @@ -535,7 +523,7 @@ func (d *Deployment) updateCRSpec(ctx context.Context, newSpec api.DeploymentSpe
var newAPIObject *api.ArangoDeployment
err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
var err error
newAPIObject, err = d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(ns).Update(ctxChild, update, metav1.UpdateOptions{})
newAPIObject, err = d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(ns).Update(ctxChild, update, meta.UpdateOptions{})

return err
})
Expand All @@ -551,7 +539,7 @@ func (d *Deployment) updateCRSpec(ctx context.Context, newSpec api.DeploymentSpe

err = globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
var err error
current, err = d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(ns).Get(ctxChild, update.GetName(), metav1.GetOptions{})
current, err = d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(ns).Get(ctxChild, update.GetName(), meta.GetOptions{})

return err
})
Expand All @@ -568,7 +556,7 @@ func (d *Deployment) updateCRSpec(ctx context.Context, newSpec api.DeploymentSpe
}

// isOwnerOf returns true if the given object belong to this deployment.
func (d *Deployment) isOwnerOf(obj metav1.Object) bool {
func (d *Deployment) isOwnerOf(obj meta.Object) bool {
ownerRefs := obj.GetOwnerReferences()
if len(ownerRefs) < 1 {
return false
Expand All @@ -583,9 +571,9 @@ func (d *Deployment) isOwnerOf(obj metav1.Object) bool {
func (d *Deployment) lookForServiceMonitorCRD() {
var err error
if d.GetScope().IsNamespaced() {
_, err = d.deps.KubeMonitoringCli.ServiceMonitors(d.GetNamespace()).List(context.Background(), metav1.ListOptions{})
_, err = d.deps.KubeMonitoringCli.ServiceMonitors(d.GetNamespace()).List(context.Background(), meta.ListOptions{})
} else {
_, err = d.deps.KubeExtCli.ApiextensionsV1().CustomResourceDefinitions().Get(context.Background(), "servicemonitors.monitoring.coreos.com", metav1.GetOptions{})
_, err = d.deps.KubeExtCli.ApiextensionsV1().CustomResourceDefinitions().Get(context.Background(), "servicemonitors.monitoring.coreos.com", meta.GetOptions{})
}
log := d.deps.Log
log.Debug().Msgf("Looking for ServiceMonitor CRD...")
Expand Down Expand Up @@ -637,7 +625,7 @@ func (d *Deployment) ApplyPatch(ctx context.Context, p ...patch.Item) error {

ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
defer cancel()
depl, err := c.Patch(ctxChild, d.apiObject.GetName(), types.JSONPatchType, data, metav1.PatchOptions{})
depl, err := c.Patch(ctxChild, d.apiObject.GetName(), types.JSONPatchType, data, meta.PatchOptions{})
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/deployment/member/phase_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ var phase = phaseMap{
func removeMemberConditionsMapFunc(m *api.MemberStatus) {
// Clean conditions
m.Conditions.Remove(api.ConditionTypeReady)
m.Conditions.Remove(api.ConditionTypeStarted)
m.Conditions.Remove(api.ConditionTypeTerminated)
m.Conditions.Remove(api.ConditionTypeTerminating)
m.Conditions.Remove(api.ConditionTypeAgentRecoveryNeeded)
Expand Down
13 changes: 9 additions & 4 deletions pkg/deployment/reconcile/plan_builder_rotate_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,18 +384,23 @@ func groupReadyForRestart(context PlanBuilderContext, spec api.DeploymentSpec, s
return true
}

// If current member is not ready, kill anyway
if !member.Conditions.IsTrue(api.ConditionTypeReady) {
// If current member did not become ready even once. Kill it
if !member.Conditions.IsTrue(api.ConditionTypeStarted) {
return true
}

// If current core containers are dead kill it.
if !member.Conditions.IsTrue(api.ConditionTypeServing) {
return true
}

switch group {
case api.ServerGroupDBServers:
// TODO: Improve shard placement discovery and keep WriteConcern
return context.GetShardSyncStatus() && status.Members.MembersOfGroup(group).AllMembersReady()
return context.GetShardSyncStatus() && status.Members.MembersOfGroup(group).AllMembersServing()
default:
// In case of agents we can kill only one agent at same time
return status.Members.MembersOfGroup(group).AllMembersReady()
return status.Members.MembersOfGroup(group).AllMembersServing()
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/deployment/reconcile/plan_builder_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ func emptyPlanBuilder(ctx context.Context,

func removeConditionActionV2(actionReason string, conditionType api.ConditionType) api.Action {
return api.NewAction(api.ActionTypeSetConditionV2, api.ServerGroupUnknown, "", actionReason).
AddParam(setConditionActionV2KeyAction, setConditionActionV2KeyTypeRemove).
AddParam(setConditionActionV2KeyType, string(conditionType))
AddParam(setConditionActionV2KeyAction, string(conditionType)).
AddParam(setConditionActionV2KeyType, setConditionActionV2KeyTypeRemove)
}

func updateConditionActionV2(actionReason string, conditionType api.ConditionType, status bool, reason, message, hash string) api.Action {
Expand Down
33 changes: 28 additions & 5 deletions pkg/deployment/resources/pod_inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"strings"

api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/metrics"
"github.com/arangodb/kube-arangodb/pkg/util"
Expand Down Expand Up @@ -216,10 +218,12 @@ func (r *Resources) InspectPods(ctx context.Context, cachedStatus inspectorInter
}
// End of Topology labels

if k8sutil.AreContainersReady(pod, coreContainers) {
if k8sutil.IsPodReady(pod) && k8sutil.AreContainersReady(pod, coreContainers) {
// Pod is now ready
if memberStatus.Conditions.Update(api.ConditionTypeReady, true, "Pod Ready", "") {
log.Debug().Str("pod-name", pod.GetName()).Msg("Updating member condition Ready & Initialised to true")
if anyOf(memberStatus.Conditions.Update(api.ConditionTypeReady, true, "Pod Ready", ""),
memberStatus.Conditions.Update(api.ConditionTypeStarted, true, "Pod Started", ""),
memberStatus.Conditions.Update(api.ConditionTypeServing, true, "Pod Serving", "")) {
log.Debug().Str("pod-name", pod.GetName()).Msg("Updating member condition Ready, Started & Serving to true")

if status.Topology.IsTopologyOwned(memberStatus.Topology) {
nodes, ok := cachedStatus.GetNodes()
Expand All @@ -238,10 +242,19 @@ func (r *Resources) InspectPods(ctx context.Context, cachedStatus inspectorInter
updateMemberStatusNeeded = true
nextInterval = nextInterval.ReduceTo(recheckSoonPodInspectorInterval)
}
} else if k8sutil.AreContainersReady(pod, coreContainers) {
// Pod is not ready, but core containers are fine
if anyOf(memberStatus.Conditions.Update(api.ConditionTypeReady, false, "Pod Not Ready", ""),
memberStatus.Conditions.Update(api.ConditionTypeServing, true, "Pod is still serving", "")) {
log.Debug().Str("pod-name", pod.GetName()).Msg("Updating member condition Ready to false, while all core containers are ready")
updateMemberStatusNeeded = true
nextInterval = nextInterval.ReduceTo(recheckSoonPodInspectorInterval)
}
} else {
// Pod is not ready
if memberStatus.Conditions.Update(api.ConditionTypeReady, false, "Pod Not Ready", "") {
log.Debug().Str("pod-name", pod.GetName()).Msg("Updating member condition Ready to false")
if anyOf(memberStatus.Conditions.Update(api.ConditionTypeReady, false, "Pod Not Ready", ""),
memberStatus.Conditions.Update(api.ConditionTypeServing, false, "Pod Core containers are not ready", strings.Join(coreContainers, ", "))) {
log.Debug().Str("pod-name", pod.GetName()).Msg("Updating member condition Ready & Serving to false")
updateMemberStatusNeeded = true
nextInterval = nextInterval.ReduceTo(recheckSoonPodInspectorInterval)
}
Expand Down Expand Up @@ -394,3 +407,13 @@ func removeLabel(labels map[string]string, key string) map[string]string {

return labels
}

func anyOf(bools ...bool) bool {
for _, b := range bools {
if b {
return true
}
}

return false
}
29 changes: 26 additions & 3 deletions pkg/util/k8sutil/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,14 @@ func AreContainersReady(pod *core.Pod, coreContainers utils.StringList) bool {
// From here on all required containers are running, but unready condition must be checked additionally.
switch condition.Reason {
case ServerContainerConditionContainersNotReady:
if !strings.HasPrefix(condition.Message, ServerContainerConditionPrefix) {
unreadyContainers, ok := extractContainerNamesFromConditionMessage(condition.Message)

if !ok {
return false
}

unreadyContainers := strings.TrimPrefix(condition.Message, ServerContainerConditionPrefix)
for _, c := range coreContainers {
if strings.Contains(unreadyContainers, c) {
if unreadyContainers.Has(c) {
// The container is on the list with unready containers.
return false
}
Expand All @@ -138,6 +139,28 @@ func AreContainersReady(pod *core.Pod, coreContainers utils.StringList) bool {
return false
}

func extractContainerNamesFromConditionMessage(msg string) (utils.StringList, bool) {
if !strings.HasPrefix(msg, ServerContainerConditionPrefix) {
return nil, false
}

unreadyContainers := strings.TrimPrefix(msg, ServerContainerConditionPrefix)

if !strings.HasPrefix(unreadyContainers, "[") {
return nil, false
}

if !strings.HasSuffix(unreadyContainers, "]") {
return nil, false
}

unreadyContainers = strings.TrimPrefix(strings.TrimSuffix(unreadyContainers, "]"), "[")

unreadyContainersList := utils.StringList(strings.Split(unreadyContainers, " "))

return unreadyContainersList, true
}

// GetPodByName returns pod if it exists among the pods' list
// Returns false if not found.
func GetPodByName(pods []core.Pod, podName string) (core.Pod, bool) {
Expand Down
12 changes: 12 additions & 0 deletions pkg/util/k8sutil/pods_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
v1 "k8s.io/api/core/v1"

"github.com/arangodb/kube-arangodb/pkg/handlers/utils"
"github.com/stretchr/testify/require"
)

// TestIsPodReady tests IsPodReady.
Expand Down Expand Up @@ -318,3 +319,14 @@ func TestIsPodSucceeded(t *testing.T) {
})
}
}

func Test_extractContainerNamesFromConditionMessage(t *testing.T) {
t.Run("Valid name", func(t *testing.T) {
c, ok := extractContainerNamesFromConditionMessage("containers with unready status: [sidecar2 sidecar3]")
require.True(t, ok)
require.Len(t, c, 2)
require.Contains(t, c, "sidecar2")
require.Contains(t, c, "sidecar3")
require.NotContains(t, c, "sidecar")
})
}