diff --git a/CHANGELOG.md b/CHANGELOG.md index cda7f6e2f0..d9060d6844 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -117,6 +117,11 @@ Adding a new version? You'll need three changes: KIC restarts, it is now able to fetch the last good configuration from a running proxy instance and store it in its internal cache. [#4265](https://github.com/Kong/kubernetes-ingress-controller/pull/4265) +- Gateway Discovery feature was adapted to handle Gateways that are not ready yet + in terms of accepting data-plane traffic, but are ready to accept configuration + updates. The controller will now send configuration to such Gateways and will + actively monitor their readiness for accepting configuration updates. + [#4368](https://github.com/Kong/kubernetes-ingress-controller/pull/4368) ### Changed @@ -132,6 +137,10 @@ Adding a new version? You'll need three changes: sending stage (we've observed around 35% reduced time in config marshalling time but be aware that your mileage may vary). [#4222](https://github.com/Kong/kubernetes-ingress-controller/pull/4222) +- Changed the Gateway's readiness probe in all-in-one manifests from `/status` + to `/status/ready`. Gateways will be considered ready only after an initial + configuration is applied by the controller. 
+ [#4368](https://github.com/Kong/kubernetes-ingress-controller/pull/4368) [gojson]: https://github.com/goccy/go-json [httproute-specification]: https://gateway-api.sigs.k8s.io/references/spec/#gateway.networking.k8s.io/v1beta1.HTTPRoute diff --git a/config/variants/multi-gw/base/gateway_deployment.yaml b/config/variants/multi-gw/base/gateway_deployment.yaml index 8e42d21a96..cc1e9c6fdf 100644 --- a/config/variants/multi-gw/base/gateway_deployment.yaml +++ b/config/variants/multi-gw/base/gateway_deployment.yaml @@ -99,7 +99,7 @@ spec: failureThreshold: 3 readinessProbe: httpGet: - path: /status + path: /status/ready port: 8100 scheme: HTTP initialDelaySeconds: 5 diff --git a/deploy/single/all-in-one-dbless-enterprise.yaml b/deploy/single/all-in-one-dbless-enterprise.yaml index 60247c1ada..9a1a72670d 100644 --- a/deploy/single/all-in-one-dbless-enterprise.yaml +++ b/deploy/single/all-in-one-dbless-enterprise.yaml @@ -2108,7 +2108,7 @@ spec: readinessProbe: failureThreshold: 3 httpGet: - path: /status + path: /status/ready port: 8100 scheme: HTTP initialDelaySeconds: 5 diff --git a/deploy/single/all-in-one-dbless-k4k8s-enterprise.yaml b/deploy/single/all-in-one-dbless-k4k8s-enterprise.yaml index 41f08bc09d..bcfe7e1ba0 100644 --- a/deploy/single/all-in-one-dbless-k4k8s-enterprise.yaml +++ b/deploy/single/all-in-one-dbless-k4k8s-enterprise.yaml @@ -2113,7 +2113,7 @@ spec: readinessProbe: failureThreshold: 3 httpGet: - path: /status + path: /status/ready port: 8100 scheme: HTTP initialDelaySeconds: 5 diff --git a/deploy/single/all-in-one-dbless-konnect-enterprise.yaml b/deploy/single/all-in-one-dbless-konnect-enterprise.yaml index 6277a4719d..6c3584fc54 100644 --- a/deploy/single/all-in-one-dbless-konnect-enterprise.yaml +++ b/deploy/single/all-in-one-dbless-konnect-enterprise.yaml @@ -2123,7 +2123,7 @@ spec: readinessProbe: failureThreshold: 3 httpGet: - path: /status + path: /status/ready port: 8100 scheme: HTTP initialDelaySeconds: 5 diff --git 
a/deploy/single/all-in-one-dbless-konnect.yaml b/deploy/single/all-in-one-dbless-konnect.yaml index 0696a21239..baf44b23cc 100644 --- a/deploy/single/all-in-one-dbless-konnect.yaml +++ b/deploy/single/all-in-one-dbless-konnect.yaml @@ -2123,7 +2123,7 @@ spec: readinessProbe: failureThreshold: 3 httpGet: - path: /status + path: /status/ready port: 8100 scheme: HTTP initialDelaySeconds: 5 diff --git a/deploy/single/all-in-one-dbless.yaml b/deploy/single/all-in-one-dbless.yaml index 58da6c20e5..eb912ab561 100644 --- a/deploy/single/all-in-one-dbless.yaml +++ b/deploy/single/all-in-one-dbless.yaml @@ -2108,7 +2108,7 @@ spec: readinessProbe: failureThreshold: 3 httpGet: - path: /status + path: /status/ready port: 8100 scheme: HTTP initialDelaySeconds: 5 diff --git a/internal/adminapi/client.go b/internal/adminapi/client.go index 9a345e5f1c..74d50d3e1e 100644 --- a/internal/adminapi/client.go +++ b/internal/adminapi/client.go @@ -95,6 +95,12 @@ func (c *Client) NodeID(ctx context.Context) (string, error) { return nodeID, nil } +// IsReady returns nil if the Admin API is ready to serve requests. +func (c *Client) IsReady(ctx context.Context) error { + _, err := c.adminAPIClient.Status(ctx) + return err +} + // GetKongVersion returns version of the kong gateway. 
func (c *Client) GetKongVersion(ctx context.Context) (string, error) { if c.isKonnect { diff --git a/internal/adminapi/endpoints.go b/internal/adminapi/endpoints.go index 675e091cc2..bbeeec0bf0 100644 --- a/internal/adminapi/endpoints.go +++ b/internal/adminapi/endpoints.go @@ -122,7 +122,7 @@ func (d *Discoverer) AdminAPIsFromEndpointSlice( } for _, e := range endpoints.Endpoints { - if e.Conditions.Ready == nil || !*e.Conditions.Ready { + if e.Conditions.Terminating != nil && *e.Conditions.Terminating { continue } diff --git a/internal/adminapi/endpoints_test.go b/internal/adminapi/endpoints_test.go index 63136cab93..d019892888 100644 --- a/internal/adminapi/endpoints_test.go +++ b/internal/adminapi/endpoints_test.go @@ -167,7 +167,7 @@ func TestDiscoverer_AddressesFromEndpointSlice(t *testing.T) { dnsStrategy: cfgtypes.ServiceScopedPodDNSStrategy, }, { - name: "not ready endpoints are not returned", + name: "not ready endpoints are returned", endpoints: discoveryv1.EndpointSlice{ ObjectMeta: endpointsSliceObjectMeta, AddressType: discoveryv1.AddressTypeIPv4, @@ -183,12 +183,18 @@ func TestDiscoverer_AddressesFromEndpointSlice(t *testing.T) { }, Ports: builder.NewEndpointPort(8444).WithName("admin").IntoSlice(), }, - portNames: sets.New("admin"), - want: sets.New[DiscoveredAdminAPI](), + portNames: sets.New("admin"), + want: sets.New[DiscoveredAdminAPI]( + DiscoveredAdminAPI{ + Address: "https://10.0.0.1:8444", + PodRef: k8stypes.NamespacedName{ + Name: "pod-1", Namespace: namespaceName, + }, + }), dnsStrategy: cfgtypes.IPDNSStrategy, }, { - name: "not ready and terminating endpoints are not returned", + name: "ready and terminating endpoints are not returned", endpoints: discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: uuid.NewString(), @@ -199,7 +205,7 @@ func TestDiscoverer_AddressesFromEndpointSlice(t *testing.T) { { Addresses: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}, Conditions: discoveryv1.EndpointConditions{ - Ready: lo.ToPtr(false), + 
Ready: lo.ToPtr(true), Terminating: lo.ToPtr(true), }, TargetRef: testPodReference(namespaceName, "pod-1"), @@ -237,7 +243,7 @@ func TestDiscoverer_AddressesFromEndpointSlice(t *testing.T) { Addresses: []string{"10.0.2.1"}, Conditions: discoveryv1.EndpointConditions{ Ready: lo.ToPtr(false), - Terminating: lo.ToPtr(false), + Terminating: lo.ToPtr(true), }, TargetRef: testPodReference(namespaceName, "pod-3"), }, @@ -289,7 +295,7 @@ func TestDiscoverer_AddressesFromEndpointSlice(t *testing.T) { Addresses: []string{"10.0.2.1"}, Conditions: discoveryv1.EndpointConditions{ Ready: lo.ToPtr(false), - Terminating: lo.ToPtr(false), + Terminating: lo.ToPtr(true), }, TargetRef: testPodReference(namespaceName, "pod-3"), }, @@ -551,7 +557,7 @@ func TestDiscoverer_GetAdminAPIsForService(t *testing.T) { Addresses: []string{"8.0.0.1"}, Conditions: discoveryv1.EndpointConditions{ Ready: lo.ToPtr(false), - Terminating: lo.ToPtr(false), + Terminating: lo.ToPtr(true), }, TargetRef: testPodReference(namespaceName, "pod-3"), }, @@ -637,7 +643,7 @@ func TestDiscoverer_GetAdminAPIsForService(t *testing.T) { dnsStrategy: cfgtypes.IPDNSStrategy, }, { - name: "not Ready Endpoints are not matched", + name: "terminating Endpoints are not matched", service: k8stypes.NamespacedName{ Namespace: namespaceName, Name: serviceName, @@ -652,7 +658,8 @@ func TestDiscoverer_GetAdminAPIsForService(t *testing.T) { { Addresses: []string{"7.0.0.1"}, Conditions: discoveryv1.EndpointConditions{ - Ready: lo.ToPtr(false), + Ready: lo.ToPtr(false), + Terminating: lo.ToPtr(true), }, TargetRef: testPodReference(namespaceName, "pod-1"), }, diff --git a/internal/clients/config_status_test.go b/internal/clients/config_status_test.go index d79576f421..c07bfb56af 100644 --- a/internal/clients/config_status_test.go +++ b/internal/clients/config_status_test.go @@ -5,15 +5,14 @@ import ( "testing" "time" - "github.com/go-logr/logr/testr" + "github.com/go-logr/logr" "github.com/stretchr/testify/require" 
"github.com/kong/kubernetes-ingress-controller/v2/internal/clients" ) func TestChannelConfigNotifier(t *testing.T) { - logger := testr.New(t) - n := clients.NewChannelConfigNotifier(logger) + n := clients.NewChannelConfigNotifier(logr.Discard()) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() diff --git a/internal/clients/manager.go b/internal/clients/manager.go index b3b0a0f50d..dd8fd10f11 100644 --- a/internal/clients/manager.go +++ b/internal/clients/manager.go @@ -4,24 +4,44 @@ import ( "context" "errors" "sync" + "time" "github.com/samber/lo" "github.com/sirupsen/logrus" + "golang.org/x/exp/maps" "github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi" + "github.com/kong/kubernetes-ingress-controller/v2/internal/util/clock" ) +// DefaultReadinessReconciliationInterval is the interval at which the manager will run readiness reconciliation loop. +// It's the same as the default interval of a Kubernetes container's readiness probe. +const DefaultReadinessReconciliationInterval = 10 * time.Second + +// ClientFactory is responsible for creating Admin API clients. type ClientFactory interface { CreateAdminAPIClient(ctx context.Context, address adminapi.DiscoveredAdminAPI) (*adminapi.Client, error) } +// AdminAPIClientsProvider allows fetching the most recent list of Admin API clients of Gateways that +// we should configure. +type AdminAPIClientsProvider interface { + KonnectClient() *adminapi.KonnectClient + GatewayClients() []*adminapi.Client +} + +// Ticker is an interface that allows to control a ticker. +type Ticker interface { + Stop() + Channel() <-chan time.Time + Reset(d time.Duration) +} + // AdminAPIClientsManager keeps track of current Admin API clients of Gateways that we should configure. -// In particular, it can be notified about the clients' list update with use of Notify method, and queried -// for the latest slice of those with use of Clients method. 
+// In particular, it can be notified about the discovered clients' list with use of Notify method, and queried +// for the latest slice of ready to be configured clients with use of GatewayClients method. It also runs periodic +// readiness reconciliation loop which is responsible for checking readiness of the clients. type AdminAPIClientsManager struct { - // adminAPIClientFactory is a factory used for creating Admin API clients. - adminAPIClientFactory ClientFactory - // discoveredAdminAPIsNotifyChan is used for notifications that contain Admin API // endpoints list that should be used for configuring the dataplane. discoveredAdminAPIsNotifyChan chan []adminapi.DiscoveredAdminAPI @@ -29,12 +49,21 @@ type AdminAPIClientsManager struct { ctx context.Context onceNotifyLoopRunning sync.Once - notifyLoopRunningCh chan struct{} - isNotifyLoopRunning bool + runningChan chan struct{} + isRunning bool + + // readyGatewayClients represent all Kong Gateway data-planes that are ready to be configured. + readyGatewayClients map[string]*adminapi.Client + + // pendingGatewayClients represent all Kong Gateway data-planes that were discovered but are not ready to be + // configured. + pendingGatewayClients map[string]adminapi.DiscoveredAdminAPI + + // readinessChecker is used to check readiness of the clients. + readinessChecker ReadinessChecker - // gatewayClients represent all Kong Gateway data-planes that are configured by this KIC instance with use of - // their Admin API. - gatewayClients []*adminapi.Client + // readinessReconciliationTicker is used to run readiness reconciliation loop. + readinessReconciliationTicker Ticker // konnectClient represents a special-case of the data-plane which is Konnect cloud. // This client is used to synchronise configuration with Konnect's Runtime Group Admin API. 
@@ -46,39 +75,61 @@ type AdminAPIClientsManager struct { logger logrus.FieldLogger } +type AdminAPIClientsManagerOption func(*AdminAPIClientsManager) + +// WithReadinessReconciliationTicker allows to set a custom ticker for readiness reconciliation loop. +func WithReadinessReconciliationTicker(ticker Ticker) AdminAPIClientsManagerOption { + return func(m *AdminAPIClientsManager) { + m.readinessReconciliationTicker = ticker + } +} + func NewAdminAPIClientsManager( ctx context.Context, logger logrus.FieldLogger, initialClients []*adminapi.Client, - kongClientFactory ClientFactory, + readinessChecker ReadinessChecker, + opts ...AdminAPIClientsManagerOption, ) (*AdminAPIClientsManager, error) { if len(initialClients) == 0 { return nil, errors.New("at least one initial client must be provided") } - return &AdminAPIClientsManager{ - gatewayClients: initialClients, - adminAPIClientFactory: kongClientFactory, + readyClients := lo.SliceToMap(initialClients, func(c *adminapi.Client) (string, *adminapi.Client) { + return c.BaseRootURL(), c + }) + c := &AdminAPIClientsManager{ + readyGatewayClients: readyClients, + pendingGatewayClients: make(map[string]adminapi.DiscoveredAdminAPI), + readinessChecker: readinessChecker, + readinessReconciliationTicker: clock.NewTicker(), discoveredAdminAPIsNotifyChan: make(chan []adminapi.DiscoveredAdminAPI), ctx: ctx, - notifyLoopRunningCh: make(chan struct{}), + runningChan: make(chan struct{}), logger: logger, - }, nil + } + + for _, opt := range opts { + opt(c) + } + + return c, nil } // Running returns a channel that is closed when the manager's background tasks are already running. func (c *AdminAPIClientsManager) Running() chan struct{} { - return c.notifyLoopRunningCh + return c.runningChan } -// RunNotifyLoop runs a goroutine that will dynamically ingest new addresses of Kong Admin API endpoints. 
-func (c *AdminAPIClientsManager) RunNotifyLoop() { +// Run runs a goroutine that will dynamically ingest new addresses of Kong Admin API endpoints. +// It should only be called when Gateway Discovery is enabled. +func (c *AdminAPIClientsManager) Run() { c.onceNotifyLoopRunning.Do(func() { - go c.adminAPIAddressNotifyLoop() + go c.gatewayClientsReconciliationLoop() c.lock.Lock() defer c.lock.Unlock() - c.isNotifyLoopRunning = true + c.isRunning = true }) } @@ -118,16 +169,13 @@ func (c *AdminAPIClientsManager) KonnectClient() *adminapi.KonnectClient { func (c *AdminAPIClientsManager) GatewayClients() []*adminapi.Client { c.lock.RLock() defer c.lock.RUnlock() - - copied := make([]*adminapi.Client, len(c.gatewayClients)) - copy(copied, c.gatewayClients) - return copied + return lo.Values(c.readyGatewayClients) } func (c *AdminAPIClientsManager) GatewayClientsCount() int { c.lock.RLock() defer c.lock.RUnlock() - return len(c.gatewayClients) + return len(c.readyGatewayClients) } // SubscribeToGatewayClientsChanges returns a channel that will receive a notification on every Gateway clients update. @@ -145,7 +193,7 @@ func (c *AdminAPIClientsManager) SubscribeToGatewayClientsChanges() (<-chan stru } // No notify loop running, there will be no updates, let's propagate that to the caller. - if !c.isNotifyLoopRunning { + if !c.isRunning { return nil, false } @@ -154,39 +202,51 @@ func (c *AdminAPIClientsManager) SubscribeToGatewayClientsChanges() (<-chan stru return ch, true } -// adminAPIAddressNotifyLoop is an inner loop listening on notifyChan which are received via -// Notify() calls. 
Each time it receives on notifyChan tt will take the provided -// list of addresses and update the internally held list of clients such that: -// - the internal list of kong clients contains only the provided addresses -// - if a client for a provided address already exists it's not recreated again -// (hence no external calls are made to check the provided endpoint if there -// exists a client already using it) -// - client that do not exist in the provided address list are removed if they -// are present in the current state -// -// This function will acquire the internal lock to prevent the modification of -// internal clients list. -func (c *AdminAPIClientsManager) adminAPIAddressNotifyLoop() { - close(c.notifyLoopRunningCh) +// gatewayClientsReconciliationLoop is an inner loop listening on: +// - discoveredAdminAPIsNotifyChan - triggered on every Notify() call. +// - readinessReconciliationTicker - triggered on every readinessReconciliationTicker tick. +func (c *AdminAPIClientsManager) gatewayClientsReconciliationLoop() { + c.readinessReconciliationTicker.Reset(DefaultReadinessReconciliationInterval) + defer c.readinessReconciliationTicker.Stop() + + close(c.runningChan) for { select { case <-c.ctx.Done(): c.logger.Infof("closing AdminAPIClientsManager: %s", c.ctx.Err()) - c.discoveredAdminAPIsNotifyChan = nil c.closeGatewayClientsSubscribers() return - case discoveredAdminAPIs := <-c.discoveredAdminAPIsNotifyChan: - c.logger.Debug("received notification about Admin API addresses change") - if clientsChanged := c.adjustGatewayClients(discoveredAdminAPIs); clientsChanged { - // Notify subscribers that the clients list has been updated. 
- c.logger.Debug("notifying subscribers about gateway clients change") - c.notifyGatewayClientsSubscribers() - } + c.onDiscoveredAdminAPIsNotification(discoveredAdminAPIs) + case <-c.readinessReconciliationTicker.Channel(): + c.onReadinessReconciliationTick() } } } +// onDiscoveredAdminAPIsNotification is called when a new notification about Admin API addresses change is received. +// It will adjust lists of gateway clients and notify subscribers about the change if readyGatewayClients list has +// changed. +func (c *AdminAPIClientsManager) onDiscoveredAdminAPIsNotification(discoveredAdminAPIs []adminapi.DiscoveredAdminAPI) { + c.logger.Debug("received notification about Admin API addresses change") + + clientsChanged := c.adjustGatewayClients(discoveredAdminAPIs) + readinessChanged := c.reconcileGatewayClientsReadiness() + if clientsChanged || readinessChanged { + c.notifyGatewayClientsSubscribers() + } +} + +// onReadinessReconciliationTick is called on every readinessReconciliationTicker tick. It will reconcile readiness +// of all gateway clients and notify subscribers about the change if readyGatewayClients list has changed. +func (c *AdminAPIClientsManager) onReadinessReconciliationTick() { + c.logger.Debug("reconciling readiness of gateway clients") + + if changed := c.reconcileGatewayClientsReadiness(); changed { + c.notifyGatewayClientsSubscribers() + } +} + // adjustGatewayClients adjusts internally stored clients slice based on the provided // discovered Admin APIs slice. It consults BaseRootURLs of already stored clients with each // of the discovered Admin APIs and creates only those clients that we don't have. @@ -195,61 +255,89 @@ func (c *AdminAPIClientsManager) adjustGatewayClients(discoveredAdminAPIs []admi c.lock.Lock() defer c.lock.Unlock() - // Short circuit + // Short circuit. if len(discoveredAdminAPIs) == 0 { - if len(c.gatewayClients) == 0 { + // If we have no clients and the provided list is empty, it means we're in sync. 
No change was made. + if len(c.readyGatewayClients) == 0 && len(c.pendingGatewayClients) == 0 { return false } - c.gatewayClients = c.gatewayClients[:0] + // Otherwise, we have to clear the clients and return true to indicate that the change was made. + maps.Clear(c.readyGatewayClients) + maps.Clear(c.pendingGatewayClients) return true } - toAdd := lo.Filter(discoveredAdminAPIs, func(api adminapi.DiscoveredAdminAPI, _ int) bool { - // If we already have a client with a provided address then great, no need - // to do anything. + // Make sure all discovered clients that are not in the ready list are in the pending list. + for _, d := range discoveredAdminAPIs { + if _, ok := c.readyGatewayClients[d.Address]; !ok { + c.pendingGatewayClients[d.Address] = d + } + } - // If we don't have a client with new address then filter it and add - // a client for this address. - return !lo.ContainsBy(c.gatewayClients, func(cl *adminapi.Client) bool { - return api.Address == cl.BaseRootURL() + // Remove ready clients that are not present in the discovered list. + for _, cl := range c.readyGatewayClients { + clientNotOnDiscoveredList := !lo.ContainsBy(discoveredAdminAPIs, func(d adminapi.DiscoveredAdminAPI) bool { + return d.Address == cl.BaseRootURL() }) - }) + if clientNotOnDiscoveredList { + delete(c.readyGatewayClients, cl.BaseRootURL()) + changed = true + } + } - var idxToRemove []int - for i, cl := range c.gatewayClients { - // If the new address set contains a client that we already have then - // good, no need to do anything for it. - if lo.ContainsBy(discoveredAdminAPIs, func(api adminapi.DiscoveredAdminAPI) bool { - return api.Address == cl.BaseRootURL() - }) { - continue + // Remove pending clients that are not present in the discovered list. 
+ for _, cl := range c.pendingGatewayClients { + clientNotOnDiscoveredList := !lo.ContainsBy(discoveredAdminAPIs, func(d adminapi.DiscoveredAdminAPI) bool { + return d.Address == cl.Address + }) + if clientNotOnDiscoveredList { + delete(c.pendingGatewayClients, cl.Address) + changed = true } - // If the new address set does not contain an address that we already - // have then remove it. - idxToRemove = append(idxToRemove, i) } - for i := len(idxToRemove) - 1; i >= 0; i-- { - idx := idxToRemove[i] - c.gatewayClients = append(c.gatewayClients[:idx], c.gatewayClients[idx+1:]...) + return changed +} + +// reconcileGatewayClientsReadiness reconciles the readiness of the gateway clients. It ensures that the clients on the +// readyGatewayClients list are still ready and that the clients on the pendingGatewayClients list are still pending. +// If any of the clients is not ready anymore, it will be moved to the pendingGatewayClients list. If any of the clients +// is not pending anymore, it will be moved to the readyGatewayClients list. It returns true if any transition has been +// made, false otherwise. +func (c *AdminAPIClientsManager) reconcileGatewayClientsReadiness() bool { + // Reset the ticker after each readiness reconciliation despite the trigger (whether it was a tick or a notification). + // It's to ensure that the readiness is not reconciled too often when we receive a lot of notifications. + defer c.readinessReconciliationTicker.Reset(DefaultReadinessReconciliationInterval) + + c.lock.Lock() + defer c.lock.Unlock() + + // Short circuit. 
+ if len(c.readyGatewayClients) == 0 && len(c.pendingGatewayClients) == 0 { + return false } - for _, adminAPI := range toAdd { - client, err := c.adminAPIClientFactory.CreateAdminAPIClient(c.ctx, adminAPI) - if err != nil { - c.logger.WithError(err).Errorf("failed to create a client for %s", adminAPI) - continue - } - client.AttachPodReference(adminAPI.PodRef) + readinessCheckResult := c.readinessChecker.CheckReadiness( + c.ctx, + lo.MapToSlice(c.readyGatewayClients, func(_ string, cl *adminapi.Client) AlreadyCreatedClient { return cl }), + lo.Values(c.pendingGatewayClients), + ) - c.gatewayClients = append(c.gatewayClients, client) + for _, cl := range readinessCheckResult.ClientsTurnedReady { + delete(c.pendingGatewayClients, cl.BaseRootURL()) + c.readyGatewayClients[cl.BaseRootURL()] = cl + } + for _, cl := range readinessCheckResult.ClientsTurnedPending { + delete(c.readyGatewayClients, cl.Address) + c.pendingGatewayClients[cl.Address] = cl } - return len(toAdd) > 0 || len(idxToRemove) > 0 + return readinessCheckResult.HasChanges() } // notifyGatewayClientsSubscribers sends notifications to all subscribers that have called SubscribeToGatewayClientsChanges. func (c *AdminAPIClientsManager) notifyGatewayClientsSubscribers() { + c.logger.Debug("notifying subscribers about gateway clients change") for _, sub := range c.gatewayClientsChangesSubscribers { select { case <-c.ctx.Done(): @@ -265,10 +353,3 @@ func (c *AdminAPIClientsManager) closeGatewayClientsSubscribers() { close(sub) } } - -// AdminAPIClientsProvider allows fetching the most recent list of Admin API clients of Gateways that -// we should configure. 
-type AdminAPIClientsProvider interface { - KonnectClient() *adminapi.KonnectClient - GatewayClients() []*adminapi.Client -} diff --git a/internal/clients/manager_test.go b/internal/clients/manager_test.go index adb30285b7..6493aca026 100644 --- a/internal/clients/manager_test.go +++ b/internal/clients/manager_test.go @@ -1,270 +1,164 @@ -package clients +package clients_test import ( "context" - "fmt" - "net/http" - "net/http/httptest" + "sync" "sync/atomic" "testing" "time" + "github.com/google/go-cmp/cmp" "github.com/samber/lo" + "github.com/samber/mo" "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "golang.org/x/exp/slices" k8stypes "k8s.io/apimachinery/pkg/types" "github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi" + "github.com/kong/kubernetes-ingress-controller/v2/internal/clients" + "github.com/kong/kubernetes-ingress-controller/v2/test/mocks" ) -// clientFactoryWithExpected implements ClientFactory interface and can be used -// in tests to assert which clients have been created and signal failure if: -// - client for an unexpected address gets created -// - client which already got created was tried to be created second time. 
-type clientFactoryWithExpected struct { - expected map[string]bool - t *testing.T +type readinessCheckCall struct { + AlreadyCreatedURLs []string + PendingURLs []string } -func (cf clientFactoryWithExpected) CreateAdminAPIClient(_ context.Context, adminAPI adminapi.DiscoveredAdminAPI) (*adminapi.Client, error) { - address := adminAPI.Address - stillExpecting, ok := cf.expected[address] - if !ok { - cf.t.Errorf("got %s which was unexpected", address) - return nil, fmt.Errorf("got %s which was unexpected", address) - } - if !stillExpecting { - cf.t.Errorf("got %s more than once", address) - return nil, fmt.Errorf("got %s more than once", address) - } - cf.expected[address] = false +type mockReadinessChecker struct { + nextResult clients.ReadinessCheckResult + lastCall mo.Option[readinessCheckCall] + callsCount int + lock sync.RWMutex +} - return adminapi.NewTestClient(address) +func (m *mockReadinessChecker) CheckReadiness( + _ context.Context, + alreadyCreatedClients []clients.AlreadyCreatedClient, + pendingClients []adminapi.DiscoveredAdminAPI, +) clients.ReadinessCheckResult { + m.lock.Lock() + defer m.lock.Unlock() + m.callsCount++ + m.lastCall = mo.Some(readinessCheckCall{ + AlreadyCreatedURLs: lo.Map(alreadyCreatedClients, func(c clients.AlreadyCreatedClient, _ int) string { + return c.BaseRootURL() + }), + PendingURLs: lo.Map(pendingClients, func(c adminapi.DiscoveredAdminAPI, _ int) string { + return c.Address + }), + }) + return m.nextResult } -func (cf clientFactoryWithExpected) AssertExpectedCalls() { - for _, addr := range cf.ExpectedCallsLeft() { - cf.t.Errorf("%s client expected to be called, but wasn't", addr) - } +func (m *mockReadinessChecker) LetChecksReturn(result clients.ReadinessCheckResult) { + m.lock.Lock() + defer m.lock.Unlock() + m.nextResult = result } -func (cf clientFactoryWithExpected) ExpectedCallsLeft() []string { - var notCalled []string - for addr, stillExpected := range cf.expected { - if stillExpected { - notCalled = 
append(notCalled, addr) - } +func (m *mockReadinessChecker) LastCall() (readinessCheckCall, bool) { + m.lock.RLock() + defer m.lock.RUnlock() + if call, ok := m.lastCall.Get(); ok { + return call, true } - return notCalled + return readinessCheckCall{}, false } -type alwaysSuccessClientFactory struct{} - -func (a alwaysSuccessClientFactory) CreateAdminAPIClient( - _ context.Context, - adminAPI adminapi.DiscoveredAdminAPI, -) (*adminapi.Client, error) { - return adminapi.NewTestClient(adminAPI.Address) +func (m *mockReadinessChecker) CallsCount() int { + m.lock.RLock() + defer m.lock.RUnlock() + return m.callsCount } -func TestClientAddressesNotifications(t *testing.T) { - var ( - logger = logrus.New() - expected = map[string]bool{} - serverCalls int32 - ) - - const numberOfServers = 2 - - createTestServer := func() *httptest.Server { - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // This test server serves as kong Admin API checking that we only get - // as many calls as new clients requests. - // That said: when we have 1 manager with url1 and we receive a notification - // with url1 and url2 we should only create the second manager with - // url2 and leave the existing one (for url1) in place and reuse it. 
- - atomic.AddInt32(&serverCalls, 1) - n := int(atomic.LoadInt32(&serverCalls)) - - if n > numberOfServers { - t.Errorf("clients should only call out to the server %d times, but we received %d requests", - numberOfServers, n, - ) - } - })) - } - - srv := createTestServer() - defer srv.Close() - expected[srv.URL] = true +func intoTurnedReady(urls ...string) []*adminapi.Client { + return lo.Map(urls, func(url string, _ int) *adminapi.Client { + return lo.Must(adminapi.NewTestClient(url)) + }) +} - srv2 := createTestServer() - defer srv2.Close() - expected[srv2.URL] = true +func intoTurnedPending(urls ...string) []adminapi.DiscoveredAdminAPI { + return lo.Map(urls, func(url string, _ int) adminapi.DiscoveredAdminAPI { + return testDiscoveredAdminAPI(url) + }) +} - testClientFactoryWithExpected := clientFactoryWithExpected{ - expected: expected, - t: t, - } +func TestAdminAPIClientsManager_OnNotifyClientsAreUpdatedAccordingly(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - initialClient, err := adminapi.NewTestClient("localhost:8083") + logger := logrus.New() + readinessChecker := &mockReadinessChecker{} + initialClient, err := adminapi.NewTestClient("https://localhost:8083") require.NoError(t, err) - manager, err := NewAdminAPIClientsManager( + manager, err := clients.NewAdminAPIClientsManager( ctx, logger, []*adminapi.Client{initialClient}, - testClientFactoryWithExpected, + readinessChecker, ) require.NoError(t, err) require.NotNil(t, manager) - manager.RunNotifyLoop() + manager.Run() <-manager.Running() - defer testClientFactoryWithExpected.AssertExpectedCalls() - - requireClientsCountEventually := func(t *testing.T, c *AdminAPIClientsManager, addresses []string, args ...any) { + requireClientsMatchEventually := func(t *testing.T, c *clients.AdminAPIClientsManager, addresses []string, args ...any) { require.Eventually(t, func() bool { clientAddresses := lo.Map(c.GatewayClients(), func(cl *adminapi.Client, _ int) string { 
return cl.BaseRootURL() }) return slices.Equal(addresses, clientAddresses) - }, time.Second, time.Millisecond, args..., - ) + }, time.Second, time.Millisecond, args...) } - requireClientsCountEventually(t, manager, []string{"localhost:8083"}, + requireClientsMatchEventually(t, manager, []string{initialClient.BaseRootURL()}, "initially there should be the initial client") - manager.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(srv.URL)}) - requireClientsCountEventually(t, manager, []string{srv.URL}, + readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{ClientsTurnedReady: intoTurnedReady(testURL1)}) + manager.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1)}) + requireClientsMatchEventually(t, manager, []string{testURL1}, "after notifying about a new address we should get 1 client eventually") - manager.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(srv.URL)}) - requireClientsCountEventually(t, manager, []string{srv.URL}, + readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{}) + manager.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1)}) + requireClientsMatchEventually(t, manager, []string{testURL1}, "after notifying the same address there's no update in clients") - manager.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(srv.URL), testDiscoveredAdminAPI(srv2.URL)}) - requireClientsCountEventually(t, manager, []string{srv.URL, srv2.URL}, - "after notifying new address set including the old already existing one we get both the old and the new") + manager.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1), testDiscoveredAdminAPI(testURL2)}) + requireClientsMatchEventually(t, manager, []string{testURL1}, + "after notifying new address set including the old already existing one but new one not yet ready we get just the old one") + + readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{ClientsTurnedReady: intoTurnedReady(testURL2)}) + 
manager.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1), testDiscoveredAdminAPI(testURL2)}) + requireClientsMatchEventually(t, manager, []string{testURL1, testURL2}, + "after notifying new address set including the old already existing one and new one turning ready we get both the old and the new") - manager.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(srv.URL), testDiscoveredAdminAPI(srv2.URL)}) - requireClientsCountEventually(t, manager, []string{srv.URL, srv2.URL}, - "notifying again with the same set of URLs should not change the existing URLs") + readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{}) + manager.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1), testDiscoveredAdminAPI(testURL2)}) + requireClientsMatchEventually(t, manager, []string{testURL1, testURL2}, + "after notifying again with the same set of URLs should not change the existing URLs") - manager.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(srv.URL)}) - requireClientsCountEventually(t, manager, []string{srv.URL}, + readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{ClientsTurnedPending: intoTurnedPending(testURL2)}) + manager.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1), testDiscoveredAdminAPI(testURL2)}) + requireClientsMatchEventually(t, manager, []string{testURL1}, + "after notifying the same address set with one turning pending, we get only one client") + + readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{}) + manager.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1)}) + requireClientsMatchEventually(t, manager, []string{testURL1}, "notifying again with just one URL should decrease the set of URLs to just this one") manager.Notify([]adminapi.DiscoveredAdminAPI{}) - requireClientsCountEventually(t, manager, []string{}) - - // We could test here notifying about srv.URL and srv2.URL again but there's - // no data structure in the manager that 
could notify us about a removal of - // a manager which we could use here. + requireClientsMatchEventually(t, manager, []string{}) cancel() require.NotPanics(t, func() { manager.Notify([]adminapi.DiscoveredAdminAPI{}) }, "notifying about new clients after manager has been shut down shouldn't panic") } -func TestClientAdjustInternalClientsAfterNotification(t *testing.T) { - var ( - ctx = context.Background() - logger = logrus.New() - ) - - cf := &clientFactoryWithExpected{ - t: t, - } - - // Initial client is expected to be replaced later on. - testClient, err := adminapi.NewTestClient("localhost:8083") - require.NoError(t, err) - manager, err := NewAdminAPIClientsManager(ctx, logger, []*adminapi.Client{testClient}, cf) - require.NoError(t, err) - require.NotNil(t, manager) - manager.RunNotifyLoop() - <-manager.Running() - - clients := manager.GatewayClients() - require.Len(t, clients, 1) - require.Equal(t, "localhost:8083", clients[0].BaseRootURL()) - - requireNoExpectedCallsLeftEventually := func(t *testing.T) { - require.Eventually(t, func() bool { - return len(cf.ExpectedCallsLeft()) == 0 - }, time.Second, time.Millisecond) - } - - t.Run("2 new clients", func(t *testing.T) { - // Change expected addresses - cf.expected = map[string]bool{"localhost:8080": true, "localhost:8081": true} - // there are 2 addresses contained in the notification of which 2 are new - // and client creator should be called exactly 2 times - clients := []adminapi.DiscoveredAdminAPI{ - testDiscoveredAdminAPI("localhost:8080"), - testDiscoveredAdminAPI("localhost:8081"), - } - changed := manager.adjustGatewayClients(clients) - require.True(t, changed) - requireNoExpectedCallsLeftEventually(t) - - changed = manager.adjustGatewayClients(clients) - require.False(t, changed, "adjusting clients with the same set of addresses should not change anything") - }) - - t.Run("1 addresses, no new client", func(t *testing.T) { - // Change expected addresses - cf.expected = map[string]bool{} - // there 
is address contained in the notification but a client for that - // address already exists, client creator should not be called - clients := []adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI("localhost:8080")} - changed := manager.adjustGatewayClients(clients) - require.True(t, changed) - requireNoExpectedCallsLeftEventually(t) - - changed = manager.adjustGatewayClients(clients) - require.False(t, changed, "adjusting clients with the same set of addresses should not change anything") - }) - - t.Run("2 addresses, 1 new client", func(t *testing.T) { - // Change expected addresses - cf.expected = map[string]bool{"localhost:8081": true} - // there are 2 addresses contained in the notification but only 1 is new - // hence the client creator should be called only once - clients := []adminapi.DiscoveredAdminAPI{ - testDiscoveredAdminAPI("localhost:8080"), - testDiscoveredAdminAPI("localhost:8081"), - } - changed := manager.adjustGatewayClients(clients) - require.True(t, changed) - requireNoExpectedCallsLeftEventually(t) - - changed = manager.adjustGatewayClients(clients) - require.False(t, changed, "adjusting clients with the same set of addresses should not change anything") - }) - - t.Run("0 addresses", func(t *testing.T) { - // Change expected addresses - cf.expected = map[string]bool{} - // there are 0 addresses contained in the notification hence the client - // creator should not be called - changed := manager.adjustGatewayClients([]adminapi.DiscoveredAdminAPI(nil)) - require.True(t, changed) - requireNoExpectedCallsLeftEventually(t) - - changed = manager.adjustGatewayClients(nil) - require.False(t, changed, "adjusting clients with the same set of addresses should not change anything") - }) -} - func TestNewAdminAPIClientsManager_NoInitialClientsDisallowed(t *testing.T) { - cf := &clientFactoryWithExpected{t: t} - _, err := NewAdminAPIClientsManager(context.Background(), logrus.New(), nil, cf) - require.Error(t, err) + _, err := 
clients.NewAdminAPIClientsManager(context.Background(), logrus.New(), nil, &mockReadinessChecker{}) + require.ErrorContains(t, err, "at least one initial client must be provided") } func TestAdminAPIClientsManager_NotRunningNotifyLoop(t *testing.T) { @@ -272,17 +166,17 @@ func TestAdminAPIClientsManager_NotRunningNotifyLoop(t *testing.T) { testClient, err := adminapi.NewTestClient("localhost:8080") require.NoError(t, err) - m, err := NewAdminAPIClientsManager( + m, err := clients.NewAdminAPIClientsManager( context.Background(), logrus.New(), []*adminapi.Client{testClient}, - &clientFactoryWithExpected{t: t}, + &mockReadinessChecker{}, ) require.NoError(t, err) select { case <-m.Running(): - t.Error("expected manager to not run without explicitly running it with RunNotifyLoop method") + t.Error("expected manager to not run without explicitly running it with Run method") case <-time.After(time.Millisecond * 100): } } @@ -292,11 +186,11 @@ func TestAdminAPIClientsManager_Clients(t *testing.T) { testClient, err := adminapi.NewTestClient("localhost:8080") require.NoError(t, err) - m, err := NewAdminAPIClientsManager( + m, err := clients.NewAdminAPIClientsManager( context.Background(), logrus.New(), []*adminapi.Client{testClient}, - &clientFactoryWithExpected{t: t}, + &mockReadinessChecker{}, ) require.NoError(t, err) require.Len(t, m.GatewayClients(), 1, "expecting one initial client") @@ -309,49 +203,15 @@ func TestAdminAPIClientsManager_Clients(t *testing.T) { require.Equal(t, konnectTestClient, m.KonnectClient(), "konnect client should be returned from KonnectClient") } -func TestAdminAPIClientsManager_GatewayClientsFromNotificationsAreExpectedToHavePodRef(t *testing.T) { - t.Parallel() - - cf := &clientFactoryWithExpected{t: t, expected: map[string]bool{"http://10.0.0.1:8080": true}} - testClient, err := adminapi.NewTestClient("http://localhost:8080") - require.NoError(t, err) - m, err := NewAdminAPIClientsManager( - context.Background(), - logrus.New(), - 
[]*adminapi.Client{testClient}, - cf, - ) - require.NoError(t, err) - m.RunNotifyLoop() - - m.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI("http://10.0.0.1:8080")}) - - require.Eventually(t, func() bool { - gwClients := m.GatewayClients() - if len(gwClients) != 1 { - t.Log("there's no gateway clients...") - return false - } - _, ok := gwClients[0].PodReference() - if !ok { - t.Log("there's no pod reference attached") - } - return ok - }, time.Second, time.Millisecond) -} - func TestAdminAPIClientsManager_SubscribeToGatewayClientsChanges(t *testing.T) { t.Parallel() - cf := &clientFactoryWithExpected{t: t, expected: map[string]bool{ - "http://10.0.0.1:8080": true, - "http://10.0.0.2:8080": true, - }} + readinessChecker := &mockReadinessChecker{} testClient, err := adminapi.NewTestClient("http://localhost:8080") require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) - m, err := NewAdminAPIClientsManager(ctx, logrus.New(), []*adminapi.Client{testClient}, cf) + m, err := clients.NewAdminAPIClientsManager(ctx, logrus.New(), []*adminapi.Client{testClient}, readinessChecker) require.NoError(t, err) t.Run("no notify loop running should return false when subscribing", func(t *testing.T) { @@ -360,16 +220,17 @@ func TestAdminAPIClientsManager_SubscribeToGatewayClientsChanges(t *testing.T) { require.Falsef(t, ok, "expected no subscription to be created because no notification loop is running") }) - m.RunNotifyLoop() + m.Run() t.Run("when notification loop is running subscription should be created", func(t *testing.T) { ch, ok := m.SubscribeToGatewayClientsChanges() require.NotNil(t, ch) require.True(t, ok) + readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{ClientsTurnedReady: intoTurnedReady(testURL1, testURL2)}) m.Notify([]adminapi.DiscoveredAdminAPI{ - testDiscoveredAdminAPI("http://10.0.0.1:8080"), - testDiscoveredAdminAPI("http://10.0.0.2:8080"), + testDiscoveredAdminAPI(testURL1), + testDiscoveredAdminAPI(testURL2), }) 
select { @@ -389,7 +250,8 @@ func TestAdminAPIClientsManager_SubscribeToGatewayClientsChanges(t *testing.T) { require.NotNil(t, sub2) require.True(t, ok) - m.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI("http://10.0.0.1:8080")}) + readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{ClientsTurnedPending: intoTurnedPending(testURL2)}) + m.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1)}) select { case <-sub2: @@ -427,73 +289,52 @@ func TestAdminAPIClientsManager_SubscribeToGatewayClientsChanges(t *testing.T) { } func TestAdminAPIClientsManager_ConcurrentNotify(t *testing.T) { - t.Parallel() - - cf := alwaysSuccessClientFactory{} - testClient, err := adminapi.NewTestClient("http://10.0.0.1:8080") + readinessChecker := &mockReadinessChecker{} + readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{ClientsTurnedReady: intoTurnedReady(testURL1)}) + testClient, err := adminapi.NewTestClient(testURL1) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - m, err := NewAdminAPIClientsManager(ctx, logrus.New(), []*adminapi.Client{testClient}, cf) + m, err := clients.NewAdminAPIClientsManager(ctx, logrus.New(), []*adminapi.Client{testClient}, readinessChecker) require.NoError(t, err) - m.RunNotifyLoop() - - var receivedNotificationsCount atomic.Uint32 - ch, ok := m.SubscribeToGatewayClientsChanges() - require.NotNil(t, ch) - require.True(t, ok) + m.Run() - // Run subscriber worker in a separate goroutine to consume notifications. + // Run a goroutine that will call GatewayClients() every millisecond. go func() { for { select { - case <-ch: - // Call GatewayClients() here to make sure that we can access the clients safely - // from the subscriber goroutine without causing a deadlock in the notify loop. 
- require.Len(t, m.GatewayClients(), 1, "expected to get 1 client") - receivedNotificationsCount.Add(1) + case <-time.Tick(time.Millisecond): + _ = m.GatewayClients() case <-ctx.Done(): return } } }() - // Run multiple notifiers in parallel to make sure that Notify is safe for concurrent use. - // We'll use two sets of clients interchangeably to cause an actual change that results in a notification. - oddClients := []adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI("http://10.0.0.1:8080")} - evenClients := []adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI("http://10.0.0.2:8080")} - for i := 0; i < 10; i++ { - i := i - go func() { - // Notify with even or odd clients interchangeably depending on the iteration to trigger a change. - if pickEven := i%2 == 0; pickEven { - m.Notify(evenClients) - } else { - m.Notify(oddClients) - } - }() - } + go func() { + for i := 0; i < 100; i++ { + go m.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1)}) + } + }() require.Eventually(t, func() bool { - if count := receivedNotificationsCount.Load(); count < 2 { - t.Logf("Received %d notifications, expected at least 2, waiting...", count) - return false - } - return true - }, time.Second, time.Millisecond, "expected to receive at least 2 notifications") + return readinessChecker.CallsCount() == 100 + }, time.Second, time.Millisecond) } -func TestAdminAPIClientsManager_NotifiesSubscribersOnlyWhenGatewayClientsChange(t *testing.T) { - cf := alwaysSuccessClientFactory{} - testClient, err := adminapi.NewTestClient("http://10.0.0.1:8080") +func TestAdminAPIClientsManager_GatewayClientsChanges(t *testing.T) { + testClient, err := adminapi.NewTestClient(testURL1) require.NoError(t, err) + readinessChecker := &mockReadinessChecker{} ctx, cancel := context.WithCancel(context.Background()) defer cancel() - m, err := NewAdminAPIClientsManager(ctx, logrus.New(), []*adminapi.Client{testClient}, cf) + m, err := clients.NewAdminAPIClientsManager(ctx, logrus.New(), 
[]*adminapi.Client{testClient}, readinessChecker) require.NoError(t, err) - m.RunNotifyLoop() + + m.Run() + <-m.Running() var receivedNotificationsCount atomic.Uint32 ch, ok := m.SubscribeToGatewayClientsChanges() @@ -512,12 +353,8 @@ func TestAdminAPIClientsManager_NotifiesSubscribersOnlyWhenGatewayClientsChange( } }() - firstClientsSet := []adminapi.DiscoveredAdminAPI{ - testDiscoveredAdminAPI("http://10.0.0.1:8080"), - } - secondClientsSet := []adminapi.DiscoveredAdminAPI{ - testDiscoveredAdminAPI("http://10.0.0.2:8080"), - } + firstClientsSet := []adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1)} + secondClientsSet := []adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL2)} notificationsCountEventuallyEquals := func(expectedCount int) { require.Eventually(t, func() bool { if count := receivedNotificationsCount.Load(); count != uint32(expectedCount) { @@ -527,22 +364,120 @@ func TestAdminAPIClientsManager_NotifiesSubscribersOnlyWhenGatewayClientsChange( return true }, time.Second, time.Millisecond, "expected to receive %d notifications", expectedCount) } + requireLastReadinessCheckCall := func(expected readinessCheckCall) { + call, ok := readinessChecker.LastCall() + require.True(t, ok, "expected call to readiness checker") + require.Equal(t, expected, call) + } // Notify the first set of clients and make sure that the subscriber doesn't get notified as it was initial state. m.Notify(firstClientsSet) notificationsCountEventuallyEquals(0) + require.Equal(t, 1, readinessChecker.CallsCount(), "expected readiness check on non-empty set of clients") + requireLastReadinessCheckCall(readinessCheckCall{ + AlreadyCreatedURLs: []string{testURL1}, + PendingURLs: []string{}, + }) // Notify an empty set of clients and make sure that the subscriber get notified. 
m.Notify(nil) notificationsCountEventuallyEquals(1) + require.Equal(t, 1, readinessChecker.CallsCount(), "no readiness check should be performed when notifying an empty set") // Notify an empty set again and make sure that the subscriber doesn't get notified as the state didn't change. m.Notify(nil) notificationsCountEventuallyEquals(1) + require.Equal(t, 1, readinessChecker.CallsCount(), "no readiness check should be performed when notifying an empty set") + + // Notify the second set of clients without making the new one ready and make sure that the subscriber gets no notification. + m.Notify(secondClientsSet) + notificationsCountEventuallyEquals(1) + requireLastReadinessCheckCall(readinessCheckCall{ + AlreadyCreatedURLs: []string{}, + PendingURLs: []string{testURL2}, + }) + require.Equal(t, 2, readinessChecker.CallsCount(), "expected readiness check on non-empty set of clients") - // Notify the second set of clients and make sure that the subscriber gets notified. + // Notify the second set of clients and make sure that the subscriber gets notified after the new one becomes ready. 
+ readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{ClientsTurnedReady: intoTurnedReady(testURL2)}) m.Notify(secondClientsSet) notificationsCountEventuallyEquals(2) + requireLastReadinessCheckCall(readinessCheckCall{ + AlreadyCreatedURLs: []string{}, + PendingURLs: []string{testURL2}, + }) + require.Equal(t, 3, readinessChecker.CallsCount(), "expected readiness check on non-empty set of clients") + + m.Notify([]adminapi.DiscoveredAdminAPI{firstClientsSet[0], secondClientsSet[0]}) + notificationsCountEventuallyEquals(3) + requireLastReadinessCheckCall(readinessCheckCall{ + AlreadyCreatedURLs: []string{testURL2}, + PendingURLs: []string{testURL1}, + }) + require.Equal(t, 4, readinessChecker.CallsCount(), "expected readiness check on non-empty set of clients") +} + +func TestAdminAPIClientsManager_PeriodicReadinessReconciliation(t *testing.T) { + testClient, err := adminapi.NewTestClient(testURL1) + require.NoError(t, err) + + readinessTicker := mocks.NewTicker() + readinessChecker := &mockReadinessChecker{} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + m, err := clients.NewAdminAPIClientsManager( + ctx, + logrus.New(), + []*adminapi.Client{testClient}, + readinessChecker, + clients.WithReadinessReconciliationTicker(readinessTicker), + ) + require.NoError(t, err) + m.Run() + <-m.Running() + + readinessCheckCallEventuallyMatches := func(expected readinessCheckCall) { + require.Eventually(t, func() bool { + lastCall, wasCalledAtAll := readinessChecker.LastCall() + if !wasCalledAtAll { + t.Log("Readiness checker was not called yet, waiting...") + return false + } + if diff := cmp.Diff(expected, lastCall); diff != "" { + t.Logf("Readiness checker was called with unexpected arguments: %s", diff) + return false + } + return true + }, time.Second, time.Millisecond) + } + + // Trigger the first readiness check. 
+ readinessTicker.Add(clients.DefaultReadinessReconciliationInterval) + readinessCheckCallEventuallyMatches(readinessCheckCall{ + AlreadyCreatedURLs: []string{testURL1}, + PendingURLs: []string{}, + }) + require.Equal(t, 1, readinessChecker.CallsCount()) + + // Notify with a new client and check the readiness check call was made as expected. + m.Notify([]adminapi.DiscoveredAdminAPI{testDiscoveredAdminAPI(testURL1), testDiscoveredAdminAPI(testURL2)}) + readinessCheckCallEventuallyMatches(readinessCheckCall{ + AlreadyCreatedURLs: []string{testURL1}, + PendingURLs: []string{testURL2}, + }) + require.Equal(t, 2, readinessChecker.CallsCount()) + + // Trigger a next readiness check which will make testURL2 ready. + readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{ClientsTurnedReady: intoTurnedReady(testURL2)}) + readinessTicker.Add(clients.DefaultReadinessReconciliationInterval) + readinessCheckCallEventuallyMatches(readinessCheckCall{ + AlreadyCreatedURLs: []string{testURL1}, + PendingURLs: []string{testURL2}, + }) + require.Equal(t, 3, readinessChecker.CallsCount()) + require.True(t, lo.ContainsBy(m.GatewayClients(), func(c *adminapi.Client) bool { + return c.BaseRootURL() == testURL2 + }), "expected to find the new client in the manager's clients list after it became ready") } func testDiscoveredAdminAPI(address string) adminapi.DiscoveredAdminAPI { diff --git a/internal/clients/readiness.go b/internal/clients/readiness.go new file mode 100644 index 0000000000..0eb388101a --- /dev/null +++ b/internal/clients/readiness.go @@ -0,0 +1,159 @@ +package clients + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/go-logr/logr" + k8stypes "k8s.io/apimachinery/pkg/types" + + "github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi" + "github.com/kong/kubernetes-ingress-controller/v2/internal/util" +) + +const ( + readinessCheckTimeout = time.Second +) + +// ReadinessCheckResult represents the result of a readiness check. 
+type ReadinessCheckResult struct { + // ClientsTurnedReady are the clients that were pending and are now ready to be used. + ClientsTurnedReady []*adminapi.Client + // ClientsTurnedPending are the clients that were ready and are now pending to be created. + ClientsTurnedPending []adminapi.DiscoveredAdminAPI +} + +// HasChanges returns true if there are any changes in the readiness check result. +// When no changes are present, it means that the readiness check hasn't successfully created any pending client +// nor detected any already created client that became not ready. +func (r ReadinessCheckResult) HasChanges() bool { + return len(r.ClientsTurnedReady) > 0 || len(r.ClientsTurnedPending) > 0 +} + +// ReadinessChecker is responsible for checking the readiness of Admin API clients. +type ReadinessChecker interface { + // CheckReadiness checks readiness of the provided clients: + // - alreadyCreatedClients are the clients that have already been created. The readiness of these clients will be + // checked by their IsReady() method. + // - pendingClients are the clients that have not been created yet and are pending to be created. The readiness of + // these clients will be checked by trying to create them. + CheckReadiness( + ctx context.Context, + alreadyCreatedClients []AlreadyCreatedClient, + pendingClients []adminapi.DiscoveredAdminAPI, + ) ReadinessCheckResult +} + +// AlreadyCreatedClient represents an Admin API client that has already been created. 
+type AlreadyCreatedClient interface { + IsReady(context.Context) error + PodReference() (k8stypes.NamespacedName, bool) + BaseRootURL() string +} + +type DefaultReadinessChecker struct { + factory ClientFactory + logger logr.Logger +} + +func NewDefaultReadinessChecker(factory ClientFactory, logger logr.Logger) DefaultReadinessChecker { + return DefaultReadinessChecker{ + factory: factory, + logger: logger, + } +} + +func (c DefaultReadinessChecker) CheckReadiness( + ctx context.Context, + readyClients []AlreadyCreatedClient, + pendingClients []adminapi.DiscoveredAdminAPI, +) ReadinessCheckResult { + return ReadinessCheckResult{ + ClientsTurnedReady: c.checkPendingGatewayClients(ctx, pendingClients), + ClientsTurnedPending: c.checkAlreadyExistingClients(ctx, readyClients), + } +} + +// checkPendingGatewayClients checks if the pending clients are ready to be used and returns the ones that are. +func (c DefaultReadinessChecker) checkPendingGatewayClients(ctx context.Context, lastPending []adminapi.DiscoveredAdminAPI) (turnedReady []*adminapi.Client) { + for _, adminAPI := range lastPending { + if client := c.checkPendingClient(ctx, adminAPI); client != nil { + turnedReady = append(turnedReady, client) + } + } + return turnedReady +} + +// checkPendingClient indirectly checks readiness of the client by trying to create it. If it succeeds then it +// means that the client is ready to be used. It returns a non-nil client if the client is ready to be used, otherwise +// nil is returned. +func (c DefaultReadinessChecker) checkPendingClient( + ctx context.Context, + pendingClient adminapi.DiscoveredAdminAPI, +) (client *adminapi.Client) { + defer func() { + c.logger.V(util.DebugLevel). 
+ Info(fmt.Sprintf("checking readiness of pending client for %q", pendingClient.Address), + "ok", client != nil, + ) + }() + + ctx, cancel := context.WithTimeout(ctx, readinessCheckTimeout) + defer cancel() + client, err := c.factory.CreateAdminAPIClient(ctx, pendingClient) + if err != nil { + // Despite the error reason we still want to keep the client in the pending list to retry later. + c.logger.V(util.DebugLevel).Error(err, fmt.Sprintf("pending client for %q is not ready yet", pendingClient.Address)) + return nil + } + + return client +} + +// checkAlreadyExistingClients checks if the already existing clients are still ready to be used and returns the ones +// that are not. +func (c DefaultReadinessChecker) checkAlreadyExistingClients(ctx context.Context, alreadyCreatedClients []AlreadyCreatedClient) (turnedPending []adminapi.DiscoveredAdminAPI) { + for _, client := range alreadyCreatedClients { + // For ready clients we check readiness by calling the Status endpoint. + if ready := c.checkAlreadyCreatedClient(ctx, client); !ready { + podRef, ok := client.PodReference() + if !ok { + // This should never happen, but if it does, we want to log it. 
+ c.logger.Error( + errors.New("missing pod reference"), + fmt.Sprintf("failed to get PodReference for client %q", client.BaseRootURL()), + ) + continue + } + turnedPending = append(turnedPending, adminapi.DiscoveredAdminAPI{ + Address: client.BaseRootURL(), + PodRef: podRef, + }) + } + } + return turnedPending +} + +func (c DefaultReadinessChecker) checkAlreadyCreatedClient(ctx context.Context, client AlreadyCreatedClient) (ready bool) { + defer func() { + c.logger.V(util.DebugLevel).Info( + fmt.Sprintf("checking readiness of already created client for %q", client.BaseRootURL()), + "ok", ready, + ) + }() + + ctx, cancel := context.WithTimeout(ctx, readinessCheckTimeout) + defer cancel() + if err := client.IsReady(ctx); err != nil { + // Whatever the error reason is, we report the client as not ready so that it gets moved to the pending list and retried later. + c.logger.V(util.DebugLevel).Error( + err, + fmt.Sprintf("already created client for %q is not ready, moving to pending", client.BaseRootURL()), + ) + return false + } + + return true +} diff --git a/internal/clients/readiness_test.go b/internal/clients/readiness_test.go new file mode 100644 index 0000000000..b0b6830aa3 --- /dev/null +++ b/internal/clients/readiness_test.go @@ -0,0 +1,219 @@ +package clients_test + +import ( + "context" + "errors" + "fmt" + "testing" + + "github.com/go-logr/logr" + "github.com/samber/lo" + "github.com/stretchr/testify/require" + k8stypes "k8s.io/apimachinery/pkg/types" + + "github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi" + "github.com/kong/kubernetes-ingress-controller/v2/internal/clients" +) + +const ( + testURL1 = "http://localhost:8001" + testURL2 = "http://localhost:8002" +) + +var testPodRef = k8stypes.NamespacedName{ + Namespace: "default", + Name: "mock", +} + +type mockClientFactory struct { + ready map[string]bool // Maps address to readiness. + callsCount map[string]int // Maps address to number of CreateAdminAPIClient calls. 
+ t *testing.T +} + +func newMockClientFactory(t *testing.T, ready map[string]bool) mockClientFactory { + return mockClientFactory{ + ready: ready, + callsCount: map[string]int{}, + t: t, + } +} + +func (cf mockClientFactory) CreateAdminAPIClient(_ context.Context, adminAPI adminapi.DiscoveredAdminAPI) (*adminapi.Client, error) { + address := adminAPI.Address + + cf.callsCount[address]++ + + ready, ok := cf.ready[address] + if !ok { + cf.t.Errorf("unexpected client creation for %s", address) + } + if !ok || !ready { + return nil, fmt.Errorf("client for %s is not ready", address) + } + + return adminapi.NewTestClient(address) +} + +type mockAlreadyCreatedClient struct { + url string + isReady bool +} + +func (m mockAlreadyCreatedClient) IsReady(context.Context) error { + if !m.isReady { + return errors.New("not ready") + } + return nil +} + +func (m mockAlreadyCreatedClient) PodReference() (k8stypes.NamespacedName, bool) { + return testPodRef, true +} + +func (m mockAlreadyCreatedClient) BaseRootURL() string { + return m.url +} + +func TestDefaultReadinessChecker(t *testing.T) { + testCases := []struct { + name string + + alreadyCreatedClients []clients.AlreadyCreatedClient + pendingClients []adminapi.DiscoveredAdminAPI + pendingClientsReadiness map[string]bool + + expectedTurnedReady []string + expectedTurnedPending []string + }{ + { + name: "ready turning pending", + alreadyCreatedClients: []clients.AlreadyCreatedClient{ + mockAlreadyCreatedClient{ + url: testURL1, + isReady: false, + }, + }, + expectedTurnedPending: []string{testURL1}, + }, + { + name: "pending turning ready", + pendingClients: []adminapi.DiscoveredAdminAPI{ + { + Address: testURL1, + PodRef: testPodRef, + }, + }, + pendingClientsReadiness: map[string]bool{ + testURL1: true, + }, + expectedTurnedReady: []string{testURL1}, + }, + { + name: "ready turning pending, pending turning ready at once", + alreadyCreatedClients: []clients.AlreadyCreatedClient{ + mockAlreadyCreatedClient{ + url: testURL1, + 
isReady: false, + }, + }, + pendingClients: []adminapi.DiscoveredAdminAPI{ + { + Address: testURL2, + PodRef: testPodRef, + }, + }, + pendingClientsReadiness: map[string]bool{ + testURL2: true, + }, + expectedTurnedReady: []string{testURL2}, + expectedTurnedPending: []string{testURL1}, + }, + { + name: "no changes", + alreadyCreatedClients: []clients.AlreadyCreatedClient{ + mockAlreadyCreatedClient{ + url: testURL1, + isReady: true, + }, + }, + pendingClients: []adminapi.DiscoveredAdminAPI{ + { + Address: testURL2, + PodRef: testPodRef, + }, + }, + pendingClientsReadiness: map[string]bool{ + testURL2: false, + }, + expectedTurnedReady: nil, + expectedTurnedPending: nil, + }, + { + name: "no clients at all", + expectedTurnedReady: nil, + expectedTurnedPending: nil, + }, + { + name: "multiple ready, one turning pending", + alreadyCreatedClients: []clients.AlreadyCreatedClient{ + mockAlreadyCreatedClient{ + url: testURL1, + isReady: true, + }, + mockAlreadyCreatedClient{ + url: testURL2, + isReady: false, // This one will turn pending. + }, + }, + expectedTurnedReady: nil, + expectedTurnedPending: []string{ + testURL2, + }, + }, + { + name: "multiple pending, one turning ready", + pendingClients: []adminapi.DiscoveredAdminAPI{ + { + Address: testURL1, + PodRef: testPodRef, + }, + { + Address: testURL2, + PodRef: testPodRef, + }, + }, + pendingClientsReadiness: map[string]bool{ + testURL1: false, + testURL2: true, // This one will turn ready. 
+ }, + expectedTurnedReady: []string{ + testURL2, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + factory := newMockClientFactory(t, tc.pendingClientsReadiness) + checker := clients.NewDefaultReadinessChecker(factory, logr.Discard()) + result := checker.CheckReadiness(context.Background(), tc.alreadyCreatedClients, tc.pendingClients) + + turnedPending := lo.Map(result.ClientsTurnedPending, func(c adminapi.DiscoveredAdminAPI, _ int) string { return c.Address }) + turnedReady := lo.Map(result.ClientsTurnedReady, func(c *adminapi.Client, _ int) string { return c.BaseRootURL() }) + + require.ElementsMatch(t, tc.expectedTurnedReady, turnedReady) + require.ElementsMatch(t, tc.expectedTurnedPending, turnedPending) + + // For every pending client turning ready we expect exactly one call to CreateAdminAPIClient. + for _, url := range tc.pendingClients { + require.Equal(t, 1, factory.callsCount[url.Address]) + } + + // For every already created client we expect NO calls to CreateAdminAPIClient. + for _, url := range tc.alreadyCreatedClients { + require.Zero(t, factory.callsCount[url.BaseRootURL()]) + } + }) + } +} diff --git a/internal/controllers/configuration/kongadminapi_controller.go b/internal/controllers/configuration/kongadminapi_controller.go index 96e4ae8abc..a5b6866b49 100644 --- a/internal/controllers/configuration/kongadminapi_controller.go +++ b/internal/controllers/configuration/kongadminapi_controller.go @@ -166,8 +166,9 @@ func (r *KongAdminAPIServiceReconciler) Reconcile(ctx context.Context, req ctrl. func (r *KongAdminAPIServiceReconciler) notify() { discovered := flattenDiscoveredAdminAPIs(r.Cache) + addresses := lo.Map(discovered, func(d adminapi.DiscoveredAdminAPI, _ int) string { return d.Address }) r.Log.V(util.DebugLevel). 
- Info("notifying about newly detected Admin APIs", "admin_apis", discovered) + Info("notifying about newly detected Admin APIs", "admin_apis", addresses) r.EndpointsNotifier.Notify(discovered) } diff --git a/internal/controllers/configuration/kongadminapi_controller_envtest_test.go b/internal/controllers/configuration/kongadminapi_controller_envtest_test.go index 1b4b56fe92..5ab6e303d1 100644 --- a/internal/controllers/configuration/kongadminapi_controller_envtest_test.go +++ b/internal/controllers/configuration/kongadminapi_controller_envtest_test.go @@ -192,7 +192,7 @@ func TestKongAdminAPIController(t *testing.T) { ) }) - t.Run("not Ready Endpoints are not matched", func(t *testing.T) { + t.Run("terminating Endpoints are not matched", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() adminService, adminPod, n := startKongAdminAPIServiceReconciler(ctx, t, client, cfg) @@ -218,7 +218,7 @@ func TestKongAdminAPIController(t *testing.T) { { Addresses: []string{"10.0.0.1"}, Conditions: discoveryv1.EndpointConditions{ - Ready: lo.ToPtr(false), + Terminating: lo.ToPtr(true), }, TargetRef: &corev1.ObjectReference{ Kind: "Pod", @@ -459,16 +459,16 @@ func TestKongAdminAPIController(t *testing.T) { n.LastNotified(), ) - // Update all endpoints so that they are not Ready. + // Update all endpoints so that they are Terminating. for i := range endpoints.Endpoints { - endpoints.Endpoints[i].Conditions.Ready = lo.ToPtr(false) + endpoints.Endpoints[i].Conditions.Terminating = lo.ToPtr(true) } require.NoError(t, client.Update(ctx, &endpoints, &ctrlclient.UpdateOptions{})) require.NoError(t, client.Get(ctx, k8stypes.NamespacedName{Name: endpoints.Name, Namespace: endpoints.Namespace}, &endpoints, &ctrlclient.GetOptions{})) assert.Eventually(t, func() bool { return len(n.LastNotified()) == 0 }, 3*time.Second, time.Millisecond) - // Update 1 endpoint so that that it's Ready. 
- endpoints.Endpoints[0].Conditions.Ready = lo.ToPtr(true) + // Update 1 endpoint so that it's not Terminating. + endpoints.Endpoints[0].Conditions.Terminating = nil require.NoError(t, client.Update(ctx, &endpoints, &ctrlclient.UpdateOptions{})) require.NoError(t, client.Get(ctx, k8stypes.NamespacedName{Name: endpoints.Name, Namespace: endpoints.Namespace}, &endpoints, &ctrlclient.GetOptions{})) diff --git a/internal/manager/run.go b/internal/manager/run.go index f26d77395e..a86b1ebaf9 100644 --- a/internal/manager/run.go +++ b/internal/manager/run.go @@ -134,18 +134,19 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic, d setupLog.Info("Initializing Dataplane Client") eventRecorder := mgr.GetEventRecorderFor(KongClientEventRecorderComponentName) + readinessChecker := clients.NewDefaultReadinessChecker(adminAPIClientsFactory, setupLog.WithName("readiness-checker")) clientsManager, err := clients.NewAdminAPIClientsManager( ctx, deprecatedLogger, initialKongClients, - adminAPIClientsFactory, + readinessChecker, ) if err != nil { return fmt.Errorf("failed to create AdminAPIClientsManager: %w", err) } if c.KongAdminSvc.IsPresent() { - setupLog.Info("Running AdminAPIClientsManager notify loop") - clientsManager.RunNotifyLoop() + setupLog.Info("Running AdminAPIClientsManager loop") + clientsManager.Run() } setupLog.Info("Starting Admission Server") diff --git a/test/e2e/helpers_test.go b/test/e2e/helpers_test.go index 654efd2d24..34ecba22b7 100644 --- a/test/e2e/helpers_test.go +++ b/test/e2e/helpers_test.go @@ -294,7 +294,7 @@ func waitForDeploymentRollout(ctx context.Context, t *testing.T, env environment } if err := allUpdatedReplicasRolledOutAndReady(deployment); err != nil { - t.Logf("controller deployment not ready: %s", err) + t.Logf("%s/%s deployment not ready: %s", namespace, name, err) return false } @@ -913,3 +913,28 @@ func scaleDeployment(ctx context.Context, t *testing.T, env environments.Environ return 
deployment.Status.ReadyReplicas == replicas }, time.Minute*3, time.Second, "deployment %s did not scale to %d replicas", deployment.Name, replicas) } + +func (d Deployments) Restart(ctx context.Context, t *testing.T, env environments.Environment) { + t.Helper() + + err := env.Cluster().Client().CoreV1().Pods(d.ControllerNN.Namespace).DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{ + LabelSelector: metav1.FormatLabelSelector(&metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": d.ControllerNN.Name, + }, + }), + }) + require.NoError(t, err, "failed to delete controller pods") + + err = env.Cluster().Client().CoreV1().Pods(d.ControllerNN.Namespace).DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{ + LabelSelector: metav1.FormatLabelSelector(&metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": d.ProxyNN.Name, + }, + }), + }) + require.NoError(t, err, "failed to delete proxy pods") + + waitForDeploymentRollout(ctx, t, env, d.ControllerNN.Namespace, d.ControllerNN.Name) + waitForDeploymentRollout(ctx, t, env, d.ProxyNN.Namespace, d.ProxyNN.Name) +} diff --git a/test/e2e/kuma_test.go b/test/e2e/kuma_test.go index 336393d12b..a04d035ca2 100644 --- a/test/e2e/kuma_test.go +++ b/test/e2e/kuma_test.go @@ -28,13 +28,8 @@ func TestDeployAllInOneDBLESSKuma(t *testing.T) { require.NoError(t, kuma.EnableMeshForNamespace(ctx, env.Cluster(), "kong")) require.NoError(t, kuma.EnableMeshForNamespace(ctx, env.Cluster(), "default")) - // scale to force a restart of pods and trigger mesh injection (we can't annotate the Kong namespace in advance, - // it gets clobbered by deployKong()). is there a "rollout restart" in client-go? who knows! - scaleDeployment(ctx, t, env, deployments.ProxyNN, 0) - scaleDeployment(ctx, t, env, deployments.ControllerNN, 0) - - scaleDeployment(ctx, t, env, deployments.ProxyNN, 2) - scaleDeployment(ctx, t, env, deployments.ControllerNN, 2) + // Restart Kong pods to trigger mesh injection. 
+ deployments.Restart(ctx, t, env) t.Log("running ingress tests to verify all-in-one deployed ingress controller and proxy are functional") deployIngressWithEchoBackends(ctx, t, env, numberOfEchoBackends) diff --git a/test/e2e/upgrade_test.go b/test/e2e/upgrade_test.go index 35495956ef..d96d69441a 100644 --- a/test/e2e/upgrade_test.go +++ b/test/e2e/upgrade_test.go @@ -110,8 +110,9 @@ func testManifestsUpgrade( t.Logf("deploying target version of kong manifests: %s", testParams.toManifestPath) deployments := ManifestDeploy{ - Path: testParams.toManifestPath, - SkipTestPatches: true, + Path: testParams.toManifestPath, + // Do not skip test patches - we want to verify that upgrade works with an image override in target manifest. + SkipTestPatches: false, }.Run(ctx, t, env) if featureGates := testParams.controllerFeatureGates; featureGates != "" {