Permalink
Browse files

1089: Exception inside PushStreamContent won't be caught by

ExceptionFilterAttribute

Adding overloads for async onStreamAvailable handlers. Aslo, getting rid
of the whole CompleteTaskOnCloseStream. The task is complete either if you
return from your sync OnStreamAvailable handler or when the task that you
return from you async OnStreamAvailable handler completes.

Also, note that adding the overloads is potentaillay a compile time
breaking change due to the way method group overload resolutions work.
http://stackoverflow.com/questions/2057146/compiler-ambiguous-invocation-error-anonymous-method-and-method-group-with-fun
  • Loading branch information...
1 parent 188a4f9 commit 262ec8b273e2c8b7a4ae4cc7d43ad8e3f9c36c64 raghuramn committed Aug 21, 2013
@@ -18,36 +18,58 @@ namespace System.Net.Http
/// </summary>
public class PushStreamContent : HttpContent
{
- private readonly Action<Stream, HttpContent, TransportContext> _onStreamAvailable;
+ private readonly Func<Stream, HttpContent, TransportContext, Task> _onStreamAvailable;
/// <summary>
/// Initializes a new instance of the <see cref="PushStreamContent"/> class. The
/// <paramref name="onStreamAvailable"/> action is called when an output stream
/// has become available allowing the action to write to it directly. When the
/// stream is closed, it will signal to the content that is has completed and the
/// HTTP request or response will be completed.
- /// </summary>
- /// <param name="onStreamAvailable">The action to call when an output stream
- /// is available. Close the stream to complete the HTTP request or response.</param>
+ /// <param name="onStreamAvailable">The action to call when an output stream is available.</param>
public PushStreamContent(Action<Stream, HttpContent, TransportContext> onStreamAvailable)
+ : this(Taskify(onStreamAvailable), (MediaTypeHeaderValue)null)
+ {
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="PushStreamContent"/> class.
+ /// </summary>
+ /// <param name="onStreamAvailable">The action to call when an output stream is available. The stream is automatically
+ /// closed when the return task is completed.</param>
+ public PushStreamContent(Func<Stream, HttpContent, TransportContext, Task> onStreamAvailable)
: this(onStreamAvailable, (MediaTypeHeaderValue)null)
{
}
/// <summary>
- /// Initializes a new instance of the <see cref="PushStreamContent"/> class
- /// with the given media type.
+ /// Initializes a new instance of the <see cref="PushStreamContent"/> class with the given media type.
/// </summary>
public PushStreamContent(Action<Stream, HttpContent, TransportContext> onStreamAvailable, string mediaType)
+ : this(Taskify(onStreamAvailable), new MediaTypeHeaderValue(mediaType))
+ {
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="PushStreamContent"/> class with the given media type.
+ /// </summary>
+ public PushStreamContent(Func<Stream, HttpContent, TransportContext, Task> onStreamAvailable, string mediaType)
: this(onStreamAvailable, new MediaTypeHeaderValue(mediaType))
{
}
/// <summary>
- /// Initializes a new instance of the <see cref="PushStreamContent"/> class
- /// with the given <see cref="MediaTypeHeaderValue"/>.
+ /// Initializes a new instance of the <see cref="PushStreamContent"/> class with the given <see cref="MediaTypeHeaderValue"/>.
/// </summary>
public PushStreamContent(Action<Stream, HttpContent, TransportContext> onStreamAvailable, MediaTypeHeaderValue mediaType)
+ : this(Taskify(onStreamAvailable), mediaType)
+ {
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="PushStreamContent"/> class with the given <see cref="MediaTypeHeaderValue"/>.
+ /// </summary>
+ public PushStreamContent(Func<Stream, HttpContent, TransportContext, Task> onStreamAvailable, MediaTypeHeaderValue mediaType)
{
if (onStreamAvailable == null)
{
@@ -58,6 +80,21 @@ public PushStreamContent(Action<Stream, HttpContent, TransportContext> onStreamA
Headers.ContentType = mediaType ?? MediaTypeConstants.ApplicationOctetStreamMediaType;
}
+ private static Func<Stream, HttpContent, TransportContext, Task> Taskify(
+ Action<Stream, HttpContent, TransportContext> onStreamAvailable)
+ {
+ if (onStreamAvailable == null)
+ {
+ throw Error.ArgumentNull("onStreamAvailable");
+ }
+
+ return (Stream stream, HttpContent content, TransportContext transportContext) =>
+ {
+ onStreamAvailable(stream, content, transportContext);
+ return TaskHelpers.Completed();
+ };
+ }
+
/// <summary>
/// When this method is called, it calls the action provided in the constructor with the output
/// stream to write to. Once the action has completed its work it closes the stream which will
@@ -67,20 +104,15 @@ public PushStreamContent(Action<Stream, HttpContent, TransportContext> onStreamA
/// <param name="context">The associated <see cref="TransportContext"/>.</param>
/// <returns>A <see cref="Task"/> instance that is asynchronously serializing the object's content.</returns>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", Justification = "Exception is passed as task result.")]
- protected override Task SerializeToStreamAsync(Stream stream, TransportContext context)
+ protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
{
TaskCompletionSource<bool> serializeToStreamTask = new TaskCompletionSource<bool>();
- try
- {
- Stream wrappedStream = new CompleteTaskOnCloseStream(stream, serializeToStreamTask);
- _onStreamAvailable(wrappedStream, this, context);
- }
- catch (Exception e)
- {
- serializeToStreamTask.TrySetException(e);
- }
- return serializeToStreamTask.Task;
+ Stream wrappedStream = new CompleteTaskOnCloseStream(stream, serializeToStreamTask);
+ await _onStreamAvailable(wrappedStream, this, context);
+
+ // wait for wrappedStream.Close/Dispose to get called.
+ await serializeToStreamTask.Task;
}
/// <summary>
@@ -14,30 +14,31 @@ public class PushStreamContentTest
[Fact]
public void Constructor_ThrowsOnNullAction()
{
- Assert.ThrowsArgumentNull(() => new PushStreamContent(null), "onStreamAvailable");
+ Action<Stream, HttpContent, TransportContext> action = null;
+ Assert.ThrowsArgumentNull(() => new PushStreamContent(action), "onStreamAvailable");
}
[Fact]
public void Constructor_SetsDefaultMediaType()
{
- MockStreamAction streamAction = new MockStreamAction();
- PushStreamContent content = new PushStreamContent(streamAction.Action);
+ Action<Stream, HttpContent, TransportContext> streamAction = new MockStreamAction().Action;
+ PushStreamContent content = new PushStreamContent(streamAction);
Assert.Equal(MediaTypeConstants.ApplicationOctetStreamMediaType, content.Headers.ContentType);
}
[Fact]
public void Constructor_SetsMediaTypeFromString()
{
- MockStreamAction streamAction = new MockStreamAction();
- PushStreamContent content = new PushStreamContent(streamAction.Action, "text/xml");
+ Action<Stream, HttpContent, TransportContext> streamAction = new MockStreamAction().Action;
+ PushStreamContent content = new PushStreamContent(streamAction, "text/xml");
Assert.Equal(MediaTypeConstants.TextXmlMediaType, content.Headers.ContentType);
}
[Fact]
public void Constructor_SetsMediaType()
{
- MockStreamAction streamAction = new MockStreamAction();
- PushStreamContent content = new PushStreamContent(streamAction.Action, MediaTypeConstants.TextXmlMediaType);
+ Action<Stream, HttpContent, TransportContext> streamAction = new MockStreamAction().Action;
+ PushStreamContent content = new PushStreamContent(streamAction, MediaTypeConstants.TextXmlMediaType);
Assert.Equal(MediaTypeConstants.TextXmlMediaType, content.Headers.ContentType);
}
@@ -47,7 +48,7 @@ public Task SerializeToStreamAsync_CallsAction()
// Arrange
MemoryStream outputStream = new MemoryStream();
MockStreamAction streamAction = new MockStreamAction(close: true);
- PushStreamContent content = new PushStreamContent(streamAction.Action);
+ PushStreamContent content = new PushStreamContent((Action<Stream, HttpContent, TransportContext>)streamAction.Action);
// Act
return content.CopyToAsync(outputStream).ContinueWith(
@@ -67,7 +68,7 @@ public Task SerializeToStreamAsync_CallsAction()
Assert.True(outputStream.CanRead);
#endif
- });
+ });
}
[Fact]
@@ -76,7 +77,7 @@ public Task SerializeToStreamAsync_CompletesTaskOnActionException()
// Arrange
MemoryStream outputStream = new MemoryStream();
MockStreamAction streamAction = new MockStreamAction(throwException: true);
- PushStreamContent content = new PushStreamContent(streamAction.Action);
+ PushStreamContent content = new PushStreamContent((Action<Stream, HttpContent, TransportContext>)streamAction.Action);
// Act
return content.CopyToAsync(outputStream).ContinueWith(
@@ -150,6 +151,33 @@ public void NonClosingDelegatingStream_Close_CompletesTaskButDoNotCloseInnerStre
Assert.True(serializeToStreamTask.Task.Result);
}
#endif
+ [Fact]
+ public async Task PushStreamContentWithAsyncOnStreamAvailableHandler_ExceptionsInOnStreamAvailable_AreCaught()
+ {
+ // Arrange
+ bool faulted = false;
+ Exception exception = new ApplicationException();
+ PushStreamContent content = new PushStreamContent(async (s, c, tc) =>
+ {
+ await Task.FromResult(42);
+ throw exception;
+ });
+ MemoryStream stream = new MemoryStream();
+
+ try
+ {
+ // Act
+ await content.CopyToAsync(stream);
+ }
+ catch (ApplicationException e)
+ {
+ Assert.Same(exception, e);
+ faulted = true;
+ }
+
+ // Assert
+ Assert.True(faulted);
+ }
private class MockStreamAction
{

0 comments on commit 262ec8b

Please sign in to comment.