Skip to content

Commit

Permalink
Fixed offset and limit query clause returning partial query results w…
Browse files Browse the repository at this point in the history
…hen used with distinct (#37860)

* Fixed offset and limit query clause when used with distinct

* Added offset limit order by distinct tests
  • Loading branch information
kushagraThapar committed Dec 1, 2023
1 parent 8c6a63f commit 2b506b7
Show file tree
Hide file tree
Showing 10 changed files with 448 additions and 209 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,15 @@
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.implementation.Resource;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.util.CosmosPagedFlux;
import com.azure.cosmos.implementation.InternalObjectNode;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.implementation.FeedResponseListValidator;
import com.azure.cosmos.implementation.FeedResponseValidator;
import com.azure.cosmos.implementation.InternalObjectNode;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.query.OffsetContinuationToken;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.util.CosmosPagedFlux;
import com.fasterxml.jackson.databind.JsonNode;
import io.reactivex.subscribers.TestSubscriber;
import org.testng.annotations.AfterClass;
Expand All @@ -39,11 +38,12 @@
public class OffsetLimitQueryTests extends TestSuiteBase {
private CosmosAsyncDatabase createdDatabase;
private CosmosAsyncContainer createdCollection;
private ArrayList<InternalObjectNode> docs = new ArrayList<>();
private final ArrayList<InternalObjectNode> docs = new ArrayList<>();

private String partitionKey = "mypk";
private int firstPk = 0;
private int secondPk = 1;
private int thirdPk = 2;
private String field = "field";

private CosmosAsyncClient client;
Expand Down Expand Up @@ -110,6 +110,44 @@ public void drainAllDocumentsUsingOffsetLimit() {
assertThat(finalResponse.getContinuationToken()).isNull();
}

/**
 * Runs a {@code SELECT DISTINCT c.id} query with OFFSET 0 / LIMIT 300, reads every page
 * (page size 5) and asserts that the total number of returned items equals {@code docs.size()}.
 */
@Test(groups = {"query"}, timeOut = TIMEOUT)
public void drainAllDistinctDocumentsUsingOffsetLimit() {
    final int offset = 0;
    final int limit = 300;
    final String queryText = "SELECT DISTINCT c.id from c OFFSET " + offset + " LIMIT " + limit;

    CosmosQueryRequestOptions requestOptions = new CosmosQueryRequestOptions();
    CosmosPagedFlux<InternalObjectNode> pagedFlux =
        createdCollection.queryItems(queryText, requestOptions, InternalObjectNode.class);

    // Sum the item count of every page until the query is fully drained.
    int itemCount = 0;
    for (FeedResponse<InternalObjectNode> page : pagedFlux.byPage(5).toIterable()) {
        itemCount += page.getResults().size();
    }

    assertThat(itemCount).isEqualTo(docs.size());
}

/**
 * Same drain check as the plain DISTINCT variant, but with an {@code ORDER BY c.id} clause:
 * pages through the whole result (page size 5) and asserts that the number of items
 * obtained equals {@code docs.size()}.
 */
@Test(groups = {"query"}, timeOut = TIMEOUT)
public void drainAllDistinctDocumentsUsingOrderByAndOffsetLimit() {
    final int offset = 0;
    final int limit = 300;
    final String queryText =
        "SELECT DISTINCT c.id from c ORDER BY c.id OFFSET " + offset + " LIMIT " + limit;
    final CosmosQueryRequestOptions requestOptions = new CosmosQueryRequestOptions();

    CosmosPagedFlux<InternalObjectNode> pagedFlux =
        createdCollection.queryItems(queryText, requestOptions, InternalObjectNode.class);

    // Walk the pages with an explicit iterator and accumulate the per-page result sizes.
    int itemCount = 0;
    for (Iterator<FeedResponse<InternalObjectNode>> pages = pagedFlux.byPage(5).toIterable().iterator();
         pages.hasNext(); ) {
        itemCount += pages.next().getResults().size();
    }

    assertThat(itemCount).isEqualTo(docs.size());
}

@Test(groups = {"query"}, timeOut = TIMEOUT)
public void offsetContinuationTokenRoundTrips() {
// Positive
Expand Down Expand Up @@ -167,6 +205,34 @@ public void queryDocumentsWithDistinct(Boolean qmEnabled) {
validateQuerySuccess(queryObservable.byPage(5), validator, TIMEOUT);
}

/**
 * Runs a {@code SELECT DISTINCT c.id ... ORDER BY c.id} query with OFFSET/LIMIT and validates
 * the returned pages against the matching slice of {@code docs}.
 *
 * @param qmEnabled value passed to {@code setQueryMetricsEnabled}; when {@code null} the option
 *                  is left at its default
 */
@Test(groups = {"query"}, timeOut = TIMEOUT, dataProvider = "queryMetricsArgProvider")
public void queryDocumentsWithDistinctAndOrderBy(Boolean qmEnabled) {
    int skipCount = 4;
    int takeCount = 10;
    String query =
        String.format("SELECT DISTINCT c.id from c ORDER BY c.id OFFSET %s LIMIT %s", skipCount, takeCount);
    CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();

    if (qmEnabled != null) {
        options.setQueryMetricsEnabled(qmEnabled);
    }

    options.setMaxDegreeOfParallelism(2);
    CosmosPagedFlux<InternalObjectNode> queryObservable =
        createdCollection.queryItems(query, options, InternalObjectNode.class);

    // Derive the expected slice from the same skip/take values that are used in the query text,
    // so the expectation cannot drift from the query if either value is changed.
    // NOTE(review): this slice follows the order of `docs`, while the query orders by c.id
    // (a string) - confirm that comparing resource ids here is the intended check.
    List<String> expectedIds = docs.stream()
        .skip(skipCount)
        .limit(takeCount)
        .map(InternalObjectNode::getResourceId)
        .collect(Collectors.toList());

    FeedResponseListValidator<InternalObjectNode> validator =
        new FeedResponseListValidator.Builder<InternalObjectNode>()
            .containsExactly(expectedIds)
            .numberOfPages(3)
            .hasValidQueryMetrics(qmEnabled)
            .build();

    validateQuerySuccess(queryObservable.byPage(5), validator, TIMEOUT);
}

@Test(groups = {"query"}, timeOut = TIMEOUT, dataProvider = "queryMetricsArgProvider")
public void queryDocumentsWithAggregate(Boolean qmEnabled) {
int skipCount = 0;
Expand Down Expand Up @@ -259,6 +325,14 @@ public void generateTestData() {
BridgeInternal.setProperty(d, partitionKey, secondPk);
docs.add(d);
}

for (int i = 20; i < 100; i++) {
InternalObjectNode d = new InternalObjectNode();
d.setId(Integer.toString(i));
BridgeInternal.setProperty(d, field, i);
BridgeInternal.setProperty(d, partitionKey, thirdPk);
docs.add(d);
}
}

@AfterClass(groups = {"query"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.implementation.query.LimitContinuationToken;
import com.azure.cosmos.util.CosmosPagedFlux;
import com.azure.cosmos.implementation.InternalObjectNode;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
Expand All @@ -14,7 +15,7 @@
import com.azure.cosmos.implementation.FeedResponseListValidator;
import com.azure.cosmos.implementation.RetryAnalyzer;
import com.azure.cosmos.implementation.Utils.ValueHolder;
import com.azure.cosmos.implementation.query.TakeContinuationToken;
import com.azure.cosmos.implementation.query.TopContinuationToken;
import com.fasterxml.jackson.databind.JsonNode;
import io.reactivex.subscribers.TestSubscriber;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -112,29 +113,53 @@ public void queryDocumentsWithTop(Boolean qmEnabled) throws Exception {
}

@Test(groups = { "query" }, timeOut = TIMEOUT)
public void topContinuationTokenRoundTrips() throws Exception {
public void limitContinuationTokenRoundTrips() throws Exception {
{
// Positive
TakeContinuationToken takeContinuationToken = new TakeContinuationToken(42, "asdf");
String serialized = takeContinuationToken.toString();
ValueHolder<TakeContinuationToken> outTakeContinuationToken = new ValueHolder<TakeContinuationToken>();
LimitContinuationToken limitContinuationToken = new LimitContinuationToken(42, "asdf");
String serialized = limitContinuationToken.toString();
ValueHolder<LimitContinuationToken> outLimitContinuationToken = new ValueHolder<LimitContinuationToken>();

assertThat(TakeContinuationToken.tryParse(serialized, outTakeContinuationToken)).isTrue();
TakeContinuationToken deserialized = outTakeContinuationToken.v;
assertThat(LimitContinuationToken.tryParse(serialized, outLimitContinuationToken)).isTrue();
LimitContinuationToken deserialized = outLimitContinuationToken.v;

assertThat(deserialized.getTakeCount()).isEqualTo(42);
assertThat(deserialized.getLimitCount()).isEqualTo(42);
assertThat(deserialized.getSourceToken()).isEqualTo("asdf");
}

{
// Negative
ValueHolder<TakeContinuationToken> outTakeContinuationToken = new ValueHolder<TakeContinuationToken>();
ValueHolder<LimitContinuationToken> outLimitContinuationToken = new ValueHolder<LimitContinuationToken>();
assertThat(
TakeContinuationToken.tryParse("{\"property\": \"Not a valid token\"}", outTakeContinuationToken))
LimitContinuationToken.tryParse("{\"property\": \"Not a valid token\"}", outLimitContinuationToken))
.isFalse();
}
}

/**
 * Checks that a {@code TopContinuationToken} serialized with {@code toString()} is accepted by
 * {@code tryParse} with its top count and source token intact, and that a JSON payload which
 * is not a top continuation token is rejected.
 */
@Test(groups = { "query" }, timeOut = TIMEOUT)
public void topContinuationTokenRoundTrips() throws Exception {
    // Positive: serialize, parse back, compare both properties.
    String json = new TopContinuationToken(42, "asdf").toString();
    ValueHolder<TopContinuationToken> parsedHolder = new ValueHolder<TopContinuationToken>();

    boolean accepted = TopContinuationToken.tryParse(json, parsedHolder);
    assertThat(accepted).isTrue();

    TopContinuationToken roundTripped = parsedHolder.v;
    assertThat(roundTripped.getTopCount()).isEqualTo(42);
    assertThat(roundTripped.getSourceToken()).isEqualTo("asdf");

    // Negative: a payload without the expected properties must not parse.
    ValueHolder<TopContinuationToken> rejectedHolder = new ValueHolder<TopContinuationToken>();
    boolean acceptedInvalid =
        TopContinuationToken.tryParse("{\"property\": \"Not a valid token\"}", rejectedHolder);
    assertThat(acceptedInvalid).isFalse();
}

@Test(groups = { "query" }, timeOut = TIMEOUT * 10, retryAnalyzer = RetryAnalyzer.class)
public void queryDocumentsWithTopContinuationTokens() throws Exception {
String query = "SELECT TOP 8 * FROM c";
Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* Fixed an issue where client level `EndToEndOperationLatencyPolicyConfig.availabilityStrategy` is not being applied for `query` - See [PR 37511](https://github.com/Azure/azure-sdk-for-java/pull/37511)
* Fixed an issue where operation is not cancelled based on `CosmosEndToEndOperationLatencyPolicyConfig.endToEndOperationTimeout` when `429` happens - See [PR 37764](https://github.com/Azure/azure-sdk-for-java/pull/37764)
* Fixed an issue where `CosmosEndToEndOperationLatencyPolicyConfig.endToEndOperationTimeout` is not applied for `ReadMany` - See [PR 37764](https://github.com/Azure/azure-sdk-for-java/pull/37764)
* Fixed an issue with OFFSET and LIMIT query clause returning partial query results when used with DISTINCT - See [PR 37860](https://github.com/Azure/azure-sdk-for-java/pull/37860)

#### Other Changes
* Modified the event payload when diagnostic details are traced (via OpenTelemetry traces). The diagnostics can exceed the max. attribute size of 8KB. This PR will split the diagnostics and trace them in multiple events (ordered by `SequenceNumber` attribute) to ensure the full diagnostics message is available in logged events. - See [PR 37376](https://github.com/Azure/azure-sdk-for-java/pull/37376)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,12 @@ private static boolean isClientSideContinuationToken(String continuationToken) {
return true;
}

ValueHolder<TakeContinuationToken> outTakeContinuationToken = new ValueHolder<>();
return TakeContinuationToken.tryParse(continuationToken, outTakeContinuationToken);
ValueHolder<TopContinuationToken> outTopContinuationToken = new ValueHolder<>();
if (TopContinuationToken.tryParse(continuationToken, outTopContinuationToken)) {
return true;
}
ValueHolder<LimitContinuationToken> outLimitContinuationToken = new ValueHolder<>();
return LimitContinuationToken.tryParse(continuationToken, outLimitContinuationToken);
}

return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class DistinctContinuationToken extends JsonSerializable {
private static final String LAST_HASH_PROPERTY_NAME = "lastHash";
private static final String SOURCE_TOKEN_PROPERTY_NAME = "sourceToken";

private static final Logger logger = LoggerFactory.getLogger(TakeContinuationToken.class);
private static final Logger logger = LoggerFactory.getLogger(DistinctContinuationToken.class);

public DistinctContinuationToken(UInt128 lastHash, String sourceToken) {
this.setLastHash(lastHash);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation.query;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.implementation.JsonSerializable;
import com.azure.cosmos.implementation.Utils.ValueHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * JSON-serializable continuation token that stores a limit count together with a source token.
 * <p>
 * Although this class is public, it is not part of our published public APIs.
 * It is meant to be used internally by the SDK only.
 */
public final class LimitContinuationToken extends JsonSerializable {
    private static final String LIMIT_PROPERTY_NAME = "limit";
    private static final String SOURCE_TOKEN_PROPERTY_NAME = "sourceToken";
    private static final Logger logger = LoggerFactory.getLogger(LimitContinuationToken.class);

    /**
     * Creates a token from its two components.
     *
     * @param limitCount  the limit count to store; must not be negative
     * @param sourceToken the source token to store
     * @throws IllegalArgumentException if {@code limitCount} is negative
     */
    public LimitContinuationToken(int limitCount, String sourceToken) {
        if (limitCount < 0) {
            throw new IllegalArgumentException("limitCount must be a non negative number.");
        }

        this.setLimitCount(limitCount);
        this.setSourceToken(sourceToken);
    }

    // Used by tryParse only: builds the token from its serialized JSON form.
    private LimitContinuationToken(String serializedLimitContinuationToken) {
        super(serializedLimitContinuationToken);
    }

    /**
     * Attempts to parse a serialized limit continuation token.
     *
     * @param serializedLimitContinuationToken the serialized token
     * @param outLimitContinuationToken        holder that receives the parsed token on success and
     *                                         {@code null} on failure
     * @return {@code true} if the token was parsed and carries a non-negative limit,
     *         {@code false} otherwise
     */
    public static boolean tryParse(String serializedLimitContinuationToken,
                                   ValueHolder<LimitContinuationToken> outLimitContinuationToken) {
        try {
            LimitContinuationToken limitContinuationToken =
                new LimitContinuationToken(serializedLimitContinuationToken);

            // Read both properties eagerly; any exception raised while doing so is treated as
            // "not a limit continuation token" by the catch block below.
            limitContinuationToken.getSourceToken();
            int limitCount = limitContinuationToken.getLimitCount();

            // Apply the same invariant as the public constructor: a negative limit is invalid.
            if (limitCount < 0) {
                logger.debug(
                    "Received negative limit {} when trying to parse: {}",
                    limitCount,
                    serializedLimitContinuationToken);
                outLimitContinuationToken.v = null;
                return false;
            }

            outLimitContinuationToken.v = limitContinuationToken;
            return true;
        } catch (Exception ex) {
            logger.debug(
                "Received exception {} when trying to parse: {}",
                ex.getMessage(),
                serializedLimitContinuationToken);
            outLimitContinuationToken.v = null;
            return false;
        }
    }

    /**
     * @return the value of the {@code limit} property
     */
    public int getLimitCount() {
        return super.getInt(LIMIT_PROPERTY_NAME);
    }

    /**
     * @return the value of the {@code sourceToken} property
     */
    public String getSourceToken() {
        return super.getString(SOURCE_TOKEN_PROPERTY_NAME);
    }

    private void setLimitCount(int limitCount) {
        BridgeInternal.setProperty(this, LIMIT_PROPERTY_NAME, limitCount);
    }

    private void setSourceToken(String sourceToken) {
        BridgeInternal.setProperty(this, SOURCE_TOKEN_PROPERTY_NAME, sourceToken);
    }

    @Override
    public String toJson() {
        return super.toJson();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,35 +95,28 @@ protected static <T> BiFunction<String, PipelinedDocumentQueryParams<T>, Flux<ID
createSkipComponentFunction = createBaseComponent;
}

BiFunction<String, PipelinedDocumentQueryParams<T>, Flux<IDocumentQueryExecutionComponent<T>>> createTopComponentFunction;
if (queryInfo.hasTop()) {
createTopComponentFunction =
BiFunction<String, PipelinedDocumentQueryParams<T>, Flux<IDocumentQueryExecutionComponent<T>>> createLimitComponentFunction;
if (queryInfo.hasLimit()) {
createLimitComponentFunction =
(continuationToken, documentQueryParams) ->
TopDocumentQueryExecutionContext.createAsync(createSkipComponentFunction,
queryInfo.getTop(),
queryInfo.getTop(),
continuationToken,
documentQueryParams);
TakeDocumentQueryExecutionContext.createAsync(createSkipComponentFunction,
queryInfo.getLimit(),
continuationToken,
documentQueryParams,
TakeDocumentQueryExecutionContext.TakeEnum.LIMIT);
} else {
createTopComponentFunction = createSkipComponentFunction;
createLimitComponentFunction = createSkipComponentFunction;
}

BiFunction<String, PipelinedDocumentQueryParams<T>, Flux<IDocumentQueryExecutionComponent<T>>> createTakeComponentFunction;
if (queryInfo.hasLimit()) {
return (continuationToken, documentQueryParams) -> {
int totalLimit = queryInfo.getLimit();
if (queryInfo.hasOffset()) {
// This is being done to match the limit from rewritten query
totalLimit = queryInfo.getOffset() + queryInfo.getLimit();
}
return TopDocumentQueryExecutionContext.createAsync(createTopComponentFunction,
queryInfo.getLimit(),
totalLimit,
continuationToken,
documentQueryParams);
};
if (queryInfo.hasTop()) {
return (continuationToken, documentQueryParams) ->
TakeDocumentQueryExecutionContext.createAsync(createLimitComponentFunction,
queryInfo.getTop(),
continuationToken,
documentQueryParams,
TakeDocumentQueryExecutionContext.TakeEnum.TOP);
} else {
return createTopComponentFunction;
return createLimitComponentFunction;
}
}

Expand Down

0 comments on commit 2b506b7

Please sign in to comment.