Skip to content

Commit

Permalink
grpc: rename public/private directories to external/internal (#13721)
Browse files Browse the repository at this point in the history
Previously, public referred to gRPC services that are both exposed on
the dedicated gRPC port and have their definitions in the proto-public
directory (so were considered usable by 3rd parties). Whereas private
referred to services on the multiplexed server port that are only usable
by agents and other servers.

Now, we're splitting these definitions, such that external/internal
refers to the port and public/private refers to whether they can be used
by 3rd parties.

This is necessary because the peering replication API needs to be
exposed on the dedicated port, but is not (yet) suitable for use by 3rd
parties.
  • Loading branch information
boxofrad committed Jul 13, 2022
1 parent 30fffd0 commit b9e525d
Show file tree
Hide file tree
Showing 91 changed files with 267 additions and 242 deletions.
20 changes: 10 additions & 10 deletions agent/agent.go
Expand Up @@ -38,7 +38,7 @@ import (
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/dns"
publicgrpc "github.com/hashicorp/consul/agent/grpc/public"
external "github.com/hashicorp/consul/agent/grpc-external"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/proxycfg"
proxycfgglue "github.com/hashicorp/consul/agent/proxycfg-glue"
Expand Down Expand Up @@ -213,9 +213,9 @@ type Agent struct {
// depending on the configuration
delegate delegate

// publicGRPCServer is the gRPC server exposed on the dedicated gRPC port (as
// externalGRPCServer is the gRPC server exposed on the dedicated gRPC port (as
// opposed to the multiplexed "server" port).
publicGRPCServer *grpc.Server
externalGRPCServer *grpc.Server

// state stores a local representation of the node,
// services and checks. Used for anti-entropy.
Expand Down Expand Up @@ -539,7 +539,7 @@ func (a *Agent) Start(ctx context.Context) error {

// This needs to happen after the initial auto-config is loaded, because TLS
// can only be configured on the gRPC server at the point of creation.
a.buildPublicGRPCServer()
a.buildExternalGRPCServer()

if err := a.startLicenseManager(ctx); err != nil {
return err
Expand Down Expand Up @@ -578,7 +578,7 @@ func (a *Agent) Start(ctx context.Context) error {

// Setup either the client or the server.
if c.ServerMode {
server, err := consul.NewServer(consulCfg, a.baseDeps.Deps, a.publicGRPCServer)
server, err := consul.NewServer(consulCfg, a.baseDeps.Deps, a.externalGRPCServer)
if err != nil {
return fmt.Errorf("Failed to start Consul server: %v", err)
}
Expand Down Expand Up @@ -760,13 +760,13 @@ func (a *Agent) Failed() <-chan struct{} {
return a.apiServers.failed
}

func (a *Agent) buildPublicGRPCServer() {
func (a *Agent) buildExternalGRPCServer() {
// TLS is only enabled on the gRPC server if there's an HTTPS port configured.
var tls *tlsutil.Configurator
if a.config.HTTPSPort > 0 {
tls = a.tlsConfigurator
}
a.publicGRPCServer = publicgrpc.NewServer(a.logger.Named("grpc.public"), tls)
a.externalGRPCServer = external.NewServer(a.logger.Named("grpc.external"), tls)
}

func (a *Agent) listenAndServeGRPC() error {
Expand Down Expand Up @@ -803,7 +803,7 @@ func (a *Agent) listenAndServeGRPC() error {
},
a,
)
a.xdsServer.Register(a.publicGRPCServer)
a.xdsServer.Register(a.externalGRPCServer)

ln, err := a.startListeners(a.config.GRPCAddrs)
if err != nil {
Expand All @@ -816,7 +816,7 @@ func (a *Agent) listenAndServeGRPC() error {
"address", innerL.Addr().String(),
"network", innerL.Addr().Network(),
)
err := a.publicGRPCServer.Serve(innerL)
err := a.externalGRPCServer.Serve(innerL)
if err != nil {
a.logger.Error("gRPC server failed", "error", err)
}
Expand Down Expand Up @@ -1494,7 +1494,7 @@ func (a *Agent) ShutdownAgent() error {
}

// Stop gRPC
a.publicGRPCServer.Stop()
a.externalGRPCServer.Stop()

// Stop the proxy config manager
if a.proxyConfig != nil {
Expand Down
4 changes: 2 additions & 2 deletions agent/consul/client_test.go
Expand Up @@ -18,8 +18,8 @@ import (
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"

"github.com/hashicorp/consul/agent/consul/stream"
grpc "github.com/hashicorp/consul/agent/grpc/private"
"github.com/hashicorp/consul/agent/grpc/private/resolver"
grpc "github.com/hashicorp/consul/agent/grpc-internal"
"github.com/hashicorp/consul/agent/grpc-internal/resolver"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/rpc/middleware"
Expand Down
8 changes: 4 additions & 4 deletions agent/consul/grpc_integration_test.go
Expand Up @@ -9,7 +9,7 @@ import (

"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/authmethod/testauth"
"github.com/hashicorp/consul/agent/grpc/public"
external "github.com/hashicorp/consul/agent/grpc-external"
"github.com/hashicorp/consul/agent/structs"
tokenStore "github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/proto-public/pbacl"
Expand All @@ -26,7 +26,7 @@ func TestGRPCIntegration_ConnectCA_Sign(t *testing.T) {
// correctly wiring everything up in the server by:
//
// * Starting a cluster with multiple servers.
// * Making a request to a follower's public gRPC port.
// * Making a request to a follower's external gRPC port.
// * Ensuring that the request is correctly forwarded to the leader.
// * Ensuring we get a valid certificate back (so it went through the CAManager).
server1, conn1, _ := testGRPCIntegrationServer(t, func(c *Config) {
Expand Down Expand Up @@ -59,7 +59,7 @@ func TestGRPCIntegration_ConnectCA_Sign(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel)

ctx = public.ContextWithToken(ctx, TestDefaultInitialManagementToken)
ctx = external.ContextWithToken(ctx, TestDefaultInitialManagementToken)

// This would fail if it wasn't forwarded to the leader.
rsp, err := client.Sign(ctx, &pbconnectca.SignRequest{
Expand Down Expand Up @@ -96,7 +96,7 @@ func TestGRPCIntegration_ServerDiscovery_WatchServers(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel)

ctx = public.ContextWithToken(ctx, TestDefaultInitialManagementToken)
ctx = external.ContextWithToken(ctx, TestDefaultInitialManagementToken)

serverStream, err := client.WatchServers(ctx, &pbserverdiscovery.WatchServersRequest{Wan: false})
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/leader_peering.go
Expand Up @@ -17,7 +17,7 @@ import (

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/grpc/public/services/peerstream"
"github.com/hashicorp/consul/agent/grpc-external/services/peerstream"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/logging"
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/peering_backend.go
Expand Up @@ -8,7 +8,7 @@ import (
"sync"

"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/grpc/public/services/peerstream"
"github.com/hashicorp/consul/agent/grpc-external/services/peerstream"
"github.com/hashicorp/consul/agent/rpc/peering"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbpeering"
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/rpc_test.go
Expand Up @@ -32,7 +32,7 @@ import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/state"
agent_grpc "github.com/hashicorp/consul/agent/grpc/private"
agent_grpc "github.com/hashicorp/consul/agent/grpc-internal"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/structs"
tokenStore "github.com/hashicorp/consul/agent/token"
Expand Down
65 changes: 33 additions & 32 deletions agent/consul/server.go
Expand Up @@ -17,7 +17,6 @@ import (
"time"

"github.com/armon/go-metrics"
"github.com/hashicorp/consul-net-rpc/net/rpc"
connlimit "github.com/hashicorp/go-connlimit"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
Expand All @@ -30,6 +29,8 @@ import (
"golang.org/x/time/rate"
"google.golang.org/grpc"

"github.com/hashicorp/consul-net-rpc/net/rpc"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/authmethod"
"github.com/hashicorp/consul/agent/consul/authmethod/ssoauth"
Expand All @@ -38,13 +39,13 @@ import (
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/consul/usagemetrics"
"github.com/hashicorp/consul/agent/consul/wanfed"
agentgrpc "github.com/hashicorp/consul/agent/grpc/private"
"github.com/hashicorp/consul/agent/grpc/private/services/subscribe"
aclgrpc "github.com/hashicorp/consul/agent/grpc/public/services/acl"
"github.com/hashicorp/consul/agent/grpc/public/services/connectca"
"github.com/hashicorp/consul/agent/grpc/public/services/dataplane"
"github.com/hashicorp/consul/agent/grpc/public/services/peerstream"
"github.com/hashicorp/consul/agent/grpc/public/services/serverdiscovery"
aclgrpc "github.com/hashicorp/consul/agent/grpc-external/services/acl"
"github.com/hashicorp/consul/agent/grpc-external/services/connectca"
"github.com/hashicorp/consul/agent/grpc-external/services/dataplane"
"github.com/hashicorp/consul/agent/grpc-external/services/peerstream"
"github.com/hashicorp/consul/agent/grpc-external/services/serverdiscovery"
agentgrpc "github.com/hashicorp/consul/agent/grpc-internal"
"github.com/hashicorp/consul/agent/grpc-internal/services/subscribe"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router"
Expand Down Expand Up @@ -241,19 +242,19 @@ type Server struct {
// is only ever closed.
leaveCh chan struct{}

// publicACLServer serves the ACL service exposed on the public gRPC port.
// It is also exposed on the private multiplexed "server" port to enable
// externalACLServer serves the ACL service exposed on the external gRPC port.
// It is also exposed on the internal multiplexed "server" port to enable
// RPC forwarding.
publicACLServer *aclgrpc.Server
externalACLServer *aclgrpc.Server

// publicConnectCAServer serves the Connect CA service exposed on the public
// gRPC port. It is also exposed on the private multiplexed "server" port to
// externalConnectCAServer serves the Connect CA service exposed on the external
// gRPC port. It is also exposed on the internal multiplexed "server" port to
// enable RPC forwarding.
publicConnectCAServer *connectca.Server
externalConnectCAServer *connectca.Server

// publicGRPCServer is the gRPC server exposed on the dedicated gRPC port, as
// externalGRPCServer is the gRPC server exposed on the dedicated gRPC port, as
// opposed to the multiplexed "server" port which is served by grpcHandler.
publicGRPCServer *grpc.Server
externalGRPCServer *grpc.Server

// router is used to map out Consul servers in the WAN and in Consul
// Enterprise user-defined areas.
Expand Down Expand Up @@ -363,7 +364,7 @@ type Server struct {
// this into the Deps struct and created it much earlier on.
publisher *stream.EventPublisher

// peeringBackend is shared between the public and private gRPC services for peering
// peeringBackend is shared between the external and internal gRPC services for peering
peeringBackend *PeeringBackend

// peerStreamServer is a server used to handle peering streams
Expand All @@ -383,7 +384,7 @@ type connHandler interface {

// NewServer is used to construct a new Consul server from the configuration
// and extra options, potentially returning an error.
func NewServer(config *Config, flat Deps, publicGRPCServer *grpc.Server) (*Server, error) {
func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server) (*Server, error) {
logger := flat.Logger
if err := config.CheckProtocolVersion(); err != nil {
return nil, err
Expand Down Expand Up @@ -429,7 +430,7 @@ func NewServer(config *Config, flat Deps, publicGRPCServer *grpc.Server) (*Serve
reconcileCh: make(chan serf.Member, reconcileChSize),
router: flat.Router,
tlsConfigurator: flat.TLSConfigurator,
publicGRPCServer: publicGRPCServer,
externalGRPCServer: externalGRPCServer,
reassertLeaderCh: make(chan chan error),
sessionTimers: NewSessionTimers(),
tombstoneGC: gc,
Expand Down Expand Up @@ -676,8 +677,8 @@ func NewServer(config *Config, flat Deps, publicGRPCServer *grpc.Server) (*Serve
s.overviewManager = NewOverviewManager(s.logger, s.fsm, s.config.MetricsReportingInterval)
go s.overviewManager.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})

// Initialize public gRPC server - register services on public gRPC server.
s.publicACLServer = aclgrpc.NewServer(aclgrpc.Config{
// Initialize external gRPC server - register services on external gRPC server.
s.externalACLServer = aclgrpc.NewServer(aclgrpc.Config{
ACLsEnabled: s.config.ACLsEnabled,
ForwardRPC: func(info structs.RPCInfo, fn func(*grpc.ClientConn) error) (bool, error) {
return s.ForwardGRPC(s.grpcConnPool, info, fn)
Expand All @@ -693,9 +694,9 @@ func NewServer(config *Config, flat Deps, publicGRPCServer *grpc.Server) (*Serve
PrimaryDatacenter: s.config.PrimaryDatacenter,
ValidateEnterpriseRequest: s.validateEnterpriseRequest,
})
s.publicACLServer.Register(s.publicGRPCServer)
s.externalACLServer.Register(s.externalGRPCServer)

s.publicConnectCAServer = connectca.NewServer(connectca.Config{
s.externalConnectCAServer = connectca.NewServer(connectca.Config{
Publisher: s.publisher,
GetStore: func() connectca.StateStore { return s.FSM().State() },
Logger: logger.Named("grpc-api.connect-ca"),
Expand All @@ -706,20 +707,20 @@ func NewServer(config *Config, flat Deps, publicGRPCServer *grpc.Server) (*Serve
},
ConnectEnabled: s.config.ConnectEnabled,
})
s.publicConnectCAServer.Register(s.publicGRPCServer)
s.externalConnectCAServer.Register(s.externalGRPCServer)

dataplane.NewServer(dataplane.Config{
GetStore: func() dataplane.StateStore { return s.FSM().State() },
Logger: logger.Named("grpc-api.dataplane"),
ACLResolver: s.ACLResolver,
Datacenter: s.config.Datacenter,
}).Register(s.publicGRPCServer)
}).Register(s.externalGRPCServer)

serverdiscovery.NewServer(serverdiscovery.Config{
Publisher: s.publisher,
ACLResolver: s.ACLResolver,
Logger: logger.Named("grpc-api.server-discovery"),
}).Register(s.publicGRPCServer)
}).Register(s.externalGRPCServer)

s.peerStreamTracker = peerstream.NewTracker()
s.peeringBackend = NewPeeringBackend(s)
Expand All @@ -732,11 +733,11 @@ func NewServer(config *Config, flat Deps, publicGRPCServer *grpc.Server) (*Serve
Datacenter: s.config.Datacenter,
ConnectEnabled: s.config.ConnectEnabled,
})
s.peerStreamServer.Register(s.publicGRPCServer)
s.peerStreamServer.Register(s.externalGRPCServer)

// Initialize private gRPC server.
// Initialize internal gRPC server.
//
// Note: some "public" gRPC services are also exposed on the private gRPC server
// Note: some "external" gRPC services are also exposed on the internal gRPC server
// to enable RPC forwarding.
s.grpcHandler = newGRPCHandlerFromConfig(flat, config, s)
s.grpcLeaderForwarder = flat.LeaderForwarder
Expand Down Expand Up @@ -803,10 +804,10 @@ func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler
s.peeringServer.Register(srv)
s.registerEnterpriseGRPCServices(deps, srv)

// Note: these public gRPC services are also exposed on the private server to
// Note: these external gRPC services are also exposed on the internal server to
// enable RPC forwarding.
s.publicACLServer.Register(srv)
s.publicConnectCAServer.Register(srv)
s.externalACLServer.Register(srv)
s.externalConnectCAServer.Register(srv)
}

return agentgrpc.NewHandler(deps.Logger, config.RPCAddr, register)
Expand Down
11 changes: 6 additions & 5 deletions agent/consul/server_test.go
Expand Up @@ -14,7 +14,6 @@ import (

"github.com/armon/go-metrics"
"github.com/google/tcpproxy"
"github.com/hashicorp/consul-net-rpc/net/rpc"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/memberlist"
Expand All @@ -23,6 +22,8 @@ import (
"golang.org/x/time/rate"
"google.golang.org/grpc"

"github.com/hashicorp/consul-net-rpc/net/rpc"

"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/rpc/middleware"
Expand Down Expand Up @@ -241,14 +242,14 @@ func testServerWithConfig(t *testing.T, configOpts ...func(*Config)) (string, *S
if srv.config.GRPCPort > 0 {
// Normally the gRPC server listener is created at the agent level and
// passed down into the Server creation.
publicGRPCAddr := fmt.Sprintf("127.0.0.1:%d", srv.config.GRPCPort)
externalGRPCAddr := fmt.Sprintf("127.0.0.1:%d", srv.config.GRPCPort)

ln, err := net.Listen("tcp", publicGRPCAddr)
ln, err := net.Listen("tcp", externalGRPCAddr)
require.NoError(t, err)
go func() {
_ = srv.publicGRPCServer.Serve(ln)
_ = srv.externalGRPCServer.Serve(ln)
}()
t.Cleanup(srv.publicGRPCServer.Stop)
t.Cleanup(srv.externalGRPCServer.Stop)
}

return dir, srv
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/subscribe_backend.go
Expand Up @@ -5,7 +5,7 @@ import (

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/grpc/private/services/subscribe"
"github.com/hashicorp/consul/agent/grpc-internal/services/subscribe"
"github.com/hashicorp/consul/agent/structs"
)

Expand Down
4 changes: 2 additions & 2 deletions agent/consul/subscribe_backend_test.go
Expand Up @@ -14,8 +14,8 @@ import (
"golang.org/x/sync/errgroup"
gogrpc "google.golang.org/grpc"

grpc "github.com/hashicorp/consul/agent/grpc/private"
"github.com/hashicorp/consul/agent/grpc/private/resolver"
grpc "github.com/hashicorp/consul/agent/grpc-internal"
"github.com/hashicorp/consul/agent/grpc-internal/resolver"
"github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbservice"
Expand Down
@@ -1,4 +1,4 @@
package public
package external

import (
"context"
Expand Down

0 comments on commit b9e525d

Please sign in to comment.