Commit
feat: Add app data to sharding cache to allow sharding by apps (#17014)
* Adding app list to sharding cache

Signed-off-by: Andrew Lee <andrewkl@enclavenet.com>

* Add shard by apps test

Signed-off-by: Andrew Lee <andrewkl@enclavenet.com>

* Fix lint

Signed-off-by: Andrew Lee <andrewkl@enclavenet.com>

* Add coverage to test

Signed-off-by: Andrew Lee <andrewkl@enclavenet.com>

* Fix lint

Signed-off-by: Andrew Lee <andrewkl@enclavenet.com>

* Converted cluster/app accessors to private, added apps-in-any-namespace support in sharding cache init, added read lock to GetAppDistribution

Signed-off-by: Andrew Lee <andrewkl@enclavenet.com>

* Fix tests

Signed-off-by: Andrew Lee <andrewkl@enclavenet.com>

---------

Signed-off-by: Andrew Lee <andrewkl@enclavenet.com>
Enclavet committed Mar 1, 2024
1 parent 62003f0 commit d73304e
Showing 6 changed files with 254 additions and 27 deletions.
20 changes: 6 additions & 14 deletions cmd/argocd/commands/admin/cluster.go
@@ -86,8 +86,12 @@ func loadClusters(ctx context.Context, kubeClient *kubernetes.Clientset, appClie
if err != nil {
return nil, err
}
appItems, err := appClient.ArgoprojV1alpha1().Applications(namespace).List(ctx, v1.ListOptions{})
if err != nil {
return nil, err
}
clusterShardingCache := sharding.NewClusterSharding(argoDB, shard, replicas, shardingAlgorithm)
clusterShardingCache.Init(clustersList)
clusterShardingCache.Init(clustersList, appItems)
clusterShards := clusterShardingCache.GetDistribution()

var cache *appstatecache.Cache
@@ -113,10 +117,6 @@ func loadClusters(ctx context.Context, kubeClient *kubernetes.Clientset, appClie
}
}

appItems, err := appClient.ArgoprojV1alpha1().Applications(namespace).List(ctx, v1.ListOptions{})
if err != nil {
return nil, err
}
apps := appItems.Items
for i, app := range apps {
err := argo.ValidateDestination(ctx, &app.Spec.Destination, argoDB)
@@ -129,12 +129,6 @@ func loadClusters(ctx context.Context, kubeClient *kubernetes.Clientset, appClie

batchSize := 10
batchesCount := int(math.Ceil(float64(len(clusters)) / float64(batchSize)))
clusterSharding := &sharding.ClusterSharding{
Shard: shard,
Replicas: replicas,
Shards: make(map[string]int),
Clusters: make(map[string]*v1alpha1.Cluster),
}
for batchNum := 0; batchNum < batchesCount; batchNum++ {
batchStart := batchSize * batchNum
batchEnd := batchSize * (batchNum + 1)
@@ -146,9 +140,7 @@ func loadClusters(ctx context.Context, kubeClient *kubernetes.Clientset, appClie
clusterShard := 0
cluster := batch[i]
if replicas > 0 {
distributionFunction := sharding.GetDistributionFunction(clusterSharding.GetClusterAccessor(), common.DefaultShardingAlgorithm, replicas)
distributionFunction(&cluster)
clusterShard := clusterShards[cluster.Server]
clusterShard = clusterShards[cluster.Server]
cluster.Shard = pointer.Int64(int64(clusterShard))
log.Infof("Cluster with uid: %s will be processed by shard %d", cluster.ID, clusterShard)
}
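
Net effect of the cluster.go hunks: the application list is fetched before the sharding cache is constructed, so the cache is seeded with apps as well as clusters, and each cluster's shard is read from the precomputed distribution instead of being recomputed per batch. A minimal sketch of the resulting order, assuming the clustersList and argoDB handles obtained earlier in loadClusters:

// Illustrative sketch of the new initialization order (not part of the diff):
appItems, err := appClient.ArgoprojV1alpha1().Applications(namespace).List(ctx, v1.ListOptions{})
if err != nil {
	return nil, err
}
clusterShardingCache := sharding.NewClusterSharding(argoDB, shard, replicas, shardingAlgorithm)
clusterShardingCache.Init(clustersList, appItems)      // cache now holds apps too
clusterShards := clusterShardingCache.GetDistribution() // server URL -> shard number
// later, for each cluster in a batch:
clusterShard := clusterShards[cluster.Server] // looked up, not recomputed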
39 changes: 38 additions & 1 deletion controller/appcontroller.go
@@ -796,7 +796,13 @@ func (ctrl *ApplicationController) Run(ctx context.Context, statusProcessors int
if err != nil {
log.Warnf("Cannot init sharding. Error while querying clusters list from database: %v", err)
} else {
ctrl.clusterSharding.Init(clusters)
appItems, err := ctrl.getAppList(metav1.ListOptions{})

if err != nil {
log.Warnf("Cannot init sharding. Error while querying application list from database: %v", err)
} else {
ctrl.clusterSharding.Init(clusters, appItems)
}
}

errors.CheckError(ctrl.stateCache.Init())
@@ -2106,6 +2112,10 @@ func (ctrl *ApplicationController) newApplicationInformerAndLister() (cache.Shar
ctrl.appRefreshQueue.AddRateLimited(key)
ctrl.appOperationQueue.AddRateLimited(key)
}
newApp, newOK := obj.(*appv1.Application)
if err == nil && newOK {
ctrl.clusterSharding.AddApp(newApp)
}
},
UpdateFunc: func(old, new interface{}) {
if !ctrl.canProcessApp(new) {
@@ -2136,6 +2146,7 @@ func (ctrl *ApplicationController) newApplicationInformerAndLister() (cache.Shar

ctrl.requestAppRefresh(newApp.QualifiedName(), compareWith, delay)
ctrl.appOperationQueue.AddRateLimited(key)
ctrl.clusterSharding.UpdateApp(newApp)
},
DeleteFunc: func(obj interface{}) {
if !ctrl.canProcessApp(obj) {
@@ -2148,6 +2159,10 @@ func (ctrl *ApplicationController) newApplicationInformerAndLister() (cache.Shar
// for deletes, we immediately add to the refresh queue
ctrl.appRefreshQueue.Add(key)
}
delApp, delOK := obj.(*appv1.Application)
if err == nil && delOK {
ctrl.clusterSharding.DeleteApp(delApp)
}
},
},
)
@@ -2223,4 +2238,26 @@ func (ctrl *ApplicationController) toAppQualifiedName(appName, appNamespace stri
return fmt.Sprintf("%s/%s", appNamespace, appName)
}

func (ctrl *ApplicationController) getAppList(options metav1.ListOptions) (*appv1.ApplicationList, error) {
watchNamespace := ctrl.namespace
// If we have at least one additional namespace configured, we need to
// watch on them all.
if len(ctrl.applicationNamespaces) > 0 {
watchNamespace = ""
}

appList, err := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(watchNamespace).List(context.TODO(), options)
if err != nil {
return nil, err
}
newItems := []appv1.Application{}
for _, app := range appList.Items {
if ctrl.isAppNamespaceAllowed(&app) {
newItems = append(newItems, app)
}
}
appList.Items = newItems
return appList, nil
}

type ClusterFilterFunction func(c *appv1.Cluster, distributionFunction sharding.DistributionFunction) bool
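
The appcontroller.go changes keep the sharding cache in sync with application lifecycle events, while getAppList seeds it at startup and honors the apps-in-any-namespace allow-list via isAppNamespaceAllowed. A condensed sketch of the event-handler pattern used above, with the key extraction and work-queue plumbing of the real handlers omitted (the application informer is assumed to be in scope):

// Condensed sketch of the handler pattern (queueing and key handling omitted):
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) {
		if app, ok := obj.(*appv1.Application); ok {
			ctrl.clusterSharding.AddApp(app) // redistributes only for unseen apps
		}
	},
	UpdateFunc: func(old, new interface{}) {
		if app, ok := new.(*appv1.Application); ok {
			ctrl.clusterSharding.UpdateApp(app)
		}
	},
	DeleteFunc: func(obj interface{}) {
		if app, ok := obj.(*appv1.Application); ok {
			ctrl.clusterSharding.DeleteApp(app) // removal triggers a redistribution
		}
	},
})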
87 changes: 83 additions & 4 deletions controller/sharding/cache.go
@@ -9,19 +9,24 @@ import (
)

type ClusterShardingCache interface {
Init(clusters *v1alpha1.ClusterList)
Init(clusters *v1alpha1.ClusterList, apps *v1alpha1.ApplicationList)
Add(c *v1alpha1.Cluster)
Delete(clusterServer string)
Update(oldCluster *v1alpha1.Cluster, newCluster *v1alpha1.Cluster)
AddApp(a *v1alpha1.Application)
DeleteApp(a *v1alpha1.Application)
UpdateApp(a *v1alpha1.Application)
IsManagedCluster(c *v1alpha1.Cluster) bool
GetDistribution() map[string]int
GetAppDistribution() map[string]int
}

type ClusterSharding struct {
Shard int
Replicas int
Shards map[string]int
Clusters map[string]*v1alpha1.Cluster
Apps map[string]*v1alpha1.Application
lock sync.RWMutex
getClusterShard DistributionFunction
}
@@ -33,11 +38,12 @@ func NewClusterSharding(_ db.ArgoDB, shard, replicas int, shardingAlgorithm stri
Replicas: replicas,
Shards: make(map[string]int),
Clusters: make(map[string]*v1alpha1.Cluster),
Apps: make(map[string]*v1alpha1.Application),
}
distributionFunction := NoShardingDistributionFunction()
if replicas > 1 {
log.Debugf("Processing clusters from shard %d: Using filter function: %s", shard, shardingAlgorithm)
distributionFunction = GetDistributionFunction(clusterSharding.GetClusterAccessor(), shardingAlgorithm, replicas)
distributionFunction = GetDistributionFunction(clusterSharding.getClusterAccessor(), clusterSharding.getAppAccessor(), shardingAlgorithm, replicas)
} else {
log.Info("Processing all cluster shards")
}
@@ -62,7 +68,7 @@ func (s *ClusterSharding) IsManagedCluster(c *v1alpha1.Cluster) bool {
return clusterShard == s.Shard
}

func (sharding *ClusterSharding) Init(clusters *v1alpha1.ClusterList) {
func (sharding *ClusterSharding) Init(clusters *v1alpha1.ClusterList, apps *v1alpha1.ApplicationList) {
sharding.lock.Lock()
defer sharding.lock.Unlock()
newClusters := make(map[string]*v1alpha1.Cluster, len(clusters.Items))
@@ -71,6 +77,13 @@ func (sharding *ClusterSharding) Init(clusters *v1alpha1.ClusterList) {
newClusters[c.Server] = &cluster
}
sharding.Clusters = newClusters

newApps := make(map[string]*v1alpha1.Application, len(apps.Items))
for i := range apps.Items {
app := apps.Items[i]
newApps[app.Name] = &app
}
sharding.Apps = newApps
sharding.updateDistribution()
}

@@ -173,7 +186,8 @@ func hasShardingUpdates(old, new *v1alpha1.Cluster) bool {
return old.Shard == nil || new.Shard == nil || int64(*old.Shard) != int64(*new.Shard)
}

func (d *ClusterSharding) GetClusterAccessor() clusterAccessor {
// A read lock should be acquired before calling getClusterAccessor.
func (d *ClusterSharding) getClusterAccessor() clusterAccessor {
return func() []*v1alpha1.Cluster {
// no need to lock, as this is only called from the updateDistribution function
clusters := make([]*v1alpha1.Cluster, 0, len(d.Clusters))
@@ -183,3 +197,68 @@ func (d *ClusterSharding) GetClusterAccessor() clusterAccessor {
return clusters
}
}

// A read lock should be acquired before calling getAppAccessor.
func (d *ClusterSharding) getAppAccessor() appAccessor {
return func() []*v1alpha1.Application {
apps := make([]*v1alpha1.Application, 0, len(d.Apps))
for _, a := range d.Apps {
apps = append(apps, a)
}
return apps
}
}

func (sharding *ClusterSharding) AddApp(a *v1alpha1.Application) {
sharding.lock.Lock()
defer sharding.lock.Unlock()

_, ok := sharding.Apps[a.Name]
sharding.Apps[a.Name] = a
if !ok {
sharding.updateDistribution()
} else {
log.Debugf("Skipping sharding distribution update. App already added")
}
}

func (sharding *ClusterSharding) DeleteApp(a *v1alpha1.Application) {
sharding.lock.Lock()
defer sharding.lock.Unlock()
if _, ok := sharding.Apps[a.Name]; ok {
delete(sharding.Apps, a.Name)
sharding.updateDistribution()
}
}

func (sharding *ClusterSharding) UpdateApp(a *v1alpha1.Application) {
sharding.lock.Lock()
defer sharding.lock.Unlock()

_, ok := sharding.Apps[a.Name]
sharding.Apps[a.Name] = a
if !ok {
sharding.updateDistribution()
} else {
log.Debugf("Skipping sharding distribution update. No relevant changes")
}
}

// GetAppDistribution should not be called from a DistributionFunction because
// it could cause a deadlock when updateDistribution is called.
func (sharding *ClusterSharding) GetAppDistribution() map[string]int {
sharding.lock.RLock()
clusters := sharding.Clusters
apps := sharding.Apps
sharding.lock.RUnlock()

appDistribution := make(map[string]int, len(clusters))

for _, a := range apps {
if _, ok := appDistribution[a.Spec.Destination.Server]; !ok {
appDistribution[a.Spec.Destination.Server] = 0
}
appDistribution[a.Spec.Destination.Server]++
}
return appDistribution
}
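
GetAppDistribution snapshots the cluster and app maps under a read lock, then counts applications per destination server. The comment above warns against calling it from a DistributionFunction: updateDistribution runs while the write lock is held, so taking the read lock from inside a distribution function would deadlock. A small usage sketch, assuming a clusterSharding instance and a logger are in scope:

// Usage sketch (assumed caller, not part of the diff): report how many
// applications target each destination cluster according to the cache.
for server, count := range clusterSharding.GetAppDistribution() {
	log.Infof("%d applications target cluster %s", count, server)
}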
36 changes: 36 additions & 0 deletions controller/sharding/cache_test.go
@@ -139,6 +139,12 @@ func TestClusterSharding_Delete(t *testing.T) {
},
},
},
&v1alpha1.ApplicationList{
Items: []v1alpha1.Application{
createApp("app2", "https://127.0.0.1:6443"),
createApp("app1", "https://kubernetes.default.svc"),
},
},
)

sharding.Delete("https://kubernetes.default.svc")
@@ -164,6 +170,12 @@ func TestClusterSharding_Update(t *testing.T) {
},
},
},
&v1alpha1.ApplicationList{
Items: []v1alpha1.Application{
createApp("app2", "https://127.0.0.1:6443"),
createApp("app1", "https://kubernetes.default.svc"),
},
},
)

distributionBefore := sharding.GetDistribution()
@@ -207,6 +219,12 @@ func TestClusterSharding_UpdateServerName(t *testing.T) {
},
},
},
&v1alpha1.ApplicationList{
Items: []v1alpha1.Application{
createApp("app2", "https://127.0.0.1:6443"),
createApp("app1", "https://kubernetes.default.svc"),
},
},
)

distributionBefore := sharding.GetDistribution()
@@ -251,6 +269,12 @@ func TestClusterSharding_IsManagedCluster(t *testing.T) {
},
},
},
&v1alpha1.ApplicationList{
Items: []v1alpha1.Application{
createApp("app2", "https://127.0.0.1:6443"),
createApp("app1", "https://kubernetes.default.svc"),
},
},
)

assert.True(t, sharding0.IsManagedCluster(&v1alpha1.Cluster{
@@ -278,6 +302,12 @@ func TestClusterSharding_IsManagedCluster(t *testing.T) {
},
},
},
&v1alpha1.ApplicationList{
Items: []v1alpha1.Application{
createApp("app2", "https://127.0.0.1:6443"),
createApp("app1", "https://kubernetes.default.svc"),
},
},
)

assert.False(t, sharding1.IsManagedCluster(&v1alpha1.Cluster{
@@ -327,6 +357,12 @@ func TestClusterSharding_ClusterShardOfResourceShouldNotBeChanged(t *testing.T)
*clusterWithToBigValue,
},
},
&v1alpha1.ApplicationList{
Items: []v1alpha1.Application{
createApp("app2", "https://127.0.0.1:6443"),
createApp("app1", "https://kubernetes.default.svc"),
},
},
)
distribution := sharding.GetDistribution()
assert.Equal(t, 3, len(distribution))
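
The new test fixtures call a createApp helper that is not part of this diff. A plausible shape for it is sketched below purely as an assumption about what the tests need (a name and a destination server; the namespace value is a guess):

// Assumed helper shape (the real one lives elsewhere in the sharding tests):
func createApp(name string, server string) v1alpha1.Application {
	return v1alpha1.Application{
		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: "argocd"},
		Spec: v1alpha1.ApplicationSpec{
			Destination: v1alpha1.ApplicationDestination{Server: server},
		},
	}
}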
3 changes: 2 additions & 1 deletion controller/sharding/sharding.go
@@ -43,6 +43,7 @@ const ShardControllerMappingKey = "shardControllerMapping"
type DistributionFunction func(c *v1alpha1.Cluster) int
type ClusterFilterFunction func(c *v1alpha1.Cluster) bool
type clusterAccessor func() []*v1alpha1.Cluster
type appAccessor func() []*v1alpha1.Application

// shardApplicationControllerMapping stores the mapping of Shard Number to Application Controller in ConfigMap.
// It also stores the heartbeat of last synced time of the application controller.
@@ -75,7 +76,7 @@ func GetClusterFilter(db db.ArgoDB, distributionFunction DistributionFunction, r

// GetDistributionFunction returns which DistributionFunction should be used based on the passed algorithm and
// the current data.
func GetDistributionFunction(clusters clusterAccessor, shardingAlgorithm string, replicasCount int) DistributionFunction {
func GetDistributionFunction(clusters clusterAccessor, apps appAccessor, shardingAlgorithm string, replicasCount int) DistributionFunction {
log.Debugf("Using filter function: %s", shardingAlgorithm)
distributionFunction := LegacyDistributionFunction(replicasCount)
switch shardingAlgorithm {
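
With the extra appAccessor parameter, GetDistributionFunction can hand app-aware algorithms a view of the current application set alongside the clusters; the algorithm that actually consumes it is outside this excerpt. A call-site sketch mirroring the NewClusterSharding change in cache.go above, for a given *v1alpha1.Cluster:

// Call-site sketch (mirrors cache.go): both accessors are passed so an
// app-aware algorithm can consult the current application set.
distributionFunction := GetDistributionFunction(
	clusterSharding.getClusterAccessor(),
	clusterSharding.getAppAccessor(),
	shardingAlgorithm,
	replicas,
)
shard := distributionFunction(cluster) // shard index computed for the cluster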
