Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add message queue to implement multi-cluster watch interface When using the default storage layer #593

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions cmd/apiserver/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import (
generatedopenapi "github.com/clusterpedia-io/clusterpedia/pkg/generated/openapi"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
storageoptions "github.com/clusterpedia-io/clusterpedia/pkg/storage/options"
"github.com/clusterpedia-io/clusterpedia/pkg/watcher"
watchcomponents "github.com/clusterpedia-io/clusterpedia/pkg/watcher/components"
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware"
watchoptions "github.com/clusterpedia-io/clusterpedia/pkg/watcher/options"
)

type ClusterPediaServerOptions struct {
Expand All @@ -42,6 +46,8 @@ type ClusterPediaServerOptions struct {
Traces *genericoptions.TracingOptions

Storage *storageoptions.StorageOptions

Subscriber *watchoptions.MiddlewareOptions
}

func NewServerOptions() *ClusterPediaServerOptions {
Expand All @@ -68,7 +74,8 @@ func NewServerOptions() *ClusterPediaServerOptions {
Admission: genericoptions.NewAdmissionOptions(),
Traces: genericoptions.NewTracingOptions(),

Storage: storageoptions.NewStorageOptions(),
Storage: storageoptions.NewStorageOptions(),
Subscriber: watchoptions.NewMiddlerwareOptions(),
}
}

Expand Down Expand Up @@ -118,10 +125,21 @@ func (o *ClusterPediaServerOptions) Config() (*apiserver.Config, error) {
return nil, err
}

return &apiserver.Config{
config := &apiserver.Config{
GenericConfig: genericConfig,
StorageFactory: storage,
}, nil
}

middleware.SubscriberEnabled = o.Subscriber.Enabled
if middleware.SubscriberEnabled {
err = watcher.NewSubscriber(o.Subscriber)
if err != nil {
return nil, err
}
watchcomponents.InitEventCacheSize(o.Subscriber.CacheSize)
}

return config, nil
}

func (o *ClusterPediaServerOptions) genericOptionsApplyTo(config *genericapiserver.RecommendedConfig) error {
Expand Down Expand Up @@ -182,6 +200,7 @@ func (o *ClusterPediaServerOptions) Flags() cliflag.NamedFlagSets {
o.Traces.AddFlags(fss.FlagSet("traces"))

o.Storage.AddFlags(fss.FlagSet("storage"))
o.Subscriber.AddFlags(fss.FlagSet("middleware"))
return fss
}

Expand Down
14 changes: 14 additions & 0 deletions cmd/clustersynchro-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
storageoptions "github.com/clusterpedia-io/clusterpedia/pkg/storage/options"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro"
"github.com/clusterpedia-io/clusterpedia/pkg/watcher"
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware"
watchoptions "github.com/clusterpedia-io/clusterpedia/pkg/watcher/options"
)

const (
Expand All @@ -48,6 +51,7 @@ type Options struct {
WorkerNumber int // WorkerNumber is the number of worker goroutines
PageSizeForResourceSync int64
ShardingName string
Publisher *watchoptions.MiddlewareOptions
}

func NewClusterSynchroManagerOptions() (*Options, error) {
Expand Down Expand Up @@ -80,6 +84,7 @@ func NewClusterSynchroManagerOptions() (*Options, error) {
options.KubeStateMetrics = kubestatemetrics.NewOptions()

options.WorkerNumber = 5
options.Publisher = watchoptions.NewMiddlerwareOptions()
return &options, nil
}

Expand Down Expand Up @@ -107,6 +112,7 @@ func (o *Options) Flags() cliflag.NamedFlagSets {
o.Storage.AddFlags(fss.FlagSet("storage"))
o.Metrics.AddFlags(fss.FlagSet("metrics server"))
o.KubeStateMetrics.AddFlags(fss.FlagSet("kube state metrics"))
o.Publisher.AddFlags(fss.FlagSet("middleware"))
return fss
}

Expand All @@ -132,6 +138,14 @@ func (o *Options) Config() (*config.Config, error) {
return nil, err
}

middleware.PublisherEnabled = o.Publisher.Enabled
if middleware.PublisherEnabled {
err = watcher.NewPulisher(o.Publisher)
if err != nil {
return nil, err
}
}

kubeconfig, err := clientcmd.BuildConfigFromFlags(o.Master, o.Kubeconfig)
if err != nil {
return nil, err
Expand Down
18 changes: 18 additions & 0 deletions cmd/clustersynchro-manager/app/synchro.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager"
clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature"
"github.com/clusterpedia-io/clusterpedia/pkg/version/verflag"
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/components"
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware"
)

func init() {
Expand Down Expand Up @@ -98,6 +100,12 @@ func Run(ctx context.Context, c *config.Config) error {
}

if !c.LeaderElection.LeaderElect {
if middleware.PublisherEnabled {
err := middleware.GlobalPublisher.InitPublisher(ctx)
if err != nil {
return err
}
}
synchromanager.Run(c.WorkerNumber, ctx.Done())
return nil
}
Expand Down Expand Up @@ -138,13 +146,23 @@ func Run(ctx context.Context, c *config.Config) error {
defer close(done)

stopCh := ctx.Done()
if middleware.PublisherEnabled {
err := middleware.GlobalPublisher.InitPublisher(ctx)
if err != nil {
return
}
}
synchromanager.Run(c.WorkerNumber, stopCh)
},
OnStoppedLeading: func() {
klog.Info("leaderelection lost")
if done != nil {
<-done
}
if middleware.PublisherEnabled {
middleware.GlobalPublisher.StopPublisher()
components.EC.CloseChannels()
}
},
},
})
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/prometheus/exporter-toolkit v0.10.0
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
github.com/streadway/amqp v1.1.0
github.com/stretchr/testify v1.8.3
go.uber.org/atomic v1.10.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stoewer/go-strcase v1.2.0 h1:Z2iHWqGXH00XYgqDmNgQbIBxf3wrNq0F3feEy0ainaU=
github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
github.com/streadway/amqp v1.1.0 h1:py12iX8XSyI7aN/3dUT8DFIDJazNJsVJdxNVEpnQTZM=
github.com/streadway/amqp v1.1.0/go.mod h1:WYSrTEYHOXHd0nwFeUXAe2G2hRnQT+deZJJf88uS9Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
Expand Down
17 changes: 17 additions & 0 deletions pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
"github.com/clusterpedia-io/clusterpedia/pkg/utils/filters"
watchcomponents "github.com/clusterpedia-io/clusterpedia/pkg/watcher/components"
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware"
)

var (
Expand Down Expand Up @@ -106,6 +108,16 @@ func (config completedConfig) New() (*ClusterPediaServer, error) {
return nil, fmt.Errorf("CompletedConfig.New() called with config.StorageFactory == nil")
}

// init event cache pool
eventStop := make(chan struct{})
if middleware.SubscriberEnabled {
watchcomponents.InitEventCachePool(eventStop)
err := middleware.GlobalSubscriber.InitSubscriber(eventStop)
if err != nil {
return nil, err
}
}

discoveryClient, err := discovery.NewDiscoveryClientForConfig(config.ClientConfig)
if err != nil {
return nil, err
Expand Down Expand Up @@ -159,6 +171,11 @@ func (config completedConfig) New() (*ClusterPediaServer, error) {
}

genericServer.AddPostStartHookOrDie("start-clusterpedia-informers", func(context genericapiserver.PostStartHookContext) error {
// inform to close event watch
go func() {
<-context.StopCh
close(eventStop)
}()
clusterpediaInformerFactory.Start(context.StopCh)
clusterpediaInformerFactory.WaitForCacheSync(context.StopCh)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func NewREST(serializer runtime.NegotiatedSerializer, factory storage.StorageFac
for irt := range cr.ResourceTypes {
rt := &cr.ResourceTypes[irt]
if rt.Resource != "" {
config, err := configFactory.NewConfig(rt.GroupResource().WithVersion(""), false)
config, err := configFactory.NewConfig(rt.GroupResource().WithVersion(""), false, rt.Kind)
if err != nil {
continue
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/generated/openapi/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions pkg/kube_state_metrics/generators.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,25 @@ var generators = map[schema.GroupVersionResource]func(allowAnnotationsList, allo
{Group: "networking.k8s.io", Version: "v1", Resource: "ingressclasses"}: ingressClassMetricFamilies,
}

var gvrKinds = map[schema.GroupVersionResource]string{
{Version: "v1", Resource: "pods"}: "Pod",
{Version: "v1", Resource: "secrets"}: "Secret",
{Version: "v1", Resource: "nodes"}: "Node",
{Version: "v1", Resource: "namespaces"}: "Namespace",
{Version: "v1", Resource: "services"}: "Service",

{Group: "apps", Version: "v1", Resource: "deployments"}: "Deployment",
{Group: "apps", Version: "v1", Resource: "daemonsets"}: "DaemonSet",
{Group: "apps", Version: "v1", Resource: "statefulsets"}: "StatefulSet",
{Group: "apps", Version: "v1", Resource: "replicasets"}: "ReplicaSet",

{Group: "batch", Version: "v1", Resource: "jobs"}: "Job",
{Group: "batch", Version: "v1", Resource: "cronjobs"}: "CronJob",

{Group: "networking.k8s.io", Version: "v1", Resource: "ingresses"}: "Ingress",
{Group: "networking.k8s.io", Version: "v1", Resource: "ingressclasses"}: "IngressClass",
}

var rToGVR = make(map[string]schema.GroupVersionResource)

func init() {
Expand Down
6 changes: 3 additions & 3 deletions pkg/kube_state_metrics/metrics_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var (

func init() {
for gvr := range generators {
config, err := storageConfigFactory.NewLegacyResourceConfig(gvr.GroupResource(), false)
config, err := storageConfigFactory.NewLegacyResourceConfig(gvr.GroupResource(), false, gvrKinds[gvr])
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -89,7 +89,7 @@ type MetricsStoreBuilder struct {
match func(obj interface{}) (bool, error)
}

func (builder *MetricsStoreBuilder) GetMetricStore(cluster string, resource schema.GroupVersionResource) *MetricsStore {
func (builder *MetricsStoreBuilder) GetMetricStore(cluster string, resource schema.GroupVersionResource, kind string) *MetricsStore {
if _, ok := builder.resources[resource.Resource]; !ok {
return nil
}
Expand All @@ -101,7 +101,7 @@ func (builder *MetricsStoreBuilder) GetMetricStore(cluster string, resource sche
return nil
}

config, err := storageConfigFactory.NewLegacyResourceConfig(resource.GroupResource(), false)
config, err := storageConfigFactory.NewLegacyResourceConfig(resource.GroupResource(), false, kind)
if err != nil {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubeapiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
delegate = http.NotFoundHandler()
}

restManager := NewRESTManager(c.GenericConfig.Serializer, runtime.ContentTypeJSON, c.ExtraConfig.StorageFactory, c.ExtraConfig.InitialAPIGroupResources)
restManager := NewRESTManager(c.GenericConfig.Serializer, runtime.ContentTypeJSON, c.ExtraConfig.StorageFactory, c.ExtraConfig.InitialAPIGroupResources, true)
discoveryManager := discovery.NewDiscoveryManager(c.GenericConfig.Serializer, restManager, delegate)

// handle root discovery request
Expand Down
54 changes: 50 additions & 4 deletions pkg/kubeapiserver/clusterresource_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
clusterinformer "github.com/clusterpedia-io/clusterpedia/pkg/generated/informers/externalversions/cluster/v1alpha2"
clusterlister "github.com/clusterpedia-io/clusterpedia/pkg/generated/listers/cluster/v1alpha2"
"github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver/discovery"
watchcomponents "github.com/clusterpedia-io/clusterpedia/pkg/watcher/components"
)

type ClusterResourceController struct {
Expand All @@ -36,14 +37,16 @@ func NewClusterResourceController(restManager *RESTManager, discoveryManager *di
AddFunc: func(obj interface{}) {
controller.updateClusterResources(obj.(*clusterv1alpha2.PediaCluster))
},
UpdateFunc: func(_, obj interface{}) {
UpdateFunc: func(oldObj, obj interface{}) {
cluster := obj.(*clusterv1alpha2.PediaCluster)
if !cluster.DeletionTimestamp.IsZero() {
controller.clearCache(cluster)
controller.removeCluster(cluster.Name)
return
}

controller.updateClusterResources(obj.(*clusterv1alpha2.PediaCluster))
controller.updateCache(oldObj.(*clusterv1alpha2.PediaCluster), cluster)
controller.updateClusterResources(cluster)
},
DeleteFunc: func(obj interface{}) {
clusterName, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
Expand All @@ -60,15 +63,15 @@ func NewClusterResourceController(restManager *RESTManager, discoveryManager *di
return controller
}

func (c *ClusterResourceController) updateClusterResources(cluster *clusterv1alpha2.PediaCluster) {
func (c *ClusterResourceController) convertCluster2Map(cluster *clusterv1alpha2.PediaCluster) ResourceInfoMap {
resources := ResourceInfoMap{}
for _, groupResources := range cluster.Status.SyncResources {
for _, resource := range groupResources.Resources {
if len(resource.SyncConditions) == 0 {
continue
}

versions := sets.Set[string]{}
versions := sets.New[string]()
for _, cond := range resource.SyncConditions {
versions.Insert(cond.Version)
}
Expand All @@ -82,6 +85,49 @@ func (c *ClusterResourceController) updateClusterResources(cluster *clusterv1alp
}
}

return resources
}

func (c *ClusterResourceController) updateCache(oldCluster *clusterv1alpha2.PediaCluster, cluster *clusterv1alpha2.PediaCluster) {
if ecp := watchcomponents.GetInitEventCachePool(); ecp == nil {
return
}
oldResources := c.convertCluster2Map(oldCluster)
resources := c.convertCluster2Map(cluster)
for gr, ri := range oldResources {
for version := range ri.Versions {
if !resources[gr].Versions.Has(version) {
// gr has deleted, clear the cache of this gv
watchcomponents.GetInitEventCachePool().ClearCacheByGVR(schema.GroupVersionResource{
Group: gr.Group, Version: version, Resource: gr.Resource,
})
}
}
}
}

func (c *ClusterResourceController) clearCache(cluster *clusterv1alpha2.PediaCluster) {
if ecp := watchcomponents.GetInitEventCachePool(); ecp == nil {
return
}
currentResources := c.clusterresources[cluster.Name]
resources := c.convertCluster2Map(cluster)

for gr, ri := range currentResources {
for version := range ri.Versions {
// clear the cache of this gv which in cluster
if resources[gr].Versions.Has(version) {
watchcomponents.GetInitEventCachePool().ClearCacheByGVR(schema.GroupVersionResource{
Group: gr.Group, Version: version, Resource: gr.Resource,
})
}
}
}
}

func (c *ClusterResourceController) updateClusterResources(cluster *clusterv1alpha2.PediaCluster) {
resources := c.convertCluster2Map(cluster)

currentResources := c.clusterresources[cluster.Name]
if reflect.DeepEqual(resources, currentResources) {
return
Expand Down
Loading
Loading