Skip to content

Commit

Permalink
Add ValueTask support to PipeTo (#6025)
Browse files Browse the repository at this point in the history
* Add ValueTask support to PipeTo

* Update API Verify list

* Exclude NETFX from Akka.Tests linux build
  • Loading branch information
Arkatufus committed Jun 28, 2022
1 parent 6839192 commit 0c9677f
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 1 deletion.
Expand Up @@ -1387,9 +1387,13 @@ namespace Akka.Actor
public class static PipeToSupport
{
public static System.Threading.Tasks.Task PipeTo<T>(this System.Threading.Tasks.Task<T> taskToPipe, Akka.Actor.ICanTell recipient, Akka.Actor.IActorRef sender = null, System.Func<T, object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo<T>(this System.Threading.Tasks.ValueTask<T> taskToPipe, Akka.Actor.ICanTell recipient, Akka.Actor.IActorRef sender = null, System.Func<T, object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo<T>(this System.Threading.Tasks.Task<T> taskToPipe, Akka.Actor.ICanTell recipient, bool useConfigureAwait, Akka.Actor.IActorRef sender = null, System.Func<T, object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo<T>(this System.Threading.Tasks.ValueTask<T> taskToPipe, Akka.Actor.ICanTell recipient, bool useConfigureAwait, Akka.Actor.IActorRef sender = null, System.Func<T, object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo(this System.Threading.Tasks.Task taskToPipe, Akka.Actor.ICanTell recipient, Akka.Actor.IActorRef sender = null, System.Func<object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo(this System.Threading.Tasks.ValueTask taskToPipe, Akka.Actor.ICanTell recipient, Akka.Actor.IActorRef sender = null, System.Func<object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo(this System.Threading.Tasks.Task taskToPipe, Akka.Actor.ICanTell recipient, bool useConfigureAwait, Akka.Actor.IActorRef sender = null, System.Func<object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo(this System.Threading.Tasks.ValueTask taskToPipe, Akka.Actor.ICanTell recipient, bool useConfigureAwait, Akka.Actor.IActorRef sender = null, System.Func<object> success = null, System.Func<System.Exception, object> failure = null) { }
}
public sealed class PoisonPill : Akka.Actor.IAutoReceivedMessage, Akka.Actor.IPossiblyHarmful, Akka.Event.IDeadLetterSuppression
{
Expand Down
Expand Up @@ -1389,9 +1389,13 @@ namespace Akka.Actor
public class static PipeToSupport
{
public static System.Threading.Tasks.Task PipeTo<T>(this System.Threading.Tasks.Task<T> taskToPipe, Akka.Actor.ICanTell recipient, Akka.Actor.IActorRef sender = null, System.Func<T, object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo<T>(this System.Threading.Tasks.ValueTask<T> taskToPipe, Akka.Actor.ICanTell recipient, Akka.Actor.IActorRef sender = null, System.Func<T, object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo<T>(this System.Threading.Tasks.Task<T> taskToPipe, Akka.Actor.ICanTell recipient, bool useConfigureAwait, Akka.Actor.IActorRef sender = null, System.Func<T, object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo<T>(this System.Threading.Tasks.ValueTask<T> taskToPipe, Akka.Actor.ICanTell recipient, bool useConfigureAwait, Akka.Actor.IActorRef sender = null, System.Func<T, object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo(this System.Threading.Tasks.Task taskToPipe, Akka.Actor.ICanTell recipient, Akka.Actor.IActorRef sender = null, System.Func<object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo(this System.Threading.Tasks.ValueTask taskToPipe, Akka.Actor.ICanTell recipient, Akka.Actor.IActorRef sender = null, System.Func<object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo(this System.Threading.Tasks.Task taskToPipe, Akka.Actor.ICanTell recipient, bool useConfigureAwait, Akka.Actor.IActorRef sender = null, System.Func<object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo(this System.Threading.Tasks.ValueTask taskToPipe, Akka.Actor.ICanTell recipient, bool useConfigureAwait, Akka.Actor.IActorRef sender = null, System.Func<object> success = null, System.Func<System.Exception, object> failure = null) { }
}
public sealed class PoisonPill : Akka.Actor.IAutoReceivedMessage, Akka.Actor.IPossiblyHarmful, Akka.Event.IDeadLetterSuppression
{
Expand Down
Expand Up @@ -1387,9 +1387,13 @@ namespace Akka.Actor
public class static PipeToSupport
{
public static System.Threading.Tasks.Task PipeTo<T>(this System.Threading.Tasks.Task<T> taskToPipe, Akka.Actor.ICanTell recipient, Akka.Actor.IActorRef sender = null, System.Func<T, object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo<T>(this System.Threading.Tasks.ValueTask<T> taskToPipe, Akka.Actor.ICanTell recipient, Akka.Actor.IActorRef sender = null, System.Func<T, object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo<T>(this System.Threading.Tasks.Task<T> taskToPipe, Akka.Actor.ICanTell recipient, bool useConfigureAwait, Akka.Actor.IActorRef sender = null, System.Func<T, object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo<T>(this System.Threading.Tasks.ValueTask<T> taskToPipe, Akka.Actor.ICanTell recipient, bool useConfigureAwait, Akka.Actor.IActorRef sender = null, System.Func<T, object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo(this System.Threading.Tasks.Task taskToPipe, Akka.Actor.ICanTell recipient, Akka.Actor.IActorRef sender = null, System.Func<object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo(this System.Threading.Tasks.ValueTask taskToPipe, Akka.Actor.ICanTell recipient, Akka.Actor.IActorRef sender = null, System.Func<object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo(this System.Threading.Tasks.Task taskToPipe, Akka.Actor.ICanTell recipient, bool useConfigureAwait, Akka.Actor.IActorRef sender = null, System.Func<object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo(this System.Threading.Tasks.ValueTask taskToPipe, Akka.Actor.ICanTell recipient, bool useConfigureAwait, Akka.Actor.IActorRef sender = null, System.Func<object> success = null, System.Func<System.Exception, object> failure = null) { }
}
public sealed class PoisonPill : Akka.Actor.IAutoReceivedMessage, Akka.Actor.IPossiblyHarmful, Akka.Event.IDeadLetterSuppression
{
Expand Down
63 changes: 63 additions & 0 deletions src/core/Akka.Tests/Actor/PipeToSupportSpec.cs
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;
using Akka.Actor;
using Akka.Event;
using Akka.TestKit;
Expand All @@ -22,11 +23,18 @@ public class PipeToSupportSpec : AkkaSpec
private readonly Task<string> _task;
private readonly Task _taskWithoutResult;

private readonly ValueTask<string> _valueTask;
private readonly ValueTask _valueTaskWithoutResult;

public PipeToSupportSpec()
{
_taskCompletionSource = new TaskCompletionSource<string>();
_task = _taskCompletionSource.Task;
_taskWithoutResult = _taskCompletionSource.Task;

_valueTask = new ValueTask<string>(_taskCompletionSource.Task);
_valueTaskWithoutResult = new ValueTask(_taskCompletionSource.Task);

Sys.EventStream.Subscribe(TestActor, typeof(DeadLetter));
}

Expand All @@ -38,6 +46,14 @@ public async Task Should_immediately_PipeTo_completed_Task()
await ExpectMsgAsync("foo");
}

[Fact]
public async Task ValueTask_Should_immediately_PipeTo_completed_Task()
{
var task = new ValueTask<string>("foo");
task.PipeTo(TestActor);
await ExpectMsgAsync("foo");
}

[Fact]
public async Task Should_by_default_send_task_result_as_message()
{
Expand All @@ -46,6 +62,14 @@ public async Task Should_by_default_send_task_result_as_message()
await ExpectMsgAsync("Hello");
}

[Fact]
public async Task ValueTask_Should_by_default_send_task_result_as_message()
{
_valueTask.PipeTo(TestActor);
_taskCompletionSource.SetResult("Hello");
await ExpectMsgAsync("Hello");
}

[Fact]
public async Task Should_by_default_not_send_a_success_message_if_the_task_does_not_produce_a_result()
{
Expand All @@ -54,6 +78,14 @@ public async Task Should_by_default_not_send_a_success_message_if_the_task_does_
await ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
}

[Fact]
public async Task ValueTask_Should_by_default_not_send_a_success_message_if_the_task_does_not_produce_a_result()
{
_valueTaskWithoutResult.PipeTo(TestActor);
_taskCompletionSource.SetResult("Hello");
await ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
}

[Fact]
public async Task Should_by_default_send_task_exception_as_status_failure_message()
{
Expand All @@ -64,6 +96,16 @@ public async Task Should_by_default_send_task_exception_as_status_failure_messag
await ExpectMsgAsync<Status.Failure>(x => x.Cause.Message == "Boom");
}

[Fact]
public async Task ValueTask_Should_by_default_send_task_exception_as_status_failure_message()
{
_valueTask.PipeTo(TestActor);
_valueTaskWithoutResult.PipeTo(TestActor);
_taskCompletionSource.SetException(new Exception("Boom"));
await ExpectMsgAsync<Status.Failure>(x => x.Cause.Message == "Boom");
await ExpectMsgAsync<Status.Failure>(x => x.Cause.Message == "Boom");
}

[Fact]
public async Task Should_use_success_handling_to_transform_task_result()
{
Expand All @@ -75,6 +117,17 @@ public async Task Should_use_success_handling_to_transform_task_result()
pipeTo.Should().Contain("Hello World");
}

[Fact]
public async Task ValueTask_Should_use_success_handling_to_transform_task_result()
{
_valueTask.PipeTo(TestActor, success: x => "Hello " + x);
_valueTaskWithoutResult.PipeTo(TestActor, success: () => "Hello");
_taskCompletionSource.SetResult("World");
var pipeTo = await ReceiveNAsync(2, default).Cast<string>().ToListAsync();
pipeTo.Should().Contain("Hello");
pipeTo.Should().Contain("Hello World");
}

[Fact]
public async Task Should_use_failure_handling_to_transform_task_exception()
{
Expand All @@ -84,5 +137,15 @@ public async Task Should_use_failure_handling_to_transform_task_exception()
await ExpectMsgAsync("Such a failure...");
await ExpectMsgAsync("Such a failure...");
}

[Fact]
public async Task ValueTask_Should_use_failure_handling_to_transform_task_exception()
{
_valueTask.PipeTo(TestActor, failure: e => "Such a " + e.Message);
_valueTaskWithoutResult.PipeTo(TestActor, failure: e => "Such a " + e.Message);
_taskCompletionSource.SetException(new Exception("failure..."));
await ExpectMsgAsync("Such a failure...");
await ExpectMsgAsync("Such a failure...");
}
}
}
3 changes: 2 additions & 1 deletion src/core/Akka.Tests/Akka.Tests.csproj
Expand Up @@ -4,7 +4,8 @@

<PropertyGroup>
<AssemblyTitle>Akka.Tests</AssemblyTitle>
<TargetFrameworks>$(NetFrameworkTestVersion);$(NetTestVersion);$(NetCoreTestVersion)</TargetFrameworks>
<TargetFrameworks Condition=" '$(OS)' == 'Windows_NT' ">$(NetFrameworkTestVersion);$(NetTestVersion);$(NetCoreTestVersion)</TargetFrameworks>
<TargetFrameworks Condition=" '$(OS)' != 'Windows_NT' ">$(NetTestVersion);$(NetCoreTestVersion)</TargetFrameworks>
</PropertyGroup>

<ItemGroup>
Expand Down
69 changes: 69 additions & 0 deletions src/core/Akka/Actor/PipeToSupport.cs
Expand Up @@ -36,6 +36,14 @@ public static class PipeToSupport
Func<Exception, object> failure = null)
=> PipeTo(taskToPipe, recipient, false, sender, success, failure);

public static Task PipeTo<T>(
this ValueTask<T> taskToPipe,
ICanTell recipient,
IActorRef sender = null,
Func<T, object> success = null,
Func<Exception, object> failure = null)
=> PipeTo(taskToPipe, recipient, false, sender, success, failure);

/// <summary>
/// Pipes the output of a Task directly to the <paramref name="recipient"/>'s mailbox once
/// the task completes
Expand Down Expand Up @@ -74,6 +82,32 @@ public static class PipeToSupport
}
}

public static async Task PipeTo<T>(
this ValueTask<T> taskToPipe,
ICanTell recipient,
bool useConfigureAwait,
IActorRef sender = null,
Func<T, object> success = null,
Func<Exception, object> failure = null)
{
sender ??= ActorRefs.NoSender;

try
{
var result = await taskToPipe.ConfigureAwait(useConfigureAwait);

recipient.Tell(success != null
? success(result)
: result, sender);
}
catch (Exception ex)
{
recipient.Tell(failure != null
? failure(ex)
: new Status.Failure(ex), sender);
}
}

/// <summary>
/// Pipes the output of a Task directly to the <paramref name="recipient"/>'s mailbox once
/// the task completes. As this task has no result, only exceptions will be piped to the <paramref name="recipient"/>
Expand All @@ -92,6 +126,14 @@ public static class PipeToSupport
Func<Exception, object> failure = null)
=> PipeTo(taskToPipe, recipient, false, sender, success, failure);

public static Task PipeTo(
this ValueTask taskToPipe,
ICanTell recipient,
IActorRef sender = null,
Func<object> success = null,
Func<Exception, object> failure = null)
=> PipeTo(taskToPipe, recipient, false, sender, success, failure);

/// <summary>
/// Pipes the output of a Task directly to the <paramref name="recipient"/>'s mailbox once
/// the task completes. As this task has no result, only exceptions will be piped to the <paramref name="recipient"/>
Expand Down Expand Up @@ -129,6 +171,33 @@ public static class PipeToSupport
: new Status.Failure(ex), sender);
}
}

public static async Task PipeTo(
this ValueTask taskToPipe,
ICanTell recipient,
bool useConfigureAwait,
IActorRef sender = null,
Func<object> success = null,
Func<Exception, object> failure = null)
{
sender = sender ?? ActorRefs.NoSender;

try
{
await taskToPipe.ConfigureAwait(useConfigureAwait);

if (success != null)
{
recipient.Tell(success(), sender);
}
}
catch (Exception ex)
{
recipient.Tell(failure != null
? failure(ex)
: new Status.Failure(ex), sender);
}
}
}
}

0 comments on commit 0c9677f

Please sign in to comment.