Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify the collection of task values and errors #5

Merged
merged 3 commits into from
Mar 19, 2024
Merged

Conversation

creachadair
Copy link
Owner

@creachadair creachadair commented Mar 18, 2024

Simplify the implementations of Group and Collector.

Remove the separate goroutine collecting errors, and deliver them directly to
the error filter and the output field. Moreover simplify the setup and teardown
so that there is not so much coordinated state. Although performance was not a
primary consideration, benchmarking suggests this is actually faster than the
previous implementation, and uses less memory.

Also expand and clarify the documentation of the Wait method.

Instead of maintaining a separate goroutine to synchronize delivery of values,
rework the collector to use a plain sync.Mutex.

In addition, add a new Report method, replacing Stream. Instead of a channel,
tasks using this method accepts a report function that sends values to the
collector. The report function ensures control does not return to the task
until the reported value has been serviced, which allows tasks to ensure they
do not exit until all their values have been addressed.

The Stream method still works, but is deprecated. To preserve its interface,
each Stream call now spins up a new goroutine to service the values from its
task. This is wasteful, but easily replaced by switching to Report.

Co-Authored-By: David Anderson

@creachadair creachadair force-pushed the mjf/nochan branch 3 times, most recently from e358024 to 1a906a6 Compare March 18, 2024 05:35
@creachadair creachadair changed the title WIP remove collector channels (mostly) Simplify the collection of task values and errors Mar 18, 2024
@creachadair creachadair marked this pull request as ready for review March 18, 2024 12:13
@creachadair creachadair force-pushed the mjf/nochan branch 7 times, most recently from 20e1885 to 9a6c1f5 Compare March 18, 2024 20:18
collector.go Outdated Show resolved Hide resolved
taskgroup.go Show resolved Hide resolved
taskgroup.go Outdated Show resolved Hide resolved
taskgroup.go Show resolved Hide resolved
@creachadair creachadair force-pushed the mjf/nochan branch 3 times, most recently from aad792e to a3af8db Compare March 19, 2024 00:42
Roughly compare the performance of accumulating values with a separate
goroutine via a channel, vs. accumulating them directly under a lock.
@creachadair creachadair force-pushed the mjf/nochan branch 2 times, most recently from fd420e2 to 82c12d2 Compare March 19, 2024 00:52
creachadair and others added 2 commits March 18, 2024 17:55
Instead of maintaining a separate goroutine to synchronize delivery of values,
rework the collector to use a plain sync.Mutex.

This:

- Greatly simplifies the code (with one exception, noted below).

- Eliminates the need for a separate goroutine to service values. Each task now
  handles its own service, mediated by the collector. That, in turn:

- Eliminates the need to Wait for the Collector: Once all the goroutines
  running tasks in the collector have exited, the state is fully settled.
  The Wait method is now a no-op, and is marked as deprecated.

In addition, add a new Report method, replacing Stream. Instead of a channel,
tasks using this method accepts a report function that sends values to the
collector.  The report function ensures control does not return to the task
until the reported value has been serviced, which allows tasks to ensure they
do not exit until all their values have been addressed.

The Stream method still works, but is deprecated. To preserve its interface,
each Stream call now spins up a new goroutine to service the values from its
task. This is wasteful, but easily replaced by switching to Report.

Co-Authored-By: David Anderson <dave@natulte.net>
Remove the separate goroutine collecting errors, and deliver them directly to
the error filter and the output field. Moreover simplify the setup and teardown
so that there is not so much coordinated state. Although performance was not a
primary consideration, benchmarking suggests this is actually faster than the
previous implementation, and uses less memory.

Also expand and clarify the documentation of the Wait method.

Co-Authored-By: David Anderson <dave@natulte.net>
@creachadair creachadair merged commit 0ed7876 into main Mar 19, 2024
1 check passed
@creachadair creachadair deleted the mjf/nochan branch March 19, 2024 01:00
README.md Show resolved Hide resolved
taskgroup.go Show resolved Hide resolved
// progress. Once all Wait calls have returned, the group is ready for reuse.
func (g *Group) Wait() error { g.cleanup(); return g.err }
// As with sync.WaitGroup, new tasks can be added to g during a call to Wait
// only if there was already at least one task active when Wait was called.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be stronger? Any new Go must occur while at least one task is continuously active until after Go has returned.

As stated in the comment, I believe it permits the sequence:

  • G1 calls g.Go, spawns task G2
  • G1 calls g.Wait (G2 is alive, contract is satisfied)
  • G3 calls g.Go, gets through the activation check, is about to call wg.Add
  • G2 completes and calls wg.Done concurrently with the prior Add
  • wg.Add observes a zero bounce and panics

The required ordering is quite annoying to describe cleanly :/

"new tasks can be added to g during a call to Wait only if the group continuously contains at least one active task at all times, starting before the call to Wait and until after the final concurrent call to Go returns" ? :/

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, I was focusing on the Wait'er too much.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about:

// As with sync.WaitGroup, new tasks can be added to g during a call to Wait
// only if the group contains at least one active task when Wait is called and
// continuously thereafter until the last concurrent call to g.Go returns.

?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants