Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: plugin multiplexing: handle plugin client cleanup #13896

Merged
merged 16 commits into from
Feb 10, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion builtin/logical/database/path_config_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,6 @@ func (b *databaseBackend) connectionWriteHandler() framework.OperationFunc {
if err != nil {
return logical.ErrorResponse("error creating database object: %s", err), nil
}
b.Logger().Debug("created database object", "name", name, "plugin_name", config.PluginName)

initReq := v5.InitializeRequest{
Config: config.ConnectionDetails,
Expand All @@ -330,6 +329,8 @@ func (b *databaseBackend) connectionWriteHandler() framework.OperationFunc {
}
config.ConnectionDetails = initResp.Config

b.Logger().Debug("created database object", "name", name, "plugin_name", config.PluginName)

b.Lock()
defer b.Unlock()

Expand Down
26 changes: 0 additions & 26 deletions sdk/database/dbplugin/v5/grpc_database_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/vault/sdk/database/dbplugin/v5/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

// handshakeConfigs are used to just do a basic handshake between
Expand All @@ -18,8 +17,6 @@ var HandshakeConfig = plugin.HandshakeConfig{
MagicCookieValue: "926a0820-aea2-be28-51d6-83cdf00e8edb",
}

const multiplexingCtxKey string = "multiplex_id"

// Factory is the factory function to create a dbplugin Database.
type Factory func() (interface{}, error)

Expand Down Expand Up @@ -50,26 +47,3 @@ func (GRPCDatabasePlugin) GRPCClient(doneCtx context.Context, _ *plugin.GRPCBrok
}
return client, nil
}

type databaseClientConn struct {
*grpc.ClientConn
id string
}

var _ grpc.ClientConnInterface = &databaseClientConn{}

func (d *databaseClientConn) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error {
// Inject ID to the context
md := metadata.Pairs(multiplexingCtxKey, d.id)
idCtx := metadata.NewOutgoingContext(ctx, md)

return d.ClientConn.Invoke(idCtx, method, args, reply, opts...)
}

func (d *databaseClientConn) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
// Inject ID to the context
md := metadata.Pairs(multiplexingCtxKey, d.id)
idCtx := metadata.NewOutgoingContext(ctx, md)

return d.ClientConn.NewStream(idCtx, desc, method, opts...)
}
3 changes: 2 additions & 1 deletion sdk/database/dbplugin/v5/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/golang/protobuf/ptypes"
"github.com/hashicorp/vault/sdk/database/dbplugin/v5/proto"
"github.com/hashicorp/vault/sdk/helper/pluginutil"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
Expand All @@ -29,7 +30,7 @@ func getMultiplexIDFromContext(ctx context.Context) (string, error) {
return "", fmt.Errorf("missing plugin multiplexing metadata")
}

multiplexIDs := md[multiplexingCtxKey]
multiplexIDs := md[pluginutil.MultiplexingCtxKey]
if len(multiplexIDs) != 1 {
return "", fmt.Errorf("unexpected number of IDs in metadata: (%d)", len(multiplexIDs))
}
Expand Down
22 changes: 8 additions & 14 deletions sdk/database/dbplugin/v5/plugin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

type DatabasePluginClient struct {
client pluginutil.Multiplexer
client pluginutil.PluginInstance
Database
}

Expand All @@ -35,8 +35,8 @@ var PluginSets = map[int]plugin.PluginSet{

// NewPluginClient returns a databaseRPCClient with a connection to a running
// plugin.
func NewPluginClient(ctx context.Context, sys pluginutil.RunnerUtil, pluginRunner *pluginutil.PluginRunner, config pluginutil.PluginClientConfig) (Database, error) {
pluginClient, err := sys.NewPluginClient(ctx, pluginRunner, config)
func NewPluginClient(ctx context.Context, sys pluginutil.RunnerUtil, config pluginutil.PluginClientConfig) (Database, error) {
pluginClient, err := sys.NewPluginClient(ctx, config)
if err != nil {
return nil, err
}
Expand All @@ -50,19 +50,13 @@ func NewPluginClient(ctx context.Context, sys pluginutil.RunnerUtil, pluginRunne
// We should have a database type now. This feels like a normal interface
// implementation but is in fact over an RPC connection.
var db Database
switch raw.(type) {
switch c := raw.(type) {
case gRPCClient:
// This is an abstraction leak from go-plugin but it is necessary in
// order to enable multiplexing on multiplexed plugins
c.client = proto.NewDatabaseClient(pluginClient.Conn())

gRPCClient := raw.(gRPCClient)

// Wrap clientConn with our implementation so that we can inject the
// ID into the context
cc := &databaseClientConn{
ClientConn: pluginClient.Conn(),
id: pluginClient.ID(),
}
gRPCClient.client = proto.NewDatabaseClient(cc)
db = gRPCClient
db = c
default:
return nil, errors.New("unsupported client type")
}
Expand Down
3 changes: 2 additions & 1 deletion sdk/database/dbplugin/v5/plugin_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,15 @@ func PluginFactory(ctx context.Context, pluginName string, sys pluginutil.LookRu
} else {
config := pluginutil.PluginClientConfig{
Name: pluginName,
PluginType: consts.PluginTypeDatabase,
PluginSets: PluginSets,
HandshakeConfig: HandshakeConfig,
Logger: namedLogger,
IsMetadataMode: false,
AutoMTLS: true,
}
// create a DatabasePluginClient instance
db, err = NewPluginClient(ctx, sys, pluginRunner, config)
db, err = NewPluginClient(ctx, sys, config)
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions sdk/helper/pluginutil/run_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import (

log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/vault/sdk/helper/consts"
"github.com/hashicorp/vault/sdk/version"
)

type PluginClientConfig struct {
Name string
PluginType consts.PluginType
PluginSets map[int]plugin.PluginSet
HandshakeConfig plugin.HandshakeConfig
Logger log.Logger
Expand Down
13 changes: 8 additions & 5 deletions sdk/helper/pluginutil/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type Looker interface {
// configuration and wrapping data in a response wrapped token.
// logical.SystemView implementations satisfy this interface.
type RunnerUtil interface {
NewPluginClient(ctx context.Context, pluginRunner *PluginRunner, config PluginClientConfig) (Multiplexer, error)
NewPluginClient(ctx context.Context, config PluginClientConfig) (PluginInstance, error)
ResponseWrapData(ctx context.Context, data map[string]interface{}, ttl time.Duration, jwt bool) (*wrapping.ResponseWrapInfo, error)
MlockEnabled() bool
}
Expand All @@ -33,14 +33,15 @@ type LookRunnerUtil interface {
RunnerUtil
}

type Multiplexer interface {
ID() string
Conn() *grpc.ClientConn
MultiplexingSupport() bool
type PluginInstance interface {
Conn() grpc.ClientConnInterface
MultiplexingSupport() (bool, error)

plugin.ClientProtocol
}

const MultiplexingCtxKey string = "multiplex_id"

// PluginRunner defines the metadata needed to run a plugin securely with
// go-plugin.
type PluginRunner struct {
Expand Down Expand Up @@ -98,6 +99,8 @@ func CtxCancelIfCanceled(f context.CancelFunc, ctxCanceler context.Context) chan
return quitCh
}

// MultiplexingSupport returns true if a plugin supports multiplexing.
// Currently this is hardcoded for database plugins.
func MultiplexingSupport(version int) bool {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will likely need a Plugin Type arg in the future but I think it is ok to leave out for now.

return version == 6
}
6 changes: 4 additions & 2 deletions sdk/logical/system_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ type SystemView interface {
// name. Returns a PluginRunner or an error if a plugin can not be found.
LookupPlugin(context.Context, string, consts.PluginType) (*pluginutil.PluginRunner, error)

NewPluginClient(ctx context.Context, pluginRunner *pluginutil.PluginRunner, config pluginutil.PluginClientConfig) (pluginutil.Multiplexer, error)
// NewPluginClient returns a client for managing the lifecycle of plugin
// processes
NewPluginClient(ctx context.Context, config pluginutil.PluginClientConfig) (pluginutil.PluginInstance, error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes me think whether we should be embedding pluginutil.RunnerUtil instead of duplicating the method signatures...

Copy link
Member

@calvn calvn Feb 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be more appropriate to name this NewPluginClientInstance now that the interface is renamed?


// MlockEnabled returns the configuration setting for enabling mlock on
// plugins.
Expand Down Expand Up @@ -154,7 +156,7 @@ func (d StaticSystemView) ReplicationState() consts.ReplicationState {
return d.ReplicationStateVal
}

func (d StaticSystemView) NewPluginClient(ctx context.Context, pluginRunner *pluginutil.PluginRunner, config pluginutil.PluginClientConfig) (pluginutil.Multiplexer, error) {
func (d StaticSystemView) NewPluginClient(ctx context.Context, config pluginutil.PluginClientConfig) (pluginutil.PluginInstance, error) {
return nil, errors.New("NewPluginClient is not implemented in StaticSystemView")
}

Expand Down
2 changes: 1 addition & 1 deletion sdk/plugin/grpc_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (s *gRPCSystemViewClient) ResponseWrapData(ctx context.Context, data map[st
return info, nil
}

func (s *gRPCSystemViewClient) NewPluginClient(ctx context.Context, pluginRunner *pluginutil.PluginRunner, config pluginutil.PluginClientConfig) (pluginutil.Multiplexer, error) {
func (s *gRPCSystemViewClient) NewPluginClient(ctx context.Context, config pluginutil.PluginClientConfig) (pluginutil.PluginInstance, error) {
return nil, fmt.Errorf("cannot call NewPluginClient from a plugin backend")
}

Expand Down
2 changes: 1 addition & 1 deletion vault/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -2028,7 +2028,7 @@ func (s standardUnsealStrategy) unseal(ctx context.Context, logger log.Logger, c
if err := c.handleVersionTimeStamps(ctx); err != nil {
return err
}
if err := c.setupPluginCatalog(ctx); err != nil {
if err := c.setupPluginCatalog(ctx, c.enableMlock); err != nil {
return err
}
if err := c.loadMounts(ctx); err != nil {
Expand Down
11 changes: 9 additions & 2 deletions vault/dynamic_system_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,15 @@ func (d dynamicSystemView) ResponseWrapData(ctx context.Context, data map[string
return resp.WrapInfo, nil
}

func (d dynamicSystemView) NewPluginClient(ctx context.Context, pluginRunner *pluginutil.PluginRunner, config pluginutil.PluginClientConfig) (pluginutil.Multiplexer, error) {
c, err := d.core.pluginCatalog.GetPluginClient(ctx, d, pluginRunner, config)
func (d dynamicSystemView) NewPluginClient(ctx context.Context, config pluginutil.PluginClientConfig) (pluginutil.PluginInstance, error) {
if d.core == nil {
return nil, fmt.Errorf("system view core is nil")
}
if d.core.pluginCatalog == nil {
return nil, fmt.Errorf("system view core plugin catalog is nil")
}

c, err := d.core.pluginCatalog.NewPluginClient(ctx, config)
if err != nil {
return nil, err
}
Expand Down
Loading