Skip to content

Commit

Permalink
upgrade test: fix flaky peering through mesh gateway
Browse files Browse the repository at this point in the history
  • Loading branch information
huikang committed Feb 15, 2023
1 parent 247211d commit 7e23647
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 29 deletions.
2 changes: 1 addition & 1 deletion test/integration/consul-container/libs/assert/envoy.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func AssertEnvoyMetricAtLeast(t *testing.T, adminPort int, prefix, metric string
err error
)
failer := func() *retry.Timer {
return &retry.Timer{Timeout: 30 * time.Second, Wait: 500 * time.Millisecond}
return &retry.Timer{Timeout: 60 * time.Second, Wait: 500 * time.Millisecond}
}

retry.RunWith(failer(), t, func(r *retry.R) {
Expand Down
7 changes: 7 additions & 0 deletions test/integration/consul-container/libs/service/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,13 @@ func (g ConnectContainer) Start() error {
return g.container.Start(g.ctx)
}

func (g ConnectContainer) Stop() error {
if g.container == nil {
return fmt.Errorf("container has not been initialized")
}
return g.container.Stop(context.Background(), nil)
}

func (g ConnectContainer) Terminate() error {
return cluster.TerminateContainer(g.ctx, g.container, true)
}
Expand Down
7 changes: 7 additions & 0 deletions test/integration/consul-container/libs/service/examples.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ func (g exampleContainer) Start() error {
return g.container.Start(context.Background())
}

func (g exampleContainer) Stop() error {
if g.container == nil {
return fmt.Errorf("container has not been initialized")
}
return g.container.Stop(context.Background(), nil)
}

func (c exampleContainer) Terminate() error {
return cluster.TerminateContainer(c.ctx, c.container, true)
}
Expand Down
7 changes: 7 additions & 0 deletions test/integration/consul-container/libs/service/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@ func (g gatewayContainer) Start() error {
return g.container.Start(context.Background())
}

func (g gatewayContainer) Stop() error {
if g.container == nil {
return fmt.Errorf("container has not been initialized")
}
return g.container.Stop(context.Background(), nil)
}

func (c gatewayContainer) Terminate() error {
return cluster.TerminateContainer(c.ctx, c.container, true)
}
Expand Down
1 change: 1 addition & 0 deletions test/integration/consul-container/libs/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Service interface {
GetName() string
GetServiceName() string
Start() (err error)
Stop() (err error)
Terminate() error
Restart() error
GetStatus() (string, error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type BuiltCluster struct {
func BasicPeeringTwoClustersSetup(
t *testing.T,
consulVersion string,
peeringThroughMeshgateway bool,
) (*BuiltCluster, *BuiltCluster) {
// acceptingCluster, acceptingCtx, acceptingClient := NewPeeringCluster(t, "dc1", 3, consulVersion, true)
acceptingCluster, acceptingCtx, acceptingClient := NewPeeringCluster(t, 3, &libcluster.BuildOptions{
Expand All @@ -53,14 +54,45 @@ func BasicPeeringTwoClustersSetup(
ConsulVersion: consulVersion,
InjectAutoEncryption: true,
})

// Create the mesh gateway for dataplane traffic and peering control plane traffic (if enabled)
acceptingClusterGateway, err := libservice.NewGatewayService(context.Background(), "mesh", "mesh", acceptingCluster.Clients()[0])
require.NoError(t, err)
dialingClusterGateway, err := libservice.NewGatewayService(context.Background(), "mesh", "mesh", dialingCluster.Clients()[0])
require.NoError(t, err)

// Enable peering control plane traffic through mesh gateway
if peeringThroughMeshgateway {
req := &api.MeshConfigEntry{
Peering: &api.PeeringMeshConfig{
PeerThroughMeshGateways: true,
},
}
configCluster := func(cli *api.Client) error {
libassert.CatalogServiceExists(t, cli, "mesh")
ok, _, err := cli.ConfigEntries().Set(req, &api.WriteOptions{})
if !ok {
return fmt.Errorf("config entry is not set")
}

if err != nil {
return fmt.Errorf("error writing config entry: %s", err)
}
return nil
}
err = configCluster(dialingClient)
require.NoError(t, err)
err = configCluster(acceptingClient)
require.NoError(t, err)
}

require.NoError(t, dialingCluster.PeerWithCluster(acceptingClient, AcceptingPeerName, DialingPeerName))

libassert.PeeringStatus(t, acceptingClient, AcceptingPeerName, api.PeeringStateActive)
// libassert.PeeringExports(t, acceptingClient, acceptingPeerName, 1)

// Register an static-server service in acceptingCluster and export to dialing cluster
var serverService, serverSidecarService libservice.Service
var acceptingClusterGateway libservice.Service
{
clientNode := acceptingCluster.Clients()[0]

Expand All @@ -81,15 +113,10 @@ func BasicPeeringTwoClustersSetup(
libassert.CatalogServiceExists(t, acceptingClient, "static-server-sidecar-proxy")

require.NoError(t, serverService.Export("default", AcceptingPeerName, acceptingClient))

// Create the mesh gateway for dataplane traffic
acceptingClusterGateway, err = libservice.NewGatewayService(context.Background(), "mesh", "mesh", clientNode)
require.NoError(t, err)
}

// Register an static-client service in dialing cluster and set upstream to static-server service
var clientSidecarService *libservice.ConnectContainer
var dialingClusterGateway libservice.Service
{
clientNode := dialingCluster.Clients()[0]

Expand All @@ -100,9 +127,6 @@ func BasicPeeringTwoClustersSetup(

libassert.CatalogServiceExists(t, dialingClient, "static-client-sidecar-proxy")

// Create the mesh gateway for dataplane traffic
dialingClusterGateway, err = libservice.NewGatewayService(context.Background(), "mesh", "mesh", clientNode)
require.NoError(t, err)
}

_, adminPort := clientSidecarService.GetAdminAddr()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ import (
func TestPeering_RotateServerAndCAThenFail_(t *testing.T) {
t.Parallel()

accepting, dialing := libtopology.BasicPeeringTwoClustersSetup(t, utils.TargetVersion)
accepting, dialing := libtopology.BasicPeeringTwoClustersSetup(t, utils.TargetVersion, false)
var (
acceptingCluster = accepting.Cluster
dialingCluster = dialing.Cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestPeering_Upgrade_ControlPlane_MGW(t *testing.T) {
}

run := func(t *testing.T, tc testcase) {
accepting, dialing := libtopology.BasicPeeringTwoClustersSetup(t, tc.oldversion)
accepting, dialing := libtopology.BasicPeeringTwoClustersSetup(t, tc.oldversion, true)
var (
acceptingCluster = accepting.Cluster
dialingCluster = dialing.Cluster
Expand All @@ -54,26 +54,16 @@ func TestPeering_Upgrade_ControlPlane_MGW(t *testing.T) {
acceptingClient, err := acceptingCluster.GetClient(nil, false)
require.NoError(t, err)

// Enable peering control plane traffic through mesh gateway
req := &api.MeshConfigEntry{
Peering: &api.PeeringMeshConfig{
PeerThroughMeshGateways: true,
},
}
ok, _, err := dialingClient.ConfigEntries().Set(req, &api.WriteOptions{})
require.True(t, ok)
require.NoError(t, err)
ok, _, err = acceptingClient.ConfigEntries().Set(req, &api.WriteOptions{})
require.True(t, ok)
require.NoError(t, err)

// Verify control plane endpoints and traffic in gateway
_, gatewayAdminPort := dialing.Gateway.GetAdminAddr()
libassert.AssertUpstreamEndpointStatus(t, gatewayAdminPort, "server.dc1.peering", "HEALTHY", 1)
libassert.AssertUpstreamEndpointStatus(t, gatewayAdminPort, "server.dc2.peering", "HEALTHY", 1)
libassert.AssertEnvoyMetricAtLeast(t, gatewayAdminPort,
"cluster.static-server.default.default.accepting-to-dialer.external",
"upstream_cx_total", 1)
libassert.AssertEnvoyMetricAtLeast(t, gatewayAdminPort,
"cluster.server.dc1.peering",
"upstream_cx_total", 1)

// Upgrade the accepting cluster and assert peering is still ACTIVE
require.NoError(t, acceptingCluster.StandardUpgrade(t, context.Background(), tc.targetVersion))
Expand All @@ -90,18 +80,20 @@ func TestPeering_Upgrade_ControlPlane_MGW(t *testing.T) {
// - Register a new static-client service in dialing cluster and
// - set upstream to static-server service in peered cluster

// Restart the gateway & proxy sidecar
// Stop the accepting gateway and restart dialing gateway
// to force peering control plane traffic through dialing mesh gateway
require.NoError(t, accepting.Gateway.Stop())
require.NoError(t, dialing.Gateway.Restart())
require.NoError(t, dialing.Container.Restart())

// Restarted gateway should not have any measurement on data plane traffic
// Restarted dialing gateway should not have any measurement on data plane traffic
libassert.AssertEnvoyMetricAtMost(t, gatewayAdminPort,
"cluster.static-server.default.default.accepting-to-dialer.external",
"upstream_cx_total", 0)
// control plane metrics should be observed
libassert.AssertEnvoyMetricAtLeast(t, gatewayAdminPort,
"cluster.server.dc1.peering",
"upstream_cx_total", 1)
require.NoError(t, accepting.Gateway.Start())

clientSidecarService, err := libservice.CreateAndRegisterStaticClientSidecar(dialingCluster.Servers()[0], libtopology.DialingPeerName, true)
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) {
}

run := func(t *testing.T, tc testcase) {
accepting, dialing := libtopology.BasicPeeringTwoClustersSetup(t, tc.oldversion)
accepting, dialing := libtopology.BasicPeeringTwoClustersSetup(t, tc.oldversion, false)
var (
acceptingCluster = accepting.Cluster
dialingCluster = dialing.Cluster
Expand Down

0 comments on commit 7e23647

Please sign in to comment.