Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Timeout Property to DurableHttpRequest #1547

Merged
merged 9 commits into from
Nov 23, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,21 @@ Task<DurableHttpResponse> IDurableOrchestrationContext.CallHttpAsync(HttpMethod

async Task<DurableHttpResponse> IDurableOrchestrationContext.CallHttpAsync(DurableHttpRequest req)
{
DurableHttpResponse durableHttpResponse = await this.ScheduleDurableHttpActivityAsync(req);
// calculate timeout expiration if DurableHttpRequest.Timeout is set
if (req.Timeout != null)
{
req.TimeoutExpirationDateTime = this.InnerContext.CurrentUtcDateTime + req.Timeout.Value;
}

DurableHttpResponse durableHttpResponse;
try
{
durableHttpResponse = await this.ScheduleDurableHttpActivityAsync(req);
}
catch (TimeoutException)
{
throw;
}
bachuv marked this conversation as resolved.
Show resolved Hide resolved

HttpStatusCode currStatusCode = durableHttpResponse.StatusCode;

Expand All @@ -266,7 +280,18 @@ async Task<DurableHttpResponse> IDurableOrchestrationContext.CallHttpAsync(Durab
}

this.IncrementActionsOrThrowException();
await this.InnerContext.CreateTimer(fireAt, CancellationToken.None);

if (req.Timeout == null)
{
await this.InnerContext.CreateTimer(fireAt, CancellationToken.None);
}
else
{
TimeSpan timeLeft = req.TimeoutExpirationDateTime - this.InnerContext.CurrentUtcDateTime;
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
bachuv marked this conversation as resolved.
Show resolved Hide resolved
cancellationTokenSource.CancelAfter(timeLeft);
await this.InnerContext.CreateTimer(fireAt, cancellationTokenSource.Token);
bachuv marked this conversation as resolved.
Show resolved Hide resolved
}

DurableHttpRequest durableAsyncHttpRequest = this.CreateLocationPollRequest(
req,
Expand Down
19 changes: 18 additions & 1 deletion src/WebJobs.Extensions.DurableTask/DurableHttpRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,23 @@ public class DurableHttpRequest
/// <param name="content">Content added to the body of the HTTP request.</param>
/// <param name="tokenSource">AAD authentication attached to the HTTP request.</param>
/// <param name="asynchronousPatternEnabled">Specifies whether the DurableHttpRequest should handle the asynchronous pattern.</param>
/// <param name="timeout">TimeSpan used for HTTP request timeout.</param>
public DurableHttpRequest(
HttpMethod method,
Uri uri,
IDictionary<string, StringValues> headers = null,
string content = null,
ITokenSource tokenSource = null,
bool asynchronousPatternEnabled = true)
bool asynchronousPatternEnabled = true,
TimeSpan? timeout = null)
{
this.Method = method;
this.Uri = uri;
this.Headers = HttpHeadersConverter.CreateCopy(headers);
this.Content = content;
this.TokenSource = tokenSource;
this.AsynchronousPatternEnabled = asynchronousPatternEnabled;
this.Timeout = timeout;
}

/// <summary>
Expand Down Expand Up @@ -82,6 +85,20 @@ public class DurableHttpRequest
[JsonProperty("asynchronousPatternEnabled")]
public bool AsynchronousPatternEnabled { get; }

/// <summary>
/// The total timeout for the original HTTP request and any
/// asynchronous polling.
/// </summary>
[JsonProperty("timeout")]
public TimeSpan? Timeout { get; }

/// <summary>
/// The timeout expiration DateTime used to calculate when
/// the timeout will expire.
/// </summary>
[JsonProperty("timeoutExpiration")]
internal DateTime TimeoutExpirationDateTime { get; set; }

private class HttpMethodConverter : JsonConverter
{
public override bool CanConvert(Type objectType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Core;
using Microsoft.Extensions.Primitives;
Expand Down Expand Up @@ -35,20 +36,59 @@ public override string Run(TaskContext context, string input)

public async override Task<string> RunAsync(TaskContext context, string rawInput)
{
HttpRequestMessage requestMessage = await this.ReconstructHttpRequestMessage(rawInput);
HttpResponseMessage response = await this.httpClient.SendAsync(requestMessage);
DurableHttpResponse durableHttpResponse = await DurableHttpResponse.CreateDurableHttpResponseWithHttpResponseMessage(response);
DurableHttpRequest durableHttpRequest = ReconstructDurableHttpRequest(rawInput);
HttpRequestMessage requestMessage = await this.ReconstructHttpRequestMessage(durableHttpRequest);

return JsonConvert.SerializeObject(durableHttpResponse);
CancellationTokenSource cts = new CancellationTokenSource();
bachuv marked this conversation as resolved.
Show resolved Hide resolved
if (durableHttpRequest.Timeout != null)
{
try
{
TimeSpan timeout = durableHttpRequest.TimeoutExpirationDateTime - DateTime.UtcNow;
this.httpClient.Timeout = timeout;
bachuv marked this conversation as resolved.
Show resolved Hide resolved
cts.CancelAfter(timeout);
}
catch (Exception)
bachuv marked this conversation as resolved.
Show resolved Hide resolved
{
TimeSpan timeout = durableHttpRequest.Timeout.Value;
this.httpClient.Timeout = timeout;
cts.CancelAfter(timeout);
}
bachuv marked this conversation as resolved.
Show resolved Hide resolved
}

try
{
HttpResponseMessage response;
if (durableHttpRequest.Timeout == null)
{
response = await this.httpClient.SendAsync(requestMessage);
}
else
{
response = await this.httpClient.SendAsync(requestMessage, cts.Token);
}

DurableHttpResponse durableHttpResponse = await DurableHttpResponse.CreateDurableHttpResponseWithHttpResponseMessage(response);

return JsonConvert.SerializeObject(durableHttpResponse);
}
catch (OperationCanceledException ex) // when (cts.Token.IsCancellationRequested)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make this more accurate, it could be catch (OperationCanceledException ex) when (cts.Token.IsCancellationRequested) (added the commented out part). This way we know that the exception came when a cancel request was sent to CancellationTokenSource. The only issue with this is adding tests that mirror the action of cancelling a CancellationTokenSource. The Mock<HttpMessageHandler> Setup call takes ItExpr.IsAny<CancellationToken>() and the test would need the CancellationTokenSource to cancel the task, not the CancellationToken.

I saw that that ItExpr.IsAny() could evaluate conditions, but didn't see anything about sending in another object to manipulate. Let me know if there's a way to pass in a CancellationTokenSource using Moq.

This is how the HttpMessageHandler is mocked right now:

var handlerMock = new Mock<HttpMessageHandler>(MockBehavior.Strict);
handlerMock 
  .Protected()
  .Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
  .ReturnsAsync(httpResponseMessage);

{
throw new TimeoutException(ex.Message + $"Reached user specified timeout: {durableHttpRequest.Timeout.Value}.");
}
}

private async Task<HttpRequestMessage> ReconstructHttpRequestMessage(string serializedRequest)
private static DurableHttpRequest ReconstructDurableHttpRequest(string serializedRequest)
bachuv marked this conversation as resolved.
Show resolved Hide resolved
{
// DeserializeObject deserializes into a List and then the first element
// of that list is the DurableHttpRequest
IList<DurableHttpRequest> input = JsonConvert.DeserializeObject<IList<DurableHttpRequest>>(serializedRequest);
DurableHttpRequest durableHttpRequest = input.First();
return durableHttpRequest;
}

private async Task<HttpRequestMessage> ReconstructHttpRequestMessage(DurableHttpRequest durableHttpRequest)
bachuv marked this conversation as resolved.
Show resolved Hide resolved
{
string contentType = "";
HttpRequestMessage requestMessage = new HttpRequestMessage(durableHttpRequest.Method, durableHttpRequest.Uri);
if (durableHttpRequest.Headers != null)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

133 changes: 133 additions & 0 deletions test/Common/DurableHttpTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
Expand Down Expand Up @@ -310,6 +311,86 @@ public async Task DurableHttpAsync_SynchronousAPI_Returns200(string storageProvi
}
}

/// <summary>
/// End-to-end test which checks if the CallHttpAsync Orchestrator returns an OK (200) status code
/// when a DurableHttpRequest timeout value is set.
/// </summary>
[Theory]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
[MemberData(nameof(TestDataGenerator.GetFullFeaturedStorageProviderOptions), MemberType = typeof(TestDataGenerator))]
public async Task DurableHttpAsync_Synchronous_Timeout(string storageProvider)
{
HttpResponseMessage testHttpResponseMessage = CreateTestHttpResponseMessage(HttpStatusCode.OK);
HttpMessageHandler httpMessageHandler = MockSynchronousHttpMessageHandlerWithTimeout(testHttpResponseMessage, TimeSpan.FromMilliseconds(10000));

using (ITestHost host = TestHelpers.GetJobHost(
this.loggerProvider,
nameof(this.DurableHttpAsync_Synchronous_Timeout),
enableExtendedSessions: false,
storageProviderType: storageProvider,
durableHttpMessageHandler: new DurableHttpMessageHandlerFactory(httpMessageHandler)))
{
await host.StartAsync();

Dictionary<string, string> headers = new Dictionary<string, string>();
headers.Add("Accept", "application/json");
TestDurableHttpRequest testRequest = new TestDurableHttpRequest(
httpMethod: HttpMethod.Get,
headers: headers,
timeout: TimeSpan.FromMilliseconds(5000));

string functionName = nameof(TestOrchestrations.CallHttpAsyncOrchestrator);
var client = await host.StartOrchestratorAsync(functionName, testRequest, this.output);
var status = await client.WaitForCompletionAsync(this.output, timeout: TimeSpan.FromSeconds(400));

var output = status?.Output;
DurableHttpResponse response = output.ToObject<DurableHttpResponse>();
Assert.Equal(HttpStatusCode.OK, response.StatusCode);

await host.StopAsync();
}
}

/// <summary>
/// End-to-end test which checks if the CallHttpAsync Orchestrator fails when the
/// HTTP request times out and the CallHttpAsync API throws a TimeoutException.
/// </summary>
[Theory]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
[MemberData(nameof(TestDataGenerator.GetFullFeaturedStorageProviderOptions), MemberType = typeof(TestDataGenerator))]
public async Task DurableHttpAsync_Synchronous_TimeoutException(string storageProvider)
{
HttpResponseMessage testHttpResponseMessage = CreateTestHttpResponseMessage(HttpStatusCode.OK);
HttpMessageHandler httpMessageHandler = MockSynchronousHttpMessageHandlerWithTimeoutException();

using (ITestHost host = TestHelpers.GetJobHost(
this.loggerProvider,
nameof(this.DurableHttpAsync_Synchronous_TimeoutException),
enableExtendedSessions: false,
storageProviderType: storageProvider,
durableHttpMessageHandler: new DurableHttpMessageHandlerFactory(httpMessageHandler)))
{
await host.StartAsync();

Dictionary<string, string> headers = new Dictionary<string, string>();
headers.Add("Accept", "application/json");
TestDurableHttpRequest testRequest = new TestDurableHttpRequest(
httpMethod: HttpMethod.Get,
headers: headers,
timeout: TimeSpan.FromMilliseconds(5000));

string functionName = nameof(TestOrchestrations.CallHttpAsyncOrchestrator);
var client = await host.StartOrchestratorAsync(functionName, testRequest, this.output);
var status = await client.WaitForCompletionAsync(this.output, timeout: TimeSpan.FromSeconds(400));

var output = status?.Output;
Assert.Contains("System.TimeoutException: The operation was canceled.Reached user specified timeout", output.ToString());
Assert.Equal(OrchestrationRuntimeStatus.Failed, status?.RuntimeStatus);
bachuv marked this conversation as resolved.
Show resolved Hide resolved

await host.StopAsync();
}
}

/// <summary>
/// End-to-end test which checks if the UserAgent header is set in the HttpResponseMessage.
/// </summary>
Expand Down Expand Up @@ -1423,6 +1504,37 @@ private static HttpMessageHandler MockSynchronousHttpMessageHandler(HttpResponse
return handlerMock.Object;
}

private static HttpMessageHandler MockSynchronousHttpMessageHandlerWithTimeout(HttpResponseMessage httpResponseMessage, TimeSpan timeoutTimespan)
{
var handlerMock = new Mock<HttpMessageHandler>(MockBehavior.Strict);
handlerMock
.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
.Returns(async () =>
{
await Task.Delay(timeoutTimespan);
return httpResponseMessage;
});

return handlerMock.Object;
}

private static HttpMessageHandler MockSynchronousHttpMessageHandlerWithTimeoutException()
{
HttpResponseMessage httpResponseMessage = CreateTestHttpResponseMessage(HttpStatusCode.OK);
CancellationTokenSource cts = new CancellationTokenSource();

httpResponseMessage.Content = new ExceptionThrowingContent(new OperationCanceledException());

var handlerMock = new Mock<HttpMessageHandler>(MockBehavior.Strict);
handlerMock
.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
.ReturnsAsync(httpResponseMessage);

return handlerMock.Object;
}

private static HttpMessageHandler MockHttpMessageHandlerCheckUserAgent()
{
HttpResponseMessage okHttpResponseMessage = CreateTestHttpResponseMessage(HttpStatusCode.OK);
Expand Down Expand Up @@ -1612,5 +1724,26 @@ public ManagedIdentityOptions GetOptions()
return this.options;
}
}

private class ExceptionThrowingContent : HttpContent
{
private readonly Exception exception;

public ExceptionThrowingContent(Exception exception)
{
this.exception = exception;
}

protected override Task SerializeToStreamAsync(Stream stream, TransportContext context)
{
return Task.FromException(this.exception);
}

protected override bool TryComputeLength(out long length)
{
length = 0L;
return false;
}
}
}
}
Loading