Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/apis/deployment/v1/member_phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ func (p MemberPhase) IsFailed() bool {
return p == MemberPhaseFailed
}

// IsReady reports whether the phase p equals MemberPhaseCreated.
// Only the phase value is compared; no other member state is inspected here.
func (p MemberPhase) IsReady() bool {
return p == MemberPhaseCreated
}

// IsCreatedOrDrain returns true when given phase is MemberPhaseCreated or MemberPhaseDrain
func (p MemberPhase) IsCreatedOrDrain() bool {
return p == MemberPhaseCreated || p == MemberPhaseDrain
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/deployment/v1/member_status_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ func (l MemberStatusList) SelectMemberToRemove() (MemberStatus, error) {
return m, nil
}
}
for _, m := range l {
if m.Conditions.IsTrue(ConditionTypeCleanedOut) {
return m, nil
}
}
// Pick a random member that is in created state
perm := rand.Perm(len(l))
for _, idx := range perm {
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/deployment/v2alpha1/member_phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ func (p MemberPhase) IsFailed() bool {
return p == MemberPhaseFailed
}

// IsReady reports whether the phase p equals MemberPhaseCreated.
// Only the phase value is compared; no other member state is inspected here.
func (p MemberPhase) IsReady() bool {
return p == MemberPhaseCreated
}

// IsCreatedOrDrain returns true when given phase is MemberPhaseCreated or MemberPhaseDrain
func (p MemberPhase) IsCreatedOrDrain() bool {
return p == MemberPhaseCreated || p == MemberPhaseDrain
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/deployment/v2alpha1/member_status_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ func (l MemberStatusList) SelectMemberToRemove() (MemberStatus, error) {
return m, nil
}
}
for _, m := range l {
if m.Conditions.IsTrue(ConditionTypeCleanedOut) {
return m, nil
}
}
// Pick a random member that is in created state
perm := rand.Perm(len(l))
for _, idx := range perm {
Expand Down
10 changes: 10 additions & 0 deletions pkg/deployment/reconcile/action_cleanout_member.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ type actionCleanoutMember struct {
// Returns true if the action is completely finished, false in case
// the start time needs to be recorded and a ready condition needs to be checked.
func (a *actionCleanoutMember) Start(ctx context.Context) (bool, error) {
if a.action.Group != api.ServerGroupDBServers {
// Proceed only on DBServers
return true, nil
}

m, ok := a.actionCtx.GetMemberStatusByID(a.action.MemberID)
if !ok {
// We wanted to remove and it is already gone. All ok
Expand Down Expand Up @@ -87,6 +92,10 @@ func (a *actionCleanoutMember) Start(ctx context.Context) (bool, error) {
var jobID string
ctxJobID := driver.WithJobIDResponse(ctxChild, &jobID)
if err := cluster.CleanOutServer(ctxJobID, a.action.MemberID); err != nil {
if driver.IsNotFound(err) {
// Member not found, it could be that it never connected to the cluster
return true, nil
}
log.Debug().Err(err).Msg("Failed to cleanout member")
return false, errors.WithStack(err)
}
Expand Down Expand Up @@ -182,6 +191,7 @@ func (a *actionCleanoutMember) CheckProgress(ctx context.Context) (bool, bool, e
return false, false, errors.WithStack(err)
}
}

// Cleanout completed
return true, false, nil
}
16 changes: 12 additions & 4 deletions pkg/deployment/reconcile/action_remove_member.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ package reconcile
import (
"context"

apiErrors "k8s.io/apimachinery/pkg/api/errors"

"github.com/arangodb/kube-arangodb/pkg/util/errors"

"github.com/rs/zerolog"
Expand Down Expand Up @@ -105,14 +107,20 @@ func (a *actionRemoveMember) Start(ctx context.Context) (bool, error) {
}
}
}
// Remove the pod (if any)
if err := a.actionCtx.DeletePod(ctx, m.PodName); err != nil {
return false, errors.WithStack(err)
if m.PodName != "" {
// Remove the pod (if any)
if err := a.actionCtx.DeletePod(ctx, m.PodName); err != nil {
if !apiErrors.IsNotFound(err) {
return false, errors.WithStack(err)
}
}
}
// Remove the pvc (if any)
if m.PersistentVolumeClaimName != "" {
if err := a.actionCtx.DeletePvc(ctx, m.PersistentVolumeClaimName); err != nil {
return false, errors.WithStack(err)
if !apiErrors.IsNotFound(err) {
return false, errors.WithStack(err)
}
}
}
// Remove member
Expand Down
9 changes: 9 additions & 0 deletions pkg/deployment/reconcile/helper_shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ func (s shutdownHelperAPI) Start(ctx context.Context) (bool, error) {
log.Error().Msg("No such member")
return true, nil
}
if m.PodName == "" {
log.Warn().Msgf("Pod is empty")
return true, nil
}
// Remove finalizers, so Kubernetes will quickly terminate the pod
if err := s.actionCtx.RemovePodFinalizers(ctx, m.PodName); err != nil {
return false, errors.WithStack(err)
Expand Down Expand Up @@ -130,6 +134,11 @@ func (s shutdownHelperDelete) Start(ctx context.Context) (bool, error) {
return true, nil
}

if m.PodName == "" {
log.Warn().Msgf("Pod is empty")
return true, nil
}

// Terminate pod
if err := s.actionCtx.DeletePod(ctx, m.PodName); err != nil {
if !k8sutil.IsNotFound(err) {
Expand Down
16 changes: 16 additions & 0 deletions pkg/deployment/reconcile/helper_wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,19 @@ func withResignLeadership(group api.ServerGroup, member api.MemberStatus, reason

return api.AsPlan(plan).Before(api.NewAction(api.ActionTypeResignLeadership, group, member.ID, reason))
}

// cleanOutMember builds the plan that takes member m out of the given group.
// For the DBServers group the plan starts with a CleanOutMember action; for
// every group it then contains ShutdownMember followed by RemoveMember.
func cleanOutMember(group api.ServerGroup, m api.MemberStatus) api.Plan {
	// At most three actions are ever produced.
	actions := make(api.Plan, 0, 3)

	// Only DBServers get the extra clean-out step before shutdown.
	if group == api.ServerGroupDBServers {
		actions = append(actions, api.NewAction(api.ActionTypeCleanOutMember, group, m.ID))
	}

	return append(actions,
		api.NewAction(api.ActionTypeShutdownMember, group, m.ID),
		api.NewAction(api.ActionTypeRemoveMember, group, m.ID),
	)
}
15 changes: 11 additions & 4 deletions pkg/deployment/reconcile/plan_builder_clean_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ func createCleanOutPlan(ctx context.Context, log zerolog.Logger, _ k8sutil.APIOb
return nil
}

if !status.Conditions.IsTrue(api.ConditionTypeUpToDate) {
// Do not mark servers as cleaned out while changes are still propagating
return nil
}

cluster, err := getCluster(ctx, planCtx)
if err != nil {
log.Warn().Err(err).Msgf("Unable to get cluster")
Expand All @@ -57,6 +62,8 @@ func createCleanOutPlan(ctx context.Context, log zerolog.Logger, _ k8sutil.APIOb
return nil
}

var plan api.Plan

for id, member := range health.Health {
switch member.Role {
case driver.ServerRoleDBServer:
Expand All @@ -79,13 +86,13 @@ func createCleanOutPlan(ctx context.Context, log zerolog.Logger, _ k8sutil.APIOb
Msgf("server is cleaned out so operator must do the same")

action := api.NewAction(api.ActionTypeSetMemberCondition, api.ServerGroupDBServers, string(id),
"server is cleaned out so operator must do the same")
action = action.AddParam(string(api.ConditionTypeCleanedOut), strconv.FormatBool(true))
"server is cleaned out so operator must do the same").
AddParam(string(api.ConditionTypeCleanedOut), strconv.FormatBool(true))

return api.Plan{action}
plan = append(plan, action)
}
}
}

return nil
return plan
}
9 changes: 3 additions & 6 deletions pkg/deployment/reconcile/plan_builder_normal.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,14 @@ func createNormalPlan(ctx context.Context, log zerolog.Logger, apiObject k8sutil
return newPlanAppender(NewWithPlanBuilder(ctx, log, apiObject, spec, status, cachedStatus, builderCtx), nil).
// Check for failed members
ApplyIfEmpty(createMemberFailedRestorePlan).
// Check for cleaned out dbserver in created state
ApplyIfEmpty(createRemoveCleanedDBServersPlan).
// Update status
ApplySubPlanIfEmpty(createEncryptionKeyStatusPropagatedFieldUpdate, createEncryptionKeyStatusUpdate).
ApplyIfEmpty(createTLSStatusUpdate).
ApplyIfEmpty(createJWTStatusUpdate).
// Check for scale up/down
ApplyIfEmpty(createScaleMemberPlan).
// Check for cleaned out dbserver in created state
ApplyIfEmpty(createRemoveCleanedDBServersPlan).
// Check for members to be removed
ApplyIfEmpty(createReplaceMemberPlan).
// Check for the need to rotate one or more members
Expand Down Expand Up @@ -205,10 +205,7 @@ func createRemoveCleanedDBServersPlan(ctx context.Context,
Str("id", m.ID).
Str("role", api.ServerGroupDBServers.AsRole()).
Msg("Creating dbserver replacement plan because server is cleanout in created phase")
return api.Plan{
api.NewAction(api.ActionTypeRemoveMember, api.ServerGroupDBServers, m.ID),
api.NewAction(api.ActionTypeAddMember, api.ServerGroupDBServers, ""),
}
return cleanOutMember(api.ServerGroupDBServers, m)
}
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/deployment/reconcile/plan_builder_rotate_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ package reconcile
import (
"context"

"github.com/arangodb/kube-arangodb/pkg/deployment/rotation"

"github.com/arangodb/kube-arangodb/pkg/deployment/features"

"github.com/arangodb/kube-arangodb/pkg/deployment/resources"
Expand Down Expand Up @@ -122,7 +124,7 @@ func createRotateOrUpgradePlanInternal(log zerolog.Logger, apiObject k8sutil.API
newPlan = createUpgradeMemberPlan(log, m, group, "Version upgrade", spec, status,
!decision.AutoUpgradeNeeded)
} else {
if m.Conditions.IsTrue(api.ConditionTypeRestart) {
if rotation.CheckPossible(m) && m.Conditions.IsTrue(api.ConditionTypeRestart) {
newPlan = createRotateMemberPlan(log, m, group, "Restart flag present")
}
}
Expand Down
10 changes: 1 addition & 9 deletions pkg/deployment/reconcile/plan_builder_scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,7 @@ func createScalePlan(log zerolog.Logger, members api.MemberStatusList, group api
Str("member-id", m.ID).
Str("phase", string(m.Phase)).
Msg("Found member to remove")
if group == api.ServerGroupDBServers {
plan = append(plan,
api.NewAction(api.ActionTypeCleanOutMember, group, m.ID),
)
}
plan = append(plan,
api.NewAction(api.ActionTypeShutdownMember, group, m.ID),
api.NewAction(api.ActionTypeRemoveMember, group, m.ID),
)
plan = append(plan, cleanOutMember(group, m)...)
log.Debug().
Int("count", count).
Int("actual-count", len(members)).
Expand Down
25 changes: 22 additions & 3 deletions pkg/deployment/reconcile/plan_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -902,13 +902,13 @@ func TestCreatePlan(t *testing.T) {
ExpectedLog: "Creating member replacement plan because member has failed",
},
{
Name: "Scale down DBservers",
Name: "CleanOut DBserver",
context: &testContext{
ArangoDeployment: deploymentTemplate.DeepCopy(),
},
Helper: func(ad *api.ArangoDeployment) {
ad.Spec.DBServers = api.ServerGroupSpec{
Count: util.NewInt(2),
Count: util.NewInt(3),
}
ad.Status.Members.DBServers[0].Phase = api.MemberPhaseCreated
ad.Status.Members.DBServers[0].Conditions = api.ConditionList{
Expand All @@ -919,11 +919,30 @@ func TestCreatePlan(t *testing.T) {
}
},
ExpectedPlan: []api.Action{
api.NewAction(api.ActionTypeCleanOutMember, api.ServerGroupDBServers, "id"),
api.NewAction(api.ActionTypeShutdownMember, api.ServerGroupDBServers, ""),
api.NewAction(api.ActionTypeRemoveMember, api.ServerGroupDBServers, ""),
api.NewAction(api.ActionTypeAddMember, api.ServerGroupDBServers, ""),
},
ExpectedLog: "Creating dbserver replacement plan because server is cleanout in created phase",
},
{
Name: "Scale down DBservers",
context: &testContext{
ArangoDeployment: deploymentTemplate.DeepCopy(),
},
Helper: func(ad *api.ArangoDeployment) {
ad.Spec.DBServers = api.ServerGroupSpec{
Count: util.NewInt(2),
}
ad.Status.Members.DBServers[0].Phase = api.MemberPhaseCreated
},
ExpectedPlan: []api.Action{
api.NewAction(api.ActionTypeCleanOutMember, api.ServerGroupDBServers, "id"),
api.NewAction(api.ActionTypeShutdownMember, api.ServerGroupDBServers, ""),
api.NewAction(api.ActionTypeRemoveMember, api.ServerGroupDBServers, ""),
},
ExpectedLog: "Creating scale-down plan",
},
}

for _, testCase := range testCases {
Expand Down
2 changes: 1 addition & 1 deletion pkg/deployment/reconcile/plan_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (d *Reconciler) executePlan(ctx context.Context, cachedStatus inspectorInte

done, abort, recall, err := d.executeAction(ctx, log, planAction, action)
if err != nil {
return plan, false, errors.WithStack(err)
return nil, false, errors.WithStack(err)
}

if abort {
Expand Down
19 changes: 17 additions & 2 deletions pkg/deployment/rotation/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,29 @@ func (m Mode) And(b Mode) Mode {
return b
}

// CheckPossible reports whether rotation can be considered for the member.
// It returns false when the member phase is not ready, or when the member
// carries a true Terminated or Terminating condition; otherwise it returns true.
func CheckPossible(member api.MemberStatus) bool {
	switch {
	case !member.Phase.IsReady():
		// Skip rotation when the member is not ready
		return false
	case member.Conditions.IsTrue(api.ConditionTypeTerminated):
		// Member already terminated, nothing to do
		return false
	case member.Conditions.IsTrue(api.ConditionTypeTerminating):
		// Termination in progress, nothing to do
		return false
	default:
		return true
	}
}

func IsRotationRequired(log zerolog.Logger, cachedStatus inspectorInterface.Inspector, spec api.DeploymentSpec, member api.MemberStatus, pod *core.Pod, specTemplate, statusTemplate *api.ArangoMemberPodTemplate) (mode Mode, plan api.Plan, reason string, err error) {
// Determine if rotation is required based on plan and actions

// Set default mode for return value
mode = SkippedRotation

if member.Phase.IsPending() {
// Skip rotation when we are not yet created
if !CheckPossible(member) {
// Check is not possible due to improper state of member
return
}

Expand Down