Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: plugin multiplexing: handle plugin client cleanup #13896

Merged
merged 16 commits into from
Feb 10, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion builtin/logical/database/path_config_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,6 @@ func (b *databaseBackend) connectionWriteHandler() framework.OperationFunc {
if err != nil {
return logical.ErrorResponse("error creating database object: %s", err), nil
}
b.Logger().Debug("created database object", "name", name, "plugin_name", config.PluginName)

initReq := v5.InitializeRequest{
Config: config.ConnectionDetails,
Expand All @@ -330,6 +329,8 @@ func (b *databaseBackend) connectionWriteHandler() framework.OperationFunc {
}
config.ConnectionDetails = initResp.Config

b.Logger().Debug("created database object", "name", name, "plugin_name", config.PluginName)

b.Lock()
defer b.Unlock()

Expand Down
26 changes: 0 additions & 26 deletions sdk/database/dbplugin/v5/grpc_database_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/vault/sdk/database/dbplugin/v5/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

// handshakeConfigs are used to just do a basic handshake between
Expand All @@ -18,8 +17,6 @@ var HandshakeConfig = plugin.HandshakeConfig{
MagicCookieValue: "926a0820-aea2-be28-51d6-83cdf00e8edb",
}

const multiplexingCtxKey string = "multiplex_id"

// Factory is the factory function to create a dbplugin Database.
type Factory func() (interface{}, error)

Expand Down Expand Up @@ -50,26 +47,3 @@ func (GRPCDatabasePlugin) GRPCClient(doneCtx context.Context, _ *plugin.GRPCBrok
}
return client, nil
}

type databaseClientConn struct {
*grpc.ClientConn
id string
}

var _ grpc.ClientConnInterface = &databaseClientConn{}

func (d *databaseClientConn) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error {
// Inject ID to the context
md := metadata.Pairs(multiplexingCtxKey, d.id)
idCtx := metadata.NewOutgoingContext(ctx, md)

return d.ClientConn.Invoke(idCtx, method, args, reply, opts...)
}

func (d *databaseClientConn) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
// Inject ID to the context
md := metadata.Pairs(multiplexingCtxKey, d.id)
idCtx := metadata.NewOutgoingContext(ctx, md)

return d.ClientConn.NewStream(idCtx, desc, method, opts...)
}
3 changes: 2 additions & 1 deletion sdk/database/dbplugin/v5/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/golang/protobuf/ptypes"
"github.com/hashicorp/vault/sdk/database/dbplugin/v5/proto"
"github.com/hashicorp/vault/sdk/helper/pluginutil"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
Expand All @@ -29,7 +30,7 @@ func getMultiplexIDFromContext(ctx context.Context) (string, error) {
return "", fmt.Errorf("missing plugin multiplexing metadata")
}

multiplexIDs := md[multiplexingCtxKey]
multiplexIDs := md[pluginutil.MultiplexingCtxKey]
if len(multiplexIDs) != 1 {
return "", fmt.Errorf("unexpected number of IDs in metadata: (%d)", len(multiplexIDs))
}
Expand Down
16 changes: 5 additions & 11 deletions sdk/database/dbplugin/v5/plugin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,13 @@ func NewPluginClient(ctx context.Context, sys pluginutil.RunnerUtil, pluginRunne
// We should have a database type now. This feels like a normal interface
// implementation but is in fact over an RPC connection.
var db Database
switch raw.(type) {
switch c := raw.(type) {
case gRPCClient:
// This is an abstraction leak from go-plugin but it is necessary in
// order to enable multiplexing on multiplexed plugins
c.client = proto.NewDatabaseClient(pluginClient.Conn())

gRPCClient := raw.(gRPCClient)

// Wrap clientConn with our implementation so that we can inject the
// ID into the context
cc := &databaseClientConn{
ClientConn: pluginClient.Conn(),
id: pluginClient.ID(),
}
gRPCClient.client = proto.NewDatabaseClient(cc)
db = gRPCClient
db = c
default:
return nil, errors.New("unsupported client type")
}
Expand Down
9 changes: 6 additions & 3 deletions sdk/helper/pluginutil/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ type LookRunnerUtil interface {
}

type Multiplexer interface {
ID() string
Conn() *grpc.ClientConn
MultiplexingSupport() bool
Conn() grpc.ClientConnInterface
MultiplexingSupport() (bool, error)

plugin.ClientProtocol
}

const MultiplexingCtxKey string = "multiplex_id"

// PluginRunner defines the metadata needed to run a plugin securely with
// go-plugin.
type PluginRunner struct {
Expand Down Expand Up @@ -98,6 +99,8 @@ func CtxCancelIfCanceled(f context.CancelFunc, ctxCanceler context.Context) chan
return quitCh
}

// MultiplexingSupport returns true if a plugin supports multiplexing.
// Currently this is hardcoded for database plugins.
func MultiplexingSupport(version int) bool {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will likely need a Plugin Type arg in the future but I think it is ok to leave out for now.

return version == 6
}
2 changes: 2 additions & 0 deletions sdk/logical/system_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ type SystemView interface {
// name. Returns a PluginRunner or an error if a plugin can not be found.
LookupPlugin(context.Context, string, consts.PluginType) (*pluginutil.PluginRunner, error)

// NewPluginClient returns a client for managing the lifecycle of plugin
// processes
NewPluginClient(ctx context.Context, pluginRunner *pluginutil.PluginRunner, config pluginutil.PluginClientConfig) (pluginutil.Multiplexer, error)

// MlockEnabled returns the configuration setting for enabling mlock on
Expand Down
2 changes: 1 addition & 1 deletion vault/dynamic_system_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (d dynamicSystemView) ResponseWrapData(ctx context.Context, data map[string
}

func (d dynamicSystemView) NewPluginClient(ctx context.Context, pluginRunner *pluginutil.PluginRunner, config pluginutil.PluginClientConfig) (pluginutil.Multiplexer, error) {
c, err := d.core.pluginCatalog.GetPluginClient(ctx, d, pluginRunner, config)
c, err := d.core.pluginCatalog.NewPluginClient(ctx, d, pluginRunner, config)
if err != nil {
return nil, err
}
Expand Down
Loading