Skip to content

Commit

Permalink
add multi instance query support logics (#702)
Browse files Browse the repository at this point in the history
* add multi instance query support logics

* have return type as OrchestrationStatusQueryResult

* remove unnecessary files copied from durable extension

* made property in OrchestrationQueryResult read only

* fix parameter name - make ContinuationToken nullable - throw exception if orchestrationState is null

* make continuationToken null in constructor
  • Loading branch information
kaibocai committed Apr 13, 2022
1 parent dcc9ad7 commit 3641e3f
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 1 deletion.
38 changes: 37 additions & 1 deletion src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
Expand Up @@ -30,6 +30,7 @@ namespace DurableTask.AzureStorage
using DurableTask.Core;
using DurableTask.Core.Exceptions;
using DurableTask.Core.History;
using DurableTask.Core.Query;
using Microsoft.WindowsAzure.Storage;
using Newtonsoft.Json;

Expand All @@ -39,7 +40,8 @@ namespace DurableTask.AzureStorage
public sealed class AzureStorageOrchestrationService :
IOrchestrationService,
IOrchestrationServiceClient,
IDisposable
IDisposable,
IOrchestrationServiceQueryClient
{
static readonly HistoryEvent[] EmptyHistoryEventList = new HistoryEvent[0];

Expand Down Expand Up @@ -1890,6 +1892,40 @@ public void Dispose()
this.orchestrationSessionManager.Dispose();
}

/// <summary>
/// Gets the status of all orchestration instances with paging that match the specified conditions.
/// </summary>
public async Task<OrchestrationQueryResult> GetOrchestrationWithQueryAsync(OrchestrationQuery query, CancellationToken cancellationToken)
{
OrchestrationInstanceStatusQueryCondition convertedCondition = ToAzureStorageCondition(query);
DurableStatusQueryResult statusContext = await this.GetOrchestrationStateAsync(convertedCondition, query.PageSize, query.ContinuationToken, cancellationToken);
return ConvertFrom(statusContext);
}

private static OrchestrationInstanceStatusQueryCondition ToAzureStorageCondition(OrchestrationQuery condition)
{
return new OrchestrationInstanceStatusQueryCondition
{
RuntimeStatus = condition.RuntimeStatus,
CreatedTimeFrom = condition.CreatedTimeFrom ?? default(DateTime),
CreatedTimeTo = condition.CreatedTimeTo ?? default(DateTime),
TaskHubNames = condition.TaskHubNames,
InstanceIdPrefix = condition.InstanceIdPrefix,
FetchInput = condition.FetchInputsAndOutputs,
};
}

private static OrchestrationQueryResult ConvertFrom(DurableStatusQueryResult statusContext)
{
var results = new List<OrchestrationState>();
foreach (var state in statusContext.OrchestrationState)
{
results.Add(state);
}

return new OrchestrationQueryResult(results, statusContext.ContinuationToken);
}

class PendingMessageBatch
{
public string OrchestrationInstanceId { get; set; }
Expand Down
30 changes: 30 additions & 0 deletions src/DurableTask.Core/Query/IOrchestrationServiceQueryClient.cs
@@ -0,0 +1,30 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// ----------------------------------------------------------------------------------

namespace DurableTask.Core.Query
{
using System.Threading.Tasks;
using System.Threading;

/// <summary>
/// Interface to allow query multi-instance status with filter.
/// </summary>
public interface IOrchestrationServiceQueryClient
{
/// <summary>
/// Gets the status of all orchestration instances with paging that match the specified conditions.
/// </summary>
/// <param name="query">Return orchestration instances that match the specified query.</param>
/// <param name="cancellationToken">Cancellation token that can be used to cancel the query operation.</param>
/// <returns>Returns each page of orchestration status for all instances and continuation token of next page.</returns>
Task<OrchestrationQueryResult> GetOrchestrationWithQueryAsync(OrchestrationQuery query, CancellationToken cancellationToken);
}
}
73 changes: 73 additions & 0 deletions src/DurableTask.Core/Query/OrchestrationQuery.cs
@@ -0,0 +1,73 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------
#nullable enable
namespace DurableTask.Core.Query
{
using System;
using System.Collections.Generic;

/// <summary>
/// Query condition for searching the status of orchestration instances.
/// </summary>
public class OrchestrationQuery
{
/// <summary>
/// Initializes a new instance of the <see cref="OrchestrationQuery"/> class.
/// </summary>
public OrchestrationQuery() { }

/// <summary>
/// Return orchestration instances which matches the runtimeStatus.
/// </summary>
public ICollection<OrchestrationStatus>? RuntimeStatus { get; set; }

/// <summary>
/// Return orchestration instances which were created after this DateTime.
/// </summary>
public DateTime? CreatedTimeFrom { get; set; }

/// <summary>
/// Return orchestration instances which were created before this DateTime.
/// </summary>
public DateTime? CreatedTimeTo { get; set; }

/// <summary>
/// Return orchestration instances which matches the TaskHubNames.
/// </summary>
public ICollection<string>? TaskHubNames { get; set; }

/// <summary>
/// Maximum number of records that can be returned by the request. The default value is 100.
/// </summary>
/// <remarks>
/// Requests may return fewer records than the specified page size, even if there are more records.
/// Always check the continuation token to determine whether there are more records.
/// </remarks>
public int PageSize { get; set; } = 100;

/// <summary>
/// ContinuationToken of the pager.
/// </summary>
public string? ContinuationToken { get; set; }

/// <summary>
/// Return orchestration instances that have this instance id prefix.
/// </summary>
public string? InstanceIdPrefix { get; set; }

/// <summary>
/// Determines whether the query will include the input of the orchestration.
/// </summary>
public bool FetchInputsAndOutputs { get; set; } = true;
}
}
46 changes: 46 additions & 0 deletions src/DurableTask.Core/Query/OrchestrationQueryResult.cs
@@ -0,0 +1,46 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------
#nullable enable
namespace DurableTask.Core.Query
{
using System;
using System.Collections.Generic;

/// <summary>
/// The status of all orchestration instances with paging for a given query.
/// </summary>
public class OrchestrationQueryResult
{
/// <summary>
/// Constructor of OrchestrationQueryResult Class.
/// </summary>
/// <param name="orchestrationState">A collection of orchestration instance status values.</param>
/// <param name="continuationToken">A server-generated continuation token or <c>null</c> if there are no further continuations.</param>
public OrchestrationQueryResult(IReadOnlyCollection<OrchestrationState> orchestrationState, string? continuationToken)
{
this.OrchestrationState = orchestrationState ?? throw new ArgumentNullException(nameof(orchestrationState));
this.ContinuationToken = continuationToken;
}
/// <summary>
/// Gets a collection of statuses of orchestration instances matching the query description.
/// </summary>
/// <value>A collection of orchestration instance status values.</value>
public IReadOnlyCollection<OrchestrationState> OrchestrationState { get; }

/// <summary>
/// Gets token that can be used to resume the query with data not already returned by this query.
/// </summary>
/// <value>A server-generated continuation token or <c>null</c> if there are no further continuations.</value>
public string? ContinuationToken { get; }
}
}

0 comments on commit 3641e3f

Please sign in to comment.