Skip to content

Commit

Permalink
Provide a context for managing the lifecycle of gRPC plugins (#55)
Browse files Browse the repository at this point in the history
* Provide a context for managing the lifecycle of gRPC plugins

* Fix tests

* Add context to the TestPluginGRPCConn function

* Add newline
  • Loading branch information
briankassouf authored Jan 18, 2018
1 parent 4b3b291 commit 485ef45
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 7 deletions.
13 changes: 12 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package plugin

import (
"bufio"
"context"
"crypto/subtle"
"crypto/tls"
"errors"
Expand Down Expand Up @@ -79,6 +80,7 @@ type Client struct {
client ClientProtocol
protocol Protocol
logger hclog.Logger
doneCtx context.Context
}

// ClientConfig is the configuration used to initialize a new
Expand Down Expand Up @@ -310,7 +312,7 @@ func (c *Client) Client() (ClientProtocol, error) {
c.client, err = newRPCClient(c)

case ProtocolGRPC:
c.client, err = newGRPCClient(c)
c.client, err = newGRPCClient(c.doneCtx, c)

default:
return nil, fmt.Errorf("unknown server protocol: %s", c.protocol)
Expand Down Expand Up @@ -423,6 +425,9 @@ func (c *Client) Start() (addr net.Addr, err error) {

// Create the logging channel for when we kill
c.doneLogging = make(chan struct{})
// Create a context for when we kill
var ctxCancel context.CancelFunc
c.doneCtx, ctxCancel = context.WithCancel(context.Background())

if c.config.Reattach != nil {
// Verify the process still exists. If not, then it is an error
Expand Down Expand Up @@ -457,6 +462,9 @@ func (c *Client) Start() (addr net.Addr, err error) {

// Close the logging channel since that doesn't work on reattach
close(c.doneLogging)

// Cancel the context
ctxCancel()
}(p.Pid)

// Set the address and process
Expand Down Expand Up @@ -535,6 +543,9 @@ func (c *Client) Start() (addr net.Addr, err error) {
// Mark that we exited
close(exitCh)

// Cancel the context, marking that we exited
ctxCancel()

// Set that we exited, which takes a lock
c.l.Lock()
defer c.l.Unlock()
Expand Down
44 changes: 44 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,50 @@ func TestClient_testInterface(t *testing.T) {
}
}

func TestClient_grpc_servercrash(t *testing.T) {
process := helperProcess("test-grpc")
c := NewClient(&ClientConfig{
Cmd: process,
HandshakeConfig: testHandshake,
Plugins: testPluginMap,
AllowedProtocols: []Protocol{ProtocolGRPC},
})
defer c.Kill()

if _, err := c.Start(); err != nil {
t.Fatalf("err: %s", err)
}

if v := c.Protocol(); v != ProtocolGRPC {
t.Fatalf("bad: %s", v)
}

// Grab the RPC client
client, err := c.Client()
if err != nil {
t.Fatalf("err should be nil, got %s", err)
}

// Grab the impl
raw, err := client.Dispense("test")
if err != nil {
t.Fatalf("err should be nil, got %s", err)
}

_, ok := raw.(testInterface)
if !ok {
t.Fatalf("bad: %#v", raw)
}

c.process.Kill()

select {
case <-c.doneCtx.Done():
case <-time.After(time.Second * 2):
t.Fatal("Context was not closed")
}
}

func TestClient_grpc(t *testing.T) {
process := helperProcess("test-grpc")
c := NewClient(&ClientConfig{
Expand Down
8 changes: 5 additions & 3 deletions grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func dialGRPCConn(tls *tls.Config, dialer func(string, time.Duration) (net.Conn,

// newGRPCClient creates a new GRPCClient. The Client argument is expected
// to be successfully started already with a lock held.
func newGRPCClient(c *Client) (*GRPCClient, error) {
func newGRPCClient(doneCtx context.Context, c *Client) (*GRPCClient, error) {
conn, err := dialGRPCConn(c.config.TLSConfig, c.dialer)
if err != nil {
return nil, err
Expand All @@ -61,6 +61,7 @@ func newGRPCClient(c *Client) (*GRPCClient, error) {
return &GRPCClient{
Conn: conn,
Plugins: c.config.Plugins,
doneCtx: doneCtx,
broker: broker,
}, nil
}
Expand All @@ -70,7 +71,8 @@ type GRPCClient struct {
Conn *grpc.ClientConn
Plugins map[string]Plugin

broker *GRPCBroker
doneCtx context.Context
broker *GRPCBroker
}

// ClientProtocol impl.
Expand All @@ -91,7 +93,7 @@ func (c *GRPCClient) Dispense(name string) (interface{}, error) {
return nil, fmt.Errorf("plugin %q doesn't support gRPC", name)
}

return p.GRPCClient(c.broker, c.Conn)
return p.GRPCClient(c.doneCtx, c.broker, c.Conn)
}

// ClientProtocol impl.
Expand Down
6 changes: 4 additions & 2 deletions plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package plugin

import (
"context"
"errors"
"net/rpc"

Expand Down Expand Up @@ -36,8 +37,9 @@ type GRPCPlugin interface {
GRPCServer(*GRPCBroker, *grpc.Server) error

// GRPCClient should return the interface implementation for the plugin
// you're serving via gRPC.
GRPCClient(*GRPCBroker, *grpc.ClientConn) (interface{}, error)
// you're serving via gRPC. The provided context will be canceled by
// go-plugin in the event of the plugin process exiting.
GRPCClient(context.Context, *GRPCBroker, *grpc.ClientConn) (interface{}, error)
}

// NetRPCUnsupportedPlugin implements Plugin but returns errors for the
Expand Down
2 changes: 1 addition & 1 deletion plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (p *testInterfacePlugin) GRPCServer(b *GRPCBroker, s *grpc.Server) error {
return nil
}

func (p *testInterfacePlugin) GRPCClient(b *GRPCBroker, c *grpc.ClientConn) (interface{}, error) {
func (p *testInterfacePlugin) GRPCClient(doneCtx context.Context, b *GRPCBroker, c *grpc.ClientConn) (interface{}, error) {
return &testGRPCClient{broker: b, Client: grpctest.NewTestClient(c)}, nil
}

Expand Down
2 changes: 2 additions & 0 deletions testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package plugin

import (
"bytes"
"context"
"net"
"net/rpc"

Expand Down Expand Up @@ -119,6 +120,7 @@ func TestPluginGRPCConn(t testing.T, ps map[string]Plugin) (*GRPCClient, *GRPCSe
Conn: conn,
Plugins: ps,
broker: broker,
doneCtx: context.Background(),
}

return client, server
Expand Down

0 comments on commit 485ef45

Please sign in to comment.