Skip to content

Commit

Permalink
make multicluster secret controller handle updates
Browse files Browse the repository at this point in the history
The multicluster secret controller was silently ignoring updates to
secrets. Users had to delete and re-add secrets if they wanted to
change the credentials for remote clusters. This PR adds update
support and also improves the unit tests.

fixes istio#18708
  • Loading branch information
ayj committed Mar 5, 2020
1 parent b966cdb commit ef19d83
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 105 deletions.
10 changes: 9 additions & 1 deletion mixer/adapter/kubernetesenv/kubernetesenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,13 @@ func (b *builder) createCacheController(k8sInterface k8s.Interface, clusterID st
return b.kubeHandler.env.Logger().Errorf("error on creating remote controller %s err = %v", clusterID, err)
}

func (b *builder) updateCacheController(k8sInterface k8s.Interface, clusterID string) error {
if err := b.deleteCacheController(clusterID); err != nil {
return err
}
return b.createCacheController(k8sInterface, clusterID)
}

func (b *builder) deleteCacheController(clusterID string) error {
b.Lock()
delete(b.controllers, clusterID)
Expand Down Expand Up @@ -418,7 +425,8 @@ func initMultiClusterSecretController(b *builder, kubeconfig string, env adapter
return fmt.Errorf("could not create K8s client: %v", err)
}

err = secretcontroller.StartSecretController(kubeClient, b.createCacheController, b.deleteCacheController, clusterNs)
err = secretcontroller.StartSecretController(kubeClient, b.createCacheController,
b.updateCacheController, b.deleteCacheController, clusterNs)
if err != nil {
return fmt.Errorf("could not start secret controller: %v", err)
}
Expand Down
8 changes: 8 additions & 0 deletions pilot/pkg/serviceregistry/kube/controller/multicluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func NewMulticluster(kc kubernetes.Interface, secretNamespace string,

err := secretcontroller.StartSecretController(kc,
mc.AddMemberCluster,
mc.UpdateMemberCluster,
mc.DeleteMemberCluster,
secretNamespace)
return mc, err
Expand Down Expand Up @@ -107,6 +108,13 @@ func (m *Multicluster) AddMemberCluster(clientset kubernetes.Interface, clusterI
return nil
}

func (m *Multicluster) UpdateMemberCluster(clientset kubernetes.Interface, clusterID string) error {
if err := m.DeleteMemberCluster(clusterID); err != nil {
return err
}
return m.AddMemberCluster(clientset, clusterID)
}

// DeleteMemberCluster is passed to the secret controller as a callback to be called
// when a remote cluster is deleted. Also must clear the cache so remote resources
// are removed.
Expand Down
117 changes: 87 additions & 30 deletions pkg/kube/secretcontroller/secretcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
package secretcontroller

import (
"bytes"
"crypto/sha256"
"errors"
"fmt"
"reflect"
"time"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -52,6 +56,9 @@ var CreateInterfaceFromClusterConfig = kube.CreateInterfaceFromClusterConfig
// addSecretCallback prototype for the add secret callback function.
type addSecretCallback func(clientset kubernetes.Interface, dataKey string) error

// updateSecretCallback prototype for the update secret callback function.
type updateSecretCallback func(clientset kubernetes.Interface, dataKey string) error

// removeSecretCallback prototype for the remove secret callback function.
type removeSecretCallback func(dataKey string) error

Expand All @@ -63,12 +70,15 @@ type Controller struct {
queue workqueue.RateLimitingInterface
informer cache.SharedIndexInformer
addCallback addSecretCallback
updateCallback updateSecretCallback
removeCallback removeSecretCallback
}

// RemoteCluster defines cluster structZZ
type RemoteCluster struct {
secretName string
secretName string
client kubernetes.Interface
kubeConfigSha [sha256.Size]byte
}

// ClusterStore is a collection of clusters
Expand All @@ -90,6 +100,7 @@ func NewController(
namespace string,
cs *ClusterStore,
addCallback addSecretCallback,
updateCallback updateSecretCallback,
removeCallback removeSecretCallback) *Controller {

secretsInformer := cache.NewSharedIndexInformer(
Expand All @@ -115,6 +126,7 @@ func NewController(
informer: secretsInformer,
queue: queue,
addCallback: addCallback,
updateCallback: updateCallback,
removeCallback: removeCallback,
}

Expand All @@ -127,6 +139,17 @@ func NewController(
queue.Add(key)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
if oldObj == newObj || reflect.DeepEqual(oldObj, newObj) {
return
}

key, err := cache.MetaNamespaceKeyFunc(newObj)
log.Infof("Processing update: %s", key)
if err == nil {
queue.Add(key)
}
},
DeleteFunc: func(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
log.Infof("Processing delete: %s", key)
Expand Down Expand Up @@ -160,11 +183,12 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
// StartSecretController creates the secret controller.
func StartSecretController(k8s kubernetes.Interface,
addCallback addSecretCallback,
updateCallback updateSecretCallback,
removeCallback removeSecretCallback,
namespace string) error {
stopCh := make(chan struct{})
clusterStore := newClustersStore()
controller := NewController(k8s, namespace, clusterStore, addCallback, removeCallback)
controller := NewController(k8s, namespace, clusterStore, addCallback, updateCallback, removeCallback)

go controller.Run(stopCh)

Expand Down Expand Up @@ -215,56 +239,89 @@ func (c *Controller) processItem(secretName string) error {
return nil
}

func createRemoteCluster(kubeConfig []byte, secretName string) (*RemoteCluster, error) {
if len(kubeConfig) == 0 {
return nil, errors.New("kubeconfig is empty")
}

clientConfig, err := LoadKubeConfig(kubeConfig)
if err != nil {
return nil, fmt.Errorf("kubeconfig cannot be loaded: %v", err)
}

if err := ValidateClientConfig(*clientConfig); err != nil {
return nil, fmt.Errorf("kubeconfig is not valid: %v", err)
}

client, err := CreateInterfaceFromClusterConfig(clientConfig)
if err != nil {
return nil, fmt.Errorf("couldn't create client interface: %v", err)
}

return &RemoteCluster{
secretName: secretName,
client: client,
kubeConfigSha: sha256.Sum256(kubeConfig),
}, nil
}

func (c *Controller) addMemberCluster(secretName string, s *corev1.Secret) {
for clusterID, kubeConfig := range s.Data {
// clusterID must be unique even across multiple secrets
if _, ok := c.cs.remoteClusters[clusterID]; !ok {
if len(kubeConfig) == 0 {
log.Infof("Data '%s' in the secret %s in namespace %s is empty, and disregarded ",
clusterID, secretName, s.Namespace)
continue
}
if prev, ok := c.cs.remoteClusters[clusterID]; !ok {
log.Infof("Adding cluster_id=%v from secret=%v", clusterID, secretName)

clientConfig, err := LoadKubeConfig(kubeConfig)
remoteCluster, err := createRemoteCluster(kubeConfig, secretName)
if err != nil {
log.Infof("Data '%s' in the secret %s in namespace %s is not a kubeconfig: %v",
clusterID, secretName, s.Namespace, err)
log.Errorf("Failed to add remote cluster from secret=%v for cluster_id=%v: %v",
secretName, clusterID, err)
continue
}

if err := ValidateClientConfig(*clientConfig); err != nil {
log.Errorf("Data '%s' in the secret %s in namespace %s is not a valid kubeconfig: %v",
clusterID, secretName, s.Namespace, err)
continue
c.cs.remoteClusters[clusterID] = remoteCluster
if err := c.addCallback(remoteCluster.client, clusterID); err != nil {
log.Errorf("Error creating cluster_id=%s from secret %v: %v",
clusterID, secretName, err)
}

log.Infof("Adding new cluster member: %s", clusterID)
c.cs.remoteClusters[clusterID] = &RemoteCluster{}
c.cs.remoteClusters[clusterID].secretName = secretName
client, err := CreateInterfaceFromClusterConfig(clientConfig)
if err != nil {
log.Errorf("error during create of kubernetes client interface for cluster: %s %v", clusterID, err)
} else {
if prev.secretName != secretName {
log.Errorf("ClusterID reused in two different secrets: %v and %v. ClusterID "+
"must be unique across all secrets", prev.secretName, secretName)
continue
}
err = c.addCallback(client, clusterID)
if err != nil {
log.Errorf("error during create of clusterID: %s %v", clusterID, err)

kubeConfigSha := sha256.Sum256(kubeConfig)
if bytes.Equal(kubeConfigSha[:], prev.kubeConfigSha[:]) {
log.Infof("Updating cluster_id=%v from secret=%v: (kubeconfig are identical)", clusterID, secretName)
} else {
log.Infof("Updating cluster %v from secret %v", clusterID, secretName)

remoteCluster, err := createRemoteCluster(kubeConfig, secretName)
if err != nil {
log.Errorf("Error updating cluster_id=%v from secret=%v: %v",
clusterID, secretName, err)
continue
}
c.cs.remoteClusters[clusterID] = remoteCluster
if err := c.updateCallback(remoteCluster.client, clusterID); err != nil {
log.Errorf("Error updating cluster_id from secret=%v: %s %v",
clusterID, secretName, err)
}
}
} else {
log.Infof("Cluster %s in the secret %s in namespace %s already exists",
clusterID, c.cs.remoteClusters[clusterID].secretName, s.Namespace)
}
}

log.Infof("Number of remote clusters: %d", len(c.cs.remoteClusters))
}

func (c *Controller) deleteMemberCluster(secretName string) {
for clusterID, cluster := range c.cs.remoteClusters {
if cluster.secretName == secretName {
log.Infof("Deleting cluster member: %s", clusterID)
log.Infof("Deleting cluster_id=%v configured by secret=%v", clusterID, secretName)
err := c.removeCallback(clusterID)
if err != nil {
log.Errorf("error during cluster delete: %s %v", clusterID, err)
log.Errorf("Error removing cluster_id=%v configured by secret=%v: %v",
clusterID, secretName, err)
}
delete(c.cs.remoteClusters, clusterID)
}
Expand Down
Loading

0 comments on commit ef19d83

Please sign in to comment.