Skip to content

Commit

Permalink
Merge pull request #3 from falmar/dev
Browse files Browse the repository at this point in the history
add tests, and sig breaking change
  • Loading branch information
falmar committed Aug 6, 2023
2 parents aa4bb1e + 36c1b9b commit 4f1a9d4
Show file tree
Hide file tree
Showing 4 changed files with 593 additions and 32 deletions.
13 changes: 8 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,16 @@ package main
import (
"context"
"fmt"
"github.com/falmar/krun"
"os"
"time"

"github.com/falmar/krun"
)

func main() {
queue := krun.New(krun.NewConfig{
queue := krun.New(&krun.Config{
Size: 5, // number of workers
WaitSleep: time.Millisecond * 10,
WaitSleep: time.Microsecond,
})

job := func(ctx context.Context) (interface{}, error) {
Expand All @@ -47,7 +48,9 @@ func main() {
os.Exit(1)
}

fmt.Println("Result:", res.Data)
queue.Wait(ctx)

fmt.Println("Result:", res.Data.(string))
}
```

Expand All @@ -57,5 +60,5 @@ Krun is released under the MIT License. See [LICENSE](LICENSE) for more informat


## TODO:
- [ ] unit test
- [x] unit test
- [ ] go releaser
33 changes: 17 additions & 16 deletions examples/basic/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,33 @@ package main
import (
"context"
"fmt"
"math/rand"
"os"
"time"

"github.com/falmar/krun"
)

func main() {
random := rand.New(rand.NewSource(time.Now().UnixNano()))
ctx := context.Background()
k := krun.New(krun.NewConfig{
Size: 10,
queue := krun.New(&krun.Config{
Size: 5, // number of workers
WaitSleep: time.Microsecond,
})

for i := 0; i < 100; i++ {
ctx := context.WithValue(ctx, "key", i)
job := func(ctx context.Context) (interface{}, error) {
time.Sleep(time.Millisecond * 100)
return "Hello, Krun!", nil
}

r := k.Run(ctx, func(ctx context.Context) (interface{}, error) {
// do some work
time.Sleep(time.Millisecond * (300 + time.Duration(random.Intn(700))))
return ctx.Value("key"), nil
})
ctx := context.Background()
resChan := queue.Run(ctx, job)

go func(r <-chan *krun.Result) {
fmt.Println("hello from index:", (<-r).Data)
}(r)
res := <-resChan
if res.Error != nil {
fmt.Println("Error:", res.Error)
os.Exit(1)
}

k.Wait(context.Background())
queue.Wait(ctx)

fmt.Println("Result:", res.Data.(string))
}
25 changes: 14 additions & 11 deletions krun.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,19 @@ type krun struct {
n int
waitSleep time.Duration
workers chan *worker
working int
mu sync.RWMutex
}
type worker struct {
ctx context.Context
job Job
result chan *Result
}

type NewConfig struct {
type Config struct {
Size int
WaitSleep time.Duration
}

func New(cfg NewConfig) Krun {
func New(cfg *Config) Krun {
k := &krun{
n: cfg.Size,
workers: make(chan *worker, cfg.Size),
Expand Down Expand Up @@ -72,19 +70,25 @@ func (k *krun) Run(ctx context.Context, f Job) <-chan *Result {
}

func (k *krun) Wait(ctx context.Context) {
breakL:
k.mu.RLock()
n := k.n
k.mu.RUnlock()

if k.len() == n {
return
}

for {
select {
case <-ctx.Done():
return
default:
case <-time.After(k.waitSleep):
// "wait" until all workers are back
if k.len() < k.n {
time.Sleep(k.waitSleep)
if k.len() < n {
continue
}

break breakL
return
}
}
}
Expand All @@ -105,8 +109,7 @@ func (k *krun) push(w *worker) {
}

func (k *krun) pop() *worker {
w := <-k.workers
return w
return <-k.workers
}

func (k *krun) len() int {
Expand Down
Loading

0 comments on commit 4f1a9d4

Please sign in to comment.