New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial implementation of System.Threading.Tasks.Channels #335

Merged
merged 4 commits into from Oct 17, 2015

Conversation

Projects
None yet
6 participants
@stephentoub
Member

stephentoub commented Oct 16, 2015

Prototype of a new System.Threading.Tasks.Channels library, which provides data structures for handing off data between producers and consumers. The included README.md provides an overview of the library and its exposed functionality.

An initial set of functional tests are included, with ~95% line and branch coverage.

Further work is desired to expose more operations over channels, provide additional built-in channel implementations, and optimize the existing channel and operator implementations further.

@stephentoub

This comment has been minimized.

Show comment
Hide comment
Member

stephentoub commented Oct 16, 2015

@StephenCleary

This comment has been minimized.

Show comment
Hide comment
@StephenCleary

StephenCleary Oct 16, 2015

I think this line in the README:

```WaitToRead```/```WriteAsync```

would be less confusing as:

```WaitToReadAsync```/```WaitToWriteAsync```

Also, I would really like to see a separate Deque collection proposal. .NET has long needed one, as illustrated by the SimpleQueue included here and other places. Ditto for IAsyncEnumerator - it feels as though that would be a type with wider application than Channels.

StephenCleary commented Oct 16, 2015

I think this line in the README:

```WaitToRead```/```WriteAsync```

would be less confusing as:

```WaitToReadAsync```/```WaitToWriteAsync```

Also, I would really like to see a separate Deque collection proposal. .NET has long needed one, as illustrated by the SimpleQueue included here and other places. Ditto for IAsyncEnumerator - it feels as though that would be a type with wider application than Channels.

@stephentoub

This comment has been minimized.

Show comment
Hide comment
@stephentoub

stephentoub Oct 16, 2015

Member

I think this line in the README

Yes, that's just a typo, I'll fix it.

Ditto for IAsyncEnumerator

Yes, I included it here simply as a convenient place to put it. If it were to be included in the framework, it would certainly be put somewhere more general, especially with language support depending on it.

as illustrated by the SimpleQueue included here and other places

Didn't quite understand this... SimpleQueue isn't a deque, it's just a regular queue without some of the (small) overheads of Queue<T>.

Member

stephentoub commented Oct 16, 2015

I think this line in the README

Yes, that's just a typo, I'll fix it.

Ditto for IAsyncEnumerator

Yes, I included it here simply as a convenient place to put it. If it were to be included in the framework, it would certainly be put somewhere more general, especially with language support depending on it.

as illustrated by the SimpleQueue included here and other places

Didn't quite understand this... SimpleQueue isn't a deque, it's just a regular queue without some of the (small) overheads of Queue<T>.

Initial implementation of System.Threading.Tasks.Channels
Prototype of a new System.Threading.Tasks.Channels library, which provides data structures for handing off data between producers and consumers.  The included README.md provides an overview of the library and its exposed functionality.

An initial set of functional tests are included, with ~95% line and branch coverage.

Further work is desired to expose more operations over channels, provide additional built-in channel implementations, and optimize the existing channel and operator implementations further.
@StephenCleary

This comment has been minimized.

Show comment
Hide comment
@StephenCleary

StephenCleary Oct 16, 2015

Didn't quite understand this... SimpleQueue isn't a deque, it's just a regular queue without some of the (small) overheads of Queue.

You're right, of course. Not sure why I was thinking it was a deque. :)

StephenCleary commented Oct 16, 2015

Didn't quite understand this... SimpleQueue isn't a deque, it's just a regular queue without some of the (small) overheads of Queue.

You're right, of course. Not sure why I was thinking it was a deque. :)

/// <summary>A task completed when the channel is done.</summary>
private readonly TaskCompletionSource<VoidResult> _completion = new TaskCompletionSource<VoidResult>(TaskCreationOptions.RunContinuationsAsynchronously);
/// <summary>The wrapped enumerator.</summary>
private readonly IEnumerator<T> _source;

This comment has been minimized.

@MrJul

MrJul Oct 16, 2015

This enumerator is never disposed. Shouldn't it be disposed when the channel is complete?

@MrJul

MrJul Oct 16, 2015

This enumerator is never disposed. Shouldn't it be disposed when the channel is complete?

This comment has been minimized.

@stephentoub

stephentoub Oct 16, 2015

Member

Thanks. Yup, it should. I'll fix that.

@stephentoub

stephentoub Oct 16, 2015

Member

Thanks. Yup, it should. I'll fix that.

@i3arnon

This comment has been minimized.

Show comment
Hide comment
@i3arnon

i3arnon Oct 17, 2015

Shouldn't IAsyncEnumerator (like IEnumerable) be covariant (i.e. out T)?

Shouldn't IAsyncEnumerator (like IEnumerable) be covariant (i.e. out T)?

This comment has been minimized.

Show comment
Hide comment
@stephentoub

stephentoub Oct 17, 2015

Owner

Fixed.

Owner

stephentoub replied Oct 17, 2015

Fixed.

stephentoub added a commit that referenced this pull request Oct 17, 2015

Merge pull request #335 from stephentoub/initial_channels
Initial implementation of System.Threading.Tasks.Channels

@stephentoub stephentoub merged commit a614ace into dotnet:master Oct 17, 2015

@stephentoub stephentoub deleted the stephentoub:initial_channels branch Oct 17, 2015

@letrec

This comment has been minimized.

Show comment
Hide comment
@letrec

letrec Oct 19, 2015

Support cancellation?

This comment has been minimized.

Show comment
Hide comment
@stephentoub

stephentoub Oct 19, 2015

Owner

I didn't have one here on purpose. Cancellation can be supported by passing the CancellationToken into the thing that creates the IAsyncEnumerator (if you look at the Channel.GetAsyncEnumerator extension, you'll see it takes a CancellationToken). My thinking is that it's very unlikely you'd want a difference CancellationToken for different pulls on the enumerator, and assuming there was language support eventually, the logical place to pass the token would be to the creation of the enumerable/enumerator, as the calls to MoveNextAsync would be hidden the compiler-generated machinery, e.g.

foreach (await int i in channel.GetAsyncEnumerator(token)) { ... }

This would also help to avoid operator explosion if we were to implement a set of LINQ operators over IAsyncEnumerable/IAsyncEnumerator, as you could take an approach like we have in PLINQ, exposing a WithCancellation operator that takes the token and flows it through the remainder of the query.

Owner

stephentoub replied Oct 19, 2015

I didn't have one here on purpose. Cancellation can be supported by passing the CancellationToken into the thing that creates the IAsyncEnumerator (if you look at the Channel.GetAsyncEnumerator extension, you'll see it takes a CancellationToken). My thinking is that it's very unlikely you'd want a difference CancellationToken for different pulls on the enumerator, and assuming there was language support eventually, the logical place to pass the token would be to the creation of the enumerable/enumerator, as the calls to MoveNextAsync would be hidden the compiler-generated machinery, e.g.

foreach (await int i in channel.GetAsyncEnumerator(token)) { ... }

This would also help to avoid operator explosion if we were to implement a set of LINQ operators over IAsyncEnumerable/IAsyncEnumerator, as you could take an approach like we have in PLINQ, exposing a WithCancellation operator that takes the token and flows it through the remainder of the query.

@letrec

This comment has been minimized.

Show comment
Hide comment
@letrec

letrec Oct 19, 2015

Is lack of .ConfigureAwait(false) intentional?

Is lack of .ConfigureAwait(false) intentional?

This comment has been minimized.

Show comment
Hide comment
@stephentoub

stephentoub Oct 19, 2015

Owner

Yes. This function is only used when t has already completed, and has completed due to some kind of failure. In that case the await will complete synchronously and will never create a continuation, such that ConfigureAwait isn't relevant.

Owner

stephentoub replied Oct 19, 2015

Yes. This function is only used when t has already completed, and has completed due to some kind of failure. In that case the await will complete synchronously and will never create a continuation, such that ConfigureAwait isn't relevant.

This comment has been minimized.

Show comment
Hide comment
@i3arnon

i3arnon Oct 19, 2015

@letrec It's already faulted or canceled. await will throw synchronously anyway...

i3arnon replied Oct 19, 2015

@letrec It's already faulted or canceled. await will throw synchronously anyway...

@letrec

This comment has been minimized.

Show comment
Hide comment
@letrec

letrec Oct 19, 2015

Judging by the implementation, a channel can be completed at most once and it's an error to complete it more than once. It would be nice to document this fact (and the exception) if it should be the case for all implementations.

Judging by the implementation, a channel can be completed at most once and it's an error to complete it more than once. It would be nice to document this fact (and the exception) if it should be the case for all implementations.

This comment has been minimized.

Show comment
Hide comment
@stephentoub

stephentoub Oct 19, 2015

Owner

Thanks. That was my intention, yes. Do you agree with the approach? Disagree? I'll update the docs to include the current approach/requirement.

Owner

stephentoub replied Oct 19, 2015

Thanks. That was my intention, yes. Do you agree with the approach? Disagree? I'll update the docs to include the current approach/requirement.

This comment has been minimized.

Show comment
Hide comment
@i3arnon

i3arnon Oct 19, 2015

@stephentoub If Complete may throw then a TryComplete would be useful. Especially if a channel can have concurrent producers.

i3arnon replied Oct 19, 2015

@stephentoub If Complete may throw then a TryComplete would be useful. Especially if a channel can have concurrent producers.

This comment has been minimized.

Show comment
Hide comment
@stephentoub

stephentoub Oct 19, 2015

Owner

Thanks. I'll think about it. Might make sense to change the interface to provide a TryComplete method that returns true/false and then have a Complete extension method that calls TryComplete and throws if it returns false.

Owner

stephentoub replied Oct 19, 2015

Thanks. I'll think about it. Might make sense to change the interface to provide a TryComplete method that returns true/false and then have a Complete extension method that calls TryComplete and throws if it returns false.

This comment has been minimized.

Show comment
Hide comment
@letrec

letrec Oct 19, 2015

It's hard to say without knowing the intended semantics of WriteAsync and WaitToWriteAsync. It seems like it's OK to attempt writing to an already completed channel, not sure why completion should be any different. I personally have found the Rx approach of silently ignoring all On* operations invoked on a completed sequence very convenient.

letrec replied Oct 19, 2015

It's hard to say without knowing the intended semantics of WriteAsync and WaitToWriteAsync. It seems like it's OK to attempt writing to an already completed channel, not sure why completion should be any different. I personally have found the Rx approach of silently ignoring all On* operations invoked on a completed sequence very convenient.

This comment has been minimized.

Show comment
Hide comment
@stephentoub

stephentoub Oct 19, 2015

Owner

without knowing the intended semantics of WriteAsync and WaitToWriteAsync

TryWrite should return false if the write can't complete. Once such reason would be if the channel is completed.

WaitToWriteAsync returns a Task<bool> that should complete with a result of false if the channel completed and is no longer accepting written data.

WriteAsync returns a Task that should complete as either faulted or canceled if the channel completed is no longer acepting written data.

Since you control when the channel completes, it shouldn't be completed until no one else will be writing to it, and if someone else does write to it, they get a false result or an error that they can choose to respond to, either by ignoring it or by taking some special action. I think it'd be dangerous in general for the interfaces to make it easy to think that data was put into the channel when in reality it wasn't, though I understand for some situations there's a small convenience factor to that.

Owner

stephentoub replied Oct 19, 2015

without knowing the intended semantics of WriteAsync and WaitToWriteAsync

TryWrite should return false if the write can't complete. Once such reason would be if the channel is completed.

WaitToWriteAsync returns a Task<bool> that should complete with a result of false if the channel completed and is no longer accepting written data.

WriteAsync returns a Task that should complete as either faulted or canceled if the channel completed is no longer acepting written data.

Since you control when the channel completes, it shouldn't be completed until no one else will be writing to it, and if someone else does write to it, they get a false result or an error that they can choose to respond to, either by ignoring it or by taking some special action. I think it'd be dangerous in general for the interfaces to make it easy to think that data was put into the channel when in reality it wasn't, though I understand for some situations there's a small convenience factor to that.

This comment has been minimized.

Show comment
Hide comment
@letrec

letrec Oct 19, 2015

I do agree completely that the caller should have a clear indication of the effect of the call.
Should WaitToWriteAsync return true if the channel hasn't been completed yet and one can call WriteAsync? It is going to be hard to use correctly, if true.
Would coalescing the two into a single method public Task<bool> WriteAsync(T item, CancellationToken cancellationToken = default(CancellationToken)) make it safer?

letrec replied Oct 19, 2015

I do agree completely that the caller should have a clear indication of the effect of the call.
Should WaitToWriteAsync return true if the channel hasn't been completed yet and one can call WriteAsync? It is going to be hard to use correctly, if true.
Would coalescing the two into a single method public Task<bool> WriteAsync(T item, CancellationToken cancellationToken = default(CancellationToken)) make it safer?

This comment has been minimized.

Show comment
Hide comment
@stephentoub

stephentoub Oct 19, 2015

Owner

Should WaitToWriteAsync return true if the channel hasn't been completed yet and one can call WriteAsync?

It'll return a Task<bool> with a result of true when the caller can attempt to write... doesn't mean the write will be successful, but it means the channel is suggesting that something has changed such that the caller can try. For example, let's say there's a channel with a bounded capacity of 5 and there are already 5 items in it. If you call WaitToWriteAsync, you'll get back a Task<bool> that'll complete as true when someone reads/removes one of the items. But between the time that you see that true result and try to write, someone else may write to the channel and fill it.

Would coalescing the two into a single method public Task WriteAsync(T item, CancellationToken cancellationToken = default(CancellationToken)) make it safer?

You're suggesting changing WriteAsync to return a Task<bool> instead of a Task, right? I considered that, but it breaks the parallel with ReadAsync, which returns a Task<TResult>. ReadAsync could of course be updated to Task<KeyValuePair<bool, TResult>> or something like that, but that then starts to complicate the model in other ways. And I'm not convinced that the case of writing after completion (given that completion is optional and entirely the responsibility of the writer) is common enough to warrant such changes to the model, when the alternate is simply to catch an exception / check task.IsFaulted / etc. if you are actually concerned about writing after completion.

Owner

stephentoub replied Oct 19, 2015

Should WaitToWriteAsync return true if the channel hasn't been completed yet and one can call WriteAsync?

It'll return a Task<bool> with a result of true when the caller can attempt to write... doesn't mean the write will be successful, but it means the channel is suggesting that something has changed such that the caller can try. For example, let's say there's a channel with a bounded capacity of 5 and there are already 5 items in it. If you call WaitToWriteAsync, you'll get back a Task<bool> that'll complete as true when someone reads/removes one of the items. But between the time that you see that true result and try to write, someone else may write to the channel and fill it.

Would coalescing the two into a single method public Task WriteAsync(T item, CancellationToken cancellationToken = default(CancellationToken)) make it safer?

You're suggesting changing WriteAsync to return a Task<bool> instead of a Task, right? I considered that, but it breaks the parallel with ReadAsync, which returns a Task<TResult>. ReadAsync could of course be updated to Task<KeyValuePair<bool, TResult>> or something like that, but that then starts to complicate the model in other ways. And I'm not convinced that the case of writing after completion (given that completion is optional and entirely the responsibility of the writer) is common enough to warrant such changes to the model, when the alternate is simply to catch an exception / check task.IsFaulted / etc. if you are actually concerned about writing after completion.

This comment has been minimized.

Show comment
Hide comment
@stephentoub

stephentoub Oct 19, 2015

Owner

(All that said, I'm certainly open to changing it if my assumptions prove incorrect. I'd like to leave it as-is for now, but we can always change it later. Please keep the feedback coming 😄)

Owner

stephentoub replied Oct 19, 2015

(All that said, I'm certainly open to changing it if my assumptions prove incorrect. I'd like to leave it as-is for now, but we can always change it later. Please keep the feedback coming 😄)

@IanYates

This comment has been minimized.

Show comment
Hide comment
@IanYates

IanYates Oct 20, 2015

Just a quick note to say the link to the readme in the first post is now broken - it should be to https://github.com/stephentoub/corefxlab/tree/master/src/System.Threading.Tasks.Channels (and possibly having README.md tack on there if you wish) instead of https://github.com/stephentoub/corefxlab/blob/initial_channels/src/System.Threading.Tasks.Channels/README.md

IanYates commented Oct 20, 2015

Just a quick note to say the link to the readme in the first post is now broken - it should be to https://github.com/stephentoub/corefxlab/tree/master/src/System.Threading.Tasks.Channels (and possibly having README.md tack on there if you wish) instead of https://github.com/stephentoub/corefxlab/blob/initial_channels/src/System.Threading.Tasks.Channels/README.md

@stephentoub

This comment has been minimized.

Show comment
Hide comment
@stephentoub

stephentoub Oct 20, 2015

Member

Thanks, @IanYates. I've now fixed the link.

Member

stephentoub commented Oct 20, 2015

Thanks, @IanYates. I've now fixed the link.

@i3arnon

This comment has been minimized.

Show comment
Hide comment
@i3arnon

i3arnon Oct 21, 2015

Collaborator

I think a sentence or two on what's ValueTask<T> and what is it for would be nice as It's part of the public API but isn't a common concept (as far as I know).

Collaborator

i3arnon commented Oct 21, 2015

I think a sentence or two on what's ValueTask<T> and what is it for would be nice as It's part of the public API but isn't a common concept (as far as I know).

@i3arnon

This comment has been minimized.

Show comment
Hide comment
@i3arnon

i3arnon Oct 21, 2015

DeserializationChannel.TryRead (together with SerializationChannel.TryWrite) is marked as unsafe without an apparent reason (to me). Am I missing something?

DeserializationChannel.TryRead (together with SerializationChannel.TryWrite) is marked as unsafe without an apparent reason (to me). Am I missing something?

@stephentoub

This comment has been minimized.

Show comment
Hide comment
@stephentoub

stephentoub Oct 21, 2015

Member

I think a sentence or two on what's ValueTask

Sure, I can add that.

Am I missing something?

Probably not. I likely just neglected to remove the unsafe annotation after some experiments with different implementations. I'll take a look and remove them assuming it's possible.

Member

stephentoub commented Oct 21, 2015

I think a sentence or two on what's ValueTask

Sure, I can add that.

Am I missing something?

Probably not. I likely just neglected to remove the unsafe annotation after some experiments with different implementations. I'll take a look and remove them assuming it's possible.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment