Skip to content
This repository was archived by the owner on Jan 21, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/cli/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

log "github.com/Sirupsen/logrus"
"github.com/docker/infrakit/pkg/discovery"
"github.com/docker/infrakit/pkg/plugin"
instance_plugin "github.com/docker/infrakit/pkg/rpc/instance"
"github.com/docker/infrakit/pkg/spi/instance"
"github.com/spf13/cobra"
Expand All @@ -31,7 +32,7 @@ func instancePluginCommand(plugins func() discovery.Plugins) *cobra.Command {
return err
}

instancePlugin = instance_plugin.NewClient(endpoint.Address)
instancePlugin = instance_plugin.NewClient(plugin.Name(*name), endpoint.Address)

return nil
}
Expand Down
9 changes: 5 additions & 4 deletions cmd/cli/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/docker/infrakit/pkg/discovery"
"github.com/docker/infrakit/pkg/launch"
"github.com/docker/infrakit/pkg/launch/os"
"github.com/docker/infrakit/pkg/plugin"
"github.com/docker/infrakit/pkg/template"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -96,16 +97,16 @@ func pluginCommand(plugins func() discovery.Plugins) *cobra.Command {
}

// now start all the plugins
for _, plugin := range args {
fmt.Println("Starting up", plugin)
for _, pluginToStart := range args {
fmt.Println("Starting up", pluginToStart)

wait.Add(1)

for _, ch := range input {

name := plugin
name := pluginToStart
ch <- launch.StartPlugin{
Plugin: name,
Plugin: plugin.Name(name),
Started: func(config *launch.Config) {
fmt.Println(name, "started.")
wait.Done()
Expand Down
3 changes: 2 additions & 1 deletion cmd/group/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/docker/infrakit/pkg/cli"
"github.com/docker/infrakit/pkg/discovery"
"github.com/docker/infrakit/pkg/plugin"
"github.com/docker/infrakit/pkg/plugin/group"
flavor_client "github.com/docker/infrakit/pkg/rpc/flavor"
group_server "github.com/docker/infrakit/pkg/rpc/group"
Expand Down Expand Up @@ -40,7 +41,7 @@ func main() {
if err != nil {
return nil, err
}
return instance_client.NewClient(endpoint.Address), nil
return instance_client.NewClient(plugin.Name(n), endpoint.Address), nil
}

flavorPluginLookup := func(n string) (flavor.Plugin, error) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/discovery/dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ type dirPluginDiscovery struct {

// Find returns a plugin by name
func (r *dirPluginDiscovery) Find(name string) (*plugin.Endpoint, error) {

lookup, _ := plugin.Name(name).GetLookupAndType()
plugins, err := r.List()
if err != nil {
return nil, err
}

p, exists := plugins[name]
p, exists := plugins[lookup]
if !exists {
return nil, fmt.Errorf("Plugin not found: %s", name)
return nil, fmt.Errorf("Plugin not found: %s (looked up using %s)", name, lookup)
}

return p, nil
Expand Down
1 change: 1 addition & 0 deletions pkg/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

// Plugins provides access to plugin discovery.
type Plugins interface {
// Find looks up the plugin by name. The name can be of the form $lookup[/$subtype]. See GetLookupAndType().
Find(name string) (*plugin.Endpoint, error)
List() (map[string]*plugin.Endpoint, error)
}
Expand Down
17 changes: 12 additions & 5 deletions pkg/launch/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"

log "github.com/Sirupsen/logrus"
"github.com/docker/infrakit/pkg/plugin"
)

var errNoConfig = errors.New("no-counfig")
Expand All @@ -21,7 +22,7 @@ type ExecRule struct {
type Rule struct {

// Plugin is the name of the plugin
Plugin string
Plugin plugin.Name

// Launch is the rule for starting / launching the plugin.
Launch ExecRule
Expand All @@ -31,7 +32,7 @@ type Rule struct {
// Monitor uses a launcher to actually start the process of the plugin.
type Monitor struct {
exec Exec
rules map[string]Rule
rules map[plugin.Name]Rule
startChan <-chan StartPlugin
inputChan chan<- StartPlugin
stop chan interface{}
Expand All @@ -41,7 +42,7 @@ type Monitor struct {
// NewMonitor returns a monitor that continuously watches for input
// requests and launches the process for the plugin, if not already running.
func NewMonitor(l Exec, rules []Rule) *Monitor {
m := map[string]Rule{}
m := map[plugin.Name]Rule{}
// index by name of plugin
for _, r := range rules {
if r.Launch.Exec == l.Name() {
Expand All @@ -56,7 +57,7 @@ func NewMonitor(l Exec, rules []Rule) *Monitor {

// StartPlugin is the command to start a plugin
type StartPlugin struct {
Plugin string
Plugin plugin.Name
Started func(*Config)
Error func(*Config, error)
}
Expand Down Expand Up @@ -97,7 +98,13 @@ func (m *Monitor) Start() (chan<- StartPlugin, error) {
return
}

// match first by full name of the form lookup/type -- 'specialization'
r, has := m.rules[req.Plugin]
if !has {
// match now by lookup only -- 'base class'
alternate, _ := req.Plugin.GetLookupAndType()
r, has = m.rules[plugin.Name(alternate)]
}
if !has {
log.Warningln("no plugin:", req)
req.reportError(r.Launch.Properties, errNoConfig)
Expand All @@ -109,7 +116,7 @@ func (m *Monitor) Start() (chan<- StartPlugin, error) {
*configCopy = *r.Launch.Properties
}

block, err := m.exec.Exec(r.Plugin, configCopy)
block, err := m.exec.Exec(r.Plugin.String(), configCopy)
if err != nil {
req.reportError(configCopy, err)
continue loop
Expand Down
50 changes: 50 additions & 0 deletions pkg/launch/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,53 @@ func TestMonitorLoopValidRule(t *testing.T) {

monitor.Stop()
}

func TestMonitorLoopRuleLookupBehavior(t *testing.T) {
raw := &Config{}
config := &testConfig{
Cmd: "hello",
Args: []string{"world", "hello"},
}

rawErr := raw.Marshal(config)
require.NoError(t, rawErr)
require.True(t, len([]byte(*raw)) > 0)

var receivedArgs *Config
rule := Rule{
Plugin: "hello",
Launch: ExecRule{
Exec: "test",
Properties: raw,
},
}
monitor := NewMonitor(&testLauncher{
name: "test",
t: t,
callback: func(c *Config) {
receivedArgs = c
},
}, []Rule{rule})

input, err := monitor.Start()
require.NoError(t, err)
require.NotNil(t, input)

started := make(chan interface{})
input <- StartPlugin{
Plugin: "hello",
Started: func(config *Config) {
close(started)
},
}

<-started

expected := &Config{}
err = expected.Marshal(config)
require.NoError(t, err)

require.Equal(t, *expected, *receivedArgs)

monitor.Stop()
}
25 changes: 25 additions & 0 deletions pkg/plugin/name.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package plugin

import (
"strings"
)

// Name is a reference to the plugin. Places where it appears include JSON files as type of field `Plugin`.
type Name string

// GetLookupAndType returns the plugin name for lookup and sub-type supported by the plugin.
// The name follows a microformat of $plugin[/$subtype] where $plugin is used for the discovery / lookup by name.
// The $subtype is used for the Type parameter in the RPC requests.
// Example: instance-file/json means lookup socket file 'instance-file' and the type is 'json'.
func (r Name) GetLookupAndType() (string, string) {
name := string(r)
if first := strings.Index(name, "/"); first >= 0 {
return name[0:first], name[first+1:]
}
return name, ""
}

// String returns the string representation
func (r Name) String() string {
return string(r)
}
25 changes: 25 additions & 0 deletions pkg/plugin/name_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package plugin

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestGetLookupAndType(t *testing.T) {

ref := Name("instance-file")
lookup, instanceType := ref.GetLookupAndType()
require.Equal(t, "instance-file", lookup)
require.Equal(t, "", instanceType)

ref = Name("instance-file/json")
lookup, instanceType = ref.GetLookupAndType()
require.Equal(t, "instance-file", lookup)
require.Equal(t, "json", instanceType)

ref = Name("instance-file/text/html")
lookup, instanceType = ref.GetLookupAndType()
require.Equal(t, "instance-file", lookup)
require.Equal(t, "text/html", instanceType)
}
19 changes: 13 additions & 6 deletions pkg/rpc/instance/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,35 @@ package instance

import (
"encoding/json"

"github.com/docker/infrakit/pkg/plugin"
rpc_client "github.com/docker/infrakit/pkg/rpc/client"
"github.com/docker/infrakit/pkg/spi/instance"
)

// NewClient returns a plugin interface implementation connected to a plugin
func NewClient(socketPath string) instance.Plugin {
return &client{client: rpc_client.New(socketPath, instance.InterfaceSpec)}
func NewClient(name plugin.Name, socketPath string) instance.Plugin {
return &client{name: name, client: rpc_client.New(socketPath, instance.InterfaceSpec)}
}

type client struct {
name plugin.Name
client rpc_client.Client
}

// Validate performs local validation on a provision request.
func (c client) Validate(properties json.RawMessage) error {
req := ValidateRequest{Properties: &properties}
_, instanceType := c.name.GetLookupAndType()
req := ValidateRequest{Properties: &properties, Type: instanceType}
resp := ValidateResponse{}

return c.client.Call("Instance.Validate", req, &resp)
}

// Provision creates a new instance based on the spec.
func (c client) Provision(spec instance.Spec) (*instance.ID, error) {
req := ProvisionRequest{Spec: spec}
_, instanceType := c.name.GetLookupAndType()
req := ProvisionRequest{Spec: spec, Type: instanceType}
resp := ProvisionResponse{}

if err := c.client.Call("Instance.Provision", req, &resp); err != nil {
Expand All @@ -37,15 +42,17 @@ func (c client) Provision(spec instance.Spec) (*instance.ID, error) {

// Destroy terminates an existing instance.
func (c client) Destroy(instance instance.ID) error {
req := DestroyRequest{Instance: instance}
_, instanceType := c.name.GetLookupAndType()
req := DestroyRequest{Instance: instance, Type: instanceType}
resp := DestroyResponse{}

return c.client.Call("Instance.Destroy", req, &resp)
}

// DescribeInstances returns descriptions of all instances matching all of the provided tags.
func (c client) DescribeInstances(tags map[string]string) ([]instance.Description, error) {
req := DescribeInstancesRequest{Tags: tags}
_, instanceType := c.name.GetLookupAndType()
req := DescribeInstancesRequest{Tags: tags, Type: instanceType}
resp := DescribeInstancesResponse{}

err := c.client.Call("Instance.DescribeInstances", req, &resp)
Expand Down
Loading