Skip to content

Commit

Permalink
Added RestartAsync API to rerun existing orchestrator instances (#1545)
Browse files Browse the repository at this point in the history
  • Loading branch information
amdeel committed Nov 25, 2020
1 parent 51066ec commit e600de9
Show file tree
Hide file tree
Showing 8 changed files with 408 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,19 @@ Task<string> IDurableOrchestrationClient.StartNewAsync<T>(string orchestratorFun
return ((IDurableOrchestrationClient)this).StartNewAsync<T>(orchestratorFunctionName, string.Empty, input);
}

async Task<string> IDurableOrchestrationClient.RestartAsync(string instanceId, bool restartWithNewInstanceId)
{
DurableOrchestrationStatus status = await ((IDurableOrchestrationClient)this).GetStatusAsync(instanceId, showHistory: false, showHistoryOutput: false, showInput: true);

if (status == null)
{
throw new ArgumentException($"An orchestrastion with the instanceId {instanceId} was not found.");
}

return restartWithNewInstanceId ? await ((IDurableOrchestrationClient)this).StartNewAsync(orchestratorFunctionName: status.Name, status.Input)
: await ((IDurableOrchestrationClient)this).StartNewAsync(orchestratorFunctionName: status.Name, instanceId: status.InstanceId, status.Input);
}

/// <inheritdoc/>
Task IDurableEntityClient.SignalEntityAsync<TEntityInterface>(string entityKey, Action<TEntityInterface> operation)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,5 +288,15 @@ public interface IDurableOrchestrationClient
/// <param name="cancellationToken">Cancellation token that can be used to cancel the status query operation.</param>
/// <returns>Returns each page of orchestration status for all instances and continuation token of next page.</returns>
Task<OrchestrationStatusQueryResult> ListInstancesAsync(OrchestrationStatusQueryCondition condition, CancellationToken cancellationToken);

/// <summary>
/// Restarts an existing orchestrator with the original input.
/// </summary>
/// <param name="instanceId">InstanceId of a previously run orchestrator to restart.</param>
/// <param name="restartWithNewInstanceId">Optional parameter that configures if restarting an orchestration will use a new instanceId or if it will
/// reuse the old instanceId. Defauls to <c>true</c>.</param>
/// <returns>A task that completes when the orchestration is started. The task contains the instance id of the started
/// orchestratation instance.</returns>
Task<string> RestartAsync(string instanceId, bool restartWithNewInstanceId = true);
}
}
111 changes: 89 additions & 22 deletions src/WebJobs.Extensions.DurableTask/HttpApiHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ internal class HttpApiHandler : IDisposable
private const string RaiseEventOperation = "raiseEvent";
private const string TerminateOperation = "terminate";
private const string RewindOperation = "rewind";
private const string RestartOperation = "restart";
private const string ShowHistoryParameter = "showHistory";
private const string ShowHistoryOutputParameter = "showHistoryOutput";
private const string ShowInputParameter = "showInput";
Expand All @@ -52,6 +53,9 @@ internal class HttpApiHandler : IDisposable
private const string ReturnInternalServerErrorOnFailure = "returnInternalServerErrorOnFailure";
private const string LastOperationTimeFrom = "lastOperationTimeFrom";
private const string LastOperationTimeTo = "lastOperationTimeTo";
private const string RestartWithNewInstanceId = "restartWithNewInstanceId";
private const string TimeoutParameter = "timeout";
private const string PollingInterval = "pollingInterval";

private const string EmptyEntityKeySymbol = "$";

Expand Down Expand Up @@ -110,7 +114,8 @@ public void Dispose()
httpManagementPayload.StatusQueryGetUri,
httpManagementPayload.SendEventPostUri,
httpManagementPayload.TerminatePostUri,
httpManagementPayload.PurgeHistoryDeleteUri);
httpManagementPayload.PurgeHistoryDeleteUri,
httpManagementPayload.RestartUri);
}

// /orchestrators/{functionName}/{instanceId?}
Expand Down Expand Up @@ -155,13 +160,16 @@ private static TemplateMatcher GetInstanceRaiseEventRoute()
internal HttpManagementPayload CreateHttpManagementPayload(
string instanceId,
string taskHub,
string connectionName)
string connectionName,
bool returnInternalServerErrorOnFailure = false,
bool restartWithNewInstanceId = true)
{
HttpManagementPayload httpManagementPayload = this.GetClientResponseLinks(
null,
instanceId,
taskHub,
connectionName);
connectionName,
returnInternalServerErrorOnFailure);
return httpManagementPayload;
}

Expand Down Expand Up @@ -239,7 +247,8 @@ private static TemplateMatcher GetInstanceRaiseEventRoute()
httpManagementPayload.StatusQueryGetUri,
httpManagementPayload.SendEventPostUri,
httpManagementPayload.TerminatePostUri,
httpManagementPayload.PurgeHistoryDeleteUri);
httpManagementPayload.PurgeHistoryDeleteUri,
httpManagementPayload.RestartUri);
}
}
}
Expand Down Expand Up @@ -353,7 +362,7 @@ public async Task<HttpResponseMessage> HandleRequestAsync(HttpRequestMessage req
return request.CreateResponse(HttpStatusCode.NotFound);
}
}
else
else if (request.Method == HttpMethod.Post)
{
if (string.Equals(operation, TerminateOperation, StringComparison.OrdinalIgnoreCase))
{
Expand All @@ -363,11 +372,15 @@ public async Task<HttpResponseMessage> HandleRequestAsync(HttpRequestMessage req
{
return await this.HandleRewindInstanceRequestAsync(request, instanceId);
}
else
else if (string.Equals(operation, RestartOperation, StringComparison.OrdinalIgnoreCase))
{
return request.CreateResponse(HttpStatusCode.NotFound);
return await this.HandleRestartInstanceRequestAsync(request, instanceId);
}
}
else
{
return request.CreateResponse(HttpStatusCode.NotFound);
}
}

if (InstanceRaiseEventRoute.TryMatch(path, routeValues))
Expand Down Expand Up @@ -692,6 +705,18 @@ private static bool TryGetIntQueryParameterValue(NameValueCollection queryString
return int.TryParse(value, out intValue);
}

private static bool TryGetTimeSpanQueryParameterValue(NameValueCollection queryStringNameValueCollection, string queryParameterName, out TimeSpan? timeSpanValue)
{
timeSpanValue = null;
string value = queryStringNameValueCollection[queryParameterName];
if (double.TryParse(value, out double doubleValue))
{
timeSpanValue = TimeSpan.FromSeconds(doubleValue);
}

return timeSpanValue != null;
}

private async Task<HttpResponseMessage> HandleTerminateInstanceRequestAsync(
HttpRequestMessage request,
string instanceId)
Expand Down Expand Up @@ -729,6 +754,8 @@ private static bool TryGetIntQueryParameterValue(NameValueCollection queryString
{
IDurableOrchestrationClient client = this.GetClient(request);

var queryNameValuePairs = request.GetQueryNameValuePairs();

object input = null;
if (request.Content != null && request.Content.Headers?.ContentLength != 0)
{
Expand All @@ -738,8 +765,8 @@ private static bool TryGetIntQueryParameterValue(NameValueCollection queryString

string id = await client.StartNewAsync(functionName, instanceId, input);

TimeSpan? timeout = GetTimeSpan(request, "timeout");
TimeSpan? pollingInterval = GetTimeSpan(request, "pollingInterval");
TryGetTimeSpanQueryParameterValue(queryNameValuePairs, TimeoutParameter, out TimeSpan? timeout);
TryGetTimeSpanQueryParameterValue(queryNameValuePairs, PollingInterval, out TimeSpan? pollingInterval);

// for durability providers that support poll-free waiting, we override the specified polling interval
if (client is DurableClient durableClient && durableClient.DurabilityProvider.SupportsPollFreeWait)
Expand All @@ -749,7 +776,7 @@ private static bool TryGetIntQueryParameterValue(NameValueCollection queryString

if (timeout.HasValue && pollingInterval.HasValue)
{
return await client.WaitForCompletionOrCreateCheckStatusResponseAsync(request, id, timeout.Value, pollingInterval.Value);
return await client.WaitForCompletionOrCreateCheckStatusResponseAsync(request, id, timeout, pollingInterval);
}
else
{
Expand All @@ -762,6 +789,54 @@ private static bool TryGetIntQueryParameterValue(NameValueCollection queryString
}
}

private async Task<HttpResponseMessage> HandleRestartInstanceRequestAsync(
HttpRequestMessage request,
string instanceId)
{
try
{
IDurableOrchestrationClient client = this.GetClient(request);

var queryNameValuePairs = request.GetQueryNameValuePairs();

string newInstanceId;
if (TryGetBooleanQueryParameterValue(queryNameValuePairs, RestartWithNewInstanceId, out bool restartWithNewInstanceId))
{
newInstanceId = await client.RestartAsync(instanceId, restartWithNewInstanceId);
}
else
{
newInstanceId = await client.RestartAsync(instanceId);
}

TryGetTimeSpanQueryParameterValue(queryNameValuePairs, TimeoutParameter, out TimeSpan? timeout);
TryGetTimeSpanQueryParameterValue(queryNameValuePairs, PollingInterval, out TimeSpan? pollingInterval);

// for durability providers that support poll-free waiting, we override the specified polling interval
if (client is DurableClient durableClient && durableClient.DurabilityProvider.SupportsPollFreeWait)
{
pollingInterval = timeout;
}

if (timeout.HasValue && pollingInterval.HasValue)
{
return await client.WaitForCompletionOrCreateCheckStatusResponseAsync(request, newInstanceId, timeout, pollingInterval);
}
else
{
return client.CreateCheckStatusResponse(request, newInstanceId);
}
}
catch (ArgumentException e)
{
return request.CreateErrorResponse(HttpStatusCode.BadRequest, "InstanceId does not match a valid orchestration instance.", e);
}
catch (JsonReaderException e)
{
return request.CreateErrorResponse(HttpStatusCode.BadRequest, "Invalid JSON content", e);
}
}

private async Task<HttpResponseMessage> HandleRewindInstanceRequestAsync(
HttpRequestMessage request,
string instanceId)
Expand Down Expand Up @@ -1017,6 +1092,7 @@ internal HttpCreationPayload GetInstanceCreationLinks()
TerminatePostUri = instancePrefix + "/" + TerminateOperation + "?reason={text}&" + querySuffix,
RewindPostUri = instancePrefix + "/" + RewindOperation + "?reason={text}&" + querySuffix,
PurgeHistoryDeleteUri = instancePrefix + "?" + querySuffix,
RestartUri = instancePrefix + "/" + RestartOperation + "?" + querySuffix,
};

if (returnInternalServerErrorOnFailure)
Expand All @@ -1033,7 +1109,8 @@ internal HttpCreationPayload GetInstanceCreationLinks()
string statusQueryGetUri,
string sendEventPostUri,
string terminatePostUri,
string purgeHistoryDeleteUri)
string purgeHistoryDeleteUri,
string restartUri)
{
HttpResponseMessage response = request.CreateResponse(
HttpStatusCode.Accepted,
Expand All @@ -1044,6 +1121,7 @@ internal HttpCreationPayload GetInstanceCreationLinks()
sendEventPostUri,
terminatePostUri,
purgeHistoryDeleteUri,
restartUri,
});

// Implement the async HTTP 202 pattern.
Expand Down Expand Up @@ -1104,16 +1182,5 @@ internal async Task StopLocalHttpServerAsync()
}
}
#endif

private static TimeSpan? GetTimeSpan(HttpRequestMessage request, string queryParameterName)
{
string queryParameterStringValue = request.GetQueryNameValuePairs()[queryParameterName];
if (string.IsNullOrEmpty(queryParameterStringValue))
{
return null;
}

return TimeSpan.FromSeconds(double.Parse(queryParameterStringValue));
}
}
}
9 changes: 9 additions & 0 deletions src/WebJobs.Extensions.DurableTask/HttpManagementPayload.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,14 @@ public class HttpManagementPayload
/// </value>
[JsonProperty("purgeHistoryDeleteUri")]
public string PurgeHistoryDeleteUri { get; internal set; }

/// <summary>
/// Gets the HTTP POST instance restart endpoint.
/// </summary>
/// <value>
/// The HTTP URL for restarting an orchestration instance.
/// </value>
[JsonProperty("restartUri")]
public string RestartUri { get; internal set; }
}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit e600de9

Please sign in to comment.