Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update running status of KIC node to Konnect #3533

Merged
merged 8 commits into from
Feb 14, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ Adding a new version? You'll need three changes:
[#3521](https://github.com/Kong/kubernetes-ingress-controller/pull/3521)
- Leader election is enabled by default then kong admin service discovery is enabled.
[#3529](https://github.com/Kong/kubernetes-ingress-controller/pull/3529)
- Added flag `--konnect-refresh-node-period` to set the period of uploading
status of KIC instance to Konnect runtime group.
[#3533](https://github.com/Kong/kubernetes-ingress-controller/pull/3533)

### Fixed

Expand Down
1 change: 1 addition & 0 deletions internal/adminapi/konnect.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type KonnectConfig struct {
ConfigSynchronizationEnabled bool
RuntimeGroupID string
Address string
RefreshNodePeriod time.Duration
TLSClient TLSClientConfig
}

Expand Down
40 changes: 40 additions & 0 deletions internal/dataplane/config_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package dataplane

// REVIEW: put the package here, or in internal/adminapi?
randmonkey marked this conversation as resolved.
Show resolved Hide resolved

type ConfigStatus int

const (
// ConfigStatusOK: no error happens in translation from
ConfigStatusOK ConfigStatus = iota
ConfigStatusTranslationErrorHappened
ConfigStatusApplyFailed
)

type ConfigStatusNotifier interface {
NotifyConfigStatus(ConfigStatus)
}

type NoOpConfigStatusNotifier struct {
}

var _ ConfigStatusNotifier = NoOpConfigStatusNotifier{}

func (n NoOpConfigStatusNotifier) NotifyConfigStatus(status ConfigStatus) {
}

type ChannelConfigNotifier struct {
ch chan ConfigStatus
}

var _ ConfigStatusNotifier = &ChannelConfigNotifier{}

func (n *ChannelConfigNotifier) NotifyConfigStatus(status ConfigStatus) {
n.ch <- status
}

func NewChannelConfigNotifier(ch chan ConfigStatus) ConfigStatusNotifier {
return &ChannelConfigNotifier{
ch: ch,
}
}
44 changes: 31 additions & 13 deletions internal/dataplane/kong_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ type KongClient struct {

// clientsProvider allows retrieving the most recent set of clients.
clientsProvider AdminAPIClientsProvider

// configStatusNotifier notifies status of cofiguring kong gateway.
configStatusNotifier ConfigStatusNotifier
}

// NewKongClient provides a new KongClient object after connecting to the
Expand All @@ -154,18 +157,19 @@ func NewKongClient(
// build the client object
cache := store.NewCacheStores()
c := &KongClient{
logger: logger,
ingressClass: ingressClass,
enableReverseSync: enableReverseSync,
skipCACertificates: skipCACertificates,
requestTimeout: timeout,
diagnostic: diagnostic,
prometheusMetrics: metrics.NewCtrlFuncMetrics(),
cache: &cache,
kongConfig: kongConfig,
eventRecorder: eventRecorder,
dbmode: dbMode,
clientsProvider: clientsProvider,
logger: logger,
ingressClass: ingressClass,
enableReverseSync: enableReverseSync,
skipCACertificates: skipCACertificates,
requestTimeout: timeout,
diagnostic: diagnostic,
prometheusMetrics: metrics.NewCtrlFuncMetrics(),
cache: &cache,
kongConfig: kongConfig,
eventRecorder: eventRecorder,
dbmode: dbMode,
clientsProvider: clientsProvider,
configStatusNotifier: NoOpConfigStatusNotifier{},
}

return c, nil
Expand Down Expand Up @@ -409,19 +413,27 @@ func (c *KongClient) Update(ctx context.Context) error {
formatVersion = "3.0"
}

configStatus := new(ConfigStatus)
*configStatus = ConfigStatusOK
defer func(status *ConfigStatus) {
c.configStatusNotifier.NotifyConfigStatus(*status)
}(configStatus)
randmonkey marked this conversation as resolved.
Show resolved Hide resolved

// parse the Kubernetes objects from the storer into Kong configuration
kongstate, translationFailures := p.Build()
if failuresCount := len(translationFailures); failuresCount > 0 {
c.prometheusMetrics.RecordTranslationFailure()
c.recordResourceFailureEvents(translationFailures, KongConfigurationTranslationFailedEventReason)
c.logger.Debugf("%d translation failures have occurred when building data-plane configuration", failuresCount)
*configStatus = ConfigStatusTranslationErrorHappened
} else {
c.prometheusMetrics.RecordTranslationSuccess()
c.logger.Debug("successfully built data-plane configuration")
}

shas, err := c.sendOutToClients(ctx, kongstate, formatVersion, c.kongConfig)
if err != nil {
*configStatus = ConfigStatusApplyFailed
return err
}

Expand Down Expand Up @@ -520,10 +532,16 @@ func handleSendToClientResult(client sendconfig.KonnectAwareClient, logger logru
}
return "", err
}

return newSHA, nil
}

func (c *KongClient) SetConfigStatusNotifier(n ConfigStatusNotifier) {
randmonkey marked this conversation as resolved.
Show resolved Hide resolved
c.lock.Lock()
defer c.lock.Unlock()

c.configStatusNotifier = n
}

// -----------------------------------------------------------------------------
// Dataplane Client - Kong - Private
// -----------------------------------------------------------------------------
Expand Down
128 changes: 104 additions & 24 deletions internal/konnect/node_agent.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,41 @@
package konnect

import (
"context"
"fmt"
"time"

"github.com/go-logr/logr"

"github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane"
"github.com/kong/kubernetes-ingress-controller/v2/internal/util"
)

const defaultRefreshNodeInterval = 30 * time.Second
const (
MinRefreshNodePeriod = 30 * time.Second
DefaultRefreshNodePeriod = 60 * time.Second
NodeOutdateInterval = 5 * time.Minute
)

// ConfigStatusSubscriber subscribes status of configuring kong.
// REVIEW: define the subscriber here, or internal/adminapi for common usage?
type ConfigStatusSubscriber interface {
Subscribe() chan dataplane.ConfigStatus
}

type configStatusSubscriber struct {
ch chan dataplane.ConfigStatus
}

var _ ConfigStatusSubscriber = &configStatusSubscriber{}

func (s *configStatusSubscriber) Subscribe() chan dataplane.ConfigStatus {
return s.ch
}

func NewConfigStatusSubscriber(ch chan dataplane.ConfigStatus) ConfigStatusSubscriber {
return &configStatusSubscriber{ch: ch}
}
randmonkey marked this conversation as resolved.
Show resolved Hide resolved

type NodeAgent struct {
NodeID string
Expand All @@ -18,24 +44,43 @@ type NodeAgent struct {

Logger logr.Logger

konnectClient *NodeAPIClient
refreshInterval time.Duration
konnectClient *NodeAPIClient
refreshPeriod time.Duration

configStatus dataplane.ConfigStatus
configStatusSubscriber ConfigStatusSubscriber
}

func NewNodeAgent(hostname string, version string, logger logr.Logger, client *NodeAPIClient) *NodeAgent {
func NewNodeAgent(
hostname string,
version string,
refreshPeriod time.Duration,
logger logr.Logger,
client *NodeAPIClient,
configStatusSubscriber ConfigStatusSubscriber,
) *NodeAgent {
if refreshPeriod < MinRefreshNodePeriod {
refreshPeriod = MinRefreshNodePeriod
}
return &NodeAgent{
Hostname: hostname,
Version: version,
Logger: logger.
WithName("konnect-node").WithValues("runtime_group_id", client.RuntimeGroupID),
konnectClient: client,
// TODO: set refresh interval by some flag
// https://github.com/Kong/kubernetes-ingress-controller/issues/3515
refreshInterval: defaultRefreshNodeInterval,
konnectClient: client,
refreshPeriod: refreshPeriod,
configStatus: dataplane.ConfigStatusOK,
configStatusSubscriber: configStatusSubscriber,
}
}

func (a *NodeAgent) createNode() error {
err := a.clearOutdatedNodes()
if err != nil {
// still continue to update the current status if cleanup failed.
a.Logger.Error(err, "failed to clear outdated nodes")
}

createNodeReq := &CreateNodeRequest{
ID: a.NodeID,
Hostname: a.Hostname,
Expand All @@ -45,7 +90,7 @@ func (a *NodeAgent) createNode() error {
}
resp, err := a.konnectClient.CreateNode(createNodeReq)
if err != nil {
return fmt.Errorf("failed to update node, hostname %s: %w", a.Hostname, err)
return fmt.Errorf("failed to create node, hostname %s: %w", a.Hostname, err)
}

a.NodeID = resp.Item.ID
Expand All @@ -60,7 +105,16 @@ func (a *NodeAgent) clearOutdatedNodes() error {
}

for _, node := range nodes.Items {
if node.Type == NodeTypeIngressController && node.Hostname != a.Hostname {
deleteNode := false
if node.Type == NodeTypeIngressController {
// nodes to remove:
// (1) since only one KIC node is allowed in a runtime group, all the nodes with other hostnames are considered outdated.
// (2) in some cases(kind/minikube restart), rebuilt pod uses the same name. So nodes updated for >5mins before should be deleted.
if node.Hostname != a.Hostname || time.Since(time.Unix(node.UpdatedAt, 0)) > NodeOutdateInterval {
deleteNode = true
}
}
if deleteNode {
a.Logger.V(util.DebugLevel).Info("remove outdated KIC node", "node_id", node.ID, "hostname", node.Hostname)
err := a.konnectClient.DeleteNode(node.ID)
if err != nil {
Expand All @@ -71,16 +125,36 @@ func (a *NodeAgent) clearOutdatedNodes() error {
return nil
}

func (a *NodeAgent) subscribeConfigStatus(ctx context.Context) {
for {
select {
case <-ctx.Done():
err := ctx.Err()
a.Logger.Info("subscribe loop stopped", "message", err.Error())
return
case a.configStatus = <-a.configStatusSubscriber.Subscribe():
}
}
}

func (a *NodeAgent) updateNode() error {
err := a.clearOutdatedNodes()
if err != nil {
// still continue to update the current status if cleanup failed.
a.Logger.Error(err, "failed to clear outdated nodes")
return err
}

// TODO: retrieve the real state of KIC
// https://github.com/Kong/kubernetes-ingress-controller/issues/3515
ingressControllerStatus := IngressControllerStateOperational
var ingressControllerStatus IngressControllerState
switch a.configStatus {
case dataplane.ConfigStatusOK:
ingressControllerStatus = IngressControllerStateOperational
case dataplane.ConfigStatusTranslationErrorHappened:
ingressControllerStatus = IngressControllerStatePartialConfigFail
case dataplane.ConfigStatusApplyFailed:
ingressControllerStatus = IngressControllerStateInoperable
default:
ingressControllerStatus = IngressControllerStateUnknown
}

updateNodeReq := &UpdateNodeRequest{
Hostname: a.Hostname,
Expand All @@ -99,24 +173,30 @@ func (a *NodeAgent) updateNode() error {
return nil
}

func (a *NodeAgent) updateNodeLoop() {
ticker := time.NewTicker(a.refreshInterval)
func (a *NodeAgent) updateNodeLoop(ctx context.Context) {
ticker := time.NewTicker(a.refreshPeriod)
defer ticker.Stop()
// TODO: add some mechanism to break the loop
// https://github.com/Kong/kubernetes-ingress-controller/issues/3515
for range ticker.C {
err := a.updateNode()
if err != nil {
a.Logger.Error(err, "failed to update node", "node_id", a.NodeID)
for {
select {
case <-ctx.Done():
err := ctx.Err()
a.Logger.Info("update node loop stopped", "message", err.Error())
return
case <-ticker.C:
err := a.updateNode()
if err != nil {
a.Logger.Error(err, "failed to update node", "node_id", a.NodeID)
}
}
}
}

func (a *NodeAgent) Run() {
func (a *NodeAgent) Run(ctx context.Context) {
err := a.createNode()
if err != nil {
a.Logger.Error(err, "failed to create node, agent abort")
return
}
go a.updateNodeLoop()
go a.updateNodeLoop(ctx)
go a.subscribeConfigStatus(ctx)
}
2 changes: 2 additions & 0 deletions internal/manager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/kong/kubernetes-ingress-controller/v2/internal/annotations"
"github.com/kong/kubernetes-ingress-controller/v2/internal/controllers/gateway"
"github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane"
"github.com/kong/kubernetes-ingress-controller/v2/internal/konnect"
"github.com/kong/kubernetes-ingress-controller/v2/internal/manager/featuregates"
)

Expand Down Expand Up @@ -238,6 +239,7 @@ func (c *Config) FlagSet() *pflag.FlagSet {
flagSet.StringVar(&c.Konnect.TLSClient.CertFile, "konnect-tls-client-cert-file", "", "Konnect TLS client certificate file path.")
flagSet.StringVar(&c.Konnect.TLSClient.Key, "konnect-tls-client-key", "", "Konnect TLS client key.")
flagSet.StringVar(&c.Konnect.TLSClient.KeyFile, "konnect-tls-client-key-file", "", "Konnect TLS client key file path.")
flagSet.DurationVar(&c.Konnect.RefreshNodePeriod, "konnect-refresh-node-period", konnect.DefaultRefreshNodePeriod, "Period of uploading status of KIC and controlled kong gateway instances")

// Deprecated flags
_ = flagSet.Float32("sync-rate-limit", dataplane.DefaultSyncSeconds, "Use --proxy-sync-seconds instead")
Expand Down