From 6130955cc2328d1ffcaa3ecef4c5338e607a8778 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Burzy=C5=84ski?= Date: Fri, 14 Jul 2023 16:32:46 +0200 Subject: [PATCH 01/11] wip --- internal/clients/manager.go | 89 +++++++++++++++++------------------ internal/clients/readiness.go | 72 ++++++++++++++++++++++++++++ 2 files changed, 116 insertions(+), 45 deletions(-) create mode 100644 internal/clients/readiness.go diff --git a/internal/clients/manager.go b/internal/clients/manager.go index b3b0a0f50d..cb6fa17ae2 100644 --- a/internal/clients/manager.go +++ b/internal/clients/manager.go @@ -7,6 +7,7 @@ import ( "github.com/samber/lo" "github.com/sirupsen/logrus" + "golang.org/x/exp/maps" "github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi" ) @@ -15,6 +16,13 @@ type ClientFactory interface { CreateAdminAPIClient(ctx context.Context, address adminapi.DiscoveredAdminAPI) (*adminapi.Client, error) } +// AdminAPIClientsProvider allows fetching the most recent list of Admin API clients of Gateways that +// we should configure. +type AdminAPIClientsProvider interface { + KonnectClient() *adminapi.KonnectClient + GatewayClients() []*adminapi.Client +} + // AdminAPIClientsManager keeps track of current Admin API clients of Gateways that we should configure. // In particular, it can be notified about the clients' list update with use of Notify method, and queried // for the latest slice of those with use of Clients method. @@ -32,9 +40,15 @@ type AdminAPIClientsManager struct { notifyLoopRunningCh chan struct{} isNotifyLoopRunning bool - // gatewayClients represent all Kong Gateway data-planes that are configured by this KIC instance with use of - // their Admin API. - gatewayClients []*adminapi.Client + // readyGatewayClients represent all Kong Gateway data-planes that are ready to be configured. + readyGatewayClients map[string]*adminapi.Client + + // pendingGatewayClients represent all Kong Gateway data-planes that were discovered but are not ready to be + // configured. + pendingGatewayClients map[string]adminapi.DiscoveredAdminAPI + + // readinessChecker is used to check readiness of the clients. + readinessChecker ReadinessChecker // konnectClient represents a special-case of the data-plane which is Konnect cloud. // This client is used to synchronise configuration with Konnect's Runtime Group Admin API. @@ -56,8 +70,12 @@ func NewAdminAPIClientsManager( return nil, errors.New("at least one initial client must be provided") } + readyClients := lo.SliceToMap(initialClients, func(c *adminapi.Client) (string, *adminapi.Client) { + return c.BaseRootURL(), c + }) return &AdminAPIClientsManager{ - gatewayClients: initialClients, + readyGatewayClients: readyClients, + pendingGatewayClients: make(map[string]adminapi.DiscoveredAdminAPI), adminAPIClientFactory: kongClientFactory, discoveredAdminAPIsNotifyChan: make(chan []adminapi.DiscoveredAdminAPI), ctx: ctx, @@ -118,16 +136,13 @@ func (c *AdminAPIClientsManager) KonnectClient() *adminapi.KonnectClient { func (c *AdminAPIClientsManager) GatewayClients() []*adminapi.Client { c.lock.RLock() defer c.lock.RUnlock() - - copied := make([]*adminapi.Client, len(c.gatewayClients)) - copy(copied, c.gatewayClients) - return copied + return lo.Values(c.readyGatewayClients) } func (c *AdminAPIClientsManager) GatewayClientsCount() int { c.lock.RLock() defer c.lock.RUnlock() - return len(c.gatewayClients) + return len(c.readyGatewayClients) } // SubscribeToGatewayClientsChanges returns a channel that will receive a notification on every Gateway clients update. @@ -172,7 +187,7 @@ func (c *AdminAPIClientsManager) adminAPIAddressNotifyLoop() { select { case <-c.ctx.Done(): c.logger.Infof("closing AdminAPIClientsManager: %s", c.ctx.Err()) - c.discoveredAdminAPIsNotifyChan = nil + close(c.discoveredAdminAPIsNotifyChan) c.closeGatewayClientsSubscribers() return @@ -197,26 +212,32 @@ func (c *AdminAPIClientsManager) adjustGatewayClients(discoveredAdminAPIs []admi // Short circuit if len(discoveredAdminAPIs) == 0 { - if len(c.gatewayClients) == 0 { + if len(c.readyGatewayClients) == 0 { return false } - c.gatewayClients = c.gatewayClients[:0] + maps.Clear(c.readyGatewayClients) return true } - toAdd := lo.Filter(discoveredAdminAPIs, func(api adminapi.DiscoveredAdminAPI, _ int) bool { + for _, api := range discoveredAdminAPIs { // If we already have a client with a provided address then great, no need // to do anything. + if _, ok := c.readyGatewayClients[api.Address]; ok { + continue + } - // If we don't have a client with new address then filter it and add + // If we don't have a client with new address then create it and add // a client for this address. - return !lo.ContainsBy(c.gatewayClients, func(cl *adminapi.Client) bool { - return api.Address == cl.BaseRootURL() - }) - }) + client, err := c.adminAPIClientFactory.CreateAdminAPIClient(c.ctx, api) + if err != nil { + c.logger.WithError(err).Errorf("failed to create a client for %s", api.Address) + continue + } + c.readyGatewayClients[api.Address] = client + changed = true + } - var idxToRemove []int - for i, cl := range c.gatewayClients { + for _, cl := range c.readyGatewayClients { // If the new address set contains a client that we already have then // good, no need to do anything for it. if lo.ContainsBy(discoveredAdminAPIs, func(api adminapi.DiscoveredAdminAPI) bool { @@ -226,26 +247,11 @@ func (c *AdminAPIClientsManager) adjustGatewayClients(discoveredAdminAPIs []admi } // If the new address set does not contain an address that we already // have then remove it. - idxToRemove = append(idxToRemove, i) + delete(c.readyGatewayClients, cl.BaseRootURL()) + changed = true } - for i := len(idxToRemove) - 1; i >= 0; i-- { - idx := idxToRemove[i] - c.gatewayClients = append(c.gatewayClients[:idx], c.gatewayClients[idx+1:]...) - } - - for _, adminAPI := range toAdd { - client, err := c.adminAPIClientFactory.CreateAdminAPIClient(c.ctx, adminAPI) - if err != nil { - c.logger.WithError(err).Errorf("failed to create a client for %s", adminAPI) - continue - } - client.AttachPodReference(adminAPI.PodRef) - - c.gatewayClients = append(c.gatewayClients, client) - } - - return len(toAdd) > 0 || len(idxToRemove) > 0 + return changed } // notifyGatewayClientsSubscribers sends notifications to all subscribers that have called SubscribeToGatewayClientsChanges. @@ -265,10 +271,3 @@ func (c *AdminAPIClientsManager) closeGatewayClientsSubscribers() { close(sub) } } - -// AdminAPIClientsProvider allows fetching the most recent list of Admin API clients of Gateways that -// we should configure. -type AdminAPIClientsProvider interface { - KonnectClient() *adminapi.KonnectClient - GatewayClients() []*adminapi.Client -} diff --git a/internal/clients/readiness.go b/internal/clients/readiness.go new file mode 100644 index 0000000000..4d763243e4 --- /dev/null +++ b/internal/clients/readiness.go @@ -0,0 +1,72 @@ +package clients + +import ( + "context" + + "github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi" + "github.com/sirupsen/logrus" +) + +type ReadinessCheckResult struct { + TurnedReady []*adminapi.Client + TurnedPending []adminapi.DiscoveredAdminAPI +} + +type ReadinessChecker interface { + CheckReadiness( + ctx context.Context, + lastReadyClients []*adminapi.Client, + lastPendingClients []adminapi.DiscoveredAdminAPI, + ) ReadinessCheckResult +} + +type DefaultReadinessChecker struct { + factory ClientFactory + logger logrus.FieldLogger +} + +func (c DefaultReadinessChecker) CheckReadiness( + ctx context.Context, + lastReadyClients []*adminapi.Client, + lastPendingClients []adminapi.DiscoveredAdminAPI, +) ReadinessCheckResult { + return ReadinessCheckResult{ + TurnedReady: c.checkPendingGatewayClients(ctx, lastPendingClients), + TurnedPending: c.checkActiveGatewayClients(ctx, lastReadyClients), + } +} + +func (c DefaultReadinessChecker) checkPendingGatewayClients(ctx context.Context, lastPending []adminapi.DiscoveredAdminAPI) (turnedReady []*adminapi.Client) { + for _, adminAPI := range lastPending { + client, err := c.factory.CreateAdminAPIClient(ctx, adminAPI) + if err != nil { + // Despite the error reason we still want to keep the client in the pending list to retry later. + c.logger.WithError(err).Debugf("pending client for %q is not ready yet", adminAPI.Address) + continue + } + + turnedReady = append(turnedReady, client) + } + return turnedReady +} + +func (c DefaultReadinessChecker) checkActiveGatewayClients(ctx context.Context, lastActive []*adminapi.Client) (turnedPending []adminapi.DiscoveredAdminAPI) { + for _, client := range lastActive { + _, err := client.AdminAPIClient().Status(ctx) + if err != nil { + // Despite the error reason we still want to keep the client in the pending list to retry later. + c.logger.WithError(err).Debugf("active client for %q is not ready, moving to pending", client.BaseRootURL()) + + podRef, ok := client.PodReference() + if !ok { + // This should never happen, but if it does, we want to log it. + c.logger.Errorf("failed to get PodReference for client %q", client.BaseRootURL()) + } + turnedPending = append(turnedPending, adminapi.DiscoveredAdminAPI{ + Address: client.BaseRootURL(), + PodRef: podRef, + }) + } + } + return turnedPending +} From f32eb23a1a04a2fe4432b07fca4d013e5d6e9154 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Burzy=C5=84ski?= Date: Mon, 17 Jul 2023 17:09:52 +0200 Subject: [PATCH 02/11] wip2 --- internal/clients/manager.go | 59 +++++++++++++++++------------------ internal/clients/readiness.go | 23 +++++++++----- 2 files changed, 44 insertions(+), 38 deletions(-) diff --git a/internal/clients/manager.go b/internal/clients/manager.go index cb6fa17ae2..e3ec15c5e3 100644 --- a/internal/clients/manager.go +++ b/internal/clients/manager.go @@ -4,6 +4,7 @@ import ( "context" "errors" "sync" + "time" "github.com/samber/lo" "github.com/sirupsen/logrus" @@ -183,6 +184,9 @@ func (c *AdminAPIClientsManager) SubscribeToGatewayClientsChanges() (<-chan stru // internal clients list. func (c *AdminAPIClientsManager) adminAPIAddressNotifyLoop() { close(c.notifyLoopRunningCh) + readinessCheckTicker := time.NewTicker(5 * time.Second) + defer readinessCheckTicker.Stop() + for { select { case <-c.ctx.Done(): @@ -198,6 +202,8 @@ func (c *AdminAPIClientsManager) adminAPIAddressNotifyLoop() { c.logger.Debug("notifying subscribers about gateway clients change") c.notifyGatewayClientsSubscribers() } + case <-readinessCheckTicker.C: + c.logger.Debug("checking readiness of gateway clients") } } } @@ -210,48 +216,41 @@ func (c *AdminAPIClientsManager) adjustGatewayClients(discoveredAdminAPIs []admi c.lock.Lock() defer c.lock.Unlock() - // Short circuit + // Short circuit, if len(discoveredAdminAPIs) == 0 { - if len(c.readyGatewayClients) == 0 { + // If we have no clients and the provided list is empty, it means we're in sync. No change was made. + if len(c.readyGatewayClients) == 0 && len(c.pendingGatewayClients) == 0 { return false } + // Otherwise, we have to clear the clients and return true to indicate that the change was made. maps.Clear(c.readyGatewayClients) + maps.Clear(c.pendingGatewayClients) return true } - for _, api := range discoveredAdminAPIs { - // If we already have a client with a provided address then great, no need - // to do anything. - if _, ok := c.readyGatewayClients[api.Address]; ok { - continue - } + // TODO: make sure that makes sense - // If we don't have a client with new address then create it and add - // a client for this address. - client, err := c.adminAPIClientFactory.CreateAdminAPIClient(c.ctx, api) - if err != nil { - c.logger.WithError(err).Errorf("failed to create a client for %s", api.Address) - continue - } - c.readyGatewayClients[api.Address] = client - changed = true + // Perform a readiness check to see which clients are ready and which are not. We'll use the result + // to update the clients lists. + readinessCheckResult := c.readinessChecker.CheckReadiness( + c.ctx, + lo.Values(c.readyGatewayClients), + discoveredAdminAPIs, + ) + + // Add clients that turned ready to readyGatewayClients and remove them from pendingGatewayClients. + for _, cl := range readinessCheckResult.ClientsTurnedReady { + delete(c.pendingGatewayClients, cl.BaseRootURL()) + c.readyGatewayClients[cl.BaseRootURL()] = cl } - for _, cl := range c.readyGatewayClients { - // If the new address set contains a client that we already have then - // good, no need to do anything for it. - if lo.ContainsBy(discoveredAdminAPIs, func(api adminapi.DiscoveredAdminAPI) bool { - return api.Address == cl.BaseRootURL() - }) { - continue - } - // If the new address set does not contain an address that we already - // have then remove it. - delete(c.readyGatewayClients, cl.BaseRootURL()) - changed = true + // Add clients that turned pending to pendingGatewayClients and remove them from readyGatewayClients. + for _, cl := range readinessCheckResult.ClientsTurnedPending { + delete(c.readyGatewayClients, cl.Address) + c.pendingGatewayClients[cl.Address] = cl } - return changed + return readinessCheckResult.HasChanges() } // notifyGatewayClientsSubscribers sends notifications to all subscribers that have called SubscribeToGatewayClientsChanges. diff --git a/internal/clients/readiness.go b/internal/clients/readiness.go index 4d763243e4..569f821418 100644 --- a/internal/clients/readiness.go +++ b/internal/clients/readiness.go @@ -8,15 +8,19 @@ import ( ) type ReadinessCheckResult struct { - TurnedReady []*adminapi.Client - TurnedPending []adminapi.DiscoveredAdminAPI + ClientsTurnedReady []*adminapi.Client + ClientsTurnedPending []adminapi.DiscoveredAdminAPI +} + +func (r ReadinessCheckResult) HasChanges() bool { + return len(r.ClientsTurnedReady) > 0 || len(r.ClientsTurnedPending) > 0 } type ReadinessChecker interface { CheckReadiness( ctx context.Context, - lastReadyClients []*adminapi.Client, - lastPendingClients []adminapi.DiscoveredAdminAPI, + readyClients []*adminapi.Client, + pendingClients []adminapi.DiscoveredAdminAPI, ) ReadinessCheckResult } @@ -27,17 +31,19 @@ type DefaultReadinessChecker struct { func (c DefaultReadinessChecker) CheckReadiness( ctx context.Context, - lastReadyClients []*adminapi.Client, - lastPendingClients []adminapi.DiscoveredAdminAPI, + readyClients []*adminapi.Client, + pendingClients []adminapi.DiscoveredAdminAPI, ) ReadinessCheckResult { return ReadinessCheckResult{ - TurnedReady: c.checkPendingGatewayClients(ctx, lastPendingClients), - TurnedPending: c.checkActiveGatewayClients(ctx, lastReadyClients), + ClientsTurnedReady: c.checkPendingGatewayClients(ctx, pendingClients), + ClientsTurnedPending: c.checkActiveGatewayClients(ctx, readyClients), } } func (c DefaultReadinessChecker) checkPendingGatewayClients(ctx context.Context, lastPending []adminapi.DiscoveredAdminAPI) (turnedReady []*adminapi.Client) { for _, adminAPI := range lastPending { + // We indirectly check readiness of the client by trying to create it. If it succeeds then it means that + // the client is ready to be used. client, err := c.factory.CreateAdminAPIClient(ctx, adminAPI) if err != nil { // Despite the error reason we still want to keep the client in the pending list to retry later. @@ -52,6 +58,7 @@ func (c DefaultReadinessChecker) checkPendingGatewayClients(ctx context.Context, func (c DefaultReadinessChecker) checkActiveGatewayClients(ctx context.Context, lastActive []*adminapi.Client) (turnedPending []adminapi.DiscoveredAdminAPI) { for _, client := range lastActive { + // For active clients we check readiness by calling the Status endpoint. _, err := client.AdminAPIClient().Status(ctx) if err != nil { // Despite the error reason we still want to keep the client in the pending list to retry later. From 8917ad80eb09476115e9276689f85008a0ef4b7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Burzy=C5=84ski?= Date: Tue, 18 Jul 2023 12:13:41 +0200 Subject: [PATCH 03/11] implement readiness reconciliation and cover readiness checker with tests --- internal/adminapi/client.go | 6 + internal/clients/manager.go | 166 ++++++++++++++++++++------- internal/clients/manager_test.go | 14 +-- internal/clients/readiness.go | 24 +++- internal/clients/readiness_test.go | 178 +++++++++++++++++++++++++++++ 5 files changed, 334 insertions(+), 54 deletions(-) create mode 100644 internal/clients/readiness_test.go diff --git a/internal/adminapi/client.go b/internal/adminapi/client.go index 9a345e5f1c..74d50d3e1e 100644 --- a/internal/adminapi/client.go +++ b/internal/adminapi/client.go @@ -95,6 +95,12 @@ func (c *Client) NodeID(ctx context.Context) (string, error) { return nodeID, nil } +// IsReady returns nil if the Admin API is ready to serve requests. +func (c *Client) IsReady(ctx context.Context) error { + _, err := c.adminAPIClient.Status(ctx) + return err +} + // GetKongVersion returns version of the kong gateway. func (c *Client) GetKongVersion(ctx context.Context) (string, error) { if c.isKonnect { diff --git a/internal/clients/manager.go b/internal/clients/manager.go index e3ec15c5e3..5af50dee9c 100644 --- a/internal/clients/manager.go +++ b/internal/clients/manager.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/kong/kubernetes-ingress-controller/v2/internal/util/clock" "github.com/samber/lo" "github.com/sirupsen/logrus" "golang.org/x/exp/maps" @@ -13,6 +14,10 @@ import ( "github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi" ) +// DefaultReadinessReconciliationInterval is the interval at which the manager will run readiness reconciliation loop. +// It's the same as the default interval of the readiness probe. +const DefaultReadinessReconciliationInterval = 10 * time.Second + type ClientFactory interface { CreateAdminAPIClient(ctx context.Context, address adminapi.DiscoveredAdminAPI) (*adminapi.Client, error) } @@ -24,13 +29,16 @@ type AdminAPIClientsProvider interface { GatewayClients() []*adminapi.Client } +type Ticker interface { + Stop() + Channel() <-chan time.Time + Reset(d time.Duration) +} + // AdminAPIClientsManager keeps track of current Admin API clients of Gateways that we should configure. // In particular, it can be notified about the clients' list update with use of Notify method, and queried // for the latest slice of those with use of Clients method. type AdminAPIClientsManager struct { - // adminAPIClientFactory is a factory used for creating Admin API clients. - adminAPIClientFactory ClientFactory - // discoveredAdminAPIsNotifyChan is used for notifications that contain Admin API // endpoints list that should be used for configuring the dataplane. discoveredAdminAPIsNotifyChan chan []adminapi.DiscoveredAdminAPI @@ -38,7 +46,7 @@ type AdminAPIClientsManager struct { ctx context.Context onceNotifyLoopRunning sync.Once - notifyLoopRunningCh chan struct{} + running chan struct{} isNotifyLoopRunning bool // readyGatewayClients represent all Kong Gateway data-planes that are ready to be configured. @@ -51,6 +59,12 @@ type AdminAPIClientsManager struct { // readinessChecker is used to check readiness of the clients. readinessChecker ReadinessChecker + // readinessReconciliationInterval is the interval at which the manager will run readiness reconciliation loop. + readinessReconciliationInterval time.Duration + + // readinessReconciliationTicker is used to run readiness reconciliation loop. + readinessReconciliationTicker Ticker + // konnectClient represents a special-case of the data-plane which is Konnect cloud. // This client is used to synchronise configuration with Konnect's Runtime Group Admin API. konnectClient *adminapi.KonnectClient @@ -61,11 +75,21 @@ type AdminAPIClientsManager struct { logger logrus.FieldLogger } +type AdminAPIClientsManagerOption func(*AdminAPIClientsManager) + +// WithReadinessReconciliationTicker allows to set a custom ticker for readiness reconciliation loop. +func WithReadinessReconciliationTicker(ticker Ticker) AdminAPIClientsManagerOption { + return func(m *AdminAPIClientsManager) { + m.readinessReconciliationTicker = ticker + } +} + func NewAdminAPIClientsManager( ctx context.Context, logger logrus.FieldLogger, initialClients []*adminapi.Client, kongClientFactory ClientFactory, + opts ...AdminAPIClientsManagerOption, ) (*AdminAPIClientsManager, error) { if len(initialClients) == 0 { return nil, errors.New("at least one initial client must be provided") @@ -74,26 +98,35 @@ func NewAdminAPIClientsManager( readyClients := lo.SliceToMap(initialClients, func(c *adminapi.Client) (string, *adminapi.Client) { return c.BaseRootURL(), c }) - return &AdminAPIClientsManager{ - readyGatewayClients: readyClients, - pendingGatewayClients: make(map[string]adminapi.DiscoveredAdminAPI), - adminAPIClientFactory: kongClientFactory, - discoveredAdminAPIsNotifyChan: make(chan []adminapi.DiscoveredAdminAPI), - ctx: ctx, - notifyLoopRunningCh: make(chan struct{}), - logger: logger, - }, nil + c := &AdminAPIClientsManager{ + readyGatewayClients: readyClients, + pendingGatewayClients: make(map[string]adminapi.DiscoveredAdminAPI), + // inject + readinessChecker: NewDefaultReadinessChecker(kongClientFactory, logger), + readinessReconciliationTicker: clock.NewTicker(), + readinessReconciliationInterval: DefaultReadinessReconciliationInterval, + discoveredAdminAPIsNotifyChan: make(chan []adminapi.DiscoveredAdminAPI), + ctx: ctx, + running: make(chan struct{}), + logger: logger, + } + + for _, opt := range opts { + opt(c) + } + + return c, nil } // Running returns a channel that is closed when the manager's background tasks are already running. func (c *AdminAPIClientsManager) Running() chan struct{} { - return c.notifyLoopRunningCh + return c.running } -// RunNotifyLoop runs a goroutine that will dynamically ingest new addresses of Kong Admin API endpoints. -func (c *AdminAPIClientsManager) RunNotifyLoop() { +// Run runs a goroutine that will dynamically ingest new addresses of Kong Admin API endpoints. +func (c *AdminAPIClientsManager) Run() { c.onceNotifyLoopRunning.Do(func() { - go c.adminAPIAddressNotifyLoop() + go c.gatewayClientsReconciliationLoop() c.lock.Lock() defer c.lock.Unlock() @@ -170,7 +203,7 @@ func (c *AdminAPIClientsManager) SubscribeToGatewayClientsChanges() (<-chan stru return ch, true } -// adminAPIAddressNotifyLoop is an inner loop listening on notifyChan which are received via +// gatewayClientsReconciliationLoop is an inner loop listening on notifyChan which are received via // Notify() calls. Each time it receives on notifyChan tt will take the provided // list of addresses and update the internally held list of clients such that: // - the internal list of kong clients contains only the provided addresses @@ -182,10 +215,10 @@ func (c *AdminAPIClientsManager) SubscribeToGatewayClientsChanges() (<-chan stru // // This function will acquire the internal lock to prevent the modification of // internal clients list. -func (c *AdminAPIClientsManager) adminAPIAddressNotifyLoop() { - close(c.notifyLoopRunningCh) - readinessCheckTicker := time.NewTicker(5 * time.Second) - defer readinessCheckTicker.Stop() +func (c *AdminAPIClientsManager) gatewayClientsReconciliationLoop() { + close(c.running) + c.readinessReconciliationTicker.Reset(c.readinessReconciliationInterval) + defer c.readinessReconciliationTicker.Stop() for { select { @@ -194,28 +227,43 @@ func (c *AdminAPIClientsManager) adminAPIAddressNotifyLoop() { close(c.discoveredAdminAPIsNotifyChan) c.closeGatewayClientsSubscribers() return - case discoveredAdminAPIs := <-c.discoveredAdminAPIsNotifyChan: - c.logger.Debug("received notification about Admin API addresses change") - if clientsChanged := c.adjustGatewayClients(discoveredAdminAPIs); clientsChanged { - // Notify subscribers that the clients list has been updated. - c.logger.Debug("notifying subscribers about gateway clients change") - c.notifyGatewayClientsSubscribers() - } - case <-readinessCheckTicker.C: - c.logger.Debug("checking readiness of gateway clients") + c.onDiscoveredAdminAPIsNotification(discoveredAdminAPIs) + case <-c.readinessReconciliationTicker.Channel(): + c.onReadinessReconciliationTick() } } } +func (c *AdminAPIClientsManager) onDiscoveredAdminAPIsNotification(discoveredAdminAPIs []adminapi.DiscoveredAdminAPI) { + c.lock.Lock() + defer c.lock.Unlock() + + c.logger.Debug("received notification about Admin API addresses change") + if clientsChanged := c.adjustGatewayClients(discoveredAdminAPIs); clientsChanged { + // Notify subscribers that the clients list has been updated. + c.logger.Debug("notifying subscribers about gateway clients change") + c.notifyGatewayClientsSubscribers() + } +} + +func (c *AdminAPIClientsManager) onReadinessReconciliationTick() { + c.lock.Lock() + defer c.lock.Unlock() + + c.logger.Debug("reconciling readiness of gateway clients") + if clientsChanged := c.reconcileGatewayClientsReadiness(); clientsChanged { + // Notify subscribers that the clients list has been updated. + c.logger.Debug("notifying subscribers about gateway clients change") + c.notifyGatewayClientsSubscribers() + } +} + // adjustGatewayClients adjusts internally stored clients slice based on the provided // discovered Admin APIs slice. It consults BaseRootURLs of already stored clients with each // of the discovered Admin APIs and creates only those clients that we don't have. // It returns true if the gatewayClients slice has been changed, false otherwise. func (c *AdminAPIClientsManager) adjustGatewayClients(discoveredAdminAPIs []adminapi.DiscoveredAdminAPI) (changed bool) { - c.lock.Lock() - defer c.lock.Unlock() - // Short circuit, if len(discoveredAdminAPIs) == 0 { // If we have no clients and the provided list is empty, it means we're in sync. No change was made. @@ -228,23 +276,57 @@ func (c *AdminAPIClientsManager) adjustGatewayClients(discoveredAdminAPIs []admi return true } - // TODO: make sure that makes sense + // Make sure all discovered clients are in the pending list. + for _, d := range discoveredAdminAPIs { + c.pendingGatewayClients[d.Address] = d + } + + // Remove ready clients that are not present in the discovered list. + for _, cl := range c.readyGatewayClients { + clientNotOnDiscoveredList := !lo.ContainsBy(discoveredAdminAPIs, func(d adminapi.DiscoveredAdminAPI) bool { + return d.Address == cl.BaseRootURL() + }) + if clientNotOnDiscoveredList { + delete(c.readyGatewayClients, cl.BaseRootURL()) + changed = true + } + } + + // Remove pending clients that are not present in the discovered list. + for _, cl := range c.pendingGatewayClients { + clientNotOnDiscoveredList := !lo.ContainsBy(discoveredAdminAPIs, func(d adminapi.DiscoveredAdminAPI) bool { + return d.Address == cl.Address + }) + if clientNotOnDiscoveredList { + delete(c.pendingGatewayClients, cl.Address) + changed = true + } + } + + readinessChanged := c.reconcileGatewayClientsReadiness() + return changed || readinessChanged +} + +// reconcileGatewayClientsReadiness reconciles the readiness of the gateway clients. It ensures that the clients on the +// readyGatewayClients list are still ready and that the clients on the pendingGatewayClients list are still pending. +// If any of the clients is not ready anymore, it will be moved to the pendingGatewayClients list. If any of the clients +// is not pending anymore, it will be moved to the readyGatewayClients list. It returns true if any transition has been +// made, false otherwise. +func (c *AdminAPIClientsManager) reconcileGatewayClientsReadiness() bool { + // Reset the ticker after each readiness reconciliation despite the trigger (whether it was a tick or a notification). + // It's to ensure that the readiness is not reconciled too often when we receive a lot of notifications. + defer c.readinessReconciliationTicker.Reset(c.readinessReconciliationInterval) - // Perform a readiness check to see which clients are ready and which are not. We'll use the result - // to update the clients lists. readinessCheckResult := c.readinessChecker.CheckReadiness( c.ctx, - lo.Values(c.readyGatewayClients), - discoveredAdminAPIs, + lo.MapToSlice(c.readyGatewayClients, func(_ string, cl *adminapi.Client) AlreadyCreatedClient { return cl }), + lo.Values(c.pendingGatewayClients), ) - // Add clients that turned ready to readyGatewayClients and remove them from pendingGatewayClients. for _, cl := range readinessCheckResult.ClientsTurnedReady { delete(c.pendingGatewayClients, cl.BaseRootURL()) c.readyGatewayClients[cl.BaseRootURL()] = cl } - - // Add clients that turned pending to pendingGatewayClients and remove them from readyGatewayClients. for _, cl := range readinessCheckResult.ClientsTurnedPending { delete(c.readyGatewayClients, cl.Address) c.pendingGatewayClients[cl.Address] = cl diff --git a/internal/clients/manager_test.go b/internal/clients/manager_test.go index adb30285b7..2f60b60327 100644 --- a/internal/clients/manager_test.go +++ b/internal/clients/manager_test.go @@ -121,7 +121,7 @@ func TestClientAddressesNotifications(t *testing.T) { ) require.NoError(t, err) require.NotNil(t, manager) - manager.RunNotifyLoop() + manager.Run() <-manager.Running() defer testClientFactoryWithExpected.AssertExpectedCalls() @@ -186,7 +186,7 @@ func TestClientAdjustInternalClientsAfterNotification(t *testing.T) { manager, err := NewAdminAPIClientsManager(ctx, logger, []*adminapi.Client{testClient}, cf) require.NoError(t, err) require.NotNil(t, manager) - manager.RunNotifyLoop() + manager.Run() <-manager.Running() clients := manager.GatewayClients() @@ -282,7 +282,7 @@ func TestAdminAPIClientsManager_NotRunningNotifyLoop(t *testing.T) { select { case <-m.Running(): - t.Error("expected manager to not run without explicitly running it with RunNotifyLoop method") + t.Error("expected manager to not run without explicitly running it with Run method") case <-time.After(time.Millisecond * 100): } } @@ -322,7 +322,7 @@ func TestAdminAPIClientsManager_GatewayClientsFromNotificationsAreExpectedToHave cf, ) require.NoError(t, err) - m.RunNotifyLoop() + m.Run() m.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI("http://10.0.0.1:8080")}) @@ -360,7 +360,7 @@ func TestAdminAPIClientsManager_SubscribeToGatewayClientsChanges(t *testing.T) { require.Falsef(t, ok, "expected no subscription to be created because no notification loop is running") }) - m.RunNotifyLoop() + m.Run() t.Run("when notification loop is running subscription should be created", func(t *testing.T) { ch, ok := m.SubscribeToGatewayClientsChanges() @@ -437,7 +437,7 @@ func TestAdminAPIClientsManager_ConcurrentNotify(t *testing.T) { defer cancel() m, err := NewAdminAPIClientsManager(ctx, logrus.New(), []*adminapi.Client{testClient}, cf) require.NoError(t, err) - m.RunNotifyLoop() + m.Run() var receivedNotificationsCount atomic.Uint32 ch, ok := m.SubscribeToGatewayClientsChanges() @@ -493,7 +493,7 @@ func TestAdminAPIClientsManager_NotifiesSubscribersOnlyWhenGatewayClientsChange( defer cancel() m, err := NewAdminAPIClientsManager(ctx, logrus.New(), []*adminapi.Client{testClient}, cf) require.NoError(t, err) - m.RunNotifyLoop() + m.Run() var receivedNotificationsCount atomic.Uint32 ch, ok := m.SubscribeToGatewayClientsChanges() diff --git a/internal/clients/readiness.go b/internal/clients/readiness.go index 569f821418..6a717c5613 100644 --- a/internal/clients/readiness.go +++ b/internal/clients/readiness.go @@ -5,6 +5,7 @@ import ( "github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi" "github.com/sirupsen/logrus" + k8stypes "k8s.io/apimachinery/pkg/types" ) type ReadinessCheckResult struct { @@ -19,19 +20,32 @@ func (r ReadinessCheckResult) HasChanges() bool { type ReadinessChecker interface { CheckReadiness( ctx context.Context, - readyClients []*adminapi.Client, + alreadyCreatedClients []AlreadyCreatedClient, pendingClients []adminapi.DiscoveredAdminAPI, ) ReadinessCheckResult } +type AlreadyCreatedClient interface { + IsReady(context.Context) error + PodReference() (k8stypes.NamespacedName, bool) + BaseRootURL() string +} + type DefaultReadinessChecker struct { factory ClientFactory logger logrus.FieldLogger } +func NewDefaultReadinessChecker(factory ClientFactory, logger logrus.FieldLogger) DefaultReadinessChecker { + return DefaultReadinessChecker{ + factory: factory, + logger: logger, + } +} + func (c DefaultReadinessChecker) CheckReadiness( ctx context.Context, - readyClients []*adminapi.Client, + readyClients []AlreadyCreatedClient, pendingClients []adminapi.DiscoveredAdminAPI, ) ReadinessCheckResult { return ReadinessCheckResult{ @@ -56,11 +70,10 @@ func (c DefaultReadinessChecker) checkPendingGatewayClients(ctx context.Context, return turnedReady } -func (c DefaultReadinessChecker) checkActiveGatewayClients(ctx context.Context, lastActive []*adminapi.Client) (turnedPending []adminapi.DiscoveredAdminAPI) { +func (c DefaultReadinessChecker) checkActiveGatewayClients(ctx context.Context, lastActive []AlreadyCreatedClient) (turnedPending []adminapi.DiscoveredAdminAPI) { for _, client := range lastActive { // For active clients we check readiness by calling the Status endpoint. - _, err := client.AdminAPIClient().Status(ctx) - if err != nil { + if err := client.IsReady(ctx); err != nil { // Despite the error reason we still want to keep the client in the pending list to retry later. c.logger.WithError(err).Debugf("active client for %q is not ready, moving to pending", client.BaseRootURL()) @@ -68,6 +81,7 @@ func (c DefaultReadinessChecker) checkActiveGatewayClients(ctx context.Context, if !ok { // This should never happen, but if it does, we want to log it. c.logger.Errorf("failed to get PodReference for client %q", client.BaseRootURL()) + continue } turnedPending = append(turnedPending, adminapi.DiscoveredAdminAPI{ Address: client.BaseRootURL(), diff --git a/internal/clients/readiness_test.go b/internal/clients/readiness_test.go new file mode 100644 index 0000000000..0fc2336be1 --- /dev/null +++ b/internal/clients/readiness_test.go @@ -0,0 +1,178 @@ +package clients_test + +import ( + "context" + "errors" + "fmt" + "testing" + + "github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi" + "github.com/kong/kubernetes-ingress-controller/v2/internal/clients" + "github.com/samber/lo" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" + k8stypes "k8s.io/apimachinery/pkg/types" +) + +const ( + testURL1 = "http://localhost:8001" + testURL2 = "http://localhost:8002" +) + +var ( + testPodRef = k8stypes.NamespacedName{ + Namespace: "default", + Name: "mock", + } +) + +type mockClientFactory struct { + ready map[string]bool // Maps address to readiness. + callsCount map[string]int // Maps address to number of CreateAdminAPIClient calls. + t *testing.T +} + +func newMockClientFactory(t *testing.T, ready map[string]bool) mockClientFactory { + return mockClientFactory{ + ready: ready, + callsCount: map[string]int{}, + t: t, + } +} + +func (cf mockClientFactory) CreateAdminAPIClient(_ context.Context, adminAPI adminapi.DiscoveredAdminAPI) (*adminapi.Client, error) { + address := adminAPI.Address + + cf.callsCount[address]++ + + ready, ok := cf.ready[address] + if !ok { + cf.t.Errorf("unexpected client creation for %s", address) + } + if !ok || !ready { + return nil, fmt.Errorf("client for %s is not ready", address) + } + + return adminapi.NewTestClient(address) +} + +type mockAlreadyCreatedClient struct { + url string + isReady bool +} + +func (m mockAlreadyCreatedClient) IsReady(context.Context) error { + if !m.isReady { + return errors.New("not ready") + } + return nil +} + +func (m mockAlreadyCreatedClient) PodReference() (k8stypes.NamespacedName, bool) { + return testPodRef, true +} + +func (m mockAlreadyCreatedClient) BaseRootURL() string { + return m.url +} + +func TestDefaultReadinessChecker(t *testing.T) { + testCases := []struct { + name string + + alreadyCreatedClients []clients.AlreadyCreatedClient + pendingClients []adminapi.DiscoveredAdminAPI + pendingClientsReadiness map[string]bool + + expectedTurnedReady []string + expectedTurnedPending []string + }{ + { + name: "ready turning pending", + alreadyCreatedClients: []clients.AlreadyCreatedClient{ + mockAlreadyCreatedClient{ + url: testURL1, + isReady: false, + }, + }, + expectedTurnedPending: []string{testURL1}, + }, + { + name: "pending turning ready", + pendingClients: []adminapi.DiscoveredAdminAPI{ + { + Address: testURL1, + PodRef: testPodRef, + }, + }, + pendingClientsReadiness: map[string]bool{ + testURL1: true, + }, + expectedTurnedReady: []string{testURL1}, + }, + { + name: "ready turning pending, pending turning ready at once", + alreadyCreatedClients: []clients.AlreadyCreatedClient{ + mockAlreadyCreatedClient{ + url: testURL1, + isReady: false, + }, + }, + pendingClients: []adminapi.DiscoveredAdminAPI{ + { + Address: testURL2, + PodRef: testPodRef, + }, + }, + pendingClientsReadiness: map[string]bool{ + testURL2: true, + }, + expectedTurnedReady: []string{testURL2}, + expectedTurnedPending: []string{testURL1}, + }, + { + name: "no changes", + alreadyCreatedClients: []clients.AlreadyCreatedClient{ + mockAlreadyCreatedClient{ + url: testURL1, + isReady: true, + }, + }, + pendingClients: []adminapi.DiscoveredAdminAPI{ + { + Address: testURL2, + PodRef: testPodRef, + }, + }, + pendingClientsReadiness: map[string]bool{ + testURL2: false, + }, + expectedTurnedReady: nil, + expectedTurnedPending: nil, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + factory := newMockClientFactory(t, tc.pendingClientsReadiness) + checker := clients.NewDefaultReadinessChecker(factory, logrus.New()) + result := checker.CheckReadiness(context.Background(), tc.alreadyCreatedClients, tc.pendingClients) + + turnedPending := lo.Map(result.ClientsTurnedPending, func(c adminapi.DiscoveredAdminAPI, _ int) string { return c.Address }) + turnedReady := lo.Map(result.ClientsTurnedReady, func(c *adminapi.Client, _ int) string { return c.BaseRootURL() }) + + require.ElementsMatch(t, tc.expectedTurnedReady, turnedReady) + require.ElementsMatch(t, tc.expectedTurnedPending, turnedPending) + + // For every pending client turning ready we expect exactly one call to CreateAdminAPIClient. + for _, url := range tc.pendingClients { + require.Equal(t, 1, factory.callsCount[url.Address]) + } + + // For every already created client we expect no calls to CreateAdminAPIClient. + for _, url := range tc.alreadyCreatedClients { + require.Zero(t, factory.callsCount[url.BaseRootURL()]) + } + }) + } +} From ec0f9019aacfdc5d055f136147b86f30d8be27eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Burzy=C5=84ski?= Date: Tue, 18 Jul 2023 17:10:02 +0200 Subject: [PATCH 04/11] more tests --- internal/clients/manager.go | 15 +- internal/clients/manager_test.go | 361 ++++++++--------------------- internal/clients/readiness_test.go | 44 +++- 3 files changed, 147 insertions(+), 273 deletions(-) diff --git a/internal/clients/manager.go b/internal/clients/manager.go index 5af50dee9c..fc0a02a791 100644 --- a/internal/clients/manager.go +++ b/internal/clients/manager.go @@ -88,7 +88,7 @@ func NewAdminAPIClientsManager( ctx context.Context, logger logrus.FieldLogger, initialClients []*adminapi.Client, - kongClientFactory ClientFactory, + readinessChecker ReadinessChecker, opts ...AdminAPIClientsManagerOption, ) (*AdminAPIClientsManager, error) { if len(initialClients) == 0 { @@ -99,10 +99,9 @@ func NewAdminAPIClientsManager( return c.BaseRootURL(), c }) c := &AdminAPIClientsManager{ - readyGatewayClients: readyClients, - pendingGatewayClients: make(map[string]adminapi.DiscoveredAdminAPI), - // inject - readinessChecker: NewDefaultReadinessChecker(kongClientFactory, logger), + readyGatewayClients: readyClients, + pendingGatewayClients: make(map[string]adminapi.DiscoveredAdminAPI), + readinessChecker: readinessChecker, readinessReconciliationTicker: clock.NewTicker(), readinessReconciliationInterval: DefaultReadinessReconciliationInterval, discoveredAdminAPIsNotifyChan: make(chan []adminapi.DiscoveredAdminAPI), @@ -276,9 +275,11 @@ func (c *AdminAPIClientsManager) adjustGatewayClients(discoveredAdminAPIs []admi return true } - // Make sure all discovered clients are in the pending list. + // Make sure all discovered clients that are not on ready list are in the pending list. for _, d := range discoveredAdminAPIs { - c.pendingGatewayClients[d.Address] = d + if _, ok := c.readyGatewayClients[d.Address]; !ok { + c.pendingGatewayClients[d.Address] = d + } } // Remove ready clients that are not present in the discovered list. diff --git a/internal/clients/manager_test.go b/internal/clients/manager_test.go index 2f60b60327..0a591155fa 100644 --- a/internal/clients/manager_test.go +++ b/internal/clients/manager_test.go @@ -1,270 +1,130 @@ -package clients +package clients_test import ( "context" - "fmt" - "net/http" - "net/http/httptest" - "sync/atomic" + "sync" "testing" "time" + "github.com/kong/kubernetes-ingress-controller/v2/internal/clients" "github.com/samber/lo" "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "golang.org/x/exp/slices" k8stypes "k8s.io/apimachinery/pkg/types" "github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi" ) -// clientFactoryWithExpected implements ClientFactory interface and can be used -// in tests to assert which clients have been created and signal failure if: -// - client for an unexpected address gets created -// - client which already got created was tried to be created second time. -type clientFactoryWithExpected struct { - expected map[string]bool - t *testing.T +type mockReadinessChecker struct { + nextResult clients.ReadinessCheckResult + lock sync.RWMutex } -func (cf clientFactoryWithExpected) CreateAdminAPIClient(_ context.Context, adminAPI adminapi.DiscoveredAdminAPI) (*adminapi.Client, error) { - address := adminAPI.Address - stillExpecting, ok := cf.expected[address] - if !ok { - cf.t.Errorf("got %s which was unexpected", address) - return nil, fmt.Errorf("got %s which was unexpected", address) - } - if !stillExpecting { - cf.t.Errorf("got %s more than once", address) - return nil, fmt.Errorf("got %s more than once", address) - } - cf.expected[address] = false - - return adminapi.NewTestClient(address) +func (m *mockReadinessChecker) CheckReadiness( + context.Context, + []clients.AlreadyCreatedClient, + []adminapi.DiscoveredAdminAPI, +) clients.ReadinessCheckResult { + m.lock.RLock() + defer m.lock.RUnlock() + return m.nextResult } -func (cf clientFactoryWithExpected) AssertExpectedCalls() { - for _, addr := range cf.ExpectedCallsLeft() { - cf.t.Errorf("%s client expected to be called, but wasn't", addr) - } +func (m *mockReadinessChecker) LetChecksReturn(result clients.ReadinessCheckResult) { + m.lock.Lock() + defer m.lock.Unlock() + m.nextResult = result } -func (cf clientFactoryWithExpected) ExpectedCallsLeft() []string { - var notCalled []string - for addr, stillExpected := range cf.expected { - if stillExpected { - notCalled = append(notCalled, addr) - } - } - return notCalled +func intoTurnedReady(urls ...string) []*adminapi.Client { + return lo.Map(urls, func(url string, _ int) *adminapi.Client { + return lo.Must(adminapi.NewTestClient(url)) + }) } -type alwaysSuccessClientFactory struct{} - -func (a alwaysSuccessClientFactory) CreateAdminAPIClient( - _ context.Context, - adminAPI adminapi.DiscoveredAdminAPI, -) (*adminapi.Client, error) { - return adminapi.NewTestClient(adminAPI.Address) +func intoTurnedPending(urls ...string) []adminapi.DiscoveredAdminAPI { + return lo.Map(urls, func(url string, _ int) adminapi.DiscoveredAdminAPI { + return testDiscoveredAdminAPI(url) + }) } -func TestClientAddressesNotifications(t *testing.T) { - var ( - logger = logrus.New() - expected = map[string]bool{} - serverCalls int32 - ) - - const numberOfServers = 2 - - createTestServer := func() *httptest.Server { - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // This test server serves as kong Admin API checking that we only get - // as many calls as new clients requests. - // That said: when we have 1 manager with url1 and we receive a notification - // with url1 and url2 we should only create the second manager with - // url2 and leave the existing one (for url1) in place and reuse it. - - atomic.AddInt32(&serverCalls, 1) - n := int(atomic.LoadInt32(&serverCalls)) - - if n > numberOfServers { - t.Errorf("clients should only call out to the server %d times, but we received %d requests", - numberOfServers, n, - ) - } - })) - } - - srv := createTestServer() - defer srv.Close() - expected[srv.URL] = true - - srv2 := createTestServer() - defer srv2.Close() - expected[srv2.URL] = true - - testClientFactoryWithExpected := clientFactoryWithExpected{ - expected: expected, - t: t, - } +func TestAdminAPIClientsManager_OnNotifyClientsAreUpdatedAccordingly(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - initialClient, err := adminapi.NewTestClient("localhost:8083") + logger := logrus.New() + readinessChecker := &mockReadinessChecker{} + initialClient, err := adminapi.NewTestClient("https://localhost:8083") require.NoError(t, err) - manager, err := NewAdminAPIClientsManager( + manager, err := clients.NewAdminAPIClientsManager( ctx, logger, []*adminapi.Client{initialClient}, - testClientFactoryWithExpected, + readinessChecker, ) require.NoError(t, err) require.NotNil(t, manager) manager.Run() <-manager.Running() - defer testClientFactoryWithExpected.AssertExpectedCalls() - - requireClientsCountEventually := func(t *testing.T, c *AdminAPIClientsManager, addresses []string, args ...any) { + requireClientsMatchEventually := func(t *testing.T, c *clients.AdminAPIClientsManager, addresses []string, args ...any) { require.Eventually(t, func() bool { clientAddresses := lo.Map(c.GatewayClients(), func(cl *adminapi.Client, _ int) string { return cl.BaseRootURL() }) return slices.Equal(addresses, clientAddresses) - }, time.Second, time.Millisecond, args..., - ) + }, time.Second, time.Millisecond, args...) } - requireClientsCountEventually(t, manager, []string{"localhost:8083"}, + requireClientsMatchEventually(t, manager, []string{initialClient.BaseRootURL()}, "initially there should be the initial client") - manager.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(srv.URL)}) - requireClientsCountEventually(t, manager, []string{srv.URL}, + readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{ClientsTurnedReady: intoTurnedReady(testURL1)}) + manager.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1)}) + requireClientsMatchEventually(t, manager, []string{testURL1}, "after notifying about a new address we should get 1 client eventually") - manager.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(srv.URL)}) - requireClientsCountEventually(t, manager, []string{srv.URL}, + readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{}) + manager.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1)}) + requireClientsMatchEventually(t, manager, []string{testURL1}, "after notifying the same address there's no update in clients") - manager.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(srv.URL), testDiscoveredAdminAPI(srv2.URL)}) - requireClientsCountEventually(t, manager, []string{srv.URL, srv2.URL}, - "after notifying new address set including the old already existing one we get both the old and the new") + manager.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1), testDiscoveredAdminAPI(testURL2)}) + requireClientsMatchEventually(t, manager, []string{testURL1}, + "after notifying new address set including the old already existing one but new one not yet ready we get just the old one") + + readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{ClientsTurnedReady: intoTurnedReady(testURL2)}) + manager.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1), testDiscoveredAdminAPI(testURL2)}) + requireClientsMatchEventually(t, manager, []string{testURL1, testURL2}, + "after notifying new address set including the old already existing one and new one turning ready we get both the old and the new") + + readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{}) + manager.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1), testDiscoveredAdminAPI(testURL2)}) + requireClientsMatchEventually(t, manager, []string{testURL1, testURL2}, + "after notifying again with the same set of URLs should not change the existing URLs") - manager.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(srv.URL), testDiscoveredAdminAPI(srv2.URL)}) - requireClientsCountEventually(t, manager, []string{srv.URL, srv2.URL}, - "notifying again with the same set of URLs should not change the existing URLs") + readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{ClientsTurnedPending: intoTurnedPending(testURL2)}) + manager.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1), testDiscoveredAdminAPI(testURL2)}) + requireClientsMatchEventually(t, manager, []string{testURL1}, + "after notifying the same address set with one turning pending, we get only one client") - manager.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(srv.URL)}) - requireClientsCountEventually(t, manager, []string{srv.URL}, + readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{}) + manager.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1)}) + requireClientsMatchEventually(t, manager, []string{testURL1}, "notifying again with just one URL should decrease the set of URLs to just this one") manager.Notify([]adminapi.DiscoveredAdminAPI{}) - requireClientsCountEventually(t, manager, []string{}) - - // We could test here notifying about srv.URL and srv2.URL again but there's - // no data structure in the manager that could notify us about a removal of - // a manager which we could use here. + requireClientsMatchEventually(t, manager, []string{}) cancel() require.NotPanics(t, func() { manager.Notify([]adminapi.DiscoveredAdminAPI{}) }, "notifying about new clients after manager has been shut down shouldn't panic") } -func TestClientAdjustInternalClientsAfterNotification(t *testing.T) { - var ( - ctx = context.Background() - logger = logrus.New() - ) - - cf := &clientFactoryWithExpected{ - t: t, - } - - // Initial client is expected to be replaced later on. - testClient, err := adminapi.NewTestClient("localhost:8083") - require.NoError(t, err) - manager, err := NewAdminAPIClientsManager(ctx, logger, []*adminapi.Client{testClient}, cf) - require.NoError(t, err) - require.NotNil(t, manager) - manager.Run() - <-manager.Running() - - clients := manager.GatewayClients() - require.Len(t, clients, 1) - require.Equal(t, "localhost:8083", clients[0].BaseRootURL()) - - requireNoExpectedCallsLeftEventually := func(t *testing.T) { - require.Eventually(t, func() bool { - return len(cf.ExpectedCallsLeft()) == 0 - }, time.Second, time.Millisecond) - } - - t.Run("2 new clients", func(t *testing.T) { - // Change expected addresses - cf.expected = map[string]bool{"localhost:8080": true, "localhost:8081": true} - // there are 2 addresses contained in the notification of which 2 are new - // and client creator should be called exactly 2 times - clients := []adminapi.DiscoveredAdminAPI{ - testDiscoveredAdminAPI("localhost:8080"), - testDiscoveredAdminAPI("localhost:8081"), - } - changed := manager.adjustGatewayClients(clients) - require.True(t, changed) - requireNoExpectedCallsLeftEventually(t) - - changed = manager.adjustGatewayClients(clients) - require.False(t, changed, "adjusting clients with the same set of addresses should not change anything") - }) - - t.Run("1 addresses, no new client", func(t *testing.T) { - // Change expected addresses - cf.expected = map[string]bool{} - // there is address contained in the notification but a client for that - // address already exists, client creator should not be called - clients := []adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI("localhost:8080")} - changed := manager.adjustGatewayClients(clients) - require.True(t, changed) - requireNoExpectedCallsLeftEventually(t) - - changed = manager.adjustGatewayClients(clients) - require.False(t, changed, "adjusting clients with the same set of addresses should not change anything") - }) - - t.Run("2 addresses, 1 new client", func(t *testing.T) { - // Change expected addresses - cf.expected = map[string]bool{"localhost:8081": true} - // there are 2 addresses contained in the notification but only 1 is new - // hence the client creator should be called only once - clients := []adminapi.DiscoveredAdminAPI{ - testDiscoveredAdminAPI("localhost:8080"), - testDiscoveredAdminAPI("localhost:8081"), - } - changed := manager.adjustGatewayClients(clients) - require.True(t, changed) - requireNoExpectedCallsLeftEventually(t) - - changed = manager.adjustGatewayClients(clients) - require.False(t, changed, "adjusting clients with the same set of addresses should not change anything") - }) - - t.Run("0 addresses", func(t *testing.T) { - // Change expected addresses - cf.expected = map[string]bool{} - // there are 0 addresses contained in the notification hence the client - // creator should not be called - changed := manager.adjustGatewayClients([]adminapi.DiscoveredAdminAPI(nil)) - require.True(t, changed) - requireNoExpectedCallsLeftEventually(t) - - changed = manager.adjustGatewayClients(nil) - require.False(t, changed, "adjusting clients with the same set of addresses should not change anything") - }) -} - func TestNewAdminAPIClientsManager_NoInitialClientsDisallowed(t *testing.T) { - cf := &clientFactoryWithExpected{t: t} - _, err := NewAdminAPIClientsManager(context.Background(), logrus.New(), nil, cf) - require.Error(t, err) + _, err := clients.NewAdminAPIClientsManager(context.Background(), logrus.New(), nil, &mockReadinessChecker{}) + require.ErrorContains(t, err, "at least one initial client must be provided") } func TestAdminAPIClientsManager_NotRunningNotifyLoop(t *testing.T) { @@ -272,11 +132,11 @@ func TestAdminAPIClientsManager_NotRunningNotifyLoop(t *testing.T) { testClient, err := adminapi.NewTestClient("localhost:8080") require.NoError(t, err) - m, err := NewAdminAPIClientsManager( + m, err := clients.NewAdminAPIClientsManager( context.Background(), logrus.New(), []*adminapi.Client{testClient}, - &clientFactoryWithExpected{t: t}, + &mockReadinessChecker{}, ) require.NoError(t, err) @@ -292,11 +152,11 @@ func TestAdminAPIClientsManager_Clients(t *testing.T) { testClient, err := adminapi.NewTestClient("localhost:8080") require.NoError(t, err) - m, err := NewAdminAPIClientsManager( + m, err := clients.NewAdminAPIClientsManager( context.Background(), logrus.New(), []*adminapi.Client{testClient}, - &clientFactoryWithExpected{t: t}, + &mockReadinessChecker{}, ) require.NoError(t, err) require.Len(t, m.GatewayClients(), 1, "expecting one initial client") @@ -309,49 +169,15 @@ func TestAdminAPIClientsManager_Clients(t *testing.T) { require.Equal(t, konnectTestClient, m.KonnectClient(), "konnect client should be returned from KonnectClient") } -func TestAdminAPIClientsManager_GatewayClientsFromNotificationsAreExpectedToHavePodRef(t *testing.T) { - t.Parallel() - - cf := &clientFactoryWithExpected{t: t, expected: map[string]bool{"http://10.0.0.1:8080": true}} - testClient, err := adminapi.NewTestClient("http://localhost:8080") - require.NoError(t, err) - m, err := NewAdminAPIClientsManager( - context.Background(), - logrus.New(), - []*adminapi.Client{testClient}, - cf, - ) - require.NoError(t, err) - m.Run() - - m.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI("http://10.0.0.1:8080")}) - - require.Eventually(t, func() bool { - gwClients := m.GatewayClients() - if len(gwClients) != 1 { - t.Log("there's no gateway clients...") - return false - } - _, ok := gwClients[0].PodReference() - if !ok { - t.Log("there's no pod reference attached") - } - return ok - }, time.Second, time.Millisecond) -} - func TestAdminAPIClientsManager_SubscribeToGatewayClientsChanges(t *testing.T) { t.Parallel() - cf := &clientFactoryWithExpected{t: t, expected: map[string]bool{ - "http://10.0.0.1:8080": true, - "http://10.0.0.2:8080": true, - }} + readinessChecker := &mockReadinessChecker{} testClient, err := adminapi.NewTestClient("http://localhost:8080") require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) - m, err := NewAdminAPIClientsManager(ctx, logrus.New(), []*adminapi.Client{testClient}, cf) + m, err := clients.NewAdminAPIClientsManager(ctx, logrus.New(), []*adminapi.Client{testClient}, readinessChecker) require.NoError(t, err) t.Run("no notify loop running should return false when subscribing", func(t *testing.T) { @@ -367,9 +193,10 @@ func TestAdminAPIClientsManager_SubscribeToGatewayClientsChanges(t *testing.T) { require.NotNil(t, ch) require.True(t, ok) + readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{ClientsTurnedReady: intoTurnedReady(testURL1, testURL2)}) m.Notify([]adminapi.DiscoveredAdminAPI{ - testDiscoveredAdminAPI("http://10.0.0.1:8080"), - testDiscoveredAdminAPI("http://10.0.0.2:8080"), + testDiscoveredAdminAPI(testURL1), + testDiscoveredAdminAPI(testURL2), }) select { @@ -389,7 +216,8 @@ func TestAdminAPIClientsManager_SubscribeToGatewayClientsChanges(t *testing.T) { require.NotNil(t, sub2) require.True(t, ok) - m.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI("http://10.0.0.1:8080")}) + readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{ClientsTurnedPending: intoTurnedPending(testURL2)}) + m.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1)}) select { case <-sub2: @@ -429,13 +257,14 @@ func TestAdminAPIClientsManager_SubscribeToGatewayClientsChanges(t *testing.T) { func TestAdminAPIClientsManager_ConcurrentNotify(t *testing.T) { t.Parallel() - cf := alwaysSuccessClientFactory{} - testClient, err := adminapi.NewTestClient("http://10.0.0.1:8080") + readinessChecker := &mockReadinessChecker{} + readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{ClientsTurnedReady: intoTurnedReady(testURL1)}) + testClient, err := adminapi.NewTestClient(testURL1) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - m, err := NewAdminAPIClientsManager(ctx, logrus.New(), []*adminapi.Client{testClient}, cf) + m, err := clients.NewAdminAPIClientsManager(ctx, logrus.New(), []*adminapi.Client{testClient}, readinessChecker) require.NoError(t, err) m.Run() @@ -451,7 +280,7 @@ func TestAdminAPIClientsManager_ConcurrentNotify(t *testing.T) { case <-ch: // Call GatewayClients() here to make sure that we can access the clients safely // from the subscriber goroutine without causing a deadlock in the notify loop. - require.Len(t, m.GatewayClients(), 1, "expected to get 1 client") + _ = m.GatewayClients() receivedNotificationsCount.Add(1) case <-ctx.Done(): return @@ -459,19 +288,16 @@ func TestAdminAPIClientsManager_ConcurrentNotify(t *testing.T) { } }() - // Run multiple notifiers in parallel to make sure that Notify is safe for concurrent use. - // We'll use two sets of clients interchangeably to cause an actual change that results in a notification. - oddClients := []adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI("http://10.0.0.1:8080")} - evenClients := []adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI("http://10.0.0.2:8080")} for i := 0; i < 10; i++ { i := i go func() { - // Notify with even or odd clients interchangeably depending on the iteration to trigger a change. + // Swap between ready and pending interchangeably depending on the iteration to trigger a change. if pickEven := i%2 == 0; pickEven { - m.Notify(evenClients) + readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{ClientsTurnedReady: intoTurnedReady(testURL1)}) } else { - m.Notify(oddClients) + readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{ClientsTurnedPending: intoTurnedPending(testURL1)}) } + m.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1)}) }() } @@ -485,13 +311,13 @@ func TestAdminAPIClientsManager_ConcurrentNotify(t *testing.T) { } func TestAdminAPIClientsManager_NotifiesSubscribersOnlyWhenGatewayClientsChange(t *testing.T) { - cf := alwaysSuccessClientFactory{} - testClient, err := adminapi.NewTestClient("http://10.0.0.1:8080") + testClient, err := adminapi.NewTestClient(testURL1) require.NoError(t, err) + readinessChecker := &mockReadinessChecker{} ctx, cancel := context.WithCancel(context.Background()) defer cancel() - m, err := NewAdminAPIClientsManager(ctx, logrus.New(), []*adminapi.Client{testClient}, cf) + m, err := clients.NewAdminAPIClientsManager(ctx, logrus.New(), []*adminapi.Client{testClient}, readinessChecker) require.NoError(t, err) m.Run() @@ -513,10 +339,10 @@ func TestAdminAPIClientsManager_NotifiesSubscribersOnlyWhenGatewayClientsChange( }() firstClientsSet := []adminapi.DiscoveredAdminAPI{ - testDiscoveredAdminAPI("http://10.0.0.1:8080"), + testDiscoveredAdminAPI(testURL1), } secondClientsSet := []adminapi.DiscoveredAdminAPI{ - testDiscoveredAdminAPI("http://10.0.0.2:8080"), + testDiscoveredAdminAPI(testURL2), } notificationsCountEventuallyEquals := func(expectedCount int) { require.Eventually(t, func() bool { @@ -540,7 +366,12 @@ func TestAdminAPIClientsManager_NotifiesSubscribersOnlyWhenGatewayClientsChange( m.Notify(nil) notificationsCountEventuallyEquals(1) - // Notify the second set of clients and make sure that the subscriber gets notified. + // Notify the second set of clients without making the new one ready and make sure that the subscriber gets no notification. + m.Notify(secondClientsSet) + notificationsCountEventuallyEquals(1) + + // Notify the second set of clients and make sure that the subscriber gets notified after the new one becomes ready. + readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{ClientsTurnedReady: intoTurnedReady(testURL2)}) m.Notify(secondClientsSet) notificationsCountEventuallyEquals(2) } diff --git a/internal/clients/readiness_test.go b/internal/clients/readiness_test.go index 0fc2336be1..3e2a939f25 100644 --- a/internal/clients/readiness_test.go +++ b/internal/clients/readiness_test.go @@ -150,6 +150,48 @@ func TestDefaultReadinessChecker(t *testing.T) { expectedTurnedReady: nil, expectedTurnedPending: nil, }, + { + name: "no clients at all", + expectedTurnedReady: nil, + expectedTurnedPending: nil, + }, + { + name: "multiple ready, one turning pending", + alreadyCreatedClients: []clients.AlreadyCreatedClient{ + mockAlreadyCreatedClient{ + url: testURL1, + isReady: true, + }, + mockAlreadyCreatedClient{ + url: testURL2, + isReady: false, // This one will turn pending. + }, + }, + expectedTurnedReady: nil, + expectedTurnedPending: []string{ + testURL2, + }, + }, + { + name: "multiple pending, one turning ready", + pendingClients: []adminapi.DiscoveredAdminAPI{ + { + Address: testURL1, + PodRef: testPodRef, + }, + { + Address: testURL2, + PodRef: testPodRef, + }, + }, + pendingClientsReadiness: map[string]bool{ + testURL1: false, + testURL2: true, // This one will turn ready. + }, + expectedTurnedReady: []string{ + testURL2, + }, + }, } for _, tc := range testCases { @@ -169,7 +211,7 @@ func TestDefaultReadinessChecker(t *testing.T) { require.Equal(t, 1, factory.callsCount[url.Address]) } - // For every already created client we expect no calls to CreateAdminAPIClient. + // For every already created client we expect NO calls to CreateAdminAPIClient. for _, url := range tc.alreadyCreatedClients { require.Zero(t, factory.callsCount[url.BaseRootURL()]) } From c960833aae10022f97dbe661eb53f2c8cc095707 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Burzy=C5=84ski?= Date: Tue, 18 Jul 2023 20:39:38 +0200 Subject: [PATCH 05/11] change the readinessProbe, adjust discoverer, align tests --- .../multi-gw/base/gateway_deployment.yaml | 2 +- .../single/all-in-one-dbless-enterprise.yaml | 2 +- .../all-in-one-dbless-k4k8s-enterprise.yaml | 2 +- .../all-in-one-dbless-konnect-enterprise.yaml | 2 +- deploy/single/all-in-one-dbless-konnect.yaml | 2 +- deploy/single/all-in-one-dbless.yaml | 2 +- internal/adminapi/endpoints.go | 2 +- internal/adminapi/endpoints_test.go | 27 ++- internal/clients/config_status_test.go | 5 +- internal/clients/manager.go | 67 +++--- internal/clients/manager_test.go | 192 ++++++++++++++---- internal/clients/readiness.go | 27 ++- internal/clients/readiness_test.go | 15 +- .../configuration/kongadminapi_controller.go | 3 +- .../kongadminapi_controller_envtest_test.go | 12 +- internal/manager/run.go | 7 +- 16 files changed, 246 insertions(+), 123 deletions(-) diff --git a/config/variants/multi-gw/base/gateway_deployment.yaml b/config/variants/multi-gw/base/gateway_deployment.yaml index 8e42d21a96..cc1e9c6fdf 100644 --- a/config/variants/multi-gw/base/gateway_deployment.yaml +++ b/config/variants/multi-gw/base/gateway_deployment.yaml @@ -99,7 +99,7 @@ spec: failureThreshold: 3 readinessProbe: httpGet: - path: /status + path: /status/ready port: 8100 scheme: HTTP initialDelaySeconds: 5 diff --git a/deploy/single/all-in-one-dbless-enterprise.yaml b/deploy/single/all-in-one-dbless-enterprise.yaml index 60247c1ada..9a1a72670d 100644 --- a/deploy/single/all-in-one-dbless-enterprise.yaml +++ b/deploy/single/all-in-one-dbless-enterprise.yaml @@ -2108,7 +2108,7 @@ spec: readinessProbe: failureThreshold: 3 httpGet: - path: /status + path: /status/ready port: 8100 scheme: HTTP initialDelaySeconds: 5 diff --git a/deploy/single/all-in-one-dbless-k4k8s-enterprise.yaml b/deploy/single/all-in-one-dbless-k4k8s-enterprise.yaml index 41f08bc09d..bcfe7e1ba0 100644 --- a/deploy/single/all-in-one-dbless-k4k8s-enterprise.yaml +++ b/deploy/single/all-in-one-dbless-k4k8s-enterprise.yaml @@ -2113,7 +2113,7 @@ spec: readinessProbe: failureThreshold: 3 httpGet: - path: /status + path: /status/ready port: 8100 scheme: HTTP initialDelaySeconds: 5 diff --git a/deploy/single/all-in-one-dbless-konnect-enterprise.yaml b/deploy/single/all-in-one-dbless-konnect-enterprise.yaml index 6277a4719d..6c3584fc54 100644 --- a/deploy/single/all-in-one-dbless-konnect-enterprise.yaml +++ b/deploy/single/all-in-one-dbless-konnect-enterprise.yaml @@ -2123,7 +2123,7 @@ spec: readinessProbe: failureThreshold: 3 httpGet: - path: /status + path: /status/ready port: 8100 scheme: HTTP initialDelaySeconds: 5 diff --git a/deploy/single/all-in-one-dbless-konnect.yaml b/deploy/single/all-in-one-dbless-konnect.yaml index 0696a21239..baf44b23cc 100644 --- a/deploy/single/all-in-one-dbless-konnect.yaml +++ b/deploy/single/all-in-one-dbless-konnect.yaml @@ -2123,7 +2123,7 @@ spec: readinessProbe: failureThreshold: 3 httpGet: - path: /status + path: /status/ready port: 8100 scheme: HTTP initialDelaySeconds: 5 diff --git a/deploy/single/all-in-one-dbless.yaml b/deploy/single/all-in-one-dbless.yaml index 58da6c20e5..eb912ab561 100644 --- a/deploy/single/all-in-one-dbless.yaml +++ b/deploy/single/all-in-one-dbless.yaml @@ -2108,7 +2108,7 @@ spec: readinessProbe: failureThreshold: 3 httpGet: - path: /status + path: /status/ready port: 8100 scheme: HTTP initialDelaySeconds: 5 diff --git a/internal/adminapi/endpoints.go b/internal/adminapi/endpoints.go index 675e091cc2..bbeeec0bf0 100644 --- a/internal/adminapi/endpoints.go +++ b/internal/adminapi/endpoints.go @@ -122,7 +122,7 @@ func (d *Discoverer) AdminAPIsFromEndpointSlice( } for _, e := range endpoints.Endpoints { - if e.Conditions.Ready == nil || !*e.Conditions.Ready { + if e.Conditions.Terminating != nil && *e.Conditions.Terminating { continue } diff --git a/internal/adminapi/endpoints_test.go b/internal/adminapi/endpoints_test.go index 63136cab93..d019892888 100644 --- a/internal/adminapi/endpoints_test.go +++ b/internal/adminapi/endpoints_test.go @@ -167,7 +167,7 @@ func TestDiscoverer_AddressesFromEndpointSlice(t *testing.T) { dnsStrategy: cfgtypes.ServiceScopedPodDNSStrategy, }, { - name: "not ready endpoints are not returned", + name: "not ready endpoints are returned", endpoints: discoveryv1.EndpointSlice{ ObjectMeta: endpointsSliceObjectMeta, AddressType: discoveryv1.AddressTypeIPv4, @@ -183,12 +183,18 @@ func TestDiscoverer_AddressesFromEndpointSlice(t *testing.T) { }, Ports: builder.NewEndpointPort(8444).WithName("admin").IntoSlice(), }, - portNames: sets.New("admin"), - want: sets.New[DiscoveredAdminAPI](), + portNames: sets.New("admin"), + want: sets.New[DiscoveredAdminAPI]( + DiscoveredAdminAPI{ + Address: "https://10.0.0.1:8444", + PodRef: k8stypes.NamespacedName{ + Name: "pod-1", Namespace: namespaceName, + }, + }), dnsStrategy: cfgtypes.IPDNSStrategy, }, { - name: "not ready and terminating endpoints are not returned", + name: "ready and terminating endpoints are not returned", endpoints: discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: uuid.NewString(), @@ -199,7 +205,7 @@ func TestDiscoverer_AddressesFromEndpointSlice(t *testing.T) { { Addresses: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}, Conditions: discoveryv1.EndpointConditions{ - Ready: lo.ToPtr(false), + Ready: lo.ToPtr(true), Terminating: lo.ToPtr(true), }, TargetRef: testPodReference(namespaceName, "pod-1"), @@ -237,7 +243,7 @@ func TestDiscoverer_AddressesFromEndpointSlice(t *testing.T) { Addresses: []string{"10.0.2.1"}, Conditions: discoveryv1.EndpointConditions{ Ready: lo.ToPtr(false), - Terminating: lo.ToPtr(false), + Terminating: lo.ToPtr(true), }, TargetRef: testPodReference(namespaceName, "pod-3"), }, @@ -289,7 +295,7 @@ func TestDiscoverer_AddressesFromEndpointSlice(t *testing.T) { Addresses: []string{"10.0.2.1"}, Conditions: discoveryv1.EndpointConditions{ Ready: lo.ToPtr(false), - Terminating: lo.ToPtr(false), + Terminating: lo.ToPtr(true), }, TargetRef: testPodReference(namespaceName, "pod-3"), }, @@ -551,7 +557,7 @@ func TestDiscoverer_GetAdminAPIsForService(t *testing.T) { Addresses: []string{"8.0.0.1"}, Conditions: discoveryv1.EndpointConditions{ Ready: lo.ToPtr(false), - Terminating: lo.ToPtr(false), + Terminating: lo.ToPtr(true), }, TargetRef: testPodReference(namespaceName, "pod-3"), }, @@ -637,7 +643,7 @@ func TestDiscoverer_GetAdminAPIsForService(t *testing.T) { dnsStrategy: cfgtypes.IPDNSStrategy, }, { - name: "not Ready Endpoints are not matched", + name: "terminating Endpoints are not matched", service: k8stypes.NamespacedName{ Namespace: namespaceName, Name: serviceName, @@ -652,7 +658,8 @@ func TestDiscoverer_GetAdminAPIsForService(t *testing.T) { { Addresses: []string{"7.0.0.1"}, Conditions: discoveryv1.EndpointConditions{ - Ready: lo.ToPtr(false), + Ready: lo.ToPtr(false), + Terminating: lo.ToPtr(true), }, TargetRef: testPodReference(namespaceName, "pod-1"), }, diff --git a/internal/clients/config_status_test.go b/internal/clients/config_status_test.go index d79576f421..c07bfb56af 100644 --- a/internal/clients/config_status_test.go +++ b/internal/clients/config_status_test.go @@ -5,15 +5,14 @@ import ( "testing" "time" - "github.com/go-logr/logr/testr" + "github.com/go-logr/logr" "github.com/stretchr/testify/require" "github.com/kong/kubernetes-ingress-controller/v2/internal/clients" ) func TestChannelConfigNotifier(t *testing.T) { - logger := testr.New(t) - n := clients.NewChannelConfigNotifier(logger) + n := clients.NewChannelConfigNotifier(logr.Discard()) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() diff --git a/internal/clients/manager.go b/internal/clients/manager.go index fc0a02a791..d9e9f8bb3b 100644 --- a/internal/clients/manager.go +++ b/internal/clients/manager.go @@ -6,18 +6,19 @@ import ( "sync" "time" - "github.com/kong/kubernetes-ingress-controller/v2/internal/util/clock" "github.com/samber/lo" "github.com/sirupsen/logrus" "golang.org/x/exp/maps" "github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi" + "github.com/kong/kubernetes-ingress-controller/v2/internal/util/clock" ) // DefaultReadinessReconciliationInterval is the interval at which the manager will run readiness reconciliation loop. -// It's the same as the default interval of the readiness probe. +// It's the same as the default interval of a Kubernetes container's readiness probe. const DefaultReadinessReconciliationInterval = 10 * time.Second +// ClientFactory is responsible for creating Admin API clients. type ClientFactory interface { CreateAdminAPIClient(ctx context.Context, address adminapi.DiscoveredAdminAPI) (*adminapi.Client, error) } @@ -29,6 +30,7 @@ type AdminAPIClientsProvider interface { GatewayClients() []*adminapi.Client } +// Ticker is an interface that allows to control a ticker. type Ticker interface { Stop() Channel() <-chan time.Time @@ -36,8 +38,9 @@ type Ticker interface { } // AdminAPIClientsManager keeps track of current Admin API clients of Gateways that we should configure. -// In particular, it can be notified about the clients' list update with use of Notify method, and queried -// for the latest slice of those with use of Clients method. +// In particular, it can be notified about the discovered clients' list with use of Notify method, and queried +// for the latest slice of ready to be configured clients with use of GatewayClients method. It also runs periodic +// readiness reconciliation loop which is responsible for checking readiness of the clients. type AdminAPIClientsManager struct { // discoveredAdminAPIsNotifyChan is used for notifications that contain Admin API // endpoints list that should be used for configuring the dataplane. @@ -47,7 +50,7 @@ type AdminAPIClientsManager struct { ctx context.Context onceNotifyLoopRunning sync.Once running chan struct{} - isNotifyLoopRunning bool + isRunning bool // readyGatewayClients represent all Kong Gateway data-planes that are ready to be configured. readyGatewayClients map[string]*adminapi.Client @@ -59,9 +62,6 @@ type AdminAPIClientsManager struct { // readinessChecker is used to check readiness of the clients. readinessChecker ReadinessChecker - // readinessReconciliationInterval is the interval at which the manager will run readiness reconciliation loop. - readinessReconciliationInterval time.Duration - // readinessReconciliationTicker is used to run readiness reconciliation loop. readinessReconciliationTicker Ticker @@ -99,15 +99,14 @@ func NewAdminAPIClientsManager( return c.BaseRootURL(), c }) c := &AdminAPIClientsManager{ - readyGatewayClients: readyClients, - pendingGatewayClients: make(map[string]adminapi.DiscoveredAdminAPI), - readinessChecker: readinessChecker, - readinessReconciliationTicker: clock.NewTicker(), - readinessReconciliationInterval: DefaultReadinessReconciliationInterval, - discoveredAdminAPIsNotifyChan: make(chan []adminapi.DiscoveredAdminAPI), - ctx: ctx, - running: make(chan struct{}), - logger: logger, + readyGatewayClients: readyClients, + pendingGatewayClients: make(map[string]adminapi.DiscoveredAdminAPI), + readinessChecker: readinessChecker, + readinessReconciliationTicker: clock.NewTicker(), + discoveredAdminAPIsNotifyChan: make(chan []adminapi.DiscoveredAdminAPI), + ctx: ctx, + running: make(chan struct{}), + logger: logger, } for _, opt := range opts { @@ -123,13 +122,14 @@ func (c *AdminAPIClientsManager) Running() chan struct{} { } // Run runs a goroutine that will dynamically ingest new addresses of Kong Admin API endpoints. +// It should only be called when Gateway Discovery is enabled. func (c *AdminAPIClientsManager) Run() { c.onceNotifyLoopRunning.Do(func() { go c.gatewayClientsReconciliationLoop() c.lock.Lock() defer c.lock.Unlock() - c.isNotifyLoopRunning = true + c.isRunning = true }) } @@ -193,7 +193,7 @@ func (c *AdminAPIClientsManager) SubscribeToGatewayClientsChanges() (<-chan stru } // No notify loop running, there will be no updates, let's propagate that to the caller. - if !c.isNotifyLoopRunning { + if !c.isRunning { return nil, false } @@ -202,28 +202,18 @@ func (c *AdminAPIClientsManager) SubscribeToGatewayClientsChanges() (<-chan stru return ch, true } -// gatewayClientsReconciliationLoop is an inner loop listening on notifyChan which are received via -// Notify() calls. Each time it receives on notifyChan tt will take the provided -// list of addresses and update the internally held list of clients such that: -// - the internal list of kong clients contains only the provided addresses -// - if a client for a provided address already exists it's not recreated again -// (hence no external calls are made to check the provided endpoint if there -// exists a client already using it) -// - client that do not exist in the provided address list are removed if they -// are present in the current state -// -// This function will acquire the internal lock to prevent the modification of -// internal clients list. +// gatewayClientsReconciliationLoop is an inner loop listening on: +// - discoveredAdminAPIsNotifyChan - triggered on every Notify() call. +// - readinessReconciliationTicker - triggered on every readinessReconciliationTicker tick. func (c *AdminAPIClientsManager) gatewayClientsReconciliationLoop() { - close(c.running) - c.readinessReconciliationTicker.Reset(c.readinessReconciliationInterval) + c.readinessReconciliationTicker.Reset(DefaultReadinessReconciliationInterval) defer c.readinessReconciliationTicker.Stop() + close(c.running) for { select { case <-c.ctx.Done(): c.logger.Infof("closing AdminAPIClientsManager: %s", c.ctx.Err()) - close(c.discoveredAdminAPIsNotifyChan) c.closeGatewayClientsSubscribers() return case discoveredAdminAPIs := <-c.discoveredAdminAPIsNotifyChan: @@ -234,6 +224,9 @@ func (c *AdminAPIClientsManager) gatewayClientsReconciliationLoop() { } } +// onDiscoveredAdminAPIsNotification is called when a new notification about Admin API addresses change is received. +// It will adjust lists of gateway clients and notify subscribers about the change if readyGatewayClients list has +// changed. func (c *AdminAPIClientsManager) onDiscoveredAdminAPIsNotification(discoveredAdminAPIs []adminapi.DiscoveredAdminAPI) { c.lock.Lock() defer c.lock.Unlock() @@ -246,6 +239,8 @@ func (c *AdminAPIClientsManager) onDiscoveredAdminAPIsNotification(discoveredAdm } } +// onReadinessReconciliationTick is called on every readinessReconciliationTicker tick. It will reconcile readiness +// of all gateway clients and notify subscribers about the change if readyGatewayClients list has changed. func (c *AdminAPIClientsManager) onReadinessReconciliationTick() { c.lock.Lock() defer c.lock.Unlock() @@ -275,7 +270,7 @@ func (c *AdminAPIClientsManager) adjustGatewayClients(discoveredAdminAPIs []admi return true } - // Make sure all discovered clients that are not on ready list are in the pending list. + // Make sure all discovered clients that are not in the ready list are in the pending list. for _, d := range discoveredAdminAPIs { if _, ok := c.readyGatewayClients[d.Address]; !ok { c.pendingGatewayClients[d.Address] = d @@ -316,7 +311,7 @@ func (c *AdminAPIClientsManager) adjustGatewayClients(discoveredAdminAPIs []admi func (c *AdminAPIClientsManager) reconcileGatewayClientsReadiness() bool { // Reset the ticker after each readiness reconciliation despite the trigger (whether it was a tick or a notification). // It's to ensure that the readiness is not reconciled too often when we receive a lot of notifications. - defer c.readinessReconciliationTicker.Reset(c.readinessReconciliationInterval) + defer c.readinessReconciliationTicker.Reset(DefaultReadinessReconciliationInterval) readinessCheckResult := c.readinessChecker.CheckReadiness( c.ctx, diff --git a/internal/clients/manager_test.go b/internal/clients/manager_test.go index 0a591155fa..6493aca026 100644 --- a/internal/clients/manager_test.go +++ b/internal/clients/manager_test.go @@ -3,32 +3,51 @@ package clients_test import ( "context" "sync" + "sync/atomic" "testing" "time" - "github.com/kong/kubernetes-ingress-controller/v2/internal/clients" + "github.com/google/go-cmp/cmp" "github.com/samber/lo" + "github.com/samber/mo" "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" - "go.uber.org/atomic" "golang.org/x/exp/slices" k8stypes "k8s.io/apimachinery/pkg/types" "github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi" + "github.com/kong/kubernetes-ingress-controller/v2/internal/clients" + "github.com/kong/kubernetes-ingress-controller/v2/test/mocks" ) +type readinessCheckCall struct { + AlreadyCreatedURLs []string + PendingURLs []string +} + type mockReadinessChecker struct { nextResult clients.ReadinessCheckResult + lastCall mo.Option[readinessCheckCall] + callsCount int lock sync.RWMutex } func (m *mockReadinessChecker) CheckReadiness( - context.Context, - []clients.AlreadyCreatedClient, - []adminapi.DiscoveredAdminAPI, + _ context.Context, + alreadyCreatedClients []clients.AlreadyCreatedClient, + pendingClients []adminapi.DiscoveredAdminAPI, ) clients.ReadinessCheckResult { - m.lock.RLock() - defer m.lock.RUnlock() + m.lock.Lock() + defer m.lock.Unlock() + m.callsCount++ + m.lastCall = mo.Some(readinessCheckCall{ + AlreadyCreatedURLs: lo.Map(alreadyCreatedClients, func(c clients.AlreadyCreatedClient, _ int) string { + return c.BaseRootURL() + }), + PendingURLs: lo.Map(pendingClients, func(c adminapi.DiscoveredAdminAPI, _ int) string { + return c.Address + }), + }) return m.nextResult } @@ -38,6 +57,21 @@ func (m *mockReadinessChecker) LetChecksReturn(result clients.ReadinessCheckResu m.nextResult = result } +func (m *mockReadinessChecker) LastCall() (readinessCheckCall, bool) { + m.lock.RLock() + defer m.lock.RUnlock() + if call, ok := m.lastCall.Get(); ok { + return call, true + } + return readinessCheckCall{}, false +} + +func (m *mockReadinessChecker) CallsCount() int { + m.lock.RLock() + defer m.lock.RUnlock() + return m.callsCount +} + func intoTurnedReady(urls ...string) []*adminapi.Client { return lo.Map(urls, func(url string, _ int) *adminapi.Client { return lo.Must(adminapi.NewTestClient(url)) @@ -255,8 +289,6 @@ func TestAdminAPIClientsManager_SubscribeToGatewayClientsChanges(t *testing.T) { } func TestAdminAPIClientsManager_ConcurrentNotify(t *testing.T) { - t.Parallel() - readinessChecker := &mockReadinessChecker{} readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{ClientsTurnedReady: intoTurnedReady(testURL1)}) testClient, err := adminapi.NewTestClient(testURL1) @@ -268,49 +300,30 @@ func TestAdminAPIClientsManager_ConcurrentNotify(t *testing.T) { require.NoError(t, err) m.Run() - var receivedNotificationsCount atomic.Uint32 - ch, ok := m.SubscribeToGatewayClientsChanges() - require.NotNil(t, ch) - require.True(t, ok) - - // Run subscriber worker in a separate goroutine to consume notifications. + // Run a goroutine that will call GatewayClients() every millisecond. go func() { for { select { - case <-ch: - // Call GatewayClients() here to make sure that we can access the clients safely - // from the subscriber goroutine without causing a deadlock in the notify loop. + case <-time.Tick(time.Millisecond): _ = m.GatewayClients() - receivedNotificationsCount.Add(1) case <-ctx.Done(): return } } }() - for i := 0; i < 10; i++ { - i := i - go func() { - // Swap between ready and pending interchangeably depending on the iteration to trigger a change. - if pickEven := i%2 == 0; pickEven { - readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{ClientsTurnedReady: intoTurnedReady(testURL1)}) - } else { - readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{ClientsTurnedPending: intoTurnedPending(testURL1)}) - } - m.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1)}) - }() - } + go func() { + for i := 0; i < 100; i++ { + go m.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1)}) + } + }() require.Eventually(t, func() bool { - if count := receivedNotificationsCount.Load(); count < 2 { - t.Logf("Received %d notifications, expected at least 2, waiting...", count) - return false - } - return true - }, time.Second, time.Millisecond, "expected to receive at least 2 notifications") + return readinessChecker.CallsCount() == 100 + }, time.Second, time.Millisecond) } -func TestAdminAPIClientsManager_NotifiesSubscribersOnlyWhenGatewayClientsChange(t *testing.T) { +func TestAdminAPIClientsManager_GatewayClientsChanges(t *testing.T) { testClient, err := adminapi.NewTestClient(testURL1) require.NoError(t, err) @@ -319,7 +332,9 @@ func TestAdminAPIClientsManager_NotifiesSubscribersOnlyWhenGatewayClientsChange( defer cancel() m, err := clients.NewAdminAPIClientsManager(ctx, logrus.New(), []*adminapi.Client{testClient}, readinessChecker) require.NoError(t, err) + m.Run() + <-m.Running() var receivedNotificationsCount atomic.Uint32 ch, ok := m.SubscribeToGatewayClientsChanges() @@ -338,12 +353,8 @@ func TestAdminAPIClientsManager_NotifiesSubscribersOnlyWhenGatewayClientsChange( } }() - firstClientsSet := []adminapi.DiscoveredAdminAPI{ - testDiscoveredAdminAPI(testURL1), - } - secondClientsSet := []adminapi.DiscoveredAdminAPI{ - testDiscoveredAdminAPI(testURL2), - } + firstClientsSet := []adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1)} + secondClientsSet := []adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL2)} notificationsCountEventuallyEquals := func(expectedCount int) { require.Eventually(t, func() bool { if count := receivedNotificationsCount.Load(); count != uint32(expectedCount) { @@ -353,27 +364,120 @@ func TestAdminAPIClientsManager_NotifiesSubscribersOnlyWhenGatewayClientsChange( return true }, time.Second, time.Millisecond, "expected to receive %d notifications", expectedCount) } + requireLastReadinessCheckCall := func(expected readinessCheckCall) { + call, ok := readinessChecker.LastCall() + require.True(t, ok, "expected call to readiness checker") + require.Equal(t, expected, call) + } // Notify the first set of clients and make sure that the subscriber doesn't get notified as it was initial state. m.Notify(firstClientsSet) notificationsCountEventuallyEquals(0) + require.Equal(t, 1, readinessChecker.CallsCount(), "expected readiness check on non-empty set of clients") + requireLastReadinessCheckCall(readinessCheckCall{ + AlreadyCreatedURLs: []string{testURL1}, + PendingURLs: []string{}, + }) // Notify an empty set of clients and make sure that the subscriber get notified. m.Notify(nil) notificationsCountEventuallyEquals(1) + require.Equal(t, 1, readinessChecker.CallsCount(), "no readiness check should be performed when notifying an empty set") // Notify an empty set again and make sure that the subscriber doesn't get notified as the state didn't change. m.Notify(nil) notificationsCountEventuallyEquals(1) + require.Equal(t, 1, readinessChecker.CallsCount(), "no readiness check should be performed when notifying an empty set") // Notify the second set of clients without making the new one ready and make sure that the subscriber gets no notification. m.Notify(secondClientsSet) notificationsCountEventuallyEquals(1) + requireLastReadinessCheckCall(readinessCheckCall{ + AlreadyCreatedURLs: []string{}, + PendingURLs: []string{testURL2}, + }) + require.Equal(t, 2, readinessChecker.CallsCount(), "expected readiness check on non-empty set of clients") // Notify the second set of clients and make sure that the subscriber gets notified after the new one becomes ready. readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{ClientsTurnedReady: intoTurnedReady(testURL2)}) m.Notify(secondClientsSet) notificationsCountEventuallyEquals(2) + requireLastReadinessCheckCall(readinessCheckCall{ + AlreadyCreatedURLs: []string{}, + PendingURLs: []string{testURL2}, + }) + require.Equal(t, 3, readinessChecker.CallsCount(), "expected readiness check on non-empty set of clients") + + m.Notify([]adminapi.DiscoveredAdminAPI{firstClientsSet[0], secondClientsSet[0]}) + notificationsCountEventuallyEquals(3) + requireLastReadinessCheckCall(readinessCheckCall{ + AlreadyCreatedURLs: []string{testURL2}, + PendingURLs: []string{testURL1}, + }) + require.Equal(t, 4, readinessChecker.CallsCount(), "expected readiness check on non-empty set of clients") +} + +func TestAdminAPIClientsManager_PeriodicReadinessReconciliation(t *testing.T) { + testClient, err := adminapi.NewTestClient(testURL1) + require.NoError(t, err) + + readinessTicker := mocks.NewTicker() + readinessChecker := &mockReadinessChecker{} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + m, err := clients.NewAdminAPIClientsManager( + ctx, + logrus.New(), + []*adminapi.Client{testClient}, + readinessChecker, + clients.WithReadinessReconciliationTicker(readinessTicker), + ) + require.NoError(t, err) + m.Run() + <-m.Running() + + readinessCheckCallEventuallyMatches := func(expected readinessCheckCall) { + require.Eventually(t, func() bool { + lastCall, wasCalledAtAll := readinessChecker.LastCall() + if !wasCalledAtAll { + t.Log("Readiness checker was not called yet, waiting...") + return false + } + if diff := cmp.Diff(expected, lastCall); diff != "" { + t.Logf("Readiness checker was called with unexpected arguments: %s", diff) + return false + } + return true + }, time.Second, time.Millisecond) + } + + // Trigger the first readiness check. + readinessTicker.Add(clients.DefaultReadinessReconciliationInterval) + readinessCheckCallEventuallyMatches(readinessCheckCall{ + AlreadyCreatedURLs: []string{testURL1}, + PendingURLs: []string{}, + }) + require.Equal(t, 1, readinessChecker.CallsCount()) + + // Notify with a new client and check the readiness check call was made as expected. + m.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1), testDiscoveredAdminAPI(testURL2)}) + readinessCheckCallEventuallyMatches(readinessCheckCall{ + AlreadyCreatedURLs: []string{testURL1}, + PendingURLs: []string{testURL2}, + }) + require.Equal(t, 2, readinessChecker.CallsCount()) + + // Trigger a next readiness check which will make testURL2 ready. + readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{ClientsTurnedReady: intoTurnedReady(testURL2)}) + readinessTicker.Add(clients.DefaultReadinessReconciliationInterval) + readinessCheckCallEventuallyMatches(readinessCheckCall{ + AlreadyCreatedURLs: []string{testURL1}, + PendingURLs: []string{testURL2}, + }) + require.Equal(t, 3, readinessChecker.CallsCount()) + require.True(t, lo.ContainsBy(m.GatewayClients(), func(c *adminapi.Client) bool { + return c.BaseRootURL() == testURL2 + }), "expected to find the new client in the manager's clients list after it became ready") } func testDiscoveredAdminAPI(address string) adminapi.DiscoveredAdminAPI { diff --git a/internal/clients/readiness.go b/internal/clients/readiness.go index 6a717c5613..97c8952704 100644 --- a/internal/clients/readiness.go +++ b/internal/clients/readiness.go @@ -3,21 +3,34 @@ package clients import ( "context" - "github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi" "github.com/sirupsen/logrus" k8stypes "k8s.io/apimachinery/pkg/types" + + "github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi" ) +// ReadinessCheckResult represents the result of a readiness check. type ReadinessCheckResult struct { - ClientsTurnedReady []*adminapi.Client + // ClientsTurnedReady are the clients that were pending and are now ready to be used. + ClientsTurnedReady []*adminapi.Client + // ClientsTurnedPending are the clients that were ready and are now pending to be created. ClientsTurnedPending []adminapi.DiscoveredAdminAPI } +// HasChanges returns true if there are any changes in the readiness check result. +// When no changes are present, it means that the readiness check haven't successfully created any pending client +// nor detected any already created client that became not ready. func (r ReadinessCheckResult) HasChanges() bool { return len(r.ClientsTurnedReady) > 0 || len(r.ClientsTurnedPending) > 0 } +// ReadinessChecker is responsible for checking the readiness of Admin API clients. type ReadinessChecker interface { + // CheckReadiness checks readiness of the provided clients: + // - alreadyCreatedClients are the clients that have already been created. The readiness of these clients will be + // checked by their IsReady() method. + // - pendingClients are the clients that have not been created yet and are pending to be created. The readiness of + // these clients will be checked by trying to create them. CheckReadiness( ctx context.Context, alreadyCreatedClients []AlreadyCreatedClient, @@ -25,6 +38,7 @@ type ReadinessChecker interface { ) ReadinessCheckResult } +// AlreadyCreatedClient represents an Admin API client that has already been created. type AlreadyCreatedClient interface { IsReady(context.Context) error PodReference() (k8stypes.NamespacedName, bool) @@ -50,10 +64,11 @@ func (c DefaultReadinessChecker) CheckReadiness( ) ReadinessCheckResult { return ReadinessCheckResult{ ClientsTurnedReady: c.checkPendingGatewayClients(ctx, pendingClients), - ClientsTurnedPending: c.checkActiveGatewayClients(ctx, readyClients), + ClientsTurnedPending: c.checkAlreadyExistingClients(ctx, readyClients), } } +// checkPendingGatewayClients checks if the pending clients are ready to be used and returns the ones that are. func (c DefaultReadinessChecker) checkPendingGatewayClients(ctx context.Context, lastPending []adminapi.DiscoveredAdminAPI) (turnedReady []*adminapi.Client) { for _, adminAPI := range lastPending { // We indirectly check readiness of the client by trying to create it. If it succeeds then it means that @@ -70,9 +85,11 @@ func (c DefaultReadinessChecker) checkPendingGatewayClients(ctx context.Context, return turnedReady } -func (c DefaultReadinessChecker) checkActiveGatewayClients(ctx context.Context, lastActive []AlreadyCreatedClient) (turnedPending []adminapi.DiscoveredAdminAPI) { +// checkAlreadyExistingClients checks if the already existing clients are still ready to be used and returns the ones +// that are not. +func (c DefaultReadinessChecker) checkAlreadyExistingClients(ctx context.Context, lastActive []AlreadyCreatedClient) (turnedPending []adminapi.DiscoveredAdminAPI) { for _, client := range lastActive { - // For active clients we check readiness by calling the Status endpoint. + // For ready clients we check readiness by calling the Status endpoint. if err := client.IsReady(ctx); err != nil { // Despite the error reason we still want to keep the client in the pending list to retry later. c.logger.WithError(err).Debugf("active client for %q is not ready, moving to pending", client.BaseRootURL()) diff --git a/internal/clients/readiness_test.go b/internal/clients/readiness_test.go index 3e2a939f25..cb4d35e544 100644 --- a/internal/clients/readiness_test.go +++ b/internal/clients/readiness_test.go @@ -6,12 +6,13 @@ import ( "fmt" "testing" - "github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi" - "github.com/kong/kubernetes-ingress-controller/v2/internal/clients" "github.com/samber/lo" "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" k8stypes "k8s.io/apimachinery/pkg/types" + + "github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi" + "github.com/kong/kubernetes-ingress-controller/v2/internal/clients" ) const ( @@ -19,12 +20,10 @@ const ( testURL2 = "http://localhost:8002" ) -var ( - testPodRef = k8stypes.NamespacedName{ - Namespace: "default", - Name: "mock", - } -) +var testPodRef = k8stypes.NamespacedName{ + Namespace: "default", + Name: "mock", +} type mockClientFactory struct { ready map[string]bool // Maps address to readiness. diff --git a/internal/controllers/configuration/kongadminapi_controller.go b/internal/controllers/configuration/kongadminapi_controller.go index 96e4ae8abc..a5b6866b49 100644 --- a/internal/controllers/configuration/kongadminapi_controller.go +++ b/internal/controllers/configuration/kongadminapi_controller.go @@ -166,8 +166,9 @@ func (r *KongAdminAPIServiceReconciler) Reconcile(ctx context.Context, req ctrl. func (r *KongAdminAPIServiceReconciler) notify() { discovered := flattenDiscoveredAdminAPIs(r.Cache) + addresses := lo.Map(discovered, func(d adminapi.DiscoveredAdminAPI, _ int) string { return d.Address }) r.Log.V(util.DebugLevel). - Info("notifying about newly detected Admin APIs", "admin_apis", discovered) + Info("notifying about newly detected Admin APIs", "admin_apis", addresses) r.EndpointsNotifier.Notify(discovered) } diff --git a/internal/controllers/configuration/kongadminapi_controller_envtest_test.go b/internal/controllers/configuration/kongadminapi_controller_envtest_test.go index 1b4b56fe92..5ab6e303d1 100644 --- a/internal/controllers/configuration/kongadminapi_controller_envtest_test.go +++ b/internal/controllers/configuration/kongadminapi_controller_envtest_test.go @@ -192,7 +192,7 @@ func TestKongAdminAPIController(t *testing.T) { ) }) - t.Run("not Ready Endpoints are not matched", func(t *testing.T) { + t.Run("terminating Endpoints are not matched", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() adminService, adminPod, n := startKongAdminAPIServiceReconciler(ctx, t, client, cfg) @@ -218,7 +218,7 @@ func TestKongAdminAPIController(t *testing.T) { { Addresses: []string{"10.0.0.1"}, Conditions: discoveryv1.EndpointConditions{ - Ready: lo.ToPtr(false), + Terminating: lo.ToPtr(true), }, TargetRef: &corev1.ObjectReference{ Kind: "Pod", @@ -459,16 +459,16 @@ func TestKongAdminAPIController(t *testing.T) { n.LastNotified(), ) - // Update all endpoints so that they are not Ready. + // Update all endpoints so that they are Terminating. for i := range endpoints.Endpoints { - endpoints.Endpoints[i].Conditions.Ready = lo.ToPtr(false) + endpoints.Endpoints[i].Conditions.Terminating = lo.ToPtr(true) } require.NoError(t, client.Update(ctx, &endpoints, &ctrlclient.UpdateOptions{})) require.NoError(t, client.Get(ctx, k8stypes.NamespacedName{Name: endpoints.Name, Namespace: endpoints.Namespace}, &endpoints, &ctrlclient.GetOptions{})) assert.Eventually(t, func() bool { return len(n.LastNotified()) == 0 }, 3*time.Second, time.Millisecond) - // Update 1 endpoint so that that it's Ready. - endpoints.Endpoints[0].Conditions.Ready = lo.ToPtr(true) + // Update 1 endpoint so that it's not Terminating. + endpoints.Endpoints[0].Conditions.Terminating = nil require.NoError(t, client.Update(ctx, &endpoints, &ctrlclient.UpdateOptions{})) require.NoError(t, client.Get(ctx, k8stypes.NamespacedName{Name: endpoints.Name, Namespace: endpoints.Namespace}, &endpoints, &ctrlclient.GetOptions{})) diff --git a/internal/manager/run.go b/internal/manager/run.go index f26d77395e..75c147f12e 100644 --- a/internal/manager/run.go +++ b/internal/manager/run.go @@ -134,18 +134,19 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic, d setupLog.Info("Initializing Dataplane Client") eventRecorder := mgr.GetEventRecorderFor(KongClientEventRecorderComponentName) + readinessChecker := clients.NewDefaultReadinessChecker(adminAPIClientsFactory, deprecatedLogger) clientsManager, err := clients.NewAdminAPIClientsManager( ctx, deprecatedLogger, initialKongClients, - adminAPIClientsFactory, + readinessChecker, ) if err != nil { return fmt.Errorf("failed to create AdminAPIClientsManager: %w", err) } if c.KongAdminSvc.IsPresent() { - setupLog.Info("Running AdminAPIClientsManager notify loop") - clientsManager.RunNotifyLoop() + setupLog.Info("Running AdminAPIClientsManager loop") + clientsManager.Run() } setupLog.Info("Starting Admission Server") From e79e60924bd8505a90bcc6c9c6e52477b602f665 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Burzy=C5=84ski?= Date: Tue, 18 Jul 2023 20:58:09 +0200 Subject: [PATCH 06/11] changelog --- CHANGELOG.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index cda7f6e2f0..737aa5d900 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -117,6 +117,11 @@ Adding a new version? You'll need three changes: KIC restarts, it is now able to fetch the last good configuration from a running proxy instance and store it in its internal cache. [#4265](https://github.com/Kong/kubernetes-ingress-controller/pull/4265) +- Gateway Discovery feature was adapted to handle Gateways that are not ready yet + in terms of accepting data-plane traffic, but are ready to accept configuration + updates. The controller will now send configuration to such Gateways and will + actively monitor their readiness for accepting configuration updates. + [#4368](https://github.com/Kong/kubernetes-ingress-controller/pull/4368 ### Changed @@ -132,6 +137,10 @@ Adding a new version? You'll need three changes: sending stage (we've observed around 35% reduced time in config marshalling time but be aware that your mileage may vary). [#4222](https://github.com/Kong/kubernetes-ingress-controller/pull/4222) +- Changed the Gateway's readiness probe in all-in-one manifests from `/status` + to `/status/ready`. Gateways will be considered ready only after an initial + configuration is applied by the controller. + [#4368](https://github.com/Kong/kubernetes-ingress-controller/pull/4368 [gojson]: https://github.com/goccy/go-json [httproute-specification]: https://gateway-api.sigs.k8s.io/references/spec/#gateway.networking.k8s.io/v1beta1.HTTPRoute From 6a93c98c26ce11c7790fb522a9c8f3f218df72ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Burzy=C5=84ski?= Date: Tue, 18 Jul 2023 21:42:40 +0200 Subject: [PATCH 07/11] fix e2e --- internal/clients/manager.go | 41 +++++++++++++++++++++++-------------- test/e2e/helpers_test.go | 27 +++++++++++++++++++++++- test/e2e/kuma_test.go | 9 ++------ test/e2e/upgrade_test.go | 5 +++-- 4 files changed, 57 insertions(+), 25 deletions(-) diff --git a/internal/clients/manager.go b/internal/clients/manager.go index d9e9f8bb3b..c009e681c7 100644 --- a/internal/clients/manager.go +++ b/internal/clients/manager.go @@ -228,13 +228,17 @@ func (c *AdminAPIClientsManager) gatewayClientsReconciliationLoop() { // It will adjust lists of gateway clients and notify subscribers about the change if readyGatewayClients list has // changed. func (c *AdminAPIClientsManager) onDiscoveredAdminAPIsNotification(discoveredAdminAPIs []adminapi.DiscoveredAdminAPI) { - c.lock.Lock() - defer c.lock.Unlock() - c.logger.Debug("received notification about Admin API addresses change") - if clientsChanged := c.adjustGatewayClients(discoveredAdminAPIs); clientsChanged { - // Notify subscribers that the clients list has been updated. - c.logger.Debug("notifying subscribers about gateway clients change") + + changed := func() bool { + c.lock.Lock() + defer c.lock.Unlock() + clientsChanged := c.adjustGatewayClients(discoveredAdminAPIs) + readinessChanged := c.reconcileGatewayClientsReadiness() + return clientsChanged || readinessChanged + }() + + if changed { c.notifyGatewayClientsSubscribers() } } @@ -242,13 +246,15 @@ func (c *AdminAPIClientsManager) onDiscoveredAdminAPIsNotification(discoveredAdm // onReadinessReconciliationTick is called on every readinessReconciliationTicker tick. It will reconcile readiness // of all gateway clients and notify subscribers about the change if readyGatewayClients list has changed. func (c *AdminAPIClientsManager) onReadinessReconciliationTick() { - c.lock.Lock() - defer c.lock.Unlock() - c.logger.Debug("reconciling readiness of gateway clients") - if clientsChanged := c.reconcileGatewayClientsReadiness(); clientsChanged { - // Notify subscribers that the clients list has been updated. - c.logger.Debug("notifying subscribers about gateway clients change") + + changed := func() bool { + c.lock.Lock() + defer c.lock.Unlock() + return c.reconcileGatewayClientsReadiness() + }() + + if changed { c.notifyGatewayClientsSubscribers() } } @@ -258,7 +264,7 @@ func (c *AdminAPIClientsManager) onReadinessReconciliationTick() { // of the discovered Admin APIs and creates only those clients that we don't have. // It returns true if the gatewayClients slice has been changed, false otherwise. func (c *AdminAPIClientsManager) adjustGatewayClients(discoveredAdminAPIs []adminapi.DiscoveredAdminAPI) (changed bool) { - // Short circuit, + // Short circuit. if len(discoveredAdminAPIs) == 0 { // If we have no clients and the provided list is empty, it means we're in sync. No change was made. if len(c.readyGatewayClients) == 0 && len(c.pendingGatewayClients) == 0 { @@ -299,8 +305,7 @@ func (c *AdminAPIClientsManager) adjustGatewayClients(discoveredAdminAPIs []admi } } - readinessChanged := c.reconcileGatewayClientsReadiness() - return changed || readinessChanged + return changed } // reconcileGatewayClientsReadiness reconciles the readiness of the gateway clients. It ensures that the clients on the @@ -313,6 +318,11 @@ func (c *AdminAPIClientsManager) reconcileGatewayClientsReadiness() bool { // It's to ensure that the readiness is not reconciled too often when we receive a lot of notifications. defer c.readinessReconciliationTicker.Reset(DefaultReadinessReconciliationInterval) + // Short circuit. + if len(c.readyGatewayClients) == 0 && len(c.pendingGatewayClients) == 0 { + return false + } + readinessCheckResult := c.readinessChecker.CheckReadiness( c.ctx, lo.MapToSlice(c.readyGatewayClients, func(_ string, cl *adminapi.Client) AlreadyCreatedClient { return cl }), @@ -333,6 +343,7 @@ func (c *AdminAPIClientsManager) reconcileGatewayClientsReadiness() bool { // notifyGatewayClientsSubscribers sends notifications to all subscribers that have called SubscribeToGatewayClientsChanges. func (c *AdminAPIClientsManager) notifyGatewayClientsSubscribers() { + c.logger.Debug("notifying subscribers about gateway clients change") for _, sub := range c.gatewayClientsChangesSubscribers { select { case <-c.ctx.Done(): diff --git a/test/e2e/helpers_test.go b/test/e2e/helpers_test.go index 654efd2d24..34ecba22b7 100644 --- a/test/e2e/helpers_test.go +++ b/test/e2e/helpers_test.go @@ -294,7 +294,7 @@ func waitForDeploymentRollout(ctx context.Context, t *testing.T, env environment } if err := allUpdatedReplicasRolledOutAndReady(deployment); err != nil { - t.Logf("controller deployment not ready: %s", err) + t.Logf("%s/%s deployment not ready: %s", namespace, name, err) return false } @@ -913,3 +913,28 @@ func scaleDeployment(ctx context.Context, t *testing.T, env environments.Environ return deployment.Status.ReadyReplicas == replicas }, time.Minute*3, time.Second, "deployment %s did not scale to %d replicas", deployment.Name, replicas) } + +func (d Deployments) Restart(ctx context.Context, t *testing.T, env environments.Environment) { + t.Helper() + + err := env.Cluster().Client().CoreV1().Pods(d.ControllerNN.Namespace).DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{ + LabelSelector: metav1.FormatLabelSelector(&metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": d.ControllerNN.Name, + }, + }), + }) + require.NoError(t, err, "failed to delete controller pods") + + err = env.Cluster().Client().CoreV1().Pods(d.ControllerNN.Namespace).DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{ + LabelSelector: metav1.FormatLabelSelector(&metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": d.ProxyNN.Name, + }, + }), + }) + require.NoError(t, err, "failed to delete proxy pods") + + waitForDeploymentRollout(ctx, t, env, d.ControllerNN.Namespace, d.ControllerNN.Name) + waitForDeploymentRollout(ctx, t, env, d.ProxyNN.Namespace, d.ProxyNN.Name) +} diff --git a/test/e2e/kuma_test.go b/test/e2e/kuma_test.go index 336393d12b..a04d035ca2 100644 --- a/test/e2e/kuma_test.go +++ b/test/e2e/kuma_test.go @@ -28,13 +28,8 @@ func TestDeployAllInOneDBLESSKuma(t *testing.T) { require.NoError(t, kuma.EnableMeshForNamespace(ctx, env.Cluster(), "kong")) require.NoError(t, kuma.EnableMeshForNamespace(ctx, env.Cluster(), "default")) - // scale to force a restart of pods and trigger mesh injection (we can't annotate the Kong namespace in advance, - // it gets clobbered by deployKong()). is there a "rollout restart" in client-go? who knows! - scaleDeployment(ctx, t, env, deployments.ProxyNN, 0) - scaleDeployment(ctx, t, env, deployments.ControllerNN, 0) - - scaleDeployment(ctx, t, env, deployments.ProxyNN, 2) - scaleDeployment(ctx, t, env, deployments.ControllerNN, 2) + // Restart Kong pods to trigger mesh injection. + deployments.Restart(ctx, t, env) t.Log("running ingress tests to verify all-in-one deployed ingress controller and proxy are functional") deployIngressWithEchoBackends(ctx, t, env, numberOfEchoBackends) diff --git a/test/e2e/upgrade_test.go b/test/e2e/upgrade_test.go index 35495956ef..d96d69441a 100644 --- a/test/e2e/upgrade_test.go +++ b/test/e2e/upgrade_test.go @@ -110,8 +110,9 @@ func testManifestsUpgrade( t.Logf("deploying target version of kong manifests: %s", testParams.toManifestPath) deployments := ManifestDeploy{ - Path: testParams.toManifestPath, - SkipTestPatches: true, + Path: testParams.toManifestPath, + // Do not skip test patches - we want to verify that upgrade works with an image override in target manifest. + SkipTestPatches: false, }.Run(ctx, t, env) if featureGates := testParams.controllerFeatureGates; featureGates != "" { From 35beb615c85a942faafc318d62887ceb0f90282d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Burzy=C5=84ski?= Date: Wed, 19 Jul 2023 10:05:49 +0200 Subject: [PATCH 08/11] address review comments --- internal/clients/manager.go | 34 ++++++++++++++-------------------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/internal/clients/manager.go b/internal/clients/manager.go index c009e681c7..dd8fd10f11 100644 --- a/internal/clients/manager.go +++ b/internal/clients/manager.go @@ -49,7 +49,7 @@ type AdminAPIClientsManager struct { ctx context.Context onceNotifyLoopRunning sync.Once - running chan struct{} + runningChan chan struct{} isRunning bool // readyGatewayClients represent all Kong Gateway data-planes that are ready to be configured. @@ -105,7 +105,7 @@ func NewAdminAPIClientsManager( readinessReconciliationTicker: clock.NewTicker(), discoveredAdminAPIsNotifyChan: make(chan []adminapi.DiscoveredAdminAPI), ctx: ctx, - running: make(chan struct{}), + runningChan: make(chan struct{}), logger: logger, } @@ -118,7 +118,7 @@ func NewAdminAPIClientsManager( // Running returns a channel that is closed when the manager's background tasks are already running. func (c *AdminAPIClientsManager) Running() chan struct{} { - return c.running + return c.runningChan } // Run runs a goroutine that will dynamically ingest new addresses of Kong Admin API endpoints. @@ -209,7 +209,7 @@ func (c *AdminAPIClientsManager) gatewayClientsReconciliationLoop() { c.readinessReconciliationTicker.Reset(DefaultReadinessReconciliationInterval) defer c.readinessReconciliationTicker.Stop() - close(c.running) + close(c.runningChan) for { select { case <-c.ctx.Done(): @@ -230,15 +230,9 @@ func (c *AdminAPIClientsManager) gatewayClientsReconciliationLoop() { func (c *AdminAPIClientsManager) onDiscoveredAdminAPIsNotification(discoveredAdminAPIs []adminapi.DiscoveredAdminAPI) { c.logger.Debug("received notification about Admin API addresses change") - changed := func() bool { - c.lock.Lock() - defer c.lock.Unlock() - clientsChanged := c.adjustGatewayClients(discoveredAdminAPIs) - readinessChanged := c.reconcileGatewayClientsReadiness() - return clientsChanged || readinessChanged - }() - - if changed { + clientsChanged := c.adjustGatewayClients(discoveredAdminAPIs) + readinessChanged := c.reconcileGatewayClientsReadiness() + if clientsChanged || readinessChanged { c.notifyGatewayClientsSubscribers() } } @@ -248,13 +242,7 @@ func (c *AdminAPIClientsManager) onDiscoveredAdminAPIsNotification(discoveredAdm func (c *AdminAPIClientsManager) onReadinessReconciliationTick() { c.logger.Debug("reconciling readiness of gateway clients") - changed := func() bool { - c.lock.Lock() - defer c.lock.Unlock() - return c.reconcileGatewayClientsReadiness() - }() - - if changed { + if changed := c.reconcileGatewayClientsReadiness(); changed { c.notifyGatewayClientsSubscribers() } } @@ -264,6 +252,9 @@ func (c *AdminAPIClientsManager) onReadinessReconciliationTick() { // of the discovered Admin APIs and creates only those clients that we don't have. // It returns true if the gatewayClients slice has been changed, false otherwise. func (c *AdminAPIClientsManager) adjustGatewayClients(discoveredAdminAPIs []adminapi.DiscoveredAdminAPI) (changed bool) { + c.lock.Lock() + defer c.lock.Unlock() + // Short circuit. if len(discoveredAdminAPIs) == 0 { // If we have no clients and the provided list is empty, it means we're in sync. No change was made. @@ -318,6 +309,9 @@ func (c *AdminAPIClientsManager) reconcileGatewayClientsReadiness() bool { // It's to ensure that the readiness is not reconciled too often when we receive a lot of notifications. defer c.readinessReconciliationTicker.Reset(DefaultReadinessReconciliationInterval) + c.lock.Lock() + defer c.lock.Unlock() + // Short circuit. if len(c.readyGatewayClients) == 0 && len(c.pendingGatewayClients) == 0 { return false From f0500fe36d92ea29c16a2a6f0f6fa1ebd61af99e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Burzy=C5=84ski?= Date: Wed, 19 Jul 2023 11:18:51 +0200 Subject: [PATCH 09/11] add readiness check timeout and debug logs --- internal/clients/readiness.go | 66 +++++++++++++++++++++++++++-------- 1 file changed, 51 insertions(+), 15 deletions(-) diff --git a/internal/clients/readiness.go b/internal/clients/readiness.go index 97c8952704..1be9bf00a6 100644 --- a/internal/clients/readiness.go +++ b/internal/clients/readiness.go @@ -2,6 +2,7 @@ package clients import ( "context" + "time" "github.com/sirupsen/logrus" k8stypes "k8s.io/apimachinery/pkg/types" @@ -9,6 +10,10 @@ import ( "github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi" ) +const ( + ReadinessCheckTimeout = time.Second +) + // ReadinessCheckResult represents the result of a readiness check. type ReadinessCheckResult struct { // ClientsTurnedReady are the clients that were pending and are now ready to be used. @@ -71,29 +76,43 @@ func (c DefaultReadinessChecker) CheckReadiness( // checkPendingGatewayClients checks if the pending clients are ready to be used and returns the ones that are. func (c DefaultReadinessChecker) checkPendingGatewayClients(ctx context.Context, lastPending []adminapi.DiscoveredAdminAPI) (turnedReady []*adminapi.Client) { for _, adminAPI := range lastPending { - // We indirectly check readiness of the client by trying to create it. If it succeeds then it means that - // the client is ready to be used. - client, err := c.factory.CreateAdminAPIClient(ctx, adminAPI) - if err != nil { - // Despite the error reason we still want to keep the client in the pending list to retry later. - c.logger.WithError(err).Debugf("pending client for %q is not ready yet", adminAPI.Address) - continue + if client := c.checkPendingClient(ctx, adminAPI); client != nil { + turnedReady = append(turnedReady, client) } - - turnedReady = append(turnedReady, client) } return turnedReady } +// checkPendingClient indirectly check readiness of the client by trying to create it. If it succeeds then it +// means that the client is ready to be used. It returns a non-nil client if the client is ready to be used, otherwise +// nil is returned. +func (c DefaultReadinessChecker) checkPendingClient( + ctx context.Context, + pendingClient adminapi.DiscoveredAdminAPI, +) (client *adminapi.Client) { + defer func() { + c.logger.WithField("ok", client != nil). + Debugf("checking readiness of pending client for %q", pendingClient.Address) + }() + + ctx, cancel := context.WithTimeout(ctx, ReadinessCheckTimeout) + defer cancel() + client, err := c.factory.CreateAdminAPIClient(ctx, pendingClient) + if err != nil { + // Despite the error reason we still want to keep the client in the pending list to retry later. + c.logger.WithError(err).Debugf("pending client for %q is not ready yet", pendingClient.Address) + return nil + } + + return client +} + // checkAlreadyExistingClients checks if the already existing clients are still ready to be used and returns the ones // that are not. -func (c DefaultReadinessChecker) checkAlreadyExistingClients(ctx context.Context, lastActive []AlreadyCreatedClient) (turnedPending []adminapi.DiscoveredAdminAPI) { - for _, client := range lastActive { +func (c DefaultReadinessChecker) checkAlreadyExistingClients(ctx context.Context, alreadyCreatedClients []AlreadyCreatedClient) (turnedPending []adminapi.DiscoveredAdminAPI) { + for _, client := range alreadyCreatedClients { // For ready clients we check readiness by calling the Status endpoint. - if err := client.IsReady(ctx); err != nil { - // Despite the error reason we still want to keep the client in the pending list to retry later. - c.logger.WithError(err).Debugf("active client for %q is not ready, moving to pending", client.BaseRootURL()) - + if ready := c.checkAlreadyCreatedClient(ctx, client); !ready { podRef, ok := client.PodReference() if !ok { // This should never happen, but if it does, we want to log it. @@ -108,3 +127,20 @@ func (c DefaultReadinessChecker) checkAlreadyExistingClients(ctx context.Context } return turnedPending } + +func (c DefaultReadinessChecker) checkAlreadyCreatedClient(ctx context.Context, client AlreadyCreatedClient) (ready bool) { + defer func() { + c.logger.WithField("ok", ready). + Debugf("checking readiness of already created client for %q", client.BaseRootURL()) + }() + + ctx, cancel := context.WithTimeout(ctx, ReadinessCheckTimeout) + defer cancel() + if err := client.IsReady(ctx); err != nil { + // Despite the error reason we still want to keep the client in the pending list to retry later. + c.logger.WithError(err).Debugf("already created client for %q is not ready, moving to pending", client.BaseRootURL()) + return false + } + + return true +} From 69b0655e03a7b8210de371521dc2cbb029bd54b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Burzy=C5=84ski?= Date: Wed, 19 Jul 2023 14:13:24 +0200 Subject: [PATCH 10/11] make const private --- CHANGELOG.md | 2 +- internal/clients/readiness.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 737aa5d900..d9060d6844 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -119,7 +119,7 @@ Adding a new version? You'll need three changes: [#4265](https://github.com/Kong/kubernetes-ingress-controller/pull/4265) - Gateway Discovery feature was adapted to handle Gateways that are not ready yet in terms of accepting data-plane traffic, but are ready to accept configuration - updates. The controller will now send configuration to such Gateways and will + updates. The controller will now send configuration to such Gateways and will actively monitor their readiness for accepting configuration updates. [#4368](https://github.com/Kong/kubernetes-ingress-controller/pull/4368 diff --git a/internal/clients/readiness.go b/internal/clients/readiness.go index 1be9bf00a6..137bbb12fd 100644 --- a/internal/clients/readiness.go +++ b/internal/clients/readiness.go @@ -11,7 +11,7 @@ import ( ) const ( - ReadinessCheckTimeout = time.Second + readinessCheckTimeout = time.Second ) // ReadinessCheckResult represents the result of a readiness check. @@ -95,7 +95,7 @@ func (c DefaultReadinessChecker) checkPendingClient( Debugf("checking readiness of pending client for %q", pendingClient.Address) }() - ctx, cancel := context.WithTimeout(ctx, ReadinessCheckTimeout) + ctx, cancel := context.WithTimeout(ctx, readinessCheckTimeout) defer cancel() client, err := c.factory.CreateAdminAPIClient(ctx, pendingClient) if err != nil { @@ -134,7 +134,7 @@ func (c DefaultReadinessChecker) checkAlreadyCreatedClient(ctx context.Context, Debugf("checking readiness of already created client for %q", client.BaseRootURL()) }() - ctx, cancel := context.WithTimeout(ctx, ReadinessCheckTimeout) + ctx, cancel := context.WithTimeout(ctx, readinessCheckTimeout) defer cancel() if err := client.IsReady(ctx); err != nil { // Despite the error reason we still want to keep the client in the pending list to retry later. From 92ffd5801b9fcb32bcea9fd38b44a7d735b338e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Burzy=C5=84ski?= Date: Fri, 21 Jul 2023 10:47:07 +0200 Subject: [PATCH 11/11] do not use logrus --- internal/clients/readiness.go | 33 +++++++++++++++++++++--------- internal/clients/readiness_test.go | 4 ++-- internal/manager/run.go | 2 +- 3 files changed, 26 insertions(+), 13 deletions(-) diff --git a/internal/clients/readiness.go b/internal/clients/readiness.go index 137bbb12fd..0eb388101a 100644 --- a/internal/clients/readiness.go +++ b/internal/clients/readiness.go @@ -2,12 +2,15 @@ package clients import ( "context" + "errors" + "fmt" "time" - "github.com/sirupsen/logrus" + "github.com/go-logr/logr" k8stypes "k8s.io/apimachinery/pkg/types" "github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi" + "github.com/kong/kubernetes-ingress-controller/v2/internal/util" ) const ( @@ -52,10 +55,10 @@ type AlreadyCreatedClient interface { type DefaultReadinessChecker struct { factory ClientFactory - logger logrus.FieldLogger + logger logr.Logger } -func NewDefaultReadinessChecker(factory ClientFactory, logger logrus.FieldLogger) DefaultReadinessChecker { +func NewDefaultReadinessChecker(factory ClientFactory, logger logr.Logger) DefaultReadinessChecker { return DefaultReadinessChecker{ factory: factory, logger: logger, @@ -91,8 +94,10 @@ func (c DefaultReadinessChecker) checkPendingClient( pendingClient adminapi.DiscoveredAdminAPI, ) (client *adminapi.Client) { defer func() { - c.logger.WithField("ok", client != nil). - Debugf("checking readiness of pending client for %q", pendingClient.Address) + c.logger.V(util.DebugLevel). + Info(fmt.Sprintf("checking readiness of pending client for %q", pendingClient.Address), + "ok", client != nil, + ) }() ctx, cancel := context.WithTimeout(ctx, readinessCheckTimeout) @@ -100,7 +105,7 @@ func (c DefaultReadinessChecker) checkPendingClient( client, err := c.factory.CreateAdminAPIClient(ctx, pendingClient) if err != nil { // Despite the error reason we still want to keep the client in the pending list to retry later. - c.logger.WithError(err).Debugf("pending client for %q is not ready yet", pendingClient.Address) + c.logger.V(util.DebugLevel).Error(err, fmt.Sprintf("pending client for %q is not ready yet", pendingClient.Address)) return nil } @@ -116,7 +121,10 @@ func (c DefaultReadinessChecker) checkAlreadyExistingClients(ctx context.Context podRef, ok := client.PodReference() if !ok { // This should never happen, but if it does, we want to log it. - c.logger.Errorf("failed to get PodReference for client %q", client.BaseRootURL()) + c.logger.Error( + errors.New("missing pod reference"), + fmt.Sprintf("failed to get PodReference for client %q", client.BaseRootURL()), + ) continue } turnedPending = append(turnedPending, adminapi.DiscoveredAdminAPI{ @@ -130,15 +138,20 @@ func (c DefaultReadinessChecker) checkAlreadyExistingClients(ctx context.Context func (c DefaultReadinessChecker) checkAlreadyCreatedClient(ctx context.Context, client AlreadyCreatedClient) (ready bool) { defer func() { - c.logger.WithField("ok", ready). - Debugf("checking readiness of already created client for %q", client.BaseRootURL()) + c.logger.V(util.DebugLevel).Info( + fmt.Sprintf("checking readiness of already created client for %q", client.BaseRootURL()), + "ok", ready, + ) }() ctx, cancel := context.WithTimeout(ctx, readinessCheckTimeout) defer cancel() if err := client.IsReady(ctx); err != nil { // Despite the error reason we still want to keep the client in the pending list to retry later. - c.logger.WithError(err).Debugf("already created client for %q is not ready, moving to pending", client.BaseRootURL()) + c.logger.V(util.DebugLevel).Error( + err, + fmt.Sprintf("already created client for %q is not ready, moving to pending", client.BaseRootURL()), + ) return false } diff --git a/internal/clients/readiness_test.go b/internal/clients/readiness_test.go index cb4d35e544..b0b6830aa3 100644 --- a/internal/clients/readiness_test.go +++ b/internal/clients/readiness_test.go @@ -6,8 +6,8 @@ import ( "fmt" "testing" + "github.com/go-logr/logr" "github.com/samber/lo" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" k8stypes "k8s.io/apimachinery/pkg/types" @@ -196,7 +196,7 @@ func TestDefaultReadinessChecker(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { factory := newMockClientFactory(t, tc.pendingClientsReadiness) - checker := clients.NewDefaultReadinessChecker(factory, logrus.New()) + checker := clients.NewDefaultReadinessChecker(factory, logr.Discard()) result := checker.CheckReadiness(context.Background(), tc.alreadyCreatedClients, tc.pendingClients) turnedPending := lo.Map(result.ClientsTurnedPending, func(c adminapi.DiscoveredAdminAPI, _ int) string { return c.Address }) diff --git a/internal/manager/run.go b/internal/manager/run.go index 75c147f12e..a86b1ebaf9 100644 --- a/internal/manager/run.go +++ b/internal/manager/run.go @@ -134,7 +134,7 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic, d setupLog.Info("Initializing Dataplane Client") eventRecorder := mgr.GetEventRecorderFor(KongClientEventRecorderComponentName) - readinessChecker := clients.NewDefaultReadinessChecker(adminAPIClientsFactory, deprecatedLogger) + readinessChecker := clients.NewDefaultReadinessChecker(adminAPIClientsFactory, setupLog.WithName("readiness-checker")) clientsManager, err := clients.NewAdminAPIClientsManager( ctx, deprecatedLogger,