Skip to content

Commit

Permalink
API change: hide the sending channel from the writer callback passed …
Browse files Browse the repository at this point in the history
…to New. Remove Group (for now).
  • Loading branch information
bobg committed Jan 2, 2022
1 parent efb19b0 commit 4b5175f
Show file tree
Hide file tree
Showing 15 changed files with 48 additions and 238 deletions.
56 changes: 6 additions & 50 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,46 +85,6 @@ func Gen[T any](ctx context.Context, f func() (T, bool, error)) *Iter[T]
Gen produces an iterator whose members are generated by successive calls to
a given function.
func Group[T any, U comparable](ctx context.Context, inp *Iter[T], partition func(T) (U, error)) *Iter[Pair[U, *Iter[T]]]
Group partitions the elements of an iterator into multiple separate iterator
streams based on a given partitioning function. Each item in the input is
fed to the function to see which partition it belongs in. The output is an
iterator of X,Y pairs where X is the partition key and Y is an iterator over
the elements in that partition.
Callers reading the top-level output iterator should launch goroutines to
consume the nested iterators. Otherwise the process trying to consume items
in partition P1 is likely to block while the Group iterator waits for
something to consume an item in partition P2.
Example:
var (
groups = Group(ctx, input, partitionFunc)
wg sync.WaitGroup
)
for {
pair, ok, err := groups.Next()
// ...check err...
if !ok {
break
}
wg.Add(1)
go func() {
defer wg.Done()
partitionKey, partitionItems := pair.X, pair.Y
for {
item, ok, err := partitionItems.Next()
// ...check err...
if !ok {
break
}
// ...handle item...
}
}()
}
wg.Wait()
func Ints(ctx context.Context, start, delta int) *Iter[int]
Ints produces an infinite iterator of integers, starting at start and
incrementing by delta.
Expand All @@ -139,17 +99,13 @@ func Map[T, U any](ctx context.Context, inp *Iter[T], f func(T) (U, error)) *Ite
Map transforms a sequence of T-type elements into a sequence of U-type
elements by applying a function to each one.
func New[T any](ctx context.Context, writer func(context.Context, chan<- T) error) *Iter[T]
New[T] creates a new Iter[T]. The writer function is invoked once (in a
goroutine), and must supply all of the iterator's elements on the given
channel.
The writer function should return early, with an error, if its context is
canceled, without blocking on a channel send. This is done with a Go select
statement, or (for convenience) the Send function in this package.
func New[T any](ctx context.Context, writer func(send func(T) error) error) *Iter[T]
New[T] creates a new Iter[T].
The writer function must not close the channel; this will happen
automatically when the function exits.
The writer function is invoked once (in a goroutine), and must supply all of
the iterator's elements by repeated calls to the send function. If the send
function returns an error, the writer function should return early with that
error.
Any error returned by the writer function will be placed in the Err field of
the resulting iterator.
Expand Down
4 changes: 2 additions & 2 deletions accum.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import "context"
// and
// out[i+1] == f(out[i], inp[i+1])
func Accum[T any](ctx context.Context, inp *Iter[T], f func(T, T) (T, error)) *Iter[T] {
return New(ctx, func(ctx context.Context, ch chan<- T) error {
return New(ctx, func(send func(T) error) error {
var (
last T
first = true
Expand All @@ -32,7 +32,7 @@ func Accum[T any](ctx context.Context, inp *Iter[T], f func(T, T) (T, error)) *I
return err
}
}
err = Send(ctx, ch, last)
err = send(last)
if err != nil {
return err
}
Expand Down
28 changes: 15 additions & 13 deletions chit.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,17 @@ type Iter[T any] struct {
}

// New[T] creates a new Iter[T].
//
// The writer function is invoked once
// (in a goroutine),
// and must supply all of the iterator's elements on the given channel.
//
// The writer function should return early,
// with an error,
// if its context is canceled,
// without blocking on a channel send.
// This is done with a Go select statement,
// or (for convenience) the Send function in this package.
//
// The writer function must not close the channel;
// this will happen automatically when the function exits.
// and must supply all of the iterator's elements
// by repeated calls to the send function
// (which, buffering aside, will block until a downstream reader requires the value being sent).
// If the send function returns an error,
// the writer function should return early with that error.
//
// Any error returned by the writer function will be placed in the Err field of the resulting iterator.
func New[T any](ctx context.Context, writer func(context.Context, chan<- T) error) *Iter[T] {
func New[T any](ctx context.Context, writer func(send func(T) error) error) *Iter[T] {
ctx, cancel := context.WithCancel(ctx)

// Benchmark results:
Expand All @@ -48,7 +43,14 @@ func New[T any](ctx context.Context, writer func(context.Context, chan<- T) erro
cancel: cancel,
}
go func() {
iter.Err = writer(ctx, ch)
iter.Err = writer(func(x T) error {
select {
case ch <- x:
return nil
case <-ctx.Done():
return ctx.Err()
}
})
close(ch)
}()
return iter
Expand Down
4 changes: 2 additions & 2 deletions concat.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import "context"

// Concat[T] takes a sequence of iterators and produces an iterator over all the elements of the input iterators, in sequence.
func Concat[T any](ctx context.Context, inps ...*Iter[T]) *Iter[T] {
return New(ctx, func(ctx context.Context, ch chan<- T) error {
return New(ctx, func(send func(T) error) error {
for _, inp := range inps {
for {
x, ok, err := inp.Next()
Expand All @@ -14,7 +14,7 @@ func Concat[T any](ctx context.Context, inps ...*Iter[T]) *Iter[T] {
if !ok {
break
}
err = Send(ctx, ch, x)
err = send(x)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions dup.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func Dup[T any](ctx context.Context, inp *Iter[T], n int) []*Iter[T] {
for idx := 0; idx < n; idx++ {
idx := idx // Go loop-var pitfall
var iter *Iter[T]
iter = New(ctx, func(ctx context.Context, ch chan<- T) error {
iter = New(ctx, func(send func(T) error) error {
for {
x, ok, err := func() (T, bool, error) {
mu.Lock()
Expand Down Expand Up @@ -75,7 +75,7 @@ func Dup[T any](ctx context.Context, inp *Iter[T], n int) []*Iter[T] {
if !ok {
return nil
}
err = Send(ctx, ch, x)
err = send(x)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import "context"
// Filter filters the elements of an iterator according to a predicate function.
// Only the elements in the input iterator producing a true value appear in the output iterator.
func Filter[T any](ctx context.Context, inp *Iter[T], f func(T) (bool, error)) *Iter[T] {
return New(ctx, func(ctx context.Context, ch chan<- T) error {
return New(ctx, func(send func(T) error) error {
for {
x, ok, err := inp.Next()
if err != nil {
Expand All @@ -21,7 +21,7 @@ func Filter[T any](ctx context.Context, inp *Iter[T], f func(T) (bool, error)) *
if !ok {
continue
}
err = Send(ctx, ch, x)
err = send(x)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import "context"

// Gen produces an iterator whose members are generated by successive calls to a given function.
func Gen[T any](ctx context.Context, f func() (T, bool, error)) *Iter[T] {
return New(ctx, func(ctx context.Context, ch chan<- T) error {
return New(ctx, func(send func(T) error) error {
for {
x, ok, err := f()
if err != nil {
Expand All @@ -13,7 +13,7 @@ func Gen[T any](ctx context.Context, f func() (T, bool, error)) *Iter[T] {
if !ok {
return nil
}
err = Send(ctx, ch, x)
err = send(x)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions gomap.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import "context"

// FromMap creates a channel iterator over a map.
func FromMap[K comparable, V any](ctx context.Context, inp map[K]V) *Iter[Pair[K, V]] {
return New(ctx, func(ctx context.Context, ch chan<- Pair[K, V]) error {
return New(ctx, func(send func(Pair[K, V]) error) error {
for k, v := range inp {
k, v := k, v // Go loop-var pitfall
err := Send(ctx, ch, Pair[K, V]{X: k, Y: v})
err := send(Pair[K, V]{X: k, Y: v})
if err != nil {
return err
}
Expand Down
99 changes: 0 additions & 99 deletions group.go

This file was deleted.

49 changes: 0 additions & 49 deletions group_test.go

This file was deleted.

4 changes: 2 additions & 2 deletions map.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import "context"
// Map transforms a sequence of T-type elements into a sequence of U-type elements
// by applying a function to each one.
func Map[T, U any](ctx context.Context, inp *Iter[T], f func(T) (U, error)) *Iter[U] {
return New(ctx, func(ctx context.Context, ch chan<- U) error {
return New(ctx, func(send func(U) error) error {
for {
x, ok, err := inp.Next()
if err != nil {
Expand All @@ -18,7 +18,7 @@ func Map[T, U any](ctx context.Context, inp *Iter[T], f func(T) (U, error)) *Ite
if err != nil {
return err
}
err = Send(ctx, ch, y)
err = send(y)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 4b5175f

Please sign in to comment.