Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions pkg/apis/deployment/v1/deployment_status_members.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,3 +285,19 @@ func (ds DeploymentStatusMembers) MembersOfGroup(group ServerGroup) MemberStatus
return MemberStatusList{}
}
}

// PodNames returns all members pod names
func (ds DeploymentStatusMembers) PodNames() []string {
var n []string

ds.ForeachServerGroup(func(group ServerGroup, list MemberStatusList) error {
for _, m := range list {
if m.PodName != "" {
n = append(n, m.PodName)
}
}
return nil
})

return n
}
5 changes: 5 additions & 0 deletions pkg/apis/deployment/v1/member_phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,8 @@ func (p MemberPhase) IsFailed() bool {
func (p MemberPhase) IsCreatedOrDrain() bool {
return p == MemberPhaseCreated || p == MemberPhaseDrain
}

// String returns string from MemberPhase
func (p MemberPhase) String() string {
return string(p)
}
33 changes: 33 additions & 0 deletions pkg/apis/deployment/v1/server_group_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,35 @@ import (
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)

// ServerGroupShutdownMethod enum of possible shutdown methods
type ServerGroupShutdownMethod string

// Default return default value for ServerGroupShutdownMethod
func (s *ServerGroupShutdownMethod) Default() ServerGroupShutdownMethod {
return ServerGroupShutdownMethodAPI
}

// Get return current or default value of ServerGroupShutdownMethod
func (s *ServerGroupShutdownMethod) Get() ServerGroupShutdownMethod {
if s == nil {
return s.Default()
}

switch t := *s; t {
case ServerGroupShutdownMethodAPI, ServerGroupShutdownMethodDelete:
return t
default:
return s.Default()
}
}

const (
// ServerGroupShutdownMethodAPI API Shutdown method
ServerGroupShutdownMethodAPI ServerGroupShutdownMethod = "api"
// ServerGroupShutdownMethodDelete Pod Delete shutdown method
ServerGroupShutdownMethodDelete ServerGroupShutdownMethod = "delete"
)

// ServerGroupSpec contains the specification for all servers in a specific group (e.g. all agents)
type ServerGroupSpec struct {
// Count holds the requested number of servers
Expand Down Expand Up @@ -106,6 +135,10 @@ type ServerGroupSpec struct {
ExtendedRotationCheck *bool `json:"extendedRotationCheck,omitempty"`
// InitContainers Init containers specification
InitContainers *ServerGroupInitContainers `json:"initContainers,omitempty"`
// ShutdownMethod describe procedure of member shutdown taken by Operator
ShutdownMethod *ServerGroupShutdownMethod `json:"shutdownMethod,omitempty"`
// ShutdownDelay define how long operator should delay finalizer removal after shutdown
ShutdownDelay *int `json:"shutdownDelay,omitempty"`
}

// ServerGroupSpecSecurityContext contains specification for pod security context
Expand Down
10 changes: 10 additions & 0 deletions pkg/apis/deployment/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 31 additions & 0 deletions pkg/apis/deployment/v2alpha1/server_group_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,35 @@ import (
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)

// ServerGroupShutdownMethod enum of possible shutdown methods
type ServerGroupShutdownMethod string

// Default return default value for ServerGroupShutdownMethod
func (s *ServerGroupShutdownMethod) Default() ServerGroupShutdownMethod {
return ServerGroupShutdownMethodAPI
}

// Get return current or default value of ServerGroupShutdownMethod
func (s *ServerGroupShutdownMethod) Get() ServerGroupShutdownMethod {
if s == nil {
return s.Default()
}

switch t := *s; t {
case ServerGroupShutdownMethodAPI, ServerGroupShutdownMethodDelete:
return t
default:
return s.Default()
}
}

const (
// ServerGroupShutdownMethodAPI API Shutdown method
ServerGroupShutdownMethodAPI ServerGroupShutdownMethod = "api"
// ServerGroupShutdownMethodDelete Pod Delete shutdown method
ServerGroupShutdownMethodDelete ServerGroupShutdownMethod = "delete"
)

// ServerGroupSpec contains the specification for all servers in a specific group (e.g. all agents)
type ServerGroupSpec struct {
// Count holds the requested number of servers
Expand Down Expand Up @@ -106,6 +135,8 @@ type ServerGroupSpec struct {
ExtendedRotationCheck *bool `json:"extendedRotationCheck,omitempty"`
// InitContainers Init containers specification
InitContainers *ServerGroupInitContainers `json:"initContainers,omitempty"`
// ShutdownMethod describe procedure of member shutdown taken by Operator
ShutdownMethod *ServerGroupShutdownMethod `json:"shutdownMethod,omitempty"`
}

// ServerGroupSpecSecurityContext contains specification for pod security context
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/deployment/v2alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions pkg/deployment/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,11 @@ func (d *Deployment) CreateMember(group api.ServerGroup, id string) (string, err
return id, nil
}

// GetPod returns pod.
func (d *Deployment) GetPod(podName string) (*v1.Pod, error) {
return d.deps.KubeCli.CoreV1().Pods(d.GetNamespace()).Get(context.Background(), podName, meta.GetOptions{})
}

// DeletePod deletes a pod with given name in the namespace
// of the deployment. If the pod does not exist, the error is ignored.
func (d *Deployment) DeletePod(podName string) error {
Expand Down
10 changes: 8 additions & 2 deletions pkg/deployment/reconcile/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,21 @@ import (
"github.com/rs/zerolog"
)

// Action executes a single Plan item.
type Action interface {
// ActionCore executes a single Plan item.
type ActionCore interface {
// Start performs the start of the action.
// Returns true if the action is completely finished, false in case
// the start time needs to be recorded and a ready condition needs to be checked.
Start(ctx context.Context) (bool, error)
// CheckProgress checks the progress of the action.
// Returns: ready, abort, error.
CheckProgress(ctx context.Context) (bool, bool, error)
}

// Action executes a single Plan item.
type Action interface {
ActionCore

// Timeout returns the amount of time after which this action will timeout.
Timeout(deploymentSpec api.DeploymentSpec) time.Duration
// Return the MemberID used / created in this action
Expand Down
11 changes: 11 additions & 0 deletions pkg/deployment/reconcile/action_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ type ActionContext interface {
UpdateMember(member api.MemberStatus) error
// RemoveMemberByID removes a member with given id.
RemoveMemberByID(id string) error
// GetPod returns pod.
GetPod(podName string) (*v1.Pod, error)
// DeletePod deletes a pod with given name in the namespace
// of the deployment. If the pod does not exist, the error is ignored.
DeletePod(podName string) error
Expand Down Expand Up @@ -317,6 +319,15 @@ func (ac *actionContext) RemoveMemberByID(id string) error {
return nil
}

// GetPod returns pod.
func (ac *actionContext) GetPod(podName string) (*v1.Pod, error) {
if pod, err := ac.context.GetPod(podName); err != nil {
return nil, errors.WithStack(err)
} else {
return pod, nil
}
}

// DeletePod deletes a pod with given name in the namespace
// of the deployment. If the pod does not exist, the error is ignored.
func (ac *actionContext) DeletePod(podName string) error {
Expand Down
49 changes: 16 additions & 33 deletions pkg/deployment/reconcile/action_rotate_member.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ package reconcile
import (
"context"

"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"

"github.com/arangodb/kube-arangodb/pkg/util/errors"

api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
Expand Down Expand Up @@ -56,41 +58,17 @@ type actionRotateMember struct {
// the start time needs to be recorded and a ready condition needs to be checked.
func (a *actionRotateMember) Start(ctx context.Context) (bool, error) {
log := a.log
group := a.action.Group
m, ok := a.actionCtx.GetMemberStatusByID(a.action.MemberID)
if !ok {
log.Error().Msg("No such member")
}
// Remove finalizers, so Kubernetes will quickly terminate the pod
if err := a.actionCtx.RemovePodFinalizers(m.PodName); err != nil {
return false, errors.WithStack(err)
}
if group.IsArangod() {
// Invoke shutdown endpoint
c, err := a.actionCtx.GetServerClient(ctx, group, a.action.MemberID)
if err != nil {
log.Debug().Err(err).Msg("Failed to create member client")
return false, errors.WithStack(err)
}
removeFromCluster := false
log.Debug().Bool("removeFromCluster", removeFromCluster).Msg("Shutting down member")
ctx, cancel := context.WithTimeout(ctx, shutdownTimeout)
defer cancel()
if err := c.Shutdown(ctx, removeFromCluster); err != nil {
// Shutdown failed. Let's check if we're already done
if ready, _, err := a.CheckProgress(ctx); err == nil && ready {
// We're done
return true, nil
}
log.Debug().Err(err).Msg("Failed to shutdown member")
return false, errors.WithStack(err)
}
} else if group.IsArangosync() {
// Terminate pod
if err := a.actionCtx.DeletePod(m.PodName); err != nil {
return false, errors.WithStack(err)
}

if ready, err := getShutdownHelper(&a.action, a.actionCtx, a.log).Start(ctx); err != nil {
return false, err
} else if ready {
return true, nil
}

// Update status
m.Phase = api.MemberPhaseRotating

Expand All @@ -110,13 +88,18 @@ func (a *actionRotateMember) CheckProgress(ctx context.Context) (bool, bool, err
log.Error().Msg("No such member")
return true, false, nil
}
if !m.Conditions.IsTrue(api.ConditionTypeTerminated) {
// Pod is not yet terminated

if ready, abort, err := getShutdownHelper(&a.action, a.actionCtx, a.log).CheckProgress(ctx); err != nil {
return false, abort, err
} else if !ready {
return false, false, nil
}

// Pod is terminated, we can now remove it
if err := a.actionCtx.DeletePod(m.PodName); err != nil {
return false, false, errors.WithStack(err)
if !k8sutil.IsNotFound(err) {
return false, false, errors.WithStack(err)
}
}
// Pod is now gone, update the member status
m.Phase = api.MemberPhaseNone
Expand Down
50 changes: 17 additions & 33 deletions pkg/deployment/reconcile/action_rotate_start_member.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ package reconcile
import (
"context"

"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"

"github.com/arangodb/kube-arangodb/pkg/util/errors"

api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
Expand Down Expand Up @@ -56,41 +58,17 @@ type actionRotateStartMember struct {
// the start time needs to be recorded and a ready condition needs to be checked.
func (a *actionRotateStartMember) Start(ctx context.Context) (bool, error) {
log := a.log
group := a.action.Group
m, ok := a.actionCtx.GetMemberStatusByID(a.action.MemberID)
if !ok {
log.Error().Msg("No such member")
}
// Remove finalizers, so Kubernetes will quickly terminate the pod
if err := a.actionCtx.RemovePodFinalizers(m.PodName); err != nil {
return false, errors.WithStack(err)
}
if group.IsArangod() {
// Invoke shutdown endpoint
c, err := a.actionCtx.GetServerClient(ctx, group, a.action.MemberID)
if err != nil {
log.Debug().Err(err).Msg("Failed to create member client")
return false, errors.WithStack(err)
}
removeFromCluster := false
log.Debug().Bool("removeFromCluster", removeFromCluster).Msg("Shutting down member")
ctx, cancel := context.WithTimeout(ctx, shutdownTimeout)
defer cancel()
if err := c.Shutdown(ctx, removeFromCluster); err != nil {
// Shutdown failed. Let's check if we're already done
if ready, _, err := a.CheckProgress(ctx); err == nil && ready {
// We're done
return true, nil
}
log.Debug().Err(err).Msg("Failed to shutdown member")
return false, errors.WithStack(err)
}
} else if group.IsArangosync() {
// Terminate pod
if err := a.actionCtx.DeletePod(m.PodName); err != nil {
return false, errors.WithStack(err)
}

if ready, err := getShutdownHelper(&a.action, a.actionCtx, a.log).Start(ctx); err != nil {
return false, err
} else if ready {
return true, nil
}

// Update status
m.Phase = api.MemberPhaseRotateStart

Expand All @@ -110,13 +88,19 @@ func (a *actionRotateStartMember) CheckProgress(ctx context.Context) (bool, bool
log.Error().Msg("No such member")
return true, false, nil
}
if !m.Conditions.IsTrue(api.ConditionTypeTerminated) {
// Pod is not yet terminated

if ready, abort, err := getShutdownHelper(&a.action, a.actionCtx, a.log).CheckProgress(ctx); err != nil {
return false, abort, err
} else if !ready {
return false, false, nil
}

// Pod is terminated, we can now remove it
if err := a.actionCtx.DeletePod(m.PodName); err != nil {
return false, false, errors.WithStack(err)
if !k8sutil.IsNotFound(err) {
return false, false, errors.WithStack(err)
}
}

return true, false, nil
}
Loading