From 4fb25246ab910b7f0a02592feec5800987e92ede Mon Sep 17 00:00:00 2001 From: Adam Janikowski <12255597+ajanikow@users.noreply.github.com> Date: Mon, 22 Mar 2021 14:54:05 +0100 Subject: [PATCH] Add member shutdown option --- .../v1/deployment_status_members.go | 16 ++ pkg/apis/deployment/v1/member_phase.go | 5 + pkg/apis/deployment/v1/server_group_spec.go | 33 ++++ .../deployment/v1/zz_generated.deepcopy.go | 10 ++ .../deployment/v2alpha1/server_group_spec.go | 31 ++++ .../v2alpha1/zz_generated.deepcopy.go | 5 + pkg/deployment/context_impl.go | 5 + pkg/deployment/reconcile/action.go | 10 +- pkg/deployment/reconcile/action_context.go | 11 ++ .../reconcile/action_rotate_member.go | 49 ++---- .../reconcile/action_rotate_start_member.go | 50 ++---- .../reconcile/action_shutdown_member.go | 50 ++---- .../reconcile/action_upgrade_member.go | 49 ++---- pkg/deployment/reconcile/context.go | 2 + pkg/deployment/reconcile/helper_shutdown.go | 165 ++++++++++++++++++ pkg/deployment/reconcile/plan_builder_test.go | 17 ++ pkg/deployment/resources/pod_creator.go | 29 +-- pkg/deployment/resources/pod_finalizers.go | 19 ++ pkg/deployment/resources/pod_inspector.go | 6 +- pkg/util/constants/constants.go | 1 + 20 files changed, 408 insertions(+), 155 deletions(-) create mode 100644 pkg/deployment/reconcile/helper_shutdown.go diff --git a/pkg/apis/deployment/v1/deployment_status_members.go b/pkg/apis/deployment/v1/deployment_status_members.go index 7ddebd7aa..7f4f59f9e 100644 --- a/pkg/apis/deployment/v1/deployment_status_members.go +++ b/pkg/apis/deployment/v1/deployment_status_members.go @@ -285,3 +285,19 @@ func (ds DeploymentStatusMembers) MembersOfGroup(group ServerGroup) MemberStatus return MemberStatusList{} } } + +// PodNames returns all members pod names +func (ds DeploymentStatusMembers) PodNames() []string { + var n []string + + ds.ForeachServerGroup(func(group ServerGroup, list MemberStatusList) error { + for _, m := range list { + if m.PodName != "" { + n = append(n, m.PodName) + } + } + return nil + }) + + return n +} diff --git a/pkg/apis/deployment/v1/member_phase.go b/pkg/apis/deployment/v1/member_phase.go index 531ef3941..6b0b9c2f4 100644 --- a/pkg/apis/deployment/v1/member_phase.go +++ b/pkg/apis/deployment/v1/member_phase.go @@ -57,3 +57,8 @@ func (p MemberPhase) IsFailed() bool { func (p MemberPhase) IsCreatedOrDrain() bool { return p == MemberPhaseCreated || p == MemberPhaseDrain } + +// String returns string from MemberPhase +func (p MemberPhase) String() string { + return string(p) +} diff --git a/pkg/apis/deployment/v1/server_group_spec.go b/pkg/apis/deployment/v1/server_group_spec.go index 5c8cb6c4a..86df5b3a7 100644 --- a/pkg/apis/deployment/v1/server_group_spec.go +++ b/pkg/apis/deployment/v1/server_group_spec.go @@ -39,6 +39,35 @@ import ( "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" ) +// ServerGroupShutdownMethod enum of possible shutdown methods +type ServerGroupShutdownMethod string + +// Default return default value for ServerGroupShutdownMethod +func (s *ServerGroupShutdownMethod) Default() ServerGroupShutdownMethod { + return ServerGroupShutdownMethodAPI +} + +// Get return current or default value of ServerGroupShutdownMethod +func (s *ServerGroupShutdownMethod) Get() ServerGroupShutdownMethod { + if s == nil { + return s.Default() + } + + switch t := *s; t { + case ServerGroupShutdownMethodAPI, ServerGroupShutdownMethodDelete: + return t + default: + return s.Default() + } +} + +const ( + // ServerGroupShutdownMethodAPI API Shutdown method + ServerGroupShutdownMethodAPI ServerGroupShutdownMethod = "api" + // ServerGroupShutdownMethodDelete Pod Delete shutdown method + ServerGroupShutdownMethodDelete ServerGroupShutdownMethod = "delete" +) + // ServerGroupSpec contains the specification for all servers in a specific group (e.g. all agents) type ServerGroupSpec struct { // Count holds the requested number of servers @@ -106,6 +135,10 @@ type ServerGroupSpec struct { ExtendedRotationCheck *bool `json:"extendedRotationCheck,omitempty"` // InitContainers Init containers specification InitContainers *ServerGroupInitContainers `json:"initContainers,omitempty"` + // ShutdownMethod describe procedure of member shutdown taken by Operator + ShutdownMethod *ServerGroupShutdownMethod `json:"shutdownMethod,omitempty"` + // ShutdownDelay define how long operator should delay finalizer removal after shutdown + ShutdownDelay *int `json:"shutdownDelay,omitempty"` } // ServerGroupSpecSecurityContext contains specification for pod security context diff --git a/pkg/apis/deployment/v1/zz_generated.deepcopy.go b/pkg/apis/deployment/v1/zz_generated.deepcopy.go index 87610eab5..722cd6028 100644 --- a/pkg/apis/deployment/v1/zz_generated.deepcopy.go +++ b/pkg/apis/deployment/v1/zz_generated.deepcopy.go @@ -1502,6 +1502,16 @@ func (in *ServerGroupSpec) DeepCopyInto(out *ServerGroupSpec) { *out = new(ServerGroupInitContainers) (*in).DeepCopyInto(*out) } + if in.ShutdownMethod != nil { + in, out := &in.ShutdownMethod, &out.ShutdownMethod + *out = new(ServerGroupShutdownMethod) + **out = **in + } + if in.ShutdownDelay != nil { + in, out := &in.ShutdownDelay, &out.ShutdownDelay + *out = new(int) + **out = **in + } return } diff --git a/pkg/apis/deployment/v2alpha1/server_group_spec.go b/pkg/apis/deployment/v2alpha1/server_group_spec.go index 34c2272bd..7731136d4 100644 --- a/pkg/apis/deployment/v2alpha1/server_group_spec.go +++ b/pkg/apis/deployment/v2alpha1/server_group_spec.go @@ -39,6 +39,35 @@ import ( "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" ) +// ServerGroupShutdownMethod enum of possible shutdown methods +type ServerGroupShutdownMethod string + +// Default return default value for ServerGroupShutdownMethod +func (s *ServerGroupShutdownMethod) Default() ServerGroupShutdownMethod { + return ServerGroupShutdownMethodAPI +} + +// Get return current or default value of ServerGroupShutdownMethod +func (s *ServerGroupShutdownMethod) Get() ServerGroupShutdownMethod { + if s == nil { + return s.Default() + } + + switch t := *s; t { + case ServerGroupShutdownMethodAPI, ServerGroupShutdownMethodDelete: + return t + default: + return s.Default() + } +} + +const ( + // ServerGroupShutdownMethodAPI API Shutdown method + ServerGroupShutdownMethodAPI ServerGroupShutdownMethod = "api" + // ServerGroupShutdownMethodDelete Pod Delete shutdown method + ServerGroupShutdownMethodDelete ServerGroupShutdownMethod = "delete" +) + // ServerGroupSpec contains the specification for all servers in a specific group (e.g. all agents) type ServerGroupSpec struct { // Count holds the requested number of servers @@ -106,6 +135,8 @@ type ServerGroupSpec struct { ExtendedRotationCheck *bool `json:"extendedRotationCheck,omitempty"` // InitContainers Init containers specification InitContainers *ServerGroupInitContainers `json:"initContainers,omitempty"` + // ShutdownMethod describe procedure of member shutdown taken by Operator + ShutdownMethod *ServerGroupShutdownMethod `json:"shutdownMethod,omitempty"` } // ServerGroupSpecSecurityContext contains specification for pod security context diff --git a/pkg/apis/deployment/v2alpha1/zz_generated.deepcopy.go b/pkg/apis/deployment/v2alpha1/zz_generated.deepcopy.go index 799161572..206c36b66 100644 --- a/pkg/apis/deployment/v2alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/deployment/v2alpha1/zz_generated.deepcopy.go @@ -1502,6 +1502,11 @@ func (in *ServerGroupSpec) DeepCopyInto(out *ServerGroupSpec) { *out = new(ServerGroupInitContainers) (*in).DeepCopyInto(*out) } + if in.ShutdownMethod != nil { + in, out := &in.ShutdownMethod, &out.ShutdownMethod + *out = new(ServerGroupShutdownMethod) + **out = **in + } return } diff --git a/pkg/deployment/context_impl.go b/pkg/deployment/context_impl.go index 21ba96f6c..2a45dff56 100644 --- a/pkg/deployment/context_impl.go +++ b/pkg/deployment/context_impl.go @@ -374,6 +374,11 @@ func (d *Deployment) CreateMember(group api.ServerGroup, id string) (string, err return id, nil } +// GetPod returns pod. +func (d *Deployment) GetPod(podName string) (*v1.Pod, error) { + return d.deps.KubeCli.CoreV1().Pods(d.GetNamespace()).Get(context.Background(), podName, meta.GetOptions{}) +} + // DeletePod deletes a pod with given name in the namespace // of the deployment. If the pod does not exist, the error is ignored. func (d *Deployment) DeletePod(podName string) error { diff --git a/pkg/deployment/reconcile/action.go b/pkg/deployment/reconcile/action.go index 76b1a7909..af41cb04a 100644 --- a/pkg/deployment/reconcile/action.go +++ b/pkg/deployment/reconcile/action.go @@ -32,8 +32,8 @@ import ( "github.com/rs/zerolog" ) -// Action executes a single Plan item. -type Action interface { +// ActionCore executes a single Plan item. +type ActionCore interface { // Start performs the start of the action. // Returns true if the action is completely finished, false in case // the start time needs to be recorded and a ready condition needs to be checked. @@ -41,6 +41,12 @@ type Action interface { // CheckProgress checks the progress of the action. // Returns: ready, abort, error. CheckProgress(ctx context.Context) (bool, bool, error) +} + +// Action executes a single Plan item. +type Action interface { + ActionCore + // Timeout returns the amount of time after which this action will timeout. Timeout(deploymentSpec api.DeploymentSpec) time.Duration // Return the MemberID used / created in this action diff --git a/pkg/deployment/reconcile/action_context.go b/pkg/deployment/reconcile/action_context.go index c10afc868..9364516c7 100644 --- a/pkg/deployment/reconcile/action_context.go +++ b/pkg/deployment/reconcile/action_context.go @@ -76,6 +76,8 @@ type ActionContext interface { UpdateMember(member api.MemberStatus) error // RemoveMemberByID removes a member with given id. RemoveMemberByID(id string) error + // GetPod returns pod. + GetPod(podName string) (*v1.Pod, error) // DeletePod deletes a pod with given name in the namespace // of the deployment. If the pod does not exist, the error is ignored. DeletePod(podName string) error @@ -317,6 +319,15 @@ func (ac *actionContext) RemoveMemberByID(id string) error { return nil } +// GetPod returns pod. +func (ac *actionContext) GetPod(podName string) (*v1.Pod, error) { + if pod, err := ac.context.GetPod(podName); err != nil { + return nil, errors.WithStack(err) + } else { + return pod, nil + } +} + // DeletePod deletes a pod with given name in the namespace // of the deployment. If the pod does not exist, the error is ignored. func (ac *actionContext) DeletePod(podName string) error { diff --git a/pkg/deployment/reconcile/action_rotate_member.go b/pkg/deployment/reconcile/action_rotate_member.go index 1271c9897..c9f51f3bc 100644 --- a/pkg/deployment/reconcile/action_rotate_member.go +++ b/pkg/deployment/reconcile/action_rotate_member.go @@ -25,6 +25,8 @@ package reconcile import ( "context" + "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" + "github.com/arangodb/kube-arangodb/pkg/util/errors" api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" @@ -56,41 +58,17 @@ type actionRotateMember struct { // the start time needs to be recorded and a ready condition needs to be checked. func (a *actionRotateMember) Start(ctx context.Context) (bool, error) { log := a.log - group := a.action.Group m, ok := a.actionCtx.GetMemberStatusByID(a.action.MemberID) if !ok { log.Error().Msg("No such member") } - // Remove finalizers, so Kubernetes will quickly terminate the pod - if err := a.actionCtx.RemovePodFinalizers(m.PodName); err != nil { - return false, errors.WithStack(err) - } - if group.IsArangod() { - // Invoke shutdown endpoint - c, err := a.actionCtx.GetServerClient(ctx, group, a.action.MemberID) - if err != nil { - log.Debug().Err(err).Msg("Failed to create member client") - return false, errors.WithStack(err) - } - removeFromCluster := false - log.Debug().Bool("removeFromCluster", removeFromCluster).Msg("Shutting down member") - ctx, cancel := context.WithTimeout(ctx, shutdownTimeout) - defer cancel() - if err := c.Shutdown(ctx, removeFromCluster); err != nil { - // Shutdown failed. Let's check if we're already done - if ready, _, err := a.CheckProgress(ctx); err == nil && ready { - // We're done - return true, nil - } - log.Debug().Err(err).Msg("Failed to shutdown member") - return false, errors.WithStack(err) - } - } else if group.IsArangosync() { - // Terminate pod - if err := a.actionCtx.DeletePod(m.PodName); err != nil { - return false, errors.WithStack(err) - } + + if ready, err := getShutdownHelper(&a.action, a.actionCtx, a.log).Start(ctx); err != nil { + return false, err + } else if ready { + return true, nil } + // Update status m.Phase = api.MemberPhaseRotating @@ -110,13 +88,18 @@ func (a *actionRotateMember) CheckProgress(ctx context.Context) (bool, bool, err log.Error().Msg("No such member") return true, false, nil } - if !m.Conditions.IsTrue(api.ConditionTypeTerminated) { - // Pod is not yet terminated + + if ready, abort, err := getShutdownHelper(&a.action, a.actionCtx, a.log).CheckProgress(ctx); err != nil { + return false, abort, err + } else if !ready { return false, false, nil } + // Pod is terminated, we can now remove it if err := a.actionCtx.DeletePod(m.PodName); err != nil { - return false, false, errors.WithStack(err) + if !k8sutil.IsNotFound(err) { + return false, false, errors.WithStack(err) + } } // Pod is now gone, update the member status m.Phase = api.MemberPhaseNone diff --git a/pkg/deployment/reconcile/action_rotate_start_member.go b/pkg/deployment/reconcile/action_rotate_start_member.go index 8f7f708fc..73e0188db 100644 --- a/pkg/deployment/reconcile/action_rotate_start_member.go +++ b/pkg/deployment/reconcile/action_rotate_start_member.go @@ -25,6 +25,8 @@ package reconcile import ( "context" + "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" + "github.com/arangodb/kube-arangodb/pkg/util/errors" api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" @@ -56,41 +58,17 @@ type actionRotateStartMember struct { // the start time needs to be recorded and a ready condition needs to be checked. func (a *actionRotateStartMember) Start(ctx context.Context) (bool, error) { log := a.log - group := a.action.Group m, ok := a.actionCtx.GetMemberStatusByID(a.action.MemberID) if !ok { log.Error().Msg("No such member") } - // Remove finalizers, so Kubernetes will quickly terminate the pod - if err := a.actionCtx.RemovePodFinalizers(m.PodName); err != nil { - return false, errors.WithStack(err) - } - if group.IsArangod() { - // Invoke shutdown endpoint - c, err := a.actionCtx.GetServerClient(ctx, group, a.action.MemberID) - if err != nil { - log.Debug().Err(err).Msg("Failed to create member client") - return false, errors.WithStack(err) - } - removeFromCluster := false - log.Debug().Bool("removeFromCluster", removeFromCluster).Msg("Shutting down member") - ctx, cancel := context.WithTimeout(ctx, shutdownTimeout) - defer cancel() - if err := c.Shutdown(ctx, removeFromCluster); err != nil { - // Shutdown failed. Let's check if we're already done - if ready, _, err := a.CheckProgress(ctx); err == nil && ready { - // We're done - return true, nil - } - log.Debug().Err(err).Msg("Failed to shutdown member") - return false, errors.WithStack(err) - } - } else if group.IsArangosync() { - // Terminate pod - if err := a.actionCtx.DeletePod(m.PodName); err != nil { - return false, errors.WithStack(err) - } + + if ready, err := getShutdownHelper(&a.action, a.actionCtx, a.log).Start(ctx); err != nil { + return false, err + } else if ready { + return true, nil } + // Update status m.Phase = api.MemberPhaseRotateStart @@ -110,13 +88,19 @@ func (a *actionRotateStartMember) CheckProgress(ctx context.Context) (bool, bool log.Error().Msg("No such member") return true, false, nil } - if !m.Conditions.IsTrue(api.ConditionTypeTerminated) { - // Pod is not yet terminated + + if ready, abort, err := getShutdownHelper(&a.action, a.actionCtx, a.log).CheckProgress(ctx); err != nil { + return false, abort, err + } else if !ready { return false, false, nil } + // Pod is terminated, we can now remove it if err := a.actionCtx.DeletePod(m.PodName); err != nil { - return false, false, errors.WithStack(err) + if !k8sutil.IsNotFound(err) { + return false, false, errors.WithStack(err) + } } + return true, false, nil } diff --git a/pkg/deployment/reconcile/action_shutdown_member.go b/pkg/deployment/reconcile/action_shutdown_member.go index 8c0502ebc..06ff2dde6 100644 --- a/pkg/deployment/reconcile/action_shutdown_member.go +++ b/pkg/deployment/reconcile/action_shutdown_member.go @@ -56,42 +56,18 @@ type actionShutdownMember struct { // the start time needs to be recorded and a ready condition needs to be checked. func (a *actionShutdownMember) Start(ctx context.Context) (bool, error) { log := a.log - group := a.action.Group m, ok := a.actionCtx.GetMemberStatusByID(a.action.MemberID) if !ok { log.Error().Msg("No such member") return true, nil } - if group.IsArangod() { - // do not try to shut down a pod that is not ready - if !m.Conditions.IsTrue(api.ConditionTypeReady) { - return true, nil - } - // Invoke shutdown endpoint - c, err := a.actionCtx.GetServerClient(ctx, group, a.action.MemberID) - if err != nil { - log.Debug().Err(err).Msg("Failed to create member client") - return false, errors.WithStack(err) - } - removeFromCluster := true - log.Debug().Bool("removeFromCluster", removeFromCluster).Msg("Shutting down member") - ctx, cancel := context.WithTimeout(ctx, shutdownTimeout) - defer cancel() - if err := c.Shutdown(ctx, removeFromCluster); err != nil { - // Shutdown failed. Let's check if we're already done - if ready, _, err := a.CheckProgress(ctx); err == nil && ready { - // We're done - return true, nil - } - log.Debug().Err(err).Msg("Failed to shutdown member") - return false, errors.WithStack(err) - } - } else if group.IsArangosync() { - // Terminate pod - if err := a.actionCtx.DeletePod(m.PodName); err != nil { - return false, errors.WithStack(err) - } + + if ready, err := getShutdownHelper(&a.action, a.actionCtx, a.log).Start(ctx); err != nil { + return false, err + } else if ready { + return true, nil } + // Update status m.Phase = api.MemberPhaseShuttingDown if err := a.actionCtx.UpdateMember(m); err != nil { @@ -103,15 +79,9 @@ func (a *actionShutdownMember) Start(ctx context.Context) (bool, error) { // CheckProgress checks the progress of the action. // Returns: ready, abort, error. func (a *actionShutdownMember) CheckProgress(ctx context.Context) (bool, bool, error) { - m, found := a.actionCtx.GetMemberStatusByID(a.action.MemberID) - if !found { - // Member not long exists - return true, false, nil - } - if m.Conditions.IsTrue(api.ConditionTypeTerminated) { - // Shutdown completed - return true, false, nil + if ready, abort, err := getShutdownHelper(&a.action, a.actionCtx, a.log).CheckProgress(ctx); err != nil { + return false, abort, err + } else { + return ready, false, nil } - // Member still not shutdown, retry soon - return false, false, nil } diff --git a/pkg/deployment/reconcile/action_upgrade_member.go b/pkg/deployment/reconcile/action_upgrade_member.go index 1a844c07e..d9117f4c4 100644 --- a/pkg/deployment/reconcile/action_upgrade_member.go +++ b/pkg/deployment/reconcile/action_upgrade_member.go @@ -56,7 +56,6 @@ type actionUpgradeMember struct { // the start time needs to be recorded and a ready condition needs to be checked. func (a *actionUpgradeMember) Start(ctx context.Context) (bool, error) { log := a.log - group := a.action.Group m, ok := a.actionCtx.GetMemberStatusByID(a.action.MemberID) if !ok { log.Error().Msg("No such member") @@ -66,38 +65,12 @@ func (a *actionUpgradeMember) Start(ctx context.Context) (bool, error) { if err := a.actionCtx.UpdateMember(m); err != nil { return false, errors.WithStack(err) } - if group.IsArangod() { - // Invoke shutdown endpoint - c, err := a.actionCtx.GetServerClient(ctx, group, a.action.MemberID) - if err != nil { - log.Debug().Err(err).Msg("Failed to create member client") - return false, errors.WithStack(err) - } - removeFromCluster := false - log.Debug().Bool("removeFromCluster", removeFromCluster).Msg("Shutting down member") - ctx, cancel := context.WithTimeout(ctx, shutdownTimeout) - defer cancel() - if err := c.Shutdown(ctx, removeFromCluster); err != nil { - // Shutdown failed. Let's check if we're already done - if ready, _, err := a.CheckProgress(ctx); err == nil && ready { - // We're done - return true, nil - } - log.Debug().Err(err).Msg("Failed to shutdown member") - return false, errors.WithStack(err) - } - } else if group.IsArangosync() { - // Terminate pod - if err := a.actionCtx.DeletePod(m.PodName); err != nil { - return false, errors.WithStack(err) - } - } - // Update status - m.Phase = api.MemberPhaseRotating // We keep the rotation phase here, since only when a new pod is created, it will get the Upgrading phase. - if err := a.actionCtx.UpdateMember(m); err != nil { - return false, errors.WithStack(err) + + act := actionRotateMember{ + actionImpl: a.actionImpl, } - return false, nil + + return act.Start(ctx) } // CheckProgress checks the progress of the action. @@ -111,6 +84,18 @@ func (a *actionUpgradeMember) CheckProgress(ctx context.Context) (bool, bool, er return true, false, nil } + if m.Phase == api.MemberPhaseRotating { + act := actionRotateMember{ + actionImpl: a.actionImpl, + } + + if _, abort, err := act.CheckProgress(ctx); err != nil || abort { + return false, abort, err + } + + return false, true, nil + } + isUpgrading := m.Phase == api.MemberPhaseUpgrading if isUpgrading { diff --git a/pkg/deployment/reconcile/context.go b/pkg/deployment/reconcile/context.go index 6da733f67..333c0c71e 100644 --- a/pkg/deployment/reconcile/context.go +++ b/pkg/deployment/reconcile/context.go @@ -71,6 +71,8 @@ type Context interface { // If ID is non-empty, it will be used, otherwise a new ID is created. // Returns ID, error CreateMember(group api.ServerGroup, id string) (string, error) + // GetPod returns pod. + GetPod(podName string) (*v1.Pod, error) // DeletePod deletes a pod with given name in the namespace // of the deployment. If the pod does not exist, the error is ignored. DeletePod(podName string) error diff --git a/pkg/deployment/reconcile/helper_shutdown.go b/pkg/deployment/reconcile/helper_shutdown.go new file mode 100644 index 000000000..1ce2951f0 --- /dev/null +++ b/pkg/deployment/reconcile/helper_shutdown.go @@ -0,0 +1,165 @@ +// +// DISCLAIMER +// +// Copyright 2021 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// +// Author Adam Janikowski +// + +package reconcile + +import ( + "context" + + api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" + "github.com/arangodb/kube-arangodb/pkg/util/errors" + "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" + "github.com/rs/zerolog" +) + +func getShutdownHelper(a *api.Action, ctx ActionContext, log zerolog.Logger) ActionCore { + serverGroup := ctx.GetSpec().GetServerGroupSpec(a.Group) + + switch serverGroup.ShutdownMethod.Get() { + case api.ServerGroupShutdownMethodDelete: + return shutdownHelperDelete{action: a, actionCtx: ctx, log: log} + default: + return shutdownHelperAPI{action: a, actionCtx: ctx, log: log} + } +} + +type shutdownHelperAPI struct { + log zerolog.Logger + action *api.Action + actionCtx ActionContext +} + +func (s shutdownHelperAPI) Start(ctx context.Context) (bool, error) { + log := s.log + + log.Info().Msgf("Using API to shutdown member") + + group := s.action.Group + m, ok := s.actionCtx.GetMemberStatusByID(s.action.MemberID) + if !ok { + log.Error().Msg("No such member") + return true, nil + } + // Remove finalizers, so Kubernetes will quickly terminate the pod + if err := s.actionCtx.RemovePodFinalizers(m.PodName); err != nil { + return false, errors.WithStack(err) + } + if group.IsArangod() { + // Invoke shutdown endpoint + c, err := s.actionCtx.GetServerClient(ctx, group, s.action.MemberID) + if err != nil { + log.Debug().Err(err).Msg("Failed to create member client") + return false, errors.WithStack(err) + } + removeFromCluster := false + log.Debug().Bool("removeFromCluster", removeFromCluster).Msg("Shutting down member") + ctx, cancel := context.WithTimeout(ctx, shutdownTimeout) + defer cancel() + if err := c.Shutdown(ctx, removeFromCluster); err != nil { + // Shutdown failed. Let's check if we're already done + if ready, _, err := s.CheckProgress(ctx); err == nil && ready { + // We're done + return true, nil + } + log.Debug().Err(err).Msg("Failed to shutdown member") + return false, errors.WithStack(err) + } + } else if group.IsArangosync() { + // Terminate pod + if err := s.actionCtx.DeletePod(m.PodName); err != nil { + return false, errors.WithStack(err) + } + } + + return false, nil +} + +func (s shutdownHelperAPI) CheckProgress(ctx context.Context) (bool, bool, error) { + // Check that pod is removed + log := s.log + m, found := s.actionCtx.GetMemberStatusByID(s.action.MemberID) + if !found { + log.Error().Msg("No such member") + return true, false, nil + } + if !m.Conditions.IsTrue(api.ConditionTypeTerminated) { + // Pod is not yet terminated + return false, false, nil + } + + return true, false, nil +} + +type shutdownHelperDelete struct { + log zerolog.Logger + action *api.Action + actionCtx ActionContext +} + +func (s shutdownHelperDelete) Start(ctx context.Context) (bool, error) { + log := s.log + + log.Info().Msgf("Using Pod Delete to shutdown member") + + m, ok := s.actionCtx.GetMemberStatusByID(s.action.MemberID) + if !ok { + log.Error().Msg("No such member") + return true, nil + } + + // Terminate pod + if err := s.actionCtx.DeletePod(m.PodName); err != nil { + if !k8sutil.IsNotFound(err) { + return false, errors.WithStack(err) + } + + } + + return false, nil +} + +func (s shutdownHelperDelete) CheckProgress(ctx context.Context) (bool, bool, error) { + // Check that pod is removed + log := s.log + m, found := s.actionCtx.GetMemberStatusByID(s.action.MemberID) + if !found { + log.Error().Msg("No such member") + return true, false, nil + } + + if !m.Conditions.IsTrue(api.ConditionTypeTerminated) { + // Pod is not yet terminated + log.Warn().Msgf("Pod not yet terminated") + return false, false, nil + } + + if m.PodName != "" { + if _, err := s.actionCtx.GetPod(m.PodName); err == nil { + log.Warn().Msgf("Pod still exists") + return false, false, nil + } else if !k8sutil.IsNotFound(err) { + return false, false, errors.WithStack(err) + } + } + + return true, false, nil +} diff --git a/pkg/deployment/reconcile/plan_builder_test.go b/pkg/deployment/reconcile/plan_builder_test.go index 736580794..79742872b 100644 --- a/pkg/deployment/reconcile/plan_builder_test.go +++ b/pkg/deployment/reconcile/plan_builder_test.go @@ -28,6 +28,9 @@ import ( "io/ioutil" "testing" + apiErrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime/schema" + "github.com/arangodb/kube-arangodb/pkg/util/errors" inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector" @@ -67,6 +70,20 @@ type testContext struct { RecordedEvent *k8sutil.Event } +func (c *testContext) GetPod(podName string) (*core.Pod, error) { + if c.ErrPods != nil { + return nil, c.ErrPods + } + + for _, p := range c.Pods { + if p.Name == podName { + return p.DeepCopy(), nil + } + } + + return nil, apiErrors.NewNotFound(schema.GroupResource{}, podName) +} + func (c *testContext) GetAuthentication() conn.Auth { return func() (authentication driver.Authentication, err error) { return nil, nil diff --git a/pkg/deployment/resources/pod_creator.go b/pkg/deployment/resources/pod_creator.go index 69de71c1e..1584851b5 100644 --- a/pkg/deployment/resources/pod_creator.go +++ b/pkg/deployment/resources/pod_creator.go @@ -241,14 +241,19 @@ func createArangoSyncArgs(apiObject metav1.Object, spec api.DeploymentSpec, grou // CreatePodFinalizers creates a list of finalizers for a pod created for the given group. func (r *Resources) CreatePodFinalizers(group api.ServerGroup) []string { + var finalizers []string + if d := r.context.GetSpec().GetServerGroupSpec(group).ShutdownDelay; d != nil { + finalizers = append(finalizers, constants.FinalizerDelayPodTermination) + } + switch group { case api.ServerGroupAgents: - return []string{constants.FinalizerPodAgencyServing} + finalizers = append(finalizers, constants.FinalizerPodAgencyServing) case api.ServerGroupDBServers: - return []string{constants.FinalizerPodDrainDBServer} - default: - return nil + finalizers = append(finalizers, constants.FinalizerPodDrainDBServer) } + + return finalizers } // CreatePodTolerations creates a list of tolerations for a pod created for the given group. @@ -311,15 +316,17 @@ func (r *Resources) RenderPodForMember(cachedStatus inspectorInterface.Inspector role := group.AsRole() roleAbbr := group.AsRoleAbbreviated() - m.PodName = k8sutil.CreatePodName(apiObject.GetName(), roleAbbr, m.ID, CreatePodSuffix(spec)) + newMember := m.DeepCopy() + + newMember.PodName = k8sutil.CreatePodName(apiObject.GetName(), roleAbbr, newMember.ID, CreatePodSuffix(spec)) // Render pod if group.IsArangod() { // Prepare arguments - autoUpgrade := m.Conditions.IsTrue(api.ConditionTypeAutoUpgrade) || spec.Upgrade.Get().AutoUpgrade + autoUpgrade := newMember.Conditions.IsTrue(api.ConditionTypeAutoUpgrade) || spec.Upgrade.Get().AutoUpgrade memberPod := MemberArangoDPod{ - status: m, + status: *newMember, groupSpec: groupSpec, spec: spec, group: group, @@ -340,7 +347,7 @@ func (r *Resources) RenderPodForMember(cachedStatus inspectorInterface.Inspector return nil, errors.WithStack(errors.Wrapf(err, "Validation of pods resources failed")) } - return RenderArangoPod(apiObject, role, m.ID, m.PodName, args, &memberPod) + return RenderArangoPod(apiObject, role, newMember.ID, newMember.PodName, args, &memberPod) } else if group.IsArangosync() { // Check image if !imageInfo.Enterprise { @@ -368,7 +375,7 @@ func (r *Resources) RenderPodForMember(cachedStatus inspectorInterface.Inspector if group == api.ServerGroupSyncMasters { // Create TLS secret - tlsKeyfileSecretName = k8sutil.CreateTLSKeyfileSecretName(apiObject.GetName(), role, m.ID) + tlsKeyfileSecretName = k8sutil.CreateTLSKeyfileSecretName(apiObject.GetName(), role, newMember.ID) // Check cluster JWT secret if spec.IsAuthenticated() { clusterJWTSecretName = spec.Authentication.GetJWTSecretName() @@ -384,7 +391,7 @@ func (r *Resources) RenderPodForMember(cachedStatus inspectorInterface.Inspector } // Prepare arguments - args := createArangoSyncArgs(apiObject, spec, group, groupSpec, m) + args := createArangoSyncArgs(apiObject, spec, group, groupSpec, *newMember) memberSyncPod := MemberSyncPod{ tlsKeyfileSecretName: tlsKeyfileSecretName, @@ -399,7 +406,7 @@ func (r *Resources) RenderPodForMember(cachedStatus inspectorInterface.Inspector arangoMember: *member, } - return RenderArangoPod(apiObject, role, m.ID, m.PodName, args, &memberSyncPod) + return RenderArangoPod(apiObject, role, newMember.ID, newMember.PodName, args, &memberSyncPod) } else { return nil, errors.Newf("unable to render Pod") } diff --git a/pkg/deployment/resources/pod_finalizers.go b/pkg/deployment/resources/pod_finalizers.go index eda510ea2..6e05e6061 100644 --- a/pkg/deployment/resources/pod_finalizers.go +++ b/pkg/deployment/resources/pod_finalizers.go @@ -64,6 +64,25 @@ func (r *Resources) runPodFinalizers(ctx context.Context, p *v1.Pod, memberStatu } else { log.Debug().Err(err).Str("finalizer", f).Msg("Cannot remove Pod finalizer yet") } + case constants.FinalizerDelayPodTermination: + s, _ := r.context.GetStatus() + _, group, ok := s.Members.ElementByID(memberStatus.ID) + if !ok { + continue + } + log.Error().Str("finalizer", f).Msg("Delay finalizer") + + groupSpec := r.context.GetSpec().GetServerGroupSpec(group) + d := time.Duration(util.IntOrDefault(groupSpec.ShutdownDelay, 0)) * time.Second + if t := p.ObjectMeta.DeletionTimestamp; t != nil { + e := p.ObjectMeta.DeletionTimestamp.Time.Sub(time.Now().Add(d)) + log.Error().Str("finalizer", f).Dur("left", e).Msg("Delay finalizer status") + if e < 0 { + removalList = append(removalList, f) + } + } else { + continue + } } } // Remove finalizers (if needed) diff --git a/pkg/deployment/resources/pod_inspector.go b/pkg/deployment/resources/pod_inspector.go index 86fd72950..f0abcbe7d 100644 --- a/pkg/deployment/resources/pod_inspector.go +++ b/pkg/deployment/resources/pod_inspector.go @@ -79,7 +79,7 @@ func (r *Resources) InspectPods(ctx context.Context, cachedStatus inspectorInter memberStatus, group, found := status.Members.MemberStatusByPodName(pod.GetName()) if !found { - log.Debug().Str("pod", pod.GetName()).Msg("no memberstatus found for pod") + log.Warn().Str("pod", pod.GetName()).Strs("existing-pods", status.Members.PodNames()).Msg("no memberstatus found for pod") if k8sutil.IsPodMarkedForDeletion(pod) && len(pod.GetFinalizers()) > 0 { // Strange, pod belongs to us, but we have no member for it. // Remove all finalizers, so it can be removed. @@ -249,7 +249,7 @@ func (r *Resources) InspectPods(ctx context.Context, cachedStatus inspectorInter case api.MemberPhaseNone: // Do nothing log.Debug().Str("pod-name", podName).Msg("PodPhase is None, waiting for the pod to be recreated") - case api.MemberPhaseShuttingDown, api.MemberPhaseUpgrading, api.MemberPhaseFailed, api.MemberPhaseRotateStart: + case api.MemberPhaseShuttingDown, api.MemberPhaseUpgrading, api.MemberPhaseFailed, api.MemberPhaseRotateStart, api.MemberPhaseRotating: // Shutdown was intended, so not need to do anything here. // Just mark terminated wasTerminated := m.Conditions.IsTrue(api.ConditionTypeTerminated) @@ -264,8 +264,6 @@ func (r *Resources) InspectPods(ctx context.Context, cachedStatus inspectorInter return errors.WithStack(err) } } - case api.MemberPhaseRotating: - fallthrough default: log.Debug().Str("pod-name", podName).Msg("Pod is gone") m.Phase = api.MemberPhaseNone // This is trigger a recreate of the pod. diff --git a/pkg/util/constants/constants.go b/pkg/util/constants/constants.go index 9f5f608cf..10fbf0a60 100644 --- a/pkg/util/constants/constants.go +++ b/pkg/util/constants/constants.go @@ -51,6 +51,7 @@ const ( FinalizerPodAgencyServing = "agent.database.arangodb.com/agency-serving" // Finalizer added to Agents, indicating the need for keeping enough agents alive FinalizerPodDrainDBServer = "dbserver.database.arangodb.com/drain" // Finalizer added to DBServers, indicating the need for draining that dbserver FinalizerPVCMemberExists = "pvc.database.arangodb.com/member-exists" // Finalizer added to PVCs, indicating the need to keep is as long as its member exists + FinalizerDelayPodTermination = "pod.database.arangodb.com/delay" // Finalizer added to Pod, delays termination AnnotationEnforceAntiAffinity = "database.arangodb.com/enforce-anti-affinity" // Key of annotation added to PVC. Value is a boolean "true" or "false"