Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clustermesh: ensure that the status of the remote clusters controller is correcty reported #26271

Merged
merged 1 commit into from
Jun 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/clustermesh/internal/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ var _ = Suite(&ClusterMeshTestSuite{})

type fakeRemoteCluster struct{}

func (*fakeRemoteCluster) Run(context.Context, kvstore.BackendOperations, *types.CiliumClusterConfig) error {
return nil
func (*fakeRemoteCluster) Run(_ context.Context, _ kvstore.BackendOperations, _ *types.CiliumClusterConfig, ready chan<- error) {
close(ready)
}
func (*fakeRemoteCluster) Stop() {}
func (*fakeRemoteCluster) Remove() {}
Expand Down
39 changes: 27 additions & 12 deletions pkg/clustermesh/internal/remote_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import (
)

type RemoteCluster interface {
Run(ctx context.Context, backend kvstore.BackendOperations, config *types.CiliumClusterConfig) error
// Run implements the actual business logic once the connection to the remote cluster has been established.
// The ready channel shall be closed when the initialization tasks completed, possibly returning an error.
Run(ctx context.Context, backend kvstore.BackendOperations, config *types.CiliumClusterConfig, ready chan<- error)

Stop()
Remove()
Expand Down Expand Up @@ -58,6 +60,10 @@ type remoteCluster struct {

controllers *controller.Manager

// wg is used to wait for the termination of the goroutines spawned by the
// controller upon reconnection for long running background tasks.
wg sync.WaitGroup

// remoteConnectionControllerName is the name of the backing controller
// that maintains the remote connection
remoteConnectionControllerName string
Expand Down Expand Up @@ -107,6 +113,11 @@ func (rc *remoteCluster) getLogger() *logrus.Entry {

// releaseOldConnection releases the etcd connection to a remote cluster
func (rc *remoteCluster) releaseOldConnection() {
rc.metricReadinessStatus.Set(metrics.BoolToFloat64(false))

// Make sure that all child goroutines terminated before performing cleanup.
rc.wg.Wait()

rc.mutex.Lock()
backend := rc.backend
rc.backend = nil
Expand Down Expand Up @@ -163,28 +174,32 @@ func (rc *remoteCluster) restartRemoteConnection() {
rc.backend = backend
rc.mutex.Unlock()

rc.metricReadinessStatus.Set(metrics.BoolToFloat64(true))
defer rc.metricReadinessStatus.Set(metrics.BoolToFloat64(false))

ctx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup
wg.Add(1)

rc.wg.Add(1)
go func() {
rc.watchdog(ctx, backend)
wg.Done()
rc.wg.Done()
}()

defer func() {
ready := make(chan error)

// Let's execute the long running logic in background. This allows
// to return early from the controller body, so that the statistics
// are updated correctly. Instead, blocking until rc.Run terminates
// would prevent a previous failure from being cleared out.
rc.wg.Add(1)
go func() {
rc.Run(ctx, backend, config, ready)
cancel()
wg.Wait()
rc.wg.Done()
}()

if err := rc.Run(ctx, backend, config); err != nil {
rc.getLogger().WithError(err).Error("Connection to remote cluster failed")
if <-ready != nil {
rc.getLogger().WithError(err).Warning("Connection to remote cluster failed")
return err
}

rc.metricReadinessStatus.Set(metrics.BoolToFloat64(true))
return nil
},
StopFunc: func(ctx context.Context) error {
Expand Down
5 changes: 4 additions & 1 deletion pkg/clustermesh/kvstoremesh/kvstoremesh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,17 @@ func TestRemoteClusterRun(t *testing.T) {

km := KVStoreMesh{backend: kvstore.Client()}
rc := km.newRemoteCluster("foo", nil)
ready := make(chan error)

wg.Add(1)
go func() {
rc.Run(ctx, remoteClient, tt.srccfg)
rc.Run(ctx, remoteClient, tt.srccfg, ready)
rc.Stop()
wg.Done()
}()

require.NoError(t, <-ready, "rc.Run() failed")

// Assert that the cluster config got properly propagated
require.EventuallyWithT(t, func(c *assert.CollectT) {
cfg, err := utils.GetClusterConfig(ctx, "foo", kvstore.Client())
Expand Down
8 changes: 5 additions & 3 deletions pkg/clustermesh/kvstoremesh/remote_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type remoteCluster struct {
wg sync.WaitGroup
}

func (rc *remoteCluster) Run(ctx context.Context, backend kvstore.BackendOperations, srccfg *types.CiliumClusterConfig) error {
func (rc *remoteCluster) Run(ctx context.Context, backend kvstore.BackendOperations, srccfg *types.CiliumClusterConfig, ready chan<- error) {
dstcfg := types.CiliumClusterConfig{
Capabilities: types.CiliumClusterConfigCapabilities{
SyncedCanaries: true,
Expand All @@ -48,7 +48,9 @@ func (rc *remoteCluster) Run(ctx context.Context, backend kvstore.BackendOperati
}

if err := cmutils.SetClusterConfig(ctx, rc.name, &dstcfg, rc.localBackend); err != nil {
return fmt.Errorf("failed to propagate cluster configuration: %w", err)
ready <- fmt.Errorf("failed to propagate cluster configuration: %w", err)
close(ready)
return
}

var capabilities types.CiliumClusterConfigCapabilities
Expand Down Expand Up @@ -94,8 +96,8 @@ func (rc *remoteCluster) Run(ctx context.Context, backend kvstore.BackendOperati
rc.identities.watcher.Watch(ctx, backend, path.Join(adapter(identityCache.IdentitiesPath), suffix))
})

close(ready)
mgr.Run(ctx)
return nil
}

func (rc *remoteCluster) Stop() {
Expand Down
12 changes: 8 additions & 4 deletions pkg/clustermesh/remote_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@ type remoteCluster struct {
swg *lock.StoppableWaitGroup
}

func (rc *remoteCluster) Run(ctx context.Context, backend kvstore.BackendOperations, config *cmtypes.CiliumClusterConfig) error {
func (rc *remoteCluster) Run(ctx context.Context, backend kvstore.BackendOperations, config *cmtypes.CiliumClusterConfig, ready chan<- error) {
if err := rc.mesh.canConnect(rc.name, config); err != nil {
return err
ready <- err
close(ready)
return
}

var capabilities types.CiliumClusterConfigCapabilities
Expand All @@ -72,7 +74,9 @@ func (rc *remoteCluster) Run(ctx context.Context, backend kvstore.BackendOperati

remoteIdentityCache, err := rc.mesh.conf.RemoteIdentityWatcher.WatchRemoteIdentities(rc.name, backend, capabilities.Cached)
if err != nil {
return err
ready <- err
close(ready)
return
}

rc.mutex.Lock()
Expand Down Expand Up @@ -108,8 +112,8 @@ func (rc *remoteCluster) Run(ctx context.Context, backend kvstore.BackendOperati
rc.remoteIdentityCache.Watch(ctx)
})

close(ready)
mgr.Run(ctx)
return nil
}

func (rc *remoteCluster) Stop() {}
Expand Down
5 changes: 4 additions & 1 deletion pkg/clustermesh/remote_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ func TestRemoteClusterRun(t *testing.T) {
globalServices: newGlobalServiceCache("cluster", "node"),
}
rc := cm.newRemoteCluster("foo", nil).(*remoteCluster)
ready := make(chan error)

remoteClient := &remoteEtcdClientWrapper{
BackendOperations: kvstore.Client(),
Expand All @@ -163,10 +164,12 @@ func TestRemoteClusterRun(t *testing.T) {

wg.Add(1)
go func() {
rc.Run(ctx, remoteClient, tt.srccfg)
rc.Run(ctx, remoteClient, tt.srccfg, ready)
wg.Done()
}()

require.NoError(t, <-ready, "rc.Run() failed")

// Assert that we correctly watch nodes
require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.EqualValues(c, 1, rc.remoteNodes.NumEntries())
Expand Down