diff --git a/pkg/deployment/context_impl.go b/pkg/deployment/context_impl.go
index de9f9329d..a65a4dd39 100644
--- a/pkg/deployment/context_impl.go
+++ b/pkg/deployment/context_impl.go
@@ -380,3 +380,13 @@ func (d *Deployment) GetExpectedPodArguments(apiObject metav1.Object, deplSpec a
 	agents api.MemberStatusList, id string, version driver.Version) []string {
 	return d.resources.GetExpectedPodArguments(apiObject, deplSpec, group, agents, id, version)
 }
+
+// GetShardSyncStatus returns true if all shards are in sync
+func (d *Deployment) GetShardSyncStatus() bool {
+	return d.resources.GetShardSyncStatus()
+}
+
+// InvalidateSyncStatus resets the sync state to false and triggers an inspection
+func (d *Deployment) InvalidateSyncStatus() {
+	d.resources.InvalidateSyncStatus()
+}
diff --git a/pkg/deployment/deployment.go b/pkg/deployment/deployment.go
index 18cff5b33..35d7cad69 100644
--- a/pkg/deployment/deployment.go
+++ b/pkg/deployment/deployment.go
@@ -141,6 +141,7 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
 		d.clusterScalingIntegration = ci
 		go ci.ListenForClusterEvents(d.stopCh)
 		go d.resources.RunDeploymentHealthLoop(d.stopCh)
+		go d.resources.RunDeploymentShardSyncLoop(d.stopCh)
 	}
 	if config.AllowChaos {
 		d.chaosMonkey = chaos.NewMonkey(deps.Log, d)
diff --git a/pkg/deployment/reconcile/action_wait_for_member_up.go b/pkg/deployment/reconcile/action_wait_for_member_up.go
index 71c98a369..6466758a8 100644
--- a/pkg/deployment/reconcile/action_wait_for_member_up.go
+++ b/pkg/deployment/reconcile/action_wait_for_member_up.go
@@ -167,26 +167,6 @@ func (a *actionWaitForMemberUp) checkProgressCluster(ctx context.Context) (bool,
 		log.Debug().Str("status", string(sh.Status)).Msg("Member set status not yet good")
 		return false, false, nil
 	}
-	if a.action.Group == api.ServerGroupDBServers {
-		dbs, err := c.Databases(ctx)
-		if err != nil {
-			return false, false, err
-		}
-		for _, db := range dbs {
-			inv, err := cluster.DatabaseInventory(ctx, db)
-			if err != nil {
-				return false, false, err
-			}
-
-			for _, col := range inv.Collections {
-				if !col.AllInSync {
-					log.Debug().Str("col", col.Parameters.Name).Msg("Not in sync")
-					return false, false, nil
-				}
-			}
-		}
-
-	}
 	// Wait for the member to become ready from a kubernetes point of view
 	// otherwise the coordinators may be rotated to fast and thus none of them
 	// is ready resulting in a short downtime
diff --git a/pkg/deployment/reconcile/context.go b/pkg/deployment/reconcile/context.go
index 6d3699d34..bf98aab60 100644
--- a/pkg/deployment/reconcile/context.go
+++ b/pkg/deployment/reconcile/context.go
@@ -93,4 +93,8 @@ type Context interface {
 	// GetExpectedPodArguments creates command line arguments for a server in the given group with given ID.
 	GetExpectedPodArguments(apiObject metav1.Object, deplSpec api.DeploymentSpec, group api.ServerGroup,
 		agents api.MemberStatusList, id string, version driver.Version) []string
+	// GetShardSyncStatus returns true if all shards are in sync
+	GetShardSyncStatus() bool
+	// InvalidateSyncStatus resets the sync state to false and triggers an inspection
+	InvalidateSyncStatus()
 }
diff --git a/pkg/deployment/reconcile/plan_builder.go b/pkg/deployment/reconcile/plan_builder.go
index 71fd64806..4e1dd34b0 100644
--- a/pkg/deployment/reconcile/plan_builder.go
+++ b/pkg/deployment/reconcile/plan_builder.go
@@ -209,12 +209,17 @@ func createPlan(log zerolog.Logger, apiObject k8sutil.APIObject,
 			})
 			return newPlan, upgradeNotAllowed, fromVersion, toVersion, fromLicense, toLicense
 		}
+
 		if newPlan, upgradeNotAllowed, fromVersion, toVersion, fromLicense, toLicense := createRotateOrUpgradePlan(); upgradeNotAllowed {
 			// Upgrade is needed, but not allowed
 			context.CreateEvent(k8sutil.NewUpgradeNotAllowedEvent(apiObject, fromVersion, toVersion, fromLicense, toLicense))
-		} else {
-			// Use the new plan
-			plan = newPlan
+		} else if len(newPlan) > 0 {
+			if clusterReadyForUpgrade(context) {
+				// Use the new plan
+				plan = newPlan
+			} else {
+				log.Info().Msg("Pod needs upgrade but cluster is not ready. Either some shards are not in sync or some member is not ready.")
+			}
 		}
 	}
 
@@ -237,6 +242,15 @@ func createPlan(log zerolog.Logger, apiObject k8sutil.APIObject,
 	return plan, true
 }
 
+// clusterReadyForUpgrade returns true if the cluster is ready for the next update, that is:
+// - all shards are in sync
+// - all members are ready and fine
+func clusterReadyForUpgrade(context PlanBuilderContext) bool {
+	status, _ := context.GetStatus()
+	allInSync := context.GetShardSyncStatus()
+	return allInSync && status.Conditions.IsTrue(api.ConditionTypeReady)
+}
+
 // podNeedsUpgrading decides if an upgrade of the pod is needed (to comply with
 // the given spec) and if that is allowed.
 func podNeedsUpgrading(log zerolog.Logger, p v1.Pod, spec api.DeploymentSpec, images api.ImageInfoList) upgradeDecision {
diff --git a/pkg/deployment/reconcile/plan_builder_context.go b/pkg/deployment/reconcile/plan_builder_context.go
index d07df1d68..35ca8b47f 100644
--- a/pkg/deployment/reconcile/plan_builder_context.go
+++ b/pkg/deployment/reconcile/plan_builder_context.go
@@ -46,6 +46,12 @@ type PlanBuilderContext interface {
 	// GetExpectedPodArguments creates command line arguments for a server in the given group with given ID.
 	GetExpectedPodArguments(apiObject metav1.Object, deplSpec api.DeploymentSpec, group api.ServerGroup,
 		agents api.MemberStatusList, id string, version driver.Version) []string
+	// GetShardSyncStatus returns true if all shards are in sync
+	GetShardSyncStatus() bool
+	// InvalidateSyncStatus resets the sync state to false and triggers an inspection
+	InvalidateSyncStatus()
+	// GetStatus returns the current status of the deployment
+	GetStatus() (api.DeploymentStatus, int32)
 }
 
 // newPlanBuilderContext creates a PlanBuilderContext from the given context
diff --git a/pkg/deployment/reconcile/timeouts.go b/pkg/deployment/reconcile/timeouts.go
index a2273d18d..178326ef9 100644
--- a/pkg/deployment/reconcile/timeouts.go
+++ b/pkg/deployment/reconcile/timeouts.go
@@ -30,7 +30,7 @@ const (
 	removeMemberTimeout          = time.Minute * 15
 	renewTLSCertificateTimeout   = time.Minute * 30
 	renewTLSCACertificateTimeout = time.Minute * 30
-	rotateMemberTimeout          = time.Minute * 30
+	rotateMemberTimeout          = time.Minute * 15
 	shutdownMemberTimeout        = time.Minute * 30
 	upgradeMemberTimeout         = time.Hour * 6
 	waitForMemberUpTimeout       = time.Minute * 15
diff --git a/pkg/deployment/resources/deployment_health.go b/pkg/deployment/resources/deployment_health.go
index 71b1fb1ba..8bf39cf71 100644
--- a/pkg/deployment/resources/deployment_health.go
+++ b/pkg/deployment/resources/deployment_health.go
@@ -32,6 +32,7 @@ import (
 
 var (
 	deploymentHealthFetchesCounters = metrics.MustRegisterCounterVec(metricsComponent, "deployment_health_fetches", "Number of times the health of the deployment was fetched", metrics.DeploymentName, metrics.Result)
+	deploymentSyncFetchesCounters   = metrics.MustRegisterCounterVec(metricsComponent, "deployment_sync_fetches", "Number of times the sync status of shards of the deployment was fetched", metrics.DeploymentName, metrics.Result)
 )
 
 // RunDeploymentHealthLoop creates a loop to fetch the health of the deployment.
@@ -53,6 +54,7 @@ func (r *Resources) RunDeploymentHealthLoop(stopCh <-chan struct{}) {
 			deploymentHealthFetchesCounters.WithLabelValues(deploymentName, metrics.Success).Inc()
 		}
 		select {
+		case <-r.shardSync.triggerSyncInspection.Done():
 		case <-time.After(time.Second * 5):
 			// Continue
 		case <-stopCh:
@@ -88,3 +90,95 @@ func (r *Resources) fetchDeploymentHealth() error {
 	r.health.timestamp = time.Now()
 	return nil
 }
+
+// RunDeploymentShardSyncLoop creates a loop to fetch the sync status of shards of the deployment.
+// The loop ends when the given channel is closed.
+func (r *Resources) RunDeploymentShardSyncLoop(stopCh <-chan struct{}) {
+	log := r.log
+	deploymentName := r.context.GetAPIObject().GetName()
+
+	if r.context.GetSpec().GetMode() != api.DeploymentModeCluster {
+		// Shard sync status is currently only applicable for clusters
+		return
+	}
+
+	for {
+		if err := r.fetchClusterShardSyncState(); err != nil {
+			log.Debug().Err(err).Msg("Failed to fetch deployment shard sync state")
+			deploymentSyncFetchesCounters.WithLabelValues(deploymentName, metrics.Failed).Inc()
+		} else {
+			deploymentSyncFetchesCounters.WithLabelValues(deploymentName, metrics.Success).Inc()
+		}
+		select {
+		case <-time.After(time.Second * 30):
+			// Continue
+		case <-stopCh:
+			// We're done
+			return
+		}
+	}
+}
+
+// InvalidateSyncStatus resets the sync state to false and triggers an inspection
+func (r *Resources) InvalidateSyncStatus() {
+	r.log.Debug().Msg("Invalidating sync status due to previous events")
+	r.shardSync.mutex.Lock()
+	defer r.shardSync.mutex.Unlock()
+	r.shardSync.allInSync = false
+	r.shardSync.triggerSyncInspection.Trigger()
+}
+
+// fetchClusterShardSyncState performs a single fetch of the cluster inventory and
+// checks if all shards are in sync
+func (r *Resources) fetchClusterShardSyncState() error {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
+	defer cancel()
+	c, err := r.context.GetDatabaseClient(ctx)
+	if err != nil {
+		return err
+	}
+	cluster, err := c.Cluster(ctx)
+	if err != nil {
+		return err
+	}
+	dbs, err := c.Databases(ctx)
+	if err != nil {
+		return err
+	}
+
+	allInSync := true
+dbloop:
+	for _, db := range dbs {
+		inv, err := cluster.DatabaseInventory(ctx, db)
+		if err != nil {
+			return err
+		}
+
+		for _, col := range inv.Collections {
+			if !col.AllInSync {
+				r.log.Debug().Str("db", db.Name()).Str("col", col.Parameters.Name).Msg("Collection not in sync")
+				allInSync = false
+				break dbloop
+			}
+		}
+	}
+
+	r.shardSync.mutex.Lock()
+	oldSyncState := r.shardSync.allInSync
+	r.shardSync.allInSync = allInSync
+	r.shardSync.timestamp = time.Now()
+	r.shardSync.mutex.Unlock()
+
+	if !oldSyncState && allInSync {
+		r.log.Debug().Msg("Everything is in sync by now")
+	}
+
+	return nil
+}
+
+// GetShardSyncStatus returns true if all shards are in sync
+func (r *Resources) GetShardSyncStatus() bool {
+	r.shardSync.mutex.Lock()
+	defer r.shardSync.mutex.Unlock()
+	return r.shardSync.allInSync
+}
diff --git a/pkg/deployment/resources/pod_inspector.go b/pkg/deployment/resources/pod_inspector.go
index c5d2636ac..eb551c935 100644
--- a/pkg/deployment/resources/pod_inspector.go
+++ b/pkg/deployment/resources/pod_inspector.go
@@ -109,6 +109,7 @@ func (r *Resources) InspectPods(ctx context.Context) (util.Interval, error) {
 				// Record termination time
 				now := metav1.Now()
 				memberStatus.RecentTerminations = append(memberStatus.RecentTerminations, now)
+				r.InvalidateSyncStatus()
 			}
 		}
 	} else if k8sutil.IsPodFailed(&p) {
@@ -122,6 +123,7 @@ func (r *Resources) InspectPods(ctx context.Context) (util.Interval, error) {
 				// Record termination time
 				now := metav1.Now()
 				memberStatus.RecentTerminations = append(memberStatus.RecentTerminations, now)
+				r.InvalidateSyncStatus()
 			}
 		}
 	}
diff --git a/pkg/deployment/resources/resources.go b/pkg/deployment/resources/resources.go
index 7a4aee2b4..ab5f281ff 100644
--- a/pkg/deployment/resources/resources.go
+++ b/pkg/deployment/resources/resources.go
@@ -27,6 +27,7 @@ import (
 	"time"
 
 	driver "github.com/arangodb/go-driver"
+	"github.com/arangodb/kube-arangodb/pkg/util/trigger"
 	"github.com/rs/zerolog"
 )
@@ -40,6 +41,12 @@ type Resources struct {
 		timestamp     time.Time  // Timestamp of last fetch of cluster health
 		mutex         sync.Mutex // Mutex guarding fields in this struct
 	}
+	shardSync struct {
+		allInSync             bool
+		timestamp             time.Time
+		mutex                 sync.Mutex
+		triggerSyncInspection trigger.Trigger
+	}
 }
 
 // NewResources creates a new Resources service, used to