Skip to content

Commit

Permalink
Support muxing gRPC broker connections over a single net.Conn (#288)
Browse files Browse the repository at this point in the history
* Support muxing gRPC broker connections over a single net.Conn, via ClientConfig.GRPCBrokerMultiplex
* upgrade yamux, fix yamux config, and go mod tidy -compat=1.17
* Check for multiplexing support in protocol negotiation if enabled
  • Loading branch information
tomhjp committed Nov 13, 2023
1 parent 017b758 commit d16cec3
Show file tree
Hide file tree
Showing 22 changed files with 1,037 additions and 196 deletions.
93 changes: 82 additions & 11 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin/internal/cmdrunner"
"github.com/hashicorp/go-plugin/internal/grpcmux"
"github.com/hashicorp/go-plugin/runner"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -63,6 +64,13 @@ var (
// ErrSecureConfigAndReattach is returned when both Reattach and
// SecureConfig are set.
ErrSecureConfigAndReattach = errors.New("only one of Reattach or SecureConfig can be set")

// ErrGRPCBrokerMuxNotSupported is returned when the client requests
// multiplexing over the gRPC broker, but the plugin does not support the
// feature. In most cases, this should be resolvable by updating and
// rebuilding the plugin, or restarting the plugin with
// ClientConfig.GRPCBrokerMultiplex set to false.
ErrGRPCBrokerMuxNotSupported = errors.New("client requested gRPC broker multiplexing but plugin does not support the feature")
)

// Client handles the lifecycle of a plugin application. It launches
Expand Down Expand Up @@ -102,6 +110,9 @@ type Client struct {
processKilled bool

unixSocketCfg UnixSocketConfig

grpcMuxerOnce sync.Once
grpcMuxer *grpcmux.GRPCClientMuxer
}

// NegotiatedVersion returns the protocol version negotiated with the server.
Expand Down Expand Up @@ -237,6 +248,19 @@ type ClientConfig struct {
// protocol.
GRPCDialOptions []grpc.DialOption

// GRPCBrokerMultiplex turns on multiplexing for the gRPC broker. The gRPC
// broker will multiplex all brokered gRPC servers over the plugin's original
// listener socket instead of making a new listener for each server. The
// go-plugin library currently only includes a Go implementation for the
// server (i.e. plugin) side of gRPC broker multiplexing.
//
// Does not support reattaching.
//
// Multiplexed gRPC streams MUST be established sequentially, i.e. after
// calling AcceptAndServe from one side, wait for the other side to Dial
// before calling AcceptAndServe again.
GRPCBrokerMultiplex bool

// SkipHostEnv allows plugins to run without inheriting the parent process'
// environment variables.
SkipHostEnv bool
Expand Down Expand Up @@ -352,7 +376,7 @@ func CleanupClients() {
wg.Wait()
}

// Creates a new plugin client which manages the lifecycle of an external
// NewClient creates a new plugin client which manages the lifecycle of an external
// plugin and gets the address for the RPC connection.
//
// The client must be cleaned up at some point by calling Kill(). If
Expand All @@ -374,10 +398,10 @@ func NewClient(config *ClientConfig) (c *Client) {
}

if config.SyncStdout == nil {
config.SyncStdout = ioutil.Discard
config.SyncStdout = io.Discard
}
if config.SyncStderr == nil {
config.SyncStderr = ioutil.Discard
config.SyncStderr = io.Discard
}

if config.AllowedProtocols == nil {
Expand Down Expand Up @@ -572,6 +596,10 @@ func (c *Client) Start() (addr net.Addr, err error) {
if c.config.SecureConfig != nil && c.config.Reattach != nil {
return nil, ErrSecureConfigAndReattach
}

if c.config.GRPCBrokerMultiplex && c.config.Reattach != nil {
return nil, fmt.Errorf("gRPC broker multiplexing is not supported with Reattach config")
}
}

if c.config.Reattach != nil {
Expand Down Expand Up @@ -603,6 +631,9 @@ func (c *Client) Start() (addr net.Addr, err error) {
fmt.Sprintf("PLUGIN_MAX_PORT=%d", c.config.MaxPort),
fmt.Sprintf("PLUGIN_PROTOCOL_VERSIONS=%s", strings.Join(versionStrings, ",")),
}
if c.config.GRPCBrokerMultiplex {
env = append(env, fmt.Sprintf("%s=true", envMultiplexGRPC))
}

cmd := c.config.Cmd
if cmd == nil {
Expand Down Expand Up @@ -790,7 +821,7 @@ func (c *Client) Start() (addr net.Addr, err error) {
// Trim the line and split by "|" in order to get the parts of
// the output.
line = strings.TrimSpace(line)
parts := strings.SplitN(line, "|", 6)
parts := strings.Split(line, "|")
if len(parts) < 4 {
errText := fmt.Sprintf("Unrecognized remote plugin message: %s", line)
if !ok {
Expand Down Expand Up @@ -878,6 +909,18 @@ func (c *Client) Start() (addr net.Addr, err error) {
return nil, fmt.Errorf("error parsing server cert: %s", err)
}
}

if c.config.GRPCBrokerMultiplex && c.protocol == ProtocolGRPC {
if len(parts) <= 6 {
return nil, fmt.Errorf("%w; for Go plugins, you will need to update the "+
"github.com/hashicorp/go-plugin dependency and recompile", ErrGRPCBrokerMuxNotSupported)
}
if muxSupported, err := strconv.ParseBool(parts[6]); err != nil {
return nil, fmt.Errorf("error parsing %q as a boolean for gRPC broker multiplexing support", parts[6])
} else if !muxSupported {
return nil, ErrGRPCBrokerMuxNotSupported
}
}
}

c.address = addr
Expand Down Expand Up @@ -951,12 +994,11 @@ func (c *Client) reattach() (net.Addr, error) {

if c.config.Reattach.Test {
c.negotiatedVersion = c.config.Reattach.ProtocolVersion
}

// If we're in test mode, we do NOT set the process. This avoids the
// process being killed (the only purpose we have for c.process), since
// in test mode the process is responsible for exiting on its own.
if !c.config.Reattach.Test {
} else {
// If we're in test mode, we do NOT set the runner. This avoids the
// runner being killed (the only purpose we have for setting c.runner
// when reattaching), since in test mode the process is responsible for
// exiting on its own.
c.runner = r
}

Expand Down Expand Up @@ -1061,11 +1103,24 @@ func netAddrDialer(addr net.Addr) func(string, time.Duration) (net.Conn, error)
// dialer is compatible with grpc.WithDialer and creates the connection
// to the plugin.
func (c *Client) dialer(_ string, timeout time.Duration) (net.Conn, error) {
conn, err := netAddrDialer(c.address)("", timeout)
muxer, err := c.getGRPCMuxer(c.address)
if err != nil {
return nil, err
}

var conn net.Conn
if muxer.Enabled() {
conn, err = muxer.Dial()
if err != nil {
return nil, err
}
} else {
conn, err = netAddrDialer(c.address)("", timeout)
if err != nil {
return nil, err
}
}

// If we have a TLS config we wrap our connection. We only do this
// for net/rpc since gRPC uses its own mechanism for TLS.
if c.protocol == ProtocolNetRPC && c.config.TLSConfig != nil {
Expand All @@ -1075,6 +1130,22 @@ func (c *Client) dialer(_ string, timeout time.Duration) (net.Conn, error) {
return conn, nil
}

func (c *Client) getGRPCMuxer(addr net.Addr) (*grpcmux.GRPCClientMuxer, error) {
if c.protocol != ProtocolGRPC || !c.config.GRPCBrokerMultiplex {
return nil, nil
}

var err error
c.grpcMuxerOnce.Do(func() {
c.grpcMuxer, err = grpcmux.NewGRPCClientMuxer(c.logger, addr)
})
if err != nil {
return nil, err
}

return c.grpcMuxer, nil
}

var stdErrBufferSize = 64 * 1024

func (c *Client) logStderr(name string, r io.Reader) {
Expand Down
42 changes: 32 additions & 10 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"bytes"
"context"
"crypto/sha256"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"os"
Expand Down Expand Up @@ -65,10 +65,7 @@ func TestClient(t *testing.T) {
// This tests a bug where Kill would start
func TestClient_killStart(t *testing.T) {
// Create a temporary dir to store the result file
td, err := ioutil.TempDir("", "plugin")
if err != nil {
t.Fatalf("err: %s", err)
}
td := t.TempDir()
defer os.RemoveAll(td)

// Start the client
Expand Down Expand Up @@ -115,10 +112,7 @@ func TestClient_killStart(t *testing.T) {

func TestClient_testCleanup(t *testing.T) {
// Create a temporary dir to store the result file
td, err := ioutil.TempDir("", "plugin")
if err != nil {
t.Fatalf("err: %s", err)
}
td := t.TempDir()
defer os.RemoveAll(td)

// Create a path that the helper process will write on cleanup
Expand Down Expand Up @@ -825,7 +819,7 @@ func TestClient_textLogLevel(t *testing.T) {

func TestClient_Stdin(t *testing.T) {
// Overwrite stdin for this test with a temporary file
tf, err := ioutil.TempFile("", "terraform")
tf, err := os.CreateTemp("", "terraform")
if err != nil {
t.Fatalf("err: %s", err)
}
Expand Down Expand Up @@ -914,6 +908,34 @@ func TestClient_SkipHostEnv(t *testing.T) {
}
}

func TestClient_RequestGRPCMultiplexing_UnsupportedByPlugin(t *testing.T) {
for _, name := range []string{
"mux-grpc-with-old-plugin",
"mux-grpc-with-unsupported-plugin",
} {
t.Run(name, func(t *testing.T) {
process := helperProcess(name)
c := NewClient(&ClientConfig{
Cmd: process,
HandshakeConfig: testHandshake,
Plugins: testGRPCPluginMap,
AllowedProtocols: []Protocol{ProtocolGRPC},
GRPCBrokerMultiplex: true,
})
defer c.Kill()

_, err := c.Start()
if err == nil {
t.Fatal("expected error")
}

if !errors.Is(err, ErrGRPCBrokerMuxNotSupported) {
t.Fatalf("expected %s, but got %s", ErrGRPCBrokerMuxNotSupported, err)
}
})
}
}

func TestClient_SecureConfig(t *testing.T) {
// Test failure case
secureConfig := &SecureConfig{
Expand Down
2 changes: 2 additions & 0 deletions constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ const (
// EnvUnixSocketGroup specifies the owning, writable group to set for Unix
// sockets created by _plugins_. Does not affect client behavior.
EnvUnixSocketGroup = "PLUGIN_UNIX_SOCKET_GROUP"

envMultiplexGRPC = "PLUGIN_MULTIPLEX_GRPC"
)
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ go 1.17
require (
github.com/golang/protobuf v1.5.0
github.com/hashicorp/go-hclog v0.14.1
github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb
github.com/hashicorp/yamux v0.1.1
github.com/jhump/protoreflect v1.15.1
github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77
github.com/oklog/run v1.0.0
google.golang.org/grpc v1.38.0
google.golang.org/protobuf v1.28.2-0.20230222093303-bc1253ad3743
)

require (
Expand All @@ -22,5 +23,4 @@ require (
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
google.golang.org/protobuf v1.28.2-0.20230222093303-bc1253ad3743 // indirect
)
Loading

0 comments on commit d16cec3

Please sign in to comment.