From fc8063c9cd9cb19b78a26eeb7db976e4a3a53145 Mon Sep 17 00:00:00 2001
From: lamai93
Date: Thu, 4 Apr 2019 09:45:52 +0200
Subject: [PATCH] Reduce the number of health calls to once every five seconds.

---
 pkg/deployment/context_impl.go                |  5 +++
 pkg/deployment/reconcile/action_context.go    |  7 ++++
 .../reconcile/action_remove_member.go         |  8 ++---
 .../reconcile/action_wait_for_member_up.go    | 32 ++++++++++---------
 pkg/deployment/reconcile/context.go           |  2 ++
 pkg/deployment/resources/deployment_health.go | 20 ++++++++++++
 pkg/deployment/resources/pod_termination.go   | 14 ++++----
 7 files changed, 61 insertions(+), 27 deletions(-)

diff --git a/pkg/deployment/context_impl.go b/pkg/deployment/context_impl.go
index de9f9329d..21ab30e62 100644
--- a/pkg/deployment/context_impl.go
+++ b/pkg/deployment/context_impl.go
@@ -84,6 +84,11 @@ func (d *Deployment) GetSpec() api.DeploymentSpec {
 	return d.apiObject.Spec
 }
 
+// GetDeploymentHealth returns a copy of the latest known state of cluster health
+func (d *Deployment) GetDeploymentHealth() (driver.ClusterHealth, error) {
+	return d.resources.GetDeploymentHealth()
+}
+
 // GetStatus returns the current status of the deployment
 // together with the current version of that status.
 func (d *Deployment) GetStatus() (api.DeploymentStatus, int32) {
diff --git a/pkg/deployment/reconcile/action_context.go b/pkg/deployment/reconcile/action_context.go
index a0c28a02f..e32f6c176 100644
--- a/pkg/deployment/reconcile/action_context.go
+++ b/pkg/deployment/reconcile/action_context.go
@@ -84,6 +84,8 @@ type ActionContext interface {
 	// SetCurrentImage changes the CurrentImage field in the deployment
 	// status to the given image.
 	SetCurrentImage(imageInfo api.ImageInfo) error
+	// GetDeploymentHealth returns a copy of the latest known state of cluster health
+	GetDeploymentHealth() (driver.ClusterHealth, error)
 }
 
 // newActionContext creates a new ActionContext implementation.
@@ -105,6 +107,11 @@ func (ac *actionContext) GetMode() api.DeploymentMode {
 	return ac.context.GetSpec().GetMode()
 }
 
+// GetDeploymentHealth returns a copy of the latest known state of cluster health
+func (ac *actionContext) GetDeploymentHealth() (driver.ClusterHealth, error) {
+	return ac.context.GetDeploymentHealth()
+}
+
 // GetDatabaseClient returns a cached client for the entire database (cluster coordinators or single server),
 // creating one if needed.
 func (ac *actionContext) GetDatabaseClient(ctx context.Context) (driver.Client, error) {
diff --git a/pkg/deployment/reconcile/action_remove_member.go b/pkg/deployment/reconcile/action_remove_member.go
index 70e99f57e..30a8a7355 100644
--- a/pkg/deployment/reconcile/action_remove_member.go
+++ b/pkg/deployment/reconcile/action_remove_member.go
@@ -71,13 +71,9 @@ func (a *actionRemoveMember) Start(ctx context.Context) (bool, error) {
 		if !driver.IsNotFound(err) && !driver.IsPreconditionFailed(err) {
 			return false, maskAny(errors.Wrapf(err, "Failed to remove server from cluster: %#v", err))
 		} else if driver.IsPreconditionFailed(err) {
-			cluster, err := client.Cluster(ctx)
+			health, err := a.actionCtx.GetDeploymentHealth()
 			if err != nil {
-				return false, maskAny(errors.Wrapf(err, "Failed to obtain cluster: %#v", err))
-			}
-			health, err := cluster.Health(ctx)
-			if err != nil {
-				return false, maskAny(errors.Wrapf(err, "Failed to obtain cluster health: %#v", err))
+				return false, maskAny(errors.Wrapf(err, "failed to get cluster health"))
 			}
 			// We don't care if not found
 			if record, ok := health.Health[driver.ServerID(m.ID)]; ok {
diff --git a/pkg/deployment/reconcile/action_wait_for_member_up.go b/pkg/deployment/reconcile/action_wait_for_member_up.go
index 71c98a369..91c9589c1 100644
--- a/pkg/deployment/reconcile/action_wait_for_member_up.go
+++ b/pkg/deployment/reconcile/action_wait_for_member_up.go
@@ -28,6 +28,7 @@ import (
 
 	driver "github.com/arangodb/go-driver"
 	"github.com/arangodb/go-driver/agency"
+	"github.com/pkg/errors"
 	"github.com/rs/zerolog"
 
 	api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
@@ -143,20 +144,9 @@ func (a *actionWaitForMemberUp) checkProgressAgent(ctx context.Context) (bool, b
 // of a cluster deployment (coordinator/dbserver).
 func (a *actionWaitForMemberUp) checkProgressCluster(ctx context.Context) (bool, bool, error) {
 	log := a.log
-	c, err := a.actionCtx.GetDatabaseClient(ctx)
+	h, err := a.actionCtx.GetDeploymentHealth()
 	if err != nil {
-		log.Debug().Err(err).Msg("Failed to create database client")
-		return false, false, maskAny(err)
-	}
-	cluster, err := c.Cluster(ctx)
-	if err != nil {
-		log.Debug().Err(err).Msg("Failed to access cluster")
-		return false, false, maskAny(err)
-	}
-	h, err := cluster.Health(ctx)
-	if err != nil {
-		log.Debug().Err(err).Msg("Failed to get cluster health")
-		return false, false, maskAny(err)
+		return false, false, maskAny(errors.Wrapf(err, "failed to get cluster health"))
 	}
 	sh, found := h.Health[driver.ServerID(a.action.MemberID)]
 	if !found {
@@ -168,12 +158,24 @@ func (a *actionWaitForMemberUp) checkProgressCluster(ctx context.Context) (bool,
 		return false, false, nil
 	}
 	if a.action.Group == api.ServerGroupDBServers {
-		dbs, err := c.Databases(ctx)
+		inventoryCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
+		defer cancel()
+		c, err := a.actionCtx.GetDatabaseClient(ctx)
+		if err != nil {
+			log.Debug().Err(err).Msg("Failed to create database client")
+			return false, false, maskAny(err)
+		}
+		cluster, err := c.Cluster(ctx)
+		if err != nil {
+			log.Debug().Err(err).Msg("Failed to access cluster")
+			return false, false, maskAny(err)
+		}
+		dbs, err := c.Databases(inventoryCtx)
 		if err != nil {
 			return false, false, err
 		}
 		for _, db := range dbs {
-			inv, err := cluster.DatabaseInventory(ctx, db)
+			inv, err := cluster.DatabaseInventory(inventoryCtx, db)
 			if err != nil {
 				return false, false, err
 			}
diff --git a/pkg/deployment/reconcile/context.go b/pkg/deployment/reconcile/context.go
index 6d3699d34..e00f1f0a5 100644
--- a/pkg/deployment/reconcile/context.go
+++ b/pkg/deployment/reconcile/context.go
@@ -93,4 +93,6 @@
 	// GetExpectedPodArguments creates command line arguments for a server in the given group with given ID.
 	GetExpectedPodArguments(apiObject metav1.Object, deplSpec api.DeploymentSpec, group api.ServerGroup,
 		agents api.MemberStatusList, id string, version driver.Version) []string
+	// GetDeploymentHealth returns a copy of the latest known state of cluster health
+	GetDeploymentHealth() (driver.ClusterHealth, error)
 }
diff --git a/pkg/deployment/resources/deployment_health.go b/pkg/deployment/resources/deployment_health.go
index 71b1fb1ba..f7fab1231 100644
--- a/pkg/deployment/resources/deployment_health.go
+++ b/pkg/deployment/resources/deployment_health.go
@@ -24,8 +24,10 @@ package resources
 
 import (
 	"context"
+	"fmt"
 	"time"
 
+	driver "github.com/arangodb/go-driver"
 	api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
 	"github.com/arangodb/kube-arangodb/pkg/metrics"
 )
@@ -88,3 +90,21 @@ func (r *Resources) fetchDeploymentHealth() error {
 	r.health.timestamp = time.Now()
 	return nil
 }
+
+// GetDeploymentHealth returns a copy of the latest known state of cluster health
+func (r *Resources) GetDeploymentHealth() (driver.ClusterHealth, error) {
+
+	r.health.mutex.Lock()
+	defer r.health.mutex.Unlock()
+	if r.health.timestamp.IsZero() {
+		return driver.ClusterHealth{}, fmt.Errorf("No cluster health available")
+	}
+
+	newhealth := r.health.clusterHealth
+	newhealth.Health = make(map[driver.ServerID]driver.ServerHealth)
+
+	for k, v := range r.health.clusterHealth.Health {
+		newhealth.Health[k] = v
+	}
+	return newhealth, nil
+}
diff --git a/pkg/deployment/resources/pod_termination.go b/pkg/deployment/resources/pod_termination.go
index 98c207569..5c9803e77 100644
--- a/pkg/deployment/resources/pod_termination.go
+++ b/pkg/deployment/resources/pod_termination.go
@@ -58,11 +58,12 @@ func (r *Resources) prepareAgencyPodTermination(ctx context.Context, log zerolog
 	agentDataWillBeGone := false
 	if p.Spec.NodeName != "" {
 		node, err := r.context.GetKubeCli().CoreV1().Nodes().Get(p.Spec.NodeName, metav1.GetOptions{})
-		if err != nil {
+		if k8sutil.IsNotFound(err) {
+			log.Warn().Msg("Node not found")
+		} else if err != nil {
 			log.Warn().Err(err).Msg("Failed to get node for member")
 			return maskAny(err)
-		}
-		if node.Spec.Unschedulable {
+		} else if node.Spec.Unschedulable {
 			agentDataWillBeGone = true
 		}
 	}
@@ -140,11 +141,12 @@ func (r *Resources) prepareDBServerPodTermination(ctx context.Context, log zerol
 	dbserverDataWillBeGone := false
 	if p.Spec.NodeName != "" {
 		node, err := r.context.GetKubeCli().CoreV1().Nodes().Get(p.Spec.NodeName, metav1.GetOptions{})
-		if err != nil {
+		if k8sutil.IsNotFound(err) {
+			log.Warn().Msg("Node not found")
+		} else if err != nil {
 			log.Warn().Err(err).Msg("Failed to get node for member")
 			return maskAny(err)
-		}
-		if node.Spec.Unschedulable {
+		} else if node.Spec.Unschedulable {
 			dbserverDataWillBeGone = true
 		}
 	}