Skip to content

Commit

Permalink
resolve conflicts and add flag to set period of uploading node status
Browse files Browse the repository at this point in the history
  • Loading branch information
randmonkey committed Feb 13, 2023
1 parent 3b52070 commit 2850a4e
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 118 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ Adding a new version? You'll need three changes:
[#3521](https://github.com/Kong/kubernetes-ingress-controller/pull/3521)
- Leader election is enabled by default then kong admin service discovery is enabled.
[#3529](https://github.com/Kong/kubernetes-ingress-controller/pull/3529)
- Added flag `--konnect-refresh-node-period` to set the period of uploading
status of KIC instance to Konnect runtime group.
[#3533](https://github.com/Kong/kubernetes-ingress-controller/pull/3533)

### Fixed

Expand Down
1 change: 1 addition & 0 deletions internal/adminapi/konnect.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type KonnectConfig struct {
ConfigSynchronizationEnabled bool
RuntimeGroupID string
Address string
RefreshNodePeriod time.Duration
TLSClient TLSClientConfig
}

Expand Down
107 changes: 0 additions & 107 deletions internal/dataplane/kong_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,16 +132,8 @@ type KongClient struct {
// SHAs is a slice is configuration hashes send in last batch send.
SHAs []string

<<<<<<< HEAD
// clientsProvider allows retrieving the most recent set of clients.
clientsProvider AdminAPIClientsProvider
=======
// adminAPIClientFactory is a factory used for creating Admin API clients.
adminAPIClientFactory ClientFactory

// adminAPIAddressNotifyChan is used for notifications that contain Admin API
// endpoints list that should be used for configuring the dataplane.
adminAPIAddressNotifyChan chan []string

// hasTranslationErrorChan is used to notify konnect node agent the whether
// error happened in traslating k8s objects to kong configuration.
Expand All @@ -150,14 +142,6 @@ type KongClient struct {
// sendConfigErrorChan is used to notify whether error happened in sending
// translated configurations to kong.
sendConfigErrorChan chan error

close chan struct{}
onceClose sync.Once
}

type ClientFactory interface {
CreateAdminAPIClient(ctx context.Context, address string) (adminapi.Client, error)
>>>>>>> feat: update status of kic to konnect
}

// NewKongClient provides a new KongClient object after connecting to the
Expand Down Expand Up @@ -539,97 +523,6 @@ func (c *KongClient) sendToClient(
return string(newConfigSHA), nil
}

// adminAPIAddressNotifyLoop is an inner loop listening on notifyChan which are received via
// Notify() calls. Each time it receives on notifyChan tt will take the provided
// list of addresses and update the internally held list of clients such that:
// - the internal list of kong clients contains only the provided addresses
// - if a client for a provided address already exists it's not recreated again
// (hence no external calls are made to check the provided endpoint if there
// exists a client already using it)
// - client that do not exist in the provided address list are removed if they
// are present in the current state
//
// This function whill acquire the internal lock to prevent the modification of
// internal clients list.
func (c *KongClient) adminAPIAddressNotifyLoop(ctx context.Context) {
for {
select {
case <-c.close:
c.adminAPIAddressNotifyChan = nil
return

case addresses := <-c.adminAPIAddressNotifyChan:
// This call will only log errors e.g. during creation of new clients.
// If need be we might consider propagating those errors up the stack.
c.adjustKongClients(ctx, addresses)
}
}
}

// adjustKongClients adjusts internally stored clients slice based on the provided
// addresses slice. It consults BaseRootURLs of already stored clients with each
// of the addreses and creates only those clients that we don't have.
func (c *KongClient) adjustKongClients(ctx context.Context, addresses []string) {
c.lock.Lock()
defer c.lock.Unlock()

toAdd := lo.Filter(addresses, func(addr string, _ int) bool {
// If we already have a client with a provided address then great, no need
// to do anything.

// If we don't have a client with new address then filter it and add
// a client for this address.
return !lo.ContainsBy(c.kongConfig.Clients, func(cl adminapi.Client) bool {
return addr == cl.BaseRootURL()
})
})

var idxToRemove []int
for i, cl := range c.kongConfig.Clients {
// If the new address set contains a client that we already have then
// good, no need to do anything for it.
if lo.Contains(addresses, cl.BaseRootURL()) {
continue
}
// If the new address set does not contain an address that we already
// have then remove it.
idxToRemove = append(idxToRemove, i)
}

for i := len(idxToRemove) - 1; i >= 0; i-- {
idx := idxToRemove[i]
c.kongConfig.Clients = append(c.kongConfig.Clients[:idx], c.kongConfig.Clients[idx+1:]...)
}

for _, addr := range toAdd {
client, err := c.adminAPIClientFactory.CreateAdminAPIClient(ctx, addr)
if err != nil {
c.logger.WithError(err).Errorf("failed to create a client for %s", addr)
continue
}

c.kongConfig.Clients = append(c.kongConfig.Clients, client)
}
}

// Notify receives a list of addresses that KongClient should use from now on as
// a list of Kong Admin API endpoints.
func (c *KongClient) Notify(addresses []string) {
// Ensure here that we're not closed.
select {
case <-c.close:
return
default:
}

// And here also listen on c.close to allow the notification to be interrupted
// by Shutdown().
select {
case <-c.close:
case c.adminAPIAddressNotifyChan <- addresses:
}
}

// SetHasTranslationFailureChan sets the channel to receive the status of whether
// translation failure happens.
func (c *KongClient) SetHasTranslationFailureChan(ch chan bool) {
Expand Down
28 changes: 17 additions & 11 deletions internal/konnect/node_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import (
"github.com/kong/kubernetes-ingress-controller/v2/internal/util"
)

const defaultRefreshNodeInterval = 30 * time.Second
const (
MinRefreshNodePeriod = 30 * time.Second
DefaultRefreshNodePeriod = 60 * time.Second
)

type NodeAgent struct {
NodeID string
Expand All @@ -18,8 +21,8 @@ type NodeAgent struct {

Logger logr.Logger

konnectClient *Client
refreshInterval time.Duration
konnectClient *Client
refreshPeriod time.Duration

hasTranslationFailureChan chan bool
hasTranslationFailure bool
Expand All @@ -31,20 +34,22 @@ type NodeAgent struct {
func NewNodeAgent(
hostname string,
version string,
refreshPeriod time.Duration,
hasTranslationFailureChan chan bool,
sendConfigErrorChan chan error,
logger logr.Logger,
client *Client,
) *NodeAgent {
if refreshPeriod < MinRefreshNodePeriod {
refreshPeriod = MinRefreshNodePeriod
}
return &NodeAgent{
Hostname: hostname,
Version: version,
Logger: logger.
WithName("konnect-node").WithValues("runtime_group_id", client.RuntimeGroupID),
konnectClient: client,
// TODO: set refresh interval by some flag
// https://github.com/Kong/kubernetes-ingress-controller/issues/3515
refreshInterval: defaultRefreshNodeInterval,
refreshPeriod: refreshPeriod,
}
}

Expand All @@ -58,7 +63,7 @@ func (a *NodeAgent) createNode() error {
}
resp, err := a.konnectClient.CreateNode(createNodeReq)
if err != nil {
return fmt.Errorf("failed to update node, hostname %s: %w", a.Hostname, err)
return fmt.Errorf("failed to create node, hostname %s: %w", a.Hostname, err)
}

a.NodeID = resp.Item.ID
Expand All @@ -73,6 +78,10 @@ func (a *NodeAgent) clearOutdatedNodes() error {
}

for _, node := range nodes.Items {
// REVIEW: what should the condition be to delete the node in Konnect RG?
// (1) Do we check the "last update" of the node, and only delete it when the last update is too old(say, 5 mins ago)?
// (2) What if there is a node with the same name but not the same node exists?
// for example, When KIC runs in minikube/kind env and whole cluster is stopped then started again.
if node.Type == NodeTypeIngressController && node.Hostname != a.Hostname {
a.Logger.V(util.DebugLevel).Info("remove outdated KIC node", "node_id", node.ID, "hostname", node.Hostname)
err := a.konnectClient.DeleteNode(node.ID)
Expand All @@ -98,7 +107,6 @@ func (a *NodeAgent) updateNode() error {
err := a.clearOutdatedNodes()
if err != nil {
a.Logger.Error(err, "failed to clear outdated nodes")
return err
}

ingressControllerStatus := a.calculateStatus()
Expand All @@ -121,10 +129,8 @@ func (a *NodeAgent) updateNode() error {
}

func (a *NodeAgent) updateNodeLoop() {
ticker := time.NewTicker(a.refreshInterval)
ticker := time.NewTicker(a.refreshPeriod)
defer ticker.Stop()
// TODO: add some mechanism to break the loop
// https://github.com/Kong/kubernetes-ingress-controller/issues/3515
for range ticker.C {
err := a.updateNode()
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions internal/manager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/kong/kubernetes-ingress-controller/v2/internal/annotations"
"github.com/kong/kubernetes-ingress-controller/v2/internal/controllers/gateway"
"github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane"
"github.com/kong/kubernetes-ingress-controller/v2/internal/konnect"
)

// -----------------------------------------------------------------------------
Expand Down Expand Up @@ -237,6 +238,7 @@ func (c *Config) FlagSet() *pflag.FlagSet {
flagSet.StringVar(&c.Konnect.TLSClient.CertFile, "konnect-tls-client-cert-file", "", "Konnect TLS client certificate file path.")
flagSet.StringVar(&c.Konnect.TLSClient.Key, "konnect-tls-client-key", "", "Konnect TLS client key.")
flagSet.StringVar(&c.Konnect.TLSClient.KeyFile, "konnect-tls-client-key-file", "", "Konnect TLS client key file path.")
flagSet.DurationVar(&c.Konnect.RefreshNodePeriod, "konnect-refresh-node-period", konnect.DefaultRefreshNodePeriod, "Period of uploading status of KIC and controlled kong gateway instances")

// Deprecated flags
_ = flagSet.Float32("sync-rate-limit", dataplane.DefaultSyncSeconds, "Use --proxy-sync-seconds instead")
Expand Down
1 change: 1 addition & 0 deletions internal/manager/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic, d
version := metadata.Release
agent := konnect.NewNodeAgent(
hostname, version,
c.Konnect.RefreshNodePeriod,
hasTranslationFailureChan,
sendConfigErrorChan,
setupLog, konnectClient,
Expand Down

0 comments on commit 2850a4e

Please sign in to comment.