Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clustermesh: allow waiting for the CiliumClusterConfig to appear when required #25671

Merged
merged 4 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion clustermesh-apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ func startServer(startCtx hive.HookContext, clientset k8sClient.Clientset, servi
ID: cfg.clusterID,
}

if err := clustermesh.SetClusterConfig(cfg.clusterName, &config, kvstore.Client()); err != nil {
if err := clustermesh.SetClusterConfig(context.Background(), cfg.clusterName, &config, kvstore.Client()); err != nil {
log.WithError(err).Fatal("Unable to set local cluster config on kvstore")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ spec:
- |
rm -rf /var/run/etcd/*;
/usr/local/bin/etcd --data-dir=/var/run/etcd --name=clustermesh-apiserver --listen-client-urls=http://127.0.0.1:2379 --advertise-client-urls=http://127.0.0.1:2379 --initial-cluster-token=clustermesh-apiserver --initial-cluster-state=new --auto-compaction-retention=1 &

# The following key needs to be created before the cilium agents
# have the possibility of connecting to etcd.
etcdctl put cilium/.has-cluster-config true

etcdctl user add root --no-password;
etcdctl user grant-role root root;
etcdctl user add admin-{{ .Values.cluster.name }} --no-password;
Expand Down
18 changes: 14 additions & 4 deletions pkg/clustermesh/clustermesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@ type Configuration struct {
ClusterSizeDependantInterval kvstore.ClusterSizeDependantIntervalFunc `optional:"true"`
}

func SetClusterConfig(clusterName string, config *cmtypes.CiliumClusterConfig, backend kvstore.BackendOperations) error {
func SetClusterConfig(ctx context.Context, clusterName string, config *cmtypes.CiliumClusterConfig, backend kvstore.BackendOperations) error {
key := path.Join(kvstore.ClusterConfigPrefix, clusterName)

val, err := json.Marshal(config)
if err != nil {
return err
}

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()

_, err = backend.UpdateIfDifferent(ctx, key, val, true)
Expand All @@ -85,10 +85,10 @@ func SetClusterConfig(clusterName string, config *cmtypes.CiliumClusterConfig, b
return nil
}

func GetClusterConfig(clusterName string, backend kvstore.BackendOperations) (*cmtypes.CiliumClusterConfig, error) {
func GetClusterConfig(ctx context.Context, clusterName string, backend kvstore.BackendOperations) (*cmtypes.CiliumClusterConfig, error) {
var config cmtypes.CiliumClusterConfig

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()

val, err := backend.Get(ctx, path.Join(kvstore.ClusterConfigPrefix, clusterName))
Expand All @@ -108,6 +108,16 @@ func GetClusterConfig(clusterName string, backend kvstore.BackendOperations) (*c
return &config, nil
}

// IsClusterConfigRequired reports whether the remote kvstore advertises, through
// the presence of the kvstore.HasClusterConfigPath key, that the cilium cluster
// config will be eventually created. The lookup is bounded by a one minute
// timeout derived from ctx; any error from the backend is returned as is.
func IsClusterConfigRequired(ctx context.Context, backend kvstore.BackendOperations) (bool, error) {
	lookupCtx, stop := context.WithTimeout(ctx, time.Minute)
	defer stop()

	// Only the existence of the key matters, its value is never inspected.
	marker, err := backend.Get(lookupCtx, kvstore.HasClusterConfigPath)
	required := marker != nil
	return required, err
}

// RemoteIdentityWatcher is any type which provides identities that have been
// allocated on a remote cluster.
type RemoteIdentityWatcher interface {
Expand Down
11 changes: 6 additions & 5 deletions pkg/clustermesh/clustermesh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,11 @@ func (o *testObserver) OnDelete(k store.NamedKey) {
}

func (s *ClusterMeshTestSuite) TestClusterMesh(c *C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

kvstore.SetupDummy("etcd")
defer kvstore.Client().Close(context.TODO())
defer kvstore.Client().Close(ctx)

identity.InitWellKnownIdentities(&fakeConfig.Config{})
// The nils are only used by k8s CRD identities. We default to kvstore.
Expand All @@ -118,7 +121,7 @@ func (s *ClusterMeshTestSuite) TestClusterMesh(c *C) {
ID: uint32(i),
}

err = SetClusterConfig(name, &config, kvstore.Client())
err = SetClusterConfig(ctx, name, &config, kvstore.Client())
c.Assert(err, IsNil)
}

Expand All @@ -134,8 +137,6 @@ func (s *ClusterMeshTestSuite) TestClusterMesh(c *C) {
err = os.WriteFile(config3, etcdConfig, 0644)
c.Assert(err, IsNil)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ipc := ipcache.NewIPCache(&ipcache.Configuration{
Context: ctx,
})
Expand Down Expand Up @@ -163,7 +164,7 @@ func (s *ClusterMeshTestSuite) TestClusterMesh(c *C) {
for _, rc := range cm.clusters {
rc.mutex.RLock()
for _, name := range nodeNames {
err = rc.remoteNodes.UpdateLocalKeySync(context.TODO(), &testNode{Name: name, Cluster: rc.name})
err = rc.remoteNodes.UpdateLocalKeySync(ctx, &testNode{Name: name, Cluster: rc.name})
c.Assert(err, IsNil)
}
rc.mutex.RUnlock()
Expand Down
61 changes: 60 additions & 1 deletion pkg/clustermesh/remote_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package clustermesh

import (
"context"
"errors"
"fmt"
"path"
"time"
Expand All @@ -14,6 +15,7 @@ import (

"github.com/cilium/cilium/api/v1/models"
"github.com/cilium/cilium/pkg/allocator"
"github.com/cilium/cilium/pkg/clustermesh/types"
cmtypes "github.com/cilium/cilium/pkg/clustermesh/types"
"github.com/cilium/cilium/pkg/controller"
"github.com/cilium/cilium/pkg/defaults"
Expand Down Expand Up @@ -185,7 +187,7 @@ func (rc *remoteCluster) restartRemoteConnection(allocator RemoteIdentityWatcher

rc.getLogger().Info("Connection to remote cluster established")

config, err := GetClusterConfig(rc.name, backend)
config, err := rc.getClusterConfig(ctx, backend, false)
if err == nil && config == nil {
rc.getLogger().Warning("Remote cluster doesn't have cluster configuration, falling back to the old behavior. This is expected when connecting to the old cluster running Cilium without cluster configuration feature.")
} else if err == nil {
Expand Down Expand Up @@ -274,6 +276,63 @@ func (rc *remoteCluster) restartRemoteConnection(allocator RemoteIdentityWatcher
)
}

// getClusterConfig retrieves the configuration of the remote cluster from the
// given kvstore backend, retrying through a dedicated controller until either
// it succeeds or a three minutes timeout (derived from ctx) expires.
//
// Unless forceRequired is true, it first asks IsClusterConfigRequired whether
// the remote kvstore states that the configuration will be eventually created.
// When the configuration is required, a missing configuration is treated as a
// retriable error; otherwise, a nil configuration is returned with a nil error.
//
// An error is returned if the requirement check fails, or if the context
// expires before the configuration could be retrieved; in the latter case the
// returned error wraps the context error.
func (rc *remoteCluster) getClusterConfig(ctx context.Context, backend kvstore.BackendOperations, forceRequired bool) (*cmtypes.CiliumClusterConfig, error) {
	var (
		err                           error
		requireConfig                 = forceRequired
		clusterConfigRetrievalTimeout = 3 * time.Minute
	)

	ctx, cancel := context.WithTimeout(ctx, clusterConfigRetrievalTimeout)
	defer cancel()

	if !requireConfig {
		// Let's check whether the kvstore states that the cluster configuration should be always present.
		requireConfig, err = IsClusterConfigRequired(ctx, backend)
		if err != nil {
			return nil, fmt.Errorf("failed to detect whether the cluster configuration is required: %w", err)
		}
	}

	// The channel is closed only after the controller has been removed (defers
	// run in reverse order), hence no send can ever hit a closed channel.
	cfgch := make(chan *types.CiliumClusterConfig)
	defer close(cfgch)

	// We retry here, rather than simply returning an error and relying on the
	// backoff period of the external controller, to avoid recreating a new
	// connection to the remote kvstore every time, which would introduce an
	// unnecessary overhead. Still, we do return in case of consecutive failures,
	// to ensure that we do not retry forever if something strange happened.
	ctrlname := rc.remoteConnectionControllerName + "-cluster-config"
	defer rc.controllers.RemoveControllerAndWait(ctrlname)
	rc.controllers.UpdateController(ctrlname, controller.ControllerParams{
		DoFunc: func(ctrlCtx context.Context) error {
			rc.getLogger().Debug("Retrieving cluster configuration from remote kvstore")
			config, err := GetClusterConfig(ctrlCtx, rc.name, backend)
			if err != nil {
				return err
			}

			if config == nil && requireConfig {
				return errors.New("cluster configuration expected to be present but not found")
			}

			// We should stop retrying in case we either successfully retrieved the cluster
			// configuration, or we are not required to wait for it. The send must be
			// abortable: if the caller already gave up because the context expired, nobody
			// is receiving anymore, and blocking here would deadlock the deferred
			// RemoveControllerAndWait.
			select {
			case cfgch <- config:
				return nil
			case <-ctrlCtx.Done():
				return ctrlCtx.Err()
			case <-ctx.Done():
				return ctx.Err()
			}
		},
		Context:          ctx,
		MaxRetryInterval: 30 * time.Second,
	})

	// Wait until either the configuration is retrieved, or the context expires
	select {
	case config := <-cfgch:
		return config, nil
	case <-ctx.Done():
		return nil, fmt.Errorf("failed to retrieve cluster configuration: %w", ctx.Err())
	}
}

func (rc *remoteCluster) makeEtcdOpts() map[string]string {
opts := map[string]string{
kvstore.EtcdOptionConfig: rc.configPath,
Expand Down
7 changes: 4 additions & 3 deletions pkg/clustermesh/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ func (s *ClusterMeshServicesTestSuite) SetUpSuite(c *C) {
}

func (s *ClusterMeshServicesTestSuite) SetUpTest(c *C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

kvstore.SetupDummy("etcd")

s.randomName = rand.RandomString()
Expand All @@ -77,7 +80,7 @@ func (s *ClusterMeshServicesTestSuite) SetUpTest(c *C) {
config := cmtypes.CiliumClusterConfig{
ID: uint32(i),
}
err := SetClusterConfig(cluster, &config, kvstore.Client())
err := SetClusterConfig(ctx, cluster, &config, kvstore.Client())
c.Assert(err, IsNil)
}

Expand All @@ -89,8 +92,6 @@ func (s *ClusterMeshServicesTestSuite) SetUpTest(c *C) {
err = os.WriteFile(config2, etcdConfig, 0644)
c.Assert(err, IsNil)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ipc := ipcache.NewIPCache(&ipcache.Configuration{
Context: ctx,
})
Expand Down
9 changes: 9 additions & 0 deletions pkg/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ const (
// the heartbeat
HeartbeatPath = BaseKeyPrefix + "/.heartbeat"

// HasClusterConfigPath is the path to the key used to convey that the cluster
// configuration will be eventually created, and remote cilium agents shall
// wait until it is present. If this key is not set, the cilium configuration
// might, or might not, be configured, but the agents will continue regardless,
// falling back to the backward compatible behavior. It must be set before
// the agents have the possibility to connect to the kvstore (that is, when
// it is not yet exposed). The corresponding value is ignored.
HasClusterConfigPath = BaseKeyPrefix + "/.has-cluster-config"

// ClusterConfigPrefix is the kvstore prefix to cluster configuration
ClusterConfigPrefix = BaseKeyPrefix + "/cluster-config"

Expand Down