Skip to content

Commit

Permalink
add tests for new,run, and wait
Browse files Browse the repository at this point in the history
  • Loading branch information
falmar committed Aug 6, 2023
1 parent bb77b61 commit 891316e
Show file tree
Hide file tree
Showing 2 changed files with 321 additions and 25 deletions.
16 changes: 11 additions & 5 deletions krun.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,25 @@ func (k *krun) Run(ctx context.Context, f Job) <-chan *Result {
}

func (k *krun) Wait(ctx context.Context) {
breakL:
k.mu.RLock()
n := k.n
k.mu.RUnlock()

if k.len() == n {
return
}

for {
select {
case <-ctx.Done():
return
default:
case <-time.After(k.waitSleep):
// "wait" until all workers are back
if k.len() < k.n {
time.Sleep(k.waitSleep)
if k.len() < n {
continue
}

break breakL
return
}
}
}
Expand Down
330 changes: 310 additions & 20 deletions krun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,295 @@ import (
"time"
)

func TestWork(t *testing.T) {
func TestNew(t *testing.T) {
t.Parallel()

t.Run("returns a Krun", func(t *testing.T) {
k := New(NewConfig{})
if k == nil {
t.Errorf("Expected Krun, got nil")
}

v, ok := k.(*krun)
if !ok {
t.Errorf("Expected *krun, got %T", v)
}
})

t.Run("returns a Krun with the correct size", func(t *testing.T) {
k := New(NewConfig{Size: 5}).(*krun)

if k.n != 5 {
t.Errorf("Expected 5, got %v", k.Size())
}

if len(k.workers) != 5 {
t.Errorf("Expected 5, got %v", len(k.workers))
}
})

t.Run("returns a Krun with the correct waitSleep", func(t *testing.T) {
k := New(NewConfig{WaitSleep: time.Second}).(*krun)

if k.waitSleep != time.Second {
t.Errorf("Expected 1s, got %v", k.waitSleep)
}
})
}

func TestKrun_Size(t *testing.T) {
t.Parallel()

t.Run("returns the correct size", func(t *testing.T) {
k := krun{
n: 5,
}

if k.Size() != 5 {
t.Errorf("Expected 5, got %v", k.Size())
}
})
}

func TestRun(t *testing.T) {
t.Parallel()

t.Run("returns a channel", func(t *testing.T) {
k := krun{
workers: make(chan *worker, 1),
mu: sync.RWMutex{},
n: 1,
}
k.push(&worker{})

r := k.Run(context.Background(), func(ctx context.Context) (interface{}, error) {
return nil, nil
})

if r == nil {
t.Errorf("Expected channel, got nil")
}
})

t.Run("blocks waiting for available worker", func(t *testing.T) {
k := krun{
workers: make(chan *worker, 1),
mu: sync.RWMutex{},
n: 1,
}

errChan := make(chan error)

go func() {
k.Run(context.Background(), func(ctx context.Context) (interface{}, error) {
return nil, nil
})
errChan <- errors.New("expected k.Run to block")
}()

select {
case e := <-errChan:
t.Errorf(e.Error())
case <-time.After(time.Millisecond):
return
}
})

t.Run("sends off the job for work and return result", func(t *testing.T) {
k := krun{
workers: make(chan *worker, 1),
mu: sync.RWMutex{},
n: 1,
}
k.push(&worker{})

r := k.Run(context.Background(), func(ctx context.Context) (interface{}, error) {
return 5, nil
})

res := <-r
if i, ok := res.Data.(int); !ok {
t.Errorf("Expected int, got %T", res.Data)
} else if i != 5 {
t.Errorf("Expected 5, got %v", i)
}
})

t.Run("pass context to job", func(t *testing.T) {
k := krun{
workers: make(chan *worker, 1),
mu: sync.RWMutex{},
n: 1,
}
k.push(&worker{})

ctx := context.Background()

r := k.Run(ctx, func(ctx2 context.Context) (interface{}, error) {
if ctx != ctx2 {
t.Errorf("Expected %v, got %v", ctx, ctx2)
}
return nil, nil
})

if d := <-r; d.Error != nil {
t.Errorf("Expected nil, got %v", d.Error)
} else if d.Data != nil {
t.Errorf("Expected nil, got %v", d.Data)
}

ctx, cancel := context.WithCancel(context.Background())
cancel()

r = k.Run(ctx, func(ctx2 context.Context) (interface{}, error) {
select {
case <-ctx2.Done():
return nil, ctx2.Err()
default:
t.Errorf("Expected context to be cancelled")
return nil, nil
}
})

if d := <-r; d.Error == nil {
t.Errorf("Expected error, got nil")
} else if !errors.Is(d.Error, context.Canceled) {
t.Errorf("Expected context.Canceled, got %v", d.Error)
}
})
}

func TestKrun_Wait(t *testing.T) {
t.Parallel()

t.Run("blocks if workers are not done", func(t *testing.T) {
k := krun{
waitSleep: time.Millisecond * 50,
workers: make(chan *worker, 1),
mu: sync.RWMutex{},
n: 1,
}
ctx := context.Background()
wChan := make(chan struct{})
// len 1, has no workers

go func() {
k.Wait(ctx)
wChan <- struct{}{}
}()

select {
case <-wChan:
t.Errorf("Expected k.Wait to block")
case <-time.After(time.Millisecond * 100):
break
}
})

t.Run("unblocks when workers are done", func(t *testing.T) {
k := krun{
waitSleep: time.Millisecond * 50,
workers: make(chan *worker, 1),
mu: sync.RWMutex{},
n: 1,
}
ctx := context.Background()
wChan := make(chan struct{})
// len 1, work is "done"
k.workers <- &worker{}

go func() {
k.Wait(ctx)
wChan <- struct{}{}
}()

time.Sleep(time.Millisecond)

select {
case <-wChan:
break
case <-time.After(time.Millisecond):
t.Errorf("Expected k.Wait to unblock")
}
})

t.Run("sleep between checks", func(t *testing.T) {
k := krun{
waitSleep: time.Millisecond * 50,
workers: make(chan *worker, 1),
mu: sync.RWMutex{},
n: 1,
}
ctx := context.Background()
wChan := make(chan struct{})
// len 1, work is "done"

var start time.Time
var end time.Time

go func() {
time.Sleep(time.Millisecond * 60)
k.workers <- &worker{}
}()

go func() {
start = time.Now()
k.Wait(ctx)
end = time.Now()
wChan <- struct{}{}
}()

time.Sleep(time.Millisecond)

select {
case <-wChan:
if end.Sub(start) < time.Millisecond*50 {
t.Errorf("Expected k.Wait to sleep for at least 50ms")
}
case <-time.After(time.Millisecond * 100):
t.Errorf("Expected k.Wait to unblock")
}
})

t.Run("context done unblock", func(t *testing.T) {
k := krun{
waitSleep: time.Millisecond * 50,
workers: make(chan *worker, 1),
mu: sync.RWMutex{},
n: 1,
}

var start time.Time
var end time.Time
c := make(chan struct{})

go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
defer cancel()

start = time.Now()
k.Wait(ctx)
end = time.Now()

c <- struct{}{}
}()

select {
case <-c:
if end.Sub(start) < time.Millisecond*100 {
t.Errorf("Expected k.Wait to sleep for at least 100ms")
}

break
case <-time.After(time.Millisecond * 200):
t.Errorf("Expected k.Wait to unblock")
}
})
}

func TestKrun_Work(t *testing.T) {
t.Parallel()

t.Run("data is passed through", func(t *testing.T) {
k := krun{
workers: make(chan *worker, 1),
Expand Down Expand Up @@ -135,13 +423,14 @@ func TestWork(t *testing.T) {
})
}

func TestPush(t *testing.T) {
func TestKrun_Push(t *testing.T) {
t.Parallel()

t.Run("pushes worker to channel", func(t *testing.T) {
k := krun{
waitSleep: time.Microsecond,
workers: make(chan *worker, 3),
mu: sync.RWMutex{},
n: 3,
workers: make(chan *worker, 3),
mu: sync.RWMutex{},
n: 3,
}
workers := []*worker{
&worker{},
Expand Down Expand Up @@ -178,10 +467,9 @@ func TestPush(t *testing.T) {

t.Run("blocks until worker is pulled", func(t *testing.T) {
k := krun{
waitSleep: time.Microsecond,
workers: make(chan *worker, 1),
mu: sync.RWMutex{},
n: 1,
workers: make(chan *worker, 1),
mu: sync.RWMutex{},
n: 1,
}

errChan := make(chan error)
Expand All @@ -201,12 +489,13 @@ func TestPush(t *testing.T) {
})
}

func TestPop(t *testing.T) {
func TestKrun_Pop(t *testing.T) {
t.Parallel()

k := krun{
waitSleep: time.Microsecond,
workers: make(chan *worker, 3),
mu: sync.RWMutex{},
n: 3,
workers: make(chan *worker, 3),
mu: sync.RWMutex{},
n: 3,
}
workers := []*worker{
&worker{},
Expand Down Expand Up @@ -235,12 +524,13 @@ func TestPop(t *testing.T) {
}
}

func TestLen(t *testing.T) {
func TestKrun_Len(t *testing.T) {
t.Parallel()

k := krun{
waitSleep: time.Microsecond,
workers: make(chan *worker, 3),
mu: sync.RWMutex{},
n: 3,
workers: make(chan *worker, 3),
mu: sync.RWMutex{},
n: 3,
}
workers := []*worker{
&worker{},
Expand Down

0 comments on commit 891316e

Please sign in to comment.