Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/deployment/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ func (d *Deployment) GetSpec() api.DeploymentSpec {
return d.apiObject.Spec
}

// GetDeploymentHealth reports the most recently recorded cluster health.
// The call is forwarded to the deployment's resources, and both the health
// value and the error are passed back to the caller unchanged.
func (d *Deployment) GetDeploymentHealth() (driver.ClusterHealth, error) {
	health, err := d.resources.GetDeploymentHealth()
	return health, err
}

// GetStatus returns the current status of the deployment
// together with the current version of that status.
func (d *Deployment) GetStatus() (api.DeploymentStatus, int32) {
Expand Down
7 changes: 7 additions & 0 deletions pkg/deployment/reconcile/action_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ type ActionContext interface {
// SetCurrentImage changes the CurrentImage field in the deployment
// status to the given image.
SetCurrentImage(imageInfo api.ImageInfo) error
// GetDeploymentHealth returns a copy of the latest known state of cluster health
GetDeploymentHealth() (driver.ClusterHealth, error)
// InvalidateSyncStatus resets the sync state to false and triggers an inspection
InvalidateSyncStatus()
}
Expand All @@ -107,6 +109,11 @@ func (ac *actionContext) GetMode() api.DeploymentMode {
return ac.context.GetSpec().GetMode()
}

// GetDeploymentHealth reports the most recently recorded cluster health.
// The call is forwarded to the wrapped reconcile context, and both the health
// value and the error are passed back to the caller unchanged.
func (ac *actionContext) GetDeploymentHealth() (driver.ClusterHealth, error) {
	health, err := ac.context.GetDeploymentHealth()
	return health, err
}

// GetDatabaseClient returns a cached client for the entire database (cluster coordinators or single server),
// creating one if needed.
func (ac *actionContext) GetDatabaseClient(ctx context.Context) (driver.Client, error) {
Expand Down
8 changes: 2 additions & 6 deletions pkg/deployment/reconcile/action_remove_member.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,9 @@ func (a *actionRemoveMember) Start(ctx context.Context) (bool, error) {
if !driver.IsNotFound(err) && !driver.IsPreconditionFailed(err) {
return false, maskAny(errors.Wrapf(err, "Failed to remove server from cluster: %#v", err))
} else if driver.IsPreconditionFailed(err) {
cluster, err := client.Cluster(ctx)
health, err := a.actionCtx.GetDeploymentHealth()
if err != nil {
return false, maskAny(errors.Wrapf(err, "Failed to obtain cluster: %#v", err))
}
health, err := cluster.Health(ctx)
if err != nil {
return false, maskAny(errors.Wrapf(err, "Failed to obtain cluster health: %#v", err))
return false, maskAny(errors.Wrapf(err, "failed to get cluster health"))
}
// We don't care if not found
if record, ok := health.Health[driver.ServerID(m.ID)]; ok {
Expand Down
16 changes: 3 additions & 13 deletions pkg/deployment/reconcile/action_wait_for_member_up.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

driver "github.com/arangodb/go-driver"
"github.com/arangodb/go-driver/agency"
"github.com/pkg/errors"
"github.com/rs/zerolog"

api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
Expand Down Expand Up @@ -143,20 +144,9 @@ func (a *actionWaitForMemberUp) checkProgressAgent(ctx context.Context) (bool, b
// of a cluster deployment (coordinator/dbserver).
func (a *actionWaitForMemberUp) checkProgressCluster(ctx context.Context) (bool, bool, error) {
log := a.log
c, err := a.actionCtx.GetDatabaseClient(ctx)
if err != nil {
log.Debug().Err(err).Msg("Failed to create database client")
return false, false, maskAny(err)
}
cluster, err := c.Cluster(ctx)
h, err := a.actionCtx.GetDeploymentHealth()
if err != nil {
log.Debug().Err(err).Msg("Failed to access cluster")
return false, false, maskAny(err)
}
h, err := cluster.Health(ctx)
if err != nil {
log.Debug().Err(err).Msg("Failed to get cluster health")
return false, false, maskAny(err)
return false, false, maskAny(errors.Wrapf(err, "failed to get cluster health"))
}
sh, found := h.Health[driver.ServerID(a.action.MemberID)]
if !found {
Expand Down
3 changes: 2 additions & 1 deletion pkg/deployment/reconcile/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/arangodb/arangosync/client"
driver "github.com/arangodb/go-driver"
"github.com/arangodb/go-driver/agency"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
Expand Down Expand Up @@ -95,6 +94,8 @@ type Context interface {
// GetExpectedPodArguments creates command line arguments for a server in the given group with given ID.
GetExpectedPodArguments(apiObject metav1.Object, deplSpec api.DeploymentSpec, group api.ServerGroup,
agents api.MemberStatusList, id string, version driver.Version) []string
// GetDeploymentHealth returns a copy of the latest known state of cluster health
GetDeploymentHealth() (driver.ClusterHealth, error)
// GetShardSyncStatus returns true if all shards are in sync
GetShardSyncStatus() bool
// InvalidateSyncStatus resets the sync state to false and triggers an inspection
Expand Down
20 changes: 20 additions & 0 deletions pkg/deployment/resources/deployment_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ package resources

import (
"context"
"fmt"
"time"

driver "github.com/arangodb/go-driver"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/metrics"
)
Expand Down Expand Up @@ -91,6 +93,24 @@ func (r *Resources) fetchDeploymentHealth() error {
return nil
}

// GetDeploymentHealth returns a copy of the latest known state of cluster health.
//
// The health mutex is held for the duration of the call. While the stored
// health timestamp is still zero, an empty ClusterHealth and an error are
// returned. Otherwise the stored ClusterHealth struct is copied by value and
// its Health map is replaced by a newly allocated map holding the same
// entries, so adding or removing entries in the returned map does not touch
// the stored map. Map values are copied by plain assignment (a shallow copy).
func (r *Resources) GetDeploymentHealth() (driver.ClusterHealth, error) {
	r.health.mutex.Lock()
	defer r.health.mutex.Unlock()

	if r.health.timestamp.IsZero() {
		return driver.ClusterHealth{}, fmt.Errorf("No cluster health available")
	}

	// Copy the struct first; its Health field still aliases the stored map,
	// so swap in a private map pre-sized for the entries about to be copied.
	cached := r.health.clusterHealth.Health
	newhealth := r.health.clusterHealth
	newhealth.Health = make(map[driver.ServerID]driver.ServerHealth, len(cached))
	for k, v := range cached {
		newhealth.Health[k] = v
	}
	return newhealth, nil
}

// RunDeploymentShardSyncLoop creates a loop to fetch the sync status of shards of the deployment.
// The loop ends when the given channel is closed.
func (r *Resources) RunDeploymentShardSyncLoop(stopCh <-chan struct{}) {
Expand Down
14 changes: 8 additions & 6 deletions pkg/deployment/resources/pod_termination.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,12 @@ func (r *Resources) prepareAgencyPodTermination(ctx context.Context, log zerolog
agentDataWillBeGone := false
if p.Spec.NodeName != "" {
node, err := r.context.GetKubeCli().CoreV1().Nodes().Get(p.Spec.NodeName, metav1.GetOptions{})
if err != nil {
if k8sutil.IsNotFound(err) {
log.Warn().Msg("Node not found")
} else if err != nil {
log.Warn().Err(err).Msg("Failed to get node for member")
return maskAny(err)
}
if node.Spec.Unschedulable {
} else if node.Spec.Unschedulable {
agentDataWillBeGone = true
}
}
Expand Down Expand Up @@ -140,11 +141,12 @@ func (r *Resources) prepareDBServerPodTermination(ctx context.Context, log zerol
dbserverDataWillBeGone := false
if p.Spec.NodeName != "" {
node, err := r.context.GetKubeCli().CoreV1().Nodes().Get(p.Spec.NodeName, metav1.GetOptions{})
if err != nil {
if k8sutil.IsNotFound(err) {
log.Warn().Msg("Node not found")
} else if err != nil {
log.Warn().Err(err).Msg("Failed to get node for member")
return maskAny(err)
}
if node.Spec.Unschedulable {
} else if node.Spec.Unschedulable {
dbserverDataWillBeGone = true
}
}
Expand Down