diff --git a/README.md b/README.md index 6de142a..c39ffad 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,8 @@ In each sub directory is a set of examples ## Code generation +`Currently, it's only compatiable with version 2.x.x` + The code generation tool generates for a pattern an none generic version for spefici type ### Supported pattern diff --git a/go.mod b/go.mod index a2c0ce9..1af03db 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,5 @@ module github.com/donutloop/toolkit require github.com/fatih/astrewrite v0.0.0-20180730114054-bef4001d457f + +go 1.13 diff --git a/worker/doc_test.go b/worker/doc_test.go index 2bcc741..39cbad7 100644 --- a/worker/doc_test.go +++ b/worker/doc_test.go @@ -7,15 +7,16 @@ import ( ) func ExampleWorker() { - workerHandler := func(parameter worker.GenericType) { + workerHandler := func(parameter interface{}) (interface{}, error) { v := parameter.(string) - fmt.Println(v) + return v + " world", nil } - queue := worker.New(2, workerHandler, 10) + request, response, _ := worker.New(2, workerHandler, 10) - queue <- "hello" + request <- "hello" <-time.After(time.Millisecond * 250) + fmt.Println(<-response) - // Output: hello + // Output: hello world } diff --git a/worker/worker.go b/worker/worker.go index ff2038b..fedf3b0 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -4,24 +4,34 @@ import ( "container/list" ) +type Request chan<- interface{} + +type Response <-chan interface{} + type worker struct { - c chan interface{} - done chan bool - jobs chan interface{} - fn func(n GenericType) - buf *list.List + request chan interface{} + response chan interface{} + errs chan error + done chan bool + jobs chan interface{} + fn func(n interface{}) (interface{}, error) + buf *list.List } // NewWorker starts n*Workers goroutines running func on incoming // parameters sent on the returned channel. -func New(nWorkers uint, fn func(gt GenericType), buffer uint) chan<- interface{} { - retc := make(chan interface{}, buffer) +func New(nWorkers uint, fn func(gt interface{}) (interface{}, error), buffer uint) (Request, Response, <-chan error) { + request := make(chan interface{}, buffer) + response := make(chan interface{}, buffer) + errs := make(chan error, buffer) w := &worker{ - c: retc, - jobs: make(chan interface{}, buffer), - done: make(chan bool), - fn: fn, - buf: list.New(), + errs: errs, + request: request, + response: response, + jobs: make(chan interface{}, buffer), + done: make(chan bool), + fn: fn, + buf: list.New(), } go w.listener() for i := uint(0); i < nWorkers; i++ { @@ -32,11 +42,11 @@ func New(nWorkers uint, fn func(gt GenericType), buffer uint) chan<- interface{} <-w.done } }() - return retc + return request, response, errs } func (w *worker) listener() { - inc := w.c + inc := w.request for inc != nil || w.buf.Len() > 0 { outc := w.jobs var frontNode interface{} @@ -67,7 +77,12 @@ func (w *worker) work() { w.done <- true return } - w.fn(genericType) + v, err := w.fn(genericType) + if err != nil { + w.errs <- err + continue + } + w.response <- v } } } diff --git a/worker/worker_test.go b/worker/worker_test.go index c58ff34..3234b86 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -1,6 +1,7 @@ package worker_test import ( + "fmt" "github.com/donutloop/toolkit/worker" "sync/atomic" "testing" @@ -19,11 +20,10 @@ func TestWorker(t *testing.T) { } counter := int32(0) - workerHandler := func(parameter worker.GenericType) { + workerHandler := func(parameter interface{}) (interface{}, error) { v, ok := parameter.(string) if !ok { - t.Errorf("value is not a string got=%v", parameter) - return + return false, fmt.Errorf("value is not a string got=%v", parameter) } if !containes([]string{"hello", "golang", "world"}, v) { @@ -32,13 +32,28 @@ func TestWorker(t *testing.T) { t.Logf("value: %v", v) atomic.AddInt32(&counter, 1) + return true, nil } - queue := worker.New(3, workerHandler, 10) + request, response, errs := worker.New(3, workerHandler, 10) - queue <- "hello" - queue <- "golang" - queue <- "world" + request <- "hello" + request <- "golang" + request <- "world" + + go func() { + for err := range errs { + t.Error(err) + } + }() + + go func() { + for v := range response { + if !v.(bool) { + t.Error("bad type") + } + } + }() <-time.After(500 * time.Millisecond) diff --git a/worker/worker_type.go b/worker/worker_type.go deleted file mode 100644 index dad1cdf..0000000 --- a/worker/worker_type.go +++ /dev/null @@ -1,4 +0,0 @@ -package worker - -// used for code generation replacing -type GenericType interface{}