-
Notifications
You must be signed in to change notification settings - Fork 287
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add multi instance query support logics #702
Changes from 2 commits
7a454d9
99caa9c
aa58a8d
7c92b5e
2127a00
3076aec
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -30,6 +30,7 @@ namespace DurableTask.AzureStorage | |||||
using DurableTask.Core; | ||||||
using DurableTask.Core.Exceptions; | ||||||
using DurableTask.Core.History; | ||||||
using DurableTask.Core.Query; | ||||||
using Microsoft.WindowsAzure.Storage; | ||||||
using Newtonsoft.Json; | ||||||
|
||||||
|
@@ -39,7 +40,8 @@ namespace DurableTask.AzureStorage | |||||
public sealed class AzureStorageOrchestrationService : | ||||||
IOrchestrationService, | ||||||
IOrchestrationServiceClient, | ||||||
IDisposable | ||||||
IDisposable, | ||||||
IOrchestrationServiceQueryClient | ||||||
{ | ||||||
static readonly HistoryEvent[] EmptyHistoryEventList = new HistoryEvent[0]; | ||||||
|
||||||
|
@@ -1890,6 +1892,47 @@ public void Dispose() | |||||
this.orchestrationSessionManager.Dispose(); | ||||||
} | ||||||
|
||||||
/// <summary> | ||||||
/// Gets the status of all orchestration instances with paging that match the specified conditions. | ||||||
/// </summary> | ||||||
public async Task<OrchestrationStatusQueryResult> GetOrchestrationStateWithFiltersAsync(OrchestrationStatusQueryCondition condition, CancellationToken cancellationToken) | ||||||
{ | ||||||
OrchestrationInstanceStatusQueryCondition convertedCondition = this.ToAzureStorageCondition(condition); | ||||||
var statusContext = await this.GetOrchestrationStateAsync(convertedCondition, condition.PageSize, condition.ContinuationToken, cancellationToken); | ||||||
return this.ConvertFrom(statusContext); | ||||||
} | ||||||
|
||||||
private OrchestrationInstanceStatusQueryCondition ToAzureStorageCondition(OrchestrationStatusQueryCondition condition) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's make this method
Suggested change
Doing so actually creates a slight performance improvement but more importantly helps the reader better understand the scope of the method. |
||||||
{ | ||||||
return new OrchestrationInstanceStatusQueryCondition | ||||||
{ | ||||||
RuntimeStatus = condition.RuntimeStatus?.Select( | ||||||
p => (OrchestrationStatus)Enum.Parse(typeof(OrchestrationStatus), p.ToString())), | ||||||
cgillum marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
CreatedTimeFrom = condition.CreatedTimeFrom ?? default(DateTime), | ||||||
CreatedTimeTo = condition.CreatedTimeTo ?? default(DateTime), | ||||||
TaskHubNames = condition.TaskHubNames, | ||||||
InstanceIdPrefix = condition.InstanceIdPrefix, | ||||||
FetchInput = condition.FetchInputsAndOutputs, | ||||||
}; | ||||||
} | ||||||
|
||||||
private OrchestrationStatusQueryResult ConvertFrom(DurableStatusQueryResult statusContext) | ||||||
{ | ||||||
var results = new List<DurableOrchestrationStatus>(); | ||||||
foreach (var state in statusContext.OrchestrationState) | ||||||
{ | ||||||
results.Add(QueryUtils.ConvertOrchestrationStateToStatus(state)); | ||||||
} | ||||||
|
||||||
var result = new OrchestrationStatusQueryResult | ||||||
{ | ||||||
DurableOrchestrationState = results, | ||||||
ContinuationToken = statusContext.ContinuationToken, | ||||||
}; | ||||||
|
||||||
cgillum marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
return result; | ||||||
} | ||||||
|
||||||
class PendingMessageBatch | ||||||
{ | ||||||
public string OrchestrationInstanceId { get; set; } | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
// ---------------------------------------------------------------------------------- | ||
// Copyright Microsoft Corporation | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
// ---------------------------------------------------------------------------------- | ||
|
||
using System; | ||
using Newtonsoft.Json.Linq; | ||
|
||
#nullable enable | ||
|
||
namespace DurableTask.Core.Query | ||
{ | ||
/// <summary> | ||
/// Represents the status of a durable orchestration instance. | ||
/// </summary> | ||
public class DurableOrchestrationStatus | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Was this also copied from the Durable Functions extension project? You should instead be using the existing DurableTask.Core.OrchestrationState class. |
||
{ | ||
/// <summary> | ||
/// Gets the name of the queried orchestrator function. | ||
/// </summary> | ||
/// <value> | ||
/// The orchestrator function name. | ||
/// </value> | ||
public string? Name { get; set; } | ||
|
||
/// <summary> | ||
/// Gets the ID of the queried orchestration instance. | ||
/// </summary> | ||
/// <remarks> | ||
/// The instance ID is generated and fixed when the orchestrator function is scheduled. It can be either | ||
/// auto-generated, in which case it is formatted as a GUID, or it can be user-specified with any format. | ||
/// </remarks> | ||
/// <value> | ||
/// The unique ID of the instance. | ||
/// </value> | ||
public string? InstanceId { get; set; } | ||
|
||
/// <summary> | ||
/// Gets the time at which the orchestration instance was created. | ||
/// </summary> | ||
/// <value> | ||
/// The instance creation time in UTC. | ||
/// </value> | ||
public DateTime? CreatedTime { get; set; } | ||
|
||
/// <summary> | ||
/// Gets the time at which the orchestration instance last updated its execution history. | ||
/// </summary> | ||
/// <value> | ||
/// The last-updated time in UTC. | ||
/// </value> | ||
public DateTime? LastUpdatedTime { get; set; } | ||
|
||
/// <summary> | ||
/// Gets the input of the orchestrator function instance. | ||
/// </summary> | ||
/// <value> | ||
/// The input as either a <c>JToken</c> or <c>null</c> if no input was provided. | ||
/// </value> | ||
public JToken? Input { get; set; } | ||
|
||
/// <summary> | ||
/// Gets the output of the queried orchestration instance. | ||
/// </summary> | ||
/// <value> | ||
/// The output as either a <c>JToken</c> object or <c>null</c> if it has not yet completed. | ||
/// </value> | ||
public JToken? Output { get; set; } | ||
|
||
/// <summary> | ||
/// Gets the runtime status of the queried orchestration instance. | ||
/// </summary> | ||
/// <value> | ||
/// Expected values include `Running`, `Pending`, `Failed`, `Canceled`, `Terminated`, `Completed`. | ||
/// </value> | ||
public OrchestrationRuntimeStatus RuntimeStatus { get; set; } | ||
|
||
/// <summary> | ||
/// Gets the custom status payload (if any) that was set by the orchestrator function. | ||
/// </summary> | ||
/// <value> | ||
/// The custom status as either a <c>JToken</c> object or <c>null</c> if no custom status has been set. | ||
/// </value> | ||
public JToken? CustomStatus { get; set; } | ||
|
||
/// <summary> | ||
/// Gets the execution history of the orchestration instance. | ||
/// </summary> | ||
/// <value> | ||
/// The output as a <c>JArray</c> object or <c>null</c>. | ||
/// </value> | ||
public JArray? History { get; set; } | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
// ---------------------------------------------------------------------------------- | ||
// Copyright Microsoft Corporation | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// ---------------------------------------------------------------------------------- | ||
// Copyright Microsoft Corporation | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
// ---------------------------------------------------------------------------------- | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like you double-copied the license text? |
||
|
||
namespace DurableTask.Core.Query | ||
{ | ||
using System.Threading.Tasks; | ||
using System.Threading; | ||
|
||
/// <summary> | ||
/// Interface to allow query multi-instance status with filter. | ||
/// </summary> | ||
public interface IOrchestrationServiceQueryClient | ||
{ | ||
/// <summary> | ||
/// Gets the status of all orchestration instances with paging that match the specified conditions. | ||
/// </summary> | ||
/// <param name="condition">Return orchestration instances that match the specified conditions.</param> | ||
/// <param name="cancellationToken">Cancellation token that can be used to cancel the query operation.</param> | ||
/// <returns>Returns each page of orchestration status for all instances and continuation token of next page.</returns> | ||
Task<OrchestrationStatusQueryResult> GetOrchestrationStateWithFiltersAsync(OrchestrationStatusQueryCondition condition, CancellationToken cancellationToken); | ||
cgillum marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
// ---------------------------------------------------------------------------------- | ||
// Copyright Microsoft Corporation | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
// ---------------------------------------------------------------------------------- | ||
|
||
namespace DurableTask.Core.Query | ||
{ | ||
// Must be kept consistent with DurableTask.Core.OrchestrationStatus: | ||
// https://github.com/Azure/durabletask/blob/master/src/DurableTask.Core/OrchestrationStatus.cs | ||
|
||
/// <summary> | ||
/// Represents the possible runtime execution status values for an orchestration instance. | ||
/// </summary> | ||
public enum OrchestrationRuntimeStatus | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this copied from the Durable Functions extension project? We should not be using this. We should be instead using the existing DurableTask.Core.OrchestrationStatus enum mentioned in the comment on line 17 since that already exists in this assembly. No need to create yet another copy of it. |
||
{ | ||
/// <summary> | ||
/// The status of the orchestration could not be determined. | ||
/// </summary> | ||
Unknown = -1, | ||
|
||
/// <summary> | ||
/// The orchestration is running (it may be actively running or waiting for input). | ||
/// </summary> | ||
Running = 0, | ||
|
||
/// <summary> | ||
/// The orchestration ran to completion. | ||
/// </summary> | ||
Completed = 1, | ||
|
||
/// <summary> | ||
/// The orchestration completed with ContinueAsNew as is in the process of restarting. | ||
/// </summary> | ||
ContinuedAsNew = 2, | ||
|
||
/// <summary> | ||
/// The orchestration failed with an error. | ||
/// </summary> | ||
Failed = 3, | ||
|
||
/// <summary> | ||
/// The orchestration was canceled. | ||
/// </summary> | ||
Canceled = 4, | ||
|
||
/// <summary> | ||
/// The orchestration was terminated via an API call. | ||
/// </summary> | ||
Terminated = 5, | ||
|
||
/// <summary> | ||
/// The orchestration was scheduled but has not yet started. | ||
/// </summary> | ||
Pending = 6, | ||
} | ||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,75 @@ | ||||||
// ---------------------------------------------------------------------------------- | ||||||
// Copyright Microsoft Corporation | ||||||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
// you may not use this file except in compliance with the License. | ||||||
// You may obtain a copy of the License at | ||||||
// http://www.apache.org/licenses/LICENSE-2.0 | ||||||
// Unless required by applicable law or agreed to in writing, software | ||||||
// distributed under the License is distributed on an "AS IS" BASIS, | ||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
// See the License for the specific language governing permissions and | ||||||
// limitations under the License. | ||||||
// ---------------------------------------------------------------------------------- | ||||||
|
||||||
cgillum marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
using System; | ||||||
using System.Collections.Generic; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The coding convention in this repo is to put the using statements inside the |
||||||
|
||||||
#nullable enable | ||||||
|
||||||
namespace DurableTask.Core.Query | ||||||
{ | ||||||
/// <summary> | ||||||
/// Query condition for searching the status of orchestration instances. | ||||||
/// </summary> | ||||||
public class OrchestrationStatusQueryCondition | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's rename this to |
||||||
{ | ||||||
/// <summary> | ||||||
/// Initializes a new instance of the <see cref="OrchestrationStatusQueryCondition"/> class. | ||||||
/// </summary> | ||||||
public OrchestrationStatusQueryCondition() { } | ||||||
|
||||||
/// <summary> | ||||||
/// Return orchestration instances which matches the runtimeStatus. | ||||||
/// </summary> | ||||||
public ICollection<OrchestrationRuntimeStatus>? RuntimeStatus { get; set; } | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should be using
Suggested change
|
||||||
|
||||||
/// <summary> | ||||||
/// Return orchestration instances which were created after this DateTime. | ||||||
/// </summary> | ||||||
public DateTime? CreatedTimeFrom { get; set; } | ||||||
|
||||||
/// <summary> | ||||||
/// Return orchestration instances which were created before this DateTime. | ||||||
/// </summary> | ||||||
public DateTime? CreatedTimeTo { get; set; } | ||||||
|
||||||
/// <summary> | ||||||
/// Return orchestration instances which matches the TaskHubNames. | ||||||
/// </summary> | ||||||
public ICollection<string>? TaskHubNames { get; set; } | ||||||
|
||||||
/// <summary> | ||||||
/// Maximum number of records that can be returned by the request. The default value is 100. | ||||||
/// </summary> | ||||||
/// <remarks> | ||||||
/// Requests may return fewer records than the specified page size, even if there are more records. | ||||||
/// Always check the continuation token to determine whether there are more records. | ||||||
/// </remarks> | ||||||
public int PageSize { get; set; } = 100; | ||||||
|
||||||
/// <summary> | ||||||
/// ContinuationToken of the pager. | ||||||
/// </summary> | ||||||
public string? ContinuationToken { get; set; } | ||||||
|
||||||
/// <summary> | ||||||
/// Return orchestration instances that have this instance id prefix. | ||||||
/// </summary> | ||||||
public string? InstanceIdPrefix { get; set; } | ||||||
|
||||||
/// <summary> | ||||||
/// Determines whether the query will include the input of the orchestration. | ||||||
/// </summary> | ||||||
public bool FetchInputsAndOutputs { get; set; } = true; | ||||||
} | ||||||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,35 @@ | ||||||
// ---------------------------------------------------------------------------------- | ||||||
// Copyright Microsoft Corporation | ||||||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
// you may not use this file except in compliance with the License. | ||||||
// You may obtain a copy of the License at | ||||||
// http://www.apache.org/licenses/LICENSE-2.0 | ||||||
// Unless required by applicable law or agreed to in writing, software | ||||||
// distributed under the License is distributed on an "AS IS" BASIS, | ||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
// See the License for the specific language governing permissions and | ||||||
// limitations under the License. | ||||||
// ---------------------------------------------------------------------------------- | ||||||
|
||||||
using System.Collections.Generic; | ||||||
|
||||||
namespace DurableTask.Core.Query | ||||||
{ | ||||||
/// <summary> | ||||||
/// The status of all orchestration instances with paging for a given query. | ||||||
/// </summary> | ||||||
public class OrchestrationStatusQueryResult | ||||||
cgillum marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
{ | ||||||
/// <summary> | ||||||
/// Gets or sets a collection of statuses of orchestration instances matching the query description. | ||||||
/// </summary> | ||||||
/// <value>A collection of orchestration instance status values.</value> | ||||||
public IEnumerable<DurableOrchestrationStatus> DurableOrchestrationState { get; set; } | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should be using
Suggested change
|
||||||
|
||||||
/// <summary> | ||||||
/// Gets or sets a token that can be used to resume the query with data not already returned by this query. | ||||||
/// </summary> | ||||||
/// <value>A server-generated continuation token or <c>null</c> if there are no further continuations.</value> | ||||||
public string ContinuationToken { get; set; } | ||||||
} | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's avoid using
var
unless the type is already mentioned in the right-hand expression (which it's not in this case).