Skip to content

Commit

Permalink
Fixed go-getter processor, added details to templates
Browse files Browse the repository at this point in the history
  • Loading branch information
alexkreidler committed Dec 29, 2019
1 parent dbcbe72 commit 3919a8e
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 15 deletions.
33 changes: 30 additions & 3 deletions _templates/processor/new/processor.ejs.t
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@ import (

type <%= Name %>Processor struct {
state chan api.DataChunkState
config interface{} //TODO: change this to your config type
config ConfigType //TODO: change this to your config type
}

func (p *<%= Name %>Processor) Configure(config interface{}) error {
p.config = config
// TODO: cast to your config type
opts := config.(*ConfigType)
p.config = *opts

return nil
}

Expand Down Expand Up @@ -47,12 +50,36 @@ func (p *<%= Name %>Processor) done() {
}

func (p *<%= Name %>Processor) Run(data interface{}) {
defer p.done()
p.updateState(api.DataChunkStateRUNNING)

// Remember to decode from the map[string]interface{} data to your config type
// First we decode the map into the correct structure
var opts ConfigType
log.Printf("got raw data: %#+v \n", data)
err := mapstructure.Decode(data, &opts)
if err != nil {
log.Println(err)
}

// Then we merge the config into the data
log.Printf("existing config: %#+v, new data: %#+v \n", p.config, opts)
err = mergo.Merge(&opts, p.config, func(config *mergo.Config) {
config.Overwrite = true
})
if err != nil {
log.Println(err)
}
// opts now contains the merged options

// TODO: Add your code here

if err != nil {
p.updateState(api.DataChunkStateFAILED)
return
}

p.updateState(api.DataChunkStateSUCCEEDED)
p.done()
}

func (p *<%= Name %>Processor) GetError() error {
Expand Down
2 changes: 1 addition & 1 deletion executor/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func rManager(data api.Data, r *runProcessor) {

r.dataLock.Lock()
r.workers[data.ChunkID].out.Format = api.DataFormatRAW
r.workers[data.ChunkID].out.State = api.DataChunkStateSUCCEEDED
r.workers[data.ChunkID].out.State = r.workers[data.ChunkID].in.State
r.workers[data.ChunkID].out.RawData = out
r.dataLock.Unlock()

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/davecgh/go-spew v1.1.1
github.com/gin-gonic/gin v1.5.0
github.com/gorilla/mux v1.7.3
github.com/hashicorp/go-getter v1.4.0
github.com/imdario/mergo v0.3.8
github.com/joeybloggs/go-download v2.1.0+incompatible
github.com/mitchellh/mapstructure v1.1.2
Expand Down
129 changes: 129 additions & 0 deletions go.sum

Large diffs are not rendered by default.

67 changes: 57 additions & 10 deletions processors/get/get.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,36 @@

/*
The get processor uses go-getter from Hashicorp to quickly and easily get data from many sources including Git, HTTP, S3, etc
It takes in a simple string as input which contains a reference to the file and also any required configuration such as S3 access keys or the Git SHA
*/
package get

import (
"github.com/alexkreidler/wiz/api"
"github.com/alexkreidler/wiz/processors/processor"
gogetter "github.com/hashicorp/go-getter"
"github.com/imdario/mergo"
"github.com/mitchellh/mapstructure"
"io/ioutil"
"log"
"time"
)

type GoGetConfig struct {
// Source is the source to download. It can either be a file or a folder, and Go-Getter will fetch it with GetAny
// It can also include any go-getter configuration
Source string
}

type GetProcessor struct {
state chan api.DataChunkState
config interface{} //TODO: change this to your config type
state chan api.DataChunkState
config GoGetConfig //TODO: change this to your config type
dir string
}

func (p *GetProcessor) Configure(config interface{}) error {
p.config = config
return nil
opts := config.(*GoGetConfig)
p.config = *opts

return nil
}

func (p *GetProcessor) GetConfig() interface{} {
Expand All @@ -32,7 +47,7 @@ func (p *GetProcessor) State() <-chan api.DataChunkState {
}

func (p *GetProcessor) Output() interface{} {
return map[string]string{"test": "output"}
return map[string]string{"OutputDir": p.dir}
}

func (p *GetProcessor) updateState(state api.DataChunkState) {
Expand All @@ -44,12 +59,44 @@ func (p *GetProcessor) done() {
}

func (p *GetProcessor) Run(data interface{}) {
defer p.done()
p.updateState(api.DataChunkStateRUNNING)

// TODO: Add your code here
// First we decode the map into the correct structure
var opts GoGetConfig
log.Printf("got raw data: %#+v \n", data)
err := mapstructure.Decode(data, &opts)
if err != nil {
log.Println(err)
}

// Then we merge the config into the data
log.Printf("existing config: %#+v, new data: %#+v \n", p.config, opts)
err = mergo.Merge(&opts, p.config, func(config *mergo.Config) {
config.Overwrite = true
})
if err != nil {
log.Println(err)
}

dir, err := ioutil.TempDir("", "go-get")
if err != nil {
log.Println(err)
p.updateState(api.DataChunkStateFAILED)
return
}
log.Println("created temp dir", dir)

// TODO: expose more options
err = gogetter.GetAny(dir, opts.Source)
if err != nil {
log.Println(err)
p.updateState(api.DataChunkStateFAILED)
return
}
p.dir = dir

p.updateState(api.DataChunkStateSUCCEEDED)
p.done()
}

func (p *GetProcessor) GetError() error {
Expand All @@ -59,7 +106,7 @@ func (p *GetProcessor) GetError() error {
func (p *GetProcessor) Metadata() api.Processor {
return api.Processor{
ProcID: "get",
Name: "Go-Get (Hashicorp) Processor",
Name: "Go-Getter (Hashicorp) Processor",
Version: "0.1.0",
}
}
2 changes: 1 addition & 1 deletion processors/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (g *GitProcessor) State() <-chan api.DataChunkState {
}

func (g *GitProcessor) Output() interface{} {
return map[string]string{"Dir": g.dir}
return map[string]string{"OutputDir": g.dir}
}

func (g *GitProcessor) updateState(state api.DataChunkState) {
Expand Down
2 changes: 2 additions & 0 deletions processors/processors.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package processors

import (
"github.com/alexkreidler/wiz/processors/get"
"github.com/alexkreidler/wiz/processors/git"
"github.com/alexkreidler/wiz/processors/noop"
"github.com/alexkreidler/wiz/processors/processor"
Expand All @@ -25,5 +26,6 @@ func ConfiguredProcessorRegistry() ProcessorRegistry {
p := ProcessorRegistry{make(ProcessorMap)}
p.AddProcessor("noop", noop.NoopProcessor{})
p.AddProcessor("git", &git.GitProcessor{})
p.AddProcessor("get", &get.GetProcessor{})
return p
}

0 comments on commit 3919a8e

Please sign in to comment.