Skip to content

Commit

Permalink
API change: iter.Go now uses a bare channel instead of a callback. Al…
Browse files Browse the repository at this point in the history
…so, remove a stray debugging panic.
  • Loading branch information
bobg committed May 13, 2022
1 parent 318a184 commit 5d8285d
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 30 deletions.
17 changes: 5 additions & 12 deletions iter/chan.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ func (ch *chanIter[T]) Next() bool {
select {
case <-done:
ch.err = ch.ctx.Err()
panic(ch.err) // xxx
return false
case val, ok := <-ch.ch:
if !ok {
Expand Down Expand Up @@ -127,22 +126,16 @@ func toChan[T any](ctx context.Context, inp Of[T]) (<-chan T, func() error) {
}

// Go runs a function in a goroutine and returns an iterator over the values it produces.
// The function receives a callback for producing values.
func Go[T any](ctx context.Context, f func(send func(T) error) error) Of[T] {
// The function receives a channel for producing values.
// The channel closes when the function exits.
// Any error produced by the function is the value of the iterator's Err method.
func Go[T any](ctx context.Context, f func(ch chan<- T) error) Of[T] {
var (
ch = make(chan T)
res = &chanIter[T]{ch: ch, ctx: ctx}
)
go func() {
send := func(val T) error {
select {
case <-ctx.Done():
return ctx.Err()
case ch <- val:
}
return nil
}
res.err = f(send)
res.err = f(ch)
close(ch)
}()
return res
Expand Down
13 changes: 5 additions & 8 deletions iter/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,11 @@ func ExampleDup() {
}

func ExampleGo() {
it := iter.Go(context.Background(), func(send func(val int) error) error {
if err := send(1); err != nil {
return err
}
if err := send(2); err != nil {
return err
}
return send(3)
it := iter.Go(context.Background(), func(ch chan<- int) error {
ch <- 1
ch <- 2
ch <- 3
return nil
})
slice, err := iter.ToSlice(it)
if err != nil {
Expand Down
10 changes: 6 additions & 4 deletions iter/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func SQL[T any](ctx context.Context, db QueryerContext, query string, args ...an
return nil, fmt.Errorf("executing query: %w", err)
}

res := Go(ctx, func(send func(T) error) error {
res := Go(ctx, func(ch chan<- T) error {
defer rows.Close()

for rows.Next() {
Expand All @@ -58,9 +58,11 @@ func SQL[T any](ctx context.Context, db QueryerContext, query string, args ...an
if err != nil {
return fmt.Errorf("scanning row: %w", err)
}
err = send(rowval.Interface().(T))
if err != nil {
return fmt.Errorf("sending row: %w", err)

select {
case <-ctx.Done():
return ctx.Err()
case ch <- rowval.Interface().(T):
}
}
return rows.Err()
Expand Down
13 changes: 7 additions & 6 deletions parallel/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,15 @@ func Producers[T any](ctx context.Context, n int, f func(context.Context, int, f
// and the value.
//
// The caller receives two callbacks:
// one for sending a value to the workers,
// and one for closing that channel
// (signaling the end of input and causing the workers to exit normally).
// one for sending a value to the workers via an internal channel,
// and one for closing that channel,
// signaling the end of input and causing the workers to exit normally.
//
// The callback that the callers uses to produce values for the workers may block until a worker is available to consume the value.
// The value-sending callback may block until a worker is available to consume the value.
//
// An error from any worker cancels them all,
// whereupon the send-value callback will return an error.
// An error from any worker cancels them all.
// This error is returned from the close-channel callback.
// After any error, the value-sending callback will return an error.
// (Not the original error, however.
// For that, the caller should still invoke the close callback.)
func Consumers[T any](ctx context.Context, n int, f func(context.Context, int, T) error) (func(T) error, func() error) {
Expand Down

0 comments on commit 5d8285d

Please sign in to comment.