Pool (#1)
* implement workers pool

* bump ci versions

* lint: unused in tests

* lint: test warns
umputun committed Sep 19, 2020
1 parent d32b1ea commit 507090e
Showing 11 changed files with 965 additions and 19 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
@@ -11,10 +11,10 @@ jobs:
runs-on: ubuntu-latest

steps:
- name: set up go 1.14
- name: set up go 1.15
uses: actions/setup-go@v1
with:
go-version: 1.14
go-version: 1.15
id: go

- name: checkout
@@ -30,7 +30,7 @@ jobs:
- name: install golangci-lint and goveralls
run: |
curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh| sh -s -- -b $GITHUB_WORKSPACE v1.24.0
curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh| sh -s -- -b $GITHUB_WORKSPACE v1.31.0
GO111MODULE=off go get -u -v github.com/mattn/goveralls
- name: run linters
2 changes: 1 addition & 1 deletion .golangci.yml
@@ -68,4 +68,4 @@ issues:
exclude-use-default: false

service:
golangci-lint-version: 1.24.x
golangci-lint-version: 1.31.x
103 changes: 97 additions & 6 deletions README.md
@@ -1,12 +1,15 @@
# Flow - FBP / pipelines [![Build Status](https://github.com/go-pkgz/flow/workflows/build/badge.svg)](https://github.com/go-pkgz/flow/actions) [![Go Report Card](https://goreportcard.com/badge/github.com/go-pkgz/flow)](https://goreportcard.com/report/github.com/go-pkgz/flow) [![Coverage Status](https://coveralls.io/repos/github/go-pkgz/flow/badge.svg?branch=master)](https://coveralls.io/github/go-pkgz/flow?branch=master)
# Flow - FBP / pipelines / workers pool

[![Build Status](https://github.com/go-pkgz/flow/workflows/build/badge.svg)](https://github.com/go-pkgz/flow/actions) [![Go Report Card](https://goreportcard.com/badge/github.com/go-pkgz/flow)](https://goreportcard.com/report/github.com/go-pkgz/flow) [![Coverage Status](https://coveralls.io/repos/github/go-pkgz/flow/badge.svg?branch=master)](https://coveralls.io/github/go-pkgz/flow?branch=master)

Package `flow` provides support for very basic FBP / pipelines. It helps to structure multistage processing as
a set of independent handlers communicating via channels. The typical use case is for ETL (extract, transform, load)
type of processing.
type of processing. Package `flow` doesn't introduce any high-level abstraction and keeps everything in the hand of the user.

Package `flow` doesnt't introduce any high-level abstraction and keeps everything in the hand of the user.

## Details
Package `pool` provides a simplified version of `flow` suitable for single-handler flows.

## Details about `flow` package

- Each handler represents an async stage. It consumes data from an input channel and publishes results to an output channel.
- Each handler runs in a separate goroutine.
@@ -26,7 +29,7 @@ running handlers gracefully and won't keep any goroutine running/leaking.

`go get -u github.com/go-pkgz/flow`

## Example of the handler
## Example of the flow's handler

```go
// ReaderHandler creates flow.Handler, reading strings from any io.Reader
@@ -55,7 +58,7 @@ func ReaderHandler(reader io.Reader) Handler {
}
```

## Usage
## Usage of the flow package

_for complete example see [example](https://github.com/go-pkgz/flow/tree/master/_example)_

@@ -195,4 +198,92 @@ func ExampleFlow_parallel() {
fmt.Printf("all done, result=%v", <-f.Channel())
}
}
```

## Details about `pool` package

- In addition to the default "run a func in multiple goroutines" mode, it also provides optional support for chunked workers: each key, detected by a user-provided func, is guaranteed to be processed by the same worker. This mode is needed for stateful flows where each set of input records has to be processed sequentially and some state has to be kept.
- Another thing `pool` provides is a batch size. This is a simple performance optimization: input requests are kept in a buffer and sent to the worker channel in batches (slices) instead of one per submit call.
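
The chunked mode can be pictured with a minimal sketch. It assumes the key is hashed and reduced modulo the number of workers; the actual distribution logic of `pool` is not part of this diff and may differ. `workerIndex` is a hypothetical helper, not a function of the package.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// workerIndex is a hypothetical helper (not part of the pool package).
// It maps a chunk key to a worker slot by hashing the key, so records
// with equal keys always land on the same worker. workers must be > 0.
func workerIndex(key string, workers int) int {
	h := fnv.New32a()
	_, _ = h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(workers))
}

func main() {
	// "user-1" appears twice and is routed to the same worker both times
	for _, key := range []string{"user-1", "user-2", "user-1"} {
		fmt.Printf("%s -> worker %d\n", key, workerIndex(key, 4))
	}
}
```

Because the mapping depends only on the key and the pool size, a worker can keep per-key state without coordinating with other workers.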

Options:

- `ChunkFn` - sets the function returning a key string that identifies the chunk
- `Batch` - sets batch size (default 1)
- `ResChanSize` - sets the size of the output buffered channel (default 1)
- `WorkerChanSize` - sets the size of the workers' buffered channel (default 1)
- `ContinueOnError` - allows workers to continue after an error occurred
- `OnCompletion` - sets a callback, called for each worker on successful completion
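
These options follow the functional options pattern. Below is a self-contained sketch of that pattern, modeled on `pool/options.go` from this commit. The `Workers` struct here is a stripped-down stand-in and `newWorkers` is a hypothetical constructor; the real constructor's handling of options is not shown in this diff.

```go
package main

import "fmt"

// Workers is a stand-in for the pool type with only the fields used here.
type Workers struct {
	poolSize        int
	batchSize       int
	resChanSize     int
	continueOnError bool
}

// Option mutates Workers during construction.
type Option func(p *Workers)

// Batch sets the batch size; values below 1 are ignored.
func Batch(size int) Option {
	return func(p *Workers) {
		if size >= 1 {
			p.batchSize = size
		}
	}
}

// ResChanSize sets the response channel buffer size; negative values are ignored.
func ResChanSize(size int) Option {
	return func(p *Workers) {
		if size >= 0 {
			p.resChanSize = size
		}
	}
}

// ContinueOnError already has the Option signature, so it is passed uncalled.
func ContinueOnError(p *Workers) {
	p.continueOnError = true
}

// newWorkers is a hypothetical constructor: it sets defaults first and then
// applies the options in the order given.
func newWorkers(poolSize int, opts ...Option) *Workers {
	p := &Workers{poolSize: poolSize, batchSize: 1, resChanSize: 1}
	for _, opt := range opts {
		opt(p)
	}
	return p
}

func main() {
	// the invalid ResChanSize(-5) is ignored and the default stays in place
	p := newWorkers(8, Batch(100), ResChanSize(-5), ContinueOnError)
	fmt.Printf("%+v\n", *p)
}
```

Validation inside each option keeps the constructor simple: an out-of-range value leaves the default untouched instead of returning an error.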

### worker function

The worker function is passed by the user and runs in multiple workers (goroutines).
This is the function: `type workerFn func(ctx context.Context, inp interface{}, resCh chan Response, store WorkerStore) error`

It takes the `inp` parameter, does the job and optionally sends result(s) to `resCh`. By default, a returned error terminates all workers (see `ContinueOnError`).
Note: `workerFn` can be stateful, collect anything it needs and send 0 or more results. Results are wrapped in the `Response` struct,
which allows communicating an error code back to the consumer. `workerFn` doesn't need to send errors; returning a non-nil error is enough.

### worker store

Each worker gets its own `WorkerStore`, which can be used as thread-safe per-worker storage for any intermediate results.

```go
type WorkerStore interface {
Set(key string, val interface{})
Get(key string) (interface{}, bool)
GetInt(key string) int
GetFloat(key string) float64
GetString(key string) string
GetBool(key string) bool
Keys() []string
Delete(key string)
}
```

_alternatively state can be kept outside of workers as a slice of values and accessed by worker ID._
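
As a rough illustration, a store like this could be backed by a mutex-guarded map. The sketch below covers only part of the interface; `localStore` is a hypothetical type, and the behavior of `GetInt` for missing or non-int values (returning 0) is an assumption rather than documented behavior of the package.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// localStore is a hypothetical, mutex-guarded map implementing a subset
// of the WorkerStore interface.
type localStore struct {
	mu   sync.RWMutex
	data map[string]interface{}
}

func newLocalStore() *localStore {
	return &localStore{data: map[string]interface{}{}}
}

// Set stores val under key, replacing any previous value.
func (s *localStore) Set(key string, val interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = val
}

// Get returns the value for key and whether it was present.
func (s *localStore) Get(key string) (interface{}, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[key]
	return v, ok
}

// GetInt returns the value as int, or 0 if the key is missing or holds
// another type (assumed behavior).
func (s *localStore) GetInt(key string) int {
	v, _ := s.Get(key)
	n, _ := v.(int)
	return n
}

// Keys returns all keys in sorted order.
func (s *localStore) Keys() []string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	keys := make([]string, 0, len(s.data))
	for k := range s.data {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// Delete removes key; deleting a missing key is a no-op.
func (s *localStore) Delete(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.data, key)
}

func main() {
	st := newLocalStore()
	st.Set("count", st.GetInt("count")+1) // a missing key reads as 0
	st.Set("name", "worker")
	fmt.Println(st.GetInt("count"), st.Keys())
}
```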

### usage

```go
p := pool.New(8, func(ctx context.Context, v interface{}, resCh chan pool.Response, ws pool.WorkerStore) error {
	// worker function gets input v, processes it and sends results to the response channel resCh

	input, ok := v.(string) // in this case it gets string as input
	if !ok {
		return errors.New("incorrect input type")
	}
	// do something with input
	// ...

	prev := ws.GetInt("something") // access thread-local var, use prev as needed

	resCh <- pool.Response{Data: "foo"}
	resCh <- pool.Response{Data: "bar"}
	pool.Metrics(ctx).Inc("counter")
	ws.Set("something", 1234) // keep thread-local things
	return nil // a non-nil error would terminate all workers
})

ch := p.Go(context.TODO()) // start all workers in 8 goroutines

// submit values (consumer side)
go func() {
p.Submit("something")
p.Submit("something else")
p.Close() // indicates completion of all inputs
}()

for rec := range ch {
	if rec.Errors != nil { // error happened
		return rec.Errors
	}
	log.Print(rec.Data) // print value
}

// alternatively ReadAll helper can be used to get everything from response channel
res, err := pool.ReadAll(ch)

// metrics the same as for flow
metrics := pool.Metrics()
log.Print(metrics.Get("counter"))
```
8 changes: 4 additions & 4 deletions _example/example.go → _example/main.go
@@ -3,12 +3,12 @@ package main
import (
"bufio"
"context"
"fmt"
"log"
"os"
"strings"

"github.com/go-pkgz/flow"
"github.com/pkg/errors"
)

// this toy example demonstrates a typical use of flow library.
@@ -52,7 +52,7 @@ func lineSplitHandler(ctx context.Context, ch chan interface{}) (chan interface{
fname := val.(string)
fh, err := os.Open(fname)
if err != nil {
return errors.Wrapf(err, "failed to open %s", fname)
return fmt.Errorf("failed to open %s: %w", fname, err)
}

lines := 0
@@ -72,7 +72,7 @@ func lineSplitHandler(ctx context.Context, ch chan interface{}) (chan interface{

log.Printf("file reader completed for %s, read %d lines (total %d)", fname, lines, metrics.Get("lines"))
if scanner.Err() != nil {
return errors.Wrapf(scanner.Err(), "scanner failed for %s", fname)
return fmt.Errorf("scanner failed for %s: %w", fname, scanner.Err())
}
}
log.Printf("line split handler completed, id=%d lines=%d", flow.CID(ctx), metrics.Get("lines"))
@@ -107,7 +107,7 @@ func wordsHandler(minSize int) flow.Handler {
count++
wi := wordsInfo{size: len(w), special: strings.Contains(w, "data")}
if err := flow.Send(ctx, wordsCh, wi); err != nil {
return errors.Wrapf(err, "failed words handler %d", flow.CID(ctx))
return fmt.Errorf("failed words handler %d: %w", flow.CID(ctx), err)
}
}
}
6 changes: 3 additions & 3 deletions go.mod
@@ -1,8 +1,8 @@
module github.com/go-pkgz/flow

go 1.14
go 1.15

require (
github.com/stretchr/testify v1.3.0
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
github.com/stretchr/testify v1.6.1
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
)
9 changes: 7 additions & 2 deletions go.sum
@@ -5,5 +5,10 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a h1:WXEvlFVvvGxCJLG6REjsT03iWnKLEWinaScsxF2Vm2o=
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 h1:qwRHBd0NqMbJxfbotnDhm2ByMI1Shq4Y6oRJo21SGJA=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
52 changes: 52 additions & 0 deletions pool/options.go
@@ -0,0 +1,52 @@
package pool

// Option func type
type Option func(p *Workers)

// ChunkFn functional option defines chunk func distributing records to particular workers.
// The function should return key string identifying the record.
// Record with a given key string guaranteed to be processed by the same worker.
func ChunkFn(chunkFn func(val interface{}) string) Option {
return func(p *Workers) {
p.chunkFn = chunkFn
}
}

// ResChanSize sets size of response's channel buffer
func ResChanSize(size int) Option {
return func(p *Workers) {
if size >= 0 {
p.resChanSize = size
}
}
}

// WorkerChanSize sets size of worker channel(s)
func WorkerChanSize(size int) Option {
return func(p *Workers) {
if size >= 0 {
p.workerChanSize = size
}
}
}

// Batch sets batch size to collect incoming records in a buffer before sending to workers
func Batch(size int) Option {
return func(p *Workers) {
if size >= 1 {
p.batchSize = size
}
}
}

// ContinueOnError change default early termination on the first error and continue after error
func ContinueOnError(p *Workers) {
p.continueOnError = true
}

// OnCompletion set function called on completion for each worker id
func OnCompletion(completeFn CompleteFn) Option {
return func(p *Workers) {
p.completeFn = completeFn
}
}
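
The `Batch` option above describes collecting incoming records in a buffer before sending them to workers. A minimal sketch of that idea follows; `batcher` and its methods are hypothetical names, and the pool's actual buffering code is not part of this file.

```go
package main

import "fmt"

// batcher is a hypothetical helper that groups submitted values into slices
// of up to size items before sending them to the out channel.
type batcher struct {
	size int
	buf  []interface{}
	out  chan []interface{}
}

func newBatcher(size, chanSize int) *batcher {
	if size < 1 {
		size = 1 // same lower bound as the Batch option
	}
	return &batcher{
		size: size,
		buf:  make([]interface{}, 0, size),
		out:  make(chan []interface{}, chanSize),
	}
}

// submit buffers v and sends a batch once size items have been collected.
func (b *batcher) submit(v interface{}) {
	b.buf = append(b.buf, v)
	if len(b.buf) >= b.size {
		b.flush()
	}
}

// flush sends the current buffer, if any, and starts a fresh one so the
// receiver owns the slice it got.
func (b *batcher) flush() {
	if len(b.buf) == 0 {
		return
	}
	b.out <- b.buf
	b.buf = make([]interface{}, 0, b.size)
}

// finish sends the remaining partial batch and closes the channel.
func (b *batcher) finish() {
	b.flush()
	close(b.out)
}

func main() {
	b := newBatcher(2, 8) // channel buffer is large enough to avoid blocking here
	for i := 1; i <= 5; i++ {
		b.submit(i)
	}
	b.finish()
	for batch := range b.out {
		fmt.Println(batch)
	}
}
```

Sending slices instead of single values reduces the number of channel operations per record, which is where the performance gain of batching comes from.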