From 4cb81ab20f4c8657579c7fee04fa0161da3766a3 Mon Sep 17 00:00:00 2001 From: Nick Stott Date: Mon, 27 Oct 2014 11:41:22 -0400 Subject: [PATCH] First public release --- .gitignore | 4 + CONTRIBUTING.md | 1 + LICENSE | 27 ++ README.md | 70 ++++ circle.yml | 32 ++ cmd/transporter/command.go | 196 +++++++++++ cmd/transporter/config.go | 58 ++++ cmd/transporter/javascript_builder.go | 247 ++++++++++++++ cmd/transporter/main.go | 32 ++ cmd/transporter/node.go | 76 +++++ pkg/adaptor/adaptor.go | 128 ++++++++ pkg/adaptor/adaptor_test.go | 75 +++++ pkg/adaptor/elasticsearch.go | 140 ++++++++ pkg/adaptor/errors.go | 61 ++++ pkg/adaptor/file.go | 126 ++++++++ pkg/adaptor/influxdb.go | 118 +++++++ pkg/adaptor/mongodb.go | 304 ++++++++++++++++++ pkg/adaptor/rethinkdb.go | 137 ++++++++ pkg/adaptor/transformer.go | 170 ++++++++++ pkg/events/emitter.go | 208 ++++++++++++ pkg/events/events.go | 130 ++++++++ pkg/events/events_test.go | 40 +++ pkg/message/message.go | 101 ++++++ pkg/message/message_test.go | 190 +++++++++++ pkg/message/ops.go | 64 ++++ pkg/pipe/pipe.go | 137 ++++++++ pkg/transporter/doc.go | 25 ++ pkg/transporter/node.go | 196 +++++++++++ pkg/transporter/node_test.go | 94 ++++++ pkg/transporter/pipeline.go | 161 ++++++++++ .../pipeline_events_integration_test.go | 105 ++++++ pkg/transporter/pipeline_integration_test.go | 161 ++++++++++ pkg/transporter/pipeline_test.go | 75 +++++ test/application-es.js | 4 + test/application-filex2.js | 3 + test/application-influx.js | 4 + test/application-mongo-file.js | 6 + test/application-multisave.js | 5 + test/application.js | 3 + test/config.yaml | 31 ++ test/transformers/influx_test.js | 1 + test/transformers/passthrough_and_log.js | 4 + test/transformers/passthrough_and_log2.js | 4 + test/transformers/transform2.js | 6 + 44 files changed, 3760 insertions(+) create mode 100644 .gitignore create mode 100644 CONTRIBUTING.md create mode 100644 LICENSE create mode 100644 README.md create mode 100644 circle.yml create mode 100644 cmd/transporter/command.go create mode 100644 cmd/transporter/config.go create mode 100644 cmd/transporter/javascript_builder.go create mode 100644 cmd/transporter/main.go create mode 100644 cmd/transporter/node.go create mode 100644 pkg/adaptor/adaptor.go create mode 100644 pkg/adaptor/adaptor_test.go create mode 100644 pkg/adaptor/elasticsearch.go create mode 100644 pkg/adaptor/errors.go create mode 100644 pkg/adaptor/file.go create mode 100644 pkg/adaptor/influxdb.go create mode 100644 pkg/adaptor/mongodb.go create mode 100644 pkg/adaptor/rethinkdb.go create mode 100644 pkg/adaptor/transformer.go create mode 100644 pkg/events/emitter.go create mode 100644 pkg/events/events.go create mode 100644 pkg/events/events_test.go create mode 100644 pkg/message/message.go create mode 100644 pkg/message/message_test.go create mode 100644 pkg/message/ops.go create mode 100644 pkg/pipe/pipe.go create mode 100644 pkg/transporter/doc.go create mode 100644 pkg/transporter/node.go create mode 100644 pkg/transporter/node_test.go create mode 100644 pkg/transporter/pipeline.go create mode 100644 pkg/transporter/pipeline_events_integration_test.go create mode 100644 pkg/transporter/pipeline_integration_test.go create mode 100644 pkg/transporter/pipeline_test.go create mode 100644 test/application-es.js create mode 100644 test/application-filex2.js create mode 100644 test/application-influx.js create mode 100644 test/application-mongo-file.js create mode 100644 test/application-multisave.js create mode 100644 test/application.js create mode 100644 
test/config.yaml create mode 100644 test/transformers/influx_test.js create mode 100644 test/transformers/passthrough_and_log.js create mode 100644 test/transformers/passthrough_and_log2.js create mode 100644 test/transformers/transform2.js diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..c436e2c15 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +transporter +!cmd/transporter +!pkg/transporter + diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 000000000..0cb14bae3 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1 @@ +# Contributing to Transporter \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 000000000..7d5ca4e60 --- /dev/null +++ b/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2014, Compose, Inc +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +* Neither the name of [project] nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 000000000..226edb3d3 --- /dev/null +++ b/README.md @@ -0,0 +1,70 @@ +Transporter + +Build +----- +`go build -a ./cmd/...` + + +Configure +--------- +There is a sample config in test/config.yaml. The config defines the endpoints (either sources or sinks) that are available to the application. +```yaml +api: + interval: 1 # number of milliseconds between metrics pings + uri: "http://requestb.in/1a0zlf11" +nodes: + localmongo: + type: mongo + uri: mongodb://localhost/boom + supernick: + type: elasticsearch + uri: http://10.0.0.1,10.0.0.2:9200/indexname + debug: + type: file + uri: stdout:// + foofile: + type: file + uri: file:///tmp/foo + stdout: + type: file + uri: stdout:// +``` + +There is also a sample 'application.js' in test/application.js. The application is responsible for building transporter pipelines. +Given the above config, this Transporter application.js will copy from a file (in /tmp/foo) to stdout. 
+```js +Source({name:"foofile"}).save({name:"stdout"}) + +``` + +This application.js will copy from the local mongo to a file on the local disk. +```js +Source({name:"localmongo", namespace: "boom.foo"}).save({name:"tofile"}) +``` + +Transformers can also be configured in the application.js as follows: +```js +var pipeline = Source({name:"mongodb-production", namespace: "compose.milestones2"}) +pipeline = pipeline.transform("transformers/transform1.js").transform("transformers/transform2.js") +pipeline.save({name:"supernick", namespace: "something/posts2"}); + +``` +Run +--- + +- list `transporter list --config ./test/config.yaml` +- run `transporter run --config ./test/config.yaml ./test/application.js` +- eval `transporter eval --config ./test/config.yaml 'Source({name:"localmongo", namespace: "boom.foo"}).save({name:"tofile"})' ` +- test `transporter test --config ./test/config.yaml test/application.js ` + +Contributing to Transporter +====================== + +[![Circle CI](https://circleci.com/gh/compose/transporter/tree/master.png?style=badge)](https://circleci.com/gh/compose/transporter/tree/master) + +Want to help out with Transporter? Great! There are instructions to get you +started [here](CONTRIBUTING.md). + +Licensing +========= +Transporter is licensed under the New BSD License. See LICENSE for the full license text. diff --git a/circle.yml b/circle.yml new file mode 100644 index 000000000..72dbfe6db --- /dev/null +++ b/circle.yml @@ -0,0 +1,32 @@ +machine: + environment: + ME: "transporter" + GOBIN: "$CIRCLE_ARTIFACTS" + +dependencies: + override: + - test -d /home/ubuntu/.go_workspace/src/github.com/compose/ || mkdir -p /home/ubuntu/.go_workspace/src/github.com/compose/ + - test -e /home/ubuntu/.go_workspace/src/github.com/compose/transporter || ln -s /home/ubuntu/transporter/ /home/ubuntu/.go_workspace/src/github.com/compose/ + - cd /home/ubuntu/.go_workspace/src/github.com/compose/ && go get ./transporter/... + - test -d $HOME/boto || pip install -t $HOME/boto boto + + + cache_directories: + - "/home/ubuntu/boto" + - "/home/ubuntu/bin" + +test: + override: + # - $CIRCLE_ARTIFACTS/godep go test ./... + - go test ./... -tags=integration -v + +deployment: + default: + branch: [master, stage, dev, experimental] + commands: + - goxc -os="linux,darwin" -arch="amd64" + # - sh ./circle/build_deb.sh + # - tar zcvf $HOME/$ME-$CIRCLE_BRANCH.tgz $CIRCLE_ARTIFACTS/ + # - cd $CIRCLE_ARTIFACTS && tar zcvf $HOME/$ME-$CIRCLE_BRANCH.tgz . 
; cd $HOME/$ME + # - python ./circle/s3push.py "$HOME/$ME-$CIRCLE_BRANCH.tgz" "$ME-$CIRCLE_BRANCH.tgz" + diff --git a/cmd/transporter/command.go b/cmd/transporter/command.go new file mode 100644 index 000000000..a87610b56 --- /dev/null +++ b/cmd/transporter/command.go @@ -0,0 +1,196 @@ +package main + +import ( + "flag" + "fmt" + + "github.com/mitchellh/cli" +) + +// a list of generators for all the subcommand types +var subCommandFactory = map[string]cli.CommandFactory{ + "list": func() (cli.Command, error) { + return &listCommand{}, nil + }, + "test": func() (cli.Command, error) { + return &testCommand{}, nil + }, + "run": func() (cli.Command, error) { + return &runCommand{}, nil + }, + "eval": func() (cli.Command, error) { + return &evalCommand{}, nil + }, +} + +// listCommand loads the config, and lists the configured nodes +type listCommand struct { + configFilename string +} + +func (c *listCommand) Synopsis() string { + return "list all configured nodes" +} + +func (c *listCommand) Help() string { + return `Usage: transporter list --config [file] + + list the nodes that have been configured in the configuration yaml` +} + +func (c *listCommand) Run(args []string) int { + var configFilename string + cmdFlags := flag.NewFlagSet("list", flag.ContinueOnError) + cmdFlags.Usage = func() { c.Help() } + cmdFlags.StringVar(&configFilename, "config", "config.yaml", "config file") + cmdFlags.Parse(args) + + config, err := LoadConfig(configFilename) + if err != nil { + fmt.Println(err) + return 1 + } + + for _, v := range config.Nodes { + fmt.Println(v) + } + + return 0 +} + +// runCommand loads a js file, and compiles and runs a +// javascript pipeline defined therein +type runCommand struct { +} + +func newRunCommand() (cli.Command, error) { + return &runCommand{}, nil +} + +func (c *runCommand) Help() string { + return `Usage: transporter run [--config file] + +Run a transporter application by sourcing a file containing the javascript application +and compiling the transporter pipeline` +} + +func (c *runCommand) Synopsis() string { + return "Run a transporter application loaded from a file" +} + +func (c *runCommand) Run(args []string) int { + var configFilename string + cmdFlags := flag.NewFlagSet("run", flag.ContinueOnError) + cmdFlags.Usage = func() { c.Help() } + cmdFlags.StringVar(&configFilename, "config", "config.yaml", "config file") + cmdFlags.Parse(args) + + config, err := LoadConfig(configFilename) + if err != nil { + fmt.Println(err) + return 1 + } + + builder, err := NewJavascriptBuilder(config, cmdFlags.Args()[0], "") + if err != nil { + fmt.Println(err) + return 1 + } + if err = builder.Build(); err != nil { + fmt.Println(err) + return 1 + } + + if err = builder.Run(); err != nil { + fmt.Println(err) + return 1 + } + return 0 +} + +// testCommand loads a js file, and compiles the +// javascript pipeline defined therein without running it +type testCommand struct { +} + +func (c *testCommand) Help() string { + return `Usage: transporter test [--config file] + +Compile a transporter application by sourcing an application file, but do not run it` +} + +func (c *testCommand) Synopsis() string { + return "display the compiled nodes without starting a pipeline" +} + +func (c *testCommand) Run(args []string) int { + var configFilename string + cmdFlags := flag.NewFlagSet("test", flag.ContinueOnError) + cmdFlags.Usage = func() { c.Help() } + cmdFlags.StringVar(&configFilename, "config", "config.yaml", "config file") + cmdFlags.Parse(args) + + config, err := LoadConfig(configFilename) + 
if err != nil { + fmt.Println(err) + return 1 + } + + builder, err := NewJavascriptBuilder(config, cmdFlags.Args()[0], "") + if err != nil { + fmt.Println(err) + return 1 + } + if err = builder.Build(); err != nil { + fmt.Println(err) + return 1 + } + fmt.Println(builder) + return 0 +} + +// evalCommand compiles inline javascript into a transporter pipeline, +// and runs it +type evalCommand struct { +} + +func (c *evalCommand) Help() string { + return `Usage: transporter eval [--config file] + +Compile a transporter application by evaluating the given javascript` +} + +func (c *evalCommand) Synopsis() string { + return "Eval javascript to build and run a transporter application" +} + +func (c *evalCommand) Run(args []string) int { + var configFilename string + cmdFlags := flag.NewFlagSet("eval", flag.ContinueOnError) + cmdFlags.Usage = func() { c.Help() } + cmdFlags.StringVar(&configFilename, "config", "config.yaml", "config file") + cmdFlags.Parse(args) + + config, err := LoadConfig(configFilename) + if err != nil { + fmt.Println(err) + return 1 + } + + builder, err := NewJavascriptBuilder(config, "", cmdFlags.Args()[0]) + if err != nil { + fmt.Println(err) + return 1 + } + if err = builder.Build(); err != nil { + fmt.Println(err) + return 1 + } + + if err = builder.Run(); err != nil { + fmt.Println(err) + return 1 + } + + return 0 +} diff --git a/cmd/transporter/config.go b/cmd/transporter/config.go new file mode 100644 index 000000000..4dc4f5d55 --- /dev/null +++ b/cmd/transporter/config.go @@ -0,0 +1,58 @@ +package main + +import ( + "fmt" + "io/ioutil" + "os" + "time" + + "gopkg.in/yaml.v2" +) + +// A Config stores meta information about the transporter. This contains a +// list of the nodes that are available to a transporter (sources and sinks, not transformers) +// as well as information about the API used to handle transporter events, and the interval +// between metrics events. +type Config struct { + API struct { + URI string `json:"uri" yaml:"uri"` // URI to connect to + MetricsInterval string `json:"interval" yaml:"interval"` // how often to emit metrics (in ms) + Key string `json:"key" yaml:"key"` // http basic auth password to send with each event + Pid string `json:"pid" yaml:"pid"` // http basic auth username to send with each event + } `json:"api" yaml:"api"` + Nodes map[string]struct { + Type string `json:"type" yaml:"type"` + URI string `json:"uri" yaml:"uri"` + } +} + +// LoadConfig loads a config yaml from a file on disk. +// if the pid is not set in the yaml, pull it from the environment TRANSPORTER_PID. 
+// if that env var isn't present, then generate a pid +func LoadConfig(filename string) (config Config, err error) { + if filename == "" { + return + } + + ba, err := ioutil.ReadFile(filename) + if err != nil { + return + } + + err = yaml.Unmarshal(ba, &config) + + for k, v := range config.Nodes { + config.Nodes[k] = v + } + + if len(config.API.Pid) < 1 { + config.API.Pid = os.Getenv("TRANSPORTER_PID") + } + + if len(config.API.Pid) < 1 { + hostname, _ := os.Hostname() + config.API.Pid = fmt.Sprintf("%s@%d", hostname, time.Now().Unix()) + } + + return +} diff --git a/cmd/transporter/javascript_builder.go b/cmd/transporter/javascript_builder.go new file mode 100644 index 000000000..e1d5efa7a --- /dev/null +++ b/cmd/transporter/javascript_builder.go @@ -0,0 +1,247 @@ +package main + +import ( + "fmt" + "path/filepath" + "time" + + "github.com/compose/transporter/pkg/adaptor" + "github.com/compose/transporter/pkg/transporter" + "github.com/nu7hatch/gouuid" + "github.com/robertkrimen/otto" +) + +// JavascriptBuilder runs the javascript provided and uses it to compile a +// list of transporter nodes and instantiate a transporter pipeline +type JavascriptBuilder struct { + file string + path string + script *otto.Script + vm *otto.Otto + + nodes map[string]Node + pipelines []*transporter.Pipeline + + err error + config Config +} + +// NewJavascriptBuilder compiles the supplied javascript and creates a JavascriptBuilder +func NewJavascriptBuilder(config Config, file, src string) (*JavascriptBuilder, error) { + js := &JavascriptBuilder{ + file: file, + vm: otto.New(), + path: filepath.Dir(file), + config: config, + nodes: make(map[string]Node), + pipelines: make([]*transporter.Pipeline, 0), + } + + var ( + script *otto.Script + err error + ) + if src != "" { + script, err = js.vm.Compile("", src) + } else { + script, err = js.vm.Compile(file, nil) + } + + if err != nil { + return js, err + } + js.script = script + js.vm.Set("Source", js.source) + + return js, nil +} + +// source initializes a transporter Node as a source and adds it to the builder's node map. +// Source(..) takes one argument, a javascript hash which generally contains at +// least a name and a namespace property +// {name: .., namespace: ..} +func (js *JavascriptBuilder) source(call otto.FunctionCall) otto.Value { + if len(call.ArgumentList) != 1 { + js.err = fmt.Errorf("Source must be called with 1 arg. 
(%d given)", len(call.ArgumentList)) + return otto.NullValue() + } + + node, err := js.findNode(call.Argument(0)) + if err != nil { + js.err = err + return otto.NullValue() + } + js.nodes[node.UUID] = node // persist this + + nodeObject, err := node.Object() + if err != nil { + js.err = err + return otto.NullValue() + } + + js.setFunc(nodeObject, "transform", js.transform) + js.setFunc(nodeObject, "save", js.save) + return nodeObject.Value() +} + +// save adds a sink to the transporter pipeline +// each pipeline can have multiple sinks +func (js *JavascriptBuilder) save(node Node, call otto.FunctionCall) (Node, error) { + thisNode, err := js.findNode(call.Argument(0)) + if err != nil { + return node, err + } + root := js.nodes[node.RootUUID] + + if node.UUID == root.UUID { // save is being called on a root node + root.Add(&thisNode) + } else { + node.Add(&thisNode) // add the generated not to the `this` + root.Add(&node) // add the result to the root + } + + js.nodes[root.UUID] = root + return root, err +} + +// adds a transform function to the transporter pipeline +// transform takes one argument, which is a path to a transformer file. +func (js *JavascriptBuilder) transform(node Node, call otto.FunctionCall) (Node, error) { + if !call.Argument(0).IsString() { + return node, fmt.Errorf("bad arguments, expected string, got %T", call.Argument(0).Class()) + } + + fn, _ := call.Argument(0).Export() + + filename := fn.(string) + if !filepath.IsAbs(filename) { + filename = filepath.Join(js.path, filename) + } + name, err := uuid.NewV4() + if err != nil { + return node, err + } + transformer, err := NewNode(name.String(), "transformer", adaptor.Config{"filename": filename}) + if err != nil { + return node, err + } + + node.Add(&transformer) + + return transformer, nil +} + +// pipelines in javascript are chainable, you take in a pipeline, and you return a pipeline +// we just generalize some of that logic here +func (js *JavascriptBuilder) setFunc(obj *otto.Object, token string, fn func(Node, otto.FunctionCall) (Node, error)) error { + return obj.Set(token, func(call otto.FunctionCall) otto.Value { + this, _ := call.This.Export() + + node, err := CreateNode(this) + if err != nil { + js.err = err + return otto.NullValue() + } + + node, err = fn(node, call) + if err != nil { + js.err = err + return otto.NullValue() + } + + o, err := node.Object() + if err != nil { + js.err = err + return otto.NullValue() + } + + js.setFunc(o, "transform", js.transform) + js.setFunc(o, "save", js.save) + + return o.Value() + }) +} + +// find the node from the based ont the hash passed in +// the hash needs to at least have a {name: }property +func (js *JavascriptBuilder) findNode(in otto.Value) (n Node, err error) { + e, err := in.Export() + if err != nil { + return n, err + } + + rawMap, ok := e.(map[string]interface{}) + if !ok { + return n, fmt.Errorf("first argument must be an hash. (got %T instead)", in) + } + + // make sure the hash validates. + // we need a "name" property, and it must be a string + if _, ok := rawMap["name"]; !ok { + return n, fmt.Errorf("hash requires a name") + } + sourceString, ok := rawMap["name"].(string) + if !(ok) { + return n, fmt.Errorf("hash requires a name") + } + + val, ok := js.config.Nodes[sourceString] + if !ok { + return n, fmt.Errorf("no configured nodes found named %s", sourceString) + } + rawMap["uri"] = val.URI + + return NewNode(sourceString, val.Type, rawMap) +} + +// Build runs the javascript script. 
+// each call to Source() in the javascript creates a new node tree, +// and transformers and sinks are added with calls to transform() and save(). +// Build then compiles each tree into a transporter pipeline, using the API settings from the config +func (js *JavascriptBuilder) Build() error { + _, err := js.vm.Run(js.script) + if js.err != nil { + return js.err + } + if err != nil { + return err + } + + for _, node := range js.nodes { + n := node.CreateTransporterNode() + + interval, err := time.ParseDuration(js.config.API.MetricsInterval) + if err != nil { + return err + } + + pipeline, err := transporter.NewDefaultPipeline(n, js.config.API.URI, js.config.API.Key, js.config.API.Pid, interval) + if err != nil { + return err + } + js.pipelines = append(js.pipelines, pipeline) // remember this pipeline + } + + return nil +} + +// Run runs each of the transporter pipelines sequentially +func (js *JavascriptBuilder) Run() error { + for _, p := range js.pipelines { + err := p.Run() + if err != nil { + return err + } + } + + return nil +} + +// String represents the pipelines as a string +func (js *JavascriptBuilder) String() string { + out := "TransporterApplication:\n" + for _, p := range js.pipelines { + out += fmt.Sprintf("%s", p.String()) + } + return out +} diff --git a/cmd/transporter/main.go b/cmd/transporter/main.go new file mode 100644 index 000000000..9fc9fdc81 --- /dev/null +++ b/cmd/transporter/main.go @@ -0,0 +1,32 @@ +package main + +import ( + "log" + "os" + + "github.com/mitchellh/cli" +) + +func main() { + + log.SetPrefix("transporter: ") + log.SetFlags(0) + + c := cli.NewCLI("transporter", "1.0.0") + + c.Args = os.Args[1:] + c.Commands = map[string]cli.CommandFactory{ + "list": subCommandFactory["list"], + "run": subCommandFactory["run"], + "eval": subCommandFactory["eval"], + "test": subCommandFactory["test"], + } + + exitStatus, err := c.Run() + if err != nil { + log.Println(err) + } + + os.Exit(exitStatus) + +} diff --git a/cmd/transporter/node.go b/cmd/transporter/node.go new file mode 100644 index 000000000..c749f8f96 --- /dev/null +++ b/cmd/transporter/node.go @@ -0,0 +1,76 @@ +package main + +import ( + "encoding/json" + "fmt" + + "github.com/compose/transporter/pkg/adaptor" + "github.com/compose/transporter/pkg/transporter" + "github.com/nu7hatch/gouuid" + "github.com/robertkrimen/otto" +) + +// Node is a struct modelled after the transporter.Node struct, but +// more easily able to serialize to json for use within the application.js +type Node struct { + UUID string + Name string `json:"name"` + Type string `json:"type"` + Extra adaptor.Config `json:"extra"` + Children []*Node `json:"children"` + RootUUID string +} + +// NewNode creates a node +func NewNode(name, kind string, extra adaptor.Config) (node Node, err error) { + uuid, err := uuid.NewV4() + if err != nil { + return node, err + } + + return Node{UUID: uuid.String(), Name: name, Type: kind, Extra: extra, RootUUID: uuid.String(), Children: make([]*Node, 0)}, nil +} + +// CreateNode creates a node by marshalling an interface to json, +// and then unmarshalling into a struct. 
Useful in the javascript builder +// to persist nodes in the js environment +func CreateNode(val interface{}) (Node, error) { + t := Node{} + ba, err := json.Marshal(val) + + if err != nil { + return t, err + } + + err = json.Unmarshal(ba, &t) + return t, err +} + +// Object turns this node into an otto Object +func (n *Node) Object() (*otto.Object, error) { + vm := otto.New() + ba, err := json.Marshal(n) + if err != nil { + return nil, err + } + + return vm.Object(fmt.Sprintf(`(%s)`, string(ba))) +} + +// Add will add a node as a child of the current node +func (n *Node) Add(node *Node) { + node.RootUUID = n.RootUUID + n.Children = append(n.Children, node) +} + +// CreateTransporterNode will turn this node into a transporter.Node. +// will recurse down the tree and transform each child +func (n *Node) CreateTransporterNode() *transporter.Node { + self := transporter.NewNode(n.Name, n.Type, n.Extra) + + for _, child := range n.Children { + self.Add(child.CreateTransporterNode()) + } + + return self +} diff --git a/pkg/adaptor/adaptor.go b/pkg/adaptor/adaptor.go new file mode 100644 index 000000000..0027bcbf3 --- /dev/null +++ b/pkg/adaptor/adaptor.go @@ -0,0 +1,128 @@ +package adaptor + +import ( + "encoding/json" + "errors" + "fmt" + "reflect" + "strings" + + "github.com/compose/transporter/pkg/pipe" +) + +var ( + // ErrMissingNode is returned when the requested node type was not found in the map + ErrMissingNode = errors.New("adaptor not found in registry") + + // a registry of adaptor types and their constructors + registry = map[string]interface{}{ + "mongo": NewMongodb, + "file": NewFile, + "elasticsearch": NewElasticsearch, + "influx": NewInfluxdb, + "transformer": NewTransformer, + } +) + +// Register registers an adaptor (database adaptor) for use with Transporter +// The second argument, fn, is a constructor that returns an instance of the +// given adaptor +func Register(name string, fn func(*pipe.Pipe, string, Config) (StopStartListener, error)) { + registry[name] = fn +} + +// StopStartListener defines the interface that all database connectors and nodes must follow. +// Start() consumes data from the source, +// Listen() listens on a pipe, processes data, and then emits it. +// Stop() shuts down the adaptor +type StopStartListener interface { + Start() error + Listen() error + Stop() error +} + +// Createadaptor instantiates an adaptor given the adaptor type and the Config. 
+// Constructors are expected to be in the form +// func NewWhatever(p *pipe.Pipe, path string, extra Config) (*Whatever, error) {} +// and are expected to conform to the adaptor interface +func Createadaptor(kind, path string, extra Config, p *pipe.Pipe) (adaptor StopStartListener, err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("cannot create node: %v", r) + } + }() + + fn, ok := registry[kind] + if !ok { + return nil, ErrMissingNode + } + + args := []reflect.Value{ + reflect.ValueOf(p), + reflect.ValueOf(path), + reflect.ValueOf(extra), + } + + result := reflect.ValueOf(fn).Call(args) + + val := result[0] + inter := result[1].Interface() + + if inter != nil { + return nil, inter.(error) + } + + return val.Interface().(StopStartListener), err +} + +// Config is an alias to map[string]interface{} and helps us +// turn a fuzzy document into a concrete named struct +type Config map[string]interface{} + +// Construct will Marshal the Config and then Unmarshal it into a +// named struct, turning the generic map into a proper struct +func (c *Config) Construct(conf interface{}) error { + b, err := json.Marshal(c) + if err != nil { + return err + } + + err = json.Unmarshal(b, conf) + if err != nil { + return err + } + return nil +} + +// GetString returns value stored in the config under the given key, or +// an empty string if the key doesn't exist, or isn't a string value +func (c Config) GetString(key string) string { + i, ok := c[key] + if !ok { + return "" + } + s, ok := i.(string) + if !ok { + return "" + } + return s +} + +// split a namespace into its elements +// this covers a few standard cases, elasticsearch, mongo, rethink, but it +// isn't expected to be all-inclusive. +func (c *Config) splitNamespace() (string, string, error) { + fields := strings.SplitN(c.GetString("namespace"), ".", 2) + + if len(fields) != 2 { + return "", "", fmt.Errorf("malformed namespace, expected a '.' delimited string") + } + return fields[0], fields[1], nil +} + +// dbConfig is a standard typed config struct to use as a general purpose config for most databases. 
+type dbConfig struct { + URI string `json:"uri"` // the database uri + Namespace string `json:"namespace"` // namespace + Debug bool `json:"debug"` // debug mode +} diff --git a/pkg/adaptor/adaptor_test.go b/pkg/adaptor/adaptor_test.go new file mode 100644 index 000000000..2e6471620 --- /dev/null +++ b/pkg/adaptor/adaptor_test.go @@ -0,0 +1,75 @@ +package adaptor + +import ( + "errors" + "reflect" + "testing" + + "github.com/compose/transporter/pkg/pipe" +) + +// a random type that implements the adaptor interface +type Testadaptor struct { + value string +} + +func NewTestadaptor(p *pipe.Pipe, path string, extra Config) (StopStartListener, error) { + val, ok := extra["value"] + if !ok { + return nil, errors.New("this is an error") + } + return &Testadaptor{value: val.(string)}, nil +} + +func (s *Testadaptor) Start() error { + return nil +} + +func (s *Testadaptor) Stop() error { + return nil +} + +func (s *Testadaptor) Listen() error { + return nil +} + +func TestCreateadaptor(t *testing.T) { + Register("testadaptor", NewTestadaptor) + + data := []struct { + kind string + extra Config + out *Testadaptor + err string + }{ + { + "testadaptor", + Config{"value": "rockettes"}, + &Testadaptor{value: "rockettes"}, + "", + }, + { + "testadaptor", + Config{"blah": "rockettes"}, + &Testadaptor{}, + "this is an error", + }, + { + "notasource", + Config{"blah": "rockettes"}, + nil, + "adaptor not found in registry", + }, + } + for _, v := range data { + adaptor, err := Createadaptor(v.kind, "a/b/c", v.extra, pipe.NewPipe(nil, "some name")) + + if err != nil && err.Error() != v.err { + t.Errorf("\nexpected error: %v\ngot error: %v\n", v.err, err.Error()) + t.FailNow() + } + if !reflect.DeepEqual(v.out, adaptor) && err == nil { + t.Errorf("expected:\n%+v\ngot:\n%+v\n", v.out, adaptor) + } + } +} diff --git a/pkg/adaptor/elasticsearch.go b/pkg/adaptor/elasticsearch.go new file mode 100644 index 000000000..b3090c6fb --- /dev/null +++ b/pkg/adaptor/elasticsearch.go @@ -0,0 +1,140 @@ +package adaptor + +import ( + "fmt" + "net/url" + "strings" + + "github.com/compose/transporter/pkg/message" + "github.com/compose/transporter/pkg/pipe" + elastigo "github.com/mattbaird/elastigo/lib" +) + +// Elasticsearch is an adaptor to connect a pipeline to +// an elasticsearch cluster. +type Elasticsearch struct { + // pull these in from the node + uri *url.URL + + _type string + index string + + pipe *pipe.Pipe + path string + + indexer *elastigo.BulkIndexer + running bool +} + +// NewElasticsearch creates a new Elasticsearch adaptor. 
+// Elasticsearch adaptors cannot be used as a source. +func NewElasticsearch(p *pipe.Pipe, path string, extra Config) (StopStartListener, error) { + var ( + conf dbConfig + err error + ) + if err = extra.Construct(&conf); err != nil { + return nil, NewError(CRITICAL, path, fmt.Sprintf("Can't create constructor (%s)", err.Error()), nil) + } + + u, err := url.Parse(conf.URI) + if err != nil { + return nil, err + } + + e := &Elasticsearch{ + uri: u, + pipe: p, + path: path, + } + + e.index, e._type, err = extra.splitNamespace() + if err != nil { + return e, NewError(CRITICAL, path, fmt.Sprintf("Can't split namespace into _index._type (%s)", err.Error()), nil) + } + + return e, nil +} + +// Start the adaptor as a source (not implemented) +func (e *Elasticsearch) Start() error { + return fmt.Errorf("Elasticsearch can't function as a source") +} + +// Listen starts the listener +func (e *Elasticsearch) Listen() error { + e.setupClient() + e.indexer.Start() + e.running = true + + go func(cherr chan *elastigo.ErrorBuffer) { + for err := range cherr { + e.pipe.Err <- NewError(CRITICAL, e.path, fmt.Sprintf("Elasticsearch error (%s)", err.Err), nil) + } + }(e.indexer.ErrorChannel) + + defer func() { + if e.running { + e.running = false + e.pipe.Stop() + e.indexer.Stop() + } + }() + + return e.pipe.Listen(e.applyOp) +} + +// Stop the adaptor +func (e *Elasticsearch) Stop() error { + if e.running { + e.running = false + e.pipe.Stop() + e.indexer.Stop() + } + return nil +} + +func (e *Elasticsearch) applyOp(msg *message.Msg) (*message.Msg, error) { + if msg.Op == message.Command { + err := e.runCommand(msg) + if err != nil { + e.pipe.Err <- NewError(ERROR, e.path, fmt.Sprintf("Elasticsearch error (%s)", err), msg.Document()) + } + return msg, nil + } + + err := e.indexer.Index(e.index, e._type, msg.IDString(), "", nil, msg.Document(), false) + if err != nil { + e.pipe.Err <- NewError(ERROR, e.path, fmt.Sprintf("Elasticsearch error (%s)", err), msg.Document()) + } + return msg, nil +} + +func (e *Elasticsearch) setupClient() { + // set up the client, we need host(s), port, username, password, and scheme + client := elastigo.NewConn() + + if e.uri.User != nil { + client.Username = e.uri.User.Username() + if password, set := e.uri.User.Password(); set { + client.Password = password + } + } + + // we might have a port in the host bit + hostBits := strings.Split(e.uri.Host, ":") + if len(hostBits) > 1 { + client.SetPort(hostBits[1]) + } + + client.SetHosts(strings.Split(hostBits[0], ",")) + client.Protocol = e.uri.Scheme + + e.indexer = client.NewBulkIndexerErrors(10, 60) +} + +func (e *Elasticsearch) runCommand(msg *message.Msg) error { + if _, hasKey := msg.Document()["flush"]; hasKey { + e.indexer.Flush() + } + return nil +} + +func (e *Elasticsearch) getNamespace() string { + return strings.Join([]string{e.index, e._type}, ".") +} diff --git a/pkg/adaptor/errors.go b/pkg/adaptor/errors.go new file mode 100644 index 000000000..3f702798b --- /dev/null +++ b/pkg/adaptor/errors.go @@ -0,0 +1,61 @@ +package adaptor + +import ( + "fmt" + + "gopkg.in/mgo.v2/bson" +) + +// Adaptor errors have levels to indicate their severity. +// CRITICAL errors indicate that the program cannot continue running. +// +// ERROR errors indicate a problem with a specific document or message. 
+// a document might not have been applied properly to a sink, but the program can continue +// +// WARNING Todo +// +// NOTICE ToDo +const ( + NOTICE ErrorLevel = iota + WARNING + ERROR + CRITICAL +) + +// ErrorLevel indicates the severity of the error +type ErrorLevel int + +func levelToString(lvl ErrorLevel) string { + switch lvl { + case NOTICE: + return "NOTICE" + case WARNING: + return "WARNING" + case ERROR: + return "ERROR" + case CRITICAL: + return "CRITICAL" + default: + return "UNKNOWN" + } +} + +// Error is an error that happened during an adaptor's operation. +// Errors include both an indication of the severity, Level, as well as +// a reference to the Record that was in process when the error occurred +type Error struct { + Lvl ErrorLevel + Str string + Path string + Record bson.M +} + +// NewError creates an Error type with the specified level, path, message and record +func NewError(lvl ErrorLevel, path, str string, record bson.M) Error { + return Error{Lvl: lvl, Path: path, Str: str, Record: record} +} + +// Error returns the error as a string +func (t Error) Error() string { + return fmt.Sprintf("%s: %s", levelToString(t.Lvl), t.Str) +} diff --git a/pkg/adaptor/file.go b/pkg/adaptor/file.go new file mode 100644 index 000000000..a8805e681 --- /dev/null +++ b/pkg/adaptor/file.go @@ -0,0 +1,126 @@ +package adaptor + +import ( + "encoding/json" + "fmt" + "io" + "os" + "strings" + + "github.com/compose/transporter/pkg/message" + "github.com/compose/transporter/pkg/pipe" +) + +// File is an adaptor that can be used as a +// source / sink for files on disk, as well as a sink to stdout. +type File struct { + uri string + pipe *pipe.Pipe + path string + filehandle *os.File +} + +// NewFile returns a File Adaptor +func NewFile(p *pipe.Pipe, path string, extra Config) (StopStartListener, error) { + var ( + conf FileConfig + err error + ) + if err = extra.Construct(&conf); err != nil { + return nil, NewError(CRITICAL, path, fmt.Sprintf("Can't configure adaptor (%s)", err.Error()), nil) + } + + return &File{ + uri: conf.URI, + pipe: p, + path: path, + }, nil +} + +// Start the file adaptor +// TODO: we only know how to listen on stdout for now +func (d *File) Start() (err error) { + defer func() { + d.Stop() + }() + + return d.readFile() +} + +// Listen starts the listen loop +func (d *File) Listen() (err error) { + defer func() { + d.Stop() + }() + + if strings.HasPrefix(d.uri, "file://") { + filename := strings.Replace(d.uri, "file://", "", 1) + d.filehandle, err = os.Create(filename) + if err != nil { + d.pipe.Err <- NewError(CRITICAL, d.path, fmt.Sprintf("Can't open output file (%s)", err.Error()), nil) + return err + } + } + + return d.pipe.Listen(d.dumpMessage) +} + +// Stop the adaptor +func (d *File) Stop() error { + d.pipe.Stop() + return nil +} + +/* + * read each message from the file + */ +func (d *File) readFile() (err error) { + filename := strings.Replace(d.uri, "file://", "", 1) + d.filehandle, err = os.Open(filename) + if err != nil { + d.pipe.Err <- NewError(CRITICAL, d.path, fmt.Sprintf("Can't open input file (%s)", err.Error()), nil) + return err + } + + decoder := json.NewDecoder(d.filehandle) + for { + var doc map[string]interface{} + if err := decoder.Decode(&doc); err == io.EOF { + break + } else if err != nil { + d.pipe.Err <- NewError(ERROR, d.path, fmt.Sprintf("Can't unmarshal document (%s)", err.Error()), nil) + return err + } + d.pipe.Send(message.NewMsg(message.Insert, doc)) + } + return nil +} + +/* + * dump each message to the file + */ +func (d 
*File) dumpMessage(msg *message.Msg) (*message.Msg, error) { + jdoc, err := json.Marshal(msg.Document()) + if err != nil { + d.pipe.Err <- NewError(ERROR, d.path, fmt.Sprintf("Can't marshal document (%s)", err.Error()), msg.Document()) + return msg, nil + } + + if strings.HasPrefix(d.uri, "stdout://") { + fmt.Println(string(jdoc)) + } else { + _, err = fmt.Fprintln(d.filehandle, string(jdoc)) + if err != nil { + d.pipe.Err <- NewError(ERROR, d.path, fmt.Sprintf("Can't write document (%s)", err.Error()), msg.Document()) + return msg, nil + } + } + + return msg, nil +} + +// FileConfig is used to configure the File Adaptor. +type FileConfig struct { + // URI pointing to the resource. We only recognize file:// and stdout:// currently + URI string `json:"uri"` +} diff --git a/pkg/adaptor/influxdb.go b/pkg/adaptor/influxdb.go new file mode 100644 index 000000000..b177fe7d5 --- /dev/null +++ b/pkg/adaptor/influxdb.go @@ -0,0 +1,118 @@ +package adaptor + +import ( + "fmt" + "net/url" + + "github.com/compose/transporter/pkg/message" + "github.com/compose/transporter/pkg/pipe" + "github.com/influxdb/influxdb/client" +) + +// Influxdb is an adaptor that writes metrics to influxdb (http://influxdb.com/), +// a high-performance time series database +type Influxdb struct { + // pull these in from the node + uri *url.URL + + // save time by setting these once + database string + seriesName string + + // + pipe *pipe.Pipe + path string + + // influx connection and options + influxClient *client.Client +} + +// NewInfluxdb creates an Influxdb adaptor +func NewInfluxdb(p *pipe.Pipe, path string, extra Config) (StopStartListener, error) { + var ( + conf dbConfig + err error + ) + if err = extra.Construct(&conf); err != nil { + return nil, err + } + + u, err := url.Parse(conf.URI) + if err != nil { + return nil, err + } + + i := &Influxdb{ + uri: u, + pipe: p, + path: path, + } + + i.database, i.seriesName, err = extra.splitNamespace() + if err != nil { + return i, err + } + + return i, nil +} + +// Start the adaptor as a source (not implemented) +func (i *Influxdb) Start() error { + return fmt.Errorf("Influxdb can't function as a source") +} + +// Listen starts the listener +func (i *Influxdb) Listen() (err error) { + i.influxClient, err = i.setupClient() + if err != nil { + i.pipe.Err <- err + return err + } + + return i.pipe.Listen(i.applyOp) +} + +// Stop the adaptor +func (i *Influxdb) Stop() error { + i.pipe.Stop() + return nil +} + +func (i *Influxdb) applyOp(msg *message.Msg) (*message.Msg, error) { + switch msg.Op { + case message.Insert: + docSize := len(msg.Document()) + columns := make([]string, 0, docSize) + points := make([][]interface{}, 1) + points[0] = make([]interface{}, 0, docSize) + for k := range msg.Document() { + columns = append(columns, k) + points[0] = append(points[0], msg.Document()[k]) + } + series := &client.Series{ + Name: i.seriesName, + Columns: columns, + Points: points, + } + + return msg, i.influxClient.WriteSeries([]*client.Series{series}) + } + return msg, nil +} + +func (i *Influxdb) setupClient() (influxClient *client.Client, err error) { + // set up the clientConfig, we need host:port, username, password, and database name + clientConfig := &client.ClientConfig{ + Database: i.database, + } + + if i.uri.User != nil { + clientConfig.Username = i.uri.User.Username() + if password, set := i.uri.User.Password(); set { + clientConfig.Password = password + } + } + clientConfig.Host = i.uri.Host + + return client.NewClient(clientConfig) +} diff --git 
a/pkg/adaptor/mongodb.go b/pkg/adaptor/mongodb.go new file mode 100644 index 000000000..e9a31f31b --- /dev/null +++ b/pkg/adaptor/mongodb.go @@ -0,0 +1,304 @@ +package adaptor + +import ( + "fmt" + "strings" + "time" + + "github.com/compose/transporter/pkg/message" + "github.com/compose/transporter/pkg/pipe" + "gopkg.in/mgo.v2" + "gopkg.in/mgo.v2/bson" +) + +// Mongodb is an adaptor to read / write to mongodb. +// it works as a source by copying the collection, and then optionally tailing the oplog +type Mongodb struct { + // pull these in from the node + uri string + tail bool // run the tail oplog + debug bool + + // save time by setting these once + collection string + database string + + oplogTime bson.MongoTimestamp + + // + pipe *pipe.Pipe + path string + + // mongo connection and options + mongoSession *mgo.Session + oplogTimeout time.Duration + + restartable bool // this refers to being able to refresh the iterator, not to the restart based on session op +} + +// NewMongodb creates a new Mongodb adaptor +func NewMongodb(p *pipe.Pipe, path string, extra Config) (StopStartListener, error) { + var ( + conf MongodbConfig + err error + ) + if err = extra.Construct(&conf); err != nil { + return nil, err + } + + if conf.URI == "" || conf.Namespace == "" { + return nil, fmt.Errorf("both uri and namespace are required") + } + + if conf.Debug { + fmt.Printf("Mongo Config %+v\n", conf) + } + + m := &Mongodb{ + restartable: true, // assume that we're able to restart the process + oplogTimeout: 5 * time.Second, // timeout the oplog iterator + pipe: p, + uri: conf.URI, + tail: conf.Tail, + debug: conf.Debug, + path: path, + } + + m.database, m.collection, err = m.splitNamespace(conf.Namespace) + if err != nil { + return m, err + } + + m.mongoSession, err = mgo.Dial(m.uri) + return m, err +} + +// Start the adaptor as a source +func (m *Mongodb) Start() (err error) { + defer func() { + m.pipe.Stop() + }() + + m.oplogTime = nowAsMongoTimestamp() + if m.debug { + fmt.Printf("setting start timestamp: %d\n", m.oplogTime) + } + + err = m.catData() + if err != nil { + m.pipe.Err <- err + return err + } + if m.tail { + // replay the oplog + err = m.tailData() + if err != nil { + m.pipe.Err <- err + return err + } + } + + return +} + +// Listen starts the pipe's listener +func (m *Mongodb) Listen() (err error) { + defer func() { + m.pipe.Stop() + }() + return m.pipe.Listen(m.writeMessage) +} + +// Stop the adaptor +func (m *Mongodb) Stop() error { + m.pipe.Stop() + return nil +} + +// writeMessage writes one message to the destination mongo, or sends an error down the pipe +// TODO this can be cleaned up. 
I'm not sure whether this should pipe the error, or whether the +// caller should pipe the error +func (m *Mongodb) writeMessage(msg *message.Msg) (*message.Msg, error) { + collection := m.mongoSession.DB(m.database).C(m.collection) + err := collection.Insert(msg.Document()) + if mgo.IsDup(err) { + err = collection.Update(bson.M{"_id": msg.ID}, msg.Document()) + } + if err != nil { + m.pipe.Err <- NewError(ERROR, m.path, fmt.Sprintf("Mongodb error (%s)", err.Error()), msg.Document()) + } + return msg, nil +} + +// catData pulls down the original collection +func (m *Mongodb) catData() (err error) { + var ( + collection = m.mongoSession.DB(m.database).C(m.collection) + query = bson.M{} + result bson.M // hold the document + ) + + iter := collection.Find(query).Sort("_id").Iter() + + for { + for iter.Next(&result) { + if stop := m.pipe.Stopped; stop { + return + } + + // set up the message + msg := message.NewMsg(message.Insert, result) + + m.pipe.Send(msg) + result = bson.M{} + } + + // we've exited the mongo read loop, let's figure out why + // check here again if we've been asked to quit + if stop := m.pipe.Stopped; stop { + return + } + + if iter.Err() != nil && m.restartable { + fmt.Printf("got err reading collection. reissuing query %v\n", iter.Err()) + time.Sleep(1 * time.Second) + iter = collection.Find(query).Sort("_id").Iter() + continue + } + + return + } +} + +/* + * tail the oplog + */ +func (m *Mongodb) tailData() (err error) { + + var ( + collection = m.mongoSession.DB("local").C("oplog.rs") + result oplogDoc // hold the document + query = bson.M{ + "ts": bson.M{"$gte": m.oplogTime}, + "ns": m.getNamespace(), + } + + iter = collection.Find(query).LogReplay().Sort("$natural").Tail(m.oplogTimeout) + ) + + for { + for iter.Next(&result) { + if stop := m.pipe.Stopped; stop { + return + } + if result.validOp() { + msg := message.NewMsg(message.OpTypeFromString(result.Op), nil) + msg.Timestamp = int64(result.Ts) >> 32 + + switch result.Op { + case "i": + msg.SetDocument(result.O) + case "d": + msg.SetDocument(result.O) + case "u": + doc, err := m.getOriginalDoc(result.O2) + if err != nil { // errors aren't fatal here, but we need to send it down the pipe + m.pipe.Err <- NewError(ERROR, m.path, fmt.Sprintf("Mongodb error (%s)", err.Error()), nil) + continue + } + msg.SetDocument(doc) + default: + m.pipe.Err <- NewError(ERROR, m.path, "Mongodb error (unknown op type)", nil) + continue + } + m.oplogTime = result.Ts + m.pipe.Send(msg) + } + result = oplogDoc{} + } + + // we've exited the mongo read loop, let's figure out why + // check here again if we've been asked to quit + if stop := m.pipe.Stopped; stop { + return + } + if iter.Timeout() { + continue + } + if iter.Err() != nil { + return NewError(CRITICAL, m.path, fmt.Sprintf("Mongodb error (error reading collection %s)", iter.Err()), nil) + } + + // query will change, + query = bson.M{ + "ts": bson.M{"$gte": m.oplogTime}, + "ns": m.getNamespace(), + } + iter = collection.Find(query).LogReplay().Tail(m.oplogTimeout) + } +} + +// getOriginalDoc retrieves the original document from the database. 
transporter has no knowledge of update operations; all updates +// work as wholesale document replaces +func (m *Mongodb) getOriginalDoc(doc bson.M) (result bson.M, err error) { + id, exists := doc["_id"] + if !exists { + return result, fmt.Errorf("Can't get _id from document") + } + + err = m.mongoSession.DB(m.database).C(m.collection).FindId(id).One(&result) + if err != nil { + err = fmt.Errorf("%s %v %v", m.getNamespace(), id, err) + } + return +} + +func (m *Mongodb) getNamespace() string { + return strings.Join([]string{m.database, m.collection}, ".") +} + +// splitNamespace splits a mongo namespace by the first '.' into a database and a collection +func (m *Mongodb) splitNamespace(namespace string) (string, string, error) { + fields := strings.SplitN(namespace, ".", 2) + + if len(fields) != 2 { + return "", "", fmt.Errorf("malformed mongo namespace") + } + return fields[0], fields[1], nil +} + +// oplogDoc is a representation of the mongodb oplog document, +// detailed here among other places: http://www.kchodorow.com/blog/2010/10/12/replication-internals/ +type oplogDoc struct { + Ts bson.MongoTimestamp `bson:"ts"` + H int64 `bson:"h"` + V int `bson:"v"` + Op string `bson:"op"` + Ns string `bson:"ns"` + O bson.M `bson:"o"` + O2 bson.M `bson:"o2"` +} + +// validOp checks to see if we're an insert, delete, or update, otherwise the +// document is skipped. +// TODO: skip system collections +func (o *oplogDoc) validOp() bool { + return o.Op == "i" || o.Op == "d" || o.Op == "u" +} + +// MongodbConfig provides configuration options for a mongodb adaptor +// the notable difference between this and dbConfig is the presence of the Tail option +type MongodbConfig struct { + URI string `json:"uri"` + Namespace string `json:"namespace"` + Debug bool `json:"debug"` + Tail bool `json:"tail"` +} + +func nowAsMongoTimestamp() bson.MongoTimestamp { + return bson.MongoTimestamp(time.Now().Unix() << 32) +} + +func newMongoTimestamp(s, i int) bson.MongoTimestamp { + return bson.MongoTimestamp(int64(s)<<32 + int64(i)) +} diff --git a/pkg/adaptor/rethinkdb.go b/pkg/adaptor/rethinkdb.go new file mode 100644 index 000000000..73600b6db --- /dev/null +++ b/pkg/adaptor/rethinkdb.go @@ -0,0 +1,137 @@ +package adaptor + +import ( + "fmt" + "net/url" + "strings" + "time" + + "github.com/compose/transporter/pkg/message" + "github.com/compose/transporter/pkg/pipe" + gorethink "github.com/dancannon/gorethink" +) + +// Rethinkdb is an adaptor that writes documents to rethinkdb (http://rethinkdb.com/), +// an open-source distributed database +type Rethinkdb struct { + // pull these in from the config + uri *url.URL + + // save time by setting these once + database string + table string + + debug bool + + // + pipe *pipe.Pipe + path string + + // rethinkdb connection and options + client *gorethink.Session +} + +// NewRethinkdb creates a new Rethinkdb database adaptor +func NewRethinkdb(p *pipe.Pipe, path string, extra Config) (StopStartListener, error) { + var ( + conf dbConfig + err error + ) + if err = extra.Construct(&conf); err != nil { + return nil, err + } + + u, err := url.Parse(conf.URI) + if err != nil { + return nil, err + } + + r := &Rethinkdb{ + uri: u, + pipe: p, + path: path, + } + + r.database, r.table, err = extra.splitNamespace() + if err != nil { + return r, err + } + r.debug = conf.Debug + + return r, nil +} + +// Start the adaptor as a source (not implemented) +func (r *Rethinkdb) Start() error { + return fmt.Errorf("Rethinkdb can't function as a source") +} + +// Listen starts the adaptor's listener 
+func (r *Rethinkdb) Listen() (err error) { + r.client, err = r.setupClient() + if err != nil { + r.pipe.Err <- err + return err + } + + return r.pipe.Listen(r.applyOp) +} + +// Stop the adaptor +func (r *Rethinkdb) Stop() error { + r.pipe.Stop() + return nil +} + +// applyOp applies one operation to the database +func (r *Rethinkdb) applyOp(msg *message.Msg) (*message.Msg, error) { + var ( + resp gorethink.WriteResponse + err error + ) + + switch msg.Op { + case message.Delete: + resp, err = gorethink.Table(r.table).Get(msg.IDString()).Delete().RunWrite(r.client) + case message.Insert: + resp, err = gorethink.Table(r.table).Insert(msg.Document()).RunWrite(r.client) + case message.Update: + resp, err = gorethink.Table(r.table).Insert(msg.DocumentWithID("id"), gorethink.InsertOpts{Conflict: "replace"}).RunWrite(r.client) + } + if err != nil { + return msg, err + } + + return msg, r.handleResponse(&resp) +} + +func (r *Rethinkdb) setupClient() (*gorethink.Session, error) { + // set up the clientConfig, we need host:port, username, password, and database name + client, err := gorethink.Connect(gorethink.ConnectOpts{ + Address: r.uri.Host, + MaxIdle: 10, + IdleTimeout: time.Second * 10, + }) + if err != nil { + return nil, fmt.Errorf("Unable to connect: %s", err) + } + + gorethink.Db(r.database).TableDrop(r.table).RunWrite(client) + gorethink.Db(r.database).TableCreate(r.table).RunWrite(client) + + client.Use(r.database) + return client, nil +} + +// handleResponse takes the rethink response and turns it into something we can consume elsewhere +func (r *Rethinkdb) handleResponse(resp *gorethink.WriteResponse) error { + if resp.Errors != 0 { + if !strings.Contains(resp.FirstError, "Duplicate primary key") { // we don't care about this error + if r.debug { + fmt.Printf("Reported %d errors\n", resp.Errors) + } + return fmt.Errorf("%s\n%s", "Problem inserting docs", resp.FirstError) + } + } + return nil +} diff --git a/pkg/adaptor/transformer.go b/pkg/adaptor/transformer.go new file mode 100644 index 000000000..2eb64bb04 --- /dev/null +++ b/pkg/adaptor/transformer.go @@ -0,0 +1,170 @@ +package adaptor + +import ( + "fmt" + "io/ioutil" + "time" + + "github.com/compose/mejson" + "github.com/compose/transporter/pkg/message" + "github.com/compose/transporter/pkg/pipe" + "github.com/robertkrimen/otto" + _ "github.com/robertkrimen/otto/underscore" // enable underscore +) + +// Transformer is an adaptor which consumes data from a source, transforms it using a supplied javascript +// function and then emits it. The javascript transformation function is supplied as a separate file on disk, +// and is invoked via the defined module.exports function +type Transformer struct { + fn string + + pipe *pipe.Pipe + path string + + debug bool + script *otto.Script + vm *otto.Otto +} + +// NewTransformer creates a new transformer object +func NewTransformer(p *pipe.Pipe, path string, extra Config) (StopStartListener, error) { + var ( + conf TransformerConfig + err error + ) + if err = extra.Construct(&conf); err != nil { + return nil, err + } + + t := &Transformer{pipe: p, path: path, debug: conf.Debug} + + if conf.Filename == "" { + return t, fmt.Errorf("No filename specified") + } + + ba, err := ioutil.ReadFile(conf.Filename) + if err != nil { + return t, err + } + + t.fn = string(ba) + + return t, nil +} + +// Listen starts the transformer's listener, reads each message from the incoming channel, +// transforms it into mejson, and then uses the supplied javascript module.exports function +// to transform the document. 
The document is then emitted to this adaptor's children +func (t *Transformer) Listen() (err error) { + t.vm = otto.New() + + // set up the vm environment, make `module = {}` + if _, err = t.vm.Run(`module = {}`); err != nil { + return t.transformerError(CRITICAL, err, nil) + } + + // compile our script + if t.script, err = t.vm.Compile("", t.fn); err != nil { + return t.transformerError(CRITICAL, err, nil) + } + + // run the script, ignore the output + _, err = t.vm.Run(t.script) + if err != nil { + return t.transformerError(CRITICAL, err, nil) + } + + return t.pipe.Listen(t.transformOne) +} + +// Start the adaptor as a source (not implemented for this adaptor) +func (t *Transformer) Start() error { + return fmt.Errorf("Transformers can't be used as a source") +} + +// Stop the adaptor +func (t *Transformer) Stop() error { + t.pipe.Stop() + return nil +} + +func (t *Transformer) transformOne(msg *message.Msg) (*message.Msg, error) { + + var ( + doc interface{} + value otto.Value + outDoc otto.Value + result interface{} + err error + ) + + // short circuit for deletes and commands + if msg.Op == message.Delete || msg.Op == message.Command { + return msg, nil + } + + now := time.Now().Nanosecond() + + if doc, err = mejson.Marshal(msg.Document()); err != nil { + t.pipe.Err <- t.transformerError(ERROR, err, msg) + return msg, nil + } + + if value, err = t.vm.ToValue(doc); err != nil { + t.pipe.Err <- t.transformerError(ERROR, err, msg) + return msg, nil + } + + // now that we have finished casting our map to a bunch of different types, + // lets run our transformer on the document + beforeVM := time.Now().Nanosecond() + if outDoc, err = t.vm.Call(`module.exports`, nil, value); err != nil { + t.pipe.Err <- t.transformerError(ERROR, err, msg) + return msg, nil + } + + if result, err = outDoc.Export(); err != nil { + t.pipe.Err <- t.transformerError(ERROR, err, msg) + return msg, nil + } + + afterVM := time.Now().Nanosecond() + + switch r := result.(type) { + case map[string]interface{}: + doc, err := mejson.Unmarshal(r) + if err != nil { + t.pipe.Err <- t.transformerError(ERROR, err, msg) + return msg, nil + } + msg.SetDocument(doc) + default: + if t.debug { + fmt.Println("transformer skipping doc") + } + } + + if t.debug { + then := time.Now().Nanosecond() + fmt.Printf("document transformed in %dus. %d to marshal, %d in the vm, %d to unmarshal\n", (then-now)/1000, (beforeVM-now)/1000, (afterVM-beforeVM)/1000, (then-afterVM)/1000) + } + + return msg, nil +} + +func (t *Transformer) transformerError(lvl ErrorLevel, err error, msg *message.Msg) error { + if e, ok := err.(*otto.Error); ok { + return NewError(lvl, t.path, fmt.Sprintf("Transformer error (%s)", e.String()), msg.Document()) + } + return NewError(lvl, t.path, fmt.Sprintf("Transformer error (%s)", err.Error()), msg.Document()) +} + +// TransformerConfig holds config options for a transformer adaptor +type TransformerConfig struct { + // file containing transformer javascript + // must define a module.exports = function(doc) { .....; return doc } + Filename string `json:"filename"` + + // verbose output + Debug bool `json:"debug"` // debug mode +} diff --git a/pkg/events/emitter.go b/pkg/events/emitter.go new file mode 100644 index 000000000..d165a472c --- /dev/null +++ b/pkg/events/emitter.go @@ -0,0 +1,208 @@ +package events + +import ( + "bytes" + "io/ioutil" + "log" + "net/http" + "sync" + "time" +) + +// Emitter types are used by the transporter pipeline to consume events from a pipeline's event channel +// and process them. 
diff --git a/pkg/events/emitter.go b/pkg/events/emitter.go
new file mode 100644
index 000000000..d165a472c
--- /dev/null
+++ b/pkg/events/emitter.go
@@ -0,0 +1,208 @@
+package events
+
+import (
+	"bytes"
+	"io/ioutil"
+	"log"
+	"net/http"
+	"sync"
+	"time"
+)
+
+// Emitter types are used by the transporter pipeline to consume events from a pipeline's event channel
+// and process them.
+// Start() will start the emitter and begin consuming events
+// Init() serves to set the Emitter's listening channel
+// Stop() stops the event loop and releases any resources. Stop is expected to shut down the process cleanly;
+// the pipeline process will block until Stop() returns
+type Emitter interface {
+	Start()
+	Init(chan Event)
+	Stop()
+}
+
+// HTTPPostEmitter listens on the event channel and posts the events to an http server.
+// Events are serialized into json, and sent via a POST request to the given Uri.
+// HTTP errors are logged as warnings to the console, and won't stop the Emitter
+type HTTPPostEmitter struct {
+	uri string
+	key string
+	pid string
+
+	inflight *sync.WaitGroup
+	ch       chan Event
+	chstop   chan chan bool
+}
+
+// NewHTTPPostEmitter creates a new HTTPPostEmitter
+func NewHTTPPostEmitter(uri, key, pid string) *HTTPPostEmitter {
+	return &HTTPPostEmitter{
+		uri:      uri,
+		key:      key,
+		pid:      pid,
+		chstop:   make(chan chan bool),
+		inflight: &sync.WaitGroup{},
+	}
+}
+
+// Start the emitter
+func (e *HTTPPostEmitter) Start() {
+	go e.startEventListener()
+}
+
+// Init sets the event channel
+func (e *HTTPPostEmitter) Init(ch chan Event) {
+	e.ch = ch
+}
+
+// Stop sends a stop signal and waits for the inflight posts to complete before exiting
+func (e *HTTPPostEmitter) Stop() {
+	s := make(chan bool)
+	e.chstop <- s
+	<-s
+	e.inflight.Wait()
+}
+
+func (e *HTTPPostEmitter) startEventListener() {
+	for {
+		select {
+		case s := <-e.chstop:
+			s <- true
+			return
+		case event := <-e.ch:
+			e.inflight.Add(1)
+			go func(event Event) {
+				defer e.inflight.Done()
+
+				ba, err := event.Emit()
+				if err != nil {
+					log.Printf("EventEmitter Error: %s", err)
+					return
+				}
+
+				req, err := http.NewRequest("POST", e.uri, bytes.NewBuffer(ba))
+				if err != nil {
+					log.Printf("EventEmitter Error: %s", err)
+					return
+				}
+				req.Header.Set("Content-Type", "application/json")
+				if len(e.pid) > 0 && len(e.key) > 0 {
+					req.SetBasicAuth(e.pid, e.key)
+				}
+				cli := &http.Client{}
+				resp, err := cli.Do(req)
+
+				if err != nil {
+					log.Printf("EventEmitter Error: %s", err)
+					return
+				}
+				_, err = ioutil.ReadAll(resp.Body)
+				defer resp.Body.Close()
+				if resp.StatusCode != 200 && resp.StatusCode != 201 {
+					log.Printf("EventEmitter Error: http error code, expected 200 or 201, got %d, (%s)", resp.StatusCode, ba)
+					return
+				}
+				// fmt.Printf("EventEmitter, got http statuscode:%d for event: %s", resp.StatusCode, event)
+			}(event)
+		case <-time.After(100 * time.Millisecond):
+			continue
+			// noop
+		}
+	}
+}
+
+// NewNoopEmitter constructs a NoopEmitter to use with a transporter pipeline.
+// A NoopEmitter consumes the events from the listening channel and does nothing with them.
+// This is useful for cli utilities that dump output to stdout in any case, and don't want
+// to clutter the program's output with metrics
+func NewNoopEmitter() *NoopEmitter {
+	return &NoopEmitter{chstop: make(chan chan bool)}
+}
+
+// NoopEmitter consumes the events from the listening channel and does nothing with them.
+// This is useful for cli utilities that dump output to stdout in any case, and don't want
+// to clutter the program's output with metrics
+type NoopEmitter struct {
+	chstop chan chan bool
+	ch     chan Event
+}
+
+// Start the event consumer
+func (e *NoopEmitter) Start() {
+	go func() {
+		for {
+			select {
+			case s := <-e.chstop:
+				s <- true
+				return
+			case <-e.ch:
+				continue
+			case <-time.After(100 * time.Millisecond):
+				continue
+			}
+		}
+	}()
+}
+
+// Init sets the event channel
+func (e *NoopEmitter) Init(ch chan Event) {
+	e.ch = ch
+}
+
+// Stop the event consumer
+func (e *NoopEmitter) Stop() {
+	s := make(chan bool)
+	e.chstop <- s
+	<-s
+}
+
+// NewLogEmitter creates a new LogEmitter
+func NewLogEmitter() *LogEmitter {
+	return &LogEmitter{
+		chstop: make(chan chan bool),
+	}
+}
+
+// LogEmitter listens on the event channel and uses go's log package to emit the event,
+// eg.
+// 2014/11/28 16:56:58 boot map[source:mongo out:mongo]
+// 2014/11/28 16:56:58 metrics source recordsIn: 0, recordsOut: 203
+// 2014/11/28 16:56:58 exit
+// 2014/11/28 16:56:58 metrics source/out recordsIn: 203, recordsOut: 0
+type LogEmitter struct {
+	chstop chan chan bool
+	ch     chan Event
+}
+
+// Start the emitter
+func (e *LogEmitter) Start() {
+	go e.startEventListener()
+}
+
+// Init sets the event channel
+func (e *LogEmitter) Init(ch chan Event) {
+	e.ch = ch
+}
+
+// Stop the emitter
+func (e *LogEmitter) Stop() {
+	s := make(chan bool)
+	e.chstop <- s
+	<-s
+}
+
+func (e *LogEmitter) startEventListener() {
+	for {
+		select {
+		case s := <-e.chstop:
+			s <- true
+			return
+		case event := <-e.ch:
+			log.Println(event.String())
+		case <-time.After(100 * time.Millisecond):
+			continue
+			// noop
+		}
+	}
+}
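The Emitter interface is small, so custom emitters are straightforward. Below is a hedged sketch of one that simply counts events: the CountingEmitter name and struct are invented for illustration, but it follows the same Start/Init/Stop and chstop conventions as the emitters in this file, and would live alongside them in package events:

```go
package events

import "time"

// CountingEmitter tallies the events it sees; invented for illustration only.
type CountingEmitter struct {
	count  int
	ch     chan Event
	chstop chan chan bool
}

// Init sets the event channel
func (e *CountingEmitter) Init(ch chan Event) { e.ch = ch }

// Start the consumer loop
func (e *CountingEmitter) Start() {
	go func() {
		for {
			select {
			case s := <-e.chstop:
				s <- true
				return
			case <-e.ch:
				e.count++
			case <-time.After(100 * time.Millisecond):
				// nothing ready; loop around so we can check for stop again
			}
		}
	}()
}

// Stop the consumer loop
func (e *CountingEmitter) Stop() {
	s := make(chan bool)
	e.chstop <- s
	<-s
}
```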
diff --git a/pkg/events/events.go b/pkg/events/events.go
new file mode 100644
index 000000000..e635a2b71
--- /dev/null
+++ b/pkg/events/events.go
@@ -0,0 +1,130 @@
+package events
+
+import (
+	"encoding/json"
+	"fmt"
+	// "time"
+
+	"gopkg.in/mgo.v2/bson"
+)
+
+// Event is an interface that describes data which is produced periodically by the running transporter.
+//
+// Events come in multiple kinds. BaseEvents are emitted when the transporter starts and stops;
+// MetricsEvents are emitted by each pipe and include a measure of how many messages have been processed
+type Event interface {
+	Emit() ([]byte, error)
+	String() string
+}
+
+// BaseEvent is an event that is sent when the pipeline has been started or exited
+type BaseEvent struct {
+	Ts        int64             `json:"ts"`
+	Kind      string            `json:"name"`
+	Version   string            `json:"version,omitempty"`
+	Endpoints map[string]string `json:"endpoints,omitempty"`
+}
+
+// NewBootEvent (surprisingly) creates a new boot BaseEvent
+func NewBootEvent(ts int64, version string, endpoints map[string]string) *BaseEvent {
+	e := &BaseEvent{
+		Ts:        ts,
+		Kind:      "boot",
+		Version:   version,
+		Endpoints: endpoints,
+	}
+	return e
+}
+
+// NewExitEvent (surprisingly) creates a new exit BaseEvent
+func NewExitEvent(ts int64, version string, endpoints map[string]string) *BaseEvent {
+	e := &BaseEvent{
+		Ts:        ts,
+		Kind:      "exit",
+		Version:   version,
+		Endpoints: endpoints,
+	}
+	return e
+}
+
+// Emit prepares the event to be emitted and marshals the event into JSON
+func (e *BaseEvent) Emit() ([]byte, error) {
+	return json.Marshal(e)
+}
+
+// String returns a human readable representation of the event
+func (e *BaseEvent) String() string {
+	msg := fmt.Sprintf("%s", e.Kind)
+	msg += fmt.Sprintf(" %v", e.Endpoints)
+	return msg
+}
+// MetricsEvent is an event used to indicate progress.
+type MetricsEvent struct {
+	Ts   int64  `json:"ts"`
+	Kind string `json:"name"`
+	Path string `json:"path"`
+
+	// Records indicates the total number of documents that have been transmitted
+	Records int `json:"records"`
+}
+
+// NewMetricsEvent creates a new metrics event
+func NewMetricsEvent(ts int64, path string, records int) *MetricsEvent {
+	e := &MetricsEvent{
+		Ts:      ts,
+		Kind:    "metrics",
+		Path:    path,
+		Records: records,
+	}
+	return e
+}
+
+// Emit prepares the event to be emitted and marshals the event into JSON
+func (e *MetricsEvent) Emit() ([]byte, error) {
+	return json.Marshal(e)
+}
+
+func (e *MetricsEvent) String() string {
+	msg := fmt.Sprintf("%s %s", e.Kind, e.Path)
+	msg += fmt.Sprintf(" records: %d", e.Records)
+	return msg
+}
+
+// ErrorEvent is an event that indicates an error occurred
+// during the processing of a pipeline
+type ErrorEvent struct {
+	Ts   int64  `json:"ts"`
+	Kind string `json:"name"`
+	Path string `json:"path"`
+
+	// Record is the document (if any) that was in progress when the error occurred
+	Record bson.M `json:"record,omitempty"`
+
+	// Message is the error message as a string
+	Message string `json:"message,omitempty"`
+}
+
+// NewErrorEvent creates an event used to indicate a problem processing on one of the nodes
+func NewErrorEvent(ts int64, path string, record bson.M, message string) *ErrorEvent {
+	e := &ErrorEvent{
+		Ts:      ts,
+		Kind:    "error",
+		Path:    path,
+		Record:  record,
+		Message: message,
+	}
+	return e
+}
+
+// Emit prepares the event to be emitted and marshals the event into JSON
+func (e *ErrorEvent) Emit() ([]byte, error) {
+	return json.Marshal(e)
+}
+
+// String returns a human readable representation of the event
+func (e *ErrorEvent) String() string {
+	msg := fmt.Sprintf("%s", e.Kind)
+	msg += fmt.Sprintf(" record: %v, message: %s", e.Record, e.Message)
+	return msg
+}
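Putting the two packages together, a minimal sketch of pushing events through one of the emitters from emitter.go; all of the names below come from this patch, only the sample values are invented:

```go
package main

import (
	"time"

	"github.com/compose/transporter/pkg/events"
)

func main() {
	ch := make(chan events.Event)
	e := events.NewLogEmitter()
	e.Init(ch) // Init must happen before Start, so the emitter has a channel to read
	e.Start()

	ch <- events.NewBootEvent(time.Now().Unix(), "0.0.1", map[string]string{"source": "mongo", "out": "file"})
	ch <- events.NewMetricsEvent(time.Now().Unix(), "source", 42)

	e.Stop()
}
```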
diff --git a/pkg/events/events_test.go b/pkg/events/events_test.go
new file mode 100644
index 000000000..eb964948c
--- /dev/null
+++ b/pkg/events/events_test.go
@@ -0,0 +1,40 @@
+package events
+
+import (
+	"encoding/json"
+	"reflect"
+	"testing"
+)
+
+func TestEvent(t *testing.T) {
+
+	data := []struct {
+		in   Event
+		want []byte
+	}{
+		{
+			NewBootEvent(12345, "1.2.3", nil),
+			[]byte("{\"ts\":12345,\"name\":\"boot\",\"version\":\"1.2.3\"}"),
+		},
+		{
+			NewBootEvent(12345, "1.2.3", map[string]string{"nick": "yay"}),
+			[]byte("{\"ts\":12345,\"name\":\"boot\",\"version\":\"1.2.3\",\"endpoints\":{\"nick\":\"yay\"}}"),
+		},
+		{
+			NewMetricsEvent(12345, "nick/yay", 1),
+			[]byte("{\"ts\":12345,\"name\":\"metrics\",\"path\":\"nick/yay\",\"records\":1}"),
+		},
+	}
+
+	for _, d := range data {
+		ba, err := json.Marshal(d.in)
+		if err != nil {
+			t.Errorf("got error: %s", err)
+			t.FailNow()
+		}
+
+		if !reflect.DeepEqual(ba, d.want) {
+			t.Errorf("wanted: %s, got: %s", d.want, ba)
+		}
+	}
+}
diff --git a/pkg/message/message.go b/pkg/message/message.go
new file mode 100644
index 000000000..5115cfac4
--- /dev/null
+++ b/pkg/message/message.go
@@ -0,0 +1,101 @@
+// Copyright 2014 The Transporter Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package message provides wrapper structs and helper methods to pipe
+// actual database documents throughout transporter.
+package message
+
+import (
+	"fmt"
+	"time"
+
+	"gopkg.in/mgo.v2/bson"
+)
+
+var (
+	idKeys = []string{"_id", "id"}
+)
+
+// A Msg serves to wrap the actual document to
+// provide additional metadata about the document
+// being transported.
+type Msg struct {
+	Timestamp  int64
+	Op         OpType
+	ID         interface{}
+	OriginalID interface{}
+	document   bson.M // document is private
+	idKey      string // where the original id value is stored, either "_id" or "id"
+}
+
+// NewMsg returns a new Msg with the ID extracted
+// from the original document
+func NewMsg(op OpType, doc bson.M) *Msg {
+	m := &Msg{
+		Timestamp: time.Now().Unix(),
+		Op:        op,
+	}
+	if doc != nil {
+		m.document, m.ID = m.extractID(doc)
+		m.OriginalID = m.ID
+	}
+
+	return m
+}
+
+// extractID will handle separating the id field from the
+// rest of the document; it can handle both 'id' and '_id'
+func (m *Msg) extractID(doc bson.M) (bson.M, interface{}) {
+	for _, key := range idKeys {
+		id, exists := doc[key]
+		if exists {
+			m.idKey = key
+			delete(doc, key)
+			return doc, id
+		}
+	}
+
+	fmt.Printf("id not found %+v\n", doc)
+	return doc, nil
+}
+
+// IDString returns the original id as a string value
+func (m *Msg) IDString() string {
+	switch t := m.ID.(type) {
+	case string:
+		return t
+	case bson.ObjectId:
+		return t.Hex()
+	case int32, int64, uint32, uint64:
+		return fmt.Sprintf("%d", t)
+	case float32, float64:
+		return fmt.Sprintf("%f", t)
+	default:
+		return fmt.Sprintf("%v", t)
+	}
+}
+
+// Document returns the original doc, unaltered
+func (m *Msg) Document() bson.M {
+	return m.DocumentWithID(m.idKey)
+}
+
+// SetDocument will set the document variable and
+// extract out the id and preserve it
+func (m *Msg) SetDocument(doc bson.M) {
+	m.document, m.ID = m.extractID(doc)
+	if m.OriginalID == nil { // if we don't have an original id, then set it here
+		m.OriginalID = m.ID
+	}
+}
+
+// DocumentWithID returns the document with the id field
+// attached to the specified key
+func (m *Msg) DocumentWithID(key string) bson.M {
+	doc := m.document
+	if m.ID != nil {
+		doc[key] = m.ID
+	}
+	return doc
+}
diff --git a/pkg/message/message_test.go b/pkg/message/message_test.go
new file mode 100644
index 000000000..1bfc1b58e
--- /dev/null
+++ b/pkg/message/message_test.go
@@ -0,0 +1,190 @@
+package message
+
+import (
+	"reflect"
+	"testing"
+
+	"gopkg.in/mgo.v2/bson"
+)
+
+type FakeMessage struct {
+	Op  OpType
+	Doc bson.M
+}
+
+func TestNewMsg(t *testing.T) {
+	data := []struct {
+		in  FakeMessage
+		out *Msg
+	}{
+		{
+			FakeMessage{Op: Insert, Doc: nil},
+			&Msg{Op: Insert, ID: nil, document: nil},
+		},
+		{
+			FakeMessage{Op: Command, Doc: bson.M{"field1": 1}},
+			&Msg{Op: Command, ID: nil, document: bson.M{"field1": 1}},
+		},
+		{
+			FakeMessage{Op: Insert, Doc: bson.M{"id": "nick", "field2": 1}},
+			&Msg{Op: Insert, ID: "nick", document: bson.M{"field2": 1}, idKey: "id"},
+		},
+		{
+			FakeMessage{Op: Insert, Doc: bson.M{"_id": "nick", "field2": 1}},
+			&Msg{Op: Insert, ID: "nick", document: bson.M{"field2": 1}, idKey: "_id"},
+		},
+	}
+
+	for _, v := range data {
+		m := NewMsg(v.in.Op, v.in.Doc)
+
+		if !reflect.DeepEqual(m.Document(), v.out.Document()) {
+			t.Errorf("Bad doc. expected %v, got %v", v.out.Document(), m.Document())
+		}
+
+		if !reflect.DeepEqual(m.ID, v.out.ID) {
+			t.Errorf("Bad Id. expected %v, got %v", v.out.ID, m.ID)
+		}
+	}
+}
+
+func TestDocument(t *testing.T) {
+	data := []struct {
+		in  *Msg
+		out bson.M
+	}{
+		{
+			NewMsg(Insert, nil),
+			nil,
+		},
+		{
+			NewMsg(Insert, bson.M{"field": 1}),
+			bson.M{"field": 1},
+		},
+		{
+			NewMsg(Insert, bson.M{"id": "nick", "field": 1}),
+			bson.M{"id": "nick", "field": 1},
+		},
+		{
+			NewMsg(Insert, bson.M{"_id": "nick", "field": 1}),
+			bson.M{"_id": "nick", "field": 1},
+		},
+	}
+
+	for _, v := range data {
+		if !reflect.DeepEqual(v.in.Document(), v.out) {
+			t.Errorf("Bad doc. expected %+v, got %+v", v.out, v.in.Document())
+		}
+	}
+}
+
+func TestDocumentWithId(t *testing.T) {
+	data := []struct {
+		in    *Msg
+		idkey string
+		out   bson.M
+	}{
+		{
+			NewMsg(Insert, nil),
+			"_id",
+			nil,
+		},
+
+		{
+			NewMsg(Insert, bson.M{"field": 1}),
+			"_id",
+			bson.M{"field": 1},
+		},
+		{
+			NewMsg(Insert, bson.M{"id": "nick", "field": 1}),
+			"id",
+			bson.M{"id": "nick", "field": 1},
+		},
+		{
+			NewMsg(Insert, bson.M{"id": "nick", "field": 1}),
+			"_id",
+			bson.M{"_id": "nick", "field": 1},
+		},
+		{
+			NewMsg(Insert, bson.M{"_id": "nick", "field": 1}),
+			"id",
+			bson.M{"id": "nick", "field": 1},
+		},
+		{
+			NewMsg(Insert, bson.M{"id": "nick", "field": 1}),
+			"_id",
+			bson.M{"_id": "nick", "field": 1},
+		},
+	}
+
+	for _, v := range data {
+		if !reflect.DeepEqual(v.in.DocumentWithID(v.idkey), v.out) {
+			t.Errorf("Bad doc. expected %+v, got %+v", v.out, v.in.DocumentWithID(v.idkey))
+		}
+	}
+}
+
+func TestOriginalIdOnNew(t *testing.T) {
+	data := []struct {
+		in         bson.M
+		originalID interface{}
+	}{
+		{
+			nil,
+			nil,
+		},
+
+		{
+			bson.M{"field0": 1},
+			nil,
+		},
+		{
+			bson.M{"id": "nick1", "field1": 1},
+			"nick1",
+		},
+		{
+			bson.M{"_id": "nick2", "field2": 1},
+			"nick2",
+		},
+	}
+
+	for _, v := range data {
+		msg := NewMsg(OpTypeFromString("insertable"), v.in)
+		if msg.OriginalID != v.originalID {
+			t.Errorf("NewMsg failed. expected %+v, got %+v", v.originalID, msg.OriginalID)
+		}
+	}
+}
+
+func TestOriginalIdOnSet(t *testing.T) {
+	data := []struct {
+		in         bson.M
+		originalID interface{}
+	}{
+		{
+			nil,
+			nil,
+		},
+
+		{
+			bson.M{"field0": 1},
+			nil,
+		},
+		{
+			bson.M{"id": "nick1", "field1": 1},
+			"nick1",
+		},
+		{
+			bson.M{"_id": "nick2", "field2": 1},
+			"nick2",
+		},
+	}
+
+	for _, v := range data {
+		msg := NewMsg(OpTypeFromString("inserty"), nil)
+		msg.SetDocument(v.in)
+		if msg.OriginalID != v.originalID {
+			t.Errorf("SetDocument failed. expected %+v, got %+v", v.originalID, msg.OriginalID)
+		}
+	}
+}
diff --git a/pkg/message/ops.go b/pkg/message/ops.go
new file mode 100644
index 000000000..226ac3bce
--- /dev/null
+++ b/pkg/message/ops.go
@@ -0,0 +1,64 @@
+// Copyright 2014 The Transporter Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package message
+
+// OpType represents the many different Operations being
+// performed against a document (i.e. Insert, Update, etc.)
+type OpType int
+
+// messages refer to specific types of database operations, which are enumerated here.
+const (
+	Insert OpType = iota
+	Update
+	Delete
+	Command
+	Unknown
+)
+
+// String returns the string representation of the OpType.
+func (o OpType) String() string {
+	switch o {
+	case Insert:
+		return "insert"
+	case Update:
+		return "update"
+	case Delete:
+		return "delete"
+	case Command:
+		return "command"
+	default:
+		return "unknown"
+	}
+}
+
+// OpTypeFromString returns the constant
+// representing the passed in string
+func OpTypeFromString(s string) OpType {
+	if len(s) == 0 { // guard against indexing into an empty string
+		return Unknown
+	}
+	switch s[0] {
+	case 'i':
+		return Insert
+	case 'u':
+		return Update
+	case 'd':
+		return Delete
+	case 'c':
+		return Command
+	default:
+		return Unknown
+	}
+}
+
+// CommandType represents the different Commands capable
+// of being executed against a database.
+type CommandType int
+
+// Transporter understands the following different command types
+const (
+
+	// Flush is interpreted by the receiving sink adaptors to attempt to flush all buffered
+	// operations to the database. This can be useful when switching from a copy to a tail operation
+	Flush CommandType = iota
+)
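Note that OpTypeFromString matches only on the first byte of its argument, which can surprise: any string beginning with 'd' becomes a Delete. A small illustration, using only functions defined above:

```go
package main

import (
	"fmt"

	"github.com/compose/transporter/pkg/message"
)

func main() {
	fmt.Println(message.OpTypeFromString("insert")) // insert
	fmt.Println(message.OpTypeFromString("update")) // update
	fmt.Println(message.OpTypeFromString("drop"))   // delete: only the leading 'd' is examined
}
```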
diff --git a/pkg/pipe/pipe.go b/pkg/pipe/pipe.go
new file mode 100644
index 000000000..2bd95258e
--- /dev/null
+++ b/pkg/pipe/pipe.go
@@ -0,0 +1,137 @@
+// Copyright 2014 The Transporter Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package pipe provides types to help manage transporter communication channels as well as
+// event types.
+package pipe
+
+import (
+	"time"
+
+	"github.com/compose/transporter/pkg/events"
+	"github.com/compose/transporter/pkg/message"
+)
+
+type messageChan chan *message.Msg
+
+func newMessageChan() messageChan {
+	return make(chan *message.Msg)
+}
+
+// Pipe provides a set of methods to let transporter nodes communicate with each other.
+//
+// Pipes contain In, Out, Err, and Event channels. Messages are consumed by a node through the 'in' chan, and emitted from the node by the 'out' chan.
+// Pipes come in three flavours: a sourcePipe, which only emits messages and has no listening loop; a sinkPipe, which has a listening loop but doesn't emit any messages;
+// and a joinPipe, which has a listening loop that also emits messages.
+type Pipe struct {
+	In      messageChan
+	Out     []messageChan
+	Err     chan error
+	Event   chan events.Event
+	Stopped bool // has the pipe been stopped?
+
+	MessageCount int
+
+	path      string // the path of this pipe (for events and errors)
+	chStop    chan chan bool
+	listening bool
+}
+
+// NewPipe creates a new Pipe. If the pipe that is passed in is nil, then this pipe will be treated as a source pipe that just serves to emit messages.
+// Otherwise, the pipe returned will be created and chained from the last member of the Out slice of the parent. This function has side effects, and will add
+// an Out channel to the pipe that is passed in
+func NewPipe(pipe *Pipe, path string) *Pipe {
+
+	p := &Pipe{
+		Out:    make([]messageChan, 0),
+		path:   path,
+		chStop: make(chan chan bool),
+	}
+
+	if pipe != nil {
+		pipe.Out = append(pipe.Out, newMessageChan())
+		p.In = pipe.Out[len(pipe.Out)-1] // use the last out channel
+		p.Err = pipe.Err
+		p.Event = pipe.Event
+	} else {
+		p.Err = make(chan error)
+		p.Event = make(chan events.Event)
+	}
+
+	return p
+}
+
+// Listen starts a listening loop that pulls messages from the In chan, applies fn(msg), a `func(*message.Msg) (*message.Msg, error)`, and emits them on the Out channels.
+// Errors will be emitted to the Pipe's Err chan, and will terminate the loop.
+// The listening loop can be interrupted by calls to Stop().
+func (m *Pipe) Listen(fn func(*message.Msg) (*message.Msg, error)) error {
+	if m.In == nil {
+		return nil
+	}
+	m.listening = true
+	defer func() {
+		m.Stopped = true
+	}()
+	for {
+		// check for stop
+		select {
+		case c := <-m.chStop:
+			c <- true
+			return nil
+		default:
+		}
+
+		select {
+		case msg := <-m.In:
+
+			outmsg, err := fn(msg)
+			if err != nil {
+				m.Err <- err
+				return err
+			}
+			if len(m.Out) > 0 {
+				m.Send(outmsg)
+			} else {
+				m.MessageCount++ // update the count anyway
+			}
+		case <-time.After(100 * time.Millisecond):
+			// NOP, just breathe
+		}
+	}
+}
+
+// Stop terminates the channel's listening loop, and allows any timeouts in send to fail
+func (m *Pipe) Stop() {
+	if !m.Stopped {
+		m.Stopped = true
+
+		// we only worry about the stop channel if we're in a listening loop
+		if m.listening {
+			c := make(chan bool)
+			m.chStop <- c
+			<-c
+		}
+	}
+}
+
+// Send emits the given message on the 'Out' channel. The send times out after 100ms in order to check if the Pipe has stopped and we've been asked to exit.
+// If the Pipe has been stopped, the send will fail and there is no guarantee of either success or failure
+func (m *Pipe) Send(msg *message.Msg) {
+	for _, ch := range m.Out {
+
+	A:
+		for {
+			select {
+			case ch <- msg:
+				m.MessageCount++
+				break A
+			case <-time.After(100 * time.Millisecond):
+				if m.Stopped {
+					// return, with no guarantee
+					return
+				}
+			}
+		}
+	}
+}
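To make the source/sink relationship concrete, here is a sketch that chains two pipes by hand and pushes a single message through. In real pipelines this wiring is done for you by the transporter package's Node.Init; the paths and document here are invented:

```go
package main

import (
	"fmt"
	"time"

	"github.com/compose/transporter/pkg/message"
	"github.com/compose/transporter/pkg/pipe"

	"gopkg.in/mgo.v2/bson"
)

func main() {
	source := pipe.NewPipe(nil, "source")       // a source pipe only emits
	sink := pipe.NewPipe(source, "source/sink") // chained from the source's Out slice

	// the sink's listening loop; fn is applied to every message that arrives
	go sink.Listen(func(m *message.Msg) (*message.Msg, error) {
		fmt.Println(m.Document())
		return m, nil
	})

	source.Send(message.NewMsg(message.Insert, bson.M{"_id": "1", "hello": "world"}))
	time.Sleep(200 * time.Millisecond) // give the listener a beat to drain
	sink.Stop()
	source.Stop()
}
```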
diff --git a/pkg/transporter/doc.go b/pkg/transporter/doc.go
new file mode 100644
index 000000000..2e492101e
--- /dev/null
+++ b/pkg/transporter/doc.go
@@ -0,0 +1,25 @@
+// Copyright 2014 The Transporter Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package transporter provides all implemented functionality to move
+// data through transporter.
+//
+// A transporter pipeline consists of a tree of Nodes, with the root Node attached to the source database,
+// and each child node is either a data transformer or a database sink.
+// Nodes can be defined like:
+//
+// transporter.NewNode("source", "mongo", map[string]interface{}{"uri": "mongodb://localhost/", "namespace": "test.colln", "debug": false, "tail": true}).
+//   Add(transporter.NewNode("out", "file", map[string]interface{}{"uri": "stdout://"}))
+//
+// and pipelines can be defined:
+//   pipeline, err := transporter.NewPipeline(source, events.NewNoopEmitter(), 1*time.Second)
+//   if err != nil {
+//     fmt.Println(err)
+//     os.Exit(1)
+//   }
+//   pipeline.Run()
+//
+// the event emitters are defined in transporter/pkg/events, and are used to deliver errors/metrics/etc. about the running process
+
+package transporter
diff --git a/pkg/transporter/node.go b/pkg/transporter/node.go
new file mode 100644
index 000000000..a7456d68e
--- /dev/null
+++ b/pkg/transporter/node.go
@@ -0,0 +1,196 @@
+// Copyright 2014 The Transporter Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package transporter provides all implemented functionality to move
+// data through transporter.
+package transporter
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/compose/transporter/pkg/adaptor"
+	"github.com/compose/transporter/pkg/pipe"
+)
+
+// A Node is the basic building block of transporter pipelines.
+// Nodes are constructed in a tree, with the first node broadcasting
+// data to each of its children.
+// Node trees can be constructed as follows:
+// 	source := transporter.NewNode("name1", "mongo", adaptor.Config{"uri": "mongodb://localhost/boom", "namespace": "boom.foo", "debug": true})
+// 	sink1 := transporter.NewNode("foofile", "file", adaptor.Config{"uri": "stdout://"})
+// 	sink2 := transporter.NewNode("foofile2", "file", adaptor.Config{"uri": "stdout://"})
+// 	source.Add(sink1)
+// 	source.Add(sink2)
+//
+type Node struct {
+	Name     string         `json:"name"`     // the name of this node
+	Type     string         `json:"type"`     // the node's type, used to create the adaptor
+	Extra    adaptor.Config `json:"extra"`    // extra config options that are passed to the adaptor
+	Children []*Node        `json:"children"` // the nodes are set up as a tree, this is an array of this node's children
+	Parent   *Node          `json:"parent"`   // this node's parent node, if this is nil, this is a 'source' node
+
+	adaptor adaptor.StopStartListener
+	pipe    *pipe.Pipe
+}
+
+// NewNode creates a new Node struct
+func NewNode(name, kind string, extra adaptor.Config) *Node {
+	return &Node{
+		Name:     name,
+		Type:     kind,
+		Extra:    extra,
+		Children: make([]*Node, 0),
+	}
+}
+
+// String
+func (n *Node) String() string {
+	var (
+		uri       string
+		s         string
+		prefix    string
+		namespace = n.Extra.GetString("namespace")
+		depth     = n.depth()
+	)
+	if n.Type == "transformer" {
+		uri = n.Extra.GetString("filename")
+	} else {
+		uri = n.Extra.GetString("uri")
+	}
+
+	prefixformatter := fmt.Sprintf("%%%ds%%-%ds", depth, 18-depth)
+
+	if n.Parent == nil { // root node
+		// s = fmt.Sprintf("%18s %-40s %-15s %-30s %s\n", " ", "Name", "Type", "Namespace", "URI")
+		prefix = fmt.Sprintf(prefixformatter, " ", "- Source: ")
+	} else if len(n.Children) == 0 {
+		prefix = fmt.Sprintf(prefixformatter, " ", "- Sink: ")
+	} else if n.Type == "transformer" {
+		prefix = fmt.Sprintf(prefixformatter, " ", "- Transformer: ")
+	}
+
+	s += fmt.Sprintf("%-18s %-40s %-15s %-30s %s", prefix, n.Name, n.Type, namespace, uri)
+
+	for _, child := range n.Children {
+		s += "\n" + child.String()
+	}
+	return s
+}
+
+// depth is a measure of how deep into the node tree this node is.
+// Used to indent the String() output
+func (n *Node) depth() int {
+	if n.Parent == nil {
+		return 1
+	}
+
+	return 1 + n.Parent.depth()
+}
+
+// Path returns a string representation of the names of all the node's parents concatenated with "/"; used in metrics
+// eg. for the following tree
+// 	source := transporter.NewNode("name1", "mongo", adaptor.Config{"uri": "mongodb://localhost/boom", "namespace": "boom.foo", "debug": true})
+// 	sink1 := transporter.NewNode("foofile", "file", adaptor.Config{"uri": "stdout://"})
+// 	source.Add(sink1)
+// 'source' will have a Path of 'name1', and 'sink1' will have a path of 'name1/foofile'
+func (n *Node) Path() string {
+	if n.Parent == nil {
+		return n.Name
+	}
+
+	return n.Parent.Path() + "/" + n.Name
+}
+
+// Add the given node as a child of this node.
+// This has side effects, and sets the parent of the given node
+func (n *Node) Add(node *Node) *Node {
+	node.Parent = n
+	n.Children = append(n.Children, node)
+	return n
+}
+
+// Init sets up the node for action. It creates a pipe and adaptor for this node,
+// and then recurses down the tree calling Init on each child
+func (n *Node) Init(interval time.Duration) (err error) {
+	path := n.Path()
+	if n.Parent == nil { // we don't have a parent, we're the source
+		n.pipe = pipe.NewPipe(nil, path)
+	} else { // we have a parent, so pass in the parent's pipe here
+		n.pipe = pipe.NewPipe(n.Parent.pipe, path)
+	}
+
+	n.adaptor, err = adaptor.Createadaptor(n.Type, path, n.Extra, n.pipe)
+	if err != nil {
+		return err
+	}
+
+	for _, child := range n.Children {
+		err = child.Init(interval) // init each child
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// Stop stops this node's adaptor, and sends a stop to each child of this node
func (n *Node) Stop() {
+	for _, node := range n.Children {
+		node.Stop()
+	}
+	n.adaptor.Stop()
+}
+
+// Start starts the node's children in goroutines, and then runs either Start() or Listen()
+// on the node's adaptor. Root nodes (nodes with no parent) run Start()
+// and emit messages to their children;
+// all descendant nodes run Listen() on the adaptor
+func (n *Node) Start() error {
+	for _, child := range n.Children {
+		go func(node *Node) {
+			node.Start()
+		}(child)
+	}
+
+	if n.Parent == nil {
+		return n.adaptor.Start()
+	}
+
+	return n.adaptor.Listen()
+}
+
+// Validate ensures that the node tree conforms to a proper structure.
+// Node trees must have at least one source and one sink;
+// dangling transformers are forbidden.
+// Validate only knows about default adaptors
+// in the adaptor package; it can't validate any custom adaptors
+func (n *Node) Validate() bool {
+	if n.Parent == nil && len(n.Children) == 0 { // the root node should have children
+		return false
+	}
+
+	if n.Type == "transformer" && len(n.Children) == 0 { // transformers need children
+		return false
+	}
+
+	for _, child := range n.Children {
+		if !child.Validate() {
+			return false
+		}
+	}
+	return true
+}
+
+// Endpoints recurses down the node tree and accumulates a map associating node name with node type;
+// this is primarily used with the boot event
+func (n *Node) Endpoints() map[string]string {
+	m := map[string]string{n.Name: n.Type}
+	for _, child := range n.Children {
+		childMap := child.Endpoints()
+		for k, v := range childMap {
+			m[k] = v
+		}
+	}
+	return m
+}
diff --git a/pkg/transporter/node_test.go b/pkg/transporter/node_test.go
new file mode 100644
index 000000000..ce4108bba
--- /dev/null
+++ b/pkg/transporter/node_test.go
@@ -0,0 +1,94 @@
+package transporter
+
+import (
+	"testing"
+
+	"github.com/compose/transporter/pkg/adaptor"
+)
+
+func TestNodeString(t *testing.T) {
+	data := []struct {
+		in  *Node
+		out string
+	}{
+		{
+			&Node{},
+			" - Source: ",
+		},
+		{
+			NewNode("name", "mongodb", adaptor.Config{"uri": "uri", "namespace": "ns", "debug": false}),
+			" - Source: name mongodb ns uri",
+		},
+	}
+
+	for _, v := range data {
+		if v.in.String() != v.out {
+			t.Errorf("\nexpected: '%s'\n got: '%s'\n", v.out, v.in.String())
+		}
+	}
+}
+
+func TestValidate(t *testing.T) {
+	data := []struct {
+		in  *Node
+		out bool
+	}{
+		{
+			NewNode("first", "mongo", adaptor.Config{}),
+			false,
+		},
+		{
+			NewNode("second", "mongo", adaptor.Config{}).Add(NewNode("name", "mongo", adaptor.Config{})),
+			true,
+		},
+		{
+			NewNode("third", "mongo", adaptor.Config{}).Add(NewNode("name", "transformer", adaptor.Config{})),
+			false,
+		},
+		{
+			NewNode("fourth", "mongo", adaptor.Config{}).Add(NewNode("name", "transformer", adaptor.Config{}).Add(NewNode("name", "mongo", adaptor.Config{}))),
+			true,
+		},
+	}
+
+	for _, v := range data {
+		if v.in.Validate() != v.out {
+			t.Errorf("%s: expected: %t got: %t", v.in.Name, v.out, v.in.Validate())
+		}
+	}
+}
+
+func TestPath(t *testing.T) {
+	data := []struct {
+		in  *Node
+		out string
+	}{
+		{
+			NewNode("first", "mongo", adaptor.Config{}),
+			"first",
+		},
+		{
+			NewNode("first", "mongo", adaptor.Config{}).Add(NewNode("second", "mongo", adaptor.Config{})),
+			"first/second",
+		},
+		{
+			NewNode("first", "mongo", adaptor.Config{}).Add(NewNode("second", "transformer", adaptor.Config{}).Add(NewNode("third", "mongo", adaptor.Config{}))),
+			"first/second/third",
+		},
+	}
+
+	for _, v := range data {
+		node := v.in
+		var path string
+		for {
+			if len(node.Children) == 0 {
+				path = node.Path()
+				break
+			}
+			node = node.Children[0]
+		}
+		if path != v.out {
+			t.Errorf("%s: expected: %s got: %s", node.Name, v.out, path)
+		}
+	}
+}
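The Validate rules are easy to check interactively; a short sketch with invented node names, mirroring the cases in TestValidate above:

```go
package main

import (
	"fmt"

	"github.com/compose/transporter/pkg/adaptor"
	"github.com/compose/transporter/pkg/transporter"
)

func main() {
	source := transporter.NewNode("src", "mongo", adaptor.Config{"uri": "mongodb://localhost/boom", "namespace": "boom.foo"})
	source.Add(transporter.NewNode("t", "transformer", adaptor.Config{"filename": "t.js"}))

	fmt.Println(source.Validate()) // false: the transformer dangles with no sink beneath it

	// hang a sink off the transformer and the tree becomes valid
	source.Children[0].Add(transporter.NewNode("out", "file", adaptor.Config{"uri": "stdout://"}))
	fmt.Println(source.Validate()) // true
}
```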
diff --git a/pkg/transporter/pipeline.go b/pkg/transporter/pipeline.go
new file mode 100644
index 000000000..cb42c31d5
--- /dev/null
+++ b/pkg/transporter/pipeline.go
@@ -0,0 +1,161 @@
+package transporter
+
+import (
+	"time"
+
+	"github.com/compose/transporter/pkg/adaptor"
+	"github.com/compose/transporter/pkg/events"
+)
+
+// VERSION of the library
+const (
+	VERSION = "0.0.1"
+)
+
+// A Pipeline is the end-to-end description of a transporter data flow,
+// including the source, sink, and all the transformers along the way
+type Pipeline struct {
+	source        *Node
+	emitter       events.Emitter
+	metricsTicker *time.Ticker
+
+	// Err is the fatal error that was sent from the adaptor
+	// that caused us to stop this process. If this is nil, then
+	// the transporter is running
+	Err error
+}
+
+// NewDefaultPipeline returns a new Transporter Pipeline with the given node tree, and
+// uses the events.HTTPPostEmitter to deliver metrics.
+// eg.
+//   source :=
+//   	transporter.NewNode("source", "mongo", adaptor.Config{"uri": "mongodb://localhost/", "namespace": "boom.foo", "debug": false, "tail": true}).
+//   	Add(transporter.NewNode("out", "file", adaptor.Config{"uri": "stdout://"}))
+//   pipeline, err := transporter.NewDefaultPipeline(source, "http://localhost/endpoint", "key", "pid", 1*time.Second)
+//   if err != nil {
+//   	fmt.Println(err)
+//   	os.Exit(1)
+//   }
+//   pipeline.Run()
+func NewDefaultPipeline(source *Node, uri, key, pid string, interval time.Duration) (*Pipeline, error) {
+	emitter := events.NewHTTPPostEmitter(uri, key, pid)
+	return NewPipeline(source, emitter, interval)
+}
+
+// NewPipeline creates a new Transporter Pipeline using the given tree of nodes, and Event Emitter
+// eg.
+//   source :=
+//   	transporter.NewNode("source", "mongo", adaptor.Config{"uri": "mongodb://localhost/", "namespace": "boom.foo", "debug": false, "tail": true}).
+//   	Add(transporter.NewNode("out", "file", adaptor.Config{"uri": "stdout://"}))
+//   pipeline, err := transporter.NewPipeline(source, events.NewNoopEmitter(), 1*time.Second)
+//   if err != nil {
+//   	fmt.Println(err)
+//   	os.Exit(1)
+//   }
+//   pipeline.Run()
+func NewPipeline(source *Node, emitter events.Emitter, interval time.Duration) (*Pipeline, error) {
+	pipeline := &Pipeline{
+		source:        source,
+		emitter:       emitter,
+		metricsTicker: time.NewTicker(interval),
+	}
+
+	// init the pipeline
+	err := pipeline.source.Init(interval)
+	if err != nil {
+		return pipeline, err
+	}
+
+	// init the emitter with the right chan
+	pipeline.emitter.Init(source.pipe.Event)
+
+	// start the emitters
+	go pipeline.startErrorListener(source.pipe.Err)
+	go pipeline.startMetricsGatherer()
+	pipeline.emitter.Start()
+
+	return pipeline, nil
+}
+
+func (pipeline *Pipeline) String() string {
+	out := pipeline.source.String()
+	return out
+}
+
+// Stop sends a stop signal to the emitter and all the nodes, whether they are running or not.
+// The node's database adaptors are expected to clean up after themselves, and Stop will block until
+// all nodes have stopped successfully
+func (pipeline *Pipeline) Stop() {
+	pipeline.source.Stop()
+	pipeline.emitter.Stop()
+	pipeline.metricsTicker.Stop()
+}
+
+// Run the pipeline
+func (pipeline *Pipeline) Run() error {
+	endpoints := pipeline.source.Endpoints()
+	// send a boot event
+	pipeline.source.pipe.Event <- events.NewBootEvent(time.Now().Unix(), VERSION, endpoints)
+
+	// start the source
+	err := pipeline.source.Start()
+	if err != nil && pipeline.Err == nil {
+		pipeline.Err = err // only set it if it hasn't been set already.
+	}
+
+	// pipeline has stopped, emit one last round of metrics and send the exit event
+	pipeline.emitMetrics()
+	pipeline.source.pipe.Event <- events.NewExitEvent(time.Now().Unix(), VERSION, endpoints)
+
+	// the source has exited, stop all the other nodes
+	pipeline.Stop()
+
+	return pipeline.Err
+}
+
+// startErrorListener consumes all the errors on the pipe's Err channel; adaptor errors are
+// forwarded as error events, and any other error stops the pipeline
+func (pipeline *Pipeline) startErrorListener(cherr chan error) {
+	for err := range cherr {
+		if aerr, ok := err.(adaptor.Error); ok {
+			pipeline.source.pipe.Event <- events.NewErrorEvent(time.Now().Unix(), aerr.Path, aerr.Record, aerr.Error())
+		} else {
+			if pipeline.Err == nil {
+				pipeline.Err = err
+			}
+			pipeline.Stop()
+		}
+	}
+}
+
+func (pipeline *Pipeline) startMetricsGatherer() {
+	for _ = range pipeline.metricsTicker.C {
+		pipeline.emitMetrics()
+	}
+}
+
+// emit the metrics
+func (pipeline *Pipeline) emitMetrics() {
+
+	frontier := make([]*Node, 1)
+	frontier[0] = pipeline.source
+
+	for {
+		// pop the first item
+		node := frontier[0]
+		frontier = frontier[1:]
+
+		// do something with the node
+		pipeline.source.pipe.Event <- events.NewMetricsEvent(time.Now().Unix(), node.Path(), node.pipe.MessageCount)
+
+		// add this node's children to the frontier
+		for _, child := range node.Children {
+			frontier = append(frontier, child)
+		}
+
+		// if we're empty
+		if len(frontier) == 0 {
+			break
+		}
+	}
+}
diff --git a/pkg/transporter/pipeline_events_integration_test.go b/pkg/transporter/pipeline_events_integration_test.go
new file mode 100644
index 000000000..4f04dfe68
--- /dev/null
+++ b/pkg/transporter/pipeline_events_integration_test.go
@@ -0,0 +1,105 @@
+// +build integration
+
+package transporter
+
+import (
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+	"time"
+
+	"github.com/compose/transporter/pkg/adaptor"
+)
+
+var (
+	metricsEvents = make([][]byte, 0)
+)
+
+type EventHolder struct {
+	rawEvents [][]byte
+}
+
+func TestEventsBroadcast(t *testing.T) {
+	data := []struct {
+		evt     string
+		evtPath string
+	}{
+		{
+			"boot",
+			"",
+		},
+		{
+			"metrics",
+			"dummyFileOut",
+		},
+		{
+			"metrics",
+			"dummyFileOut/dummyFileIn",
+		},
+		{
+			"exit",
+			"",
+		},
+	}
+
+	eh := &EventHolder{rawEvents: make([][]byte, 0)}
+	ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		event, _ := ioutil.ReadAll(r.Body)
+		r.Body.Close()
+		eh.rawEvents = append(eh.rawEvents, event)
+	}))
+	defer ts.Close()
+	ts.Start()
+
+	var (
+		inFile  = "/tmp/dummyFileIn"
+		outFile = "/tmp/dummyFileOut"
+	)
+
+	setupFiles(inFile, outFile)
+
+	// set up the nodes
+	dummyOutNode := NewNode("dummyFileOut", "file", adaptor.Config{"uri": "file://" + outFile})
+	dummyOutNode.Add(NewNode("dummyFileIn", "file", adaptor.Config{"uri": "file://" + inFile}))
+
+	p, err := NewDefaultPipeline(dummyOutNode, ts.URL, "asdf", "jklm", 1*time.Second)
+	if err != nil {
+		t.Errorf("can't create pipeline, got %s", err.Error())
+		t.FailNow()
+	}
+
+	p.Run()
+
+	time.Sleep(time.Duration(5) * time.Second)
+
+	if len(eh.rawEvents) != 4 {
+		t.Errorf("did not receive all events\nexp: %d\ngot: %d", 4, len(eh.rawEvents))
+	}
+
+	for _, val := range data {
+		if err = eh.lookupMetricEvent(val.evt, val.evtPath); err != nil {
+			t.Errorf("problem validating metric event, %s", err.Error())
+		}
+	}
+
+}
+func (events EventHolder) lookupMetricEvent(metric, path string) error {
+	var evt map[string]interface{}
+	for _, val := range events.rawEvents {
+		if err := json.Unmarshal(val, &evt); err != nil {
+			return err
+		}
+		if evt["name"].(string) == metric {
+			// if no path was provided, matching the name is enough
+			if path == "" {
+				return nil
+			}
+			// otherwise the path must match too
+			if p, ok := evt["path"].(string); ok && p == path {
+				return nil
+			}
+		}
+	}
+	return fmt.Errorf("unable to locate metric, %s, in received metric events", metric)
+}
diff --git a/pkg/transporter/pipeline_integration_test.go b/pkg/transporter/pipeline_integration_test.go
new file mode 100644
index 000000000..889e4a1f3
--- /dev/null
+++ b/pkg/transporter/pipeline_integration_test.go
@@ -0,0 +1,161 @@
+// +build integration
+
+package transporter
+
+import (
+	"os"
+	"reflect"
+	"testing"
+	"time"
+
+	"github.com/compose/transporter/pkg/adaptor"
+	"gopkg.in/mgo.v2"
+	"gopkg.in/mgo.v2/bson"
+)
+
+var (
+	mongoUri = "mongodb://localhost/test"
+)
+
+// set up some local files
+func setupFiles(in, out string) {
+	// setup files
+	os.Remove(out)
+	os.Remove(in)
+
+	fh, _ := os.Create(out)
+	defer func() {
+		fh.Close()
+	}()
+	fh.WriteString("{\"_id\":\"546656989330a846dc7ce327\",\"test\":\"hello world\"}\n")
+}
+
+// set up local mongo
+func setupMongo() {
+	// setup mongo
+	mongoSess, _ := mgo.Dial(mongoUri)
+	collection := mongoSess.DB("test").C("outColl")
+	collection.DropCollection()
+
+	for i := 0; i <= 5; i += 1 {
+		collection.Insert(bson.M{"index": i})
+	}
+
+	mongoSess.Close()
+	mongoSess, _ = mgo.Dial(mongoUri)
+	collection = mongoSess.DB("test").C("inColl")
+	collection.DropCollection()
+	mongoSess.Close()
+}
+
+//
+//
+//
+
+func TestFileToFile(t *testing.T) {
+	var (
+		inFile  = "/tmp/crapIn"
+		outFile = "/tmp/crapOut"
+	)
+
+	setupFiles(inFile, outFile)
+
+	// create the source node and attach our sink
+	outNode := NewNode("localfileout", "file", adaptor.Config{"uri": "file://" + outFile}).
+		Add(NewNode("localfilein", "file", adaptor.Config{"uri": "file://" + inFile}))
+
+	// create the pipeline
+	p, err := NewDefaultPipeline(outNode, "", "", "", 100*time.Millisecond)
+	if err != nil {
+		t.Errorf("can't create pipeline, got %s", err.Error())
+		t.FailNow()
+	}
+
+	// run it
+	err = p.Run()
+	if err != nil {
+		t.Errorf("error running pipeline, got %s", err.Error())
+		t.FailNow()
+	}
+
+	// compare the files
+	sourceFile, _ := os.Open(outFile)
+	sourceSize, _ := sourceFile.Stat()
+	defer sourceFile.Close()
+	sinkFile, _ := os.Open(inFile)
+	sinkSize, _ := sinkFile.Stat()
+	defer sinkFile.Close()
+
+	if sourceSize.Size() == 0 || sourceSize.Size() != sinkSize.Size() {
+		t.Errorf("Incorrect file size\nexp %d\ngot %d", sourceSize.Size(), sinkSize.Size())
+	}
+}
+
+//
+//
+//
+
+func TestMongoToMongo(t *testing.T) {
+	setupMongo()
+
+	var (
+		inNs  = "test.inColl"
+		outNs = "test.outColl"
+	)
+
+	// create the source node and attach our sink
+	outNode := NewNode("localOutmongo", "mongo", adaptor.Config{"uri": mongoUri, "namespace": outNs}).
+		Add(NewNode("localInmongo", "mongo", adaptor.Config{"uri": mongoUri, "namespace": inNs}))
+
+	// create the pipeline
+	p, err := NewDefaultPipeline(outNode, "", "", "", 100*time.Millisecond)
+	if err != nil {
+		t.Errorf("can't create pipeline, got %s", err.Error())
+		t.FailNow()
+	}
+
+	// run it
+	err = p.Run()
+	if err != nil {
+		t.Errorf("error running pipeline, got %s", err.Error())
+		t.FailNow()
+	}
+
+	// connect to mongo and compare results
+	mongoSess, err := mgo.Dial(mongoUri)
+	if err != nil {
+		t.Error(err.Error())
+	}
+	defer mongoSess.Close()
+
+	collOut := mongoSess.DB("test").C("outColl")
+	collIn := mongoSess.DB("test").C("inColl")
+
+	// are the counts the same?
+	outCount, _ := collOut.Count()
+	inCount, _ := collIn.Count()
+
+	if outCount != inCount {
+		t.Errorf("Incorrect collection size\nexp %d\ngot %d\n", outCount, inCount)
+	}
+
+	// iterate over the results and compare the documents
+	var result bson.M
+	iter := collIn.Find(bson.M{}).Iter()
+	for iter.Next(&result) {
+		var origDoc bson.M
+		err := collOut.Find(bson.M{"_id": result["_id"]}).One(&origDoc)
+		if err != nil {
+			t.Errorf("Unable to locate source doc %+v\n", result)
+			t.FailNow()
+		}
+		if !reflect.DeepEqual(result, origDoc) {
+			t.Errorf("Documents do not match\nexp %v\n, got %v\n", origDoc, result)
+		}
+	}
+
+	// clean up
+	mongoSess.DB("test").C("outColl").DropCollection()
+	mongoSess.DB("test").C("inColl").DropCollection()
+
+}
diff --git a/pkg/transporter/pipeline_test.go b/pkg/transporter/pipeline_test.go
new file mode 100644
index 000000000..4cddb5877
--- /dev/null
+++ b/pkg/transporter/pipeline_test.go
@@ -0,0 +1,75 @@
+package transporter
+
+import (
+	"errors"
+	"testing"
+	"time"
+
+	"github.com/compose/transporter/pkg/adaptor"
+	"github.com/compose/transporter/pkg/pipe"
+)
+
+var (
+	fakesourceCN = NewNode("source1", "source", adaptor.Config{"value": "rockettes"})
+	fileNode     = NewNode("localfile", "file", adaptor.Config{"uri": "file:///tmp/foo"})
+)
+
+// a noop node adaptor to help test
+type Testadaptor struct {
+	value string
+}
+
+func NewTestadaptor(p *pipe.Pipe, path string, extra adaptor.Config) (adaptor.StopStartListener, error) {
+	val, ok := extra["value"]
+	if !ok {
+		return nil, errors.New("this is an error")
+	}
+	return &Testadaptor{value: val.(string)}, nil
+}
+
+func (s *Testadaptor) Stop() error {
+	return nil
+}
+
+func (s *Testadaptor) Start() error {
+	return nil
+}
+
+func (s *Testadaptor) Listen() error {
+	return nil
+}
+
+func TestPipelineString(t *testing.T) {
+	adaptor.Register("source", NewTestadaptor)
+
+	data := []struct {
+		in           *Node
+		terminalNode *Node
+		out          string
+	}{
+		{
+			fakesourceCN,
+			nil,
+			" - Source: source1 source ",
+		},
+		{
+			fakesourceCN,
+			fileNode,
+			" - Source: source1 source \n - Sink: localfile file file:///tmp/foo",
+		},
+	}
+
+	for _, v := range data {
+		if v.terminalNode != nil {
+			v.in.Add(v.terminalNode)
+		}
+		p, err := NewDefaultPipeline(v.in, "", "", "", 100*time.Millisecond)
+		if err != nil {
+			t.Errorf("can't create pipeline, got %s", err.Error())
+			t.FailNow()
+		}
+		if p.String() != v.out {
+			t.Errorf("\nexpected:\n%s\ngot:\n%s\n", v.out, p.String())
+		}
+	}
+}
diff --git a/test/application-es.js b/test/application-es.js
new file mode 100644
index 000000000..74b1c6758
--- /dev/null
+++ b/test/application-es.js
@@ -0,0 +1,4 @@
+
+pipeline = Source({name:"testfile"})
+  .transform("transformers/passthrough_and_log.js")
+  .save({name:"es", namespace: "test.test"})
diff --git a/test/application-filex2.js b/test/application-filex2.js
new file mode 100644
index 000000000..f6d8e01a9
--- /dev/null
+++ b/test/application-filex2.js
@@ -0,0 +1,3 @@
+
+Source({name:"foofile"}).transform("transformers/passthrough_and_log.js").save({name:"stdout"})
+Source({name:"foofile"}).transform("transformers/passthrough_and_log.js").save({name:"foofile2"})
diff --git a/test/application-influx.js b/test/application-influx.js
new file mode 100644
index 000000000..3ff6b83a7
--- /dev/null
+++ b/test/application-influx.js
@@ -0,0 +1,4 @@
+
+pipeline = Source({name:"localmongo", namespace: "gru-development.backups"})
+  .transform("transformers/influx_test.js")
+  .save({name:"timeseries", namespace: "compose.backups"})
diff --git a/test/application-mongo-file.js b/test/application-mongo-file.js
new file mode 100644
index 000000000..25a20f68a
--- /dev/null
+++ b/test/application-mongo-file.js
@@ -0,0 +1,6 @@
+
+// create a pipeline
+pipeline = Source({name:"localmongo", namespace: "boom.foo", tail: true})
+  .transform("transformers/passthrough_and_log.js")
+  .save({name:"stdout"})
+
diff --git a/test/application-multisave.js b/test/application-multisave.js
new file mode 100644
index 000000000..cf8eccc34
--- /dev/null
+++ b/test/application-multisave.js
@@ -0,0 +1,5 @@
+pipeline = Source({name:"foofile"})
+
+pipeline.transform("transformers/passthrough_and_log.js").save({name:"foofile2"})
+pipeline.save({name:"localmongo", namespace: "boom.bas"})
+pipeline.save({name:"localmongo", namespace: "boom.baz"})
diff --git a/test/application.js b/test/application.js
new file mode 100644
index 000000000..2b4f1472c
--- /dev/null
+++ b/test/application.js
@@ -0,0 +1,3 @@
+
+// create a pipeline that reads documents from a file, transforms them, and writes them
+pipeline = Source({name:"foofile"}).transform("transformers/passthrough_and_log.js").save({name:"errorfile"})
diff --git a/test/config.yaml b/test/config.yaml
new file mode 100644
index 000000000..7383a2504
--- /dev/null
+++ b/test/config.yaml
@@ -0,0 +1,31 @@
+api:
+  interval: 1s
+  uri: "https://app.compose.io/services/transporter/v1/events"
+  # uri: "http://requestb.in/13gerls1"
+  key: "48593282-b38d-4bf5-af58-f7327271e73d"
+  pid: "something-static"
+nodes:
+  localmongo:
+    type: mongo
+    uri: mongodb://localhost/boom
+  es:
+    type: elasticsearch
+    uri: https://nick:darling@haproxy1.dblayer.com:10291/thisgetsignored
+  timeseries:
+    type: influx
+    uri: influxdb://root:root@localhost:8086/compose
+  debug:
+    type: file
+    uri: stdout://
+  foofile:
+    type: file
+    uri: file:///tmp/foo
+  foofile2:
+    type: file
+    uri: file:///tmp/foo2
+  errorfile:
+    type: file
+    uri: file:///var/gonnaerror
+  stdout:
+    type: file
+    uri: stdout://
diff --git a/test/transformers/influx_test.js b/test/transformers/influx_test.js
new file mode 100644
index 000000000..1462bf8c7
--- /dev/null
+++ b/test/transformers/influx_test.js
@@ -0,0 +1 @@
+module.exports = function(doc) { return _.pick(doc, ["_id", "download_count"]) }
\ No newline at end of file
diff --git a/test/transformers/passthrough_and_log.js b/test/transformers/passthrough_and_log.js
new file mode 100644
index 000000000..fd952e20b
--- /dev/null
+++ b/test/transformers/passthrough_and_log.js
@@ -0,0 +1,4 @@
+module.exports = function(doc) {
+  console.log("transformer: " + JSON.stringify(doc))
+  return doc
+}
\ No newline at end of file
diff --git a/test/transformers/passthrough_and_log2.js b/test/transformers/passthrough_and_log2.js
new file mode 100644
index 000000000..4cfe9709e
--- /dev/null
+++ b/test/transformers/passthrough_and_log2.js
@@ -0,0 +1,4 @@
+module.exports = function(doc) {
+  console.log("transformer2: " + JSON.stringify(doc))
+  return doc
+}
\ No newline at end of file
diff --git a/test/transformers/transform2.js b/test/transformers/transform2.js
new file mode 100644
index 000000000..7aa8e78f6
--- /dev/null
+++ b/test/transformers/transform2.js
@@ -0,0 +1,6 @@
+module.exports = function(doc) {
+  if (doc["name"] == "errorme") {
+    throwanerroryay();
+  }
+  return doc
+}
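For orientation, the javascript applications above are just builders for the same node trees described in pkg/transporter. A rough Go equivalent of test/application.js might look like the sketch below; the node names and URIs are taken from test/config.yaml, and this mapping is an illustration rather than what the javascript builder literally generates:

```go
package main

import (
	"fmt"
	"os"
	"time"

	"github.com/compose/transporter/pkg/adaptor"
	"github.com/compose/transporter/pkg/events"
	"github.com/compose/transporter/pkg/transporter"
)

func main() {
	// Source({name:"foofile"}).transform("transformers/passthrough_and_log.js").save({name:"errorfile"})
	source := transporter.NewNode("foofile", "file", adaptor.Config{"uri": "file:///tmp/foo"})
	source.Add(
		transporter.NewNode("trans", "transformer", adaptor.Config{"filename": "transformers/passthrough_and_log.js"}).
			Add(transporter.NewNode("errorfile", "file", adaptor.Config{"uri": "file:///var/gonnaerror"})))

	pipeline, err := transporter.NewPipeline(source, events.NewNoopEmitter(), 1*time.Second)
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	if err := pipeline.Run(); err != nil {
		fmt.Println(err)
	}
}
```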