From 1dd07258f3169020935cdaed6b556c7c12fc130d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Burzy=C5=84ski?= Date: Wed, 17 May 2023 19:24:16 +0200 Subject: [PATCH 1/2] feat(konnect): account for konnect sync failures in reported node status It extends the config sync status reported to Konnect's Node API with values that take into account failures of syncing configuration with Konnect RG, effectively extending it with three new values (on the API surface level): - INGRESS_CONTROLLER_STATE_OPERATIONAL_KONNECT_OUT_OF_SYNC - INGRESS_CONTROLLER_STATE_PARTIAL_CONFIG_FAIL_KONNECT_OUT_OF_SYNC - INGRESS_CONTROLLER_STATE_INOPERABLE_KONNECT_OUT_OF_SYNC There's no strict contract for allowed values on the API level, however, there's an implicit contract between KIC and Konnect UI that directly consumes the Node object, therefore we're keeping the old values unchanged to make them backward-compatible. --- CHANGELOG.md | 8 +- internal/clients/config_status.go | 53 ++++++-- internal/clients/config_status_test.go | 77 ++++++++++- internal/dataplane/kong_client.go | 42 +++--- internal/dataplane/kong_client_test.go | 176 ++++++++++++++++++++++--- internal/konnect/node_agent.go | 20 ++- internal/konnect/node_agent_test.go | 80 +++++++++++ internal/konnect/nodes/types.go | 13 +- 8 files changed, 409 insertions(+), 60 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1372851552..b7605c87a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -114,11 +114,15 @@ Adding a new version? You'll need three changes: during the start-up phase. From now on, they will be dynamically started in runtime once their installation is detected, making restarting the process unnecessary. 
[#3996](https://github.com/Kong/kubernetes-ingress-controller/pull/3996) -- Disable translation of unspported kubernetes objects when translating to - expression based routes is enabled (`ExpressionRoutes` feature enabled AND +- Disable translation of unsupported Kubernetes objects when translating to + expression based routes is enabled (`ExpressionRoutes` feature enabled AND kong using router flavor `expressions`), and generate a translation failure event attached to each of the unsupported objects. [#4022](https://github.com/Kong/kubernetes-ingress-controller/pull/4022) +- Controller's configuration synchronization status reported to Konnect's Node API + now accounts for potential failures in synchronizing configuration with Konnect's + Runtime Group Admin API. + [#4029](https://github.com/Kong/kubernetes-ingress-controller/pull/4029) ### Changed diff --git a/internal/clients/config_status.go b/internal/clients/config_status.go index a62da397f6..1b998fb4f3 100644 --- a/internal/clients/config_status.go +++ b/internal/clients/config_status.go @@ -7,19 +7,54 @@ import ( "github.com/go-logr/logr" ) -type ConfigStatus int +// ConfigStatus is an enumerated type that represents the status of the configuration synchronisation. +// Look at CalculateConfigStatus for more details. +type ConfigStatus string const ( - // ConfigStatusOK: no error happens in translation from k8s objects to kong configuration - // and succeeded to apply kong configuration to kong gateway. - ConfigStatusOK ConfigStatus = iota - // ConfigStatusTranslationErrorHappened: error happened in translation of k8s objects - // but succeeded to apply kong configuration for remaining objects. - ConfigStatusTranslationErrorHappened - // ConfigStatusApplyFailed: failed to apply kong configurations. 
- ConfigStatusApplyFailed + ConfigStatusOK ConfigStatus = "OK" + ConfigStatusTranslationErrorHappened ConfigStatus = "TranslationErrorHappened" + ConfigStatusApplyFailed ConfigStatus = "ApplyFailed" + ConfigStatusOKKonnectApplyFailed ConfigStatus = "OKKonnectApplyFailed" + ConfigStatusTranslationErrorHappenedKonnectApplyFailed ConfigStatus = "TranslationErrorHappenedKonnectApplyFailed" + ConfigStatusApplyFailedKonnectApplyFailed ConfigStatus = "ApplyFailedKonnectApplyFailed" + ConfigStatusUnknown ConfigStatus = "Unknown" ) +// CalculateConfigStatusInput aggregates the input to CalculateConfigStatus. +type CalculateConfigStatusInput struct { + // Any error occurred when syncing with Gateways. + GatewaysFailed bool + + // Any error occurred when syncing with Konnect. + KonnectFailed bool + + // Translation of some of the Kubernetes objects failed. + TranslationFailuresOccurred bool +} + +// CalculateConfigStatus calculates a clients.ConfigStatus that sums up the configuration synchronisation result as +// a single enumerated value. +func CalculateConfigStatus(i CalculateConfigStatusInput) ConfigStatus { + switch { + case !i.GatewaysFailed && !i.KonnectFailed && !i.TranslationFailuresOccurred: + return ConfigStatusOK + case !i.GatewaysFailed && !i.KonnectFailed && i.TranslationFailuresOccurred: + return ConfigStatusTranslationErrorHappened + case i.GatewaysFailed && !i.KonnectFailed: // We don't care about translation failures if we can't apply to gateways. + return ConfigStatusApplyFailed + case !i.GatewaysFailed && i.KonnectFailed && !i.TranslationFailuresOccurred: + return ConfigStatusOKKonnectApplyFailed + case !i.GatewaysFailed && i.KonnectFailed && i.TranslationFailuresOccurred: + return ConfigStatusTranslationErrorHappenedKonnectApplyFailed + case i.GatewaysFailed && i.KonnectFailed: // We don't care about translation failures if we can't apply to gateways. + return ConfigStatusApplyFailedKonnectApplyFailed + } + + // Shouldn't happen. 
+ return ConfigStatusUnknown +} + type ConfigStatusNotifier interface { NotifyConfigStatus(context.Context, ConfigStatus) } diff --git a/internal/clients/config_status_test.go b/internal/clients/config_status_test.go index 6d1a811572..d79576f421 100644 --- a/internal/clients/config_status_test.go +++ b/internal/clients/config_status_test.go @@ -1,4 +1,4 @@ -package clients +package clients_test import ( "context" @@ -6,11 +6,14 @@ import ( "time" "github.com/go-logr/logr/testr" + "github.com/stretchr/testify/require" + + "github.com/kong/kubernetes-ingress-controller/v2/internal/clients" ) func TestChannelConfigNotifier(t *testing.T) { logger := testr.New(t) - n := NewChannelConfigNotifier(logger) + n := clients.NewChannelConfigNotifier(logger) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -18,7 +21,7 @@ func TestChannelConfigNotifier(t *testing.T) { // Call NotifyConfigStatus 5 times to make sure that the method is non-blocking. for i := 0; i < 5; i++ { - n.NotifyConfigStatus(ctx, ConfigStatusOK) + n.NotifyConfigStatus(ctx, clients.ConfigStatusOK) } for i := 0; i < 5; i++ { @@ -29,3 +32,71 @@ func TestChannelConfigNotifier(t *testing.T) { } } } + +func TestCalculateConfigStatus(t *testing.T) { + testCases := []struct { + name string + + gatewayFailure bool + konnectFailure bool + translationFailures bool + + expectedConfigStatus clients.ConfigStatus + }{ + { + name: "success", + expectedConfigStatus: clients.ConfigStatusOK, + }, + { + name: "gateway failure", + gatewayFailure: true, + expectedConfigStatus: clients.ConfigStatusApplyFailed, + }, + { + name: "translation failures", + translationFailures: true, + expectedConfigStatus: clients.ConfigStatusTranslationErrorHappened, + }, + { + name: "konnect failure", + konnectFailure: true, + expectedConfigStatus: clients.ConfigStatusOKKonnectApplyFailed, + }, + { + name: "both gateway and konnect failure", + gatewayFailure: true, + konnectFailure: true, + 
expectedConfigStatus: clients.ConfigStatusApplyFailedKonnectApplyFailed, + }, + { + name: "translation failures and konnect failure", + translationFailures: true, + konnectFailure: true, + expectedConfigStatus: clients.ConfigStatusTranslationErrorHappenedKonnectApplyFailed, + }, + { + name: "gateway failure with translation failures", + gatewayFailure: true, + translationFailures: true, + expectedConfigStatus: clients.ConfigStatusApplyFailed, + }, + { + name: "both gateway and konnect failure with translation failures", + gatewayFailure: true, + konnectFailure: true, + translationFailures: true, + expectedConfigStatus: clients.ConfigStatusApplyFailedKonnectApplyFailed, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := clients.CalculateConfigStatus(clients.CalculateConfigStatusInput{ + GatewaysFailed: tc.gatewayFailure, + KonnectFailed: tc.konnectFailure, + TranslationFailuresOccurred: tc.translationFailures, + }) + require.Equal(t, tc.expectedConfigStatus, result) + }) + } +} diff --git a/internal/dataplane/kong_client.go b/internal/dataplane/kong_client.go index e9f202f080..537725fc95 100644 --- a/internal/dataplane/kong_client.go +++ b/internal/dataplane/kong_client.go @@ -368,21 +368,22 @@ func (c *KongClient) Update(ctx context.Context) error { c.logger.Debug("successfully built data-plane configuration") } - shas, err := c.sendOutToGatewayClients(ctx, parsingResult.KongState, c.kongConfig) - if err != nil { - c.configStatusNotifier.NotifyConfigStatus(ctx, clients.ConfigStatusApplyFailed) - return err - } - - c.trySendOutToKonnectClient(ctx, parsingResult.KongState, c.kongConfig) - - // succeeded to apply configuration to Kong gateway. - // notify the receiver of config status that translation error happened when there are translation errors, - // otherwise notify that config status is OK. 
- if len(parsingResult.TranslationFailures) > 0 { - c.configStatusNotifier.NotifyConfigStatus(ctx, clients.ConfigStatusTranslationErrorHappened) - } else { - c.configStatusNotifier.NotifyConfigStatus(ctx, clients.ConfigStatusOK) + shas, gatewaysSyncErr := c.sendOutToGatewayClients(ctx, parsingResult.KongState, c.kongConfig) + konnectSyncErr := c.maybeSendOutToKonnectClient(ctx, parsingResult.KongState, c.kongConfig) + + // Taking into account the results of syncing configuration with Gateways and Konnect, and potential translation + // failures, calculate the config status and report it. + c.configStatusNotifier.NotifyConfigStatus(ctx, clients.CalculateConfigStatus( + clients.CalculateConfigStatusInput{ + GatewaysFailed: gatewaysSyncErr != nil, + KonnectFailed: konnectSyncErr != nil, + TranslationFailuresOccurred: len(parsingResult.TranslationFailures) > 0, + }, + )) + + // In case of a failure in syncing configuration with Gateways, propagate the error. + if gatewaysSyncErr != nil { + return gatewaysSyncErr } // report on configured Kubernetes objects if enabled @@ -420,12 +421,13 @@ func (c *KongClient) sendOutToGatewayClients( return previousSHAs, nil } -// It will try to send ignore errors that are returned from Konnect client. -func (c *KongClient) trySendOutToKonnectClient(ctx context.Context, s *kongstate.KongState, config sendconfig.Config) { +// maybeSendOutToKonnectClient sends out the configuration to Konnect when KonnectClient is provided. +// It's a noop when Konnect integration is not enabled. +func (c *KongClient) maybeSendOutToKonnectClient(ctx context.Context, s *kongstate.KongState, config sendconfig.Config) error { konnectClient := c.clientsProvider.KonnectClient() // There's no KonnectClient configured, that's totally fine. 
if konnectClient == nil { - return + return nil } if _, err := c.sendToClient(ctx, konnectClient, s, config); err != nil { @@ -434,11 +436,13 @@ func (c *KongClient) trySendOutToKonnectClient(ctx context.Context, s *kongstate if errors.Is(err, sendconfig.ErrUpdateSkippedDueToBackoffStrategy{}) { c.logger.WithError(err).Warn("Skipped pushing configuration to Konnect") - return } c.logger.WithError(err).Warn("Failed pushing configuration to Konnect") + return err } + + return nil } func (c *KongClient) sendToClient( diff --git a/internal/dataplane/kong_client_test.go b/internal/dataplane/kong_client_test.go index 7b2b855b02..ce23e71641 100644 --- a/internal/dataplane/kong_client_test.go +++ b/internal/dataplane/kong_client_test.go @@ -23,6 +23,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi" + "github.com/kong/kubernetes-ingress-controller/v2/internal/clients" "github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane" "github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane/failures" "github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane/kongstate" @@ -164,11 +165,15 @@ func (f *mockUpdateStrategyResolver) ResolveUpdateStrategy(c sendconfig.UpdateCl } // returnErrorOnUpdate will cause the mockUpdateStrategy with a given Admin API URL to return an error on Update(). -func (f *mockUpdateStrategyResolver) returnErrorOnUpdate(url string) { +func (f *mockUpdateStrategyResolver) returnErrorOnUpdate(url string, shouldReturnErr bool) { f.lock.Lock() defer f.lock.Unlock() - f.shouldReturnErrorOnUpdate[url] = struct{}{} + if shouldReturnErr { + f.shouldReturnErrorOnUpdate[url] = struct{}{} + } else { + delete(f.shouldReturnErrorOnUpdate, url) + } } // updateCalledForURLCallback returns a function that will be called when the mockUpdateStrategy is called. 
@@ -234,15 +239,7 @@ func (m mockConfigurationChangeDetector) HasConfigurationChanged( return m.hasConfigurationChanged, nil } -type noopKongConfigBuilder struct{} - -func (p noopKongConfigBuilder) BuildKongConfig() parser.KongConfigBuildingResult { - return parser.KongConfigBuildingResult{ - KongState: &kongstate.KongState{}, - } -} - -func TestKongClientUpdate_AllExpectedClientsAreCalled(t *testing.T) { +func TestKongClientUpdate_AllExpectedClientsAreCalledAndErrorIsPropagated(t *testing.T) { t.Parallel() var ( @@ -315,12 +312,13 @@ func TestKongClientUpdate_AllExpectedClientsAreCalled(t *testing.T) { } updateStrategyResolver := newMockUpdateStrategyResolver(t) for _, url := range tc.errorOnUpdateForURLs { - updateStrategyResolver.returnErrorOnUpdate(url) + updateStrategyResolver.returnErrorOnUpdate(url, true) } // always return true for HasConfigurationChanged to trigger an update configChangeDetector := mockConfigurationChangeDetector{hasConfigurationChanged: true} + configBuilder := newMockKongConfigBuilder() - kongClient := setupTestKongClient(t, updateStrategyResolver, clientsProvider, configChangeDetector) + kongClient := setupTestKongClient(t, updateStrategyResolver, clientsProvider, configChangeDetector, configBuilder) err := kongClient.Update(ctx) if tc.expectError { @@ -347,8 +345,9 @@ func TestKongClientUpdate_WhenNoChangeInConfigNoClientGetsCalled(t *testing.T) { // no change in config, we'll expect no update to be called configChangeDetector := mockConfigurationChangeDetector{hasConfigurationChanged: false} + configBuilder := newMockKongConfigBuilder() - kongClient := setupTestKongClient(t, updateStrategyResolver, clientsProvider, configChangeDetector) + kongClient := setupTestKongClient(t, updateStrategyResolver, clientsProvider, configChangeDetector, configBuilder) ctx := context.Background() err := kongClient.Update(ctx) @@ -357,19 +356,150 @@ func TestKongClientUpdate_WhenNoChangeInConfigNoClientGetsCalled(t *testing.T) { 
updateStrategyResolver.assertNoUpdateCalled() } +type mockConfigStatusQueue struct { + wasNotified bool +} + +func newMockConfigStatusQueue() *mockConfigStatusQueue { + return &mockConfigStatusQueue{} +} + +func (m *mockConfigStatusQueue) NotifyConfigStatus(context.Context, clients.ConfigStatus) { + m.wasNotified = true +} + +func (m *mockConfigStatusQueue) WasNotified() bool { + return m.wasNotified +} + +type mockKongConfigBuilder struct { + translationFailuresToReturn []failures.ResourceFailure +} + +func newMockKongConfigBuilder() *mockKongConfigBuilder { + return &mockKongConfigBuilder{} +} + +func (p *mockKongConfigBuilder) BuildKongConfig() parser.KongConfigBuildingResult { + return parser.KongConfigBuildingResult{ + KongState: &kongstate.KongState{}, + TranslationFailures: p.translationFailuresToReturn, + } +} + +func (p *mockKongConfigBuilder) returnTranslationFailures(enabled bool) { + if enabled { + // Return some mocked translation failures. + p.translationFailuresToReturn = []failures.ResourceFailure{ + lo.Must(failures.NewResourceFailure("some reason", &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + Namespace: "namespace", + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + }, + )), + } + } else { + p.translationFailuresToReturn = nil + } +} + +func TestKongClientUpdate_ConfigStatusIsAlwaysNotified(t *testing.T) { + t.Parallel() + + var ( + ctx = context.Background() + testKonnectClient = mustSampleKonnectClient(t) + testGatewayClient = mustSampleGatewayClient(t) + + clientsProvider = mockGatewayClientsProvider{ + gatewayClients: []*adminapi.Client{testGatewayClient}, + konnectClient: testKonnectClient, + } + + updateStrategyResolver = newMockUpdateStrategyResolver(t) + configChangeDetector = mockConfigurationChangeDetector{hasConfigurationChanged: true} + configBuilder = newMockKongConfigBuilder() + kongClient = setupTestKongClient(t, updateStrategyResolver, clientsProvider, configChangeDetector, configBuilder) 
+ ) + + testCases := []struct { + name string + gatewayFailure bool + konnectFailure bool + translationFailures bool + }{ + { + name: "success", + gatewayFailure: false, + konnectFailure: false, + translationFailures: false, + }, + { + name: "gateway failure", + gatewayFailure: true, + konnectFailure: false, + translationFailures: false, + }, + { + name: "translation failures", + gatewayFailure: false, + konnectFailure: false, + translationFailures: true, + }, + { + name: "konnect failure", + gatewayFailure: false, + konnectFailure: true, + translationFailures: false, + }, + { + name: "both gateway and konnect failure", + gatewayFailure: true, + konnectFailure: true, + translationFailures: false, + }, + { + name: "translation failures and konnect failure", + gatewayFailure: false, + konnectFailure: true, + translationFailures: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Reset the status queue. We want to make sure that the status is always notified. + statusQueue := newMockConfigStatusQueue() + kongClient.SetConfigStatusNotifier(statusQueue) + + updateStrategyResolver.returnErrorOnUpdate(testGatewayClient.BaseRootURL(), tc.gatewayFailure) + updateStrategyResolver.returnErrorOnUpdate(testKonnectClient.BaseRootURL(), tc.konnectFailure) + configBuilder.returnTranslationFailures(tc.translationFailures) + + _ = kongClient.Update(ctx) + require.True(t, statusQueue.WasNotified()) + }) + } +} + // setupTestKongClient creates a KongClient with mocked dependencies. 
func setupTestKongClient( t *testing.T, updateStrategyResolver *mockUpdateStrategyResolver, clientsProvider mockGatewayClientsProvider, configChangeDetector sendconfig.ConfigurationChangeDetector, + configBuilder *mockKongConfigBuilder, ) *dataplane.KongClient { logger := logrus.New() timeout := time.Second ingressClass := "kong" diagnostic := util.ConfigDumpDiagnostic{} config := sendconfig.Config{} - eventRecorder := record.NewFakeRecorder(0) dbMode := "off" kongClient, err := dataplane.NewKongClient( @@ -378,18 +508,30 @@ func setupTestKongClient( ingressClass, diagnostic, config, - eventRecorder, + newFakeEventsRecorder(), dbMode, clientsProvider, updateStrategyResolver, configChangeDetector, - noopKongConfigBuilder{}, + configBuilder, store.NewCacheStores(), ) require.NoError(t, err) return kongClient } +func newFakeEventsRecorder() *record.FakeRecorder { + eventRecorder := record.NewFakeRecorder(0) + + // Ingest events to unblock writing side. + go func() { + for range eventRecorder.Events { + } + }() + + return eventRecorder +} + func mustSampleGatewayClient(t *testing.T) *adminapi.Client { t.Helper() c, err := adminapi.NewTestClient(fmt.Sprintf("https://%s:8080", uuid.NewString())) diff --git a/internal/konnect/node_agent.go b/internal/konnect/node_agent.go index 2e2eacb756..20b3b333c5 100644 --- a/internal/konnect/node_agent.go +++ b/internal/konnect/node_agent.go @@ -61,7 +61,7 @@ type NodeAgent struct { nodeClient NodeClient refreshPeriod time.Duration - configStatus atomic.Uint32 + configStatus atomic.Value configStatusSubscriber clients.ConfigStatusSubscriber gatewayInstanceGetter GatewayInstanceGetter @@ -96,7 +96,7 @@ func NewNodeAgent( gatewayClientsChangesNotifier: gatewayClientsChangesNotifier, managerInstanceIDProvider: managerInstanceIDProvider, } - a.configStatus.Store(uint32(clients.ConfigStatusOK)) + a.configStatus.Store(clients.ConfigStatusOK) return a } @@ -145,7 +145,10 @@ func (a *NodeAgent) subscribeConfigStatus(ctx context.Context) { 
a.logger.Info("subscribe loop stopped", "message", ctx.Err().Error()) return case configStatus := <-ch: - a.configStatus.Store(uint32(configStatus)) + a.configStatus.Store(configStatus) + if err := a.updateNodes(ctx); err != nil { + a.logger.Error(err, "failed to update nodes after config status changed") + } } } } @@ -195,14 +198,21 @@ func (a *NodeAgent) updateKICNode(ctx context.Context, existingNodes []*nodes.No sortNodesByLastPing(nodesWithSameName) var ingressControllerStatus nodes.IngressControllerState - configStatus := int(a.configStatus.Load()) - switch clients.ConfigStatus(configStatus) { + configStatus := a.configStatus.Load().(clients.ConfigStatus) + switch configStatus { case clients.ConfigStatusOK: ingressControllerStatus = nodes.IngressControllerStateOperational case clients.ConfigStatusTranslationErrorHappened: ingressControllerStatus = nodes.IngressControllerStatePartialConfigFail case clients.ConfigStatusApplyFailed: ingressControllerStatus = nodes.IngressControllerStateInoperable + case clients.ConfigStatusOKKonnectApplyFailed: + ingressControllerStatus = nodes.IngressControllerStateOperationalKonnectOutOfSync + case clients.ConfigStatusTranslationErrorHappenedKonnectApplyFailed: + ingressControllerStatus = nodes.IngressControllerStatePartialConfigFailKonnectOutOfSync + case clients.ConfigStatusApplyFailedKonnectApplyFailed: + ingressControllerStatus = nodes.IngressControllerStateInoperableKonnectOutOfSync + case clients.ConfigStatusUnknown: default: ingressControllerStatus = nodes.IngressControllerStateUnknown } diff --git a/internal/konnect/node_agent_test.go b/internal/konnect/node_agent_test.go index 087fd92e7a..64b7fece86 100644 --- a/internal/konnect/node_agent_test.go +++ b/internal/konnect/node_agent_test.go @@ -2,6 +2,7 @@ package konnect_test import ( "context" + "fmt" "testing" "time" @@ -381,3 +382,82 @@ func TestNodeAgent_StartDoesntReturnUntilContextGetsCancelled(t *testing.T) { case <-agentReturned: } } + +func 
TestNodeAgent_ControllerNodeStatusGetsUpdatedOnStatusNotification(t *testing.T) { + nodeClient := newMockNodeClient(nil) + configStatusQueue := newMockConfigStatusNotifier() + gatewayClientsChangesNotifier := newMockGatewayClientsNotifier() + + nodeAgent := konnect.NewNodeAgent( + testHostname, + testKicVersion, + konnect.DefaultRefreshNodePeriod, + logr.Discard(), + nodeClient, + configStatusQueue, + newMockGatewayInstanceGetter(nil), + gatewayClientsChangesNotifier, + newMockManagerInstanceIDProvider(uuid.New()), + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + agentReturned := make(chan struct{}) + go func() { + require.NoError(t, nodeAgent.Start(ctx)) + close(agentReturned) + }() + + testCases := []struct { + notifiedConfigStatus clients.ConfigStatus + expectedControllerState nodes.IngressControllerState + }{ + { + notifiedConfigStatus: clients.ConfigStatusOK, + expectedControllerState: nodes.IngressControllerStateOperational, + }, + { + notifiedConfigStatus: clients.ConfigStatusTranslationErrorHappened, + expectedControllerState: nodes.IngressControllerStatePartialConfigFail, + }, + { + notifiedConfigStatus: clients.ConfigStatusApplyFailed, + expectedControllerState: nodes.IngressControllerStateInoperable, + }, + { + notifiedConfigStatus: clients.ConfigStatusOKKonnectApplyFailed, + expectedControllerState: nodes.IngressControllerStateOperationalKonnectOutOfSync, + }, + { + notifiedConfigStatus: clients.ConfigStatusTranslationErrorHappenedKonnectApplyFailed, + expectedControllerState: nodes.IngressControllerStatePartialConfigFailKonnectOutOfSync, + }, + { + notifiedConfigStatus: clients.ConfigStatusApplyFailedKonnectApplyFailed, + expectedControllerState: nodes.IngressControllerStateInoperableKonnectOutOfSync, + }, + } + + for _, tc := range testCases { + t.Run(fmt.Sprint(tc.notifiedConfigStatus), func(t *testing.T) { + configStatusQueue.NotifyConfigStatus(ctx, tc.notifiedConfigStatus) + + require.Eventually(t, func() bool { + 
controllerNode, ok := lo.Find(nodeClient.MustAllNodes(), func(n *nodes.NodeItem) bool { + return n.Type == nodes.NodeTypeIngressController + }) + if !ok { + t.Log("controller node not found") + return false + } + + if controllerNode.Status != string(tc.expectedControllerState) { + t.Logf("expected controller node status to be %q, got %q", tc.expectedControllerState, controllerNode.Status) + return false + } + + return true + }, time.Second, time.Millisecond*10) + }) + } +} diff --git a/internal/konnect/nodes/types.go b/internal/konnect/nodes/types.go index e89ddae623..15a6f229a3 100644 --- a/internal/konnect/nodes/types.go +++ b/internal/konnect/nodes/types.go @@ -50,11 +50,14 @@ type CompatibilityStatus struct { type IngressControllerState string const ( - IngressControllerStateUnspecified IngressControllerState = "INGRESS_CONTROLLER_STATE_UNSPECIFIED" - IngressControllerStateOperational IngressControllerState = "INGRESS_CONTROLLER_STATE_OPERATIONAL" - IngressControllerStatePartialConfigFail IngressControllerState = "INGRESS_CONTROLLER_STATE_PARTIAL_CONFIG_FAIL" - IngressControllerStateInoperable IngressControllerState = "INGRESS_CONTROLLER_STATE_INOPERABLE" - IngressControllerStateUnknown IngressControllerState = "INGRESS_CONTROLLER_STATE_UNKNOWN" + IngressControllerStateUnspecified IngressControllerState = "INGRESS_CONTROLLER_STATE_UNSPECIFIED" + IngressControllerStateOperational IngressControllerState = "INGRESS_CONTROLLER_STATE_OPERATIONAL" + IngressControllerStatePartialConfigFail IngressControllerState = "INGRESS_CONTROLLER_STATE_PARTIAL_CONFIG_FAIL" + IngressControllerStateInoperable IngressControllerState = "INGRESS_CONTROLLER_STATE_INOPERABLE" + IngressControllerStateOperationalKonnectOutOfSync IngressControllerState = "INGRESS_CONTROLLER_STATE_OPERATIONAL_KONNECT_OUT_OF_SYNC" + IngressControllerStatePartialConfigFailKonnectOutOfSync IngressControllerState = "INGRESS_CONTROLLER_STATE_PARTIAL_CONFIG_FAIL_KONNECT_OUT_OF_SYNC" + 
IngressControllerStateInoperableKonnectOutOfSync IngressControllerState = "INGRESS_CONTROLLER_STATE_INOPERABLE_KONNECT_OUT_OF_SYNC" + IngressControllerStateUnknown IngressControllerState = "INGRESS_CONTROLLER_STATE_UNKNOWN" ) type CreateNodeRequest struct { From a74a8007ecc7f106760849925c792e1b09e1e704 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Burzy=C5=84ski?= Date: Thu, 18 May 2023 12:52:23 +0200 Subject: [PATCH 2/2] address review comments --- internal/dataplane/kong_client_test.go | 7 +--- internal/konnect/node_agent_test.go | 56 +++++++++++++------------- 2 files changed, 31 insertions(+), 32 deletions(-) diff --git a/internal/dataplane/kong_client_test.go b/internal/dataplane/kong_client_test.go index ce23e71641..78da91f61f 100644 --- a/internal/dataplane/kong_client_test.go +++ b/internal/dataplane/kong_client_test.go @@ -240,8 +240,6 @@ func (m mockConfigurationChangeDetector) HasConfigurationChanged( } func TestKongClientUpdate_AllExpectedClientsAreCalledAndErrorIsPropagated(t *testing.T) { - t.Parallel() - var ( ctx = context.Background() testKonnectClient = mustSampleKonnectClient(t) @@ -409,8 +407,6 @@ func (p *mockKongConfigBuilder) returnTranslationFailures(enabled bool) { } func TestKongClientUpdate_ConfigStatusIsAlwaysNotified(t *testing.T) { - t.Parallel() - var ( ctx = context.Background() testKonnectClient = mustSampleKonnectClient(t) @@ -525,7 +521,8 @@ func newFakeEventsRecorder() *record.FakeRecorder { // Ingest events to unblock writing side. 
go func() { - for range eventRecorder.Events { + for { + <-eventRecorder.Events } }() diff --git a/internal/konnect/node_agent_test.go b/internal/konnect/node_agent_test.go index 64b7fece86..4aad345616 100644 --- a/internal/konnect/node_agent_test.go +++ b/internal/konnect/node_agent_test.go @@ -79,7 +79,7 @@ func (m mockConfigStatusQueue) SubscribeConfigStatus() chan clients.ConfigStatus return m.ch } -func (m mockConfigStatusQueue) NotifyConfigStatus(_ context.Context, status clients.ConfigStatus) { +func (m mockConfigStatusQueue) Notify(status clients.ConfigStatus) { m.ch <- status } @@ -272,15 +272,10 @@ func TestNodeAgentUpdateNodes(t *testing.T) { newMockManagerInstanceIDProvider(testManagerID), ) - ctx, cancel := context.WithCancel(context.Background()) - agentReturned := make(chan struct{}) - go func() { - require.NoError(t, nodeAgent.Start(ctx)) - close(agentReturned) - }() + runAgent(t, nodeAgent) if tc.configStatus != nil { - configStatusQueue.NotifyConfigStatus(ctx, *tc.configStatus) + configStatusQueue.Notify(*tc.configStatus) } require.Eventually(t, func() bool { @@ -323,21 +318,11 @@ func TestNodeAgentUpdateNodes(t *testing.T) { return true }, timeout, tick) - - // Cancel the context and wait for the nodeAgent.Start() to return. - cancel() - select { - case <-time.After(timeout): - t.Fatal("expected the agent to return after the context was cancelled") - case <-agentReturned: - } }) } } func TestNodeAgent_StartDoesntReturnUntilContextGetsCancelled(t *testing.T) { - t.Parallel() - nodeClient := newMockNodeClient(nil) // Always return errors from ListNodes to ensure that the agent doesn't propagate it to the Start() caller. // ListNodes is the first call made by the agent in Start(), so we care only about this one. 
@@ -400,13 +385,7 @@ func TestNodeAgent_ControllerNodeStatusGetsUpdatedOnStatusNotification(t *testin newMockManagerInstanceIDProvider(uuid.New()), ) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - agentReturned := make(chan struct{}) - go func() { - require.NoError(t, nodeAgent.Start(ctx)) - close(agentReturned) - }() + runAgent(t, nodeAgent) testCases := []struct { notifiedConfigStatus clients.ConfigStatus @@ -440,7 +419,7 @@ func TestNodeAgent_ControllerNodeStatusGetsUpdatedOnStatusNotification(t *testin for _, tc := range testCases { t.Run(fmt.Sprint(tc.notifiedConfigStatus), func(t *testing.T) { - configStatusQueue.NotifyConfigStatus(ctx, tc.notifiedConfigStatus) + configStatusQueue.Notify(tc.notifiedConfigStatus) require.Eventually(t, func() bool { controllerNode, ok := lo.Find(nodeClient.MustAllNodes(), func(n *nodes.NodeItem) bool { @@ -457,7 +436,30 @@ func TestNodeAgent_ControllerNodeStatusGetsUpdatedOnStatusNotification(t *testin } return true - }, time.Second, time.Millisecond*10) + }, time.Second, time.Millisecond) }) } } + +// runAgent runs the agent in a goroutine and cancels the context after the test is done, ensuring that the agent +// doesn't return prematurely. +func runAgent(t *testing.T, nodeAgent *konnect.NodeAgent) { + ctx, cancel := context.WithCancel(context.Background()) + + // To be used as a barrier to ensure that the agent returned after the context was cancelled. + agentReturned := make(chan struct{}) + go func() { + err := nodeAgent.Start(ctx) + require.NoError(t, err, "expected no error even when the context is cancelled") + close(agentReturned) + }() + + t.Cleanup(func() { + cancel() + select { + case <-time.After(time.Second): + t.Fatal("expected the agent to return after the context was cancelled") + case <-agentReturned: + } + }) +}