Skip to content
This repository has been archived by the owner on Dec 18, 2018. It is now read-only.

Commit

Permalink
Add support for streaming arguments to clients using Channel (#2441)
Browse files Browse the repository at this point in the history
  • Loading branch information
DylanDmitri authored and analogrelay committed Aug 1, 2018
1 parent e6b5c1c commit 7670614
Show file tree
Hide file tree
Showing 41 changed files with 1,797 additions and 222 deletions.
2 changes: 1 addition & 1 deletion NuGet.config
Expand Up @@ -2,6 +2,6 @@
<configuration>
<packageSources>
<clear />
<!-- Restore sources should be defined in build/sources.props. -->
<!-- Restore sources should be defined in build/sources.props -->
</packageSources>
</configuration>
Expand Up @@ -59,5 +59,10 @@ public Type GetReturnType(string invocationId)
}
throw new InvalidOperationException("Unexpected binder call");
}

public Type GetStreamItemType(string streamId)
{
throw new NotImplementedException();
}
}
}
254 changes: 127 additions & 127 deletions clients/ts/FunctionalTests/package-lock.json

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions samples/ClientSample/Program.cs
Expand Up @@ -26,6 +26,8 @@ public static void Main(string[] args)

RawSample.Register(app);
HubSample.Register(app);
StreamingSample.Register(app);
UploadSample.Register(app);

app.Command("help", cmd =>
{
Expand Down
46 changes: 46 additions & 0 deletions samples/ClientSample/StreamingSample.cs
@@ -0,0 +1,46 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR.Client;
using Microsoft.Extensions.CommandLineUtils;

namespace ClientSample
{
internal class StreamingSample
{
internal static void Register(CommandLineApplication app)
{
app.Command("streaming", cmd =>
{
cmd.Description = "Tests a streaming connection to a hub";
var baseUrlArgument = cmd.Argument("<BASEURL>", "The URL to the Chat Hub to test");
cmd.OnExecute(() => ExecuteAsync(baseUrlArgument.Value));
});
}

public static async Task<int> ExecuteAsync(string baseUrl)
{
var connection = new HubConnectionBuilder()
.WithUrl(baseUrl)
.Build();

await connection.StartAsync();

var reader = await connection.StreamAsChannelAsync<int>("ChannelCounter", 10, 2000);

while (await reader.WaitToReadAsync())
{
while (reader.TryRead(out var item))
{
Console.WriteLine($"received: {item}");
}
}

return 0;
}
}
}
90 changes: 90 additions & 0 deletions samples/ClientSample/UploadSample.cs
@@ -0,0 +1,90 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR.Client;
using Microsoft.Extensions.CommandLineUtils;

namespace ClientSample
{
internal class UploadSample
{
internal static void Register(CommandLineApplication app)
{
app.Command("uploading", cmd =>
{
cmd.Description = "Tests a streaming invocation from client to hub";
var baseUrlArgument = cmd.Argument("<BASEURL>", "The URL to the Chat Hub to test");
cmd.OnExecute(() => ExecuteAsync(baseUrlArgument.Value));
});
}

public static async Task<int> ExecuteAsync(string baseUrl)
{
var connection = new HubConnectionBuilder()
.WithUrl(baseUrl)
.Build();
await connection.StartAsync();

await BasicInvoke(connection);
//await MultiParamInvoke(connection);
//await AdditionalArgs(connection);

return 0;
}

public static async Task BasicInvoke(HubConnection connection)
{
var channel = Channel.CreateUnbounded<string>();
var invokeTask = connection.InvokeAsync<string>("UploadWord", channel.Reader);

foreach (var c in "hello")
{
await channel.Writer.WriteAsync(c.ToString());
}
channel.Writer.TryComplete();

var result = await invokeTask;
Debug.WriteLine($"You message was: {result}");
}

private static async Task WriteStreamAsync<T>(IEnumerable<T> sequence, ChannelWriter<T> writer)
{
foreach (T element in sequence)
{
await writer.WriteAsync(element);
await Task.Delay(100);
}

writer.TryComplete();
}

public static async Task MultiParamInvoke(HubConnection connection)
{
var letters = Channel.CreateUnbounded<string>();
var numbers = Channel.CreateUnbounded<int>();

_ = WriteStreamAsync(new[] { "h", "i", "!" }, letters.Writer);
_ = WriteStreamAsync(new[] { 1, 2, 3, 4, 5 }, numbers.Writer);

var result = await connection.InvokeAsync<string>("DoubleStreamUpload", letters.Reader, numbers.Reader);

Debug.WriteLine(result);
}

public static async Task AdditionalArgs(HubConnection connection)
{
var channel = Channel.CreateUnbounded<char>();
_ = WriteStreamAsync<char>("main message".ToCharArray(), channel.Writer);

var result = await connection.InvokeAsync<string>("UploadWithSuffix", channel.Reader, " + wooh I'm a suffix");
Debug.WriteLine($"Your message was: {result}");
}
}
}

110 changes: 110 additions & 0 deletions samples/SignalRSamples/Hubs/UploadHub.cs
@@ -0,0 +1,110 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;

namespace SignalRSamples.Hubs
{
public class UploadHub : Hub
{
public async Task<string> DoubleStreamUpload(ChannelReader<string> letters, ChannelReader<int> numbers)
{
var total = await Sum(numbers);
var word = await UploadWord(letters);

return string.Format("You sent over <{0}> <{1}s>", total, word);
}

public async Task<int> Sum(ChannelReader<int> source)
{
var total = 0;
while (await source.WaitToReadAsync())
{
while (source.TryRead(out var item))
{
total += item;
}
}
return total;
}

public async Task LocalSum(ChannelReader<int> source)
{
var total = 0;
while (await source.WaitToReadAsync())
{
while (source.TryRead(out var item))
{
total += item;
}
}
Debug.WriteLine(String.Format("Complete, your total is <{0}>.", total));
}

public async Task<string> UploadWord(ChannelReader<string> source)
{
var sb = new StringBuilder();

// receiving a StreamCompleteMessage should cause this WaitToRead to return false
while (await source.WaitToReadAsync())
{
while (source.TryRead(out var item))
{
Debug.WriteLine($"received: {item}");
Console.WriteLine($"received: {item}");
sb.Append(item);
}
}

// method returns, somewhere else returns a CompletionMessage with any errors
return sb.ToString();
}

public async Task<string> UploadWithSuffix(ChannelReader<string> source, string suffix)
{
var sb = new StringBuilder();

while (await source.WaitToReadAsync())
{
while (source.TryRead(out var item))
{
await Task.Delay(50);
Debug.WriteLine($"received: {item}");
sb.Append(item);
}
}

sb.Append(suffix);

return sb.ToString();
}

public async Task<string> UploadFile(ChannelReader<byte[]> source, string filepath)
{
var result = Enumerable.Empty<byte>();
int chunk = 1;

while (await source.WaitToReadAsync())
{
while (source.TryRead(out var item))
{
Debug.WriteLine($"received chunk #{chunk++}");
result = result.Concat(item); // atrocious
await Task.Delay(50);
}
}

File.WriteAllBytes(filepath, result.ToArray());

Debug.WriteLine("returning status code");
return $"file written to '{filepath}'";
}
}
}
1 change: 1 addition & 0 deletions samples/SignalRSamples/Startup.cs
Expand Up @@ -60,6 +60,7 @@ public void Configure(IApplicationBuilder app, IHostingEnvironment env)
routes.MapHub<DynamicChat>("/dynamic");
routes.MapHub<Chat>("/default");
routes.MapHub<Streaming>("/streaming");
routes.MapHub<UploadHub>("/uploading");
routes.MapHub<HubTChat>("/hubT");
});

Expand Down
39 changes: 39 additions & 0 deletions src/Common/ReflectionHelper.cs
@@ -0,0 +1,39 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Channels;

namespace Microsoft.AspNetCore.SignalR
{
internal static class ReflectionHelper
{
public static bool IsStreamingType(Type type)
{
// IMPORTANT !!
// All valid types must be generic
// because HubConnectionContext gets the generic argument and uses it to determine the expected item type of the stream
// The long-term solution is making a (streaming type => expected item type) method.

if (!type.IsGenericType)
{
return false;
}

// walk up inheritance chain, until parent is either null or a ChannelReader<T>
// TODO #2594 - add Streams here, to make sending files easy
while (type != null)
{
if (type.GetGenericTypeDefinition() == typeof(ChannelReader<>))
{
return true;
}

type = type.BaseType;
}
return false;
}
}
}
32 changes: 32 additions & 0 deletions src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.Log.cs
Expand Up @@ -186,6 +186,18 @@ private static class Log
private static readonly Action<ILogger, Exception> _unableToAcquireConnectionLockForPing =
LoggerMessage.Define(LogLevel.Trace, new EventId(62, "UnableToAcquireConnectionLockForPing"), "Skipping ping because a send is already in progress.");

private static readonly Action<ILogger, string, Exception> _startingStream =
LoggerMessage.Define<string>(LogLevel.Trace, new EventId(63, "StartingStream"), "Initiating stream '{StreamId}'.");

private static readonly Action<ILogger, string, Exception> _sendingStreamItem =
LoggerMessage.Define<string>(LogLevel.Trace, new EventId(64, "StreamItemSent"), "Sending item for stream '{StreamId}'.");

private static readonly Action<ILogger, string, Exception> _cancelingStream =
LoggerMessage.Define<string>(LogLevel.Trace, new EventId(65, "CancelingStream"), "Stream '{StreamId}' has been canceled by client.");

private static readonly Action<ILogger, string, Exception> _completingStream =
LoggerMessage.Define<string>(LogLevel.Trace, new EventId(66, "CompletingStream"), "Sending completion message for stream '{StreamId}'.");

public static void PreparingNonBlockingInvocation(ILogger logger, string target, int count)
{
_preparingNonBlockingInvocation(logger, target, count, null);
Expand Down Expand Up @@ -496,6 +508,26 @@ public static void UnableToAcquireConnectionLockForPing(ILogger logger)
{
_unableToAcquireConnectionLockForPing(logger, null);
}

public static void StartingStream(ILogger logger, string streamId)
{
_startingStream(logger, streamId, null);
}

public static void SendingStreamItem(ILogger logger, string streamId)
{
_sendingStreamItem(logger, streamId, null);
}

public static void CancelingStream(ILogger logger, string streamId)
{
_cancelingStream(logger, streamId, null);
}

public static void CompletingStream(ILogger logger, string streamId)
{
_completingStream(logger, streamId, null);
}
}
}
}
Expand Down

0 comments on commit 7670614

Please sign in to comment.