Skip to content

Commit

Permalink
rpc: remove GRPCDial() and disallow anonymous non-gossip connections
Browse files Browse the repository at this point in the history
The previous patch introduced node ID verification for GRPC
connections but preserved the `GRPCDial()` API, alongside
the ability to use node ID 0 with `GRPCDialNode()`, to signal
that node ID verification should be disabled.

Further examination revealed that this flexibility is 1) hard to
reason about and 2) unneeded.

So instead of keeping this option and then investing time into
producing tests for all the combinations of verifications protocols,
this patch "cuts the gordian knot" by removing this flexibility
altogether.

In summary:

- `GRPCDial()` is removed.
- `GRPCDialNode()` will call log.Fatal() if provided a 0 node ID.
- `GRPCGossipDial()` is introduced, with a clarification
  about its contract. I have audited the code to validate that
  this is indeed only used by gossip, and the CLI client commands
  that really don't care about the node ID.

Release note: None
  • Loading branch information
knz committed Apr 16, 2019
1 parent 68ba844 commit 25747e5
Show file tree
Hide file tree
Showing 17 changed files with 150 additions and 80 deletions.
5 changes: 4 additions & 1 deletion pkg/cli/debug_test.go
Expand Up @@ -252,7 +252,10 @@ func TestRemoveDeadReplicas(t *testing.T) {
tc := testcluster.StartTestCluster(t, 3, clusterArgs)
defer tc.Stopper().Stop(ctx)

grpcConn, err := tc.Server(0).RPCContext().GRPCDial(tc.Server(0).ServingAddr()).Connect(ctx)
grpcConn, err := tc.Server(0).RPCContext().GRPCDialNode(
tc.Server(0).ServingAddr(),
tc.Server(0).NodeID(),
).Connect(ctx)
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/cli/start.go
Expand Up @@ -1132,7 +1132,9 @@ func getClientGRPCConn(ctx context.Context) (*grpc.ClientConn, *hlc.Clock, func(
stopper.Stop(ctx)
return nil, nil, nil, err
}
conn, err := rpcContext.GRPCDial(addr).Connect(ctx)
// We use GRPCGossipDial() here because it does not matter
// to which node we're talking to.
conn, err := rpcContext.GRPCGossipDial(addr).Connect(ctx)
if err != nil {
stopper.Stop(ctx)
return nil, nil, nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/gossip/client.go
Expand Up @@ -109,7 +109,7 @@ func (c *client) startLocked(
// asynchronous from the caller's perspective, so the only effect of
// `WithBlock` here is blocking shutdown - at the time of this writing,
// that ends ups up making `kv` tests take twice as long.
conn, err := rpcCtx.GRPCDial(c.addr.String()).Connect(ctx)
conn, err := rpcCtx.GRPCGossipDial(c.addr.String()).Connect(ctx)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/gossip/gossip_test.go
Expand Up @@ -461,7 +461,7 @@ func TestGossipNoForwardSelf(t *testing.T) {
c := newClient(log.AmbientContext{Tracer: tracing.NewTracer()}, local.GetNodeAddr(), makeMetrics())

testutils.SucceedsSoon(t, func() error {
conn, err := peer.rpcContext.GRPCDial(c.addr.String()).Connect(ctx)
conn, err := peer.rpcContext.GRPCGossipDial(c.addr.String()).Connect(ctx)
if err != nil {
return err
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/kv/send_test.go
Expand Up @@ -66,6 +66,12 @@ func TestSendToOneClient(t *testing.T) {
stopper,
&cluster.MakeTestingClusterSettings().Version,
)

// This test uses the testing function sendBatch() which does not
// support setting the node ID on GRPCDialNode(). Disable Node ID
// checks to avoid log.Fatal.
rpcContext.TestingAllowNamedRPCToAnonymousServer = true

s := rpc.NewServer(rpcContext)
roachpb.RegisterInternalServer(s, Node(0))
ln, err := netutil.ListenAndServeGRPC(rpcContext.Stopper, s, util.TestAddr)
Expand Down Expand Up @@ -136,6 +142,10 @@ func TestComplexScenarios(t *testing.T) {
stopper,
&cluster.MakeTestingClusterSettings().Version,
)

// We're going to serve multiple node IDs with that one
// context. Disable node ID checks.
nodeContext.TestingAllowNamedRPCToAnonymousServer = true
nodeDialer := nodedialer.New(nodeContext, nil)

// TODO(bdarnell): the retryable flag is no longer used for RPC errors.
Expand Down
45 changes: 28 additions & 17 deletions pkg/rpc/context.go
Expand Up @@ -459,8 +459,10 @@ func (ctx *Context) GetStatsMap() *syncmap.Map {

// GetLocalInternalClientForAddr returns the context's internal batch client
// for target, if it exists.
func (ctx *Context) GetLocalInternalClientForAddr(target string) roachpb.InternalClient {
if target == ctx.AdvertiseAddr {
func (ctx *Context) GetLocalInternalClientForAddr(
target string, nodeID roachpb.NodeID,
) roachpb.InternalClient {
if target == ctx.AdvertiseAddr && nodeID == ctx.NodeID.Get() {
return ctx.localInternalClient
}
return nil
Expand Down Expand Up @@ -695,21 +697,28 @@ func (ctx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, <-chan struct{
return conn, dialer.redialChan, err
}

// GRPCDial calls grpc.Dial with options appropriate for the context.
//
// It does not require validation of the node ID between client and server:
// if a connection existed already with some node ID requirement, that
// requirement will remain; if no connection existed yet,
// a new one is created without a node ID requirement.
func (ctx *Context) GRPCDial(target string) *Connection {
return ctx.GRPCDialNode(target, 0)
// GRPCGossipDial uses GRPCDialNode and disables validation of the
// node ID between client and server. This function should only be
// used with the gossip client and CLI commands which can talk to any
// node.
func (ctx *Context) GRPCGossipDial(target string) *Connection {
return ctx.grpcDialNodeInternal(target, 0)
}

// GRPCDialNode calls grpc.Dial with options appropriate for the context.
//
// The remoteNodeID, if non-zero, becomes a constraint on the expected
// node ID of the remote node; this is checked during heartbeats.
// The remoteNodeID becomes a constraint on the expected node ID of
// the remote node; this is checked during heartbeats. The caller is
// responsible for ensuring the remote node ID is known prior to using
// this function.
func (ctx *Context) GRPCDialNode(target string, remoteNodeID roachpb.NodeID) *Connection {
if remoteNodeID == 0 && !ctx.TestingAllowNamedRPCToAnonymousServer {
log.Fatalf(context.TODO(), "invalid node ID 0 in GRPCDialNode()")
}
return ctx.grpcDialNodeInternal(target, remoteNodeID)
}

func (ctx *Context) grpcDialNodeInternal(target string, remoteNodeID roachpb.NodeID) *Connection {
thisConnKey := connKey{target, remoteNodeID}
value, ok := ctx.conns.Load(thisConnKey)
if !ok {
Expand Down Expand Up @@ -765,7 +774,7 @@ func (ctx *Context) NewBreaker(name string) *circuit.Breaker {
// the first heartbeat.
var ErrNotHeartbeated = errors.New("not yet heartbeated")

// ConnHealth returns nil if we have an open connection to the given
// TestingConnHealth returns nil if we have an open connection to the given
// target that succeeded on its most recent heartbeat. Otherwise, it
// kicks off a connection attempt (unless one is already in progress
// or we are in a backoff state) and returns an error (typically
Expand All @@ -776,13 +785,15 @@ var ErrNotHeartbeated = errors.New("not yet heartbeated")
// "unhealthy" nodes.
//
// This is used in tests only; in clusters use (*Dialer).ConnHealth()
// instead which validates the node ID.
func (ctx *Context) ConnHealth(target string) error {
if ctx.GetLocalInternalClientForAddr(target) != nil {
// instead which automates the address resolution.
//
// TODO(knz): remove this altogether. Use the dialer in all cases.
func (ctx *Context) TestingConnHealth(target string, nodeID roachpb.NodeID) error {
if ctx.GetLocalInternalClientForAddr(target, nodeID) != nil {
// The local server is always considered healthy.
return nil
}
conn := ctx.GRPCDial(target)
conn := ctx.GRPCDialNode(target, nodeID)
return conn.Health()
}

Expand Down

0 comments on commit 25747e5

Please sign in to comment.