10 changes: 10 additions & 0 deletions pkg/deployment/context_impl.go
@@ -380,3 +380,13 @@ func (d *Deployment) GetExpectedPodArguments(apiObject metav1.Object, deplSpec a
agents api.MemberStatusList, id string, version driver.Version) []string {
return d.resources.GetExpectedPodArguments(apiObject, deplSpec, group, agents, id, version)
}

// GetShardSyncStatus returns true if all shards are in sync
func (d *Deployment) GetShardSyncStatus() bool {
return d.resources.GetShardSyncStatus()
}

// InvalidateSyncStatus resets the sync state to false and triggers an inspection
func (d *Deployment) InvalidateSyncStatus() {
d.resources.InvalidateSyncStatus()
}
1 change: 1 addition & 0 deletions pkg/deployment/deployment.go
@@ -141,6 +141,7 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
d.clusterScalingIntegration = ci
go ci.ListenForClusterEvents(d.stopCh)
go d.resources.RunDeploymentHealthLoop(d.stopCh)
go d.resources.RunDeploymentShardSyncLoop(d.stopCh)
}
if config.AllowChaos {
d.chaosMonkey = chaos.NewMonkey(deps.Log, d)
20 changes: 0 additions & 20 deletions pkg/deployment/reconcile/action_wait_for_member_up.go
@@ -167,26 +167,6 @@ func (a *actionWaitForMemberUp) checkProgressCluster(ctx context.Context) (bool,
log.Debug().Str("status", string(sh.Status)).Msg("Member set status not yet good")
return false, false, nil
}
if a.action.Group == api.ServerGroupDBServers {
dbs, err := c.Databases(ctx)
if err != nil {
return false, false, err
}
for _, db := range dbs {
inv, err := cluster.DatabaseInventory(ctx, db)
if err != nil {
return false, false, err
}

for _, col := range inv.Collections {
if !col.AllInSync {
log.Debug().Str("col", col.Parameters.Name).Msg("Not in sync")
return false, false, nil
}
}
}

}
// Wait for the member to become ready from a kubernetes point of view
// otherwise the coordinators may be rotated too fast and thus none of them
// is ready resulting in a short downtime
4 changes: 4 additions & 0 deletions pkg/deployment/reconcile/context.go
@@ -93,4 +93,8 @@ type Context interface {
// GetExpectedPodArguments creates command line arguments for a server in the given group with given ID.
GetExpectedPodArguments(apiObject metav1.Object, deplSpec api.DeploymentSpec, group api.ServerGroup,
agents api.MemberStatusList, id string, version driver.Version) []string
// GetShardSyncStatus returns true if all shards are in sync
GetShardSyncStatus() bool
// InvalidateSyncStatus resets the sync state to false and triggers an inspection
InvalidateSyncStatus()
}
20 changes: 17 additions & 3 deletions pkg/deployment/reconcile/plan_builder.go
@@ -209,12 +209,17 @@ func createPlan(log zerolog.Logger, apiObject k8sutil.APIObject,
})
return newPlan, upgradeNotAllowed, fromVersion, toVersion, fromLicense, toLicense
}

if newPlan, upgradeNotAllowed, fromVersion, toVersion, fromLicense, toLicense := createRotateOrUpgradePlan(); upgradeNotAllowed {
// Upgrade is needed, but not allowed
context.CreateEvent(k8sutil.NewUpgradeNotAllowedEvent(apiObject, fromVersion, toVersion, fromLicense, toLicense))
} else {
// Use the new plan
plan = newPlan
} else if len(newPlan) > 0 {
if clusterReadyForUpgrade(context) {
// Use the new plan
plan = newPlan
} else {
log.Info().Msg("Pod needs upgrade but cluster is not ready. Either some shards are not in sync or some member is not ready.")
}
}
}

@@ -237,6 +242,15 @@ func createPlan(log zerolog.Logger, apiObject k8sutil.APIObject,
return plan, true
}

// clusterReadyForUpgrade returns true if the cluster is ready for the next update, that is:
// - all shards are in sync
// - all members are ready and fine
func clusterReadyForUpgrade(context PlanBuilderContext) bool {
status, _ := context.GetStatus()
allInSync := context.GetShardSyncStatus()
return allInSync && status.Conditions.IsTrue(api.ConditionTypeReady)
}

// podNeedsUpgrading decides if an upgrade of the pod is needed (to comply with
// the given spec) and if that is allowed.
func podNeedsUpgrading(log zerolog.Logger, p v1.Pod, spec api.DeploymentSpec, images api.ImageInfoList) upgradeDecision {
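For illustration, the gating added above can be read in isolation: a non-empty rotation/upgrade plan is only adopted once every shard is in sync and the deployment reports Ready; otherwise the plan is dropped for this reconciliation pass and retried later. A minimal, self-contained sketch of that decision (hypothetical, simplified names, not the repository's actual types) could look like this:

package main

import "fmt"

// clusterReadyForUpgrade mirrors the check above: proceed only when all
// shards are in sync and the deployment reports the Ready condition.
func clusterReadyForUpgrade(allShardsInSync, deploymentReady bool) bool {
	return allShardsInSync && deploymentReady
}

// decideRotatePlan mirrors the new else-if branch: a non-empty upgrade plan
// is only applied when the cluster is ready; otherwise it is deferred.
func decideRotatePlan(newPlan []string, allShardsInSync, deploymentReady bool) []string {
	if len(newPlan) == 0 {
		return nil
	}
	if clusterReadyForUpgrade(allShardsInSync, deploymentReady) {
		return newPlan
	}
	fmt.Println("Pod needs upgrade but cluster is not ready; deferring plan")
	return nil
}

func main() {
	fmt.Println(decideRotatePlan([]string{"RotateMember"}, true, true))  // plan applied
	fmt.Println(decideRotatePlan([]string{"RotateMember"}, false, true)) // deferred
}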
6 changes: 6 additions & 0 deletions pkg/deployment/reconcile/plan_builder_context.go
@@ -46,6 +46,12 @@ type PlanBuilderContext interface {
// GetExpectedPodArguments creates command line arguments for a server in the given group with given ID.
GetExpectedPodArguments(apiObject metav1.Object, deplSpec api.DeploymentSpec, group api.ServerGroup,
agents api.MemberStatusList, id string, version driver.Version) []string
// GetShardSyncStatus returns true if all shards are in sync
GetShardSyncStatus() bool
// InvalidateSyncStatus resets the sync state to false and triggers an inspection
InvalidateSyncStatus()
// GetStatus returns the current status of the deployment
GetStatus() (api.DeploymentStatus, int32)
}

// newPlanBuilderContext creates a PlanBuilderContext from the given context
2 changes: 1 addition & 1 deletion pkg/deployment/reconcile/timeouts.go
@@ -30,7 +30,7 @@ const (
removeMemberTimeout = time.Minute * 15
renewTLSCertificateTimeout = time.Minute * 30
renewTLSCACertificateTimeout = time.Minute * 30
rotateMemberTimeout = time.Minute * 30
rotateMemberTimeout = time.Minute * 15
shutdownMemberTimeout = time.Minute * 30
upgradeMemberTimeout = time.Hour * 6
waitForMemberUpTimeout = time.Minute * 15
94 changes: 94 additions & 0 deletions pkg/deployment/resources/deployment_health.go
@@ -32,6 +32,7 @@ import (

var (
deploymentHealthFetchesCounters = metrics.MustRegisterCounterVec(metricsComponent, "deployment_health_fetches", "Number of times the health of the deployment was fetched", metrics.DeploymentName, metrics.Result)
deploymentSyncFetchesCounters = metrics.MustRegisterCounterVec(metricsComponent, "deployment_sync_fetches", "Number of times the sync status of shards of the deployment was fetched", metrics.DeploymentName, metrics.Result)
)

// RunDeploymentHealthLoop creates a loop to fetch the health of the deployment.
Expand All @@ -53,6 +54,7 @@ func (r *Resources) RunDeploymentHealthLoop(stopCh <-chan struct{}) {
deploymentHealthFetchesCounters.WithLabelValues(deploymentName, metrics.Success).Inc()
}
select {
case <-r.shardSync.triggerSyncInspection.Done():
case <-time.After(time.Second * 5):
// Continue
case <-stopCh:
@@ -88,3 +90,95 @@ func (r *Resources) fetchDeploymentHealth() error {
r.health.timestamp = time.Now()
return nil
}

// RunDeploymentShardSyncLoop creates a loop to fetch the sync status of shards of the deployment.
// The loop ends when the given channel is closed.
func (r *Resources) RunDeploymentShardSyncLoop(stopCh <-chan struct{}) {
log := r.log
deploymentName := r.context.GetAPIObject().GetName()

if r.context.GetSpec().GetMode() != api.DeploymentModeCluster {
// Shard sync status is currently only applicable for clusters
return
}

for {
if err := r.fetchClusterShardSyncState(); err != nil {
log.Debug().Err(err).Msg("Failed to fetch deployment shard sync state")
deploymentSyncFetchesCounters.WithLabelValues(deploymentName, metrics.Failed).Inc()
} else {
deploymentSyncFetchesCounters.WithLabelValues(deploymentName, metrics.Success).Inc()
}
select {
case <-time.After(time.Second * 30):
// Continue
case <-stopCh:
// We're done
return
}
}
}

// InvalidateSyncStatus resets the sync state to false and triggers an inspection
func (r *Resources) InvalidateSyncStatus() {
r.log.Debug().Msg("Invalidating sync status due to previous events")
r.shardSync.mutex.Lock()
defer r.shardSync.mutex.Unlock()
r.shardSync.allInSync = false
r.shardSync.triggerSyncInspection.Trigger()
}

// fetchClusterShardSyncState performs a single fetch of the cluster inventory and
// checks if all shards are in sync
func (r *Resources) fetchClusterShardSyncState() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
defer cancel()
c, err := r.context.GetDatabaseClient(ctx)
if err != nil {
return err
}
cluster, err := c.Cluster(ctx)
if err != nil {
return err
}
dbs, err := c.Databases(ctx)
if err != nil {
return err
}

allInSync := true
dbloop:
for _, db := range dbs {
inv, err := cluster.DatabaseInventory(ctx, db)
if err != nil {
return err
}

for _, col := range inv.Collections {
if !col.AllInSync {
r.log.Debug().Str("db", db.Name()).Str("col", col.Parameters.Name).Msg("Collection not in sync")
allInSync = false
break dbloop
}
}
}

r.shardSync.mutex.Lock()
oldSyncState := r.shardSync.allInSync
r.shardSync.allInSync = allInSync
r.shardSync.timestamp = time.Now()
r.shardSync.mutex.Unlock()

if !oldSyncState && allInSync {
r.log.Debug().Msg("Everything is in sync by now")
}

return nil
}

// GetShardSyncStatus returns true if all shards are in sync
func (r *Resources) GetShardSyncStatus() bool {
r.shardSync.mutex.Lock()
defer r.shardSync.mutex.Unlock()
return r.shardSync.allInSync
}
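The util/trigger package referenced by shardSync.triggerSyncInspection is not part of this diff; only its Trigger() and Done() methods are exercised (Done() in the select of RunDeploymentHealthLoop, Trigger() in InvalidateSyncStatus). A minimal stand-in with that surface, assuming Done() returns a channel a select can wait on, might look like the sketch below (illustrative only; the real package may differ, for example in how it treats a trigger fired while nobody is waiting):

package trigger

import "sync"

// Trigger wakes goroutines that are select-ing on Done().
type Trigger struct {
	mutex   sync.Mutex
	done    chan struct{}
	pending bool
}

// Done returns a channel that is closed the next time Trigger is called.
// If a trigger fired while no channel was outstanding, an already-closed
// channel is returned so the caller wakes immediately.
func (t *Trigger) Done() <-chan struct{} {
	t.mutex.Lock()
	defer t.mutex.Unlock()
	if t.pending {
		t.pending = false
		closed := make(chan struct{})
		close(closed)
		return closed
	}
	if t.done == nil {
		t.done = make(chan struct{})
	}
	return t.done
}

// Trigger closes the channel handed out by Done, waking any waiter, and
// resets the state so a later Done call gets a fresh channel. A trigger
// fired while no channel is outstanding is remembered as pending.
func (t *Trigger) Trigger() {
	t.mutex.Lock()
	defer t.mutex.Unlock()
	if t.done != nil {
		close(t.done)
		t.done = nil
	} else {
		t.pending = true
	}
}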
2 changes: 2 additions & 0 deletions pkg/deployment/resources/pod_inspector.go
@@ -109,6 +109,7 @@ func (r *Resources) InspectPods(ctx context.Context) (util.Interval, error) {
// Record termination time
now := metav1.Now()
memberStatus.RecentTerminations = append(memberStatus.RecentTerminations, now)
r.InvalidateSyncStatus()
}
}
} else if k8sutil.IsPodFailed(&p) {
@@ -122,6 +123,7 @@
// Record termination time
now := metav1.Now()
memberStatus.RecentTerminations = append(memberStatus.RecentTerminations, now)
r.InvalidateSyncStatus()
}
}
}
7 changes: 7 additions & 0 deletions pkg/deployment/resources/resources.go
@@ -27,6 +27,7 @@ import (
"time"

driver "github.com/arangodb/go-driver"
"github.com/arangodb/kube-arangodb/pkg/util/trigger"
"github.com/rs/zerolog"
)

@@ -40,6 +41,12 @@ type Resources struct {
timestamp time.Time // Timestamp of last fetch of cluster health
mutex sync.Mutex // Mutex guarding fields in this struct
}
shardSync struct {
allInSync bool
timestamp time.Time
mutex sync.Mutex
triggerSyncInspection trigger.Trigger
}
}

// NewResources creates a new Resources service, used to