Skip to content

Commit

Permalink
Fixing region fail over issue for direct tcp calls (#351)
Browse files Browse the repository at this point in the history
* Fixing reagion failover issue for direct tcp calls

* indentation

* adding test case and covering issue scenerio for writes

* Fixing lazy indexing tests and wrapping ex to cosmos exception for network test

* resolving comments

* indentation change

* resolving comment
  • Loading branch information
simplynaveen20 committed Aug 31, 2020
1 parent 0c73fb2 commit fbca799
Show file tree
Hide file tree
Showing 9 changed files with 296 additions and 26 deletions.
Expand Up @@ -224,6 +224,10 @@ public static <E extends DocumentClientException> void setRequestHeaders(Docume
documentClientException.setRequestHeaders(requestHeaders);
}

public static void setSubStatusCode(DocumentClientException documentClientException, int subStatusCode) {
documentClientException.setSubStatusCode(subStatusCode);
}

public static <E extends DocumentClientException> Map<String, String> getRequestHeaders(DocumentClientException documentClientException) {
return documentClientException.getRequestHeaders();
}
Expand Down
Expand Up @@ -28,9 +28,9 @@
import com.microsoft.azure.cosmosdb.internal.directconnectivity.Uri;
import org.apache.commons.lang3.StringUtils;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* This class defines a custom exception type for all operations on
Expand All @@ -51,8 +51,8 @@ public class DocumentClientException extends Exception {
private static final long serialVersionUID = 1L;

private final Map<String, String> requestHeaders;
private final Map<String, String> responseHeaders;
private final int statusCode;
private final Map<String, String> responseHeaders;

private volatile ClientSideRequestStatistics clientSideRequestStatistics;
private volatile Error error;
Expand Down Expand Up @@ -321,6 +321,10 @@ public String toString() {
'}';
}

void setSubStatusCode(int subStatusCode) {
this.responseHeaders.put(HttpConstants.HttpHeaders.SUB_STATUS, Integer.toString(subStatusCode));
}

String getInnerErrorMessage() {
String innerErrorMessage = super.getMessage();
if (error != null) {
Expand Down
Expand Up @@ -323,6 +323,9 @@ public static class SubStatusCodes {

// 404: LSN in session token is higher
public static final int READ_SESSION_NOT_AVAILABLE = 1002;

// Client generated gateway network error substatus
public static final int GATEWAY_ENDPOINT_UNAVAILABLE = 10001;
}

public static class HeaderValues {
Expand Down
Expand Up @@ -333,14 +333,37 @@ Single<List<Address>> getServerAddressesViaGatewayAsync(
Single<RxDocumentServiceResponse> dsrObs = responseObs.toSingle().flatMap(rsp ->
HttpClientUtils.parseResponseAsync(rsp));
return dsrObs.map(
dsr -> {
if (logger.isDebugEnabled()) {
logger.debug("getServerAddressesViaGatewayAsync deserializes result");
}
logAddressResolutionEnd(request, identifier);
List<Address> addresses = dsr.getQueryResponse(Address.class);
return addresses;
});
dsr -> {
if (logger.isDebugEnabled()) {
logger.debug("getServerAddressesViaGatewayAsync deserializes result");
}
logAddressResolutionEnd(request, identifier);
List<Address> addresses = dsr.getQueryResponse(Address.class);
return addresses;
}).onErrorResumeNext(throwable -> {
if (!(throwable instanceof Exception)) {
// fatal error
logger.error("Unexpected failure {}", throwable.getMessage(), throwable);
return Single.error(throwable);
}

Exception exception = (Exception) throwable;
DocumentClientException dce;
if (!(exception instanceof DocumentClientException)) {
// wrap in DocumentClientException
logger.error("Network failure", exception);
dce = new DocumentClientException(0, exception);
BridgeInternal.setRequestHeaders(dce, request.getHeaders());
} else {
dce = (DocumentClientException) exception;
}

if (WebExceptionUtility.isNetworkFailure(dce)) {
BridgeInternal.setSubStatusCode(dce, HttpConstants.SubStatusCodes.GATEWAY_ENDPOINT_UNAVAILABLE);
}

return Single.error(dce);
});
}

public void dispose() {
Expand Down
Expand Up @@ -49,6 +49,7 @@ public class ClientRetryPolicy implements IDocumentClientRetryPolicy {

final static int RetryIntervalInMS = 1000; //Once we detect failover wait for 1 second before retrying request.
final static int MaxRetryCount = 120;
private final static int MaxServiceUnavailableRetryCount = 1;

private final IDocumentClientRetryPolicy throttlingRetry;
private final ConnectionPoolExhaustedRetry rxNettyConnectionPoolExhaustedRetry;
Expand All @@ -63,6 +64,7 @@ public class ClientRetryPolicy implements IDocumentClientRetryPolicy {
private RetryContext retryContext;
private ClientSideRequestStatistics clientSideRequestStatistics;
private AtomicInteger cnt = new AtomicInteger(0);
private int serviceUnavailableRetryCount;

public ClientRetryPolicy(GlobalEndpointManager globalEndpointManager,
boolean enableEndpointDiscovery,
Expand Down Expand Up @@ -125,11 +127,16 @@ public Single<ShouldRetryResult> shouldRetry(Exception e) {

// Received Connection error (HttpRequestException), initiate the endpoint rediscovery
if (WebExceptionUtility.isNetworkFailure(e)) {
if (this.isReadRequest || WebExceptionUtility.isWebExceptionRetriable(e)) {
logger.warn("Endpoint not reachable. Will refresh cache and retry. ", e);
return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest, false);
if (clientException != null && Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.GATEWAY_ENDPOINT_UNAVAILABLE)) {
if (this.isReadRequest || WebExceptionUtility.isWebExceptionRetriable(e)) {
logger.warn("Gateway endpoint not reachable. Will refresh cache and retry. ", e);
return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest, false);
} else {
return this.shouldNotRetryOnEndpointFailureAsync(this.isReadRequest, false);
}
} else {
return this.shouldNotRetryOnEndpointFailureAsync(this.isReadRequest, false);
logger.warn("Backend endpoint not reachable. ", e);
return this.shouldRetryOnBackendServiceUnavailableAsync(this.isReadRequest, WebExceptionUtility.isWebExceptionRetriable(e));
}
}

Expand Down Expand Up @@ -207,6 +214,36 @@ private Single<ShouldRetryResult> shouldNotRetryOnEndpointFailureAsync(boolean i
return refreshCompletion.andThen(Single.just(ShouldRetryResult.noRetry()));
}

private Single<ShouldRetryResult> shouldRetryOnBackendServiceUnavailableAsync(boolean isReadRequest, boolean isWebExceptionRetriable) {
if (!isReadRequest && !isWebExceptionRetriable) {
logger.warn("shouldRetryOnBackendServiceUnavailableAsync() Not retrying on write with non retriable exception. Retry count = {}", this.serviceUnavailableRetryCount);
return Single.just(ShouldRetryResult.noRetry());
}

if (this.serviceUnavailableRetryCount++ > MaxServiceUnavailableRetryCount) {
logger.warn("shouldRetryOnBackendServiceUnavailableAsync() Not retrying. Retry count = {}", this.serviceUnavailableRetryCount);
return Single.just(ShouldRetryResult.noRetry());
}

if (!this.canUseMultipleWriteLocations && !isReadRequest) {
// Write requests on single master cannot be retried, no other regions available
return Single.just(ShouldRetryResult.noRetry());
}

int availablePreferredLocations = this.globalEndpointManager.getPreferredLocationCount();
if (availablePreferredLocations <= 1) {
logger.warn("shouldRetryOnServiceUnavailable() Not retrying. No other regions available for the request. AvailablePreferredLocations = {}", availablePreferredLocations);
return Single.just(ShouldRetryResult.noRetry());
}

logger.warn("shouldRetryOnServiceUnavailable() Retrying. Received on endpoint {}, IsReadRequest = {}", this.locationEndpoint, isReadRequest);

// Retrying on second PreferredLocations
// RetryCount is used as zero-based index
this.retryContext = new RetryContext(this.serviceUnavailableRetryCount, true);
return Single.just(ShouldRetryResult.retryAfter(Duration.ZERO));
}

private Completable refreshLocation(boolean isReadRequest, boolean forceRefresh) {
this.failoverRetryCount++;

Expand Down
Expand Up @@ -29,6 +29,7 @@
import com.microsoft.azure.cosmosdb.DatabaseAccountManagerInternal;
import com.microsoft.azure.cosmosdb.internal.routing.LocationCache;
import com.microsoft.azure.cosmosdb.rx.internal.routing.LocationHelper;
import org.apache.commons.collections4.list.UnmodifiableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Completable;
Expand All @@ -37,8 +38,6 @@
import rx.Single;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import org.apache.commons.collections4.list.UnmodifiableList;


import java.net.URISyntaxException;
import java.net.URL;
Expand All @@ -48,7 +47,6 @@
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -151,6 +149,10 @@ public boolean CanUseMultipleWriteLocations(RxDocumentServiceRequest request) {
return this.locationCache.canUseMultipleWriteLocations(request);
}

public int getPreferredLocationCount() {
return this.connectionPolicy.getPreferredLocations() != null ? this.connectionPolicy.getPreferredLocations().size() : 0;
}

public void close() {
this.isClosed = true;
this.executor.shutdown();
Expand Down
Expand Up @@ -37,6 +37,7 @@
import com.microsoft.azure.cosmosdb.internal.UserAgentContainer;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.HttpUtils;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.StoreResponse;
import com.microsoft.azure.cosmosdb.internal.directconnectivity.WebExceptionUtility;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpMethod;
Expand Down Expand Up @@ -401,15 +402,21 @@ private Observable<RxDocumentServiceResponse> toDocumentServiceResponse(Observab
}

Exception exception = (Exception) throwable;
DocumentClientException dce;
if (!(exception instanceof DocumentClientException)) {
// wrap in DocumentClientException
logger.error("Network failure", exception);
DocumentClientException dce = new DocumentClientException(0, exception);
dce = new DocumentClientException(0, exception);
BridgeInternal.setRequestHeaders(dce, request.getHeaders());
return Observable.error(dce);
} else {
dce = (DocumentClientException) exception;
}

return Observable.error(exception);
if (WebExceptionUtility.isNetworkFailure(dce)) {
BridgeInternal.setSubStatusCode(dce, HttpConstants.SubStatusCodes.GATEWAY_ENDPOINT_UNAVAILABLE);
}

return Observable.error(dce);
});
}
}
Expand Down

0 comments on commit fbca799

Please sign in to comment.