Skip to content
This repository has been archived by the owner on Dec 18, 2018. It is now read-only.

Pawelka/channels client #141

Closed
wants to merge 7 commits into from
Closed

Pawelka/channels client #141

wants to merge 7 commits into from

Conversation

moozzyk
Copy link
Contributor

@moozzyk moozzyk commented Jan 20, 2017

No description provided.

"xmlDoc": true
"xmlDoc": true,
"compile": {
"include": [
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could create a separate common project to share this code but it felt too much for these 3 file (one of which we hope will make it to the channels)

using Microsoft.Extensions.Logging;

namespace Microsoft.AspNetCore.Sockets.Client
{
public class Connection : IPipelineConnection
public class Connection : IChannelConnection<Message>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have mixed feelings about Connection being IChannelConnection

  • exposing Message
  • stopping the connection is ugly and has loose ends (especially you need to drain the channel in order to make the Completion task complete)

I updated issue #51 but we need more to do more thinking here (or maybe it is fine at the low level?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related/rephrased - it is impossible to tell currently what state the client is in when it is disconnected but the channel is not drained

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been thinking about this a bit and I wonder if there's an upside in exposing the channel interfaces on the client side. The benefit in theory is the ability to share code between client and server but we aren't really doing that. We should make sure the design is right here before going too far with it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. We need to talk about this but this would be separate from this commit.

{
public struct Message : IDisposable
{
public Format MessageFormat { get; }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want the connection to be IChannel then we should add make using this struct more convenient (e.g. add Message(string data)/Message(byte[] data) ctors etc)

{
while (!cancellationToken.IsCancellationRequested)
Message message;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, we should dispose?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated everywhere we Read messages.

{
public class ConnectionTests
{

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

@moozzyk moozzyk force-pushed the pawelka/channels-client branch 2 times, most recently from c289461 to 4be02b7 Compare January 20, 2017 20:00
new Message(ReadableBuffer.Create(new byte[] { 1, 1, 2, 3, 5, 8 }).Preserve(), Format.Binary));

Assert.Equal(sendTcs.Task, await Task.WhenAny(Task.Delay(10000), sendTcs.Task));
Assert.Equal(new byte[] { 1, 1, 2, 3, 5, 8 }, sendTcs.Task.Result);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extract to var data or something

connection.Output.TryWrite(
new Message(ReadableBuffer.Create(new byte[] { 1, 1, 2, 3, 5, 8 }).Preserve(), Format.Binary));

Assert.Equal(sendTcs.Task, await Task.WhenAny(Task.Delay(10000), sendTcs.Task));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

10 seconds seems a bit much

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will fix. I set it to 10 secs when debugging.

@@ -23,7 +24,8 @@ public class LongPollingTransport : ITransport
private readonly CancellationTokenSource _senderCts = new CancellationTokenSource();
private readonly CancellationTokenSource _pollCts = new CancellationTokenSource();

private IPipelineConnection _pipeline;
private IChannelConnection<Message> _toFromConnection;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove new line

Assert.False(connection.Input.Completion.IsCompleted);
connection.Output.TryComplete();

var completionTask = Task.WhenAny(Task.Delay(1000), connection.Input.Completion);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems odd, if we need to read messages to complete the task then Task.Delay will always be returned?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Notice we don't await the task here but below, after we have drained the channel. Perhaps I should call it whenAny task and not the completionTask.

await buffer.FlushAsync();
var ms = new MemoryStream();
await response.Content.CopyToAsync(ms);
var message = new Message(ReadableBuffer.Create(ms.ToArray()).Preserve(), Format.Text);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this message be disposed as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. We are sending it and the receiver is responsible for disposing it. Otherwise we could dispose something they are using.

[Fact]
public async Task LongPollingTransportStopsPollAndSendLoopsWhenTransportDisposed()
{
var mockHttpHandler = new Mock<HttpMessageHandler>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be repeated. Can we move some of this set up into a helper method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no point in just moving this single line and the configuration varies (the returned response) across the tests so I decided to leave it as is.

// and stash it here because we know the callback will have finished before the end of the await.
var message = await _adapter.ReadMessageAsync(_stream, _binder, cancellationToken);
Message incomingMessage;
if (!_connection.Input.TryRead(out incomingMessage))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be in a while loop

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? No one else is reading so it should always succeed. If it fails (what would be the reason?) is there a guarantee that a subsequent call will succeed because if there isn't then we are in tight infinite loop that can't be completed (unless we also check the cancellation token inside the loop).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Multiple messages can come in. It's not about failure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. You basically want to save a call to await _connection.Input.WaitToReadAsync? Because even now multiple messages will work only we will have to call WaitToReadAsync for each message.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, it's the same thing we do on the server.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do.


// Start sending and polling
_poller = Poll(Utils.AppendPath(url, "poll"), _pollCts.Token);
_sender = SendMessages(Utils.AppendPath(url, "send"), _senderCts.Token);
_poller = Poll(Utils.AppendPath(url, "poll"), _pollCts.Token).ContinueWith(_ => _senderCts.Cancel());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to cancel the CTS? Who is it for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is to exit the sending loop. They run independently and if polling fails/completes we need to stop the loop that is waiting on the user's requests.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just complete the channel from the outside? Why the cancellation token?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that even if you TryComplete from outside the send loop will continue to run until it drains the channel. This means that until the channel is drained we will keep sending requests to the server even though the connection is already disconnected. Cancellation token prevents that.
(Note: the channel from application to transport is not currently drained - I think it should be done by the side writing to the channel i.e. HubConnection in case of hubs or the user himself when using Sockets but this is one of the issues I wanted to talk about)
I also think that we actually could use just one cancellation tokens for both send and poll loops.

}

public Task StartAsync(Uri url, IPipelineConnection pipeline)
public Task StartAsync(Uri url, IChannelConnection<Message> toFromConnection)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This naming is throwing me off. Can we name things similar to the server? Transport and Application?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It actually helped me (especially at the beginning) but can change it.

@moozzyk moozzyk force-pushed the pawelka/channels-client branch 3 times, most recently from 0c47a50 to 47f20f1 Compare January 25, 2017 06:07

connectionToTransport.Out.Complete();

Assert.Equal(longPollingTransport.Running, await Task.WhenAny(Task.Delay(1000), longPollingTransport.Running));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you going to update it to use the new helpers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this. I was thinking about creating a Testing.Common project that contains this kind of things since we would want to use it across different test projects in this repo.

@moozzyk
Copy link
Contributor Author

moozzyk commented Jan 26, 2017

🆙 📅

private Task _sender;
private Task _poller;
private readonly CancellationTokenSource _transportCts = new CancellationTokenSource();

public Task Running { get; private set; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Who is this task for? It's not part of the interface right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it isn't. It was there and I left it because it is useful for testing but it's a bit weird.

Copy link
Member

@davidfowl davidfowl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fine for now but I'd like to remove all uses of continue with and replace them with async await.

@moozzyk
Copy link
Contributor Author

moozzyk commented Jan 26, 2017

@davidfowl - This change was to convert from pipelines to channels but I agree we need to redesign the client a little bit and remove ContinueWith() usages.

@mikaelm12 mikaelm12 mentioned this pull request Jan 26, 2017
18 tasks
@moozzyk
Copy link
Contributor Author

moozzyk commented Jan 26, 2017

I updated #51 with follow up items from this PR.

@moozzyk
Copy link
Contributor Author

moozzyk commented Jan 26, 2017

Fixed in c997ea8

@moozzyk moozzyk closed this Jan 26, 2017
@moozzyk moozzyk deleted the pawelka/channels-client branch August 23, 2017 23:17
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants