diff --git a/cmd/cli/info.go b/cmd/cli/info.go index 3bb5c0dc0..fb6636b0f 100644 --- a/cmd/cli/info.go +++ b/cmd/cli/info.go @@ -67,7 +67,7 @@ func infoCommand(plugins func() discovery.Plugins) *cobra.Command { return err } - view, err := renderer.AddDef("plugin", *name).Render(info) + view, err := renderer.Def("plugin", *name, "Plugin name").Render(info) if err != nil { return err } @@ -103,7 +103,7 @@ func infoCommand(plugins func() discovery.Plugins) *cobra.Command { return err } - view, err := renderer.AddDef("plugin", *name).Render(info) + view, err := renderer.Def("plugin", *name, "Plugin name").Render(info) if err != nil { return err } diff --git a/cmd/cli/main.go b/cmd/cli/main.go index 05a40c430..0c8d6a142 100644 --- a/cmd/cli/main.go +++ b/cmd/cli/main.go @@ -43,7 +43,10 @@ func main() { cmd.AddCommand(infoCommand(f)) cmd.AddCommand(templateCommand(f)) cmd.AddCommand(managerCommand(f)) - cmd.AddCommand(pluginCommand(f), instancePluginCommand(f), groupPluginCommand(f), flavorPluginCommand(f)) + cmd.AddCommand(metadataCommand(f)) + cmd.AddCommand(pluginCommand(f)) + + cmd.AddCommand(instancePluginCommand(f), groupPluginCommand(f), flavorPluginCommand(f)) err := cmd.Execute() if err != nil { diff --git a/cmd/cli/metadata.go b/cmd/cli/metadata.go new file mode 100644 index 000000000..64ca80fcf --- /dev/null +++ b/cmd/cli/metadata.go @@ -0,0 +1,219 @@ +package main + +import ( + "fmt" + "strconv" + + log "github.com/Sirupsen/logrus" + "github.com/docker/infrakit/pkg/discovery" + metadata_plugin "github.com/docker/infrakit/pkg/plugin/metadata" + "github.com/docker/infrakit/pkg/rpc/client" + metadata_rpc "github.com/docker/infrakit/pkg/rpc/metadata" + "github.com/docker/infrakit/pkg/spi/metadata" + "github.com/spf13/cobra" +) + +func getPlugin(plugins func() discovery.Plugins, name string) (found metadata.Plugin, err error) { + err = forPlugin(plugins, func(n string, p metadata.Plugin) error { + if n == name { + found = p + } + return nil + }) + return +} + +func forPlugin(plugins func() discovery.Plugins, do func(string, metadata.Plugin) error) error { + all, err := plugins().List() + if err != nil { + return err + } + for name, endpoint := range all { + rpcClient, err := client.New(endpoint.Address, metadata.InterfaceSpec) + if err != nil { + continue + } + if err := do(name, metadata_rpc.Adapt(rpcClient)); err != nil { + return err + } + } + return nil +} + +func listAll(m metadata.Plugin, path metadata.Path) ([]metadata.Path, error) { + if m == nil { + return nil, fmt.Errorf("no plugin") + } + result := []metadata.Path{} + nodes, err := m.List(path) + if err != nil { + return nil, err + } + for _, n := range nodes { + c := path.Join(n) + more, err := listAll(m, c) + if err != nil { + return nil, err + } + if len(more) == 0 { + result = append(result, c) + } + for _, pp := range more { + result = append(result, pp) + } + } + return result, nil +} + +func metadataCommand(plugins func() discovery.Plugins) *cobra.Command { + + cmd := &cobra.Command{ + Use: "metadata", + Short: "Access metadata exposed by infrakit plugins", + } + + ls := &cobra.Command{ + Use: "ls", + Short: "List all metadata entries", + } + + long := ls.Flags().BoolP("long", "l", false, "Print full path") + all := ls.Flags().BoolP("all", "a", false, "Find all under the paths given") + + ls.RunE = func(c *cobra.Command, args []string) error { + paths := []string{"."} + + // All implies long + if *all { + *long = true + } + + if len(args) > 0 { + paths = args + } + + for _, p := range paths { + + if p == "/" { + // TODO(chungers) -- this is a 'local' infrakit ensemble. + // Absolute paths will come in a multi-cluster / federated model. + return fmt.Errorf("No absolute path") + } + + path := metadata_plugin.Path(p) + first := path.Index(0) + + targets := []string{} // target plugins to query + + // Check all the plugins -- scanning via discovery + if err := forPlugin(plugins, + func(name string, mp metadata.Plugin) error { + if p == "." || (first != nil && name == *first) { + targets = append(targets, name) + } + return nil + }); err != nil { + return err + } + + for _, target := range targets { + + nodes := []metadata.Path{} // the result set to print + + match, err := getPlugin(plugins, target) + if err != nil { + return err + } + + if p == "." { + if *all { + allPaths, err := listAll(match, path.Shift(1)) + if err != nil { + log.Warningln("Cannot metadata ls on plugin", target, "err=", err) + } + for _, c := range allPaths { + nodes = append(nodes, metadata_plugin.Path(target).Sub(c)) + } + } else { + for _, t := range targets { + nodes = append(nodes, metadata_plugin.Path(t)) + } + } + } else { + if *all { + allPaths, err := listAll(match, path.Shift(1)) + if err != nil { + log.Warningln("Cannot metadata ls on plugin", target, "err=", err) + } + for _, c := range allPaths { + nodes = append(nodes, metadata_plugin.Path(target).Sub(c)) + } + } else { + children, err := match.List(path.Shift(1)) + if err != nil { + log.Warningln("Cannot metadata ls on plugin", target, "err=", err) + } + for _, c := range children { + nodes = append(nodes, path.Join(c)) + } + } + } + + if len(targets) > 1 { + fmt.Printf("%s:\n", target) + } + if *long { + fmt.Printf("total %d:\n", len(nodes)) + } + for _, l := range nodes { + if *long { + fmt.Println(metadata_plugin.String(l)) + } else { + fmt.Println(metadata_plugin.String(l.Rel(path))) + } + } + fmt.Println() + } + + } + return nil + } + + cat := &cobra.Command{ + Use: "cat", + Short: "Get metadata entry by path", + RunE: func(c *cobra.Command, args []string) error { + + for _, p := range args { + + path := metadata_plugin.Path(p) + first := path.Index(0) + if first != nil { + match, err := getPlugin(plugins, *first) + if err != nil { + return err + } + + value, err := match.Get(path.Shift(1)) + if err == nil { + if value != nil { + str := value.String() + if s, err := strconv.Unquote(value.String()); err == nil { + str = s + } + fmt.Println(str) + } + + } else { + log.Warningln("Cannot metadata cat on plugin", *first, "err=", err) + } + } + } + return nil + }, + } + + cmd.AddCommand(ls, cat) + + return cmd +} diff --git a/cmd/cli/template.go b/cmd/cli/template.go index 882a2cf03..6e6034d37 100644 --- a/cmd/cli/template.go +++ b/cmd/cli/template.go @@ -5,6 +5,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/docker/infrakit/pkg/discovery" + metadata_template "github.com/docker/infrakit/pkg/plugin/metadata/template" "github.com/docker/infrakit/pkg/template" "github.com/spf13/cobra" ) @@ -24,6 +25,21 @@ func templateCommand(plugins func() discovery.Plugins) *cobra.Command { if err != nil { return err } + + // Add functions + engine.WithFunctions(func() []template.Function { + return []template.Function{ + { + Name: "metadata", + Description: []string{ + "Metadata function takes a path of the form \"plugin_name/path/to/data\"", + "and calls GET on the plugin with the path \"path/to/data\".", + "It's identical to the CLI command infrakit metadata cat ...", + }, + Func: metadata_template.MetadataFunc(plugins), + }, + } + }) view, err := engine.Render(nil) if err != nil { return err diff --git a/examples/flavor/swarm/flavor.go b/examples/flavor/swarm/flavor.go index a9977f528..bbc603d4c 100644 --- a/examples/flavor/swarm/flavor.go +++ b/examples/flavor/swarm/flavor.go @@ -11,8 +11,10 @@ import ( "github.com/docker/docker/client" "github.com/docker/go-connections/tlsconfig" group_types "github.com/docker/infrakit/pkg/plugin/group/types" + metadata_plugin "github.com/docker/infrakit/pkg/plugin/metadata" "github.com/docker/infrakit/pkg/spi/flavor" "github.com/docker/infrakit/pkg/spi/instance" + "github.com/docker/infrakit/pkg/spi/metadata" "github.com/docker/infrakit/pkg/template" "github.com/docker/infrakit/pkg/types" "github.com/docker/infrakit/pkg/util/docker" @@ -64,6 +66,52 @@ type baseFlavor struct { initScript *template.Template } +// List implements the metadata.Plugin SPI's List method +func (s *baseFlavor) List(path metadata.Path) ([]string, error) { + docker, err := s.getDockerClient(Spec{ + Docker: ConnectInfo{ + Host: "unix:///var/run/docker.sock", // defaults to local socket + }, + }) + if err != nil { + return nil, err + } + status, node, err := swarmState(docker) + if err != nil { + return nil, err + } + data := map[string]interface{}{ + "local": map[string]interface{}{ + "status": status, + "node": node, + }, + } + return metadata_plugin.List(path, data), nil +} + +// Get implements the metadata.Plugin SPI's List method +func (s *baseFlavor) Get(path metadata.Path) (*types.Any, error) { + docker, err := s.getDockerClient(Spec{ + Docker: ConnectInfo{ + Host: "unix:///var/run/docker.sock", // defaults to local socket + }, + }) + if err != nil { + return nil, err + } + status, node, err := swarmState(docker) + if err != nil { + return nil, err + } + data := map[string]interface{}{ + "local": map[string]interface{}{ + "status": status, + "node": node, + }, + } + return metadata_plugin.GetValue(path, data) +} + // Funcs implements the template.FunctionExporter interface that allows the RPC server to expose help on the // functions it exports func (s *baseFlavor) Funcs() []template.Function { diff --git a/examples/flavor/swarm/main.go b/examples/flavor/swarm/main.go index e6e8a4622..b3274ad1e 100644 --- a/examples/flavor/swarm/main.go +++ b/examples/flavor/swarm/main.go @@ -6,8 +6,11 @@ import ( log "github.com/Sirupsen/logrus" "github.com/docker/infrakit/pkg/cli" "github.com/docker/infrakit/pkg/discovery" + "github.com/docker/infrakit/pkg/plugin/metadata" flavor_plugin "github.com/docker/infrakit/pkg/rpc/flavor" - "github.com/docker/infrakit/pkg/spi/flavor" + metadata_plugin "github.com/docker/infrakit/pkg/rpc/metadata" + flavor_spi "github.com/docker/infrakit/pkg/spi/flavor" + metadata_spi "github.com/docker/infrakit/pkg/spi/metadata" "github.com/docker/infrakit/pkg/template" "github.com/docker/infrakit/pkg/util/docker" "github.com/spf13/cobra" @@ -48,11 +51,28 @@ func main() { return err } - cli.RunPlugin(*name, flavor_plugin.PluginServerWithTypes( - map[string]flavor.Plugin{ - "manager": NewManagerFlavor(DockerClient, mt), - "worker": NewWorkerFlavor(DockerClient, wt), - })) + managerFlavor := NewManagerFlavor(DockerClient, mt) + workerFlavor := NewWorkerFlavor(DockerClient, wt) + + cli.RunPlugin(*name, + + // Metadata plugins + metadata_plugin.PluginServer(metadata.NewPluginFromData(map[string]interface{}{ + "version": cli.Version, + "revision": cli.Revision, + "implements": flavor_spi.InterfaceSpec, + })).WithTypes( + map[string]metadata_spi.Plugin{ + "manager": managerFlavor, + "worker": workerFlavor, + }), + + // Flavor plugins + flavor_plugin.PluginServerWithTypes( + map[string]flavor_spi.Plugin{ + "manager": managerFlavor, + "worker": workerFlavor, + })) return nil } diff --git a/examples/flavor/swarm/manager.go b/examples/flavor/swarm/manager.go index 58830cd15..b076d59b8 100644 --- a/examples/flavor/swarm/manager.go +++ b/examples/flavor/swarm/manager.go @@ -6,22 +6,23 @@ import ( log "github.com/Sirupsen/logrus" "github.com/docker/docker/client" group_types "github.com/docker/infrakit/pkg/plugin/group/types" - "github.com/docker/infrakit/pkg/spi/flavor" "github.com/docker/infrakit/pkg/spi/instance" "github.com/docker/infrakit/pkg/template" "github.com/docker/infrakit/pkg/types" ) // NewManagerFlavor creates a flavor.Plugin that creates manager and worker nodes connected in a swarm. -func NewManagerFlavor(connect func(Spec) (client.APIClient, error), templ *template.Template) flavor.Plugin { - return &managerFlavor{&baseFlavor{initScript: templ, getDockerClient: connect}} +func NewManagerFlavor(connect func(Spec) (client.APIClient, error), templ *template.Template) *ManagerFlavor { + return &ManagerFlavor{&baseFlavor{initScript: templ, getDockerClient: connect}} } -type managerFlavor struct { +// ManagerFlavor is the flavor for swarm managers +type ManagerFlavor struct { *baseFlavor } -func (s *managerFlavor) Validate(flavorProperties *types.Any, allocation group_types.AllocationMethod) error { +// Validate checks whether the helper can support a configuration. +func (s *ManagerFlavor) Validate(flavorProperties *types.Any, allocation group_types.AllocationMethod) error { if err := s.baseFlavor.Validate(flavorProperties, allocation); err != nil { return err @@ -46,7 +47,7 @@ func (s *managerFlavor) Validate(flavorProperties *types.Any, allocation group_t } // Prepare sets up the provisioner / instance plugin's spec based on information about the swarm to join. -func (s *managerFlavor) Prepare(flavorProperties *types.Any, +func (s *ManagerFlavor) Prepare(flavorProperties *types.Any, instanceSpec instance.Spec, allocation group_types.AllocationMethod) (instance.Spec, error) { return s.baseFlavor.prepare("manager", flavorProperties, instanceSpec, allocation) } diff --git a/examples/flavor/swarm/worker.go b/examples/flavor/swarm/worker.go index 54b24f806..1ab37e1d4 100644 --- a/examples/flavor/swarm/worker.go +++ b/examples/flavor/swarm/worker.go @@ -2,15 +2,12 @@ package main import ( "fmt" - //"time" log "github.com/Sirupsen/logrus" docker_types "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/filters" - //"github.com/docker/docker/api/types/swarm" "github.com/docker/docker/client" group_types "github.com/docker/infrakit/pkg/plugin/group/types" - "github.com/docker/infrakit/pkg/spi/flavor" "github.com/docker/infrakit/pkg/spi/instance" "github.com/docker/infrakit/pkg/template" "github.com/docker/infrakit/pkg/types" @@ -18,22 +15,23 @@ import ( ) // NewWorkerFlavor creates a flavor.Plugin that creates manager and worker nodes connected in a swarm. -func NewWorkerFlavor(connect func(Spec) (client.APIClient, error), templ *template.Template) flavor.Plugin { - return &workerFlavor{&baseFlavor{initScript: templ, getDockerClient: connect}} +func NewWorkerFlavor(connect func(Spec) (client.APIClient, error), templ *template.Template) *WorkerFlavor { + return &WorkerFlavor{&baseFlavor{initScript: templ, getDockerClient: connect}} } -type workerFlavor struct { +// WorkerFlavor implements the flavor and metadata plugins +type WorkerFlavor struct { *baseFlavor } // Prepare sets up the provisioner / instance plugin's spec based on information about the swarm to join. -func (s *workerFlavor) Prepare(flavorProperties *types.Any, instanceSpec instance.Spec, +func (s *WorkerFlavor) Prepare(flavorProperties *types.Any, instanceSpec instance.Spec, allocation group_types.AllocationMethod) (instance.Spec, error) { return s.baseFlavor.prepare("worker", flavorProperties, instanceSpec, allocation) } // Drain in the case of worker will force a node removal in the swarm. -func (s *workerFlavor) Drain(flavorProperties *types.Any, inst instance.Description) error { +func (s *WorkerFlavor) Drain(flavorProperties *types.Any, inst instance.Description) error { if flavorProperties == nil { return fmt.Errorf("missing config") } diff --git a/examples/instance/vagrant/main.go b/examples/instance/vagrant/main.go index d3804c9d2..f8f962ba0 100644 --- a/examples/instance/vagrant/main.go +++ b/examples/instance/vagrant/main.go @@ -8,7 +8,10 @@ import ( log "github.com/Sirupsen/logrus" "github.com/docker/infrakit/pkg/cli" "github.com/docker/infrakit/pkg/discovery" + "github.com/docker/infrakit/pkg/plugin/metadata" instance_plugin "github.com/docker/infrakit/pkg/rpc/instance" + metadata_plugin "github.com/docker/infrakit/pkg/rpc/metadata" + instance_spi "github.com/docker/infrakit/pkg/spi/instance" "github.com/docker/infrakit/pkg/template" "github.com/spf13/cobra" ) @@ -62,7 +65,16 @@ func main() { } cli.SetLogLevel(*logLevel) - cli.RunPlugin(*name, instance_plugin.PluginServer(NewVagrantPlugin(*dir, templ))) + cli.RunPlugin(*name, + instance_plugin.PluginServer(NewVagrantPlugin(*dir, templ)), + metadata_plugin.PluginServer(metadata.NewPluginFromData( + map[string]interface{}{ + "version": cli.Version, + "revision": cli.Revision, + "implements": instance_spi.InterfaceSpec, + }, + )), + ) return nil } diff --git a/pkg/plugin/metadata/path.go b/pkg/plugin/metadata/path.go new file mode 100644 index 000000000..4a2c5de84 --- /dev/null +++ b/pkg/plugin/metadata/path.go @@ -0,0 +1,30 @@ +package metadata + +import ( + "path/filepath" + "strings" + + "github.com/docker/infrakit/pkg/spi/metadata" +) + +// Path returns the path compoments of a / separated path +func Path(path string) metadata.Path { + return metadata.Path(strings.Split(filepath.Clean(path), "/")) +} + +// PathFromStrings returns the path from a list of strings +func PathFromStrings(a string, b ...string) metadata.Path { + if a != "" { + return metadata.Path(append([]string{a}, b...)) + } + return metadata.Path(b) +} + +// String returns the string representation of path +func String(p metadata.Path) string { + s := strings.Join([]string(p), "/") + if len(s) == 0 { + return "." + } + return s +} diff --git a/pkg/plugin/metadata/plugin.go b/pkg/plugin/metadata/plugin.go new file mode 100644 index 000000000..91bdb6ba3 --- /dev/null +++ b/pkg/plugin/metadata/plugin.go @@ -0,0 +1,90 @@ +package metadata + +import ( + log "github.com/Sirupsen/logrus" + "github.com/docker/infrakit/pkg/spi/metadata" + "github.com/docker/infrakit/pkg/types" +) + +// NewPluginFromData creates a plugin out of a simple data map. Note the updates to the map +// is not guarded and synchronized with the reads. +func NewPluginFromData(data map[string]interface{}) metadata.Plugin { + return &plugin{data: data} +} + +// NewPluginFromChannel returns a plugin implementation where reads and writes are serialized +// via channel of functions that have a view to the metadata. Closing the write channel stops +// the serialized read/writes and falls back to unserialized reads. +func NewPluginFromChannel(writes <-chan func(map[string]interface{})) metadata.Plugin { + + readChan := make(chan func(map[string]interface{})) + p := &plugin{reads: readChan} + + go func() { + + defer func() { + if r := recover(); r != nil { + log.Warningln("Plugin stopped:", r) + } + }() + + data := map[string]interface{}{} + for { + select { + case writer, open := <-writes: + if !open { + close(readChan) + p.reads = nil + return + } + writer(data) + + case reader := <-p.reads: + copy := data + reader(copy) + } + } + }() + return p +} + +type plugin struct { + data map[string]interface{} + reads chan func(data map[string]interface{}) +} + +// List returns a list of *child nodes* given a path, which is specified as a slice +// where for i > j path[i] is the parent of path[j] +func (p *plugin) List(path metadata.Path) ([]string, error) { + if p.reads == nil && p.data != nil { + return List(path, p.data), nil + } + + children := make(chan []string) + + p.reads <- func(data map[string]interface{}) { + children <- List(path, data) + return + } + + return <-children, nil +} + +// Get retrieves the value at path given. +func (p *plugin) Get(path metadata.Path) (*types.Any, error) { + if p.reads == nil && p.data != nil { + return types.AnyValue(Get(path, p.data)) + } + + value := make(chan *types.Any) + err := make(chan error) + + p.reads <- func(data map[string]interface{}) { + v, e := types.AnyValue(Get(path, data)) + value <- v + err <- e + return + } + + return <-value, <-err +} diff --git a/pkg/plugin/metadata/plugin_test.go b/pkg/plugin/metadata/plugin_test.go new file mode 100644 index 000000000..1521ab8c8 --- /dev/null +++ b/pkg/plugin/metadata/plugin_test.go @@ -0,0 +1,75 @@ +package metadata + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/docker/infrakit/pkg/types" + "github.com/stretchr/testify/require" +) + +func first(a, b interface{}) interface{} { + return a +} + +func TestPluginUnserializedReadWrites(t *testing.T) { + + m := map[string]interface{}{} + + require.True(t, Put(Path("us-west-1/metrics/instances/count"), 2000, m)) + require.True(t, Put(Path("us-west-2/metrics/instances/count"), 1000, m)) + + p := NewPluginFromData(m) + + require.Equal(t, []string{"us-west-1", "us-west-2"}, first(p.List(Path("/")))) + require.Nil(t, first(p.Get(Path("us-west-1/metrics/instances/foo")))) + require.Equal(t, "2000", first(p.Get(Path("us-west-1/metrics/instances/count"))).(*types.Any).String()) + +} + +func TestPluginSerializedReadWrites(t *testing.T) { + + c := make(chan func(map[string]interface{})) + p := NewPluginFromChannel(c) + + var wait sync.WaitGroup + + start := make(chan struct{}) + for i := range []int{0, 1, 2, 3} { + k := fmt.Sprintf("namespace/%d/value", i) + v := i * 100 + go func() { + <-start + c <- func(m map[string]interface{}) { + Put(Path(k), v, m) + wait.Add(1) + } + }() + } + + close(start) // start! + + time.Sleep(10 * time.Millisecond) + + results := []string{} + for i := range []int{0, 1, 2, 3} { + + k := fmt.Sprintf("namespace/%d/value", i) + val, err := p.Get(Path(k)) + require.NoError(t, err) + + if val != nil { + results = append(results, val.String()) + } + + wait.Done() + } + + close(c) + + wait.Wait() + + require.Equal(t, []string{"0", "100", "200", "300"}, results) +} diff --git a/pkg/plugin/metadata/reflect.go b/pkg/plugin/metadata/reflect.go new file mode 100644 index 000000000..51dd49fdc --- /dev/null +++ b/pkg/plugin/metadata/reflect.go @@ -0,0 +1,219 @@ +package metadata + +import ( + "fmt" + "reflect" + "regexp" + "sort" + "strconv" + "strings" + + "github.com/docker/infrakit/pkg/types" +) + +var ( + indexRoot = "\\[(([+|-]*[0-9]+)|((.*)=(.*)))\\]$" + arrayIndexExp = regexp.MustCompile("(.*)" + indexRoot) + indexExp = regexp.MustCompile("^" + indexRoot) +) + +// Put sets the attribute of an object at path to the given value +func Put(path []string, value interface{}, object map[string]interface{}) bool { + return put(path, value, object) +} + +// Get returns the attribute of the object at path +func Get(path []string, object interface{}) interface{} { + return get(path, object) +} + +// GetValue returns the attribute of the object at path, as serialized blob +func GetValue(path []string, object interface{}) (*types.Any, error) { + if any, is := object.(*types.Any); is { + return any, nil + } + return types.AnyValue(Get(path, object)) +} + +// List lists the members at the path +func List(path []string, object interface{}) []string { + list := []string{} + v := get(path, object) + if v == nil { + return list + } + if any, is := v.(*types.Any); is { + temp := map[string]interface{}{} + if err := any.Decode(&temp); err == nil { + if len(temp) > 0 { + return List([]string{"."}, temp) + } + return []string{} + } + return []string{} + } + + val := reflect.Indirect(reflect.ValueOf(v)) + switch val.Kind() { + case reflect.Slice: + // this is a slice, so return the name as '[%d]' + for i := 0; i < val.Len(); i++ { + list = append(list, fmt.Sprintf("[%d]", i)) //val.Index(i).String()) + } + + case reflect.Map: + for _, k := range val.MapKeys() { + list = append(list, k.String()) + } + case reflect.Struct: + vt := val.Type() + for i := 0; i < vt.NumField(); i++ { + if vt.Field(i).PkgPath == "" { + list = append(list, vt.Field(i).Name) + } + } + } + + sort.Strings(list) + return list +} + +func put(p []string, value interface{}, store map[string]interface{}) bool { + if len(p) == 0 { + return false + } + + key := p[0] + if key == "" { + return put(p[1:], value, store) + } + // check if key is an array index of the form <1>[<2>] + matches := arrayIndexExp.FindStringSubmatch(key) + if len(matches) > 2 && matches[1] != "" { + key = matches[1] + p = append([]string{key, fmt.Sprintf("[%s]", matches[2])}, p[1:]...) + return put(p, value, store) + } + + s := reflect.Indirect(reflect.ValueOf(store)) + switch s.Kind() { + case reflect.Slice: + return false // not supported + + case reflect.Map: + if reflect.TypeOf(p[0]).AssignableTo(s.Type().Key()) { + m := s.MapIndex(reflect.ValueOf(p[0])) + if !m.IsValid() { + m = reflect.ValueOf(map[string]interface{}{}) + s.SetMapIndex(reflect.ValueOf(p[0]), m) + } + if len(p) > 1 { + return put(p[1:], value, m.Interface().(map[string]interface{})) + } + s.SetMapIndex(reflect.ValueOf(p[0]), reflect.ValueOf(value)) + return true + } + } + return false +} + +func get(path []string, object interface{}) interface{} { + if len(path) == 0 { + return object + } + + key := path[0] + + switch key { + case ".": + return object + case "": + return get(path[1:], object) + } + + // check if key is an array index of the form <1>[<2>] + matches := arrayIndexExp.FindStringSubmatch(key) + if len(matches) > 2 && matches[1] != "" { + key = matches[1] + path = append([]string{key, fmt.Sprintf("[%s]", matches[2])}, path[1:]...) + return get(path, object) + } + + v := reflect.Indirect(reflect.ValueOf(object)) + switch v.Kind() { + case reflect.Slice: + i := 0 + matches = indexExp.FindStringSubmatch(key) + if len(matches) > 0 { + if matches[2] != "" { + // numeric index + if index, err := strconv.Atoi(matches[1]); err == nil { + switch { + case index >= 0 && v.Len() > index: + i = index + case index < 0 && v.Len() > -index: // negative index like python + i = v.Len() + index + } + } + return get(path[1:], v.Index(i).Interface()) + + } else if matches[3] != "" { + // equality search index for 'field=check' + lhs := matches[4] // supports another select expression for extracting deeply from the struct + rhs := matches[5] + // loop through the array looking for field that matches the check value + for j := 0; j < v.Len(); j++ { + if el := get(tokenize(lhs), v.Index(j).Interface()); el != nil { + if fmt.Sprintf("%v", el) == rhs { + return get(path[1:], v.Index(j).Interface()) + } + } + } + } + } + case reflect.Map: + value := v.MapIndex(reflect.ValueOf(key)) + if value.IsValid() { + return get(path[1:], value.Interface()) + } + case reflect.Struct: + fv := v.FieldByName(key) + if !fv.IsValid() { + return nil + } + if !fv.CanInterface() { + return nil + } + return get(path[1:], fv.Interface()) + } + return nil +} + +// With quoting to support azure rm type names: e.g. Microsoft.Network/virtualNetworks +// This will split a sting like /Resources/'Microsoft.Network/virtualNetworks'/managerSubnet/Name" into +// [ , Resources, Microsoft.Network/virtualNetworks, managerSubnet, Name] +func tokenize(s string) []string { + if len(s) == 0 { + return []string{} + } + + a := []string{} + start := 0 + quoted := false + for i := 0; i < len(s); i++ { + switch s[i] { + case '/': + if !quoted { + a = append(a, strings.Replace(s[start:i], "'", "", -1)) + start = i + 1 + } + case '\'': + quoted = !quoted + } + } + if start < len(s)-1 { + a = append(a, strings.Replace(s[start:], "'", "", -1)) + } + + return a +} diff --git a/pkg/plugin/metadata/reflect_test.go b/pkg/plugin/metadata/reflect_test.go new file mode 100644 index 000000000..854fc685b --- /dev/null +++ b/pkg/plugin/metadata/reflect_test.go @@ -0,0 +1,100 @@ +package metadata + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/require" +) + +func pretty(v interface{}) string { + buff, err := json.MarshalIndent(v, "", " ") + if err != nil { + panic(err) + } + return string(buff) +} + +func TestTokenizer(t *testing.T) { + require.Equal(t, []string{"", "foo"}, tokenize("/foo")) + require.Equal(t, []string{"", "foo", "bar", "baz"}, tokenize("/foo/bar/baz")) + require.Equal(t, []string{"foo", "bar", "baz"}, tokenize("foo/bar/baz")) + require.Equal(t, []string{"foo"}, tokenize("foo")) + + // with quoting to support azure rm type names: e.g. Microsoft.Network/virtualNetworks + require.Equal(t, []string{"", "foo"}, tokenize("/'fo'o")) + require.Equal(t, []string{"", "foo/bar", "baz"}, tokenize("/'foo/bar'/baz")) + require.Equal(t, []string{"foo", "bar/baz"}, tokenize("foo/'bar/baz'")) + require.Equal(t, []string{"foo"}, tokenize("'foo'")) +} + +func TestMap(t *testing.T) { + m := map[string]interface{}{} + require.True(t, put(Path("region/us-west-1/vpc/vpc1/network/network1/id"), "id-network1", m)) + require.True(t, put(Path("region/us-west-1/vpc/vpc1/network/network2/id"), "id-network2", m)) + require.True(t, put(Path("region/us-west-1/vpc/vpc1/network/network3/id"), "id-network3", m)) + require.True(t, put(Path("region/us-west-1/vpc/vpc2/network/network10/id"), "id-network10", m)) + require.True(t, put(Path("region/us-west-1/vpc/vpc2/network/network11/id"), "id-network11", m)) + require.True(t, put(Path("region/us-west-2/vpc/vpc21/network/network210/id"), "id-network210", m)) + require.True(t, put(Path("region/us-west-2/vpc/vpc21/network/network211/id"), "id-network211", m)) + require.True(t, put(Path("region/us-west-2/metrics/instances/count"), 100, m)) + + require.Equal(t, "id-network1", get(Path("region/us-west-1/vpc/vpc1/network/network1/id"), m)) + require.Equal(t, "id-network1", get(Path("region/us-west-1/vpc/vpc1/network/network1/id/"), m)) + require.Equal(t, map[string]interface{}{"id": "id-network1"}, + get(Path("region/us-west-1/vpc/vpc1/network/network1"), m)) + require.Equal(t, map[string]interface{}{ + "network1": map[string]interface{}{ + "id": "id-network1", + }, + "network2": map[string]interface{}{ + "id": "id-network2", + }, + "network3": map[string]interface{}{ + "id": "id-network3", + }, + }, get(Path("region/us-west-1/vpc/vpc1/network/"), m)) + + require.Equal(t, 100, get(Path("region/us-west-2/metrics/instances/count"), m)) + + require.Equal(t, []string{"region"}, List(Path("."), m)) + require.Equal(t, []string{}, List(Path("region/us-west-1/bogus"), m)) + require.Equal(t, []string{}, List(Path("region/us-west-1/vpc/vpc1/network/network1/id"), m)) + require.Equal(t, []string{"id"}, List(Path("region/us-west-1/vpc/vpc1/network/network1"), m)) + require.Equal(t, []string{"us-west-1", "us-west-2"}, List(Path("region/"), m)) + require.Equal(t, []string{"us-west-1", "us-west-2"}, List(Path("region"), m)) + require.Equal(t, []string{"network1", "network2", "network3"}, List(Path("region/us-west-1/vpc/vpc1/network/"), m)) + require.Equal(t, []string{}, List(Path("region/us-west-2/metrics/instances/count"), m)) + +} + +func TestGetFromStruct(t *testing.T) { + + type metric struct { + Name string + Value int + } + + type region struct { + Metrics map[string]metric + } + + m := map[string]region{ + "us-west-1": { + Metrics: map[string]metric{ + "instances": {Name: "instances", Value: 2000}, + "subnets": {Name: "subnets", Value: 20}, + }, + }, + "us-west-2": { + Metrics: map[string]metric{ + "instances": {Name: "instances", Value: 4000}, + "subnets": {Name: "subnets", Value: 40}, + }, + }, + } + + require.Equal(t, nil, Get(Path("us-west-1/Metrics/instances/Count"), m)) + require.Equal(t, 2000, Get(Path("us-west-1/Metrics/instances/Value"), m)) + +} diff --git a/pkg/plugin/metadata/template/template.go b/pkg/plugin/metadata/template/template.go new file mode 100644 index 000000000..7d395bcf3 --- /dev/null +++ b/pkg/plugin/metadata/template/template.go @@ -0,0 +1,40 @@ +package template + +import ( + "fmt" + + "github.com/docker/infrakit/pkg/discovery" + metadata_plugin "github.com/docker/infrakit/pkg/plugin/metadata" + "github.com/docker/infrakit/pkg/rpc/client" + metadata_rpc "github.com/docker/infrakit/pkg/rpc/metadata" + "github.com/docker/infrakit/pkg/spi/metadata" +) + +// MetadataFunc returns a template function to support metadata retrieval in templates. +func MetadataFunc(plugins func() discovery.Plugins) func(string) (interface{}, error) { + return func(path string) (interface{}, error) { + + mpath := metadata_plugin.Path(path) + first := mpath.Index(0) + if first == nil { + return nil, fmt.Errorf("unknown plugin from path: %s", path) + } + + lookup, err := plugins().List() + if err != nil { + return nil, err + } + + endpoint, has := lookup[*first] + if !has { + return nil, fmt.Errorf("plugin: %s not found", *first) + } + + rpcClient, err := client.New(endpoint.Address, metadata.InterfaceSpec) + if err != nil { + return nil, fmt.Errorf("cannot connect to plugin: %s", *first) + } + + return metadata_rpc.Adapt(rpcClient).Get(mpath.Shift(1)) + } +} diff --git a/pkg/rpc/metadata/client.go b/pkg/rpc/metadata/client.go new file mode 100644 index 000000000..4b77d0fd9 --- /dev/null +++ b/pkg/rpc/metadata/client.go @@ -0,0 +1,44 @@ +package metadata + +import ( + rpc_client "github.com/docker/infrakit/pkg/rpc/client" + "github.com/docker/infrakit/pkg/spi/metadata" + "github.com/docker/infrakit/pkg/types" +) + +// NewClient returns a plugin interface implementation connected to a remote plugin +func NewClient(socketPath string) (metadata.Plugin, error) { + rpcClient, err := rpc_client.New(socketPath, metadata.InterfaceSpec) + if err != nil { + return nil, err + } + return &client{client: rpcClient}, nil +} + +// Adapt converts a rpc client to a Metadata plugin object +func Adapt(rpcClient rpc_client.Client) metadata.Plugin { + return &client{client: rpcClient} +} + +type client struct { + client rpc_client.Client +} + +// List returns a list of nodes under path. +func (c client) List(path metadata.Path) ([]string, error) { + req := ListRequest{Path: path} + resp := ListResponse{} + err := c.client.Call("Metadata.List", req, &resp) + return resp.Nodes, err +} + +// Get retrieves the metadata at path. +func (c client) Get(path metadata.Path) (*types.Any, error) { + req := GetRequest{Path: path} + resp := GetResponse{} + err := c.client.Call("Metadata.Get", req, &resp) + if err != nil { + return nil, err + } + return resp.Value, err +} diff --git a/pkg/rpc/metadata/metadata.test b/pkg/rpc/metadata/metadata.test new file mode 100755 index 000000000..ed33cd169 Binary files /dev/null and b/pkg/rpc/metadata/metadata.test differ diff --git a/pkg/rpc/metadata/rpc_test.go b/pkg/rpc/metadata/rpc_test.go new file mode 100644 index 000000000..947f08194 --- /dev/null +++ b/pkg/rpc/metadata/rpc_test.go @@ -0,0 +1,281 @@ +package metadata + +import ( + "errors" + "io/ioutil" + "path/filepath" + "testing" + + plugin_metadata "github.com/docker/infrakit/pkg/plugin/metadata" + rpc_server "github.com/docker/infrakit/pkg/rpc/server" + "github.com/docker/infrakit/pkg/spi/metadata" + testing_metadata "github.com/docker/infrakit/pkg/testing/metadata" + "github.com/docker/infrakit/pkg/types" + "github.com/stretchr/testify/require" +) + +func tempSocket() string { + dir, err := ioutil.TempDir("", "infrakit-test-") + if err != nil { + panic(err) + } + + return filepath.Join(dir, "metadata-impl-test") +} + +func must(p metadata.Plugin, err error) metadata.Plugin { + if err != nil { + panic(err) + } + return p +} + +func first(a, b interface{}) interface{} { + return a +} + +func firstAny(a, b interface{}) *types.Any { + v := first(a, b) + return v.(*types.Any) +} + +func second(a, b interface{}) interface{} { + return b +} + +func TestMetadataMultiPlugin(t *testing.T) { + socketPath := tempSocket() + + inputMetadataPathListActual1 := make(chan []string, 1) + inputMetadataPathGetActual1 := make(chan []string, 1) + + inputMetadataPathListActual2 := make(chan []string, 1) + inputMetadataPathGetActual2 := make(chan []string, 1) + + m := map[string]interface{}{} + plugin_metadata.Put(plugin_metadata.Path("region/count"), 3, m) + plugin_metadata.Put(plugin_metadata.Path("region/us-west-1/vpc/vpc1/network/network1/id"), "id-network1", m) + plugin_metadata.Put(plugin_metadata.Path("region/us-west-1/vpc/vpc2/network/network10/id"), "id-network10", m) + plugin_metadata.Put(plugin_metadata.Path("region/us-west-1/vpc/vpc2/network/network11/id"), "id-network11", m) + plugin_metadata.Put(plugin_metadata.Path("region/us-west-2/vpc/vpc21/network/network210/id"), "id-network210", m) + plugin_metadata.Put(plugin_metadata.Path("region/us-west-2/vpc/vpc21/network/network211/id"), "id-network211", m) + plugin_metadata.Put(plugin_metadata.Path("region/us-west-2/metrics/instances/count"), 100, m) + + server, err := rpc_server.StartPluginAtPath(socketPath, PluginServerWithTypes( + map[string]metadata.Plugin{ + "aws": &testing_metadata.Plugin{ + DoList: func(path metadata.Path) ([]string, error) { + inputMetadataPathListActual1 <- path + return plugin_metadata.List(path, m), nil + }, + DoGet: func(path metadata.Path) (*types.Any, error) { + inputMetadataPathGetActual1 <- path + return plugin_metadata.GetValue(path, m) + }, + }, + "azure": &testing_metadata.Plugin{ + DoList: func(path metadata.Path) ([]string, error) { + inputMetadataPathListActual2 <- path + return nil, errors.New("azure-error") + }, + DoGet: func(path metadata.Path) (*types.Any, error) { + inputMetadataPathGetActual2 <- path + return nil, errors.New("azure-error2") + }, + }, + })) + require.NoError(t, err) + + require.Equal(t, []string{"region"}, first(must(NewClient(socketPath)).List(plugin_metadata.Path("aws")))) + require.Error(t, second(must(NewClient(socketPath)).List(plugin_metadata.Path("azure"))).(error)) + + require.Equal(t, []string{"aws", "azure"}, + first(must(NewClient(socketPath)).List(plugin_metadata.Path("/")))) + + require.Equal(t, []string{}, <-inputMetadataPathListActual1) + require.Equal(t, []string{}, <-inputMetadataPathListActual2) + + require.Equal(t, "3", firstAny(must(NewClient(socketPath)).Get(plugin_metadata.Path("aws/region/count"))).String()) + require.Error(t, second(must(NewClient(socketPath)).Get(plugin_metadata.Path("azure"))).(error)) + + require.Equal(t, []string{"region", "count"}, <-inputMetadataPathGetActual1) + require.Equal(t, []string{}, <-inputMetadataPathGetActual2) + + server.Stop() +} + +func TestMetadataMultiPlugin2(t *testing.T) { + socketPath := tempSocket() + + m1 := map[string]interface{}{} + plugin_metadata.Put(plugin_metadata.Path("region/us-west-1/vpc/vpc1/network/network1/id"), "id-network1", m1) + plugin_metadata.Put(plugin_metadata.Path("region/us-west-1/vpc/vpc2/network/network10/id"), "id-network10", m1) + plugin_metadata.Put(plugin_metadata.Path("region/us-west-1/vpc/vpc2/network/network11/id"), "id-network11", m1) + plugin_metadata.Put(plugin_metadata.Path("region/us-west-2/vpc/vpc21/network/network210/id"), "id-network210", m1) + plugin_metadata.Put(plugin_metadata.Path("region/us-west-2/vpc/vpc21/network/network211/id"), "id-network211", m1) + plugin_metadata.Put(plugin_metadata.Path("region/us-west-2/metrics/instances/count"), 100, m1) + + m2 := map[string]interface{}{} + plugin_metadata.Put(plugin_metadata.Path("dc/us-1/vpc/vpc1/network/network1/id"), "id-network1", m2) + plugin_metadata.Put(plugin_metadata.Path("dc/us-1/vpc/vpc2/network/network10/id"), "id-network10", m2) + plugin_metadata.Put(plugin_metadata.Path("dc/us-1/vpc/vpc2/network/network11/id"), "id-network11", m2) + plugin_metadata.Put(plugin_metadata.Path("dc/us-2/vpc/vpc21/network/network210/id"), "id-network210", m2) + plugin_metadata.Put(plugin_metadata.Path("dc/us-2/vpc/vpc21/network/network211/id"), "id-network211", m2) + plugin_metadata.Put(plugin_metadata.Path("dc/us-2/metrics/instances/count"), 100, m2) + + server, err := rpc_server.StartPluginAtPath(socketPath, PluginServerWithTypes( + map[string]metadata.Plugin{ + "aws": &testing_metadata.Plugin{ + DoList: func(path metadata.Path) ([]string, error) { + res := plugin_metadata.List(path, m1) + return res, nil + }, + DoGet: func(path metadata.Path) (*types.Any, error) { + return plugin_metadata.GetValue(path, m1) + }, + }, + "azure": &testing_metadata.Plugin{ + DoList: func(path metadata.Path) ([]string, error) { + res := plugin_metadata.List(path, m2) + return res, nil + }, + DoGet: func(path metadata.Path) (*types.Any, error) { + return plugin_metadata.GetValue(path, m2) + }, + }, + })) + require.NoError(t, err) + + require.Equal(t, []string{"aws", "azure"}, + first(must(NewClient(socketPath)).List(plugin_metadata.Path("")))) + require.Equal(t, []string{"region"}, + first(must(NewClient(socketPath)).List(plugin_metadata.Path("aws")))) + require.Equal(t, []string{"dc"}, + first(must(NewClient(socketPath)).List(plugin_metadata.Path("azure/")))) + require.Equal(t, []string(nil), + first(must(NewClient(socketPath)).List(plugin_metadata.Path("gce/")))) + require.Equal(t, []string{"network10", "network11"}, + first(must(NewClient(socketPath)).List(plugin_metadata.Path("aws/region/us-west-1/vpc/vpc2/network")))) + + require.Equal(t, "100", + firstAny(must(NewClient(socketPath)).Get(plugin_metadata.Path("aws/region/us-west-2/metrics/instances/count"))).String()) + require.Equal(t, "{\"network\":{\"network210\":{\"id\":\"id-network210\"},\"network211\":{\"id\":\"id-network211\"}}}", + firstAny(must(NewClient(socketPath)).Get(plugin_metadata.Path("azure/dc/us-2/vpc/vpc21"))).String()) + require.Nil(t, firstAny(must(NewClient(socketPath)).Get(plugin_metadata.Path("aws/none")))) + + server.Stop() +} + +func TestMetadataMultiPlugin3(t *testing.T) { + socketPath := tempSocket() + + m0 := map[string]interface{}{} + plugin_metadata.Put(plugin_metadata.Path("metrics/instances/count"), 100, m0) + plugin_metadata.Put(plugin_metadata.Path("metrics/networks/count"), 10, m0) + plugin_metadata.Put(plugin_metadata.Path("metrics/workers/count"), 1000, m0) + plugin_metadata.Put(plugin_metadata.Path("metrics/managers/count"), 7, m0) + + m1 := map[string]interface{}{} + plugin_metadata.Put(plugin_metadata.Path("region/us-west-1/vpc/vpc1/network/network1/id"), "id-network1", m1) + plugin_metadata.Put(plugin_metadata.Path("region/us-west-1/vpc/vpc2/network/network10/id"), "id-network10", m1) + plugin_metadata.Put(plugin_metadata.Path("region/us-west-1/vpc/vpc2/network/network11/id"), "id-network11", m1) + plugin_metadata.Put(plugin_metadata.Path("region/us-west-2/vpc/vpc21/network/network210/id"), "id-network210", m1) + plugin_metadata.Put(plugin_metadata.Path("region/us-west-2/vpc/vpc21/network/network211/id"), "id-network211", m1) + plugin_metadata.Put(plugin_metadata.Path("region/us-west-2/metrics/instances/count"), 100, m1) + + m2 := map[string]interface{}{} + plugin_metadata.Put(plugin_metadata.Path("dc/us-1/vpc/vpc1/network/network1/id"), "id-network1", m2) + plugin_metadata.Put(plugin_metadata.Path("dc/us-1/vpc/vpc2/network/network10/id"), "id-network10", m2) + plugin_metadata.Put(plugin_metadata.Path("dc/us-1/vpc/vpc2/network/network11/id"), "id-network11", m2) + plugin_metadata.Put(plugin_metadata.Path("dc/us-2/vpc/vpc21/network/network210/id"), "id-network210", m2) + plugin_metadata.Put(plugin_metadata.Path("dc/us-2/vpc/vpc21/network/network211/id"), "id-network211", m2) + plugin_metadata.Put(plugin_metadata.Path("dc/us-2/metrics/instances/count"), 100, m2) + + server, err := rpc_server.StartPluginAtPath(socketPath, + PluginServer(&testing_metadata.Plugin{ + DoList: func(path metadata.Path) ([]string, error) { + res := plugin_metadata.List(path, m0) + return res, nil + }, + DoGet: func(path metadata.Path) (*types.Any, error) { + return plugin_metadata.GetValue(path, m0) + }, + }).WithTypes(map[string]metadata.Plugin{ + "aws": &testing_metadata.Plugin{ + DoList: func(path metadata.Path) ([]string, error) { + res := plugin_metadata.List(path, m1) + return res, nil + }, + DoGet: func(path metadata.Path) (*types.Any, error) { + return plugin_metadata.GetValue(path, m1) + }, + }, + "azure": &testing_metadata.Plugin{ + DoList: func(path metadata.Path) ([]string, error) { + res := plugin_metadata.List(path, m2) + return res, nil + }, + DoGet: func(path metadata.Path) (*types.Any, error) { + return plugin_metadata.GetValue(path, m2) + }, + }, + })) + require.NoError(t, err) + + require.Equal(t, []string{"aws", "azure", "metrics"}, + first(must(NewClient(socketPath)).List(metadata.Path([]string{})))) + require.Equal(t, []string{"aws", "azure", "metrics"}, + first(must(NewClient(socketPath)).List(plugin_metadata.Path("/")))) + require.Equal(t, []string{"region"}, + first(must(NewClient(socketPath)).List(plugin_metadata.Path("aws")))) + require.Equal(t, []string{"dc"}, + first(must(NewClient(socketPath)).List(plugin_metadata.Path("azure/")))) + require.Equal(t, []string{}, + first(must(NewClient(socketPath)).List(plugin_metadata.Path("gce/")))) + require.Equal(t, []string{"network10", "network11"}, + first(must(NewClient(socketPath)).List(plugin_metadata.Path("aws/region/us-west-1/vpc/vpc2/network")))) + require.Equal(t, []string{"aws", "azure", "metrics"}, + first(must(NewClient(socketPath)).List(plugin_metadata.Path(".")))) + + require.Equal(t, "100", + firstAny(must(NewClient(socketPath)).Get(plugin_metadata.Path("metrics/instances/count"))).String()) + require.Equal(t, "{\"network\":{\"network210\":{\"id\":\"id-network210\"},\"network211\":{\"id\":\"id-network211\"}}}", + firstAny(must(NewClient(socketPath)).Get(plugin_metadata.Path("azure/dc/us-2/vpc/vpc21"))).String()) + require.Nil(t, firstAny(must(NewClient(socketPath)).Get(plugin_metadata.Path("aws/none")))) + + server.Stop() +} + +func TestMetadataMultiPlugin4(t *testing.T) { + socketPath := tempSocket() + + m0 := map[string]interface{}{} + plugin_metadata.Put(plugin_metadata.Path("metrics/instances/count"), 100, m0) + plugin_metadata.Put(plugin_metadata.Path("metrics/networks/count"), 10, m0) + plugin_metadata.Put(plugin_metadata.Path("metrics/workers/count"), 1000, m0) + plugin_metadata.Put(plugin_metadata.Path("metrics/managers/count"), 7, m0) + + server, err := rpc_server.StartPluginAtPath(socketPath, + PluginServer(&testing_metadata.Plugin{ + DoList: func(path metadata.Path) ([]string, error) { + res := plugin_metadata.List(path, m0) + return res, nil + }, + DoGet: func(path metadata.Path) (*types.Any, error) { + return plugin_metadata.GetValue(path, m0) + }, + })) + require.NoError(t, err) + + require.Equal(t, []string{"metrics"}, + first(must(NewClient(socketPath)).List(plugin_metadata.Path("")))) + require.Equal(t, []string{"instances", "managers", "networks", "workers"}, + first(must(NewClient(socketPath)).List(plugin_metadata.Path("metrics/")))) + + require.Equal(t, "100", + firstAny(must(NewClient(socketPath)).Get(plugin_metadata.Path("metrics/instances/count"))).String()) + require.Equal(t, "{\"instances\":{\"count\":100},\"managers\":{\"count\":7},\"networks\":{\"count\":10},\"workers\":{\"count\":1000}}", + firstAny(must(NewClient(socketPath)).Get(plugin_metadata.Path("metrics"))).String()) + require.Nil(t, firstAny(must(NewClient(socketPath)).Get(plugin_metadata.Path("aws/none")))) + server.Stop() +} diff --git a/pkg/rpc/metadata/service.go b/pkg/rpc/metadata/service.go new file mode 100644 index 000000000..ef503512b --- /dev/null +++ b/pkg/rpc/metadata/service.go @@ -0,0 +1,180 @@ +package metadata + +import ( + "net/http" + "sort" + + "github.com/docker/infrakit/pkg/spi" + "github.com/docker/infrakit/pkg/spi/metadata" + "github.com/docker/infrakit/pkg/template" +) + +// PluginServer returns a Metadata that conforms to the net/rpc rpc call convention. +func PluginServer(p metadata.Plugin) *Metadata { + return &Metadata{plugin: p} +} + +// PluginServerWithTypes which supports multiple types of metadata plugins. The de-multiplexing +// is done by the server's RPC method implementations. +func PluginServerWithTypes(typed map[string]metadata.Plugin) *Metadata { + return &Metadata{typedPlugins: typed} +} + +// Metadata the exported type needed to conform to json-rpc call convention +type Metadata struct { + plugin metadata.Plugin + typedPlugins map[string]metadata.Plugin // by type, as qualified in the name of the plugin +} + +// WithBase sets the base plugin to the given plugin object +func (p *Metadata) WithBase(m metadata.Plugin) *Metadata { + p.plugin = m + return p +} + +// WithTypes sets the typed plugins to the given map of plugins (by type name) +func (p *Metadata) WithTypes(typed map[string]metadata.Plugin) *Metadata { + p.typedPlugins = typed + return p +} + +// VendorInfo returns a metadata object about the plugin, if the plugin implements it. See spi.Vendor +func (p *Metadata) VendorInfo() *spi.VendorInfo { + // TODO(chungers) - support typed plugins + if p.plugin == nil { + return nil + } + + if m, is := p.plugin.(spi.Vendor); is { + return m.VendorInfo() + } + return nil +} + +// Funcs implements the template.FunctionExporter method to expose help for plugin's template functions +func (p *Metadata) Funcs() []template.Function { + f, is := p.plugin.(template.FunctionExporter) + if !is { + return []template.Function{} + } + return f.Funcs() +} + +// Types implements server.TypedFunctionExporter +func (p *Metadata) Types() []string { + if p.typedPlugins == nil { + return nil + } + list := []string{} + for k := range p.typedPlugins { + list = append(list, k) + } + return list +} + +// FuncsByType implements server.TypedFunctionExporter +func (p *Metadata) FuncsByType(t string) []template.Function { + if p.typedPlugins == nil { + return nil + } + fp, has := p.typedPlugins[t] + if !has { + return nil + } + exp, is := fp.(template.FunctionExporter) + if !is { + return nil + } + return exp.Funcs() +} + +// ImplementedInterface returns the interface implemented by this RPC service. +func (p *Metadata) ImplementedInterface() spi.InterfaceSpec { + return metadata.InterfaceSpec +} + +func (p *Metadata) getPlugin(metadataType string) metadata.Plugin { + if metadataType == "" { + return p.plugin + } + if p, has := p.typedPlugins[metadataType]; has { + return p + } + return nil +} + +// List returns a list of child nodes given a path. +func (p *Metadata) List(_ *http.Request, req *ListRequest, resp *ListResponse) error { + nodes := []string{} + + // the . case - list the typed plugins and the default's first level. + if len(req.Path) == 0 || req.Path[0] == "" || req.Path[0] == "." { + if p.plugin != nil { + n, err := p.plugin.List(req.Path) + if err != nil { + return err + } + nodes = append(nodes, n...) + } + for k := range p.typedPlugins { + nodes = append(nodes, k) + } + sort.Strings(nodes) + resp.Nodes = nodes + return nil + } + + c, has := p.typedPlugins[req.Path[0]] + if !has { + + if p.plugin == nil { + return nil + } + + nodes, err := p.plugin.List(req.Path) + if err != nil { + return err + } + sort.Strings(nodes) + resp.Nodes = nodes + return nil + } + + nodes, err := c.List(req.Path[1:]) + if err != nil { + return err + } + + sort.Strings(nodes) + resp.Nodes = nodes + return nil +} + +// Get retrieves the value at path given. +func (p *Metadata) Get(_ *http.Request, req *GetRequest, resp *GetResponse) error { + if len(req.Path) == 0 { + return nil + } + + c, has := p.typedPlugins[req.Path[0]] + if !has { + + if p.plugin == nil { + return nil + } + + value, err := p.plugin.Get(req.Path) + if err != nil { + return err + } + resp.Value = value + return nil + } + + value, err := c.Get(req.Path[1:]) + if err != nil { + return err + } + resp.Value = value + return nil +} diff --git a/pkg/rpc/metadata/types.go b/pkg/rpc/metadata/types.go new file mode 100644 index 000000000..18d30d10c --- /dev/null +++ b/pkg/rpc/metadata/types.go @@ -0,0 +1,26 @@ +package metadata + +import ( + "github.com/docker/infrakit/pkg/spi/metadata" + "github.com/docker/infrakit/pkg/types" +) + +// ListRequest is the rpc wrapper for request parameters to List +type ListRequest struct { + Path metadata.Path +} + +// ListResponse is the rpc wrapper for the results of List +type ListResponse struct { + Nodes []string +} + +// GetRequest is the rpc wrapper of the params to Get +type GetRequest struct { + Path metadata.Path +} + +// GetResponse is the rpc wrapper of the result of Get +type GetResponse struct { + Value *types.Any +} diff --git a/pkg/spi/metadata/path.go b/pkg/spi/metadata/path.go new file mode 100644 index 000000000..51ab524cc --- /dev/null +++ b/pkg/spi/metadata/path.go @@ -0,0 +1,102 @@ +package metadata + +var ( + // NullPath means no path + NullPath = Path([]string{}) +) + +// Path is used to identify a particle of metadata. The path can be strings separated by / as in a URL. +type Path []string + +// Clean scrubs the path to remove any empty string or . or .. and collapse the path into a concise form. +// It's similar to path/filepath.Clean in the standard lib. +func (p Path) Clean() Path { + this := []string(p) + copy := []string{} + for _, v := range this { + switch v { + case "", ".": + case "..": + if len(copy) == 0 { + copy = append(copy, "..") + } else { + copy = copy[0 : len(copy)-1] + if len(copy) == 0 { + return NullPath + } + } + default: + copy = append(copy, v) + + } + } + return Path(copy) +} + +// Len returns the length of the path +func (p Path) Len() int { + return len([]string(p)) +} + +// Index returns the ith component in the path +func (p Path) Index(i int) *string { + if p.Len() <= i { + return nil + } + copy := []string(p)[i] + return © +} + +// Shift returns a new path that's shifted i positions to the left -- ith child of the head at index=0 +func (p Path) Shift(i int) Path { + len := p.Len() - i + if len <= 0 { + return Path([]string{}) + } + new := make([]string, len) + copy(new, []string(p)[i:]) + return Path(new) +} + +// Dir returns the 'dir' of the path +func (p Path) Dir() Path { + pp := p.Clean() + if len(pp) > 1 { + return p[0 : len(pp)-1] + } + return Path([]string{"."}) +} + +// Base returns the base of the path +func (p Path) Base() string { + pp := p.Clean() + return pp[len(pp)-1] +} + +// Join joins the input as a child of this path +func (p Path) Join(child string) Path { + return p.Sub(Path([]string{child})) +} + +// Sub joins the child to the parent +func (p Path) Sub(child Path) Path { + pp := p.Clean() + return Path(append(pp, []string(child)...)) +} + +// Rel returns a new path that is a child of the input from this path. +// e.g. For a path a/b/c/d Rel(a/b/) returns c/d. NullPath is returned if +// the two are not relative to one another. +func (p Path) Rel(path Path) Path { + this := []string(p.Clean()) + parent := []string(path.Clean()) + if len(this) < len(parent) { + return NullPath + } + for i := 0; i < len(parent); i++ { + if parent[i] != this[i] { + return NullPath + } + } + return Path(this[len(parent):]) +} diff --git a/pkg/spi/metadata/path_test.go b/pkg/spi/metadata/path_test.go new file mode 100644 index 000000000..2c1bb8128 --- /dev/null +++ b/pkg/spi/metadata/path_test.go @@ -0,0 +1,39 @@ +package metadata + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +func p(s string) []string { + return strings.Split(s, "/") +} + +func TestPath(t *testing.T) { + + p1 := Path(p("a/b/c")) + p2 := Path(p("b/c")) + + require.Equal(t, "a", *p1.Index(0)) + require.Equal(t, p2, p1.Shift(1)) + + require.Equal(t, "c", p1.Base()) + require.Equal(t, Path(p("a/b")), Path(p("a/b/c")).Dir()) + + require.Equal(t, "a", Path(p("a")).Base()) + require.Equal(t, Path(p(".")), Path(p("a")).Dir()) + + require.Equal(t, Path(p("a/b/c/d")), Path(p("a/b/c/d/")).Clean()) + require.Equal(t, Path(p("a/b/c/d")), Path(p("a/b/c/d/.")).Clean()) + require.Equal(t, Path(p("a/b/c")), Path(p("a/b/c/d/..")).Clean()) + require.Equal(t, Path(p("a/b/c")), Path(p("a/b/c/d/../")).Clean()) + require.Equal(t, NullPath, Path(p("a/..")).Clean()) + + require.Equal(t, Path(p("a/b/c/d")), Path(p("a/b/c/d/")).Clean()) + require.Equal(t, Path(p("a/b/c/d")), Path(p("a/b/c/")).Join("d")) + require.Equal(t, Path(p("a/b/c/d/x/y")), Path(p("a/b/c/")).Sub(p("d/x/y"))) + + require.Equal(t, Path(p("c/d/e/f")), Path(p("a/b/c/d/e/f")).Rel(p("a/b/"))) +} diff --git a/pkg/spi/metadata/spi.go b/pkg/spi/metadata/spi.go new file mode 100644 index 000000000..d7a8dd13f --- /dev/null +++ b/pkg/spi/metadata/spi.go @@ -0,0 +1,23 @@ +package metadata + +import ( + "github.com/docker/infrakit/pkg/spi" + "github.com/docker/infrakit/pkg/types" +) + +// InterfaceSpec is the current name and version of the Metadata API. +var InterfaceSpec = spi.InterfaceSpec{ + Name: "Metadata", + Version: "0.1.0", +} + +// Plugin is the interface for metadata-related operations. +type Plugin interface { + + // List returns a list of *child nodes* given a path, which is specified as a slice + // where for i > j path[i] is the parent of path[j] + List(path Path) (child []string, err error) + + // Get retrieves the value at path given. + Get(path Path) (value *types.Any, err error) +} diff --git a/pkg/template/funcs.go b/pkg/template/funcs.go index 90e3d7cf4..4d2932a9f 100644 --- a/pkg/template/funcs.go +++ b/pkg/template/funcs.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "github.com/docker/infrakit/pkg/types" "github.com/jmespath/go-jmespath" ) @@ -60,6 +61,9 @@ func FromJSON(o interface{}) (interface{}, error) { case []byte: err := json.Unmarshal(o, &ret) return ret, err + case *types.Any: + err := json.Unmarshal(o.Bytes(), &ret) + return ret, err } return ret, fmt.Errorf("not-supported-value-type") } @@ -218,6 +222,18 @@ func (t *Template) DefaultFuncs() []Function { return make([]struct{}, c) }, }, + { + Name: "global", + Description: []string{ + "Sets a global variable named after the first argument, with the value as the second argument.", + "This is similar to def (which sets the default value).", + "Global variables are propagated to all templates that are rendered via the 'include' function.", + }, + Func: func(n string, v interface{}) Void { + t.Global(n, v) + return voidValue + }, + }, { Name: "def", Description: []string{ @@ -240,22 +256,10 @@ func (t *Template) DefaultFuncs() []Function { doc = fmt.Sprintf("%v", args[0]) value = args[1] } - t.AddDef(name, value, doc) + t.Def(name, value, doc) return voidValue, nil }, }, - { - Name: "global", - Description: []string{ - "Sets a global variable named after the first argument, with the value as the second argument.", - "This is similar to def (which sets the default value).", - "Global variables are propagated to all templates that are rendered via the 'include' function.", - }, - Func: func(n string, v interface{}) Void { - t.Global(n, v) - return voidValue - }, - }, { Name: "ref", Description: []string{ diff --git a/pkg/template/integration_test.go b/pkg/template/integration_test.go index 7aeef7fd4..9f163625e 100644 --- a/pkg/template/integration_test.go +++ b/pkg/template/integration_test.go @@ -159,13 +159,31 @@ The message is {{str}} require.Equal(t, 23, context.invokes) // note this is private state not accessible in template } -func TestAddDef(t *testing.T) { +func TestMissingGlobal(t *testing.T) { + s := `{{ if not (ref "/not/exist")}}none{{else}}here{{end}}` + tt, err := NewTemplate("str://"+s, Options{}) + require.NoError(t, err) + view, err := tt.Render(nil) + require.NoError(t, err) + require.Equal(t, "none", view) +} +func TestSourceAndDef(t *testing.T) { + r := `{{ def \"foo\" 100 }}` + s := `{{ source "str://` + r + `" }}foo={{ref "foo"}}` + tt, err := NewTemplate("str://"+s, Options{}) + require.NoError(t, err) + view, err := tt.Render(nil) + require.NoError(t, err) + require.Equal(t, "foo=100", view) +} + +func TestAddDef(t *testing.T) { s := `{{ ref "message" }}: x + y = {{ add (ref "x") (ref "y") }}` tt, err := NewTemplate("str://"+s, Options{}) require.NoError(t, err) - view, err := tt.AddDef("x", 25, "Default value for x").AddDef("y", 100).AddDef("message", "hello").Render(nil) + view, err := tt.Def("x", 25, "Default value for x").Def("y", 100, "no doc").Def("message", "hello", "").Render(nil) require.NoError(t, err) require.Equal(t, "hello: x + y = 125", view) } @@ -218,3 +236,23 @@ func TestIncludeAndGlobalWithContext(t *testing.T) { require.NoError(t, err) require.Equal(t, "a=1", view) // the included template cannot mutate the calling template's context. } + +func TestWithFunctions(t *testing.T) { + ctx := map[string]interface{}{ + "a": 1, + "b": 2, + } + s := `hello={{hello .a }}` + tt, err := NewTemplate("str://"+s, Options{}) + require.NoError(t, err) + view, err := tt.WithFunctions(func() []Function { + return []Function{ + { + Name: "hello", + Func: func(n interface{}) interface{} { return n }, + }, + } + }).Render(ctx) + require.NoError(t, err) + require.Equal(t, "hello=1", view) +} diff --git a/pkg/template/template.go b/pkg/template/template.go index ee5675f30..58e5f7e72 100644 --- a/pkg/template/template.go +++ b/pkg/template/template.go @@ -67,13 +67,14 @@ type defaultValue struct { type Template struct { options Options - url string - body []byte - parsed *template.Template - funcs map[string]interface{} - globals map[string]interface{} - defaults map[string]defaultValue - context interface{} + url string + body []byte + parsed *template.Template + functions []func() []Function + funcs map[string]interface{} + globals map[string]interface{} + defaults map[string]defaultValue + context interface{} registered []Function lock sync.Mutex @@ -114,12 +115,13 @@ func NewTemplateFromBytes(buff []byte, contextURL string, opt Options) (*Templat } return &Template{ - options: opt, - url: contextURL, - body: buff, - funcs: map[string]interface{}{}, - globals: map[string]interface{}{}, - defaults: map[string]defaultValue{}, + options: opt, + url: contextURL, + body: buff, + funcs: map[string]interface{}{}, + globals: map[string]interface{}{}, + defaults: map[string]defaultValue{}, + functions: []func() []Function{}, }, nil } @@ -131,24 +133,19 @@ func (t *Template) SetOptions(opt Options) *Template { return t } -// AddFunc adds a new function to support in template -func (t *Template) AddFunc(name string, f interface{}) *Template { +// WithFunctions allows client code to extend the template by adding its own functions. +func (t *Template) WithFunctions(functions func() []Function) *Template { t.lock.Lock() defer t.lock.Unlock() - t.funcs[name] = f + t.functions = append(t.functions, functions) return t } -// AddDef is equivalent to a {{ def "key" value "description" }} in defining a variable with a default value. -// The value is accessible via a {{ ref "key" }} in the template. -func (t *Template) AddDef(name string, val interface{}, doc ...string) *Template { +// AddFunc adds a new function to support in template +func (t *Template) AddFunc(name string, f interface{}) *Template { t.lock.Lock() defer t.lock.Unlock() - t.defaults[name] = defaultValue{ - Name: name, - Value: val, - Doc: strings.Join(doc, " "), - } + t.funcs[name] = f return t } @@ -175,6 +172,10 @@ func (t *Template) forkFrom(parent *Template) (dotCopy interface{}, err error) { for k, v := range parent.globals { t.globals[k] = v } + // copy the defaults in the parent scope into the child + for k, v := range parent.defaults { + t.defaults[k] = v + } // inherit the functions defined for this template for k, v := range parent.funcs { t.AddFunc(k, v) @@ -187,10 +188,11 @@ func (t *Template) forkFrom(parent *Template) (dotCopy interface{}, err error) { // Global sets the a key, value in the context of this template. It is visible to all the 'included' // and 'sourced' templates by the calling template. -func (t *Template) Global(name string, value interface{}) { +func (t *Template) Global(name string, value interface{}) *Template { for here := t; here != nil; here = here.parent { here.updateGlobal(name, value) } + return t } func (t *Template) updateGlobal(name string, value interface{}) { @@ -199,6 +201,26 @@ func (t *Template) updateGlobal(name string, value interface{}) { t.globals[name] = value } +// Def is equivalent to a {{ def "key" value "description" }} in defining a variable with a default value. +// The value is accessible via a {{ ref "key" }} in the template. +func (t *Template) Def(name string, value interface{}, doc string) *Template { + for here := t; here != nil; here = here.parent { + here.updateDef(name, value, doc) + } + return t +} + +func (t *Template) updateDef(name string, val interface{}, doc ...string) *Template { + t.lock.Lock() + defer t.lock.Unlock() + t.defaults[name] = defaultValue{ + Name: name, + Value: val, + Doc: strings.Join(doc, " "), + } + return t +} + // Validate parses the template and checks for validity. func (t *Template) Validate() (*Template, error) { t.lock.Lock() @@ -235,6 +257,30 @@ func (t *Template) build(context Context) error { } } + // the default functions cannot be overriden + for _, f := range t.DefaultFuncs() { + tf, err := makeTemplateFunc(context, f.Func) + if err != nil { + return err + } + fm[f.Name] = tf + registered = append(registered, f) + } + + // If there are any function sources that was set via WithFunctions() + for _, exp := range t.functions { + for _, f := range exp() { + tf, err := makeTemplateFunc(context, f.Func) + if err != nil { + return err + } + fm[f.Name] = tf + registered = append(registered, f) + } + } + + // If the context implements the FunctionExporter interface, it can add more functions + // and potentially override existing. if context != nil { for _, f := range context.Funcs() { if tf, err := makeTemplateFunc(context, f.Func); err == nil { @@ -246,12 +292,6 @@ func (t *Template) build(context Context) error { } } - // the default functions cannot be overriden - for _, f := range t.DefaultFuncs() { - fm[f.Name] = f.Func - registered = append(registered, f) - } - t.registered = registered parsed, err := template.New(t.url).Funcs(fm).Parse(string(t.body)) diff --git a/pkg/testing/metadata/plugin.go b/pkg/testing/metadata/plugin.go new file mode 100644 index 000000000..92ab7922e --- /dev/null +++ b/pkg/testing/metadata/plugin.go @@ -0,0 +1,26 @@ +package metadata + +import ( + "github.com/docker/infrakit/pkg/spi/metadata" + "github.com/docker/infrakit/pkg/types" +) + +// Plugin implements metadata.Plugin +type Plugin struct { + + // DoList implements List via function + DoList func(path metadata.Path) (child []string, err error) + + // DoGet implements Get via function + DoGet func(path metadata.Path) (value *types.Any, err error) +} + +// List lists the child nodes under path +func (t *Plugin) List(path metadata.Path) (child []string, err error) { + return t.DoList(path) +} + +// Get gets the value +func (t *Plugin) Get(path metadata.Path) (value *types.Any, err error) { + return t.DoGet(path) +}