Skip to content

Commit

Permalink
Add backoff to WorkflowExecutionUtils#getInstanceCloseEvent (temporal…
Browse files Browse the repository at this point in the history
…io#667)

Update some options of DefaultStubServiceOperationRpcRetryOptions to be close to Go SDK behavior
Issue temporalio#651
  • Loading branch information
Spikhalskiy committed Aug 26, 2021
1 parent c167ff9 commit 6d55180
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import io.temporal.serviceclient.CheckedExceptionWrapper;
import io.temporal.serviceclient.RpcRetryOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.serviceclient.rpcretry.DefaultStubLongPollRpcRetryOptions;
import java.io.File;
import java.io.IOException;
import java.io.Reader;
Expand Down Expand Up @@ -98,17 +99,6 @@ public class WorkflowExecutionUtils {
*/
private static final String INDENTATION = " ";

private static final Logger log = LoggerFactory.getLogger(WorkflowExecutionUtils.class);

private static final RpcRetryOptions GET_INSTANCE_CLOSE_EVENT_RETRY_OPTIONS =
RpcRetryOptions.newBuilder()
.setBackoffCoefficient(1)
.setInitialInterval(Duration.ofMillis(1))
.setMaximumAttempts(Integer.MAX_VALUE)
.addDoNotRetry(Status.Code.INVALID_ARGUMENT, null)
.addDoNotRetry(Status.Code.NOT_FOUND, null)
.build();

/**
* Returns result of a workflow instance execution or throws an exception if workflow did not
* complete successfully.
Expand Down Expand Up @@ -240,7 +230,7 @@ public static HistoryEvent getInstanceCloseEvent(

if (millisRemaining > 0) {
RpcRetryOptions retryOptions =
RpcRetryOptions.newBuilder(GET_INSTANCE_CLOSE_EVENT_RETRY_OPTIONS)
DefaultStubLongPollRpcRetryOptions.getBuilder()
.setExpiration(Duration.ofMillis(millisRemaining))
.build();
response =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import com.uber.m3.tally.Scope;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.temporal.internal.BackoffThrottler;
import io.temporal.internal.common.InternalUtils;
import io.temporal.internal.metrics.MetricsType;
import io.temporal.internal.BackoffThrottler;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class GrpcRetryerUtils {
case DEADLINE_EXCEEDED:
return exception;
default:
for (RpcRetryOptions.DoNotRetryPair pair : options.getDoNotRetry()) {
for (RpcRetryOptions.DoNotRetryItem pair : options.getDoNotRetry()) {
if (pair.getCode() == code
&& (pair.getDetailsClass() == null
|| StatusUtils.hasFailure(exception, pair.getDetailsClass()))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@
import com.google.common.base.Defaults;
import com.google.protobuf.GeneratedMessageV3;
import io.grpc.Status;
import io.temporal.serviceclient.rpcretry.DefaultStubServiceOperationRpcRetryOptions;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public final class RpcRetryOptions {

Expand All @@ -48,11 +51,23 @@ public static RpcRetryOptions getDefaultInstance() {
DEFAULT_INSTANCE = RpcRetryOptions.newBuilder().build();
}

public static class DoNotRetryPair {
public static class DoNotRetryItem {
private final Status.Code code;
private final Class<? extends GeneratedMessageV3> detailsClass;

private DoNotRetryPair(Status.Code code, Class<? extends GeneratedMessageV3> detailsClass) {
/**
 * Creates a rule describing a failure that must not be retried.
 *
 * @param code errors with this status code are treated as non-retryable. {@link
 *     Status.Code#CANCELLED} and {@link Status.Code#DEADLINE_EXCEEDED} are always considered
 *     non-retryable.
 * @param detailsClass if not null, only failures that carry both the {@code code} and details
 *     of this class are non-retryable. If null, every failure with the {@code code} is
 *     non-retryable.
 * @throws NullPointerException if {@code code} is null
 */
public DoNotRetryItem(
    @Nonnull Status.Code code, @Nullable Class<? extends GeneratedMessageV3> detailsClass) {
  // Same exception type and message ("code") as an explicit null check would produce.
  this.code = Objects.requireNonNull(code, "code");
  this.detailsClass = detailsClass;
}
Expand All @@ -78,7 +93,7 @@ public static final class Builder {

private Duration maximumInterval;

private List<DoNotRetryPair> doNotRetry = new ArrayList<>();
private List<DoNotRetryItem> doNotRetry = new ArrayList<>();

private Builder() {}

Expand Down Expand Up @@ -165,11 +180,23 @@ public Builder setMaximumInterval(Duration maximumInterval) {
*/
public Builder addDoNotRetry(
Status.Code code, Class<? extends GeneratedMessageV3> detailsClass) {
doNotRetry.add(new DoNotRetryPair(code, detailsClass));
doNotRetry.add(new DoNotRetryItem(code, detailsClass));
return this;
}

/**
 * Adds a {@link DoNotRetryItem} describing failures that must not be retried. If the item's
 * {@code detailsClass} is null, all failures with the item's code are non-retryable.
 *
 * <p>{@link Status.Code#CANCELLED} and {@link Status.Code#DEADLINE_EXCEEDED} are always
 * considered non-retryable.
 *
 * @param doNotRetryItem the rule to register; must not be null
 * @return this builder
 * @throws NullPointerException if {@code doNotRetryItem} is null
 */
public Builder addDoNotRetry(DoNotRetryItem doNotRetryItem) {
  // Reject null here: a null entry would otherwise only blow up later, when the retry logic
  // iterates the list and dereferences each item.
  doNotRetry.add(Objects.requireNonNull(doNotRetryItem, "doNotRetryItem"));
  return this;
}

Builder setDoNotRetry(List<DoNotRetryPair> pairs) {
Builder setDoNotRetry(List<DoNotRetryItem> pairs) {
doNotRetry = pairs;
return this;
}
Expand All @@ -196,7 +223,7 @@ private static <G> G merge(G annotation, G options, Class<G> type) {
return annotation;
}

private List<DoNotRetryPair> merge(List<DoNotRetryPair> o1, List<DoNotRetryPair> o2) {
private List<DoNotRetryItem> merge(List<DoNotRetryItem> o1, List<DoNotRetryItem> o2) {
if (o2 != null) {
return new ArrayList<>(o2);
}
Expand Down Expand Up @@ -230,22 +257,19 @@ public RpcRetryOptions buildWithDefaultsFrom(RpcRetryOptions rpcRetryOptions) {
public RpcRetryOptions validateBuildWithDefaults() {
double backoff = backoffCoefficient;
if (backoff == 0d) {
backoff = DefaultServiceOperationRpcRetryOptions.BACKOFF;
backoff = DefaultStubServiceOperationRpcRetryOptions.BACKOFF;
}
if (initialInterval == null || initialInterval.isZero() || initialInterval.isNegative()) {
initialInterval =
DefaultServiceOperationRpcRetryOptions.INITIAL_INTERVAL;
initialInterval = DefaultStubServiceOperationRpcRetryOptions.INITIAL_INTERVAL;
}
if (expiration == null || expiration.isZero() || expiration.isNegative()) {
expiration =
DefaultServiceOperationRpcRetryOptions.EXPIRATION_INTERVAL;
expiration = DefaultStubServiceOperationRpcRetryOptions.EXPIRATION_INTERVAL;
}
if (maximumInterval == null || maximumInterval.isZero() || maximumInterval.isNegative()) {
maximumInterval =
DefaultServiceOperationRpcRetryOptions.MAXIMUM_INTERVAL;
maximumInterval = DefaultStubServiceOperationRpcRetryOptions.MAXIMUM_INTERVAL;
}
if (doNotRetry == null || doNotRetry.size() == 0) {
doNotRetry = DefaultServiceOperationRpcRetryOptions.INSTANCE.doNotRetry;
doNotRetry = DefaultStubServiceOperationRpcRetryOptions.INSTANCE.doNotRetry;
}
RpcRetryOptions result =
new RpcRetryOptions(
Expand All @@ -265,15 +289,15 @@ public RpcRetryOptions validateBuildWithDefaults() {

private final Duration maximumInterval;

private final List<DoNotRetryPair> doNotRetry;
private final List<DoNotRetryItem> doNotRetry;

private RpcRetryOptions(
Duration initialInterval,
double backoffCoefficient,
Duration expiration,
int maximumAttempts,
Duration maximumInterval,
List<DoNotRetryPair> doNotRetry) {
List<DoNotRetryItem> doNotRetry) {
this.initialInterval = initialInterval;
this.backoffCoefficient = backoffCoefficient;
this.expiration = expiration;
Expand Down Expand Up @@ -325,7 +349,7 @@ public void validate() {
}
}

public List<DoNotRetryPair> getDoNotRetry() {
public List<DoNotRetryItem> getDoNotRetry() {
return doNotRetry;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.grpc.*;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
import io.temporal.serviceclient.rpcretry.DefaultStubServiceOperationRpcRetryOptions;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -292,7 +293,11 @@ public Duration getRpcQueryTimeout() {
return rpcQueryTimeout;
}

/** @return Returns rpc retry options for outgoing requests to the temporal server. */
/**
* Returns the rpc retry options for outgoing requests to the Temporal server.
*
* @return rpc retry options for outgoing requests that are supposed to be processed and returned
* fast, such as starting a workflow (not long polls or waiting for a workflow to finish).
*/
public RpcRetryOptions getRpcRetryOptions() {
return rpcRetryOptions;
}
Expand Down Expand Up @@ -356,7 +361,7 @@ public static class Builder {
private Duration rpcTimeout = DEFAULT_RPC_TIMEOUT;
private Duration rpcLongPollTimeout = DEFAULT_POLL_RPC_TIMEOUT;
private Duration rpcQueryTimeout = DEFAULT_QUERY_RPC_TIMEOUT;
private RpcRetryOptions rpcRetryOptions = DefaultServiceOperationRpcRetryOptions.INSTANCE;
private RpcRetryOptions rpcRetryOptions = DefaultStubServiceOperationRpcRetryOptions.INSTANCE;
private Duration connectionBackoffResetFrequency = DEFAULT_CONNECTION_BACKOFF_RESET_FREQUENCY;
private Duration grpcReconnectFrequency = DEFAULT_GRPC_RECONNECT_FREQUENCY;
private Metadata headers;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.serviceclient.rpcretry;

import io.temporal.serviceclient.RpcRetryOptions;
import java.time.Duration;

/**
 * Default rpc retry options for long-poll style calls, such as waiting for a workflow to finish
 * and return its result.
 */
public class DefaultStubLongPollRpcRetryOptions {
  /** Initial retry interval handed to the builder. */
  public static final Duration INITIAL_INTERVAL = Duration.ofMillis(1);

  /** Maximum retry interval handed to the builder. */
  public static final Duration MAXIMUM_INTERVAL = Duration.ofMinutes(1);

  /** Backoff coefficient handed to the builder. */
  public static final double BACKOFF = 1.2;

  /**
   * Creates a builder preconfigured with the interval and backoff constants of this class and
   * with every entry of {@link
   * DefaultStubServiceOperationRpcRetryOptions#TEMPORAL_SERVER_DEFAULT_NON_RETRY} registered as
   * non-retryable. No expiration is set here.
   *
   * @return a new, preconfigured builder
   */
  public static RpcRetryOptions.Builder getBuilder() {
    RpcRetryOptions.Builder longPollBuilder =
        RpcRetryOptions.newBuilder()
            .setInitialInterval(INITIAL_INTERVAL)
            .setBackoffCoefficient(BACKOFF)
            .setMaximumInterval(MAXIMUM_INTERVAL);

    for (RpcRetryOptions.DoNotRetryItem nonRetryable :
        DefaultStubServiceOperationRpcRetryOptions.TEMPORAL_SERVER_DEFAULT_NON_RETRY) {
      longPollBuilder.addDoNotRetry(nonRetryable);
    }

    return longPollBuilder;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,38 @@
* permissions and limitations under the License.
*/

package io.temporal.serviceclient;
package io.temporal.serviceclient.rpcretry;

import io.grpc.Status;
import io.temporal.api.errordetails.v1.QueryFailedFailure;
import io.temporal.serviceclient.RpcRetryOptions;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class DefaultServiceOperationRpcRetryOptions {
public static final Duration INITIAL_INTERVAL = Duration.ofMillis(20);
/**
* Default rpc retry options for outgoing requests to the Temporal server that are supposed to be
* processed and returned fast, such as starting a workflow (not long polls or waiting for a
* workflow to finish).
*/
public class DefaultStubServiceOperationRpcRetryOptions {
public static final Duration INITIAL_INTERVAL = Duration.ofMillis(50);
public static final Duration EXPIRATION_INTERVAL = Duration.ofMinutes(1);
public static final Duration MAXIMUM_INTERVAL;
public static final double BACKOFF = 1.2;
public static final double BACKOFF = 2;

/**
 * Status codes that are registered as non-retryable by default, regardless of error details
 * (each item is created with a null details class).
 *
 * <p>The list is unmodifiable: it is a shared public constant, so callers must not be able to
 * alter the defaults seen by every other user. Fully qualified names are used for the
 * collection helpers so that no additional imports are required.
 */
public static final List<RpcRetryOptions.DoNotRetryItem> TEMPORAL_SERVER_DEFAULT_NON_RETRY =
    java.util.Collections.unmodifiableList(
        java.util.Arrays.asList(
            // CANCELLED and DEADLINE_EXCEEDED are always considered non-retryable
            new RpcRetryOptions.DoNotRetryItem(Status.Code.INVALID_ARGUMENT, null),
            new RpcRetryOptions.DoNotRetryItem(Status.Code.NOT_FOUND, null),
            new RpcRetryOptions.DoNotRetryItem(Status.Code.ALREADY_EXISTS, null),
            new RpcRetryOptions.DoNotRetryItem(Status.Code.FAILED_PRECONDITION, null),
            new RpcRetryOptions.DoNotRetryItem(Status.Code.PERMISSION_DENIED, null),
            new RpcRetryOptions.DoNotRetryItem(Status.Code.UNAUTHENTICATED, null),
            new RpcRetryOptions.DoNotRetryItem(Status.Code.UNIMPLEMENTED, null)));

public static final RpcRetryOptions INSTANCE;

Expand All @@ -38,26 +59,17 @@ public class DefaultServiceOperationRpcRetryOptions {
}
MAXIMUM_INTERVAL = maxInterval;

INSTANCE = getDefaultServiceOperationRetryOptionsBuilder().validateBuildWithDefaults();
INSTANCE = getBuilder().validateBuildWithDefaults();
}

private static RpcRetryOptions.Builder getDefaultServiceOperationRetryOptionsBuilder() {
public static RpcRetryOptions.Builder getBuilder() {
RpcRetryOptions.Builder roBuilder =
RpcRetryOptions.newBuilder()
.setInitialInterval(INITIAL_INTERVAL)
.setExpiration(EXPIRATION_INTERVAL)
.setBackoffCoefficient(BACKOFF)
.setMaximumInterval(MAXIMUM_INTERVAL);
// CANCELLED and DEADLINE_EXCEEDED are always considered non-retryable
roBuilder
.addDoNotRetry(Status.Code.INVALID_ARGUMENT, null)
.addDoNotRetry(Status.Code.NOT_FOUND, null)
.addDoNotRetry(Status.Code.ALREADY_EXISTS, null)
.addDoNotRetry(Status.Code.FAILED_PRECONDITION, null)
.addDoNotRetry(Status.Code.PERMISSION_DENIED, null)
.addDoNotRetry(Status.Code.UNAUTHENTICATED, null)
.addDoNotRetry(Status.Code.UNIMPLEMENTED, null)
.addDoNotRetry(Status.Code.INTERNAL, QueryFailedFailure.class);
TEMPORAL_SERVER_DEFAULT_NON_RETRY.forEach(roBuilder::addDoNotRetry);
return roBuilder;
}
}

0 comments on commit 6d55180

Please sign in to comment.