From 28f910d4fda59b41c11edbf2b8bdca2bbf43aa58 Mon Sep 17 00:00:00 2001 From: Yi Tao Date: Fri, 10 Feb 2023 16:37:04 +0800 Subject: [PATCH 1/8] feat: update status of kic to konnect --- internal/dataplane/kong_client.go | 8 ++++++++ internal/konnect/node_agent.go | 31 ++++++++++++++++++++++++++++--- internal/manager/run.go | 14 ++++++++++++++ 3 files changed, 50 insertions(+), 3 deletions(-) diff --git a/internal/dataplane/kong_client.go b/internal/dataplane/kong_client.go index 1dac5c6dc3..b29a44b70c 100644 --- a/internal/dataplane/kong_client.go +++ b/internal/dataplane/kong_client.go @@ -419,11 +419,19 @@ func (c *KongClient) Update(ctx context.Context) error { c.prometheusMetrics.RecordTranslationSuccess() c.logger.Debug("successfully built data-plane configuration") } + // send the status whether translation errors happened if there is a channel to receive the status. + if c.hasTranslationErrorChan != nil { + c.hasTranslationErrorChan <- (len(translationFailures) > 0) + } shas, err := c.sendOutToClients(ctx, kongstate, formatVersion, c.kongConfig) if err != nil { return err } + // send the error on applying kong configurations if there is a channel to receive it. + if c.sendConfigErrorChan != nil { + c.sendConfigErrorChan <- err + } // report on configured Kubernetes objects if enabled if c.AreKubernetesObjectReportsEnabled() { diff --git a/internal/konnect/node_agent.go b/internal/konnect/node_agent.go index ea69fa3b3e..9543307a76 100644 --- a/internal/konnect/node_agent.go +++ b/internal/konnect/node_agent.go @@ -20,6 +20,12 @@ type NodeAgent struct { konnectClient *NodeAPIClient refreshInterval time.Duration + + hasTranslationFailureChan chan bool + hasTranslationFailure bool + + sendConfigErrorChan chan error + sendCondifError error } func NewNodeAgent(hostname string, version string, logger logr.Logger, client *NodeAPIClient) *NodeAgent { @@ -71,6 +77,16 @@ func (a *NodeAgent) clearOutdatedNodes() error { return nil } +func (a *NodeAgent) calculateStatus() IngressControllerState { + if a.sendCondifError != nil { + return IngressControllerStateInoperable + } + if a.hasTranslationFailure { + return IngressControllerStatePartialConfigFail + } + return IngressControllerStateOperational +} + func (a *NodeAgent) updateNode() error { err := a.clearOutdatedNodes() if err != nil { @@ -78,9 +94,7 @@ func (a *NodeAgent) updateNode() error { return err } - // TODO: retrieve the real state of KIC - // https://github.com/Kong/kubernetes-ingress-controller/issues/3515 - ingressControllerStatus := IngressControllerStateOperational + ingressControllerStatus := a.calculateStatus() updateNodeReq := &UpdateNodeRequest{ Hostname: a.Hostname, @@ -112,6 +126,16 @@ func (a *NodeAgent) updateNodeLoop() { } } +// receiveStatus receives the necessary information to set the status. +func (a *NodeAgent) receiveStatus() { + for { + select { + case a.hasTranslationFailure = <-a.hasTranslationFailureChan: + case a.sendCondifError = <-a.sendConfigErrorChan: + } + } +} + func (a *NodeAgent) Run() { err := a.createNode() if err != nil { @@ -119,4 +143,5 @@ func (a *NodeAgent) Run() { return } go a.updateNodeLoop() + go a.receiveStatus() } diff --git a/internal/manager/run.go b/internal/manager/run.go index 2f3dde6e7f..4fc104c73c 100644 --- a/internal/manager/run.go +++ b/internal/manager/run.go @@ -183,6 +183,11 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic, d if c.Konnect.ConfigSynchronizationEnabled { // In case of failures when building Konnect related objects, we're not returning errors as Konnect is not // considered critical feature, and it should not break the basic functionality of the controller. + // set channel to send ingress controller status to dataplane client. + hasTranslationFailureChan := make(chan bool, 1) + dataplaneClient.SetHasTranslationFailureChan(hasTranslationFailureChan) + sendConfigErrorChan := make(chan error, 1) + dataplaneClient.SetSendConfigErrorChan(sendConfigErrorChan) setupLog.Info("Start Konnect client to register runtime instances to Konnect") konnectNodeAPIClient, err := konnect.NewNodeAPIClient(c.Konnect) @@ -202,6 +207,15 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic, d setupLog.Info("Initialized Konnect Admin API client") clientsManager.SetKonnectClient(konnectAdminAPIClient) } + hostname, _ := os.Hostname() + version := metadata.Release + agent := konnect.NewNodeAgent( + hostname, version, + hasTranslationFailureChan, + sendConfigErrorChan, + setupLog, konnectClient, + ) + agent.Run() } if c.AnonymousReports { From 49454521a7b4a69c7ff575c3dc0763fe6b851b38 Mon Sep 17 00:00:00 2001 From: Yi Tao Date: Mon, 13 Feb 2023 14:58:55 +0800 Subject: [PATCH 2/8] resolve conflicts and add flag to set period of uploading node status --- CHANGELOG.md | 3 +++ internal/adminapi/konnect.go | 1 + internal/dataplane/kong_client.go | 13 +++++++++++ internal/konnect/node_agent.go | 37 +++++++++++++++++++++---------- internal/manager/config.go | 2 ++ internal/manager/run.go | 1 + 6 files changed, 45 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1cea36cc52..15e5a25dd2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -117,6 +117,9 @@ Adding a new version? You'll need three changes: [#3521](https://github.com/Kong/kubernetes-ingress-controller/pull/3521) - Leader election is enabled by default then kong admin service discovery is enabled. [#3529](https://github.com/Kong/kubernetes-ingress-controller/pull/3529) +- Added flag `--konnect-refresh-node-period` to set the period of uploading + status of KIC instance to Konnect runtime group. + [#3533](https://github.com/Kong/kubernetes-ingress-controller/pull/3533) ### Fixed diff --git a/internal/adminapi/konnect.go b/internal/adminapi/konnect.go index 720bed9356..2931bcab9a 100644 --- a/internal/adminapi/konnect.go +++ b/internal/adminapi/konnect.go @@ -17,6 +17,7 @@ type KonnectConfig struct { ConfigSynchronizationEnabled bool RuntimeGroupID string Address string + RefreshNodePeriod time.Duration TLSClient TLSClientConfig } diff --git a/internal/dataplane/kong_client.go b/internal/dataplane/kong_client.go index b29a44b70c..f4926bb1dc 100644 --- a/internal/dataplane/kong_client.go +++ b/internal/dataplane/kong_client.go @@ -135,6 +135,14 @@ type KongClient struct { // clientsProvider allows retrieving the most recent set of clients. clientsProvider AdminAPIClientsProvider + + // hasTranslationErrorChan is used to notify konnect node agent the whether + // error happened in traslating k8s objects to kong configuration. + hasTranslationErrorChan chan bool + + // sendConfigErrorChan is used to notify whether error happened in sending + // translated configurations to kong. + sendConfigErrorChan chan error } // NewKongClient provides a new KongClient object after connecting to the @@ -528,6 +536,11 @@ func handleSendToClientResult(client sendconfig.KonnectAwareClient, logger logru } return "", err } +// SetHasTranslationFailureChan sets the channel to receive the status of whether +// translation failure happens. +func (c *KongClient) SetHasTranslationFailureChan(ch chan bool) { + c.hasTranslationErrorChan = ch +} return newSHA, nil } diff --git a/internal/konnect/node_agent.go b/internal/konnect/node_agent.go index 9543307a76..b934b7e9e1 100644 --- a/internal/konnect/node_agent.go +++ b/internal/konnect/node_agent.go @@ -9,7 +9,10 @@ import ( "github.com/kong/kubernetes-ingress-controller/v2/internal/util" ) -const defaultRefreshNodeInterval = 30 * time.Second +const ( + MinRefreshNodePeriod = 30 * time.Second + DefaultRefreshNodePeriod = 60 * time.Second +) type NodeAgent struct { NodeID string @@ -18,8 +21,8 @@ type NodeAgent struct { Logger logr.Logger - konnectClient *NodeAPIClient - refreshInterval time.Duration + konnectClient *Client + refreshPeriod time.Duration hasTranslationFailureChan chan bool hasTranslationFailure bool @@ -28,16 +31,25 @@ type NodeAgent struct { sendCondifError error } -func NewNodeAgent(hostname string, version string, logger logr.Logger, client *NodeAPIClient) *NodeAgent { +func NewNodeAgent( + hostname string, + version string, + refreshPeriod time.Duration, + hasTranslationFailureChan chan bool, + sendConfigErrorChan chan error, + logger logr.Logger, + client *Client, +) *NodeAgent { + if refreshPeriod < MinRefreshNodePeriod { + refreshPeriod = MinRefreshNodePeriod + } return &NodeAgent{ Hostname: hostname, Version: version, Logger: logger. WithName("konnect-node").WithValues("runtime_group_id", client.RuntimeGroupID), konnectClient: client, - // TODO: set refresh interval by some flag - // https://github.com/Kong/kubernetes-ingress-controller/issues/3515 - refreshInterval: defaultRefreshNodeInterval, + refreshPeriod: refreshPeriod, } } @@ -51,7 +63,7 @@ func (a *NodeAgent) createNode() error { } resp, err := a.konnectClient.CreateNode(createNodeReq) if err != nil { - return fmt.Errorf("failed to update node, hostname %s: %w", a.Hostname, err) + return fmt.Errorf("failed to create node, hostname %s: %w", a.Hostname, err) } a.NodeID = resp.Item.ID @@ -66,6 +78,10 @@ func (a *NodeAgent) clearOutdatedNodes() error { } for _, node := range nodes.Items { + // REVIEW: what should the condition be to delete the node in Konnect RG? + // (1) Do we check the "last update" of the node, and only delete it when the last update is too old(say, 5 mins ago)? + // (2) What if there is a node with the same name but not the same node exists? + // for example, When KIC runs in minikube/kind env and whole cluster is stopped then started again. if node.Type == NodeTypeIngressController && node.Hostname != a.Hostname { a.Logger.V(util.DebugLevel).Info("remove outdated KIC node", "node_id", node.ID, "hostname", node.Hostname) err := a.konnectClient.DeleteNode(node.ID) @@ -91,7 +107,6 @@ func (a *NodeAgent) updateNode() error { err := a.clearOutdatedNodes() if err != nil { a.Logger.Error(err, "failed to clear outdated nodes") - return err } ingressControllerStatus := a.calculateStatus() @@ -114,10 +129,8 @@ func (a *NodeAgent) updateNode() error { } func (a *NodeAgent) updateNodeLoop() { - ticker := time.NewTicker(a.refreshInterval) + ticker := time.NewTicker(a.refreshPeriod) defer ticker.Stop() - // TODO: add some mechanism to break the loop - // https://github.com/Kong/kubernetes-ingress-controller/issues/3515 for range ticker.C { err := a.updateNode() if err != nil { diff --git a/internal/manager/config.go b/internal/manager/config.go index 64dda71f18..65b257b5af 100644 --- a/internal/manager/config.go +++ b/internal/manager/config.go @@ -16,6 +16,7 @@ import ( "github.com/kong/kubernetes-ingress-controller/v2/internal/annotations" "github.com/kong/kubernetes-ingress-controller/v2/internal/controllers/gateway" "github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane" + "github.com/kong/kubernetes-ingress-controller/v2/internal/konnect" "github.com/kong/kubernetes-ingress-controller/v2/internal/manager/featuregates" ) @@ -238,6 +239,7 @@ func (c *Config) FlagSet() *pflag.FlagSet { flagSet.StringVar(&c.Konnect.TLSClient.CertFile, "konnect-tls-client-cert-file", "", "Konnect TLS client certificate file path.") flagSet.StringVar(&c.Konnect.TLSClient.Key, "konnect-tls-client-key", "", "Konnect TLS client key.") flagSet.StringVar(&c.Konnect.TLSClient.KeyFile, "konnect-tls-client-key-file", "", "Konnect TLS client key file path.") + flagSet.DurationVar(&c.Konnect.RefreshNodePeriod, "konnect-refresh-node-period", konnect.DefaultRefreshNodePeriod, "Period of uploading status of KIC and controlled kong gateway instances") // Deprecated flags _ = flagSet.Float32("sync-rate-limit", dataplane.DefaultSyncSeconds, "Use --proxy-sync-seconds instead") diff --git a/internal/manager/run.go b/internal/manager/run.go index 4fc104c73c..ae4496632f 100644 --- a/internal/manager/run.go +++ b/internal/manager/run.go @@ -211,6 +211,7 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic, d version := metadata.Release agent := konnect.NewNodeAgent( hostname, version, + c.Konnect.RefreshNodePeriod, hasTranslationFailureChan, sendConfigErrorChan, setupLog, konnectClient, From 0984bb71253814e6fe700d5c391ecb6d3b9f1346 Mon Sep 17 00:00:00 2001 From: Yi Tao Date: Mon, 13 Feb 2023 16:18:49 +0800 Subject: [PATCH 3/8] add comments --- internal/konnect/node_agent.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/konnect/node_agent.go b/internal/konnect/node_agent.go index b934b7e9e1..08476b242c 100644 --- a/internal/konnect/node_agent.go +++ b/internal/konnect/node_agent.go @@ -54,6 +54,7 @@ func NewNodeAgent( } func (a *NodeAgent) createNode() error { + // REVIEW: consider existing nodes in runtime group as outdated and delete them before creating? createNodeReq := &CreateNodeRequest{ ID: a.NodeID, Hostname: a.Hostname, @@ -106,6 +107,7 @@ func (a *NodeAgent) calculateStatus() IngressControllerState { func (a *NodeAgent) updateNode() error { err := a.clearOutdatedNodes() if err != nil { + // still continue to update the current status if cleanup failed. a.Logger.Error(err, "failed to clear outdated nodes") } From 7c88a66e7dbf1a0476c2c5ee6b9d5959babd6bd9 Mon Sep 17 00:00:00 2001 From: Yi Tao Date: Tue, 14 Feb 2023 15:14:32 +0800 Subject: [PATCH 4/8] move config status pub/sub to interface --- internal/dataplane/config_status.go | 40 +++++++++ internal/dataplane/kong_client.go | 58 ++++++------- internal/konnect/node_agent.go | 122 ++++++++++++++++++---------- internal/manager/run.go | 47 +++++------ 4 files changed, 169 insertions(+), 98 deletions(-) create mode 100644 internal/dataplane/config_status.go diff --git a/internal/dataplane/config_status.go b/internal/dataplane/config_status.go new file mode 100644 index 0000000000..b524c6b1d8 --- /dev/null +++ b/internal/dataplane/config_status.go @@ -0,0 +1,40 @@ +package dataplane + +// REVIEW: put the package here, or in internal/adminapi? + +type ConfigStatus int + +const ( + // ConfigStatusOK: no error happens in translation from + ConfigStatusOK ConfigStatus = iota + ConfigStatusTranslationErrorHappened + ConfigStatusApplyFailed +) + +type ConfigStatusNotifier interface { + NotifyConfigStatus(ConfigStatus) +} + +type NoOpConfigStatusNotifier struct { +} + +var _ ConfigStatusNotifier = NoOpConfigStatusNotifier{} + +func (n NoOpConfigStatusNotifier) NotifyConfigStatus(status ConfigStatus) { +} + +type ChannelConfigNotifier struct { + ch chan ConfigStatus +} + +var _ ConfigStatusNotifier = &ChannelConfigNotifier{} + +func (n *ChannelConfigNotifier) NotifyConfigStatus(status ConfigStatus) { + n.ch <- status +} + +func NewChannelConfigNotifier(ch chan ConfigStatus) ConfigStatusNotifier { + return &ChannelConfigNotifier{ + ch: ch, + } +} diff --git a/internal/dataplane/kong_client.go b/internal/dataplane/kong_client.go index f4926bb1dc..a2626a1f90 100644 --- a/internal/dataplane/kong_client.go +++ b/internal/dataplane/kong_client.go @@ -136,13 +136,8 @@ type KongClient struct { // clientsProvider allows retrieving the most recent set of clients. clientsProvider AdminAPIClientsProvider - // hasTranslationErrorChan is used to notify konnect node agent the whether - // error happened in traslating k8s objects to kong configuration. - hasTranslationErrorChan chan bool - - // sendConfigErrorChan is used to notify whether error happened in sending - // translated configurations to kong. - sendConfigErrorChan chan error + // configStatusNotifier notifies status of cofiguring kong gateway. + configStatusNotifier ConfigStatusNotifier } // NewKongClient provides a new KongClient object after connecting to the @@ -162,18 +157,19 @@ func NewKongClient( // build the client object cache := store.NewCacheStores() c := &KongClient{ - logger: logger, - ingressClass: ingressClass, - enableReverseSync: enableReverseSync, - skipCACertificates: skipCACertificates, - requestTimeout: timeout, - diagnostic: diagnostic, - prometheusMetrics: metrics.NewCtrlFuncMetrics(), - cache: &cache, - kongConfig: kongConfig, - eventRecorder: eventRecorder, - dbmode: dbMode, - clientsProvider: clientsProvider, + logger: logger, + ingressClass: ingressClass, + enableReverseSync: enableReverseSync, + skipCACertificates: skipCACertificates, + requestTimeout: timeout, + diagnostic: diagnostic, + prometheusMetrics: metrics.NewCtrlFuncMetrics(), + cache: &cache, + kongConfig: kongConfig, + eventRecorder: eventRecorder, + dbmode: dbMode, + clientsProvider: clientsProvider, + configStatusNotifier: NoOpConfigStatusNotifier{}, } return c, nil @@ -417,29 +413,26 @@ func (c *KongClient) Update(ctx context.Context) error { formatVersion = "3.0" } + configStatus := ConfigStatusOK + defer c.configStatusNotifier.NotifyConfigStatus(configStatus) + // parse the Kubernetes objects from the storer into Kong configuration kongstate, translationFailures := p.Build() if failuresCount := len(translationFailures); failuresCount > 0 { c.prometheusMetrics.RecordTranslationFailure() c.recordResourceFailureEvents(translationFailures, KongConfigurationTranslationFailedEventReason) c.logger.Debugf("%d translation failures have occurred when building data-plane configuration", failuresCount) + configStatus = ConfigStatusTranslationErrorHappened } else { c.prometheusMetrics.RecordTranslationSuccess() c.logger.Debug("successfully built data-plane configuration") } - // send the status whether translation errors happened if there is a channel to receive the status. - if c.hasTranslationErrorChan != nil { - c.hasTranslationErrorChan <- (len(translationFailures) > 0) - } shas, err := c.sendOutToClients(ctx, kongstate, formatVersion, c.kongConfig) if err != nil { + configStatus = ConfigStatusApplyFailed return err } - // send the error on applying kong configurations if there is a channel to receive it. - if c.sendConfigErrorChan != nil { - c.sendConfigErrorChan <- err - } // report on configured Kubernetes objects if enabled if c.AreKubernetesObjectReportsEnabled() { @@ -536,13 +529,14 @@ func handleSendToClientResult(client sendconfig.KonnectAwareClient, logger logru } return "", err } -// SetHasTranslationFailureChan sets the channel to receive the status of whether -// translation failure happens. -func (c *KongClient) SetHasTranslationFailureChan(ch chan bool) { - c.hasTranslationErrorChan = ch + return newSHA, nil } - return newSHA, nil +func (c *KongClient) SetConfigStatusNotifier(n ConfigStatusNotifier) { + c.lock.Lock() + defer c.lock.Unlock() + + c.configStatusNotifier = n } // ----------------------------------------------------------------------------- diff --git a/internal/konnect/node_agent.go b/internal/konnect/node_agent.go index 08476b242c..aeb61f4307 100644 --- a/internal/konnect/node_agent.go +++ b/internal/konnect/node_agent.go @@ -1,19 +1,41 @@ package konnect import ( + "context" "fmt" "time" "github.com/go-logr/logr" + "github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane" "github.com/kong/kubernetes-ingress-controller/v2/internal/util" ) const ( MinRefreshNodePeriod = 30 * time.Second DefaultRefreshNodePeriod = 60 * time.Second + NodeOutdateInterval = 5 * time.Minute ) +// REVIEW: define the subscriber here, or internal/adminapi for common usage? +type ConfigStatusSubscriber interface { + Subscribe() chan dataplane.ConfigStatus +} + +type configStatusSubscriber struct { + ch chan dataplane.ConfigStatus +} + +var _ ConfigStatusSubscriber = &configStatusSubscriber{} + +func (s *configStatusSubscriber) Subscribe() chan dataplane.ConfigStatus { + return s.ch +} + +func NewConfigStatusSubscriber(ch chan dataplane.ConfigStatus) *configStatusSubscriber { + return &configStatusSubscriber{ch: ch} +} + type NodeAgent struct { NodeID string Hostname string @@ -21,24 +43,20 @@ type NodeAgent struct { Logger logr.Logger - konnectClient *Client + konnectClient *NodeAPIClient refreshPeriod time.Duration - hasTranslationFailureChan chan bool - hasTranslationFailure bool - - sendConfigErrorChan chan error - sendCondifError error + configStatus dataplane.ConfigStatus + configStatusSubscriber ConfigStatusSubscriber } func NewNodeAgent( hostname string, version string, refreshPeriod time.Duration, - hasTranslationFailureChan chan bool, - sendConfigErrorChan chan error, logger logr.Logger, - client *Client, + client *NodeAPIClient, + configStatusSubscriber ConfigStatusSubscriber, ) *NodeAgent { if refreshPeriod < MinRefreshNodePeriod { refreshPeriod = MinRefreshNodePeriod @@ -48,13 +66,21 @@ func NewNodeAgent( Version: version, Logger: logger. WithName("konnect-node").WithValues("runtime_group_id", client.RuntimeGroupID), - konnectClient: client, - refreshPeriod: refreshPeriod, + konnectClient: client, + refreshPeriod: refreshPeriod, + configStatus: dataplane.ConfigStatusOK, + configStatusSubscriber: configStatusSubscriber, } } func (a *NodeAgent) createNode() error { - // REVIEW: consider existing nodes in runtime group as outdated and delete them before creating? + + err := a.clearOutdatedNodes() + if err != nil { + // still continue to update the current status if cleanup failed. + a.Logger.Error(err, "failed to clear outdated nodes") + } + createNodeReq := &CreateNodeRequest{ ID: a.NodeID, Hostname: a.Hostname, @@ -79,11 +105,16 @@ func (a *NodeAgent) clearOutdatedNodes() error { } for _, node := range nodes.Items { - // REVIEW: what should the condition be to delete the node in Konnect RG? - // (1) Do we check the "last update" of the node, and only delete it when the last update is too old(say, 5 mins ago)? - // (2) What if there is a node with the same name but not the same node exists? - // for example, When KIC runs in minikube/kind env and whole cluster is stopped then started again. - if node.Type == NodeTypeIngressController && node.Hostname != a.Hostname { + deleteNode := false + if node.Type == NodeTypeIngressController { + // nodes to remove: + // (1) since only one KIC node is allowed in a runtime group, all the nodes with other hostnames are considered outdated. + // (2) in some cases(kind/minikube restart), rebuilt pod uses the same name. So nodes updated for >5mins before should be deleted. + if node.Hostname != a.Hostname || time.Now().Sub(time.Unix(node.UpdatedAt, 0)) > NodeOutdateInterval { + deleteNode = true + } + } + if deleteNode { a.Logger.V(util.DebugLevel).Info("remove outdated KIC node", "node_id", node.ID, "hostname", node.Hostname) err := a.konnectClient.DeleteNode(node.ID) if err != nil { @@ -94,14 +125,16 @@ func (a *NodeAgent) clearOutdatedNodes() error { return nil } -func (a *NodeAgent) calculateStatus() IngressControllerState { - if a.sendCondifError != nil { - return IngressControllerStateInoperable - } - if a.hasTranslationFailure { - return IngressControllerStatePartialConfigFail +func (a *NodeAgent) subscribeConfigStatus(ctx context.Context) { + for { + select { + case <-ctx.Done(): + err := ctx.Err() + a.Logger.Info("subscribe loop stopped", "message", err.Error()) + return + case a.configStatus = <-a.configStatusSubscriber.Subscribe(): + } } - return IngressControllerStateOperational } func (a *NodeAgent) updateNode() error { @@ -111,7 +144,17 @@ func (a *NodeAgent) updateNode() error { a.Logger.Error(err, "failed to clear outdated nodes") } - ingressControllerStatus := a.calculateStatus() + var ingressControllerStatus IngressControllerState + switch a.configStatus { + case dataplane.ConfigStatusOK: + ingressControllerStatus = IngressControllerStateOperational + case dataplane.ConfigStatusTranslationErrorHappened: + ingressControllerStatus = IngressControllerStatePartialConfigFail + case dataplane.ConfigStatusApplyFailed: + ingressControllerStatus = IngressControllerStateInoperable + default: + ingressControllerStatus = IngressControllerStateUnknown + } updateNodeReq := &UpdateNodeRequest{ Hostname: a.Hostname, @@ -130,33 +173,30 @@ func (a *NodeAgent) updateNode() error { return nil } -func (a *NodeAgent) updateNodeLoop() { +func (a *NodeAgent) updateNodeLoop(ctx context.Context) { ticker := time.NewTicker(a.refreshPeriod) defer ticker.Stop() - for range ticker.C { - err := a.updateNode() - if err != nil { - a.Logger.Error(err, "failed to update node", "node_id", a.NodeID) - } - } -} - -// receiveStatus receives the necessary information to set the status. -func (a *NodeAgent) receiveStatus() { for { select { - case a.hasTranslationFailure = <-a.hasTranslationFailureChan: - case a.sendCondifError = <-a.sendConfigErrorChan: + case <-ctx.Done(): + err := ctx.Err() + a.Logger.Info("update node loop stopped", "message", err.Error()) + return + case <-ticker.C: + err := a.updateNode() + if err != nil { + a.Logger.Error(err, "failed to update node", "node_id", a.NodeID) + } } } } -func (a *NodeAgent) Run() { +func (a *NodeAgent) Run(ctx context.Context) { err := a.createNode() if err != nil { a.Logger.Error(err, "failed to create node, agent abort") return } - go a.updateNodeLoop() - go a.receiveStatus() + go a.updateNodeLoop(ctx) + go a.subscribeConfigStatus(ctx) } diff --git a/internal/manager/run.go b/internal/manager/run.go index ae4496632f..f9109abf30 100644 --- a/internal/manager/run.go +++ b/internal/manager/run.go @@ -183,11 +183,14 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic, d if c.Konnect.ConfigSynchronizationEnabled { // In case of failures when building Konnect related objects, we're not returning errors as Konnect is not // considered critical feature, and it should not break the basic functionality of the controller. - // set channel to send ingress controller status to dataplane client. - hasTranslationFailureChan := make(chan bool, 1) - dataplaneClient.SetHasTranslationFailureChan(hasTranslationFailureChan) - sendConfigErrorChan := make(chan error, 1) - dataplaneClient.SetSendConfigErrorChan(sendConfigErrorChan) + + konnectAdminAPIClient, err := adminapi.NewKongClientForKonnectRuntimeGroup(ctx, c.Konnect) + if err != nil { + setupLog.Error(err, "failed creating Konnect Runtime Group Admin API client, skipping synchronisation") + } else { + setupLog.Info("Initialized Konnect Admin API client") + clientsManager.SetKonnectClient(konnectAdminAPIClient) + } setupLog.Info("Start Konnect client to register runtime instances to Konnect") konnectNodeAPIClient, err := konnect.NewNodeAPIClient(c.Konnect) @@ -196,27 +199,21 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic, d } else { hostname, _ := os.Hostname() version := metadata.Release - agent := konnect.NewNodeAgent(hostname, version, setupLog, konnectNodeAPIClient) - agent.Run() - } - - konnectAdminAPIClient, err := adminapi.NewKongClientForKonnectRuntimeGroup(ctx, c.Konnect) - if err != nil { - setupLog.Error(err, "failed creating Konnect Runtime Group Admin API client, skipping synchronisation") - } else { - setupLog.Info("Initialized Konnect Admin API client") - clientsManager.SetKonnectClient(konnectAdminAPIClient) + // set channel to send config status. + configStatusChan := make(chan dataplane.ConfigStatus, 1) + dataplaneClient.SetConfigStatusNotifier( + dataplane.NewChannelConfigNotifier(configStatusChan), + ) + configStatusSubscriber := konnect.NewConfigStatusSubscriber(configStatusChan) + agent := konnect.NewNodeAgent( + hostname, + version, + c.Konnect.RefreshNodePeriod, + setupLog, + konnectNodeAPIClient, + configStatusSubscriber) + agent.Run(ctx) } - hostname, _ := os.Hostname() - version := metadata.Release - agent := konnect.NewNodeAgent( - hostname, version, - c.Konnect.RefreshNodePeriod, - hasTranslationFailureChan, - sendConfigErrorChan, - setupLog, konnectClient, - ) - agent.Run() } if c.AnonymousReports { From 3a476b70f69bbf043518ec714963764dd10cad29 Mon Sep 17 00:00:00 2001 From: Yi Tao Date: Tue, 14 Feb 2023 15:38:21 +0800 Subject: [PATCH 5/8] fix notify status --- internal/dataplane/kong_client.go | 11 +++++++---- internal/konnect/node_agent.go | 6 +++--- internal/manager/run.go | 3 ++- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/internal/dataplane/kong_client.go b/internal/dataplane/kong_client.go index a2626a1f90..7554bf99c8 100644 --- a/internal/dataplane/kong_client.go +++ b/internal/dataplane/kong_client.go @@ -413,8 +413,11 @@ func (c *KongClient) Update(ctx context.Context) error { formatVersion = "3.0" } - configStatus := ConfigStatusOK - defer c.configStatusNotifier.NotifyConfigStatus(configStatus) + configStatus := new(ConfigStatus) + *configStatus = ConfigStatusOK + defer func(status *ConfigStatus) { + c.configStatusNotifier.NotifyConfigStatus(*status) + }(configStatus) // parse the Kubernetes objects from the storer into Kong configuration kongstate, translationFailures := p.Build() @@ -422,7 +425,7 @@ func (c *KongClient) Update(ctx context.Context) error { c.prometheusMetrics.RecordTranslationFailure() c.recordResourceFailureEvents(translationFailures, KongConfigurationTranslationFailedEventReason) c.logger.Debugf("%d translation failures have occurred when building data-plane configuration", failuresCount) - configStatus = ConfigStatusTranslationErrorHappened + *configStatus = ConfigStatusTranslationErrorHappened } else { c.prometheusMetrics.RecordTranslationSuccess() c.logger.Debug("successfully built data-plane configuration") @@ -430,7 +433,7 @@ func (c *KongClient) Update(ctx context.Context) error { shas, err := c.sendOutToClients(ctx, kongstate, formatVersion, c.kongConfig) if err != nil { - configStatus = ConfigStatusApplyFailed + *configStatus = ConfigStatusApplyFailed return err } diff --git a/internal/konnect/node_agent.go b/internal/konnect/node_agent.go index aeb61f4307..f6bde9f678 100644 --- a/internal/konnect/node_agent.go +++ b/internal/konnect/node_agent.go @@ -17,6 +17,7 @@ const ( NodeOutdateInterval = 5 * time.Minute ) +// ConfigStatusSubscriber subscribes status of configuring kong. // REVIEW: define the subscriber here, or internal/adminapi for common usage? type ConfigStatusSubscriber interface { Subscribe() chan dataplane.ConfigStatus @@ -32,7 +33,7 @@ func (s *configStatusSubscriber) Subscribe() chan dataplane.ConfigStatus { return s.ch } -func NewConfigStatusSubscriber(ch chan dataplane.ConfigStatus) *configStatusSubscriber { +func NewConfigStatusSubscriber(ch chan dataplane.ConfigStatus) ConfigStatusSubscriber { return &configStatusSubscriber{ch: ch} } @@ -74,7 +75,6 @@ func NewNodeAgent( } func (a *NodeAgent) createNode() error { - err := a.clearOutdatedNodes() if err != nil { // still continue to update the current status if cleanup failed. @@ -110,7 +110,7 @@ func (a *NodeAgent) clearOutdatedNodes() error { // nodes to remove: // (1) since only one KIC node is allowed in a runtime group, all the nodes with other hostnames are considered outdated. // (2) in some cases(kind/minikube restart), rebuilt pod uses the same name. So nodes updated for >5mins before should be deleted. - if node.Hostname != a.Hostname || time.Now().Sub(time.Unix(node.UpdatedAt, 0)) > NodeOutdateInterval { + if node.Hostname != a.Hostname || time.Since(time.Unix(node.UpdatedAt, 0)) > NodeOutdateInterval { deleteNode = true } } diff --git a/internal/manager/run.go b/internal/manager/run.go index f9109abf30..8229bd02d4 100644 --- a/internal/manager/run.go +++ b/internal/manager/run.go @@ -211,7 +211,8 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic, d c.Konnect.RefreshNodePeriod, setupLog, konnectNodeAPIClient, - configStatusSubscriber) + configStatusSubscriber, + ) agent.Run(ctx) } } From f8bc28f299054add2001579c57464c5079d99d96 Mon Sep 17 00:00:00 2001 From: Yi Tao Date: Tue, 14 Feb 2023 17:45:04 +0800 Subject: [PATCH 6/8] move subscribe config status interface to dataplane --- internal/dataplane/config_status.go | 19 ++++++++++++++----- internal/konnect/node_agent.go | 26 +++----------------------- internal/manager/run.go | 9 ++++----- 3 files changed, 21 insertions(+), 33 deletions(-) diff --git a/internal/dataplane/config_status.go b/internal/dataplane/config_status.go index b524c6b1d8..9168456791 100644 --- a/internal/dataplane/config_status.go +++ b/internal/dataplane/config_status.go @@ -1,13 +1,15 @@ package dataplane -// REVIEW: put the package here, or in internal/adminapi? - type ConfigStatus int const ( - // ConfigStatusOK: no error happens in translation from + // ConfigStatusOK: no error happens in translation from k8s objects to kong configuration + // and succeeded to apply kong configuration to kong gateway. ConfigStatusOK ConfigStatus = iota + // ConfigStatusTranslationErrorHappened: error happened in translation of k8s objects + // but succeeded to apply kong configuration for remaining objects. ConfigStatusTranslationErrorHappened + // ConfigStatusApplyFailed: failed to apply kong configurations. ConfigStatusApplyFailed ) @@ -15,9 +17,12 @@ type ConfigStatusNotifier interface { NotifyConfigStatus(ConfigStatus) } -type NoOpConfigStatusNotifier struct { +type ConfigStatusSubscriber interface { + SubscribeConfigStatus() chan ConfigStatus } +type NoOpConfigStatusNotifier struct{} + var _ ConfigStatusNotifier = NoOpConfigStatusNotifier{} func (n NoOpConfigStatusNotifier) NotifyConfigStatus(status ConfigStatus) { @@ -33,7 +38,11 @@ func (n *ChannelConfigNotifier) NotifyConfigStatus(status ConfigStatus) { n.ch <- status } -func NewChannelConfigNotifier(ch chan ConfigStatus) ConfigStatusNotifier { +func (n *ChannelConfigNotifier) SubscribeConfigStatus() chan ConfigStatus { + return n.ch +} + +func NewChannelConfigNotifier(ch chan ConfigStatus) *ChannelConfigNotifier { return &ChannelConfigNotifier{ ch: ch, } diff --git a/internal/konnect/node_agent.go b/internal/konnect/node_agent.go index f6bde9f678..aa306d9949 100644 --- a/internal/konnect/node_agent.go +++ b/internal/konnect/node_agent.go @@ -17,26 +17,6 @@ const ( NodeOutdateInterval = 5 * time.Minute ) -// ConfigStatusSubscriber subscribes status of configuring kong. -// REVIEW: define the subscriber here, or internal/adminapi for common usage? -type ConfigStatusSubscriber interface { - Subscribe() chan dataplane.ConfigStatus -} - -type configStatusSubscriber struct { - ch chan dataplane.ConfigStatus -} - -var _ ConfigStatusSubscriber = &configStatusSubscriber{} - -func (s *configStatusSubscriber) Subscribe() chan dataplane.ConfigStatus { - return s.ch -} - -func NewConfigStatusSubscriber(ch chan dataplane.ConfigStatus) ConfigStatusSubscriber { - return &configStatusSubscriber{ch: ch} -} - type NodeAgent struct { NodeID string Hostname string @@ -48,7 +28,7 @@ type NodeAgent struct { refreshPeriod time.Duration configStatus dataplane.ConfigStatus - configStatusSubscriber ConfigStatusSubscriber + configStatusSubscriber dataplane.ConfigStatusSubscriber } func NewNodeAgent( @@ -57,7 +37,7 @@ func NewNodeAgent( refreshPeriod time.Duration, logger logr.Logger, client *NodeAPIClient, - configStatusSubscriber ConfigStatusSubscriber, + configStatusSubscriber dataplane.ConfigStatusSubscriber, ) *NodeAgent { if refreshPeriod < MinRefreshNodePeriod { refreshPeriod = MinRefreshNodePeriod @@ -132,7 +112,7 @@ func (a *NodeAgent) subscribeConfigStatus(ctx context.Context) { err := ctx.Err() a.Logger.Info("subscribe loop stopped", "message", err.Error()) return - case a.configStatus = <-a.configStatusSubscriber.Subscribe(): + case a.configStatus = <-a.configStatusSubscriber.SubscribeConfigStatus(): } } } diff --git a/internal/manager/run.go b/internal/manager/run.go index 8229bd02d4..613d1fe836 100644 --- a/internal/manager/run.go +++ b/internal/manager/run.go @@ -201,17 +201,16 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic, d version := metadata.Release // set channel to send config status. configStatusChan := make(chan dataplane.ConfigStatus, 1) - dataplaneClient.SetConfigStatusNotifier( - dataplane.NewChannelConfigNotifier(configStatusChan), - ) - configStatusSubscriber := konnect.NewConfigStatusSubscriber(configStatusChan) + configStatusNotifier := dataplane.NewChannelConfigNotifier(configStatusChan) + dataplaneClient.SetConfigStatusNotifier(configStatusNotifier) + agent := konnect.NewNodeAgent( hostname, version, c.Konnect.RefreshNodePeriod, setupLog, konnectNodeAPIClient, - configStatusSubscriber, + configStatusNotifier, ) agent.Run(ctx) } From 2356d5a1ead36c039b3284e54a56acd6a863faf1 Mon Sep 17 00:00:00 2001 From: Yi Tao Date: Tue, 14 Feb 2023 18:30:11 +0800 Subject: [PATCH 7/8] address comments Again --- internal/dataplane/kong_client.go | 20 ++++++++++++-------- internal/konnect/node_agent.go | 14 +++++++++----- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/internal/dataplane/kong_client.go b/internal/dataplane/kong_client.go index 7554bf99c8..2cbe108a0b 100644 --- a/internal/dataplane/kong_client.go +++ b/internal/dataplane/kong_client.go @@ -413,19 +413,12 @@ func (c *KongClient) Update(ctx context.Context) error { formatVersion = "3.0" } - configStatus := new(ConfigStatus) - *configStatus = ConfigStatusOK - defer func(status *ConfigStatus) { - c.configStatusNotifier.NotifyConfigStatus(*status) - }(configStatus) - // parse the Kubernetes objects from the storer into Kong configuration kongstate, translationFailures := p.Build() if failuresCount := len(translationFailures); failuresCount > 0 { c.prometheusMetrics.RecordTranslationFailure() c.recordResourceFailureEvents(translationFailures, KongConfigurationTranslationFailedEventReason) c.logger.Debugf("%d translation failures have occurred when building data-plane configuration", failuresCount) - *configStatus = ConfigStatusTranslationErrorHappened } else { c.prometheusMetrics.RecordTranslationSuccess() c.logger.Debug("successfully built data-plane configuration") @@ -433,10 +426,19 @@ func (c *KongClient) Update(ctx context.Context) error { shas, err := c.sendOutToClients(ctx, kongstate, formatVersion, c.kongConfig) if err != nil { - *configStatus = ConfigStatusApplyFailed + c.configStatusNotifier.NotifyConfigStatus(ConfigStatusApplyFailed) return err } + // succeeded to apply configuration to Kong gateway. + // notify the receiver of config status that translation error happened when there are translation errors, + // otherwise notify that config status is OK. + if len(translationFailures) > 0 { + c.configStatusNotifier.NotifyConfigStatus(ConfigStatusTranslationErrorHappened) + } else { + c.configStatusNotifier.NotifyConfigStatus(ConfigStatusOK) + } + // report on configured Kubernetes objects if enabled if c.AreKubernetesObjectReportsEnabled() { // if the configuration SHAs that have just been pushed are different than @@ -535,6 +537,8 @@ func handleSendToClientResult(client sendconfig.KonnectAwareClient, logger logru return newSHA, nil } +// SetConfigStatusNotifier sets a notifier notifies configurations to subscribers +// Currently it is used for uploading the node status to konnect runtime group. func (c *KongClient) SetConfigStatusNotifier(n ConfigStatusNotifier) { c.lock.Lock() defer c.lock.Unlock() diff --git a/internal/konnect/node_agent.go b/internal/konnect/node_agent.go index aa306d9949..6898f3a401 100644 --- a/internal/konnect/node_agent.go +++ b/internal/konnect/node_agent.go @@ -3,6 +3,7 @@ package konnect import ( "context" "fmt" + "sync/atomic" "time" "github.com/go-logr/logr" @@ -27,7 +28,7 @@ type NodeAgent struct { konnectClient *NodeAPIClient refreshPeriod time.Duration - configStatus dataplane.ConfigStatus + configStatus atomic.Uint32 configStatusSubscriber dataplane.ConfigStatusSubscriber } @@ -42,16 +43,17 @@ func NewNodeAgent( if refreshPeriod < MinRefreshNodePeriod { refreshPeriod = MinRefreshNodePeriod } - return &NodeAgent{ + a := &NodeAgent{ Hostname: hostname, Version: version, Logger: logger. WithName("konnect-node").WithValues("runtime_group_id", client.RuntimeGroupID), konnectClient: client, refreshPeriod: refreshPeriod, - configStatus: dataplane.ConfigStatusOK, configStatusSubscriber: configStatusSubscriber, } + a.configStatus.Store(uint32(dataplane.ConfigStatusOK)) + return a } func (a *NodeAgent) createNode() error { @@ -112,7 +114,8 @@ func (a *NodeAgent) subscribeConfigStatus(ctx context.Context) { err := ctx.Err() a.Logger.Info("subscribe loop stopped", "message", err.Error()) return - case a.configStatus = <-a.configStatusSubscriber.SubscribeConfigStatus(): + case configStatus := <-a.configStatusSubscriber.SubscribeConfigStatus(): + a.configStatus.Store(uint32(configStatus)) } } } @@ -125,7 +128,8 @@ func (a *NodeAgent) updateNode() error { } var ingressControllerStatus IngressControllerState - switch a.configStatus { + configStatus := int(a.configStatus.Load()) + switch dataplane.ConfigStatus(configStatus) { case dataplane.ConfigStatusOK: ingressControllerStatus = IngressControllerStateOperational case dataplane.ConfigStatusTranslationErrorHappened: From bfde4d34cf3082245d7317a822614bc11f5a3fd2 Mon Sep 17 00:00:00 2001 From: Yi Tao Date: Tue, 14 Feb 2023 21:46:53 +0800 Subject: [PATCH 8/8] address comments AGain --- internal/dataplane/config_status.go | 4 ++-- internal/dataplane/kong_client.go | 2 +- internal/konnect/node_agent.go | 10 ++++++---- internal/manager/run.go | 3 +-- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/internal/dataplane/config_status.go b/internal/dataplane/config_status.go index 9168456791..7ab50b1719 100644 --- a/internal/dataplane/config_status.go +++ b/internal/dataplane/config_status.go @@ -42,8 +42,8 @@ func (n *ChannelConfigNotifier) SubscribeConfigStatus() chan ConfigStatus { return n.ch } -func NewChannelConfigNotifier(ch chan ConfigStatus) *ChannelConfigNotifier { +func NewChannelConfigNotifier() *ChannelConfigNotifier { return &ChannelConfigNotifier{ - ch: ch, + ch: make(chan ConfigStatus, 1), } } diff --git a/internal/dataplane/kong_client.go b/internal/dataplane/kong_client.go index 2cbe108a0b..6fcbf9e017 100644 --- a/internal/dataplane/kong_client.go +++ b/internal/dataplane/kong_client.go @@ -537,7 +537,7 @@ func handleSendToClientResult(client sendconfig.KonnectAwareClient, logger logru return newSHA, nil } -// SetConfigStatusNotifier sets a notifier notifies configurations to subscribers +// SetConfigStatusNotifier sets a notifier which notifies subscribers about configuration sending results. // Currently it is used for uploading the node status to konnect runtime group. func (c *KongClient) SetConfigStatusNotifier(n ConfigStatusNotifier) { c.lock.Lock() diff --git a/internal/konnect/node_agent.go b/internal/konnect/node_agent.go index 6898f3a401..4318927f20 100644 --- a/internal/konnect/node_agent.go +++ b/internal/konnect/node_agent.go @@ -108,13 +108,15 @@ func (a *NodeAgent) clearOutdatedNodes() error { } func (a *NodeAgent) subscribeConfigStatus(ctx context.Context) { + ch := a.configStatusSubscriber.SubscribeConfigStatus() + chDone := ctx.Done() + for { select { - case <-ctx.Done(): - err := ctx.Err() - a.Logger.Info("subscribe loop stopped", "message", err.Error()) + case <-chDone: + a.Logger.Info("subscribe loop stopped", "message", ctx.Err().Error()) return - case configStatus := <-a.configStatusSubscriber.SubscribeConfigStatus(): + case configStatus := <-ch: a.configStatus.Store(uint32(configStatus)) } } diff --git a/internal/manager/run.go b/internal/manager/run.go index 613d1fe836..7147599239 100644 --- a/internal/manager/run.go +++ b/internal/manager/run.go @@ -200,8 +200,7 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic, d hostname, _ := os.Hostname() version := metadata.Release // set channel to send config status. - configStatusChan := make(chan dataplane.ConfigStatus, 1) - configStatusNotifier := dataplane.NewChannelConfigNotifier(configStatusChan) + configStatusNotifier := dataplane.NewChannelConfigNotifier() dataplaneClient.SetConfigStatusNotifier(configStatusNotifier) agent := konnect.NewNodeAgent(