feat: update running status of KIC node to Konnect #3533

Merged · 8 commits · Feb 14, 2023

Changes from 7 commits
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -117,6 +117,9 @@ Adding a new version? You'll need three changes:
[#3521](https://github.com/Kong/kubernetes-ingress-controller/pull/3521)
- Leader election is enabled by default when kong admin service discovery is enabled.
[#3529](https://github.com/Kong/kubernetes-ingress-controller/pull/3529)
- Added flag `--konnect-refresh-node-period` to set the period for uploading the
  status of the KIC instance to the Konnect runtime group.
[#3533](https://github.com/Kong/kubernetes-ingress-controller/pull/3533)

### Fixed

1 change: 1 addition & 0 deletions internal/adminapi/konnect.go
@@ -17,6 +17,7 @@ type KonnectConfig struct {
ConfigSynchronizationEnabled bool
RuntimeGroupID string
Address string
RefreshNodePeriod time.Duration
TLSClient TLSClientConfig
}

49 changes: 49 additions & 0 deletions internal/dataplane/config_status.go
@@ -0,0 +1,49 @@
package dataplane

type ConfigStatus int

const (
// ConfigStatusOK: translation from k8s objects to Kong configuration succeeded
// and the configuration was applied to the Kong gateway successfully.
ConfigStatusOK ConfigStatus = iota
// ConfigStatusTranslationErrorHappened: translation of some k8s objects failed,
// but the Kong configuration for the remaining objects was applied successfully.
ConfigStatusTranslationErrorHappened
// ConfigStatusApplyFailed: failed to apply the Kong configuration to the Kong gateway.
ConfigStatusApplyFailed
)

type ConfigStatusNotifier interface {
NotifyConfigStatus(ConfigStatus)
}

type ConfigStatusSubscriber interface {
SubscribeConfigStatus() chan ConfigStatus
}

type NoOpConfigStatusNotifier struct{}

var _ ConfigStatusNotifier = NoOpConfigStatusNotifier{}

func (n NoOpConfigStatusNotifier) NotifyConfigStatus(status ConfigStatus) {
}

type ChannelConfigNotifier struct {
ch chan ConfigStatus
}

var _ ConfigStatusNotifier = &ChannelConfigNotifier{}

func (n *ChannelConfigNotifier) NotifyConfigStatus(status ConfigStatus) {
n.ch <- status
}

func (n *ChannelConfigNotifier) SubscribeConfigStatus() chan ConfigStatus {
return n.ch
}

func NewChannelConfigNotifier(ch chan ConfigStatus) *ChannelConfigNotifier {
return &ChannelConfigNotifier{
ch: ch,
}
}
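For illustration only (this example is not part of the PR's diff), a minimal sketch of how ChannelConfigNotifier is meant to be used: a producer pushes a status with NotifyConfigStatus and a consumer receives it from SubscribeConfigStatus. The example function and its placement in a test file inside the dataplane package are assumptions.

package dataplane

import "fmt"

// ExampleChannelConfigNotifier is a hypothetical usage sketch.
func ExampleChannelConfigNotifier() {
    // Buffered channel so a single notification does not block the sender.
    ch := make(chan ConfigStatus, 1)
    notifier := NewChannelConfigNotifier(ch)

    // Producer side (e.g. KongClient.Update): report that the last apply failed.
    notifier.NotifyConfigStatus(ConfigStatusApplyFailed)

    // Consumer side (e.g. the Konnect NodeAgent): receive the status from the same channel.
    status := <-notifier.SubscribeConfigStatus()
    fmt.Println(int(status))
    // Output: 2
}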
48 changes: 35 additions & 13 deletions internal/dataplane/kong_client.go
@@ -135,6 +135,9 @@ type KongClient struct {

// clientsProvider allows retrieving the most recent set of clients.
clientsProvider AdminAPIClientsProvider

// configStatusNotifier notifies the status of configuring the Kong gateway.
configStatusNotifier ConfigStatusNotifier
}

// NewKongClient provides a new KongClient object after connecting to the
@@ -154,18 +157,19 @@ func NewKongClient(
// build the client object
cache := store.NewCacheStores()
c := &KongClient{
logger: logger,
ingressClass: ingressClass,
enableReverseSync: enableReverseSync,
skipCACertificates: skipCACertificates,
requestTimeout: timeout,
diagnostic: diagnostic,
prometheusMetrics: metrics.NewCtrlFuncMetrics(),
cache: &cache,
kongConfig: kongConfig,
eventRecorder: eventRecorder,
dbmode: dbMode,
clientsProvider: clientsProvider,
logger: logger,
ingressClass: ingressClass,
enableReverseSync: enableReverseSync,
skipCACertificates: skipCACertificates,
requestTimeout: timeout,
diagnostic: diagnostic,
prometheusMetrics: metrics.NewCtrlFuncMetrics(),
cache: &cache,
kongConfig: kongConfig,
eventRecorder: eventRecorder,
dbmode: dbMode,
clientsProvider: clientsProvider,
configStatusNotifier: NoOpConfigStatusNotifier{},
}

return c, nil
@@ -422,9 +426,19 @@ func (c *KongClient) Update(ctx context.Context) error {

shas, err := c.sendOutToClients(ctx, kongstate, formatVersion, c.kongConfig)
if err != nil {
c.configStatusNotifier.NotifyConfigStatus(ConfigStatusApplyFailed)
return err
}

// The configuration was applied to the Kong gateway successfully.
// Notify subscribers that a translation error happened when there are translation failures,
// otherwise notify that the config status is OK.
if len(translationFailures) > 0 {
c.configStatusNotifier.NotifyConfigStatus(ConfigStatusTranslationErrorHappened)
} else {
c.configStatusNotifier.NotifyConfigStatus(ConfigStatusOK)
}

// report on configured Kubernetes objects if enabled
if c.AreKubernetesObjectReportsEnabled() {
// if the configuration SHAs that have just been pushed are different than
@@ -520,10 +534,18 @@ func handleSendToClientResult(client sendconfig.KonnectAwareClient, logger logru
}
return "", err
}

return newSHA, nil
}

// SetConfigStatusNotifier sets a notifier that relays configuration statuses to subscribers.
// Currently it is used for uploading the node status to the Konnect runtime group.
func (c *KongClient) SetConfigStatusNotifier(n ConfigStatusNotifier) {
c.lock.Lock()
defer c.lock.Unlock()

c.configStatusNotifier = n
}

// -----------------------------------------------------------------------------
// Dataplane Client - Kong - Private
// -----------------------------------------------------------------------------
114 changes: 89 additions & 25 deletions internal/konnect/node_agent.go
@@ -1,15 +1,22 @@
package konnect

import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/go-logr/logr"

"github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane"
"github.com/kong/kubernetes-ingress-controller/v2/internal/util"
)

const defaultRefreshNodeInterval = 30 * time.Second
const (
MinRefreshNodePeriod = 30 * time.Second
DefaultRefreshNodePeriod = 60 * time.Second
NodeOutdateInterval = 5 * time.Minute
)

type NodeAgent struct {
NodeID string
@@ -18,24 +25,44 @@ type NodeAgent struct {

Logger logr.Logger

konnectClient *NodeAPIClient
refreshInterval time.Duration
konnectClient *NodeAPIClient
refreshPeriod time.Duration

configStatus atomic.Uint32
configStatusSubscriber dataplane.ConfigStatusSubscriber
}

func NewNodeAgent(hostname string, version string, logger logr.Logger, client *NodeAPIClient) *NodeAgent {
return &NodeAgent{
func NewNodeAgent(
hostname string,
version string,
refreshPeriod time.Duration,
logger logr.Logger,
client *NodeAPIClient,
configStatusSubscriber dataplane.ConfigStatusSubscriber,
) *NodeAgent {
if refreshPeriod < MinRefreshNodePeriod {
refreshPeriod = MinRefreshNodePeriod
}
a := &NodeAgent{
Hostname: hostname,
Version: version,
Logger: logger.
WithName("konnect-node").WithValues("runtime_group_id", client.RuntimeGroupID),
konnectClient: client,
// TODO: set refresh interval by some flag
// https://github.com/Kong/kubernetes-ingress-controller/issues/3515
refreshInterval: defaultRefreshNodeInterval,
konnectClient: client,
refreshPeriod: refreshPeriod,
configStatusSubscriber: configStatusSubscriber,
}
a.configStatus.Store(uint32(dataplane.ConfigStatusOK))
return a
}

func (a *NodeAgent) createNode() error {
err := a.clearOutdatedNodes()
if err != nil {
// still continue to update the current status if cleanup failed.
a.Logger.Error(err, "failed to clear outdated nodes")
}

createNodeReq := &CreateNodeRequest{
ID: a.NodeID,
Hostname: a.Hostname,
@@ -45,7 +72,7 @@ func (a *NodeAgent) createNode() error {
}
resp, err := a.konnectClient.CreateNode(createNodeReq)
if err != nil {
return fmt.Errorf("failed to update node, hostname %s: %w", a.Hostname, err)
return fmt.Errorf("failed to create node, hostname %s: %w", a.Hostname, err)
}

a.NodeID = resp.Item.ID
@@ -60,7 +87,16 @@ func (a *NodeAgent) clearOutdatedNodes() error {
}

for _, node := range nodes.Items {
if node.Type == NodeTypeIngressController && node.Hostname != a.Hostname {
deleteNode := false
if node.Type == NodeTypeIngressController {
// Nodes to remove:
// (1) Since only one KIC node is allowed in a runtime group, all nodes with other hostnames are considered outdated.
// (2) In some cases (kind/minikube restart), the rebuilt pod reuses the same hostname, so nodes last updated more than 5 minutes ago should also be deleted.
if node.Hostname != a.Hostname || time.Since(time.Unix(node.UpdatedAt, 0)) > NodeOutdateInterval {
deleteNode = true
}
}
if deleteNode {
a.Logger.V(util.DebugLevel).Info("remove outdated KIC node", "node_id", node.ID, "hostname", node.Hostname)
err := a.konnectClient.DeleteNode(node.ID)
if err != nil {
@@ -71,16 +107,38 @@ return nil
return nil
}

func (a *NodeAgent) subscribeConfigStatus(ctx context.Context) {
for {
select {
case <-ctx.Done():
err := ctx.Err()
a.Logger.Info("subscribe loop stopped", "message", err.Error())
return
case configStatus := <-a.configStatusSubscriber.SubscribeConfigStatus():
a.configStatus.Store(uint32(configStatus))
}
}
}

func (a *NodeAgent) updateNode() error {
err := a.clearOutdatedNodes()
if err != nil {
// still continue to update the current status if cleanup failed.
a.Logger.Error(err, "failed to clear outdated nodes")
return err
}

// TODO: retrieve the real state of KIC
// https://github.com/Kong/kubernetes-ingress-controller/issues/3515
ingressControllerStatus := IngressControllerStateOperational
var ingressControllerStatus IngressControllerState
configStatus := int(a.configStatus.Load())
switch dataplane.ConfigStatus(configStatus) {
case dataplane.ConfigStatusOK:
ingressControllerStatus = IngressControllerStateOperational
case dataplane.ConfigStatusTranslationErrorHappened:
ingressControllerStatus = IngressControllerStatePartialConfigFail
case dataplane.ConfigStatusApplyFailed:
ingressControllerStatus = IngressControllerStateInoperable
default:
ingressControllerStatus = IngressControllerStateUnknown
}

updateNodeReq := &UpdateNodeRequest{
Hostname: a.Hostname,
@@ -99,24 +157,30 @@ func (a *NodeAgent) updateNode() error {
return nil
}

func (a *NodeAgent) updateNodeLoop() {
ticker := time.NewTicker(a.refreshInterval)
func (a *NodeAgent) updateNodeLoop(ctx context.Context) {
ticker := time.NewTicker(a.refreshPeriod)
defer ticker.Stop()
// TODO: add some mechanism to break the loop
// https://github.com/Kong/kubernetes-ingress-controller/issues/3515
for range ticker.C {
err := a.updateNode()
if err != nil {
a.Logger.Error(err, "failed to update node", "node_id", a.NodeID)
for {
select {
case <-ctx.Done():
err := ctx.Err()
a.Logger.Info("update node loop stopped", "message", err.Error())
return
case <-ticker.C:
err := a.updateNode()
if err != nil {
a.Logger.Error(err, "failed to update node", "node_id", a.NodeID)
}
}
}
}

func (a *NodeAgent) Run() {
func (a *NodeAgent) Run(ctx context.Context) {
err := a.createNode()
if err != nil {
a.Logger.Error(err, "failed to create node, agent abort")
return
}
go a.updateNodeLoop()
go a.updateNodeLoop(ctx)
go a.subscribeConfigStatus(ctx)
}
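Aside (not part of the diff): updateNodeLoop and subscribeConfigStatus both use Go's standard context-aware ticker/select pattern. A self-contained sketch of that pattern, with hypothetical names and independent of the NodeAgent and Konnect client types, looks like this:

package main

import (
    "context"
    "fmt"
    "time"
)

// tickUntilCancelled mirrors the shape of updateNodeLoop: run the given work
// on every tick until the context is cancelled, then report why the loop stopped.
func tickUntilCancelled(ctx context.Context, period time.Duration, work func() error) {
    ticker := time.NewTicker(period)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            fmt.Println("loop stopped:", ctx.Err())
            return
        case <-ticker.C:
            if err := work(); err != nil {
                fmt.Println("work failed:", err)
            }
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    go tickUntilCancelled(ctx, 10*time.Millisecond, func() error {
        fmt.Println("tick")
        return nil
    })
    time.Sleep(35 * time.Millisecond)
    cancel()
    time.Sleep(10 * time.Millisecond) // give the goroutine time to observe the cancellation
}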
2 changes: 2 additions & 0 deletions internal/manager/config.go
@@ -16,6 +16,7 @@ import (
"github.com/kong/kubernetes-ingress-controller/v2/internal/annotations"
"github.com/kong/kubernetes-ingress-controller/v2/internal/controllers/gateway"
"github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane"
"github.com/kong/kubernetes-ingress-controller/v2/internal/konnect"
"github.com/kong/kubernetes-ingress-controller/v2/internal/manager/featuregates"
)

@@ -238,6 +239,7 @@ func (c *Config) FlagSet() *pflag.FlagSet {
flagSet.StringVar(&c.Konnect.TLSClient.CertFile, "konnect-tls-client-cert-file", "", "Konnect TLS client certificate file path.")
flagSet.StringVar(&c.Konnect.TLSClient.Key, "konnect-tls-client-key", "", "Konnect TLS client key.")
flagSet.StringVar(&c.Konnect.TLSClient.KeyFile, "konnect-tls-client-key-file", "", "Konnect TLS client key file path.")
flagSet.DurationVar(&c.Konnect.RefreshNodePeriod, "konnect-refresh-node-period", konnect.DefaultRefreshNodePeriod, "Period of uploading status of KIC and controlled kong gateway instances")

// Deprecated flags
_ = flagSet.Float32("sync-rate-limit", dataplane.DefaultSyncSeconds, "Use --proxy-sync-seconds instead")
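For illustration only (a hypothetical, standalone sketch rather than the controller's actual wiring), the new --konnect-refresh-node-period duration flag can be registered and parsed with the same spf13/pflag API used above:

package main

import (
    "fmt"
    "time"

    "github.com/spf13/pflag"
)

func main() {
    var refreshPeriod time.Duration
    fs := pflag.NewFlagSet("example", pflag.ContinueOnError)
    fs.DurationVar(&refreshPeriod, "konnect-refresh-node-period", 60*time.Second,
        "Period of uploading status of KIC and controlled kong gateway instances")

    // e.g. a user overriding the default 60s period with 2 minutes.
    if err := fs.Parse([]string{"--konnect-refresh-node-period=2m"}); err != nil {
        panic(err)
    }
    fmt.Println("refresh period:", refreshPeriod) // refresh period: 2m0s
}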
30 changes: 21 additions & 9 deletions internal/manager/run.go
@@ -184,23 +184,35 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic, d
// In case of failures when building Konnect related objects, we're not returning errors as Konnect is not
// considered critical feature, and it should not break the basic functionality of the controller.

konnectAdminAPIClient, err := adminapi.NewKongClientForKonnectRuntimeGroup(ctx, c.Konnect)
if err != nil {
setupLog.Error(err, "failed creating Konnect Runtime Group Admin API client, skipping synchronisation")
} else {
setupLog.Info("Initialized Konnect Admin API client")
clientsManager.SetKonnectClient(konnectAdminAPIClient)
}

setupLog.Info("Start Konnect client to register runtime instances to Konnect")
konnectNodeAPIClient, err := konnect.NewNodeAPIClient(c.Konnect)
if err != nil {
setupLog.Error(err, "failed creating konnect client, skipping running NodeAgent")
} else {
hostname, _ := os.Hostname()
version := metadata.Release
agent := konnect.NewNodeAgent(hostname, version, setupLog, konnectNodeAPIClient)
agent.Run()
}
// set channel to send config status.
configStatusChan := make(chan dataplane.ConfigStatus, 1)
configStatusNotifier := dataplane.NewChannelConfigNotifier(configStatusChan)
dataplaneClient.SetConfigStatusNotifier(configStatusNotifier)

konnectAdminAPIClient, err := adminapi.NewKongClientForKonnectRuntimeGroup(ctx, c.Konnect)
if err != nil {
setupLog.Error(err, "failed creating Konnect Runtime Group Admin API client, skipping synchronisation")
} else {
setupLog.Info("Initialized Konnect Admin API client")
clientsManager.SetKonnectClient(konnectAdminAPIClient)
agent := konnect.NewNodeAgent(
hostname,
version,
c.Konnect.RefreshNodePeriod,
setupLog,
konnectNodeAPIClient,
configStatusNotifier,
)
agent.Run(ctx)
}
}

Expand Down