Skip to content

Commit

Permalink
Merge pull request #14694 from hashicorp/peering/mgw-addrs
Browse files Browse the repository at this point in the history
  • Loading branch information
freddygv authored Oct 3, 2022
2 parents de04344 + a8c4d6b commit 74b87d4
Show file tree
Hide file tree
Showing 10 changed files with 349 additions and 80 deletions.
35 changes: 33 additions & 2 deletions agent/consul/peering_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import (

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/acl/resolver"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/grpc-external/services/peerstream"
"github.com/hashicorp/consul/agent/rpc/peering"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/proto/pbpeering"
)

Expand Down Expand Up @@ -57,9 +59,38 @@ func (b *PeeringBackend) GetAgentCACertificates() ([]string, error) {
return b.srv.tlsConfigurator.GRPCManualCAPems(), nil
}

// GetServerAddresses looks up server node addresses from the state store.
// GetServerAddresses looks up server or mesh gateway addresses from the state store.
func (b *PeeringBackend) GetServerAddresses() ([]string, error) {
state := b.srv.fsm.State()
_, rawEntry, err := b.srv.fsm.State().ConfigEntry(nil, structs.MeshConfig, structs.MeshConfigMesh, acl.DefaultEnterpriseMeta())
if err != nil {
return nil, fmt.Errorf("failed to read mesh config entry: %w", err)
}

meshConfig, ok := rawEntry.(*structs.MeshConfigEntry)
if ok && meshConfig.Peering != nil && meshConfig.Peering.PeerThroughMeshGateways {
return meshGatewayAdresses(b.srv.fsm.State())
}
return serverAddresses(b.srv.fsm.State())
}

func meshGatewayAdresses(state *state.Store) ([]string, error) {
_, nodes, err := state.ServiceDump(nil, structs.ServiceKindMeshGateway, true, acl.DefaultEnterpriseMeta(), structs.DefaultPeerKeyword)
if err != nil {
return nil, fmt.Errorf("failed to dump gateway addresses: %w", err)
}

var addrs []string
for _, node := range nodes {
_, addr, port := node.BestAddress(true)
addrs = append(addrs, ipaddr.FormatAddressPort(addr, port))
}
if len(addrs) == 0 {
return nil, fmt.Errorf("servers are configured to PeerThroughMeshGateways, but no mesh gateway instances are registered")
}
return addrs, nil
}

func serverAddresses(state *state.Store) ([]string, error) {
_, nodes, err := state.ServiceNodes(nil, "consul", structs.DefaultEnterpriseMetaInDefaultPartition(), structs.DefaultPeerKeyword)
if err != nil {
return nil, err
Expand Down
89 changes: 87 additions & 2 deletions agent/consul/peering_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package consul

import (
"context"
"fmt"
"net"
"testing"
"time"

"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/sdk/freeport"
"github.com/hashicorp/consul/types"
"github.com/stretchr/testify/require"
gogrpc "google.golang.org/grpc"

Expand All @@ -17,7 +21,9 @@ import (
)

func TestPeeringBackend_ForwardToLeader(t *testing.T) {
t.Parallel()
if testing.Short() {
t.Skip("too slow for testing.Short")
}

_, conf1 := testServerConfig(t)
server1, err := newServer(t, conf1)
Expand Down Expand Up @@ -60,6 +66,83 @@ func TestPeeringBackend_ForwardToLeader(t *testing.T) {
})
}

func TestPeeringBackend_GetServerAddresses(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}

_, cfg := testServerConfig(t)
cfg.GRPCTLSPort = freeport.GetOne(t)

srv, err := newServer(t, cfg)
require.NoError(t, err)
testrpc.WaitForLeader(t, srv.RPC, "dc1")

backend := NewPeeringBackend(srv)

testutil.RunStep(t, "peer to servers", func(t *testing.T) {
addrs, err := backend.GetServerAddresses()
require.NoError(t, err)

expect := fmt.Sprintf("127.0.0.1:%d", srv.config.GRPCTLSPort)
require.Equal(t, []string{expect}, addrs)
})

testutil.RunStep(t, "existence of mesh config entry is not enough to peer through gateways", func(t *testing.T) {
mesh := structs.MeshConfigEntry{
// Enable unrelated config.
TransparentProxy: structs.TransparentProxyMeshConfig{
MeshDestinationsOnly: true,
},
}

require.NoError(t, srv.fsm.State().EnsureConfigEntry(1, &mesh))
addrs, err := backend.GetServerAddresses()
require.NoError(t, err)

// Still expect server address because PeerThroughMeshGateways was not enabled.
expect := fmt.Sprintf("127.0.0.1:%d", srv.config.GRPCTLSPort)
require.Equal(t, []string{expect}, addrs)
})

testutil.RunStep(t, "cannot peer through gateways without registered gateways", func(t *testing.T) {
mesh := structs.MeshConfigEntry{
Peering: &structs.PeeringMeshConfig{PeerThroughMeshGateways: true},
}
require.NoError(t, srv.fsm.State().EnsureConfigEntry(1, &mesh))

addrs, err := backend.GetServerAddresses()
require.Nil(t, addrs)
testutil.RequireErrorContains(t, err,
"servers are configured to PeerThroughMeshGateways, but no mesh gateway instances are registered")
})

testutil.RunStep(t, "peer through mesh gateways", func(t *testing.T) {
reg := structs.RegisterRequest{
ID: types.NodeID("b5489ca9-f5e9-4dba-a779-61fec4e8e364"),
Node: "gw-node",
Address: "1.2.3.4",
TaggedAddresses: map[string]string{
structs.TaggedAddressWAN: "172.217.22.14",
},
Service: &structs.NodeService{
ID: "mesh-gateway",
Service: "mesh-gateway",
Kind: structs.ServiceKindMeshGateway,
Port: 443,
TaggedAddresses: map[string]structs.ServiceAddress{
structs.TaggedAddressWAN: {Address: "154.238.12.252", Port: 8443},
},
},
}
require.NoError(t, srv.fsm.State().EnsureRegistration(2, &reg))

addrs, err := backend.GetServerAddresses()
require.NoError(t, err)
require.Equal(t, []string{"154.238.12.252:8443"}, addrs)
})
}

func newServerDialer(serverAddr string) func(context.Context, string) (net.Conn, error) {
return func(ctx context.Context, addr string) (net.Conn, error) {
d := net.Dialer{}
Expand All @@ -79,7 +162,9 @@ func newServerDialer(serverAddr string) func(context.Context, string) (net.Conn,
}

func TestPeerStreamService_ForwardToLeader(t *testing.T) {
t.Parallel()
if testing.Short() {
t.Skip("too slow for testing.Short")
}

_, conf1 := testServerConfig(t)
server1, err := newServer(t, conf1)
Expand Down
1 change: 0 additions & 1 deletion agent/consul/servercert/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ func (m *CertManager) watchServerToken(ctx context.Context) {

// Cancel existing the leaf cert watch and spin up new one any time the server token changes.
// The watch needs the current token as set by the leader since certificate signing requests go to the leader.
fmt.Println("canceling and resetting")
cancel()
notifyCtx, cancel = context.WithCancel(ctx)

Expand Down
1 change: 1 addition & 0 deletions agent/grpc-external/services/peerstream/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,5 +123,6 @@ type StateStore interface {
CAConfig(ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error)
TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
ServiceList(ws memdb.WatchSet, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.ServiceList, error)
ConfigEntry(ws memdb.WatchSet, kind, name string, entMeta *acl.EnterpriseMeta) (uint64, structs.ConfigEntry, error)
AbandonCh() <-chan struct{}
}
3 changes: 0 additions & 3 deletions agent/grpc-external/services/peerstream/stream_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,9 +640,6 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
continue
}

case strings.HasPrefix(update.CorrelationID, subMeshGateway):
// TODO(Peering): figure out how to sync this separately

case update.CorrelationID == subCARoot:
resp, err = makeCARootsResponse(update)
if err != nil {
Expand Down
24 changes: 12 additions & 12 deletions agent/grpc-external/services/peerstream/subscription_blocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,25 +98,25 @@ func (m *subscriptionManager) syncViaBlockingQuery(
ws.Add(store.AbandonCh())
ws.Add(ctx.Done())

if result, err := queryFn(ctx, store, ws); err != nil {
if result, err := queryFn(ctx, store, ws); err != nil && ctx.Err() == nil {
logger.Error("failed to sync from query", "error", err)

} else {
// Block for any changes to the state store.
updateCh <- cache.UpdateEvent{
CorrelationID: correlationID,
Result: result,
select {
case <-ctx.Done():
return
case updateCh <- cache.UpdateEvent{CorrelationID: correlationID, Result: result}:
}
ws.WatchCtx(ctx)
}

if err := waiter.Wait(ctx); err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
logger.Error("failed to wait before re-trying sync", "error", err)
// Block for any changes to the state store.
ws.WatchCtx(ctx)
}

select {
case <-ctx.Done():
err := waiter.Wait(ctx)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
default:
} else if err != nil {
logger.Error("failed to wait before re-trying sync", "error", err)
}
}
}
Loading

0 comments on commit 74b87d4

Please sign in to comment.