diff --git a/CHANGELOG.md b/CHANGELOG.md
index 950c7828f..166b40bc8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,7 @@
 - Add ArangoClusterSynchronization Operator
 - Update licenses
 - Fix restart procedure in case of failing members
+- Fix status propagation race condition
 
 ## [1.2.6](https://github.com/arangodb/kube-arangodb/tree/1.2.6) (2021-12-15)
 - Add ArangoBackup backoff functionality
diff --git a/pkg/apis/deployment/v1/conditions.go b/pkg/apis/deployment/v1/conditions.go
index a439a807c..15a00da54 100644
--- a/pkg/apis/deployment/v1/conditions.go
+++ b/pkg/apis/deployment/v1/conditions.go
@@ -36,6 +36,10 @@ func (c ConditionType) String() string {
 const (
 	// ConditionTypeReady indicates that the member or entire deployment is ready and running normally.
 	ConditionTypeReady ConditionType = "Ready"
+	// ConditionTypeStarted indicates that the member was ready at least once.
+	ConditionTypeStarted ConditionType = "Started"
+	// ConditionTypeServing indicates that the member core services are running.
+	ConditionTypeServing ConditionType = "Serving"
 	// ConditionTypeTerminated indicates that the member has terminated and will not restart.
 	ConditionTypeTerminated ConditionType = "Terminated"
 	// ConditionTypeAutoUpgrade indicates that the member has to be started with `--database.auto-upgrade` once.
diff --git a/pkg/apis/deployment/v1/member_status_list.go b/pkg/apis/deployment/v1/member_status_list.go
index e6622b41a..544433429 100644
--- a/pkg/apis/deployment/v1/member_status_list.go
+++ b/pkg/apis/deployment/v1/member_status_list.go
@@ -205,6 +205,22 @@ func (l MemberStatusList) MembersReady() int {
 	return readyCount
 }
 
+// MembersServing returns the number of members that are in the Serving state.
+func (l MemberStatusList) MembersServing() int {
+	servingCount := 0
+	for _, x := range l {
+		if x.Conditions.IsTrue(ConditionTypeServing) {
+			servingCount++
+		}
+	}
+	return servingCount
+}
+
+// AllMembersServing returns the true if all members are in the Serving state.
+func (l MemberStatusList) AllMembersServing() bool {
+	return len(l) == l.MembersServing()
+}
+
 // AllMembersReady returns the true if all members are in the Ready state.
 func (l MemberStatusList) AllMembersReady() bool {
 	return len(l) == l.MembersReady()
diff --git a/pkg/apis/deployment/v2alpha1/conditions.go b/pkg/apis/deployment/v2alpha1/conditions.go
index 5e3bdf91b..3a3dce052 100644
--- a/pkg/apis/deployment/v2alpha1/conditions.go
+++ b/pkg/apis/deployment/v2alpha1/conditions.go
@@ -36,6 +36,10 @@ func (c ConditionType) String() string {
 const (
 	// ConditionTypeReady indicates that the member or entire deployment is ready and running normally.
 	ConditionTypeReady ConditionType = "Ready"
+	// ConditionTypeStarted indicates that the member was ready at least once.
+	ConditionTypeStarted ConditionType = "Started"
+	// ConditionTypeServing indicates that the member core services are running.
+	ConditionTypeServing ConditionType = "Serving"
 	// ConditionTypeTerminated indicates that the member has terminated and will not restart.
 	ConditionTypeTerminated ConditionType = "Terminated"
 	// ConditionTypeAutoUpgrade indicates that the member has to be started with `--database.auto-upgrade` once.
diff --git a/pkg/apis/deployment/v2alpha1/member_status_list.go b/pkg/apis/deployment/v2alpha1/member_status_list.go
index bd1d1e256..af1a38851 100644
--- a/pkg/apis/deployment/v2alpha1/member_status_list.go
+++ b/pkg/apis/deployment/v2alpha1/member_status_list.go
@@ -205,6 +205,22 @@ func (l MemberStatusList) MembersReady() int {
 	return readyCount
 }
 
+// MembersServing returns the number of members that are in the Serving state.
+func (l MemberStatusList) MembersServing() int {
+	servingCount := 0
+	for _, x := range l {
+		if x.Conditions.IsTrue(ConditionTypeServing) {
+			servingCount++
+		}
+	}
+	return servingCount
+}
+
+// AllMembersServing returns the true if all members are in the Serving state.
+func (l MemberStatusList) AllMembersServing() bool {
+	return len(l) == l.MembersServing()
+}
+
 // AllMembersReady returns the true if all members are in the Ready state.
 func (l MemberStatusList) AllMembersReady() bool {
 	return len(l) == l.MembersReady()
diff --git a/pkg/deployment/deployment.go b/pkg/deployment/deployment.go
index e5ce2a8ae..90cc80811 100644
--- a/pkg/deployment/deployment.go
+++ b/pkg/deployment/deployment.go
@@ -52,7 +52,7 @@ import (
 	"github.com/arangodb/arangosync-client/client"
 	"github.com/rs/zerolog"
 	apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/record"
 
@@ -381,7 +381,7 @@ func (d *Deployment) handleArangoDeploymentUpdatedEvent(ctx context.Context) err
 	// Get the most recent version of the deployment from the API server
 	ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
 	defer cancel()
-	current, err := d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(d.apiObject.GetNamespace()).Get(ctxChild, d.apiObject.GetName(), metav1.GetOptions{})
+	current, err := d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(d.apiObject.GetNamespace()).Get(ctxChild, d.apiObject.GetName(), meta.GetOptions{})
 	if err != nil {
 		log.Debug().Err(err).Msg("Failed to get current version of deployment from API server")
 		if k8sutil.IsNotFound(err) {
@@ -465,21 +465,22 @@ func (d *Deployment) updateCRStatus(ctx context.Context, force ...bool) error {
 	}
 
 	// Send update to API server
-	ns := d.apiObject.GetNamespace()
-	depls := d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(ns)
-	update := d.apiObject.DeepCopy()
+	depls := d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(d.GetNamespace())
 	attempt := 0
 	for {
 		attempt++
-		update.Status = d.status.last
-		if update.GetDeletionTimestamp() == nil {
-			ensureFinalizers(update)
+		if d.apiObject.GetDeletionTimestamp() == nil {
+			ensureFinalizers(d.apiObject)
 		}
 		var newAPIObject *api.ArangoDeployment
 		err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
-			var err error
-			newAPIObject, err = depls.Update(ctxChild, update, metav1.UpdateOptions{})
+			p, err := patch.NewPatch(patch.ItemReplace(patch.NewPath("status"), d.status.last)).Marshal()
+			if err != nil {
+				return err
+			}
+
+			newAPIObject, err = depls.Patch(ctxChild, d.GetName(), types.JSONPatchType, p, meta.PatchOptions{})
 
 			return err
 		})
@@ -488,21 +489,8 @@ func (d *Deployment) updateCRStatus(ctx context.Context, force ...bool) error {
 			d.apiObject = newAPIObject
 			return nil
 		}
-		if attempt < 10 && k8sutil.IsConflict(err) {
-			// API object may have been changed already,
-			// Reload api object and try again
-			var current *api.ArangoDeployment
-
-			err = globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
-				var err error
-				current, err = depls.Get(ctxChild, update.GetName(), metav1.GetOptions{})
-
-				return err
-			})
-			if err == nil {
-				update = current.DeepCopy()
-				continue
-			}
+		if attempt < 10 {
+			continue
 		}
 		if err != nil {
 			d.deps.Log.Debug().Err(err).Msg("failed to patch ArangoDeployment status")
@@ -535,7 +523,7 @@ func (d *Deployment) updateCRSpec(ctx context.Context, newSpec api.DeploymentSpe
 		var newAPIObject *api.ArangoDeployment
 		err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
 			var err error
-			newAPIObject, err = d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(ns).Update(ctxChild, update, metav1.UpdateOptions{})
+			newAPIObject, err = d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(ns).Update(ctxChild, update, meta.UpdateOptions{})
 
 			return err
 		})
@@ -551,7 +539,7 @@ func (d *Deployment) updateCRSpec(ctx context.Context, newSpec api.DeploymentSpe
 
 			err = globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
 				var err error
-				current, err = d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(ns).Get(ctxChild, update.GetName(), metav1.GetOptions{})
+				current, err = d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(ns).Get(ctxChild, update.GetName(), meta.GetOptions{})
 
 				return err
 			})
@@ -568,7 +556,7 @@ func (d *Deployment) updateCRSpec(ctx context.Context, newSpec api.DeploymentSpe
 }
 
 // isOwnerOf returns true if the given object belong to this deployment.
-func (d *Deployment) isOwnerOf(obj metav1.Object) bool {
+func (d *Deployment) isOwnerOf(obj meta.Object) bool {
 	ownerRefs := obj.GetOwnerReferences()
 	if len(ownerRefs) < 1 {
 		return false
@@ -583,9 +571,9 @@ func (d *Deployment) isOwnerOf(obj metav1.Object) bool {
 func (d *Deployment) lookForServiceMonitorCRD() {
 	var err error
 	if d.GetScope().IsNamespaced() {
-		_, err = d.deps.KubeMonitoringCli.ServiceMonitors(d.GetNamespace()).List(context.Background(), metav1.ListOptions{})
+		_, err = d.deps.KubeMonitoringCli.ServiceMonitors(d.GetNamespace()).List(context.Background(), meta.ListOptions{})
 	} else {
-		_, err = d.deps.KubeExtCli.ApiextensionsV1().CustomResourceDefinitions().Get(context.Background(), "servicemonitors.monitoring.coreos.com", metav1.GetOptions{})
+		_, err = d.deps.KubeExtCli.ApiextensionsV1().CustomResourceDefinitions().Get(context.Background(), "servicemonitors.monitoring.coreos.com", meta.GetOptions{})
 	}
 	log := d.deps.Log
 	log.Debug().Msgf("Looking for ServiceMonitor CRD...")
@@ -637,7 +625,7 @@ func (d *Deployment) ApplyPatch(ctx context.Context, p ...patch.Item) error {
 	ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
 	defer cancel()
 
-	depl, err := c.Patch(ctxChild, d.apiObject.GetName(), types.JSONPatchType, data, metav1.PatchOptions{})
+	depl, err := c.Patch(ctxChild, d.apiObject.GetName(), types.JSONPatchType, data, meta.PatchOptions{})
 	if err != nil {
 		return err
 	}
diff --git a/pkg/deployment/member/phase_updates.go b/pkg/deployment/member/phase_updates.go
index a0f3f097e..aa5adf38e 100644
--- a/pkg/deployment/member/phase_updates.go
+++ b/pkg/deployment/member/phase_updates.go
@@ -67,6 +67,7 @@ var phase = phaseMap{
 func removeMemberConditionsMapFunc(m *api.MemberStatus) {
 	// Clean conditions
 	m.Conditions.Remove(api.ConditionTypeReady)
+	m.Conditions.Remove(api.ConditionTypeStarted)
 	m.Conditions.Remove(api.ConditionTypeTerminated)
 	m.Conditions.Remove(api.ConditionTypeTerminating)
 	m.Conditions.Remove(api.ConditionTypeAgentRecoveryNeeded)
diff --git a/pkg/deployment/reconcile/plan_builder_rotate_upgrade.go b/pkg/deployment/reconcile/plan_builder_rotate_upgrade.go
index 33b59b08e..65f95f2b1 100644
--- a/pkg/deployment/reconcile/plan_builder_rotate_upgrade.go
+++ b/pkg/deployment/reconcile/plan_builder_rotate_upgrade.go
@@ -384,18 +384,23 @@ func groupReadyForRestart(context PlanBuilderContext, spec api.DeploymentSpec, s
 		return true
 	}
 
-	// If current member is not ready, kill anyway
-	if !member.Conditions.IsTrue(api.ConditionTypeReady) {
+	// If current member did not become ready even once. Kill it
+	if !member.Conditions.IsTrue(api.ConditionTypeStarted) {
+		return true
+	}
+
+	// If current core containers are dead kill it.
+	if !member.Conditions.IsTrue(api.ConditionTypeServing) {
 		return true
 	}
 
 	switch group {
 	case api.ServerGroupDBServers:
 		// TODO: Improve shard placement discovery and keep WriteConcern
-		return context.GetShardSyncStatus() && status.Members.MembersOfGroup(group).AllMembersReady()
+		return context.GetShardSyncStatus() && status.Members.MembersOfGroup(group).AllMembersServing()
 	default:
 		// In case of agents we can kill only one agent at same time
-		return status.Members.MembersOfGroup(group).AllMembersReady()
+		return status.Members.MembersOfGroup(group).AllMembersServing()
 	}
 }
 
diff --git a/pkg/deployment/reconcile/plan_builder_utils.go b/pkg/deployment/reconcile/plan_builder_utils.go
index 11a095e7b..d7b0ac01d 100644
--- a/pkg/deployment/reconcile/plan_builder_utils.go
+++ b/pkg/deployment/reconcile/plan_builder_utils.go
@@ -60,8 +60,8 @@ func emptyPlanBuilder(ctx context.Context,
 
 func removeConditionActionV2(actionReason string, conditionType api.ConditionType) api.Action {
 	return api.NewAction(api.ActionTypeSetConditionV2, api.ServerGroupUnknown, "", actionReason).
-		AddParam(setConditionActionV2KeyAction, setConditionActionV2KeyTypeRemove).
-		AddParam(setConditionActionV2KeyType, string(conditionType))
+		AddParam(setConditionActionV2KeyAction, string(conditionType)).
+		AddParam(setConditionActionV2KeyType, setConditionActionV2KeyTypeRemove)
 }
 
 func updateConditionActionV2(actionReason string, conditionType api.ConditionType, status bool, reason, message, hash string) api.Action {
diff --git a/pkg/deployment/resources/pod_inspector.go b/pkg/deployment/resources/pod_inspector.go
index b46f5867e..673bf8d39 100644
--- a/pkg/deployment/resources/pod_inspector.go
+++ b/pkg/deployment/resources/pod_inspector.go
@@ -35,6 +35,8 @@ import (
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
+	"strings"
+
 	api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
 	"github.com/arangodb/kube-arangodb/pkg/metrics"
 	"github.com/arangodb/kube-arangodb/pkg/util"
@@ -216,10 +218,12 @@ func (r *Resources) InspectPods(ctx context.Context, cachedStatus inspectorInter
 			}
 			// End of Topology labels
 
-			if k8sutil.AreContainersReady(pod, coreContainers) {
+			if k8sutil.IsPodReady(pod) && k8sutil.AreContainersReady(pod, coreContainers) {
 				// Pod is now ready
-				if memberStatus.Conditions.Update(api.ConditionTypeReady, true, "Pod Ready", "") {
-					log.Debug().Str("pod-name", pod.GetName()).Msg("Updating member condition Ready & Initialised to true")
+				if anyOf(memberStatus.Conditions.Update(api.ConditionTypeReady, true, "Pod Ready", ""),
+					memberStatus.Conditions.Update(api.ConditionTypeStarted, true, "Pod Started", ""),
+					memberStatus.Conditions.Update(api.ConditionTypeServing, true, "Pod Serving", "")) {
+					log.Debug().Str("pod-name", pod.GetName()).Msg("Updating member condition Ready, Started & Serving to true")
 
 					if status.Topology.IsTopologyOwned(memberStatus.Topology) {
 						nodes, ok := cachedStatus.GetNodes()
@@ -238,10 +242,19 @@ func (r *Resources) InspectPods(ctx context.Context, cachedStatus inspectorInter
 					updateMemberStatusNeeded = true
 					nextInterval = nextInterval.ReduceTo(recheckSoonPodInspectorInterval)
 				}
+			} else if k8sutil.AreContainersReady(pod, coreContainers) {
+				// Pod is not ready, but core containers are fine
+				if anyOf(memberStatus.Conditions.Update(api.ConditionTypeReady, false, "Pod Not Ready", ""),
+					memberStatus.Conditions.Update(api.ConditionTypeServing, true, "Pod is still serving", "")) {
+					log.Debug().Str("pod-name", pod.GetName()).Msg("Updating member condition Ready to false, while all core containers are ready")
+					updateMemberStatusNeeded = true
+					nextInterval = nextInterval.ReduceTo(recheckSoonPodInspectorInterval)
+				}
 			} else {
 				// Pod is not ready
-				if memberStatus.Conditions.Update(api.ConditionTypeReady, false, "Pod Not Ready", "") {
-					log.Debug().Str("pod-name", pod.GetName()).Msg("Updating member condition Ready to false")
+				if anyOf(memberStatus.Conditions.Update(api.ConditionTypeReady, false, "Pod Not Ready", ""),
+					memberStatus.Conditions.Update(api.ConditionTypeServing, false, "Pod Core containers are not ready", strings.Join(coreContainers, ", "))) {
+					log.Debug().Str("pod-name", pod.GetName()).Msg("Updating member condition Ready & Serving to false")
 					updateMemberStatusNeeded = true
 					nextInterval = nextInterval.ReduceTo(recheckSoonPodInspectorInterval)
 				}
@@ -394,3 +407,13 @@ func removeLabel(labels map[string]string, key string) map[string]string {
 
 	return labels
 }
+
+func anyOf(bools ...bool) bool {
+	for _, b := range bools {
+		if b {
+			return true
+		}
+	}
+
+	return false
+}
diff --git a/pkg/util/k8sutil/pods.go b/pkg/util/k8sutil/pods.go
index ba2503be5..d60d83306 100644
--- a/pkg/util/k8sutil/pods.go
+++ b/pkg/util/k8sutil/pods.go
@@ -120,13 +120,14 @@ func AreContainersReady(pod *core.Pod, coreContainers utils.StringList) bool {
 		// From here on all required containers are running, but unready condition must be checked additionally.
 		switch condition.Reason {
 		case ServerContainerConditionContainersNotReady:
-			if !strings.HasPrefix(condition.Message, ServerContainerConditionPrefix) {
+			unreadyContainers, ok := extractContainerNamesFromConditionMessage(condition.Message)
+
+			if !ok {
 				return false
 			}
 
-			unreadyContainers := strings.TrimPrefix(condition.Message, ServerContainerConditionPrefix)
 			for _, c := range coreContainers {
-				if strings.Contains(unreadyContainers, c) {
+				if unreadyContainers.Has(c) {
 					// The container is on the list with unready containers.
 					return false
 				}
@@ -138,6 +139,28 @@ func AreContainersReady(pod *core.Pod, coreContainers utils.StringList) bool {
 	return false
 }
 
+func extractContainerNamesFromConditionMessage(msg string) (utils.StringList, bool) {
+	if !strings.HasPrefix(msg, ServerContainerConditionPrefix) {
+		return nil, false
+	}
+
+	unreadyContainers := strings.TrimPrefix(msg, ServerContainerConditionPrefix)
+
+	if !strings.HasPrefix(unreadyContainers, "[") {
+		return nil, false
+	}
+
+	if !strings.HasSuffix(unreadyContainers, "]") {
+		return nil, false
+	}
+
+	unreadyContainers = strings.TrimPrefix(strings.TrimSuffix(unreadyContainers, "]"), "[")
+
+	unreadyContainersList := utils.StringList(strings.Split(unreadyContainers, " "))
+
+	return unreadyContainersList, true
+}
+
 // GetPodByName returns pod if it exists among the pods' list
 // Returns false if not found.
 func GetPodByName(pods []core.Pod, podName string) (core.Pod, bool) {
diff --git a/pkg/util/k8sutil/pods_test.go b/pkg/util/k8sutil/pods_test.go
index 53aefedaa..18ba296a6 100644
--- a/pkg/util/k8sutil/pods_test.go
+++ b/pkg/util/k8sutil/pods_test.go
@@ -27,6 +27,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 
 	"github.com/arangodb/kube-arangodb/pkg/handlers/utils"
+	"github.com/stretchr/testify/require"
 )
 
 // TestIsPodReady tests IsPodReady.
@@ -318,3 +319,14 @@ func TestIsPodSucceeded(t *testing.T) {
 		})
 	}
 }
+
+func Test_extractContainerNamesFromConditionMessage(t *testing.T) {
+	t.Run("Valid name", func(t *testing.T) {
+		c, ok := extractContainerNamesFromConditionMessage("containers with unready status: [sidecar2 sidecar3]")
+		require.True(t, ok)
+		require.Len(t, c, 2)
+		require.Contains(t, c, "sidecar2")
+		require.Contains(t, c, "sidecar3")
+		require.NotContains(t, c, "sidecar")
+	})
+}