Skip to content

Commit

Permalink
[Internal] Query: Adds ClientQL Compatibility Level support on client (
Browse files Browse the repository at this point in the history
…#4177)

* Added ability to accept the AllowOptimisticDirectExecution flag from the backend and use that flag to decide if the Ode pipeline should be used or not.

* Added comment and removed extra spacing

* Added test coverage

* Added exception handling logic

* Resolved comments

* Added null check for key parameter

* Removed changes to common test infra

* Removed all changes from QueryPartitionProviderTestInstance

* Remove changes pt2

* Removed the dictionary in QueryPartitionProvider and added a bool instead

* Updated GetClientDisableOptimisticDirectExecution()

* Fixed comments

* Revert QueryIterator.cs

* Undoing changes to settings.json

* Undoing changes to QueryIterator.cs

* Updated error message

* Made functions static

* Cast to bool instead of recasting in GetClientDisableOptimisticDirectExecution()

* Added clientQLCompatibilityLevel support on client

* Updated assert

* Updated client tests to use Data Contract serializer and deserializer

* Added support to obtain the distribution plan payload on the client

* Added binary support

* Removed CheckCompatibilityLevelFlagInQuerySpec()

* Renamed ParseElementsFromRestStream()

* Updated test response

* Switched to using binary json navigator

* Using local variable before type check

* Updated name to DistributionPlanSpec

* Added support to test multiple distribution plan samples

* Removed binary sample

* Updated binary test

* Added ignore flag

* Fixed merge conflicts 2.0

* Resolved comments

* Updated function name

* Removed an unnecessary assert

* Fixed comment

* Improved debug assert message

* Added checks to confirm that resourceType is document before getting distributionPlan

* Removed changes to resourceManagement.yml
  • Loading branch information
akotalwar committed Dec 14, 2023
1 parent 6258e24 commit ea269c9
Show file tree
Hide file tree
Showing 41 changed files with 5,327 additions and 94 deletions.
31 changes: 31 additions & 0 deletions Microsoft.Azure.Cosmos/src/Query/Core/DistributionPlanSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Query.Core
{
using System;

internal sealed class DistributionPlanSpec
{
public DistributionPlanSpec(string backendDistributionPlan, string clientDistributionPlan)
{
if (string.IsNullOrEmpty(backendDistributionPlan))
{
throw new ArgumentException("Backend distribution plan cannot be null or empty.");
}

if (string.IsNullOrEmpty(clientDistributionPlan))
{
throw new ArgumentException("Client distribution plan cannot be null or empty.");
}

this.BackendDistributionPlan = backendDistributionPlan;
this.ClientDistributionPlan = clientDistributionPlan;
}

public string BackendDistributionPlan { get; }

public string ClientDistributionPlan { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public override async ValueTask<bool> MoveNextAsync(ITrace trace)
activityId: default,
responseLengthInBytes: responseLengthBytes,
cosmosQueryExecutionInfo: default,
distributionPlanSpec: default,
disallowContinuationTokenMessage: default,
additionalHeaders: default,
state: default);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ public override async ValueTask<bool> MoveNextAsync(ITrace trace)
activityId: sourcePage.ActivityId,
responseLengthInBytes: sourcePage.ResponseLengthInBytes,
cosmosQueryExecutionInfo: sourcePage.CosmosQueryExecutionInfo,
distributionPlanSpec: default,
disallowContinuationTokenMessage: sourcePage.DisallowContinuationTokenMessage,
additionalHeaders: sourcePage.AdditionalHeaders,
state: queryState);
Expand All @@ -169,6 +170,7 @@ public override async ValueTask<bool> MoveNextAsync(ITrace trace)
activityId: default,
responseLengthInBytes: default,
cosmosQueryExecutionInfo: default,
distributionPlanSpec: default,
disallowContinuationTokenMessage: default,
additionalHeaders: default,
state: default);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ private static class Expressions
activityId: string.Empty,
responseLengthInBytes: 0,
cosmosQueryExecutionInfo: default,
distributionPlanSpec: default,
disallowContinuationTokenMessage: default,
additionalHeaders: default,
state: this.state));
Expand Down Expand Up @@ -153,6 +154,7 @@ private static class Expressions
activityId: string.IsNullOrEmpty(page.ActivityId) ? Guid.NewGuid().ToString() : page.ActivityId,
responseLengthInBytes: page.ResponseLengthInBytes,
cosmosQueryExecutionInfo: page.CosmosQueryExecutionInfo,
distributionPlanSpec: default,
disallowContinuationTokenMessage: page.DisallowContinuationTokenMessage,
additionalHeaders: page.AdditionalHeaders,
state: null));
Expand All @@ -173,6 +175,7 @@ private static class Expressions
activityId: page.ActivityId,
responseLengthInBytes: page.ResponseLengthInBytes,
cosmosQueryExecutionInfo: page.CosmosQueryExecutionInfo,
distributionPlanSpec: default,
disallowContinuationTokenMessage: page.DisallowContinuationTokenMessage,
additionalHeaders: page.AdditionalHeaders,
state: this.state));
Expand Down Expand Up @@ -235,6 +238,7 @@ private static class Expressions
activityId: string.IsNullOrEmpty(page.ActivityId) ? Guid.NewGuid().ToString() : page.ActivityId,
responseLengthInBytes: page.ResponseLengthInBytes,
cosmosQueryExecutionInfo: page.CosmosQueryExecutionInfo,
distributionPlanSpec: default,
disallowContinuationTokenMessage: page.DisallowContinuationTokenMessage,
additionalHeaders: page.AdditionalHeaders,
state: null));
Expand Down Expand Up @@ -278,6 +282,7 @@ private static class Expressions
activityId: page.ActivityId,
responseLengthInBytes: page.ResponseLengthInBytes,
cosmosQueryExecutionInfo: page.CosmosQueryExecutionInfo,
distributionPlanSpec: default,
disallowContinuationTokenMessage: page.DisallowContinuationTokenMessage,
additionalHeaders: page.AdditionalHeaders,
state: InitializingQueryState));
Expand Down Expand Up @@ -426,6 +431,7 @@ private ValueTask<bool> MoveNextAsync_DrainPageAsync(ITrace trace)
activityId: default,
responseLengthInBytes: 0,
cosmosQueryExecutionInfo: default,
distributionPlanSpec: default,
disallowContinuationTokenMessage: default,
additionalHeaders: currentEnumerator.Current.Result.Page.AdditionalHeaders,
state: this.state));
Expand Down Expand Up @@ -481,6 +487,7 @@ private ValueTask<bool> MoveNextAsync_DrainPageAsync(ITrace trace)
activityId: default,
responseLengthInBytes: 0,
cosmosQueryExecutionInfo: default,
distributionPlanSpec: default,
disallowContinuationTokenMessage: default,
additionalHeaders: currentEnumerator?.Current.Result.Page.AdditionalHeaders,
state: this.state));
Expand Down Expand Up @@ -543,6 +550,7 @@ public ValueTask<bool> MoveNextAsync(ITrace trace)
activityId: Guid.NewGuid().ToString(),
responseLengthInBytes: 0,
cosmosQueryExecutionInfo: default,
distributionPlanSpec: default,
disallowContinuationTokenMessage: default,
additionalHeaders: default,
state: null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)
backendQueryPage.ActivityId,
backendQueryPage.ResponseLengthInBytes,
backendQueryPage.CosmosQueryExecutionInfo,
distributionPlanSpec: default,
backendQueryPage.DisallowContinuationTokenMessage,
backendQueryPage.AdditionalHeaders,
queryState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public override async ValueTask<bool> MoveNextAsync(ITrace trace)
activityId: default,
responseLengthInBytes: responseLengthBytes,
cosmosQueryExecutionInfo: default,
distributionPlanSpec: default,
disallowContinuationTokenMessage: default,
additionalHeaders: additionalHeaders,
state: default);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ public override async ValueTask<bool> MoveNextAsync(ITrace trace)
activityId: sourcePage.ActivityId,
responseLengthInBytes: sourcePage.ResponseLengthInBytes,
cosmosQueryExecutionInfo: sourcePage.CosmosQueryExecutionInfo,
distributionPlanSpec: default,
disallowContinuationTokenMessage: sourcePage.DisallowContinuationTokenMessage,
additionalHeaders: sourcePage.AdditionalHeaders,
state: queryState);
Expand All @@ -151,6 +152,7 @@ public override async ValueTask<bool> MoveNextAsync(ITrace trace)
activityId: default,
responseLengthInBytes: default,
cosmosQueryExecutionInfo: default,
distributionPlanSpec: default,
disallowContinuationTokenMessage: default,
additionalHeaders: default,
state: default);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ public override async ValueTask<bool> MoveNextAsync(ITrace trace)
activityId: sourcePage.ActivityId,
responseLengthInBytes: sourcePage.ResponseLengthInBytes,
cosmosQueryExecutionInfo: sourcePage.CosmosQueryExecutionInfo,
distributionPlanSpec: default,
disallowContinuationTokenMessage: sourcePage.DisallowContinuationTokenMessage,
additionalHeaders: sourcePage.AdditionalHeaders,
state: state);
Expand All @@ -185,6 +186,7 @@ public override async ValueTask<bool> MoveNextAsync(ITrace trace)
activityId: sourcePage.ActivityId,
responseLengthInBytes: sourcePage.ResponseLengthInBytes,
cosmosQueryExecutionInfo: sourcePage.CosmosQueryExecutionInfo,
distributionPlanSpec: default,
disallowContinuationTokenMessage: ClientDistinctQueryPipelineStage.DisallowContinuationTokenMessage,
additionalHeaders: sourcePage.AdditionalHeaders,
state: null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public override async ValueTask<bool> MoveNextAsync(ITrace trace)
activityId: sourcePage.ActivityId,
responseLengthInBytes: sourcePage.ResponseLengthInBytes,
cosmosQueryExecutionInfo: sourcePage.CosmosQueryExecutionInfo,
distributionPlanSpec: default,
disallowContinuationTokenMessage: ComputeDistinctQueryPipelineStage.UseTryGetContinuationTokenMessage,
additionalHeaders: sourcePage.AdditionalHeaders,
state: queryState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public override async ValueTask<bool> MoveNextAsync(ITrace trace)
activityId: default,
responseLengthInBytes: responseLengthInBytes,
cosmosQueryExecutionInfo: default,
distributionPlanSpec: default,
disallowContinuationTokenMessage: ClientGroupByQueryPipelineStage.ContinuationTokenNotSupportedWithGroupBy,
additionalHeaders: addtionalHeaders,
state: default);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ public override async ValueTask<bool> MoveNextAsync(ITrace trace)
activityId: sourcePage.ActivityId,
responseLengthInBytes: sourcePage.ResponseLengthInBytes,
cosmosQueryExecutionInfo: sourcePage.CosmosQueryExecutionInfo,
distributionPlanSpec: default,
disallowContinuationTokenMessage: null,
additionalHeaders: sourcePage.AdditionalHeaders,
state: state);
Expand Down Expand Up @@ -172,6 +173,7 @@ public override async ValueTask<bool> MoveNextAsync(ITrace trace)
activityId: default,
responseLengthInBytes: default,
cosmosQueryExecutionInfo: default,
distributionPlanSpec: default,
disallowContinuationTokenMessage: default,
additionalHeaders: default,
state: state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ private OptimisticDirectExecutionQueryPipelineStage(TryCatch<IQueryPipelineStage
{
this.previousRequiresDistribution = false;
}

}

public delegate Task<TryCatch<IQueryPipelineStage>> FallbackQueryPipelineStageFactory(CosmosElement continuationToken);
Expand Down Expand Up @@ -165,6 +164,7 @@ private async Task<bool> SwitchToFallbackPipelineAsync(CosmosElement continuatio

private sealed class OptimisticDirectExecutionQueryPipelineImpl : IQueryPipelineStage
{
private const int ClientQLCompatibilityLevel = 1;
private readonly QueryPartitionRangePageAsyncEnumerator queryPartitionRangePageAsyncEnumerator;

private OptimisticDirectExecutionQueryPipelineImpl(
Expand Down Expand Up @@ -230,6 +230,7 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)
backendQueryPage.ActivityId,
backendQueryPage.ResponseLengthInBytes,
backendQueryPage.CosmosQueryExecutionInfo,
backendQueryPage.DistributionPlanSpec,
disallowContinuationTokenMessage: null,
backendQueryPage.AdditionalHeaders,
queryState);
Expand Down Expand Up @@ -268,10 +269,15 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)
return TryCatch<IQueryPipelineStage>.FromException(monadicExtractState.Exception);
}

SqlQuerySpec updatedSqlQuerySpec = new SqlQuerySpec(sqlQuerySpec.QueryText, sqlQuerySpec.Parameters)
{
ClientQLCompatibilityLevel = ClientQLCompatibilityLevel
};

FeedRangeState<QueryState> feedRangeState = monadicExtractState.Result;
QueryPartitionRangePageAsyncEnumerator partitionPageEnumerator = new QueryPartitionRangePageAsyncEnumerator(
documentContainer,
sqlQuerySpec,
updatedSqlQuerySpec,
feedRangeState,
partitionKey,
queryPaginationOptions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ internal sealed class QueryPage : Page<QueryState>
string activityId,
long responseLengthInBytes,
Lazy<CosmosQueryExecutionInfo> cosmosQueryExecutionInfo,
DistributionPlanSpec distributionPlanSpec,
string disallowContinuationTokenMessage,
IReadOnlyDictionary<string, string> additionalHeaders,
QueryState state)
Expand All @@ -35,6 +36,7 @@ internal sealed class QueryPage : Page<QueryState>
this.Documents = documents ?? throw new ArgumentNullException(nameof(documents));
this.ResponseLengthInBytes = responseLengthInBytes < 0 ? throw new ArgumentOutOfRangeException(nameof(responseLengthInBytes)) : responseLengthInBytes;
this.CosmosQueryExecutionInfo = cosmosQueryExecutionInfo;
this.DistributionPlanSpec = distributionPlanSpec;
this.DisallowContinuationTokenMessage = disallowContinuationTokenMessage;
}

Expand All @@ -44,6 +46,8 @@ internal sealed class QueryPage : Page<QueryState>

public Lazy<CosmosQueryExecutionInfo> CosmosQueryExecutionInfo { get; }

public DistributionPlanSpec DistributionPlanSpec { get; }

public string DisallowContinuationTokenMessage { get; }

protected override ImmutableHashSet<string> DerivedClassBannedHeaders => QueryPage.BannedHeaders;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public override async ValueTask<bool> MoveNextAsync(ITrace trace)
activityId: sourcePage.ActivityId,
responseLengthInBytes: sourcePage.ResponseLengthInBytes,
cosmosQueryExecutionInfo: sourcePage.CosmosQueryExecutionInfo,
distributionPlanSpec: default,
disallowContinuationTokenMessage: sourcePage.DisallowContinuationTokenMessage,
additionalHeaders: sourcePage.AdditionalHeaders,
state: state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public override async ValueTask<bool> MoveNextAsync(ITrace trace)
activityId: sourcePage.ActivityId,
responseLengthInBytes: sourcePage.ResponseLengthInBytes,
cosmosQueryExecutionInfo: sourcePage.CosmosQueryExecutionInfo,
distributionPlanSpec: default,
disallowContinuationTokenMessage: sourcePage.DisallowContinuationTokenMessage,
additionalHeaders: sourcePage.AdditionalHeaders,
state: state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)
activityId: Guid.Empty.ToString(),
responseLengthInBytes: this.cumulativeResponseLengthInBytes,
cosmosQueryExecutionInfo: default,
distributionPlanSpec: default,
disallowContinuationTokenMessage: default,
additionalHeaders: this.cumulativeAdditionalHeaders,
state: default);
Expand Down Expand Up @@ -92,6 +93,7 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)
activityId: sourcePage.ActivityId,
responseLengthInBytes: sourcePage.ResponseLengthInBytes + this.cumulativeResponseLengthInBytes,
cosmosQueryExecutionInfo: sourcePage.CosmosQueryExecutionInfo,
distributionPlanSpec: default,
disallowContinuationTokenMessage: sourcePage.DisallowContinuationTokenMessage,
additionalHeaders: sourcePage.AdditionalHeaders,
state: default);
Expand All @@ -117,6 +119,7 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)
activityId: sourcePage.ActivityId,
responseLengthInBytes: sourcePage.ResponseLengthInBytes + this.cumulativeResponseLengthInBytes,
cosmosQueryExecutionInfo: sourcePage.CosmosQueryExecutionInfo,
distributionPlanSpec: default,
disallowContinuationTokenMessage: sourcePage.DisallowContinuationTokenMessage,
additionalHeaders: sourcePage.AdditionalHeaders,
state: sourcePage.State);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ public override async ValueTask<bool> MoveNextAsync(ITrace trace)
activityId: sourcePage.ActivityId,
responseLengthInBytes: sourcePage.ResponseLengthInBytes,
cosmosQueryExecutionInfo: sourcePage.CosmosQueryExecutionInfo,
distributionPlanSpec: default,
disallowContinuationTokenMessage: sourcePage.DisallowContinuationTokenMessage,
additionalHeaders: sourcePage.AdditionalHeaders,
state: state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public override async ValueTask<bool> MoveNextAsync(ITrace trace)
activityId: sourcePage.ActivityId,
responseLengthInBytes: sourcePage.ResponseLengthInBytes,
cosmosQueryExecutionInfo: sourcePage.CosmosQueryExecutionInfo,
distributionPlanSpec: default,
disallowContinuationTokenMessage: sourcePage.DisallowContinuationTokenMessage,
additionalHeaders: sourcePage.AdditionalHeaders,
state: queryState);
Expand Down

0 comments on commit ea269c9

Please sign in to comment.