Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed a bug resulting in high number of PartitionKeyRange ReadFeed requests when using bulk execution not using Spark connector. #37920

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,31 @@
*/
package com.azure.cosmos;

import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentClientImpl;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.RxDocumentServiceResponse;
import com.azure.cosmos.implementation.RxGatewayStoreModel;
import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils;
import com.azure.cosmos.implementation.faultinjection.IFaultInjectorProvider;
import com.azure.cosmos.implementation.throughputControl.ThroughputControlStore;
import com.azure.cosmos.models.CosmosContainerIdentity;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.rx.TestSuiteBase;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

import static org.assertj.core.api.Assertions.assertThat;
public class FeedRangeTest extends TestSuiteBase {
Expand Down Expand Up @@ -58,4 +73,116 @@ public void feedRange_RecreateContainerWithSameName() {
}
}
}

@Test(groups = { "emulator" }, timeOut = TIMEOUT)
public void feedRange_withForceRefresh() {
String containerName = UUID.randomUUID().toString();
String databaseName = preExistingDatabaseId;
CosmosContainerProperties cosmosContainerProperties = new CosmosContainerProperties(containerName, "/PE_Name");
houseKeepingClient.getDatabase(databaseName).createContainerIfNotExists(cosmosContainerProperties);
try(CosmosAsyncClient clientUnderTest = cosmosClientBuilderUnderTest.buildAsyncClient()) {
RxDocumentClientImpl rxClient = (RxDocumentClientImpl)clientUnderTest.getContextClient();
RxGatewayStoreModel rxGatewayStoreModel = (RxGatewayStoreModel)ReflectionUtils.getGatewayProxy(rxClient);
DelegatingRxStoreModel pkRangeFeedTrackingGatewayStoreModel = new DelegatingRxStoreModel(rxGatewayStoreModel);
ReflectionUtils.setGatewayProxy(rxClient, pkRangeFeedTrackingGatewayStoreModel);
for (int i = 0; i < 10; i++) {
List<FeedRange> rsp =
clientUnderTest.getDatabase(databaseName).getContainer(containerName).getFeedRanges().block();
assertThat(rsp).isNotNull();
assertThat(rsp).hasSize(1);
}
assertThat(pkRangeFeedTrackingGatewayStoreModel.getPartitionKeyRangeFeedCount()).isGreaterThanOrEqualTo(10);
}

houseKeepingClient.getDatabase(databaseName).getContainer(containerName).delete();
}

@Test(groups = { "emulator" }, timeOut = TIMEOUT)
public void feedRange_noForceRefresh() {
String containerName = UUID.randomUUID().toString();
String databaseName = preExistingDatabaseId;
CosmosContainerProperties cosmosContainerProperties = new CosmosContainerProperties(containerName, "/PE_Name");
houseKeepingClient.getDatabase(databaseName).createContainerIfNotExists(cosmosContainerProperties);
try(CosmosAsyncClient clientUnderTest = cosmosClientBuilderUnderTest.buildAsyncClient()) {
RxDocumentClientImpl rxClient = (RxDocumentClientImpl)clientUnderTest.getContextClient();
RxGatewayStoreModel rxGatewayStoreModel = (RxGatewayStoreModel)ReflectionUtils.getGatewayProxy(rxClient);
DelegatingRxStoreModel pkRangeFeedTrackingGatewayStoreModel = new DelegatingRxStoreModel(rxGatewayStoreModel);
ReflectionUtils.setGatewayProxy(rxClient, pkRangeFeedTrackingGatewayStoreModel);
clientUnderTest.getDatabase(databaseName).getContainer(containerName).getFeedRanges().block();

long baselinePkRangeFeedCount = pkRangeFeedTrackingGatewayStoreModel.getPartitionKeyRangeFeedCount();
logger.info("Baseline PKRangeFeedCount: {}", baselinePkRangeFeedCount);

for (int i = 0; i < 10; i++) {
List<FeedRange> rsp =
ImplementationBridgeHelpers
.CosmosAsyncContainerHelper
.getCosmosAsyncContainerAccessor()
.getFeedRanges(
clientUnderTest.getDatabase(databaseName).getContainer(containerName),
false).block();
assertThat(rsp).isNotNull();
assertThat(rsp).hasSize(1);
}
assertThat(pkRangeFeedTrackingGatewayStoreModel.getPartitionKeyRangeFeedCount())
.isEqualTo(baselinePkRangeFeedCount);
}

houseKeepingClient.getDatabase(databaseName).getContainer(containerName).delete();
}

static class DelegatingRxStoreModel extends RxGatewayStoreModel {

private final AtomicLong partitionKeyRangeFeedCounter= new AtomicLong(0);

private final RxGatewayStoreModel inner;

public DelegatingRxStoreModel(RxGatewayStoreModel inner) {
super(inner);

this.inner = inner;
}

@Override
public Mono<RxDocumentServiceResponse> processMessage(RxDocumentServiceRequest request) {

if (request.getResourceType() == ResourceType.PartitionKeyRange
&& request.getOperationType() == OperationType.ReadFeed) {
partitionKeyRangeFeedCounter.incrementAndGet();
}

return inner.processMessage(request);
}

@Override
public void enableThroughputControl(ThroughputControlStore throughputControlStore) {
inner.enableThroughputControl(throughputControlStore);
}

@Override
public Flux<Void> submitOpenConnectionTasksAndInitCaches(
CosmosContainerProactiveInitConfig proactiveContainerInitConfig) {

return inner.submitOpenConnectionTasksAndInitCaches(proactiveContainerInitConfig);
}

@Override
public void configureFaultInjectorProvider(IFaultInjectorProvider injectorProvider, Configs configs) {
inner.configureFaultInjectorProvider(injectorProvider, configs);
}

@Override
public void recordOpenConnectionsAndInitCachesCompleted(List<CosmosContainerIdentity> cosmosContainerIdentities) {
inner.recordOpenConnectionsAndInitCachesCompleted(cosmosContainerIdentities);
}

@Override
public void recordOpenConnectionsAndInitCachesStarted(List<CosmosContainerIdentity> cosmosContainerIdentities) {
inner.recordOpenConnectionsAndInitCachesStarted(cosmosContainerIdentities);
}

public long getPartitionKeyRangeFeedCount() {
return this.partitionKeyRangeFeedCounter.get();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public void partitionedSessionToken(boolean isNameBased) throws NoSuchMethodExce

// Session token validation for feed ranges query
spyClient.clearCapturedRequests();
List<FeedRange> feedRanges = spyClient.getFeedRanges(getCollectionLink(isNameBased)).block();
List<FeedRange> feedRanges = spyClient.getFeedRanges(getCollectionLink(isNameBased), true).block();
queryRequestOptions = new CosmosQueryRequestOptions();
queryRequestOptions.setFeedRange(feedRanges.get(0));
dummyState = TestUtils.createDummyQueryFeedOperationState(
Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
* Fixed high number of PKRangeFeed calls when using BulkExecution without SparkConnector - See [PR 37920](https://github.com/Azure/azure-sdk-for-java/pull/37920)

#### Other Changes
* Changed to `DEBUG` log level in `WebExceptionRetryPolicy` for non-handled exception scenario and retry scenario - See [PR 37918](https://github.com/Azure/azure-sdk-for-java/pull/37918)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2539,7 +2539,11 @@ ItemDeserializer getItemDeserializer() {
* @return An unmodifiable list of {@link FeedRange}
*/
public Mono<List<FeedRange>> getFeedRanges() {
return this.getDatabase().getDocClientWrapper().getFeedRanges(getLink());
return this.getFeedRanges(true);
}

Mono<List<FeedRange>> getFeedRanges(boolean forceRefresh) {
return this.getDatabase().getDocClientWrapper().getFeedRanges(getLink(), forceRefresh);
}

/**
Expand Down Expand Up @@ -2759,6 +2763,11 @@ public <T> Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> queryItemsInt
Class<T> classType) {
return cosmosAsyncContainer.queryItemsInternalFunc(sqlQuerySpecMono, cosmosQueryRequestOptions, classType);
}

@Override
public Mono<List<FeedRange>> getFeedRanges(CosmosAsyncContainer cosmosAsyncContainer, boolean forceRefresh) {
return cosmosAsyncContainer.getFeedRanges(forceRefresh);
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -760,9 +760,10 @@ <T> Flux<FeedResponse<T>> queryDocumentChangeFeedFromPagedFlux(
* Gets the feed ranges of a container.
*
* @param collectionLink the link to the parent document collection.
* @param forceRefresh a flag indicating whether to force a refresh
* @return a {@link List} of @{link FeedRange} containing the feed ranges of a container.
*/
Mono<List<FeedRange>> getFeedRanges(String collectionLink);
Mono<List<FeedRange>> getFeedRanges(String collectionLink, boolean forceRefresh);

/**
* Creates a stored procedure.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import com.azure.cosmos.models.CosmosMetricName;
import com.azure.cosmos.models.CosmosPatchOperations;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.PartitionKey;
Expand Down Expand Up @@ -931,6 +932,8 @@ <T> Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> queryItemsInternalFu
Mono<SqlQuerySpec> sqlQuerySpecMono,
CosmosQueryRequestOptions cosmosQueryRequestOptions,
Class<T> classType);

Mono<List<FeedRange>> getFeedRanges(CosmosAsyncContainer cosmosAsyncContainer, boolean forceRefresh);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5207,7 +5207,7 @@ private static SqlQuerySpec createLogicalPartitionScanQuerySpec(
}

@Override
public Mono<List<FeedRange>> getFeedRanges(String collectionLink) {
public Mono<List<FeedRange>> getFeedRanges(String collectionLink, boolean forceRefresh) {
InvalidPartitionExceptionRetryPolicy invalidPartitionExceptionRetryPolicy = new InvalidPartitionExceptionRetryPolicy(
this.collectionCache,
null,
Expand All @@ -5224,12 +5224,16 @@ public Mono<List<FeedRange>> getFeedRanges(String collectionLink) {
invalidPartitionExceptionRetryPolicy.onBeforeSendRequest(request);

return ObservableHelper.inlineIfPossibleAsObs(
() -> getFeedRangesInternal(request, collectionLink),
() -> getFeedRangesInternal(request, collectionLink, forceRefresh),
invalidPartitionExceptionRetryPolicy);
}

private Mono<List<FeedRange>> getFeedRangesInternal(RxDocumentServiceRequest request, String collectionLink) {
logger.debug("getFeedRange collectionLink=[{}]", collectionLink);
private Mono<List<FeedRange>> getFeedRangesInternal(
RxDocumentServiceRequest request,
String collectionLink,
boolean forceRefresh) {

logger.debug("getFeedRange collectionLink=[{}] - forceRefresh={}", collectionLink, forceRefresh);

if (StringUtils.isEmpty(collectionLink)) {
throw new IllegalArgumentException("collectionLink");
Expand All @@ -5247,7 +5251,10 @@ private Mono<List<FeedRange>> getFeedRangesInternal(RxDocumentServiceRequest req
Mono<Utils.ValueHolder<List<PartitionKeyRange>>> valueHolderMono = partitionKeyRangeCache
.tryGetOverlappingRangesAsync(
BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics),
collection.getResourceId(), RANGE_INCLUDING_ALL_PARTITION_KEY_RANGES, true, null);
collection.getResourceId(),
RANGE_INCLUDING_ALL_PARTITION_KEY_RANGES,
forceRefresh,
null);

return valueHolderMono.map(partitionKeyRangeList -> toFeedRanges(partitionKeyRangeList, request));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,17 @@ public RxGatewayStoreModel(
this.sessionContainer = sessionContainer;
}

public RxGatewayStoreModel(RxGatewayStoreModel inner) {
this.clientContext = inner.clientContext;
this.defaultHeaders = inner.defaultHeaders;
this.defaultConsistencyLevel = inner.defaultConsistencyLevel;
this.globalEndpointManager = inner.globalEndpointManager;
this.queryCompatibilityMode = inner.queryCompatibilityMode;

this.httpClient = inner.httpClient;
this.sessionContainer = inner.sessionContainer;
}

void setGatewayServiceConfigurationReader(GatewayServiceConfigurationReader gatewayServiceConfigurationReader) {
this.gatewayServiceConfigurationReader = gatewayServiceConfigurationReader;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,10 @@ private Flux<CosmosBulkOperationResponse<TContext>> executeCore() {
.getMaxConcurrentCosmosPartitions(cosmosBulkExecutionOptions);
Mono<Integer> maxConcurrentCosmosPartitionsMono = nullableMaxConcurrentCosmosPartitions != null ?
Mono.just(Math.max(256, nullableMaxConcurrentCosmosPartitions)) :
this.container.getFeedRanges().map(ranges -> Math.max(256, ranges.size() * 2));
ImplementationBridgeHelpers
.CosmosAsyncContainerHelper
.getCosmosAsyncContainerAccessor()
.getFeedRanges(this.container, false).map(ranges -> Math.max(256, ranges.size() * 2));

return
maxConcurrentCosmosPartitionsMono
Expand Down
Loading