reschedule bindings on cluster change
Signed-off-by: dddddai <dddwq@foxmail.com>
dddddai committed Oct 24, 2021
1 parent 1954628 commit 568b870
Showing 5 changed files with 182 additions and 32 deletions.
4 changes: 2 additions & 2 deletions pkg/scheduler/cache/snapshot.go
@@ -33,7 +33,7 @@ func (s *Snapshot) GetClusters() []*framework.ClusterInfo {
func (s *Snapshot) GetReadyClusters() []*framework.ClusterInfo {
var readyClusterInfoList []*framework.ClusterInfo
for _, c := range s.clusterInfoList {
if util.IsClusterReady(&c.Cluster().Status) {
if util.IsClusterReady(&c.Cluster().Status) && c.Cluster().DeletionTimestamp.IsZero() {

RainbowMango (Oct 25, 2021):
Do you mean !c.Cluster().DeletionTimestamp.IsZero()?

dddddai (Author, Oct 25, 2021):
No. If the DeletionTimestamp is NOT zero, it means the cluster has been unjoined, so we should NOT regard it as a ready cluster, should we?

RainbowMango (Oct 25, 2021):
Sorry, you are right.

readyClusterInfoList = append(readyClusterInfoList, c)
}
}
@@ -45,7 +45,7 @@ func (s *Snapshot) GetReadyClusters() []*framework.ClusterInfo {
func (s *Snapshot) GetReadyClusterNames() sets.String {
readyClusterNames := sets.NewString()
for _, c := range s.clusterInfoList {
if util.IsClusterReady(&c.Cluster().Status) {
if util.IsClusterReady(&c.Cluster().Status) && c.Cluster().DeletionTimestamp.IsZero() {
readyClusterNames.Insert(c.Cluster().Name)
}
}
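To make the upshot of the exchange above concrete, here is a minimal sketch of the predicate both hunks now apply (an illustration, not code from this commit; it assumes the standard karmada cluster API and util import paths):

package example

import (
	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
	"github.com/karmada-io/karmada/pkg/util"
)

// isSchedulable reports whether a cluster should be offered to the scheduler:
// it must be Ready and must not be in the middle of being unjoined
// (a non-zero DeletionTimestamp means deletion/unjoin is in progress).
func isSchedulable(c *clusterv1alpha1.Cluster) bool {
	return util.IsClusterReady(&c.Status) && c.DeletionTimestamp.IsZero()
}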
52 changes: 32 additions & 20 deletions pkg/scheduler/core/generic_scheduler.go
@@ -17,7 +17,6 @@ import (
lister "github.com/karmada-io/karmada/pkg/generated/listers/policy/v1alpha1"
"github.com/karmada-io/karmada/pkg/scheduler/cache"
"github.com/karmada-io/karmada/pkg/scheduler/framework"
"github.com/karmada-io/karmada/pkg/scheduler/framework/runtime"
"github.com/karmada-io/karmada/pkg/scheduler/metrics"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/helper"
@@ -27,7 +26,7 @@ import (
type ScheduleAlgorithm interface {
Schedule(context.Context, *policyv1alpha1.Placement, *workv1alpha2.ResourceBindingSpec) (scheduleResult ScheduleResult, err error)
ScaleSchedule(context.Context, *policyv1alpha1.Placement, *workv1alpha2.ResourceBindingSpec) (scheduleResult ScheduleResult, err error)
FailoverSchedule(context.Context, *policyv1alpha1.Placement, *workv1alpha2.ResourceBindingSpec) (scheduleResult ScheduleResult, err error)
ReSchedule(context.Context, *policyv1alpha1.Placement, *workv1alpha2.ResourceBindingSpec) (scheduleResult ScheduleResult, err error)
}

// ScheduleResult includes the clusters selected.
@@ -46,12 +45,12 @@ type genericScheduler struct {
func NewGenericScheduler(
schedCache cache.Cache,
policyLister lister.PropagationPolicyLister,
plugins []string,
framework framework.Framework,
) ScheduleAlgorithm {
return &genericScheduler{
schedulerCache: schedCache,
policyLister: policyLister,
scheduleFramework: runtime.NewFramework(plugins),
scheduleFramework: framework,
}
}

@@ -66,7 +65,9 @@ func (g *genericScheduler) Schedule(ctx context.Context, placement *policyv1alph
return result, fmt.Errorf("failed to findClustersThatFit: %v", err)
}
if len(feasibleClusters) == 0 {
return result, fmt.Errorf("no clusters fit")
// just warn and return
klog.Warningf("There's no cluster that fits %v", placement)
return result, nil
}
klog.V(4).Infof("feasible clusters found: %v", feasibleClusters)

@@ -182,7 +183,7 @@ func (g *genericScheduler) chooseSpreadGroup(spreadGroup *util.SpreadGroup) []*c
for spreadConstraint, clusterGroups := range spreadGroup.GroupRecord {
if spreadConstraint.SpreadByField == policyv1alpha1.SpreadByFieldCluster {
if len(clusterGroups) < spreadConstraint.MinGroups {
return nil
return feasibleClusters
}

if len(clusterGroups) <= spreadConstraint.MaxGroups {
@@ -553,14 +554,25 @@ func (g *genericScheduler) getPreUsed(clusters []*clusterv1alpha1.Cluster, preUs
return preUsedCluster, unUsedCluster
}

func (g *genericScheduler) FailoverSchedule(ctx context.Context, placement *policyv1alpha1.Placement,
func (g *genericScheduler) ReSchedule(ctx context.Context, placement *policyv1alpha1.Placement,
spec *workv1alpha2.ResourceBindingSpec) (result ScheduleResult, err error) {
readyClusters := g.schedulerCache.Snapshot().GetReadyClusterNames()
totalClusters := util.ConvertToClusterNames(spec.Clusters)

reservedClusters := calcReservedCluster(totalClusters, readyClusters)
availableClusters := calcAvailableCluster(totalClusters, readyClusters)

// Remove reserved clusters that don't fit the placement
for clusterName := range reservedClusters {
clusterObj := g.schedulerCache.Snapshot().GetCluster(clusterName)
resMap := g.scheduleFramework.RunFilterPlugins(ctx, placement, &spec.Resource, clusterObj.Cluster())
res := resMap.Merge()
if !res.IsSuccess() {
klog.V(4).Infof("cluster %q is not fit", clusterName)
reservedClusters.Delete(clusterName)
}
}

candidateClusters := sets.NewString()
for clusterName := range availableClusters {
clusterObj := g.schedulerCache.Snapshot().GetCluster(clusterName)
@@ -580,21 +592,21 @@ func (g *genericScheduler) FailoverSchedule(ctx context.Context, placement *poli
klog.V(4).Infof("Reserved bindingClusters : %v", reservedClusters.List())
klog.V(4).Infof("Candidate bindingClusters: %v", candidateClusters.List())

// TODO: should schedule as much as possible?
deltaLen := len(spec.Clusters) - len(reservedClusters)
if len(candidateClusters) < deltaLen {
// for ReplicaSchedulingTypeDivided, we will try to migrate replicas to the other healthy clusters
if placement.ReplicaScheduling == nil || placement.ReplicaScheduling.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDuplicated {
klog.Warningf("ignore reschedule binding as insufficient available cluster")
return ScheduleResult{}, nil
}
}
// deltaLen := len(spec.Clusters) - len(reservedClusters)
// if len(candidateClusters) < deltaLen {
// // for ReplicaSchedulingTypeDivided, we will try to migrate replicas to the other healthy clusters
// if placement.ReplicaScheduling == nil || placement.ReplicaScheduling.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDuplicated {
// klog.Warningf("ignore reschedule binding as insufficient available cluster")
// return ScheduleResult{}, nil
// }
// }

// TODO: check if the final result meets the spread constraints.
targetClusters := reservedClusters
clusterList := candidateClusters.List()
for i := 0; i < deltaLen && i < len(candidateClusters); i++ {
targetClusters.Insert(clusterList[i])
for i := 0; i < len(candidateClusters); i++ {
if helper.CheckSpreadConstraints(spec, placement, clusterList[i]) {
targetClusters.Insert(clusterList[i])
}
}

var reScheduleResult []workv1alpha2.TargetCluster
@@ -607,7 +619,7 @@ func (g *genericScheduler) FailoverSchedule(ctx context.Context, placement *poli

// calcReservedCluster eliminates the not-ready clusters from the 'bindClusters'.
func calcReservedCluster(bindClusters, readyClusters sets.String) sets.String {
return bindClusters.Difference(bindClusters.Difference(readyClusters))
return bindClusters.Intersection(readyClusters)
}

// calcAvailableCluster returns a list of ready clusters that are not in 'bindClusters'.
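A side note on the calcReservedCluster simplification above: the old double difference and the new intersection compute the same set, namely the bound clusters that are still ready. A tiny standalone check (illustration only, not part of the commit):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	bindClusters := sets.NewString("member1", "member2", "member3")
	readyClusters := sets.NewString("member2", "member3", "member4")

	// Old form: remove from bindClusters everything that is not ready.
	before := bindClusters.Difference(bindClusters.Difference(readyClusters))
	// New form: keep only the bound clusters that are still ready.
	after := bindClusters.Intersection(readyClusters)

	fmt.Println(before.List())       // [member2 member3]
	fmt.Println(after.List())        // [member2 member3]
	fmt.Println(before.Equal(after)) // true
}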
142 changes: 133 additions & 9 deletions pkg/scheduler/scheduler.go
@@ -12,6 +12,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
@@ -31,9 +32,11 @@ import (
worklister "github.com/karmada-io/karmada/pkg/generated/listers/work/v1alpha2"
schedulercache "github.com/karmada-io/karmada/pkg/scheduler/cache"
"github.com/karmada-io/karmada/pkg/scheduler/core"
"github.com/karmada-io/karmada/pkg/scheduler/framework"
"github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/apiinstalled"
"github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/clusteraffinity"
"github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/tainttoleration"
"github.com/karmada-io/karmada/pkg/scheduler/framework/runtime"
"github.com/karmada-io/karmada/pkg/scheduler/metrics"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/helper"
@@ -89,10 +92,12 @@ type Scheduler struct {
informerFactory informerfactory.SharedInformerFactory

// TODO: implement a priority scheduling queue
queue workqueue.RateLimitingInterface
queue workqueue.RateLimitingInterface
clusterQueue workqueue.RateLimitingInterface

Algorithm core.ScheduleAlgorithm
schedulerCache schedulercache.Cache
Algorithm core.ScheduleAlgorithm
schedulerFramework framework.Framework
schedulerCache schedulercache.Cache

enableSchedulerEstimator bool
schedulerEstimatorCache *estimatorclient.SchedulerEstimatorCache
@@ -113,9 +118,11 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse
clusterPolicyLister := factory.Policy().V1alpha1().ClusterPropagationPolicies().Lister()
clusterLister := factory.Cluster().V1alpha1().Clusters().Lister()
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
clusterQueue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
schedulerCache := schedulercache.NewCache(clusterLister)
// TODO: make plugins as a flag
algorithm := core.NewGenericScheduler(schedulerCache, policyLister, []string{clusteraffinity.Name, tainttoleration.Name, apiinstalled.Name})
schedulerFramwork := runtime.NewFramework([]string{clusteraffinity.Name, tainttoleration.Name, apiinstalled.Name})
algorithm := core.NewGenericScheduler(schedulerCache, policyLister, schedulerFramwork)
sched := &Scheduler{
DynamicClient: dynamicClient,
KarmadaClient: karmadaClient,
@@ -131,7 +138,9 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse
clusterLister: clusterLister,
informerFactory: factory,
queue: queue,
clusterQueue: clusterQueue,
Algorithm: algorithm,
schedulerFramework: schedulerFramework,
schedulerCache: schedulerCache,
enableSchedulerEstimator: opts.EnableSchedulerEstimator,
}
@@ -193,6 +202,7 @@ func (s *Scheduler) Run(ctx context.Context) {
}

go wait.Until(s.worker, time.Second, stopCh)
go wait.Until(s.rescheduleOnClusterChange, 0, stopCh)

<-stopCh
}
@@ -267,6 +277,28 @@ func (s *Scheduler) worker() {
}
}

func (s *Scheduler) rescheduleOnClusterChange() {
key, shutdown := s.clusterQueue.Get()
if shutdown {
klog.Errorf("Failed to pop item from clusterQueue")
return
}
defer s.clusterQueue.Done(key)

clusterName := key.(string)
cluster, err := s.clusterLister.Get(clusterName)
if err != nil {
if apierrors.IsNotFound(err) {
return

RainbowMango (Oct 25, 2021):
Do nothing when a cluster is not found (removed)?

dddddai (Author, Oct 25, 2021):
Yes, cluster delete events are handled as FailoverSchedule in updateCluster below:

if meta.IsStatusConditionFalse(newCluster.Status.Conditions, clusterv1alpha1.ClusterConditionReady) ||
        !newCluster.DeletionTimestamp.IsZero() {

dddddai (Author, Oct 25, 2021):
clusterQueue focuses on those clusters that:

  1. have newly joined, or
  2. have had a field or label updated that makes them able/unable to be scheduled to.
}
klog.Errorf("Failed to get cluster %q: %v", clusterName, err)
s.clusterQueue.AddRateLimited(key)
}
if err = s.rescheduleBindingsForCluster(cluster); err != nil {
klog.Errorf("Failed to reschedule bindings for cluster %q: %v", clusterName, err)
s.clusterQueue.AddRateLimited(key)
}
}
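rescheduleOnClusterChange follows the usual client-go workqueue consumer shape. For comparison, here is a generic sketch of that shape (illustration only, with a hypothetical handle callback; the commit's version does not call Forget, which appears here merely as the conventional success path):

// processNextItem pops one key, processes it, and requeues it with rate limiting
// on failure. Returning false signals that the queue has been shut down.
func processNextItem(queue workqueue.RateLimitingInterface, handle func(string) error) bool {
	key, shutdown := queue.Get()
	if shutdown {
		return false
	}
	defer queue.Done(key)

	if err := handle(key.(string)); err != nil {
		queue.AddRateLimited(key)
		return true
	}
	queue.Forget(key)
	return true
}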

// requeueResourceBindings will retrieve all ResourceBinding objects by the label selector and put them to queue.
func (s *Scheduler) requeueResourceBindings(selector labels.Selector) error {
referenceBindings, err := s.bindingLister.List(selector)
@@ -533,6 +565,10 @@ func (s *Scheduler) addCluster(obj interface{}) {
}
klog.V(3).Infof("add event for cluster %s", cluster.Name)

if meta.IsStatusConditionTrue(cluster.Status.Conditions, clusterv1alpha1.ClusterConditionReady) {
s.clusterQueue.AddRateLimited(cluster.Name)
}

if s.enableSchedulerEstimator {
s.schedulerEstimatorWorker.AddRateLimited(cluster.Name)
}
@@ -551,14 +587,18 @@ func (s *Scheduler) updateCluster(_, newObj interface{}) {
}

// Check if cluster becomes failure
if meta.IsStatusConditionPresentAndEqual(newCluster.Status.Conditions, clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse) {
if meta.IsStatusConditionFalse(newCluster.Status.Conditions, clusterv1alpha1.ClusterConditionReady) ||
!newCluster.DeletionTimestamp.IsZero() {
klog.Infof("Found cluster(%s) failure and failover flag is %v", newCluster.Name, Failover)

if Failover { // Trigger reschedule on cluster failure only when flag is true.
s.enqueueAffectedBinding(newCluster.Name)
s.enqueueAffectedClusterBinding(newCluster.Name)
return
}
} else if meta.IsStatusConditionTrue(newCluster.Status.Conditions, clusterv1alpha1.ClusterConditionReady) {
// reschedule bindings since cluster updated
s.clusterQueue.AddRateLimited(newCluster.Name)
}
}

@@ -670,7 +710,7 @@ func (s *Scheduler) rescheduleClusterResourceBinding(clusterResourceBinding *wor
return err
}

reScheduleResult, err := s.Algorithm.FailoverSchedule(context.TODO(), &policy.Spec.Placement, &clusterResourceBinding.Spec)
reScheduleResult, err := s.Algorithm.ReSchedule(context.TODO(), &policy.Spec.Placement, &clusterResourceBinding.Spec)
if err != nil {
return err
}
@@ -695,7 +735,7 @@ func (s *Scheduler) rescheduleResourceBinding(resourceBinding *workv1alpha2.Reso
return err
}

reScheduleResult, err := s.Algorithm.FailoverSchedule(context.TODO(), &placement, &resourceBinding.Spec)
reScheduleResult, err := s.Algorithm.ReSchedule(context.TODO(), &placement, &resourceBinding.Spec)
if err != nil {
return err
}
@@ -869,17 +909,20 @@ func (s *Scheduler) getTypeFromClusterResourceBindings(name string) ScheduleType

func (s *Scheduler) allClustersInReadyState(tcs []workv1alpha2.TargetCluster) bool {
clusters := s.schedulerCache.Snapshot().GetClusters()
count := 0
for i := range tcs {
for _, c := range clusters {
if c.Cluster().Name == tcs[i].Name {
if meta.IsStatusConditionPresentAndEqual(c.Cluster().Status.Conditions, clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse) {
if meta.IsStatusConditionPresentAndEqual(c.Cluster().Status.Conditions, clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse) ||
!c.Cluster().DeletionTimestamp.IsZero() {
return false
}
count++
continue
}
}
}
return true
return count == len(tcs)
}
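The counter added above closes a gap: previously a target cluster that was missing from the snapshot altogether still let the function return true. A simplified stand-in for the corrected semantics (illustration only, not part of the commit; sets is k8s.io/apimachinery/pkg/util/sets):

// allTargetsReady mirrors the fixed allClustersInReadyState: every target cluster
// must be found among the ready clusters in the snapshot, so a missing or
// unhealthy cluster makes the result false.
func allTargetsReady(targets []string, readyInSnapshot sets.String) bool {
	count := 0
	for _, name := range targets {
		if readyInSnapshot.Has(name) {
			count++
		}
	}
	return count == len(targets)
}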

func (s *Scheduler) reconcileEstimatorConnection(key util.QueueKey) error {
@@ -954,3 +997,84 @@ func (s *Scheduler) updateClusterBindingStatusIfNeeded(crb *workv1alpha2.Cluster
}
return nil
}

func (s *Scheduler) rescheduleBindingsForCluster(cluster *clusterv1alpha1.Cluster) error {
rbs, err := s.bindingLister.List(labels.Everything())
if err != nil {
klog.Errorf("Failed to list all resource bindings: %v", err)
return err
}
var errs []error
for _, rb := range rbs {
placement, _, err := s.getPlacement(rb)
if err != nil {
klog.Errorf("Failed to get placement of ResourceBinding(%s/%s): %v", rb.Namespace, rb.Name, err)
return err
}
if s.needReschedule(cluster, &rb.Spec, placement) {
key, err := cache.MetaNamespaceKeyFunc(rb)
if err != nil {
klog.Errorf("Failed to get key of ResourceBinding(%s/%s): %v", rb.Namespace, rb.Name, err)
return err
}
if err = s.rescheduleOne(key); err != nil {
errs = append(errs, err)
}
}
}

crbs, err := s.clusterBindingLister.List(labels.Everything())
if err != nil {
klog.Errorf("Failed to list all cluster resource bindings: %v", err)
return err
}
for _, crb := range crbs {
policyName := util.GetLabelValue(crb.Labels, policyv1alpha1.ClusterPropagationPolicyLabel)
policy, err := s.clusterPolicyLister.Get(policyName)
if err != nil {
klog.Errorf("Failed to get policy of ClusterResourceBinding(%s): %v", crb.Name, err)
return err
}
if s.needReschedule(cluster, &crb.Spec, policy.Spec.Placement) {
key, err := cache.MetaNamespaceKeyFunc(crb)
if err != nil {
klog.Errorf("Failed to get key of ClusterResourceBinding(%s): %v", crb.Name, err)
return err
}
if err = s.rescheduleOne(key); err != nil {
errs = append(errs, err)
}
}
}
return errors.NewAggregate(errs)
}

func (s *Scheduler) needReschedule(cluster *clusterv1alpha1.Cluster, spec *workv1alpha2.ResourceBindingSpec, placement policyv1alpha1.Placement) bool {
// There are 2 cases in which a binding needs rescheduling:
// 1. The cluster fits the placement but is NOT in binding.spec.clusters (without exceeding MaxGroups)
// 2. The cluster does NOT fit the placement but is in binding.spec.clusters
bound := false
for _, bindingCluster := range spec.Clusters {
if cluster.Name == bindingCluster.Name {
bound = true
break
}
}
if bound {
if !s.clusterFitsPlacement(cluster, placement, &spec.Resource) {
// the cluster does NOT fit the placement but is in binding.spec.clusters
return true
}
} else if s.clusterFitsPlacement(cluster, placement, &spec.Resource) && helper.CheckSpreadConstraints(spec, &placement, cluster.Name) {
// the cluster fits the placement but is NOT in binding.spec.clusters (without exceeding MaxGroups)
return true
}
return false
}
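Boiled down, needReschedule is a two-way decision on "is the cluster already bound to this binding" versus "does it fit the placement". A condensed sketch of that decision table (illustration only; a hypothetical helper, not present in the commit):

// needsRescheduleDecision captures the table behind needReschedule:
//   bound  && !fits             -> reschedule (a bound cluster no longer fits)
//   !bound && fits && spreadOK  -> reschedule (a new feasible cluster appeared)
//   anything else               -> leave the binding as it is
func needsRescheduleDecision(bound, fits, spreadOK bool) bool {
	if bound {
		return !fits
	}
	return fits && spreadOK
}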

func (s *Scheduler) clusterFitsPlacement(
cluster *clusterv1alpha1.Cluster,
placement policyv1alpha1.Placement,
resource *workv1alpha2.ObjectReference) bool {
return s.schedulerFramework.RunFilterPlugins(context.TODO(), &placement, resource, cluster).Merge().IsSuccess()
}
}
