Skip to content

Commit

Permalink
Add a Connection Broker for GRPC plugins (#52)
Browse files Browse the repository at this point in the history
* GRPC Mux Broker

* ensure each part of the broker is cleanned up

* Add tests

* Update the bidirectional example

* remove old example

* Ensure the accept and serve call closes if the server is closed

* Update comment

* Update the sendErr variable name
  • Loading branch information
briankassouf committed Jan 18, 2018
1 parent e37881a commit 4b3b291
Show file tree
Hide file tree
Showing 19 changed files with 1,750 additions and 40 deletions.
23 changes: 17 additions & 6 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,18 +707,29 @@ func (c *Client) Protocol() Protocol {
return c.protocol
}

func netAddrDialer(addr net.Addr) func(string, time.Duration) (net.Conn, error) {
return func(_ string, _ time.Duration) (net.Conn, error) {
// Connect to the client
conn, err := net.Dial(addr.Network(), addr.String())
if err != nil {
return nil, err
}
if tcpConn, ok := conn.(*net.TCPConn); ok {
// Make sure to set keep alive so that the connection doesn't die
tcpConn.SetKeepAlive(true)
}

return conn, nil
}
}

// dialer is compatible with grpc.WithDialer and creates the connection
// to the plugin.
func (c *Client) dialer(_ string, timeout time.Duration) (net.Conn, error) {
// Connect to the client
conn, err := net.Dial(c.address.Network(), c.address.String())
conn, err := netAddrDialer(c.address)("", timeout)
if err != nil {
return nil, err
}
if tcpConn, ok := conn.(*net.TCPConn); ok {
// Make sure to set keep alive so that the connection doesn't die
tcpConn.SetKeepAlive(true)
}

// If we have a TLS config we wrap our connection. We only do this
// for net/rpc since gRPC uses its own mechanism for TLS.
Expand Down
48 changes: 48 additions & 0 deletions examples/bidirectional/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Counter Example

This example builds a simple key/counter store CLI where the mechanism
for storing and retrieving keys is pluggable. However, in this example we don't
trust the plugin to do the summation work. We use bi-directional plugins to
call back into the main proccess to do the sum of two numbers. To build this example:

```sh
# This builds the main CLI
$ go build -o counter

# This builds the plugin written in Go
$ go build -o counter-go-grpc ./plugin-go-grpc

# This tells the Counter binary to use the "counter-go-grpc" binary
$ export COUNTER_PLUGIN="./counter-go-grpc"

# Read and write
$ ./counter put hello 1
$ ./counter put hello 1

$ ./counter get hello
2
```

### Plugin: plugin-go-grpc

This plugin uses gRPC to serve a plugin that is written in Go:

```
# This builds the plugin written in Go
$ go build -o counter-go-grpc ./plugin-go-grpc
# This tells the KV binary to use the "kv-go-grpc" binary
$ export COUNTER_PLUGIN="./counter-go-grpc"
```

## Updating the Protocol

If you update the protocol buffers file, you can regenerate the file
using the following command from this directory. You do not need to run
this if you're just trying the example.

For Go:

```sh
$ protoc -I proto/ proto/kv.proto --go_out=plugins=grpc:proto/
```
81 changes: 81 additions & 0 deletions examples/bidirectional/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package main

import (
"fmt"
"io/ioutil"
"log"
"os"
"os/exec"
"strconv"

"github.com/hashicorp/go-plugin"
"github.com/hashicorp/go-plugin/examples/grpc-bidirectional/shared"
)

type addHelper struct{}

func (*addHelper) Sum(a, b int64) (int64, error) {
return a + b, nil
}

func main() {
// We don't want to see the plugin logs.
log.SetOutput(ioutil.Discard)

// We're a host. Start by launching the plugin process.
client := plugin.NewClient(&plugin.ClientConfig{
HandshakeConfig: shared.Handshake,
Plugins: shared.PluginMap,
Cmd: exec.Command("sh", "-c", os.Getenv("COUNTER_PLUGIN")),
AllowedProtocols: []plugin.Protocol{
plugin.ProtocolNetRPC, plugin.ProtocolGRPC},
})
defer client.Kill()

// Connect via RPC
rpcClient, err := client.Client()
if err != nil {
fmt.Println("Error:", err.Error())
os.Exit(1)
}

// Request the plugin
raw, err := rpcClient.Dispense("counter")
if err != nil {
fmt.Println("Error:", err.Error())
os.Exit(1)
}

// We should have a Counter store now! This feels like a normal interface
// implementation but is in fact over an RPC connection.
counter := raw.(shared.Counter)

os.Args = os.Args[1:]
switch os.Args[0] {
case "get":
result, err := counter.Get(os.Args[1])
if err != nil {
fmt.Println("Error:", err.Error())
os.Exit(1)
}

fmt.Println(result)

case "put":
i, err := strconv.Atoi(os.Args[2])
if err != nil {
fmt.Println("Error:", err.Error())
os.Exit(1)
}

err = counter.Put(os.Args[1], int64(i), &addHelper{})
if err != nil {
fmt.Println("Error:", err.Error())
os.Exit(1)
}

default:
fmt.Println("Please only use 'get' or 'put'")
os.Exit(1)
}
}
61 changes: 61 additions & 0 deletions examples/bidirectional/plugin-go-grpc/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package main

import (
"encoding/json"
"io/ioutil"

"github.com/hashicorp/go-plugin"
"github.com/hashicorp/go-plugin/examples/grpc-bidirectional/shared"
)

// Here is a real implementation of KV that writes to a local file with
// the key name and the contents are the value of the key.
type Counter struct {
}

type data struct {
Value int64
}

func (k *Counter) Put(key string, value int64, a shared.AddHelper) error {
v, _ := k.Get(key)

r, err := a.Sum(v, value)
if err != nil {
return err
}

buf, err := json.Marshal(&data{r})
if err != nil {
return err
}

return ioutil.WriteFile("kv_"+key, buf, 0644)
}

func (k *Counter) Get(key string) (int64, error) {
dataRaw, err := ioutil.ReadFile("kv_" + key)
if err != nil {
return 0, err
}

data := &data{}
err = json.Unmarshal(dataRaw, data)
if err != nil {
return 0, err
}

return data.Value, nil
}

func main() {
plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: shared.Handshake,
Plugins: map[string]plugin.Plugin{
"counter": &shared.CounterPlugin{Impl: &Counter{}},
},

// A non-nil value here enables gRPC serving for this plugin...
GRPCServer: plugin.DefaultGRPCServer,
})
}
Loading

1 comment on commit 4b3b291

@srinarayanant
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @briankassouf ,
THe above commit is breaking our build and grpc connection failes as
Error: plugin "PY_PLUGIN" doesn't support gRPC

Opened an issue :
#57

Please let me know if you need more info.
Thanks,
Sri

Please sign in to comment.