[YUNIKORN-2625] Refactor Clients to avoid hard-code checks #844

Closed · wants to merge 3 commits
36 changes: 18 additions & 18 deletions pkg/cache/context.go
@@ -113,42 +113,42 @@ func NewContextWithBootstrapConfigMaps(apis client.APIProvider, bootstrapConfigM

func (ctx *Context) AddSchedulingEventHandlers() error {
err := ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{
-Type: client.ConfigMapInformerHandlers,
-FilterFn: ctx.filterConfigMaps,
-AddFn: ctx.addConfigMaps,
-UpdateFn: ctx.updateConfigMaps,
-DeleteFn: ctx.deleteConfigMaps,
+InformerType: client.ConfigMapInformerHandlers,
+FilterFn: ctx.filterConfigMaps,
+AddFn: ctx.addConfigMaps,
+UpdateFn: ctx.updateConfigMaps,
+DeleteFn: ctx.deleteConfigMaps,
})
if err != nil {
return err
}

err = ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{
-Type: client.PriorityClassInformerHandlers,
-FilterFn: ctx.filterPriorityClasses,
-AddFn: ctx.addPriorityClass,
-UpdateFn: ctx.updatePriorityClass,
-DeleteFn: ctx.deletePriorityClass,
+InformerType: client.PriorityClassInformerHandlers,
+FilterFn: ctx.filterPriorityClasses,
+AddFn: ctx.addPriorityClass,
+UpdateFn: ctx.updatePriorityClass,
+DeleteFn: ctx.deletePriorityClass,
})
if err != nil {
return err
}

err = ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{
-Type: client.NodeInformerHandlers,
-AddFn: ctx.addNode,
-UpdateFn: ctx.updateNode,
-DeleteFn: ctx.deleteNode,
+InformerType: client.NodeInformerHandlers,
+AddFn: ctx.addNode,
+UpdateFn: ctx.updateNode,
+DeleteFn: ctx.deleteNode,
})
if err != nil {
return err
}

err = ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{
-Type: client.PodInformerHandlers,
-AddFn: ctx.AddPod,
-UpdateFn: ctx.UpdatePod,
-DeleteFn: ctx.DeletePod,
+InformerType: client.PodInformerHandlers,
+AddFn: ctx.AddPod,
+UpdateFn: ctx.UpdatePod,
+DeleteFn: ctx.DeletePod,
})
if err != nil {
return err
75 changes: 5 additions & 70 deletions pkg/client/apifactory.go
@@ -25,33 +25,13 @@
"go.uber.org/zap"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"

"github.com/apache/yunikorn-k8shim/pkg/conf"
"github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-k8shim/pkg/log"
"github.com/apache/yunikorn-scheduler-interface/lib/go/api"
)

-type Type int

-var informerTypes = [...]string{"Pod", "Node", "ConfigMap", "Storage", "PV", "PVC", "PriorityClass"}

-const (
-PodInformerHandlers Type = iota
-NodeInformerHandlers
-ConfigMapInformerHandlers
-StorageInformerHandlers
-PVInformerHandlers
-PVCInformerHandlers
-PriorityClassInformerHandlers
-)

-func (t Type) String() string {
-return informerTypes[t]
-}

type APIProvider interface {
GetAPIs() *Clients
AddEventHandler(handlers *ResourceEventHandlers) error
@@ -64,7 +44,7 @@
// resource handlers defines add/update/delete operations in response to the corresponding resource updates.
// The associated type field points the handler functions to the correct receiver.
type ResourceEventHandlers struct {
-Type
+InformerType
Contributor:
Was this changed to avoid confusion with type?

FilterFn func(obj interface{}) bool
AddFn func(obj interface{})
UpdateFn func(old, new interface{})
@@ -83,53 +63,8 @@
func NewAPIFactory(scheduler api.SchedulerAPI, informerFactory informers.SharedInformerFactory, configs *conf.SchedulerConf, testMode bool) *APIFactory {
kubeClient := NewKubeClient(configs.KubeConfig)

-// init informers
-// volume informers are also used to get the Listers for the predicates
-nodeInformer := informerFactory.Core().V1().Nodes()
-podInformer := informerFactory.Core().V1().Pods()
-configMapInformer := informerFactory.Core().V1().ConfigMaps()
-storageInformer := informerFactory.Storage().V1().StorageClasses()
-csiNodeInformer := informerFactory.Storage().V1().CSINodes()
-pvInformer := informerFactory.Core().V1().PersistentVolumes()
-pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
-namespaceInformer := informerFactory.Core().V1().Namespaces()
-priorityClassInformer := informerFactory.Scheduling().V1().PriorityClasses()

-var capacityCheck = volumebinding.CapacityCheck{
-CSIDriverInformer: informerFactory.Storage().V1().CSIDrivers(),
-CSIStorageCapacityInformer: informerFactory.Storage().V1().CSIStorageCapacities(),
-}

-// create a volume binder (needs the informers)
-volumeBinder := volumebinding.NewVolumeBinder(
-klog.NewKlogr(),
-kubeClient.GetClientSet(),
-podInformer,
-nodeInformer,
-csiNodeInformer,
-pvcInformer,
-pvInformer,
-storageInformer,
-capacityCheck,
-configs.VolumeBindTimeout)

return &APIFactory{
-clients: &Clients{
-conf: configs,
-KubeClient: kubeClient,
-SchedulerAPI: scheduler,
-InformerFactory: informerFactory,
-PodInformer: podInformer,
-NodeInformer: nodeInformer,
-ConfigMapInformer: configMapInformer,
-PVInformer: pvInformer,
-PVCInformer: pvcInformer,
-NamespaceInformer: namespaceInformer,
-StorageInformer: storageInformer,
-CSINodeInformer: csiNodeInformer,
-PriorityClassInformer: priorityClassInformer,
-VolumeBinder: volumeBinder,
-},
+clients: NewClients(scheduler, informerFactory, configs, kubeClient),

testMode: testMode,
stopChan: make(chan struct{}),
lock: &locking.RWMutex{},
@@ -166,15 +101,15 @@
h = fns
}

-log.Log(log.ShimClient).Info("registering event handler", zap.Stringer("type", handlers.Type))
-if err := s.addEventHandlers(handlers.Type, h, 0); err != nil {
+log.Log(log.ShimClient).Info("registering event handler", zap.Stringer("type", handlers.InformerType))
+if err := s.addEventHandlers(handlers.InformerType, h, 0); err != nil {

return fmt.Errorf("failed to initialize event handlers: %w", err)
}
return nil
}

func (s *APIFactory) addEventHandlers(
-handlerType Type, handler cache.ResourceEventHandler, resyncPeriod time.Duration) error {
+handlerType InformerType, handler cache.ResourceEventHandler, resyncPeriod time.Duration) error {

var err error
switch handlerType {
case PodInformerHandlers:
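To make the rename above concrete, here is a minimal, self-contained sketch of an embedded enum field like the one in ResourceEventHandlers. With an embedded field, the type name doubles as the field name, so the old handlers.Type read ambiguously next to Go's type keyword and the package's other *Type names, while handlers.InformerType does not. The constants and handler function below are stand-ins, not the real shim types.

package main

import "fmt"

// InformerType identifies which shared informer a handler set binds to.
type InformerType int

const (
	PodInformerHandlers InformerType = iota
	NodeInformerHandlers
)

func (t InformerType) String() string {
	return [...]string{"Pod", "Node"}[t]
}

// ResourceEventHandlers embeds InformerType, so the value is reached as
// handlers.InformerType instead of the former, more ambiguous handlers.Type.
type ResourceEventHandlers struct {
	InformerType
	AddFn func(obj interface{})
}

func main() {
	h := &ResourceEventHandlers{
		InformerType: PodInformerHandlers,
		AddFn:        func(obj interface{}) { fmt.Println("added:", obj) },
	}
	fmt.Println("registering event handler for", h.InformerType) // prints: Pod
	h.AddFn("example-pod")
}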
8 changes: 4 additions & 4 deletions pkg/client/apifactory_mock.go
@@ -52,7 +52,7 @@
type operation int

type informerEvent struct {
-handlerType Type
+handlerType InformerType
op operation
obj interface{}
oldObj interface{}
@@ -229,7 +229,7 @@
}

m.eventHandler <- handlers
-log.Log(log.Test).Info("registering event handler", zap.Stringer("type", handlers.Type))
+log.Log(log.Test).Info("registering event handler", zap.Stringer("type", handlers.InformerType))


return nil
}
@@ -243,7 +243,7 @@
m.running = true
log.Log(log.Test).Info("mock shared informers: starting background event handler")
go func() {
-eventHandlers := make(map[Type][]cache.ResourceEventHandler)
+eventHandlers := make(map[InformerType][]cache.ResourceEventHandler)


for {
select {
@@ -265,7 +265,7 @@
} else {
h = fns
}
-handlerType := handlers.Type
+handlerType := handlers.InformerType


forType := eventHandlers[handlerType]
forType = append(forType, h)
135 changes: 109 additions & 26 deletions pkg/client/clients.go
@@ -27,13 +27,41 @@
coreInformerV1 "k8s.io/client-go/informers/core/v1"
schedulingInformerV1 "k8s.io/client-go/informers/scheduling/v1"
storageInformerV1 "k8s.io/client-go/informers/storage/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"

"github.com/apache/yunikorn-k8shim/pkg/conf"
"github.com/apache/yunikorn-k8shim/pkg/log"
"github.com/apache/yunikorn-scheduler-interface/lib/go/api"
)

+var informerTypes = [...]string{"Pod", "Node", "ConfigMap", "Storage", "PV", "PVC", "PriorityClass", "Namespaces", "CSINode", "CSIDrivers", "CSIStorageCapacity"}

+type InformerType int

+func (t InformerType) String() string {
+return informerTypes[t]
+}

+const (
+PodInformerHandlers InformerType = iota
+NodeInformerHandlers
+ConfigMapInformerHandlers
+StorageInformerHandlers
+PVInformerHandlers
+PVCInformerHandlers
+PriorityClassInformerHandlers
+NamespacesInformerHandlers
+CSINodeInformerHandlers
+CSIDriversInformerHandlers
+CSIStorageCapacityInformerHandlers
+)

+type hasInformer interface {
+Informer() cache.SharedIndexInformer
+}

// clients encapsulates a set of useful client APIs
// that can be shared by callers when talking to K8s api-server,
// or the scheduler core.
@@ -49,6 +77,7 @@
InformerFactory informers.SharedInformerFactory

// resource informers
+hasInformers []hasInformer
PodInformer coreInformerV1.PodInformer
NodeInformer coreInformerV1.NodeInformer
ConfigMapInformer coreInformerV1.ConfigMapInformer
@@ -63,42 +92,96 @@
VolumeBinder volumebinding.SchedulerVolumeBinder
}

+func NewClients(scheduler api.SchedulerAPI, informerFactory informers.SharedInformerFactory, configs *conf.SchedulerConf, kubeClient KubeClient) *Clients {
+// init informers
+// volume informers are also used to get the Listers for the predicates
+hasInformers := []hasInformer{}

+podInformer := save(informerFactory.Core().V1().Pods(), &hasInformers)
+nodeInformer := save(informerFactory.Core().V1().Nodes(), &hasInformers)
+configMapInformer := save(informerFactory.Core().V1().ConfigMaps(), &hasInformers)
+storageInformer := save(informerFactory.Storage().V1().StorageClasses(), &hasInformers)
+pvInformer := save(informerFactory.Core().V1().PersistentVolumes(), &hasInformers)
+pvcInformer := save(informerFactory.Core().V1().PersistentVolumeClaims(), &hasInformers)
+priorityClassInformer := save(informerFactory.Scheduling().V1().PriorityClasses(), &hasInformers)
+namespaceInformer := save(informerFactory.Core().V1().Namespaces(), &hasInformers)
+csiNodeInformer := save(informerFactory.Storage().V1().CSINodes(), &hasInformers)
+csiDriversInformer := save(informerFactory.Storage().V1().CSIDrivers(), &hasInformers)
+csiStorageCapacityInformer := save(informerFactory.Storage().V1().CSIStorageCapacities(), &hasInformers)
Comment on lines +98 to +110
Contributor:
Honestly, I don't know. We do save some lines by introducing this intermediate type; at the same time, readability is not the best. In fact, I had to download and apply the patch locally to understand what's happening here.
I'm not a fan of this.

For example, even though this is more LOC, it's just much simpler:

	var sharedInformers []cache.SharedIndexInformer
	
	podInformer := informerFactory.Core().V1().Pods()
	sharedInformers = append(sharedInformers, podInformer.Informer())
	nodeInformer := informerFactory.Core().V1().Nodes()
	sharedInformers = append(sharedInformers, nodeInformer.Informer())
...

Then sharedInformers can be passed as a slice to Client after it's done. This way, we also don't need to copy the slice every time we call getInformers(), another thing which I don't really like.

Contributor:
> For example, even though this is more LOC, it's just much simpler:

Yep, that is a good idea. However, the previous discussion (#844 (comment)) seems to prefer a style that avoids ignoring the SharedIndexInformer when creating each XXXInformer. Otherwise, this simpler way is good to me.

Let me summarize the requests:

  1. we must save the SharedIndexInformer from all used XXXInformer ([YUNIKORN-2625] Refactor Clients to avoid hard-code checks #844 (comment))
  2. the informerTypes should be synced with all used XXXInformer ([YUNIKORN-2625] Refactor Clients to avoid hard-code checks #844 (comment))

I agree with what @wilfred-s said: "Not sure how we do that without manual work". Maybe we should let this PR go and focus on manually adding the missing CSINodeInformer and NamespaceInformer to informerTypes.
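For concreteness, a self-contained version of the alternative sketched above might look like the following; everything beyond the reviewer's snippet (the buildInformers name, the fake clientset used in main) is hypothetical and only meant to show the shape of the approach. The resulting slice would be stored on Clients once, so WaitForSync and Run could range over it without rebuilding it on every call.

package main

import (
	"fmt"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

// buildInformers creates each typed informer explicitly and appends its shared
// informer to a plain slice, as proposed in the comment above.
func buildInformers(client kubernetes.Interface) (informers.SharedInformerFactory, []cache.SharedIndexInformer) {
	factory := informers.NewSharedInformerFactory(client, 0)

	var sharedInformers []cache.SharedIndexInformer

	podInformer := factory.Core().V1().Pods()
	sharedInformers = append(sharedInformers, podInformer.Informer())

	nodeInformer := factory.Core().V1().Nodes()
	sharedInformers = append(sharedInformers, nodeInformer.Informer())

	// ...the remaining informers would be appended the same way...

	return factory, sharedInformers
}

func main() {
	_, sharedInformers := buildInformers(fake.NewSimpleClientset())
	fmt.Println("collected", len(sharedInformers), "shared informers")
}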


+var capacityCheck = volumebinding.CapacityCheck{
+CSIDriverInformer: csiDriversInformer,
+CSIStorageCapacityInformer: csiStorageCapacityInformer,
+}

+// create a volume binder (needs the informers)
+volumeBinder := volumebinding.NewVolumeBinder(
+klog.NewKlogr(),
+kubeClient.GetClientSet(),
+podInformer,
+nodeInformer,
+csiNodeInformer,
+pvcInformer,
+pvInformer,
+storageInformer,
+capacityCheck,
+configs.VolumeBindTimeout,
+)

+return &Clients{
+conf: configs,
+KubeClient: kubeClient,
+SchedulerAPI: scheduler,
+InformerFactory: informerFactory,
+hasInformers: hasInformers,
+PodInformer: podInformer,
+NodeInformer: nodeInformer,
+ConfigMapInformer: configMapInformer,
+PVInformer: pvInformer,
+PVCInformer: pvcInformer,
+NamespaceInformer: namespaceInformer,
+StorageInformer: storageInformer,
+CSINodeInformer: csiNodeInformer,
+PriorityClassInformer: priorityClassInformer,
+VolumeBinder: volumeBinder,
+}
+}

+func save[T hasInformer](n T, collector *[]hasInformer) T {
+*collector = append(*collector, n)
+return n
+}
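The save helper above is generic over the hasInformer constraint, so each informer is recorded for the sync and run loops while the caller still gets back the concrete informer type. A standalone sketch of the same pattern, with a hypothetical widget type standing in for the informer interfaces:

package main

import "fmt"

// collectible mirrors the hasInformer idea: anything exposing a shared handle.
type collectible interface {
	Name() string
}

type widget struct{ name string }

func (w widget) Name() string { return w.name }

// save appends n to the collector and returns it unchanged, so the caller
// keeps the concrete type T rather than the interface.
func save[T collectible](n T, collector *[]collectible) T {
	*collector = append(*collector, n)
	return n
}

func main() {
	var all []collectible
	w := save(widget{name: "pods"}, &all) // w is still a widget, not a collectible
	fmt.Println(w.name, len(all))         // prints: pods 1
}

The trade-off, as the thread above notes, is that this indirection is less obvious to read than appending Informer() results directly.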

func (c *Clients) GetConf() *conf.SchedulerConf {
return c.conf
}

+func (c *Clients) getInformers() []cache.SharedIndexInformer {
+informers := make([]cache.SharedIndexInformer, len(c.hasInformers))
+for i, informer := range c.hasInformers {
+informers[i] = informer.Informer()
+}
+return informers
+}

func (c *Clients) WaitForSync() {
syncStartTime := time.Now()
counter := 0
-for {
-if c.NodeInformer.Informer().HasSynced() &&
-c.PodInformer.Informer().HasSynced() &&
-c.PVCInformer.Informer().HasSynced() &&
-c.PVInformer.Informer().HasSynced() &&
-c.StorageInformer.Informer().HasSynced() &&
-c.CSINodeInformer.Informer().HasSynced() &&
-c.ConfigMapInformer.Informer().HasSynced() &&
-c.NamespaceInformer.Informer().HasSynced() &&
-c.PriorityClassInformer.Informer().HasSynced() {
-return
-}
-time.Sleep(time.Second)
-counter++
-if counter%10 == 0 {
-log.Log(log.ShimClient).Info("Waiting for informers to sync",
-zap.Duration("timeElapsed", time.Since(syncStartTime).Round(time.Second)))

+for _, informer := range c.getInformers() {
+for !informer.HasSynced() {
+time.Sleep(time.Second)
+counter++
+if counter%10 == 0 {
+log.Log(log.ShimClient).Info("Waiting for informers to sync",
+zap.Duration("timeElapsed", time.Since(syncStartTime).Round(time.Second)))

+}
}
}
}

func (c *Clients) Run(stopCh <-chan struct{}) {
-go c.NodeInformer.Informer().Run(stopCh)
-go c.PodInformer.Informer().Run(stopCh)
-go c.PVInformer.Informer().Run(stopCh)
-go c.PVCInformer.Informer().Run(stopCh)
-go c.StorageInformer.Informer().Run(stopCh)
-go c.CSINodeInformer.Informer().Run(stopCh)
-go c.ConfigMapInformer.Informer().Run(stopCh)
-go c.NamespaceInformer.Informer().Run(stopCh)
-go c.PriorityClassInformer.Informer().Run(stopCh)
+for _, informer := range c.getInformers() {
+go informer.Run(stopCh)
+}
}