From 90b1d87daa67695f549ddea7b669e00d3af68afc Mon Sep 17 00:00:00 2001
From: lamai93
Date: Thu, 4 Apr 2019 17:06:27 +0200
Subject: [PATCH 1/4] Monitor sync status concurrently. Only create new
 upgrade plan if everything is good.

---
 pkg/deployment/context_impl.go                 |  5 ++
 pkg/deployment/deployment.go                   |  1 +
 .../reconcile/action_wait_for_member_up.go     | 20 -----
 pkg/deployment/reconcile/context.go            |  2 +
 pkg/deployment/reconcile/plan_builder.go       | 20 ++++-
 .../reconcile/plan_builder_context.go          |  4 +
 pkg/deployment/reconcile/timeouts.go           |  2 +-
 pkg/deployment/resources/deployment_health.go  | 79 +++++++++++++++++++
 pkg/deployment/resources/resources.go          |  5 ++
 9 files changed, 114 insertions(+), 24 deletions(-)

diff --git a/pkg/deployment/context_impl.go b/pkg/deployment/context_impl.go
index de9f9329d..a5b4c5439 100644
--- a/pkg/deployment/context_impl.go
+++ b/pkg/deployment/context_impl.go
@@ -380,3 +380,8 @@ func (d *Deployment) GetExpectedPodArguments(apiObject metav1.Object, deplSpec a
 	agents api.MemberStatusList, id string, version driver.Version) []string {
 	return d.resources.GetExpectedPodArguments(apiObject, deplSpec, group, agents, id, version)
 }
+
+// GetShardSyncStatus returns true if all shards are in sync
+func (d *Deployment) GetShardSyncStatus() bool {
+	return d.resources.GetShardSyncStatus()
+}
diff --git a/pkg/deployment/deployment.go b/pkg/deployment/deployment.go
index 18cff5b33..35d7cad69 100644
--- a/pkg/deployment/deployment.go
+++ b/pkg/deployment/deployment.go
@@ -141,6 +141,7 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
 		d.clusterScalingIntegration = ci
 		go ci.ListenForClusterEvents(d.stopCh)
 		go d.resources.RunDeploymentHealthLoop(d.stopCh)
+		go d.resources.RunDeploymentShardSyncLoop(d.stopCh)
 	}
 	if config.AllowChaos {
 		d.chaosMonkey = chaos.NewMonkey(deps.Log, d)
diff --git a/pkg/deployment/reconcile/action_wait_for_member_up.go b/pkg/deployment/reconcile/action_wait_for_member_up.go
index 71c98a369..6466758a8 100644
--- a/pkg/deployment/reconcile/action_wait_for_member_up.go
+++ b/pkg/deployment/reconcile/action_wait_for_member_up.go
@@ -167,26 +167,6 @@ func (a *actionWaitForMemberUp) checkProgressCluster(ctx context.Context) (bool,
 		log.Debug().Str("status", string(sh.Status)).Msg("Member set status not yet good")
 		return false, false, nil
 	}
-	if a.action.Group == api.ServerGroupDBServers {
-		dbs, err := c.Databases(ctx)
-		if err != nil {
-			return false, false, err
-		}
-		for _, db := range dbs {
-			inv, err := cluster.DatabaseInventory(ctx, db)
-			if err != nil {
-				return false, false, err
-			}
-
-			for _, col := range inv.Collections {
-				if !col.AllInSync {
-					log.Debug().Str("col", col.Parameters.Name).Msg("Not in sync")
-					return false, false, nil
-				}
-			}
-		}
-
-	}
 	// Wait for the member to become ready from a kubernetes point of view
 	// otherwise the coordinators may be rotated too fast and thus none of them
 	// is ready, resulting in a short downtime
diff --git a/pkg/deployment/reconcile/context.go b/pkg/deployment/reconcile/context.go
index 6d3699d34..6fc39cb2b 100644
--- a/pkg/deployment/reconcile/context.go
+++ b/pkg/deployment/reconcile/context.go
@@ -93,4 +93,6 @@ type Context interface {
 	// GetExpectedPodArguments creates command line arguments for a server in the given group with given ID.
 	GetExpectedPodArguments(apiObject metav1.Object, deplSpec api.DeploymentSpec, group api.ServerGroup,
 		agents api.MemberStatusList, id string, version driver.Version) []string
+	// GetShardSyncStatus returns true if all shards are in sync
+	GetShardSyncStatus() bool
 }
diff --git a/pkg/deployment/reconcile/plan_builder.go b/pkg/deployment/reconcile/plan_builder.go
index 71fd64806..4e1dd34b0 100644
--- a/pkg/deployment/reconcile/plan_builder.go
+++ b/pkg/deployment/reconcile/plan_builder.go
@@ -209,12 +209,17 @@ func createPlan(log zerolog.Logger, apiObject k8sutil.APIObject,
 		})
 		return newPlan, upgradeNotAllowed, fromVersion, toVersion, fromLicense, toLicense
 	}
+
 	if newPlan, upgradeNotAllowed, fromVersion, toVersion, fromLicense, toLicense := createRotateOrUpgradePlan(); upgradeNotAllowed {
 		// Upgrade is needed, but not allowed
 		context.CreateEvent(k8sutil.NewUpgradeNotAllowedEvent(apiObject, fromVersion, toVersion, fromLicense, toLicense))
-	} else {
-		// Use the new plan
-		plan = newPlan
+	} else if len(newPlan) > 0 {
+		if clusterReadyForUpgrade(context) {
+			// Use the new plan
+			plan = newPlan
+		} else {
+			log.Info().Msg("Pod needs upgrade but cluster is not ready. Either some shards are not in sync or some member is not ready.")
+		}
 	}
 }
 
@@ -237,6 +242,15 @@ func createPlan(log zerolog.Logger, apiObject k8sutil.APIObject,
 	return plan, true
 }
 
+// clusterReadyForUpgrade returns true if the cluster is ready for the next upgrade, that is:
+// - all shards are in sync
+// - all members are ready
+func clusterReadyForUpgrade(context PlanBuilderContext) bool {
+	status, _ := context.GetStatus()
+	allInSync := context.GetShardSyncStatus()
+	return allInSync && status.Conditions.IsTrue(api.ConditionTypeReady)
+}
+
 // podNeedsUpgrading decides if an upgrade of the pod is needed (to comply with
 // the given spec) and if that is allowed.
 func podNeedsUpgrading(log zerolog.Logger, p v1.Pod, spec api.DeploymentSpec, images api.ImageInfoList) upgradeDecision {
diff --git a/pkg/deployment/reconcile/plan_builder_context.go b/pkg/deployment/reconcile/plan_builder_context.go
index d07df1d68..335690e7d 100644
--- a/pkg/deployment/reconcile/plan_builder_context.go
+++ b/pkg/deployment/reconcile/plan_builder_context.go
@@ -46,6 +46,10 @@ type PlanBuilderContext interface {
 	// GetExpectedPodArguments creates command line arguments for a server in the given group with given ID.
 	GetExpectedPodArguments(apiObject metav1.Object, deplSpec api.DeploymentSpec, group api.ServerGroup,
 		agents api.MemberStatusList, id string, version driver.Version) []string
+	// GetShardSyncStatus returns true if all shards are in sync
+	GetShardSyncStatus() bool
+	// GetStatus returns the current status of the deployment
+	GetStatus() (api.DeploymentStatus, int32)
 }
 
 // newPlanBuilderContext creates a PlanBuilderContext from the given context
diff --git a/pkg/deployment/reconcile/timeouts.go b/pkg/deployment/reconcile/timeouts.go
index a2273d18d..5822ed2f2 100644
--- a/pkg/deployment/reconcile/timeouts.go
+++ b/pkg/deployment/reconcile/timeouts.go
@@ -33,5 +33,5 @@ const (
 	rotateMemberTimeout          = time.Minute * 30
 	shutdownMemberTimeout        = time.Minute * 30
 	upgradeMemberTimeout         = time.Hour * 6
-	waitForMemberUpTimeout       = time.Minute * 15
+	waitForMemberUpTimeout       = time.Minute * 45
 )
diff --git a/pkg/deployment/resources/deployment_health.go b/pkg/deployment/resources/deployment_health.go
index 71b1fb1ba..eb0c0467f 100644
--- a/pkg/deployment/resources/deployment_health.go
+++ b/pkg/deployment/resources/deployment_health.go
@@ -28,10 +28,12 @@ import (
 
 	api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
 	"github.com/arangodb/kube-arangodb/pkg/metrics"
+	"github.com/rs/zerolog/log"
 )
 
 var (
 	deploymentHealthFetchesCounters = metrics.MustRegisterCounterVec(metricsComponent, "deployment_health_fetches", "Number of times the health of the deployment was fetched", metrics.DeploymentName, metrics.Result)
+	deploymentSyncFetchesCounters   = metrics.MustRegisterCounterVec(metricsComponent, "deployment_sync_fetches", "Number of times the sync status of shards of the deployment was fetched", metrics.DeploymentName, metrics.Result)
 )
 
 // RunDeploymentHealthLoop creates a loop to fetch the health of the deployment.
@@ -88,3 +90,80 @@ func (r *Resources) fetchDeploymentHealth() error {
 	r.health.timestamp = time.Now()
 	return nil
 }
+
+// RunDeploymentShardSyncLoop creates a loop to fetch the sync status of shards of the deployment.
+// The loop ends when the given channel is closed.
+func (r *Resources) RunDeploymentShardSyncLoop(stopCh <-chan struct{}) {
+	log := r.log
+	deploymentName := r.context.GetAPIObject().GetName()
+
+	if r.context.GetSpec().GetMode() != api.DeploymentModeCluster {
+		// Shard sync status is currently only applicable for clusters
+		return
+	}
+
+	for {
+		if err := r.fetchClusterShardSyncState(); err != nil {
+			log.Debug().Err(err).Msg("Failed to fetch deployment shard sync state")
+			deploymentSyncFetchesCounters.WithLabelValues(deploymentName, metrics.Failed).Inc()
+		} else {
+			deploymentSyncFetchesCounters.WithLabelValues(deploymentName, metrics.Success).Inc()
+		}
+		select {
+		case <-time.After(time.Second * 30):
+			// Continue
+		case <-stopCh:
+			// We're done
+			return
+		}
+	}
+}
+
+// fetchClusterShardSyncState performs a single fetch of the cluster inventory and
+// checks if all shards are in sync
+func (r *Resources) fetchClusterShardSyncState() error {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
+	defer cancel()
+	c, err := r.context.GetDatabaseClient(ctx)
+	if err != nil {
+		return err
+	}
+	cluster, err := c.Cluster(ctx)
+	if err != nil {
+		return err
+	}
+	dbs, err := c.Databases(ctx)
+	if err != nil {
+		return err
+	}
+
+	allInSync := true
+dbloop:
+	for _, db := range dbs {
+		inv, err := cluster.DatabaseInventory(ctx, db)
+		if err != nil {
+			return err
+		}
+
+		for _, col := range inv.Collections {
+			if !col.AllInSync {
+				log.Debug().Str("col", col.Parameters.Name).Msg("Not in sync")
+				allInSync = false
+				break dbloop
+			}
+		}
+	}
+
+	r.shardSync.mutex.Lock()
+	defer r.shardSync.mutex.Unlock()
+	r.shardSync.allInSync = allInSync
+	r.shardSync.timestamp = time.Now()
+	return nil
+}
+
+// GetShardSyncStatus returns true if all shards are in sync
+func (r *Resources) GetShardSyncStatus() bool {
+	r.shardSync.mutex.Lock()
+	defer r.shardSync.mutex.Unlock()
+	return r.shardSync.allInSync
+}
diff --git a/pkg/deployment/resources/resources.go b/pkg/deployment/resources/resources.go
index 7a4aee2b4..efae68648 100644
--- a/pkg/deployment/resources/resources.go
+++ b/pkg/deployment/resources/resources.go
@@ -40,6 +40,11 @@ type Resources struct {
 		timestamp time.Time  // Timestamp of last fetch of cluster health
 		mutex     sync.Mutex // Mutex guarding fields in this struct
 	}
+	shardSync struct {
+		allInSync bool
+		timestamp time.Time
+		mutex     sync.Mutex
+	}
 }
 
 // NewResources creates a new Resources service, used to

From 8f9ec45d03f9100bacafe0edd97431200f2c2571 Mon Sep 17 00:00:00 2001
From: lamai93
Date: Fri, 5 Apr 2019 09:03:23 +0200
Subject: [PATCH 2/4] Modified timeouts.

---
 pkg/deployment/reconcile/timeouts.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/deployment/reconcile/timeouts.go b/pkg/deployment/reconcile/timeouts.go
index 5822ed2f2..178326ef9 100644
--- a/pkg/deployment/reconcile/timeouts.go
+++ b/pkg/deployment/reconcile/timeouts.go
@@ -30,8 +30,8 @@ const (
 	removeMemberTimeout          = time.Minute * 15
 	renewTLSCertificateTimeout   = time.Minute * 30
 	renewTLSCACertificateTimeout = time.Minute * 30
-	rotateMemberTimeout          = time.Minute * 30
+	rotateMemberTimeout          = time.Minute * 15
 	shutdownMemberTimeout        = time.Minute * 30
 	upgradeMemberTimeout         = time.Hour * 6
-	waitForMemberUpTimeout       = time.Minute * 45
+	waitForMemberUpTimeout       = time.Minute * 15
 )

From 2eada0b3a47710195e7e76129f36b172216ffe89 Mon Sep 17 00:00:00 2001
From: lamai93
Date: Fri, 5 Apr 2019 14:16:51 +0200
Subject: [PATCH 3/4] Modified log message.

---
 pkg/deployment/resources/deployment_health.go | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/pkg/deployment/resources/deployment_health.go b/pkg/deployment/resources/deployment_health.go
index eb0c0467f..89974d245 100644
--- a/pkg/deployment/resources/deployment_health.go
+++ b/pkg/deployment/resources/deployment_health.go
@@ -28,7 +28,6 @@ import (
 
 	api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
 	"github.com/arangodb/kube-arangodb/pkg/metrics"
-	"github.com/rs/zerolog/log"
 )
 
 var (
@@ -147,7 +146,7 @@ dbloop:
 
 		for _, col := range inv.Collections {
 			if !col.AllInSync {
-				log.Debug().Str("col", col.Parameters.Name).Msg("Not in sync")
+				r.log.Debug().Str("db", db.Name()).Str("col", col.Parameters.Name).Msg("Collection not in sync")
 				allInSync = false
 				break dbloop
 			}

From 2b8a8d6b9adf8cf9d03c904abe83d125245117ef Mon Sep 17 00:00:00 2001
From: lamai93
Date: Fri, 5 Apr 2019 15:05:23 +0200
Subject: [PATCH 4/4] Added sync invalidation on server shutdown.

---
 pkg/deployment/context_impl.go                |  5 +++++
 pkg/deployment/reconcile/context.go           |  2 ++
 .../reconcile/plan_builder_context.go         |  2 ++
 pkg/deployment/resources/deployment_health.go | 18 +++++++++++++++++-
 pkg/deployment/resources/pod_inspector.go     |  2 ++
 pkg/deployment/resources/resources.go         |  8 +++++---
 6 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/pkg/deployment/context_impl.go b/pkg/deployment/context_impl.go
index a5b4c5439..a65a4dd39 100644
--- a/pkg/deployment/context_impl.go
+++ b/pkg/deployment/context_impl.go
@@ -385,3 +385,8 @@ func (d *Deployment) GetExpectedPodArguments(apiObject metav1.Object, deplSpec a
 func (d *Deployment) GetShardSyncStatus() bool {
 	return d.resources.GetShardSyncStatus()
 }
+
+// InvalidateSyncStatus resets the sync state to false and triggers an inspection
+func (d *Deployment) InvalidateSyncStatus() {
+	d.resources.InvalidateSyncStatus()
+}
diff --git a/pkg/deployment/reconcile/context.go b/pkg/deployment/reconcile/context.go
index 6fc39cb2b..bf98aab60 100644
--- a/pkg/deployment/reconcile/context.go
+++ b/pkg/deployment/reconcile/context.go
@@ -95,4 +95,6 @@ type Context interface {
 		agents api.MemberStatusList, id string, version driver.Version) []string
 	// GetShardSyncStatus returns true if all shards are in sync
 	GetShardSyncStatus() bool
+	// InvalidateSyncStatus resets the sync state to false and triggers an inspection
+	InvalidateSyncStatus()
 }
diff --git a/pkg/deployment/reconcile/plan_builder_context.go b/pkg/deployment/reconcile/plan_builder_context.go
index 335690e7d..35ca8b47f 100644
--- a/pkg/deployment/reconcile/plan_builder_context.go
+++ b/pkg/deployment/reconcile/plan_builder_context.go
@@ -48,6 +48,8 @@ type PlanBuilderContext interface {
 		agents api.MemberStatusList, id string, version driver.Version) []string
 	// GetShardSyncStatus returns true if all shards are in sync
 	GetShardSyncStatus() bool
+	// InvalidateSyncStatus resets the sync state to false and triggers an inspection
+	InvalidateSyncStatus()
 	// GetStatus returns the current status of the deployment
 	GetStatus() (api.DeploymentStatus, int32)
 }
diff --git a/pkg/deployment/resources/deployment_health.go b/pkg/deployment/resources/deployment_health.go
index 89974d245..8bf39cf71 100644
--- a/pkg/deployment/resources/deployment_health.go
+++ b/pkg/deployment/resources/deployment_health.go
@@ -54,6 +54,7 @@ func (r *Resources) RunDeploymentHealthLoop(stopCh <-chan struct{}) {
 			deploymentHealthFetchesCounters.WithLabelValues(deploymentName, metrics.Success).Inc()
 		}
 		select {
+		case <-r.shardSync.triggerSyncInspection.Done():
 		case <-time.After(time.Second * 5):
 			// Continue
 		case <-stopCh:
@@ -118,6 +119,15 @@
 	}
 }
 
+// InvalidateSyncStatus resets the sync state to false and triggers an inspection
+func (r *Resources) InvalidateSyncStatus() {
+	r.log.Debug().Msg("Invalidating sync status due to previous events")
+	r.shardSync.mutex.Lock()
+	defer r.shardSync.mutex.Unlock()
+	r.shardSync.allInSync = false
+	r.shardSync.triggerSyncInspection.Trigger()
+}
+
 // fetchClusterShardSyncState performs a single fetch of the cluster inventory and
 // checks if all shards are in sync
 func (r *Resources) fetchClusterShardSyncState() error {
@@ -154,9 +164,15 @@ dbloop:
 	}
 
 	r.shardSync.mutex.Lock()
-	defer r.shardSync.mutex.Unlock()
+	oldSyncState := r.shardSync.allInSync
 	r.shardSync.allInSync = allInSync
 	r.shardSync.timestamp = time.Now()
+	r.shardSync.mutex.Unlock()
+
+	if !oldSyncState && allInSync {
+		r.log.Debug().Msg("All shards are in sync again")
+	}
+
 	return nil
 }
diff --git a/pkg/deployment/resources/pod_inspector.go b/pkg/deployment/resources/pod_inspector.go
index c5d2636ac..eb551c935 100644
--- a/pkg/deployment/resources/pod_inspector.go
+++ b/pkg/deployment/resources/pod_inspector.go
@@ -109,6 +109,7 @@ func (r *Resources) InspectPods(ctx context.Context) (util.Interval, error) {
 				// Record termination time
 				now := metav1.Now()
 				memberStatus.RecentTerminations = append(memberStatus.RecentTerminations, now)
+				r.InvalidateSyncStatus()
 			}
 		}
 	} else if k8sutil.IsPodFailed(&p) {
@@ -122,6 +123,7 @@ func (r *Resources) InspectPods(ctx context.Context) (util.Interval, error) {
 				// Record termination time
 				now := metav1.Now()
 				memberStatus.RecentTerminations = append(memberStatus.RecentTerminations, now)
+				r.InvalidateSyncStatus()
 			}
 		}
 	}
diff --git a/pkg/deployment/resources/resources.go b/pkg/deployment/resources/resources.go
index efae68648..ab5f281ff 100644
--- a/pkg/deployment/resources/resources.go
+++ b/pkg/deployment/resources/resources.go
@@ -27,6 +27,7 @@ import (
 	"time"
 
 	driver "github.com/arangodb/go-driver"
+	"github.com/arangodb/kube-arangodb/pkg/util/trigger"
 	"github.com/rs/zerolog"
 )
 
@@ -41,9 +42,10 @@ type Resources struct {
 		mutex     sync.Mutex // Mutex guarding fields in this struct
 	}
 	shardSync struct {
-		allInSync bool
-		timestamp time.Time
-		mutex     sync.Mutex
+		allInSync             bool
+		timestamp             time.Time
+		mutex                 sync.Mutex
+		triggerSyncInspection trigger.Trigger
 	}
 }
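
Note on pkg/util/trigger: patch 4 embeds a trigger.Trigger into the shardSync
struct, but its implementation is not part of this series. The only contract
visible in the hunks is that the zero value is usable, that Done() returns a
channel suitable for a select case, and that Trigger() fires it. The sketch
below is a hypothetical implementation of that contract, for illustration
only; it is not the actual code from the kube-arangodb repository:

// Hypothetical sketch; the real pkg/util/trigger may differ.
package trigger

import "sync"

// Trigger is a reusable wakeup primitive; its zero value is ready for use,
// matching how the patch embeds it without explicit initialization.
type Trigger struct {
	mu      sync.Mutex
	done    chan struct{} // closed by Trigger while a waiter is registered
	pending bool          // Trigger fired while nobody was waiting
}

// Done returns a channel that is closed once Trigger is called. A wakeup
// that fired before this call is handed out as an already-closed channel,
// so the event is never dropped.
func (t *Trigger) Done() <-chan struct{} {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.pending {
		t.pending = false
		closed := make(chan struct{})
		close(closed)
		return closed
	}
	if t.done == nil {
		t.done = make(chan struct{})
	}
	return t.done
}

// Trigger wakes every goroutine currently selecting on a Done channel, or
// records the event for the next Done call when there is no waiter yet.
func (t *Trigger) Trigger() {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.done != nil {
		close(t.done)
		t.done = nil
	} else {
		t.pending = true
	}
}

Under such a contract, InvalidateSyncStatus lets the pod inspector wake the
health loop immediately after a pod termination is recorded, instead of the
loop waiting out its five-second timer; a wakeup that arrives while the loop
is busy fetching is remembered for the next Done call rather than lost.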