diff --git a/cmd/cli/instance.go b/cmd/cli/instance.go index b971e33bb..f53422fb8 100644 --- a/cmd/cli/instance.go +++ b/cmd/cli/instance.go @@ -10,6 +10,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/docker/infrakit/pkg/discovery" + "github.com/docker/infrakit/pkg/plugin" instance_plugin "github.com/docker/infrakit/pkg/rpc/instance" "github.com/docker/infrakit/pkg/spi/instance" "github.com/spf13/cobra" @@ -31,7 +32,7 @@ func instancePluginCommand(plugins func() discovery.Plugins) *cobra.Command { return err } - instancePlugin = instance_plugin.NewClient(endpoint.Address) + instancePlugin = instance_plugin.NewClient(plugin.Name(*name), endpoint.Address) return nil } diff --git a/cmd/cli/plugin.go b/cmd/cli/plugin.go index ac322aa7d..bc3e3a30c 100644 --- a/cmd/cli/plugin.go +++ b/cmd/cli/plugin.go @@ -7,6 +7,7 @@ import ( "github.com/docker/infrakit/pkg/discovery" "github.com/docker/infrakit/pkg/launch" "github.com/docker/infrakit/pkg/launch/os" + "github.com/docker/infrakit/pkg/plugin" "github.com/docker/infrakit/pkg/template" "github.com/spf13/cobra" ) @@ -96,16 +97,16 @@ func pluginCommand(plugins func() discovery.Plugins) *cobra.Command { } // now start all the plugins - for _, plugin := range args { - fmt.Println("Starting up", plugin) + for _, pluginToStart := range args { + fmt.Println("Starting up", pluginToStart) wait.Add(1) for _, ch := range input { - name := plugin + name := pluginToStart ch <- launch.StartPlugin{ - Plugin: name, + Plugin: plugin.Name(name), Started: func(config *launch.Config) { fmt.Println(name, "started.") wait.Done() diff --git a/cmd/group/main.go b/cmd/group/main.go index 9811d1626..84fb978e7 100644 --- a/cmd/group/main.go +++ b/cmd/group/main.go @@ -7,6 +7,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/docker/infrakit/pkg/cli" "github.com/docker/infrakit/pkg/discovery" + "github.com/docker/infrakit/pkg/plugin" "github.com/docker/infrakit/pkg/plugin/group" flavor_client "github.com/docker/infrakit/pkg/rpc/flavor" group_server "github.com/docker/infrakit/pkg/rpc/group" @@ -40,7 +41,7 @@ func main() { if err != nil { return nil, err } - return instance_client.NewClient(endpoint.Address), nil + return instance_client.NewClient(plugin.Name(n), endpoint.Address), nil } flavorPluginLookup := func(n string) (flavor.Plugin, error) { diff --git a/pkg/discovery/dir.go b/pkg/discovery/dir.go index 1e865ed6b..ed1dafc7e 100644 --- a/pkg/discovery/dir.go +++ b/pkg/discovery/dir.go @@ -18,15 +18,15 @@ type dirPluginDiscovery struct { // Find returns a plugin by name func (r *dirPluginDiscovery) Find(name string) (*plugin.Endpoint, error) { - + lookup, _ := plugin.Name(name).GetLookupAndType() plugins, err := r.List() if err != nil { return nil, err } - p, exists := plugins[name] + p, exists := plugins[lookup] if !exists { - return nil, fmt.Errorf("Plugin not found: %s", name) + return nil, fmt.Errorf("Plugin not found: %s (looked up using %s)", name, lookup) } return p, nil diff --git a/pkg/discovery/discovery.go b/pkg/discovery/discovery.go index 8973b00fc..85a7a4371 100644 --- a/pkg/discovery/discovery.go +++ b/pkg/discovery/discovery.go @@ -11,6 +11,7 @@ import ( // Plugins provides access to plugin discovery. type Plugins interface { + // Find looks up the plugin by name. The name can be of the form $lookup[/$subtype]. See GetLookupAndType(). Find(name string) (*plugin.Endpoint, error) List() (map[string]*plugin.Endpoint, error) } diff --git a/pkg/launch/monitor.go b/pkg/launch/monitor.go index 8af849ae4..913795ee8 100644 --- a/pkg/launch/monitor.go +++ b/pkg/launch/monitor.go @@ -5,6 +5,7 @@ import ( "sync" log "github.com/Sirupsen/logrus" + "github.com/docker/infrakit/pkg/plugin" ) var errNoConfig = errors.New("no-counfig") @@ -21,7 +22,7 @@ type ExecRule struct { type Rule struct { // Plugin is the name of the plugin - Plugin string + Plugin plugin.Name // Launch is the rule for starting / launching the plugin. Launch ExecRule @@ -31,7 +32,7 @@ type Rule struct { // Monitor uses a launcher to actually start the process of the plugin. type Monitor struct { exec Exec - rules map[string]Rule + rules map[plugin.Name]Rule startChan <-chan StartPlugin inputChan chan<- StartPlugin stop chan interface{} @@ -41,7 +42,7 @@ type Monitor struct { // NewMonitor returns a monitor that continuously watches for input // requests and launches the process for the plugin, if not already running. func NewMonitor(l Exec, rules []Rule) *Monitor { - m := map[string]Rule{} + m := map[plugin.Name]Rule{} // index by name of plugin for _, r := range rules { if r.Launch.Exec == l.Name() { @@ -56,7 +57,7 @@ func NewMonitor(l Exec, rules []Rule) *Monitor { // StartPlugin is the command to start a plugin type StartPlugin struct { - Plugin string + Plugin plugin.Name Started func(*Config) Error func(*Config, error) } @@ -97,7 +98,13 @@ func (m *Monitor) Start() (chan<- StartPlugin, error) { return } + // match first by full name of the form lookup/type -- 'specialization' r, has := m.rules[req.Plugin] + if !has { + // match now by lookup only -- 'base class' + alternate, _ := req.Plugin.GetLookupAndType() + r, has = m.rules[plugin.Name(alternate)] + } if !has { log.Warningln("no plugin:", req) req.reportError(r.Launch.Properties, errNoConfig) @@ -109,7 +116,7 @@ func (m *Monitor) Start() (chan<- StartPlugin, error) { *configCopy = *r.Launch.Properties } - block, err := m.exec.Exec(r.Plugin, configCopy) + block, err := m.exec.Exec(r.Plugin.String(), configCopy) if err != nil { req.reportError(configCopy, err) continue loop diff --git a/pkg/launch/monitor_test.go b/pkg/launch/monitor_test.go index 22971a2ab..65d295b9b 100644 --- a/pkg/launch/monitor_test.go +++ b/pkg/launch/monitor_test.go @@ -108,3 +108,53 @@ func TestMonitorLoopValidRule(t *testing.T) { monitor.Stop() } + +func TestMonitorLoopRuleLookupBehavior(t *testing.T) { + raw := &Config{} + config := &testConfig{ + Cmd: "hello", + Args: []string{"world", "hello"}, + } + + rawErr := raw.Marshal(config) + require.NoError(t, rawErr) + require.True(t, len([]byte(*raw)) > 0) + + var receivedArgs *Config + rule := Rule{ + Plugin: "hello", + Launch: ExecRule{ + Exec: "test", + Properties: raw, + }, + } + monitor := NewMonitor(&testLauncher{ + name: "test", + t: t, + callback: func(c *Config) { + receivedArgs = c + }, + }, []Rule{rule}) + + input, err := monitor.Start() + require.NoError(t, err) + require.NotNil(t, input) + + started := make(chan interface{}) + input <- StartPlugin{ + Plugin: "hello", + Started: func(config *Config) { + close(started) + }, + } + + <-started + + expected := &Config{} + err = expected.Marshal(config) + require.NoError(t, err) + + require.Equal(t, *expected, *receivedArgs) + + monitor.Stop() +} diff --git a/pkg/plugin/name.go b/pkg/plugin/name.go new file mode 100644 index 000000000..b8ef79a26 --- /dev/null +++ b/pkg/plugin/name.go @@ -0,0 +1,25 @@ +package plugin + +import ( + "strings" +) + +// Name is a reference to the plugin. Places where it appears include JSON files as type of field `Plugin`. +type Name string + +// GetLookupAndType returns the plugin name for lookup and sub-type supported by the plugin. +// The name follows a microformat of $plugin[/$subtype] where $plugin is used for the discovery / lookup by name. +// The $subtype is used for the Type parameter in the RPC requests. +// Example: instance-file/json means lookup socket file 'instance-file' and the type is 'json'. +func (r Name) GetLookupAndType() (string, string) { + name := string(r) + if first := strings.Index(name, "/"); first >= 0 { + return name[0:first], name[first+1:] + } + return name, "" +} + +// String returns the string representation +func (r Name) String() string { + return string(r) +} diff --git a/pkg/plugin/name_test.go b/pkg/plugin/name_test.go new file mode 100644 index 000000000..80078312f --- /dev/null +++ b/pkg/plugin/name_test.go @@ -0,0 +1,25 @@ +package plugin + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestGetLookupAndType(t *testing.T) { + + ref := Name("instance-file") + lookup, instanceType := ref.GetLookupAndType() + require.Equal(t, "instance-file", lookup) + require.Equal(t, "", instanceType) + + ref = Name("instance-file/json") + lookup, instanceType = ref.GetLookupAndType() + require.Equal(t, "instance-file", lookup) + require.Equal(t, "json", instanceType) + + ref = Name("instance-file/text/html") + lookup, instanceType = ref.GetLookupAndType() + require.Equal(t, "instance-file", lookup) + require.Equal(t, "text/html", instanceType) +} diff --git a/pkg/rpc/instance/client.go b/pkg/rpc/instance/client.go index 766a8141e..335896272 100644 --- a/pkg/rpc/instance/client.go +++ b/pkg/rpc/instance/client.go @@ -2,22 +2,26 @@ package instance import ( "encoding/json" + + "github.com/docker/infrakit/pkg/plugin" rpc_client "github.com/docker/infrakit/pkg/rpc/client" "github.com/docker/infrakit/pkg/spi/instance" ) // NewClient returns a plugin interface implementation connected to a plugin -func NewClient(socketPath string) instance.Plugin { - return &client{client: rpc_client.New(socketPath, instance.InterfaceSpec)} +func NewClient(name plugin.Name, socketPath string) instance.Plugin { + return &client{name: name, client: rpc_client.New(socketPath, instance.InterfaceSpec)} } type client struct { + name plugin.Name client rpc_client.Client } // Validate performs local validation on a provision request. func (c client) Validate(properties json.RawMessage) error { - req := ValidateRequest{Properties: &properties} + _, instanceType := c.name.GetLookupAndType() + req := ValidateRequest{Properties: &properties, Type: instanceType} resp := ValidateResponse{} return c.client.Call("Instance.Validate", req, &resp) @@ -25,7 +29,8 @@ func (c client) Validate(properties json.RawMessage) error { // Provision creates a new instance based on the spec. func (c client) Provision(spec instance.Spec) (*instance.ID, error) { - req := ProvisionRequest{Spec: spec} + _, instanceType := c.name.GetLookupAndType() + req := ProvisionRequest{Spec: spec, Type: instanceType} resp := ProvisionResponse{} if err := c.client.Call("Instance.Provision", req, &resp); err != nil { @@ -37,7 +42,8 @@ func (c client) Provision(spec instance.Spec) (*instance.ID, error) { // Destroy terminates an existing instance. func (c client) Destroy(instance instance.ID) error { - req := DestroyRequest{Instance: instance} + _, instanceType := c.name.GetLookupAndType() + req := DestroyRequest{Instance: instance, Type: instanceType} resp := DestroyResponse{} return c.client.Call("Instance.Destroy", req, &resp) @@ -45,7 +51,8 @@ func (c client) Destroy(instance instance.ID) error { // DescribeInstances returns descriptions of all instances matching all of the provided tags. func (c client) DescribeInstances(tags map[string]string) ([]instance.Description, error) { - req := DescribeInstancesRequest{Tags: tags} + _, instanceType := c.name.GetLookupAndType() + req := DescribeInstancesRequest{Tags: tags, Type: instanceType} resp := DescribeInstancesResponse{} err := c.client.Call("Instance.DescribeInstances", req, &resp) diff --git a/pkg/rpc/instance/rpc_multi_test.go b/pkg/rpc/instance/rpc_multi_test.go new file mode 100644 index 000000000..4bb989978 --- /dev/null +++ b/pkg/rpc/instance/rpc_multi_test.go @@ -0,0 +1,269 @@ +package instance + +import ( + "encoding/json" + "errors" + "path/filepath" + "testing" + + "github.com/docker/infrakit/pkg/plugin" + rpc_server "github.com/docker/infrakit/pkg/rpc/server" + "github.com/docker/infrakit/pkg/spi/instance" + "github.com/stretchr/testify/require" +) + +func TestInstanceTypedPluginValidate(t *testing.T) { + socketPath := tempSocket() + name := filepath.Base(socketPath) + + raw1 := json.RawMessage([]byte(`{"name":"instance","type":"xlarge1"}`)) + raw2 := json.RawMessage([]byte(`{"name":"instance","type":"xlarge2"}`)) + + rawActual1 := make(chan json.RawMessage, 1) + rawActual2 := make(chan json.RawMessage, 1) + + server, err := rpc_server.StartPluginAtPath(socketPath, PluginServerWithTypes( + map[string]instance.Plugin{ + "type1": &testPlugin{ + DoValidate: func(req json.RawMessage) error { + + rawActual1 <- req + + return nil + }, + }, + "type2": &testPlugin{ + DoValidate: func(req json.RawMessage) error { + + rawActual2 <- req + + return nil + }, + }, + })) + require.NoError(t, err) + + err = NewClient(plugin.Name(name+"/type1"), socketPath).Validate(raw1) + require.NoError(t, err) + + err = NewClient(plugin.Name(name+"/type2"), socketPath).Validate(raw2) + require.NoError(t, err) + + err = NewClient(plugin.Name(name+"/typeUnknown"), socketPath).Validate(raw2) + require.Error(t, err) + require.Equal(t, "no-plugin:typeUnknown", err.Error()) + + server.Stop() + + require.Equal(t, raw1, <-rawActual1) + require.Equal(t, raw2, <-rawActual2) +} + +func TestInstanceTypedPluginValidateError(t *testing.T) { + + socketPath := tempSocket() + name := filepath.Base(socketPath) + + raw1 := json.RawMessage([]byte(`{"name":"instance","type":"xlarge1"}`)) + rawActual1 := make(chan json.RawMessage, 1) + raw2 := json.RawMessage([]byte(`{"name":"instance","type":"xlarge2"}`)) + rawActual2 := make(chan json.RawMessage, 1) + + server, err := rpc_server.StartPluginAtPath(socketPath, PluginServerWithTypes(map[string]instance.Plugin{ + "type1": &testPlugin{ + DoValidate: func(req json.RawMessage) error { + + rawActual1 <- req + + return errors.New("whoops") + }, + }, + "type2": &testPlugin{ + DoValidate: func(req json.RawMessage) error { + + rawActual2 <- req + + return errors.New("whoops2") + }, + }, + })) + require.NoError(t, err) + + err = NewClient(plugin.Name(name+"/type1"), socketPath).Validate(raw1) + require.Error(t, err) + require.Equal(t, "whoops", err.Error()) + + err = NewClient(plugin.Name(name+"/type2"), socketPath).Validate(raw2) + require.Error(t, err) + require.Equal(t, "whoops2", err.Error()) + + server.Stop() + + require.Equal(t, raw1, <-rawActual1) + require.Equal(t, raw2, <-rawActual2) +} + +func TestInstanceTypedPluginProvision(t *testing.T) { + socketPath := tempSocket() + name := filepath.Base(socketPath) + + raw1 := json.RawMessage([]byte(`{"test":"foo"}`)) + specActual1 := make(chan instance.Spec, 1) + raw2 := json.RawMessage([]byte(`{"test":"foo2"}`)) + specActual2 := make(chan instance.Spec, 1) + raw3 := json.RawMessage([]byte(`{"test":"foo3"}`)) + specActual3 := make(chan instance.Spec, 1) + spec1 := instance.Spec{ + Properties: &raw1, + } + spec2 := instance.Spec{ + Properties: &raw2, + } + spec3 := instance.Spec{ + Properties: &raw3, + } + server, err := rpc_server.StartPluginAtPath(socketPath, PluginServerWithTypes( + map[string]instance.Plugin{ + "type1": &testPlugin{ + DoProvision: func(req instance.Spec) (*instance.ID, error) { + specActual1 <- req + return nil, nil + }, + }, + "type2": &testPlugin{ + DoProvision: func(req instance.Spec) (*instance.ID, error) { + specActual2 <- req + v := instance.ID("test") + return &v, nil + }, + }, + "error": &testPlugin{ + DoProvision: func(req instance.Spec) (*instance.ID, error) { + specActual3 <- req + return nil, errors.New("nope") + }, + }, + })) + require.NoError(t, err) + + var id *instance.ID + id, err = NewClient(plugin.Name(name+"/type1"), socketPath).Provision(spec1) + require.NoError(t, err) + require.Nil(t, id) + + id, err = NewClient(plugin.Name(name+"/type2"), socketPath).Provision(spec2) + require.NoError(t, err) + require.Equal(t, "test", string(*id)) + + _, err = NewClient(plugin.Name(name+"/error"), socketPath).Provision(spec3) + require.Error(t, err) + require.Equal(t, "nope", err.Error()) + + server.Stop() + + require.Equal(t, spec1, <-specActual1) + require.Equal(t, spec2, <-specActual2) + require.Equal(t, spec3, <-specActual3) +} + +func TestInstanceTypedPluginDestroy(t *testing.T) { + socketPath := tempSocket() + name := filepath.Base(socketPath) + + inst1 := instance.ID("hello1") + instActual1 := make(chan instance.ID, 1) + inst2 := instance.ID("hello2") + instActual2 := make(chan instance.ID, 2) + + server, err := rpc_server.StartPluginAtPath(socketPath, PluginServerWithTypes( + map[string]instance.Plugin{ + "type1": &testPlugin{ + DoDestroy: func(req instance.ID) error { + instActual1 <- req + return nil + }, + }, + "type2": &testPlugin{ + DoDestroy: func(req instance.ID) error { + instActual2 <- req + return errors.New("can't do") + }, + }, + })) + require.NoError(t, err) + + err = NewClient(plugin.Name(name+"/type1"), socketPath).Destroy(inst1) + require.NoError(t, err) + + err = NewClient(plugin.Name(name+"/type2"), socketPath).Destroy(inst2) + require.Error(t, err) + require.Equal(t, "can't do", err.Error()) + + server.Stop() + + require.Equal(t, inst1, <-instActual1) + require.Equal(t, inst2, <-instActual2) +} + +func TestInstanceTypedPluginDescribeInstances(t *testing.T) { + socketPath := tempSocket() + name := filepath.Base(socketPath) + + tags1 := map[string]string{} + tagsActual1 := make(chan map[string]string, 1) + list1 := []instance.Description{ + {ID: instance.ID("boo1")}, {ID: instance.ID("boop")}, + } + tags2 := map[string]string{ + "foo": "bar", + } + tagsActual2 := make(chan map[string]string, 1) + list2 := []instance.Description{ + {ID: instance.ID("boo")}, {ID: instance.ID("boop2")}, + } + tags3 := map[string]string{ + "foo": "bar", + } + tagsActual3 := make(chan map[string]string, 1) + list3 := []instance.Description{ + {ID: instance.ID("boo3")}, {ID: instance.ID("boop")}, + } + server, err := rpc_server.StartPluginAtPath(socketPath, PluginServerWithTypes( + map[string]instance.Plugin{ + "type1": &testPlugin{ + DoDescribeInstances: func(req map[string]string) ([]instance.Description, error) { + tagsActual1 <- req + return list1, nil + }, + }, + "type2": &testPlugin{ + DoDescribeInstances: func(req map[string]string) ([]instance.Description, error) { + tagsActual2 <- req + return list2, nil + }, + }, + "type3": &testPlugin{ + DoDescribeInstances: func(req map[string]string) ([]instance.Description, error) { + tagsActual3 <- req + return list3, errors.New("bad") + }, + }, + })) + + l, err := NewClient(plugin.Name(name+"/type1"), socketPath).DescribeInstances(tags1) + require.NoError(t, err) + require.Equal(t, list1, l) + + l, err = NewClient(plugin.Name(name+"/type2"), socketPath).DescribeInstances(tags2) + require.NoError(t, err) + require.Equal(t, list2, l) + + _, err = NewClient(plugin.Name(name+"/type3"), socketPath).DescribeInstances(tags3) + require.Error(t, err) + require.Equal(t, "bad", err.Error()) + + server.Stop() + require.Equal(t, tags1, <-tagsActual1) + require.Equal(t, tags2, <-tagsActual2) + require.Equal(t, tags3, <-tagsActual3) +} diff --git a/pkg/rpc/instance/rpc_test.go b/pkg/rpc/instance/rpc_test.go index 133906b00..b371be660 100644 --- a/pkg/rpc/instance/rpc_test.go +++ b/pkg/rpc/instance/rpc_test.go @@ -3,8 +3,10 @@ package instance import ( "encoding/json" "errors" + "path/filepath" "testing" + "github.com/docker/infrakit/pkg/plugin" rpc_server "github.com/docker/infrakit/pkg/rpc/server" "github.com/docker/infrakit/pkg/spi/instance" "github.com/stretchr/testify/require" @@ -49,8 +51,8 @@ func tempSocket() string { } func TestInstancePluginValidate(t *testing.T) { - socketPath := tempSocket() + name := plugin.Name(filepath.Base(socketPath)) raw := json.RawMessage([]byte(`{"name":"instance","type":"xlarge"}`)) @@ -66,7 +68,7 @@ func TestInstancePluginValidate(t *testing.T) { })) require.NoError(t, err) - err = NewClient(socketPath).Validate(raw) + err = NewClient(name, socketPath).Validate(raw) require.NoError(t, err) server.Stop() @@ -77,6 +79,8 @@ func TestInstancePluginValidate(t *testing.T) { func TestInstancePluginValidateError(t *testing.T) { socketPath := tempSocket() + name := plugin.Name(filepath.Base(socketPath)) + raw := json.RawMessage([]byte(`{"name":"instance","type":"xlarge"}`)) rawActual := make(chan json.RawMessage, 1) @@ -90,7 +94,7 @@ func TestInstancePluginValidateError(t *testing.T) { })) require.NoError(t, err) - err = NewClient(socketPath).Validate(raw) + err = NewClient(name, socketPath).Validate(raw) require.Error(t, err) require.Equal(t, "whoops", err.Error()) @@ -100,6 +104,7 @@ func TestInstancePluginValidateError(t *testing.T) { func TestInstancePluginProvisionNil(t *testing.T) { socketPath := tempSocket() + name := plugin.Name(filepath.Base(socketPath)) raw := json.RawMessage([]byte(`{"test":"foo"}`)) specActual := make(chan instance.Spec, 1) @@ -115,7 +120,7 @@ func TestInstancePluginProvisionNil(t *testing.T) { require.NoError(t, err) var id *instance.ID - id, err = NewClient(socketPath).Provision(spec) + id, err = NewClient(name, socketPath).Provision(spec) require.NoError(t, err) require.Nil(t, id) @@ -126,6 +131,7 @@ func TestInstancePluginProvisionNil(t *testing.T) { func TestInstancePluginProvision(t *testing.T) { socketPath := tempSocket() + name := plugin.Name(filepath.Base(socketPath)) raw := json.RawMessage([]byte(`{"test":"foo"}`)) specActual := make(chan instance.Spec, 1) @@ -142,7 +148,7 @@ func TestInstancePluginProvision(t *testing.T) { require.NoError(t, err) var id *instance.ID - id, err = NewClient(socketPath).Provision(spec) + id, err = NewClient(name, socketPath).Provision(spec) require.NoError(t, err) require.Equal(t, "test", string(*id)) @@ -153,6 +159,7 @@ func TestInstancePluginProvision(t *testing.T) { func TestInstancePluginProvisionError(t *testing.T) { socketPath := tempSocket() + name := plugin.Name(filepath.Base(socketPath)) raw := json.RawMessage([]byte(`{"test":"foo"}`)) specActual := make(chan instance.Spec, 1) @@ -167,7 +174,7 @@ func TestInstancePluginProvisionError(t *testing.T) { })) require.NoError(t, err) - _, err = NewClient(socketPath).Provision(spec) + _, err = NewClient(name, socketPath).Provision(spec) require.Error(t, err) require.Equal(t, "nope", err.Error()) @@ -178,6 +185,7 @@ func TestInstancePluginProvisionError(t *testing.T) { func TestInstancePluginDestroy(t *testing.T) { socketPath := tempSocket() + name := plugin.Name(filepath.Base(socketPath)) inst := instance.ID("hello") instActual := make(chan instance.ID, 1) @@ -190,7 +198,7 @@ func TestInstancePluginDestroy(t *testing.T) { })) require.NoError(t, err) - err = NewClient(socketPath).Destroy(inst) + err = NewClient(name, socketPath).Destroy(inst) require.NoError(t, err) server.Stop() @@ -200,6 +208,7 @@ func TestInstancePluginDestroy(t *testing.T) { func TestInstancePluginDestroyError(t *testing.T) { socketPath := tempSocket() + name := plugin.Name(filepath.Base(socketPath)) inst := instance.ID("hello") instActual := make(chan instance.ID, 1) @@ -212,7 +221,7 @@ func TestInstancePluginDestroyError(t *testing.T) { })) require.NoError(t, err) - err = NewClient(socketPath).Destroy(inst) + err = NewClient(name, socketPath).Destroy(inst) require.Error(t, err) require.Equal(t, "can't do", err.Error()) @@ -222,6 +231,7 @@ func TestInstancePluginDestroyError(t *testing.T) { func TestInstancePluginDescribeInstancesNiInput(t *testing.T) { socketPath := tempSocket() + name := plugin.Name(filepath.Base(socketPath)) var tags map[string]string tagsActual := make(chan map[string]string, 1) @@ -235,7 +245,7 @@ func TestInstancePluginDescribeInstancesNiInput(t *testing.T) { }, })) - l, err := NewClient(socketPath).DescribeInstances(tags) + l, err := NewClient(name, socketPath).DescribeInstances(tags) require.NoError(t, err) require.Equal(t, list, l) @@ -245,6 +255,7 @@ func TestInstancePluginDescribeInstancesNiInput(t *testing.T) { func TestInstancePluginDescribeInstances(t *testing.T) { socketPath := tempSocket() + name := plugin.Name(filepath.Base(socketPath)) tags := map[string]string{ "foo": "bar", @@ -261,7 +272,7 @@ func TestInstancePluginDescribeInstances(t *testing.T) { })) require.NoError(t, err) - l, err := NewClient(socketPath).DescribeInstances(tags) + l, err := NewClient(name, socketPath).DescribeInstances(tags) require.NoError(t, err) require.Equal(t, list, l) @@ -271,6 +282,7 @@ func TestInstancePluginDescribeInstances(t *testing.T) { func TestInstancePluginDescribeInstancesError(t *testing.T) { socketPath := tempSocket() + name := plugin.Name(filepath.Base(socketPath)) tags := map[string]string{ "foo": "bar", @@ -287,7 +299,7 @@ func TestInstancePluginDescribeInstancesError(t *testing.T) { })) require.NoError(t, err) - _, err = NewClient(socketPath).DescribeInstances(tags) + _, err = NewClient(name, socketPath).DescribeInstances(tags) require.Error(t, err) require.Equal(t, "bad", err.Error()) diff --git a/pkg/rpc/instance/service.go b/pkg/rpc/instance/service.go index 197c73786..1dcadfa8d 100644 --- a/pkg/rpc/instance/service.go +++ b/pkg/rpc/instance/service.go @@ -2,6 +2,7 @@ package instance import ( "errors" + "fmt" "net/http" "github.com/docker/infrakit/pkg/spi" @@ -10,17 +11,29 @@ import ( // PluginServer returns a RPCService that conforms to the net/rpc rpc call convention. func PluginServer(p instance.Plugin) *Instance { - return &Instance{plugin: p} + return &Instance{plugin: p, typedPlugins: map[string]instance.Plugin{}} +} + +// PluginServerWithTypes which supports multiple types of instance plugins. The de-multiplexing +// is done by the server's RPC method implementations. +func PluginServerWithTypes(typed map[string]instance.Plugin) *Instance { + return &Instance{typedPlugins: typed} } // Instance is the JSON RPC service representing the Instance Plugin. It must be exported in order to be // registered by the rpc server package. type Instance struct { - plugin instance.Plugin + plugin instance.Plugin // the default plugin + typedPlugins map[string]instance.Plugin // by type, as qualified in the name of the plugin } // VendorInfo returns a metadata object about the plugin, if the plugin implements it. func (p *Instance) VendorInfo() *spi.VendorInfo { + // TODO(chungers) - support typed plugins + if p.plugin == nil { + return nil + } + if m, is := p.plugin.(spi.Vendor); is { return m.VendorInfo() } @@ -29,6 +42,11 @@ func (p *Instance) VendorInfo() *spi.VendorInfo { // SetExampleProperties sets the rpc request with any example properties/ custom type func (p *Instance) SetExampleProperties(request interface{}) { + // TODO(chungers) - support typed plugins + if p.plugin == nil { + return + } + i, is := p.plugin.(spi.InputExample) if !is { return @@ -51,13 +69,28 @@ func (p *Instance) ImplementedInterface() spi.InterfaceSpec { return instance.InterfaceSpec } +func (p *Instance) getPlugin(instanceType string) instance.Plugin { + if instanceType == "" { + return p.plugin + } + if p, has := p.typedPlugins[instanceType]; has { + return p + } + return nil +} + // Validate performs local validation on a provision request. func (p *Instance) Validate(_ *http.Request, req *ValidateRequest, resp *ValidateResponse) error { if req.Properties == nil { return errors.New("Request Properties must be set") } - err := p.plugin.Validate(*req.Properties) + c := p.getPlugin(req.Type) + if c == nil { + return fmt.Errorf("no-plugin:%s", req.Type) + } + resp.Type = req.Type + err := c.Validate(*req.Properties) if err != nil { return err } @@ -67,7 +100,12 @@ func (p *Instance) Validate(_ *http.Request, req *ValidateRequest, resp *Validat // Provision creates a new instance based on the spec. func (p *Instance) Provision(_ *http.Request, req *ProvisionRequest, resp *ProvisionResponse) error { - id, err := p.plugin.Provision(req.Spec) + resp.Type = req.Type + c := p.getPlugin(req.Type) + if c == nil { + return fmt.Errorf("no-plugin:%s", req.Type) + } + id, err := c.Provision(req.Spec) if err != nil { return err } @@ -77,7 +115,12 @@ func (p *Instance) Provision(_ *http.Request, req *ProvisionRequest, resp *Provi // Destroy terminates an existing instance. func (p *Instance) Destroy(_ *http.Request, req *DestroyRequest, resp *DestroyResponse) error { - err := p.plugin.Destroy(req.Instance) + resp.Type = req.Type + c := p.getPlugin(req.Type) + if c == nil { + return fmt.Errorf("no-plugin:%s", req.Type) + } + err := c.Destroy(req.Instance) if err != nil { return err } @@ -87,7 +130,12 @@ func (p *Instance) Destroy(_ *http.Request, req *DestroyRequest, resp *DestroyRe // DescribeInstances returns descriptions of all instances matching all of the provided tags. func (p *Instance) DescribeInstances(_ *http.Request, req *DescribeInstancesRequest, resp *DescribeInstancesResponse) error { - desc, err := p.plugin.DescribeInstances(req.Tags) + resp.Type = req.Type + c := p.getPlugin(req.Type) + if c == nil { + return fmt.Errorf("no-plugin:%s", req.Type) + } + desc, err := c.DescribeInstances(req.Tags) if err != nil { return err } diff --git a/pkg/rpc/instance/types.go b/pkg/rpc/instance/types.go index 3077cdbee..8c445f87e 100644 --- a/pkg/rpc/instance/types.go +++ b/pkg/rpc/instance/types.go @@ -8,40 +8,48 @@ import ( // ValidateRequest is the rpc wrapper for the Validate method args type ValidateRequest struct { + Type string Properties *json.RawMessage } // ValidateResponse is the rpc wrapper for the Validate response values type ValidateResponse struct { - OK bool + Type string + OK bool } // ProvisionRequest is the rpc wrapper for Provision request type ProvisionRequest struct { + Type string Spec instance.Spec } // ProvisionResponse is the rpc wrapper for Provision response type ProvisionResponse struct { - ID *instance.ID + Type string + ID *instance.ID } // DestroyRequest is the rpc wrapper for Destroy request type DestroyRequest struct { + Type string Instance instance.ID } // DestroyResponse is the rpc wrapper for Destroy response type DestroyResponse struct { - OK bool + Type string + OK bool } // DescribeInstancesRequest is the rpc wrapper for DescribeInstances request type DescribeInstancesRequest struct { + Type string Tags map[string]string } // DescribeInstancesResponse is the rpc wrapper for the DescribeInstances response type DescribeInstancesResponse struct { + Type string Descriptions []instance.Description } diff --git a/pkg/rpc/server/server_test.go b/pkg/rpc/server/server_test.go index 90eb89395..49f8e39a7 100644 --- a/pkg/rpc/server/server_test.go +++ b/pkg/rpc/server/server_test.go @@ -10,6 +10,7 @@ import ( "time" plugin_mock "github.com/docker/infrakit/pkg/mock/spi/instance" + "github.com/docker/infrakit/pkg/plugin" plugin_rpc "github.com/docker/infrakit/pkg/rpc/instance" "github.com/docker/infrakit/pkg/spi/instance" "github.com/golang/mock/gomock" @@ -41,10 +42,11 @@ func TestUnixSocketServer(t *testing.T) { service := plugin_rpc.PluginServer(mock) socket := filepath.Join(os.TempDir(), fmt.Sprintf("%d.sock", time.Now().Unix())) + name := plugin.Name(filepath.Base(socket)) server, err := StartPluginAtPath(socket, service) require.NoError(t, err) - c := plugin_rpc.NewClient(socket) + c := plugin_rpc.NewClient(name, socket) err = c.Validate(properties) require.Error(t, err)