Skip to content

Commit

Permalink
Fixed a bug resulting in high number of PartitionKeyRange ReadFeed re…
Browse files Browse the repository at this point in the history
…quests when using bulk execution not using Spark connector. (#37920)

* Fixing high number of PKRangeFeed calls when using BulkExecution without SparkConnector

* Adding unit test coverage

* Update CHANGELOG.md
  • Loading branch information
FabianMeiswinkel committed Dec 5, 2023
1 parent bbd2107 commit fc8e083
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,31 @@
*/
package com.azure.cosmos;

import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentClientImpl;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.RxDocumentServiceResponse;
import com.azure.cosmos.implementation.RxGatewayStoreModel;
import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils;
import com.azure.cosmos.implementation.faultinjection.IFaultInjectorProvider;
import com.azure.cosmos.implementation.throughputControl.ThroughputControlStore;
import com.azure.cosmos.models.CosmosContainerIdentity;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.rx.TestSuiteBase;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

import static org.assertj.core.api.Assertions.assertThat;
public class FeedRangeTest extends TestSuiteBase {
Expand Down Expand Up @@ -58,4 +73,116 @@ public void feedRange_RecreateContainerWithSameName() {
}
}
}

@Test(groups = { "emulator" }, timeOut = TIMEOUT)
public void feedRange_withForceRefresh() {
String containerName = UUID.randomUUID().toString();
String databaseName = preExistingDatabaseId;
CosmosContainerProperties cosmosContainerProperties = new CosmosContainerProperties(containerName, "/PE_Name");
houseKeepingClient.getDatabase(databaseName).createContainerIfNotExists(cosmosContainerProperties);
try(CosmosAsyncClient clientUnderTest = cosmosClientBuilderUnderTest.buildAsyncClient()) {
RxDocumentClientImpl rxClient = (RxDocumentClientImpl)clientUnderTest.getContextClient();
RxGatewayStoreModel rxGatewayStoreModel = (RxGatewayStoreModel)ReflectionUtils.getGatewayProxy(rxClient);
DelegatingRxStoreModel pkRangeFeedTrackingGatewayStoreModel = new DelegatingRxStoreModel(rxGatewayStoreModel);
ReflectionUtils.setGatewayProxy(rxClient, pkRangeFeedTrackingGatewayStoreModel);
for (int i = 0; i < 10; i++) {
List<FeedRange> rsp =
clientUnderTest.getDatabase(databaseName).getContainer(containerName).getFeedRanges().block();
assertThat(rsp).isNotNull();
assertThat(rsp).hasSize(1);
}
assertThat(pkRangeFeedTrackingGatewayStoreModel.getPartitionKeyRangeFeedCount()).isGreaterThanOrEqualTo(10);
}

houseKeepingClient.getDatabase(databaseName).getContainer(containerName).delete();
}

@Test(groups = { "emulator" }, timeOut = TIMEOUT)
public void feedRange_noForceRefresh() {
String containerName = UUID.randomUUID().toString();
String databaseName = preExistingDatabaseId;
CosmosContainerProperties cosmosContainerProperties = new CosmosContainerProperties(containerName, "/PE_Name");
houseKeepingClient.getDatabase(databaseName).createContainerIfNotExists(cosmosContainerProperties);
try(CosmosAsyncClient clientUnderTest = cosmosClientBuilderUnderTest.buildAsyncClient()) {
RxDocumentClientImpl rxClient = (RxDocumentClientImpl)clientUnderTest.getContextClient();
RxGatewayStoreModel rxGatewayStoreModel = (RxGatewayStoreModel)ReflectionUtils.getGatewayProxy(rxClient);
DelegatingRxStoreModel pkRangeFeedTrackingGatewayStoreModel = new DelegatingRxStoreModel(rxGatewayStoreModel);
ReflectionUtils.setGatewayProxy(rxClient, pkRangeFeedTrackingGatewayStoreModel);
clientUnderTest.getDatabase(databaseName).getContainer(containerName).getFeedRanges().block();

long baselinePkRangeFeedCount = pkRangeFeedTrackingGatewayStoreModel.getPartitionKeyRangeFeedCount();
logger.info("Baseline PKRangeFeedCount: {}", baselinePkRangeFeedCount);

for (int i = 0; i < 10; i++) {
List<FeedRange> rsp =
ImplementationBridgeHelpers
.CosmosAsyncContainerHelper
.getCosmosAsyncContainerAccessor()
.getFeedRanges(
clientUnderTest.getDatabase(databaseName).getContainer(containerName),
false).block();
assertThat(rsp).isNotNull();
assertThat(rsp).hasSize(1);
}
assertThat(pkRangeFeedTrackingGatewayStoreModel.getPartitionKeyRangeFeedCount())
.isEqualTo(baselinePkRangeFeedCount);
}

houseKeepingClient.getDatabase(databaseName).getContainer(containerName).delete();
}

static class DelegatingRxStoreModel extends RxGatewayStoreModel {

private final AtomicLong partitionKeyRangeFeedCounter= new AtomicLong(0);

private final RxGatewayStoreModel inner;

public DelegatingRxStoreModel(RxGatewayStoreModel inner) {
super(inner);

this.inner = inner;
}

@Override
public Mono<RxDocumentServiceResponse> processMessage(RxDocumentServiceRequest request) {

if (request.getResourceType() == ResourceType.PartitionKeyRange
&& request.getOperationType() == OperationType.ReadFeed) {
partitionKeyRangeFeedCounter.incrementAndGet();
}

return inner.processMessage(request);
}

@Override
public void enableThroughputControl(ThroughputControlStore throughputControlStore) {
inner.enableThroughputControl(throughputControlStore);
}

@Override
public Flux<Void> submitOpenConnectionTasksAndInitCaches(
CosmosContainerProactiveInitConfig proactiveContainerInitConfig) {

return inner.submitOpenConnectionTasksAndInitCaches(proactiveContainerInitConfig);
}

@Override
public void configureFaultInjectorProvider(IFaultInjectorProvider injectorProvider, Configs configs) {
inner.configureFaultInjectorProvider(injectorProvider, configs);
}

@Override
public void recordOpenConnectionsAndInitCachesCompleted(List<CosmosContainerIdentity> cosmosContainerIdentities) {
inner.recordOpenConnectionsAndInitCachesCompleted(cosmosContainerIdentities);
}

@Override
public void recordOpenConnectionsAndInitCachesStarted(List<CosmosContainerIdentity> cosmosContainerIdentities) {
inner.recordOpenConnectionsAndInitCachesStarted(cosmosContainerIdentities);
}

public long getPartitionKeyRangeFeedCount() {
return this.partitionKeyRangeFeedCounter.get();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public void partitionedSessionToken(boolean isNameBased) throws NoSuchMethodExce

// Session token validation for feed ranges query
spyClient.clearCapturedRequests();
List<FeedRange> feedRanges = spyClient.getFeedRanges(getCollectionLink(isNameBased)).block();
List<FeedRange> feedRanges = spyClient.getFeedRanges(getCollectionLink(isNameBased), true).block();
queryRequestOptions = new CosmosQueryRequestOptions();
queryRequestOptions.setFeedRange(feedRanges.get(0));
dummyState = TestUtils.createDummyQueryFeedOperationState(
Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
* Fixed high number of PKRangeFeed calls when using BulkExecution without SparkConnector - See [PR 37920](https://github.com/Azure/azure-sdk-for-java/pull/37920)

#### Other Changes
* Changed to `DEBUG` log level in `WebExceptionRetryPolicy` for non-handled exception scenario and retry scenario - See [PR 37918](https://github.com/Azure/azure-sdk-for-java/pull/37918)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2539,7 +2539,11 @@ ItemDeserializer getItemDeserializer() {
* @return An unmodifiable list of {@link FeedRange}
*/
public Mono<List<FeedRange>> getFeedRanges() {
return this.getDatabase().getDocClientWrapper().getFeedRanges(getLink());
return this.getFeedRanges(true);
}

Mono<List<FeedRange>> getFeedRanges(boolean forceRefresh) {
return this.getDatabase().getDocClientWrapper().getFeedRanges(getLink(), forceRefresh);
}

/**
Expand Down Expand Up @@ -2759,6 +2763,11 @@ public <T> Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> queryItemsInt
Class<T> classType) {
return cosmosAsyncContainer.queryItemsInternalFunc(sqlQuerySpecMono, cosmosQueryRequestOptions, classType);
}

@Override
public Mono<List<FeedRange>> getFeedRanges(CosmosAsyncContainer cosmosAsyncContainer, boolean forceRefresh) {
return cosmosAsyncContainer.getFeedRanges(forceRefresh);
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -760,9 +760,10 @@ <T> Flux<FeedResponse<T>> queryDocumentChangeFeedFromPagedFlux(
* Gets the feed ranges of a container.
*
* @param collectionLink the link to the parent document collection.
* @param forceRefresh a flag indicating whether to force a refresh
* @return a {@link List} of @{link FeedRange} containing the feed ranges of a container.
*/
Mono<List<FeedRange>> getFeedRanges(String collectionLink);
Mono<List<FeedRange>> getFeedRanges(String collectionLink, boolean forceRefresh);

/**
* Creates a stored procedure.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import com.azure.cosmos.models.CosmosMetricName;
import com.azure.cosmos.models.CosmosPatchOperations;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.PartitionKey;
Expand Down Expand Up @@ -931,6 +932,8 @@ <T> Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> queryItemsInternalFu
Mono<SqlQuerySpec> sqlQuerySpecMono,
CosmosQueryRequestOptions cosmosQueryRequestOptions,
Class<T> classType);

Mono<List<FeedRange>> getFeedRanges(CosmosAsyncContainer cosmosAsyncContainer, boolean forceRefresh);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5207,7 +5207,7 @@ private static SqlQuerySpec createLogicalPartitionScanQuerySpec(
}

@Override
public Mono<List<FeedRange>> getFeedRanges(String collectionLink) {
public Mono<List<FeedRange>> getFeedRanges(String collectionLink, boolean forceRefresh) {
InvalidPartitionExceptionRetryPolicy invalidPartitionExceptionRetryPolicy = new InvalidPartitionExceptionRetryPolicy(
this.collectionCache,
null,
Expand All @@ -5224,12 +5224,16 @@ public Mono<List<FeedRange>> getFeedRanges(String collectionLink) {
invalidPartitionExceptionRetryPolicy.onBeforeSendRequest(request);

return ObservableHelper.inlineIfPossibleAsObs(
() -> getFeedRangesInternal(request, collectionLink),
() -> getFeedRangesInternal(request, collectionLink, forceRefresh),
invalidPartitionExceptionRetryPolicy);
}

private Mono<List<FeedRange>> getFeedRangesInternal(RxDocumentServiceRequest request, String collectionLink) {
logger.debug("getFeedRange collectionLink=[{}]", collectionLink);
private Mono<List<FeedRange>> getFeedRangesInternal(
RxDocumentServiceRequest request,
String collectionLink,
boolean forceRefresh) {

logger.debug("getFeedRange collectionLink=[{}] - forceRefresh={}", collectionLink, forceRefresh);

if (StringUtils.isEmpty(collectionLink)) {
throw new IllegalArgumentException("collectionLink");
Expand All @@ -5247,7 +5251,10 @@ private Mono<List<FeedRange>> getFeedRangesInternal(RxDocumentServiceRequest req
Mono<Utils.ValueHolder<List<PartitionKeyRange>>> valueHolderMono = partitionKeyRangeCache
.tryGetOverlappingRangesAsync(
BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics),
collection.getResourceId(), RANGE_INCLUDING_ALL_PARTITION_KEY_RANGES, true, null);
collection.getResourceId(),
RANGE_INCLUDING_ALL_PARTITION_KEY_RANGES,
forceRefresh,
null);

return valueHolderMono.map(partitionKeyRangeList -> toFeedRanges(partitionKeyRangeList, request));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,17 @@ public RxGatewayStoreModel(
this.sessionContainer = sessionContainer;
}

public RxGatewayStoreModel(RxGatewayStoreModel inner) {
this.clientContext = inner.clientContext;
this.defaultHeaders = inner.defaultHeaders;
this.defaultConsistencyLevel = inner.defaultConsistencyLevel;
this.globalEndpointManager = inner.globalEndpointManager;
this.queryCompatibilityMode = inner.queryCompatibilityMode;

this.httpClient = inner.httpClient;
this.sessionContainer = inner.sessionContainer;
}

void setGatewayServiceConfigurationReader(GatewayServiceConfigurationReader gatewayServiceConfigurationReader) {
this.gatewayServiceConfigurationReader = gatewayServiceConfigurationReader;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,10 @@ private Flux<CosmosBulkOperationResponse<TContext>> executeCore() {
.getMaxConcurrentCosmosPartitions(cosmosBulkExecutionOptions);
Mono<Integer> maxConcurrentCosmosPartitionsMono = nullableMaxConcurrentCosmosPartitions != null ?
Mono.just(Math.max(256, nullableMaxConcurrentCosmosPartitions)) :
this.container.getFeedRanges().map(ranges -> Math.max(256, ranges.size() * 2));
ImplementationBridgeHelpers
.CosmosAsyncContainerHelper
.getCosmosAsyncContainerAccessor()
.getFeedRanges(this.container, false).map(ranges -> Math.max(256, ranges.size() * 2));

return
maxConcurrentCosmosPartitionsMono
Expand Down

0 comments on commit fc8e083

Please sign in to comment.