Skip to content

Commit

Permalink
Add response and request pattern to worker (#46)
Browse files Browse the repository at this point in the history
* Add response and request pattern to worker

* Add info for code generation
  • Loading branch information
donutloop committed Jan 15, 2020
1 parent 0af635e commit 193de0d
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 31 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ In each sub directory is a set of examples

## Code generation

`Currently, it's only compatiable with version 2.x.x`

The code generation tool generates for a pattern an none generic version for spefici type

### Supported pattern
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
module github.com/donutloop/toolkit

require github.com/fatih/astrewrite v0.0.0-20180730114054-bef4001d457f

go 1.13
11 changes: 6 additions & 5 deletions worker/doc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@ import (
)

func ExampleWorker() {
workerHandler := func(parameter worker.GenericType) {
workerHandler := func(parameter interface{}) (interface{}, error) {
v := parameter.(string)
fmt.Println(v)
return v + " world", nil
}

queue := worker.New(2, workerHandler, 10)
request, response, _ := worker.New(2, workerHandler, 10)

queue <- "hello"
request <- "hello"
<-time.After(time.Millisecond * 250)
fmt.Println(<-response)

// Output: hello
// Output: hello world
}
45 changes: 30 additions & 15 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,34 @@ import (
"container/list"
)

type Request chan<- interface{}

type Response <-chan interface{}

type worker struct {
c chan interface{}
done chan bool
jobs chan interface{}
fn func(n GenericType)
buf *list.List
request chan interface{}
response chan interface{}
errs chan error
done chan bool
jobs chan interface{}
fn func(n interface{}) (interface{}, error)
buf *list.List
}

// NewWorker starts n*Workers goroutines running func on incoming
// parameters sent on the returned channel.
func New(nWorkers uint, fn func(gt GenericType), buffer uint) chan<- interface{} {
retc := make(chan interface{}, buffer)
func New(nWorkers uint, fn func(gt interface{}) (interface{}, error), buffer uint) (Request, Response, <-chan error) {
request := make(chan interface{}, buffer)
response := make(chan interface{}, buffer)
errs := make(chan error, buffer)
w := &worker{
c: retc,
jobs: make(chan interface{}, buffer),
done: make(chan bool),
fn: fn,
buf: list.New(),
errs: errs,
request: request,
response: response,
jobs: make(chan interface{}, buffer),
done: make(chan bool),
fn: fn,
buf: list.New(),
}
go w.listener()
for i := uint(0); i < nWorkers; i++ {
Expand All @@ -32,11 +42,11 @@ func New(nWorkers uint, fn func(gt GenericType), buffer uint) chan<- interface{}
<-w.done
}
}()
return retc
return request, response, errs
}

func (w *worker) listener() {
inc := w.c
inc := w.request
for inc != nil || w.buf.Len() > 0 {
outc := w.jobs
var frontNode interface{}
Expand Down Expand Up @@ -67,7 +77,12 @@ func (w *worker) work() {
w.done <- true
return
}
w.fn(genericType)
v, err := w.fn(genericType)
if err != nil {
w.errs <- err
continue
}
w.response <- v
}
}
}
29 changes: 22 additions & 7 deletions worker/worker_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package worker_test

import (
"fmt"
"github.com/donutloop/toolkit/worker"
"sync/atomic"
"testing"
Expand All @@ -19,11 +20,10 @@ func TestWorker(t *testing.T) {
}

counter := int32(0)
workerHandler := func(parameter worker.GenericType) {
workerHandler := func(parameter interface{}) (interface{}, error) {
v, ok := parameter.(string)
if !ok {
t.Errorf("value is not a string got=%v", parameter)
return
return false, fmt.Errorf("value is not a string got=%v", parameter)
}

if !containes([]string{"hello", "golang", "world"}, v) {
Expand All @@ -32,13 +32,28 @@ func TestWorker(t *testing.T) {

t.Logf("value: %v", v)
atomic.AddInt32(&counter, 1)
return true, nil
}

queue := worker.New(3, workerHandler, 10)
request, response, errs := worker.New(3, workerHandler, 10)

queue <- "hello"
queue <- "golang"
queue <- "world"
request <- "hello"
request <- "golang"
request <- "world"

go func() {
for err := range errs {
t.Error(err)
}
}()

go func() {
for v := range response {
if !v.(bool) {
t.Error("bad type")
}
}
}()

<-time.After(500 * time.Millisecond)

Expand Down
4 changes: 0 additions & 4 deletions worker/worker_type.go

This file was deleted.

0 comments on commit 193de0d

Please sign in to comment.