Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open connections through rntbd #28470

Merged
merged 22 commits into from
May 18, 2022
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ public <T> CosmosPagedFlux<T> queryItems(String query, Class<T> classType) {
*
* @return Mono of Void
*/
@Deprecated
@Beta(value = Beta.SinceVersion.V4_14_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public Mono<Void> openConnectionsAndInitCaches() {
int retryCount = Configs.getOpenConnectionsRetriesCount();
Expand Down Expand Up @@ -487,6 +488,43 @@ public Mono<Void> openConnectionsAndInitCaches() {
}
}


/***
* Initializes the container by warming up the caches and connections for the current read region.
*
* <p>
* <br>NOTE: This API ideally should be called only once during application initialization before any workload.
xinlian12 marked this conversation as resolved.
Show resolved Hide resolved
* <br>In case of any transient error, caller should consume the error and continue the regular workload.
* </p>
*
* @return A String representative of open connections result.
*/
@Beta(value = Beta.SinceVersion.V4_29_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public Mono<String> openConnectionsAndInitializeCaches() {
xinlian12 marked this conversation as resolved.
Show resolved Hide resolved

if(isInitialized.compareAndSet(false, true)) {
return withContext(context -> openConnectionsAndInitCachesInternal());
} else {
String message =
String.format(
"openConnectionsAndInitializeCaches is already called once on Container %s, no operation will take place in this call",
this.getId());
logger.warn(message);
return Mono.just(message);
}
}

private Mono<String> openConnectionsAndInitCachesInternal() {
return this.database.getDocClientWrapper().openConnectionsAndInitCaches(getLink())
.collectList()
.flatMap(openConnectionResponses -> {
// Generate a simple statistics string for open connections
int total = openConnectionResponses.size();
long connectionsEstablished = openConnectionResponses.stream().filter(response -> response.isConnected()).count();
return Mono.just(String.format("Established: %s, Failed: %s", connectionsEstablished, total - connectionsEstablished));
});
}

/**
* Query for items in the current container using a string.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -805,11 +805,27 @@ public void enableGlobalThroughputControlGroup(ThroughputControlGroupConfig grou
* </p>
*
*/
@Deprecated
@Beta(value = Beta.SinceVersion.V4_14_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public void openConnectionsAndInitCaches() {
blockVoidResponse(this.asyncContainer.openConnectionsAndInitCaches());
}

/***
* Initializes the container by warming up the caches and connections for the current read region.
*
* <p>
* <br>NOTE: This API ideally should be called only once during application initialization before any workload.
* <br>In case of any transient error, caller should consume the error and continue the regular workload.
* </p>
*
* @return A String representative of open connections result.
*/
@Beta(value = Beta.SinceVersion.V4_29_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public String openConnectionsAndInitializeCaches() {
return blockOpenConnectionResponse(this.asyncContainer.openConnectionsAndInitializeCaches());
}

private void blockVoidResponse(Mono<Void> voidMono) {
try {
voidMono.block();
Expand All @@ -822,4 +838,17 @@ private void blockVoidResponse(Mono<Void> voidMono) {
}
}
}

private String blockOpenConnectionResponse(Mono<String> openConnectionMono) {
try {
return openConnectionMono.block();
} catch (Exception ex) {
final Throwable throwable = Exceptions.unwrap(ex);
if (throwable instanceof CosmosException) {
throw (CosmosException) throwable;
} else {
throw Exceptions.propagate(ex);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1617,4 +1617,11 @@ <T> Flux<FeedResponse<T>> readAllDocuments(
* @param group the throughput control group.
*/
void enableThroughputControlGroup(ThroughputControlGroupInternal group);

/***
* Warming up the caches and connections to all replicas of the container for the current read region.
*
* @param containerLink the container link.
*/
Flux<OpenConnectionResponse> openConnectionsAndInitCaches(String containerLink);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation;

import com.azure.cosmos.implementation.directconnectivity.Uri;
import reactor.core.publisher.Flux;

import java.util.List;

public interface IOpenConnectionsHandler {
Flux<OpenConnectionResponse> openConnections(List<Uri> addresses);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation;

import com.azure.cosmos.implementation.directconnectivity.Uri;

public class OpenConnectionResponse {
private final boolean connected;
private final Throwable exception;
private final Uri uri;

public OpenConnectionResponse(Uri uri, boolean connected) {
this(uri, connected, null);
}

public OpenConnectionResponse(Uri uri, boolean connected, Throwable exception) {
this.uri = uri;
this.connected = connected;
this.exception = exception;
}

public boolean isConnected() {
return connected;
}

public Throwable getException() {
return exception;
}

public Uri getUri() {
return uri;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import com.azure.cosmos.implementation.query.IDocumentQueryExecutionContext;
import com.azure.cosmos.implementation.query.Paginator;
import com.azure.cosmos.implementation.query.PartitionedQueryExecutionInfo;
import com.azure.cosmos.implementation.query.PipelinedDocumentQueryExecutionContext;
import com.azure.cosmos.implementation.query.PipelinedQueryExecutionContextBase;
import com.azure.cosmos.implementation.query.QueryInfo;
import com.azure.cosmos.implementation.routing.CollectionRoutingMap;
Expand Down Expand Up @@ -4123,6 +4122,16 @@ public synchronized void enableThroughputControlGroup(ThroughputControlGroupInte
this.throughputControlStore.enableThroughputControlGroup(group);
}

@Override
public Flux<OpenConnectionResponse> openConnectionsAndInitCaches(String containerLink) {
checkArgument(StringUtils.isNotEmpty(containerLink), "Argument 'containerLink' should not be null nor empty");

DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
xinlian12 marked this conversation as resolved.
Show resolved Hide resolved
return ObservableHelper.fluxInlineIfPossibleAsObs(
() -> this.storeModel.openConnectionsAndInitCaches(containerLink),
retryPolicyInstance);
}

private static SqlQuerySpec createLogicalPartitionScanQuerySpec(
PartitionKey partitionKey,
String partitionKeySelector) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,11 @@ public void enableThroughputControl(ThroughputControlStore throughputControlStor
// Disable throughput control for gateway mode
}

@Override
public Flux<OpenConnectionResponse> openConnectionsAndInitCaches(String containerLink) {
return Flux.empty();
}

private void captureSessionToken(RxDocumentServiceRequest request, Map<String, String> responseHeaders) {
if (request.getResourceType() == ResourceType.DocumentCollection &&
request.getOperationType() == OperationType.Delete) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.azure.cosmos.implementation.spark.OperationContextAndListenerTuple;
import com.azure.cosmos.implementation.spark.OperationListener;
import com.azure.cosmos.implementation.throughputControl.ThroughputControlStore;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
Expand Down Expand Up @@ -50,4 +51,12 @@ default Mono<RxDocumentServiceResponse> processMessage(RxDocumentServiceRequest
* @param throughputControlStore
*/
void enableThroughputControl(ThroughputControlStore throughputControlStore);

/***
* Open connections and init caches.
*
* @param containerLink the container link.
* @return a flux of {@link OpenConnectionResponse}.
*/
Flux<OpenConnectionResponse> openConnectionsAndInitCaches(String containerLink);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@
import com.azure.cosmos.implementation.apachecommons.lang.NotImplementedException;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.caches.RxCollectionCache;
import com.azure.cosmos.implementation.IOpenConnectionsHandler;
import com.azure.cosmos.implementation.OpenConnectionResponse;
import com.azure.cosmos.implementation.routing.CollectionRoutingMap;
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper;
import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.net.URI;
Expand Down Expand Up @@ -89,6 +92,16 @@ public int updateAddresses(URI serverKey) {
throw new NotImplementedException("updateAddresses() is not supported in AddressResolver");
}

@Override
public Flux<OpenConnectionResponse> openConnectionsAndInitCaches(String containerLink) {
return Flux.empty();
}

@Override
public void setOpenConnectionsHandler(IOpenConnectionsHandler openConnectionHandler) {
throw new NotImplementedException("setOpenConnectionsHandler is not supported on AddressResolver");
}

private static boolean isSameCollection(PartitionKeyRange initiallyResolved, PartitionKeyRange newlyResolved) {
if (initiallyResolved == null) {
throw new IllegalArgumentException("parent");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import com.azure.cosmos.implementation.GoneException;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.Strings;
import com.azure.cosmos.implementation.OpenConnectionResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Arrays;
Expand Down Expand Up @@ -82,4 +84,8 @@ public Mono<List<AddressInformation>> resolveAddressesAsync(RxDocumentServiceReq
}
);
}

public Flux<OpenConnectionResponse> openConnectionsAndInitCaches(String containerLink) {
return this.addressResolver.openConnectionsAndInitCaches(containerLink);
}
}
Loading