7 changes: 7 additions & 0 deletions pkg/apis/deployment/v1alpha/member_phase.go
@@ -34,6 +34,8 @@ const (
MemberPhaseFailed MemberPhase = "Failed"
// MemberPhaseCleanOut indicates that a dbserver is in the process of being cleaned out
MemberPhaseCleanOut MemberPhase = "CleanOut"
// MemberPhaseDrain indicates that a dbserver is in the process of being cleaned out as a result of draining a node
MemberPhaseDrain MemberPhase = "Drain"
// MemberPhaseShuttingDown indicates that a member is shutting down
MemberPhaseShuttingDown MemberPhase = "ShuttingDown"
// MemberPhaseRotating indicates that a member is being rotated
@@ -46,3 +48,8 @@ const (
func (p MemberPhase) IsFailed() bool {
return p == MemberPhaseFailed
}

// IsCreatedOrDrain returns true when the given phase is MemberPhaseCreated or MemberPhaseDrain
func (p MemberPhase) IsCreatedOrDrain() bool {
return p == MemberPhaseCreated || p == MemberPhaseDrain
}
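
For illustration, a tiny usage sketch (not part of this change set) of the new helper; it mirrors the condition used in plan_builder.go below and assumes the usual api import of the v1alpha deployment package:

// Illustrative only: a dbserver that reached the CleanedOut condition may now sit
// in either the Created or the new Drain phase, and callers can treat both alike.
func needsCleanedOutHandling(m api.MemberStatus) bool {
	return m.Phase.IsCreatedOrDrain() && m.Conditions.IsTrue(api.ConditionTypeCleanedOut)
}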
2 changes: 1 addition & 1 deletion pkg/deployment/reconcile/plan_builder.go
@@ -120,7 +120,7 @@ func createPlan(log zerolog.Logger, apiObject k8sutil.APIObject,

// Check for cleaned out dbserver in created state
for _, m := range status.Members.DBServers {
if len(plan) == 0 && m.Phase == api.MemberPhaseCreated && m.Conditions.IsTrue(api.ConditionTypeCleanedOut) {
if len(plan) == 0 && m.Phase.IsCreatedOrDrain() && m.Conditions.IsTrue(api.ConditionTypeCleanedOut) {
log.Debug().
Str("id", m.ID).
Str("role", api.ServerGroupDBServers.AsRole()).
3 changes: 3 additions & 0 deletions pkg/deployment/resources/context.go
@@ -26,6 +26,7 @@ import (
"context"

driver "github.com/arangodb/go-driver"
"github.com/arangodb/go-driver/agency"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
"k8s.io/api/core/v1"
@@ -85,4 +86,6 @@ type Context interface {
// GetDatabaseClient returns a cached client for the entire database (cluster coordinators or single server),
// creating one if needed.
GetDatabaseClient(ctx context.Context) (driver.Client, error)
// GetAgency returns a connection to the entire agency.
GetAgency(ctx context.Context) (agency.Agency, error)
}
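
A minimal sketch (not the operator's actual implementation) of how a Context implementation might satisfy the new GetAgency method; the receiver type and the buildAgencyConnection helper are hypothetical, only agency.NewAgency comes from the go-driver agency package:

// Hypothetical sketch: build a connection to the agency endpoints and wrap it
// in the go-driver agency client.
func (d *Deployment) GetAgency(ctx context.Context) (agency.Agency, error) {
	// buildAgencyConnection is an assumed helper returning a driver.Connection
	// that points at the deployment's agents.
	conn, err := d.buildAgencyConnection(ctx)
	if err != nil {
		return nil, maskAny(err)
	}
	result, err := agency.NewAgency(conn)
	if err != nil {
		return nil, maskAny(err)
	}
	return result, nil
}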
2 changes: 1 addition & 1 deletion pkg/deployment/resources/pod_finalizers.go
@@ -123,7 +123,7 @@ func (r *Resources) inspectFinalizerPodDrainDBServer(ctx context.Context, log ze
}

// If this DBServer is cleaned out, we need to remove the PVC.
if memberStatus.Conditions.IsTrue(api.ConditionTypeCleanedOut) {
if memberStatus.Conditions.IsTrue(api.ConditionTypeCleanedOut) || memberStatus.Phase == api.MemberPhaseDrain {
pvcs := r.context.GetKubeCli().CoreV1().PersistentVolumeClaims(r.context.GetNamespace())
if err := pvcs.Delete(memberStatus.PersistentVolumeClaimName, &metav1.DeleteOptions{}); err != nil && !k8sutil.IsNotFound(err) {
log.Warn().Err(err).Msg("Failed to delete PVC for member")
2 changes: 2 additions & 0 deletions pkg/deployment/resources/pod_inspector.go
@@ -182,9 +182,11 @@ func (r *Resources) InspectPods(ctx context.Context) (util.Interval, error) {
for _, m := range members {
if podName := m.PodName; podName != "" {
if !podExists(podName) {
log.Debug().Str("pod-name", podName).Msg("Does not exist")
switch m.Phase {
case api.MemberPhaseNone:
// Do nothing
log.Debug().Str("pod-name", podName).Msg("PodPhase is None, waiting for the pod to be recreated")
case api.MemberPhaseShuttingDown, api.MemberPhaseRotating, api.MemberPhaseUpgrading, api.MemberPhaseFailed:
// Shutdown was intended, so no need to do anything here.
// Just mark terminated
54 changes: 49 additions & 5 deletions pkg/deployment/resources/pod_termination.go
@@ -31,8 +31,10 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

driver "github.com/arangodb/go-driver"
"github.com/arangodb/go-driver/agency"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)

@@ -187,13 +189,55 @@ func (r *Resources) prepareDBServerPodTermination(ctx context.Context, log zerol
// Not cleaned out yet, check member status
if memberStatus.Conditions.IsTrue(api.ConditionTypeTerminated) {
log.Warn().Msg("Member is already terminated before it could be cleaned out. Not good, but removing dbserver pod because we cannot do anything further")
// At this point we have to set CleanedOut to true,
// because we can no longer reason about the state in the agency and
// bringing back the dbserver again may result in a cleaned out server without us knowing
memberStatus.Conditions.Update(api.ConditionTypeCleanedOut, true, "Draining server failed", "")
memberStatus.CleanoutJobID = ""
if memberStatus.Phase == api.MemberPhaseDrain {
memberStatus.Phase = api.MemberPhaseCreated
}
if err := updateMember(memberStatus); err != nil {
return maskAny(err)
}
return nil
}
// Ensure the cleanout is triggered
log.Debug().Msg("Server is not yet clean out. Triggering a clean out now")
if err := cluster.CleanOutServer(ctx, memberStatus.ID); err != nil {
log.Debug().Err(err).Msg("Failed to clean out server")
return maskAny(err)
if memberStatus.Phase == api.MemberPhaseCreated {
// No cleanout job triggered
var jobID string
ctx = driver.WithJobIDResponse(ctx, &jobID)
// Ensure the cleanout is triggered
log.Debug().Msg("Server is not yet clean out. Triggering a clean out now")
if err := cluster.CleanOutServer(ctx, memberStatus.ID); err != nil {
log.Debug().Err(err).Msg("Failed to clean out server")
return maskAny(err)
}
memberStatus.CleanoutJobID = jobID
memberStatus.Phase = api.MemberPhaseDrain
if err := updateMember(memberStatus); err != nil {
return maskAny(err)
}
} else if memberStatus.Phase == api.MemberPhaseDrain {
// Check the job progress
agency, err := r.context.GetAgency(ctx)
if err != nil {
log.Debug().Err(err).Msg("Failed to create agency client")
return maskAny(err)
}
jobStatus, err := arangod.CleanoutServerJobStatus(ctx, memberStatus.CleanoutJobID, c, agency)
if err != nil {
log.Debug().Err(err).Msg("Failed to fetch cleanout job status")
return maskAny(err)
}
if jobStatus.IsFailed() {
log.Warn().Str("reason", jobStatus.Reason()).Msg("Cleanout Job failed. Aborting plan")
// Revert cleanout state and persist it, otherwise the member keeps pointing at the failed job
memberStatus.Phase = api.MemberPhaseCreated
memberStatus.CleanoutJobID = ""
if err := updateMember(memberStatus); err != nil {
return maskAny(err)
}
return maskAny(fmt.Errorf("Clean out server job failed"))
}
}

return maskAny(fmt.Errorf("Server is not yet cleaned out"))

}
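
For context, a hedged sketch of how a helper like arangod.CleanoutServerJobStatus might locate the cleanout job in the agency. ArangoDB tracks such jobs in the Target/ToDo, Pending, Failed and Finished queues, but the key layout, error handling and names below are assumptions rather than the actual implementation:

// Illustrative only: scan the agency job queues for the given cleanout job ID.
func findCleanoutJobQueue(ctx context.Context, a agency.Agency, jobID string) (string, error) {
	for _, queue := range []string{"ToDo", "Pending", "Failed", "Finished"} {
		var job map[string]interface{}
		key := []string{"arango", "Target", queue, jobID}
		if err := a.ReadKey(ctx, key, &job); err != nil {
			// Assume an error here means "not in this queue"; keep scanning.
			continue
		}
		if len(job) > 0 {
			return queue, nil
		}
	}
	return "", fmt.Errorf("cleanout job %s not found in agency", jobID)
}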